From 481658427aaef5dcd326821a427cb7a3c0901829 Mon Sep 17 00:00:00 2001 From: Gabriele Toselli Date: Fri, 29 Mar 2024 23:37:41 +0100 Subject: [PATCH] feat(core): add command bus --- .../src/command-bus/command-bus.interface.ts | 18 +++ .../ddd-toolkit/src/command-bus/command.ts | 9 ++ .../src/command-bus/local-command-bus.spec.ts | 143 ++++++++++++++++++ .../src/command-bus/local-command-bus.ts | 52 +++++++ .../src/event-bus/local-event-bus.spec.ts | 23 +-- packages/ddd-toolkit/src/utils.ts | 19 +++ 6 files changed, 242 insertions(+), 22 deletions(-) create mode 100644 packages/ddd-toolkit/src/command-bus/command-bus.interface.ts create mode 100644 packages/ddd-toolkit/src/command-bus/command.ts create mode 100644 packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts create mode 100644 packages/ddd-toolkit/src/command-bus/local-command-bus.ts create mode 100644 packages/ddd-toolkit/src/utils.ts diff --git a/packages/ddd-toolkit/src/command-bus/command-bus.interface.ts b/packages/ddd-toolkit/src/command-bus/command-bus.interface.ts new file mode 100644 index 0000000..ae625a5 --- /dev/null +++ b/packages/ddd-toolkit/src/command-bus/command-bus.interface.ts @@ -0,0 +1,18 @@ +export interface ICommand { + name: string; + payload: T; +} + +export interface ICommandClass> { + new (payload: unknown): C; +} + +export interface ICommandHandler> { + handle: (command: C) => Promise; +} + +export interface ICommandBus { + register>(command: ICommandClass, handler: ICommandHandler): void; + + send>(command: C): Promise; +} diff --git a/packages/ddd-toolkit/src/command-bus/command.ts b/packages/ddd-toolkit/src/command-bus/command.ts new file mode 100644 index 0000000..067ae67 --- /dev/null +++ b/packages/ddd-toolkit/src/command-bus/command.ts @@ -0,0 +1,9 @@ +import { ICommand } from './command-bus.interface'; + +export abstract class Command implements ICommand { + readonly name: string; + + protected constructor(public readonly payload: TPayload) { + this.name = this.constructor.name; + } +} diff --git a/packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts b/packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts new file mode 100644 index 0000000..d87cf4e --- /dev/null +++ b/packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts @@ -0,0 +1,143 @@ +import { LocalCommandBus } from './local-command-bus'; +import { Command } from './command'; +import { loggerMock } from '../logger'; +import { waitFor } from '../utils'; + +class FooCommand extends Command<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +class BarCommand extends Command<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +describe('LocalCommandBus', () => { + describe('Given an command bus', () => { + let commandBus: LocalCommandBus; + + beforeEach(() => { + commandBus = new LocalCommandBus(loggerMock, 3, 100); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + describe('Given no registered handler to foo command', () => { + describe('When send a foo command', () => { + it('Should log warning message', async () => { + const command = new FooCommand({ foo: 'bar' }); + await commandBus.send(command); + + expect(loggerMock.warn).toBeCalledWith(`No handler found for ${FooCommand.name}`); + }); + }); + }); + + describe('Given one registered handler to foo command', () => { + const handler1Mock = jest.fn(); + + class FooCommandHandler { + async handle(command: FooCommand) { + await handler1Mock(command); + } + } + + beforeEach(() => { + commandBus.register(FooCommand, new FooCommandHandler()); + }); + + describe('When send a foo command', () => { + it('Should call handler with commandName and payload', async () => { + const command = new FooCommand({ foo: 'bar' }); + await commandBus.send(command); + + await waitFor(() => expect(handler1Mock).toBeCalledWith(command)); + }); + }); + + describe('Given a handler registered for bar command', () => { + const handler3Mock = jest.fn(); + + class BarCommandHandler { + async handle(command: BarCommand) { + await handler3Mock(command); + } + } + + beforeEach(() => { + commandBus.register(BarCommand, new BarCommandHandler()); + }); + + describe('When send FooCommand', () => { + it('Should call only FooCommand handler', async () => { + const command = new FooCommand({ foo: 'bar' }); + await commandBus.send(command); + + await waitFor(() => expect(handler1Mock).toBeCalledWith(command)); + expect(handler3Mock).not.toBeCalled(); + }); + }); + + describe('When send BarCommand', () => { + it('Should call only BarCommand handler', async () => { + const command = new BarCommand({ foo: 'bar' }); + await commandBus.send(command); + + expect(handler1Mock).not.toBeCalled(); + expect(handler3Mock).toBeCalledWith(command); + }); + }); + }); + }); + + describe('Given one registered handler which fails the first execution but not the second', () => { + const handlerMock = jest.fn(); + + class FooCommandHandlerOk { + async handle(command: FooCommand) { + await handlerMock(command); + } + } + + beforeEach(() => { + handlerMock.mockRejectedValueOnce(new Error('ko')).mockResolvedValueOnce('ok'); + commandBus.register(FooCommand, new FooCommandHandlerOk()); + }); + + describe('When send command', () => { + const command = new FooCommand({ foo: 'bar' }); + + beforeEach(async () => await commandBus.send(command)); + + it('handler should be called two times', async () => { + await waitFor(() => { + expect(handlerMock).toBeCalledTimes(2); + }); + }); + + it('should not log error for failing handler', async () => { + await waitFor(() => { + expect(handlerMock).toBeCalledTimes(2); + expect(loggerMock.error).not.toBeCalled(); + }); + }); + + it('should log one retry for failing handler', async () => { + await waitFor(() => { + expect(loggerMock.warn).toBeCalledTimes(1); + expect(loggerMock.warn).toBeCalledWith( + expect.stringContaining( + 'FooCommandHandlerOk failed to handle FooCommand command. Attempt 1/3', + ), + ); + }); + }); + }); + }); + }); +}); diff --git a/packages/ddd-toolkit/src/command-bus/local-command-bus.ts b/packages/ddd-toolkit/src/command-bus/local-command-bus.ts new file mode 100644 index 0000000..051b962 --- /dev/null +++ b/packages/ddd-toolkit/src/command-bus/local-command-bus.ts @@ -0,0 +1,52 @@ +import { ILogger } from '../logger'; +import { ICommand, ICommandBus, ICommandClass, ICommandHandler } from './command-bus.interface'; +import { ExponentialBackoff, IRetryMechanism } from '../event-bus/exponential-backoff'; +import { inspect } from 'util'; + +export class LocalCommandBus implements ICommandBus { + private readonly retryMechanism: IRetryMechanism; + + private handlers: { [key: string]: ICommandHandler> } = {}; + + constructor( + private logger: ILogger, + private readonly retryMaxAttempts = 5, + retryInitialDelay = 100, + ) { + this.retryMechanism = new ExponentialBackoff(retryInitialDelay); + } + + public register>(command: ICommandClass, handler: ICommandHandler): void { + if (this.handlers[command.name]) throw new Error(`Command ${command.name} is already registered`); + this.handlers[command.name] = handler; + } + + public async send>(command: C): Promise { + const handler = this.handlers[command.name] as ICommandHandler; + if (!handler) { + this.logger.warn(`No handler found for ${command.name}`); + return; + } + + void this.handleCommand(command, handler); + } + + private async handleCommand>(command: C, handler: ICommandHandler, attempt = 0) { + try { + await handler.handle(command); + } catch (error) { + if (attempt < this.retryMaxAttempts) { + const nextAttempt = attempt + 1; + const delay = this.retryMechanism.getDelay(nextAttempt); + this.logger.warn( + `${handler.constructor.name} failed to handle ${command.name} command. Attempt ${nextAttempt}/${this.retryMaxAttempts}. Delaying for ${delay}ms.`, + ); + setTimeout(() => this.handleCommand(command, handler, nextAttempt), delay); + return; + } + this.logger.error( + `${handler.constructor.name} failed to handle ${command.name} command due to ${inspect(error)}`, + ); + } + } +} diff --git a/packages/ddd-toolkit/src/event-bus/local-event-bus.spec.ts b/packages/ddd-toolkit/src/event-bus/local-event-bus.spec.ts index b28ba40..3dcc47c 100644 --- a/packages/ddd-toolkit/src/event-bus/local-event-bus.spec.ts +++ b/packages/ddd-toolkit/src/event-bus/local-event-bus.spec.ts @@ -1,6 +1,7 @@ import { LocalEventBus } from './local-event-bus'; import { Event } from './event'; import { loggerMock } from '../logger'; +import { sleep, waitFor } from '../utils'; class FooEvent extends Event<{ foo: string }> { constructor(public readonly payload: { foo: string }) { @@ -229,25 +230,3 @@ describe('LocalEventBus', () => { }); }); }); - -async function waitFor(statement: () => void, timeout = 1000): Promise { - const startTime = Date.now(); - - let latestStatementError; - while (true) { - try { - statement(); - return; - } catch (e) { - latestStatementError = e; - } - - if (Date.now() - startTime > timeout) throw latestStatementError; - - await new Promise((resolve) => setTimeout(resolve, 100)); - } -} - -async function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} diff --git a/packages/ddd-toolkit/src/utils.ts b/packages/ddd-toolkit/src/utils.ts new file mode 100644 index 0000000..9c7f7c2 --- /dev/null +++ b/packages/ddd-toolkit/src/utils.ts @@ -0,0 +1,19 @@ +export const waitFor = async (statement: () => void | Promise, timeout = 1000): Promise => { + const startTime = Date.now(); + + let latestStatementError; + while (true) { + try { + await statement(); + return; + } catch (e) { + latestStatementError = e; + } + + if (Date.now() - startTime > timeout) throw latestStatementError; + + await new Promise((resolve) => setTimeout(resolve, 100)); + } +}; + +export const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms));