Skip to content

Commit

Permalink
feat(rabbit-bus): add rabbit connection class
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Mar 29, 2024
1 parent 5932b23 commit 3514f6a
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 27 deletions.
112 changes: 112 additions & 0 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { Channel, ConfirmChannel, connect, Connection } from 'amqplib';
import { ILogger } from '@fizzbuds/ddd-toolkit';
import { inspect } from 'util';

// FROM https://github.com/gpad/ms-practical-ws/blob/main/src/infra/rabbit.ts

export class RabbitConnection {
private static RECONNECTION_TIMEOUT = 2000;

private connection: Connection;
private consumerChannel: Channel;
private producerChannel: ConfirmChannel;

private waiting = false;
private stopping = false;

constructor(
private readonly amqpUri: string,
private readonly exchangeName: string,
private readonly prefetch: number,
private readonly logger: ILogger = console,
) {}

public async setupConnection(): Promise<void> {
try {
this.logger.debug('Starting Rabbit connection');
this.stopping = false;
this.connection = await connect(this.amqpUri);
this.connection.on('error', async (err) => {
if (this.stopping) return;
this.logger.error(`Connection with rabbit closed with ${inspect(err)} try to reconnect`);
this.scheduleReconnection();
});
this.connection.on('close', async (reason) => {
if (this.stopping) return;
this.logger.debug(`Connection with rabbit closed with ${inspect(reason)} try to reconnect`);
this.scheduleReconnection();
});
await this.setupConsumerChannel();
await this.setupProducerChannel();
await this.setupExchanges();
this.logger.debug('Rabbit connection established');
} catch (error) {
this.logger.error(`Error connection ${inspect(error)}`);
throw error;
}
}

public getConsumerChannel(): Channel {
return this.consumerChannel;
}

public getProducerChannel(): ConfirmChannel {
return this.producerChannel;
}

public async terminate() {
this.logger.debug('Stopping rabbit connection');
this.stopping = true;
await this.producerChannel?.close();
await this.consumerChannel?.close();
await this.connection?.close();
this.logger.debug('Rabbit connection stopped');
}

private scheduleReconnection() {
if (this.waiting) {
this.logger.warn('Reconnection already scheduled');
return;
}
this.waiting = true;
setTimeout(async () => {
this.waiting = false;
try {
await this.setupConnection();
} catch (error) {
this.logger.error(`Unable to connect with rabbit, schedule a new connection`);
this.scheduleReconnection();
}
}, RabbitConnection.RECONNECTION_TIMEOUT);
}

private async setupConsumerChannel() {
this.consumerChannel = await this.connection.createConfirmChannel();
await this.consumerChannel.prefetch(this.prefetch);

this.consumerChannel.on('error', async (err) => {
if (!this.stopping) return;
this.logger.error(`Consumer channel with rabbit closed with ${inspect(err)} try to recreate`);
await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT));
await this.setupConsumerChannel();
});
}

private async setupProducerChannel() {
this.producerChannel = await this.connection.createConfirmChannel();
this.producerChannel.on('error', async (err) => {
this.logger.error(`Producer channel with rabbit closed with ${inspect(err)} try to recreate`);
await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT));
await this.setupProducerChannel();
});
}

private async setupExchanges() {
if (!this.producerChannel) {
throw new Error('Unable to setup exchange because channel is null');
}
await this.producerChannel.assertExchange(this.exchangeName, 'direct', {
durable: true,
});
}
}
48 changes: 21 additions & 27 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,72 +8,66 @@ import {
IRetryMechanism,
} from '../../ddd-toolkit';

import { Channel, ConfirmChannel, connect, Connection, ConsumeMessage } from 'amqplib';
import { ConsumeMessage } from 'amqplib';
import { inspect } from 'util';
import { RabbitConnection } from './rabbit-connection';

export class RabbitEventBus implements IEventBus {
private amqpConnection: Connection;
private consumerChannel: Channel;
private producerChannel: ConfirmChannel;
private connection: RabbitConnection;

private handlers: { eventName: string; queueName: string; handler: IEventHandler<IEvent<unknown>> }[] = [];

constructor(
private readonly amqpUrl: string,
amqpUrl: string,
private readonly exchangeName: string,
private readonly consumerPrefetch: number = 10,
consumerPrefetch: number = 10,
private readonly maxAttempts: number = 3,
private readonly exponentialBackoff: IRetryMechanism = new ExponentialBackoff(1000),
private readonly logger: ILogger,
private readonly logger: ILogger = console,
private readonly queuePrefix: string = '',
private readonly queueNameFormatter: (handlerName: string) => string = camelCaseToKebabCase,
private readonly queueExpirationMs: number = 30 * 60000,
) {}
) {
this.connection = new RabbitConnection(amqpUrl, exchangeName, consumerPrefetch, logger);
}

public async init(): Promise<void> {
this.amqpConnection = await connect(this.amqpUrl);
this.consumerChannel = await this.amqpConnection.createChannel();
this.producerChannel = await this.amqpConnection.createConfirmChannel();

await this.consumerChannel.assertExchange(this.exchangeName, 'direct', { durable: true });
await this.consumerChannel.prefetch(this.consumerPrefetch);
await this.connection.setupConnection();
}

public async subscribe<T extends IEvent<unknown>>(event: IEventClass<T>, handler: IEventHandler<T>): Promise<void> {
const queueName = this.queuePrefix + this.queueNameFormatter(handler.constructor.name);
if (this.handlers.find((h) => h.queueName === queueName))
throw new Error(`Handler ${handler.constructor.name} already exists`);

await this.consumerChannel.assertQueue(queueName, {
await this.connection.getConsumerChannel().assertQueue(queueName, {
durable: true,
arguments: { 'x-queue-type': 'quorum', 'x-expires': this.queueExpirationMs },
});

this.handlers.push({ eventName: event.name, queueName, handler });

await this.consumerChannel.consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.consumerChannel.bindQueue(queueName, this.exchangeName, event.name);
await this.connection.getConsumerChannel().consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.connection.getConsumerChannel().bindQueue(queueName, this.exchangeName, event.name);
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const serializedEvent = JSON.stringify(event);
const message = Buffer.from(serializedEvent);
this.producerChannel.publish(this.exchangeName, event.name, message);
await this.producerChannel.waitForConfirms();
this.connection.getProducerChannel().publish(this.exchangeName, event.name, message);
await this.connection.getProducerChannel().waitForConfirms();
}

public async terminate(): Promise<void> {
await this.consumerChannel.close();
await this.producerChannel.close();
await this.amqpConnection.close();
await this.connection.terminate();
}

private async onMessage(rawMessage: ConsumeMessage | null, queueName: string) {
if (rawMessage === null) return;
const parsedMessage = JSON.parse(rawMessage.content.toString());

if (!this.isAValidMessage(parsedMessage)) {
this.consumerChannel.nack(rawMessage, false, false);
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to invalid format`);
return;
}
Expand All @@ -82,23 +76,23 @@ export class RabbitEventBus implements IEventBus {
?.handler;

if (!handler) {
this.consumerChannel.nack(rawMessage, false, false);
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to missing handler for ${parsedMessage.name}`);
return;
}

try {
await handler.handle(parsedMessage);
this.consumerChannel.ack(rawMessage);
this.connection.getConsumerChannel().ack(rawMessage);
} catch (e) {
this.logger.warn(`Error handling message due ${inspect(e)}`);
const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0;
if (deliveryCount < this.maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, this.exponentialBackoff.getDelay(deliveryCount)));
this.consumerChannel.nack(rawMessage, false, true);
this.connection.getConsumerChannel().nack(rawMessage, false, true);
this.logger.warn(`Message re-queued due ${inspect(e)}`);
} else {
this.consumerChannel.nack(rawMessage, false, false);
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.logger.error(`Message sent to dlq due ${inspect(e)}`);
}
}
Expand Down

0 comments on commit 3514f6a

Please sign in to comment.