Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: MConnection benchmark #1176

Merged
merged 23 commits into from
Jan 19, 2024
Merged
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 271 additions & 0 deletions p2p/conn/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -769,3 +770,273 @@ func stopAll(t *testing.T, stoppers ...stopper) func() {
}
}
}

// generateMessages sends a sequence of messages to the specified multiplex connection `mc`.
// Each message has the given size and is sent at the specified rate
// `messagingRate`. This process continues for the duration `totalDuration` or
// until `totalNum` messages are sent. If `totalNum` is negative,
// messaging persists for the entire `totalDuration`.
func generateMessages(mc *MConnection,
messagingRate time.Duration,
totalDuration time.Duration, totalNum int, msgSize int, chID byte) {
// all messages have an identical content
msg := bytes.Repeat([]byte{'x'}, msgSize)

// message generation interval ticker
ticker := time.NewTicker(messagingRate)
defer ticker.Stop()

// timer for the total duration
timer := time.NewTimer(totalDuration)
defer timer.Stop()

sentNum := 0
// generating messages
for {
select {
case <-ticker.C:
// generate message
if mc.Send(chID, msg) {
sentNum++
if totalNum > 0 && sentNum >= totalNum {
log.TestingLogger().Info("Completed the message generation as the" +
" total number of messages is reached")
return
}
}
case <-timer.C:
// time's up
log.TestingLogger().Info("Completed the message generation as the total " +
"duration is reached")
return
}
}
}

func BenchmarkMConnection(b *testing.B) {
chID := byte(0x01)

// Testcases 1-3 evaluate the effect of send queue capacity on message transmission delay.

// Testcases 3-5 assess the full utilization of maximum bandwidth by
// incrementally increasing the total load ( to higher than the bandwidth)
// while keeping send and receive rates constant.
// The transmission time should be ~ totalMsg*msgSize/sendRate,
// indicating that the actual sendRate is in effect and has been
// utilized.

// Testcases 5-7 assess if surpassing available bandwidth maintains
// consistent transmission delay without congestion or performance
// degradation. A uniform delay across these testcases is expected.

tests := []struct {
name string
msgSize int // size of each message in bytes
totalMsg int // total number of messages to be sent
messagingRate time.Duration // rate at which messages are sent
totalDuration time.Duration // total duration for which messages are sent
sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered
sendRate int64 // send rate in bytes per second
recRate int64 // receive rate in bytes per second
}{
{
// testcase 1
// Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second.
// Ideal transmission time for 50 messages is ~1 second.
// SendQueueCapacity should not affect transmission delay.
name: "queue capacity = 1, " +
"total load = 50 KB, " +
"msg rate = send rate",
msgSize: 1 * 1024,
totalMsg: 1 * 50,
sendQueueCapacity: 1,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
{
// testcase 2
// Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second.
// Ideal transmission time for 50 messages is ~1 second.
// Increasing SendQueueCapacity should not affect transmission
// delay.
name: "queue capacity = 50, " +
"total load = 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
totalMsg: 1 * 50,
sendQueueCapacity: 50,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
{
// testcase 3
// Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second.
// Ideal transmission time for 50 messages is ~1 second.
// Increasing SendQueueCapacity should not affect transmission
// delay.
name: "queue capacity = 100, " +
"total load = 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
totalMsg: 1 * 50,
sendQueueCapacity: 100,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
{
// testcase 4
// Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second.
// The test runs for 100 messages, expecting a total transmission time of ~2 seconds.
name: "queue capacity = 100, " +
"total load = 2 * 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
totalMsg: 2 * 50,
sendQueueCapacity: 10,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
{
// testcase 5
// Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second.
// The test runs for 400 messages,
// expecting a total transmission time of ~8 seconds.
name: "queue capacity = 100, " +
"total load = 8 * 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
totalMsg: 8 * 50,
sendQueueCapacity: 10,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
{
// testcase 6
// Sends one 1KB message every 10ms, totaling 100 messages (100KB/s) per second.
// The test runs for 400 messages,
// expecting a total transmission time of ~8 seconds.
name: "queue capacity = 100, " +
"total load = 8 * 50 KB, " +
"traffic rate = 2 * send rate",
msgSize: 1 * 1024,
totalMsg: 8 * 50,
sendQueueCapacity: 10,
messagingRate: 10 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
{
// testcase 7
// Sends one 1KB message every 2ms, totaling 500 messages (500KB/s) per second.
// The test runs for 400 messages,
// expecting a total transmission time of ~8 seconds.
name: "queue capacity = 100, " +
"total load = 8 * 50 KB, " +
"traffic rate = 10 * send rate",
msgSize: 1 * 1024,
totalMsg: 8 * 50,
sendQueueCapacity: 10,
messagingRate: 2 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
},
}

for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
// set up two networked connections
// server, client := NetPipe() // can alternatively use this and comment out the line below
server, client := tcpNetPipe()
defer server.Close()
defer client.Close()

// prepare callback to receive messages
allReceived := make(chan bool)
receivedLoad := 0 // number of messages received
onReceive := func(chID byte, msgBytes []byte) {
receivedLoad++
if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 {
allReceived <- true
}
}

cnfg := DefaultMConnConfig()
cnfg.SendRate = 50 * 1024 // 500 KB/s
cnfg.RecvRate = 50 * 1024 // 500 KB/s
chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1,
SendQueueCapacity: tt.sendQueueCapacity}}
clientMconn := NewMConnectionWithConfig(client, chDescs,
func(chID byte, msgBytes []byte) {},
func(r interface{}) {},
cnfg)
serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1,
SendQueueCapacity: tt.sendQueueCapacity}}
serverMconn := NewMConnectionWithConfig(server, serverChDescs,
onReceive,
func(r interface{}) {},
cnfg)
clientMconn.SetLogger(log.TestingLogger())
serverMconn.SetLogger(log.TestingLogger())

err := clientMconn.Start()
require.Nil(b, err)
defer func() {
_ = clientMconn.Stop()
}()
err = serverMconn.Start()
require.Nil(b, err)
defer func() {
_ = serverMconn.Stop()
}()

// start measuring the time from here to exclude the time
// taken to set up the connections
b.StartTimer()
// start generating messages, it is a blocking call
generateMessages(clientMconn,
tt.messagingRate,
tt.totalDuration,
tt.totalMsg,
tt.msgSize, chID)

// wait for all messages to be received
<-allReceived
b.StopTimer()
}
})

}

}

// testPipe creates a pair of connected net.Conn objects that can be used in tests.
func tcpNetPipe() (net.Conn, net.Conn) {
ln, _ := net.Listen("tcp", "127.0.0.1:0")
var conn1 net.Conn
var wg sync.WaitGroup
wg.Add(1)
go func(c *net.Conn) {
*c, _ = ln.Accept()
wg.Done()
}(&conn1)

addr := ln.Addr().String()
conn2, _ := net.Dial("tcp", addr)

wg.Wait()

return conn2, conn1
}
Loading