Skip to content

Commit

Permalink
chore: refactors CAT mempool to use SendEnvelope instead of Send (#1101)
Browse files Browse the repository at this point in the history
Closes #1099 and #1085
  • Loading branch information
staheri14 authored Sep 28, 2023
1 parent 13b9e92 commit 79a1e14
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
29 changes: 12 additions & 17 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,6 @@ func (memR *Reactor) broadcastSeenTx(txKey types.TxKey) {
},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}

// Add jitter to when the node broadcasts it's seen txs to stagger when nodes
// in the network broadcast their seenTx messages.
Expand All @@ -353,7 +349,10 @@ func (memR *Reactor) broadcastSeenTx(txKey types.TxKey) {
continue
}

peer.Send(MempoolStateChannel, bz) //nolint:staticcheck
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: MempoolStateChannel,
Message: msg,
}, memR.Logger)
}
}

Expand All @@ -366,10 +365,6 @@ func (memR *Reactor) broadcastNewTx(wtx *wrappedTx) {
},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}

for id, peer := range memR.ids.GetAll() {
if p, ok := peer.Get(types.PeerStateKey).(PeerState); ok {
Expand All @@ -386,9 +381,10 @@ func (memR *Reactor) broadcastNewTx(wtx *wrappedTx) {
continue
}

if peer.Send(mempool.MempoolChannel, bz) { //nolint:staticcheck
memR.mempool.PeerHasTx(id, wtx.key)
}
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: mempool.MempoolChannel,
Message: msg,
}, memR.Logger)
}
}

Expand All @@ -405,12 +401,11 @@ func (memR *Reactor) requestTx(txKey types.TxKey, peer p2p.Peer) {
WantTx: &protomem.WantTx{TxKey: txKey[:]},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}

success := peer.Send(MempoolStateChannel, bz) //nolint:staticcheck
success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: MempoolStateChannel,
Message: msg,
}, memR.Logger)
if success {
memR.mempool.metrics.RequestedTxs.Add(1)
requested := memR.requests.Add(txKey, memR.ids.GetIDForPeer(peer.ID()), memR.findNewPeerToRequestTx)
Expand Down
17 changes: 11 additions & 6 deletions mempool/cat/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ func TestReactorSendWantTxAfterReceiveingSeenTx(t *testing.T) {
msgWant := &protomem.Message{
Sum: &protomem.Message_WantTx{WantTx: &protomem.WantTx{TxKey: key[:]}},
}
msgWantB, err := msgWant.Marshal()
require.NoError(t, err)
envWant := p2p.Envelope{
Message: msgWant,
ChannelID: MempoolStateChannel,
}

peer := genPeer()
peer.On("Send", MempoolStateChannel, msgWantB).Return(true)
peer.On("SendEnvelope", envWant).Return(true)

reactor.InitPeer(peer)
reactor.Receive(MempoolStateChannel, peer, msgSeenB)
Expand Down Expand Up @@ -136,13 +138,16 @@ func TestReactorBroadcastsSeenTxAfterReceivingTx(t *testing.T) {
seenMsg := &protomem.Message{
Sum: &protomem.Message_SeenTx{SeenTx: &protomem.SeenTx{TxKey: key[:]}},
}
seenMsgBytes, err := seenMsg.Marshal()
require.NoError(t, err)

peers := genPeers(2)
// only peer 1 should receive the seen tx message as peer 0 broadcasted
// the transaction in the first place
peers[1].On("Send", MempoolStateChannel, seenMsgBytes).Return(true)
seenEnv := p2p.Envelope{
Message: seenMsg,
ChannelID: MempoolStateChannel,
}

peers[1].On("SendEnvelope", seenEnv).Return(true)

reactor.InitPeer(peers[0])
reactor.InitPeer(peers[1])
Expand Down

0 comments on commit 79a1e14

Please sign in to comment.