Skip to content

Commit c54160f

Browse files
authored
fix(apps): route app logs through streamable log storage when enabled (#27383)
1 parent 08e52b9 commit c54160f

1 file changed

Lines changed: 60 additions & 6 deletions

File tree

  • openmetadata-service/src/main/java/org/openmetadata/service/resources/apps

openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.UUID;
4949
import java.util.concurrent.ExecutorService;
5050
import java.util.concurrent.TimeUnit;
51+
import java.util.stream.Collectors;
5152
import lombok.extern.slf4j.Slf4j;
5253
import org.openmetadata.schema.ServiceEntityInterface;
5354
import org.openmetadata.schema.api.data.RestoreEntity;
@@ -542,7 +543,13 @@ public Response getLastLogs(
542543
+ "If not provided, returns logs for the latest run.",
543544
schema = @Schema(type = "string"))
544545
@QueryParam("runId")
545-
String runId) {
546+
String runId,
547+
@Parameter(
548+
description = "Maximum number of lines to return (only applies to streamable logs)",
549+
schema = @Schema(type = "integer"))
550+
@QueryParam("limit")
551+
@DefaultValue("1000")
552+
int limit) {
546553
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
547554
if (installation.getAppType().equals(AppType.Internal)) {
548555
AppRunRecord latestRun = repository.getLatestAppRunsOptional(installation).orElse(null);
@@ -557,16 +564,63 @@ public Response getLastLogs(
557564
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
558565
IngestionPipeline ingestionPipeline =
559566
ingestionPipelineRepository.get(
560-
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
561-
return Response.ok(
562-
pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId),
563-
MediaType.APPLICATION_JSON_TYPE)
564-
.build();
567+
uriInfo,
568+
pipelineRef.getId(),
569+
ingestionPipelineRepository.getFields(
570+
FIELD_OWNERS + ",pipelineStatuses,ingestionRunner"));
571+
Map<String, String> lastLogs =
572+
getAppLastLogs(ingestionPipelineRepository, ingestionPipeline, after, runId, limit);
573+
return Response.ok(lastLogs, MediaType.APPLICATION_JSON_TYPE).build();
565574
}
566575
}
567576
throw new BadRequestException("Failed to Get Logs for the Installation.");
568577
}
569578

579+
private Map<String, String> getAppLastLogs(
580+
IngestionPipelineRepository ingestionPipelineRepository,
581+
IngestionPipeline ingestionPipeline,
582+
String after,
583+
String runId,
584+
int limit) {
585+
boolean useStreamableLogs =
586+
Boolean.TRUE.equals(ingestionPipeline.getEnableStreamableLogs())
587+
|| (ingestionPipeline.getIngestionRunner() != null
588+
&& ingestionPipelineRepository.isIngestionRunnerStreamableLogsEnabled(
589+
ingestionPipeline.getIngestionRunner()));
590+
if (useStreamableLogs) {
591+
String effectiveRunId =
592+
!nullOrEmpty(runId)
593+
? runId
594+
: (ingestionPipeline.getPipelineStatuses() != null
595+
? ingestionPipeline.getPipelineStatuses().getRunId()
596+
: null);
597+
if (!nullOrEmpty(effectiveRunId)) {
598+
UUID runUuid;
599+
try {
600+
runUuid = UUID.fromString(effectiveRunId);
601+
} catch (IllegalArgumentException e) {
602+
throw new BadRequestException("Invalid runId format: " + effectiveRunId);
603+
}
604+
Map<String, Object> raw =
605+
ingestionPipelineRepository.getLogs(
606+
ingestionPipeline.getFullyQualifiedName(), runUuid, after, limit);
607+
Map<String, String> lastLogs =
608+
raw.entrySet().stream()
609+
.filter(e -> e.getValue() != null)
610+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
611+
Object logs = lastLogs.remove("logs");
612+
if (logs != null) {
613+
lastLogs.put(
614+
PipelineServiceClientInterface.TYPE_TO_TASK.get(
615+
ingestionPipeline.getPipelineType().toString()),
616+
logs.toString());
617+
}
618+
return lastLogs;
619+
}
620+
}
621+
return pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId);
622+
}
623+
570624
@GET
571625
@Path("/name/{name}/runs/latest")
572626
@Operation(

0 commit comments

Comments
 (0)