Skip to content

Commit 2b94c34

Browse files
committed
feat: add InvokeManager for child session lifecycle
New module statemachine/invoke.py with: - InvokeConfig: static configuration for an invocation - Invocation: runtime state of an active child session - ParentBridge: listener on child that routes #_parent events - InvokeManager: spawn/cancel/finalize/autoforward management with sync (daemon thread) and async (asyncio task) variants
1 parent 6d8067d commit 2b94c34

1 file changed

Lines changed: 309 additions & 0 deletions

File tree

statemachine/invoke.py

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
"""Invoke support for SCXML child sessions.
2+
3+
This module implements the SCXML `<invoke>` mechanism: when a state is entered,
4+
it can spawn a child state machine (session). When the state is exited, the
5+
child is cancelled. Communication between parent and child uses the existing
6+
thread-safe ``send()`` mechanism via ``PriorityQueue``.
7+
"""
8+
9+
import logging
10+
import threading
11+
from dataclasses import dataclass
12+
from dataclasses import field
13+
from typing import TYPE_CHECKING
14+
from typing import Any
15+
from typing import Dict
16+
from typing import List
17+
from uuid import uuid4
18+
19+
if TYPE_CHECKING:
20+
from .engines.base import BaseEngine
21+
from .state import State
22+
from .statemachine import StateChart
23+
24+
logger = logging.getLogger(__name__)
25+
26+
27+
@dataclass
28+
class InvokeConfig:
29+
"""Static configuration for an invocation, derived from SCXML or Python API."""
30+
31+
invoke_type: "str | None" = None
32+
src: "str | None" = None
33+
srcexpr: "str | None" = None
34+
id: "str | None" = None
35+
idlocation: "str | None" = None
36+
autoforward: bool = False
37+
namelist: "str | None" = None
38+
params: "List[Any]" = field(default_factory=list)
39+
content: "str | None" = None
40+
finalize: Any = None
41+
child_class: "type[StateChart] | None" = None
42+
43+
44+
@dataclass
45+
class Invocation:
46+
"""Runtime state of an active invocation."""
47+
48+
invokeid: str
49+
config: InvokeConfig
50+
child_sm: "StateChart | None" = None
51+
thread: "threading.Thread | None" = None
52+
cancelled: bool = False
53+
state_id: str = ""
54+
55+
56+
class ParentBridge:
57+
"""Listener attached to a child state machine that intercepts ``#_parent`` sends.
58+
59+
When the child's engine finishes (reaches a final state), this bridge sends
60+
``done.invoke.<invokeid>`` to the parent. It also intercepts events sent
61+
to ``#_parent`` and routes them to the parent's external queue.
62+
"""
63+
64+
def __init__(self, parent_sm: "StateChart", invokeid: str, invocation: "Invocation"):
65+
self._parent_sm = parent_sm
66+
self._invokeid = invokeid
67+
self._invocation = invocation
68+
69+
70+
class InvokeManager:
71+
"""Manages active invocations for an engine instance."""
72+
73+
def __init__(self, engine: "BaseEngine"):
74+
self._engine = engine
75+
self._active: Dict[str, Invocation] = {}
76+
77+
@property
78+
def sm(self) -> "StateChart":
79+
return self._engine.sm
80+
81+
def _generate_invokeid(self, state: "State", config: InvokeConfig) -> str:
82+
if config.id:
83+
return config.id
84+
return f"{state.id}.{uuid4().hex[:8]}"
85+
86+
def _set_idlocation(self, config: InvokeConfig, invokeid: str):
87+
if config.idlocation:
88+
setattr(self.sm.model, config.idlocation, invokeid)
89+
90+
def spawn_sync(self, state: "State", config: InvokeConfig, trigger_data: Any):
91+
"""Spawn a child session synchronously (in a daemon thread)."""
92+
invokeid = self._generate_invokeid(state, config)
93+
self._set_idlocation(config, invokeid)
94+
95+
invocation = Invocation(
96+
invokeid=invokeid,
97+
config=config,
98+
state_id=state.id,
99+
)
100+
self._active[invokeid] = invocation
101+
102+
child_sm = self._create_child(config, invokeid, invocation, trigger_data)
103+
if child_sm is None:
104+
del self._active[invokeid]
105+
return
106+
107+
invocation.child_sm = child_sm
108+
109+
def run_child():
110+
try:
111+
# The child was already started during creation (sync engine).
112+
# Wait for it to terminate by polling.
113+
import time
114+
115+
while not child_sm.is_terminated and not invocation.cancelled:
116+
time.sleep(0.01)
117+
118+
if not invocation.cancelled:
119+
logger.debug("Child %s terminated, sending done.invoke.%s", invokeid, invokeid)
120+
self.sm.send(f"done.invoke.{invokeid}", invokeid=invokeid)
121+
except Exception:
122+
logger.exception("Error in child session %s", invokeid)
123+
124+
thread = threading.Thread(target=run_child, daemon=True, name=f"invoke-{invokeid}")
125+
invocation.thread = thread
126+
thread.start()
127+
128+
async def spawn_async(self, state: "State", config: InvokeConfig, trigger_data: Any):
129+
"""Spawn a child session asynchronously."""
130+
import asyncio
131+
132+
invokeid = self._generate_invokeid(state, config)
133+
self._set_idlocation(config, invokeid)
134+
135+
invocation = Invocation(
136+
invokeid=invokeid,
137+
config=config,
138+
state_id=state.id,
139+
)
140+
self._active[invokeid] = invocation
141+
142+
child_sm = self._create_child(config, invokeid, invocation, trigger_data)
143+
if child_sm is None:
144+
del self._active[invokeid]
145+
return
146+
147+
invocation.child_sm = child_sm
148+
149+
async def run_child():
150+
try:
151+
await child_sm.activate_initial_state()
152+
153+
while not child_sm.is_terminated and not invocation.cancelled:
154+
await asyncio.sleep(0.01)
155+
156+
if not invocation.cancelled:
157+
logger.debug("Child %s terminated, sending done.invoke.%s", invokeid, invokeid)
158+
self.sm.send(f"done.invoke.{invokeid}", invokeid=invokeid)
159+
except Exception:
160+
logger.exception("Error in child session %s", invokeid)
161+
162+
asyncio.ensure_future(run_child())
163+
164+
def _create_child(
165+
self,
166+
config: InvokeConfig,
167+
invokeid: str,
168+
invocation: Invocation,
169+
trigger_data: Any,
170+
) -> "StateChart | None":
171+
"""Create and return a child StateChart instance."""
172+
from .io.scxml.processor import SCXMLProcessor
173+
174+
bridge = ParentBridge(self.sm, invokeid, invocation)
175+
176+
child_class = config.child_class
177+
child_sm: "StateChart | None" = None
178+
179+
if child_class is not None:
180+
child_sm = child_class(listeners=[bridge])
181+
elif config.content:
182+
processor = SCXMLProcessor()
183+
processor.parse_scxml(f"invoke_{invokeid}", config.content)
184+
child_sm = processor.start(listeners=[bridge])
185+
elif config.src:
186+
from pathlib import Path
187+
from urllib.parse import urlparse
188+
189+
parsed = urlparse(config.src)
190+
if parsed.scheme == "file" or not parsed.scheme:
191+
path = Path(parsed.path) if parsed.scheme == "file" else Path(config.src)
192+
processor = SCXMLProcessor()
193+
processor.parse_scxml_file(path)
194+
child_sm = processor.start(listeners=[bridge])
195+
196+
if child_sm is None:
197+
logger.warning("Could not create child for invoke %s", invokeid)
198+
return None
199+
200+
# Set parent references on child
201+
child_sm._parent_sm = self.sm # type: ignore[attr-defined]
202+
child_sm._invokeid = invokeid # type: ignore[attr-defined]
203+
204+
# Apply initial data from namelist/params
205+
self._apply_initial_data(child_sm, config, trigger_data)
206+
207+
return child_sm
208+
209+
def _apply_initial_data(
210+
self,
211+
child_sm: "StateChart",
212+
config: InvokeConfig,
213+
trigger_data: Any,
214+
):
215+
"""Apply namelist and param data to the child's datamodel."""
216+
if config.namelist:
217+
for name in config.namelist.strip().split():
218+
if hasattr(self.sm.model, name):
219+
value = getattr(self.sm.model, name)
220+
if hasattr(child_sm.model, name):
221+
setattr(child_sm.model, name, value)
222+
223+
for param in config.params:
224+
if param.expr is not None:
225+
from .io.scxml.actions import _eval
226+
227+
try:
228+
kwargs = {"machine": self.sm, "model": self.sm.model}
229+
kwargs.update(
230+
{
231+
k: v
232+
for k, v in self.sm.model.__dict__.items()
233+
if k not in {"_sessionid", "_ioprocessors", "_name", "_event"}
234+
}
235+
)
236+
value = _eval(param.expr, **kwargs)
237+
if hasattr(child_sm.model, param.name):
238+
setattr(child_sm.model, param.name, value)
239+
except Exception:
240+
logger.exception("Error evaluating param %s", param.name)
241+
242+
def cancel_for_state(self, state: "State"):
243+
"""Cancel all invocations associated with a state."""
244+
to_remove = [
245+
inv_id
246+
for inv_id, inv in self._active.items()
247+
if inv.state_id == state.id and not inv.cancelled
248+
]
249+
for inv_id in to_remove:
250+
self._cancel(inv_id)
251+
252+
def cancel_all(self):
253+
"""Cancel all active invocations."""
254+
for inv_id in list(self._active.keys()):
255+
self._cancel(inv_id)
256+
257+
def _cancel(self, invokeid: str):
258+
invocation = self._active.get(invokeid)
259+
if invocation is None or invocation.cancelled:
260+
return
261+
invocation.cancelled = True
262+
logger.debug("Cancelling invocation %s", invokeid)
263+
if invocation.child_sm is not None:
264+
invocation.child_sm._engine.running = False
265+
266+
def get_invocation_by_id(self, invokeid: str) -> "Invocation | None":
267+
return self._active.get(invokeid)
268+
269+
def active_for_state(self, state: "State") -> List[Invocation]:
270+
return [inv for inv in self._active.values() if inv.state_id == state.id]
271+
272+
def apply_finalize(self, invocation: Invocation, trigger_data: Any):
273+
"""Execute the finalize block for an invocation before the event is processed."""
274+
config = invocation.config
275+
if config.finalize is None:
276+
return
277+
try:
278+
config.finalize(machine=self.sm, model=self.sm.model, event_data=trigger_data)
279+
except Exception:
280+
logger.exception("Error in finalize for %s", invocation.invokeid)
281+
282+
def forward_event(self, invocation: Invocation, event_name: str, trigger_data: Any):
283+
"""Forward an event to a child session (autoforward)."""
284+
if invocation.child_sm is None or invocation.cancelled:
285+
return
286+
invocation.child_sm.send(event_name, **trigger_data.kwargs)
287+
288+
def get_invocation_for_child(self, child_sm: "StateChart") -> "Invocation | None":
289+
"""Find the invocation record for a given child state machine."""
290+
for inv in self._active.values():
291+
if inv.child_sm is child_sm:
292+
return inv
293+
return None
294+
295+
def send_to_child(self, event_name: str, **kwargs):
296+
"""Send an event to the first active child (for #_child target)."""
297+
for inv in self._active.values():
298+
if inv.child_sm is not None and not inv.cancelled:
299+
inv.child_sm.send(event_name, **kwargs)
300+
return
301+
logger.warning("No active child to send event %s", event_name)
302+
303+
def send_to_invokeid(self, invokeid: str, event_name: str, **kwargs):
304+
"""Send an event to a specific child by invokeid."""
305+
inv = self._active.get(invokeid)
306+
if inv and inv.child_sm and not inv.cancelled:
307+
inv.child_sm.send(event_name, **kwargs)
308+
else:
309+
logger.warning("No active child with invokeid %s", invokeid)

0 commit comments

Comments
 (0)