Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260406055916-9aa6b6c0ae81
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260326111235-8c09d1a4491f
github.com/stretchr/testify v1.11.1
github.com/valyala/fastjson v1.6.10
Expand Down Expand Up @@ -128,9 +129,9 @@ require (
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect
go.opentelemetry.io/otel/log v0.15.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.41.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.15.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 h1:v/rAtObo9zMpQDnGQ0seaSEqw60JUjowwcNw3DCpVY4=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4=
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 h1:71PGTkjdFZ0JrloEC2Fs8eHl1b1gmUuH+bq7q23usKk=
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260326111235-8c09d1a4491f h1:8p3vE987AHM3Of1JvnNJXNE/AtWtfNvJhk3TeeAG3Qw=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260326111235-8c09d1a4491f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
Expand Down Expand Up @@ -420,14 +422,14 @@ go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzu
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8=
go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90=
go.opentelemetry.io/otel/sdk/log v0.15.0 h1:WgMEHOUt5gjJE93yqfqJOkRflApNif84kxoHWS9VVHE=
go.opentelemetry.io/otel/sdk/log v0.15.0/go.mod h1:qDC/FlKQCXfH5hokGsNg9aUBGMJQsrUyeOiW5u+dKBQ=
go.opentelemetry.io/otel/sdk/log/logtest v0.13.0 h1:9yio6AFZ3QD9j9oqshV1Ibm9gPLlHNxurno5BreMtIA=
go.opentelemetry.io/otel/sdk/log/logtest v0.13.0/go.mod h1:QOGiAJHl+fob8Nu85ifXfuQYmJTFAvcrxL6w5/tu168=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8=
go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ require (
github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20260326122810-b657beadfb57 // indirect
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 // indirect
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 // indirect
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 // indirect
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 // indirect
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e // indirect
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1425,8 +1425,8 @@ github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20260326122810-b65
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20260326122810-b657beadfb57/go.mod h1:kGprqyjsz6qFNVszOQoHc24wfvCjyipNZFste/3zcbs=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 h1:v/rAtObo9zMpQDnGQ0seaSEqw60JUjowwcNw3DCpVY4=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4=
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 h1:3KLLkTCIAy9CvT35Ey0k6pcWX/u+qsm3Y/58TI5VSAg=
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13/go.mod h1:Y7h84PqCe/Vimf2h1Nc6tMiOJStDbtM33fEUeaaF5xk=
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 h1:71PGTkjdFZ0JrloEC2Fs8eHl1b1gmUuH+bq7q23usKk=
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ=
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE=
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d h1:VYoBBNnQpZ5p+enPTl8SkKBRaubqyGpO0ul3B1np++I=
Expand Down
14 changes: 7 additions & 7 deletions relayer/aptos_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type aptosService struct {
}

func (s *aptosService) LedgerVersion(ctx context.Context) (uint64, error) {
client, err := s.chain.GetClient()
client, err := s.chain.GetMultiNodeClient(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get client: %w", err)
}
Expand All @@ -41,7 +41,7 @@ func (s *aptosService) LedgerVersion(ctx context.Context) (uint64, error) {
}

func (s *aptosService) AccountAPTBalance(ctx context.Context, req commonaptos.AccountAPTBalanceRequest) (*commonaptos.AccountAPTBalanceReply, error) {
client, err := s.chain.GetClient()
client, err := s.chain.GetMultiNodeClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get client: %w", err)
}
Expand All @@ -67,7 +67,7 @@ func (s *aptosService) View(ctx context.Context, req commonaptos.ViewRequest) (*
"args", req.Payload.Args,
)

client, err := s.chain.GetClient()
client, err := s.chain.GetMultiNodeClient(ctx)
if err != nil {
s.logger.Errorw("View: failed to get client", "error", err)
return nil, fmt.Errorf("failed to get client: %w", err)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (s *aptosService) View(ctx context.Context, req commonaptos.ViewRequest) (*
}

func (s *aptosService) TransactionByHash(ctx context.Context, req commonaptos.TransactionByHashRequest) (*commonaptos.TransactionByHashReply, error) {
client, err := s.chain.GetClient()
client, err := s.chain.GetMultiNodeClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get client: %w", err)
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *aptosService) AccountTransactions(ctx context.Context, req commonaptos.
"hasLimit", req.Limit != nil,
)

client, err := s.chain.GetClient()
client, err := s.chain.GetMultiNodeClient(ctx)
if err != nil {
s.logger.Errorw("AccountTransactions: failed to get client", "error", err)
return nil, fmt.Errorf("failed to get client: %w", err)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (s *aptosService) AccountTransactions(ctx context.Context, req commonaptos.
}

func (s *aptosService) accountTransactionsWindow(
client aptos_sdk.AptosRpcClient,
client chain.ServiceRPCClient,
address aptos_sdk.AccountAddress,
start *uint64,
limit *uint64,
Expand Down Expand Up @@ -367,7 +367,7 @@ func (s *aptosService) getAccountWithHighestBalance(ctx context.Context, account
return accounts[0], nil
}

client, err := s.chain.GetClient()
client, err := s.chain.GetMultiNodeClient(ctx)
if err != nil {
return "", fmt.Errorf("failed to get client: %w", err)
}
Expand Down
5 changes: 5 additions & 0 deletions relayer/aptos_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/chains/aptos"

"github.com/smartcontractkit/chainlink-aptos/relayer/chain"
chainconfig "github.com/smartcontractkit/chainlink-aptos/relayer/config"
"github.com/smartcontractkit/chainlink-aptos/relayer/logpoller"
clientmocks "github.com/smartcontractkit/chainlink-aptos/relayer/monitor/mocks"
Expand Down Expand Up @@ -266,6 +267,10 @@ func (t testChain) GetClient() (aptos_sdk.AptosRpcClient, error) {
return t.client, nil
}

func (t testChain) GetMultiNodeClient(_ context.Context) (chain.ServiceRPCClient, error) {
return t.client, nil
}

func (t testChain) KeyStore() loop.Keystore {
return nil
}
85 changes: 77 additions & 8 deletions relayer/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"
commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils"
frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics"
"github.com/smartcontractkit/chainlink-framework/multinode"

"github.com/smartcontractkit/chainlink-aptos/relayer/config"
"github.com/smartcontractkit/chainlink-aptos/relayer/logpoller"
Expand All @@ -41,6 +43,9 @@ type Chain interface {
TxManager() *txm.AptosTxm
LogPoller() *logpoller.AptosLogPoller
GetClient() (aptos.AptosRpcClient, error)
// GetMultiNodeClient returns a healthy RPC node selected by the multinode pool.
// Used by the aptos service layer; returns multinode.ErrNodeError when no live node is available.
GetMultiNodeClient(ctx context.Context) (ServiceRPCClient, error)
KeyStore() loop.Keystore
}

Expand Down Expand Up @@ -92,6 +97,7 @@ type chain struct {
txm *txm.AptosTxm
logPoller *logpoller.AptosLogPoller
balanceMonitor services.Service
multiNode *multinode.MultiNode[multinode.StringID, *MultiNodeClient]
}

func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) {
Expand Down Expand Up @@ -123,14 +129,21 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger,
}

ch := &chain{
id: cfg.ChainID,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
ds: ds,
keyStore: loopKs,
id: cfg.ChainID,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
ds: ds,
keyStore: loopKs,
clientCache: make(map[string]aptos.AptosRpcClient),
}

cfg.SetMultiNodeDefaults()
mn, err := newMultiNode(cfg, lggr)
if err != nil {
return nil, fmt.Errorf("failed to create multinode pool: %w", err)
}
ch.multiNode = mn

ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient, cfg.ChainID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -185,6 +198,59 @@ func (c *chain) KeyStore() loop.Keystore {
return c.keyStore
}

// newMultiNode builds the framework multinode pool from the configured RPC nodes. A single-node
// pool is valid; the pool still provides background health checking and dead-node eviction.
func newMultiNode(cfg *config.TOMLConfig, lggr logger.Logger) (*multinode.MultiNode[multinode.StringID, *MultiNodeClient], error) {
chainID := multinode.StringID(cfg.ChainID)

mnMetrics, err := frameworkmetrics.NewGenericMultiNodeMetrics(config.ChainFamilyName, cfg.ChainID)
if err != nil {
return nil, fmt.Errorf("failed to create multinode metrics: %w", err)
}
rpcMetrics, err := frameworkmetrics.NewRPCClientMetrics(frameworkmetrics.RPCClientMetricsConfig{
ChainFamily: config.ChainFamilyName,
ChainID: cfg.ChainID,
})
if err != nil {
return nil, fmt.Errorf("failed to create rpc client metrics: %w", err)
}

primaries := make([]multinode.Node[multinode.StringID, *MultiNodeClient], 0, len(cfg.Nodes))
for i, node := range cfg.Nodes {
rpc, err := NewMultiNodeClient(node.URL.String(), &cfg.MultiNode, cfg.RequestTimeout.Duration(), lggr, rpcMetrics)
if err != nil {
return nil, fmt.Errorf("failed to create multinode client for node %s: %w", *node.Name, err)
}
var order int32
if node.Order != nil {
order = *node.Order
}
// NewNode takes (nodeCfg NodeConfig, chainCfg ChainConfig, ...) -- two different
// interfaces for node-level vs chain-level config. MultiNodeConfig implements both,
// so the same object is passed twice in each role.
// The wsuri/httpuri split is an Ethereum-centric design (WS for subscriptions, HTTP
// for queries). For HTTP-only chains like Aptos, the single URL goes as wsuri (the
// primary endpoint used by the node lifecycle) and httpuri is nil.
primaries = append(primaries, multinode.NewNode[multinode.StringID, *Head, *MultiNodeClient](
&cfg.MultiNode, &cfg.MultiNode, lggr, mnMetrics,
node.URL.URL(), nil,
*node.Name, i, chainID, order, rpc, config.ChainFamilyName,
false, // isLoadBalancedRPC
))
}

return multinode.NewMultiNode[multinode.StringID, *MultiNodeClient](
lggr, mnMetrics, cfg.MultiNode.SelectionMode(), cfg.MultiNode.LeaseDuration(),
primaries, nil, // no send-only nodes
chainID, config.ChainFamilyName, cfg.MultiNode.DeathDeclarationDelay(),
), nil
}

// GetMultiNodeClient returns a healthy RPC node selected by the multinode pool.
func (c *chain) GetMultiNodeClient(ctx context.Context) (ServiceRPCClient, error) {
return c.multiNode.SelectRPC(ctx)
}

// GetClient returns a client, randomly selecting one from available and valid nodes.
// Clients are cached per node URL. Uses http.DefaultClient so all NodeClients share
// the process-wide default transport (avoids port exhaustion with aptos-go-sdk v1.12+).
Expand Down Expand Up @@ -268,9 +334,10 @@ func (c *chain) Start(ctx context.Context) error {
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting logPoller")
c.lggr.Debug("Starting balance monitor")
c.lggr.Debug("Starting multinode")

var ms services.MultiStart
return ms.Start(ctx, c.txm, c.logPoller, c.balanceMonitor)
return ms.Start(ctx, c.multiNode, c.txm, c.logPoller, c.balanceMonitor)
})
}

Expand All @@ -280,17 +347,19 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping txm")
c.lggr.Debug("Stopping logPoller")
c.lggr.Debug("Stopping balance monitor")
c.lggr.Debug("Stopping multinode")

return services.CloseAll(c.txm, c.logPoller, c.balanceMonitor)
return services.CloseAll(c.txm, c.logPoller, c.balanceMonitor, c.multiNode)
})
}

func (c *chain) Ready() error {
return errors.Join(c.starter.Ready(), c.txm.Ready(), c.logPoller.Ready(), c.balanceMonitor.Ready())
return errors.Join(c.starter.Ready(), c.multiNode.Ready(), c.txm.Ready(), c.logPoller.Ready(), c.balanceMonitor.Ready())
}

func (c *chain) HealthReport() map[string]error {
report := map[string]error{c.Name(): c.starter.Healthy()}
services.CopyHealth(report, c.multiNode.HealthReport())
services.CopyHealth(report, c.txm.HealthReport())
services.CopyHealth(report, c.logPoller.HealthReport())
services.CopyHealth(report, c.balanceMonitor.HealthReport())
Expand Down
45 changes: 45 additions & 0 deletions relayer/chain/chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package chain

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-aptos/relayer/config"
)

func newTestTOMLConfig(t *testing.T) *config.TOMLConfig {
t.Helper()
cfg, err := config.NewDecodedTOMLConfig(`
ChainID = "2"

[[Nodes]]
Name = "primary"
URL = "http://localhost:8080"

[[Nodes]]
Name = "secondary"
URL = "http://localhost:8081"
`)
require.NoError(t, err)
return cfg
}

func TestNewMultiNode(t *testing.T) {
t.Parallel()

cfg := newTestTOMLConfig(t)
mn, err := newMultiNode(cfg, logger.Test(t))
require.NoError(t, err)
require.NotNil(t, mn)

// An unstarted pool has no live node, so selection fails fast rather than returning a
// usable client. This is the path that surfaces multinode.ErrNodeError through GetMultiNodeClient.
ctx, cancel := context.WithCancel(t.Context())
cancel()
_, err = mn.SelectRPC(ctx)
require.Error(t, err)
}
Loading
Loading