refactor: make supervisor diff+restart algorithm smarter on submission #19541
jtuglu1 wants to merge 4 commits into
Pull request overview
This PR refactors supervisor “diff + restart” behavior so that supervisor specs can decide whether a changed submission actually requires a restart, while still persisting byte-level spec changes to metadata storage even when a restart is skipped. It introduces a SupervisorSpec.requireRestart(old) hook, implements smarter restart logic for seekable-stream supervisors, and adds IOConfig/spec builders plus equality semantics to support structured comparisons rather than opaque JSON byte diffs.
Changes:
- Add SupervisorSpec.requireRestart(SupervisorSpec old) (default: restart on any non-identical change) and use it from SupervisorManager.shouldUpdateSupervisor.
- Persist spec updates even when restart is skipped via SupervisorManager.updateSupervisorSpecWithoutRestart, wired into SupervisorResource POST handling.
- Introduce/extend builders and equals/hashCode implementations across seekable-stream specs/configs (Kafka/Kinesis/RabbitStream) and update tests accordingly.
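The hook described in the list above can be illustrated with a minimal sketch. It assumes Java 17 or later, and every type here (`Spec`, `StreamSpec`, `PlainSpec`) is a hypothetical, simplified stand-in rather than the actual Druid classes; the argument direction of `requireRestart` and the idea that a `description` field is safe to change are also assumptions made for illustration.

```java
final class RequireRestartSketch {
    /** Hypothetical, simplified stand-in for a supervisor spec. */
    interface Spec {
        String id();

        /** Default behavior: any non-identical change requires a restart. */
        default boolean requireRestart(Spec other) {
            return !this.equals(other);
        }
    }

    /** Uses the default: every field takes part in the comparison. */
    record PlainSpec(String id, String payload) implements Spec {}

    /** Overrides the hook: "description" is assumed to be safe to change in place. */
    record StreamSpec(String id, String stream, String description) implements Spec {
        @Override
        public boolean requireRestart(Spec other) {
            if (!(other instanceof StreamSpec that)) {
                return true; // a different spec type always restarts
            }
            return !id.equals(that.id) || !stream.equals(that.stream);
        }
    }

    public static void main(String[] args) {
        Spec running = new StreamSpec("s1", "topic", "v1");
        Spec descriptionOnly = new StreamSpec("s1", "topic", "v2");
        Spec newStream = new StreamSpec("s1", "other-topic", "v1");
        System.out.println("description change restarts: " + running.requireRestart(descriptionOnly));
        System.out.println("stream change restarts: " + running.requireRestart(newStream));
    }
}
```

The default keeps the conservative behavior for spec types that do not opt in, so only specs that override the hook can skip a restart.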
Reviewed changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java | Adds requireRestart(old) hook to let specs participate in restart decisions. |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java | Splits “spec changed” vs “restart required”, adds no-restart persistence path. |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java | Persists changed specs even when restart is skipped. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java | Implements structured restart decision + adds toBuilder() and spec equality/hash. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java | Adds equals/hashCode so spec equality can be structured. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java | Enhances builder (copyFromBase, more fields) and adds a default IOConfig builder. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java | Makes default lag aggregator comparable across serde (stateless equality). |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java | Adds equals/hashCode for structured comparisons. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java | Adds toBuilder() to support restart comparison/copy patterns. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java | Adds equals/hashCode to ensure tuning changes affect comparisons. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisIOConfigBuilder.java | New builder for Kinesis IOConfig with copyFrom/build support. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java | Adds toBuilder() to support restart comparison/copy patterns. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java | Adds equals/hashCode to ensure tuning changes affect comparisons. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java | Expands builder (copyFrom + additional fields), relies on base bounded/serverPriority fields. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java | Adds toBuilder() to support restart comparison/copy patterns. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java | Adds equals/hashCode to ensure tuning changes affect comparisons. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamIOConfigBuilder.java | New builder for RabbitStream IOConfig with copyFrom/build support. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java | Updates test spec to support toBuilder() and uses IOConfig builders. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java | Replaces inline IOConfig anonymous classes with builders for stable equality. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java | Adds requireRestart tests; updates IOConfig construction to builder pattern. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java | Adds equality/hashCode tests for IOConfig + related value types. |
| indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java | Adjusts POST expectations to include no-restart persistence call. |
| indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java | Adds tests for updateSupervisorSpecWithoutRestart and introduces versioned test spec. |
| extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java | Updates tests to use new Kinesis IOConfig builder. |
| extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java | Adds equality/hashCode tests and builder-based construction. |
| extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java | Updates sampler tests to use Kinesis IOConfig builder. |
| extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | Updates tests to use Kafka IOConfig builder. |
| extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java | Adds equality/hashCode tests and builder-based construction. |
| extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java | Updates tests to use RabbitStream IOConfig builder. |
| extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java | Adds equality/hashCode tests and builder-based construction. |
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 2 |
| P3 | 0 |
| Total | 3 |
Reviewed 34 of 34 changed files.
This is an automated review by Codex GPT-5.5
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 2 |
| P3 | 0 |
| Total | 2 |
Reviewed 36 of 36 changed files.
This is an automated review by Codex GPT-5.5
cecemei
left a comment
overall looks good to me, left a few minor comments
FrankChen021
left a comment
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 38 of 38 changed files.
This is an automated review by Codex GPT-5.5
| */ | ||
| public SupervisorIOConfigBuilder<?, ?> toBuilder() | ||
| { | ||
| return new SupervisorIOConfigBuilder.DefaultSupervisorIOConfigBuilder().copyFromBase(this); |
If sub-classes must always override this method, we should make it abstract.
| return new LagStats(maxLag, totalLag, avgLag); | ||
| } | ||
| // Stateless: all instances are equal. Needed because Jackson creates fresh instances on |
Alternatively, you can make the constructor private and add a static @JsonCreator method which just returns the DEFAULT instance.
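A minimal sketch of the shape the reviewer is suggesting, with a hypothetical `DefaultAggregator` standing in for the real class. The `@JsonCreator` annotation appears only in a comment so that the sketch compiles without Jackson on the classpath; whether Jackson would route deserialization through this factory in the actual class hierarchy is an assumption that would need to be verified.

```java
final class StatelessSingletonSketch {
    /** Hypothetical stand-in for a stateless default lag aggregator. */
    static final class DefaultAggregator {
        static final DefaultAggregator DEFAULT = new DefaultAggregator();

        // Private constructor: no caller can create a second instance.
        private DefaultAggregator() {}

        // In the real class this factory would carry @JsonCreator (assumption),
        // so that deserialization hands back DEFAULT instead of a fresh object.
        static DefaultAggregator create() {
            return DEFAULT;
        }
    }

    public static void main(String[] args) {
        DefaultAggregator a = DefaultAggregator.create();
        DefaultAggregator b = DefaultAggregator.create();
        // With a single shared instance, the inherited identity-based equals()
        // is enough and no "all instances are equal" override is required.
        System.out.println("same instance: " + (a == b));
        System.out.println("equal: " + a.equals(b));
    }
}
```

The trade-off is that equality then depends on every code path going through the factory, whereas an `equals` override keeps working even if an instance is constructed some other way.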
| */ | ||
| public Builder<?> toBuilder() | ||
| { | ||
| throw new UnsupportedOperationException("No builder is available for this supervisor spec."); |
Can this method be abstract instead?
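As a rough sketch of what an abstract `toBuilder()` could look like, assuming simplified hypothetical types (`BaseSpec`, `StreamSpec`, `StreamBuilder`) that are not the actual Druid classes: the compiler then forces every concrete spec to supply a builder, instead of failing at runtime with `UnsupportedOperationException`.

```java
final class AbstractToBuilderSketch {
    /** Self-typed builder base so fluent setters return the concrete builder type. */
    abstract static class Builder<B extends Builder<B>> {
        protected String id;

        abstract B self();

        B withId(String id) {
            this.id = id;
            return self();
        }

        abstract BaseSpec build();
    }

    abstract static class BaseSpec {
        final String id;

        BaseSpec(String id) {
            this.id = id;
        }

        // Abstract: a subclass that forgets to implement this fails to compile.
        abstract Builder<?> toBuilder();
    }

    static final class StreamSpec extends BaseSpec {
        final String topic;

        StreamSpec(String id, String topic) {
            super(id);
            this.topic = topic;
        }

        @Override
        StreamBuilder toBuilder() {
            return new StreamBuilder().withId(id).withTopic(topic);
        }
    }

    static final class StreamBuilder extends Builder<StreamBuilder> {
        private String topic;

        @Override
        StreamBuilder self() {
            return this;
        }

        StreamBuilder withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        @Override
        StreamSpec build() {
            return new StreamSpec(id, topic);
        }
    }

    public static void main(String[] args) {
        StreamSpec original = new StreamSpec("s1", "events");
        StreamSpec copy = original.toBuilder().withTopic("events-v2").build();
        System.out.println(copy.id + " -> " + copy.topic);
    }
}
```

The cost of this design is that any third-party spec subclass would have to add the method before it compiles against the new base class, which may be why a throwing default was chosen.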
| this.taskStorage = spec.taskStorage; | ||
| this.taskMaster = spec.taskMaster; | ||
| this.indexerMetadataStorageCoordinator = spec.indexerMetadataStorageCoordinator; | ||
| this.indexTaskClientFactory = spec.indexTaskClientFactory; | ||
| this.mapper = spec.mapper; | ||
| this.emitter = spec.emitter; | ||
| this.monitorSchedulerConfig = spec.monitorSchedulerConfig; | ||
| this.rowIngestionMetersFactory = spec.rowIngestionMetersFactory; | ||
| this.supervisorStateManagerConfig = spec.supervisorStateManagerConfig; |
Will the builder ever use these dependencies?
The output of the build() method will only ever be used for an equality check, which should not involve the services anyway. So the build() method could simply pass these as null, at least for now.
| synchronized (lock) { | ||
| Preconditions.checkState(started, "SupervisorManager not started"); | ||
| try { | ||
| byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec); |
Maybe rename this method from shouldUpdateSupervisor to shouldRestartSupervisor to avoid ambiguity.
Alternatively, add a new method shouldRestartSupervisor and leave shouldUpdateSupervisor unchanged.
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
Reviewed 43 of 43 changed files.
This is an automated review by Codex GPT-5.5
FrankChen021
left a comment
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 43 of 43 changed files.
This is an automated review by Codex GPT-5.5
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
Found 1 issue.
Reviewed 43 of 43 changed files.
This is an automated review by Codex GPT-5.5
| // The effective (merged) spec is what will be persisted, so validate that transition exactly once. | ||
| current.rhs.validateSpecUpdateTo(spec); | ||
| if (!current.rhs.requireRestart(spec)) { |
[P2] No-restart task-count updates leave the live supervisor stale
When an autoscaling supervisor is resubmitted with only an effective taskCount change, for example after autoscaling has moved the current ioConfig.taskCount to 5 and the new POST is merged back to taskCountStart 2, SeekableStreamSupervisorSpec.requireRestart masks taskCount differences whenever either side has autoscaling enabled. This branch then persists the new spec and swaps only the manager's stored spec, but it does not restart current.lhs or update the running SeekableStreamSupervisor's ioConfig. The API can return modified=true,restarted=false while ingestion continues using the old taskCount until a later autoscaler action or Overlord restart. Treat this reset as restart-required, or explicitly propagate the new taskCount into the running supervisor before taking the no-restart path.
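The scenario in this finding can be reproduced with a small sketch. It assumes Java 17 or later and a hypothetical `IoConfig` record with only three fields; `requireRestartMasked` imitates the masking described above and `requireRestartStrict` shows the first of the two mitigations the finding proposes. Neither method is the PR's actual code.

```java
final class TaskCountMaskSketch {
    /** Hypothetical, heavily reduced IOConfig. */
    record IoConfig(String stream, int taskCount, boolean autoScalerEnabled) {
        IoConfig withTaskCount(int newTaskCount) {
            return new IoConfig(stream, newTaskCount, autoScalerEnabled);
        }
    }

    /** Masked comparison: taskCount is ignored when either side has autoscaling enabled. */
    static boolean requireRestartMasked(IoConfig current, IoConfig submitted) {
        if (current.autoScalerEnabled() || submitted.autoScalerEnabled()) {
            return !current.withTaskCount(0).equals(submitted.withTaskCount(0));
        }
        return !current.equals(submitted);
    }

    /** Mitigation sketch: an effective taskCount change always counts as restart-required. */
    static boolean requireRestartStrict(IoConfig current, IoConfig submitted) {
        return current.taskCount() != submitted.taskCount()
            || requireRestartMasked(current, submitted);
    }

    public static void main(String[] args) {
        // The autoscaler moved the running supervisor to 5 tasks;
        // the resubmitted spec is merged back to a task count of 2.
        IoConfig running = new IoConfig("events", 5, true);
        IoConfig submitted = new IoConfig("events", 2, true);
        System.out.println("masked restart: " + requireRestartMasked(running, submitted));
        System.out.println("strict restart: " + requireRestartStrict(running, submitted));
    }
}
```

With the masked check the resubmission is persisted without a restart, so the running supervisor keeps its old task count. The strict variant restarts instead, at the cost of giving up the masking for that case; the other option named in the finding is to push the new taskCount into the running supervisor.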
FrankChen021
left a comment
I have reviewed the updated changes for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 44 of 44 changed files.
This is an automated review by Codex GPT-5.5
Description
Restarting a supervisor after a spec submission isn't always needed and can be expensive for large supervisors. The current skip-restart logic is bare-bones: it effectively does a byte-level comparison of the two specs instead of letting each spec define its own restart requirements, which often causes false-positive restarts. Examples of such false-positive diffs include ioConfig.taskCount being updated while auto-scaling is configured, whitespace changes, etc.
This updates the restart-required check by letting supervisors implement their own restart requirement checks (with sane defaults) along with proper comparators, so that a supervisor spec is no longer treated as an opaque ball of text. It also adds builders to keep things clean.
Notably, this doesn't change the spec persistence behavior: spec updates that differ at the byte level are still persisted into the DB, but do not necessarily trigger a supervisor restart.
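The separation between "persist" and "restart" described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the `SupervisorManager` code: `Spec`, `Outcome`, the in-memory map standing in for metadata storage, and the `restartKey` field that represents "the fields that matter for a restart" are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class SubmitOutcomeSketch {
    record Outcome(boolean modified, boolean restarted) {}

    /** Hypothetical spec: "serialized" stands in for the JSON form of the spec. */
    record Spec(String id, String serialized, String restartKey) {
        boolean requireRestart(Spec other) {
            return !restartKey.equals(other.restartKey);
        }

        byte[] bytes() {
            return serialized.getBytes(StandardCharsets.UTF_8);
        }
    }

    private final Map<String, Spec> stored = new HashMap<>(); // stand-in for the metadata store
    private int restarts = 0;

    Outcome submit(Spec submitted) {
        Spec current = stored.get(submitted.id());
        boolean bytesDiffer = current == null || !Arrays.equals(current.bytes(), submitted.bytes());
        if (!bytesDiffer) {
            return new Outcome(false, false); // nothing to persist, nothing to restart
        }
        stored.put(submitted.id(), submitted); // any byte-level change is persisted
        // A brand-new supervisor is counted as a (re)start in this sketch.
        boolean restart = current == null || current.requireRestart(submitted);
        if (restart) {
            restarts++;
        }
        return new Outcome(true, restart);
    }

    Spec storedSpec(String id) {
        return stored.get(id);
    }

    int restarts() {
        return restarts;
    }

    public static void main(String[] args) {
        SubmitOutcomeSketch manager = new SubmitOutcomeSketch();
        System.out.println(manager.submit(new Spec("s1", "{\"note\":\"a\"}", "k1")));
        System.out.println(manager.submit(new Spec("s1", "{\"note\":\"b\"}", "k1")));
        System.out.println(manager.submit(new Spec("s1", "{\"note\":\"b\"}", "k2")));
    }
}
```

The middle outcome, modified but not restarted, is the new case: the stored spec changes while the running supervisor is left alone.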
Release note
Make supervisor diff+restart algorithm smarter on submission
This PR has: