diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index fa7d96634ae6..62f7eca6032c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -281,7 +281,7 @@ public void stop() Preconditions.checkState(started, "SupervisorManager not started"); List> stopFutures = new ArrayList<>(); synchronized (lock) { - log.info("Stopping [%d] supervisors", supervisors.keySet().size()); + log.info("Stopping [%d] supervisors", supervisors.size()); for (String id : supervisors.keySet()) { try { stopFutures.add(supervisors.get(id).lhs.stopAsync()); @@ -377,7 +377,7 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to * process the skipped range from the previously checkpointed offsets up to the latest offsets. * - * @param id supervisor ID + * @param id supervisor ID * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"} * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor}, @@ -424,10 +424,20 @@ public Map resetToLatestAndBackfill(String id, @Nullable Integer String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill"); try { - Map normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class); - Map normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class); + Map normalizedStartOffsets = jsonMapper.readValue( + jsonMapper.writeValueAsString(startOffsets), + Map.class + ); + Map normalizedEndOffsets = jsonMapper.readValue( + jsonMapper.writeValueAsString(endOffsets), + Map.class + ); BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets); - SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, backfillTaskCount); + SupervisorSpec backfillSpec = streamSpec.createBackfillSpec( + backfillSupervisorId, + boundedStreamConfig, + backfillTaskCount + ); createOrUpdateAndStartSupervisor(backfillSpec); } catch (JsonProcessingException e) { @@ -615,12 +625,10 @@ private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, boolea } if (writeTombstone) { - metadataSupervisorManager.insert( - id, - new NoopSupervisorSpec(null, pair.rhs.getDataSources()) - ); // where NoopSupervisorSpec is a tombstone + // NoopSupervisorSpec is a tombstone + metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, pair.rhs.getDataSources())); } - pair.lhs.stop(true); + pair.lhs.stop(writeTombstone || pair.lhs.stopGracefullyOnNewSpec()); supervisors.remove(id); SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 74329c68e1d2..ae38f788e10c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1285,6 +1285,12 @@ public void start() } } + @Override + public boolean stopGracefullyOnNewSpec() + { + return true; + } + @Override public void stop(boolean stopGracefully) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 199e004b4243..4acead1dab0f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -135,6 +135,7 @@ public void testCreateUpdateAndRemoveSupervisor() resetAll(); supervisor2.start(); EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); replayAll(); @@ -580,6 +581,7 @@ public void testNoPersistOnFailedStart() metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor1.start(); EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor2.start(); @@ -729,6 +731,7 @@ public void testCreateSuspendResumeAndStopSupervisor() metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor2.start(); EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); replayAll(); @@ -742,6 +745,7 @@ public void testCreateSuspendResumeAndStopSupervisor() // in TestSupervisorSpec implementation of createRunningSpec resetAll(); metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); + EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(true); supervisor2.stop(true); supervisor1.start(); EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); @@ -778,6 +782,79 @@ public void testCreateSuspendResumeAndStopSupervisor() Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testStopGracefullyOnNewSpecFalseUsesNonGracefulStop() + { + final RecordingSupervisor originalSupervisor = new RecordingSupervisor(); + final SupervisorSpec spec = new TestSupervisorSpec("id1", originalSupervisor); + final SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2); + final Map existingSpecs = ImmutableMap.of( + "id3", new TestSupervisorSpec("id3", supervisor3) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + metadataSupervisorManager.insert("id1", spec); + supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + + manager.start(); + manager.createOrUpdateAndStartSupervisor(spec); + verifyAll(); + + // spec update: originalSupervisor opts out of graceful stop-on-new-spec, so it is stopped with stop(false), + // leaving its managed tasks running for the replacement supervisor to reconcile. + resetAll(); + supervisor2.start(); + EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + + manager.createOrUpdateAndStartSupervisor(spec2); + Assert.assertEquals(Boolean.FALSE, originalSupervisor.stopGracefully); + verifyAll(); + + // terminate always stops gracefully, regardless of the supervisor's new-spec stop policy, so that the terminate + // contract of stopping associated tasks and publishing segments is honored. + resetAll(); + metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class)); + supervisor2.stop(true); + replayAll(); + + Assert.assertTrue(manager.stopAndRemoveSupervisor("id1")); + verifyAll(); + } + + @Test + public void testSuspendHonorsStopGracefullyOnNewSpecFalse() + { + // suspend triggers the same stop path as a spec update, so it must also consult + // stopGracefullyOnNewSpec() and stop with stop(false) when the supervisor opts out. + final Capture capturedInsert = Capture.newInstance(); + final SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1, false, supervisor2); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of()); + metadataSupervisorManager.insert("id1", spec); + supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + + manager.start(); + manager.createOrUpdateAndStartSupervisor(spec); + verifyAll(); + + resetAll(); + metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); + supervisor2.start(); + EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(false); + supervisor1.stop(false); + replayAll(); + + manager.suspendOrResumeSupervisor("id1", true); + Assert.assertTrue(capturedInsert.getValue().suspended); + verifyAll(); + } + @Test public void testGetActiveSupervisorIdForDatasourceWithAppendLock() { @@ -1329,6 +1406,39 @@ public List getDataSources() } } + private static class RecordingSupervisor implements Supervisor + { + private Boolean stopGracefully; + + @Override + public void start() + { + } + + @Override + public void stop(boolean stopGracefully) + { + this.stopGracefully = stopGracefully; + } + + @Override + public SupervisorReport getStatus() + { + return null; + } + + @Override + public SupervisorStateManager.State getState() + { + return SupervisorStateManager.BasicState.RUNNING; + } + + @Override + public void reset(DataSourceMetadata dataSourceMetadata) + { + } + } + @JsonTypeName("testBackfill") private static class TestBackfillSupervisorSpec extends SeekableStreamSupervisorSpec { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 9e45920ad719..a1835f177069 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -247,6 +247,20 @@ public void testRunning() verifyAll(); } + @Test + public void testStopGracefullyOnNewSpecReturnsTrue() + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + replayAll(); + + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // SeekableStreamSupervisor retains the historical graceful stop-and-roll behavior on a spec update. + Assert.assertTrue(supervisor.stopGracefullyOnNewSpec()); + + verifyAll(); + } + @Test public void testRunningStreamGetSequenceNumberReturnsNull() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index ce6c87e5e082..6c62f2bf0144 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -47,6 +47,14 @@ public interface Supervisor */ void stop(boolean stopGracefully); + /** + * Indicates whether this supervisor should be stopped gracefully when its spec is updated/suspended/resumed + */ + default boolean stopGracefullyOnNewSpec() + { + return false; + } + /** * Starts non-graceful shutdown of the supervisor and returns a future that completes when shutdown is complete. */ diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java index 37a0202fa95e..9313e03e7b6b 100644 --- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java +++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java @@ -51,4 +51,12 @@ public void testInputSourceResources() NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1")); Assert.assertTrue(noopSupervisorSpec.getInputSourceResources().isEmpty()); } + + @Test + public void testStopGracefullyOnNewSpecDefaultsToFalse() + { + final Supervisor supervisor = new NoopSupervisorSpec(null, Collections.singletonList("datasource1")) + .createSupervisor(); + Assert.assertFalse(supervisor.stopGracefullyOnNewSpec()); + } }