From cab06c185f3ceb6a9cc97f3d5800b038e52e555c Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Thu, 28 Aug 2025 14:08:04 +0800 Subject: [PATCH] add workflow execution strategy --- .../dao/repository/WorkflowInstanceDao.java | 9 + .../impl/WorkflowInstanceDaoImpl.java | 5 + .../RecoverSerialWaitCommandHandler.java | 132 ++++++++ .../handler/RunWorkflowCommandHandler.java | 15 +- .../AbstractWorkflowStateAction.java | 12 + .../WorkflowExecutionStrategyService.java | 45 +++ .../WorkflowExecutionStrategyServiceImpl.java | 310 ++++++++++++++++++ 7 files changed, 525 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyService.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyServiceImpl.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java index 9f8f11687f4c..6208f786c5cf 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java @@ -91,6 +91,15 @@ List queryByWorkflowCodeVersionStatus(Long workflowDefinitionC int workflowDefinitionVersion, int[] states); + /** + * query workflow instance by workflowDefinitionCode and stateArray + * + * @param workflowDefinitionCode workflowDefinitionCode + * @param states states array + * @return workflow instance list + */ + List queryByWorkflowDefinitionCodeAndStatus(Long workflowDefinitionCode, int[] states); + List queryNeedFailoverMasters(); /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java index cfc381ab8a0c..56976befb52b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java @@ -168,6 +168,11 @@ public List queryByWorkflowCodeVersionStatus(Long workflowDefi states); } + @Override + public List queryByWorkflowDefinitionCodeAndStatus(Long workflowDefinitionCode, int[] states) { + return mybatisMapper.queryByWorkflowDefinitionCodeAndStatus(workflowDefinitionCode, states); + } + @Override public List queryNeedFailoverMasters() { return mybatisMapper diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java new file mode 100644 index 000000000000..82b6619f9509 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.command.handler; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; +import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; + +import java.util.Set; +import java.util.function.BiConsumer; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +/** + * This handler used to handle {@link CommandType#RECOVER_SERIAL_WAIT}. + * Will recover a workflow instance from serial wait state to running state. + */ +@Slf4j +@Component +public class RecoverSerialWaitCommandHandler extends AbstractCommandHandler { + + @Autowired + private WorkflowInstanceDao workflowInstanceDao; + + @Autowired + private ApplicationContext applicationContext; + + /** + * Generate the recover workflow instance from serial wait state. + * Will use the origin workflow instance, but will update the following fields: + *
    + *
  • state: from SERIAL_WAIT to RUNNING_EXECUTION
  • + *
  • command type
  • + *
  • start time
  • + *
  • restart time
  • + *
+ */ + @Override + protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + final Command command = workflowExecuteContextBuilder.getCommand(); + final int workflowInstanceId = command.getWorkflowInstanceId(); + + log.info("Recovering workflow instance from SERIAL_WAIT state, instance id: {}", workflowInstanceId); + + final WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId) + .orElseThrow(() -> new IllegalArgumentException("Cannot find WorkflowInstance:" + workflowInstanceId)); + + log.info("Found workflow instance: {} (id: {}), current state: {}", + workflowInstance.getName(), workflowInstance.getId(), workflowInstance.getState()); + + // Check if the workflow instance is in SERIAL_WAIT state + if (workflowInstance.getState() != WorkflowExecutionStatus.SERIAL_WAIT) { + log.warn("Workflow instance {} is not in SERIAL_WAIT state, current state: {}", + workflowInstance.getName(), workflowInstance.getState()); + throw new IllegalStateException("Workflow instance is not in SERIAL_WAIT state"); + } + + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name()); + workflowInstance.setCommandType(command.getCommandType()); + workflowInstanceDao.updateById(workflowInstance); + + log.info("Successfully recovered workflow instance {} from SERIAL_WAIT to RUNNING_EXECUTION", + workflowInstance.getName()); + workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); + } + + @Override + protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + // For serial wait recovery, we need to create the workflow execution graph from the beginning + // This is similar to RunWorkflowCommandHandler + final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { + final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) + .project(workflowExecuteContextBuilder.getProject()) + .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) + .doVisitFunction(taskExecutionRunnableCreator) + .build(); + workflowGraphTopologyLogicalVisitor.visit(); + + workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); + } + + @Override + public CommandType commandType() { + return CommandType.RECOVER_SERIAL_WAIT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java index 2e1bf835629e..f45374648ffb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java @@ -34,8 +34,8 @@ import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; +import org.apache.dolphinscheduler.server.master.engine.workflow.strategy.WorkflowExecutionStrategyService; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; -import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.commons.collections4.CollectionUtils; @@ -70,7 +70,7 @@ public class RunWorkflowCommandHandler extends AbstractCommandHandler { private ApplicationContext applicationContext; @Autowired - private CuringParamsService curingParamsService; + private WorkflowExecutionStrategyService workflowExecutionStrategyService; /** * Will generate a new workflow instance based on the command. @@ -80,7 +80,16 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work final WorkflowDefinition workflowDefinition = workflowExecuteContextBuilder.getWorkflowDefinition(); final Command command = workflowExecuteContextBuilder.getCommand(); final WorkflowInstance workflowInstance = workflowInstanceDao.queryById(command.getWorkflowInstanceId()); - workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name()); + + // Apply execution strategy before setting the state + boolean shouldExecute = workflowExecutionStrategyService.checkAndApplyExecutionStrategy( + workflowDefinition, workflowInstance); + + if (shouldExecute) { + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, + command.getCommandType().name()); + } + workflowInstance.setHost(masterConfig.getMasterAddress()); workflowInstance.setCommandParam(command.getCommandParam()); workflowInstance.setGlobalParams(mergeCommandParamsWithWorkflowParams(command, workflowDefinition)); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java index 9c62a997ac10..906967bf14a4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.strategy.WorkflowExecutionStrategyService; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; @@ -65,6 +66,9 @@ public abstract class AbstractWorkflowStateAction implements IWorkflowStateActio @Autowired protected WorkflowAlertManager workflowAlertManager; + @Autowired + protected WorkflowExecutionStrategyService workflowExecutionStrategyService; + /** * Try to trigger the tasks if the trigger condition is met. *

If all the given tasks trigger condition is not met then will try to emit workflow finish event. @@ -196,6 +200,14 @@ protected void finalizeEventAction(final IWorkflowExecutionRunnable workflowExec workflowEventBusCoordinator.unRegisterWorkflowEventBus(workflowExecutionRunnable); workflowAlertManager.sendAlertWorkflowInstance(workflowExecutionRunnable.getWorkflowInstance()); + // Check and wake up next serial waiting workflow instance + try { + workflowExecutionStrategyService + .checkAndWakeUpNextSerialInstance(workflowExecutionRunnable.getWorkflowInstance()); + } catch (Exception e) { + log.warn("Failed to check and wake up next serial waiting workflow instance", e); + } + log.info("Successfully finalize WorkflowExecuteRunnable: {}", workflowExecutionRunnable.getName()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyService.java new file mode 100644 index 000000000000..cceb3f2b6757 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.strategy; + +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; + +public interface WorkflowExecutionStrategyService { + + boolean checkAndApplyExecutionStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance); + + boolean handleParallelStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance); + + boolean handleSerialWaitStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance); + + boolean handleSerialDiscardStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance); + + boolean handleSerialPriorityStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance); + + void killWorkflowInstance(WorkflowInstance workflowInstance); + + void checkAndWakeUpNextSerialInstance(WorkflowInstance completedInstance); + + void wakeUpSerialWaitingInstance(WorkflowInstance waitingInstance); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyServiceImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyServiceImpl.java new file mode 100644 index 000000000000..fe13d3f220fe --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/strategy/WorkflowExecutionStrategyServiceImpl.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.strategy; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; +import org.apache.dolphinscheduler.service.command.CommandService; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Workflow execution strategy service for 3.3.x version. + *

This service handles different execution strategies for workflow instances: + * - PARALLEL: Execute workflow instances in parallel + * - SERIAL_WAIT: Execute workflow instances in serial order, waiting for previous to complete + * - SERIAL_DISCARD: Discard new instances and kill running ones + * - SERIAL_PRIORITY: Execute workflow instances in serial order based on priority + */ +@Slf4j +@Service +public class WorkflowExecutionStrategyServiceImpl implements WorkflowExecutionStrategyService { + + @Autowired + private WorkflowInstanceDao workflowInstanceDao; + + @Autowired + private CommandService commandService; + + @Autowired + private WorkflowDefinitionLogDao workflowDefinitionLogDao; + + /** + * Check and apply execution strategy for a new workflow instance. + * + * @param workflowDefinition the workflow definition + * @param workflowInstance the workflow instance to check + * @return true if the workflow instance should be executed, false otherwise + */ + @Override + public boolean checkAndApplyExecutionStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance) { + WorkflowExecutionTypeEnum executionType = workflowDefinition.getExecutionType(); + + switch (executionType) { + case PARALLEL: + return handleParallelStrategy(workflowDefinition, workflowInstance); + case SERIAL_WAIT: + return handleSerialWaitStrategy(workflowDefinition, workflowInstance); + case SERIAL_DISCARD: + return handleSerialDiscardStrategy(workflowDefinition, workflowInstance); + case SERIAL_PRIORITY: + return handleSerialPriorityStrategy(workflowDefinition, workflowInstance); + default: + log.warn("Unknown execution type: {}, defaulting to parallel", executionType); + return true; + } + } + + /** + * Handle parallel execution strategy. + * All workflow instances can run in parallel. + */ + @Override + public boolean handleParallelStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance) { + log.debug("Applying parallel strategy for workflow: {}", workflowInstance.getName()); + return true; + } + + /** + * Handle serial wait execution strategy. + * New workflow instances will wait for the previous one to complete. + */ + @Override + public boolean handleSerialWaitStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance) { + log.debug("Applying serial wait strategy for workflow: {}", workflowInstance.getName()); + + // Check if there are any running instances of the same workflow definition + List runningInstances = workflowInstanceDao.queryByWorkflowDefinitionCodeAndStatus( + workflowDefinition.getCode(), + new int[]{WorkflowExecutionStatus.RUNNING_EXECUTION.getCode()}); + + if (CollectionUtils.isNotEmpty(runningInstances)) { + // Set the new instance to SERIAL_WAIT state + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, + "wait by serial_wait strategy"); + workflowInstanceDao.upsertWorkflowInstance(workflowInstance); + log.info("Workflow instance {} set to SERIAL_WAIT state", workflowInstance.getName()); + return false; + } + + return true; + } + + /** + * Handle serial discard execution strategy. + * New workflow instances will be discarded and running ones will be killed. + */ + @Override + public boolean handleSerialDiscardStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance) { + log.debug("Applying serial discard strategy for workflow: {}", workflowInstance.getName()); + + // Check if there are any running instances of the same workflow definition + List runningInstances = workflowInstanceDao.queryByWorkflowDefinitionCodeAndStatus( + workflowDefinition.getCode(), + new int[]{WorkflowExecutionStatus.RUNNING_EXECUTION.getCode()}); + + if (CollectionUtils.isNotEmpty(runningInstances)) { + // Kill all running instances + for (WorkflowInstance runningInstance : runningInstances) { + killWorkflowInstance(runningInstance); + } + + // Set the new instance to RUNNING state + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, + "submit from serial_discard strategy"); + workflowInstanceDao.upsertWorkflowInstance(workflowInstance); + log.info("Killed {} running instances and started new instance: {}", + runningInstances.size(), workflowInstance.getName()); + return true; + } + + return true; + } + + /** + * Handle serial priority execution strategy. + *

Workflow instances will be executed in serial order based on priority. + */ + @Override + public boolean handleSerialPriorityStrategy(WorkflowDefinition workflowDefinition, + WorkflowInstance workflowInstance) { + log.debug("Applying serial priority strategy for workflow: {}", workflowInstance.getName()); + + // Check if there are any running instances of the same workflow definition + List runningInstances = workflowInstanceDao.queryByWorkflowDefinitionCodeAndStatus( + workflowDefinition.getCode(), + new int[]{WorkflowExecutionStatus.RUNNING_EXECUTION.getCode()}); + + if (CollectionUtils.isNotEmpty(runningInstances)) { + // Check if the new instance has higher priority than running instances + boolean hasHigherPriority = runningInstances.stream() + .allMatch(runningInstance -> workflowInstance.getWorkflowInstancePriority() + .getCode() > runningInstance.getWorkflowInstancePriority().getCode()); + + if (hasHigherPriority) { + // Kill all running instances and start the new one + for (WorkflowInstance runningInstance : runningInstances) { + killWorkflowInstance(runningInstance); + } + + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, + "submit from serial_priority strategy with higher priority"); + workflowInstanceDao.upsertWorkflowInstance(workflowInstance); + log.info("Killed {} running instances and started higher priority instance: {}", + runningInstances.size(), workflowInstance.getName()); + return true; + } else { + // Set the new instance to SERIAL_WAIT state + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, + "wait by serial_priority strategy"); + workflowInstanceDao.upsertWorkflowInstance(workflowInstance); + log.info("Workflow instance {} set to SERIAL_WAIT state due to lower priority", + workflowInstance.getName()); + return false; + } + } + + return true; + } + + /** + * Kill a workflow instance by creating a stop command. + */ + + @Override + public void killWorkflowInstance(WorkflowInstance workflowInstance) { + Command command = new Command(); + command.setCommandType(CommandType.STOP); + command.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode()); + command.setWorkflowDefinitionVersion(workflowInstance.getWorkflowDefinitionVersion()); + command.setWorkflowInstanceId(workflowInstance.getId()); + command.setExecutorId(workflowInstance.getExecutorId()); + + Map commandParam = new HashMap<>(); + commandParam.put("workflowInstanceId", String.valueOf(workflowInstance.getId())); + command.setCommandParam(JSONUtils.toJsonString(commandParam)); + + commandService.createCommand(command); + log.info("Created stop command for workflow instance: {}", workflowInstance.getName()); + } + + /** + * Check and wake up the next serial waiting workflow instance. + * This method should be called when a workflow instance completes. + */ + @Override + public void checkAndWakeUpNextSerialInstance(WorkflowInstance completedInstance) { + log.info("Checking for next serial waiting instance after workflow instance {} completes", + completedInstance.getName()); + + // Get workflow definition by code and version + WorkflowDefinition workflowDefinition = workflowDefinitionLogDao.queryByDefinitionCodeAndVersion( + completedInstance.getWorkflowDefinitionCode(), + completedInstance.getWorkflowDefinitionVersion()); + + if (workflowDefinition == null) { + log.warn("Cannot find workflow definition for instance: {} (code: {}, version: {})", + completedInstance.getName(), + completedInstance.getWorkflowDefinitionCode(), + completedInstance.getWorkflowDefinitionVersion()); + return; + } + + WorkflowExecutionTypeEnum executionType = workflowDefinition.getExecutionType(); + log.info("Workflow definition {} has execution type: {}", workflowDefinition.getName(), executionType); + + if (executionType == WorkflowExecutionTypeEnum.SERIAL_WAIT || + executionType == WorkflowExecutionTypeEnum.SERIAL_PRIORITY) { + + // Find the next waiting instance + List waitingInstances = workflowInstanceDao.queryByWorkflowDefinitionCodeAndStatus( + workflowDefinition.getCode(), + new int[]{WorkflowExecutionStatus.SERIAL_WAIT.getCode()}); + + log.info("Found {} waiting instances for workflow definition {}", + waitingInstances.size(), workflowDefinition.getName()); + + if (CollectionUtils.isNotEmpty(waitingInstances)) { + // Sort by priority if it's SERIAL_PRIORITY strategy + if (executionType == WorkflowExecutionTypeEnum.SERIAL_PRIORITY) { + waitingInstances.sort((a, b) -> Integer.compare(b.getWorkflowInstancePriority().getCode(), + a.getWorkflowInstancePriority().getCode())); + } + + // Wake up the first waiting instance + WorkflowInstance nextInstance = waitingInstances.get(0); + log.info("Waking up next waiting instance: {}", nextInstance.getName()); + wakeUpSerialWaitingInstance(nextInstance); + } else { + log.info("No waiting instances found for workflow definition {}", workflowDefinition.getName()); + } + } else { + log.info("Workflow definition {} is not using serial strategy, no need to wake up waiting instances", + workflowDefinition.getName()); + } + } + + /** + * Wake up a serial waiting workflow instance. + */ + @Override + public void wakeUpSerialWaitingInstance(WorkflowInstance waitingInstance) { + log.info("Creating RECOVER_SERIAL_WAIT command for workflow instance: {} (id: {})", + waitingInstance.getName(), waitingInstance.getId()); + + Command command = new Command(); + command.setCommandType(CommandType.RECOVER_SERIAL_WAIT); + command.setWorkflowDefinitionCode(waitingInstance.getWorkflowDefinitionCode()); + command.setWorkflowDefinitionVersion(waitingInstance.getWorkflowDefinitionVersion()); + command.setWorkflowInstanceId(waitingInstance.getId()); + command.setExecutorId(waitingInstance.getExecutorId()); + + Map commandParam = new HashMap<>(); + commandParam.put("workflowInstanceId", String.valueOf(waitingInstance.getId())); + command.setCommandParam(JSONUtils.toJsonString(commandParam)); + + try { + commandService.createCommand(command); + log.info("Successfully created RECOVER_SERIAL_WAIT command for workflow instance: {}", + waitingInstance.getName()); + } catch (Exception e) { + log.error("Failed to create RECOVER_SERIAL_WAIT command for workflow instance: {}", + waitingInstance.getName(), e); + } + } +}