Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce for SQS.SendMessageBatch #101

Merged
merged 7 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: '3.3'
services:
localstack:
container_name: 'k6-jslib-aws-localstack'
image: 'localstack/localstack:2.0.2'
image: 'localstack/localstack:3.4.0'
ports:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
Expand Down
250 changes: 163 additions & 87 deletions src/internal/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { AWSClient } from './client'
import { AWSConfig } from './config'
import { InvalidSignatureError, SignatureV4 } from './signature'
import { HTTPHeaders, SignedHTTPRequest } from './http'
import { HTTPHeaders } from './http'
import http, { RefinedResponse, ResponseType } from 'k6/http'
import { toFormUrlEncoded } from './utils'
import { AWSError } from './error'

const API_VERSION = '2012-11-05'
import { AMZ_TARGET_HEADER } from './constants'
import { JSONObject } from './json'

export class SQSClient extends AWSClient {
private readonly signature: SignatureV4
private readonly commonHeaders: HTTPHeaders

private readonly serviceVersion: string

constructor(awsConfig: AWSConfig) {
super(awsConfig, 'sqs')

this.serviceVersion = 'AmazonSQS'

this.signature = new SignatureV4({
service: this.serviceName,
region: this.awsConfig.region,
Expand All @@ -28,7 +31,7 @@ export class SQSClient extends AWSClient {
})

this.commonHeaders = {
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Type': 'application/x-amz-json-1.0',
}
}

Expand All @@ -40,77 +43,72 @@ export class SQSClient extends AWSClient {
* @param {Object} options - Options for the request
* @param {string} [options.messageDeduplicationId] - The message deduplication id.
* @param {string} [options.messageGroupId] - The message group ID for FIFO queues
* @returns {Message} - The message that was sent.
* @returns {MessageResponse} - The message that was sent.
*/
async sendMessage(
queueUrl: string,
messageBody: string,
options: SendMessageOptions = {}
): Promise<Message> {
const method = 'POST'
): Promise<MessageResponse> {
const action = 'SendMessage'

let body: object = {
Action: 'SendMessage',
Version: API_VERSION,
const body = {
QueueUrl: queueUrl,
MessageBody: messageBody,
...this._combineQueueMessageBodyAndOptions(messageBody, options),
}

if (typeof options.messageDeduplicationId !== 'undefined') {
body = { ...body, MessageDeduplicationId: options.messageDeduplicationId }
}
const res = await this._sendRequest(action, body)

if (typeof options.messageGroupId !== 'undefined') {
body = { ...body, MessageGroupId: options.messageGroupId }
}
const parsed = res.json() as JSONObject
return new MessageResponse(
parsed['MessageId'] as string,
parsed['MD5OfMessageBody'] as string
)
}

if (typeof options.messageAttributes !== 'undefined') {
/*
* A single message attribute is represented as 3 separate parameters: name, value, and type.
* The name of the value parameter varies based on the data type.
* See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#SQS-SendMessage-request-MessageAttributes
* for more information.
*/
const attributeParameters = Object.entries(options.messageAttributes).reduce(
(params, [name, attribute], i) => {
const valueParameterSuffix =
attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue'
return Object.assign(params, {
[`MessageAttribute.${i + 1}.Name`]: name,
[`MessageAttribute.${i + 1}.Value.${valueParameterSuffix}`]:
attribute.value,
[`MessageAttribute.${i + 1}.Value.DataType`]: attribute.type,
})
},
{} as Record<string, string>
/**
* Delivers up to ten messages to the specified queue.
*
* @param {string} queueUrl - The URL of the Amazon SQS queue to which a message is sent. Queue URLs and names are case-sensitive.
* @param {SendMessageBatchEntry[]} entries - A list of up to ten messages to send.
* @returns {MessageBatchResponse} - The messages that were sent.
*/
async sendMessageBatch(
queueUrl: string,
entries: SendMessageBatchEntry[]
): Promise<MessageBatchResponse> {
const action = 'SendMessageBatch'

const requestMessageEntries = entries.map((entry) => {
let requestMessageEntry = this._combineQueueMessageBodyAndOptions(
entry.messageBody,
entry.messageOptions
)
body = { ...body, ...attributeParameters }
}
requestMessageEntry = { ...requestMessageEntry, Id: entry.messageId }
return requestMessageEntry
})

if (typeof options.delaySeconds !== 'undefined') {
body = { ...body, DelaySeconds: options.delaySeconds }
}
const body = { QueueUrl: queueUrl, Entries: requestMessageEntries }

const signedRequest: SignedHTTPRequest = this.signature.sign(
{
method: 'POST',
endpoint: this.endpoint,
path: '/',
headers: {
...this.commonHeaders,
},
body: toFormUrlEncoded(body),
},
{}
)
const res = await this._sendRequest(action, body)

const res = await http.asyncRequest(method, signedRequest.url, signedRequest.body || '', {
headers: signedRequest.headers,
})
this._handleError('SendMessage', res)
const parsed = res.json() as JSONObject
const successful: JSONObject[] = (parsed['Successful'] as JSONObject[]) || []
const failed: JSONObject[] = (parsed['Failed'] as JSONObject[]) || []

const parsed = res.html('SendMessageResponse > SendMessageResult')
return new Message(parsed.find('MessageId').text(), parsed.find('MD5OfMessageBody').text())
return {
successful: successful.map(
(entry) =>
new MessageResponse(
entry['MessageId'] as string,
entry['MD5OfMessageBody'] as string
)
),
failed: failed.map(
(entry) =>
new SQSServiceError(entry['Message'] as string, entry['Code'] as string, action)
),
}
}

/**
Expand All @@ -125,12 +123,9 @@ export class SQSClient extends AWSClient {
* @returns {string} [Object.nextToken] - In the future, you can use NextToken to request the next set of results.
*/
async listQueues(parameters: ListQueuesRequestParameters = {}): Promise<ListQueuesResponse> {
const method = 'POST'
const action = 'ListQueues'

let body: object = {
Action: 'ListQueues',
Version: API_VERSION,
}
let body: object = {}

if (typeof parameters?.maxResults !== 'undefined') {
body = { ...body, MaxResults: parameters.maxResults }
Expand All @@ -144,60 +139,109 @@ export class SQSClient extends AWSClient {
body = { ...body, QueueNamePrefix: parameters.queueNamePrefix }
}

const signedRequest: SignedHTTPRequest = this.signature.sign(
const res = await this._sendRequest(action, body)

const parsed = res.json() as JSONObject
return {
urls: parsed['QueueUrls'] as string[],
nextToken: parsed?.NextToken as string,
}
}

private _combineQueueMessageBodyAndOptions(
messageBody: string,
options?: SendMessageOptions
): object {
let body: object = { MessageBody: messageBody }

if (options === undefined) {
return body
}

if (typeof options.messageDeduplicationId !== 'undefined') {
body = { ...body, MessageDeduplicationId: options.messageDeduplicationId }
}

if (typeof options.messageGroupId !== 'undefined') {
body = { ...body, MessageGroupId: options.messageGroupId }
}

if (typeof options.messageAttributes !== 'undefined') {
const messageAttributes: Record<string, Record<string, string>> = {}

for (const [name, attribute] of Object.entries(options.messageAttributes)) {
const valueParameterSuffix =
attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue'
messageAttributes[name] = {
DataType: attribute.type,
}
messageAttributes[name][valueParameterSuffix] = attribute.value
}

body = { ...body, MessageAttributes: messageAttributes }
}

if (typeof options.delaySeconds !== 'undefined') {
body = { ...body, DelaySeconds: options.delaySeconds }
}

return body
}

private async _sendRequest(
action: SQSOperation,
body: object
): Promise<RefinedResponse<ResponseType>> {
const signedRequest = this.signature.sign(
{
method: 'POST',
endpoint: this.endpoint,
path: '/',
headers: {
...this.commonHeaders,
Host: this.endpoint.host,
[AMZ_TARGET_HEADER]: `${this.serviceVersion}.${action}`,
},
body: toFormUrlEncoded(body),
body: JSON.stringify(body),
},
{}
)

const res = await http.asyncRequest(method, signedRequest.url, signedRequest.body || '', {
const res = await http.asyncRequest('POST', signedRequest.url, signedRequest.body, {
headers: signedRequest.headers,
})
this._handleError('ListQueues', res)

const parsed = res.html()
return {
urls: parsed
.find('QueueUrl')
.toArray()
.map((e) => e.text()),
nextToken: parsed.find('NextToken').text() || undefined,
}
this._handleError(action, res)
return res
}

private _handleError(
operation: SQSOperation,
response: RefinedResponse<ResponseType | undefined>
) {
const errorCode: number = response.error_code
const errorMessage: string = response.error

if (errorMessage == '' && errorCode === 0) {
if (errorCode === 0) {
return
}

const awsError = AWSError.parseXML(response.body as string)
switch (awsError.code) {
case 'AuthorizationHeaderMalformed':
throw new InvalidSignatureError(awsError.message, awsError.code)
const error = response.json() as JSONObject

const errorMessage: string =
(error.Message as string) || (error.message as string) || (error.__type as string)

switch (error.__type) {
case 'InvalidSignatureException':
throw new InvalidSignatureError(errorMessage, error.__type)
default:
throw new SQSServiceError(awsError.message, awsError.code || 'unknown', operation)
throw new SQSServiceError(errorMessage, error.__type as string, operation)
}
}
}

/**
* An Amazon SQS message.
*/
export class Message {
export class MessageResponse {
/**
* A unique identifier for the message.
* A MessageIdis considered unique across all AWS accounts for an extended period of time.
Expand All @@ -221,6 +265,32 @@ export class Message {
}
}

/**
* An Amazon SQS message Batch Response.
*/
export class MessageBatchResponse {
/**
* A list of successful messages.
*/
successful: MessageResponse[]

/**
* A list of failed messages.
*/
failed: SQSServiceError[]

/**
* Instantiates a new MessageBatchResponse object.
*
* @param successful
* @param failed
*/
constructor(successful: MessageResponse[], failed: SQSServiceError[]) {
this.successful = successful
this.failed = failed
}
}

/**
* SQSServiceError indicates an error occurred while interacting with the SQS API.
*/
Expand All @@ -237,7 +307,7 @@ export class SQSServiceError extends AWSError {
/**
* SQSOperation describes possible SQS operations.
*/
type SQSOperation = 'ListQueues' | 'SendMessage'
type SQSOperation = 'ListQueues' | 'SendMessage' | 'SendMessageBatch'

export interface SendMessageOptions {
/**
Expand All @@ -263,6 +333,12 @@ export interface SendMessageOptions {
delaySeconds?: number
}

export interface SendMessageBatchEntry {
messageId: string
messageBody: string
messageOptions?: SendMessageOptions
}

export interface ListQueuesRequestParameters {
/**
* Maximum number of results to include in the response. Value range is 1 to 1000.
Expand Down
Loading
Loading