Skip to content

Commit

Permalink
Merge pull request #30 from fizzbuds/29-command-bus
Browse files Browse the repository at this point in the history
Command bus
  • Loading branch information
lucagiove authored Mar 30, 2024
2 parents 9d6ab6c + 4816584 commit 33db0f5
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 22 deletions.
18 changes: 18 additions & 0 deletions packages/ddd-toolkit/src/command-bus/command-bus.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export interface ICommand<T> {
name: string;
payload: T;
}

export interface ICommandClass<C extends ICommand<unknown>> {
new (payload: unknown): C;
}

export interface ICommandHandler<C extends ICommand<unknown>> {
handle: (command: C) => Promise<void>;
}

export interface ICommandBus {
register<C extends ICommand<unknown>>(command: ICommandClass<C>, handler: ICommandHandler<C>): void;

send<C extends ICommand<unknown>>(command: C): Promise<void>;
}
9 changes: 9 additions & 0 deletions packages/ddd-toolkit/src/command-bus/command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { ICommand } from './command-bus.interface';

export abstract class Command<TPayload> implements ICommand<TPayload> {
readonly name: string;

protected constructor(public readonly payload: TPayload) {
this.name = this.constructor.name;
}
}
143 changes: 143 additions & 0 deletions packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { LocalCommandBus } from './local-command-bus';
import { Command } from './command';
import { loggerMock } from '../logger';
import { waitFor } from '../utils';

class FooCommand extends Command<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

class BarCommand extends Command<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

describe('LocalCommandBus', () => {
describe('Given an command bus', () => {
let commandBus: LocalCommandBus;

beforeEach(() => {
commandBus = new LocalCommandBus(loggerMock, 3, 100);
});

afterEach(() => {
jest.resetAllMocks();
});

describe('Given no registered handler to foo command', () => {
describe('When send a foo command', () => {
it('Should log warning message', async () => {
const command = new FooCommand({ foo: 'bar' });
await commandBus.send(command);

expect(loggerMock.warn).toBeCalledWith(`No handler found for ${FooCommand.name}`);
});
});
});

describe('Given one registered handler to foo command', () => {
const handler1Mock = jest.fn();

class FooCommandHandler {
async handle(command: FooCommand) {
await handler1Mock(command);
}
}

beforeEach(() => {
commandBus.register(FooCommand, new FooCommandHandler());
});

describe('When send a foo command', () => {
it('Should call handler with commandName and payload', async () => {
const command = new FooCommand({ foo: 'bar' });
await commandBus.send(command);

await waitFor(() => expect(handler1Mock).toBeCalledWith(command));
});
});

describe('Given a handler registered for bar command', () => {
const handler3Mock = jest.fn();

class BarCommandHandler {
async handle(command: BarCommand) {
await handler3Mock(command);
}
}

beforeEach(() => {
commandBus.register(BarCommand, new BarCommandHandler());
});

describe('When send FooCommand', () => {
it('Should call only FooCommand handler', async () => {
const command = new FooCommand({ foo: 'bar' });
await commandBus.send(command);

await waitFor(() => expect(handler1Mock).toBeCalledWith(command));
expect(handler3Mock).not.toBeCalled();
});
});

describe('When send BarCommand', () => {
it('Should call only BarCommand handler', async () => {
const command = new BarCommand({ foo: 'bar' });
await commandBus.send(command);

expect(handler1Mock).not.toBeCalled();
expect(handler3Mock).toBeCalledWith(command);
});
});
});
});

describe('Given one registered handler which fails the first execution but not the second', () => {
const handlerMock = jest.fn();

class FooCommandHandlerOk {
async handle(command: FooCommand) {
await handlerMock(command);
}
}

beforeEach(() => {
handlerMock.mockRejectedValueOnce(new Error('ko')).mockResolvedValueOnce('ok');
commandBus.register(FooCommand, new FooCommandHandlerOk());
});

describe('When send command', () => {
const command = new FooCommand({ foo: 'bar' });

beforeEach(async () => await commandBus.send(command));

it('handler should be called two times', async () => {
await waitFor(() => {
expect(handlerMock).toBeCalledTimes(2);
});
});

it('should not log error for failing handler', async () => {
await waitFor(() => {
expect(handlerMock).toBeCalledTimes(2);
expect(loggerMock.error).not.toBeCalled();
});
});

it('should log one retry for failing handler', async () => {
await waitFor(() => {
expect(loggerMock.warn).toBeCalledTimes(1);
expect(loggerMock.warn).toBeCalledWith(
expect.stringContaining(
'FooCommandHandlerOk failed to handle FooCommand command. Attempt 1/3',
),
);
});
});
});
});
});
});
52 changes: 52 additions & 0 deletions packages/ddd-toolkit/src/command-bus/local-command-bus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { ILogger } from '../logger';
import { ICommand, ICommandBus, ICommandClass, ICommandHandler } from './command-bus.interface';
import { ExponentialBackoff, IRetryMechanism } from '../event-bus/exponential-backoff';
import { inspect } from 'util';

export class LocalCommandBus implements ICommandBus {
private readonly retryMechanism: IRetryMechanism;

private handlers: { [key: string]: ICommandHandler<ICommand<unknown>> } = {};

constructor(
private logger: ILogger,
private readonly retryMaxAttempts = 5,
retryInitialDelay = 100,
) {
this.retryMechanism = new ExponentialBackoff(retryInitialDelay);
}

public register<C extends ICommand<unknown>>(command: ICommandClass<C>, handler: ICommandHandler<C>): void {
if (this.handlers[command.name]) throw new Error(`Command ${command.name} is already registered`);
this.handlers[command.name] = handler;
}

public async send<C extends ICommand<unknown>>(command: C): Promise<void> {
const handler = this.handlers[command.name] as ICommandHandler<C>;
if (!handler) {
this.logger.warn(`No handler found for ${command.name}`);
return;
}

void this.handleCommand(command, handler);
}

private async handleCommand<C extends ICommand<unknown>>(command: C, handler: ICommandHandler<C>, attempt = 0) {
try {
await handler.handle(command);
} catch (error) {
if (attempt < this.retryMaxAttempts) {
const nextAttempt = attempt + 1;
const delay = this.retryMechanism.getDelay(nextAttempt);
this.logger.warn(
`${handler.constructor.name} failed to handle ${command.name} command. Attempt ${nextAttempt}/${this.retryMaxAttempts}. Delaying for ${delay}ms.`,
);
setTimeout(() => this.handleCommand(command, handler, nextAttempt), delay);
return;
}
this.logger.error(
`${handler.constructor.name} failed to handle ${command.name} command due to ${inspect(error)}`,
);
}
}
}
23 changes: 1 addition & 22 deletions packages/ddd-toolkit/src/event-bus/local-event-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LocalEventBus } from './local-event-bus';
import { Event } from './event';
import { loggerMock } from '../logger';
import { sleep, waitFor } from '../utils';

class FooEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
Expand Down Expand Up @@ -229,25 +230,3 @@ describe('LocalEventBus', () => {
});
});
});

async function waitFor(statement: () => void, timeout = 1000): Promise<void> {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
}

async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
19 changes: 19 additions & 0 deletions packages/ddd-toolkit/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export const waitFor = async (statement: () => void | Promise<void>, timeout = 1000): Promise<void> => {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
await statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
};

export const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

0 comments on commit 33db0f5

Please sign in to comment.