Skip to content

Streaming daemon: Phase 1 layer 2 — primitives (#815)#818

Merged
chowbao merged 28 commits into
feature/full-historyfrom
streaming-phase1-primitives
Jun 27, 2026
Merged

Streaming daemon: Phase 1 layer 2 — primitives (#815)#818
chowbao merged 28 commits into
feature/full-historyfrom
streaming-phase1-primitives

Conversation

@chowbao

@chowbao chowbao commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Part of #815 — Phase 1 (Backfill), layer 2 of 3. Stacked on PR1A (streaming-phase1-foundations).

The primitives — single-pass cold materialization for all three data types:

  • processChunk — materializes a chunk's ledgers (.pack), events (cold segment), and tx-hash (sorted .bin) artifacts for every Kind, with per-kind idempotency
  • the cold backfillSource (frozen .pack → bulk backend)
  • buildTxhashIndex — k-way merge of the in-window .bin runs into a sorted .idx (byte-format-identical to the single-index build)
  • adds the ingest.RunColdChunk single-chunk cold primitive

Verification: go build + go vet + go test -short green on ./cmd/stellar-rpc/internal/fullhistory/... (cgo RocksDB toolchain). Note: the full cmd/stellar-rpc binary link requires the Rust libpreflight/libxdr2json (CI make build-libs); go vet ./cmd/stellar-rpc/ type-checks the entrypoint locally. golangci-lint runs in CI.

Stack: streaming-phase1-primitivesstreaming-phase1-foundations

Catalog + one-write protocol + key schema, config loader/locking, chunk/window
geometry, the retention floor/gate, the crash-injection hook seam, and the
artifact/path layout for the full-history streaming daemon. Cold substrate only;
the primitives (processChunk, buildTxhashIndex) and orchestration land in the
stacked layers above.

Part of #815.
@chowbao chowbao force-pushed the streaming-phase1-foundations branch from 3d6b182 to 20c4e3b Compare June 24, 2026 22:04
@chowbao chowbao force-pushed the streaming-phase1-primitives branch from 672d01f to 55cabde Compare June 24, 2026 22:05
chowbao added 5 commits June 24, 2026 20:08
Non-behavioral cleanup of the Phase 1 foundations package.

Code:
- Delegate cold-artifact leaf filenames to the owning store packages
  (ledger.PackName, eventstore.*, txhash.ColdBinName) instead of
  re-deriving them in Layout — one source of truth, matching ingest.
- Alias DefaultChunksPerTxhashIndex to txhash.DefaultChunksPerIndex and
  DefaultEarliestLedger to EarliestGenesis.
- Extract syncAndClose, the sync-then-close epilogue shared by
  fsyncFile/fsyncDir.
- Compute chunkKey once per iteration in windowTxhashKeysPresent.
- Drop the duplicate Windows.ChunksIn (identical to ChunksPerIndex).

Docs:
- Condense doc comments across the package (~26% fewer comment words;
  the high-volume files 20–43%), cutting repetition while preserving
  invariants, rationale, design-doc citations, and //nolint directives.

No behavior change; gofmt clean, build/vet/tests green.
Review-time cleanups, no behavior change:

- Rename the Paths.LockRoots() accessor to RootsToLock() so it no longer
  echoes the package-level LockRoots() that actually acquires the flocks.
  The call site now reads LockRoots(paths.RootsToLock()...): the method is
  the noun (roots to lock), the func is the verb (lock them).
- Fix stale filename references in the Catalog doc comment
  (protocol.go -> catalog_protocol.go, sweep.go -> catalog_sweep.go).

gofmt clean; build/vet/tests green.
No behavior change:

- Remove the unexported retentionFloorFor (a one-line pass-through to
  effectiveRetentionFloor) and seqWithinRetention (test-only). NewRetentionGate
  now calls effectiveRetentionFloor directly.
- Drop the tautological test assertion that the free function and the gate
  agree — both were seq >= effectiveRetentionFloor(...), so gate.Admits already
  covers it.
- Fold the free-floating contract header onto the RetentionGate type and
  condense the docs (retention.go 95 -> 59 lines).

gofmt clean; vet 0; tests green.
The ArtifactSet doc cited "design-docs rule 2" / "rule 1's per-kind
idempotency", but the design's only numbered rules are the reader
contract's (a different concept). Describe the concepts directly instead.

Comment-only; no behavior change.
Pure test reorganization, no logic change — the 894-line omnibus split by
concern, function bodies moved verbatim:

- window_test.go            geometry / window arithmetic
- keys_test.go              key schema + key<->path bijection
- catalog_test.go           catalog reader (pins, scans, FrozenCoverage)
- catalog_protocol_test.go  mark/flip + CommitIndex
- catalog_sweep_test.go     the two sweeps
- crashsafety_test.go       power-loss-between-steps + never-unlink-under-frozen
- helpers_test.go           shared test helpers + vars

Same 33 tests (51 package-wide); gofmt clean, vet/test green.
@chowbao chowbao force-pushed the streaming-phase1-primitives branch from aac0f4b to 1b47f8e Compare June 25, 2026 04:05
Review follow-ups to the Phase 1 foundations, the first being a crash-safety
fix:

- Durability: MkdirAll creates the storage roots but fsyncs neither the new
  dirs nor the direntries naming them, and the one-write protocol's
  grandparent fsync (barrierNewFile) only reaches a root's CONTENTS, never the
  root's own link in its parent. On a fresh deployment a crash just after the
  first freeze could lose a whole storage subtree while the synced catalog
  still advertised a "frozen" artifact under it. lockOne now records the
  deepest pre-existing ancestor before MkdirAll and fsyncs the created chain
  (deepestExistingDir + fsyncNewDirs in paths.go) — one extra dir fsync per
  root at startup.

- Catalog doc: warn that PutEarliestLedger/PutChunksPerTxhashIndex must not be
  used to pin the layout on first start (that is PinLayout's atomic
  both-or-neither job); they exist only for isolated pin writes.

- windowTxhashKeysPresent: drop the redundant cid <= last condition — the
  explicit `if cid == last { break }` is the real inclusive-bound /
  chunk.ID-wraparound guard.

- Tests: cover retention_chunks=0 (full history pins at earliest_ledger, the
  earliest-wins branch the existing tests miss) and the young-store /
  oversized-retention clamp to genesis; and prove the flock is released when
  the lock fd is closed without LOCK_UN — the kernel guarantee kill -9 relies
  on.

gofmt clean; the new fsync helpers were build- and behavior-checked standalone.

Part of #815.
@chowbao chowbao force-pushed the streaming-phase1-primitives branch from 1b47f8e to 72368b2 Compare June 25, 2026 05:19
chowbao added 6 commits June 25, 2026 09:13
…export Get/Has

Addresses three review comments on the Phase 1 foundations layer:

- r3473146235: rename the "window" concept to "tx-hash index" end to end and
  change the on-disk catalog key prefix index: -> txhash_index:. The tx-hash
  index family is now uniformly qualified: WindowID->TxHashIndexID,
  Windows->TxHashIndexLayout, IndexCoverage->TxHashIndexCoverage,
  FrozenCoverage->FrozenTxHashIndex, IndexKeys/AllIndexKeys->TxHashIndexKeys/
  AllTxHashIndexKeys, MarkIndexFreezing/CommitIndex/SweepIndexKey->
  Mark/Commit/Sweep TxHashIndex*, IndexFilePath->TxHashIndexFilePath,
  IndexWindowDir->TxHashIndexDir, the unexported indexKey/parseIndexKey/
  indexPrefix helpers, and the TxHashIndexCoverage.Index / Catalog.txhashIndex
  fields. ("crash window" and "Retention window" are different concepts, kept.)

- r3473322303: remove the speculative PutEarliestLedger/PutChunksPerTxhashIndex
  single-key setters (no production caller); first-start pinning is PinLayout's
  atomic both-or-neither job. TestConfigPins now drives PinLayout, giving it its
  first test coverage.

- r3473953493: un-export Catalog.Get/Has -> get/has; nothing outside the package
  uses them (the daemon entrypoint is in-package).

gofmt/vet/test -short green. Part of #815.
Admits/Floor() had no caller anywhere in the stack (only retention_test.go), and
TxHashIndexBelowFloor was identical to ChunkBelowFloor(LastChunk(idx)). Collapse
the type to RetentionFloor{chunk chunk.ID} with a single Excludes(c) predicate:
effectiveRetentionFloor is already chunk-aligned and every caller either converts
it to a chunk or compares whole chunks, so the floor is a chunk.ID — dropping the
TxHashIndexLayout parameter and the .LastLedger() arithmetic. An index is below
the floor exactly when its last chunk is, so callers use Excludes(layout.LastChunk
(idx)). The reader's seq-level admit predicate and the ledger-seq floor for the
§8.2 coverage filter come back with the read path (#772). Part of #815.
…71335272)

Split the platform-specific lock primitive behind acquireLock/releaseLock:
config_lock_unix.go keeps the flock (LOCK_EX|LOCK_NB, EWOULDBLOCK→errLockHeld),
and config_lock_windows.go adds the equivalent LockFileEx (LOCKFILE_EXCLUSIVE_LOCK
| LOCKFILE_FAIL_IMMEDIATELY, ERROR_LOCK_VIOLATION→errLockHeld) over a one-byte
range. Both release on handle close and on any process exit, so the crash-release
contract holds on both platforms. The shared config_lock.go is now
platform-neutral (errLockHeld sentinel → ErrRootLocked). Part of #815.
Reorganize the TOML schema around what each field governs, not which phase
touches it, per tamirms's proposal:

  [layout]    chunks_per_txhash_index   (window-size geometry; PINNED)
  [retention] earliest_ledger (PINNED), retention_chunks
  [storage]   catalog, ledgers, events, txhash_raw, txhash_index, hot
              (consolidates the old [catalog]/[immutable_storage.*]/
               [streaming.hot_storage] sections into flat path keys)
  [backfill]  workers, max_retries, [backfill.bsb]   (genuine backfill knobs only)
  [ingestion] captive_core_config        (renamed from [streaming])

chunks_per_txhash_index was geometry sitting under [backfill]; retention_chunks
and the pinned earliest_ledger were under [streaming] though they drive the
backfill range. The two PINNED fields (set once on first start, abort-on-mismatch
after) now have that contract called out on the field docs. ResolvePaths reads
the consolidated [storage]; resolved Paths and RootsToLock are unchanged.

gofmt/vet/test -short green. Part of #815.
Doc/name cleanup from the review-response churn (no behavior change):
- catalog.go: get/has doc comments still read "Get"/"Has" after un-exporting them.
- config_lock.go, paths.go: prose still named the old [catalog]/[immutable_storage.*]/
  [streaming.hot_storage] sections after the [storage] consolidation.
- catalog_protocol.go: drop the stale "rule 1" design-rule-number reference (adcb936
  removed it elsewhere; this instance was missed).
- window_test.go: TestNewWindows_Validation/TestWindowArithmetic ->
  TestNewTxHashIndexLayout_Validation/TestTxHashIndexArithmetic.

gofmt/vet/test -short green. Part of #815.
The CI golangci-lint run (default: all) flagged 8 issues, all in this PR's new
files:
- funcorder: the now-unexported Catalog.get/has must sit after the exported
  methods — moved them into the unexported-helpers section after PinLayout.
- godoclint: the EarliestGenesis and DefaultEarliestLedger const docs must start
  with the symbol name.
- gosec G115 (uintptr->int on the flock fd): routed both unix.Flock calls through
  a single fdInt(f) helper carrying one //nolint:gosec, instead of two inline
  directives (avoids a too-long line and a duplicate-conversion unused-directive).
- nolintlint: dropped two now-unused //nolint:gosec directives in artifacts.go
  (gosec no longer flags the uint8(1)<<i shifts).

Verified with golangci-lint v2.11.3 (the CI version): 0 issues, full-scan.
gofmt/test -short green. Part of #815.
@chowbao chowbao force-pushed the streaming-phase1-primitives branch from 72368b2 to 531dca9 Compare June 25, 2026 14:36
chowbao added a commit that referenced this pull request Jun 25, 2026
…hanges

Rebased onto the updated #818 and propagated #817's API changes into the
orchestration/daemon layer:
- window -> tx-hash index rename + key prefix index: -> txhash_index:
  (incl. IndexBuild.Window -> .Index, Catalog.windows -> .txhashIndex).
- Catalog.Get/Has -> unexported get/has.
- config sections regrouped: cfg.Backfill.ChunksPerTxhashIndex -> cfg.Layout.*,
  cfg.Streaming.{RetentionChunks,EarliestLedger,CaptiveCoreConfig} -> cfg.Retention.*
  / cfg.Ingestion.*, cfg.Catalog.Path + cfg.ImmutableStorage.* + cfg.Streaming.
  HotStorage -> cfg.Storage.*; daemon_test config literals + TOML fixtures updated.
- removed pin setters: tests now pin via PinLayout(testCPI, earliest).

Mechanical propagation only; build/vet/test -short green (incl. the daemon E2E).
chowbao added 3 commits June 25, 2026 12:20
…, drop Windows

Addresses tamirms's round-2 review on #817 and the codex threads behind it.

- window.go -> txhash_index.go: cap MaxChunksPerTxhashIndex at the cold tx-hash
  index's payload capacity (2^(8*ColdPayloadSize) / LedgersPerChunk, ~1,677),
  derived from txhash.ColdPayloadSize so the two can't drift. A larger cpi passed
  validation but made every index build fail, and cpi is immutable once pinned.
  (codex window.go:18)
- CommitTxHashIndex: terminal demotion now covers only cov's [Lo,Hi] range, not
  the whole index window, so a frozen .bin below Lo (a lower coverage's input)
  survives. (codex catalog_protocol.go:101)
- CommitTxHashIndex: refuse a stale/out-of-order commit whose range the frozen
  coverage already spans, so FrozenTxHashIndex never regresses to a shorter
  index. (codex catalog_protocol.go:110)
- SweepTxHashIndexKey: base the frozen->pruning demote on the current durable
  value, not the caller's possibly-stale cov.State; an absent key is a no-op.
  (codex catalog_sweep.go:69)
- MarkChunkFreezing: refuse to re-materialize a "pruning" key — a sweep owns it,
  and resurrecting it would race into a frozen key with no file.
  (codex catalog_protocol.go:36)
- Drop the crash-test hooks (crashHooks, the fire*/failCommitBatch injection, and
  the tests that used them); keep the invariant-check tests. Real replacement
  tracked in #823.
- Revert config_lock to unix-only (fold flock back in, delete the unix/windows
  split) and remove windows-latest from the build matrix: directory fsync has no
  Windows equivalent (codex paths.go:151), so the daemon can't run there.

Follow-ups opened: #823 (fault-injection crash-safety harness), #824 (split the
streaming package).
The streaming daemon is unix-only, but cmd/stellar-rpc does not import the
streaming package yet, so the Windows `go build ./cmd/stellar-rpc` job still
passes. Keep the matrix entry untouched until the daemon is wired into the
binary (when it will need to be revisited).
First step of #824 (done now per the round-2 review request): carve the
catch-all streaming package into purpose-named subpackages.

  streaming/        config, single-process locking, retention floor
    geometry/       chunk + tx-hash-index math, the key/path-naming bijection,
                    and the fsync helpers (was keys.go, paths.go, txhash_index.go)
    catalog/        durable on-disk state: the one-write protocol, the key-driven
                    sweeps, and the artifact-kind sets (was catalog*.go, artifacts.go)

The dependency graph is one-way with no cycles: geometry depends on neither
streaming nor catalog; catalog imports only geometry; streaming imports geometry
(config_lock, retention). Cross-package helpers that were unexported are now
exported from geometry (ChunkKey, FsyncDir, LastCompleteChunkAt, ...).
NewLayoutFromPaths stays in streaming as a thin bridge over the new
geometry.NewLayoutFromRoots, which takes plain strings so geometry carries no
dependency on the config Paths type.

The parent directory stays `streaming` for now; renaming it (dropping the
codename) and the remaining store/lifecycle splits are deferred to the rest of
#824. No behavior change — a pure repackage.

Verified: go build, go vet, and go test -race are all green for the three
packages.
chowbao added a commit that referenced this pull request Jun 26, 2026
Rebase the daemon layer onto the split #818 and follow the issue #824
package layout instead of the old single streaming package:

- backfill/: resolve.go + execute.go join process.go/txindex.go (the
  planner + bounded-worker executor). RunBackfill and the RunChunk/
  RunIndex test seams are exported so the daemon package's catch-up
  tests can inject fakes across the package boundary.
- observability/: the daemon's control-plane Metrics sink (NopMetrics
  and MetricsOrNop exported for backfill, which consumes the interface).
- fullhistory/ (top level, package fullhistory): daemon, startup
  (catch-up + serve), config_validate, progress, doc.

Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon
layer: drop the cpi config field + its form/restart validation, replace
PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the
TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test
now exercises a non-terminal (rolling) index coverage, since one complete
chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep
stays unit-tested in backfill/txindex_test.

Propagate #818's window->index naming into the daemon: windowsOverlapping
-> indexesOverlapping, the TxHashIndexLayout vars -> txLayout.

closes #815
chowbao added a commit that referenced this pull request Jun 26, 2026
Rebase the daemon layer onto the split #818 and follow the issue #824
package layout instead of the old single streaming package:

- backfill/: resolve.go + execute.go join process.go/txindex.go (the
  planner + bounded-worker executor). RunBackfill and the RunChunk/
  RunIndex test seams are exported so the daemon package's catch-up
  tests can inject fakes across the package boundary.
- observability/: the daemon's control-plane Metrics sink (NopMetrics
  and MetricsOrNop exported for backfill, which consumes the interface).
- fullhistory/ (top level, package fullhistory): daemon, startup
  (catch-up + serve), config_validate, progress, doc.

Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon
layer: drop the cpi config field + its form/restart validation, replace
PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the
TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test
now exercises a non-terminal (rolling) index coverage, since one complete
chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep
stays unit-tested in backfill/txindex_test.

Propagate #818's window->index naming into the daemon: windowsOverlapping
-> indexesOverlapping, the TxHashIndexLayout vars -> txLayout.

closes #815
// Snapshot which bucket dirs exist before the write, so the barrier below
// fsyncs the grandparent dirent only for dirs THIS freeze created. (Buckets
// are created in arbitrary order, so a chunk-id heuristic can't tell.)
bucketExisted := make(map[string]bool)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this snapshot and the newParent plumbing into BarrierNewFile just go away? It exists to skip the grandparent fsync when the bucket already existed — but fsyncing an unchanged directory is nearly free (no dirty metadata to flush). If BarrierNewFile always did file → parent → grandparent, you'd delete this loop, the newParent parameter, and the newIndexDir branch in txindex.go, for one fewer before-the-write step to get subtly wrong. (geometry's DeepestExistingDir/FsyncNewDirs already do "fsync a created dir chain" if you'd rather keep the optimization but not hand-roll it.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 7a51cf81. BarrierNewFile now always fsyncs file → parent → grandparent (the newParent bool is gone), so the bucketExisted snapshot loop in processChunk and the newIndexDir branch in buildTxhashIndex both disappear — and with the stat gone the index dir is just an idempotent os.MkdirAll. Always-fsync-grandparent is a strict superset of the old durability (an unchanged dir has no dirty metadata to flush, so it's nearly free), so it's never less safe.

I kept the hand-rolled two-level barrier rather than reaching for DeepestExistingDir/FsyncNewDirs: after the startup root creation, only a single bucket/index dir is ever created on demand, so file → parent → grandparent is exactly the depth needed — those helpers are for the multi-level root chain at startup.

}
defer func() { _ = closeSource() }()

if err := cat.MarkChunkFreezing(chunkID, kinds...); err != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mark → write → barrier → flip skeleton is the crash-safety invariant, and it's spelled out twice (here and in buildTxhashIndex), with a third copy in the queued live-ingestion PR — openHotTierForChunk is PutHotTransient → mkdir/open → fsync → FlipHotReady. Three sites share this exact order. A small oneWrite(mark, create, barrier, flip) helper would keep the order in one place so none of the three can drift; the states and the create step differ per site, the order doesn't. Worth establishing here and adopting in the hot-tier open.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 7a51cf81. Added oneWrite(mark, create, barrier, flip func() error) error and adopted it in both processChunk and buildTxhashIndex, so the mark → create → barrier → flip order is defined once and the sites only supply the per-step closures.

I scoped it to the backfill package for now since both current callers live here; when #820 adds the hot-tier openHotTierForChunk (PutHotTransient → mkdir/open → fsync → FlipHotReady) it can promote/relocate oneWrite and adopt it as the third caller. Folding the ordering out also dropped both functions under the cyclop threshold (13 < 15), so I removed their //nolint:cyclop directives.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now instead — promoted oneWrite to catalog.OneWrite in catalog_protocol.go, right under the protocol header it implements (the file already documents the same mark→create→barrier→flip steps). processChunk and buildTxhashIndex now call it across the package boundary.

It's a zero-dependency pure function and catalog never imports backfill, so there's no import cycle; #820's openHotTierForChunk adopts it as the third caller by import alone, with no later relocation. Pushed in 7e56b926.


// pollingBackendWaiter polls Tip until it reaches chunkLastLedger, ctx is
// canceled, or Timeout elapses.
type pollingBackendWaiter struct {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

github.com/cenkalti/backoff/v4 is already a direct dependency — this hand-rolled poll loop could use it and shed the timer/deadline/select bookkeeping. backoff.Retry with a ConstantBackOff (the interval), WithContext (cancellation), and WithMaxElapsedTime (the timeout) covers all of it; wrap the tip-query error in backoff.Permanent to keep its current fatal-on-error behavior. The executor's own retry logic (#819) could share the same primitive too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 7a51cf81. WaitForCoverage now drives backoff.Retry: the tip-query error is wrapped in backoff.Permanent (stays fatal), and the low-tip case returns the ErrBackendCoverageTimeout-wrapped error — which Retry surfaces once MaxElapsedTime stops the loop, so callers still classify the timeout via errors.Is.

Two deviations from the literal recipe, both deliberate:

  • WithMaxElapsedTime is an ExponentialBackOffOpts, not a wrapper around ConstantBackOff, so the constant poll is expressed as NewExponentialBackOff with WithMultiplier(1) + WithRandomizationFactor(0) (+ WithInitialInterval/WithMaxInterval = the interval).
  • I kept a per-call context.WithDeadline inside the operation: the backoff bounds total retry time but not a single in-flight Tip call, so dropping it would regress the hung-query bound added in 808eb622 (and its TestPollingBackendWaiter_TipBoundedByDeadline).

Added TestPollingBackendWaiter_TimeoutReturnsSentinel to cover the timeout→sentinel path. Agreed the #819 executor retry can share this primitive.


// forEachChunkTxHashState calls fn with each chunk's txhash state over [lo, hi].
// Centralizes the chunk.ID wraparound guard (the top can be the max id).
func forEachChunkTxHashState(cat *catalog.Catalog, lo, hi chunk.ID, fn func(chunk.ID, geometry.State) error) error {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go.mod is on 1.26, and the catalog already iterates this way (for e, err := range c.store.PrefixScan(...)), so this callback is a bit against the grain. As an iter.Seq2[chunk.ID, geometry.State] it'd read for cid, state := range cat.TxHashStates(lo, hi) { … }, and the two callers (txhashBinInputs, demotedTxhashRefs) could use a plain for with break/continue instead of returning sentinel errors from a closure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 7a51cf81. forEachChunkTxHashState is now txHashStates(...) iter.Seq2[…, error], and txhashBinInputs/demotedTxhashRefs use a plain for cs, err := range … with normal error handling and continue instead of returning sentinel errors from a closure.

One adjustment from the sketch: since catalog.State can return an error, I followed the codebase's iter.Seq2[T, error] scan convention (the same shape as PrefixScan/IterateLedgers) rather than iter.Seq2[chunk.ID, geometry.State] — yielding a small chunkTxHashState{Chunk, State} value paired with the error. I left it as a backfill free function (not a cat.TxHashStates method) to avoid growing the catalog API for a backfill-local helper, but happy to move it to a method if you'd prefer that form.

for _, kind := range kinds {
for _, path := range layout.ArtifactPaths(chunkID, kind) {
newParent := !bucketExisted[filepath.Dir(path)]
if berr := geometry.BarrierNewFile(path, newParent); berr != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note, visible here but really a #824 / geometry-package thing: the fsync helpers BarrierNewFile & co. don't belong in geometry. BarrierNewFile, FsyncDir, FsyncParentDirs, DeepestExistingDir, FsyncNewDirs, DeleteFileIfExists, RmdirIfEmpty are generic os-level durability ops with zero geometry or domain references — they only landed in geometry because they shared paths.go with the path bijection in the monolith. geometry should be naming + math (identity and location, no I/O); crash-safe file ordering is a different concern that'd sit better in a small durable/fsync leaf package imported by catalog, backfill, and the hot-tier code. Worth folding into the #824 split.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to #824

chowbao added a commit that referenced this pull request Jun 26, 2026
Rebase the daemon layer onto the split #818 and follow the issue #824
package layout instead of the old single streaming package:

- backfill/: resolve.go + execute.go join process.go/txindex.go (the
  planner + bounded-worker executor). RunBackfill and the RunChunk/
  RunIndex test seams are exported so the daemon package's catch-up
  tests can inject fakes across the package boundary.
- observability/: the daemon's control-plane Metrics sink (NopMetrics
  and MetricsOrNop exported for backfill, which consumes the interface).
- fullhistory/ (top level, package fullhistory): daemon, startup
  (catch-up + serve), config_validate, progress, doc.

Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon
layer: drop the cpi config field + its form/restart validation, replace
PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the
TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test
now exercises a non-terminal (rolling) index coverage, since one complete
chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep
stays unit-tested in backfill/txindex_test.

Propagate #818's window->index naming into the daemon: windowsOverlapping
-> indexesOverlapping, the TxHashIndexLayout vars -> txLayout.

closes #815
…ages

Every error in process.go/txindex.go is either wrapped with %w (so the
chain already carries operation + id context) or a self-describing
leaf/sentinel. A package prefix just repeats at each wrap level and the
"streaming:" token is a stale package name besides. Drop it, per review.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 4f85de7dfd

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread cmd/stellar-rpc/internal/fullhistory/backfill/process.go Outdated
Comment thread cmd/stellar-rpc/internal/fullhistory/backfill/txindex.go Outdated
chowbao added 2 commits June 26, 2026 10:30
- geometry.BarrierNewFile: always fsync file -> parent -> grandparent; drop the
  newParent bool and the bucketExisted snapshot / newIndexDir branch that fed it.
  An fsync of an unchanged dir is nearly free, so the optimization wasn't worth
  the per-write bookkeeping. MkdirAll is idempotent, so the index-dir stat goes
  away too. (r3479586375)
- backfill: extract oneWrite(mark, create, barrier, flip) and adopt it in
  processChunk and buildTxhashIndex, so the crash-safe one-write order lives in
  one place rather than being spelled out per site. (r3479586490)
- pollingBackendWaiter.WaitForCoverage: replace the hand-rolled timer/select
  loop with backoff.Retry. The tip-query error is backoff.Permanent (stays
  fatal); the per-call context.WithDeadline is kept so a hung query still can't
  outlast Timeout; the low-tip error is returned once MaxElapsedTime stops the
  loop, so callers still classify the timeout via ErrBackendCoverageTimeout.
  WithMaxElapsedTime only composes with ExponentialBackOff, so a unit multiplier
  and zero randomization reduce it to a constant poll. (r3479586626)
- txindex: replace the forEachChunkTxHashState callback with an
  iter.Seq2[chunkTxHashState, error] iterator (the catalog scan convention,
  since catalog.State can fail); txhashBinInputs/demotedTxhashRefs use plain
  for-range with break/continue instead of sentinel errors from a closure.
  (r3479586743)

The //nolint:cyclop on processChunk/buildTxhashIndex are removed: both now sit
at complexity 13, under the 15 threshold.
Relocate the one-write protocol ordering helper (mark -> create -> barrier ->
flip) from backfill/process.go to catalog_protocol.go, where the protocol's
states and mark/flip steps already live, and export it as catalog.OneWrite.
processChunk and buildTxhashIndex now call it across the package boundary.

It is a zero-dependency pure function and catalog never imports backfill, so
there is no import cycle; #820's hot-tier openHotTierForChunk adopts it as the
third caller by import alone, with no later relocation. Addresses the #818
review thread that asked to establish the shared helper here rather than
deferring the move to #820.
chowbao added a commit that referenced this pull request Jun 26, 2026
The package is fullhistory, not streaming; remove the redundant prefix
from this layer's error and log messages. Capitalized identifier leads
(ExecConfig/StartConfig/Boundaries ...) are reworded lowercase to satisfy
stylecheck ST1005. The parent layers (#817/#818) still carry the prefix.
@tamirms

tamirms commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Two remaining items in the backfill/ingest area — both more structural than the last round, and both cheap to do now precisely because none of this is wired into the daemon yet (ProcessConfig/processChunk/RunColdChunk have no production constructor; the daemon still runs the legacy ingest.BackfillMeta path). Fixing them before the streaming layers stack on top is much cheaper than after.

1. The ledger-source boundary between backfill and ingest

The problem. ingest.ChunkSource exposes only OpenStream(chunkID) → LedgerStream, which discards the one thing the bulk source can actually report: how far it currently reaches. To get that back, backfill bolts on a parallel apparatus — BackendWaiter + pollingBackendWaiter + an injected Tip func — to poll a "latest ledger" the source already knows. Separately, ingest still carries the dead multi-chunk batch path (RunColdrunOneChunkColdbuildColdIngesters) next to the live single-chunk RunColdChunk, which duplicates it almost line-for-line. These are two symptoms of one thing: the source abstraction is drawn in the wrong place.

What the SDK already gives us. ledgerbackend.LedgerStream (RawLedgers(ctx, range) iter.Seq2[[]byte,error]) is a universal streaming currency everything already speaks — the local pack (packStream already implements it) and the external backends (NewBufferedStorageStream, NewCaptiveCoreStream) all return it. So ChunkSource/OpenStream is a redundant wrapper over it.

The tip is a separate value — and one trap worth flagging: the SDK's LedgerBackend.GetLatestLedgerSequence is not the tip we want. It's prepared-relative — on BufferedStorageBackend it errors before PrepareRange and otherwise returns the buffer's latest, not the source's frontier. The frontier tip is an independent value per backend: datastore.FindLatestLedgerSequence for the lake, and the history archive's GetRootHAS().CurrentLedger for captive core (whose own GetLatestLedgerSequence likewise needs a running, prepared core, so it's no use here).

Proposed shape. The materializer becomes source-blind — it takes the raw ledger iterator and nothing else, and is faked in tests with a literal slice iterator:

func WriteColdChunk(ctx, logger, chunkID chunk.ID,
	raw iter.Seq2[[]byte, error], dirs ColdDirs, sink MetricSink, cfg Config) error

backfill owns the external-source seam, one interface:

// An external ledger source (BSB now, captive core later) we fetch from and may
// have to wait for. The local frozen .pack is NOT a Backend — it's complete, so
// it's a bare ledgerbackend.LedgerStream with nothing to wait on.
//   bsbSource     (now)   Tip = datastore.FindLatestLedgerSequence
//   captiveSource (later) Tip = historyarchive GetRootHAS().CurrentLedger
type Backend interface {
	ledgerbackend.LedgerStream
	Tip(ctx context.Context) (uint32, error)
}
type ProcessConfig struct { /* … */ Backend Backend }   // was: Backend ChunkSource + BackendWaiter + Tip func

This Backend is a new interface and deliberately not an evolution of today's BackendWaiter — that interface, its pollingBackendWaiter implementation, and NewPollingBackendWaiter are removed entirely. The coverage-polling they did becomes just the Tip method above plus the waitForCoverage function below; nothing in backfill implements BackendWaiter anymore.

The one impl wired now:

type bsbSource struct {
	ledgerbackend.LedgerStream          // NewBufferedStorageStream
	ds datastore.DataStore              // long-lived; tip only; daemon-Closed
}
func (s *bsbSource) Tip(ctx context.Context) (uint32, error) { return datastore.FindLatestLedgerSequence(ctx, s.ds) }

Coverage-waiting collapses to one waitForCoverage function that polls Backend.Tip. This is what removes BackendWaiter, pollingBackendWaiter, and NewPollingBackendWaiter:

func waitForCoverage(ctx, b Backend, target uint32, interval, timeout time.Duration) error // polls b.Tip

The local pack moves to its codec owner as a bare stream, constructed inside backfillSource (not injected) when a chunk's ledgers are already frozen locally and we're deriving the other artifacts from them:

func NewPackStream(path string) ledgerbackend.LedgerStream   // the existing packStream, relocated to pkg/stores/ledger

so the call site is uniform — both arms are just a LedgerStream:

src, err := backfillSource(ctx, chunkID, artifacts, cfg)   // pack OR cfg.Backend
raw := src.RawLedgers(ctx, ledgerbackend.BoundedRange(chunkID.FirstLedger(), chunkID.LastLedger()))
ingest.WriteColdChunk(ctx, cfg.Logger, chunkID, raw, dirs, cfg.Sink, ingestConfigFor(artifacts))

Captive core is deferred, but the path stays open. No need to implement it now — the Backend interface is the seam. It's one new struct (embed NewCaptiveCoreStream + a Tip from the history archive) plus one daemon factory case, with zero changes to ProcessConfig, backfillSource, waitForCoverage, or WriteColdChunk. Captive's tip comes from historyarchive.NewArchivePool(cfg.HistoryArchiveURLs, …).GetRootHAS().CurrentLedger. Listing the intended impls in the Backend doc comment keeps the intent visible without a stub.

Net: delete ingest/source.go whole (ChunkSource, ChunkSourceFunc, packSource, dataStoreSource, NewDataStoreSource); the dead RunCold/runOneChunkCold/buildColdIngesters; and backfill's BackendWaiter/pollingBackendWaiter/NewPollingBackendWaiter/Tip. Rename buildColdIngestersInbuildColdIngesters and RunColdChunkWriteColdChunk. Roughly −7 types and −3 constructors for +1 interface.

Two things to keep honest in the code: the ingest package still imports ledgerbackend for RunHot, so only the materializer function is source-blind, not the package; and since source resolution (pack-stat, coverage wait) now runs in backfill before WriteColdChunk, a pack-missing or coverage-timeout failure is metered there rather than as an ingest ColdChunkTotal attempt — worth a line in the doc.

2. catalog.OneWrite — revert the two backfill sites to straight-line

First, apologies for the round-trip: I asked for the oneWrite wrapper last round, and having gone through both call sites properly since, straight-line reads better and I'd suggest reverting it. Sorry for sending you back and forth on it.

Why the wrapper doesn't pay off: every step wraps its error with per-site context (chunkID/artifacts in processChunk; w/lo/hi/cov/idxPath in buildTxhashIndex), so the closures can't collapse to one-liners — they stay multi-line and carry the closure wrapper on top. And buildTxhashIndex pays extra: because the flip needs the cov the mark returns, it hoists var cov …; var idxPath string above the call and does var merr error; cov, merr = … (no :=). Straight-line, those are ordinary downward locals:

// one-write:mark — returns the coverage the commit (flip) needs.
cov, err := cat.MarkTxHashIndexFreezing(w, lo, hi)
if err != nil { return fmt.Errorf("buildTxhashIndex mark freezing %s: %w", geometry.TxHashIndexKey(w, lo, hi), err) }
idxPath := layout.TxHashIndexFilePath(cov)
// one-write:create  (os.MkdirAll + txhash.BuildColdIndex(ctx, inputs, idxPath, …))
// one-write:barrier (geometry.BarrierNewFile(idxPath))
// one-write:flip — CommitTxHashIndex(cov) promotes cov, demotes predecessor, demotes window .bin on terminal.
if err := cat.CommitTxHashIndex(cov); err != nil { return fmt.Errorf("buildTxhashIndex commit %s [%s,%s]: %w", w, lo, hi, err) }
return nil

Nothing is lost on safety: the wrapper only sequences four calls — it never checks their contents (a wrong fsync path, the wrong catalog method, a dropped step all pass it), so it isn't what guarantees crash-safety. That guarantee is catalog/crashsafety_test.go, which drives the protocol to each crash point and asserts the surviving state is recoverable. A // one-write:mark|create|barrier|flip label per step keeps the four beats visible and greppable. processChunk converts the same way.

So: delete catalog.OneWrite and inline both sites.

…evert OneWrite

Redraw the ledger-source boundary between backfill and ingest, and revert the
catalog.OneWrite wrapper, per the review (comment 4812281401). None of this is
wired into the daemon yet, so the structural change is cheap to make now.

ingest: make the cold materializer source-blind. RunColdChunk becomes
WriteColdChunk(ctx, logger, chunkID, raw iter.Seq2[[]byte, error], dirs, sink,
cfg), taking the raw ledger iterator rather than a source. Delete source.go
whole (ChunkSource, ChunkSourceFunc, packSource/NewPackSource, dataStoreSource/
NewDataStoreSource) and the dead bulk path (RunCold, the per-chunk drive, the
buildColdIngesters convenience); rename buildColdIngestersIn -> buildColdIngesters.
drain now takes the raw iterator; RunHot takes a ledgerbackend.LedgerStream
directly. Source resolution (pack-stat, coverage wait) moves to the backfill
caller, so a pack-missing/coverage-timeout failure is metered there, not as a
cold ColdChunkTotal attempt.

ledger: relocate packStream into the store as NewPackStream(path string) — the
local frozen .pack is just a bare ledgerbackend.LedgerStream.

backfill: introduce Backend{ ledgerbackend.LedgerStream; Tip(ctx) } — bsbSource
now (Tip = datastore.FindLatestLedgerSequence), captive core later (the interface
is the seam). Collapse BackendWaiter/pollingBackendWaiter/NewPollingBackendWaiter
into a single waitForCoverage, and reduce ProcessConfig's source surface to one
Backend. backfillSource returns a bare LedgerStream (pack arm via NewPackStream,
bulk arm gated by waitForCoverage on the Backend's Tip).

catalog: revert OneWrite. Inline the four-step protocol straight-line at
processChunk and buildTxhashIndex, each step labelled // one-write:mark|create|
barrier|flip. The wrapper only sequenced calls; crash-safety is guaranteed by
catalog/crashsafety_test.go, not the wrapper.
@chowbao

chowbao commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

Both done in 096463ef.

1. Source boundary.

  • Deleted ingest/source.go (ChunkSource/packSource/dataStoreSource) and the dead RunCold path.
  • Materializer is now source-blind: WriteColdChunk(ctx, logger, chunkID, raw iter.Seq2[[]byte, error], dirs, sink, cfg).
  • buildColdIngestersInbuildColdIngesters; packStreamledger.NewPackStream(path).
  • backfill owns the seam: Backend{ ledgerbackend.LedgerStream; Tip } (bsbSource now, captive later), BackendWaiter/polling collapsed into waitForCoverage, ProcessConfig down to one Backend, backfillSource returns a bare LedgerStream.

2. catalog.OneWrite reverted — inlined straight-line at both sites with // one-write: labels.

golangci-lint:
- backend.go: blank line separating the embedded LedgerStream from the
  ds field (embeddedstructfieldcheck)
- ingest_test.go: wrap the >120-char WriteColdChunk call in
  TestWriteColdChunk_CanceledContext (lll)
- ingest_test.go: //nolint:unparam on the chunk-general packPath/rawChunk
  test helpers — every surviving caller uses chunk 0 after the multi-chunk
  RunCold tests were removed, but the params stay to express intent

review nits:
- rename TestRunCold_* -> TestWriteColdChunk_* and fix the stale "RunCold"
  prose now that the cold materializer is WriteColdChunk (keeping the two
  historical references to the removed RunCold)
- drop the dead `_ = root` binding in
  TestProcessChunk_ProducesAllArtifactsAndFreezes

@tamirms tamirms left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving. The source-boundary redesign and the OneWrite revert both landed cleanly in 096463ef, and the rest of the primitives layer matches the design — the resolver/executor and daemon wiring are correctly left to their later layers, and the empty-window (#826) and geometry/fsync (#824) items are tracked. Build and unit tests are green; the two red checks are the unrelated XDR Go/Rust rev mismatch.

A few optional cosmetic nits, none blocking:

  • process.go:93 — the one-write comment is garbled (a dangling "The" and a doubled // //); txindex.go has the clean version to copy.
  • geometry/paths.go:109 — the TxHashRawRoot doc still references RunCold, which this PR removed; the TestRunCold_* names in ingest_test.go are similarly stale now that they call WriteColdChunk.
  • ingest/service.go — the ColdService struct comment says the timer starts "from the Finalize/Close call," but it actually starts at construction (the constructor comment is the accurate one).

chowbao added a commit that referenced this pull request Jun 26, 2026
Rebase the daemon layer onto the split #818 and follow the issue #824
package layout instead of the old single streaming package:

- backfill/: resolve.go + execute.go join process.go/txindex.go (the
  planner + bounded-worker executor). RunBackfill and the RunChunk/
  RunIndex test seams are exported so the daemon package's catch-up
  tests can inject fakes across the package boundary.
- observability/: the daemon's control-plane Metrics sink (NopMetrics
  and MetricsOrNop exported for backfill, which consumes the interface).
- fullhistory/ (top level, package fullhistory): daemon, startup
  (catch-up + serve), config_validate, progress, doc.

Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon
layer: drop the cpi config field + its form/restart validation, replace
PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the
TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test
now exercises a non-terminal (rolling) index coverage, since one complete
chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep
stays unit-tested in backfill/txindex_test.

Propagate #818's window->index naming into the daemon: windowsOverlapping
-> indexesOverlapping, the TxHashIndexLayout vars -> txLayout.

closes #815
chowbao added a commit that referenced this pull request Jun 26, 2026
The package is fullhistory, not streaming; remove the redundant prefix
from this layer's error and log messages. Capitalized identifier leads
(ExecConfig/StartConfig/Boundaries ...) are reworded lowercase to satisfy
stylecheck ST1005. The parent layers (#817/#818) still carry the prefix.
chowbao added a commit that referenced this pull request Jun 26, 2026
#818's review unified its bulk-source seams: the separate ChunkSource
(OpenStream) and BackendWaiter (WaitForCoverage) collapsed into one
backfill.Backend interface (ledgerbackend.LedgerStream + Tip), with the
coverage wait now internal to backfill (waitForCoverage, driven by
Backend.Tip). Adapt the daemon layer to the frozen #818 surface:

- Boundaries.Backend is now a backfill.Backend; drop the BackendWaiter
  field, its validate() check, and the ProcessConfig wiring.
- backendTip adapts a backfill.Backend to NetworkTipBackend via Tip, so
  the catch-up tip and the freeze's coverage frontier are one source and
  cannot disagree. Its old WaitForCoverage half (and the four
  TestBackendTip_WaitForCoverage* tests) are dropped — that logic now
  lives in and is tested by backfill.waitForCoverage.
- E2E + adapter tests use a fakeBackend (LedgerStream + Tip) in place of
  the old countingChunkSource/fakeWaiter/fakeLedgerBackend fakes.
chowbao added a commit that referenced this pull request Jun 26, 2026
The resolver's kind-rules comment used windowFirstChunk/windowLastChunk —
names that read like identifiers but don't exist; the code calls
txLayout.FirstChunk(w)/LastChunk(w). Name the real API so the pseudo-code
is greppable and accurate. Prose still says "window" for the index unit,
matching #818's txindex.go convention and the design doc.
chowbao added a commit that referenced this pull request Jun 26, 2026
… loop

Per tamirms's #818 review ("the executor's own retry logic (#819) could
share the same primitive too"), replace the hand-rolled withRetries +
retryBackoffFor + ctxSleep with cenkalti/backoff — the same primitive
backfillSource's waitForCoverage already uses. Behavior-preserving:
exponential x2 from RetryBackoff, capped at maxRetryBackoff, no jitter,
MaxRetries+1 total attempts, ctx-cancellation aborts. Drops the retrySleep
test seam; the retry tests now assert the policy's NextBackOff sequence
directly and use a tiny base for the call-count tests.
Base automatically changed from streaming-phase1-foundations to feature/full-history June 27, 2026 17:02
@chowbao chowbao merged commit 2dfd886 into feature/full-history Jun 27, 2026
13 of 15 checks passed
@chowbao chowbao deleted the streaming-phase1-primitives branch June 27, 2026 17:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants