diff --git a/engine/job.go b/engine/job.go index 0a11f005..eb35c351 100644 --- a/engine/job.go +++ b/engine/job.go @@ -2,9 +2,7 @@ package engine import ( "encoding" - "encoding/binary" "encoding/json" - "errors" "github.com/bitleak/lmstfy/uuid" ) @@ -20,8 +18,6 @@ type Job interface { ElapsedMS() int64 Attributes() map[string]string - encoding.BinaryMarshaler - encoding.BinaryUnmarshaler encoding.TextMarshaler } @@ -43,7 +39,7 @@ type jobImpl struct { // a tombstone record in that AOF. func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint16, jobID string) Job { if jobID == "" { - jobID = uuid.GenUniqueJobIDWithDelay(delay) + jobID = uuid.GenJobIDWithVersion(0, delay) } return &jobImpl{ namespace: namespace, @@ -110,70 +106,6 @@ func (j *jobImpl) Attributes() map[string]string { return j.attributes } -// Marshal into binary of the format: -// {total len: 4 bytes}{ns len: 1 byte}{ns}{queue len: 1 byte}{queue}{id: 16 bytes}{ttl: 4 bytes}{tries: 2 byte}{job data} -func (j *jobImpl) MarshalBinary() (data []byte, err error) { - nsLen := len(j.namespace) - qLen := len(j.queue) - bodyLen := len(j.body) - totalSize := 1 + nsLen + 1 + qLen + 16 + 4 + 2 + bodyLen - buf := make([]byte, totalSize+4) - binary.LittleEndian.PutUint32(buf, uint32(totalSize)) - - nsOffset := 4 + 1 - qOffset := nsOffset + nsLen + 1 - idOffset := qOffset + qLen - ttlOffset := idOffset + 16 - triesOffset := ttlOffset + 4 - jobOffset := triesOffset + 2 - - buf[4] = uint8(nsLen) - copy(buf[nsOffset:], j.namespace) - buf[qOffset-1] = uint8(qLen) - copy(buf[qOffset:], j.queue) - binID := uuid.UniqueIDToBinary(j.id) - copy(buf[idOffset:], binID[:]) // binary ID is 16 byte-long - binary.LittleEndian.PutUint32(buf[ttlOffset:], j.ttl) - binary.LittleEndian.PutUint16(buf[triesOffset:], j.tries) - copy(buf[jobOffset:], j.body) - return buf, nil -} - -func (j *jobImpl) UnmarshalBinary(data []byte) error { - if len(data) <= 4 { - return errors.New("data too small") - } - totalSize := binary.LittleEndian.Uint32(data[0:]) - if len(data) != int(totalSize)+4 { - return errors.New("corrupted data") - } - - nsLen := int(data[4]) - nsOffset := 4 + 1 - j.namespace = string(data[nsOffset : nsOffset+nsLen]) - qOffset := nsOffset + nsLen + 1 - qLen := int(data[qOffset-1]) - j.queue = string(data[qOffset : qOffset+qLen]) - idOffset := qOffset + qLen - var binaryID [16]byte - copy(binaryID[:], data[idOffset:idOffset+16]) - j.id = uuid.BinaryToUniqueID(binaryID) - ttlOffset := idOffset + 16 - j.ttl = binary.LittleEndian.Uint32(data[ttlOffset:]) - triesOffset := ttlOffset + 4 - j.tries = binary.LittleEndian.Uint16(data[triesOffset:]) - jobOffset := triesOffset + 2 - j.body = make([]byte, len(data)-jobOffset) - copy(j.body, data[jobOffset:]) - - delay, err := uuid.ExtractDelaySecondFromUniqueID(j.id) - if err != nil { - return err - } - j.delay = delay - return nil -} - func (j *jobImpl) MarshalText() (text []byte, err error) { var job struct { Namespace string `json:"namespace"` diff --git a/engine/job_test.go b/engine/job_test.go deleted file mode 100644 index e9eba688..00000000 --- a/engine/job_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package engine - -import ( - "bytes" - "testing" -) - -func TestJobImpl_Marshal(t *testing.T) { - j := NewJob("ns-1", "q-1", []byte("hello data"), 20, 10, 1, "") - bin, err := j.MarshalBinary() - if err != nil { - t.Fatal("Failed to marshal") - } - var j2 jobImpl - err = j2.UnmarshalBinary(bin) - if err != nil { - t.Fatalf("Failed to unmarshal: %s", err) - } - if j.Namespace() != j2.namespace || - j.Queue() != j2.queue || - j.TTL() != j2.ttl || - j.Delay() != j2.delay || - j.Tries() != j2.tries || - !bytes.Equal(j.Body(), j2.body) { - t.Fatal("Data mismatched") - } -} diff --git a/engine/redis/pool.go b/engine/redis/pool.go index 25ec1c9f..095e9bc2 100644 --- a/engine/redis/pool.go +++ b/engine/redis/pool.go @@ -1,14 +1,20 @@ package redis import ( + "encoding/json" "errors" "time" go_redis "github.com/go-redis/redis/v8" "github.com/bitleak/lmstfy/engine" + "github.com/bitleak/lmstfy/uuid" ) +type JobPayload struct { + Body []byte `json:"body"` +} + // Pool stores all the jobs' data. this is a global singleton per engine // note: this `Pool` is NOT the same terminology as the EnginePool type Pool struct { @@ -33,14 +39,24 @@ func PoolJobKeyPrefix(namespace, queue string) string { return join(PoolPrefix, namespace, queue) } -func (p *Pool) Add(j engine.Job) error { - body := j.Body() +func (p *Pool) Add(j engine.Job) (err error) { metrics.poolAddJobs.WithLabelValues(p.redis.Name).Inc() + + // For the version 0(legacy) jobID, the payload is the body directly, + // for the version 1 jobID, the payload is a JSON string contains the body. + payload := j.Body() + if uuid.ExtractJobIDVersion(j.ID()) != 0 { + payload, err = json.Marshal(JobPayload{Body: j.Body()}) + if err != nil { + return err + } + } + // SetNX return OK(true) if key didn't exist before. - ok, err := p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), body, time.Duration(j.TTL())*time.Second).Result() + ok, err := p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), payload, time.Duration(j.TTL())*time.Second).Result() if err != nil { // Just retry once. - ok, err = p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), body, time.Duration(j.TTL())*time.Second).Result() + ok, err = p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), payload, time.Duration(j.TTL())*time.Second).Result() } if err != nil { return err @@ -57,24 +73,35 @@ func (p *Pool) Get(namespace, queue, jobID string) (body []byte, ttlSecond uint3 getCmd := pipeline.Get(dummyCtx, jobKey) ttlCmd := pipeline.TTL(dummyCtx, jobKey) _, err = pipeline.Exec(dummyCtx) - switch err { - case nil: - val := getCmd.Val() - ttl := int64(ttlCmd.Val().Seconds()) - if ttl < 0 { - // Use `0` to identify indefinite TTL, NOTE: in redis ttl=0 is possible when - // the key is not recycled fast enough. but here is okay we use `0` to identify - // indefinite TTL, because we issue GET cmd before TTL cmd, so the ttl must be > 0, - // OR GET cmd would fail. - ttl = 0 + if err != nil { + if errors.Is(err, go_redis.Nil) { + return nil, 0, engine.ErrNotFound } - metrics.poolGetJobs.WithLabelValues(p.redis.Name).Inc() - return []byte(val), uint32(ttl), nil - case go_redis.Nil: - return nil, 0, engine.ErrNotFound - default: return nil, 0, err } + + val := []byte(getCmd.Val()) + ttl := int64(ttlCmd.Val().Seconds()) + if ttl < 0 { + // Use `0` to identify indefinite TTL, NOTE: in redis ttl=0 is possible when + // the key is not recycled fast enough. but here is okay we use `0` to identify + // indefinite TTL, because we issue GET cmd before TTL cmd, so the ttl must be > 0, + // OR GET cmd would fail. + ttl = 0 + } + metrics.poolGetJobs.WithLabelValues(p.redis.Name).Inc() + if uuid.ExtractJobIDVersion(jobID) == 0 { + // For the version 0(legacy) jobID, the val only contains the body, + // so we need to return the val as body directly. + return val, uint32(ttl), nil + } + // For the version 1 jobID, the value is encoded as a JSON string, + // need to unmarshal it before return. + var payload JobPayload + if err := json.Unmarshal(val, &payload); err != nil { + return nil, 0, err + } + return payload.Body, uint32(ttl), nil } func (p *Pool) Delete(namespace, queue, jobID string) error { diff --git a/engine/redis/pool_test.go b/engine/redis/pool_test.go index eaf22079..6e25a5eb 100644 --- a/engine/redis/pool_test.go +++ b/engine/redis/pool_test.go @@ -6,8 +6,10 @@ import ( "time" go_redis "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/require" "github.com/bitleak/lmstfy/engine" + "github.com/bitleak/lmstfy/uuid" ) func TestPool_Add(t *testing.T) { @@ -55,3 +57,20 @@ func TestPool_Get(t *testing.T) { t.Fatalf("Expected TTL is around 50 seconds") } } + +func TestPool_GetCompatibility(t *testing.T) { + p := NewPool(R) + + t.Run("test job with different versions should get correct body", func(t *testing.T) { + for i := 0; i <= uuid.JobIDV1; i++ { + jobID := uuid.GenJobIDWithVersion(i, 123) + job := engine.NewJob("ns-pool", "q5", []byte("hello msg 5"), 50, 0, 1, jobID) + p.Add(job) + body, ttl, err := p.Get(job.Namespace(), job.Queue(), job.ID()) + require.NoError(t, err) + require.Equal(t, []byte("hello msg 5"), body) + require.InDelta(t, 50, ttl, 5) + require.Equal(t, i, uuid.ExtractJobIDVersion(job.ID())) + } + }) +} diff --git a/server/handlers/queue.go b/server/handlers/queue.go index 0e3e3810..083500c7 100644 --- a/server/handlers/queue.go +++ b/server/handlers/queue.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "github.com/bitleak/lmstfy/engine" + "github.com/bitleak/lmstfy/uuid" ) const ( @@ -31,6 +32,8 @@ func Publish(c *gin.Context) { queue := c.Param("queue") jobID := c.Param("job_id") + enabledJobVersion := strings.ToUpper(c.GetHeader("Enable-Job-Version")) == "YES" + if jobID != "" { // delete job whatever other publish parameters if err := e.Delete(namespace, queue, jobID); err != nil { @@ -85,7 +88,14 @@ func Publish(c *gin.Context) { c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "body too large"}) return } - job := engine.NewJob(namespace, queue, body, uint32(ttlSecond), uint32(delaySecond), uint16(tries), "") + + if enabledJobVersion { + jobID = uuid.GenJobIDWithVersion(uuid.JobIDV1, uint32(delaySecond)) + } else { + // use the legacy jobID if the version is not enabled + jobID = uuid.GenJobIDWithVersion(0, uint32(delaySecond)) + } + job := engine.NewJob(namespace, queue, body, uint32(ttlSecond), uint32(delaySecond), uint16(tries), jobID) jobID, err = e.Publish(job) if err != nil { logger.WithFields(logrus.Fields{ @@ -122,6 +132,8 @@ func PublishBulk(c *gin.Context) { namespace := c.Param("namespace") queue := c.Param("queue") + enabledJobVersion := strings.ToUpper(c.GetHeader("Enable-Job-Version")) == "YES" + delaySecondStr := c.DefaultQuery("delay", DefaultDelay) delaySecond, err := strconv.ParseUint(delaySecondStr, 10, 32) if err != nil { @@ -180,7 +192,14 @@ func PublishBulk(c *gin.Context) { jobIDs := make([]string, 0) for _, job := range jobs { - j := engine.NewJob(namespace, queue, job, uint32(ttlSecond), uint32(delaySecond), uint16(tries), "") + var jobID string + if enabledJobVersion { + jobID = uuid.GenJobIDWithVersion(uuid.JobIDV1, uint32(delaySecond)) + } else { + // use the legacy jobID if the version is not enabled + jobID = uuid.GenJobIDWithVersion(0, uint32(delaySecond)) + } + j := engine.NewJob(namespace, queue, job, uint32(ttlSecond), uint32(delaySecond), uint16(tries), jobID) jobID, err := e.Publish(j) if err != nil { logger.WithFields(logrus.Fields{ diff --git a/server/handlers/queue_test.go b/server/handlers/queue_test.go index e12d5ba8..afb17dd1 100644 --- a/server/handlers/queue_test.go +++ b/server/handlers/queue_test.go @@ -12,9 +12,11 @@ import ( "time" "github.com/magiconair/properties/assert" + "github.com/stretchr/testify/require" "github.com/bitleak/lmstfy/engine" "github.com/bitleak/lmstfy/server/handlers" + "github.com/bitleak/lmstfy/uuid" ) func TestPublish(t *testing.T) { @@ -543,6 +545,41 @@ func TestPublishBulk(t *testing.T) { } } +func TestPublish_WithJobVersion(t *testing.T) { + for _, enable := range []string{"YES", "NO"} { + query := url.Values{} + query.Add("delay", "0") + query.Add("ttl", "10") + query.Add("tries", "1") + targetUrl := fmt.Sprintf("http://localhost/api/ns/q18?%s", query.Encode()) + body := strings.NewReader("hello job version") + req, err := http.NewRequest("PUT", targetUrl, body) + req.Header.Add("Enable-Job-Version", enable) + require.NoError(t, err, "Failed to create request") + + c, e, resp := ginTest(req) + e.Use(handlers.ValidateParams, handlers.SetupQueueEngine) + e.PUT("/api/:namespace/:queue", handlers.Publish) + e.HandleContext(c) + + require.Equal(t, http.StatusCreated, resp.Code, "Failed to publish") + var payload struct { + JobID string `json:"job_id"` + } + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &payload)) + expectedVersion := 0 + if enable == "YES" { + expectedVersion = uuid.JobIDV1 + } + require.Equal(t, expectedVersion, uuid.ExtractJobIDVersion(payload.JobID)) + + // Consume should also return the correct version and job body + bytes, jobID := consumeTestJob("ns", "q18", 10, 3) + require.Equal(t, expectedVersion, uuid.ExtractJobIDVersion(jobID)) + require.Equal(t, "hello job version", string(bytes)) + } +} + func publishTestJob(ns, q string, delay, ttl uint32) (body []byte, jobID string) { e := engine.GetEngine("") body = make([]byte, 10) diff --git a/uuid/uuid.go b/uuid/uuid.go index 72248b07..f6ccc196 100644 --- a/uuid/uuid.go +++ b/uuid/uuid.go @@ -3,6 +3,7 @@ package uuid import ( "encoding/binary" "errors" + "fmt" "math/rand" "sync" "time" @@ -10,6 +11,8 @@ import ( "github.com/oklog/ulid" ) +const JobIDV1 = 1 + // Use pool to avoid concurrent access for rand.Source var entropyPool = sync.Pool{ New: func() interface{} { @@ -26,34 +29,38 @@ func GenUniqueID() string { return id.String() } +// GenJobIDWithVersion generates a job ID with version prefix and delaySecond. +// For the legacy version 0 job ID, the version prefix is not included, +// we use the version prefix to distinguish different job payload format. +// // Use the last four bytes of the 16-byte's ULID to store the delaySecond. // The last fours bytes was some random value in ULID, so changing that value won't // affect anything except randomness. -func GenUniqueJobIDWithDelay(delaySecond uint32) string { +func GenJobIDWithVersion(version int, delaySecond uint32) string { entropy := entropyPool.Get().(*rand.Rand) defer entropyPool.Put(entropy) id := ulid.MustNew(ulid.Now(), entropy) // Encode the delayHour in littleEndian and store at the last four bytes binary.LittleEndian.PutUint32(id[len(id)-4:], delaySecond) - return id.String() -} - -func UniqueIDToBinary(id string) [16]byte { - return ulid.MustParse(id) -} - -func BinaryToUniqueID(bin [16]byte) string { - return ulid.ULID(bin).String() + // legacy version is 0, it doesn't include version prefix in the id + if version == 0 { + return id.String() + } + if version < 0 || version > 9 { + version = JobIDV1 + } + return fmt.Sprintf("%d%s", version, id.String()) } func ElapsedMilliSecondFromUniqueID(s string) (int64, error) { + s, _ = extractJobID(s) id, err := ulid.Parse(s) if err != nil { return 0, err } t := id.Time() now := ulid.Now() - if t < now { + if t <= now { return int64(now - t), nil } else { return 0, errors.New("id has a future timestamp") @@ -61,9 +68,24 @@ func ElapsedMilliSecondFromUniqueID(s string) (int64, error) { } func ExtractDelaySecondFromUniqueID(s string) (uint32, error) { + s, _ = extractJobID(s) id, err := ulid.Parse(s) if err != nil { return 0, err } return binary.LittleEndian.Uint32(id[len(id)-4:]), nil } + +func extractJobID(s string) (string, int) { + if len(s) <= ulid.EncodedSize { + return s, 0 + } + return s[1:], int(s[0] - '0') +} + +func ExtractJobIDVersion(s string) int { + if len(s) == ulid.EncodedSize { + return 0 + } + return int(s[0] - '0') +} diff --git a/uuid/uuid_test.go b/uuid/uuid_test.go new file mode 100644 index 00000000..e4cfb393 --- /dev/null +++ b/uuid/uuid_test.go @@ -0,0 +1,27 @@ +package uuid + +import ( + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/stretchr/testify/require" +) + +func TestJobID(t *testing.T) { + jobID := GenJobIDWithVersion(JobIDV1, 10) + + id, version := extractJobID(jobID) + require.Equal(t, 1, version) + require.Equal(t, ulid.EncodedSize, len(id)) + + delay, err := ExtractDelaySecondFromUniqueID(jobID) + require.NoError(t, err) + require.Equal(t, uint32(10), delay) + + // Test elapsed time + time.Sleep(10 * time.Millisecond) + delayMilliseconds, err := ElapsedMilliSecondFromUniqueID(jobID) + require.NoError(t, err) + require.InDelta(t, 10, delayMilliseconds, 2) +}