diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index 7dec26ef0492..51726e17b5bf 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -157,7 +157,7 @@ def get_for_table( ) ) return TypeAdapter(List[SnowflakeQueryLogEntry]).validate_python( - [ExtendedDict(r).lower_case_keys() for r in rows] + [ExtendedDict(r._asdict()).lower_case_keys() for r in rows] ) diff --git a/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py b/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py index a9ef6928cf4b..3d2a5e637a85 100644 --- a/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py +++ b/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py @@ -1,7 +1,9 @@ -from datetime import datetime -from unittest.mock import MagicMock, Mock, patch +from datetime import datetime, timedelta +from unittest.mock import MagicMock, Mock, create_autospec, patch import pytest +from sqlalchemy.engine.result import IteratorResult, SimpleResultMetaData +from sqlalchemy.orm import Session from metadata.generated.schema.entity.data.table import ( DmlOperationType, @@ -13,6 +15,7 @@ ) from metadata.ingestion.source.database.snowflake.models import ( SnowflakeDynamicTableRefreshEntry, + SnowflakeQueryLogEntry, ) from metadata.profiler.metrics.system.snowflake.system import ( PUBLIC_SCHEMA, @@ -372,3 +375,112 @@ def test_get_dynamic_table_filters_by_table_name( assert len(inserts) == 1 assert inserts[0].rowsAffected == 10 + + +def test_it_turns_sql_alchemy_response_to_snowflake_query_log_entries() -> None: + start_time = datetime.now() + + session = create_autospec(Session, instance=True) + + # Set up test data + row_metadata = SimpleResultMetaData( + [ + "query_id", + "query_text", + "query_type", + "start_time", + "database_name", + "schema_name", + "rows_inserted", + "rows_updated", + "rows_deleted", + ] + ) + result = IteratorResult( + row_metadata, + iter( + [ + ( + "1", + "INSERT INTO Foo (c, b) VALUES (1, 2), (2, 3)", + "INSERT", + start_time, + "TEST", + "TEST_SCHEMA", + 2, + 0, + 0, + ), + ( + "2", + "DELETE FROM Foo WHERE c = 1", + "DELETE", + start_time + timedelta(hours=1), + "TEST", + "TEST_SCHEMA", + 0, + 0, + 1, + ), + ( + "3", + "UPDATE Foo SET b = 5", + "UPDATE", + start_time + timedelta(hours=2), + "TEST", + "TEST_SCHEMA", + 0, + 1, + 0, + ), + ] + ), + ) + session.execute.return_value = result + + # Mock connection + snowflake_connection = SnowflakeConnection.model_construct( + accountUsageSchema="SNOWFLAKE.ACCOUNT_USAGE" + ) + + queries = SnowflakeQueryLogEntry.get_for_table( + session=session, + tablename="Foo", + service_connection_config=snowflake_connection, + ) + + assert queries == [ + SnowflakeQueryLogEntry( + query_id="1", + query_text="INSERT INTO Foo (c, b) VALUES (1, 2), (2, 3)", + query_type="INSERT", + start_time=start_time, + database_name="TEST", + schema_name="TEST_SCHEMA", + rows_inserted=2, + rows_updated=0, + rows_deleted=0, + ), + SnowflakeQueryLogEntry( + query_id="2", + query_text="DELETE FROM Foo WHERE c = 1", + query_type="DELETE", + start_time=start_time + timedelta(hours=1), + database_name="TEST", + schema_name="TEST_SCHEMA", + rows_inserted=0, + rows_updated=0, + rows_deleted=1, + ), + SnowflakeQueryLogEntry( + query_id="3", + query_text="UPDATE Foo SET b = 5", + query_type="UPDATE", + start_time=start_time + timedelta(hours=2), + database_name="TEST", + schema_name="TEST_SCHEMA", + rows_inserted=0, + rows_updated=1, + rows_deleted=0, + ), + ]