diff --git a/internal/util/util.go b/internal/util/util.go index e8fa23c..19d971e 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -1,8 +1,6 @@ package util import ( - "bytes" - "encoding/gob" "encoding/json" "fmt" "github.com/aws/aws-sdk-go-v2/service/sqs/types" @@ -32,22 +30,6 @@ func ConvertToString(a any) string { } } -func ConvertToBytes(a any) ([]byte, error) { - if a == nil { - return nil, nil - } - t := reflect.TypeOf(a) - switch t.Kind() { - case reflect.Struct, reflect.Map, reflect.Slice, reflect.Array, reflect.Interface: - return json.Marshal(a) - default: - var buffer bytes.Buffer - enc := gob.NewEncoder(&buffer) - err := enc.Encode(a) - return buffer.Bytes(), err - } -} - func GetDataType(a any) string { t := reflect.TypeOf(a) switch t.Kind() { diff --git a/sqs/consumer_test.go b/sqs/consumer_test.go index 24e80fe..f17ff6b 100644 --- a/sqs/consumer_test.go +++ b/sqs/consumer_test.go @@ -14,7 +14,7 @@ func TestReceiveMessage(t *testing.T) { t.Errorf("ReceiveMessage() error = %v, wantErr %v", r, tt.wantErr) } }() - initMessageString(tt.queueUrl) + initMessageString() initMessageStruct(tt.queueUrl) d := 5 * time.Second if tt.name == "failed" { @@ -43,7 +43,7 @@ func TestSimpleReceiveMessage(t *testing.T) { t.Errorf("SimpleReceiveMessage() error = %v, wantErr %v", r, tt.wantErr) } }() - initMessageString(tt.queueUrl) + initMessageString() initMessageStruct(tt.queueUrl) d := 5 * time.Second if tt.name == "failed" { diff --git a/sqs/main_test.go b/sqs/main_test.go index 1d7dbab..b77957a 100644 --- a/sqs/main_test.go +++ b/sqs/main_test.go @@ -11,8 +11,18 @@ import ( "time" ) +const SqsQueueTestName = "SQS_QUEUE_TEST_NAME" +const SqsQueueTestUrl = "SQS_QUEUE_TEST_URL" +const SqsQueueTestStringArn = "SQS_QUEUE_TEST_STRING_ARN" +const SqsQueueTestFifoUrl = "SQS_QUEUE_TEST_FIFO_URL" +const SqsQueueTestEmptyUrl = "SQS_QUEUE_TEST_EMPTY_URL" +const SqsQueueTestStringUrl = "SQS_QUEUE_TEST_STRING_URL" +const SqsQueueTestDlqArn = "SQS_QUEUE_TEST_DLQ_ARN" const SqsQueueCreateTestName = "SQS_QUEUE_CREATE_TEST_NAME" const SqsQueueCreateTestUrl = "SQS_QUEUE_CREATE_TEST_URL" +const SqsMessageId = "SQS_MESSAGE_ID" +const SqsMessageReceiptHandle = "SQS_MESSAGE_RECEIPT_HANDLE" +const SqsTaskHandle = "SQS_TASK_HANDLE" type testProducer struct { name string @@ -117,6 +127,55 @@ type testListQueue struct { wantErr bool } +type testDeleteMessage struct { + name string + queueUrl string + receiptHandle string + opts []option.Default + wantErr bool +} + +type testDeleteMessageBatch struct { + name string + input DeleteMessageBatchInput + opts []option.Default + wantErr bool +} + +type testChangeMessageVisibility struct { + name string + input ChangeMessageVisibilityInput + opts []option.Default + wantErr bool +} + +type testChangeMessageVisibilityBatch struct { + name string + input ChangeMessageVisibilityBatchInput + opts []option.Default + wantErr bool +} + +type testStartMessageMoveTask struct { + name string + input StartMessageMoveTaskInput + opts []option.Default + wantErr bool +} + +type testCancelMessageMoveTask struct { + name string + taskHandle string + opts []option.Default + wantErr bool +} +type testListMessageMoveTask struct { + name string + sourceArn string + opts []option.ListMessageMoveTasks + wantErr bool +} + type test struct { Name string `json:"name,omitempty"` BirthDate time.Time `json:"birthDate,omitempty"` @@ -152,16 +211,19 @@ type bank struct { func TestMain(t *testing.M) { t.Run() deleteQueueCreateTest() + purgeQueues() + cancelMessageMoveTaskTest() } func initListTestProducer() []testProducer { + msgAttTest := initMessageAttTest() return []testProducer{ { name: "valid request", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ - option.NewProducer().SetMessageAttributes(initMessageAttTest()), + option.NewProducer().SetMessageAttributes(&msgAttTest), option.NewProducer().SetDelaySeconds(2 * time.Second), option.NewProducer().SetDebugMode(true), }, @@ -170,7 +232,7 @@ func initListTestProducer() []testProducer { }, { name: "valid request fifo", - queueUrl: os.Getenv("SQS_QUEUE_TEST_FIFO"), + queueUrl: os.Getenv(SqsQueueTestFifoUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetDebugMode(true), @@ -184,7 +246,7 @@ func initListTestProducer() []testProducer { }, { name: "valid request async", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes(initTestMap()), @@ -201,7 +263,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid message attributes", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes("test message string"), @@ -211,7 +273,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid system message attributes", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageSystemAttributes(option.MessageSystemAttributes{ @@ -223,17 +285,17 @@ func initListTestProducer() []testProducer { }, { name: "empty message attributes", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ - option.NewProducer().SetMessageAttributes(messageAttTest{}), + option.NewProducer().SetMessageAttributes(struct{}{}), option.NewProducer().SetDebugMode(true), }, wantErr: false, }, { name: "invalid map message attributes", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes(initInvalidTestMap()), @@ -243,7 +305,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid message body", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), v: "", wantErr: true, }, @@ -254,28 +316,28 @@ func initListTestConsumer[Body, MessageAttributes any]() []testConsumer[Body, Me return []testConsumer[Body, MessageAttributes]{ { name: "success", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), wantErr: false, }, { - name: "success FIFO", - queueUrl: os.Getenv("SQS_QUEUE_TEST_FIFO"), + name: "success fifo", + queueUrl: os.Getenv(SqsQueueTestFifoUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success error consumer", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), handler: initHandleConsumerWithErr[Body, MessageAttributes], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success async", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), async: true, @@ -283,7 +345,7 @@ func initListTestConsumer[Body, MessageAttributes any]() []testConsumer[Body, Me }, { name: "success empty", - queueUrl: os.Getenv("SQS_QUEUE_TEST_EMPTY"), + queueUrl: os.Getenv(SqsQueueTestEmptyUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), async: false, @@ -291,7 +353,7 @@ func initListTestConsumer[Body, MessageAttributes any]() []testConsumer[Body, Me }, { name: "failed parse body", - queueUrl: os.Getenv("SQS_QUEUE_TEST_STRING"), + queueUrl: os.Getenv(SqsQueueTestStringUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), async: false, @@ -311,21 +373,21 @@ func initListTestSimpleConsumer[Body any]() []testSimpleConsumer[Body] { return []testSimpleConsumer[Body]{ { name: "success", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), handler: initSimpleHandleConsumer[Body], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success error consumer", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), handler: initSimpleHandleConsumerWithErr[Body], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success async", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), handler: initSimpleHandleConsumer[Body], opts: initOptionsConsumerDefault(), async: true, @@ -333,7 +395,7 @@ func initListTestSimpleConsumer[Body any]() []testSimpleConsumer[Body] { }, { name: "failed parse body", - queueUrl: os.Getenv("SQS_QUEUE_TEST_STRING"), + queueUrl: os.Getenv(SqsQueueTestStringUrl), handler: initSimpleHandleConsumer[Body], opts: initOptionsConsumerDefault(), async: false, @@ -438,7 +500,7 @@ func initListTestPurgeQueue() []testPurgeQueue { return []testPurgeQueue{ { name: "success", - queueUrl: os.Getenv("SQS_QUEUE_TEST_EMPTY"), + queueUrl: os.Getenv(SqsQueueCreateTestUrl), opts: initOptionsDefault(), wantErr: false, }, @@ -489,7 +551,7 @@ func initListTestListQueueTags() []testListQueueTags { return []testListQueueTags{ { name: "success", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), opts: initOptionsDefault(), wantErr: false, }, @@ -506,7 +568,7 @@ func initListTestListDeadLetterSourceQueues() []testListDeadLetterSourceQueues { return []testListDeadLetterSourceQueues{ { name: "success", - queueUrl: os.Getenv("SQS_QUEUE_TEST"), + queueUrl: os.Getenv(SqsQueueTestUrl), opts: initOptionsListDeadLetterSourceQueues(), wantErr: false, }, @@ -534,6 +596,157 @@ func initListTestListQueue() []testListQueue { } } +func initListTestDeleteMessage() []testDeleteMessage { + return []testDeleteMessage{ + { + name: "success", + queueUrl: os.Getenv(SqsQueueTestStringUrl), + receiptHandle: os.Getenv(SqsMessageReceiptHandle), + opts: initOptionsDefault(), + wantErr: false, + }, + { + name: "failed", + queueUrl: "https://google.com", + receiptHandle: "", + opts: initOptionsDefault(), + wantErr: true, + }, + } +} + +func initListTestDeleteMessageBatch() []testDeleteMessageBatch { + return []testDeleteMessageBatch{ + { + name: "success", + input: DeleteMessageBatchInput{ + QueueUrl: os.Getenv(SqsQueueTestStringUrl), + Entries: []DeleteMessageBatchRequestEntry{ + { + Id: os.Getenv(SqsMessageId), + ReceiptHandle: os.Getenv(SqsMessageReceiptHandle), + }, + }, + }, + opts: initOptionsDefault(), + wantErr: false, + }, + { + name: "failed", + input: DeleteMessageBatchInput{ + QueueUrl: "https://google.com/", + Entries: []DeleteMessageBatchRequestEntry{}, + }, + opts: initOptionsDefault(), + wantErr: true, + }, + } +} + +func initListTestChangeMessageVisibility() []testChangeMessageVisibility { + return []testChangeMessageVisibility{ + { + name: "success", + input: ChangeMessageVisibilityInput{ + QueueUrl: os.Getenv(SqsQueueTestStringUrl), + ReceiptHandle: os.Getenv(SqsMessageReceiptHandle), + VisibilityTimeout: 1, + }, + opts: initOptionsDefault(), + wantErr: false, + }, + { + name: "failed", + input: ChangeMessageVisibilityInput{ + QueueUrl: "https://google.com/", + }, + opts: initOptionsDefault(), + wantErr: true, + }, + } +} + +func initListTestChangeMessageVisibilityBatch() []testChangeMessageVisibilityBatch { + return []testChangeMessageVisibilityBatch{ + { + name: "success", + input: ChangeMessageVisibilityBatchInput{ + QueueUrl: os.Getenv(SqsQueueTestStringUrl), + Entries: []ChangeMessageVisibilityBatchRequestEntry{ + { + Id: os.Getenv(SqsMessageId), + ReceiptHandle: os.Getenv(SqsMessageReceiptHandle), + VisibilityTimeout: 3, + }, + }, + }, + opts: initOptionsDefault(), + wantErr: false, + }, + { + name: "failed", + input: ChangeMessageVisibilityBatchInput{ + QueueUrl: "https://google.com/", + }, + opts: initOptionsDefault(), + wantErr: true, + }, + } +} + +func initListTestStartMessageMoveTask() []testStartMessageMoveTask { + return []testStartMessageMoveTask{ + { + name: "success", + input: initStartMessageMoveTaskInput(), + opts: initOptionsDefault(), + wantErr: false, + }, + { + name: "failed", + input: StartMessageMoveTaskInput{ + SourceArn: "https://google.com/", + }, + opts: initOptionsDefault(), + wantErr: true, + }, + } +} + +func initListTestCancelMessageMoveTask() []testCancelMessageMoveTask { + return []testCancelMessageMoveTask{ + { + name: "success", + taskHandle: os.Getenv(SqsTaskHandle), + opts: initOptionsDefault(), + wantErr: false, + }, + { + name: "failed", + taskHandle: "", + opts: initOptionsDefault(), + wantErr: true, + }, + } +} + +func initListTestListMessageMoveTask() []testListMessageMoveTask { + return []testListMessageMoveTask{ + { + name: "success", + sourceArn: os.Getenv(SqsQueueTestStringArn), + opts: initOptionsListMessageMoveTasks(), + wantErr: false, + }, + { + name: "failed", + sourceArn: "", + opts: initOptionsListMessageMoveTasksWithErr(), + wantErr: true, + }, + } +} + func initTestStruct() test { b := bank{ Account: "123456", @@ -635,14 +848,14 @@ func initUntagQueueInput() UntagQueueInput { func initGetQueueUrlInput() GetQueueUrlInput { return GetQueueUrlInput{ - QueueName: os.Getenv("SQS_QUEUE_TEST_NAME"), + QueueName: os.Getenv(SqsQueueTestName), QueueOwnerAWSAccountId: nil, } } func initGetQueueAttributesInput() GetQueueAttributesInput { return GetQueueAttributesInput{ - QueueUrl: os.Getenv("SQS_QUEUE_TEST"), + QueueUrl: os.Getenv(SqsQueueTestUrl), AttributeNames: nil, } } @@ -655,6 +868,15 @@ func initAttributesQueue(delaySeconds string) map[string]string { return map[string]string{"DelaySeconds": delaySeconds} } +func initStartMessageMoveTaskInput() StartMessageMoveTaskInput { + maxNumberOfMessagesPerSecond := int32(1) + return StartMessageMoveTaskInput{ + SourceArn: os.Getenv(SqsQueueTestDlqArn), + DestinationArn: nil, + MaxNumberOfMessagesPerSecond: &maxNumberOfMessagesPerSecond, + } +} + func initOptionsConsumerDefault() []option.Consumer { return []option.Consumer{ option.NewConsumer().SetDebugMode(true), @@ -737,36 +959,64 @@ func initOptionsDefault() []option.Default { } } -func initMessageString(queueUrl string) { - if queueUrl != os.Getenv("SQS_QUEUE_TEST_STRING") { - return +func initOptionsListMessageMoveTasks() []option.ListMessageMoveTasks { + return []option.ListMessageMoveTasks{ + option.NewListMessageMoveTasks().SetDebugMode(true), + option.NewListMessageMoveTasks().SetOptionHttp(option.Http{}), + option.NewListMessageMoveTasks().SetMaxResults(10), } +} + +func initOptionsListMessageMoveTasksWithErr() []option.ListMessageMoveTasks { + return []option.ListMessageMoveTasks{ + option.NewListMessageMoveTasks().SetDebugMode(true), + option.NewListMessageMoveTasks().SetOptionHttp(option.Http{}), + option.NewListMessageMoveTasks().SetMaxResults(0), + } +} + +func initMessageString() { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() opt := option.NewProducer().SetMessageAttributes(initMessageAttTest()) - _, err := SendMessage(ctx, queueUrl, "test body string", opt) + output, err := SendMessage(ctx, os.Getenv(SqsQueueTestStringUrl), "test body string", opt) if err != nil { logger.Error("error send message:", err) + return } + _ = os.Setenv(SqsMessageId, *output.MessageId) } func initMessageStruct(queueUrl string) { - if queueUrl != os.Getenv("SQS_QUEUE_TEST") && queueUrl != os.Getenv("SQS_QUEUE_TEST_FIFO") { + if queueUrl != os.Getenv(SqsQueueTestUrl) && queueUrl != os.Getenv(SqsQueueTestFifoUrl) { return } ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() opt := option.NewProducer().SetMessageAttributes(initMessageAttTest()) - if queueUrl == os.Getenv("SQS_QUEUE_TEST_FIFO") { - opt.SetMessageGroupId("group") - opt.SetMessageDeduplicationId("deduplication") - } _, err := SendMessage(ctx, queueUrl, initTestStruct(), opt) if err != nil { logger.Error("error send message:", err) } } +func initMessageReceiptHandle() { + initMessageString() + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + ctxInterrupt = ctx + opt := option.NewConsumer(). + SetDebugMode(true). + SetMaxNumberOfMessages(1). + SetVisibilityTimeout(0). + SetDeleteMessageProcessedSuccess(false) + SimpleReceiveMessage[any](os.Getenv(SqsQueueTestStringUrl), func(ctx *SimpleContext[any]) error { + _ = os.Setenv(SqsMessageReceiptHandle, ctx.Message.ReceiptHandle) + cancel() + return nil + }, opt) +} + func initQueueCreateTest() { deleteQueueCreateTest() ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) @@ -782,10 +1032,50 @@ func initQueueCreateTest() { _ = os.Setenv(SqsQueueCreateTestUrl, *output.QueueUrl) } +func initStartMessageMoveTask() { + cancelMessageMoveTaskTest() + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + output, err := StartMessageMoveTask(ctx, initStartMessageMoveTaskInput()) + if err != nil { + logger.Error("error start message move task:", err) + return + } + _ = os.Setenv(SqsTaskHandle, *output.TaskHandle) +} + func deleteQueueCreateTest() { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() - _, _ = DeleteQueue(ctx, SqsQueueCreateTestUrl) + url := os.Getenv(SqsQueueCreateTestUrl) + if len(url) == 0 { + return + } + _, _ = DeleteQueue(ctx, os.Getenv(SqsQueueCreateTestUrl)) +} + +func purgeQueues() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + urlString := os.Getenv(SqsQueueTestStringUrl) + urlFifo := os.Getenv(SqsQueueTestFifoUrl) + if len(urlString) != 0 { + _, _ = PurgeQueue(ctx, os.Getenv(SqsQueueTestStringUrl)) + } + if len(urlFifo) != 0 { + _, _ = PurgeQueue(ctx, os.Getenv(SqsQueueTestFifoUrl)) + } +} + +func cancelMessageMoveTaskTest() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + taskHandle := os.Getenv(SqsTaskHandle) + if len(taskHandle) == 0 { + return + } + _, _ = CancelMessageMoveTask(ctx, os.Getenv(SqsTaskHandle)) + } func getSqsCreateQueueTest() string { diff --git a/sqs/message.go b/sqs/message.go index 05a4e87..0e264d0 100644 --- a/sqs/message.go +++ b/sqs/message.go @@ -5,7 +5,9 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "go-aws-sqs/internal/client" + "go-aws-sqs/internal/util" "go-aws-sqs/sqs/option" + "time" ) type DeleteMessageBatchInput struct { @@ -44,12 +46,12 @@ type ChangeMessageVisibilityInput struct { // changed. This parameter is returned by the ReceiveMessage action. // // This member is required. - ReceiptHandle *string + ReceiptHandle string // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200 . Maximum: 12 hours. // // This member is required. - VisibilityTimeout int32 + VisibilityTimeout time.Duration } type ChangeMessageVisibilityBatchInput struct { @@ -205,9 +207,9 @@ func ChangeMessageVisibility(ctx context.Context, input ChangeMessageVisibilityI loggerInfo(opt.DebugMode, "changing messages visibility..") sqsClient := client.GetClient(ctx) output, err := sqsClient.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{ - QueueUrl: input.ReceiptHandle, - ReceiptHandle: input.ReceiptHandle, - VisibilityTimeout: input.VisibilityTimeout, + QueueUrl: &input.QueueUrl, + ReceiptHandle: &input.ReceiptHandle, + VisibilityTimeout: util.ConvertDurationToInt32(input.VisibilityTimeout), }, option.FuncByOptionHttp(opt.OptionHttp)) if err != nil { loggerErr(opt.DebugMode, "error charge message visibility:", err) @@ -331,9 +333,6 @@ func ListMessageMoveTasks(ctx context.Context, sourceArn string, opts ...option. func prepareEntriesDeleteMessageBatch(entries []DeleteMessageBatchRequestEntry) []types.DeleteMessageBatchRequestEntry { var result []types.DeleteMessageBatchRequestEntry for _, v := range entries { - if len(v.Id) == 0 && len(v.ReceiptHandle) == 0 { - continue - } rEntry := types.DeleteMessageBatchRequestEntry{ Id: nil, ReceiptHandle: nil, @@ -354,9 +353,6 @@ func prepareEntriesChangeMessageVisibilityBatch( ) []types.ChangeMessageVisibilityBatchRequestEntry { var result []types.ChangeMessageVisibilityBatchRequestEntry for _, v := range entries { - if len(v.Id) == 0 && len(v.ReceiptHandle) == 0 { - continue - } rEntry := types.ChangeMessageVisibilityBatchRequestEntry{ Id: nil, ReceiptHandle: nil, diff --git a/sqs/message_test.go b/sqs/message_test.go new file mode 100644 index 0000000..83490b9 --- /dev/null +++ b/sqs/message_test.go @@ -0,0 +1,110 @@ +package sqs + +import ( + "context" + "os" + "testing" + "time" +) + +func TestDeleteMessage(t *testing.T) { + initMessageReceiptHandle() + for _, tt := range initListTestDeleteMessage() { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + _, err := DeleteMessage(ctx, tt.queueUrl, tt.receiptHandle, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("DeleteMessage() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestDeleteMessageBatch(t *testing.T) { + initMessageReceiptHandle() + for _, tt := range initListTestDeleteMessageBatch() { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + _, err := DeleteMessageBatch(ctx, tt.input, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("DeleteMessageBatch() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestChangeMessageVisibility(t *testing.T) { + initMessageReceiptHandle() + for _, tt := range initListTestChangeMessageVisibility() { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + _, err := ChangeMessageVisibility(ctx, tt.input, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("ChangeMessageVisibility() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestChangeMessageVisibilityBatch(t *testing.T) { + initMessageReceiptHandle() + for _, tt := range initListTestChangeMessageVisibilityBatch() { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + _, err := ChangeMessageVisibilityBatch(ctx, tt.input, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("ChangeMessageVisibilityBatch() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestStartMessageMoveTask(t *testing.T) { + initMessageString() + for _, tt := range initListTestStartMessageMoveTask() { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + output, err := StartMessageMoveTask(ctx, tt.input, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("StartMessageMoveTask() error = %v, wantErr %v", err, tt.wantErr) + return + } else if output != nil && output.TaskHandle != nil { + _ = os.Setenv(SqsTaskHandle, *output.TaskHandle) + cancelMessageMoveTaskTest() + } + }) + } +} + +func TestCancelMessageMoveTask(t *testing.T) { + for _, tt := range initListTestCancelMessageMoveTask() { + t.Run(tt.name, func(t *testing.T) { + initStartMessageMoveTask() + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + _, err := CancelMessageMoveTask(ctx, tt.taskHandle, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("CancelMessageMoveTask() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestListMessageMoveTasks(t *testing.T) { + initStartMessageMoveTask() + for _, tt := range initListTestListMessageMoveTask() { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + _, err := ListMessageMoveTasks(ctx, tt.sourceArn, tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("ListMessageMoveTasks() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/sqs/producer.go b/sqs/producer.go index f5542df..0be94b2 100644 --- a/sqs/producer.go +++ b/sqs/producer.go @@ -33,8 +33,7 @@ import ( // Returns: // - *sqs.SendMessageOutput: The result of the SendMessage operation. // - error: An error if one occurs during the SendMessage operation. -func SendMessage(ctx context.Context, queueUrl string, v any, opts ...option.Producer) ( - *sqs.SendMessageOutput, error) { +func SendMessage(ctx context.Context, queueUrl string, v any, opts ...option.Producer) (*sqs.SendMessageOutput, error) { opt := option.GetProducerByParams(opts) loggerInfo(opt.DebugMode, "getting client sqs..") sqsClient := client.GetClient(ctx) @@ -99,8 +98,15 @@ func prepareMessageInput(queueUrl string, v any, opt option.Producer) (*sqs.Send } func getMessageAttValueByOpt(opt option.Producer) (map[string]types.MessageAttributeValue, error) { + if opt.MessageAttributes == nil { + return nil, nil + } v := reflect.ValueOf(opt.MessageAttributes) t := reflect.TypeOf(opt.MessageAttributes) + if t.Kind() == reflect.Pointer || t.Kind() == reflect.Interface { + v = v.Elem() + t = t.Elem() + } if util.IsZeroReflect(v) { return nil, nil } @@ -148,9 +154,6 @@ func convertMapToMessageAttValue(v reflect.Value) (map[string]types.MessageAttri result[mKeyString] = *mValueProcessed } } - if len(result) == 0 { - return nil, nil - } return result, nil } @@ -173,9 +176,6 @@ func convertStructToMessageAttValue(t reflect.Type, v reflect.Value) (map[string result[fieldName] = *valueConverted } } - if len(result) == 0 { - return nil, nil - } return result, nil } diff --git a/sqs/queue_test.go b/sqs/queue_test.go index a3b29fc..732ce16 100644 --- a/sqs/queue_test.go +++ b/sqs/queue_test.go @@ -16,10 +16,11 @@ func TestCreateQueue(t *testing.T) { if (err != nil) != tt.wantErr { t.Errorf("CreateQueue() error = %v, wantErr %v", err, tt.wantErr) return + } else if output != nil && output.QueueUrl != nil { + _ = os.Setenv(SqsQueueCreateTestName, tt.queueName) + _ = os.Setenv(SqsQueueCreateTestUrl, *output.QueueUrl) + deleteQueueCreateTest() } - _ = os.Setenv(SqsQueueCreateTestName, tt.queueName) - _ = os.Setenv(SqsQueueCreateTestUrl, *output.QueueUrl) - deleteQueueCreateTest() }) } } @@ -81,6 +82,7 @@ func TestUntagQueue(t *testing.T) { } func TestPurgeQueue(t *testing.T) { + initQueueCreateTest() for _, tt := range initListTestPurgeQueue() { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)