diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java index 8c243e04c..d59911b0a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java @@ -21,7 +21,23 @@ public class ShutdownManager implements Closeable { new ExecutorThreadFactory( WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null)); - private static final int CHECK_PERIOD_MS = 250; + private static final int DEFAULT_CHECK_PERIOD_MS = 250; + private final int checkPeriodMs; + + public ShutdownManager() { + this(DEFAULT_CHECK_PERIOD_MS); + } + + /** + * @param checkPeriodMs interval in milliseconds between shutdown-completion polls. Lower values + * speed up teardown at the cost of more frequent polling. Must be positive. + */ + public ShutdownManager(int checkPeriodMs) { + if (checkPeriodMs <= 0) { + throw new IllegalArgumentException("checkPeriodMs must be positive, was: " + checkPeriodMs); + } + this.checkPeriodMs = checkPeriodMs; + } /** executorToShutdown.shutdownNow() -> timed wait for a graceful termination */ public CompletableFuture shutdownExecutorNow( @@ -97,7 +113,7 @@ private CompletableFuture untimedWait( */ private CompletableFuture limitedWait( ExecutorService executorToShutdown, String executorName, Duration timeout) { - int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS); + int attempts = (int) Math.ceil((double) timeout.toMillis() / checkPeriodMs); CompletableFuture future = new CompletableFuture<>(); scheduledExecutorService.submit( @@ -167,7 +183,7 @@ public void run() { promise.complete(null); return; } - scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS); + scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS); } abstract boolean isTerminated(); @@ -238,7 +254,7 @@ public void run() { onSlowTermination(); } } - scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS); + scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS); } abstract boolean isTerminated(); diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 741990624..bbf8af3db 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -411,7 +411,8 @@ private void shutdownInternal(boolean interruptUserTasks) { /** Internal method that actually shuts down workers. Called from the plugin chain. */ private void doShutdown(boolean interruptUserTasks) { - ShutdownManager shutdownManager = new ShutdownManager(); + ShutdownManager shutdownManager = + new ShutdownManager((int) factoryOptions.getShutdownCheckInterval().toMillis()); // Shutdown each worker with plugin hooks List> shutdownFutures = new ArrayList<>(); diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java index 07063416c..95f8ebc75 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java @@ -24,6 +24,7 @@ public static WorkerFactoryOptions getDefaultInstance() { private static final int DEFAULT_WORKFLOW_CACHE_SIZE = 600; private static final int DEFAULT_MAX_WORKFLOW_THREAD_COUNT = 600; + private static final Duration DEFAULT_SHUTDOWN_CHECK_INTERVAL = Duration.ofMillis(250); private static final WorkerFactoryOptions DEFAULT_INSTANCE; @@ -41,6 +42,7 @@ public static class Builder { private boolean enableLoggingInReplay; private boolean usingVirtualWorkflowThreads; private ExecutorService overrideLocalActivityTaskExecutor; + private Duration shutdownCheckInterval; private Builder() {} @@ -57,6 +59,7 @@ private Builder(WorkerFactoryOptions options) { this.enableLoggingInReplay = options.enableLoggingInReplay; this.usingVirtualWorkflowThreads = options.usingVirtualWorkflowThreads; this.overrideLocalActivityTaskExecutor = options.overrideLocalActivityTaskExecutor; + this.shutdownCheckInterval = options.shutdownCheckInterval; } /** @@ -155,6 +158,22 @@ Builder setOverrideLocalActivityTaskExecutor( return this; } + /** + * Sets the interval between polls when checking for executor termination during shutdown. Lower + * values speed up shutdown at the cost of more frequent polling. + * + *

Default is 250ms, which is suitable for production. For test environments, consider + * setting a much lower value (e.g., 1ms) to minimize teardown overhead. + * + * @param shutdownCheckInterval the interval between shutdown polls. Must be positive. + * @return this builder for chaining + */ + @Experimental + public Builder setShutdownCheckInterval(Duration shutdownCheckInterval) { + this.shutdownCheckInterval = shutdownCheckInterval; + return this; + } + public WorkerFactoryOptions build() { return new WorkerFactoryOptions( workflowCacheSize, @@ -165,6 +184,7 @@ public WorkerFactoryOptions build() { enableLoggingInReplay, usingVirtualWorkflowThreads, overrideLocalActivityTaskExecutor, + shutdownCheckInterval, false); } @@ -189,6 +209,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { enableLoggingInReplay, usingVirtualWorkflowThreads, overrideLocalActivityTaskExecutor, + shutdownCheckInterval, true); } } @@ -201,6 +222,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { private final boolean enableLoggingInReplay; private final boolean usingVirtualWorkflowThreads; private final ExecutorService overrideLocalActivityTaskExecutor; + private final Duration shutdownCheckInterval; private WorkerFactoryOptions( int workflowCacheSize, @@ -211,6 +233,7 @@ private WorkerFactoryOptions( boolean enableLoggingInReplay, boolean usingVirtualWorkflowThreads, ExecutorService overrideLocalActivityTaskExecutor, + Duration shutdownCheckInterval, boolean validate) { if (validate) { Preconditions.checkState(workflowCacheSize >= 0, "negative workflowCacheSize"); @@ -233,6 +256,13 @@ private WorkerFactoryOptions( if (plugins == null) { plugins = new WorkerPlugin[0]; } + if (shutdownCheckInterval != null) { + Preconditions.checkState( + !shutdownCheckInterval.isNegative() && !shutdownCheckInterval.isZero(), + "shutdownCheckInterval must be positive"); + } else { + shutdownCheckInterval = DEFAULT_SHUTDOWN_CHECK_INTERVAL; + } } this.workflowCacheSize = workflowCacheSize; this.maxWorkflowThreadCount = maxWorkflowThreadCount; @@ -243,6 +273,7 @@ private WorkerFactoryOptions( this.enableLoggingInReplay = enableLoggingInReplay; this.usingVirtualWorkflowThreads = usingVirtualWorkflowThreads; this.overrideLocalActivityTaskExecutor = overrideLocalActivityTaskExecutor; + this.shutdownCheckInterval = shutdownCheckInterval; } public int getWorkflowCacheSize() { @@ -291,6 +322,16 @@ ExecutorService getOverrideLocalActivityTaskExecutor() { return overrideLocalActivityTaskExecutor; } + /** + * Returns the interval between polls when checking for executor termination during shutdown. + * + * @return the configured shutdown check interval + */ + @Experimental + public Duration getShutdownCheckInterval() { + return shutdownCheckInterval; + } + /** * @deprecated not used anymore by JavaSDK, this value doesn't have any effect */ diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/ShutdownManagerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/ShutdownManagerTest.java new file mode 100644 index 000000000..1e2838047 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/ShutdownManagerTest.java @@ -0,0 +1,16 @@ +package io.temporal.internal.worker; + +import org.junit.Test; + +public class ShutdownManagerTest { + + @Test(expected = IllegalArgumentException.class) + public void zeroPeriodIsRejected() { + new ShutdownManager(0); + } + + @Test(expected = IllegalArgumentException.class) + public void negativePeriodIsRejected() { + new ShutdownManager(-1); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerFactoryOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerFactoryOptionsTest.java new file mode 100644 index 000000000..c14a53af4 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerFactoryOptionsTest.java @@ -0,0 +1,56 @@ +package io.temporal.worker; + +import static org.junit.Assert.assertEquals; + +import java.time.Duration; +import org.junit.Test; + +public class WorkerFactoryOptionsTest { + + @Test + public void shutdownCheckIntervalDefaultIs250ms() { + WorkerFactoryOptions options = WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults(); + assertEquals(Duration.ofMillis(250), options.getShutdownCheckInterval()); + } + + @Test + public void shutdownCheckIntervalCanBeSet() { + Duration interval = Duration.ofMillis(5); + WorkerFactoryOptions options = + WorkerFactoryOptions.newBuilder().setShutdownCheckInterval(interval).build(); + assertEquals(interval, options.getShutdownCheckInterval()); + } + + @Test + public void shutdownCheckIntervalSurvivesValidateAndBuild() { + Duration interval = Duration.ofMillis(10); + WorkerFactoryOptions options = + WorkerFactoryOptions.newBuilder() + .setShutdownCheckInterval(interval) + .validateAndBuildWithDefaults(); + assertEquals(interval, options.getShutdownCheckInterval()); + } + + @Test + public void shutdownCheckIntervalSurvivesCopyBuilder() { + Duration interval = Duration.ofMillis(3); + WorkerFactoryOptions original = + WorkerFactoryOptions.newBuilder().setShutdownCheckInterval(interval).build(); + WorkerFactoryOptions copy = WorkerFactoryOptions.newBuilder(original).build(); + assertEquals(interval, copy.getShutdownCheckInterval()); + } + + @Test(expected = IllegalStateException.class) + public void shutdownCheckIntervalRejectsZero() { + WorkerFactoryOptions.newBuilder() + .setShutdownCheckInterval(Duration.ZERO) + .validateAndBuildWithDefaults(); + } + + @Test(expected = IllegalStateException.class) + public void shutdownCheckIntervalRejectsNegative() { + WorkerFactoryOptions.newBuilder() + .setShutdownCheckInterval(Duration.ofMillis(-1)) + .validateAndBuildWithDefaults(); + } +}