diff --git a/core/config/app_config.go b/core/config/app_config.go index 7965d096c73..447d8f2d91f 100644 --- a/core/config/app_config.go +++ b/core/config/app_config.go @@ -6,8 +6,6 @@ import ( "github.com/google/uuid" pkgerrors "github.com/pkg/errors" "go.uber.org/zap/zapcore" - - "github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de" ) var ( @@ -48,7 +46,7 @@ type AppConfig interface { JobDistributor() JobDistributor JobPipeline() JobPipeline Log() Log - Mercury() de.Mercury + Mercury() Mercury OCR() OCR OCR2() OCR2 P2P() P2P diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index f37e63fd41a..029ecd68106 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -849,6 +849,18 @@ ReaperFrequency = "1h" # Default # stale. Setting to 0 disables the reaper. ReaperMaxAge = "48h" # Default +# Mercury.DataSource controls node-side tuning for the LLO observation data source. +[Mercury.DataSource] +# ObservationTimingBase sets the base duration T used to size the LLO observation loop timing: cache entry TTL (2×T), +# the stale-refresh threshold, the background loop pacing, and the background pipeline timeout. By default (unset/zero), +# T is derived from the plugin's per-round Observe deadline remainder, which is bounded by the on-chain +# `MaxDurationObservation`. At very low on-chain values (e.g. 25ms for high-frequency feeds) that makes cache entries +# expire before the background refresh loop can complete, producing cache misses even when bridge calls succeed. +# Setting this decouples the observation timing from the on-chain deadline without changing the plugin Observe budget itself. +# A value around 30-50ms is a reasonable starting point for high-frequency feeds; larger values widen the refresh +# runway but make cached values staler. +ObservationTimingBase = '50ms' # Example + # Telemetry holds OTEL settings. # This data includes open telemetry metrics, traces, & logs. # It does not currently include prometheus metrics or standard out logs, but may in the future. diff --git a/core/config/mercury_config.go b/core/config/mercury_config.go new file mode 100644 index 00000000000..e84ec6814b5 --- /dev/null +++ b/core/config/mercury_config.go @@ -0,0 +1,23 @@ +package config + +import ( + "time" + + "github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de" +) + +// Mercury is the node-side view of Mercury config. It embeds the data-streams +// transmitter's Mercury interface and adds node-only sections (e.g. DataSource). +type Mercury interface { + de.Mercury + DataSource() MercuryDataSource +} + +// MercuryDataSource exposes node-side tuning for the LLO observation data source. +type MercuryDataSource interface { + // ObservationTimingBase returns the base duration T used to size the LLO observation + // loop timing: cache entry TTL, stale-refresh threshold, loop pacing, and background + // pipeline timeout. When zero, the plugin Observe deadline remainder is used instead + // (legacy behavior tied to the on-chain MaxDurationObservation). + ObservationTimingBase() time.Duration +} diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 80b8338de4d..bd5a98f42ed 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1936,6 +1936,7 @@ type Mercury struct { TLS MercuryTLS `toml:",omitempty"` Transmitter MercuryTransmitter `toml:",omitempty"` VerboseLogging *bool `toml:",omitempty"` + DataSource MercuryDataSource `toml:",omitempty"` } func (m *Mercury) setFrom(f *Mercury) { @@ -1945,10 +1946,38 @@ func (m *Mercury) setFrom(f *Mercury) { if v := f.VerboseLogging; v != nil { m.VerboseLogging = v } + m.DataSource.setFrom(&f.DataSource) } func (m *Mercury) ValidateConfig() (err error) { - return m.TLS.ValidateConfig() + err = m.TLS.ValidateConfig() + err = errors.Join(err, m.DataSource.ValidateConfig()) + return err +} + +// MercuryDataSource holds node-side tuning for the LLO/Mercury observation data source. +type MercuryDataSource struct { + ObservationTimingBase *commonconfig.Duration +} + +func (m *MercuryDataSource) setFrom(f *MercuryDataSource) { + if f.ObservationTimingBase != nil { + m.ObservationTimingBase = f.ObservationTimingBase + } +} + +func (m *MercuryDataSource) ValidateConfig() error { + if m.ObservationTimingBase == nil { + return nil + } + if m.ObservationTimingBase.Duration() < 0 { + return configutils.ErrInvalid{ + Name: "ObservationTimingBase", + Value: m.ObservationTimingBase.Duration(), + Msg: "must be non-negative", + } + } + return nil } type MercuryCredentials struct { diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index fd75e41e5bb..8b46853d423 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -16,7 +16,6 @@ import ( "github.com/smartcontractkit/chainlink-common/keystore/corekeys" "github.com/smartcontractkit/chainlink-common/keystore/corekeys/p2pkey" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" - "github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de" evmcfg "github.com/smartcontractkit/chainlink-evm/pkg/config/toml" coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" @@ -549,7 +548,7 @@ func (g *generalConfig) Prometheus() coreconfig.Prometheus { return &prometheusConfig{s: g.secrets.Prometheus} } -func (g *generalConfig) Mercury() de.Mercury { +func (g *generalConfig) Mercury() coreconfig.Mercury { return &mercuryConfig{c: g.c.Mercury, s: g.secrets.Mercury} } diff --git a/core/services/chainlink/config_mercury.go b/core/services/chainlink/config_mercury.go index eb0ad72fb83..15eab152fb1 100644 --- a/core/services/chainlink/config_mercury.go +++ b/core/services/chainlink/config_mercury.go @@ -6,10 +6,15 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types" mercurytransmitter "github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de" + coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/config/toml" ) -var _ mercurytransmitter.MercuryCache = (*mercuryCacheConfig)(nil) +var ( + _ coreconfig.Mercury = (*mercuryConfig)(nil) + _ coreconfig.MercuryDataSource = (*mercuryDataSourceConfig)(nil) + _ mercurytransmitter.MercuryCache = (*mercuryCacheConfig)(nil) +) type mercuryCacheConfig struct { c toml.MercuryCache @@ -100,3 +105,18 @@ func (m *mercuryConfig) Transmitter() mercurytransmitter.MercuryTransmitter { func (m *mercuryConfig) VerboseLogging() bool { return *m.c.VerboseLogging } + +func (m *mercuryConfig) DataSource() coreconfig.MercuryDataSource { + return &mercuryDataSourceConfig{c: m.c.DataSource} +} + +type mercuryDataSourceConfig struct { + c toml.MercuryDataSource +} + +func (m *mercuryDataSourceConfig) ObservationTimingBase() time.Duration { + if m.c.ObservationTimingBase == nil { + return 0 + } + return m.c.ObservationTimingBase.Duration() +} diff --git a/core/services/chainlink/config_mercury_test.go b/core/services/chainlink/config_mercury_test.go index 1ae8dc0ba2e..e26531b224f 100644 --- a/core/services/chainlink/config_mercury_test.go +++ b/core/services/chainlink/config_mercury_test.go @@ -2,6 +2,7 @@ package chainlink import ( "testing" + "time" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -48,3 +49,29 @@ func TestMercuryTLS(t *testing.T) { assert.Equal(t, certPath, cfg.TLS().CertFile()) } + +func TestMercuryDataSourceConfig(t *testing.T) { + t.Parallel() + t.Run("defaults", func(t *testing.T) { + t.Parallel() + opts := GeneralConfigOpts{ + ConfigStrings: []string{`[Feature] +LogPoller = false`}, + } + cfg, err := opts.New() + require.NoError(t, err) + + assert.Equal(t, time.Duration(0), cfg.Mercury().DataSource().ObservationTimingBase()) + }) + + t.Run("from full fixture", func(t *testing.T) { + t.Parallel() + opts := GeneralConfigOpts{ + ConfigStrings: []string{fullTOML}, + } + cfg, err := opts.New() + require.NoError(t, err) + + assert.Equal(t, 50*time.Millisecond, cfg.Mercury().DataSource().ObservationTimingBase()) + }) +} diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 127261bc388..41bd0070f3a 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -836,6 +836,9 @@ func TestConfig_Marshal(t *testing.T) { ReaperMaxAge: commoncfg.MustNewDuration(678 * time.Hour), }, VerboseLogging: ptr(true), + DataSource: toml.MercuryDataSource{ + ObservationTimingBase: commoncfg.MustNewDuration(50 * time.Millisecond), + }, } for _, tt := range []struct { @@ -1277,6 +1280,9 @@ TransmitTimeout = '3m54s' TransmitConcurrency = 456 ReaperFrequency = '9m27s' ReaperMaxAge = '678h0m0s' + +[Mercury.DataSource] +ObservationTimingBase = '50ms' `}, {"full", full, fullTOML}, {"multi-chain", multiChain, multiChainTOML}, diff --git a/core/services/chainlink/mocks/general_config.go b/core/services/chainlink/mocks/general_config.go index 7bb1c4a4a1d..ad602702e4e 100644 --- a/core/services/chainlink/mocks/general_config.go +++ b/core/services/chainlink/mocks/general_config.go @@ -6,7 +6,6 @@ import ( time "time" uuid "github.com/google/uuid" - de "github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de" toml "github.com/smartcontractkit/chainlink-evm/pkg/config/toml" config "github.com/smartcontractkit/chainlink/v2/core/config" chainlink "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" @@ -1425,19 +1424,19 @@ func (_c *GeneralConfig_LogConfiguration_Call) RunAndReturn(run func(config.Logf } // Mercury provides a mock function with no fields -func (_m *GeneralConfig) Mercury() de.Mercury { +func (_m *GeneralConfig) Mercury() config.Mercury { ret := _m.Called() if len(ret) == 0 { panic("no return value specified for Mercury") } - var r0 de.Mercury - if rf, ok := ret.Get(0).(func() de.Mercury); ok { + var r0 config.Mercury + if rf, ok := ret.Get(0).(func() config.Mercury); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(de.Mercury) + r0 = ret.Get(0).(config.Mercury) } } @@ -1461,12 +1460,12 @@ func (_c *GeneralConfig_Mercury_Call) Run(run func()) *GeneralConfig_Mercury_Cal return _c } -func (_c *GeneralConfig_Mercury_Call) Return(_a0 de.Mercury) *GeneralConfig_Mercury_Call { +func (_c *GeneralConfig_Mercury_Call) Return(_a0 config.Mercury) *GeneralConfig_Mercury_Call { _c.Call.Return(_a0) return _c } -func (_c *GeneralConfig_Mercury_Call) RunAndReturn(run func() de.Mercury) *GeneralConfig_Mercury_Call { +func (_c *GeneralConfig_Mercury_Call) RunAndReturn(run func() config.Mercury) *GeneralConfig_Mercury_Call { _c.Call.Return(run) return _c } diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index f6eb5121505..7817a8377bd 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -251,6 +251,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 782db0949d7..1781ec0f633 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -261,6 +261,9 @@ TransmitConcurrency = 456 ReaperFrequency = '9m27s' ReaperMaxAge = '678h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '50ms' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 86121aaa1b8..a5cbb843cc4 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -251,6 +251,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index b565f5a413c..5e75e57e398 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "time" "github.com/prometheus/client_golang/prometheus" ocrcommontypes "github.com/smartcontractkit/libocr/commontypes" @@ -61,6 +62,7 @@ type DelegateConfig struct { CaptureObservationTelemetry bool CaptureOutcomeTelemetry bool CaptureReportTelemetry bool + ObservationTimingBase time.Duration // LLO ChannelDefinitionCache llotypes.ChannelDefinitionCache @@ -128,6 +130,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { logger.Named(lggr, "DataSource"), cfg.Registry, t, + cfg.ObservationTimingBase, ) notifier, ok := cfg.ContractTransmitter.(transmitter.TransmitNotifier) diff --git a/core/services/llo/observation/data_source.go b/core/services/llo/observation/data_source.go index 90ef7ba890c..e5451d7c0b7 100644 --- a/core/services/llo/observation/data_source.go +++ b/core/services/llo/observation/data_source.go @@ -51,6 +51,16 @@ func cacheEntryTTL(observationTimeout time.Duration) time.Duration { return time.Duration(cacheTTLMultiplier) * observationTimeout } +// resolveObservationTiming returns the base duration T used for cache TTL, stale refresh, +// loop pacing, and background pipeline timeout. When observationTimingBase is non-zero, +// it replaces the plugin Observe deadline remainder. +func resolveObservationTiming(pluginTimeout, observationTimingBase time.Duration) time.Duration { + if observationTimingBase > 0 { + return observationTimingBase + } + return pluginTimeout +} + // staleRefreshSkipThreshold returns (staleRefreshRemainingNumerator/staleRefreshRemainingDenominator)·T. // buildStreamsRefreshPlan treats a cached stream as still fresh (not a refresh driver) while time.Until(expiresAt) // is strictly greater than this value. A larger fraction (e.g. higher numerator) raises the threshold, so the stream @@ -151,6 +161,7 @@ type dataSource struct { registry Registry t Telemeter cache StreamValueCache + observationTimingBase time.Duration observationLoopStarted atomic.Bool observationLoopCloseCh services.StopChan @@ -161,16 +172,17 @@ type dataSource struct { loopWakeCh chan struct{} } -func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource { - return newDataSource(lggr, registry, t) +func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter, observationTimingBase time.Duration) llo.DataSource { + return newDataSource(lggr, registry, t, observationTimingBase) } -func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSource { +func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, observationTimingBase time.Duration) *dataSource { return &dataSource{ lggr: logger.Named(lggr, "DataSource"), registry: registry, t: t, cache: NewCache(time.Minute), + observationTimingBase: observationTimingBase, observationLoopCloseCh: make(chan struct{}), loopWakeCh: make(chan struct{}, 1), } @@ -523,13 +535,20 @@ func (d *dataSource) setObservableStreams(ctx context.Context, streamValues llo. osv.streamValues[streamID] = nil } + pluginTimeout := osv.observationTimeout if deadline, ok := ctx.Deadline(); ok { - osv.observationTimeout = time.Until(deadline) + pluginTimeout = time.Until(deadline) } + osv.observationTimeout = resolveObservationTiming(pluginTimeout, d.observationTimingBase) - d.lggr.Debugw("setObservableStreams", - "timeout_millis", osv.observationTimeout.Milliseconds(), - "observable_streams", len(osv.streamValues)) + logFields := []any{ + "timeout_millis", pluginTimeout.Milliseconds(), + "observable_streams", len(osv.streamValues), + } + if d.observationTimingBase > 0 { + logFields = append(logFields, "observation_timing_base_ms", d.observationTimingBase.Milliseconds()) + } + d.lggr.Debugw("setObservableStreams", logFields...) d.observableStreamsMu.Lock() defer d.observableStreamsMu.Unlock() @@ -537,10 +556,7 @@ func (d *dataSource) setObservableStreams(ctx context.Context, streamValues llo. if d.observableStreams == nil || len(d.observableStreams.streamValues) != len(osv.streamValues) || d.observableStreams.observationTimeout != osv.observationTimeout { - d.lggr.Infow("setObservableStreams: observable streams changed", - "timeout_millis", osv.observationTimeout.Milliseconds(), - "observable_streams", len(osv.streamValues), - ) + d.lggr.Infow("setObservableStreams: observable streams changed", logFields...) } d.observableStreams = osv diff --git a/core/services/llo/observation/data_source_test.go b/core/services/llo/observation/data_source_test.go index 148a8bb1414..5b82bb6c256 100644 --- a/core/services/llo/observation/data_source_test.go +++ b/core/services/llo/observation/data_source_test.go @@ -219,7 +219,7 @@ func Test_DataSource(t *testing.T) { t.Run("doesn't set any values if no streams are defined", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) vals := makeStreamValues() ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) @@ -234,7 +234,7 @@ func Test_DataSource(t *testing.T) { t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) reg.mu.Lock() sids := []streams.StreamID{1, 2, 3} @@ -265,7 +265,7 @@ func Test_DataSource(t *testing.T) { t.Run("observes each stream and returns success/errors", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) reg.mu.Lock() reg.pipelines[11] = pipelineForStream(11, 11, big.NewInt(21810), errors.New("something exploded")) @@ -292,7 +292,7 @@ func Test_DataSource(t *testing.T) { t.Parallel() tm := &mockTelemeter{} reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, tm) + ds := newDataSource(lggr, reg, tm, 0) reg.mu.Lock() reg.pipelines[21] = pipelineForStream(21, 100, big.NewInt(2181), nil) @@ -367,7 +367,7 @@ func Test_DataSource(t *testing.T) { t.Parallel() tm := &mockTelemeter{} reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, tm) + ds := newDataSource(lggr, reg, tm, 0) reg.mu.Lock() reg.pipelines[31] = pipelineForStream(31, 100, big.NewInt(2181), errors.New("something exploded")) @@ -406,7 +406,7 @@ func Test_DataSource(t *testing.T) { t.Run("uses cached values when available", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) // First observation to populate cache reg.mu.Lock() @@ -460,7 +460,7 @@ func Test_DataSource(t *testing.T) { t.Run("refreshes cache after expiration", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) // First observation reg.mu.Lock() @@ -495,7 +495,7 @@ func Test_DataSource(t *testing.T) { t.Parallel() // Create a new data source reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) // Set up pipeline to return different values reg.mu.Lock() @@ -529,7 +529,7 @@ func Test_DataSource(t *testing.T) { t.Run("cache writes are atomic per pipeline group across observation cycles", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) mc := newMockCache(ds.cache) ds.cache = mc defer ds.Close() @@ -609,7 +609,7 @@ func Test_DataSource(t *testing.T) { t.Run("handles cache errors gracefully", func(t *testing.T) { t.Parallel() reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) // First observation with error reg.mu.Lock() @@ -655,7 +655,7 @@ func Test_DataSource_ObservationLoopWakeSkipsPacing(t *testing.T) { //nolint:par reg.pipelines[1] = pipelineForStream(1, 1, big.NewInt(42), nil) reg.mu.Unlock() - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) defer ds.Close() // Long plugin deadline => large inter-iteration pacing; wake from Observe should advance the loop without waiting. @@ -680,7 +680,7 @@ func Test_DataSource_ObserveWakeManyConcurrent(t *testing.T) { reg.pipelines[1] = pipelineForStream(1, 1, big.NewInt(1), nil) reg.mu.Unlock() - ds := newDataSource(lggr, reg, telem.NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, 0) ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) defer cancel() vals := makeStreamValues(1) @@ -859,6 +859,9 @@ func Test_observationTuningHelpers(t *testing.T) { assert.Equal(t, observationLoopPacingFloor, observationLoopPacing(0)) // T/2 exceeds invariant cap; pacing is min(T/2, cacheTTL−stale−1ns); here 30/2=15ms caps to ~12ms−1ns assert.Equal(t, cacheEntryTTL(30*time.Millisecond)-staleRefreshSkipThreshold(30*time.Millisecond)-time.Nanosecond, observationLoopPacing(30*time.Millisecond)) + + assert.Equal(t, 24*time.Millisecond, resolveObservationTiming(24*time.Millisecond, 0)) + assert.Equal(t, 50*time.Millisecond, resolveObservationTiming(24*time.Millisecond, 50*time.Millisecond)) } func BenchmarkObserve(b *testing.B) { @@ -924,7 +927,7 @@ result3 -> result3_parse -> multiply3; require.NoError(b, err) } - ds := newDataSource(lggr, r, telem.NullTelemeter) + ds := newDataSource(lggr, r, telem.NullTelemeter, 0) vals := make(map[llotypes.StreamID]llo.StreamValue) for i := uint32(0); i < 4*n; i++ { vals[i] = nil diff --git a/core/services/llo/observation/observation_context.go b/core/services/llo/observation/observation_context.go index 828ef52bf13..d9a08b9971f 100644 --- a/core/services/llo/observation/observation_context.go +++ b/core/services/llo/observation/observation_context.go @@ -72,6 +72,9 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre found := false for _, trr := range trrs { if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID { + if trr.Result.Error != nil { + return nil, fmt.Errorf("terminal task error: %w; all task errors: %w", trr.Result.Error, trrs.AllErrors()) + } val, err = resultToStreamValue(trr.Result.Value) if err != nil { return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err) diff --git a/core/services/llo/observation/observation_context_test.go b/core/services/llo/observation/observation_context_test.go index 2f107daa398..a39f9aca61c 100644 --- a/core/services/llo/observation/observation_context_test.go +++ b/core/services/llo/observation/observation_context_test.go @@ -91,6 +91,16 @@ func TestObservationContext_Observe(t *testing.T) { //nolint:paralleltest // sub streamIDs: []streams.StreamID{streamID9, streamID10, streamID11}, } + streamID12 := streams.StreamID(12) + taskErr := errors.New("http request timed out or interrupted") + multiPipelineTaskError := &mockPipeline{ + run: &pipeline.Run{}, + trrs: []pipeline.TaskRunResult{ + {Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: clnull.Uint32From(streamID12)}}, Result: pipeline.Result{Error: taskErr}}, + }, + streamIDs: []streams.StreamID{streamID12}, + } + r.pipelines = map[streams.StreamID]*mockPipeline{ streamID1: &mockPipeline{}, streamID2: makePipelineWithSingleResult[decimal.Decimal](rand.Int64(), decimal.NewFromFloat(12.34), nil), @@ -103,6 +113,7 @@ func TestObservationContext_Observe(t *testing.T) { //nolint:paralleltest // sub streamID9: multiPipelinePartialFail, streamID10: multiPipelinePartialFail, streamID11: multiPipelinePartialFail, + streamID12: multiPipelineTaskError, } t.Run("returns error in case of missing pipeline", func(t *testing.T) { //nolint:paralleltest // shares ObservationContext setup @@ -171,6 +182,12 @@ func TestObservationContext_Observe(t *testing.T) { //nolint:paralleltest // sub assert.Equal(t, int32(1), multiPipelinePartialFail.runCount.Load()) }) + t.Run("returns task error for streamID-tagged task with nil value", func(t *testing.T) { //nolint:paralleltest // shares ObservationContext setup + _, err := oc.Observe(ctx, streamID12, opts) + require.Error(t, err) + assert.Contains(t, err.Error(), "terminal task error") + assert.Contains(t, err.Error(), "http request timed out or interrupted") + }) } func TestObservationContext_Observe_concurrencyStressTest(t *testing.T) { diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 0567345cc77..bedd25b8e3e 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -173,7 +173,7 @@ type DelegateConfig interface { OCR2() ocr2Config JobPipeline() jobPipelineConfig Insecure() insecureConfig - Mercury() de.Mercury + Mercury() coreconfig.Mercury Threshold() coreconfig.Threshold Sharding() coreconfig.Sharding RingStoreForShard0() *ring.Store @@ -185,7 +185,7 @@ type delegateConfig struct { ocr2 ocr2Config jobPipeline jobPipelineConfig insecure insecureConfig - mercury mercuryConfig + mercury coreconfig.Mercury threshold thresholdConfig sharding coreconfig.Sharding ringStore *ring.Store @@ -203,7 +203,7 @@ func (d *delegateConfig) Threshold() coreconfig.Threshold { return d.threshold } -func (d *delegateConfig) Mercury() de.Mercury { +func (d *delegateConfig) Mercury() coreconfig.Mercury { return d.mercury } @@ -257,7 +257,7 @@ type thresholdConfig interface { ThresholdKeyShare() string } -func NewDelegateConfig(ocr2Cfg ocr2Config, m de.Mercury, t coreconfig.Threshold, i insecureConfig, jp jobPipelineConfig, pluginProcessCfg plugins.RegistrarConfig, s coreconfig.Sharding, ringStore *ring.Store) DelegateConfig { +func NewDelegateConfig(ocr2Cfg ocr2Config, m coreconfig.Mercury, t coreconfig.Threshold, i insecureConfig, jp jobPipelineConfig, pluginProcessCfg plugins.RegistrarConfig, s coreconfig.Sharding, ringStore *ring.Store) DelegateConfig { return &delegateConfig{ ocr2: ocr2Cfg, RegistrarConfig: pluginProcessCfg, @@ -1662,6 +1662,7 @@ func (d *Delegate) newServicesLLO( CaptureObservationTelemetry: jb.OCR2OracleSpec.CaptureEATelemetry, CaptureOutcomeTelemetry: jb.OCR2OracleSpec.CaptureEATelemetry, CaptureReportTelemetry: false, + ObservationTimingBase: d.cfg.Mercury().DataSource().ObservationTimingBase(), ChannelDefinitionCache: provider.ChannelDefinitionCache(), RetirementReportCache: d.retirementReportCache, diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index f6eb5121505..7817a8377bd 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -251,6 +251,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index dd7fb035052..7c3496d035e 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -261,6 +261,9 @@ TransmitConcurrency = 456 ReaperFrequency = '9m27s' ReaperMaxAge = '678h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '50ms' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 68a0130ae84..9b5a1297f1a 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -251,6 +251,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/docs/CONFIG.md b/docs/CONFIG.md index d283f2faf4a..8c4dab5cc5d 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -2364,6 +2364,26 @@ ReaperMaxAge = "48h" # Default ReaperMaxAge controls how old a transmission can be before it is considered stale. Setting to 0 disables the reaper. +## Mercury.DataSource +```toml +[Mercury.DataSource] +ObservationTimingBase = '50ms' # Example +``` +Mercury.DataSource controls node-side tuning for the LLO observation data source. + +### ObservationTimingBase +```toml +ObservationTimingBase = '50ms' # Example +``` +ObservationTimingBase sets the base duration T used to size the LLO observation loop timing: cache entry TTL (2×T), +the stale-refresh threshold, the background loop pacing, and the background pipeline timeout. By default (unset/zero), +T is derived from the plugin's per-round Observe deadline remainder, which is bounded by the on-chain +`MaxDurationObservation`. At very low on-chain values (e.g. 25ms for high-frequency feeds) that makes cache entries +expire before the background refresh loop can complete, producing cache misses even when bridge calls succeed. +Setting this decouples the observation timing from the on-chain deadline without changing the plugin Observe budget itself. +A value around 30-50ms is a reasonable starting point for high-frequency feeds; larger values widen the refresh +runway but make cached values staler. + ## Telemetry ```toml [Telemetry] diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar index 06ac519d21d..c0763c0f47a 100644 --- a/testdata/scripts/config/merge_raw_configs.txtar +++ b/testdata/scripts/config/merge_raw_configs.txtar @@ -398,6 +398,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 87476ac18f8..5407b363d87 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -263,6 +263,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar index a0bd0f22ca8..df7bba8d5f2 100644 --- a/testdata/scripts/node/validate/defaults-override.txtar +++ b/testdata/scripts/node/validate/defaults-override.txtar @@ -324,6 +324,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index b514320e81e..fc5fddf66c8 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -307,6 +307,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 09d630f0410..c3d50eb57bb 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -307,6 +307,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index efa55d6c4a3..76019f0083a 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -307,6 +307,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/fallback-override.txtar b/testdata/scripts/node/validate/fallback-override.txtar index 4f5406cf44d..046176159bb 100644 --- a/testdata/scripts/node/validate/fallback-override.txtar +++ b/testdata/scripts/node/validate/fallback-override.txtar @@ -409,6 +409,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 63ac3270c61..6f469b4aeff 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -292,6 +292,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 2ea583aa574..5037e7b9258 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -303,6 +303,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index d8a68cdc8a4..2033dc6e0ae 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -304,6 +304,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0 diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index cc7c437f38a..898387ee13c 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -286,6 +286,9 @@ TransmitConcurrency = 100 ReaperFrequency = '1h0m0s' ReaperMaxAge = '48h0m0s' +[Mercury.DataSource] +ObservationTimingBase = '0s' + [Capabilities] [Capabilities.RateLimit] GlobalRPS = 200.0