Skip to content

Commit 1aefcff

Browse files
harshach and github-actions[bot]
authored and committed
Improve SSRS Connector - Lineage (#27652)
* Improve SSRS Connector - Lineage
* Update generated TypeScript types
* Add ownership extraction
* Remove claude file
* Address comments

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 25d6213 commit 1aefcff

18 files changed

Lines changed: 1842 additions & 22 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,5 +197,7 @@ ingestion/.claude/agents
197197
# Connector audit working files — per-session, never committed
198198
.claude/audit-results/
199199
.claude/connector-audit.json
200+
.claude/scheduled_tasks.lock
201+
.claude/plans/
200202

201203
test-results/

ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
"""
1212
SSRS REST client
1313
"""
14+
import base64
15+
import binascii
16+
import json
1417
from typing import Iterable, Iterator, Optional, Union
1518

1619
import requests
@@ -36,12 +39,16 @@
3639
API_VERSION = "api/v2.0"
3740
CONNECT_TIMEOUT = 10
3841
READ_TIMEOUT = 120
42+
RDL_READ_TIMEOUT = 60
3943
PAGE_SIZE = 100
4044
MAX_RETRIES = 2
4145
BACKOFF_FACTOR = 1
4246
RETRY_STATUS_CODES = (500, 502, 503, 504)
43-
REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources"
47+
REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources,CreatedBy"
4448
FOLDER_SELECT_FIELDS = "Id,Name,Path"
49+
RDL_CONTENT_PATHS = ("/Reports({id})/Content/$value", "/CatalogItems({id})/Content")
50+
RDL_NOT_FOUND_STATUS = {404}
51+
MAX_RDL_BYTES = 50 * 1024 * 1024
4552

4653

4754
class SsrsClient:
@@ -136,3 +143,118 @@ def get_reports(self) -> Iterator[SsrsReport]:
136143
}
137144
for data in self._paginate("/Reports", params, "reports"):
138145
yield from SsrsReportListResponse(**data).value
146+
147+
def get_report_definition(self, report_id: str) -> Optional[bytes]:
    """Fetch the raw RDL (report definition XML) for *report_id*.

    Each endpoint in ``RDL_CONTENT_PATHS`` is tried in order; a 404 from one
    endpoint silently falls through to the next.  Transport failures and
    non-404 HTTP errors are remembered and — if no endpoint ever produced
    content — re-raised as ``SourceConnectionException`` so operators see
    outages instead of silently deleted entities.  Returns ``None`` only
    when every endpoint cleanly reported the report as absent.
    """
    failure: Optional[Exception] = None
    for candidate in (tmpl.format(id=report_id) for tmpl in RDL_CONTENT_PATHS):
        try:
            content = self._fetch_report_content(candidate)
        except (requests.RequestException, SourceConnectionException) as exc:
            failure = exc
            logger.warning("RDL fetch failed for %s: %s", candidate, exc)
        else:
            if content is not None:
                return content
    if failure is None:
        return None
    raise SourceConnectionException(
        f"Failed to fetch RDL content for report [{report_id}]: {failure}"
    ) from failure
170+
171+
def _fetch_report_content(self, path: str) -> Optional[bytes]:
    """GET *path* and return the (possibly unwrapped) RDL bytes.

    ``None`` signals a clean miss: a 404 status, an oversized payload, or a
    body that could not be decoded.  Any other non-2xx HTTP status raises
    ``SourceConnectionException``.
    """
    with self.session.get(
        f"{self.base_url}{path}",
        timeout=(CONNECT_TIMEOUT, RDL_READ_TIMEOUT),
        headers={"Accept": "application/xml,application/octet-stream"},
        stream=True,
    ) as response:
        if response.status_code in RDL_NOT_FOUND_STATUS:
            return None
        if not response.ok:
            raise SourceConnectionException(
                f"RDL fetch returned HTTP {response.status_code} for {path}"
            )
        # Cheap header-based check first, then a bounded streamed read.
        if _exceeds_size_limit(response, path):
            return None
        raw = _read_bounded_body(response, path)
        if raw is None:
            return None
        content_type = (response.headers.get("Content-Type") or "").lower()
        return _decode_rdl_body(raw, content_type, path)
195+
196+
197+
def _read_bounded_body(resp: requests.Response, path: str) -> Optional[bytes]:
    """Accumulate the streamed response body, giving up past ``MAX_RDL_BYTES``.

    Returns the full body as ``bytes``, or ``None`` (after a warning) when
    the running total would exceed the limit.
    """
    collected = bytearray()
    for piece in resp.iter_content(chunk_size=65536):
        if not piece:
            # keep-alive chunks can be empty; skip them
            continue
        if len(collected) + len(piece) > MAX_RDL_BYTES:
            logger.warning(
                "RDL at %s exceeds size limit (>%s bytes); aborting download",
                path,
                MAX_RDL_BYTES,
            )
            return None
        collected += piece
    return bytes(collected)
212+
213+
214+
def _exceeds_size_limit(resp: requests.Response, path: str) -> bool:
    """True if the advertised ``Content-Length`` is above ``MAX_RDL_BYTES``.

    A missing or unparsable header is treated as "unknown, proceed" — the
    bounded streamed read enforces the limit regardless.
    """
    header = resp.headers.get("Content-Length")
    if header is None:
        return False
    try:
        advertised = int(header)
    except ValueError:
        return False
    if advertised <= MAX_RDL_BYTES:
        return False
    logger.warning(
        "RDL at %s exceeds size limit (%s bytes > %s); skipping to avoid OOM",
        path,
        advertised,
        MAX_RDL_BYTES,
    )
    return True
231+
232+
233+
def _decode_rdl_body(body: bytes, content_type: str, path: str) -> Optional[bytes]:
234+
"""Decode an already-read response body. If JSON-wrapped base64, unwrap it."""
235+
if not body:
236+
return None
237+
if "json" not in content_type:
238+
return body
239+
try:
240+
payload = json.loads(body)
241+
except ValueError:
242+
return body
243+
value = payload.get("Value") if isinstance(payload, dict) else None
244+
if not value:
245+
logger.warning("RDL JSON response missing 'Value' field at %s", path)
246+
return None
247+
try:
248+
decoded = base64.b64decode(value, validate=True)
249+
except (binascii.Error, ValueError) as exc:
250+
logger.warning("Malformed base64 in RDL response at %s: %s", path, exc)
251+
return None
252+
if len(decoded) > MAX_RDL_BYTES:
253+
logger.warning(
254+
"RDL at %s exceeds size limit after base64 decode (%s > %s)",
255+
path,
256+
len(decoded),
257+
MAX_RDL_BYTES,
258+
)
259+
return None
260+
return decoded

0 commit comments

Comments
 (0)