From bc3b3a143658e47971db405a6c97ff392ef27734 Mon Sep 17 00:00:00 2001 From: hulk Date: Wed, 10 Jul 2024 17:38:44 +0800 Subject: [PATCH] Add support of parsing job body format according to the job id version (#225) Currently, we store the job payload in Redis without any encoding, so it's impossible to extend a job with more fields such as attributes, etc. To mitigate this issue, we introduce a version prefix for the job id to identify different job payload formats. We use the length of the id to tell whether it's a legacy id or not, since a ULID is a fixed-length string (26 chars). That is, we will return the value as the job body directly if the length is 26; otherwise, we decode it as JSON. To avoid introducing breaking changes during the upgrade stage, we add a new HTTP header, Enable-Job-Version, to enable this feature. The new job format is enabled only if the header `Enable-Job-Version: yes` is explicitly passed in the publish request. This way, we can smoothly roll out the new job payload format by upgrading the server first and enabling it on the client side afterwards. 
--- engine/job.go | 70 +---------------------------------- engine/job_test.go | 27 -------------- engine/redis/pool.go | 65 ++++++++++++++++++++++---------- engine/redis/pool_test.go | 19 ++++++++++ server/handlers/queue.go | 23 +++++++++++- server/handlers/queue_test.go | 37 ++++++++++++++++++ uuid/uuid.go | 44 ++++++++++++++++------ uuid/uuid_test.go | 27 ++++++++++++++ 8 files changed, 184 insertions(+), 128 deletions(-) delete mode 100644 engine/job_test.go create mode 100644 uuid/uuid_test.go diff --git a/engine/job.go b/engine/job.go index 0a11f005..eb35c351 100644 --- a/engine/job.go +++ b/engine/job.go @@ -2,9 +2,7 @@ package engine import ( "encoding" - "encoding/binary" "encoding/json" - "errors" "github.com/bitleak/lmstfy/uuid" ) @@ -20,8 +18,6 @@ type Job interface { ElapsedMS() int64 Attributes() map[string]string - encoding.BinaryMarshaler - encoding.BinaryUnmarshaler encoding.TextMarshaler } @@ -43,7 +39,7 @@ type jobImpl struct { // a tombstone record in that AOF. func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint16, jobID string) Job { if jobID == "" { - jobID = uuid.GenUniqueJobIDWithDelay(delay) + jobID = uuid.GenJobIDWithVersion(0, delay) } return &jobImpl{ namespace: namespace, @@ -110,70 +106,6 @@ func (j *jobImpl) Attributes() map[string]string { return j.attributes } -// Marshal into binary of the format: -// {total len: 4 bytes}{ns len: 1 byte}{ns}{queue len: 1 byte}{queue}{id: 16 bytes}{ttl: 4 bytes}{tries: 2 byte}{job data} -func (j *jobImpl) MarshalBinary() (data []byte, err error) { - nsLen := len(j.namespace) - qLen := len(j.queue) - bodyLen := len(j.body) - totalSize := 1 + nsLen + 1 + qLen + 16 + 4 + 2 + bodyLen - buf := make([]byte, totalSize+4) - binary.LittleEndian.PutUint32(buf, uint32(totalSize)) - - nsOffset := 4 + 1 - qOffset := nsOffset + nsLen + 1 - idOffset := qOffset + qLen - ttlOffset := idOffset + 16 - triesOffset := ttlOffset + 4 - jobOffset := triesOffset + 2 - - buf[4] = uint8(nsLen) - 
copy(buf[nsOffset:], j.namespace) - buf[qOffset-1] = uint8(qLen) - copy(buf[qOffset:], j.queue) - binID := uuid.UniqueIDToBinary(j.id) - copy(buf[idOffset:], binID[:]) // binary ID is 16 byte-long - binary.LittleEndian.PutUint32(buf[ttlOffset:], j.ttl) - binary.LittleEndian.PutUint16(buf[triesOffset:], j.tries) - copy(buf[jobOffset:], j.body) - return buf, nil -} - -func (j *jobImpl) UnmarshalBinary(data []byte) error { - if len(data) <= 4 { - return errors.New("data too small") - } - totalSize := binary.LittleEndian.Uint32(data[0:]) - if len(data) != int(totalSize)+4 { - return errors.New("corrupted data") - } - - nsLen := int(data[4]) - nsOffset := 4 + 1 - j.namespace = string(data[nsOffset : nsOffset+nsLen]) - qOffset := nsOffset + nsLen + 1 - qLen := int(data[qOffset-1]) - j.queue = string(data[qOffset : qOffset+qLen]) - idOffset := qOffset + qLen - var binaryID [16]byte - copy(binaryID[:], data[idOffset:idOffset+16]) - j.id = uuid.BinaryToUniqueID(binaryID) - ttlOffset := idOffset + 16 - j.ttl = binary.LittleEndian.Uint32(data[ttlOffset:]) - triesOffset := ttlOffset + 4 - j.tries = binary.LittleEndian.Uint16(data[triesOffset:]) - jobOffset := triesOffset + 2 - j.body = make([]byte, len(data)-jobOffset) - copy(j.body, data[jobOffset:]) - - delay, err := uuid.ExtractDelaySecondFromUniqueID(j.id) - if err != nil { - return err - } - j.delay = delay - return nil -} - func (j *jobImpl) MarshalText() (text []byte, err error) { var job struct { Namespace string `json:"namespace"` diff --git a/engine/job_test.go b/engine/job_test.go deleted file mode 100644 index e9eba688..00000000 --- a/engine/job_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package engine - -import ( - "bytes" - "testing" -) - -func TestJobImpl_Marshal(t *testing.T) { - j := NewJob("ns-1", "q-1", []byte("hello data"), 20, 10, 1, "") - bin, err := j.MarshalBinary() - if err != nil { - t.Fatal("Failed to marshal") - } - var j2 jobImpl - err = j2.UnmarshalBinary(bin) - if err != nil { - t.Fatalf("Failed to 
unmarshal: %s", err) - } - if j.Namespace() != j2.namespace || - j.Queue() != j2.queue || - j.TTL() != j2.ttl || - j.Delay() != j2.delay || - j.Tries() != j2.tries || - !bytes.Equal(j.Body(), j2.body) { - t.Fatal("Data mismatched") - } -} diff --git a/engine/redis/pool.go b/engine/redis/pool.go index 25ec1c9f..095e9bc2 100644 --- a/engine/redis/pool.go +++ b/engine/redis/pool.go @@ -1,14 +1,20 @@ package redis import ( + "encoding/json" "errors" "time" go_redis "github.com/go-redis/redis/v8" "github.com/bitleak/lmstfy/engine" + "github.com/bitleak/lmstfy/uuid" ) +type JobPayload struct { + Body []byte `json:"body"` +} + // Pool stores all the jobs' data. this is a global singleton per engine // note: this `Pool` is NOT the same terminology as the EnginePool type Pool struct { @@ -33,14 +39,24 @@ func PoolJobKeyPrefix(namespace, queue string) string { return join(PoolPrefix, namespace, queue) } -func (p *Pool) Add(j engine.Job) error { - body := j.Body() +func (p *Pool) Add(j engine.Job) (err error) { metrics.poolAddJobs.WithLabelValues(p.redis.Name).Inc() + + // For the version 0(legacy) jobID, the payload is the body directly, + // for the version 1 jobID, the payload is a JSON string contains the body. + payload := j.Body() + if uuid.ExtractJobIDVersion(j.ID()) != 0 { + payload, err = json.Marshal(JobPayload{Body: j.Body()}) + if err != nil { + return err + } + } + // SetNX return OK(true) if key didn't exist before. - ok, err := p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), body, time.Duration(j.TTL())*time.Second).Result() + ok, err := p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), payload, time.Duration(j.TTL())*time.Second).Result() if err != nil { // Just retry once. 
- ok, err = p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), body, time.Duration(j.TTL())*time.Second).Result() + ok, err = p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), payload, time.Duration(j.TTL())*time.Second).Result() } if err != nil { return err @@ -57,24 +73,35 @@ func (p *Pool) Get(namespace, queue, jobID string) (body []byte, ttlSecond uint3 getCmd := pipeline.Get(dummyCtx, jobKey) ttlCmd := pipeline.TTL(dummyCtx, jobKey) _, err = pipeline.Exec(dummyCtx) - switch err { - case nil: - val := getCmd.Val() - ttl := int64(ttlCmd.Val().Seconds()) - if ttl < 0 { - // Use `0` to identify indefinite TTL, NOTE: in redis ttl=0 is possible when - // the key is not recycled fast enough. but here is okay we use `0` to identify - // indefinite TTL, because we issue GET cmd before TTL cmd, so the ttl must be > 0, - // OR GET cmd would fail. - ttl = 0 + if err != nil { + if errors.Is(err, go_redis.Nil) { + return nil, 0, engine.ErrNotFound } - metrics.poolGetJobs.WithLabelValues(p.redis.Name).Inc() - return []byte(val), uint32(ttl), nil - case go_redis.Nil: - return nil, 0, engine.ErrNotFound - default: return nil, 0, err } + + val := []byte(getCmd.Val()) + ttl := int64(ttlCmd.Val().Seconds()) + if ttl < 0 { + // Use `0` to identify indefinite TTL, NOTE: in redis ttl=0 is possible when + // the key is not recycled fast enough. but here is okay we use `0` to identify + // indefinite TTL, because we issue GET cmd before TTL cmd, so the ttl must be > 0, + // OR GET cmd would fail. + ttl = 0 + } + metrics.poolGetJobs.WithLabelValues(p.redis.Name).Inc() + if uuid.ExtractJobIDVersion(jobID) == 0 { + // For the version 0(legacy) jobID, the val only contains the body, + // so we need to return the val as body directly. + return val, uint32(ttl), nil + } + // For the version 1 jobID, the value is encoded as a JSON string, + // need to unmarshal it before return. 
+ var payload JobPayload + if err := json.Unmarshal(val, &payload); err != nil { + return nil, 0, err + } + return payload.Body, uint32(ttl), nil } func (p *Pool) Delete(namespace, queue, jobID string) error { diff --git a/engine/redis/pool_test.go b/engine/redis/pool_test.go index eaf22079..6e25a5eb 100644 --- a/engine/redis/pool_test.go +++ b/engine/redis/pool_test.go @@ -6,8 +6,10 @@ import ( "time" go_redis "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/require" "github.com/bitleak/lmstfy/engine" + "github.com/bitleak/lmstfy/uuid" ) func TestPool_Add(t *testing.T) { @@ -55,3 +57,20 @@ func TestPool_Get(t *testing.T) { t.Fatalf("Expected TTL is around 50 seconds") } } + +func TestPool_GetCompatibility(t *testing.T) { + p := NewPool(R) + + t.Run("test job with different versions should get correct body", func(t *testing.T) { + for i := 0; i <= uuid.JobIDV1; i++ { + jobID := uuid.GenJobIDWithVersion(i, 123) + job := engine.NewJob("ns-pool", "q5", []byte("hello msg 5"), 50, 0, 1, jobID) + p.Add(job) + body, ttl, err := p.Get(job.Namespace(), job.Queue(), job.ID()) + require.NoError(t, err) + require.Equal(t, []byte("hello msg 5"), body) + require.InDelta(t, 50, ttl, 5) + require.Equal(t, i, uuid.ExtractJobIDVersion(job.ID())) + } + }) +} diff --git a/server/handlers/queue.go b/server/handlers/queue.go index 0e3e3810..083500c7 100644 --- a/server/handlers/queue.go +++ b/server/handlers/queue.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "github.com/bitleak/lmstfy/engine" + "github.com/bitleak/lmstfy/uuid" ) const ( @@ -31,6 +32,8 @@ func Publish(c *gin.Context) { queue := c.Param("queue") jobID := c.Param("job_id") + enabledJobVersion := strings.ToUpper(c.GetHeader("Enable-Job-Version")) == "YES" + if jobID != "" { // delete job whatever other publish parameters if err := e.Delete(namespace, queue, jobID); err != nil { @@ -85,7 +88,14 @@ func Publish(c *gin.Context) { c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "body too 
large"}) return } - job := engine.NewJob(namespace, queue, body, uint32(ttlSecond), uint32(delaySecond), uint16(tries), "") + + if enabledJobVersion { + jobID = uuid.GenJobIDWithVersion(uuid.JobIDV1, uint32(delaySecond)) + } else { + // use the legacy jobID if the version is not enabled + jobID = uuid.GenJobIDWithVersion(0, uint32(delaySecond)) + } + job := engine.NewJob(namespace, queue, body, uint32(ttlSecond), uint32(delaySecond), uint16(tries), jobID) jobID, err = e.Publish(job) if err != nil { logger.WithFields(logrus.Fields{ @@ -122,6 +132,8 @@ func PublishBulk(c *gin.Context) { namespace := c.Param("namespace") queue := c.Param("queue") + enabledJobVersion := strings.ToUpper(c.GetHeader("Enable-Job-Version")) == "YES" + delaySecondStr := c.DefaultQuery("delay", DefaultDelay) delaySecond, err := strconv.ParseUint(delaySecondStr, 10, 32) if err != nil { @@ -180,7 +192,14 @@ func PublishBulk(c *gin.Context) { jobIDs := make([]string, 0) for _, job := range jobs { - j := engine.NewJob(namespace, queue, job, uint32(ttlSecond), uint32(delaySecond), uint16(tries), "") + var jobID string + if enabledJobVersion { + jobID = uuid.GenJobIDWithVersion(uuid.JobIDV1, uint32(delaySecond)) + } else { + // use the legacy jobID if the version is not enabled + jobID = uuid.GenJobIDWithVersion(0, uint32(delaySecond)) + } + j := engine.NewJob(namespace, queue, job, uint32(ttlSecond), uint32(delaySecond), uint16(tries), jobID) jobID, err := e.Publish(j) if err != nil { logger.WithFields(logrus.Fields{ diff --git a/server/handlers/queue_test.go b/server/handlers/queue_test.go index e12d5ba8..afb17dd1 100644 --- a/server/handlers/queue_test.go +++ b/server/handlers/queue_test.go @@ -12,9 +12,11 @@ import ( "time" "github.com/magiconair/properties/assert" + "github.com/stretchr/testify/require" "github.com/bitleak/lmstfy/engine" "github.com/bitleak/lmstfy/server/handlers" + "github.com/bitleak/lmstfy/uuid" ) func TestPublish(t *testing.T) { @@ -543,6 +545,41 @@ func TestPublishBulk(t 
*testing.T) { } } +func TestPublish_WithJobVersion(t *testing.T) { + for _, enable := range []string{"YES", "NO"} { + query := url.Values{} + query.Add("delay", "0") + query.Add("ttl", "10") + query.Add("tries", "1") + targetUrl := fmt.Sprintf("http://localhost/api/ns/q18?%s", query.Encode()) + body := strings.NewReader("hello job version") + req, err := http.NewRequest("PUT", targetUrl, body) + req.Header.Add("Enable-Job-Version", enable) + require.NoError(t, err, "Failed to create request") + + c, e, resp := ginTest(req) + e.Use(handlers.ValidateParams, handlers.SetupQueueEngine) + e.PUT("/api/:namespace/:queue", handlers.Publish) + e.HandleContext(c) + + require.Equal(t, http.StatusCreated, resp.Code, "Failed to publish") + var payload struct { + JobID string `json:"job_id"` + } + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &payload)) + expectedVersion := 0 + if enable == "YES" { + expectedVersion = uuid.JobIDV1 + } + require.Equal(t, expectedVersion, uuid.ExtractJobIDVersion(payload.JobID)) + + // Consume should also return the correct version and job body + bytes, jobID := consumeTestJob("ns", "q18", 10, 3) + require.Equal(t, expectedVersion, uuid.ExtractJobIDVersion(jobID)) + require.Equal(t, "hello job version", string(bytes)) + } +} + func publishTestJob(ns, q string, delay, ttl uint32) (body []byte, jobID string) { e := engine.GetEngine("") body = make([]byte, 10) diff --git a/uuid/uuid.go b/uuid/uuid.go index 72248b07..f6ccc196 100644 --- a/uuid/uuid.go +++ b/uuid/uuid.go @@ -3,6 +3,7 @@ package uuid import ( "encoding/binary" "errors" + "fmt" "math/rand" "sync" "time" @@ -10,6 +11,8 @@ import ( "github.com/oklog/ulid" ) +const JobIDV1 = 1 + // Use pool to avoid concurrent access for rand.Source var entropyPool = sync.Pool{ New: func() interface{} { @@ -26,34 +29,38 @@ func GenUniqueID() string { return id.String() } +// GenJobIDWithVersion generates a job ID with version prefix and delaySecond. 
+// For the legacy version 0 job ID, the version prefix is not included, +// we use the version prefix to distinguish different job payload format. +// // Use the last four bytes of the 16-byte's ULID to store the delaySecond. // The last fours bytes was some random value in ULID, so changing that value won't // affect anything except randomness. -func GenUniqueJobIDWithDelay(delaySecond uint32) string { +func GenJobIDWithVersion(version int, delaySecond uint32) string { entropy := entropyPool.Get().(*rand.Rand) defer entropyPool.Put(entropy) id := ulid.MustNew(ulid.Now(), entropy) // Encode the delayHour in littleEndian and store at the last four bytes binary.LittleEndian.PutUint32(id[len(id)-4:], delaySecond) - return id.String() -} - -func UniqueIDToBinary(id string) [16]byte { - return ulid.MustParse(id) -} - -func BinaryToUniqueID(bin [16]byte) string { - return ulid.ULID(bin).String() + // legacy version is 0, it doesn't include version prefix in the id + if version == 0 { + return id.String() + } + if version < 0 || version > 9 { + version = JobIDV1 + } + return fmt.Sprintf("%d%s", version, id.String()) } func ElapsedMilliSecondFromUniqueID(s string) (int64, error) { + s, _ = extractJobID(s) id, err := ulid.Parse(s) if err != nil { return 0, err } t := id.Time() now := ulid.Now() - if t < now { + if t <= now { return int64(now - t), nil } else { return 0, errors.New("id has a future timestamp") @@ -61,9 +68,24 @@ func ElapsedMilliSecondFromUniqueID(s string) (int64, error) { } func ExtractDelaySecondFromUniqueID(s string) (uint32, error) { + s, _ = extractJobID(s) id, err := ulid.Parse(s) if err != nil { return 0, err } return binary.LittleEndian.Uint32(id[len(id)-4:]), nil } + +func extractJobID(s string) (string, int) { + if len(s) <= ulid.EncodedSize { + return s, 0 + } + return s[1:], int(s[0] - '0') +} + +func ExtractJobIDVersion(s string) int { + if len(s) == ulid.EncodedSize { + return 0 + } + return int(s[0] - '0') +} diff --git a/uuid/uuid_test.go 
b/uuid/uuid_test.go new file mode 100644 index 00000000..e4cfb393 --- /dev/null +++ b/uuid/uuid_test.go @@ -0,0 +1,27 @@ +package uuid + +import ( + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/stretchr/testify/require" +) + +func TestJobID(t *testing.T) { + jobID := GenJobIDWithVersion(JobIDV1, 10) + + id, version := extractJobID(jobID) + require.Equal(t, 1, version) + require.Equal(t, ulid.EncodedSize, len(id)) + + delay, err := ExtractDelaySecondFromUniqueID(jobID) + require.NoError(t, err) + require.Equal(t, uint32(10), delay) + + // Test elapsed time + time.Sleep(10 * time.Millisecond) + delayMilliseconds, err := ElapsedMilliSecondFromUniqueID(jobID) + require.NoError(t, err) + require.InDelta(t, 10, delayMilliseconds, 2) +}