Skip to content

Commit 5f64bad

Browse files
authored
[Fix-17239][Dependent] Dependent check get wrong result in manual running execution type (#17240)
1 parent 6f12d1d commit 5f64bad

5 files changed

Lines changed: 72 additions & 29 deletions

File tree

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ List<WorkflowInstance> queryByHostAndStatus(@Param("host") String host,
6565
*/
6666
List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host") String host,
6767
@Param("states") int[] stateArray);
68+
6869
/**
6970
* query workflow instance host by stateArray
7071
*
@@ -108,15 +109,15 @@ List<WorkflowInstance> queryByWorkerGroupNameAndStatus(@Param("workerGroupName")
108109
/**
109110
* workflow instance page
110111
*
111-
* @param page page
112-
* @param projectCode projectCode
112+
* @param page page
113+
* @param projectCode projectCode
113114
* @param workflowDefinitionCode workflowDefinitionCode
114-
* @param searchVal searchVal
115-
* @param executorName executorName
116-
* @param statusArray statusArray
117-
* @param host host
118-
* @param startTime startTime
119-
* @param endTime endTime
115+
* @param searchVal searchVal
116+
* @param executorName executorName
117+
* @param statusArray statusArray
118+
* @param host host
119+
* @param startTime startTime
120+
* @param endTime endTime
120121
* @return workflow instance page
121122
*/
122123
IPage<WorkflowInstance> queryWorkflowInstanceListPaging(Page<WorkflowInstance> page,
@@ -186,7 +187,7 @@ List<WorkflowInstanceStatusCountDto> countWorkflowInstanceStateByProjectCodes(
186187
* query workflow instance by workflowDefinitionCode
187188
*
188189
* @param workflowDefinitionCode workflowDefinitionCode
189-
* @param size size
190+
* @param size size
190191
* @return workflow instance list
191192
*/
192193
List<WorkflowInstance> queryByWorkflowDefinitionCode(@Param("workflowDefinitionCode") Long workflowDefinitionCode,
@@ -196,10 +197,10 @@ List<WorkflowInstance> queryByWorkflowDefinitionCode(@Param("workflowDefinitionC
196197
* query last scheduler workflow instance
197198
*
198199
* @param workflowDefinitionCode definitionCode
199-
* @param taskDefinitionCode definitionCode
200-
* @param startTime startTime
201-
* @param endTime endTime
202-
* @param testFlag testFlag
200+
* @param taskDefinitionCode definitionCode
201+
* @param startTime startTime
202+
* @param endTime endTime
203+
* @param testFlag testFlag
203204
* @return workflow instance
204205
*/
205206
WorkflowInstance queryLastSchedulerWorkflow(@Param("workflowDefinitionCode") Long workflowDefinitionCode,
@@ -212,10 +213,10 @@ WorkflowInstance queryLastSchedulerWorkflow(@Param("workflowDefinitionCode") Lon
212213
* query last manual workflow instance
213214
*
214215
* @param workflowDefinitionCode workflowDefinitionCode
215-
* @param taskCode taskCode
216-
* @param startTime startTime
217-
* @param endTime endTime
218-
* @param testFlag testFlag
216+
* @param taskCode taskCode
217+
* @param startTime startTime
218+
* @param endTime endTime
219+
* @param testFlag testFlag
219220
* @return workflow instance
220221
*/
221222
WorkflowInstance queryLastManualWorkflow(@Param("workflowDefinitionCode") Long workflowDefinitionCode,
@@ -224,6 +225,11 @@ WorkflowInstance queryLastManualWorkflow(@Param("workflowDefinitionCode") Long w
224225
@Param("endTime") Date endTime,
225226
@Param("testFlag") int testFlag);
226227

228+
WorkflowInstance queryLastRunningWorkflow(@Param("workflowDefinitionCode") Long workflowDefinitionCode,
229+
@Param("startTime") Date startTime,
230+
@Param("endTime") Date endTime,
231+
@Param("states") int[] stateArray);
232+
227233
/**
228234
* query first schedule workflow instance
229235
*
@@ -261,7 +267,7 @@ List<WorkflowInstance> queryTopNWorkflowInstance(@Param("size") int size,
261267
* query workflow instance by workflowDefinitionCode and stateArray
262268
*
263269
* @param workflowDefinitionCode workflowDefinitionCode
264-
* @param states states array
270+
* @param states states array
265271
* @return workflow instance list
266272
*/
267273

@@ -275,12 +281,12 @@ List<WorkflowInstance> queryByWorkflowCodeVersionStatus(@Param("workflowDefiniti
275281
/**
276282
* Filter workflow instance
277283
*
278-
* @param page page
284+
* @param page page
279285
* @param workflowDefinitionCode workflowDefinitionCode
280-
* @param name name
281-
* @param host host
282-
* @param startTime startTime
283-
* @param endTime endTime
286+
* @param name name
287+
* @param host host
288+
* @param startTime startTime
289+
* @param endTime endTime
284290
* @return workflow instance IPage
285291
*/
286292
IPage<WorkflowInstance> queryWorkflowInstanceListV2Paging(Page<WorkflowInstance> page,

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ WorkflowInstance queryLastSchedulerWorkflowInterval(Long workflowDefinitionCode,
6868
WorkflowInstance queryLastManualWorkflowInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
6969
int testFlag);
7070

71+
WorkflowInstance queryLastRunningWorkflowInterval(Long definitionCode, DateInterval dateInterval);
72+
7173
/**
7274
* query first schedule workflow instance
7375
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ public WorkflowInstance queryLastManualWorkflowInterval(Long definitionCode, Lon
120120
testFlag);
121121
}
122122

123+
@Override
124+
public WorkflowInstance queryLastRunningWorkflowInterval(Long definitionCode, DateInterval dateInterval) {
125+
int[] runningStateArray = new int[]{WorkflowExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
126+
WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
127+
WorkflowExecutionStatus.READY_PAUSE.ordinal(),
128+
WorkflowExecutionStatus.READY_STOP.ordinal()};
129+
return mybatisMapper.queryLastRunningWorkflow(definitionCode, dateInterval.getStartTime(),
130+
dateInterval.getEndTime(), runningStateArray);
131+
}
132+
123133
/**
124134
* query first schedule process instance
125135
*

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,24 @@
249249
order by t1.end_time desc limit 1
250250
</select>
251251

252+
<select id="queryLastRunningWorkflow" resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
253+
select
254+
<include refid="baseSql"/>
255+
from t_ds_workflow_instance
256+
where workflow_definition_code=#{workflowDefinitionCode}
257+
<if test="states !=null and states.length != 0">
258+
and state in
259+
<foreach collection="states" item="i" index="index" open="(" separator="," close=")">
260+
#{i}
261+
</foreach>
262+
</if>
263+
<if test="startTime!=null and endTime != null ">
264+
and ((schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime})
265+
or (start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}))
266+
</if>
267+
order by start_time desc limit 1
268+
</select>
269+
252270
<select id="queryFirstScheduleWorkflowInstance" resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
253271
select
254272
<include refid="baseSql"/>

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private DependResult calculateResultForTasks(DependentItem dependentItem,
155155
DependResult result = DependResult.FAILED;
156156
for (DateInterval dateInterval : dateIntervals) {
157157
WorkflowInstance workflowInstance =
158-
findLastWorkflowInterval(dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode(),
158+
findDependentWorkflowCandidate(dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode(),
159159
dateInterval, testFlag);
160160
if (workflowInstance == null) {
161161
return DependResult.WAITING;
@@ -314,17 +314,24 @@ private void addItemVarPool(String varPoolStr, Long endTime) {
314314
}
315315

316316
/**
317-
* find the last one workflow instance that :
318-
* 1. manual run and finish between the interval
319-
* 2. schedule run and schedule time between the interval
317+
* find the last one workflow instance that:
318+
* 1. running workflow instance in the date interval
319+
* 2. manual run and finish between the interval
320+
* 3. schedule run and schedule time between the interval
320321
*
321322
* @param definitionCode definition code
322323
* @param taskCode task code
323324
* @param dateInterval date interval
324325
* @return workflowInstance
325326
*/
326-
private WorkflowInstance findLastWorkflowInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
327-
int testFlag) {
327+
private WorkflowInstance findDependentWorkflowCandidate(Long definitionCode, Long taskCode,
328+
DateInterval dateInterval,
329+
int testFlag) {
330+
WorkflowInstance runningWorkflow =
331+
workflowInstanceDao.queryLastRunningWorkflowInterval(definitionCode, dateInterval);
332+
if (runningWorkflow != null) {
333+
return runningWorkflow;
334+
}
328335

329336
WorkflowInstance lastSchedulerWorkflowInstance =
330337
workflowInstanceDao.queryLastSchedulerWorkflowInterval(definitionCode, taskCode, dateInterval,

0 commit comments

Comments
 (0)