Skip to content

Commit

Permalink
Finish consumer documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel Cataldo committed Dec 14, 2023
1 parent 3207d80 commit a5d4b07
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 18 deletions.
147 changes: 130 additions & 17 deletions sqs/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"time"
)

// MessageReceived represents a received message from Amazon Simple Queue Service (SQS).
// It contains information about the message, such as its ID, receipt handle, body, attributes,
type MessageReceived[Body, MessageAttributes any] struct {
// A unique identifier for the message. An Id is considered unique across all
// Amazon Web Services accounts for an extended period of time.
Expand All @@ -20,8 +22,19 @@ type MessageReceived[Body, MessageAttributes any] struct {
// handler is returned every time you receive a message. When deleting a message,
// you provide the last received receipt handler to delete the message.
ReceiptHandle string
// The message's contents (not URL-encoded).
Body Body
// Message body converted
Body Body
// Supported attributes:
// - ApproximateReceiveCount
// - ApproximateFirstReceiveTimestamp
// - MessageDeduplicationId
// - MessageGroupId
// - SenderId
// - SentTimestamp
// - SequenceNumber
// ApproximateFirstReceiveTimestamp and SentTimestamp are each returned as an
// integer representing the epoch time (http://en.wikipedia.org/wiki/Unix_time) in
// milliseconds.
Attributes Attributes
// An MD5 digest of the non-URL-encoded message body string.
MD5OfBody string
Expand All @@ -30,35 +43,74 @@ type MessageReceived[Body, MessageAttributes any] struct {
// URL-decodes the message before creating the MD5 digest. For information about
// MD5, see RFC1321 (https://www.ietf.org/rfc/rfc1321.txt).
MD5OfMessageAttributes *string
// Each message attribute consists of a Name, Type, and Value. For more
// information, see Amazon SQS message attributes (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes)
// in the Amazon SQS Developer Guide.
// Converted message attributes
MessageAttributes MessageAttributes
}

// Attributes represents the various attributes associated with a received message from Amazon Simple Queue Service (SQS).
// It includes information such as the approximate receipt count, the approximate timestamp of the first receipt,
// message deduplication ID (FIFO), message group ID (FIFO), sender ID, sent timestamp, and sequence number.
type Attributes struct {
ApproximateReceiveCount int
// Approximate receipt count
ApproximateReceiveCount int
// Approximate timestamp of first receipt
ApproximateFirstReceiveTimestamp time.Time
MessageDeduplicationId string
MessageGroupId string
SenderId string
SentTimestamp time.Time
SequenceNumber int
// Message deduplication ID (FIFO)
MessageDeduplicationId string
// Message group ID (FIFO)
MessageGroupId string
// sender ID
SenderId string
// Timestamp sent
SentTimestamp time.Time
// Sequence Number
SequenceNumber int
}

// Context represents the execution context of a consumer handler in the package.
// It contains the necessary information for handling the message in the queue.
type Context[Body, MessageAttributes any] struct {
context.Context
// context used to process the message, may have a timeout if you pass the option.Consumer.ConsumerMessageTimeout
// as a parameter
context.Context `json:"-"`
// message queue url
QueueUrl string
Message MessageReceived[Body, MessageAttributes]
// converted message to process
Message MessageReceived[Body, MessageAttributes]
}

// SimpleContext represents the execution context of a consumer handler in the package.
// It contains the necessary information for handling the message in the queue.
type SimpleContext[Body any] Context[Body, map[string]types.MessageAttributeValue]

// HandlerConsumerFunc is a function that consumes a message and returns an error if a failure occurs while processing the message.
type HandlerConsumerFunc[Body, MessageAttributes any] func(ctx *Context[Body, MessageAttributes]) error

// HandlerSimpleConsumerFunc is a function that consumes a message and returns an error if a failure occurs while processing the message.
type HandlerSimpleConsumerFunc[Body any] func(ctx *SimpleContext[Body]) error

var ctxInterrupt context.Context

// ReceiveMessage Works as a repeating job, when triggered, it will fetch messages from the indicated queue
// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages again,
// otherwise, it will process these messages converting them into Context with the types of Body and MessageAttributes
// passed explicitly in the function, converted to the Context.Message.Body and Context.Message.MessageAttributes field,
// if you don't use MessageAttributes, you can use the SimpleReceiveMessage function. After conversion,
// we call the handler parameter so that this function processes message by message, in this handler we expect a return
// error, if no error occurred when processing the message, we have the auto-delete option
// (option.Consumer.DeleteMessageProcessedSuccess) if you have it enabled, we will remove the message from the file for you.
// finally, when processing all messages, we return to the initial flow looking for new messages.
//
// # Parameters
//
// - queueUrl: url of the queue where you want to fetch messages
// - handler: function to process the received message
// - opts: list of option.Consumer to customize the job
//
// # Panic
//
// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned
// from AWS SQS.
func ReceiveMessage[Body, MessageAttributes any](
queueUrl string,
handler HandlerConsumerFunc[Body, MessageAttributes],
Expand All @@ -67,6 +119,28 @@ func ReceiveMessage[Body, MessageAttributes any](
receiveMessage(queueUrl, handler, option.GetConsumerByParams(opts))
}

// ReceiveMessageAsync Works like a repeating job, when triggered, it will fetch messages from the indicated queue
// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages again,
// otherwise, it will process these messages converting them into Context with the types of Body and MessageAttributes
// passed explicitly in the function, converted to the Context.Message.Body and Context.Message.MessageAttributes field,
// if you don't use MessageAttributes, you can use the SimpleReceiveMessageAsync function. After conversion,
// we call the handler parameter so that this function processes message by message, in this handler we expect a return
// error, if no error occurred when processing the message, we have the auto-delete option
// (option.Consumer.DeleteMessageProcessedSuccess) where if enabled, we remove the message from the queue for you.
// finally, when processing all messages, we return to the initial flow looking for new messages.
//
// Unlike ReceiveMessage, this function will be processed asynchronously.
//
// # Parameters
//
// - queueUrl: url of the queue where you want to fetch messages
// - handler: function to process the received message
// - opts: list of options.Consumer to customize the job
//
// # Panic
//
// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned
// from AWS SQS.
func ReceiveMessageAsync[Body, MessageAttributes any](
queueUrl string,
handler HandlerConsumerFunc[Body, MessageAttributes],
Expand All @@ -75,6 +149,25 @@ func ReceiveMessageAsync[Body, MessageAttributes any](
go receiveMessage(queueUrl, handler, option.GetConsumerByParams(opts))
}

// SimpleReceiveMessage Works as a repeating job, when triggered, it will fetch messages from the indicated queue
// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages
// again, otherwise, it will process these messages converting them to Context with the Body type passed explicitly
// in the function, converted to the Context.Message.Body field. After the conversion, we call the handler parameter
// so that this function processes message by message, in this handler we expect an error return, if none has occurred
// error processing the message, we have the auto-delete option (option.Consumer.DeleteMessageProcessedSuccess) in which case
// is enabled, we remove the message from the queue for you. Finally, after processing all messages, we return to the flow
// initial searching for new messages.
//
// # Parameters
//
// - queueUrl: url of the queue where you want to fetch messages
// - handler: function to process the received message
// - opts: list of options.Consumer to customize the job
//
// # Panic
//
// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned
// from AWS SQS.
func SimpleReceiveMessage[Body any](
queueUrl string,
simpleHandle HandlerSimpleConsumerFunc[Body],
Expand All @@ -84,6 +177,27 @@ func SimpleReceiveMessage[Body any](
receiveMessage(queueUrl, handler, option.GetConsumerByParams(opts))
}

// SimpleReceiveMessageAsync Works like a repeating job, when triggered, it will fetch messages from the indicated queue
// in the queueUrl parameter, if it does not find any messages it will reprocess the function looking for new messages again,
// otherwise, it will process these messages converting them to Context with the Body type passed explicitly
// in the function, converted to the Context.Message.Body field. After the conversion, we call the handler parameter so
// that this function processes message by message, in this handler we expect an error return, if none has occurred
// error processing the message, we have the auto-delete option (option.Consumer.DeleteMessageProcessedSuccess) in which case
// is enabled, we remove the message from the queue for you. Finally, after processing all messages, we return to the flow
// initial searching for new messages.
//
// Unlike SimpleReceiveMessage, this function will be processed asynchronously.
//
// # Parameters
//
// - queueUrl: url of the queue where you want to fetch messages
// - handler: function to process the received message
// - opts: list of options.Consumer to customize the job
//
// # Panic
//
// If 3 errors occur when obtaining the message from the queue, it will trigger a panic informing the error returned
// from AWS SQS.
func SimpleReceiveMessageAsync[Body any](
queueUrl string,
simpleHandle HandlerSimpleConsumerFunc[Body],
Expand Down Expand Up @@ -166,7 +280,7 @@ func processMessages[Body, MessageAttributes any](
var count int
var mgsS, mgsF []string
for _, message := range output.Messages {
nCtx, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message, opt)
nCtx, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message)
if err != nil {
loggerErr(opt.DebugMode, "error prepare context to consumer:", err)
return
Expand All @@ -185,7 +299,6 @@ func prepareContextConsumer[Body, MessageAttributes any](
ctx context.Context,
queueUrl string,
message types.Message,
opt option.Consumer,
) (*Context[Body, MessageAttributes], error) {
ctxConsumer := &Context[Body, MessageAttributes]{
Context: ctx,
Expand All @@ -211,7 +324,7 @@ func prepareContextConsumer[Body, MessageAttributes any](
if util.IsMapMessageAttributeValues(messagesAttributes) {
messageReceived.MessageAttributes = any(message.MessageAttributes).(MessageAttributes)
} else {
convertMessageAttributes[MessageAttributes](message.MessageAttributes, &messagesAttributes, opt)
convertMessageAttributes[MessageAttributes](message.MessageAttributes, &messagesAttributes)
messageReceived.MessageAttributes = messagesAttributes
}
}
Expand All @@ -229,7 +342,7 @@ func appendMessagesByResult(messageId string, err error, mgsS, mgsF []string) {
}
}

func convertMessageAttributes[T any](messageAttributes map[string]types.MessageAttributeValue, dest *T, opt option.Consumer) {
func convertMessageAttributes[T any](messageAttributes map[string]types.MessageAttributeValue, dest *T) {
m := map[string]any{}
for k, v := range messageAttributes {
var valueProcessed any
Expand Down
1 change: 0 additions & 1 deletion sqs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ import "errors"

var ErrMessageBodyEmpty = errors.New("sqs: no message body passed")
var ErrParseBody = errors.New("sqs: message parse body failed")
var ErrInvalidBodyType = errors.New("sqs: invalid type body")

0 comments on commit a5d4b07

Please sign in to comment.