Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor guid, message, queue #626

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nsqd/backend_queue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package nsqd

import "bytes"

// BackendQueue represents the behavior for the secondary message
// storage system
type BackendQueue interface {
Expand All @@ -9,4 +11,5 @@ type BackendQueue interface {
Delete() error
Depth() int64
Empty() error
WriteMsg(buf *bytes.Buffer, msg *Message) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was intentional for DiskQueue to not have first class knowledge of Message - that's why we had the shim in between the two.

}
10 changes: 5 additions & 5 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (c *Channel) flush() error {
// this will read until it's closed (exited)
for msg := range c.clientMsgChan {
c.ctx.nsqd.logf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name)
writeMessageToBackend(&msgBuf, msg, c.backend)
c.backend.WriteMsg(&msgBuf, msg)
}

if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
Expand All @@ -235,7 +235,7 @@ func (c *Channel) flush() error {
for {
select {
case msg := <-c.memoryMsgChan:
err := writeMessageToBackend(&msgBuf, msg, c.backend)
err := c.backend.WriteMsg(&msgBuf, msg)
if err != nil {
c.ctx.nsqd.logf("ERROR: failed to write message to backend - %s", err)
}
Expand All @@ -246,15 +246,15 @@ func (c *Channel) flush() error {

finish:
for _, msg := range c.inFlightMessages {
err := writeMessageToBackend(&msgBuf, msg, c.backend)
err := c.backend.WriteMsg(&msgBuf, msg)
if err != nil {
c.ctx.nsqd.logf("ERROR: failed to write message to backend - %s", err)
}
}

for _, item := range c.deferredMessages {
msg := item.Value.(*Message)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
err := c.backend.WriteMsg(&msgBuf, msg)
if err != nil {
c.ctx.nsqd.logf("ERROR: failed to write message to backend - %s", err)
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (c *Channel) put(m *Message) error {
case c.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
err := c.backend.WriteMsg(b, m)
bufferPoolPut(b)
if err != nil {
c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
Expand Down
9 changes: 9 additions & 0 deletions nsqd/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,12 @@ exit:
syncTicker.Stop()
d.exitSyncChan <- 1
}

func (d *diskQueue) WriteMsg(buf *bytes.Buffer, msg *Message) error {
buf.Reset()
_, err := msg.WriteTo(buf)
if err != nil {
return err
}
return d.Put(buf.Bytes())
}
8 changes: 8 additions & 0 deletions nsqd/dummy_backend_queue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package nsqd

import (
"bytes"
)

type dummyBackendQueue struct {
readChan chan []byte
}
Expand Down Expand Up @@ -31,3 +35,7 @@ func (d *dummyBackendQueue) Depth() int64 {
func (d *dummyBackendQueue) Empty() error {
return nil
}

func (d *dummyBackendQueue) WriteMsg(buf *bytes.Buffer, msg *Message) error {
return nil
}
30 changes: 6 additions & 24 deletions nsqd/guid.go → nsqd/guid/guid.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package nsqd
package guid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this whole package would be going away in #625, so I wouldn't spin your wheels on it


// the core algorithm here was borrowed from:
// Blake Mizerany's `noeqd` https://github.com/bmizerany/noeqd
Expand All @@ -10,7 +10,6 @@ package nsqd
// behavior when sequences rollover for our specific implementation needs

import (
"encoding/hex"
"errors"
"time"
)
Expand All @@ -30,15 +29,15 @@ var ErrTimeBackwards = errors.New("time has gone backwards")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrIDBackwards = errors.New("ID went backward")

type guid int64
type Guid int64

type guidFactory struct {
type GuidFactory struct {
sequence int64
lastTimestamp int64
lastID guid
lastID Guid
}

func (f *guidFactory) NewGUID(workerID int64) (guid, error) {
func (f *GuidFactory) NewGUID(workerID int64) (Guid, error) {
ts := time.Now().UnixNano() / 1e6

if ts < f.lastTimestamp {
Expand All @@ -56,7 +55,7 @@ func (f *guidFactory) NewGUID(workerID int64) (guid, error) {

f.lastTimestamp = ts

id := guid(((ts - twepoch) << timestampShift) |
id := Guid(((ts - twepoch) << timestampShift) |
(workerID << workerIDShift) |
f.sequence)

Expand All @@ -68,20 +67,3 @@ func (f *guidFactory) NewGUID(workerID int64) (guid, error) {

return id, nil
}

func (g guid) Hex() MessageID {
var h MessageID
var b [8]byte

b[0] = byte(g >> 56)
b[1] = byte(g >> 48)
b[2] = byte(g >> 40)
b[3] = byte(g >> 32)
b[4] = byte(g >> 24)
b[5] = byte(g >> 16)
b[6] = byte(g >> 8)
b[7] = byte(g)

hex.Encode(h[:], b[:])
return h
}
33 changes: 20 additions & 13 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package nsqd
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/bitly/nsq/nsqd/guid"
)

const (
Expand All @@ -16,6 +19,23 @@ const (

type MessageID [MsgIDLength]byte

func NewMessageID(g guid.Guid) MessageID {
var h MessageID
var b [8]byte

b[0] = byte(g >> 56)
b[1] = byte(g >> 48)
b[2] = byte(g >> 40)
b[3] = byte(g >> 32)
b[4] = byte(g >> 24)
b[5] = byte(g >> 16)
b[6] = byte(g >> 8)
b[7] = byte(g)

hex.Encode(h[:], b[:])
return h
}

type Message struct {
ID MessageID
Body []byte
Expand Down Expand Up @@ -90,16 +110,3 @@ func decodeMessage(b []byte) (*Message, error) {

return &msg, nil
}

func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
buf.Reset()
_, err := msg.WriteTo(buf)
if err != nil {
return err
}
err = bq.Put(buf.Bytes())
if err != nil {
return err
}
return nil
}
5 changes: 3 additions & 2 deletions nsqd/guid_test.go → nsqd/message_guid_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"github.com/bitly/nsq/nsqd/guid"
"testing"
"unsafe"
)
Expand All @@ -23,12 +24,12 @@ func BenchmarkGUIDUnsafe(b *testing.B) {
}

func BenchmarkGUID(b *testing.B) {
factory := &guidFactory{}
factory := &guid.GuidFactory{}
for i := 0; i < b.N; i++ {
guid, err := factory.NewGUID(0)
if err != nil {
continue
}
guid.Hex()
NewMessageID(guid)
}
}
5 changes: 3 additions & 2 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/bitly/nsq/internal/statsd"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/version"
"github.com/bitly/nsq/nsqd/guid"
)

const (
Expand Down Expand Up @@ -519,7 +520,7 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error {
}

func (n *NSQD) idPump() {
factory := &guidFactory{}
factory := &guid.GuidFactory{}
lastError := time.Unix(0, 0)
workerID := n.getOpts().ID
for {
Expand All @@ -535,7 +536,7 @@ func (n *NSQD) idPump() {
continue
}
select {
case n.idChan <- id.Hex():
case n.idChan <- NewMessageID(id):
case <-n.exitChan:
goto exit
}
Expand Down
4 changes: 2 additions & 2 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (t *Topic) put(m *Message) error {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
err := t.backend.WriteMsg(b, m)
bufferPoolPut(b)
if err != nil {
t.ctx.nsqd.logf(
Expand Down Expand Up @@ -373,7 +373,7 @@ func (t *Topic) flush() error {
for {
select {
case msg := <-t.memoryMsgChan:
err := writeMessageToBackend(&msgBuf, msg, t.backend)
err := t.backend.WriteMsg(&msgBuf, msg)
if err != nil {
t.ctx.nsqd.logf(
"ERROR: failed to write message to backend - %s", err)
Expand Down
14 changes: 8 additions & 6 deletions nsqd/topic_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -52,12 +53,13 @@ func TestGetChannel(t *testing.T) {

type errorBackendQueue struct{}

func (d *errorBackendQueue) Put([]byte) error { return errors.New("never gonna happen") }
func (d *errorBackendQueue) ReadChan() chan []byte { return nil }
func (d *errorBackendQueue) Close() error { return nil }
func (d *errorBackendQueue) Delete() error { return nil }
func (d *errorBackendQueue) Depth() int64 { return 0 }
func (d *errorBackendQueue) Empty() error { return nil }
func (d *errorBackendQueue) Put([]byte) error { return errors.New("never gonna happen") }
func (d *errorBackendQueue) ReadChan() chan []byte { return nil }
func (d *errorBackendQueue) Close() error { return nil }
func (d *errorBackendQueue) Delete() error { return nil }
func (d *errorBackendQueue) Depth() int64 { return 0 }
func (d *errorBackendQueue) Empty() error { return nil }
func (d *errorBackendQueue) WriteMsg(buf *bytes.Buffer, msg *Message) error { return nil }

func TestHealth(t *testing.T) {
opts := NewOptions()
Expand Down