
Commit e043fa9

ulixius9 and claude committed
fix(ingestion): release Engine resources on database switch (#27625) (#27627)
fix(ingestion): release Engine resources on database switch (#27625) (#27627)

* fix(ingestion): release engine resources on database switch

  Close fairies in _connection_map and clear _inspector_map before
  engine.dispose() in CommonDbSourceService.set_inspector/close. Dispose
  alone does not free Inspector.info_cache or release checked-out
  ConnectionFairies, leaving the old engine GC-pinned across DB switches
  and triggering _finalize_fairy RecursionError at interpreter shutdown.

  Eagerly fetch multi-DB name queries (MultiDBSource._execute_database_query
  and SnowflakeSource.get_database_names_raw) so the cursor closes before
  the caller invokes set_inspector, which disposes the engine the cursor was
  bound to. Also rebind scoped_session to the new engine so it doesn't keep
  the disposed one alive via sessionmaker.bind.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* py format

* fix(ingestion): address PR review feedback from gitar-bot and Copilot

  - Set self.engine = None after dispose in _release_engine (gitar-bot):
    prevents close() from leaving a dangling disposed-engine reference that
    would produce a confusing pool error on accidental later access.
  - _FakeSource now has close() and is wrapped in a fixture that cleans up
    its checked-out connection (Copilot #1): avoids resource warnings and
    an interfering fairy across test teardown.
  - Rewrite test_generator_survives_engine_dispose_mid_iteration as
    test_generator_survives_connection_close_mid_iteration (Copilot #2):
    Engine.dispose() does not close checked-out connections, so the old
    test did not reproduce what _release_engine actually does. The real
    regression is the explicit conn.close() on the fairy in _connection_map
    before dispose. The new test closes the connection mid-iteration, which
    is what fetchall() needs to survive.
  - Switch the query in _FakeSource.get_database_names_raw and the seeded
    INSERT assertions to the TEXT name column (Copilot #3):
    _execute_database_query is typed Iterable[str]; testing on integer ids
    obscured the actual contract.
  - Update test_disposes_pool to assert surrogate.engine is None after
    release (follows from the new self.engine = None behavior) and verify
    the original pool's checkedout() is 0.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(ingestion): keep connection_obj in sync with engine across DB switches

  self.connection_obj is set once in __init__ to the initial engine and
  never updated. After set_inspector rebuilds self.engine, connection_obj
  still points at the disposed original engine, pinning its dialect and
  compiled_cache alive for the source's lifetime. Rebind connection_obj
  when creating the new engine in set_inspector, and clear it in
  _release_engine so close() leaves nothing dangling.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
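The eager-fetch part of this fix can be illustrated without SQLAlchemy: a generator that streams rows from a live cursor dies if the connection is closed between yields, while a generator that calls fetchall() first is immune. This is a minimal sketch using the stdlib sqlite3 module (the table and names here are illustrative, not the project's schema):

```python
import sqlite3

def lazy_names(conn):
    # Lazy: rows stream from a live cursor; closing the connection
    # mid-iteration kills the generator.
    for row in conn.execute("SELECT name FROM dbs"):
        yield row[0]

def eager_names(conn):
    # Eager: fetchall() drains the cursor up front, so the generator
    # no longer depends on the connection once iteration starts.
    rows = conn.execute("SELECT name FROM dbs").fetchall()
    for row in rows:
        yield row[0]

def seeded():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dbs (name TEXT)")
    conn.executemany("INSERT INTO dbs VALUES (?)", [("alpha",), ("beta",)])
    return conn

# The eager generator survives a connection close mid-iteration.
conn = seeded()
gen = eager_names(conn)
first = next(gen)  # caller consumes one name...
conn.close()       # ...then switches databases, releasing the old connection
rest = list(gen)   # still completes from the in-memory rows

# The lazy generator does not survive.
conn = seeded()
gen = lazy_names(conn)
next(gen)
conn.close()
try:
    list(gen)
    lazy_survived = True
except sqlite3.ProgrammingError:
    lazy_survived = False
```

This mirrors why _execute_database_query and get_database_names_raw now call .fetchall(): the caller disposes the underlying engine (via set_inspector) while the generator is still being iterated.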
1 parent 3df57da commit e043fa9

5 files changed

Lines changed: 659 additions & 10 deletions

File tree

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 30 additions & 8 deletions
@@ -62,7 +62,6 @@
 from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.connections import get_connection
-from metadata.ingestion.source.connections_utils import kill_active_connections
 from metadata.ingestion.source.database.database_service import DatabaseServiceSource
 from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
 from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
@@ -152,15 +151,42 @@ def set_inspector(self, database_name: str) -> None:
         :param database_name: new database to set
         """
-        kill_active_connections(self.engine)
+        self._release_engine()
         logger.info(f"Ingesting from database: {database_name}")

         new_service_connection = deepcopy(self.service_connection)
         new_service_connection.database = database_name
         self.engine = get_connection(new_service_connection)
+        self.session = create_and_bind_thread_safe_session(self.engine)
+        self.connection_obj = self.engine

-        self._connection_map = {}  # Lazy init as well
+    def _release_engine(self) -> None:
+        # Close fairies first so _ConnectionRecord drops its pool reference;
+        # dispose alone leaves them orphaned and causes _finalize_fairy
+        # RecursionErrors at GC time. Clearing _inspector_map is what
+        # actually frees Inspector.info_cache — dispose() does not.
+        if getattr(self, "engine", None) is None:
+            return
+        for conn in self._connection_map.values():
+            try:
+                conn.close()
+            except Exception:  # pylint: disable=broad-except
+                logger.debug("Connection already closed", exc_info=True)
+        self._connection_map = {}
         self._inspector_map = {}
+        session = getattr(self, "session", None)
+        if session is not None:
+            try:
+                session.remove()
+            except Exception:  # pylint: disable=broad-except
+                logger.debug("Session cleanup failed", exc_info=True)
+        self.session = None
+        try:
+            self.engine.dispose()
+        except Exception as exc:  # pylint: disable=broad-except
+            logger.warning(f"Failed to dispose engine: {exc}")
+        self.engine = None
+        self.connection_obj = None

     def get_database_names(self) -> Iterable[str]:
         """
@@ -781,14 +807,10 @@ def inspector(self) -> Inspector:
         return self._inspector_map[thread_id]

     def close(self):
-        if self.connection is not None:
-            self.connection.close()
-        for connection in self._connection_map.values():
-            connection.close()
+        self._release_engine()
         if hasattr(self, "ssl_manager") and self.ssl_manager:
             self.ssl_manager = cast(SSLManager, self.ssl_manager)
             self.ssl_manager.cleanup_temp_files()
-        self.engine.dispose()

     def fetch_table_tags(
         self,
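The GC-pinning claim behind _release_engine can be demonstrated in isolation: disposing an engine-like object and dropping your own reference still leaves it alive as long as a cache dict points at it; only clearing the cache frees it. The sketch below uses an illustrative FakeEngine and inspector_map, not OpenMetadata or SQLAlchemy code:

```python
import gc
import weakref

class FakeEngine:
    # Stand-in for a SQLAlchemy Engine; dispose() mimics Engine.dispose(),
    # which closes the pool but does not remove external references to
    # the engine object itself.
    def dispose(self):
        self.disposed = True

engine = FakeEngine()
# Per-thread cache pinning the engine, analogous to _inspector_map holding
# Inspector objects whose info_cache references the engine.
inspector_map = {"thread-1": engine}

ref = weakref.ref(engine)
engine.dispose()  # dispose alone...
engine = None     # ...plus dropping our own reference...
gc.collect()
pinned = ref() is not None  # ...still leaves the cache keeping it alive

inspector_map.clear()  # what _release_engine adds: clear the caches too
gc.collect()
freed = ref() is None
```

This is why the fix clears _connection_map and _inspector_map (and nulls session, engine, and connection_obj) rather than relying on engine.dispose() alone.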

ingestion/src/metadata/ingestion/source/database/multi_db_source.py

Lines changed: 2 additions & 1 deletion
@@ -31,7 +31,8 @@ def get_database_names_raw(self) -> Iterable[str]:
         """

     def _execute_database_query(self, query: str) -> Iterable[str]:
-        results = self.connection.execute(query)  # pylint: disable=no-member
+        results = self.connection.execute(query).fetchall()  # pylint: disable=no-member
+
         for res in results:
             row = list(res)
             yield row[0]

ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py

Lines changed: 1 addition & 1 deletion
@@ -358,7 +358,7 @@ def get_configured_database(self) -> Optional[str]:
         return self.service_connection.database

     def get_database_names_raw(self) -> Iterable[str]:
-        results = self.connection.execute(SNOWFLAKE_GET_DATABASES)
+        results = self.connection.execute(SNOWFLAKE_GET_DATABASES).fetchall()
         for res in results:
             row = list(res)
             yield row[1]
