From 3c3f35c8c5002476f0f70590af96df127cd058b2 Mon Sep 17 00:00:00 2001 From: Daniel Liu <139250065@qq.com> Date: Thu, 21 May 2026 15:30:40 +0800 Subject: [PATCH] fix(eth/bft): avoid broadcast send blocking during shutdown Guard BFT broadcast enqueue with the quit channel so Vote/Timeout/SyncInfo exit cleanly once shutdown starts. Add a regression test to ensure Vote returns after BFT loop stop and does not hang. --- eth/bft/bft_handler.go | 21 ++++++++++++++++++--- eth/bft/bft_handler_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/eth/bft/bft_handler.go b/eth/bft/bft_handler.go index 72d4e2283f5c..6122ff9ddecd 100644 --- a/eth/bft/bft_handler.go +++ b/eth/bft/bft_handler.go @@ -94,7 +94,9 @@ func (b *Bfter) Vote(peer string, vote *types.Vote) error { } if verified { - b.broadcastCh <- vote + if !b.enqueueForBroadcast(vote) { + return nil + } err = b.consensus.voteHandler(b.blockChainReader, vote) if err != nil { if _, ok := err.(*utils.ErrIncomingMessageRoundTooFarFromCurrentRound); ok { @@ -129,7 +131,9 @@ func (b *Bfter) Timeout(peer string, timeout *types.Timeout) error { log.Debug("[Timeout] Received Timeout", "gap", gapNum, "hash", timeout.Hash().Hex(), "round", timeout.Round, "signer", timeout.GetSigner().Hex()) //get signer after verifyTimeout if verified { - b.broadcastCh <- timeout + if !b.enqueueForBroadcast(timeout) { + return nil + } err = b.consensus.timeoutHandler(b.blockChainReader, timeout) if err != nil { if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok { @@ -170,7 +174,9 @@ func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error { // Process only if verified and qualified if verified { - b.broadcastCh <- syncInfo + if !b.enqueueForBroadcast(syncInfo) { + return nil + } err = b.consensus.syncInfoHandler(b.blockChainReader, syncInfo) if err != nil { log.Error("[SyncInfo] Handle BFT SyncInfo", "error", err) @@ -180,6 +186,15 @@ func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error { return nil } +func (b *Bfter) enqueueForBroadcast(msg interface{}) bool { + select { + case <-b.quit: + return false + case b.broadcastCh <- msg: + return true + } +} + // Start Bft receiver func (b *Bfter) Start() { go b.loop() diff --git a/eth/bft/bft_handler_test.go b/eth/bft/bft_handler_test.go index b338e64a6664..23f3cc26851b 100644 --- a/eth/bft/bft_handler_test.go +++ b/eth/bft/bft_handler_test.go @@ -393,3 +393,28 @@ func TestTooFarSyncInfo(t *testing.T) { t.Fatalf("count mismatch: have %v on verify, have %v on handler, %v on broadcast, want %v", verifyCounter, handlerCounter, broadcastCounter, targetSyncInfo) } } + +func TestVoteReturnsAfterBftStop(t *testing.T) { + tester := newTester() + tester.bfter.consensus.verifyVote = func(chain consensus.ChainReader, vote *types.Vote) (bool, error) { + return true, nil + } + tester.bfter.consensus.voteHandler = func(chain consensus.ChainReader, vote *types.Vote) error { + return nil + } + + tester.bfter.Stop() + + vote := types.Vote{ProposedBlockInfo: &types.BlockInfo{Number: big.NewInt(1350)}} + done := make(chan struct{}) + go func() { + _ = tester.bfter.Vote(peerID, &vote) + close(done) + }() + + select { + case <-done: + case <-time.After(300 * time.Millisecond): + t.Fatal("Vote blocks after bft loop is stopped") + } +}