@@ -22,6 +22,36 @@ def is_closed(self) -> bool:
2222 return self ._closed
2323
2424
25+ class FakeQueueWithException :
26+ def __init__ (self , exception ):
27+ self .exception = exception
28+
29+ async def dequeue_event (self , no_wait : bool = False ):
30+ raise self .exception
31+
32+ def is_closed (self ) -> bool :
33+ return False
34+
35+
36+ class FakeQueueWithDelay :
37+ def __init__ (self , items , delay = 0.1 ):
38+ self ._items = list (items )
39+ self .delay = delay
40+ self ._closed = False
41+
42+ async def dequeue_event (self , no_wait : bool = False ):
43+ if no_wait and not self ._items :
44+ raise asyncio .QueueEmpty
45+ if self .delay > 0 :
46+ await asyncio .sleep (self .delay )
47+ if not self ._items :
48+ raise asyncio .QueueEmpty
49+ return self ._items .pop (0 )
50+
51+ def is_closed (self ) -> bool :
52+ return self ._closed
53+
54+
2555@pytest .mark .asyncio
2656async def test_consume_one_uses_no_wait ():
2757 q = FakeQueue ([])
@@ -44,3 +74,99 @@ async def test_consume_all_yields_until_closed():
4474 with pytest .raises (StopAsyncIteration ):
4575 await anext (it )
4676 assert results == [1 , 2 ]
77+
78+
79+ @pytest .mark .asyncio
80+ async def test_consume_one_with_item ():
81+ q = FakeQueue ([42 ])
82+ consumer = RedisEventConsumer (q )
83+ result = await consumer .consume_one ()
84+ assert result == 42
85+
86+
87+ @pytest .mark .asyncio
88+ async def test_consume_all_with_empty_queue ():
89+ q = FakeQueue ([])
90+ consumer = RedisEventConsumer (q )
91+ it = consumer .consume_all ()
92+ # mark closed immediately
93+ q ._closed = True
94+ with pytest .raises (StopAsyncIteration ):
95+ await anext (it )
96+
97+
98+ @pytest .mark .asyncio
99+ async def test_consume_all_with_exception_in_dequeue ():
100+ q = FakeQueueWithException (RuntimeError ('Test error' ))
101+ consumer = RedisEventConsumer (q )
102+ it = consumer .consume_all ()
103+ with pytest .raises (RuntimeError , match = 'Test error' ):
104+ await anext (it )
105+
106+
107+ @pytest .mark .asyncio
108+ async def test_consume_one_with_exception_in_dequeue ():
109+ q = FakeQueueWithException (ValueError ('Test error' ))
110+ consumer = RedisEventConsumer (q )
111+ with pytest .raises (ValueError , match = 'Test error' ):
112+ await consumer .consume_one ()
113+
114+
115+ @pytest .mark .asyncio
116+ async def test_consume_all_handles_queue_empty_then_closed ():
117+ q = FakeQueue ([])
118+ consumer = RedisEventConsumer (q )
119+ it = consumer .consume_all ()
120+ # First iteration should raise QueueEmpty but continue since not closed
121+ # Mark closed during the exception handling
122+ q ._closed = True
123+ with pytest .raises (StopAsyncIteration ):
124+ await anext (it )
125+
126+
127+ @pytest .mark .asyncio
128+ async def test_consume_all_with_delay ():
129+ q = FakeQueueWithDelay ([1 , 2 , 3 ], delay = 0.01 )
130+ consumer = RedisEventConsumer (q )
131+ it = consumer .consume_all ()
132+ results = []
133+ async for item in it :
134+ results .append (item )
135+ if len (results ) >= 3 :
136+ q ._closed = True
137+ break
138+ assert results == [1 , 2 , 3 ]
139+
140+
141+ @pytest .mark .asyncio
142+ async def test_consumer_initialization ():
143+ q = FakeQueue ([1 ])
144+ consumer = RedisEventConsumer (q )
145+ assert consumer ._queue is q
146+
147+
148+ @pytest .mark .asyncio
149+ async def test_consume_all_stops_when_closed_during_iteration ():
150+ q = FakeQueue ([1 , 2 , 3 , 4 , 5 ])
151+ consumer = RedisEventConsumer (q )
152+ it = consumer .consume_all ()
153+ results = []
154+ # Consume a few items
155+ results .append (await anext (it ))
156+ results .append (await anext (it ))
157+ # Mark closed during iteration
158+ q ._closed = True
159+ # Next iteration should stop
160+ with pytest .raises (StopAsyncIteration ):
161+ await anext (it )
162+ assert results == [1 , 2 ]
163+
164+
165+ @pytest .mark .asyncio
166+ async def test_consume_one_no_wait_false ():
167+ """Test that consume_one always uses no_wait=True regardless of parameter."""
168+ q = FakeQueue ([])
169+ consumer = RedisEventConsumer (q )
170+ # Even though dequeue_event might support no_wait=False, consume_one should always use True
171+ with pytest .raises (asyncio .QueueEmpty ):
172+ await consumer .consume_one ()
0 commit comments