Skip to content

Commit 72c8089

Browse files
Bound YamlNodeStore.set_nodes cancel-drain loop
The shield-cancel-drain loop holds the store lock; previously the loop absorbed cancels with no upper bound. A cancel-storm + a wedged worker thread (kernel I/O hang on NAS/NFS/encrypted volume) would spin here forever, wedging every other set_nodes caller on the same store process-wide. Cap absorbed cancels at _MAX_CANCEL_DRAIN_ITERS=64; when exceeded, cancel the inner task, give the asyncio side one bounded settle round, and raise a clear RuntimeError so the failure is visible rather than silent. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fb470bf commit 72c8089

2 files changed

Lines changed: 195 additions & 0 deletions

File tree

src/dqliteclient/node_store.py

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,17 @@
2020

2121
logger = logging.getLogger(__name__)
2222

23+
# Upper bound on cancel-storm absorption inside
24+
# ``YamlNodeStore.set_nodes``'s shield-drain loop. The lock is held
25+
# across this loop; without a cap, a cancel-storm + a wedged worker
26+
# thread (kernel I/O hang) would spin here forever and wedge every
27+
# other ``set_nodes`` caller on the same store process-wide. The
28+
# small constant balances tolerating ordinary shutdown cancel
29+
# cascades (a handful of cancels) against bounding pathological
30+
# storms. Exceeding it surfaces a clear ``RuntimeError`` rather
31+
# than letting the store wedge silently.
32+
_MAX_CANCEL_DRAIN_ITERS = 64
33+
2334

2435
@final
2536
@dataclass(frozen=True, slots=True)
@@ -741,10 +752,54 @@ async def set_nodes(self, nodes: Sequence[NodeInfo]) -> None:
741752
# Loop on repeated cancel so a stubborn outer
742753
# cancel cascade still waits out the worker
743754
# thread, which asyncio cannot abort.
755+
#
756+
# Bound the absorption: a cancel-storm + a wedged
757+
# worker thread (kernel I/O hang, NFS mount lost,
758+
# encrypted volume sealed) would otherwise spin
759+
# here forever holding ``self._lock``, which wedges
760+
# every other ``set_nodes`` caller on the same store
761+
# process-wide. ``_MAX_CANCEL_DRAIN_ITERS`` caps the
762+
# absorbed cancels at a small constant. The worker
763+
# thread itself cannot be aborted from Python — so a
764+
# truly stuck fsync still loses the store; the cap
765+
# makes the failure visible (a clear
766+
# ``RuntimeError``) rather than silent.
767+
cancel_count = 0
744768
while not inner.done():
745769
try:
746770
await asyncio.shield(inner)
747771
except asyncio.CancelledError:
772+
cancel_count += 1
773+
if cancel_count > _MAX_CANCEL_DRAIN_ITERS:
774+
logger.warning(
775+
"set_nodes: cancel-drain budget exceeded "
776+
"(%d cancels absorbed); worker thread "
777+
"appears stuck. Releasing lock and "
778+
"surfacing RuntimeError.",
779+
cancel_count,
780+
)
781+
# Best-effort: ask asyncio to cancel the
782+
# inner task (no-op for a to_thread future
783+
# whose worker is mid-syscall, but
784+
# harmless), then give the asyncio side
785+
# one bounded settle round.
786+
# ``wait_for``'s ``timeout`` keeps the
787+
# cap's wall-clock honest — a wedged worker
788+
# thread would otherwise leave us parked
789+
# here on the settle-await forever and
790+
# defeat the cap. ``suppress`` swallows
791+
# ``TimeoutError`` / ``CancelledError`` so
792+
# this path always reaches the ``raise
793+
# RuntimeError`` below.
794+
inner.cancel()
795+
with contextlib.suppress(asyncio.CancelledError, Exception):
796+
await asyncio.wait_for(asyncio.shield(inner), timeout=0.5)
797+
raise RuntimeError(
798+
"YamlNodeStore.set_nodes: cancel-drain "
799+
f"budget exceeded after {cancel_count} "
800+
"cancels absorbed; worker thread is "
801+
"stuck on fsync."
802+
) from None
748803
continue
749804
except Exception:
750805
# Non-cancel inner failure (``OSError`` on a
Lines changed: 140 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,140 @@
1+
"""Pin: ``YamlNodeStore.set_nodes``'s shield-cancel-drain loop bounds
2+
the number of absorbed cancels at ``_MAX_CANCEL_DRAIN_ITERS``.
3+
4+
Without the bound, a persistent cancel-storm + a wedged worker
5+
thread (kernel I/O hang, NFS mount lost, encrypted volume sealed)
6+
would spin here forever holding ``self._lock`` — wedging every
7+
other ``set_nodes`` caller on the same store process-wide. The
8+
cap surfaces a clear ``RuntimeError`` and releases the lock so
9+
the wedge is visible rather than silent.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import asyncio
15+
import os
16+
import threading
17+
from pathlib import Path
18+
19+
import pytest
20+
21+
from dqliteclient.node_store import (
22+
_MAX_CANCEL_DRAIN_ITERS,
23+
NodeInfo,
24+
YamlNodeStore,
25+
)
26+
from dqlitewire import NodeRole
27+
28+
29+
@pytest.mark.asyncio
async def test_cancel_drain_loop_caps_at_max_iters_and_releases_lock(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Drive a persistent cancel-storm against a parked worker.

    After ``_MAX_CANCEL_DRAIN_ITERS`` cancels are absorbed, the loop must
    raise ``RuntimeError`` with a stuck-worker diagnostic AND release
    the lock so a subsequent ``set_nodes`` is not wedged.

    The parked worker is always released in ``finally`` so that a failing
    assertion does not leave a thread blocked for the full 30 s park
    timeout and stall executor / event-loop shutdown.
    """
    store = YamlNodeStore(tmp_path / "nodes.yaml")
    new_nodes = [
        NodeInfo(node_id=1, address="127.0.0.1:9001", role=NodeRole.VOTER),
    ]

    started = threading.Event()
    can_finish = threading.Event()
    # Capture the real implementation before patching so the parked
    # stand-in can still complete the rename once released.
    real_replace = os.replace

    def parked_replace(
        src: str | os.PathLike[str],
        dst: str | os.PathLike[str],
    ) -> None:
        started.set()
        # Park until the test explicitly releases — this stands in for
        # a wedged worker thread (NAS pause, NFS mount lost, etc.).
        can_finish.wait(timeout=30.0)
        real_replace(src, dst)

    monkeypatch.setattr("dqliteclient.node_store.os.replace", parked_replace)

    task = asyncio.create_task(store.set_nodes(new_nodes))
    try:
        # Wait for the worker thread to enter the parked replace.
        for _ in range(200):
            if started.is_set():
                break
            await asyncio.sleep(0.01)
        assert started.is_set(), "worker did not enter parked_replace"
        assert store._lock.locked(), "lock must be held while inner runs"

        # Cancel-storm: well past the cap.
        for _ in range(_MAX_CANCEL_DRAIN_ITERS + 5):
            task.cancel()
            # Give the loop a chance to absorb each cancel.
            await asyncio.sleep(0)
            await asyncio.sleep(0)

        # Bound the wait: ``asyncio.wait`` does not cancel the task on
        # timeout, so a regression (cancels absorbed forever) shows up
        # as a prompt assertion failure instead of a hung test.
        done, _pending = await asyncio.wait({task}, timeout=5.0)
        assert task in done, (
            "set_nodes did not finish after the cancel-storm — the "
            "cancel-drain cap did not fire"
        )

        # The cap fires: RuntimeError surfaces with the stuck-worker
        # diagnostic. The pre-fix behaviour would have spun absorbing
        # cancels forever.
        with pytest.raises(RuntimeError, match="cancel-drain budget exceeded"):
            task.result()

        # Lock released so a subsequent set_nodes is not wedged.
        assert not store._lock.locked(), (
            "lock must be released after the cap fires — otherwise every "
            "subsequent set_nodes is wedged process-wide"
        )
    finally:
        # Release the parked worker on every path so the orphaned
        # to_thread future finishes cleanly and the test does not hang
        # the runner shutdown.
        can_finish.set()
        if not task.done():
            # Failure path only: let the task settle now that the
            # worker is free, without raising from inside ``finally``.
            await asyncio.wait({task}, timeout=5.0)
92+
93+
94+
@pytest.mark.asyncio
async def test_modest_cancel_burst_below_cap_still_completes_normally(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A handful of cancels (well below the cap) must NOT trip the
    RuntimeError arm — the existing drain-loop semantics (wait for
    inner to finish, then re-raise the cancel) are preserved.

    The slow worker is always released in ``finally`` so a failing
    assertion does not leave a thread blocked until its own timeout.
    """
    store = YamlNodeStore(tmp_path / "nodes.yaml")
    new_nodes = [
        NodeInfo(node_id=1, address="127.0.0.1:9001", role=NodeRole.VOTER),
    ]

    started = threading.Event()
    can_finish = threading.Event()
    # Capture the real implementation before patching so the slow
    # stand-in can still complete the rename once released.
    real_replace = os.replace

    def slow_replace(
        src: str | os.PathLike[str],
        dst: str | os.PathLike[str],
    ) -> None:
        started.set()
        # Hold the worker until the test has delivered its cancels.
        can_finish.wait(timeout=5.0)
        real_replace(src, dst)

    monkeypatch.setattr("dqliteclient.node_store.os.replace", slow_replace)

    task = asyncio.create_task(store.set_nodes(new_nodes))
    try:
        # Wait for the worker thread to enter the slow replace.
        for _ in range(200):
            if started.is_set():
                break
            await asyncio.sleep(0.01)
        assert started.is_set(), "worker did not enter slow_replace"

        # Modest burst, well below the cap.
        for _ in range(3):
            task.cancel()
            await asyncio.sleep(0)

        # Release the worker — the drain loop completes normally and
        # re-raises CancelledError (no RuntimeError, no cap trip).
        can_finish.set()
        with pytest.raises(asyncio.CancelledError):
            await task
    finally:
        # Idempotent on the success path; on a failure path this frees
        # the worker thread so it does not outlive the test.
        can_finish.set()
        if not task.done():
            # Failure path only: reap the task without raising from
            # inside ``finally``.
            await asyncio.wait({task}, timeout=5.0)

    assert not store._lock.locked()

0 commit comments

Comments
 (0)