Skip to content

Commit

Permalink
use priority handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaskalov committed Oct 28, 2024
1 parent 5f63cce commit eaea983
Showing 1 changed file with 82 additions and 40 deletions.
122 changes: 82 additions & 40 deletions src/mqttClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ import { Logger, PlatformConfig } from 'homebridge';
import { IClientOptions, MqttClient, connect } from 'mqtt';

type TopicCallback =
(msg: string, topic: string) => void;
(msg: string, topic: string) => boolean | void; // priority handler consumes message if not false

type TopicHandler = {
id: string;
topic: string;
messageDump: boolean;
callOnce: boolean;
callback: TopicCallback;
};

export const DEFALT_TIMEOUT = 5000;
const DEFALT_TIMEOUT = 5000;
const MAX_COUNTER = 2147483647; // 2^31 - 1

export class MQTTClient {
private topicHandlers: Array<TopicHandler> = [];
private prioHandlers: TopicHandler[] = [];
private handlers: TopicHandler[] = [];
private client: MqttClient;
private last = 0;
private idCounter = 0;

constructor(private log: Logger, private config: PlatformConfig) {
const broker = config.mqttBroker || 'localhost';
const options: IClientOptions = {
clientId: 'homebridge-zbbridge_' + Math.random().toString(16).substr(2, 8),
clientId: 'homebridge-zbbridge_' + Math.random().toString(16).slice(2, 10),
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
Expand All @@ -39,22 +39,19 @@ export class MQTTClient {
});

this.client.on('message', (topic, message) => {
const callOnceHandlers = this.topicHandlers.filter(h => h.callOnce === true && this.matchTopic(h, topic));
if (callOnceHandlers.length !== 0) {
const msg = callOnceHandlers.some(h => h.messageDump) ? topic + ' ' + message : topic;
this.log.debug('MQTT Message %s, onceHandlers: %s', msg, callOnceHandlers.length);
callOnceHandlers.forEach(h => h.callback(message.toString(), topic));
this.topicHandlers = this.topicHandlers.filter(h => !callOnceHandlers.includes(h));
const handlersCount = this.topicHandlers.filter(h => this.matchTopic(h, topic)).length;
if (handlersCount === 0) {
this.client.unsubscribe(topic);
this.log.debug('MQTT: Unsubscribed %s', topic);
const handlers = this.handlers.filter(h => this.matchTopic(h, topic));
const prioHandlers = this.prioHandlers.filter(h => this.matchTopic(h, topic));
const handlersCount = handlers.length + prioHandlers.length;
this.log.debug('MQTT: Message on %s, handler(s): %d/%d', topic, handlersCount, prioHandlers.length);
for (const prioHandler of prioHandlers) {
if (prioHandler.callback(message.toString(), topic) === true) {
return;
}
}
for (const handler of handlers) {
if (handler.callback(message.toString(), topic) === true) {
return;
}
} else {
const hadnlers = this.topicHandlers.filter(h => this.matchTopic(h, topic));
const msg = hadnlers.some(h => h.messageDump) ? topic + ' ' + message : topic;
this.log.debug('MQTT Message %s, handlers: %s', msg, hadnlers.length);
hadnlers.forEach(h => h.callback(message.toString(), topic));
}
});
}
Expand All @@ -71,36 +68,71 @@ export class MQTTClient {
return false;
}

uniqueID() {
const pid = process && process.pid ? process.pid.toString(36) : '';
const time = Date.now();
this.last = time > this.last ? time : this.last + 1;
return pid + this.last.toString(36);
uniqueID(): string {
const timestamp = Date.now();
if (this.idCounter >= MAX_COUNTER) {
this.idCounter = 0;
}
return `${timestamp}-${this.idCounter++}`;
}

handersCount(topic: string): { handlersCount: number, prioHandlersCount } {
const prioHandlersCount = this.prioHandlers.filter(h => this.matchTopic(h, topic)).length;
const handlersCount = prioHandlersCount + this.handlers.filter(h => this.matchTopic(h, topic)).length;
return { handlersCount, prioHandlersCount };
}

subscribeTopic(topic: string, callback: TopicCallback, messageDump = true, callOnce = false): string {
subscribeTopic(topic: string, callback: TopicCallback, messageDump = true, priority = false): string | undefined {
if (this.client) {
const id = this.uniqueID();
this.log.debug('MQTT Subscribed: %s, %s', topic, id);
this.topicHandlers.push({ id, topic, messageDump, callOnce, callback });
const handlersCount = this.topicHandlers.filter(h => this.matchTopic(h, topic)).length;
const handler = { id, topic, messageDump, priority, callback };
if (priority) {
this.prioHandlers.push(handler);
} else {
this.handlers.push(handler);
}
const {prioHandlersCount, handlersCount} = this.handersCount(topic);
this.log.debug('MQTT: Subscribed: %s :- %s%s - %d/%d handler(s)',
id,
topic,
priority ? ' (priority)' : '',
handlersCount,
prioHandlersCount,
);
if (handlersCount === 1) {
this.client.subscribe(topic);
}
return id;
}
return '';
return undefined;
}

unsubscribe(id: string) {
const handler = this.topicHandlers.find(h => h.id === id);
let priority = true;
let handler = this.prioHandlers.find(h => h.id === id);
if (!handler) {
priority = false;
handler = this.handlers.find(h => h.id === id);
}
if (handler) {
this.topicHandlers = this.topicHandlers.filter(h => h.id !== id);
const handlersCount = this.topicHandlers.filter(h => this.matchTopic(h, handler.topic)).length;
if (priority) {
this.prioHandlers = this.prioHandlers.filter((h => h.id !== id));
} else {
this.handlers = this.handlers.filter((h => h.id !== id));
}
const {prioHandlersCount, handlersCount} = this.handersCount(handler.topic);
if (handlersCount === 0) {
this.client.unsubscribe(handler.topic);
}
this.log.debug('MQTT: Unsubscribed %s :- %s %d handler(s)', id, handler.topic, handlersCount);
this.log.debug('MQTT: Unsubscribed %s :- %s%s - %d/%d handler(s)',
handler.id,
handler.topic,
priority ? ' (priority)' : '',
handlersCount,
prioHandlersCount,
);
} else {
this.log.warn('MQTT: Cannot unsubscribe %s - not found', id);
}
}

Expand All @@ -110,11 +142,16 @@ export class MQTTClient {
}

read(topic: string, timeout?: number, messageDump?: boolean): Promise<string> {
return new Promise((resolve: (message: string) => void, reject) => {
return new Promise((resolve, reject) => {
const start = Date.now();
const handlerId = this.subscribeTopic(topic, message => {
let handlerId: string | undefined = undefined;
handlerId = this.subscribeTopic(topic, message => {
clearTimeout(timer);
resolve(message);
if (handlerId !== undefined) {
this.unsubscribe(handlerId);
}
return true;
}, messageDump === undefined ? true : messageDump, true);
const timer = setTimeout(() => {
if (handlerId !== undefined) {
Expand All @@ -126,8 +163,13 @@ export class MQTTClient {
});
}

submit(topic: string, message: string, responseTopic = topic, timeout?: number, messageDump?: boolean): Promise<string> {
async submit(topic: string, message: string, responseTopic = topic, timeout?: number, messageDump?: boolean): Promise<string> {
this.publish(topic, message);
return this.read(responseTopic, timeout, messageDump);
try {
return await this.read(responseTopic, timeout, messageDump);
} catch {
this.log.error('Submit timeout on %s %s', topic, message);
return '';
}
}
}

0 comments on commit eaea983

Please sign in to comment.