Skip to content

Commit 778bab0

Browse files
authored
[Fix-17026][API] Set workflowDefinition status to offline when importing (#17132)
1 parent 7a8d910 commit 778bab0

7 files changed

Lines changed: 457 additions & 15 deletions

File tree

docs/docs/en/faq.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,4 +752,13 @@ start API server. If you want disabled when Python gateway service you could cha
752752

753753
---
754754

755+
## Q: Why is the schedule status of a workflow definition set to "Offline" after importing an online workflow definition?
756+
757+
A: This is because we want to prevent users from directly importing a scheduled workflow that is already "Online".
758+
Therefore, when exporting such workflows, the system automatically changes their status to "Offline".
759+
To enforce this rule, even if a user manually sets the schedule status to "Online" in the workflow definition before importing,
760+
the system will override it and set it to "Offline".
761+
762+
---
763+
755764
We will collect more FAQ later

docs/docs/zh/faq.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,4 +736,13 @@ A: 对于标识为`缓存执行`的任务, 当任务启动时会生成一个缓
736736

737737
若不需要缓存时,可以在工作流实例中右键运行清除缓存,则会清除该版本下当前输入的参数的缓存数据。
738738

739+
---
740+
741+
## Q:为什么导入一个在线的工作流定义,导入后的工作流定义的定时状态是下线的?
742+
743+
A:因为我们不希望用户直接导入一个`在线`的定时工作流,所以在导出时,系统会将它的状态改为`下线`
744+
为了遵循这个原则,即使用户自行将要导入的工作流定义中的定时状态改为`在线`,系统也会将状态覆盖为`下线`
745+
746+
---
747+
739748
我们会持续收集更多的 FAQ。

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,6 +1576,8 @@ protected boolean checkAndImport(User loginUser,
15761576
schedule.setUserId(loginUser.getId());
15771577
schedule.setCreateTime(now);
15781578
schedule.setUpdateTime(now);
1579+
// not allow to import an online schedule
1580+
schedule.setReleaseState(ReleaseState.OFFLINE);
15791581
int scheduleInsert = scheduleMapper.insert(schedule);
15801582
if (0 == scheduleInsert) {
15811583
log.error(

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java

Lines changed: 142 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,18 @@
2626
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
2727
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
2828
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.ArgumentMatchers.anyList;
31+
import static org.mockito.ArgumentMatchers.anyLong;
32+
import static org.mockito.ArgumentMatchers.anyString;
33+
import static org.mockito.ArgumentMatchers.eq;
2934
import static org.mockito.ArgumentMatchers.isA;
3035
import static org.mockito.Mockito.doNothing;
3136
import static org.mockito.Mockito.doThrow;
3237
import static org.mockito.Mockito.times;
3338
import static org.mockito.Mockito.when;
3439

40+
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
3541
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
3642
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
3743
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
@@ -63,6 +69,7 @@
6369
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
6470
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
6571
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
72+
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
6673
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
6774
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
6875
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
@@ -79,7 +86,12 @@
7986
import org.apache.commons.lang3.StringUtils;
8087

8188
import java.io.ByteArrayOutputStream;
89+
import java.io.IOException;
90+
import java.net.URISyntaxException;
8291
import java.nio.charset.StandardCharsets;
92+
import java.nio.file.Files;
93+
import java.nio.file.Path;
94+
import java.nio.file.Paths;
8395
import java.text.MessageFormat;
8496
import java.util.ArrayList;
8597
import java.util.Arrays;
@@ -105,6 +117,7 @@
105117
import org.mockito.Mockito;
106118
import org.mockito.junit.jupiter.MockitoExtension;
107119
import org.springframework.mock.web.MockMultipartFile;
120+
import org.springframework.web.multipart.MultipartFile;
108121

109122
import com.google.common.collect.Lists;
110123

@@ -182,6 +195,9 @@ public class WorkflowDefinitionServiceTest extends BaseServiceTestTool {
182195
@Mock
183196
private WorkflowDefinitionLogDao workflowDefinitionLogDao;
184197

198+
@Mock
199+
private TaskDefinitionLogMapper taskDefinitionLogMapper;
200+
185201
@Mock
186202
private UserMapper userMapper;
187203

@@ -343,11 +359,11 @@ public void testQueryWorkflowDefinitionListPaging() {
343359
.totalCount(30)
344360
.build();
345361
when(workflowDefinitionDao.listingWorkflowDefinition(
346-
Mockito.eq(0),
347-
Mockito.eq(10),
348-
Mockito.eq(""),
349-
Mockito.eq(1),
350-
Mockito.eq(projectCode))).thenReturn(pageListingResult);
362+
eq(0),
363+
eq(10),
364+
eq(""),
365+
eq(1),
366+
eq(projectCode))).thenReturn(pageListingResult);
351367
String user1 = "user1";
352368
String user2 = "user2";
353369
when(userMapper.queryUserWithWorkflowDefinitionCode(processDefinitionCodes))
@@ -403,7 +419,7 @@ public void testQueryWorkflowDefinitionByCode() {
403419
when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
404420
.thenReturn(result);
405421
DagData dagData = new DagData(getWorkflowDefinition(), null, null);
406-
when(processService.genDagData(Mockito.any())).thenReturn(dagData);
422+
when(processService.genDagData(any())).thenReturn(dagData);
407423

408424
Map<String, Object> instanceNotexitRes =
409425
processDefinitionService.queryWorkflowDefinitionByCode(user, projectCode, 1L);
@@ -743,7 +759,7 @@ public void testGetTaskNodeListByDefinitionCode() {
743759
// success
744760
WorkflowDefinition workflowDefinition = getWorkflowDefinition();
745761
putMsg(result, Status.SUCCESS, projectCode);
746-
when(processService.genDagData(Mockito.any())).thenReturn(new DagData(workflowDefinition, null, null));
762+
when(processService.genDagData(any())).thenReturn(new DagData(workflowDefinition, null, null));
747763
when(workflowDefinitionMapper.queryByCode(46L)).thenReturn(workflowDefinition);
748764
Map<String, Object> dataNotValidRes =
749765
processDefinitionService.getTaskNodeListByDefinitionCode(user, projectCode, 46L);
@@ -774,7 +790,7 @@ public void testGetTaskNodeListByDefinitionCodes() {
774790
workflowDefinitionList.add(workflowDefinition);
775791

776792
when(workflowDefinitionMapper.queryByCodes(defineCodeSet)).thenReturn(workflowDefinitionList);
777-
when(processService.genDagData(Mockito.any())).thenReturn(new DagData(workflowDefinition, null, null));
793+
when(processService.genDagData(any())).thenReturn(new DagData(workflowDefinition, null, null));
778794
Project project1 = getProject(projectCode);
779795
List<Project> projects = new ArrayList<>();
780796
projects.add(project1);
@@ -881,7 +897,7 @@ public void testBatchExportWorkflowDefinitionByCodes() {
881897
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
882898

883899
DagData dagData = new DagData(getWorkflowDefinition(), null, null);
884-
when(processService.genDagData(Mockito.any())).thenReturn(dagData);
900+
when(processService.genDagData(any())).thenReturn(dagData);
885901
processDefinitionService.batchExportWorkflowDefinitionByCodes(user, projectCode, "1", response);
886902
Assertions.assertNotNull(processDefinitionService.exportWorkflowDagData(workflowDefinition));
887903
}
@@ -917,13 +933,13 @@ public void testImportSqlWorkflowDefinition() throws Exception {
917933
when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
918934
when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_IMPORT))
919935
.thenReturn(result);
920-
when(processService.saveTaskDefine(Mockito.same(user), Mockito.eq(projectCode), Mockito.notNull(),
936+
when(processService.saveTaskDefine(Mockito.same(user), eq(projectCode), Mockito.notNull(),
921937
Mockito.anyBoolean())).thenReturn(2);
922938
when(processService.saveWorkflowDefine(Mockito.same(user), Mockito.notNull(), Mockito.notNull(),
923939
Mockito.anyBoolean())).thenReturn(1);
924940
when(
925-
processService.saveTaskRelation(Mockito.same(user), Mockito.eq(projectCode), Mockito.anyLong(),
926-
Mockito.eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean()))
941+
processService.saveTaskRelation(Mockito.same(user), eq(projectCode), anyLong(),
942+
eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean()))
927943
.thenReturn(0);
928944
result = processDefinitionService.importSqlWorkflowDefinition(user, projectCode, mockMultipartFile);
929945

@@ -991,8 +1007,8 @@ public void testCreateWorkflowDefinitionV2() {
9911007
workflowCreateRequest.setReleaseState(releaseState);
9921008
workflowCreateRequest.setWarningGroupId(warningGroupId);
9931009
workflowCreateRequest.setExecutionType(executionType);
994-
when(workflowDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
995-
when(workflowDefinitionMapper.insert(Mockito.any())).thenReturn(1);
1010+
when(workflowDefinitionLogMapper.insert(any())).thenReturn(1);
1011+
when(workflowDefinitionMapper.insert(any())).thenReturn(1);
9961012
WorkflowDefinition workflowDefinition =
9971013
processDefinitionService.createSingleWorkflowDefinition(user, workflowCreateRequest);
9981014

@@ -1109,7 +1125,7 @@ public void testUpdateWorkflowDefinitionV2() {
11091125
// error update process definition mapper
11101126
workflowUpdateRequest.setName(name);
11111127
when(workflowDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(workflowDefinition);
1112-
when(workflowDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
1128+
when(workflowDefinitionLogMapper.insert(any())).thenReturn(1);
11131129
exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService
11141130
.updateSingleWorkflowDefinition(user, processDefinitionCode, workflowUpdateRequest));
11151131
Assertions.assertEquals(Status.UPDATE_WORKFLOW_DEFINITION_ERROR.getCode(),
@@ -1254,4 +1270,115 @@ private List<TaskMainInfo> getTaskMainInfo() {
12541270
taskMainInfos.add(taskMainInfo);
12551271
return taskMainInfos;
12561272
}
1273+
1274+
@Test
1275+
public void testImportWorkflowDefinitionWithoutProjectAuth() {
1276+
Project project = this.getProject(projectCode);
1277+
Map<String, Object> successResult = new HashMap<>();
1278+
putMsg(successResult, Status.SUCCESS);
1279+
MultipartFile file = new MockMultipartFile(
1280+
"file", "", "application/json", "".getBytes());
1281+
Map<String, Object> checkProjectPermResult1 = new HashMap<>();
1282+
putMsg(checkProjectPermResult1, Status.USER_NO_OPERATION_PROJECT_PERM);
1283+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1284+
when(projectService.checkProjectAndAuth(user, project, project.getCode(), WORKFLOW_IMPORT))
1285+
.thenReturn(checkProjectPermResult1);
1286+
Map<String, Object> checkProjectPermResult = processDefinitionService.importWorkflowDefinition(
1287+
user, projectCode, file);
1288+
Assertions.assertEquals(
1289+
checkProjectPermResult.get(Constants.STATUS), checkProjectPermResult1.get(Constants.STATUS));
1290+
}
1291+
1292+
@Test
1293+
public void testImportWorkflowDefinitionWithEmptyFileContent() {
1294+
Project project = this.getProject(projectCode);
1295+
Map<String, Object> successResult = new HashMap<>();
1296+
putMsg(successResult, Status.SUCCESS);
1297+
MultipartFile file = new MockMultipartFile("file", "", "application/json", "".getBytes());
1298+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1299+
when(projectService.checkProjectAndAuth(user, project, project.getCode(), WORKFLOW_IMPORT))
1300+
.thenReturn(successResult);
1301+
Map<String, Object> result = processDefinitionService.importWorkflowDefinition(user, projectCode, file);
1302+
Assertions.assertEquals(Status.DATA_IS_NULL, result.get(Constants.STATUS));
1303+
}
1304+
1305+
@Test
1306+
public void testImportWorkflowDefinitionWhenMissImportanceParams() throws URISyntaxException, IOException {
1307+
Project project = this.getProject(projectCode);
1308+
Map<String, Object> successResult = new HashMap<>();
1309+
putMsg(successResult, Status.SUCCESS);
1310+
// miss workflowTaskRelationList
1311+
MultipartFile checkImportanceParamsFile = createMultipartFile("workflowImport/check_importance_params.json");
1312+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1313+
when(projectService.checkProjectAndAuth(user, project, project.getCode(), WORKFLOW_IMPORT))
1314+
.thenReturn(successResult);
1315+
Map<String, Object> checkImportanceParamsResult = processDefinitionService.importWorkflowDefinition(
1316+
user, projectCode, checkImportanceParamsFile);
1317+
Assertions.assertEquals(Status.DATA_IS_NULL, checkImportanceParamsResult.get(Constants.STATUS));
1318+
}
1319+
1320+
@Test
1321+
public void testImportWorkflowDefinitionWhenNameExist() throws URISyntaxException, IOException {
1322+
Project project = this.getProject(projectCode);
1323+
Map<String, Object> successResult = new HashMap<>();
1324+
putMsg(successResult, Status.SUCCESS);
1325+
MultipartFile checkDuplicateNameFile = createMultipartFile("workflowImport/check_duplicate_name.json");
1326+
Map<String, Object> verifyNameResult = new HashMap<>();
1327+
putMsg(verifyNameResult, Status.WORKFLOW_DEFINITION_NAME_EXIST);
1328+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1329+
when(projectService.checkProjectAndAuth(user, project, project.getCode(), WORKFLOW_IMPORT))
1330+
.thenReturn(successResult);
1331+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1332+
when(projectService.checkProjectAndAuth(user, project, project.getCode(), WORKFLOW_CREATE))
1333+
.thenReturn(successResult);
1334+
WorkflowDefinition workflowDefinition = new WorkflowDefinition();
1335+
workflowDefinition.setCode(2);
1336+
workflowDefinition.setName("workflow1");
1337+
when(workflowDefinitionMapper.verifyByDefineName(eq(projectCode), anyString()))
1338+
.thenReturn(workflowDefinition);
1339+
Map<String, Object> checkDuplicateNameResult = processDefinitionService.importWorkflowDefinition(
1340+
user, projectCode, checkDuplicateNameFile);
1341+
Assertions.assertEquals(Status.WORKFLOW_DEFINITION_NAME_EXIST, checkDuplicateNameResult.get(Constants.STATUS));
1342+
}
1343+
1344+
@Test
1345+
public void testImportWorkflowDefinitionSuccessful() throws URISyntaxException, IOException {
1346+
Project project = this.getProject(projectCode);
1347+
Map<String, Object> successResult = new HashMap<>();
1348+
putMsg(successResult, Status.SUCCESS);
1349+
MultipartFile successfulFile = createMultipartFile("workflowImport/check_successful.json");
1350+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1351+
when(projectService.checkProjectAndAuth(user, project, project.getCode(),
1352+
ApiFuncIdentificationConstant.WORKFLOW_IMPORT))
1353+
.thenReturn(successResult);
1354+
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
1355+
when(projectService.checkProjectAndAuth(user, project, project.getCode(), WORKFLOW_CREATE))
1356+
.thenReturn(successResult);
1357+
when(workflowDefinitionMapper.verifyByDefineName(eq(projectCode), anyString()))
1358+
.thenReturn(null);
1359+
when(taskDefinitionMapper.batchInsert(anyList())).thenReturn(1);
1360+
when(taskDefinitionLogMapper.batchInsert(anyList())).thenReturn(1);
1361+
WorkflowDefinition successWorkflowDef = new WorkflowDefinition();
1362+
successWorkflowDef.setCode(123);
1363+
when(workflowDefinitionMapper.queryByCode(anyLong())).thenReturn(successWorkflowDef);
1364+
when(scheduleMapper.insert(any())).thenReturn(1);
1365+
when(processService.saveWorkflowDefine(eq(user), any(), eq(true), eq(true)))
1366+
.thenReturn(Constants.VERSION_FIRST);
1367+
Map<String, Object> successfulResul = processDefinitionService.importWorkflowDefinition(
1368+
user, 1L, successfulFile);
1369+
Assertions.assertEquals(Status.SUCCESS, successfulResul.get(Constants.STATUS));
1370+
}
1371+
1372+
private MultipartFile createMultipartFile(String filePath) throws URISyntaxException, IOException {
1373+
Path path = Paths.get(getClass().getClassLoader().getResource(filePath).toURI());
1374+
byte[] content = Files.readAllBytes(path);
1375+
1376+
// 2. 创建MockMultipartFile对象
1377+
MultipartFile multipartFile = new MockMultipartFile(
1378+
"file",
1379+
"",
1380+
"application/json",
1381+
content);
1382+
return multipartFile;
1383+
}
12571384
}

0 commit comments

Comments
 (0)