Skip to content

Streaming daemon: Slice 1 — Layer 1 (foundations)#805

Closed
chowbao wants to merge 8 commits into
feature/full-historyfrom
slice1a
Closed

Streaming daemon: Slice 1 — Layer 1 (foundations)#805
chowbao wants to merge 8 commits into
feature/full-historyfrom
slice1a

Conversation

@chowbao

@chowbao chowbao commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Slice 1, Layer 1 of 4 — Foundations. First of a layered split of the ledgers-skeleton streaming daemon (the slice was ~13.5k; layered into reviewable concerns). Base: feature/full-history.

The durable-state substrate, with no daemon yet — reviewable in isolation:

  • the catalog (meta-store) key schema + the one-write protocol (mark→fsync→flip) + key-driven sweeps;
  • geometry (keys/paths, chunk↔bucket layout);
  • the config struct + loader + single-process flock locking;
  • the retention gate and the ArtifactSet/Kind abstraction;
  • crash-safety hooks.

Compiles and passes its -short tests on its own. Layers 2–4 (storage → orchestration → daemon) stack on top; events and tx-hash follow as later slices.

@chatgpt-codex-connector

Copy link
Copy Markdown

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.
Credits must be used to enable repository wide code reviews.


const (
// KindLedgers is the ledger pack file (.pack).
KindLedgers Kind = "ledgers"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

events and txhash added in future PRs

Rewrite the package doc File map so it lists only the files this layer
(Slice 1 · Layer 1) actually introduces — keys/paths, catalog +
one-write protocol + sweep, config schema/loader/lock, the
ArtifactSet/Kind abstraction, and the test-seam hooks. The previous map
described the whole finished package (process.go, lifecycle.go,
startup.go, audit.go, ...), none of which exist in this layer.

Drop the reference to design-docs/full-history-implementation-status.md
and prefer 'catalog' over 'meta-store' in the prose (the catalog is the
metastore.Store wrapper). Add a 'Later layers' note pointing at the
storage/orchestration/daemon layers and the events/tx-hash slices.
// {root}/
// ├── catalog/rocksdb/
// ├── hot/{chunk:08d}/
// └── ledgers/{bucket:05d}/{chunk:08d}.pack

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

events and txhash added in future PRs

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest"
)

// ArtifactSet is the subset of per-chunk artifact Kinds a processChunk pass must

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used by resolver/processChunk in future PRs

- config_lock: //nolint:gosec on int(f.Fd()) (a file descriptor always
  fits in int) — line-above to stay within the 120-col lll limit
- artifacts: drop two dead //nolint:gosec directives (gosec no longer
  fires on the uint8 shift); //nolint:unused on ingestConfig (consumed
  by processChunk in a later layer)
- config: //nolint:revive on StreamingConfig (Config names the top-level
  daemon config; sections follow the XxxConfig convention — no rename)
- hooks: //nolint:unused on the forward-declared crash hooks
  (failCommitBatch/afterMarkFreezing + their accessors) fired from
  processChunk / recovery in later layers

@chowbao chowbao left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design-doc → code map (Slice 1 · Layer 1)

Inline comments below anchor each design reference from #809 to the code that implements it, so reviewing this slice against full-history-streaming-workflow.md is a direct comparison. Quick index:

Design section Code
Catalog keys + states keys.go
Data model (keys-first / pure catalog) catalog.go, artifacts.go
One write protocol catalog_protocol.go, catalog_sweep.go, paths.go (fsync barriers)
Directory layout / Filesystem artifacts paths.go
Geometry pkg/chunk (already merged) — consumed in keys.go / paths.go
Configuration config.go
Single-process enforcement config_lock.go
Substrate assumptions (crash-safety) hooks.go + streaming_test.go

Two notes on issue↔PR scope while mapping:

  • Geometry core math (chunkFirstLedger/chunkLastLedger, LedgersPerChunk, the chunk −1 sentinel) lives in the already-merged pkg/chunk, so it isn't in this diff; this layer only consumes it (8-digit padded ids, bucket = chunk / 1000).
  • Retention gate / Reader contract (#reader-contract): the issue lists RetentionGate / ChunkBelowFloor under L1 cross-cutting primitives, but this PR contains only the retention_chunks config field — the gate predicate (retention.go) is grouped under the Orchestration layer in doc.go, i.e. a later layer. Worth confirming whether that's intentional deferral or a scope mismatch in the issue text.

Comment on lines +16 to +40
const (
// StateFreezing — the immutable file is being written. Set BEFORE any I/O
// (the mark-then-write rule), so a crash mid-write is detectable from the
// key alone and every file on disk is reachable from a key.
StateFreezing State = "freezing"
// StateFrozen — the file and its dirent are fsynced and durable. Truth:
// readers and the resolver trust it blindly.
StateFrozen State = "frozen"
// StatePruning — the file is queued for removal; it may or may not still be
// on disk. A sweep finishes the unlink and then deletes the key.
StatePruning State = "pruning"
)

// HotState is a hot-DB key's value. One key per chunk brackets the chunk's
// hot RocksDB directory; the column families inside carry no individual key.
type HotState string

const (
// HotTransient — a directory operation is in flight (creation or
// deletion), or a recovery demoted the key. The recovery is identical
// either way: the open path wipes and recreates, the discard scan re-runs.
HotTransient HotState = "transient"
// HotReady — the dir exists and is usable for reads and writes.
HotReady HotState = "ready"
)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Catalog keys (Data model)

The artifact lifecycle states the spec defines: freezing | frozen | pruning for per-chunk artifacts and transient | ready for the hot DB, plus the Kind registry. Empty State (key absent) is the design's "neither file nor in-progress write exists".

Comment on lines +64 to +139
const (
chunkPrefix = "chunk:"
hotPrefix = "hot:chunk:"

// Config pins.
configEarliestLedger = "config:earliest_ledger"
)

// chunkKey returns the per-chunk artifact key chunk:{chunk:08d}:{kind}.
func chunkKey(c chunk.ID, kind Kind) string {
return chunkPrefix + c.String() + ":" + string(kind)
}

// hotChunkKey returns the hot-DB key hot:chunk:{chunk:08d}.
func hotChunkKey(c chunk.ID) string {
return hotPrefix + c.String()
}

// ---------------------------------------------------------------------------
// Key parsing. The inverse of the constructors above; every parser is the
// reverse bijection of exactly one constructor.
// ---------------------------------------------------------------------------

// parseChunkKey decodes chunk:{chunk:08d}:{kind}. ok is false for any key that
// is not a well-formed per-chunk artifact key.
func parseChunkKey(key string) (chunk.ID, Kind, bool) {
rest, found := strings.CutPrefix(key, chunkPrefix)
if !found {
return 0, "", false
}
id, suffix, found := strings.Cut(rest, ":")
if !found {
return 0, "", false
}
n, err := parsePadded(id)
if err != nil {
return 0, "", false
}
kind := Kind(suffix)
if !isKnownKind(kind) {
return 0, "", false
}
return chunk.ID(n), kind, true
}

// parseHotChunkKey decodes hot:chunk:{chunk:08d}.
func parseHotChunkKey(key string) (chunk.ID, bool) {
rest, found := strings.CutPrefix(key, hotPrefix)
if !found {
return 0, false
}
n, err := parsePadded(rest)
if err != nil {
return 0, false
}
return chunk.ID(n), true
}

// parsePadded parses an 8-digit zero-padded decimal segment as produced by
// chunk.ID.String(). It enforces the fixed 8-char width so the bijection is
// exact — a non-padded or wrong-width segment is rejected, not silently
// accepted.
func parsePadded(s string) (uint32, error) {
if len(s) != 8 {
return 0, fmt.Errorf("streaming: %q is not an 8-digit padded id", s)
}
n, err := strconv.ParseUint(s, 10, 32)
if err != nil {
return 0, fmt.Errorf("streaming: %q is not numeric: %w", s, err)
}
return uint32(n), nil
}

func isKnownKind(k Kind) bool {
return slices.Contains(allKinds, k)
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Catalog keys · padding form per Geometry

The key families from the spec — chunk:{c:08d}:{kind}, hot:chunk:{c:08d}, config:* pins — with every key built and parsed in exactly one place (the single source of truth for the key↔path bijection). parsePadded enforces the fixed 8-digit zero-padded width that chunk.ID.String() (geometry, in pkg/chunk) produces, so lexical order == numeric order and the bijection is exact rather than lenient.

Comment on lines +9 to +47
// ArtifactSet is the subset of per-chunk artifact Kinds a processChunk pass must
// produce (design-docs rule 2). It is a small immutable set over the per-chunk
// kinds (currently just ledgers); the resolver builds it from the catalog
// difference and processChunk narrows it further by dropping already-frozen
// kinds (rule 1's per-kind idempotency).
//
// The representation is a fixed-width bitmask over allKinds' canonical order, so
// Kinds() yields kinds in that order (the same order buildColdIngesters uses)
// and membership tests are allocation-free.
type ArtifactSet struct {
mask uint8
}

// kindBit maps a Kind to its bit in ArtifactSet.mask via its index in allKinds.
// An unknown kind returns (0,false) so callers never set a phantom bit.
func kindBit(k Kind) (uint8, bool) {
for i, kk := range allKinds {
if kk == k {
return uint8(1) << i, true //nolint:gosec // len(allKinds) small, no overflow
}
}
return 0, false
}

// NewArtifactSet builds a set from the given kinds. Unknown kinds are ignored
// (the kind registry in keys.go is the authority); duplicates are idempotent.
func NewArtifactSet(kinds ...Kind) ArtifactSet {
var s ArtifactSet
for _, k := range kinds {
if bit, ok := kindBit(k); ok {
s.mask |= bit
}
}
return s
}

// AllArtifacts is the full set (currently just ledgers) — what a from-scratch
// chunk freeze requests before per-kind idempotency narrows it.
func AllArtifacts() ArtifactSet { return NewArtifactSet(allKinds...) }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → one-write protocol, "produce the full artifact set" (rule 2)

The ArtifactSet / Kind abstraction the issue lists under cross-cutting primitives — a fixed-width bitmask over allKinds. The resolver / processChunk (later layers) build it from the catalog difference and narrow it by per-kind idempotency. Today only KindLedgers is wired; events / txhash arrive in Slices 2–3, so ingestConfig below is intentionally forward-looking.

Comment on lines +10 to +54
// The one write protocol — mark-then-write. Every durable per-chunk artifact
// flows through here:
//
// 1. Put the key "freezing" via metastore BEFORE any I/O.
// 2. The caller writes the file.
// 3. The caller fsyncs the FILE + its PARENT dirent (+ the GRANDPARENT dirent
// when the parent dir was just created) — barrierNewFile in paths.go.
// 4. Flip to "frozen": a single Put for per-chunk artifacts.
//
// The pre-mark gives "every file on disk has its meta key"; the dirent
// barriers guarantee the key never outlives the file's creation; the frozen
// flip is the only transition readers trust. The catalog owns steps 1 and 4
// (meta-store writes); the caller owns steps 2 and 3 (I/O), calling
// MarkChunkFreezing before and FlipChunkFrozen after.

// MarkChunkFreezing puts every requested kind's key to "freezing" in one
// atomic synced batch, BEFORE any file I/O. Re-marking a "freezing"/"pruning"/
// absent key is the idempotent re-materialization entry; a "frozen" kind is
// the caller's to skip (rule 1's per-kind idempotency), not this helper's.
func (c *Catalog) MarkChunkFreezing(chunkID chunk.ID, kinds ...Kind) error {
if len(kinds) == 0 {
return errors.New("streaming: MarkChunkFreezing requires at least one kind")
}
return c.store.Batch(func(w *metastore.BatchWriter) error {
for _, kind := range kinds {
w.Put(chunkKey(chunkID, kind), string(StateFreezing))
}
return nil
})
}

// FlipChunkFrozen flips every requested kind's key to "frozen" in one atomic
// synced batch. The caller MUST have completed barrierNewFile for every file
// first — "frozen" means durable and complete, trusted blindly downstream.
func (c *Catalog) FlipChunkFrozen(chunkID chunk.ID, kinds ...Kind) error {
if len(kinds) == 0 {
return errors.New("streaming: FlipChunkFrozen requires at least one kind")
}
return c.store.Batch(func(w *metastore.BatchWriter) error {
for _, kind := range kinds {
w.Put(chunkKey(chunkID, kind), string(StateFrozen))
}
return nil
})
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → One write protocol

Mark-then-write, exactly as specced: MarkChunkFreezing puts "freezing" before any file I/O (so every file on disk is reachable from a key and a crash mid-write is detectable from the key alone), the caller writes + fsyncs, then FlipChunkFrozen flips to "frozen" — the only state readers/resolver trust. Multi-kind transitions commit in one atomic synced metastore batch. The hot-DB transient → ready bracket below is the same two ideas applied to a directory.

Comment on lines +7 to +79
// Key-driven sweep — the ONLY deletion body in the system. Its ordering is
// load-bearing:
//
// demote-if-still-"frozen" (never unlink under a frozen key)
// -> unlink file(s)
// -> fsyncDir(parent) (the unlink becomes durable BEFORE the key goes)
// -> delete key (batched)
//
// This gives the exit-side invariant "key absent => file gone": because the
// key outlives the durable unlink, a crash anywhere leaves the key in place
// and the sweep re-runs. Deleting the key first would, on a crash, leave a
// file with no key — the one orphan class this design cannot find.

// SweepChunkArtifacts deletes the files for a batch of per-chunk artifact refs
// and removes their keys. Refs already past "frozen" (i.e. "freezing" or
// "pruning") are unlinked directly; a still-"frozen" ref is demoted to
// "pruning" first, in one atomic batch, so no unlink ever happens under a
// frozen key.
//
// The whole batch shares three barriers: one demote batch, one fsync pass over
// the affected parent dirs, one key-delete batch — so sweeping many refs at
// once pays a single round of each.
func (c *Catalog) SweepChunkArtifacts(refs []ArtifactRef) error {
if len(refs) == 0 {
return nil
}

// Demote first — never unlink under a "frozen" key. A crash after this
// batch but before the unlinks leaves "pruning" keys the next sweep
// finishes.
if err := c.store.Batch(func(w *metastore.BatchWriter) error {
for _, ref := range refs {
if ref.State == StateFrozen {
w.Put(ref.Key(), string(StatePruning))
}
}
return nil
}); err != nil {
return err
}

// Between the demote and the unlink: every "frozen" ref must now read
// "pruning". Dropping the demote above would leave it "frozen" here.
c.hooks.fireBeforeUnlink()

// Unlink every file (idempotent on already-gone paths), collecting parents
// for the durability barrier.
var paths []string
for _, ref := range refs {
for _, p := range c.layout.ArtifactPaths(ref.Chunk, ref.Kind) {
if err := deleteFileIfExists(p); err != nil {
return err
}
paths = append(paths, p)
}
}
if err := fsyncParentDirs(paths); err != nil { // unlinks durable BEFORE keys
return err
}

// Between the durable unlink and the key delete: the files are gone but the
// keys still exist. Reordering the delete ahead of the unlink would leave a
// file present here under no key — the one orphan class this order forbids.
c.hooks.fireBeforeKeyDelete()

// Delete the keys — only now that the unlinks are durable.
return c.store.Batch(func(w *metastore.BatchWriter) error {
for _, ref := range refs {
w.Delete(ref.Key())
}
return nil
})
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → One write protocol (deletion = the reverse)

The single deletion body in the system. The order is load-bearing: demote frozen → pruning → unlink → fsyncDir(parent) → delete key, so the key always outlives the durable unlink ⟹ the spec's exit invariant key absent ⟹ file gone. Deleting the key before the unlink is durable would, on a crash, orphan a file under no key — the one orphan class this design cannot find. The three shared barriers (one demote batch / one fsync pass / one delete batch) make a multi-ref sweep pay each cost once.

Comment on lines +9 to +92

// Layout resolves meta-store keys to on-disk paths. It holds one root PER
// artifact tree — the key<->path mapping is fixed
// (design-docs/full-history-streaming-workflow.md "Directory layout"), so a
// Layout plus a key is enough to find any file without listing a directory.
//
// In the default deployment all roots sit under one data dir (NewLayout):
//
// {root}/
// ├── catalog/rocksdb/
// ├── hot/{chunk:08d}/
// └── ledgers/{bucket:05d}/{chunk:08d}.pack
//
// But each tree's root is independently settable (NewLayoutFromPaths) so an
// operator's [catalog]/[immutable_storage.*]/[streaming.hot_storage] path
// overrides are honored — Layout is the SINGLE source of truth for storage
// paths, and the same roots that get flocked (Paths.LockRoots) are the ones the
// data path reads/writes. Below each per-tree root the bucket structure is
// fixed (a bucket is a filesystem concern only; bucket ids never appear in
// meta-store keys).
type Layout struct {
catalogRoot string // meta-store RocksDB dir (a leaf, not a tree root)
hotRoot string // per-chunk hot RocksDB dirs live directly under here
ledgersRoot string // {ledgersRoot}/{bucket}/{chunk}.pack
}

// NewLayout returns a Layout with every tree defaulting under a single data
// directory root — the no-override deployment. Equivalent to feeding
// NewLayoutFromPaths the Paths that Config.ResolvePaths produces when no path
// override is set. Tests and the default production layout use this.
func NewLayout(root string) Layout {
return Layout{
catalogRoot: filepath.Join(root, "catalog", "rocksdb"),
hotRoot: filepath.Join(root, "hot"),
ledgersRoot: filepath.Join(root, "ledgers"),
}
}

// NewLayoutFromPaths binds a Layout to RESOLVED per-tree roots — the roots
// Config.ResolvePaths produced (each override applied, each unset tree defaulted
// under default_data_dir) and that Paths.LockRoots flocked. This is the binding
// the daemon/audit/recovery use so the lock and the data location can never
// disagree: every artifact and hot path below honors the same override the
// flock was taken on.
func NewLayoutFromPaths(p Paths) Layout {
return Layout{
catalogRoot: p.Catalog,
hotRoot: p.HotStorage,
ledgersRoot: p.Ledgers,
}
}

// CatalogPath is the meta-store RocksDB directory.
func (l Layout) CatalogPath() string { return l.catalogRoot }

// HotRoot is the directory under which per-chunk hot RocksDB dirs are created.
func (l Layout) HotRoot() string { return l.hotRoot }

// HotChunkPath is the per-chunk hot RocksDB directory {hotRoot}/{chunk:08d}/.
func (l Layout) HotChunkPath(c chunk.ID) string {
return filepath.Join(l.hotRoot, c.String())
}

// LedgerPackPath is {ledgersRoot}/{bucket:05d}/{chunk:08d}.pack.
func (l Layout) LedgerPackPath(c chunk.ID) string {
return filepath.Join(l.ledgersRoot, c.BucketID(), c.String()+".pack")
}

// LedgersRoot is the directory under which per-chunk ledger packs are bucketed.
// A cold ledger ingester rooted here composes the {bucket:05d}/{chunk:08d}.pack
// path matching LedgerPackPath.
func (l Layout) LedgersRoot() string { return l.ledgersRoot }

// ArtifactPaths returns every file a per-chunk artifact kind owns on disk.
// One path for ledgers. The single place that maps a (chunk, kind) to its
// files, so the sweep and the freeze writer agree.
func (l Layout) ArtifactPaths(c chunk.ID, kind Kind) []string {
switch kind {
case KindLedgers:
return []string{l.LedgerPackPath(c)}
default:
return nil
}
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Directory layout · Filesystem artifacts · bucket id per Geometry

Layout is the inverse of the key schema — the fixed key→path bijection and single source of truth for storage paths, so the flocked roots and the read/write roots can never disagree. The tree (catalog/, hot/{chunk:08d}/, ledgers/{bucket:05d}/{chunk:08d}.pack) matches the design's directory layout; LedgerPackPath derives the bucket via chunk.BucketID() (= chunk / 1000, the geometry layer), and bucket ids deliberately never appear in meta-store keys.

Comment on lines +13 to +112
// Config is the on-disk TOML schema for the full-history streaming daemon — the
// one --config file (design "Configuration"). Every section maps to a nested
// struct; optional scalars are pointers so an absent key is distinguishable
// from an explicit zero and the documented default applies in WithDefaults.
//
// The TOML form is the daemon's INPUT; validateConfig turns it (plus the
// catalog's pins and a network-tip backend) into the resolved StartConfig that
// startStreaming consumes. The two layout-defining values
// (chunks_per_txhash_index, earliest_ledger) are pinned immutably on first
// start and validated against their pins on every restart.
type Config struct {
Service ServiceConfig `toml:"service"`
Backfill BackfillConfig `toml:"backfill"`
ImmutableStorage ImmutableStorageConfig `toml:"immutable_storage"`
Catalog CatalogConfig `toml:"catalog"`
Streaming StreamingConfig `toml:"streaming"`
Logging LoggingConfig `toml:"logging"`
}

// ServiceConfig is [service].
type ServiceConfig struct {
// DefaultDataDir is the base directory for the catalog and the default
// storage paths. Required.
DefaultDataDir string `toml:"default_data_dir"`
}

// BackfillConfig is [backfill] plus the nested [backfill.bsb].
type BackfillConfig struct {
// Workers is the concurrent task-slot count for bulk catch-up. Default
// GOMAXPROCS. Must be >= 1.
Workers *int `toml:"workers"`

// MaxRetries is per-task retries before the daemon aborts. Default
// DefaultMaxRetries. Must be >= 0 (0 = run once, no retry).
MaxRetries *int `toml:"max_retries"`

// BSB is the Buffered Storage Backend — the default bulk LedgerBackend.
BSB BSBConfig `toml:"bsb"`
}

// BSBConfig is [backfill.bsb] — the Buffered Storage Backend. Required unless
// another conformant LedgerBackend is wired as the bulk source.
type BSBConfig struct {
// BucketPath is the remote object-store path for LedgerCloseMeta (no gs://
// prefix for GCS). Required when BSB is the bulk source.
BucketPath string `toml:"bucket_path"`

// BufferSize is the prefetch buffer depth per connection. Default
// DefaultBSBBufferSize.
BufferSize *int `toml:"buffer_size"`

// NumWorkers is the download workers per connection. Default
// DefaultBSBNumWorkers.
NumWorkers *int `toml:"num_workers"`
}

// ImmutableStorageConfig is [immutable_storage.*] — one optional path per
// artifact tree. An empty path means "default under default_data_dir".
type ImmutableStorageConfig struct {
Ledgers StoragePathConfig `toml:"ledgers"`
}

// StoragePathConfig is one [immutable_storage.*] / [catalog] / [hot_storage]
// section: an optional path override.
type StoragePathConfig struct {
Path string `toml:"path"`
}

// CatalogConfig is [catalog] — optional path override
// (default {default_data_dir}/catalog/rocksdb).
type CatalogConfig struct {
Path string `toml:"path"`
}

// StreamingConfig is [streaming] plus the nested [streaming.hot_storage].
type StreamingConfig struct {
// RetentionChunks is the retention window in chunks; 0 = full history.
// Default 0.
RetentionChunks *uint32 `toml:"retention_chunks"`

// EarliestLedger is the earliest ledger this daemon will ever have data
// for: "genesis", "now", or a chunk-aligned decimal ledger. Default
// "genesis". Pinned immutably on first start.
EarliestLedger string `toml:"earliest_ledger"`

// CaptiveCoreConfig is the path to the CaptiveStellarCore config file.
// Required.
CaptiveCoreConfig string `toml:"captive_core_config"`

// HotStorage is [streaming.hot_storage].
HotStorage StoragePathConfig `toml:"hot_storage"`
}

// LoggingConfig is [logging].
type LoggingConfig struct {
// Level is debug/info/warn/error. Default "info".
Level string `toml:"level"`
// Format is text/json. Default "text".
Format string `toml:"format"`
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Configuration

The one --config TOML schema: [service], [backfill(.bsb)], [immutable_storage.*], [catalog], [streaming(.hot_storage)], [logging]. Optional scalars are pointers so an absent key is distinguishable from an explicit zero (defaults applied in WithDefaults). Per the issue, validateConfig (malformed-value rejection + earliest_ledger pin resolution) is intentionally deferred to Layer 4 / #812 because it calls networkTip — this layer ships schema + loader + defaults only. Note ParseConfig (L153) decodes strictly, so a typo in a layout-defining key like earliest_ledger fails loudly instead of silently pinning a default on first start.

Comment on lines +12 to +104
// Single-process enforcement (design "Single-process enforcement"). The daemon
// holds a kernel flock on a LOCK file under EVERY independently configurable
// storage root — the catalog, each immutable_storage tree, AND the
// hot_storage tree. A second daemon that touches any shared root fails fast.
//
// Why all roots and not just the catalog: [catalog], each
// [immutable_storage.*] path, and [streaming.hot_storage] are independently
// configurable, so two daemons with DIFFERENT catalogs could still share an
// artifact tree or a hot-DB tree. The hot root matters most — its hot/{chunk}
// DBs are the only copy of recently-ingested ledgers, independently
// created/opened/deleted by ingestion and discard, so two daemons sharing it
// would corrupt or delete that sole copy.
//
// A kernel flock is the right primitive: it releases on ANY process exit
// (including kill -9 / a crash), so a stale lock never strands the next start —
// nothing on disk to clean up.

// ErrRootLocked is returned when a LOCK file in a configured root is already
// held by another process. It wraps the offending root so the daemon can name
// it in the operator-facing error.
var ErrRootLocked = errors.New("streaming: storage root is locked by another process")

// lockFileName is the per-root lock file. Kept distinct from RocksDB's own
// "LOCK" so the catalog root's flock and RocksDB's internal lock never
// collide — the catalog root carries both, on different files.
const lockFileName = "stellar-rpc-fullhistory.lock"

// RootLocks holds the flock handles for every configured storage root. Release
// (defer'd by the daemon for the process's whole life) unlocks and closes them
// all; the kernel also drops them on any process exit.
type RootLocks struct {
files []*os.File
}

// LockRoots takes a non-blocking exclusive flock on the LOCK file in each
// distinct root in roots, in the order given. Duplicate paths (e.g. the
// immutable trees all defaulting under default_data_dir is NOT a duplicate —
// they are distinct subdirs — but a caller passing the same root twice) are
// de-duplicated so one root is locked once. On the FIRST root that is already
// held by another process it releases everything acquired so far and returns
// ErrRootLocked naming that root — fail fast, leak nothing.
//
// Each root directory is created if absent (MkdirAll): a fresh deployment locks
// before any store opens, and the lock file must have a directory to live in.
func LockRoots(roots ...string) (*RootLocks, error) {
locks := &RootLocks{}
seen := make(map[string]struct{}, len(roots))
for _, root := range roots {
if root == "" {
continue
}
abs, err := filepath.Abs(root)
if err != nil {
locks.Release()
return nil, fmt.Errorf("streaming: resolve lock root %q: %w", root, err)
}
if _, dup := seen[abs]; dup {
continue
}
seen[abs] = struct{}{}

f, err := lockOne(abs)
if err != nil {
locks.Release()
return nil, err
}
locks.files = append(locks.files, f)
}
return locks, nil
}

// lockOne creates root (if absent), opens its LOCK file, and takes a
// non-blocking exclusive flock. An EWOULDBLOCK means another live process holds
// it — surfaced as ErrRootLocked, the fail-fast case. Any other error (mkdir,
// open, a non-contention flock failure) surfaces verbatim.
func lockOne(root string) (*os.File, error) {
if err := os.MkdirAll(root, 0o755); err != nil {
return nil, fmt.Errorf("streaming: create lock root %q: %w", root, err)
}
path := filepath.Join(root, lockFileName)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644)
if err != nil {
return nil, fmt.Errorf("streaming: open lock file %q: %w", path, err)
}
if err := unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
_ = f.Close()
if errors.Is(err, unix.EWOULDBLOCK) {
return nil, fmt.Errorf("%w: %q (another daemon is using it)", ErrRootLocked, root)
}
return nil, fmt.Errorf("streaming: flock %q: %w", path, err)
}
return f, nil
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Concurrency model → Single-process enforcement

A non-blocking kernel flock per storage root (config.go:235 LockRoots enumerates them: catalog + every immutable_storage tree + hot_storage; the shared data dir itself is deliberately not locked). A second daemon touching any shared root fails fast with ErrRootLocked. flock is the right primitive per the spec because it releases on any process exit (incl. kill -9 / crash), so a stale lock never strands the next start — nothing on disk to clean up.

// The whole batch shares three barriers: one demote batch, one fsync pass over
// the affected parent dirs, one key-delete batch — so sweeping many refs at
// once pays a single round of each.
func (c *Catalog) SweepChunkArtifacts(refs []ArtifactRef) error {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pruning to be done in the Slice 1: Layer 3 orchestration layer

chowbao added 5 commits June 23, 2026 23:15
Replace the per-data-type [immutable_storage.*] overrides with ONE
[immutable_storage] path (the cold-tier root); ledgers/events/txhash are
fixed subdirectories beneath it. The tier boundaries that actually matter
for ops — catalog / cold / hot — stay independently configurable; the
per-cold-type granularity (4 roots, 4 flocks, 4 derivation branches + the
divergence-bug class it created) is dropped. The cold root defaults to the
data dir, so the on-disk layout is unchanged.

- config: ImmutableStorageConfig{Ledgers}→{Path}; Paths.Ledgers→Cold;
  ResolvePaths derives Cold (default = data dir); LockRoots locks the cold
  root. paths: NewLayoutFromPaths derives ledgersRoot from Cold (NewLayout
  unchanged). Slice 3 re-adds an optional txhash_index_path override.
- Add TestArtifactPaths_EveryKindMapped: assert every kind in allKinds has a
  non-empty Layout.ArtifactPaths mapping. The kind registry (allKinds) and the
  file mapping (ArtifactPaths) are separate sources of truth; a kind added to
  allKinds without its path case makes the sweep unlink nothing and delete the
  key, orphaning the files (the one orphan class the key-driven sweep prevents).
  This fails at CI on drift instead of orphaning at runtime; it extends
  automatically as later slices add kinds.
- catalog_sweep doc: 'freezing' precedes 'frozen' (freezing→frozen→pruning), so
  reword 'already past frozen' to 'not in the frozen state' (code unchanged).
Comment-only cleanup of the foundations layer — no code changes, behavior
byte-identical. Moderate trim:

- Consolidate the one-write protocol, which was re-explained in four files,
  into its authoritative home (catalog_protocol.go); keys.go/catalog.go now
  reference it instead of restating it.
- Collapse multi-line prose `// ----` section dividers to one-line labels.
- Drop the speculative per-layer enumeration in doc.go's package comment.
- Cut restatement and over-justification, keeping all crash-safety ordering
  rationale (catalog_sweep.go left untouched) and design-doc references.

Comment lines: 581 -> 497. gofmt clean; go vet and go test -short pass.
The ce07f68 refactor replaced the per-data-type [immutable_storage.*]
overrides with a single [immutable_storage] cold-tier root, but several
comments still described the old "one configurable path per artifact tree"
model. The intent of each comment is unchanged; only the model it describes
is corrected. Comment-only — behavior byte-identical.

- config.go: ImmutableStorageConfig is one [immutable_storage] cold-tier root,
  not one path per tree (the contradicting field doc was already correct);
  StoragePathConfig is now used only by [streaming.hot_storage].
- config_lock.go: the locked roots are catalog / cold-tier / hot, not "each
  immutable_storage tree".
- paths.go: the configurable boundaries are catalog / cold-tier / hot, with the
  ledgers tree derived under the cold root (no longer independently settable).

gofmt clean; go vet and go test -short pass.
Cross-checked Layer 1's "later layer (...)" forward-references against the
actual later-layer code (slice1b/1c, streaming-slice1-ledgers). Two were
phantom; the rest (processChunk, RunColdChunk, the resolver, etc.) are accurate.

- failCommitBatch: fired only from recovery.go — there is no CommitIndex
  anywhere in the stack. (recovery/CommitIndex) -> (recovery).
- commitBatchShouldFail: called only from recovery.go:239; catalog_protocol
  never calls it. (catalog_protocol/recovery) -> (recovery).

Comment-only; nolint directives preserved. gofmt clean; go vet and
go test -short pass.
Comment on lines +33 to +39
type crashHooks struct {
beforeKeyDelete func()
beforeUnlink func()
failCommitBatch func() bool //nolint:unused // fired from a later layer (recovery)
afterMarkFreezing func() //nolint:unused // fired from a later layer (processChunk)
beforeHotTransient func(chunkID chunk.ID)
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Substrate assumptions (the crash-safety verification seam)

Test-only fault-injection points — nil and no-op in production — fired from inside the real protocol/sweep methods. They exist because the crash-safety invariants are properties of the step ordering in the actual catalog methods, not of a hand-replayed test (which can stay green after the production order breaks). Each hook observes durable state at one load-bearing instant; see TestCrashSafety_* and TestSweepChunk_NeverUnlinksUnderFrozenKey in streaming_test.go for the assertions they enable.

Comment on lines +87 to +112
// Scans. Every "find work" operation iterates keys via PrefixScan; results
// are sorted so callers (maxChunk, uniqueness checks) need no second pass.

// ChunkArtifactKeys returns every per-chunk artifact key (all kinds, all
// chunks) with its value, sorted by key. This is the deletion/audit surface
// for chunk:* keys.
func (c *Catalog) ChunkArtifactKeys() ([]ArtifactRef, error) {
var refs []ArtifactRef
for e, err := range c.store.PrefixScan(chunkPrefix) {
if err != nil {
return nil, err
}
id, kind, ok := parseChunkKey(e.Key)
if !ok {
return nil, fmt.Errorf("streaming: malformed chunk key %q", e.Key)
}
refs = append(refs, ArtifactRef{Chunk: id, Kind: kind, State: State(e.Value)})
}
return refs, nil
}

// HotChunkKeys returns every hot-DB chunk id (value-blind), sorted ascending.
// The highest is the live chunk — the ingestion/lifecycle partition boundary.
func (c *Catalog) HotChunkKeys() ([]chunk.ID, error) {
return c.hotChunkKeysWith(nil)
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → Data model

The keys-first contract in code: every "find work" scan iterates meta-store keys via PrefixScannothing lists a directory. Together with the type doc above ("progress is derived, never stored"), this is the design's pure-catalog rule. ReadyHotChunkKeys vs value-blind HotChunkKeys is what lets recovery demote a hot key to transient without disturbing the watermark.

return closeErr
}

// fsyncDir fsyncs a directory entry, making creations and unlinks within it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design → One write protocol durability · Substrate assumptions

The fsync barriers the protocol and sweeps rely on. barrierNewFile is the exact two-level barrier the spec mandates before a key flips to frozen: file + parent dirent, plus the grandparent dirent when the write created the parent (a new bucket dir every 1000th chunk). fsyncParentDirs is the "unlink durable before the key delete" barrier used by the sweep. These encode the design's substrate assumption that a creation is durable only once both the data and the directory entry naming it are fsynced.

chowbao added a commit that referenced this pull request Jun 24, 2026
Comment-only (behavior byte-identical; gofmt/vet/-short green). Same moderate
trim applied to #805: consolidate the repeated "decision (a)" explanation
(single shared DB, one atomic synced WriteBatch, single watermark) into its
authoritative home — the pkg/stores/hotchunk package doc — and have process.go /
hotsource.go / service.go / driver.go / hot_store.go reference it instead of
re-stating it. Collapse prose dividers, cut restatement/over-justification.
Kept all load-bearing rationale: loss-vs-staleness, mark-then-write ordering,
the borrowed-buffer contract, the shared-vs-standalone close-once ownership
model, and the live-chunk RocksDB-LOCK caller contract.

(This branch was also rebased onto the updated slice1a, so it inherits the L1
comment polish from #805; the diff vs slice1a is now L2-only.)
@chowbao

chowbao commented Jun 24, 2026

Copy link
Copy Markdown
Contributor Author

Superseded by a new 2-phase stacked series that re-slices this work by phase (backfill → live ingestion + lifecycle), with the MVP scope cuts (recovery / audit / convergence / retention-reconfiguration dropped) and the folded-in fixes (cold+hot ingest service + NewPrometheusSink metrics wiring, exponential withRetries backoff, deletion of the dead RunHot/RunCold stream-drain orchestration):

Each layer builds + go vet + go test -short green; the capstone (#821) also passes the lifecycle E2E. Leaving this open for now — can be closed once the new stack is reviewed.

@chowbao chowbao closed this Jun 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant