|
1 | | -from datetime import datetime |
2 | | -from unittest.mock import MagicMock, Mock, patch |
| 1 | +from datetime import datetime, timedelta |
| 2 | +from unittest.mock import MagicMock, Mock, create_autospec, patch |
3 | 3 |
|
4 | 4 | import pytest |
| 5 | +from sqlalchemy.engine.result import IteratorResult, SimpleResultMetaData |
| 6 | +from sqlalchemy.orm import Session |
5 | 7 |
|
6 | 8 | from metadata.generated.schema.entity.data.table import ( |
7 | 9 | DmlOperationType, |
|
13 | 15 | ) |
14 | 16 | from metadata.ingestion.source.database.snowflake.models import ( |
15 | 17 | SnowflakeDynamicTableRefreshEntry, |
| 18 | + SnowflakeQueryLogEntry, |
16 | 19 | ) |
17 | 20 | from metadata.profiler.metrics.system.snowflake.system import ( |
18 | 21 | PUBLIC_SCHEMA, |
@@ -372,3 +375,112 @@ def test_get_dynamic_table_filters_by_table_name( |
372 | 375 |
|
373 | 376 | assert len(inserts) == 1 |
374 | 377 | assert inserts[0].rowsAffected == 10 |
| 378 | + |
| 379 | + |
| 380 | +def test_it_turns_sql_alchemy_response_to_snowflake_query_log_entries() -> None: |
| 381 | + start_time = datetime.now() |
| 382 | + |
| 383 | + session = create_autospec(Session, instance=True) |
| 384 | + |
| 385 | + # Set up test data |
| 386 | + row_metadata = SimpleResultMetaData( |
| 387 | + [ |
| 388 | + "query_id", |
| 389 | + "query_text", |
| 390 | + "query_type", |
| 391 | + "start_time", |
| 392 | + "database_name", |
| 393 | + "schema_name", |
| 394 | + "rows_inserted", |
| 395 | + "rows_updated", |
| 396 | + "rows_deleted", |
| 397 | + ] |
| 398 | + ) |
| 399 | + result = IteratorResult( |
| 400 | + row_metadata, |
| 401 | + iter( |
| 402 | + [ |
| 403 | + ( |
| 404 | + "1", |
| 405 | + "INSERT INTO Foo (c, b) VALUES (1, 2), (2, 3)", |
| 406 | + "INSERT", |
| 407 | + start_time, |
| 408 | + "TEST", |
| 409 | + "TEST_SCHEMA", |
| 410 | + 2, |
| 411 | + 0, |
| 412 | + 0, |
| 413 | + ), |
| 414 | + ( |
| 415 | + "2", |
| 416 | + "DELETE FROM Foo WHERE c = 1", |
| 417 | + "DELETE", |
| 418 | + start_time + timedelta(hours=1), |
| 419 | + "TEST", |
| 420 | + "TEST_SCHEMA", |
| 421 | + 0, |
| 422 | + 0, |
| 423 | + 1, |
| 424 | + ), |
| 425 | + ( |
| 426 | + "3", |
| 427 | + "UPDATE Foo SET b = 5", |
| 428 | + "UPDATE", |
| 429 | + start_time + timedelta(hours=2), |
| 430 | + "TEST", |
| 431 | + "TEST_SCHEMA", |
| 432 | + 0, |
| 433 | + 1, |
| 434 | + 0, |
| 435 | + ), |
| 436 | + ] |
| 437 | + ), |
| 438 | + ) |
| 439 | + session.execute.return_value = result |
| 440 | + |
| 441 | + # Mock connection |
| 442 | + snowflake_connection = SnowflakeConnection.model_construct( |
| 443 | + accountUsageSchema="SNOWFLAKE.ACCOUNT_USAGE" |
| 444 | + ) |
| 445 | + |
| 446 | + queries = SnowflakeQueryLogEntry.get_for_table( |
| 447 | + session=session, |
| 448 | + tablename="Foo", |
| 449 | + service_connection_config=snowflake_connection, |
| 450 | + ) |
| 451 | + |
| 452 | + assert queries == [ |
| 453 | + SnowflakeQueryLogEntry( |
| 454 | + query_id="1", |
| 455 | + query_text="INSERT INTO Foo (c, b) VALUES (1, 2), (2, 3)", |
| 456 | + query_type="INSERT", |
| 457 | + start_time=start_time, |
| 458 | + database_name="TEST", |
| 459 | + schema_name="TEST_SCHEMA", |
| 460 | + rows_inserted=2, |
| 461 | + rows_updated=0, |
| 462 | + rows_deleted=0, |
| 463 | + ), |
| 464 | + SnowflakeQueryLogEntry( |
| 465 | + query_id="2", |
| 466 | + query_text="DELETE FROM Foo WHERE c = 1", |
| 467 | + query_type="DELETE", |
| 468 | + start_time=start_time + timedelta(hours=1), |
| 469 | + database_name="TEST", |
| 470 | + schema_name="TEST_SCHEMA", |
| 471 | + rows_inserted=0, |
| 472 | + rows_updated=0, |
| 473 | + rows_deleted=1, |
| 474 | + ), |
| 475 | + SnowflakeQueryLogEntry( |
| 476 | + query_id="3", |
| 477 | + query_text="UPDATE Foo SET b = 5", |
| 478 | + query_type="UPDATE", |
| 479 | + start_time=start_time + timedelta(hours=2), |
| 480 | + database_name="TEST", |
| 481 | + schema_name="TEST_SCHEMA", |
| 482 | + rows_inserted=0, |
| 483 | + rows_updated=1, |
| 484 | + rows_deleted=0, |
| 485 | + ), |
| 486 | + ] |
0 commit comments