Skip to content

Commit 5d830b7

Browse files
Refactor and Improve Glossary Term Operations - Tentative Release 1.11.4 (#24804)
* Manual Creation of ChangeEvent in Move Glossary Term Operation, Refactored redundant Update EntityStatus function, Change EntityLink in WorkflowInstance table when name is changed * Revert workflow instance change, Fix Feed Repository * Apply patch status as a graceful fallback if any of the user approval tasks in Flowable are corrupted * Fix In Transaction for CollectionDAO feed counter * Test Cases * cleanup thread tasks in test, return true for workflow handler resolution generic error * Remove Limit 1000 in FeedResourceTest --------- Co-authored-by: sonika-shah <58761340+sonika-shah@users.noreply.github.com>
1 parent e0f04a6 commit 5d830b7

13 files changed

Lines changed: 292 additions & 70 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -533,11 +533,11 @@ public Map<String, Object> transformToNodeVariables(
533533
return namespacedVariables;
534534
}
535535

536-
public void resolveTask(UUID taskId) {
537-
resolveTask(taskId, null);
536+
public boolean resolveTask(UUID taskId) {
537+
return resolveTask(taskId, null);
538538
}
539539

540-
public void resolveTask(UUID customTaskId, Map<String, Object> variables) {
540+
public boolean resolveTask(UUID customTaskId, Map<String, Object> variables) {
541541
TaskService taskService = processEngine.getTaskService();
542542
LOG.debug("[WorkflowTask] RESOLVE: customTaskId='{}' variables={}", customTaskId, variables);
543543
try {
@@ -589,18 +589,21 @@ public void resolveTask(UUID customTaskId, Map<String, Object> variables) {
589589
});
590590
LOG.debug("[WorkflowTask] SUCCESS: Task '{}' resolved", customTaskId);
591591
}
592+
return true;
592593
} else {
593594
LOG.warn("[WorkflowTask] NOT_FOUND: No Flowable task for customTaskId='{}'", customTaskId);
595+
return false;
594596
}
595597
} catch (FlowableObjectNotFoundException ex) {
596598
LOG.error(
597599
"[WorkflowTask] ERROR: Flowable task not found for customTaskId='{}': {}",
598600
customTaskId,
599601
ex.getMessage());
602+
return false;
600603
} catch (Exception e) {
601604
LOG.error(
602605
"[WorkflowTask] ERROR: Failed to resolve task '{}': {}", customTaskId, e.getMessage(), e);
603-
throw e;
606+
return false;
604607
}
605608
}
606609

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2202,7 +2202,10 @@ interface FeedDAO {
22022202
connectionType = POSTGRES)
22032203
void updateTaskId();
22042204

2205-
@SqlQuery("SELECT id FROM task_sequence LIMIT 1")
2205+
@ConnectionAwareSqlQuery(value = "SELECT LAST_INSERT_ID()", connectionType = MYSQL)
2206+
@ConnectionAwareSqlQuery(
2207+
value = "SELECT id FROM task_sequence LIMIT 1",
2208+
connectionType = POSTGRES)
22062209
int getTaskId();
22072210

22082211
@SqlQuery("SELECT json FROM thread_entity WHERE taskId = :id")

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataContractRepository.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
1919
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
2020
import static org.openmetadata.service.Entity.DATA_CONTRACT;
21+
import static org.openmetadata.service.Entity.FIELD_ENTITY_STATUS;
2122
import static org.openmetadata.service.Entity.TEAM;
2223
import static org.openmetadata.service.exception.CatalogExceptionMessage.notReviewer;
2324
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
@@ -92,6 +93,7 @@
9293
import org.openmetadata.service.rules.RuleEngine;
9394
import org.openmetadata.service.secrets.SecretsManagerFactory;
9495
import org.openmetadata.service.security.AuthorizationException;
96+
import org.openmetadata.service.util.EntityFieldUtils;
9597
import org.openmetadata.service.util.EntityUtil;
9698
import org.openmetadata.service.util.EntityUtil.Fields;
9799
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
@@ -1174,8 +1176,19 @@ public EntityInterface performTask(String user, ResolveTask resolveTask) {
11741176
variables.put(RESULT_VARIABLE, resolveTask.getNewValue().equalsIgnoreCase("approved"));
11751177
variables.put(UPDATED_BY_VARIABLE, user);
11761178
WorkflowHandler workflowHandler = WorkflowHandler.getInstance();
1177-
workflowHandler.resolveTask(
1178-
taskId, workflowHandler.transformToNodeVariables(taskId, variables));
1179+
boolean workflowSuccess =
1180+
workflowHandler.resolveTask(
1181+
taskId, workflowHandler.transformToNodeVariables(taskId, variables));
1182+
1183+
// If workflow failed (corrupted Flowable task), apply the status directly
1184+
if (!workflowSuccess) {
1185+
LOG.warn(
1186+
"[GlossaryTerm] Workflow failed for taskId='{}', applying status directly", taskId);
1187+
Boolean approved = (Boolean) variables.get(RESULT_VARIABLE);
1188+
String entityStatus = (approved != null && approved) ? "Approved" : "Rejected";
1189+
EntityFieldUtils.setEntityField(
1190+
dataContract, DATA_CONTRACT, user, FIELD_ENTITY_STATUS, entityStatus, true);
1191+
}
11791192

11801193
return dataContract;
11811194
}

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.openmetadata.schema.type.Include.NON_DELETED;
2020
import static org.openmetadata.service.Entity.DATA_PRODUCT;
2121
import static org.openmetadata.service.Entity.DOMAIN;
22+
import static org.openmetadata.service.Entity.FIELD_ENTITY_STATUS;
2223
import static org.openmetadata.service.Entity.FIELD_EXPERTS;
2324
import static org.openmetadata.service.Entity.FIELD_OWNERS;
2425
import static org.openmetadata.service.Entity.TEAM;
@@ -73,6 +74,7 @@
7374
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldQuery;
7475
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldResult;
7576
import org.openmetadata.service.security.AuthorizationException;
77+
import org.openmetadata.service.util.EntityFieldUtils;
7678
import org.openmetadata.service.util.EntityUtil;
7779
import org.openmetadata.service.util.EntityUtil.Fields;
7880
import org.openmetadata.service.util.LineageUtil;
@@ -538,8 +540,19 @@ public EntityInterface performTask(String user, ResolveTask resolveTask) {
538540
variables.put(RESULT_VARIABLE, resolveTask.getNewValue().equalsIgnoreCase("approved"));
539541
variables.put(UPDATED_BY_VARIABLE, user);
540542
WorkflowHandler workflowHandler = WorkflowHandler.getInstance();
541-
workflowHandler.resolveTask(
542-
taskId, workflowHandler.transformToNodeVariables(taskId, variables));
543+
boolean workflowSuccess =
544+
workflowHandler.resolveTask(
545+
taskId, workflowHandler.transformToNodeVariables(taskId, variables));
546+
547+
// If workflow failed (corrupted Flowable task), apply the status directly
548+
if (!workflowSuccess) {
549+
LOG.warn(
550+
"[GlossaryTerm] Workflow failed for taskId='{}', applying status directly", taskId);
551+
Boolean approved = (Boolean) variables.get(RESULT_VARIABLE);
552+
String entityStatus = (approved != null && approved) ? "Approved" : "Rejected";
553+
EntityFieldUtils.setEntityField(
554+
dataProduct, DATA_PRODUCT, user, FIELD_ENTITY_STATUS, entityStatus, true);
555+
}
543556

544557
return dataProduct;
545558
}

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import static org.openmetadata.service.Entity.getEntityReferenceById;
5454
import static org.openmetadata.service.exception.CatalogExceptionMessage.csvNotSupported;
5555
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
56+
import static org.openmetadata.service.exception.CatalogExceptionMessage.notReviewer;
5657
import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTags;
5758
import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTagsGracefully;
5859
import static org.openmetadata.service.resources.tags.TagLabelUtil.checkDisabledTags;
@@ -215,6 +216,7 @@
215216
import org.openmetadata.service.search.SearchRepository;
216217
import org.openmetadata.service.search.SearchResultListMapper;
217218
import org.openmetadata.service.search.SearchSortFilter;
219+
import org.openmetadata.service.security.AuthorizationException;
218220
import org.openmetadata.service.security.policyevaluator.SubjectContext;
219221
import org.openmetadata.service.util.EntityETag;
220222
import org.openmetadata.service.util.EntityUtil;
@@ -4218,7 +4220,7 @@ private void updateInternal(boolean consolidatingChanges) {
42184220
updateDeleted();
42194221
updateDescription();
42204222
updateDisplayName();
4221-
updateEntityStatus();
4223+
updateEntityStatus(consolidatingChanges);
42224224
updateOwners();
42234225
updateExtension(consolidatingChanges);
42244226
updateTags(
@@ -4244,7 +4246,7 @@ private void updateInternalForImport(boolean consolidatingChanges) {
42444246
updateDeleted();
42454247
updateDescription();
42464248
updateDisplayName();
4247-
updateEntityStatus();
4249+
updateEntityStatus(consolidatingChanges);
42484250
updateOwnersForImport();
42494251
updateExtension(consolidatingChanges);
42504252
updateTagsForImport(
@@ -4301,9 +4303,47 @@ private void updateDisplayName() {
43014303
recordChange(FIELD_DISPLAY_NAME, original.getDisplayName(), updated.getDisplayName());
43024304
}
43034305

4304-
private void updateEntityStatus() {
4306+
private void updateEntityStatus(boolean consolidatingChanges) {
43054307
if (supportsEntityStatus) {
4306-
recordChange(FIELD_ENTITY_STATUS, original.getEntityStatus(), updated.getEntityStatus());
4308+
if (original.getEntityStatus().equals(updated.getEntityStatus())) {
4309+
return;
4310+
}
4311+
// Only reviewers can change from IN_REVIEW status to APPROVED/REJECTED status
4312+
if (!consolidatingChanges
4313+
&& original.getEntityStatus() == EntityStatus.IN_REVIEW
4314+
&& (updated.getEntityStatus() == EntityStatus.APPROVED
4315+
|| updated.getEntityStatus() == EntityStatus.REJECTED)) {
4316+
checkUpdatedByReviewer(original, updated.getUpdatedBy());
4317+
}
4318+
recordChange("entityStatus", original.getEntityStatus(), updated.getEntityStatus());
4319+
}
4320+
}
4321+
4322+
public static void checkUpdatedByReviewer(EntityInterface entity, String updatedBy) {
4323+
// Only list of allowed reviewers can change the status from DRAFT to APPROVED
4324+
List<EntityReference> reviewers = entity.getReviewers();
4325+
if (!nullOrEmpty(reviewers)) {
4326+
// Updating user must be one of the reviewers
4327+
boolean isReviewer =
4328+
reviewers.stream()
4329+
.anyMatch(
4330+
e -> {
4331+
if (e.getType().equals(TEAM)) {
4332+
Team team =
4333+
Entity.getEntityByName(TEAM, e.getName(), "users", Include.NON_DELETED);
4334+
return team.getUsers().stream()
4335+
.anyMatch(
4336+
u ->
4337+
u.getName().equals(updatedBy)
4338+
|| u.getFullyQualifiedName().equals(updatedBy));
4339+
} else {
4340+
return e.getName().equals(updatedBy)
4341+
|| e.getFullyQualifiedName().equals(updatedBy);
4342+
}
4343+
});
4344+
if (!isReviewer) {
4345+
throw new AuthorizationException(notReviewer(updatedBy));
4346+
}
43074347
}
43084348
}
43094349

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,13 @@ public enum PaginationType {
150150
}
151151

152152
public int getNextTaskId() {
153-
dao.feedDAO().updateTaskId();
154-
return dao.feedDAO().getTaskId();
153+
return Entity.getJdbi()
154+
.inTransaction(
155+
handle -> {
156+
CollectionDAO.FeedDAO feed = handle.attach(CollectionDAO.FeedDAO.class);
157+
feed.updateTaskId();
158+
return feed.getTaskId();
159+
});
155160
}
156161

157162
@Getter

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/GlossaryTermRepository.java

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
2121
import static org.openmetadata.schema.type.EventType.ENTITY_CREATED;
2222
import static org.openmetadata.schema.type.Include.ALL;
23+
import static org.openmetadata.service.Entity.FIELD_ENTITY_STATUS;
2324
import static org.openmetadata.service.Entity.GLOSSARY;
2425
import static org.openmetadata.service.Entity.GLOSSARY_TERM;
2526
import static org.openmetadata.service.Entity.TEAM;
@@ -35,8 +36,8 @@
3536
import static org.openmetadata.service.util.EntityUtil.compareEntityReferenceById;
3637
import static org.openmetadata.service.util.EntityUtil.compareTagLabel;
3738
import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
38-
import static org.openmetadata.service.util.EntityUtil.fieldUpdated;
3939
import static org.openmetadata.service.util.EntityUtil.getId;
40+
import static org.openmetadata.service.util.EntityUtil.isNullOrEmptyChangeDescription;
4041
import static org.openmetadata.service.util.EntityUtil.stringMatch;
4142
import static org.openmetadata.service.util.EntityUtil.tagLabelMatch;
4243
import static org.openmetadata.service.util.EntityUtil.termReferenceMatch;
@@ -79,8 +80,10 @@
7980
import org.openmetadata.schema.search.SearchRequest;
8081
import org.openmetadata.schema.type.ApiStatus;
8182
import org.openmetadata.schema.type.ChangeDescription;
83+
import org.openmetadata.schema.type.ChangeEvent;
8284
import org.openmetadata.schema.type.EntityReference;
8385
import org.openmetadata.schema.type.EntityStatus;
86+
import org.openmetadata.schema.type.EventType;
8487
import org.openmetadata.schema.type.Include;
8588
import org.openmetadata.schema.type.ProviderType;
8689
import org.openmetadata.schema.type.Relationship;
@@ -108,6 +111,7 @@
108111
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldQuery;
109112
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldResult;
110113
import org.openmetadata.service.security.AuthorizationException;
114+
import org.openmetadata.service.util.EntityFieldUtils;
111115
import org.openmetadata.service.util.EntityUtil;
112116
import org.openmetadata.service.util.EntityUtil.Fields;
113117
import org.openmetadata.service.util.FullyQualifiedName;
@@ -789,8 +793,19 @@ public EntityInterface performTask(String user, ResolveTask resolveTask) {
789793
variables.put(RESULT_VARIABLE, resolveTask.getNewValue().equalsIgnoreCase("approved"));
790794
variables.put(UPDATED_BY_VARIABLE, user);
791795
WorkflowHandler workflowHandler = WorkflowHandler.getInstance();
792-
workflowHandler.resolveTask(
793-
taskId, workflowHandler.transformToNodeVariables(taskId, variables));
796+
boolean workflowSuccess =
797+
workflowHandler.resolveTask(
798+
taskId, workflowHandler.transformToNodeVariables(taskId, variables));
799+
800+
// If workflow failed (corrupted Flowable task), apply the status directly
801+
if (!workflowSuccess) {
802+
LOG.warn(
803+
"[GlossaryTerm] Workflow failed for taskId='{}', applying status directly", taskId);
804+
Boolean approved = (Boolean) variables.get(RESULT_VARIABLE);
805+
String entityStatus = (approved != null && approved) ? "Approved" : "Rejected";
806+
EntityFieldUtils.setEntityField(
807+
glossaryTerm, "glossaryTerm", user, FIELD_ENTITY_STATUS, entityStatus, true);
808+
}
794809
// ---
795810

796811
// TODO: performTask returns the updated Entity and the flow applies the new value.
@@ -1224,7 +1239,6 @@ public void updateReviewers() {
12241239
@Override
12251240
public void entitySpecificUpdate(boolean consolidatingChanges) {
12261241
validateParent();
1227-
updateStatus(original, updated, consolidatingChanges);
12281242
updateSynonyms(original, updated);
12291243
updateReferences(original, updated);
12301244
updateRelatedTerms(original, updated);
@@ -1239,27 +1253,14 @@ public void entitySpecificUpdate(boolean consolidatingChanges) {
12391253
*/
12401254
@Transaction
12411255
public void moveAndStore() {
1242-
updateChangeDescriptionForMove();
1256+
changeDescription = new ChangeDescription().withPreviousVersion(original.getVersion());
12431257
// Now updated from previous/original to updated one
12441258
validateParent();
12451259
updateParent(original, updated); // Only update parent/glossary and FQN/relationships
12461260
storeUpdate();
12471261
postUpdate(original, updated);
12481262
}
12491263

1250-
private void updateChangeDescriptionForMove() {
1251-
ChangeDescription change = new ChangeDescription().withPreviousVersion(original.getVersion());
1252-
if (!Objects.equals(original.getParent(), updated.getParent())) {
1253-
fieldUpdated(change, "parent", original.getParent(), updated.getParent());
1254-
}
1255-
if (!Objects.equals(original.getGlossary(), updated.getGlossary())) {
1256-
fieldUpdated(change, "glossary", original.getGlossary(), updated.getGlossary());
1257-
}
1258-
updated.setIncrementalChangeDescription(change);
1259-
updated.setChangeDescription(change);
1260-
this.changeDescription = change;
1261-
}
1262-
12631264
private boolean validateIfTagsAreEqual(
12641265
List<TagLabel> originalTags, List<TagLabel> updatedTags) {
12651266
Set<String> originalTagsFqn =
@@ -1314,21 +1315,6 @@ protected void updateTags(
13141315
}
13151316
}
13161317

1317-
private void updateStatus(
1318-
GlossaryTerm origTerm, GlossaryTerm updatedTerm, boolean consolidatingChanges) {
1319-
if (origTerm.getEntityStatus() == updatedTerm.getEntityStatus()) {
1320-
return;
1321-
}
1322-
// Only reviewers can change from IN_REVIEW status to APPROVED/REJECTED status
1323-
if (!consolidatingChanges
1324-
&& origTerm.getEntityStatus() == EntityStatus.IN_REVIEW
1325-
&& (updatedTerm.getEntityStatus() == EntityStatus.APPROVED
1326-
|| updatedTerm.getEntityStatus() == EntityStatus.REJECTED)) {
1327-
checkUpdatedByReviewer(origTerm, updatedTerm.getUpdatedBy());
1328-
}
1329-
recordChange("entityStatus", origTerm.getEntityStatus(), updatedTerm.getEntityStatus());
1330-
}
1331-
13321318
private void updateSynonyms(GlossaryTerm origTerm, GlossaryTerm updatedTerm) {
13331319
List<String> origSynonyms = listOrEmpty(origTerm.getSynonyms());
13341320
List<String> updatedSynonyms = listOrEmpty(updatedTerm.getSynonyms());
@@ -1720,6 +1706,26 @@ public GlossaryTerm moveGlossaryTerm(UUID id, MoveGlossaryTermRequest moveReques
17201706
updated.setUpdatedAt(System.currentTimeMillis());
17211707
GlossaryTermUpdater updater = new GlossaryTermUpdater(original, updated, Operation.PUT);
17221708
updater.moveAndStore();
1709+
if (updated.getChangeDescription() != null
1710+
&& !isNullOrEmptyChangeDescription(updated.getChangeDescription())) {
1711+
try {
1712+
ChangeEvent changeEvent =
1713+
new ChangeEvent()
1714+
.withId(UUID.randomUUID())
1715+
.withEventType(EventType.ENTITY_UPDATED)
1716+
.withEntityType(entityType)
1717+
.withEntityId(updated.getId())
1718+
.withEntityFullyQualifiedName(updated.getFullyQualifiedName())
1719+
.withUserName(updated.getUpdatedBy())
1720+
.withPreviousVersion(original.getVersion())
1721+
.withCurrentVersion(updated.getVersion())
1722+
.withTimestamp(System.currentTimeMillis())
1723+
.withEntity(updated);
1724+
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
1725+
} catch (Exception e) {
1726+
LOG.error("Failed to insert change event for async move operation", e);
1727+
}
1728+
}
17231729
return updated;
17241730
}
17251731

0 commit comments

Comments
 (0)