From 683f93f7d6d01e9bef2bb785282c62b5e20cbb10 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 4 Sep 2025 23:28:47 +0800 Subject: [PATCH] Fix workflow can be deleted which contains failover instance --- ...PauseWorkflowInstanceExecutorDelegate.java | 8 +- ...endedWorkflowInstanceExecutorDelegate.java | 2 +- ...nningWorkflowInstanceExecutorDelegate.java | 2 +- .../StopWorkflowInstanceExecutorDelegate.java | 8 +- .../api/service/impl/ExecutorServiceImpl.java | 2 +- .../service/impl/TaskInstanceServiceImpl.java | 2 +- .../api/service/impl/TenantServiceImpl.java | 2 +- .../service/impl/WorkerGroupServiceImpl.java | 2 +- .../impl/WorkflowDefinitionServiceImpl.java | 4 +- .../impl/WorkflowInstanceServiceImpl.java | 10 +- .../controller/WorkerGroupControllerTest.java | 2 +- ...opWorkflowInstanceExecuteFunctionTest.java | 12 +- .../api/service/TenantServiceTest.java | 2 +- .../api/service/WorkerGroupServiceTest.java | 4 +- .../common/enums/WorkflowExecutionStatus.java | 175 +++++++----------- .../enums/WorkflowExecutionStatusTest.java | 93 ++++++++++ .../repository/impl/TaskInstanceDaoImpl.java | 2 +- .../impl/WorkflowInstanceDaoImpl.java | 4 +- .../dao/utils/WorkflowUtils.java | 2 +- .../impl/WorkflowInstanceDaoImplTest.java | 7 +- .../subworkflow/SubWorkflowLogicTask.java | 6 +- .../server/master/utils/DependentExecute.java | 8 +- .../WorkflowInstanceFailoverTestCase.java | 96 +++++----- ...b_workflow_not_running_in_diff_master.yaml | 2 +- .../service/alert/WorkflowAlertManager.java | 2 +- .../service/process/ProcessServiceImpl.java | 2 +- .../subworkflow/SubWorkflowService.java | 44 ----- .../subworkflow/SubWorkflowServiceImpl.java | 111 ----------- 28 files changed, 245 insertions(+), 371 deletions(-) create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatusTest.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java index 6405596386f3..0e673882944d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java @@ -45,7 +45,7 @@ public class PauseWorkflowInstanceExecutorDelegate public Void execute(PauseWorkflowInstanceOperation workflowInstanceControlRequest) { final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance; exceptionIfWorkflowInstanceCannotPause(workflowInstance); - if (ifWorkflowInstanceCanDirectPauseInDB(workflowInstance)) { + if (workflowInstance.getState().isCanDirectPauseInDB()) { directPauseInDB(workflowInstance); } else { pauseInMaster(workflowInstance); @@ -55,7 +55,7 @@ public Void execute(PauseWorkflowInstanceOperation workflowInstanceControlReques private void exceptionIfWorkflowInstanceCannotPause(WorkflowInstance workflowInstance) { WorkflowExecutionStatus workflowInstanceState = workflowInstance.getState(); - if (workflowInstanceState.canPause()) { + if (workflowInstanceState.isCanPause()) { return; } throw new ServiceException( @@ -63,10 +63,6 @@ private void exceptionIfWorkflowInstanceCannotPause(WorkflowInstance workflowIns + ", can not pause"); } - private boolean ifWorkflowInstanceCanDirectPauseInDB(WorkflowInstance workflowInstance) { - return workflowInstance.getState().canDirectPauseInDB(); - } - private void directPauseInDB(WorkflowInstance workflowInstance) { workflowInstanceDao.updateWorkflowInstanceState( workflowInstance.getId(), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java index 0e107fb9a778..2884d242a9bc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java @@ -42,7 +42,7 @@ public class RecoverSuspendedWorkflowInstanceExecutorDelegate @Override public Void execute(RecoverSuspendedWorkflowInstanceOperation workflowInstanceControlRequest) { final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance; - if (!workflowInstance.getState().isPause() && !workflowInstance.getState().isStop()) { + if (!workflowInstance.getState().isPaused() && !workflowInstance.getState().isStopped()) { throw new ServiceException( String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(), workflowInstance.getState())); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java index 8fde42e90daf..7aab82b8a26e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java @@ -42,7 +42,7 @@ public class RepeatRunningWorkflowInstanceExecutorDelegate @Override public Void execute(RepeatRunningWorkflowInstanceOperation workflowInstanceControlRequest) { final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance; - if (workflowInstance.getState() == null || !workflowInstance.getState().isFinished()) { + if (workflowInstance.getState() == null || !workflowInstance.getState().isFinalState()) { throw new ServiceException( String.format("The workflow instance: %s status is %s, cannot repeat running", workflowInstance.getName(), workflowInstance.getState())); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java index 1d77a3ff4574..124c4c628504 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java @@ -46,7 +46,7 @@ public Void execute(StopWorkflowInstanceOperation workflowInstanceControlRequest final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance; exceptionIfWorkflowInstanceCannotStop(workflowInstance); - if (ifWorkflowInstanceCanDirectStopInDB(workflowInstance)) { + if (workflowInstance.getState().isCanDirectStopInDB()) { directStopInDB(workflowInstance); } else { stopInMaster(workflowInstance); @@ -56,7 +56,7 @@ public Void execute(StopWorkflowInstanceOperation workflowInstanceControlRequest void exceptionIfWorkflowInstanceCannotStop(WorkflowInstance workflowInstance) { final WorkflowExecutionStatus workflowInstanceState = workflowInstance.getState(); - if (workflowInstanceState.canStop()) { + if (workflowInstanceState.isCanStop()) { return; } throw new ServiceException( @@ -64,10 +64,6 @@ void exceptionIfWorkflowInstanceCannotStop(WorkflowInstance workflowInstance) { + ", can not stop"); } - boolean ifWorkflowInstanceCanDirectStopInDB(WorkflowInstance workflowInstance) { - return workflowInstance.getState().canDirectStopInDB(); - } - void directStopInDB(WorkflowInstance workflowInstance) { workflowInstanceDao.updateWorkflowInstanceState( workflowInstance.getId(), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index fac2c5726575..c93c23791518 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -329,7 +329,7 @@ public WorkflowExecuteResponse executeTask(User loginUser, WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId) .orElseThrow(() -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId)); - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { log.error("Can not execute task for workflow instance which is not finished, workflowInstanceId:{}.", workflowInstanceId); putMsg(response, Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index d7f6faea53d8..1e599eb7a778 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -218,7 +218,7 @@ public void forceTaskSuccess(User loginUser, long projectCode, Integer taskInsta WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(task.getWorkflowInstanceId()) .orElseThrow( () -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, task.getWorkflowInstanceId())); - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { throw new ServiceException("The workflow instance is not finished: " + workflowInstance.getState() + " cannot force start task instance"); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index d318892a4d24..f2dab3b868ee 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -258,7 +258,7 @@ public void deleteTenantById(User loginUser, int id) throws Exception { private List getWorkflowInstancesByTenant(Tenant tenant) { return workflowInstanceMapper.queryByTenantCodeAndStatus( tenant.getTenantCode(), - WorkflowExecutionStatus.getNotTerminalStatus()); + WorkflowExecutionStatus.NOT_TERMINAL_STATES); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index eefe30767026..331e0ba77af2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -346,7 +346,7 @@ public Map deleteWorkerGroupById(User loginUser, Integer id) { } List workflowInstances = workflowInstanceMapper.queryByWorkerGroupNameAndStatus( workerGroup.getName(), - WorkflowExecutionStatus.getNotTerminalStatus()); + WorkflowExecutionStatus.NOT_TERMINAL_STATES); if (CollectionUtils.isNotEmpty(workflowInstances)) { List workflowInstanceIds = workflowInstances.stream().map(WorkflowInstance::getId).collect(Collectors.toList()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index 3a72c7b116e8..6d9678d06d83 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -1054,7 +1054,7 @@ private void workflowDefinitionUsedInOtherTaskValid(WorkflowDefinition workflowD // check workflow instances is already running List workflowInstances = workflowInstanceService.queryByWorkflowDefinitionCodeAndStatus( - workflowDefinition.getCode(), WorkflowExecutionStatus.getNotTerminalStatus()); + workflowDefinition.getCode(), WorkflowExecutionStatus.NOT_TERMINAL_STATES); if (CollectionUtils.isNotEmpty(workflowInstances)) { throw new ServiceException(Status.DELETE_WORKFLOW_DEFINITION_EXECUTING_FAIL, workflowInstances.size()); } @@ -2405,7 +2405,7 @@ public void deleteWorkflowDefinitionVersion(User loginUser, List workflowInstances = workflowInstanceService.queryByWorkflowCodeVersionStatus( code, version, - WorkflowExecutionStatus.getNotTerminalStatus()); + WorkflowExecutionStatus.NOT_TERMINAL_STATES); if (CollectionUtils.isNotEmpty(workflowInstances)) { throw new ServiceException(Status.DELETE_WORKFLOW_DEFINITION_EXECUTING_FAIL, workflowInstances.size()); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java index cee3d69332fa..1b3a63fec4b4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java @@ -655,9 +655,9 @@ public Map updateWorkflowInstance(User loginUser, long projectCo return result; } // check workflow instance status - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { log.warn("workflow Instance state is {} so can not update workflow instance, workflowInstanceId:{}.", - workflowInstance.getState().getDesc(), workflowInstanceId); + workflowInstance.getState().name(), workflowInstanceId); putMsg(result, WORKFLOW_INSTANCE_STATE_OPERATION_ERROR, workflowInstance.getName(), workflowInstance.getState().toString(), "update"); return result; @@ -835,9 +835,9 @@ public void deleteWorkflowInstanceById(User loginUser, Integer workflowInstanceI projectService.checkProjectAndAuthThrowException(loginUser, project, ApiFuncIdentificationConstant.INSTANCE_DELETE); // check workflow instance status - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { log.warn("workflow Instance state is {} so can not delete workflow instance, workflowInstanceId:{}.", - workflowInstance.getState().getDesc(), workflowInstanceId); + workflowInstance.getState().name(), workflowInstanceId); throw new ServiceException(WORKFLOW_INSTANCE_STATE_OPERATION_ERROR, workflowInstance.getName(), workflowInstance.getState(), "delete"); } @@ -1076,7 +1076,7 @@ public void deleteWorkflowInstanceByWorkflowDefinitionCode(long workflowDefiniti } log.info("Begin to delete workflow instance, workflow definition code: {}", workflowDefinitionCode); for (WorkflowInstance workflowInstance : workflowInstances) { - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { log.warn("Workflow instance is not finished cannot delete, workflow instance id:{}", workflowInstance.getId()); throw new ServiceException(WORKFLOW_INSTANCE_STATE_OPERATION_ERROR, workflowInstance.getName(), diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 06ac58fde5ce..6153648383cf 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -135,7 +135,7 @@ public void testDeleteById() throws Exception { workerGroup.setName("测试"); Mockito.when(workerGroupDao.queryById(12)).thenReturn(workerGroup); Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus("测试", - WorkflowExecutionStatus.getNotTerminalStatus())) + WorkflowExecutionStatus.NOT_TERMINAL_STATES)) .thenReturn(null); Mockito.when(workerGroupDao.deleteById(12)).thenReturn(true); Mockito.when(workflowInstanceMapper.updateWorkflowInstanceByWorkerGroupName("测试", "")).thenReturn(1); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java index 60e11477a085..cd982352aa00 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java @@ -96,21 +96,13 @@ void exceptionIfWorkflowInstanceCannotStop_canNotStop(WorkflowExecutionStatus wo @EnumSource(value = WorkflowExecutionStatus.class, names = { "SERIAL_WAIT"}) void ifWorkflowInstanceCanDirectStopInDB_canDirectStopInDB(WorkflowExecutionStatus workflowExecutionStatus) { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setName("Workflow-1"); - workflowInstance.setState(workflowExecutionStatus); - Assertions - .assertTrue(stopWorkflowInstanceExecutorDelegate.ifWorkflowInstanceCanDirectStopInDB(workflowInstance)); + Assertions.assertTrue(workflowExecutionStatus.isCanDirectStopInDB()); } @ParameterizedTest @EnumSource(value = WorkflowExecutionStatus.class, names = { "SERIAL_WAIT"}, mode = EnumSource.Mode.EXCLUDE) void ifWorkflowInstanceCanDirectStopInDB_canNotDirectStopInDB(WorkflowExecutionStatus workflowExecutionStatus) { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setName("Workflow-1"); - workflowInstance.setState(workflowExecutionStatus); - Assertions.assertFalse( - stopWorkflowInstanceExecutorDelegate.ifWorkflowInstanceCanDirectStopInDB(workflowInstance)); + Assertions.assertFalse(workflowExecutionStatus.isCanDirectStopInDB()); } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java index c595415b0b38..58a36c6348c6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java @@ -192,7 +192,7 @@ public void testDeleteTenantById() { baseServiceLogger)).thenReturn(true); when(tenantMapper.queryById(1)).thenReturn(getTenant()); when(workflowInstanceMapper.queryByTenantCodeAndStatus(tenantCode, - WorkflowExecutionStatus.getNotTerminalStatus())) + WorkflowExecutionStatus.NOT_TERMINAL_STATES)) .thenReturn(getInstanceList()); when(scheduleMapper.queryScheduleListByTenant(tenantCode)).thenReturn(getScheduleList()); when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 01a140635edf..851cc8ac7b1b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -237,7 +237,7 @@ public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() { List workflowInstances = new ArrayList(); workflowInstances.add(workflowInstance); when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), - WorkflowExecutionStatus.getNotTerminalStatus())) + WorkflowExecutionStatus.NOT_TERMINAL_STATES)) .thenReturn(workflowInstances); Map deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1); @@ -255,7 +255,7 @@ public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() { WorkerGroup workerGroup = getWorkerGroup(1); when(workerGroupDao.queryById(1)).thenReturn(workerGroup); when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), - WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null); + WorkflowExecutionStatus.NOT_TERMINAL_STATES)).thenReturn(null); when(workerGroupDao.deleteById(1)).thenReturn(true); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java index 1cf115b4a250..8c7096060dc8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java @@ -17,110 +17,95 @@ package org.apache.dolphinscheduler.common.enums; -import java.util.HashMap; -import java.util.Map; +import java.util.Arrays; import lombok.Getter; -import lombok.NonNull; import com.baomidou.mybatisplus.annotation.EnumValue; @Getter public enum WorkflowExecutionStatus { - SUBMITTED_SUCCESS(0, "submitted"), - RUNNING_EXECUTION(1, "running"), - READY_PAUSE(2, "ready pause"), - PAUSE(3, "pause"), - READY_STOP(4, "ready stop"), - STOP(5, "stop"), - FAILURE(6, "failure"), - SUCCESS(7, "success"), - SERIAL_WAIT(14, "serial wait"), - FAILOVER(18, "failover"); - - private static final Map CODE_MAP = new HashMap<>(); - private static final int[] NEED_FAILOVER_STATES = new int[]{ - RUNNING_EXECUTION.getCode(), - READY_PAUSE.getCode(), - READY_STOP.getCode() - }; - - private static final int[] NOT_TERMINAL_STATUS = new int[]{ - SUBMITTED_SUCCESS.getCode(), - RUNNING_EXECUTION.getCode(), - READY_PAUSE.getCode(), - READY_STOP.getCode(), - SERIAL_WAIT.getCode() - }; - - static { - for (WorkflowExecutionStatus executionStatus : WorkflowExecutionStatus.values()) { - CODE_MAP.put(executionStatus.getCode(), executionStatus); - } - } + SUBMITTED_SUCCESS(0, false, false, false, false, false, false), + RUNNING_EXECUTION(1, true, false, true, false, false, true), + READY_PAUSE(2, true, false, true, false, false, true), + PAUSE(3, false, false, false, false, true, false), + READY_STOP(4, true, false, false, false, false, true), + STOP(5, false, false, false, false, true, false), + FAILURE(6, false, false, false, false, true, false), + SUCCESS(7, false, false, false, false, true, false), + SERIAL_WAIT(14, true, true, true, true, false, false), + FAILOVER(18, false, false, false, false, false, false); /** - * Get WorkflowExecutionStatus by code, if the code is invalidated will throw {@link IllegalArgumentException}. + * The unique code represent of the state which will be stored in database. */ - public static @NonNull WorkflowExecutionStatus of(int code) { - WorkflowExecutionStatus workflowExecutionStatus = CODE_MAP.get(code); - if (workflowExecutionStatus == null) { - throw new IllegalArgumentException(String.format("The workflow execution status code: %s is invalidated", - code)); - } - return workflowExecutionStatus; - } + @EnumValue + private final int code; - public boolean isRunning() { - return this == RUNNING_EXECUTION; - } + /** + * Whether the instance can be stopped, if the state is final state, it can't be stopped. + * todo: Right now the SUBMITTED_SUCCESS state can't be stopped, we should support it in the future. + */ + private final boolean canStop; - public boolean canStop() { - return this == RUNNING_EXECUTION - || this == READY_PAUSE - || this == READY_STOP - || this == SERIAL_WAIT; - } + /** + * Whether the instance can be directly stopped in database, if true, the workflow instance will be directly set to STOP state in database. + * Right now only serial_wait state support this. + * todo: We should support SUBMITTED_SUCCESS state in the future. + */ + private final boolean canDirectStopInDB; - public boolean canDirectStopInDB() { - return this == SERIAL_WAIT; - } + /** + * Whether the instance can be paused, if the state is final state, it can't be paused. + * todo: Right now the SUBMITTED_SUCCESS state can't be paused, we should support it in the future. + */ + private final boolean canPause; - public boolean canPause() { - return this == RUNNING_EXECUTION - || this == READY_PAUSE - || this == SERIAL_WAIT; - } + /** + * Whether the instance can be directly paused in database, if true, the workflow instance will be directly set to PAUSE state in database. + * Right now only serial_wait state support this. + * todo: We should support SUBMITTED_SUCCESS state in the future. + */ + private final boolean canDirectPauseInDB; /** - * status can be take over on sub-workflow - * @return bool + * Whether the instance need failover when the instance running on the master which is down. + * It the instance is launched and not in final state, it should failover. */ - public boolean canTakeover() { - return this == RUNNING_EXECUTION - || this == READY_PAUSE - || this == PAUSE - || this == READY_STOP - || this == STOP - || this == FAILURE - || this == SUCCESS - || this == FAILOVER; - } + private final boolean needFailover; - public boolean canDirectPauseInDB() { - return this == SERIAL_WAIT; + /** + * Whether the state is a final state, if true, means the instance is finished and the state will not change unless the user retry/recover it. + */ + private final boolean finalState; + + WorkflowExecutionStatus(int code, + boolean canStop, + boolean canDirectStopInDB, + boolean canPause, + boolean canDirectPauseInDB, + boolean finalState, + boolean needFailover) { + this.code = code; + this.canStop = canStop; + this.canDirectStopInDB = canDirectStopInDB; + this.canPause = canPause; + this.canDirectPauseInDB = canDirectPauseInDB; + this.finalState = finalState; + this.needFailover = needFailover; } - public boolean isFinished() { - return isSuccess() || isFailure() || isStop() || isPause(); - } + public static final int[] NEED_FAILOVER_STATES = Arrays.stream(WorkflowExecutionStatus.values()) + .filter(WorkflowExecutionStatus::isNeedFailover) + .mapToInt(WorkflowExecutionStatus::getCode) + .toArray(); + + public static final int[] NOT_TERMINAL_STATES = Arrays.stream(WorkflowExecutionStatus.values()) + .filter(workflowExecutionStatus -> !workflowExecutionStatus.isFinalState()) + .mapToInt(WorkflowExecutionStatus::getCode) + .toArray(); - /** - * status is success - * - * @return status - */ public boolean isSuccess() { return this == SUCCESS; } @@ -129,36 +114,14 @@ public boolean isFailure() { return this == FAILURE; } - public boolean isPause() { + public boolean isPaused() { return this == PAUSE; } - public boolean isReadyStop() { - return this == READY_STOP; - } - - public boolean isStop() { + public boolean isStopped() { return this == STOP; } - public static int[] getNeedFailoverWorkflowInstanceState() { - return NEED_FAILOVER_STATES; - } - - public static int[] getNotTerminalStatus() { - return NOT_TERMINAL_STATUS; - } - - @EnumValue - private final int code; - - private final String desc; - - WorkflowExecutionStatus(int code, String desc) { - this.code = code; - this.desc = desc; - } - @Override public String toString() { return name(); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatusTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatusTest.java new file mode 100644 index 000000000000..7fdf81e36313 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatusTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.jupiter.api.Test; + +class WorkflowExecutionStatusTest { + + @Test + void testIsSuccess() { + assertThat(WorkflowExecutionStatus.SUCCESS.isSuccess()).isTrue(); + } + + @Test + void testIsFailure() { + assertThat(WorkflowExecutionStatus.FAILURE.isFailure()).isTrue(); + } + + @Test + void testIsPaused() { + assertThat(WorkflowExecutionStatus.PAUSE.isPaused()).isTrue(); + } + + @Test + void testIsStopped() { + assertThat(WorkflowExecutionStatus.STOP.isStopped()).isTrue(); + } + + @Test + void testNonTerminalStates() { + assertThat(WorkflowExecutionStatus.NOT_TERMINAL_STATES).asList().containsExactly( + WorkflowExecutionStatus.SUBMITTED_SUCCESS.getCode(), + WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(), + WorkflowExecutionStatus.READY_PAUSE.getCode(), + WorkflowExecutionStatus.READY_STOP.getCode(), + WorkflowExecutionStatus.SERIAL_WAIT.getCode(), + WorkflowExecutionStatus.FAILOVER.getCode()); + } + + @Test + void testNeedFailoverStates() { + assertThat(WorkflowExecutionStatus.NEED_FAILOVER_STATES).asList().containsExactly( + WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(), + WorkflowExecutionStatus.READY_PAUSE.getCode(), + WorkflowExecutionStatus.READY_STOP.getCode()); + } + + @Test + void testCanStop() { + assertThat(WorkflowExecutionStatus.SUBMITTED_SUCCESS.isCanStop()).isFalse(); + assertThat(WorkflowExecutionStatus.RUNNING_EXECUTION.isCanStop()).isTrue(); + assertThat(WorkflowExecutionStatus.READY_PAUSE.isCanStop()).isTrue(); + assertThat(WorkflowExecutionStatus.PAUSE.isCanStop()).isFalse(); + assertThat(WorkflowExecutionStatus.READY_STOP.isCanStop()).isTrue(); + assertThat(WorkflowExecutionStatus.STOP.isCanStop()).isFalse(); + assertThat(WorkflowExecutionStatus.FAILURE.isCanStop()).isFalse(); + assertThat(WorkflowExecutionStatus.SUCCESS.isCanStop()).isFalse(); + assertThat(WorkflowExecutionStatus.SERIAL_WAIT.isCanStop()).isTrue(); + assertThat(WorkflowExecutionStatus.FAILOVER.isCanStop()).isFalse(); + } + + @Test + void testCanPause() { + assertThat(WorkflowExecutionStatus.SUBMITTED_SUCCESS.isCanPause()).isFalse(); + assertThat(WorkflowExecutionStatus.RUNNING_EXECUTION.isCanPause()).isTrue(); + assertThat(WorkflowExecutionStatus.READY_PAUSE.isCanPause()).isTrue(); + assertThat(WorkflowExecutionStatus.PAUSE.isCanPause()).isFalse(); + assertThat(WorkflowExecutionStatus.READY_STOP.isCanPause()).isFalse(); + assertThat(WorkflowExecutionStatus.STOP.isCanPause()).isFalse(); + assertThat(WorkflowExecutionStatus.FAILURE.isCanPause()).isFalse(); + assertThat(WorkflowExecutionStatus.SUCCESS.isCanPause()).isFalse(); + assertThat(WorkflowExecutionStatus.SERIAL_WAIT.isCanPause()).isTrue(); + assertThat(WorkflowExecutionStatus.FAILOVER.isCanPause()).isFalse(); + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 1bed138bda03..c675f502aae4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -66,7 +66,7 @@ public boolean upsertTaskInstance(TaskInstance taskInstance) { @Override public boolean submitTaskInstanceToDB(TaskInstance taskInstance, WorkflowInstance workflowInstance) { WorkflowExecutionStatus processInstanceState = workflowInstance.getState(); - if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) { + if (processInstanceState.isFinalState() || processInstanceState == WorkflowExecutionStatus.READY_STOP) { log.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}", workflowInstance.getId(), processInstanceState, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java index cfc381ab8a0c..7e58b8495c6d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java @@ -171,12 +171,12 @@ public List queryByWorkflowCodeVersionStatus(Long workflowDefi @Override public List queryNeedFailoverMasters() { return mybatisMapper - .queryNeedFailoverWorkflowInstanceHost(WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()); + .queryNeedFailoverWorkflowInstanceHost(WorkflowExecutionStatus.NEED_FAILOVER_STATES); } @Override public List queryNeedFailoverWorkflowInstances(String masterAddress) { return mybatisMapper.queryByHostAndStatus(masterAddress, - WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()); + WorkflowExecutionStatus.NEED_FAILOVER_STATES); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkflowUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkflowUtils.java index 839ffc7dc9ef..ad07d9e0237c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkflowUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkflowUtils.java @@ -35,7 +35,7 @@ public class WorkflowUtils { * @return workflow duration */ public static String getWorkflowInstanceDuration(WorkflowInstance workflowInstance) { - return workflowInstance.getState() != null && workflowInstance.getState().isFinished() + return workflowInstance.getState() != null && workflowInstance.getState().isFinalState() ? DateUtils.format2Duration(workflowInstance.getStartTime(), workflowInstance.getEndTime()) : DateUtils.format2Duration(workflowInstance.getStartTime(), new Date()); } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java index 520a9b907e78..1860b0d3893f 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java @@ -41,17 +41,16 @@ class WorkflowInstanceDaoImplTest extends BaseDaoTest { void queryByWorkflowCodeVersionStatus_EMPTY_INSTANCE() { long workflowDefinitionCode = 1L; int workflowDefinitionVersion = 1; - int[] status = WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState(); assertTrue(isEmpty(workflowInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, - workflowDefinitionVersion, status))); + workflowDefinitionVersion, WorkflowExecutionStatus.NEED_FAILOVER_STATES))); } @Test void queryByWorkflowCodeVersionStatus_EXIST_NOT_FINISH_INSTANCE() { long workflowDefinitionCode = 1L; int workflowDefinitionVersion = 1; - int[] status = WorkflowExecutionStatus.getNotTerminalStatus(); + int[] status = WorkflowExecutionStatus.NOT_TERMINAL_STATES; assertTrue(isEmpty(workflowInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, status))); @@ -101,7 +100,7 @@ void updateWorkflowInstanceState_failed() { void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() { long workflowDefinitionCode = 1L; int workflowDefinitionVersion = 1; - int[] status = WorkflowExecutionStatus.getNotTerminalStatus(); + int[] status = WorkflowExecutionStatus.NOT_TERMINAL_STATES; workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, WorkflowExecutionStatus.PAUSE)); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java index 82c435583f4f..89222ebc2468 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java @@ -167,9 +167,9 @@ private SubWorkflowLogicTaskRuntimeContext recoverFromFaultTolerantTasks() { final WorkflowInstance subWorkflowInstance = workflowInstanceDao.queryById( subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId()); - if (subWorkflowInstance != null && subWorkflowInstance.getState().canTakeover()) { - // Here we only need to take over the runtime context of sub-workflow, - // the sub-workflow will be failover by master-server when needed. + if (subWorkflowInstance != null) { + // If the sub workflow instance is existed, means we already trigger the sub workflow instance. + // So we don't need to trigger again. return subWorkflowLogicTaskRuntimeContext; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index a66ed1bb337a..e6e4d16ff153 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -180,7 +180,7 @@ private DependResult calculateResultForTasks(DependentItem dependentItem, * @return */ private DependResult dependResultByWorkflowInstance(WorkflowInstance workflowInstance) { - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { return DependResult.WAITING; } if (workflowInstance.getState().isSuccess()) { @@ -199,7 +199,7 @@ private DependResult dependResultByWorkflowInstance(WorkflowInstance workflowIns * @return */ private DependResult dependResultByAllTaskOfWorkflowInstance(WorkflowInstance workflowInstance) { - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { log.info( "Wait for the dependent workflow to complete, workflowDefinitionCode: {}, pworkflowInstanceId: {}.", workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getId()); @@ -272,7 +272,7 @@ private DependResult dependResultBySingleTaskInstance(WorkflowInstance workflowI return DependResult.SUCCESS; } - if (!workflowInstance.getState().isFinished()) { + if (!workflowInstance.getState().isFinalState()) { log.info( "Wait for the dependent workflow to complete, workflowDefinitionCode: {}, workflowInstanceId: {}.", workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getId()); @@ -362,7 +362,7 @@ private DependResult getDependResultOfTask(WorkflowInstance workflowInstance, Ta } else if (state.isSuccess()) { return DependResult.SUCCESS; } else { - if (workflowInstance.getState().isRunning() + if (!workflowInstance.getState().isFinalState() && taskInstance.getRetryTimes() < taskInstance.getMaxRetryTimes()) { log.info("taskDefinitionCode: {}, taskDefinitionName: {}, retryTimes: {}, maxRetryTimes: {}", taskInstance.getTaskCode(), taskInstance.getName(), taskInstance.getRetryTimes(), diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java index ab6f751df4f8..1be7f71f7fe7 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java @@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus; import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent; import org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent; -import org.apache.dolphinscheduler.server.master.failover.FailoverCoordinator; import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext; import org.apache.commons.lang3.StringUtils; @@ -45,6 +44,7 @@ import java.time.Duration; import java.util.Date; import java.util.List; +import java.util.Objects; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -54,9 +54,6 @@ public class WorkflowInstanceFailoverTestCase extends AbstractMasterIntegrationT @Autowired private SystemEventBus systemEventBus; - @Autowired - FailoverCoordinator failoverCoordinator; - @Test public void testGlobalFailover_runningWorkflow_withSubmittedTasks() { final String yaml = "/it/failover/running_workflowInstance_with_one_submitted_fake_task.yaml"; @@ -895,69 +892,62 @@ public void testMasterFailover_runningWorkflow_takeOverSubWorkflowOnChildNotHeal assertThat(mainWorkflowInstance).isNotNull(); assertThat(submittedSubWorkflowInstance).isNotNull(); - MasterServerMetadata masterServerMain = MasterServerMetadata.builder() - .cpuUsage(0.2) - .memoryUsage(0.4) - .serverStatus(ServerStatus.NORMAL) - .address(mainWorkflowInstance.getHost()) - .build(); - MasterServerMetadata masterServerSub = MasterServerMetadata.builder() - .cpuUsage(0.2) - .memoryUsage(0.4) - .serverStatus(ServerStatus.NORMAL) - .address(submittedSubWorkflowInstance.getHost()) - .build(); - - // first start workflow to simulate the normal parent workflow - systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new Date(), 0)); - - final String mainMasterFailoverNodePath = RegistryUtils.getFailoveredNodePath( - masterServerMain.getAddress(), - masterServerMain.getServerStartupTime(), - masterServerMain.getProcessId()); - // wait failover main-workflow + // Failover the main and sub workflow master + systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); + // wait failover finished await() .atMost(Duration.ofMinutes(1)) .pollInterval(Duration.ofMillis(500)) .untilAsserted(() -> { - assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue(); + assertThat(registryClient + .exists(RegistryUtils.getGlobalMasterFailoverNodePath(mainWorkflowInstance.getHost()))) + .isTrue(); + assertThat(registryClient.exists( + RegistryUtils.getGlobalMasterFailoverNodePath(submittedSubWorkflowInstance.getHost()))) + .isTrue(); }); - await() - .atMost(Duration.ofMinutes(1)) - .pollInterval(Duration.ofMillis(500)) - .untilAsserted(() -> { - assertThat(repository.queryWorkflowInstance(1).getState()) - .isEqualTo(WorkflowExecutionStatus.SUCCESS); - }); + // Two workflow instance is success await() .atMost(Duration.ofMinutes(1)) .pollInterval(Duration.ofMillis(500)) .untilAsserted(() -> { assertThat(repository.queryAllWorkflowInstance()) - .hasSize(3) - .filteredOn(workflowInstance -> workflowInstance.getId() == 3) - .allSatisfy(workflowInstance -> { - assertThat(workflowInstance.getState()) - .isEqualTo(WorkflowExecutionStatus.SUCCESS); - }); - }); - - await() - .atMost(Duration.ofMinutes(1)) - .pollInterval(Duration.ofMillis(500)) - .untilAsserted(() -> { - assertThat(repository.queryAllTaskInstance()) - .hasSize(3) - .filteredOn(taskInstance -> taskInstance.getId() > 1) - .allSatisfy(taskInstance -> { - assertThat(taskInstance.getState()) - .isEqualTo(TaskExecutionStatus.SUCCESS); + .hasSize(2) + .satisfies(workflowInstances -> { + // Main workflow instance should be success + // And the sub-workflow task instance should be success too. + assertThat(workflowInstances) + .filteredOn(workflowInstance -> Objects.equals(workflowInstance.getId(), + mainWorkflowInstance.getId())) + .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.SUCCESS)) + .satisfiesExactly(workflowInstance -> { + assertThat(repository.queryTaskInstance(mainWorkflowInstance.getId())) + .hasSize(2) + .anySatisfy(taskInstance -> assertThat(taskInstance.getState()) + .isEqualTo(TaskExecutionStatus.NEED_FAULT_TOLERANCE)) + .anySatisfy(taskInstance -> assertThat(taskInstance.getState()) + .isEqualTo(TaskExecutionStatus.SUCCESS)); + }); + // Sub-workflow instance should be success. + // The task instance in sub-workflow should be failover. + assertThat(workflowInstances) + .filteredOn(workflowInstance -> Objects.equals(workflowInstance.getId(), + submittedSubWorkflowInstance.getId())) + .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.SUCCESS)) + .satisfiesExactly(workflowInstance -> { + assertThat( + repository.queryTaskInstance(submittedSubWorkflowInstance.getId())) + .hasSize(1) + .satisfiesExactly( + taskInstance -> assertThat(taskInstance.getState()) + .isEqualTo(TaskExecutionStatus.SUCCESS)); + }); }); }); - masterContainer.assertAllResourceReleased(); - } @Test diff --git a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml index a97c4f4e45f0..c519312248b0 100644 --- a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml +++ b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml @@ -101,7 +101,7 @@ workflowInstances: workflowDefinitionCode: 2 workflowDefinitionVersion: 1 projectCode: 1 - state: SUBMITTED_SUCCESS + state: RUNNING_EXECUTION recovery: NO startTime: 2025-04-24 18:00:00 endTime: null diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index ede66d3eaa86..b792ee508962 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -240,7 +240,7 @@ public boolean isNeedToSendWarning(WorkflowInstance workflowInstance) { WarningType warningType = workflowInstance.getWarningType(); switch (warningType) { case ALL: - if (workflowInstance.getState().isFinished()) { + if (workflowInstance.getState().isFinalState()) { sendWarning = true; } break; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index bf6805afa4b8..fb9b66b93695 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -754,7 +754,7 @@ public String findConfigYamlByName(String clusterName) { public void forceWorkflowInstanceSuccessByTaskInstanceId(TaskInstance task) { WorkflowInstance workflowInstance = findWorkflowInstanceDetailById(task.getWorkflowInstanceId()).orElse(null); if (workflowInstance != null - && (workflowInstance.getState().isFailure() || workflowInstance.getState().isStop())) { + && (workflowInstance.getState().isFailure() || workflowInstance.getState().isStopped())) { List validTaskList = taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId()); List instanceTaskCodeList = diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java deleted file mode 100644 index ae50ddaa2acd..000000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.subworkflow; - -import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; - -import java.util.List; - -import org.springframework.stereotype.Component; - -@Component -public interface SubWorkflowService { - - List getAllDynamicSubWorkflow(long processInstanceId, long taskCode); - - int batchInsertRelationSubWorkflow(List relationSubWorkflowList); - - List filterFinishProcessInstances(List workflowInstanceList); - - List filterSuccessProcessInstances(List workflowInstanceList); - - List filterRunningProcessInstances(List workflowInstanceList); - - List filterFailedProcessInstances(List workflowInstanceList); - - List getWorkflowOutputParameters(WorkflowInstance workflowInstance); -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java deleted file mode 100644 index cbdc99672298..000000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.subworkflow; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; -import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper; -import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class SubWorkflowServiceImpl implements SubWorkflowService { - - @Autowired - private RelationSubWorkflowMapper relationSubWorkflowMapper; - - @Autowired - private WorkflowInstanceDao workflowInstanceDao; - - @Autowired - private WorkflowDefinitionLogMapper workflowDefinitionLogMapper; - - @Override - public List getAllDynamicSubWorkflow(long processInstanceId, long taskCode) { - List relationSubWorkflows = - relationSubWorkflowMapper.queryAllSubWorkflowInstance(processInstanceId, taskCode); - List allSubProcessInstanceId = relationSubWorkflows.stream() - .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList()); - - List allSubWorkflowInstance = workflowInstanceDao.queryByIds(allSubProcessInstanceId); - allSubWorkflowInstance.sort(Comparator.comparing(WorkflowInstance::getId)); - return allSubWorkflowInstance; - } - - @Override - public int batchInsertRelationSubWorkflow(List relationSubWorkflowList) { - int insertN = relationSubWorkflowMapper.batchInsert(relationSubWorkflowList); - return insertN; - } - - @Override - public List filterFinishProcessInstances(List workflowInstanceList) { - return workflowInstanceList.stream() - .filter(subProcessInstance -> subProcessInstance.getState().isFinished()).collect(Collectors.toList()); - } - - @Override - public List filterSuccessProcessInstances(List workflowInstanceList) { - return workflowInstanceList.stream() - .filter(subProcessInstance -> subProcessInstance.getState().isSuccess()).collect(Collectors.toList()); - } - - @Override - public List filterRunningProcessInstances(List workflowInstanceList) { - return workflowInstanceList.stream() - .filter(subProcessInstance -> subProcessInstance.getState().isRunning()).collect(Collectors.toList()); - } - - @Override - public List filterFailedProcessInstances(List workflowInstanceList) { - return workflowInstanceList.stream() - .filter(subProcessInstance -> subProcessInstance.getState().isFailure()).collect(Collectors.toList()); - } - - @Override - public List getWorkflowOutputParameters(WorkflowInstance workflowInstance) { - List outputParamList = - new ArrayList<>(JSONUtils.toList(workflowInstance.getVarPool(), Property.class)); - - WorkflowDefinitionLog processDefinition = workflowDefinitionLogMapper - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - List globalParamList = JSONUtils.toList(processDefinition.getGlobalParams(), Property.class); - - Set ouputParamSet = outputParamList.stream().map(Property::getProp).collect(Collectors.toSet()); - - // add output global parameters which are not in output parameters list - globalParamList.stream().filter(globalParam -> !ouputParamSet.contains(globalParam.getProp())) - .forEach(outputParamList::add); - - return outputParamList; - - } -}