feat: persist and rehydrate scheduling state across root restarts #64
kaiitunnz wants to merge 15 commits into
First-class drain primitive over destroy_all_workers() so the rolling-restart flow (release in-flight tasks before recreating the worker-managing service) is part of the SDK surface. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…t [SERVICE]` `flowmesh stack restart server` drains managed workers then recreates only the server/supervisor (--no-deps --force-recreate), leaving Redis and the rest of the stack running. This is the per-node primitive for a rolling image update: recreate the server on each node in turn (root last). No service arg keeps the whole-stack drain/down/up behavior. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The dispatcher's task DAG, ready queue, and epoch state lived only in the root server's memory, so a restart lost all in-flight scheduling. Persist each task's mutable state (status, attempts, assigned worker, failed workers, merge linkage) plus its dependency edges and epoch index to Redis on every transition, and persist per-workflow epoch ordering / frontier. TaskRuntime.rehydrate() rebuilds every live workflow from these durable records on startup: it restores the DAG, re-derives pending deps and the ready queue, and reconstructs epoch frontiers. In-flight tasks stay assigned to their worker — completions that occur during the restart are replayed via the task event stream, and departed workers are reclaimed by the watchdog. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
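A minimal sketch of the re-derivation step this commit describes, under a simplified record shape. `TaskRecord`, the `"PENDING"` status string, and this free-standing `rehydrate` function are illustrative assumptions, not the PR's actual `TaskRuntime.rehydrate()` or its Redis schema.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field


@dataclass
class TaskRecord:
    """Hypothetical durable record for one task (not the PR's real schema)."""
    task_id: str
    status: str  # assumed values: "PENDING", "DISPATCHED", "DONE", "FAILED"
    deps: set[str] = field(default_factory=set)


def rehydrate(records: list[TaskRecord]) -> tuple[dict[str, set[str]], deque[str]]:
    """Re-derive pending deps and the ready queue from persisted records."""
    done = {r.task_id for r in records if r.status == "DONE"}
    pending: dict[str, set[str]] = {}
    ready: deque[str] = deque()
    for r in records:
        if r.status != "PENDING":
            # Terminal tasks need no scheduling; in-flight tasks stay
            # assigned to their worker and are not re-queued here.
            continue
        remaining = r.deps - done  # only completed deps are subtracted
        pending[r.task_id] = remaining
        if not remaining:
            ready.append(r.task_id)
    return pending, ready
```

In this sketch a task whose dependency failed keeps that dependency in its pending set, so it never enters the ready queue by accident.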
Task lifecycle events flowed over Redis pub/sub, so any TASK_SUCCEEDED / TASK_FAILED emitted while the root server was down was dropped — the dispatcher never learned the outcome. Publish them to a durable Redis stream instead (supervisor relay and watchdog producers), and consume via a blocking stream read that resumes from a persisted cursor, so events emitted during a rolling restart are replayed on startup. mark_succeeded / mark_failed now short-circuit on an already-terminal task so a replayed event can't double-count usage or re-cascade dependents. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Rebuild the scheduler from durable records before the dispatch loop and event consumer start, so a restarted root resumes in-flight workflows. Because this runs inside the ASGI lifespan before it yields, the server does not accept traffic (and its healthcheck does not pass) until rehydration completes — readiness is implicit, no separate probe needed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
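The ordering argument can be illustrated with a plain async context manager, the shape that ASGI frameworks such as Starlette and FastAPI accept as a `lifespan`. This is a sketch only: `StubRuntime`, `start_background_loops`, and `serve_once` are hypothetical stand-ins, and the real server's startup code is not shown in this PR page.

```python
import asyncio
from contextlib import asynccontextmanager

startup_log: list[str] = []


class StubRuntime:
    """Hypothetical stand-in for the scheduler runtime."""

    def rehydrate(self) -> None:
        startup_log.append("rehydrated")

    def start_background_loops(self) -> None:
        startup_log.append("loops started")


@asynccontextmanager
async def lifespan(app):
    runtime = StubRuntime()
    runtime.rehydrate()               # rebuild scheduling state first
    runtime.start_background_loops()  # then dispatch loop and event consumer
    yield                             # requests are served only after this point
    startup_log.append("shutdown")


async def serve_once() -> None:
    # Stands in for the ASGI server entering and leaving the lifespan.
    async with lifespan(app=None):
        startup_log.append("serving")
```

Because everything before `yield` runs to completion before the server accepts connections, a healthcheck that hits an HTTP endpoint cannot pass until rehydration has finished.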
Add docs/ROLLING_UPDATES.md (operator flow, root survivability, constraints) and an ARCHITECTURE.md pointer. Tests cover scheduler-state persistence and rehydration (completed/ready restore, in-flight tasks staying dispatched, epoch frontier restore, replay idempotency), the SDK drain_workers() call, and the service-scoped `stack restart` (drain-then-recreate, --no-pull, redis skips drain, unknown-service guard). Extend the runtime test doubles with the new persistence methods. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The server flushed the entire Redis DB on shutdown (atexit + lifespan stop), which wiped the durable scheduler state and event stream a restart rehydrates from — so a rolling image update silently dropped every in-flight workflow. State lifetime belongs to the Redis volume, not the server process: stopping or recreating the server now preserves it, and a planned down/up resumes the queue. Reset to a clean slate via `flowmesh stack clean` (removes the Redis volumes). Removes the now-unused clear_redis_state cleanup module. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@kaiitunnz The purpose of this PR looks sound, but I think the restart contract needs a bit more clarification before we rely on it operationally. It would be helpful to include an e2e pipeline that exercises the intended restart order and demonstrates that queued, dispatched, and in-flight tasks preserve state correctly. A few design questions I’d like to clarify:
@timzsu This is initial work on decoupling cluster state from the root server so that it can outlive root restarts, so this PR covers only the basic features. New features or more advanced restart semantics can be added in future PRs. Below are my replies to your concerns.
On the e2e pipeline, I have built my own e2e test suites and use them locally for my recent PRs. For this PR, the test covers queued and completed tasks across a server restart. It will be extended to exercise dispatched and in-flight tasks and the intended restart order. I will push the e2e test in this PR, but we will need to standardize e2e regression tests in a future PR.
A replayed or late TASK_DISPATCHED / TASK_STARTED / progress update must not regress a task that already reached DONE/FAILED/CANCELLED. Without the guard, replay after a root restart could move a terminal task back to DISPATCHED and double-count its usage. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
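A sketch of the terminal-status guard, assuming a toy in-memory `Task` with a single `usage` counter. The function names mirror those in the commit messages, but the bodies, the `Task` class, and the boolean return values are illustrative assumptions.

```python
from dataclasses import dataclass

TERMINAL = {"DONE", "FAILED", "CANCELLED"}


@dataclass
class Task:
    """Hypothetical in-memory task; the real record has more fields."""
    status: str = "PENDING"
    usage: int = 0


def mark_dispatched(task: Task) -> bool:
    """Apply a TASK_DISPATCHED event unless the task is already terminal."""
    if task.status in TERMINAL:
        return False  # stale or replayed event: ignore it
    task.status = "DISPATCHED"
    return True


def mark_succeeded(task: Task, usage: int) -> bool:
    """Apply a TASK_SUCCEEDED event once; replays are no-ops."""
    if task.status in TERMINAL:
        return False  # already counted, do not add usage again
    task.status = "DONE"
    task.usage += usage
    return True
```

Returning a flag rather than raising keeps replay cheap: the consumer can log the ignored event and advance its cursor.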
…contract Persist the stream cursor after each handled entry rather than once per batch, shrinking the window of events replayed after a crash. Spell out the at-least-once replay contract (persist transition → emit → handle → advance cursor; idempotent handlers) in the consumer docstring and the rolling-update doc. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…restart Worker heartbeats are pub/sub and dropped while the root is down, so a surviving worker looks briefly stale once the root restarts. The watchdog would then reap its rehydrated in-flight tasks and needlessly requeue them. Track tasks rehydrated as DISPATCHED/CANCELLING and extend the watchdog's death grace for the owning worker until WORKER_REHYDRATION_GRACE_SEC (default 120s) has elapsed since rehydration, giving the worker time to re-register first. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
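One way to picture the extended grace is a predicate the watchdog could evaluate per worker. This is a sketch under assumptions: `worker_is_dead`, its parameters, and the idea of passing a single `rehydrated_at` timestamp are hypothetical; only the 120-second default comes from the commit message.

```python
from typing import Optional


def worker_is_dead(
    now: float,
    last_heartbeat: float,
    death_grace_sec: float,
    rehydrated_at: Optional[float] = None,
    rehydration_grace_sec: float = 120.0,
) -> bool:
    """Decide whether a worker's in-flight tasks may be reclaimed.

    `rehydrated_at` is set only for workers that own tasks rehydrated as
    in flight; for them, staleness alone is not enough until the
    rehydration grace has elapsed.
    """
    stale = (now - last_heartbeat) > death_grace_sec
    if not stale:
        return False
    if rehydrated_at is not None and (now - rehydrated_at) < rehydration_grace_sec:
        return False  # give the survivor time to re-register
    return True
```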
The task-event stream is length-bounded, so a consumer that stays down long enough for its cursor to fall behind the trim horizon silently loses the events in between. On resume, compare the persisted cursor against the stream's oldest surviving entry and log a warning when entries were trimmed before they were consumed. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
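The comparison can be sketched as pure ID arithmetic. Redis stream IDs have the form `<milliseconds>-<sequence>`, so they must be compared numerically rather than as strings. The helper names below are hypothetical, and the check is a conservative heuristic: a cursor older than the oldest surviving entry means entries may have been trimmed, not that they certainly were.

```python
from __future__ import annotations


def parse_stream_id(stream_id: str) -> tuple[int, int]:
    """Split a Redis stream ID like '1700000000000-3' into (ms, seq)."""
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


def cursor_behind_trim_horizon(cursor: str, oldest: str | None) -> bool:
    """Return True when the persisted cursor predates the oldest entry.

    `cursor` is the ID of the last entry the consumer handled; `oldest`
    is the ID of the first entry still in the stream, or None when the
    stream is empty.
    """
    if oldest is None:
        return False
    return parse_stream_id(cursor) < parse_stream_id(oldest)
```

With redis-py, the oldest surviving ID could be read with `xrange(stream, count=1)`; on resume the consumer would log a warning when this predicate is true and then continue reading from the cursor as usual.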
The echo executor now honors spec.data.delay_sec and sleeps for that many seconds before producing its result, keeping the task RUNNING long enough to exercise dispatch and recovery paths (e.g. restarting the server mid-task). Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@kaiitunnz Thanks for the clarification, which looks good to me. Could you clarify what the worker's server means? Is it a RunMesh term?
@timzsu Worker server is a FlowMesh term. A FlowMesh cluster consists of two kinds of nodes: a root node and worker nodes. The root node holds the cluster state and the Redis instances, while the worker nodes only manage the workers. A worker server is just the server running on a worker node.
A handler exception previously propagated out of the consumer loop and killed the thread, silently stopping every later completion from being processed — a single poison event could wedge the whole control plane. Process each batch in a helper that logs and skips a failing entry (advancing the cursor past it) so the stream keeps flowing; a task left stuck by the skip is reclaimed by the watchdog. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
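A sketch of the batch helper, combined with the per-entry cursor advance from the earlier commit. `process_batch`, `handle`, and `save_cursor` are hypothetical names; entries are modeled as `(entry_id, fields)` pairs, the shape a Redis stream read yields per stream.

```python
import logging
from typing import Callable, Iterable

logger = logging.getLogger("task_events")


def process_batch(
    entries: Iterable[tuple[str, dict]],
    handle: Callable[[dict], None],
    save_cursor: Callable[[str], None],
) -> int:
    """Handle each entry, skipping failures, and return the skip count."""
    skipped = 0
    for entry_id, fields in entries:
        try:
            handle(fields)
        except Exception:
            # A poison event must not kill the consumer thread.
            logger.exception("skipping task event %s", entry_id)
            skipped += 1
        # Advance past the entry whether or not the handler succeeded,
        # so a crash replays at most the entry in progress.
        save_cursor(entry_id)
    return skipped
```

Catching `Exception` rather than `BaseException` leaves `KeyboardInterrupt` and `SystemExit` free to stop the thread during shutdown.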
Remove the dead TASK_EVENT_CHANNEL constant (all producers/consumers use the stream); document why rehydrate subtracts only completed deps, not failed ones; trim the startup comment to the non-obvious ordering rationale; drop a stale noqa on test stubs that are no longer over-length. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The no-worker grace window is ephemeral scheduler state that is not persisted, so it resets on a root restart by design. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Purpose
The root held the dispatcher's scheduling state in memory, and task events flowed over fire-and-forget pub/sub, so any restart of the root server (a crash, a deploy, an image bump) lost every in-flight workflow. This PR makes the root restart-survivable: per-task scheduling state is persisted to Redis on each transition and rebuilt on startup, and task lifecycle events flow through a durable, replayable stream consumed from a persisted cursor. The cost is a brief control-plane pause while the server is recreated and rehydrates; workers keep running their tasks throughout.
Rolling node image updates (recreate one node at a time, root last, with no full-cluster teardown) fall out as one application of this, via a per-node `stack restart` primitive. The rollout itself stays operator-driven.
Changes
- `task/runtime.py`, `registries/workflow.py`, `clients/redis.py`: persist per-task state, deps, and epoch index to Redis on each transition; `TaskRuntime.rehydrate()` rebuilds the DAG / ready queue / epoch frontiers on startup.
- `supervisor/services/relay_service.py`, `services/watchdog.py`, `services/monitoring.py`: task events move to a durable Redis stream consumed from a persisted cursor (advanced per entry); `mark_succeeded` / `mark_failed` are idempotent against replay.
- `main.py`: state lifetime follows the Redis volume, not the process, so a restart rehydrates instead of starting empty; reset via `stack clean`. Removes the dead `cleanup.py`.
- `main.py`: rehydrate before the lifespan yields, so readiness is implicit (no traffic until scheduling state is restored).
- Terminal-status guards on `mark_started` / `mark_dispatched` / `mark_updated` so a replayed stale event can't regress a terminal task or double-count usage; heartbeat grace for rehydrated in-flight tasks (`WORKER_REHYDRATION_GRACE_SEC`, default 120s) so a worker catching up after a restart isn't reaped (heartbeats are pub/sub and drop during downtime); trim-horizon detection logs (instead of silently dropping) when the cursor falls behind a bounded stream; the event consumer survives a poison event (logs and skips) instead of the handler exception killing the thread.
- `stack restart [SERVICE]` (`cli/stack/.../stack.py`) recreates one Compose service in place (`--no-deps --force-recreate`, optional `--pull`), leaving Redis up and draining managed workers first; `NodeClient.drain_workers()` (`sdk/stack/.../node_client.py`) exposes the drain over `destroy_all_workers()`.
- `echo` `delay_sec` (`worker/executors/echo_executor.py`): holds a task in flight for testing dispatch/recovery paths.
- `docs/ROLLING_UPDATES.md`, `ARCHITECTURE.md`, `CLI.md`, `ENV.md`, `EXECUTORS.md`; persistence/rehydration, terminal-guard, rehydration-grace, trim-detection, consumer-resilience, echo-delay, `drain_workers`, and `stack restart` tests.
Design
State is rebuilt, not snapshotted: task IDs are random per parse, so the DAG is reconstructed from persisted per-task records and overlaid with status. Events use a stream + persisted cursor (like the existing log streams) rather than consumer groups; the persist-transition → advance-cursor ordering gives at-least-once delivery, made safe by idempotent handlers and terminal-status guards. In-flight tasks stay assigned on rehydrate — survivors' completions arrive via the stream (no double execution), and departed workers are reclaimed by the watchdog only after the rehydration grace elapses (heartbeats drop during downtime, so a survivor looks briefly stale on resume). Only the server is recreated on a node; Redis stays up so durable state survives.
Test Plan
Test Result
`pre-commit run --all-files` all passed (incl. workspace mypy). Unit tests: 444 passed (`tests/server/` + `tests/shared/` + the SDK/CLI/echo cases above); the full `tests/` suite (GPU worker paths) was not run this session. E2E on a single root + CPU worker: 14/14. Queued and completed workflows survive a `stack restart server`, only the server container is recreated (Redis untouched), rehydrated work runs to DONE, and an in-flight task caught at `DISPATCHED` survives the restart, is requeued (attempts 0→1), and runs to DONE once a worker is back.
Pre-submission Checklist
- … `pre-commit run --all-files` and fixed any issues.
- `uv run pytest tests/` passes locally.
- … `uv sync --all-packages --group ci --frozen`).
- … `[BREAKING]` and described migration steps above.