Skip to content

Commit 8571a70

Browse files
ulixius9 and claude authored
fix(ingestion): release Engine resources on database switch (#27625) (#27627)
* fix(ingestion): release Engine resources on database switch (#27625) * fix(ingestion): release engine resources on database switch Close fairies in _connection_map and clear _inspector_map before engine.dispose() in CommonDbSourceService.set_inspector/close. Dispose alone does not free Inspector.info_cache or release checked-out ConnectionFairies, leaving the old engine GC-pinned across DB switches and triggering _finalize_fairy RecursionError at interpreter shutdown. Eagerly fetch multi-DB name queries (MultiDBSource._execute_database_query and SnowflakeSource.get_database_names_raw) so the cursor closes before the caller invokes set_inspector, which disposes the engine the cursor was bound to. Also rebind scoped_session to the new engine so it doesn't keep the disposed one alive via sessionmaker.bind. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * py format * fix(ingestion): address PR review feedback from gitar-bot and Copilot - Set self.engine = None after dispose in _release_engine (gitar-bot): prevents close() from leaving a dangling disposed-engine reference that would produce a confusing pool error on accidental later access. - _FakeSource now has close() and is wrapped in a fixture that cleans up its checked-out connection (Copilot #1): avoids resource warnings and an interfering fairy across test teardown. - Rewrite test_generator_survives_engine_dispose_mid_iteration as test_generator_survives_connection_close_mid_iteration (Copilot #2): Engine.dispose() does not close checked-out connections, so the old test did not reproduce what _release_engine actually does. The real regression is the explicit conn.close() on the fairy in _connection_map before dispose. The new test closes the connection mid-iteration, which is what fetchall() needs to survive. 
- Switch the query in _FakeSource.get_database_names_raw and the seeded INSERT assertions to the TEXT name column (Copilot #3): _execute_database_query is typed Iterable[str]; testing on integer ids obscured the actual contract. - Update test_disposes_pool to assert surrogate.engine is None after release (follows from the new self.engine = None behavior) and verify the original pool's checkedout() is 0. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(ingestion): keep connection_obj in sync with engine across DB switches self.connection_obj is set once in __init__ to the initial engine and never updated. After set_inspector rebuilds self.engine, connection_obj still points at the disposed original engine — pinning its dialect and compiled_cache alive for the source's lifetime. Rebind connection_obj when creating the new engine in set_inspector, and clear it in _release_engine so close() leaves nothing dangling. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d373a14 commit 8571a70

5 files changed

Lines changed: 324 additions & 10 deletions

File tree

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 30 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,6 @@
6262
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
6363
from metadata.ingestion.ometa.ometa_api import OpenMetadata
6464
from metadata.ingestion.source.connections import get_connection
65-
from metadata.ingestion.source.connections_utils import kill_active_connections
6665
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
6766
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
6867
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
@@ -152,15 +151,42 @@ def set_inspector(self, database_name: str) -> None:
152151
:param database_name: new database to set
153152
"""
154153

155-
kill_active_connections(self.engine)
154+
self._release_engine()
156155
logger.info(f"Ingesting from database: {database_name}")
157156

158157
new_service_connection = deepcopy(self.service_connection)
159158
new_service_connection.database = database_name
160159
self.engine = get_connection(new_service_connection)
160+
self.session = create_and_bind_thread_safe_session(self.engine)
161+
self.connection_obj = self.engine
161162

162-
self._connection_map = {} # Lazy init as well
163+
def _release_engine(self) -> None:
164+
# Close fairies first so _ConnectionRecord drops its pool reference;
165+
# dispose alone leaves them orphaned and causes _finalize_fairy
166+
# RecursionErrors at GC time. Clearing _inspector_map is what
167+
# actually frees Inspector.info_cache — dispose() does not.
168+
if getattr(self, "engine", None) is None:
169+
return
170+
for conn in self._connection_map.values():
171+
try:
172+
conn.close()
173+
except Exception: # pylint: disable=broad-except
174+
logger.debug("Connection already closed", exc_info=True)
175+
self._connection_map = {}
163176
self._inspector_map = {}
177+
session = getattr(self, "session", None)
178+
if session is not None:
179+
try:
180+
session.remove()
181+
except Exception: # pylint: disable=broad-except
182+
logger.debug("Session cleanup failed", exc_info=True)
183+
self.session = None
184+
try:
185+
self.engine.dispose()
186+
except Exception as exc: # pylint: disable=broad-except
187+
logger.warning(f"Failed to dispose engine: {exc}")
188+
self.engine = None
189+
self.connection_obj = None
164190

165191
def get_database_names(self) -> Iterable[str]:
166192
"""
@@ -780,14 +806,10 @@ def inspector(self) -> Inspector:
780806
return self._inspector_map[thread_id]
781807

782808
def close(self):
783-
if self.connection is not None:
784-
self.connection.close()
785-
for connection in self._connection_map.values():
786-
connection.close()
809+
self._release_engine()
787810
if hasattr(self, "ssl_manager") and self.ssl_manager:
788811
self.ssl_manager = cast(SSLManager, self.ssl_manager)
789812
self.ssl_manager.cleanup_temp_files()
790-
self.engine.dispose()
791813

792814
def fetch_table_tags(
793815
self,

ingestion/src/metadata/ingestion/source/database/multi_db_source.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,9 @@ def get_database_names_raw(self) -> Iterable[str]:
3333
"""
3434

3535
def _execute_database_query(self, query: str) -> Iterable[str]:
36-
results = self.connection.execute(text(query)) # pylint: disable=no-member
36+
results = self.connection.execute(
37+
text(query)
38+
).fetchall() # pylint: disable=no-member
3739
for res in results:
3840
row = list(res)
3941
yield row[0]

ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -402,7 +402,7 @@ def get_configured_database(self) -> Optional[str]:
402402
return self.service_connection.database
403403

404404
def get_database_names_raw(self) -> Iterable[str]:
405-
results = self.connection.execute(text(SNOWFLAKE_GET_DATABASES))
405+
results = self.connection.execute(text(SNOWFLAKE_GET_DATABASES)).fetchall()
406406
for res in results:
407407
row = list(res)
408408
yield row[1]

ingestion/tests/unit/topology/database/test_common_db_source.py

Lines changed: 244 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,15 @@
1313
Tests for CommonDbSourceService._prepare_foreign_constraints
1414
"""
1515

16+
import gc
17+
import weakref
1618
from unittest.mock import MagicMock, patch
1719

1820
import pytest
21+
from sqlalchemy import create_engine, text
22+
from sqlalchemy.engine import Engine
23+
from sqlalchemy.inspection import inspect
24+
from sqlalchemy.pool import QueuePool
1925

2026
from metadata.generated.schema.entity.data.table import (
2127
Column,
@@ -24,8 +30,10 @@
2430
Table,
2531
TableConstraint,
2632
)
33+
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
2734
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
2835
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
36+
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
2937

3038

3139
@pytest.fixture
@@ -334,3 +342,239 @@ def test_constraint_with_none_columns_skipped(self):
334342
result = DatabaseServiceSource.normalize_table_constraints(constraints, columns)
335343
assert result[0].columns is None
336344
assert result[1].columns == ["id"]
345+
346+
347+
class _ReleaseOnlySurrogate(CommonDbSourceService):
348+
"""
349+
Minimal concrete subclass that bypasses CommonDbSourceService.__init__
350+
(which needs a full workflow config) so we can drive _release_engine /
351+
close against a real SQLAlchemy engine in isolation.
352+
"""
353+
354+
def __init__(self, engine=None): # pylint: disable=super-init-not-called
355+
self.engine = engine
356+
self.connection_obj = engine
357+
self._connection_map = {}
358+
self._inspector_map = {}
359+
self.session = None
360+
self.ssl_manager = None
361+
362+
def create(self, *args, **kwargs): # satisfy abstract method contract
363+
raise NotImplementedError
364+
365+
366+
def _make_release_surrogate(engine=None):
367+
"""
368+
Build a minimal stand-in for CommonDbSourceService that has just the
369+
attributes _release_engine touches, bypassing the heavy __init__ that
370+
requires a full workflow config.
371+
"""
372+
return _ReleaseOnlySurrogate(engine=engine)
373+
374+
375+
@pytest.fixture
376+
def sqlite_engine():
377+
"""Real, in-memory SQLite engine with an explicit QueuePool."""
378+
engine = create_engine("sqlite:///:memory:", poolclass=QueuePool)
379+
yield engine
380+
try:
381+
engine.dispose()
382+
except Exception:
383+
pass
384+
385+
386+
@pytest.fixture
387+
def surrogate(sqlite_engine):
388+
"""Minimal CommonDbSourceService with a real engine attached."""
389+
return _make_release_surrogate(sqlite_engine)
390+
391+
392+
class TestReleaseEngine:
393+
"""Option B: _release_engine closes all pooled connections, clears
394+
inspector/session state, and disposes the engine regardless of
395+
which thread called it."""
396+
397+
def test_closes_every_connection_map_entry(self, surrogate):
398+
conn_a = surrogate.engine.connect()
399+
conn_b = surrogate.engine.connect()
400+
surrogate._connection_map[111] = conn_a
401+
surrogate._connection_map[222] = conn_b
402+
403+
surrogate._release_engine()
404+
405+
assert conn_a.closed is True
406+
assert conn_b.closed is True
407+
assert surrogate._connection_map == {}
408+
409+
def test_clears_inspector_map(self, surrogate):
410+
surrogate._connection_map[999] = surrogate.engine.connect()
411+
surrogate._inspector_map[999] = inspect(surrogate._connection_map[999])
412+
assert len(surrogate._inspector_map) == 1
413+
414+
surrogate._release_engine()
415+
416+
assert surrogate._inspector_map == {}
417+
418+
def test_disposes_pool_and_clears_engine_ref(self, surrogate):
419+
captured_engine = surrogate.engine
420+
original_pool = captured_engine.pool
421+
assert isinstance(original_pool, QueuePool)
422+
connection = surrogate.engine.connect()
423+
surrogate._connection_map[1] = connection
424+
425+
surrogate._release_engine()
426+
427+
assert surrogate.engine is None
428+
assert connection.closed is True
429+
assert original_pool.checkedout() == 0
430+
431+
def test_removes_session(self, surrogate):
432+
surrogate.session = create_and_bind_thread_safe_session(surrogate.engine)
433+
assert surrogate.session is not None
434+
435+
surrogate._release_engine()
436+
437+
assert surrogate.session is None
438+
439+
def test_idempotent_when_engine_is_none(self):
440+
surrogate = _make_release_surrogate(engine=None)
441+
surrogate._release_engine()
442+
assert surrogate.engine is None
443+
assert surrogate._connection_map == {}
444+
assert surrogate._inspector_map == {}
445+
446+
def test_tolerates_already_closed_connection(self, surrogate):
447+
healthy = surrogate.engine.connect()
448+
already_closed = surrogate.engine.connect()
449+
already_closed.close()
450+
surrogate._connection_map[1] = healthy
451+
surrogate._connection_map[2] = already_closed
452+
453+
surrogate._release_engine()
454+
455+
assert healthy.closed is True
456+
assert surrogate._connection_map == {}
457+
458+
def test_clears_connection_obj_alongside_engine(self, surrogate):
459+
# connection_obj is set in __init__ to the initial engine and used by
460+
# test_connection(); without clearing it on release, it pins the
461+
# original Engine alive for the source's lifetime even after dispose.
462+
assert surrogate.connection_obj is surrogate.engine
463+
464+
surrogate._release_engine()
465+
466+
assert surrogate.connection_obj is None
467+
468+
def test_closes_connections_from_arbitrary_thread_ids(self, surrogate):
469+
"""Key property of Option B: close-all, not detach-current-thread.
470+
Every fairy in _connection_map must close regardless of the caller's
471+
thread id."""
472+
conns = {
473+
111: surrogate.engine.connect(),
474+
222: surrogate.engine.connect(),
475+
333: surrogate.engine.connect(),
476+
}
477+
surrogate._connection_map.update(conns)
478+
479+
surrogate._release_engine()
480+
481+
for conn in conns.values():
482+
assert conn.closed is True
483+
assert surrogate._connection_map == {}
484+
485+
486+
class TestEngineGcReclamation:
487+
"""Acceptance test for the memory leak fix: after _release_engine and
488+
dropping the strong reference, the old Engine must be garbage-collectable.
489+
The previous kill_active_connections path left _ConnectionRecord fairies
490+
pinning the engine, which is what this test guards against."""
491+
492+
def test_old_engine_becomes_gc_eligible_after_release(self):
493+
engine = create_engine("sqlite:///:memory:", poolclass=QueuePool)
494+
surrogate = _make_release_surrogate(engine)
495+
surrogate._connection_map[12345] = surrogate.engine.connect()
496+
497+
old_engine_ref = weakref.ref(surrogate.engine)
498+
499+
surrogate._release_engine()
500+
surrogate.engine = None
501+
engine = None # drop local strong ref too
502+
503+
gc.collect()
504+
505+
assert old_engine_ref() is None
506+
507+
508+
class _FakeSource(MultiDBSource):
509+
"""Minimal MultiDBSource that exposes a real SQLAlchemy connection so we
510+
can exercise _execute_database_query against a live cursor."""
511+
512+
def __init__(self, engine: Engine):
513+
self._engine = engine
514+
self._conn = engine.connect()
515+
516+
@property
517+
def connection(self):
518+
return self._conn
519+
520+
def close(self):
521+
try:
522+
self._conn.close()
523+
except Exception:
524+
pass
525+
526+
def get_configured_database(self):
527+
return None
528+
529+
def get_database_names_raw(self):
530+
return self._execute_database_query("SELECT name FROM dbs ORDER BY id")
531+
532+
533+
class TestExecuteDatabaseQueryEagerFetch:
534+
"""Option B Part 2: _execute_database_query must eagerly .fetchall()
535+
so that _release_engine closing the connection in _connection_map
536+
(the original regression pattern from set_inspector) does not
537+
invalidate the cursor the generator is iterating."""
538+
539+
@pytest.fixture
540+
def seeded_engine(self):
541+
engine = create_engine("sqlite:///:memory:", poolclass=QueuePool)
542+
with engine.connect() as conn:
543+
conn.execute(text("CREATE TABLE dbs (id INTEGER PRIMARY KEY, name TEXT)"))
544+
conn.execute(
545+
text(
546+
"INSERT INTO dbs(id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')"
547+
)
548+
)
549+
conn.commit()
550+
yield engine
551+
try:
552+
engine.dispose()
553+
except Exception:
554+
pass
555+
556+
@pytest.fixture
557+
def fake_source(self, seeded_engine):
558+
source = _FakeSource(seeded_engine)
559+
yield source
560+
source.close()
561+
562+
def test_generator_survives_connection_close_mid_iteration(self, fake_source):
563+
# Simulates what _release_engine actually does: it close()s every
564+
# connection in _connection_map BEFORE disposing the engine. Without
565+
# .fetchall() the cursor would die at that close() and the next
566+
# yield would raise; with .fetchall() the rows are already buffered.
567+
generator = fake_source.get_database_names_raw()
568+
569+
first = next(generator)
570+
assert first == "alpha"
571+
572+
fake_source._conn.close()
573+
574+
remaining = list(generator)
575+
assert remaining == ["beta", "gamma"]
576+
577+
def test_returns_all_rows_in_order(self, fake_source):
578+
results = list(fake_source.get_database_names_raw())
579+
580+
assert results == ["alpha", "beta", "gamma"]

0 commit comments

Comments (0)