Skip to content

Commit

Permalink
Finish documentation and README
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel Cataldo committed Dec 15, 2023
1 parent bb0f821 commit 19d2891
Show file tree
Hide file tree
Showing 15 changed files with 834 additions and 114 deletions.
400 changes: 399 additions & 1 deletion README.md

Large diffs are not rendered by default.

108 changes: 108 additions & 0 deletions _example/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"github.com/GabrielHCataldo/go-aws-sqs/sqs"
"github.com/GabrielHCataldo/go-aws-sqs/sqs/option"
"github.com/GabrielHCataldo/go-logger/logger"
"os"
"os/signal"
"time"
)

type test struct {
Name string `json:"name,omitempty"`
BirthDate time.Time `json:"birthDate,omitempty"`
Emails []string `json:"emails,omitempty"`
Bank bank `json:"bank,omitempty"`
Map map[string]any
}

type bank struct {
Account string `json:"account,omitempty"`
Digits string `json:"digits,omitempty"`
Balance float64 `json:"balance,omitempty"`
}

type messageAttTest struct {
Name string `json:"account,omitempty"`
Text string `json:"text,omitempty"`
Balance float64 `json:"balance,omitempty"`
Bool bool `json:"bool"`
Int int `json:"int"`
SubStruct test `json:"subStruct,omitempty"`
PointerTest *test `json:"pointerBank,omitempty"`
Map map[string]any
Any any
EmptyString string `json:"emptyString,omitempty"`
HideString string `json:"-"`
}

func main() {
simpleReceiveMessage()
simpleReceiveMessageStruct()
receiveMessage()
completeOptions()
simpleReceiveMessageAsync()
}

func simpleReceiveMessage() {
sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_STRING_URL"), handlerSimple)
}

func simpleReceiveMessageStruct() {
sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_URL"), handler)
}

func receiveMessage() {
sqs.ReceiveMessage(os.Getenv("SQS_QUEUE_TEST_STRING_URL"), handlerReceiveMessage)
}

func completeOptions() {
opt := option.NewConsumer().
// HTTP communication customization options with AWS SQS
SetHttpClient(option.HttpClient{}).
// print logs (default: false)
SetDebugMode(true).
// If true remove the message from the queue after successfully processed (handler error return is null) (default: false)
SetDeleteMessageProcessedSuccess(true).
// Duration time to process the message, timeout applied in the past context. (default: 5 seconds)
SetConsumerMessageTimeout(5 * time.Second).
// Delay to run the next search for messages in the queue (default: 0)
SetDelayQueryLoop(5 * time.Second).
// The maximum number of messages to return. 1 a 10 (default: 10)
SetMaxNumberOfMessages(10).
// The maximum number of messages to return. 1 a 10 (default: 0)
SetVisibilityTimeout(5 * time.Second).
// The duration that the received messages are hidden from subsequent
// retrieve requests after being retrieved by a ReceiveMessage request.
SetReceiveRequestAttemptId("").
// The duration for which the call waits for a message to arrive in
// the queue before returning (default: 0)
SetWaitTimeSeconds(1 * time.Second)
sqs.SimpleReceiveMessage(os.Getenv("SQS_QUEUE_TEST_URL"), handler, opt)
}

func simpleReceiveMessageAsync() {
sqs.SimpleReceiveMessageAsync(os.Getenv("SQS_QUEUE_TEST_URL"), handler, option.NewConsumer().SetDebugMode(true))
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
select {
case <-c:
logger.Info("Stopped application!")
}
}

func handler(ctx *sqs.SimpleContext[test]) error {
logger.Debug("ctx simple body struct to process message:", ctx)
return nil
}

func handlerSimple(ctx *sqs.SimpleContext[string]) error {
logger.Debug("ctx simple to process message:", ctx)
return nil
}

func handlerReceiveMessage(ctx *sqs.Context[string, messageAttTest]) error {
logger.Debug("ctx to process message:", ctx)
return nil
}
150 changes: 150 additions & 0 deletions _example/producer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"context"
"github.com/GabrielHCataldo/go-aws-sqs/sqs"
"github.com/GabrielHCataldo/go-aws-sqs/sqs/option"
"github.com/GabrielHCataldo/go-logger/logger"
"os"
"time"
)

type test struct {
Name string `json:"name,omitempty"`
BirthDate time.Time `json:"birthDate,omitempty"`
Emails []string `json:"emails,omitempty"`
Bank bank `json:"bank,omitempty"`
Map map[string]any
}

type bank struct {
Account string `json:"account,omitempty"`
Digits string `json:"digits,omitempty"`
Balance float64 `json:"balance,omitempty"`
}

type messageAttTest struct {
Name string `json:"account,omitempty"`
Text string `json:"text,omitempty"`
Balance float64 `json:"balance,omitempty"`
Bool bool `json:"bool"`
Int int `json:"int"`
SubStruct test `json:"subStruct,omitempty"`
PointerTest *test `json:"pointerBank,omitempty"`
Map map[string]any
Any any
EmptyString string `json:"emptyString,omitempty"`
HideString string `json:"-"`
}

func main() {
simple()
simpleAsync()
structBody()
mapBody()
completeOptions()
}

func simple() {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
body := "body test"
message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_STRING_URL"), body)
if err != nil {
logger.Error("error send message:", err)
} else {
logger.Info("message sent successfully:", message)
}
}

func simpleAsync() {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
body := "body test"
sqs.SendMessageAsync(ctx, os.Getenv("SQS_QUEUE_TEST_STRING_URL"), body)
}

func structBody() {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
body := initTestStruct()
message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body)
if err != nil {
logger.Error("error send message:", err)
} else {
logger.Info("message sent successfully:", message)
}
}

func mapBody() {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
body := initTestMap()
message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_STRING_URL"), body)
if err != nil {
logger.Error("error send message:", err)
} else {
logger.Info("message sent successfully:", message)
}
}

func completeOptions() {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
body := initTestStruct()
opt := option.NewProducer().
// HTTP communication customization options with AWS SQS
SetHttpClient(option.HttpClient{}).
// print logs (default: false)
SetDebugMode(true).
// delay to delay the availability of message processing (default: 0)
SetDelaySeconds(5 * time.Second).
// Message attributes, must be of type Map or Struct, other types are not acceptable.
SetMessageAttributes(initMessageAttTest()).
// The message system attribute to send
SetMessageSystemAttributes(option.MessageSystemAttributes{}).
// This parameter applies only to FIFO (first-in-first-out) queues. The token used for deduplication of sent messages.
SetMessageDeduplicationId("").
// This parameter applies only to FIFO (first-in-first-out) queues. The tag that specifies that a message belongs to a specific message group.
SetMessageGroupId("")
message, err := sqs.SendMessage(ctx, os.Getenv("SQS_QUEUE_TEST_URL"), body, opt)
if err != nil {
logger.Error("error send message:", err)
} else {
logger.Info("message sent successfully:", message)
}
}

func initTestStruct() test {
b := bank{
Account: "123456",
Digits: "2",
Balance: 200.12,
}
return test{
Name: "Test Name",
BirthDate: time.Now(),
Emails: []string{"[email protected]", "[email protected]", "[email protected]"},
Bank: b,
Map: map[string]any{"int": 1, "bool": true, "float": 1.23, "string": "text test"},
}
}

func initTestMap() map[string]any {
return map[string]any{"int": 1, "bool": true, "float": 1.23, "string": "text test"}
}

func initMessageAttTest() messageAttTest {
t := initTestStruct()
return messageAttTest{
Name: "Name test producer",
Text: "Text field",
Balance: 10.23,
Bool: true,
Int: 3,
SubStruct: t,
PointerTest: &t,
Map: initTestMap(),
HideString: "hide test",
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module go-aws-sqs
module github.com/GabrielHCataldo/go-aws-sqs

go 1.21
go 1.21.3

require (
github.com/GabrielHCataldo/go-logger v1.0.8
Expand Down
Binary file added gopher-sqs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
77 changes: 59 additions & 18 deletions sqs/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package sqs
import (
"context"
"fmt"
"github.com/GabrielHCataldo/go-aws-sqs/internal/client"
"github.com/GabrielHCataldo/go-aws-sqs/internal/util"
"github.com/GabrielHCataldo/go-aws-sqs/sqs/option"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"go-aws-sqs/internal/client"
"go-aws-sqs/internal/util"
"go-aws-sqs/sqs/option"
"reflect"
"time"
)
Expand Down Expand Up @@ -89,6 +89,11 @@ type HandlerConsumerFunc[Body, MessageAttributes any] func(ctx *Context[Body, Me
// HandlerSimpleConsumerFunc is a function that consumes a message and returns an error if a failure occurs while processing the message.
type HandlerSimpleConsumerFunc[Body any] func(ctx *SimpleContext[Body]) error

type channelMessageProcessed struct {
Err error
Signal *chan struct{}
}

var ctxInterrupt context.Context

// ReceiveMessage Works as a repeating job, when triggered, it will fetch messages from the indicated queue
Expand Down Expand Up @@ -238,6 +243,7 @@ func receiveMessage[Body, MessageAttributes any](
}
loggerInfo(opt.DebugMode, "Start process received messages size:", len(output.Messages))
processMessages[Body, MessageAttributes](queueUrl, output, handler, opt)
time.Sleep(opt.DelayQueryLoop)
}
}

Expand Down Expand Up @@ -275,26 +281,61 @@ func processMessages[Body, MessageAttributes any](
handler HandlerConsumerFunc[Body, MessageAttributes],
opt option.Consumer,
) {
ctx, cancel := context.WithTimeout(context.TODO(), opt.ConsumerMessageTimeout)
defer cancel()
var count int
var mgsS, mgsF []string
for _, message := range output.Messages {
nCtx, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message)
if err != nil {
loggerErr(opt.DebugMode, "error prepare context to consumer:", err)
return
}
err = handler(nCtx)
appendMessagesByResult(nCtx.Message.Id, err, mgsS, mgsF)
if opt.DeleteMessageProcessedSuccess {
go deleteMessage(queueUrl, *message.ReceiptHandle)
}
mgsS, mgsF = processMessage(queueUrl, handler, message, opt)
count++
}
loggerInfo(opt.DebugMode, "Finish process messages!", "processed:", count, "success:", mgsS, "failed:", mgsF)
}

func processMessage[Body, MessageAttributes any](
queueUrl string,
handler HandlerConsumerFunc[Body, MessageAttributes],
message types.Message,
opt option.Consumer,
) (mgsS, mgsF []string) {
ctx, cancel := context.WithTimeout(context.TODO(), opt.ConsumerMessageTimeout)
defer cancel()
ctxConsumer, err := prepareContextConsumer[Body, MessageAttributes](ctx, queueUrl, message)
if err != nil {
loggerErr(opt.DebugMode, "error prepare context to consumer:", err)
return
}
signal := make(chan struct{}, 1)
channel := channelMessageProcessed{
Signal: &signal,
}
go processHandler(ctxConsumer, handler, opt, &channel)
select {
case <-ctx.Done():
appendMessagesByResult(ctxConsumer.Message.Id, ctx.Err(), &mgsS, &mgsF)
break
case <-*channel.Signal:
appendMessagesByResult(*message.MessageId, channel.Err, &mgsS, &mgsF)
break
}
return mgsS, mgsF
}

func processHandler[Body, MessageAttributes any](
ctx *Context[Body, MessageAttributes],
handler HandlerConsumerFunc[Body, MessageAttributes],
opt option.Consumer,
channel *channelMessageProcessed,
) {
err := handler(ctx)
if ctx.Err() != nil {
return
}
if err == nil && opt.DeleteMessageProcessedSuccess {
go deleteMessage(ctx.QueueUrl, ctx.Message.ReceiptHandle)
}
channel.Err = err
*channel.Signal <- struct{}{}
}

func prepareContextConsumer[Body, MessageAttributes any](
ctx context.Context,
queueUrl string,
Expand Down Expand Up @@ -334,11 +375,11 @@ func prepareContextConsumer[Body, MessageAttributes any](
return ctxConsumer, nil
}

func appendMessagesByResult(messageId string, err error, mgsS, mgsF []string) {
func appendMessagesByResult(messageId string, err error, mgsS, mgsF *[]string) {
if err != nil {
mgsF = append(mgsF, messageId)
*mgsF = append(*mgsF, messageId)
} else {
mgsS = append(mgsS, messageId)
*mgsS = append(*mgsS, messageId)
}
}

Expand Down
Loading

0 comments on commit 19d2891

Please sign in to comment.