Skip to content

Commit be1ea8f

Browse files
Merge branch 'main' into feature/22644-iceberg-gcp-support
2 parents 6f83140 + 7e7ed3b commit be1ea8f

79 files changed

Lines changed: 4381 additions & 1148 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/auto-cherry-pick-labeled-prs.yaml

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,49 +16,61 @@ permissions:
1616
env:
1717
CURRENT_RELEASE_ENDPOINT: ${{ vars.CURRENT_RELEASE_ENDPOINT }} # Endpoint that returns the current release version in json format
1818
jobs:
19-
cherry_pick_to_release_branch:
19+
get_release_branch:
2020
if: github.event.pull_request.merged == true &&
2121
contains(github.event.pull_request.labels.*.name, 'To release')
22+
runs-on: ubuntu-latest
23+
outputs:
24+
release_branches: ${{ steps.get_release_version.outputs.release_branches }}
25+
steps:
26+
- name: Get the release version
27+
id: get_release_version
28+
run: |
29+
CURRENT_RELEASE=$(curl -s $CURRENT_RELEASE_ENDPOINT | jq -c '.collate_branches // []')
30+
echo "release_branches=${CURRENT_RELEASE}" >> $GITHUB_OUTPUT
31+
32+
cherry_pick_to_release_branch:
33+
needs: get_release_branch
34+
if: needs.get_release_branch.outputs.release_branches != '' && needs.get_release_branch.outputs.release_branches != '[]'
2235
runs-on: ubuntu-latest # Running it on ubuntu-latest on purpose (we're not using all the free minutes)
36+
strategy:
37+
fail-fast: false
38+
matrix:
39+
branch: ${{ fromJson(needs.get_release_branch.outputs.release_branches) }}
2340
steps:
2441
- name: Checkout main branch
2542
uses: actions/checkout@v4
2643
with:
2744
ref: main
2845
fetch-depth: 0
29-
- name: Get the release version
30-
id: get_release_version
31-
run: |
32-
CURRENT_RELEASE=$(curl -s $CURRENT_RELEASE_ENDPOINT | jq -r .om_branch)
33-
echo "CURRENT_RELEASE=${CURRENT_RELEASE}" >> $GITHUB_ENV
3446
- name: Cherry-pick changes from PR
3547
id: cherry_pick
3648
continue-on-error: true
3749
run: |
3850
git config --global user.email "release-bot@open-metadata.org"
3951
git config --global user.name "OpenMetadata Release Bot"
40-
git fetch origin ${CURRENT_RELEASE}
41-
git checkout ${CURRENT_RELEASE}
52+
git fetch origin ${{ matrix.branch }}
53+
git checkout ${{ matrix.branch }}
4254
git cherry-pick -x ${{ github.event.pull_request.merge_commit_sha }}
4355
- name: Push changes to release branch
4456
id: push_changes
4557
continue-on-error: true
4658
if: steps.cherry_pick.outcome == 'success'
4759
run: |
48-
git push origin ${CURRENT_RELEASE}
60+
git push origin ${{ matrix.branch }}
4961
- name: Post a comment on failure
5062
if: steps.cherry_pick.outcome != 'success' || steps.push_changes.outcome != 'success'
5163
uses: actions/github-script@v7
5264
with:
5365
script: |
5466
const prNumber = context.payload.pull_request.number;
55-
const releaseVersion = process.env.CURRENT_RELEASE;
67+
const releaseBranch = '${{ matrix.branch }}';
5668
const workflowRunUrl = `${process.env.GITHUB_SERVER_URL}/${process.env.GITHUB_REPOSITORY}/actions/runs/${process.env.GITHUB_RUN_ID}`;
5769
github.rest.issues.createComment({
5870
owner: context.repo.owner,
5971
repo: context.repo.repo,
6072
issue_number: prNumber,
61-
body: `Failed to cherry-pick changes to the ${releaseVersion} branch.
73+
body: `Failed to cherry-pick changes to the ${releaseBranch} branch.
6274
Please cherry-pick the changes manually.
6375
You can find more details [here](${workflowRunUrl}).`
6476
})
@@ -68,10 +80,10 @@ jobs:
6880
with:
6981
script: |
7082
const prNumber = context.payload.pull_request.number;
71-
const releaseVersion = process.env.CURRENT_RELEASE;
83+
const releaseBranch = '${{ matrix.branch }}';
7284
github.rest.issues.createComment({
7385
owner: context.repo.owner,
7486
repo: context.repo.repo,
7587
issue_number: prNumber,
76-
body: `Changes have been cherry-picked to the ${releaseVersion} branch.`
88+
body: `Changes have been cherry-picked to the ${releaseBranch} branch.`
7789
})

bootstrap/sql/migrations/native/1.13.0/mysql/postDataMigrationSQLScript.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ UPDATE glossary_term_entity
8080
SET json = JSON_REMOVE(json, '$.relatedTerms')
8181
WHERE JSON_EXTRACT(json, '$.relatedTerms') IS NOT NULL;
8282

83+
-- entity_extension version snapshots: handled by Java migration
84+
-- migrateGlossaryTermVersionRelatedTermsToTermRelation (transforms in place to preserve history).
85+
8386
-- Backfill conceptMappings for existing glossary terms
8487
UPDATE glossary_term_entity
8588
SET json = JSON_SET(COALESCE(json, '{}'), '$.conceptMappings', JSON_ARRAY())

bootstrap/sql/migrations/native/1.13.0/postgres/postDataMigrationSQLScript.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ UPDATE glossary_term_entity
8282
SET json = (json::jsonb - 'relatedTerms')::json
8383
WHERE jsonb_exists(json::jsonb, 'relatedTerms');
8484

85+
-- entity_extension version snapshots: handled by Java migration
86+
-- migrateGlossaryTermVersionRelatedTermsToTermRelation (transforms in place to preserve history).
87+
8588
-- Backfill conceptMappings for existing glossary terms
8689
UPDATE glossary_term_entity
8790
SET json = jsonb_set(COALESCE(json::jsonb, '{}'::jsonb), '{conceptMappings}', '[]'::jsonb)

ingestion/src/metadata/ingestion/source/database/redshift/utils.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import re
1616
from collections import defaultdict
17+
from typing import Any
1718

1819
import sqlalchemy as sa
1920
from packaging.version import Version
@@ -66,7 +67,7 @@ def _redshift_initialize(self, connection):
6667
self._has_native_hstore = False
6768

6869

69-
def _load_domains(self, connection, **kw):
70+
def _load_domains(self, connection, schema: str | None = None, **kw: Any) -> dict:
7071
"""
7172
Override to return empty dict since Redshift does not support user-created
7273
domains and pg_catalog.pg_collation does not exist in Redshift, causing a
@@ -85,7 +86,15 @@ def get_temp_table_names(self, connection, schema=None, **kw):
8586
return []
8687

8788

88-
def get_multi_columns(self, connection, **kw):
89+
def get_multi_columns(
90+
self,
91+
connection,
92+
schema: str | None = None,
93+
filter_names: Any | None = None,
94+
scope: Any | None = None,
95+
kind: Any | None = None,
96+
**kw: Any,
97+
):
8998
"""
9099
Override PGDialect's get_multi_columns to avoid querying
91100
pg_attribute.attcollation which does not exist in Redshift.

ingestion/src/metadata/profiler/orm/converter/redshift/converter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
to an SQLAlchemy ORM class.
1515
"""
1616

17-
from typing import Dict, Set # noqa: UP035
17+
from typing import Dict, Set, cast # noqa: UP035
1818

1919
from sqlalchemy.sql.sqltypes import TypeEngine
2020

@@ -45,5 +45,5 @@ def map_sqa_to_om_types() -> Dict[TypeEngine, Set[DataType]]: # noqa: UP006
4545

4646
return {
4747
**CommonMapTypes.map_sqa_to_om_types(),
48-
GEOMETRY: {DataType.GEOMETRY},
48+
cast("TypeEngine", GEOMETRY): {DataType.GEOMETRY},
4949
}

ingestion/src/metadata/utils/datalake/datalake_utils.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@
3434
logger = utils_logger()
3535

3636

37+
class _ArrayOfStruct:
38+
"""Marker for a JSON value observed as a list of dicts. Carries the merged struct shape
39+
so downstream column construction can render it as ARRAY<STRUCT<...>>."""
40+
41+
__slots__ = ("struct",)
42+
43+
def __init__(self, struct: Dict): # noqa: UP006
44+
self.struct = struct
45+
46+
3747
def fetch_dataframe_generator(
3848
config_source,
3949
client,
@@ -317,6 +327,10 @@ def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821
317327
}
318328
if data_type == DataType.ARRAY:
319329
parsed_string["arrayDataType"] = DataType.UNKNOWN
330+
struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100])
331+
if struct_children:
332+
parsed_string["arrayDataType"] = DataType.STRUCT
333+
parsed_string["children"] = struct_children
320334

321335
if data_type == DataType.JSON:
322336
parsed_string["children"] = cls.get_children(data_frame[column].dropna()[:100])
@@ -418,6 +432,11 @@ def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006
418432
result[key] = cls.unique_json_structure(
419433
[nested_json if isinstance(nested_json, dict) else {}, value]
420434
)
435+
elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
436+
merged_struct = cls.unique_json_structure(value)
437+
existing = result.get(key)
438+
existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {}
439+
result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct]))
421440
else:
422441
result[key] = value
423442
return result
@@ -432,13 +451,19 @@ def construct_json_column_children(cls, json_column: Dict) -> List[Dict]: # noq
432451
children = []
433452
for key, value in json_column.items():
434453
column = {}
435-
type_ = type(value).__name__.lower()
436-
column["dataTypeDisplay"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
437-
column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
438454
column["name"] = truncate_column_name(key)
439455
column["displayName"] = key
440-
if isinstance(value, dict):
441-
column["children"] = cls.construct_json_column_children(value)
456+
if isinstance(value, _ArrayOfStruct):
457+
column["dataType"] = DataType.ARRAY.value
458+
column["dataTypeDisplay"] = DataType.ARRAY.value
459+
column["arrayDataType"] = DataType.STRUCT
460+
column["children"] = cls.construct_json_column_children(value.struct)
461+
else:
462+
type_ = type(value).__name__.lower()
463+
column["dataTypeDisplay"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
464+
column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
465+
if isinstance(value, dict):
466+
column["children"] = cls.construct_json_column_children(value)
442467
children.append(column)
443468

444469
return children
@@ -466,6 +491,27 @@ def get_children(cls, json_column) -> List[Dict]: # noqa: UP006
466491

467492
return cls.construct_json_column_children(json_structure)
468493

494+
@classmethod
495+
def _get_array_struct_children(cls, array_column: Any) -> List[Dict]: # noqa: UP006
496+
"""For an ARRAY column whose elements are dicts, infer the merged struct shape and
497+
return it as children. Returns an empty list when elements are not dicts.
498+
"""
499+
flattened = []
500+
for value in array_column.values.tolist():
501+
if isinstance(value, str):
502+
try:
503+
value = json.loads(value) # noqa: PLW2901
504+
except (TypeError, ValueError):
505+
continue
506+
if isinstance(value, dict):
507+
flattened.append(value)
508+
elif isinstance(value, list):
509+
flattened.extend(item for item in value if isinstance(item, dict))
510+
if not flattened:
511+
return []
512+
merged_struct = cls.unique_json_structure(flattened)
513+
return cls.construct_json_column_children(merged_struct)
514+
469515

470516
# pylint: disable=import-outside-toplevel
471517
class ParquetDataFrameColumnParser:

ingestion/tests/unit/utils/test_datalake.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,63 @@ def test_construct_column(self):
167167
for el in zip(expected, actual): # noqa: B905
168168
self.assertDictEqual(el[0], el[1])
169169

170+
def test_unique_json_structure_with_list_of_dicts(self):
171+
"""list-of-dicts values are merged into a struct shape (e.g. Iceberg `schema.fields`)."""
172+
sample_data = [
173+
{
174+
"schema": {
175+
"fields": [
176+
{"id": 1, "name": "customer_id", "type": "string"},
177+
{"id": 2, "name": "customer_type_cd", "type": "string"},
178+
]
179+
}
180+
}
181+
]
182+
183+
actual = GenericDataFrameColumnParser.unique_json_structure(sample_data)
184+
fields_value = actual["schema"]["fields"]
185+
186+
from metadata.utils.datalake.datalake_utils import _ArrayOfStruct
187+
188+
assert isinstance(fields_value, _ArrayOfStruct)
189+
assert set(fields_value.struct.keys()) == {"id", "name", "type"}
190+
191+
def test_unique_json_structure_merges_list_of_dicts_across_samples(self):
192+
"""list-of-dicts values across multiple samples are unioned, not overwritten."""
193+
from metadata.utils.datalake.datalake_utils import _ArrayOfStruct
194+
195+
sample_data = [
196+
{"schema": {"fields": [{"id": 1, "name": "customer_id", "type": "string"}]}},
197+
{"schema": {"fields": [{"id": 2, "required": False, "type": "string"}]}},
198+
{"schema": {"fields": [{"description": "ciam id"}]}},
199+
]
200+
201+
actual = GenericDataFrameColumnParser.unique_json_structure(sample_data)
202+
fields_value = actual["schema"]["fields"]
203+
204+
assert isinstance(fields_value, _ArrayOfStruct)
205+
assert set(fields_value.struct.keys()) == {"id", "name", "type", "required", "description"}
206+
207+
def test_construct_column_with_array_of_struct(self):
208+
"""list-of-dicts values render as ARRAY<STRUCT<...>> with children for the struct fields."""
209+
structure = {
210+
"schema": {
211+
"fields": [
212+
{"id": 1, "name": "customer_id", "type": "string"},
213+
{"id": 2, "name": "ciam_id", "type": "string"},
214+
]
215+
}
216+
}
217+
merged = GenericDataFrameColumnParser.unique_json_structure([structure])
218+
children = GenericDataFrameColumnParser.construct_json_column_children(merged)
219+
220+
schema_col = children[0]
221+
fields_col = next(c for c in schema_col["children"] if c["name"] == "fields")
222+
223+
assert fields_col["dataType"] == DataType.ARRAY.value
224+
assert fields_col["arrayDataType"] == DataType.STRUCT
225+
assert {child["name"] for child in fields_col["children"]} == {"id", "name", "type"}
226+
170227
def test_create_column_object(self):
171228
"""test create column object fn"""
172229
formatted_column = GenericDataFrameColumnParser.construct_json_column_children(STRUCTURE)

0 commit comments

Comments
 (0)