From 2ac07967c20ae5f10b146ebb12c33d5d1b88d7a4 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Mon, 2 Jan 2017 15:44:36 -0800 Subject: [PATCH] nsqd: read/write concurrently in MPUB benchmarks --- nsqd/protocol_v2_test.go | 88 ++++++++++++++++++++++++---------------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 1c5b72ffb..4db7c58ff 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1547,25 +1547,35 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / numTopics / batchSize - for i := 0; i < num; i++ { - cmd, _ := nsq.MultiPublish(topicName, batch) - _, err := cmd.WriteTo(rw) - if err != nil { - panic(err.Error()) - } - err = rw.Flush() - if err != nil { - panic(err.Error()) - } - resp, err := nsq.ReadResponse(rw) - if err != nil { - panic(err.Error()) + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + cmd, _ := nsq.MultiPublish(topicName, batch) + _, err := cmd.WriteTo(rw) + if err != nil { + panic(err.Error()) + } + err = rw.Flush() + if err != nil { + panic(err.Error()) + } } - _, data, _ := nsq.UnpackResponse(resp) - if !bytes.Equal(data, []byte("OK")) { - panic("invalid response") + wg.Done() + }() + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + resp, err := nsq.ReadResponse(rw) + if err != nil { + panic(err.Error()) + } + _, data, _ := nsq.UnpackResponse(resp) + if !bytes.Equal(data, []byte("OK")) { + panic("invalid response") + } } - } + wg.Done() + }() wg.Done() }() } @@ -1611,25 +1621,35 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) { rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / runtime.GOMAXPROCS(0) / batchSize - for i := 0; i < num; i++ { - cmd, _ := nsq.MultiPublish(topicName, batch) - _, err := cmd.WriteTo(rw) - if err != nil { - panic(err.Error()) - } - err = rw.Flush() - if err != nil { - panic(err.Error()) - } - resp, err := nsq.ReadResponse(rw) - if err != nil { - panic(err.Error()) + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + cmd, _ := nsq.MultiPublish(topicName, batch) + _, err := cmd.WriteTo(rw) + if err != nil { + panic(err.Error()) + } + err = rw.Flush() + if err != nil { + panic(err.Error()) + } } - _, data, _ := nsq.UnpackResponse(resp) - if !bytes.Equal(data, []byte("OK")) { - panic("invalid response") + wg.Done() + }() + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + resp, err := nsq.ReadResponse(rw) + if err != nil { + panic(err.Error()) + } + _, data, _ := nsq.UnpackResponse(resp) + if !bytes.Equal(data, []byte("OK")) { + panic("invalid response") + } } - } + wg.Done() + }() wg.Done() }() }