Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: per-topic message IDs #741

Merged
merged 3 commits into from
Jan 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestInFlightWorker(t *testing.T) {
channel := topic.GetChannel("channel")

for i := 0; i < count; i++ {
msg := NewMessage(<-nsqd.idChan, []byte("test"))
msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
}

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestChannelEmpty(t *testing.T) {

msgs := make([]*Message, 0, 25)
for i := 0; i < 25; i++ {
msg := NewMessage(<-nsqd.idChan, []byte("test"))
msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
msgs = append(msgs, msg)
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
channel.AddClient(client.ID, client)

for i := 0; i < 25; i++ {
msg := NewMessage(<-nsqd.idChan, []byte("test"))
msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
client.SendingMessage()
}
Expand Down Expand Up @@ -202,15 +202,15 @@ func TestChannelHealth(t *testing.T) {

channel.backend = &errorBackendQueue{}

msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
msg := NewMessage(topic.GenerateID(), make([]byte, 100))
err := channel.PutMessage(msg)
test.Nil(t, err)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
err = channel.PutMessage(msg)
test.Nil(t, err)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
err = channel.PutMessage(msg)
test.NotNil(t, err)

Expand All @@ -224,7 +224,7 @@ func TestChannelHealth(t *testing.T) {

channel.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
msg = NewMessage(topic.GenerateID(), make([]byte, 100))
err = channel.PutMessage(msg)
test.Nil(t, err)

Expand Down
21 changes: 19 additions & 2 deletions nsqd/guid.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package nsqd
import (
"encoding/hex"
"errors"
"sync"
"time"
)

Expand All @@ -33,22 +34,35 @@ var ErrIDBackwards = errors.New("ID went backward")
type guid int64

type guidFactory struct {
sync.Mutex

workerID int64
sequence int64
lastTimestamp int64
lastID guid
}

func (f *guidFactory) NewGUID(workerID int64) (guid, error) {
func NewGUIDFactory(workerID int64) *guidFactory {
return &guidFactory{
workerID: workerID,
}
}

func (f *guidFactory) NewGUID() (guid, error) {
f.Lock()

// divide by 1048576, giving pseudo-milliseconds
ts := time.Now().UnixNano() >> 20

if ts < f.lastTimestamp {
f.Unlock()
return 0, ErrTimeBackwards
}

if f.lastTimestamp == ts {
f.sequence = (f.sequence + 1) & sequenceMask
if f.sequence == 0 {
f.Unlock()
return 0, ErrSequenceExpired
}
} else {
Expand All @@ -58,15 +72,18 @@ func (f *guidFactory) NewGUID(workerID int64) (guid, error) {
f.lastTimestamp = ts

id := guid(((ts - twepoch) << timestampShift) |
(workerID << workerIDShift) |
(f.workerID << workerIDShift) |
f.sequence)

if id <= f.lastID {
f.Unlock()
return 0, ErrIDBackwards
}

f.lastID = id

f.Unlock()

return id, nil
}

Expand Down
7 changes: 2 additions & 5 deletions nsqd/guid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ func BenchmarkGUIDUnsafe(b *testing.B) {
func BenchmarkGUID(b *testing.B) {
factory := &guidFactory{}
for i := 0; i < b.N; i++ {
guid, err := factory.NewGUID(0)
if err != nil {
continue
}
guid.Hex()
id, _ := factory.NewGUID()
id.Hex()
}
}
6 changes: 3 additions & 3 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
}
}

msg := NewMessage(<-s.ctx.nsqd.idChan, body)
msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
err = topic.PutMessage(msg)
if err != nil {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
_, ok := reqParams["binary"]
if ok {
tmp := make([]byte, 4)
msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan,
msgs, err = readMPUB(req.Body, tmp, topic,
s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize)
if err != nil {
return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}

msg := NewMessage(<-s.ctx.nsqd.idChan, block)
msg := NewMessage(topic.GenerateID(), block)
msgs = append(msgs, msg)
}
}
Expand Down
33 changes: 0 additions & 33 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -60,7 +59,6 @@ type NSQD struct {

poolSize int

idChan chan MessageID
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
Expand All @@ -79,7 +77,6 @@ func New(opts *Options) *NSQD {
n := &NSQD{
startTime: time.Now(),
topicMap: make(map[string]*Topic),
idChan: make(chan MessageID, 4096),
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -253,7 +250,6 @@ func (n *NSQD) Main() {
})

n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.idPump() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(func() { n.statsdLoop() })
Expand Down Expand Up @@ -401,8 +397,6 @@ func (n *NSQD) Exit() {
}
n.Unlock()

// we want to do this last as it closes the idPump (if closed first it
// could potentially starve items in process and deadlock)
close(n.exitChan)
n.waitGroup.Wait()

Expand Down Expand Up @@ -511,33 +505,6 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error {
return nil
}

func (n *NSQD) idPump() {
factory := &guidFactory{}
lastError := time.Unix(0, 0)
workerID := n.getOpts().ID
for {
id, err := factory.NewGUID(workerID)
if err != nil {
now := time.Now()
if now.Sub(lastError) > time.Second {
// only print the error once/second
n.logf("ERROR: %s", err)
lastError = now
}
runtime.Gosched()
continue
}
select {
case n.idChan <- id.Hex():
case <-n.exitChan:
goto exit
}
}

exit:
n.logf("ID: closing")
}

func (n *NSQD) Notify(v interface{}) {
// since the in-memory metadata is incomplete,
// should not persist metadata while loading it.
Expand Down
4 changes: 2 additions & 2 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestStartup(t *testing.T) {
body := make([]byte, 256)
topic := nsqd.GetTopic(topicName)
for i := 0; i < iterations; i++ {
msg := NewMessage(<-nsqd.idChan, body)
msg := NewMessage(topic.GenerateID(), body)
topic.PutMessage(msg)
}

Expand Down Expand Up @@ -182,7 +182,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
client := newClientV2(0, nil, &context{nsqd})
ephemeralChannel.AddClient(client.ID, client)

msg := NewMessage(<-nsqd.idChan, body)
msg := NewMessage(topic.GenerateID(), body)
topic.PutMessage(msg)
msg = <-ephemeralChannel.memoryMsgChan
test.Equal(t, body, msg.Body)
Expand Down
22 changes: 11 additions & 11 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
}

topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
Expand All @@ -803,6 +803,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName))
}

if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil {
return nil, err
}

topic := p.ctx.nsqd.GetTopic(topicName)

bodyLen, err := readLen(client.Reader, client.lenSlice)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size")
Expand All @@ -818,18 +824,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize))
}

messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan,
messages, err := readMPUB(client.Reader, client.lenSlice, topic,
p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize)
if err != nil {
return nil, err
}

if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil {
return nil, err
}

topic := p.ctx.nsqd.GetTopic(topicName)

// if we've made it this far we've validated all the input,
// the only possible error is that the topic is exiting during
// this next call (and no messages will be queued in that case)
Expand Down Expand Up @@ -893,7 +893,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
}

topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
msg := NewMessage(topic.GenerateID(), messageBody)
msg.deferred = timeoutDuration
err = topic.PutMessage(msg)
if err != nil {
Expand Down Expand Up @@ -930,7 +930,7 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) {
return nil, nil
}

func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int64, maxBodySize int64) ([]*Message, error) {
func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64, maxBodySize int64) ([]*Message, error) {
numMessages, err := readLen(r, tmp)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count")
Expand Down Expand Up @@ -967,7 +967,7 @@ func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body")
}

messages = append(messages, NewMessage(<-idChan, msgBody))
messages = append(messages, NewMessage(topic.GenerateID(), msgBody))
}

return messages, nil
Expand Down
Loading