Skip to content

Commit

Permalink
Extend mq package (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
covain authored Apr 26, 2022
1 parent 2cda407 commit 448820b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
20 changes: 16 additions & 4 deletions mq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ const headerRemainingRetries = "x-remaining-retries"
type consumer struct {
client *Client

queue Queue
fn func(Message) error
options *ConsumerOptions
queue Queue
messageProcessor MessageProcessor
options *ConsumerOptions

messages <-chan amqp.Delivery
stopChan chan struct{}
Expand All @@ -30,6 +30,18 @@ type Consumer interface {
Reconnect(ctx context.Context) error
}

type MessageProcessor interface {
Process(Message) error
}

// MessageProcessorFunc is an adapter to allow to use
// an ordinary functions as mq MessageProcessor.
type MessageProcessorFunc func(message Message) error

func (f MessageProcessorFunc) Process(m Message) error {
return f(m)
}

func (c *consumer) Start(ctx context.Context) error {
c.stopChan = make(chan struct{})

Expand Down Expand Up @@ -117,7 +129,7 @@ func (c *consumer) process(queueName string, body []byte) error {
}

defer metric.Duration(metric.Start())
err := c.fn(body)
err := c.messageProcessor.Process(body)

if err != nil {
metric.Failure()
Expand Down
10 changes: 5 additions & 5 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func (c *Client) InitExchange(name ExchangeName) Exchange {
}
}

func (c *Client) InitConsumer(queueName QueueName, options *ConsumerOptions, fn func(message Message) error) Consumer {
func (c *Client) InitConsumer(queueName QueueName, options *ConsumerOptions, processor MessageProcessor) Consumer {
return &consumer{
client: c,
queue: c.InitQueue(queueName),
fn: fn,
options: options,
client: c,
queue: c.InitQueue(queueName),
messageProcessor: processor,
options: options,
}
}

Expand Down

0 comments on commit 448820b

Please sign in to comment.