diff --git a/sei-tendermint/internal/consensus/state.go b/sei-tendermint/internal/consensus/state.go index 14835acc9d..18125e153b 100644 --- a/sei-tendermint/internal/consensus/state.go +++ b/sei-tendermint/internal/consensus/state.go @@ -431,10 +431,7 @@ func (cs *State) OnStart(ctx context.Context) error { // we need the timeoutRoutine for replay so // we don't block on the tick chan. - // NOTE: we will get a build up of garbage go routines - // firing on the tockChan until the receiveRoutine is started - // to deal with them (by that point, at most one will be valid) - cs.Spawn("timeoutTicker", cs.timeoutTicker.Run) + cs.SpawnCritical("timeoutTicker", cs.timeoutTicker.Run) // We may have lost some votes if the process crashed reload from consensus // log to catchup. diff --git a/sei-tendermint/internal/consensus/ticker.go b/sei-tendermint/internal/consensus/ticker.go index a362249df9..cdb3b8d4bf 100644 --- a/sei-tendermint/internal/consensus/ticker.go +++ b/sei-tendermint/internal/consensus/ticker.go @@ -7,10 +7,6 @@ import ( "github.com/tendermint/tendermint/libs/utils/scope" ) -var ( - tickTockBufferSize = 10 -) - // TimeoutTicker is a timer that schedules timeouts // conditional on the height/round/step in the timeoutInfo. // The timeoutInfo.Duration may be non-positive. @@ -36,7 +32,7 @@ func NewTimeoutTicker(logger log.Logger) TimeoutTicker { tt := &timeoutTicker{ logger: logger, tick: utils.NewAtomicWatch(utils.None[timeoutInfo]()), - tockChan: make(chan timeoutInfo, tickTockBufferSize), + tockChan: make(chan timeoutInfo), } return tt } @@ -60,19 +56,33 @@ func (t *timeoutTicker) ScheduleTimeout(newti timeoutInfo) { // timers are interupted and replaced by new ticks from later steps // timeouts of 0 on the tickChan will be immediately relayed to the tockChan func (t *timeoutTicker) Run(ctx context.Context) error { + tock := utils.NewAtomicWatch(utils.None[timeoutInfo]()) // last fired timeout return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { - return t.tick.Iter(ctx, func(ctx context.Context, mti utils.Option[timeoutInfo]) error { - ti, ok := mti.Get() + s.Spawn(func() error { + // Task measuring timeouts. + return t.tick.Iter(ctx, func(ctx context.Context, mti utils.Option[timeoutInfo]) error { + ti, ok := mti.Get() + if !ok { + return nil + } + t.logger.Debug("Internal state machine timeout scheduled", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + if err := utils.Sleep(ctx, ti.Duration); err != nil { + return err + } + t.logger.Debug("Internal state machine timeout elapsed ", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + tock.Store(utils.Some(ti)) + return nil + }) + }) + // Task reporting timeouts via channel. + // TODO(gprusak): it would be better to expose t.tock directly, + // however the receiving task doesn't support receiving from AtomicWatch yet. + return tock.Iter(ctx, func(ctx context.Context, mto utils.Option[timeoutInfo]) error { + to, ok := mto.Get() if !ok { return nil } - t.logger.Debug("Internal state machine timeout scheduled", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) - if err := utils.Sleep(ctx, ti.Duration); err != nil { - return err - } - t.logger.Debug("Internal state machine timeout elapsed ", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) - s.Spawn(func() error { return utils.Send(ctx, t.tockChan, ti) }) - return nil + return utils.Send(ctx, t.tockChan, to) }) }) } diff --git a/sei-tendermint/internal/consensus/ticker_test.go b/sei-tendermint/internal/consensus/ticker_test.go new file mode 100644 index 0000000000..867c96cd2f --- /dev/null +++ b/sei-tendermint/internal/consensus/ticker_test.go @@ -0,0 +1,62 @@ +package consensus + +import ( + "context" + "fmt" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/utils" + "github.com/tendermint/tendermint/libs/utils/scope" + "testing" + "time" +) + +func TestTicker(t *testing.T) { + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + logger, _ := log.NewDefaultLogger("plain", "debug") + ticker := NewTimeoutTicker(logger) + ch := ticker.Chan() + s.SpawnBg(func() error { + err := ticker.Run(ctx) + if ctx.Err() == nil { + return fmt.Errorf("ticker terminated with %v, before the test ended", err) + } + return utils.IgnoreCancel(err) + }) + if got := len(ch); got > 0 { + return fmt.Errorf("expected empty, got len=%v", got) + } + + t.Log("Fill the channel.") + h := int64(0) + for h < int64(cap(ch)) { + h += 1 + ticker.ScheduleTimeout(timeoutInfo{Height: h, Duration: 0}) + for len(ch) < int(h) { + if err := utils.Sleep(ctx, 10*time.Millisecond); err != nil { + return err + } + } + } + t.Log("Add a bunch of timeouts blindly.") + for range 3 { + h += 1 + ticker.ScheduleTimeout(timeoutInfo{Height: h, Duration: 0}) + if err := utils.Sleep(ctx, 100*time.Millisecond); err != nil { + return err + } + } + t.Log("Await the latest timeout") + for { + got, err := utils.Recv(ctx, ch) + if err != nil { + return err + } + if got.Height == h { + return nil + } + } + }) + if err != nil { + t.Fatal(err) + } +} diff --git a/sei-tendermint/libs/service/service.go b/sei-tendermint/libs/service/service.go index c96b32046b..c6e3185330 100644 --- a/sei-tendermint/libs/service/service.go +++ b/sei-tendermint/libs/service/service.go @@ -2,6 +2,7 @@ package service import ( "context" + "errors" "fmt" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/utils" @@ -165,6 +166,10 @@ func (bs *BaseService) Spawn(name string, task func(ctx context.Context) error) }() } +// Spawns a critical task which should run until success OR as long as the service is running. +// It panics in any of the following cases: +// * task returns context.Canceled BEFORE the service is canceled. +// * task returns an error other than context.Canceled. func (bs *BaseService) SpawnCritical(name string, task func(ctx context.Context) error) { inner := bs.inner.Load() if inner == nil { @@ -174,8 +179,10 @@ func (bs *BaseService) SpawnCritical(name string, task func(ctx context.Context) inner.wg.Add(1) go func() { defer inner.wg.Done() - if err := utils.IgnoreCancel(task(inner.ctx)); err != nil { - panic(fmt.Sprintf("critical task failed: name=%v, service=%v: %v", name, bs.name, err)) + if err := task(inner.ctx); err != nil { + if !errors.Is(err, context.Canceled) || inner.ctx.Err() == nil { + panic(fmt.Sprintf("critical task failed: name=%v, service=%v: %v", name, bs.name, err)) + } } }() }