diff --git a/sqs/queue.go b/sqs/queue.go index c5912c6..6559b02 100644 --- a/sqs/queue.go +++ b/sqs/queue.go @@ -28,11 +28,13 @@ type Queue struct { sendSpoolMu sync.Mutex sendSpool []*SDK.SendMessageBatchRequestEntry - failedSend []*SDK.BatchResultErrorEntry deleteSpoolMu sync.Mutex deleteSpool []*SDK.DeleteMessageBatchRequestEntry - failedDelete []*SDK.BatchResultErrorEntry + + failedMu sync.Mutex + failedSend []*SDK.BatchResultErrorEntry + failedDelete []*SDK.BatchResultErrorEntry autoDel bool expire int @@ -156,7 +158,11 @@ func (q *Queue) send(msg []*SDK.SendMessageBatchRequestEntry) error { Entries: msg, QueueUrl: q.url, }) - q.failedSend = append(q.failedSend, res.Failed...) + if len(res.Failed) != 0 { + q.failedMu.Lock() + defer q.failedMu.Unlock() + q.failedSend = append(q.failedSend, res.Failed...) + } return err } @@ -357,6 +363,8 @@ func (q *Queue) delete(msg []*SDK.DeleteMessageBatchRequestEntry) error { QueueUrl: q.url, }) if err != nil { + q.failedMu.Lock() + defer q.failedMu.Unlock() q.service.Errorf("error on `DeleteMessageBatch`; queue=%s; error=%s;", q.nameWithPrefix, err.Error()) q.failedDelete = append(q.failedDelete, res.Failed...) } @@ -396,3 +404,25 @@ func (q *Queue) Purge() error { q.service.Infof("success on `PurgeQueue` operation; queue=%s;", q.nameWithPrefix) return nil } + +// FailedResults contains failed results of batch request. +type FailedResults struct { + Send []*SDK.BatchResultErrorEntry + Delete []*SDK.BatchResultErrorEntry +} + +// GetFailedResults gets failed results of batch request. +func (q *Queue) GetFailedResults() FailedResults { + return FailedResults{ + Send: q.failedSend, + Delete: q.failedDelete, + } +} + +// ClearFailedResults resets failed results of batch request. +func (q *Queue) ClearFailedResults() { + q.failedMu.Lock() + defer q.failedMu.Unlock() + q.failedSend = nil + q.failedDelete = nil +}