@@ -747,3 +747,72 @@ async def consumer(q: EventQueue) -> None:
747747 )
748748
749749 await queue .close (immediate = True )
750+
751+
752+ @pytest .mark .asyncio
753+ async def test_event_queue_blocking_behavior () -> None :
754+ _PARENT_QUEUE_SIZE = 10
755+ _TAPPED_QUEUE_SIZE = 15
756+
757+ queue = EventQueueSource (max_queue_size = _PARENT_QUEUE_SIZE )
758+ # tapped_queue initially has no consumer, so it will block.
759+ tapped_queue = await queue .tap (max_queue_size = _TAPPED_QUEUE_SIZE )
760+
761+ producer_task_done = asyncio .Event ()
762+ enqueued_count = 0
763+
764+ async def producer () -> None :
765+ nonlocal enqueued_count
766+ for i in range (50 ):
767+ event = create_sample_message (message_id = str (i ))
768+ await queue .enqueue_event (event )
769+ enqueued_count += 1
770+ producer_task_done .set ()
771+
772+ consumed_first = []
773+
774+ async def consumer_first () -> None :
775+ while True :
776+ try :
777+ event = await queue .dequeue_event ()
778+ consumed_first .append (event )
779+ queue .task_done ()
780+ except QueueShutDown :
781+ break
782+
783+ consumer_first_task = asyncio .create_task (consumer_first ())
784+ producer_task = asyncio .create_task (producer ())
785+
786+ # Wait to let the producer fill the queues and confirm it is blocked
787+ with pytest .raises (asyncio .TimeoutError ):
788+ await asyncio .wait_for (producer_task_done .wait (), timeout = 0.1 )
789+
790+ # Validate that: first consumer receives _TAPPED_QUEUE_SIZE + 1 items.
791+ # Other items are blocking trying to be enqueued to second queue.
792+ assert len (consumed_first ) == _TAPPED_QUEUE_SIZE + 1
793+
794+ # Validate that: once child queue is blocked, parent will continue
795+ # processing other items until it reaches its capacity as well.
796+ assert not producer_task .done ()
797+ assert enqueued_count == _PARENT_QUEUE_SIZE + _TAPPED_QUEUE_SIZE + 1
798+
799+ consumed_second = []
800+
801+ # create a consumer for second queue.
802+ async def consumer_second () -> None :
803+ while True :
804+ try :
805+ event = await tapped_queue .dequeue_event ()
806+ consumed_second .append (event )
807+ tapped_queue .task_done ()
808+ except QueueShutDown :
809+ break
810+
811+ consumer_second_task = asyncio .create_task (consumer_second ())
812+ await asyncio .wait_for (producer_task_done .wait (), timeout = 1.0 )
813+ await queue .close (immediate = False )
814+ await asyncio .gather (consumer_first_task , consumer_second_task )
815+
816+ # Validate that: after unblocking second consumer everything ends smoothly.
817+ assert len (consumed_first ) == 50
818+ assert len (consumed_second ) == 50
0 commit comments