@@ -517,6 +517,7 @@ async def _run_consumer(self) -> None: # noqa: PLR0915, PLR0912
517517 # The consumer is dead. The ActiveTask is permanently finished.
518518 self ._is_finished .set ()
519519 self ._request_queue .shutdown (immediate = True )
520+ await self ._event_queue_agent .close (immediate = True )
520521
521522 logger .debug ('Consumer[%s]: Finishing' , self ._task_id )
522523 await self ._maybe_cleanup ()
@@ -574,53 +575,42 @@ async def subscribe( # noqa: PLR0912, PLR0915
574575 if self ._exception :
575576 raise self ._exception
576577
577- # Wait for next event or task completion
578- try :
579- dequeued = await asyncio .wait_for (
580- tapped_queue .dequeue_event (), timeout = 0.1
581- )
582- event , updated_task = cast ('Any' , dequeued )
578+ dequeued = await tapped_queue .dequeue_event ()
579+ event , updated_task = cast ('Any' , dequeued )
580+ logger .debug (
581+ 'Subscriber[%s]\n Dequeued event %s\n Updated task %s\n ' ,
582+ self ._task_id ,
583+ event ,
584+ updated_task ,
585+ )
586+ if replace_status_update_with_task and isinstance (
587+ event , TaskStatusUpdateEvent
588+ ):
583589 logger .debug (
584- 'Subscriber[%s]\n Dequeued event %s \n Updated task %s \n ' ,
590+ 'Subscriber[%s]: Replacing TaskStatusUpdateEvent with Task: %s ' ,
585591 self ._task_id ,
586- event ,
587592 updated_task ,
588593 )
589- if replace_status_update_with_task and isinstance (
590- event , TaskStatusUpdateEvent
594+ event = updated_task
595+ if self ._exception :
596+ raise self ._exception from None
597+ if isinstance (event , _RequestCompleted ):
598+ if (
599+ request_id is not None
600+ and event .request_id == request_id
591601 ):
592602 logger .debug (
593- 'Subscriber[%s]: Replacing TaskStatusUpdateEvent with Task: %s' ,
594- self ._task_id ,
595- updated_task ,
596- )
597- event = updated_task
598- if self ._exception :
599- raise self ._exception from None
600- if isinstance (event , _RequestCompleted ):
601- if (
602- request_id is not None
603- and event .request_id == request_id
604- ):
605- logger .debug (
606- 'Subscriber[%s]: Request completed' ,
607- self ._task_id ,
608- )
609- return
610- continue
611- elif isinstance (event , _RequestStarted ):
612- logger .debug (
613- 'Subscriber[%s]: Request started' ,
603+ 'Subscriber[%s]: Request completed' ,
614604 self ._task_id ,
615605 )
616- continue
617- except (asyncio .TimeoutError , TimeoutError ):
618- if self ._is_finished .is_set ():
619- if self ._exception :
620- raise self ._exception from None
621- break
606+ return
607+ continue
608+ elif isinstance (event , _RequestStarted ):
609+ logger .debug (
610+ 'Subscriber[%s]: Request started' ,
611+ self ._task_id ,
612+ )
622613 continue
623-
624614 try :
625615 yield event
626616 finally :
0 commit comments