@@ -1807,6 +1807,171 @@ void test_EventBasedWorkflowForMultipleEntities(TestInfo test)
18071807
18081808 @ Test
18091809 @ Order (6 )
1810+ void test_WorkflowFieldUpdateDoesNotCreateRedundantChangeEvents (TestInfo test ) throws Exception {
1811+ LOG .info ("Starting test to verify workflow field updates don't create redundant change events" );
1812+
1813+ // Create a test table
1814+ CreateDatabaseService createService =
1815+ databaseServiceTest .createRequest (
1816+ "test_changeevent_service_" + test .getDisplayName ().replaceAll ("[^a-zA-Z0-9_]" , "" ));
1817+ DatabaseService service = databaseServiceTest .createEntity (createService , ADMIN_AUTH_HEADERS );
1818+
1819+ CreateDatabase createDatabase =
1820+ new CreateDatabase ()
1821+ .withName (
1822+ "test_changeevent_db_" + test .getDisplayName ().replaceAll ("[^a-zA-Z0-9_]" , "" ))
1823+ .withService (service .getFullyQualifiedName ());
1824+ Database database = databaseTest .createEntity (createDatabase , ADMIN_AUTH_HEADERS );
1825+
1826+ CreateDatabaseSchema createSchema =
1827+ new CreateDatabaseSchema ()
1828+ .withName (
1829+ "test_changeevent_schema_" + test .getDisplayName ().replaceAll ("[^a-zA-Z0-9_]" , "" ))
1830+ .withDatabase (database .getFullyQualifiedName ());
1831+ DatabaseSchema schema = schemaTest .createEntity (createSchema , ADMIN_AUTH_HEADERS );
1832+
1833+ CreateTable createTable =
1834+ new CreateTable ()
1835+ .withName (
1836+ "test_changeevent_table_" + test .getDisplayName ().replaceAll ("[^a-zA-Z0-9_]" , "" ))
1837+ .withDatabaseSchema (schema .getFullyQualifiedName ())
1838+ .withColumns (
1839+ List .of (
1840+ new Column ().withName ("id" ).withDataType (ColumnDataType .INT ),
1841+ new Column ().withName ("name" ).withDataType (ColumnDataType .STRING )));
1842+ Table table = tableTest .createEntity (createTable , ADMIN_AUTH_HEADERS );
1843+ LOG .debug ("Created test table: {}" , table .getName ());
1844+
1845+ // Record initial change event count
1846+ long initialOffset =
1847+ org .openmetadata .service .Entity .getCollectionDAO ().changeEventDAO ().getLatestOffset ();
1848+ LOG .debug ("Initial change event offset: {}" , initialOffset );
1849+
1850+ // Create workflow that sets tags (this should create meaningful changes)
1851+ String workflowJson =
1852+ """
1853+ {
1854+ "name": "testRedundantChangeEvents",
1855+ "displayName": "Test Redundant Change Events",
1856+ "description": "Test workflow to verify no redundant change events",
1857+ "trigger": {
1858+ "type": "periodicBatchEntity",
1859+ "config": {
1860+ "entityTypes": ["table"],
1861+ "schedule": {"scheduleTimeline": "None"},
1862+ "batchSize": 100,
1863+ "filters": {}
1864+ },
1865+ "output": ["relatedEntity", "updatedBy"]
1866+ },
1867+ "nodes": [
1868+ {"type": "startEvent", "subType": "startEvent", "name": "start", "displayName": "start"},
1869+ {
1870+ "type": "automatedTask",
1871+ "subType": "setEntityAttributeTask",
1872+ "name": "setTag",
1873+ "displayName": "Set Tag",
1874+ "config": {
1875+ "fieldName": "tags",
1876+ "fieldValue": "Tier.Tier1"
1877+ },
1878+ "input": ["relatedEntity", "updatedBy"],
1879+ "inputNamespaceMap": {"relatedEntity": "global", "updatedBy": "global"},
1880+ "output": []
1881+ },
1882+ {"type": "endEvent", "subType": "endEvent", "name": "end", "displayName": "end"}
1883+ ],
1884+ "edges": [
1885+ {"from": "start", "to": "setTag"},
1886+ {"from": "setTag", "to": "end"}
1887+ ],
1888+ "config": {"storeStageStatus": true}
1889+ }
1890+ """ ;
1891+
1892+ CreateWorkflowDefinition workflow =
1893+ JsonUtils .readValue (workflowJson , CreateWorkflowDefinition .class );
1894+
1895+ // Create and trigger workflow
1896+ Response response =
1897+ SecurityUtil .addHeaders (getResource ("governance/workflowDefinitions" ), ADMIN_AUTH_HEADERS )
1898+ .post (Entity .json (workflow ));
1899+ assertTrue (
1900+ response .getStatus () == Response .Status .CREATED .getStatusCode ()
1901+ || response .getStatus () == Response .Status .OK .getStatusCode ());
1902+
1903+ // Wait a moment for workflow setup
1904+ java .lang .Thread .sleep (2000 );
1905+
1906+ // Trigger the workflow FIRST time
1907+ Response triggerResponse =
1908+ SecurityUtil .addHeaders (
1909+ getResource (
1910+ "governance/workflowDefinitions/name/testRedundantChangeEvents/trigger" ),
1911+ ADMIN_AUTH_HEADERS )
1912+ .post (Entity .json ("{}" ));
1913+ assertEquals (Response .Status .OK .getStatusCode (), triggerResponse .getStatus ());
1914+
1915+ // Wait for workflow to complete
1916+ java .lang .Thread .sleep (15000 );
1917+
1918+ // Count change events after first workflow run
1919+ long firstRunOffset =
1920+ org .openmetadata .service .Entity .getCollectionDAO ().changeEventDAO ().getLatestOffset ();
1921+ long firstRunEventCount = firstRunOffset - initialOffset ;
1922+
1923+ // Verify the tag was actually added (meaningful change)
1924+ Table updatedTable = tableTest .getEntity (table .getId (), "tags" , ADMIN_AUTH_HEADERS );
1925+ boolean hasTag =
1926+ updatedTable .getTags () != null
1927+ && updatedTable .getTags ().stream ()
1928+ .anyMatch (tag -> "Tier.Tier1" .equals (tag .getTagFQN ()));
1929+ assertTrue (hasTag , "Table should have Tier.Tier1 tag after first workflow run" );
1930+
1931+ LOG .info ("First workflow run created {} change events" , firstRunEventCount );
1932+ assertTrue (
1933+ firstRunEventCount > 0 , "First workflow run should create at least one change event" );
1934+
1935+ // Trigger the workflow SECOND time (should NOT create new events since no actual changes)
1936+ triggerResponse =
1937+ SecurityUtil .addHeaders (
1938+ getResource (
1939+ "governance/workflowDefinitions/name/testRedundantChangeEvents/trigger" ),
1940+ ADMIN_AUTH_HEADERS )
1941+ .post (Entity .json ("{}" ));
1942+ assertEquals (Response .Status .OK .getStatusCode (), triggerResponse .getStatus ());
1943+
1944+ // Wait for second workflow to complete
1945+ java .lang .Thread .sleep (15000 );
1946+
1947+ // Count change events after second workflow run
1948+ long secondRunOffset =
1949+ org .openmetadata .service .Entity .getCollectionDAO ().changeEventDAO ().getLatestOffset ();
1950+ long secondRunEventCount = secondRunOffset - firstRunOffset ;
1951+
1952+ LOG .info ("Second workflow run created {} change events" , secondRunEventCount );
1953+
1954+ // CRITICAL ASSERTION: Second run should create NO new change events
1955+ // because the tag is already set and EntityFieldUtils should not generate
1956+ // redundant events due to updateEntityMetadata timestamp changes
1957+ assertEquals (
1958+ 0 ,
1959+ secondRunEventCount ,
1960+ "Second workflow run should NOT create change events when no actual field changes occur. "
1961+ + "This verifies the fix for redundant updateEntityMetadata events." );
1962+
1963+ // Verify the tag is still there (no regression)
1964+ Table finalTable = tableTest .getEntity (table .getId (), "tags" , ADMIN_AUTH_HEADERS );
1965+ boolean stillHasTag =
1966+ finalTable .getTags () != null
1967+ && finalTable .getTags ().stream ().anyMatch (tag -> "Tier.Tier1" .equals (tag .getTagFQN ()));
1968+ assertTrue (stillHasTag , "Table should still have the tag after second workflow run" );
1969+
1970+ LOG .info ("✓ PASSED: Workflow field updates do not create redundant change events" );
1971+ }
1972+
1973+ @ Test
1974+ @ Order (7 )
18101975 void test_MultiEntityPeriodicQueryWithFilters (TestInfo test )
18111976 throws IOException , InterruptedException {
18121977 LOG .info ("Starting test_MultiEntityPeriodicQueryWithFilters" );
0 commit comments