@@ -212,10 +212,39 @@ def _poll_for_queues(self) -> None:
212212
213213 This is only used when using polling to get queues with queued tasks.
214214 """
215- if not self . _did_work :
216- time . sleep ( self .config [ "POLL_TASK_QUEUES_INTERVAL" ] )
215+
216+ self ._wait_to_refresh_queue_set ( )
217217 self ._refresh_queue_set ()
218218
219+ def _wait_to_refresh_queue_set (self ):
220+ interval = self .config ["POLL_TASK_QUEUES_INTERVAL" ]
221+
222+ if self ._did_work :
223+ self .log .info ("Queue poll: No delay" )
224+ return
225+
226+ def throttle_queue_poll ():
227+ return self .connection .exists (
228+ self ._key ("throttle_queue_poll" , self .worker_group_name )
229+ )
230+
231+ lock = self .connection .lock (
232+ self ._key ("lockv2" , "queue_poll" , self .worker_group_name ),
233+ timeout = interval * 0.5 ,
234+ )
235+
236+ while True :
237+ if not throttle_queue_poll ():
238+ self .log .info (f"Queue poll: Sleeping { interval } s" )
239+ time .sleep (interval )
240+ return
241+
242+ if lock .acquire (blocking = False ):
243+ self .log .info ("Queue poll: Acquired lock" )
244+ return
245+
246+ time .sleep (interval * 0.1 )
247+
219248 def _pubsub_for_queues (self , timeout = 0 , batch_timeout = 0 ) -> None :
220249 """
221250 Check activity channel for new queues and wait as necessary.
@@ -1155,7 +1184,20 @@ def _refresh_queue_set(self) -> None:
11551184 self ._filter_queues (self ._retrieve_queues (self ._key (QUEUED )))
11561185 )
11571186
1187+ throttle_key = self ._key ("throttle_queue_poll" , self .worker_group_name )
1188+
1189+ if self ._queue_set :
1190+ self .connection .delete (throttle_key )
1191+ else :
1192+ self .connection .set (
1193+ throttle_key ,
1194+ value = 1 ,
1195+ px = int (self .config ["POLL_TASK_QUEUES_INTERVAL" ] * 1000 ),
1196+ )
1197+
11581198 def _retrieve_queues (self , key ) -> Set [str ]:
1199+ self .log .info ("Queue poll: Done" )
1200+
11591201 if len (self .only_queues ) != 1 :
11601202 return self .connection .smembers (key )
11611203
0 commit comments