Skip to content

Commit 1905f6e

Browse files
microsoft-fabric - database & pipeline (#27198)
* microsoft-fabric - database + pipeline * fix style * address comments * fix: organize imports in ServiceIconUtils.ts --------- Co-authored-by: ulixius9 <mayursingal9@gmail.com>
1 parent cd0b259 commit 1905f6e

33 files changed

Lines changed: 4163 additions & 2 deletions

ingestion/setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@
192192
"atlas": {},
193193
"azuresql": {VERSIONS["pyodbc"]},
194194
"azure-sso": {VERSIONS["msal"]},
195+
"microsoftfabric": {VERSIONS["pyodbc"], VERSIONS["msal"]},
196+
"microsoftfabricpipeline": {VERSIONS["msal"]},
195197
"backup": {VERSIONS["boto3"], VERSIONS["azure-identity"], "azure-storage-blob"},
196198
"googledrive": {
197199
"google-api-python-client>=2.0.0",
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
"""
12+
Microsoft Fabric client module
13+
"""
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
"""
12+
Microsoft Fabric Authentication Module
13+
14+
Provides unified authentication for all Fabric services:
15+
- Fabric Warehouse/Lakehouse (Database)
16+
- Fabric Data Factory (Pipeline)
17+
- Fabric Power BI (Dashboard)
18+
19+
Supports:
20+
- Service Principal (ClientSecretCredential)
21+
- Managed Identity (DefaultAzureCredential)
22+
"""
23+
import traceback
24+
from time import sleep
25+
from typing import Callable, Optional, Tuple
26+
27+
import msal
28+
29+
from metadata.utils.logger import ingestion_logger
30+
31+
logger = ingestion_logger()
32+
33+
AUTH_TOKEN_MAX_RETRIES = 5
34+
AUTH_TOKEN_RETRY_WAIT = 120
35+
36+
# OAuth2 scopes for different Fabric services
37+
FABRIC_API_SCOPE = ["https://api.fabric.microsoft.com/.default"]
38+
POWER_BI_SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]
39+
DATABASE_SCOPE = ["https://database.windows.net/.default"]
40+
41+
42+
class FabricAuthenticator:
43+
"""
44+
Unified authenticator for Microsoft Fabric services.
45+
46+
Provides token acquisition for REST APIs using MSAL.
47+
"""
48+
49+
def __init__(
50+
self,
51+
tenant_id: str,
52+
client_id: str,
53+
client_secret: str,
54+
authority_uri: str = "https://login.microsoftonline.com/",
55+
):
56+
self.tenant_id = tenant_id
57+
self.client_id = client_id
58+
self.client_secret = client_secret
59+
self.authority_uri = authority_uri
60+
self._msal_client: Optional[msal.ConfidentialClientApplication] = None
61+
62+
@property
63+
def msal_client(self) -> msal.ConfidentialClientApplication:
64+
"""Lazy-initialize MSAL client for OAuth token acquisition"""
65+
if self._msal_client is None:
66+
self._msal_client = msal.ConfidentialClientApplication(
67+
client_id=self.client_id,
68+
client_credential=self.client_secret,
69+
authority=f"{self.authority_uri}{self.tenant_id}",
70+
)
71+
return self._msal_client
72+
73+
def get_token(self, scopes: list) -> Tuple[str, int]:
74+
"""
75+
Acquire OAuth2 access token for the given scopes.
76+
77+
Returns:
78+
Tuple of (access_token, expires_in_seconds)
79+
"""
80+
# Try cache first
81+
response_data = self._get_token_from_cache(scopes)
82+
if not response_data:
83+
logger.info("Token does not exist in the cache. Getting a new token.")
84+
response_data = self._generate_new_token(scopes)
85+
86+
response_data = response_data or {}
87+
access_token = response_data.get("access_token")
88+
expires_in = response_data.get("expires_in", 3600)
89+
90+
if not access_token:
91+
raise ValueError(
92+
f"Failed to acquire token: {response_data.get('error_description', 'Unknown error')}"
93+
)
94+
95+
logger.info("Fabric access token generated successfully")
96+
return access_token, expires_in
97+
98+
def _generate_new_token(self, scopes: list) -> Optional[dict]:
99+
"""Generate new auth token with retry logic"""
100+
retry = AUTH_TOKEN_MAX_RETRIES
101+
while retry:
102+
try:
103+
response_data = self.msal_client.acquire_token_for_client(scopes=scopes)
104+
return response_data
105+
except Exception as exc:
106+
logger.debug(traceback.format_exc())
107+
logger.warning(f"Error generating new auth token: {exc}")
108+
retry -= 1
109+
if retry:
110+
logger.warning(
111+
f"Error generating new token: {exc}, "
112+
f"sleep {AUTH_TOKEN_RETRY_WAIT} seconds retrying {retry} more times.."
113+
)
114+
sleep(AUTH_TOKEN_RETRY_WAIT)
115+
else:
116+
logger.warning(
117+
"Could not generate new token after maximum retries, "
118+
"Please check provided configs"
119+
)
120+
return None
121+
122+
def _get_token_from_cache(self, scopes: list) -> Optional[dict]:
123+
"""Fetch auth token from cache with retry logic"""
124+
retry = AUTH_TOKEN_MAX_RETRIES
125+
while retry:
126+
try:
127+
response_data = self.msal_client.acquire_token_silent(
128+
scopes=scopes, account=None
129+
)
130+
return response_data
131+
except Exception as exc:
132+
logger.debug(traceback.format_exc())
133+
logger.warning(f"Error getting token from cache: {exc}")
134+
retry -= 1
135+
if retry:
136+
logger.warning(
137+
f"Error getting token from cache: {exc}, "
138+
f"sleep {AUTH_TOKEN_RETRY_WAIT} seconds retrying {retry} more times.."
139+
)
140+
sleep(AUTH_TOKEN_RETRY_WAIT)
141+
else:
142+
logger.warning(
143+
"Could not get token from cache after maximum retries, "
144+
"Please check provided configs"
145+
)
146+
return None
147+
148+
def get_fabric_api_token(self) -> Tuple[str, int]:
149+
"""Get token for Fabric REST API"""
150+
return self.get_token(FABRIC_API_SCOPE)
151+
152+
def get_power_bi_token(self) -> Tuple[str, int]:
153+
"""Get token for Power BI API"""
154+
return self.get_token(POWER_BI_SCOPE)
155+
156+
def get_token_callback(self, scopes: list) -> Callable[[], Tuple[str, int]]:
157+
"""
158+
Returns a callable for lazy token acquisition.
159+
Useful for REST clients that need refreshable tokens.
160+
"""
161+
return lambda: self.get_token(scopes)

0 commit comments

Comments
 (0)