Skip to content

Commit 6434576

Browse files
authored
[Feature-18136][API] Support view the running task/workflow of a active master/worker at UI monitor page (#18138)
1 parent 8077c2b commit 6434576

32 files changed

Lines changed: 1086 additions & 258 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/MonitorController.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.api.controller;
1919

20+
import static org.apache.dolphinscheduler.api.enums.Status.INTERNAL_SERVER_ERROR_ARGS;
2021
import static org.apache.dolphinscheduler.api.enums.Status.LIST_MASTERS_ERROR;
2122
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATABASE_STATE_ERROR;
2223

@@ -27,7 +28,9 @@
2728
import org.apache.dolphinscheduler.common.model.Server;
2829
import org.apache.dolphinscheduler.dao.entity.User;
2930
import org.apache.dolphinscheduler.dao.plugin.api.monitor.DatabaseMetrics;
31+
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecutorDTO;
3032
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
33+
import org.apache.dolphinscheduler.task.executor.dto.TaskExecutorDTO;
3134

3235
import java.util.List;
3336

@@ -37,6 +40,7 @@
3740
import org.springframework.web.bind.annotation.PathVariable;
3841
import org.springframework.web.bind.annotation.RequestAttribute;
3942
import org.springframework.web.bind.annotation.RequestMapping;
43+
import org.springframework.web.bind.annotation.RequestParam;
4044
import org.springframework.web.bind.annotation.ResponseStatus;
4145
import org.springframework.web.bind.annotation.RestController;
4246

@@ -81,4 +85,34 @@ public Result<List<DatabaseMetrics>> queryDatabaseState(@Parameter(hidden = true
8185
return Result.success(databaseMetrics);
8286
}
8387

88+
/**
89+
* query running workflow instances on a specific master
90+
*
91+
* @param masterAddress master address in host:port format
92+
* @return running workflow instance list
93+
*/
94+
@GetMapping(value = "/masters/workflow-executors")
95+
@ResponseStatus(HttpStatus.OK)
96+
@ApiException(INTERNAL_SERVER_ERROR_ARGS)
97+
public Result<List<WorkflowExecutorDTO>> queryWorkflowExecutors(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
98+
@RequestParam("masterAddress") String masterAddress) {
99+
List<WorkflowExecutorDTO> workflows = monitorService.queryWorkflowExecutors(loginUser, masterAddress);
100+
return Result.success(workflows);
101+
}
102+
103+
/**
104+
* query running task instances on a specific worker
105+
*
106+
* @param serverAddress worker address in host:port format
107+
* @return running task instance list
108+
*/
109+
@GetMapping(value = "/workers/task-executors")
110+
@ResponseStatus(HttpStatus.OK)
111+
@ApiException(INTERNAL_SERVER_ERROR_ARGS)
112+
public Result<List<TaskExecutorDTO>> queryTaskExecutors(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
113+
@RequestParam("serverAddress") String serverAddress) {
114+
List<TaskExecutorDTO> tasks = monitorService.queryTaskExecutors(loginUser, serverAddress);
115+
return Result.success(tasks);
116+
}
117+
84118
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.apache.dolphinscheduler.common.model.Server;
2121
import org.apache.dolphinscheduler.dao.entity.User;
2222
import org.apache.dolphinscheduler.dao.plugin.api.monitor.DatabaseMetrics;
23+
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecutorDTO;
2324
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
25+
import org.apache.dolphinscheduler.task.executor.dto.TaskExecutorDTO;
2426

2527
import java.util.List;
2628

@@ -41,4 +43,8 @@ public interface MonitorService {
4143
* @return server information list
4244
*/
4345
List<Server> listServer(RegistryNodeType nodeType);
46+
47+
List<WorkflowExecutorDTO> queryWorkflowExecutors(User loginUser, String masterAddress);
48+
49+
List<TaskExecutorDTO> queryTaskExecutors(User loginUser, String serverAddress);
4450
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,25 @@
1717

1818
package org.apache.dolphinscheduler.api.service.impl;
1919

20+
import org.apache.dolphinscheduler.api.enums.Status;
21+
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
2022
import org.apache.dolphinscheduler.api.service.MonitorService;
23+
import org.apache.dolphinscheduler.common.enums.UserType;
2124
import org.apache.dolphinscheduler.common.model.Server;
2225
import org.apache.dolphinscheduler.dao.entity.User;
2326
import org.apache.dolphinscheduler.dao.plugin.api.monitor.DatabaseMetrics;
2427
import org.apache.dolphinscheduler.dao.plugin.api.monitor.DatabaseMonitor;
28+
import org.apache.dolphinscheduler.extract.base.client.Clients;
29+
import org.apache.dolphinscheduler.extract.master.IWorkflowExecutorQueryClient;
30+
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecutorDTO;
31+
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowExecutorQueryRequest;
32+
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowExecutorQueryResponse;
33+
import org.apache.dolphinscheduler.extract.worker.ITaskExecutorQueryClient;
34+
import org.apache.dolphinscheduler.extract.worker.transportor.TaskExecutorQueryRequest;
35+
import org.apache.dolphinscheduler.extract.worker.transportor.TaskExecutorQueryResponse;
2536
import org.apache.dolphinscheduler.registry.api.RegistryClient;
2637
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
38+
import org.apache.dolphinscheduler.task.executor.dto.TaskExecutorDTO;
2739

2840
import java.util.List;
2941

@@ -59,4 +71,36 @@ public List<DatabaseMetrics> queryDatabaseState(User loginUser) {
5971
public List<Server> listServer(RegistryNodeType nodeType) {
6072
return registryClient.getServerList(nodeType);
6173
}
74+
75+
@Override
76+
public List<WorkflowExecutorDTO> queryWorkflowExecutors(User loginUser, String masterAddress) {
77+
if (!loginUser.getUserType().equals(UserType.ADMIN_USER)) {
78+
throw new ServiceException(Status.NO_CURRENT_OPERATING_PERMISSION);
79+
}
80+
81+
WorkflowExecutorQueryResponse response = Clients
82+
.withService(IWorkflowExecutorQueryClient.class)
83+
.withHost(masterAddress)
84+
.queryWorkflowExecutors(new WorkflowExecutorQueryRequest());
85+
if (!response.isSuccess()) {
86+
throw new ServiceException(response.getMessage());
87+
}
88+
return response.getWorkflowExecutors();
89+
}
90+
91+
@Override
92+
public List<TaskExecutorDTO> queryTaskExecutors(User loginUser, String serverAddress) {
93+
if (!loginUser.getUserType().equals(UserType.ADMIN_USER)) {
94+
throw new ServiceException(Status.NO_CURRENT_OPERATING_PERMISSION);
95+
}
96+
TaskExecutorQueryResponse response = Clients
97+
.withService(ITaskExecutorQueryClient.class)
98+
.withHost(serverAddress)
99+
.queryTaskInstances(new TaskExecutorQueryRequest());
100+
if (!response.isSuccess()) {
101+
throw new ServiceException(response.getMessage());
102+
}
103+
return response.getTaskExecutors();
104+
}
105+
62106
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/serialize/JsonSerializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
2222
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
2323
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
24+
import static com.fasterxml.jackson.databind.SerializationFeature.FAIL_ON_EMPTY_BEANS;
2425
import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
2526

2627
import org.apache.dolphinscheduler.common.constants.SystemConstants;
@@ -46,6 +47,7 @@ public class JsonSerializer {
4647
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
4748
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
4849
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
50+
.configure(FAIL_ON_EMPTY_BEANS, false)
4951
.addModule(new SimpleModule()
5052
.addSerializer(LocalDateTime.class, new JSONUtils.LocalDateTimeSerializer())
5153
.addDeserializer(LocalDateTime.class, new JSONUtils.LocalDateTimeDeserializer()))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.extract.master;
19+
20+
import org.apache.dolphinscheduler.extract.base.RpcMethod;
21+
import org.apache.dolphinscheduler.extract.base.RpcService;
22+
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowExecutorQueryRequest;
23+
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowExecutorQueryResponse;
24+
25+
@RpcService
26+
public interface IWorkflowExecutorQueryClient {
27+
28+
@RpcMethod
29+
WorkflowExecutorQueryResponse queryWorkflowExecutors(WorkflowExecutorQueryRequest request);
30+
31+
}

dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/dto/TaskInstanceExecuteDto.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

0 commit comments

Comments
 (0)