From 8acd57074bd8a8cdfa684d5cc8ec1ce10469e34c Mon Sep 17 00:00:00 2001 From: nnn-gif Date: Wed, 13 May 2026 14:30:10 +0530 Subject: [PATCH 1/5] feat: cache last updated intent --- contracts/.gitignore | 2 + services/bridge/Dockerfile | 2 +- services/bridge/config/event_definitions.go | 1 + services/bridge/config/modular_types.go | 2 + services/bridge/config/types.go | 8 + services/bridge/go.mod | 1 + services/bridge/go.sum | 2 + services/bridge/internal/api/prices.go | 89 ++++ services/bridge/internal/api/server.go | 7 + services/bridge/internal/bridge/bridge.go | 42 +- services/bridge/internal/bridge/server.go | 8 +- .../internal/bridge/transaction_handler.go | 6 +- services/bridge/internal/cron/cron_service.go | 453 ++++++++++++++++++ .../processor/generic_event_processor.go | 82 +++- .../bridge/internal/processor/price_cache.go | 180 +++++++ services/bridge/internal/types/types.go | 3 +- services/bridge/pkg/router/generic_router.go | 2 +- 17 files changed, 874 insertions(+), 16 deletions(-) create mode 100644 services/bridge/internal/api/prices.go create mode 100644 services/bridge/internal/cron/cron_service.go create mode 100644 services/bridge/internal/processor/price_cache.go diff --git a/contracts/.gitignore b/contracts/.gitignore index ffdaee3..654d7e7 100644 --- a/contracts/.gitignore +++ b/contracts/.gitignore @@ -22,3 +22,5 @@ ignition/deployments/chain-31337 broadcast contracts/coverage_report coverage_report +config-local + diff --git a/services/bridge/Dockerfile b/services/bridge/Dockerfile index 11a5181..834dca2 100644 --- a/services/bridge/Dockerfile +++ b/services/bridge/Dockerfile @@ -1,5 +1,5 @@ # Build stage -FROM golang:1.24-alpine AS builder +FROM golang:1.25-alpine AS builder # Install dependencies RUN apk add --no-cache git diff --git a/services/bridge/config/event_definitions.go b/services/bridge/config/event_definitions.go index 840b646..88d812c 100644 --- a/services/bridge/config/event_definitions.go +++ b/services/bridge/config/event_definitions.go @@ -63,6 +63,7 @@ type LegacyRouterDestination struct { Method DestinationMethodConfig `json:"method"` Condition string `json:"condition"` TimeThreshold Duration `json:"time_threshold,omitempty"` // Minimum time between updates for this destination + Cron bool `json:"cron,omitempty"` // Enable cron-based updates for this destination } // DestinationMethodConfig defines a contract method call for generic routing diff --git a/services/bridge/config/modular_types.go b/services/bridge/config/modular_types.go index 5777881..976437a 100644 --- a/services/bridge/config/modular_types.go +++ b/services/bridge/config/modular_types.go @@ -36,6 +36,7 @@ type InfrastructureConfig struct { Metrics MetricsConfig `yaml:"metrics" json:"metrics"` Replica *ReplicaConfig `yaml:"replica,omitempty" json:"replica,omitempty"` DryRun bool `yaml:"dry_run,omitempty" json:"dry_run,omitempty"` + CronService CronServiceConfig `yaml:"cron_service" json:"cron_service"` } type ChainConfig struct { @@ -91,6 +92,7 @@ type RouterDestination struct { Condition string `yaml:"condition,omitempty" json:"condition,omitempty"` TimeThreshold Duration `yaml:"time_threshold,omitempty" json:"time_threshold,omitempty"` PriceDeviation string `yaml:"price_deviation,omitempty" json:"price_deviation,omitempty"` // e.g., "0.5%" or "1.0%" + Cron bool `yaml:"cron,omitempty" json:"cron,omitempty"` // Enable cron-based updates for this destination // Gas configuration (from modular version) GasLimit uint64 `yaml:"gas_limit,omitempty" json:"gas_limit,omitempty"` diff --git a/services/bridge/config/types.go b/services/bridge/config/types.go index f8e523c..d1509ed 100644 --- a/services/bridge/config/types.go +++ b/services/bridge/config/types.go @@ -23,6 +23,7 @@ type Config struct { API APIConfig `json:"api"` Metrics MetricsConfig `json:"metrics"` DryRun bool `json:"dry_run"` + CronService CronServiceConfig `json:"cron_service"` } // DatabaseConfig represents database configuration @@ -197,3 +198,10 @@ func (d *Duration) UnmarshalYAML(unmarshal func(interface{}) error) error { func (d Duration) MarshalYAML() (interface{}, error) { return time.Duration(d).String(), nil } + +// CronServiceConfig represents cron-based update service configuration +type CronServiceConfig struct { + Enabled bool `yaml:"enabled" json:"enabled"` + Schedule string `yaml:"schedule" json:"schedule"` // Default cron expression, used if router doesn't have time_threshold + PriceDeviation float64 `yaml:"price_deviation" json:"price_deviation"` // Default minimum price deviation to trigger update (as percentage) +} diff --git a/services/bridge/go.mod b/services/bridge/go.mod index 88590f7..7815c72 100644 --- a/services/bridge/go.mod +++ b/services/bridge/go.mod @@ -49,6 +49,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sagikazarmark/locafero v0.11.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/services/bridge/go.sum b/services/bridge/go.sum index 84ecc3d..ad0ccee 100644 --- a/services/bridge/go.sum +++ b/services/bridge/go.sum @@ -176,6 +176,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= diff --git a/services/bridge/internal/api/prices.go b/services/bridge/internal/api/prices.go new file mode 100644 index 0000000..53ef73b --- /dev/null +++ b/services/bridge/internal/api/prices.go @@ -0,0 +1,89 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/diadata.org/Spectra-interoperability/pkg/logger" + "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/processor" + "github.com/gorilla/mux" +) + +// handleGetPrices returns all prices from the in-memory cache +func (s *Server) handleGetPrices(w http.ResponseWriter, r *http.Request) { + priceCache, ok := s.priceCache.(*processor.PriceCache) + if !ok || priceCache == nil { + logger.Warn("Price cache not available") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]string{ + "error": "Price cache not available", + }) + return + } + + prices := priceCache.GetAllPrices() + + response := make(map[string]interface{}) + for symbol, entry := range prices { + response[symbol] = map[string]interface{}{ + "price": entry.Price, + "timestamp": entry.Timestamp, + "intent_hash": entry.IntentHash, + } + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "count": len(prices), + "prices": response, + }) +} + +// handleGetPrice returns the price for a specific symbol +func (s *Server) handleGetPrice(w http.ResponseWriter, r *http.Request) { + priceCache, ok := s.priceCache.(*processor.PriceCache) + if !ok || priceCache == nil { + logger.Warn("Price cache not available") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]string{ + "error": "Price cache not available", + }) + return + } + + vars := mux.Vars(r) + symbol := vars["symbol"] + + if symbol == "" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{ + "error": "Symbol is required", + }) + return + } + + priceEntry, exists := priceCache.GetPrice(symbol) + if !exists { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{ + "error": "Price not found for symbol: " + symbol, + }) + return + } + + response := map[string]interface{}{ + "symbol": symbol, + "price": priceEntry.Price, + "timestamp": priceEntry.Timestamp, + "intent_hash": priceEntry.IntentHash, + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} diff --git a/services/bridge/internal/api/server.go b/services/bridge/internal/api/server.go index f261a18..15ec8ec 100644 --- a/services/bridge/internal/api/server.go +++ b/services/bridge/internal/api/server.go @@ -27,6 +27,7 @@ type Server struct { db *database.DB metrics *metrics.Collector routerRegistry interface{} // Will be *router.Registry when available + priceCache interface{} // Will be *processor.PriceCache when available router *mux.Router httpServer *http.Server @@ -39,6 +40,7 @@ func NewServer( db *database.DB, metricsCollector *metrics.Collector, routerRegistry interface{}, // Pass as interface{} to avoid import cycle + priceCache interface{}, // Pass as interface{} to avoid import cycle ) *Server { s := &Server{ config: &cfgService.GetInfrastructure().API, @@ -46,6 +48,7 @@ func NewServer( db: db, metrics: metricsCollector, routerRegistry: routerRegistry, + priceCache: priceCache, router: mux.NewRouter(), } @@ -155,6 +158,10 @@ func (s *Server) setupRoutes() { v1.HandleFunc("/symbols", s.handleGetSymbols).Methods("GET") v1.HandleFunc("/symbols/{symbol}/updates", s.handleGetSymbolUpdates).Methods("GET") + // Price cache endpoints + v1.HandleFunc("/prices", s.handleGetPrices).Methods("GET") + v1.HandleFunc("/prices/{symbol}", s.handleGetPrice).Methods("GET") + // Failover endpoints (if available) if s.failoverHandler != nil { logger.Info("Registering failover routes - handler is NOT nil") diff --git a/services/bridge/internal/bridge/bridge.go b/services/bridge/internal/bridge/bridge.go index 39dad19..67831a2 100644 --- a/services/bridge/internal/bridge/bridge.go +++ b/services/bridge/internal/bridge/bridge.go @@ -13,6 +13,7 @@ import ( "github.com/diadata.org/Spectra-interoperability/pkg/rpc" "github.com/diadata.org/Spectra-interoperability/services/bridge/config" "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/api" + "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/cron" "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/database" "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/leader" "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/metrics" @@ -54,9 +55,14 @@ type Bridge struct { // On-chain monitor for replica failover onChainMonitor *leader.OnChainMonitor + // Cron service for periodic updates + cronService *cron.Service // Event source eventSource *EventSource + // Event processor + eventProcessor *processor.GenericEventProcessor + // Metrics tracking metricsManager *MetricsManager @@ -254,6 +260,7 @@ func NewBridge(modularCfg *config.ModularConfig, cfgService *config.ConfigServic return nil, fmt.Errorf("failed to create event processor: %w", err) } eventProcessor = ep + bridge.eventProcessor = eventProcessor // Wrap scanner and processor in EventSource bridge.eventSource = NewEventSource(scanner, eventProcessor, eventChan, bridge.updateChan, errorChan) @@ -287,6 +294,16 @@ func NewBridge(modularCfg *config.ModularConfig, cfgService *config.ConfigServic bridge.onChainMonitor = leader.NewOnChainMonitor(routerRegistry, ethClients, monitorConfig) + // Initialize cron service for periodic updates + cronConfig := cfgService.GetInfrastructure().CronService + bridge.cronService = cron.NewService( + eventProcessor.GetPriceCache(), + routerRegistry, + ethClients, + bridge.updateChan, + cronConfig, + ) + logger.Infof("Bridge initialized with %d routers", routerRegistry.Count()) return bridge, nil @@ -374,6 +391,13 @@ func (b *Bridge) Start(ctx context.Context) error { time.Sleep(2 * time.Second) // Wait for initial check } + // Start cron service + if b.cronService != nil { + if err := b.cronService.Start(); err != nil { + return fmt.Errorf("failed to start cron service: %w", err) + } + } + // Start worker pool b.workerPool.Start(ctx) @@ -466,6 +490,11 @@ func (b *Bridge) Stop(ctx context.Context) error { } } + // Stop cron service + if b.cronService != nil { + b.cronService.Stop() + } + // Stop worker pool b.workerPool.Stop(ctx) @@ -574,7 +603,9 @@ func (b *Bridge) processUpdates(ctx context.Context) { } } - if b.onChainMonitor != nil { + if b.onChainMonitor != nil && !updateReq.IsCronTriggered { + logger.Infof("Exceptional Updates") + // Skip ShouldProcess check for cron-triggered updates symbol := "unknown" if updateReq.Intent != nil && updateReq.Intent.Symbol != "" { symbol = updateReq.Intent.Symbol @@ -605,15 +636,20 @@ func (b *Bridge) processUpdates(ctx context.Context) { continue } - // Only set TriggeredByMonitoring if replica failover is actually enabled + // Only set IsMonitoringTriggered if replica failover is actually enabled // (not just monitoring-only mode) if b.onChainMonitor.IsEnabled() { - updateReq.TriggeredByMonitoring = true + updateReq.IsMonitoringTriggered = true logger.Infof("Processing update: monitoring check passed for chain=%d contract=%s symbol=%s", updateReq.DestinationChain.ChainID, updateReq.Contract.Address, symbol) } + } else if updateReq.IsCronTriggered { + logger.Infof("Processing cron-triggered update for chain=%d contract=%s symbol=%s (bypassing monitoring checks)", + updateReq.DestinationChain.ChainID, + updateReq.Contract.Address, + updateReq.Intent.Symbol) } // Create task ID based on available data diff --git a/services/bridge/internal/bridge/server.go b/services/bridge/internal/bridge/server.go index 6003e2c..0ab38d0 100644 --- a/services/bridge/internal/bridge/server.go +++ b/services/bridge/internal/bridge/server.go @@ -26,7 +26,13 @@ func (b *Bridge) startMetricsServer(ctx context.Context) { metricsCollector.FailoverMetrics = b.metricsManager.GetFailoverMetrics() } - apiServer := api.NewServer(b.configService, b.db, metricsCollector, b.routerRegistry) + // Get price cache from event processor if available + var priceCache interface{} + if b.eventProcessor != nil { + priceCache = b.eventProcessor.GetPriceCache() + } + + apiServer := api.NewServer(b.configService, b.db, metricsCollector, b.routerRegistry, priceCache) go func() { if err := apiServer.Start(ctx); err != nil { diff --git a/services/bridge/internal/bridge/transaction_handler.go b/services/bridge/internal/bridge/transaction_handler.go index 27da6fb..cc8e947 100644 --- a/services/bridge/internal/bridge/transaction_handler.go +++ b/services/bridge/internal/bridge/transaction_handler.go @@ -62,7 +62,7 @@ func (h *TransactionHandler) Process(ctx context.Context, updateReq *bridgetypes return err } - logger.Infof("Processing update for %s on chain %d (elapsed=%v)", txCtx.Identifier, txCtx.UpdateRequest.DestinationChain.ChainID, time.Since(startTime)) + logger.Infof("[TX-HANDLER] Processing update for %s on chain %d (elapsed=%v)", txCtx.Identifier, txCtx.UpdateRequest.DestinationChain.ChainID, time.Since(startTime)) if err := h.validate(txCtx); err != nil { return err @@ -75,11 +75,11 @@ func (h *TransactionHandler) Process(ctx context.Context, updateReq *bridgetypes } triggeredByMonitoring := "" - if txCtx.UpdateRequest.TriggeredByMonitoring && h.onChainMonitor != nil { + if txCtx.UpdateRequest.IsMonitoringTriggered && h.onChainMonitor != nil { monitoringInfo := h.getMonitoringInfo(txCtx) triggeredByMonitoring = monitoringInfo } - logger.Infof("Transaction sent: %s for %s on chain %d, router=%s, symbol=%s%s", + logger.Infof("[TX-HANDLER] Transaction sent: %s for %s on chain %d, router=%s, symbol=%s%s", tx.Hash().Hex(), txCtx.Identifier, txCtx.UpdateRequest.DestinationChain.ChainID, txCtx.UpdateRequest.RouterID, txCtx.Symbol, triggeredByMonitoring) diff --git a/services/bridge/internal/cron/cron_service.go b/services/bridge/internal/cron/cron_service.go new file mode 100644 index 0000000..8378635 --- /dev/null +++ b/services/bridge/internal/cron/cron_service.go @@ -0,0 +1,453 @@ +package cron + +import ( + "context" + "fmt" + "math/big" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/robfig/cron/v3" + + "github.com/diadata.org/Spectra-interoperability/pkg/logger" + "github.com/diadata.org/Spectra-interoperability/pkg/rpc" + "github.com/diadata.org/Spectra-interoperability/services/bridge/config" + "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/processor" + "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/types" + "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/utils" + "github.com/diadata.org/Spectra-interoperability/services/bridge/pkg/router" +) + +// Service manages cron-based price updates from price cache to on-chain +type Service struct { + priceCache *processor.PriceCache + routerRegistry *router.GenericRegistry + writeClients map[int64]rpc.EthClient + updateChan chan *types.UpdateRequest + config config.CronServiceConfig + cron *cron.Cron + ctx context.Context + cancel context.CancelFunc + monitoredDestinations map[string]*DestinationMonitor // key -> monitor + routerSchedules map[string]string // routerID -> cron schedule + mu sync.RWMutex +} + +// DestinationMonitor tracks a destination for cron updates +type DestinationMonitor struct { + RouterID string + ChainID int64 + ContractAddress common.Address + Symbol string + Client rpc.EthClient + TimeThreshold time.Duration // Per-destination time threshold + PriceDeviation float64 // Per-destination price deviation + MethodConfig *config.DestinationMethodConfig // Destination method config for updates + LastOnChainTime uint64 + LastOnChainValue *big.Int + LastUpdateTime time.Time +} + +// NewService creates a new cron service +func NewService( + priceCache *processor.PriceCache, + routerRegistry *router.GenericRegistry, + writeClients map[int64]rpc.EthClient, + updateChan chan *types.UpdateRequest, + cfg config.CronServiceConfig, +) *Service { + ctx, cancel := context.WithCancel(context.Background()) + + service := &Service{ + priceCache: priceCache, + routerRegistry: routerRegistry, + writeClients: writeClients, + updateChan: updateChan, + config: cfg, + ctx: ctx, + cancel: cancel, + monitoredDestinations: make(map[string]*DestinationMonitor), + routerSchedules: make(map[string]string), + } + + return service +} + +// Start initializes and starts the cron service +func (s *Service) Start() error { + if !s.config.Enabled { + logger.Info("Cron service is disabled in configuration") + return nil + } + + logger.Info("Starting cron service") + + // Build monitor list from routers with cron: true + s.buildMonitorList() + + if len(s.monitoredDestinations) == 0 { + logger.Warn("No destinations configured for cron monitoring (no routers with cron: true found)") + return nil + } + + // Initialize cron scheduler + s.cron = cron.New(cron.WithSeconds(), cron.WithLocation(time.UTC)) + + // Add cron jobs for each router with its own schedule + for routerID, schedule := range s.routerSchedules { + if schedule == "" { + schedule = s.config.Schedule + if schedule == "" { + schedule = "0 */5 * * * *" // Default: every 5 minutes + } + } + + // Create a closure to capture routerID + routerID := routerID + schedule := schedule + + _, err := s.cron.AddFunc(schedule, func() { + s.runCronJob(routerID) + }) + if err != nil { + return fmt.Errorf("failed to add cron job for router %s: %w", routerID, err) + } + + logger.Infof("Cron job added for router=%s with schedule: %s", routerID, schedule) + } + + // Start the cron scheduler + s.cron.Start() + + logger.Infof("Cron service started with %d routers and %d destination-symbol combinations", + len(s.routerSchedules), len(s.monitoredDestinations)) + + return nil +} + +// Stop gracefully stops the cron service +func (s *Service) Stop() { + if s.cron != nil { + logger.Info("Stopping cron service") + s.cron.Stop() + } + s.cancel() +} + +// buildMonitorList builds the list of destinations to monitor +func (s *Service) buildMonitorList() { + s.mu.Lock() + defer s.mu.Unlock() + + // Clear existing monitors and schedules + s.monitoredDestinations = make(map[string]*DestinationMonitor) + s.routerSchedules = make(map[string]string) + + // Get active routers + activeRouters := s.routerRegistry.GetActiveRouters() + + for _, routerInstance := range activeRouters { + routerID := routerInstance.ID() + routerConfig := routerInstance.GetConfig() + if routerConfig == nil { + continue + } + + // Check if any destination has cron enabled + hasCronDestinations := false + var routerSchedule string + + // Get symbols from router + symbols := router.GetSymbolsFromConfig(routerConfig) + + // Process each destination + for _, dest := range routerConfig.Destinations { + // Check if cron is enabled for this destination + if !dest.Cron { + continue + } + + hasCronDestinations = true + + // Check if we have a client for this chain + client, exists := s.writeClients[dest.ChainID] + if !exists { + logger.Warnf("No write client for chain %d, skipping cron monitoring for router %s", dest.ChainID, routerID) + continue + } + + // Parse time_threshold to create cron schedule + timeThreshold := dest.TimeThreshold.Duration() + if timeThreshold == 0 { + timeThreshold = 5 * time.Minute // Default + } + + // Convert time_threshold to cron expression + // For example: 1m -> "*/1 * * * *", 5m -> "*/5 * * * *" + minutes := int(timeThreshold.Minutes()) + if minutes < 1 { + minutes = 1 + } + cronSchedule := fmt.Sprintf("0 */%d * * * *", minutes) + + // Store the schedule for this router + if routerSchedule == "" || routerSchedule == cronSchedule { + routerSchedule = cronSchedule + } else { + logger.Warnf("Router %s has destinations with different time_thresholds, using first schedule: %s", routerID, routerSchedule) + } + + // Parse price deviation if available, otherwise use config default + priceDeviation := s.config.PriceDeviation + if dest.PriceDeviation != "" { + // Try to parse percentage (e.g., "0.10%" -> 0.10) + var pd float64 + _, err := fmt.Sscanf(dest.PriceDeviation, "%f%%", &pd) + if err == nil { + priceDeviation = pd + } + } + + contractAddress := common.HexToAddress(dest.Contract) + + // Create a monitor for each symbol + for _, symbol := range symbols { + key := utils.GenerateDestinationKey(dest.ChainID, dest.Contract, symbol) + s.monitoredDestinations[key] = &DestinationMonitor{ + RouterID: routerID, + ChainID: dest.ChainID, + ContractAddress: contractAddress, + Symbol: symbol, + Client: client, + TimeThreshold: timeThreshold, + PriceDeviation: priceDeviation, + MethodConfig: &dest.Method, // Store method config for transaction execution + } + logger.Debugf("Added cron monitoring for router=%s chain=%d contract=%s symbol=%s time_threshold=%v price_deviation=%.2f%%", + routerID, dest.ChainID, dest.Contract, symbol, timeThreshold, priceDeviation) + } + } + + // Store the router schedule if it has cron destinations + if hasCronDestinations && routerSchedule != "" { + s.routerSchedules[routerID] = routerSchedule + logger.Infof("Router %s has cron enabled with schedule: %s", routerID, routerSchedule) + } + } +} + +// runCronJob executes the cron job for a specific router +func (s *Service) runCronJob(routerID string) { + logger.Debugf("Cron job started for router=%s", routerID) + + s.mu.RLock() + destinations := make([]*DestinationMonitor, 0, len(s.monitoredDestinations)) + for _, monitor := range s.monitoredDestinations { + if monitor.RouterID == routerID { + destinations = append(destinations, monitor) + } + } + s.mu.RUnlock() + + // Check each destination for this router + for _, monitor := range destinations { + go s.checkDestination(monitor) + } +} + +// checkDestination checks a single destination and triggers update if needed +func (s *Service) checkDestination(monitor *DestinationMonitor) { + // Get on-chain value and timestamp + onChainValue, onChainTimestamp, err := s.getOnChainValue(monitor) + if err != nil { + logger.Errorf("Failed to get on-chain value for router=%s chain=%d contract=%s symbol=%s: %v", + monitor.RouterID, monitor.ChainID, monitor.ContractAddress.Hex(), monitor.Symbol, err) + return + } + + // Update monitor state + monitor.LastOnChainValue = onChainValue + monitor.LastOnChainTime = onChainTimestamp + monitor.LastUpdateTime = time.Now() + + // Get cached price + priceEntry, exists := s.priceCache.GetPrice(monitor.Symbol) + if !exists { + logger.Debugf("No cached price for symbol=%s", monitor.Symbol) + return + } + + // Parse cached price + cachedPrice, ok := new(big.Int).SetString(priceEntry.Price, 10) + if !ok { + logger.Errorf("Failed to parse cached price for symbol=%s: %s", monitor.Symbol, priceEntry.Price) + return + } + + // Check if update is needed + shouldUpdate := s.shouldUpdate(monitor, cachedPrice, onChainValue, onChainTimestamp, priceEntry.Timestamp) + + if shouldUpdate { + logger.Infof("Cron triggering update for router=%s chain=%d contract=%s symbol=%s: cached=%s on-chain=%s cached_ts=%d on-chain_ts=%d", + monitor.RouterID, monitor.ChainID, monitor.ContractAddress.Hex(), monitor.Symbol, + cachedPrice.String(), onChainValue.String(), priceEntry.Timestamp, onChainTimestamp) + + s.triggerUpdate(monitor, priceEntry) + } else { + logger.Debugf("No update needed for router=%s chain=%d contract=%s symbol=%s", + monitor.RouterID, monitor.ChainID, monitor.ContractAddress.Hex(), monitor.Symbol) + } +} + +// shouldUpdate determines if an update should be triggered +func (s *Service) shouldUpdate(monitor *DestinationMonitor, cachedPrice, onChainValue *big.Int, onChainTimestamp, cachedTimestamp uint64) bool { + // If there's no on-chain value, always update + if onChainValue == nil || onChainValue.Sign() == 0 { + logger.Debugf("No on-chain value for %s, triggering update", monitor.Symbol) + return true + } + + // If cached price is different from on-chain, check deviation + if cachedPrice.Cmp(onChainValue) != 0 { + if monitor.PriceDeviation > 0 { + // Calculate percentage deviation + diff := new(big.Int).Sub(cachedPrice, onChainValue) + oldFloat := new(big.Float).SetInt(onChainValue) + diffFloat := new(big.Float).SetInt(diff) + percentageChange := new(big.Float).Quo(diffFloat, oldFloat) + percentageChange.Mul(percentageChange, big.NewFloat(100)) + absChange := new(big.Float).Abs(percentageChange) + + threshold := big.NewFloat(monitor.PriceDeviation) + if absChange.Cmp(threshold) > 0 { + logger.Debugf("Price deviation for %s: %.2f%% > %.2f%%, triggering update", + monitor.Symbol, percentageChange, monitor.PriceDeviation) + return true + } + } else { + // Any deviation triggers update + logger.Debugf("Price changed for %s: %s -> %s, triggering update", + monitor.Symbol, onChainValue.String(), cachedPrice.String()) + return true + } + } + + // Check time threshold (use per-destination threshold) + if monitor.TimeThreshold > 0 { + timeSinceUpdate := time.Since(time.Unix(int64(onChainTimestamp), 0)) + if timeSinceUpdate > monitor.TimeThreshold { + logger.Debugf("Time threshold exceeded for %s: %v > %v, triggering update", + monitor.Symbol, timeSinceUpdate, monitor.TimeThreshold) + return true + } + } + + return false +} + +// triggerUpdate creates and sends an update request +func (s *Service) triggerUpdate(monitor *DestinationMonitor, priceEntry *processor.PriceEntry) { + // Parse cached price + cachedPrice, ok := new(big.Int).SetString(priceEntry.Price, 10) + if !ok { + logger.Errorf("Failed to parse cached price for symbol=%s: %s", monitor.Symbol, priceEntry.Price) + return + } + + // Create OracleIntent entirely cache entry + intent := &types.OracleIntent{ + Symbol: priceEntry.Symbol, + Price: cachedPrice, + Timestamp: big.NewInt(int64(priceEntry.Timestamp)), + Nonce: priceEntry.Nonce, + Expiry: priceEntry.Expiry, + Signer: priceEntry.Signer, + Signature: priceEntry.Signature, + Source: priceEntry.Source, + ChainID: priceEntry.ChainID, + IntentType: priceEntry.IntentType, + Version: priceEntry.Version, + } + + // Create update request + updateReq := &types.UpdateRequest{ + ID: fmt.Sprintf("cron-%s-%d", monitor.Symbol, time.Now().Unix()), + Intent: intent, + DestinationChain: &config.DestinationConfig{ + ChainID: monitor.ChainID, + Name: fmt.Sprintf("Chain %d", monitor.ChainID), + }, + Contract: &config.ContractConfig{ + Address: monitor.ContractAddress.Hex(), + }, + RouterID: monitor.RouterID, + DestinationMethodConfig: monitor.MethodConfig, + IsCronTriggered: true, // Mark as cron-triggered + CreatedAt: time.Now(), + } + + updateReq.ExtractedData = &config.ExtractedData{ + Enrichment: map[string]interface{}{ + "fullIntent": intent, + }, + } + + // Send to update channel + select { + case s.updateChan <- updateReq: + logger.Infof("Cron update request sent for router=%s symbol=%s", monitor.RouterID, monitor.Symbol) + default: + logger.Warnf("Cron update channel full, dropping update request for router=%s symbol=%s", monitor.RouterID, monitor.Symbol) + } +} + +// getOnChainValue retrieves the current value and timestamp from on-chain +func (s *Service) getOnChainValue(monitor *DestinationMonitor) (*big.Int, uint64, error) { + const getValueABI = `[{ + "inputs": [{"internalType": "string", "name": "key", "type": "string"}], + "name": "getValue", + "outputs": [ + {"internalType": "uint128", "name": "value", "type": "uint128"}, + {"internalType": "uint128", "name": "timestamp", "type": "uint128"} + ], + "stateMutability": "view", + "type": "function" + }]` + + parsedABI, err := abi.JSON(strings.NewReader(getValueABI)) + if err != nil { + return nil, 0, fmt.Errorf("failed to parse ABI: %w", err) + } + + data, err := parsedABI.Pack("getValue", monitor.Symbol) + if err != nil { + return nil, 0, fmt.Errorf("failed to pack call data: %w", err) + } + + msg := ethereum.CallMsg{ + To: &monitor.ContractAddress, + Data: data, + } + + result, err := monitor.Client.CallContract(context.Background(), msg, nil) + if err != nil { + return nil, 0, fmt.Errorf("failed to call contract: %w", err) + } + + // Unpack result + values := struct { + Value *big.Int + Timestamp *big.Int + }{} + + err = parsedABI.UnpackIntoInterface(&values, "getValue", result) + if err != nil { + return nil, 0, fmt.Errorf("failed to unpack result: %w", err) + } + + return values.Value, values.Timestamp.Uint64(), nil +} diff --git a/services/bridge/internal/processor/generic_event_processor.go b/services/bridge/internal/processor/generic_event_processor.go index 30ac862..bd699b4 100644 --- a/services/bridge/internal/processor/generic_event_processor.go +++ b/services/bridge/internal/processor/generic_event_processor.go @@ -44,6 +44,7 @@ type GenericEventProcessor struct { updateChan chan<- *types.UpdateRequest dedupCache *DedupCache + priceCache *PriceCache metricsCollector *metrics.Collector reportQueueSize func() // Callback to report queue size after enqueue @@ -107,6 +108,7 @@ func NewGenericEventProcessor( errorChan: errorChan, updateChan: updateChan, dedupCache: NewDedupCache(cfg.DedupCacheSize, cfg.DedupCacheTTL.Duration()), + priceCache: NewPriceCache(), metricsCollector: metricsCollector, reportQueueSize: reportQueueSize, stopChan: make(chan struct{}), @@ -457,16 +459,79 @@ func (gep *GenericEventProcessor) processEvent(ctx context.Context, event *types ProcessedAt: time.Now(), } - if symbol, ok := extractedData.Event["symbol"].(string); ok { - processedEvent.Symbol = symbol + // Try to extract symbol, price, and timestamp from enrichment (fullIntent) first + var oracleIntent *types.OracleIntent + if fullIntent, ok := extractedData.Enrichment["fullIntent"]; ok { + // Try to convert to *types.OracleIntent + if intent, ok := fullIntent.(*types.OracleIntent); ok { + oracleIntent = intent + if intent.Symbol != "" { + processedEvent.Symbol = intent.Symbol + } + if intent.Price != nil { + processedEvent.Price = intent.Price.String() + } + if intent.Timestamp != nil { + processedEvent.Timestamp = intent.Timestamp.Uint64() + } + } + } + + // Fallback to extracting from raw event if not found in enrichment + if processedEvent.Symbol == "" { + if symbol, ok := extractedData.Event["symbol"].(string); ok { + processedEvent.Symbol = symbol + } + } + + if processedEvent.Price == "" { + if priceValue, ok := extractedData.Event["price"]; ok { + processedEvent.Price = parsePrice(priceValue) + } } - if priceValue, ok := extractedData.Event["price"]; ok { - processedEvent.Price = parsePrice(priceValue) + if processedEvent.Timestamp == 0 { + if timestampValue, ok := extractedData.Event["timestamp"]; ok { + processedEvent.Timestamp = parseTimestamp(timestampValue) + } } - if timestampValue, ok := extractedData.Event["timestamp"]; ok { - processedEvent.Timestamp = parseTimestamp(timestampValue) + // Update in-memory price cache if we have both symbol and price + if processedEvent.Symbol != "" && processedEvent.Price != "" && processedEvent.Timestamp > 0 { + var nonce, expiry, chainID *big.Int + var signer common.Address + var signature []byte + var source string + var intentType, version string + + // Extract additional fields from oracleIntent if available + if oracleIntent != nil { + nonce = oracleIntent.Nonce + expiry = oracleIntent.Expiry + signer = oracleIntent.Signer + signature = oracleIntent.Signature + source = oracleIntent.Source + chainID = oracleIntent.ChainID + intentType = oracleIntent.IntentType + version = oracleIntent.Version + } + + gep.priceCache.UpdatePriceWithMetadata( + processedEvent.Symbol, + processedEvent.Price, + compositeIntentHash, + processedEvent.Timestamp, + intentType, + version, + nonce, + expiry, + chainID, + signer, + signature, + source, + ) + logger.Debugf("Updated price cache: symbol=%s, price=%s, timestamp=%d, signer=%s", + processedEvent.Symbol, processedEvent.Price, processedEvent.Timestamp, signer.Hex()) } logger.Infof("Saving ProcessedEvent with composite IntentHash: %s (len=%d) for destination: %s", compositeIntentHash, len(compositeIntentHash), destID) @@ -683,3 +748,8 @@ func parseTimestamp(timestampValue interface{}) uint64 { } } } + +// GetPriceCache returns the price cache instance +func (gep *GenericEventProcessor) GetPriceCache() *PriceCache { + return gep.priceCache +} diff --git a/services/bridge/internal/processor/price_cache.go b/services/bridge/internal/processor/price_cache.go new file mode 100644 index 0000000..4d74c32 --- /dev/null +++ b/services/bridge/internal/processor/price_cache.go @@ -0,0 +1,180 @@ +package processor + +import ( + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// PriceEntry represents a single price entry with metadata +type PriceEntry struct { + Symbol string + Price string + Timestamp uint64 + IntentHash string + + IntentType string + Version string + Nonce *big.Int + Expiry *big.Int + Signer common.Address + Signature []byte + Source string + ChainID *big.Int +} + +// PriceCache maintains in-memory prices per symbol +type PriceCache struct { + mu sync.RWMutex + prices map[string]*PriceEntry // symbol -> PriceEntry +} + +// NewPriceCache creates a new price cache +func NewPriceCache() *PriceCache { + return &PriceCache{ + prices: make(map[string]*PriceEntry), + } +} + +// UpdatePrice updates the price for a symbol only if the timestamp is greater than the existing one +func (pc *PriceCache) UpdatePrice(symbol, price, intentHash string, timestamp uint64, nonce, expiry, chainID *big.Int, signer common.Address, signature []byte, source string) { + pc.UpdatePriceWithMetadata(symbol, price, intentHash, timestamp, "price_update", "1.0", nonce, expiry, chainID, signer, signature, source) +} + +// UpdatePriceWithMetadata updates the price with full metadata including intent type and version +func (pc *PriceCache) UpdatePriceWithMetadata(symbol, price, intentHash string, timestamp uint64, intentType, version string, nonce, expiry, chainID *big.Int, signer common.Address, signature []byte, source string) { + pc.mu.Lock() + defer pc.mu.Unlock() + + // Check if existing entry has a greater or equal timestamp + if existingEntry, exists := pc.prices[symbol]; exists { + if existingEntry.Timestamp >= timestamp { + // Existing price is newer or same age, don't update + return + } + } + + pc.prices[symbol] = &PriceEntry{ + Symbol: symbol, + Price: price, + Timestamp: timestamp, + IntentHash: intentHash, + IntentType: intentType, + Version: version, + Nonce: nonce, + Expiry: expiry, + Signer: signer, + Signature: signature, + Source: source, + ChainID: chainID, + } +} + +// GetPrice retrieves the price entry for a symbol +func (pc *PriceCache) GetPrice(symbol string) (*PriceEntry, bool) { + pc.mu.RLock() + defer pc.mu.RUnlock() + + entry, exists := pc.prices[symbol] + if !exists { + return nil, false + } + + // Return a copy to prevent external mutation + return &PriceEntry{ + Symbol: entry.Symbol, + Price: entry.Price, + Timestamp: entry.Timestamp, + IntentHash: entry.IntentHash, + IntentType: entry.IntentType, + Version: entry.Version, + Nonce: entry.Nonce, + Expiry: entry.Expiry, + Signer: entry.Signer, + Signature: entry.Signature, + Source: entry.Source, + ChainID: entry.ChainID, + }, true +} + +// GetAllPrices returns all prices in the cache +func (pc *PriceCache) GetAllPrices() map[string]*PriceEntry { + pc.mu.RLock() + defer pc.mu.RUnlock() + + // Return a copy to prevent external mutation + result := make(map[string]*PriceEntry, len(pc.prices)) + for symbol, entry := range pc.prices { + result[symbol] = &PriceEntry{ + Symbol: entry.Symbol, + Price: entry.Price, + Timestamp: entry.Timestamp, + IntentHash: entry.IntentHash, + IntentType: entry.IntentType, + Version: entry.Version, + Nonce: entry.Nonce, + Expiry: entry.Expiry, + Signer: entry.Signer, + Signature: entry.Signature, + Source: entry.Source, + ChainID: entry.ChainID, + } + } + return result +} + +// GetSymbols returns all symbols in the cache +func (pc *PriceCache) GetSymbols() []string { + pc.mu.RLock() + defer pc.mu.RUnlock() + + symbols := make([]string, 0, len(pc.prices)) + for symbol := range pc.prices { + symbols = append(symbols, symbol) + } + return symbols +} + +// RemoveSymbol removes a symbol from the cache +func (pc *PriceCache) RemoveSymbol(symbol string) { + pc.mu.Lock() + defer pc.mu.Unlock() + + delete(pc.prices, symbol) +} + +// Clear removes all entries from the cache +func (pc *PriceCache) Clear() { + pc.mu.Lock() + defer pc.mu.Unlock() + + pc.prices = make(map[string]*PriceEntry) +} + +// Size returns the number of symbols in the cache +func (pc *PriceCache) Size() int { + pc.mu.RLock() + defer pc.mu.RUnlock() + + return len(pc.prices) +} + +// GetStalePrices returns symbols with prices older than the given duration +func (pc *PriceCache) GetStalePrices(maxAge time.Duration) []string { + pc.mu.RLock() + defer pc.mu.RUnlock() + + var staleSymbols []string + cutoffTime := time.Now().Add(-maxAge) + + for symbol, entry := range pc.prices { + priceTime := time.Unix(int64(entry.Timestamp), 0) + if priceTime.Before(cutoffTime) { + staleSymbols = append(staleSymbols, symbol) + } + } + + return staleSymbols +} diff --git a/services/bridge/internal/types/types.go b/services/bridge/internal/types/types.go index 55ca643..3726759 100644 --- a/services/bridge/internal/types/types.go +++ b/services/bridge/internal/types/types.go @@ -265,7 +265,8 @@ type UpdateRequest struct { DestinationMethodConfig *config.DestinationMethodConfig `json:"destination_method_config,omitempty"` ExtractedData *config.ExtractedData `json:"extracted_data,omitempty"` - TriggeredByMonitoring bool `json:"triggered_by_monitoring,omitempty"` + IsMonitoringTriggered bool `json:"is_monitoring_triggered,omitempty"` + IsCronTriggered bool `json:"is_cron_triggered,omitempty"` } // UpdateResult represents the result of an update operation diff --git a/services/bridge/pkg/router/generic_router.go b/services/bridge/pkg/router/generic_router.go index 7a76b89..ccb5be7 100644 --- a/services/bridge/pkg/router/generic_router.go +++ b/services/bridge/pkg/router/generic_router.go @@ -179,7 +179,7 @@ func (gr *GenericRouter) ShouldRoute(eventName string, data *config.ExtractedDat func (gr *GenericRouter) evaluateCondition(condition config.TriggerCondition, data *config.ExtractedData) bool { value, err := gr.getFieldValue(condition.Field, data) if err != nil { - logger.Debugf("Failed to get field value for condition: %v", err) + logger.Debugf("Failed to get field value for condition: %s Operator: %s Value: %v, error: %v", condition.Field, condition.Operator, condition.Value, err) return false } From fe9f31a13ed6bb4ea2a5f78d946ed7b94c75072c Mon Sep 17 00:00:00 2001 From: nnn-gif Date: Fri, 6 Feb 2026 19:02:27 +0530 Subject: [PATCH 2/5] fix: goroutine timeout read from config --- services/bridge/internal/bridge/bridge.go | 1 + services/bridge/internal/worker/worker_pool.go | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/services/bridge/internal/bridge/bridge.go b/services/bridge/internal/bridge/bridge.go index 67831a2..690f03c 100644 --- a/services/bridge/internal/bridge/bridge.go +++ b/services/bridge/internal/bridge/bridge.go @@ -185,6 +185,7 @@ func NewBridge(modularCfg *config.ModularConfig, cfgService *config.ConfigServic workerPool := worker.NewWorkerPool( cfgService.GetInfrastructure().WorkerPool.MaxWorkers, cfgService.GetInfrastructure().WorkerPool.TaskQueueSize, + cfgService.GetInfrastructure().WorkerPool.TaskTimeout.Duration(), ) if metricsCollector != nil { workerPool.SetMetricsCollector(metricsCollector) diff --git a/services/bridge/internal/worker/worker_pool.go b/services/bridge/internal/worker/worker_pool.go index fc0879c..66ecf73 100644 --- a/services/bridge/internal/worker/worker_pool.go +++ b/services/bridge/internal/worker/worker_pool.go @@ -29,6 +29,7 @@ type WorkerPool struct { running bool metricsCollector *metrics.Collector activeWorkers int32 // Track number of currently active workers + taskTimeout time.Duration } // Worker represents a single worker in the pool @@ -42,19 +43,24 @@ type Worker struct { } // NewWorkerPool creates a new worker pool -func NewWorkerPool(maxWorkers int, taskQueueSize int) *WorkerPool { +func NewWorkerPool(maxWorkers int, taskQueueSize int, taskTimeout time.Duration) *WorkerPool { // Use taskQueueSize if provided, otherwise fallback to maxWorkers*2 queueSize := taskQueueSize if queueSize <= 0 { queueSize = maxWorkers * 2 } - logger.Infof("Creating worker pool: maxWorkers=%d, taskQueueSize=%d", maxWorkers, queueSize) + if taskTimeout <= 0 { + taskTimeout = 6 * time.Minute + } + + logger.Infof("Creating worker pool: maxWorkers=%d, taskQueueSize=%d, taskTimeout=%v", maxWorkers, queueSize, taskTimeout) return &WorkerPool{ maxWorkers: maxWorkers, taskQueue: make(chan *WorkerTask, queueSize), shutdownChan: make(chan struct{}), + taskTimeout: taskTimeout, } } @@ -279,8 +285,7 @@ func (w *Worker) processTask(ctx context.Context, task *WorkerTask) { w.id, task.ID, routerID, symbol, chainID, atomic.LoadInt32(&w.pool.activeWorkers)) // Create timeout context to prevent workers from blocking forever - // 6 minutes = enough time for receipt wait (5 min) + RPC calls - taskCtx, cancel := context.WithTimeout(ctx, 6*time.Minute) + taskCtx, cancel := context.WithTimeout(ctx, w.pool.taskTimeout) defer cancel() // Process the task with retry logic From 292a8dc1397a0c9130223cbd639877abdfba9739 Mon Sep 17 00:00:00 2001 From: nnn-gif Date: Fri, 6 Feb 2026 23:30:58 +0530 Subject: [PATCH 3/5] feat: start new worker pool per router --- services/bridge/internal/bridge/bridge.go | 87 ++++++++++++++++++----- services/bridge/internal/bridge/health.go | 31 ++++++-- 2 files changed, 93 insertions(+), 25 deletions(-) diff --git a/services/bridge/internal/bridge/bridge.go b/services/bridge/internal/bridge/bridge.go index 690f03c..b37256d 100644 --- a/services/bridge/internal/bridge/bridge.go +++ b/services/bridge/internal/bridge/bridge.go @@ -42,12 +42,15 @@ type Bridge struct { running bool stats *bridgetypes.BridgeStats lastProcessedBlock uint64 + ctx context.Context // Context for managing pool lifecycles // Goroutine coordination wg sync.WaitGroup - // Worker management - workerPool *worker.WorkerPool + // Worker management - per-oracle worker pools for isolation + oraclePools map[string]*worker.WorkerPool // Per-oracle pools (routerID -> pool) + oraclePoolsMu sync.RWMutex // Protects oraclePools map + workerPoolConfig config.WorkerPoolConfig // Base config for all pools // Router system routerRegistry *router.GenericRegistry @@ -182,14 +185,8 @@ func NewBridge(modularCfg *config.ModularConfig, cfgService *config.ConfigServic return nil, fmt.Errorf(errorMsg) } - workerPool := worker.NewWorkerPool( - cfgService.GetInfrastructure().WorkerPool.MaxWorkers, - cfgService.GetInfrastructure().WorkerPool.TaskQueueSize, - cfgService.GetInfrastructure().WorkerPool.TaskTimeout.Duration(), - ) - if metricsCollector != nil { - workerPool.SetMetricsCollector(metricsCollector) - } + // Store worker pool config for dynamic pool creation + workerPoolConfig := cfgService.GetInfrastructure().WorkerPool eventChan := make(chan *bridgetypes.EventData, 100) errorChan := make(chan error, 10) @@ -217,7 +214,8 @@ func NewBridge(modularCfg *config.ModularConfig, cfgService *config.ConfigServic StartTime: time.Now(), }, lastProcessedBlock: cfgService.GetInfrastructure().Source.StartBlock, - workerPool: workerPool, + oraclePools: make(map[string]*worker.WorkerPool), + workerPoolConfig: workerPoolConfig, routerRegistry: routerRegistry, metricsManager: metricsManager, queueManager: queueManager, @@ -379,6 +377,7 @@ func (b *Bridge) Start(ctx context.Context) error { return fmt.Errorf("bridge is already running") } b.running = true + b.ctx = ctx // Store context for pool management b.mu.Unlock() logger.Info("Starting bridge service") @@ -399,9 +398,6 @@ func (b *Bridge) Start(ctx context.Context) error { } } - // Start worker pool - b.workerPool.Start(ctx) - // Start event source (scanner + processor) if b.eventSource != nil { if err := b.eventSource.Start(ctx); err != nil { @@ -453,6 +449,50 @@ func (b *Bridge) Start(ctx context.Context) error { return nil } +// getOrCreateOraclePool dynamically creates a worker pool for an oracle if it doesn't exist +func (b *Bridge) getOrCreateOraclePool(routerID string) *worker.WorkerPool { + b.oraclePoolsMu.RLock() + pool, exists := b.oraclePools[routerID] + b.oraclePoolsMu.RUnlock() + + if exists { + return pool + } + + // Create new pool for this oracle/router + b.oraclePoolsMu.Lock() + defer b.oraclePoolsMu.Unlock() + + if pool, exists := b.oraclePools[routerID]; exists { + return pool + } + + logger.Infof("Creating new worker pool for oracle/router: %s (workers=%d, queue=%d, timeout=%v)", + routerID, + b.workerPoolConfig.MaxWorkers, + b.workerPoolConfig.TaskQueueSize, + b.workerPoolConfig.TaskTimeout.Duration()) + + pool = worker.NewWorkerPool( + b.workerPoolConfig.MaxWorkers, + b.workerPoolConfig.TaskQueueSize, + b.workerPoolConfig.TaskTimeout.Duration(), + ) + + // Set metrics collector if available + if b.metricsManager != nil && b.metricsManager.GetCollector() != nil { + pool.SetMetricsCollector(b.metricsManager.GetCollector()) + } + + // Start the pool + pool.Start(b.ctx) + + b.oraclePools[routerID] = pool + + logger.Infof("Worker pool created and started for oracle: %s", routerID) + return pool +} + // Stop stops the bridge service func (b *Bridge) Stop(ctx context.Context) error { b.mu.Lock() @@ -496,8 +536,15 @@ func (b *Bridge) Stop(ctx context.Context) error { b.cronService.Stop() } - // Stop worker pool - b.workerPool.Stop(ctx) + // Stop all oracle worker pools + b.oraclePoolsMu.Lock() + logger.Infof("Stopping %d oracle worker pools", len(b.oraclePools)) + for routerID, pool := range b.oraclePools { + logger.Infof("Stopping worker pool for oracle: %s", routerID) + pool.Stop(ctx) + } + b.oraclePools = make(map[string]*worker.WorkerPool) // Clear the map + b.oraclePoolsMu.Unlock() // Stop transaction queue manager b.queueManager.Stop() @@ -664,8 +711,12 @@ func (b *Bridge) processUpdates(ctx context.Context) { taskID = fmt.Sprintf("Process Updates unknown-%d-%d", updateReq.DestinationChain.ChainID, time.Now().Unix()) } - logger.Infof("[UPDATE-SUBMIT] Submitting to worker pool: task=%s, router=%s", taskID, updateReq.RouterID) - b.workerPool.Submit(&worker.WorkerTask{ + logger.Infof("[UPDATE-SUBMIT] Submitting to oracle pool: task=%s, router=%s", taskID, updateReq.RouterID) + + pool := b.getOrCreateOraclePool(updateReq.RouterID) + + // Submit to the oracle pool + pool.Submit(&worker.WorkerTask{ ID: taskID, Request: updateReq, Handler: b.handleUpdateRequest, diff --git a/services/bridge/internal/bridge/health.go b/services/bridge/internal/bridge/health.go index 47e845d..6c6ed6e 100644 --- a/services/bridge/internal/bridge/health.go +++ b/services/bridge/internal/bridge/health.go @@ -62,17 +62,34 @@ func (b *Bridge) healthCheck(ctx context.Context) { // performHealthCheck performs health checks on all chains func (b *Bridge) performHealthCheck(ctx context.Context) { - // Log worker pool status for debugging - if b.workerPool != nil { - workerStats := b.workerPool.GetStats() + b.oraclePoolsMu.RLock() + totalActive := int32(0) + totalMax := 0 + totalPending := 0 + totalCapacity := 0 + poolCount := 0 + + for routerID, pool := range b.oraclePools { + stats := pool.GetStats() + totalActive += stats.ActiveTasks + totalMax += stats.MaxWorkers + totalPending += stats.PendingTasks + totalCapacity += stats.TotalCapacity + poolCount++ + + logger.Infof("[HEALTH] Oracle pool [%s]: active=%d/%d, pending=%d/%d", + routerID, stats.ActiveTasks, stats.MaxWorkers, + stats.PendingTasks, stats.TotalCapacity) + } + b.oraclePoolsMu.RUnlock() + + if poolCount > 0 { queueSize := 0 if b.eventSource != nil { queueSize = b.eventSource.GetQueueSize() } - logger.Infof("[HEALTH] Worker pool: active=%d/%d, pending=%d/%d, update_queue=%d", - workerStats.ActiveTasks, workerStats.MaxWorkers, - workerStats.PendingTasks, workerStats.TotalCapacity, - queueSize) + logger.Infof("[HEALTH] Total worker pools: %d, aggregate active=%d/%d, pending=%d/%d, update_queue=%d", + poolCount, totalActive, totalMax, totalPending, totalCapacity, queueSize) } // Check source chain From 0d7d0613051fe12c3af03a8357471af39c70030e Mon Sep 17 00:00:00 2001 From: nnn-gif Date: Sun, 8 Feb 2026 08:16:56 +0530 Subject: [PATCH 4/5] feat: on start fetch onchain state of oracle --- services/bridge/internal/bridge/bridge.go | 17 +++++ .../bridge/internal/leader/onchain_monitor.go | 57 +-------------- services/bridge/internal/utils/contract.go | 69 +++++++++++++++++++ .../bridge/internal/worker/worker_pool.go | 14 ++-- .../bridge/pkg/router/generic_interface.go | 4 ++ services/bridge/pkg/router/generic_router.go | 62 +++++++++++++++++ 6 files changed, 161 insertions(+), 62 deletions(-) create mode 100644 services/bridge/internal/utils/contract.go diff --git a/services/bridge/internal/bridge/bridge.go b/services/bridge/internal/bridge/bridge.go index b37256d..daf0ba4 100644 --- a/services/bridge/internal/bridge/bridge.go +++ b/services/bridge/internal/bridge/bridge.go @@ -382,6 +382,22 @@ func (b *Bridge) Start(ctx context.Context) error { logger.Info("Starting bridge service") + // Load chain state per oracle per symbol + if b.routerRegistry != nil { + logger.Info("Fetching router state from on-chain...") + ethClients := make(map[int64]rpc.EthClient) + for chainID, writeClient := range b.writeClients { + ethClients[chainID] = writeClient.GetEthClient() + } + routers := b.routerRegistry.GetActiveRouters() + for _, router := range routers { + if err := router.FetchOracleStateFromOnChain(ctx, ethClients); err != nil { + logger.Warnf("Router state fetch failed for %s: %v", router.ID(), err) + } + } + logger.Info("Router state fetch completed") + } + // Start transaction queue manager b.queueManager.Start() @@ -474,6 +490,7 @@ func (b *Bridge) getOrCreateOraclePool(routerID string) *worker.WorkerPool { b.workerPoolConfig.TaskTimeout.Duration()) pool = worker.NewWorkerPool( + routerID, b.workerPoolConfig.MaxWorkers, b.workerPoolConfig.TaskQueueSize, b.workerPoolConfig.TaskTimeout.Duration(), diff --git a/services/bridge/internal/leader/onchain_monitor.go b/services/bridge/internal/leader/onchain_monitor.go index 761c4a2..5311305 100644 --- a/services/bridge/internal/leader/onchain_monitor.go +++ b/services/bridge/internal/leader/onchain_monitor.go @@ -1,17 +1,13 @@ package leader import ( - "bytes" "context" - "fmt" "math/big" "strconv" "strings" "sync" "time" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/diadata.org/Spectra-interoperability/pkg/logger" @@ -288,58 +284,7 @@ func (m *OnChainMonitor) generateKey(chainID int64, contractAddress common.Addre } func (m *OnChainMonitor) getValueFromContract(dest *DestinationMonitor, symbol string) (*big.Int, uint64, error) { - - const getValueABI = `[{ - "inputs": [{"internalType": "string", "name": "key", "type": "string"}], - "name": "getValue", - "outputs": [ - {"internalType": "uint128", "name": "value", "type": "uint128"}, - {"internalType": "uint128", "name": "timestamp", "type": "uint128"} - ], - "stateMutability": "view", - "type": "function" - }]` - - parsedABI, err := abi.JSON(bytes.NewReader([]byte(getValueABI))) - if err != nil { - return nil, 0, fmt.Errorf("failed to parse ABI: %w", err) - } - - data, err := parsedABI.Pack("getValue", symbol) - if err != nil { - return nil, 0, fmt.Errorf("failed to pack input data: %w", err) - } - - callMsg := ethereum.CallMsg{ - To: &dest.ContractAddress, - Data: data, - } - - resultBytes, err := dest.Client.CallContract(m.ctx, callMsg, nil) - if err != nil { - return nil, 0, fmt.Errorf("contract call failed: %w", err) - } - - outputs, err := parsedABI.Unpack("getValue", resultBytes) - if err != nil { - return nil, 0, fmt.Errorf("failed to unpack result: %w", err) - } - - if len(outputs) != 2 { - return nil, 0, fmt.Errorf("unexpected number of outputs: got %d, want 2", len(outputs)) - } - - value, ok := outputs[0].(*big.Int) - if !ok { - return nil, 0, fmt.Errorf("failed to convert value to big.Int, got type %T: %v", outputs[0], outputs[0]) - } - - timestamp, ok := outputs[1].(*big.Int) - if !ok { - return nil, 0, fmt.Errorf("failed to convert timestamp to big.Int, got type %T: %v", outputs[1], outputs[1]) - } - - return value, timestamp.Uint64(), nil + return utils.GetValueFromOracleContract(m.ctx, dest.Client, dest.ContractAddress, symbol) } func formatValue(v *big.Int) string { diff --git a/services/bridge/internal/utils/contract.go b/services/bridge/internal/utils/contract.go new file mode 100644 index 0000000..05ebbfa --- /dev/null +++ b/services/bridge/internal/utils/contract.go @@ -0,0 +1,69 @@ +package utils + +import ( + "context" + "fmt" + "math/big" + "strings" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + + "github.com/diadata.org/Spectra-interoperability/pkg/rpc" +) + +// GetValueFromOracleContract calls the getValue method on an oracle contract +func GetValueFromOracleContract(ctx context.Context, client rpc.EthClient, contractAddress common.Address, symbol string) (*big.Int, uint64, error) { + const getValueABI = `[{ + "inputs": [{"internalType": "string", "name": "key", "type": "string"}], + "name": "getValue", + "outputs": [ + {"internalType": "uint128", "name": "value", "type": "uint128"}, + {"internalType": "uint128", "name": "timestamp", "type": "uint128"} + ], + "stateMutability": "view", + "type": "function" + }]` + + parsedABI, err := abi.JSON(strings.NewReader(getValueABI)) + if err != nil { + return nil, 0, fmt.Errorf("failed to parse ABI: %w", err) + } + + data, err := parsedABI.Pack("getValue", symbol) + if err != nil { + return nil, 0, fmt.Errorf("failed to pack input data: %w", err) + } + + callMsg := ethereum.CallMsg{ + To: &contractAddress, + Data: data, + } + + resultBytes, err := client.CallContract(ctx, callMsg, nil) + if err != nil { + return nil, 0, fmt.Errorf("contract call failed: %w", err) + } + + outputs, err := parsedABI.Unpack("getValue", resultBytes) + if err != nil { + return nil, 0, fmt.Errorf("failed to unpack result: %w", err) + } + + if len(outputs) != 2 { + return nil, 0, fmt.Errorf("unexpected number of outputs: got %d, want 2", len(outputs)) + } + + value, ok := outputs[0].(*big.Int) + if !ok { + return nil, 0, fmt.Errorf("failed to convert value to big.Int, got type %T: %v", outputs[0], outputs[0]) + } + + timestamp, ok := outputs[1].(*big.Int) + if !ok { + return nil, 0, fmt.Errorf("failed to convert timestamp to big.Int, got type %T: %v", outputs[1], outputs[1]) + } + + return value, timestamp.Uint64(), nil +} diff --git a/services/bridge/internal/worker/worker_pool.go b/services/bridge/internal/worker/worker_pool.go index 66ecf73..c2eaddd 100644 --- a/services/bridge/internal/worker/worker_pool.go +++ b/services/bridge/internal/worker/worker_pool.go @@ -20,6 +20,7 @@ type WorkerTask struct { // WorkerPool manages a pool of workers for processing update requests type WorkerPool struct { + routerID string // Router/oracle identifier for logging maxWorkers int taskQueue chan *WorkerTask workers []*Worker @@ -43,7 +44,7 @@ type Worker struct { } // NewWorkerPool creates a new worker pool -func NewWorkerPool(maxWorkers int, taskQueueSize int, taskTimeout time.Duration) *WorkerPool { +func NewWorkerPool(routerID string, maxWorkers int, taskQueueSize int, taskTimeout time.Duration) *WorkerPool { // Use taskQueueSize if provided, otherwise fallback to maxWorkers*2 queueSize := taskQueueSize if queueSize <= 0 { @@ -54,9 +55,10 @@ func NewWorkerPool(maxWorkers int, taskQueueSize int, taskTimeout time.Duration) taskTimeout = 6 * time.Minute } - logger.Infof("Creating worker pool: maxWorkers=%d, taskQueueSize=%d, taskTimeout=%v", maxWorkers, queueSize, taskTimeout) + logger.Infof("Creating worker pool for router %s: maxWorkers=%d, taskQueueSize=%d, taskTimeout=%v", routerID, maxWorkers, queueSize, taskTimeout) return &WorkerPool{ + routerID: routerID, maxWorkers: maxWorkers, taskQueue: make(chan *WorkerTask, queueSize), shutdownChan: make(chan struct{}), @@ -167,14 +169,14 @@ func (wp *WorkerPool) healthMonitor(ctx context.Context) { // Log warning if queue is getting full (>80% capacity) if queueCap > 0 && float64(queueSize)/float64(queueCap) > 0.8 { - logger.Warnf("Worker pool queue nearing capacity: %d/%d (%.1f%%), active workers: %d/%d", - queueSize, queueCap, float64(queueSize)/float64(queueCap)*100, activeCount, wp.maxWorkers) + logger.Warnf("Worker pool queue nearing capacity for %s: %d/%d (%.1f%%), active workers: %d/%d", + wp.routerID, queueSize, queueCap, float64(queueSize)/float64(queueCap)*100, activeCount, wp.maxWorkers) } // Log warning if all workers are busy and queue has items if int(activeCount) >= wp.maxWorkers && queueSize > 0 { - logger.Warnf("All %d workers busy with %d tasks queued - consider increasing worker count", - wp.maxWorkers, queueSize) + logger.Warnf("All %d workers busy for %s with %d tasks queued - consider increasing worker count", + wp.maxWorkers, wp.routerID, queueSize) } logger.Debugf("Worker pool health: active=%d/%d, queue=%d/%d", diff --git a/services/bridge/pkg/router/generic_interface.go b/services/bridge/pkg/router/generic_interface.go index 30232b1..1776eb9 100644 --- a/services/bridge/pkg/router/generic_interface.go +++ b/services/bridge/pkg/router/generic_interface.go @@ -1,6 +1,9 @@ package router import ( + "context" + + "github.com/diadata.org/Spectra-interoperability/pkg/rpc" "github.com/diadata.org/Spectra-interoperability/services/bridge/config" ) @@ -17,4 +20,5 @@ type GenericRouterInterface interface { GetStats() GenericRouterStats UpdateDestinationTime(dest config.RouterDestination, symbol string, data ...*config.ExtractedData) GetSymbolFromData(data *config.ExtractedData) string + FetchOracleStateFromOnChain(ctx context.Context, clients map[int64]rpc.EthClient) error } diff --git a/services/bridge/pkg/router/generic_router.go b/services/bridge/pkg/router/generic_router.go index ccb5be7..5c9ba99 100644 --- a/services/bridge/pkg/router/generic_router.go +++ b/services/bridge/pkg/router/generic_router.go @@ -1,6 +1,7 @@ package router import ( + "context" "crypto/ecdsa" "fmt" "math/big" @@ -14,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/diadata.org/Spectra-interoperability/pkg/logger" + "github.com/diadata.org/Spectra-interoperability/pkg/rpc" "github.com/diadata.org/Spectra-interoperability/services/bridge/config" "github.com/diadata.org/Spectra-interoperability/services/bridge/internal/utils" ) @@ -778,6 +780,66 @@ func (gr *GenericRouter) GetStats() GenericRouterStats { return gr.stats } +// FetchOracleStateFromOnChain fetches the latest state from on-chain contracts +func (gr *GenericRouter) FetchOracleStateFromOnChain(ctx context.Context, clients map[int64]rpc.EthClient) error { + logger.Infof("Fetching oracle state from on-chain for router: %s", gr.config.ID) + + startTime := time.Now() + totalDestinations := 0 + successfulFetches := 0 + + // Get symbols from router config + symbols := GetSymbolsFromConfig(gr.config) + if len(symbols) == 0 { + logger.Infof("No symbols configured for router %s, skipping fetch", gr.config.ID) + return nil + } + + logger.Infof("Found %d symbols in router config for fetch: %v", len(symbols), symbols) + + for _, dest := range gr.config.Destinations { + client, exists := clients[dest.ChainID] + if !exists { + logger.Warnf("No client found for chain %d, skipping fetch", dest.ChainID) + continue + } + + totalDestinations += len(symbols) + + for _, symbol := range symbols { + value, timestamp, err := utils.GetValueFromOracleContract(ctx, client, common.HexToAddress(dest.Contract), symbol) + if err != nil { + logger.Warnf("Failed to fetch on-chain state for %s on chain %d: %v", symbol, dest.ChainID, err) + continue + } + + // Update in-memory cache + destKey := gr.generateDestinationKey(dest, symbol) + state := gr.getOrCreateDestinationState(destKey) + + state.mu.Lock() + if timestamp > 0 { + state.lastUpdate = time.Unix(int64(timestamp), 0) + state.lastTimestamp = timestamp + } + if value != nil && value.Sign() != 0 { + state.lastPrice = value.String() + } + state.mu.Unlock() + + successfulFetches++ + logger.Infof("Fetched state for %s on chain %d: timestamp=%d, price=%s", + symbol, dest.ChainID, timestamp, value.String()) + } + } + + duration := time.Since(startTime) + logger.Infof("Router state fetch completed for %s: %d/%d states fetched in %v", + gr.config.ID, successfulFetches, totalDestinations, duration) + + return nil +} + // ProcessingConfig returns the router's processing configuration func (gr *GenericRouter) ProcessingConfig() *config.ProcessingConfig { return &gr.config.Processing From 5481c264a3ce37798b6e53091eea7fce9adbe798 Mon Sep 17 00:00:00 2001 From: nnn-gif Date: Sun, 8 Feb 2026 08:49:24 +0530 Subject: [PATCH 5/5] chore: add routerid in transaction logs --- .../internal/bridge/transaction_handler.go | 12 ++++----- .../processor/generic_event_processor.go | 2 +- .../bridge/internal/transaction/executor.go | 27 ++++++++++++------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/services/bridge/internal/bridge/transaction_handler.go b/services/bridge/internal/bridge/transaction_handler.go index cc8e947..6df8c07 100644 --- a/services/bridge/internal/bridge/transaction_handler.go +++ b/services/bridge/internal/bridge/transaction_handler.go @@ -166,7 +166,7 @@ func (h *TransactionHandler) confirm(txCtx *TransactionContext, tx *types.Transa tx.Hash().Hex(), txCtx.UpdateRequest.RouterID, txCtx.Symbol, txCtx.UpdateRequest.DestinationChain.ChainID) confirmStartTime := time.Now() - receipt, err := h.waitForReceipt(txCtx.Ctx, txCtx.DestClient.client, tx.Hash()) + receipt, err := h.waitForReceipt(txCtx.Ctx, txCtx.DestClient.client, tx.Hash(), txCtx.UpdateRequest.RouterID) if err != nil { h.recordFailure(txCtx, "confirmation", "receipt_timeout") logger.Errorf("[TX-CONFIRM] Failed to get receipt after %v: tx=%s, router=%s, symbol=%s, chain=%d, error=%v", @@ -404,8 +404,8 @@ func (h *TransactionHandler) getMonitoringInfo(txCtx *TransactionContext) string } // waitForReceipt waits for a transaction receipt -func (h *TransactionHandler) waitForReceipt(ctx context.Context, client rpc.EthClient, txHash common.Hash) (*types.Receipt, error) { - logger.Infof("Waiting for transaction receipt: %s", txHash.Hex()) +func (h *TransactionHandler) waitForReceipt(ctx context.Context, client rpc.EthClient, txHash common.Hash, routerID string) (*types.Receipt, error) { + logger.Infof("Waiting for transaction receipt: %s, router=%s", txHash.Hex(), routerID) timeout := time.After(5 * time.Minute) ticker := time.NewTicker(5 * time.Second) @@ -423,12 +423,12 @@ func (h *TransactionHandler) waitForReceipt(ctx context.Context, client rpc.EthC receipt, err := client.TransactionReceipt(ctx, txHash) if err != nil { if attempts%12 == 0 { // Log every minute - logger.Debugf("Still waiting for receipt %s (attempt %d): %v", txHash.Hex(), attempts, err) + logger.Debugf("Still waiting for receipt %s (attempt %d), router=%s: %v", txHash.Hex(), attempts, routerID, err) } continue } - logger.Infof("Transaction receipt received: %s, status: %d, gas used: %d", - txHash.Hex(), receipt.Status, receipt.GasUsed) + logger.Infof("Transaction receipt received: %s, status: %d, gas used: %d, router=%s", + txHash.Hex(), receipt.Status, receipt.GasUsed, routerID) return receipt, nil } } diff --git a/services/bridge/internal/processor/generic_event_processor.go b/services/bridge/internal/processor/generic_event_processor.go index bd699b4..018246a 100644 --- a/services/bridge/internal/processor/generic_event_processor.go +++ b/services/bridge/internal/processor/generic_event_processor.go @@ -274,7 +274,7 @@ func (gep *GenericEventProcessor) processEvent(ctx context.Context, event *types routersUsed := 0 for _, result := range routingResults { if result.Routed { - logger.Infof("Router %s approved event %s: %s", result.RouterID, event.EventName, result.Reason) + logger.Infof("Router approved event: router=%s, event=%s, reason=%s", result.RouterID, event.EventName, result.Reason) // Get the router to apply time threshold filtering after enrichment router := gep.routerRegistry.GetRouterByID(result.RouterID) diff --git a/services/bridge/internal/transaction/executor.go b/services/bridge/internal/transaction/executor.go index e39d132..fb6f30e 100644 --- a/services/bridge/internal/transaction/executor.go +++ b/services/bridge/internal/transaction/executor.go @@ -47,33 +47,33 @@ func (e *Executor) Execute(ctx context.Context, req *Request) (*types.Transactio auth := e.receiverClient.GetAuth() contractAddress := common.HexToAddress(req.ContractAddr) - logger.Infof("[PARAM-DEBUG] Method: %s, Contract: %s, ParamCount: %d", req.MethodName, req.ContractAddr, len(req.Params)) + logger.Debugf("[PARAM-DEBUG] Method: %s, Contract: %s, ParamCount: %d", req.MethodName, req.ContractAddr, len(req.Params)) for i, param := range req.Params { if param == nil { - logger.Infof("[PARAM-DEBUG] params[%d]: NIL", i) + logger.Debugf("[PARAM-DEBUG] params[%d]: NIL", i) continue } switch v := param.(type) { case []*big.Int: - logger.Infof("[PARAM-DEBUG] params[%d]: Type=[]*big.Int, Len=%d", i, len(v)) + logger.Debugf("[PARAM-DEBUG] params[%d]: Type=[]*big.Int, Len=%d", i, len(v)) if len(v) > 0 && len(v) <= 5 { for j, val := range v { - logger.Infof("[PARAM-DEBUG] [%d]=%s", j, val.String()) + logger.Debugf("[PARAM-DEBUG] [%d]=%s", j, val.String()) } } case []interface{}: - logger.Infof("[PARAM-DEBUG] params[%d]: Type=[]interface{}, Len=%d - NOT CONVERTED!", i, len(v)) + logger.Debugf("[PARAM-DEBUG] params[%d]: Type=[]interface{}, Len=%d - NOT CONVERTED!", i, len(v)) if len(v) > 0 && len(v) <= 5 { for j, val := range v { - logger.Infof("[PARAM-DEBUG] [%d]=%T: %v", j, val, val) + logger.Debugf("[PARAM-DEBUG] [%d]=%T: %v", j, val, val) } } case *big.Int: - logger.Infof("[PARAM-DEBUG] params[%d]: Type=*big.Int, Value=%s", i, v.String()) + logger.Debugf("[PARAM-DEBUG] params[%d]: Type=*big.Int, Value=%s", i, v.String()) default: - logger.Infof("[PARAM-DEBUG] params[%d]: Type=%T, Value=%v", i, param, param) + logger.Debugf("[PARAM-DEBUG] params[%d]: Type=%T, Value=%v", i, param, param) } } @@ -84,7 +84,11 @@ func (e *Executor) Execute(ctx context.Context, req *Request) (*types.Transactio fromAddress := auth.From - logger.Infof("Simulating transaction for method %s on contract %s", req.MethodName, req.ContractAddr) + routerID := "unknown" + if req.UpdateRequest != nil { + routerID = req.UpdateRequest.RouterID + } + logger.Infof("Simulating transaction for method %s on contract %s, router=%s", req.MethodName, req.ContractAddr, routerID) callMsg := ethereum.CallMsg{ From: fromAddress, To: &contractAddress, @@ -114,7 +118,10 @@ func (e *Executor) Execute(ctx context.Context, req *Request) (*types.Transactio } } - logger.Infof("Transaction simulation successful, proceeding to send transaction") + if req.UpdateRequest != nil && routerID != "unknown" { + routerID = req.UpdateRequest.RouterID + } + logger.Infof("Transaction simulation successful, proceeding to send transaction, router=%s", routerID) // CRITICAL: Allocate nonce immediately before sending to minimize staleness window // This happens AFTER simulation to reduce the time between allocation and sending