Skip to content

Commit

Permalink
Parse message bodies with complex types (#4)
Browse files Browse the repository at this point in the history
We assumed initially that every field in an SNS message would be a
string. However some of them can be complex objects. We can avoid
having to figure that out by just parsing them all to json.RawMessage
and then parsing the only two fields we actually need to strings.
  • Loading branch information
packrat386 authored and mlarraz committed Jun 6, 2018
1 parent 1a1fb53 commit 2be0820
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 6 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func init() {
Poll SQS queues specified in a config and enqueue Sidekiq jobs with the queue items.
It gracefully stops when sent SIGTERM.`

app.Version = "1.3"
app.Version = "1.4"

app.Flags = []cli.Flag{
cli.StringFlag{
Expand Down
23 changes: 18 additions & 5 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,39 @@ func (q *queue) deleteMessage(msg Message, ctx log.FieldLogger) {

// enqueueMessage pushes a single message from SQS into redis
func (q *queue) enqueueMessage(msg Message, ctx log.FieldLogger) bool {
body := make(map[string]string)
body := make(map[string]json.RawMessage)
err := json.Unmarshal([]byte(msg.Body), &body)
if err != nil {
ctx.Warn("Message body could not be parsed: ", err.Error())
return true
}

workerClass, ok := q.Topics[topicName(body["TopicArn"])]
var topicARN string
err = json.Unmarshal(body["TopicArn"], &topicARN)
if err != nil {
ctx.Warn("Topic ARN could not be parsed: ", err.Error())
return true
}

workerClass, ok := q.Topics[topicName(topicARN)]
if !ok {
ctx.Warn("No worker for topic: ", topicName(body["TopicArn"]))
ctx.Warn("No worker for topic: ", topicName(topicARN))
return true
}

jid, err := q.WorkerClient.Push(workerClass, body["Message"])
var bodyMessage string
err = json.Unmarshal(body["Message"], &bodyMessage)
if err != nil {
ctx.Warn("'Message' field could not be parsed: ", err.Error())
}

jid, err := q.WorkerClient.Push(workerClass, bodyMessage)
if err != nil {
ctx.WithField("Class", workerClass).Error("Couldn't enqueue worker: ", err.Error())
return false
}

ctx.WithField("Args", body["Message"]).Info("Enqueued job: ", jid)
ctx.WithField("Args", bodyMessage).Info("Enqueued job: ", jid)
return true
}

Expand Down
32 changes: 32 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"encoding/json"
"errors"
"sync"
"testing"
Expand Down Expand Up @@ -133,6 +134,37 @@ func (q *QueueTestSuite) TestQueue_UnparseableBody() {
q.assert.Contains(q.sqsClient.Deleted, badMessage)
}

func (q *QueueTestSuite) TestQueue_ComplexBody() {
// make a message with a body that cannot be represented as map[string]string
msg := map[string]interface{}{
"Message": `{"foo":"bar"}`,
"TopicArn": "topicA",
"Some": map[string]string{"other": "data"},
}

data, err := json.Marshal(msg)
if err != nil {
panic(err)
}

message := Message{Body: string(data)}

// set the mock to return that message
q.sqsClient.Fetchable = []Message{message}

// make a some topics
q.queue.Topics["topicA"] = "WorkerA"

// do the work
q.queue.Poll()

// The worker should be enqueued
q.assert.Contains(q.workerClient.Enqueued, []string{"WorkerA", `{"foo":"bar"}`})

// The message should be deleted
q.assert.Contains(q.sqsClient.Deleted, message)
}

func (q *QueueTestSuite) TestQueue_EnqueueError() {
// make a messages
message1 := MockMessage(`{"foo":"bar"}`, "topicA")
Expand Down

0 comments on commit 2be0820

Please sign in to comment.