Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions clients/python/src/taskbroker_client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@
to drain pending produce futures on shutdown before sending SIGKILL.
"""

TASK_PRODUCER_MAX_PENDING_FUTURES = 10_000
"""
Maximum number of pending futures that can be in the TaskProducer module's
`_pending_futures` list. This list is a global, so is shared between all instances
of TaskProducer.
"""


class CompressionType(Enum):
"""
Expand Down
15 changes: 12 additions & 3 deletions clients/python/src/taskbroker_client/worker/producer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import deque
from collections.abc import Callable
from concurrent.futures import Future
from typing import Any, Sequence
Expand All @@ -6,11 +7,16 @@
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Topic

from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES
from taskbroker_client.types import ProducerProtocol

# This is global as TaskWorker needs to be able to call TaskProducer.collect_futures()
# without a reference to a task's specific instance of TaskProducer.
_pending_futures: set[ProducerFuture[BrokerValue[KafkaPayload]]] = set()
# Has a max_len to prevent unbounded future growth if TaskProducer.collect_futures()
# is never called.
_pending_futures: deque[ProducerFuture[BrokerValue[KafkaPayload]]] = deque(
maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES
Comment thread
sentry[bot] marked this conversation as resolved.
)


class TaskProducer:
Expand All @@ -21,6 +27,9 @@ class TaskProducer:
producer futures tracked by TaskProducer, and will only register the task activation as
a success if all producer futures from that activation were successful.
Otherwise, the activation will be retried.

Args:
producer_factory: Callable that returns a producer object.
"""

def __init__(self, producer_factory: Callable[[], ProducerProtocol]) -> None:
Expand All @@ -33,13 +42,13 @@ def _get(self) -> ProducerProtocol:
return self._inner_producer

def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None:
_pending_futures.add(future)
_pending_futures.append(future)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Evicted futures skip completion checks

High Severity

With _pending_futures capped at TASK_PRODUCER_MAX_PENDING_FUTURES, each append past the limit drops the oldest tracked future. A single activation that produces more than that many messages before collect_futures() runs can finish as success even though some Kafka produces were never included in pending_futures or awaited.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit a615652. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick Claude search says we don't have any tasks in Sentry that produce more than ~1000 messages per task, so I'm going to assume 10,000 is a safe cap.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return an error, just in case someone does decide to produce >10k messages?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in slack, I opened a PR to record a metric for queue size that we can alert on instead.


@staticmethod
def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]:
futures = _pending_futures.copy()
_pending_futures.clear()
return futures
return set(futures)

def produce(
self,
Expand Down
7 changes: 7 additions & 0 deletions clients/python/tests/worker/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,10 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None:

with pytest.raises(RuntimeError, match="SimpleProducerFuture"):
producer.produce(Topic("test"), make_kafka_payload(), callbacks=[callback])


def test_pending_futures_max_len() -> None:
producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True))
for _ in range(10001):
producer.produce(Topic("test"), make_kafka_payload())
assert len(_pending_futures) == 10000
2 changes: 1 addition & 1 deletion clients/python/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ def test_child_process_clears_pending_futures_when_task_fails(
) -> None:
leftover_future: Future[BrokerValue[KafkaPayload]] = Future()
leftover_future.set_result(_make_broker_value())
_pending_futures.add(leftover_future)
_pending_futures.append(leftover_future)
assert len(_pending_futures) == 1

todo: queue.Queue[InflightTaskActivation] = queue.Queue()
Expand Down
Loading