Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
}

// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock)
manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain, nil, manager.removePeerByID, handleProposedBlock)

validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
Expand Down Expand Up @@ -237,7 +237,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
return manager.blockchain.PrepareBlock(block)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, handleProposedBlock, manager.BroadcastBlock, heighter, inserter, prepare, manager.removePeer)
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, handleProposedBlock, manager.BroadcastBlock, heighter, inserter, prepare, manager.removePeerByID)
//Define bft function
broadcasts := bft.BroadcastFns{
Vote: manager.BroadcastVote,
Expand All @@ -261,23 +261,43 @@ func (pm *ProtocolManager) addLendingPoolProtocol(lendingpool lendingPool) {
pm.lendingpool = lendingpool
}

func (pm *ProtocolManager) removePeer(id string) {
// Short circuit if the peer was already removed
peer := pm.peers.Peer(id)
// removePeer disconnects a peer instance, unregistering it only when it is the
// current primary connection because the downloader invariant is primary-only.
func (pm *ProtocolManager) removePeer(peer *peer) {
if peer == nil {
return
}
log.Debug("Removing Ethereum peer", "peer", id)
removedPrimary, err := pm.peers.UnregisterPeer(peer)
if err != nil {
if errors.Is(err, errPairNotRegistered) {
log.Debug("Stale paired peer removal", "peer", peer.id, "err", err)
} else if errors.Is(err, errNotRegistered) {
log.Debug("Peer already removed", "peer", peer.id)
} else {
log.Warn("Peer removal failed", "peer", peer.id, "err", err)
}
// Intentionally disconnect even on not-registered errors. For an
// already tearing-down peer this is redundant, and for a stale paired
// peer it is a harmless idempotent fallback that keeps cleanup robust.
peer.Peer.Disconnect(p2p.DiscUselessPeer)
return
}
log.Debug("Removing Ethereum peer", "peer", peer.id)

// Unregister the peer from the downloader and Ethereum peer set
pm.downloader.UnregisterPeer(id)
if err := pm.peers.Unregister(id); err != nil {
log.Debug("Peer removal failed", "peer", id, "err", err)
// Only the currently registered primary connection is tracked by the
// downloader. Paired connections skip downloader.RegisterPeer in handle.
if removedPrimary {
pm.downloader.UnregisterPeer(peer.id)
}
// Hard disconnect at the networking layer
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}

// removePeerByID adapts downloader and fetcher callbacks that only expose a peer id.
func (pm *ProtocolManager) removePeerByID(id string) {
pm.removePeer(pm.peers.Peer(id))
}

func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers

Expand Down Expand Up @@ -369,7 +389,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
defer pm.removePeer(p)
if err != p2p.ErrAddPairPeer {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
Expand All @@ -389,7 +409,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
pm.removePeer(p)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
Expand Down Expand Up @@ -958,7 +978,7 @@ func (pm *ProtocolManager) BroadcastVote(vote *types.Vote) {
err := peer.SendVote(vote)
if err != nil {
log.Debug("[BroadcastVote] Fail to broadcast vote message", "peerId", peer.id, "version", peer.version, "blockNum", vote.ProposedBlockInfo.Number, "err", err)
pm.removePeer(peer.id)
pm.removePeer(peer)
}
}
log.Trace("Propagated Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers))
Expand All @@ -975,7 +995,7 @@ func (pm *ProtocolManager) BroadcastTimeout(timeout *types.Timeout) {
err := peer.SendTimeout(timeout)
if err != nil {
log.Debug("[BroadcastTimeout] Fail to broadcast timeout message, remove peer", "peerId", peer.id, "version", peer.version, "timeout", timeout, "err", err)
pm.removePeer(peer.id)
pm.removePeer(peer)
}
}
log.Trace("Propagated Timeout", "hash", hash, "recipients", len(peers))
Expand All @@ -992,7 +1012,7 @@ func (pm *ProtocolManager) BroadcastSyncInfo(syncInfo *types.SyncInfo) {
err := peer.SendSyncInfo(syncInfo)
if err != nil {
log.Debug("[BroadcastSyncInfo] Fail to broadcast syncInfo message, remove peer", "peerId", peer.id, "version", peer.version, "syncInfo", syncInfo, "err", err)
pm.removePeer(peer.id)
pm.removePeer(peer)
}
}
log.Trace("Propagated SyncInfo", "hash", hash, "recipients", len(peers))
Expand Down
140 changes: 140 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/eth/ethconfig"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
"github.com/XinFinOrg/XDPoSChain/params"
)

Expand Down Expand Up @@ -73,6 +74,145 @@ func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) }
// TestGetBlockHeaders63 tests get block headers 63.
func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) }

func TestGetBlockHeadersAfterPairDrop63(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil)
defer pm.Stop()

var id discover.NodeID
id[0] = 1

newPeerWithID := func(name string) (*testPeer, <-chan error) {
app, net := p2p.MsgPipe()
peer := pm.newPeer(eth63, p2p.NewPeer(id, name, nil), net)

errc := make(chan error, 1)
go func() {
select {
case pm.newPeerCh <- peer:
errc <- pm.handle(peer)
case <-pm.quitSync:
errc <- p2p.DiscQuitting
}
}()

return &testPeer{app: app, net: net, peer: peer}, errc
}

primary, primaryErrc := newPeerWithID("primary")
defer primary.close()
pair, pairErrc := newPeerWithID("pair")

var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
td = pm.blockchain.GetTd(hash, head.Number.Uint64())
)
primary.handshake(t, td, hash, genesis.Hash())
pair.handshake(t, td, hash, genesis.Hash())

ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(3 * time.Second)
for primary.pairWriter() == nil {
select {
case <-ticker.C:
case <-timeout:
t.Fatalf("pairing state not established: peers=%d pairRWSet=%t", pm.peers.Len(), primary.pairWriter() != nil)
}
}

pair.close()
select {
case <-pairErrc:
case <-time.After(time.Second):
t.Fatal("timed out waiting for pair peer to disconnect")
}

if got := pm.peers.Peer(primary.id); got != primary.peer {
t.Fatal("primary peer was removed after pair disconnect")
}
if primary.pairWriter() != nil {
t.Fatal("primary peer still has stale pair writer after pair disconnect")
}
if primary.PairPeer() != nil {
t.Fatal("primary peer still references pair peer after pair disconnect")
}

query := &getBlockHeadersData{Origin: hashOrNumber{Number: 1}, Amount: 1}
expected := []*types.Header{pm.blockchain.GetBlockByNumber(1).Header()}

if err := p2p.Send(primary.app, GetBlockHeadersMsg, query); err != nil {
t.Fatalf("failed to send header request from primary peer: %v", err)
}
if err := p2p.ExpectMsg(primary.app, BlockHeadersMsg, expected); err != nil {
t.Fatalf("failed to receive header response after pair disconnect: %v", err)
}

select {
case err := <-primaryErrc:
t.Fatalf("primary peer disconnected unexpectedly: %v", err)
default:
}
}

func TestRemovePeerKeepsDownloaderRegistrationForPair(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
defer pm.Stop()

var id discover.NodeID
id[0] = 11

primary := pm.newPeer(eth63, p2p.NewPeer(id, "primary", nil), &stubMsgReadWriter{})
pair := pm.newPeer(eth63, p2p.NewPeer(id, "pair", nil), &stubMsgReadWriter{})

if err := pm.peers.Register(primary); err != nil {
t.Fatalf("register primary: %v", err)
}
if err := pm.downloader.RegisterPeer(primary.id, primary.version, primary); err != nil {
t.Fatalf("register downloader primary: %v", err)
}
if err := pm.peers.Register(pair); err != p2p.ErrAddPairPeer {
t.Fatalf("register pair: got %v want %v", err, p2p.ErrAddPairPeer)
}
pm.removePeer(pair)

if err := pm.downloader.RegisterPeer(primary.id, primary.version, primary); err == nil {
t.Fatal("pair removal should keep downloader primary registration")
}
if got := pm.peers.Peer(primary.id); got != primary {
t.Fatal("pair removal should keep primary registered")
}
if primary.pairWriter() != nil {
t.Fatal("pair removal should clear the primary pair writer")
}
}

func TestRemovePeerUnregistersDownloaderForPrimary(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
defer pm.Stop()

var id discover.NodeID
id[0] = 12

primary := pm.newPeer(eth63, p2p.NewPeer(id, "primary", nil), &stubMsgReadWriter{})

if err := pm.peers.Register(primary); err != nil {
t.Fatalf("register primary: %v", err)
}
if err := pm.downloader.RegisterPeer(primary.id, primary.version, primary); err != nil {
t.Fatalf("register downloader primary: %v", err)
}
pm.removePeer(primary)

if err := pm.downloader.RegisterPeer(primary.id, primary.version, primary); err != nil {
t.Fatalf("primary removal should unregister downloader peer: %v", err)
}
if got := pm.peers.Peer(primary.id); got != nil {
t.Fatal("primary removal should unregister primary peer")
}
}

func testGetBlockHeaders(t *testing.T, protocol int) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil)
peer, _ := newTestPeer("peer", protocol, pm, true)
Expand Down
Loading
Loading