Skip to content

Commit 1b8ea81

Browse files
committed
Fixes #27538: [OpenLineage] Add AWS Glue, Kusto, and Cosmos DB dataset naming support (#27533)
* feat(openlineage): add AWS Glue, Kusto, and Cosmos DB dataset naming support * Add better logging for exceptions * Add cosmos defensive check for `colls/` prefix in names * Correct spelling mistakes and wrap long line * fix: guard None source attributes in _sort_array_entity_fields Optional array fields (tasks, columns, fields) on existing OMD entities can be None when the API omits empty arrays in responses. Iterating over None crashed with TypeError. Mirrors the existing `or []` guard already applied to destination_attributes on the line below. * Add test for schema-not-found case returning None instead of raising an exception
1 parent 470d0de commit 1b8ea81

2 files changed

Lines changed: 261 additions & 23 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
OpenLineage source to extract metadata from Kafka or Kinesis events
1414
"""
1515
import json
16+
import re
1617
import time
1718
import traceback
1819
from collections import defaultdict
@@ -99,7 +100,8 @@ class OpenlineageSource(PipelineServiceSource):
99100
Works under the assumption that OpenLineage integrations produce events to Kafka topic or Kinesis stream,
100101
which is a source of events for this connector.
101102
102-
Only OpenLineage events that indicate successfull data movement (COMPLETE, RUNNING, START) are taken into account in this connector.
103+
Only OpenLineage events that indicate successful data movement (COMPLETE, RUNNING, START) are taken into account
104+
in this connector.
103105
104106
Configuring OpenLineage integrations: https://openlineage.io/docs/integrations/about
105107
"""
@@ -183,6 +185,27 @@ def _get_table_details(cls, data: Dict) -> TableDetails:
183185
"input table name cannot be retrieved from name attribute."
184186
)
185187

188+
namespace = data.get("namespace", "")
189+
190+
# AWS Glue: arn:aws:glue:{region}:{account} / table/{database}/{table}
191+
# Source: https://openlineage.io/docs/spec/naming/
192+
if namespace.startswith("arn:aws:glue:"):
193+
result = OpenlineageSource._parse_glue_table_name(name)
194+
if result:
195+
return result
196+
197+
# Azure Data Explorer (Kusto): azurekusto://{host} / {database}/{table}
198+
if namespace.startswith("azurekusto://"):
199+
result = OpenlineageSource._parse_slash_table_name(name)
200+
if result:
201+
return result
202+
203+
# Azure Cosmos DB: azurecosmos://{host}/dbs/{db} / colls/{collection}
204+
if namespace.startswith("azurecosmos://"):
205+
result = OpenlineageSource._parse_cosmos_table_name(namespace, name)
206+
if result:
207+
return result
208+
186209
name_parts = name.split(".")
187210

188211
if len(name_parts) < 2:
@@ -228,6 +251,59 @@ def _get_topic_details(data: Dict) -> TopicDetails:
228251

229252
return TopicDetails(name=name, broker_hostname=broker_hostname)
230253

254+
@staticmethod
def _parse_glue_table_name(name: str) -> Optional[TableDetails]:
    """
    Parse AWS Glue OL dataset name: ``table/{database}/{table}``.

    Glue EMR jobs emit a slash-separated name with a ``table/`` prefix instead
    of the dot-separated ``schema.table`` convention used by SQL engines.

    Source: https://github.com/OpenLineage/OpenLineage/blob/main/client/java/
    src/main/java/io/openlineage/client/dataset/Naming.java (GlueNaming)

    Args:
        name: the dataset ``name`` attribute to parse.

    Returns:
        TableDetails built from the last two path segments (lower-cased), or
        None when the name lacks the ``table/`` prefix, has fewer than two
        segments after it, or has an empty database/table segment.
    """
    prefix = "table/"
    if not name.startswith(prefix):
        return None
    parts = name[len(prefix) :].split("/")
    if len(parts) < 2:
        return None
    database, table = parts[-2], parts[-1]
    # "table//users" or "table/sales/" split into an empty segment; treat the
    # name as malformed rather than returning details with a blank schema/name.
    if not database or not table:
        return None
    return TableDetails(name=table.lower(), schema=database.lower())
271+
272+
@staticmethod
def _parse_slash_table_name(name: str) -> Optional[TableDetails]:
    """
    Parse slash-separated ``{database}/{table}`` OL dataset names.

    Used by Azure Data Explorer (Kusto):
        namespace ``azurekusto://{host}`` / name ``{database}/{table}``

    Source: https://github.com/OpenLineage/OpenLineage/blob/main/client/java/
    src/main/java/io/openlineage/client/dataset/Naming.java (KustoNaming)

    Args:
        name: the dataset ``name`` attribute to parse.

    Returns:
        TableDetails built from the last two path segments (lower-cased), or
        None when the name has fewer than two segments or an empty
        database/table segment.
    """
    parts = name.split("/")
    if len(parts) < 2:
        return None
    database, table = parts[-2], parts[-1]
    # "/table" or "db/" split into an empty segment; treat the name as
    # malformed rather than returning details with a blank schema/name.
    if not database or not table:
        return None
    return TableDetails(name=table.lower(), schema=database.lower())
287+
288+
@staticmethod
def _parse_cosmos_table_name(namespace: str, name: str) -> Optional[TableDetails]:
    """
    Resolve table details from an Azure Cosmos DB OL dataset.

    The database is read from the ``/dbs/{db}`` segment of the namespace
    (``azurecosmos://{host}/dbs/{db}``); the collection is read from a name
    that is exactly ``colls/{collection}``. Both values are lower-cased.

    Source: https://github.com/OpenLineage/OpenLineage/blob/main/client/java/
    src/main/java/io/openlineage/client/dataset/Naming.java (CosmosNaming)

    Returns None when either the database or the collection cannot be matched.
    """
    database = re.search(r"/dbs/([^/]+)", namespace)
    collection = re.fullmatch(r"colls/([^/]+)", name)
    if database is None or collection is None:
        return None
    schema_name = database.group(1).lower()
    table_name = collection.group(1).lower()
    return TableDetails(name=table_name, schema=schema_name)
306+
231307
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
232308
"""Wrapper around metadata.get_by_name with in-memory caching."""
233309
if not hasattr(self, "_entity_cache"):
@@ -323,6 +399,11 @@ def _get_table_fqn(
323399
)
324400
return f"{schema_fqn}.{table_details.name}"
325401
except FQNNotFoundException:
402+
logger.debug(
403+
f"Table '{table_details.name}' in schema '{table_details.schema}' "
404+
f"not found in services {resolved_services or self.get_db_service_names()}. "
405+
"Skipping lineage edge."
406+
)
326407
return None
327408
except Exception:
328409
logger.warning(
@@ -465,7 +546,7 @@ def _get_schema_fqn_from_om(
465546

466547
if not result:
467548
raise FQNNotFoundException(
468-
f"Schema FQN not found within services: {services}"
549+
f"Schema '{schema}' not found in services: {services}"
469550
)
470551

471552
return result
@@ -565,14 +646,13 @@ def get_create_table_request(self, table: Dict) -> Optional[Either]:
565646
if not om_table_fqn:
566647
try:
567648
om_schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
568-
except FQNNotFoundException as e:
569-
return Either(
570-
left=StackTraceError(
571-
name="",
572-
error=f"Failed to get fully qualified schema name: {e}",
573-
stackTrace=traceback.format_exc(),
574-
)
649+
except FQNNotFoundException:
650+
logger.warning(
651+
f"Schema '{table_details.schema}' not found in configured services "
652+
f"{self.get_db_service_names()}. Skipping table creation for "
653+
f"'{table_details.name}'."
575654
)
655+
return None
576656

577657
# After finding schema fqn (based on partial schema name) we know where we can create table
578658
# and we move forward with creating request.
@@ -870,10 +950,13 @@ def _poll_kafka(self, broker: KafkaBrokerConfig) -> Iterable[OpenLineageEvent]:
870950
if result:
871951
yield result
872952
except Exception as e:
873-
logger.debug(e)
953+
logger.warning(
954+
f"Failed to parse OpenLineage event from Kafka message: {e}"
955+
)
956+
logger.debug(traceback.format_exc())
874957

875958
except Exception as e:
876-
traceback.print_exc()
959+
logger.debug(traceback.format_exc())
877960
raise InvalidSourceException(f"Failed to read from Kafka: {str(e)}")
878961

879962
finally:
@@ -933,12 +1016,15 @@ def _poll_kinesis(self, broker: KinesisBrokerConfig) -> Iterable[OpenLineageEven
9331016
if result:
9341017
yield result
9351018
except Exception as e:
936-
logger.debug(e)
1019+
logger.warning(
1020+
f"Failed to parse OpenLineage event from Kinesis record: {e}"
1021+
)
1022+
logger.debug(traceback.format_exc())
9371023

9381024
time.sleep(pool_timeout)
9391025

9401026
except Exception as e:
941-
traceback.print_exc()
1027+
logger.debug(traceback.format_exc())
9421028
raise InvalidSourceException(f"Failed to read from Kinesis: {str(e)}")
9431029

9441030
def get_pipeline_name(self, pipeline_details: OpenLineageEvent) -> str:

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 162 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,28 @@ def test_get_create_table_request(self, mock_get_schema_fqn, mock_get_table_fqn)
799799
create_request.columns[i].dataTypeDisplay, expected_type_display
800800
)
801801

802+
@patch(
    "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om"
)
@patch(
    "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_schema_fqn_from_om"
)
def test_get_create_table_request_schema_not_found_returns_none(
    self, mock_get_schema_fqn, mock_get_table_fqn
):
    """When both table and schema lookups raise FQNNotFoundException, the result is None."""
    # Both lookups fail, so neither the table nor its schema can be resolved.
    mock_get_schema_fqn.side_effect = FQNNotFoundException("Schema not found")
    mock_get_table_fqn.side_effect = FQNNotFoundException("Table not found")

    table_data = {"name": "unknown_schema.employees", "namespace": "bigquery"}
    table_data["facets"] = {}

    self.assertIsNone(
        self.open_lineage_source.get_create_table_request(table_data)
    )
823+
802824
@patch("confluent_kafka.Consumer")
803825
def test_get_pipelines_list_filters_complete_events(self, mock_consumer_class):
804826
"""Test that get_pipelines_list returns COMPLETE events"""
@@ -1689,16 +1711,18 @@ def test_yield_pipeline_lineage_topic_not_found_skips_gracefully(self):
16891711
mock_pipeline = Mock()
16901712
mock_pipeline.id.root = pipeline_id
16911713

1692-
with patch.object(
1693-
self.open_lineage_source, "metadata"
1694-
) as mock_metadata, patch.object(
1695-
self.open_lineage_source,
1696-
"_get_table_fqn",
1697-
return_value="db-service.public.some_table",
1698-
), patch.object(
1699-
self.open_lineage_source,
1700-
"get_create_table_request",
1701-
return_value=None,
1714+
with (
1715+
patch.object(self.open_lineage_source, "metadata") as mock_metadata,
1716+
patch.object(
1717+
self.open_lineage_source,
1718+
"_get_table_fqn",
1719+
return_value="db-service.public.some_table",
1720+
),
1721+
patch.object(
1722+
self.open_lineage_source,
1723+
"get_create_table_request",
1724+
return_value=None,
1725+
),
17021726
):
17031727
# Empty messaging services list — no broker match for unknown-broker
17041728
mock_metadata.list_all_entities.return_value = iter([])
@@ -1735,6 +1759,134 @@ def mock_get_by_name(entity, fqn, **kwargs):
17351759
"No lineage edges should be produced when input topic cannot be resolved",
17361760
)
17371761

1762+
def test_parse_glue_table_name_trino_glue_catalog_schema(self):
    """A Glue name under the ``public`` schema with an underscore-separated table parses correctly.

    This is the naming pattern described for Trino backed by the AWS Glue Data Catalog.
    """
    glue_name = "table/public/order_line_items"
    parsed = OpenlineageSource._parse_glue_table_name(glue_name)
    self.assertEqual(
        (parsed.name, parsed.schema), ("order_line_items", "public")
    )
1771+
1772+
def test_parse_glue_table_name_happy_path(self):
    """``table/{database}/{table}`` (GlueNaming in Naming.java) yields the schema and table."""
    parsed = OpenlineageSource._parse_glue_table_name("table/sales/users")
    self.assertEqual((parsed.name, parsed.schema), ("users", "sales"))
1777+
1778+
def test_parse_glue_table_name_normalizes_to_lowercase(self):
    """Mixed-case Glue database and table names come back lower-cased."""
    parsed = OpenlineageSource._parse_glue_table_name("table/Sales/Users")
    self.assertEqual((parsed.name, parsed.schema), ("users", "sales"))
1783+
1784+
def test_parse_glue_table_name_not_glue_format_returns_none(self):
    """A name lacking the ``table/`` prefix is rejected with None."""
    parsed = OpenlineageSource._parse_glue_table_name("sales.users")
    self.assertIsNone(parsed)
1787+
1788+
def test_parse_glue_table_name_missing_table_part_returns_none(self):
    """A ``table/`` prefix followed by a single segment is malformed and gives None."""
    parsed = OpenlineageSource._parse_glue_table_name("table/only_db")
    self.assertIsNone(parsed)
1791+
1792+
def test_parse_slash_table_name_happy_path(self):
    """``{database}/{table}`` (KustoNaming in Naming.java) yields the schema and table."""
    parsed = OpenlineageSource._parse_slash_table_name("mydb/mytable")
    self.assertEqual((parsed.name, parsed.schema), ("mytable", "mydb"))
1797+
1798+
def test_parse_slash_table_name_normalizes_to_lowercase(self):
    """Mixed-case Kusto database and table names come back lower-cased."""
    parsed = OpenlineageSource._parse_slash_table_name("MyDB/MyTable")
    self.assertEqual((parsed.name, parsed.schema), ("mytable", "mydb"))
1803+
1804+
def test_parse_slash_table_name_single_part_returns_none(self):
    """A name without any slash has no db/table split and gives None."""
    parsed = OpenlineageSource._parse_slash_table_name("only_table")
    self.assertIsNone(parsed)
1807+
1808+
def test_parse_cosmos_table_name_happy_path(self):
    """Database comes from the namespace ``/dbs/{db}`` and collection from ``colls/{coll}`` (CosmosNaming)."""
    namespace = "azurecosmos://myaccount.documents.azure.com/dbs/mydb"
    parsed = OpenlineageSource._parse_cosmos_table_name(
        namespace, "colls/mycollection"
    )
    self.assertEqual((parsed.name, parsed.schema), ("mycollection", "mydb"))
1816+
1817+
def test_parse_cosmos_table_name_normalizes_to_lowercase(self):
    """Mixed-case Cosmos database and collection names come back lower-cased."""
    namespace, name = "azurecosmos://host/dbs/MyDB", "colls/MyCollection"
    parsed = OpenlineageSource._parse_cosmos_table_name(namespace, name)
    self.assertEqual((parsed.name, parsed.schema), ("mycollection", "mydb"))
1824+
1825+
def test_parse_cosmos_table_name_no_dbs_segment_returns_none(self):
    """Without a ``/dbs/{db}`` segment in the namespace there is no database, so None is returned."""
    parsed = OpenlineageSource._parse_cosmos_table_name(
        "azurecosmos://host", "colls/mycoll"
    )
    self.assertIsNone(parsed)
1832+
1833+
def test_parse_cosmos_table_name_non_colls_name_returns_none(self):
    """A name that is not of the form ``colls/{collection}`` gives None."""
    parsed = OpenlineageSource._parse_cosmos_table_name(
        "azurecosmos://host/dbs/mydb", "mycollection"
    )
    self.assertIsNone(parsed)
1840+
1841+
def test_get_table_details_glue_namespace_parses_slash_name(self):
    """An ``arn:aws:glue`` namespace with a ``table/{db}/{table}`` name resolves schema and table."""
    event_dataset = {
        "name": "table/sales/users",
        "namespace": "arn:aws:glue:us-east-1:123456789012",
    }
    details = OpenlineageSource._get_table_details(event_dataset)
    self.assertEqual((details.name, details.schema), ("users", "sales"))
1850+
1851+
def test_get_table_details_kusto_namespace_parses_slash_name(self):
    """An ``azurekusto`` namespace with a ``{db}/{table}`` name resolves schema and table."""
    event_dataset = {
        "name": "mydb/mytable",
        "namespace": "azurekusto://mycluster.kusto.windows.net",
    }
    details = OpenlineageSource._get_table_details(event_dataset)
    self.assertEqual((details.name, details.schema), ("mytable", "mydb"))
1860+
1861+
def test_get_table_details_cosmos_namespace_parses_colls_name(self):
    """For Cosmos DB the schema is taken from the namespace path and the table from ``colls/{name}``."""
    event_dataset = {
        "name": "colls/orders",
        "namespace": "azurecosmos://host.documents.azure.com/dbs/mydb",
    }
    details = OpenlineageSource._get_table_details(event_dataset)
    self.assertEqual((details.name, details.schema), ("orders", "mydb"))
1870+
1871+
def test_get_entity_details_glue_namespace_resolves_to_table(self):
    """A Glue ARN namespace with a ``table/{db}/{table}`` name is classified as a table entity."""
    event_dataset = {
        "facets": {},
        "name": "table/sales/users",
        "namespace": "arn:aws:glue:us-east-1:123456789012",
    }
    entity = OpenlineageSource._get_entity_details(event_dataset)
    self.assertIsNotNone(entity)
    self.assertEqual(entity.entity_type, "table")
    table = entity.table_details
    self.assertEqual((table.name, table.schema), ("users", "sales"))
1883+
1884+
def test_get_entity_details_unparseable_name_raises_value_error(self):
    """A name in no recognised format makes ``_get_entity_details`` raise ValueError."""
    event_dataset = {"name": "invalidname", "namespace": "trino://host:8080"}
    self.assertRaises(
        ValueError, OpenlineageSource._get_entity_details, event_dataset
    )
1889+
17381890

17391891
if __name__ == "__main__":
17401892
unittest.main()

0 commit comments

Comments
 (0)