Skip to content

Commit

Permalink
feat: add connection repository
Browse files Browse the repository at this point in the history
  • Loading branch information
adrigzr committed Mar 21, 2023
1 parent 09db582 commit 39e10eb
Show file tree
Hide file tree
Showing 24 changed files with 473 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Device } from '@agnoc/domain';
import { ArgumentInvalidException, DomainException, ID } from '@agnoc/toolkit';
import { Connection } from '@agnoc/domain';
import { ID } from '@agnoc/toolkit';
import { PacketSocket } from '@agnoc/transport-tcp';
import Emittery from 'emittery';
import type { PacketEventBus } from './packet.event-bus';
import type { PacketMessage } from './packet.message';
import type { PacketEventBus } from '../packet.event-bus';
import type { PacketMessage } from '../packet.message';
import type { ConnectionProps } from '@agnoc/domain';
import type {
Packet,
PacketFactory,
Expand All @@ -12,37 +12,23 @@ import type {
CreatePacketProps,
} from '@agnoc/transport-tcp';

export interface DeviceConnectionEvents {
data: Packet;
close: undefined;
error: Error;
export interface PacketConnectionProps extends ConnectionProps {
socket: PacketSocket;
}

export class DeviceConnection extends Emittery<DeviceConnectionEvents> {
#device?: Device;
export class PacketConnection extends Connection<PacketConnectionProps> {
readonly connectionType = 'PACKET';

constructor(
private readonly packetFactory: PacketFactory,
private readonly eventBus: PacketEventBus,
private readonly socket: PacketSocket,
props: PacketConnectionProps,
) {
super();
this.validateSocket();
this.addListeners();
super(props);
}

get device(): Device | undefined {
return this.#device;
}

set device(device: Device | undefined) {
if (device && !(device instanceof Device)) {
throw new ArgumentInvalidException(
`Value '${device as string} for property 'device' of Connection is not an instance of Device`,
);
}

this.#device = device;
get socket(): PacketSocket {
return this.props.socket;
}

send<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>): Promise<void> {
Expand Down Expand Up @@ -73,8 +59,15 @@ export class DeviceConnection extends Emittery<DeviceConnectionEvents> {
return this.socket.end();
}

protected override validate(props: PacketConnectionProps): void {
super.validate(props);

this.validateDefinedProp(props, 'socket');
this.validateInstanceProp(props, 'socket', PacketSocket);
}

private getPacketProps(): CreatePacketProps {
return { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) };
return { deviceId: this.device?.id ?? new ID(0), userId: this.device?.userId ?? new ID(0) };
}

private writeAndWait(packet: Packet): Promise<PacketMessage> {
Expand All @@ -92,25 +85,7 @@ export class DeviceConnection extends Emittery<DeviceConnectionEvents> {
return this.socket.write(packet);
}

private validateSocket() {
if (!(this.socket instanceof PacketSocket)) {
throw new DomainException('Socket for Connection is not an instance of PacketSocket');
}

if (!this.socket.connected) {
throw new DomainException('Socket for Connection is closed');
}
}

private addListeners() {
this.socket.on('data', (packet) => {
void this.emit('data', packet);
});
this.socket.on('error', (err) => {
void this.emit('error', err);
});
this.socket.on('close', () => {
void this.emit('close');
});
static isPacketConnection(connection: Connection): connection is PacketConnection {
return connection.connectionType === 'PACKET';
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import { DomainException } from '@agnoc/toolkit';
import type { PackerServerConnectionHandler } from '../../packet-server.connection-handler';
import type { CommandHandler, LocateDeviceCommand } from '@agnoc/domain';
import { PacketConnection } from '../../aggregate-roots/packet-connection.aggregate-root';
import type { CommandHandler, Connection, ConnectionRepository, LocateDeviceCommand } from '@agnoc/domain';

export class LocateDeviceEventHandler implements CommandHandler {
readonly forName = 'LocateDeviceCommand';

constructor(private readonly connectionManager: PackerServerConnectionHandler) {}
constructor(private readonly connectionRepository: ConnectionRepository) {}

async handle(event: LocateDeviceCommand): Promise<void> {
const [connection] = this.connectionManager.findConnectionsByDeviceId(event.deviceId);
const connections = await this.connectionRepository.findByDeviceId(event.deviceId);

if (!connection || !connection.device) {
if (connections.length === 0) {
throw new DomainException(`Unable to find a connection for the device with id ${event.deviceId.value}`);
}

const connection = connections.find((connection: Connection): connection is PacketConnection =>
PacketConnection.isPacketConnection(connection),
);

if (!connection) {
return;
}

const response = await connection.sendAndWait('DEVICE_SEEK_LOCATION_REQ', {});

if (response.packet.payload.opcode.value !== 'DEVICE_SEEK_LOCATION_RSP') {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import { DomainException } from '@agnoc/toolkit';
import type { PackerServerConnectionHandler } from '../../packet-server.connection-handler';
import type { DomainEventHandler, DeviceConnectedDomainEvent } from '@agnoc/domain';
import { PacketConnection } from '../../aggregate-roots/packet-connection.aggregate-root';
import type { DomainEventHandler, DeviceConnectedDomainEvent, ConnectionRepository, Connection } from '@agnoc/domain';

export class LockDeviceWhenDeviceIsConnectedEventHandler implements DomainEventHandler {
readonly forName = 'DeviceConnectedDomainEvent';

constructor(private readonly connectionManager: PackerServerConnectionHandler) {}
constructor(private readonly connectionRepository: ConnectionRepository) {}

async handle(event: DeviceConnectedDomainEvent): Promise<void> {
const [connection] = this.connectionManager.findConnectionsByDeviceId(event.aggregateId);
const connections = await this.connectionRepository.findByDeviceId(event.aggregateId);

if (!connection) {
if (connections.length === 0) {
throw new DomainException(`Unable to find a connection for the device with id ${event.aggregateId.value}`);
}

const connection = connections.find((connection: Connection): connection is PacketConnection =>
PacketConnection.isPacketConnection(connection),
);

if (!connection) {
return;
}

await connection.send('DEVICE_CONTROL_LOCK_REQ', {});
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
import { DeviceCapability } from '@agnoc/domain';
import { DomainException } from '@agnoc/toolkit';
import type { PackerServerConnectionHandler } from '../../packet-server.connection-handler';
import type { DeviceLockedDomainEvent, DomainEventHandler } from '@agnoc/domain';
import { PacketConnection } from '../../aggregate-roots/packet-connection.aggregate-root';
import type { DeviceLockedDomainEvent, DomainEventHandler, Connection, ConnectionRepository } from '@agnoc/domain';

export class QueryDeviceInfoWhenDeviceIsLockedEventHandler implements DomainEventHandler {
readonly forName = 'DeviceLockedDomainEvent';

constructor(private readonly connectionManager: PackerServerConnectionHandler) {}
constructor(private readonly connectionRepository: ConnectionRepository) {}

async handle(event: DeviceLockedDomainEvent): Promise<void> {
const [connection] = this.connectionManager.findConnectionsByDeviceId(event.aggregateId);
const connections = await this.connectionRepository.findByDeviceId(event.aggregateId);

if (!connection || !connection.device) {
if (connections.length === 0) {
throw new DomainException(`Unable to find a connection for the device with id ${event.aggregateId.value}`);
}

const connection = connections.find((connection: Connection): connection is PacketConnection =>
PacketConnection.isPacketConnection(connection),
);

if (!connection) {
return;
}

await connection.send('DEVICE_STATUS_GETTING_REQ', {});
await connection.send('DEVICE_GET_ALL_GLOBAL_MAP_INFO_REQ', { unk1: 0, unk2: '' });

Expand All @@ -23,7 +31,7 @@ export class QueryDeviceInfoWhenDeviceIsLockedEventHandler implements DomainEven

// TODO: move this to a get map service.
await connection.send('DEVICE_MAPID_GET_GLOBAL_INFO_REQ', {
mask: connection.device.system.supports(DeviceCapability.MAP_PLANS) ? 0x78ff : 0xff,
mask: connection.device?.system.supports(DeviceCapability.MAP_PLANS) ? 0x78ff : 0xff,
});

// TODO: move this to a get wlan service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type {
DomainEventHandler,
ConnectionRepository,
DeviceRepository,
ConnectionDeviceChangedDomainEvent,
} from '@agnoc/domain';

export class SetDeviceAsConnectedWhenConnectionDeviceAddedDomainEventHandler implements DomainEventHandler {
readonly forName = 'ConnectionDeviceChangedDomainEvent';

constructor(
private readonly connectionRepository: ConnectionRepository,
private readonly deviceRepository: DeviceRepository,
) {}

async handle(event: ConnectionDeviceChangedDomainEvent): Promise<void> {
if (event.currentDeviceId) {
const connections = await this.connectionRepository.findByDeviceId(event.currentDeviceId);
const device = await this.deviceRepository.findOneById(event.currentDeviceId);

// This is a hack to only mark the device as connected if there is more than one connection.
// Here we should check that the connections are from the same ip address.
if (connections.length > 1 && device && !device.isConnected) {
device.setAsConnected();

await this.deviceRepository.saveOne(device);
}
}

// TODO: handle device disconnection
}
}
13 changes: 13 additions & 0 deletions packages/adapter-tcp/src/factories/connection.factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { PacketConnection } from '../aggregate-roots/packet-connection.aggregate-root';
import type { PacketConnectionProps } from '../aggregate-roots/packet-connection.aggregate-root';
import type { PacketEventBus } from '../packet.event-bus';
import type { Factory } from '@agnoc/toolkit';
import type { PacketFactory } from '@agnoc/transport-tcp';

export class PacketConnectionFactory implements Factory<PacketConnection> {
constructor(private readonly packetEventBus: PacketEventBus, private readonly packetFactory: PacketFactory) {}

create(props: PacketConnectionProps): PacketConnection {
return new PacketConnection(this.packetFactory, this.packetEventBus, props);
}
}
51 changes: 17 additions & 34 deletions packages/adapter-tcp/src/packet-server.connection-handler.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
import { DomainException } from '@agnoc/toolkit';
import { DeviceConnection } from './device.connection';
import { DomainException, ID } from '@agnoc/toolkit';
import { PacketMessage } from './packet.message';
import type { PacketConnection } from './aggregate-roots/packet-connection.aggregate-root';
import type { PacketConnectionFactory } from './factories/connection.factory';
import type { PacketEventBus, PacketEventBusEvents } from './packet.event-bus';
import type { DeviceRepository, Device } from '@agnoc/domain';
import type { ID } from '@agnoc/toolkit';
import type { PacketServer, PacketFactory, Packet, PayloadObjectName } from '@agnoc/transport-tcp';
import type { DeviceRepository, Device, Connection, ConnectionRepository } from '@agnoc/domain';
import type { PacketServer, Packet, PayloadObjectName } from '@agnoc/transport-tcp';

export class PackerServerConnectionHandler {
private readonly servers = new Map<PacketServer, Set<DeviceConnection>>();
private readonly servers = new Map<PacketServer, Set<PacketConnection>>();

constructor(
private readonly packetEventBus: PacketEventBus,
private readonly packetFactory: PacketFactory,
private readonly deviceRepository: DeviceRepository,
private readonly connectionRepository: ConnectionRepository,
private readonly packetConnectionFactory: PacketConnectionFactory,
) {}

findConnectionsByDeviceId(deviceId: ID): DeviceConnection[] {
const connections = [...this.servers.values()].flatMap((connections) => [...connections]);

return connections.filter((connection) => connection.device?.id.equals(deviceId));
}

addServers(...servers: PacketServer[]): void {
servers.forEach((server) => {
this.servers.set(server, new Set());
Expand All @@ -30,25 +25,21 @@ export class PackerServerConnectionHandler {

private addListeners(server: PacketServer) {
server.on('connection', (socket) => {
const connection = new DeviceConnection(this.packetFactory, this.packetEventBus, socket);
const connection = this.packetConnectionFactory.create({ id: ID.generate(), socket });

this.servers.get(server)?.add(connection);

connection.on('data', async (packet: Packet) => {
connection.socket.on('data', async (packet: Packet) => {
const packetMessage = new PacketMessage(connection, packet);

// Update the device on the connection if the device id has changed.
await this.updateConnectionDevice(packet, connection);

// Send the packet message to the packet event bus.
await this.emitPacketEvent(packetMessage);

// This is a hack to only mark the device as connected if there is more than one connection.
// Here we should check that the connections are from the same ip address.
await this.tryToSetDeviceAsConnected(connection);
});

connection.on('close', () => {
connection.socket.on('close', () => {
this.servers.get(server)?.delete(connection);
});
});
Expand All @@ -64,18 +55,6 @@ export class PackerServerConnectionHandler {
});
}

private async tryToSetDeviceAsConnected(connection: DeviceConnection) {
if (connection.device && !connection.device.isConnected) {
const connections = this.findConnectionsByDeviceId(connection.device.id);

if (connections.length > 1) {
connection.device.setAsConnected();

await this.deviceRepository.saveOne(connection.device);
}
}
}

private async emitPacketEvent(message: PacketMessage<PayloadObjectName>) {
const name = message.packet.payload.opcode.name as PayloadObjectName;
const sequence = message.packet.sequence.toString();
Expand All @@ -99,9 +78,13 @@ export class PackerServerConnectionHandler {
}
}

private async updateConnectionDevice(packet: Packet, connection: DeviceConnection) {
private async updateConnectionDevice(packet: Packet, connection: Connection) {
if (!packet.deviceId.equals(connection.device?.id)) {
connection.device = await this.findDeviceById(packet.deviceId);
const device = await this.findDeviceById(packet.deviceId);

connection.setDevice(device);

await this.connectionRepository.saveOne(connection);
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/adapter-tcp/src/packet.message.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { DeviceConnection } from './device.connection';
import type { PacketConnection } from './aggregate-roots/packet-connection.aggregate-root';
import type { Device } from '@agnoc/domain';
import type { Packet, PayloadObjectFrom, PayloadObjectName } from '@agnoc/transport-tcp';

export class PacketMessage<Name extends PayloadObjectName = PayloadObjectName> {
constructor(readonly connection: DeviceConnection, readonly packet: Packet<Name>) {}
constructor(readonly connection: PacketConnection, readonly packet: Packet<Name>) {}

get device(): Device | undefined {
return this.connection.device;
Expand Down
5 changes: 4 additions & 1 deletion packages/adapter-tcp/src/tcp.server.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { imock, instance } from '@johanblumenberg/ts-mockito';
import { TCPServer } from './tcp.server';
import type { Commands, DeviceRepository } from '@agnoc/domain';
import type { Commands, ConnectionRepository, DeviceRepository } from '@agnoc/domain';
import type { EventHandlerRegistry, TaskHandlerRegistry } from '@agnoc/toolkit';

describe('TCPServer', function () {
let domainEventHandlerRegistry: EventHandlerRegistry;
let commandHandlerRegistry: TaskHandlerRegistry<Commands>;
let deviceRepository: DeviceRepository;
let connectionRepository: ConnectionRepository;
let tcpAdapter: TCPServer;

beforeEach(function () {
domainEventHandlerRegistry = imock();
commandHandlerRegistry = imock();
deviceRepository = imock();
connectionRepository = imock();
tcpAdapter = new TCPServer(
instance(deviceRepository),
instance(connectionRepository),
instance(domainEventHandlerRegistry),
instance(commandHandlerRegistry),
);
Expand Down
Loading

0 comments on commit 39e10eb

Please sign in to comment.