Skip to content

Commit

Permalink
Add max priority level to RabbitMQ queue (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertjndw authored Dec 7, 2023
1 parent e465189 commit ed8d5a8
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions shared/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
log "github.com/sirupsen/logrus"
)

const maxPriority uint8 = 5

type Queue struct {
channel *amqp.Channel
queue amqp.Queue
Expand Down Expand Up @@ -38,13 +40,17 @@ func Init(queueName, url string) (*Queue, error) {
return nil, err
}

args := amqp.Table{
"x-max-priority": maxPriority, // Set the max priority level
}

q.queue, err = q.channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
args, // arguments
)
if err != nil {
log.WithError(err).Error("error declaring RabbitMQ queue")
Expand All @@ -57,6 +63,11 @@ func Init(queueName, url string) (*Queue, error) {
func (q *Queue) Enqueue(ctx context.Context, msg payload.QueuePayload, prio uint8) error {
log.Debugf("Enqueue function called with ctx %+v message: %v", ctx, msg)

if prio > maxPriority {
log.Warnf("Priority %d is higher than max priority %d. Setting to max priority", prio, maxPriority)
prio = maxPriority
}

body, err := json.Marshal(msg)
if err != nil {
log.WithError(err).Error("error marshalling message")
Expand All @@ -69,7 +80,7 @@ func (q *Queue) Enqueue(ctx context.Context, msg payload.QueuePayload, prio uint
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
ContentType: "application/json",
Body: []byte(body),
Priority: prio,
})
Expand Down

0 comments on commit ed8d5a8

Please sign in to comment.