Streaming daemon: Slice 1 — Layer 1 (foundations)#805
Conversation
|
Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits. |
|
|
||
| const ( | ||
| // KindLedgers is the ledger pack file (.pack). | ||
| KindLedgers Kind = "ledgers" |
There was a problem hiding this comment.
events and txhash added in future PRs
Rewrite the package doc File map so it lists only the files this layer (Slice 1 · Layer 1) actually introduces — keys/paths, catalog + one-write protocol + sweep, config schema/loader/lock, the ArtifactSet/Kind abstraction, and the test-seam hooks. The previous map described the whole finished package (process.go, lifecycle.go, startup.go, audit.go, ...), none of which exist in this layer. Drop the reference to design-docs/full-history-implementation-status.md and prefer 'catalog' over 'meta-store' in the prose (the catalog is the metastore.Store wrapper). Add a 'Later layers' note pointing at the storage/orchestration/daemon layers and the events/tx-hash slices.
| // {root}/ | ||
| // ├── catalog/rocksdb/ | ||
| // ├── hot/{chunk:08d}/ | ||
| // └── ledgers/{bucket:05d}/{chunk:08d}.pack |
There was a problem hiding this comment.
events and txhash added in future PRs
| "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest" | ||
| ) | ||
|
|
||
| // ArtifactSet is the subset of per-chunk artifact Kinds a processChunk pass must |
There was a problem hiding this comment.
Used by resolver/processChunk in future PRs
- config_lock: //nolint:gosec on int(f.Fd()) (a file descriptor always fits in int) — line-above to stay within the 120-col lll limit - artifacts: drop two dead //nolint:gosec directives (gosec no longer fires on the uint8 shift); //nolint:unused on ingestConfig (consumed by processChunk in a later layer) - config: //nolint:revive on StreamingConfig (Config names the top-level daemon config; sections follow the XxxConfig convention — no rename) - hooks: //nolint:unused on the forward-declared crash hooks (failCommitBatch/afterMarkFreezing + their accessors) fired from processChunk / recovery in later layers
chowbao
left a comment
There was a problem hiding this comment.
Design-doc → code map (Slice 1 · Layer 1)
Inline comments below anchor each design reference from #809 to the code that implements it, so reviewing this slice against full-history-streaming-workflow.md is a direct comparison. Quick index:
| Design section | Code |
|---|---|
| Catalog keys + states | keys.go |
| Data model (keys-first / pure catalog) | catalog.go, artifacts.go |
| One write protocol | catalog_protocol.go, catalog_sweep.go, paths.go (fsync barriers) |
| Directory layout / Filesystem artifacts | paths.go |
| Geometry | pkg/chunk (already merged) — consumed in keys.go / paths.go |
| Configuration | config.go |
| Single-process enforcement | config_lock.go |
| Substrate assumptions (crash-safety) | hooks.go + streaming_test.go |
Two notes on issue↔PR scope while mapping:
- Geometry core math (
chunkFirstLedger/chunkLastLedger,LedgersPerChunk, the chunk −1 sentinel) lives in the already-mergedpkg/chunk, so it isn't in this diff; this layer only consumes it (8-digit padded ids, bucket =chunk / 1000). - Retention gate / Reader contract (#reader-contract): the issue lists
RetentionGate/ChunkBelowFloorunder L1 cross-cutting primitives, but this PR contains only theretention_chunksconfig field — the gate predicate (retention.go) is grouped under the Orchestration layer indoc.go, i.e. a later layer. Worth confirming whether that's intentional deferral or a scope mismatch in the issue text.
| const ( | ||
| // StateFreezing — the immutable file is being written. Set BEFORE any I/O | ||
| // (the mark-then-write rule), so a crash mid-write is detectable from the | ||
| // key alone and every file on disk is reachable from a key. | ||
| StateFreezing State = "freezing" | ||
| // StateFrozen — the file and its dirent are fsynced and durable. Truth: | ||
| // readers and the resolver trust it blindly. | ||
| StateFrozen State = "frozen" | ||
| // StatePruning — the file is queued for removal; it may or may not still be | ||
| // on disk. A sweep finishes the unlink and then deletes the key. | ||
| StatePruning State = "pruning" | ||
| ) | ||
|
|
||
| // HotState is a hot-DB key's value. One key per chunk brackets the chunk's | ||
| // hot RocksDB directory; the column families inside carry no individual key. | ||
| type HotState string | ||
|
|
||
| const ( | ||
| // HotTransient — a directory operation is in flight (creation or | ||
| // deletion), or a recovery demoted the key. The recovery is identical | ||
| // either way: the open path wipes and recreates, the discard scan re-runs. | ||
| HotTransient HotState = "transient" | ||
| // HotReady — the dir exists and is usable for reads and writes. | ||
| HotReady HotState = "ready" | ||
| ) |
There was a problem hiding this comment.
Design → Catalog keys (Data model)
The artifact lifecycle states the spec defines: freezing | frozen | pruning for per-chunk artifacts and transient | ready for the hot DB, plus the Kind registry. Empty State (key absent) is the design's "neither file nor in-progress write exists".
| const ( | ||
| chunkPrefix = "chunk:" | ||
| hotPrefix = "hot:chunk:" | ||
|
|
||
| // Config pins. | ||
| configEarliestLedger = "config:earliest_ledger" | ||
| ) | ||
|
|
||
| // chunkKey returns the per-chunk artifact key chunk:{chunk:08d}:{kind}. | ||
| func chunkKey(c chunk.ID, kind Kind) string { | ||
| return chunkPrefix + c.String() + ":" + string(kind) | ||
| } | ||
|
|
||
| // hotChunkKey returns the hot-DB key hot:chunk:{chunk:08d}. | ||
| func hotChunkKey(c chunk.ID) string { | ||
| return hotPrefix + c.String() | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Key parsing. The inverse of the constructors above; every parser is the | ||
| // reverse bijection of exactly one constructor. | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| // parseChunkKey decodes chunk:{chunk:08d}:{kind}. ok is false for any key that | ||
| // is not a well-formed per-chunk artifact key. | ||
| func parseChunkKey(key string) (chunk.ID, Kind, bool) { | ||
| rest, found := strings.CutPrefix(key, chunkPrefix) | ||
| if !found { | ||
| return 0, "", false | ||
| } | ||
| id, suffix, found := strings.Cut(rest, ":") | ||
| if !found { | ||
| return 0, "", false | ||
| } | ||
| n, err := parsePadded(id) | ||
| if err != nil { | ||
| return 0, "", false | ||
| } | ||
| kind := Kind(suffix) | ||
| if !isKnownKind(kind) { | ||
| return 0, "", false | ||
| } | ||
| return chunk.ID(n), kind, true | ||
| } | ||
|
|
||
| // parseHotChunkKey decodes hot:chunk:{chunk:08d}. | ||
| func parseHotChunkKey(key string) (chunk.ID, bool) { | ||
| rest, found := strings.CutPrefix(key, hotPrefix) | ||
| if !found { | ||
| return 0, false | ||
| } | ||
| n, err := parsePadded(rest) | ||
| if err != nil { | ||
| return 0, false | ||
| } | ||
| return chunk.ID(n), true | ||
| } | ||
|
|
||
| // parsePadded parses an 8-digit zero-padded decimal segment as produced by | ||
| // chunk.ID.String(). It enforces the fixed 8-char width so the bijection is | ||
| // exact — a non-padded or wrong-width segment is rejected, not silently | ||
| // accepted. | ||
| func parsePadded(s string) (uint32, error) { | ||
| if len(s) != 8 { | ||
| return 0, fmt.Errorf("streaming: %q is not an 8-digit padded id", s) | ||
| } | ||
| n, err := strconv.ParseUint(s, 10, 32) | ||
| if err != nil { | ||
| return 0, fmt.Errorf("streaming: %q is not numeric: %w", s, err) | ||
| } | ||
| return uint32(n), nil | ||
| } | ||
|
|
||
| func isKnownKind(k Kind) bool { | ||
| return slices.Contains(allKinds, k) | ||
| } |
There was a problem hiding this comment.
Design → Catalog keys · padding form per Geometry
The key families from the spec — chunk:{c:08d}:{kind}, hot:chunk:{c:08d}, config:* pins — with every key built and parsed in exactly one place (the single source of truth for the key↔path bijection). parsePadded enforces the fixed 8-digit zero-padded width that chunk.ID.String() (geometry, in pkg/chunk) produces, so lexical order == numeric order and the bijection is exact rather than lenient.
| // ArtifactSet is the subset of per-chunk artifact Kinds a processChunk pass must | ||
| // produce (design-docs rule 2). It is a small immutable set over the per-chunk | ||
| // kinds (currently just ledgers); the resolver builds it from the catalog | ||
| // difference and processChunk narrows it further by dropping already-frozen | ||
| // kinds (rule 1's per-kind idempotency). | ||
| // | ||
| // The representation is a fixed-width bitmask over allKinds' canonical order, so | ||
| // Kinds() yields kinds in that order (the same order buildColdIngesters uses) | ||
| // and membership tests are allocation-free. | ||
| type ArtifactSet struct { | ||
| mask uint8 | ||
| } | ||
|
|
||
| // kindBit maps a Kind to its bit in ArtifactSet.mask via its index in allKinds. | ||
| // An unknown kind returns (0,false) so callers never set a phantom bit. | ||
| func kindBit(k Kind) (uint8, bool) { | ||
| for i, kk := range allKinds { | ||
| if kk == k { | ||
| return uint8(1) << i, true //nolint:gosec // len(allKinds) small, no overflow | ||
| } | ||
| } | ||
| return 0, false | ||
| } | ||
|
|
||
| // NewArtifactSet builds a set from the given kinds. Unknown kinds are ignored | ||
| // (the kind registry in keys.go is the authority); duplicates are idempotent. | ||
| func NewArtifactSet(kinds ...Kind) ArtifactSet { | ||
| var s ArtifactSet | ||
| for _, k := range kinds { | ||
| if bit, ok := kindBit(k); ok { | ||
| s.mask |= bit | ||
| } | ||
| } | ||
| return s | ||
| } | ||
|
|
||
| // AllArtifacts is the full set (currently just ledgers) — what a from-scratch | ||
| // chunk freeze requests before per-kind idempotency narrows it. | ||
| func AllArtifacts() ArtifactSet { return NewArtifactSet(allKinds...) } |
There was a problem hiding this comment.
Design → one-write protocol, "produce the full artifact set" (rule 2)
The ArtifactSet / Kind abstraction the issue lists under cross-cutting primitives — a fixed-width bitmask over allKinds. The resolver / processChunk (later layers) build it from the catalog difference and narrow it by per-kind idempotency. Today only KindLedgers is wired; events / txhash arrive in Slices 2–3, so ingestConfig below is intentionally forward-looking.
| // The one write protocol — mark-then-write. Every durable per-chunk artifact | ||
| // flows through here: | ||
| // | ||
| // 1. Put the key "freezing" via metastore BEFORE any I/O. | ||
| // 2. The caller writes the file. | ||
| // 3. The caller fsyncs the FILE + its PARENT dirent (+ the GRANDPARENT dirent | ||
| // when the parent dir was just created) — barrierNewFile in paths.go. | ||
| // 4. Flip to "frozen": a single Put for per-chunk artifacts. | ||
| // | ||
| // The pre-mark gives "every file on disk has its meta key"; the dirent | ||
| // barriers guarantee the key never outlives the file's creation; the frozen | ||
| // flip is the only transition readers trust. The catalog owns steps 1 and 4 | ||
| // (meta-store writes); the caller owns steps 2 and 3 (I/O), calling | ||
| // MarkChunkFreezing before and FlipChunkFrozen after. | ||
|
|
||
| // MarkChunkFreezing puts every requested kind's key to "freezing" in one | ||
| // atomic synced batch, BEFORE any file I/O. Re-marking a "freezing"/"pruning"/ | ||
| // absent key is the idempotent re-materialization entry; a "frozen" kind is | ||
| // the caller's to skip (rule 1's per-kind idempotency), not this helper's. | ||
| func (c *Catalog) MarkChunkFreezing(chunkID chunk.ID, kinds ...Kind) error { | ||
| if len(kinds) == 0 { | ||
| return errors.New("streaming: MarkChunkFreezing requires at least one kind") | ||
| } | ||
| return c.store.Batch(func(w *metastore.BatchWriter) error { | ||
| for _, kind := range kinds { | ||
| w.Put(chunkKey(chunkID, kind), string(StateFreezing)) | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| // FlipChunkFrozen flips every requested kind's key to "frozen" in one atomic | ||
| // synced batch. The caller MUST have completed barrierNewFile for every file | ||
| // first — "frozen" means durable and complete, trusted blindly downstream. | ||
| func (c *Catalog) FlipChunkFrozen(chunkID chunk.ID, kinds ...Kind) error { | ||
| if len(kinds) == 0 { | ||
| return errors.New("streaming: FlipChunkFrozen requires at least one kind") | ||
| } | ||
| return c.store.Batch(func(w *metastore.BatchWriter) error { | ||
| for _, kind := range kinds { | ||
| w.Put(chunkKey(chunkID, kind), string(StateFrozen)) | ||
| } | ||
| return nil | ||
| }) | ||
| } |
There was a problem hiding this comment.
Design → One write protocol
Mark-then-write, exactly as specced: MarkChunkFreezing puts "freezing" before any file I/O (so every file on disk is reachable from a key and a crash mid-write is detectable from the key alone), the caller writes + fsyncs, then FlipChunkFrozen flips to "frozen" — the only state readers/resolver trust. Multi-kind transitions commit in one atomic synced metastore batch. The hot-DB transient → ready bracket below is the same two ideas applied to a directory.
| // Key-driven sweep — the ONLY deletion body in the system. Its ordering is | ||
| // load-bearing: | ||
| // | ||
| // demote-if-still-"frozen" (never unlink under a frozen key) | ||
| // -> unlink file(s) | ||
| // -> fsyncDir(parent) (the unlink becomes durable BEFORE the key goes) | ||
| // -> delete key (batched) | ||
| // | ||
| // This gives the exit-side invariant "key absent => file gone": because the | ||
| // key outlives the durable unlink, a crash anywhere leaves the key in place | ||
| // and the sweep re-runs. Deleting the key first would, on a crash, leave a | ||
| // file with no key — the one orphan class this design cannot find. | ||
|
|
||
| // SweepChunkArtifacts deletes the files for a batch of per-chunk artifact refs | ||
| // and removes their keys. Refs already past "frozen" (i.e. "freezing" or | ||
| // "pruning") are unlinked directly; a still-"frozen" ref is demoted to | ||
| // "pruning" first, in one atomic batch, so no unlink ever happens under a | ||
| // frozen key. | ||
| // | ||
| // The whole batch shares three barriers: one demote batch, one fsync pass over | ||
| // the affected parent dirs, one key-delete batch — so sweeping many refs at | ||
| // once pays a single round of each. | ||
| func (c *Catalog) SweepChunkArtifacts(refs []ArtifactRef) error { | ||
| if len(refs) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // Demote first — never unlink under a "frozen" key. A crash after this | ||
| // batch but before the unlinks leaves "pruning" keys the next sweep | ||
| // finishes. | ||
| if err := c.store.Batch(func(w *metastore.BatchWriter) error { | ||
| for _, ref := range refs { | ||
| if ref.State == StateFrozen { | ||
| w.Put(ref.Key(), string(StatePruning)) | ||
| } | ||
| } | ||
| return nil | ||
| }); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Between the demote and the unlink: every "frozen" ref must now read | ||
| // "pruning". Dropping the demote above would leave it "frozen" here. | ||
| c.hooks.fireBeforeUnlink() | ||
|
|
||
| // Unlink every file (idempotent on already-gone paths), collecting parents | ||
| // for the durability barrier. | ||
| var paths []string | ||
| for _, ref := range refs { | ||
| for _, p := range c.layout.ArtifactPaths(ref.Chunk, ref.Kind) { | ||
| if err := deleteFileIfExists(p); err != nil { | ||
| return err | ||
| } | ||
| paths = append(paths, p) | ||
| } | ||
| } | ||
| if err := fsyncParentDirs(paths); err != nil { // unlinks durable BEFORE keys | ||
| return err | ||
| } | ||
|
|
||
| // Between the durable unlink and the key delete: the files are gone but the | ||
| // keys still exist. Reordering the delete ahead of the unlink would leave a | ||
| // file present here under no key — the one orphan class this order forbids. | ||
| c.hooks.fireBeforeKeyDelete() | ||
|
|
||
| // Delete the keys — only now that the unlinks are durable. | ||
| return c.store.Batch(func(w *metastore.BatchWriter) error { | ||
| for _, ref := range refs { | ||
| w.Delete(ref.Key()) | ||
| } | ||
| return nil | ||
| }) | ||
| } |
There was a problem hiding this comment.
Design → One write protocol (deletion = the reverse)
The single deletion body in the system. The order is load-bearing: demote frozen → pruning → unlink → fsyncDir(parent) → delete key, so the key always outlives the durable unlink ⟹ the spec's exit invariant key absent ⟹ file gone. Deleting the key before the unlink is durable would, on a crash, orphan a file under no key — the one orphan class this design cannot find. The three shared barriers (one demote batch / one fsync pass / one delete batch) make a multi-ref sweep pay each cost once.
|
|
||
| // Layout resolves meta-store keys to on-disk paths. It holds one root PER | ||
| // artifact tree — the key<->path mapping is fixed | ||
| // (design-docs/full-history-streaming-workflow.md "Directory layout"), so a | ||
| // Layout plus a key is enough to find any file without listing a directory. | ||
| // | ||
| // In the default deployment all roots sit under one data dir (NewLayout): | ||
| // | ||
| // {root}/ | ||
| // ├── catalog/rocksdb/ | ||
| // ├── hot/{chunk:08d}/ | ||
| // └── ledgers/{bucket:05d}/{chunk:08d}.pack | ||
| // | ||
| // But each tree's root is independently settable (NewLayoutFromPaths) so an | ||
| // operator's [catalog]/[immutable_storage.*]/[streaming.hot_storage] path | ||
| // overrides are honored — Layout is the SINGLE source of truth for storage | ||
| // paths, and the same roots that get flocked (Paths.LockRoots) are the ones the | ||
| // data path reads/writes. Below each per-tree root the bucket structure is | ||
| // fixed (a bucket is a filesystem concern only; bucket ids never appear in | ||
| // meta-store keys). | ||
| type Layout struct { | ||
| catalogRoot string // meta-store RocksDB dir (a leaf, not a tree root) | ||
| hotRoot string // per-chunk hot RocksDB dirs live directly under here | ||
| ledgersRoot string // {ledgersRoot}/{bucket}/{chunk}.pack | ||
| } | ||
|
|
||
| // NewLayout returns a Layout with every tree defaulting under a single data | ||
| // directory root — the no-override deployment. Equivalent to feeding | ||
| // NewLayoutFromPaths the Paths that Config.ResolvePaths produces when no path | ||
| // override is set. Tests and the default production layout use this. | ||
| func NewLayout(root string) Layout { | ||
| return Layout{ | ||
| catalogRoot: filepath.Join(root, "catalog", "rocksdb"), | ||
| hotRoot: filepath.Join(root, "hot"), | ||
| ledgersRoot: filepath.Join(root, "ledgers"), | ||
| } | ||
| } | ||
|
|
||
| // NewLayoutFromPaths binds a Layout to RESOLVED per-tree roots — the roots | ||
| // Config.ResolvePaths produced (each override applied, each unset tree defaulted | ||
| // under default_data_dir) and that Paths.LockRoots flocked. This is the binding | ||
| // the daemon/audit/recovery use so the lock and the data location can never | ||
| // disagree: every artifact and hot path below honors the same override the | ||
| // flock was taken on. | ||
| func NewLayoutFromPaths(p Paths) Layout { | ||
| return Layout{ | ||
| catalogRoot: p.Catalog, | ||
| hotRoot: p.HotStorage, | ||
| ledgersRoot: p.Ledgers, | ||
| } | ||
| } | ||
|
|
||
| // CatalogPath is the meta-store RocksDB directory. | ||
| func (l Layout) CatalogPath() string { return l.catalogRoot } | ||
|
|
||
| // HotRoot is the directory under which per-chunk hot RocksDB dirs are created. | ||
| func (l Layout) HotRoot() string { return l.hotRoot } | ||
|
|
||
| // HotChunkPath is the per-chunk hot RocksDB directory {hotRoot}/{chunk:08d}/. | ||
| func (l Layout) HotChunkPath(c chunk.ID) string { | ||
| return filepath.Join(l.hotRoot, c.String()) | ||
| } | ||
|
|
||
| // LedgerPackPath is {ledgersRoot}/{bucket:05d}/{chunk:08d}.pack. | ||
| func (l Layout) LedgerPackPath(c chunk.ID) string { | ||
| return filepath.Join(l.ledgersRoot, c.BucketID(), c.String()+".pack") | ||
| } | ||
|
|
||
| // LedgersRoot is the directory under which per-chunk ledger packs are bucketed. | ||
| // A cold ledger ingester rooted here composes the {bucket:05d}/{chunk:08d}.pack | ||
| // path matching LedgerPackPath. | ||
| func (l Layout) LedgersRoot() string { return l.ledgersRoot } | ||
|
|
||
| // ArtifactPaths returns every file a per-chunk artifact kind owns on disk. | ||
| // One path for ledgers. The single place that maps a (chunk, kind) to its | ||
| // files, so the sweep and the freeze writer agree. | ||
| func (l Layout) ArtifactPaths(c chunk.ID, kind Kind) []string { | ||
| switch kind { | ||
| case KindLedgers: | ||
| return []string{l.LedgerPackPath(c)} | ||
| default: | ||
| return nil | ||
| } | ||
| } |
There was a problem hiding this comment.
Design → Directory layout · Filesystem artifacts · bucket id per Geometry
Layout is the inverse of the key schema — the fixed key→path bijection and single source of truth for storage paths, so the flocked roots and the read/write roots can never disagree. The tree (catalog/, hot/{chunk:08d}/, ledgers/{bucket:05d}/{chunk:08d}.pack) matches the design's directory layout; LedgerPackPath derives the bucket via chunk.BucketID() (= chunk / 1000, the geometry layer), and bucket ids deliberately never appear in meta-store keys.
| // Config is the on-disk TOML schema for the full-history streaming daemon — the | ||
| // one --config file (design "Configuration"). Every section maps to a nested | ||
| // struct; optional scalars are pointers so an absent key is distinguishable | ||
| // from an explicit zero and the documented default applies in WithDefaults. | ||
| // | ||
| // The TOML form is the daemon's INPUT; validateConfig turns it (plus the | ||
| // catalog's pins and a network-tip backend) into the resolved StartConfig that | ||
| // startStreaming consumes. The two layout-defining values | ||
| // (chunks_per_txhash_index, earliest_ledger) are pinned immutably on first | ||
| // start and validated against their pins on every restart. | ||
| type Config struct { | ||
| Service ServiceConfig `toml:"service"` | ||
| Backfill BackfillConfig `toml:"backfill"` | ||
| ImmutableStorage ImmutableStorageConfig `toml:"immutable_storage"` | ||
| Catalog CatalogConfig `toml:"catalog"` | ||
| Streaming StreamingConfig `toml:"streaming"` | ||
| Logging LoggingConfig `toml:"logging"` | ||
| } | ||
|
|
||
| // ServiceConfig is [service]. | ||
| type ServiceConfig struct { | ||
| // DefaultDataDir is the base directory for the catalog and the default | ||
| // storage paths. Required. | ||
| DefaultDataDir string `toml:"default_data_dir"` | ||
| } | ||
|
|
||
| // BackfillConfig is [backfill] plus the nested [backfill.bsb]. | ||
| type BackfillConfig struct { | ||
| // Workers is the concurrent task-slot count for bulk catch-up. Default | ||
| // GOMAXPROCS. Must be >= 1. | ||
| Workers *int `toml:"workers"` | ||
|
|
||
| // MaxRetries is per-task retries before the daemon aborts. Default | ||
| // DefaultMaxRetries. Must be >= 0 (0 = run once, no retry). | ||
| MaxRetries *int `toml:"max_retries"` | ||
|
|
||
| // BSB is the Buffered Storage Backend — the default bulk LedgerBackend. | ||
| BSB BSBConfig `toml:"bsb"` | ||
| } | ||
|
|
||
| // BSBConfig is [backfill.bsb] — the Buffered Storage Backend. Required unless | ||
| // another conformant LedgerBackend is wired as the bulk source. | ||
| type BSBConfig struct { | ||
| // BucketPath is the remote object-store path for LedgerCloseMeta (no gs:// | ||
| // prefix for GCS). Required when BSB is the bulk source. | ||
| BucketPath string `toml:"bucket_path"` | ||
|
|
||
| // BufferSize is the prefetch buffer depth per connection. Default | ||
| // DefaultBSBBufferSize. | ||
| BufferSize *int `toml:"buffer_size"` | ||
|
|
||
| // NumWorkers is the download workers per connection. Default | ||
| // DefaultBSBNumWorkers. | ||
| NumWorkers *int `toml:"num_workers"` | ||
| } | ||
|
|
||
| // ImmutableStorageConfig is [immutable_storage.*] — one optional path per | ||
| // artifact tree. An empty path means "default under default_data_dir". | ||
| type ImmutableStorageConfig struct { | ||
| Ledgers StoragePathConfig `toml:"ledgers"` | ||
| } | ||
|
|
||
| // StoragePathConfig is one [immutable_storage.*] / [catalog] / [hot_storage] | ||
| // section: an optional path override. | ||
| type StoragePathConfig struct { | ||
| Path string `toml:"path"` | ||
| } | ||
|
|
||
| // CatalogConfig is [catalog] — optional path override | ||
| // (default {default_data_dir}/catalog/rocksdb). | ||
| type CatalogConfig struct { | ||
| Path string `toml:"path"` | ||
| } | ||
|
|
||
| // StreamingConfig is [streaming] plus the nested [streaming.hot_storage]. | ||
| type StreamingConfig struct { | ||
| // RetentionChunks is the retention window in chunks; 0 = full history. | ||
| // Default 0. | ||
| RetentionChunks *uint32 `toml:"retention_chunks"` | ||
|
|
||
| // EarliestLedger is the earliest ledger this daemon will ever have data | ||
| // for: "genesis", "now", or a chunk-aligned decimal ledger. Default | ||
| // "genesis". Pinned immutably on first start. | ||
| EarliestLedger string `toml:"earliest_ledger"` | ||
|
|
||
| // CaptiveCoreConfig is the path to the CaptiveStellarCore config file. | ||
| // Required. | ||
| CaptiveCoreConfig string `toml:"captive_core_config"` | ||
|
|
||
| // HotStorage is [streaming.hot_storage]. | ||
| HotStorage StoragePathConfig `toml:"hot_storage"` | ||
| } | ||
|
|
||
| // LoggingConfig is [logging]. | ||
| type LoggingConfig struct { | ||
| // Level is debug/info/warn/error. Default "info". | ||
| Level string `toml:"level"` | ||
| // Format is text/json. Default "text". | ||
| Format string `toml:"format"` | ||
| } |
There was a problem hiding this comment.
Design → Configuration
The one --config TOML schema: [service], [backfill(.bsb)], [immutable_storage.*], [catalog], [streaming(.hot_storage)], [logging]. Optional scalars are pointers so an absent key is distinguishable from an explicit zero (defaults applied in WithDefaults). Per the issue, validateConfig (malformed-value rejection + earliest_ledger pin resolution) is intentionally deferred to Layer 4 / #812 because it calls networkTip — this layer ships schema + loader + defaults only. Note ParseConfig (L153) decodes strictly, so a typo in a layout-defining key like earliest_ledger fails loudly instead of silently pinning a default on first start.
| // Single-process enforcement (design "Single-process enforcement"). The daemon | ||
| // holds a kernel flock on a LOCK file under EVERY independently configurable | ||
| // storage root — the catalog, each immutable_storage tree, AND the | ||
| // hot_storage tree. A second daemon that touches any shared root fails fast. | ||
| // | ||
| // Why all roots and not just the catalog: [catalog], each | ||
| // [immutable_storage.*] path, and [streaming.hot_storage] are independently | ||
| // configurable, so two daemons with DIFFERENT catalogs could still share an | ||
| // artifact tree or a hot-DB tree. The hot root matters most — its hot/{chunk} | ||
| // DBs are the only copy of recently-ingested ledgers, independently | ||
| // created/opened/deleted by ingestion and discard, so two daemons sharing it | ||
| // would corrupt or delete that sole copy. | ||
| // | ||
| // A kernel flock is the right primitive: it releases on ANY process exit | ||
| // (including kill -9 / a crash), so a stale lock never strands the next start — | ||
| // nothing on disk to clean up. | ||
|
|
||
| // ErrRootLocked is returned when a LOCK file in a configured root is already | ||
| // held by another process. It wraps the offending root so the daemon can name | ||
| // it in the operator-facing error. | ||
| var ErrRootLocked = errors.New("streaming: storage root is locked by another process") | ||
|
|
||
| // lockFileName is the per-root lock file. Kept distinct from RocksDB's own | ||
| // "LOCK" so the catalog root's flock and RocksDB's internal lock never | ||
| // collide — the catalog root carries both, on different files. | ||
| const lockFileName = "stellar-rpc-fullhistory.lock" | ||
|
|
||
| // RootLocks holds the flock handles for every configured storage root. Release | ||
| // (defer'd by the daemon for the process's whole life) unlocks and closes them | ||
| // all; the kernel also drops them on any process exit. | ||
| type RootLocks struct { | ||
| files []*os.File | ||
| } | ||
|
|
||
| // LockRoots takes a non-blocking exclusive flock on the LOCK file in each | ||
| // distinct root in roots, in the order given. Duplicate paths (e.g. the | ||
| // immutable trees all defaulting under default_data_dir is NOT a duplicate — | ||
| // they are distinct subdirs — but a caller passing the same root twice) are | ||
| // de-duplicated so one root is locked once. On the FIRST root that is already | ||
| // held by another process it releases everything acquired so far and returns | ||
| // ErrRootLocked naming that root — fail fast, leak nothing. | ||
| // | ||
| // Each root directory is created if absent (MkdirAll): a fresh deployment locks | ||
| // before any store opens, and the lock file must have a directory to live in. | ||
| func LockRoots(roots ...string) (*RootLocks, error) { | ||
| locks := &RootLocks{} | ||
| seen := make(map[string]struct{}, len(roots)) | ||
| for _, root := range roots { | ||
| if root == "" { | ||
| continue | ||
| } | ||
| abs, err := filepath.Abs(root) | ||
| if err != nil { | ||
| locks.Release() | ||
| return nil, fmt.Errorf("streaming: resolve lock root %q: %w", root, err) | ||
| } | ||
| if _, dup := seen[abs]; dup { | ||
| continue | ||
| } | ||
| seen[abs] = struct{}{} | ||
|
|
||
| f, err := lockOne(abs) | ||
| if err != nil { | ||
| locks.Release() | ||
| return nil, err | ||
| } | ||
| locks.files = append(locks.files, f) | ||
| } | ||
| return locks, nil | ||
| } | ||
|
|
||
| // lockOne creates root (if absent), opens its LOCK file, and takes a | ||
| // non-blocking exclusive flock. An EWOULDBLOCK means another live process holds | ||
| // it — surfaced as ErrRootLocked, the fail-fast case. Any other error (mkdir, | ||
| // open, a non-contention flock failure) surfaces verbatim. | ||
| func lockOne(root string) (*os.File, error) { | ||
| if err := os.MkdirAll(root, 0o755); err != nil { | ||
| return nil, fmt.Errorf("streaming: create lock root %q: %w", root, err) | ||
| } | ||
| path := filepath.Join(root, lockFileName) | ||
| f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("streaming: open lock file %q: %w", path, err) | ||
| } | ||
| if err := unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil { | ||
| _ = f.Close() | ||
| if errors.Is(err, unix.EWOULDBLOCK) { | ||
| return nil, fmt.Errorf("%w: %q (another daemon is using it)", ErrRootLocked, root) | ||
| } | ||
| return nil, fmt.Errorf("streaming: flock %q: %w", path, err) | ||
| } | ||
| return f, nil | ||
| } |
There was a problem hiding this comment.
Design → Concurrency model → Single-process enforcement
A non-blocking kernel flock per storage root (config.go:235 LockRoots enumerates them: catalog + every immutable_storage tree + hot_storage; the shared data dir itself is deliberately not locked). A second daemon touching any shared root fails fast with ErrRootLocked. flock is the right primitive per the spec because it releases on any process exit (incl. kill -9 / crash), so a stale lock never strands the next start — nothing on disk to clean up.
| // The whole batch shares three barriers: one demote batch, one fsync pass over | ||
| // the affected parent dirs, one key-delete batch — so sweeping many refs at | ||
| // once pays a single round of each. | ||
| func (c *Catalog) SweepChunkArtifacts(refs []ArtifactRef) error { |
There was a problem hiding this comment.
Pruning to be done in the Slice 1: Layer 3 orchestration layer
Replace the per-data-type [immutable_storage.*] overrides with ONE
[immutable_storage] path (the cold-tier root); ledgers/events/txhash are
fixed subdirectories beneath it. The tier boundaries that actually matter
for ops — catalog / cold / hot — stay independently configurable; the
per-cold-type granularity (4 roots, 4 flocks, 4 derivation branches + the
divergence-bug class it created) is dropped. The cold root defaults to the
data dir, so the on-disk layout is unchanged.
- config: ImmutableStorageConfig{Ledgers}→{Path}; Paths.Ledgers→Cold;
ResolvePaths derives Cold (default = data dir); LockRoots locks the cold
root. paths: NewLayoutFromPaths derives ledgersRoot from Cold (NewLayout
unchanged). Slice 3 re-adds an optional txhash_index_path override.
- Add TestArtifactPaths_EveryKindMapped: assert every kind in allKinds has a non-empty Layout.ArtifactPaths mapping. The kind registry (allKinds) and the file mapping (ArtifactPaths) are separate sources of truth; a kind added to allKinds without its path case makes the sweep unlink nothing and delete the key, orphaning the files (the one orphan class the key-driven sweep prevents). This fails at CI on drift instead of orphaning at runtime; it extends automatically as later slices add kinds. - catalog_sweep doc: 'freezing' precedes 'frozen' (freezing→frozen→pruning), so reword 'already past frozen' to 'not in the frozen state' (code unchanged).
Comment-only cleanup of the foundations layer — no code changes, behavior byte-identical. Moderate trim: - Consolidate the one-write protocol, which was re-explained in four files, into its authoritative home (catalog_protocol.go); keys.go/catalog.go now reference it instead of restating it. - Collapse multi-line prose `// ----` section dividers to one-line labels. - Drop the speculative per-layer enumeration in doc.go's package comment. - Cut restatement and over-justification, keeping all crash-safety ordering rationale (catalog_sweep.go left untouched) and design-doc references. Comment lines: 581 -> 497. gofmt clean; go vet and go test -short pass.
The ce07f68 refactor replaced the per-data-type [immutable_storage.*] overrides with a single [immutable_storage] cold-tier root, but several comments still described the old "one configurable path per artifact tree" model. The intent of each comment is unchanged; only the model it describes is corrected. Comment-only — behavior byte-identical. - config.go: ImmutableStorageConfig is one [immutable_storage] cold-tier root, not one path per tree (the contradicting field doc was already correct); StoragePathConfig is now used only by [streaming.hot_storage]. - config_lock.go: the locked roots are catalog / cold-tier / hot, not "each immutable_storage tree". - paths.go: the configurable boundaries are catalog / cold-tier / hot, with the ledgers tree derived under the cold root (no longer independently settable). gofmt clean; go vet and go test -short pass.
Cross-checked Layer 1's "later layer (...)" forward-references against the actual later-layer code (slice1b/1c, streaming-slice1-ledgers). Two were phantom; the rest (processChunk, RunColdChunk, the resolver, etc.) are accurate. - failCommitBatch: fired only from recovery.go — there is no CommitIndex anywhere in the stack. (recovery/CommitIndex) -> (recovery). - commitBatchShouldFail: called only from recovery.go:239; catalog_protocol never calls it. (catalog_protocol/recovery) -> (recovery). Comment-only; nolint directives preserved. gofmt clean; go vet and go test -short pass.
| type crashHooks struct { | ||
| beforeKeyDelete func() | ||
| beforeUnlink func() | ||
| failCommitBatch func() bool //nolint:unused // fired from a later layer (recovery) | ||
| afterMarkFreezing func() //nolint:unused // fired from a later layer (processChunk) | ||
| beforeHotTransient func(chunkID chunk.ID) | ||
| } |
There was a problem hiding this comment.
Design → Substrate assumptions (the crash-safety verification seam)
Test-only fault-injection points — nil and no-op in production — fired from inside the real protocol/sweep methods. They exist because the crash-safety invariants are properties of the step ordering in the actual catalog methods, not of a hand-replayed test (which can stay green after the production order breaks). Each hook observes durable state at one load-bearing instant; see TestCrashSafety_* and TestSweepChunk_NeverUnlinksUnderFrozenKey in streaming_test.go for the assertions they enable.
| // Scans. Every "find work" operation iterates keys via PrefixScan; results | ||
| // are sorted so callers (maxChunk, uniqueness checks) need no second pass. | ||
|
|
||
| // ChunkArtifactKeys returns every per-chunk artifact key (all kinds, all | ||
| // chunks) with its value, sorted by key. This is the deletion/audit surface | ||
| // for chunk:* keys. | ||
| func (c *Catalog) ChunkArtifactKeys() ([]ArtifactRef, error) { | ||
| var refs []ArtifactRef | ||
| for e, err := range c.store.PrefixScan(chunkPrefix) { | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| id, kind, ok := parseChunkKey(e.Key) | ||
| if !ok { | ||
| return nil, fmt.Errorf("streaming: malformed chunk key %q", e.Key) | ||
| } | ||
| refs = append(refs, ArtifactRef{Chunk: id, Kind: kind, State: State(e.Value)}) | ||
| } | ||
| return refs, nil | ||
| } | ||
|
|
||
| // HotChunkKeys returns every hot-DB chunk id (value-blind), sorted ascending. | ||
| // The highest is the live chunk — the ingestion/lifecycle partition boundary. | ||
| func (c *Catalog) HotChunkKeys() ([]chunk.ID, error) { | ||
| return c.hotChunkKeysWith(nil) | ||
| } |
There was a problem hiding this comment.
Design → Data model
The keys-first contract in code: every "find work" scan iterates meta-store keys via PrefixScan — nothing lists a directory. Together with the type doc above ("progress is derived, never stored"), this is the design's pure-catalog rule. ReadyHotChunkKeys vs value-blind HotChunkKeys is what lets recovery demote a hot key to transient without disturbing the watermark.
| return closeErr | ||
| } | ||
|
|
||
| // fsyncDir fsyncs a directory entry, making creations and unlinks within it |
There was a problem hiding this comment.
Design → One write protocol durability · Substrate assumptions
The fsync barriers the protocol and sweeps rely on. barrierNewFile is the exact two-level barrier the spec mandates before a key flips to frozen: file + parent dirent, plus the grandparent dirent when the write created the parent (a new bucket dir every 1000th chunk). fsyncParentDirs is the "unlink durable before the key delete" barrier used by the sweep. These encode the design's substrate assumption that a creation is durable only once both the data and the directory entry naming it are fsynced.
Comment-only (behavior byte-identical; gofmt/vet/-short green). Same moderate trim applied to #805: consolidate the repeated "decision (a)" explanation (single shared DB, one atomic synced WriteBatch, single watermark) into its authoritative home — the pkg/stores/hotchunk package doc — and have process.go / hotsource.go / service.go / driver.go / hot_store.go reference it instead of re-stating it. Collapse prose dividers, cut restatement/over-justification. Kept all load-bearing rationale: loss-vs-staleness, mark-then-write ordering, the borrowed-buffer contract, the shared-vs-standalone close-once ownership model, and the live-chunk RocksDB-LOCK caller contract. (This branch was also rebased onto the updated slice1a, so it inherits the L1 comment polish from #805; the diff vs slice1a is now L2-only.)
|
Superseded by a new 2-phase stacked series that re-slices this work by phase (backfill → live ingestion + lifecycle), with the MVP scope cuts (recovery / audit / convergence / retention-reconfiguration dropped) and the folded-in fixes (cold+hot ingest service +
Each layer builds + |
Slice 1, Layer 1 of 4 — Foundations. First of a layered split of the ledgers-skeleton streaming daemon (the slice was ~13.5k; layered into reviewable concerns). Base:
feature/full-history.The durable-state substrate, with no daemon yet — reviewable in isolation:
flocklocking;ArtifactSet/Kindabstraction;Compiles and passes its
-shorttests on its own. Layers 2–4 (storage → orchestration → daemon) stack on top; events and tx-hash follow as later slices.