Skip to content

Commit

Permalink
feat(core): add mongo outbox implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Mar 30, 2024
1 parent 8b4e5e0 commit 73d9c7a
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 0 deletions.
165 changes: 165 additions & 0 deletions packages/ddd-toolkit/src/outbox/mongo-outbox.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { MongoMemoryReplSet } from 'mongodb-memory-server';
import { MongoClient, ObjectId } from 'mongodb';
import { MongoOutbox } from './mongo-outbox';
import { Event } from '../event-bus/event';

class FooEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

class BarEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

describe('Mongo outbox', () => {
let mongodb: MongoMemoryReplSet;
let mongoClient: MongoClient;

let outbox: MongoOutbox;

const PublishEventsFnMock = jest.fn();

beforeAll(async () => {
mongodb = await MongoMemoryReplSet.create({
replSet: {
count: 1,
dbName: 'test',
storageEngine: 'wiredTiger',
},
});

mongoClient = new MongoClient(mongodb.getUri());
await mongoClient.connect();
outbox = new MongoOutbox(mongoClient, 'outbox', async (events) => PublishEventsFnMock(events));
});

afterAll(async () => {
await outbox.dispose();
await mongoClient.close();
await mongodb.stop();
});

afterEach(async () => {
jest.resetAllMocks();
await outbox['outboxCollection'].deleteMany({});
});

describe('When scheduleEvents with two events', () => {
it('should return two objectIds', async () => {
const events = [new FooEvent({ foo: 'bar' }), new BarEvent({ foo: 'bar' })];
const session = mongoClient.startSession();
const ids = await outbox.scheduleEvents(events, session);
expect(ids.length).toBe(2);
expect(ids.every(ObjectId.isValid)).toBe(true);
});

beforeEach(async () => {
const events = [new FooEvent({ foo: 'bar' }), new BarEvent({ foo: 'bar' })];
const session = mongoClient.startSession();
await outbox.scheduleEvents(events, session);
});

it('should insert two events in the outbox', async () => {
const events = await outbox['outboxCollection'].find().toArray();
expect(events.length).toBe(2);
expect(events[0]).toMatchObject({
_id: expect.any(ObjectId),
contextName: null,
event: expect.any(Object),
scheduledAt: expect.any(Date),
status: 'scheduled',
});
});
});

describe('Given two scheduled events', () => {
const events = [new FooEvent({ foo: 'bar' }), new BarEvent({ foo: 'bar' })];
let eventIds: string[];
beforeEach(async () => {
const session = mongoClient.startSession();
eventIds = await outbox.scheduleEvents(events, session);
});

describe('Given a resolving publishEventsFn', () => {
beforeEach(() => {
PublishEventsFnMock.mockResolvedValue('ok');
});

describe('When publish', () => {
it('should call publishEventsFn once', async () => {
await outbox.publishEvents(eventIds);
expect(PublishEventsFnMock).toBeCalled();
});

it('should pass both events to publishEventsFn', async () => {
await outbox.publishEvents(eventIds);
expect(PublishEventsFnMock).toBeCalledWith(events);
});

it('should update the status of the events to published', async () => {
await outbox.publishEvents(eventIds);
const events = await outbox['outboxCollection'].find().toArray();
expect(events.every((event) => event.status === 'published')).toBe(true);
});

it('should set the publishedAt date', async () => {
await outbox.publishEvents(eventIds);
const events = await outbox['outboxCollection'].find().toArray();
expect(events[0].publishedAt).toEqual(expect.any(Date));
});
});
});

describe('Given a rejecting publishEventsFn', () => {
beforeEach(() => {
PublishEventsFnMock.mockRejectedValue('error');
});

describe('When publish', () => {
it('should call publishEventsFn once', async () => {
await outbox.publishEvents(eventIds);
expect(PublishEventsFnMock).toBeCalled();
});

it('should not update the status of the events to published', async () => {
await outbox.publishEvents(eventIds);
const events = await outbox['outboxCollection'].find().toArray();
expect(events.every((event) => event.status === 'scheduled')).toBe(true);
});
});
});

describe('When startMonitoring', () => {
it('after about 1 second it should publish them', async () => {
const now = Date.now();
await outbox.init();
await waitFor(() => expect(PublishEventsFnMock).toBeCalled(), 3000);
const elapsed = Date.now() - now;
console.log(`Elapsed: ${elapsed}ms`);
expect(elapsed).toBeGreaterThan(1000);
});
});
});
});

async function waitFor(statement: () => Promise<void> | void, timeout = 1000): Promise<void> {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
await statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
}
114 changes: 114 additions & 0 deletions packages/ddd-toolkit/src/outbox/mongo-outbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { ClientSession, Collection, MongoClient, ObjectId } from 'mongodb';
import { IEvent } from '../event-bus/event-bus.interface';
import { ILogger } from '../logger';
import { inspect } from 'util';
import { difference, intersection } from 'lodash';
import { IOutbox } from './outbox.interface';

type OutboxEventModel = {
event: IEvent<unknown>;
scheduledAt: Date;
status: 'scheduled' | 'published';
publishedAt?: Date;
contextName?: string;
};

export class MongoOutbox implements IOutbox {
private outboxCollection: Collection<OutboxEventModel>;

private stopping = false;

constructor(
private readonly mongoClient: MongoClient,
collectionName: string = 'outbox',
private readonly publishEventsFn: (events: IEvent<unknown>[]) => Promise<void> | void,
private readonly logger: ILogger = console,
private readonly contextName?: string,
private readonly monitoringIntervalMs = 500,
) {
this.outboxCollection = mongoClient.db().collection(collectionName);
}

public async init() {
void this.checkScheduledEvents([]);
}

public async dispose() {
this.stopping = true;
await sleep(this.monitoringIntervalMs);
}

public async scheduleEvents(events: IEvent<unknown>[], clientSession: ClientSession): Promise<string[]> {
const { insertedIds } = await this.outboxCollection.insertMany(
events.map((event) => ({
event,
scheduledAt: new Date(),
status: 'scheduled',
contextName: this.contextName,
})),
{ session: clientSession },
);
return Object.values(insertedIds).map((id) => id.toString());
}

public async publishEvents(eventIds: string[]): Promise<void> {
const session = this.mongoClient.startSession();
try {
await session.withTransaction(async () => {
const outboxModels = await this.outboxCollection
.find({ _id: { $in: eventIds.map((id) => new ObjectId(id)) }, status: 'scheduled' }, { session })
.toArray();
const events = outboxModels.map((model) => model.event);
await this.publishEventsFn(events);
const publishedIds = outboxModels.map((model) => model._id);
await this.outboxCollection.updateMany(
{
_id: { $in: publishedIds },
status: 'scheduled',
},
{
$set: {
status: 'published',
publishedAt: new Date(),
},
},
{ session },
);
});
} catch (e) {
this.logger.warn(`Failed to publish events ${eventIds.join(', ')}. ${inspect(e)}`);
} finally {
await session.endSession();
}
}

//FROM https://github.com/gpad/ms-practical-ws/blob/main/src/infra/outbox_pattern.ts
private async checkScheduledEvents(warningIds: string[]) {
try {
if (this.stopping) return;
await sleep(this.monitoringIntervalMs);
const currentIds = await this.retrieveScheduledEvents();
const toPublish = intersection(currentIds, warningIds);
if (toPublish.length) {
this.logger.warn(`Events ${toPublish.join(', ')} are still scheduled.`);
await this.publishEvents(toPublish);
}
const nextWarning = difference(currentIds, toPublish);
void this.checkScheduledEvents(nextWarning);
} catch (e) {
this.logger.error(`Failed to check scheduled events. ${inspect(e)}`);
}
}

private async retrieveScheduledEvents() {
const scheduledEvents = await this.outboxCollection
.find({
status: 'scheduled',
contextName: this.contextName,
})
.toArray();
return scheduledEvents.map((event) => event._id.toString());
}
}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
11 changes: 11 additions & 0 deletions packages/ddd-toolkit/src/outbox/outbox.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { IEvent } from '../event-bus/event-bus.interface';

export interface IOutbox {
init(): Promise<void>;

dispose(): Promise<void>;

scheduleEvents(events: IEvent<unknown>[], transaction: unknown): Promise<string[]>;

publishEvents(eventIds: string[]): Promise<void>;
}

0 comments on commit 73d9c7a

Please sign in to comment.