Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void stop()
Preconditions.checkState(started, "SupervisorManager not started");
List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
synchronized (lock) {
log.info("Stopping [%d] supervisors", supervisors.keySet().size());
log.info("Stopping [%d] supervisors", supervisors.size());
for (String id : supervisors.keySet()) {
try {
stopFutures.add(supervisors.get(id).lhs.stopAsync());
Expand Down Expand Up @@ -377,7 +377,7 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
* Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
* process the skipped range from the previously checkpointed offsets up to the latest offsets.
*
* @param id supervisor ID
* @param id supervisor ID
* @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
* @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
* @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
Expand Down Expand Up @@ -424,10 +424,20 @@ public Map<String, Object> resetToLatestAndBackfill(String id, @Nullable Integer
String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill");

try {
Map<String, Object> normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
Map<String, Object> normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
Map<String, Object> normalizedStartOffsets = jsonMapper.readValue(
jsonMapper.writeValueAsString(startOffsets),
Map.class
);
Map<String, Object> normalizedEndOffsets = jsonMapper.readValue(
jsonMapper.writeValueAsString(endOffsets),
Map.class
);
BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(
backfillSupervisorId,
boundedStreamConfig,
backfillTaskCount
);
createOrUpdateAndStartSupervisor(backfillSpec);
}
catch (JsonProcessingException e) {
Expand Down Expand Up @@ -615,12 +625,10 @@ private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, boolea
}

if (writeTombstone) {
metadataSupervisorManager.insert(
id,
new NoopSupervisorSpec(null, pair.rhs.getDataSources())
); // where NoopSupervisorSpec is a tombstone
// NoopSupervisorSpec is a tombstone
metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, pair.rhs.getDataSources()));
}
pair.lhs.stop(true);
pair.lhs.stop(writeTombstone || pair.lhs.stopGracefullyOnNewSpec());
supervisors.remove(id);

SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,12 @@ public void start()
}
}

@Override
public boolean stopGracefullyOnNewSpec()
{
return true;
}

@Override
public void stop(boolean stopGracefully)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void testCreateUpdateAndRemoveSupervisor()
resetAll();
supervisor2.start();
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
supervisor1.stop(true);
replayAll();

Expand Down Expand Up @@ -580,6 +581,7 @@ public void testNoPersistOnFailedStart()
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
supervisor1.start();
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
supervisor1.stop(true);
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor2.start();
Expand Down Expand Up @@ -729,6 +731,7 @@ public void testCreateSuspendResumeAndStopSupervisor()
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
supervisor2.start();
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
supervisor1.stop(true);
replayAll();

Expand All @@ -742,6 +745,7 @@ public void testCreateSuspendResumeAndStopSupervisor()
// in TestSupervisorSpec implementation of createRunningSpec
resetAll();
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(true);
supervisor2.stop(true);
supervisor1.start();
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
Expand Down Expand Up @@ -778,6 +782,79 @@ public void testCreateSuspendResumeAndStopSupervisor()
Assert.assertTrue(manager.getSupervisorIds().isEmpty());
}

@Test
public void testStopGracefullyOnNewSpecFalseUsesNonGracefulStop()
{
final RecordingSupervisor originalSupervisor = new RecordingSupervisor();
final SupervisorSpec spec = new TestSupervisorSpec("id1", originalSupervisor);
final SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2);
final Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
"id3", new TestSupervisorSpec("id3", supervisor3)
);

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
metadataSupervisorManager.insert("id1", spec);
supervisor3.start();
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();

manager.start();
manager.createOrUpdateAndStartSupervisor(spec);
verifyAll();

// spec update: originalSupervisor opts out of graceful stop-on-new-spec, so it is stopped with stop(false),
// leaving its managed tasks running for the replacement supervisor to reconcile.
resetAll();
supervisor2.start();
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();

manager.createOrUpdateAndStartSupervisor(spec2);
Assert.assertEquals(Boolean.FALSE, originalSupervisor.stopGracefully);
verifyAll();

// terminate always stops gracefully, regardless of the supervisor's new-spec stop policy, so that the terminate
// contract of stopping associated tasks and publishing segments is honored.
resetAll();
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class));
supervisor2.stop(true);
replayAll();

Assert.assertTrue(manager.stopAndRemoveSupervisor("id1"));
verifyAll();
}

@Test
public void testSuspendHonorsStopGracefullyOnNewSpecFalse()
{
// suspend triggers the same stop path as a spec update, so it must also consult
// stopGracefullyOnNewSpec() and stop with stop(false) when the supervisor opts out.
final Capture<TestSupervisorSpec> capturedInsert = Capture.newInstance();
final SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1, false, supervisor2);

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
metadataSupervisorManager.insert("id1", spec);
supervisor1.start();
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();

manager.start();
manager.createOrUpdateAndStartSupervisor(spec);
verifyAll();

resetAll();
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
supervisor2.start();
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(false);
supervisor1.stop(false);
replayAll();

manager.suspendOrResumeSupervisor("id1", true);
Assert.assertTrue(capturedInsert.getValue().suspended);
verifyAll();
}

@Test
public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
{
Expand Down Expand Up @@ -1329,6 +1406,39 @@ public List<String> getDataSources()
}
}

private static class RecordingSupervisor implements Supervisor
{
private Boolean stopGracefully;

@Override
public void start()
{
}

@Override
public void stop(boolean stopGracefully)
{
this.stopGracefully = stopGracefully;
}

@Override
public SupervisorReport getStatus()
{
return null;
}

@Override
public SupervisorStateManager.State getState()
{
return SupervisorStateManager.BasicState.RUNNING;
}

@Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
}
}

@JsonTypeName("testBackfill")
private static class TestBackfillSupervisorSpec extends SeekableStreamSupervisorSpec
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,20 @@ public void testRunning()
verifyAll();
}

@Test
public void testStopGracefullyOnNewSpecReturnsTrue()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
replayAll();

SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

// SeekableStreamSupervisor retains the historical graceful stop-and-roll behavior on a spec update.
Assert.assertTrue(supervisor.stopGracefullyOnNewSpec());

verifyAll();
}

@Test
public void testRunningStreamGetSequenceNumberReturnsNull()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public interface Supervisor
*/
void stop(boolean stopGracefully);

/**
* Indicates whether this supervisor should be stopped gracefully when its spec is updated/suspended/resumed
*/
default boolean stopGracefullyOnNewSpec()
{
return false;
}

/**
* Starts non-graceful shutdown of the supervisor and returns a future that completes when shutdown is complete.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public void testInputSourceResources()
NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
Assert.assertTrue(noopSupervisorSpec.getInputSourceResources().isEmpty());
}

@Test
public void testStopGracefullyOnNewSpecDefaultsToFalse()
{
final Supervisor supervisor = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"))
.createSupervisor();
Assert.assertFalse(supervisor.stopGracefullyOnNewSpec());
}
}
Loading