From 93e3bbe683c79a693a48239f03bc29c59d84ae9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Tue, 20 Jun 2023 13:50:39 +0800 Subject: [PATCH 1/7] fix: Fix the issue of deferred data persistence loss(Incompatible with old data storage methods) --- nsqd/channel.go | 7 ++++++ nsqd/message.go | 55 +++++++++++++++++++++++++++++++++++++++------ nsqd/protocol_v2.go | 7 +++++- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 37e272c34..811f14170 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -240,8 +240,14 @@ finish: c.inFlightMutex.Unlock() c.deferredMutex.Lock() + ts := time.Now().UnixNano() for _, item := range c.deferredMessages { msg := item.Value.(*Message) + if item.Priority > ts { + msg.deferred = time.Duration(item.Priority - ts) + } else { + msg.deferred = 0 + } err := writeMessageToBackend(msg, c.backend) if err != nil { c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) @@ -572,6 +578,7 @@ func (c *Channel) processDeferredQueue(t int64) bool { if err != nil { goto exit } + msg.deferred = 0 c.put(msg) } diff --git a/nsqd/message.go b/nsqd/message.go index 460cc3722..778471528 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -37,6 +37,39 @@ func NewMessage(id MessageID, body []byte) *Message { } func (m *Message) WriteTo(w io.Writer) (int64, error) { + var buf [18]byte + var total int64 + var expire int64 + + binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) + binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) + if m.deferred != 0 { + expire = time.Now().Add(m.deferred).UnixNano() + } + binary.BigEndian.PutUint64(buf[10:18], uint64(expire)) + + n, err := w.Write(buf[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.ID[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.Body) + total += int64(n) + if err != nil { + return total, err + } + + return total, nil +} + +func (m *Message) WriteToTCP(w io.Writer) (int64, error) { var buf [10]byte var total int64 @@ -66,16 +99,17 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { // decodeMessage deserializes data (as []byte) and creates a new Message // -// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... -// | (int64) || || (hex string encoded in ASCII) || (binary) -// | 8-byte || || 16-byte || N-byte -// ------------------------------------------------------------------------------------------... -// nanosecond timestamp ^^ message ID message body +// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... +// | (int64) || || (int64) || (hex string encoded in ASCII) || (binary) +// | 8-byte || || 8-byte || 16-byte || N-byte +// ------------------------------------------------------------------------------------------------------------------... +// nanosecond timestamp ^^ nanosecond expire message ID message body // (uint16) // 2-byte // attempts func decodeMessage(b []byte) (*Message, error) { var msg Message + var expire int64 if len(b) < minValidMsgLength { return nil, fmt.Errorf("invalid message buffer size (%d)", len(b)) @@ -83,8 +117,15 @@ func decodeMessage(b []byte) (*Message, error) { msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) - copy(msg.ID[:], b[10:10+MsgIDLength]) - msg.Body = b[10+MsgIDLength:] + expire = int64(binary.BigEndian.Uint64(b[10:18])) + if expire > 0 { + ts := time.Now().UnixNano() + if expire > ts { + msg.deferred = time.Duration(expire - ts) + } + } + copy(msg.ID[:], b[18:18+MsgIDLength]) + msg.Body = b[18+MsgIDLength:] return &msg, nil } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 8ec422430..c767b7988 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -127,7 +127,7 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { buf := bufferPoolGet() defer bufferPoolPut(buf) - _, err := msg.WriteTo(buf) + _, err := msg.WriteToTCP(buf) if err != nil { return err } @@ -310,6 +310,11 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + if msg.deferred != 0 { + subChannel.StartDeferredTimeout(msg, msg.deferred) + continue + } + msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) From b225b9ed551dbb5be1967f22244b2b3e12737caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Tue, 20 Jun 2023 15:08:38 +0800 Subject: [PATCH 2/7] fix: Fix the issue of deferred data persistence loss(The reference PR 1137 mode is compatible with the original data) --- nsqd/message.go | 73 ++++++++++++++++++++------------------------- nsqd/protocol_v2.go | 3 +- 2 files changed, 34 insertions(+), 42 deletions(-) diff --git a/nsqd/message.go b/nsqd/message.go index 778471528..9119aa153 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -1,6 +1,7 @@ package nsqd import ( + "bytes" "encoding/binary" "fmt" "io" @@ -12,6 +13,8 @@ const ( minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts ) +var deferMsgMagicFlag = []byte("#DEFER_MSG#") + type MessageID [MsgIDLength]byte type Message struct { @@ -37,61 +40,46 @@ func NewMessage(id MessageID, body []byte) *Message { } func (m *Message) WriteTo(w io.Writer) (int64, error) { - var buf [18]byte + var buf [10]byte var total int64 - var expire int64 binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) - if m.deferred != 0 { - expire = time.Now().Add(m.deferred).UnixNano() - } - binary.BigEndian.PutUint64(buf[10:18], uint64(expire)) - n, err := w.Write(buf[:]) + n, err := w.Write(buf[:]) // 前8字节写入时间戳信息,8-10字节写入重试次数信息 total += int64(n) if err != nil { return total, err } - n, err = w.Write(m.ID[:]) + n, err = w.Write(m.ID[:]) // 10-26字节写入消息ID信息 total += int64(n) if err != nil { return total, err } - n, err = w.Write(m.Body) + n, err = w.Write(m.Body) // 26字节后写入消息体信息 total += int64(n) if err != nil { return total, err } - return total, nil -} - -func (m *Message) WriteToTCP(w io.Writer) (int64, error) { - var buf [10]byte - var total int64 - - binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) - binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) - - n, err := w.Write(buf[:]) - total += int64(n) - if err != nil { - return total, err - } + if m.deferred > 0 { + n, err = w.Write(deferMsgMagicFlag) + total += int64(n) + if err != nil { + return total, err + } - n, err = w.Write(m.ID[:]) - total += int64(n) - if err != nil { - return total, err - } + var deferBuf [8]byte + var expire = time.Now().Add(m.deferred).UnixNano() + binary.BigEndian.PutUint64(deferBuf[:8], uint64(expire)) - n, err = w.Write(m.Body) - total += int64(n) - if err != nil { - return total, err + n, err := w.Write(deferBuf[:]) + total += int64(n) + if err != nil { + return total, err + } } return total, nil @@ -99,11 +87,11 @@ func (m *Message) WriteToTCP(w io.Writer) (int64, error) { // decodeMessage deserializes data (as []byte) and creates a new Message // -// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... -// | (int64) || || (int64) || (hex string encoded in ASCII) || (binary) -// | 8-byte || || 8-byte || 16-byte || N-byte +// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...[x][x][x][x][x][x][x][x] +// | (int64) || || (hex string encoded in ASCII) || (binary) || (int64) +// | 8-byte || || 16-byte || N-byte || 8-byte // ------------------------------------------------------------------------------------------------------------------... -// nanosecond timestamp ^^ nanosecond expire message ID message body +// nanosecond timestamp ^^ message ID message body nanosecond expire // (uint16) // 2-byte // attempts @@ -117,15 +105,18 @@ func decodeMessage(b []byte) (*Message, error) { msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) - expire = int64(binary.BigEndian.Uint64(b[10:18])) - if expire > 0 { + copy(msg.ID[:], b[10:10+MsgIDLength]) + + if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) { + expire = int64(binary.BigEndian.Uint64(b[len(b)-8:])) ts := time.Now().UnixNano() if expire > ts { msg.deferred = time.Duration(expire - ts) } + msg.Body = b[10+MsgIDLength : len(b)-8-len(deferMsgMagicFlag)] + } else { + msg.Body = b[10+MsgIDLength:] } - copy(msg.ID[:], b[18:18+MsgIDLength]) - msg.Body = b[18+MsgIDLength:] return &msg, nil } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c767b7988..1672848c8 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -127,7 +127,8 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { buf := bufferPoolGet() defer bufferPoolPut(buf) - _, err := msg.WriteToTCP(buf) + msg.deferred = 0 + _, err := msg.WriteTo(buf) if err != nil { return err } From 2abf2016bbe94905a8a6f6f989978ad7b5ba403d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Tue, 20 Jun 2023 15:16:32 +0800 Subject: [PATCH 3/7] style: Delete Chinese comments --- nsqd/message.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nsqd/message.go b/nsqd/message.go index 9119aa153..79cfb0fd9 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -46,19 +46,19 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) - n, err := w.Write(buf[:]) // 前8字节写入时间戳信息,8-10字节写入重试次数信息 + n, err := w.Write(buf[:]) total += int64(n) if err != nil { return total, err } - n, err = w.Write(m.ID[:]) // 10-26字节写入消息ID信息 + n, err = w.Write(m.ID[:]) total += int64(n) if err != nil { return total, err } - n, err = w.Write(m.Body) // 26字节后写入消息体信息 + n, err = w.Write(m.Body) total += int64(n) if err != nil { return total, err From 1059e21412de3b6836241c0dc0a77aebfe2d31d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Tue, 20 Jun 2023 15:18:59 +0800 Subject: [PATCH 4/7] style: update comments --- nsqd/message.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nsqd/message.go b/nsqd/message.go index 79cfb0fd9..deef2f5cd 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -87,11 +87,11 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { // decodeMessage deserializes data (as []byte) and creates a new Message // -// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...[x][x][x][x][x][x][x][x] -// | (int64) || || (hex string encoded in ASCII) || (binary) || (int64) -// | 8-byte || || 16-byte || N-byte || 8-byte -// ------------------------------------------------------------------------------------------------------------------... -// nanosecond timestamp ^^ message ID message body nanosecond expire +// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... [x][x][x][x][x][x][x][x] +// | (int64) || || (hex string encoded in ASCII) || (binary) || (int64) +// | 8-byte || || 16-byte || N-byte || 8-byte +// ------------------------------------------------------------------------------------------... ------------------------ +// nanosecond timestamp ^^ message ID message body nanosecond expire // (uint16) // 2-byte // attempts From 82b47ab2dd27af622e6327f81b1042074d3c3cf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Tue, 20 Jun 2023 15:30:13 +0800 Subject: [PATCH 5/7] style: The expire variable is declared when in use --- nsqd/message.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nsqd/message.go b/nsqd/message.go index deef2f5cd..dcf276a80 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -97,7 +97,6 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { // attempts func decodeMessage(b []byte) (*Message, error) { var msg Message - var expire int64 if len(b) < minValidMsgLength { return nil, fmt.Errorf("invalid message buffer size (%d)", len(b)) @@ -108,7 +107,7 @@ func decodeMessage(b []byte) (*Message, error) { copy(msg.ID[:], b[10:10+MsgIDLength]) if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) { - expire = int64(binary.BigEndian.Uint64(b[len(b)-8:])) + expire := int64(binary.BigEndian.Uint64(b[len(b)-8:])) ts := time.Now().UnixNano() if expire > ts { msg.deferred = time.Duration(expire - ts) From 32941dd30ebf92ce69794bd67e1ad73f9658dee2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Sat, 27 Apr 2024 14:40:42 +0800 Subject: [PATCH 6/7] fix: Fix data race issues --- nsqd/channel.go | 7 ------- nsqd/http.go | 1 + nsqd/message.go | 15 +++++++-------- nsqd/protocol_v2.go | 2 +- nsqd/topic.go | 19 ++++++++----------- 5 files changed, 17 insertions(+), 27 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 811f14170..37e272c34 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -240,14 +240,8 @@ finish: c.inFlightMutex.Unlock() c.deferredMutex.Lock() - ts := time.Now().UnixNano() for _, item := range c.deferredMessages { msg := item.Value.(*Message) - if item.Priority > ts { - msg.deferred = time.Duration(item.Priority - ts) - } else { - msg.deferred = 0 - } err := writeMessageToBackend(msg, c.backend) if err != nil { c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) @@ -578,7 +572,6 @@ func (c *Channel) processDeferredQueue(t int64) bool { if err != nil { goto exit } - msg.deferred = 0 c.put(msg) } diff --git a/nsqd/http.go b/nsqd/http.go index c67424187..e2a918b29 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -248,6 +248,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred + msg.deadline = time.Now().UnixNano() + int64(deferred) err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} diff --git a/nsqd/message.go b/nsqd/message.go index dcf276a80..eda2fcb53 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -29,6 +29,7 @@ type Message struct { pri int64 index int deferred time.Duration + deadline int64 } func NewMessage(id MessageID, body []byte) *Message { @@ -64,7 +65,7 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { return total, err } - if m.deferred > 0 { + if m.deadline > time.Now().UnixNano() { n, err = w.Write(deferMsgMagicFlag) total += int64(n) if err != nil { @@ -72,8 +73,7 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { } var deferBuf [8]byte - var expire = time.Now().Add(m.deferred).UnixNano() - binary.BigEndian.PutUint64(deferBuf[:8], uint64(expire)) + binary.BigEndian.PutUint64(deferBuf[:8], uint64(m.deadline)) n, err := w.Write(deferBuf[:]) total += int64(n) @@ -91,7 +91,7 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { // | (int64) || || (hex string encoded in ASCII) || (binary) || (int64) // | 8-byte || || 16-byte || N-byte || 8-byte // ------------------------------------------------------------------------------------------... ------------------------ -// nanosecond timestamp ^^ message ID message body nanosecond expire +// nanosecond timestamp ^^ message ID message body nanosecond deadline // (uint16) // 2-byte // attempts @@ -107,10 +107,9 @@ func decodeMessage(b []byte) (*Message, error) { copy(msg.ID[:], b[10:10+MsgIDLength]) if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) { - expire := int64(binary.BigEndian.Uint64(b[len(b)-8:])) - ts := time.Now().UnixNano() - if expire > ts { - msg.deferred = time.Duration(expire - ts) + msg.deadline = int64(binary.BigEndian.Uint64(b[len(b)-8:])) + if deferred := msg.deadline - time.Now().UnixNano(); deferred > 0 { + msg.deferred = time.Duration(deferred) } msg.Body = b[10+MsgIDLength : len(b)-8-len(deferMsgMagicFlag)] } else { diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 1672848c8..384bfdf46 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -127,7 +127,6 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { buf := bufferPoolGet() defer bufferPoolPut(buf) - msg.deferred = 0 _, err := msg.WriteTo(buf) if err != nil { return err @@ -925,6 +924,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration + msg.deadline = time.Now().UnixNano() + int64(timeoutDuration) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) diff --git a/nsqd/topic.go b/nsqd/topic.go index 4834c0102..ca86eb056 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -315,18 +315,15 @@ func (t *Topic) messagePump() { goto exit } - for i, channel := range chans { - chanMsg := msg + ts := time.Now().UnixNano() + for _, channel := range chans { // copy the message because each channel - // needs a unique instance but... - // fastpath to avoid copy if its the first channel - // (the topic already created the first copy) - if i > 0 { - chanMsg = NewMessage(msg.ID, msg.Body) - chanMsg.Timestamp = msg.Timestamp - chanMsg.deferred = msg.deferred - } - if chanMsg.deferred != 0 { + // needs a unique instance + chanMsg := NewMessage(msg.ID, msg.Body) + chanMsg.Timestamp = msg.Timestamp + chanMsg.deadline = msg.deadline + if deferred := time.Duration(chanMsg.deadline - ts); deferred > 0 { + chanMsg.deferred = deferred channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } From a653e54ac655b675d5428b6baa2314e407b66ddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= Date: Sat, 27 Apr 2024 15:06:42 +0800 Subject: [PATCH 7/7] style: Optimize code --- nsqd/topic.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/nsqd/topic.go b/nsqd/topic.go index ca86eb056..ec26bc2c0 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -315,14 +315,19 @@ func (t *Topic) messagePump() { goto exit } - ts := time.Now().UnixNano() - for _, channel := range chans { + deferred := time.Duration(msg.deadline - time.Now().UnixNano()) + for i, channel := range chans { + chanMsg := msg // copy the message because each channel - // needs a unique instance - chanMsg := NewMessage(msg.ID, msg.Body) - chanMsg.Timestamp = msg.Timestamp - chanMsg.deadline = msg.deadline - if deferred := time.Duration(chanMsg.deadline - ts); deferred > 0 { + // needs a unique instance but... + // fastpath to avoid copy if its the first channel + // (the topic already created the first copy) + if i > 0 { + chanMsg = NewMessage(msg.ID, msg.Body) + chanMsg.Timestamp = msg.Timestamp + chanMsg.deadline = msg.deadline + } + if deferred > 0 { chanMsg.deferred = deferred channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue