Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 109 additions & 19 deletions cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
supportlog "github.com/stellar/go-stellar-sdk/support/log"
"github.com/stellar/go-stellar-sdk/xdr"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest"
Expand Down Expand Up @@ -97,6 +98,10 @@
// deployment that never backfills.
Backend ingest.ChunkSource

// Core starts captive core at the resume ledger and yields the live getter
// the ingestion loop polls. Required.
Core CoreOpener

// ServeReads launches the RPC read server (it must return promptly, not block
// until shutdown). Required.
//
Expand All @@ -112,6 +117,9 @@
if b.NetworkTip == nil {
return errors.New("streaming: Boundaries.NetworkTip is nil")
}
if b.Core == nil {
return errors.New("streaming: Boundaries.Core is nil")
}
if b.ServeReads == nil {
return errors.New("streaming: Boundaries.ServeReads is nil")
}
Expand Down Expand Up @@ -207,7 +215,10 @@
}

// startConfig threads the loaded Config, the bound catalog/logger, and the
// assembled boundaries into the StartConfig startStreaming consumes.
// assembled boundaries into the StartConfig startStreaming consumes. The Exec
// and Lifecycle bundles share ONE catalog, worker pool, and retention floor (the
// design's "catch-up and the lifecycle goroutine share one set of
// postconditions"), so Lifecycle embeds the same ExecConfig.
func startConfig(
cfg Config, cat *catalog.Catalog, logger *supportlog.Entry, b Boundaries, metrics Metrics,
sink ingest.MetricSink, tipBackoff time.Duration, tipMaxAttempts int,
Expand All @@ -225,22 +236,29 @@
HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, logger),
},
}
return StartConfig{
Exec: exec,
life := LifecycleConfig{
ExecConfig: exec,
RetentionChunks: derefU32(cfg.Retention.RetentionChunks),
NetworkTip: b.NetworkTip,
ServeReads: b.ServeReads,
TipBackoff: tipBackoff,
TipMaxAttempts: tipMaxAttempts,
}
return StartConfig{
Exec: exec,
Lifecycle: life,
NetworkTip: b.NetworkTip,
Core: b.Core,
ServeReads: b.ServeReads,
TipBackoff: tipBackoff,
TipMaxAttempts: tipMaxAttempts,
}
}

// superviseStreaming is the daemon's top-level loop: it runs startStreaming and
// restarts it on a restartable error after a backoff ("startup is the recovery
// path"). A clean shutdown or a ctx cancel during the backoff returns nil.
//
// It does NOT swallow the fatal sentinel ErrFirstStartNoTip — that surfaces UP,
// since the retry is only for transient failures a fresh start converges.
// It does NOT swallow the fatal sentinels (ErrHotVolumeLost, ErrFirstStartNoTip):
// those are returned UP so an operator/supervisor sees them. The retry here is
// for transient restartable failures (a backfill/ingest hiccup, a captive core
// crash) where a fresh start converges; the unrecoverable ones surface.
func superviseStreaming(
ctx context.Context, start StartConfig, logger *supportlog.Entry, backoff time.Duration,
) error {
Expand All @@ -254,7 +272,7 @@
}
// Unrecoverable: surface up rather than spin restarting on a condition a
// fresh start cannot heal.
if errors.Is(err, ErrFirstStartNoTip) {
if errors.Is(err, ErrHotVolumeLost) || errors.Is(err, ErrFirstStartNoTip) {
return err
}
logger.WithError(err).Warnf("streaming: daemon run failed; restarting in %s", backoff)
Expand All @@ -275,20 +293,37 @@
// buildProductionBoundaries assembles the real external boundaries from the
// loaded config.
//
// TODO(#772): the bulk-backend tip boundary is still entangled with config that
// does not yet exist on this branch (the datastore type + schema — only
// [backfill.bsb].bucket_path is in Config today) and with the v1 path's lake
// tip-resolution. Until #772 lands the cutover, a deployment needing catch-up
// against a real lake must wire NetworkTip/BackendWaiter/Backend through
// DaemonOptions.BuildBoundaries; this supplies a tip adapter that errors clearly
// when no bulk backend is configured, so a frontfill deployment runs unchanged.
// - Core: captive stellar-core via newCaptiveCoreOpener; OpenCore calls
// PrepareRange(UnboundedRange(resume)) and hands back a LedgerGetter the
// ingestion loop polls by sequence (the design's core.GetLedger(ctx, seq)),
// plus a closer. The config plumbing is deferred (TODO below), so today the
// constructor errors with a #772 pointer.
// - Backend: the bulk datastore ChunkSource (NewDataStoreSource) when a bucket
// path is configured; nil for a frontfill-only deployment.
// - NetworkTip / BackendWaiter: an adapter over the bulk backend's tip.
//
// TODO(#772): both the captive-core config (binary path, passphrase, archives —
// see newCaptiveCoreOpener) and the bulk-backend TIP boundary (the datastore
// TYPE + schema; only [backfill.bsb].bucket_path is in Config today) are
// entangled with config that does not yet exist on this branch and with the lake
// tip-resolution the v1 path performs differently. Until #772 lands the cutover,
// a deployment must wire Core (and, for catch-up against a real lake, NetworkTip/
// BackendWaiter/Backend) through DaemonOptions.BuildBoundaries; the tip adapter
// here errors clearly when no bulk backend is configured, so a frontfill
// ("genesis" or "now" with no backfill) deployment runs unchanged.
func buildProductionBoundaries(
_ context.Context, _ Config, _ Paths, _ *catalog.Catalog, _ *supportlog.Entry,
_ context.Context, cfg Config, _ Paths, _ *catalog.Catalog, logger *supportlog.Entry,
) (Boundaries, error) {
core, err := newCaptiveCoreOpener(cfg.Ingestion.CaptiveCoreConfig, logger)
if err != nil {
return Boundaries{}, err
}

b := Boundaries{
Core: core,
// TODO(#772): wire the full-history RPC read server. The SQLite read path
// is still the v1 daemon's; until the #772 cutover, serving is a no-op here
// so the streaming daemon catches up + freezes without double-serving reads.
// so the streaming daemon ingests + freezes without double-serving reads.
ServeReads: func(context.Context) error { return nil },
}

Expand All @@ -302,6 +337,59 @@
return b, nil
}

// captiveCoreOpener is the production CoreOpener: it prepares captive core at the
// resume ledger and hands back a LedgerGetter the ingestion loop polls by
// sequence (the design's core.GetLedger(ctx, seq)) plus a closer.
type captiveCoreOpener struct {
backend ledgerbackend.LedgerBackend
}

func newCaptiveCoreOpener(captiveCoreConfigPath string, logger *supportlog.Entry) (*captiveCoreOpener, error) {

Check failure on line 347 in cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unused-parameter: parameter 'logger' seems to be unused, consider removing or renaming it as _ (revive)
if captiveCoreConfigPath == "" {
return nil, errors.New("streaming: [streaming].captive_core_config is required")
}
// TODO(#772): the captive-core CaptiveCoreConfig (binary path, network
// passphrase, history-archive URLs, storage path) is assembled from the v1
// daemon config today; threading those through the streaming Config is part
// of the cutover. The factory below is the wiring point — once the fields are
// in Config, build a ledgerbackend.CaptiveCoreConfig from
// NewCaptiveCoreTomlFromFile(captiveCoreConfigPath, ...) and NewCaptive, then
// PrepareRange(UnboundedRange(resume)) in OpenCore. The seam (a LedgerGetter
// behind CoreOpener) is final; only the config plumbing is deferred.
return nil, fmt.Errorf("streaming: production captive-core wiring is deferred to #772 "+
"(config %q parsed; pass a CoreOpener via DaemonOptions.BuildBoundaries to run today)",
captiveCoreConfigPath)
}

// OpenCore prepares the backend over the unbounded range from resumeLedger and
// returns a getter wrapping GetLedger plus the backend's Close.
func (c *captiveCoreOpener) OpenCore(
ctx context.Context, resumeLedger uint32,
) (LedgerGetter, func() error, error) {
if err := c.backend.PrepareRange(ctx, ledgerbackend.UnboundedRange(resumeLedger)); err != nil {
return nil, nil, fmt.Errorf("streaming: captive core prepare range from %d: %w", resumeLedger, err)
}
return backendGetter{backend: c.backend}, c.backend.Close, nil
}

// backendGetter adapts a ledgerbackend.LedgerBackend to LedgerGetter: GetLedger
// blocks until the ledger is available and returns its raw wire bytes.
type backendGetter struct {
backend ledgerbackend.LedgerBackend
}

func (g backendGetter) GetLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMetaView, error) {
lcm, err := g.backend.GetLedger(ctx, seq)
if err != nil {
return nil, err
}
raw, err := lcm.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("streaming: marshal ledger %d: %w", seq, err)
}
return xdr.LedgerCloseMetaView(raw), nil
}

// notConfiguredTip is the NetworkTipBackend for a deployment with no bulk
// backend configured: every sample returns a clear not-configured error — the
// honest placeholder until #772 wires the real lake tip.
Expand Down Expand Up @@ -394,6 +482,8 @@
// compile-time assertions: the production adapters satisfy the injected
// interfaces startStreaming/processChunk consume.
var (
_ CoreOpener = (*captiveCoreOpener)(nil)
_ LedgerGetter = backendGetter{}
_ NetworkTipBackend = (*backendTip)(nil)
_ BackendWaiter = (*backendTip)(nil)
_ NetworkTipBackend = notConfiguredTip{}
Expand Down
Loading
Loading