Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions workers/queue_backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
"""Queue-backend seam for workers.
"""Queue-backend seam.
This module is the single place where the choice of queue substrate
(Celery+RabbitMQ today; PG Queue in the future) lives.
Today both entry points are no-op aliases over Celery primitives:
* ``dispatch(task_name, args, kwargs, queue)`` -> ``current_app.send_task(...)``
* ``@worker_task`` -> ``@shared_task``
A later phase will route specific tasks through a non-Celery substrate
(PG Queue) based on configuration; until then everything goes to Celery.
The exact routing mechanism is intentionally not pinned here.
Call sites should migrate to this module so the eventual substrate switch
is a single-flag operation rather than a codebase-wide rewrite.
Single place where the substrate choice (Celery today; PG Queue later)
lives. Both entry points are transparent passthroughs to Celery today.
"""

from .decorator import worker_task
from .dispatch import dispatch
from .fairness import FairnessKey

__all__ = ["dispatch", "worker_task"]
__all__ = ["FairnessKey", "dispatch", "worker_task"]
32 changes: 7 additions & 25 deletions workers/queue_backend/decorator.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
"""Task registration decorator.
"""Transparent wrapper over ``celery.shared_task``.
Today: a transparent wrapper over ``celery.shared_task``.
Future: registers the task body with whichever substrates are enabled
(Celery + optionally PG Queue), so a single ``@worker_task`` definition
can be served by either consumer.
Accepts both Celery decorator forms — ``shared_task`` handles them
internally, so a pass-through ``*args, **kwargs`` is enough:
@worker_task
def healthcheck(self): ...
@worker_task(bind=True, name="my.task")
def my_task(self, payload): ...
Accepts both decorator forms (bare and parameterised); a later phase
may register the task body with non-Celery substrates from here too.
"""

from __future__ import annotations
Expand All @@ -23,17 +12,10 @@ def my_task(self, payload): ...


def worker_task(*args: Any, **kwargs: Any) -> Any:
    """Register a function as a worker task via the queue_backend seam.

    This is a one-line passthrough to ``celery.shared_task``: every
    positional and keyword argument is forwarded unchanged, so both the
    bare ``@worker_task`` form and the parameterised
    ``@worker_task(name=...)`` form are accepted. The indirection is the
    seam: if a later phase adds PG Queue routing, the consumer-registration
    logic can land here without touching call sites.

    Args:
        *args: Positional arguments forwarded verbatim to ``shared_task``
            (the decorated function itself in the bare form).
        **kwargs: Keyword arguments forwarded verbatim to ``shared_task``
            (e.g. ``bind=True``, ``name="my.task"``).

    Returns:
        Whatever ``shared_task`` returns. The type is ``Any`` because
        ``shared_task`` produces a ``PromiseProxy`` for the bare form and
        a decorator factory for the parameterised form; pinning a tighter
        type would lock out future routing variants without buying real
        safety today.
    """
    return shared_task(*args, **kwargs)
36 changes: 13 additions & 23 deletions workers/queue_backend/dispatch.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
"""Transport-agnostic task dispatch.
Today: thin pass-through to ``celery.current_app.send_task``.
A later phase will introduce per-task routing through a non-Celery
substrate (PG Queue); call sites stay untouched.
The signature intentionally exposes only what the current call sites
actually use (args, kwargs, queue). More Celery options can be added
when a real call site needs them — not before.
Thin pass-through to ``celery.current_app.send_task``; the indirection
is the seam — a future per-task router can land here without touching
call sites.
"""

from __future__ import annotations
Expand All @@ -16,14 +12,14 @@

from celery import current_app

from .fairness import FAIRNESS_HEADER_NAME, FairnessKey


class DispatchHandle(Protocol):
    """Minimum contract every dispatch substrate must satisfy.

    Celery's ``AsyncResult`` satisfies this via ``.id``. Any future
    substrate handle (e.g. a PG Queue handle) must expose the same
    attribute so existing callers — e.g. ``scheduler/tasks.py`` — keep
    working unchanged.
    """

    # Identifier of the enqueued task. This is the only attribute callers
    # may rely on; everything else on the handle is substrate-specific.
    id: str
Expand All @@ -35,24 +31,18 @@ def dispatch(
args: Sequence[Any] | None = None,
kwargs: Mapping[str, Any] | None = None,
queue: str | None = None,
fairness: FairnessKey | None = None,
) -> DispatchHandle:
"""Enqueue a task by name.
Args:
task_name: Registered task name (e.g. "send_webhook_notification").
args: Positional task args. Forwarded verbatim; Celery normalises
``None`` internally.
kwargs: Keyword task args. Forwarded verbatim; Celery normalises
``None`` internally.
queue: Target queue name. Defaults to the task's bound queue.
Returns:
A handle to the enqueued task. ``.id`` is guaranteed; everything
else is substrate-specific and callers must not rely on it.
``fairness`` is attached as the ``x-fairness-key`` header (not in
kwargs). Pass ``None`` for non-workflow worker tasks.
"""
headers = {FAIRNESS_HEADER_NAME: fairness.to_dict()} if fairness is not None else None
return current_app.send_task(
task_name,
args=args,
kwargs=kwargs,
queue=queue,
headers=headers,
)
55 changes: 55 additions & 0 deletions workers/queue_backend/fairness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Workflow-execution fairness key.
Attached to dispatches that start a workflow execution. Non-workflow
worker tasks (notifications, callbacks, healthchecks) pass
``fairness=None``.
"""

from __future__ import annotations

from dataclasses import dataclass
from enum import StrEnum
from typing import Final


class WorkloadType(StrEnum):
    """Workflow execution type carried by a ``FairnessKey``.

    Only two members exist because the Labs L2 check is binary
    (API vs. not API).
    NOTE(review): "Labs L2 check" is not defined in this module —
    confirm the reference and link it here.
    """

    API = "api"
    NON_API = "non_api"


# Inclusive bounds and default for ``pipeline_priority``, per the labs
# schema (1..10, higher = sooner).
MIN_PRIORITY: Final[int] = 1
MAX_PRIORITY: Final[int] = 10
DEFAULT_PRIORITY: Final[int] = 5

# The fairness key travels as a message header (not a task kwarg) so that
# task bodies whose signatures lack **kwargs aren't broken by it.
FAIRNESS_HEADER_NAME: Final[str] = "x-fairness-key"


@dataclass(frozen=True)
class FairnessKey:
    """Routing metadata for a workflow-execution dispatch.

    ``org_id=None`` is valid for cross-org tasks — the scheduler's
    ``org_config`` JOIN simply doesn't match.

    Attributes:
        org_id: Organization the execution belongs to, or ``None``.
        workload_type: A ``WorkloadType`` member. The member's string
            value (e.g. ``"api"``) is also accepted and is normalised to
            the enum member on construction.
        pipeline_priority: Integer in ``[MIN_PRIORITY, MAX_PRIORITY]``;
            defaults to ``DEFAULT_PRIORITY``.

    Raises:
        TypeError: If ``pipeline_priority`` is not an ``int`` (``bool`` is
            rejected explicitly).
        ValueError: If ``pipeline_priority`` is out of range, or
            ``workload_type`` is not a valid ``WorkloadType`` value.
    """

    org_id: str | None
    workload_type: WorkloadType
    pipeline_priority: int = DEFAULT_PRIORITY

    def __post_init__(self) -> None:
        """Validate and normalise the fields at construction time."""
        # Dataclasses do not enforce annotations, so a plain string such as
        # "api" could previously be stored and only blow up later inside
        # ``to_dict`` (``str`` has no ``.value``). Normalise it here so bad
        # values fail fast. The instance is frozen, hence the
        # ``object.__setattr__`` bypass.
        object.__setattr__(self, "workload_type", WorkloadType(self.workload_type))

        priority = self.pipeline_priority
        # ``bool`` is a subclass of ``int`` and ``True`` would pass the
        # range check as 1; floats such as 5.5 would pass it too. Neither
        # is a meaningful priority, so reject them before range-checking.
        if isinstance(priority, bool) or not isinstance(priority, int):
            raise TypeError(
                f"pipeline_priority must be an int, got {type(priority).__name__}"
            )
        if not MIN_PRIORITY <= priority <= MAX_PRIORITY:
            raise ValueError(
                "pipeline_priority out of range "
                f"[{MIN_PRIORITY}, {MAX_PRIORITY}]: {priority}"
            )

    def to_dict(self) -> dict[str, str | int | None]:
        """Return the key as a plain dict of primitives for the message header."""
        return {
            "org_id": self.org_id,
            "workload_type": self.workload_type.value,
            "pipeline_priority": self.pipeline_priority,
        }
24 changes: 14 additions & 10 deletions workers/scheduler/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import traceback
from typing import Any

from queue_backend import dispatch, worker_task
from queue_backend import FairnessKey, dispatch, worker_task
from queue_backend.fairness import WorkloadType
from shared.enums.status_enums import PipelineStatus
from shared.enums.worker_enums import QueueName
from shared.infrastructure.config import WorkerConfig
Expand Down Expand Up @@ -150,21 +151,24 @@ def _execute_scheduled_workflow(
)

try:
# Dispatch through the queue_backend seam (Celery underneath today).
async_result = dispatch(
"async_execute_bin",
args=[
context.organization_id, # schema_name (organization_id)
context.workflow_id, # workflow_id
execution_id, # execution_id
{}, # hash_values_of_files (empty for scheduled)
True, # scheduled (THIS IS A SCHEDULED EXECUTION)
context.organization_id,
context.workflow_id,
execution_id,
{},
True, # scheduled
],
kwargs={
"use_file_history": context.use_file_history, # Pass as kwarg
"pipeline_id": context.pipeline_id, # CRITICAL FIX: Pass pipeline_id for direct status updates
"use_file_history": context.use_file_history,
"pipeline_id": context.pipeline_id,
},
queue=QueueName.GENERAL, # Route to General queue for proper separation
queue=QueueName.GENERAL,
fairness=FairnessKey(
org_id=context.organization_id,
workload_type=WorkloadType.NON_API,
),
)

task_id = async_result.id
Expand Down
1 change: 1 addition & 0 deletions workers/shared/patterns/notification/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def send_notification_to_worker(
"platform": platform,
},
queue="notifications",
fairness=None, # not a workflow-execution dispatch
)

logger.info(
Expand Down
Loading
Loading