From 80e2e0977768abb16c0da1cedbe9b039f47de996 Mon Sep 17 00:00:00 2001 From: adrigzr Date: Sun, 19 Mar 2023 19:52:48 +0100 Subject: [PATCH] feat(adapter-tcp): add commands --- .../adapter-tcp/src/connection.manager.ts | 110 +++++++++++------- packages/adapter-tcp/src/device.connection.ts | 29 +++-- .../device-located.event-handler.ts | 15 +++ packages/toolkit/package.json | 1 + .../src/event-handler.registry.test.ts} | 8 +- .../src/event-handler.registry.ts} | 5 +- packages/toolkit/src/index.ts | 1 + packages/transport-tcp/src/packet.server.ts | 20 +++- 8 files changed, 127 insertions(+), 62 deletions(-) create mode 100644 packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts rename packages/{adapter-tcp/src/event-handler.manager.test.ts => toolkit/src/event-handler.registry.test.ts} (80%) rename packages/{adapter-tcp/src/event-handler.manager.ts => toolkit/src/event-handler.registry.ts} (72%) diff --git a/packages/adapter-tcp/src/connection.manager.ts b/packages/adapter-tcp/src/connection.manager.ts index 02d8c34..8baea4a 100644 --- a/packages/adapter-tcp/src/connection.manager.ts +++ b/packages/adapter-tcp/src/connection.manager.ts @@ -7,68 +7,98 @@ import type { ID } from '@agnoc/toolkit'; import type { PacketServer, PacketFactory, Packet, PayloadObjectName } from '@agnoc/transport-tcp'; export class ConnectionManager { - private readonly connections = new Set(); + private readonly servers = new Map>(); constructor( - private readonly servers: PacketServer[], private readonly packetEventBus: PacketEventBus, private readonly packetFactory: PacketFactory, private readonly deviceRepository: DeviceRepository, - ) { - this.addListeners(); - } + ) {} findConnectionsByDeviceId(deviceId: ID): DeviceConnection[] { - return [...this.connections].filter((connection) => connection.device?.id.equals(deviceId)); + const connections = [...this.servers.values()].flatMap((connections) => [...connections]); + + return connections.filter((connection) => connection.device?.id.equals(deviceId)); } - private addListeners() { - this.servers.forEach((server) => { - server.on('connection', (socket) => { - const connection = new DeviceConnection(this.packetFactory, socket); + addServers(...servers: PacketServer[]): void { + servers.forEach((server) => { + this.servers.set(server, new Set()); + this.addListeners(server); + }); + } - this.connections.add(connection); + private addListeners(server: PacketServer) { + server.on('connection', (socket) => { + const connection = new DeviceConnection(this.packetFactory, socket); - connection.on('data', async (packet: Packet) => { - const event = packet.payload.opcode.name as PayloadObjectName; - const packetMessage = new PacketMessage(connection, packet) as PacketEventBusEvents[PayloadObjectName]; + this.servers.get(server)?.add(connection); - // Update the device on the connection if the device id has changed. - if (!packet.deviceId.equals(connection.device?.id)) { - connection.device = await this.findDeviceById(packet.deviceId); - } + connection.on('data', async (packet: Packet) => { + const event = packet.payload.opcode.name as PayloadObjectName; + const packetMessage = new PacketMessage(connection, packet); - const count = this.packetEventBus.listenerCount(event); + // Update the device on the connection if the device id has changed. + await this.updateConnectionDevice(packet, connection); - // Throw an error if there is no event handler for the packet event. - if (count === 0) { - throw new DomainException(`No event handler found for packet event '${event}'`); - } + // Send the packet message to the packet event bus. + await this.emitPacketEvent(event, packetMessage); - // Emit the packet event. - await this.packetEventBus.emit(event, packetMessage); + // This is a hack to only mark the device as connected if there is more than one connection. + // Here we should check that the connections are from the same ip address. + await this.tryToSetDeviceAsConnected(connection); + }); - // This is a hack to only mark the device as connected if there is more than one connection. - // Here we should check that the connections are from the same ip address. - if (connection.device && !connection.device.isConnected) { - const connections = this.findConnectionsByDeviceId(connection.device.id); + connection.on('close', () => { + this.servers.get(server)?.delete(connection); + }); + }); - if (connections.length > 1) { - connection.device.setAsConnected(); + server.on('close', async () => { + const connections = this.servers.get(server); - await this.deviceRepository.saveOne(connection.device); - } - } - }); + if (connections) { + await Promise.all([...connections].map((connection) => connection.close())); - connection.on('close', () => { - connection.removeAllListeners(); - this.connections.delete(connection); - }); - }); + this.servers.delete(server); + } }); } + private async tryToSetDeviceAsConnected(connection: DeviceConnection) { + if (connection.device && !connection.device.isConnected) { + const connections = this.findConnectionsByDeviceId(connection.device.id); + + if (connections.length > 1) { + connection.device.setAsConnected(); + + await this.deviceRepository.saveOne(connection.device); + } + } + } + + private async emitPacketEvent(event: PayloadObjectName, packetMessage: PacketMessage) { + this.checkForPacketEventHandler(event); + + // Emit the packet event. + await this.packetEventBus.emit(event, packetMessage as PacketEventBusEvents[PayloadObjectName]); + } + + private checkForPacketEventHandler(event: PayloadObjectName) { + const count = this.packetEventBus.listenerCount(event); + + // Throw an error if there is no event handler for the packet event. + if (count === 0) { + throw new DomainException(`No event handler found for packet event '${event}'`); + } + } + + private async updateConnectionDevice(packet: Packet, connection: DeviceConnection) { + if (!packet.deviceId.equals(connection.device?.id)) { + connection.device = await this.findDeviceById(packet.deviceId); + } + } + private async findDeviceById(id: ID): Promise { if (id.value === 0) { return undefined; diff --git a/packages/adapter-tcp/src/device.connection.ts b/packages/adapter-tcp/src/device.connection.ts index bed95ee..e4d9f61 100644 --- a/packages/adapter-tcp/src/device.connection.ts +++ b/packages/adapter-tcp/src/device.connection.ts @@ -1,16 +1,16 @@ import { Device } from '@agnoc/domain'; import { ArgumentInvalidException, DomainException, ID } from '@agnoc/toolkit'; import { PacketSocket } from '@agnoc/transport-tcp'; -import { TypedEmitter } from 'tiny-typed-emitter'; +import Emittery from 'emittery'; import type { Packet, PacketFactory, PayloadObjectName, PayloadObjectFrom } from '@agnoc/transport-tcp'; export interface DeviceConnectionEvents { - data: (packet: Packet) => void | Promise; - close: () => void; - error: (err: Error) => void; + data: Packet; + close: undefined; + error: Error; } -export class DeviceConnection extends TypedEmitter { +export class DeviceConnection extends Emittery { #device?: Device; constructor(private readonly packetFactory: PacketFactory, private readonly socket: PacketSocket) { @@ -37,17 +37,25 @@ export class DeviceConnection extends TypedEmitter { const props = { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) }; const packet = this.packetFactory.create(name, object, props); - return this.socket.write(packet); + return this.write(packet); } respond(name: Name, object: PayloadObjectFrom, packet: Packet): Promise { - return this.socket.write(this.packetFactory.create(name, object, packet)); + return this.write(this.packetFactory.create(name, object, packet)); } close(): Promise { return this.socket.end(); } + private async write(packet: Packet) { + if (!this.socket.connected) { + return; + } + + return this.socket.write(packet); + } + private validateSocket() { if (!(this.socket instanceof PacketSocket)) { throw new DomainException('Socket for Connection is not an instance of PacketSocket'); @@ -60,14 +68,13 @@ export class DeviceConnection extends TypedEmitter { private addListeners() { this.socket.on('data', (packet) => { - this.emit('data', packet); + void this.emit('data', packet); }); this.socket.on('error', (err) => { - this.emit('error', err); + void this.emit('error', err); }); this.socket.on('close', () => { - this.socket.removeAllListeners(); - this.emit('close'); + void this.emit('close'); }); } } diff --git a/packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts b/packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts new file mode 100644 index 0000000..65e79dc --- /dev/null +++ b/packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts @@ -0,0 +1,15 @@ +import { DomainException } from '@agnoc/toolkit'; +import type { PacketEventHandler } from '../../packet.event-handler'; +import type { PacketMessage } from '../../packet.message'; + +export class DeviceLocatedEventHandler implements PacketEventHandler { + readonly eventName = 'DEVICE_SEEK_LOCATION_RSP'; + + async handle(message: PacketMessage<'DEVICE_SEEK_LOCATION_RSP'>): Promise { + if (!message.device) { + throw new DomainException('Device not found'); + } + + // TODO: Should do something here? + } +} diff --git a/packages/toolkit/package.json b/packages/toolkit/package.json index 4a65376..c84529b 100644 --- a/packages/toolkit/package.json +++ b/packages/toolkit/package.json @@ -70,6 +70,7 @@ "tslib": "^2.5.0" }, "devDependencies": { + "@johanblumenberg/ts-mockito": "^1.0.35", "chai": "^4.3.7" }, "engines": { diff --git a/packages/adapter-tcp/src/event-handler.manager.test.ts b/packages/toolkit/src/event-handler.registry.test.ts similarity index 80% rename from packages/adapter-tcp/src/event-handler.manager.test.ts rename to packages/toolkit/src/event-handler.registry.test.ts index 405bb43..01077fb 100644 --- a/packages/adapter-tcp/src/event-handler.manager.test.ts +++ b/packages/toolkit/src/event-handler.registry.test.ts @@ -1,17 +1,17 @@ import { anything, capture, imock, instance, verify, when } from '@johanblumenberg/ts-mockito'; import { expect } from 'chai'; -import { EventHandlerManager } from './event-handler.manager'; +import { EventHandlerRegistry } from './event-handler.registry'; import type { EventBus, EventHandler } from '@agnoc/toolkit'; -describe('EventHandlerManager', function () { +describe('EventHandlerRegistry', function () { let eventBus: EventBus; let eventHandler: EventHandler; - let eventHandlerManager: EventHandlerManager; + let eventHandlerManager: EventHandlerRegistry; beforeEach(function () { eventBus = imock(); eventHandler = imock(); - eventHandlerManager = new EventHandlerManager(instance(eventBus)); + eventHandlerManager = new EventHandlerRegistry(instance(eventBus)); }); it('should listen for events on the bus', function () { diff --git a/packages/adapter-tcp/src/event-handler.manager.ts b/packages/toolkit/src/event-handler.registry.ts similarity index 72% rename from packages/adapter-tcp/src/event-handler.manager.ts rename to packages/toolkit/src/event-handler.registry.ts index 053739a..f462499 100644 --- a/packages/adapter-tcp/src/event-handler.manager.ts +++ b/packages/toolkit/src/event-handler.registry.ts @@ -1,7 +1,8 @@ -import type { EventBus, EventHandler } from '@agnoc/toolkit'; +import type { EventBus } from './base-classes/event-bus.base'; +import type { EventHandler } from './base-classes/event-handler.base'; /** Manages event handlers. */ -export class EventHandlerManager { +export class EventHandlerRegistry { // eslint-disable-next-line @typescript-eslint/no-explicit-any constructor(private readonly eventBus: EventBus) {} diff --git a/packages/toolkit/src/index.ts b/packages/toolkit/src/index.ts index 5dd962e..6e710bd 100644 --- a/packages/toolkit/src/index.ts +++ b/packages/toolkit/src/index.ts @@ -15,6 +15,7 @@ export * from './base-classes/value-object.base'; export * from './decorators/bind.decorator'; export * from './domain-primitives/id.domain-primitive'; export * from './event-buses/domain.event-bus'; +export * from './event-handler.registry'; export * from './exceptions/argument-invalid.exception'; export * from './exceptions/argument-not-provided.exception'; export * from './exceptions/argument-out-of-range.exception'; diff --git a/packages/transport-tcp/src/packet.server.ts b/packages/transport-tcp/src/packet.server.ts index d089759..992bb8a 100644 --- a/packages/transport-tcp/src/packet.server.ts +++ b/packages/transport-tcp/src/packet.server.ts @@ -20,11 +20,11 @@ export interface PacketServerEvents { /** Server that emits `PacketSockets`. */ export class PacketServer extends Emittery { - private server: Server; + private readonly sockets = new Set(); + private readonly server = new Server(); constructor(private readonly packetMapper: PacketMapper) { super(); - this.server = new Server(); this.addListeners(); } @@ -66,19 +66,29 @@ export class PacketServer extends Emittery { resolve(); }); + + this.sockets.forEach((socket) => void socket.end()); }); } private onConnection(socket: Socket): void { - const client = new PacketSocket(this.packetMapper, socket); + const packetSocket = new PacketSocket(this.packetMapper, socket); + + this.sockets.add(packetSocket); - void this.emit('connection', client); + packetSocket.on('close', () => { + this.sockets.delete(packetSocket); + }); + + void this.emit('connection', packetSocket); } private addListeners(): void { this.server.on('connection', this.onConnection.bind(this)); this.server.on('listening', () => void this.emit('listening')); - this.server.on('close', () => void this.emit('close')); + this.server.on('close', () => { + void this.emit('close'); + }); /* istanbul ignore next - unable to test */ this.server.on('error', (error) => void this.emit('error', error)); /* istanbul ignore next - unable to test */