diff --git a/packages/adapter-tcp/src/connection.manager.ts b/packages/adapter-tcp/src/connection.manager.ts index 02d8c34..4349717 100644 --- a/packages/adapter-tcp/src/connection.manager.ts +++ b/packages/adapter-tcp/src/connection.manager.ts @@ -7,68 +7,104 @@ import type { ID } from '@agnoc/toolkit'; import type { PacketServer, PacketFactory, Packet, PayloadObjectName } from '@agnoc/transport-tcp'; export class ConnectionManager { - private readonly connections = new Set(); + private readonly servers = new Map>(); constructor( - private readonly servers: PacketServer[], private readonly packetEventBus: PacketEventBus, private readonly packetFactory: PacketFactory, private readonly deviceRepository: DeviceRepository, - ) { - this.addListeners(); - } + ) {} findConnectionsByDeviceId(deviceId: ID): DeviceConnection[] { - return [...this.connections].filter((connection) => connection.device?.id.equals(deviceId)); + const connections = [...this.servers.values()].flatMap((connections) => [...connections]); + + return connections.filter((connection) => connection.device?.id.equals(deviceId)); } - private addListeners() { - this.servers.forEach((server) => { - server.on('connection', (socket) => { - const connection = new DeviceConnection(this.packetFactory, socket); + addServers(...servers: PacketServer[]): void { + servers.forEach((server) => { + this.servers.set(server, new Set()); + this.addListeners(server); + }); + } - this.connections.add(connection); + private addListeners(server: PacketServer) { + server.on('connection', (socket) => { + const connection = new DeviceConnection(this.packetFactory, this.packetEventBus, socket); - connection.on('data', async (packet: Packet) => { - const event = packet.payload.opcode.name as PayloadObjectName; - const packetMessage = new PacketMessage(connection, packet) as PacketEventBusEvents[PayloadObjectName]; + this.servers.get(server)?.add(connection); - // Update the device on the connection if the device id has changed. - if (!packet.deviceId.equals(connection.device?.id)) { - connection.device = await this.findDeviceById(packet.deviceId); - } + connection.on('data', async (packet: Packet) => { + const packetMessage = new PacketMessage(connection, packet); - const count = this.packetEventBus.listenerCount(event); + // Update the device on the connection if the device id has changed. + await this.updateConnectionDevice(packet, connection); - // Throw an error if there is no event handler for the packet event. - if (count === 0) { - throw new DomainException(`No event handler found for packet event '${event}'`); - } + // Send the packet message to the packet event bus. + await this.emitPacketEvent(packetMessage); - // Emit the packet event. - await this.packetEventBus.emit(event, packetMessage); + // This is a hack to only mark the device as connected if there is more than one connection. + // Here we should check that the connections are from the same ip address. + await this.tryToSetDeviceAsConnected(connection); + }); - // This is a hack to only mark the device as connected if there is more than one connection. - // Here we should check that the connections are from the same ip address. - if (connection.device && !connection.device.isConnected) { - const connections = this.findConnectionsByDeviceId(connection.device.id); + connection.on('close', () => { + this.servers.get(server)?.delete(connection); + }); + }); - if (connections.length > 1) { - connection.device.setAsConnected(); + server.on('close', async () => { + const connections = this.servers.get(server); - await this.deviceRepository.saveOne(connection.device); - } - } - }); + if (connections) { + await Promise.all([...connections].map((connection) => connection.close())); - connection.on('close', () => { - connection.removeAllListeners(); - this.connections.delete(connection); - }); - }); + this.servers.delete(server); + } }); } + private async tryToSetDeviceAsConnected(connection: DeviceConnection) { + if (connection.device && !connection.device.isConnected) { + const connections = this.findConnectionsByDeviceId(connection.device.id); + + if (connections.length > 1) { + connection.device.setAsConnected(); + + await this.deviceRepository.saveOne(connection.device); + } + } + } + + private async emitPacketEvent(message: PacketMessage) { + const name = message.packet.payload.opcode.name as PayloadObjectName; + const sequence = message.packet.sequence.toString(); + + this.checkForPacketEventHandler(name); + + // Emit the packet event by the sequence string. + // This is used to wait for a response from a packet. + await this.packetEventBus.emit(sequence, message as PacketEventBusEvents[PayloadObjectName]); + + // Emit the packet event by the opcode name. + await this.packetEventBus.emit(name, message as PacketEventBusEvents[PayloadObjectName]); + } + + private checkForPacketEventHandler(event: PayloadObjectName) { + const count = this.packetEventBus.listenerCount(event); + + // Throw an error if there is no event handler for the packet event. + if (count === 0) { + throw new DomainException(`No event handler found for packet event '${event}'`); + } + } + + private async updateConnectionDevice(packet: Packet, connection: DeviceConnection) { + if (!packet.deviceId.equals(connection.device?.id)) { + connection.device = await this.findDeviceById(packet.deviceId); + } + } + private async findDeviceById(id: ID): Promise { if (id.value === 0) { return undefined; diff --git a/packages/adapter-tcp/src/device.connection.ts b/packages/adapter-tcp/src/device.connection.ts index bed95ee..02476bd 100644 --- a/packages/adapter-tcp/src/device.connection.ts +++ b/packages/adapter-tcp/src/device.connection.ts @@ -1,19 +1,31 @@ import { Device } from '@agnoc/domain'; import { ArgumentInvalidException, DomainException, ID } from '@agnoc/toolkit'; import { PacketSocket } from '@agnoc/transport-tcp'; -import { TypedEmitter } from 'tiny-typed-emitter'; -import type { Packet, PacketFactory, PayloadObjectName, PayloadObjectFrom } from '@agnoc/transport-tcp'; +import Emittery from 'emittery'; +import type { PacketEventBus } from './packet.event-bus'; +import type { PacketMessage } from './packet.message'; +import type { + Packet, + PacketFactory, + PayloadObjectName, + PayloadObjectFrom, + CreatePacketProps, +} from '@agnoc/transport-tcp'; export interface DeviceConnectionEvents { - data: (packet: Packet) => void | Promise; - close: () => void; - error: (err: Error) => void; + data: Packet; + close: undefined; + error: Error; } -export class DeviceConnection extends TypedEmitter { +export class DeviceConnection extends Emittery { #device?: Device; - constructor(private readonly packetFactory: PacketFactory, private readonly socket: PacketSocket) { + constructor( + private readonly packetFactory: PacketFactory, + private readonly eventBus: PacketEventBus, + private readonly socket: PacketSocket, + ) { super(); this.validateSocket(); this.addListeners(); @@ -34,20 +46,52 @@ export class DeviceConnection extends TypedEmitter { } send(name: Name, object: PayloadObjectFrom): Promise { - const props = { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) }; - const packet = this.packetFactory.create(name, object, props); + const packet = this.packetFactory.create(name, object, this.getPacketProps()); - return this.socket.write(packet); + return this.write(packet); } respond(name: Name, object: PayloadObjectFrom, packet: Packet): Promise { - return this.socket.write(this.packetFactory.create(name, object, packet)); + return this.write(this.packetFactory.create(name, object, packet)); + } + + sendAndWait(name: Name, object: PayloadObjectFrom): Promise { + const packet = this.packetFactory.create(name, object, this.getPacketProps()); + + return this.writeAndWait(packet); + } + + respondAndWait( + name: Name, + object: PayloadObjectFrom, + packet: Packet, + ): Promise { + return this.writeAndWait(this.packetFactory.create(name, object, packet)); } close(): Promise { return this.socket.end(); } + private getPacketProps(): CreatePacketProps { + return { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) }; + } + + private writeAndWait(packet: Packet): Promise { + return new Promise((resolve, reject) => { + this.eventBus.once(packet.sequence.toString()).then(resolve, reject); + this.write(packet).catch(reject); + }); + } + + private async write(packet: Packet) { + if (!this.socket.connected) { + return; + } + + return this.socket.write(packet); + } + private validateSocket() { if (!(this.socket instanceof PacketSocket)) { throw new DomainException('Socket for Connection is not an instance of PacketSocket'); @@ -60,14 +104,13 @@ export class DeviceConnection extends TypedEmitter { private addListeners() { this.socket.on('data', (packet) => { - this.emit('data', packet); + void this.emit('data', packet); }); this.socket.on('error', (err) => { - this.emit('error', err); + void this.emit('error', err); }); this.socket.on('close', () => { - this.socket.removeAllListeners(); - this.emit('close'); + void this.emit('close'); }); } } diff --git a/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts b/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts new file mode 100644 index 0000000..573541e --- /dev/null +++ b/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts @@ -0,0 +1,19 @@ +import { DomainException } from '@agnoc/toolkit'; +import type { ConnectionManager } from '../../connection.manager'; +import type { CommandEventHandler, LocateDeviceCommand } from '@agnoc/domain'; + +export class LocateDeviceEventHandler implements CommandEventHandler { + readonly eventName = 'LocateDeviceCommand'; + + constructor(private readonly connectionManager: ConnectionManager) {} + + async handle(event: LocateDeviceCommand): Promise { + const [connection] = this.connectionManager.findConnectionsByDeviceId(event.deviceId); + + if (!connection || !connection.device) { + throw new DomainException(`Unable to find a connection for the device with id ${event.deviceId.value}`); + } + + await connection.sendAndWait('DEVICE_SEEK_LOCATION_REQ', {}); + } +} diff --git a/packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts b/packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts new file mode 100644 index 0000000..65e79dc --- /dev/null +++ b/packages/adapter-tcp/src/event-handlers/packet-event-handlers/device-located.event-handler.ts @@ -0,0 +1,15 @@ +import { DomainException } from '@agnoc/toolkit'; +import type { PacketEventHandler } from '../../packet.event-handler'; +import type { PacketMessage } from '../../packet.message'; + +export class DeviceLocatedEventHandler implements PacketEventHandler { + readonly eventName = 'DEVICE_SEEK_LOCATION_RSP'; + + async handle(message: PacketMessage<'DEVICE_SEEK_LOCATION_RSP'>): Promise { + if (!message.device) { + throw new DomainException('Device not found'); + } + + // TODO: Should do something here? + } +} diff --git a/packages/adapter-tcp/src/packet.event-bus.ts b/packages/adapter-tcp/src/packet.event-bus.ts index 9d8dec8..d1a2109 100644 --- a/packages/adapter-tcp/src/packet.event-bus.ts +++ b/packages/adapter-tcp/src/packet.event-bus.ts @@ -5,6 +5,8 @@ import type { PayloadObjectName } from '@agnoc/transport-tcp'; /** Events for the packet event bus. */ export type PacketEventBusEvents = { [Name in PayloadObjectName]: PacketMessage; +} & { + [key: string]: PacketMessage; }; /** Event bus for packets. */ diff --git a/packages/adapter-tcp/src/packet.message.ts b/packages/adapter-tcp/src/packet.message.ts index 9e780e7..a5cffcc 100644 --- a/packages/adapter-tcp/src/packet.message.ts +++ b/packages/adapter-tcp/src/packet.message.ts @@ -2,7 +2,7 @@ import type { DeviceConnection } from './device.connection'; import type { Device } from '@agnoc/domain'; import type { Packet, PayloadObjectFrom, PayloadObjectName } from '@agnoc/transport-tcp'; -export class PacketMessage { +export class PacketMessage { constructor(readonly connection: DeviceConnection, readonly packet: Packet) {} get device(): Device | undefined { @@ -12,4 +12,8 @@ export class PacketMessage { respond(name: Name, object: PayloadObjectFrom): Promise { return this.connection.respond(name, object, this.packet); } + + respondAndWait(name: Name, object: PayloadObjectFrom): Promise { + return this.connection.respondAndWait(name, object, this.packet); + } } diff --git a/packages/adapter-tcp/src/tcp.adapter.ts b/packages/adapter-tcp/src/tcp.adapter.ts new file mode 100644 index 0000000..3ec4262 --- /dev/null +++ b/packages/adapter-tcp/src/tcp.adapter.ts @@ -0,0 +1,138 @@ +import { EventHandlerRegistry } from '@agnoc/toolkit'; +import { + getCustomDecoders, + getProtobufRoot, + PacketMapper, + PacketServer, + PayloadMapper, + PayloadObjectParserService, + PacketFactory, +} from '@agnoc/transport-tcp'; +import { ConnectionManager } from './connection.manager'; +import { LocateDeviceEventHandler } from './event-handlers/command-event-handlers/locate-device.event-handler'; +import { LockDeviceWhenDeviceIsConnectedEventHandler } from './event-handlers/domain-event-handlers/lock-device-when-device-is-connected-event-handler.event-handler'; +import { QueryDeviceInfoWhenDeviceIsLockedEventHandler } from './event-handlers/domain-event-handlers/query-device-info-when-device-is-locked-event-handler.event-handler'; +import { ClientHeartbeatEventHandler } from './event-handlers/packet-event-handlers/client-heartbeat.event-handler'; +import { ClientLoginEventHandler } from './event-handlers/packet-event-handlers/client-login.event-handler'; +import { DeviceBatteryUpdateEventHandler } from './event-handlers/packet-event-handlers/device-battery-update.event-handler'; +import { DeviceCleanMapDataReportEventHandler } from './event-handlers/packet-event-handlers/device-clean-map-data-report.event-handler'; +import { DeviceCleanMapReportEventHandler } from './event-handlers/packet-event-handlers/device-clean-map-report.event-handler'; +import { DeviceCleanTaskReportEventHandler } from './event-handlers/packet-event-handlers/device-clean-task-report.event-handler'; +import { DeviceGetAllGlobalMapEventHandler } from './event-handlers/packet-event-handlers/device-get-all-global-map.event-handler'; +import { DeviceLocatedEventHandler } from './event-handlers/packet-event-handlers/device-located.event-handler'; +import { DeviceLockedEventHandler } from './event-handlers/packet-event-handlers/device-locked.event-handler'; +import { DeviceMapChargerPositionUpdateEventHandler } from './event-handlers/packet-event-handlers/device-map-charger-position-update.event-handler'; +import { DeviceMapUpdateEventHandler } from './event-handlers/packet-event-handlers/device-map-update.event-handler'; +import { DeviceMapWorkStatusUpdateEventHandler } from './event-handlers/packet-event-handlers/device-map-work-status-update.event-handler'; +import { DeviceMemoryMapInfoEventHandler } from './event-handlers/packet-event-handlers/device-memory-map-info.event-handler'; +import { DeviceOfflineEventHandler } from './event-handlers/packet-event-handlers/device-offline.event-handler'; +import { DeviceRegisterEventHandler } from './event-handlers/packet-event-handlers/device-register.event-handler'; +import { DeviceSettingsUpdateEventHandler } from './event-handlers/packet-event-handlers/device-settings-update.event-handler'; +import { DeviceTimeUpdateEventHandler } from './event-handlers/packet-event-handlers/device-time-update.event-handler'; +import { DeviceUpgradeInfoEventHandler } from './event-handlers/packet-event-handlers/device-upgrade-info.event-handler'; +import { DeviceVersionUpdateEventHandler } from './event-handlers/packet-event-handlers/device-version-update.event-handler'; +import { DeviceWlanUpdateEventHandler } from './event-handlers/packet-event-handlers/device-wlan-update.event-handler'; +import { DeviceBatteryMapper } from './mappers/device-battery.mapper'; +import { DeviceErrorMapper } from './mappers/device-error.mapper'; +import { DeviceFanSpeedMapper } from './mappers/device-fan-speed.mapper'; +import { DeviceModeMapper } from './mappers/device-mode.mapper'; +import { DeviceStateMapper } from './mappers/device-state.mapper'; +import { DeviceVoiceMapper } from './mappers/device-voice.mapper'; +import { DeviceWaterLevelMapper } from './mappers/device-water-level.mapper'; +import { PacketEventBus } from './packet.event-bus'; +import { TimeSyncServer } from './time-sync.server'; +import type { DeviceRepository } from '@agnoc/domain'; + +export class TCPAdapter { + private readonly timeSyncServer: TimeSyncServer; + private readonly cmdServer: PacketServer; + private readonly mapServer: PacketServer; + + constructor( + private readonly deviceRepository: DeviceRepository, + private readonly domainEventHandlerRegistry: EventHandlerRegistry, + private readonly commandEventHandlerRegistry: EventHandlerRegistry, + ) { + // Packet foundation + const payloadMapper = new PayloadMapper(new PayloadObjectParserService(getProtobufRoot(), getCustomDecoders())); + const packetMapper = new PacketMapper(payloadMapper); + const packetFactory = new PacketFactory(); + + // Servers + this.timeSyncServer = new TimeSyncServer(new PacketServer(packetMapper), packetFactory); + this.cmdServer = new PacketServer(packetMapper); + this.mapServer = new PacketServer(packetMapper); + + // Mappers + const deviceFanSpeedMapper = new DeviceFanSpeedMapper(); + const deviceWaterLevelMapper = new DeviceWaterLevelMapper(); + const deviceVoiceMapper = new DeviceVoiceMapper(); + const deviceStateMapper = new DeviceStateMapper(); + const deviceModeMapper = new DeviceModeMapper(); + const deviceErrorMapper = new DeviceErrorMapper(); + const deviceBatteryMapper = new DeviceBatteryMapper(); + + // Packet event bus + const packetEventBus = new PacketEventBus(); + const packetEventHandlerRegistry = new EventHandlerRegistry(packetEventBus); + + // Connection managers + const connectionManager = new ConnectionManager(packetEventBus, packetFactory, this.deviceRepository); + + connectionManager.addServers(this.cmdServer, this.mapServer); + + // Packet event handlers + packetEventHandlerRegistry.register( + new ClientHeartbeatEventHandler(), + new ClientLoginEventHandler(), + new DeviceBatteryUpdateEventHandler(deviceBatteryMapper), + new DeviceCleanMapDataReportEventHandler(), + new DeviceCleanMapReportEventHandler(), + new DeviceCleanTaskReportEventHandler(), + new DeviceGetAllGlobalMapEventHandler(), + new DeviceLocatedEventHandler(), + new DeviceLockedEventHandler(this.deviceRepository), + new DeviceMapChargerPositionUpdateEventHandler(), + new DeviceMapWorkStatusUpdateEventHandler( + deviceStateMapper, + deviceModeMapper, + deviceErrorMapper, + deviceBatteryMapper, + deviceFanSpeedMapper, + deviceWaterLevelMapper, + ), + new DeviceMemoryMapInfoEventHandler(), + new DeviceOfflineEventHandler(), + new DeviceRegisterEventHandler(this.deviceRepository), + new DeviceSettingsUpdateEventHandler(deviceVoiceMapper), + new DeviceTimeUpdateEventHandler(), + new DeviceUpgradeInfoEventHandler(), + new DeviceVersionUpdateEventHandler(), + new DeviceWlanUpdateEventHandler(), + new DeviceMapUpdateEventHandler( + deviceBatteryMapper, + deviceModeMapper, + deviceStateMapper, + deviceErrorMapper, + deviceFanSpeedMapper, + ), + ); + + // Domain event handlers + this.domainEventHandlerRegistry.register( + new LockDeviceWhenDeviceIsConnectedEventHandler(connectionManager), + new QueryDeviceInfoWhenDeviceIsLockedEventHandler(connectionManager), + ); + + // Command event handlers + this.commandEventHandlerRegistry.register(new LocateDeviceEventHandler(connectionManager)); + } + + async start(): Promise { + await Promise.all([this.cmdServer.listen(4010), this.mapServer.listen(4030), this.timeSyncServer.listen()]); + } + + async stop(): Promise { + await Promise.all([this.cmdServer.close(), this.mapServer.close(), this.timeSyncServer.close()]); + } +} diff --git a/packages/domain/src/commands/commands.ts b/packages/domain/src/commands/commands.ts new file mode 100644 index 0000000..caa1063 --- /dev/null +++ b/packages/domain/src/commands/commands.ts @@ -0,0 +1,7 @@ +import type { LocateDeviceCommand } from './locate-device.command'; + +export type CommandEvents = { + LocateDeviceCommand: LocateDeviceCommand; +}; + +export type CommandEventNames = keyof CommandEvents; diff --git a/packages/domain/src/commands/locate-device.command.ts b/packages/domain/src/commands/locate-device.command.ts new file mode 100644 index 0000000..bbc6ab8 --- /dev/null +++ b/packages/domain/src/commands/locate-device.command.ts @@ -0,0 +1,16 @@ +import { Command } from '@agnoc/toolkit'; +import type { ID } from '@agnoc/toolkit'; + +export interface LocateDeviceCommandProps { + deviceId: ID; +} + +export class LocateDeviceCommand extends Command { + get deviceId(): ID { + return this.props.deviceId; + } + + protected validate(_: LocateDeviceCommandProps): void { + // noop + } +} diff --git a/packages/domain/src/events/device-connected.domain-event.test.ts b/packages/domain/src/domain-events/device-connected.domain-event.test.ts similarity index 100% rename from packages/domain/src/events/device-connected.domain-event.test.ts rename to packages/domain/src/domain-events/device-connected.domain-event.test.ts diff --git a/packages/domain/src/events/device-connected.domain-event.ts b/packages/domain/src/domain-events/device-connected.domain-event.ts similarity index 100% rename from packages/domain/src/events/device-connected.domain-event.ts rename to packages/domain/src/domain-events/device-connected.domain-event.ts diff --git a/packages/domain/src/events/device-locked.domain-event.ts b/packages/domain/src/domain-events/device-locked.domain-event.ts similarity index 100% rename from packages/domain/src/events/device-locked.domain-event.ts rename to packages/domain/src/domain-events/device-locked.domain-event.ts diff --git a/packages/domain/src/events/index.ts b/packages/domain/src/domain-events/domain-events.ts similarity index 100% rename from packages/domain/src/events/index.ts rename to packages/domain/src/domain-events/domain-events.ts diff --git a/packages/domain/src/entities/device.entity.ts b/packages/domain/src/entities/device.entity.ts index 5d124c3..581c6dd 100644 --- a/packages/domain/src/entities/device.entity.ts +++ b/packages/domain/src/entities/device.entity.ts @@ -1,12 +1,12 @@ import { AggregateRoot, ID } from '@agnoc/toolkit'; +import { DeviceConnectedDomainEvent } from '../domain-events/device-connected.domain-event'; +import { DeviceLockedDomainEvent } from '../domain-events/device-locked.domain-event'; import { DeviceBattery } from '../domain-primitives/device-battery.domain-primitive'; import { DeviceError } from '../domain-primitives/device-error.domain-primitive'; import { DeviceFanSpeed } from '../domain-primitives/device-fan-speed.domain-primitive'; import { DeviceMode } from '../domain-primitives/device-mode.domain-primitive'; import { DeviceState } from '../domain-primitives/device-state.domain-primitive'; import { DeviceWaterLevel } from '../domain-primitives/device-water-level.domain-primitive'; -import { DeviceConnectedDomainEvent } from '../events/device-connected.domain-event'; -import { DeviceLockedDomainEvent } from '../events/device-locked.domain-event'; import { DeviceCleanWork } from '../value-objects/device-clean-work.value-object'; import { DeviceConsumable } from '../value-objects/device-consumable.value-object'; import { DeviceSettings } from '../value-objects/device-settings.value-object'; diff --git a/packages/domain/src/event-buses/command.event-bus.ts b/packages/domain/src/event-buses/command.event-bus.ts new file mode 100644 index 0000000..35e8cdd --- /dev/null +++ b/packages/domain/src/event-buses/command.event-bus.ts @@ -0,0 +1,6 @@ +import { EventBus } from '@agnoc/toolkit'; +import type { CommandEventNames, CommandEvents } from '../commands/commands'; + +type CommandEventBusEvents = { [Name in CommandEventNames]: CommandEvents[Name] }; + +export class CommandEventBus extends EventBus {} diff --git a/packages/toolkit/src/event-buses/domain.event-bus.test.ts b/packages/domain/src/event-buses/domain.event-bus.test.ts similarity index 85% rename from packages/toolkit/src/event-buses/domain.event-bus.test.ts rename to packages/domain/src/event-buses/domain.event-bus.test.ts index c53f761..1bb4872 100644 --- a/packages/toolkit/src/event-buses/domain.event-bus.test.ts +++ b/packages/domain/src/event-buses/domain.event-bus.test.ts @@ -1,5 +1,5 @@ +import { EventBus } from '@agnoc/toolkit'; import { expect } from 'chai'; -import { EventBus } from '../base-classes/event-bus.base'; import { DomainEventBus } from './domain.event-bus'; describe('DomainEventBus', function () { diff --git a/packages/domain/src/event-buses/domain.event-bus.ts b/packages/domain/src/event-buses/domain.event-bus.ts new file mode 100644 index 0000000..5565845 --- /dev/null +++ b/packages/domain/src/event-buses/domain.event-bus.ts @@ -0,0 +1,6 @@ +import { EventBus } from '@agnoc/toolkit'; +import type { DomainEventNames, DomainEvents } from '../domain-events/domain-events'; + +type DomainEventBusEvents = { [Name in DomainEventNames]: DomainEvents[Name] }; + +export class DomainEventBus extends EventBus {} diff --git a/packages/domain/src/event-handlers/command.event-handler.ts b/packages/domain/src/event-handlers/command.event-handler.ts new file mode 100644 index 0000000..857556e --- /dev/null +++ b/packages/domain/src/event-handlers/command.event-handler.ts @@ -0,0 +1,7 @@ +import type { CommandEventNames, CommandEvents } from '../commands/commands'; +import type { EventHandler } from '@agnoc/toolkit'; + +export abstract class CommandEventHandler implements EventHandler { + abstract eventName: CommandEventNames; + abstract handle(event: CommandEvents[this['eventName']]): void; +} diff --git a/packages/domain/src/event-handlers/domain.event-handler.ts b/packages/domain/src/event-handlers/domain.event-handler.ts index 07fcd31..c5f492d 100644 --- a/packages/domain/src/event-handlers/domain.event-handler.ts +++ b/packages/domain/src/event-handlers/domain.event-handler.ts @@ -1,4 +1,4 @@ -import type { DomainEventNames, DomainEvents } from '../events'; +import type { DomainEventNames, DomainEvents } from '../domain-events/domain-events'; import type { EventHandler } from '@agnoc/toolkit'; /** Base class for domain event handlers. */ diff --git a/packages/domain/src/index.ts b/packages/domain/src/index.ts index 692d51a..a65934c 100644 --- a/packages/domain/src/index.ts +++ b/packages/domain/src/index.ts @@ -1,3 +1,8 @@ +export * from './commands/commands'; +export * from './commands/locate-device.command'; +export * from './domain-events/device-connected.domain-event'; +export * from './domain-events/device-locked.domain-event'; +export * from './domain-events/domain-events'; export * from './domain-primitives/clean-mode.domain-primitive'; export * from './domain-primitives/clean-size.domain-primitive'; export * from './domain-primitives/device-battery.domain-primitive'; @@ -12,10 +17,10 @@ export * from './entities/device-order.entity'; export * from './entities/device.entity'; export * from './entities/room.entity'; export * from './entities/zone.entity'; +export * from './event-buses/command.event-bus'; +export * from './event-buses/domain.event-bus'; +export * from './event-handlers/command.event-handler'; export * from './event-handlers/domain.event-handler'; -export * from './events'; -export * from './events/device-connected.domain-event'; -export * from './events/device-locked.domain-event'; export * from './repositories/device.repository'; export * from './value-objects/device-clean-work.value-object'; export * from './value-objects/device-consumable.value-object'; diff --git a/packages/domain/src/repositories/device.repository.test.ts b/packages/domain/src/repositories/device.repository.test.ts index 5268081..987328d 100644 --- a/packages/domain/src/repositories/device.repository.test.ts +++ b/packages/domain/src/repositories/device.repository.test.ts @@ -2,17 +2,17 @@ import { Repository } from '@agnoc/toolkit'; import { imock, instance } from '@johanblumenberg/ts-mockito'; import { expect } from 'chai'; import { DeviceRepository } from './device.repository'; -import type { Adapter, DomainEventBus } from '@agnoc/toolkit'; +import type { Adapter, EventBus } from '@agnoc/toolkit'; describe('DeviceRepository', function () { - let domainEventBus: DomainEventBus; + let eventBus: EventBus; let adapter: Adapter; let repository: DeviceRepository; beforeEach(function () { - domainEventBus = imock(); + eventBus = imock(); adapter = imock(); - repository = new DeviceRepository(instance(domainEventBus), instance(adapter)); + repository = new DeviceRepository(instance(eventBus), instance(adapter)); }); it('should be a repository', function () { diff --git a/packages/toolkit/package.json b/packages/toolkit/package.json index 4a65376..c84529b 100644 --- a/packages/toolkit/package.json +++ b/packages/toolkit/package.json @@ -70,6 +70,7 @@ "tslib": "^2.5.0" }, "devDependencies": { + "@johanblumenberg/ts-mockito": "^1.0.35", "chai": "^4.3.7" }, "engines": { diff --git a/packages/toolkit/src/base-classes/aggregate-root.base.test.ts b/packages/toolkit/src/base-classes/aggregate-root.base.test.ts index 0d0ff75..333a7ef 100644 --- a/packages/toolkit/src/base-classes/aggregate-root.base.test.ts +++ b/packages/toolkit/src/base-classes/aggregate-root.base.test.ts @@ -6,13 +6,13 @@ import { DomainEvent } from './domain-event.base'; import { Entity } from './entity.base'; import type { DomainEventProps } from './domain-event.base'; import type { EntityProps } from './entity.base'; -import type { DomainEventBus } from '../event-buses/domain.event-bus'; +import type { EventBus } from './event-bus.base'; describe('AggregateRoot', function () { - let domainEventBus: DomainEventBus; + let eventBus: EventBus; beforeEach(function () { - domainEventBus = imock(); + eventBus = imock(); }); it('should be created', function () { @@ -34,15 +34,15 @@ describe('AggregateRoot', function () { const id = ID.generate(); const dummyAggregateRoot = new DummyAggregateRoot({ id }); - when(domainEventBus.emit(anything(), anything())).thenResolve(); + when(eventBus.emit(anything(), anything())).thenResolve(); dummyAggregateRoot.doSomething(); - await dummyAggregateRoot.publishEvents(instance(domainEventBus)); + await dummyAggregateRoot.publishEvents(instance(eventBus)); expect(dummyAggregateRoot.domainEvents).to.be.lengthOf(0); - verify(domainEventBus.emit('DummyDomainEvent', deepEqual(new DummyDomainEvent({ aggregateId: id })))).once(); + verify(eventBus.emit('DummyDomainEvent', deepEqual(new DummyDomainEvent({ aggregateId: id })))).once(); }); it('should be able to clear domain events', function () { diff --git a/packages/toolkit/src/base-classes/aggregate-root.base.ts b/packages/toolkit/src/base-classes/aggregate-root.base.ts index 5a28c3b..61d69b2 100644 --- a/packages/toolkit/src/base-classes/aggregate-root.base.ts +++ b/packages/toolkit/src/base-classes/aggregate-root.base.ts @@ -2,7 +2,7 @@ import { debug } from '../utils/debug.util'; import { Entity } from './entity.base'; import type { DomainEvent } from './domain-event.base'; import type { EntityProps } from './entity.base'; -import type { DomainEventBus } from '../event-buses/domain.event-bus'; +import type { EventBus } from './event-bus.base'; export abstract class AggregateRoot extends Entity { private readonly debug = debug(__filename).extend(`${this.constructor.name.toLowerCase()}:${this.id.value}`); @@ -16,11 +16,11 @@ export abstract class AggregateRoot extends this.#domainEvents.clear(); } - async publishEvents(domainEventBus: DomainEventBus): Promise { + async publishEvents(eventBus: EventBus): Promise { await Promise.all( this.domainEvents.map(async (event) => { this.debug(`publishing domain event '${event.constructor.name}'`); - return domainEventBus.emit(event.constructor.name, event); + return eventBus.emit(event.constructor.name, event); }), ); diff --git a/packages/toolkit/src/base-classes/command.base.ts b/packages/toolkit/src/base-classes/command.base.ts new file mode 100644 index 0000000..a92f79b --- /dev/null +++ b/packages/toolkit/src/base-classes/command.base.ts @@ -0,0 +1,10 @@ +import { Validatable } from './validatable.base'; + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface CommandProps {} + +export abstract class Command extends Validatable { + constructor(protected readonly props: T) { + super(props); + } +} diff --git a/packages/toolkit/src/base-classes/event-bus.base.ts b/packages/toolkit/src/base-classes/event-bus.base.ts index 3168262..351f702 100644 --- a/packages/toolkit/src/base-classes/event-bus.base.ts +++ b/packages/toolkit/src/base-classes/event-bus.base.ts @@ -1,5 +1,4 @@ import Emittery from 'emittery'; -export type EventBusEvents = { [key: string]: unknown }; - -export abstract class EventBus extends Emittery {} +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export abstract class EventBus extends Emittery {} diff --git a/packages/toolkit/src/base-classes/repository.base.test.ts b/packages/toolkit/src/base-classes/repository.base.test.ts index 26a0dfb..174712b 100644 --- a/packages/toolkit/src/base-classes/repository.base.test.ts +++ b/packages/toolkit/src/base-classes/repository.base.test.ts @@ -5,17 +5,17 @@ import { AggregateRoot } from './aggregate-root.base'; import { Repository } from './repository.base'; import type { Adapter } from './adapter.base'; import type { EntityProps } from './entity.base'; -import type { DomainEventBus } from '../event-buses/domain.event-bus'; +import type { EventBus } from './event-bus.base'; describe('Repository', function () { - let domainEventBus: DomainEventBus; + let eventBus: EventBus; let adapter: Adapter; let dummyRepository: DummyRepository; beforeEach(function () { - domainEventBus = imock(); + eventBus = imock(); adapter = imock(); - dummyRepository = new DummyRepository(instance(domainEventBus), instance(adapter)); + dummyRepository = new DummyRepository(instance(eventBus), instance(adapter)); }); it('should find one by id', async function () { @@ -52,7 +52,7 @@ describe('Repository', function () { await dummyRepository.saveOne(entity); verify(adapter.set(id, entity)).once(); - verify(entitySpy.publishEvents(instance(domainEventBus))).once(); + verify(entitySpy.publishEvents(instance(eventBus))).once(); }); it('should delete one', async function () { @@ -63,7 +63,7 @@ describe('Repository', function () { await dummyRepository.deleteOne(entity); verify(adapter.delete(id)).once(); - verify(entitySpy.publishEvents(instance(domainEventBus))).once(); + verify(entitySpy.publishEvents(instance(eventBus))).once(); }); }); diff --git a/packages/toolkit/src/base-classes/repository.base.ts b/packages/toolkit/src/base-classes/repository.base.ts index 7d88d29..9e42db8 100644 --- a/packages/toolkit/src/base-classes/repository.base.ts +++ b/packages/toolkit/src/base-classes/repository.base.ts @@ -2,11 +2,11 @@ import type { Adapter } from './adapter.base'; import type { AggregateRoot } from './aggregate-root.base'; import type { EntityProps } from './entity.base'; +import type { EventBus } from './event-bus.base'; import type { ID } from '../domain-primitives/id.domain-primitive'; -import type { DomainEventBus } from '../event-buses/domain.event-bus'; export abstract class Repository> { - constructor(private readonly domainEventBus: DomainEventBus, private readonly adapter: Adapter) {} + constructor(private readonly eventBus: EventBus, private readonly adapter: Adapter) {} async findOneById(id: ID): Promise { return this.adapter.get(id) as T | undefined; @@ -18,11 +18,11 @@ export abstract class Repository> { async saveOne(entity: T): Promise { this.adapter.set(entity.id, entity); - await entity.publishEvents(this.domainEventBus); + await entity.publishEvents(this.eventBus); } async deleteOne(entity: T): Promise { this.adapter.delete(entity.id); - await entity.publishEvents(this.domainEventBus); + await entity.publishEvents(this.eventBus); } } diff --git a/packages/toolkit/src/event-buses/domain.event-bus.ts b/packages/toolkit/src/event-buses/domain.event-bus.ts deleted file mode 100644 index bee7935..0000000 --- a/packages/toolkit/src/event-buses/domain.event-bus.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { EventBus } from '../base-classes/event-bus.base'; -import type { DomainEvent } from '../base-classes/domain-event.base'; - -type DomainEvents = { [key: string]: DomainEvent }; - -export class DomainEventBus extends EventBus {} diff --git a/packages/adapter-tcp/src/event-handler.manager.test.ts b/packages/toolkit/src/event-handler.registry.test.ts similarity index 80% rename from packages/adapter-tcp/src/event-handler.manager.test.ts rename to packages/toolkit/src/event-handler.registry.test.ts index 405bb43..01077fb 100644 --- a/packages/adapter-tcp/src/event-handler.manager.test.ts +++ b/packages/toolkit/src/event-handler.registry.test.ts @@ -1,17 +1,17 @@ import { anything, capture, imock, instance, verify, when } from '@johanblumenberg/ts-mockito'; import { expect } from 'chai'; -import { EventHandlerManager } from './event-handler.manager'; +import { EventHandlerRegistry } from './event-handler.registry'; import type { EventBus, EventHandler } from '@agnoc/toolkit'; -describe('EventHandlerManager', function () { +describe('EventHandlerRegistry', function () { let eventBus: EventBus; let eventHandler: EventHandler; - let eventHandlerManager: EventHandlerManager; + let eventHandlerManager: EventHandlerRegistry; beforeEach(function () { eventBus = imock(); eventHandler = imock(); - eventHandlerManager = new EventHandlerManager(instance(eventBus)); + eventHandlerManager = new EventHandlerRegistry(instance(eventBus)); }); it('should listen for events on the bus', function () { diff --git a/packages/adapter-tcp/src/event-handler.manager.ts b/packages/toolkit/src/event-handler.registry.ts similarity index 72% rename from packages/adapter-tcp/src/event-handler.manager.ts rename to packages/toolkit/src/event-handler.registry.ts index 053739a..f462499 100644 --- a/packages/adapter-tcp/src/event-handler.manager.ts +++ b/packages/toolkit/src/event-handler.registry.ts @@ -1,7 +1,8 @@ -import type { EventBus, EventHandler } from '@agnoc/toolkit'; +import type { EventBus } from './base-classes/event-bus.base'; +import type { EventHandler } from './base-classes/event-handler.base'; /** Manages event handlers. */ -export class EventHandlerManager { +export class EventHandlerRegistry { // eslint-disable-next-line @typescript-eslint/no-explicit-any constructor(private readonly eventBus: EventBus) {} diff --git a/packages/toolkit/src/index.ts b/packages/toolkit/src/index.ts index 5dd962e..c879e72 100644 --- a/packages/toolkit/src/index.ts +++ b/packages/toolkit/src/index.ts @@ -1,6 +1,7 @@ export * from './adapters/memory.adapter'; export * from './base-classes/adapter.base'; export * from './base-classes/aggregate-root.base'; +export * from './base-classes/command.base'; export * from './base-classes/domain-event.base'; export * from './base-classes/domain-primitive.base'; export * from './base-classes/entity.base'; @@ -14,7 +15,7 @@ export * from './base-classes/validatable.base'; export * from './base-classes/value-object.base'; export * from './decorators/bind.decorator'; export * from './domain-primitives/id.domain-primitive'; -export * from './event-buses/domain.event-bus'; +export * from './event-handler.registry'; export * from './exceptions/argument-invalid.exception'; export * from './exceptions/argument-not-provided.exception'; export * from './exceptions/argument-out-of-range.exception'; diff --git a/packages/transport-tcp/src/packet.server.ts b/packages/transport-tcp/src/packet.server.ts index d089759..992bb8a 100644 --- a/packages/transport-tcp/src/packet.server.ts +++ b/packages/transport-tcp/src/packet.server.ts @@ -20,11 +20,11 @@ export interface PacketServerEvents { /** Server that emits `PacketSockets`. */ export class PacketServer extends Emittery { - private server: Server; + private readonly sockets = new Set(); + private readonly server = new Server(); constructor(private readonly packetMapper: PacketMapper) { super(); - this.server = new Server(); this.addListeners(); } @@ -66,19 +66,29 @@ export class PacketServer extends Emittery { resolve(); }); + + this.sockets.forEach((socket) => void socket.end()); }); } private onConnection(socket: Socket): void { - const client = new PacketSocket(this.packetMapper, socket); + const packetSocket = new PacketSocket(this.packetMapper, socket); + + this.sockets.add(packetSocket); - void this.emit('connection', client); + packetSocket.on('close', () => { + this.sockets.delete(packetSocket); + }); + + void this.emit('connection', packetSocket); } private addListeners(): void { this.server.on('connection', this.onConnection.bind(this)); this.server.on('listening', () => void this.emit('listening')); - this.server.on('close', () => void this.emit('close')); + this.server.on('close', () => { + void this.emit('close'); + }); /* istanbul ignore next - unable to test */ this.server.on('error', (error) => void this.emit('error', error)); /* istanbul ignore next - unable to test */