[WIP] feat(workflows): batch entity processing — entityList-only for automated task nodes#26715
Open
[WIP] feat(workflows): batch entity processing — entityList-only for automated task nodes#26715
Conversation
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Issue: https://github.com/open-metadata/openmetadata-collate/issues/3363
What
Phase 1 of batch entity processing in OpenMetadata's Flowable-based governance workflow engine.
All 6 automated task node types (
checkEntityAttributesTask,checkChangeDescriptionTask,setEntityAttributeTask,rollbackEntityTask,sinkTask,dataCompletenessTask) now process aList<String>of entity link strings (entityList) in a single workflow instance. Single-entity mode is batch mode of size 1 — the same code path handles both.Why
Previously, event-based workflows spawned one Flowable subprocess per entity via
loopCardinality. Under load this creates O(n) Flowable process instances. The batch design processes the entire entity set in one pass, with per-entity retry isolation, so failures on individual entities never abort the workflow.Architecture
Two independent channels carry data through the workflow:
RESULT_VARIABLE(result)${nodeName_result == 'value'}entityList/true_entityList/false_entityList/{band}_entityListCheck nodes split incoming entities into conditional lists. Downstream nodes read the correct list via
inputNamespaceMapusing the upstream node name as namespace (e.g."true_entityList": "CheckOwner").WorkflowVariableHandler.getEntityList()detects conditional keys viaendsWith("_entityList").Files Changed
Workflow entry points — seeding
entityListWorkflowEventConsumer.java—defaultHandler()andhandleTagRecognizerFeedback()now setglobal_entityList = List.of(entityLinkString)alongsideglobal_relatedEntity; this is where event-based workflows receive their initial entity list. Entity references are now resolved withInclude.NON_DELETED— deleted entities are skipped before any workflow is triggeredEventBasedEntityTrigger.java— explicitly passesentityListas anIOParameterfrom the filter subprocess into the main workflow (in addition torelatedEntity); skipsentityListin the dynamic output loop to prevent double-addingPeriodicBatchEntityTrigger.java/FetchEntitiesImpl.java—COLLECTION_VARIABLEremoved fromPeriodicBatchEntityTrigger;HAS_FINISHED_VARIABLEmadefinal;FetchEntitiesImplimportsENTITY_LIST_VARIABLEfromWorkflow.singleExecution=truecardinality uses${entityList != null && !entityList.isEmpty() ? 1 : 0}— creates 0 Flowable instances on empty batches (no crash) and exactly 1 on non-empty batches (full-list single invocation)FilterEntityImpl.java— bug fix: was only settingPASSES_FILTER_VARIABLEbut never populatingglobal_entityList, so event-based workflows received an empty entity list downstream; now setsglobal_entityList = List.of(entityLinkStr)unconditionallyWorkflow engine
Workflow.java— addedTRUE_ENTITY_LIST_VARIABLE = "true_entityList"constantWorkflowVariableHandler.java—getEntityList()supports conditional routing keys (true_entityList,false_entityList,{band}_entityList) viaendsWith("_entityList"); addedLOG.debugfor empty-list casesCheckEntityAttributesTask,CheckChangeDescriptionTask,DataCompletenessTask,RollbackEntityTask,SetEntityAttributeTask,SinkTask) — renamedconverted→definedNamespaceMapTriggerFactory.java— Javadoc cleanup onlyAutomated task implementations (6 files)
All now loop over
entityListwith per-entity resilience4j retry (3 attempts, 500 ms). Per-entity failures are stored in node variables and never throwBpmnError.CheckEntityAttributesImpl/CheckChangeDescriptionTaskImpl— producetrue_entityList,false_entityList,RESULT_VARIABLEonly; backward-compat plainentityListoutput removedRollbackEntityImpl— renamedgetPreviousApprovedVersion→getPreviousRollbackTargetVersion; APPROVED → DRAFT → DEPRECATED fallback chain; reflection-basedgetEntityStatus()returnsEntityStatusdirectly; removed unused fieldsSetEntityAttributeImpl/SinkTaskDelegate— retry + per-entity error tracking;SinkTaskDelegatedeclaresentityLinkoutside try-block with fallback to rawentityLinkStron parse failure.SetEntityAttributeImplloads entity links withInclude.NON_DELETEDso soft-deleted entities are never picked up for attribute writesEntityRepository.java—bulkUpdateEntitiesForGovernanceWorkflowgained askipDeletedboolean parameter; governance calls passtrueso the implicitrestoreEntity()path is bypassed for deleted entities, preventing governance workflows from resurrecting soft-deleted entities (e.g. glossary terms deleted mid-run)DataCompletenessImpl— retry + fixed band iteration (was iterating empty list);storeFieldList()always returnsList<String>;LOG.info→LOG.debugfor per-entity logsWorkflow definition (GlossaryApprovalWorkflow.json)
8 nodes downstream of check nodes updated to use conditional
inputNamespaceMapkeys:CheckIfGlossaryTermUpdatedByIsReviewer"entityList": "global""true_entityList": "CheckGlossaryTermHasReviewers"SetGlossaryTermStatusToApprovedByReviewer"entityList": "global""true_entityList": "CheckIfGlossaryTermUpdatedByIsReviewer"CheckGlossaryTermIsReadyToBeReviewed"entityList": "global""false_entityList": "CheckIfGlossaryTermUpdatedByIsReviewer"SetGlossaryTermStatusToDraft"entityList": "global""false_entityList": "CheckGlossaryTermIsReadyToBeReviewed"CheckIfGlossaryTermIsNew"entityList": "global""true_entityList": "CheckGlossaryTermIsReadyToBeReviewed"SetGlossaryTermStatusToInReview"entityList": "global""true_entityList": "CheckIfGlossaryTermIsNew"SetGlossaryTermStatusToInReviewForUpdate"entityList": "global""false_entityList": "CheckIfGlossaryTermIsNew"SetGlossaryTermStatusToApproved"entityList": "global""false_entityList": "CheckGlossaryTermHasReviewers"Nodes after user tasks (
ApproveGlossaryTerm,ApprovalForUpdates) keep"entityList": "global"— user tasks are not check nodes.JSON Schemas (openmetadata-spec)
checkEntityAttributesTask,checkChangeDescriptionTask,setEntityAttributeTask,rollbackEntityTask,sinkTask,dataCompletenessTask) —additionalProperties: trueininputNamespaceMap;entityListadded to input/output defaults;checkEntityAttributesTaskandcheckChangeDescriptionTaskalso addtrue_entityListto output defaultseventBasedEntityTrigger,periodicBatchEntityTrigger) — addedentityListto output defaults alongsiderelatedEntity; removedmaxItems: 1constraint (now outputs 2 items)Migrations
v1140/MigrationUtil.java— graph-aware, idempotent migration for all existing workflow definitions:entityListto trigger output; inspects incoming edges — check node source with condition →{condition}_entityList: sourceNode; otherwise →entityList: "global"; removesrelatedEntity; preserves existingentityList/conditional keys (only computes when missing)WorkflowHandler.deploy()incomingEdgeusesMap<String, List<String[]>>withcomputeIfAbsent().add()v1105/MigrationUtil.java— hardcoded node templates in the v1105 migration (which inserts update-flow nodes into GlossaryApprovalWorkflow) updated fromrelatedEntitytoentityList; also migratesinputNamespaceMapinline for existing nodes during that migrationmysql/v1140/Migration.java/postgres/v1140/Migration.java— wiredinitializeWorkflowHandler()+migrateWorkflowInputNamespaceMap()into both DB flavorsBugs Fixed
additionalProperties: falsein task schemas blocked conditional keys from surviving REST API re-savestruein all 6 task schemasTRUE_ENTITY_LIST_VARIABLEconstant — raw string concat usedWorkflow.javaincomingEdgemap usedput()— overwrote earlier entries for nodes with multiple incoming edgesMap<String, List<String[]>>withcomputeIfAbsent().add()addEntityListToNamespaceMapunconditionally overwrote existingentityListwhenrelatedEntityalso presentFilterEntityImplnever populatedglobal_entityList— event-based workflows got empty entity listglobal_entityList = List.of(entityLinkStr)unconditionallygetEntityList()was a silent no-opLOG.debug(...)Tests Added / Updated
MigrationUtilTest(v1140) — 33 tests: all migration paths, conditional routing, multi-edge, idempotency, the overwrite regressionMigrationUtilTest(v1105) — updated forentityListfixturesCheckEntityAttributesTaskTest,CheckChangeDescriptionTaskTest,DataCompletenessTaskTest,RollbackEntityTaskTest,SetEntityAttributeTaskTestCheckEntityAttributesImplTest,CheckChangeDescriptionTaskImplTest,SetEntityAttributeImplTest(mock updated toInclude.NON_DELETED; reflects entity-link loading change)TriggerFactoryTest/PeriodicBatchEntityTriggerTest— cardinality assertions updated to expect${entityList != null && !entityList.isEmpty() ? 1 : 0}for allsingleExecution=truecasesSinkTaskDelegateTest— error path / catch block coverageSinkTaskTest— updated fixtures fromrelatedEntity→entityListWorkflowDefinitionResourceIT— integration test updated:entityListininput/inputNamespaceMap;false_entityListadded to output listType of change:
Checklist:
Summary by Gitar
Include.NON_DELETEDto prevent workflows from restoring soft-deleted entities.PeriodicBatchEntityTriggerto dynamically set loop cardinality based onentityListpresence, preventing errors on empty batches.This will update automatically on new commits.