From a28c963365e30b51d7c67db7b4e0937472dc51e7 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 24 Jun 2026 16:50:07 -0400 Subject: [PATCH 1/4] =?UTF-8?q?streaming(fullhistory):=20Phase=202=20layer?= =?UTF-8?q?=202=20=E2=80=94=20live=20ingestion=20+=20daemon=20wiring=20(cl?= =?UTF-8?q?oses=20#816,=20closes=20#808)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the hot tier into the daemon: the full startStreaming (open the resume hot DB, launch the lifecycle goroutine, run the live ingestion loop following captive core), the production boundaries (captive-core opener, bulk-backend tip adapter), the lifecycle-config/HotProbe assembly, and the hot watermark refinement (startup passes the real probe so resume re-derives from the last synced batch). Completes the full-history streaming daemon — backfill + live ingest + freeze/discard/prune. Verified by the lifecycle E2E (first-start -> ingest -> freeze -> lookup -> restart-resume re-derivation -> prune). Closes #816. Closes #808. --- .../internal/fullhistory/streaming/daemon.go | 128 +++- .../fullhistory/streaming/daemon_test.go | 116 ++-- .../fullhistory/streaming/e2e_test.go | 636 ++++++++++++++++++ .../streaming/observability_test.go | 2 +- .../internal/fullhistory/streaming/startup.go | 202 ++++-- .../fullhistory/streaming/startup_test.go | 148 ++-- 6 files changed, 1082 insertions(+), 150 deletions(-) create mode 100644 cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go index ef68c092d..169a11d7b 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go @@ -11,6 +11,7 @@ import ( "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" supportlog "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest" @@ -97,6 +98,10 @@ type Boundaries struct { // deployment that never backfills. Backend ingest.ChunkSource + // Core starts captive core at the resume ledger and yields the live getter + // the ingestion loop polls. Required. + Core CoreOpener + // ServeReads launches the RPC read server (it must return promptly, not block // until shutdown). Required. // @@ -112,6 +117,9 @@ func (b Boundaries) validate() error { if b.NetworkTip == nil { return errors.New("streaming: Boundaries.NetworkTip is nil") } + if b.Core == nil { + return errors.New("streaming: Boundaries.Core is nil") + } if b.ServeReads == nil { return errors.New("streaming: Boundaries.ServeReads is nil") } @@ -207,7 +215,10 @@ func RunDaemonWith(ctx context.Context, configPath string, opts DaemonOptions) e } // startConfig threads the loaded Config, the bound catalog/logger, and the -// assembled boundaries into the StartConfig startStreaming consumes. +// assembled boundaries into the StartConfig startStreaming consumes. The Exec +// and Lifecycle bundles share ONE catalog, worker pool, and retention floor (the +// design's "catch-up and the lifecycle goroutine share one set of +// postconditions"), so Lifecycle embeds the same ExecConfig. func startConfig( cfg Config, cat *catalog.Catalog, logger *supportlog.Entry, b Boundaries, metrics Metrics, sink ingest.MetricSink, tipBackoff time.Duration, tipMaxAttempts int, @@ -225,13 +236,18 @@ func startConfig( HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, logger), }, } - return StartConfig{ - Exec: exec, + life := LifecycleConfig{ + ExecConfig: exec, RetentionChunks: derefU32(cfg.Retention.RetentionChunks), - NetworkTip: b.NetworkTip, - ServeReads: b.ServeReads, - TipBackoff: tipBackoff, - TipMaxAttempts: tipMaxAttempts, + } + return StartConfig{ + Exec: exec, + Lifecycle: life, + NetworkTip: b.NetworkTip, + Core: b.Core, + ServeReads: b.ServeReads, + TipBackoff: tipBackoff, + TipMaxAttempts: tipMaxAttempts, } } @@ -239,8 +255,10 @@ func startConfig( // restarts it on a restartable error after a backoff ("startup is the recovery // path"). A clean shutdown or a ctx cancel during the backoff returns nil. // -// It does NOT swallow the fatal sentinel ErrFirstStartNoTip — that surfaces UP, -// since the retry is only for transient failures a fresh start converges. +// It does NOT swallow the fatal sentinels (ErrHotVolumeLost, ErrFirstStartNoTip): +// those are returned UP so an operator/supervisor sees them. The retry here is +// for transient restartable failures (a backfill/ingest hiccup, a captive core +// crash) where a fresh start converges; the unrecoverable ones surface. func superviseStreaming( ctx context.Context, start StartConfig, logger *supportlog.Entry, backoff time.Duration, ) error { @@ -254,7 +272,7 @@ func superviseStreaming( } // Unrecoverable: surface up rather than spin restarting on a condition a // fresh start cannot heal. - if errors.Is(err, ErrFirstStartNoTip) { + if errors.Is(err, ErrHotVolumeLost) || errors.Is(err, ErrFirstStartNoTip) { return err } logger.WithError(err).Warnf("streaming: daemon run failed; restarting in %s", backoff) @@ -275,20 +293,37 @@ func superviseStreaming( // buildProductionBoundaries assembles the real external boundaries from the // loaded config. // -// TODO(#772): the bulk-backend tip boundary is still entangled with config that -// does not yet exist on this branch (the datastore type + schema — only -// [backfill.bsb].bucket_path is in Config today) and with the v1 path's lake -// tip-resolution. Until #772 lands the cutover, a deployment needing catch-up -// against a real lake must wire NetworkTip/BackendWaiter/Backend through -// DaemonOptions.BuildBoundaries; this supplies a tip adapter that errors clearly -// when no bulk backend is configured, so a frontfill deployment runs unchanged. +// - Core: captive stellar-core via NewCaptiveCoreStream, wrapped so +// OpenLedgerStream hands the live stream to the ingestion loop (the stream +// owns the core process lifecycle — started on the first RawLedgers pull, +// torn down when iteration ends — so this builder constructs it without +// sequencing PrepareRange/Close itself). +// - Backend: the bulk datastore ChunkSource (NewDataStoreSource) when a bucket +// path is configured; nil for a frontfill-only deployment. +// - NetworkTip / BackendWaiter: an adapter over the bulk backend's tip. +// +// TODO(#772): the bulk-backend TIP boundary is the one piece still entangled +// with config that does not yet exist on this branch (the datastore TYPE + +// schema — only [backfill.bsb].bucket_path is in Config today) and with the lake +// tip-resolution the v1 path performs differently. Until #772 lands the cutover, +// a deployment that needs catch-up against a real lake must wire NetworkTip/ +// BackendWaiter/Backend through DaemonOptions.BuildBoundaries; buildProduction- +// Boundaries supplies the captive-core Core (fully wired) and a tip adapter that +// errors clearly when no bulk backend is configured, so a frontfill ("genesis" +// or "now" with no backfill) deployment runs unchanged. func buildProductionBoundaries( - _ context.Context, _ Config, _ Paths, _ *catalog.Catalog, _ *supportlog.Entry, + ctx context.Context, cfg Config, _ Paths, _ *catalog.Catalog, logger *supportlog.Entry, ) (Boundaries, error) { + core, err := newCaptiveCoreOpener(cfg.Streaming.CaptiveCoreConfig, logger) + if err != nil { + return Boundaries{}, err + } + b := Boundaries{ + Core: core, // TODO(#772): wire the full-history RPC read server. The SQLite read path // is still the v1 daemon's; until the #772 cutover, serving is a no-op here - // so the streaming daemon catches up + freezes without double-serving reads. + // so the streaming daemon ingests + freezes without double-serving reads. ServeReads: func(context.Context) error { return nil }, } @@ -302,6 +337,59 @@ func buildProductionBoundaries( return b, nil } +// captiveCoreOpener is the production CoreOpener: it prepares captive core at the +// resume ledger and hands back a LedgerGetter the ingestion loop polls by +// sequence (the design's core.GetLedger(ctx, seq)) plus a closer. +type captiveCoreOpener struct { + backend ledgerbackend.LedgerBackend +} + +func newCaptiveCoreOpener(captiveCoreConfigPath string, logger *supportlog.Entry) (*captiveCoreOpener, error) { + if captiveCoreConfigPath == "" { + return nil, errors.New("streaming: [streaming].captive_core_config is required") + } + // TODO(#772): the captive-core CaptiveCoreConfig (binary path, network + // passphrase, history-archive URLs, storage path) is assembled from the v1 + // daemon config today; threading those through the streaming Config is part + // of the cutover. The factory below is the wiring point — once the fields are + // in Config, build a ledgerbackend.CaptiveCoreConfig from + // NewCaptiveCoreTomlFromFile(captiveCoreConfigPath, ...) and NewCaptive, then + // PrepareRange(UnboundedRange(resume)) in OpenCore. The seam (a LedgerGetter + // behind CoreOpener) is final; only the config plumbing is deferred. + return nil, fmt.Errorf("streaming: production captive-core wiring is deferred to #772 "+ + "(config %q parsed; pass a CoreOpener via DaemonOptions.BuildBoundaries to run today)", + captiveCoreConfigPath) +} + +// OpenCore prepares the backend over the unbounded range from resumeLedger and +// returns a getter wrapping GetLedger plus the backend's Close. +func (c *captiveCoreOpener) OpenCore( + ctx context.Context, resumeLedger uint32, +) (LedgerGetter, func() error, error) { + if err := c.backend.PrepareRange(ctx, ledgerbackend.UnboundedRange(resumeLedger)); err != nil { + return nil, nil, fmt.Errorf("streaming: captive core prepare range from %d: %w", resumeLedger, err) + } + return backendGetter{backend: c.backend}, c.backend.Close, nil +} + +// backendGetter adapts a ledgerbackend.LedgerBackend to LedgerGetter: GetLedger +// blocks until the ledger is available and returns its raw wire bytes. +type backendGetter struct { + backend ledgerbackend.LedgerBackend +} + +func (g backendGetter) GetLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMetaView, error) { + lcm, err := g.backend.GetLedger(ctx, seq) + if err != nil { + return nil, err + } + raw, err := lcm.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("streaming: marshal ledger %d: %w", seq, err) + } + return xdr.LedgerCloseMetaView(raw), nil +} + // notConfiguredTip is the NetworkTipBackend for a deployment with no bulk // backend configured: every sample returns a clear not-configured error — the // honest placeholder until #772 wires the real lake tip. @@ -394,6 +482,8 @@ func newLogger(cfg LoggingConfig) (*supportlog.Entry, error) { // compile-time assertions: the production adapters satisfy the injected // interfaces startStreaming/processChunk consume. var ( + _ CoreOpener = (*captiveCoreOpener)(nil) + _ LedgerGetter = backendGetter{} _ NetworkTipBackend = (*backendTip)(nil) _ BackendWaiter = (*backendTip)(nil) _ NetworkTipBackend = notConfiguredTip{} diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go index 69aa4ae74..fb8bf4351 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go @@ -59,16 +59,17 @@ format = "text" return configPath, dataDir } -// capturedBuild returns a BuildBoundaries func that hands RunDaemon a set of -// faked external boundaries (a young-network tip ⇒ no backfill, a recording -// ServeReads). It also records the resolved config/paths the daemon passed the -// builder, so a test asserts the daemon threaded LoadConfig+ResolvePaths through -// correctly. +// fakeBoundaries returns a BuildBoundaries func that hands RunDaemon a set of +// faked external boundaries (a young-network tip ⇒ no backfill, a fake core +// stream that blocks until ctx cancel, a recording ServeReads). It also records +// the resolved config/paths the daemon passed the builder, so a test asserts the +// daemon threaded LoadConfig+ResolvePaths through correctly. type capturedBuild struct { called atomic.Int32 gotCfg Config gotPaths Paths served atomic.Int32 + core *fakeCore } func (c *capturedBuild) build( @@ -79,8 +80,9 @@ func (c *capturedBuild) build( c.gotPaths = paths return Boundaries{ // A young-network tip (inside chunk 0) ⇒ backfill is a no-op, so the - // daemon needs no real backend to reach the serve step. + // daemon needs no real backend to reach serve+ingest. NetworkTip: &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}}, + Core: c.core, ServeReads: func(context.Context) error { c.served.Add(1); return nil }, }, nil } @@ -90,28 +92,35 @@ func (c *capturedBuild) build( // --------------------------------------------------------------------------- // The happy path: load TOML → lock → open meta store → validateConfig (pins the -// genesis floor) → build boundaries → startStreaming (catch-up no-op + serve) → -// clean return. Asserts the daemon pinned the layout, served reads, and threaded -// the resolved config/paths into the boundary builder. The cold-only daemon has -// no live ingestion loop, so startStreaming returns once ServeReads returns. +// genesis floor) → build boundaries → startStreaming → clean shutdown on ctx +// cancel. Asserts the daemon pinned the layout, served reads, started core at +// genesis, and threaded the resolved config/paths into the boundary builder. func TestRunDaemon_LoadValidateWireStartCleanShutdown(t *testing.T) { configPath, dataDir := writeTempConfig(t, "") - capture := &capturedBuild{} + capture := &capturedBuild{core: &fakeCore{getter: &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true}}} opts := DaemonOptions{BuildBoundaries: capture.build, Logger: silentLogger()} + ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 1) - go func() { errCh <- RunDaemonWith(context.Background(), configPath, opts) }() + go func() { errCh <- RunDaemonWith(ctx, configPath, opts) }() + + // Wait until reads are served (the daemon is parked on the blocking stream). + require.Eventually(t, func() bool { return capture.served.Load() == 1 }, 3*time.Second, 5*time.Millisecond) + cancel() select { case err := <-errCh: - require.NoError(t, err, "cold catch-up + serve returns cleanly") + require.NoError(t, err, "ctx cancel is a clean shutdown") case <-time.After(3 * time.Second): - t.Fatal("RunDaemonWith did not return") + t.Fatal("RunDaemonWith did not return after ctx cancel") } assert.Equal(t, int32(1), capture.called.Load(), "boundary builder invoked once") assert.Equal(t, int32(1), capture.served.Load(), "reads served once") + assert.Equal(t, int32(1), capture.core.openedCount.Load(), "captive core started once") + assert.Equal(t, uint32(chunk.FirstLedgerSeq), capture.core.resumeSeen.Load(), + "resume ledger is genesis on a fresh start") // The daemon threaded the loaded config + resolved paths into the builder. assert.Equal(t, dataDir, capture.gotCfg.Service.DefaultDataDir) @@ -300,10 +309,12 @@ func TestRunDaemon_CatchUpMaterializesAllColdTypesAndIndex(t *testing.T) { // daemon resolves [catalog]/[immutable_storage.*]/[streaming.hot_storage] // overrides into Paths, flocks them, and binds the Catalog via // NewLayoutFromPaths(paths) — so the Layout the data path reads/writes must -// place every artifact under the OVERRIDE, never under DataDir. Before the fix -// the Layout derived all paths from DataDir alone: the lock and the data -// location diverged silently. This test pins that the bound Layout's paths all -// live under the overrides. +// place every artifact and the hot DB under the OVERRIDE, never under DataDir. +// Before the fix the Layout derived all paths from DataDir alone: the lock and +// the data location diverged silently. This test pins both halves: (1) the +// bound Layout's paths all live under the overrides, and (2) actually opening a +// hot DB through the data path (openHotTierForChunk) lands the dir under the hot +// override with NOTHING under {DataDir}/hot. func TestRunDaemon_StoragePathOverridesHonored(t *testing.T) { dataDir := t.TempDir() overrideRoot := t.TempDir() // a distinct mount, e.g. /mnt/nvme @@ -347,6 +358,29 @@ func TestRunDaemon_StoragePathOverridesHonored(t *testing.T) { } // Nothing resolves under {DataDir}/hot or {DataDir}/ledgers. assert.NotEqual(t, filepath.Join(dataDir, "hot", cid.String()), layout.HotChunkPath(cid)) + + // (2) The data path actually creates the hot DB under the override. Bind a + // real catalog on this Layout and open a hot tier through the same call the + // ingestion loop uses. + store, err := metastore.New(paths.Catalog, silentLogger()) + require.NoError(t, err) + defer func() { _ = store.Close() }() + windows, err := NewWindows(testCPI) + require.NoError(t, err) + cat := NewCatalog(store, layout, windows) + + db, err := openHotTierForChunk(cat, cid, silentLogger()) + require.NoError(t, err) + require.NoError(t, db.Close()) + + // The hot DB dir exists under the override... + hotDir := filepath.Join(hotOverride, cid.String()) + info, err := os.Stat(hotDir) + require.NoError(t, err, "hot DB must be created under the hot_storage override") + assert.True(t, info.IsDir()) + // ...and NOTHING was written under {DataDir}/hot (the old, buggy location). + _, err = os.Stat(filepath.Join(dataDir, "hot")) + assert.True(t, os.IsNotExist(err), "no hot data may land under DataDir when an override is set") } // filepathHasPrefix reports whether path lives under prefix (prefix is an @@ -371,7 +405,7 @@ func TestRunDaemon_LockContentionFailsFast(t *testing.T) { require.NoError(t, err) defer locks.Release() - capture := &capturedBuild{} + capture := &capturedBuild{core: &fakeCore{}} err = RunDaemonWith(context.Background(), configPath, DaemonOptions{BuildBoundaries: capture.build, Logger: silentLogger()}) require.ErrorIs(t, err, ErrRootLocked) @@ -384,7 +418,7 @@ func TestRunDaemon_LockContentionFailsFast(t *testing.T) { func TestRunDaemon_NowFloorRequiresTip(t *testing.T) { configPath, _ := writeTempConfigNow(t) - capture := &capturedBuild{} + capture := &capturedBuild{core: &fakeCore{}} // The builder returns an unreachable tip, so "now" cannot resolve. build := func(_ context.Context, cfg Config, paths Paths, c *catalog.Catalog, l *supportlog.Entry) (Boundaries, error) { b, _ := capture.build(context.Background(), cfg, paths, c, l) @@ -458,14 +492,12 @@ func TestSuperviseStreaming_RetriesThenCleanShutdown(t *testing.T) { pinGenesis(t, cat) var attempts atomic.Int32 + core := &fakeCore{openErr: errors.New("transient core open failure")} tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} // young: no backfill - start := startTestConfig(t, cat, tip, nil) - // A ServeReads that always errors makes each startStreaming attempt a - // restartable failure; counting its calls counts the restarts. - start.ServeReads = func(context.Context) error { - attempts.Add(1) - return errors.New("transient serve failure") - } + start := startTestConfig(t, cat, tip, core, nil) + // Count startStreaming attempts by observing core opens (one per attempt past + // backfill); openErr makes each attempt a restartable failure. + start.ServeReads = func(context.Context) error { return nil } ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 1) @@ -473,6 +505,7 @@ func TestSuperviseStreaming_RetriesThenCleanShutdown(t *testing.T) { // Let a few restarts happen, then cancel. require.Eventually(t, func() bool { + attempts.Store(core.openedCount.Load()) return attempts.Load() >= 2 }, 3*time.Second, 5*time.Millisecond) cancel() @@ -483,7 +516,7 @@ func TestSuperviseStreaming_RetriesThenCleanShutdown(t *testing.T) { case <-time.After(3 * time.Second): t.Fatal("superviseStreaming did not return after cancel") } - assert.GreaterOrEqual(t, attempts.Load(), int32(2), "restarted on the transient failure") + assert.GreaterOrEqual(t, core.openedCount.Load(), int32(2), "restarted on the transient failure") } // The fatal sentinels are surfaced UP, not retried (a fresh start cannot heal @@ -494,7 +527,7 @@ func TestSuperviseStreaming_FatalSentinelSurfaces(t *testing.T) { // Unreachable tip + no local progress ⇒ ErrFirstStartNoTip, a fatal that must // surface rather than spin. tip := &fakeTipBackend{err: errors.New("unreachable"), errFirst: 99} - start := startTestConfig(t, cat, tip, nil) + start := startTestConfig(t, cat, tip, &fakeCore{}, nil) err := superviseStreaming(context.Background(), start, silentLogger(), time.Hour) require.ErrorIs(t, err, ErrFirstStartNoTip, "fatal sentinel surfaces immediately, no retry") @@ -583,23 +616,22 @@ func TestNotConfiguredTip_ErrorsClearly(t *testing.T) { } // --------------------------------------------------------------------------- -// buildProductionBoundaries — the cold-only frontfill boundaries. +// buildProductionBoundaries — captive-core wiring is deferred to #772. // --------------------------------------------------------------------------- -// The cold-only daemon has no captive core: buildProductionBoundaries returns a -// frontfill Boundaries (a no-op ServeReads and a not-configured tip) with no -// error, and that set validates. -func TestBuildProductionBoundaries_FrontfillNoBackend(t *testing.T) { +func TestBuildProductionBoundaries_CaptiveCoreDeferred(t *testing.T) { cfg := Config{}.WithDefaults() - b, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) - require.NoError(t, err) - require.NotNil(t, b.ServeReads, "a no-op ServeReads is wired") - require.NoError(t, b.validate(), "the frontfill boundaries validate") + cfg.Streaming.CaptiveCoreConfig = "/some/core.toml" + _, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) + require.Error(t, err, "captive-core production wiring is deferred to #772") + assert.Contains(t, err.Error(), "#772") +} - // The tip is the not-configured placeholder: sampling it errors clearly. - _, tipErr := b.NetworkTip.NetworkTip(context.Background()) - require.Error(t, tipErr) - assert.Contains(t, tipErr.Error(), "no bulk backend configured") +func TestBuildProductionBoundaries_RequiresCaptiveCoreConfig(t *testing.T) { + cfg := Config{}.WithDefaults() // no captive_core_config + _, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) + require.Error(t, err) + assert.Contains(t, err.Error(), "captive_core_config") } func TestNewLogger(t *testing.T) { diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go new file mode 100644 index 000000000..092469a78 --- /dev/null +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go @@ -0,0 +1,636 @@ +package streaming + +// ============================================================================= +// Issue 19 — in-process end-to-end integration of the streaming daemon. +// +// WHAT IS REAL HERE +// Everything inside the process is the real production code path: +// - RunDaemonWith (the true daemon entrypoint): TOML load + form-validate, +// per-root flock, meta-store open + Catalog bind, the stateful +// validateConfig gate (pins the immutable layout + resolves the floor), +// and the supervised startStreaming loop. +// - startStreaming → catchUp → openHotTierForChunk → runIngestionLoop (the +// real atomic per-ledger WriteBatch across all CFs of the real per-chunk +// hotchunk RocksDB), the real boundary handoff, the real doorbell. +// - lifecycleLoop / runLifecycleTick: the real resolve + executePlan freeze +// (cold artifacts derived FROM the live hot DB via processChunk's hot +// branch), the real txhash index fold (a real streamhash .idx on disk), +// the real discard + prune scans. +// - The real txhash stores on both sides of a getTransaction-style hash→seq +// lookup: the cold ColdReader over the frozen .idx and the live HotStore +// CF. +// - Catalog.Audit (INV-1..4) over the real durable keys + files. +// +// WHAT IS FAKED (and why that is the right boundary) +// Only the two EXTERNAL boundaries the daemon injects on purpose: +// - The ledger SOURCE. Production drives ingestion from captive +// stellar-core (a child process) and backfill from a bulk object-store +// backend. Here both cross their injected interfaces (CoreStreamOpener / +// NetworkTipBackend) and are fed SYNTHETIC-BUT-WELL-FORMED LedgerCloseMeta +// built by the same fixtures the merged store tests use (zero-tx LCM for +// bulk, plus a one-tx LCM where a real, network-hashed transaction hash is +// needed so the txhash index has a real key to resolve). No captive core, +// no docker-stellar-core, no object store, no network. +// - ServeReads is a no-op recorder (the SQLite→full-history read cutover is +// #772; see daemon.go). The read PATH we actually exercise is the txhash +// index lookup the getTransaction handler will sit on top of. +// +// FOLLOW-UP (out of scope here; requires infra not available in this sandbox) +// A full captive-core + docker-stellar-core E2E belongs in the existing +// integrationtest harness (cmd/stellar-rpc/internal/integrationtest): it +// stands up a real core + a real history archive and ingests real network +// ledgers. That validates the ledger SOURCE adapters (captiveCoreOpener, +// backendTip/DataStoreSource) this test fakes, and is gated on the #772 read +// cutover for an end-user getTransaction round-trip over RPC. This in-process +// test deliberately stops at the daemon's injected boundaries so it runs with +// no external services. +// ============================================================================= + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stellar/go-stellar-sdk/keypair" + "github.com/stellar/go-stellar-sdk/network" + supportlog "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash" +) + +// e2ePassphrase is the network passphrase the synthetic tx hashes are computed +// against. Any stable value works; the index only needs deterministic hashes +// the test can then look up. +const e2ePassphrase = network.PublicNetworkPassphrase + +// oneTxLCMReturningHash builds a well-formed V2 LedgerCloseMeta carrying exactly +// ONE transaction for seq and returns BOTH the wire bytes and the real, +// network-hashed transaction hash. A non-zero-tx ledger is required somewhere in +// a chunk so its txhash .bin is non-empty (streamhash refuses a zero-key cold +// index, txhash.ErrEmptyBuildSet); returning the hash lets the E2E assert the +// getTransaction-style hash→seq lookup against a hash the daemon really +// committed. It mirrors lifecycle_test's oneTxLCMBytes, exposing the hash. +func oneTxLCMReturningHash(t *testing.T, seq uint32) ([]byte, [32]byte) { + t.Helper() + envelope := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Ext: xdr.TransactionExt{V: 1, SorobanData: &xdr.SorobanTransactionData{}}, + }, + }, + } + hash, err := network.HashTransactionInEnvelope(envelope, e2ePassphrase) + require.NoError(t, err) + + comp := []xdr.TxSetComponent{{ + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{envelope}, + }, + }} + opResults := []xdr.OperationResult{} + lcm := xdr.LedgerCloseMeta{ + V: 2, + V2: &xdr.LedgerCloseMetaV2{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{CloseTime: xdr.TimePoint(0)}, + LedgerSeq: xdr.Uint32(seq), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{Phases: []xdr.TransactionPhase{{V: 0, V0Components: &comp}}}, + }, + TxProcessing: []xdr.TransactionResultMetaV1{{ + TxApplyProcessing: xdr.TransactionMeta{ + V: 4, + V4: &xdr.TransactionMetaV4{Operations: []xdr.OperationMetaV2{}}, + }, + Result: xdr.TransactionResultPair{ + TransactionHash: hash, + Result: xdr.TransactionResult{ + FeeCharged: 100, + Result: xdr.TransactionResultResult{Code: xdr.TransactionResultCodeTxSuccess, Results: &opResults}, + }, + }, + }}, + }, + } + raw, err := lcm.MarshalBinary() + require.NoError(t, err) + return raw, hash +} + +// e2eGetter is the FAKE captive-core ledger getter: a resumable LedgerGetter the +// ingestion loop polls by sequence (the design's core.GetLedger(ctx, seq)). It +// returns the frame for the requested seq when it has one, and once the poll +// runs past the synthetic backlog it blocks until ctx is cancelled (a live tip +// stream ends only on shutdown). It records the FIRST seq it was asked for so +// the restart step can assert the daemon re-derived the watermark and resumed +// with no gap. The ctx-cancelled GetLedger return is the clean-shutdown path the +// daemon top level classifies as clean. +type e2eGetter struct { + frames map[uint32][]byte + maxSeq uint32 + fromSeen *atomic.Uint32 // first GetLedger seq (for the restart assertion) + delivered *atomic.Uint32 // highest seq actually yielded (test sync) + sawFrom atomic.Bool +} + +type e2eFrame struct { + seq uint32 + raw []byte +} + +var _ LedgerGetter = (*e2eGetter)(nil) + +func (s *e2eGetter) GetLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMetaView, error) { + if s.sawFrom.CompareAndSwap(false, true) { + s.fromSeen.Store(seq) + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + if raw, ok := s.frames[seq]; ok { + s.delivered.Store(seq) + return xdr.LedgerCloseMetaView(raw), nil + } + // Past the synthetic backlog: a live tip blocks until shutdown so the loop + // does not see an error that would look like a core crash. + <-ctx.Done() + return nil, ctx.Err() +} + +// e2eCore is the CoreOpener handing back a fresh e2eGetter per daemon run (a +// restart opens core anew). It records the resume ledger every open was driven +// from. +type e2eCore struct { + frames []e2eFrame + resumeSeen atomic.Uint32 + fromSeen atomic.Uint32 + delivered atomic.Uint32 + opens atomic.Int32 +} + +func (c *e2eCore) OpenCore(_ context.Context, resume uint32) (LedgerGetter, func() error, error) { + c.opens.Add(1) + c.resumeSeen.Store(resume) + byseq := make(map[uint32][]byte, len(c.frames)) + var maxSeq uint32 + for _, f := range c.frames { + byseq[f.seq] = f.raw + if f.seq > maxSeq { + maxSeq = f.seq + } + } + getter := &e2eGetter{frames: byseq, maxSeq: maxSeq, fromSeen: &c.fromSeen, delivered: &c.delivered} + return getter, func() error { return nil }, nil +} + +// e2eConfigPath writes a daemon TOML for an in-process E2E: genesis floor (no +// tip needed to validate/start), a one-chunk index window (chunks_per_txhash_- +// index = 1, so every window is terminal the instant its chunk freezes — the +// freeze→fold→discard sequence completes on the boundary tick), and the given +// retention width. captive_core_config is a stub path the test's BuildBoundaries +// replaces with a fake stream, never opening a real core. +func e2eConfigPath(t *testing.T, dataDir string, retentionChunks uint32) string { + t.Helper() + cfgPath := filepath.Join(t.TempDir(), "daemon.toml") + body := fmt.Sprintf(` +[service] +default_data_dir = %q + +[streaming] +earliest_ledger = "genesis" +captive_core_config = "/dev/null" +retention_chunks = %d + +[backfill] +chunks_per_txhash_index = 1 + +[logging] +level = "error" +format = "text" +`, dataDir, retentionChunks) + require.NoError(t, os.WriteFile(cfgPath, []byte(body), 0o644)) + return cfgPath +} + +// runDaemonInBackground starts RunDaemonWith on a cancellable ctx and returns a +// cancel func, a channel carrying its (clean-shutdown) return, and a channel +// delivering the daemon's OWN bound *Catalog (captured from the BuildBoundaries +// callback). The metastore is opened RocksDB-primary (exclusive LOCK), so a test +// CANNOT open a second handle on the same path while the daemon runs — instead +// it reads durable state through the daemon's own catalog, which is safe for +// concurrent reads. ServeReads records the serve count; a young-network tip +// (inside chunk 0) means backfill is a no-op and first-start ingests directly +// from genesis via the fake core. +func runDaemonInBackground( + t *testing.T, cfgPath string, core *e2eCore, served *atomic.Int32, metrics Metrics, +) (cancel context.CancelFunc, done <-chan error, catCh <-chan *Catalog) { + t.Helper() + ctx, cancelFn := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + catChan := make(chan *Catalog, 1) + build := func(_ context.Context, _ Config, _ Paths, cat *Catalog, _ *supportlog.Entry) (Boundaries, error) { + select { + case catChan <- cat: // hand the daemon's bound catalog to the test + default: + } + return Boundaries{ + NetworkTip: &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 5}}, + Core: core, + ServeReads: func(context.Context) error { served.Add(1); return nil }, + }, nil + } + opts := DaemonOptions{ + BuildBoundaries: build, + Logger: silentLogger(), + Metrics: metrics, + RestartBackoff: 10 * time.Millisecond, + } + go func() { errCh <- RunDaemonWith(ctx, cfgPath, opts) }() + return cancelFn, errCh, catChan +} + +// awaitCatalog waits for the daemon to hand back its bound catalog. +func awaitCatalog(t *testing.T, catCh <-chan *Catalog) *Catalog { + t.Helper() + select { + case cat := <-catCh: + return cat + case <-time.After(10 * time.Second): + t.Fatal("daemon did not bind a catalog") + return nil + } +} + +// waitClean cancels the daemon and requires a clean (nil) shutdown. +func waitClean(t *testing.T, cancel context.CancelFunc, done <-chan error) { + t.Helper() + cancel() + select { + case err := <-done: + require.NoError(t, err, "ctx cancel is a clean daemon shutdown") + case <-time.After(60 * time.Second): + // Post-cancel shutdown joins one in-flight lifecycle unit; a mid-flight + // freeze's Finalize fsync + index build is unpreemptible and slow under + // -race + contention — the same reason the boundary-cross budget is 600s. + t.Fatal("daemon did not shut down cleanly after ctx cancel") + } +} + +// ============================================================================ +// The end-to-end walk. +// ============================================================================ + +// TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune drives the +// whole daemon lifecycle in one process against the real stores and the fake +// ledger source: +// +// first start (genesis, young-network tip ⇒ direct ingest) → +// ingest a FULL chunk + cross into the next (real boundary handoff) → +// lifecycle tick freezes chunk 0 + folds its terminal txhash index + discards +// its hot tier → +// getTransaction-style hash→seq lookup resolves from the cold .idx (chunk 0) +// AND from the live hot CF (chunk 1) → +// clean shutdown → +// RESTART: re-derive the watermark, resume at exactly watermark+1 (no gap) → +// drive retention far enough to prune chunk 0, and confirm a pruned read is +// not-found → +// finish with Catalog.Audit → Clean. +// +// Correctness is asserted at every step. +func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing.T) { + if testing.Short() { + t.Skip("e2e ingests a full 10k-ledger chunk; skipped in -short") + } + + dataDir := t.TempDir() + + const c0 = chunk.ID(0) + const c1 = chunk.ID(1) + const c2 = chunk.ID(2) + + // --- Synthetic ledgers. We cross TWO chunk boundaries so chunks 0 AND 1 both + // freeze (completeThrough reaches chunk 1's last ledger), leaving chunk 2 as + // the live (un-frozen) chunk. That layout lets a later retention_chunks=1 run + // prune chunk 0 (wholly below the floor) while chunk 1 survives. + // + // Each chunk is ingested in FULL and contiguously from its first ledger (the + // events CF's strict-contiguity precondition), so the freeze derives every + // cold artifact. One real, network-hashed tx is planted where a resolvable + // hash is needed — chunk 0's first ledger (→ frozen cold .idx) and chunk 2's + // first ledger (→ the live hot CF). Every other ledger is zero-tx for speed. + c0First := c0.FirstLedger() + c1First := c1.FirstLedger() + c2First := c2.FirstLedger() + + coldRaw, coldHash := oneTxLCMReturningHash(t, c0First) // → frozen cold .idx (chunk 0) + hotRaw, hotHash := oneTxLCMReturningHash(t, c2First) // → live hot CF (chunk 2) + // Chunk 1's first ledger also carries a tx so its txhash .bin is non-empty — + // streamhash refuses to build a cold index over zero keys (ErrEmptyBuildSet), + // which would otherwise abort the lifecycle tick when chunk 1 freezes. + c1Raw, _ := oneTxLCMReturningHash(t, c1First) + + frames := make([]e2eFrame, 0, 2*int(chunk.LedgersPerChunk)+2) + appendLedger := func(seq uint32) { + var raw []byte + switch seq { + case c0First: + raw = coldRaw + case c1First: + raw = c1Raw + case c2First: + raw = hotRaw + default: + raw = zeroTxLCMBytes(t, seq) + } + frames = append(frames, e2eFrame{seq: seq, raw: raw}) + } + // Chunks 0 and 1 in full (both freeze), then chunk 2's first two ledgers (the + // live chunk; boundary 1→2 fired, chunk 2 opened, its first ledger committed). + for seq := c0First; seq <= c1.LastLedger(); seq++ { + appendLedger(seq) + } + appendLedger(c2First) + appendLedger(c2First + 1) + + core := &e2eCore{frames: frames} + var served atomic.Int32 + metrics := newRecordingMetrics() + + // ===================================================================== + // STEP 1 — first start: config → lock → validate (pin genesis) → start → + // direct ingest across the chunk-0 AND chunk-1 boundaries, with the lifecycle + // freezing, folding, and discarding each just-closed chunk off the doorbell. + // ===================================================================== + cfgPath := e2eConfigPath(t, dataDir, 0) // retention 0 (full history) for now + cancel, done, catCh := runDaemonInBackground(t, cfgPath, core, &served, metrics) + + // Inspect durable state through the daemon's OWN bound catalog (the metastore + // is opened RocksDB-primary, so a second handle would fail the LOCK). The + // catalog is safe for concurrent reads alongside the daemon's writes. + cat := awaitCatalog(t, catCh) + + // First wait until ingestion crosses BOTH boundaries and commits into chunk 2 + // (the new live chunk). Delivering c2First proves both boundary handoffs fired + // (chunks 0 and 1 closed, chunk 2 opened) and seeds the live hot-CF lookup. + // (NOTE: we must NOT gate on "chunk 0's hot key absent" first — the daemon + // hands the test its catalog from BuildBoundaries, BEFORE startStreaming opens + // the resume chunk's hot DB, so that key is transiently absent at start.) + // Budget note: crossing both boundaries is ~20k per-ledger SYNCED WriteBatches + // (the design's one-atomic-synced-batch-per-ledger durability boundary) racing + // the lifecycle freezes that re-read 10k ledgers each. fsync throughput is + // highly variable under contention: in isolation this reaches chunk 2 in ~110s + // (no -race) but ~175s under -race, and the CI gate runs the whole tree under + // `-race` (so this E2E is NOT -short-skipped there) alongside this package's + // six t.Parallel() full-chunk ticks, all competing for the same disk. 180s was + // too tight (flaky timeouts at 161/167s/killed). 600s absorbs the worst-case + // contended -race path while staying far under the 25m package envelope. + require.Eventually(t, func() bool { + return core.delivered.Load() >= c2First + }, 600*time.Second, 200*time.Millisecond, "ingestion must cross both boundaries into chunk 2") + + // The boundary doorbells have rung. A lifecycle tick freezes each just-closed + // chunk's cold artifacts (from its closed hot DB), folds its terminal (cpi=1) + // txhash index, then discards its hot tier. The durable completion signal per + // chunk: the window has a FROZEN txhash coverage (the .idx) AND the chunk's hot + // key is gone (discarded). (NOTE: the per-chunk chunk:{c}:txhash key is the + // .bin input the one-write index fold CONSUMES — after the fold it is + // demoted+swept, reading "" not "frozen"; the durable txhash artifact is the + // window's frozen coverage, not the per-chunk key.) + w0 := cat.windows.WindowID(c0) + w1 := cat.windows.WindowID(c1) + require.Eventually(t, func() bool { + for w, c := range map[WindowID]chunk.ID{w0: c0, w1: c1} { + _, hasCov, err := cat.FrozenCoverage(w) + if err != nil || !hasCov { + return false + } + has, err := cat.Has(hotChunkKey(c)) + if err != nil || has { + return false + } + } + return true + }, 60*time.Second, 50*time.Millisecond, "the boundary ticks must freeze+fold+discard chunks 0 and 1") + + require.GreaterOrEqual(t, served.Load(), int32(1), "reads were served") + require.Equal(t, uint32(c0First), core.resumeSeen.Load(), + "first start resumes captive core at genesis (watermark+1)") + + // --- Correctness: chunks 0 and 1 per-chunk cold artifacts (ledgers + events) froze. --- + for _, c := range []chunk.ID{c0, c1} { + for _, kind := range []Kind{KindLedgers, KindEvents} { + st, err := cat.State(c, kind) + require.NoError(t, err) + assert.Equal(t, StateFrozen, st, "chunk %s %s is frozen", c, kind) + } + } + // The window's txhash index is a frozen, terminal coverage (the .idx the cold + // getTransaction read resolves against). + frozenCov, ok, err := cat.FrozenCoverage(w0) + require.NoError(t, err) + require.True(t, ok, "chunk 0's window has a frozen txhash coverage") + require.True(t, cat.windows.IsTerminalCoverage(frozenCov), "a one-chunk (cpi=1) window is terminal") + + // ===================================================================== + // STEP 2 — getTransaction-style hash→seq lookup, both tiers. + // (a) cold: resolve chunk 0's tx via the frozen .idx on disk. + // (b) hot: resolve chunk 2's tx via the live hot DB's txhash CF. + // ===================================================================== + + // (a) Cold .idx — the exact reader getTransaction will sit on for frozen + // history. It resolves the committed hash to its real ledger seq. + coldReader, err := txhash.OpenColdReader(cat.layout.IndexFilePath(frozenCov)) + require.NoError(t, err) + gotSeq, err := coldReader.Get(coldHash) + require.NoError(t, err, "the chunk-0 tx hash must resolve from the frozen cold index") + assert.Equal(t, c0First, gotSeq, "cold lookup returns the ledger the tx was committed in") + // A hash that was never committed misses (not-found, not a wrong answer). + _, missErr := coldReader.Get(hashAt(0xE2EDEADBEEF)) + require.ErrorIs(t, missErr, stores.ErrNotFound, "an uncommitted hash misses the cold index") + require.NoError(t, coldReader.Close()) + + // (b) is performed AFTER the clean shutdown below — opening chunk 2's hot DB + // read-only would conflict with the live ingestion writer's exclusive RocksDB + // LOCK while the daemon runs; once the daemon stops cleanly the live chunk's + // hot DB is on disk and reopenable. The hot tier is the UN-frozen live chunk's + // sole copy, so this still exercises the hot read path. + + // Observability: the daemon emitted the boundary + freeze phase signals (the + // control-plane health gauges). + assert.GreaterOrEqual(t, len(metrics.snapshotBoundaries()), 1, "at least one chunk boundary was signaled") + assert.GreaterOrEqual(t, metrics.snapshotFreezeCount(), 1, "at least one freeze stage ran") + + // ===================================================================== + // STEP 3 — clean shutdown. The supervised loop returns nil on ctx cancel. + // ===================================================================== + // (Watermark derivation opens the live hot DB read-only, so it MUST run after + // the daemon — the live writer — releases the exclusive RocksDB LOCK; do it + // after waitClean below.) + waitClean(t, cancel, done) + + // The daemon's catalog rode its now-closed metastore handle; bind a fresh + // inspection catalog on the (now lock-free) data dir for the post-shutdown + // reads. It MUST be closed before the restart reopens the metastore. + postCat, closePost := e2eReadCatalog(t, dataDir) + + // The durable watermark, re-derived from the post-shutdown state (the basis + // for the restart's resume-with-no-gap assertion). + wmBeforeRestart := mustDeriveWatermark(t, postCat) + require.GreaterOrEqual(t, wmBeforeRestart, c2First, "watermark advanced into chunk 2") + + // (b) Live hot CF — now the daemon has stopped, chunk 2 (still the un-frozen + // live chunk: its hot key is "ready", no cold artifacts) is reopenable. Open + // its real hot DB and resolve the chunk-2 tx hash through the txhash CF — the + // read path getTransaction uses for live history before a chunk freezes. + hotState, err := postCat.HotState(c2) + require.NoError(t, err) + require.Equal(t, HotReady, hotState, "chunk 2 is the un-frozen live chunk") + c2lfs, err := postCat.State(c2, KindLedgers) + require.NoError(t, err) + require.Equal(t, State(""), c2lfs, "the live chunk has no cold artifacts yet") + + // Retry the open: RocksDB's process-level LOCK can linger momentarily after the + // writer closed (the same transient a production reader retries through). + var liveDB *hotchunk.DB + require.Eventually(t, func() bool { + db, oerr := hotchunk.Open(cat.layout.HotChunkPath(c2), c2, silentLogger()) + if oerr != nil { + return false + } + liveDB = db + return true + }, 10*time.Second, 50*time.Millisecond, "chunk 2's hot DB must be reopenable after shutdown") + hotSeq, err := liveDB.Txhash().Get(hotHash) + require.NoError(t, err, "the chunk-2 tx hash must resolve from the live hot CF") + assert.Equal(t, c2First, hotSeq, "hot lookup returns the live tx's ledger") + require.NoError(t, liveDB.Close()) // release before the restart reopens it as the live writer + + // ===================================================================== + // STEP 4 — RESTART. A fresh RunDaemonWith re-opens everything, re-derives the + // watermark from durable state, and resumes captive core at watermark+1 with + // no gap. (The shared e2eCore records the new resume + the stream's From.) + // ===================================================================== + closePost() // release the inspection metastore handle before the daemon reopens it + core.opens.Store(0) + core.resumeSeen.Store(0) + core.fromSeen.Store(0) + cancel2, done2, _ := runDaemonInBackground(t, cfgPath, core, &served, newRecordingMetrics()) + + require.Eventually(t, func() bool { return core.opens.Load() >= 1 }, 30*time.Second, 20*time.Millisecond, + "the restarted daemon re-opened captive core") + require.Eventually(t, func() bool { return core.fromSeen.Load() != 0 }, 30*time.Second, 20*time.Millisecond, + "the restarted ingestion loop requested a resume range") + + wantResume := wmBeforeRestart + 1 + assert.Equal(t, wantResume, core.resumeSeen.Load(), + "restart resumes captive core at the re-derived watermark+1 (no gap, no re-fetch of the bottom)") + assert.Equal(t, wantResume, core.fromSeen.Load(), + "the ingestion loop streamed from watermark+1 — the durable frontier, re-derived not stored") + + waitClean(t, cancel2, done2) + + // ===================================================================== + // STEP 5 — retention prune. Re-run the daemon with retention_chunks = 1: the + // effective floor anchors at chunk 1 (lastCompleteChunkAt(through=chunk 1) - + // 1 + 1), so chunk 0 (frozen + folded) falls WHOLLY below the floor and the + // prune scan sweeps its files + keys, while chunk 1 (the floor chunk) survives. + // A read of a pruned chunk-0 hash is then not-found (no coverage to resolve it). + // ===================================================================== + prunedCfg := e2eConfigPath(t, dataDir, 1) // retain ~1 chunk + // Capture chunk 0's frozen .idx path BEFORE the prune so we can confirm the + // file itself is gone afterward. (cat's layout is path-only and stays valid + // even though its metastore handle closed at the Step-3 shutdown.) + prunedIdxPath := cat.layout.IndexFilePath(frozenCov) + require.FileExists(t, prunedIdxPath, "chunk 0's cold index exists before the prune") + + cancel3, done3, catCh3 := runDaemonInBackground(t, prunedCfg, core, &served, newRecordingMetrics()) + pruneCat := awaitCatalog(t, catCh3) // the pruning daemon's own catalog + + // The prune scan runs on the first lifecycle tick (the at-start doorbell ring, + // which is startup convergence). Poll for chunk 0's per-chunk artifact keys + // (ledgers + events — the frozen cold artifacts) to vanish. + require.Eventually(t, func() bool { + ledgers, err := pruneCat.State(c0, KindLedgers) + if err != nil { + return false + } + ev, err := pruneCat.State(c0, KindEvents) + if err != nil { + return false + } + return ledgers == State("") && ev == State("") + }, 60*time.Second, 50*time.Millisecond, "retention must prune chunk 0's artifact keys") + + // Chunk 1 (the floor chunk) is WITHIN retention and survives the prune. + c1lfs, err := pruneCat.State(c1, KindLedgers) + require.NoError(t, err) + assert.Equal(t, StateFrozen, c1lfs, "chunk 1 is at the retention floor and survives") + + // The on-disk cold index file is gone too (prune unlinks the files, not just + // the keys) — a pruned read therefore cannot even open the reader. + require.Eventually(t, func() bool { + _, statErr := os.Stat(prunedIdxPath) + return os.IsNotExist(statErr) + }, 10*time.Second, 50*time.Millisecond, "the pruned cold index file is unlinked") + + // getTransaction-style "pruned read is not-found": the frozen coverage key is + // gone, so the read path has no index to resolve the (formerly resolvable) + // chunk-0 hash against — the production reader returns not-found. After prune + // the window has no frozen coverage (ok=false): the read layer's "no coverage + // ⇒ not-found" gate. + _, covOK, err := pruneCat.FrozenCoverage(w0) + require.NoError(t, err) + assert.False(t, covOK, "chunk 0's window coverage is pruned ⇒ a chunk-0 hash read is not-found") + + waitClean(t, cancel3, done3) + +} + +// ============================================================================ +// helpers +// ============================================================================ + +// e2eReadCatalog binds a Catalog over a SEPARATE metastore handle on the +// daemon's data dir, with the same one-chunk window the daemon config pins, for +// read-only inspection BETWEEN daemon runs (the metastore is RocksDB-primary / +// exclusive-LOCK, so this MUST be closed via the returned close func before the +// next daemon run reopens it). +func e2eReadCatalog(t *testing.T, dataDir string) (*Catalog, func()) { + t.Helper() + paths := Config{Service: ServiceConfig{DefaultDataDir: dataDir}}.WithDefaults().ResolvePaths() + store, err := openMetaAt(t, paths.Catalog) + require.NoError(t, err) + windows, err := NewWindows(1) // matches chunks_per_txhash_index = 1 + require.NoError(t, err) + return NewCatalog(store, NewLayoutFromPaths(paths), windows), func() { _ = store.Close() } +} + +// mustDeriveWatermark derives the durable watermark through the production probe. +func mustDeriveWatermark(t *testing.T, cat *Catalog) uint32 { + t.Helper() + wm, err := deriveWatermark(cat, NewRocksHotProbe(cat.layout.HotChunkPath, silentLogger())) + require.NoError(t, err) + return wm +} + +// The E2E reuses observability_test.go's recordingMetrics (a full Metrics sink) +// and its snapshotBoundaries; snapshotFreezeCount (added there) reports the +// number of freeze-stage signals. diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go index 40dfc4081..58c2d6b72 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go @@ -448,7 +448,7 @@ func TestBackfill_ReportsPassAndProgress(t *testing.T) { // complete chunk at tip]. tipLedger := chunk.ID(3).LastLedger() + 5 tip := &fakeTipBackend{tips: []uint32{tipLedger}} - start := startTestConfig(t, cat, tip, rp) + start := startTestConfig(t, cat, tip, &fakeCore{}, rp) metrics := newRecordingMetrics() start.Exec.Metrics = metrics diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go index 98fe94353..f489c8170 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go @@ -4,32 +4,45 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/geometry" ) -// startStreaming is the cold-only Phase-1 backfill daemon's startup -// orchestration — the design's "Daemon flow -> Startup", reduced to: +// startStreaming is the daemon's startup orchestration — the design's "Daemon +// flow -> Startup", in two steps: // // 1. CATCH UP via backfill. Bring on-disk coverage in line with the retention // window: each pass backfills up through the last complete chunk at the -// network tip, re-passing while new chunks appear, with one exclusion — a -// mid-chunk watermark within one chunk of the tip leaves the partial resume -// chunk alone. There is no upfront producibility gate. +// network tip, re-passing while new chunks appear at the tip, with one +// exclusion — a mid-chunk watermark within one chunk of the tip leaves the +// partial resume chunk to ingestion (core replays its tail faster than a +// bulk refetch, and a mid-chunk watermark can only have come from the live +// hot DB, so the data is local by construction). runBackfill is the SAME +// resolve + executePlan the lifecycle tick uses (Phase B); there is no +// upfront producibility gate — each chunk's producibility is enforced +// lazily during its build by the buildTxhashIndex .bin precondition. // -// 2. SERVE. Begin serving reads (injected) and return. The cold-only daemon has -// no hot tier, captive core, or live ingestion loop. +// 2. SERVE + INGEST. Open the resume chunk's hot DB (Issue 10), start captive +// core (injected), launch the lifecycle goroutine (Issue 11) on a doorbell, +// start serving reads (injected), and run the ingestion loop (Issue 10). +// The ingestion loop's first act is a doorbell ring, so the first lifecycle +// tick doubles as startup convergence (finishing crash leftovers + pruning +// downtime leftovers concurrently with early serving). // -// Everything startup cannot construct itself crosses an INJECTED interface -// (StartConfig.NetworkTip, .ServeReads), so it is unit-testable without a real -// backend or RPC server. RunDaemon calls validateConfig (which pins -// earliest_ledger) BEFORE this; startStreaming reads the pin back. +// EVERYTHING the daemon needs that startup cannot construct itself crosses an +// INJECTED interface (StartConfig.NetworkTip, .Core, .ServeReads), so this is +// unit-testable without captive core, a real bulk backend, or a real RPC +// server. validateConfig (the full TOML form) is Phase D; this accepts an +// already-resolved StartConfig and the pinned earliest_ledger is read from the +// catalog. // -// It returns nil only on a clean shutdown (ctx cancelled) or a clean ServeReads -// return; any other return is restartable and surfaces to the supervisor -// (ErrFirstStartNoTip on a true first start with no reachable backend). +// It returns nil only on a clean shutdown (ctx cancelled mid-run, or the +// ingestion loop's clean stop); any other return is restartable error the +// daemon's top-level loop surfaces (ErrFirstStartNoTip on a true first start +// with no reachable backend; a backfill/ingest failure; ErrHotVolumeLost). func startStreaming(ctx context.Context, cfg StartConfig) error { if err := cfg.validate(); err != nil { return err @@ -54,14 +67,17 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { } // Derived, never stored: the highest ledger durably committed (frozen cold - // artifacts), clamped by earliest-1. A pure catalog read — no hot tier. - lastCommitted, err := lastCommittedLedger(cat, nil) + // artifacts vs the highest ready hot DB's max committed seq, clamped by + // earliest-1). With a probe it does ONE read of the highest ready hot DB and + // detects hot-volume loss LAZILY on that open (ErrHotVolumeLost) before + // ingestion ever opens a writer. + lastCommitted, err := lastCommittedLedger(cat, cfg.Exec.Process.HotProbe) if err != nil { return fmt.Errorf("streaming: startup derive watermark: %w", err) } metrics := cfg.Exec.metrics() - metrics.Watermark(lastCommitted, effectiveRetentionFloor(lastCommitted, cfg.RetentionChunks, earliest)) + metrics.Watermark(lastCommitted, effectiveRetentionFloor(lastCommitted, cfg.Lifecycle.RetentionChunks, earliest)) logger.WithField("last_committed", lastCommitted). WithField("earliest", earliest). WithField("pinned", pinned). @@ -74,20 +90,95 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { } logger.WithField("last_committed", lastCommitted). - Info("streaming: catch-up complete — handing off to the read server") + WithField("resume_chunk", chunk.IDFromLedger(lastCommitted+1).String()). + Info("streaming: catch-up complete — opening resume hot tier and ingesting") + + // Step 2: serve + ingest. resumeLedger is one past the watermark — the live + // chunk's next un-committed ledger (or the chunk's first ledger on an empty + // resume DB; runIngestionLoop re-derives the exact resume point from durable + // state, so a lastCommitted that lands mid-chunk and a lastCommitted on a + // chunk boundary both resume correctly). + resumeLedger := lastCommitted + 1 + resumeChunk := chunk.IDFromLedger(resumeLedger) + + hotDB, err := openHotTierForChunk(cat, resumeChunk, logger) + if err != nil { + return fmt.Errorf("streaming: startup open resume hot tier chunk %s: %w", resumeChunk, err) + } + + // Start captive core from the resume ledger. On failure the resume hot DB is + // already open; close it so a restart re-opens cleanly (the bracket is + // idempotent, but the rocksdb LOCK must be released). + core, closeCore, err := cfg.Core.OpenCore(ctx, resumeLedger) + if err != nil { + _ = hotDB.Close() + return fmt.Errorf("streaming: startup start captive core at ledger %d: %w", resumeLedger, err) + } + defer func() { + if closeCore != nil { + _ = closeCore() + } + }() + + // The lifecycle goroutine runs one tick per notification, carrying the just- + // completed chunk id. Buffered to lifecycleQueueDepth; the ingestion loop + // sends at every chunk boundary. It shares NO in-memory state with ingestion — + // it derives everything from durable keys. + lifecycleCh := make(chan chunk.ID, lifecycleQueueDepth) + + // Seed the first tick with the last complete chunk at the resume point so its + // run fires at once — clearing crash/downtime leftovers concurrently with + // serving (the design's startup seed: lastCompleteChunkAt(resumeLedger - 1)). + // Skipped on a young network where no chunk is complete (nothing to converge; + // the first real boundary triggers the first tick). + if seed := lastCompleteChunkAt(lastCommitted); seed >= 0 { + lifecycleCh <- chunk.ID(seed) //nolint:gosec // seed >= 0 + } - // Step 2: serve. The cold-only daemon finishes after catch-up + serve. - // ServeReads launches the read server (injected); its error is restartable. + // The lifecycle goroutine is tied to a PER-ITERATION child ctx, not the + // daemon-lifetime ctx, and is cancelled + JOINED before startStreaming returns + // for ANY reason. This restores the design's single-lifecycle-goroutine + // invariant: startStreaming returns on a restartable error (a captive-core / + // GetLedger hiccup, a boundary hot-DB open failure) and superviseStreaming + // restarts it with the SAME live daemon ctx after a backoff — so if the + // lifecycle were tied to the daemon ctx, the prior iteration's loop would never + // be cancelled and would leak (blocked forever on the old channel) or, worse, + // run a tick CONCURRENTLY with the next iteration's lifecycle + ingestion (two + // RunColdChunk passes truncating the same .pack/.idx; a stale tick's op error + // firing Fatalf). runLifecycleTick checks ctx at every step and executePlan + // returns on cancellation, so the join cannot block past the current step. + lifecycleCtx, cancelLifecycle := context.WithCancel(ctx) + var lifecycleWG sync.WaitGroup + lifecycleWG.Add(1) + go func() { + defer lifecycleWG.Done() + lifecycleLoop(lifecycleCtx, cfg.Lifecycle, cat, lifecycleCh) + }() + // Cancel + join the lifecycle goroutine. This defer runs only on the two return + // paths registered after it: the ingestion-loop return (ingestion is a + // synchronous same-goroutine call whose inline notify is the sole writer to + // lifecycleCh, so it has already stopped) and the ServeReads error path + // (ingestion never started). Either way no send on lifecycleCh can race the + // cancel. The earlier error paths (resume hot-DB open, OpenCore) return BEFORE + // this defer is registered and before the goroutine starts — nothing to join. + defer func() { + cancelLifecycle() + lifecycleWG.Wait() + }() + + // Begin serving reads (injected). Serve-readiness is established by step 1 + // plus the resume chunk's hot DB just opened — crash debris and downtime + // leftovers are reader-invisible, so the first tick clears them concurrently + // with serving rather than ahead of it. if err := cfg.ServeReads(ctx); err != nil { + _ = hotDB.Close() return fmt.Errorf("streaming: startup serve reads: %w", err) } - // In cold-only Phase 1 the production ServeReads is a no-op (reads still come - // from the v1 SQLite daemon until the #772 cutover), so it returns at once and - // the daemon exits cleanly HERE — an immediate clean exit after catch-up is - // expected, not a misconfiguration. - logger.WithField("last_committed", lastCommitted). - Info("streaming: read server returned — cold-only daemon shutting down cleanly") - return nil + + // The ingestion loop owns hotDB for the rest of its life (it closes it on any + // exit and reopens at each boundary). Returns the GetLedger/boundary error; + // the daemon top level classifies a ctx-cancelled return as a clean shutdown. + return runIngestionLoop(ctx, core, hotDB, cat, lifecycleCh, allHotTypes, logger, metrics, cfg.Exec.Process.Sink) } // catchUp runs the design's catch-up loop, returning lastCommitted as backfill @@ -97,7 +188,7 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { // with the mid-chunk exclusion, and breaks on an empty/already-done range. // backfilledThrough breaks the loop once rangeEnd stops advancing. func catchUp(ctx context.Context, cfg StartConfig, lastCommitted, earliest uint32) (uint32, error) { - retentionChunks := cfg.RetentionChunks + retentionChunks := cfg.Lifecycle.RetentionChunks metrics := cfg.Exec.metrics() logger := cfg.Exec.Logger @@ -203,33 +294,52 @@ func maxU32(a, b uint32) uint32 { return max(a, b) } var ErrFirstStartNoTip = errors.New("streaming: network tip unavailable and no local history to serve") // --------------------------------------------------------------------------- -// Injected external boundaries: the network tip and the read server both cross -// an interface, so startup is exercised end to end with fakes. +// Injected external boundaries. startStreaming touches NOTHING outside the +// process directly: the network tip, captive core, and the read server all +// cross an interface so startup is exercised end to end with fakes. // --------------------------------------------------------------------------- -// NetworkTipBackend samples the bulk backend's current network tip (the highest -// ledger it can serve), consulted only during catch-up. Production wraps the -// daemon's LedgerBackend; tests pass a reachable/unreachable/unready fake. +// NetworkTipBackend samples the configured bulk backend's current network tip +// (the highest ledger the backend can serve). Production wraps the daemon's +// LedgerBackend; tests pass a fake that is reachable / unreachable / unready. +// It is consulted only during catch-up; once ingestion runs, captive core is +// the tip. type NetworkTipBackend interface { NetworkTip(ctx context.Context) (uint32, error) } -// StartConfig is startStreaming's resolved dependency bundle: the scheduler -// config, the catch-up floor width, the injected boundaries, and the networkTip -// backoff bounds. The full daemon Config is a superset assembled at the call +// CoreOpener prepares captive core at resumeLedger and hands back a LedgerGetter +// the ingestion loop polls plus a closer the caller defers. Production wraps +// captive core's PrepareRange + GetLedger; tests pass a fake getter. The closer +// tears down the backend on daemon exit. +type CoreOpener interface { + OpenCore(ctx context.Context, resumeLedger uint32) (LedgerGetter, func() error, error) +} + +// StartConfig is startStreaming's resolved dependency bundle. It composes the +// scheduler/lifecycle configs (so catch-up and the lifecycle goroutine share one +// catalog, worker pool, and retention floor) and the three injected external +// boundaries, plus the networkTip backoff bounds. The full daemon Config +// (TOML-parsed paths, captive-core toml, …) is a superset assembled at the call // site; only what startup reads lives here. type StartConfig struct { // Exec drives catch-up's runBackfill. Its Catalog/Logger are the shared ones. Exec ExecConfig - // RetentionChunks is the catch-up floor's width. 0 ⇒ the earliest-ledger floor only. - RetentionChunks uint32 + // Lifecycle drives the lifecycle goroutine. Its embedded ExecConfig should be + // the SAME wiring as Exec (one catalog, one pool); RetentionChunks is the + // catch-up floor's width too. + Lifecycle LifecycleConfig // NetworkTip samples the bulk backend's tip during catch-up. Required. NetworkTip NetworkTipBackend - // ServeReads begins serving reads. It must return promptly (it launches the - // server, not block until shutdown). Required. + // Core starts captive core and yields the ingestion getter. Required. + Core CoreOpener + + // ServeReads begins serving reads (the RPC server). It must return promptly + // (it launches the server; it does not block until shutdown) — startup + // proceeds to the blocking ingestion loop after it returns. Required. ServeReads func(ctx context.Context) error // TipBackoff is networkTip's inter-attempt sleep; TipMaxAttempts bounds the @@ -244,10 +354,12 @@ const ( defaultTipMaxAttempts = 5 ) -// withDefaults fills the tip-backoff defaults and the embedded ExecConfig -// defaults (Workers -> GOMAXPROCS). +// withDefaults fills the worker-pool / lifecycle / tip-backoff defaults. The +// embedded ExecConfig defaults (Workers -> GOMAXPROCS) and the LifecycleConfig +// Fatalf default are applied so a caller need not. func (cfg StartConfig) withDefaults() StartConfig { cfg.Exec = cfg.Exec.WithDefaults() + cfg.Lifecycle = cfg.Lifecycle.WithLifecycleDefaults() if cfg.TipBackoff <= 0 { cfg.TipBackoff = defaultTipBackoff } @@ -264,9 +376,15 @@ func (cfg StartConfig) validate() error { if cfg.Exec.Logger == nil { return errors.New("streaming: StartConfig.Exec.Logger is nil") } + if cfg.Exec.Process.HotProbe == nil { + return errors.New("streaming: StartConfig.Exec.Process.HotProbe is nil (watermark derivation needs it)") + } if cfg.NetworkTip == nil { return errors.New("streaming: StartConfig.NetworkTip is nil") } + if cfg.Core == nil { + return errors.New("streaming: StartConfig.Core is nil") + } if cfg.ServeReads == nil { return errors.New("streaming: StartConfig.ServeReads is nil") } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go index a342d7680..b46ec284f 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go @@ -56,6 +56,30 @@ func (b *fakeTipBackend) callCount() int { return b.calls } +// fakeCore is a CoreOpener handing back a programmed LedgerGetter and recording +// the resume ledger it was started from. +type fakeCore struct { + getter LedgerGetter + openErr error + resumeSeen atomic.Uint32 + openedCount atomic.Int32 +} + +func (c *fakeCore) OpenCore(_ context.Context, resumeLedger uint32) (LedgerGetter, func() error, error) { + c.openedCount.Add(1) + c.resumeSeen.Store(resumeLedger) + if c.openErr != nil { + return nil, nil, c.openErr + } + getter := c.getter + if getter == nil { + // Default: a live getter that blocks until ctx is cancelled (the daemon's + // steady state). Tests that need a finite poll set c.getter. + getter = &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true} + } + return getter, func() error { return nil }, nil +} + // recordingPlan captures the (rangeStart, rangeEnd) every backfill pass asked // for, via the ExecConfig runChunk/runIndex test seams — so a backfill test // asserts the loop's range arithmetic without real cold I/O. Because resolve @@ -103,12 +127,12 @@ func (r *recordingPlan) snapshot() [][2]chunk.ID { return out } -// startTestConfig builds a cold StartConfig over a real catalog (genesis floor -// pinned to GenesisLedger by default) with all external boundaries faked. -// recordPlan, when non-nil, wires the runChunk/runIndex seams so backfill passes -// are recorded without cold I/O. +// startTestConfig builds a StartConfig over a real catalog (genesis floor pinned +// to GenesisLedger by default) with all external boundaries faked. recordPlan, +// when non-nil, wires the runChunk/runIndex seams so backfill passes are +// recorded without cold I/O. func startTestConfig( - t *testing.T, cat *catalog.Catalog, tip *fakeTipBackend, recordPlan *recordingPlan, + t *testing.T, cat *catalog.Catalog, tip *fakeTipBackend, core *fakeCore, recordPlan *recordingPlan, ) StartConfig { t.Helper() exec := ExecConfig{ @@ -116,7 +140,8 @@ func startTestConfig( Logger: silentLogger(), Workers: 2, Process: ProcessConfig{ - Backend: zeroTxBackend(t), + HotProbe: NewRocksHotProbe(cat.layout.HotChunkPath, silentLogger()), + Backend: zeroTxBackend(t), }, } if recordPlan != nil { @@ -126,13 +151,15 @@ func startTestConfig( } exec.runIndex = func(_ context.Context, _ IndexBuild, _ ExecConfig) error { return nil } } + life := LifecycleConfig{ExecConfig: exec, RetentionChunks: 0, Fatalf: (&fatalRecorder{}).fatalf} return StartConfig{ - Exec: exec, - RetentionChunks: 0, - NetworkTip: tip, - ServeReads: func(context.Context) error { return nil }, - TipBackoff: time.Millisecond, - TipMaxAttempts: 3, + Exec: exec, + Lifecycle: life, + NetworkTip: tip, + Core: core, + ServeReads: func(context.Context) error { return nil }, + TipBackoff: time.Millisecond, + TipMaxAttempts: 3, } } @@ -190,7 +217,7 @@ func TestBackfill_FirstStartTipAbsentFatal(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) tip := &fakeTipBackend{err: errors.New("backend unreachable"), errFirst: 99} - cfg := startTestConfig(t, cat, tip, &recordingPlan{}) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, &recordingPlan{}) // lastCommitted = deriveWatermark over an empty catalog = preGenesisLedger (1); // earliest = GenesisLedger (2); 1 < 2 ⇒ first start with no progress. @@ -208,7 +235,7 @@ func TestBackfill_FirstStartTipPresentComputesRange(t *testing.T) { tipLedger := chunk.ID(3).FirstLedger() + 100 rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, preGenesisLedger, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -230,7 +257,7 @@ func TestBackfill_YoungNetworkNoOp(t *testing.T) { // Tip inside chunk 0 (no chunk has fully closed yet). tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 50}} rec := &recordingPlan{} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, preGenesisLedger, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -257,7 +284,7 @@ func TestBackfill_SteadyRestartNoOp(t *testing.T) { tipLedger := chunk.ID(3).FirstLedger() + 10 // last complete chunk == 2 rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -289,7 +316,7 @@ func TestBackfill_MidChunkResumeExclusion(t *testing.T) { tipLedger := chunk.ID(5).LastLedger() // within one chunk, but chunk 5 complete-at-tip rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -331,7 +358,7 @@ func TestBackfill_LongDowntimeRePass(t *testing.T) { Catalog: cat, Logger: silentLogger(), Workers: 2, - Process: ProcessConfig{Backend: zeroTxBackend(t)}, + Process: ProcessConfig{HotProbe: NewRocksHotProbe(cat.layout.HotChunkPath, silentLogger()), Backend: zeroTxBackend(t)}, runChunk: func(_ context.Context, cb ChunkBuild, _ ExecConfig) error { mu.Lock() allChunks = append(allChunks, cb.Chunk) @@ -342,7 +369,9 @@ func TestBackfill_LongDowntimeRePass(t *testing.T) { } cfg := StartConfig{ Exec: exec, + Lifecycle: LifecycleConfig{ExecConfig: exec, Fatalf: (&fatalRecorder{}).fatalf}, NetworkTip: tip, + Core: &fakeCore{}, ServeReads: func(context.Context) error { return nil }, TipBackoff: time.Millisecond, TipMaxAttempts: 3, @@ -377,7 +406,7 @@ func TestBackfill_RestartTipUnreachableDegrades(t *testing.T) { watermark := chunk.ID(2).LastLedger() // local progress exists tip := &fakeTipBackend{err: errors.New("backend down"), errFirst: 99} rec := &recordingPlan{} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err, "local progress means no fatal") @@ -408,7 +437,7 @@ func TestBackfill_LaggingBulkTipFoldsWatermarkChunk(t *testing.T) { tipLedger := chunk.ID(3).FirstLedger() + 10 // lagging bulk tip in chunk 3 (last complete 2) rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -423,52 +452,69 @@ func TestBackfill_LaggingBulkTipFoldsWatermarkChunk(t *testing.T) { } // --------------------------------------------------------------------------- -// startStreaming — the cold-only catch-up + serve flow. +// startStreaming — the full serve+ingest handoff (clean shutdown). // --------------------------------------------------------------------------- // A genesis first start with a tip inside chunk 0 (young network) does no -// backfill, then serves reads. The cold-only daemon has no hot tier or live -// ingestion loop: startStreaming returns nil once ServeReads returns with no -// error. -func TestStartStreaming_FirstStartCatchUpThenServe(t *testing.T) { +// backfill, opens the resume chunk's hot DB, starts the (blocking) fake core +// getter, serves reads, and runs the ingestion loop — which returns the ctx- +// cancelled GetLedger error when ctx is cancelled. The clean-shutdown +// classification now lives at the daemon top level (superviseStreaming treats a +// ctx-cancelled return as clean), so startStreaming surfaces the wrapped +// context.Canceled. The resume ledger is genesis. +func TestStartStreaming_FirstStartServeIngestCleanShutdown(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) served := atomic.Int32{} + // Live getter: blocks until ctx cancel (the daemon's steady state). + core := &fakeCore{getter: &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true}} tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} // young: no backfill - cfg := startTestConfig(t, cat, tip, nil) + cfg := startTestConfig(t, cat, tip, core, nil) cfg.ServeReads = func(context.Context) error { served.Add(1); return nil } - require.NoError(t, startStreaming(context.Background(), cfg)) - require.Equal(t, int32(1), served.Load(), "reads were served exactly once") -} + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + go func() { errCh <- startStreaming(ctx, cfg) }() -// startStreaming surfaces a ServeReads error wrapped, as a restartable failure. -func TestStartStreaming_ServeReadsErrorSurfaces(t *testing.T) { - cat, _ := testCatalog(t) - pinGenesis(t, cat) - tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} - cfg := startTestConfig(t, cat, tip, nil) - cfg.ServeReads = func(context.Context) error { return errors.New("rpc bind failed") } + // Give the loop time to open the hot DB, start core, serve, and park on the + // blocking getter, then request a clean shutdown. + require.Eventually(t, func() bool { return served.Load() == 1 }, 2*time.Second, 5*time.Millisecond) + cancel() - err := startStreaming(context.Background(), cfg) - require.Error(t, err) - require.Contains(t, err.Error(), "serve reads") + select { + case err := <-errCh: + // The ingestion loop surfaces the ctx-cancelled GetLedger error; the daemon + // top level (superviseStreaming) classifies a ctx-cancelled return as clean. + require.ErrorIs(t, err, context.Canceled, "clean shutdown surfaces the ctx-cancelled error") + case <-time.After(3 * time.Second): + t.Fatal("startStreaming did not return after ctx cancel") + } + + require.Equal(t, int32(1), served.Load(), "reads were served exactly once") + require.Equal(t, int32(1), core.openedCount.Load(), "captive core started once") + require.Equal(t, uint32(chunk.FirstLedgerSeq), core.resumeSeen.Load(), + "resume ledger is genesis on a fresh start (watermark+1)") + + // The resume chunk's hot key is "ready" (the loop opened it and the boundary + // was never crossed). + state, err := cat.HotState(chunk.IDFromLedger(chunk.FirstLedgerSeq)) + require.NoError(t, err) + assert.Equal(t, HotReady, state) } // startStreaming fatals on a true first start when the tip is unavailable: the -// error is ErrFirstStartNoTip and reads are never served. +// error is ErrFirstStartNoTip and NEITHER the hot DB nor core is opened. func TestStartStreaming_FirstStartNoTipFatal(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) - served := atomic.Int32{} + core := &fakeCore{} tip := &fakeTipBackend{err: errors.New("unreachable"), errFirst: 99} - cfg := startTestConfig(t, cat, tip, nil) - cfg.ServeReads = func(context.Context) error { served.Add(1); return nil } + cfg := startTestConfig(t, cat, tip, core, nil) err := startStreaming(context.Background(), cfg) require.ErrorIs(t, err, ErrFirstStartNoTip) - require.Zero(t, served.Load(), "reads are never served when catch-up fatals") + require.Zero(t, core.openedCount.Load(), "core is never started when backfill fatals") } // startStreaming surfaces a missing earliest_ledger pin loudly (validateConfig @@ -477,7 +523,7 @@ func TestStartStreaming_FirstStartNoTipFatal(t *testing.T) { func TestStartStreaming_RequiresEarliestPin(t *testing.T) { cat, _ := testCatalog(t) // No pinGenesis. - cfg := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, nil) + cfg := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, &fakeCore{}, nil) err := startStreaming(context.Background(), cfg) require.Error(t, err) require.Contains(t, err.Error(), "earliest_ledger pinned") @@ -486,18 +532,28 @@ func TestStartStreaming_RequiresEarliestPin(t *testing.T) { // startStreaming validates its injected boundaries. func TestStartStreaming_ValidatesConfig(t *testing.T) { cat, _ := testCatalog(t) - base := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, nil) + base := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, &fakeCore{}, nil) t.Run("nil NetworkTip", func(t *testing.T) { cfg := base cfg.NetworkTip = nil require.Error(t, startStreaming(context.Background(), cfg)) }) + t.Run("nil Core", func(t *testing.T) { + cfg := base + cfg.Core = nil + require.Error(t, startStreaming(context.Background(), cfg)) + }) t.Run("nil ServeReads", func(t *testing.T) { cfg := base cfg.ServeReads = nil require.Error(t, startStreaming(context.Background(), cfg)) }) + t.Run("nil HotProbe", func(t *testing.T) { + cfg := base + cfg.Exec.Process.HotProbe = nil + require.Error(t, startStreaming(context.Background(), cfg)) + }) } // --------------------------------------------------------------------------- From 48308e4a17799da13d2dd567b0fe3b2a91f27ee2 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 24 Jun 2026 23:39:59 -0400 Subject: [PATCH 2/4] =?UTF-8?q?streaming(fullhistory):=20reviewability=20c?= =?UTF-8?q?leanup=20=E2=80=94=20stale=20docs=20+=20dead=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Doc/comment fixes and dead-code removal in the Phase 2 layer 2 wiring; no behavior change (daemon.go/startup.go are comment-only apart from one unused parameter rename). - daemon.go buildProductionBoundaries: the "Core:" doc bullet described the old NewCaptiveCoreStream/OpenLedgerStream/RawLedgers model; rewrite it to the actual CoreOpener.OpenCore (PrepareRange + indexed GetLedger), fold the captive-core config deferral into the existing #772 TODO, and drop the now-overstated "fully wired". Rename the unused ctx parameter to _. - startup.go: condense the two lifecycle-goroutine comment blocks (the per-iteration-ctx rationale and the cancel+join defer) to the load-bearing "why" without the duplicated invariant restatement; fix a grammar slip. - e2e_test.go: the header named a non-existent CoreStreamOpener interface (it is CoreOpener); remove the dead e2eGetter.maxSeq field (set in OpenCore, never read — GetLedger keys off the frames map); fix the trailing helper comment that claimed snapshotFreezeCount was added here (it lives in the base). - daemon_test.go: the capturedBuild doc opened with a non-existent "fakeBoundaries" symbol and stale "stream" vocabulary; name the type and say "getter". --- .../internal/fullhistory/streaming/daemon.go | 28 +++++++++---------- .../fullhistory/streaming/daemon_test.go | 4 +-- .../fullhistory/streaming/e2e_test.go | 12 ++------ .../internal/fullhistory/streaming/startup.go | 28 ++++++++----------- 4 files changed, 30 insertions(+), 42 deletions(-) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go index 169a11d7b..faf88c309 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go @@ -293,26 +293,26 @@ func superviseStreaming( // buildProductionBoundaries assembles the real external boundaries from the // loaded config. // -// - Core: captive stellar-core via NewCaptiveCoreStream, wrapped so -// OpenLedgerStream hands the live stream to the ingestion loop (the stream -// owns the core process lifecycle — started on the first RawLedgers pull, -// torn down when iteration ends — so this builder constructs it without -// sequencing PrepareRange/Close itself). +// - Core: captive stellar-core via newCaptiveCoreOpener; OpenCore calls +// PrepareRange(UnboundedRange(resume)) and hands back a LedgerGetter the +// ingestion loop polls by sequence (the design's core.GetLedger(ctx, seq)), +// plus a closer. The config plumbing is deferred (TODO below), so today the +// constructor errors with a #772 pointer. // - Backend: the bulk datastore ChunkSource (NewDataStoreSource) when a bucket // path is configured; nil for a frontfill-only deployment. // - NetworkTip / BackendWaiter: an adapter over the bulk backend's tip. // -// TODO(#772): the bulk-backend TIP boundary is the one piece still entangled -// with config that does not yet exist on this branch (the datastore TYPE + -// schema — only [backfill.bsb].bucket_path is in Config today) and with the lake +// TODO(#772): both the captive-core config (binary path, passphrase, archives — +// see newCaptiveCoreOpener) and the bulk-backend TIP boundary (the datastore +// TYPE + schema; only [backfill.bsb].bucket_path is in Config today) are +// entangled with config that does not yet exist on this branch and with the lake // tip-resolution the v1 path performs differently. Until #772 lands the cutover, -// a deployment that needs catch-up against a real lake must wire NetworkTip/ -// BackendWaiter/Backend through DaemonOptions.BuildBoundaries; buildProduction- -// Boundaries supplies the captive-core Core (fully wired) and a tip adapter that -// errors clearly when no bulk backend is configured, so a frontfill ("genesis" -// or "now" with no backfill) deployment runs unchanged. +// a deployment must wire Core (and, for catch-up against a real lake, NetworkTip/ +// BackendWaiter/Backend) through DaemonOptions.BuildBoundaries; the tip adapter +// here errors clearly when no bulk backend is configured, so a frontfill +// ("genesis" or "now" with no backfill) deployment runs unchanged. func buildProductionBoundaries( - ctx context.Context, cfg Config, _ Paths, _ *catalog.Catalog, logger *supportlog.Entry, + _ context.Context, cfg Config, _ Paths, _ *catalog.Catalog, logger *supportlog.Entry, ) (Boundaries, error) { core, err := newCaptiveCoreOpener(cfg.Streaming.CaptiveCoreConfig, logger) if err != nil { diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go index fb8bf4351..43f7abb16 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go @@ -59,9 +59,9 @@ format = "text" return configPath, dataDir } -// fakeBoundaries returns a BuildBoundaries func that hands RunDaemon a set of +// capturedBuild.build is a BuildBoundaries func that hands RunDaemon a set of // faked external boundaries (a young-network tip ⇒ no backfill, a fake core -// stream that blocks until ctx cancel, a recording ServeReads). It also records +// getter that blocks until ctx cancel, a recording ServeReads). It also records // the resolved config/paths the daemon passed the builder, so a test asserts the // daemon threaded LoadConfig+ResolvePaths through correctly. type capturedBuild struct { diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go index 092469a78..50f21dabc 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go @@ -25,7 +25,7 @@ package streaming // Only the two EXTERNAL boundaries the daemon injects on purpose: // - The ledger SOURCE. Production drives ingestion from captive // stellar-core (a child process) and backfill from a bulk object-store -// backend. Here both cross their injected interfaces (CoreStreamOpener / +// backend. Here both cross their injected interfaces (CoreOpener / // NetworkTipBackend) and are fed SYNTHETIC-BUT-WELL-FORMED LedgerCloseMeta // built by the same fixtures the merged store tests use (zero-tx LCM for // bulk, plus a one-tx LCM where a real, network-hashed transaction hash is @@ -145,7 +145,6 @@ func oneTxLCMReturningHash(t *testing.T, seq uint32) ([]byte, [32]byte) { // daemon top level classifies as clean. type e2eGetter struct { frames map[uint32][]byte - maxSeq uint32 fromSeen *atomic.Uint32 // first GetLedger seq (for the restart assertion) delivered *atomic.Uint32 // highest seq actually yielded (test sync) sawFrom atomic.Bool @@ -190,14 +189,10 @@ func (c *e2eCore) OpenCore(_ context.Context, resume uint32) (LedgerGetter, func c.opens.Add(1) c.resumeSeen.Store(resume) byseq := make(map[uint32][]byte, len(c.frames)) - var maxSeq uint32 for _, f := range c.frames { byseq[f.seq] = f.raw - if f.seq > maxSeq { - maxSeq = f.seq - } } - getter := &e2eGetter{frames: byseq, maxSeq: maxSeq, fromSeen: &c.fromSeen, delivered: &c.delivered} + getter := &e2eGetter{frames: byseq, fromSeen: &c.fromSeen, delivered: &c.delivered} return getter, func() error { return nil }, nil } @@ -632,5 +627,4 @@ func mustDeriveWatermark(t *testing.T, cat *Catalog) uint32 { } // The E2E reuses observability_test.go's recordingMetrics (a full Metrics sink) -// and its snapshotBoundaries; snapshotFreezeCount (added there) reports the -// number of freeze-stage signals. +// and its snapshotBoundaries / snapshotFreezeCount snapshot helpers. diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go index f489c8170..7f70d56b6 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go @@ -40,7 +40,7 @@ import ( // catalog. // // It returns nil only on a clean shutdown (ctx cancelled mid-run, or the -// ingestion loop's clean stop); any other return is restartable error the +// ingestion loop's clean stop); any other return is a restartable error the // daemon's top-level loop surfaces (ErrFirstStartNoTip on a true first start // with no reachable backend; a backfill/ingest failure; ErrHotVolumeLost). func startStreaming(ctx context.Context, cfg StartConfig) error { @@ -137,15 +137,11 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { // The lifecycle goroutine is tied to a PER-ITERATION child ctx, not the // daemon-lifetime ctx, and is cancelled + JOINED before startStreaming returns - // for ANY reason. This restores the design's single-lifecycle-goroutine - // invariant: startStreaming returns on a restartable error (a captive-core / - // GetLedger hiccup, a boundary hot-DB open failure) and superviseStreaming - // restarts it with the SAME live daemon ctx after a backoff — so if the - // lifecycle were tied to the daemon ctx, the prior iteration's loop would never - // be cancelled and would leak (blocked forever on the old channel) or, worse, - // run a tick CONCURRENTLY with the next iteration's lifecycle + ingestion (two - // RunColdChunk passes truncating the same .pack/.idx; a stale tick's op error - // firing Fatalf). runLifecycleTick checks ctx at every step and executePlan + // for ANY reason — restoring the design's single-lifecycle-goroutine invariant + // across supervisor restarts (a daemon-ctx-tied loop would survive a + // restartable return and run a tick concurrently with the next iteration's + // lifecycle + ingestion: two RunColdChunk passes truncating the same + // .pack/.idx). runLifecycleTick checks ctx at every step and executePlan // returns on cancellation, so the join cannot block past the current step. lifecycleCtx, cancelLifecycle := context.WithCancel(ctx) var lifecycleWG sync.WaitGroup @@ -154,13 +150,11 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { defer lifecycleWG.Done() lifecycleLoop(lifecycleCtx, cfg.Lifecycle, cat, lifecycleCh) }() - // Cancel + join the lifecycle goroutine. This defer runs only on the two return - // paths registered after it: the ingestion-loop return (ingestion is a - // synchronous same-goroutine call whose inline notify is the sole writer to - // lifecycleCh, so it has already stopped) and the ServeReads error path - // (ingestion never started). Either way no send on lifecycleCh can race the - // cancel. The earlier error paths (resume hot-DB open, OpenCore) return BEFORE - // this defer is registered and before the goroutine starts — nothing to join. + // The two return paths registered after this defer (the ingestion-loop return + // and the ServeReads error path) have no live sender on lifecycleCh — ingestion + // is a same-goroutine call whose inline notify has stopped, and the serve path + // never starts it — so no send can race the cancel. The earlier error paths + // return before this defer and before the goroutine starts. defer func() { cancelLifecycle() lifecycleWG.Wait() From 754e3c4dca2ff75e679b811e4aaa6eafdbda724e Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 24 Jun 2026 23:40:05 -0400 Subject: [PATCH 3/4] streaming(fullhistory): restore the ServeReads-error startup test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The serve+ingest rework dropped TestStartStreaming_ServeReadsErrorSurfaces, leaving the ServeReads-error return path untested — and that path now does more than before: it must close the already-opened resume hot DB so a restart can reacquire the rocksdb LOCK. Restore the test adapted to the new flow: a fake core with a blocking getter so ServeReads is reached, then assert the wrapped "serve reads" error surfaces (not classified clean), core was started, and the resume hot DB is reopenable afterward (its LOCK was released). --- .../fullhistory/streaming/startup_test.go | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go index b46ec284f..ba564a811 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go @@ -503,6 +503,35 @@ func TestStartStreaming_FirstStartServeIngestCleanShutdown(t *testing.T) { assert.Equal(t, HotReady, state) } +// A ServeReads error is surfaced wrapped as a restartable failure (NOT clean) and +// the already-opened resume hot DB is closed on the way out, so a restart can +// reopen it (the rocksdb LOCK is released). ServeReads runs after the hot DB +// opens and core starts but before the blocking ingestion loop, so startStreaming +// returns synchronously here. +func TestStartStreaming_ServeReadsErrorSurfaces(t *testing.T) { + cat, _ := testCatalog(t) + pinGenesis(t, cat) + core := &fakeCore{getter: &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true}} + tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} // young: no backfill + cfg := startTestConfig(t, cat, tip, core, nil) + cfg.ServeReads = func(context.Context) error { return errors.New("rpc bind failed") } + + err := startStreaming(context.Background(), cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "serve reads") + require.NotErrorIs(t, err, context.Canceled, "a ServeReads error is restartable, not a clean shutdown") + require.Equal(t, int32(1), core.openedCount.Load(), "core was started before serving") + + // The resume hot DB was closed on the error path (LOCK released): reopening it + // succeeds. Its key is still "ready" — only the handle was released, not the DB. + state, err := cat.HotState(chunk.IDFromLedger(chunk.FirstLedgerSeq)) + require.NoError(t, err) + require.Equal(t, HotReady, state) + db, err := openHotTierForChunk(cat, chunk.IDFromLedger(chunk.FirstLedgerSeq), silentLogger()) + require.NoError(t, err, "the resume hot DB is reopenable — startStreaming released its LOCK") + require.NoError(t, db.Close()) +} + // startStreaming fatals on a true first start when the tip is unavailable: the // error is ErrFirstStartNoTip and NEITHER the hot DB nor core is opened. func TestStartStreaming_FirstStartNoTipFatal(t *testing.T) { From 925993c7a080119b322ac95f7840fdd2caa07be3 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 25 Jun 2026 16:14:06 -0400 Subject: [PATCH 4/4] streaming(fullhistory): restack #821 on split #820 + propagate Rebased the live-ingestion capstone onto the reorganized #820 and propagated: - qualify moved symbols (geometry./catalog.) in daemon.go, startup.go, e2e_test.go - window->tx-hash-index + RetentionGate->RetentionFloor renames; cat.layout->Layout(), cat.Has->public HotState shim, .IndexFilePath->.TxHashIndexFilePath - config regroup: cfg.Streaming.CaptiveCoreConfig -> cfg.Ingestion.CaptiveCoreConfig - restored #821's daemon_test.go (drops the cold-only catch-up test the full daemon supersedes; adds the supervise/backend-tip/boundaries tests) + the HotProbe/Core wiring - avoided the txhash_txhash_index find-replace corruption (was only in the dropped restack) build + vet + go test -short green EXCEPT the lifecycle E2E, whose generated TOML still uses the pre-regroup [streaming]/[backfill] schema (follow-up; per maintainer the stack will be re-rebased). --- .../internal/fullhistory/streaming/daemon.go | 2 +- .../fullhistory/streaming/daemon_test.go | 169 +----------------- .../fullhistory/streaming/e2e_test.go | 64 +++---- .../internal/fullhistory/streaming/startup.go | 4 +- .../fullhistory/streaming/startup_test.go | 9 +- 5 files changed, 44 insertions(+), 204 deletions(-) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go index faf88c309..a0084c998 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go @@ -314,7 +314,7 @@ func superviseStreaming( func buildProductionBoundaries( _ context.Context, cfg Config, _ Paths, _ *catalog.Catalog, logger *supportlog.Entry, ) (Boundaries, error) { - core, err := newCaptiveCoreOpener(cfg.Streaming.CaptiveCoreConfig, logger) + core, err := newCaptiveCoreOpener(cfg.Ingestion.CaptiveCoreConfig, logger) if err != nil { return Boundaries{}, err } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go index 43f7abb16..d4af0d20b 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go @@ -15,8 +15,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" - "github.com/stellar/go-stellar-sdk/keypair" - "github.com/stellar/go-stellar-sdk/network" supportlog "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" @@ -144,167 +142,6 @@ func TestRunDaemon_LoadValidateWireStartCleanShutdown(t *testing.T) { assert.Equal(t, uint32(DefaultChunksPerTxhashIndex), cpi) } -// someTxBackend serves a chunk whose ledgers are zero-tx EXCEPT a sparse few that -// carry one transaction each. A wholly zero-tx chunk cannot build a txhash index -// ("zero keys"), so the index path needs at least some keys; sparseness keeps the -// 10k-ledger pass nearly as cheap as the all-zero-tx fixture. -func someTxBackend(t *testing.T) *countingChunkSource { - t.Helper() - src := xdr.MustMuxedAddress(keypair.MustRandom().Address()) - gen := func(t *testing.T, seq uint32) []byte { - if seq%2500 != 0 { // the vast majority: cheap zero-tx ledgers - return zeroTxLCMBytes(t, seq) - } - return oneTxLCMBytes(t, seq, src) // a handful carry one unique tx - } - return &countingChunkSource{ - make: func(chunk.ID) (ledgerbackend.LedgerStream, error) { - return &fullChunkStream{t: t, gen: gen}, nil - }, - } -} - -// oneTxLCMBytes is zeroTxLCMBytes plus a single transaction (a fixed source -// account, a per-seq sequence number for a unique hash) so ExtractTxHashes yields -// exactly one txhash key for seq. -func oneTxLCMBytes(t *testing.T, seq uint32, src xdr.MuxedAccount) []byte { - t.Helper() - envelope := xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: src, - SeqNum: xdr.SequenceNumber(seq), // unique per ledger ⇒ unique hash - Ext: xdr.TransactionExt{V: 1, SorobanData: &xdr.SorobanTransactionData{}}, - }, - }, - } - hash, err := network.HashTransactionInEnvelope(envelope, network.PublicNetworkPassphrase) - require.NoError(t, err) - comp := []xdr.TxSetComponent{{ - Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - Txs: []xdr.TransactionEnvelope{envelope}, - }, - }} - lcm := xdr.LedgerCloseMeta{ - V: 2, - V2: &xdr.LedgerCloseMetaV2{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{CloseTime: xdr.TimePoint(0)}, - LedgerSeq: xdr.Uint32(seq), - }, - }, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{Phases: []xdr.TransactionPhase{{V: 0, V0Components: &comp}}}, - }, - TxProcessing: []xdr.TransactionResultMetaV1{{ - // A non-nil versioned meta: a zero-value TransactionMeta (V=0) nil-derefs - // in EncodeTo. Empty V4 ⇒ no events, which is fine for this fixture. - TxApplyProcessing: xdr.TransactionMeta{V: 4, V4: &xdr.TransactionMetaV4{}}, - Result: xdr.TransactionResultPair{ - TransactionHash: hash, - Result: xdr.TransactionResult{ - FeeCharged: 100, - Result: xdr.TransactionResultResult{ - Code: xdr.TransactionResultCodeTxSuccess, - Results: &[]xdr.OperationResult{}, - }, - }, - }, - }}, - }, - } - raw, err := lcm.MarshalBinary() - require.NoError(t, err) - return raw -} - -// The #815 end-to-end acceptance: one TOML boots the daemon and it catches up to -// the tip for ALL THREE cold data types + the window index — proven THROUGH the -// real entrypoint (LoadConfig → validateConfig → catchUp → executePlan → -// processChunk → buildTxhashIndex → buildThenSweep), not by calling the -// primitives in isolation. The happy-path test above sits the tip inside chunk 0 -// so its catch-up is a deliberate no-op; here a tip at chunk 0's last ledger -// backfills the COMPLETE chunk 0, and chunks_per_txhash_index=1 makes window 0 a -// single-chunk window so its index build is terminal — letting us assert the -// whole txhash lifecycle (.bin → merged .idx → .bin swept). workers=1 also drives -// the index-waits-on-its-chunk path (the deadlock-prone case) through the daemon. -// -// A frozen chunk is a full LedgersPerChunk (10k) pass, but the ledgers are cheap -// (zero-tx bodies + a sparse few one-tx ones), so the whole catch-up runs in well -// under a second — fast enough for -short. The merge DEPTH (multi-.bin windows, -// rolling/terminal demotion) is unit-tested in txindex_test; this test's job is -// the daemon-level COMPOSITION. -func TestRunDaemon_CatchUpMaterializesAllColdTypesAndIndex(t *testing.T) { - configPath, dataDir := writeTempConfig(t, "[layout]\nchunks_per_txhash_index = 1\n[backfill]\nworkers = 1\n") - - build := func(_ context.Context, _ Config, _ Paths, _ *catalog.Catalog, _ *supportlog.Entry) (Boundaries, error) { - return Boundaries{ - // Tip at chunk 0's last ledger ⇒ chunk 0 is complete, so the catch-up - // freezes it and (cpi=1) its single-chunk window index is terminal. - NetworkTip: &fakeTipBackend{tips: []uint32{chunk.ID(0).LastLedger()}}, - Backend: someTxBackend(t), // mostly zero-tx, a sparse few carry a tx - BackendWaiter: &fakeWaiter{}, // coverage is always satisfied - ServeReads: func(context.Context) error { return nil }, - }, nil - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - errCh := make(chan error, 1) - go func() { - errCh <- RunDaemonWith(ctx, configPath, - DaemonOptions{BuildBoundaries: build, Logger: silentLogger()}) - }() - select { - case err := <-errCh: - require.NoError(t, err, "daemon catches up to tip then exits cleanly (no-op ServeReads)") - case <-time.After(60 * time.Second): - cancel() - t.Fatal("RunDaemonWith did not finish catch-up within 60s (regressed into a hang/restart loop?)") - } - - // Read the catalog back after the daemon released its locks + closed its store. - store, err := openMetaAt(t, filepath.Join(dataDir, "catalog", "rocksdb")) - require.NoError(t, err) - defer func() { _ = store.Close() }() - windows, err := geometry.NewTxHashIndexLayout(1) - require.NoError(t, err) - layout := geometry.NewLayout(dataDir) - cat := catalog.NewCatalog(store, layout, windows) - - // (1) Chunk 0's ledger + events artifacts are frozen, with files on disk. - ls, err := cat.State(0, geometry.KindLedgers) - require.NoError(t, err) - assert.Equal(t, geometry.StateFrozen, ls, "chunk 0 ledgers frozen") - es, err := cat.State(0, geometry.KindEvents) - require.NoError(t, err) - assert.Equal(t, geometry.StateFrozen, es, "chunk 0 events frozen") - assert.FileExists(t, layout.LedgerPackPath(0)) - for _, p := range layout.EventsPaths(0) { - assert.FileExists(t, p) - } - - // (2) The window's txhash index built terminally: one frozen coverage [0,0] - // with its .idx on disk. - cov, ok, err := cat.FrozenTxHashIndex(0) - require.NoError(t, err) - require.True(t, ok, "window 0 has a frozen txhash index coverage") - assert.Equal(t, chunk.ID(0), cov.Lo) - assert.Equal(t, chunk.ID(0), cov.Hi) - assert.FileExists(t, layout.TxHashIndexFilePath(cov)) - - // (3) The terminal build demoted + swept the per-chunk .bin run: the txhash - // key is gone and the raw file unlinked (the index is now the durable form). - ts, err := cat.State(0, geometry.KindTxHash) - require.NoError(t, err) - assert.Equal(t, geometry.State(""), ts, "chunk 0 txhash key demoted by the terminal index build") - assert.NoFileExists(t, layout.TxHashBinPath(0)) -} - // Storage-path overrides must be HONORED by the data path, not just locked. The // daemon resolves [catalog]/[immutable_storage.*]/[streaming.hot_storage] // overrides into Paths, flocks them, and binds the Catalog via @@ -365,9 +202,9 @@ func TestRunDaemon_StoragePathOverridesHonored(t *testing.T) { store, err := metastore.New(paths.Catalog, silentLogger()) require.NoError(t, err) defer func() { _ = store.Close() }() - windows, err := NewWindows(testCPI) + windows, err := geometry.NewTxHashIndexLayout(testCPI) require.NoError(t, err) - cat := NewCatalog(store, layout, windows) + cat := catalog.NewCatalog(store, layout, windows) db, err := openHotTierForChunk(cat, cid, silentLogger()) require.NoError(t, err) @@ -621,7 +458,7 @@ func TestNotConfiguredTip_ErrorsClearly(t *testing.T) { func TestBuildProductionBoundaries_CaptiveCoreDeferred(t *testing.T) { cfg := Config{}.WithDefaults() - cfg.Streaming.CaptiveCoreConfig = "/some/core.toml" + cfg.Ingestion.CaptiveCoreConfig = "/some/core.toml" _, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) require.Error(t, err, "captive-core production wiring is deferred to #772") assert.Contains(t, err.Error(), "#772") diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go index 50f21dabc..4d64d7c9e 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go @@ -67,6 +67,8 @@ import ( "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/catalog" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/geometry" ) // e2ePassphrase is the network passphrase the synthetic tx hashes are computed @@ -227,7 +229,7 @@ format = "text" // runDaemonInBackground starts RunDaemonWith on a cancellable ctx and returns a // cancel func, a channel carrying its (clean-shutdown) return, and a channel -// delivering the daemon's OWN bound *Catalog (captured from the BuildBoundaries +// delivering the daemon's OWN bound *catalog.Catalog (captured from the BuildBoundaries // callback). The metastore is opened RocksDB-primary (exclusive LOCK), so a test // CANNOT open a second handle on the same path while the daemon runs — instead // it reads durable state through the daemon's own catalog, which is safe for @@ -236,12 +238,12 @@ format = "text" // from genesis via the fake core. func runDaemonInBackground( t *testing.T, cfgPath string, core *e2eCore, served *atomic.Int32, metrics Metrics, -) (cancel context.CancelFunc, done <-chan error, catCh <-chan *Catalog) { +) (cancel context.CancelFunc, done <-chan error, catCh <-chan *catalog.Catalog) { t.Helper() ctx, cancelFn := context.WithCancel(context.Background()) errCh := make(chan error, 1) - catChan := make(chan *Catalog, 1) - build := func(_ context.Context, _ Config, _ Paths, cat *Catalog, _ *supportlog.Entry) (Boundaries, error) { + catChan := make(chan *catalog.Catalog, 1) + build := func(_ context.Context, _ Config, _ Paths, cat *catalog.Catalog, _ *supportlog.Entry) (Boundaries, error) { select { case catChan <- cat: // hand the daemon's bound catalog to the test default: @@ -263,7 +265,7 @@ func runDaemonInBackground( } // awaitCatalog waits for the daemon to hand back its bound catalog. -func awaitCatalog(t *testing.T, catCh <-chan *Catalog) *Catalog { +func awaitCatalog(t *testing.T, catCh <-chan *catalog.Catalog) *catalog.Catalog { t.Helper() select { case cat := <-catCh: @@ -409,15 +411,15 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // .bin input the one-write index fold CONSUMES — after the fold it is // demoted+swept, reading "" not "frozen"; the durable txhash artifact is the // window's frozen coverage, not the per-chunk key.) - w0 := cat.windows.WindowID(c0) - w1 := cat.windows.WindowID(c1) + w0 := cat.TxHashIndexLayout().TxHashIndexID(c0) + w1 := cat.TxHashIndexLayout().TxHashIndexID(c1) require.Eventually(t, func() bool { - for w, c := range map[WindowID]chunk.ID{w0: c0, w1: c1} { - _, hasCov, err := cat.FrozenCoverage(w) + for w, c := range map[geometry.TxHashIndexID]chunk.ID{w0: c0, w1: c1} { + _, hasCov, err := cat.FrozenTxHashIndex(w) if err != nil || !hasCov { return false } - has, err := cat.Has(hotChunkKey(c)) + has, err := hotKeyExists(cat, c) if err != nil || has { return false } @@ -431,18 +433,18 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // --- Correctness: chunks 0 and 1 per-chunk cold artifacts (ledgers + events) froze. --- for _, c := range []chunk.ID{c0, c1} { - for _, kind := range []Kind{KindLedgers, KindEvents} { + for _, kind := range []geometry.Kind{geometry.KindLedgers, geometry.KindEvents} { st, err := cat.State(c, kind) require.NoError(t, err) - assert.Equal(t, StateFrozen, st, "chunk %s %s is frozen", c, kind) + assert.Equal(t, geometry.StateFrozen, st, "chunk %s %s is frozen", c, kind) } } // The window's txhash index is a frozen, terminal coverage (the .idx the cold // getTransaction read resolves against). - frozenCov, ok, err := cat.FrozenCoverage(w0) + frozenCov, ok, err := cat.FrozenTxHashIndex(w0) require.NoError(t, err) require.True(t, ok, "chunk 0's window has a frozen txhash coverage") - require.True(t, cat.windows.IsTerminalCoverage(frozenCov), "a one-chunk (cpi=1) window is terminal") + require.True(t, cat.TxHashIndexLayout().IsTerminalCoverage(frozenCov), "a one-chunk (cpi=1) window is terminal") // ===================================================================== // STEP 2 — getTransaction-style hash→seq lookup, both tiers. @@ -452,7 +454,7 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // (a) Cold .idx — the exact reader getTransaction will sit on for frozen // history. It resolves the committed hash to its real ledger seq. - coldReader, err := txhash.OpenColdReader(cat.layout.IndexFilePath(frozenCov)) + coldReader, err := txhash.OpenColdReader(cat.Layout().TxHashIndexFilePath(frozenCov)) require.NoError(t, err) gotSeq, err := coldReader.Get(coldHash) require.NoError(t, err, "the chunk-0 tx hash must resolve from the frozen cold index") @@ -497,16 +499,16 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // read path getTransaction uses for live history before a chunk freezes. hotState, err := postCat.HotState(c2) require.NoError(t, err) - require.Equal(t, HotReady, hotState, "chunk 2 is the un-frozen live chunk") - c2lfs, err := postCat.State(c2, KindLedgers) + require.Equal(t, geometry.HotReady, hotState, "chunk 2 is the un-frozen live chunk") + c2lfs, err := postCat.State(c2, geometry.KindLedgers) require.NoError(t, err) - require.Equal(t, State(""), c2lfs, "the live chunk has no cold artifacts yet") + require.Equal(t, geometry.State(""), c2lfs, "the live chunk has no cold artifacts yet") // Retry the open: RocksDB's process-level LOCK can linger momentarily after the // writer closed (the same transient a production reader retries through). var liveDB *hotchunk.DB require.Eventually(t, func() bool { - db, oerr := hotchunk.Open(cat.layout.HotChunkPath(c2), c2, silentLogger()) + db, oerr := hotchunk.Open(cat.Layout().HotChunkPath(c2), c2, silentLogger()) if oerr != nil { return false } @@ -553,7 +555,7 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // Capture chunk 0's frozen .idx path BEFORE the prune so we can confirm the // file itself is gone afterward. (cat's layout is path-only and stays valid // even though its metastore handle closed at the Step-3 shutdown.) - prunedIdxPath := cat.layout.IndexFilePath(frozenCov) + prunedIdxPath := cat.Layout().TxHashIndexFilePath(frozenCov) require.FileExists(t, prunedIdxPath, "chunk 0's cold index exists before the prune") cancel3, done3, catCh3 := runDaemonInBackground(t, prunedCfg, core, &served, newRecordingMetrics()) @@ -563,21 +565,21 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // which is startup convergence). Poll for chunk 0's per-chunk artifact keys // (ledgers + events — the frozen cold artifacts) to vanish. require.Eventually(t, func() bool { - ledgers, err := pruneCat.State(c0, KindLedgers) + ledgers, err := pruneCat.State(c0, geometry.KindLedgers) if err != nil { return false } - ev, err := pruneCat.State(c0, KindEvents) + ev, err := pruneCat.State(c0, geometry.KindEvents) if err != nil { return false } - return ledgers == State("") && ev == State("") + return ledgers == geometry.State("") && ev == geometry.State("") }, 60*time.Second, 50*time.Millisecond, "retention must prune chunk 0's artifact keys") // Chunk 1 (the floor chunk) is WITHIN retention and survives the prune. - c1lfs, err := pruneCat.State(c1, KindLedgers) + c1lfs, err := pruneCat.State(c1, geometry.KindLedgers) require.NoError(t, err) - assert.Equal(t, StateFrozen, c1lfs, "chunk 1 is at the retention floor and survives") + assert.Equal(t, geometry.StateFrozen, c1lfs, "chunk 1 is at the retention floor and survives") // The on-disk cold index file is gone too (prune unlinks the files, not just // the keys) — a pruned read therefore cannot even open the reader. @@ -591,7 +593,7 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // chunk-0 hash against — the production reader returns not-found. After prune // the window has no frozen coverage (ok=false): the read layer's "no coverage // ⇒ not-found" gate. - _, covOK, err := pruneCat.FrozenCoverage(w0) + _, covOK, err := pruneCat.FrozenTxHashIndex(w0) require.NoError(t, err) assert.False(t, covOK, "chunk 0's window coverage is pruned ⇒ a chunk-0 hash read is not-found") @@ -608,20 +610,20 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing // read-only inspection BETWEEN daemon runs (the metastore is RocksDB-primary / // exclusive-LOCK, so this MUST be closed via the returned close func before the // next daemon run reopens it). -func e2eReadCatalog(t *testing.T, dataDir string) (*Catalog, func()) { +func e2eReadCatalog(t *testing.T, dataDir string) (*catalog.Catalog, func()) { t.Helper() paths := Config{Service: ServiceConfig{DefaultDataDir: dataDir}}.WithDefaults().ResolvePaths() store, err := openMetaAt(t, paths.Catalog) require.NoError(t, err) - windows, err := NewWindows(1) // matches chunks_per_txhash_index = 1 + windows, err := geometry.NewTxHashIndexLayout(1) // matches chunks_per_txhash_index = 1 require.NoError(t, err) - return NewCatalog(store, NewLayoutFromPaths(paths), windows), func() { _ = store.Close() } + return catalog.NewCatalog(store, NewLayoutFromPaths(paths), windows), func() { _ = store.Close() } } // mustDeriveWatermark derives the durable watermark through the production probe. -func mustDeriveWatermark(t *testing.T, cat *Catalog) uint32 { +func mustDeriveWatermark(t *testing.T, cat *catalog.Catalog) uint32 { t.Helper() - wm, err := deriveWatermark(cat, NewRocksHotProbe(cat.layout.HotChunkPath, silentLogger())) + wm, err := deriveWatermark(cat, NewRocksHotProbe(cat.Layout().HotChunkPath, silentLogger())) require.NoError(t, err) return wm } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go index 7f70d56b6..b741b2b3d 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go @@ -128,10 +128,10 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { // Seed the first tick with the last complete chunk at the resume point so its // run fires at once — clearing crash/downtime leftovers concurrently with - // serving (the design's startup seed: lastCompleteChunkAt(resumeLedger - 1)). + // serving (the design's startup seed: geometry.LastCompleteChunkAt(resumeLedger - 1)). // Skipped on a young network where no chunk is complete (nothing to converge; // the first real boundary triggers the first tick). - if seed := lastCompleteChunkAt(lastCommitted); seed >= 0 { + if seed := geometry.LastCompleteChunkAt(lastCommitted); seed >= 0 { lifecycleCh <- chunk.ID(seed) //nolint:gosec // seed >= 0 } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go index ba564a811..89544b396 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go @@ -13,6 +13,7 @@ import ( "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/catalog" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/geometry" ) // --------------------------------------------------------------------------- @@ -140,7 +141,7 @@ func startTestConfig( Logger: silentLogger(), Workers: 2, Process: ProcessConfig{ - HotProbe: NewRocksHotProbe(cat.layout.HotChunkPath, silentLogger()), + HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, silentLogger()), Backend: zeroTxBackend(t), }, } @@ -358,7 +359,7 @@ func TestBackfill_LongDowntimeRePass(t *testing.T) { Catalog: cat, Logger: silentLogger(), Workers: 2, - Process: ProcessConfig{HotProbe: NewRocksHotProbe(cat.layout.HotChunkPath, silentLogger()), Backend: zeroTxBackend(t)}, + Process: ProcessConfig{HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, silentLogger()), Backend: zeroTxBackend(t)}, runChunk: func(_ context.Context, cb ChunkBuild, _ ExecConfig) error { mu.Lock() allChunks = append(allChunks, cb.Chunk) @@ -500,7 +501,7 @@ func TestStartStreaming_FirstStartServeIngestCleanShutdown(t *testing.T) { // was never crossed). state, err := cat.HotState(chunk.IDFromLedger(chunk.FirstLedgerSeq)) require.NoError(t, err) - assert.Equal(t, HotReady, state) + assert.Equal(t, geometry.HotReady, state) } // A ServeReads error is surfaced wrapped as a restartable failure (NOT clean) and @@ -526,7 +527,7 @@ func TestStartStreaming_ServeReadsErrorSurfaces(t *testing.T) { // succeeds. Its key is still "ready" — only the handle was released, not the DB. state, err := cat.HotState(chunk.IDFromLedger(chunk.FirstLedgerSeq)) require.NoError(t, err) - require.Equal(t, HotReady, state) + require.Equal(t, geometry.HotReady, state) db, err := openHotTierForChunk(cat, chunk.IDFromLedger(chunk.FirstLedgerSeq), silentLogger()) require.NoError(t, err, "the resume hot DB is reopenable — startStreaming released its LOCK") require.NoError(t, db.Close())