diff --git a/packages/adapter-tcp/src/device.connection.ts b/packages/adapter-tcp/src/aggregate-roots/packet-connection.aggregate-root.ts similarity index 52% rename from packages/adapter-tcp/src/device.connection.ts rename to packages/adapter-tcp/src/aggregate-roots/packet-connection.aggregate-root.ts index 02476bd..d817e1e 100644 --- a/packages/adapter-tcp/src/device.connection.ts +++ b/packages/adapter-tcp/src/aggregate-roots/packet-connection.aggregate-root.ts @@ -1,9 +1,9 @@ -import { Device } from '@agnoc/domain'; -import { ArgumentInvalidException, DomainException, ID } from '@agnoc/toolkit'; +import { Connection } from '@agnoc/domain'; +import { ID } from '@agnoc/toolkit'; import { PacketSocket } from '@agnoc/transport-tcp'; -import Emittery from 'emittery'; -import type { PacketEventBus } from './packet.event-bus'; -import type { PacketMessage } from './packet.message'; +import type { PacketEventBus } from '../packet.event-bus'; +import type { PacketMessage } from '../packet.message'; +import type { ConnectionProps } from '@agnoc/domain'; import type { Packet, PacketFactory, @@ -12,37 +12,23 @@ import type { CreatePacketProps, } from '@agnoc/transport-tcp'; -export interface DeviceConnectionEvents { - data: Packet; - close: undefined; - error: Error; +export interface PacketConnectionProps extends ConnectionProps { + socket: PacketSocket; } -export class DeviceConnection extends Emittery { - #device?: Device; +export class PacketConnection extends Connection { + readonly connectionType = 'PACKET'; constructor( private readonly packetFactory: PacketFactory, private readonly eventBus: PacketEventBus, - private readonly socket: PacketSocket, + props: PacketConnectionProps, ) { - super(); - this.validateSocket(); - this.addListeners(); + super(props); } - get device(): Device | undefined { - return this.#device; - } - - set device(device: Device | undefined) { - if (device && !(device instanceof Device)) { - throw new ArgumentInvalidException( - `Value '${device as string} for property 'device' of Connection is not an instance of Device`, - ); - } - - this.#device = device; + get socket(): PacketSocket { + return this.props.socket; } send(name: Name, object: PayloadObjectFrom): Promise { @@ -73,8 +59,15 @@ export class DeviceConnection extends Emittery { return this.socket.end(); } + protected override validate(props: PacketConnectionProps): void { + super.validate(props); + + this.validateDefinedProp(props, 'socket'); + this.validateInstanceProp(props, 'socket', PacketSocket); + } + private getPacketProps(): CreatePacketProps { - return { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) }; + return { deviceId: this.device?.id ?? new ID(0), userId: this.device?.userId ?? new ID(0) }; } private writeAndWait(packet: Packet): Promise { @@ -92,25 +85,7 @@ export class DeviceConnection extends Emittery { return this.socket.write(packet); } - private validateSocket() { - if (!(this.socket instanceof PacketSocket)) { - throw new DomainException('Socket for Connection is not an instance of PacketSocket'); - } - - if (!this.socket.connected) { - throw new DomainException('Socket for Connection is closed'); - } - } - - private addListeners() { - this.socket.on('data', (packet) => { - void this.emit('data', packet); - }); - this.socket.on('error', (err) => { - void this.emit('error', err); - }); - this.socket.on('close', () => { - void this.emit('close'); - }); + static isPacketConnection(connection: Connection): connection is PacketConnection { + return connection.connectionType === 'PACKET'; } } diff --git a/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts b/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts index e235364..4f2b615 100644 --- a/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts +++ b/packages/adapter-tcp/src/event-handlers/command-event-handlers/locate-device.event-handler.ts @@ -1,19 +1,27 @@ import { DomainException } from '@agnoc/toolkit'; -import type { PackerServerConnectionHandler } from '../../packet-server.connection-handler'; -import type { CommandHandler, LocateDeviceCommand } from '@agnoc/domain'; +import { PacketConnection } from '../../aggregate-roots/packet-connection.aggregate-root'; +import type { CommandHandler, Connection, ConnectionRepository, LocateDeviceCommand } from '@agnoc/domain'; export class LocateDeviceEventHandler implements CommandHandler { readonly forName = 'LocateDeviceCommand'; - constructor(private readonly connectionManager: PackerServerConnectionHandler) {} + constructor(private readonly connectionRepository: ConnectionRepository) {} async handle(event: LocateDeviceCommand): Promise { - const [connection] = this.connectionManager.findConnectionsByDeviceId(event.deviceId); + const connections = await this.connectionRepository.findByDeviceId(event.deviceId); - if (!connection || !connection.device) { + if (connections.length === 0) { throw new DomainException(`Unable to find a connection for the device with id ${event.deviceId.value}`); } + const connection = connections.find((connection: Connection): connection is PacketConnection => + PacketConnection.isPacketConnection(connection), + ); + + if (!connection) { + return; + } + const response = await connection.sendAndWait('DEVICE_SEEK_LOCATION_REQ', {}); if (response.packet.payload.opcode.value !== 'DEVICE_SEEK_LOCATION_RSP') { diff --git a/packages/adapter-tcp/src/event-handlers/domain-event-handlers/lock-device-when-device-is-connected-event-handler.event-handler.ts b/packages/adapter-tcp/src/event-handlers/domain-event-handlers/lock-device-when-device-is-connected-event-handler.event-handler.ts index aa2eb1e..76c411c 100644 --- a/packages/adapter-tcp/src/event-handlers/domain-event-handlers/lock-device-when-device-is-connected-event-handler.event-handler.ts +++ b/packages/adapter-tcp/src/event-handlers/domain-event-handlers/lock-device-when-device-is-connected-event-handler.event-handler.ts @@ -1,19 +1,27 @@ import { DomainException } from '@agnoc/toolkit'; -import type { PackerServerConnectionHandler } from '../../packet-server.connection-handler'; -import type { DomainEventHandler, DeviceConnectedDomainEvent } from '@agnoc/domain'; +import { PacketConnection } from '../../aggregate-roots/packet-connection.aggregate-root'; +import type { DomainEventHandler, DeviceConnectedDomainEvent, ConnectionRepository, Connection } from '@agnoc/domain'; export class LockDeviceWhenDeviceIsConnectedEventHandler implements DomainEventHandler { readonly forName = 'DeviceConnectedDomainEvent'; - constructor(private readonly connectionManager: PackerServerConnectionHandler) {} + constructor(private readonly connectionRepository: ConnectionRepository) {} async handle(event: DeviceConnectedDomainEvent): Promise { - const [connection] = this.connectionManager.findConnectionsByDeviceId(event.aggregateId); + const connections = await this.connectionRepository.findByDeviceId(event.aggregateId); - if (!connection) { + if (connections.length === 0) { throw new DomainException(`Unable to find a connection for the device with id ${event.aggregateId.value}`); } + const connection = connections.find((connection: Connection): connection is PacketConnection => + PacketConnection.isPacketConnection(connection), + ); + + if (!connection) { + return; + } + await connection.send('DEVICE_CONTROL_LOCK_REQ', {}); } } diff --git a/packages/adapter-tcp/src/event-handlers/domain-event-handlers/query-device-info-when-device-is-locked-event-handler.event-handler.ts b/packages/adapter-tcp/src/event-handlers/domain-event-handlers/query-device-info-when-device-is-locked-event-handler.event-handler.ts index d46a09d..af2b38e 100644 --- a/packages/adapter-tcp/src/event-handlers/domain-event-handlers/query-device-info-when-device-is-locked-event-handler.event-handler.ts +++ b/packages/adapter-tcp/src/event-handlers/domain-event-handlers/query-device-info-when-device-is-locked-event-handler.event-handler.ts @@ -1,20 +1,28 @@ import { DeviceCapability } from '@agnoc/domain'; import { DomainException } from '@agnoc/toolkit'; -import type { PackerServerConnectionHandler } from '../../packet-server.connection-handler'; -import type { DeviceLockedDomainEvent, DomainEventHandler } from '@agnoc/domain'; +import { PacketConnection } from '../../aggregate-roots/packet-connection.aggregate-root'; +import type { DeviceLockedDomainEvent, DomainEventHandler, Connection, ConnectionRepository } from '@agnoc/domain'; export class QueryDeviceInfoWhenDeviceIsLockedEventHandler implements DomainEventHandler { readonly forName = 'DeviceLockedDomainEvent'; - constructor(private readonly connectionManager: PackerServerConnectionHandler) {} + constructor(private readonly connectionRepository: ConnectionRepository) {} async handle(event: DeviceLockedDomainEvent): Promise { - const [connection] = this.connectionManager.findConnectionsByDeviceId(event.aggregateId); + const connections = await this.connectionRepository.findByDeviceId(event.aggregateId); - if (!connection || !connection.device) { + if (connections.length === 0) { throw new DomainException(`Unable to find a connection for the device with id ${event.aggregateId.value}`); } + const connection = connections.find((connection: Connection): connection is PacketConnection => + PacketConnection.isPacketConnection(connection), + ); + + if (!connection) { + return; + } + await connection.send('DEVICE_STATUS_GETTING_REQ', {}); await connection.send('DEVICE_GET_ALL_GLOBAL_MAP_INFO_REQ', { unk1: 0, unk2: '' }); @@ -23,7 +31,7 @@ export class QueryDeviceInfoWhenDeviceIsLockedEventHandler implements DomainEven // TODO: move this to a get map service. await connection.send('DEVICE_MAPID_GET_GLOBAL_INFO_REQ', { - mask: connection.device.system.supports(DeviceCapability.MAP_PLANS) ? 0x78ff : 0xff, + mask: connection.device?.system.supports(DeviceCapability.MAP_PLANS) ? 0x78ff : 0xff, }); // TODO: move this to a get wlan service. diff --git a/packages/adapter-tcp/src/event-handlers/domain-event-handlers/set-device-connected-when-connection-device-changed.domain-event.ts b/packages/adapter-tcp/src/event-handlers/domain-event-handlers/set-device-connected-when-connection-device-changed.domain-event.ts new file mode 100644 index 0000000..ef44589 --- /dev/null +++ b/packages/adapter-tcp/src/event-handlers/domain-event-handlers/set-device-connected-when-connection-device-changed.domain-event.ts @@ -0,0 +1,32 @@ +import type { + DomainEventHandler, + ConnectionRepository, + DeviceRepository, + ConnectionDeviceChangedDomainEvent, +} from '@agnoc/domain'; + +export class SetDeviceAsConnectedWhenConnectionDeviceAddedDomainEventHandler implements DomainEventHandler { + readonly forName = 'ConnectionDeviceChangedDomainEvent'; + + constructor( + private readonly connectionRepository: ConnectionRepository, + private readonly deviceRepository: DeviceRepository, + ) {} + + async handle(event: ConnectionDeviceChangedDomainEvent): Promise { + if (event.currentDeviceId) { + const connections = await this.connectionRepository.findByDeviceId(event.currentDeviceId); + const device = await this.deviceRepository.findOneById(event.currentDeviceId); + + // This is a hack to only mark the device as connected if there is more than one connection. + // Here we should check that the connections are from the same ip address. + if (connections.length > 1 && device && !device.isConnected) { + device.setAsConnected(); + + await this.deviceRepository.saveOne(device); + } + } + + // TODO: handle device disconnection + } +} diff --git a/packages/adapter-tcp/src/factories/connection.factory.ts b/packages/adapter-tcp/src/factories/connection.factory.ts new file mode 100644 index 0000000..431cdfe --- /dev/null +++ b/packages/adapter-tcp/src/factories/connection.factory.ts @@ -0,0 +1,13 @@ +import { PacketConnection } from '../aggregate-roots/packet-connection.aggregate-root'; +import type { PacketConnectionProps } from '../aggregate-roots/packet-connection.aggregate-root'; +import type { PacketEventBus } from '../packet.event-bus'; +import type { Factory } from '@agnoc/toolkit'; +import type { PacketFactory } from '@agnoc/transport-tcp'; + +export class PacketConnectionFactory implements Factory { + constructor(private readonly packetEventBus: PacketEventBus, private readonly packetFactory: PacketFactory) {} + + create(props: PacketConnectionProps): PacketConnection { + return new PacketConnection(this.packetFactory, this.packetEventBus, props); + } +} diff --git a/packages/adapter-tcp/src/packet-server.connection-handler.ts b/packages/adapter-tcp/src/packet-server.connection-handler.ts index 8b7c693..e183361 100644 --- a/packages/adapter-tcp/src/packet-server.connection-handler.ts +++ b/packages/adapter-tcp/src/packet-server.connection-handler.ts @@ -1,26 +1,21 @@ -import { DomainException } from '@agnoc/toolkit'; -import { DeviceConnection } from './device.connection'; +import { DomainException, ID } from '@agnoc/toolkit'; import { PacketMessage } from './packet.message'; +import type { PacketConnection } from './aggregate-roots/packet-connection.aggregate-root'; +import type { PacketConnectionFactory } from './factories/connection.factory'; import type { PacketEventBus, PacketEventBusEvents } from './packet.event-bus'; -import type { DeviceRepository, Device } from '@agnoc/domain'; -import type { ID } from '@agnoc/toolkit'; -import type { PacketServer, PacketFactory, Packet, PayloadObjectName } from '@agnoc/transport-tcp'; +import type { DeviceRepository, Device, Connection, ConnectionRepository } from '@agnoc/domain'; +import type { PacketServer, Packet, PayloadObjectName } from '@agnoc/transport-tcp'; export class PackerServerConnectionHandler { - private readonly servers = new Map>(); + private readonly servers = new Map>(); constructor( private readonly packetEventBus: PacketEventBus, - private readonly packetFactory: PacketFactory, private readonly deviceRepository: DeviceRepository, + private readonly connectionRepository: ConnectionRepository, + private readonly packetConnectionFactory: PacketConnectionFactory, ) {} - findConnectionsByDeviceId(deviceId: ID): DeviceConnection[] { - const connections = [...this.servers.values()].flatMap((connections) => [...connections]); - - return connections.filter((connection) => connection.device?.id.equals(deviceId)); - } - addServers(...servers: PacketServer[]): void { servers.forEach((server) => { this.servers.set(server, new Set()); @@ -30,11 +25,11 @@ export class PackerServerConnectionHandler { private addListeners(server: PacketServer) { server.on('connection', (socket) => { - const connection = new DeviceConnection(this.packetFactory, this.packetEventBus, socket); + const connection = this.packetConnectionFactory.create({ id: ID.generate(), socket }); this.servers.get(server)?.add(connection); - connection.on('data', async (packet: Packet) => { + connection.socket.on('data', async (packet: Packet) => { const packetMessage = new PacketMessage(connection, packet); // Update the device on the connection if the device id has changed. @@ -42,13 +37,9 @@ export class PackerServerConnectionHandler { // Send the packet message to the packet event bus. await this.emitPacketEvent(packetMessage); - - // This is a hack to only mark the device as connected if there is more than one connection. - // Here we should check that the connections are from the same ip address. - await this.tryToSetDeviceAsConnected(connection); }); - connection.on('close', () => { + connection.socket.on('close', () => { this.servers.get(server)?.delete(connection); }); }); @@ -64,18 +55,6 @@ export class PackerServerConnectionHandler { }); } - private async tryToSetDeviceAsConnected(connection: DeviceConnection) { - if (connection.device && !connection.device.isConnected) { - const connections = this.findConnectionsByDeviceId(connection.device.id); - - if (connections.length > 1) { - connection.device.setAsConnected(); - - await this.deviceRepository.saveOne(connection.device); - } - } - } - private async emitPacketEvent(message: PacketMessage) { const name = message.packet.payload.opcode.name as PayloadObjectName; const sequence = message.packet.sequence.toString(); @@ -99,9 +78,13 @@ export class PackerServerConnectionHandler { } } - private async updateConnectionDevice(packet: Packet, connection: DeviceConnection) { + private async updateConnectionDevice(packet: Packet, connection: Connection) { if (!packet.deviceId.equals(connection.device?.id)) { - connection.device = await this.findDeviceById(packet.deviceId); + const device = await this.findDeviceById(packet.deviceId); + + connection.setDevice(device); + + await this.connectionRepository.saveOne(connection); } } diff --git a/packages/adapter-tcp/src/packet.message.ts b/packages/adapter-tcp/src/packet.message.ts index a5cffcc..b3e7d9f 100644 --- a/packages/adapter-tcp/src/packet.message.ts +++ b/packages/adapter-tcp/src/packet.message.ts @@ -1,9 +1,9 @@ -import type { DeviceConnection } from './device.connection'; +import type { PacketConnection } from './aggregate-roots/packet-connection.aggregate-root'; import type { Device } from '@agnoc/domain'; import type { Packet, PayloadObjectFrom, PayloadObjectName } from '@agnoc/transport-tcp'; export class PacketMessage { - constructor(readonly connection: DeviceConnection, readonly packet: Packet) {} + constructor(readonly connection: PacketConnection, readonly packet: Packet) {} get device(): Device | undefined { return this.connection.device; diff --git a/packages/adapter-tcp/src/tcp.server.test.ts b/packages/adapter-tcp/src/tcp.server.test.ts index 27104de..49b782f 100644 --- a/packages/adapter-tcp/src/tcp.server.test.ts +++ b/packages/adapter-tcp/src/tcp.server.test.ts @@ -1,20 +1,23 @@ import { imock, instance } from '@johanblumenberg/ts-mockito'; import { TCPServer } from './tcp.server'; -import type { Commands, DeviceRepository } from '@agnoc/domain'; +import type { Commands, ConnectionRepository, DeviceRepository } from '@agnoc/domain'; import type { EventHandlerRegistry, TaskHandlerRegistry } from '@agnoc/toolkit'; describe('TCPServer', function () { let domainEventHandlerRegistry: EventHandlerRegistry; let commandHandlerRegistry: TaskHandlerRegistry; let deviceRepository: DeviceRepository; + let connectionRepository: ConnectionRepository; let tcpAdapter: TCPServer; beforeEach(function () { domainEventHandlerRegistry = imock(); commandHandlerRegistry = imock(); deviceRepository = imock(); + connectionRepository = imock(); tcpAdapter = new TCPServer( instance(deviceRepository), + instance(connectionRepository), instance(domainEventHandlerRegistry), instance(commandHandlerRegistry), ); diff --git a/packages/adapter-tcp/src/tcp.server.ts b/packages/adapter-tcp/src/tcp.server.ts index cb02d79..0fe43ff 100644 --- a/packages/adapter-tcp/src/tcp.server.ts +++ b/packages/adapter-tcp/src/tcp.server.ts @@ -11,6 +11,7 @@ import { import { LocateDeviceEventHandler } from './event-handlers/command-event-handlers/locate-device.event-handler'; import { LockDeviceWhenDeviceIsConnectedEventHandler } from './event-handlers/domain-event-handlers/lock-device-when-device-is-connected-event-handler.event-handler'; import { QueryDeviceInfoWhenDeviceIsLockedEventHandler } from './event-handlers/domain-event-handlers/query-device-info-when-device-is-locked-event-handler.event-handler'; +import { SetDeviceAsConnectedWhenConnectionDeviceAddedDomainEventHandler } from './event-handlers/domain-event-handlers/set-device-connected-when-connection-device-changed.domain-event'; import { ClientHeartbeatEventHandler } from './event-handlers/packet-event-handlers/client-heartbeat.event-handler'; import { ClientLoginEventHandler } from './event-handlers/packet-event-handlers/client-login.event-handler'; import { DeviceBatteryUpdateEventHandler } from './event-handlers/packet-event-handlers/device-battery-update.event-handler'; @@ -31,6 +32,7 @@ import { DeviceTimeUpdateEventHandler } from './event-handlers/packet-event-hand import { DeviceUpgradeInfoEventHandler } from './event-handlers/packet-event-handlers/device-upgrade-info.event-handler'; import { DeviceVersionUpdateEventHandler } from './event-handlers/packet-event-handlers/device-version-update.event-handler'; import { DeviceWlanUpdateEventHandler } from './event-handlers/packet-event-handlers/device-wlan-update.event-handler'; +import { PacketConnectionFactory } from './factories/connection.factory'; import { DeviceBatteryMapper } from './mappers/device-battery.mapper'; import { DeviceErrorMapper } from './mappers/device-error.mapper'; import { DeviceFanSpeedMapper } from './mappers/device-fan-speed.mapper'; @@ -41,7 +43,7 @@ import { DeviceWaterLevelMapper } from './mappers/device-water-level.mapper'; import { NTPServerConnectionHandler } from './ntp-server.connection-handler'; import { PackerServerConnectionHandler } from './packet-server.connection-handler'; import { PacketEventBus } from './packet.event-bus'; -import type { Commands, DeviceRepository } from '@agnoc/domain'; +import type { Commands, ConnectionRepository, DeviceRepository } from '@agnoc/domain'; import type { Server, TaskHandlerRegistry } from '@agnoc/toolkit'; import type { AddressInfo } from 'net'; @@ -52,6 +54,7 @@ export class TCPServer implements Server { constructor( private readonly deviceRepository: DeviceRepository, + private readonly connectionRepository: ConnectionRepository, private readonly domainEventHandlerRegistry: EventHandlerRegistry, private readonly commandHandlerRegistry: TaskHandlerRegistry, ) { @@ -78,8 +81,14 @@ export class TCPServer implements Server { const packetEventBus = new PacketEventBus(); const packetEventHandlerRegistry = new EventHandlerRegistry(packetEventBus); - // Connection managers - const connectionManager = new PackerServerConnectionHandler(packetEventBus, packetFactory, this.deviceRepository); + // Connection + const packetConnectionFactory = new PacketConnectionFactory(packetEventBus, packetFactory); + const connectionManager = new PackerServerConnectionHandler( + packetEventBus, + this.deviceRepository, + this.connectionRepository, + packetConnectionFactory, + ); connectionManager.addServers(this.cmdServer, this.mapServer); @@ -127,12 +136,13 @@ export class TCPServer implements Server { // Domain event handlers this.domainEventHandlerRegistry.register( - new LockDeviceWhenDeviceIsConnectedEventHandler(connectionManager), - new QueryDeviceInfoWhenDeviceIsLockedEventHandler(connectionManager), + new LockDeviceWhenDeviceIsConnectedEventHandler(connectionRepository), + new QueryDeviceInfoWhenDeviceIsLockedEventHandler(connectionRepository), + new SetDeviceAsConnectedWhenConnectionDeviceAddedDomainEventHandler(connectionRepository, deviceRepository), ); // Command event handlers - this.commandHandlerRegistry.register(new LocateDeviceEventHandler(connectionManager)); + this.commandHandlerRegistry.register(new LocateDeviceEventHandler(connectionRepository)); } async listen(options: TCPAdapterListenOptions = listenDefaultOptions): Promise { diff --git a/packages/adapter-tcp/test/integration/tcp.server.test.ts b/packages/adapter-tcp/test/integration/tcp.server.test.ts index 280a335..fc9bc7c 100644 --- a/packages/adapter-tcp/test/integration/tcp.server.test.ts +++ b/packages/adapter-tcp/test/integration/tcp.server.test.ts @@ -1,5 +1,6 @@ +/* eslint-disable import/no-extraneous-dependencies */ import { once } from 'events'; -import { CommandBus, Device, DeviceRepository, DomainEventBus } from '@agnoc/domain'; +import { CommandBus, ConnectionRepository, Device, DeviceRepository, DomainEventBus } from '@agnoc/domain'; import { givenSomeDeviceProps } from '@agnoc/domain/test-support'; import { EventHandlerRegistry, ID, MemoryAdapter, TaskHandlerRegistry } from '@agnoc/toolkit'; import { @@ -18,12 +19,13 @@ import type { Commands } from '@agnoc/domain'; import type { ICLIENT_ONLINE_REQ, IDEVICE_REGISTER_REQ } from '@agnoc/schemas-tcp'; import type { CreatePacketProps, Packet } from '@agnoc/transport-tcp'; -describe('TCPAdapter', function () { +describe('Integration', function () { let domainEventBus: DomainEventBus; let commandBus: CommandBus; let domainEventHandlerRegistry: EventHandlerRegistry; let commandHandlerRegistry: TaskHandlerRegistry; let deviceRepository: DeviceRepository; + let connectionRepository: ConnectionRepository; let tcpAdapter: TCPServer; let packetSocket: PacketSocket; let secondPacketSocket: PacketSocket; @@ -37,7 +39,13 @@ describe('TCPAdapter', function () { domainEventHandlerRegistry = new EventHandlerRegistry(domainEventBus); commandHandlerRegistry = new TaskHandlerRegistry(commandBus); deviceRepository = new DeviceRepository(domainEventBus, new MemoryAdapter()); - tcpAdapter = new TCPServer(deviceRepository, domainEventHandlerRegistry, commandHandlerRegistry); + connectionRepository = new ConnectionRepository(domainEventBus, new MemoryAdapter()); + tcpAdapter = new TCPServer( + deviceRepository, + connectionRepository, + domainEventHandlerRegistry, + commandHandlerRegistry, + ); // Client blocks const payloadMapper = new PayloadMapper(new PayloadObjectParserService(getProtobufRoot(), getCustomDecoders())); @@ -136,6 +144,7 @@ describe('TCPAdapter', function () { it('should handle a device connection', async function () { const device = new Device(givenSomeDeviceProps()); let receivedPacket: Packet; + let secondReceivedPacket: Packet; await deviceRepository.saveOne(device); @@ -159,21 +168,18 @@ describe('TCPAdapter', function () { // The device already has two identified connections and // the device should be marked as connected. - await domainEventBus.once('DeviceConnectedDomainEvent'); + // eslint-disable-next-line prefer-const + [, [receivedPacket], [secondReceivedPacket]] = await Promise.all([ + domainEventBus.once('DeviceConnectedDomainEvent'), + once(packetSocket, 'data') as Promise[]>, + once(secondPacketSocket, 'data') as Promise[]>, + ]); expect(device.isConnected).to.be.true; - - [receivedPacket] = (await once(secondPacketSocket, 'data')) as Packet<'CLIENT_HEARTBEAT_RSP'>[]; - - expect(receivedPacket.payload.opcode.value).to.be.equal('CLIENT_HEARTBEAT_RSP'); - - expect(device.isConnected).to.be.true; - - [receivedPacket] = (await once(packetSocket, 'data')) as Packet<'DEVICE_CONTROL_LOCK_REQ'>[]; - expect(receivedPacket.payload.opcode.value).to.be.equal('DEVICE_CONTROL_LOCK_REQ'); + expect(secondReceivedPacket.payload.opcode.value).to.be.equal('CLIENT_HEARTBEAT_RSP'); - void secondPacketSocket.write( + void packetSocket.write( packetFactory.create('DEVICE_CONTROL_LOCK_RSP', { result: 0 }, givenSomeCreatePacketProps(device)), ); diff --git a/packages/core/src/agnoc.server.test.ts b/packages/core/src/agnoc.server.test.ts index 318b159..f874f66 100644 --- a/packages/core/src/agnoc.server.test.ts +++ b/packages/core/src/agnoc.server.test.ts @@ -1,4 +1,4 @@ -import { DeviceRepository, LocateDeviceCommand } from '@agnoc/domain'; +import { ConnectionRepository, DeviceRepository, LocateDeviceCommand } from '@agnoc/domain'; import { EventHandlerRegistry, ID, TaskHandlerRegistry } from '@agnoc/toolkit'; import { capture, fnmock, imock, instance, verify, when } from '@johanblumenberg/ts-mockito'; import { expect } from 'chai'; @@ -19,6 +19,7 @@ describe('AgnocServer', function () { it('should provide a container to build an adapter', function () { agnocServer.buildAdapter((container) => { expect(container.deviceRepository).to.be.instanceOf(DeviceRepository); + expect(container.connectionRepository).to.be.instanceOf(ConnectionRepository); expect(container.domainEventHandlerRegistry).to.be.instanceOf(EventHandlerRegistry); expect(container.commandHandlerRegistry).to.be.instanceOf(TaskHandlerRegistry); diff --git a/packages/core/src/agnoc.server.ts b/packages/core/src/agnoc.server.ts index d084f2c..359d3f8 100644 --- a/packages/core/src/agnoc.server.ts +++ b/packages/core/src/agnoc.server.ts @@ -1,4 +1,4 @@ -import { CommandBus, DeviceRepository, DomainEventBus } from '@agnoc/domain'; +import { CommandBus, ConnectionRepository, DeviceRepository, DomainEventBus } from '@agnoc/domain'; import { EventHandlerRegistry, MemoryAdapter, TaskHandlerRegistry } from '@agnoc/toolkit'; import type { DomainEventNames, DomainEvents, Commands } from '@agnoc/domain'; import type { Server, TaskOutput } from '@agnoc/toolkit'; @@ -9,6 +9,7 @@ export class AgnocServer implements Server { private readonly commandBus: CommandBus; private readonly commandHandlerRegistry: TaskHandlerRegistry; private readonly deviceRepository: DeviceRepository; + private readonly connectionRepository: ConnectionRepository; private readonly adapters = new Set(); constructor() { @@ -17,6 +18,7 @@ export class AgnocServer implements Server { this.commandBus = new CommandBus(); this.commandHandlerRegistry = new TaskHandlerRegistry(this.commandBus); this.deviceRepository = new DeviceRepository(this.domainEventBus, new MemoryAdapter()); + this.connectionRepository = new ConnectionRepository(this.domainEventBus, new MemoryAdapter()); } subscribe(eventName: Name, handler: SubscribeHandler): void { @@ -32,6 +34,7 @@ export class AgnocServer implements Server { domainEventHandlerRegistry: this.domainEventHandlerRegistry, commandHandlerRegistry: this.commandHandlerRegistry, deviceRepository: this.deviceRepository, + connectionRepository: this.connectionRepository, }); this.adapters.add(adapter); @@ -52,6 +55,7 @@ export type Container = { domainEventHandlerRegistry: EventHandlerRegistry; commandHandlerRegistry: TaskHandlerRegistry; deviceRepository: DeviceRepository; + connectionRepository: ConnectionRepository; }; export type SubscribeHandler = (event: DomainEvents[Name]) => Promise; diff --git a/packages/domain/src/aggregate-roots/connection.aggregate-root.test.ts b/packages/domain/src/aggregate-roots/connection.aggregate-root.test.ts new file mode 100644 index 0000000..b3bc053 --- /dev/null +++ b/packages/domain/src/aggregate-roots/connection.aggregate-root.test.ts @@ -0,0 +1,116 @@ +import { AggregateRoot, ArgumentInvalidException } from '@agnoc/toolkit'; +import { expect } from 'chai'; +import { ConnectionDeviceChangedDomainEvent } from '../domain-events/connection-device-changed.domain-event'; +import { givenSomeDeviceProps, givenSomeConnectionProps } from '../test-support'; +import { Connection } from './connection.aggregate-root'; +import { Device } from './device.aggregate-root'; +import type { ConnectionProps } from './connection.aggregate-root'; + +describe('Connection', function () { + it('should be created', function () { + const props = givenSomeDeviceProps(); + const connection = new DummyConnection(props); + + expect(connection).to.be.instanceOf(AggregateRoot); + expect(connection.id).to.be.equal(props.id); + }); + + it('should be created with device', function () { + const device = new Device(givenSomeDeviceProps()); + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device }); + + expect(connection.device).to.be.equal(device); + }); + + it("should throw an error when 'deviceId' is not a Device", function () { + // @ts-expect-error - invalid property + expect(() => new DummyConnection({ ...givenSomeConnectionProps(), device: 'foo' })).to.throw( + ArgumentInvalidException, + `Value 'foo' for property 'device' of DummyConnection is not an instance of Device`, + ); + }); + + describe('#setDevice', function () { + it('should set a device', function () { + const device = new Device(givenSomeDeviceProps()); + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device: undefined }); + + connection.setDevice(device); + + expect(connection.device).to.be.equal(device); + expect(connection.domainEvents).to.deep.contain( + new ConnectionDeviceChangedDomainEvent({ + aggregateId: connection.id, + previousDeviceId: undefined, + currentDeviceId: device.id, + }), + ); + }); + + it('should override a device', function () { + const deviceA = new Device(givenSomeDeviceProps()); + const deviceB = new Device(givenSomeDeviceProps()); + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device: deviceA }); + + connection.setDevice(deviceB); + + expect(connection.device).to.be.equal(deviceB); + expect(connection.domainEvents).to.deep.contain( + new ConnectionDeviceChangedDomainEvent({ + aggregateId: connection.id, + previousDeviceId: deviceA.id, + currentDeviceId: deviceB.id, + }), + ); + }); + + it('should unset a device', function () { + const device = new Device(givenSomeDeviceProps()); + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device }); + + connection.setDevice(undefined); + + expect(connection.device).to.be.equal(undefined); + expect(connection.domainEvents).to.deep.contain( + new ConnectionDeviceChangedDomainEvent({ + aggregateId: connection.id, + previousDeviceId: device.id, + currentDeviceId: undefined, + }), + ); + }); + + it('should do nothing when setting the same device', function () { + const device = new Device(givenSomeDeviceProps()); + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device }); + + connection.setDevice(device); + + expect(connection.device).to.be.equal(device); + expect(connection.domainEvents).to.be.empty; + }); + + it('should do nothing when setting nothing over nothing', function () { + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device: undefined }); + + connection.setDevice(undefined); + + expect(connection.device).to.be.equal(undefined); + expect(connection.domainEvents).to.be.empty; + }); + + it('should throw an error when setting not a Device', function () { + const connection = new DummyConnection({ ...givenSomeConnectionProps(), device: undefined }); + + // @ts-expect-error - invalid property + expect(() => connection.setDevice('foo')).to.throw( + ArgumentInvalidException, + `Value 'foo' for property 'device' of DummyConnection is not an instance of Device`, + ); + }); + }); +}); + +class DummyConnection extends Connection { + connectionType = 'Dummy'; +} diff --git a/packages/domain/src/aggregate-roots/connection.aggregate-root.ts b/packages/domain/src/aggregate-roots/connection.aggregate-root.ts new file mode 100644 index 0000000..ffc6c2e --- /dev/null +++ b/packages/domain/src/aggregate-roots/connection.aggregate-root.ts @@ -0,0 +1,42 @@ +import { AggregateRoot } from '@agnoc/toolkit'; +import { ConnectionDeviceChangedDomainEvent } from '../domain-events/connection-device-changed.domain-event'; +import { Device } from './device.aggregate-root'; +import type { EntityProps } from '@agnoc/toolkit'; + +export interface ConnectionProps extends EntityProps { + device?: Device; +} + +export abstract class Connection extends AggregateRoot { + abstract readonly connectionType: string; + + constructor(props: Props) { + super(props); + } + + get device(): Device | undefined { + return this.props.device; + } + + setDevice(device: Device | undefined): void { + if (this.device === device || this.device?.equals(device)) { + return; + } + + if (device) { + this.validateInstanceProp({ device }, 'device', Device); + } + + const previousDeviceId = this.device?.id; + const currentDeviceId = device?.id; + + this.props.device = device; + this.addEvent(new ConnectionDeviceChangedDomainEvent({ aggregateId: this.id, previousDeviceId, currentDeviceId })); + } + + protected validate(props: Props): void { + if (props.device) { + this.validateInstanceProp(props, 'device', Device); + } + } +} diff --git a/packages/domain/src/domain-events/connection-device-changed.domain-event.test.ts b/packages/domain/src/domain-events/connection-device-changed.domain-event.test.ts new file mode 100644 index 0000000..b3ecba1 --- /dev/null +++ b/packages/domain/src/domain-events/connection-device-changed.domain-event.test.ts @@ -0,0 +1,31 @@ +import { ID, DomainEvent } from '@agnoc/toolkit'; +import { expect } from 'chai'; +import { givenSomeConnectionDeviceChangedDomainEventProps } from '../test-support'; +import { ConnectionDeviceChangedDomainEvent } from './connection-device-changed.domain-event'; + +describe('ConnectionDeviceChangedDomainEvent', function () { + it('should be created', function () { + const props = givenSomeConnectionDeviceChangedDomainEventProps(); + const event = new ConnectionDeviceChangedDomainEvent(props); + + expect(event).to.be.instanceOf(DomainEvent); + expect(event.aggregateId).to.be.equal(props.aggregateId); + expect(event.previousDeviceId).to.be.undefined; + expect(event.currentDeviceId).to.be.undefined; + }); + + it('should be created with previousDeviceId', function () { + const props = { ...givenSomeConnectionDeviceChangedDomainEventProps(), previousDeviceId: ID.generate() }; + const event = new ConnectionDeviceChangedDomainEvent(props); + + expect(event.previousDeviceId).to.be.equal(event.previousDeviceId); + expect(event.currentDeviceId).to.be.undefined; + }); + + it('should be created with currentDeviceId', function () { + const event = new ConnectionDeviceChangedDomainEvent({ aggregateId: ID.generate() }); + + expect(event.previousDeviceId).to.be.undefined; + expect(event.previousDeviceId).to.be.equal(event.previousDeviceId); + }); +}); diff --git a/packages/domain/src/domain-events/connection-device-changed.domain-event.ts b/packages/domain/src/domain-events/connection-device-changed.domain-event.ts new file mode 100644 index 0000000..7dbf7ec --- /dev/null +++ b/packages/domain/src/domain-events/connection-device-changed.domain-event.ts @@ -0,0 +1,21 @@ +import { DomainEvent } from '@agnoc/toolkit'; +import type { DomainEventProps, ID } from '@agnoc/toolkit'; + +export interface ConnectionDeviceChangedDomainEventProps extends DomainEventProps { + previousDeviceId?: ID; + currentDeviceId?: ID; +} + +export class ConnectionDeviceChangedDomainEvent extends DomainEvent { + get previousDeviceId(): ID | undefined { + return this.props.previousDeviceId; + } + + get currentDeviceId(): ID | undefined { + return this.props.currentDeviceId; + } + + protected validate(): void { + // noop + } +} diff --git a/packages/domain/src/domain-events/domain-events.ts b/packages/domain/src/domain-events/domain-events.ts index 4118839..99dde31 100644 --- a/packages/domain/src/domain-events/domain-events.ts +++ b/packages/domain/src/domain-events/domain-events.ts @@ -1,9 +1,11 @@ +import type { ConnectionDeviceChangedDomainEvent } from './connection-device-changed.domain-event'; import type { DeviceConnectedDomainEvent } from './device-connected.domain-event'; import type { DeviceLockedDomainEvent } from './device-locked.domain-event'; export type DomainEvents = { DeviceConnectedDomainEvent: DeviceConnectedDomainEvent; DeviceLockedDomainEvent: DeviceLockedDomainEvent; + ConnectionDeviceChangedDomainEvent: ConnectionDeviceChangedDomainEvent; }; export type DomainEventNames = keyof DomainEvents; diff --git a/packages/domain/src/index.ts b/packages/domain/src/index.ts index 4262455..5b5d162 100644 --- a/packages/domain/src/index.ts +++ b/packages/domain/src/index.ts @@ -1,5 +1,8 @@ +export * from './aggregate-roots/connection.aggregate-root'; +export * from './aggregate-roots/device.aggregate-root'; export * from './commands/commands'; export * from './commands/locate-device.command'; +export * from './domain-events/connection-device-changed.domain-event'; export * from './domain-events/device-connected.domain-event'; export * from './domain-events/device-locked.domain-event'; export * from './domain-events/domain-events'; @@ -14,13 +17,13 @@ export * from './domain-primitives/device-water-level.domain-primitive'; export * from './domain-primitives/week-day.domain-primitive'; export * from './entities/device-map.entity'; export * from './entities/device-order.entity'; -export * from './aggregate-roots/device.aggregate-root'; export * from './entities/room.entity'; export * from './entities/zone.entity'; export * from './event-buses/command.event-bus'; export * from './event-buses/domain.event-bus'; export * from './event-handlers/command.event-handler'; export * from './event-handlers/domain.event-handler'; +export * from './repositories/connection.repository'; export * from './repositories/device.repository'; export * from './value-objects/device-clean-work.value-object'; export * from './value-objects/device-consumable.value-object'; diff --git a/packages/domain/src/repositories/connection.repository.test.ts b/packages/domain/src/repositories/connection.repository.test.ts new file mode 100644 index 0000000..25d85be --- /dev/null +++ b/packages/domain/src/repositories/connection.repository.test.ts @@ -0,0 +1,52 @@ +import { ID, Repository } from '@agnoc/toolkit'; +import { imock, instance, when } from '@johanblumenberg/ts-mockito'; +import { expect } from 'chai'; +import { Connection } from '../aggregate-roots/connection.aggregate-root'; +import { Device } from '../aggregate-roots/device.aggregate-root'; +import { givenSomeConnectionProps, givenSomeDeviceProps } from '../test-support'; +import { ConnectionRepository } from './connection.repository'; +import type { ConnectionProps } from '../aggregate-roots/connection.aggregate-root'; +import type { Adapter, EventBus } from '@agnoc/toolkit'; + +describe('ConnectionRepository', function () { + let eventBus: EventBus; + let adapter: Adapter; + let repository: ConnectionRepository; + + beforeEach(function () { + eventBus = imock(); + adapter = imock(); + repository = new ConnectionRepository(instance(eventBus), instance(adapter)); + }); + + it('should be a repository', function () { + expect(repository).to.be.an.instanceOf(Repository); + }); + + describe('#findByDeviceId', function () { + it('should find a connection by device id', async function () { + const connections = [ + givenAConnectionWithDeviceId(new ID(1)), + givenAConnectionWithDeviceId(new ID(2)), + givenAConnectionWithDeviceId(new ID(3)), + ]; + + when(adapter.getAll()).thenReturn(connections); + + const ret = await repository.findByDeviceId(new ID(1)); + + expect(ret).to.contain(connections[0]); + }); + }); +}); + +class DummyConnection extends Connection { + connectionType = 'Dummy'; +} + +function givenAConnectionWithDeviceId(deviceId: ID) { + return new DummyConnection({ + ...givenSomeConnectionProps(), + device: new Device({ ...givenSomeDeviceProps(), id: deviceId }), + }); +} diff --git a/packages/domain/src/repositories/connection.repository.ts b/packages/domain/src/repositories/connection.repository.ts new file mode 100644 index 0000000..0205cbc --- /dev/null +++ b/packages/domain/src/repositories/connection.repository.ts @@ -0,0 +1,15 @@ +import { Repository } from '@agnoc/toolkit'; +import type { Connection } from '../aggregate-roots/connection.aggregate-root'; +import type { ID } from '@agnoc/toolkit'; + +export interface ConnectionRepositoryPorts { + findByDeviceId(deviceId: ID): Promise; +} + +export class ConnectionRepository extends Repository implements ConnectionRepositoryPorts { + async findByDeviceId(deviceId: ID): Promise { + const connections = this.adapter.getAll() as Connection[]; + + return connections.filter((connection) => connection.device?.id.equals(deviceId)); + } +} diff --git a/packages/domain/src/test-support.ts b/packages/domain/src/test-support.ts index 5f4f563..47f0ba5 100644 --- a/packages/domain/src/test-support.ts +++ b/packages/domain/src/test-support.ts @@ -15,7 +15,9 @@ import { MapCoordinate } from './value-objects/map-coordinate.value-object'; import { MapPixel } from './value-objects/map-pixel.value-object'; import { QuietHoursSetting } from './value-objects/quiet-hours-setting.value-object'; import { VoiceSetting } from './value-objects/voice-setting.value-object'; +import type { ConnectionProps } from './aggregate-roots/connection.aggregate-root'; import type { DeviceProps } from './aggregate-roots/device.aggregate-root'; +import type { ConnectionDeviceChangedDomainEventProps } from './domain-events/connection-device-changed.domain-event'; import type { DeviceMapProps } from './entities/device-map.entity'; import type { DeviceOrderProps } from './entities/device-order.entity'; import type { RoomProps } from './entities/room.entity'; @@ -189,3 +191,15 @@ export function givenSomeDeviceProps(): DeviceProps { version: new DeviceVersion(givenSomeDeviceVersionProps()), }; } + +export function givenSomeConnectionProps(): ConnectionProps { + return { + id: ID.generate(), + }; +} + +export function givenSomeConnectionDeviceChangedDomainEventProps(): ConnectionDeviceChangedDomainEventProps { + return { + aggregateId: ID.generate(), + }; +} diff --git a/packages/eslint-config/typescript.js b/packages/eslint-config/typescript.js index db402f2..057174f 100644 --- a/packages/eslint-config/typescript.js +++ b/packages/eslint-config/typescript.js @@ -63,6 +63,7 @@ module.exports = { devDependencies: ['**/*.test.ts', '**/test/**/*.ts'], optionalDependencies: false, peerDependencies: true, + includeInternal: true, }, ], 'import/no-unresolved': 'off', diff --git a/packages/toolkit/src/base-classes/repository.base.ts b/packages/toolkit/src/base-classes/repository.base.ts index 9e42db8..d13b534 100644 --- a/packages/toolkit/src/base-classes/repository.base.ts +++ b/packages/toolkit/src/base-classes/repository.base.ts @@ -6,7 +6,7 @@ import type { EventBus } from './event-bus.base'; import type { ID } from '../domain-primitives/id.domain-primitive'; export abstract class Repository> { - constructor(private readonly eventBus: EventBus, private readonly adapter: Adapter) {} + constructor(private readonly eventBus: EventBus, protected readonly adapter: Adapter) {} async findOneById(id: ID): Promise { return this.adapter.get(id) as T | undefined;