From 9c1d97d6321c3dbaa6a90c97e13566c3977741f2 Mon Sep 17 00:00:00 2001 From: yrong1997 Date: Tue, 5 Jan 2021 13:33:22 +0800 Subject: [PATCH] fifo by default --- .gitignore | 1 + enqueue.go | 18 ++++++++++++++---- go.mod | 3 +++ 3 files changed, 18 insertions(+), 4 deletions(-) create mode 100644 .gitignore create mode 100644 go.mod diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/enqueue.go b/enqueue.go index f9af7a8..f8d0cb5 100644 --- a/enqueue.go +++ b/enqueue.go @@ -21,10 +21,16 @@ type EnqueueData struct { EnqueueOptions } +const ( + fifo = iota + 1 + lifo +) + type EnqueueOptions struct { RetryCount int `json:"retry_count,omitempty"` Retry bool `json:"retry,omitempty"` At float64 `json:"at,omitempty"` + Policy int `json:"policy,omitempty"` } func generateJid() string { @@ -38,15 +44,15 @@ func generateJid() string { } func Enqueue(queue, class string, args interface{}) (string, error) { - return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision()}) + return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision(),Policy: fifo}) } func EnqueueIn(queue, class string, in float64, args interface{}) (string, error) { - return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision() + in}) + return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision() + in,Policy: fifo}) } func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error) { - return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at)}) + return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at),Policy: fifo}) } func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) { @@ -78,7 +84,11 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio return "", err } queue = Config.Namespace + "queue:" + queue - _, err = conn.Do("rpush", queue, bytes) + if opts.Policy == fifo { + _, err = conn.Do("lpush", queue, bytes) + } else{ + _, err = conn.Do("rpush", queue, bytes) + } if err != nil { return "", err } diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8b41c1d --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/itering/go-workers + +go 1.14