Skip to content

Commit

Permalink
feat(adapter-tcp): add commands
Browse files Browse the repository at this point in the history
  • Loading branch information
adrigzr committed Mar 19, 2023
1 parent 466923c commit 866fa53
Show file tree
Hide file tree
Showing 33 changed files with 427 additions and 107 deletions.
116 changes: 76 additions & 40 deletions packages/adapter-tcp/src/connection.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,68 +7,104 @@ import type { ID } from '@agnoc/toolkit';
import type { PacketServer, PacketFactory, Packet, PayloadObjectName } from '@agnoc/transport-tcp';

export class ConnectionManager {
private readonly connections = new Set<DeviceConnection>();
private readonly servers = new Map<PacketServer, Set<DeviceConnection>>();

constructor(
private readonly servers: PacketServer[],
private readonly packetEventBus: PacketEventBus,
private readonly packetFactory: PacketFactory,
private readonly deviceRepository: DeviceRepository,
) {
this.addListeners();
}
) {}

findConnectionsByDeviceId(deviceId: ID): DeviceConnection[] {
return [...this.connections].filter((connection) => connection.device?.id.equals(deviceId));
const connections = [...this.servers.values()].flatMap((connections) => [...connections]);

return connections.filter((connection) => connection.device?.id.equals(deviceId));
}

private addListeners() {
this.servers.forEach((server) => {
server.on('connection', (socket) => {
const connection = new DeviceConnection(this.packetFactory, socket);
addServers(...servers: PacketServer[]): void {
servers.forEach((server) => {
this.servers.set(server, new Set());
this.addListeners(server);
});
}

this.connections.add(connection);
private addListeners(server: PacketServer) {
server.on('connection', (socket) => {
const connection = new DeviceConnection(this.packetFactory, this.packetEventBus, socket);

connection.on('data', async (packet: Packet) => {
const event = packet.payload.opcode.name as PayloadObjectName;
const packetMessage = new PacketMessage(connection, packet) as PacketEventBusEvents[PayloadObjectName];
this.servers.get(server)?.add(connection);

// Update the device on the connection if the device id has changed.
if (!packet.deviceId.equals(connection.device?.id)) {
connection.device = await this.findDeviceById(packet.deviceId);
}
connection.on('data', async (packet: Packet) => {
const packetMessage = new PacketMessage(connection, packet);

const count = this.packetEventBus.listenerCount(event);
// Update the device on the connection if the device id has changed.
await this.updateConnectionDevice(packet, connection);

// Throw an error if there is no event handler for the packet event.
if (count === 0) {
throw new DomainException(`No event handler found for packet event '${event}'`);
}
// Send the packet message to the packet event bus.
await this.emitPacketEvent(packetMessage);

// Emit the packet event.
await this.packetEventBus.emit(event, packetMessage);
// This is a hack to only mark the device as connected if there is more than one connection.
// Here we should check that the connections are from the same ip address.
await this.tryToSetDeviceAsConnected(connection);
});

// This is a hack to only mark the device as connected if there is more than one connection.
// Here we should check that the connections are from the same ip address.
if (connection.device && !connection.device.isConnected) {
const connections = this.findConnectionsByDeviceId(connection.device.id);
connection.on('close', () => {
this.servers.get(server)?.delete(connection);
});
});

if (connections.length > 1) {
connection.device.setAsConnected();
server.on('close', async () => {
const connections = this.servers.get(server);

await this.deviceRepository.saveOne(connection.device);
}
}
});
if (connections) {
await Promise.all([...connections].map((connection) => connection.close()));

connection.on('close', () => {
connection.removeAllListeners();
this.connections.delete(connection);
});
});
this.servers.delete(server);
}
});
}

private async tryToSetDeviceAsConnected(connection: DeviceConnection) {
if (connection.device && !connection.device.isConnected) {
const connections = this.findConnectionsByDeviceId(connection.device.id);

if (connections.length > 1) {
connection.device.setAsConnected();

await this.deviceRepository.saveOne(connection.device);
}
}
}

private async emitPacketEvent(message: PacketMessage<PayloadObjectName>) {
const name = message.packet.payload.opcode.name as PayloadObjectName;
const sequence = message.packet.sequence.toString();

this.checkForPacketEventHandler(name);

// Emit the packet event by the sequence string.
// This is used to wait for a response from a packet.
await this.packetEventBus.emit(sequence, message as PacketEventBusEvents[PayloadObjectName]);

// Emit the packet event by the opcode name.
await this.packetEventBus.emit(name, message as PacketEventBusEvents[PayloadObjectName]);
}

private checkForPacketEventHandler(event: PayloadObjectName) {
const count = this.packetEventBus.listenerCount(event);

// Throw an error if there is no event handler for the packet event.
if (count === 0) {
throw new DomainException(`No event handler found for packet event '${event}'`);
}
}

private async updateConnectionDevice(packet: Packet, connection: DeviceConnection) {
if (!packet.deviceId.equals(connection.device?.id)) {
connection.device = await this.findDeviceById(packet.deviceId);
}
}

private async findDeviceById(id: ID): Promise<Device | undefined> {
if (id.value === 0) {
return undefined;
Expand Down
73 changes: 58 additions & 15 deletions packages/adapter-tcp/src/device.connection.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import { Device } from '@agnoc/domain';
import { ArgumentInvalidException, DomainException, ID } from '@agnoc/toolkit';
import { PacketSocket } from '@agnoc/transport-tcp';
import { TypedEmitter } from 'tiny-typed-emitter';
import type { Packet, PacketFactory, PayloadObjectName, PayloadObjectFrom } from '@agnoc/transport-tcp';
import Emittery from 'emittery';
import type { PacketEventBus } from './packet.event-bus';
import type { PacketMessage } from './packet.message';
import type {
Packet,
PacketFactory,
PayloadObjectName,
PayloadObjectFrom,
CreatePacketProps,
} from '@agnoc/transport-tcp';

export interface DeviceConnectionEvents {
data: (packet: Packet) => void | Promise<void>;
close: () => void;
error: (err: Error) => void;
data: Packet;
close: undefined;
error: Error;
}

export class DeviceConnection extends TypedEmitter<DeviceConnectionEvents> {
export class DeviceConnection extends Emittery<DeviceConnectionEvents> {
#device?: Device;

constructor(private readonly packetFactory: PacketFactory, private readonly socket: PacketSocket) {
constructor(
private readonly packetFactory: PacketFactory,
private readonly eventBus: PacketEventBus,
private readonly socket: PacketSocket,
) {
super();
this.validateSocket();
this.addListeners();
Expand All @@ -34,20 +46,52 @@ export class DeviceConnection extends TypedEmitter<DeviceConnectionEvents> {
}

send<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>): Promise<void> {
const props = { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) };
const packet = this.packetFactory.create(name, object, props);
const packet = this.packetFactory.create(name, object, this.getPacketProps());

return this.socket.write(packet);
return this.write(packet);
}

respond<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>, packet: Packet): Promise<void> {
return this.socket.write(this.packetFactory.create(name, object, packet));
return this.write(this.packetFactory.create(name, object, packet));
}

sendAndWait<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>): Promise<PacketMessage> {
const packet = this.packetFactory.create(name, object, this.getPacketProps());

return this.writeAndWait(packet);
}

respondAndWait<Name extends PayloadObjectName>(
name: Name,
object: PayloadObjectFrom<Name>,
packet: Packet,
): Promise<PacketMessage> {
return this.writeAndWait(this.packetFactory.create(name, object, packet));
}

close(): Promise<void> {
return this.socket.end();
}

private getPacketProps(): CreatePacketProps {
return { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) };
}

private writeAndWait(packet: Packet): Promise<PacketMessage> {
return new Promise((resolve, reject) => {
this.eventBus.once(packet.sequence.toString()).then(resolve, reject);
this.write(packet).catch(reject);
});
}

private async write(packet: Packet) {
if (!this.socket.connected) {
return;
}

return this.socket.write(packet);
}

private validateSocket() {
if (!(this.socket instanceof PacketSocket)) {
throw new DomainException('Socket for Connection is not an instance of PacketSocket');
Expand All @@ -60,14 +104,13 @@ export class DeviceConnection extends TypedEmitter<DeviceConnectionEvents> {

private addListeners() {
this.socket.on('data', (packet) => {
this.emit('data', packet);
void this.emit('data', packet);
});
this.socket.on('error', (err) => {
this.emit('error', err);
void this.emit('error', err);
});
this.socket.on('close', () => {
this.socket.removeAllListeners();
this.emit('close');
void this.emit('close');
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { DomainException } from '@agnoc/toolkit';
import type { ConnectionManager } from '../../connection.manager';
import type { CommandEventHandler, LocateDeviceCommand } from '@agnoc/domain';

export class LocateDeviceEventHandler implements CommandEventHandler {
readonly eventName = 'LocateDeviceCommand';

constructor(private readonly connectionManager: ConnectionManager) {}

async handle(event: LocateDeviceCommand): Promise<void> {
const [connection] = this.connectionManager.findConnectionsByDeviceId(event.deviceId);

if (!connection || !connection.device) {
throw new DomainException(`Unable to find a connection for the device with id ${event.deviceId.value}`);
}

await connection.sendAndWait('DEVICE_SEEK_LOCATION_REQ', {});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { DomainException } from '@agnoc/toolkit';
import type { PacketEventHandler } from '../../packet.event-handler';
import type { PacketMessage } from '../../packet.message';

export class DeviceLocatedEventHandler implements PacketEventHandler {
readonly eventName = 'DEVICE_SEEK_LOCATION_RSP';

async handle(message: PacketMessage<'DEVICE_SEEK_LOCATION_RSP'>): Promise<void> {
if (!message.device) {
throw new DomainException('Device not found');
}

// TODO: Should do something here?
}
}
2 changes: 2 additions & 0 deletions packages/adapter-tcp/src/packet.event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import type { PayloadObjectName } from '@agnoc/transport-tcp';
/** Events for the packet event bus. */
export type PacketEventBusEvents = {
[Name in PayloadObjectName]: PacketMessage<Name>;
} & {
[key: string]: PacketMessage;
};

/** Event bus for packets. */
Expand Down
6 changes: 5 additions & 1 deletion packages/adapter-tcp/src/packet.message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { DeviceConnection } from './device.connection';
import type { Device } from '@agnoc/domain';
import type { Packet, PayloadObjectFrom, PayloadObjectName } from '@agnoc/transport-tcp';

export class PacketMessage<Name extends PayloadObjectName> {
export class PacketMessage<Name extends PayloadObjectName = PayloadObjectName> {
constructor(readonly connection: DeviceConnection, readonly packet: Packet<Name>) {}

get device(): Device | undefined {
Expand All @@ -12,4 +12,8 @@ export class PacketMessage<Name extends PayloadObjectName> {
respond<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>): Promise<void> {
return this.connection.respond(name, object, this.packet);
}

respondAndWait<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>): Promise<PacketMessage> {
return this.connection.respondAndWait(name, object, this.packet);
}
}
Loading

0 comments on commit 866fa53

Please sign in to comment.