Skip to content

Commit 08a539f

Browse files
ulixius9, claude, harshach
committed
fix(ssrs): streamline test connection, stream reports, retry transient failures (#27637)
* fix(ssrs): streamline test connection, stream reports, and retry transient failures
  - Test connection's GetDashboards step no longer paginates every report; it now issues a single $top=1 probe to /Reports, matching the existing CheckAccess probe against /Folders.
  - SsrsClient.get_reports is now a generator that yields each page as it is fetched, so ingestion memory stays flat regardless of report count.
  - Session retries transient failures (connect errors, read timeouts, 5xx) up to 2 times with exponential backoff to survive flaky SSRS endpoints.
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Fixing handling of pagination
* fix test
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Sriharsha Chintalapani <harsha@getcollate.io>
1 parent e043fa9 commit 08a539f

6 files changed

Lines changed: 301 additions & 65 deletions

File tree

ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py

Lines changed: 65 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
"""
1212
SSRS REST client
1313
"""
14-
import traceback
15-
from typing import List, Optional, Union
14+
from typing import Iterable, Iterator, Optional, Union
1615

1716
import requests
17+
from requests.adapters import HTTPAdapter
1818
from requests_ntlm import HttpNtlmAuth
19+
from urllib3.util.retry import Retry
1920

2021
from metadata.generated.schema.entity.services.connections.dashboard.ssrsConnection import (
2122
SsrsConnection,
@@ -33,8 +34,14 @@
3334
logger = ingestion_logger()
3435

3536
API_VERSION = "api/v2.0"
36-
DEFAULT_TIMEOUT = 30
37+
CONNECT_TIMEOUT = 10
38+
READ_TIMEOUT = 120
3739
PAGE_SIZE = 100
40+
MAX_RETRIES = 2
41+
BACKOFF_FACTOR = 1
42+
RETRY_STATUS_CODES = (500, 502, 503, 504)
43+
REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources"
44+
FOLDER_SELECT_FIELDS = "Id,Name,Path"
3845

3946

4047
class SsrsClient:
@@ -53,17 +60,51 @@ def __init__(
5360
self.session.headers.update({"Accept": "application/json"})
5461
if verify_ssl is not None:
5562
self.session.verify = verify_ssl
63+
retry = Retry(
64+
total=MAX_RETRIES,
65+
connect=MAX_RETRIES,
66+
read=MAX_RETRIES,
67+
status=MAX_RETRIES,
68+
backoff_factor=BACKOFF_FACTOR,
69+
status_forcelist=RETRY_STATUS_CODES,
70+
allowed_methods=frozenset(["GET"]),
71+
raise_on_status=False,
72+
)
73+
adapter = HTTPAdapter(max_retries=retry)
74+
self.session.mount("http://", adapter)
75+
self.session.mount("https://", adapter)
5676

5777
def close(self) -> None:
5878
if self.session:
5979
self.session.close()
6080

6181
def _get(self, path: str, params: Optional[dict] = None) -> dict:
6282
url = f"{self.base_url}{path}"
63-
resp = self.session.get(url, timeout=DEFAULT_TIMEOUT, params=params)
83+
resp = self.session.get(
84+
url, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT), params=params
85+
)
6486
resp.raise_for_status()
6587
return resp.json()
6688

89+
def _paginate(self, path: str, params: dict, resource_label: str) -> Iterable[dict]:
90+
"""Yield pages from an OData endpoint. Any per-page failure raises
91+
``SourceConnectionException`` so callers can surface it instead of
92+
producing a silently truncated result set."""
93+
skip = 0
94+
while True:
95+
page_params = {**params, "$top": str(PAGE_SIZE), "$skip": str(skip)}
96+
try:
97+
data = self._get(path, params=page_params)
98+
except Exception as exc:
99+
raise SourceConnectionException(
100+
f"Failed to fetch SSRS {resource_label} at skip={skip}: {exc}"
101+
) from exc
102+
yield data
103+
value = data.get("value") or []
104+
if len(value) < PAGE_SIZE:
105+
return
106+
skip += PAGE_SIZE
107+
67108
def test_access(self) -> None:
68109
try:
69110
self._get("/Folders", params={"$top": "1"})
@@ -72,40 +113,26 @@ def test_access(self) -> None:
72113
f"Failed to connect to SSRS: {exc}"
73114
) from exc
74115

75-
def get_folders(self) -> List[SsrsFolder]:
116+
def test_get_reports(self) -> None:
76117
try:
77-
results: List[SsrsFolder] = []
78-
skip = 0
79-
while True:
80-
data = self._get(
81-
"/Folders", params={"$top": str(PAGE_SIZE), "$skip": str(skip)}
82-
)
83-
response = SsrsFolderListResponse(**data)
84-
results.extend(response.value)
85-
if len(response.value) < PAGE_SIZE:
86-
break
87-
skip += PAGE_SIZE
88-
return results
118+
self._get("/Reports", params={"$top": "1"})
89119
except Exception as exc:
90-
logger.debug(traceback.format_exc())
91-
logger.warning("Failed to fetch SSRS folders: %s", exc)
92-
return []
120+
raise SourceConnectionException(
121+
f"Failed to fetch SSRS reports: {exc}"
122+
) from exc
93123

94-
def get_reports(self) -> List[SsrsReport]:
95-
try:
96-
results: List[SsrsReport] = []
97-
skip = 0
98-
while True:
99-
data = self._get(
100-
"/Reports", params={"$top": str(PAGE_SIZE), "$skip": str(skip)}
101-
)
102-
response = SsrsReportListResponse(**data)
103-
results.extend(response.value)
104-
if len(response.value) < PAGE_SIZE:
105-
break
106-
skip += PAGE_SIZE
107-
return results
108-
except Exception as exc:
109-
logger.debug(traceback.format_exc())
110-
logger.warning("Failed to fetch SSRS reports: %s", exc)
111-
return []
124+
def get_folders(self) -> Iterator[SsrsFolder]:
125+
params = {
126+
"$orderby": "Id",
127+
"$select": FOLDER_SELECT_FIELDS,
128+
}
129+
for data in self._paginate("/Folders", params, "folders"):
130+
yield from SsrsFolderListResponse(**data).value
131+
132+
def get_reports(self) -> Iterator[SsrsReport]:
133+
params = {
134+
"$orderby": "Id",
135+
"$select": REPORT_SELECT_FIELDS,
136+
}
137+
for data in self._paginate("/Reports", params, "reports"):
138+
yield from SsrsReportListResponse(**data).value

ingestion/src/metadata/ingestion/source/dashboard/ssrs/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def test_connection(
4646
) -> TestConnectionResult:
4747
test_fn = {
4848
"CheckAccess": client.test_access,
49-
"GetDashboards": client.get_reports,
49+
"GetDashboards": client.test_get_reports,
5050
}
5151

5252
return test_connection_steps(

ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
SSRS source module
1313
"""
1414
import traceback
15-
from typing import Any, Dict, Iterable, List, Optional
15+
from typing import Any, Dict, Iterable, Optional
1616

1717
from metadata.generated.schema.api.data.createChart import CreateChartRequest
1818
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
@@ -77,13 +77,15 @@ def __init__(
7777
self.folder_path_map: Dict[str, str] = {}
7878

7979
def prepare(self):
80-
folders = self.client.get_folders()
81-
self.folder_path_map = {folder.path: folder.name for folder in folders}
80+
self.folder_path_map = {
81+
folder.path: folder.name for folder in self.client.get_folders()
82+
}
8283
return super().prepare()
8384

84-
def get_dashboards_list(self) -> Optional[List[SsrsReport]]:
85-
reports = self.client.get_reports()
86-
return [r for r in reports if not r.hidden]
85+
def get_dashboards_list(self) -> Iterable[SsrsReport]:
86+
for report in self.client.get_reports():
87+
if not report.hidden:
88+
yield report
8789

8890
def get_dashboard_name(self, dashboard: SsrsReport) -> str:
8991
return dashboard.name

ingestion/tests/integration/connections/test_ssrs_connection.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
SsrsConnection,
2222
)
2323
from metadata.ingestion.connections.test_connections import SourceConnectionException
24-
from metadata.ingestion.source.dashboard.ssrs.client import SsrsClient
24+
from metadata.ingestion.source.dashboard.ssrs.client import MAX_RETRIES, SsrsClient
2525
from metadata.ingestion.source.dashboard.ssrs.connection import get_connection
2626

2727

@@ -38,6 +38,42 @@ def log_message(self, format, *args):
3838
pass
3939

4040

41+
class _FlakyHandler(BaseHTTPRequestHandler):
42+
failures_remaining = 2
43+
request_count = 0
44+
45+
def do_GET(self):
46+
type(self).request_count += 1
47+
if type(self).failures_remaining > 0:
48+
type(self).failures_remaining -= 1
49+
self.send_response(503)
50+
self.send_header("Content-Length", "0")
51+
self.end_headers()
52+
return
53+
body = json.dumps({"value": []}).encode()
54+
self.send_response(200)
55+
self.send_header("Content-Type", "application/json")
56+
self.send_header("Content-Length", str(len(body)))
57+
self.end_headers()
58+
self.wfile.write(body)
59+
60+
def log_message(self, format, *args):
61+
pass
62+
63+
64+
class _AlwaysFailingHandler(BaseHTTPRequestHandler):
65+
request_count = 0
66+
67+
def do_GET(self):
68+
type(self).request_count += 1
69+
self.send_response(500)
70+
self.send_header("Content-Length", "0")
71+
self.end_headers()
72+
73+
def log_message(self, format, *args):
74+
pass
75+
76+
4177
@pytest.fixture(scope="module")
4278
def ssrs_mock_url():
4379
server = HTTPServer(("127.0.0.1", 0), _MockHandler)
@@ -48,6 +84,29 @@ def ssrs_mock_url():
4884
server.shutdown()
4985

5086

87+
@pytest.fixture()
88+
def ssrs_flaky_url():
89+
_FlakyHandler.failures_remaining = 2
90+
_FlakyHandler.request_count = 0
91+
server = HTTPServer(("127.0.0.1", 0), _FlakyHandler)
92+
port = server.server_address[1]
93+
thread = threading.Thread(target=server.serve_forever, daemon=True)
94+
thread.start()
95+
yield f"http://127.0.0.1:{port}/reports"
96+
server.shutdown()
97+
98+
99+
@pytest.fixture()
100+
def ssrs_always_failing_url():
101+
_AlwaysFailingHandler.request_count = 0
102+
server = HTTPServer(("127.0.0.1", 0), _AlwaysFailingHandler)
103+
port = server.server_address[1]
104+
thread = threading.Thread(target=server.serve_forever, daemon=True)
105+
thread.start()
106+
yield f"http://127.0.0.1:{port}/reports"
107+
server.shutdown()
108+
109+
51110
@pytest.mark.integration
52111
class TestSsrsConnection:
53112
def test_get_connection(self, ssrs_mock_url):
@@ -64,10 +123,48 @@ def test_get_connection_test_access(self, ssrs_mock_url):
64123
client = get_connection(connection)
65124
client.test_access()
66125

126+
def test_get_connection_test_get_reports(self, ssrs_mock_url):
127+
connection = SsrsConnection(
128+
hostPort=ssrs_mock_url, username="test_user", password="test_pass"
129+
)
130+
client = get_connection(connection)
131+
client.test_get_reports()
132+
67133
def test_connection_bad_host(self):
68134
connection = SsrsConnection(
69135
hostPort="http://localhost:1", username="test_user", password="test_pass"
70136
)
71137
client = get_connection(connection)
72138
with pytest.raises(SourceConnectionException):
73139
client.test_access()
140+
141+
def test_connection_bad_host_get_reports(self):
142+
connection = SsrsConnection(
143+
hostPort="http://localhost:1", username="test_user", password="test_pass"
144+
)
145+
client = get_connection(connection)
146+
with pytest.raises(SourceConnectionException):
147+
client.test_get_reports()
148+
149+
def test_get_reports_retries_transient_failures(self, ssrs_flaky_url):
150+
connection = SsrsConnection(
151+
hostPort=ssrs_flaky_url, username="test_user", password="test_pass"
152+
)
153+
client = get_connection(connection)
154+
reports = list(client.get_reports())
155+
assert reports == []
156+
assert _FlakyHandler.request_count == 3
157+
158+
def test_get_reports_raises_on_persistent_failure(self, ssrs_always_failing_url):
159+
"""A /Reports endpoint that keeps 5xx'ing after retries must surface
160+
as SourceConnectionException — otherwise the pipeline reports success
161+
with zero records and mark_deleted wipes the catalog."""
162+
connection = SsrsConnection(
163+
hostPort=ssrs_always_failing_url,
164+
username="test_user",
165+
password="test_pass",
166+
)
167+
client = get_connection(connection)
168+
with pytest.raises(SourceConnectionException):
169+
list(client.get_reports())
170+
assert _AlwaysFailingHandler.request_count == MAX_RETRIES + 1

ingestion/tests/integration/ssrs/test_metadata.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def test_client_get_reports(self, ssrs_service):
2626
hostPort=ssrs_service, username="test_user", password="test_pass"
2727
)
2828
client = SsrsClient(connection)
29-
reports = client.get_reports()
29+
reports = list(client.get_reports())
3030
assert len(reports) == 4
3131
assert reports[0].name == "Report 1"
3232
assert reports[0].path == "/TestFolder/Report 1"
@@ -36,7 +36,7 @@ def test_client_get_folders(self, ssrs_service):
3636
hostPort=ssrs_service, username="test_user", password="test_pass"
3737
)
3838
client = SsrsClient(connection)
39-
folders = client.get_folders()
39+
folders = list(client.get_folders())
4040
assert len(folders) == 1
4141
assert folders[0].name == "TestFolder"
4242

@@ -52,7 +52,7 @@ def test_hidden_reports_present_in_raw(self, ssrs_service):
5252
hostPort=ssrs_service, username="test_user", password="test_pass"
5353
)
5454
client = SsrsClient(connection)
55-
reports = client.get_reports()
55+
reports = list(client.get_reports())
5656
assert any(r.hidden for r in reports)
5757
visible = [r for r in reports if not r.hidden]
5858
assert len(visible) == 3

0 commit comments

Comments
 (0)