Skip to content

feat: add state materialization across regions#4490

Open
aglinxinyuan wants to merge 16 commits intomainfrom
xinyuan-state-materialization
Open

feat: add state materialization across regions#4490
aglinxinyuan wants to merge 16 commits intomainfrom
xinyuan-state-materialization

Conversation

@aglinxinyuan
Copy link
Copy Markdown
Contributor

@aglinxinyuan aglinxinyuan commented Apr 24, 2026

What changes were proposed in this PR?

This PR adds state materialization as a general mechanism for passing state across different regions.

  • materialize state as a separate storage object alongside result storage
  • store one serialized state per row
  • use the same cross-language format on Python and Scala/Java sides
  • let downstream regions read back both data and state when needed

Any related issues, documentation, discussions?

Closes #4489

How was this PR tested?

  • added a Python round-trip test for materialized state storage in test_iceberg_document.py
  • added a Scala round-trip test for materialized state storage in IcebergDocumentSpec.scala

Was this PR authored or co-authored using generative AI tooling?

Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)

@aglinxinyuan aglinxinyuan self-assigned this Apr 24, 2026
@aglinxinyuan aglinxinyuan changed the title feat: add state materialization across regions wip: feat: add state materialization across regions Apr 24, 2026
@aglinxinyuan aglinxinyuan changed the base branch from main to xinyuan-state-only April 24, 2026 05:07
@aglinxinyuan aglinxinyuan marked this pull request as ready for review April 24, 2026 05:08
@aglinxinyuan aglinxinyuan changed the title wip: feat: add state materialization across regions feat: add state materialization across regions Apr 24, 2026
@aglinxinyuan aglinxinyuan reopened this Apr 28, 2026
Base automatically changed from xinyuan-state-only to main May 1, 2026 06:10
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 2, 2026

Codecov Report

❌ Patch coverage is 81.60920% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 42.69%. Comparing base (8b5dbf8) to head (4428c9d).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
...thon/core/architecture/packaging/output_manager.py 65.00% 7 Missing ⚠️
...anagers/InputPortMaterializationReaderThread.scala 42.85% 3 Missing and 1 partial ⚠️
...ne/architecture/messaginglayer/OutputManager.scala 84.61% 2 Missing ⚠️
...he/texera/amber/core/storage/DocumentFactory.scala 0.00% 0 Missing and 2 partials ⚠️
...chitecture/pythonworker/PythonWorkflowWorker.scala 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4490      +/-   ##
============================================
+ Coverage     42.49%   42.69%   +0.19%     
- Complexity     2180     2185       +5     
============================================
  Files          1005     1005              
  Lines         37429    37504      +75     
  Branches       3914     3921       +7     
============================================
+ Hits          15907    16013     +106     
+ Misses        20558    20520      -38     
- Partials        964      971       +7     
Flag Coverage Δ
access-control-service 39.53% <ø> (ø)
amber 43.17% <64.00%> (+0.03%) ⬆️
computing-unit-managing-service 0.00% <ø> (ø)
config-service 0.00% <ø> (ø)
file-service 33.24% <ø> (ø)
python 89.25% <88.70%> (+1.18%) ⬆️
workflow-compiling-service 47.72% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

aglinxinyuan added a commit that referenced this pull request May 3, 2026
Adds 9 unit tests targeting the codecov-flagged gaps in PR #4490:

- InputPortMaterializationReaderRunnable.run() inner state-reading
  try-block, including the missing-state-document path (ValueError
  swallow).
- DocumentFactory.create_document / open_document namespace routing
  for STATE and RESULT, plus the unsupported-resource-type and
  missing-table error paths. Iceberg dependencies are mocked at the
  document_factory import site so the tests run without Postgres.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 5, 2026 00:40
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces state materialization as a first-class storage artifact alongside result materialization so that operator state can be persisted and later replayed when execution crosses region boundaries, using a shared serialization format across Scala/Java and Python.

Changes:

  • Add a new VFS resource type (state) and route it to a dedicated Iceberg namespace/config on both Scala and Python sides.
  • Persist state rows (serialized) into a separate Iceberg table per operator output, and read them back during input-port materialization.
  • Add round-trip tests for state materialization in both Scala and Python.

Reviewed changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala Adds Scala tests for round-tripping materialized state rows via Iceberg.
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala Introduces STATE as a new VFS resource type.
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala Routes STATE resource type to a configurable Iceberg namespace (create/open).
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala Adds helper to derive a state URI from a result URI.
common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala Adds icebergTableStateNamespace configuration and env var constant.
common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala Adds env var constant for state namespace.
common/config/src/main/resources/storage.conf Adds storage.iceberg.table.state-namespace default + env override.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala Reads materialized state table and emits StateFrames to the input queue.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala Attempts to persist state when processing state messages.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala Creates the state table alongside the result table during scheduling/setup.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala Passes state namespace into the Python worker process args.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala Adds state persistence to storage for output ports with materialization enabled.
amber/src/main/python/texera_run_python_worker.py Extends CLI arg parsing to accept state namespace and initialize StorageConfig accordingly.
amber/src/main/python/pytexera/storage/test_large_binary_manager.py Updates test StorageConfig initialization to include the state namespace.
amber/src/main/python/core/storage/vfs_uri_factory.py Adds STATE as a Python VFS resource type.
amber/src/main/python/core/storage/test_document_factory.py Adds Python unit tests verifying namespace routing for RESULT vs STATE.
amber/src/main/python/core/storage/storage_config.py Adds ICEBERG_TABLE_STATE_NAMESPACE and wires it into initialization.
amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py Adds Python tests for emitting state frames and state-table read behavior.
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py Reads and emits state materialization rows before tuple materialization.
amber/src/main/python/core/storage/iceberg/test_iceberg_document.py Adds Python integration-ish tests for round-tripping materialized state rows.
amber/src/main/python/core/storage/document_factory.py Routes STATE to the state namespace for create/open.
amber/src/main/python/core/models/state.py Adds helper to derive a state URI from a result URI.
amber/src/main/python/core/architecture/packaging/test_output_manager.py Adds unit tests for Python OutputManager state persistence behavior.
amber/src/main/python/core/architecture/packaging/output_manager.py Adds state persistence API and records storage URIs for later state writes.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread amber/src/main/python/core/architecture/packaging/output_manager.py
Adds a state-materialization path alongside the existing tuple-result
storage. State produced by an operator's processState is written to a
companion Iceberg table whose URI is derived from the result URI. The
input-port materialization reader replays both tuples and states into
downstream workers.

Key pieces:

- New STATE resource type and a state-namespace storage config entry
  on both Python and Scala sides; namespaces are read from
  StorageConfig instead of hardcoded strings.
- RegionExecutionCoordinator provisions a state document next to every
  result document at scheduling time, so readers and writers can rely
  on its presence without try/catch.
- One long-lived BufferedItemWriter per output port, opened at port
  setup and closed at port completion, so a single Iceberg snapshot is
  produced per port instead of one per state.
- DataProcessor.processInputState (Scala) and MainLoop.process_input_state
  (Python) persist the executor's *output* state, matching the state
  that is also emitted downstream.
- New Python and Scala unit tests covering the State JSON wire format,
  the OutputManager state-writer lifecycle, the reader's state-replay
  block, and DocumentFactory namespace routing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@aglinxinyuan aglinxinyuan force-pushed the xinyuan-state-materialization branch from 10f243f to 581d574 Compare May 5, 2026 06:01
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

aglinxinyuan and others added 2 commits May 5, 2026 19:08
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@aglinxinyuan aglinxinyuan requested a review from Copilot May 6, 2026 02:22
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 25 out of 25 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread amber/src/main/python/core/models/state.py
Copy link
Copy Markdown
Contributor

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments.


def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))

def uriFromResultUri(resultUri: URI): URI =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helper is hacky. State and Result are sibling resources — both belong to the same (workflowId, executionId, globalPortId) and neither is derived from the other. Deriving one URI from another by string replacement also breaks if /result appears anywhere else in the URI (e.g. an operator named result).

A cleaner approach to add a VFSURIFactory.createStateURI(workflowId, executionId, globalPortId) that mirrors createResultURI, build state URIs through it, and delete this helper.

return cls.from_json(row[cls.CONTENT])

@staticmethod
def uri_from_result_uri(result_uri: str) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

if receiver == self.worker_actor_id:
yield self.tuples_to_data_frame(tuples)

def emit_state_with_filter(self, state: State) -> typing.Iterator[DataPayload]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Python implementation seems to be different from Scala imp. This routes through partitioner.flush_state only to immediately filter the broadcast back down to this worker — given the design that every worker reads every state, the partitioner detour is a no-op. Consider dropping this method and emitting StateFrame(State.from_tuple(row)) directly in run(), mirroring the Scala reader. The current name (_with_filter) also implies routing logic that doesn't actually exist.

})
}

def saveStateToStorageIfNeeded(state: State): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This writes the same state into every output port's state table. For multi-output-port operators that's a fan-out by N. Is this intended? If yes, please add a comment explaining the behavior so it's not read as a bug. Same goes for Python.

case None =>
}

this.stateWriters.remove(outputPortId).foreach(_.close())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result tuples are written via a dedicated OutputPortResultWriterThread so the DP thread isn't blocked on Iceberg I/O. State writes here run synchronously on the caller's thread (DataProcessor), so every putOne — and eventually buffer flushes / commits — stalls the DP thread. For loops that emit state frequently this becomes a real cost. Please mirror the result path with a OutputPortStateWriterThread (or generalize the existing one) so DP just enqueues.

aglinxinyuan and others added 7 commits May 5, 2026 21:38
Address PR #4490 review comment 3192875005: document why the
input-port materialization reader replays states before tuples
(downstream operators typically need their state in place before
processing the incoming tuples).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #4490 review comment 3192889029: explain why the state
loop intentionally enqueues every row to every downstream worker
while the tuple loop filters by partitioner -- state is shared
context, not per-key data.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support state materialization across regions

4 participants