feat: add state materialization across regions #4490
aglinxinyuan wants to merge 16 commits into main from
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #4490 +/- ##
============================================
+ Coverage 42.49% 42.69% +0.19%
- Complexity 2180 2185 +5
============================================
Files 1005 1005
Lines 37429 37504 +75
Branches 3914 3921 +7
============================================
+ Hits 15907 16013 +106
+ Misses 20558 20520 -38
- Partials 964 971 +7
Adds 9 unit tests targeting the codecov-flagged gaps in PR #4490:
- InputPortMaterializationReaderRunnable.run() inner state-reading try-block, including the missing-state-document path (ValueError swallow).
- DocumentFactory.create_document / open_document namespace routing for STATE and RESULT, plus the unsupported-resource-type and missing-table error paths.

Iceberg dependencies are mocked at the document_factory import site so the tests run without Postgres.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
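The mock-at-the-import-site trick mentioned above is worth spelling out: because the factory module binds `IcebergDocument` at import time, tests have to patch the factory's own reference, not the defining module. The sketch below is illustrative only; the class and module names stand in for the real Texera code.

```python
from unittest import mock

# Stand-ins: the "factory module" imported IcebergDocument at load time, so
# tests must patch the *import site* (the factory's reference), not the class's
# defining module. All names here are illustrative, not the real Texera code.
class IcebergDocument:
    """Pretend heavy class that needs a live Iceberg catalog / Postgres."""
    def __init__(self, uri):
        raise RuntimeError("needs a real catalog")

class DocumentFactoryModule:
    """Minimal stand-in for the document_factory module namespace."""
    IcebergDocument = IcebergDocument

    @classmethod
    def create_document(cls, uri):
        # Looks up IcebergDocument through its own namespace, so a patch on
        # this namespace takes effect even though the import already happened.
        return cls.IcebergDocument(uri)

def create_document_with_mocked_iceberg(uri):
    # Patch at the import site so create_document never touches storage.
    with mock.patch.object(DocumentFactoryModule, "IcebergDocument") as fake:
        doc = DocumentFactoryModule.create_document(uri)
        fake.assert_called_once_with(uri)
        return doc
```

Patching `core.storage.document_factory.IcebergDocument` (rather than the Iceberg module itself) is the same idea applied to the real layout.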
Pull request overview
This PR introduces state materialization as a first-class storage artifact alongside result materialization so that operator state can be persisted and later replayed when execution crosses region boundaries, using a shared serialization format across Scala/Java and Python.
Changes:
- Add a new VFS resource type (state) and route it to a dedicated Iceberg namespace/config on both Scala and Python sides.
- Persist state rows (serialized) into a separate Iceberg table per operator output, and read them back during input-port materialization.
- Add round-trip tests for state materialization in both Scala and Python.
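The routing in the first bullet can be pictured with a small sketch. The enum values mirror the PR's RESULT/STATE resource types, but the namespace strings and function name below are illustrative, not the actual Texera configuration keys.

```python
# Hedged sketch of the namespace routing the PR describes; namespace values
# and the helper name are illustrative, not the actual Texera API.
from enum import Enum

class VFSResourceType(Enum):
    RESULT = "result"
    STATE = "state"

# Stand-ins for StorageConfig values (in the PR these come from storage.conf
# / environment variables such as storage.iceberg.table.state-namespace).
ICEBERG_TABLE_RESULT_NAMESPACE = "operator-result"
ICEBERG_TABLE_STATE_NAMESPACE = "operator-state"

def namespace_for(resource_type: VFSResourceType) -> str:
    """Route a VFS resource type to its Iceberg namespace."""
    if resource_type is VFSResourceType.RESULT:
        return ICEBERG_TABLE_RESULT_NAMESPACE
    if resource_type is VFSResourceType.STATE:
        return ICEBERG_TABLE_STATE_NAMESPACE
    raise ValueError(f"unsupported resource type: {resource_type}")
```

Keeping state and result tables in separate namespaces lets each side be provisioned, listed, and cleaned up independently.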
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Adds Scala tests for round-tripping materialized state rows via Iceberg. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala | Introduces STATE as a new VFS resource type. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala | Routes STATE resource type to a configurable Iceberg namespace (create/open). |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Adds helper to derive a state URI from a result URI. |
| common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala | Adds icebergTableStateNamespace configuration and env var constant. |
| common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala | Adds env var constant for state namespace. |
| common/config/src/main/resources/storage.conf | Adds storage.iceberg.table.state-namespace default + env override. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala | Reads materialized state table and emits StateFrames to the input queue. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala | Attempts to persist state when processing state messages. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Creates the state table alongside the result table during scheduling/setup. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala | Passes state namespace into the Python worker process args. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Adds state persistence to storage for output ports with materialization enabled. |
| amber/src/main/python/texera_run_python_worker.py | Extends CLI arg parsing to accept state namespace and initialize StorageConfig accordingly. |
| amber/src/main/python/pytexera/storage/test_large_binary_manager.py | Updates test StorageConfig initialization to include the state namespace. |
| amber/src/main/python/core/storage/vfs_uri_factory.py | Adds STATE as a Python VFS resource type. |
| amber/src/main/python/core/storage/test_document_factory.py | Adds Python unit tests verifying namespace routing for RESULT vs STATE. |
| amber/src/main/python/core/storage/storage_config.py | Adds ICEBERG_TABLE_STATE_NAMESPACE and wires it into initialization. |
| amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Adds Python tests for emitting state frames and state-table read behavior. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads and emits state materialization rows before tuple materialization. |
| amber/src/main/python/core/storage/iceberg/test_iceberg_document.py | Adds Python integration-ish tests for round-tripping materialized state rows. |
| amber/src/main/python/core/storage/document_factory.py | Routes STATE to the state namespace for create/open. |
| amber/src/main/python/core/models/state.py | Adds helper to derive a state URI from a result URI. |
| amber/src/main/python/core/architecture/packaging/test_output_manager.py | Adds unit tests for Python OutputManager state persistence behavior. |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Adds state persistence API and records storage URIs for later state writes. |
Adds a state-materialization path alongside the existing tuple-result storage. State produced by an operator's processState is written to a companion Iceberg table whose URI is derived from the result URI. The input-port materialization reader replays both tuples and states into downstream workers.

Key pieces:
- New STATE resource type and a state-namespace storage config entry on both Python and Scala sides; namespaces are read from StorageConfig instead of hardcoded strings.
- RegionExecutionCoordinator provisions a state document next to every result document at scheduling time, so readers and writers can rely on its presence without try/catch.
- One long-lived BufferedItemWriter per output port, opened at port setup and closed at port completion, so a single Iceberg snapshot is produced per port instead of one per state.
- DataProcessor.processInputState (Scala) and MainLoop.process_input_state (Python) persist the executor's *output* state, matching the state that is also emitted downstream.
- New Python and Scala unit tests covering the State JSON wire format, the OutputManager state-writer lifecycle, the reader's state-replay block, and DocumentFactory namespace routing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
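The one-writer-per-port lifecycle described above (open at port setup, buffer each state, commit once at port completion) can be sketched as follows. Class and method names are hypothetical stand-ins, not the real OutputManager API.

```python
# Illustrative sketch of the one-writer-per-output-port lifecycle: open at
# port setup, buffer each state row, commit once at close so a single
# snapshot is produced per port. Names are hypothetical.
class BufferedItemWriter:
    def __init__(self):
        self._buffer = []
        self.snapshots = []          # each close() yields one "snapshot"

    def put_one(self, item):
        self._buffer.append(item)    # cheap in-memory append

    def close(self):
        # One commit (snapshot) for everything written to this port,
        # instead of one commit per state.
        self.snapshots.append(list(self._buffer))
        self._buffer.clear()

class OutputManager:
    def __init__(self):
        self._state_writers = {}

    def open_state_writer(self, port_id):
        self._state_writers[port_id] = BufferedItemWriter()

    def save_state(self, port_id, state):
        self._state_writers[port_id].put_one(state)

    def close_port(self, port_id):
        writer = self._state_writers.pop(port_id, None)
        if writer is not None:
            writer.close()
        return writer
```

The point of the long-lived writer is the commit count: many states, one Iceberg snapshot per port.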
Force-pushed from 10f243f to 581d574
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 2 comments.
Xiao-zhen-Liu left a comment:
Left some comments.
```scala
def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
// ...
def uriFromResultUri(resultUri: URI): URI =
```
This helper is hacky. State and Result are sibling resources — both belong to the same (workflowId, executionId, globalPortId) and neither is derived from the other. Deriving one URI from the other by string replacement also breaks if /result appears anywhere else in the URI (e.g. an operator named result).
A cleaner approach is to add a VFSURIFactory.createStateURI(workflowId, executionId, globalPortId) that mirrors createResultURI, build state URIs through it, and delete this helper.
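The suggested factory shape could look like the sketch below: both URI kinds are built from the same triple, so nothing is derived by substring replacement. The URI layout and function names here are illustrative, not the actual VFSURIFactory format.

```python
# Hedged sketch of the suggested factory: build state URIs from the same
# (workflow_id, execution_id, global_port_id) triple as result URIs instead
# of string-replacing "/result". The URI layout is illustrative.
def _create_uri(resource_type, workflow_id, execution_id, global_port_id):
    return (f"vfs:///wid/{workflow_id}/eid/{execution_id}"
            f"/pid/{global_port_id}/{resource_type}")

def create_result_uri(workflow_id, execution_id, global_port_id):
    return _create_uri("result", workflow_id, execution_id, global_port_id)

def create_state_uri(workflow_id, execution_id, global_port_id):
    return _create_uri("state", workflow_id, execution_id, global_port_id)
```

Note that an operator literally named "result" can no longer corrupt the state URI, since the resource type only ever appears in its own path segment.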
```python
return cls.from_json(row[cls.CONTENT])
# ...
@staticmethod
def uri_from_result_uri(result_uri: str) -> str:
```
```python
if receiver == self.worker_actor_id:
    yield self.tuples_to_data_frame(tuples)
# ...
def emit_state_with_filter(self, state: State) -> typing.Iterator[DataPayload]:
```
This Python implementation differs from the Scala one. It routes through partitioner.flush_state only to immediately filter the broadcast back down to this worker — given the design that every worker reads every state, the partitioner detour is a no-op. Consider dropping this method and emitting StateFrame(State.from_tuple(row)) directly in run(), mirroring the Scala reader. The current name (_with_filter) also implies routing logic that doesn't actually exist.
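The suggested simplification amounts to a plain loop over the state rows. The sketch below uses minimal stand-ins for State/StateFrame and a list as the input queue; the real reader's types and queue API will differ.

```python
# Sketch of the suggested simplification: in run(), enqueue every state row
# for this worker directly instead of detouring through the partitioner.
# State/StateFrame are minimal stand-ins for the real classes.
class State:
    def __init__(self, payload):
        self.payload = payload

    @classmethod
    def from_tuple(cls, row):
        # Stand-in for deserializing the stored CONTENT column.
        return cls(row["content"])

class StateFrame:
    def __init__(self, frame):
        self.frame = frame

def replay_states(state_rows, queue):
    # Every worker reads every state row (state is shared context, not
    # per-key data), so no partitioner filter is needed here.
    for row in state_rows:
        queue.append(StateFrame(State.from_tuple(row)))
```

With this shape the Python reader mirrors the Scala one, and the misleading `_with_filter` name disappears.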
```scala
  })
}
// ...
def saveStateToStorageIfNeeded(state: State): Unit = {
```
This writes the same state into every output port's state table. For multi-output-port operators that's an N-way fan-out. Is this intended? If so, please add a comment explaining the behavior so it isn't read as a bug. The same applies to the Python side.
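If the fan-out is intentional, a comment like the one in this sketch would make the intent explicit. The function and writer shapes are illustrative, not the real saveStateToStorageIfNeeded signature.

```python
# Illustrative sketch of the fan-out flagged above: the same state is written
# to every output port's state table, so N output ports means N copies.
def save_state_to_storage_if_needed(state, state_writers):
    # Intentional fan-out: a downstream region attached to *any* output port
    # must be able to replay this state, so each port's table gets a copy.
    for port_id, writer in state_writers.items():
        writer.append(state)
```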
```scala
case None =>
}
// ...
this.stateWriters.remove(outputPortId).foreach(_.close())
```
Result tuples are written via a dedicated OutputPortResultWriterThread so the DP thread isn't blocked on Iceberg I/O. State writes here run synchronously on the caller's thread (DataProcessor), so every putOne — and eventually buffer flushes / commits — stalls the DP thread. For loops that emit state frequently this becomes a real cost. Please mirror the result path with a OutputPortStateWriterThread (or generalize the existing one) so DP just enqueues.
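The suggested decoupling is a standard producer-consumer handoff: the DP thread only enqueues, and a dedicated thread drains the queue into storage. The sketch below is a minimal version of that pattern; the class name and writer API are hypothetical, and a list stands in for the Iceberg writer.

```python
# Sketch of the suggested OutputPortStateWriterThread: the DP thread only
# enqueues; a dedicated thread performs the (potentially slow) storage
# writes, mirroring the existing result-writer thread. Names are illustrative.
import queue
import threading

_STOP = object()  # sentinel that tells the writer thread to drain and exit

class OutputPortStateWriterThread(threading.Thread):
    def __init__(self, writer):
        super().__init__(daemon=True)
        self._queue = queue.Queue()
        self._writer = writer            # e.g. a buffered Iceberg writer

    def enqueue(self, state):
        # Called from the DP thread; returns immediately.
        self._queue.put(state)

    def shutdown(self):
        self._queue.put(_STOP)

    def run(self):
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            self._writer.append(item)    # storage I/O off the DP thread
```

With this shape, buffer flushes and snapshot commits happen on the writer thread, so a loop that emits state frequently no longer stalls the DP thread on Iceberg I/O.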
Address PR #4490 review comment 3192875005: document why the input-port materialization reader replays states before tuples (downstream operators typically need their state in place before processing the incoming tuples). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #4490 review comment 3192889029: explain why the state loop intentionally enqueues every row to every downstream worker while the tuple loop filters by partitioner -- state is shared context, not per-key data. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
What changes were proposed in this PR?
This PR adds state materialization as a general mechanism for passing state across different regions.
Any related issues, documentation, discussions?
Closes #4489
How was this PR tested?
Was this PR authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)