Skip to content

Commit

Permalink
refactor: move default constant values to internal/constant package
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 9, 2023
1 parent 7d3b73b commit 48ba250
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 34 deletions.
25 changes: 10 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,11 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/constant"
)

const (
DefaultTableName = "dynamo-mq-table"
DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at"
DefaultRetryMaxAttempts = 10
DefaultVisibilityTimeoutInSeconds = 30
DefaultVisibilityTimeout = DefaultVisibilityTimeoutInSeconds * time.Second
DefaultMaxListMessages = 10
DefaultQueryLimit = 250
defaultQueryLimit = 250
)

// Client is an interface for interacting with a DynamoDB-based message queue system.
Expand Down Expand Up @@ -150,9 +145,9 @@ func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions) {
// It returns an error if the initialization of the DynamoDB client fails.
func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Client[T], error) {
o := &ClientOptions{
TableName: DefaultTableName,
QueueingIndexName: DefaultQueueingIndexName,
RetryMaxAttempts: DefaultRetryMaxAttempts,
TableName: constant.DefaultTableName,
QueueingIndexName: constant.DefaultQueueingIndexName,
RetryMaxAttempts: constant.DefaultRetryMaxAttempts,
UseFIFO: false,
Clock: &clock.RealClock{},
MarshalMap: attributevalue.MarshalMap,
Expand Down Expand Up @@ -276,7 +271,7 @@ func (c *ClientImpl[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessa
params.QueueType = QueueTypeStandard
}
if params.VisibilityTimeout <= 0 {
params.VisibilityTimeout = DefaultVisibilityTimeoutInSeconds
params.VisibilityTimeout = constant.DefaultVisibilityTimeoutInSeconds
}

selected, err := c.selectMessage(ctx, params)
Expand Down Expand Up @@ -330,7 +325,7 @@ func (c *ClientImpl[T]) executeQuery(ctx context.Context, params *ReceiveMessage
KeyConditionExpression: expr.KeyCondition(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
Limit: aws.Int32(DefaultQueryLimit),
Limit: aws.Int32(defaultQueryLimit),
ScanIndexForward: aws.Bool(true),
ExclusiveStartKey: exclusiveStartKey,
})
Expand Down Expand Up @@ -653,7 +648,7 @@ func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr ex
ExpressionAttributeNames: expr.Names(),
KeyConditionExpression: expr.KeyCondition(),
ScanIndexForward: aws.Bool(true),
Limit: aws.Int32(DefaultQueryLimit),
Limit: aws.Int32(defaultQueryLimit),
ExpressionAttributeValues: expr.Values(),
ExclusiveStartKey: exclusiveStartKey,
})
Expand Down Expand Up @@ -745,7 +740,7 @@ func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expr
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
KeyConditionExpression: expr.KeyCondition(),
Limit: aws.Int32(DefaultQueryLimit),
Limit: aws.Int32(defaultQueryLimit),
ScanIndexForward: aws.Bool(true),
ExclusiveStartKey: lastEvaluatedKey,
})
Expand Down Expand Up @@ -837,7 +832,7 @@ func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesIn
params = &ListMessagesInput{}
}
if params.Size <= 0 {
params.Size = DefaultMaxListMessages
params.Size = constant.DefaultMaxListMessages
}
output, err := c.dynamoDB.Scan(ctx, &dynamodb.ScanInput{
TableName: &c.tableName,
Expand Down
19 changes: 10 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"github.com/upsidr/dynamotest"
"github.com/vvatanabe/dynamomq"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/constant"
"github.com/vvatanabe/dynamomq/internal/mock"
"github.com/vvatanabe/dynamomq/internal/test"
)

func SetupDynamoDB(t *testing.T, initialData ...*types.PutRequest) (tableName string, client *dynamodb.Client, clean func()) {
client, clean = dynamotest.NewDynamoDB(t)
tableName = dynamomq.DefaultTableName + "-" + uuid.NewString()
tableName = constant.DefaultTableName + "-" + uuid.NewString()
dynamotest.PrepTable(t, client, dynamotest.InitialTableSetup{
Table: &dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestDynamoMQClientReceiveMessage(t *testing.T) {
runTestsParallel[any, *dynamomq.ReceiveMessageOutput[test.MessageData]](t, "ReceiveMessage()", tests,
func(client dynamomq.Client[test.MessageData], _ any) (*dynamomq.ReceiveMessageOutput[test.MessageData], error) {
return client.ReceiveMessage(context.Background(), &dynamomq.ReceiveMessageInput{
VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds,
VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds,
})
})
}
Expand All @@ -348,7 +349,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) {
for i, want := range wants {
result, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{
QueueType: dynamomq.QueueTypeStandard,
VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds,
VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds,
})
test.AssertError(t, err, nil, fmt.Sprintf("ReceiveMessage() [%d-1]", i))
test.AssertDeepEqual(t, result, want, fmt.Sprintf("ReceiveMessage() [%d-2]", i))
Expand All @@ -358,7 +359,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) {
}

_, err = client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{
VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds,
VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds,
})
test.AssertError(t, err, &dynamomq.EmptyQueueError{}, fmt.Sprintf("ReceiveMessage() [%d-3]", i))

Expand All @@ -369,7 +370,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) {
}

_, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{
VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds,
VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds,
})
test.AssertError(t, err, &dynamomq.EmptyQueueError{}, "ReceiveMessage() [last]")
}
Expand Down Expand Up @@ -840,12 +841,12 @@ func prepareTestClient(ctx context.Context, t *testing.T,
tableName, raw, clean := setupTable(t)
optFns := []func(*dynamomq.ClientOptions){
dynamomq.WithTableName(tableName),
dynamomq.WithQueueingIndexName(dynamomq.DefaultQueueingIndexName),
dynamomq.WithQueueingIndexName(constant.DefaultQueueingIndexName),
dynamomq.WithAWSBaseEndpoint(""),
dynamomq.WithAWSDynamoDBClient(raw),
mock.WithClock(sdkClock),
dynamomq.WithUseFIFO(useFIFO),
dynamomq.WithAWSRetryMaxAttempts(dynamomq.DefaultRetryMaxAttempts),
dynamomq.WithAWSRetryMaxAttempts(constant.DefaultRetryMaxAttempts),
WithUnmarshalMap(unmarshalMap),
WithMarshalMap(marshalMap),
WithUnmarshalListOfMaps(unmarshalListOfMaps),
Expand Down Expand Up @@ -994,7 +995,7 @@ func TestTestDynamoMQClientReturnUnmarshalingAttributeError(t *testing.T) {
name: "ListMessages should return UnmarshalingAttributeError",
operation: func() (any, error) {
return client.ListMessages(context.Background(), &dynamomq.ListMessagesInput{
Size: dynamomq.DefaultMaxListMessages,
Size: constant.DefaultMaxListMessages,
})
},
},
Expand Down Expand Up @@ -1097,7 +1098,7 @@ func TestTestDynamoMQClientReturnDynamoDBAPIError(t *testing.T) {
name: "ListMessages should return DynamoDBAPIError",
operation: func() (any, error) {
return client.ListMessages(context.Background(), &dynamomq.ListMessagesInput{
Size: dynamomq.DefaultMaxListMessages,
Size: constant.DefaultMaxListMessages,
})
},
},
Expand Down
4 changes: 3 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/vvatanabe/dynamomq/internal/constant"
)

const (
Expand Down Expand Up @@ -86,7 +88,7 @@ func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ..
PollingInterval: defaultPollingInterval,
Concurrency: defaultConcurrency,
MaximumReceives: defaultMaximumReceives,
VisibilityTimeout: DefaultVisibilityTimeoutInSeconds,
VisibilityTimeout: constant.DefaultVisibilityTimeoutInSeconds,
RetryInterval: defaultRetryIntervalInSeconds,
QueueType: defaultQueueType,
}
Expand Down
5 changes: 3 additions & 2 deletions dynamomq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (

"github.com/vvatanabe/dynamomq"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/constant"
"github.com/vvatanabe/dynamomq/internal/test"
)

func MarkAsProcessing[T any](m *dynamomq.Message[T], now time.Time) {
ts := clock.FormatRFC3339Nano(now)
m.UpdatedAt = ts
m.ReceivedAt = ts
m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(dynamomq.DefaultVisibilityTimeout))
m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(constant.DefaultVisibilityTimeout))
}

func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) {
Expand Down Expand Up @@ -47,7 +48,7 @@ func NewMessageFromReadyToProcessing(id string,
MarkAsProcessing(m, processingTime)
m.Version = 2
m.ReceiveCount = 1
m.InvisibleUntilAt = clock.FormatRFC3339Nano(processingTime.Add(dynamomq.DefaultVisibilityTimeout))
m.InvisibleUntilAt = clock.FormatRFC3339Nano(processingTime.Add(constant.DefaultVisibilityTimeout))
r := &dynamomq.ReceiveMessageOutput[test.MessageData]{
Result: &dynamomq.Result{
ID: m.ID,
Expand Down
8 changes: 5 additions & 3 deletions internal/cmd/flag.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import "github.com/vvatanabe/dynamomq"
import (
"github.com/vvatanabe/dynamomq/internal/constant"
)

var flgs = &Flags{}

Expand All @@ -16,12 +18,12 @@ var flagMap = FlagMap{
TableName: FlagSet[string]{
Name: "table-name",
Usage: "The name of the table to contain the item.",
Value: dynamomq.DefaultTableName,
Value: constant.DefaultTableName,
},
IndexName: FlagSet[string]{
Name: "index-name",
Usage: "The name of the queueing index.",
Value: dynamomq.DefaultQueueingIndexName,
Value: constant.DefaultQueueingIndexName,
},
EndpointURL: FlagSet[string]{
Name: "endpoint-url",
Expand Down
5 changes: 3 additions & 2 deletions internal/cmd/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/vvatanabe/dynamomq"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/constant"
"github.com/vvatanabe/dynamomq/internal/test"
)

Expand Down Expand Up @@ -145,7 +146,7 @@ func (c *Interactive) system(_ context.Context, _ []string) error {
}

func (c *Interactive) ls(ctx context.Context, _ []string) error {
out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages})
out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages})
if err != nil {
return err
}
Expand All @@ -161,7 +162,7 @@ func (c *Interactive) ls(ctx context.Context, _ []string) error {
}

func (c *Interactive) purge(ctx context.Context, _ []string) error {
out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages})
out, err := c.Client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages})
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/cmd/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"
"github.com/vvatanabe/dynamomq"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/constant"
)

func (f CommandFactory) CreateLSCommand(flgs *Flags) *cobra.Command {
Expand All @@ -19,7 +20,7 @@ func (f CommandFactory) CreateLSCommand(flgs *Flags) *cobra.Command {
if err != nil {
return err
}
out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages})
out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages})
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/cmd/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/spf13/cobra"
"github.com/vvatanabe/dynamomq"
"github.com/vvatanabe/dynamomq/internal/constant"
)

func (f CommandFactory) CreatePurgeCommand(flgs *Flags) *cobra.Command {
Expand All @@ -18,7 +19,7 @@ func (f CommandFactory) CreatePurgeCommand(flgs *Flags) *cobra.Command {
if err != nil {
return err
}
out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: dynamomq.DefaultMaxListMessages})
out, err := client.ListMessages(ctx, &dynamomq.ListMessagesInput{Size: constant.DefaultMaxListMessages})
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions internal/constant/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package constant

import "time"

const (
DefaultTableName = "dynamo-mq-table"
DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at"
DefaultRetryMaxAttempts = 10
DefaultVisibilityTimeoutInSeconds = 30
DefaultVisibilityTimeout = DefaultVisibilityTimeoutInSeconds * time.Second
DefaultMaxListMessages = 10
)

0 comments on commit 48ba250

Please sign in to comment.