diff --git a/sqs/consumer.go b/sqs/consumer.go index 55c90e4..e9955d7 100644 --- a/sqs/consumer.go +++ b/sqs/consumer.go @@ -12,6 +12,8 @@ import ( "time" ) +// MessageReceived represents a received message from Amazon Simple Queue Service (SQS). +// It contains information about the message, such as its ID, receipt handle, body, attributes, type MessageReceived[Body, MessageAttributes any] struct { // A unique identifier for the message. An Id is considered unique across all // Amazon Web Services accounts for an extended period of time. @@ -20,8 +22,19 @@ type MessageReceived[Body, MessageAttributes any] struct { // handler is returned every time you receive a message. When deleting a message, // you provide the last received receipt handler to delete the message. ReceiptHandle string - // The message's contents (not URL-encoded). - Body Body + // Message body converted + Body Body + // Supported attributes: + // - ApproximateReceiveCount + // - ApproximateFirstReceiveTimestamp + // - MessageDeduplicationId + // - MessageGroupId + // - SenderId + // - SentTimestamp + // - SequenceNumber + // ApproximateFirstReceiveTimestamp and SentTimestamp are each returned as an + // integer representing the epoch time (http://en.wikipedia.org/wiki/Unix_time) in + // milliseconds. Attributes Attributes // An MD5 digest of the non-URL-encoded message body string. MD5OfBody string @@ -30,35 +43,74 @@ type MessageReceived[Body, MessageAttributes any] struct { // URL-decodes the message before creating the MD5 digest. For information about // MD5, see RFC1321 (https://www.ietf.org/rfc/rfc1321.txt). MD5OfMessageAttributes *string - // Each message attribute consists of a Name, Type, and Value. For more - // information, see Amazon SQS message attributes (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes) - // in the Amazon SQS Developer Guide. + // Converted message attributes MessageAttributes MessageAttributes } +// Attributes represents the various attributes associated with a received message from Amazon Simple Queue Service (SQS). +// It includes information such as the approximate receipt count, the approximate timestamp of the first receipt, +// message deduplication ID (FIFO), message group ID (FIFO), sender ID, sent timestamp, and sequence number. type Attributes struct { - ApproximateReceiveCount int + // Approximate receipt count + ApproximateReceiveCount int + // Approximate timestamp of first receipt ApproximateFirstReceiveTimestamp time.Time - MessageDeduplicationId string - MessageGroupId string - SenderId string - SentTimestamp time.Time - SequenceNumber int + // Message deduplication ID (FIFO) + MessageDeduplicationId string + // Message group ID (FIFO) + MessageGroupId string + // sender ID + SenderId string + // Timestamp sent + SentTimestamp time.Time + // Sequence Number + SequenceNumber int } +// Context represents the execution context of a consumer handler in the package. +// It contains the necessary information for handling the message in the queue. type Context[Body, MessageAttributes any] struct { - context.Context + // context used to process the message, may have a timeout if you pass the option.Consumer.ConsumerMessageTimeout + // as a parameter + context.Context `json:"-"` + // message queue url QueueUrl string - Message MessageReceived[Body, MessageAttributes] + // converted message to process + Message MessageReceived[Body, MessageAttributes] } +// SimpleContext represents the execution context of a consumer handler in the package. +// It contains the necessary information for handling the message in the queue. type SimpleContext[Body any] Context[Body, map[string]types.MessageAttributeValue] +// HandlerConsumerFunc is a function that consumes a message and returns an error if a failure occurs while processing the message. type HandlerConsumerFunc[Body, MessageAttributes any] func(ctx *Context[Body, MessageAttributes]) error + +// HandlerSimpleConsumerFunc is a function that consumes a message and returns an error if a failure occurs while processing the message. type HandlerSimpleConsumerFunc[Body any] func(ctx *SimpleContext[Body]) error var ctxInterrupt context.Context +// ReceiveMessage Works as a repeating job, when triggered, it will fetch messages from the indicated queue +// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages again, +// otherwise, it will process these messages converting them into Context with the types of Body and MessageAttributes +// passed explicitly in the function, converted to the Context.Message.Body and Context.Message.MessageAttributes field, +// if you don't use MessageAttributes, you can use the SimpleReceiveMessage function. After conversion, +// we call the handler parameter so that this function processes message by message, in this handler we expect a return +// error, if no error occurred when processing the message, we have the auto-delete option +// (option.Consumer.DeleteMessageProcessedSuccess) if you have it enabled, we will remove the message from the file for you. +// finally, when processing all messages, we return to the initial flow looking for new messages. +// +// # Parameters +// +// - queueUrl: url of the queue where you want to fetch messages +// - handler: function to process the received message +// - opts: list of option.Consumer to customize the job +// +// # Panic +// +// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned +// from AWS SQS. func ReceiveMessage[Body, MessageAttributes any]( queueUrl string, handler HandlerConsumerFunc[Body, MessageAttributes], @@ -67,6 +119,28 @@ func ReceiveMessage[Body, MessageAttributes any]( receiveMessage(queueUrl, handler, option.GetConsumerByParams(opts)) } +// ReceiveMessageAsync Works like a repeating job, when triggered, it will fetch messages from the indicated queue +// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages again, +// otherwise, it will process these messages converting them into Context with the types of Body and MessageAttributes +// passed explicitly in the function, converted to the Context.Message.Body and Context.Message.MessageAttributes field, +// if you don't use MessageAttributes, you can use the SimpleReceiveMessageAsync function. After conversion, +// we call the handler parameter so that this function processes message by message, in this handler we expect a return +// error, if no error occurred when processing the message, we have the auto-delete option +// (option.Consumer.DeleteMessageProcessedSuccess) where if enabled, we remove the message from the queue for you. +// finally, when processing all messages, we return to the initial flow looking for new messages. +// +// Unlike ReceiveMessage, this function will be processed asynchronously. +// +// # Parameters +// +// - queueUrl: url of the queue where you want to fetch messages +// - handler: function to process the received message +// - opts: list of options.Consumer to customize the job +// +// # Panic +// +// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned +// from AWS SQS. func ReceiveMessageAsync[Body, MessageAttributes any]( queueUrl string, handler HandlerConsumerFunc[Body, MessageAttributes], @@ -75,6 +149,25 @@ func ReceiveMessageAsync[Body, MessageAttributes any]( go receiveMessage(queueUrl, handler, option.GetConsumerByParams(opts)) } +// SimpleReceiveMessage Works as a repeating job, when triggered, it will fetch messages from the indicated queue +// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages +// again, otherwise, it will process these messages converting them to Context with the Body type passed explicitly +// in the function, converted to the Context.Message.Body field. After the conversion, we call the handler parameter +// so that this function processes message by message, in this handler we expect an error return, if none has occurred +// error processing the message, we have the auto-delete option (option.Consumer.DeleteMessageProcessedSuccess) in which case +// is enabled, we remove the message from the queue for you. Finally, after processing all messages, we return to the flow +// initial searching for new messages. +// +// # Parameters +// +// - queueUrl: url of the queue where you want to fetch messages +// - handler: function to process the received message +// - opts: list of options.Consumer to customize the job +// +// # Panic +// +// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned +// from AWS SQS. func SimpleReceiveMessage[Body any]( queueUrl string, simpleHandle HandlerSimpleConsumerFunc[Body], @@ -84,6 +177,27 @@ func SimpleReceiveMessage[Body any]( receiveMessage(queueUrl, handler, option.GetConsumerByParams(opts)) } +// SimpleReceiveMessageAsync Works like a repeating job, when triggered, it will fetch messages from the indicated queue +// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages again, +// otherwise, it will process these messages converting them to Context with the Body type passed explicitly +// in the function, converted to the Context.Message.Body field. After the conversion, we call the handler parameter so +// that this function processes message by message, in this handler we expect an error return, if none has occurred +// error processing the message, we have the auto-delete option (option.Consumer.DeleteMessageProcessedSuccess) in which case +// is enabled, we remove the message from the queue for you. Finally, after processing all messages, we return to the flow +// initial searching for new messages. +// +// Unlike SimpleReceiveMessage, this function will be processed asynchronously. +// +// # Parameters +// +// - queueUrl: url of the queue where you want to fetch messages +// - handler: function to process the received message +// - opts: list of options.Consumer to customize the job +// +// # Panic +// +// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned +// from AWS SQS. func SimpleReceiveMessageAsync[Body any]( queueUrl string, simpleHandle HandlerSimpleConsumerFunc[Body], @@ -166,7 +280,7 @@ func processMessages[Body, MessageAttributes any]( var count int var mgsS, mgsF []string for _, message := range output.Messages { - nCtx, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message, opt) + nCtx, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message) if err != nil { loggerErr(opt.DebugMode, "error prepare context to consumer:", err) return @@ -185,7 +299,6 @@ func prepareContextConsumer[Body, MessageAttributes any]( ctx context.Context, queueUrl string, message types.Message, - opt option.Consumer, ) (*Context[Body, MessageAttributes], error) { ctxConsumer := &Context[Body, MessageAttributes]{ Context: ctx, @@ -211,7 +324,7 @@ func prepareContextConsumer[Body, MessageAttributes any]( if util.IsMapMessageAttributeValues(messagesAttributes) { messageReceived.MessageAttributes = any(message.MessageAttributes).(MessageAttributes) } else { - convertMessageAttributes[MessageAttributes](message.MessageAttributes, &messagesAttributes, opt) + convertMessageAttributes[MessageAttributes](message.MessageAttributes, &messagesAttributes) messageReceived.MessageAttributes = messagesAttributes } } @@ -229,7 +342,7 @@ func appendMessagesByResult(messageId string, err error, mgsS, mgsF []string) { } } -func convertMessageAttributes[T any](messageAttributes map[string]types.MessageAttributeValue, dest *T, opt option.Consumer) { +func convertMessageAttributes[T any](messageAttributes map[string]types.MessageAttributeValue, dest *T) { m := map[string]any{} for k, v := range messageAttributes { var valueProcessed any diff --git a/sqs/error.go b/sqs/error.go index ae67732..17b95b9 100644 --- a/sqs/error.go +++ b/sqs/error.go @@ -4,4 +4,3 @@ import "errors" var ErrMessageBodyEmpty = errors.New("sqs: no message body passed") var ErrParseBody = errors.New("sqs: message parse body failed") -var ErrInvalidBodyType = errors.New("sqs: invalid type body")