1010
1111class MessageBus ::ReliablePubSub
1212
13+ class NoMoreRetries < StandardError ; end
14+ class BackLogOutOfOrder < StandardError
15+ attr_accessor :highest_id
16+
17+ def initialize ( highest_id )
18+ @highest_id = highest_id
19+ end
20+ end
21+
22+ def max_publish_retries = ( val )
23+ @max_publish_retries = val
24+ end
25+
26+ def max_publish_retries
27+ @max_publish_retries ||= 10
28+ end
29+
30+ def max_publish_wait = ( ms )
31+ @max_publish_wait = ms
32+ end
33+
34+ def max_publish_wait
35+ @max_publish_wait ||= 500
36+ end
37+
1338 # max_backlog_size is per multiplexed channel
1439 def initialize ( redis_config = { } , max_backlog_size = 1000 )
1540 @redis_config = redis_config
@@ -42,25 +67,21 @@ def pub_redis
4267 @pub_redis ||= new_redis_connection
4368 end
4469
45- def offset_key ( channel )
46- "__mb_offset_#{ channel } "
47- end
48-
4970 def backlog_key ( channel )
5071 "__mb_backlog_#{ channel } "
5172 end
5273
74+ def backlog_id_key ( channel )
75+ "__mb_backlog_id_#{ channel } "
76+ end
77+
5378 def global_id_key
5479 "__mb_global_id"
5580 end
5681
5782 def global_backlog_key
5883 "__mb_global_backlog"
5984 end
60-
61- def global_offset_key
62- "__mb_global_offset"
63- end
6485
6586 # use with extreme care, will nuke all of the data
6687 def reset!
@@ -71,74 +92,49 @@ def reset!
7192
7293 def publish ( channel , data )
7394 redis = pub_redis
74- offset_key = offset_key ( channel )
95+ backlog_id_key = backlog_id_key ( channel )
7596 backlog_key = backlog_key ( channel )
7697
77- redis . watch ( offset_key , backlog_key , global_id_key , global_backlog_key , global_offset_key ) do
78- offset = redis . get ( offset_key ) . to_i
79- backlog = redis . llen ( backlog_key ) . to_i
98+ global_id = nil
99+ backlog_id = nil
80100
81- global_offset = redis . get ( global_offset_key ) . to_i
82- global_backlog = redis . llen ( global_backlog_key ) . to_i
83-
84- global_id = redis . get ( global_id_key ) . to_i
85- global_id += 1
101+ redis . multi do |m |
102+ global_id = m . incr ( global_id_key )
103+ backlog_id = m . incr ( backlog_id_key )
104+ end
86105
87- too_big = backlog + 1 > @max_backlog_size
88- global_too_big = global_backlog + 1 > @max_global_backlog_size
106+ global_id = global_id . value
107+ backlog_id = backlog_id . value
89108
90- message_id = backlog + offset + 1
91- redis . multi do
92- if too_big
93- redis . ltrim backlog_key , ( backlog +1 ) - @max_backlog_size , -1
94- offset += ( backlog +1 ) - @max_backlog_size
95- redis . set ( offset_key , offset )
96- end
109+ msg = MessageBus ::Message . new global_id , backlog_id , channel , data
110+ payload = msg . encode
97111
98- if global_too_big
99- redis . ltrim global_backlog_key , ( global_backlog +1 ) - @max_global_backlog_size , -1
100- global_offset += ( global_backlog +1 ) - @max_global_backlog_size
101- redis . set ( global_offset_key , global_offset )
102- end
112+ redis . zadd backlog_key , backlog_id , payload
113+ redis . zadd global_backlog_key , global_id , backlog_id . to_s << "|" << channel
103114
104- msg = MessageBus ::Message . new global_id , message_id , channel , data
105- payload = msg . encode
115+ redis . publish redis_channel_name , payload
106116
107- redis . set global_id_key , global_id
108- redis . rpush backlog_key , payload
109- redis . rpush global_backlog_key , message_id . to_s << "|" << channel
110- redis . publish redis_channel_name , payload
111- end
117+ if backlog_id > @max_backlog_size
118+ redis . zremrangebyscore backlog_key , 1 , backlog_id - @max_backlog_size
119+ end
112120
113- return message_id
121+ if global_id > @max_global_backlog_size
122+ redis . zremrangebyscore global_backlog_key , 1 , backlog_id - @max_backlog_size
114123 end
124+
125+ backlog_id
115126 end
116127
117128 def last_id ( channel )
118129 redis = pub_redis
119- offset_key = offset_key ( channel )
120- backlog_key = backlog_key ( channel )
121-
122- offset , len = nil
123- redis . watch offset_key , backlog_key do
124- offset = redis . get ( offset_key ) . to_i
125- len = redis . llen backlog_key
126- end
127- offset + len
130+ backlog_id_key = backlog_id_key ( channel )
131+ redis . get ( backlog_id_key ) . to_i
128132 end
129133
130134 def backlog ( channel , last_id = nil )
131135 redis = pub_redis
132- offset_key = offset_key ( channel )
133136 backlog_key = backlog_key ( channel )
134-
135- items = nil
136-
137- redis . watch offset_key , backlog_key do
138- offset = redis . get ( offset_key ) . to_i
139- start_at = last_id . to_i - offset
140- items = redis . lrange backlog_key , start_at , -1
141- end
137+ items = redis . zrangebyscore backlog_key , last_id . to_i + 1 , "+inf"
142138
143139 items . map do |i |
144140 MessageBus ::Message . decode ( i )
@@ -147,14 +143,9 @@ def backlog(channel, last_id = nil)
147143
148144 def global_backlog ( last_id = nil )
149145 last_id = last_id . to_i
150- items = nil
151146 redis = pub_redis
152147
153- redis . watch global_backlog_key , global_offset_key do
154- offset = redis . get ( global_offset_key ) . to_i
155- start_at = last_id . to_i - offset
156- items = redis . lrange global_backlog_key , start_at , -1
157- end
148+ items = redis . zrangebyscore global_backlog_key , last_id . to_i + 1 , "+inf"
158149
159150 items . map! do |i |
160151 pipe = i . index "|"
@@ -165,50 +156,73 @@ def global_backlog(last_id = nil)
165156 end
166157
167158 items . compact!
168-
169159 items
170160 end
171161
172162 def get_message ( channel , message_id )
173163 redis = pub_redis
174- offset_key = offset_key ( channel )
175164 backlog_key = backlog_key ( channel )
176165
177- msg = nil
178- redis . watch ( offset_key , backlog_key ) do
179- offset = redis . get ( offset_key ) . to_i
180- idx = ( message_id -1 ) - offset
181- return nil if idx < 0
182- msg = redis . lindex ( backlog_key , idx )
183- end
184-
185- if msg
186- msg = MessageBus ::Message . decode ( msg )
166+ items = redis . zrangebyscore backlog_key , message_id , message_id
167+ if items && items [ 0 ]
168+ MessageBus ::Message . decode ( items [ 0 ] )
169+ else
170+ nil
187171 end
188- msg
189172 end
190173
191174 def subscribe ( channel , last_id = nil )
192175 # trivial implementation for now,
193176 # can cut down on connections if we only have one global subscriber
194177 raise ArgumentError unless block_given?
195178
179+ if last_id
180+ # we need to translate this to a global id, at least give it a shot
181+ # we are subscribing on global and global is always going to be bigger than local
182+ # so worst case is a replay of a few messages
183+ message = get_message ( channel , last_id )
184+ if message
185+ last_id = message . global_id
186+ end
187+ end
196188 global_subscribe ( last_id ) do |m |
197189 yield m if m . channel == channel
198190 end
199191 end
200192
193+ def process_global_backlog ( highest_id , raise_error , &blk )
194+ global_backlog ( highest_id ) . each do |old |
195+ if highest_id + 1 == old . global_id
196+ yield old
197+ highest_id = old . global_id
198+ else
199+ raise BackLogOutOfOrder . new ( highest_id ) if raise_error
200+ if old . global_id > highest_id
201+ yield old
202+ highest_id = old . global_id
203+ end
204+ end
205+ end
206+ highest_id
207+ end
208+
201209 def global_subscribe ( last_id = nil , &blk )
202210 raise ArgumentError unless block_given?
203211 highest_id = last_id
204212
205- clear_backlog = lambda do
206- global_backlog ( highest_id ) . each do |old |
207- highest_id = old . global_id
208- yield old
213+ clear_backlog = lambda do
214+ retries = 4
215+ begin
216+ highest_id = process_global_backlog ( highest_id , retries > 0 , &blk )
217+ rescue BackLogOutOfOrder => e
218+ highest_id = e . highest_id
219+ retries -= 1
220+ sleep ( rand ( 50 ) / 1000.0 )
221+ retry
209222 end
210223 end
211224
225+
212226 begin
213227 redis = new_redis_connection
214228
@@ -224,11 +238,18 @@ def global_subscribe(last_id=nil, &blk)
224238 end
225239 on . message do |c , m |
226240 m = MessageBus ::Message . decode m
227- if highest_id && m . global_id != highest_id + 1
241+
242+ # we have 2 options
243+ #
244+ # 1. message came in the correct order GREAT, just deal with it
245+ # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
246+
247+ if highest_id . nil? || m . global_id == highest_id + 1
248+ highest_id = m . global_id
249+ yield m
250+ else
228251 clear_backlog . call ( &blk )
229252 end
230- yield m if highest_id . nil? || m . global_id > highest_id
231- highest_id = m . global_id
232253 end
233254 end
234255 rescue => error
@@ -238,5 +259,4 @@ def global_subscribe(last_id=nil, &blk)
238259 end
239260 end
240261
241-
242262end
0 commit comments