Skip to content

Commit

Permalink
Merge branch 'main' into pr/414
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Nov 11, 2023
2 parents 020d9d1 + d0e815f commit ab1c57f
Show file tree
Hide file tree
Showing 11 changed files with 2,949 additions and 2,108 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/dependency-review.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: 'Dependency Review'
on: [pull_request]

permissions:
contents: read

jobs:
dependency-review:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
uses: actions/checkout@v3
- name: 'Dependency Review'
uses: actions/dependency-review-action@v3
4 changes: 4 additions & 0 deletions .github/workflows/stale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ jobs:
days-before-issue-close: 5
days-before-pr-close: 10
operations-per-run: 90
exempt-issue-labels: keep
exempt-pr-labels: keep
exempt-all-assignees: true
exempt-all-milestones: true
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [16.x, 18.x]
node-version: [18.x, 20.x]

steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ npm install sqs-consumer
### Node version
From v7 and above, this library will only support Node v16 or above. If you are still using Node 14, please use a previous version of the library.
This decision was made due to the removal of security support from the Node.JS team from April 30th, 2023.
We will only support Node versions that are actively or security supported by the Node team. If you are still using an Node 14, please use a version of this library before the v7 release, if you are using Node 16, please use a version before the v7.3.0 release.
## Usage
Expand Down
4,878 changes: 2,812 additions & 2,066 deletions package-lock.json

Large diffs are not rendered by default.

47 changes: 24 additions & 23 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"name": "sqs-consumer",
"version": "7.2.2",
"version": "7.4.0",
"description": "Build SQS-based Node applications without the boilerplate",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"engines": {
"node": ">=16.0.0"
"node": ">=18.0.0"
},
"scripts": {
"build": "npm run clean && tsc",
Expand All @@ -17,8 +17,8 @@
"test:integration:init": "sh ./test/scripts/initIntTests.sh",
"test:integration": "npm run test:integration:init && cucumber-js --config ./test/config/cucumber.js",
"test": "npm run test:unit && npm run test:integration",
"coverage": "nyc mocha && nyc report --reporter=html && nyc report --reporter=json-summary",
"lcov": "nyc mocha && nyc report --reporter=lcov",
"coverage": "c8 mocha && c8 report --reporter=html && c8 report --reporter=json-summary",
"lcov": "c8 mocha && c8 report --reporter=lcov",
"lint": "eslint . --ext .ts",
"lint:fix": "eslint . --fix",
"format": "prettier --loglevel warn --write \"**/*.{js,json,jsx,md,ts,tsx,html}\"",
Expand All @@ -41,37 +41,37 @@
],
"license": "Apache-2.0",
"devDependencies": {
"@cucumber/cucumber": "^9.2.0",
"@types/chai": "^4.3.5",
"@types/mocha": "^10.0.1",
"@types/node": "^20.4.1",
"@types/sinon": "^10.0.15",
"chai": "^4.3.7",
"eslint": "^8.44.0",
"eslint-config-iplayer-ts": "^4.1.1",
"eslint-config-prettier": "^8.8.0",
"@cucumber/cucumber": "10.0.1",
"@types/chai": "^4.3.9",
"@types/mocha": "^10.0.3",
"@types/node": "^20.8.7",
"@types/sinon": "^10.0.20",
"chai": "^4.3.10",
"eslint": "^8.52.0",
"eslint-config-iplayer": "^9.1.0",
"eslint-config-prettier": "^9.0.0",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"c8": "^8.0.1",
"p-event": "^4.2.0",
"prettier": "^3.0.0",
"sinon": "^15.2.0",
"sqs-producer": "^3.2.1",
"prettier": "^3.0.3",
"sinon": "^17.0.0",
"sqs-producer": "^3.4.0",
"ts-node": "^10.9.1",
"typedoc": "^0.24.8",
"typescript": "^4.9.4"
"typedoc": "^0.25.2",
"typescript": "^5.2.2"
},
"dependencies": {
"@aws-sdk/client-sqs": "^3.363.0",
"@aws-sdk/client-sqs": "^3.428.0",
"debug": "^4.3.4"
},
"peerDependencies": {
"@aws-sdk/client-sqs": "^3.363.0"
"@aws-sdk/client-sqs": "^3.428.0"
},
"mocha": {
"spec": "test/tests/**/**/*.test.ts",
"require": "ts-node/register"
},
"nyc": {
"c8": {
"include": [
"src/**/*.ts"
],
Expand All @@ -86,7 +86,8 @@
},
"eslintConfig": {
"extends": [
"iplayer-ts",
"iplayer/base",
"iplayer/ts",
"prettier"
],
"parserOptions": {
Expand Down
38 changes: 28 additions & 10 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {
isConnectionError
} from './errors';
import { validateOption, assertOptions, hasMessages } from './validation';
import { abortController } from './controllers';
import { logger } from './logger';

/**
Expand All @@ -40,6 +39,8 @@ export class Consumer extends TypedEventEmitter {
private queueUrl: string;
private handleMessage: (message: Message) => Promise<Message | void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private preReceiveMessageCallback?: () => Promise<void>;
private postReceiveMessageCallback?: () => Promise<void>;
private sqs: SQSClient;
private handleMessageTimeout: number;
private attributeNames: string[];
Expand All @@ -52,13 +53,16 @@ export class Consumer extends TypedEventEmitter {
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private heartbeatInterval: number;
public abortController: AbortController;

constructor(options: ConsumerOptions) {
super();
assertOptions(options);
this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.preReceiveMessageCallback = options.preReceiveMessageCallback;
this.postReceiveMessageCallback = options.postReceiveMessageCallback;
this.handleMessageTimeout = options.handleMessageTimeout;
this.attributeNames = options.attributeNames || [];
this.messageAttributeNames = options.messageAttributeNames || [];
Expand Down Expand Up @@ -92,13 +96,26 @@ export class Consumer extends TypedEventEmitter {
*/
public start(): void {
if (this.stopped) {
// Create a new abort controller each time the consumer is started
this.abortController = new AbortController();
logger.debug('starting');
this.stopped = false;
this.emit('started');
this.poll();
}
}

/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
private get sqsSendOptions(): { abortSignal: AbortSignal } {
return {
// return the current abortController signal or a fresh signal that has not been aborted.
// This effectively defaults the signal sent to the AWS SDK to not aborted
abortSignal: this.abortController?.signal || new AbortController().signal
};
}

/**
* Stop polling the queue for messages (pre existing requests will still be made until concluded).
*/
Expand All @@ -118,7 +135,7 @@ export class Consumer extends TypedEventEmitter {

if (options?.abort) {
logger.debug('aborting');
abortController.abort();
this.abortController.abort();
this.emit('aborted');
}

Expand Down Expand Up @@ -165,13 +182,6 @@ export class Consumer extends TypedEventEmitter {
}
}

/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
private sqsSendOptions = {
abortSignal: abortController.signal
};

/**
* Poll for new messages from SQS
*/
Expand Down Expand Up @@ -225,10 +235,18 @@ export class Consumer extends TypedEventEmitter {
params: ReceiveMessageCommandInput
): Promise<ReceiveMessageCommandOutput> {
try {
return await this.sqs.send(
if (this.preReceiveMessageCallback) {
await this.preReceiveMessageCallback();
}
const result = await this.sqs.send(
new ReceiveMessageCommand(params),
this.sqsSendOptions
);
if (this.postReceiveMessageCallback) {
await this.postReceiveMessageCallback();
}

return result;
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
Expand Down
1 change: 0 additions & 1 deletion src/controllers.ts

This file was deleted.

16 changes: 16 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ export interface ConsumerOptions {
* the successful messages only.
*/
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
/**
* An `async` function (or function that returns a `Promise`) to be called right
* before the SQS Client sends a receive message command.
*
* This function is usefull if SQS Client module exports have been modified, for
* example to add middlewares.
*/
preReceiveMessageCallback?(): Promise<void>;
/**
* An `async` function (or function that returns a `Promise`) to be called right
* after the SQS Client sends a receive message command.
*
* This function is usefull if SQS Client module exports have been modified, for
* example to add middlewares.
*/
postReceiveMessageCallback?(): Promise<void>;
}

export type UpdatableOptions =
Expand Down
1 change: 0 additions & 1 deletion test/config/cucumber.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module.exports = {
default: {
parallel: 0,
format: ['html:test/reports/cucumber-report.html'],
publishQuiet: true,
paths: ['test/features'],
forceExit: true
}
Expand Down
52 changes: 49 additions & 3 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import * as pEvent from 'p-event';

import { AWSError } from '../../src/types';
import { Consumer } from '../../src/consumer';
import { abortController } from '../../src/controllers';
import { logger } from '../../src/logger';

const sandbox = sinon.createSandbox();
Expand Down Expand Up @@ -168,6 +167,32 @@ describe('Consumer', () => {
});

describe('.start', () => {
it('uses the correct abort signal', async () => {
sqs.send
.withArgs(mockReceiveMessage)
.resolves(new Promise((res) => setTimeout(res, 100)));

// Starts and abort is false
consumer.start();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);

// normal stop without an abort and abort is false
consumer.stop();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);

// Starts and abort is false
consumer.start();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);

// Stop with abort and abort is true
consumer.stop({ abort: true });
assert.isTrue(sqs.send.lastCall.lastArg.abortSignal.aborted);

// Starts and abort is false
consumer.start();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);
});

it('fires an event when the consumer is started', async () => {
const handleStart = sandbox.stub().returns(null);

Expand Down Expand Up @@ -506,6 +531,28 @@ describe('Consumer', () => {
sandbox.assert.calledWith(handleMessage, response.Messages[0]);
});

it('calls the preReceiveMessageCallback and postReceiveMessageCallback function before receiving a message', async () => {
const preReceiveMessageCallbackStub = sandbox.stub().resolves(null);
const postReceiveMessageCallbackStub = sandbox.stub().resolves(null);

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
preReceiveMessageCallback: preReceiveMessageCallbackStub,
postReceiveMessageCallback: postReceiveMessageCallbackStub
});

consumer.start();
await pEvent(consumer, 'message_processed');
consumer.stop();

sandbox.assert.calledOnce(preReceiveMessageCallbackStub);
sandbox.assert.calledOnce(postReceiveMessageCallbackStub);
});

it('deletes the message when the handleMessage function is called', async () => {
handleMessage.resolves();

Expand Down Expand Up @@ -1359,7 +1406,6 @@ describe('Consumer', () => {
it('aborts requests when the abort param is true', async () => {
const handleStop = sandbox.stub().returns(null);
const handleAbort = sandbox.stub().returns(null);
const abortControllerAbort = sandbox.stub(abortController, 'abort');

consumer.on('stopped', handleStop);
consumer.on('aborted', handleAbort);
Expand All @@ -1369,8 +1415,8 @@ describe('Consumer', () => {

await clock.runAllAsync();

assert.isTrue(consumer.abortController.signal.aborted);
sandbox.assert.calledOnce(handleMessage);
sandbox.assert.calledOnce(abortControllerAbort);
sandbox.assert.calledOnce(handleAbort);
sandbox.assert.calledOnce(handleStop);
});
Expand Down

0 comments on commit ab1c57f

Please sign in to comment.