Skip to content

Commit 2a0a512

Browse files
sonika-shah authored and OpenMetadata Release Bot committed
Add source filter and indexed hash prefix to cert tag batch query (#27847)
* Add source filter and use indexed hash prefix in cert tag batch query

  The certification tag batch query (TagUsageDAO.getCertTagsInternalBatch) was hitting ~12 seconds per call on instances with deep classification hierarchies. It fired ~5,800 times per Data Insights (DI) run, contributing ~19 hrs of cumulative DB time per DI run.

  Two missing index-friendly predicates caused the slowness:

  1. No `source = ?` filter — the query couldn't use idx_tag_usage_target_exact (source, targetFQNHash, state) INCLUDE (tagFQN, labelType), whose covering INCLUDE has tagFQN.
  2. `tagFQN LIKE 'Certification.%'` on the raw column — there's no LIKE-friendly index on raw tagFQN, only on tagfqn_lower text_pattern_ops and tagFQNHash. The LIKE always ran as a post-filter on every row the IN clause returned.

  Fix:

  - Add `source = :source` filter (Certifications are always Classification source = 0).
  - Switch `tagFQN LIKE :tagFQNPrefix` → `tagFQNHash LIKE :tagFQNHashPrefix`, with the hash prefix pre-computed via FullyQualifiedName.buildHash so the query hits the indexed hash column.

  Same SQL on MySQL and Postgres — no @ConnectionAwareSqlQuery split needed.

  Also a correctness improvement: the `source = 0` filter excludes glossary terms (source = 1) that happen to have FQNs starting with "Certification.". Previously such glossary terms could be incorrectly returned as certifications; now they're excluded as expected.

  Test:

  - Added test_certBatch_bulkFetchReturnsCorrectCertsPerEntity in TagResourceIT — exercises the bulk fetch path with three schemas (cert-tagged / untagged / non-cert-tagged) and asserts each gets the right certification (or null) in the listed response. Locks in source-filter correctness and prevents future regressions where a non-cert tag could leak into the certification field.

* Fix duplicate schema names in cert batch test, trim verbose comments
* Update EntityRepositoryCertificationTest mocks for new getCertTagsInternalBatch signature
* Fix checkstyle

(cherry picked from commit 4a2f42f)
1 parent cc454b3 commit 2a0a512

4 files changed

Lines changed: 126 additions & 14 deletions

File tree

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TagResourceIT.java

Lines changed: 98 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1444,6 +1444,104 @@ void test_certificationTagNotLeakingIntoTagsField(TestNamespace ns) {
14441444
"LIST (batch): regular tag must still be present in tags field");
14451445
}
14461446

1447+
@Test
1448+
void test_certBatch_bulkFetchReturnsCorrectCertsPerEntity(TestNamespace ns) {
1449+
OpenMetadataClient client = SdkClients.adminClient();
1450+
1451+
org.openmetadata.schema.entity.classification.Classification certClassification =
1452+
client.classifications().getByName("Certification", null);
1453+
assertNotNull(certClassification, "Certification classification must exist");
1454+
1455+
CreateTag createCertTag = new CreateTag();
1456+
createCertTag.setName(ns.shortPrefix("cert_bulk_tag"));
1457+
createCertTag.setClassification(certClassification.getFullyQualifiedName());
1458+
createCertTag.setDescription("Cert tag for bulk fetch test");
1459+
Tag certTag = SdkClients.adminClient().tags().create(createCertTag);
1460+
1461+
org.openmetadata.schema.entity.classification.Classification regularClassification =
1462+
createClassification(ns);
1463+
CreateTag createRegularTag = new CreateTag();
1464+
createRegularTag.setName(ns.shortPrefix("regular_bulk_tag"));
1465+
createRegularTag.setClassification(regularClassification.getFullyQualifiedName());
1466+
createRegularTag.setDescription("Non-cert tag for bulk fetch test");
1467+
Tag regularTag = SdkClients.adminClient().tags().create(createRegularTag);
1468+
1469+
org.openmetadata.schema.entity.services.DatabaseService dbService =
1470+
createDatabaseService(ns, "cert_bulk_svc");
1471+
org.openmetadata.schema.entity.data.Database db =
1472+
createDatabase(ns, dbService.getFullyQualifiedName());
1473+
1474+
DatabaseSchema schemaWithCert =
1475+
createDatabaseSchemaNamed(ns, db.getFullyQualifiedName(), "cert_bulk_with");
1476+
DatabaseSchema schemaWithoutCert =
1477+
createDatabaseSchemaNamed(ns, db.getFullyQualifiedName(), "cert_bulk_without");
1478+
DatabaseSchema schemaWithRegularTag =
1479+
createDatabaseSchemaNamed(ns, db.getFullyQualifiedName(), "cert_bulk_regular");
1480+
1481+
org.openmetadata.schema.type.TagLabel certTagLabel =
1482+
new org.openmetadata.schema.type.TagLabel()
1483+
.withTagFQN(certTag.getFullyQualifiedName())
1484+
.withSource(org.openmetadata.schema.type.TagLabel.TagSource.CLASSIFICATION)
1485+
.withLabelType(org.openmetadata.schema.type.TagLabel.LabelType.MANUAL);
1486+
schemaWithCert.setCertification(new AssetCertification().withTagLabel(certTagLabel));
1487+
client.databaseSchemas().update(schemaWithCert.getId().toString(), schemaWithCert);
1488+
1489+
org.openmetadata.schema.type.TagLabel regularTagLabel =
1490+
new org.openmetadata.schema.type.TagLabel()
1491+
.withTagFQN(regularTag.getFullyQualifiedName())
1492+
.withSource(org.openmetadata.schema.type.TagLabel.TagSource.CLASSIFICATION)
1493+
.withLabelType(org.openmetadata.schema.type.TagLabel.LabelType.MANUAL);
1494+
schemaWithRegularTag.setTags(List.of(regularTagLabel));
1495+
client.databaseSchemas().update(schemaWithRegularTag.getId().toString(), schemaWithRegularTag);
1496+
1497+
org.openmetadata.sdk.models.ListParams listParams =
1498+
new org.openmetadata.sdk.models.ListParams()
1499+
.setDatabase(db.getFullyQualifiedName())
1500+
.setFields("certification");
1501+
org.openmetadata.sdk.models.ListResponse<DatabaseSchema> listed =
1502+
client.databaseSchemas().list(listParams);
1503+
assertNotNull(listed.getData());
1504+
1505+
DatabaseSchema listedWithCert =
1506+
listed.getData().stream()
1507+
.filter(s -> s.getId().equals(schemaWithCert.getId()))
1508+
.findFirst()
1509+
.orElse(null);
1510+
DatabaseSchema listedWithoutCert =
1511+
listed.getData().stream()
1512+
.filter(s -> s.getId().equals(schemaWithoutCert.getId()))
1513+
.findFirst()
1514+
.orElse(null);
1515+
DatabaseSchema listedWithRegularTag =
1516+
listed.getData().stream()
1517+
.filter(s -> s.getId().equals(schemaWithRegularTag.getId()))
1518+
.findFirst()
1519+
.orElse(null);
1520+
1521+
assertNotNull(listedWithCert);
1522+
assertNotNull(listedWithoutCert);
1523+
assertNotNull(listedWithRegularTag);
1524+
1525+
assertNotNull(listedWithCert.getCertification(), "cert-tagged schema: certification missing");
1526+
assertEquals(
1527+
certTag.getFullyQualifiedName(),
1528+
listedWithCert.getCertification().getTagLabel().getTagFQN());
1529+
1530+
assertNull(listedWithoutCert.getCertification(), "untagged schema: false-positive cert");
1531+
assertNull(
1532+
listedWithRegularTag.getCertification(),
1533+
"non-cert tag from another classification leaked as certification");
1534+
}
1535+
1536+
private org.openmetadata.schema.entity.data.DatabaseSchema createDatabaseSchemaNamed(
1537+
TestNamespace ns, String databaseFqn, String name) {
1538+
org.openmetadata.schema.api.data.CreateDatabaseSchema createSchema =
1539+
new org.openmetadata.schema.api.data.CreateDatabaseSchema();
1540+
createSchema.setName(ns.shortPrefix(name));
1541+
createSchema.setDatabase(databaseFqn);
1542+
return SdkClients.adminClient().databaseSchemas().create(createSchema);
1543+
}
1544+
14471545
@Test
14481546
void test_certificationTagRenamePropagatesToEntityAndSearch(TestNamespace ns) throws Exception {
14491547
OpenMetadataClient client = SdkClients.adminClient();

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4963,13 +4963,15 @@ List<TagLabelWithFQNHash> getTagsInternalBatch(
49634963
@SqlQuery(
49644964
"SELECT targetFQNHash, source, tagFQN, labelType, state, reason, appliedAt, appliedBy, metadata "
49654965
+ "FROM tag_usage "
4966-
+ "WHERE targetFQNHash IN (<targetFQNHashes>) "
4967-
+ "AND tagFQN LIKE :tagFQNPrefix "
4966+
+ "WHERE source = :source "
4967+
+ "AND targetFQNHash IN (<targetFQNHashes>) "
4968+
+ "AND tagFQNHash LIKE :tagFQNHashPrefix "
49684969
+ "ORDER BY targetFQNHash, tagFQN")
49694970
@UseRowMapper(TagLabelWithFQNHashMapper.class)
49704971
List<TagLabelWithFQNHash> getCertTagsInternalBatch(
4972+
@Bind("source") int source,
49714973
@BindListFQN("targetFQNHashes") List<String> targetFQNHashes,
4972-
@Bind("tagFQNPrefix") String tagFQNPrefix);
4974+
@Bind("tagFQNHashPrefix") String tagFQNHashPrefix);
49734975

49744976
/**
49754977
* Batch fetch derived tags for multiple glossary term FQNs. Returns a map from glossary term

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3722,7 +3722,9 @@ protected AssetCertification getCertification(T entity) {
37223722
daoCollection
37233723
.tagUsageDAO()
37243724
.getCertTagsInternalBatch(
3725-
List.of(entity.getFullyQualifiedName()), certClassification + ".%");
3725+
TagLabel.TagSource.CLASSIFICATION.ordinal(),
3726+
List.of(entity.getFullyQualifiedName()),
3727+
FullyQualifiedName.buildHash(certClassification) + ".%");
37263728
if (nullOrEmpty(certTags)) return null;
37273729
TagLabel tagLabel = certTags.get(0).toTagLabel();
37283730
TagLabelUtil.applyTagCommonFieldsGracefully(tagLabel);
@@ -7792,7 +7794,12 @@ private Map<UUID, AssetCertification> batchFetchCertification(List<T> entities)
77927794
List<CollectionDAO.TagUsageDAO.TagLabelWithFQNHash> certTags;
77937795
try {
77947796
certTags =
7795-
daoCollection.tagUsageDAO().getCertTagsInternalBatch(fqnList, certClassification + ".%");
7797+
daoCollection
7798+
.tagUsageDAO()
7799+
.getCertTagsInternalBatch(
7800+
TagLabel.TagSource.CLASSIFICATION.ordinal(),
7801+
fqnList,
7802+
FullyQualifiedName.buildHash(certClassification) + ".%");
77967803
} catch (Exception e) {
77977804
LOG.warn(
77987805
"batchFetchCertification: batch query failed, falling back to individual fetch: {}",

openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/EntityRepositoryCertificationTest.java

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,7 @@ void getCertificationReturnsCertWhenTagFound() {
113113
tagEntry.setLabelType(TagLabel.LabelType.AUTOMATED.ordinal());
114114
tagEntry.setState(TagLabel.State.CONFIRMED.ordinal());
115115

116-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString()))
116+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
117117
.thenReturn(List.of(tagEntry));
118118

119119
AssetCertification cert = repo.getCertification(entity);
@@ -130,7 +130,8 @@ void getCertificationReturnsNullWhenNoTagFound() {
130130
.withName("my-pipeline")
131131
.withFullyQualifiedName("service.my-pipeline");
132132

133-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString())).thenReturn(List.of());
133+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
134+
.thenReturn(List.of());
134135

135136
AssetCertification cert = repo.getCertification(entity);
136137

@@ -206,7 +207,7 @@ void applyCertificationSkipsWhenSameCertAlreadyExists() {
206207
existingEntry.setLabelType(TagLabel.LabelType.AUTOMATED.ordinal());
207208
existingEntry.setState(TagLabel.State.CONFIRMED.ordinal());
208209

209-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString()))
210+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
210211
.thenReturn(List.of(existingEntry));
211212

212213
assertDoesNotThrow(() -> repo.applyCertification(entity));
@@ -227,7 +228,8 @@ void applyCertificationAppliesTagWhenCertIsDifferent() {
227228
.withFullyQualifiedName("service.my-pipeline")
228229
.withCertification(incoming);
229230

230-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString())).thenReturn(List.of());
231+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
232+
.thenReturn(List.of());
231233

232234
assertDoesNotThrow(() -> repo.applyCertification(entity));
233235

@@ -282,7 +284,8 @@ void storeRelationshipsInternalWithNoCertificationEntityDoesNotThrow() {
282284
.withFullyQualifiedName("service.my-pipeline")
283285
.withCertification(null);
284286

285-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString())).thenReturn(List.of());
287+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
288+
.thenReturn(List.of());
286289

287290
assertDoesNotThrow(() -> repo.storeRelationshipsInternal(List.of(entity)));
288291
}
@@ -393,7 +396,8 @@ void storeRelationshipsInternalSingleEntityWithNoCert() {
393396
.withFullyQualifiedName("service.my-pipeline")
394397
.withCertification(null);
395398

396-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString())).thenReturn(List.of());
399+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
400+
.thenReturn(List.of());
397401

398402
assertDoesNotThrow(() -> repo.storeRelationshipsInternal(entity));
399403
}
@@ -415,7 +419,7 @@ void fetchAndSetFieldsPopulatesCertification() {
415419
tagEntry.setState(TagLabel.State.CONFIRMED.ordinal());
416420
tagEntry.setTargetFQNHash(FullyQualifiedName.buildHash("service.my-pipeline"));
417421

418-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString()))
422+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
419423
.thenReturn(List.of(tagEntry));
420424

421425
Fields certFields = new Fields(Set.of("certification"));
@@ -433,7 +437,7 @@ void fetchAndSetFieldsFallsBackToIndividualFetchOnBatchException() {
433437
.withName("my-pipeline")
434438
.withFullyQualifiedName("service.my-pipeline");
435439

436-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString()))
440+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
437441
.thenThrow(new RuntimeException("DB error"))
438442
.thenReturn(List.of());
439443

@@ -568,7 +572,8 @@ void applyCertificationUsesEntityUpdatedByAsAppliedBy() {
568572
.withUpdatedBy("alice")
569573
.withCertification(new AssetCertification().withTagLabel(tagLabel));
570574

571-
when(tagUsageDAO.getCertTagsInternalBatch(anyList(), anyString())).thenReturn(List.of());
575+
when(tagUsageDAO.getCertTagsInternalBatch(anyInt(), anyList(), anyString()))
576+
.thenReturn(List.of());
572577

573578
assertDoesNotThrow(() -> repo.applyCertification(entity));
574579

0 commit comments

Comments
 (0)