Skip to content

Commit

Permalink
fix(core): wrong behaviour on handlers failing in local-event-bus wit…
Browse files Browse the repository at this point in the history
…h publishAndWaitForHandlers method
  • Loading branch information
gtoselli committed Apr 23, 2024
1 parent 91f4c46 commit a799318
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/ninety-kangaroos-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fizzbuds/ddd-toolkit": patch
---

fix(core): wrong behaviour on handlers failing in local-event-bus with publishAndWaitForHandlers method
47 changes: 45 additions & 2 deletions packages/ddd-toolkit/src/event-bus/local-event-bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ describe('LocalEventBus', () => {
class FooEventHandlerKo {
async handle(event: FooEvent) {
await handlerKoMock(event);
throw new Error('ko');
}
}

Expand All @@ -180,6 +179,11 @@ describe('LocalEventBus', () => {
await eventBus.publish(event);
expect(handlerOkMock).toBeCalledWith(event);
expect(handlerKoMock).toBeCalledWith(event);

await waitFor(() => {
expect(handlerOkMock).toBeCalledTimes(1);
expect(handlerKoMock).toBeCalledTimes(1);
});
});

it('should log error for failing handler', async () => {
Expand All @@ -191,6 +195,25 @@ describe('LocalEventBus', () => {
);
});
});

describe('When publishAndWaitForHandlers event', () => {
const event = new FooEvent({ foo: 'bar' });

it('publish throw an exception', async () => {
await expect(() => eventBus.publishAndWaitForHandlers(event)).rejects.toThrow();
});

it('both handler should be called', async () => {
try {
await eventBus.publishAndWaitForHandlers(event);
} catch {}
expect(handlerOkMock).toBeCalledWith(event);
expect(handlerKoMock).toBeCalledWith(event);

expect(handlerOkMock).toBeCalledTimes(1);
expect(handlerKoMock).toBeCalledTimes(3);
});
});
});

describe('Given one subscribed handler which fails the first execution but not the second', () => {
Expand Down Expand Up @@ -227,13 +250,33 @@ describe('LocalEventBus', () => {

it('should log one retry for failing handler', async () => {
await waitFor(() => {
expect(handlerMock).toBeCalledTimes(2); // needed to wait for the second call to be done
expect(loggerMock.warn).toBeCalledTimes(1);
expect(loggerMock.warn).toBeCalledWith(
expect.stringContaining('FooEventHandlerOk failed to handle FooEvent event. Attempt 1/3'),
expect.stringContaining('FooEventHandlerOk failed to handle FooEvent event. Attempt 2/3'),
);
});
});
});

describe('When publishAndWaitForHandlers event', () => {
const event = new FooEvent({ foo: 'bar' });

beforeEach(async () => {
await eventBus.publishAndWaitForHandlers(event);
});

it('handler should be called two times', async () => {
expect(handlerMock).toBeCalledTimes(2);
});

it('should log one retry for failing handler', async () => {
expect(loggerMock.warn).toBeCalledTimes(1);
expect(loggerMock.warn).toBeCalledWith(
expect.stringContaining('FooEventHandlerOk failed to handle FooEvent event. Attempt 2/3'),
);
});
});
});
});
});
28 changes: 26 additions & 2 deletions packages/ddd-toolkit/src/event-bus/local-event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { inspect } from 'util';
import { ILogger } from '../logger';
import { IEvent, IEventBus, IEventClass, IEventHandler } from './event-bus.interface';
import { ExponentialBackoff, IRetryMechanism } from './exponential-backoff';
import { sleep } from '../utils';

export class LocalEventBus implements IEventBus {
private readonly retryMechanism: IRetryMechanism;
Expand Down Expand Up @@ -38,10 +39,10 @@ export class LocalEventBus implements IEventBus {
return;
}

await this.handleEvent(event, handlers);
await this.handleEventSync(event, handlers);
}

private async handleEvent<T extends IEvent<unknown>>(event: T, handlers: IEventHandler<T>[], attempt = 0) {
private async handleEvent<T extends IEvent<unknown>>(event: T, handlers: IEventHandler<T>[], attempt = 1) {
const results = await Promise.allSettled(handlers.map((handler) => handler.handle(event)));
results.forEach((result, index) => {
if (result.status === 'fulfilled') return;
Expand All @@ -61,4 +62,27 @@ export class LocalEventBus implements IEventBus {
this.logger.error(`${handlerName} failed to handle ${event.name} event due to ${inspect(result.reason)}`);
});
}

private async handleEventSync<T extends IEvent<unknown>>(event: T, handlers: IEventHandler<T>[], attempt = 1) {
const results = await Promise.allSettled(handlers.map((handler) => handler.handle(event)));
for (const [index, result] of results.entries()) {
if (result.status === 'fulfilled') continue;

const handler = handlers[index];
const handlerName = handler.constructor.name;

if (attempt < this.retryMaxAttempts) {
const nextAttempt = attempt + 1;
const delay = this.retryMechanism.getDelay(nextAttempt);
this.logger.warn(
`${handlerName} failed to handle ${event.name} event. Attempt ${nextAttempt}/${this.retryMaxAttempts}. Delaying for ${delay}ms.`,
);
await sleep(delay);
await this.handleEventSync(event, [handler], nextAttempt);
continue;
}

throw new Error(`${handlerName} failed to handle ${event.name} event due to ${inspect(result.reason)}`);
}
}
}

0 comments on commit a799318

Please sign in to comment.