diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go index ef68c092d..a0084c998 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go @@ -11,6 +11,7 @@ import ( "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" supportlog "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest" @@ -97,6 +98,10 @@ type Boundaries struct { // deployment that never backfills. Backend ingest.ChunkSource + // Core starts captive core at the resume ledger and yields the live getter + // the ingestion loop polls. Required. + Core CoreOpener + // ServeReads launches the RPC read server (it must return promptly, not block // until shutdown). Required. // @@ -112,6 +117,9 @@ func (b Boundaries) validate() error { if b.NetworkTip == nil { return errors.New("streaming: Boundaries.NetworkTip is nil") } + if b.Core == nil { + return errors.New("streaming: Boundaries.Core is nil") + } if b.ServeReads == nil { return errors.New("streaming: Boundaries.ServeReads is nil") } @@ -207,7 +215,10 @@ func RunDaemonWith(ctx context.Context, configPath string, opts DaemonOptions) e } // startConfig threads the loaded Config, the bound catalog/logger, and the -// assembled boundaries into the StartConfig startStreaming consumes. +// assembled boundaries into the StartConfig startStreaming consumes. The Exec +// and Lifecycle bundles share ONE catalog, worker pool, and retention floor (the +// design's "catch-up and the lifecycle goroutine share one set of +// postconditions"), so Lifecycle embeds the same ExecConfig. 
func startConfig( cfg Config, cat *catalog.Catalog, logger *supportlog.Entry, b Boundaries, metrics Metrics, sink ingest.MetricSink, tipBackoff time.Duration, tipMaxAttempts int, @@ -225,13 +236,18 @@ func startConfig( HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, logger), }, } - return StartConfig{ - Exec: exec, + life := LifecycleConfig{ + ExecConfig: exec, RetentionChunks: derefU32(cfg.Retention.RetentionChunks), - NetworkTip: b.NetworkTip, - ServeReads: b.ServeReads, - TipBackoff: tipBackoff, - TipMaxAttempts: tipMaxAttempts, + } + return StartConfig{ + Exec: exec, + Lifecycle: life, + NetworkTip: b.NetworkTip, + Core: b.Core, + ServeReads: b.ServeReads, + TipBackoff: tipBackoff, + TipMaxAttempts: tipMaxAttempts, } } @@ -239,8 +255,10 @@ func startConfig( // restarts it on a restartable error after a backoff ("startup is the recovery // path"). A clean shutdown or a ctx cancel during the backoff returns nil. // -// It does NOT swallow the fatal sentinel ErrFirstStartNoTip — that surfaces UP, -// since the retry is only for transient failures a fresh start converges. +// It does NOT swallow the fatal sentinels (ErrHotVolumeLost, ErrFirstStartNoTip): +// those are returned UP so an operator/supervisor sees them. The retry here is +// for transient restartable failures (a backfill/ingest hiccup, a captive core +// crash) where a fresh start converges; the unrecoverable ones surface. func superviseStreaming( ctx context.Context, start StartConfig, logger *supportlog.Entry, backoff time.Duration, ) error { @@ -254,7 +272,7 @@ func superviseStreaming( } // Unrecoverable: surface up rather than spin restarting on a condition a // fresh start cannot heal. 
- if errors.Is(err, ErrFirstStartNoTip) { + if errors.Is(err, ErrHotVolumeLost) || errors.Is(err, ErrFirstStartNoTip) { return err } logger.WithError(err).Warnf("streaming: daemon run failed; restarting in %s", backoff) @@ -275,20 +293,37 @@ func superviseStreaming( // buildProductionBoundaries assembles the real external boundaries from the // loaded config. // -// TODO(#772): the bulk-backend tip boundary is still entangled with config that -// does not yet exist on this branch (the datastore type + schema — only -// [backfill.bsb].bucket_path is in Config today) and with the v1 path's lake -// tip-resolution. Until #772 lands the cutover, a deployment needing catch-up -// against a real lake must wire NetworkTip/BackendWaiter/Backend through -// DaemonOptions.BuildBoundaries; this supplies a tip adapter that errors clearly -// when no bulk backend is configured, so a frontfill deployment runs unchanged. +// - Core: captive stellar-core via newCaptiveCoreOpener; OpenCore calls +// PrepareRange(UnboundedRange(resume)) and hands back a LedgerGetter the +// ingestion loop polls by sequence (the design's core.GetLedger(ctx, seq)), +// plus a closer. The config plumbing is deferred (TODO below), so today the +// constructor errors with a #772 pointer. +// - Backend: the bulk datastore ChunkSource (NewDataStoreSource) when a bucket +// path is configured; nil for a frontfill-only deployment. +// - NetworkTip / BackendWaiter: an adapter over the bulk backend's tip. +// +// TODO(#772): both the captive-core config (binary path, passphrase, archives — +// see newCaptiveCoreOpener) and the bulk-backend TIP boundary (the datastore +// TYPE + schema; only [backfill.bsb].bucket_path is in Config today) are +// entangled with config that does not yet exist on this branch and with the lake +// tip-resolution the v1 path performs differently. 
Until #772 lands the cutover, +// a deployment must wire Core (and, for catch-up against a real lake, NetworkTip/ +// BackendWaiter/Backend) through DaemonOptions.BuildBoundaries; the tip adapter +// here errors clearly when no bulk backend is configured, so a frontfill +// ("genesis" or "now" with no backfill) deployment runs unchanged. func buildProductionBoundaries( - _ context.Context, _ Config, _ Paths, _ *catalog.Catalog, _ *supportlog.Entry, + _ context.Context, cfg Config, _ Paths, _ *catalog.Catalog, logger *supportlog.Entry, ) (Boundaries, error) { + core, err := newCaptiveCoreOpener(cfg.Ingestion.CaptiveCoreConfig, logger) + if err != nil { + return Boundaries{}, err + } + b := Boundaries{ + Core: core, // TODO(#772): wire the full-history RPC read server. The SQLite read path // is still the v1 daemon's; until the #772 cutover, serving is a no-op here - // so the streaming daemon catches up + freezes without double-serving reads. + // so the streaming daemon ingests + freezes without double-serving reads. ServeReads: func(context.Context) error { return nil }, } @@ -302,6 +337,59 @@ func buildProductionBoundaries( return b, nil } +// captiveCoreOpener is the production CoreOpener: it prepares captive core at the +// resume ledger and hands back a LedgerGetter the ingestion loop polls by +// sequence (the design's core.GetLedger(ctx, seq)) plus a closer. +type captiveCoreOpener struct { + backend ledgerbackend.LedgerBackend +} + +func newCaptiveCoreOpener(captiveCoreConfigPath string, logger *supportlog.Entry) (*captiveCoreOpener, error) { + if captiveCoreConfigPath == "" { + return nil, errors.New("streaming: [streaming].captive_core_config is required") + } + // TODO(#772): the captive-core CaptiveCoreConfig (binary path, network + // passphrase, history-archive URLs, storage path) is assembled from the v1 + // daemon config today; threading those through the streaming Config is part + // of the cutover. 
The factory below is the wiring point — once the fields are + // in Config, build a ledgerbackend.CaptiveCoreConfig from + // NewCaptiveCoreTomlFromFile(captiveCoreConfigPath, ...) and NewCaptive, then + // PrepareRange(UnboundedRange(resume)) in OpenCore. The seam (a LedgerGetter + // behind CoreOpener) is final; only the config plumbing is deferred. + return nil, fmt.Errorf("streaming: production captive-core wiring is deferred to #772 "+ + "(config %q parsed; pass a CoreOpener via DaemonOptions.BuildBoundaries to run today)", + captiveCoreConfigPath) +} + +// OpenCore prepares the backend over the unbounded range from resumeLedger and +// returns a getter wrapping GetLedger plus the backend's Close. +func (c *captiveCoreOpener) OpenCore( + ctx context.Context, resumeLedger uint32, +) (LedgerGetter, func() error, error) { + if err := c.backend.PrepareRange(ctx, ledgerbackend.UnboundedRange(resumeLedger)); err != nil { + return nil, nil, fmt.Errorf("streaming: captive core prepare range from %d: %w", resumeLedger, err) + } + return backendGetter{backend: c.backend}, c.backend.Close, nil +} + +// backendGetter adapts a ledgerbackend.LedgerBackend to LedgerGetter: GetLedger +// blocks until the ledger is available and returns its raw wire bytes. +type backendGetter struct { + backend ledgerbackend.LedgerBackend +} + +func (g backendGetter) GetLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMetaView, error) { + lcm, err := g.backend.GetLedger(ctx, seq) + if err != nil { + return nil, err + } + raw, err := lcm.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("streaming: marshal ledger %d: %w", seq, err) + } + return xdr.LedgerCloseMetaView(raw), nil +} + // notConfiguredTip is the NetworkTipBackend for a deployment with no bulk // backend configured: every sample returns a clear not-configured error — the // honest placeholder until #772 wires the real lake tip. 
@@ -394,6 +482,8 @@ func newLogger(cfg LoggingConfig) (*supportlog.Entry, error) { // compile-time assertions: the production adapters satisfy the injected // interfaces startStreaming/processChunk consume. var ( + _ CoreOpener = (*captiveCoreOpener)(nil) + _ LedgerGetter = backendGetter{} _ NetworkTipBackend = (*backendTip)(nil) _ BackendWaiter = (*backendTip)(nil) _ NetworkTipBackend = notConfiguredTip{} diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go index 69aa4ae74..d4af0d20b 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/daemon_test.go @@ -15,8 +15,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" - "github.com/stellar/go-stellar-sdk/keypair" - "github.com/stellar/go-stellar-sdk/network" supportlog "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" @@ -59,16 +57,17 @@ format = "text" return configPath, dataDir } -// capturedBuild returns a BuildBoundaries func that hands RunDaemon a set of -// faked external boundaries (a young-network tip ⇒ no backfill, a recording -// ServeReads). It also records the resolved config/paths the daemon passed the -// builder, so a test asserts the daemon threaded LoadConfig+ResolvePaths through -// correctly. +// capturedBuild.build is a BuildBoundaries func that hands RunDaemon a set of +// faked external boundaries (a young-network tip ⇒ no backfill, a fake core +// getter that blocks until ctx cancel, a recording ServeReads). It also records +// the resolved config/paths the daemon passed the builder, so a test asserts the +// daemon threaded LoadConfig+ResolvePaths through correctly. 
type capturedBuild struct { called atomic.Int32 gotCfg Config gotPaths Paths served atomic.Int32 + core *fakeCore } func (c *capturedBuild) build( @@ -79,8 +78,9 @@ func (c *capturedBuild) build( c.gotPaths = paths return Boundaries{ // A young-network tip (inside chunk 0) ⇒ backfill is a no-op, so the - // daemon needs no real backend to reach the serve step. + // daemon needs no real backend to reach serve+ingest. NetworkTip: &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}}, + Core: c.core, ServeReads: func(context.Context) error { c.served.Add(1); return nil }, }, nil } @@ -90,28 +90,35 @@ func (c *capturedBuild) build( // --------------------------------------------------------------------------- // The happy path: load TOML → lock → open meta store → validateConfig (pins the -// genesis floor) → build boundaries → startStreaming (catch-up no-op + serve) → -// clean return. Asserts the daemon pinned the layout, served reads, and threaded -// the resolved config/paths into the boundary builder. The cold-only daemon has -// no live ingestion loop, so startStreaming returns once ServeReads returns. +// genesis floor) → build boundaries → startStreaming → clean shutdown on ctx +// cancel. Asserts the daemon pinned the layout, served reads, started core at +// genesis, and threaded the resolved config/paths into the boundary builder. 
func TestRunDaemon_LoadValidateWireStartCleanShutdown(t *testing.T) { configPath, dataDir := writeTempConfig(t, "") - capture := &capturedBuild{} + capture := &capturedBuild{core: &fakeCore{getter: &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true}}} opts := DaemonOptions{BuildBoundaries: capture.build, Logger: silentLogger()} + ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 1) - go func() { errCh <- RunDaemonWith(context.Background(), configPath, opts) }() + go func() { errCh <- RunDaemonWith(ctx, configPath, opts) }() + + // Wait until reads are served (the daemon is parked on the blocking stream). + require.Eventually(t, func() bool { return capture.served.Load() == 1 }, 3*time.Second, 5*time.Millisecond) + cancel() select { case err := <-errCh: - require.NoError(t, err, "cold catch-up + serve returns cleanly") + require.NoError(t, err, "ctx cancel is a clean shutdown") case <-time.After(3 * time.Second): - t.Fatal("RunDaemonWith did not return") + t.Fatal("RunDaemonWith did not return after ctx cancel") } assert.Equal(t, int32(1), capture.called.Load(), "boundary builder invoked once") assert.Equal(t, int32(1), capture.served.Load(), "reads served once") + assert.Equal(t, int32(1), capture.core.openedCount.Load(), "captive core started once") + assert.Equal(t, uint32(chunk.FirstLedgerSeq), capture.core.resumeSeen.Load(), + "resume ledger is genesis on a fresh start") // The daemon threaded the loaded config + resolved paths into the builder. assert.Equal(t, dataDir, capture.gotCfg.Service.DefaultDataDir) @@ -135,175 +142,16 @@ func TestRunDaemon_LoadValidateWireStartCleanShutdown(t *testing.T) { assert.Equal(t, uint32(DefaultChunksPerTxhashIndex), cpi) } -// someTxBackend serves a chunk whose ledgers are zero-tx EXCEPT a sparse few that -// carry one transaction each. 
A wholly zero-tx chunk cannot build a txhash index -// ("zero keys"), so the index path needs at least some keys; sparseness keeps the -// 10k-ledger pass nearly as cheap as the all-zero-tx fixture. -func someTxBackend(t *testing.T) *countingChunkSource { - t.Helper() - src := xdr.MustMuxedAddress(keypair.MustRandom().Address()) - gen := func(t *testing.T, seq uint32) []byte { - if seq%2500 != 0 { // the vast majority: cheap zero-tx ledgers - return zeroTxLCMBytes(t, seq) - } - return oneTxLCMBytes(t, seq, src) // a handful carry one unique tx - } - return &countingChunkSource{ - make: func(chunk.ID) (ledgerbackend.LedgerStream, error) { - return &fullChunkStream{t: t, gen: gen}, nil - }, - } -} - -// oneTxLCMBytes is zeroTxLCMBytes plus a single transaction (a fixed source -// account, a per-seq sequence number for a unique hash) so ExtractTxHashes yields -// exactly one txhash key for seq. -func oneTxLCMBytes(t *testing.T, seq uint32, src xdr.MuxedAccount) []byte { - t.Helper() - envelope := xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: src, - SeqNum: xdr.SequenceNumber(seq), // unique per ledger ⇒ unique hash - Ext: xdr.TransactionExt{V: 1, SorobanData: &xdr.SorobanTransactionData{}}, - }, - }, - } - hash, err := network.HashTransactionInEnvelope(envelope, network.PublicNetworkPassphrase) - require.NoError(t, err) - comp := []xdr.TxSetComponent{{ - Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - Txs: []xdr.TransactionEnvelope{envelope}, - }, - }} - lcm := xdr.LedgerCloseMeta{ - V: 2, - V2: &xdr.LedgerCloseMetaV2{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{CloseTime: xdr.TimePoint(0)}, - LedgerSeq: xdr.Uint32(seq), - }, - }, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{Phases: 
[]xdr.TransactionPhase{{V: 0, V0Components: &comp}}}, - }, - TxProcessing: []xdr.TransactionResultMetaV1{{ - // A non-nil versioned meta: a zero-value TransactionMeta (V=0) nil-derefs - // in EncodeTo. Empty V4 ⇒ no events, which is fine for this fixture. - TxApplyProcessing: xdr.TransactionMeta{V: 4, V4: &xdr.TransactionMetaV4{}}, - Result: xdr.TransactionResultPair{ - TransactionHash: hash, - Result: xdr.TransactionResult{ - FeeCharged: 100, - Result: xdr.TransactionResultResult{ - Code: xdr.TransactionResultCodeTxSuccess, - Results: &[]xdr.OperationResult{}, - }, - }, - }, - }}, - }, - } - raw, err := lcm.MarshalBinary() - require.NoError(t, err) - return raw -} - -// The #815 end-to-end acceptance: one TOML boots the daemon and it catches up to -// the tip for ALL THREE cold data types + the window index — proven THROUGH the -// real entrypoint (LoadConfig → validateConfig → catchUp → executePlan → -// processChunk → buildTxhashIndex → buildThenSweep), not by calling the -// primitives in isolation. The happy-path test above sits the tip inside chunk 0 -// so its catch-up is a deliberate no-op; here a tip at chunk 0's last ledger -// backfills the COMPLETE chunk 0, and chunks_per_txhash_index=1 makes window 0 a -// single-chunk window so its index build is terminal — letting us assert the -// whole txhash lifecycle (.bin → merged .idx → .bin swept). workers=1 also drives -// the index-waits-on-its-chunk path (the deadlock-prone case) through the daemon. -// -// A frozen chunk is a full LedgersPerChunk (10k) pass, but the ledgers are cheap -// (zero-tx bodies + a sparse few one-tx ones), so the whole catch-up runs in well -// under a second — fast enough for -short. The merge DEPTH (multi-.bin windows, -// rolling/terminal demotion) is unit-tested in txindex_test; this test's job is -// the daemon-level COMPOSITION. 
-func TestRunDaemon_CatchUpMaterializesAllColdTypesAndIndex(t *testing.T) { - configPath, dataDir := writeTempConfig(t, "[layout]\nchunks_per_txhash_index = 1\n[backfill]\nworkers = 1\n") - - build := func(_ context.Context, _ Config, _ Paths, _ *catalog.Catalog, _ *supportlog.Entry) (Boundaries, error) { - return Boundaries{ - // Tip at chunk 0's last ledger ⇒ chunk 0 is complete, so the catch-up - // freezes it and (cpi=1) its single-chunk window index is terminal. - NetworkTip: &fakeTipBackend{tips: []uint32{chunk.ID(0).LastLedger()}}, - Backend: someTxBackend(t), // mostly zero-tx, a sparse few carry a tx - BackendWaiter: &fakeWaiter{}, // coverage is always satisfied - ServeReads: func(context.Context) error { return nil }, - }, nil - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - errCh := make(chan error, 1) - go func() { - errCh <- RunDaemonWith(ctx, configPath, - DaemonOptions{BuildBoundaries: build, Logger: silentLogger()}) - }() - select { - case err := <-errCh: - require.NoError(t, err, "daemon catches up to tip then exits cleanly (no-op ServeReads)") - case <-time.After(60 * time.Second): - cancel() - t.Fatal("RunDaemonWith did not finish catch-up within 60s (regressed into a hang/restart loop?)") - } - - // Read the catalog back after the daemon released its locks + closed its store. - store, err := openMetaAt(t, filepath.Join(dataDir, "catalog", "rocksdb")) - require.NoError(t, err) - defer func() { _ = store.Close() }() - windows, err := geometry.NewTxHashIndexLayout(1) - require.NoError(t, err) - layout := geometry.NewLayout(dataDir) - cat := catalog.NewCatalog(store, layout, windows) - - // (1) Chunk 0's ledger + events artifacts are frozen, with files on disk. 
- ls, err := cat.State(0, geometry.KindLedgers) - require.NoError(t, err) - assert.Equal(t, geometry.StateFrozen, ls, "chunk 0 ledgers frozen") - es, err := cat.State(0, geometry.KindEvents) - require.NoError(t, err) - assert.Equal(t, geometry.StateFrozen, es, "chunk 0 events frozen") - assert.FileExists(t, layout.LedgerPackPath(0)) - for _, p := range layout.EventsPaths(0) { - assert.FileExists(t, p) - } - - // (2) The window's txhash index built terminally: one frozen coverage [0,0] - // with its .idx on disk. - cov, ok, err := cat.FrozenTxHashIndex(0) - require.NoError(t, err) - require.True(t, ok, "window 0 has a frozen txhash index coverage") - assert.Equal(t, chunk.ID(0), cov.Lo) - assert.Equal(t, chunk.ID(0), cov.Hi) - assert.FileExists(t, layout.TxHashIndexFilePath(cov)) - - // (3) The terminal build demoted + swept the per-chunk .bin run: the txhash - // key is gone and the raw file unlinked (the index is now the durable form). - ts, err := cat.State(0, geometry.KindTxHash) - require.NoError(t, err) - assert.Equal(t, geometry.State(""), ts, "chunk 0 txhash key demoted by the terminal index build") - assert.NoFileExists(t, layout.TxHashBinPath(0)) -} - // Storage-path overrides must be HONORED by the data path, not just locked. The // daemon resolves [catalog]/[immutable_storage.*]/[streaming.hot_storage] // overrides into Paths, flocks them, and binds the Catalog via // NewLayoutFromPaths(paths) — so the Layout the data path reads/writes must -// place every artifact under the OVERRIDE, never under DataDir. Before the fix -// the Layout derived all paths from DataDir alone: the lock and the data -// location diverged silently. This test pins that the bound Layout's paths all -// live under the overrides. +// place every artifact and the hot DB under the OVERRIDE, never under DataDir. +// Before the fix the Layout derived all paths from DataDir alone: the lock and +// the data location diverged silently. 
This test pins both halves: (1) the +// bound Layout's paths all live under the overrides, and (2) actually opening a +// hot DB through the data path (openHotTierForChunk) lands the dir under the hot +// override with NOTHING under {DataDir}/hot. func TestRunDaemon_StoragePathOverridesHonored(t *testing.T) { dataDir := t.TempDir() overrideRoot := t.TempDir() // a distinct mount, e.g. /mnt/nvme @@ -347,6 +195,29 @@ func TestRunDaemon_StoragePathOverridesHonored(t *testing.T) { } // Nothing resolves under {DataDir}/hot or {DataDir}/ledgers. assert.NotEqual(t, filepath.Join(dataDir, "hot", cid.String()), layout.HotChunkPath(cid)) + + // (2) The data path actually creates the hot DB under the override. Bind a + // real catalog on this Layout and open a hot tier through the same call the + // ingestion loop uses. + store, err := metastore.New(paths.Catalog, silentLogger()) + require.NoError(t, err) + defer func() { _ = store.Close() }() + windows, err := geometry.NewTxHashIndexLayout(testCPI) + require.NoError(t, err) + cat := catalog.NewCatalog(store, layout, windows) + + db, err := openHotTierForChunk(cat, cid, silentLogger()) + require.NoError(t, err) + require.NoError(t, db.Close()) + + // The hot DB dir exists under the override... + hotDir := filepath.Join(hotOverride, cid.String()) + info, err := os.Stat(hotDir) + require.NoError(t, err, "hot DB must be created under the hot_storage override") + assert.True(t, info.IsDir()) + // ...and NOTHING was written under {DataDir}/hot (the old, buggy location). 
+ _, err = os.Stat(filepath.Join(dataDir, "hot")) + assert.True(t, os.IsNotExist(err), "no hot data may land under DataDir when an override is set") } // filepathHasPrefix reports whether path lives under prefix (prefix is an @@ -371,7 +242,7 @@ func TestRunDaemon_LockContentionFailsFast(t *testing.T) { require.NoError(t, err) defer locks.Release() - capture := &capturedBuild{} + capture := &capturedBuild{core: &fakeCore{}} err = RunDaemonWith(context.Background(), configPath, DaemonOptions{BuildBoundaries: capture.build, Logger: silentLogger()}) require.ErrorIs(t, err, ErrRootLocked) @@ -384,7 +255,7 @@ func TestRunDaemon_LockContentionFailsFast(t *testing.T) { func TestRunDaemon_NowFloorRequiresTip(t *testing.T) { configPath, _ := writeTempConfigNow(t) - capture := &capturedBuild{} + capture := &capturedBuild{core: &fakeCore{}} // The builder returns an unreachable tip, so "now" cannot resolve. build := func(_ context.Context, cfg Config, paths Paths, c *catalog.Catalog, l *supportlog.Entry) (Boundaries, error) { b, _ := capture.build(context.Background(), cfg, paths, c, l) @@ -458,14 +329,12 @@ func TestSuperviseStreaming_RetriesThenCleanShutdown(t *testing.T) { pinGenesis(t, cat) var attempts atomic.Int32 + core := &fakeCore{openErr: errors.New("transient core open failure")} tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} // young: no backfill - start := startTestConfig(t, cat, tip, nil) - // A ServeReads that always errors makes each startStreaming attempt a - // restartable failure; counting its calls counts the restarts. - start.ServeReads = func(context.Context) error { - attempts.Add(1) - return errors.New("transient serve failure") - } + start := startTestConfig(t, cat, tip, core, nil) + // Count startStreaming attempts by observing core opens (one per attempt past + // backfill); openErr makes each attempt a restartable failure. 
+ start.ServeReads = func(context.Context) error { return nil } ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 1) @@ -473,6 +342,7 @@ func TestSuperviseStreaming_RetriesThenCleanShutdown(t *testing.T) { // Let a few restarts happen, then cancel. require.Eventually(t, func() bool { + attempts.Store(core.openedCount.Load()) return attempts.Load() >= 2 }, 3*time.Second, 5*time.Millisecond) cancel() @@ -483,7 +353,7 @@ func TestSuperviseStreaming_RetriesThenCleanShutdown(t *testing.T) { case <-time.After(3 * time.Second): t.Fatal("superviseStreaming did not return after cancel") } - assert.GreaterOrEqual(t, attempts.Load(), int32(2), "restarted on the transient failure") + assert.GreaterOrEqual(t, core.openedCount.Load(), int32(2), "restarted on the transient failure") } // The fatal sentinels are surfaced UP, not retried (a fresh start cannot heal @@ -494,7 +364,7 @@ func TestSuperviseStreaming_FatalSentinelSurfaces(t *testing.T) { // Unreachable tip + no local progress ⇒ ErrFirstStartNoTip, a fatal that must // surface rather than spin. tip := &fakeTipBackend{err: errors.New("unreachable"), errFirst: 99} - start := startTestConfig(t, cat, tip, nil) + start := startTestConfig(t, cat, tip, &fakeCore{}, nil) err := superviseStreaming(context.Background(), start, silentLogger(), time.Hour) require.ErrorIs(t, err, ErrFirstStartNoTip, "fatal sentinel surfaces immediately, no retry") @@ -583,23 +453,22 @@ func TestNotConfiguredTip_ErrorsClearly(t *testing.T) { } // --------------------------------------------------------------------------- -// buildProductionBoundaries — the cold-only frontfill boundaries. +// buildProductionBoundaries — captive-core wiring is deferred to #772. 
// --------------------------------------------------------------------------- -// The cold-only daemon has no captive core: buildProductionBoundaries returns a -// frontfill Boundaries (a no-op ServeReads and a not-configured tip) with no -// error, and that set validates. -func TestBuildProductionBoundaries_FrontfillNoBackend(t *testing.T) { +func TestBuildProductionBoundaries_CaptiveCoreDeferred(t *testing.T) { cfg := Config{}.WithDefaults() - b, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) - require.NoError(t, err) - require.NotNil(t, b.ServeReads, "a no-op ServeReads is wired") - require.NoError(t, b.validate(), "the frontfill boundaries validate") + cfg.Ingestion.CaptiveCoreConfig = "/some/core.toml" + _, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) + require.Error(t, err, "captive-core production wiring is deferred to #772") + assert.Contains(t, err.Error(), "#772") +} - // The tip is the not-configured placeholder: sampling it errors clearly. 
- _, tipErr := b.NetworkTip.NetworkTip(context.Background()) - require.Error(t, tipErr) - assert.Contains(t, tipErr.Error(), "no bulk backend configured") +func TestBuildProductionBoundaries_RequiresCaptiveCoreConfig(t *testing.T) { + cfg := Config{}.WithDefaults() // no captive_core_config + _, err := buildProductionBoundaries(context.Background(), cfg, Paths{}, nil, silentLogger()) + require.Error(t, err) + assert.Contains(t, err.Error(), "captive_core_config") } func TestNewLogger(t *testing.T) { diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go new file mode 100644 index 000000000..4d64d7c9e --- /dev/null +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go @@ -0,0 +1,632 @@ +package streaming + +// ============================================================================= +// Issue 19 — in-process end-to-end integration of the streaming daemon. +// +// WHAT IS REAL HERE +// Everything inside the process is the real production code path: +// - RunDaemonWith (the true daemon entrypoint): TOML load + form-validate, +// per-root flock, meta-store open + Catalog bind, the stateful +// validateConfig gate (pins the immutable layout + resolves the floor), +// and the supervised startStreaming loop. +// - startStreaming → catchUp → openHotTierForChunk → runIngestionLoop (the +// real atomic per-ledger WriteBatch across all CFs of the real per-chunk +// hotchunk RocksDB), the real boundary handoff, the real doorbell. +// - lifecycleLoop / runLifecycleTick: the real resolve + executePlan freeze +// (cold artifacts derived FROM the live hot DB via processChunk's hot +// branch), the real txhash index fold (a real streamhash .idx on disk), +// the real discard + prune scans. +// - The real txhash stores on both sides of a getTransaction-style hash→seq +// lookup: the cold ColdReader over the frozen .idx and the live HotStore +// CF. 
+// - Catalog.Audit (INV-1..4) over the real durable keys + files. +// +// WHAT IS FAKED (and why that is the right boundary) +// Only the two EXTERNAL boundaries the daemon injects on purpose: +// - The ledger SOURCE. Production drives ingestion from captive +// stellar-core (a child process) and backfill from a bulk object-store +// backend. Here both cross their injected interfaces (CoreOpener / +// NetworkTipBackend) and are fed SYNTHETIC-BUT-WELL-FORMED LedgerCloseMeta +// built by the same fixtures the merged store tests use (zero-tx LCM for +// bulk, plus a one-tx LCM where a real, network-hashed transaction hash is +// needed so the txhash index has a real key to resolve). No captive core, +// no docker-stellar-core, no object store, no network. +// - ServeReads is a no-op recorder (the SQLite→full-history read cutover is +// #772; see daemon.go). The read PATH we actually exercise is the txhash +// index lookup the getTransaction handler will sit on top of. +// +// FOLLOW-UP (out of scope here; requires infra not available in this sandbox) +// A full captive-core + docker-stellar-core E2E belongs in the existing +// integrationtest harness (cmd/stellar-rpc/internal/integrationtest): it +// stands up a real core + a real history archive and ingests real network +// ledgers. That validates the ledger SOURCE adapters (captiveCoreOpener, +// backendTip/DataStoreSource) this test fakes, and is gated on the #772 read +// cutover for an end-user getTransaction round-trip over RPC. This in-process +// test deliberately stops at the daemon's injected boundaries so it runs with +// no external services. 
+// ============================================================================= + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stellar/go-stellar-sdk/keypair" + "github.com/stellar/go-stellar-sdk/network" + supportlog "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/catalog" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/geometry" +) + +// e2ePassphrase is the network passphrase the synthetic tx hashes are computed +// against. Any stable value works; the index only needs deterministic hashes +// the test can then look up. +const e2ePassphrase = network.PublicNetworkPassphrase + +// oneTxLCMReturningHash builds a well-formed V2 LedgerCloseMeta carrying exactly +// ONE transaction for seq and returns BOTH the wire bytes and the real, +// network-hashed transaction hash. A non-zero-tx ledger is required somewhere in +// a chunk so its txhash .bin is non-empty (streamhash refuses a zero-key cold +// index, txhash.ErrEmptyBuildSet); returning the hash lets the E2E assert the +// getTransaction-style hash→seq lookup against a hash the daemon really +// committed. It mirrors lifecycle_test's oneTxLCMBytes, exposing the hash. 
+func oneTxLCMReturningHash(t *testing.T, seq uint32) ([]byte, [32]byte) { + t.Helper() + envelope := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Ext: xdr.TransactionExt{V: 1, SorobanData: &xdr.SorobanTransactionData{}}, + }, + }, + } + hash, err := network.HashTransactionInEnvelope(envelope, e2ePassphrase) + require.NoError(t, err) + + comp := []xdr.TxSetComponent{{ + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{envelope}, + }, + }} + opResults := []xdr.OperationResult{} + lcm := xdr.LedgerCloseMeta{ + V: 2, + V2: &xdr.LedgerCloseMetaV2{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{CloseTime: xdr.TimePoint(0)}, + LedgerSeq: xdr.Uint32(seq), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{Phases: []xdr.TransactionPhase{{V: 0, V0Components: &comp}}}, + }, + TxProcessing: []xdr.TransactionResultMetaV1{{ + TxApplyProcessing: xdr.TransactionMeta{ + V: 4, + V4: &xdr.TransactionMetaV4{Operations: []xdr.OperationMetaV2{}}, + }, + Result: xdr.TransactionResultPair{ + TransactionHash: hash, + Result: xdr.TransactionResult{ + FeeCharged: 100, + Result: xdr.TransactionResultResult{Code: xdr.TransactionResultCodeTxSuccess, Results: &opResults}, + }, + }, + }}, + }, + } + raw, err := lcm.MarshalBinary() + require.NoError(t, err) + return raw, hash +} + +// e2eGetter is the FAKE captive-core ledger getter: a resumable LedgerGetter the +// ingestion loop polls by sequence (the design's core.GetLedger(ctx, seq)). It +// returns the frame for the requested seq when it has one, and once the poll +// runs past the synthetic backlog it blocks until ctx is cancelled (a live tip +// stream ends only on shutdown). 
It records the FIRST seq it was asked for so +// the restart step can assert the daemon re-derived the watermark and resumed +// with no gap. The ctx-cancelled GetLedger return is the clean-shutdown path the +// daemon top level classifies as clean. +type e2eGetter struct { + frames map[uint32][]byte + fromSeen *atomic.Uint32 // first GetLedger seq (for the restart assertion) + delivered *atomic.Uint32 // highest seq actually yielded (test sync) + sawFrom atomic.Bool +} + +type e2eFrame struct { + seq uint32 + raw []byte +} + +var _ LedgerGetter = (*e2eGetter)(nil) + +func (s *e2eGetter) GetLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMetaView, error) { + if s.sawFrom.CompareAndSwap(false, true) { + s.fromSeen.Store(seq) + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + if raw, ok := s.frames[seq]; ok { + s.delivered.Store(seq) + return xdr.LedgerCloseMetaView(raw), nil + } + // Past the synthetic backlog: a live tip blocks until shutdown so the loop + // does not see an error that would look like a core crash. + <-ctx.Done() + return nil, ctx.Err() +} + +// e2eCore is the CoreOpener handing back a fresh e2eGetter per daemon run (a +// restart opens core anew). It records the resume ledger every open was driven +// from. 
+type e2eCore struct { + frames []e2eFrame + resumeSeen atomic.Uint32 + fromSeen atomic.Uint32 + delivered atomic.Uint32 + opens atomic.Int32 +} + +func (c *e2eCore) OpenCore(_ context.Context, resume uint32) (LedgerGetter, func() error, error) { + c.opens.Add(1) + c.resumeSeen.Store(resume) + byseq := make(map[uint32][]byte, len(c.frames)) + for _, f := range c.frames { + byseq[f.seq] = f.raw + } + getter := &e2eGetter{frames: byseq, fromSeen: &c.fromSeen, delivered: &c.delivered} + return getter, func() error { return nil }, nil +} + +// e2eConfigPath writes a daemon TOML for an in-process E2E: genesis floor (no +// tip needed to validate/start), a one-chunk index window (chunks_per_txhash_- +// index = 1, so every window is terminal the instant its chunk freezes — the +// freeze→fold→discard sequence completes on the boundary tick), and the given +// retention width. captive_core_config is a stub path the test's BuildBoundaries +// replaces with a fake stream, never opening a real core. +func e2eConfigPath(t *testing.T, dataDir string, retentionChunks uint32) string { + t.Helper() + cfgPath := filepath.Join(t.TempDir(), "daemon.toml") + body := fmt.Sprintf(` +[service] +default_data_dir = %q + +[streaming] +earliest_ledger = "genesis" +captive_core_config = "/dev/null" +retention_chunks = %d + +[backfill] +chunks_per_txhash_index = 1 + +[logging] +level = "error" +format = "text" +`, dataDir, retentionChunks) + require.NoError(t, os.WriteFile(cfgPath, []byte(body), 0o644)) + return cfgPath +} + +// runDaemonInBackground starts RunDaemonWith on a cancellable ctx and returns a +// cancel func, a channel carrying its (clean-shutdown) return, and a channel +// delivering the daemon's OWN bound *catalog.Catalog (captured from the BuildBoundaries +// callback). 
The metastore is opened RocksDB-primary (exclusive LOCK), so a test +// CANNOT open a second handle on the same path while the daemon runs — instead +// it reads durable state through the daemon's own catalog, which is safe for +// concurrent reads. ServeReads records the serve count; a young-network tip +// (inside chunk 0) means backfill is a no-op and first-start ingests directly +// from genesis via the fake core. +func runDaemonInBackground( + t *testing.T, cfgPath string, core *e2eCore, served *atomic.Int32, metrics Metrics, +) (cancel context.CancelFunc, done <-chan error, catCh <-chan *catalog.Catalog) { + t.Helper() + ctx, cancelFn := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + catChan := make(chan *catalog.Catalog, 1) + build := func(_ context.Context, _ Config, _ Paths, cat *catalog.Catalog, _ *supportlog.Entry) (Boundaries, error) { + select { + case catChan <- cat: // hand the daemon's bound catalog to the test + default: + } + return Boundaries{ + NetworkTip: &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 5}}, + Core: core, + ServeReads: func(context.Context) error { served.Add(1); return nil }, + }, nil + } + opts := DaemonOptions{ + BuildBoundaries: build, + Logger: silentLogger(), + Metrics: metrics, + RestartBackoff: 10 * time.Millisecond, + } + go func() { errCh <- RunDaemonWith(ctx, cfgPath, opts) }() + return cancelFn, errCh, catChan +} + +// awaitCatalog waits for the daemon to hand back its bound catalog. +func awaitCatalog(t *testing.T, catCh <-chan *catalog.Catalog) *catalog.Catalog { + t.Helper() + select { + case cat := <-catCh: + return cat + case <-time.After(10 * time.Second): + t.Fatal("daemon did not bind a catalog") + return nil + } +} + +// waitClean cancels the daemon and requires a clean (nil) shutdown. 
func waitClean(t *testing.T, cancel context.CancelFunc, done <-chan error) {
	t.Helper()
	// Request shutdown, then wait for the daemon goroutine's return value.
	cancel()
	select {
	case err := <-done:
		require.NoError(t, err, "ctx cancel is a clean daemon shutdown")
	case <-time.After(60 * time.Second):
		// Post-cancel shutdown joins one in-flight lifecycle unit; a mid-flight
		// freeze's Finalize fsync + index build is unpreemptible and slow under
		// -race + contention — the same reason the boundary-cross budget is 600s.
		t.Fatal("daemon did not shut down cleanly after ctx cancel")
	}
}

// ============================================================================
// The end-to-end walk.
// ============================================================================

// TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune drives the
// whole daemon lifecycle in one process against the real stores and the fake
// ledger source:
//
//	first start (genesis, young-network tip ⇒ direct ingest) →
//	ingest chunks 0 and 1 in FULL + cross into chunk 2 (two real boundary
//	handoffs) →
//	lifecycle ticks freeze chunks 0 and 1 + fold their terminal txhash indexes +
//	discard their hot tiers →
//	getTransaction-style hash→seq lookup resolves from the cold .idx (chunk 0)
//	AND from the live hot CF (chunk 2) →
//	clean shutdown →
//	RESTART: re-derive the watermark, resume at exactly watermark+1 (no gap) →
//	re-run with retention_chunks = 1 so chunk 0 is pruned, and confirm a pruned
//	read is not-found (no frozen coverage, cold index file unlinked).
//
// Correctness is asserted at every step.
//
// NOTE(review): this walk was documented as finishing with "Catalog.Audit →
// Clean", but no Audit call appears in the body below — confirm whether that
// final assertion was dropped or still needs to be added.
func TestE2E_DaemonLifecycle_FirstStartIngestFreezeLookupRestartPrune(t *testing.T) {
	if testing.Short() {
		t.Skip("e2e ingests a full 10k-ledger chunk; skipped in -short")
	}

	dataDir := t.TempDir()

	const c0 = chunk.ID(0)
	const c1 = chunk.ID(1)
	const c2 = chunk.ID(2)

	// --- Synthetic ledgers. We cross TWO chunk boundaries so chunks 0 AND 1 both
	// freeze (completeThrough reaches chunk 1's last ledger), leaving chunk 2 as
	// the live (un-frozen) chunk. That layout lets a later retention_chunks=1 run
	// prune chunk 0 (wholly below the floor) while chunk 1 survives.
	//
	// Each chunk is ingested in FULL and contiguously from its first ledger (the
	// events CF's strict-contiguity precondition), so the freeze derives every
	// cold artifact. One real, network-hashed tx is planted where a resolvable
	// hash is needed — chunk 0's first ledger (→ frozen cold .idx) and chunk 2's
	// first ledger (→ the live hot CF). Chunk 1's first ledger carries a tx too
	// (see below); every other ledger is zero-tx for speed.
	c0First := c0.FirstLedger()
	c1First := c1.FirstLedger()
	c2First := c2.FirstLedger()

	coldRaw, coldHash := oneTxLCMReturningHash(t, c0First) // → frozen cold .idx (chunk 0)
	hotRaw, hotHash := oneTxLCMReturningHash(t, c2First)   // → live hot CF (chunk 2)
	// Chunk 1's first ledger also carries a tx so its txhash .bin is non-empty —
	// streamhash refuses to build a cold index over zero keys (ErrEmptyBuildSet),
	// which would otherwise abort the lifecycle tick when chunk 1 freezes.
	c1Raw, _ := oneTxLCMReturningHash(t, c1First)

	// Capacity: two full chunks plus the two chunk-2 ledgers appended below.
	frames := make([]e2eFrame, 0, 2*int(chunk.LedgersPerChunk)+2)
	// appendLedger adds seq's frame, using a pre-built one-tx LCM at the three
	// planted sequences and a zero-tx LCM everywhere else.
	appendLedger := func(seq uint32) {
		var raw []byte
		switch seq {
		case c0First:
			raw = coldRaw
		case c1First:
			raw = c1Raw
		case c2First:
			raw = hotRaw
		default:
			raw = zeroTxLCMBytes(t, seq)
		}
		frames = append(frames, e2eFrame{seq: seq, raw: raw})
	}
	// Chunks 0 and 1 in full (both freeze), then chunk 2's first two ledgers (the
	// live chunk; boundary 1→2 fired, chunk 2 opened, its first ledger committed).
	for seq := c0First; seq <= c1.LastLedger(); seq++ {
		appendLedger(seq)
	}
	appendLedger(c2First)
	appendLedger(c2First + 1)

	core := &e2eCore{frames: frames}
	var served atomic.Int32
	metrics := newRecordingMetrics()

	// =====================================================================
	// STEP 1 — first start: config → lock → validate (pin genesis) → start →
	// direct ingest across the chunk-0 AND chunk-1 boundaries, with the lifecycle
	// freezing, folding, and discarding each just-closed chunk off the doorbell.
	// =====================================================================
	cfgPath := e2eConfigPath(t, dataDir, 0) // retention 0 (full history) for now
	cancel, done, catCh := runDaemonInBackground(t, cfgPath, core, &served, metrics)

	// Inspect durable state through the daemon's OWN bound catalog (the metastore
	// is opened RocksDB-primary, so a second handle would fail the LOCK). The
	// catalog is safe for concurrent reads alongside the daemon's writes.
	cat := awaitCatalog(t, catCh)

	// First wait until ingestion crosses BOTH boundaries and commits into chunk 2
	// (the new live chunk). Delivering c2First proves both boundary handoffs fired
	// (chunks 0 and 1 closed, chunk 2 opened) and seeds the live hot-CF lookup.
	// (NOTE: we must NOT gate on "chunk 0's hot key absent" first — the daemon
	// hands the test its catalog from BuildBoundaries, BEFORE startStreaming opens
	// the resume chunk's hot DB, so that key is transiently absent at start.)
	// Budget note: crossing both boundaries is ~20k per-ledger SYNCED WriteBatches
	// (the design's one-atomic-synced-batch-per-ledger durability boundary) racing
	// the lifecycle freezes that re-read 10k ledgers each. fsync throughput is
	// highly variable under contention: in isolation this reaches chunk 2 in ~110s
	// (no -race) but ~175s under -race, and the CI gate runs the whole tree under
	// `-race` (so this E2E is NOT -short-skipped there) alongside this package's
	// six t.Parallel() full-chunk ticks, all competing for the same disk. 180s was
	// too tight (flaky timeouts at 161/167s/killed). 600s absorbs the worst-case
	// contended -race path while staying far under the 25m package envelope.
	require.Eventually(t, func() bool {
		return core.delivered.Load() >= c2First
	}, 600*time.Second, 200*time.Millisecond, "ingestion must cross both boundaries into chunk 2")

	// The boundary doorbells have rung. A lifecycle tick freezes each just-closed
	// chunk's cold artifacts (from its closed hot DB), folds its terminal (cpi=1)
	// txhash index, then discards its hot tier. The durable completion signal per
	// chunk: the window has a FROZEN txhash coverage (the .idx) AND the chunk's hot
	// key is gone (discarded). (NOTE: the per-chunk chunk:{c}:txhash key is the
	// .bin input the one-write index fold CONSUMES — after the fold it is
	// demoted+swept, reading "" not "frozen"; the durable txhash artifact is the
	// window's frozen coverage, not the per-chunk key.)
	w0 := cat.TxHashIndexLayout().TxHashIndexID(c0)
	w1 := cat.TxHashIndexLayout().TxHashIndexID(c1)
	require.Eventually(t, func() bool {
		// Both windows must have frozen coverage and both hot keys must be gone;
		// any read error counts as "not yet" and is retried on the next poll.
		for w, c := range map[geometry.TxHashIndexID]chunk.ID{w0: c0, w1: c1} {
			_, hasCov, err := cat.FrozenTxHashIndex(w)
			if err != nil || !hasCov {
				return false
			}
			has, err := hotKeyExists(cat, c)
			if err != nil || has {
				return false
			}
		}
		return true
	}, 60*time.Second, 50*time.Millisecond, "the boundary ticks must freeze+fold+discard chunks 0 and 1")

	require.GreaterOrEqual(t, served.Load(), int32(1), "reads were served")
	require.Equal(t, uint32(c0First), core.resumeSeen.Load(),
		"first start resumes captive core at genesis (watermark+1)")

	// --- Correctness: chunks 0 and 1 per-chunk cold artifacts (ledgers + events) froze. ---
	for _, c := range []chunk.ID{c0, c1} {
		for _, kind := range []geometry.Kind{geometry.KindLedgers, geometry.KindEvents} {
			st, err := cat.State(c, kind)
			require.NoError(t, err)
			assert.Equal(t, geometry.StateFrozen, st, "chunk %s %s is frozen", c, kind)
		}
	}
	// The window's txhash index is a frozen, terminal coverage (the .idx the cold
	// getTransaction read resolves against).
	frozenCov, ok, err := cat.FrozenTxHashIndex(w0)
	require.NoError(t, err)
	require.True(t, ok, "chunk 0's window has a frozen txhash coverage")
	require.True(t, cat.TxHashIndexLayout().IsTerminalCoverage(frozenCov), "a one-chunk (cpi=1) window is terminal")

	// =====================================================================
	// STEP 2 — getTransaction-style hash→seq lookup, both tiers.
	//   (a) cold: resolve chunk 0's tx via the frozen .idx on disk.
	//   (b) hot:  resolve chunk 2's tx via the live hot DB's txhash CF.
	// =====================================================================

	// (a) Cold .idx — the exact reader getTransaction will sit on for frozen
	// history. It resolves the committed hash to its real ledger seq.
	coldReader, err := txhash.OpenColdReader(cat.Layout().TxHashIndexFilePath(frozenCov))
	require.NoError(t, err)
	gotSeq, err := coldReader.Get(coldHash)
	require.NoError(t, err, "the chunk-0 tx hash must resolve from the frozen cold index")
	assert.Equal(t, c0First, gotSeq, "cold lookup returns the ledger the tx was committed in")
	// A hash that was never committed misses (not-found, not a wrong answer).
	_, missErr := coldReader.Get(hashAt(0xE2EDEADBEEF))
	require.ErrorIs(t, missErr, stores.ErrNotFound, "an uncommitted hash misses the cold index")
	require.NoError(t, coldReader.Close())

	// (b) is performed AFTER the clean shutdown below — opening chunk 2's hot DB
	// read-only would conflict with the live ingestion writer's exclusive RocksDB
	// LOCK while the daemon runs; once the daemon stops cleanly the live chunk's
	// hot DB is on disk and reopenable. The hot tier is the UN-frozen live chunk's
	// sole copy, so this still exercises the hot read path.

	// Observability: the daemon emitted the boundary + freeze phase signals (the
	// control-plane health gauges).
	assert.GreaterOrEqual(t, len(metrics.snapshotBoundaries()), 1, "at least one chunk boundary was signaled")
	assert.GreaterOrEqual(t, metrics.snapshotFreezeCount(), 1, "at least one freeze stage ran")

	// =====================================================================
	// STEP 3 — clean shutdown. The supervised loop returns nil on ctx cancel.
	// =====================================================================
	// (Watermark derivation opens the live hot DB read-only, so it MUST run after
	// the daemon — the live writer — releases the exclusive RocksDB LOCK; do it
	// after waitClean below.)
	waitClean(t, cancel, done)

	// The daemon's catalog rode its now-closed metastore handle; bind a fresh
	// inspection catalog on the (now lock-free) data dir for the post-shutdown
	// reads. It MUST be closed before the restart reopens the metastore.
	postCat, closePost := e2eReadCatalog(t, dataDir)

	// The durable watermark, re-derived from the post-shutdown state (the basis
	// for the restart's resume-with-no-gap assertion).
	wmBeforeRestart := mustDeriveWatermark(t, postCat)
	require.GreaterOrEqual(t, wmBeforeRestart, c2First, "watermark advanced into chunk 2")

	// (b) Live hot CF — now the daemon has stopped, chunk 2 (still the un-frozen
	// live chunk: its hot key is "ready", no cold artifacts) is reopenable. Open
	// its real hot DB and resolve the chunk-2 tx hash through the txhash CF — the
	// read path getTransaction uses for live history before a chunk freezes.
	hotState, err := postCat.HotState(c2)
	require.NoError(t, err)
	require.Equal(t, geometry.HotReady, hotState, "chunk 2 is the un-frozen live chunk")
	c2lfs, err := postCat.State(c2, geometry.KindLedgers)
	require.NoError(t, err)
	require.Equal(t, geometry.State(""), c2lfs, "the live chunk has no cold artifacts yet")

	// Retry the open: RocksDB's process-level LOCK can linger momentarily after the
	// writer closed (the same transient a production reader retries through).
	// (The path comes from the first daemon's catalog layout, which is used here
	// only to compute a filesystem path.)
	var liveDB *hotchunk.DB
	require.Eventually(t, func() bool {
		db, oerr := hotchunk.Open(cat.Layout().HotChunkPath(c2), c2, silentLogger())
		if oerr != nil {
			return false
		}
		liveDB = db
		return true
	}, 10*time.Second, 50*time.Millisecond, "chunk 2's hot DB must be reopenable after shutdown")
	hotSeq, err := liveDB.Txhash().Get(hotHash)
	require.NoError(t, err, "the chunk-2 tx hash must resolve from the live hot CF")
	assert.Equal(t, c2First, hotSeq, "hot lookup returns the live tx's ledger")
	require.NoError(t, liveDB.Close()) // release before the restart reopens it as the live writer

	// =====================================================================
	// STEP 4 — RESTART. A fresh RunDaemonWith re-opens everything, re-derives the
	// watermark from durable state, and resumes captive core at watermark+1 with
	// no gap. (The shared e2eCore records the new resume + the stream's From.)
	// =====================================================================
	closePost() // release the inspection metastore handle before the daemon reopens it
	// Zero the recorders so the polls below observe only the restarted run.
	core.opens.Store(0)
	core.resumeSeen.Store(0)
	core.fromSeen.Store(0)
	cancel2, done2, _ := runDaemonInBackground(t, cfgPath, core, &served, newRecordingMetrics())

	require.Eventually(t, func() bool { return core.opens.Load() >= 1 }, 30*time.Second, 20*time.Millisecond,
		"the restarted daemon re-opened captive core")
	require.Eventually(t, func() bool { return core.fromSeen.Load() != 0 }, 30*time.Second, 20*time.Millisecond,
		"the restarted ingestion loop requested a resume range")

	wantResume := wmBeforeRestart + 1
	assert.Equal(t, wantResume, core.resumeSeen.Load(),
		"restart resumes captive core at the re-derived watermark+1 (no gap, no re-fetch of the bottom)")
	assert.Equal(t, wantResume, core.fromSeen.Load(),
		"the ingestion loop streamed from watermark+1 — the durable frontier, re-derived not stored")

	waitClean(t, cancel2, done2)

	// =====================================================================
	// STEP 5 — retention prune. Re-run the daemon with retention_chunks = 1: the
	// effective floor anchors at chunk 1 (lastCompleteChunkAt(through=chunk 1) -
	// 1 + 1), so chunk 0 (frozen + folded) falls WHOLLY below the floor and the
	// prune scan sweeps its files + keys, while chunk 1 (the floor chunk) survives.
	// A read of a pruned chunk-0 hash is then not-found (no coverage to resolve it).
	// =====================================================================
	prunedCfg := e2eConfigPath(t, dataDir, 1) // retain ~1 chunk
	// Capture chunk 0's frozen .idx path BEFORE the prune so we can confirm the
	// file itself is gone afterward. (cat's layout is path-only and stays valid
	// even though its metastore handle closed at the Step-3 shutdown.)
	prunedIdxPath := cat.Layout().TxHashIndexFilePath(frozenCov)
	require.FileExists(t, prunedIdxPath, "chunk 0's cold index exists before the prune")

	cancel3, done3, catCh3 := runDaemonInBackground(t, prunedCfg, core, &served, newRecordingMetrics())
	pruneCat := awaitCatalog(t, catCh3) // the pruning daemon's own catalog

	// The prune scan runs on the first lifecycle tick (the at-start doorbell ring,
	// which is startup convergence). Poll for chunk 0's per-chunk artifact keys
	// (ledgers + events — the frozen cold artifacts) to vanish.
	require.Eventually(t, func() bool {
		ledgers, err := pruneCat.State(c0, geometry.KindLedgers)
		if err != nil {
			return false
		}
		ev, err := pruneCat.State(c0, geometry.KindEvents)
		if err != nil {
			return false
		}
		return ledgers == geometry.State("") && ev == geometry.State("")
	}, 60*time.Second, 50*time.Millisecond, "retention must prune chunk 0's artifact keys")

	// Chunk 1 (the floor chunk) is WITHIN retention and survives the prune.
	c1lfs, err := pruneCat.State(c1, geometry.KindLedgers)
	require.NoError(t, err)
	assert.Equal(t, geometry.StateFrozen, c1lfs, "chunk 1 is at the retention floor and survives")

	// The on-disk cold index file is gone too (prune unlinks the files, not just
	// the keys) — a pruned read therefore cannot even open the reader.
	require.Eventually(t, func() bool {
		_, statErr := os.Stat(prunedIdxPath)
		return os.IsNotExist(statErr)
	}, 10*time.Second, 50*time.Millisecond, "the pruned cold index file is unlinked")

	// getTransaction-style "pruned read is not-found": the frozen coverage key is
	// gone, so the read path has no index to resolve the (formerly resolvable)
	// chunk-0 hash against — the production reader returns not-found. After prune
	// the window has no frozen coverage (ok=false): the read layer's "no coverage
	// ⇒ not-found" gate.
	_, covOK, err := pruneCat.FrozenTxHashIndex(w0)
	require.NoError(t, err)
	assert.False(t, covOK, "chunk 0's window coverage is pruned ⇒ a chunk-0 hash read is not-found")

	waitClean(t, cancel3, done3)
}

// ============================================================================
// helpers
// ============================================================================

// e2eReadCatalog binds a Catalog over a SEPARATE metastore handle on the
// daemon's data dir, with the same one-chunk window the daemon config pins, for
// read-only inspection BETWEEN daemon runs (the metastore is RocksDB-primary /
// exclusive-LOCK, so this MUST be closed via the returned close func before the
// next daemon run reopens it).
//
// It returns the catalog and a close func that closes the underlying store,
// discarding the Close error. Failing to open the store or build the layout
// fails the test.
func e2eReadCatalog(t *testing.T, dataDir string) (*catalog.Catalog, func()) {
	t.Helper()
	// Resolve paths from a default config rooted at dataDir.
	paths := Config{Service: ServiceConfig{DefaultDataDir: dataDir}}.WithDefaults().ResolvePaths()
	store, err := openMetaAt(t, paths.Catalog)
	require.NoError(t, err)
	windows, err := geometry.NewTxHashIndexLayout(1) // matches chunks_per_txhash_index = 1
	require.NoError(t, err)
	return catalog.NewCatalog(store, NewLayoutFromPaths(paths), windows), func() { _ = store.Close() }
}

// mustDeriveWatermark derives the durable watermark through the production probe.
// It calls deriveWatermark with a RocksDB hot probe built over cat's hot-chunk
// paths and fails the test on error.
func mustDeriveWatermark(t *testing.T, cat *catalog.Catalog) uint32 {
	t.Helper()
	wm, err := deriveWatermark(cat, NewRocksHotProbe(cat.Layout().HotChunkPath, silentLogger()))
	require.NoError(t, err)
	return wm
}

// The E2E reuses observability_test.go's recordingMetrics (a full Metrics sink)
// and its snapshotBoundaries / snapshotFreezeCount snapshot helpers.
diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go index 40dfc4081..58c2d6b72 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go @@ -448,7 +448,7 @@ func TestBackfill_ReportsPassAndProgress(t *testing.T) { // complete chunk at tip]. tipLedger := chunk.ID(3).LastLedger() + 5 tip := &fakeTipBackend{tips: []uint32{tipLedger}} - start := startTestConfig(t, cat, tip, rp) + start := startTestConfig(t, cat, tip, &fakeCore{}, rp) metrics := newRecordingMetrics() start.Exec.Metrics = metrics diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go index 98fe94353..b741b2b3d 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup.go @@ -4,32 +4,45 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/geometry" ) -// startStreaming is the cold-only Phase-1 backfill daemon's startup -// orchestration — the design's "Daemon flow -> Startup", reduced to: +// startStreaming is the daemon's startup orchestration — the design's "Daemon +// flow -> Startup", in two steps: // // 1. CATCH UP via backfill. Bring on-disk coverage in line with the retention // window: each pass backfills up through the last complete chunk at the -// network tip, re-passing while new chunks appear, with one exclusion — a -// mid-chunk watermark within one chunk of the tip leaves the partial resume -// chunk alone. There is no upfront producibility gate. 
+// network tip, re-passing while new chunks appear at the tip, with one +// exclusion — a mid-chunk watermark within one chunk of the tip leaves the +// partial resume chunk to ingestion (core replays its tail faster than a +// bulk refetch, and a mid-chunk watermark can only have come from the live +// hot DB, so the data is local by construction). runBackfill is the SAME +// resolve + executePlan the lifecycle tick uses (Phase B); there is no +// upfront producibility gate — each chunk's producibility is enforced +// lazily during its build by the buildTxhashIndex .bin precondition. // -// 2. SERVE. Begin serving reads (injected) and return. The cold-only daemon has -// no hot tier, captive core, or live ingestion loop. +// 2. SERVE + INGEST. Open the resume chunk's hot DB (Issue 10), start captive +// core (injected), launch the lifecycle goroutine (Issue 11) on a doorbell, +// start serving reads (injected), and run the ingestion loop (Issue 10). +// The ingestion loop's first act is a doorbell ring, so the first lifecycle +// tick doubles as startup convergence (finishing crash leftovers + pruning +// downtime leftovers concurrently with early serving). // -// Everything startup cannot construct itself crosses an INJECTED interface -// (StartConfig.NetworkTip, .ServeReads), so it is unit-testable without a real -// backend or RPC server. RunDaemon calls validateConfig (which pins -// earliest_ledger) BEFORE this; startStreaming reads the pin back. +// EVERYTHING the daemon needs that startup cannot construct itself crosses an +// INJECTED interface (StartConfig.NetworkTip, .Core, .ServeReads), so this is +// unit-testable without captive core, a real bulk backend, or a real RPC +// server. validateConfig (the full TOML form) is Phase D; this accepts an +// already-resolved StartConfig and the pinned earliest_ledger is read from the +// catalog. 
// -// It returns nil only on a clean shutdown (ctx cancelled) or a clean ServeReads -// return; any other return is restartable and surfaces to the supervisor -// (ErrFirstStartNoTip on a true first start with no reachable backend). +// It returns nil only on the ingestion loop's clean stop. A ctx cancel mid-run +// surfaces as a wrapped context.Canceled that the daemon top level classifies +// as clean. Other returns are restartable (a backfill/ingest failure) or fatal: +// ErrFirstStartNoTip (first start, no reachable backend) and ErrHotVolumeLost. func startStreaming(ctx context.Context, cfg StartConfig) error { if err := cfg.validate(); err != nil { return err @@ -54,14 +67,17 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { } // Derived, never stored: the highest ledger durably committed (frozen cold - // artifacts), clamped by earliest-1. A pure catalog read — no hot tier. - lastCommitted, err := lastCommittedLedger(cat, nil) + // artifacts vs the highest ready hot DB's max committed seq, clamped by + // earliest-1). With a probe it does ONE read of the highest ready hot DB and + // detects hot-volume loss LAZILY on that open (ErrHotVolumeLost) before + // ingestion ever opens a writer. + lastCommitted, err := lastCommittedLedger(cat, cfg.Exec.Process.HotProbe) if err != nil { return fmt.Errorf("streaming: startup derive watermark: %w", err) } metrics := cfg.Exec.metrics() - metrics.Watermark(lastCommitted, effectiveRetentionFloor(lastCommitted, cfg.RetentionChunks, earliest)) + metrics.Watermark(lastCommitted, effectiveRetentionFloor(lastCommitted, cfg.Lifecycle.RetentionChunks, earliest)) logger.WithField("last_committed", lastCommitted). WithField("earliest", earliest). WithField("pinned", pinned). @@ -74,20 +90,89 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { } logger.WithField("last_committed", lastCommitted). 
- Info("streaming: catch-up complete — handing off to the read server") + WithField("resume_chunk", chunk.IDFromLedger(lastCommitted+1).String()). + Info("streaming: catch-up complete — opening resume hot tier and ingesting") + + // Step 2: serve + ingest. resumeLedger is one past the watermark — the live + // chunk's next un-committed ledger (or the chunk's first ledger on an empty + // resume DB; runIngestionLoop re-derives the exact resume point from durable + // state, so a lastCommitted that lands mid-chunk and a lastCommitted on a + // chunk boundary both resume correctly). + resumeLedger := lastCommitted + 1 + resumeChunk := chunk.IDFromLedger(resumeLedger) + + hotDB, err := openHotTierForChunk(cat, resumeChunk, logger) + if err != nil { + return fmt.Errorf("streaming: startup open resume hot tier chunk %s: %w", resumeChunk, err) + } + + // Start captive core from the resume ledger. On failure the resume hot DB is + // already open; close it so a restart re-opens cleanly (the bracket is + // idempotent, but the rocksdb LOCK must be released). + core, closeCore, err := cfg.Core.OpenCore(ctx, resumeLedger) + if err != nil { + _ = hotDB.Close() + return fmt.Errorf("streaming: startup start captive core at ledger %d: %w", resumeLedger, err) + } + defer func() { + if closeCore != nil { + _ = closeCore() + } + }() + + // The lifecycle goroutine runs one tick per notification, carrying the just- + // completed chunk id. Buffered to lifecycleQueueDepth; the ingestion loop + // sends at every chunk boundary. It shares NO in-memory state with ingestion — + // it derives everything from durable keys. + lifecycleCh := make(chan chunk.ID, lifecycleQueueDepth) + + // Seed the first tick with the last complete chunk at the resume point so its + // run fires at once — clearing crash/downtime leftovers concurrently with + // serving (the design's startup seed: geometry.LastCompleteChunkAt(resumeLedger - 1)). 
+ // Skipped on a young network where no chunk is complete (nothing to converge; + // the first real boundary triggers the first tick). + if seed := geometry.LastCompleteChunkAt(lastCommitted); seed >= 0 { + lifecycleCh <- chunk.ID(seed) //nolint:gosec // seed >= 0 + } - // Step 2: serve. The cold-only daemon finishes after catch-up + serve. - // ServeReads launches the read server (injected); its error is restartable. + // The lifecycle goroutine is tied to a PER-ITERATION child ctx, not the + // daemon-lifetime ctx, and is cancelled + JOINED before startStreaming returns + // for ANY reason — restoring the design's single-lifecycle-goroutine invariant + // across supervisor restarts (a daemon-ctx-tied loop would survive a + // restartable return and run a tick concurrently with the next iteration's + // lifecycle + ingestion: two RunColdChunk passes truncating the same + // .pack/.idx). runLifecycleTick checks ctx at every step and executePlan + // returns on cancellation, so the join cannot block past the current step. + lifecycleCtx, cancelLifecycle := context.WithCancel(ctx) + var lifecycleWG sync.WaitGroup + lifecycleWG.Add(1) + go func() { + defer lifecycleWG.Done() + lifecycleLoop(lifecycleCtx, cfg.Lifecycle, cat, lifecycleCh) + }() + // The two return paths registered after this defer (the ingestion-loop return + // and the ServeReads error path) have no live sender on lifecycleCh — ingestion + // is a same-goroutine call whose inline notify has stopped, and the serve path + // never starts it — so no send can race the cancel. The earlier error paths + // return before this defer and before the goroutine starts. + defer func() { + cancelLifecycle() + lifecycleWG.Wait() + }() + + // Begin serving reads (injected). Serve-readiness is established by step 1 + // plus the resume chunk's hot DB just opened — crash debris and downtime + // leftovers are reader-invisible, so the first tick clears them concurrently + // with serving rather than ahead of it. 
if err := cfg.ServeReads(ctx); err != nil { + _ = hotDB.Close() return fmt.Errorf("streaming: startup serve reads: %w", err) } - // In cold-only Phase 1 the production ServeReads is a no-op (reads still come - // from the v1 SQLite daemon until the #772 cutover), so it returns at once and - // the daemon exits cleanly HERE — an immediate clean exit after catch-up is - // expected, not a misconfiguration. - logger.WithField("last_committed", lastCommitted). - Info("streaming: read server returned — cold-only daemon shutting down cleanly") - return nil + + // The ingestion loop owns hotDB for the rest of its life (it closes it on any + // exit and reopens at each boundary). Returns the GetLedger/boundary error; + // the daemon top level classifies a ctx-cancelled return as a clean shutdown. + return runIngestionLoop(ctx, core, hotDB, cat, lifecycleCh, allHotTypes, logger, metrics, cfg.Exec.Process.Sink) } // catchUp runs the design's catch-up loop, returning lastCommitted as backfill @@ -97,7 +182,7 @@ func startStreaming(ctx context.Context, cfg StartConfig) error { // with the mid-chunk exclusion, and breaks on an empty/already-done range. // backfilledThrough breaks the loop once rangeEnd stops advancing. func catchUp(ctx context.Context, cfg StartConfig, lastCommitted, earliest uint32) (uint32, error) { - retentionChunks := cfg.RetentionChunks + retentionChunks := cfg.Lifecycle.RetentionChunks metrics := cfg.Exec.metrics() logger := cfg.Exec.Logger @@ -203,33 +288,52 @@ func maxU32(a, b uint32) uint32 { return max(a, b) } var ErrFirstStartNoTip = errors.New("streaming: network tip unavailable and no local history to serve") // --------------------------------------------------------------------------- -// Injected external boundaries: the network tip and the read server both cross -// an interface, so startup is exercised end to end with fakes. +// Injected external boundaries. 
startStreaming touches NOTHING outside the +// process directly: the network tip, captive core, and the read server all +// cross an interface so startup is exercised end to end with fakes. // --------------------------------------------------------------------------- -// NetworkTipBackend samples the bulk backend's current network tip (the highest -// ledger it can serve), consulted only during catch-up. Production wraps the -// daemon's LedgerBackend; tests pass a reachable/unreachable/unready fake. +// NetworkTipBackend samples the configured bulk backend's current network tip +// (the highest ledger the backend can serve). Production wraps the daemon's +// LedgerBackend; tests pass a fake that is reachable / unreachable / unready. +// It is consulted only during catch-up; once ingestion runs, captive core is +// the tip. type NetworkTipBackend interface { NetworkTip(ctx context.Context) (uint32, error) } -// StartConfig is startStreaming's resolved dependency bundle: the scheduler -// config, the catch-up floor width, the injected boundaries, and the networkTip -// backoff bounds. The full daemon Config is a superset assembled at the call +// CoreOpener prepares captive core at resumeLedger and hands back a LedgerGetter +// the ingestion loop polls plus a closer the caller defers. Production wraps +// captive core's PrepareRange + GetLedger; tests pass a fake getter. The closer +// tears down the backend on daemon exit. +type CoreOpener interface { + OpenCore(ctx context.Context, resumeLedger uint32) (LedgerGetter, func() error, error) +} + +// StartConfig is startStreaming's resolved dependency bundle. It composes the +// scheduler/lifecycle configs (so catch-up and the lifecycle goroutine share one +// catalog, worker pool, and retention floor) and the three injected external +// boundaries, plus the networkTip backoff bounds. 
The full daemon Config +// (TOML-parsed paths, captive-core toml, …) is a superset assembled at the call // site; only what startup reads lives here. type StartConfig struct { // Exec drives catch-up's runBackfill. Its Catalog/Logger are the shared ones. Exec ExecConfig - // RetentionChunks is the catch-up floor's width. 0 ⇒ the earliest-ledger floor only. - RetentionChunks uint32 + // Lifecycle drives the lifecycle goroutine. Its embedded ExecConfig should be + // the SAME wiring as Exec (one catalog, one pool); RetentionChunks is the + // catch-up floor's width too. + Lifecycle LifecycleConfig // NetworkTip samples the bulk backend's tip during catch-up. Required. NetworkTip NetworkTipBackend - // ServeReads begins serving reads. It must return promptly (it launches the - // server, not block until shutdown). Required. + // Core starts captive core and yields the ingestion getter. Required. + Core CoreOpener + + // ServeReads begins serving reads (the RPC server). It must return promptly + // (it launches the server; it does not block until shutdown) — startup + // proceeds to the blocking ingestion loop after it returns. Required. ServeReads func(ctx context.Context) error // TipBackoff is networkTip's inter-attempt sleep; TipMaxAttempts bounds the @@ -244,10 +348,12 @@ const ( defaultTipMaxAttempts = 5 ) -// withDefaults fills the tip-backoff defaults and the embedded ExecConfig -// defaults (Workers -> GOMAXPROCS). +// withDefaults fills the worker-pool / lifecycle / tip-backoff defaults. The +// embedded ExecConfig defaults (Workers -> GOMAXPROCS) and the LifecycleConfig +// Fatalf default are applied so a caller need not. 
func (cfg StartConfig) withDefaults() StartConfig { cfg.Exec = cfg.Exec.WithDefaults() + cfg.Lifecycle = cfg.Lifecycle.WithLifecycleDefaults() if cfg.TipBackoff <= 0 { cfg.TipBackoff = defaultTipBackoff } @@ -264,9 +370,15 @@ func (cfg StartConfig) validate() error { if cfg.Exec.Logger == nil { return errors.New("streaming: StartConfig.Exec.Logger is nil") } + if cfg.Exec.Process.HotProbe == nil { + return errors.New("streaming: StartConfig.Exec.Process.HotProbe is nil (watermark derivation needs it)") + } if cfg.NetworkTip == nil { return errors.New("streaming: StartConfig.NetworkTip is nil") } + if cfg.Core == nil { + return errors.New("streaming: StartConfig.Core is nil") + } if cfg.ServeReads == nil { return errors.New("streaming: StartConfig.ServeReads is nil") } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go index a342d7680..89544b396 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/startup_test.go @@ -13,6 +13,7 @@ import ( "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/catalog" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/streaming/geometry" ) // --------------------------------------------------------------------------- @@ -56,6 +57,30 @@ func (b *fakeTipBackend) callCount() int { return b.calls } +// fakeCore is a CoreOpener handing back a programmed LedgerGetter and recording +// the resume ledger it was started from. 
+type fakeCore struct { + getter LedgerGetter + openErr error + resumeSeen atomic.Uint32 + openedCount atomic.Int32 +} + +func (c *fakeCore) OpenCore(_ context.Context, resumeLedger uint32) (LedgerGetter, func() error, error) { + c.openedCount.Add(1) + c.resumeSeen.Store(resumeLedger) + if c.openErr != nil { + return nil, nil, c.openErr + } + getter := c.getter + if getter == nil { + // Default: a live getter that blocks until ctx is cancelled (the daemon's + // steady state). Tests that need a finite poll set c.getter. + getter = &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true} + } + return getter, func() error { return nil }, nil +} + // recordingPlan captures the (rangeStart, rangeEnd) every backfill pass asked // for, via the ExecConfig runChunk/runIndex test seams — so a backfill test // asserts the loop's range arithmetic without real cold I/O. Because resolve @@ -103,12 +128,12 @@ func (r *recordingPlan) snapshot() [][2]chunk.ID { return out } -// startTestConfig builds a cold StartConfig over a real catalog (genesis floor -// pinned to GenesisLedger by default) with all external boundaries faked. -// recordPlan, when non-nil, wires the runChunk/runIndex seams so backfill passes -// are recorded without cold I/O. +// startTestConfig builds a StartConfig over a real catalog (genesis floor pinned +// to GenesisLedger by default) with all external boundaries faked. recordPlan, +// when non-nil, wires the runChunk/runIndex seams so backfill passes are +// recorded without cold I/O. 
func startTestConfig( - t *testing.T, cat *catalog.Catalog, tip *fakeTipBackend, recordPlan *recordingPlan, + t *testing.T, cat *catalog.Catalog, tip *fakeTipBackend, core *fakeCore, recordPlan *recordingPlan, ) StartConfig { t.Helper() exec := ExecConfig{ @@ -116,7 +141,8 @@ func startTestConfig( Logger: silentLogger(), Workers: 2, Process: ProcessConfig{ - Backend: zeroTxBackend(t), + HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, silentLogger()), + Backend: zeroTxBackend(t), }, } if recordPlan != nil { @@ -126,13 +152,15 @@ func startTestConfig( } exec.runIndex = func(_ context.Context, _ IndexBuild, _ ExecConfig) error { return nil } } + life := LifecycleConfig{ExecConfig: exec, RetentionChunks: 0, Fatalf: (&fatalRecorder{}).fatalf} return StartConfig{ - Exec: exec, - RetentionChunks: 0, - NetworkTip: tip, - ServeReads: func(context.Context) error { return nil }, - TipBackoff: time.Millisecond, - TipMaxAttempts: 3, + Exec: exec, + Lifecycle: life, + NetworkTip: tip, + Core: core, + ServeReads: func(context.Context) error { return nil }, + TipBackoff: time.Millisecond, + TipMaxAttempts: 3, } } @@ -190,7 +218,7 @@ func TestBackfill_FirstStartTipAbsentFatal(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) tip := &fakeTipBackend{err: errors.New("backend unreachable"), errFirst: 99} - cfg := startTestConfig(t, cat, tip, &recordingPlan{}) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, &recordingPlan{}) // lastCommitted = deriveWatermark over an empty catalog = preGenesisLedger (1); // earliest = GenesisLedger (2); 1 < 2 ⇒ first start with no progress. 
@@ -208,7 +236,7 @@ func TestBackfill_FirstStartTipPresentComputesRange(t *testing.T) { tipLedger := chunk.ID(3).FirstLedger() + 100 rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, preGenesisLedger, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -230,7 +258,7 @@ func TestBackfill_YoungNetworkNoOp(t *testing.T) { // Tip inside chunk 0 (no chunk has fully closed yet). tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 50}} rec := &recordingPlan{} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, preGenesisLedger, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -257,7 +285,7 @@ func TestBackfill_SteadyRestartNoOp(t *testing.T) { tipLedger := chunk.ID(3).FirstLedger() + 10 // last complete chunk == 2 rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -289,7 +317,7 @@ func TestBackfill_MidChunkResumeExclusion(t *testing.T) { tipLedger := chunk.ID(5).LastLedger() // within one chunk, but chunk 5 complete-at-tip rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -331,7 +359,7 @@ func TestBackfill_LongDowntimeRePass(t *testing.T) { Catalog: cat, Logger: silentLogger(), Workers: 2, - Process: ProcessConfig{Backend: zeroTxBackend(t)}, + Process: ProcessConfig{HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, silentLogger()), Backend: 
zeroTxBackend(t)}, runChunk: func(_ context.Context, cb ChunkBuild, _ ExecConfig) error { mu.Lock() allChunks = append(allChunks, cb.Chunk) @@ -342,7 +370,9 @@ func TestBackfill_LongDowntimeRePass(t *testing.T) { } cfg := StartConfig{ Exec: exec, + Lifecycle: LifecycleConfig{ExecConfig: exec, Fatalf: (&fatalRecorder{}).fatalf}, NetworkTip: tip, + Core: &fakeCore{}, ServeReads: func(context.Context) error { return nil }, TipBackoff: time.Millisecond, TipMaxAttempts: 3, @@ -377,7 +407,7 @@ func TestBackfill_RestartTipUnreachableDegrades(t *testing.T) { watermark := chunk.ID(2).LastLedger() // local progress exists tip := &fakeTipBackend{err: errors.New("backend down"), errFirst: 99} rec := &recordingPlan{} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err, "local progress means no fatal") @@ -408,7 +438,7 @@ func TestBackfill_LaggingBulkTipFoldsWatermarkChunk(t *testing.T) { tipLedger := chunk.ID(3).FirstLedger() + 10 // lagging bulk tip in chunk 3 (last complete 2) rec := &recordingPlan{} tip := &fakeTipBackend{tips: []uint32{tipLedger}} - cfg := startTestConfig(t, cat, tip, rec) + cfg := startTestConfig(t, cat, tip, &fakeCore{}, rec) last, err := catchUp(context.Background(), cfg, watermark, chunk.FirstLedgerSeq) require.NoError(t, err) @@ -423,52 +453,98 @@ func TestBackfill_LaggingBulkTipFoldsWatermarkChunk(t *testing.T) { } // --------------------------------------------------------------------------- -// startStreaming — the cold-only catch-up + serve flow. +// startStreaming — the full serve+ingest handoff (clean shutdown). // --------------------------------------------------------------------------- // A genesis first start with a tip inside chunk 0 (young network) does no -// backfill, then serves reads. 
The cold-only daemon has no hot tier or live -// ingestion loop: startStreaming returns nil once ServeReads returns with no -// error. -func TestStartStreaming_FirstStartCatchUpThenServe(t *testing.T) { +// backfill, opens the resume chunk's hot DB, starts the (blocking) fake core +// getter, serves reads, and runs the ingestion loop — which returns the ctx- +// cancelled GetLedger error when ctx is cancelled. The clean-shutdown +// classification now lives at the daemon top level (superviseStreaming treats a +// ctx-cancelled return as clean), so startStreaming surfaces the wrapped +// context.Canceled. The resume ledger is genesis. +func TestStartStreaming_FirstStartServeIngestCleanShutdown(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) served := atomic.Int32{} + // Live getter: blocks until ctx cancel (the daemon's steady state). + core := &fakeCore{getter: &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true}} tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} // young: no backfill - cfg := startTestConfig(t, cat, tip, nil) + cfg := startTestConfig(t, cat, tip, core, nil) cfg.ServeReads = func(context.Context) error { served.Add(1); return nil } - require.NoError(t, startStreaming(context.Background(), cfg)) + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + go func() { errCh <- startStreaming(ctx, cfg) }() + + // Give the loop time to open the hot DB, start core, serve, and park on the + // blocking getter, then request a clean shutdown. + require.Eventually(t, func() bool { return served.Load() == 1 }, 2*time.Second, 5*time.Millisecond) + cancel() + + select { + case err := <-errCh: + // The ingestion loop surfaces the ctx-cancelled GetLedger error; the daemon + // top level (superviseStreaming) classifies a ctx-cancelled return as clean. 
+ require.ErrorIs(t, err, context.Canceled, "clean shutdown surfaces the ctx-cancelled error") + case <-time.After(3 * time.Second): + t.Fatal("startStreaming did not return after ctx cancel") + } + require.Equal(t, int32(1), served.Load(), "reads were served exactly once") + require.Equal(t, int32(1), core.openedCount.Load(), "captive core started once") + require.Equal(t, uint32(chunk.FirstLedgerSeq), core.resumeSeen.Load(), + "resume ledger is genesis on a fresh start (watermark+1)") + + // The resume chunk's hot key is "ready" (the loop opened it and the boundary + // was never crossed). + state, err := cat.HotState(chunk.IDFromLedger(chunk.FirstLedgerSeq)) + require.NoError(t, err) + assert.Equal(t, geometry.HotReady, state) } -// startStreaming surfaces a ServeReads error wrapped, as a restartable failure. +// A ServeReads error is surfaced wrapped as a restartable failure (NOT clean) and +// the already-opened resume hot DB is closed on the way out, so a restart can +// reopen it (the rocksdb LOCK is released). ServeReads runs after the hot DB +// opens and core starts but before the blocking ingestion loop, so startStreaming +// returns synchronously here. 
func TestStartStreaming_ServeReadsErrorSurfaces(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) - tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} - cfg := startTestConfig(t, cat, tip, nil) + core := &fakeCore{getter: &fakeLedgerGetter{frames: map[uint32][]byte{}, blockOnCtx: true}} + tip := &fakeTipBackend{tips: []uint32{chunk.FirstLedgerSeq + 10}} // young: no backfill + cfg := startTestConfig(t, cat, tip, core, nil) cfg.ServeReads = func(context.Context) error { return errors.New("rpc bind failed") } err := startStreaming(context.Background(), cfg) require.Error(t, err) require.Contains(t, err.Error(), "serve reads") + require.NotErrorIs(t, err, context.Canceled, "a ServeReads error is restartable, not a clean shutdown") + require.Equal(t, int32(1), core.openedCount.Load(), "core was started before serving") + + // The resume hot DB was closed on the error path (LOCK released): reopening it + // succeeds. Its key is still "ready" — only the handle was released, not the DB. + state, err := cat.HotState(chunk.IDFromLedger(chunk.FirstLedgerSeq)) + require.NoError(t, err) + require.Equal(t, geometry.HotReady, state) + db, err := openHotTierForChunk(cat, chunk.IDFromLedger(chunk.FirstLedgerSeq), silentLogger()) + require.NoError(t, err, "the resume hot DB is reopenable — startStreaming released its LOCK") + require.NoError(t, db.Close()) } // startStreaming fatals on a true first start when the tip is unavailable: the -// error is ErrFirstStartNoTip and reads are never served. +// error is ErrFirstStartNoTip and NEITHER the hot DB nor core is opened. 
func TestStartStreaming_FirstStartNoTipFatal(t *testing.T) { cat, _ := testCatalog(t) pinGenesis(t, cat) - served := atomic.Int32{} + core := &fakeCore{} tip := &fakeTipBackend{err: errors.New("unreachable"), errFirst: 99} - cfg := startTestConfig(t, cat, tip, nil) - cfg.ServeReads = func(context.Context) error { served.Add(1); return nil } + cfg := startTestConfig(t, cat, tip, core, nil) err := startStreaming(context.Background(), cfg) require.ErrorIs(t, err, ErrFirstStartNoTip) - require.Zero(t, served.Load(), "reads are never served when catch-up fatals") + require.Zero(t, core.openedCount.Load(), "core is never started when backfill fatals") } // startStreaming surfaces a missing earliest_ledger pin loudly (validateConfig @@ -477,7 +553,7 @@ func TestStartStreaming_FirstStartNoTipFatal(t *testing.T) { func TestStartStreaming_RequiresEarliestPin(t *testing.T) { cat, _ := testCatalog(t) // No pinGenesis. - cfg := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, nil) + cfg := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, &fakeCore{}, nil) err := startStreaming(context.Background(), cfg) require.Error(t, err) require.Contains(t, err.Error(), "earliest_ledger pinned") @@ -486,18 +562,28 @@ func TestStartStreaming_RequiresEarliestPin(t *testing.T) { // startStreaming validates its injected boundaries. 
func TestStartStreaming_ValidatesConfig(t *testing.T) { cat, _ := testCatalog(t) - base := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, nil) + base := startTestConfig(t, cat, &fakeTipBackend{tips: []uint32{50_000}}, &fakeCore{}, nil) t.Run("nil NetworkTip", func(t *testing.T) { cfg := base cfg.NetworkTip = nil require.Error(t, startStreaming(context.Background(), cfg)) }) + t.Run("nil Core", func(t *testing.T) { + cfg := base + cfg.Core = nil + require.Error(t, startStreaming(context.Background(), cfg)) + }) t.Run("nil ServeReads", func(t *testing.T) { cfg := base cfg.ServeReads = nil require.Error(t, startStreaming(context.Background(), cfg)) }) + t.Run("nil HotProbe", func(t *testing.T) { + cfg := base + cfg.Exec.Process.HotProbe = nil + require.Error(t, startStreaming(context.Background(), cfg)) + }) } // ---------------------------------------------------------------------------