Skip to content

Commit

Permalink
nsqd: per-topic message IDs
Browse files Browse the repository at this point in the history
This changes message ID generation so that each topic maintains its own
monotonic, atomically incremented counter for generating message IDs,
which is a more scalable and performant approach.
  • Loading branch information
mreiferson committed Jan 1, 2017
1 parent 7a70f6c commit a279a58
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 144 deletions.
14 changes: 7 additions & 7 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestInFlightWorker(t *testing.T) {
channel := topic.GetChannel("channel")

for i := 0; i < count; i++ {
msg := NewMessage(<-nsqd.idChan, []byte("test"))
msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
}

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestChannelEmpty(t *testing.T) {

msgs := make([]*Message, 0, 25)
for i := 0; i < 25; i++ {
msg := NewMessage(<-nsqd.idChan, []byte("test"))
msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
msgs = append(msgs, msg)
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
channel.AddClient(client.ID, client)

for i := 0; i < 25; i++ {
msg := NewMessage(<-nsqd.idChan, []byte("test"))
msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
client.SendingMessage()
}
Expand Down Expand Up @@ -202,15 +202,15 @@ func TestChannelHealth(t *testing.T) {

channel.backend = &errorBackendQueue{}

msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
msg := NewMessage(topic.GenerateID(), make([]byte, 100))
err := channel.PutMessage(msg)
test.Nil(t, err)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
err = channel.PutMessage(msg)
test.Nil(t, err)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
err = channel.PutMessage(msg)
test.NotNil(t, err)

Expand All @@ -224,7 +224,7 @@ func TestChannelHealth(t *testing.T) {

channel.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
err = channel.PutMessage(msg)
test.Nil(t, err)

Expand Down
63 changes: 4 additions & 59 deletions nsqd/guid.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,18 @@
package nsqd

// the core algorithm here was borrowed from:
// Blake Mizerany's `noeqd` https://github.com/bmizerany/noeqd
// and indirectly:
// Twitter's `snowflake` https://github.com/twitter/snowflake

// only minor cleanup and changes to introduce a type, combine the concept
// of workerID + datacenterId into a single identifier, and modify the
// behavior when sequences rollover for our specific implementation needs

import (
"encoding/hex"
"errors"
"time"
"sync/atomic"
)

const (
workerIDBits = uint64(10)
sequenceBits = uint64(12)
workerIDShift = sequenceBits
timestampShift = sequenceBits + workerIDBits
sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits)

// ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20
twepoch = int64(1288834974288)
)

var ErrTimeBackwards = errors.New("time has gone backwards")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrIDBackwards = errors.New("ID went backward")

type guid int64

type guidFactory struct {
sequence int64
lastTimestamp int64
lastID guid
sequence int64
}

func (f *guidFactory) NewGUID(workerID int64) (guid, error) {
// divide by 1048576, giving pseudo-milliseconds
ts := time.Now().UnixNano() >> 20

if ts < f.lastTimestamp {
return 0, ErrTimeBackwards
}

if f.lastTimestamp == ts {
f.sequence = (f.sequence + 1) & sequenceMask
if f.sequence == 0 {
return 0, ErrSequenceExpired
}
} else {
f.sequence = 0
}

f.lastTimestamp = ts

id := guid(((ts - twepoch) << timestampShift) |
(workerID << workerIDShift) |
f.sequence)

if id <= f.lastID {
return 0, ErrIDBackwards
}

f.lastID = id

return id, nil
// NewGUID returns the next ID in the factory's sequence. The counter is
// advanced with an atomic add, so the value returned is the post-increment
// value of f.sequence.
func (f *guidFactory) NewGUID() guid {
	next := atomic.AddInt64(&f.sequence, 1)
	return guid(next)
}

func (g guid) Hex() MessageID {
Expand Down
6 changes: 1 addition & 5 deletions nsqd/guid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ func BenchmarkGUIDUnsafe(b *testing.B) {
// BenchmarkGUID times generating an ID from a single guidFactory and
// hex-encoding it, once per benchmark iteration.
// NOTE(review): this span shows both call forms from the diff — the older
// NewGUID(0) with an error check and the newer NewGUID().Hex() — confirm
// which lines belong to the revision being built.
func BenchmarkGUID(b *testing.B) {
factory := &guidFactory{}
for i := 0; i < b.N; i++ {
guid, err := factory.NewGUID(0)
if err != nil {
continue
}
guid.Hex()
factory.NewGUID().Hex()
}
}
6 changes: 3 additions & 3 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
}
}

msg := NewMessage(<-s.ctx.nsqd.idChan, body)
msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
err = topic.PutMessage(msg)
if err != nil {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
_, ok := reqParams["binary"]
if ok {
tmp := make([]byte, 4)
msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan,
msgs, err = readMPUB(req.Body, tmp, topic,
s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize)
if err != nil {
return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}

msg := NewMessage(<-s.ctx.nsqd.idChan, block)
msg := NewMessage(topic.GenerateID(), block)
msgs = append(msgs, msg)
}
}
Expand Down
33 changes: 0 additions & 33 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -60,7 +59,6 @@ type NSQD struct {

poolSize int

idChan chan MessageID
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
Expand All @@ -79,7 +77,6 @@ func New(opts *Options) *NSQD {
n := &NSQD{
startTime: time.Now(),
topicMap: make(map[string]*Topic),
idChan: make(chan MessageID, 4096),
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -253,7 +250,6 @@ func (n *NSQD) Main() {
})

n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.idPump() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(func() { n.statsdLoop() })
Expand Down Expand Up @@ -401,8 +397,6 @@ func (n *NSQD) Exit() {
}
n.Unlock()

// we want to do this last as it closes the idPump (if closed first it
// could potentially starve items in process and deadlock)
close(n.exitChan)
n.waitGroup.Wait()

Expand Down Expand Up @@ -511,33 +505,6 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error {
return nil
}

// idPump feeds n.idChan with hex-encoded message IDs until n.exitChan
// becomes readable.
//
// IDs come from a guidFactory local to this call, generated with the
// configured worker ID. When generation fails, the error is logged (at most
// once per second), the goroutine yields, and generation is retried.
func (n *NSQD) idPump() {
	var (
		gen      = &guidFactory{}
		lastLog  = time.Unix(0, 0)
		workerID = n.getOpts().ID
	)

	for {
		id, err := gen.NewGUID(workerID)
		if err != nil {
			// only print the error once/second
			if now := time.Now(); now.Sub(lastLog) > time.Second {
				n.logf("ERROR: %s", err)
				lastLog = now
			}
			// yield so other goroutines can run before retrying
			runtime.Gosched()
			continue
		}

		select {
		case n.idChan <- id.Hex():
		case <-n.exitChan:
			n.logf("ID: closing")
			return
		}
	}
}

func (n *NSQD) Notify(v interface{}) {
// since the in-memory metadata is incomplete,
// should not persist metadata while loading it.
Expand Down
4 changes: 2 additions & 2 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestStartup(t *testing.T) {
body := make([]byte, 256)
topic := nsqd.GetTopic(topicName)
for i := 0; i < iterations; i++ {
msg := NewMessage(<-nsqd.idChan, body)
msg := NewMessage(topic.GenerateID(), body)
topic.PutMessage(msg)
}

Expand Down Expand Up @@ -182,7 +182,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
client := newClientV2(0, nil, &context{nsqd})
ephemeralChannel.AddClient(client.ID, client)

msg := NewMessage(<-nsqd.idChan, body)
msg := NewMessage(topic.GenerateID(), body)
topic.PutMessage(msg)
msg = <-ephemeralChannel.memoryMsgChan
test.Equal(t, body, msg.Body)
Expand Down
22 changes: 11 additions & 11 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
}

topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
Expand All @@ -803,6 +803,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName))
}

if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil {
return nil, err
}

topic := p.ctx.nsqd.GetTopic(topicName)

bodyLen, err := readLen(client.Reader, client.lenSlice)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size")
Expand All @@ -818,18 +824,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize))
}

messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan,
messages, err := readMPUB(client.Reader, client.lenSlice, topic,
p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize)
if err != nil {
return nil, err
}

if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil {
return nil, err
}

topic := p.ctx.nsqd.GetTopic(topicName)

// if we've made it this far we've validated all the input,
// the only possible error is that the topic is exiting during
// this next call (and no messages will be queued in that case)
Expand Down Expand Up @@ -893,7 +893,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
}

topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
msg := NewMessage(topic.GenerateID(), messageBody)
msg.deferred = timeoutDuration
err = topic.PutMessage(msg)
if err != nil {
Expand Down Expand Up @@ -930,7 +930,7 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) {
return nil, nil
}

func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int64, maxBodySize int64) ([]*Message, error) {
func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64, maxBodySize int64) ([]*Message, error) {
numMessages, err := readLen(r, tmp)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count")
Expand Down Expand Up @@ -967,7 +967,7 @@ func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body")
}

messages = append(messages, NewMessage(<-idChan, msgBody))
messages = append(messages, NewMessage(topic.GenerateID(), msgBody))
}

return messages, nil
Expand Down
Loading

0 comments on commit a279a58

Please sign in to comment.