Skip to content

Commit 0780e4f

Browse files
committed
feat: integrate invoke spawn/cancel into engines and event routing
- Add InvokeManager and states_to_invoke to BaseEngine - Track states with invocations in _enter_states(), cancel in _exit_states() - Spawn child sessions in sync (daemon thread) and async (asyncio task) engines - Implement finalize and autoforward for invoke events - Add invokeid field to TriggerData for cross-session event tracking - Implement #_parent, #_child, and #_<invokeid> send targets - Convert InvokeDefinition to InvokeConfig in SCXML processor - Add State(invoke=...) parameter for Python API
1 parent 2b94c34 commit 0780e4f

11 files changed

Lines changed: 213 additions & 44 deletions

File tree

statemachine/engines/async_.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ async def _exit_states( # type: ignore[override]
173173
for info in ordered_states:
174174
args, kwargs = await self._get_args_kwargs(info.transition, trigger_data)
175175

176+
# Cancel invocations for states being exited
177+
if info.state is not None and getattr(info.state, "invocations", None):
178+
self.invoke_manager.cancel_for_state(info.state)
179+
180+
# Remove from states_to_invoke if we exit before invoking
181+
self.states_to_invoke.discard(info.state)
182+
176183
if info.state is not None: # pragma: no branch
177184
await self.sm._callbacks.async_call(
178185
info.state.exit.key, *args, on_error=on_error, **kwargs
@@ -358,6 +365,15 @@ async def processing_loop( # noqa: C901
358365
took_events = True
359366
await self._run_microstep(enabled_transitions, internal_event)
360367

368+
# Spawn invocations for states entered during this macrostep
369+
for state in sorted(
370+
self.states_to_invoke,
371+
key=lambda s: s.document_order,
372+
):
373+
for config in state.invocations:
374+
await self.invoke_manager.spawn_async(state, config, internal_event)
375+
self.states_to_invoke.clear()
376+
361377
# Phase 2: remaining internal events
362378
while not self.internal_queue.is_empty(): # pragma: no cover
363379
internal_event = self.internal_queue.pop()
@@ -381,6 +397,16 @@ async def processing_loop( # noqa: C901
381397

382398
logger.debug("External event: %s", external_event.event)
383399

400+
# Handle invoke finalize and autoforward
401+
for state in self.sm.configuration:
402+
for inv in self.invoke_manager.active_for_state(state):
403+
if external_event.invokeid and inv.invokeid == external_event.invokeid:
404+
self.invoke_manager.apply_finalize(inv, external_event)
405+
if inv.config.autoforward and external_event.event:
406+
self.invoke_manager.forward_event(
407+
inv, str(external_event.event), external_event
408+
)
409+
384410
# Handle lazy initial state activation.
385411
# Break out of phase 3 so the outer loop restarts from phase 1
386412
# (eventless/internal), ensuring internal events queued during

statemachine/engines/base.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from ..event_data import TriggerData
2121
from ..exceptions import InvalidDefinition
2222
from ..exceptions import TransitionNotAllowed
23+
from ..invoke import InvokeManager
2324
from ..orderedset import OrderedSet
2425
from ..state import HistoryState
2526
from ..state import State
@@ -94,6 +95,8 @@ def __init__(self, sm: "StateChart"):
9495
self.running = True
9596
self._processing = Lock()
9697
self._cache: Dict = {} # Cache for _get_args_kwargs results
98+
self.invoke_manager = InvokeManager(self)
99+
self.states_to_invoke: "OrderedSet[State]" = OrderedSet()
97100

98101
def empty(self): # pragma: no cover
99102
return self.external_queue.is_empty()
@@ -485,6 +488,13 @@ def _exit_states(
485488
for info in ordered_states:
486489
args, kwargs = self._get_args_kwargs(info.transition, trigger_data)
487490

491+
# Cancel invocations for states being exited
492+
if info.state is not None and getattr(info.state, "invocations", None):
493+
self.invoke_manager.cancel_for_state(info.state)
494+
495+
# Remove from states_to_invoke if we exit before invoking
496+
self.states_to_invoke.discard(info.state)
497+
488498
# Execute `onexit` handlers — same per-block error isolation as onentry.
489499
if info.state is not None: # pragma: no branch
490500
self.sm._callbacks.call(info.state.exit.key, *args, on_error=on_error, **kwargs)
@@ -556,6 +566,7 @@ def _handle_final_state(self, target: State, on_entry_result: list):
556566
"""Handle final state entry: queue done events. No direct callback dispatch."""
557567
if target.parent is None:
558568
self.running = False
569+
self.invoke_manager.cancel_all()
559570
else:
560571
parent = target.parent
561572
grandparent = parent.parent
@@ -645,6 +656,10 @@ def _enter_states( # noqa: C901
645656
new_configuration=new_configuration,
646657
)
647658

659+
# Track states with invocations for post-macrostep spawning
660+
if getattr(target, "invocations", None):
661+
self.states_to_invoke.add(target)
662+
648663
# Handle final states
649664
if target.final:
650665
self._handle_final_state(target, on_entry_result)

statemachine/engines/sync.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,14 @@ def processing_loop(self, caller_future=None): # noqa: C901
107107
took_events = True
108108
self._run_microstep(enabled_transitions, internal_event)
109109

110-
# TODO: Invoke platform-specific logic
111-
# for state in sorted(self.states_to_invoke, key=self.entry_order):
112-
# for inv in sorted(state.invoke, key=self.document_order):
113-
# self.invoke(inv)
114-
# self.states_to_invoke.clear()
110+
# Spawn invocations for states entered during this macrostep
111+
for state in sorted(
112+
self.states_to_invoke,
113+
key=lambda s: s.document_order,
114+
):
115+
for config in state.invocations:
116+
self.invoke_manager.spawn_sync(state, config, internal_event)
117+
self.states_to_invoke.clear()
115118

116119
# Process remaining internal events before external events.
117120
# Note: the macrostep loop above already drains the internal queue,
@@ -137,18 +140,16 @@ def processing_loop(self, caller_future=None): # noqa: C901
137140
break
138141

139142
logger.debug("External event: %s", external_event.event)
140-
# # TODO: Handle cancel event
141-
# if self.is_cancel_event(external_event):
142-
# self.running = False
143-
# return
144-
145-
# TODO: Invoke states
146-
# for state in self.configuration:
147-
# for inv in state.invoke:
148-
# if inv.invokeid == external_event.invokeid:
149-
# self.apply_finalize(inv, external_event)
150-
# if inv.autoforward:
151-
# self.send(inv.id, external_event)
143+
144+
# Handle invoke finalize and autoforward
145+
for state in self.sm.configuration:
146+
for inv in self.invoke_manager.active_for_state(state):
147+
if external_event.invokeid and inv.invokeid == external_event.invokeid:
148+
self.invoke_manager.apply_finalize(inv, external_event)
149+
if inv.config.autoforward and external_event.event:
150+
self.invoke_manager.forward_event(
151+
inv, str(external_event.event), external_event
152+
)
152153

153154
enabled_transitions = self.select_transitions(external_event)
154155
logger.debug("Enabled transitions: %s", enabled_transitions)

statemachine/event.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,26 @@ def __get__(self, instance, owner):
121121
return self
122122
return BoundEvent(id=self.id, name=self.name, delay=self.delay, _sm=instance)
123123

124-
def put(self, *args, send_id: "str | None" = None, **kwargs):
124+
def put(self, *args, send_id: "str | None" = None, invokeid: "str | None" = None, **kwargs):
125125
# The `__call__` is declared here to help IDEs knowing that an `Event`
126126
# can be called as a method. But it is not meant to be called without
127127
# an SM instance. Such SM instance is provided by `__get__` method when
128128
# used as a property descriptor.
129129
assert self._sm is not None
130-
trigger_data = self.build_trigger(*args, machine=self._sm, send_id=send_id, **kwargs)
130+
trigger_data = self.build_trigger(
131+
*args, machine=self._sm, send_id=send_id, invokeid=invokeid, **kwargs
132+
)
131133
self._sm._put_nonblocking(trigger_data, internal=self.internal)
132134
return trigger_data
133135

134-
def build_trigger(self, *args, machine: "StateChart", send_id: "str | None" = None, **kwargs):
136+
def build_trigger(
137+
self,
138+
*args,
139+
machine: "StateChart",
140+
send_id: "str | None" = None,
141+
invokeid: "str | None" = None,
142+
**kwargs,
143+
):
135144
if machine is None:
136145
raise RuntimeError(_("Event {} cannot be called without a SM instance").format(self))
137146

@@ -140,6 +149,7 @@ def build_trigger(self, *args, machine: "StateChart", send_id: "str | None" = No
140149
machine=machine,
141150
event=self,
142151
send_id=send_id,
152+
invokeid=invokeid,
143153
args=args,
144154
kwargs=kwargs,
145155
)

statemachine/event_data.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class TriggerData:
2424
Allow revoking a delayed :ref:`TriggerData` instance.
2525
"""
2626

27+
invokeid: "str | None" = field(compare=False, default=None)
28+
"""The invoke id of the child session that generated this event, if any."""
29+
2730
execution_time: float = field(default=0.0)
2831
"""The time at which the :ref:`Event` should run."""
2932

statemachine/invoke.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,11 @@ def _create_child(
168168
invocation: Invocation,
169169
trigger_data: Any,
170170
) -> "StateChart | None":
171-
"""Create and return a child StateChart instance."""
171+
"""Create and return a child StateChart instance.
172+
173+
Sets ``_parent_sm`` and ``_invokeid`` on the child class before
174+
instantiation so that ``#_parent`` sends work during initial entry.
175+
"""
172176
from .io.scxml.processor import SCXMLProcessor
173177

174178
bridge = ParentBridge(self.sm, invokeid, invocation)
@@ -177,10 +181,21 @@ def _create_child(
177181
child_sm: "StateChart | None" = None
178182

179183
if child_class is not None:
180-
child_sm = child_class(listeners=[bridge])
184+
# For user-provided classes, set parent refs on the class temporarily
185+
child_class._parent_sm = self.sm # type: ignore[attr-defined]
186+
child_class._invokeid = invokeid # type: ignore[attr-defined]
187+
try:
188+
child_sm = child_class(listeners=[bridge])
189+
finally:
190+
# Clean up class-level attrs (they're now on the instance)
191+
del child_class._parent_sm # type: ignore[attr-defined]
192+
del child_class._invokeid # type: ignore[attr-defined]
181193
elif config.content:
182194
processor = SCXMLProcessor()
183195
processor.parse_scxml(f"invoke_{invokeid}", config.content)
196+
child_cls = next(iter(processor.scs.values()))
197+
child_cls._parent_sm = self.sm # type: ignore[attr-defined]
198+
child_cls._invokeid = invokeid # type: ignore[attr-defined]
184199
child_sm = processor.start(listeners=[bridge])
185200
elif config.src:
186201
from pathlib import Path
@@ -191,13 +206,16 @@ def _create_child(
191206
path = Path(parsed.path) if parsed.scheme == "file" else Path(config.src)
192207
processor = SCXMLProcessor()
193208
processor.parse_scxml_file(path)
209+
child_cls = next(iter(processor.scs.values()))
210+
child_cls._parent_sm = self.sm # type: ignore[attr-defined]
211+
child_cls._invokeid = invokeid # type: ignore[attr-defined]
194212
child_sm = processor.start(listeners=[bridge])
195213

196214
if child_sm is None:
197215
logger.warning("Could not create child for invoke %s", invokeid)
198216
return None
199217

200-
# Set parent references on child
218+
# Ensure instance-level parent references are set
201219
child_sm._parent_sm = self.sm # type: ignore[attr-defined]
202220
child_sm._invokeid = invokeid # type: ignore[attr-defined]
203221

statemachine/io/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class BaseStateKwargs(TypedDict, total=False):
4646
enter: "str | ActionProtocol | Sequence[str] | Sequence[ActionProtocol]"
4747
exit: "str | ActionProtocol | Sequence[str] | Sequence[ActionProtocol]"
4848
donedata: "ActionProtocol | None"
49+
invoke: "Any"
4950

5051

5152
class StateKwargs(BaseStateKwargs, total=False):

statemachine/io/scxml/actions.py

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ class EventDataWrapper:
112112
def __init__(self, event_data):
113113
self.event_data = event_data
114114
self.sendid = event_data.trigger_data.send_id
115+
self.invokeid = event_data.trigger_data.invokeid or ""
115116
if event_data.trigger_data.event is None or event_data.trigger_data.event.internal:
116117
if "error.execution" == event_data.trigger_data.event:
117118
self.type = "platform"
@@ -364,9 +365,60 @@ def raise_action(*args, **kwargs):
364365
return raise_action
365366

366367

368+
def _send_to_parent(machine: "StateChart", event: str, content: Any, params_values: dict):
369+
"""Route an event to the parent session via #_parent."""
370+
parent_sm = getattr(machine, "_parent_sm", None)
371+
if parent_sm is not None:
372+
child_invokeid = getattr(machine, "_invokeid", None)
373+
parent_sm.send(event, *content, invokeid=child_invokeid, **params_values)
374+
else:
375+
machine.send("error.communication", internal=True)
376+
377+
378+
def _send_to_child(machine: "StateChart", event: str, params_values: dict):
379+
"""Route an event to the first active child via #_child."""
380+
if hasattr(machine, "_engine") and hasattr(machine._engine, "invoke_manager"):
381+
machine._engine.invoke_manager.send_to_child(event, **params_values)
382+
else:
383+
machine.send("error.communication", internal=True)
384+
385+
386+
def _send_to_invokeid(
387+
machine: "StateChart", target: str, event: str, action: SendAction, **kwargs
388+
):
389+
"""Route an event to a specific child by #_<invokeid>."""
390+
invokeid = target[2:]
391+
if hasattr(machine, "_engine") and hasattr(machine._engine, "invoke_manager"):
392+
params_values = {}
393+
for param in action.params:
394+
if param.expr:
395+
params_values[param.name] = _eval(param.expr, **kwargs)
396+
machine._engine.invoke_manager.send_to_invokeid(invokeid, event, **params_values)
397+
elif target.startswith("#_scxml_"):
398+
machine.send("error.communication", internal=True)
399+
else:
400+
raise ValueError(f"Invalid target: {target}")
401+
402+
403+
def _eval_send_params(action: SendAction, **kwargs) -> dict:
404+
"""Evaluate namelist and <param> into a dict of keyword arguments."""
405+
machine: "StateChart" = kwargs["machine"]
406+
names = []
407+
for name in (action.namelist or "").strip().split():
408+
if not hasattr(machine.model, name):
409+
raise NameError(f"Namelist variable '{name}' not found on model")
410+
names.append(Param(name=name, expr=name))
411+
params_values = {}
412+
for param in chain(names, action.params):
413+
if param.expr is None:
414+
continue
415+
params_values[param.name] = _eval(param.expr, **kwargs)
416+
return params_values
417+
418+
367419
def create_send_action_callable(action: SendAction) -> Callable: # noqa: C901
368420
content: Any = ()
369-
_valid_targets = (None, "#_internal", "internal", "#_parent", "parent")
421+
_valid_targets = (None, "#_internal", "internal", "#_parent", "parent", "#_child")
370422
if action.content:
371423
try:
372424
content = (eval(action.content, {}, {}),)
@@ -379,22 +431,35 @@ def send_action(*args, **kwargs):
379431
target = action.target if action.target else None
380432

381433
if action.type and action.type != "http://www.w3.org/TR/scxml/#SCXMLEventProcessor":
382-
# Per SCXML spec 6.2.3, unsupported type raises error.execution
383434
raise ValueError(
384435
f"Unsupported send type: {action.type}. "
385436
"Only 'http://www.w3.org/TR/scxml/#SCXMLEventProcessor' is supported"
386437
)
438+
387439
if target not in _valid_targets:
388440
if target and target.startswith("#_scxml_"):
389-
# Valid SCXML session reference but undispatchable error.communication
441+
# Valid SCXML session reference but undispatchable -> error.communication
390442
machine.send("error.communication", internal=True)
443+
return
444+
elif target and target.startswith("#_"):
445+
# Handle #_invokeid target (send to specific child by invoke id)
446+
_send_to_invokeid(machine, target, event, action, **kwargs)
447+
return
391448
else:
392-
# Invalid target expression → error.execution (raised as exception)
393449
raise ValueError(f"Invalid target: {target}. Must be one of {_valid_targets}")
394450
return
395451

396-
internal = target in ("#_internal", "internal")
452+
params_values = _eval_send_params(action, **kwargs)
397453

454+
# Handle cross-session targets
455+
if target in ("#_parent", "parent"):
456+
_send_to_parent(machine, event, content, params_values)
457+
return
458+
if target == "#_child":
459+
_send_to_child(machine, event, params_values)
460+
return
461+
462+
internal = target in ("#_internal", "internal")
398463
send_id = None
399464
if action.id:
400465
send_id = action.id
@@ -403,20 +468,6 @@ def send_action(*args, **kwargs):
403468
setattr(machine.model, action.idlocation, send_id)
404469

405470
delay = ParseTime.parse_delay(action.delay, action.delayexpr, **kwargs)
406-
407-
# Per SCXML spec, if namelist evaluation causes an error (e.g., variable not found),
408-
# the send MUST NOT be dispatched and error.execution is raised.
409-
names = []
410-
for name in (action.namelist or "").strip().split():
411-
if not hasattr(machine.model, name):
412-
raise NameError(f"Namelist variable '{name}' not found on model")
413-
names.append(Param(name=name, expr=name))
414-
params_values = {}
415-
for param in chain(names, action.params):
416-
if param.expr is None:
417-
continue
418-
params_values[param.name] = _eval(param.expr, **kwargs)
419-
420471
Event(id=event, name=event, delay=delay, internal=internal, _sm=machine).put(
421472
*content,
422473
send_id=send_id,

0 commit comments

Comments
 (0)