From 2119312fb6dd8eb5aec5c1fa52893747d59a12a7 Mon Sep 17 00:00:00 2001 From: Lorenz Junglas <4759511+lolleko@users.noreply.github.com> Date: Mon, 15 Jul 2024 09:02:23 +0200 Subject: [PATCH] Introduce for SQS.SendMessageBatch (#101) * Introduce for SQS.SendMessageBatch - Introduces SQS.SendMessageBatch function - Refactors SQS service to use json instead of xml requests. - Had to bump local stack image because of https://github.com/localstack/localstack/issues/9610 * Format * Format * Update build * Update build --- docker-compose.yml | 2 +- src/internal/sqs.ts | 250 +++++++++++++++++++++++++++--------------- tests/internal/sqs.js | 59 +++++++++- 3 files changed, 218 insertions(+), 93 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1d86a3c..dedad51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.3' services: localstack: container_name: 'k6-jslib-aws-localstack' - image: 'localstack/localstack:2.0.2' + image: 'localstack/localstack:3.4.0' ports: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range diff --git a/src/internal/sqs.ts b/src/internal/sqs.ts index 0e4398c..b65aef8 100644 --- a/src/internal/sqs.ts +++ b/src/internal/sqs.ts @@ -1,20 +1,23 @@ import { AWSClient } from './client' import { AWSConfig } from './config' import { InvalidSignatureError, SignatureV4 } from './signature' -import { HTTPHeaders, SignedHTTPRequest } from './http' +import { HTTPHeaders } from './http' import http, { RefinedResponse, ResponseType } from 'k6/http' -import { toFormUrlEncoded } from './utils' import { AWSError } from './error' - -const API_VERSION = '2012-11-05' +import { AMZ_TARGET_HEADER } from './constants' +import { JSONObject } from './json' export class SQSClient extends AWSClient { private readonly signature: SignatureV4 private readonly commonHeaders: HTTPHeaders + private readonly serviceVersion: string + constructor(awsConfig: AWSConfig) { super(awsConfig, 'sqs') + this.serviceVersion = 'AmazonSQS' + this.signature = new SignatureV4({ service: this.serviceName, region: this.awsConfig.region, @@ -28,7 +31,7 @@ export class SQSClient extends AWSClient { }) this.commonHeaders = { - 'Content-Type': 'application/x-www-form-urlencoded', + 'Content-Type': 'application/x-amz-json-1.0', } } @@ -40,77 +43,72 @@ export class SQSClient extends AWSClient { * @param {Object} options - Options for the request * @param {string} [options.messageDeduplicationId] - The message deduplication id. * @param {string} [options.messageGroupId] - The message group ID for FIFO queues - * @returns {Message} - The message that was sent. + * @returns {MessageResponse} - The message that was sent. */ async sendMessage( queueUrl: string, messageBody: string, options: SendMessageOptions = {} - ): Promise { - const method = 'POST' + ): Promise { + const action = 'SendMessage' - let body: object = { - Action: 'SendMessage', - Version: API_VERSION, + const body = { QueueUrl: queueUrl, - MessageBody: messageBody, + ...this._combineQueueMessageBodyAndOptions(messageBody, options), } - if (typeof options.messageDeduplicationId !== 'undefined') { - body = { ...body, MessageDeduplicationId: options.messageDeduplicationId } - } + const res = await this._sendRequest(action, body) - if (typeof options.messageGroupId !== 'undefined') { - body = { ...body, MessageGroupId: options.messageGroupId } - } + const parsed = res.json() as JSONObject + return new MessageResponse( + parsed['MessageId'] as string, + parsed['MD5OfMessageBody'] as string + ) + } - if (typeof options.messageAttributes !== 'undefined') { - /* - * A single message attribute is represented as 3 separate parameters: name, value, and type. - * The name of the value parameter varies based on the data type. - * See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#SQS-SendMessage-request-MessageAttributes - * for more information. - */ - const attributeParameters = Object.entries(options.messageAttributes).reduce( - (params, [name, attribute], i) => { - const valueParameterSuffix = - attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue' - return Object.assign(params, { - [`MessageAttribute.${i + 1}.Name`]: name, - [`MessageAttribute.${i + 1}.Value.${valueParameterSuffix}`]: - attribute.value, - [`MessageAttribute.${i + 1}.Value.DataType`]: attribute.type, - }) - }, - {} as Record + /** + * Delivers up to ten messages to the specified queue. + * + * @param {string} queueUrl - The URL of the Amazon SQS queue to which a message is sent. Queue URLs and names are case-sensitive. + * @param {SendMessageBatchEntry[]} entries - A list of up to ten messages to send. + * @returns {MessageBatchResponse} - The messages that were sent. + */ + async sendMessageBatch( + queueUrl: string, + entries: SendMessageBatchEntry[] + ): Promise { + const action = 'SendMessageBatch' + + const requestMessageEntries = entries.map((entry) => { + let requestMessageEntry = this._combineQueueMessageBodyAndOptions( + entry.messageBody, + entry.messageOptions ) - body = { ...body, ...attributeParameters } - } + requestMessageEntry = { ...requestMessageEntry, Id: entry.messageId } + return requestMessageEntry + }) - if (typeof options.delaySeconds !== 'undefined') { - body = { ...body, DelaySeconds: options.delaySeconds } - } + const body = { QueueUrl: queueUrl, Entries: requestMessageEntries } - const signedRequest: SignedHTTPRequest = this.signature.sign( - { - method: 'POST', - endpoint: this.endpoint, - path: '/', - headers: { - ...this.commonHeaders, - }, - body: toFormUrlEncoded(body), - }, - {} - ) + const res = await this._sendRequest(action, body) - const res = await http.asyncRequest(method, signedRequest.url, signedRequest.body || '', { - headers: signedRequest.headers, - }) - this._handleError('SendMessage', res) + const parsed = res.json() as JSONObject + const successful: JSONObject[] = (parsed['Successful'] as JSONObject[]) || [] + const failed: JSONObject[] = (parsed['Failed'] as JSONObject[]) || [] - const parsed = res.html('SendMessageResponse > SendMessageResult') - return new Message(parsed.find('MessageId').text(), parsed.find('MD5OfMessageBody').text()) + return { + successful: successful.map( + (entry) => + new MessageResponse( + entry['MessageId'] as string, + entry['MD5OfMessageBody'] as string + ) + ), + failed: failed.map( + (entry) => + new SQSServiceError(entry['Message'] as string, entry['Code'] as string, action) + ), + } } /** @@ -125,12 +123,9 @@ export class SQSClient extends AWSClient { * @returns {string} [Object.nextToken] - In the future, you can use NextToken to request the next set of results. */ async listQueues(parameters: ListQueuesRequestParameters = {}): Promise { - const method = 'POST' + const action = 'ListQueues' - let body: object = { - Action: 'ListQueues', - Version: API_VERSION, - } + let body: object = {} if (typeof parameters?.maxResults !== 'undefined') { body = { ...body, MaxResults: parameters.maxResults } @@ -144,33 +139,79 @@ export class SQSClient extends AWSClient { body = { ...body, QueueNamePrefix: parameters.queueNamePrefix } } - const signedRequest: SignedHTTPRequest = this.signature.sign( + const res = await this._sendRequest(action, body) + + const parsed = res.json() as JSONObject + return { + urls: parsed['QueueUrls'] as string[], + nextToken: parsed?.NextToken as string, + } + } + + private _combineQueueMessageBodyAndOptions( + messageBody: string, + options?: SendMessageOptions + ): object { + let body: object = { MessageBody: messageBody } + + if (options === undefined) { + return body + } + + if (typeof options.messageDeduplicationId !== 'undefined') { + body = { ...body, MessageDeduplicationId: options.messageDeduplicationId } + } + + if (typeof options.messageGroupId !== 'undefined') { + body = { ...body, MessageGroupId: options.messageGroupId } + } + + if (typeof options.messageAttributes !== 'undefined') { + const messageAttributes: Record> = {} + + for (const [name, attribute] of Object.entries(options.messageAttributes)) { + const valueParameterSuffix = + attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue' + messageAttributes[name] = { + DataType: attribute.type, + } + messageAttributes[name][valueParameterSuffix] = attribute.value + } + + body = { ...body, MessageAttributes: messageAttributes } + } + + if (typeof options.delaySeconds !== 'undefined') { + body = { ...body, DelaySeconds: options.delaySeconds } + } + + return body + } + + private async _sendRequest( + action: SQSOperation, + body: object + ): Promise> { + const signedRequest = this.signature.sign( { method: 'POST', endpoint: this.endpoint, path: '/', headers: { ...this.commonHeaders, - Host: this.endpoint.host, + [AMZ_TARGET_HEADER]: `${this.serviceVersion}.${action}`, }, - body: toFormUrlEncoded(body), + body: JSON.stringify(body), }, {} ) - const res = await http.asyncRequest(method, signedRequest.url, signedRequest.body || '', { + const res = await http.asyncRequest('POST', signedRequest.url, signedRequest.body, { headers: signedRequest.headers, }) - this._handleError('ListQueues', res) - const parsed = res.html() - return { - urls: parsed - .find('QueueUrl') - .toArray() - .map((e) => e.text()), - nextToken: parsed.find('NextToken').text() || undefined, - } + this._handleError(action, res) + return res } private _handleError( @@ -178,18 +219,21 @@ export class SQSClient extends AWSClient { response: RefinedResponse ) { const errorCode: number = response.error_code - const errorMessage: string = response.error - if (errorMessage == '' && errorCode === 0) { + if (errorCode === 0) { return } - const awsError = AWSError.parseXML(response.body as string) - switch (awsError.code) { - case 'AuthorizationHeaderMalformed': - throw new InvalidSignatureError(awsError.message, awsError.code) + const error = response.json() as JSONObject + + const errorMessage: string = + (error.Message as string) || (error.message as string) || (error.__type as string) + + switch (error.__type) { + case 'InvalidSignatureException': + throw new InvalidSignatureError(errorMessage, error.__type) default: - throw new SQSServiceError(awsError.message, awsError.code || 'unknown', operation) + throw new SQSServiceError(errorMessage, error.__type as string, operation) } } } @@ -197,7 +241,7 @@ export class SQSClient extends AWSClient { /** * An Amazon SQS message. */ -export class Message { +export class MessageResponse { /** * A unique identifier for the message. * A MessageIdis considered unique across all AWS accounts for an extended period of time. @@ -221,6 +265,32 @@ export class Message { } } +/** + * An Amazon SQS message Batch Response. + */ +export class MessageBatchResponse { + /** + * A list of successful messages. + */ + successful: MessageResponse[] + + /** + * A list of failed messages. + */ + failed: SQSServiceError[] + + /** + * Instantiates a new MessageBatchResponse object. + * + * @param successful + * @param failed + */ + constructor(successful: MessageResponse[], failed: SQSServiceError[]) { + this.successful = successful + this.failed = failed + } +} + /** * SQSServiceError indicates an error occurred while interacting with the SQS API. */ @@ -237,7 +307,7 @@ export class SQSServiceError extends AWSError { /** * SQSOperation describes possible SQS operations. */ -type SQSOperation = 'ListQueues' | 'SendMessage' +type SQSOperation = 'ListQueues' | 'SendMessage' | 'SendMessageBatch' export interface SendMessageOptions { /** @@ -263,6 +333,12 @@ export interface SendMessageOptions { delaySeconds?: number } +export interface SendMessageBatchEntry { + messageId: string + messageBody: string + messageOptions?: SendMessageOptions +} + export interface ListQueuesRequestParameters { /** * Maximum number of results to include in the response. Value range is 1 to 1000. diff --git a/tests/internal/sqs.js b/tests/internal/sqs.js index 470cff6..5027685 100644 --- a/tests/internal/sqs.js +++ b/tests/internal/sqs.js @@ -46,17 +46,17 @@ export async function sqsTestSuite(data) { messageAttributes: { 'test-string': { type: 'String', - value: 'test' + value: 'test', }, 'test-number': { type: 'Number', - value: '23' + value: '23', }, 'test-binary': { type: 'Binary', - value: b64encode('test') - } - } + value: b64encode('test'), + }, + }, }) // Assert @@ -92,4 +92,53 @@ export async function sqsTestSuite(data) { expect(sendMessageToNonExistentQueueError).to.not.be.undefined expect(sendMessageToNonExistentQueueError).to.be.an.instanceOf(SQSServiceError) }) + + await asyncDescribe('sqs.sendMessageBatch successful', async (expect) => { + // Arrange + const queues = await sqsClient.listQueues() + const standardQueueUrl = queues.urls[0] + const messageBatch = [ + { messageId: '0', messageBody: 'test0' }, + { messageId: '1', messageBody: 'test1' }, + ] + + // Act + const messageBatchResponse = await sqsClient.sendMessageBatch( + standardQueueUrl, + messageBatch + ) + + // Assert + const test0Md5 = 'f6f4061a1bddc1c04d8109b39f581270' + const test1Md5 = '5a105e8b9d40e1329780d62ea2265d8a' + + expect(messageBatchResponse.successful).to.have.length(2) + expect(messageBatchResponse.successful[0].id).to.be.a('string') + expect(messageBatchResponse.successful[0].bodyMD5).to.equal(test0Md5) + + expect(messageBatchResponse.successful[1].id).to.be.a('string') + expect(messageBatchResponse.successful[1].bodyMD5).to.equal(test1Md5) + }) + + await asyncDescribe('sqs.sendMessageBatch ids not distinct', async (expect) => { + // Arrange + const queues = await sqsClient.listQueues() + const standardQueueUrl = queues.urls[0] + const messageBatch = [ + { messageId: '0', messageBody: 'test0' }, + { messageId: '0', messageBody: 'test0' }, + ] + + let batchEntryIdsNotDistinctError + try { + // Act + await sqsClient.sendMessageBatch(standardQueueUrl, messageBatch) + } catch (error) { + batchEntryIdsNotDistinctError = error + } + + // Assert + expect(batchEntryIdsNotDistinctError).to.not.be.undefined + expect(batchEntryIdsNotDistinctError).to.be.an.instanceOf(SQSServiceError) + }) }