feat(core): add outbox to mongo aggregate repo
gtoselli committed Mar 30, 2024
1 parent bd84bd1 commit 74885dc
Showing 4 changed files with 126 additions and 60 deletions.
19 changes: 1 addition & 18 deletions packages/ddd-toolkit/src/outbox/mongo-outbox.spec.ts
@@ -2,6 +2,7 @@ import { MongoMemoryReplSet } from 'mongodb-memory-server';
import { MongoClient, ObjectId } from 'mongodb';
import { MongoOutbox } from './mongo-outbox';
import { Event } from '../event-bus/event';
import { waitFor } from '../utils';

class FooEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
@@ -145,21 +146,3 @@ describe('Mongo outbox', () => {
});
});
});

async function waitFor(statement: () => Promise<void> | void, timeout = 1000): Promise<void> {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
await statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
}
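
The deleted local helper is replaced by the `waitFor` import from `../utils` added at the top of this spec; presumably the shared utility mirrors the implementation removed above, roughly:

// utils.ts (assumed location and shape of the shared helper; mirrors the code removed above)
export async function waitFor(statement: () => Promise<void> | void, timeout = 1000): Promise<void> {
    const startTime = Date.now();

    let latestStatementError;
    while (true) {
        try {
            // re-run the assertion until it stops throwing
            await statement();
            return;
        } catch (e) {
            latestStatementError = e;
        }

        // give up once the timeout elapses, rethrowing the last assertion error
        if (Date.now() - startTime > timeout) throw latestStatementError;

        await new Promise((resolve) => setTimeout(resolve, 100));
    }
}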
1 change: 1 addition & 0 deletions packages/ddd-toolkit/src/outbox/mongo-outbox.ts
@@ -78,6 +78,7 @@ export class MongoOutbox implements IOutbox {
},
{ session },
);
this.logger.debug(`Published events ${eventIds.join(', ')}`);
});
} catch (e) {
this.logger.warn(`Failed to publish events ${eventIds.join(', ')}. ${inspect(e)}`);
130 changes: 88 additions & 42 deletions packages/ddd-toolkit/src/repo/mongo-aggregate-repo.ts
@@ -1,17 +1,23 @@
import { IRepoHooks } from './repo-hooks';
import { Collection, Document, MongoClient } from 'mongodb';
import { ClientSession, Collection, Document, MongoClient } from 'mongodb';
import { ISerializer } from './serializer.interface';
import { merge } from 'lodash';
import { DuplicatedIdError, OptimisticLockError, RepoHookError } from '../errors';
import { ILogger } from '../logger';
import { IInit } from '../init.interface';
import { IEvent } from '../event-bus/event-bus.interface';
import { IOutbox } from '../outbox/outbox.interface';

export interface IAggregateRepo<A> {
// TODO add id as a generic type
getById: (id: string) => Promise<WithVersion<A> | null>;
save: (aggregate: A) => Promise<void>;
}

export interface IAggregateRepoWithOutbox<A> extends IAggregateRepo<A> {
saveAndPublish: (aggregate: A, eventsToBePublished?: IEvent<unknown>[]) => Promise<void>;
}

export type DocumentWithId = { id: string } & Document;

export type WithVersion<T> = T & { __version: number };
@@ -21,15 +27,19 @@ type WithOptionalVersion<T> = T & { __version?: number };
// TODO probably we should create a dedicated interface with something like DocumentWithIdAndTimestamps
const MONGODB_UNIQUE_INDEX_CONSTRAINT_ERROR = 11000;

export class MongoAggregateRepo<A, AM extends DocumentWithId> implements IAggregateRepo<A>, IInit {
export class MongoAggregateRepo<A, AM extends DocumentWithId>
implements IAggregateRepo<A>, IAggregateRepoWithOutbox<A>, IInit
{
protected readonly collection: Collection<AM>;

constructor(
protected readonly serializer: ISerializer<A, AM>,
protected readonly mongoClient: MongoClient,
protected readonly collectionName: string,
collection?: Collection<AM>,
protected readonly repoHooks?: IRepoHooks<AM>,
protected readonly logger: ILogger = console,
protected readonly outbox?: IOutbox,
) {
if (!collection) {
this.collection = this.mongoClient.db().collection(this.collectionName);
@@ -43,66 +53,102 @@ export class MongoAggregateRepo<A, AM extends DocumentWithId> implements IAggregateRepo<A>, IInit {
async save(aggregate: WithOptionalVersion<A>) {
const aggregateModel = this.serializer.aggregateToModel(aggregate);
const aggregateVersion = aggregate.__version || 0;
const session = this.mongoClient.startSession();
try {
await session.withTransaction(async () => {
await this.upsertWriteModel(aggregateModel, aggregateVersion, session);
await this.handleRepoHooks(aggregateModel, session);
});
} catch (e) {
this.catchSaveTransaction(e, aggregateVersion, aggregateModel);
} finally {
await session.endSession();
}
}

public async saveAndPublish(aggregate: WithOptionalVersion<A>, eventsToBePublished: IEvent<unknown>[] = []) {
const aggregateModel = this.serializer.aggregateToModel(aggregate);
const aggregateVersion = aggregate.__version || 0;

const session = this.mongoClient.startSession();

const scheduledEventIds: string[] = [];
try {
await session.withTransaction(async () => {
await this.collection.updateOne(
{ id: aggregateModel.id, __version: aggregateVersion } as any,
{
$set: {
...aggregateModel,
__version: aggregateVersion + 1,
updatedAt: new Date(),
},
$setOnInsert: { createdAt: new Date() } as any,
},
{ upsert: true, session, ignoreUndefined: true },
);
this.logger.debug(
`Aggregate with id ${
aggregateModel.id
} and version ${aggregateVersion} saved successfully. ${JSON.stringify(aggregateModel)}`,
);

try {
if (this.repoHooks) {
await this.repoHooks.onSave(aggregateModel, session);
this.logger.debug(`RepoHook onSave method executed successfully.`);
}
} catch (e) {
throw new RepoHookError(`RepoHook onSave method failed with error: ${e.message}`);
}
await this.upsertWriteModel(aggregateModel, aggregateVersion, session);
await this.handleRepoHooks(aggregateModel, session);
scheduledEventIds.push(...(await this.handleOutbox(eventsToBePublished, session)));
});
} catch (e) {
// FIXME verify the field name because constraints could be added on other fields
if (e.code === MONGODB_UNIQUE_INDEX_CONSTRAINT_ERROR) {
if (this.isTheFirstVersion(aggregateVersion)) {
throw new DuplicatedIdError(
`Cannot save aggregate with id: ${aggregateModel.id} due to duplicated id.`,
);
} else {
throw new OptimisticLockError(
`Cannot save aggregate with id: ${aggregateModel.id} due to optimistic locking.`,
);
}
}
throw e;
this.catchSaveTransaction(e, aggregateVersion, aggregateModel);

} finally {
await session.endSession();
}

if (this.outbox && scheduledEventIds.length) void this.outbox.publishEvents(scheduledEventIds);
}

// TODO evaluate to implement getOrThrow
async getById(id: string): Promise<WithVersion<A> | null> {
public async getById(id: string): Promise<WithVersion<A> | null> {
const aggregateModel = await this.collection.findOne({ id: id } as any);
this.logger.debug(`Retrieving aggregate ${id}. Found: ${JSON.stringify(aggregateModel)}`);
if (!aggregateModel) return null;
const aggregate = this.serializer.modelToAggregate(aggregateModel as AM);
return merge<A, { __version: number }>(aggregate, { __version: aggregateModel.__version });
}

private async handleOutbox(eventsToBePublished: IEvent<unknown>[], session: ClientSession) {
if (!this.outbox) throw new Error('Outbox not configured');
return await this.outbox.scheduleEvents(eventsToBePublished, session);
}

private catchSaveTransaction(e: any, aggregateVersion: number, aggregateModel: AM) {
// FIXME verify the field name because constraints could be added on other fields
if (e.code === MONGODB_UNIQUE_INDEX_CONSTRAINT_ERROR) {
if (this.isTheFirstVersion(aggregateVersion)) {
throw new DuplicatedIdError(
`Cannot save aggregate with id: ${aggregateModel.id} due to duplicated id.`,
);
} else {
throw new OptimisticLockError(
`Cannot save aggregate with id: ${aggregateModel.id} due to optimistic locking.`,
);
}
}
throw e;

}

private async handleRepoHooks(aggregateModel: AM, session: ClientSession) {
try {
if (this.repoHooks) {
await this.repoHooks.onSave(aggregateModel, session);
this.logger.debug(`RepoHook onSave method executed successfully.`);

}
} catch (e) {
throw new RepoHookError(`RepoHook onSave method failed with error: ${e.message}`);

}
}

private async upsertWriteModel(aggregateModel: AM, aggregateVersion: number, session: ClientSession) {
await this.collection.updateOne(
{ id: aggregateModel.id, __version: aggregateVersion } as any,
{
$set: {
...aggregateModel,
__version: aggregateVersion + 1,
updatedAt: new Date(),
},
$setOnInsert: { createdAt: new Date() } as any,
},
{ upsert: true, session, ignoreUndefined: true },
);
this.logger.debug(
`Aggregate with id ${
aggregateModel.id
} and version ${aggregateVersion} saved successfully. ${JSON.stringify(aggregateModel)}`,
);
}

private isTheFirstVersion(aggregateVersion: number) {
return aggregateVersion === 0;
}
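
The IOutbox contract itself is not part of this diff; inferred from the calls above (scheduleEvents inside the session, publishEvents after commit) and from the tests below (init/dispose), it presumably looks roughly like this — an assumption, not the committed interface:

// outbox.interface.ts — assumed shape, reconstructed from usage in this commit
import { ClientSession } from 'mongodb';
import { IEvent } from '../event-bus/event-bus.interface';

export interface IOutbox {
    init(): Promise<void>;
    // persist the events within the aggregate's transaction and return their outbox ids
    scheduleEvents(events: IEvent<unknown>[], session: ClientSession): Promise<string[]>;
    // best-effort publish of previously scheduled events once the transaction has committed
    publishEvents(eventIds: string[]): Promise<void>;
    dispose(): Promise<void>;
}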
@@ -2,13 +2,20 @@ import { MongoAggregateRepo } from '../mongo-aggregate-repo';
import { MongoMemoryReplSet } from 'mongodb-memory-server';
import { MongoClient } from 'mongodb';
import { TestAggregate, TestModel, TestSerializer } from './example.serializer';
import { MongoOutbox } from '../../outbox/mongo-outbox';
import { Event } from '../../event-bus/event';
import { IOutbox } from '../../outbox/outbox.interface';
import { waitFor } from '../../utils';

describe('MongoAggregateRepo MongoDB Integration', () => {
let mongodb: MongoMemoryReplSet;
let mongoClient: MongoClient;
let aggregateRepo: MongoAggregateRepo<TestAggregate, TestModel>;
let outbox: IOutbox;
const collectionName = 'collectionName';

const EventBusPublishMock = jest.fn();

beforeAll(async () => {
mongodb = await MongoMemoryReplSet.create({
replSet: {
@@ -20,12 +27,21 @@ describe('MongoAggregateRepo MongoDB Integration', () => {
mongoClient = new MongoClient(mongodb.getUri());
await mongoClient.connect();

outbox = new MongoOutbox(mongoClient, undefined, async (events) => {
await EventBusPublishMock(events);
});

aggregateRepo = new MongoAggregateRepo<TestAggregate, TestModel>(
new TestSerializer(),
mongoClient,
collectionName,
undefined,
undefined,
undefined,
outbox,
);
await aggregateRepo.init();
await outbox.init();
});

afterEach(async () => {
@@ -34,6 +50,7 @@ describe('MongoAggregateRepo MongoDB Integration', () => {
});

afterAll(async () => {
await outbox.dispose();
await mongoClient.close();
await mongodb.stop();
});
@@ -119,4 +136,23 @@ describe('MongoAggregateRepo MongoDB Integration', () => {
});
});
});

describe('Outbox', () => {
class FooEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

describe('When saveAndPublish is called', () => {
it('should publish the events', async () => {
const aggregate = { id: 'foo-id', data: 'value' };
const events = [new FooEvent({ foo: 'bar' })];
await aggregateRepo.saveAndPublish(aggregate, events);
await waitFor(() => {
expect(EventBusPublishMock).toBeCalledWith(events);
});
});
});
});
});
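
Taken together, the changes add a transactional-outbox path to the aggregate repo: events are scheduled in the same MongoDB transaction as the aggregate upsert and published after commit. A minimal wiring sketch mirroring the test setup above — import paths, the Order domain types, and the publish callback body are illustrative placeholders, not part of this commit:

// Illustrative wiring only: import paths and the Order domain types are placeholders.
import { MongoClient } from 'mongodb';
import { MongoAggregateRepo } from '../repo/mongo-aggregate-repo';
import { MongoOutbox } from '../outbox/mongo-outbox';
import { ISerializer } from '../repo/serializer.interface';

class Order {
    constructor(public readonly id: string, public total: number) {}
}

type OrderModel = { id: string; total: number };

class OrderSerializer implements ISerializer<Order, OrderModel> {
    aggregateToModel(order: Order): OrderModel {
        return { id: order.id, total: order.total };
    }
    modelToAggregate(model: OrderModel): Order {
        return new Order(model.id, model.total);
    }
}

async function bootstrap(mongoClient: MongoClient) {
    // The callback receives scheduled events when the outbox flushes them; forward them to the event bus here.
    // The second constructor argument is left undefined as in the test above (its meaning is not shown in this diff).
    const outbox = new MongoOutbox(mongoClient, undefined, async (events) => {
        console.log('publishing', events);
    });

    const orderRepo = new MongoAggregateRepo<Order, OrderModel>(
        new OrderSerializer(),
        mongoClient,
        'orders',
        undefined, // collection (derived from the collection name when omitted)
        undefined, // repoHooks
        undefined, // logger (defaults to console)
        outbox,
    );

    await orderRepo.init();
    await outbox.init();

    // Save the aggregate and schedule its domain events atomically; the outbox publishes them after commit.
    // await orderRepo.saveAndPublish(order, [new OrderPlacedEvent({ orderId: order.id })]); // OrderPlacedEvent is hypothetical

    return { orderRepo, outbox };
}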
