UN-3501 [FEAT] Plumb fairness key on every dispatch() call site #2003
muhammad-ali-e wants to merge 12 commits into
Phase 5.1 of the PG Queue rollout (epic UN-3445). Adds multi-tenant
routing metadata to every dispatch() — org_id, pipeline_priority, tier —
emitted under kwargs["_fairness_key"]. No consumer reads it yet;
Phase 8 (PG Queue Gate) will introduce the reader.
What
* New workers/queue_backend/fairness.py: FairnessKey frozen dataclass +
FairnessKey.system() / FairnessKey.for_org(...) constructors +
FAIRNESS_KWARG_NAME constant. to_dict() produces a JSON-safe nested
dict that round-trips through Celery's serializer.
* queue_backend.dispatch() accepts optional fairness= kwarg; when
provided, merges into a *copy* of the outgoing kwargs (no caller
mutation).
* Two existing production dispatch sites pass fairness:
- shared/patterns/notification/helper.py::send_notification_to_worker
derives org_id from payload.organization_id.
- scheduler/tasks.py::_execute_scheduled_workflow derives org_id
from context.organization_id.
* New tests/test_fairness_key.py (14 tests):
- FairnessKey value-object semantics (frozen, JSON-safe, defaults).
- dispatch() integration (omit ≡ no field added, provided ≡ slotted
under _fairness_key, caller kwargs not mutated).
- AST-based inventory canary: every production dispatch(...) must
pass fairness=. Restricted to bare-name callees so
ExecutionDispatcher.dispatch(...) (different concept) isn't
audited.
- Additive-only canary: no consumer in workers/ references
_fairness_key or FAIRNESS_KWARG_NAME. Phase 8 lifts this.
Why
PG Queue's eventual fairness scheduler needs structured routing data
to dispatch on. Producer-side plumbing now means Phase 8 can land the
reader without ever having to backfill payloads at runtime.
No regression risk
* Additive only — kwargs gain one extra entry under an underscored
slot. No business kwargs change shape.
* No worker code reads it (proved by the additive-only canary).
* Default-on is identical to default-off — no flag needed.
Test count: 31 → 45 (new fairness suite 14; __all__ update from 2 to 3
exports in the seam suite).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walkthrough
Adds a frozen FairnessKey dataclass and FAIRNESS_HEADER_NAME; extends dispatch() to accept an optional FairnessKey and send it as a Celery header; updates scheduled and notification dispatch call sites to pass fairness metadata; and adds unit, integration, AST, worker-scan, and end-to-end tests.
Changes
Queue Backend Fairness Metadata Support
Bug: producer-side dispatch() merged the fairness key into the task
body's kwargs as ``_fairness_key``. Celery then passed it to the task
function as a keyword argument, and any task whose signature does NOT
accept ``**kwargs`` blew up at consumption time with:
TypeError: send_webhook_notification() got an unexpected keyword
argument '_fairness_key'
Observed on send_webhook_notification (fixed signature). async_execute_bin
tolerated it silently via its ``**kwargs`` catch-all — semantically wrong
either way: fairness is routing metadata, not business payload.
Fix: route fairness via Celery's message-header slot
(``headers={"x-fairness-key": ...}``). Headers travel with the AMQP
message and reach consumers via ``self.request.headers`` (bind=True),
but are NEVER passed to the task body's function signature. Phase 8's
reader will look at ``self.request.headers`` regardless, so this is the
right slot from the start.
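A minimal sketch of the failure mode and the fix described above, written without Celery so it runs anywhere. The `deliver` function is a hypothetical stand-in for a consumer: it passes kwargs to the task function and keeps headers on the side, which is the behaviour the commit message attributes to Celery. The URL and payload values are illustrative only.

```python
def send_webhook_notification(url, payload):
    """Stand-in for a task with a fixed signature (no **kwargs)."""
    return f"POST {url}"


def deliver(task, kwargs, headers=None):
    """Toy consumer (assumption): kwargs reach the task, headers do not."""
    request_headers = dict(headers or {})
    return task(**kwargs), request_headers


business_kwargs = {"url": "https://example.test/hook", "payload": {"ok": True}}
fairness = {"org_id": "org-1", "pipeline_priority": 5}

# Original approach: merge the key into a copy of the task kwargs.
merged = {**business_kwargs, "_fairness_key": fairness}
try:
    deliver(send_webhook_notification, merged)
    kwargs_error = None
except TypeError as exc:
    kwargs_error = str(exc)

# Reworked approach: carry the key in a header, leave kwargs untouched.
result, seen_headers = deliver(
    send_webhook_notification,
    business_kwargs,
    headers={"x-fairness-key": fairness},
)
```

With the header approach the task body never sees the routing metadata, so a fixed-signature task keeps working.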
Changes:
* queue_backend/dispatch.py — pass fairness as ``headers={...}``
instead of merging into ``kwargs``. Caller kwargs are now strictly
untouched.
* queue_backend/fairness.py — rename ``FAIRNESS_KWARG_NAME`` to
``FAIRNESS_HEADER_NAME`` ("x-fairness-key", HTTP-style spelling
matching AMQP convention). Module docstring updated.
* tests/test_fairness_key.py — assertions look at
``send_task.call_args.kwargs["headers"]`` instead of kwargs; new
positive assertion that business kwargs are unchanged when fairness
is present; "no consumer reads it yet" canary updated to grep for
the new tokens.
Test count unchanged: 14 fairness tests + 17 seam + 14 characterisation
= 45 passed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e
left a comment
PR Review Toolkit — UN-3501 Fairness Key
Focused review covering code quality, silent failures, type design, test coverage, comments, simplification, and regression risk. PR scope is small and surgical (2 production call sites + 1 new value object + tests).
Overall: the headers-instead-of-kwargs rework (commit 6581184) is the right fix — moves routing metadata off the function-signature surface so non-**kwargs tasks don't blow up. Public API stays back-compat. Inventory canary is a nice safeguard.
Main concerns (prioritised):
1. Stale docs: the to_dict() docstring and the module-level invariant still reference kwargs["_fairness_key"]. The implementation moved to a header (x-fairness-key); the docs didn't. Actively misleading for the next reader. (HIGH)
2. for_org(**overrides) silently swallows typos: priority=80 or tiers="enterprise" get dropped on the floor and defaults apply. Plus overrides: object + int()/str() coercions paper over real bugs. Use explicit keyword-only params. (HIGH)
3. Weak invariants on tier/pipeline_priority: both are unbounded str/int. PG Queue Phase 8 will be the one to discover tier="enterprize" or pipeline_priority=10_000_000. Tighten now while there are only two callers. (MEDIUM)
4. Canary misses aliased imports: from queue_backend import dispatch as foo slips past ast.Name and callee.id == "dispatch". Either lock the import form or expand the matcher. (MEDIUM)
5. FairnessKey.system() adds no invariant beyond defaults: it is semantically identical to FairnessKey(org_id=None). Either drop the helper or wire it to a distinct tier="system" so the "system partition" claim is encoded in data, not naming. (LOW)
Regression check: keyword-only fairness=None, headers=None accepted by Celery 5.5+, two producer sites covered, no current_app.send_task(...) bypasses the seam, to_dict() JSON-safety asserted. No regressions found other than item 4's canary blind spot.
Tests: strong unit coverage; minor gaps — see inline notes for pytest.raises(FrozenInstanceError) and the aliased-import canary gap.
Comment Analyzer: stale docs (2 places):
* fairness.py module docstring: ``_fairness_key today`` was dead vocabulary post header-migration; replaced with ``x-fairness-key`` and pinned the canary class.
* to_dict() docstring no longer references kwargs.
Type Design Analyzer:
* Bound pipeline_priority to [0, 100] via __post_init__; out-of-range values raise ValueError at construction. New MIN_PRIORITY / MAX_PRIORITY constants.
* Tightened tier from str to Literal["standard", "enterprise", "system"]. New SYSTEM_TIER constant.
* FairnessKey.system() now returns cls(org_id=None, tier="system"), which encodes the partition in the message shape rather than leaving it implicit via org_id is None. Phase 8 scheduler matches on a single closed-set field.
Silent Failure Hunter (HIGH):
* for_org replaced **overrides catch-all with explicit keyword-only params (pipeline_priority, tier). A typo like priority=80 or tiers="enterprise" now raises TypeError at the call site instead of silently dropping the override.
Code Simplifier (minor):
* dispatch.py: ternary instead of mutable headers rebinding.
PR Test Analyzer:
* test_is_frozen now uses pytest.raises(FrozenInstanceError), which catches the actual exception type, not "any Exception with 'frozen' in the message".
* New tests: priority range rejection (below/above/boundaries), for_org typo rejection, system-key tier encoding.
* Inventory canary hardened: new test_dispatch_must_be_imported_unaliased walks ImportFrom nodes and forbids ``from queue_backend import dispatch as <alias>``. Without it the bare-name AST visitor would miss alias-imported call sites, exactly the failure mode the canary exists to prevent.
* New end-to-end test_header_present_on_outbound_message constructs a real Celery app on a memory broker, wraps send_task, and asserts the fairness header is attached in the documented shape. Catches the rare-but-expensive case where a future Celery/kombu upgrade silently drops unknown headers.
Test count: 14 -> 21 fairness tests (45 -> 52 total).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
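The value-object changes in this commit can be sketched roughly as follows. This is an illustration of the shape the commit message describes at this point in the PR, not the actual fairness.py: the field order, the "standard" default tier, and the error wording are assumptions, and the design is reworked again in later commits.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal

Tier = Literal["standard", "enterprise", "system"]
MIN_PRIORITY = 0
MAX_PRIORITY = 100
DEFAULT_PRIORITY = 50
SYSTEM_TIER: Tier = "system"


@dataclass(frozen=True)
class FairnessKey:
    org_id: str | None
    pipeline_priority: int = DEFAULT_PRIORITY
    tier: Tier = "standard"  # assumed default

    def __post_init__(self) -> None:
        # Reject out-of-range priorities at construction time.
        if not MIN_PRIORITY <= self.pipeline_priority <= MAX_PRIORITY:
            raise ValueError(
                f"pipeline_priority must be in [{MIN_PRIORITY}, {MAX_PRIORITY}]"
            )

    @classmethod
    def system(cls) -> FairnessKey:
        # The partition is encoded in data, not implied by org_id is None.
        return cls(org_id=None, tier=SYSTEM_TIER)

    @classmethod
    def for_org(
        cls,
        org_id: str,
        *,
        pipeline_priority: int = DEFAULT_PRIORITY,
        tier: Tier = "standard",
    ) -> FairnessKey:
        # Keyword-only params: a misspelled override raises TypeError.
        return cls(org_id=org_id, pipeline_priority=pipeline_priority, tier=tier)

    def to_dict(self) -> dict:
        return {
            "org_id": self.org_id,
            "pipeline_priority": self.pipeline_priority,
            "tier": self.tier,
        }
```

Because `for_org` no longer accepts `**overrides`, a call such as `FairnessKey.for_org("org-1", priority=80)` fails immediately instead of silently applying the default.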
Review feedback addressed

| Finding | Status |
|---|---|
| fairness.py:17: module docstring still mentioned _fairness_key | ✅ Replaced with x-fairness-key and pinned the test class (test_fairness_key.py::TestNoConsumerYet) so future readers know exactly where the invariant is locked. |
| fairness.py:57: to_dict() docstring described the kwargs-merge design | ✅ Rewrote to "JSON-safe representation carried in the Celery message header x-fairness-key". |
Type Design Analyzer (3/3 fixed)
| Finding | Status |
|---|---|
| fairness.py:53: pipeline_priority: int unbounded | ✅ Added MIN_PRIORITY / MAX_PRIORITY constants + __post_init__ validator. Out-of-range values raise ValueError at construction. New tests pin the boundaries and rejection paths. |
| fairness.py:54: tier: str typo-prone | ✅ Tightened to Literal["standard", "enterprise", "system"] via a new Tier alias. New SYSTEM_TIER constant. mypy now catches typos like "enterprize" or "Standard". |
| fairness.py:65: system() adds an alias, not an invariant | ✅ Adopted the second suggestion ("encode the partition in data"): FairnessKey.system() now returns cls(org_id=None, tier=SYSTEM_TIER). Phase 8's scheduler can match on the closed-set tier field alone, no special-casing org_id is None. Module + method docstrings explain the choice. New test test_system_key_encodes_partition_in_tier pins it. |
Silent Failure Hunter (HIGH) (1/1 fixed)
| Finding | Status |
|---|---|
| fairness.py:70: for_org(**overrides) swallows misspelled keys | ✅ Replaced **overrides with explicit keyword-only params (pipeline_priority, tier). New test_for_org_rejects_misspelled_kwargs asserts that priority=80 and tiers="enterprise" now raise TypeError. |
Code Simplifier (1/1 fixed)
| Finding | Status |
|---|---|
| dispatch.py:65: two-line setup with mutable rebinding | ✅ Collapsed to a single ternary on one line. Annotation dropped (inferable). |
PR Test Analyzer (3/3 fixed)
| Finding | Status |
|---|---|
| test_fairness_key.py:57: try/except/else ladder for "is frozen" | ✅ Replaced with pytest.raises(FrozenInstanceError). Catches the actual exception type instead of "any Exception whose message contains 'frozen' OR any AttributeError". |
| test_fairness_key.py:185: canary blind spot for aliased imports | ✅ Went with Option 2 (lock the import form). New test_dispatch_must_be_imported_unaliased walks ImportFrom nodes and asserts that no file imports dispatch under an alias. The bare-name AST visitor downstream now has the assumption it needs. |
| test_fairness_key.py:246: coverage gap on header survival through Celery | ✅ Added TestHeaderSurvivesCeleryPipeline::test_header_present_on_outbound_message. Constructs a real Celery(broker="memory://") app, patches the seam's current_app onto it, wraps send_task, dispatches, and asserts the fairness header lands on the outbound call in the documented shape. Catches the kombu / Celery upgrade scenario where unknown headers might be quietly dropped. |
Test count
- test_fairness_key.py: 14 → 21 tests
- test_queue_backend_seam.py: unchanged (17)
- test_dispatch_sites_characterisation.py: unchanged (14)
- Total: 45 → 52, all green in 4.49s
Notable design choice locked in
Per the type analyzer suggestion #3, FairnessKey.system() now emits tier="system" in the message itself. Phase 8's scheduler (and any future debugging tooling) can match on the closed-set tier field without having to remember that org_id is None also means "system partition". The message is self-describing.
SonarCloud (python:S3776) flagged test_dispatch_must_be_imported_unaliased at cognitive complexity 22 (threshold 15). The companion test_every_production_dispatch_passes_fairness sat just under the threshold but carried the same nesting shape.
Refactor: extract three module-level helpers so both tests collapse to flat list comprehensions.
* _iter_production_trees() walks workers/, skips tests/__pycache__/htmlcov/.venv/queue_backend, parses each file, swallows SyntaxError, yields (rel_path, tree).
* _aliased_dispatch_imports(tree) returns (lineno, alias) for every ``from queue_backend import dispatch as <name>``.
* _dispatch_calls_missing_fairness(tree) returns linenos of bare dispatch(...) calls without ``fairness=``.
Each test body is now ~6 lines, no nested visitor class, no for/if/try/for/if ladder. Same coverage: both tests still pass and still catch the same offences. The helpers are private (underscore prefix) and live in the test module since no other module needs them.
Test count unchanged: 52 passed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
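The two per-tree helpers named in this commit could look roughly like the sketch below. The function names mirror the commit message (without the leading underscore), but the bodies are a guess at the technique, not the PR's code; in particular, matching only `module == "queue_backend"` is a simplifying assumption, and the sample source is invented for the demonstration.

```python
import ast


def aliased_dispatch_imports(tree):
    """Return (lineno, alias) for `from queue_backend import dispatch as <alias>`."""
    return [
        (node.lineno, alias.asname)
        for node in ast.walk(tree)
        if isinstance(node, ast.ImportFrom) and node.module == "queue_backend"
        for alias in node.names
        if alias.name == "dispatch" and alias.asname not in (None, "dispatch")
    ]


def dispatch_calls_missing_fairness(tree):
    """Return line numbers of bare-name dispatch(...) calls lacking fairness=."""
    return [
        node.lineno
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and isinstance(node.func, ast.Name)  # skips obj.dispatch(...) attribute calls
        and node.func.id == "dispatch"
        and not any(kw.arg == "fairness" for kw in node.keywords)
    ]


# Hypothetical module source used only to exercise the helpers.
SAMPLE = '''\
from queue_backend import dispatch
from queue_backend import dispatch as fire

dispatch("a.task", kwargs={})
dispatch("b.task", kwargs={}, fairness=None)
ExecutionDispatcher.dispatch("c.task")
'''

tree = ast.parse(SAMPLE)
aliased = aliased_dispatch_imports(tree)          # [(2, "fire")]
missing = dispatch_calls_missing_fairness(tree)   # [4]
```

The attribute call on line 6 of the sample is ignored because its callee is an `ast.Attribute`, which matches the stated intent of leaving ExecutionDispatcher.dispatch(...) unaudited.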
| Filename | Overview |
|---|---|
| workers/queue_backend/fairness.py | New frozen dataclass FairnessKey with WorkloadType enum, priority bounds validation, and to_dict(). Clean and correct; no issues found. |
| workers/queue_backend/dispatch.py | Adds fairness kwarg to dispatch(); routes the key as a Celery message header (x-fairness-key) rather than mutating task kwargs. Clean implementation. |
| workers/tests/test_fairness_key.py | Comprehensive test suite, but the canary's error message references FairnessKey.system() which doesn't exist — a developer following the guidance would get an AttributeError. |
| workers/scheduler/tasks.py | Adds FairnessKey(org_id=context.organization_id, workload_type=WorkloadType.NON_API) to the scheduled workflow dispatch. Correct usage. |
| workers/shared/patterns/notification/helper.py | Adds fairness=None opt-out to the notification dispatch with an explanatory comment. Correct. |
| workers/queue_backend/__init__.py | Exports FairnessKey in __all__ alongside existing dispatch and worker_task. Minor docstring trim. |
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
workers/tests/test_fairness_key.py:252-258
The canary's failure message tells future developers to call `FairnessKey.system()`, but that classmethod does not exist on `FairnessKey`. Any developer who adds a new dispatch site, triggers this canary, and follows the error message verbatim will immediately get `AttributeError: type object 'FairnessKey' has no attribute 'system'` — defeating the self-service purpose of the canary entirely. The sentinel for cross-org / system tasks is `FairnessKey(org_id=None, workload_type=WorkloadType.NON_API)` (or `API`, depending on context).
```suggestion
    assert offenders == [], (
        "Production dispatch(...) call site(s) missing fairness=. "
        "Every production dispatch must declare its fairness key — "
        "pass ``fairness=FairnessKey(org_id=None, workload_type=WorkloadType.NON_API)`` "
        "for system / cross-org tasks if there's no tenant context. Found:\n "
        + "\n ".join(offenders)
    )
```
Reviews (6): Last reviewed commit: "UN-3501 [REFACTOR] WorkloadType -> StrEn..."
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@workers/shared/patterns/notification/helper.py`:
- Line 90: send_notification_to_worker is currently called with
fairness=FairnessKey.for_org(payload.organization_id) which passes None when
NotificationPayload.from_execution_status didn't set organization_id; update the
logic so callers thread the organization_id into trigger_pipeline_notifications
/ trigger_api_notifications (propagating it from handle_status_notifications
into those functions and into the NotificationPayload) or, where org context is
truly unavailable, pass fairness=FairnessKey.system() instead of
FairnessKey.for_org(None) to avoid misclassifying system callbacks; adjust
send_notification_to_worker invocation sites to use payload.organization_id when
present and FairnessKey.system() when payload.organization_id is None.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 33ea390f-4d12-49f9-bbbe-87945397b288
📒 Files selected for processing (7)
- workers/queue_backend/__init__.py
- workers/queue_backend/dispatch.py
- workers/queue_backend/fairness.py
- workers/scheduler/tasks.py
- workers/shared/patterns/notification/helper.py
- workers/tests/test_fairness_key.py
- workers/tests/test_queue_backend_seam.py
Greptile #1 (P2): for_org silently accepts org_id=None:
* fairness.py: tightened ``for_org`` signature to ``org_id: str`` (was ``str | None``) and added an explicit ``ValueError`` guard. Passing None would have produced ``FairnessKey(org_id=None, tier="standard")``, which is inconsistent with the module's own promise that no-tenant tasks use ``FairnessKey.system()`` (``tier="system"``). Phase 8's scheduler no longer has to special-case a None org in the "standard" partition.
* New test ``test_for_org_rejects_none_org_id`` locks the rejection.
CodeRabbit (Major): call site can hit the None path:
* shared/patterns/notification/helper.py: ``payload.organization_id`` is ``str | None`` (NotificationPayload default). Callback paths build payloads without org context. With Greptile #1's tightening, ``for_org(None)`` raises, so the call site now picks the right constructor: ``FairnessKey.for_org(org_id) if org_id else FairnessKey.system()``.
* Scheduler site unchanged: ``ScheduledPipelineContext.organization_id`` is non-Optional and validated in __post_init__, so it cannot be None at the call site.
Greptile #2 (P2): duplicated skip_top_dirs:
* test_fairness_key.py: ``test_no_consumer_reads_fairness_header`` used a local ``skip_top_dirs`` copy of the module-level ``_SKIP_TOP_DIRS`` frozenset. If a new dir is added to one set the two canaries would scope differently. Switched to ``_SKIP_TOP_DIRS`` (and ``_WORKERS_ROOT``) so both canaries share one source of truth.
Test count: 52 -> 53.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New review findings addressed

| Constructor | Result |
|---|---|
| FairnessKey.system() | org_id=None, tier="system" ✅ matches design |
| FairnessKey.for_org(None) | org_id=None, tier="standard" ❌ inconsistent; Phase 8 would have to special-case this |
The module docstring already states the no-tenant convention is tier="system", but for_org quietly let callers create the broken shape.
Fix: Tightened for_org signature to org_id: str (was str | None) and added an explicit ValueError guard with a message pointing callers at FairnessKey.system(). New test_for_org_rejects_none_org_id locks it.
CodeRabbit (Major) — helper.py:90 produces the inconsistent key in practice
Valid. NotificationPayload.organization_id: str | None = None (default). Callback paths build payloads via NotificationPayload.from_execution_status(...) which doesn't pass organization_id, so payload.organization_id can be None at the dispatch site.
With Greptile #1's tightening, that call now raises — the right move is to pick the constructor at the call site:
    fairness=(
        FairnessKey.for_org(payload.organization_id)
        if payload.organization_id
        else FairnessKey.system()
    ),

Scheduler site unchanged: ScheduledPipelineContext.organization_id: str (not Optional), and __post_init__ raises if it's falsy. So for_org(context.organization_id) is statically safe there.
Greptile #2 (P2) — test_fairness_key.py:315 duplicates _SKIP_TOP_DIRS
Valid. When I refactored the AST canaries last round (cognitive-complexity fix), I introduced module-level _SKIP_TOP_DIRS and _WORKERS_ROOT constants but test_no_consumer_reads_fairness_header still had its own copy. Drift risk for future directory additions.
Fix: Replaced the local copy + pathlib.Path(__file__)... with the module-level constants. Both audit tests now share one source of truth.
Test count
- test_fairness_key.py: 21 → 22 tests (new test_for_org_rejects_none_org_id)
- Total: 52 → 53, all green in 5.15s
Branch state
- Commit 098c329e rebased onto remote merge-from-main → 97c6623c.
- PR head now reflects all three fixes.
🧹 Nitpick comments (1)
workers/tests/test_fairness_key.py (1)
158-158: ⚡ Quick win
Escape the regex metacharacter in match=.
The match= argument is treated as a regex (re.search), so the . in "use FairnessKey.system" matches any character, weakening the assertion. This is also what Ruff RUF043 flags.
♻️ Proposed fix
    -    with pytest.raises(ValueError, match="use FairnessKey.system"):
    +    with pytest.raises(ValueError, match=r"use FairnessKey\.system"):
             FairnessKey.for_org(None)  # type: ignore[arg-type]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@workers/tests/test_fairness_key.py` at line 158, the test's pytest.raises() uses match="use FairnessKey.system" which treats the dot as a regex wildcard; update the assertion in the pytest.raises call (the call to pytest.raises(ValueError, match=...)) to escape the dot so the pattern matches a literal period (e.g., use a raw string with a backslash: r"use FairnessKey\.system" or apply re.escape) to ensure the error message is matched exactly.
📒 Files selected for processing (3)
- workers/queue_backend/fairness.py
- workers/shared/patterns/notification/helper.py
- workers/tests/test_fairness_key.py
🚧 Files skipped from review as they are similar to previous changes (2)
- workers/shared/patterns/notification/helper.py
- workers/queue_backend/fairness.py
…irness/dispatch comments

Per @chandrasekharan-zipstack's NITs on PR #2003: code comments that name specific roadmap stages (``Phase 8 (PG Queue Gate)``, ``PR #15``) or capture migration history (``earlier iteration of this module put the key in kwargs``) go stale fast and should describe what the code does, not when in the plan it lives.
Changes:
* fairness.py
  - ``Tier`` Literal comment now explains what tiers actually do (cross-tenant resource-allocation tag for preemption / capacity decisions), addressing Chandra's clarifying question on line 33.
  - MIN/MAX_PRIORITY comment dropped ``Phase 8 then has to special-case`` rationale; the bound itself is the contract.
  - FAIRNESS_HEADER_NAME comment dropped the kwargs-iteration history.
  - Module docstring + ``FairnessKey``/``system()`` docstrings: ``Phase 8`` / ``PG Queue Gate`` / ``the future PG Queue scheduler`` -> ``a future dispatch scheduler`` / ``the scheduler``.
* dispatch.py
  - Module docstring + DispatchHandle Protocol + ``dispatch()`` docstring: same genericisation.
No behaviour change. 53/53 tests still pass in 5.51s.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…kload_type

Following @chandrasekharan-zipstack's question on PR #2003: traced the ``tier`` concept back to the labs design at ``Zipstack/labs:labs-ali/workflow-execution-architecture/docs/pg-queue-implementation-guide.md`` and found three real discrepancies in my implementation:
1. **Tier is server-side, not on the wire.** Per labs: ``CREATE TABLE org_config (org_id UUID PRIMARY KEY, burst_max INTEGER, tier_priority INTEGER NOT NULL DEFAULT 5 -- 1=highest priority)``. The scheduler JOINs ``staging_queue`` against ``org_config`` to find tier; it isn't carried on the task payload.
2. **The third payload field is ``workload_type``, not ``tier``.** Labs ORDER BY:
   - L1: ``oc.tier_priority ASC`` (from org_config JOIN)
   - L2: ``(sq.workload_type = 'api')::int DESC`` (from payload)
   - L3: ``sq.pipeline_priority DESC`` (from payload)
   So the payload triple is ``(org_id, workload_type, pipeline_priority)``, not ``(org_id, pipeline_priority, tier)``. My memory's ``tier``-named summary fused the L1 dimension's name with the L2 payload field; they're different things.
3. **Priority range is 1..10, not 0..100.** Labs schema: ``pipeline_priority INTEGER DEFAULT 5, -- 1-10``. My bounds were off by an order of magnitude.
Changes:
* fairness.py
  - ``Tier = Literal["standard", "enterprise", "system"]`` -> ``WorkloadType = Literal["api", "etl"]`` (matches labs SQL exactly).
  - ``tier`` field on ``FairnessKey`` -> ``workload_type``.
  - ``MIN_PRIORITY/MAX_PRIORITY`` 0/100 -> 1/10; ``DEFAULT_PRIORITY`` 50 -> 5.
  - Dropped ``DEFAULT_TIER``, ``SYSTEM_TIER``, the ``Tier`` alias.
  - Dropped ``FairnessKey.system()`` and ``FairnessKey.for_org()`` helpers: direct ``FairnessKey(org_id=..., workload_type=...)`` construction is small, typo-safe (dataclass kwargs are checked), and removes guess-y semantics about what tier "system" tasks should carry.
  - Module docstring now cites the labs source file inline so the architectural source of truth is one click away.
* shared/patterns/notification/helper.py: webhook delivery is customer-facing API traffic, so ``workload_type="api"``. ``org_id`` passed as-is (can be None on callback paths).
* scheduler/tasks.py: scheduled pipelines fire ETL-style batch executions, so ``workload_type="etl"``. ``context.organization_id`` is non-Optional and validated upstream.
* tests/test_fairness_key.py: restructured. Direct construction throughout (no for_org / system helpers to test). New tests pin workload_type semantics, priority bounds at 1..10, and typo-rejection via the dataclass.
Test count: 53 -> 51 (lost 4 helper-method tests, gained 2 workload_type tests).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
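The three-level ordering quoted from the labs design (tier_priority ASC, then API before non-API, then pipeline_priority DESC) can be mimicked in plain Python as a sort key. This is only an illustration of the ordering semantics: the `QueuedTask` class, the `ORG_TIER_PRIORITY` dict standing in for the `org_config` table, and the sample rows are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueuedTask:
    name: str
    org_id: str
    workload_type: str          # "api" or "etl", carried on the payload
    pipeline_priority: int = 5  # 1..10, carried on the payload


# Stand-in for org_config: tier lives server-side, keyed by org (1 = highest).
ORG_TIER_PRIORITY = {"org-a": 1, "org-b": 5}
DEFAULT_TIER_PRIORITY = 5


def schedule_order(task):
    """Sort key: tier ASC, then api-first, then pipeline_priority DESC."""
    tier = ORG_TIER_PRIORITY.get(task.org_id, DEFAULT_TIER_PRIORITY)
    return (tier, -int(task.workload_type == "api"), -task.pipeline_priority)


tasks = [
    QueuedTask("b-etl-high", "org-b", "etl", 9),
    QueuedTask("b-api-low", "org-b", "api", 2),
    QueuedTask("a-etl-low", "org-a", "etl", 1),
    QueuedTask("b-api-high", "org-b", "api", 7),
]

ordered = [t.name for t in sorted(tasks, key=schedule_order)]
# ['a-etl-low', 'b-api-high', 'b-api-low', 'b-etl-high']
```

Note how the low-priority ETL task from the higher-tier org still runs first: the tier comes from the org lookup, not from anything on the message, which is the discrepancy the commit corrects.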
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
workers/tests/test_fairness_key.py (2)
7-8: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Stale wire-shape in docstring: tier was replaced by workload_type.
This bullet still documents the header dict as org_id, pipeline_priority, tier, but the contract (and this file's own assertions at Lines 210-216) is now {org_id, workload_type, pipeline_priority}.
📝 Proposed doc fix
    -1. The shape on the wire is stable (``headers["x-fairness-key"]`` is a
    -   JSON-safe dict with ``org_id``, ``pipeline_priority``, ``tier``).
    +1. The shape on the wire is stable (``headers["x-fairness-key"]`` is a
    +   JSON-safe dict with ``org_id``, ``workload_type``, ``pipeline_priority``).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@workers/tests/test_fairness_key.py` around lines 7 - 8, update the docstring/wire-shape comment to match the current contract: replace the outdated "tier" key with "workload_type" so the header shape is documented as headers["x-fairness-key"] being a JSON-safe dict with org_id, workload_type, pipeline_priority; locate the docstring near the test_helpers or top of workers/tests/test_fairness_key.py where headers["x-fairness-key"] is described and change the three-key list accordingly to align with the assertions that validate org_id, workload_type, and pipeline_priority.

291-297: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Canary failure message references the removed FairnessKey.system().
system() (and for_org()) were dropped in favor of direct construction. A developer who trips this canary would be told to call a constructor that no longer exists. Point them at the org-less form (org_id=None) instead.
📝 Proposed message fix
         assert offenders == [], (
             "Production dispatch(...) call site(s) missing fairness=. "
             "Every production dispatch must declare its fairness key — "
    -        "pass ``fairness=FairnessKey.system()`` for system / "
    -        "cross-org tasks if there's no tenant context. Found:\n "
    +        "pass ``fairness=FairnessKey(org_id=None, workload_type=...)`` "
    +        "for system / cross-org tasks if there's no tenant context. Found:\n "
             + "\n ".join(offenders)
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@workers/tests/test_fairness_key.py` around lines 291 - 297, the canary failure message still suggests the removed FairnessKey.system() (and for_org()), which is misleading; update the assertion message that mentions "pass ``fairness=FairnessKey.system()``" to instead point to the org-less constructor form (e.g. "pass ``fairness=FairnessKey(org_id=None)``"), so the offenders variable / "Production dispatch(...) call site(s) missing fairness=." message directs developers to use FairnessKey(org_id=None) for system / cross-org tasks with no tenant context.
📒 Files selected for processing (4)
- workers/queue_backend/fairness.py
- workers/scheduler/tasks.py
- workers/shared/patterns/notification/helper.py
- workers/tests/test_fairness_key.py
🚧 Files skipped from review as they are similar to previous changes (1)
- workers/scheduler/tasks.py
…w-execution dispatches; trim comments

Three corrections rolled together, all triggered by review feedback:

WorkloadType is now a StrEnum (one source of truth)

Replaced the Literal["api", "non_api"] form with class WorkloadType(StrEnum). Producers import WorkloadType.API / NON_API instead of scattering string literals across call sites. StrEnum members are str instances, so JSON serialisation is free; to_dict() still calls .value explicitly so the wire payload is a plain string (downstream consumers don't need to import the enum to compare).

Fairness applies only to workflow-execution dispatches

Earlier confusion treated workload_type as a worker-type tag. Corrected: it's the type of the WORKFLOW being executed (API-deployment vs ETL/Task/App). Notification dispatch (send_webhook_notification on the notifications queue) is a worker-internal task, not a workflow execution, so it now passes fairness=None explicitly. The scheduler dispatch (async_execute_bin) does start a workflow execution and keeps FairnessKey(..., workload_type=WorkloadType.NON_API).

The inventory canary still requires fairness= to be present on every dispatch (any value, including None): the point is the conscious choice at the call site, not the value. New test test_explicit_fairness_none_no_header_sent proves fairness=None produces the same on-wire shape as omitting the arg, without forcing producers to invent a workload_type for tasks that aren't workflow executions.

Comment hygiene

Trimmed docstrings and inline comments to keep only what's necessary (external citations, surprising design choices, non-obvious invariants). Removed:
* Plan-stage references (Phase 8, PR #15).
* Private-repo path (Zipstack/labs:labs-ali/...) from the public OSS codebase, same rule as PR descriptions.
* Migration history paragraphs that belong in commit messages, not source.
* Test docstrings that paraphrased the test name.
* Inline comments that restated what the code obviously does.

Net: ~267 lines deleted, ~114 added across 7 files (~50% reduction in the seam module's documentation footprint).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
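A minimal sketch of the two behaviours this commit describes, under stated assumptions: the enum serialises to a plain string on the wire, and `fairness=None` yields the same headers as omitting the argument. The header name `x-fairness-key` and the default `pipeline_priority=50` come from elsewhere in this thread; `build_headers` is a hypothetical stand-in for the header-building step inside `dispatch()`, and the field layout of `FairnessKey` is assumed rather than copied from `fairness.py`.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional

try:
    from enum import StrEnum  # Python 3.11+
except ImportError:  # older interpreters: a str-mixin enum behaves the same here
    from enum import Enum

    class StrEnum(str, Enum):
        pass


class WorkloadType(StrEnum):
    API = "api"
    NON_API = "non_api"


@dataclass(frozen=True)
class FairnessKey:
    # Assumed field layout; the real dataclass may differ.
    org_id: Optional[str]
    workload_type: WorkloadType
    pipeline_priority: int = 50

    def to_dict(self) -> dict[str, Any]:
        # .value keeps the wire payload a plain str, so consumers can
        # compare against "api" / "non_api" without importing the enum.
        return {
            "org_id": self.org_id,
            "workload_type": self.workload_type.value,
            "pipeline_priority": self.pipeline_priority,
        }


def build_headers(
    headers: Optional[dict[str, Any]] = None,
    *,
    fairness: Optional[FairnessKey] = None,
) -> dict[str, Any]:
    """Hypothetical helper: copy the caller's headers, add the key if given."""
    out = dict(headers or {})  # copy, so the caller's dict is never mutated
    if fairness is not None:
        out["x-fairness-key"] = fairness.to_dict()
    return out


key = FairnessKey(org_id="org_123", workload_type=WorkloadType.NON_API)
wire = json.loads(json.dumps(build_headers(fairness=key)))
```

Because `build_headers(fairness=None)` and `build_headers()` take the same branch, a notification-style call site can state `fairness=None` explicitly without changing what goes on the wire.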
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workers/tests/test_fairness_key.py (1)
252-258: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Stale guidance: FairnessKey.system() no longer exists.
The system() / for_org() helpers were dropped in a prior commit, but this canary failure message still tells developers to pass `fairness=FairnessKey.system()`. A future contributor hitting this assertion will be sent to a non-existent API.
📝 Suggested message fix
 assert offenders == [], (
     "Production dispatch(...) call site(s) missing fairness=. "
     "Every production dispatch must declare its fairness key — "
-    "pass ``fairness=FairnessKey.system()`` for system / "
-    "cross-org tasks if there's no tenant context. Found:\n "
+    "pass an explicit ``fairness=FairnessKey(...)`` for "
+    "workflow-execution dispatches, or ``fairness=None`` for "
+    "non-workflow tasks. Found:\n "
     + "\n ".join(offenders)
 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@workers/tests/test_fairness_key.py` around lines 252 - 258, The assertion message references a removed helper FairnessKey.system() and will mislead contributors; update the failure text to remove any reference to FairnessKey.system() (and the removed for_org() helper) and instead instruct callers to pass an explicit fairness argument to dispatch (mentioning the fairness parameter and FairnessKey type), e.g. "pass an explicit FairnessKey (tenant-specific or otherwise) via fairness=..."; ensure the message still includes the offenders list and references dispatch(...) and the fairness parameter so users know where to fix their calls.
🧹 Nitpick comments (1)
workers/queue_backend/fairness.py (1)
11-11: ⚡ Quick win
Python version requirement already targets >=3.12; StrEnum is safe
workers/pyproject.toml declares requires-python = ">=3.12", so from enum import StrEnum won't raise ImportError.
- Optional docstring cleanup: the WorkloadType docstring includes "Labs L2" jargon (line 16).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@workers/queue_backend/fairness.py` at line 11, The import of StrEnum is fine given requires-python >=3.12; update the WorkloadType class docstring to remove internal jargon ("Labs L2") and make it a clear, public-facing description of what the enum represents (e.g., "Workflow execution type: API vs non-API"), editing the WorkloadType class docstring to be concise and descriptive.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@workers/tests/test_fairness_key.py`:
- Around line 252-258: The assertion message references a removed helper
FairnessKey.system() and will mislead contributors; update the failure text to
remove any reference to FairnessKey.system() (and the removed for_org() helper)
and instead instruct callers to pass an explicit fairness argument to dispatch
(mentioning the fairness parameter and FairnessKey type), e.g. "pass an explicit
FairnessKey (tenant-specific or otherwise) via fairness=..."; ensure the message
still includes the offenders list and references dispatch(...) and the fairness
parameter so users know where to fix their calls.
---
Nitpick comments:
In `@workers/queue_backend/fairness.py`:
- Line 11: The import of StrEnum is fine given requires-python >=3.12; update
the WorkloadType class docstring to remove internal jargon ("Labs L2") and make
it a clear, public-facing description of what the enum represents (e.g.,
"Workflow execution type: API vs non-API"), editing the WorkloadType class
docstring to be concise and descriptive.
ℹ️ Review info
Run ID: fb70c71b-a2f8-41d7-81ca-588116fe60de
📒 Files selected for processing (7)
- workers/queue_backend/__init__.py
- workers/queue_backend/decorator.py
- workers/queue_backend/dispatch.py
- workers/queue_backend/fairness.py
- workers/scheduler/tasks.py
- workers/shared/patterns/notification/helper.py
- workers/tests/test_fairness_key.py
✅ Files skipped from review due to trivial changes (1)
- workers/queue_backend/decorator.py
🚧 Files skipped from review as they are similar to previous changes (2)
- workers/queue_backend/__init__.py
- workers/queue_backend/dispatch.py
…tem()

Greptile P1: the failure message for the inventory canary told future developers to call ``FairnessKey.system()``, a classmethod removed in the prior commit when helper constructors were dropped in favour of direct dataclass construction. A developer who triggered the canary and followed the message verbatim would have hit ``AttributeError: type object 'FairnessKey' has no attribute 'system'``, defeating the canary's self-service purpose.

Updated the message to point at the two real options the canary actually accepts:

* ``fairness=FairnessKey(org_id=..., workload_type=WorkloadType...)`` for a workflow-execution dispatch.
* ``fairness=None`` for a worker-internal task that doesn't start a workflow execution (notifications, callbacks, healthchecks).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
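The inventory canary discussed in this thread can be sketched roughly as follows. This is an illustration of the technique (walk the AST, audit only bare-name `dispatch(...)` calls, require a `fairness=` keyword with any value), not the code in `test_fairness_key.py`; the function name `find_dispatch_calls_missing_fairness` and the sample source are hypothetical.

```python
import ast


def find_dispatch_calls_missing_fairness(source: str, filename: str = "<src>") -> list[str]:
    """Return "file:line" for each bare-name dispatch(...) call lacking fairness=."""
    offenders = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if not isinstance(node, ast.Call):
            continue
        # Audit bare-name callees only: dispatch(...), not obj.dispatch(...).
        if not (isinstance(node.func, ast.Name) and node.func.id == "dispatch"):
            continue
        # Any value counts, including None; only the keyword's presence matters.
        if not any(kw.arg == "fairness" for kw in node.keywords):
            offenders.append(f"{filename}:{node.lineno}")
    return offenders


# Hypothetical call sites; the source is parsed, never executed.
SAMPLE = '''
dispatch("send_webhook_notification", fairness=None)
dispatch("async_execute_bin", fairness=FairnessKey(org_id="o1", workload_type=WorkloadType.NON_API))
dispatcher.dispatch("rpc")
dispatch("forgotten_task")
'''

offenders = find_dispatch_calls_missing_fairness(SAMPLE, "sample.py")
```

Only the last call is reported: the attribute-style `dispatcher.dispatch(...)` is skipped because its callee is not a bare name, and `fairness=None` passes because the check is on the keyword, not its value.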



What
* workers/queue_backend/fairness.py: FairnessKey frozen dataclass (org_id, pipeline_priority, tier) + system() / for_org(...) constructors + FAIRNESS_KWARG_NAME constant. to_dict() is JSON-safe.
* queue_backend.dispatch() accepts optional fairness= kwarg; when provided, merges into a copy of the outgoing kwargs under _fairness_key (no caller-side mutation).
* Two existing production dispatch(...) sites now declare a fairness key:
  * shared/patterns/notification/helper.py::send_notification_to_worker derives org_id from payload.organization_id.
  * scheduler/tasks.py::_execute_scheduled_workflow derives org_id from context.organization_id.
* tests/test_fairness_key.py (14 tests): value-object semantics, dispatch() integration, AST-based inventory canary (every production dispatch(...) must pass fairness=), and an additive-only canary (no worker code reads _fairness_key yet).

Why
PG Queue Phase 5 (epic UN-3445) makes every queue-crossing payload machine-readable so a later phase has structured data to dispatch on. Phase 8's PG Queue fairness scheduler will route by org_id (per-tenant partition), pipeline_priority (within-tenant ordering), and tier (cross-tier preemption). Producer-side plumbing now means Phase 8 can introduce the reader without backfilling in-flight payloads.

How
* … kwargs["_fairness_key"] until Phase 8 introduces the reader.
* … inspect.active()).
* FairnessKey.system() is the sentinel for tasks with no tenant context (periodic log flush, healthchecks). PG Queue will treat those as a distinct "system" partition rather than as belonging to any tenant.
* The AST canary is restricted to bare-name dispatch(...) callees so ExecutionDispatcher.dispatch(...) in file_processing/structure_tool_task.py, a totally different concept (executor-side RPC, not a queue boundary), is not falsely audited.

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
No.
* … _fairness_key (proved by test_no_consumer_reads_fairness_kwarg). Phase 8 lifts this canary.
* … dispatch(); the function's previous signature is preserved (fairness is keyword-only and defaults to None).
* … test_to_dict_is_json_safe.

Database Migrations

Env Config
* … pipeline_priority=50, tier="standard") live in fairness.py.

Relevant Docs
* … queue_backend/fairness.py document the rollout invariant and the Phase 8 hand-off.

Related Issues or PRs

Dependencies Versions
* … dataclasses, ast, json).

Notes on Testing
Verification greps:
Screenshots
N/A (no UI surface).
Checklist
I have read and understood the Contribution Guidelines.