
Commit a2cf8d2

harshach, github-actions[bot], and ulixius9 authored
Storage Service Connect, path specification (#27408)
* Storage Service Connect, path specification and manifest file deprecation
* Storage Service Connect, path specification and manifest file deprecation
* Update generated TypeScript types
* Storage Service Connect, path specification and manifest file deprecation
* Storage Service Connect, path specification and manifest file deprecation
* Update generated TypeScript types
* Address comments
* Address comments
* Add tests
* Add tests
* Update generated TypeScript types
* Add UI support for manifest config
* Improve manifest support for storage services
* Address comments
* Fix #24823: containerFilterPattern and include/exclude not working
* Update generated TypeScript types
* Address comments
* Address comments
* Fix partition column type conversion error
* Address comments
* Update generated TypeScript types
* Address comments on manifest widget
* Address comments

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: ulixius9 <mayursingal9@gmail.com>
1 parent 1905f6e commit a2cf8d2

46 files changed

Lines changed: 6343 additions & 1213 deletions

ingestion/src/metadata/ingestion/models/custom_pydantic.py

Lines changed: 21 additions & 0 deletions
@@ -215,6 +215,27 @@ def handle_secret(value: Any, handler, info: SerializationInfo) -> str:
 CustomSecretStr = Annotated[_CustomSecretStr, WrapSerializer(handle_secret)]
 
 
+def format_validation_error(exc: Exception) -> str:
+    """Render a Pydantic ``ValidationError`` (v2) as a compact one-liner
+    suitable for log messages and workflow status warnings.
+
+    Each field error becomes ``field.path: message``, joined by ``; ``.
+    Falls back to ``str(exc)`` for non-Pydantic exceptions so callers
+    don't need to type-check.
+
+    Example output::
+
+        entries.0.dataPath: Field required; entries.1.structureFormat: Input should be a valid string
+    """
+    errors = getattr(exc, "errors", None)
+    if callable(errors):
+        return "; ".join(
+            f"{'.'.join(str(p) for p in err.get('loc', ()))}: {err.get('msg', '')}"
+            for err in errors()
+        )
+    return str(exc)
+
+
 def ignore_type_decoder(type_: Any) -> None:
     """Given a type_, add a custom decoder to the BaseModel
     to ignore any decoding errors for that type_."""
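To make the helper's contract concrete, here is a quick usage sketch; the Entry and Manifest models are hypothetical stand-ins invented for this example, not types from the commit:

from typing import List

from pydantic import BaseModel, ValidationError

from metadata.ingestion.models.custom_pydantic import format_validation_error


class Entry(BaseModel):  # hypothetical stand-in for a manifest entry
    dataPath: str
    structureFormat: str


class Manifest(BaseModel):  # hypothetical stand-in for the manifest model
    entries: List[Entry]


try:
    Manifest.model_validate({"entries": [{"structureFormat": 1}]})
except ValidationError as exc:
    # Prints: entries.0.dataPath: Field required; entries.0.structureFormat:
    # Input should be a valid string
    print(format_validation_error(exc))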

ingestion/src/metadata/ingestion/source/storage/s3/metadata.py

Lines changed: 134 additions & 66 deletions
@@ -46,6 +46,7 @@
 from metadata.generated.schema.type.tagLabel import TagLabel
 from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.models.custom_pydantic import format_validation_error
 from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.storage.s3.models import (
@@ -65,6 +66,7 @@
 from metadata.utils.filters import filter_by_container
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.s3_utils import list_s3_objects
+from metadata.utils.storage_utils import COLD_STORAGE_CLASSES, is_excluded_artifact
 from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_label
 
 logger = ingestion_logger()
@@ -130,38 +132,27 @@ def get_containers(self) -> Iterable[S3ContainerDetails]:
                 parent_entity: EntityReference = EntityReference(
                     id=self._bucket_cache[bucket_name].id.root, type="container"
                 )
-                if self.global_manifest:
-                    manifest_entries_for_current_bucket = (
-                        self._manifest_entries_to_metadata_entries_by_container(
-                            container_name=bucket_name, manifest=self.global_manifest
-                        )
+                manifest_entries = self._resolve_manifest_entries(bucket_name)
+                if manifest_entries:
+                    expanded_entries = self.expand_entries(
+                        bucket_name=bucket_name, entries=manifest_entries
+                    )
+                    # Apply containerFilterPattern + default Spark-artifact
+                    # excludes to the concrete paths *before* we attempt to
+                    # list sample files / infer schema. Prevents Issue #24823
+                    # where entries like ``_SUCCESS`` or user-excluded paths
+                    # would still be processed.
+                    filtered_entries = self.filter_manifest_entries(
+                        bucket_name=bucket_name, entries=expanded_entries
                     )
-                    # Check if we have entries in the manifest file belonging to this bucket
-                    if manifest_entries_for_current_bucket:
-                        # ingest all the relevant valid paths from it
-                        yield from self._generate_structured_containers(
-                            bucket_response=bucket_response,
-                            entries=manifest_entries_for_current_bucket,
-                            parent=parent_entity,
-                        )
-                        yield from self._generate_unstructured_containers(
-                            bucket_response=bucket_response,
-                            entries=manifest_entries_for_current_bucket,
-                            parent=parent_entity,
-                        )
-                        # nothing else do to for the current bucket, skipping to the next
-                        continue
-                # If no global file, or no valid entries in the manifest, check for bucket level metadata file
-                metadata_config = self._load_metadata_file(bucket_name=bucket_name)
-                if metadata_config:
                     yield from self._generate_structured_containers(
                         bucket_response=bucket_response,
-                        entries=metadata_config.entries,
+                        entries=filtered_entries,
                         parent=parent_entity,
                     )
                     yield from self._generate_unstructured_containers(
                         bucket_response=bucket_response,
-                        entries=metadata_config.entries,
+                        entries=filtered_entries,
                         parent=parent_entity,
                     )
 
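Note that _resolve_manifest_entries, expand_entries, and filter_manifest_entries are not part of this excerpt. Below is a minimal sketch of one resolution order consistent with the log messages in _load_metadata_file further down (bucket-level manifest first, then the global manifest); the committed method may differ:

from typing import List


def _resolve_manifest_entries(self, bucket_name: str) -> List["MetadataEntry"]:
    """Sketch only: prefer the bucket's own manifest, else the global one."""
    metadata_config = self._load_metadata_file(bucket_name=bucket_name)
    if metadata_config and metadata_config.entries:
        return metadata_config.entries
    if self.global_manifest:
        return self._manifest_entries_to_metadata_entries_by_container(
            container_name=bucket_name, manifest=self.global_manifest
        )
    return []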
@@ -358,8 +349,7 @@ def _generate_structured_containers_by_depth(
             if entry
             and entry.get("Key")
             and len(entry.get("Key").split("/")) > total_depth
-            and "/_delta_log/" not in entry.get("Key")
-            and not entry.get("Key").endswith("/_SUCCESS")
+            and not is_excluded_artifact(entry.get("Key"))
         }
         for key in candidate_keys:
             metadata_entry_copy = deepcopy(metadata_entry)
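is_excluded_artifact is imported from metadata/utils/storage_utils.py, which this page does not show. A minimal sketch of such a predicate, built only from the artifact names the docstrings in this diff enumerate; the committed helper may cover more patterns:

import posixpath

# Names taken from the docstrings in this diff; treat the exact set as an assumption.
_EXCLUDED_DIRS = {"_delta_log", "_temporary", "_spark_metadata"}
_EXCLUDED_PREFIXES = ("_committed_", "_started_")
_EXCLUDED_SUFFIXES = (".crc", ".tmp")


def is_excluded_artifact(key: str) -> bool:
    """Return True if the S3 key is a Spark/Delta sentinel or bookkeeping file."""
    if any(part in _EXCLUDED_DIRS for part in key.split("/")):
        return True
    name = posixpath.basename(key)
    if name == "_SUCCESS" or name.endswith(_EXCLUDED_SUFFIXES):
        return True
    return name.startswith(_EXCLUDED_PREFIXES)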
@@ -563,6 +553,24 @@ def _generate_unstructured_containers(
             ),
         )
 
+    def list_keys(self, bucket_name: str, prefix: str) -> Iterable[Tuple[str, int]]:
+        """List (key, size_bytes) for all files under prefix.
+
+        Filters out directories, cold storage objects, and Spark/Delta
+        sentinel artifacts (``_SUCCESS``, ``*.crc``, ``_committed_*``,
+        etc.) so they never participate in glob matching or grouping.
+        """
+        for obj in list_s3_objects(self.s3_client, Bucket=bucket_name, Prefix=prefix):
+            key = obj.get("Key", "")
+            if not key or key.endswith("/"):
+                continue
+            storage_class = obj.get("StorageClass", "STANDARD")
+            if storage_class in COLD_STORAGE_CLASSES:
+                continue
+            if is_excluded_artifact(key):
+                continue
+            yield key, obj.get("Size", 0)
+
     def fetch_buckets(self) -> List[S3BucketResponse]:
         results: List[S3BucketResponse] = []
         try:
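A usage sketch for the new list_keys generator (the caller, bucket name, and pattern below are made up): since sentinels and cold-storage objects are filtered at the source, a glob over the results only ever sees data files.

from fnmatch import fnmatch


def matching_data_files(source, bucket_name: str, pattern: str):
    # fnmatch's "*" also crosses "/" boundaries, which suits S3 key globbing.
    for key, size in source.list_keys(bucket_name=bucket_name, prefix=""):
        if fnmatch(key, pattern):
            yield key, size


# e.g. matching_data_files(s3_source, "my-bucket", "transactions/*/*.parquet")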
@@ -678,36 +686,57 @@ def _get_sample_file_path(
         self, bucket_name: str, metadata_entry: MetadataEntry
     ) -> Optional[str]:
         """
-        Given a bucket and a metadata entry, returns the full path key to a file which can then be used to infer schema
-        or None in the case of a non-structured metadata entry, or if no such keys can be found
+        Given a bucket and a metadata entry, returns the full path key to a
+        file which can then be used to infer schema, or None if no suitable
+        file exists.
+
+        Spark/Delta artifacts (``_SUCCESS``, ``_SUCCESS.crc``,
+        ``_delta_log``, ``_temporary``, ``_spark_metadata``, ``.tmp``,
+        ``_committed_*``, ``_started_*``) are always skipped — these
+        sentinel files are commonly 0-byte or non-parquet and would
+        crash the schema-inference readers (see Issue #24823).
+
+        The entry's ``structureFormat`` (if set) is used to prefer a
+        matching extension so a ``.parquet`` table is not sampled from
+        a neighbouring ``.csv`` or ``.crc`` file.
         """
         prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
-        # this will look only in the first 1000 files under that path (default for list_objects_v2).
-        # We'd rather not do pagination here as it would incur unwanted costs
+        if not prefix:
+            return None
+
+        # this will look only in the first 1000 files under that path
+        # (default for list_objects_v2). Pagination would incur unwanted costs.
         try:
-            if prefix:
-                response = self.s3_client.list_objects_v2(
-                    Bucket=bucket_name, Prefix=prefix
-                )
-                candidate_keys = [
-                    entry["Key"]
-                    for entry in response[S3_CLIENT_ROOT_RESPONSE]
-                    if entry
-                    and entry.get("Key")
-                    and not entry.get("Key").endswith("/")
-                    and "/_delta_log/" not in entry.get("Key")
-                    and not entry.get("Key").endswith("/_SUCCESS")
-                ]
-                # pick a random key out of the candidates if any were returned
-                if candidate_keys:
-                    result_key = secrets.choice(candidate_keys)
-                    logger.info(
-                        f"File {result_key} was picked to infer data structure from."
-                    )
-                    return result_key
-                logger.warning(
-                    f"No sample files found in {prefix} with {metadata_entry.structureFormat} extension"
+            response = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
+            all_keys = [
+                entry["Key"]
+                for entry in response.get(S3_CLIENT_ROOT_RESPONSE, []) or []
+                if entry
+                and entry.get("Key")
+                and not entry.get("Key").endswith("/")
+                and not is_excluded_artifact(entry.get("Key"))
+            ]
+            # Prefer files that match the requested structureFormat
+            # extension when one is set; fall back to any remaining file
+            # if none match (some tables write parquet with uncommon
+            # extensions like .pq / .parq).
+            fmt = (metadata_entry.structureFormat or "").strip().lower()
+            if fmt:
+                preferred = [k for k in all_keys if k.lower().endswith("." + fmt)]
+                candidate_keys = preferred or all_keys
+            else:
+                candidate_keys = all_keys
+
+            if candidate_keys:
+                result_key = secrets.choice(candidate_keys)
+                logger.info(
+                    f"File {result_key} was picked to infer data structure from."
                 )
+                return result_key
+            logger.warning(
+                f"No sample files found in {prefix} with "
+                f"{metadata_entry.structureFormat} extension"
+            )
             return None
         except Exception:
             logger.debug(traceback.format_exc())
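The extension-preference logic above, restated as a standalone function (illustration only, not code from the commit) so the fallback behaviour is easy to verify:

from typing import List, Optional


def prefer_format(keys: List[str], fmt: Optional[str]) -> List[str]:
    """Keys ending in ".<fmt>" when any exist; otherwise all keys unchanged."""
    fmt = (fmt or "").strip().lower()
    if not fmt:
        return keys
    preferred = [key for key in keys if key.lower().endswith("." + fmt)]
    return preferred or keys


assert prefer_format(["t/part-0.parquet", "t/notes.csv"], "parquet") == ["t/part-0.parquet"]
assert prefer_format(["t/part-0.pq"], "parquet") == ["t/part-0.pq"]  # fallback branch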
@@ -790,27 +819,66 @@ def _get_object_source_url(self, bucket_name: str, prefix: str) -> Optional[str]:
 
     def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConfig]:
         """
-        Load the metadata template file from the root of the bucket, if it exists
+        Load the metadata template file from the root of the bucket, if it exists.
+
+        Errors are distinguished so users can diagnose why a bucket was not
+        registered:
+
+        - Missing file → logged at INFO (expected when no manifest is used)
+        - JSON syntax error → WARNING with line/column
+        - Schema validation error (e.g. missing required field, wrong type) →
+          WARNING with Pydantic's per-field message
+        - Any other error → WARNING with the exception repr
+
+        All non-missing errors are also recorded on the workflow ``status``
+        so they show up in the Ingestion tab alongside other warnings.
         """
+        manifest_uri = f"s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
         try:
-            logger.info(
-                f"Looking for metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
-            )
+            logger.info(f"Looking for metadata template file at - {manifest_uri}")
             response_object = self.s3_reader.read(
                 path=OPENMETADATA_TEMPLATE_FILE_NAME,
                 bucket_name=bucket_name,
                 verbose=False,
             )
+        except ReadException:
+            logger.info(
+                f"No manifest file found at {manifest_uri} — falling back to "
+                f"defaultManifest / global manifest if configured."
+            )
+            return None
+
+        try:
             content = json.loads(response_object)
+        except json.JSONDecodeError as exc:
+            msg = (
+                f"Bucket manifest {manifest_uri} is not valid JSON "
+                f"(line {exc.lineno}, column {exc.colno}): {exc.msg}. "
+                f"This bucket will use the defaultManifest fallback if one is "
+                f"configured; otherwise no nested containers will be ingested."
+            )
+            logger.warning(msg)
+            self.status.warning(bucket_name, msg)
+            return None
+
+        try:
             metadata_config = StorageContainerConfig.model_validate(content)
-            return metadata_config
-        except ReadException:
-            logger.warning(
-                f"No metadata file found at s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
+        except ValidationError as exc:
+            msg = (
+                f"Bucket manifest {manifest_uri} does not match the expected "
+                f"schema: {format_validation_error(exc)}. This bucket will use the defaultManifest "
+                f"fallback if one is configured; otherwise no nested "
+                f"containers will be ingested."
             )
-        except Exception as exc:
+            logger.warning(msg)
+            self.status.warning(bucket_name, msg)
+            return None
+        except Exception as exc:  # pragma: no cover — defensive
             logger.debug(traceback.format_exc())
-            logger.warning(
-                f"Failed loading metadata file s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}-{exc}"
-            )
-            return None
+            msg = f"Unexpected error loading manifest {manifest_uri}: {exc}"
+            logger.warning(msg)
+            self.status.warning(bucket_name, msg)
+            return None
+
+        logger.info(f"Loaded bucket-level manifest from {manifest_uri}")
+        return metadata_config
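For reference, a bucket-level manifest shaped the way the error branches above expect, inferred from the field paths in the validation example earlier (entries[].dataPath, entries[].structureFormat); StorageContainerConfig likely defines more fields, so treat this as a sketch:

import json

# Would sit at the bucket root under OPENMETADATA_TEMPLATE_FILE_NAME.
MANIFEST_SKETCH = {
    "entries": [
        {"dataPath": "transactions", "structureFormat": "parquet"},
        {"dataPath": "exports/users", "structureFormat": "csv"},
    ]
}
print(json.dumps(MANIFEST_SKETCH, indent=2))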
