diff --git a/libs/log/testing_logger.go b/libs/log/testing_logger.go index 7c6f661a74..5dbb381007 100644 --- a/libs/log/testing_logger.go +++ b/libs/log/testing_logger.go @@ -23,7 +23,7 @@ func TestingLogger() Logger { return TestingLoggerWithOutput(os.Stdout) } -// TestingLoggerWOutput returns a TMLogger which writes to (w io.Writer) if testing being run +// TestingLoggerWithOutput returns a TMLogger which writes to (w io.Writer) if testing being run // with the verbose (-v) flag, NopLogger otherwise. // // Note that the call to TestingLoggerWithOutput(w io.Writer) must be made diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 1cf1f7a0f4..b9d8a5fbbd 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "net" + "sync" "testing" "time" @@ -769,3 +770,273 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } } + +// generateMessages sends a sequence of messages to the specified multiplex connection `mc`. +// Each message has the given size and is sent at the specified rate +// `messagingRate`. This process continues for the duration `totalDuration` or +// until `totalNum` messages are sent. If `totalNum` is negative, +// messaging persists for the entire `totalDuration`. +func generateMessages(mc *MConnection, + messagingRate time.Duration, + totalDuration time.Duration, totalNum int, msgSize int, chID byte) { + // all messages have an identical content + msg := bytes.Repeat([]byte{'x'}, msgSize) + + // message generation interval ticker + ticker := time.NewTicker(messagingRate) + defer ticker.Stop() + + // timer for the total duration + timer := time.NewTimer(totalDuration) + defer timer.Stop() + + sentNum := 0 + // generating messages + for { + select { + case <-ticker.C: + // generate message + if mc.Send(chID, msg) { + sentNum++ + if totalNum > 0 && sentNum >= totalNum { + log.TestingLogger().Info("Completed the message generation as the" + + " total number of messages is reached") + return + } + } + case <-timer.C: + // time's up + log.TestingLogger().Info("Completed the message generation as the total " + + "duration is reached") + return + } + } +} + +func BenchmarkMConnection(b *testing.B) { + chID := byte(0x01) + + // Testcases 1-3 evaluate the effect of send queue capacity on message transmission delay. + + // Testcases 3-5 assess the full utilization of maximum bandwidth by + // incrementally increasing the total load while keeping send and + // receive rates constant. + // The transmission time should be ~ totalMsg*msgSize/sendRate, + // indicating that the actual sendRate is in effect and has been + // utilized. + + // Testcases 5-7 assess if surpassing available bandwidth maintains + // consistent transmission delay without congestion or performance + // degradation. A uniform delay across these testcases is expected. + + tests := []struct { + name string + msgSize int // size of each message in bytes + totalMsg int // total number of messages to be sent + messagingRate time.Duration // rate at which messages are sent + totalDuration time.Duration // total duration for which messages are sent + sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered + sendRate int64 // send rate in bytes per second + recRate int64 // receive rate in bytes per second + }{ + { + // testcase 1 + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // Ideal transmission time for 50 messages is ~1 second. + // SendQueueCapacity should not affect transmission delay. + name: "queue capacity = 1, " + + "total load = 50 KB, " + + "msg rate = send rate", + msgSize: 1 * 1024, + totalMsg: 1 * 50, + sendQueueCapacity: 1, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // testcase 2 + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // Ideal transmission time for 50 messages is ~1 second. + // Increasing SendQueueCapacity should not affect transmission + // delay. + name: "queue capacity = 50, " + + "total load = 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 1 * 50, + sendQueueCapacity: 50, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // testcase 3 + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // Ideal transmission time for 50 messages is ~1 second. + // Increasing SendQueueCapacity should not affect transmission + // delay. + name: "queue capacity = 100, " + + "total load = 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 1 * 50, + sendQueueCapacity: 100, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // testcase 4 + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // The test runs for 100 messages, expecting a total transmission time of ~2 seconds. + name: "queue capacity = 100, " + + "total load = 2 * 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 2 * 50, + sendQueueCapacity: 100, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // testcase 5 + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // The test runs for 400 messages, + // expecting a total transmission time of ~8 seconds. + name: "queue capacity = 100, " + + "total load = 8 * 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 8 * 50, + sendQueueCapacity: 100, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // testcase 6 + // Sends one 1KB message every 10ms, totaling 100 messages (100KB/s) per second. + // The test runs for 400 messages, + // expecting a total transmission time of ~8 seconds. + name: "queue capacity = 100, " + + "total load = 8 * 50 KB, " + + "traffic rate = 2 * send rate", + msgSize: 1 * 1024, + totalMsg: 8 * 50, + sendQueueCapacity: 100, + messagingRate: 10 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // testcase 7 + // Sends one 1KB message every 2ms, totaling 500 messages (500KB/s) per second. + // The test runs for 400 messages, + // expecting a total transmission time of ~8 seconds. + name: "queue capacity = 100, " + + "total load = 8 * 50 KB, " + + "traffic rate = 10 * send rate", + msgSize: 1 * 1024, + totalMsg: 8 * 50, + sendQueueCapacity: 100, + messagingRate: 2 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = 50 * 1024 // 500 KB/s + cnfg.RecvRate = 50 * 1024 // 500 KB/s + chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start measuring the time from here to exclude the time + // taken to set up the connections + b.StartTimer() + // start generating messages, it is a blocking call + generateMessages(clientMconn, + tt.messagingRate, + tt.totalDuration, + tt.totalMsg, + tt.msgSize, chID) + + // wait for all messages to be received + <-allReceived + b.StopTimer() + } + }) + + } + +} + +// tcpNetPipe creates a pair of connected net.Conn objects that can be used in tests. +func tcpNetPipe() (net.Conn, net.Conn) { + ln, _ := net.Listen("tcp", "127.0.0.1:0") + var conn1 net.Conn + var wg sync.WaitGroup + wg.Add(1) + go func(c *net.Conn) { + *c, _ = ln.Accept() + wg.Done() + }(&conn1) + + addr := ln.Addr().String() + conn2, _ := net.Dial("tcp", addr) + + wg.Wait() + + return conn2, conn1 +}