From bd8fb5f1538144d57f18ca66a7b63fc454b48b73 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 22 Apr 2026 23:45:04 -0700 Subject: [PATCH 1/6] Improve SSRS Connector - Lineage --- .claude/scheduled_tasks.lock | 1 + .../ingestion/source/dashboard/ssrs/client.py | 61 +++ .../source/dashboard/ssrs/metadata.py | 347 +++++++++++++++++- .../source/dashboard/ssrs/rdl_parser.py | 204 ++++++++++ ingestion/tests/integration/ssrs/conftest.py | 48 +++ .../tests/integration/ssrs/test_metadata.py | 30 ++ .../fixtures/ssrs/expression_commandtype.rdl | 23 ++ .../ssrs/inline_multi_dataset_2010.rdl | 35 ++ .../ssrs/inline_single_dataset_2016.rdl | 27 ++ .../dashboard/fixtures/ssrs/malformed.rdl | 4 + .../dashboard/fixtures/ssrs/no_datasource.rdl | 5 + .../fixtures/ssrs/shared_datasource.rdl | 21 ++ .../unit/topology/dashboard/test_ssrs.py | 343 ++++++++++++++++- .../dashboard/test_ssrs_rdl_parser.py | 143 ++++++++ .../entity/data/dashboardDataModel.json | 6 +- 15 files changed, 1278 insertions(+), 20 deletions(-) create mode 100644 .claude/scheduled_tasks.lock create mode 100644 ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py create mode 100644 ingestion/tests/unit/topology/dashboard/fixtures/ssrs/expression_commandtype.rdl create mode 100644 ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_multi_dataset_2010.rdl create mode 100644 ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_single_dataset_2016.rdl create mode 100644 ingestion/tests/unit/topology/dashboard/fixtures/ssrs/malformed.rdl create mode 100644 ingestion/tests/unit/topology/dashboard/fixtures/ssrs/no_datasource.rdl create mode 100644 ingestion/tests/unit/topology/dashboard/fixtures/ssrs/shared_datasource.rdl create mode 100644 ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock new file mode 100644 index 000000000000..ce2f15798e50 --- /dev/null +++ b/.claude/scheduled_tasks.lock @@ -0,0 +1 @@ +{"sessionId":"7ec70b3a-5b8b-4854-b2f0-9dc0e9773078","pid":30468,"acquiredAt":1776386757898} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py index 0c08ec3745db..2fd6de665717 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py @@ -11,6 +11,8 @@ """ SSRS REST client """ +import base64 +import binascii from typing import Iterable, Iterator, Optional, Union import requests @@ -36,12 +38,15 @@ API_VERSION = "api/v2.0" CONNECT_TIMEOUT = 10 READ_TIMEOUT = 120 +RDL_READ_TIMEOUT = 60 PAGE_SIZE = 100 MAX_RETRIES = 2 BACKOFF_FACTOR = 1 RETRY_STATUS_CODES = (500, 502, 503, 504) REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources" FOLDER_SELECT_FIELDS = "Id,Name,Path" +RDL_CONTENT_PATHS = ("/Reports({id})/Content/$value", "/CatalogItems({id})/Content") +RDL_NOT_FOUND_STATUS = {404, 400} class SsrsClient: @@ -136,3 +141,59 @@ def get_reports(self) -> Iterator[SsrsReport]: } for data in self._paginate("/Reports", params, "reports"): yield from SsrsReportListResponse(**data).value + + def get_report_definition(self, report_id: str) -> Optional[bytes]: + """Return the RDL XML bytes for a report, or ``None`` if unavailable. + + Tries ``/Reports({id})/Content/$value`` first, then ``/CatalogItems({id})/Content``. + Not-found responses (404/400) trigger fallback silently; transport errors + propagate so operators see outages instead of empty catalogs.""" + last_err: Optional[Exception] = None + for template in RDL_CONTENT_PATHS: + path = template.format(id=report_id) + try: + body = self._fetch_report_content(path) + except requests.RequestException as exc: + last_err = exc + logger.warning("RDL fetch transport error for %s: %s", path, exc) + continue + if body is not None: + return body + if last_err is not None: + raise SourceConnectionException( + f"Failed to fetch RDL content for report [{report_id}]: {last_err}" + ) from last_err + return None + + def _fetch_report_content(self, path: str) -> Optional[bytes]: + url = f"{self.base_url}{path}" + resp = self.session.get( + url, + timeout=(CONNECT_TIMEOUT, RDL_READ_TIMEOUT), + headers={"Accept": "application/xml,application/octet-stream"}, + ) + if resp.status_code in RDL_NOT_FOUND_STATUS: + return None + if not resp.ok: + logger.warning("RDL fetch returned HTTP %s for %s", resp.status_code, path) + return None + return _decode_rdl_response(resp, path) + + +def _decode_rdl_response(resp: requests.Response, path: str) -> Optional[bytes]: + content_type = (resp.headers.get("Content-Type") or "").lower() + if "json" not in content_type: + return resp.content or None + try: + payload = resp.json() + except ValueError: + return resp.content or None + value = payload.get("Value") if isinstance(payload, dict) else None + if not value: + logger.warning("RDL JSON response missing 'Value' field at %s", path) + return None + try: + return base64.b64decode(value) + except (binascii.Error, ValueError) as exc: + logger.warning("Malformed base64 in RDL response at %s: %s", path, exc) + return None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py index f4f1c2e36127..8990391cfdfc 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py @@ -12,18 +12,29 @@ SSRS source module """ import traceback -from typing import Any, Dict, Iterable, Optional +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, Optional, Union from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest +from metadata.generated.schema.api.data.createDashboardDataModel import ( + CreateDashboardDataModelRequest, +) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.chart import Chart, ChartType +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, +) +from metadata.generated.schema.entity.data.table import Column, DataType, Table from metadata.generated.schema.entity.services.connections.dashboard.ssrsConnection import ( SsrsConnection, ) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -35,19 +46,55 @@ FullyQualifiedEntityName, Markdown, SourceUrl, + SqlQuery, ) from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect +from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.ingestion.source.dashboard.ssrs.models import SsrsReport +from metadata.ingestion.source.dashboard.ssrs.rdl_parser import ( + SsrsDataSet, + SsrsDataSource, + SsrsReportDefinition, + parse_rdl, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_chart -from metadata.utils.helpers import clean_uri +from metadata.utils.fqn import build_es_fqn_search_string +from metadata.utils.helpers import clean_uri, get_database_name_for_lineage from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +SKIP_COMMAND_TYPES = {"StoredProcedure", "Expression"} +MDX_PROVIDERS = {"OLEDB-MD", "ADOMD", "SAPBW"} + +DATA_PROVIDER_DIALECT = { + "SQL": Dialect.TSQL, + "ORACLE": Dialect.ORACLE, + "MYSQL": Dialect.MYSQL, + "POSTGRESQL": Dialect.POSTGRES, + "PGSQL": Dialect.POSTGRES, + "DB2": Dialect.DB2, + "SNOWFLAKE": Dialect.SNOWFLAKE, + "REDSHIFT": Dialect.REDSHIFT, + "BIGQUERY": Dialect.BIGQUERY, + "TERADATA": Dialect.TERADATA, +} + + +@dataclass(frozen=True) +class _LineageContext: + db_service_name: Optional[str] + db_service_entity: Optional[DatabaseService] + prefix_database: Optional[str] + prefix_schema: Optional[str] + prefix_table: Optional[str] + dialect: Dialect + class SsrsSource(DashboardServiceSource): config: WorkflowSource @@ -75,6 +122,7 @@ def __init__( ): super().__init__(config, metadata) self.folder_path_map: Dict[str, str] = {} + self._report_definitions: Dict[str, SsrsReportDefinition] = {} def prepare(self): self.folder_path_map = { @@ -91,8 +139,30 @@ def get_dashboard_name(self, dashboard: SsrsReport) -> str: return dashboard.name def get_dashboard_details(self, dashboard: SsrsReport) -> Optional[SsrsReport]: + self._load_report_definition(dashboard) return dashboard + def _load_report_definition(self, dashboard: SsrsReport) -> None: + if dashboard.has_data_sources is False: + return + try: + rdl_bytes = self.client.get_report_definition(dashboard.id) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + "Could not fetch RDL for report [%s]: %s", dashboard.name, exc + ) + return + if not rdl_bytes: + return + try: + self._report_definitions[dashboard.id] = parse_rdl(rdl_bytes) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + "Could not parse RDL for report [%s]: %s", dashboard.name, exc + ) + def get_project_name(self, dashboard_details: Any) -> Optional[str]: try: if isinstance(dashboard_details, SsrsReport) and dashboard_details.path: @@ -181,12 +251,283 @@ def yield_dashboard_chart( ) ) + def yield_datamodel( + self, dashboard_details: SsrsReport + ) -> Iterable[Either[CreateDashboardDataModelRequest]]: + if not self.source_config.includeDataModels: + return + rdl = self._report_definitions.get(dashboard_details.id) + if not rdl: + return + for dataset in rdl.data_sets: + try: + datamodel_request = self._build_datamodel_request( + dashboard_details, dataset + ) + if datamodel_request is None: + continue + yield Either(right=datamodel_request) + self.register_record_datamodel(datamodel_request=datamodel_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=f"{dashboard_details.name}.{dataset.name}", + error=( + f"Error yielding DataModel [{dataset.name}] for report " + f"[{dashboard_details.name}]: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) + + def _build_datamodel_request( + self, dashboard_details: SsrsReport, dataset: SsrsDataSet + ) -> Optional[CreateDashboardDataModelRequest]: + datamodel_name = self._datamodel_name(dashboard_details.id, dataset.name) + sql = ( + dataset.command_text + if dataset.command_text and dataset.command_type not in SKIP_COMMAND_TYPES + else None + ) + return CreateDashboardDataModelRequest( + name=EntityName(datamodel_name), + displayName=dataset.name, + service=FullyQualifiedEntityName(self.context.get().dashboard_service), + dataModelType=DataModelType.SsrsDataModel.value, + serviceType=self.service_connection.type.value, + sql=SqlQuery(sql) if sql else None, + columns=self._build_datamodel_columns(dataset), + ) + + @staticmethod + def _datamodel_name(report_id: str, dataset_name: str) -> str: + return f"{report_id}.{dataset_name}" + + @staticmethod + def _build_datamodel_columns(dataset: SsrsDataSet) -> List[Column]: + columns: List[Column] = [] + for field_info in dataset.fields: + try: + columns.append( + Column( + name=field_info.name, + displayName=field_info.name, + dataType=DataType.UNKNOWN, + dataTypeDisplay="SSRS Field", + ) + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + "Error building SSRS datamodel column [%s]: %s", + field_info.name, + exc, + ) + return columns + def yield_dashboard_lineage_details( self, dashboard_details: SsrsReport, db_service_prefix: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: - return + rdl = self._report_definitions.get(dashboard_details.id) + if not rdl: + return + + ( + db_service_name, + prefix_database, + prefix_schema, + prefix_table, + ) = self.parse_db_service_prefix(db_service_prefix) + + db_service_entity = self._resolve_db_service(db_service_name) + datasource_index = {ds.name: ds for ds in rdl.data_sources} + + try: + for dataset in rdl.data_sets: + datasource = datasource_index.get(dataset.data_source_name or "") + context = _LineageContext( + db_service_name=db_service_name, + db_service_entity=db_service_entity, + prefix_database=prefix_database, + prefix_schema=prefix_schema, + prefix_table=prefix_table, + dialect=self._resolve_dialect(db_service_entity, datasource), + ) + try: + yield from self._yield_dataset_lineage( + dashboard_details, dataset, datasource, context + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=f"{dashboard_details.name}.{dataset.name}", + error=( + f"Error yielding lineage for dataset [{dataset.name}] " + f"in report [{dashboard_details.name}]: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) + finally: + self._report_definitions.pop(dashboard_details.id, None) + + def _yield_dataset_lineage( + self, + dashboard_details: SsrsReport, + dataset: SsrsDataSet, + datasource: Optional[SsrsDataSource], + context: _LineageContext, + ) -> Iterable[Either[AddLineageRequest]]: + if not self._is_dataset_lineage_eligible(dataset, datasource): + return + + to_entity = self._resolve_lineage_target(dashboard_details, dataset) + if to_entity is None: + return + + try: + lineage_parser = LineageParser( + dataset.command_text, + context.dialect, + parser_type=self.get_query_parser_type(), + ) + except Exception as exc: + logger.debug("LineageParser failed for dataset [%s]: %s", dataset.name, exc) + return + + default_database = datasource.database if datasource else None + for source_table in lineage_parser.source_tables or []: + yield from self._yield_table_to_target_lineage( + source_table=str(source_table), + to_entity=to_entity, + command_text=dataset.command_text, + context=context, + default_database=default_database, + ) + + @staticmethod + def _is_dataset_lineage_eligible( + dataset: SsrsDataSet, datasource: Optional[SsrsDataSource] + ) -> bool: + if not dataset.command_text: + logger.debug( + "Skipping lineage for dataset [%s]: empty CommandText", dataset.name + ) + return False + if dataset.command_type in SKIP_COMMAND_TYPES: + logger.debug( + "Skipping lineage for dataset [%s]: command type [%s]", + dataset.name, + dataset.command_type, + ) + return False + if datasource and datasource.data_provider in MDX_PROVIDERS: + logger.debug( + "Skipping lineage for dataset [%s]: MDX data provider [%s]", + dataset.name, + datasource.data_provider, + ) + return False + if dataset.shared_reference: + logger.debug( + "Skipping lineage for dataset [%s]: shared dataset reference [%s]", + dataset.name, + dataset.shared_reference, + ) + return False + return True + + def _resolve_lineage_target( + self, dashboard_details: SsrsReport, dataset: SsrsDataSet + ) -> Optional[Union[DashboardDataModel, Dashboard]]: + if self.source_config.includeDataModels: + datamodel_fqn = fqn.build( + metadata=self.metadata, + entity_type=DashboardDataModel, + service_name=self.context.get().dashboard_service, + data_model_name=self._datamodel_name( + dashboard_details.id, dataset.name + ), + ) + return self.metadata.get_by_name( + entity=DashboardDataModel, fqn=datamodel_fqn + ) + dashboard_fqn = fqn.build( + self.metadata, + entity_type=Dashboard, + service_name=self.context.get().dashboard_service, + dashboard_name=dashboard_details.id, + ) + return self.metadata.get_by_name(entity=Dashboard, fqn=dashboard_fqn) + + def _resolve_db_service( + self, db_service_name: Optional[str] + ) -> Optional[DatabaseService]: + if not db_service_name: + return None + try: + return self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) + except Exception as exc: + logger.debug("Could not resolve DB service [%s]: %s", db_service_name, exc) + return None + + @staticmethod + def _resolve_dialect( + db_service_entity: Optional[DatabaseService], + datasource: Optional[SsrsDataSource] = None, + ) -> Dialect: + if db_service_entity and db_service_entity.serviceType: + return ConnectionTypeDialectMapper.dialect_of( + db_service_entity.serviceType.value + ) + if datasource and datasource.data_provider: + provider_dialect = DATA_PROVIDER_DIALECT.get( + datasource.data_provider.upper() + ) + if provider_dialect is not None: + return provider_dialect + return Dialect.TSQL + + def _yield_table_to_target_lineage( + self, + source_table: str, + to_entity: Union[DashboardDataModel, Dashboard], + command_text: str, + context: _LineageContext, + default_database: Optional[str], + ) -> Iterable[Either[AddLineageRequest]]: + split = fqn.split_table_name(source_table) + table_name = context.prefix_table or split.get("table") + if not table_name: + return + database_name = ( + context.prefix_database or split.get("database") or default_database + ) + schema_name = context.prefix_schema or split.get("database_schema") + if context.db_service_entity and database_name: + database_name = get_database_name_for_lineage( + context.db_service_entity, database_name + ) + fqn_search_string = build_es_fqn_search_string( + service_name=context.db_service_name or "*", + database_name=database_name, + schema_name=schema_name, + table_name=table_name, + ) + table_entity = self.metadata.search_in_any_service( + entity_type=Table, fqn_search_string=fqn_search_string + ) + if not table_entity: + return + lineage = self._get_add_lineage_request( + to_entity=to_entity, from_entity=table_entity, sql=command_text + ) + if lineage is not None: + yield lineage def close(self): self.client.close() diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py new file mode 100644 index 000000000000..461b051508a4 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py @@ -0,0 +1,204 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Parser for SSRS RDL (Report Definition Language) XML documents. + +RDL namespaces differ across SSRS versions (2008/2010/2016+). Traversal is +namespace-agnostic: we compare element local names. +""" +from dataclasses import dataclass, field +from typing import List, Optional, Tuple +from xml.etree import ElementTree as ET + +SERVER_KEYS = {"data source", "server", "address", "addr", "network address"} +DATABASE_KEYS = {"initial catalog", "database"} + + +@dataclass +class SsrsField: + name: str + data_field: Optional[str] = None + + +@dataclass +class SsrsDataSource: + name: str + data_provider: Optional[str] = None + connect_string: Optional[str] = None + server: Optional[str] = None + database: Optional[str] = None + shared_reference: Optional[str] = None + + +@dataclass +class SsrsDataSet: + name: str + data_source_name: Optional[str] = None + command_type: Optional[str] = None + command_text: Optional[str] = None + fields: List[SsrsField] = field(default_factory=list) + shared_reference: Optional[str] = None + + +@dataclass +class SsrsReportDefinition: + data_sources: List[SsrsDataSource] = field(default_factory=list) + data_sets: List[SsrsDataSet] = field(default_factory=list) + + +def parse_rdl(rdl_bytes: bytes) -> SsrsReportDefinition: + """Parse RDL XML into a structured definition. Raises ``ValueError`` on malformed XML.""" + if not rdl_bytes: + raise ValueError("Empty RDL content") + try: + root = ET.fromstring(rdl_bytes) + except ET.ParseError as exc: + raise ValueError(f"Malformed RDL XML: {exc}") from exc + return SsrsReportDefinition( + data_sources=_parse_data_sources(root), + data_sets=_parse_data_sets(root), + ) + + +def parse_connect_string( + connect_string: Optional[str], +) -> Tuple[Optional[str], Optional[str]]: + """Extract ``(server, database)`` from a connection string. + + Accepts common SSRS/SQL-Server variants (``Data Source=``, ``Server=``, + ``Initial Catalog=``, ``Database=``). Case-insensitive, semicolon-delimited.""" + if not connect_string: + return None, None + server: Optional[str] = None + database: Optional[str] = None + for segment in connect_string.split(";"): + if "=" not in segment: + continue + key, _, value = segment.partition("=") + key_lower = key.strip().lower() + value = value.strip() + if not value: + continue + if server is None and key_lower in SERVER_KEYS: + server = value + elif database is None and key_lower in DATABASE_KEYS: + database = value + return server, database + + +def _local(tag: str) -> str: + return tag.rsplit("}", 1)[-1] + + +def _find_child(parent: ET.Element, name: str) -> Optional[ET.Element]: + for child in parent: + if _local(child.tag) == name: + return child + return None + + +def _find_children(parent: ET.Element, name: str) -> List[ET.Element]: + return [child for child in parent if _local(child.tag) == name] + + +def _text(elem: Optional[ET.Element]) -> Optional[str]: + if elem is None or elem.text is None: + return None + stripped = elem.text.strip() + return stripped or None + + +def _parse_data_sources(root: ET.Element) -> List[SsrsDataSource]: + container = _find_child(root, "DataSources") + if container is None: + return [] + sources: List[SsrsDataSource] = [] + for ds_elem in _find_children(container, "DataSource"): + name = ds_elem.attrib.get("Name") or "" + if not name: + continue + ref = _find_child(ds_elem, "DataSourceReference") + if ref is not None: + sources.append(SsrsDataSource(name=name, shared_reference=_text(ref))) + continue + props = _find_child(ds_elem, "ConnectionProperties") + if props is None: + sources.append(SsrsDataSource(name=name)) + continue + connect_string = _text(_find_child(props, "ConnectString")) + data_provider = _text(_find_child(props, "DataProvider")) + server, database = parse_connect_string(connect_string) + sources.append( + SsrsDataSource( + name=name, + data_provider=data_provider, + connect_string=connect_string, + server=server, + database=database, + ) + ) + return sources + + +def _parse_data_sets(root: ET.Element) -> List[SsrsDataSet]: + container = _find_child(root, "DataSets") + if container is None: + return [] + datasets: List[SsrsDataSet] = [] + for ds_elem in _find_children(container, "DataSet"): + name = ds_elem.attrib.get("Name") or "" + if not name: + continue + shared_ref = _text(_find_child(ds_elem, "SharedDataSet")) + if shared_ref is None: + shared_container = _find_child(ds_elem, "SharedDataSetReference") + shared_ref = _text(shared_container) + datasets.append(_build_dataset(ds_elem, name, shared_ref)) + return datasets + + +def _build_dataset( + ds_elem: ET.Element, name: str, shared_ref: Optional[str] +) -> SsrsDataSet: + query = _find_child(ds_elem, "Query") + command_type = None + command_text = None + data_source_name = None + if query is not None: + data_source_name = _text(_find_child(query, "DataSourceName")) + command_type = _text(_find_child(query, "CommandType")) + command_text = _text(_find_child(query, "CommandText")) + return SsrsDataSet( + name=name, + data_source_name=data_source_name, + command_type=command_type, + command_text=command_text, + fields=_parse_fields(ds_elem), + shared_reference=shared_ref, + ) + + +def _parse_fields(ds_elem: ET.Element) -> List[SsrsField]: + fields_container = _find_child(ds_elem, "Fields") + if fields_container is None: + return [] + fields: List[SsrsField] = [] + for field_elem in _find_children(fields_container, "Field"): + field_name = field_elem.attrib.get("Name") + if not field_name: + continue + fields.append( + SsrsField( + name=field_name, + data_field=_text(_find_child(field_elem, "DataField")), + ) + ) + return fields diff --git a/ingestion/tests/integration/ssrs/conftest.py b/ingestion/tests/integration/ssrs/conftest.py index 380d91ba391d..db3e002427a8 100644 --- a/ingestion/tests/integration/ssrs/conftest.py +++ b/ingestion/tests/integration/ssrs/conftest.py @@ -45,6 +45,30 @@ {"Id": "folder-1", "Name": "TestFolder", "Path": "/TestFolder"}, ] +MOCK_RDL_BY_ID = { + "report-1": ( + b'' + b'' + b"" + b'' + b"" + b"SQL" + b"Data Source=sql01;Initial Catalog=SalesDB" + b"" + b"" + b"" + b"" + b'' + b"MainDS" + b"Text" + b"SELECT OrderId FROM dbo.Orders" + b'OrderId' + b"" + b"" + ), +} + class SsrsMockHandler(BaseHTTPRequestHandler): def do_GET(self): @@ -59,9 +83,22 @@ def do_GET(self): skip = int(params.get("$skip", ["0"])[0]) page = MOCK_REPORTS[skip : skip + top] self._respond({"value": page}) + elif self._match_rdl(path) is not None: + self._respond_rdl(self._match_rdl(path)) else: self.send_error(404) + @staticmethod + def _match_rdl(path: str): + for template in ( + "/reports/api/v2.0/Reports({id})/Content/$value", + "/reports/api/v2.0/CatalogItems({id})/Content", + ): + prefix, _, suffix = template.partition("{id}") + if path.startswith(prefix) and path.endswith(suffix): + return path[len(prefix) : len(path) - len(suffix)] + return None + def _respond(self, data: dict): body = json.dumps(data).encode() self.send_response(200) @@ -70,6 +107,17 @@ def _respond(self, data: dict): self.end_headers() self.wfile.write(body) + def _respond_rdl(self, report_id: str): + body = MOCK_RDL_BY_ID.get(report_id) + if body is None: + self.send_error(404) + return + self.send_response(200) + self.send_header("Content-Type", "application/xml") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + def log_message(self, format, *args): pass diff --git a/ingestion/tests/integration/ssrs/test_metadata.py b/ingestion/tests/integration/ssrs/test_metadata.py index cd20cfae695f..e634ab870e6c 100644 --- a/ingestion/tests/integration/ssrs/test_metadata.py +++ b/ingestion/tests/integration/ssrs/test_metadata.py @@ -56,3 +56,33 @@ def test_hidden_reports_present_in_raw(self, ssrs_service): assert any(r.hidden for r in reports) visible = [r for r in reports if not r.hidden] assert len(visible) == 3 + + def test_client_get_report_definition_returns_bytes(self, ssrs_service): + connection = SsrsConnection( + hostPort=ssrs_service, username="test_user", password="test_pass" + ) + client = SsrsClient(connection) + rdl = client.get_report_definition("report-1") + assert rdl is not None + assert b"" in rdl + assert b"SELECT OrderId FROM dbo.Orders" in rdl + + def test_client_get_report_definition_404_returns_none(self, ssrs_service): + connection = SsrsConnection( + hostPort=ssrs_service, username="test_user", password="test_pass" + ) + client = SsrsClient(connection) + assert client.get_report_definition("does-not-exist") is None + + def test_end_to_end_rdl_parse_via_mock_server(self, ssrs_service): + from metadata.ingestion.source.dashboard.ssrs.rdl_parser import parse_rdl + + connection = SsrsConnection( + hostPort=ssrs_service, username="test_user", password="test_pass" + ) + client = SsrsClient(connection) + rdl = client.get_report_definition("report-1") + parsed = parse_rdl(rdl) + assert len(parsed.data_sets) == 1 + assert parsed.data_sets[0].command_text == "SELECT OrderId FROM dbo.Orders" + assert parsed.data_sources[0].database == "SalesDB" diff --git a/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/expression_commandtype.rdl b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/expression_commandtype.rdl new file mode 100644 index 000000000000..f9970a46560b --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/expression_commandtype.rdl @@ -0,0 +1,23 @@ + + + + + + SQL + Data Source=sql01;Initial Catalog=DynDB + + + + + + + DynamicDS + Expression + ="SELECT * FROM " & Parameters!Tbl.Value + + + Col + + + + diff --git a/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_multi_dataset_2010.rdl b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_multi_dataset_2010.rdl new file mode 100644 index 000000000000..6f64de70e04e --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_multi_dataset_2010.rdl @@ -0,0 +1,35 @@ + + + + + + SQL + Server=finance01;Database=FinanceDB + + + + + + + FinanceDS + Text + SELECT MonthName, Amount FROM dbo.Revenue + + + MonthName + Amount + + + + + FinanceDS + Text + SELECT Category, Amount FROM dbo.Expenses + + + Category + Amount + + + + diff --git a/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_single_dataset_2016.rdl b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_single_dataset_2016.rdl new file mode 100644 index 000000000000..8a01880c0115 --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/inline_single_dataset_2016.rdl @@ -0,0 +1,27 @@ + + + + + + SQL + Data Source=sql01.example.com;Initial Catalog=SalesDB + + Integrated + + + + + + SalesDS + Text + SELECT OrderId, CustomerName, Total FROM dbo.Orders WHERE Total > @minTotal + + + OrderId + CustomerName + Total + + + + diff --git a/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/malformed.rdl b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/malformed.rdl new file mode 100644 index 000000000000..6e96342df31e --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/malformed.rdl @@ -0,0 +1,4 @@ + + + + diff --git a/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/no_datasource.rdl b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/no_datasource.rdl new file mode 100644 index 000000000000..f5229a192988 --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/no_datasource.rdl @@ -0,0 +1,5 @@ + + + + + diff --git a/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/shared_datasource.rdl b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/shared_datasource.rdl new file mode 100644 index 000000000000..90f56044edcf --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/fixtures/ssrs/shared_datasource.rdl @@ -0,0 +1,21 @@ + + + + + /Shared Data Sources/Warehouse + + + + + + SharedDS + Text + SELECT Sku, Qty FROM dbo.Inventory + + + Sku + Qty + + + + diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs.py b/ingestion/tests/unit/topology/dashboard/test_ssrs.py index 7df4fa79c1d7..698ea0ded8b0 100644 --- a/ingestion/tests/unit/topology/dashboard/test_ssrs.py +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs.py @@ -20,20 +20,31 @@ from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.entity.data.chart import ChartType +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel from metadata.generated.schema.entity.services.dashboardService import ( DashboardConnection, DashboardService, DashboardServiceType, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.ingestion.api.models import Either +from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.lineage.models import Dialect from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.ssrs.client import SsrsClient from metadata.ingestion.source.dashboard.ssrs.metadata import SsrsSource from metadata.ingestion.source.dashboard.ssrs.models import SsrsFolder, SsrsReport +from metadata.ingestion.source.dashboard.ssrs.rdl_parser import ( + SsrsDataSet, + SsrsDataSource, + SsrsField, + SsrsReportDefinition, +) MOCK_DASHBOARD_SERVICE = DashboardService( id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", @@ -238,10 +249,6 @@ def test_yield_dashboard_chart_filtered(self, ssrs_source): results = list(ssrs_source.yield_dashboard_chart(MOCK_REPORTS[0])) assert len(results) == 0 - def test_yield_dashboard_lineage_is_noop(self, ssrs_source): - result = ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0]) - assert result is None - def test_chart_source_state_populated(self, ssrs_source): """Verify register_record_chart populates chart_source_state after yield_dashboard_chart.""" ssrs_source.chart_source_state = set() @@ -448,10 +455,6 @@ def test_get_reports_raises_on_persistent_failure(self): """Ensure a failed page surfaces as SourceConnectionException rather than a silently truncated stream — otherwise mark-deleted would wipe dashboards whenever SSRS is slow or briefly down.""" - from metadata.ingestion.connections.test_connections import ( - SourceConnectionException, - ) - client = _build_mock_client() client._get = MagicMock(side_effect=requests.ReadTimeout("boom")) @@ -462,10 +465,6 @@ def test_get_reports_raises_mid_stream(self): """If page N succeeds but page N+1 fails, the generator must raise — yielding a partial set silently would cause mark_deleted to drop the rest of the catalog.""" - from metadata.ingestion.connections.test_connections import ( - SourceConnectionException, - ) - client = _build_mock_client() page1 = { "value": [ @@ -481,12 +480,324 @@ def test_get_reports_raises_mid_stream(self): next(reports_iter) def test_get_folders_raises_on_failure(self): - from metadata.ingestion.connections.test_connections import ( - SourceConnectionException, - ) - client = _build_mock_client() client._get = MagicMock(side_effect=requests.ConnectionError("no route")) with pytest.raises(SourceConnectionException): list(client.get_folders()) + + +RDL_SALES = SsrsReportDefinition( + data_sources=[ + SsrsDataSource( + name="SalesDS", + data_provider="SQL", + connect_string="Data Source=sql01;Initial Catalog=SalesDB", + server="sql01", + database="SalesDB", + ) + ], + data_sets=[ + SsrsDataSet( + name="SalesDataset", + data_source_name="SalesDS", + command_type="Text", + command_text="SELECT OrderId, CustomerName FROM dbo.Orders", + fields=[ + SsrsField(name="OrderId", data_field="OrderId"), + SsrsField(name="CustomerName", data_field="CustomerName"), + ], + ) + ], +) + +RDL_MULTI = SsrsReportDefinition( + data_sources=[ + SsrsDataSource( + name="FinanceDS", + data_provider="SQL", + connect_string="Server=fin;Database=FinanceDB", + server="fin", + database="FinanceDB", + ) + ], + data_sets=[ + SsrsDataSet( + name="Revenue", + data_source_name="FinanceDS", + command_type="Text", + command_text="SELECT MonthName, Amount FROM dbo.Revenue", + fields=[SsrsField(name="MonthName"), SsrsField(name="Amount")], + ), + SsrsDataSet( + name="Expenses", + data_source_name="FinanceDS", + command_type="Text", + command_text="SELECT Category, Amount FROM dbo.Expenses", + fields=[SsrsField(name="Category"), SsrsField(name="Amount")], + ), + ], +) + +RDL_EXPRESSION = SsrsReportDefinition( + data_sources=[SsrsDataSource(name="D", data_provider="SQL", database="DB")], + data_sets=[ + SsrsDataSet( + name="Dyn", + data_source_name="D", + command_type="Expression", + command_text='="SELECT * FROM " & Parameters!Tbl.Value', + ) + ], +) + +RDL_STORED_PROC = SsrsReportDefinition( + data_sources=[SsrsDataSource(name="D", data_provider="SQL", database="DB")], + data_sets=[ + SsrsDataSet( + name="Proc", + data_source_name="D", + command_type="StoredProcedure", + command_text="dbo.usp_GetThings", + ) + ], +) + +RDL_MDX = SsrsReportDefinition( + data_sources=[ + SsrsDataSource(name="Cube", data_provider="OLEDB-MD", database="OLAP") + ], + data_sets=[ + SsrsDataSet( + name="MDXQuery", + data_source_name="Cube", + command_type="Text", + command_text="SELECT [Measures].[X] ON 0 FROM [Cube]", + ) + ], +) + + +def _set_context(source, **kwargs): + for key, value in kwargs.items(): + source.context.get().__dict__[key] = value + + +class TestSsrsYieldDatamodel: + def _prepare(self, ssrs_source, rdl): + ssrs_source._report_definitions = {MOCK_REPORTS[0].id: rdl} + ssrs_source.source_config.includeDataModels = True + + def test_emits_one_per_dataset(self, ssrs_source): + self._prepare(ssrs_source, RDL_MULTI) + results = list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) + names = [str(r.right.name.root) for r in results] + assert names == [ + f"{MOCK_REPORTS[0].id}.Revenue", + f"{MOCK_REPORTS[0].id}.Expenses", + ] + assert all(r.right.dataModelType.value == "SsrsDataModel" for r in results) + + def test_single_dataset_attaches_sql_and_columns(self, ssrs_source): + self._prepare(ssrs_source, RDL_SALES) + results = list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) + assert len(results) == 1 + model = results[0].right + assert model.sql.root == "SELECT OrderId, CustomerName FROM dbo.Orders" + assert [c.name.root for c in model.columns] == ["OrderId", "CustomerName"] + + def test_sql_omitted_for_stored_procedure(self, ssrs_source): + self._prepare(ssrs_source, RDL_STORED_PROC) + results = list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) + assert results[0].right.sql is None + + def test_sql_omitted_for_expression(self, ssrs_source): + self._prepare(ssrs_source, RDL_EXPRESSION) + results = list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) + assert results[0].right.sql is None + + def test_skipped_when_include_data_models_false(self, ssrs_source): + ssrs_source._report_definitions = {MOCK_REPORTS[0].id: RDL_SALES} + ssrs_source.source_config.includeDataModels = False + assert list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) == [] + + def test_no_rdl_cached(self, ssrs_source): + ssrs_source._report_definitions = {} + ssrs_source.source_config.includeDataModels = True + assert list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) == [] + + +class TestSsrsLineage: + def _prepare(self, ssrs_source, rdl, *, include_data_models=True): + ssrs_source._report_definitions = {MOCK_REPORTS[0].id: rdl} + ssrs_source.source_config.includeDataModels = include_data_models + datamodel_entity = SimpleNamespace( + id=SimpleNamespace(root="dm-uuid"), fullyQualifiedName=None + ) + dashboard_entity = SimpleNamespace( + id=SimpleNamespace(root="dash-uuid"), fullyQualifiedName=None + ) + table_entity = SimpleNamespace( + id=SimpleNamespace(root="tbl-uuid"), fullyQualifiedName=None + ) + + def by_name(entity, fqn=None, **_): + if entity is DashboardDataModel: + return datamodel_entity + if entity is Dashboard: + return dashboard_entity + if entity is DatabaseService: + return None + return None + + ssrs_source.metadata = MagicMock() + ssrs_source.metadata.get_by_name = MagicMock(side_effect=by_name) + ssrs_source.metadata.search_in_any_service = MagicMock( + return_value=table_entity + ) + return datamodel_entity, dashboard_entity, table_entity + + def test_inline_datasource_yields_lineage(self, ssrs_source): + datamodel, _, table = self._prepare(ssrs_source, RDL_SALES) + lineage_calls = [] + + def fake_lineage(to_entity=None, from_entity=None, sql=None, **_): + lineage_calls.append({"to": to_entity, "from": from_entity, "sql": sql}) + return Either(right=SimpleNamespace(sql=sql)) + + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser, patch.object( + SsrsSource, "_get_add_lineage_request", staticmethod(fake_lineage) + ): + mock_parser.return_value.source_tables = ["dbo.Orders"] + results = list( + ssrs_source.yield_dashboard_lineage_details( + MOCK_REPORTS[0], db_service_prefix="my_mssql" + ) + ) + assert len(results) == 1 + assert lineage_calls[0]["sql"] == "SELECT OrderId, CustomerName FROM dbo.Orders" + assert lineage_calls[0]["to"] is datamodel + assert lineage_calls[0]["from"] is table + search_call = ssrs_source.metadata.search_in_any_service.call_args + assert "SalesDB" in search_call.kwargs["fqn_search_string"] + assert "dbo" in search_call.kwargs["fqn_search_string"] + assert "Orders" in search_call.kwargs["fqn_search_string"] + assert MOCK_REPORTS[0].id not in ssrs_source._report_definitions + + def test_skips_expression_command(self, ssrs_source): + self._prepare(ssrs_source, RDL_EXPRESSION) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser: + results = list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert results == [] + mock_parser.assert_not_called() + + def test_skips_stored_procedure(self, ssrs_source): + self._prepare(ssrs_source, RDL_STORED_PROC) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser: + results = list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert results == [] + mock_parser.assert_not_called() + + def test_skips_mdx_datasource(self, ssrs_source): + self._prepare(ssrs_source, RDL_MDX) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser: + results = list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert results == [] + mock_parser.assert_not_called() + + def test_parser_failure_for_one_dataset_does_not_block_others(self, ssrs_source): + self._prepare(ssrs_source, RDL_MULTI) + parser_expenses = MagicMock() + parser_expenses.source_tables = ["dbo.Expenses"] + captured = [] + + def fake_lineage(to_entity=None, from_entity=None, sql=None, **_): + captured.append(sql) + return Either(right=SimpleNamespace(sql=sql)) + + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser", + side_effect=[Exception("parse error"), parser_expenses], + ), patch.object( + SsrsSource, "_get_add_lineage_request", staticmethod(fake_lineage) + ): + results = list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert len(results) == 1 + assert captured == ["SELECT Category, Amount FROM dbo.Expenses"] + + def test_dialect_defaults_to_tsql(self, ssrs_source): + self._prepare(ssrs_source, RDL_SALES) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser: + mock_parser.return_value.source_tables = [] + list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert Dialect.TSQL in mock_parser.call_args.args + + def test_dialect_uses_data_provider_when_no_db_service(self, ssrs_source): + rdl = SsrsReportDefinition( + data_sources=[ + SsrsDataSource( + name="Oracle", + data_provider="ORACLE", + connect_string="Data Source=ora;Initial Catalog=ODB", + server="ora", + database="ODB", + ) + ], + data_sets=[ + SsrsDataSet( + name="Q", + data_source_name="Oracle", + command_type="Text", + command_text="SELECT * FROM ora_schema.things", + fields=[SsrsField(name="things")], + ) + ], + ) + self._prepare(ssrs_source, rdl) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser: + mock_parser.return_value.source_tables = [] + list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert Dialect.ORACLE in mock_parser.call_args.args + + def test_no_rdl_yields_nothing(self, ssrs_source): + ssrs_source._report_definitions = {} + ssrs_source.source_config.includeDataModels = True + assert list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) == [] + + def test_falls_back_to_dashboard_target_when_datamodels_disabled(self, ssrs_source): + _, dashboard_entity, _ = self._prepare( + ssrs_source, RDL_SALES, include_data_models=False + ) + captured = [] + + def fake_lineage(to_entity=None, from_entity=None, sql=None, **_): + captured.append(to_entity) + return Either(right=SimpleNamespace()) + + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser, patch.object( + SsrsSource, "_get_add_lineage_request", staticmethod(fake_lineage) + ): + mock_parser.return_value.source_tables = ["dbo.Orders"] + results = list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert len(results) == 1 + assert captured == [dashboard_entity] + entity_classes = { + call.kwargs.get("entity") + for call in ssrs_source.metadata.get_by_name.call_args_list + } + assert Dashboard in entity_classes + assert DashboardDataModel not in entity_classes diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py b/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py new file mode 100644 index 000000000000..8406ba31918a --- /dev/null +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py @@ -0,0 +1,143 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Unit tests for SSRS RDL parser +""" +from pathlib import Path + +import pytest + +from metadata.ingestion.source.dashboard.ssrs.rdl_parser import ( + parse_connect_string, + parse_rdl, +) + +FIXTURES = Path(__file__).parent / "fixtures" / "ssrs" + + +def _load(name: str) -> bytes: + return (FIXTURES / name).read_bytes() + + +class TestParseRdl: + def test_inline_single_dataset_2016(self): + result = parse_rdl(_load("inline_single_dataset_2016.rdl")) + assert len(result.data_sources) == 1 + ds = result.data_sources[0] + assert ds.name == "SalesDS" + assert ds.data_provider == "SQL" + assert ds.server == "sql01.example.com" + assert ds.database == "SalesDB" + assert ds.shared_reference is None + + assert len(result.data_sets) == 1 + dataset = result.data_sets[0] + assert dataset.name == "SalesDataset" + assert dataset.data_source_name == "SalesDS" + assert dataset.command_type == "Text" + assert "SELECT OrderId" in dataset.command_text + assert "@minTotal" in dataset.command_text + assert [f.name for f in dataset.fields] == [ + "OrderId", + "CustomerName", + "Total", + ] + + def test_inline_multi_dataset_2010(self): + result = parse_rdl(_load("inline_multi_dataset_2010.rdl")) + assert len(result.data_sources) == 1 + assert result.data_sources[0].server == "finance01" + assert result.data_sources[0].database == "FinanceDB" + + names = [d.name for d in result.data_sets] + assert names == ["Revenue", "Expenses"] + assert result.data_sets[0].command_text.startswith("SELECT MonthName, Amount") + assert result.data_sets[1].command_text.startswith("SELECT Category, Amount") + + def test_shared_datasource_reference(self): + result = parse_rdl(_load("shared_datasource.rdl")) + assert len(result.data_sources) == 1 + ds = result.data_sources[0] + assert ds.name == "SharedDS" + assert ds.shared_reference == "/Shared Data Sources/Warehouse" + assert ds.connect_string is None + assert ds.database is None + + def test_no_datasource(self): + result = parse_rdl(_load("no_datasource.rdl")) + assert result.data_sources == [] + assert result.data_sets == [] + + def test_expression_command_type(self): + result = parse_rdl(_load("expression_commandtype.rdl")) + dataset = result.data_sets[0] + assert dataset.command_type == "Expression" + + def test_malformed_raises_value_error(self): + with pytest.raises(ValueError): + parse_rdl(_load("malformed.rdl")) + + def test_empty_bytes_raises_value_error(self): + with pytest.raises(ValueError): + parse_rdl(b"") + + def test_namespace_2008_2010_2016_equivalence(self): + template = ( + '' + "" + '' + "" + "SQL" + "Data Source=s;Initial Catalog=d" + "" + "" + '' + "DS" + "Text" + "SELECT 1" + "" + ) + for ns in ( + "http://schemas.microsoft.com/sqlserver/reporting/2008/01/reportdefinition", + "http://schemas.microsoft.com/sqlserver/reporting/2010/01/reportdefinition", + "http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition", + ): + result = parse_rdl(template.format(ns=ns).encode("utf-8")) + assert result.data_sources[0].database == "d" + assert result.data_sets[0].command_text == "SELECT 1" + + +class TestParseConnectString: + @pytest.mark.parametrize( + "connect_string,expected_server,expected_db", + [ + ("Data Source=srv;Initial Catalog=db", "srv", "db"), + ("data source=srv;initial catalog=db", "srv", "db"), + ("Server=srv;Database=db", "srv", "db"), + ("Address=srv;Database=db", "srv", "db"), + ( + "Data Source=srv;Initial Catalog=db;Integrated Security=SSPI;", + "srv", + "db", + ), + ("Data Source=srv", "srv", None), + ("Initial Catalog=db", None, "db"), + ("", None, None), + (None, None, None), + ("Data Source=;Initial Catalog=db", None, "db"), + ("garbage;no;equals", None, None), + ], + ) + def test_variants(self, connect_string, expected_server, expected_db): + assert parse_connect_string(connect_string) == ( + expected_server, + expected_db, + ) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json index 489506e265e2..73ef958344e9 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json @@ -30,7 +30,8 @@ "SigmaDataModel", "PowerBIDataFlow", "MicroStrategyDataset", - "ThoughtSpotDataModel" + "ThoughtSpotDataModel", + "SsrsDataModel" ], "javaEnums": [ { @@ -74,6 +75,9 @@ }, { "name": "ThoughtSpotDataModel" + }, + { + "name": "SsrsDataModel" } ] } From 721b2c6b9169b87b1f3c1d27beb66af17c6ddfde Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 23 Apr 2026 06:49:56 +0000 Subject: [PATCH 2/6] Update generated TypeScript types --- .../ui/src/generated/api/data/createDashboardDataModel.ts | 1 + .../resources/ui/src/generated/entity/data/dashboardDataModel.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createDashboardDataModel.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createDashboardDataModel.ts index 84943d303147..6d80cfd7bcff 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createDashboardDataModel.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createDashboardDataModel.ts @@ -757,6 +757,7 @@ export enum DataModelType { QlikDataModel = "QlikDataModel", QuickSightDataModel = "QuickSightDataModel", SigmaDataModel = "SigmaDataModel", + SsrsDataModel = "SsrsDataModel", SupersetDataModel = "SupersetDataModel", TableauDataModel = "TableauDataModel", TableauEmbeddedDatasource = "TableauEmbeddedDatasource", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/dashboardDataModel.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/dashboardDataModel.ts index 2199a6441c48..500b38438472 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/dashboardDataModel.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/dashboardDataModel.ts @@ -901,6 +901,7 @@ export enum DataModelType { QlikDataModel = "QlikDataModel", QuickSightDataModel = "QuickSightDataModel", SigmaDataModel = "SigmaDataModel", + SsrsDataModel = "SsrsDataModel", SupersetDataModel = "SupersetDataModel", TableauDataModel = "TableauDataModel", TableauEmbeddedDatasource = "TableauEmbeddedDatasource", From 2a039e42872a93cae14294b45f896bd045009be9 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 23 Apr 2026 00:06:39 -0700 Subject: [PATCH 3/6] Add ownership extraction --- .../ingestion/source/dashboard/ssrs/client.py | 45 ++++- .../source/dashboard/ssrs/metadata.py | 86 ++++++++- .../ingestion/source/dashboard/ssrs/models.py | 1 + .../source/dashboard/ssrs/rdl_parser.py | 7 +- .../unit/topology/dashboard/test_ssrs.py | 173 ++++++++++++++++++ .../dashboard/test_ssrs_rdl_parser.py | 14 ++ 6 files changed, 310 insertions(+), 16 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py index 2fd6de665717..7c6ee667fa75 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py @@ -43,10 +43,11 @@ MAX_RETRIES = 2 BACKOFF_FACTOR = 1 RETRY_STATUS_CODES = (500, 502, 503, 504) -REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources" +REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources,CreatedBy" FOLDER_SELECT_FIELDS = "Id,Name,Path" RDL_CONTENT_PATHS = ("/Reports({id})/Content/$value", "/CatalogItems({id})/Content") -RDL_NOT_FOUND_STATUS = {404, 400} +RDL_NOT_FOUND_STATUS = {404} +MAX_RDL_BYTES = 50 * 1024 * 1024 class SsrsClient: @@ -177,23 +178,57 @@ def _fetch_report_content(self, path: str) -> Optional[bytes]: if not resp.ok: logger.warning("RDL fetch returned HTTP %s for %s", resp.status_code, path) return None + if _exceeds_size_limit(resp, path): + return None return _decode_rdl_response(resp, path) +def _exceeds_size_limit(resp: requests.Response, path: str) -> bool: + length = resp.headers.get("Content-Length") + if length is None: + return False + try: + length_int = int(length) + except ValueError: + return False + if length_int > MAX_RDL_BYTES: + logger.warning( + "RDL at %s exceeds size limit (%s bytes > %s); skipping to avoid OOM", + path, + length_int, + MAX_RDL_BYTES, + ) + return True + return False + + def _decode_rdl_response(resp: requests.Response, path: str) -> Optional[bytes]: content_type = (resp.headers.get("Content-Type") or "").lower() if "json" not in content_type: - return resp.content or None + return _truncate_to_limit(resp.content, path) if resp.content else None try: payload = resp.json() except ValueError: - return resp.content or None + return _truncate_to_limit(resp.content, path) if resp.content else None value = payload.get("Value") if isinstance(payload, dict) else None if not value: logger.warning("RDL JSON response missing 'Value' field at %s", path) return None try: - return base64.b64decode(value) + decoded = base64.b64decode(value, validate=True) except (binascii.Error, ValueError) as exc: logger.warning("Malformed base64 in RDL response at %s: %s", path, exc) return None + return _truncate_to_limit(decoded, path) + + +def _truncate_to_limit(body: bytes, path: str) -> Optional[bytes]: + if len(body) > MAX_RDL_BYTES: + logger.warning( + "RDL at %s exceeds size limit (%s bytes > %s); skipping to avoid OOM", + path, + len(body), + MAX_RDL_BYTES, + ) + return None + return body diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py index 8990391cfdfc..225f27883667 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py @@ -48,6 +48,7 @@ SourceUrl, SqlQuery, ) +from metadata.generated.schema.type.entityReferenceList import EntityReferenceList from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect @@ -83,6 +84,17 @@ "REDSHIFT": Dialect.REDSHIFT, "BIGQUERY": Dialect.BIGQUERY, "TERADATA": Dialect.TERADATA, + "HIVE": Dialect.HIVE, + "CLICKHOUSE": Dialect.CLICKHOUSE, + "DATABRICKS": Dialect.DATABRICKS, + "VERTICA": Dialect.VERTICA, + "TRINO": Dialect.TRINO, + "SPARK": Dialect.SPARKSQL, + "SPARKSQL": Dialect.SPARKSQL, + "ATHENA": Dialect.ATHENA, + "IMPALA": Dialect.IMPALA, + "MARIADB": Dialect.MARIADB, + "SQLITE": Dialect.SQLITE, } @@ -132,19 +144,27 @@ def prepare(self): def get_dashboards_list(self) -> Iterable[SsrsReport]: for report in self.client.get_reports(): - if not report.hidden: - yield report + if report.hidden: + self.status.filter(report.name, "Hidden report") + continue + yield report def get_dashboard_name(self, dashboard: SsrsReport) -> str: return dashboard.name def get_dashboard_details(self, dashboard: SsrsReport) -> Optional[SsrsReport]: - self._load_report_definition(dashboard) return dashboard - def _load_report_definition(self, dashboard: SsrsReport) -> None: + def _get_report_definition( + self, dashboard: SsrsReport + ) -> Optional[SsrsReportDefinition]: + """Fetch and cache RDL lazily. Returns ``None`` when the report has no + sources or the RDL cannot be fetched/parsed.""" + cached = self._report_definitions.get(dashboard.id) + if cached is not None: + return cached if dashboard.has_data_sources is False: - return + return None try: rdl_bytes = self.client.get_report_definition(dashboard.id) except Exception as exc: @@ -152,16 +172,19 @@ def _load_report_definition(self, dashboard: SsrsReport) -> None: logger.warning( "Could not fetch RDL for report [%s]: %s", dashboard.name, exc ) - return + return None if not rdl_bytes: - return + return None try: - self._report_definitions[dashboard.id] = parse_rdl(rdl_bytes) + parsed = parse_rdl(rdl_bytes) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( "Could not parse RDL for report [%s]: %s", dashboard.name, exc ) + return None + self._report_definitions[dashboard.id] = parsed + return parsed def get_project_name(self, dashboard_details: Any) -> Optional[str]: try: @@ -174,6 +197,48 @@ def get_project_name(self, dashboard_details: Any) -> Optional[str]: logger.warning("Error fetching project name: %s", exc) return None + def get_owner_ref( + self, dashboard_details: SsrsReport + ) -> Optional[EntityReferenceList]: + """Resolve the report's ``CreatedBy`` (``DOMAIN\\user``) to an OpenMetadata user. + + Defensive: missing owner, unknown user, or lookup failure are all logged and + produce ``None`` so the rest of the dashboard ingestion continues.""" + try: + if not self.source_config.includeOwners: + return None + owner_name = self._normalize_owner(dashboard_details.created_by) + if not owner_name: + return None + owner_ref = self.metadata.get_reference_by_name( + name=owner_name, is_owner=True + ) + if owner_ref is None: + logger.debug( + "Owner [%s] for report [%s] not found in OpenMetadata; " + "continuing without ownership", + owner_name, + dashboard_details.name, + ) + return owner_ref + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + "Could not resolve owner for report [%s]: %s; " + "continuing without ownership", + dashboard_details.name, + exc, + ) + return None + + @staticmethod + def _normalize_owner(raw: Optional[str]) -> Optional[str]: + if not raw: + return None + _, sep, user = raw.rpartition("\\") + candidate = user if sep else raw + return candidate.strip() or None + def yield_dashboard( self, dashboard_details: SsrsReport ) -> Iterable[Either[CreateDashboardRequest]]: @@ -204,6 +269,7 @@ def yield_dashboard( ], project=self.context.get().project_name, service=self.context.get().dashboard_service, + owners=self.get_owner_ref(dashboard_details=dashboard_details), ) yield Either(right=dashboard_request) self.register_record(dashboard_request=dashboard_request) @@ -256,7 +322,7 @@ def yield_datamodel( ) -> Iterable[Either[CreateDashboardDataModelRequest]]: if not self.source_config.includeDataModels: return - rdl = self._report_definitions.get(dashboard_details.id) + rdl = self._get_report_definition(dashboard_details) if not rdl: return for dataset in rdl.data_sets: @@ -330,7 +396,7 @@ def yield_dashboard_lineage_details( dashboard_details: SsrsReport, db_service_prefix: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: - rdl = self._report_definitions.get(dashboard_details.id) + rdl = self._get_report_definition(dashboard_details) if not rdl: return diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/models.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/models.py index ff16407791a9..a9855d38f85a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/models.py @@ -26,6 +26,7 @@ class SsrsReport(BaseModel): type: Optional[str] = Field(None, alias="Type") hidden: bool = Field(False, alias="Hidden") has_data_sources: Optional[bool] = Field(None, alias="HasDataSources") + created_by: Optional[str] = Field(None, alias="CreatedBy") class SsrsFolder(BaseModel): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py index 461b051508a4..c6b05e1b65f4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py @@ -20,6 +20,7 @@ SERVER_KEYS = {"data source", "server", "address", "addr", "network address"} DATABASE_KEYS = {"initial catalog", "database"} +FORBIDDEN_XML_TOKENS = (b" SsrsReportDefinition: - """Parse RDL XML into a structured definition. Raises ``ValueError`` on malformed XML.""" + """Parse RDL XML into a structured definition. Raises ``ValueError`` on malformed + XML or when the document contains a DTD / entity declaration (guard against + billion-laughs expansion since stdlib ElementTree honors internal entities).""" if not rdl_bytes: raise ValueError("Empty RDL content") + if any(token in rdl_bytes for token in FORBIDDEN_XML_TOKENS): + raise ValueError("RDL contains a DTD or entity declaration; refusing to parse") try: root = ET.fromstring(rdl_bytes) except ET.ParseError as exc: diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs.py b/ingestion/tests/unit/topology/dashboard/test_ssrs.py index 698ea0ded8b0..5271d0e40fb6 100644 --- a/ingestion/tests/unit/topology/dashboard/test_ssrs.py +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs.py @@ -196,6 +196,14 @@ def test_dashboards_list_filters_hidden(self, ssrs_source): assert len(result) == 3 assert all(not r.hidden for r in result) + def test_hidden_reports_recorded_in_status(self, ssrs_source): + ssrs_source.client.get_reports = lambda: iter(MOCK_REPORTS_WITH_HIDDEN) + ssrs_source.status = MagicMock() + list(ssrs_source.get_dashboards_list()) + ssrs_source.status.filter.assert_called_once_with( + "Hidden Report", "Hidden report" + ) + def test_project_name(self, ssrs_source): assert ssrs_source.get_project_name(MOCK_REPORTS[0]) == "Finance" assert ssrs_source.get_project_name(MOCK_REPORTS[1]) == "Operations" @@ -257,6 +265,138 @@ def test_chart_source_state_populated(self, ssrs_source): assert any("mock_ssrs" in fqn for fqn in ssrs_source.chart_source_state) +class TestSsrsOwnership: + def test_get_owner_ref_strips_domain_and_looks_up_user(self, ssrs_source): + report = SsrsReport( + Id="r-owner-1", + Name="Owned Report", + Path="/Finance/Owned", + CreatedBy="CONTOSO\\alice", + ) + ssrs_source.source_config.includeOwners = True + sentinel = object() + ssrs_source.metadata = MagicMock() + ssrs_source.metadata.get_reference_by_name = MagicMock(return_value=sentinel) + + result = ssrs_source.get_owner_ref(report) + + assert result is sentinel + ssrs_source.metadata.get_reference_by_name.assert_called_once_with( + name="alice", is_owner=True + ) + + def test_get_owner_ref_handles_plain_username(self, ssrs_source): + report = SsrsReport( + Id="r-owner-2", + Name="Plain Owner", + Path="/Ops/Plain", + CreatedBy="bob", + ) + ssrs_source.source_config.includeOwners = True + ssrs_source.metadata = MagicMock() + ssrs_source.metadata.get_reference_by_name = MagicMock(return_value=None) + + ssrs_source.get_owner_ref(report) + ssrs_source.metadata.get_reference_by_name.assert_called_once_with( + name="bob", is_owner=True + ) + + def test_get_owner_ref_skipped_when_include_owners_false(self, ssrs_source): + report = SsrsReport( + Id="r-owner-3", + Name="Skip Owner", + Path="/Ops/Skip", + CreatedBy="CONTOSO\\carol", + ) + ssrs_source.source_config.includeOwners = False + ssrs_source.metadata = MagicMock() + + assert ssrs_source.get_owner_ref(report) is None + ssrs_source.metadata.get_reference_by_name.assert_not_called() + + def test_get_owner_ref_returns_none_when_created_by_missing(self, ssrs_source): + report = SsrsReport( + Id="r-owner-4", + Name="No Owner", + Path="/Ops/None", + ) + ssrs_source.source_config.includeOwners = True + ssrs_source.metadata = MagicMock() + + assert ssrs_source.get_owner_ref(report) is None + ssrs_source.metadata.get_reference_by_name.assert_not_called() + + def test_get_owner_ref_swallows_lookup_exceptions(self, ssrs_source): + report = SsrsReport( + Id="r-owner-5", + Name="Boom", + Path="/Ops/Boom", + CreatedBy="CONTOSO\\dan", + ) + ssrs_source.source_config.includeOwners = True + ssrs_source.metadata = MagicMock() + ssrs_source.metadata.get_reference_by_name = MagicMock( + side_effect=Exception("lookup failed") + ) + + assert ssrs_source.get_owner_ref(report) is None + + def test_yield_dashboard_continues_when_owner_not_found(self, ssrs_source): + report = SsrsReport( + Id="r-owner-missing", + Name="Unknown Owner", + Path="/Finance/Unknown Owner", + CreatedBy="CONTOSO\\ghost", + ) + ssrs_source.source_config.includeOwners = True + ssrs_source.metadata = MagicMock() + ssrs_source.metadata.get_reference_by_name = MagicMock(return_value=None) + ssrs_source.context.get().__dict__["project_name"] = "Finance" + + results = list(ssrs_source.yield_dashboard(report)) + + assert len(results) == 1 + assert isinstance(results[0].right, CreateDashboardRequest) + assert results[0].right.owners is None + assert str(results[0].right.name.root) == "r-owner-missing" + + @pytest.mark.parametrize( + "raw,expected", + [ + ("CONTOSO\\alice", "alice"), + ("alice", "alice"), + ("\\alice", "alice"), + ("CONTOSO\\", None), + ("", None), + (None, None), + (" ", None), + (" bob ", "bob"), + ], + ) + def test_normalize_owner_variants(self, raw, expected): + assert SsrsSource._normalize_owner(raw) == expected + + def test_yield_dashboard_continues_when_owner_lookup_raises(self, ssrs_source): + report = SsrsReport( + Id="r-owner-raises", + Name="Raises Owner", + Path="/Finance/Raises", + CreatedBy="CONTOSO\\eve", + ) + ssrs_source.source_config.includeOwners = True + ssrs_source.metadata = MagicMock() + ssrs_source.metadata.get_reference_by_name = MagicMock( + side_effect=Exception("OM lookup failed") + ) + ssrs_source.context.get().__dict__["project_name"] = "Finance" + + results = list(ssrs_source.yield_dashboard(report)) + + assert len(results) == 1 + assert isinstance(results[0].right, CreateDashboardRequest) + assert results[0].right.owners is None + + class TestSsrsModels: def test_ssrs_report_parsing(self): data = { @@ -304,6 +444,16 @@ def test_ssrs_report_hidden(self): report = SsrsReport(**data) assert report.hidden is True + def test_ssrs_report_created_by_alias(self): + data = { + "Id": "abc-999", + "Name": "Owned Report", + "Path": "/Reports/Owned", + "CreatedBy": "CONTOSO\\alice", + } + report = SsrsReport(**data) + assert report.created_by == "CONTOSO\\alice" + def _build_mock_client(): """Return a MagicMock with the real ``get_reports``/``get_folders``/``_paginate`` @@ -704,6 +854,29 @@ def test_skips_stored_procedure(self, ssrs_source): assert results == [] mock_parser.assert_not_called() + def test_skips_shared_dataset_reference(self, ssrs_source): + rdl = SsrsReportDefinition( + data_sources=[ + SsrsDataSource(name="Shared", shared_reference="/Shared/Src") + ], + data_sets=[ + SsrsDataSet( + name="SharedDS", + data_source_name="Shared", + command_type="Text", + command_text="SELECT * FROM dbo.X", + shared_reference="/Shared DataSets/Orders", + ) + ], + ) + self._prepare(ssrs_source, rdl) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser: + results = list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) + assert results == [] + mock_parser.assert_not_called() + def test_skips_mdx_datasource(self, ssrs_source): self._prepare(ssrs_source, RDL_MDX) with patch( diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py b/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py index 8406ba31918a..42de31312451 100644 --- a/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py @@ -89,6 +89,20 @@ def test_empty_bytes_raises_value_error(self): with pytest.raises(ValueError): parse_rdl(b"") + def test_doctype_is_rejected(self): + payload = ( + b'' + b']>' + b"" + ) + with pytest.raises(ValueError, match="DTD or entity"): + parse_rdl(payload) + + def test_entity_is_rejected(self): + payload = b'' + with pytest.raises(ValueError, match="DTD or entity"): + parse_rdl(payload) + def test_namespace_2008_2010_2016_equivalence(self): template = ( '' From 4841b19c930d8f0e7d0a7ee2fd24e37a738ca176 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 23 Apr 2026 00:08:03 -0700 Subject: [PATCH 4/6] remove claude file --- .claude/scheduled_tasks.lock | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .claude/scheduled_tasks.lock diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock deleted file mode 100644 index ce2f15798e50..000000000000 --- a/.claude/scheduled_tasks.lock +++ /dev/null @@ -1 +0,0 @@ -{"sessionId":"7ec70b3a-5b8b-4854-b2f0-9dc0e9773078","pid":30468,"acquiredAt":1776386757898} \ No newline at end of file From 2022446da21eac2443f30cf30860abe6d33425df Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 23 Apr 2026 00:13:31 -0700 Subject: [PATCH 5/6] Address comments --- .gitignore | 2 + .../source/dashboard/ssrs/metadata.py | 62 ++++++++-------- .../unit/topology/dashboard/test_ssrs.py | 71 +++++++++++++++++++ 3 files changed, 106 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index da34d1d0262b..563f4a8741d9 100644 --- a/.gitignore +++ b/.gitignore @@ -197,5 +197,7 @@ ingestion/.claude/agents # Connector audit working files — per-session, never committed .claude/audit-results/ .claude/connector-audit.json +.claude/scheduled_tasks.lock +.claude/plans/ test-results/ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py index 225f27883667..b46f20f7da37 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py @@ -104,7 +104,6 @@ class _LineageContext: db_service_entity: Optional[DatabaseService] prefix_database: Optional[str] prefix_schema: Optional[str] - prefix_table: Optional[str] dialect: Dialect @@ -391,6 +390,15 @@ def _build_datamodel_columns(dataset: SsrsDataSet) -> List[Column]: ) return columns + def yield_dashboard_lineage(self, dashboard_details: SsrsReport): + """Base class loops over ``db_service_prefixes`` and calls + ``yield_dashboard_lineage_details`` once per prefix. We evict the + cached RDL once at the end so every prefix sees the same parsed RDL.""" + try: + yield from super().yield_dashboard_lineage(dashboard_details) + finally: + self._report_definitions.pop(dashboard_details.id, None) + def yield_dashboard_lineage_details( self, dashboard_details: SsrsReport, @@ -404,40 +412,36 @@ def yield_dashboard_lineage_details( db_service_name, prefix_database, prefix_schema, - prefix_table, + _, ) = self.parse_db_service_prefix(db_service_prefix) db_service_entity = self._resolve_db_service(db_service_name) datasource_index = {ds.name: ds for ds in rdl.data_sources} - try: - for dataset in rdl.data_sets: - datasource = datasource_index.get(dataset.data_source_name or "") - context = _LineageContext( - db_service_name=db_service_name, - db_service_entity=db_service_entity, - prefix_database=prefix_database, - prefix_schema=prefix_schema, - prefix_table=prefix_table, - dialect=self._resolve_dialect(db_service_entity, datasource), + for dataset in rdl.data_sets: + datasource = datasource_index.get(dataset.data_source_name or "") + context = _LineageContext( + db_service_name=db_service_name, + db_service_entity=db_service_entity, + prefix_database=prefix_database, + prefix_schema=prefix_schema, + dialect=self._resolve_dialect(db_service_entity, datasource), + ) + try: + yield from self._yield_dataset_lineage( + dashboard_details, dataset, datasource, context ) - try: - yield from self._yield_dataset_lineage( - dashboard_details, dataset, datasource, context - ) - except Exception as exc: - yield Either( - left=StackTraceError( - name=f"{dashboard_details.name}.{dataset.name}", - error=( - f"Error yielding lineage for dataset [{dataset.name}] " - f"in report [{dashboard_details.name}]: {exc}" - ), - stackTrace=traceback.format_exc(), - ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=f"{dashboard_details.name}.{dataset.name}", + error=( + f"Error yielding lineage for dataset [{dataset.name}] " + f"in report [{dashboard_details.name}]: {exc}" + ), + stackTrace=traceback.format_exc(), ) - finally: - self._report_definitions.pop(dashboard_details.id, None) + ) def _yield_dataset_lineage( self, @@ -567,7 +571,7 @@ def _yield_table_to_target_lineage( default_database: Optional[str], ) -> Iterable[Either[AddLineageRequest]]: split = fqn.split_table_name(source_table) - table_name = context.prefix_table or split.get("table") + table_name = split.get("table") if not table_name: return database_name = ( diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs.py b/ingestion/tests/unit/topology/dashboard/test_ssrs.py index 5271d0e40fb6..8a6e260f9a99 100644 --- a/ingestion/tests/unit/topology/dashboard/test_ssrs.py +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs.py @@ -834,6 +834,44 @@ def fake_lineage(to_entity=None, from_entity=None, sql=None, **_): assert "SalesDB" in search_call.kwargs["fqn_search_string"] assert "dbo" in search_call.kwargs["fqn_search_string"] assert "Orders" in search_call.kwargs["fqn_search_string"] + + def test_multiple_prefixes_all_produce_lineage(self, ssrs_source): + """Regression: base class calls yield_dashboard_lineage_details once per + db_service_prefix. Evicting the RDL inside that method dropped lineage + for every prefix after the first.""" + self._prepare(ssrs_source, RDL_SALES) + captured_services = [] + + def record(*, fqn_search_string, **_): + captured_services.append(fqn_search_string.split(".", 1)[0]) + return SimpleNamespace(id=SimpleNamespace(root="t")) + + ssrs_source.metadata.search_in_any_service = MagicMock(side_effect=record) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser" + ) as mock_parser, patch.object( + SsrsSource, + "_get_add_lineage_request", + staticmethod(lambda **_: Either(right=SimpleNamespace())), + ): + mock_parser.return_value.source_tables = ["dbo.Orders"] + for prefix in ("service_a", "service_b", "service_c"): + list( + ssrs_source.yield_dashboard_lineage_details( + MOCK_REPORTS[0], db_service_prefix=prefix + ) + ) + assert captured_services == ["service_a", "service_b", "service_c"] + assert MOCK_REPORTS[0].id in ssrs_source._report_definitions + + def test_yield_dashboard_lineage_evicts_rdl_after_all_prefixes(self, ssrs_source): + """Eviction moved from yield_dashboard_lineage_details to the outer + yield_dashboard_lineage so it fires once per dashboard, not per prefix.""" + self._prepare(ssrs_source, RDL_SALES) + ssrs_source.context.get().__dict__["dashboard"] = MOCK_REPORTS[0].id + ssrs_source.context.get().__dict__["dataModels"] = [] + assert MOCK_REPORTS[0].id in ssrs_source._report_definitions + list(ssrs_source.yield_dashboard_lineage(MOCK_REPORTS[0])) assert MOCK_REPORTS[0].id not in ssrs_source._report_definitions def test_skips_expression_command(self, ssrs_source): @@ -915,6 +953,39 @@ def test_dialect_defaults_to_tsql(self, ssrs_source): list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) assert Dialect.TSQL in mock_parser.call_args.args + def test_four_part_prefix_does_not_collapse_lineage(self, ssrs_source): + """A dbServicePrefix with 4 dot-parts used to overwrite every source + table with its last segment, collapsing all lineage to one target.""" + self._prepare(ssrs_source, RDL_MULTI) + parser_revenue = MagicMock() + parser_revenue.source_tables = ["dbo.Revenue"] + parser_expenses = MagicMock() + parser_expenses.source_tables = ["dbo.Expenses"] + captured_tables = [] + + def record(*, fqn_search_string, **_): + captured_tables.append(fqn_search_string) + return SimpleNamespace(id=SimpleNamespace(root="tbl")) + + ssrs_source.metadata.search_in_any_service = MagicMock(side_effect=record) + with patch( + "metadata.ingestion.source.dashboard.ssrs.metadata.LineageParser", + side_effect=[parser_revenue, parser_expenses], + ), patch.object( + SsrsSource, + "_get_add_lineage_request", + staticmethod(lambda **_: Either(right=SimpleNamespace())), + ): + list( + ssrs_source.yield_dashboard_lineage_details( + MOCK_REPORTS[0], + db_service_prefix="my_mssql.FinanceDB.dbo.OVERRIDE_TABLE", + ) + ) + assert any("Revenue" in q for q in captured_tables) + assert any("Expenses" in q for q in captured_tables) + assert not any("OVERRIDE_TABLE" in q for q in captured_tables) + def test_dialect_uses_data_provider_when_no_db_service(self, ssrs_source): rdl = SsrsReportDefinition( data_sources=[ From 9314497747a0c25ced305675ad4abe1d0f9f6b8e Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 23 Apr 2026 00:30:33 -0700 Subject: [PATCH 6/6] address comments --- .../ingestion/source/dashboard/ssrs/client.py | 78 ++++++--- .../source/dashboard/ssrs/metadata.py | 43 ++--- .../source/dashboard/ssrs/rdl_parser.py | 6 +- .../unit/topology/dashboard/test_ssrs.py | 164 ++++++++++++++++-- .../dashboard/test_ssrs_rdl_parser.py | 28 +++ 5 files changed, 251 insertions(+), 68 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py index 7c6ee667fa75..e8ff5097d92f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py @@ -13,6 +13,7 @@ """ import base64 import binascii +import json from typing import Iterable, Iterator, Optional, Union import requests @@ -147,16 +148,17 @@ def get_report_definition(self, report_id: str) -> Optional[bytes]: """Return the RDL XML bytes for a report, or ``None`` if unavailable. Tries ``/Reports({id})/Content/$value`` first, then ``/CatalogItems({id})/Content``. - Not-found responses (404/400) trigger fallback silently; transport errors - propagate so operators see outages instead of empty catalogs.""" + Only 404 triggers silent fallback; permission errors (401/403), server errors + (5xx after retries), and transport errors raise ``SourceConnectionException`` so + operators see outages instead of silently deleted entities.""" last_err: Optional[Exception] = None for template in RDL_CONTENT_PATHS: path = template.format(id=report_id) try: body = self._fetch_report_content(path) - except requests.RequestException as exc: + except (requests.RequestException, SourceConnectionException) as exc: last_err = exc - logger.warning("RDL fetch transport error for %s: %s", path, exc) + logger.warning("RDL fetch failed for %s: %s", path, exc) continue if body is not None: return body @@ -168,19 +170,45 @@ def get_report_definition(self, report_id: str) -> Optional[bytes]: def _fetch_report_content(self, path: str) -> Optional[bytes]: url = f"{self.base_url}{path}" - resp = self.session.get( + with self.session.get( url, timeout=(CONNECT_TIMEOUT, RDL_READ_TIMEOUT), headers={"Accept": "application/xml,application/octet-stream"}, - ) - if resp.status_code in RDL_NOT_FOUND_STATUS: - return None - if not resp.ok: - logger.warning("RDL fetch returned HTTP %s for %s", resp.status_code, path) - return None - if _exceeds_size_limit(resp, path): + stream=True, + ) as resp: + if resp.status_code in RDL_NOT_FOUND_STATUS: + return None + if not resp.ok: + raise SourceConnectionException( + f"RDL fetch returned HTTP {resp.status_code} for {path}" + ) + if _exceeds_size_limit(resp, path): + return None + body = _read_bounded_body(resp, path) + if body is None: + return None + return _decode_rdl_body( + body, + (resp.headers.get("Content-Type") or "").lower(), + path, + ) + + +def _read_bounded_body(resp: requests.Response, path: str) -> Optional[bytes]: + """Stream response body into memory, aborting if it exceeds ``MAX_RDL_BYTES``.""" + buffer = bytearray() + for chunk in resp.iter_content(chunk_size=65536): + if not chunk: + continue + if len(buffer) + len(chunk) > MAX_RDL_BYTES: + logger.warning( + "RDL at %s exceeds size limit (>%s bytes); aborting download", + path, + MAX_RDL_BYTES, + ) return None - return _decode_rdl_response(resp, path) + buffer.extend(chunk) + return bytes(buffer) def _exceeds_size_limit(resp: requests.Response, path: str) -> bool: @@ -202,14 +230,16 @@ def _exceeds_size_limit(resp: requests.Response, path: str) -> bool: return False -def _decode_rdl_response(resp: requests.Response, path: str) -> Optional[bytes]: - content_type = (resp.headers.get("Content-Type") or "").lower() +def _decode_rdl_body(body: bytes, content_type: str, path: str) -> Optional[bytes]: + """Decode an already-read response body. If JSON-wrapped base64, unwrap it.""" + if not body: + return None if "json" not in content_type: - return _truncate_to_limit(resp.content, path) if resp.content else None + return body try: - payload = resp.json() + payload = json.loads(body) except ValueError: - return _truncate_to_limit(resp.content, path) if resp.content else None + return body value = payload.get("Value") if isinstance(payload, dict) else None if not value: logger.warning("RDL JSON response missing 'Value' field at %s", path) @@ -219,16 +249,12 @@ def _decode_rdl_response(resp: requests.Response, path: str) -> Optional[bytes]: except (binascii.Error, ValueError) as exc: logger.warning("Malformed base64 in RDL response at %s: %s", path, exc) return None - return _truncate_to_limit(decoded, path) - - -def _truncate_to_limit(body: bytes, path: str) -> Optional[bytes]: - if len(body) > MAX_RDL_BYTES: + if len(decoded) > MAX_RDL_BYTES: logger.warning( - "RDL at %s exceeds size limit (%s bytes > %s); skipping to avoid OOM", + "RDL at %s exceeds size limit after base64 decode (%s > %s)", path, - len(body), + len(decoded), MAX_RDL_BYTES, ) return None - return body + return decoded diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py index b46f20f7da37..7a94a840693f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py @@ -13,7 +13,7 @@ """ import traceback from dataclasses import dataclass -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest @@ -133,7 +133,7 @@ def __init__( ): super().__init__(config, metadata) self.folder_path_map: Dict[str, str] = {} - self._report_definitions: Dict[str, SsrsReportDefinition] = {} + self._current_rdl: Optional[Tuple[str, SsrsReportDefinition]] = None def prepare(self): self.folder_path_map = { @@ -157,32 +157,32 @@ def get_dashboard_details(self, dashboard: SsrsReport) -> Optional[SsrsReport]: def _get_report_definition( self, dashboard: SsrsReport ) -> Optional[SsrsReportDefinition]: - """Fetch and cache RDL lazily. Returns ``None`` when the report has no - sources or the RDL cannot be fetched/parsed.""" - cached = self._report_definitions.get(dashboard.id) - if cached is not None: - return cached + """Fetch and cache the RDL for the dashboard currently being processed. + + Uses a single-entry cache keyed by report id so memory is bounded at + O(1) across the ingestion run — the previous report's RDL is released + the moment a new report is requested. + + ``SourceConnectionException`` propagates so that mark-deleted flows do + not drop entities during a transient SSRS outage. ``ValueError`` from a + malformed RDL is treated as a per-report problem and skipped.""" + if self._current_rdl and self._current_rdl[0] == dashboard.id: + return self._current_rdl[1] + self._current_rdl = None if dashboard.has_data_sources is False: return None - try: - rdl_bytes = self.client.get_report_definition(dashboard.id) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - "Could not fetch RDL for report [%s]: %s", dashboard.name, exc - ) - return None + rdl_bytes = self.client.get_report_definition(dashboard.id) if not rdl_bytes: return None try: parsed = parse_rdl(rdl_bytes) - except Exception as exc: + except ValueError as exc: logger.debug(traceback.format_exc()) logger.warning( "Could not parse RDL for report [%s]: %s", dashboard.name, exc ) return None - self._report_definitions[dashboard.id] = parsed + self._current_rdl = (dashboard.id, parsed) return parsed def get_project_name(self, dashboard_details: Any) -> Optional[str]: @@ -390,15 +390,6 @@ def _build_datamodel_columns(dataset: SsrsDataSet) -> List[Column]: ) return columns - def yield_dashboard_lineage(self, dashboard_details: SsrsReport): - """Base class loops over ``db_service_prefixes`` and calls - ``yield_dashboard_lineage_details`` once per prefix. We evict the - cached RDL once at the end so every prefix sees the same parsed RDL.""" - try: - yield from super().yield_dashboard_lineage(dashboard_details) - finally: - self._report_definitions.pop(dashboard_details.id, None) - def yield_dashboard_lineage_details( self, dashboard_details: SsrsReport, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py index c6b05e1b65f4..b007f20f4ca4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/ssrs/rdl_parser.py @@ -20,7 +20,7 @@ SERVER_KEYS = {"data source", "server", "address", "addr", "network address"} DATABASE_KEYS = {"initial catalog", "database"} -FORBIDDEN_XML_TOKENS = (b" SsrsReportDefinition: billion-laughs expansion since stdlib ElementTree honors internal entities).""" if not rdl_bytes: raise ValueError("Empty RDL content") - if any(token in rdl_bytes for token in FORBIDDEN_XML_TOKENS): + lowered = rdl_bytes.lower() + if any(token in lowered for token in FORBIDDEN_XML_TOKENS): raise ValueError("RDL contains a DTD or entity declaration; refusing to parse") + del lowered try: root = ET.fromstring(rdl_bytes) except ET.ParseError as exc: diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs.py b/ingestion/tests/unit/topology/dashboard/test_ssrs.py index 8a6e260f9a99..dbc405bf058a 100644 --- a/ingestion/tests/unit/topology/dashboard/test_ssrs.py +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs.py @@ -466,6 +466,115 @@ def _build_mock_client(): return client +class _StreamResponseStub: + """Minimal context-manager stand-in for ``requests.Response`` in streaming mode.""" + + def __init__(self, chunks, headers=None, status_code=200): + self._chunks = chunks + self.headers = headers or {} + self.status_code = status_code + + @property + def ok(self) -> bool: + return 200 <= self.status_code < 400 + + def iter_content(self, chunk_size=None): + yield from self._chunks + + def __enter__(self): + return self + + def __exit__(self, *args): + return False + + +class TestSsrsClientRdl: + def _client(self): + from metadata.generated.schema.entity.services.connections.dashboard.ssrsConnection import ( + SsrsConnection, + ) + + return SsrsClient( + SsrsConnection( + hostPort="http://ssrs.example.com/reports", + username="u", + password="p", + ) + ) + + def test_get_report_definition_aborts_on_oversized_stream(self, monkeypatch): + from metadata.ingestion.source.dashboard.ssrs import client as client_module + + monkeypatch.setattr(client_module, "MAX_RDL_BYTES", 100) + client = self._client() + oversized_chunks = [b"x" * 60, b"y" * 60] + client.session = MagicMock() + client.session.get = MagicMock( + return_value=_StreamResponseStub( + chunks=oversized_chunks, + headers={"Content-Type": "application/xml"}, + ) + ) + assert client.get_report_definition("big-report") is None + client.session.get.assert_called() + assert client.session.get.call_args.kwargs["stream"] is True + + def test_get_report_definition_raises_on_permission_error(self): + client = self._client() + client.session = MagicMock() + client.session.get = MagicMock( + side_effect=[ + _StreamResponseStub(chunks=iter([]), status_code=403), + _StreamResponseStub(chunks=iter([]), status_code=403), + ] + ) + with pytest.raises(SourceConnectionException): + client.get_report_definition("no-access") + + def test_get_report_definition_raises_on_server_error(self): + client = self._client() + client.session = MagicMock() + client.session.get = MagicMock( + side_effect=[ + _StreamResponseStub(chunks=iter([]), status_code=500), + _StreamResponseStub(chunks=iter([]), status_code=500), + ] + ) + with pytest.raises(SourceConnectionException): + client.get_report_definition("broken") + + def test_get_report_definition_404_triggers_silent_fallback(self): + client = self._client() + client.session = MagicMock() + client.session.get = MagicMock( + side_effect=[ + _StreamResponseStub(chunks=iter([]), status_code=404), + _StreamResponseStub(chunks=iter([]), status_code=404), + ] + ) + assert client.get_report_definition("missing") is None + + def test_get_report_definition_rejects_by_content_length_before_reading( + self, monkeypatch + ): + from metadata.ingestion.source.dashboard.ssrs import client as client_module + + monkeypatch.setattr(client_module, "MAX_RDL_BYTES", 100) + client = self._client() + + def exploding_chunks(): + raise AssertionError("body should not be read when Content-Length trips") + yield + + stub = _StreamResponseStub( + chunks=exploding_chunks(), + headers={"Content-Length": "999", "Content-Type": "application/xml"}, + ) + client.session = MagicMock() + client.session.get = MagicMock(return_value=stub) + assert client.get_report_definition("big-by-header") is None + + class TestSsrsClientPagination: def test_get_reports_single_page(self): client = _build_mock_client() @@ -735,7 +844,7 @@ def _set_context(source, **kwargs): class TestSsrsYieldDatamodel: def _prepare(self, ssrs_source, rdl): - ssrs_source._report_definitions = {MOCK_REPORTS[0].id: rdl} + ssrs_source._current_rdl = (MOCK_REPORTS[0].id, rdl) ssrs_source.source_config.includeDataModels = True def test_emits_one_per_dataset(self, ssrs_source): @@ -767,19 +876,21 @@ def test_sql_omitted_for_expression(self, ssrs_source): assert results[0].right.sql is None def test_skipped_when_include_data_models_false(self, ssrs_source): - ssrs_source._report_definitions = {MOCK_REPORTS[0].id: RDL_SALES} + ssrs_source._current_rdl = (MOCK_REPORTS[0].id, RDL_SALES) ssrs_source.source_config.includeDataModels = False assert list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) == [] def test_no_rdl_cached(self, ssrs_source): - ssrs_source._report_definitions = {} + ssrs_source._current_rdl = None + ssrs_source.client = MagicMock() + ssrs_source.client.get_report_definition = MagicMock(return_value=None) ssrs_source.source_config.includeDataModels = True assert list(ssrs_source.yield_datamodel(MOCK_REPORTS[0])) == [] class TestSsrsLineage: def _prepare(self, ssrs_source, rdl, *, include_data_models=True): - ssrs_source._report_definitions = {MOCK_REPORTS[0].id: rdl} + ssrs_source._current_rdl = (MOCK_REPORTS[0].id, rdl) ssrs_source.source_config.includeDataModels = include_data_models datamodel_entity = SimpleNamespace( id=SimpleNamespace(root="dm-uuid"), fullyQualifiedName=None @@ -862,17 +973,40 @@ def record(*, fqn_search_string, **_): ) ) assert captured_services == ["service_a", "service_b", "service_c"] - assert MOCK_REPORTS[0].id in ssrs_source._report_definitions + assert ssrs_source._current_rdl[0] == MOCK_REPORTS[0].id - def test_yield_dashboard_lineage_evicts_rdl_after_all_prefixes(self, ssrs_source): - """Eviction moved from yield_dashboard_lineage_details to the outer - yield_dashboard_lineage so it fires once per dashboard, not per prefix.""" + def test_single_entry_cache_displaces_previous_report(self, ssrs_source): + """The cache is bounded to one entry by construction — fetching a new + report's RDL evicts the previous one automatically.""" self._prepare(ssrs_source, RDL_SALES) - ssrs_source.context.get().__dict__["dashboard"] = MOCK_REPORTS[0].id - ssrs_source.context.get().__dict__["dataModels"] = [] - assert MOCK_REPORTS[0].id in ssrs_source._report_definitions - list(ssrs_source.yield_dashboard_lineage(MOCK_REPORTS[0])) - assert MOCK_REPORTS[0].id not in ssrs_source._report_definitions + assert ssrs_source._current_rdl[0] == MOCK_REPORTS[0].id + new_report = SsrsReport( + Id="report-next", + Name="Next", + Path="/next", + HasDataSources=True, + ) + ssrs_source.client = MagicMock() + ssrs_source.client.get_report_definition = MagicMock(return_value=None) + ssrs_source._get_report_definition(new_report) + assert ssrs_source._current_rdl is None + + def test_source_connection_error_propagates(self, ssrs_source): + """Transient SSRS failures must propagate so mark-deleted does not drop + entities during an outage.""" + ssrs_source._current_rdl = None + ssrs_source.client = MagicMock() + ssrs_source.client.get_report_definition = MagicMock( + side_effect=SourceConnectionException("SSRS is down") + ) + report = SsrsReport( + Id="r-outage", + Name="Outage", + Path="/outage", + HasDataSources=True, + ) + with pytest.raises(SourceConnectionException): + ssrs_source._get_report_definition(report) def test_skips_expression_command(self, ssrs_source): self._prepare(ssrs_source, RDL_EXPRESSION) @@ -1016,7 +1150,9 @@ def test_dialect_uses_data_provider_when_no_db_service(self, ssrs_source): assert Dialect.ORACLE in mock_parser.call_args.args def test_no_rdl_yields_nothing(self, ssrs_source): - ssrs_source._report_definitions = {} + ssrs_source._current_rdl = None + ssrs_source.client = MagicMock() + ssrs_source.client.get_report_definition = MagicMock(return_value=None) ssrs_source.source_config.includeDataModels = True assert list(ssrs_source.yield_dashboard_lineage_details(MOCK_REPORTS[0])) == [] diff --git a/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py b/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py index 42de31312451..fd5d159e8e31 100644 --- a/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py +++ b/ingestion/tests/unit/topology/dashboard/test_ssrs_rdl_parser.py @@ -103,6 +103,34 @@ def test_entity_is_rejected(self): with pytest.raises(ValueError, match="DTD or entity"): parse_rdl(payload) + @pytest.mark.parametrize( + "variant", + [ + b"' + variant + b' x "y">' + with pytest.raises(ValueError, match="DTD or entity"): + parse_rdl(payload) + + def test_doctype_after_leading_comment_rejected(self): + padding = b"" + payload = ( + b'' + + padding + + b']>' + ) + with pytest.raises(ValueError, match="DTD or entity"): + parse_rdl(payload) + def test_namespace_2008_2010_2016_equivalence(self): template = ( ''