fix(TaskProducer): bounded queue of pending futures #678
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| + from collections import deque | ||
| from collections.abc import Callable | ||
| from concurrent.futures import Future | ||
| from typing import Any, Sequence | ||
|
|
@@ -6,11 +7,16 @@ | |
| from arroyo.backends.kafka import KafkaPayload | ||
| from arroyo.types import BrokerValue, Topic | ||
|
|
||
| + from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES | ||
| from taskbroker_client.types import ProducerProtocol | ||
|
|
||
| # This is global as TaskWorker needs to be able to call TaskProducer.collect_futures() | ||
| # without a reference to a task's specific instance of TaskProducer. | ||
| - _pending_futures: set[ProducerFuture[BrokerValue[KafkaPayload]]] = set() | ||
| + # Has a max_len to prevent unbounded future growth if TaskProducer.collect_futures() | ||
| + # is never called. | ||
| + _pending_futures: deque[ProducerFuture[BrokerValue[KafkaPayload]]] = deque( | ||
| + maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES | ||
| + ) | ||
|
|
||
|
|
||
| class TaskProducer: | ||
|
|
@@ -21,6 +27,9 @@ class TaskProducer: | |
| producer futures tracked by TaskProducer, and will only register the task activation as | ||
| a success if all producer futures from that activation were successful. | ||
| Otherwise, the activation will be retried. | ||
|
|
||
| + Args: | ||
| + producer_factory: Callable that returns a producer object. | ||
| """ | ||
|
|
||
| def __init__(self, producer_factory: Callable[[], ProducerProtocol]) -> None: | ||
|
|
@@ -33,13 +42,13 @@ def _get(self) -> ProducerProtocol: | |
| return self._inner_producer | ||
|
|
||
| def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None: | ||
| - _pending_futures.add(future) | ||
| + _pending_futures.append(future) | ||
|
Evicted futures skip completion checks (High Severity). Reviewed by Cursor Bugbot for commit a615652.
Member, Author: A quick Claude search suggests that no task in Sentry produces more than ~1,000 messages, so I'm going to assume 10,000 is a safe cap.
Member: Should this return an error, just in case someone does decide to produce >10k messages?
Member, Author: Discussed in Slack; I opened a PR to record a metric for queue size that we can alert on instead. |
||
|
|
||
| @staticmethod | ||
| def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: | ||
| futures = _pending_futures.copy() | ||
| _pending_futures.clear() | ||
| - return futures | ||
| + return set(futures) | ||
|
|
||
| def produce( | ||
| self, | ||
|
|
||
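For anyone following the review thread, here is a minimal sketch of the eviction behavior the Bugbot comment describes. It is not the code in this PR: `MAX_PENDING`, `track`, and `collect` are hypothetical stand-ins for `TASK_PRODUCER_MAX_PENDING_FUTURES`, `track_future`, and `collect_futures`, and plain `concurrent.futures.Future` objects stand in for producer futures. The point is only that a `deque` with `maxlen` silently drops the oldest entry once it is full, so dropped futures never reach the completion check.

```python
from collections import deque
from concurrent.futures import Future

# Hypothetical small cap for illustration; the PR uses
# TASK_PRODUCER_MAX_PENDING_FUTURES instead.
MAX_PENDING = 3

pending: deque[Future] = deque(maxlen=MAX_PENDING)


def track(future: Future) -> bool:
    """Append a future; return True if the oldest one was evicted to make room."""
    evicted = len(pending) == pending.maxlen
    pending.append(future)  # a full deque discards its leftmost item here
    return evicted


def collect() -> set[Future]:
    """Return everything still tracked and reset the queue."""
    futures = set(pending)
    pending.clear()
    return futures


futures = [Future() for _ in range(5)]
evictions = sum(track(f) for f in futures)
collected = collect()

# The two oldest futures were dropped and will never be checked.
print(evictions, futures[0] in collected, futures[4] in collected)
```

Counting evictions in `track` is one way to surface the problem; the queue-size metric mentioned in the thread addresses the same concern from the alerting side.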

