Streaming daemon: Phase 1 layer 2 — primitives (#815)#818
Conversation
Catalog + one-write protocol + key schema, config loader/locking, chunk/window geometry, the retention floor/gate, the crash-injection hook seam, and the artifact/path layout for the full-history streaming daemon. Cold substrate only; the primitives (processChunk, buildTxhashIndex) and orchestration land in the stacked layers above. Part of #815.
3d6b182 to
20c4e3b
Compare
672d01f to
55cabde
Compare
Non-behavioral cleanup of the Phase 1 foundations package. Code: - Delegate cold-artifact leaf filenames to the owning store packages (ledger.PackName, eventstore.*, txhash.ColdBinName) instead of re-deriving them in Layout — one source of truth, matching ingest. - Alias DefaultChunksPerTxhashIndex to txhash.DefaultChunksPerIndex and DefaultEarliestLedger to EarliestGenesis. - Extract syncAndClose, the sync-then-close epilogue shared by fsyncFile/fsyncDir. - Compute chunkKey once per iteration in windowTxhashKeysPresent. - Drop the duplicate Windows.ChunksIn (identical to ChunksPerIndex). Docs: - Condense doc comments across the package (~26% fewer comment words; the high-volume files 20–43%), cutting repetition while preserving invariants, rationale, design-doc citations, and //nolint directives. No behavior change; gofmt clean, build/vet/tests green.
Review-time cleanups, no behavior change: - Rename the Paths.LockRoots() accessor to RootsToLock() so it no longer echoes the package-level LockRoots() that actually acquires the flocks. The call site now reads LockRoots(paths.RootsToLock()...): the method is the noun (roots to lock), the func is the verb (lock them). - Fix stale filename references in the Catalog doc comment (protocol.go -> catalog_protocol.go, sweep.go -> catalog_sweep.go). gofmt clean; build/vet/tests green.
No behavior change: - Remove the unexported retentionFloorFor (a one-line pass-through to effectiveRetentionFloor) and seqWithinRetention (test-only). NewRetentionGate now calls effectiveRetentionFloor directly. - Drop the tautological test assertion that the free function and the gate agree — both were seq >= effectiveRetentionFloor(...), so gate.Admits already covers it. - Fold the free-floating contract header onto the RetentionGate type and condense the docs (retention.go 95 -> 59 lines). gofmt clean; vet 0; tests green.
The ArtifactSet doc cited "design-docs rule 2" / "rule 1's per-kind idempotency", but the design's only numbered rules are the reader contract's (a different concept). Describe the concepts directly instead. Comment-only; no behavior change.
Pure test reorganization, no logic change — the 894-line omnibus split by concern, function bodies moved verbatim: - window_test.go geometry / window arithmetic - keys_test.go key schema + key<->path bijection - catalog_test.go catalog reader (pins, scans, FrozenCoverage) - catalog_protocol_test.go mark/flip + CommitIndex - catalog_sweep_test.go the two sweeps - crashsafety_test.go power-loss-between-steps + never-unlink-under-frozen - helpers_test.go shared test helpers + vars Same 33 tests (51 package-wide); gofmt clean, vet/test green.
aac0f4b to
1b47f8e
Compare
Review follow-ups to the Phase 1 foundations, the first being a crash-safety
fix:
- Durability: MkdirAll creates the storage roots but fsyncs neither the new
dirs nor the direntries naming them, and the one-write protocol's
grandparent fsync (barrierNewFile) only reaches a root's CONTENTS, never the
root's own link in its parent. On a fresh deployment a crash just after the
first freeze could lose a whole storage subtree while the synced catalog
still advertised a "frozen" artifact under it. lockOne now records the
deepest pre-existing ancestor before MkdirAll and fsyncs the created chain
(deepestExistingDir + fsyncNewDirs in paths.go) — one extra dir fsync per
root at startup.
- Catalog doc: warn that PutEarliestLedger/PutChunksPerTxhashIndex must not be
used to pin the layout on first start (that is PinLayout's atomic
both-or-neither job); they exist only for isolated pin writes.
- windowTxhashKeysPresent: drop the redundant cid <= last condition — the
explicit `if cid == last { break }` is the real inclusive-bound /
chunk.ID-wraparound guard.
- Tests: cover retention_chunks=0 (full history pins at earliest_ledger, the
earliest-wins branch the existing tests miss) and the young-store /
oversized-retention clamp to genesis; and prove the flock is released when
the lock fd is closed without LOCK_UN — the kernel guarantee kill -9 relies
on.
gofmt clean; the new fsync helpers were build- and behavior-checked standalone.
Part of #815.
1b47f8e to
72368b2
Compare
…export Get/Has
Addresses three review comments on the Phase 1 foundations layer:
- r3473146235: rename the "window" concept to "tx-hash index" end to end and
change the on-disk catalog key prefix index: -> txhash_index:. The tx-hash
index family is now uniformly qualified: WindowID->TxHashIndexID,
Windows->TxHashIndexLayout, IndexCoverage->TxHashIndexCoverage,
FrozenCoverage->FrozenTxHashIndex, IndexKeys/AllIndexKeys->TxHashIndexKeys/
AllTxHashIndexKeys, MarkIndexFreezing/CommitIndex/SweepIndexKey->
Mark/Commit/Sweep TxHashIndex*, IndexFilePath->TxHashIndexFilePath,
IndexWindowDir->TxHashIndexDir, the unexported indexKey/parseIndexKey/
indexPrefix helpers, and the TxHashIndexCoverage.Index / Catalog.txhashIndex
fields. ("crash window" and "Retention window" are different concepts, kept.)
- r3473322303: remove the speculative PutEarliestLedger/PutChunksPerTxhashIndex
single-key setters (no production caller); first-start pinning is PinLayout's
atomic both-or-neither job. TestConfigPins now drives PinLayout, giving it its
first test coverage.
- r3473953493: un-export Catalog.Get/Has -> get/has; nothing outside the package
uses them (the daemon entrypoint is in-package).
gofmt/vet/test -short green. Part of #815.
Admits/Floor() had no caller anywhere in the stack (only retention_test.go), and
TxHashIndexBelowFloor was identical to ChunkBelowFloor(LastChunk(idx)). Collapse
the type to RetentionFloor{chunk chunk.ID} with a single Excludes(c) predicate:
effectiveRetentionFloor is already chunk-aligned and every caller either converts
it to a chunk or compares whole chunks, so the floor is a chunk.ID — dropping the
TxHashIndexLayout parameter and the .LastLedger() arithmetic. An index is below
the floor exactly when its last chunk is, so callers use Excludes(layout.LastChunk
(idx)). The reader's seq-level admit predicate and the ledger-seq floor for the
§8.2 coverage filter come back with the read path (#772). Part of #815.
…71335272) Split the platform-specific lock primitive behind acquireLock/releaseLock: config_lock_unix.go keeps the flock (LOCK_EX|LOCK_NB, EWOULDBLOCK→errLockHeld), and config_lock_windows.go adds the equivalent LockFileEx (LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY, ERROR_LOCK_VIOLATION→errLockHeld) over a one-byte range. Both release on handle close and on any process exit, so the crash-release contract holds on both platforms. The shared config_lock.go is now platform-neutral (errLockHeld sentinel → ErrRootLocked). Part of #815.
Reorganize the TOML schema around what each field governs, not which phase
touches it, per tamirms's proposal:
[layout] chunks_per_txhash_index (window-size geometry; PINNED)
[retention] earliest_ledger (PINNED), retention_chunks
[storage] catalog, ledgers, events, txhash_raw, txhash_index, hot
(consolidates the old [catalog]/[immutable_storage.*]/
[streaming.hot_storage] sections into flat path keys)
[backfill] workers, max_retries, [backfill.bsb] (genuine backfill knobs only)
[ingestion] captive_core_config (renamed from [streaming])
chunks_per_txhash_index was geometry sitting under [backfill]; retention_chunks
and the pinned earliest_ledger were under [streaming] though they drive the
backfill range. The two PINNED fields (set once on first start, abort-on-mismatch
after) now have that contract called out on the field docs. ResolvePaths reads
the consolidated [storage]; resolved Paths and RootsToLock are unchanged.
gofmt/vet/test -short green. Part of #815.
Doc/name cleanup from the review-response churn (no behavior change): - catalog.go: get/has doc comments still read "Get"/"Has" after un-exporting them. - config_lock.go, paths.go: prose still named the old [catalog]/[immutable_storage.*]/ [streaming.hot_storage] sections after the [storage] consolidation. - catalog_protocol.go: drop the stale "rule 1" design-rule-number reference (adcb936 removed it elsewhere; this instance was missed). - window_test.go: TestNewWindows_Validation/TestWindowArithmetic -> TestNewTxHashIndexLayout_Validation/TestTxHashIndexArithmetic. gofmt/vet/test -short green. Part of #815.
The CI golangci-lint run (default: all) flagged 8 issues, all in this PR's new files: - funcorder: the now-unexported Catalog.get/has must sit after the exported methods — moved them into the unexported-helpers section after PinLayout. - godoclint: the EarliestGenesis and DefaultEarliestLedger const docs must start with the symbol name. - gosec G115 (uintptr->int on the flock fd): routed both unix.Flock calls through a single fdInt(f) helper carrying one //nolint:gosec, instead of two inline directives (avoids a too-long line and a duplicate-conversion unused-directive). - nolintlint: dropped two now-unused //nolint:gosec directives in artifacts.go (gosec no longer flags the uint8(1)<<i shifts). Verified with golangci-lint v2.11.3 (the CI version): 0 issues, full-scan. gofmt/test -short green. Part of #815.
72368b2 to
531dca9
Compare
…hanges Rebased onto the updated #818 and propagated #817's API changes into the orchestration/daemon layer: - window -> tx-hash index rename + key prefix index: -> txhash_index: (incl. IndexBuild.Window -> .Index, Catalog.windows -> .txhashIndex). - Catalog.Get/Has -> unexported get/has. - config sections regrouped: cfg.Backfill.ChunksPerTxhashIndex -> cfg.Layout.*, cfg.Streaming.{RetentionChunks,EarliestLedger,CaptiveCoreConfig} -> cfg.Retention.* / cfg.Ingestion.*, cfg.Catalog.Path + cfg.ImmutableStorage.* + cfg.Streaming. HotStorage -> cfg.Storage.*; daemon_test config literals + TOML fixtures updated. - removed pin setters: tests now pin via PinLayout(testCPI, earliest). Mechanical propagation only; build/vet/test -short green (incl. the daemon E2E).
…, drop Windows Addresses tamirms's round-2 review on #817 and the codex threads behind it. - window.go -> txhash_index.go: cap MaxChunksPerTxhashIndex at the cold tx-hash index's payload capacity (2^(8*ColdPayloadSize) / LedgersPerChunk, ~1,677), derived from txhash.ColdPayloadSize so the two can't drift. A larger cpi passed validation but made every index build fail, and cpi is immutable once pinned. (codex window.go:18) - CommitTxHashIndex: terminal demotion now covers only cov's [Lo,Hi] range, not the whole index window, so a frozen .bin below Lo (a lower coverage's input) survives. (codex catalog_protocol.go:101) - CommitTxHashIndex: refuse a stale/out-of-order commit whose range the frozen coverage already spans, so FrozenTxHashIndex never regresses to a shorter index. (codex catalog_protocol.go:110) - SweepTxHashIndexKey: base the frozen->pruning demote on the current durable value, not the caller's possibly-stale cov.State; an absent key is a no-op. (codex catalog_sweep.go:69) - MarkChunkFreezing: refuse to re-materialize a "pruning" key — a sweep owns it, and resurrecting it would race into a frozen key with no file. (codex catalog_protocol.go:36) - Drop the crash-test hooks (crashHooks, the fire*/failCommitBatch injection, and the tests that used them); keep the invariant-check tests. Real replacement tracked in #823. - Revert config_lock to unix-only (fold flock back in, delete the unix/windows split) and remove windows-latest from the build matrix: directory fsync has no Windows equivalent (codex paths.go:151), so the daemon can't run there. Follow-ups opened: #823 (fault-injection crash-safety harness), #824 (split the streaming package).
The streaming daemon is unix-only, but cmd/stellar-rpc does not import the streaming package yet, so the Windows `go build ./cmd/stellar-rpc` job still passes. Keep the matrix entry untouched until the daemon is wired into the binary (when it will need to be revisited).
First step of #824 (done now per the round-2 review request): carve the catch-all streaming package into purpose-named subpackages. streaming/ config, single-process locking, retention floor geometry/ chunk + tx-hash-index math, the key/path-naming bijection, and the fsync helpers (was keys.go, paths.go, txhash_index.go) catalog/ durable on-disk state: the one-write protocol, the key-driven sweeps, and the artifact-kind sets (was catalog*.go, artifacts.go) The dependency graph is one-way with no cycles: geometry depends on neither streaming nor catalog; catalog imports only geometry; streaming imports geometry (config_lock, retention). Cross-package helpers that were unexported are now exported from geometry (ChunkKey, FsyncDir, LastCompleteChunkAt, ...). NewLayoutFromPaths stays in streaming as a thin bridge over the new geometry.NewLayoutFromRoots, which takes plain strings so geometry carries no dependency on the config Paths type. The parent directory stays `streaming` for now; renaming it (dropping the codename) and the remaining store/lifecycle splits are deferred to the rest of #824. No behavior change — a pure repackage. Verified: go build, go vet, and go test -race are all green for the three packages.
Rebase the daemon layer onto the split #818 and follow the issue #824 package layout instead of the old single streaming package: - backfill/: resolve.go + execute.go join process.go/txindex.go (the planner + bounded-worker executor). RunBackfill and the RunChunk/ RunIndex test seams are exported so the daemon package's catch-up tests can inject fakes across the package boundary. - observability/: the daemon's control-plane Metrics sink (NopMetrics and MetricsOrNop exported for backfill, which consumes the interface). - fullhistory/ (top level, package fullhistory): daemon, startup (catch-up + serve), config_validate, progress, doc. Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon layer: drop the cpi config field + its form/restart validation, replace PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test now exercises a non-terminal (rolling) index coverage, since one complete chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep stays unit-tested in backfill/txindex_test. Propagate #818's window->index naming into the daemon: windowsOverlapping -> indexesOverlapping, the TxHashIndexLayout vars -> txLayout. closes #815
Rebase the daemon layer onto the split #818 and follow the issue #824 package layout instead of the old single streaming package: - backfill/: resolve.go + execute.go join process.go/txindex.go (the planner + bounded-worker executor). RunBackfill and the RunChunk/ RunIndex test seams are exported so the daemon package's catch-up tests can inject fakes across the package boundary. - observability/: the daemon's control-plane Metrics sink (NopMetrics and MetricsOrNop exported for backfill, which consumes the interface). - fullhistory/ (top level, package fullhistory): daemon, startup (catch-up + serve), config_validate, progress, doc. Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon layer: drop the cpi config field + its form/restart validation, replace PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test now exercises a non-terminal (rolling) index coverage, since one complete chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep stays unit-tested in backfill/txindex_test. Propagate #818's window->index naming into the daemon: windowsOverlapping -> indexesOverlapping, the TxHashIndexLayout vars -> txLayout. closes #815
| // Snapshot which bucket dirs exist before the write, so the barrier below | ||
| // fsyncs the grandparent dirent only for dirs THIS freeze created. (Buckets | ||
| // are created in arbitrary order, so a chunk-id heuristic can't tell.) | ||
| bucketExisted := make(map[string]bool) |
There was a problem hiding this comment.
Could this snapshot and the newParent plumbing into BarrierNewFile just go away? It exists to skip the grandparent fsync when the bucket already existed — but fsyncing an unchanged directory is nearly free (no dirty metadata to flush). If BarrierNewFile always did file → parent → grandparent, you'd delete this loop, the newParent parameter, and the newIndexDir branch in txindex.go, for one fewer before-the-write step to get subtly wrong. (geometry's DeepestExistingDir/FsyncNewDirs already do "fsync a created dir chain" if you'd rather keep the optimization but not hand-roll it.)
There was a problem hiding this comment.
Done in 7a51cf81. BarrierNewFile now always fsyncs file → parent → grandparent (the newParent bool is gone), so the bucketExisted snapshot loop in processChunk and the newIndexDir branch in buildTxhashIndex both disappear — and with the stat gone the index dir is just an idempotent os.MkdirAll. Always-fsync-grandparent is a strict superset of the old durability (an unchanged dir has no dirty metadata to flush, so it's nearly free), so it's never less safe.
I kept the hand-rolled two-level barrier rather than reaching for DeepestExistingDir/FsyncNewDirs: after the startup root creation, only a single bucket/index dir is ever created on demand, so file → parent → grandparent is exactly the depth needed — those helpers are for the multi-level root chain at startup.
| } | ||
| defer func() { _ = closeSource() }() | ||
|
|
||
| if err := cat.MarkChunkFreezing(chunkID, kinds...); err != nil { |
There was a problem hiding this comment.
This mark → write → barrier → flip skeleton is the crash-safety invariant, and it's spelled out twice (here and in buildTxhashIndex), with a third copy in the queued live-ingestion PR — openHotTierForChunk is PutHotTransient → mkdir/open → fsync → FlipHotReady. Three sites share this exact order. A small oneWrite(mark, create, barrier, flip) helper would keep the order in one place so none of the three can drift; the states and the create step differ per site, the order doesn't. Worth establishing here and adopting in the hot-tier open.
There was a problem hiding this comment.
Done in 7a51cf81. Added oneWrite(mark, create, barrier, flip func() error) error and adopted it in both processChunk and buildTxhashIndex, so the mark → create → barrier → flip order is defined once and the sites only supply the per-step closures.
I scoped it to the backfill package for now since both current callers live here; when #820 adds the hot-tier openHotTierForChunk (PutHotTransient → mkdir/open → fsync → FlipHotReady) it can promote/relocate oneWrite and adopt it as the third caller. Folding the ordering out also dropped both functions under the cyclop threshold (13 < 15), so I removed their //nolint:cyclop directives.
There was a problem hiding this comment.
Done now instead — promoted oneWrite to catalog.OneWrite in catalog_protocol.go, right under the protocol header it implements (the file already documents the same mark→create→barrier→flip steps). processChunk and buildTxhashIndex now call it across the package boundary.
It's a zero-dependency pure function and catalog never imports backfill, so there's no import cycle; #820's openHotTierForChunk adopts it as the third caller by import alone, with no later relocation. Pushed in 7e56b926.
|
|
||
| // pollingBackendWaiter polls Tip until it reaches chunkLastLedger, ctx is | ||
| // canceled, or Timeout elapses. | ||
| type pollingBackendWaiter struct { |
There was a problem hiding this comment.
github.com/cenkalti/backoff/v4 is already a direct dependency — this hand-rolled poll loop could use it and shed the timer/deadline/select bookkeeping. backoff.Retry with a ConstantBackOff (the interval), WithContext (cancellation), and WithMaxElapsedTime (the timeout) covers all of it; wrap the tip-query error in backoff.Permanent to keep its current fatal-on-error behavior. The executor's own retry logic (#819) could share the same primitive too.
There was a problem hiding this comment.
Done in 7a51cf81. WaitForCoverage now drives backoff.Retry: the tip-query error is wrapped in backoff.Permanent (stays fatal), and the low-tip case returns the ErrBackendCoverageTimeout-wrapped error — which Retry surfaces once MaxElapsedTime stops the loop, so callers still classify the timeout via errors.Is.
Two deviations from the literal recipe, both deliberate:
WithMaxElapsedTimeis anExponentialBackOffOpts, not a wrapper aroundConstantBackOff, so the constant poll is expressed asNewExponentialBackOffwithWithMultiplier(1)+WithRandomizationFactor(0)(+WithInitialInterval/WithMaxInterval= the interval).- I kept a per-call
context.WithDeadlineinside the operation: the backoff bounds total retry time but not a single in-flightTipcall, so dropping it would regress the hung-query bound added in808eb622(and itsTestPollingBackendWaiter_TipBoundedByDeadline).
Added TestPollingBackendWaiter_TimeoutReturnsSentinel to cover the timeout→sentinel path. Agreed the #819 executor retry can share this primitive.
|
|
||
| // forEachChunkTxHashState calls fn with each chunk's txhash state over [lo, hi]. | ||
| // Centralizes the chunk.ID wraparound guard (the top can be the max id). | ||
| func forEachChunkTxHashState(cat *catalog.Catalog, lo, hi chunk.ID, fn func(chunk.ID, geometry.State) error) error { |
There was a problem hiding this comment.
go.mod is on 1.26, and the catalog already iterates this way (for e, err := range c.store.PrefixScan(...)), so this callback is a bit against the grain. As an iter.Seq2[chunk.ID, geometry.State] it'd read for cid, state := range cat.TxHashStates(lo, hi) { … }, and the two callers (txhashBinInputs, demotedTxhashRefs) could use a plain for with break/continue instead of returning sentinel errors from a closure.
There was a problem hiding this comment.
Done in 7a51cf81. forEachChunkTxHashState is now txHashStates(...) iter.Seq2[…, error], and txhashBinInputs/demotedTxhashRefs use a plain for cs, err := range … with normal error handling and continue instead of returning sentinel errors from a closure.
One adjustment from the sketch: since catalog.State can return an error, I followed the codebase's iter.Seq2[T, error] scan convention (the same shape as PrefixScan/IterateLedgers) rather than iter.Seq2[chunk.ID, geometry.State] — yielding a small chunkTxHashState{Chunk, State} value paired with the error. I left it as a backfill free function (not a cat.TxHashStates method) to avoid growing the catalog API for a backfill-local helper, but happy to move it to a method if you'd prefer that form.
| for _, kind := range kinds { | ||
| for _, path := range layout.ArtifactPaths(chunkID, kind) { | ||
| newParent := !bucketExisted[filepath.Dir(path)] | ||
| if berr := geometry.BarrierNewFile(path, newParent); berr != nil { |
There was a problem hiding this comment.
Side note, visible here but really a #824 / geometry-package thing: the fsync helpers BarrierNewFile & co. don't belong in geometry. BarrierNewFile, FsyncDir, FsyncParentDirs, DeepestExistingDir, FsyncNewDirs, DeleteFileIfExists, RmdirIfEmpty are generic os-level durability ops with zero geometry or domain references — they only landed in geometry because they shared paths.go with the path bijection in the monolith. geometry should be naming + math (identity and location, no I/O); crash-safe file ordering is a different concern that'd sit better in a small durable/fsync leaf package imported by catalog, backfill, and the hot-tier code. Worth folding into the #824 split.
Rebase the daemon layer onto the split #818 and follow the issue #824 package layout instead of the old single streaming package: - backfill/: resolve.go + execute.go join process.go/txindex.go (the planner + bounded-worker executor). RunBackfill and the RunChunk/ RunIndex test seams are exported so the daemon package's catch-up tests can inject fakes across the package boundary. - observability/: the daemon's control-plane Metrics sink (NopMetrics and MetricsOrNop exported for backfill, which consumes the interface). - fullhistory/ (top level, package fullhistory): daemon, startup (catch-up + serve), config_validate, progress, doc. Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon layer: drop the cpi config field + its form/restart validation, replace PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test now exercises a non-terminal (rolling) index coverage, since one complete chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep stays unit-tested in backfill/txindex_test. Propagate #818's window->index naming into the daemon: windowsOverlapping -> indexesOverlapping, the TxHashIndexLayout vars -> txLayout. closes #815
…ages Every error in process.go/txindex.go is either wrapped with %w (so the chain already carries operation + id context) or a self-describing leaf/sentinel. A package prefix just repeats at each wrap level and the "streaming:" token is a stale package name besides. Drop it, per review.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4f85de7dfd
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
- geometry.BarrierNewFile: always fsync file -> parent -> grandparent; drop the newParent bool and the bucketExisted snapshot / newIndexDir branch that fed it. An fsync of an unchanged dir is nearly free, so the optimization wasn't worth the per-write bookkeeping. MkdirAll is idempotent, so the index-dir stat goes away too. (r3479586375) - backfill: extract oneWrite(mark, create, barrier, flip) and adopt it in processChunk and buildTxhashIndex, so the crash-safe one-write order lives in one place rather than being spelled out per site. (r3479586490) - pollingBackendWaiter.WaitForCoverage: replace the hand-rolled timer/select loop with backoff.Retry. The tip-query error is backoff.Permanent (stays fatal); the per-call context.WithDeadline is kept so a hung query still can't outlast Timeout; the low-tip error is returned once MaxElapsedTime stops the loop, so callers still classify the timeout via ErrBackendCoverageTimeout. WithMaxElapsedTime only composes with ExponentialBackOff, so a unit multiplier and zero randomization reduce it to a constant poll. (r3479586626) - txindex: replace the forEachChunkTxHashState callback with an iter.Seq2[chunkTxHashState, error] iterator (the catalog scan convention, since catalog.State can fail); txhashBinInputs/demotedTxhashRefs use plain for-range with break/continue instead of sentinel errors from a closure. (r3479586743) The //nolint:cyclop on processChunk/buildTxhashIndex are removed: both now sit at complexity 13, under the 15 threshold.
Relocate the one-write protocol ordering helper (mark -> create -> barrier -> flip) from backfill/process.go to catalog_protocol.go, where the protocol's states and mark/flip steps already live, and export it as catalog.OneWrite. processChunk and buildTxhashIndex now call it across the package boundary. It is a zero-dependency pure function and catalog never imports backfill, so there is no import cycle; #820's hot-tier openHotTierForChunk adopts it as the third caller by import alone, with no later relocation. Addresses the #818 review thread that asked to establish the shared helper here rather than deferring the move to #820.
|
Two remaining items in the 1. The ledger-source boundary between
|
…evert OneWrite
Redraw the ledger-source boundary between backfill and ingest, and revert the
catalog.OneWrite wrapper, per the review (comment 4812281401). None of this is
wired into the daemon yet, so the structural change is cheap to make now.
ingest: make the cold materializer source-blind. RunColdChunk becomes
WriteColdChunk(ctx, logger, chunkID, raw iter.Seq2[[]byte, error], dirs, sink,
cfg), taking the raw ledger iterator rather than a source. Delete source.go
whole (ChunkSource, ChunkSourceFunc, packSource/NewPackSource, dataStoreSource/
NewDataStoreSource) and the dead bulk path (RunCold, the per-chunk drive, the
buildColdIngesters convenience); rename buildColdIngestersIn -> buildColdIngesters.
drain now takes the raw iterator; RunHot takes a ledgerbackend.LedgerStream
directly. Source resolution (pack-stat, coverage wait) moves to the backfill
caller, so a pack-missing/coverage-timeout failure is metered there, not as a
cold ColdChunkTotal attempt.
ledger: relocate packStream into the store as NewPackStream(path string) — the
local frozen .pack is just a bare ledgerbackend.LedgerStream.
backfill: introduce Backend{ ledgerbackend.LedgerStream; Tip(ctx) } — bsbSource
now (Tip = datastore.FindLatestLedgerSequence), captive core later (the interface
is the seam). Collapse BackendWaiter/pollingBackendWaiter/NewPollingBackendWaiter
into a single waitForCoverage, and reduce ProcessConfig's source surface to one
Backend. backfillSource returns a bare LedgerStream (pack arm via NewPackStream,
bulk arm gated by waitForCoverage on the Backend's Tip).
catalog: revert OneWrite. Inline the four-step protocol straight-line at
processChunk and buildTxhashIndex, each step labelled // one-write:mark|create|
barrier|flip. The wrapper only sequenced calls; crash-safety is guaranteed by
catalog/crashsafety_test.go, not the wrapper.
|
Both done in 1. Source boundary.
2. |
golangci-lint: - backend.go: blank line separating the embedded LedgerStream from the ds field (embeddedstructfieldcheck) - ingest_test.go: wrap the >120-char WriteColdChunk call in TestWriteColdChunk_CanceledContext (lll) - ingest_test.go: //nolint:unparam on the chunk-general packPath/rawChunk test helpers — every surviving caller uses chunk 0 after the multi-chunk RunCold tests were removed, but the params stay to express intent review nits: - rename TestRunCold_* -> TestWriteColdChunk_* and fix the stale "RunCold" prose now that the cold materializer is WriteColdChunk (keeping the two historical references to the removed RunCold) - drop the dead `_ = root` binding in TestProcessChunk_ProducesAllArtifactsAndFreezes
tamirms
left a comment
There was a problem hiding this comment.
Approving. The source-boundary redesign and the OneWrite revert both landed cleanly in 096463ef, and the rest of the primitives layer matches the design — the resolver/executor and daemon wiring are correctly left to their later layers, and the empty-window (#826) and geometry/fsync (#824) items are tracked. Build and unit tests are green; the two red checks are the unrelated XDR Go/Rust rev mismatch.
A few optional cosmetic nits, none blocking:
process.go:93— the one-write comment is garbled (a dangling "The" and a doubled// //);txindex.gohas the clean version to copy.geometry/paths.go:109— theTxHashRawRootdoc still referencesRunCold, which this PR removed; theTestRunCold_*names iningest_test.goare similarly stale now that they callWriteColdChunk.ingest/service.go— theColdServicestruct comment says the timer starts "from the Finalize/Close call," but it actually starts at construction (the constructor comment is the accurate one).
Rebase the daemon layer onto the split #818 and follow the issue #824 package layout instead of the old single streaming package: - backfill/: resolve.go + execute.go join process.go/txindex.go (the planner + bounded-worker executor). RunBackfill and the RunChunk/ RunIndex test seams are exported so the daemon package's catch-up tests can inject fakes across the package boundary. - observability/: the daemon's control-plane Metrics sink (NopMetrics and MetricsOrNop exported for backfill, which consumes the interface). - fullhistory/ (top level, package fullhistory): daemon, startup (catch-up + serve), config_validate, progress, doc. Adopt #817's chunks_per_txhash_index-is-a-constant change in the daemon layer: drop the cpi config field + its form/restart validation, replace PinLayout(cpi, earliest) with PinEarliestLedger(earliest), and build the TxHashIndexLayout from geometry.ChunksPerTxhashIndex. The daemon E2E test now exercises a non-terminal (rolling) index coverage, since one complete chunk no longer finalizes a 1000-chunk window; the terminal demote+sweep stays unit-tested in backfill/txindex_test. Propagate #818's window->index naming into the daemon: windowsOverlapping -> indexesOverlapping, the TxHashIndexLayout vars -> txLayout. closes #815
#818's review unified its bulk-source seams: the separate ChunkSource (OpenStream) and BackendWaiter (WaitForCoverage) collapsed into one backfill.Backend interface (ledgerbackend.LedgerStream + Tip), with the coverage wait now internal to backfill (waitForCoverage, driven by Backend.Tip). Adapt the daemon layer to the frozen #818 surface: - Boundaries.Backend is now a backfill.Backend; drop the BackendWaiter field, its validate() check, and the ProcessConfig wiring. - backendTip adapts a backfill.Backend to NetworkTipBackend via Tip, so the catch-up tip and the freeze's coverage frontier are one source and cannot disagree. Its old WaitForCoverage half (and the four TestBackendTip_WaitForCoverage* tests) are dropped — that logic now lives in and is tested by backfill.waitForCoverage. - E2E + adapter tests use a fakeBackend (LedgerStream + Tip) in place of the old countingChunkSource/fakeWaiter/fakeLedgerBackend fakes.
The resolver's kind-rules comment used windowFirstChunk/windowLastChunk — names that read like identifiers but don't exist; the code calls txLayout.FirstChunk(w)/LastChunk(w). Name the real API so the pseudo-code is greppable and accurate. Prose still says "window" for the index unit, matching #818's txindex.go convention and the design doc.
… loop Per tamirms's #818 review ("the executor's own retry logic (#819) could share the same primitive too"), replace the hand-rolled withRetries + retryBackoffFor + ctxSleep with cenkalti/backoff — the same primitive backfillSource's waitForCoverage already uses. Behavior-preserving: exponential x2 from RetryBackoff, capped at maxRetryBackoff, no jitter, MaxRetries+1 total attempts, ctx-cancellation aborts. Drops the retrySleep test seam; the retry tests now assert the policy's NextBackOff sequence directly and use a tiny base for the call-count tests.
Part of #815 — Phase 1 (Backfill), layer 2 of 3. Stacked on PR1A (
streaming-phase1-foundations).The primitives — single-pass cold materialization for all three data types:
processChunk— materializes a chunk's ledgers (.pack), events (cold segment), and tx-hash (sorted.bin) artifacts for everyKind, with per-kind idempotencybackfillSource(frozen.pack→ bulk backend)buildTxhashIndex— k-way merge of the in-window.binruns into a sorted.idx(byte-format-identical to the single-index build)ingest.RunColdChunksingle-chunk cold primitiveVerification:
go build+go vet+go test -shortgreen on./cmd/stellar-rpc/internal/fullhistory/...(cgo RocksDB toolchain). Note: the fullcmd/stellar-rpcbinary link requires the Rustlibpreflight/libxdr2json(CImake build-libs);go vet ./cmd/stellar-rpc/type-checks the entrypoint locally.golangci-lintruns in CI.Stack:
streaming-phase1-primitives→streaming-phase1-foundations