diff --git a/packages/ddd-toolkit/src/outbox/mongo-outbox.spec.ts b/packages/ddd-toolkit/src/outbox/mongo-outbox.spec.ts new file mode 100644 index 0000000..d56be8a --- /dev/null +++ b/packages/ddd-toolkit/src/outbox/mongo-outbox.spec.ts @@ -0,0 +1,165 @@ +import { MongoMemoryReplSet } from 'mongodb-memory-server'; +import { MongoClient, ObjectId } from 'mongodb'; +import { MongoOutbox } from './mongo-outbox'; +import { Event } from '../event-bus/event'; + +class FooEvent extends Event<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +class BarEvent extends Event<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +describe('Mongo outbox', () => { + let mongodb: MongoMemoryReplSet; + let mongoClient: MongoClient; + + let outbox: MongoOutbox; + + const PublishEventsFnMock = jest.fn(); + + beforeAll(async () => { + mongodb = await MongoMemoryReplSet.create({ + replSet: { + count: 1, + dbName: 'test', + storageEngine: 'wiredTiger', + }, + }); + + mongoClient = new MongoClient(mongodb.getUri()); + await mongoClient.connect(); + outbox = new MongoOutbox(mongoClient, 'outbox', async (events) => PublishEventsFnMock(events)); + }); + + afterAll(async () => { + await outbox.dispose(); + await mongoClient.close(); + await mongodb.stop(); + }); + + afterEach(async () => { + jest.resetAllMocks(); + await outbox['outboxCollection'].deleteMany({}); + }); + + describe('When scheduleEvents with two events', () => { + it('should return two objectIds', async () => { + const events = [new FooEvent({ foo: 'bar' }), new BarEvent({ foo: 'bar' })]; + const session = mongoClient.startSession(); + const ids = await outbox.scheduleEvents(events, session); + expect(ids.length).toBe(2); + expect(ids.every(ObjectId.isValid)).toBe(true); + }); + + beforeEach(async () => { + const events = [new FooEvent({ foo: 'bar' }), new BarEvent({ foo: 'bar' })]; + const session = mongoClient.startSession(); + await outbox.scheduleEvents(events, session); + }); + + it('should insert two events in the outbox', async () => { + const events = await outbox['outboxCollection'].find().toArray(); + expect(events.length).toBe(2); + expect(events[0]).toMatchObject({ + _id: expect.any(ObjectId), + contextName: null, + event: expect.any(Object), + scheduledAt: expect.any(Date), + status: 'scheduled', + }); + }); + }); + + describe('Given two scheduled events', () => { + const events = [new FooEvent({ foo: 'bar' }), new BarEvent({ foo: 'bar' })]; + let eventIds: string[]; + beforeEach(async () => { + const session = mongoClient.startSession(); + eventIds = await outbox.scheduleEvents(events, session); + }); + + describe('Given a resolving publishEventsFn', () => { + beforeEach(() => { + PublishEventsFnMock.mockResolvedValue('ok'); + }); + + describe('When publish', () => { + it('should call publishEventsFn once', async () => { + await outbox.publishEvents(eventIds); + expect(PublishEventsFnMock).toBeCalled(); + }); + + it('should pass both events to publishEventsFn', async () => { + await outbox.publishEvents(eventIds); + expect(PublishEventsFnMock).toBeCalledWith(events); + }); + + it('should update the status of the events to published', async () => { + await outbox.publishEvents(eventIds); + const events = await outbox['outboxCollection'].find().toArray(); + expect(events.every((event) => event.status === 'published')).toBe(true); + }); + + it('should set the publishedAt date', async () => { + await outbox.publishEvents(eventIds); + const events = await outbox['outboxCollection'].find().toArray(); + expect(events[0].publishedAt).toEqual(expect.any(Date)); + }); + }); + }); + + describe('Given a rejecting publishEventsFn', () => { + beforeEach(() => { + PublishEventsFnMock.mockRejectedValue('error'); + }); + + describe('When publish', () => { + it('should call publishEventsFn once', async () => { + await outbox.publishEvents(eventIds); + expect(PublishEventsFnMock).toBeCalled(); + }); + + it('should not update the status of the events to published', async () => { + await outbox.publishEvents(eventIds); + const events = await outbox['outboxCollection'].find().toArray(); + expect(events.every((event) => event.status === 'scheduled')).toBe(true); + }); + }); + }); + + describe('When startMonitoring', () => { + it('after about 1 second it should publish them', async () => { + const now = Date.now(); + await outbox.init(); + await waitFor(() => expect(PublishEventsFnMock).toBeCalled(), 3000); + const elapsed = Date.now() - now; + console.log(`Elapsed: ${elapsed}ms`); + expect(elapsed).toBeGreaterThan(1000); + }); + }); + }); +}); + +async function waitFor(statement: () => Promise | void, timeout = 1000): Promise { + const startTime = Date.now(); + + let latestStatementError; + while (true) { + try { + await statement(); + return; + } catch (e) { + latestStatementError = e; + } + + if (Date.now() - startTime > timeout) throw latestStatementError; + + await new Promise((resolve) => setTimeout(resolve, 100)); + } +} diff --git a/packages/ddd-toolkit/src/outbox/mongo-outbox.ts b/packages/ddd-toolkit/src/outbox/mongo-outbox.ts new file mode 100644 index 0000000..4fceea4 --- /dev/null +++ b/packages/ddd-toolkit/src/outbox/mongo-outbox.ts @@ -0,0 +1,114 @@ +import { ClientSession, Collection, MongoClient, ObjectId } from 'mongodb'; +import { IEvent } from '../event-bus/event-bus.interface'; +import { ILogger } from '../logger'; +import { inspect } from 'util'; +import { difference, intersection } from 'lodash'; +import { IOutbox } from './outbox.interface'; + +type OutboxEventModel = { + event: IEvent; + scheduledAt: Date; + status: 'scheduled' | 'published'; + publishedAt?: Date; + contextName?: string; +}; + +export class MongoOutbox implements IOutbox { + private outboxCollection: Collection; + + private stopping = false; + + constructor( + private readonly mongoClient: MongoClient, + collectionName: string = 'outbox', + private readonly publishEventsFn: (events: IEvent[]) => Promise | void, + private readonly logger: ILogger = console, + private readonly contextName?: string, + private readonly monitoringIntervalMs = 500, + ) { + this.outboxCollection = mongoClient.db().collection(collectionName); + } + + public async init() { + void this.checkScheduledEvents([]); + } + + public async dispose() { + this.stopping = true; + await sleep(this.monitoringIntervalMs); + } + + public async scheduleEvents(events: IEvent[], clientSession: ClientSession): Promise { + const { insertedIds } = await this.outboxCollection.insertMany( + events.map((event) => ({ + event, + scheduledAt: new Date(), + status: 'scheduled', + contextName: this.contextName, + })), + { session: clientSession }, + ); + return Object.values(insertedIds).map((id) => id.toString()); + } + + public async publishEvents(eventIds: string[]): Promise { + const session = this.mongoClient.startSession(); + try { + await session.withTransaction(async () => { + const outboxModels = await this.outboxCollection + .find({ _id: { $in: eventIds.map((id) => new ObjectId(id)) }, status: 'scheduled' }, { session }) + .toArray(); + const events = outboxModels.map((model) => model.event); + await this.publishEventsFn(events); + const publishedIds = outboxModels.map((model) => model._id); + await this.outboxCollection.updateMany( + { + _id: { $in: publishedIds }, + status: 'scheduled', + }, + { + $set: { + status: 'published', + publishedAt: new Date(), + }, + }, + { session }, + ); + }); + } catch (e) { + this.logger.warn(`Failed to publish events ${eventIds.join(', ')}. ${inspect(e)}`); + } finally { + await session.endSession(); + } + } + + //FROM https://github.com/gpad/ms-practical-ws/blob/main/src/infra/outbox_pattern.ts + private async checkScheduledEvents(warningIds: string[]) { + try { + if (this.stopping) return; + await sleep(this.monitoringIntervalMs); + const currentIds = await this.retrieveScheduledEvents(); + const toPublish = intersection(currentIds, warningIds); + if (toPublish.length) { + this.logger.warn(`Events ${toPublish.join(', ')} are still scheduled.`); + await this.publishEvents(toPublish); + } + const nextWarning = difference(currentIds, toPublish); + void this.checkScheduledEvents(nextWarning); + } catch (e) { + this.logger.error(`Failed to check scheduled events. ${inspect(e)}`); + } + } + + private async retrieveScheduledEvents() { + const scheduledEvents = await this.outboxCollection + .find({ + status: 'scheduled', + contextName: this.contextName, + }) + .toArray(); + return scheduledEvents.map((event) => event._id.toString()); + } +} + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/packages/ddd-toolkit/src/outbox/outbox.interface.ts b/packages/ddd-toolkit/src/outbox/outbox.interface.ts new file mode 100644 index 0000000..bb8978f --- /dev/null +++ b/packages/ddd-toolkit/src/outbox/outbox.interface.ts @@ -0,0 +1,11 @@ +import { IEvent } from '../event-bus/event-bus.interface'; + +export interface IOutbox { + init(): Promise; + + dispose(): Promise; + + scheduleEvents(events: IEvent[], transaction: unknown): Promise; + + publishEvents(eventIds: string[]): Promise; +}