Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/stack/src/flowmesh_cli_stack/assets/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ TASK_NO_WORKER_GRACE_SEC=60
ENABLE_WORKER_WATCHDOG=true
WORKER_DEATH_CHECK_INTERVAL=30
WORKER_DEATH_GRACE_SEC=60
WORKER_REHYDRATION_GRACE_SEC=120

# ==== Server Heartbeat ====
SERVER_HEARTBEAT_INTERVAL=30
Expand Down
6 changes: 6 additions & 0 deletions cli/stack/src/flowmesh_cli_stack/env_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@
var_type=EnvVarType.INT,
min_value=0,
),
EnvVar(
"WORKER_REHYDRATION_GRACE_SEC",
"120",
var_type=EnvVarType.INT,
min_value=0,
),
],
),
EnvSection(
Expand Down
77 changes: 65 additions & 12 deletions cli/stack/src/flowmesh_cli_stack/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ def _drain_workers(env_file: Path) -> None:
"""Destroy all dynamically spawned workers before stopping the server."""
try:
client = stack_node_client(env_file, base_url=None, token=None)
client.destroy_all_workers()
client.drain_workers()
except Exception as exc:
logging.warning(f"Unable to drain workers; continuing shutdown. {exc}")

Expand All @@ -531,33 +531,86 @@ def down(
logging.success("FlowMesh stack stopped.")


STACK_SERVICES = ("server", "redis_control", "redis_telemetry")
"""Compose services that can be restarted individually."""

WORKER_MANAGING_SERVICES = ("server",)
"""Services whose restart tears down the supervisor; drain workers first."""


@app.command()
def restart(
service: str | None = typer.Argument(
None,
help=(
"Restart only this compose service in place (one of: "
f"{', '.join(STACK_SERVICES)}), leaving the rest of the stack "
"running. Omit to restart the whole stack."
),
),
env_file: Path = typer.Option(
DEFAULT_ENV_FILE, "--env-file", help="Env file for compose"
),
image_tag: str | None = typer.Option(
None, "--image-tag", help="Override FLOWMESH_VERSION"
),
pull: bool = typer.Option(
True, "--pull/--no-pull", help="Pull the target image before recreating."
),
) -> None:
"""Drain workers and restart the stack."""
logging.info("Draining workers...")
_drain_workers(env_file)
_compose(
["down"],
env_file=env_file,
env=image_env_overrides(image_tag),
profile="root",
)
"""Drain workers and restart the stack, or recreate a single service in place.

With a SERVICE argument the stack is left running and only that service is
recreated (``--no-deps --force-recreate``); when the service manages workers
(the server / supervisor) its workers are drained first so their in-flight
tasks requeue onto other nodes. This is the per-node primitive for a rolling
image update: recreate the server on each node in turn, leaving Redis up.
"""
if service is None:
logging.info("Draining workers...")
_drain_workers(env_file)
_compose(
["down"],
env_file=env_file,
env=image_env_overrides(image_tag),
profile="root",
)
profile = "root" if _node_role(env_file) == NodeRole.ROOT else None
_compose(
["up", "-d", "--wait"],
env_file=env_file,
env=image_env_overrides(image_tag),
to_deploy=True,
profile=profile,
)
logging.success("FlowMesh stack is up.")
return

if service not in STACK_SERVICES:
logging.error(
f"Unknown service {service!r}. "
f"Known services: {', '.join(STACK_SERVICES)}."
)
raise typer.Exit(code=1)

if service in WORKER_MANAGING_SERVICES:
logging.info("Draining workers...")
_drain_workers(env_file)

profile = "root" if _node_role(env_file) == NodeRole.ROOT else None
up_args = ["up", "-d", "--no-deps", "--force-recreate", "--wait"]
if pull:
up_args += ["--pull", "always"]
up_args.append(service)
logging.info(f"Recreating service '{service}'...")
_compose(
["up", "-d", "--wait"],
up_args,
env_file=env_file,
env=image_env_overrides(image_tag),
to_deploy=True,
profile=profile,
)
logging.success("FlowMesh stack is up.")
logging.success(f"Service '{service}' restarted.")


@app.command()
Expand Down
12 changes: 12 additions & 0 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ scripts/dev/ compile_protos, sync_requirements, check_env_examples
`LOG_STREAM_MAXLEN_TASK` / `LOG_STREAM_MAXLEN_WORKFLOW` and
expired `LOG_STREAM_TTL_SEC` after close.

## Rolling image updates

Nodes can be updated to a new image one at a time without a full cluster
teardown. The dispatcher's scheduling state is persisted to Redis on every
task transition and rebuilt on startup (`TaskRuntime.rehydrate`), and task
lifecycle events flow through a durable, replayable Redis stream consumed from
a persisted cursor — so a restarted root resumes in-flight workflows instead of
losing them. Rehydration runs in the ASGI lifespan before it yields, making
readiness implicit. The per-node primitive is `flowmesh stack restart server`,
which drains managed workers then recreates only the server service. See
[`ROLLING_UPDATES.md`](ROLLING_UPDATES.md).

## Plugin extension points

Server extension points are loaded via the `FLOWMESH_PLUGINS` env var.
Expand Down
19 changes: 19 additions & 0 deletions docs/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ server connects to the root node's Redis via `REDIS_CONTROL_URL` and
`REDIS_TELEMETRY_URL`, which must be set in the worker's `.env` to reachable
endpoints on the root node.

## Rolling image updates

To update a running cluster's images one node at a time without tearing the
whole stack down, recreate a single Compose service in place:

```bash
flowmesh stack restart server # drain workers, recreate the server in place
flowmesh stack restart server --no-pull # recreate without pulling a new image
flowmesh stack restart # whole-stack drain + down + up (no service arg)
```

With a `SERVICE` argument (`server`, `redis_control`, `redis_telemetry`) only
that service is recreated (`--no-deps --force-recreate`), leaving the rest of
the stack — including Redis — running. When the service manages workers (the
server / supervisor), its workers are drained first so their in-flight tasks
requeue onto other nodes. Recreate the server on each node in turn (root last)
to roll a new image across the cluster; the server's healthcheck gates
`--wait` until it is back and ready.

After changing executor code, rebuild the affected image before bringing
the stack back up — running containers don't pick up source changes:

Expand Down
1 change: 1 addition & 0 deletions docs/ENV.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ listed here is in `.env.example`.
| `TASK_NO_WORKER_GRACE_SEC` | `60` | Grace before failing a task no worker can satisfy |
| `ENABLE_WORKER_WATCHDOG` | `true` | Worker death detection |
| `WORKER_DEATH_GRACE_SEC` | `60` | Grace period before marking dead |
| `WORKER_REHYDRATION_GRACE_SEC` | `120` | Extra grace for a worker's rehydrated in-flight tasks after a root restart, before the watchdog may reclaim them |
| `FLOWMESH_PLUGINS` | – | Comma-separated plugin module names |
| `FLOWMESH_PLUGIN_DATA_DIR` | `./plugin-data` | Writable mount at `/app/plugin-data` for plugin state. A path -> host bind-mount (auto-created); a bare name -> external Docker volume of that name. |
| `SERVER_CUDA_PROBE_IMAGE` | `nvidia/cuda:12.9.1-base-ubuntu24.04` | CUDA image the server runs briefly to query local GPU names/indices |
Expand Down
2 changes: 1 addition & 1 deletion docs/EXECUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The worker resolves `spec.taskType` against an executor registry in

| `taskType` | Executor | Use case |
|-----------|----------|----------|
| `echo` | `EchoExecutor` | Echo input back as result (smoke tests) |
| `echo` | `EchoExecutor` | Echo input back as result (smoke tests). `spec.data.delay_sec` holds the task in flight for N seconds, useful for exercising dispatch/recovery paths |
| `inference` | `VLLMExecutor` / `TransformersExecutor` | LLM inference |
| `diffusion` | `DiffusersExecutor` | Image / video diffusion models |
| `omni_text2{audio,image,speech,general}` | `Omni*Executor` | Multimodal generation |
Expand Down
99 changes: 99 additions & 0 deletions docs/ROLLING_UPDATES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Rolling image updates

FlowMesh nodes can be updated to a new image one at a time without tearing the
whole cluster down. The rollout itself is driven externally — by an operator or
a cluster-management tool — and FlowMesh provides the primitives that make each
node safe to recreate and the root node able to resume in-flight work.

## Operator flow

Recreate one node at a time, leaving the others serving. Update the **root node
last**.

```bash
# On each node host, in turn:
flowmesh stack restart server --image-tag <new-version>
```

`flowmesh stack restart server` (see [`CLI.md`](CLI.md)):

1. Drains the node's managed workers, so their in-flight tasks are released and
requeued onto other nodes.
2. Recreates only the `server` Compose service (`--no-deps --force-recreate`),
pulling the new image. Redis and any other services keep running.
3. Blocks (`--wait`) until the recreated server passes its healthcheck.

Because the worker-managing process is the `server` service on both root and
worker nodes, the same command works everywhere.

## What happens to in-flight work

**Worker nodes.** Draining a node tears down its workers. Each worker's
departure produces a `WORKER_UNREGISTER` (the supervisor synthesizes one if the
worker did not send it), so the server recovers the worker's `DISPATCHED` tasks
and requeues them onto other eligible nodes. A recreated node's supervisor
re-creates its configured workers, which re-register themselves on startup. No
cordon step is required.

**Root node.** The root holds the dispatcher's scheduling state in memory, so a
naive restart would lose every in-flight workflow. Two mechanisms make a root
restart safe:

- **Durable scheduler state.** Each task's mutable state (status, attempts,
assigned worker, failed workers, merge linkage), its dependency edges, and its
epoch index are persisted to Redis on every transition, along with per-workflow
epoch ordering and frontier. On startup the server rebuilds the full task DAG,
ready queue, and epoch frontiers from these records
(`TaskRuntime.rehydrate`).
- **Replayable task events.** Task lifecycle events flow through a durable Redis
stream consumed from a persisted cursor. The ordering is what makes replay
safe: a transition is written to durable scheduler state *before* its event is
emitted, and the consumer advances the cursor only *after* it has handled an
entry. Delivery is therefore at-least-once — a crash between handling an entry
and persisting the cursor simply replays that entry on the next startup.
Handlers are idempotent (a terminal task ignores late dispatch / start /
update events, and a repeated completion is dropped), so replay cannot
double-apply. Completions that occur while the root is down are replayed on
startup rather than dropped. In-flight tasks are left assigned to their worker
— surviving workers' completions arrive via the stream, and workers that
genuinely departed are reclaimed by the watchdog.
- **Heartbeat grace for rehydrated work.** Worker heartbeats are dropped while
the root is down, so a surviving worker briefly looks stale once the root is
back. The watchdog gives any worker that owns rehydrated in-flight tasks an
extended grace (`WORKER_REHYDRATION_GRACE_SEC`, default 120s) before it may
reclaim those tasks, so a worker that is merely catching up is not mistaken
for a dead one and its tasks are not needlessly requeued.

Rehydration runs inside the ASGI lifespan **before it yields**, so the server
does not accept traffic (and its healthcheck does not pass) until scheduling
state is fully restored. Readiness is therefore implicit — no separate probe is
needed, and `stack restart`'s `--wait` blocks until the node is genuinely ready.

The result is a brief control-plane pause on the root (the server container
recreate plus rehydration) during which workers keep running their tasks; no
workflow is lost.

## Constraints

- **Recreate only the `server` service on the root.** Leave `redis_control` and
`redis_telemetry` running so durable state and the event stream survive.
Updating the Redis image is a heavier, control-plane-wide outage and is out of
scope for a brief-pause rolling update.
- **Co-located root workers are recreated.** Workers running on the root host die
with the root's supervisor; their tasks requeue and re-run. To avoid this,
prefer not to run workers on the root node.
- **The no-worker grace restarts on a root restart.** The window before a task
that no worker can satisfy is failed (`TASK_NO_WORKER_GRACE_SEC`) is tracked
with ephemeral scheduler state that is intentionally not persisted, so it
starts fresh after a restart. This is deliberate: the restart is itself a
disruption, and a fresh window avoids grace-failing a waiting task the instant
the control plane comes back.

## State lifetime

Cluster state (workflows, durable scheduler records, the task-event stream)
lives as long as the Redis volumes — **not** the server process. Stopping or
recreating the server never clears it, which is what lets a restart resume
in-flight work; a plain `stack down` / `stack up` likewise resumes the queue.
To reset to a clean slate, remove the Redis volumes with `flowmesh stack clean`
(`down -v`).
10 changes: 10 additions & 0 deletions sdk/stack/src/flowmesh_stack/node_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ def destroy_all_workers(self, *, ignore_unreachable: bool = False) -> bool:
return False
return True

def drain_workers(self, *, ignore_unreachable: bool = False) -> bool:
"""Drain the node's managed workers ahead of a service restart.

Destroys every worker the node manages so their in-flight tasks are
released (``WORKER_UNREGISTER`` → requeue) before the worker-managing
service is recreated. Returns ``True`` on success, ``False`` when
``ignore_unreachable=True`` and the server was unreachable.
"""
return self.destroy_all_workers(ignore_unreachable=ignore_unreachable)

def worker_names(self) -> list[str]:
"""Return a list of all worker names."""
data = self.list_workers()
Expand Down
12 changes: 11 additions & 1 deletion src/server/clients/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from redis.connection import SSLConnection as SyncSSLConnection
from redis.typing import EncodableT

TASK_EVENT_CHANNEL = "tasks:events"
TASK_EVENT_STREAM_KEY = "tasks:events:stream"
TASK_EVENT_CURSOR_KEY = "tasks:events:cursor"
TASK_EVENT_STREAM_MAXLEN = 100_000

WORKFLOWS_SET_KEY = "workflows:ids"

Expand Down Expand Up @@ -50,6 +52,14 @@ def workflow_cancelled_tasks_key(workflow_id: str) -> str:
return f"workflow:{workflow_id}:cancelled_tasks"


def workflow_sched_key(workflow_id: str) -> str:
return f"workflow:{workflow_id}:sched"


def task_state_key(task_id: str) -> str:
return f"task:{task_id}:state"


def worker_key(worker_id: str) -> str:
return f"worker:{worker_id}"

Expand Down
4 changes: 4 additions & 0 deletions src/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,17 @@ class WatchdogConfig:
enabled: bool = True
check_interval: int = 30
grace_sec: int = 60
rehydration_grace_sec: int = 120

@classmethod
def from_env(cls) -> "WatchdogConfig":
return cls(
enabled=parse_bool_env("ENABLE_WORKER_WATCHDOG", True),
check_interval=max(5, parse_int_env("WORKER_DEATH_CHECK_INTERVAL", 30)),
grace_sec=max(0, parse_int_env("WORKER_DEATH_GRACE_SEC", 60)),
rehydration_grace_sec=max(
0, parse_int_env("WORKER_REHYDRATION_GRACE_SEC", 120)
),
)


Expand Down
Loading