Skip to content

Commit

Permalink
feat(adapter-tcp): add commands
Browse files Browse the repository at this point in the history
  • Loading branch information
adrigzr committed Mar 19, 2023
1 parent 466923c commit 80e2e09
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 62 deletions.
110 changes: 70 additions & 40 deletions packages/adapter-tcp/src/connection.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,68 +7,98 @@ import type { ID } from '@agnoc/toolkit';
import type { PacketServer, PacketFactory, Packet, PayloadObjectName } from '@agnoc/transport-tcp';

export class ConnectionManager {
private readonly connections = new Set<DeviceConnection>();
private readonly servers = new Map<PacketServer, Set<DeviceConnection>>();

constructor(
private readonly servers: PacketServer[],
private readonly packetEventBus: PacketEventBus,
private readonly packetFactory: PacketFactory,
private readonly deviceRepository: DeviceRepository,
) {
this.addListeners();
}
) {}

findConnectionsByDeviceId(deviceId: ID): DeviceConnection[] {
return [...this.connections].filter((connection) => connection.device?.id.equals(deviceId));
const connections = [...this.servers.values()].flatMap((connections) => [...connections]);

return connections.filter((connection) => connection.device?.id.equals(deviceId));
}

private addListeners() {
this.servers.forEach((server) => {
server.on('connection', (socket) => {
const connection = new DeviceConnection(this.packetFactory, socket);
addServers(...servers: PacketServer[]): void {
servers.forEach((server) => {
this.servers.set(server, new Set());
this.addListeners(server);
});
}

this.connections.add(connection);
private addListeners(server: PacketServer) {
server.on('connection', (socket) => {
const connection = new DeviceConnection(this.packetFactory, socket);

connection.on('data', async (packet: Packet) => {
const event = packet.payload.opcode.name as PayloadObjectName;
const packetMessage = new PacketMessage(connection, packet) as PacketEventBusEvents[PayloadObjectName];
this.servers.get(server)?.add(connection);

// Update the device on the connection if the device id has changed.
if (!packet.deviceId.equals(connection.device?.id)) {
connection.device = await this.findDeviceById(packet.deviceId);
}
connection.on('data', async (packet: Packet) => {
const event = packet.payload.opcode.name as PayloadObjectName;
const packetMessage = new PacketMessage(connection, packet);

const count = this.packetEventBus.listenerCount(event);
// Update the device on the connection if the device id has changed.
await this.updateConnectionDevice(packet, connection);

// Throw an error if there is no event handler for the packet event.
if (count === 0) {
throw new DomainException(`No event handler found for packet event '${event}'`);
}
// Send the packet message to the packet event bus.
await this.emitPacketEvent(event, packetMessage);

// Emit the packet event.
await this.packetEventBus.emit(event, packetMessage);
// This is a hack to only mark the device as connected if there is more than one connection.
// Here we should check that the connections are from the same ip address.
await this.tryToSetDeviceAsConnected(connection);
});

// This is a hack to only mark the device as connected if there is more than one connection.
// Here we should check that the connections are from the same ip address.
if (connection.device && !connection.device.isConnected) {
const connections = this.findConnectionsByDeviceId(connection.device.id);
connection.on('close', () => {
this.servers.get(server)?.delete(connection);
});
});

if (connections.length > 1) {
connection.device.setAsConnected();
server.on('close', async () => {
const connections = this.servers.get(server);

await this.deviceRepository.saveOne(connection.device);
}
}
});
if (connections) {
await Promise.all([...connections].map((connection) => connection.close()));

connection.on('close', () => {
connection.removeAllListeners();
this.connections.delete(connection);
});
});
this.servers.delete(server);
}
});
}

private async tryToSetDeviceAsConnected(connection: DeviceConnection) {
if (connection.device && !connection.device.isConnected) {
const connections = this.findConnectionsByDeviceId(connection.device.id);

if (connections.length > 1) {
connection.device.setAsConnected();

await this.deviceRepository.saveOne(connection.device);
}
}
}

private async emitPacketEvent(event: PayloadObjectName, packetMessage: PacketMessage<PayloadObjectName>) {
this.checkForPacketEventHandler(event);

// Emit the packet event.
await this.packetEventBus.emit(event, packetMessage as PacketEventBusEvents[PayloadObjectName]);
}

private checkForPacketEventHandler(event: PayloadObjectName) {
const count = this.packetEventBus.listenerCount(event);

// Throw an error if there is no event handler for the packet event.
if (count === 0) {
throw new DomainException(`No event handler found for packet event '${event}'`);
}
}

private async updateConnectionDevice(packet: Packet, connection: DeviceConnection) {
if (!packet.deviceId.equals(connection.device?.id)) {
connection.device = await this.findDeviceById(packet.deviceId);
}
}

private async findDeviceById(id: ID): Promise<Device | undefined> {
if (id.value === 0) {
return undefined;
Expand Down
29 changes: 18 additions & 11 deletions packages/adapter-tcp/src/device.connection.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Device } from '@agnoc/domain';
import { ArgumentInvalidException, DomainException, ID } from '@agnoc/toolkit';
import { PacketSocket } from '@agnoc/transport-tcp';
import { TypedEmitter } from 'tiny-typed-emitter';
import Emittery from 'emittery';
import type { Packet, PacketFactory, PayloadObjectName, PayloadObjectFrom } from '@agnoc/transport-tcp';

export interface DeviceConnectionEvents {
data: (packet: Packet) => void | Promise<void>;
close: () => void;
error: (err: Error) => void;
data: Packet;
close: undefined;
error: Error;
}

export class DeviceConnection extends TypedEmitter<DeviceConnectionEvents> {
export class DeviceConnection extends Emittery<DeviceConnectionEvents> {
#device?: Device;

constructor(private readonly packetFactory: PacketFactory, private readonly socket: PacketSocket) {
Expand All @@ -37,17 +37,25 @@ export class DeviceConnection extends TypedEmitter<DeviceConnectionEvents> {
const props = { deviceId: this.#device?.id ?? new ID(0), userId: this.#device?.userId ?? new ID(0) };
const packet = this.packetFactory.create(name, object, props);

return this.socket.write(packet);
return this.write(packet);
}

respond<Name extends PayloadObjectName>(name: Name, object: PayloadObjectFrom<Name>, packet: Packet): Promise<void> {
return this.socket.write(this.packetFactory.create(name, object, packet));
return this.write(this.packetFactory.create(name, object, packet));
}

close(): Promise<void> {
return this.socket.end();
}

private async write(packet: Packet) {
if (!this.socket.connected) {
return;
}

return this.socket.write(packet);
}

private validateSocket() {
if (!(this.socket instanceof PacketSocket)) {
throw new DomainException('Socket for Connection is not an instance of PacketSocket');
Expand All @@ -60,14 +68,13 @@ export class DeviceConnection extends TypedEmitter<DeviceConnectionEvents> {

private addListeners() {
this.socket.on('data', (packet) => {
this.emit('data', packet);
void this.emit('data', packet);
});
this.socket.on('error', (err) => {
this.emit('error', err);
void this.emit('error', err);
});
this.socket.on('close', () => {
this.socket.removeAllListeners();
this.emit('close');
void this.emit('close');
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { DomainException } from '@agnoc/toolkit';
import type { PacketEventHandler } from '../../packet.event-handler';
import type { PacketMessage } from '../../packet.message';

export class DeviceLocatedEventHandler implements PacketEventHandler {
readonly eventName = 'DEVICE_SEEK_LOCATION_RSP';

async handle(message: PacketMessage<'DEVICE_SEEK_LOCATION_RSP'>): Promise<void> {
if (!message.device) {
throw new DomainException('Device not found');
}

// TODO: Should do something here?
}
}
1 change: 1 addition & 0 deletions packages/toolkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"tslib": "^2.5.0"
},
"devDependencies": {
"@johanblumenberg/ts-mockito": "^1.0.35",
"chai": "^4.3.7"
},
"engines": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { anything, capture, imock, instance, verify, when } from '@johanblumenberg/ts-mockito';
import { expect } from 'chai';
import { EventHandlerManager } from './event-handler.manager';
import { EventHandlerRegistry } from './event-handler.registry';
import type { EventBus, EventHandler } from '@agnoc/toolkit';

describe('EventHandlerManager', function () {
describe('EventHandlerRegistry', function () {
let eventBus: EventBus;
let eventHandler: EventHandler;
let eventHandlerManager: EventHandlerManager;
let eventHandlerManager: EventHandlerRegistry;

beforeEach(function () {
eventBus = imock();
eventHandler = imock();
eventHandlerManager = new EventHandlerManager(instance(eventBus));
eventHandlerManager = new EventHandlerRegistry(instance(eventBus));
});

it('should listen for events on the bus', function () {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { EventBus, EventHandler } from '@agnoc/toolkit';
import type { EventBus } from './base-classes/event-bus.base';
import type { EventHandler } from './base-classes/event-handler.base';

/** Manages event handlers. */
export class EventHandlerManager {
export class EventHandlerRegistry {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
constructor(private readonly eventBus: EventBus<any>) {}

Expand Down
1 change: 1 addition & 0 deletions packages/toolkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export * from './base-classes/value-object.base';
export * from './decorators/bind.decorator';
export * from './domain-primitives/id.domain-primitive';
export * from './event-buses/domain.event-bus';
export * from './event-handler.registry';
export * from './exceptions/argument-invalid.exception';
export * from './exceptions/argument-not-provided.exception';
export * from './exceptions/argument-out-of-range.exception';
Expand Down
20 changes: 15 additions & 5 deletions packages/transport-tcp/src/packet.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ export interface PacketServerEvents {

/** Server that emits `PacketSockets`. */
export class PacketServer extends Emittery<PacketServerEvents> {
private server: Server;
private readonly sockets = new Set<PacketSocket>();
private readonly server = new Server();

constructor(private readonly packetMapper: PacketMapper) {
super();
this.server = new Server();
this.addListeners();
}

Expand Down Expand Up @@ -66,19 +66,29 @@ export class PacketServer extends Emittery<PacketServerEvents> {

resolve();
});

this.sockets.forEach((socket) => void socket.end());
});
}

private onConnection(socket: Socket): void {
const client = new PacketSocket(this.packetMapper, socket);
const packetSocket = new PacketSocket(this.packetMapper, socket);

this.sockets.add(packetSocket);

void this.emit('connection', client);
packetSocket.on('close', () => {
this.sockets.delete(packetSocket);
});

void this.emit('connection', packetSocket);
}

private addListeners(): void {
this.server.on('connection', this.onConnection.bind(this));
this.server.on('listening', () => void this.emit('listening'));
this.server.on('close', () => void this.emit('close'));
this.server.on('close', () => {
void this.emit('close');
});
/* istanbul ignore next - unable to test */
this.server.on('error', (error) => void this.emit('error', error));
/* istanbul ignore next - unable to test */
Expand Down

0 comments on commit 80e2e09

Please sign in to comment.