diff --git a/README.md b/README.md index 0d8d30b..25b0a34 100644 --- a/README.md +++ b/README.md @@ -1 +1,399 @@ -# go-aws-sqs \ No newline at end of file +AWS SQS Template +================= + + + +[![Project status](https://img.shields.io/badge/version-v1.0.0-vividgreen.svg)](https://github.com/GabrielHCataldo/go-aws-sqs/releases/tag/v1.0.5) +[![Go Report Card](https://goreportcard.com/badge/github.com/GabrielHCataldo/go-aws-sqs)](https://goreportcard.com/report/github.com/GabrielHCataldo/go-aws-sqs) +[![Coverage Status](https://coveralls.io/repos/GabrielHCataldo/go-aws-sqs/badge.svg?branch=main&service=github)](https://coveralls.io/github/GabrielHCataldo/go-aws-sqs?branch=main) +[![Open Source Helpers](https://www.codetriage.com/gabrielhcataldo/go-aws-sqs/badges/users.svg)](https://www.codetriage.com/gabrielhcataldo/go-aws-sqs) +[![GoDoc](https://godoc.org/github/GabrielHCataldo/go-aws-sqs?status.svg)](https://pkg.go.dev/github.com/GabrielHCataldo/go-aws-sqs/sqs) +![License](https://img.shields.io/dub/l/vibe-d.svg) + +[//]: # ([![build workflow](https://github.com/GabrielHCataldo/go-aws-sqs/actions/workflows/go.yml/badge.svg)](https://github.com/GabrielHCataldo/go-aws-sqs/actions)) + +[//]: # ([![Source graph](https://sourcegraph.com/github.com/go-aws-sqs/sqs/-/badge.svg)](https://sourcegraph.com/github.com/go-aws-sqs/sqs?badge)) + +[//]: # ([![TODOs](https://badgen.net/https/api.tickgit.com/badgen/github.com/GabrielHCataldo/go-aws-sqs/sqs)](https://www.tickgit.com/browse?repo=github.com/GabrielHCataldo/go-aws-sqs)) + +The go-aws-sqs project came to facilitate the use of aws sqs in your go project with incredible flexibility in sending +messages to any type of body, and a fantastic and simple to use consumer. Below we list some implemented features: + +- Simplicity in message production, with auto conversion of body and message attributes +- Powerful customizable consumer, with auto conversion of body and message attributes. +- Automatic removal of the successfully consumed message (Optional). +- More simplistic function calls. +- Log visibility for the consumer. +- Asynchronous message production. +- Asynchronous message consumption. +- Don't worry about unwanted type conversions anymore. + +Installation +------------ + +Use go get. + + go get github.com/GabrielHCataldo/go-aws-sqs + +Then import the go-aws-sqs package into your own code. + +```go +import "github.com/GabrielHCataldo/go-aws-sqs/sqs" +``` + +Usability and documentation +------------ +**IMPORTANT**: Always check the documentation in the structures and functions fields. +For more details on the examples, visit [All examples link](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example) + +### Producer +To produce a message, it's simple, in the example below we will send a normal body text: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +func main() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := "body test" + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} +``` + +Output: + + [INFO 2023/12/15 07:22:34] main.go:33: message sent successfully: {"MD5OfMessageAttributes":null,"MD5OfMessageBody":"2e9cc74f6f6aca12eaa2d252df457910","MD5OfMessageSystemAttributes":null,"MessageId":"48276fb7-022d-4114-8282-b407d8fd4dd3","SequenceNumber":null,"ResultMetadata":{}} + +You can also pass any type of value, below we will pass the body as a structure: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" + "time" +) + +type test struct { + Name string `json:"name,omitempty"` + BirthDate time.Time `json:"birthDate,omitempty"` + Emails []string `json:"emails,omitempty"` + Bank bank `json:"bank,omitempty"` + Map map[string]any +} + +type bank struct { + Account string `json:"account,omitempty"` + Digits string `json:"digits,omitempty"` + Balance float64 `json:"balance,omitempty"` +} + +func main() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := initTestStruct() + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} +``` + +Output: + + [INFO 2023/12/15 07:30:42] main.go:45: message sent successfully: {"MD5OfMessageAttributes":null,"MD5OfMessageBody":"52e95a6a12e47e7ef6f63ff0ccfb77b6","MD5OfMessageSystemAttributes":null,"MessageId":"b43e19ca-48d9-47e8-8457-183242b86e1e","SequenceNumber":null,"ResultMetadata":{}} + +As an optional parameter, we have **DelaySeconds**, **MessageAttributes**, **MessageSystemAttributes** +among others, see below: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" + "time" +) + +func main() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := initTestStruct() + opt := option.NewProducer(). + // HTTP communication customization options with AWS SQS + SetHttpClient(option.HttpClient{}). + // print logs (default: false) + SetDebugMode(true). + // delay to delay the availability of message processing (default: 0) + SetDelaySeconds(5 * time.Second). + // Message attributes, must be of type Map or Struct, other types are not acceptable. + SetMessageAttributes(initTestMap()). + // The message system attribute to send + SetMessageSystemAttributes(option.MessageSystemAttributes{}). + // This parameter applies only to FIFO (first-in-first-out) queues. The token used for deduplication of sent messages. + SetMessageDeduplicationId(""). + // This parameter applies only to FIFO (first-in-first-out) queues. The tag that specifies that a message belongs to a specific message group. + SetMessageGroupId("") + + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body, opt) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} +``` + +Output: + + [INFO 2023/12/15 08:00:54] producer.go:40: getting client sqs.. + [INFO 2023/12/15 08:00:54] producer.go:42: preparing message input.. + [INFO 2023/12/15 08:00:54] producer.go:48: sending message.. + [INFO 2023/12/15 08:00:54] producer.go:53: message sent successfully: {"MD5OfMessageAttributes":"2a0b53405b3678d246c0a76b321e1cea","MD5OfMessageBody":"19554d258904ecb0a27b3cf238c2ecf2","MD5OfMessageSystemAttributes":null,"MessageId":"e08368e5-08b5-456b-a4f3-b6fb7862b893","SequenceNumber":null,"ResultMetadata":{}} + [INFO 2023/12/15 08:00:54] main.go:85: message sent successfully: {"MD5OfMessageAttributes":"2a0b53405b3678d246c0a76b321e1cea","MD5OfMessageBody":"19554d258904ecb0a27b3cf238c2ecf2","MD5OfMessageSystemAttributes":null,"MessageId":"e08368e5-08b5-456b-a4f3-b6fb7862b893","SequenceNumber":null,"ResultMetadata":{}} + +We can use all these examples mentioned above asynchronously, calling the function +**SendMessageAsync**, see: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +func main() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := "body test" + sqs.SendMessageAsync(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body) +} +``` + +For more producer examples visit: [All examples produce](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example/producer/main.go) + +### Consumer + +To consume messages from the queue, it is also very simple, you don't need to worry about writing loop +lines, conversion lines, log lines, etc., see a simple example below: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +func main() { + sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_STRING_URL"), handler) +} + +func handler(ctx *sqs.SimpleContext[string]) error { + logger.Debug("ctx simple to process message:", ctx) + return nil +} +``` + +Output: + + [DEBUG 2023/12/15 08:27:49] main.go:43: ctx simple to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test-string","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":"body test","Id":"d2b78dfe-8b68-4b7d-af59-15826c52515a","MD5OfBody":"2e9cc74f6f6aca12eaa2d252df457910","MD5OfMessageAttributes":null,"MessageAttributes":null,"ReceiptHandle":"AQEBG2Ek7PrGMDpnvCbTh3pX8U3I2AjkRyQW1e3QZmgZBmn/x4gB5l1Wd6S/c6NTlsuiIZ6qp3230ZrN2ogjWlTbX65jS3zTZfIsDBCrqf3+Zra31XthuZClEzSMf2qP2tpPFtX0Dm5De4YkoOaXm2mk3z0e201DSF5lZK3HK2ACA+ftc3UhlO7HMaGsTanAq+6QvWVLi49xVn5lUEfh+nEeOpBMywCLneIsBzUH9H/Jt62g3p10tYMFG+TAFiTfdwK2yWhwrVcFCOK8oDd+GA90v38vz0bIK8JcqvkRejMe3TUS5HocP+Adt5MW/ona/eLHIPVUqpuFULS7A1FIVIWSwnC4WSBpDOy8eNg/lESgawY9MmThb53h0jJiMp3IXgGTSTiF7qRZFimNIj8IjdFBNg=="}} + [DEBUG 2023/12/15 08:27:49] main.go:43: ctx simple to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test-string","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":"body test","Id":"d2b78dfe-8b68-4b7d-af59-15826c52515a","MD5OfBody":"2e9cc74f6f6aca12eaa2d252df457910","MD5OfMessageAttributes":null,"MessageAttributes":null,"ReceiptHandle":"AQEBfQYdy5nmL+BVJFslmID7vW8WyDnL83OGY8i4YfpfqaAM4xZ2+cwMIPsKoyOVZYII565b6I8XvoJmsZBK7adaGZtUk9r4+w9EZVfe3DCxB4Bt5gYbdb15Kjes8iQAQowuwCZ8e8FSnj4wXEBacPbaOLN0qo1afU3hU3kZ7ZION+0jfiup2AE3yK5/xR7vQtNqTZulNpzda5o1XICsdz/67iLQExhEJqbO3iSrclbzGVaggjPXYOlzx7iFeQrS2hcbQe5boZHCl/W8gC12TRPf/WX2OxNiU9UddY/nBPpy6FJG6I1HPmQla6OMznllE/f4yhjk1a5DzbSEDUjXaaalDFzUBYRbhpEFmb5NbNrsto/myP+QOhoU+DtgdL9iUQen83+MKJDpbCglmdpBYvoIKA=="}} + + +If you want to obtain the message attributes serialized in Map or Struct, you can call the function +**ReceiveMessage** passing in the second parameter of the handler context indicating the type you want to serialize +message attributes, see: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +type messageAttTest struct { + Name string `json:"account,omitempty"` + Text string `json:"text,omitempty"` + Balance float64 `json:"balance,omitempty"` + Bool bool `json:"bool"` + Int int `json:"int"` + SubStruct test `json:"subStruct,omitempty"` + PointerTest *test `json:"pointerBank,omitempty"` + Map map[string]any + Any any + EmptyString string `json:"emptyString,omitempty"` + HideString string `json:"-"` +} + +func main() { + sqs.ReceiveMessage(os.Getenv("SQS_QUEUE_TEST_STRING_URL"), handler) +} + +func handler(ctx *sqs.Context[test, messageAttTest]) error { + logger.Debug("ctx to process message:", ctx) + return nil +} +``` + +Output: + + [DEBUG 2023/12/15 08:19:08] main.go:43: ctx to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test-string","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":"body test","Id":"0fe8cca6-92ac-4a6d-98de-ec373f433266","MD5OfBody":"2e9cc74f6f6aca12eaa2d252df457910","MD5OfMessageAttributes":null,"MessageAttributes":{"Any":null,"Map":null,"account":"","balance":0,"bool":false,"emptyString":"","int":0,"subStruct":{"Map":null,"bank":{"account":"","balance":0,"digits":""},"birthDate":"0001-01-01T00:00:00Z","name":""},"text":""},"ReceiptHandle":"AQEBFaWjTuCHZcWH9JNlAJt0/cyJBwnK+K5yWVYpCvT+/EBsnSRkEuOp1G6YqSA5szJKc7EXULmDlUxWzQ4pYvCT9BhP6nk82blgcg3Kw5zQy/fym6vg46CdoqLwT0d9vAlnKOsvtkN/dGC6ze0lcSsGjSbUIujo3NC4uXzmflbuanaUIfPo7eju2LGYtw/eAgeVHIdMeH1kkrYfNfTl5iRQejSuSoaozq0V/E3+hfyNhBpmm+J2Bsays+AHvXEZpMNwouycwTpiBWpQzoybUJbXVwHSiQj539hAOGsrmJ2vG+ZQfYb19AClh0gdvoyKsuR6F4Qny0mGqOaGOqD2k/CiuTHkz+W/6nclTKTmaUlUogc+EHCC5qirnKq3i/pCd8UFWGV8PZ7vy48Deh/KYRvLJw=="}} + +We can consume messages with body of all types in all message receiving functions, +See below an example consuming a body with structure: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +type test struct { + Name string `json:"name,omitempty"` + BirthDate time.Time `json:"birthDate,omitempty"` + Emails []string `json:"emails,omitempty"` + Bank bank `json:"bank,omitempty"` + Map map[string]any +} + +type bank struct { + Account string `json:"account,omitempty"` + Digits string `json:"digits,omitempty"` + Balance float64 `json:"balance,omitempty"` +} + +func main() { + sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_URL"), handler) +} + +func handler(ctx *sqs.SimpleContext[test]) error { + logger.Debug("ctx simple body struct to process message:", ctx) + return nil +} +``` + +Output: + + [DEBUG 2023/12/15 09:08:39] main.go:47: ctx simple body struct to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":{"Map":{"bool":true,"float":1.23,"int":1,"string":"text test"},"bank":{"account":"123456","balance":200.12,"digits":"2"},"birthDate":"2023-12-15T09:08:25-03:00","emails":["test@gmail.com","gabriel@gmail.com","gabriel.test@gmail.com"],"name":"Test Name"},"Id":"ac141711-fcf0-4073-a206-78a8a8914756","MD5OfBody":"9d70de56f2ce5bb257a0e88a4ae5bb64","MD5OfMessageAttributes":null,"MessageAttributes":null,"ReceiptHandle":"AQEBOSU25xkDe6sED3CxRGKFrgwTcLpaHpwgsBUSUkG2VXzVmcKLTnXp1JtnbB9fS1S/C1s3R5XWyFfgAasNQyxuFpwYJ07O0ZrTzixElJxUPCrcOxt0glM7Oa7wHeiU4yzT4xfVy7nv4r+oOPcaLUgiWvk0pp6q72Uu/Y51zqIRbhlpRnG189FPbhExfjqvTWjqzHNUzWop5cCMVhqF7F8BKp1LXVADOfbtaBTaJX17lv5x+tHOjjMg8hecJNWAo7k9PZg3+b23Fxe3/k2scjayKFXdsjkpgWrow+VvzJP5FrZ6b/jTgFTTkU4hz6zLmrntRCSqZpwmdU1rb+tCDGH3cHLoZ61vl0KOcgmCLnrkrG3pU0jfP1zOeHzYm+EZV+75LAnbPtjqdN/iZGLqseotvg=="}} + +If you want to customize consumer roles, see example below: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +func main() { + opt := option.NewConsumer(). + // HTTP communication customization options with AWS SQS + SetHttpClient(option.HttpClient{}). + // print logs (default: false) + SetDebugMode(true). + // If true remove the message from the queue after successfully processed (handler error return is null) (default: false) + SetDeleteMessageProcessedSuccess(true). + // Duration time to process the message, timeout applied in the past context. (default: 5 seconds) + SetConsumerMessageTimeout(5 * time.Second). + // Delay to run the next search for messages in the queue (default: 0) + SetDelayQueryLoop(5 * time.Second). + // The maximum number of messages to return. 1 a 10 (default: 10) + SetMaxNumberOfMessages(10). + // The maximum number of messages to return. 1 a 10 (default: 0) + SetVisibilityTimeout(5 * time.Second). + // The duration that the received messages are hidden from subsequent + // retrieve requests after being retrieved by a ReceiveMessage request. + SetReceiveRequestAttemptId(""). + // The duration for which the call waits for a message to arrive in + // the queue before returning (default: 0) + SetWaitTimeSeconds(1 * time.Second) + + sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_URL"), handler, opt) +} + +func handler(ctx *sqs.SimpleContext[test]) error { + logger.Debug("ctx simple body struct to process message:", ctx) + return nil +} +``` + +Output: + + [INFO 2023/12/15 10:08:44] consumer.go:265: Run start find messages with options: {"Default":{"debugMode":true,"httpClient":{"APIOptions":null,"AppID":"","AuthSchemeResolver":null,"AuthSchemes":null,"BaseEndpoint":null,"ClientLogMode":0,"Credentials":null,"DefaultsMode":"","DisableMessageChecksumValidation":false,"EndpointOptions":{"DisableHTTPS":false,"LogDeprecated":false,"Logger":null,"ResolvedRegion":"","UseDualStackEndpoint":0,"UseFIPSEndpoint":0},"EndpointResolverV2":null,"HTTPClient":null,"HTTPSignerV4":null,"Logger":null,"Region":"","RetryMaxAttempts":0,"RetryMode":"","Retryer":null,"RuntimeEnvironment":{"EC2InstanceMetadataRegion":"","EnvironmentIdentifier":"","Region":""}}},"DeleteMessageProcessedSuccess":true,"ConsumerMessageTimeout":5000000000,"DelayQueryLoop":5000000000,"MaxNumberOfMessages":10,"VisibilityTimeout":5000000000,"ReceiveRequestAttemptId":null,"WaitTimeSeconds":1000000000} + [INFO 2023/12/15 10:08:44] consumer.go:244: Start process received messages size: 3 + [DEBUG 2023/12/15 10:08:44] main.go:77: ctx simple body struct to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":{"Map":{"balance":10.23,"bool":true,"emptyString":"","int":3,"name":"Name test producer","nil":null,"text":"Text field"},"bank":{"account":"123456","balance":200.12,"digits":"2"},"birthDate":"2023-12-15T09:51:48-03:00","emails":["test@gmail.com","gabriel@gmail.com","gabriel.test@gmail.com"],"name":"Test Name"},"Id":"8a41c561-173e-4c8d-b1a9-645e61cd1e85","MD5OfBody":"f2deace7b685238892eab686ce6d4932","MD5OfMessageAttributes":"7aea2eb88bd4799daf004990a6397c00","MessageAttributes":{"Map":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"}"},"account":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"Name test producer"},"balance":{"BinaryListValues":null,"BinaryValue":null,"DataType":"Number","StringListValues":null,"StringValue":"10.23"},"bool":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"true"},"int":{"BinaryListValues":null,"BinaryValue":null,"DataType":"Number","StringListValues":null,"StringValue":"3"},"pointerBank":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"name\":\"Test Name\",\"birthDate\":\"2023-12-15T09:51:48.015621-03:00\",\"emails\":[\"test@gmail.com\",\"gabriel@gmail.com\",\"gabriel.test@gmail.com\"],\"bank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Map\":{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"},\"emptyString\":\"\",\"pointerBank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Any\":null}"},"subStruct":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"name\":\"Test Name\",\"birthDate\":\"2023-12-15T09:51:48.015621-03:00\",\"emails\":[\"test@gmail.com\",\"gabriel@gmail.com\",\"gabriel.test@gmail.com\"],\"bank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Map\":{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"},\"emptyString\":\"\",\"pointerBank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Any\":null}"},"text":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"Text field"}},"ReceiptHandle":"AQEB8Cs8DmT2np6msZsgZq0BrL7HR5Q5imP4sK89UnwfvIKPLTKn5Ca+j0Cy1xDdSmFONpKtTvyhMDTIX5hIJYMXJmf45rZlf0msHYZ6GmBQoKRJbbDseAIFTmqhJTtsD8oXweBvjSAeVh5hw7kuZCW+bMQf2x6cQyk0WPw7oklHbGos+yVuyzn/OWeVx+7wgkF9E9ozfJfuf3lxTrygXOne7bB3q2unnM4luNIVgcD/Hxni6lIoe0RdOK8rpo7XrxdHWgUh64JcR2bQNZbnY6dPU0Tch6TEkb+sB7Yenvdkr2ZTdMClVhDvFqYjeReBawuLXamtVpiOp+dfTvCy8tEVvkZj0cIV+nCrj3nv2Nsz/jqkv/bM9FF4+O8lMYuHvskTz49SYxDmlwxp6vyMZTDROg=="}} + [DEBUG 2023/12/15 10:08:44] main.go:77: ctx simple body struct to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":{"Map":{"balance":10.23,"bool":true,"emptyString":"","int":3,"name":"Name test producer","nil":null,"text":"Text field"},"bank":{"account":"123456","balance":200.12,"digits":"2"},"birthDate":"2023-12-15T09:54:27-03:00","emails":["test@gmail.com","gabriel@gmail.com","gabriel.test@gmail.com"],"name":"Test Name"},"Id":"c3e1360c-2eb2-4326-b80e-85d69c8f32f3","MD5OfBody":"a599c012497c800aed1679812981d6da","MD5OfMessageAttributes":"e1fc8c890c4720384ffca66c432cc0ac","MessageAttributes":{"Map":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"}"},"account":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"Name test producer"},"balance":{"BinaryListValues":null,"BinaryValue":null,"DataType":"Number","StringListValues":null,"StringValue":"10.23"},"bool":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"true"},"int":{"BinaryListValues":null,"BinaryValue":null,"DataType":"Number","StringListValues":null,"StringValue":"3"},"pointerBank":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"name\":\"Test Name\",\"birthDate\":\"2023-12-15T09:54:27.558714-03:00\",\"emails\":[\"test@gmail.com\",\"gabriel@gmail.com\",\"gabriel.test@gmail.com\"],\"bank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Map\":{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"},\"emptyString\":\"\",\"pointerBank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Any\":null}"},"subStruct":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"name\":\"Test Name\",\"birthDate\":\"2023-12-15T09:54:27.558714-03:00\",\"emails\":[\"test@gmail.com\",\"gabriel@gmail.com\",\"gabriel.test@gmail.com\"],\"bank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Map\":{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"},\"emptyString\":\"\",\"pointerBank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Any\":null}"},"text":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"Text field"}},"ReceiptHandle":"AQEBiTWyaLxTUXk+sQbCogksDaphcYmOrNhlaUEk6DuREqbRL6/5Lu8tq6nMXAYCWDytUAH7CnNtuh8bXeeB89S4kl24pUSbKS3/OmOADichHPhgNteUANGxDvutZqonUjQW9DqwoqnEXJAapFSSW3vRBmcSOTOQBroDhunIz0BQzbr1ylkw6J11FU0iIzHMWA90vWQj5hIIhK5tZGTbhTIys+DIJLYqdvY4VQR2QRScVbKj4rSIrz7j0RE94+vkP1h8dB+XR1Guq7+E4N68batQq1oAA2SjDa/q3mYkYTtputb1sw55TqvW9boLetwqb98dQlcE6OoMN3vV3Rudl3ixLESNHXn7Od+mDKmHRKPm0irWhP05m0dHfTlVnPGU5dreFO/9DCJ4vEGZjTONbNpxTg=="}} + [DEBUG 2023/12/15 10:08:44] main.go:77: ctx simple body struct to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":{"Map":{"balance":10.23,"bool":true,"emptyString":"","int":3,"name":"Name test producer","nil":null,"text":"Text field"},"bank":{"account":"123456","balance":200.12,"digits":"2"},"birthDate":"2023-12-15T09:55:12-03:00","emails":["test@gmail.com","gabriel@gmail.com","gabriel.test@gmail.com"],"name":"Test Name"},"Id":"807ccb3f-7bfd-4e8a-83fd-18f461ce7f6b","MD5OfBody":"edcf363ca1c42c81c34fbb3586c4fbb6","MD5OfMessageAttributes":"e7b35866b4dbfa2075553540b18433b5","MessageAttributes":{"Map":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"}"},"account":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"Name test producer"},"balance":{"BinaryListValues":null,"BinaryValue":null,"DataType":"Number","StringListValues":null,"StringValue":"10.23"},"bool":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"true"},"int":{"BinaryListValues":null,"BinaryValue":null,"DataType":"Number","StringListValues":null,"StringValue":"3"},"pointerBank":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"name\":\"Test Name\",\"birthDate\":\"2023-12-15T09:55:12.111056-03:00\",\"emails\":[\"test@gmail.com\",\"gabriel@gmail.com\",\"gabriel.test@gmail.com\"],\"bank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Map\":{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"},\"emptyString\":\"\",\"pointerBank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Any\":null}"},"subStruct":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"{\"name\":\"Test Name\",\"birthDate\":\"2023-12-15T09:55:12.111056-03:00\",\"emails\":[\"test@gmail.com\",\"gabriel@gmail.com\",\"gabriel.test@gmail.com\"],\"bank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Map\":{\"balance\":10.23,\"bool\":true,\"emptyString\":\"\",\"int\":3,\"name\":\"Name test producer\",\"nil\":null,\"text\":\"Text field\"},\"emptyString\":\"\",\"pointerBank\":{\"account\":\"123456\",\"digits\":\"2\",\"balance\":200.12},\"Any\":null}"},"text":{"BinaryListValues":null,"BinaryValue":null,"DataType":"String","StringListValues":null,"StringValue":"Text field"}},"ReceiptHandle":"AQEBw7GpJmzuQjetJgl7VLNVy7ESboXzdTcprQoRxH/xV1aQewH4P5cXmgB8sY/Zoa/QoGSbzq42TbRaBOr3y+htzzv399ysn7vW9FlImpsXWQHgf/TOHr3L4Uv6hwr3KcWZzfSe0nRIzf+YQQzDBf0MayU+NWSfkbcyM1g2FCwByho+SFvLIi5RkYIGpGVh2IpUEP2Bfym5QKHBddV9bXR7Uf32xTQVhHDb+7LxJmhgCyBFawGiRh9ZVVMBP0maZMyHh1keOk7N5Z8v9zVvvzsOMagciMB2ysT9wmbO9kinuPHE683a+mKDdibmnu/Bo0tGTXp2Iv6ksT31nUdM05JwYdCtkoyH7ZU556sY2D/XChc0a+aOJ1PvfHAxe756MmglldU5fdmn30VVsfAzj8/VPQ=="}} + [INFO 2023/12/15 10:08:44] consumer.go:290: Finish process messages! processed: 3 success: ["807ccb3f-7bfd-4e8a-83fd-18f461ce7f6b"] failed: null + + +You can also consume messages asynchronously, see: + +```go +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-logger/logger" + "os" +) + +func main() { + sqs.SimpleReceiveMessageAsync(os.Getenv("SQS_QUEUE_TEST_URL"), handler, option.NewConsumer().SetDebugMode(true)) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + select { + case <-c: + logger.Info("Stopped application!") + } +} + +func handler(ctx *sqs.SimpleContext[test]) error { + logger.Debug("ctx simple body struct to process message:", ctx) + return nil +} +``` + +Output: + + [INFO 2023/12/15 10:12:52] consumer.go:265: Run start find messages with options: {"Default":{"debugMode":true},"DeleteMessageProcessedSuccess":false,"ConsumerMessageTimeout":5000000000,"DelayQueryLoop":5000000000,"MaxNumberOfMessages":10,"VisibilityTimeout":0,"ReceiveRequestAttemptId":null,"WaitTimeSeconds":0} + [INFO 2023/12/15 10:12:52] consumer.go:240: No msg available to be processed, searching again in 5s + [INFO 2023/12/15 10:12:57] consumer.go:244: Start process received messages size: 1 + [DEBUG 2023/12/15 10:12:57] main.go:88: ctx simple body struct to process message: {"QueueUrl":"https://sqs.sa-east-1.amazonaws.com/430896945629/go-aws-sqs-test","Message":{"Attributes":{"ApproximateFirstReceiveTimestamp":"0001-01-01T00:00:00Z","ApproximateReceiveCount":0,"MessageDeduplicationId":"","MessageGroupId":"","SenderId":"","SentTimestamp":"0001-01-01T00:00:00Z","SequenceNumber":0},"Body":{"Map":{"bool":true,"float":1.23,"int":1,"string":"text test"},"bank":{"account":"123456","balance":200.12,"digits":"2"},"birthDate":"2023-12-15T10:13:28-03:00","emails":["test@gmail.com","gabriel@gmail.com","gabriel.test@gmail.com"],"name":"Test Name"},"Id":"4ce4ed91-9b4b-4820-897b-9da5a8c6c630","MD5OfBody":"ad4c4de0c67e5eb5fb373b308fd38f53","MD5OfMessageAttributes":null,"MessageAttributes":null,"ReceiptHandle":"AQEB96WZYV/s91wxR1n8MUr39NllE4h+oVRZUN2WdCRXQoZEJ4DrV09kqyRRxnRxkyh9J/jTCX6sbyjKj6WzJ7YYkhrwr3ruQBW4BR/S2zz8b94waclJpfTaoq2fAa3SBl2/5nQhrCfsuFGfdDhIANlQNv9fyRMv4Vxsil9cjWauMXM2ilgsrcSnaDX2mRFzxDPzGhTnLJEIXOsJ+5nWb4ex5IVsYc81V8TEfs1c4dYmGc4PNs7s2MXtlXtDy2mOg+YnfgCT5evflCy3oN8qEXNglKqCDEuqqeQU2JXslN76zsObKG8ReZyB9PZIimfnhbM6AhIif5YbSfMX5ZylcyKZGF/9K8qwhAh51Lq3TBMxnHT+tOhslnov8MlECcWPjfQW2HdaXGnBGd/phV83MwlhVw=="}} + [INFO 2023/12/15 10:14:18] consumer.go:290: Finish process messages! processed: 1 success: ["4ce4ed91-9b4b-4820-897b-9da5a8c6c630"] failed: null + +For more consumer examples visit: [All examples consumer](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example/consumer/main.go) + +### For more examples + +- [Producer](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example/consumer/main.go) +- [Consumer](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example/consumer/main.go) +- [Queue](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example/queue/main.go) +- [Message](https://github/GabrielHCataldo/go-aws-sqs/blob/main/_example/message/main.go) + +How to contribute +------ +Make a pull request, or if you find a bug, open it +an Issues. + +License +------- +Distributed under MIT license, see the license file within the code for more details. \ No newline at end of file diff --git a/_example/consumer/main.go b/_example/consumer/main.go new file mode 100644 index 0000000..381f7e2 --- /dev/null +++ b/_example/consumer/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" + "github.com/GabrielHCataldo/go-logger/logger" + "os" + "os/signal" + "time" +) + +type test struct { + Name string `json:"name,omitempty"` + BirthDate time.Time `json:"birthDate,omitempty"` + Emails []string `json:"emails,omitempty"` + Bank bank `json:"bank,omitempty"` + Map map[string]any +} + +type bank struct { + Account string `json:"account,omitempty"` + Digits string `json:"digits,omitempty"` + Balance float64 `json:"balance,omitempty"` +} + +type messageAttTest struct { + Name string `json:"account,omitempty"` + Text string `json:"text,omitempty"` + Balance float64 `json:"balance,omitempty"` + Bool bool `json:"bool"` + Int int `json:"int"` + SubStruct test `json:"subStruct,omitempty"` + PointerTest *test `json:"pointerBank,omitempty"` + Map map[string]any + Any any + EmptyString string `json:"emptyString,omitempty"` + HideString string `json:"-"` +} + +func main() { + simpleReceiveMessage() + simpleReceiveMessageStruct() + receiveMessage() + completeOptions() + simpleReceiveMessageAsync() +} + +func simpleReceiveMessage() { + sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_STRING_URL"), handlerSimple) +} + +func simpleReceiveMessageStruct() { + sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_URL"), handler) +} + +func receiveMessage() { + sqs.ReceiveMessage(os.Getenv("SQS_QUEUE_TEST_STRING_URL"), handlerReceiveMessage) +} + +func completeOptions() { + opt := option.NewConsumer(). + // HTTP communication customization options with AWS SQS + SetHttpClient(option.HttpClient{}). + // print logs (default: false) + SetDebugMode(true). + // If true remove the message from the queue after successfully processed (handler error return is null) (default: false) + SetDeleteMessageProcessedSuccess(true). + // Duration time to process the message, timeout applied in the past context. (default: 5 seconds) + SetConsumerMessageTimeout(5 * time.Second). + // Delay to run the next search for messages in the queue (default: 0) + SetDelayQueryLoop(5 * time.Second). + // The maximum number of messages to return. 1 a 10 (default: 10) + SetMaxNumberOfMessages(10). + // The maximum number of messages to return. 1 a 10 (default: 0) + SetVisibilityTimeout(5 * time.Second). + // The duration that the received messages are hidden from subsequent + // retrieve requests after being retrieved by a ReceiveMessage request. + SetReceiveRequestAttemptId(""). + // The duration for which the call waits for a message to arrive in + // the queue before returning (default: 0) + SetWaitTimeSeconds(1 * time.Second) + sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_URL"), handler, opt) +} + +func simpleReceiveMessageAsync() { + sqs.SimpleReceiveMessageAsync(os.Getenv("SQS_QUEUE_TEST_URL"), handler, option.NewConsumer().SetDebugMode(true)) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + select { + case <-c: + logger.Info("Stopped application!") + } +} + +func handler(ctx *sqs.SimpleContext[test]) error { + logger.Debug("ctx simple body struct to process message:", ctx) + return nil +} + +func handlerSimple(ctx *sqs.SimpleContext[string]) error { + logger.Debug("ctx simple to process message:", ctx) + return nil +} + +func handlerReceiveMessage(ctx *sqs.Context[string, messageAttTest]) error { + logger.Debug("ctx to process message:", ctx) + return nil +} diff --git a/_example/producer/main.go b/_example/producer/main.go new file mode 100644 index 0000000..1d1ae1c --- /dev/null +++ b/_example/producer/main.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "github.com/GabrielHCataldo/go-aws-sqs/sqs" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" + "github.com/GabrielHCataldo/go-logger/logger" + "os" + "time" +) + +type test struct { + Name string `json:"name,omitempty"` + BirthDate time.Time `json:"birthDate,omitempty"` + Emails []string `json:"emails,omitempty"` + Bank bank `json:"bank,omitempty"` + Map map[string]any +} + +type bank struct { + Account string `json:"account,omitempty"` + Digits string `json:"digits,omitempty"` + Balance float64 `json:"balance,omitempty"` +} + +type messageAttTest struct { + Name string `json:"account,omitempty"` + Text string `json:"text,omitempty"` + Balance float64 `json:"balance,omitempty"` + Bool bool `json:"bool"` + Int int `json:"int"` + SubStruct test `json:"subStruct,omitempty"` + PointerTest *test `json:"pointerBank,omitempty"` + Map map[string]any + Any any + EmptyString string `json:"emptyString,omitempty"` + HideString string `json:"-"` +} + +func main() { + simple() + simpleAsync() + structBody() + mapBody() + completeOptions() +} + +func simple() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := "body test" + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_STRING_URL"), body) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} + +func simpleAsync() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := "body test" + sqs.SendMessageAsync(ctx, os.Getenv("SQS_QUEUE_TEST_STRING_URL"), body) +} + +func structBody() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := initTestStruct() + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} + +func mapBody() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := initTestMap() + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_STRING_URL"), body) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} + +func completeOptions() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + body := initTestStruct() + opt := option.NewProducer(). + // HTTP communication customization options with AWS SQS + SetHttpClient(option.HttpClient{}). + // print logs (default: false) + SetDebugMode(true). + // delay to delay the availability of message processing (default: 0) + SetDelaySeconds(5 * time.Second). + // Message attributes, must be of type Map or Struct, other types are not acceptable. + SetMessageAttributes(initMessageAttTest()). + // The message system attribute to send + SetMessageSystemAttributes(option.MessageSystemAttributes{}). + // This parameter applies only to FIFO (first-in-first-out) queues. The token used for deduplication of sent messages. + SetMessageDeduplicationId(""). + // This parameter applies only to FIFO (first-in-first-out) queues. The tag that specifies that a message belongs to a specific message group. + SetMessageGroupId("") + message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body, opt) + if err != nil { + logger.Error("error send message:", err) + } else { + logger.Info("message sent successfully:", message) + } +} + +func initTestStruct() test { + b := bank{ + Account: "123456", + Digits: "2", + Balance: 200.12, + } + return test{ + Name: "Test Name", + BirthDate: time.Now(), + Emails: []string{"test@gmail.com", "gabriel@gmail.com", "gabriel.test@gmail.com"}, + Bank: b, + Map: map[string]any{"int": 1, "bool": true, "float": 1.23, "string": "text test"}, + } +} + +func initTestMap() map[string]any { + return map[string]any{"int": 1, "bool": true, "float": 1.23, "string": "text test"} +} + +func initMessageAttTest() messageAttTest { + t := initTestStruct() + return messageAttTest{ + Name: "Name test producer", + Text: "Text field", + Balance: 10.23, + Bool: true, + Int: 3, + SubStruct: t, + PointerTest: &t, + Map: initTestMap(), + HideString: "hide test", + } +} diff --git a/go.mod b/go.mod index 66c6a24..3b7463e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ -module go-aws-sqs +module github.com/GabrielHCataldo/go-aws-sqs -go 1.21 +go 1.21.3 require ( github.com/GabrielHCataldo/go-logger v1.0.8 diff --git a/gopher-sqs.png b/gopher-sqs.png new file mode 100644 index 0000000..dbe73c2 Binary files /dev/null and b/gopher-sqs.png differ diff --git a/sqs/consumer.go b/sqs/consumer.go index e9955d7..1756d79 100644 --- a/sqs/consumer.go +++ b/sqs/consumer.go @@ -3,11 +3,11 @@ package sqs import ( "context" "fmt" + "github.com/GabrielHCataldo/go-aws-sqs/internal/client" + "github.com/GabrielHCataldo/go-aws-sqs/internal/util" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "go-aws-sqs/internal/client" - "go-aws-sqs/internal/util" - "go-aws-sqs/sqs/option" "reflect" "time" ) @@ -89,6 +89,11 @@ type HandlerConsumerFunc[Body, MessageAttributes any] func(ctx *Context[Body, Me // HandlerSimpleConsumerFunc is a function that consumes a message and returns an error if a failure occurs while processing the message. type HandlerSimpleConsumerFunc[Body any] func(ctx *SimpleContext[Body]) error +type channelMessageProcessed struct { + Err error + Signal *chan struct{} +} + var ctxInterrupt context.Context // ReceiveMessage Works as a repeating job, when triggered, it will fetch messages from the indicated queue @@ -238,6 +243,7 @@ func receiveMessage[Body, MessageAttributes any]( } loggerInfo(opt.DebugMode, "Start process received messages size:", len(output.Messages)) processMessages[Body, MessageAttributes](queueUrl, output, handler, opt) + time.Sleep(opt.DelayQueryLoop) } } @@ -275,26 +281,61 @@ func processMessages[Body, MessageAttributes any]( handler HandlerConsumerFunc[Body, MessageAttributes], opt option.Consumer, ) { - ctx, cancel := context.WithTimeout(context.TODO(), opt.ConsumerMessageTimeout) - defer cancel() var count int var mgsS, mgsF []string for _, message := range output.Messages { - nCtx, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message) - if err != nil { - loggerErr(opt.DebugMode, "error prepare context to consumer:", err) - return - } - err = handler(nCtx) - appendMessagesByResult(nCtx.Message.Id, err, mgsS, mgsF) - if opt.DeleteMessageProcessedSuccess { - go deleteMessage(queueUrl, *message.ReceiptHandle) - } + mgsS, mgsF = processMessage(queueUrl, handler, message, opt) count++ } loggerInfo(opt.DebugMode, "Finish process messages!", "processed:", count, "success:", mgsS, "failed:", mgsF) } +func processMessage[Body, MessageAttributes any]( + queueUrl string, + handler HandlerConsumerFunc[Body, MessageAttributes], + message types.Message, + opt option.Consumer, +) (mgsS, mgsF []string) { + ctx, cancel := context.WithTimeout(context.TODO(), opt.ConsumerMessageTimeout) + defer cancel() + ctxConsumer, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message) + if err != nil { + loggerErr(opt.DebugMode, "error prepare context to consumer:", err) + return + } + signal := make(chan struct{}, 1) + channel := channelMessageProcessed{ + Signal: &signal, + } + go processHandler(ctxConsumer, handler, opt, &channel) + select { + case <-ctx.Done(): + appendMessagesByResult(ctxConsumer.Message.Id, ctx.Err(), &mgsS, &mgsF) + break + case <-*channel.Signal: + appendMessagesByResult(*message.MessageId, channel.Err, &mgsS, &mgsF) + break + } + return mgsS, mgsF +} + +func processHandler[Body, MessageAttributes any]( + ctx *Context[Body, MessageAttributes], + handler HandlerConsumerFunc[Body, MessageAttributes], + opt option.Consumer, + channel *channelMessageProcessed, +) { + err := handler(ctx) + if ctx.Err() != nil { + return + } + if err == nil && opt.DeleteMessageProcessedSuccess { + go deleteMessage(ctx.QueueUrl, ctx.Message.ReceiptHandle) + } + channel.Err = err + *channel.Signal <- struct{}{} +} + func prepareContextConsumer[Body, MessageAttributes any]( ctx context.Context, queueUrl string, @@ -334,11 +375,11 @@ func prepareContextConsumer[Body, MessageAttributes any]( return ctxConsumer, nil } -func appendMessagesByResult(messageId string, err error, mgsS, mgsF []string) { +func appendMessagesByResult(messageId string, err error, mgsS, mgsF *[]string) { if err != nil { - mgsF = append(mgsF, messageId) + *mgsF = append(*mgsF, messageId) } else { - mgsS = append(mgsS, messageId) + *mgsS = append(*mgsS, messageId) } } diff --git a/sqs/log.go b/sqs/log.go index 4476127..d2d2431 100644 --- a/sqs/log.go +++ b/sqs/log.go @@ -4,12 +4,12 @@ import "github.com/GabrielHCataldo/go-logger/logger" func loggerInfo(debugMode bool, v ...any) { if debugMode { - logger.InfoSkipCaller(2, v...) + logger.InfoSkipCaller(3, v...) } } func loggerErr(debugMode bool, v ...any) { if debugMode { - logger.ErrorSkipCaller(2, v...) + logger.ErrorSkipCaller(3, v...) } } diff --git a/sqs/main_test.go b/sqs/main_test.go index 5bf2f0c..1c84f67 100644 --- a/sqs/main_test.go +++ b/sqs/main_test.go @@ -3,26 +3,26 @@ package sqs import ( "context" "errors" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" "github.com/GabrielHCataldo/go-logger/logger" - "go-aws-sqs/sqs/option" "os" "strconv" "testing" "time" ) -const SqsQueueTestName = "SQS_QUEUE_TEST_NAME" -const SqsQueueTestUrl = "SQS_QUEUE_TEST_URL" -const SqsQueueTestStringArn = "SQS_QUEUE_TEST_STRING_ARN" -const SqsQueueTestFifoUrl = "SQS_QUEUE_TEST_FIFO_URL" -const SqsQueueTestEmptyUrl = "SQS_QUEUE_TEST_EMPTY_URL" -const SqsQueueTestStringUrl = "SQS_QUEUE_TEST_STRING_URL" -const SqsQueueTestDlqArn = "SQS_QUEUE_TEST_DLQ_ARN" -const SqsQueueCreateTestName = "SQS_QUEUE_CREATE_TEST_NAME" -const SqsQueueCreateTestUrl = "SQS_QUEUE_CREATE_TEST_URL" -const SqsMessageId = "SQS_MESSAGE_ID" -const SqsMessageReceiptHandle = "SQS_MESSAGE_RECEIPT_HANDLE" -const SqsTaskHandle = "SQS_TASK_HANDLE" +const sqsQueueTestName = "SQS_QUEUE_TEST_NAME" +const sqsQueueTestUrl = "SQS_QUEUE_TEST_URL" +const sqsQueueTestStringArn = "SQS_QUEUE_TEST_STRING_ARN" +const sqsQueueTestFifoUrl = "SQS_QUEUE_TEST_FIFO_URL" +const sqsQueueTestEmptyUrl = "SQS_QUEUE_TEST_EMPTY_URL" +const sqsQueueTestStringUrl = "SQS_QUEUE_TEST_STRING_URL" +const sqsQueueTestDlqArn = "SQS_QUEUE_TEST_DLQ_ARN" +const sqsQueueCreateTestName = "SQS_QUEUE_CREATE_TEST_NAME" +const sqsQueueCreateTestUrl = "SQS_QUEUE_CREATE_TEST_URL" +const sqsMessageId = "SQS_MESSAGE_ID" +const sqsMessageReceiptHandle = "SQS_MESSAGE_RECEIPT_HANDLE" +const sqsTaskHandle = "SQS_TASK_HANDLE" type testProducer struct { name string @@ -220,7 +220,7 @@ func initListTestProducer() []testProducer { return []testProducer{ { name: "valid request", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes(&msgAttTest), @@ -232,7 +232,7 @@ func initListTestProducer() []testProducer { }, { name: "valid request fifo", - queueUrl: os.Getenv(SqsQueueTestFifoUrl), + queueUrl: os.Getenv(sqsQueueTestFifoUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetDebugMode(true), @@ -246,7 +246,7 @@ func initListTestProducer() []testProducer { }, { name: "valid request async", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes(initTestMap()), @@ -263,7 +263,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid message attributes", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes("test message string"), @@ -273,7 +273,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid system message attributes", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageSystemAttributes(option.MessageSystemAttributes{ @@ -285,7 +285,7 @@ func initListTestProducer() []testProducer { }, { name: "empty message attributes", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes(struct{}{}), @@ -295,7 +295,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid map message attributes", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: initTestStruct(), opts: []option.Producer{ option.NewProducer().SetMessageAttributes(initInvalidTestMap()), @@ -305,7 +305,7 @@ func initListTestProducer() []testProducer { }, { name: "invalid message body", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), v: "", wantErr: true, }, @@ -316,28 +316,28 @@ func initListTestConsumer[Body, MessageAttributes any]() []testConsumer[Body, Me return []testConsumer[Body, MessageAttributes]{ { name: "success", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success fifo", - queueUrl: os.Getenv(SqsQueueTestFifoUrl), + queueUrl: os.Getenv(sqsQueueTestFifoUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success error consumer", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), handler: initHandleConsumerWithErr[Body, MessageAttributes], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success async", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), async: true, @@ -345,7 +345,7 @@ func initListTestConsumer[Body, MessageAttributes any]() []testConsumer[Body, Me }, { name: "success empty", - queueUrl: os.Getenv(SqsQueueTestEmptyUrl), + queueUrl: os.Getenv(sqsQueueTestEmptyUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), async: false, @@ -353,7 +353,7 @@ func initListTestConsumer[Body, MessageAttributes any]() []testConsumer[Body, Me }, { name: "failed parse body", - queueUrl: os.Getenv(SqsQueueTestStringUrl), + queueUrl: os.Getenv(sqsQueueTestStringUrl), handler: initHandleConsumer[Body, MessageAttributes], opts: initOptionsConsumerDefault(), async: false, @@ -373,21 +373,21 @@ func initListTestSimpleConsumer[Body any]() []testSimpleConsumer[Body] { return []testSimpleConsumer[Body]{ { name: "success", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), handler: initSimpleHandleConsumer[Body], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success error consumer", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), handler: initSimpleHandleConsumerWithErr[Body], opts: initOptionsConsumerDefault(), wantErr: false, }, { name: "success async", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), handler: initSimpleHandleConsumer[Body], opts: initOptionsConsumerDefault(), async: true, @@ -395,12 +395,20 @@ func initListTestSimpleConsumer[Body any]() []testSimpleConsumer[Body] { }, { name: "failed parse body", - queueUrl: os.Getenv(SqsQueueTestStringUrl), + queueUrl: os.Getenv(sqsQueueTestStringUrl), handler: initSimpleHandleConsumer[Body], opts: initOptionsConsumerDefault(), async: false, wantErr: false, }, + { + name: "failed timeout", + queueUrl: os.Getenv(sqsQueueTestUrl), + handler: initSimpleHandleConsumerTimeout[Body], + opts: initOptionsConsumerDefault(), + async: false, + wantErr: false, + }, { name: "failed", queueUrl: "https://google.com/", @@ -432,7 +440,7 @@ func initListTestDeleteQueue() []testDeleteQueue { return []testDeleteQueue{ { name: "success", - queueUrl: os.Getenv(SqsQueueCreateTestUrl), + queueUrl: os.Getenv(sqsQueueCreateTestUrl), opts: initOptionsDefault(), wantErr: false, }, @@ -500,7 +508,7 @@ func initListTestPurgeQueue() []testPurgeQueue { return []testPurgeQueue{ { name: "success", - queueUrl: os.Getenv(SqsQueueCreateTestUrl), + queueUrl: os.Getenv(sqsQueueCreateTestUrl), opts: initOptionsDefault(), wantErr: false, }, @@ -551,7 +559,7 @@ func initListTestListQueueTags() []testListQueueTags { return []testListQueueTags{ { name: "success", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), opts: initOptionsDefault(), wantErr: false, }, @@ -568,7 +576,7 @@ func initListTestListDeadLetterSourceQueues() []testListDeadLetterSourceQueues { return []testListDeadLetterSourceQueues{ { name: "success", - queueUrl: os.Getenv(SqsQueueTestUrl), + queueUrl: os.Getenv(sqsQueueTestUrl), opts: initOptionsListDeadLetterSourceQueues(), wantErr: false, }, @@ -600,8 +608,8 @@ func initListTestDeleteMessage() []testDeleteMessage { return []testDeleteMessage{ { name: "success", - queueUrl: os.Getenv(SqsQueueTestStringUrl), - receiptHandle: os.Getenv(SqsMessageReceiptHandle), + queueUrl: os.Getenv(sqsQueueTestStringUrl), + receiptHandle: os.Getenv(sqsMessageReceiptHandle), opts: initOptionsDefault(), wantErr: false, }, @@ -620,11 +628,11 @@ func initListTestDeleteMessageBatch() []testDeleteMessageBatch { { name: "success", input: DeleteMessageBatchInput{ - QueueUrl: os.Getenv(SqsQueueTestStringUrl), + QueueUrl: os.Getenv(sqsQueueTestStringUrl), Entries: []DeleteMessageBatchRequestEntry{ { - Id: os.Getenv(SqsMessageId), - ReceiptHandle: os.Getenv(SqsMessageReceiptHandle), + Id: os.Getenv(sqsMessageId), + ReceiptHandle: os.Getenv(sqsMessageReceiptHandle), }, }, }, @@ -648,8 +656,8 @@ func initListTestChangeMessageVisibility() []testChangeMessageVisibility { { name: "success", input: ChangeMessageVisibilityInput{ - QueueUrl: os.Getenv(SqsQueueTestStringUrl), - ReceiptHandle: os.Getenv(SqsMessageReceiptHandle), + QueueUrl: os.Getenv(sqsQueueTestStringUrl), + ReceiptHandle: os.Getenv(sqsMessageReceiptHandle), VisibilityTimeout: 1, }, opts: initOptionsDefault(), @@ -671,11 +679,11 @@ func initListTestChangeMessageVisibilityBatch() []testChangeMessageVisibilityBat { name: "success", input: ChangeMessageVisibilityBatchInput{ - QueueUrl: os.Getenv(SqsQueueTestStringUrl), + QueueUrl: os.Getenv(sqsQueueTestStringUrl), Entries: []ChangeMessageVisibilityBatchRequestEntry{ { - Id: os.Getenv(SqsMessageId), - ReceiptHandle: os.Getenv(SqsMessageReceiptHandle), + Id: os.Getenv(sqsMessageId), + ReceiptHandle: os.Getenv(sqsMessageReceiptHandle), VisibilityTimeout: 3, }, }, @@ -717,7 +725,7 @@ func initListTestCancelMessageMoveTask() []testCancelMessageMoveTask { return []testCancelMessageMoveTask{ { name: "success", - taskHandle: os.Getenv(SqsTaskHandle), + taskHandle: os.Getenv(sqsTaskHandle), opts: initOptionsDefault(), wantErr: false, }, @@ -734,7 +742,7 @@ func initListTestListMessageMoveTask() []testListMessageMoveTask { return []testListMessageMoveTask{ { name: "success", - sourceArn: os.Getenv(SqsQueueTestStringArn), + sourceArn: os.Getenv(sqsQueueTestStringArn), opts: initOptionsListMessageMoveTasks(), wantErr: false, }, @@ -811,6 +819,12 @@ func initSimpleHandleConsumer[Body any](ctx *SimpleContext[Body]) error { return nil } +func initSimpleHandleConsumerTimeout[Body any](ctx *SimpleContext[Body]) error { + logger.Debug("ctx:", ctx) + time.Sleep(30 * time.Second) + return nil +} + func initHandleConsumerWithErr[Body, MessageAttributes any](ctx *Context[Body, MessageAttributes]) error { logger.Debug("ctx:", ctx) return initErrorConsumer() @@ -827,35 +841,35 @@ func initErrorConsumer() error { func initTagQueueInput() TagQueueInput { return TagQueueInput{ - QueueUrl: os.Getenv(SqsQueueCreateTestUrl), + QueueUrl: os.Getenv(sqsQueueCreateTestUrl), Tags: initTagsQueue(), } } func initSetQueueAttributesInput() SetQueueAttributesInput { return SetQueueAttributesInput{ - QueueUrl: os.Getenv(SqsQueueCreateTestUrl), + QueueUrl: os.Getenv(sqsQueueCreateTestUrl), Attributes: initAttributesQueue("900"), } } func initUntagQueueInput() UntagQueueInput { return UntagQueueInput{ - QueueUrl: os.Getenv(SqsQueueCreateTestUrl), + QueueUrl: os.Getenv(sqsQueueCreateTestUrl), TagKeys: []string{"tag-test"}, } } func initGetQueueUrlInput() GetQueueUrlInput { return GetQueueUrlInput{ - QueueName: os.Getenv(SqsQueueTestName), + QueueName: os.Getenv(sqsQueueTestName), QueueOwnerAWSAccountId: nil, } } func initGetQueueAttributesInput() GetQueueAttributesInput { return GetQueueAttributesInput{ - QueueUrl: os.Getenv(SqsQueueTestUrl), + QueueUrl: os.Getenv(sqsQueueTestUrl), AttributeNames: nil, } } @@ -871,7 +885,7 @@ func initAttributesQueue(delaySeconds string) map[string]string { func initStartMessageMoveTaskInput() StartMessageMoveTaskInput { maxNumberOfMessagesPerSecond := int32(1) return StartMessageMoveTaskInput{ - SourceArn: os.Getenv(SqsQueueTestDlqArn), + SourceArn: os.Getenv(sqsQueueTestDlqArn), DestinationArn: nil, MaxNumberOfMessagesPerSecond: &maxNumberOfMessagesPerSecond, } @@ -979,16 +993,16 @@ func initMessageString() { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() opt := option.NewProducer().SetMessageAttributes(initMessageAttTest()) - output, err := SendMessage(ctx, os.Getenv(SqsQueueTestStringUrl), "test body string", opt) + output, err := SendMessage(ctx, os.Getenv(sqsQueueTestStringUrl), "test body string", opt) if err != nil { logger.Error("error send message:", err) return } - _ = os.Setenv(SqsMessageId, *output.MessageId) + _ = os.Setenv(sqsMessageId, *output.MessageId) } func initMessageStruct(queueUrl string) { - if queueUrl != os.Getenv(SqsQueueTestUrl) && queueUrl != os.Getenv(SqsQueueTestFifoUrl) { + if queueUrl != os.Getenv(sqsQueueTestUrl) && queueUrl != os.Getenv(sqsQueueTestFifoUrl) { return } ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) @@ -1010,8 +1024,8 @@ func initMessageReceiptHandle() { SetMaxNumberOfMessages(1). SetVisibilityTimeout(0). SetDeleteMessageProcessedSuccess(false) - SimpleReceiveMessage[any](os.Getenv(SqsQueueTestStringUrl), func(ctx *SimpleContext[any]) error { - _ = os.Setenv(SqsMessageReceiptHandle, ctx.Message.ReceiptHandle) + SimpleReceiveMessage[any](os.Getenv(sqsQueueTestStringUrl), func(ctx *SimpleContext[any]) error { + _ = os.Setenv(sqsMessageReceiptHandle, ctx.Message.ReceiptHandle) cancel() return nil }, opt) @@ -1028,8 +1042,8 @@ func initQueueCreateTest() { logger.Error("error create test-queue:", err) return } - _ = os.Setenv(SqsQueueCreateTestName, name) - _ = os.Setenv(SqsQueueCreateTestUrl, *output.QueueUrl) + _ = os.Setenv(sqsQueueCreateTestName, name) + _ = os.Setenv(sqsQueueCreateTestUrl, *output.QueueUrl) } func initStartMessageMoveTask() { @@ -1041,40 +1055,40 @@ func initStartMessageMoveTask() { logger.Error("error start message move task:", err) return } - _ = os.Setenv(SqsTaskHandle, *output.TaskHandle) + _ = os.Setenv(sqsTaskHandle, *output.TaskHandle) } func deleteQueueCreateTest() { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() - url := os.Getenv(SqsQueueCreateTestUrl) + url := os.Getenv(sqsQueueCreateTestUrl) if len(url) == 0 { return } - _, _ = DeleteQueue(ctx, os.Getenv(SqsQueueCreateTestUrl)) + _, _ = DeleteQueue(ctx, os.Getenv(sqsQueueCreateTestUrl)) } func purgeQueues() { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() - urlString := os.Getenv(SqsQueueTestStringUrl) - urlFifo := os.Getenv(SqsQueueTestFifoUrl) + urlString := os.Getenv(sqsQueueTestStringUrl) + urlFifo := os.Getenv(sqsQueueTestFifoUrl) if len(urlString) != 0 { - _, _ = PurgeQueue(ctx, os.Getenv(SqsQueueTestStringUrl)) + _, _ = PurgeQueue(ctx, os.Getenv(sqsQueueTestStringUrl)) } if len(urlFifo) != 0 { - _, _ = PurgeQueue(ctx, os.Getenv(SqsQueueTestFifoUrl)) + _, _ = PurgeQueue(ctx, os.Getenv(sqsQueueTestFifoUrl)) } } func cancelMessageMoveTaskTest() { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() - taskHandle := os.Getenv(SqsTaskHandle) + taskHandle := os.Getenv(sqsTaskHandle) if len(taskHandle) == 0 { return } - _, _ = CancelMessageMoveTask(ctx, os.Getenv(SqsTaskHandle)) + _, _ = CancelMessageMoveTask(ctx, os.Getenv(sqsTaskHandle)) } diff --git a/sqs/message.go b/sqs/message.go index ebd53df..f118a14 100644 --- a/sqs/message.go +++ b/sqs/message.go @@ -2,11 +2,11 @@ package sqs import ( "context" + "github.com/GabrielHCataldo/go-aws-sqs/internal/client" + "github.com/GabrielHCataldo/go-aws-sqs/internal/util" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "go-aws-sqs/internal/client" - "go-aws-sqs/internal/util" - "go-aws-sqs/sqs/option" "time" ) diff --git a/sqs/message_test.go b/sqs/message_test.go index 83490b9..7bb4023 100644 --- a/sqs/message_test.go +++ b/sqs/message_test.go @@ -74,7 +74,7 @@ func TestStartMessageMoveTask(t *testing.T) { t.Errorf("StartMessageMoveTask() error = %v, wantErr %v", err, tt.wantErr) return } else if output != nil && output.TaskHandle != nil { - _ = os.Setenv(SqsTaskHandle, *output.TaskHandle) + _ = os.Setenv(sqsTaskHandle, *output.TaskHandle) cancelMessageMoveTaskTest() } }) diff --git a/sqs/option/consumer.go b/sqs/option/consumer.go index 3401d7b..47e8736 100644 --- a/sqs/option/consumer.go +++ b/sqs/option/consumer.go @@ -4,10 +4,16 @@ import "time" type Consumer struct { Default + // If true remove the message from the queue after successfully processed (handler error return is null) + // // default: false DeleteMessageProcessedSuccess bool + // Duration time to process the message, timeout applied in the past context. + // // default: 5 seconds ConsumerMessageTimeout time.Duration + // Delay to run the next search for messages in the queue + // // default: 5 seconds DelayQueryLoop time.Duration // The maximum number of messages to return. Amazon SQS never returns more diff --git a/sqs/option/producer.go b/sqs/option/producer.go index 58b515d..ea20f62 100644 --- a/sqs/option/producer.go +++ b/sqs/option/producer.go @@ -1,7 +1,8 @@ package option import ( - "go-aws-sqs/internal/util" + "github.com/GabrielHCataldo/go-aws-sqs/internal/util" + "reflect" "time" ) @@ -32,7 +33,7 @@ type Producer struct { // - Every message must have a unique MessageDeduplicationId , // - You may provide a MessageDeduplicationId explicitly. // - If you aren't able to provide a MessageDeduplicationId and you enable - // ContentBasedDeduplication for your queue, Amazon SQS uses a SHA-256 hash to + // ContentBasedDeduplication for your queue, Amazon SQS uses an SHA-256 hash to // generate the MessageDeduplicationId using the body of the message (but not the // attributes of the message). // - If you don't provide a MessageDeduplicationId and the queue doesn't have @@ -136,7 +137,9 @@ func GetProducerByParams(opts []Producer) Producer { if util.IsValidType(opt.MessageAttributes) { result.MessageAttributes = opt.MessageAttributes } - if util.IsValidType(opt.MessageSystemAttributes) { + if util.IsValidType(opt.MessageSystemAttributes) && + !util.IsZeroReflect(reflect.ValueOf(opt.MessageSystemAttributes)) && + len(opt.MessageSystemAttributes.AWSTraceHeader) != 0 { result.MessageSystemAttributes = opt.MessageSystemAttributes } if opt.MessageDeduplicationId != nil && len(*opt.MessageDeduplicationId) != 0 { diff --git a/sqs/producer.go b/sqs/producer.go index 9fd93f9..c2eacef 100644 --- a/sqs/producer.go +++ b/sqs/producer.go @@ -3,12 +3,12 @@ package sqs import ( "context" "errors" + "github.com/GabrielHCataldo/go-aws-sqs/internal/client" + "github.com/GabrielHCataldo/go-aws-sqs/internal/util" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "go-aws-sqs/internal/client" - "go-aws-sqs/internal/util" - "go-aws-sqs/sqs/option" "reflect" ) @@ -28,19 +28,19 @@ import ( // // - ctx (Context): The context of the request. // - queueUrl (string): The URL of the SQS queue. -// - v (any): The content of the message. +// - body (any): The content of the message. // - opts (option.Producer): Optional options to customize the message. (see OptionsProducer declaration for available options) // // # Returns: // // - *sqs.SendMessageOutput: The result of the SendMessage operation. // - error: An error if one occurs during the SendMessage operation. -func SendMessage(ctx context.Context, queueUrl string, v any, opts ...option.Producer) (*sqs.SendMessageOutput, error) { +func SendMessage(ctx context.Context, queueUrl string, body any, opts ...option.Producer) (*sqs.SendMessageOutput, error) { opt := option.GetProducerByParams(opts) loggerInfo(opt.DebugMode, "getting client sqs..") sqsClient := client.GetClient(ctx) loggerInfo(opt.DebugMode, "preparing message input..") - input, err := prepareMessageInput(queueUrl, v, opt) + input, err := prepareMessageInput(queueUrl, body, opt) if err != nil { loggerErr(opt.DebugMode, "error prepare message input:", err) return nil, err @@ -67,18 +67,18 @@ func SendMessage(ctx context.Context, queueUrl string, v any, opts ...option.Pro // // - ctx (Context): The context of the request. // - queueUrl (string): The URL of the SQS queue. -// - v (any): The content of the message. +// - body (any): The content of the message. // - opts (OptionsProducer): Optional options to customize the message. (see OptionsProducer declaration for available options) // // # Example usage: // // SendMessageAsync(ctx, queueUrl, v, opts...) -func SendMessageAsync(ctx context.Context, queueUrl string, v any, opts ...option.Producer) { - go sendMessageAsync(ctx, queueUrl, v, opts...) +func SendMessageAsync(ctx context.Context, queueUrl string, body any, opts ...option.Producer) { + go sendMessageAsync(ctx, queueUrl, body, opts...) } -func sendMessageAsync(ctx context.Context, queueUrl string, v any, opts ...option.Producer) { - _, _ = SendMessage(ctx, queueUrl, v, opts...) +func sendMessageAsync(ctx context.Context, queueUrl string, body any, opts ...option.Producer) { + _, _ = SendMessage(ctx, queueUrl, body, opts...) } func prepareMessageInput(queueUrl string, v any, opt option.Producer) (*sqs.SendMessageInput, error) { diff --git a/sqs/queue.go b/sqs/queue.go index 2fc60bb..0b707cd 100644 --- a/sqs/queue.go +++ b/sqs/queue.go @@ -2,10 +2,10 @@ package sqs import ( "context" + "github.com/GabrielHCataldo/go-aws-sqs/internal/client" + "github.com/GabrielHCataldo/go-aws-sqs/sqs/option" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "go-aws-sqs/internal/client" - "go-aws-sqs/sqs/option" ) type TagQueueInput struct { diff --git a/sqs/queue_test.go b/sqs/queue_test.go index 732ce16..5cde22d 100644 --- a/sqs/queue_test.go +++ b/sqs/queue_test.go @@ -17,8 +17,8 @@ func TestCreateQueue(t *testing.T) { t.Errorf("CreateQueue() error = %v, wantErr %v", err, tt.wantErr) return } else if output != nil && output.QueueUrl != nil { - _ = os.Setenv(SqsQueueCreateTestName, tt.queueName) - _ = os.Setenv(SqsQueueCreateTestUrl, *output.QueueUrl) + _ = os.Setenv(sqsQueueCreateTestName, tt.queueName) + _ = os.Setenv(sqsQueueCreateTestUrl, *output.QueueUrl) deleteQueueCreateTest() } })