Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/config/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"github.com/google/uuid"
pkgerrors "github.com/pkg/errors"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de"
)

var (
Expand Down Expand Up @@ -48,7 +46,7 @@ type AppConfig interface {
JobDistributor() JobDistributor
JobPipeline() JobPipeline
Log() Log
Mercury() de.Mercury
Mercury() Mercury
OCR() OCR
OCR2() OCR2
P2P() P2P
Expand Down
12 changes: 12 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,18 @@ ReaperFrequency = "1h" # Default
# stale. Setting to 0 disables the reaper.
ReaperMaxAge = "48h" # Default

# Mercury.DataSource controls node-side tuning for the LLO observation data source.
[Mercury.DataSource]
# ObservationTimingBase sets the base duration T used to size the LLO observation loop timing: cache entry TTL (2×T),
# the stale-refresh threshold, the background loop pacing, and the background pipeline timeout. By default (unset/zero),
# T is derived from the plugin's per-round Observe deadline remainder, which is bounded by the on-chain
# `MaxDurationObservation`. At very low on-chain values (e.g. 25ms for high-frequency feeds) that makes cache entries
# expire before the background refresh loop can complete, producing cache misses even when bridge calls succeed.
# Setting this decouples the observation timing from the on-chain deadline without changing the plugin Observe budget itself.
# A value around 30-50ms is a reasonable starting point for high-frequency feeds; larger values widen the refresh
# runway but make cached values staler.
ObservationTimingBase = '50ms' # Example

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
# It does not currently include prometheus metrics or standard out logs, but may in the future.
Expand Down
23 changes: 23 additions & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package config

import (
"time"

"github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de"
)

// Mercury is the node-side view of Mercury config. It embeds the data-streams
// transmitter's Mercury interface and adds node-only sections (e.g. DataSource).
type Mercury interface {
de.Mercury
DataSource() MercuryDataSource
}

// MercuryDataSource exposes node-side tuning for the LLO observation data source.
type MercuryDataSource interface {
// ObservationTimingBase returns the base duration T used to size the LLO observation
// loop timing: cache entry TTL, stale-refresh threshold, loop pacing, and background
// pipeline timeout. When zero, the plugin Observe deadline remainder is used instead
// (legacy behavior tied to the on-chain MaxDurationObservation).
ObservationTimingBase() time.Duration
}
31 changes: 30 additions & 1 deletion core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1936,6 +1936,7 @@ type Mercury struct {
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
VerboseLogging *bool `toml:",omitempty"`
DataSource MercuryDataSource `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
Expand All @@ -1945,10 +1946,38 @@ func (m *Mercury) setFrom(f *Mercury) {
if v := f.VerboseLogging; v != nil {
m.VerboseLogging = v
}
m.DataSource.setFrom(&f.DataSource)
}

func (m *Mercury) ValidateConfig() (err error) {
return m.TLS.ValidateConfig()
err = m.TLS.ValidateConfig()
err = errors.Join(err, m.DataSource.ValidateConfig())
return err
}

// MercuryDataSource holds node-side tuning for the LLO/Mercury observation data source.
type MercuryDataSource struct {
ObservationTimingBase *commonconfig.Duration
}

func (m *MercuryDataSource) setFrom(f *MercuryDataSource) {
if f.ObservationTimingBase != nil {
m.ObservationTimingBase = f.ObservationTimingBase
}
}

func (m *MercuryDataSource) ValidateConfig() error {
if m.ObservationTimingBase == nil {
return nil
}
if m.ObservationTimingBase.Duration() < 0 {
return configutils.ErrInvalid{
Name: "ObservationTimingBase",
Value: m.ObservationTimingBase.Duration(),
Msg: "must be non-negative",
}
}
return nil
}

type MercuryCredentials struct {
Expand Down
3 changes: 1 addition & 2 deletions core/services/chainlink/config_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/smartcontractkit/chainlink-common/keystore/corekeys"
"github.com/smartcontractkit/chainlink-common/keystore/corekeys/p2pkey"
commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de"
evmcfg "github.com/smartcontractkit/chainlink-evm/pkg/config/toml"

coreconfig "github.com/smartcontractkit/chainlink/v2/core/config"
Expand Down Expand Up @@ -549,7 +548,7 @@ func (g *generalConfig) Prometheus() coreconfig.Prometheus {
return &prometheusConfig{s: g.secrets.Prometheus}
}

func (g *generalConfig) Mercury() de.Mercury {
func (g *generalConfig) Mercury() coreconfig.Mercury {
return &mercuryConfig{c: g.c.Mercury, s: g.secrets.Mercury}
}

Expand Down
22 changes: 21 additions & 1 deletion core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
mercurytransmitter "github.com/smartcontractkit/chainlink-data-streams/llo/transmitter/de"

coreconfig "github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
)

var _ mercurytransmitter.MercuryCache = (*mercuryCacheConfig)(nil)
var (
_ coreconfig.Mercury = (*mercuryConfig)(nil)
_ coreconfig.MercuryDataSource = (*mercuryDataSourceConfig)(nil)
_ mercurytransmitter.MercuryCache = (*mercuryCacheConfig)(nil)
)

type mercuryCacheConfig struct {
c toml.MercuryCache
Expand Down Expand Up @@ -100,3 +105,18 @@ func (m *mercuryConfig) Transmitter() mercurytransmitter.MercuryTransmitter {
func (m *mercuryConfig) VerboseLogging() bool {
return *m.c.VerboseLogging
}

func (m *mercuryConfig) DataSource() coreconfig.MercuryDataSource {
return &mercuryDataSourceConfig{c: m.c.DataSource}
}

type mercuryDataSourceConfig struct {
c toml.MercuryDataSource
}

func (m *mercuryDataSourceConfig) ObservationTimingBase() time.Duration {
if m.c.ObservationTimingBase == nil {
return 0
}
return m.c.ObservationTimingBase.Duration()
}
27 changes: 27 additions & 0 deletions core/services/chainlink/config_mercury_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chainlink

import (
"testing"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/types"

Expand Down Expand Up @@ -48,3 +49,29 @@ func TestMercuryTLS(t *testing.T) {

assert.Equal(t, certPath, cfg.TLS().CertFile())
}

func TestMercuryDataSourceConfig(t *testing.T) {
t.Parallel()
t.Run("defaults", func(t *testing.T) {
t.Parallel()
opts := GeneralConfigOpts{
ConfigStrings: []string{`[Feature]
LogPoller = false`},
}
cfg, err := opts.New()
require.NoError(t, err)

assert.Equal(t, time.Duration(0), cfg.Mercury().DataSource().ObservationTimingBase())
})

t.Run("from full fixture", func(t *testing.T) {
t.Parallel()
opts := GeneralConfigOpts{
ConfigStrings: []string{fullTOML},
}
cfg, err := opts.New()
require.NoError(t, err)

assert.Equal(t, 50*time.Millisecond, cfg.Mercury().DataSource().ObservationTimingBase())
})
}
6 changes: 6 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,9 @@ func TestConfig_Marshal(t *testing.T) {
ReaperMaxAge: commoncfg.MustNewDuration(678 * time.Hour),
},
VerboseLogging: ptr(true),
DataSource: toml.MercuryDataSource{
ObservationTimingBase: commoncfg.MustNewDuration(50 * time.Millisecond),
},
}

for _, tt := range []struct {
Expand Down Expand Up @@ -1277,6 +1280,9 @@ TransmitTimeout = '3m54s'
TransmitConcurrency = 456
ReaperFrequency = '9m27s'
ReaperMaxAge = '678h0m0s'

[Mercury.DataSource]
ObservationTimingBase = '50ms'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
13 changes: 6 additions & 7 deletions core/services/chainlink/mocks/general_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ TransmitConcurrency = 100
ReaperFrequency = '1h0m0s'
ReaperMaxAge = '48h0m0s'

[Mercury.DataSource]
ObservationTimingBase = '0s'

[Capabilities]
[Capabilities.RateLimit]
GlobalRPS = 200.0
Expand Down
3 changes: 3 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ TransmitConcurrency = 456
ReaperFrequency = '9m27s'
ReaperMaxAge = '678h0m0s'

[Mercury.DataSource]
ObservationTimingBase = '50ms'

[Capabilities]
[Capabilities.RateLimit]
GlobalRPS = 200.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ TransmitConcurrency = 100
ReaperFrequency = '1h0m0s'
ReaperMaxAge = '48h0m0s'

[Mercury.DataSource]
ObservationTimingBase = '0s'

[Capabilities]
[Capabilities.RateLimit]
GlobalRPS = 200.0
Expand Down
3 changes: 3 additions & 0 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
ocrcommontypes "github.com/smartcontractkit/libocr/commontypes"
Expand Down Expand Up @@ -61,6 +62,7 @@ type DelegateConfig struct {
CaptureObservationTelemetry bool
CaptureOutcomeTelemetry bool
CaptureReportTelemetry bool
ObservationTimingBase time.Duration

// LLO
ChannelDefinitionCache llotypes.ChannelDefinitionCache
Expand Down Expand Up @@ -128,6 +130,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
logger.Named(lggr, "DataSource"),
cfg.Registry,
t,
cfg.ObservationTimingBase,
)

notifier, ok := cfg.ContractTransmitter.(transmitter.TransmitNotifier)
Expand Down
Loading
Loading