diff --git a/client.go b/client.go index acd7481..b0eaa02 100644 --- a/client.go +++ b/client.go @@ -12,16 +12,11 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/vvatanabe/dynamomq/internal/clock" + "github.com/vvatanabe/dynamomq/internal/constant" ) const ( - DefaultTableName = "dynamo-mq-table" - DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at" - DefaultRetryMaxAttempts = 10 - DefaultVisibilityTimeoutInSeconds = 30 - DefaultVisibilityTimeout = DefaultVisibilityTimeoutInSeconds * time.Second - DefaultMaxListMessages = 10 - DefaultQueryLimit = 250 + defaultQueryLimit = 250 ) // Client is an interface for interacting with a DynamoDB-based message queue system. @@ -150,9 +145,9 @@ func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions) { // It returns an error if the initialization of the DynamoDB client fails. func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Client[T], error) { o := &ClientOptions{ - TableName: DefaultTableName, - QueueingIndexName: DefaultQueueingIndexName, - RetryMaxAttempts: DefaultRetryMaxAttempts, + TableName: constant.DefaultTableName, + QueueingIndexName: constant.DefaultQueueingIndexName, + RetryMaxAttempts: constant.DefaultRetryMaxAttempts, UseFIFO: false, Clock: &clock.RealClock{}, MarshalMap: attributevalue.MarshalMap, @@ -276,7 +271,7 @@ func (c *ClientImpl[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessa params.QueueType = QueueTypeStandard } if params.VisibilityTimeout <= 0 { - params.VisibilityTimeout = DefaultVisibilityTimeoutInSeconds + params.VisibilityTimeout = constant.DefaultVisibilityTimeoutInSeconds } selected, err := c.selectMessage(ctx, params) @@ -330,7 +325,7 @@ func (c *ClientImpl[T]) executeQuery(ctx context.Context, params *ReceiveMessage KeyConditionExpression: expr.KeyCondition(), ExpressionAttributeNames: expr.Names(), ExpressionAttributeValues: expr.Values(), - Limit: aws.Int32(DefaultQueryLimit), + Limit: aws.Int32(defaultQueryLimit), ScanIndexForward: aws.Bool(true), ExclusiveStartKey: exclusiveStartKey, }) @@ -653,7 +648,7 @@ func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr ex ExpressionAttributeNames: expr.Names(), KeyConditionExpression: expr.KeyCondition(), ScanIndexForward: aws.Bool(true), - Limit: aws.Int32(DefaultQueryLimit), + Limit: aws.Int32(defaultQueryLimit), ExpressionAttributeValues: expr.Values(), ExclusiveStartKey: exclusiveStartKey, }) @@ -745,7 +740,7 @@ func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expr ExpressionAttributeNames: expr.Names(), ExpressionAttributeValues: expr.Values(), KeyConditionExpression: expr.KeyCondition(), - Limit: aws.Int32(DefaultQueryLimit), + Limit: aws.Int32(defaultQueryLimit), ScanIndexForward: aws.Bool(true), ExclusiveStartKey: lastEvaluatedKey, }) @@ -837,7 +832,7 @@ func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesIn params = &ListMessagesInput{} } if params.Size <= 0 { - params.Size = DefaultMaxListMessages + params.Size = constant.DefaultMaxListMessages } output, err := c.dynamoDB.Scan(ctx, &dynamodb.ScanInput{ TableName: &c.tableName, diff --git a/client_test.go b/client_test.go index 7f92211..e51dc23 100644 --- a/client_test.go +++ b/client_test.go @@ -18,13 +18,14 @@ import ( "github.com/upsidr/dynamotest" "github.com/vvatanabe/dynamomq" "github.com/vvatanabe/dynamomq/internal/clock" + "github.com/vvatanabe/dynamomq/internal/constant" "github.com/vvatanabe/dynamomq/internal/mock" "github.com/vvatanabe/dynamomq/internal/test" ) func SetupDynamoDB(t *testing.T, initialData ...*types.PutRequest) (tableName string, client *dynamodb.Client, clean func()) { client, clean = dynamotest.NewDynamoDB(t) - tableName = dynamomq.DefaultTableName + "-" + uuid.NewString() + tableName = constant.DefaultTableName + "-" + uuid.NewString() dynamotest.PrepTable(t, client, dynamotest.InitialTableSetup{ Table: &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ @@ -322,7 +323,7 @@ func TestDynamoMQClientReceiveMessage(t *testing.T) { runTestsParallel[any, *dynamomq.ReceiveMessageOutput[test.MessageData]](t, "ReceiveMessage()", tests, func(client dynamomq.Client[test.MessageData], _ any) (*dynamomq.ReceiveMessageOutput[test.MessageData], error) { return client.ReceiveMessage(context.Background(), &dynamomq.ReceiveMessageInput{ - VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds, }) }) } @@ -348,7 +349,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { for i, want := range wants { result, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ QueueType: dynamomq.QueueTypeStandard, - VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds, }) test.AssertError(t, err, nil, fmt.Sprintf("ReceiveMessage() [%d-1]", i)) test.AssertDeepEqual(t, result, want, fmt.Sprintf("ReceiveMessage() [%d-2]", i)) @@ -358,7 +359,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { } _, err = client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ - VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds, }) test.AssertError(t, err, &dynamomq.EmptyQueueError{}, fmt.Sprintf("ReceiveMessage() [%d-3]", i)) @@ -369,7 +370,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { } _, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ - VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds, }) test.AssertError(t, err, &dynamomq.EmptyQueueError{}, "ReceiveMessage() [last]") } @@ -840,12 +841,12 @@ func prepareTestClient(ctx context.Context, t *testing.T, tableName, raw, clean := setupTable(t) optFns := []func(*dynamomq.ClientOptions){ dynamomq.WithTableName(tableName), - dynamomq.WithQueueingIndexName(dynamomq.DefaultQueueingIndexName), + dynamomq.WithQueueingIndexName(constant.DefaultQueueingIndexName), dynamomq.WithAWSBaseEndpoint(""), dynamomq.WithAWSDynamoDBClient(raw), mock.WithClock(sdkClock), dynamomq.WithUseFIFO(useFIFO), - dynamomq.WithAWSRetryMaxAttempts(dynamomq.DefaultRetryMaxAttempts), + dynamomq.WithAWSRetryMaxAttempts(constant.DefaultRetryMaxAttempts), WithUnmarshalMap(unmarshalMap), WithMarshalMap(marshalMap), WithUnmarshalListOfMaps(unmarshalListOfMaps), @@ -994,7 +995,7 @@ func TestTestDynamoMQClientReturnUnmarshalingAttributeError(t *testing.T) { name: "ListMessages should return UnmarshalingAttributeError", operation: func() (any, error) { return client.ListMessages(context.Background(), &dynamomq.ListMessagesInput{ - Size: dynamomq.DefaultMaxListMessages, + Size: constant.DefaultMaxListMessages, }) }, }, @@ -1097,7 +1098,7 @@ func TestTestDynamoMQClientReturnDynamoDBAPIError(t *testing.T) { name: "ListMessages should return DynamoDBAPIError", operation: func() (any, error) { return client.ListMessages(context.Background(), &dynamomq.ListMessagesInput{ - Size: dynamomq.DefaultMaxListMessages, + Size: constant.DefaultMaxListMessages, }) }, }, diff --git a/consumer.go b/consumer.go index 6c4c269..f38c13b 100644 --- a/consumer.go +++ b/consumer.go @@ -8,6 +8,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/vvatanabe/dynamomq/internal/constant" ) const ( @@ -86,7 +88,7 @@ func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts .. PollingInterval: defaultPollingInterval, Concurrency: defaultConcurrency, MaximumReceives: defaultMaximumReceives, - VisibilityTimeout: DefaultVisibilityTimeoutInSeconds, + VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds, RetryInterval: defaultRetryIntervalInSeconds, QueueType: defaultQueueType, } diff --git a/dynamomq_test.go b/dynamomq_test.go index 92cbfe5..7cba69e 100644 --- a/dynamomq_test.go +++ b/dynamomq_test.go @@ -5,6 +5,7 @@ import ( "github.com/vvatanabe/dynamomq" "github.com/vvatanabe/dynamomq/internal/clock" + "github.com/vvatanabe/dynamomq/internal/constant" "github.com/vvatanabe/dynamomq/internal/test" ) @@ -12,7 +13,7 @@ func MarkAsProcessing[T any](m *dynamomq.Message[T], now time.Time) { ts := clock.FormatRFC3339Nano(now) m.UpdatedAt = ts m.ReceivedAt = ts - m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(dynamomq.DefaultVisibilityTimeout)) + m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(constant.DefaultVisibilityTimeout)) } func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) { @@ -47,7 +48,7 @@ func NewMessageFromReadyToProcessing(id string, MarkAsProcessing(m, processingTime) m.Version = 2 m.ReceiveCount = 1 - m.InvisibleUntilAt = clock.FormatRFC3339Nano(processingTime.Add(dynamomq.DefaultVisibilityTimeout)) + m.InvisibleUntilAt = clock.FormatRFC3339Nano(processingTime.Add(constant.DefaultVisibilityTimeout)) r := &dynamomq.ReceiveMessageOutput[test.MessageData]{ Result: &dynamomq.Result{ ID: m.ID, diff --git a/internal/cmd/flag.go b/internal/cmd/flag.go index cc3ef0f..f2a75f0 100644 --- a/internal/cmd/flag.go +++ b/internal/cmd/flag.go @@ -1,6 +1,8 @@ package cmd -import "github.com/vvatanabe/dynamomq" +import ( + "github.com/vvatanabe/dynamomq/internal/constant" +) var flgs = &Flags{} @@ -16,12 +18,12 @@ var flagMap = FlagMap{ TableName: FlagSet[string]{ Name: "table-name", Usage: "The name of the table to contain the item.", - Value: dynamomq.DefaultTableName, + Value: constant.DefaultTableName, }, IndexName: FlagSet[string]{ Name: "index-name", Usage: "The name of the queueing index.", - Value: dynamomq.DefaultQueueingIndexName, + Value: constant.DefaultQueueingIndexName, }, EndpointURL: FlagSet[string]{ Name: "endpoint-url", diff --git a/internal/cmd/interactive.go b/internal/cmd/interactive.go index 09463e6..cda89eb 100644 --- a/internal/cmd/interactive.go +++ b/internal/cmd/interactive.go @@ -9,6 +9,7 @@ import ( "github.com/vvatanabe/dynamomq" "github.com/vvatanabe/dynamomq/internal/clock" + "github.com/vvatanabe/dynamomq/internal/constant" "github.com/vvatanabe/dynamomq/internal/test" ) @@ -145,7 +146,7 @@ func (c *Interactive) system(_ context.Context, _ []string) error { } func (c *Interactive) ls(ctx context.Context, _ []string) error { - out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages}) + out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages}) if err != nil { return err } @@ -161,7 +162,7 @@ func (c *Interactive) ls(ctx context.Context, _ []string) error { } func (c *Interactive) purge(ctx context.Context, _ []string) error { - out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages}) + out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages}) if err != nil { return err } diff --git a/internal/cmd/ls.go b/internal/cmd/ls.go index 98ad46e..f0a2d96 100644 --- a/internal/cmd/ls.go +++ b/internal/cmd/ls.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/vvatanabe/dynamomq" "github.com/vvatanabe/dynamomq/internal/clock" + "github.com/vvatanabe/dynamomq/internal/constant" ) func (f CommandFactory) CreateLSCommand(flgs *Flags) *cobra.Command { @@ -19,7 +20,7 @@ func (f CommandFactory) CreateLSCommand(flgs *Flags) *cobra.Command { if err != nil { return err } - out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages}) + out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages}) if err != nil { return err } diff --git a/internal/cmd/purge.go b/internal/cmd/purge.go index 4242c27..63a78e9 100644 --- a/internal/cmd/purge.go +++ b/internal/cmd/purge.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" "github.com/vvatanabe/dynamomq" + "github.com/vvatanabe/dynamomq/internal/constant" ) func (f CommandFactory) CreatePurgeCommand(flgs *Flags) *cobra.Command { @@ -18,7 +19,7 @@ func (f CommandFactory) CreatePurgeCommand(flgs *Flags) *cobra.Command { if err != nil { return err } - out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages}) + out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages}) if err != nil { return err } diff --git a/internal/constant/constant.go b/internal/constant/constant.go new file mode 100644 index 0000000..d1fc4aa --- /dev/null +++ b/internal/constant/constant.go @@ -0,0 +1,12 @@ +package constant + +import "time" + +const ( + DefaultTableName = "dynamo-mq-table" + DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at" + DefaultRetryMaxAttempts = 10 + DefaultVisibilityTimeoutInSeconds = 30 + DefaultVisibilityTimeout = DefaultVisibilityTimeoutInSeconds * time.Second + DefaultMaxListMessages = 10 +)