Skip to content

Commit d04f8a2

Browse files
Fix usage Entity Already Exists (#24882)
1 parent 911f647 commit d04f8a2

3 files changed

Lines changed: 200 additions & 9 deletions

File tree

ingestion/src/metadata/ingestion/bulksink/metadata_usage.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -283,16 +283,21 @@ def get_table_usage_and_joins(
283283
table_entity=table_entity, table_usage=table_usage
284284
)
285285
except APIError as err:
286-
error = f"Failed to update query join for {table_usage}: {err}"
287-
logger.debug(traceback.format_exc())
288-
logger.warning(error)
289-
self.status.failed(
290-
StackTraceError(
291-
name=table_usage.table,
292-
error=error,
293-
stackTrace=traceback.format_exc(),
286+
if err.status_code == 409:
287+
logger.warning(
288+
f"Entity already exists for {table_usage.table}, skipping: {err}"
289+
)
290+
else:
291+
error = f"Failed to update query join for {table_usage}: {err}"
292+
logger.debug(traceback.format_exc())
293+
logger.warning(error)
294+
self.status.failed(
295+
StackTraceError(
296+
name=table_usage.table,
297+
error=error,
298+
stackTrace=traceback.format_exc(),
299+
)
294300
)
295-
)
296301
except Exception as exc:
297302
name = table_entity.name.root
298303
error = (
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
"""
12+
Unit tests for MetadataUsageBulkSink error handling
13+
"""
14+
from unittest import TestCase
15+
from unittest.mock import MagicMock
16+
from uuid import uuid4
17+
18+
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
19+
from metadata.generated.schema.type.basic import (
20+
FullyQualifiedEntityName,
21+
SqlQuery,
22+
Timestamp,
23+
)
24+
from metadata.generated.schema.type.tableUsageCount import TableUsageCount
25+
from metadata.ingestion.bulksink.metadata_usage import (
26+
MetadataUsageBulkSink,
27+
MetadataUsageSinkConfig,
28+
)
29+
from metadata.ingestion.ometa.client import APIError
30+
31+
32+
def create_api_error(status_code: int, message: str) -> APIError:
    """Build an APIError carrying the given HTTP status code and message.

    Args:
        status_code: Used both as the ``code`` entry of the error payload and
            as ``response.status_code`` on the mocked HTTP error.
        message: Value of the ``message`` entry of the error payload.

    Returns:
        An ``APIError`` built from the payload and the mocked HTTP error.
    """
    # Stand-in for the underlying HTTP exception; only `response.status_code`
    # is populated.
    # NOTE(review): presumably `APIError.status_code` is derived from this
    # attribute -- confirm against the client implementation.
    http_error = MagicMock()
    http_error.response.status_code = status_code
    error_payload = {"code": status_code, "message": message}
    return APIError(error_payload, http_error=http_error)
37+
38+
39+
def create_mock_table(name: str = "test_table") -> MagicMock:
    """Create a minimal mock Table entity.

    Args:
        name: Table name; also used as the last segment of the fully
            qualified name.

    Returns:
        A ``MagicMock`` whose ``id.root`` is a fresh random UUID, whose
        ``name.root`` is *name*, and whose ``fullyQualifiedName.root`` is
        ``"service.db.schema.<name>"``.
    """
    table = MagicMock()
    table.id.root = uuid4()
    table.name.root = name
    table.fullyQualifiedName.root = f"service.db.schema.{name}"
    return table
46+
47+
48+
def create_table_usage_with_queries() -> TableUsageCount:
    """Create a TableUsageCount that carries one SQL query.

    Returns:
        A ``TableUsageCount`` for ``test_table`` in ``test_db.test_schema`` of
        ``test_service``, with ``count=1``, no joins, and a single
        ``CreateQueryRequest`` in ``sqlQueries``.
    """
    query_request = CreateQueryRequest(
        query=SqlQuery("SELECT * FROM test_table"),
        queryDate=Timestamp(1702000000000),
        service=FullyQualifiedEntityName("test_service"),
    )
    return TableUsageCount(
        table="test_table",
        date="1702000000000",
        databaseName="test_db",
        databaseSchema="test_schema",
        count=1,
        sqlQueries=[query_request],
        joins=[],
        serviceName="test_service",
    )
66+
67+
68+
def create_table_usage() -> TableUsageCount:
    """Create a minimal TableUsageCount without any SQL queries.

    Returns:
        A ``TableUsageCount`` for ``test_table`` in ``test_db.test_schema`` of
        ``test_service``, with ``count=1``, no joins, and ``sqlQueries=None``.
    """
    return TableUsageCount(
        table="test_table",
        date="1702000000000",
        databaseName="test_db",
        databaseSchema="test_schema",
        count=1,
        sqlQueries=None,
        joins=[],
        serviceName="test_service",
    )
80+
81+
82+
class TestMetadataUsageBulkSinkErrorHandling(TestCase):
    """Test APIError handling in MetadataUsageBulkSink.

    Every test makes the mocked ``ingest_entity_queries_data`` client call
    raise (or return) a chosen outcome, runs
    ``get_table_usage_and_joins`` and checks how many entries were appended
    to ``sink.status.failures``.
    """

    def setUp(self):
        """Build a sink wired to a fully mocked metadata client."""
        self.mock_metadata = MagicMock()
        self.config = MetadataUsageSinkConfig(filename="/tmp/test_usage")
        self.sink = MetadataUsageBulkSink(
            config=self.config, metadata=self.mock_metadata
        )
        self.sink.service_name = "test_service"

    def _count_new_failures(self, tables, table_usage):
        """Run the sink over *tables* and return how many failures it added."""
        failures_before = len(self.sink.status.failures)
        self.sink.get_table_usage_and_joins(tables, table_usage)
        return len(self.sink.status.failures) - failures_before

    def test_api_error_409_logs_warning_and_continues(self):
        """A 409 (entity conflict) APIError must not be recorded as a failure."""
        self.mock_metadata.ingest_entity_queries_data.side_effect = create_api_error(
            409, "Entity already exists"
        )

        new_failures = self._count_new_failures(
            [create_mock_table()], create_table_usage_with_queries()
        )

        self.assertEqual(
            new_failures,
            0,
            "409 error should not add to failures list",
        )

    def test_api_error_400_marks_as_failed(self):
        """A 400 (bad request) APIError must be recorded as a failure."""
        self.mock_metadata.ingest_entity_queries_data.side_effect = create_api_error(
            400, "Date range can only include past 30 days starting today"
        )

        new_failures = self._count_new_failures(
            [create_mock_table()], create_table_usage_with_queries()
        )

        self.assertEqual(
            new_failures,
            1,
            "400 error should add to failures list",
        )

    def test_api_error_500_marks_as_failed(self):
        """A 500 (server error) APIError must be recorded as a failure."""
        self.mock_metadata.ingest_entity_queries_data.side_effect = create_api_error(
            500, "Internal server error"
        )

        new_failures = self._count_new_failures(
            [create_mock_table()], create_table_usage_with_queries()
        )

        self.assertEqual(
            new_failures,
            1,
            "500 error should add to failures list",
        )

    def test_api_error_409_allows_processing_to_continue(self):
        """After a 409 on the first table, the second table is still processed."""
        tables = [create_mock_table("table1"), create_mock_table("table2")]

        # A side_effect iterable yields one outcome per call: the exception
        # instance is raised on the first call, None is returned on the second.
        self.mock_metadata.ingest_entity_queries_data.side_effect = [
            create_api_error(409, "Entity already exists"),
            None,
        ]

        new_failures = self._count_new_failures(
            tables, create_table_usage_with_queries()
        )

        self.assertEqual(
            new_failures,
            0,
            "409 errors should not add to failures list",
        )
        self.assertEqual(
            self.mock_metadata.ingest_entity_queries_data.call_count,
            2,
            "Both tables should be processed",
        )

0 commit comments

Comments
 (0)