Skip to content

Commit

Permalink
[sqs] Add 'GetFailedResults', 'ClearFailedResults' (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
evalphobia authored Oct 5, 2020
1 parent 58fc2cc commit 4e336ec
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions sqs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ type Queue struct {

sendSpoolMu sync.Mutex
sendSpool []*SDK.SendMessageBatchRequestEntry
failedSend []*SDK.BatchResultErrorEntry

deleteSpoolMu sync.Mutex
deleteSpool []*SDK.DeleteMessageBatchRequestEntry
failedDelete []*SDK.BatchResultErrorEntry

failedMu sync.Mutex
failedSend []*SDK.BatchResultErrorEntry
failedDelete []*SDK.BatchResultErrorEntry

autoDel bool
expire int
Expand Down Expand Up @@ -156,7 +158,11 @@ func (q *Queue) send(msg []*SDK.SendMessageBatchRequestEntry) error {
Entries: msg,
QueueUrl: q.url,
})
q.failedSend = append(q.failedSend, res.Failed...)
if len(res.Failed) != 0 {
q.failedMu.Lock()
defer q.failedMu.Unlock()
q.failedSend = append(q.failedSend, res.Failed...)
}
return err
}

Expand Down Expand Up @@ -357,6 +363,8 @@ func (q *Queue) delete(msg []*SDK.DeleteMessageBatchRequestEntry) error {
QueueUrl: q.url,
})
if err != nil {
q.failedMu.Lock()
defer q.failedMu.Unlock()
q.service.Errorf("error on `DeleteMessageBatch`; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
q.failedDelete = append(q.failedDelete, res.Failed...)
}
Expand Down Expand Up @@ -396,3 +404,25 @@ func (q *Queue) Purge() error {
q.service.Infof("success on `PurgeQueue` operation; queue=%s;", q.nameWithPrefix)
return nil
}

// FailedResults contains failed results of batch request.
type FailedResults struct {
Send []*SDK.BatchResultErrorEntry
Delete []*SDK.BatchResultErrorEntry
}

// GetFailedResults gets failed results of batch request.
func (q *Queue) GetFailedResults() FailedResults {
return FailedResults{
Send: q.failedSend,
Delete: q.failedDelete,
}
}

// ClearFailedResults resets failed results of batch request.
func (q *Queue) ClearFailedResults() {
q.failedMu.Lock()
defer q.failedMu.Unlock()
q.failedSend = nil
q.failedDelete = nil
}

0 comments on commit 4e336ec

Please sign in to comment.