From d3addc595dae90ee4c151c476d0d9f0a4f4d3376 Mon Sep 17 00:00:00 2001 From: Gabriele Toselli Date: Sat, 30 Mar 2024 00:27:56 +0100 Subject: [PATCH] feat(rabbit-bus): add rabbit connection class --- .../src/rabbit-connection.ts | 112 ++++++++++++++++++ .../src/rabbit-event-bus.spec.ts | 20 +--- .../src/rabbit-event-bus.ts | 50 ++++---- packages/ddd-toolkit/src/index.ts | 1 + 4 files changed, 136 insertions(+), 47 deletions(-) create mode 100644 packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts diff --git a/packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts b/packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts new file mode 100644 index 0000000..a32f6a2 --- /dev/null +++ b/packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts @@ -0,0 +1,112 @@ +import { Channel, ConfirmChannel, connect, Connection } from 'amqplib'; +import { ILogger } from '@fizzbuds/ddd-toolkit'; +import { inspect } from 'util'; + +// FROM https://github.com/gpad/ms-practical-ws/blob/main/src/infra/rabbit.ts + +export class RabbitConnection { + private static RECONNECTION_TIMEOUT = 2000; + + private connection: Connection; + private consumerChannel: Channel; + private producerChannel: ConfirmChannel; + + private waiting = false; + private stopping = false; + + constructor( + private readonly amqpUri: string, + private readonly exchangeName: string, + private readonly prefetch: number, + private readonly logger: ILogger = console, + ) {} + + public async setupConnection(): Promise { + try { + this.logger.debug('Starting Rabbit connection'); + this.stopping = false; + this.connection = await connect(this.amqpUri); + this.connection.on('error', async (err) => { + if (this.stopping) return; + this.logger.error(`Connection with rabbit closed with ${inspect(err)} try to reconnect`); + this.scheduleReconnection(); + }); + this.connection.on('close', async (reason) => { + if (this.stopping) return; + this.logger.debug(`Connection with rabbit closed with ${inspect(reason)} try to reconnect`); + this.scheduleReconnection(); + }); + await this.setupConsumerChannel(); + await this.setupProducerChannel(); + await this.setupExchanges(); + this.logger.debug('Rabbit connection established'); + } catch (error) { + this.logger.error(`Error connection ${inspect(error)}`); + throw error; + } + } + + public getConsumerChannel(): Channel { + return this.consumerChannel; + } + + public getProducerChannel(): ConfirmChannel { + return this.producerChannel; + } + + public async terminate() { + this.logger.debug('Stopping rabbit connection'); + this.stopping = true; + await this.producerChannel?.close(); + await this.consumerChannel?.close(); + await this.connection?.close(); + this.logger.debug('Rabbit connection stopped'); + } + + private scheduleReconnection() { + if (this.waiting) { + this.logger.warn('Reconnection already scheduled'); + return; + } + this.waiting = true; + setTimeout(async () => { + this.waiting = false; + try { + await this.setupConnection(); + } catch (error) { + this.logger.error(`Unable to connect with rabbit, schedule a new connection`); + this.scheduleReconnection(); + } + }, RabbitConnection.RECONNECTION_TIMEOUT); + } + + private async setupConsumerChannel() { + this.consumerChannel = await this.connection.createConfirmChannel(); + await this.consumerChannel.prefetch(this.prefetch); + + this.consumerChannel.on('error', async (err) => { + if (!this.stopping) return; + this.logger.error(`Consumer channel with rabbit closed with ${inspect(err)} try to recreate`); + await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT)); + await this.setupConsumerChannel(); + }); + } + + private async setupProducerChannel() { + this.producerChannel = await this.connection.createConfirmChannel(); + this.producerChannel.on('error', async (err) => { + this.logger.error(`Producer channel with rabbit closed with ${inspect(err)} try to recreate`); + await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT)); + await this.setupProducerChannel(); + }); + } + + private async setupExchanges() { + if (!this.producerChannel) { + throw new Error('Unable to setup exchange because channel is null'); + } + await this.producerChannel.assertExchange(this.exchangeName, 'direct', { + durable: true, + }); + } +} diff --git a/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.spec.ts b/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.spec.ts index a99296c..e71ff58 100644 --- a/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.spec.ts +++ b/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.spec.ts @@ -1,4 +1,4 @@ -import { Event, IEventHandler, ILogger } from '../../ddd-toolkit'; +import { Event, IEventHandler, ILogger, waitFor } from '@fizzbuds/ddd-toolkit'; import { RabbitEventBus } from './rabbit-event-bus'; const loggerMock: ILogger = { @@ -145,21 +145,3 @@ describe('RabbitEventBus', () => { }); }); }); - -async function waitFor(statement: () => void, timeout = 1000): Promise { - const startTime = Date.now(); - - let latestStatementError; - while (true) { - try { - statement(); - return; - } catch (e) { - latestStatementError = e; - } - - if (Date.now() - startTime > timeout) throw latestStatementError; - - await new Promise((resolve) => setTimeout(resolve, 100)); - } -} diff --git a/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts b/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts index ae2f658..58dfcd1 100644 --- a/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts +++ b/packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts @@ -6,37 +6,33 @@ import { IEventHandler, ILogger, IRetryMechanism, -} from '../../ddd-toolkit'; +} from '@fizzbuds/ddd-toolkit'; -import { Channel, ConfirmChannel, connect, Connection, ConsumeMessage } from 'amqplib'; +import { ConsumeMessage } from 'amqplib'; import { inspect } from 'util'; +import { RabbitConnection } from './rabbit-connection'; export class RabbitEventBus implements IEventBus { - private amqpConnection: Connection; - private consumerChannel: Channel; - private producerChannel: ConfirmChannel; + private connection: RabbitConnection; private handlers: { eventName: string; queueName: string; handler: IEventHandler> }[] = []; constructor( - private readonly amqpUrl: string, + amqpUrl: string, private readonly exchangeName: string, - private readonly consumerPrefetch: number = 10, + consumerPrefetch: number = 10, private readonly maxAttempts: number = 3, private readonly exponentialBackoff: IRetryMechanism = new ExponentialBackoff(1000), - private readonly logger: ILogger, + private readonly logger: ILogger = console, private readonly queuePrefix: string = '', private readonly queueNameFormatter: (handlerName: string) => string = camelCaseToKebabCase, private readonly queueExpirationMs: number = 30 * 60000, - ) {} + ) { + this.connection = new RabbitConnection(amqpUrl, exchangeName, consumerPrefetch, logger); + } public async init(): Promise { - this.amqpConnection = await connect(this.amqpUrl); - this.consumerChannel = await this.amqpConnection.createChannel(); - this.producerChannel = await this.amqpConnection.createConfirmChannel(); - - await this.consumerChannel.assertExchange(this.exchangeName, 'direct', { durable: true }); - await this.consumerChannel.prefetch(this.consumerPrefetch); + await this.connection.setupConnection(); } public async subscribe>(event: IEventClass, handler: IEventHandler): Promise { @@ -44,28 +40,26 @@ export class RabbitEventBus implements IEventBus { if (this.handlers.find((h) => h.queueName === queueName)) throw new Error(`Handler ${handler.constructor.name} already exists`); - await this.consumerChannel.assertQueue(queueName, { + await this.connection.getConsumerChannel().assertQueue(queueName, { durable: true, arguments: { 'x-queue-type': 'quorum', 'x-expires': this.queueExpirationMs }, }); this.handlers.push({ eventName: event.name, queueName, handler }); - await this.consumerChannel.consume(queueName, (msg) => this.onMessage(msg, queueName)); - await this.consumerChannel.bindQueue(queueName, this.exchangeName, event.name); + await this.connection.getConsumerChannel().consume(queueName, (msg) => this.onMessage(msg, queueName)); + await this.connection.getConsumerChannel().bindQueue(queueName, this.exchangeName, event.name); } public async publish>(event: T): Promise { const serializedEvent = JSON.stringify(event); const message = Buffer.from(serializedEvent); - this.producerChannel.publish(this.exchangeName, event.name, message); - await this.producerChannel.waitForConfirms(); + this.connection.getProducerChannel().publish(this.exchangeName, event.name, message); + await this.connection.getProducerChannel().waitForConfirms(); } public async terminate(): Promise { - await this.consumerChannel.close(); - await this.producerChannel.close(); - await this.amqpConnection.close(); + await this.connection.terminate(); } private async onMessage(rawMessage: ConsumeMessage | null, queueName: string) { @@ -73,7 +67,7 @@ export class RabbitEventBus implements IEventBus { const parsedMessage = JSON.parse(rawMessage.content.toString()); if (!this.isAValidMessage(parsedMessage)) { - this.consumerChannel.nack(rawMessage, false, false); + this.connection.getConsumerChannel().nack(rawMessage, false, false); this.logger.warn(`Message discarded due to invalid format`); return; } @@ -82,23 +76,23 @@ export class RabbitEventBus implements IEventBus { ?.handler; if (!handler) { - this.consumerChannel.nack(rawMessage, false, false); + this.connection.getConsumerChannel().nack(rawMessage, false, false); this.logger.warn(`Message discarded due to missing handler for ${parsedMessage.name}`); return; } try { await handler.handle(parsedMessage); - this.consumerChannel.ack(rawMessage); + this.connection.getConsumerChannel().ack(rawMessage); } catch (e) { this.logger.warn(`Error handling message due ${inspect(e)}`); const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0; if (deliveryCount < this.maxAttempts) { await new Promise((resolve) => setTimeout(resolve, this.exponentialBackoff.getDelay(deliveryCount))); - this.consumerChannel.nack(rawMessage, false, true); + this.connection.getConsumerChannel().nack(rawMessage, false, true); this.logger.warn(`Message re-queued due ${inspect(e)}`); } else { - this.consumerChannel.nack(rawMessage, false, false); + this.connection.getConsumerChannel().nack(rawMessage, false, false); this.logger.error(`Message sent to dlq due ${inspect(e)}`); } } diff --git a/packages/ddd-toolkit/src/index.ts b/packages/ddd-toolkit/src/index.ts index 344c7ff..56eb4e1 100644 --- a/packages/ddd-toolkit/src/index.ts +++ b/packages/ddd-toolkit/src/index.ts @@ -8,3 +8,4 @@ export * from './event-bus/event-bus.interface'; export * from './logger'; export * from './event-bus/event'; export * from './event-bus/exponential-backoff'; +export * from './utils';