Skip to content

Commit

Permalink
feat(rabbit-bus): add rabbit connection class
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Mar 30, 2024
1 parent 11a1807 commit 07030fd
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 47 deletions.
1 change: 1 addition & 0 deletions packages/ddd-toolkit-rabbit-bus/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './rabbit-event-bus';

Check warning on line 1 in packages/ddd-toolkit-rabbit-bus/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/index.ts#L1

Added line #L1 was not covered by tests
112 changes: 112 additions & 0 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { Channel, ConfirmChannel, connect, Connection } from 'amqplib';
import { ILogger } from '@fizzbuds/ddd-toolkit';
import { inspect } from 'util';

// FROM https://github.com/gpad/ms-practical-ws/blob/main/src/infra/rabbit.ts

export class RabbitConnection {
private static RECONNECTION_TIMEOUT = 2000;

private connection: Connection;
private consumerChannel: Channel;
private producerChannel: ConfirmChannel;

private waiting = false;
private stopping = false;

constructor(
private readonly amqpUri: string,
private readonly exchangeName: string,
private readonly prefetch: number,
private readonly logger: ILogger = console,
) {}

public async setupConnection(): Promise<void> {
try {
this.logger.debug('Starting Rabbit connection');
this.stopping = false;
this.connection = await connect(this.amqpUri);
this.connection.on('error', async (err) => {
if (this.stopping) return;
this.logger.error(`Connection with rabbit closed with ${inspect(err)} try to reconnect`);
this.scheduleReconnection();

Check warning on line 32 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L31-L32

Added lines #L31 - L32 were not covered by tests
});
this.connection.on('close', async (reason) => {
if (this.stopping) return;
this.logger.debug(`Connection with rabbit closed with ${inspect(reason)} try to reconnect`);
this.scheduleReconnection();

Check warning on line 37 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L36-L37

Added lines #L36 - L37 were not covered by tests
});
await this.setupConsumerChannel();
await this.setupProducerChannel();
await this.setupExchanges();
this.logger.debug('Rabbit connection established');
} catch (error) {
this.logger.error(`Error connection ${inspect(error)}`);
throw error;

Check warning on line 45 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L44-L45

Added lines #L44 - L45 were not covered by tests
}
}

public getConsumerChannel(): Channel {
return this.consumerChannel;
}

public getProducerChannel(): ConfirmChannel {
return this.producerChannel;
}

public async terminate() {
this.logger.debug('Stopping rabbit connection');
this.stopping = true;
await this.producerChannel?.close();
await this.consumerChannel?.close();
await this.connection?.close();
this.logger.debug('Rabbit connection stopped');
}

private scheduleReconnection() {

Check warning on line 66 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L66

Added line #L66 was not covered by tests
if (this.waiting) {
this.logger.warn('Reconnection already scheduled');
return;

Check warning on line 69 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L68-L69

Added lines #L68 - L69 were not covered by tests
}
this.waiting = true;
setTimeout(async () => {
this.waiting = false;
try {
await this.setupConnection();

Check warning on line 75 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L71-L75

Added lines #L71 - L75 were not covered by tests
} catch (error) {
this.logger.error(`Unable to connect with rabbit, schedule a new connection`);
this.scheduleReconnection();

Check warning on line 78 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L77-L78

Added lines #L77 - L78 were not covered by tests
}
}, RabbitConnection.RECONNECTION_TIMEOUT);
}

private async setupConsumerChannel() {
this.consumerChannel = await this.connection.createConfirmChannel();
await this.consumerChannel.prefetch(this.prefetch);

this.consumerChannel.on('error', async (err) => {
if (!this.stopping) return;
this.logger.error(`Consumer channel with rabbit closed with ${inspect(err)} try to recreate`);
await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT));
await this.setupConsumerChannel();

Check warning on line 91 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L89-L91

Added lines #L89 - L91 were not covered by tests
});
}

private async setupProducerChannel() {
this.producerChannel = await this.connection.createConfirmChannel();
this.producerChannel.on('error', async (err) => {
this.logger.error(`Producer channel with rabbit closed with ${inspect(err)} try to recreate`);
await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT));
await this.setupProducerChannel();

Check warning on line 100 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L98-L100

Added lines #L98 - L100 were not covered by tests
});
}

private async setupExchanges() {
if (!this.producerChannel) {
throw new Error('Unable to setup exchange because channel is null');

Check warning on line 106 in packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts#L106

Added line #L106 was not covered by tests
}
await this.producerChannel.assertExchange(this.exchangeName, 'direct', {
durable: true,
});
}
}
20 changes: 1 addition & 19 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Event, IEventHandler, ILogger } from '../../ddd-toolkit';
import { Event, IEventHandler, ILogger, waitFor } from '@fizzbuds/ddd-toolkit';
import { RabbitEventBus } from './rabbit-event-bus';

const loggerMock: ILogger = {
Expand Down Expand Up @@ -145,21 +145,3 @@ describe('RabbitEventBus', () => {
});
});
});

async function waitFor(statement: () => void, timeout = 1000): Promise<void> {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
}
50 changes: 22 additions & 28 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,68 @@ import {
IEventHandler,
ILogger,
IRetryMechanism,
} from '../../ddd-toolkit';
} from '@fizzbuds/ddd-toolkit';

import { Channel, ConfirmChannel, connect, Connection, ConsumeMessage } from 'amqplib';
import { ConsumeMessage } from 'amqplib';
import { inspect } from 'util';
import { RabbitConnection } from './rabbit-connection';

export class RabbitEventBus implements IEventBus {
private amqpConnection: Connection;
private consumerChannel: Channel;
private producerChannel: ConfirmChannel;
private connection: RabbitConnection;

private handlers: { eventName: string; queueName: string; handler: IEventHandler<IEvent<unknown>> }[] = [];

constructor(
private readonly amqpUrl: string,
amqpUrl: string,
private readonly exchangeName: string,
private readonly consumerPrefetch: number = 10,
consumerPrefetch: number = 10,
private readonly maxAttempts: number = 3,
private readonly exponentialBackoff: IRetryMechanism = new ExponentialBackoff(1000),
private readonly logger: ILogger,
private readonly logger: ILogger = console,
private readonly queuePrefix: string = '',
private readonly queueNameFormatter: (handlerName: string) => string = camelCaseToKebabCase,
private readonly queueExpirationMs: number = 30 * 60000,
) {}
) {
this.connection = new RabbitConnection(amqpUrl, exchangeName, consumerPrefetch, logger);
}

public async init(): Promise<void> {
this.amqpConnection = await connect(this.amqpUrl);
this.consumerChannel = await this.amqpConnection.createChannel();
this.producerChannel = await this.amqpConnection.createConfirmChannel();

await this.consumerChannel.assertExchange(this.exchangeName, 'direct', { durable: true });
await this.consumerChannel.prefetch(this.consumerPrefetch);
await this.connection.setupConnection();
}

public async subscribe<T extends IEvent<unknown>>(event: IEventClass<T>, handler: IEventHandler<T>): Promise<void> {
const queueName = this.queuePrefix + this.queueNameFormatter(handler.constructor.name);
if (this.handlers.find((h) => h.queueName === queueName))
throw new Error(`Handler ${handler.constructor.name} already exists`);

await this.consumerChannel.assertQueue(queueName, {
await this.connection.getConsumerChannel().assertQueue(queueName, {
durable: true,
arguments: { 'x-queue-type': 'quorum', 'x-expires': this.queueExpirationMs },
});

this.handlers.push({ eventName: event.name, queueName, handler });

await this.consumerChannel.consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.consumerChannel.bindQueue(queueName, this.exchangeName, event.name);
await this.connection.getConsumerChannel().consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.connection.getConsumerChannel().bindQueue(queueName, this.exchangeName, event.name);
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const serializedEvent = JSON.stringify(event);
const message = Buffer.from(serializedEvent);
this.producerChannel.publish(this.exchangeName, event.name, message);
await this.producerChannel.waitForConfirms();
this.connection.getProducerChannel().publish(this.exchangeName, event.name, message);
await this.connection.getProducerChannel().waitForConfirms();
}

public async terminate(): Promise<void> {
await this.consumerChannel.close();
await this.producerChannel.close();
await this.amqpConnection.close();
await this.connection.terminate();
}

private async onMessage(rawMessage: ConsumeMessage | null, queueName: string) {
if (rawMessage === null) return;
const parsedMessage = JSON.parse(rawMessage.content.toString());

if (!this.isAValidMessage(parsedMessage)) {
this.consumerChannel.nack(rawMessage, false, false);
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to invalid format`);
return;

Check warning on line 72 in packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts#L70-L72

Added lines #L70 - L72 were not covered by tests
}
Expand All @@ -82,23 +76,23 @@ export class RabbitEventBus implements IEventBus {
?.handler;

if (!handler) {
this.consumerChannel.nack(rawMessage, false, false);
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to missing handler for ${parsedMessage.name}`);
return;

Check warning on line 81 in packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts#L79-L81

Added lines #L79 - L81 were not covered by tests
}

try {
await handler.handle(parsedMessage);
this.consumerChannel.ack(rawMessage);
this.connection.getConsumerChannel().ack(rawMessage);
} catch (e) {
this.logger.warn(`Error handling message due ${inspect(e)}`);

Check warning on line 88 in packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts#L88

Added line #L88 was not covered by tests
const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0;
if (deliveryCount < this.maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, this.exponentialBackoff.getDelay(deliveryCount)));
this.consumerChannel.nack(rawMessage, false, true);
this.connection.getConsumerChannel().nack(rawMessage, false, true);
this.logger.warn(`Message re-queued due ${inspect(e)}`);
} else {
this.consumerChannel.nack(rawMessage, false, false);
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.logger.error(`Message sent to dlq due ${inspect(e)}`);

Check warning on line 96 in packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts#L91-L96

Added lines #L91 - L96 were not covered by tests
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/ddd-toolkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ export * from './event-bus/event-bus.interface';
export * from './logger';
export * from './event-bus/event';
export * from './event-bus/exponential-backoff';
export * from './utils';

Check warning on line 11 in packages/ddd-toolkit/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/ddd-toolkit/src/index.ts#L8-L11

Added lines #L8 - L11 were not covered by tests

0 comments on commit 07030fd

Please sign in to comment.