Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,23 @@ public class ShutdownManager implements Closeable {
new ExecutorThreadFactory(
WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));

private static final int CHECK_PERIOD_MS = 250;
private static final int DEFAULT_CHECK_PERIOD_MS = 250;
private final int checkPeriodMs;

public ShutdownManager() {
this(DEFAULT_CHECK_PERIOD_MS);
}

/**
* @param checkPeriodMs interval in milliseconds between shutdown-completion polls. Lower values
* speed up teardown at the cost of more frequent polling. Must be positive.
*/
public ShutdownManager(int checkPeriodMs) {
if (checkPeriodMs <= 0) {
throw new IllegalArgumentException("checkPeriodMs must be positive, was: " + checkPeriodMs);
}
this.checkPeriodMs = checkPeriodMs;
}

/** executorToShutdown.shutdownNow() -&gt; timed wait for a graceful termination */
public CompletableFuture<Void> shutdownExecutorNow(
Expand Down Expand Up @@ -97,7 +113,7 @@ private CompletableFuture<Void> untimedWait(
*/
private CompletableFuture<Void> limitedWait(
ExecutorService executorToShutdown, String executorName, Duration timeout) {
int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
int attempts = (int) Math.ceil((double) timeout.toMillis() / checkPeriodMs);

CompletableFuture<Void> future = new CompletableFuture<>();
scheduledExecutorService.submit(
Expand Down Expand Up @@ -167,7 +183,7 @@ public void run() {
promise.complete(null);
return;
}
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS);
}

abstract boolean isTerminated();
Expand Down Expand Up @@ -238,7 +254,7 @@ public void run() {
onSlowTermination();
}
}
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS);
}

abstract boolean isTerminated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ private void shutdownInternal(boolean interruptUserTasks) {

/** Internal method that actually shuts down workers. Called from the plugin chain. */
private void doShutdown(boolean interruptUserTasks) {
ShutdownManager shutdownManager = new ShutdownManager();
ShutdownManager shutdownManager =
new ShutdownManager((int) factoryOptions.getShutdownCheckInterval().toMillis());

// Shutdown each worker with plugin hooks
List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public static WorkerFactoryOptions getDefaultInstance() {

private static final int DEFAULT_WORKFLOW_CACHE_SIZE = 600;
private static final int DEFAULT_MAX_WORKFLOW_THREAD_COUNT = 600;
private static final Duration DEFAULT_SHUTDOWN_CHECK_INTERVAL = Duration.ofMillis(250);

private static final WorkerFactoryOptions DEFAULT_INSTANCE;

Expand All @@ -41,6 +42,7 @@ public static class Builder {
private boolean enableLoggingInReplay;
private boolean usingVirtualWorkflowThreads;
private ExecutorService overrideLocalActivityTaskExecutor;
private Duration shutdownCheckInterval;

private Builder() {}

Expand All @@ -57,6 +59,7 @@ private Builder(WorkerFactoryOptions options) {
this.enableLoggingInReplay = options.enableLoggingInReplay;
this.usingVirtualWorkflowThreads = options.usingVirtualWorkflowThreads;
this.overrideLocalActivityTaskExecutor = options.overrideLocalActivityTaskExecutor;
this.shutdownCheckInterval = options.shutdownCheckInterval;
}

/**
Expand Down Expand Up @@ -155,6 +158,22 @@ Builder setOverrideLocalActivityTaskExecutor(
return this;
}

/**
* Sets the interval between polls when checking for executor termination during shutdown. Lower
* values speed up shutdown at the cost of more frequent polling.
*
* <p>Default is 250ms, which is suitable for production. For test environments, consider
* setting a much lower value (e.g., 1ms) to minimize teardown overhead.
*
* @param shutdownCheckInterval the interval between shutdown polls. Must be positive.
* @return this builder for chaining
*/
@Experimental
public Builder setShutdownCheckInterval(Duration shutdownCheckInterval) {
Comment thread
brucearctor marked this conversation as resolved.
this.shutdownCheckInterval = shutdownCheckInterval;
return this;
}

public WorkerFactoryOptions build() {
return new WorkerFactoryOptions(
workflowCacheSize,
Expand All @@ -165,6 +184,7 @@ public WorkerFactoryOptions build() {
enableLoggingInReplay,
usingVirtualWorkflowThreads,
overrideLocalActivityTaskExecutor,
shutdownCheckInterval,
false);
}

Expand All @@ -189,6 +209,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() {
enableLoggingInReplay,
usingVirtualWorkflowThreads,
overrideLocalActivityTaskExecutor,
shutdownCheckInterval,
true);
}
}
Expand All @@ -201,6 +222,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() {
private final boolean enableLoggingInReplay;
private final boolean usingVirtualWorkflowThreads;
private final ExecutorService overrideLocalActivityTaskExecutor;
private final Duration shutdownCheckInterval;

private WorkerFactoryOptions(
int workflowCacheSize,
Expand All @@ -211,6 +233,7 @@ private WorkerFactoryOptions(
boolean enableLoggingInReplay,
boolean usingVirtualWorkflowThreads,
ExecutorService overrideLocalActivityTaskExecutor,
Duration shutdownCheckInterval,
boolean validate) {
if (validate) {
Preconditions.checkState(workflowCacheSize >= 0, "negative workflowCacheSize");
Expand All @@ -233,6 +256,13 @@ private WorkerFactoryOptions(
if (plugins == null) {
plugins = new WorkerPlugin[0];
}
if (shutdownCheckInterval != null) {
Preconditions.checkState(
!shutdownCheckInterval.isNegative() && !shutdownCheckInterval.isZero(),
"shutdownCheckInterval must be positive");
} else {
shutdownCheckInterval = DEFAULT_SHUTDOWN_CHECK_INTERVAL;
}
Comment thread
brucearctor marked this conversation as resolved.
}
this.workflowCacheSize = workflowCacheSize;
this.maxWorkflowThreadCount = maxWorkflowThreadCount;
Expand All @@ -243,6 +273,7 @@ private WorkerFactoryOptions(
this.enableLoggingInReplay = enableLoggingInReplay;
this.usingVirtualWorkflowThreads = usingVirtualWorkflowThreads;
this.overrideLocalActivityTaskExecutor = overrideLocalActivityTaskExecutor;
this.shutdownCheckInterval = shutdownCheckInterval;
}

public int getWorkflowCacheSize() {
Expand Down Expand Up @@ -291,6 +322,16 @@ ExecutorService getOverrideLocalActivityTaskExecutor() {
return overrideLocalActivityTaskExecutor;
}

/**
* Returns the interval between polls when checking for executor termination during shutdown.
*
* @return the configured shutdown check interval
*/
@Experimental
public Duration getShutdownCheckInterval() {
Comment thread
brucearctor marked this conversation as resolved.
return shutdownCheckInterval;
}

/**
* @deprecated not used anymore by JavaSDK, this value doesn't have any effect
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.temporal.internal.worker;

import org.junit.Test;

public class ShutdownManagerTest {

@Test(expected = IllegalArgumentException.class)
public void zeroPeriodIsRejected() {
new ShutdownManager(0);
}

@Test(expected = IllegalArgumentException.class)
public void negativePeriodIsRejected() {
new ShutdownManager(-1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.temporal.worker;

import static org.junit.Assert.assertEquals;

import java.time.Duration;
import org.junit.Test;

public class WorkerFactoryOptionsTest {

@Test
public void shutdownCheckIntervalDefaultIs250ms() {
WorkerFactoryOptions options =
WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults();
assertEquals(Duration.ofMillis(250), options.getShutdownCheckInterval());
}
Comment thread
brucearctor marked this conversation as resolved.

@Test
public void shutdownCheckIntervalCanBeSet() {
Duration interval = Duration.ofMillis(5);
WorkerFactoryOptions options =
WorkerFactoryOptions.newBuilder().setShutdownCheckInterval(interval).build();
assertEquals(interval, options.getShutdownCheckInterval());
}

@Test
public void shutdownCheckIntervalSurvivesValidateAndBuild() {
Duration interval = Duration.ofMillis(10);
WorkerFactoryOptions options =
WorkerFactoryOptions.newBuilder()
.setShutdownCheckInterval(interval)
.validateAndBuildWithDefaults();
assertEquals(interval, options.getShutdownCheckInterval());
}

@Test
public void shutdownCheckIntervalSurvivesCopyBuilder() {
Duration interval = Duration.ofMillis(3);
WorkerFactoryOptions original =
WorkerFactoryOptions.newBuilder().setShutdownCheckInterval(interval).build();
WorkerFactoryOptions copy = WorkerFactoryOptions.newBuilder(original).build();
assertEquals(interval, copy.getShutdownCheckInterval());
}

@Test(expected = IllegalStateException.class)
public void shutdownCheckIntervalRejectsZero() {
WorkerFactoryOptions.newBuilder()
.setShutdownCheckInterval(Duration.ZERO)
.validateAndBuildWithDefaults();
}

@Test(expected = IllegalStateException.class)
public void shutdownCheckIntervalRejectsNegative() {
WorkerFactoryOptions.newBuilder()
.setShutdownCheckInterval(Duration.ofMillis(-1))
.validateAndBuildWithDefaults();
}
}
Loading