Skip to content

Commit

Permalink
Make WaitTimeSeconds adjustable to support long polling (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgale authored Apr 2, 2020
1 parent a733391 commit b64bc81
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 11 deletions.
60 changes: 49 additions & 11 deletions sqs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
defaultMessageIDPrefix = "msg_"
defaultExpireSecond = 180
defaultWaitTimeSeconds = 0
)

// Queue is SQS Queue wrapper struct.
Expand All @@ -33,20 +34,22 @@ type Queue struct {
deleteSpool []*SDK.DeleteMessageBatchRequestEntry
failedDelete []*SDK.BatchResultErrorEntry

autoDel bool
expire int
autoDel bool
expire int
waitTimeSeconds int
}

// NewQueue returns initialized *Queue.
func NewQueue(svc *SQS, name string, url string) *Queue {
queueName := svc.prefix + name
return &Queue{
service: svc,
name: name,
nameWithPrefix: queueName,
url: pointers.String(url),
autoDel: false,
expire: defaultExpireSecond,
service: svc,
name: name,
nameWithPrefix: queueName,
url: pointers.String(url),
autoDel: false,
expire: defaultExpireSecond,
waitTimeSeconds: defaultWaitTimeSeconds,
}
}

Expand All @@ -60,7 +63,14 @@ func (q *Queue) SetExpire(sec int) {
q.expire = sec
}

// SetWaitTimeSeconds sets wait time timeout for message.
// Setting this value allows for a long polling workflow.
func (q *Queue) SetWaitTimeSeconds(sec int) {
q.waitTimeSeconds = sec
}

// AddMessage adds message to the send spool.
// This assumes a Standard SQS Queue and not a FifoQueue
func (q *Queue) AddMessage(message string) {
q.sendSpoolMu.Lock()
defer q.sendSpoolMu.Unlock()
Expand All @@ -73,6 +83,21 @@ func (q *Queue) AddMessage(message string) {
q.sendSpool = append(q.sendSpool, m)
}

// AddMessageWithGroupID adds a message to the send spool but adds the required attributes
// for a SQS FIFO Queue. This assumes the SQS FIFO Queue has ContentBasedDeduplication enabled.
func (q *Queue) AddMessageWithGroupID(message string, messageGroupID string) {
q.sendSpoolMu.Lock()
defer q.sendSpoolMu.Unlock()

num := fmt.Sprint(len(q.sendSpool) + 1)
m := &SDK.SendMessageBatchRequestEntry{
MessageBody: pointers.String(message),
Id: pointers.String(defaultMessageIDPrefix + num), // serial numbering for convenience sake
MessageGroupId: pointers.String(messageGroupID),
}
q.sendSpool = append(q.sendSpool, m)
}

// AddMessageJSONMarshal adds message to the send pool with encoding json data.
func (q *Queue) AddMessageJSONMarshal(message interface{}) error {
msg, err := json.Marshal(message)
Expand Down Expand Up @@ -137,9 +162,10 @@ func (q *Queue) send(msg []*SDK.SendMessageBatchRequestEntry) error {

// Fetch fetches message list from the queue with limit.
func (q *Queue) Fetch(num int) ([]*Message, error) {
wait := 0

if num > 1 {
wait := q.waitTimeSeconds

if wait == 0 && num > 1 {
wait = 1 // use long-polling for 1sec when to get multiple messages
}

Expand All @@ -148,7 +174,7 @@ func (q *Queue) Fetch(num int) ([]*Message, error) {
QueueUrl: q.url,
WaitTimeSeconds: pointers.Long(wait),
MaxNumberOfMessages: pointers.Long(num),
VisibilityTimeout: pointers.Long(defaultExpireSecond),
VisibilityTimeout: pointers.Long(q.expire),
})
if err != nil {
q.service.Errorf("error on `ReceiveMessage` operation; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
Expand Down Expand Up @@ -270,6 +296,18 @@ func (q *Queue) DeleteMessage(msg *Message) error {
return err
}

// DeleteMessageWithReceipt sends the request to AWS api to delete the message.
func (q *Queue) DeleteMessageWithReceipt(msgReceipt string) error {
_, err := q.service.client.DeleteMessage(&SDK.DeleteMessageInput{
QueueUrl: q.url,
ReceiptHandle: pointers.String(msgReceipt),
})
if err != nil {
q.service.Errorf("error on `DeleteMessage`; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
}
return err
}

// DeleteListItems executes delete operation in the delete spool.
func (q *Queue) DeleteListItems() error {
q.deleteSpoolMu.Lock()
Expand Down
40 changes: 40 additions & 0 deletions sqs/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ func TestSetExpire(t *testing.T) {
assert.Equal(10, q.expire)
}

func TestSetWaitTimeSeconds(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
q, _ := svc.GetQueue("test")

assert.Equal(defaultWaitTimeSeconds, q.waitTimeSeconds)
q.SetWaitTimeSeconds(20)
assert.Equal(20, q.waitTimeSeconds)
}

func TestAddMessage(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
Expand All @@ -60,6 +70,17 @@ func TestAddMessage(t *testing.T) {
assert.Equal("foo msg", msg)
}

func TestAddMessageWithGroupID(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
q, _ := svc.GetQueue("test")

q.AddMessageWithGroupID("foo msg", "Grp1")
assert.Equal(1, len(q.sendSpool))
msg := *(q.sendSpool[0].MessageBody)
assert.Equal("foo msg", msg)
}

func TestAddMessageMap(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
Expand Down Expand Up @@ -243,6 +264,25 @@ func TestDeleteMessage(t *testing.T) {
cleanQueue(q)
}

func TestDeleteMessageWithReceipt(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
q, _ := svc.GetQueue("test")
cleanQueue(q)

// prepare messages
addTestMessage(q, 3)
msg, err := q.FetchOne()
assert.Nil(err)

msgReceipt := msg.GetReceiptHandle()
// test this feature
err = q.DeleteMessageWithReceipt(*msgReceipt)
assert.Nil(err)

cleanQueue(q)
}

func TestDeleteListItems(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
Expand Down

0 comments on commit b64bc81

Please sign in to comment.