
Commit c2a75de

Fix Parquet Reader to use boto3 client (#27361)
* Fix Parquet Reader to use boto3 client
* Fix Avro/csv to use boto3 client
* Address comments
* Fix S3 tests
* Fix failing test
* address gitar comments
* Address co-pilot comments
1 parent bb64349 commit c2a75de

22 files changed

Lines changed: 770 additions & 253 deletions

ingestion/src/metadata/ingestion/source/database/datalake/clients/azure_blob.py

Lines changed: 3 additions & 3 deletions
@@ -13,7 +13,7 @@
 Datalake Azure Blob Client
 """
 from functools import partial
-from typing import Callable, Iterable, Optional, Set
+from typing import Callable, Iterable, Optional, Set, Tuple

 from azure.storage.blob import BlobServiceClient

@@ -62,7 +62,7 @@ def get_table_names(
         bucket_name: str,
         prefix: Optional[str],
         skip_cold_storage: bool = False,
-    ) -> Iterable[str]:
+    ) -> Iterable[Tuple[str, Optional[int]]]:
         container_client = self._client.get_container_client(bucket_name)

         for file in container_client.list_blobs(name_starts_with=prefix or None):
@@ -74,7 +74,7 @@ def get_table_names(
                     f"(blob_tier: {blob_tier})"
                 )
                 continue
-            yield file.name
+            yield file.name, getattr(file, "size", None)

     def close(self, service_connection):
         self._client.close()

ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py

Lines changed: 9 additions & 4 deletions
@@ -13,19 +13,24 @@
 Datalake Base Client
 """
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Iterable, Optional
+from typing import Any, Callable, Iterable, Optional, Tuple


 class DatalakeBaseClient(ABC):
     """Base DL client implementation"""

-    def __init__(self, client: Any, **kwargs):
+    def __init__(self, client: Any, session: Any = None, **kwargs):
         self._client = client
+        self._session = session

     @property
     def client(self) -> Any:
         return self._client

+    @property
+    def session(self) -> Any:
+        return self._session
+
     @classmethod
     @abstractmethod
     def from_config(cls, config) -> "DatalakeBaseClient":
@@ -49,8 +54,8 @@ def get_table_names(
         bucket_name: str,
         prefix: Optional[str],
         skip_cold_storage: bool = False,
-    ) -> Iterable[str]:
-        """Returns the Table names, based on the underlying client."""
+    ) -> Iterable[Tuple[str, Optional[int]]]:
+        """Returns (key, file_size_bytes) tuples. Size may be None if unavailable."""

     @abstractmethod
     def close(self, service_connection):
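
For illustration, a minimal sketch of how a caller consumes the new contract: every concrete client now yields (key, size) pairs, where the size may be None when the listing does not expose it. The helper below is hypothetical and not part of this change.

from typing import Optional


def list_tables_with_sizes(client, bucket_name: str, prefix: Optional[str] = None):
    # `client` is assumed to be any DatalakeBaseClient subclass after this change.
    for key, size in client.get_table_names(bucket_name, prefix):
        # Size may legitimately be None; only convert when it is known.
        size_mb = size / (1024 * 1024) if size is not None else None
        print(key, "unknown size" if size_mb is None else f"{size_mb:.2f} MB")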

ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 3 additions & 3 deletions
@@ -15,7 +15,7 @@
 import os
 from copy import deepcopy
 from functools import partial
-from typing import Callable, Iterable, List, Optional, Set
+from typing import Callable, Iterable, List, Optional, Set, Tuple

 from google.cloud import storage

@@ -117,7 +117,7 @@ def get_table_names(
         bucket_name: str,
         prefix: Optional[str],
         skip_cold_storage: bool = False,
-    ) -> Iterable[str]:
+    ) -> Iterable[Tuple[str, Optional[int]]]:
         bucket = self._client.get_bucket(bucket_name)

         for key in bucket.list_blobs(prefix=prefix):
@@ -129,7 +129,7 @@ def get_table_names(
                     f"(storage_class: {storage_class})"
                 )
                 continue
-            yield key.name
+            yield key.name, key.size

     def close(self, service_connection):
         os.environ.pop("GOOGLE_CLOUD_PROJECT", "")

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 13 additions & 5 deletions
@@ -13,7 +13,7 @@
 Datalake S3 Client
 """
 from functools import partial
-from typing import Callable, Iterable, Optional, Set
+from typing import Callable, Iterable, Optional, Set, Tuple

 from metadata.clients.aws_client import AWSClient
 from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
@@ -35,8 +35,16 @@ def from_config(cls, config: S3Config) -> "DatalakeS3Client":
         if not config.securityConfig:
             raise RuntimeError("S3Config securityConfig can't be None.")

-        s3_client = AWSClient(config.securityConfig).get_client(service_name="s3")
-        return cls(client=s3_client)
+        aws_client = AWSClient(config.securityConfig)
+        session = aws_client.create_session()
+        if config.securityConfig.endPointURL:
+            s3_client = session.client(
+                service_name="s3",
+                endpoint_url=str(config.securityConfig.endPointURL),
+            )
+        else:
+            s3_client = session.client(service_name="s3")
+        return cls(client=s3_client, session=session)

     def update_client_database(self, config, database_name: str):
         # For the S3 Client we don't need to do anything when changing the database
@@ -57,7 +65,7 @@ def get_table_names(
         bucket_name: str,
         prefix: Optional[str],
         skip_cold_storage: bool = False,
-    ) -> Iterable[str]:
+    ) -> Iterable[Tuple[str, Optional[int]]]:
         kwargs = {"Bucket": bucket_name}

         if prefix:
@@ -76,7 +84,7 @@ def get_table_names(
                     f"(StorageClass: {storage_class}, ArchiveStatus: {archive_status})"
                 )
                 continue
-            yield key["Key"]
+            yield key["Key"], key.get("Size")

     def get_folders_prefix(
         self, bucket_name: str, prefix: Optional[str]
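
A minimal sketch of the session/client split that from_config now performs, runnable outside the connector; the credentials, endpoint URL, bucket, and prefix below are placeholders, and the endpoint override is only needed for S3-compatible stores such as MinIO.

import boto3

session = boto3.Session(
    aws_access_key_id="EXAMPLE_KEY",          # placeholder credentials
    aws_secret_access_key="EXAMPLE_SECRET",
    region_name="us-east-1",
)

# Pass endpoint_url only when an override is configured (e.g. MinIO);
# otherwise the default AWS endpoint is used.
s3_client = session.client(service_name="s3", endpoint_url="http://localhost:9000")

# The listing already returns each object's size, which the connector now
# carries downstream instead of issuing a separate HEAD request per file.
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket", Prefix="data/"):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj.get("Size"))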

ingestion/src/metadata/ingestion/source/database/datalake/metadata.py

Lines changed: 8 additions & 6 deletions
@@ -226,7 +226,7 @@ def yield_database_schema(

     def get_tables_name_and_type(  # pylint: disable=too-many-branches
         self,
-    ) -> Iterable[Tuple[str, TableType, SupportedTypes]]:
+    ) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int]]]:
         """
         Handle table and views.

@@ -251,7 +251,7 @@ def get_tables_name_and_type(  # pylint: disable=too-many-branches
             skip_cold_storage = (
                 getattr(self.service_connection, "skipColdStorage", False) or False
             )
-            for key_name in self.client.get_table_names(
+            for key_name, file_size in self.client.get_table_names(
                 bucket_name, prefix, skip_cold_storage=skip_cold_storage
             ):
                 table_name = self.standardize_table_name(bucket_name, key_name)
@@ -269,29 +269,31 @@ def get_tables_name_and_type(  # pylint: disable=too-many-branches
                     )
                     continue

-                yield table_name, TableType.Regular, file_extension
+                yield table_name, TableType.Regular, file_extension, file_size

     def yield_table(
-        self, table_name_and_type: Tuple[str, TableType, SupportedTypes]
+        self, table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int]]
     ) -> Iterable[Either[CreateTableRequest]]:
         """
         From topology.
         Prepare a table request and pass it to the sink.
         Uses first chunk only for schema inference to avoid loading entire file.
         """
-        table_name, table_type, table_extension = table_name_and_type
+        table_name, table_type, table_extension, file_size = table_name_and_type
         schema_name = self.context.get().database_schema
         try:
             table_constraints = None
             data_frame, raw_data = fetch_dataframe_first_chunk(
                 config_source=self.config_source,
-                client=self.client._client,
+                client=self.client.client,
                 file_fqn=DatalakeTableSchemaWrapper(
                     key=table_name,
                     bucket_name=schema_name,
                     file_extension=table_extension,
+                    file_size=file_size,
                 ),
                 fetch_raw_data=True,
+                session=getattr(self.client, "session", None),
             )
             if data_frame:
                 data_frame = next(data_frame)
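
To make the widened topology tuple concrete, a small sketch of the 4-element shape now passed from get_tables_name_and_type to yield_table; the sample values are invented and the plain strings stand in for the TableType/SupportedTypes enums.

from typing import Optional, Tuple

# (table name, table type, file extension, file size in bytes or None)
entry: Tuple[str, str, str, Optional[int]] = (
    "transactions.parquet", "Regular", "parquet", 52_428_800
)

table_name, table_type, table_extension, file_size = entry
# file_size rides along so the reader can skip a HEAD/info call later;
# it may be None when the listing did not report a size.
print(table_name, file_size)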

ingestion/src/metadata/readers/dataframe/avro.py

Lines changed: 9 additions & 11 deletions
@@ -33,7 +33,6 @@
 from metadata.readers.dataframe.base import DataFrameReader, FileFormatException
 from metadata.readers.dataframe.models import DatalakeColumnWrapper
 from metadata.readers.file.adls import return_azure_storage_options
-from metadata.readers.file.s3 import return_s3_storage_options
 from metadata.readers.models import ConfigSource
 from metadata.utils.constants import CHUNKSIZE
 from metadata.utils.logger import ingestion_logger
@@ -109,19 +108,18 @@ def _read_avro_dispatch(
     @_read_avro_dispatch.register
     def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
         """Stream Avro from S3 without loading entire file into memory."""
-        from s3fs import S3FileSystem
-
-        storage_options = return_s3_storage_options(self.config_source)
-        s3 = S3FileSystem(**storage_options)
-        file_path = f"s3://{bucket_name}/{key}"
-
-        with s3.open(file_path, "rb") as f:
-            columns = self._get_avro_columns(f)
+        schema_response = self.client.get_object(Bucket=bucket_name, Key=key)
+        try:
+            columns = self._get_avro_columns(schema_response["Body"])
+        finally:
+            schema_response["Body"].close()

         def chunk_generator():
             response = self.client.get_object(Bucket=bucket_name, Key=key)
-            file_stream = response["Body"]
-            yield from self._stream_avro_records(file_stream)
+            try:
+                yield from self._stream_avro_records(response["Body"])
+            finally:
+                response["Body"].close()

         return DatalakeColumnWrapper(
             columns=columns,
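
A standalone sketch of the same streaming pattern using fastavro over a boto3 get_object body (the connector's own _stream_avro_records helper is not shown in this diff, so fastavro here is an assumption); bucket and key are placeholders.

import boto3
from fastavro import reader as avro_reader

s3 = boto3.client("s3")
response = s3.get_object(Bucket="my-bucket", Key="data/events.avro")
try:
    # fastavro reads from any file-like object exposing read(), so the
    # StreamingBody is consumed record by record without buffering the file.
    for record in avro_reader(response["Body"]):
        print(record)
finally:
    response["Body"].close()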

ingestion/src/metadata/readers/dataframe/base.py

Lines changed: 21 additions & 20 deletions
@@ -64,43 +64,44 @@ class DataFrameReader(ABC):
     config_source: ConfigSource
     reader: Reader

-    def __init__(self, config_source: ConfigSource, client: Optional[Any]):
+    def __init__(
+        self,
+        config_source: ConfigSource,
+        client: Optional[Any],
+        session: Optional[Any] = None,
+    ):
         self.config_source = config_source
         self.client = client
+        self.session = session

         self.reader = get_reader(config_source=config_source, client=client)

-    def _get_file_size_mb(self, key: str, bucket_name: str) -> float:
+    def _get_file_size_mb(
+        self, key: str, bucket_name: str, file_size: Optional[int] = None
+    ) -> float:
         """
         Get file size in MB. Returns 0 if unable to determine.
-        Uses efficient HEAD operations from cloud providers.
+        If file_size (bytes) is provided from listing metadata, uses that
+        to avoid a redundant HEAD/info API call.
         """
+        if file_size is not None:
+            return file_size / (1024 * 1024)
         try:
             if isinstance(self.config_source, S3Config):
                 response = self.client.head_object(Bucket=bucket_name, Key=key)
                 return response.get("ContentLength", 0) / (1024 * 1024)

             elif isinstance(self.config_source, GCSConfig):
-                from gcsfs import GCSFileSystem
-
-                gcs = GCSFileSystem()
-                file_path = f"gs://{bucket_name}/{key}"
-                file_info = gcs.info(file_path)
-                return file_info.get("size", 0) / (1024 * 1024)
+                bucket = self.client.get_bucket(bucket_name)
+                blob = bucket.get_blob(key)
+                return (blob.size or 0) / (1024 * 1024) if blob else 0

             elif isinstance(self.config_source, AzureConfig):
-                from adlfs import AzureBlobFileSystem
-
-                from metadata.readers.file.adls import return_azure_storage_options
-
-                storage_options = return_azure_storage_options(self.config_source)
-                adlfs_fs = AzureBlobFileSystem(
-                    account_name=self.config_source.securityConfig.accountName,
-                    **storage_options,
+                blob_client = self.client.get_blob_client(
+                    container=bucket_name, blob=key
                 )
-                file_path = f"{bucket_name}/{key}"
-                file_info = adlfs_fs.info(file_path)
-                return file_info.get("size", 0) / (1024 * 1024)
+                props = blob_client.get_blob_properties()
+                return (props.size or 0) / (1024 * 1024)

             elif isinstance(self.config_source, LocalConfig):
                 import os
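
A standalone sketch of the "prefer the listing size" logic for the S3 branch above; the function name and arguments are illustrative, not the connector's actual API.

from typing import Optional


def file_size_mb(s3_client, bucket: str, key: str, listed_size: Optional[int] = None) -> float:
    if listed_size is not None:
        # Size already came back with list_objects_v2: no extra API call needed.
        return listed_size / (1024 * 1024)
    # Fallback: one HEAD request for this object only.
    response = s3_client.head_object(Bucket=bucket, Key=key)
    return response.get("ContentLength", 0) / (1024 * 1024)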

ingestion/src/metadata/readers/dataframe/dsv.py

Lines changed: 26 additions & 6 deletions
@@ -34,7 +34,6 @@
 from metadata.readers.dataframe.base import DataFrameReader, FileFormatException
 from metadata.readers.dataframe.models import DatalakeColumnWrapper
 from metadata.readers.file.adls import AZURE_PATH, return_azure_storage_options
-from metadata.readers.file.s3 import return_s3_storage_options
 from metadata.readers.models import ConfigSource
 from metadata.utils.constants import CHUNKSIZE
 from metadata.utils.logger import ingestion_logger
@@ -116,9 +115,10 @@ def __init__(
         config_source: ConfigSource,
         client: Optional[Any],
         separator: str = CSV_SEPARATOR,
+        session: Optional[Any] = None,
     ):
         self.separator = separator
-        super().__init__(config_source, client)
+        super().__init__(config_source, client, session=session)

     def read_from_pandas(
         self,
@@ -173,12 +173,32 @@ def _(self, _: GCSConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:

     @_read_dsv_dispatch.register
     def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         compression = "gzip" if key.endswith(".gz") else None

-        storage_options = return_s3_storage_options(self.config_source)
-        path = f"s3://{bucket_name}/{key}"
-        return self.read_from_pandas(
-            path=path, storage_options=storage_options, compression=compression
+        def chunk_generator():
+            response = self.client.get_object(Bucket=bucket_name, Key=key)
+            try:
+                with pd.read_csv(
+                    response["Body"],
+                    sep=self.separator,
+                    chunksize=CHUNKSIZE,
+                    compression=compression,
+                    encoding_errors="ignore",
+                    escapechar="\\",
+                ) as reader:
+                    for chunks in reader:
+                        fixed = self._fix_malformed_quoted_chunk(
+                            chunk_list=[chunks], separator=self.separator
+                        )
+                        if fixed:
+                            yield fixed[0]
+            finally:
+                response["Body"].close()
+
+        return DatalakeColumnWrapper(
+            dataframes=chunk_generator, columns=None, raw_data=None
         )

     @_read_dsv_dispatch.register
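
The same chunked-read pattern, sketched standalone: pandas accepts the boto3 StreamingBody directly, so no s3fs mount is needed; bucket, key, and chunk size here are placeholders.

import boto3
import pandas as pd

s3 = boto3.client("s3")
response = s3.get_object(Bucket="my-bucket", Key="data/sales.csv")
try:
    # chunksize keeps memory bounded: each iteration yields a DataFrame of
    # at most 10_000 rows read straight from the streaming body.
    with pd.read_csv(response["Body"], sep=",", chunksize=10_000) as reader:
        for chunk in reader:
            print(len(chunk), list(chunk.columns))
finally:
    response["Body"].close()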

ingestion/src/metadata/readers/dataframe/json.py

Lines changed: 13 additions & 4 deletions
@@ -158,7 +158,11 @@ def _is_json_lines(file_obj) -> bool:
         return False

     def _read_json_smart(
-        self, file_obj_getter, key: str, bucket_name: str
+        self,
+        file_obj_getter,
+        key: str,
+        bucket_name: str,
+        file_size: Optional[int] = None,
     ) -> DatalakeColumnWrapper:
         """
         Smart JSON reading with automatic format detection and streaming.
@@ -179,7 +183,7 @@ def chunk_generator():
                 dataframes=chunk_generator, raw_data=None, columns=None
             )

-        file_size_mb = self._get_file_size_mb(key, bucket_name)
+        file_size_mb = self._get_file_size_mb(key, bucket_name, file_size=file_size)
         if file_size_mb > (MAX_FILE_SIZE_FOR_PREVIEW / (1024 * 1024)):
             logger.info(
                 f"Large JSON file ({file_size_mb:.2f} MB). Streaming with ijson."
@@ -223,7 +227,9 @@ def get_stream():
             finally:
                 response["Body"].close()

-        return self._read_json_smart(get_stream, key, bucket_name)
+        return self._read_json_smart(
+            get_stream, key, bucket_name, file_size=self._file_size
+        )

     @_read_json_dispatch.register
     def _(self, _: GCSConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
@@ -271,7 +277,10 @@ def get_stream():

         return self._read_json_smart(get_stream, key, bucket_name)

-    def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
+    def _read(
+        self, *, key: str, bucket_name: str, file_size: Optional[int] = None, **__
+    ) -> DatalakeColumnWrapper:
+        self._file_size = file_size
         return self._read_json_dispatch(
             self.config_source, key=key, bucket_name=bucket_name
         )
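
A sketch of the size-gated choice the JSON reader makes, with a placeholder 20 MB threshold standing in for MAX_FILE_SIZE_FOR_PREVIEW: ijson streams items of the top-level array for large files, while small or unknown-size files are parsed with a single json.load.

import json
from typing import Optional

import ijson

SIZE_THRESHOLD_BYTES = 20 * 1024 * 1024  # placeholder threshold


def iter_json_records(file_obj, file_size: Optional[int]):
    if file_size is not None and file_size > SIZE_THRESHOLD_BYTES:
        # Large file: stream elements of the root array without loading it all.
        yield from ijson.items(file_obj, "item")
    else:
        # Small or unknown-size file: a single parse is simpler.
        data = json.load(file_obj)
        yield from (data if isinstance(data, list) else [data])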
