6 changes: 6 additions & 0 deletions docs/operations/metrics.md
@@ -317,6 +317,12 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`supervisorId`, `dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
|`ingest/segmentUpgrade/count`|Number of pending segments that a concurrent replace (for example, compaction) upgraded to a new version and registered with the supervisor so that running tasks announce them under the new version. Emitted by the replace task only when streaming ingestion is running concurrently with replace on the same interval.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0 unless [concurrent append and replace](../ingestion/concurrent-append-replace.md) is in use.|
|`ingest/segmentUpgrade/notified`|Number of upgraded pending segments the supervisor routed to at least one running task. Delivery failures are reported separately by `ingest/segmentUpgrade/sendFailed`. When an active streaming supervisor exists for the datasource, `ingest/segmentUpgrade/count` should equal `notified` + `unmatched` over the same period and `dataSource`.|`supervisorId`, `dataSource`, `stream`, `tags`|Equal to `count` in a healthy cluster.|
|`ingest/segmentUpgrade/unmatched`|Number of upgraded pending segments the supervisor could not route to any running task. These are not announced under the new version until handoff, so the corresponding data may be briefly missing from queries.|`supervisorId`, `dataSource`, `stream`, `tags`|0. A non-zero value indicates a lost upgrade announcement.|
|`ingest/segmentUpgrade/sendFailed`|Number of upgrade requests that matched a running task but failed to reach it over the wire after retries.|`supervisorId`, `dataSource`, `stream`, `taskId`, `tags`|0|
|`ingest/segmentUpgrade/announced`|Number of upgraded segments a task announced under the new version. Emitted once per task, so it scales with the replica count. Do not subtract it directly from `ingest/segmentUpgrade/count`, which is per-segment rather than per-replica.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0 while concurrent replace occurs.|
|`ingest/segmentUpgrade/skipped`|Number of upgrade requests a task received but did not announce. The `reason` dimension is one of `unknownBase` (the request reached the wrong task), `noSink` (the base sink is no longer present), or `dropping` (the base sink is handing off, which is benign and covered by the durable publish path).|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0, excluding `reason=dropping`.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of the auto scaler.|`supervisorId`, `dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the scale action.|`supervisorId`, `dataSource`, `stream`, `tags`|Depends on auto scaler config.|
|`task/autoScaler/costBased/optimalTaskCount`|Optimal task count computed by the cost-based auto scaler.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler config.|
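
The relationship between `count`, `notified`, and `unmatched` described in the table can be checked with simple arithmetic. The following is a minimal sketch, not part of Druid: the class and method names are hypothetical, and it assumes the three values have already been summed over the same period and `dataSource` by whatever metrics store is in use.

```java
/** Hypothetical helper (not part of Druid) for checking supervisor-side accounting of segment upgrades. */
final class SegmentUpgradeAccounting
{
  private SegmentUpgradeAccounting()
  {
  }

  /**
   * Returns count - (notified + unmatched) for one dataSource and one period.
   * Zero means every upgrade produced by a replace commit was either routed or reported as unmatched.
   * A positive value means some upgrades were produced but no supervisor reported on them,
   * for example because no active streaming supervisor existed for the datasource.
   */
  static long unroutedUpgrades(long count, long notified, long unmatched)
  {
    return count - (notified + unmatched);
  }
}
```

For example, `unroutedUpgrades(10, 8, 2)` is 0, while `unroutedUpgrades(5, 0, 0)` is 5 and suggests checking whether a supervisor was running at the time.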
@@ -105,6 +105,29 @@
"ingest/events/messageGap": [
"dataSource"
],
"ingest/segmentUpgrade/count": [
"dataSource"
],
"ingest/segmentUpgrade/notified": [
"dataSource",
"stream"
],
"ingest/segmentUpgrade/unmatched": [
"dataSource",
"stream"
],
"ingest/segmentUpgrade/sendFailed": [
"dataSource",
"stream",
"taskId"
],
"ingest/segmentUpgrade/announced": [
"dataSource"
],
"ingest/segmentUpgrade/skipped": [
"dataSource",
"reason"
],
"ingest/kafka/lag": [
"dataSource",
"stream"
@@ -134,6 +134,12 @@
"ingest/notices/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to process a notice by the supervisor." },
"ingest/pause/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent by a task in a paused state without ingesting." },
"ingest/handoff/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Total number of seconds taken to handoff a set of segments." },
"ingest/segmentUpgrade/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of pending segments upgraded by a concurrent replace." },
"ingest/segmentUpgrade/notified" : { "dimensions" : ["dataSource", "stream"], "type" : "count", "help": "Number of upgraded pending segments the supervisor routed to a running task." },
"ingest/segmentUpgrade/unmatched" : { "dimensions" : ["dataSource", "stream"], "type" : "count", "help": "Number of upgraded pending segments that matched no running task." },
"ingest/segmentUpgrade/sendFailed" : { "dimensions" : ["dataSource", "stream", "taskId"], "type" : "count", "help": "Number of upgrade requests that failed to reach a task." },
"ingest/segmentUpgrade/announced" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of upgraded segments a task announced under the new version." },
"ingest/segmentUpgrade/skipped" : { "dimensions" : ["dataSource", "reason"], "type" : "count", "help": "Number of upgrade requests a task received but did not announce." },
"task/autoScaler/requiredCount" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Count of required tasks based on the calculations of lagBased auto scaler." },

"task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to run a task."},
@@ -57,6 +57,13 @@
"ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/rows/published": { "dimensions" : ["dataSource"], "type" : "count" },

"ingest/segmentUpgrade/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/segmentUpgrade/notified" : { "dimensions" : ["dataSource", "stream"], "type" : "count" },
"ingest/segmentUpgrade/unmatched" : { "dimensions" : ["dataSource", "stream"], "type" : "count" },
"ingest/segmentUpgrade/sendFailed" : { "dimensions" : ["dataSource", "stream", "taskId"], "type" : "count" },
"ingest/segmentUpgrade/announced" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/segmentUpgrade/skipped" : { "dimensions" : ["dataSource", "reason"], "type" : "count" },

"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
"ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common;

/**
* Metric names and dimension values for the re-announcement of pending segments upgraded by a concurrent REPLACE.
* <ul>
* <li>the task action emits {@link #COUNT} (how many upgrades a commit produced),</li>
* <li>the supervisor emits {@link #NOTIFIED}, {@link #UNMATCHED} and {@link #SEND_FAILED} as it fans requests out,</li>
* <li>the streaming task emits {@link #ANNOUNCED} and {@link #SKIPPED} (with a {@code reason}) as it applies them.</li>
* </ul>
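* Comparing {@link #COUNT} (created) against {@link #ANNOUNCED} (applied) per dataSource indicates the visibility gap.
* {@link #ANNOUNCED} is emitted once per task and therefore scales with the replica count, so account for replicas
* before comparing the two. {@link #UNMATCHED}, {@link #SEND_FAILED} and {@link #SKIPPED} attribute where a lost
* upgrade dropped out.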
*/
public class SegmentUpgradeMetrics
{
/** Number of upgraded pending segments a REPLACE commit created and handed to the supervisor. Task-action dims. */
public static final String COUNT = "ingest/segmentUpgrade/count";

/** A record was sent to at least one running task; failed deliveries are counted by {@link #SEND_FAILED}. Supervisor dims. */
public static final String NOTIFIED = "ingest/segmentUpgrade/notified";

/** A record matched no running task and will not be re-announced until handoff. Supervisor dims. */
public static final String UNMATCHED = "ingest/segmentUpgrade/unmatched";

/** An upgrade request failed to reach a task over the wire. Supervisor dims plus {@code taskId}. */
public static final String SEND_FAILED = "ingest/segmentUpgrade/sendFailed";

/** A task announced an upgraded segment under the new version. Task dims. */
public static final String ANNOUNCED = "ingest/segmentUpgrade/announced";

/** A task received an upgrade request but did not announce it; see the {@code reason} dimension. Task dims. */
public static final String SKIPPED = "ingest/segmentUpgrade/skipped";

// Values for the DruidMetrics.REASON dimension on SKIPPED.

/** The task holds no pending segment matching upgradedFromSegmentId (request targeted the wrong task). */
public static final String REASON_UNKNOWN_BASE = "unknownBase";
/** The base sink is gone even though this task once held it. */
public static final String REASON_NO_SINK = "noSink";
/** The base sink is being dropped (handoff in progress); the durable path re-announces at the new version. */
public static final String REASON_DROPPING = "dropping";

private SegmentUpgradeMetrics()
{
}
}
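
The Javadoc above assigns each metric to the component that emits it. As a rough illustration, the sketch below encodes that mapping as a lookup table. `UpgradeMetricStages` and its `Stage` enum are hypothetical names introduced only for this example; the metric name strings are copied from the constants in `SegmentUpgradeMetrics`.

```java
import java.util.Map;

/** Hypothetical lookup (not part of Druid) from metric name to the component that emits it. */
final class UpgradeMetricStages
{
  enum Stage { TASK_ACTION, SUPERVISOR, STREAMING_TASK }

  // Mirrors the bullet list in the SegmentUpgradeMetrics Javadoc.
  private static final Map<String, Stage> STAGE_BY_METRIC = Map.of(
      "ingest/segmentUpgrade/count", Stage.TASK_ACTION,
      "ingest/segmentUpgrade/notified", Stage.SUPERVISOR,
      "ingest/segmentUpgrade/unmatched", Stage.SUPERVISOR,
      "ingest/segmentUpgrade/sendFailed", Stage.SUPERVISOR,
      "ingest/segmentUpgrade/announced", Stage.STREAMING_TASK,
      "ingest/segmentUpgrade/skipped", Stage.STREAMING_TASK
  );

  private UpgradeMetricStages()
  {
  }

  /** Returns the emitting stage, or throws if the name is not one of the six segment upgrade metrics. */
  static Stage stageOf(String metricName)
  {
    final Stage stage = STAGE_BY_METRIC.get(metricName);
    if (stage == null) {
      throw new IllegalArgumentException("Not a segment upgrade metric: " + metricName);
    }
    return stage;
  }
}
```

A lookup like this can help when triaging a lost upgrade: a non-zero supervisor-stage metric points at routing or delivery, while a non-zero `skipped` points at the task itself.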
@@ -24,12 +24,14 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.SegmentUpgradeMetrics;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
@@ -169,14 +171,33 @@ private void registerUpgradedPendingSegmentsOnSupervisor(
List<PendingSegmentRecord> upgradedPendingSegments
)
{
// Emit the count of upgrades this commit produced regardless of whether a supervisor exists to
// receive them, so it can be compared against the count actually announced by tasks.
if (!upgradedPendingSegments.isEmpty()) {
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
toolbox.getEmitter().emit(metricBuilder.setMetric(SegmentUpgradeMetrics.COUNT, upgradedPendingSegments.size()));
}

final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());

if (!activeSupervisorIdWithAppendLock.isPresent()) {
log.info("No active streaming supervisor for datasource[%s]; the [%d] upgraded pending segment(s) from task[%s]"
+ " will become queryable when their tasks hand off.",
task.getDataSource(),
upgradedPendingSegments.size(),
task.getId()
);
return;
}

log.info("Registering [%d] upgraded pending segments created by task[%s] on supervisor[%s]",
upgradedPendingSegments.size(),
task.getId(),
activeSupervisorIdWithAppendLock.get()
);
upgradedPendingSegments.forEach(
upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
@@ -58,6 +58,7 @@
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentUpgradeMetrics;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -80,6 +81,7 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -1846,7 +1848,9 @@ public Response registerUpgradedPendingSegment(
{
authorizationCheck(req);
try {
((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment);
final StreamAppenderator.UpgradeAnnouncementOutcome outcome =
((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment);
emitUpgradeAnnouncementMetric(outcome);
return Response.ok().build();
}
catch (DruidException e) {
@@ -1865,6 +1869,33 @@ public Response registerUpgradedPendingSegment(
}
}

private void emitUpgradeAnnouncementMetric(StreamAppenderator.UpgradeAnnouncementOutcome outcome)
{
if (outcome == StreamAppenderator.UpgradeAnnouncementOutcome.ANNOUNCED) {
task.emitMetric(toolbox.getEmitter(), SegmentUpgradeMetrics.ANNOUNCED, 1);
return;
}
final String reason;
switch (outcome) {
case SKIPPED_UNKNOWN_BASE:
reason = SegmentUpgradeMetrics.REASON_UNKNOWN_BASE;
break;
case SKIPPED_NO_SINK:
reason = SegmentUpgradeMetrics.REASON_NO_SINK;
break;
case SKIPPED_DROPPING:
reason = SegmentUpgradeMetrics.REASON_DROPPING;
break;
default:
return;
}
toolbox.getEmitter().emit(
task.getMetricBuilder()
.setDimension(DruidMetrics.REASON, reason)
.setMetric(SegmentUpgradeMetrics.SKIPPED, 1)
);
}

public Map<String, Object> doGetRowStats()
{
Map<String, Object> returnMap = new HashMap<>();
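
The outcome-to-reason mapping in `emitUpgradeAnnouncementMetric` can also be expressed as a table lookup. The sketch below is an alternative formulation for illustration only, not the code in this change: the `Outcome` enum is a local stand-in for `StreamAppenderator.UpgradeAnnouncementOutcome` (whose definition is not shown in this diff), and `needsAttention` is a hypothetical helper that encodes the documented guidance that `reason=dropping` is benign.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch (not part of Druid) of mapping an announcement outcome to a skip reason. */
final class SkipReasonSketch
{
  /** Local stand-in for the outcome values referenced in the diff. */
  enum Outcome { ANNOUNCED, SKIPPED_UNKNOWN_BASE, SKIPPED_NO_SINK, SKIPPED_DROPPING }

  private static final Map<Outcome, String> REASONS = new EnumMap<>(Outcome.class);

  static {
    REASONS.put(Outcome.SKIPPED_UNKNOWN_BASE, "unknownBase");
    REASONS.put(Outcome.SKIPPED_NO_SINK, "noSink");
    REASONS.put(Outcome.SKIPPED_DROPPING, "dropping");
  }

  private SkipReasonSketch()
  {
  }

  /** Returns the value for the reason dimension, or empty when the segment was announced. */
  static Optional<String> reasonFor(Outcome outcome)
  {
    return Optional.ofNullable(REASONS.get(outcome));
  }

  /** True for skips other than "dropping", which the docs describe as benign. */
  static boolean needsAttention(Outcome outcome)
  {
    return reasonFor(outcome).filter(reason -> !reason.equals("dropping")).isPresent();
  }
}
```

The switch in the actual change and this lookup are equivalent for the four outcomes shown; the table form makes it harder to add an outcome without deciding on its reason.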
@@ -32,6 +32,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -48,6 +49,7 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentUpgradeMetrics;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -1419,22 +1421,89 @@ public void registerNewVersionOfPendingSegment(
PendingSegmentRecord pendingSegmentRecord
)
{
final String taskAllocatorId = pendingSegmentRecord.getTaskAllocatorId();
int matchedGroups = 0;
int notifiedTasks = 0;

for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) {
if (taskGroup.baseSequenceName.equals(taskAllocatorId)) {
matchedGroups++;
for (String taskId : taskGroup.taskIds()) {
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord);
notifyTaskOfUpgradedPendingSegment(taskId, pendingSegmentRecord);
notifiedTasks++;
}
}
}
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) {
if (taskGroup.baseSequenceName.equals(taskAllocatorId)) {
matchedGroups++;
for (String taskId : taskGroup.taskIds()) {
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord);
notifyTaskOfUpgradedPendingSegment(taskId, pendingSegmentRecord);
notifiedTasks++;
}
}
}
}

if (notifiedTasks == 0) {
// No running task matched, so the upgraded segment is not re-announced until handoff. Until then its data may
// be silently missing from queries.
log.warn(
"Upgraded pending segment[%s] (upgradedFrom[%s], taskAllocatorId[%s]) matched no running task on"
+ " supervisor[%s]; it will not be re-announced until handoff. Currently tracking [%d] activelyReading"
+ " and [%d] pendingCompletion task group(s).",
pendingSegmentRecord.getId(),
pendingSegmentRecord.getUpgradedFromSegmentId(),
taskAllocatorId,
supervisorId,
activelyReadingTaskGroups.size(),
pendingCompletionTaskGroups.size()
);
emitter.emit(getMetricBuilder().setMetric(SegmentUpgradeMetrics.UNMATCHED, 1));
} else {
log.info(
"Notified [%d] task(s) across [%d] task group(s) of upgraded pending segment[%s] (upgradedFrom[%s]).",
notifiedTasks,
matchedGroups,
pendingSegmentRecord.getId(),
pendingSegmentRecord.getUpgradedFromSegmentId()
);
emitter.emit(getMetricBuilder().setMetric(SegmentUpgradeMetrics.NOTIFIED, 1));
}
}

/**
* Sends an upgraded pending segment to a single task and records a metric if the request fails to reach the task.
*/
private void notifyTaskOfUpgradedPendingSegment(String taskId, PendingSegmentRecord pendingSegmentRecord)
{
Futures.addCallback(
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord),
new FutureCallback<>()
{
@Override
public void onSuccess(Boolean result)
{
// The notified metric is emitted once per record when the request is dispatched; nothing more to record here.
}

@Override
public void onFailure(Throwable t)
{
log.warn(
t,
"Failed to send upgraded pending segment[%s] to task[%s] on supervisor[%s].",
pendingSegmentRecord.getId(), taskId, supervisorId
);
emitter.emit(
getMetricBuilder().setDimension(DruidMetrics.TASK_ID, taskId)
.setMetric(SegmentUpgradeMetrics.SEND_FAILED, 1)
);
}
},
MoreExecutors.directExecutor()
);
}

public ReentrantLock getRecordSupplierLock()
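
The fan-out in `registerNewVersionOfPendingSegment` and the failure callback in `notifyTaskOfUpgradedPendingSegment` can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the supervisor code: it uses the JDK's `CompletableFuture` in place of Guava's `ListenableFuture`, a plain map keyed by base sequence name in place of the two task group collections, and a list of strings in place of the metric emitter. The class name, the `sender` parameter, and the `metric:taskId` string format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical stand-in (not part of Druid) for the supervisor's upgrade fan-out. */
final class UpgradeFanOutSketch
{
  /** Records emitted metric names in order; stands in for a metric emitter. */
  final List<String> emitted = Collections.synchronizedList(new ArrayList<>());

  /**
   * Initiates a send to every task in the group whose base sequence name equals the allocator id,
   * then emits exactly one of notified or unmatched for the record.
   * Returns the number of tasks a send was initiated for.
   */
  int fanOut(
      String taskAllocatorId,
      Map<String, List<String>> taskIdsByBaseSequenceName,
      Function<String, CompletableFuture<Boolean>> sender
  )
  {
    int notifiedTasks = 0;
    for (Map.Entry<String, List<String>> group : taskIdsByBaseSequenceName.entrySet()) {
      if (!group.getKey().equals(taskAllocatorId)) {
        continue;
      }
      for (String taskId : group.getValue()) {
        // A failed send is recorded per task; a successful one needs no extra metric.
        sender.apply(taskId).whenComplete((ok, error) -> {
          if (error != null) {
            emitted.add("ingest/segmentUpgrade/sendFailed:" + taskId);
          }
        });
        notifiedTasks++;
      }
    }
    emitted.add(notifiedTasks == 0 ? "ingest/segmentUpgrade/unmatched" : "ingest/segmentUpgrade/notified");
    return notifiedTasks;
  }
}
```

Note that in this sketch, as in the change, `notified` is decided by how many sends were initiated rather than how many succeeded, so a record can be counted as `notified` and still produce `sendFailed` events.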