From e095c4f094b7dc7a9fdcf6fa2adacd4c2f3c3dcc Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Tue, 16 Jun 2026 12:52:46 +0100 Subject: [PATCH 1/3] Add multinode support for aptos service --- go.mod | 5 +- go.sum | 10 +- relayer/aptos_service.go | 14 +-- relayer/aptos_service_test.go | 5 + relayer/chain/chain.go | 83 +++++++++++-- relayer/chain/chain_test.go | 45 +++++++ relayer/chain/multinode_client.go | 145 +++++++++++++++++++++++ relayer/chain/multinode_client_test.go | 158 +++++++++++++++++++++++++ relayer/config/config.go | 56 +++++++++ relayer/config/config_test.go | 50 ++++++++ 10 files changed, 550 insertions(+), 21 deletions(-) create mode 100644 relayer/chain/chain_test.go create mode 100644 relayer/chain/multinode_client.go create mode 100644 relayer/chain/multinode_client_test.go diff --git a/go.mod b/go.mod index eaefae5a..674ae7e3 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 github.com/smartcontractkit/chainlink-common v0.11.2-0.20260406055916-9aa6b6c0ae81 github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 + github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260326111235-8c09d1a4491f github.com/stretchr/testify v1.11.1 github.com/valyala/fastjson v1.6.10 @@ -128,9 +129,9 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect go.opentelemetry.io/otel/log v0.15.0 // indirect - go.opentelemetry.io/otel/sdk v1.40.0 // indirect + go.opentelemetry.io/otel/sdk v1.41.0 // indirect go.opentelemetry.io/otel/sdk/log v0.15.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/multierr v1.11.0 // 
indirect go.yaml.in/yaml/v2 v2.4.2 // indirect diff --git a/go.sum b/go.sum index 49a7f966..aa569669 100644 --- a/go.sum +++ b/go.sum @@ -339,6 +339,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 h1:v/rAtObo9zMpQDnGQ0seaSEqw60JUjowwcNw3DCpVY4= github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 h1:71PGTkjdFZ0JrloEC2Fs8eHl1b1gmUuH+bq7q23usKk= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260326111235-8c09d1a4491f h1:8p3vE987AHM3Of1JvnNJXNE/AtWtfNvJhk3TeeAG3Qw= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260326111235-8c09d1a4491f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= @@ -420,14 +422,14 @@ go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzu go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= -go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= -go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk v1.41.0 
h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8= +go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90= go.opentelemetry.io/otel/sdk/log v0.15.0 h1:WgMEHOUt5gjJE93yqfqJOkRflApNif84kxoHWS9VVHE= go.opentelemetry.io/otel/sdk/log v0.15.0/go.mod h1:qDC/FlKQCXfH5hokGsNg9aUBGMJQsrUyeOiW5u+dKBQ= go.opentelemetry.io/otel/sdk/log/logtest v0.13.0 h1:9yio6AFZ3QD9j9oqshV1Ibm9gPLlHNxurno5BreMtIA= go.opentelemetry.io/otel/sdk/log/logtest v0.13.0/go.mod h1:QOGiAJHl+fob8Nu85ifXfuQYmJTFAvcrxL6w5/tu168= -go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= -go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8= +go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= diff --git a/relayer/aptos_service.go b/relayer/aptos_service.go index 0f67299d..b063e2d7 100644 --- a/relayer/aptos_service.go +++ b/relayer/aptos_service.go @@ -27,7 +27,7 @@ type aptosService struct { } func (s *aptosService) LedgerVersion(ctx context.Context) (uint64, error) { - client, err := s.chain.GetClient() + client, err := s.chain.GetMultiNodeClient(ctx) if err != nil { return 0, fmt.Errorf("failed to get client: %w", err) } @@ -41,7 +41,7 @@ func (s *aptosService) LedgerVersion(ctx context.Context) (uint64, error) { } func (s *aptosService) AccountAPTBalance(ctx context.Context, req commonaptos.AccountAPTBalanceRequest) (*commonaptos.AccountAPTBalanceReply, error) { - client, err := s.chain.GetClient() + client, err := s.chain.GetMultiNodeClient(ctx) if err != nil { return nil, 
fmt.Errorf("failed to get client: %w", err) } @@ -67,7 +67,7 @@ func (s *aptosService) View(ctx context.Context, req commonaptos.ViewRequest) (* "args", req.Payload.Args, ) - client, err := s.chain.GetClient() + client, err := s.chain.GetMultiNodeClient(ctx) if err != nil { s.logger.Errorw("View: failed to get client", "error", err) return nil, fmt.Errorf("failed to get client: %w", err) @@ -105,7 +105,7 @@ func (s *aptosService) View(ctx context.Context, req commonaptos.ViewRequest) (* } func (s *aptosService) TransactionByHash(ctx context.Context, req commonaptos.TransactionByHashRequest) (*commonaptos.TransactionByHashReply, error) { - client, err := s.chain.GetClient() + client, err := s.chain.GetMultiNodeClient(ctx) if err != nil { return nil, fmt.Errorf("failed to get client: %w", err) } @@ -138,7 +138,7 @@ func (s *aptosService) AccountTransactions(ctx context.Context, req commonaptos. "hasLimit", req.Limit != nil, ) - client, err := s.chain.GetClient() + client, err := s.chain.GetMultiNodeClient(ctx) if err != nil { s.logger.Errorw("AccountTransactions: failed to get client", "error", err) return nil, fmt.Errorf("failed to get client: %w", err) @@ -186,7 +186,7 @@ func (s *aptosService) AccountTransactions(ctx context.Context, req commonaptos. 
} func (s *aptosService) accountTransactionsWindow( - client aptos_sdk.AptosRpcClient, + client chain.ServiceRPCClient, address aptos_sdk.AccountAddress, start *uint64, limit *uint64, @@ -367,7 +367,7 @@ func (s *aptosService) getAccountWithHighestBalance(ctx context.Context, account return accounts[0], nil } - client, err := s.chain.GetClient() + client, err := s.chain.GetMultiNodeClient(ctx) if err != nil { return "", fmt.Errorf("failed to get client: %w", err) } diff --git a/relayer/aptos_service_test.go b/relayer/aptos_service_test.go index 1e84d4cf..0eea7e65 100644 --- a/relayer/aptos_service_test.go +++ b/relayer/aptos_service_test.go @@ -15,6 +15,7 @@ import ( commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/chains/aptos" + "github.com/smartcontractkit/chainlink-aptos/relayer/chain" chainconfig "github.com/smartcontractkit/chainlink-aptos/relayer/config" "github.com/smartcontractkit/chainlink-aptos/relayer/logpoller" clientmocks "github.com/smartcontractkit/chainlink-aptos/relayer/monitor/mocks" @@ -266,6 +267,10 @@ func (t testChain) GetClient() (aptos_sdk.AptosRpcClient, error) { return t.client, nil } +func (t testChain) GetMultiNodeClient(_ context.Context) (chain.ServiceRPCClient, error) { + return t.client, nil +} + func (t testChain) KeyStore() loop.Keystore { return nil } diff --git a/relayer/chain/chain.go b/relayer/chain/chain.go index d5a7542a..5f33f9ae 100644 --- a/relayer/chain/chain.go +++ b/relayer/chain/chain.go @@ -21,6 +21,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils" + frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" + "github.com/smartcontractkit/chainlink-framework/multinode" "github.com/smartcontractkit/chainlink-aptos/relayer/config" 
"github.com/smartcontractkit/chainlink-aptos/relayer/logpoller" @@ -41,6 +43,9 @@ type Chain interface { TxManager() *txm.AptosTxm LogPoller() *logpoller.AptosLogPoller GetClient() (aptos.AptosRpcClient, error) + // GetMultiNodeClient returns a healthy RPC node selected by the multinode pool. + // Used by the aptos service layer; returns multinode.ErrNodeError when no live node is available. + GetMultiNodeClient(ctx context.Context) (ServiceRPCClient, error) KeyStore() loop.Keystore } @@ -92,6 +97,7 @@ type chain struct { txm *txm.AptosTxm logPoller *logpoller.AptosLogPoller balanceMonitor services.Service + multiNode *multinode.MultiNode[multinode.StringID, *MultiNodeClient] } func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) { @@ -123,14 +129,21 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger, } ch := &chain{ - id: cfg.ChainID, - cfg: cfg, - lggr: logger.Named(lggr, "Chain"), - ds: ds, - keyStore: loopKs, + id: cfg.ChainID, + cfg: cfg, + lggr: logger.Named(lggr, "Chain"), + ds: ds, + keyStore: loopKs, clientCache: make(map[string]aptos.AptosRpcClient), } + cfg.SetMultiNodeDefaults() + mn, err := newMultiNode(cfg, lggr) + if err != nil { + return nil, fmt.Errorf("failed to create multinode pool: %w", err) + } + ch.multiNode = mn + ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient, cfg.ChainID) if err != nil { return nil, err @@ -185,6 +198,59 @@ func (c *chain) KeyStore() loop.Keystore { return c.keyStore } +// newMultiNode builds the framework multinode pool from the configured RPC nodes. A single-node +// pool is valid; the pool still provides background health checking and dead-node eviction. 
+func newMultiNode(cfg *config.TOMLConfig, lggr logger.Logger) (*multinode.MultiNode[multinode.StringID, *MultiNodeClient], error) { + chainID := multinode.StringID(cfg.ChainID) + + mnMetrics, err := frameworkmetrics.NewGenericMultiNodeMetrics(config.ChainFamilyName, cfg.ChainID) + if err != nil { + return nil, fmt.Errorf("failed to create multinode metrics: %w", err) + } + rpcMetrics, err := frameworkmetrics.NewRPCClientMetrics(frameworkmetrics.RPCClientMetricsConfig{ + ChainFamily: config.ChainFamilyName, + ChainID: cfg.ChainID, + }) + if err != nil { + return nil, fmt.Errorf("failed to create rpc client metrics: %w", err) + } + + primaries := make([]multinode.Node[multinode.StringID, *MultiNodeClient], 0, len(cfg.Nodes)) + for i, node := range cfg.Nodes { + rpc, err := NewMultiNodeClient(node.URL.String(), &cfg.MultiNode, cfg.RequestTimeout.Duration(), lggr, rpcMetrics) + if err != nil { + return nil, fmt.Errorf("failed to create multinode client for node %s: %w", *node.Name, err) + } + var order int32 + if node.Order != nil { + order = *node.Order + } + // NewNode takes (nodeCfg NodeConfig, chainCfg ChainConfig, ...) -- two different + // interfaces for node-level vs chain-level config. MultiNodeConfig implements both, + // so the same object is passed twice in each role. + // The wsuri/httpuri split is an Ethereum-centric design (WS for subscriptions, HTTP + // for queries). For HTTP-only chains like Aptos, the single URL goes as wsuri (the + // primary endpoint used by the node lifecycle) and httpuri is nil. 
+ primaries = append(primaries, multinode.NewNode[multinode.StringID, *Head, *MultiNodeClient]( + &cfg.MultiNode, &cfg.MultiNode, lggr, mnMetrics, + node.URL.URL(), nil, + *node.Name, i, chainID, order, rpc, config.ChainFamilyName, + false, // isLoadBalancedRPC + )) + } + + return multinode.NewMultiNode[multinode.StringID, *MultiNodeClient]( + lggr, mnMetrics, cfg.MultiNode.SelectionMode(), cfg.MultiNode.LeaseDuration(), + primaries, nil, // no send-only nodes + chainID, config.ChainFamilyName, cfg.MultiNode.DeathDeclarationDelay(), + ), nil +} + +// GetMultiNodeClient returns a healthy RPC node selected by the multinode pool. +func (c *chain) GetMultiNodeClient(ctx context.Context) (ServiceRPCClient, error) { + return c.multiNode.SelectRPC(ctx) +} + // GetClient returns a client, randomly selecting one from available and valid nodes. // Clients are cached per node URL. Uses http.DefaultClient so all NodeClients share // the process-wide default transport (avoids port exhaustion with aptos-go-sdk v1.12+). 
@@ -270,7 +336,7 @@ func (c *chain) Start(ctx context.Context) error { c.lggr.Debug("Starting balance monitor") var ms services.MultiStart - return ms.Start(ctx, c.txm, c.logPoller, c.balanceMonitor) + return ms.Start(ctx, c.multiNode, c.txm, c.logPoller, c.balanceMonitor) }) } @@ -281,16 +347,17 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping logPoller") c.lggr.Debug("Stopping balance monitor") - return services.CloseAll(c.txm, c.logPoller, c.balanceMonitor) + return services.CloseAll(c.txm, c.logPoller, c.balanceMonitor, c.multiNode) }) } func (c *chain) Ready() error { - return errors.Join(c.starter.Ready(), c.txm.Ready(), c.logPoller.Ready(), c.balanceMonitor.Ready()) + return errors.Join(c.starter.Ready(), c.multiNode.Ready(), c.txm.Ready(), c.logPoller.Ready(), c.balanceMonitor.Ready()) } func (c *chain) HealthReport() map[string]error { report := map[string]error{c.Name(): c.starter.Healthy()} + services.CopyHealth(report, c.multiNode.HealthReport()) services.CopyHealth(report, c.txm.HealthReport()) services.CopyHealth(report, c.logPoller.HealthReport()) services.CopyHealth(report, c.balanceMonitor.HealthReport()) diff --git a/relayer/chain/chain_test.go b/relayer/chain/chain_test.go new file mode 100644 index 00000000..3a0fc7fe --- /dev/null +++ b/relayer/chain/chain_test.go @@ -0,0 +1,45 @@ +package chain + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + "github.com/smartcontractkit/chainlink-aptos/relayer/config" +) + +func newTestTOMLConfig(t *testing.T) *config.TOMLConfig { + t.Helper() + cfg, err := config.NewDecodedTOMLConfig(` +ChainID = "2" + +[[Nodes]] +Name = "primary" +URL = "http://localhost:8080" + +[[Nodes]] +Name = "secondary" +URL = "http://localhost:8081" +`) + require.NoError(t, err) + return cfg +} + +func TestNewMultiNode(t *testing.T) { + t.Parallel() + + cfg := newTestTOMLConfig(t) + mn, err := newMultiNode(cfg, logger.Test(t)) + 
require.NoError(t, err) + require.NotNil(t, mn) + + // An unstarted pool has no live node, so selection fails fast rather than returning a + // usable client. This is the path that surfaces multinode.ErrNodeError through GetMultiNodeClient. + ctx, cancel := context.WithCancel(t.Context()) + cancel() + _, err = mn.SelectRPC(ctx) + require.Error(t, err) +} diff --git a/relayer/chain/multinode_client.go b/relayer/chain/multinode_client.go new file mode 100644 index 00000000..506ade98 --- /dev/null +++ b/relayer/chain/multinode_client.go @@ -0,0 +1,145 @@ +package chain + +import ( + "context" + "fmt" + "math/big" + "net/http" + "strconv" + "time" + + "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/api" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" + "github.com/smartcontractkit/chainlink-framework/multinode" +) + +// ServiceRPCClient is the subset of the Aptos SDK used by the multinode adapter and the aptos +// service layer. Keeping it narrow decouples the relayer from churn in the full SDK interface +// and makes mocking straightforward. +type ServiceRPCClient interface { + Info() (aptos.NodeInfo, error) + GetChainId() (uint8, error) + NodeAPIHealthCheck(durationSecs ...uint64) (api.HealthCheckResponse, error) + Account(address aptos.AccountAddress, ledgerVersion ...uint64) (aptos.AccountInfo, error) + AccountAPTBalance(address aptos.AccountAddress, ledgerVersion ...uint64) (uint64, error) + View(payload *aptos.ViewPayload, ledgerVersion ...uint64) ([]any, error) + TransactionByHash(txnHash string) (*api.Transaction, error) + AccountTransactions(address aptos.AccountAddress, start *uint64, limit *uint64) ([]*api.CommittedTransaction, error) +} + +// Head is the multinode head for Aptos. 
Aptos blocks have single-shot finality (committed +// blocks are final, no reorgs), so there is a single head notion: the latest block is also +// the finalized block. Difficulty/total-difficulty are PoW concepts and are always nil. +type Head struct { + Height uint64 +} + +func (h *Head) BlockNumber() int64 { + if !h.IsValid() { + return 0 + } + return int64(h.Height) +} + +func (h *Head) BlockDifficulty() *big.Int { return nil } +func (h *Head) GetTotalDifficulty() *big.Int { return nil } + +func (h *Head) IsValid() bool { return h != nil && h.Height > 0 } + +// MultiNodeClient embeds *multinode.RPCClientBase, which supplies the head/finalized-head +// subscriptions (via polling) and subscription bookkeeping. It also embeds ServiceRPCClient +// which promotes the narrow set of domain methods needed by the aptos service. +type MultiNodeClient struct { + *multinode.RPCClientBase[*Head] + + ServiceRPCClient + lggr logger.Logger +} + +var _ multinode.RPCClient[multinode.StringID, *Head] = (*MultiNodeClient)(nil) + +// NewMultiNodeClient builds an adapter around the Aptos RPC at url. cfg supplies the +// head/finalized poll intervals consumed by RPCClientBase. +func NewMultiNodeClient( + url string, + cfg multinode.RPCClientBaseConfig, + requestTimeout time.Duration, + lggr logger.Logger, + rpcMetrics frameworkmetrics.RPCClientMetrics, +) (*MultiNodeClient, error) { + nodeClient, err := aptos.NewNodeClientWithHttpClient(url, 0, &http.Client{Timeout: requestTimeout}) + if err != nil { + return nil, fmt.Errorf("failed to create aptos node client for %s: %w", url, err) + } + + c := &MultiNodeClient{ + ServiceRPCClient: nodeClient, + lggr: logger.Named(lggr, "MultiNodeClient"), + } + c.RPCClientBase = multinode.NewRPCClientBase[*Head]( + cfg, requestTimeout, lggr, + c.latestBlock, + c.latestFinalizedBlock, + url, + false, // not send-only + rpcMetrics, + ) + return c, nil +} + +// Dial validates reachability of the endpoint. 
The SDK client is HTTP and does not hold a +// persistent connection, so a successful health probe stands in for a dial handshake. +func (c *MultiNodeClient) Dial(ctx context.Context) error { + _, err := c.NodeAPIHealthCheck() + if err != nil { + return fmt.Errorf("aptos rpc dial/health check failed: %w", err) + } + return nil +} + +// ChainID returns the Aptos chain ID as a StringID (e.g. "2" for testnet). This matches +// the config ChainID form so node chain-ID verification can be enabled. +func (c *MultiNodeClient) ChainID(_ context.Context) (multinode.StringID, error) { + chainID, err := c.GetChainId() + if err != nil { + return "", fmt.Errorf("failed to get chain id: %w", err) + } + return multinode.StringID(strconv.FormatUint(uint64(chainID), 10)), nil +} + +// ClientVersion doubles as the periodic liveness probe; it returns an error when the RPC is +// unreachable, which the node lifecycle treats as a health failure. +func (c *MultiNodeClient) ClientVersion(_ context.Context) (string, error) { + info, err := c.Info() + if err != nil { + return "", err + } + return info.GitHash, nil +} + +// IsSyncing is always false: Aptos RPC does not expose a backfill/sync state that blocks +// reads, and lagging nodes are caught by head-based out-of-sync detection instead. +func (c *MultiNodeClient) IsSyncing(_ context.Context) (bool, error) { return false, nil } + +// latestBlock / latestFinalizedBlock back the RPCClientBase head subscriptions. They are +// identical because the latest committed block is already final on Aptos. +func (c *MultiNodeClient) latestBlock(_ context.Context) (*Head, error) { + info, err := c.Info() + if err != nil { + return nil, err + } + return &Head{Height: info.BlockHeight()}, nil +} + +func (c *MultiNodeClient) latestFinalizedBlock(ctx context.Context) (*Head, error) { + return c.latestBlock(ctx) +} + +// Close tears down the framework subscriptions. The underlying SDK client is HTTP-only +// and has no persistent connection to close. 
+func (c *MultiNodeClient) Close() { + c.RPCClientBase.Close() +} diff --git a/relayer/chain/multinode_client_test.go b/relayer/chain/multinode_client_test.go new file mode 100644 index 00000000..32f62730 --- /dev/null +++ b/relayer/chain/multinode_client_test.go @@ -0,0 +1,158 @@ +package chain_test + +import ( + "errors" + "testing" + + aptos_sdk "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/api" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-aptos/relayer/chain" +) + +// mockServiceRPC is a minimal mock for ServiceRPCClient (8 methods). +type mockServiceRPC struct { + mock.Mock +} + +func (m *mockServiceRPC) Info() (aptos_sdk.NodeInfo, error) { + args := m.Called() + return args.Get(0).(aptos_sdk.NodeInfo), args.Error(1) +} + +func (m *mockServiceRPC) GetChainId() (uint8, error) { + args := m.Called() + return args.Get(0).(uint8), args.Error(1) +} + +func (m *mockServiceRPC) NodeAPIHealthCheck(durationSecs ...uint64) (api.HealthCheckResponse, error) { + args := m.Called(durationSecs) + return args.Get(0).(api.HealthCheckResponse), args.Error(1) +} + +func (m *mockServiceRPC) Account(address aptos_sdk.AccountAddress, ledgerVersion ...uint64) (aptos_sdk.AccountInfo, error) { + args := m.Called(address, ledgerVersion) + return args.Get(0).(aptos_sdk.AccountInfo), args.Error(1) +} + +func (m *mockServiceRPC) AccountAPTBalance(address aptos_sdk.AccountAddress, ledgerVersion ...uint64) (uint64, error) { + args := m.Called(address, ledgerVersion) + return args.Get(0).(uint64), args.Error(1) +} + +func (m *mockServiceRPC) View(payload *aptos_sdk.ViewPayload, ledgerVersion ...uint64) ([]any, error) { + args := m.Called(payload, ledgerVersion) + return args.Get(0).([]any), args.Error(1) +} + +func (m *mockServiceRPC) TransactionByHash(txnHash string) (*api.Transaction, error) { + args := m.Called(txnHash) + return args.Get(0).(*api.Transaction), args.Error(1) +} + +func 
(m *mockServiceRPC) AccountTransactions(address aptos_sdk.AccountAddress, start *uint64, limit *uint64) ([]*api.CommittedTransaction, error) { + args := m.Called(address, start, limit) + return args.Get(0).([]*api.CommittedTransaction), args.Error(1) +} + +// newTestAdapter wires a MultiNodeClient over a mocked ServiceRPCClient. RPCClientBase is +// left nil: the methods under test (ChainID/Dial/ClientVersion/IsSyncing and promoted domain +// calls) do not touch it. +func newTestAdapter(rpc chain.ServiceRPCClient) *chain.MultiNodeClient { + return &chain.MultiNodeClient{ServiceRPCClient: rpc} +} + +func TestHead(t *testing.T) { + t.Parallel() + + t.Run("valid head maps height to block number", func(t *testing.T) { + h := &chain.Head{Height: 1234} + require.True(t, h.IsValid()) + require.Equal(t, int64(1234), h.BlockNumber()) + require.Nil(t, h.BlockDifficulty()) + require.Nil(t, h.GetTotalDifficulty()) + }) + + t.Run("zero height and nil are invalid", func(t *testing.T) { + require.False(t, (&chain.Head{Height: 0}).IsValid()) + require.Equal(t, int64(0), (&chain.Head{Height: 0}).BlockNumber()) + var nilHead *chain.Head + require.False(t, nilHead.IsValid()) + require.Equal(t, int64(0), nilHead.BlockNumber()) + }) +} + +func TestMultiNodeClient_ChainID(t *testing.T) { + t.Parallel() + + t.Run("returns chain ID as string", func(t *testing.T) { + m := &mockServiceRPC{} + m.On("GetChainId").Return(uint8(2), nil) + + id, err := newTestAdapter(m).ChainID(t.Context()) + require.NoError(t, err) + require.Equal(t, "2", id.String()) + m.AssertExpectations(t) + }) + + t.Run("propagates RPC error", func(t *testing.T) { + m := &mockServiceRPC{} + m.On("GetChainId").Return(uint8(0), errors.New("boom")) + + _, err := newTestAdapter(m).ChainID(t.Context()) + require.ErrorContains(t, err, "boom") + m.AssertExpectations(t) + }) +} + +func TestMultiNodeClient_ClientVersion(t *testing.T) { + t.Parallel() + + m := &mockServiceRPC{} + m.On("Info").Return(aptos_sdk.NodeInfo{GitHash: 
"abc123"}, nil) + + v, err := newTestAdapter(m).ClientVersion(t.Context()) + require.NoError(t, err) + require.Equal(t, "abc123", v) + m.AssertExpectations(t) +} + +func TestMultiNodeClient_IsSyncing(t *testing.T) { + t.Parallel() + syncing, err := newTestAdapter(nil).IsSyncing(t.Context()) + require.NoError(t, err) + require.False(t, syncing) +} + +func TestMultiNodeClient_Dial(t *testing.T) { + t.Parallel() + + t.Run("ok when health succeeds", func(t *testing.T) { + m := &mockServiceRPC{} + m.On("NodeAPIHealthCheck", mock.Anything).Return(api.HealthCheckResponse{Message: "ok"}, nil) + require.NoError(t, newTestAdapter(m).Dial(t.Context())) + m.AssertExpectations(t) + }) + + t.Run("errors when health fails", func(t *testing.T) { + m := &mockServiceRPC{} + m.On("NodeAPIHealthCheck", mock.Anything).Return(api.HealthCheckResponse{}, errors.New("unreachable")) + require.ErrorContains(t, newTestAdapter(m).Dial(t.Context()), "unreachable") + m.AssertExpectations(t) + }) +} + +func TestMultiNodeClient_ForwardsDomainCall(t *testing.T) { + t.Parallel() + + m := &mockServiceRPC{} + addr := aptos_sdk.AccountAddress{} + m.On("AccountAPTBalance", addr, mock.Anything).Return(uint64(42), nil) + + balance, err := newTestAdapter(m).AccountAPTBalance(addr) + require.NoError(t, err) + require.Equal(t, uint64(42), balance) + m.AssertExpectations(t) +} diff --git a/relayer/config/config.go b/relayer/config/config.go index fc2023d2..001ed974 100644 --- a/relayer/config/config.go +++ b/relayer/config/config.go @@ -4,11 +4,13 @@ import ( "errors" "fmt" "strings" + "time" "github.com/aptos-labs/aptos-go-sdk" "github.com/pelletier/go-toml/v2" "github.com/smartcontractkit/chainlink-common/pkg/config" + mncfg "github.com/smartcontractkit/chainlink-framework/multinode/config" "github.com/smartcontractkit/chainlink-aptos/relayer/logpoller" "github.com/smartcontractkit/chainlink-aptos/relayer/monitor" @@ -50,6 +52,9 @@ type Chain struct { type Node struct { Name *string URL *config.URL + // 
Order is the node priority used as a tiebreak by the multinode selector (lower wins on + // equal head). Defaults to 0 when unset. + Order *int32 `toml:"Order"` } func (n *Node) ValidateConfig() (err error) { @@ -79,6 +84,55 @@ type TOMLConfig struct { Chain Nodes Nodes + + // MultiNode configures RPC node selection, health checking, and failover. Omitted fields + // are filled by SetMultiNodeDefaults. See chainlink-framework/multinode. + MultiNode mncfg.MultiNodeConfig `toml:"MultiNode"` + + // RequestTimeout bounds each individual Aptos RPC call (and the underlying HTTP client + // timeout). Defaults to DefaultRequestTimeout when unset. + RequestTimeout *config.Duration `toml:"RequestTimeout"` +} + +// DefaultRequestTimeout bounds each individual Aptos RPC call when RequestTimeout is unset. +const DefaultRequestTimeout = 30 * time.Second + +// SetMultiNodeDefaults fills any unset MultiNode field with an Aptos-appropriate default. The +// framework's config accessors dereference these pointers directly, so every field consumed by +// the node/multinode lifecycle must be non-nil. Tuned to Aptos's ~1-2s block time and its +// single-finality model (a committed block is final: no reorgs). 
+func (c *TOMLConfig) SetMultiNodeDefaults() { + m := &c.MultiNode.MultiNode + setDefault(&m.Enabled, true) + setDefault(&m.PollFailureThreshold, uint32(5)) + setDefault(&m.PollInterval, *config.MustNewDuration(5*time.Second)) + setDefault(&m.SelectionMode, "HighestHead") + setDefault(&m.SyncThreshold, uint32(5)) + setDefault(&m.NodeIsSyncingEnabled, false) + setDefault(&m.LeaseDuration, *config.MustNewDuration(0)) + // Poll heads every 2s, in line with the ~1-2s block time, so out-of-sync nodes are detected promptly + setDefault(&m.NewHeadsPollInterval, *config.MustNewDuration(2*time.Second)) + setDefault(&m.FinalizedBlockPollInterval, *config.MustNewDuration(2*time.Second)) + setDefault(&m.EnforceRepeatableRead, false) + setDefault(&m.DeathDeclarationDelay, *config.MustNewDuration(20*time.Second)) + setDefault(&m.VerifyChainID, true) + setDefault(&m.NodeNoNewHeadsThreshold, *config.MustNewDuration(15*time.Second)) + // NoNewFinalizedHeadsThreshold is read unconditionally by the node lifecycle even though + // the finalized-head subscription is disabled (FinalityTagEnabled=false); keep it non-nil. + setDefault(&m.NoNewFinalizedHeadsThreshold, *config.MustNewDuration(15*time.Second)) + // Aptos blocks are final at commit: derive "finalized" as latest (FinalityDepth=0) and + // never run the finalized-head subscription (FinalityTagEnabled=false). + setDefault(&m.FinalityDepth, uint32(0)) + setDefault(&m.FinalityTagEnabled, false) + setDefault(&m.FinalizedBlockOffset, uint32(0)) + setDefault(&c.RequestTimeout, *config.MustNewDuration(DefaultRequestTimeout)) +} + +func setDefault[T any](p **T, val T) { + if *p == nil { + v := val + *p = &v + } } // applyDefaults ensures all component configs are non-nil and fully populated. 
@@ -105,6 +159,8 @@ func (cfg *TOMLConfig) applyDefaults() { } cfg.WriteTargetCap.Resolve() + cfg.SetMultiNodeDefaults() + // Set network name defaults if cfg.NetworkName == "" { network, err := GetNetworkConfig(cfg.ChainID) diff --git a/relayer/config/config_test.go b/relayer/config/config_test.go index d348ff03..47dc1507 100644 --- a/relayer/config/config_test.go +++ b/relayer/config/config_test.go @@ -141,6 +141,56 @@ BalancePollPeriod = "30s" }) } +func TestSetMultiNodeDefaults(t *testing.T) { + t.Parallel() + + t.Run("fills Aptos-appropriate defaults", func(t *testing.T) { + t.Parallel() + + cfg, err := NewDecodedTOMLConfig(baseTOML) + require.NoError(t, err) + + m := &cfg.MultiNode + + assert.True(t, m.Enabled()) + assert.Equal(t, uint32(5), m.PollFailureThreshold()) + assert.Equal(t, 5*time.Second, m.PollInterval()) + assert.Equal(t, "HighestHead", m.SelectionMode()) + assert.Equal(t, uint32(5), m.SyncThreshold()) + assert.False(t, m.NodeIsSyncingEnabled()) + assert.Equal(t, time.Duration(0), m.LeaseDuration()) + assert.Equal(t, 2*time.Second, m.NewHeadsPollInterval()) + assert.Equal(t, 2*time.Second, m.FinalizedBlockPollInterval()) + assert.False(t, m.EnforceRepeatableRead()) + assert.Equal(t, 20*time.Second, m.DeathDeclarationDelay()) + assert.True(t, m.VerifyChainID()) + assert.Equal(t, 15*time.Second, m.NodeNoNewHeadsThreshold()) + assert.Equal(t, 15*time.Second, m.NoNewFinalizedHeadsThreshold()) + assert.False(t, m.FinalityTagEnabled()) + assert.Equal(t, uint32(0), m.FinalityDepth()) + assert.Equal(t, uint32(0), m.FinalizedBlockOffset()) + + require.NotNil(t, cfg.RequestTimeout) + assert.Equal(t, DefaultRequestTimeout, cfg.RequestTimeout.Duration()) + }) + + t.Run("respects explicit overrides", func(t *testing.T) { + t.Parallel() + + raw := baseTOML + ` +[MultiNode] +SelectionMode = "RoundRobin" +NewHeadsPollInterval = "1s" +` + cfg, err := NewDecodedTOMLConfig(raw) + require.NoError(t, err) + assert.Equal(t, "RoundRobin", cfg.MultiNode.SelectionMode()) 
+ assert.Equal(t, 1*time.Second, cfg.MultiNode.NewHeadsPollInterval()) + // Unset fields still get defaults + assert.Equal(t, uint32(5), cfg.MultiNode.PollFailureThreshold()) + }) +} + // Regression guard: global DefaultConfigSet must never be mutated by config resolution func TestNoGlobalMutation(t *testing.T) { t.Parallel() From 81b83025cf6599a1a0c207ea3f15ee5443aa7a22 Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Tue, 16 Jun 2026 14:04:54 +0100 Subject: [PATCH 2/3] Add multinode --- relayer/chain/chain.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/relayer/chain/chain.go b/relayer/chain/chain.go index 5f33f9ae..1144fdd1 100644 --- a/relayer/chain/chain.go +++ b/relayer/chain/chain.go @@ -334,6 +334,7 @@ func (c *chain) Start(ctx context.Context) error { c.lggr.Debug("Starting txm") c.lggr.Debug("Starting logPoller") c.lggr.Debug("Starting balance monitor") + c.lggr.Debug("Starting multinode") var ms services.MultiStart return ms.Start(ctx, c.multiNode, c.txm, c.logPoller, c.balanceMonitor) @@ -346,6 +347,7 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping txm") c.lggr.Debug("Stopping logPoller") c.lggr.Debug("Stopping balance monitor") + c.lggr.Debug("Stopping multinode") return services.CloseAll(c.txm, c.logPoller, c.balanceMonitor, c.multiNode) }) From 04c39cbba6e63a48148dcc7c90176528077b4cd9 Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Tue, 16 Jun 2026 14:07:56 +0100 Subject: [PATCH 3/3] Tidy --- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 396fb67d..4f531391 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -378,7 +378,7 @@ require ( github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20260326122810-b657beadfb57 // indirect 
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 // indirect - github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 // indirect + github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 // indirect github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 914a7853..b935a127 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1425,8 +1425,8 @@ github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20260326122810-b65 github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20260326122810-b657beadfb57/go.mod h1:kGprqyjsz6qFNVszOQoHc24wfvCjyipNZFste/3zcbs= github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4 h1:v/rAtObo9zMpQDnGQ0seaSEqw60JUjowwcNw3DCpVY4= github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260505202410-b350dca113b4/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= -github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 h1:3KLLkTCIAy9CvT35Ey0k6pcWX/u+qsm3Y/58TI5VSAg= -github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13/go.mod h1:Y7h84PqCe/Vimf2h1Nc6tMiOJStDbtM33fEUeaaF5xk= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 h1:71PGTkjdFZ0JrloEC2Fs8eHl1b1gmUuH+bq7q23usKk= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ= github.com/smartcontractkit/chainlink-protos/billing/go 
v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d h1:VYoBBNnQpZ5p+enPTl8SkKBRaubqyGpO0ul3B1np++I=