Skip to content

Commit

Permalink
feat(core): add publish with concurrency control in outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Mar 30, 2024
1 parent 73d9c7a commit 2f0b54f
Showing 1 changed file with 42 additions and 2 deletions.
44 changes: 42 additions & 2 deletions packages/ddd-toolkit/src/outbox/mongo-outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import { IOutbox } from './outbox.interface';
type OutboxEventModel = {
event: IEvent<unknown>;
scheduledAt: Date;
status: 'scheduled' | 'published';
status: 'scheduled' | 'processing' | 'published';
publishedAt?: Date;
contextName?: string;
locked?: boolean;
};

export class MongoOutbox implements IOutbox {
Expand All @@ -30,6 +31,7 @@ export class MongoOutbox implements IOutbox {
}

public async init() {
this.logger.debug(`Starting outbox monitoring with interval ${this.monitoringIntervalMs}ms`);
void this.checkScheduledEvents([]);
}

Expand All @@ -48,6 +50,7 @@ export class MongoOutbox implements IOutbox {
})),
{ session: clientSession },
);
this.logger.debug(`Scheduled events ${Object.values(insertedIds).join(', ')}`);
return Object.values(insertedIds).map((id) => id.toString());
}

Expand All @@ -58,6 +61,7 @@ export class MongoOutbox implements IOutbox {
const outboxModels = await this.outboxCollection
.find({ _id: { $in: eventIds.map((id) => new ObjectId(id)) }, status: 'scheduled' }, { session })
.toArray();
if (!outboxModels.length) return;
const events = outboxModels.map((model) => model.event);
await this.publishEventsFn(events);
const publishedIds = outboxModels.map((model) => model._id);
Expand Down Expand Up @@ -91,12 +95,13 @@ export class MongoOutbox implements IOutbox {
const toPublish = intersection(currentIds, warningIds);
if (toPublish.length) {
this.logger.warn(`Events ${toPublish.join(', ')} are still scheduled.`);
await this.publishEvents(toPublish);
await Promise.all(toPublish.map((eventId) => this.publishEventWithConcurrencyControl(eventId)));
}
const nextWarning = difference(currentIds, toPublish);
void this.checkScheduledEvents(nextWarning);
} catch (e) {
this.logger.error(`Failed to check scheduled events. ${inspect(e)}`);
void this.checkScheduledEvents([]);
}
}

Expand All @@ -109,6 +114,41 @@ export class MongoOutbox implements IOutbox {
.toArray();
return scheduledEvents.map((event) => event._id.toString());
}

private async publishEventWithConcurrencyControl(eventId: string) {
const session = this.mongoClient.startSession();
try {
await session.withTransaction(async () => {
const { modifiedCount } = await this.outboxCollection.updateOne(
{ _id: new ObjectId(eventId), status: 'scheduled' },
{ $set: { status: 'processing' } },
{ session },
);
if (modifiedCount !== 1) {
this.logger.debug(`Event ${eventId} is already being processed.`);
return;
}
this.logger.debug(`Event ${eventId} is being processed.`);

const outBoxModel = await this.outboxCollection.findOne({ _id: new ObjectId(eventId) }, { session });
if (!outBoxModel) return;

await this.publishEventsFn([outBoxModel.event]);
await this.outboxCollection.updateOne(
{ _id: new ObjectId(eventId) },
{
$set: {
status: 'published',
publishedAt: new Date(),
},
},
{ session },
);
});
} finally {
await session.endSession();
}
}
}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

0 comments on commit 2f0b54f

Please sign in to comment.