Skip to content

Commit

Permalink
[INLONG-8637][SDK] Pool data request and batch request (#8638)
Browse files Browse the repository at this point in the history
Co-authored-by: gunli <[email protected]>
  • Loading branch information
gunli and gunli authored Aug 6, 2023
1 parent eaa1a04 commit ee932fa
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/binary"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -35,13 +36,28 @@ var (
byteOrder = binary.BigEndian
heartbeatRsp = []byte{0x00, 0x00, 0x00, 0x01, 0x01}
heartbeatRspLen = len(heartbeatRsp)
reqPool *sync.Pool
batchPool *sync.Pool
)

const (
msgTypeBatch uint8 = 5
msgTypeHeartbeat uint8 = 1
)

func init() {
reqPool = &sync.Pool{
New: func() interface{} {
return &sendDataReq{}
},
}
batchPool = &sync.Pool{
New: func() interface{} {
return &batchReq{}
},
}
}

type heartbeatReq struct {
}

Expand All @@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {

type batchCallback func()
type batchReq struct {
pool *sync.Pool
workerID string
batchID string
groupID string
Expand Down Expand Up @@ -112,6 +129,10 @@ func (b *batchReq) done(err error) {
b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
b.metrics.observeSize(errorCode, b.dataSize)
}

if b.pool != nil {
b.pool.Put(b)
}
}

func (b *batchReq) encode() []byte {
Expand Down Expand Up @@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) {
}

type sendDataReq struct {
pool *sync.Pool
ctx context.Context
msg Message
callback Callback
Expand Down Expand Up @@ -328,6 +350,10 @@ func (s *sendDataReq) done(err error, errCode string) {

s.metrics.incMessage(errCode)
}

if s.pool != nil {
s.pool.Put(s)
}
}

type closeReq struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ func (w *worker) start() {
}

func (w *worker) doSendAsync(ctx context.Context, msg Message, callback Callback, flushImmediately bool) {
req := &sendDataReq{
req := reqPool.Get().(*sendDataReq)
*req = sendDataReq{
pool: reqPool,
ctx: ctx,
msg: msg,
callback: callback,
Expand Down Expand Up @@ -319,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) {
batch, ok := w.pendingBatches[req.msg.StreamID]
if !ok {
streamID := req.msg.StreamID
batch = &batchReq{
batch = batchPool.Get().(*batchReq)
*batch = batchReq{
pool: batchPool,
workerID: w.indexStr,
batchID: util.SnowFlakeID(),
groupID: w.options.GroupID,
Expand Down

0 comments on commit ee932fa

Please sign in to comment.