From 79a1e14fef2e49f09ad6e2b53cc35ba69ce727c4 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Thu, 28 Sep 2023 09:51:40 -0600 Subject: [PATCH] chore: refactors CAT mempool to use SendEnvelope instead of Send (#1101) Closes #1099 and #1085 --- mempool/cat/reactor.go | 29 ++++++++++++----------------- mempool/cat/reactor_test.go | 17 +++++++++++------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go index 4e246c2326..152c029b29 100644 --- a/mempool/cat/reactor.go +++ b/mempool/cat/reactor.go @@ -328,10 +328,6 @@ func (memR *Reactor) broadcastSeenTx(txKey types.TxKey) { }, }, } - bz, err := msg.Marshal() - if err != nil { - panic(err) - } // Add jitter to when the node broadcasts it's seen txs to stagger when nodes // in the network broadcast their seenTx messages. @@ -353,7 +349,10 @@ func (memR *Reactor) broadcastSeenTx(txKey types.TxKey) { continue } - peer.Send(MempoolStateChannel, bz) //nolint:staticcheck + p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + ChannelID: MempoolStateChannel, + Message: msg, + }, memR.Logger) } } @@ -366,10 +365,6 @@ func (memR *Reactor) broadcastNewTx(wtx *wrappedTx) { }, }, } - bz, err := msg.Marshal() - if err != nil { - panic(err) - } for id, peer := range memR.ids.GetAll() { if p, ok := peer.Get(types.PeerStateKey).(PeerState); ok { @@ -386,9 +381,10 @@ func (memR *Reactor) broadcastNewTx(wtx *wrappedTx) { continue } - if peer.Send(mempool.MempoolChannel, bz) { //nolint:staticcheck - memR.mempool.PeerHasTx(id, wtx.key) - } + p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + ChannelID: mempool.MempoolChannel, + Message: msg, + }, memR.Logger) } } @@ -405,12 +401,11 @@ func (memR *Reactor) requestTx(txKey types.TxKey, peer p2p.Peer) { WantTx: &protomem.WantTx{TxKey: txKey[:]}, }, } - bz, err := msg.Marshal() - if err != nil { - panic(err) - } - success := peer.Send(MempoolStateChannel, bz) //nolint:staticcheck + success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + ChannelID: MempoolStateChannel, + Message: msg, + }, memR.Logger) if success { memR.mempool.metrics.RequestedTxs.Add(1) requested := memR.requests.Add(txKey, memR.ids.GetIDForPeer(peer.ID()), memR.findNewPeerToRequestTx) diff --git a/mempool/cat/reactor_test.go b/mempool/cat/reactor_test.go index 62e167b735..15d67bfad5 100644 --- a/mempool/cat/reactor_test.go +++ b/mempool/cat/reactor_test.go @@ -74,11 +74,13 @@ func TestReactorSendWantTxAfterReceiveingSeenTx(t *testing.T) { msgWant := &protomem.Message{ Sum: &protomem.Message_WantTx{WantTx: &protomem.WantTx{TxKey: key[:]}}, } - msgWantB, err := msgWant.Marshal() - require.NoError(t, err) + envWant := p2p.Envelope{ + Message: msgWant, + ChannelID: MempoolStateChannel, + } peer := genPeer() - peer.On("Send", MempoolStateChannel, msgWantB).Return(true) + peer.On("SendEnvelope", envWant).Return(true) reactor.InitPeer(peer) reactor.Receive(MempoolStateChannel, peer, msgSeenB) @@ -136,13 +138,16 @@ func TestReactorBroadcastsSeenTxAfterReceivingTx(t *testing.T) { seenMsg := &protomem.Message{ Sum: &protomem.Message_SeenTx{SeenTx: &protomem.SeenTx{TxKey: key[:]}}, } - seenMsgBytes, err := seenMsg.Marshal() - require.NoError(t, err) peers := genPeers(2) // only peer 1 should receive the seen tx message as peer 0 broadcasted // the transaction in the first place - peers[1].On("Send", MempoolStateChannel, seenMsgBytes).Return(true) + seenEnv := p2p.Envelope{ + Message: seenMsg, + ChannelID: MempoolStateChannel, + } + + peers[1].On("SendEnvelope", seenEnv).Return(true) reactor.InitPeer(peers[0]) reactor.InitPeer(peers[1])