Skip to content

Commit

Permalink
fifo by default
Browse files Browse the repository at this point in the history
  • Loading branch information
yrong committed Jan 5, 2021
1 parent dbf81d0 commit 9c1d97d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
18 changes: 14 additions & 4 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ type EnqueueData struct {
EnqueueOptions
}

const (
fifo = iota + 1
lifo
)

type EnqueueOptions struct {
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
At float64 `json:"at,omitempty"`
Policy int `json:"policy,omitempty"`
}

func generateJid() string {
Expand All @@ -38,15 +44,15 @@ func generateJid() string {
}

func Enqueue(queue, class string, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision()})
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision(),Policy: fifo})
}

func EnqueueIn(queue, class string, in float64, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision() + in})
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision() + in,Policy: fifo})
}

func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at)})
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at),Policy: fifo})
}

func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) {
Expand Down Expand Up @@ -78,7 +84,11 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio
return "", err
}
queue = Config.Namespace + "queue:" + queue
_, err = conn.Do("rpush", queue, bytes)
if opts.Policy == fifo {
_, err = conn.Do("lpush", queue, bytes)
} else{
_, err = conn.Do("rpush", queue, bytes)
}
if err != nil {
return "", err
}
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/itering/go-workers

go 1.14

0 comments on commit 9c1d97d

Please sign in to comment.