Skip to content

Commit

Permalink
Add support of publish/consume with job attributes (bitleak#227)
Browse files Browse the repository at this point in the history
This PR introduces the new API: PublishJob and RePublishJob to allow
the use of the job attributes and mark the old API as deprecated. But we
don't add the new API for the batch publish since we think it's not a rigorous
implementation and don't encourage users to use that.
  • Loading branch information
git-hulk authored Jul 12, 2024
1 parent 6694ab6 commit 7c2834a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
48 changes: 43 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ type Job struct {
TTL int64 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
RemainTries int64 `json:"remain_tries"`
Attributes map[string]string
}

type JobRequest struct {
Queue string `json:"queue"`
ID string `json:"job_id"`
Data []byte `json:"data"`
TTL uint32 `json:"ttl"`
Tries uint16 `json:"tries"`
Delay uint32 `json:"delay"`
Attributes map[string]string `json:"attributes"`
}

type LmstfyClient struct {
Expand Down Expand Up @@ -111,27 +122,48 @@ func (c *LmstfyClient) getReq(method, relativePath string, query url.Values, bod
return
}

// Deprecated: Use PublishJob instead
//
// Publish a new job to the queue.
// - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
// - tries is the maximum times the job can be fetched.
// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.
func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
return c.publish(queue, "", data, ttlSecond, tries, delaySecond)
return c.publish(queue, "", data, nil, ttlSecond, tries, delaySecond)
}

func (c *LmstfyClient) PublishJob(job *JobRequest) (jobID string, e error) {
return c.publish(job.Queue, "", job.Data, job.Attributes, job.TTL, job.Tries, job.Delay)
}

// Deprecated: Use RePublishJob instead
//
// RePublish delete(ack) the job of the queue and publish the job again.
// - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
// - tries is the maximum times the job can be fetched.
// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.
func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
return c.publish(job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond)
return c.publish(job.Queue, job.ID, job.Data, nil, ttlSecond, tries, delaySecond)
}

func (c *LmstfyClient) RePublishJob(job *JobRequest) (jobID string, e error) {
return c.publish(job.Queue, jobID, job.Data, job.Attributes, job.TTL, job.Tries, job.Delay)
}

func (c *LmstfyClient) publish(queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
func (c *LmstfyClient) publish(
queue,
ackJobID string,
data []byte,
attributes map[string]string,
ttlSecond uint32,
tries uint16,
delaySecond uint32,
) (jobID string, e error) {
query := url.Values{}
query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10))
query.Add("tries", strconv.FormatUint(uint64(tries), 10))
query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10))

retryCount := 0
relativePath := queue
if ackJobID != "" {
Expand All @@ -145,6 +177,12 @@ RETRY:
Reason: err.Error(),
}
}
if len(attributes) > 0 {
req.Header.Add("Enable-Job-Version", "YES")
for k, v := range attributes {
req.Header.Add(fmt.Sprintf("Job-Attr-%s", strings.ToTitle(k)), v)
}
}

resp, err := c.httpCli.Do(req)
if err != nil {
Expand Down Expand Up @@ -364,7 +402,7 @@ func (c *LmstfyClient) consume(queue string, ttrSecond, timeoutSecond uint32, fr
RequestID: resp.Header.Get("X-Request-ID"),
}
}
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, &APIError{
Type: ResponseErr,
Expand Down Expand Up @@ -502,7 +540,7 @@ func (c *LmstfyClient) batchConsume(queues []string, count, ttrSecond, timeoutSe

// Consume from multiple queues with priority.
// The order of the queues in the params implies the priority. eg.
// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")
// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")
// if all the queues have jobs to be fetched, the job in `queue-a` will be return.
func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) {
return c.consumeFromQueues(ttrSecond, timeoutSecond, false, queues...)
Expand Down
25 changes: 25 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestParseSchemeFromURL(t *testing.T) {
Expand Down Expand Up @@ -121,6 +123,29 @@ func TestLmstfyClient_Consume(t *testing.T) {
}
}

func TestLmstfyClient_PublishWithAttributes(t *testing.T) {
cli := NewLmstfyClient(Host, Port, Namespace, Token)
jobID, _ := cli.PublishJob(&JobRequest{
Queue: "test-publish-attributes",
Data: []byte("hello"),
TTL: 10,
Tries: 10,
Delay: 0,
Attributes: map[string]string{
"hello": "world",
"foo": "bar",
},
})
job, err := cli.Consume("test-publish-attributes", 10, 3)
require.NoError(t, err)
require.NotNil(t, job)
require.Equal(t, jobID, job.ID)
require.Equal(t, "hello", string(job.Data))
require.Len(t, job.Attributes, 2)
require.Equal(t, "world", job.Attributes["hello"])
require.Equal(t, "bar", job.Attributes["foo"])
}

func TestLmstfyClient_BatchConsume(t *testing.T) {
cli := NewLmstfyClient(Host, Port, Namespace, Token)
jobMap := map[string]bool{}
Expand Down
4 changes: 2 additions & 2 deletions client/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"
"testing"
Expand Down Expand Up @@ -53,7 +53,7 @@ func setup(CONF *config.Config) {
if resp.StatusCode != http.StatusCreated {
panic("Failed to create testing token")
}
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
panic("Failed to create testing token")
}
Expand Down

0 comments on commit 7c2834a

Please sign in to comment.