diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 1dd141733..bccd8b582 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -89,7 +89,7 @@ func TestInFlightWorker(t *testing.T) { channel := topic.GetChannel("channel") for i := 0; i < count; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) } @@ -131,7 +131,7 @@ func TestChannelEmpty(t *testing.T) { msgs := make([]*Message, 0, 25) for i := 0; i < 25; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) msgs = append(msgs, msg) } @@ -169,7 +169,7 @@ func TestChannelEmptyConsumer(t *testing.T) { channel.AddClient(client.ID, client) for i := 0; i < 25; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) client.SendingMessage() } @@ -202,15 +202,15 @@ func TestChannelHealth(t *testing.T) { channel.backend = &errorBackendQueue{} - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := channel.PutMessage(msg) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.NotNil(t, err) @@ -224,7 +224,7 @@ func TestChannelHealth(t *testing.T) { channel.backend = &errorRecoveredBackendQueue{} - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.Nil(t, err) diff --git a/nsqd/guid.go b/nsqd/guid.go index 0fa5823a1..f86c78b8e 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -12,6 +12,7 @@ package nsqd import ( "encoding/hex" "errors" + "sync" "time" ) @@ -33,22 +34,35 @@ var ErrIDBackwards = errors.New("ID went backward") type guid int64 type guidFactory struct { + sync.Mutex + + workerID int64 sequence int64 lastTimestamp int64 lastID guid } -func (f *guidFactory) NewGUID(workerID int64) (guid, error) { +func NewGUIDFactory(workerID int64) *guidFactory { + return &guidFactory{ + workerID: workerID, + } +} + +func (f *guidFactory) NewGUID() (guid, error) { + f.Lock() + // divide by 1048576, giving pseudo-milliseconds ts := time.Now().UnixNano() >> 20 if ts < f.lastTimestamp { + f.Unlock() return 0, ErrTimeBackwards } if f.lastTimestamp == ts { f.sequence = (f.sequence + 1) & sequenceMask if f.sequence == 0 { + f.Unlock() return 0, ErrSequenceExpired } } else { @@ -58,15 +72,18 @@ func (f *guidFactory) NewGUID(workerID int64) (guid, error) { f.lastTimestamp = ts id := guid(((ts - twepoch) << timestampShift) | - (workerID << workerIDShift) | + (f.workerID << workerIDShift) | f.sequence) if id <= f.lastID { + f.Unlock() return 0, ErrIDBackwards } f.lastID = id + f.Unlock() + return id, nil } diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go index 946b742c7..291929f98 100644 --- a/nsqd/guid_test.go +++ b/nsqd/guid_test.go @@ -25,10 +25,7 @@ func BenchmarkGUIDUnsafe(b *testing.B) { func BenchmarkGUID(b *testing.B) { factory := &guidFactory{} for i := 0; i < b.N; i++ { - guid, err := factory.NewGUID(0) - if err != nil { - continue - } - guid.Hex() + id, _ := factory.NewGUID() + id.Hex() } } diff --git a/nsqd/http.go b/nsqd/http.go index 7efd3c3a2..611ada63a 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -214,7 +214,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout } } - msg := NewMessage(<-s.ctx.nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { @@ -243,7 +243,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou _, ok := reqParams["binary"] if ok { tmp := make([]byte, 4) - msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan, + msgs, err = readMPUB(req.Body, tmp, topic, s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize) if err != nil { return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} @@ -282,7 +282,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou return nil, http_api.Err{413, "MSG_TOO_BIG"} } - msg := NewMessage(<-s.ctx.nsqd.idChan, block) + msg := NewMessage(topic.GenerateID(), block) msgs = append(msgs, msg) } } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 4c5ba90fe..018332691 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -11,7 +11,6 @@ import ( "net" "os" "path" - "runtime" "strings" "sync" "sync/atomic" @@ -60,7 +59,6 @@ type NSQD struct { poolSize int - idChan chan MessageID notifyChan chan interface{} optsNotificationChan chan struct{} exitChan chan int @@ -79,7 +77,6 @@ func New(opts *Options) *NSQD { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), - idChan: make(chan MessageID, 4096), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -253,7 +250,6 @@ func (n *NSQD) Main() { }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) - n.waitGroup.Wrap(func() { n.idPump() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) @@ -401,8 +397,6 @@ func (n *NSQD) Exit() { } n.Unlock() - // we want to do this last as it closes the idPump (if closed first it - // could potentially starve items in process and deadlock) close(n.exitChan) n.waitGroup.Wait() @@ -511,33 +505,6 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error { return nil } -func (n *NSQD) idPump() { - factory := &guidFactory{} - lastError := time.Unix(0, 0) - workerID := n.getOpts().ID - for { - id, err := factory.NewGUID(workerID) - if err != nil { - now := time.Now() - if now.Sub(lastError) > time.Second { - // only print the error once/second - n.logf("ERROR: %s", err) - lastError = now - } - runtime.Gosched() - continue - } - select { - case n.idChan <- id.Hex(): - case <-n.exitChan: - goto exit - } - } - -exit: - n.logf("ID: closing") -} - func (n *NSQD) Notify(v interface{}) { // since the in-memory metadata is incomplete, // should not persist metadata while loading it. diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 374c0619e..dcc3ba501 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -76,7 +76,7 @@ func TestStartup(t *testing.T) { body := make([]byte, 256) topic := nsqd.GetTopic(topicName) for i := 0; i < iterations; i++ { - msg := NewMessage(<-nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) topic.PutMessage(msg) } @@ -182,7 +182,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) { client := newClientV2(0, nil, &context{nsqd}) ephemeralChannel.AddClient(client.ID, client) - msg := NewMessage(<-nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) topic.PutMessage(msg) msg = <-ephemeralChannel.memoryMsgChan test.Equal(t, body, msg.Body) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 8f6dbe2c4..000ecb140 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -781,7 +781,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { } topic := p.ctx.nsqd.GetTopic(topicName) - msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) @@ -803,6 +803,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName)) } + if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { + return nil, err + } + + topic := p.ctx.nsqd.GetTopic(topicName) + bodyLen, err := readLen(client.Reader, client.lenSlice) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size") @@ -818,18 +824,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) } - messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan, + messages, err := readMPUB(client.Reader, client.lenSlice, topic, p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize) if err != nil { return nil, err } - if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { - return nil, err - } - - topic := p.ctx.nsqd.GetTopic(topicName) - // if we've made it this far we've validated all the input, // the only possible error is that the topic is exiting during // this next call (and no messages will be queued in that case) @@ -893,7 +893,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { } topic := p.ctx.nsqd.GetTopic(topicName) - msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration err = topic.PutMessage(msg) if err != nil { @@ -930,7 +930,7 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) { return nil, nil } -func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int64, maxBodySize int64) ([]*Message, error) { +func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64, maxBodySize int64) ([]*Message, error) { numMessages, err := readLen(r, tmp) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count") @@ -967,7 +967,7 @@ func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body") } - messages = append(messages, NewMessage(<-idChan, msgBody)) + messages = append(messages, NewMessage(topic.GenerateID(), msgBody)) } return messages, nil diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 13da68401..4db7c58ff 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -130,7 +130,7 @@ func TestBasicV2(t *testing.T) { topicName := "test_v2" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) conn, err := mustConnectNSQD(tcpAddr) @@ -165,7 +165,7 @@ func TestMultipleConsumerV2(t *testing.T) { topicName := "test_multiple_v2" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.GetChannel("ch1") topic.GetChannel("ch2") topic.PutMessage(msg) @@ -375,7 +375,7 @@ func TestPausing(t *testing.T) { test.Nil(t, err) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) channel := topic.GetChannel("ch") topic.PutMessage(msg) @@ -400,7 +400,7 @@ func TestPausing(t *testing.T) { // sleep to allow the paused state to take effect time.Sleep(50 * time.Millisecond) - msg = NewMessage(<-nsqd.idChan, []byte("test body2")) + msg = NewMessage(topic.GenerateID(), []byte("test body2")) topic.PutMessage(msg) // allow the client to possibly get a message, the test would hang indefinitely @@ -412,7 +412,7 @@ func TestPausing(t *testing.T) { // unpause the channel... the client should now be pushed a message channel.UnPause() - msg = NewMessage(<-nsqd.idChan, []byte("test body3")) + msg = NewMessage(topic.GenerateID(), []byte("test body3")) topic.PutMessage(msg) resp, _ = nsq.ReadResponse(conn) @@ -618,7 +618,7 @@ func TestTouch(t *testing.T) { topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("ch") - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) _, err = nsq.Ready(1).WriteTo(conn) @@ -660,7 +660,7 @@ func TestMaxRdyCount(t *testing.T) { defer conn.Close() topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) data := identify(t, conn, nil, frameTypeResponse) @@ -736,7 +736,7 @@ func TestOutputBuffering(t *testing.T) { outputBufferTimeout := 500 topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, make([]byte, outputBufferSize-1024)) + msg := NewMessage(topic.GenerateID(), make([]byte, outputBufferSize-1024)) topic.PutMessage(msg) start := time.Now() @@ -1132,7 +1132,7 @@ func TestSnappy(t *testing.T) { test.Nil(t, err) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, msgBody) + msg := NewMessage(topic.GenerateID(), msgBody) topic.PutMessage(msg) resp, _ = nsq.ReadResponse(compressConn) @@ -1226,7 +1226,7 @@ func TestSampling(t *testing.T) { topicName := "test_sampling" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < num; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) } channel := topic.GetChannel("ch") @@ -1330,13 +1330,13 @@ func TestClientMsgTimeout(t *testing.T) { topicName := "test_cmsg_timeout" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) ch := topic.GetChannel("ch") - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) topic.PutMessage(msg) // without this the race detector thinks there's a write // to msg.Attempts that races with the read in the protocol's messagePump... // it does not reflect a realistically possible condition - topic.PutMessage(NewMessage(<-nsqd.idChan, make([]byte, 100))) + topic.PutMessage(NewMessage(topic.GenerateID(), make([]byte, 100))) conn, err := mustConnectNSQD(tcpAddr) test.Nil(t, err) @@ -1518,6 +1518,81 @@ func BenchmarkProtocolV2Exec(b *testing.B) { } } +func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { + var wg sync.WaitGroup + b.StopTimer() + opts := NewOptions() + size := 200 + batchSize := int(opts.MaxBodySize) / (size + 4) + opts.Logger = test.NewTestLogger(b) + opts.MemQueueSize = int64(b.N) + tcpAddr, _, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + msg := make([]byte, size) + batch := make([][]byte, batchSize) + for i := range batch { + batch[i] = msg + } + b.SetBytes(int64(len(msg))) + b.StartTimer() + + for j := 0; j < numTopics; j++ { + topicName := fmt.Sprintf("bench_v2_pub_multi_topic_%d_%d", j, time.Now().Unix()) + wg.Add(1) + go func() { + conn, err := mustConnectNSQD(tcpAddr) + if err != nil { + panic(err.Error()) + } + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + num := b.N / numTopics / batchSize + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + cmd, _ := nsq.MultiPublish(topicName, batch) + _, err := cmd.WriteTo(rw) + if err != nil { + panic(err.Error()) + } + err = rw.Flush() + if err != nil { + panic(err.Error()) + } + } + wg.Done() + }() + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + resp, err := nsq.ReadResponse(rw) + if err != nil { + panic(err.Error()) + } + _, data, _ := nsq.UnpackResponse(resp) + if !bytes.Equal(data, []byte("OK")) { + panic("invalid response") + } + } + wg.Done() + }() + wg.Done() + }() + } + + wg.Wait() + + b.StopTimer() + nsqd.Exit() +} + +func BenchmarkProtocolV2PubMultiTopic1(b *testing.B) { benchmarkProtocolV2PubMultiTopic(b, 1) } +func BenchmarkProtocolV2PubMultiTopic2(b *testing.B) { benchmarkProtocolV2PubMultiTopic(b, 2) } +func BenchmarkProtocolV2PubMultiTopic4(b *testing.B) { benchmarkProtocolV2PubMultiTopic(b, 4) } +func BenchmarkProtocolV2PubMultiTopic8(b *testing.B) { benchmarkProtocolV2PubMultiTopic(b, 8) } +func BenchmarkProtocolV2PubMultiTopic16(b *testing.B) { benchmarkProtocolV2PubMultiTopic(b, 16) } +func BenchmarkProtocolV2PubMultiTopic32(b *testing.B) { benchmarkProtocolV2PubMultiTopic(b, 32) } + func benchmarkProtocolV2Pub(b *testing.B, size int) { var wg sync.WaitGroup b.StopTimer() @@ -1546,25 +1621,35 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) { rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / runtime.GOMAXPROCS(0) / batchSize - for i := 0; i < num; i++ { - cmd, _ := nsq.MultiPublish(topicName, batch) - _, err := cmd.WriteTo(rw) - if err != nil { - panic(err.Error()) - } - err = rw.Flush() - if err != nil { - panic(err.Error()) - } - resp, err := nsq.ReadResponse(rw) - if err != nil { - panic(err.Error()) + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + cmd, _ := nsq.MultiPublish(topicName, batch) + _, err := cmd.WriteTo(rw) + if err != nil { + panic(err.Error()) + } + err = rw.Flush() + if err != nil { + panic(err.Error()) + } } - _, data, _ := nsq.UnpackResponse(resp) - if !bytes.Equal(data, []byte("OK")) { - panic("invalid response") + wg.Done() + }() + wg.Add(1) + go func() { + for i := 0; i < num; i++ { + resp, err := nsq.ReadResponse(rw) + if err != nil { + panic(err.Error()) + } + _, data, _ := nsq.UnpackResponse(resp) + if !bytes.Equal(data, []byte("OK")) { + panic("invalid response") + } } - } + wg.Done() + }() wg.Done() }() } @@ -1601,7 +1686,7 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) { topicName := "bench_v2_sub" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < b.N; i++ { - msg := NewMessage(<-nsqd.idChan, msg) + msg := NewMessage(topic.GenerateID(), msg) topic.PutMessage(msg) } topic.GetChannel("ch") @@ -1701,7 +1786,7 @@ func benchmarkProtocolV2MultiSub(b *testing.B, num int) { topicName := "bench_v2" + strconv.Itoa(b.N) + "_" + strconv.Itoa(i) + "_" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < b.N; i++ { - msg := NewMessage(<-nsqd.idChan, msg) + msg := NewMessage(topic.GenerateID(), msg) topic.PutMessage(msg) } topic.GetChannel("ch") diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 70275ac1e..c6ecaf5c8 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -22,7 +22,7 @@ func TestStats(t *testing.T) { topicName := "test_stats" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) conn, err := mustConnectNSQD(tcpAddr) diff --git a/nsqd/topic.go b/nsqd/topic.go index 7d0e886e5..b375746eb 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -6,6 +6,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/nsqio/nsq/internal/quantile" "github.com/nsqio/nsq/internal/util" @@ -25,6 +26,7 @@ type Topic struct { channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 + idFactory *guidFactory ephemeral bool deleteCallback func(*Topic) @@ -47,6 +49,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi ctx: ctx, pauseChan: make(chan bool), deleteCallback: deleteCallback, + idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } if strings.HasSuffix(topicName, "#ephemeral") { @@ -435,3 +438,13 @@ func (t *Topic) doPause(pause bool) error { func (t *Topic) IsPaused() bool { return atomic.LoadInt32(&t.paused) == 1 } + +func (t *Topic) GenerateID() MessageID { +retry: + id, err := t.idFactory.NewGUID() + if err != nil { + time.Sleep(time.Millisecond) + goto retry + } + return id.Hex() +} diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index bdf661e0b..9a7fa53b9 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -76,19 +76,19 @@ func TestHealth(t *testing.T) { topic := nsqd.GetTopic("test") topic.backend = &errorBackendQueue{} - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := topic.PutMessage(msg) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessage(msg) test.NotNil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) test.NotNil(t, err) @@ -102,7 +102,7 @@ func TestHealth(t *testing.T) { topic.backend = &errorRecoveredBackendQueue{} - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) test.Nil(t, err) @@ -155,7 +155,7 @@ func TestDeleteLast(t *testing.T) { test.Nil(t, err) test.Equal(t, 0, len(topic.channelMap)) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) err = topic.PutMessage(msg) time.Sleep(100 * time.Millisecond) test.Nil(t, err) @@ -177,7 +177,7 @@ func TestPause(t *testing.T) { channel := topic.GetChannel("ch1") test.NotNil(t, channel) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) err = topic.PutMessage(msg) test.Nil(t, err) @@ -221,7 +221,7 @@ func BenchmarkTopicPut(b *testing.B) { for i := 0; i <= b.N; i++ { topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) topic.PutMessage(msg) } } @@ -241,7 +241,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) { for i := 0; i <= b.N; i++ { topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) topic.PutMessage(msg) }