Skip to content

Commit ed3bd1e

Browse files
ulixius9Copilotclaude
committed
Fix: Improve ingestion logging for streaming logs (#27213)
* Fix: Improve ingestion logging for streaming logs * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * test: verify print_status runs before stop in execute() teardown Covers the ordering fix and finally-guarantee: print_status must run before stop (so records are flushed while resources are alive), and stop must still run when print_status raises. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9df1fc6 commit ed3bd1e

2 files changed

Lines changed: 47 additions & 2 deletions

File tree

ingestion/src/metadata/workflow/base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,10 @@ def execute(self) -> None:
281281
finally:
282282
ingestion_status = self.build_ingestion_status()
283283
self.set_ingestion_pipeline_status(pipeline_state, ingestion_status)
284-
self.stop()
285-
self.print_status()
284+
try:
285+
self.print_status()
286+
finally:
287+
self.stop()
286288

287289
@property
288290
def run_id(self) -> str:

ingestion/tests/unit/workflow/test_base_workflow.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"""
1414
from typing import Iterable, Tuple
1515
from unittest import TestCase
16+
from unittest.mock import MagicMock, patch
1617

1718
import pytest
1819

@@ -200,3 +201,45 @@ def test_workflow_config_supports_ingestion_runner_name(self):
200201
)
201202

202203
self.assertEqual(workflow_config.ingestionRunnerName, "test-runner")
204+
205+
206+
class TestWorkflowExecuteTeardown:
207+
"""
208+
Validates the execute() teardown contract: status must be printed before
209+
stop() tears down resources (so final records are flushed while the
210+
metadata client and steps are alive), and stop() must still run when
211+
print_status() raises so we never leak the timer thread or OM client.
212+
"""
213+
214+
def test_print_status_runs_before_stop(self):
215+
workflow = SimpleWorkflow(config=config)
216+
manager = MagicMock()
217+
218+
with patch.object(
219+
workflow, "print_status", wraps=workflow.print_status
220+
) as mock_print_status, patch.object(
221+
workflow, "stop", wraps=workflow.stop
222+
) as mock_stop:
223+
manager.attach_mock(mock_print_status, "print_status")
224+
manager.attach_mock(mock_stop, "stop")
225+
226+
workflow.execute()
227+
228+
ordered_names = [mock_call[0] for mock_call in manager.mock_calls]
229+
assert ordered_names == ["print_status", "stop"]
230+
231+
def test_stop_still_runs_when_print_status_raises(self):
232+
workflow = SimpleWorkflow(config=config)
233+
234+
with patch.object(
235+
workflow,
236+
"print_status",
237+
side_effect=RuntimeError("boom"),
238+
) as mock_print_status, patch.object(
239+
workflow, "stop", wraps=workflow.stop
240+
) as mock_stop:
241+
with pytest.raises(RuntimeError, match="boom"):
242+
workflow.execute()
243+
244+
mock_print_status.assert_called_once()
245+
mock_stop.assert_called_once()

0 commit comments

Comments
 (0)