Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions services/attestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ func main() {
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

logFields := map[string]interface{}{
"symbols": cfg.Attestor.Symbols,
"oracle": cfg.Oracle.Address,
"oracle_client_type": cfg.Oracle.ClientType.String(),
"registry": cfg.Registry.Address,
"polling_time": cfg.Attestor.PollingTime.String(),
"batch_mode": cfg.Attestor.BatchMode,
"mode": cfg.Attestor.Mode.String(),
"symbols": cfg.Attestor.Symbols,
"oracle": cfg.Oracle.Address,
"oracle_client_type": cfg.Oracle.ClientType.String(),
"registry": cfg.Registry.Address,
"polling_time": cfg.Attestor.PollingTime.String(),
"batch_mode": cfg.Attestor.BatchMode,
"mode": cfg.Attestor.Mode.String(),
"deviation_trigger": cfg.Attestor.DeviationTrigger,
"deviation_threshold_bips": cfg.Attestor.DeviationThreshold,
"force_update_interval": cfg.Attestor.ForceUpdateInterval.String(),
}

if cfg.Attestor.Mode == config.ModeReplica {
Expand Down
35 changes: 26 additions & 9 deletions services/attestor/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,18 @@ type Config struct {
} `mapstructure:"registry"`

Attestor struct {
PrivateKey string `mapstructure:"private_key"`
Symbols []string `mapstructure:"symbols"`
PollingTime time.Duration `mapstructure:"polling_time"`
BatchMode bool `mapstructure:"batch_mode"`
Mode AttestorMode `mapstructure:"mode"`
ReplicaBackupDelay int `mapstructure:"replica_backup_delay"`
IntentType string `mapstructure:"intent_type"`
IntentVersion string `mapstructure:"intent_version"`
Guardian GuardianConfig `mapstructure:"guardian"`
PrivateKey string `mapstructure:"private_key"`
Symbols []string `mapstructure:"symbols"`
PollingTime time.Duration `mapstructure:"polling_time"`
BatchMode bool `mapstructure:"batch_mode"`
Mode AttestorMode `mapstructure:"mode"`
ReplicaBackupDelay int `mapstructure:"replica_backup_delay"`
IntentType string `mapstructure:"intent_type"`
IntentVersion string `mapstructure:"intent_version"`
DeviationTrigger bool `mapstructure:"deviation_trigger"`
DeviationThreshold int `mapstructure:"deviation_threshold"`
ForceUpdateInterval time.Duration `mapstructure:"force_update_interval"`
Guardian GuardianConfig `mapstructure:"guardian"`
} `mapstructure:"attestor"`

Logging struct {
Expand Down Expand Up @@ -181,6 +184,9 @@ func Init(configPath string) (*Config, error) {
v.BindEnv("attestor.replica_backup_delay", "ATTESTOR_ATTESTOR_REPLICA_BACKUP_DELAY")
v.BindEnv("attestor.intent_type", "ATTESTOR_ATTESTOR_INTENT_TYPE")
v.BindEnv("attestor.intent_version", "ATTESTOR_ATTESTOR_INTENT_VERSION")
v.BindEnv("attestor.deviation_trigger", "ATTESTOR_ATTESTOR_DEVIATION_TRIGGER")
v.BindEnv("attestor.deviation_threshold", "ATTESTOR_ATTESTOR_DEVIATION_THRESHOLD")
v.BindEnv("attestor.force_update_interval", "ATTESTOR_ATTESTOR_FORCE_UPDATE_INTERVAL")
v.BindEnv("oracle.client_type", "ATTESTOR_ORACLE_CLIENT_TYPE")
v.BindEnv("attestor.guardian.default.max_deviation_bips", "ATTESTOR_GUARDIAN_MAX_DEVIATION_BIPS")
v.BindEnv("attestor.guardian.default.max_timestamp_age", "ATTESTOR_GUARDIAN_MAX_TIMESTAMP_AGE")
Expand All @@ -199,6 +205,9 @@ func Init(configPath string) (*Config, error) {
v.SetDefault("attestor.replica_backup_delay", 300)
v.SetDefault("attestor.intent_type", "OracleUpdate")
v.SetDefault("attestor.intent_version", "1.0")
v.SetDefault("attestor.deviation_trigger", false)
v.SetDefault("attestor.deviation_threshold", 50)
v.SetDefault("attestor.force_update_interval", "0s")
v.SetDefault("attestor.guardian.default.max_deviation_bips", 500)
v.SetDefault("attestor.guardian.default.max_timestamp_age", 3600)
v.SetDefault("attestor.guardian.default.min_guardian_matches", 1)
Expand Down Expand Up @@ -317,6 +326,14 @@ func validateConfig(cfg *Config) error {
return fmt.Errorf("invalid polling time: %v", cfg.Attestor.PollingTime)
}

if cfg.Attestor.DeviationThreshold < 0 || cfg.Attestor.DeviationThreshold > 10000 {
return fmt.Errorf("invalid deviation_threshold: %d (must be between 0 and 10000)", cfg.Attestor.DeviationThreshold)
}

if cfg.Attestor.ForceUpdateInterval < 0 {
return fmt.Errorf("invalid force_update_interval: %v (must be zero or positive)", cfg.Attestor.ForceUpdateInterval)
}

// Validate guardian params
if err := validateGuardianConfig(&cfg.Attestor.Guardian); err != nil {
return fmt.Errorf("guardian configuration invalid: %w", err)
Expand Down
15 changes: 9 additions & 6 deletions services/attestor/pkg/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ func ParseOracleClientType(s string) (OracleClientType, error) {

// AttestorConfig holds attestor-specific configuration
type AttestorConfig struct {
PrivateKey string `mapstructure:"private_key"`
Symbols []string `mapstructure:"symbols"`
PollingTime time.Duration `mapstructure:"polling_time"`
BatchMode bool `mapstructure:"batch_mode"`
IntentType string `mapstructure:"intent_type"`
IntentVersion string `mapstructure:"intent_version"`
PrivateKey string `mapstructure:"private_key"`
Symbols []string `mapstructure:"symbols"`
PollingTime time.Duration `mapstructure:"polling_time"`
BatchMode bool `mapstructure:"batch_mode"`
IntentType string `mapstructure:"intent_type"`
IntentVersion string `mapstructure:"intent_version"`
DeviationTrigger bool `mapstructure:"deviation_trigger"`
DeviationThreshold int `mapstructure:"deviation_threshold"`
ForceUpdateInterval time.Duration `mapstructure:"force_update_interval"`
}

// OracleConfig holds oracle configuration
Expand Down
141 changes: 126 additions & 15 deletions services/attestor/pkg/service/attestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ type AttestorService struct {
signer interfaces.IntentSigner
metrics interfaces.MetricsCollector

mu sync.RWMutex
running bool
cancelFunc context.CancelFunc
mu sync.RWMutex
running bool
cancelFunc context.CancelFunc
lastPublishedPrice map[string]*big.Int
lastPublishedAt map[string]time.Time
}

// NewAttestorService creates a new attestor service
Expand All @@ -35,11 +37,13 @@ func NewAttestorService(
metrics interfaces.MetricsCollector,
) *AttestorService {
return &AttestorService{
config: cfg,
oracle: oracle,
registry: registry,
signer: signer,
metrics: metrics,
config: cfg,
oracle: oracle,
registry: registry,
signer: signer,
metrics: metrics,
lastPublishedPrice: make(map[string]*big.Int),
lastPublishedAt: make(map[string]time.Time),
}
}

Expand Down Expand Up @@ -143,6 +147,83 @@ func (s *AttestorService) IsRunning() bool {
return s.running
}

func (s *AttestorService) publishPolicyEnabled() bool {
return s.config.Attestor.DeviationTrigger || s.config.Attestor.ForceUpdateInterval > 0
}

func (s *AttestorService) shouldPublishPriceUpdate(symbol string, price *big.Int, now time.Time) (bool, string) {
if !s.publishPolicyEnabled() {
return true, "poll"
}

s.mu.RLock()
lastPrice, hasLastPrice := s.lastPublishedPrice[symbol]
lastPublishedAt, hasLastPublishedAt := s.lastPublishedAt[symbol]
s.mu.RUnlock()

if !hasLastPrice || lastPrice == nil || lastPrice.Sign() <= 0 || !hasLastPublishedAt || lastPublishedAt.IsZero() {
return true, "initial"
}

if s.config.Attestor.DeviationTrigger {
deviationBips := priceDeviationBips(lastPrice, price)
if deviationBips.Cmp(big.NewInt(int64(s.config.Attestor.DeviationThreshold))) >= 0 {
logger.WithFields(map[string]interface{}{
"symbol": symbol,
"last_price": lastPrice.String(),
"new_price": price.String(),
"deviation_bips": deviationBips.String(),
"threshold_bips": s.config.Attestor.DeviationThreshold,
"publish_trigger": "deviation",
}).Info("Publishing due to price deviation")
return true, "deviation"
}
}

if s.config.Attestor.ForceUpdateInterval > 0 && !now.Before(lastPublishedAt.Add(s.config.Attestor.ForceUpdateInterval)) {
logger.WithFields(map[string]interface{}{
"symbol": symbol,
"time_since_publish": now.Sub(lastPublishedAt).String(),
"force_update_interval": s.config.Attestor.ForceUpdateInterval.String(),
"publish_trigger": "force_interval",
}).Info("Publishing due to force update interval")
return true, "force_interval"
}

logger.WithFields(map[string]interface{}{
"symbol": symbol,
"price": price.String(),
"last_price": lastPrice.String(),
"time_since_publish": now.Sub(lastPublishedAt).String(),
"deviation_trigger": s.config.Attestor.DeviationTrigger,
"force_update_interval": s.config.Attestor.ForceUpdateInterval.String(),
}).Debug("Skipping publish after polling price")
return false, "skip"
}

func priceDeviationBips(oldPrice, newPrice *big.Int) *big.Int {
if oldPrice == nil || oldPrice.Sign() <= 0 {
return big.NewInt(0)
}

deviation := new(big.Int).Sub(newPrice, oldPrice)
if deviation.Sign() < 0 {
deviation.Neg(deviation)
}

deviation.Mul(deviation, big.NewInt(10000))
deviation.Div(deviation, oldPrice)
return deviation
}

func (s *AttestorService) recordPublishedPrice(symbol string, price *big.Int, publishedAt time.Time) {
s.mu.Lock()
defer s.mu.Unlock()

s.lastPublishedPrice[symbol] = new(big.Int).Set(price)
s.lastPublishedAt[symbol] = publishedAt
}

func (s *AttestorService) shouldPublishInReplicaMode(ctx context.Context, symbol string) bool {
if s.config.Attestor.Mode != config.ModeReplica {
return true
Expand Down Expand Up @@ -250,6 +331,11 @@ func (s *AttestorService) processSingleAttestation(ctx context.Context, symbol s
return errors.NewOracleError(symbol, fmt.Sprintf("failed to fetch value (called %s)", contractFunction), err)
}

shouldPublish, publishReason := s.shouldPublishPriceUpdate(symbol, price, time.Now())
if !shouldPublish {
return nil
}

// Default volume
volume := big.NewInt(1)

Expand Down Expand Up @@ -277,11 +363,13 @@ func (s *AttestorService) processSingleAttestation(ctx context.Context, symbol s
return errors.NewRegistryError("publish", "", err)
}
s.metrics.RecordIntentPublished(symbol, true)
s.recordPublishedPrice(symbol, price, time.Now())

logger.WithFields(map[string]interface{}{
"symbol": symbol,
"tx_hash": txHash,
"duration": time.Since(start).String(),
"symbol": symbol,
"tx_hash": txHash,
"publish_trigger": publishReason,
"duration": time.Since(start).String(),
}).Info("Successfully published intent")

return nil
Expand All @@ -298,6 +386,8 @@ func (s *AttestorService) processBatchAttestation(ctx context.Context) error {

// Collect symbol data
symbolData := make([]interfaces.SymbolData, 0, len(s.config.Attestor.Symbols))
publishReasons := make(map[string]string)
shouldPublishBatch := !s.publishPolicyEnabled()

symbolLoop:
for _, symbol := range s.config.Attestor.Symbols {
Expand Down Expand Up @@ -342,6 +432,12 @@ symbolLoop:
continue
}

shouldPublish, publishReason := s.shouldPublishPriceUpdate(symbol, price, time.Now())
if shouldPublish {
shouldPublishBatch = true
publishReasons[symbol] = publishReason
}

volume := big.NewInt(1)

logFields := map[string]interface{}{
Expand All @@ -356,13 +452,21 @@ symbolLoop:
Price: price,
Volume: volume,
})
s.metrics.RecordIntentCreated(symbol, true)
}

if len(symbolData) == 0 {
return fmt.Errorf("no valid symbol data collected")
}

if !shouldPublishBatch {
logger.WithField("symbol_count", len(symbolData)).Debug("Skipping batch publish after polling prices")
return nil
}

for _, data := range symbolData {
s.metrics.RecordIntentCreated(data.Symbol, true)
}

// Sign batch intent
signedIntent, err := s.signer.SignBatchIntent(ctx, symbolData)
if err != nil {
Expand All @@ -383,12 +487,14 @@ symbolLoop:

for _, data := range symbolData {
s.metrics.RecordIntentPublished(data.Symbol, true)
s.recordPublishedPrice(data.Symbol, data.Price, time.Now())
}

logger.WithFields(map[string]interface{}{
"symbol_count": len(symbolData),
"tx_hash": txHash,
"duration": time.Since(start).String(),
"symbol_count": len(symbolData),
"tx_hash": txHash,
"publish_triggers": publishReasons,
"duration": time.Since(start).String(),
}).Info("Successfully published batch intent")

return nil
Expand All @@ -405,6 +511,11 @@ func (s *AttestorService) Health() map[string]interface{} {
"symbols": s.config.Attestor.Symbols,
"batch_mode": s.config.Attestor.BatchMode,
"polling_time": s.config.Attestor.PollingTime.String(),
"publish_policy": map[string]interface{}{
"deviation_trigger": s.config.Attestor.DeviationTrigger,
"deviation_threshold": s.config.Attestor.DeviationThreshold,
"force_update_interval": s.config.Attestor.ForceUpdateInterval.String(),
},
},
}
}
Loading