diff --git a/README.md b/README.md index 7442e47..9325b91 100644 --- a/README.md +++ b/README.md @@ -1,73 +1,116 @@ +# DynamoMQ: Implementing message queueing with Amazon DynamoDB in Go. + [![Build](https://github.com/vvatanabe/dynamomq/actions/workflows/build.yml/badge.svg)](https://github.com/vvatanabe/dynamomq/actions/workflows/build.yml) [![Go Reference](https://pkg.go.dev/badge/github.com/vvatanabe/dynamomq.svg)](https://pkg.go.dev/github.com/vvatanabe/dynamomq) [![Go Report Card](https://goreportcard.com/badge/github.com/vvatanabe/dynamomq)](https://goreportcard.com/report/github.com/vvatanabe/dynamomq) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Reliability Rating](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=reliability_rating)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Security Rating](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=security_rating)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Maintainability Rating](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=sqale_rating)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=vulnerabilities)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Bugs](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=bugs)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Code Smells](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=code_smells)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Duplicated Lines (%)](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=duplicated_lines_density)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq) [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=vvatanabe_dynamomq&metric=coverage)](https://sonarcloud.io/summary/new_code?id=vvatanabe_dynamomq)

-Implementing message queueing with Amazon DynamoDB in Go. +DynamoMQ is a message queuing library that leverages DynamoDB as storage, implemented in Go. It provides an SDK to support the implementation of consumers and producers in Go, along with a CLI that functions as a management tool. ## Table of Contents -- [Current Status](#current-status) - [Motivation](#motivation) -- [Features](#features) -- [Installation](#installation) - * [DynamoMQ CLI](#dynamomq-cli) - * [DynamoMQ Library](#dynamomq-library) +- [Comparison with Existing AWS Queuing Solutions](#comparison-with-existing-aws-queuing-solutions) +- [Installation DynamoMQ](#installation-dynamomq) - [Setup DynamoMQ](#setup-dynamomq) - * [Required IAM Policy](#required-iam-policy) - * [Create Table with AWS CLI](#create-table-with-aws-cli) - * [Create Table with Terraform](#create-table-with-terraform) - [Authentication and access credentials](#authentication-and-access-credentials) - * [Environment Variables](#environment-variables) - * [Shared Configuration and Credentials Files](#shared-configuration-and-credentials-files) - [Usage for DynamoMQ CLI](#usage-for-dynamomq-cli) - * [Available Commands](#available-commands) - * [Global Flags](#global-flags) - * [Example Usage](#example-usage) - * [Interactive Mode](#interactive-mode) -- [Usage for DynamoMQ Library](#usage-for-dynamomq-library) - * [DynamoMQ Client](#dynamomq-client) - * [DynamoMQ Producer](#dynamomq-producer) - * [DynamoMQ Consumer](#dynamomq-consumer) -- [Software Design](#software-design) - * [State Machine](#state-machine) - * [Table Definition](#table-definition) - * [Data Transition](#data-transition) +- [Usage for DynamoMQ SDK](#usage-for-dynamomq-sdk) +- [About the Design of DynamoMQ](#about-the-design-of-dynamomq) +- [Conclusion](#conclusion) +- [Acknowledgments](#acknowledgments) - [Authors](#authors) - [License](#license) +## Motivation -## Current Status +Message Queuing systems are widely adopted for asynchronous message transmission between applications. These systems emphasize high throughput and reliability. In this section, we explore the benefits of integrating the features of DynamoDB with MQ systems. -This project is actively under development, but it is currently in version 0. Please be aware that the public API and exported methods may undergo changes. +### Overview of DynamoDB -## Motivation +Amazon DynamoDB is a high-performance NoSQL database service that supports a wide range of data models, from simple key-value stores to complex document-based stores. The main features of DynamoDB include: + +- **Performance and Scalability**: DynamoDB responds in milliseconds and can automatically scale up or down its read and write capacity as needed. +- **High Availability and Durability**: Data is automatically replicated across multiple geographically distributed facilities, ensuring high levels of availability and durability. +- **Fully Managed and Serverless**: It eliminates the need for server management and configuration, reducing operational costs. + +Reference: [What is Amazon DynamoDB?](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Introduction.html) + +### Advantages of Using DynamoDB as Message Queue + +Applying the features of DynamoDB to MQ systems offers the following benefits: + +1. **Scalability**: The ability to efficiently handle a large volume of messages is extremely important for MQ systems. DynamoDB's on-demand mode allows for easy adjustment of resources in response to fluctuating demands. +2. **Cost Efficiency**: Using the on-demand mode reduces the risk of over-provisioning or under-provisioning resources, thus lowering costs. Additionally, choosing the provisioning mode when usage can be predicted can further reduce costs. +3. **Reliability and Availability**: Message loss is critical for MQ systems. DynamoDB's high durability and availability ensure the safety of messages. +4. **Flexibility**: Editing items stored in DynamoDB as messages allows for dynamic changes in the message order within the queue, updates to attributes, and cancellation of message processing, offering more flexible operations. + +## Comparison with Existing AWS Queuing Solutions + +AWS offers several solutions for MQ, including Amazon SQS, Amazon MQ, and Amazon MSK based on Kafka. By comparing these services with DynamoMQ, we can examine the unique advantages of DynamoMQ. + +### Comparison with Amazon SQS + +Amazon SQS is a fully managed, serverless MQ service widely used for asynchronous message processing in large-scale distributed applications. + +**It excels in scalability, cost efficiency, reliability, and availability, and I generally recommend using Amazon SQS.** + +However, Amazon SQS lacks flexibility in certain aspects. Messages sent to the queue cannot be reordered or have their attributes updated from outside. Also, to reference messages within the queue, they must first be received. These constraints can make it challenging to handle specific problems and hinder investigation and recovery efforts. + +### Comparison with Amazon MQ + +Amazon MQ is a message broker service based on Apache ActiveMQ and RabbitMQ, primarily aimed at facilitating the migration of existing applications. It supports a variety of protocols such as JMS, AMQP, STOMP, MQTT, OpenWire, and WebSocket. + +Amazon MQ offers numerous features provided by its base technologies, ActiveMQ and RabbitMQ. However, it is not serverless, which means setup and management require more effort. + +### Comparison with Amazon MSK + +Amazon MSK provides a fully managed Apache Kafka service, suitable for processing large volumes of streaming data. + +While MSK specializes in real-time data streaming, DynamoMQ focuses on general message queuing needs. MSK allows for advanced configurations, but this can lead to increased complexity and costs. + +### Advantages of DynamoMQ + +DynamoMQ leverages the scalability, durability, and serverless nature of DynamoDB. Compared to some AWS queuing solutions, it offers flexibility in managing the order and attributes of messages, low-cost operations, and easy setup and management. -> DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It’s a serverless and fully managed service that you can use for mobile, web, gaming, ad tech, IoT, and other applications that need low-latency data access at a large scale. -> -> There are many queuing implementations that offer persistence, single-message processing, and distributed computing. Some popular queuing solutions are Amazon SQS, Amazon MQ, Apache ActiveMQ, RabbitMQ, and Kafka. Those services handle various queuing features and functions with several different characteristics, such as methods of implementation, scaling, and performance. -> -> However, most of those queuing systems cannot easily change the order of the items after they arrive in the queue. Discussed implementation with DynamoDB can change the order in the queue or cancel items before processing. +## Supported Features -Quoted from AWS official blog: [Implementing Priority Queueing with Amazon DynamoDB](https://aws.amazon.com/blogs/database/implementing-priority-queueing-with-amazon-dynamodb/) +DynamoMQ is equipped with key features that should be provided by a Message Queuing system, supporting the realization of a flexible and reliable system. -## Features +### Redelivery -- [x] **Redelivery**: Redeliver messages that have not completed successfully for a specified number of times. -- [x] **Concurrent Execution**: Process concurrently using multiple goroutines. -- [x] **Dead Letter Queue**: Move messages that exceed the maximum number of redeliveries to the dead letter queue. -- [x] **Graceful Shutdown**: Complete processing of messages before shutting down the consumer process. -- [x] **FIFO (First In, First Out)**: Retrieve messages from the message queue on a first-in, first-out basis. -- [x] **Consumer Process Scaling**: Scale out by running multiple consumer processes without duplicating message retrieval from the same message queue. -- [x] **Visibility Timeout**: DynamoMQ sets a visibility timeout, a period of time during which DynamoMQ prevents all consumers from receiving and processing the message. -- [x] **Delay queues**: Delay queues allow you to delay the delivery of new messages to consumers for a set number of seconds. -- [ ] **Deduplication**: Deduplication messages within the message queue. -- [ ] **Randomized Exponential Backoff**: Prevent overlapping redelivery timing. -- [ ] **Batch Message Processing**: Send and delete multiple messages in bulk to/from the message queue. -- [ ] **Message Compression** +If a message is not processed successfully, it will be redelivered. This approach addresses temporary errors and delays in processing, increasing the chances that messages are processed correctly. -## Installation +### Concurrent Execution + +Multiple goroutines are utilized to process messages concurrently. This feature enables high throughput and efficient use of resources. + +### Dead Letter Queue + +Messages that exceed the maximum number of redeliveries are moved to the Dead Letter Queue (DLQ). This separates messages with persistent errors, allowing for later analysis or manual processing. + +### Graceful Shutdown + +Message processing is completed before the shutdown of the consumer process. This prevents the loss of messages that are being processed at the time of shutdown. + +### FIFO (First In, First Out) + +Messages are retrieved from the queue on a First In, First Out basis. This guarantees that messages are processed in the order they were sent, making it suitable for applications where order is important. + +### Consumer Process Scaling + +Multiple consumer processes can be launched, enabling scaling out. This prevents duplication in retrieving messages from the same message queue. Consequently, processing capacity can be dynamically adjusted according to the load. + +### Visibility Timeout + +A visibility timeout is set for a specific period during which the message is invisible to all consumers. This prevents a message from being received by other consumers while it is being processed. + +### Delay Queuing + +Delay queuing allows the delivery of new messages to consumers to be delayed for a set number of seconds. This feature accommodates the needs of applications that require a delayed message delivery. + +## Installation DynamoMQ Requires Go version 1.21 or greater. @@ -114,10 +157,6 @@ DynamoMQ's CLI and library configure AWS Config with credentials obtained from e - `AWS_SECRET_ACCESS_KEY` - Your AWS secret key. - `AWS_SESSION_TOKEN` - Session token for temporary credentials. -### Shared Configuration and Credentials Files - -These files provide a common location for storing AWS credentials and configuration settings, enabling consistent credential management across different AWS tools and applications. - ## Usage for DynamoMQ CLI The `dynamomq` command-line interface provides a range of commands to interact with your DynamoDB-based message queue. Below are the available commands and global flags that can be used with `dynamomq`. @@ -148,36 +187,6 @@ The `dynamomq` command-line interface provides a range of commands to interact w To get more detailed information about a specific command, use `dynamomq [command] --help`. -### Example Usage - -Here are a few examples of how to use the `dynamomq` commands: - -```sh -# Generate autocompletion script for bash -dynamomq completion bash - -# Delete a message with ID 'A-123' -dynamomq delete --id A-123 - -# Retrieve DLQ statistics -dynamomq dlq - -# Enqueue test messages -dynamomq enqueue-test - -# Get a message by ID -dynamomq get --id A-123 - -# List the first 10 message IDs in the queue -dynamomq ls - -# Receive a message from the queue -dynamomq receive - -# Reset system information of a message with ID -dynamomq reset --id A-123 -``` - ### Interactive Mode The DynamoMQ CLI supports an Interactive Mode for an enhanced user experience. To enter the Interactive Mode, simply run the `dynamomq` command without specifying any subcommands. @@ -202,7 +211,7 @@ Once in Interactive Mode, you will have access to a suite of commands to manage - `fail`: Simulates the failed processing of a message by putting it back into the queue; the message will need to be received again. - `invalid`: Moves a message from the standard queue to the DLQ for manual fixing. -## Usage for DynamoMQ Library +## Usage for DynamoMQ SDK ### DynamoMQ Client @@ -297,95 +306,134 @@ func (c *Counter[T]) Process(msg *dynamomq.Message[T]) error { } ``` -## Software Design +## About the Design of DynamoMQ -### State Machine +### Message Attributes and Table Definition -The state machine diagram below illustrates the key steps a message goes through as it traverses the system. +Here's a diagram showing the table definition used by DynamoMQ to implement its message queuing mechanism. -![State Machine](https://cacoo.com/diagrams/DjoA2pSKnhCghTYM-9383C.png) +| Key | Attributes | Type | Example Value | +|-------|--------------------|--------|-------------------------------------| +| PK | id | string | A-101 | +| | data | any | any | +| | receive_count | number | 1 | +| GSIPK | queue_type | string | STANDARD or DLQ | +| | version | number | 1 | +| | created_at | string | 2006-01-02T15:04:05.999999999Z07:00 | +| | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 | +| GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 | +| | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 | +| | invisible_until_at | string | 2006-01-02T15:04:05.999999999Z07:00 | -#### Basic Flow +#### id (Partition Key) -1. **SendMessage()**: A user sends a message that is placed in the `READY` state in the queue. +This is the unique identifier of the message, serving as the partition key in DynamoDB. It ensures efficient data distribution and access within DynamoDB. When using DynamoMQ's publisher, a UUID is generated by default. Users can also specify their own IDs. -2. **ReceiveMessage()**: The message moves from `READY` to `PROCESSING` status as it is picked up for processing. +#### data -3. **DeleteMessage()**: If processing is successful, the message is deleted from the queue. +This field contains the data included in the message. It can be stored in any format supported by DynamoDB. -#### Error Handling +References: +- [DynamoDB Data Types](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.DataTypes.html) +- [DynamoDB Item sizes and formats](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/CapacityUnitCalculations.html) -1. **ChangeMessageVisibility()**: If processing fails, the message is made visible again in the `READY` state for retry, and its visibility timeout is updated. +#### receive_count -2. **MoveMessageToDLQ()**: If the message exceeds the retry limit, it is moved to the Dead Letter Queue (DLQ). The DLQ is used to isolate problematic messages for later analysis. +This indicates the number of times the message has been received. It's used for managing retries and moving messages to the Dead Letter Queue. -#### Dead Letter Queue (DLQ) +#### queue_type (Partition Key for GSI) -1. **RedriveMessage()**: The system may choose to return a message to the standard queue if it determines that the issues have been resolved. This is achieved through the `Redrive` operation. +This attribute shows the type of queue where the message is stored, distinguishing between STANDARD and DLQ. -2. **ReceiveMessage()**: Messages in the DLQ are also moved from `READY` to `PROCESSING` status, similar to regular queue messages. +#### version -3. **DeleteMessage()**: Once a message in the DLQ is successfully processed, it is deleted from the queue. +This is the version number of the message. It increments each time the message is updated, facilitating optimistic concurrency control. -This design ensures that DynamoMQ maintains message reliability while enabling tracking and analysis of messages in the event of errors. The use of a DLQ minimizes the impact of failures while maintaining system resiliency. +#### created_at -### Table Definition +The timestamp when the message was created, recorded in ISO 8601 format. -The DynamoDB table for the DynamoMQ message queue system is designed to efficiently manage and track the status of messages. Here’s a breakdown of the table schema: +#### updated_at -| Key | Attributes | Type | Example Value | -|-------|--------------------|--------|-------------------------------------| -| PK | id | string | A-101 | -| | data | any | any | -| | receive_count | number | 1 | -| GSIPK | queue_type | string | STANDARD or DLQ | -| | version | number | 1 | -| | created_at | string | 2006-01-02T15:04:05.999999999Z07:00 | -| | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 | -| GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 | -| | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 | -| | invisible_until_at | string | 2006-01-02T15:04:05.999999999Z07:00 | +The timestamp when the message was last updated, recorded in ISO 8601 format. + +#### sent_at (Sort Key for GSI) + +The timestamp when the message was sent to the queue, recorded in ISO 8601 format. -**PK (Primary Key)** `ID`: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages. +#### received_at -**GSIPK (Global Secondary Index - Partition Key)** `queue_type`: Used to categorize messages by `queue_type`, such as 'STANDARD' or 'DLQ' (Dead Letter Queue), allowing for quick access and operations on subsets of the queue. +The timestamp when the message was received, recorded in ISO 8601 format. -**GSISK (Global Secondary Index - Sort Key)** `sent_at`: The timestamp when the message was sent to the queue. Facilitates the ordering of messages based on the time they were added to the queue, which is useful for implementing FIFO (First-In-First-Out) or other ordering mechanisms. +#### invisible_until_at -**Attributes**: These are the various properties associated with each message: -- `data`: This attribute holds the content of the message and can be of any type. -- `receive_count`: A numerical count of how many times the message has been retrieved from the queue. -- `version`: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified. -- `created_at`: The date and time when the message was created. ISO 8601 format. -- `updated_at`: The date and time when the message was last updated. ISO 8601 format. -- `received_at`: The timestamp when the message was last viewed without being altered. ISO 8601 format. -- `invisible_until_at`: The timestamp indicating when the message becomes visible in the queue for processing. ISO 8601 format. +The timestamp indicating when the message will next become visible in the queue. Once this time passes, the message becomes receivable again. -### Data Transition +#### Global Secondary Index (GSI) -This data transition diagram serves as a map for developers and operators to understand how messages flow through the DynamoMQ system, providing insight into the mechanisms of message processing, failure handling, and retries within a DynamoDB-backed queue. +A GSI with `queue_type` as the partition key and `sent_at` as the sort key is set up to receive messages in the order they are added to the queue. + +### Message State Machine + +The following state machine diagram illustrates the lifecycle of messages in DynamoMQ and their possible state transitions. The diagram shows how messages are processed in both STANDARD (`queue_type=STANDARD`) and Dead Letter Queues (`queue_type=DLQ`). + +![State Machine](https://cacoo.com/diagrams/DjoA2pSKnhCghTYM-9383C.png) + +Additionally, the below diagram illustrates how message attributes change with state transitions. Attributes highlighted in red are updated during these transitions. ![Data Transition](https://cacoo.com/diagrams/DjoA2pSKnhCghTYM-DCE15.png) -#### Initial State +#### Standard Queue Data Transition Explanation + +1. **Message Sending** + - The producer uses the `SendMessage()` function to send a message to the standard queue. + - The sent message enters the 'READY' state, indicating it is available for receipt. + - The message is assigned a unique ID and contains data in the `data` field. + - The `queue_type` attribute is set to `STANDARD`, and the version starts at `1`. + - Timestamps for `created_at`, `updated_at`, and `sent_at` are recorded. + +2. **Message Receipt** + - The consumer receives the message using the `ReceiveMessage()` function and begins processing. + - The message transitions to the 'PROCESSING' state, becoming invisible to other consumers during this period. + - The + + `receive_count` and `version` of the message increment with each receipt. + - Timestamps for `updated_at` and `received_at` are updated, and `invisible_until_at` is set with a new timestamp. + +3. **Successful Processing** + - Upon successful processing, the consumer removes the message from the queue using the `DeleteMessage()` function. + +4. **Failed Processing** + - In case of a failure, particularly when a retry is needed, the `ChangeMessageVisibility()` is used to update the `invisible_until_at` attribute of the message for later reprocessing. + - Once the timestamp set in `invisible_until_at` passes, the message returns to the 'READY' state for potential re-receipt. + +#### Dead Letter Queue Data Transition Explanation -- **SendMessage()**: A message is created with an initial `status` of 'READY'. It includes a unique `id`, arbitrary `data`, and a `receive_count` set to 0, indicating it has not yet been processed. The `queue_type` is 'STANDARD', and timestamps are recorded for creation, last update, and when added to the queue. +1. **Move to DLQ** + - If processing of a message fails beyond the maximum redelivery attempts, the `MoveMessageToDLQ()` function moves it to the Dead Letter Queue (DLQ). + - At this point, the `queue_type` of the message changes to `DLQ`, and `receive_count` resets to `0`. + - The message moved to the DLQ reverts to the 'READY' state. -#### Processing +2. **Receipt of Message** + - Within the DLQ, the message is again received through `ReceiveMessage()` and transitions to the 'PROCESSING' state. + - During this period, the message becomes invisible to other consumers. -- **ReceiveMessage()**: The message `status` changes to 'PROCESSING', the `receive_count` increments to reflect the number of times it's been retrieved, and the `version` number increases to facilitate optimistic locking. Timestamps are updated accordingly. +3. **Successful Processing** + - Once successfully processed, the message is removed from the DLQ using `DeleteMessage()`. -#### Retry Logic +4. **Return to Standard Queue** + - Using `RedriveMessage()`, the message can be moved back to the original standard queue. + - This action resets the `queue_type` to `STANDARD’, and the message reverts to the 'READY' state. -- **ChangeMessageVisibility()**: If processing fails, the message's visibility is updated to make it available for retry, and the `receive_count` is incremented. Timestamps are refreshed to reflect the most recent update. +## Conclusion -#### Dead Letter Queue +DynamoMQ is a message queuing library that leverages the features of DynamoDB to achieve high scalability, reliability, and cost efficiency. Notably, its ability to dynamically edit message order and attributes enables flexible adaptation to application requirements. -- **MoveMessageToDLQ()**: After the maximum number of retries is reached without successful processing, the message is moved to the DLQ. Its `queue_type` changes to 'DLQ', and `receive_count` is reset, indicating that it's ready for a fresh attempt or investigation. +Compared to existing solutions, DynamoMQ offers ease of management for developers while providing the reliability of fully managed services like Amazon SQS. It also encompasses key functionalities expected from a message queue, such as concurrent processing with multiple goroutines, Dead Letter Queues, and ensuring FIFO (First In, First Out) order. -#### Redrive Policy +## Acknowledgments -- **RedriveMessage()**: If issues are resolved, messages in the DLQ can be sent back to the standard queue for processing. This is depicted by the `RedriveMessage()` operation, which resets the `receive_count` and alters the `queue_type` back to 'STANDARD', along with updating the timestamps. +We extend our deepest gratitude to the AWS official blog, "[Implementing priority queueing with Amazon DynamoDB](https://aws.amazon.com/blogs/database/implementing-priority-queueing-with-amazon-dynamodb/)," which served as a reference in the development of DynamoMQ. This blog provides a detailed explanation of implementing priority queuing using Amazon DynamoDB, and this knowledge has been immensely beneficial in constructing DynamoMQ. ## Authors