Skip to content

Commit 7a8d910

Browse files
authored
[DSIP-40][APIService] Add LogClient to fetch log (#17165)
1 parent 935117c commit 7a8d910

13 files changed

Lines changed: 697 additions & 156 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.executor.logging;
19+
20+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
21+
import org.apache.dolphinscheduler.extract.base.client.Clients;
22+
import org.apache.dolphinscheduler.extract.common.ILogService;
23+
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
24+
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
25+
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
26+
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
import org.springframework.stereotype.Component;
31+
32+
@Component
33+
@Slf4j
34+
public class LocalLogClient {
35+
36+
/**
37+
* Download the complete log of a task instance.
38+
* This method is used to retrieve all log information from the start to the end of a task instance,
39+
* suitable for scenarios where a complete log record is required.
40+
*
41+
* @param taskInstance The task instance object, containing information needed to retrieve the log.
42+
* @return The complete log file download response of the task instance, including log content and metadata.
43+
*/
44+
public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance) {
45+
return getLocalWholeLog(taskInstance);
46+
}
47+
48+
/**
49+
* Query a portion of the log of a task instance.
50+
* This method is used to query log information of a task instance in a paginated manner,
51+
* suitable for scenarios where the log content is large and needs to be retrieved in batches.
52+
*
53+
* @param taskInstance The task instance object, containing information needed to retrieve the log.
54+
* @param skipLineNum The number of lines to skip, indicating from which line to start reading the log.
55+
* @param limit The maximum number of lines to read, indicating the maximum number of lines to retrieve in this query.
56+
* @return The partial log query response, including log content within the specified range and metadata.
57+
*/
58+
public TaskInstanceLogPageQueryResponse getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) {
59+
return getLocalPartLog(taskInstance, skipLineNum, limit);
60+
}
61+
62+
private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance) {
63+
TaskInstanceLogFileDownloadRequest request = new TaskInstanceLogFileDownloadRequest(
64+
taskInstance.getId(),
65+
taskInstance.getLogPath());
66+
return getProxyLogService(taskInstance).getTaskInstanceWholeLogFileBytes(request);
67+
}
68+
69+
private TaskInstanceLogPageQueryResponse getLocalPartLog(TaskInstance taskInstance, int skipLineNum,
70+
int limit) {
71+
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest
72+
.builder()
73+
.taskInstanceId(taskInstance.getId())
74+
.taskInstanceLogAbsolutePath(taskInstance.getLogPath())
75+
.skipLineNum(skipLineNum)
76+
.limit(limit)
77+
.build();
78+
return getProxyLogService(taskInstance).pageQueryTaskInstanceLog(request);
79+
}
80+
81+
private ILogService getProxyLogService(TaskInstance taskInstance) {
82+
ILogService logService = Clients
83+
.withService(ILogService.class)
84+
.withHost(taskInstance.getHost());
85+
log.debug("Created log service for host: {}", taskInstance.getHost());
86+
return logService;
87+
}
88+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.executor.logging;
19+
20+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
21+
import org.apache.dolphinscheduler.extract.common.transportor.LogResponseStatus;
22+
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
23+
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
24+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
25+
import org.apache.dolphinscheduler.registry.api.RegistryClient;
26+
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.stereotype.Component;
32+
33+
@Slf4j
34+
@Component
35+
public class LogClientDelegate {
36+
37+
@Autowired
38+
private LocalLogClient localLogClient;
39+
@Autowired
40+
private RemoteLogClient remoteLogClient;
41+
@Autowired
42+
private RegistryClient registryClient;
43+
44+
/**
45+
* Retrieves a portion of the log string for a given task instance.
46+
* This method first attempts to fetch the log from local storage; if unsuccessful, it tries to obtain the log from remote storage.
47+
*
48+
* @param taskInstance The task instance object, containing information needed for log retrieval.
49+
* @param skipLineNum The number of log lines to skip from the beginning.
50+
* @param limit The maximum number of log lines to retrieve.
51+
* @return A string containing the specified portion of the log.
52+
*/
53+
public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit) {
54+
checkArgs(taskInstance);
55+
if (checkNodeExists(taskInstance)) {
56+
TaskInstanceLogPageQueryResponse response = localLogClient.getPartLog(taskInstance, skipLineNum, limit);
57+
if (response.getCode() == LogResponseStatus.SUCCESS) {
58+
return response.getLogContent();
59+
} else {
60+
log.warn("get part log string is not success for task instance {}; reason :{}",
61+
taskInstance.getId(), response.getMessage());
62+
return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit);
63+
}
64+
} else {
65+
return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit);
66+
}
67+
}
68+
69+
/**
70+
* Retrieves the complete log content for a given task instance as a byte array.
71+
* This method first attempts to fetch the log from local storage; if unsuccessful, it tries to obtain the log from remote storage.
72+
*
73+
* @param taskInstance The task instance object, containing information needed for log retrieval.
74+
* @return A byte array containing the complete log content.
75+
*/
76+
public byte[] getWholeLogBytes(TaskInstance taskInstance) {
77+
checkArgs(taskInstance);
78+
if (checkNodeExists(taskInstance)) {
79+
TaskInstanceLogFileDownloadResponse response = localLogClient.getWholeLog(taskInstance);
80+
if (response.getCode() == LogResponseStatus.SUCCESS) {
81+
return response.getLogBytes();
82+
} else {
83+
log.warn("get whole log bytes is not success for task instance {}; reason :{}", taskInstance.getId(),
84+
response.getMessage());
85+
return remoteLogClient.getWholeLog(taskInstance);
86+
}
87+
} else {
88+
return remoteLogClient.getWholeLog(taskInstance);
89+
}
90+
}
91+
92+
private static void checkArgs(TaskInstance taskInstance) {
93+
if (taskInstance == null) {
94+
throw new IllegalArgumentException("canFetchLog task instance is null");
95+
}
96+
}
97+
98+
private boolean checkNodeExists(TaskInstance taskInstance) {
99+
RegistryNodeType nodeType;
100+
if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) {
101+
nodeType = RegistryNodeType.MASTER;
102+
} else {
103+
nodeType = RegistryNodeType.WORKER;
104+
}
105+
boolean exists = registryClient.checkNodeExists(taskInstance.getHost(), nodeType);
106+
if (!exists) {
107+
log.warn("Node {} does not exist for task instance {}", taskInstance.getHost(), taskInstance.getId());
108+
}
109+
return exists;
110+
}
111+
112+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.executor.logging;
19+
20+
import org.apache.dolphinscheduler.common.utils.LogUtils;
21+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
22+
23+
import org.springframework.stereotype.Component;
24+
25+
@Component
26+
public class RemoteLogClient {
27+
28+
/**
29+
* Retrieves the entire log content for a given task instance.
30+
* This method is used when it is necessary to obtain all the log information for a task instance.
31+
*
32+
* @param taskInstance The task instance object, containing information such as the task ID and log path.
33+
* @return Returns the log content in byte array format.
34+
*/
35+
public byte[] getWholeLog(TaskInstance taskInstance) {
36+
return LogUtils.getFileContentBytesFromRemote(taskInstance.getLogPath());
37+
}
38+
39+
/**
40+
* Retrieves part of the log content for a given task instance, based on the specified line number and the number of lines to read.
41+
* This method is used when it is necessary to browse a portion of the log content, allowing for skipping a certain number of lines and limiting the number of lines read.
42+
*
43+
* @param taskInstance The task instance object, containing information such as the task ID and log path.
44+
* @param skipLineNum The number of lines to skip, starting from the beginning of the log.
45+
* @param limit The maximum number of lines to read.
46+
* @return Returns the specified part of the log content in string format.
47+
*/
48+
public String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) {
49+
// todo We can optimize requests by the actual range, reducing disk usage and network traffic.
50+
return LogUtils.rollViewLogLines(
51+
LogUtils.readPartFileContentFromRemote(taskInstance.getLogPath(), skipLineNum, limit));
52+
}
53+
54+
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.dolphinscheduler.api.enums.Status;
2424
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
25+
import org.apache.dolphinscheduler.api.executor.logging.LogClientDelegate;
2526
import org.apache.dolphinscheduler.api.service.LoggerService;
2627
import org.apache.dolphinscheduler.api.service.ProjectService;
2728
import org.apache.dolphinscheduler.api.utils.Result;
@@ -34,12 +35,6 @@
3435
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
3536
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
3637
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
37-
import org.apache.dolphinscheduler.extract.base.client.Clients;
38-
import org.apache.dolphinscheduler.extract.common.ILogService;
39-
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
40-
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
41-
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
42-
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
4338

4439
import org.apache.commons.lang3.StringUtils;
4540

@@ -73,6 +68,9 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
7368
@Autowired
7469
private TaskDefinitionMapper taskDefinitionMapper;
7570

71+
@Autowired
72+
private LogClientDelegate logClientDelegate;
73+
7674
/**
7775
* view log
7876
*
@@ -203,17 +201,7 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
203201
}
204202

205203
try {
206-
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest.builder()
207-
.taskInstanceId(taskInstance.getId())
208-
.taskInstanceLogAbsolutePath(logPath)
209-
.skipLineNum(skipLineNum)
210-
.limit(limit)
211-
.build();
212-
final TaskInstanceLogPageQueryResponse response = Clients
213-
.withService(ILogService.class)
214-
.withHost(taskInstance.getHost())
215-
.pageQueryTaskInstanceLog(request);
216-
String logContent = response.getLogContent();
204+
String logContent = logClientDelegate.getPartLogString(taskInstance, skipLineNum, limit);
217205
if (logContent != null) {
218206
sb.append(logContent);
219207
}
@@ -241,14 +229,7 @@ private byte[] getLogBytes(TaskInstance taskInstance) {
241229
byte[] logBytes;
242230

243231
try {
244-
final TaskInstanceLogFileDownloadRequest request = new TaskInstanceLogFileDownloadRequest(
245-
taskInstance.getId(),
246-
logPath);
247-
final TaskInstanceLogFileDownloadResponse response = Clients
248-
.withService(ILogService.class)
249-
.withHost(taskInstance.getHost())
250-
.getTaskInstanceWholeLogFileBytes(request);
251-
logBytes = response.getLogBytes();
232+
logBytes = logClientDelegate.getWholeLogBytes(taskInstance);
252233
return Bytes.concat(head, logBytes);
253234
} catch (Exception ex) {
254235
log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex);

0 commit comments

Comments
 (0)