Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions sei-tendermint/internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,7 @@ func (cs *State) OnStart(ctx context.Context) error {

// we need the timeoutRoutine for replay so
// we don't block on the tick chan.
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
cs.Spawn("timeoutTicker", cs.timeoutTicker.Run)
cs.SpawnCritical("timeoutTicker", cs.timeoutTicker.Run)

// We may have lost some votes if the process crashed; reload from the
// consensus log to catch up.
Expand Down
38 changes: 24 additions & 14 deletions sei-tendermint/internal/consensus/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
"github.com/tendermint/tendermint/libs/utils/scope"
)

var (
tickTockBufferSize = 10
)

// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
Expand All @@ -36,7 +32,7 @@ func NewTimeoutTicker(logger log.Logger) TimeoutTicker {
tt := &timeoutTicker{
logger: logger,
tick: utils.NewAtomicWatch(utils.None[timeoutInfo]()),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo),
}
return tt
}
Expand All @@ -60,19 +56,33 @@ func (t *timeoutTicker) ScheduleTimeout(newti timeoutInfo) {
// timers are interrupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) Run(ctx context.Context) error {
tock := utils.NewAtomicWatch(utils.None[timeoutInfo]()) // last fired timeout
return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
return t.tick.Iter(ctx, func(ctx context.Context, mti utils.Option[timeoutInfo]) error {
ti, ok := mti.Get()
s.Spawn(func() error {
// Task measuring timeouts.
return t.tick.Iter(ctx, func(ctx context.Context, mti utils.Option[timeoutInfo]) error {
ti, ok := mti.Get()
if !ok {
return nil
}
t.logger.Debug("Internal state machine timeout scheduled", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if err := utils.Sleep(ctx, ti.Duration); err != nil {
return err
}
t.logger.Debug("Internal state machine timeout elapsed ", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
tock.Store(utils.Some(ti))
return nil
})
})
// Task reporting timeouts via channel.
// TODO(gprusak): it would be better to expose t.tock directly,
// however the receiving task doesn't support receiving from AtomicWatch yet.
return tock.Iter(ctx, func(ctx context.Context, mto utils.Option[timeoutInfo]) error {
to, ok := mto.Get()
if !ok {
return nil
}
t.logger.Debug("Internal state machine timeout scheduled", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if err := utils.Sleep(ctx, ti.Duration); err != nil {
return err
}
t.logger.Debug("Internal state machine timeout elapsed ", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
s.Spawn(func() error { return utils.Send(ctx, t.tockChan, ti) })
return nil
return utils.Send(ctx, t.tockChan, to)
})
})
}
62 changes: 62 additions & 0 deletions sei-tendermint/internal/consensus/ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package consensus

import (
"context"
"fmt"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/utils"
"github.com/tendermint/tendermint/libs/utils/scope"
"testing"
"time"
)

func TestTicker(t *testing.T) {
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
logger, _ := log.NewDefaultLogger("plain", "debug")
ticker := NewTimeoutTicker(logger)
ch := ticker.Chan()
s.SpawnBg(func() error {
err := ticker.Run(ctx)
if ctx.Err() == nil {
return fmt.Errorf("ticker terminated with %v, before the test ended", err)
}
return utils.IgnoreCancel(err)
})
if got := len(ch); got > 0 {
return fmt.Errorf("expected empty, got len=%v", got)
}

t.Log("Fill the channel.")
h := int64(0)
for h < int64(cap(ch)) {
h += 1
ticker.ScheduleTimeout(timeoutInfo{Height: h, Duration: 0})
for len(ch) < int(h) {
if err := utils.Sleep(ctx, 10*time.Millisecond); err != nil {
return err
}
}
}
t.Log("Add a bunch of timeouts blindly.")
for range 3 {
h += 1
ticker.ScheduleTimeout(timeoutInfo{Height: h, Duration: 0})
if err := utils.Sleep(ctx, 100*time.Millisecond); err != nil {
return err
}
}
t.Log("Await the latest timeout")
for {
got, err := utils.Recv(ctx, ch)
if err != nil {
return err
}
if got.Height == h {
return nil
}
}
})
if err != nil {
t.Fatal(err)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use testify since the repo depends on it already and other tests use it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are 2 problems with testify:

  • it is weakly typed (I've provided strongly typed wrapper in sei-tendermint/libs/utils/require)
  • the require package works only from the test's main goroutine (same as t.Fatal, though), while the assert package is a wrapper around t.Errorf, which reports errors lazily (the test actually continues after the t.Errorf call, which obscures the original error - extra logs, potential deadlocks, cascading errors).

Hence, I prefer regular error passing (or optionally panics for test helpers, for which error passing gets too verbose). The final t.Fatal call is just for reporting the error back to the testing harness.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK thanks for explaining.

Would be nice to maintain less custom code and converge on using existing popular enough libraries maintained by someone other than us 😅

But both approaches work.

}
}
11 changes: 9 additions & 2 deletions sei-tendermint/libs/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/utils"
Expand Down Expand Up @@ -165,6 +166,10 @@ func (bs *BaseService) Spawn(name string, task func(ctx context.Context) error)
}()
}

// SpawnCritical spawns a critical task which must either finish successfully OR keep running for as long as the service is running.
// It panics in any of the following cases:
// * task returns context.Canceled BEFORE the service is canceled.
// * task returns an error other than context.Canceled.
func (bs *BaseService) SpawnCritical(name string, task func(ctx context.Context) error) {
inner := bs.inner.Load()
if inner == nil {
Expand All @@ -174,8 +179,10 @@ func (bs *BaseService) SpawnCritical(name string, task func(ctx context.Context)
inner.wg.Add(1)
go func() {
defer inner.wg.Done()
if err := utils.IgnoreCancel(task(inner.ctx)); err != nil {
panic(fmt.Sprintf("critical task failed: name=%v, service=%v: %v", name, bs.name, err))
if err := task(inner.ctx); err != nil {
if !errors.Is(err, context.Canceled) || inner.ctx.Err() == nil {
panic(fmt.Sprintf("critical task failed: name=%v, service=%v: %v", name, bs.name, err))
}
}
}()
}
Expand Down
Loading