Skip to content

[WIP] feat(workflows): batch entity processing — entityList-only for automated task nodes#26715

Open
yan-3005 wants to merge 67 commits intomainfrom
ram/workflow-improvements
Open

[WIP] feat(workflows): batch entity processing — entityList-only for automated task nodes#26715
yan-3005 wants to merge 67 commits intomainfrom
ram/workflow-improvements

Conversation

@yan-3005
Copy link
Copy Markdown
Contributor

@yan-3005 yan-3005 commented Mar 24, 2026

Issue: https://github.com/open-metadata/openmetadata-collate/issues/3363

What

Phase 1 of batch entity processing in OpenMetadata's Flowable-based governance workflow engine.

All 6 automated task node types (checkEntityAttributesTask, checkChangeDescriptionTask, setEntityAttributeTask, rollbackEntityTask, sinkTask, dataCompletenessTask) now process a List<String> of entity link strings (entityList) in a single workflow instance. Single-entity mode is batch mode of size 1 — the same code path handles both.

Why

Previously, event-based workflows spawned one Flowable subprocess per entity via loopCardinality. Under load this creates O(n) Flowable process instances. The batch design processes the entire entity set in one pass, with per-entity retry isolation, so failures on individual entities never abort the workflow.

Architecture

Two independent channels carry data through the workflow:

Channel Variable Purpose
Control flow RESULT_VARIABLE (result) Drives conditional edge routing via ${nodeName_result == 'value'}
Data flow entityList / true_entityList / false_entityList / {band}_entityList Routes the actual entity sets between nodes

Check nodes split incoming entities into conditional lists. Downstream nodes read the correct list via inputNamespaceMap using the upstream node name as namespace (e.g. "true_entityList": "CheckOwner"). WorkflowVariableHandler.getEntityList() detects conditional keys via endsWith("_entityList").

Files Changed

Workflow entry points — seeding entityList

  • WorkflowEventConsumer.javadefaultHandler() and handleTagRecognizerFeedback() now set global_entityList = List.of(entityLinkString) alongside global_relatedEntity; this is where event-based workflows receive their initial entity list. Entity references are now resolved with Include.NON_DELETED — deleted entities are skipped before any workflow is triggered
  • EventBasedEntityTrigger.java — explicitly passes entityList as an IOParameter from the filter subprocess into the main workflow (in addition to relatedEntity); skips entityList in the dynamic output loop to prevent double-adding
  • PeriodicBatchEntityTrigger.java / FetchEntitiesImpl.javaCOLLECTION_VARIABLE removed from PeriodicBatchEntityTrigger; HAS_FINISHED_VARIABLE made final; FetchEntitiesImpl imports ENTITY_LIST_VARIABLE from Workflow. singleExecution=true cardinality uses ${entityList != null && !entityList.isEmpty() ? 1 : 0} — creates 0 Flowable instances on empty batches (no crash) and exactly 1 on non-empty batches (full-list single invocation)
  • FilterEntityImpl.java — bug fix: was only setting PASSES_FILTER_VARIABLE but never populating global_entityList, so event-based workflows received an empty entity list downstream; now sets global_entityList = List.of(entityLinkStr) unconditionally

Workflow engine

  • Workflow.java — added TRUE_ENTITY_LIST_VARIABLE = "true_entityList" constant
  • WorkflowVariableHandler.javagetEntityList() supports conditional routing keys (true_entityList, false_entityList, {band}_entityList) via endsWith("_entityList"); added LOG.debug for empty-list cases
  • 6 BPMN builder files (CheckEntityAttributesTask, CheckChangeDescriptionTask, DataCompletenessTask, RollbackEntityTask, SetEntityAttributeTask, SinkTask) — renamed converteddefinedNamespaceMap
  • TriggerFactory.java — Javadoc cleanup only

Automated task implementations (6 files)

All now loop over entityList with per-entity resilience4j retry (3 attempts, 500 ms). Per-entity failures are stored in node variables and never throw BpmnError.

  • CheckEntityAttributesImpl / CheckChangeDescriptionTaskImpl — produce true_entityList, false_entityList, RESULT_VARIABLE only; backward-compat plain entityList output removed
  • RollbackEntityImpl — renamed getPreviousApprovedVersiongetPreviousRollbackTargetVersion; APPROVED → DRAFT → DEPRECATED fallback chain; reflection-based getEntityStatus() returns EntityStatus directly; removed unused fields
  • SetEntityAttributeImpl / SinkTaskDelegate — retry + per-entity error tracking; SinkTaskDelegate declares entityLink outside try-block with fallback to raw entityLinkStr on parse failure. SetEntityAttributeImpl loads entity links with Include.NON_DELETED so soft-deleted entities are never picked up for attribute writes
  • EntityRepository.javabulkUpdateEntitiesForGovernanceWorkflow gained a skipDeleted boolean parameter; governance calls pass true so the implicit restoreEntity() path is bypassed for deleted entities, preventing governance workflows from resurrecting soft-deleted entities (e.g. glossary terms deleted mid-run)
  • DataCompletenessImpl — retry + fixed band iteration (was iterating empty list); storeFieldList() always returns List<String>; LOG.infoLOG.debug for per-entity logs

Workflow definition (GlossaryApprovalWorkflow.json)

8 nodes downstream of check nodes updated to use conditional inputNamespaceMap keys:

Node Before After
CheckIfGlossaryTermUpdatedByIsReviewer "entityList": "global" "true_entityList": "CheckGlossaryTermHasReviewers"
SetGlossaryTermStatusToApprovedByReviewer "entityList": "global" "true_entityList": "CheckIfGlossaryTermUpdatedByIsReviewer"
CheckGlossaryTermIsReadyToBeReviewed "entityList": "global" "false_entityList": "CheckIfGlossaryTermUpdatedByIsReviewer"
SetGlossaryTermStatusToDraft "entityList": "global" "false_entityList": "CheckGlossaryTermIsReadyToBeReviewed"
CheckIfGlossaryTermIsNew "entityList": "global" "true_entityList": "CheckGlossaryTermIsReadyToBeReviewed"
SetGlossaryTermStatusToInReview "entityList": "global" "true_entityList": "CheckIfGlossaryTermIsNew"
SetGlossaryTermStatusToInReviewForUpdate "entityList": "global" "false_entityList": "CheckIfGlossaryTermIsNew"
SetGlossaryTermStatusToApproved "entityList": "global" "false_entityList": "CheckGlossaryTermHasReviewers"

Nodes after user tasks (ApproveGlossaryTerm, ApprovalForUpdates) keep "entityList": "global" — user tasks are not check nodes.

JSON Schemas (openmetadata-spec)

  • 6 task schemas (checkEntityAttributesTask, checkChangeDescriptionTask, setEntityAttributeTask, rollbackEntityTask, sinkTask, dataCompletenessTask) — additionalProperties: true in inputNamespaceMap; entityList added to input/output defaults; checkEntityAttributesTask and checkChangeDescriptionTask also add true_entityList to output defaults
  • Trigger schemas (eventBasedEntityTrigger, periodicBatchEntityTrigger) — added entityList to output defaults alongside relatedEntity; removed maxItems: 1 constraint (now outputs 2 items)
  • Generated TypeScript — auto-generated from schema changes above

Migrations

  • v1140/MigrationUtil.java — graph-aware, idempotent migration for all existing workflow definitions:
    • Phase 1 (raw JSON, bypasses schema validation): adds entityList to trigger output; inspects incoming edges — check node source with condition → {condition}_entityList: sourceNode; otherwise → entityList: "global"; removes relatedEntity; preserves existing entityList/conditional keys (only computes when missing)
    • Phase 2: redeploys all workflows via WorkflowHandler.deploy()
    • Multi-edge fix: incomingEdge uses Map<String, List<String[]>> with computeIfAbsent().add()
  • v1105/MigrationUtil.java — hardcoded node templates in the v1105 migration (which inserts update-flow nodes into GlossaryApprovalWorkflow) updated from relatedEntity to entityList; also migrates inputNamespaceMap inline for existing nodes during that migration
  • mysql/v1140/Migration.java / postgres/v1140/Migration.java — wired initializeWorkflowHandler() + migrateWorkflowInputNamespaceMap() into both DB flavors

Bugs Fixed

# Bug Fix
1 additionalProperties: false in task schemas blocked conditional keys from surviving REST API re-saves Changed to true in all 6 task schemas
2 No TRUE_ENTITY_LIST_VARIABLE constant — raw string concat used Added constant to Workflow.java
3 incomingEdge map used put() — overwrote earlier entries for nodes with multiple incoming edges Changed to Map<String, List<String[]>> with computeIfAbsent().add()
4 addEntityListToNamespaceMap unconditionally overwrote existing entityList when relatedEntity also present Only compute entityList keys when none exist yet
5 FilterEntityImpl never populated global_entityList — event-based workflows got empty entity list Set global_entityList = List.of(entityLinkStr) unconditionally
6 Empty entity list in getEntityList() was a silent no-op Added LOG.debug(...)

Tests Added / Updated

  • MigrationUtilTest (v1140) — 33 tests: all migration paths, conditional routing, multi-edge, idempotency, the overwrite regression
  • MigrationUtilTest (v1105) — updated for entityList fixtures
  • Builder testsCheckEntityAttributesTaskTest, CheckChangeDescriptionTaskTest, DataCompletenessTaskTest, RollbackEntityTaskTest, SetEntityAttributeTaskTest
  • Impl testsCheckEntityAttributesImplTest, CheckChangeDescriptionTaskImplTest, SetEntityAttributeImplTest (mock updated to Include.NON_DELETED; reflects entity-link loading change)
  • TriggerFactoryTest / PeriodicBatchEntityTriggerTest — cardinality assertions updated to expect ${entityList != null && !entityList.isEmpty() ? 1 : 0} for all singleExecution=true cases
  • SinkTaskDelegateTest — error path / catch block coverage
  • SinkTaskTest — updated fixtures from relatedEntityentityList
  • WorkflowDefinitionResourceIT — integration test updated: entityList in input/inputNamespaceMap; false_entityList added to output list

Type of change:

  • Bug fix
  • Improvement

Checklist:

  • I have read the CONTRIBUTING document.
  • For JSON Schema changes: I updated the migration scripts.
  • I have added tests around the new logic.

Summary by Gitar

  • Governance resilience:
    • Restricted entity lookups and processing to Include.NON_DELETED to prevent workflows from restoring soft-deleted entities.
    • Added a safety guard in PeriodicBatchEntityTrigger to dynamically set loop cardinality based on entityList presence, preventing errors on empty batches.

This will update automatically on new commits.

Loading
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backend safe to test Add this label to run secure Github workflows on PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants