Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions plugins/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (a *Balance) InitHttp() []router.Http {
return http.Router(srv)
}

func (a *Balance) ProcessExtrinsic(*storage.Block, *storage.Extrinsic, []storage.Event) error {
return nil
func (a *Balance) ProcessExtrinsic(block *storage.Block, _ *storage.Extrinsic, events []storage.Event) error {
return dao.CreateOmniBridgePayoutTransfers(context.TODO(), a.storage(), events, block)
}

func (a *Balance) ProcessEvent(block *storage.Block, event *storage.Event, _ decimal.Decimal) error {
Expand All @@ -118,7 +118,7 @@ func (a *Balance) ProcessEvent(block *storage.Block, event *storage.Event, _ dec
}

func (a *Balance) SubscribeExtrinsic() []string {
return nil
return []string{"omnibridge", "OmniBridge", "Omnibridge"}
}

func (a *Balance) SubscribeEvent() []string {
Expand Down
135 changes: 125 additions & 10 deletions plugins/balance/dao/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package dao
import (
"context"
"fmt"
"strings"

subscan_plugin "github.com/itering/subscan-plugin"
"github.com/itering/subscan-plugin/storage"
"github.com/itering/subscan/model"
bModel "github.com/itering/subscan/plugins/balance/model"
"github.com/itering/subscan/share/token"
"github.com/itering/subscan/util"
"github.com/shopspring/decimal"
"gorm.io/gorm"
)

Expand All @@ -17,6 +20,22 @@ type Storage struct {
Pool subscan_plugin.RedisPool
}

const (
TransferCategoryTransfer = "transfer"
TransferCategoryBridgeIn = "bridge_in"

TransferSourceBalances = "balances"
TransferSourceOmniBridge = "omnibridge"

TransferEventTransfer = "Transfer"
TransferEventMinted = "Minted"
TransferEventPaidOut = "PaidOut"

// OmniBridge payout rows use a stable synthetic sender because the chain
// emits incoming funds as balances.Minted without a source account.
OmniBridgeSyntheticSender = "omnibridge"
)

func EmitEvent(ctx context.Context, d *Storage, event *storage.Event, block *storage.Block) error {
var paramEvent []storage.EventParam
_ = util.UnmarshalAny(&paramEvent, event.Params)
Expand All @@ -26,23 +45,119 @@ func EmitEvent(ctx context.Context, d *Storage, event *storage.Event, block *sto
return RefreshAccount(ctx, d, model.CheckoutParamValueAddress(paramEvent[0].Value))
// ["AccountId","AccountId","Balance"]
case "Transfer":
from := model.CheckoutParamValueAddress(paramEvent[0].Value)
to := model.CheckoutParamValueAddress(paramEvent[1].Value)
balance := util.DecimalFromInterface(paramEvent[2].Value)
t := token.GetDefaultToken()
return CreateTransfer(ctx, d, &bModel.Transfer{
transfer := BalanceTransferFromEvent(event, block)
if transfer == nil {
return nil
}
return CreateTransfer(ctx, d, transfer)
}
return nil
}

func BalanceTransferFromEvent(event *storage.Event, block *storage.Block) *bModel.Transfer {
if event == nil || !strings.EqualFold(event.ModuleId, TransferSourceBalances) || !strings.EqualFold(event.EventId, TransferEventTransfer) {
return nil
}
var paramEvent []storage.EventParam
_ = util.UnmarshalAny(&paramEvent, event.Params)
if len(paramEvent) < 3 {
return nil
}
t := token.GetDefaultToken()
blockTimestamp := int64(0)
if block != nil {
blockTimestamp = int64(block.BlockTimestamp)
}
return &bModel.Transfer{
Id: event.Id,
Sender: model.CheckoutParamValueAddress(paramEvent[0].Value),
Receiver: model.CheckoutParamValueAddress(paramEvent[1].Value),
Amount: util.DecimalFromInterface(paramEvent[2].Value),
BlockNum: uint(event.BlockNum),
BlockTimestamp: blockTimestamp,
Symbol: t.Symbol,
TokenId: t.TokenId,
ExtrinsicIndex: fmt.Sprintf("%d-%d", event.BlockNum, event.ExtrinsicIdx),
Category: TransferCategoryTransfer,
SourceModule: TransferSourceBalances,
SourceEvent: TransferEventTransfer,
BalanceEvent: TransferEventTransfer,
}
}

func CreateOmniBridgePayoutTransfers(ctx context.Context, d *Storage, events []storage.Event, block *storage.Block) error {
for _, transfer := range OmniBridgePayoutTransfers(events, block) {
if err := CreateTransfer(ctx, d, transfer); err != nil {
return err
}
}
return nil
}

func OmniBridgePayoutTransfers(events []storage.Event, block *storage.Block) []*bModel.Transfer {
if !hasOmniBridgePaidOut(events) {
return nil
}
t := token.GetDefaultToken()
blockTimestamp := int64(0)
if block != nil {
blockTimestamp = int64(block.BlockTimestamp)
}
var transfers []*bModel.Transfer
for index := range events {
event := events[index]
if !strings.EqualFold(event.ModuleId, TransferSourceBalances) || !strings.EqualFold(event.EventId, TransferEventMinted) {
continue
}
var paramEvent []storage.EventParam
_ = util.UnmarshalAny(&paramEvent, event.Params)
if len(paramEvent) < 2 {
continue
}
receiver := model.CheckoutParamValueAddress(paramEvent[0].Value)
if receiver == "" {
continue
}
transfers = append(transfers, &bModel.Transfer{
Id: event.Id,
Sender: from,
Receiver: to,
Amount: balance,
Sender: OmniBridgeSyntheticSender,
Receiver: receiver,
Amount: balanceAmountFromEventParam(paramEvent[1].Value),
BlockNum: uint(event.BlockNum),
BlockTimestamp: int64(block.BlockTimestamp),
BlockTimestamp: blockTimestamp,
Symbol: t.Symbol,
TokenId: t.TokenId,
ExtrinsicIndex: fmt.Sprintf("%d-%d", event.BlockNum, event.ExtrinsicIdx),
Category: TransferCategoryBridgeIn,
SourceModule: TransferSourceOmniBridge,
SourceEvent: TransferEventPaidOut,
BalanceEvent: TransferEventMinted,
})
}
return nil
return transfers
}

func hasOmniBridgePaidOut(events []storage.Event) bool {
for index := range events {
event := events[index]
if strings.EqualFold(event.ModuleId, TransferSourceOmniBridge) && strings.EqualFold(event.EventId, TransferEventPaidOut) {
return true
}
}
return false
}

func balanceAmountFromEventParam(value interface{}) decimal.Decimal {
amount := util.DecimalFromInterface(value)
if !amount.IsZero() {
return amount
}
valueString := strings.TrimSpace(util.ToString(value))
trimmed := strings.TrimPrefix(valueString, "0x")
if valueString != "" && len(trimmed)%2 == 0 && len(trimmed) >= 32 {
return util.EvmReverseU256Decoder(valueString)
}
return amount
}

func RefreshMetadata(ctx context.Context, d *Storage) {
Expand Down
102 changes: 102 additions & 0 deletions plugins/balance/dao/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package dao

import (
"encoding/json"
"testing"

"github.com/itering/subscan-plugin/storage"
"github.com/itering/subscan/share/token"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const bridgeReceiver = "00f160c0e8fff2d4f00ab03e18dced9f2ac52a6b865cda497a33aee5b3fe335b"

func TestBalanceTransferFromEventMarksNormalTransferMetadata(t *testing.T) {
token.SetDefault(&token.Token{Symbol: "HEI", TokenId: "HEI"})
event := eventWithParams(100000000001, 1000000, 0, TransferSourceBalances, TransferEventTransfer, []storage.EventParam{
{Type: "AccountId", Value: "242f0781faa44f34ddcbc9e731d0ddb51c97f5b58bb2202090a3a1c679fc4c63"},
{Type: "AccountId", Value: bridgeReceiver},
{Type: "Balance", Value: "12345"},
})

transfer := BalanceTransferFromEvent(&event, &storage.Block{BlockTimestamp: 1770000000})

require.NotNil(t, transfer)
assert.Equal(t, TransferCategoryTransfer, transfer.Category)
assert.Equal(t, TransferSourceBalances, transfer.SourceModule)
assert.Equal(t, TransferEventTransfer, transfer.SourceEvent)
assert.Equal(t, TransferEventTransfer, transfer.BalanceEvent)
assert.True(t, decimal.RequireFromString("12345").Equal(transfer.Amount))
}

func TestOmniBridgePayoutTransfersCreatesBridgeInFromPaidOutAndMinted(t *testing.T) {
token.SetDefault(&token.Token{Symbol: "HEI", TokenId: "HEI"})
events := []storage.Event{
eventWithParams(971637600004, 9716376, 2, TransferSourceOmniBridge, TransferEventPaidOut, nil),
eventWithParams(971637600003, 9716376, 2, TransferSourceBalances, TransferEventMinted, []storage.EventParam{
{Type: "AccountId", Value: bridgeReceiver},
{Type: "Balance", Value: "0000E8890423C78A0000000000000000"},
}),
}

transfers := OmniBridgePayoutTransfers(events, &storage.Block{BlockTimestamp: 1780000000})

require.Len(t, transfers, 1)
assert.Equal(t, uint(971637600003), transfers[0].Id)
assert.Equal(t, OmniBridgeSyntheticSender, transfers[0].Sender)
assert.Equal(t, bridgeReceiver, transfers[0].Receiver)
assert.True(t, decimal.RequireFromString("10000000000000000000").Equal(transfers[0].Amount))
assert.Equal(t, uint(9716376), transfers[0].BlockNum)
assert.Equal(t, int64(1780000000), transfers[0].BlockTimestamp)
assert.Equal(t, "9716376-2", transfers[0].ExtrinsicIndex)
assert.Equal(t, TransferCategoryBridgeIn, transfers[0].Category)
assert.Equal(t, TransferSourceOmniBridge, transfers[0].SourceModule)
assert.Equal(t, TransferEventPaidOut, transfers[0].SourceEvent)
assert.Equal(t, TransferEventMinted, transfers[0].BalanceEvent)
}

func TestOmniBridgePayoutTransfersIgnoresUnrelatedMinted(t *testing.T) {
token.SetDefault(&token.Token{Symbol: "HEI", TokenId: "HEI"})
events := []storage.Event{
eventWithParams(971637600003, 9716376, 2, TransferSourceBalances, TransferEventMinted, []storage.EventParam{
{Type: "AccountId", Value: bridgeReceiver},
{Type: "Balance", Value: "10000000000000000000"},
}),
}

assert.Empty(t, OmniBridgePayoutTransfers(events, nil))
}

func TestOmniBridgePayoutTransfersIsStableForDuplicateReprocessing(t *testing.T) {
token.SetDefault(&token.Token{Symbol: "HEI", TokenId: "HEI"})
events := []storage.Event{
eventWithParams(971637600004, 9716376, 2, "OmniBridge", TransferEventPaidOut, nil),
eventWithParams(971637600003, 9716376, 2, "Balances", TransferEventMinted, []storage.EventParam{
{Type: "AccountId", Value: bridgeReceiver},
{Type: "Balance", Value: "10000000000000000000"},
}),
}

first := OmniBridgePayoutTransfers(events, nil)
second := OmniBridgePayoutTransfers(events, nil)

require.Len(t, first, 1)
require.Len(t, second, 1)
assert.Equal(t, first[0].Id, second[0].Id)
assert.Equal(t, uint(971637600003), second[0].Id)
assert.True(t, first[0].Amount.Equal(second[0].Amount))
}

func eventWithParams(id uint, blockNum int, extrinsicIdx int, moduleID string, eventID string, params []storage.EventParam) storage.Event {
raw, _ := json.Marshal(params)
return storage.Event{
Id: id,
BlockNum: blockNum,
ExtrinsicIdx: extrinsicIdx,
ModuleId: moduleID,
EventId: eventID,
Params: raw,
}
}
73 changes: 69 additions & 4 deletions plugins/balance/dao/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func RefreshAllAccount(sg *Storage, options ...RefreshAllAccountOptions) error {
func InitTransfer(sg *Storage) {
c := context.TODO()
db := sg.Dao.GetDbInstance().(*gorm.DB)
MarkMissingTransferMetadata(c, db)

blockNum, _ := sg.Dao.GetCurrentBlockNum(c)
for i := int(blockNum); i >= 0; i -= int(model.SplitTableBlockNum) {
Expand All @@ -235,16 +236,80 @@ func InitTransfer(sg *Storage) {
blocks[b.BlockNum] = b
}

var extrinsicIds []string
for _, e := range events {
extrinsicIds = append(extrinsicIds, e.ExtrinsicIndex)
}
for index := range events {
event := events[index]
_ = EmitEvent(c, sg, event.AsPlugin(), blocks[int(event.BlockNum)])
}
return nil
})
backfillOmniBridgePayoutTransfers(c, sg, db, tableName)
}

}

func MarkMissingTransferMetadata(ctx context.Context, db *gorm.DB) error {
return db.WithContext(ctx).
Model(&bModel.Transfer{}).
Where(
"category = '' OR category IS NULL OR source_module = '' OR source_module IS NULL OR source_event = '' OR source_event IS NULL OR balance_event = '' OR balance_event IS NULL",
).
Updates(map[string]interface{}{
"category": TransferCategoryTransfer,
"source_module": TransferSourceBalances,
"source_event": TransferEventTransfer,
"balance_event": TransferEventTransfer,
}).Error
}

func backfillOmniBridgePayoutTransfers(ctx context.Context, sg *Storage, db *gorm.DB, tableName string) {
var paidOutEvents []*model.ChainEvent
query := db.Table(tableName).
Where("LOWER(module_id) = ?", TransferSourceOmniBridge).
Where("LOWER(event_id) = ?", strings.ToLower(TransferEventPaidOut))
query.FindInBatches(&paidOutEvents, 50000, func(tx *gorm.DB, batch int) error {
extrinsicSeen := make(map[string]bool)
var extrinsicIds []string
for _, e := range paidOutEvents {
if e.ExtrinsicIndex == "" || extrinsicSeen[e.ExtrinsicIndex] {
continue
}
extrinsicSeen[e.ExtrinsicIndex] = true
extrinsicIds = append(extrinsicIds, e.ExtrinsicIndex)
}
if len(extrinsicIds) == 0 {
return nil
}

var groupedEvents []*model.ChainEvent
if err := tx.Table(tableName).
Where("extrinsic_index IN ?", extrinsicIds).
Where("(LOWER(module_id) = ? AND LOWER(event_id) = ?) OR (LOWER(module_id) = ? AND LOWER(event_id) = ?)",
TransferSourceOmniBridge, strings.ToLower(TransferEventPaidOut),
TransferSourceBalances, strings.ToLower(TransferEventMinted),
).
Order("id asc").
Find(&groupedEvents).Error; err != nil {
return err
}

var blockNums []uint
eventsByExtrinsic := make(map[string][]storage.Event)
for _, e := range groupedEvents {
eventsByExtrinsic[e.ExtrinsicIndex] = append(eventsByExtrinsic[e.ExtrinsicIndex], *e.AsPlugin())
if strings.EqualFold(e.ModuleId, TransferSourceBalances) && strings.EqualFold(e.EventId, TransferEventMinted) {
blockNums = append(blockNums, e.BlockNum)
}
}
blocks := make(map[int]*storage.Block)
for _, b := range sg.Dao.GetBlocksByNums(ctx, blockNums, "id,block_num,block_timestamp") {
blocks[b.BlockNum] = b
}
for _, events := range eventsByExtrinsic {
if len(events) == 0 {
continue
}
_ = CreateOmniBridgePayoutTransfers(ctx, sg, events, blocks[events[0].BlockNum])
}
return nil
})
}
4 changes: 4 additions & 0 deletions plugins/balance/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type Transfer struct {
Symbol string `json:"symbol" gorm:"size:255"`
TokenId string `json:"token_id" gorm:"size:255"`
ExtrinsicIndex string `json:"extrinsic_index" gorm:"size:255;index:extrinsic_index"`
Category string `json:"category" gorm:"size:64;index"`
SourceModule string `json:"source_module" gorm:"size:64;index"`
SourceEvent string `json:"source_event" gorm:"size:64;index"`
BalanceEvent string `json:"balance_event" gorm:"size:64"`
}

func (a *Transfer) TableName() string {
Expand Down
Loading
Loading