diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index f4757760b719..506cb0c59eb4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -317,6 +317,12 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`supervisorId`, `dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| +|`ingest/segmentUpgrade/count`|Number of pending segments that a concurrent replace (for example, compaction) upgraded to a new version and asked the supervisor to have running tasks announce under the new version. Emitted by the replace task only when streaming ingestion is running concurrently with replace on the same interval.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0 unless [concurrent append and replace](../ingestion/concurrent-append-replace.md) is in use.| +|`ingest/segmentUpgrade/notified`|Number of upgraded pending segments the supervisor successfully routed to at least one running task. Compare with `ingest/segmentUpgrade/count`: `count` should equal `notified` + `unmatched` over the same period and `dataSource`.|`supervisorId`, `dataSource`, `stream`, `tags`|Equal to `count` in a healthy cluster.| +|`ingest/segmentUpgrade/unmatched`|Number of upgraded pending segments the supervisor could not route to any running task. These are not announced under the new version until handoff, so the corresponding data may be briefly missing from queries.|`supervisorId`, `dataSource`, `stream`, `tags`|0. A non-zero value indicates a lost upgrade announcement.| +|`ingest/segmentUpgrade/sendFailed`|Number of upgrade requests that matched a running task but failed to reach it over the wire after retries.|`supervisorId`, `dataSource`, `stream`, `taskId`, `tags`|0| +|`ingest/segmentUpgrade/announced`|Number of upgraded segments a task announced under the new version. Emitted once per task, so it scales with the replica count. Do not subtract it directly from `ingest/segmentUpgrade/count`, which is per-segment rather than per-replica.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0 while concurrent replace occurs.| +|`ingest/segmentUpgrade/skipped`|Number of upgrade requests a task received but did not announce. The `reason` dimension is one of `unknownBase` (the request reached the wrong task), `noSink` (the base sink is no longer present), or `dropping` (the base sink is handing off, which is benign and covered by the durable publish path).|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0, excluding `reason=dropping`.| |`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of the auto scaler.|`supervisorId`, `dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.| |`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the scale action.|`supervisorId`, `dataSource`, `stream`, `tags`|Depends on auto scaler config.| |`task/autoScaler/costBased/optimalTaskCount`|Optimal task count computed by the cost-based auto scaler.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler config.| diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json index f38f115348f8..d1adcca8b37c 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json @@ -105,6 +105,29 @@ "ingest/events/messageGap": [ "dataSource" ], + "ingest/segmentUpgrade/count": [ + "dataSource" + ], + "ingest/segmentUpgrade/notified": [ + "dataSource", + "stream" + ], + "ingest/segmentUpgrade/unmatched": [ + "dataSource", + "stream" + ], + "ingest/segmentUpgrade/sendFailed": [ + "dataSource", + "stream", + "taskId" + ], + "ingest/segmentUpgrade/announced": [ + "dataSource" + ], + "ingest/segmentUpgrade/skipped": [ + "dataSource", + "reason" + ], "ingest/kafka/lag": [ "dataSource", "stream" diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json index e582958e7683..37a7fa1f420e 100644 --- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json @@ -134,6 +134,12 @@ "ingest/notices/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to process a notice by the supervisor." }, "ingest/pause/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent by a task in a paused state without ingesting." }, "ingest/handoff/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Total number of seconds taken to handoff a set of segments." }, + "ingest/segmentUpgrade/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of pending segments upgraded by a concurrent replace." }, + "ingest/segmentUpgrade/notified" : { "dimensions" : ["dataSource", "stream"], "type" : "count", "help": "Number of upgraded pending segments the supervisor routed to a running task." }, + "ingest/segmentUpgrade/unmatched" : { "dimensions" : ["dataSource", "stream"], "type" : "count", "help": "Number of upgraded pending segments that matched no running task." }, + "ingest/segmentUpgrade/sendFailed" : { "dimensions" : ["dataSource", "stream", "taskId"], "type" : "count", "help": "Number of upgrade requests that failed to reach a task." }, + "ingest/segmentUpgrade/announced" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of upgraded segments a task announced under the new version." }, + "ingest/segmentUpgrade/skipped" : { "dimensions" : ["dataSource", "reason"], "type" : "count", "help": "Number of upgrade requests a task received but did not announce." }, "task/autoScaler/requiredCount" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Count of required tasks based on the calculations of lagBased auto scaler." }, "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to run a task."}, diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 9eae1dd5a62e..e0ac2eeb6d72 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -57,6 +57,13 @@ "ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "ingest/rows/published": { "dimensions" : ["dataSource"], "type" : "count" }, + "ingest/segmentUpgrade/count" : { "dimensions" : ["dataSource"], "type" : "count" }, + "ingest/segmentUpgrade/notified" : { "dimensions" : ["dataSource", "stream"], "type" : "count" }, + "ingest/segmentUpgrade/unmatched" : { "dimensions" : ["dataSource", "stream"], "type" : "count" }, + "ingest/segmentUpgrade/sendFailed" : { "dimensions" : ["dataSource", "stream", "taskId"], "type" : "count" }, + "ingest/segmentUpgrade/announced" : { "dimensions" : ["dataSource"], "type" : "count" }, + "ingest/segmentUpgrade/skipped" : { "dimensions" : ["dataSource", "reason"], "type" : "count" }, + "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" }, "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" }, "ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" }, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentUpgradeMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentUpgradeMetrics.java new file mode 100644 index 000000000000..695a874c0387 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentUpgradeMetrics.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common; + +/** + * Metric names and dimension values for the re-announcement of pending segments upgraded by a concurrent REPLACE. + * + * Comparing {@link #COUNT} (created) against {@link #ANNOUNCED} (applied) per dataSource quantifies the visibility gap; + * {@link #UNMATCHED}, {@link #SEND_FAILED} and {@link #SKIPPED} attribute where a lost upgrade dropped out. + */ +public class SegmentUpgradeMetrics +{ + /** Number of upgraded pending segments a REPLACE commit created and handed to the supervisor. Task-action dims. */ + public static final String COUNT = "ingest/segmentUpgrade/count"; + + /** A record was delivered to at least one running task. Supervisor dims. */ + public static final String NOTIFIED = "ingest/segmentUpgrade/notified"; + + /** A record matched no running task and will not be re-announced until handoff. Supervisor dims. */ + public static final String UNMATCHED = "ingest/segmentUpgrade/unmatched"; + + /** An upgrade request failed to reach a task over the wire. Supervisor dims plus {@code taskId}. */ + public static final String SEND_FAILED = "ingest/segmentUpgrade/sendFailed"; + + /** A task announced an upgraded segment under the new version. Task dims. */ + public static final String ANNOUNCED = "ingest/segmentUpgrade/announced"; + + /** A task received an upgrade request but did not announce it; see the {@code reason} dimension. Task dims. */ + public static final String SKIPPED = "ingest/segmentUpgrade/skipped"; + + // Values for the DruidMetrics.REASON dimension on SKIPPED. + + /** The task holds no pending segment matching upgradedFromSegmentId (request targeted the wrong task). */ + public static final String REASON_UNKNOWN_BASE = "unknownBase"; + /** The base sink is gone even though this task once held it. */ + public static final String REASON_NO_SINK = "noSink"; + /** The base sink is being dropped (handoff in progress); the durable path re-announces at the new version. */ + public static final String REASON_DROPPING = "dropping"; + + private SegmentUpgradeMetrics() + { + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 6546ed80d9d9..72882c9b9236 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -24,12 +24,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.SegmentUpgradeMetrics; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentSchemaMapping; @@ -169,14 +171,33 @@ private void registerUpgradedPendingSegmentsOnSupervisor( List upgradedPendingSegments ) { + // Emit the count of upgrades this commit produced regardless of whether a supervisor exists to + // receive them, so it can be compared against the count actually announced by tasks. + if (!upgradedPendingSegments.isEmpty()) { + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, task); + toolbox.getEmitter().emit(metricBuilder.setMetric(SegmentUpgradeMetrics.COUNT, upgradedPendingSegments.size())); + } + final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); final Optional activeSupervisorIdWithAppendLock = supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); if (!activeSupervisorIdWithAppendLock.isPresent()) { + log.info("No active streaming supervisor for datasource[%s]; the [%d] upgraded pending segment(s) from task[%s]" + + " will become queryable when their tasks hand off.", + task.getDataSource(), + upgradedPendingSegments.size(), + task.getId() + ); return; } + log.info("Registering [%d] upgraded pending segments created by task[%s] on supervisor[%s]", + upgradedPendingSegments.size(), + task.getId(), + activeSupervisorIdWithAppendLock.get() + ); upgradedPendingSegments.forEach( upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor( activeSupervisorIdWithAppendLock.get(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 2390792fd4d2..0492ac52060e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -58,6 +58,7 @@ import org.apache.druid.indexer.report.TaskContextReport; import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.SegmentUpgradeMetrics; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; @@ -80,6 +81,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -1846,7 +1848,9 @@ public Response registerUpgradedPendingSegment( { authorizationCheck(req); try { - ((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment); + final StreamAppenderator.UpgradeAnnouncementOutcome outcome = + ((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment); + emitUpgradeAnnouncementMetric(outcome); return Response.ok().build(); } catch (DruidException e) { @@ -1865,6 +1869,33 @@ public Response registerUpgradedPendingSegment( } } + private void emitUpgradeAnnouncementMetric(StreamAppenderator.UpgradeAnnouncementOutcome outcome) + { + if (outcome == StreamAppenderator.UpgradeAnnouncementOutcome.ANNOUNCED) { + task.emitMetric(toolbox.getEmitter(), SegmentUpgradeMetrics.ANNOUNCED, 1); + return; + } + final String reason; + switch (outcome) { + case SKIPPED_UNKNOWN_BASE: + reason = SegmentUpgradeMetrics.REASON_UNKNOWN_BASE; + break; + case SKIPPED_NO_SINK: + reason = SegmentUpgradeMetrics.REASON_NO_SINK; + break; + case SKIPPED_DROPPING: + reason = SegmentUpgradeMetrics.REASON_DROPPING; + break; + default: + return; + } + toolbox.getEmitter().emit( + task.getMetricBuilder() + .setDimension(DruidMetrics.REASON, reason) + .setMetric(SegmentUpgradeMetrics.SKIPPED, 1) + ); + } + public Map doGetRowStats() { Map returnMap = new HashMap<>(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 74329c68e1d2..b1cda8302295 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -48,6 +49,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.SegmentUpgradeMetrics; import org.apache.druid.indexing.common.TaskInfoProvider; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -1419,22 +1421,89 @@ public void registerNewVersionOfPendingSegment( PendingSegmentRecord pendingSegmentRecord ) { + final String taskAllocatorId = pendingSegmentRecord.getTaskAllocatorId(); + int matchedGroups = 0; + int notifiedTasks = 0; + for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + if (taskGroup.baseSequenceName.equals(taskAllocatorId)) { + matchedGroups++; for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + notifyTaskOfUpgradedPendingSegment(taskId, pendingSegmentRecord); + notifiedTasks++; } } } for (List taskGroupList : pendingCompletionTaskGroups.values()) { for (TaskGroup taskGroup : taskGroupList) { - if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + if (taskGroup.baseSequenceName.equals(taskAllocatorId)) { + matchedGroups++; for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + notifyTaskOfUpgradedPendingSegment(taskId, pendingSegmentRecord); + notifiedTasks++; } } } } + + if (notifiedTasks == 0) { + // No running task matched: the segment will not be re-announced until handoff. This is a potential silent-loss + // window where data will not be queryable until handoff. + log.warn( + "Upgraded pending segment[%s] (upgradedFrom[%s], taskAllocatorId[%s]) matched no running task on" + + " supervisor[%s]; it will not be re-announced until handoff. Currently tracking [%d] activelyReading" + + " and [%d] pendingCompletion task group(s).", + pendingSegmentRecord.getId(), + pendingSegmentRecord.getUpgradedFromSegmentId(), + taskAllocatorId, + supervisorId, + activelyReadingTaskGroups.size(), + pendingCompletionTaskGroups.size() + ); + emitter.emit(getMetricBuilder().setMetric(SegmentUpgradeMetrics.UNMATCHED, 1)); + } else { + log.info( + "Notified [%d] task(s) across [%d] task group(s) of upgraded pending segment[%s] (upgradedFrom[%s]).", + notifiedTasks, + matchedGroups, + pendingSegmentRecord.getId(), + pendingSegmentRecord.getUpgradedFromSegmentId() + ); + emitter.emit(getMetricBuilder().setMetric(SegmentUpgradeMetrics.NOTIFIED, 1)); + } + } + + /** + * Sends an upgraded pending segment to a single task and records a metric if the request fails to reach the task. + */ + private void notifyTaskOfUpgradedPendingSegment(String taskId, PendingSegmentRecord pendingSegmentRecord) + { + Futures.addCallback( + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord), + new FutureCallback<>() + { + @Override + public void onSuccess(Boolean result) + { + // Successful delivery is captured by the notified metric; nothing to do here. + } + + @Override + public void onFailure(Throwable t) + { + log.warn( + t, + "Failed to send upgraded pending segment[%s] to task[%s] on supervisor[%s].", + pendingSegmentRecord.getId(), taskId, supervisorId + ); + emitter.emit( + getMetricBuilder().setDimension(DruidMetrics.TASK_ID, taskId) + .setMetric(SegmentUpgradeMetrics.SEND_FAILED, 1) + ); + } + }, + MoreExecutors.directExecutor() + ); } public ReentrantLock getRecordSupplierLock() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 9e45920ad719..c0fe4fbd7b61 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -39,6 +39,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.indexing.common.SegmentUpgradeMetrics; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -2116,6 +2117,85 @@ public void testRegisterNewVersionOfPendingSegment() Assert.assertEquals(pendingSegmentRecord0, captured0.getValue()); Assert.assertEquals(pendingSegmentRecord1, captured1.getValue()); + + // Both records matched a running task, so each emits a notified metric and none is unmatched. + Assert.assertEquals(2, emitter.getMetricEventCount(SegmentUpgradeMetrics.NOTIFIED)); + Assert.assertEquals(0, emitter.getMetricEventCount(SegmentUpgradeMetrics.UNMATCHED)); + verifyAll(); + } + + @Test + public void testRegisterNewVersionOfPendingSegmentUnmatchedEmitsMetric() + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + supervisor.getIoConfig().setTaskCount(3); + supervisor.start(); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + null, + null, + ImmutableSet.of("task0"), + ImmutableSet.of(), + null + ); + + // A taskAllocatorId that matches no running task group + final PendingSegmentRecord record = PendingSegmentRecord.create( + new SegmentIdWithShardSpec("DS", Intervals.of("2024/2025"), "2024", new NumberedShardSpec(1, 0)), + "no-such-allocator", + "prevId", + "someAppendedSegment", + "no-such-allocator" + ); + + supervisor.registerNewVersionOfPendingSegment(record); + + Assert.assertEquals(1, emitter.getMetricEventCount(SegmentUpgradeMetrics.UNMATCHED)); + Assert.assertEquals(0, emitter.getMetricEventCount(SegmentUpgradeMetrics.NOTIFIED)); + verifyAll(); + } + + @Test + public void testRegisterNewVersionOfPendingSegmentSendFailureEmitsMetric() + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), EasyMock.anyObject()) + ).andReturn(Futures.immediateFailedFuture(new RuntimeException("boom"))); + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + supervisor.getIoConfig().setTaskCount(3); + supervisor.start(); + + final SeekableStreamSupervisor.TaskGroup taskGroup0 = supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + null, + null, + ImmutableSet.of("task0"), + ImmutableSet.of(), + null + ); + + final PendingSegmentRecord record = PendingSegmentRecord.create( + new SegmentIdWithShardSpec("DS", Intervals.of("2024/2025"), "2024", new NumberedShardSpec(1, 0)), + taskGroup0.getBaseSequenceName(), + "prevId", + "someAppendedSegment", + taskGroup0.getBaseSequenceName() + ); + + supervisor.registerNewVersionOfPendingSegment(record); + + // The record matched task0 (so notified fires) but delivery failed over the wire (so sendFailed fires). + Assert.assertEquals(1, emitter.getMetricEventCount(SegmentUpgradeMetrics.NOTIFIED)); + Assert.assertEquals(1, emitter.getMetricEventCount(SegmentUpgradeMetrics.SEND_FAILED)); verifyAll(); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 94e7a56679b1..455e5227e1f5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1181,12 +1181,46 @@ private void unannounceSegment(DataSegment segment) } } - public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException + /** + * Outcome of a {@link #registerUpgradedPendingSegment} call, returned so the caller (which owns the emitter) can + * emit the corresponding metric. + */ + public enum UpgradeAnnouncementOutcome + { + ANNOUNCED, + SKIPPED_UNKNOWN_BASE, + SKIPPED_NO_SINK, + SKIPPED_DROPPING + } + + public UpgradeAnnouncementOutcome registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException { SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId()); SegmentIdWithShardSpec upgradedPendingSegment = pendingSegmentRecord.getId(); - if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) { - return; + if (basePendingSegment == null || droppingSinks.contains(basePendingSegment) || !sinks.containsKey(basePendingSegment)) { + if (basePendingSegment == null) { + // This task never allocated a segment matching upgradedFromSegmentId, i.e. the request targeted the wrong task. + log.info( + "Not announcing upgraded pending segment[%s] because this task has no base sink matching" + + " upgradedFromSegmentId[%s]; the upgrade request likely targeted the wrong task[%s].", + upgradedPendingSegment, pendingSegmentRecord.getUpgradedFromSegmentId(), myId + ); + return UpgradeAnnouncementOutcome.SKIPPED_UNKNOWN_BASE; + } else if (droppingSinks.contains(basePendingSegment)) { + // Expected during handoff: the base sink is being dropped + log.debug( + "Not announcing upgraded pending segment[%s] for base segment[%s] on task[%s] because the base sink is being dropped.", + upgradedPendingSegment, basePendingSegment, myId + ); + return UpgradeAnnouncementOutcome.SKIPPED_DROPPING; + } else { + // Unexpected: the base sink is gone even though this task once held it. + log.info( + "Not announcing upgraded pending segment[%s] for base segment[%s] on task[%s] because the base sink is no longer present.", + upgradedPendingSegment, basePendingSegment, myId + ); + return UpgradeAnnouncementOutcome.SKIPPED_NO_SINK; + } } final Sink sink = sinks.get(basePendingSegment); @@ -1201,6 +1235,8 @@ public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRe segmentAnnouncer.announceSegment(newSegment); baseSegmentToUpgradedSegments.get(basePendingSegment).add(upgradedPendingSegment); upgradedSegmentToBaseSegment.put(upgradedPendingSegment, basePendingSegment); + log.info("Announced upgraded segment[%s] for base segment[%s] on task[%s]", upgradedPendingSegment, basePendingSegment, myId); + return UpgradeAnnouncementOutcome.ANNOUNCED; } private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index bc5b3c6de360..2a09f82dca15 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -1160,6 +1160,60 @@ ScheduledFuture getLastScheduledFuture() } } + @Test + public void testRegisterUpgradedPendingSegmentReturnsAnnouncedWhenBaseSinkExists() throws Exception + { + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .build()) { + final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator(); + appenderator.startJob(); + // Create the base sink for IDENTIFIERS.get(0) so the upgrade can be announced against it. + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); + + final StreamAppenderator.UpgradeAnnouncementOutcome outcome = appenderator.registerUpgradedPendingSegment( + PendingSegmentRecord.create( + si("2000/2001", "B", 1), + si("2000/2001", "B", 1).asSegmentId().toString(), + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(0).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + + Assert.assertEquals(StreamAppenderator.UpgradeAnnouncementOutcome.ANNOUNCED, outcome); + } + } + + @Test + public void testRegisterUpgradedPendingSegmentReturnsSkippedUnknownBaseWhenBaseNotHeld() throws Exception + { + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .build()) { + final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator(); + appenderator.startJob(); + + // No sink has ever been created for the upgradedFromSegmentId below, so this task cannot announce it. + // This is the case where the upgrade request reached the wrong task. + final StreamAppenderator.UpgradeAnnouncementOutcome outcome = appenderator.registerUpgradedPendingSegment( + PendingSegmentRecord.create( + si("2050/2051", "Z", 1), + si("2050/2051", "Z", 1).asSegmentId().toString(), + si("2050/2051", "Y", 0).asSegmentId().toString(), + si("2050/2051", "Y", 0).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + + Assert.assertEquals(StreamAppenderator.UpgradeAnnouncementOutcome.SKIPPED_UNKNOWN_BASE, outcome); + } + } + @Test public void testQueryBySegments_withSegmentVersionUpgrades() throws Exception {