Skip to content

feat: persist and rehydrate scheduling state across root restarts#64

Draft
kaiitunnz wants to merge 15 commits into
mainfrom
feat/rolling-node-restart
Draft

feat: persist and rehydrate scheduling state across root restarts#64
kaiitunnz wants to merge 15 commits into
mainfrom
feat/rolling-node-restart

Conversation

@kaiitunnz
Copy link
Copy Markdown
Collaborator

@kaiitunnz kaiitunnz commented Jun 1, 2026

Purpose

The root holds the dispatcher's scheduling state in memory and task events flowed over fire-and-forget pub/sub, so any restart of the root server — a crash, a deploy, an image bump — lost every in-flight workflow. This makes the root restart-survivable: per-task scheduling state is persisted to Redis on each transition and rebuilt on startup, and task lifecycle events flow through a durable, replayable stream consumed from a persisted cursor. The cost is a brief control-plane pause while the server recreates and rehydrates; workers keep running their tasks throughout.

Rolling node image updates — recreate one node at a time, root last, with no full-cluster teardown — fall out as one application of this, via a per-node stack restart primitive. The rollout itself stays operator-driven.

Changes

  • Durable scheduler state (task/runtime.py, registries/workflow.py, clients/redis.py) — persist per-task state, deps, and epoch index to Redis on each transition; TaskRuntime.rehydrate() rebuilds the DAG / ready queue / epoch frontiers on startup.
  • Replayable task events (supervisor/services/relay_service.py, services/watchdog.py, services/monitoring.py) — task events move to a durable Redis stream consumed from a persisted cursor (advanced per entry); mark_succeeded / mark_failed are idempotent against replay.
  • No Redis flush on shutdown (main.py) — state lifetime follows the Redis volume, not the process, so a restart rehydrates instead of starting empty; reset via stack clean. Removes the dead cleanup.py.
  • Lifespan hydration (main.py) — rehydrate before the lifespan yields, so readiness is implicit (no traffic until scheduling state is restored).
  • Restart hardening — terminal-status guards on mark_started / mark_dispatched / mark_updated so a replayed stale event can't regress a terminal task or double-count usage; heartbeat grace for rehydrated in-flight tasks (WORKER_REHYDRATION_GRACE_SEC, default 120s) so a worker catching up after a restart isn't reaped (heartbeats are pub/sub and drop during downtime); trim-horizon detection logs (instead of silently dropping) when the cursor falls behind a bounded stream; the event consumer survives a poison event (logs and skips) instead of the handler exception killing the thread.
  • Per-node restart primitivestack restart [SERVICE] (cli/stack/.../stack.py) recreates one Compose service in place (--no-deps --force-recreate, optional --pull), leaving Redis up and draining managed workers first; NodeClient.drain_workers() (sdk/stack/.../node_client.py) exposes the drain over destroy_all_workers().
  • echo delay_sec (worker/executors/echo_executor.py) — holds a task in flight for testing dispatch/recovery paths.
  • Docs + testsdocs/ROLLING_UPDATES.md, ARCHITECTURE.md, CLI.md, ENV.md, EXECUTORS.md; persistence/rehydration, terminal-guard, rehydration-grace, trim-detection, consumer-resilience, echo-delay, drain_workers, and stack restart tests.

Design

State is rebuilt, not snapshotted: task IDs are random per parse, so the DAG is reconstructed from persisted per-task records and overlaid with status. Events use a stream + persisted cursor (like the existing log streams) rather than consumer groups; the persist-transition → advance-cursor ordering gives at-least-once delivery, made safe by idempotent handlers and terminal-status guards. In-flight tasks stay assigned on rehydrate — survivors' completions arrive via the stream (no double execution), and departed workers are reclaimed by the watchdog only after the rehydration grace elapses (heartbeats drop during downtime, so a survivor looks briefly stale on resume). Only the server is recreated on a node; Redis stays up so durable state survives.

Test Plan

uv run pre-commit run --all-files
uv run pytest tests/server/ tests/shared/ tests/sdk/test_node_client.py tests/cli/test_stack_restart.py tests/worker/test_echo_delay.py
bash tmp/e2e_tests/rolling_node_restart/run.sh

Test Result

pre-commit run --all-files all passed (incl. workspace mypy). Unit tests: 444 passed (tests/server/ + tests/shared/ + the SDK/CLI/echo cases above); the full tests/ suite (GPU worker paths) was not run this session. E2E on a single root + CPU worker: 14/14 — queued and completed workflows survive a stack restart server, only the server container is recreated (Redis untouched), rehydrated work runs to DONE, and an in-flight task caught at DISPATCHED survives the restart, is requeued (attempts 0→1), and runs to DONE once a worker is back.


Pre-submission Checklist
  • I have read the contribution guidelines.
  • I have run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes (if applicable).
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed shared schemas or proto definitions, I have checked downstream compatibility across Server and Worker.
  • If I changed the SDK or CLI, I have verified the affected packages work (uv sync --all-packages --group ci --frozen).
  • If this is a breaking change, I have prefixed the PR title with [BREAKING] and described migration steps above.
  • I have updated documentation or config examples if user-facing behavior changed.

kaiitunnz and others added 7 commits June 1, 2026 15:49
First-class drain primitive over destroy_all_workers() so the rolling-restart
flow (release in-flight tasks before recreating the worker-managing service)
is part of the SDK surface.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…t [SERVICE]`

`flowmesh stack restart server` drains managed workers then recreates only the
server/supervisor (--no-deps --force-recreate), leaving Redis and the rest of
the stack running. This is the per-node primitive for a rolling image update:
recreate the server on each node in turn (root last). No service arg keeps the
whole-stack drain/down/up behavior.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The dispatcher's task DAG, ready queue, and epoch state lived only in the root
server's memory, so a restart lost all in-flight scheduling. Persist each
task's mutable state (status, attempts, assigned worker, failed workers, merge
linkage) plus its dependency edges and epoch index to Redis on every
transition, and persist per-workflow epoch ordering / frontier.

TaskRuntime.rehydrate() rebuilds every live workflow from these durable records
on startup: it restores the DAG, re-derives pending deps and the ready queue,
and reconstructs epoch frontiers. In-flight tasks stay assigned to their
worker — completions that occur during the restart are replayed via the task
event stream, and departed workers are reclaimed by the watchdog.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Task lifecycle events flowed over Redis pub/sub, so any TASK_SUCCEEDED /
TASK_FAILED emitted while the root server was down was dropped — the dispatcher
never learned the outcome. Publish them to a durable Redis stream instead
(supervisor relay and watchdog producers), and consume via a blocking stream
read that resumes from a persisted cursor, so events emitted during a rolling
restart are replayed on startup.

mark_succeeded / mark_failed now short-circuit on an already-terminal task so a
replayed event can't double-count usage or re-cascade dependents.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Rebuild the scheduler from durable records before the dispatch loop and event
consumer start, so a restarted root resumes in-flight workflows. Because this
runs inside the ASGI lifespan before it yields, the server does not accept
traffic (and its healthcheck does not pass) until rehydration completes —
readiness is implicit, no separate probe needed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Add docs/ROLLING_UPDATES.md (operator flow, root survivability, constraints)
and an ARCHITECTURE.md pointer. Tests cover scheduler-state persistence and
rehydration (completed/ready restore, in-flight tasks staying dispatched, epoch
frontier restore, replay idempotency), the SDK drain_workers() call, and the
service-scoped `stack restart` (drain-then-recreate, --no-pull, redis skips
drain, unknown-service guard). Extend the runtime test doubles with the new
persistence methods.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The server flushed the entire Redis DB on shutdown (atexit + lifespan stop),
which wiped the durable scheduler state and event stream a restart rehydrates
from — so a rolling image update silently dropped every in-flight workflow.
State lifetime belongs to the Redis volume, not the server process: stopping or
recreating the server now preserves it, and a planned down/up resumes the queue.
Reset to a clean slate via `flowmesh stack clean` (removes the Redis volumes).

Removes the now-unused clear_redis_state cleanup module.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@timzsu
Copy link
Copy Markdown
Collaborator

timzsu commented Jun 1, 2026

@kaiitunnz The purpose of this PR looks sound, but I think the restart contract needs a bit more clarification before we rely on it operationally. It would be helpful to include an e2e pipeline that exercises the intended restart order and demonstrates that queued, dispatched, and in-flight tasks preserve state correctly.

A few design questions I’d like to clarify:

  • When rolling workers, do we only restart idle workers, or do we restart all workers regardless of whether they are running tasks? If in-flight workers can be restarted, what is the recovery contract for their active tasks?
  • The description says “workers keep running throughout,” but also says the server restart “drains workers first.” Are these referring to different restart modes or stages?
  • What happens if a worker becomes unreachable while the root/server is restarting? What if it reconnects after the root has rehydrated, or while that worker’s own node is being updated?
  • What is the ordering/atomicity contract for Redis replay? For example, do we persist the task transition before advancing the stream cursor, and are all replayed transitions idempotent if the server crashes between those operations

@kaiitunnz
Copy link
Copy Markdown
Collaborator Author

kaiitunnz commented Jun 3, 2026

@timzsu This is an initial work on decoupling cluster states from the root server so that they can outlive root restarts, so this PR will cover only basic features. New features or advanced restart semantics can be added in future PRs. Below is my reply to your concerns.

  • There are three cases for container version updating:

    • Both server and workers: The server and all directly managed workers are torn down and only the server and workers in the worker config file are started up. Other workers must be created and started manually by the operator.
    • Workers only: Individual workers to be updated are torn down and started with "version: " in the worker config.
    • Server only: The server and all directly managed workers are torn down, and only the server and workers in the worker config file are started up. However, the restarted workers will inherit the new version from the server if not manually specified in the worker config file. So, to use the old worker version, the operator must tear down and start the workers with a manually configured version.

    In all cases, the workers are torn down immediately, regardless of their statuses. The in-flight tasks will be considered failed and handled according to the retrying logic in the server, consuming the retry budget. We may make it not consume the retry budget in a follow-up PR.

  • On a cluster, there are two kinds of servers: root and worker. To update the versions, we need to restart each individual server, so the workers directly managed by the server must be restarted. The statement that “workers keep running throughout" is from the perspective of the root restart. Previously, when we restart the root, we need to tear down the entire cluster, including all the workers. This PR decouples the root from the cluster states, allowing the worker servers' workers to run independently. Workers directly managed by the root server still need to be torn down. This works because restarting only the server leaves the Redis containers up, so the independent workers' events and heartbeats still reach the root's Redis.

  • The normal node restarting logic is to first drain the workers and then restart the server. Intermittently disconnected workers will try to connect to the server's supervisor gRPC endpoint, but once the supervisor has restarted they will not be able to authenticate (as we mint a new token per worker startup, held in the supervisor's in-memory registry) and will be marked stale.

  • Task events are appended to a durable Redis stream, and the consumer persists the task transition before advancing the stream cursor. So if the server crashes in between, the event is replayed (at-least-once delivery). mark_succeeded and mark_failed are idempotent — they short-circuit on an already-terminal task. However, mark_started and mark_dispatched are not yet guarded, so a replayed stale event could regress a terminal task and double-count usage. The transition, the workflow-set update, and the cursor are also separate Redis operations (and the stream is on a different Redis instance from the cursor), so there is no cross-operation atomicity; consistency relies on idempotent replay. We will add terminal-status guards to mark_started/mark_dispatched, advance the cursor per entry, and document this contract.

On the e2e pipeline, I have built my own e2e test suites and use them locally for my recent PRs. For this PR, the test covers queued and completed tasks across a server restart. It will be extended to exercise dispatched and in-flight tasks and the intended restart order. I will push the e2e test in this PR, but we have to work on standardizing e2e regression tests in a future PR.

kaiitunnz added 5 commits June 3, 2026 15:09
A replayed or late TASK_DISPATCHED / TASK_STARTED / progress update must
not regress a task that already reached DONE/FAILED/CANCELLED. Without the
guard, replay after a root restart could move a terminal task back to
DISPATCHED and double-count its usage.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…contract

Persist the stream cursor after each handled entry rather than once per
batch, shrinking the window of events replayed after a crash. Spell out the
at-least-once replay contract (persist transition → emit → handle → advance
cursor; idempotent handlers) in the consumer docstring and the rolling-update
doc.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…restart

Worker heartbeats are pub/sub and dropped while the root is down, so a
surviving worker looks briefly stale once the root restarts. The watchdog
would then reap its rehydrated in-flight tasks and needlessly requeue them.
Track tasks rehydrated as DISPATCHED/CANCELLING and extend the watchdog's
death grace for the owning worker until WORKER_REHYDRATION_GRACE_SEC
(default 120s) has elapsed since rehydration, giving the worker time to
re-register first.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The task-event stream is length-bounded, so a consumer that stays down long
enough for its cursor to fall behind the trim horizon silently loses the
events in between. On resume, compare the persisted cursor against the
stream's oldest surviving entry and log a warning when entries were trimmed
before they were consumed.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The echo executor now honors spec.data.delay_sec and sleeps for that many
seconds before producing its result, keeping the task RUNNING long enough to
exercise dispatch and recovery paths (e.g. restarting the server mid-task).

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@timzsu
Copy link
Copy Markdown
Collaborator

timzsu commented Jun 3, 2026

@kaiitunnz Thanks for the clarification, which looks good to me. May you clarify what the worker's server mean? Is it a RunMesh term?

@kaiitunnz
Copy link
Copy Markdown
Collaborator Author

@timzsu Worker server is a FlowMesh term. A FlowMesh cluster consists of two kinds of nodes: a root node and a worker node. The root node holds the cluster states and the Redis instances, while the worker nodes only manage the workers. A worker server is just a server in a worker node.

kaiitunnz added 3 commits June 3, 2026 19:00
A handler exception previously propagated out of the consumer loop and killed
the thread, silently stopping every later completion from being processed — a
single poison event could wedge the whole control plane. Process each batch in
a helper that logs and skips a failing entry (advancing the cursor past it) so
the stream keeps flowing; a task left stuck by the skip is reclaimed by the
watchdog.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Remove the dead TASK_EVENT_CHANNEL constant (all producers/consumers use the
stream); document why rehydrate subtracts only completed deps, not failed ones;
trim the startup comment to the non-obvious ordering rationale; drop a stale
noqa on test stubs that are no longer over-length.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The no-worker grace window is ephemeral scheduler state that is not persisted,
so it resets on a root restart by design.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@kaiitunnz kaiitunnz changed the title feat: roll node images one at a time with restart-survivable root feat: persist and rehydrate scheduling state across root restarts Jun 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants