From e5bed88e204e0877c3d082168d097785892dc71c Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 17 Jun 2026 11:07:40 -0400 Subject: [PATCH 1/2] fix: ensure deadlock errors in futures fail workflow task immediately --- .../internal/workflow_task_failure_error.rb | 9 ++ .../worker/workflow_executor/thread_pool.rb | 5 +- temporalio/lib/temporalio/workflow/future.rb | 5 + .../internal/workflow_task_failure_error.rbs | 6 ++ .../worker/workflow_executor/thread_pool.rbs | 3 +- temporalio/test/deadlock_test.rb | 96 +++++++++++++++++++ 6 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 temporalio/lib/temporalio/internal/workflow_task_failure_error.rb create mode 100644 temporalio/sig/temporalio/internal/workflow_task_failure_error.rbs create mode 100644 temporalio/test/deadlock_test.rb diff --git a/temporalio/lib/temporalio/internal/workflow_task_failure_error.rb b/temporalio/lib/temporalio/internal/workflow_task_failure_error.rb new file mode 100644 index 00000000..3d93677b --- /dev/null +++ b/temporalio/lib/temporalio/internal/workflow_task_failure_error.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module Temporalio + module Internal + # Marker for internal exceptions that must escape workflow fibers and fail the workflow task. + module WorkflowTaskFailureError + end + end +end diff --git a/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb b/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb index 9773578f..c3fc468f 100644 --- a/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb +++ b/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb @@ -4,6 +4,7 @@ require 'temporalio/internal/bridge/api' require 'temporalio/internal/proto_utils' require 'temporalio/internal/worker/workflow_instance' +require 'temporalio/internal/workflow_task_failure_error' require 'temporalio/scoped_logger' require 'temporalio/worker/thread_pool' require 'temporalio/worker/workflow_executor' @@ -229,7 +230,9 @@ def evict(worker_state, run_id, cache_remove_job) private_constant :Worker # Error raised when a processing a workflow task takes more than the expected amount of time. - class DeadlockError < Exception; end # rubocop:disable Lint/InheritException + class DeadlockError < Exception # rubocop:disable Lint/InheritException + include Internal::WorkflowTaskFailureError + end end end end diff --git a/temporalio/lib/temporalio/workflow/future.rb b/temporalio/lib/temporalio/workflow/future.rb index 33a21941..2f393f7f 100644 --- a/temporalio/lib/temporalio/workflow/future.rb +++ b/temporalio/lib/temporalio/workflow/future.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'temporalio/internal/workflow_task_failure_error' require 'temporalio/workflow' module Temporalio @@ -81,6 +82,10 @@ def initialize(&block) @fiber = Fiber.schedule do @result = block.call # steep:ignore rescue Exception => e # rubocop:disable Lint/RescueException + # These errors invalidate the whole workflow task. If stored and only raised on wait, the activation may + # complete successfully with commands that were emitted before the error. + raise if e.is_a?(Internal::WorkflowTaskFailureError) + @failure = e ensure @done = true diff --git a/temporalio/sig/temporalio/internal/workflow_task_failure_error.rbs b/temporalio/sig/temporalio/internal/workflow_task_failure_error.rbs new file mode 100644 index 00000000..eab329d6 --- /dev/null +++ b/temporalio/sig/temporalio/internal/workflow_task_failure_error.rbs @@ -0,0 +1,6 @@ +module Temporalio + module Internal + module WorkflowTaskFailureError + end + end +end diff --git a/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs b/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs index c3e6d2ea..bd51249b 100644 --- a/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs +++ b/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs @@ -50,8 +50,9 @@ module Temporalio end class DeadlockError < Exception + include Internal::WorkflowTaskFailureError end end end end -end \ No newline at end of file +end diff --git a/temporalio/test/deadlock_test.rb b/temporalio/test/deadlock_test.rb new file mode 100644 index 00000000..c5059865 --- /dev/null +++ b/temporalio/test/deadlock_test.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/testing' +require 'temporalio/worker' +require 'temporalio/workflow' +require 'test' + +class DeadlockTest < Test + class BasicActivity < Temporalio::Activity::Definition + def execute(value) + value + end + end + + class DeadlockTimeoutOverrideExecutor < Temporalio::Worker::WorkflowExecutor::ThreadPool + def initialize(deadlock_timeout) + super() + @deadlock_timeout = deadlock_timeout + end + + def _validate_worker(workflow_worker, worker_state) + # Fairly hacky, but allows us to run this test without taking 5+ seconds + worker_state.instance_variable_set(:@deadlock_timeout, @deadlock_timeout) + super + end + end + + class DeadlockTimeoutInFutureWorkflow < Temporalio::Workflow::Definition + def execute + reached_activity = false + + Temporalio::Workflow::Future.new do + Temporalio::Workflow.execute_activity(BasicActivity, 0, start_to_close_timeout: 10) + end + + future = Temporalio::Workflow::Future.new do + # Do blocking sleep to trigger deadlock detection when using non-standard executor + Temporalio::Workflow::Unsafe.durable_scheduler_disabled { sleep(0.2) } + reached_activity = true + Temporalio::Workflow.execute_activity(BasicActivity, 1, start_to_close_timeout: 10) + end + + Temporalio::Workflow.wait_condition(cancellation: nil) { reached_activity || future.done? } + Temporalio::Workflow.sleep(0.1) + Temporalio::Workflow.wait_condition { false } + end + end + + def test_deadlock_in_future_fails_workflow_task_and_replays_on_new_worker + task_queue = "tq-#{SecureRandom.uuid}" + worker1 = Temporalio::Worker.new( + client: env.client, + task_queue:, + workflows: [DeadlockTimeoutInFutureWorkflow], + activities: [BasicActivity], + workflow_executor: DeadlockTimeoutOverrideExecutor.new(0.05) + ) + handle = worker1.run do + handle = env.client.start_workflow( + DeadlockTimeoutInFutureWorkflow, + id: "wf-#{SecureRandom.uuid}", + task_queue: + ) + assert_eventually_task_fail(handle:, message_contains: 'Potential deadlock detected') + handle + end + events_before_replay = handle.fetch_history_events.to_a + completed_workflow_tasks = events_before_replay.count(&:workflow_task_completed_event_attributes) + failed_workflow_tasks = events_before_replay.count(&:workflow_task_failed_event_attributes) + + worker2 = Temporalio::Worker.new( + client: env.client, + task_queue:, + workflows: [DeadlockTimeoutInFutureWorkflow], + activities: [BasicActivity], + max_cached_workflows: 0 + ) + # new_worker(task_queue:, max_cached_workflows: 0) + worker2.run do + assert_eventually(timeout: 20.0) do + events = handle.fetch_history_events.to_a + new_task_failure = events.select(&:workflow_task_failed_event_attributes).drop(failed_workflow_tasks).first + if new_task_failure + flunk( + 'New workflow task failure found: ' \ + "#{new_task_failure.workflow_task_failed_event_attributes.failure.message}" + ) + end + assert_operator events.count(&:workflow_task_completed_event_attributes), :>, completed_workflow_tasks + end + end + ensure + handle&.terminate + end +end From 0b3a9a2c580811fa6e535c9e5ccc2a5bc82fb803 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 17 Jun 2026 12:46:26 -0400 Subject: [PATCH 2/2] remove leftover comment --- temporalio/test/deadlock_test.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/temporalio/test/deadlock_test.rb b/temporalio/test/deadlock_test.rb index c5059865..db10e6b1 100644 --- a/temporalio/test/deadlock_test.rb +++ b/temporalio/test/deadlock_test.rb @@ -76,7 +76,6 @@ def test_deadlock_in_future_fails_workflow_task_and_replays_on_new_worker activities: [BasicActivity], max_cached_workflows: 0 ) - # new_worker(task_queue:, max_cached_workflows: 0) worker2.run do assert_eventually(timeout: 20.0) do events = handle.fetch_history_events.to_a