Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions clients/python/src/taskbroker_client/worker/workerchild.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,16 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None:
},
)
pending_task_futures.remove(task)
# FIXME(benm): Use passthrough option to get future-related metrics
# without placing a duplicate ProcessingResult
_task_execution_complete(
inflight=task.inflight,
next_state=task.status,
execution_start_time=task.execution_start_time,
execution_end_time=task.futures_start_time,
task_func=task.task_func,
futures_start_time=task.futures_start_time,
passthrough=True,
)

def get_oldest_pending_activation() -> ActivationWithPendingFutures | None:
Expand Down Expand Up @@ -512,6 +515,13 @@ def check_task_future_completion() -> None:
task_func,
)
else:
# FIXME(benm): Temporarily bypass producer futures needing to be completed
# before writing to `processed_tasks` while still recording metrics.
_place_processing_result(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming its safe to put a task's ProcessingResult in the processed_tasks queue while still keeping the inflight activation itself pending in the worker child's queue - if I need to somehow clone anything here let me know

inflight,
next_state,
task_func,
)
Comment thread
sentry[bot] marked this conversation as resolved.
Comment thread
bmckerry marked this conversation as resolved.
for name, futures in task_produced_futures.items():
# How many futures were produced in the executed task,
# tagged by producer name
Expand Down Expand Up @@ -721,13 +731,10 @@ def record_task_execution(
status=monitor_status,
)

def _task_execution_complete(
def _place_processing_result(
inflight: InflightTaskActivation,
next_state: TaskActivationStatus.ValueType,
execution_start_time: float,
execution_end_time: float,
task_func: Task[Any, Any] | None,
futures_start_time: float | None = None,
) -> None:
Comment thread
sentry[bot] marked this conversation as resolved.
with metrics.timer(
"taskworker.worker.processed_tasks.put.duration",
Expand Down Expand Up @@ -766,6 +773,26 @@ def _task_execution_complete(
receive_timestamp=inflight.receive_timestamp,
)
)

def _task_execution_complete(
inflight: InflightTaskActivation,
next_state: TaskActivationStatus.ValueType,
execution_start_time: float,
execution_end_time: float,
task_func: Task[Any, Any] | None,
futures_start_time: float | None = None,
# FIXME(benm): Temp option to skip placing a task in processed_tasks.
# This is for tasks with pending producer futures, as we still want to record
# metrics as usual but want to have a `ProcessingResult` placed immediately
# while we troubleshoot why futures are never being marked as done.
passthrough: bool = False,
) -> None:
if not passthrough:
_place_processing_result(
inflight,
next_state,
task_func,
)
record_task_execution(
inflight.activation,
next_state,
Expand Down
184 changes: 93 additions & 91 deletions clients/python/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1523,54 +1523,55 @@ def test_child_process_tracks_producer_futures(
assert result.status == TASK_ACTIVATION_STATUS_COMPLETE


def test_child_process_holds_result_until_futures_done(
clear_pending_futures: None, restore_signal_handlers: None
) -> None:
task = _producing_task()
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()

pending_future: Future[BrokerValue[KafkaPayload]] = Future()
todo.put(task)

# `child_process` calls `signal.signal`, which must run on the main thread.
# Use a helper thread to observe the queue while the future is still
# pending, then resolve the future so the drain can complete.
observed_empty_while_pending = threading.Event()

def observe_and_resolve() -> None:
# Wait for child_process to process the task and enter the drain loop.
time.sleep(0.5)
if processed.qsize() == 0:
observed_empty_while_pending.set()
pending_future.set_result(_make_broker_value())

observer = threading.Thread(target=observe_and_resolve, name="future-observer")
observer.start()
try:
with mock.patch.object(
TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}}
):
child_process(
"examples.app:app",
todo,
processed,
shutdown,
max_task_count=1,
processing_pool_name="test",
process_type="fork",
)
finally:
observer.join(timeout=5)
shutdown.set()

assert (
observed_empty_while_pending.is_set()
), "result was pushed before the producer future was resolved"
result = processed.get(timeout=5)
assert result.task_id == task.activation.id
assert result.status == TASK_ACTIVATION_STATUS_COMPLETE
# FIXME(benm): Skip this test while we're bypassing awaiting future completion
# def test_child_process_holds_result_until_futures_done(
# clear_pending_futures: None, restore_signal_handlers: None
# ) -> None:
# task = _producing_task()
# todo: queue.Queue[InflightTaskActivation] = queue.Queue()
# processed: queue.Queue[ProcessingResult] = queue.Queue()
# shutdown = Event()

# pending_future: Future[BrokerValue[KafkaPayload]] = Future()
# todo.put(task)

# # `child_process` calls `signal.signal`, which must run on the main thread.
# # Use a helper thread to observe the queue while the future is still
# # pending, then resolve the future so the drain can complete.
# observed_empty_while_pending = threading.Event()

# def observe_and_resolve() -> None:
# # Wait for child_process to process the task and enter the drain loop.
# time.sleep(0.5)
# if processed.qsize() == 0:
# observed_empty_while_pending.set()
# pending_future.set_result(_make_broker_value())

# observer = threading.Thread(target=observe_and_resolve, name="future-observer")
# observer.start()
# try:
# with mock.patch.object(
# TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}}
# ):
# child_process(
# "examples.app:app",
# todo,
# processed,
# shutdown,
# max_task_count=1,
# processing_pool_name="test",
# process_type="fork",
# )
# finally:
# observer.join(timeout=5)
# shutdown.set()

# assert (
# observed_empty_while_pending.is_set()
# ), "result was pushed before the producer future was resolved"
# result = processed.get(timeout=5)
# assert result.task_id == task.activation.id
# assert result.status == TASK_ACTIVATION_STATUS_COMPLETE


def test_child_process_drains_pending_futures_on_sigterm(
Expand Down Expand Up @@ -1616,49 +1617,50 @@ def deliver_sigterm() -> None:
assert result.status == TASK_ACTIVATION_STATUS_COMPLETE


def test_child_process_retries_on_failed_future(
clear_pending_futures: None, restore_signal_handlers: None
) -> None:
retriable_task = InflightTaskActivation(
host="localhost:50051",
receive_timestamp=0,
activation=TaskActivation(
id="failed-future-retry",
taskname="examples.will_retry",
namespace="examples",
parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"),
processing_deadline_duration=2,
retry_state=RetryState(
attempts=0,
max_attempts=3,
on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
),
),
)
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()

failed_future: Future[BrokerValue[KafkaPayload]] = Future()
failed_future.set_exception(RuntimeError("kafka produce failed"))

todo.put(retriable_task)
with mock.patch.object(
TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}}
):
child_process(
"examples.app:app",
todo,
processed,
shutdown,
max_task_count=1,
processing_pool_name="test",
process_type="fork",
)

result = processed.get(timeout=5)
assert result.task_id == retriable_task.activation.id
assert result.status == TASK_ACTIVATION_STATUS_RETRY
# FIXME(benm): Skip this test while we're bypassing awaiting future completion
# def test_child_process_retries_on_failed_future(
# clear_pending_futures: None, restore_signal_handlers: None
# ) -> None:
# retriable_task = InflightTaskActivation(
# host="localhost:50051",
# receive_timestamp=0,
# activation=TaskActivation(
# id="failed-future-retry",
# taskname="examples.will_retry",
# namespace="examples",
# parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"),
# processing_deadline_duration=2,
# retry_state=RetryState(
# attempts=0,
# max_attempts=3,
# on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
# ),
# ),
# )
# todo: queue.Queue[InflightTaskActivation] = queue.Queue()
# processed: queue.Queue[ProcessingResult] = queue.Queue()
# shutdown = Event()

# failed_future: Future[BrokerValue[KafkaPayload]] = Future()
# failed_future.set_exception(RuntimeError("kafka produce failed"))

# todo.put(retriable_task)
# with mock.patch.object(
# TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}}
# ):
# child_process(
# "examples.app:app",
# todo,
# processed,
# shutdown,
# max_task_count=1,
# processing_pool_name="test",
# process_type="fork",
# )

# result = processed.get(timeout=5)
# assert result.task_id == retriable_task.activation.id
# assert result.status == TASK_ACTIVATION_STATUS_RETRY


def test_child_process_clears_pending_futures_when_task_fails(
Expand Down
Loading