diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 1dd141733..bccd8b582 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -89,7 +89,7 @@ func TestInFlightWorker(t *testing.T) { channel := topic.GetChannel("channel") for i := 0; i < count; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) } @@ -131,7 +131,7 @@ func TestChannelEmpty(t *testing.T) { msgs := make([]*Message, 0, 25) for i := 0; i < 25; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) msgs = append(msgs, msg) } @@ -169,7 +169,7 @@ func TestChannelEmptyConsumer(t *testing.T) { channel.AddClient(client.ID, client) for i := 0; i < 25; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) client.SendingMessage() } @@ -202,15 +202,15 @@ func TestChannelHealth(t *testing.T) { channel.backend = &errorBackendQueue{} - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := channel.PutMessage(msg) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.NotNil(t, err) @@ -224,7 +224,7 @@ func TestChannelHealth(t *testing.T) { channel.backend = &errorRecoveredBackendQueue{} - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.Nil(t, err) diff --git a/nsqd/guid.go b/nsqd/guid.go index 0fa5823a1..4e5ba021e 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -1,73 +1,18 @@ package nsqd -// the core algorithm here was borrowed from: -// Blake Mizerany's `noeqd` https://github.com/bmizerany/noeqd -// and indirectly: -// Twitter's `snowflake` https://github.com/twitter/snowflake - -// only minor cleanup and changes to introduce a type, combine the concept -// of workerID + datacenterId into a single identifier, and modify the -// behavior when sequences rollover for our specific implementation needs - import ( "encoding/hex" - "errors" - "time" + "sync/atomic" ) -const ( - workerIDBits = uint64(10) - sequenceBits = uint64(12) - workerIDShift = sequenceBits - timestampShift = sequenceBits + workerIDBits - sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits) - - // ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20 - twepoch = int64(1288834974288) -) - -var ErrTimeBackwards = errors.New("time has gone backwards") -var ErrSequenceExpired = errors.New("sequence expired") -var ErrIDBackwards = errors.New("ID went backward") - type guid int64 type guidFactory struct { - sequence int64 - lastTimestamp int64 - lastID guid + sequence int64 } -func (f *guidFactory) NewGUID(workerID int64) (guid, error) { - // divide by 1048576, giving pseudo-milliseconds - ts := time.Now().UnixNano() >> 20 - - if ts < f.lastTimestamp { - return 0, ErrTimeBackwards - } - - if f.lastTimestamp == ts { - f.sequence = (f.sequence + 1) & sequenceMask - if f.sequence == 0 { - return 0, ErrSequenceExpired - } - } else { - f.sequence = 0 - } - - f.lastTimestamp = ts - - id := guid(((ts - twepoch) << timestampShift) | - (workerID << workerIDShift) | - f.sequence) - - if id <= f.lastID { - return 0, ErrIDBackwards - } - - f.lastID = id - - return id, nil +func (f *guidFactory) NewGUID() guid { + return guid(atomic.AddInt64(&f.sequence, 1)) } func (g guid) Hex() MessageID { diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go index 946b742c7..0056c5460 100644 --- a/nsqd/guid_test.go +++ b/nsqd/guid_test.go @@ -25,10 +25,6 @@ func BenchmarkGUIDUnsafe(b *testing.B) { func BenchmarkGUID(b *testing.B) { factory := &guidFactory{} for i := 0; i < b.N; i++ { - guid, err := factory.NewGUID(0) - if err != nil { - continue - } - guid.Hex() + factory.NewGUID().Hex() } } diff --git a/nsqd/http.go b/nsqd/http.go index 7efd3c3a2..611ada63a 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -214,7 +214,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout } } - msg := NewMessage(<-s.ctx.nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { @@ -243,7 +243,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou _, ok := reqParams["binary"] if ok { tmp := make([]byte, 4) - msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan, + msgs, err = readMPUB(req.Body, tmp, topic, s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize) if err != nil { return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} @@ -282,7 +282,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou return nil, http_api.Err{413, "MSG_TOO_BIG"} } - msg := NewMessage(<-s.ctx.nsqd.idChan, block) + msg := NewMessage(topic.GenerateID(), block) msgs = append(msgs, msg) } } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 4c5ba90fe..018332691 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -11,7 +11,6 @@ import ( "net" "os" "path" - "runtime" "strings" "sync" "sync/atomic" @@ -60,7 +59,6 @@ type NSQD struct { poolSize int - idChan chan MessageID notifyChan chan interface{} optsNotificationChan chan struct{} exitChan chan int @@ -79,7 +77,6 @@ func New(opts *Options) *NSQD { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), - idChan: make(chan MessageID, 4096), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -253,7 +250,6 @@ func (n *NSQD) Main() { }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) - n.waitGroup.Wrap(func() { n.idPump() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) @@ -401,8 +397,6 @@ func (n *NSQD) Exit() { } n.Unlock() - // we want to do this last as it closes the idPump (if closed first it - // could potentially starve items in process and deadlock) close(n.exitChan) n.waitGroup.Wait() @@ -511,33 +505,6 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error { return nil } -func (n *NSQD) idPump() { - factory := &guidFactory{} - lastError := time.Unix(0, 0) - workerID := n.getOpts().ID - for { - id, err := factory.NewGUID(workerID) - if err != nil { - now := time.Now() - if now.Sub(lastError) > time.Second { - // only print the error once/second - n.logf("ERROR: %s", err) - lastError = now - } - runtime.Gosched() - continue - } - select { - case n.idChan <- id.Hex(): - case <-n.exitChan: - goto exit - } - } - -exit: - n.logf("ID: closing") -} - func (n *NSQD) Notify(v interface{}) { // since the in-memory metadata is incomplete, // should not persist metadata while loading it. diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 374c0619e..dcc3ba501 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -76,7 +76,7 @@ func TestStartup(t *testing.T) { body := make([]byte, 256) topic := nsqd.GetTopic(topicName) for i := 0; i < iterations; i++ { - msg := NewMessage(<-nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) topic.PutMessage(msg) } @@ -182,7 +182,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) { client := newClientV2(0, nil, &context{nsqd}) ephemeralChannel.AddClient(client.ID, client) - msg := NewMessage(<-nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) topic.PutMessage(msg) msg = <-ephemeralChannel.memoryMsgChan test.Equal(t, body, msg.Body) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 8f6dbe2c4..000ecb140 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -781,7 +781,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { } topic := p.ctx.nsqd.GetTopic(topicName) - msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) @@ -803,6 +803,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName)) } + if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { + return nil, err + } + + topic := p.ctx.nsqd.GetTopic(topicName) + bodyLen, err := readLen(client.Reader, client.lenSlice) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size") @@ -818,18 +824,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) } - messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan, + messages, err := readMPUB(client.Reader, client.lenSlice, topic, p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize) if err != nil { return nil, err } - if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { - return nil, err - } - - topic := p.ctx.nsqd.GetTopic(topicName) - // if we've made it this far we've validated all the input, // the only possible error is that the topic is exiting during // this next call (and no messages will be queued in that case) @@ -893,7 +893,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { } topic := p.ctx.nsqd.GetTopic(topicName) - msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration err = topic.PutMessage(msg) if err != nil { @@ -930,7 +930,7 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) { return nil, nil } -func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int64, maxBodySize int64) ([]*Message, error) { +func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64, maxBodySize int64) ([]*Message, error) { numMessages, err := readLen(r, tmp) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count") @@ -967,7 +967,7 @@ func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body") } - messages = append(messages, NewMessage(<-idChan, msgBody)) + messages = append(messages, NewMessage(topic.GenerateID(), msgBody)) } return messages, nil diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 08c5ac0fe..1c5b72ffb 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -130,7 +130,7 @@ func TestBasicV2(t *testing.T) { topicName := "test_v2" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) conn, err := mustConnectNSQD(tcpAddr) @@ -165,7 +165,7 @@ func TestMultipleConsumerV2(t *testing.T) { topicName := "test_multiple_v2" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.GetChannel("ch1") topic.GetChannel("ch2") topic.PutMessage(msg) @@ -375,7 +375,7 @@ func TestPausing(t *testing.T) { test.Nil(t, err) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) channel := topic.GetChannel("ch") topic.PutMessage(msg) @@ -400,7 +400,7 @@ func TestPausing(t *testing.T) { // sleep to allow the paused state to take effect time.Sleep(50 * time.Millisecond) - msg = NewMessage(<-nsqd.idChan, []byte("test body2")) + msg = NewMessage(topic.GenerateID(), []byte("test body2")) topic.PutMessage(msg) // allow the client to possibly get a message, the test would hang indefinitely @@ -412,7 +412,7 @@ func TestPausing(t *testing.T) { // unpause the channel... the client should now be pushed a message channel.UnPause() - msg = NewMessage(<-nsqd.idChan, []byte("test body3")) + msg = NewMessage(topic.GenerateID(), []byte("test body3")) topic.PutMessage(msg) resp, _ = nsq.ReadResponse(conn) @@ -618,7 +618,7 @@ func TestTouch(t *testing.T) { topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("ch") - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) _, err = nsq.Ready(1).WriteTo(conn) @@ -660,7 +660,7 @@ func TestMaxRdyCount(t *testing.T) { defer conn.Close() topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) data := identify(t, conn, nil, frameTypeResponse) @@ -736,7 +736,7 @@ func TestOutputBuffering(t *testing.T) { outputBufferTimeout := 500 topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, make([]byte, outputBufferSize-1024)) + msg := NewMessage(topic.GenerateID(), make([]byte, outputBufferSize-1024)) topic.PutMessage(msg) start := time.Now() @@ -1132,7 +1132,7 @@ func TestSnappy(t *testing.T) { test.Nil(t, err) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, msgBody) + msg := NewMessage(topic.GenerateID(), msgBody) topic.PutMessage(msg) resp, _ = nsq.ReadResponse(compressConn) @@ -1226,7 +1226,7 @@ func TestSampling(t *testing.T) { topicName := "test_sampling" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < num; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) } channel := topic.GetChannel("ch") @@ -1330,13 +1330,13 @@ func TestClientMsgTimeout(t *testing.T) { topicName := "test_cmsg_timeout" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) ch := topic.GetChannel("ch") - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) topic.PutMessage(msg) // without this the race detector thinks there's a write // to msg.Attempts that races with the read in the protocol's messagePump... // it does not reflect a realistically possible condition - topic.PutMessage(NewMessage(<-nsqd.idChan, make([]byte, 100))) + topic.PutMessage(NewMessage(topic.GenerateID(), make([]byte, 100))) conn, err := mustConnectNSQD(tcpAddr) test.Nil(t, err) @@ -1666,7 +1666,7 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) { topicName := "bench_v2_sub" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < b.N; i++ { - msg := NewMessage(<-nsqd.idChan, msg) + msg := NewMessage(topic.GenerateID(), msg) topic.PutMessage(msg) } topic.GetChannel("ch") @@ -1766,7 +1766,7 @@ func benchmarkProtocolV2MultiSub(b *testing.B, num int) { topicName := "bench_v2" + strconv.Itoa(b.N) + "_" + strconv.Itoa(i) + "_" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < b.N; i++ { - msg := NewMessage(<-nsqd.idChan, msg) + msg := NewMessage(topic.GenerateID(), msg) topic.PutMessage(msg) } topic.GetChannel("ch") diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 70275ac1e..c6ecaf5c8 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -22,7 +22,7 @@ func TestStats(t *testing.T) { topicName := "test_stats" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) conn, err := mustConnectNSQD(tcpAddr) diff --git a/nsqd/topic.go b/nsqd/topic.go index 7d0e886e5..81dc2ad87 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -25,6 +25,7 @@ type Topic struct { channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 + idFactory *guidFactory ephemeral bool deleteCallback func(*Topic) @@ -47,6 +48,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi ctx: ctx, pauseChan: make(chan bool), deleteCallback: deleteCallback, + idFactory: &guidFactory{}, } if strings.HasSuffix(topicName, "#ephemeral") { @@ -435,3 +437,7 @@ func (t *Topic) doPause(pause bool) error { func (t *Topic) IsPaused() bool { return atomic.LoadInt32(&t.paused) == 1 } + +func (t *Topic) GenerateID() MessageID { + return t.idFactory.NewGUID().Hex() +} diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index bdf661e0b..9a7fa53b9 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -76,19 +76,19 @@ func TestHealth(t *testing.T) { topic := nsqd.GetTopic("test") topic.backend = &errorBackendQueue{} - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := topic.PutMessage(msg) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) test.Nil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessage(msg) test.NotNil(t, err) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) test.NotNil(t, err) @@ -102,7 +102,7 @@ func TestHealth(t *testing.T) { topic.backend = &errorRecoveredBackendQueue{} - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) test.Nil(t, err) @@ -155,7 +155,7 @@ func TestDeleteLast(t *testing.T) { test.Nil(t, err) test.Equal(t, 0, len(topic.channelMap)) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) err = topic.PutMessage(msg) time.Sleep(100 * time.Millisecond) test.Nil(t, err) @@ -177,7 +177,7 @@ func TestPause(t *testing.T) { channel := topic.GetChannel("ch1") test.NotNil(t, channel) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) err = topic.PutMessage(msg) test.Nil(t, err) @@ -221,7 +221,7 @@ func BenchmarkTopicPut(b *testing.B) { for i := 0; i <= b.N; i++ { topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) topic.PutMessage(msg) } } @@ -241,7 +241,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) { for i := 0; i <= b.N; i++ { topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) topic.PutMessage(msg) }