2626import org .apache .dolphinscheduler .server .master .engine .exceptions .WorkflowEventFireException ;
2727import org .apache .dolphinscheduler .server .master .engine .task .lifecycle .AbstractTaskLifecycleEvent ;
2828import org .apache .dolphinscheduler .server .master .engine .task .lifecycle .event .TaskFatalLifecycleEvent ;
29- import org .apache .dolphinscheduler .server .master .engine .workflow .runnable . IWorkflowExecutionRunnable ;
29+ import org .apache .dolphinscheduler .server .master .engine .workflow .execution . IWorkflowExecution ;
3030import org .apache .dolphinscheduler .server .master .runner .IWorkflowExecuteContext ;
3131import org .apache .dolphinscheduler .server .master .utils .ExceptionUtils ;
3232
4747@ SuppressWarnings ({"rawtypes" , "unchecked" })
4848public class WorkflowEventBusFireWorker {
4949
50- private final Map <Integer , IWorkflowExecutionRunnable > registeredWorkflowExecuteRunnableMap =
50+ private final Map <Integer , IWorkflowExecution > registeredWorkflowExecuteRunnableMap =
5151 new ConcurrentHashMap <>();
5252
5353 private final Map <ILifecycleEventType , ILifecycleEventHandler > eventHandlerMap = new ConcurrentHashMap <>();
@@ -58,35 +58,35 @@ public void registerEventHandler(ILifecycleEventHandler eventHandler) {
5858 eventHandlerMap .put (eventHandler .matchEventType (), eventHandler );
5959 }
6060
61- public void registerWorkflowEventBus (IWorkflowExecutionRunnable workflowExecutionRunnable ) {
62- final IWorkflowExecuteContext workflowExecuteContext = workflowExecutionRunnable .getWorkflowExecuteContext ();
61+ public void registerWorkflowEventBus (IWorkflowExecution workflowExecution ) {
62+ final IWorkflowExecuteContext workflowExecuteContext = workflowExecution .getWorkflowExecuteContext ();
6363 final WorkflowInstance workflowInstance = workflowExecuteContext .getWorkflowInstance ();
6464 final Integer workflowInstanceId = workflowInstance .getId ();
6565 final String workflowInstanceName = workflowInstance .getName ();
6666 checkState (!registeredWorkflowExecuteRunnableMap .containsKey (workflowInstanceId ),
6767 "WorkflowExecuteRunnable(%s/%s already registered at WorkflowEventBusFireWorker" , workflowInstanceId ,
6868 workflowInstanceName );
69- registeredWorkflowExecuteRunnableMap .put (workflowInstanceId , workflowExecutionRunnable );
69+ registeredWorkflowExecuteRunnableMap .put (workflowInstanceId , workflowExecution );
7070 }
7171
72- public void unRegisterWorkflowEventBus (IWorkflowExecutionRunnable workflowExecutionRunnable ) {
73- final IWorkflowExecuteContext workflowExecuteContext = workflowExecutionRunnable .getWorkflowExecuteContext ();
72+ public void unRegisterWorkflowEventBus (IWorkflowExecution workflowExecution ) {
73+ final IWorkflowExecuteContext workflowExecuteContext = workflowExecution .getWorkflowExecuteContext ();
7474 final WorkflowInstance workflowInstance = workflowExecuteContext .getWorkflowInstance ();
7575 final Integer workflowInstanceId = workflowInstance .getId ();
76- registeredWorkflowExecuteRunnableMap .remove (workflowInstanceId , workflowExecutionRunnable );
76+ registeredWorkflowExecuteRunnableMap .remove (workflowInstanceId , workflowExecution );
7777 }
7878
7979 public void fireAllRegisteredEvent () {
80- final List <IWorkflowExecutionRunnable > workflowExecutionRunnables = getWaitingFireWorkflowExecutionRunnables ();
81- if (CollectionUtils .isEmpty (workflowExecutionRunnables )) {
80+ final List <IWorkflowExecution > workflowExecutions = getWaitingFireWorkflowExecutions ();
81+ if (CollectionUtils .isEmpty (workflowExecutions )) {
8282 return ;
8383 }
84- for (final IWorkflowExecutionRunnable workflowExecutionRunnable : workflowExecutionRunnables ) {
85- final Integer workflowInstanceId = workflowExecutionRunnable .getId ();
86- final String workflowInstanceName = workflowExecutionRunnable .getName ();
84+ for (final IWorkflowExecution workflowExecution : workflowExecutions ) {
85+ final Integer workflowInstanceId = workflowExecution .getId ();
86+ final String workflowInstanceName = workflowExecution .getName ();
8787 try {
8888 LogUtils .setWorkflowInstanceIdMDC (workflowInstanceId );
89- doFireSingleWorkflowEventBus (workflowExecutionRunnable );
89+ doFireSingleWorkflowEventBus (workflowExecution );
9090 } catch (Exception ex ) {
9191 log .error ("Fire event failed for WorkflowExecuteRunnable: {}" , workflowInstanceName , ex );
9292 } finally {
@@ -99,7 +99,7 @@ public int getRegisteredWorkflowExecuteRunnableSize() {
9999 return registeredWorkflowExecuteRunnableMap .size ();
100100 }
101101
102- private List <IWorkflowExecutionRunnable > getWaitingFireWorkflowExecutionRunnables () {
102+ private List <IWorkflowExecution > getWaitingFireWorkflowExecutions () {
103103 if (MapUtils .isEmpty (registeredWorkflowExecuteRunnableMap )) {
104104 return Collections .emptyList ();
105105 }
@@ -109,8 +109,8 @@ private List<IWorkflowExecutionRunnable> getWaitingFireWorkflowExecutionRunnable
109109 .collect (Collectors .toList ());
110110 }
111111
112- private void doFireSingleWorkflowEventBus (final IWorkflowExecutionRunnable workflowExecutionRunnable ) {
113- final WorkflowEventBus workflowEventBus = workflowExecutionRunnable .getWorkflowEventBus ();
112+ private void doFireSingleWorkflowEventBus (final IWorkflowExecution workflowExecution ) {
113+ final WorkflowEventBus workflowEventBus = workflowExecution .getWorkflowEventBus ();
114114 while (!workflowEventBus .isEmpty ()) {
115115 Optional <AbstractLifecycleEvent > eventOptional = workflowEventBus .poll ();
116116 if (!eventOptional .isPresent ()) {
@@ -122,7 +122,7 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
122122 // So we increase the event count before the event fired then we can get the correct event count
123123 // And if the event handle failed we will decrease the success event count
124124 workflowEventBus .getWorkflowEventBusSummary ().increaseFireSuccessEventCount ();
125- doFireSingleEvent (workflowExecutionRunnable , lifecycleEvent );
125+ doFireSingleEvent (workflowExecution , lifecycleEvent );
126126 } catch (Exception ex ) {
127127 // If the database connection is failed, do not remove the event from the event bus
128128 // so that the event can be fired again when the database connection is recovered
@@ -136,7 +136,7 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
136136 if (ExceptionUtils .isTaskExecutionContextCreateException (ex )) {
137137 AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent ) lifecycleEvent ;
138138 final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent .builder ()
139- .taskExecutionRunnable (taskLifecycleEvent .getTaskExecutionRunnable ())
139+ .taskExecution (taskLifecycleEvent .getTaskExecution ())
140140 .endTime (new Date ())
141141 .build ();
142142 workflowEventBus .publish (taskFatalEvent );
@@ -149,13 +149,13 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
149149 }
150150 }
151151
152- private void doFireSingleEvent (final IWorkflowExecutionRunnable workflowExecutionRunnable ,
152+ private void doFireSingleEvent (final IWorkflowExecution workflowExecution ,
153153 final AbstractLifecycleEvent event ) {
154154 final ILifecycleEventHandler lifecycleEventHandler = eventHandlerMap .get (event .getEventType ());
155155 if (lifecycleEventHandler == null ) {
156156 throw new RuntimeException ("No EventHandler found for event: " + event .getEventType ());
157157 }
158- lifecycleEventHandler .handle (workflowExecutionRunnable , event );
158+ lifecycleEventHandler .handle (workflowExecution , event );
159159 }
160160
161161}
0 commit comments