From f61d7b61cf385ab4700f345ddaa2a53968d78ada Mon Sep 17 00:00:00 2001 From: nnn-gif Date: Thu, 7 May 2026 17:37:00 +0530 Subject: [PATCH] feat: update based on deviation --- services/attestor/main.go | 17 +-- services/attestor/pkg/config/config.go | 35 ++++-- services/attestor/pkg/config/types.go | 15 ++- services/attestor/pkg/service/attestor.go | 141 +++++++++++++++++++--- 4 files changed, 171 insertions(+), 37 deletions(-) diff --git a/services/attestor/main.go b/services/attestor/main.go index c5f3cff..79eef82 100644 --- a/services/attestor/main.go +++ b/services/attestor/main.go @@ -95,13 +95,16 @@ func main() { signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) logFields := map[string]interface{}{ - "symbols": cfg.Attestor.Symbols, - "oracle": cfg.Oracle.Address, - "oracle_client_type": cfg.Oracle.ClientType.String(), - "registry": cfg.Registry.Address, - "polling_time": cfg.Attestor.PollingTime.String(), - "batch_mode": cfg.Attestor.BatchMode, - "mode": cfg.Attestor.Mode.String(), + "symbols": cfg.Attestor.Symbols, + "oracle": cfg.Oracle.Address, + "oracle_client_type": cfg.Oracle.ClientType.String(), + "registry": cfg.Registry.Address, + "polling_time": cfg.Attestor.PollingTime.String(), + "batch_mode": cfg.Attestor.BatchMode, + "mode": cfg.Attestor.Mode.String(), + "deviation_trigger": cfg.Attestor.DeviationTrigger, + "deviation_threshold_bips": cfg.Attestor.DeviationThreshold, + "force_update_interval": cfg.Attestor.ForceUpdateInterval.String(), } if cfg.Attestor.Mode == config.ModeReplica { diff --git a/services/attestor/pkg/config/config.go b/services/attestor/pkg/config/config.go index 6ec8ed4..e86d8df 100644 --- a/services/attestor/pkg/config/config.go +++ b/services/attestor/pkg/config/config.go @@ -99,15 +99,18 @@ type Config struct { } `mapstructure:"registry"` Attestor struct { - PrivateKey string `mapstructure:"private_key"` - Symbols []string `mapstructure:"symbols"` - PollingTime time.Duration `mapstructure:"polling_time"` - BatchMode bool `mapstructure:"batch_mode"` - Mode AttestorMode `mapstructure:"mode"` - ReplicaBackupDelay int `mapstructure:"replica_backup_delay"` - IntentType string `mapstructure:"intent_type"` - IntentVersion string `mapstructure:"intent_version"` - Guardian GuardianConfig `mapstructure:"guardian"` + PrivateKey string `mapstructure:"private_key"` + Symbols []string `mapstructure:"symbols"` + PollingTime time.Duration `mapstructure:"polling_time"` + BatchMode bool `mapstructure:"batch_mode"` + Mode AttestorMode `mapstructure:"mode"` + ReplicaBackupDelay int `mapstructure:"replica_backup_delay"` + IntentType string `mapstructure:"intent_type"` + IntentVersion string `mapstructure:"intent_version"` + DeviationTrigger bool `mapstructure:"deviation_trigger"` + DeviationThreshold int `mapstructure:"deviation_threshold"` + ForceUpdateInterval time.Duration `mapstructure:"force_update_interval"` + Guardian GuardianConfig `mapstructure:"guardian"` } `mapstructure:"attestor"` Logging struct { @@ -181,6 +184,9 @@ func Init(configPath string) (*Config, error) { v.BindEnv("attestor.replica_backup_delay", "ATTESTOR_ATTESTOR_REPLICA_BACKUP_DELAY") v.BindEnv("attestor.intent_type", "ATTESTOR_ATTESTOR_INTENT_TYPE") v.BindEnv("attestor.intent_version", "ATTESTOR_ATTESTOR_INTENT_VERSION") + v.BindEnv("attestor.deviation_trigger", "ATTESTOR_ATTESTOR_DEVIATION_TRIGGER") + v.BindEnv("attestor.deviation_threshold", "ATTESTOR_ATTESTOR_DEVIATION_THRESHOLD") + v.BindEnv("attestor.force_update_interval", "ATTESTOR_ATTESTOR_FORCE_UPDATE_INTERVAL") v.BindEnv("oracle.client_type", "ATTESTOR_ORACLE_CLIENT_TYPE") v.BindEnv("attestor.guardian.default.max_deviation_bips", "ATTESTOR_GUARDIAN_MAX_DEVIATION_BIPS") v.BindEnv("attestor.guardian.default.max_timestamp_age", "ATTESTOR_GUARDIAN_MAX_TIMESTAMP_AGE") @@ -199,6 +205,9 @@ func Init(configPath string) (*Config, error) { v.SetDefault("attestor.replica_backup_delay", 300) v.SetDefault("attestor.intent_type", "OracleUpdate") v.SetDefault("attestor.intent_version", "1.0") + v.SetDefault("attestor.deviation_trigger", false) + v.SetDefault("attestor.deviation_threshold", 50) + v.SetDefault("attestor.force_update_interval", "0s") v.SetDefault("attestor.guardian.default.max_deviation_bips", 500) v.SetDefault("attestor.guardian.default.max_timestamp_age", 3600) v.SetDefault("attestor.guardian.default.min_guardian_matches", 1) @@ -317,6 +326,14 @@ func validateConfig(cfg *Config) error { return fmt.Errorf("invalid polling time: %v", cfg.Attestor.PollingTime) } + if cfg.Attestor.DeviationThreshold < 0 || cfg.Attestor.DeviationThreshold > 10000 { + return fmt.Errorf("invalid deviation_threshold: %d (must be between 0 and 10000)", cfg.Attestor.DeviationThreshold) + } + + if cfg.Attestor.ForceUpdateInterval < 0 { + return fmt.Errorf("invalid force_update_interval: %v (must be zero or positive)", cfg.Attestor.ForceUpdateInterval) + } + // Validate guardian params if err := validateGuardianConfig(&cfg.Attestor.Guardian); err != nil { return fmt.Errorf("guardian configuration invalid: %w", err) diff --git a/services/attestor/pkg/config/types.go b/services/attestor/pkg/config/types.go index a8f1c98..b4a542d 100644 --- a/services/attestor/pkg/config/types.go +++ b/services/attestor/pkg/config/types.go @@ -59,12 +59,15 @@ func ParseOracleClientType(s string) (OracleClientType, error) { // AttestorConfig holds attestor-specific configuration type AttestorConfig struct { - PrivateKey string `mapstructure:"private_key"` - Symbols []string `mapstructure:"symbols"` - PollingTime time.Duration `mapstructure:"polling_time"` - BatchMode bool `mapstructure:"batch_mode"` - IntentType string `mapstructure:"intent_type"` - IntentVersion string `mapstructure:"intent_version"` + PrivateKey string `mapstructure:"private_key"` + Symbols []string `mapstructure:"symbols"` + PollingTime time.Duration `mapstructure:"polling_time"` + BatchMode bool `mapstructure:"batch_mode"` + IntentType string `mapstructure:"intent_type"` + IntentVersion string `mapstructure:"intent_version"` + DeviationTrigger bool `mapstructure:"deviation_trigger"` + DeviationThreshold int `mapstructure:"deviation_threshold"` + ForceUpdateInterval time.Duration `mapstructure:"force_update_interval"` } // OracleConfig holds oracle configuration diff --git a/services/attestor/pkg/service/attestor.go b/services/attestor/pkg/service/attestor.go index 58fae49..5245227 100644 --- a/services/attestor/pkg/service/attestor.go +++ b/services/attestor/pkg/service/attestor.go @@ -21,9 +21,11 @@ type AttestorService struct { signer interfaces.IntentSigner metrics interfaces.MetricsCollector - mu sync.RWMutex - running bool - cancelFunc context.CancelFunc + mu sync.RWMutex + running bool + cancelFunc context.CancelFunc + lastPublishedPrice map[string]*big.Int + lastPublishedAt map[string]time.Time } // NewAttestorService creates a new attestor service @@ -35,11 +37,13 @@ func NewAttestorService( metrics interfaces.MetricsCollector, ) *AttestorService { return &AttestorService{ - config: cfg, - oracle: oracle, - registry: registry, - signer: signer, - metrics: metrics, + config: cfg, + oracle: oracle, + registry: registry, + signer: signer, + metrics: metrics, + lastPublishedPrice: make(map[string]*big.Int), + lastPublishedAt: make(map[string]time.Time), } } @@ -143,6 +147,83 @@ func (s *AttestorService) IsRunning() bool { return s.running } +func (s *AttestorService) publishPolicyEnabled() bool { + return s.config.Attestor.DeviationTrigger || s.config.Attestor.ForceUpdateInterval > 0 +} + +func (s *AttestorService) shouldPublishPriceUpdate(symbol string, price *big.Int, now time.Time) (bool, string) { + if !s.publishPolicyEnabled() { + return true, "poll" + } + + s.mu.RLock() + lastPrice, hasLastPrice := s.lastPublishedPrice[symbol] + lastPublishedAt, hasLastPublishedAt := s.lastPublishedAt[symbol] + s.mu.RUnlock() + + if !hasLastPrice || lastPrice == nil || lastPrice.Sign() <= 0 || !hasLastPublishedAt || lastPublishedAt.IsZero() { + return true, "initial" + } + + if s.config.Attestor.DeviationTrigger { + deviationBips := priceDeviationBips(lastPrice, price) + if deviationBips.Cmp(big.NewInt(int64(s.config.Attestor.DeviationThreshold))) >= 0 { + logger.WithFields(map[string]interface{}{ + "symbol": symbol, + "last_price": lastPrice.String(), + "new_price": price.String(), + "deviation_bips": deviationBips.String(), + "threshold_bips": s.config.Attestor.DeviationThreshold, + "publish_trigger": "deviation", + }).Info("Publishing due to price deviation") + return true, "deviation" + } + } + + if s.config.Attestor.ForceUpdateInterval > 0 && !now.Before(lastPublishedAt.Add(s.config.Attestor.ForceUpdateInterval)) { + logger.WithFields(map[string]interface{}{ + "symbol": symbol, + "time_since_publish": now.Sub(lastPublishedAt).String(), + "force_update_interval": s.config.Attestor.ForceUpdateInterval.String(), + "publish_trigger": "force_interval", + }).Info("Publishing due to force update interval") + return true, "force_interval" + } + + logger.WithFields(map[string]interface{}{ + "symbol": symbol, + "price": price.String(), + "last_price": lastPrice.String(), + "time_since_publish": now.Sub(lastPublishedAt).String(), + "deviation_trigger": s.config.Attestor.DeviationTrigger, + "force_update_interval": s.config.Attestor.ForceUpdateInterval.String(), + }).Debug("Skipping publish after polling price") + return false, "skip" +} + +func priceDeviationBips(oldPrice, newPrice *big.Int) *big.Int { + if oldPrice == nil || oldPrice.Sign() <= 0 { + return big.NewInt(0) + } + + deviation := new(big.Int).Sub(newPrice, oldPrice) + if deviation.Sign() < 0 { + deviation.Neg(deviation) + } + + deviation.Mul(deviation, big.NewInt(10000)) + deviation.Div(deviation, oldPrice) + return deviation +} + +func (s *AttestorService) recordPublishedPrice(symbol string, price *big.Int, publishedAt time.Time) { + s.mu.Lock() + defer s.mu.Unlock() + + s.lastPublishedPrice[symbol] = new(big.Int).Set(price) + s.lastPublishedAt[symbol] = publishedAt +} + func (s *AttestorService) shouldPublishInReplicaMode(ctx context.Context, symbol string) bool { if s.config.Attestor.Mode != config.ModeReplica { return true @@ -250,6 +331,11 @@ func (s *AttestorService) processSingleAttestation(ctx context.Context, symbol s return errors.NewOracleError(symbol, fmt.Sprintf("failed to fetch value (called %s)", contractFunction), err) } + shouldPublish, publishReason := s.shouldPublishPriceUpdate(symbol, price, time.Now()) + if !shouldPublish { + return nil + } + // Default volume volume := big.NewInt(1) @@ -277,11 +363,13 @@ func (s *AttestorService) processSingleAttestation(ctx context.Context, symbol s return errors.NewRegistryError("publish", "", err) } s.metrics.RecordIntentPublished(symbol, true) + s.recordPublishedPrice(symbol, price, time.Now()) logger.WithFields(map[string]interface{}{ - "symbol": symbol, - "tx_hash": txHash, - "duration": time.Since(start).String(), + "symbol": symbol, + "tx_hash": txHash, + "publish_trigger": publishReason, + "duration": time.Since(start).String(), }).Info("Successfully published intent") return nil @@ -298,6 +386,8 @@ func (s *AttestorService) processBatchAttestation(ctx context.Context) error { // Collect symbol data symbolData := make([]interfaces.SymbolData, 0, len(s.config.Attestor.Symbols)) + publishReasons := make(map[string]string) + shouldPublishBatch := !s.publishPolicyEnabled() symbolLoop: for _, symbol := range s.config.Attestor.Symbols { @@ -342,6 +432,12 @@ symbolLoop: continue } + shouldPublish, publishReason := s.shouldPublishPriceUpdate(symbol, price, time.Now()) + if shouldPublish { + shouldPublishBatch = true + publishReasons[symbol] = publishReason + } + volume := big.NewInt(1) logFields := map[string]interface{}{ @@ -356,13 +452,21 @@ symbolLoop: Price: price, Volume: volume, }) - s.metrics.RecordIntentCreated(symbol, true) } if len(symbolData) == 0 { return fmt.Errorf("no valid symbol data collected") } + if !shouldPublishBatch { + logger.WithField("symbol_count", len(symbolData)).Debug("Skipping batch publish after polling prices") + return nil + } + + for _, data := range symbolData { + s.metrics.RecordIntentCreated(data.Symbol, true) + } + // Sign batch intent signedIntent, err := s.signer.SignBatchIntent(ctx, symbolData) if err != nil { @@ -383,12 +487,14 @@ symbolLoop: for _, data := range symbolData { s.metrics.RecordIntentPublished(data.Symbol, true) + s.recordPublishedPrice(data.Symbol, data.Price, time.Now()) } logger.WithFields(map[string]interface{}{ - "symbol_count": len(symbolData), - "tx_hash": txHash, - "duration": time.Since(start).String(), + "symbol_count": len(symbolData), + "tx_hash": txHash, + "publish_triggers": publishReasons, + "duration": time.Since(start).String(), }).Info("Successfully published batch intent") return nil @@ -405,6 +511,11 @@ func (s *AttestorService) Health() map[string]interface{} { "symbols": s.config.Attestor.Symbols, "batch_mode": s.config.Attestor.BatchMode, "polling_time": s.config.Attestor.PollingTime.String(), + "publish_policy": map[string]interface{}{ + "deviation_trigger": s.config.Attestor.DeviationTrigger, + "deviation_threshold": s.config.Attestor.DeviationThreshold, + "force_update_interval": s.config.Attestor.ForceUpdateInterval.String(), + }, }, } }