@@ -228,12 +228,14 @@ async def mock_consume_generator():
228228 (
229229 result ,
230230 interrupted ,
231+ bg_task ,
231232 ) = await self .aggregator .consume_and_break_on_interrupt (
232233 self .mock_event_consumer
233234 )
234235
235236 self .assertEqual (result , sample_message )
236237 self .assertFalse (interrupted )
238+ self .assertIsNone (bg_task )
237239 self .mock_task_manager .process .assert_not_called () # Process is not called for the Message if returned directly
238240 # _continue_consuming should not be called if it's a message interrupt
239241 # and no auth_required state.
@@ -265,12 +267,14 @@ async def mock_consume_generator():
265267 (
266268 result ,
267269 interrupted ,
270+ bg_task ,
268271 ) = await self .aggregator .consume_and_break_on_interrupt (
269272 self .mock_event_consumer
270273 )
271274
272275 self .assertEqual (result , auth_task )
273276 self .assertTrue (interrupted )
277+ self .assertIsNotNone (bg_task )
274278 self .mock_task_manager .process .assert_called_once_with (auth_task )
275279 mock_create_task .assert_called_once () # Check that create_task was called
276280 # self.aggregator._continue_consuming is an AsyncMock.
@@ -317,12 +321,14 @@ async def mock_consume_generator():
317321 (
318322 result ,
319323 interrupted ,
324+ bg_task ,
320325 ) = await self .aggregator .consume_and_break_on_interrupt (
321326 self .mock_event_consumer
322327 )
323328
324329 self .assertEqual (result , current_task_state_after_update )
325330 self .assertTrue (interrupted )
331+ self .assertIsNotNone (bg_task )
326332 self .mock_task_manager .process .assert_called_once_with (
327333 auth_status_update
328334 )
@@ -353,13 +359,15 @@ async def mock_consume_generator():
353359 (
354360 result ,
355361 interrupted ,
362+ bg_task ,
356363 ) = await self .aggregator .consume_and_break_on_interrupt (
357364 self .mock_event_consumer
358365 )
359366
360367 # If the first event is a Message, it's returned directly.
361368 self .assertEqual (result , event1 )
362369 self .assertFalse (interrupted )
370+ self .assertIsNone (bg_task )
363371 # process() is NOT called for the Message if it's the one causing the return
364372 self .mock_task_manager .process .assert_not_called ()
365373 self .mock_task_manager .get_task .assert_not_called ()
@@ -415,12 +423,14 @@ async def mock_consume_generator():
415423 (
416424 result ,
417425 interrupted ,
426+ bg_task ,
418427 ) = await self .aggregator .consume_and_break_on_interrupt (
419428 self .mock_event_consumer , blocking = False
420429 )
421430
422431 self .assertEqual (result , first_event )
423432 self .assertTrue (interrupted )
433+ self .assertIsNotNone (bg_task )
424434 self .mock_task_manager .process .assert_called_once_with (first_event )
425435 mock_create_task .assert_called_once ()
426436 # The background task should be created with the remaining stream
@@ -468,7 +478,7 @@ async def initial_consume_generator():
468478 mock_create_task .side_effect = lambda coro : asyncio .ensure_future (coro )
469479
470480 # Call the main method that triggers _continue_consuming via create_task
471- _ , _ = await self .aggregator .consume_and_break_on_interrupt (
481+ _ , _ , _ = await self .aggregator .consume_and_break_on_interrupt (
472482 self .mock_event_consumer
473483 )
474484
0 commit comments