From 87d3fad43ba500185a14a03144fab3f6b4a6f7c3 Mon Sep 17 00:00:00 2001 From: Carlo Satta Date: Sat, 19 Jan 2019 12:11:36 +0100 Subject: [PATCH] Rewrite addons login and fix some problems ref: #1 #2 #3 #4 #5 --- hassio_meross/app.js | 359 +-------------------------- hassio_meross/config.json | 2 +- hassio_meross/lib/channel/channel.js | 84 +++++++ hassio_meross/lib/channel/sensor.js | 136 ++++++++++ hassio_meross/lib/channel/switch.js | 121 +++++++++ hassio_meross/lib/device.js | 132 ++++++++++ hassio_meross/lib/meross.js | 57 +++++ hassio_meross/lib/mqtt.js | 75 ++++++ 8 files changed, 610 insertions(+), 356 deletions(-) create mode 100644 hassio_meross/lib/channel/channel.js create mode 100644 hassio_meross/lib/channel/sensor.js create mode 100644 hassio_meross/lib/channel/switch.js create mode 100644 hassio_meross/lib/device.js create mode 100644 hassio_meross/lib/meross.js create mode 100644 hassio_meross/lib/mqtt.js diff --git a/hassio_meross/app.js b/hassio_meross/app.js index 9d71ed8..c375a05 100644 --- a/hassio_meross/app.js +++ b/hassio_meross/app.js @@ -1,360 +1,9 @@ 'use strict'; - -const util = require('util'); -const q = require('q'); -const JSON = require('circular-json'); - -const MerossCloud = require('meross-cloud'); -const mqtt = require('mqtt') -const mqtt_regex = require("mqtt-regex"); +require('events').EventEmitter.defaultMaxListeners = 0; const options = require('./options.json'); +const mqtt = require('./lib/mqtt')(options.mqtt); -// MQTT -if(!options.mqtt.username || !options.mqtt.username.length) { - delete options.mqtt.username; -} - -if(!options.mqtt.password || !options.mqtt.password.length) { - delete options.mqtt.password; -} - -const client = mqtt.connect(options.mqtt); - -client.on('message', function (topic, message) { - - message = message.toString(); - message = JSON.parse(message); - - var topicReg = mqtt_regex('+prefix/+type/+elements/+action').exec; - var params = topicReg(topic); - let elements = params.elements.split('_'); - params.id = elements[0]; - params.channel = elements[1]; - - switch(params.action) { - case 'config': - console.debug(`Device ${params.id} configured ${JSON.stringify(message)}.`); - break; - case 'command': - console.debug(`Device ${params.id} recive a command ${JSON.stringify(message)}.`); - setSwitchStatus(params.id, params.channel, booleanStatus(message)); - break; - - case 'state': - console.debug(`Device ${params.id} change status ${JSON.stringify(message)}.`); - break; - - default: - console.warn(`Device ${params.id} send a unknow message ${JSON.stringify(message)}.`); - break; - } - -}); - -// MEROSS -const meross = new MerossCloud(options.meross); - -meross.connect((error) => { - if(error) { - console.error(error); - process.exit(1); - } -}) - -meross.on('connected', (uuid) => { - let device = meross.getDevice(uuid); - console.log(`Device ${device.dev.devName} (${device.dev.deviceType}) (${uuid}) discovered.`); -}); - -meross.on('error', (uuid, error) => { - console.error(`Error from the device ${uuid}.`); - console.error(error); -}); - -meross.on('close', (uuid, error) => { - console.error(`Connection close from the device ${uuid}.`); - console.error(error); -}); - -meross.on('reconnect', (uuid) => { - console.error(`Reconnection from the device ${uuid}.`); -}); - -meross.on('deviceInitialized', (uuid, deviceDef, device) => { - device.on('connected', () => { - manageDevice(deviceDef, device) - }); - - device.on('data', (namespace, payload) => { - switch (namespace) { - case 'Appliance.Control.ToggleX': - sendSwitchStatus(uuid, payload); - break; - - default: - console.warn(`Unsupported ability ${namespace} for the divice ${uuid}.`); - console.debug(payload); - break; - } - }); - -}); - -// Generic -function manageDevice(definition, device = null) { - for (let channel in definition.channels) { - let name = definition.channels[channel].devName ? `${definition.channels[channel].devName}` : `${definition.devName}`; - let config = createSwitchConfig(name, definition.uuid, channel); - manageSubscriptionSwitchDevice(definition.uuid, channel, config); - - if (device) { - device.getSystemAllData((err, info) => { - if(err || !info || !info.all || !info.all.digest) return; - sendSwitchStatus(definition.uuid, info.all.digest); - }); - } - - - device.getSystemAbilities((err, info) => { - - if(err || !info || !info.ability) return; - - if (info.ability['Appliance.Control.ConsumptionX']) { - let config = createSensorConfig(name, definition.uuid, channel, 'consumption'); - manageSubscriptionSensorDevice(definition.uuid, 'consumption', config); - } - - if (info.ability['Appliance.Control.Electricity']) { - device.getControlElectricity((err, info) => { - if(err || !info || !info.electricity) return; - - for (let type in info.electricity) { - if (type === 'channel') { - continue; - } - let config = createSensorConfig(name, definition.uuid, channel, type); - manageSubscriptionSensorDevice(definition.uuid, type, config); - } - }); - } - - if (info.ability['Appliance.Control.ConsumptionX'] || info.ability['Appliance.Control.Electricity']) { - sendSensorData(definition.uuid, channel); - } - - }); - } -} - - -// Switch -function createSwitchConfig(name, uuid, channel) { - return { - name: name, - command_topic: `${options.topic.discovery_prefix}/switch/${uuid}_${channel}/command`, - state_topic: `${options.topic.discovery_prefix}/switch/${uuid}_${channel}/state`, - value_template: '{{value_json.state}}', - payload_on: "1", - payload_off: "0", - state_on: "1", - state_off: "0", - unique_id: `${uuid}_${channel}`, - device: getDeviceInfo(uuid), - retain: true - }; -} - -function manageSubscriptionSwitchDevice(uuid, channel, config) { - client.publish(`${options.topic.discovery_prefix}/switch/${uuid}_${channel}/config`, JSON.stringify(config), { - qos: 2, - retain: true - }); - - client.subscribe(`${options.topic.discovery_prefix}/switch/${uuid}_${channel}/config`, function (err) { - // console.error(err) - }) - - client.subscribe(`${config.command_topic}`, function (err) { - // console.error(err) - }) - - client.subscribe(`${config.state_topic}`, function (err) { - // console.error(err) - }) -} - -function sendSwitchStatus(uuid, status) { - if (!status || !status.togglex) return; - - if (!Array.isArray(status.togglex)) { - status.togglex = [status.togglex]; - } - - for (let info of status.togglex) { - client.publish(`${options.topic.discovery_prefix}/switch/${uuid}_${info.channel}/state`, JSON.stringify({ - state: info.onoff, - device: getDeviceInfo(uuid) - }), {}); - } -} - -function setSwitchStatus(uuid, channel, state) { - let device = meross.getDevice(uuid); - device.controlToggleX(channel, state); -} - - -// Sensor -function createSensorConfig(name, uuid, channel, type) { - return { - name: `${name} - ${type}`, - state_topic: `${options.topic.discovery_prefix}/sensor/${uuid}_${channel}/state`, - value_template: typeToSymbol(type).template, - unit_of_measurement: typeToSymbol(type).unit, - unique_id: `${uuid}_${channel}_${type}`, - device: getDeviceInfo(uuid) - }; -} - -function manageSubscriptionSensorDevice(uuid, type, config) { - client.publish(`${options.topic.discovery_prefix}/sensor/${uuid}_${type}/config`, JSON.stringify(config), { - qos: 2, - retain: true - }); -} - -function sendSensorData(uuid, channel) { - let sensor_data = {}; - - let device = meross.getDevice(uuid); - - return q.fcall(() => { - var deferred = q.defer(); - device.getSystemAbilities((err, abilities) => { - if (err) return deferred.resolve(err); - return deferred.resolve(abilities); - }); - return deferred.promise; - }) - .then((abilities) => { - return [ - (() => { - var deferred = q.defer(); - if (abilities.ability['Appliance.Control.ConsumptionX']) { - device.getControlPowerConsumptionX((err, consumption) => { - if (err) return deferred.resolve(err); - return deferred.resolve(consumption); - }); - return deferred.promise; - } - return null; - })(), - (() => { - var deferred = q.defer(); - if (abilities.ability['Appliance.Control.ConsumptionX']) { - device.getControlElectricity((err, electricity) => { - if (err) return deferred.resolve(err); - return deferred.resolve(electricity); - }); - return deferred.promise; - } - return null; - })(), - ]; - }) - .spread((consumption, electricity) => { - if (consumption && consumption.consumptionx) { - sensor_data.consumption = consumption.consumptionx.pop().value; - } - - if (electricity && electricity.electricity) { - for (let type in electricity.electricity) { - if (type === 'channel') { - continue; - } - sensor_data[type] = electricity.electricity[type]; - } - } - return; - }) - .then(() => { - sensor_data.device = getDeviceInfo(uuid); - client.publish(`${options.topic.discovery_prefix}/sensor/${uuid}_${channel}/state`, JSON.stringify(sensor_data), { - qos: 1 - }); - }) - .then(() => { - setTimeout(() => { - sendSensorData(uuid, channel); - }, options.devices.refresh); - }) - .catch((error) => { - console.error('Impossible to retrieve sensor data.'); - console.error(error); - }) -} - -// Tool - -function getDeviceInfo(uuid) { - let device = meross.getDevice(uuid); - return { - name: device.dev.devName, - model: device.dev.deviceType, - sw_version: device.dev.fmwareVersion, - identifiers: [ - device.dev.uuid - ] - } -} - -function typeToSymbol (type) { - switch (type) { - case 'power': - return { - unit: 'W', - template: `{{ value_json.${type} | float / 1000}}`, - }; - break; - - case 'current' : - return { - unit: 'A', - template: `{{ value_json.${type} | float / 1000}}`, - }; - break; - - case 'voltage' : - return { - unit: 'V', - template: `{{value_json.${type} | float / 10}}`, - }; - break; - - case 'consumption' : - return { - unit: 'kWh', - template: `{{ value_json.${type} | float / 1000}}`, - }; - break; - } -} - -function booleanStatus(string) { - return [ - 1, - '1', - true, - 'true', - 'True', - 'TRUE', - 'yes', - 'Yes', - 'YES', - 'on', - 'On', - 'ON' - ].includes(string); -} \ No newline at end of file +const Meross = require('./lib/meross'); +const meross = new Meross(options.meross); diff --git a/hassio_meross/config.json b/hassio_meross/config.json index 940c948..77da2af 100644 --- a/hassio_meross/config.json +++ b/hassio_meross/config.json @@ -1,6 +1,6 @@ { "name": "Hassio Meross", - "version": "0.0.19", + "version": "0.1.0", "slug": "hassio_meross", "description": "Hassio addon for Meross device using MQTT and autodiscovery.", "startup": "once", diff --git a/hassio_meross/lib/channel/channel.js b/hassio_meross/lib/channel/channel.js new file mode 100644 index 0000000..c8f8932 --- /dev/null +++ b/hassio_meross/lib/channel/channel.js @@ -0,0 +1,84 @@ +const options = require('../../options.json'); +const mqtt = require('../mqtt')(); + + +class Channel { + constructor(id, device, { type = 'Switch', devName }) { + this.id = id; + this.type = type; + this.name = devName ? `${devName}` : device.name; + this.device = device; + + this.mqtt = { + baseTopic: 'undefined', + pattern: `${this.baseTopic}/+action`, + discovery_prefix: options.topic.discovery_prefix, + } + } + + sendConfig(config) { + return this.send({ + context: 'config', + message: config, + retain: true, + }) + .then(() => { + console.log(` [${this.type}][${this.name}] Config send.`); + }) + .catch((error) => { + console.error(error); + }); + } + + messageState(state) { + return { + name: this.name, + state: state, + device: this.device.getInfo(), + }; + } + + sendState(state) { + return this.send({ + context: 'state', + message: this.messageState(state), + retain: true, + }).then(() => { + console.log(` [${this.type}][${this.name}] State send.`); + }) + .catch((error) => { + console.error(error); + }); + } + + async listen() { + mqtt.client.subscribe(`${this.mqtt.baseTopic}/#`, (error) => { + console.log(` [${this.type}][${this.name}] Listening on base topic ${this.mqtt.baseTopic}/.`); + }); + this.manageMessage(); + } + + manageMessage() { + // WUT?! + } + + send({context, message, qos = 0, retain = false}) { + const topic = `${this.mqtt.baseTopic}/${context}`; + return mqtt.publish( + topic, + message, + {qos, retain} + ) + .then(() => { + console.log(` [${this.type}][${this.name}] Message send to ${topic}.`); + }) + .catch((error) => { + console.error(` [${this.type}][${this.name}] Error sending message to ${context}.`); + console.error(error); + }); + } + +} + +module.exports = Channel; + \ No newline at end of file diff --git a/hassio_meross/lib/channel/sensor.js b/hassio_meross/lib/channel/sensor.js new file mode 100644 index 0000000..23afd73 --- /dev/null +++ b/hassio_meross/lib/channel/sensor.js @@ -0,0 +1,136 @@ +const q = require('q'); + +const mqtt = require('../mqtt')(); +const options = require('../../options.json'); +const Channel = require('./channel'); + +class Sensor extends Channel{ + constructor(...args) { + super(...args); + this.__state; + this.type = 'Sensor'; + this.ability = args.pop(); + this.name = `${this.name} - ${this.type} ${this.ability}`; + this.mqtt.baseTopic = `${this.mqtt.discovery_prefix}/sensor/${this.device.uuid}_${this.id}_${this.ability}`; + this.bootstrap(); + } + + async bootstrap() { + console.log(` [${this.type}][${this.name}] Preparation.`); + try { + await mqtt.publish(`${this.mqtt.baseTopic}/state`, ''); + await this.sendConfig(this.config); + this.watchState(); + } catch (error) { + console.error(`[ERROR][${this.type}][${this.name}] Error in bootstrap: ${error.message}.`); + console.error(error); + return; + } + } + + set state(value) { + this.__state = value; + this.sendState(this.state); + } + + get state() { + return this.__state; + } + + async watchState() { + + switch (this.ability) { + case 'current': + const consumption = await this.getControlPowerConsumptionX(); + this.state = consumption.consumptionx.pop().value; + break; + + default: + const electricity = await this.getControlElectricity(); + const value = electricity.electricity[this.ability] ? electricity.electricity[this.ability] : 0; + this.state = value; + break; + } + + setTimeout(async () => { + await this.watchState(); + }, options.devices.refresh); + } + + get config() { + return { + name: this.name, + state_topic: `${this.mqtt.baseTopic}/state`, + value_template: typeToSymbol(this.ability).template, + unit_of_measurement: typeToSymbol(this.ability).unit, + unique_id: `${this.device.uuid}_${this.id}_${this.ability}`, + device: this.device.getInfo() + }; + } + + messageState(state) { + return { + name: this.name, + state: state.toString(), + device: this.device.getInfo(), + }; + } + + async getControlPowerConsumptionX() { + const deferred = q.defer(); + this.device.meross.getControlPowerConsumptionX((error, response) => { + if (error) { + deferred.reject(error); + } + deferred.resolve(response); + }); + return deferred.promise; + } + + async getControlElectricity() { + const deferred = q.defer(); + this.device.meross.getControlElectricity((error, response) => { + if (error) { + deferred.reject(error); + } + deferred.resolve(response); + }); + return deferred.promise; + } + +} + +module.exports = Sensor; + +function typeToSymbol (type) { + switch (type) { + case 'power': + return { + unit: 'W', + template: '{{ value_json.state | float / 1000}}', + }; + break; + + case 'current' : + return { + unit: 'A', + template: '{{ value_json.state | float / 1000}}', + }; + break; + + case 'voltage' : + return { + unit: 'V', + template: `{{value_json.state | float / 10}}`, + }; + break; + + case 'consumption' : + return { + unit: 'kWh', + template: '{{ value_json.state | float / 1000}}', + }; + break; + } +} + \ No newline at end of file diff --git a/hassio_meross/lib/channel/switch.js b/hassio_meross/lib/channel/switch.js new file mode 100644 index 0000000..90f4ac0 --- /dev/null +++ b/hassio_meross/lib/channel/switch.js @@ -0,0 +1,121 @@ +const q = require('q'); + +const Channel = require('./channel'); +const mqtt = require('../mqtt')(); +const mqtt_regex = require("mqtt-regex"); + +class Switch extends Channel{ + constructor(...args) { + super(...args); + this.__state; + this.mqtt.baseTopic = `${this.mqtt.discovery_prefix}/switch/${this.device.uuid}_${this.id}`; + this.mqtt.pattern = `${this.mqtt.baseTopic}/+action`; + this.bootstrap(); + } + + get config() { + return { + name: this.name, + command_topic: `${this.mqtt.baseTopic}/command`, + state_topic: `${this.mqtt.baseTopic}/state`, + value_template: '{{value_json.state}}', + payload_on: "1", + payload_off: "0", + state_on: "1", + state_off: "0", + unique_id: `${this.device.uuid}_${this.id}`, + device: this.device.getInfo() + }; + } + + set state(value) { + this.__state = value; + this.sendState(this.state); + } + + get state() { + return this.__state; + } + + async command(state) { + try { + await this.controlToggle(state) + this.state = state; + } catch (error) { + console.error(`[ERROR][${this.type}][${this.name}] Error in command: ${error.message}.`); + } + } + + async bootstrap() { + console.log(` [${this.type}][${this.name}] Preparation.`); + + await mqtt.publish(`${this.mqtt.baseTopic}/command`, ''); + await mqtt.publish(`${this.mqtt.baseTopic}/state`, ''); + + try { + let info = await this.getSystemAllData(); + this.state = info.all.digest.togglex[this.id].onoff; + } catch (error) { + console.warn(`[WARN][${this.type}][${this.name}] Impossibile retrieve real status: ${error.message}.`); + this.state = 0; + } + + await this.sendConfig(this.config); + await this.listen(); + } + + manageMessage() { + mqtt.client.on('message', (topic, message) => { + const room_message_info = mqtt_regex(this.mqtt.pattern).exec; + const params = room_message_info(topic); + + if (!params || !params.action) { + return; + } + + try { + console.log(` [${this.type}][${this.name}] Message arrived to ${topic}.`); + message = JSON.parse(message); + switch (params.action) { + case 'command': + this.command(message); + break; + + default: + // console.debug(`[DEBUG][${this.type}][${this.name}] Ignored action "${params.action}" called.`); + break; + } + } catch (error) { + console.error(` [${this.type}][${this.name}] ${error.message}.`) + } + + + }); + } + + getSystemAllData() { + const deferred = q.defer(); + this.device.meross.getSystemAllData((error, response) => { + if (error) { + deferred.reject(error); + } + deferred.resolve(response); + }); + return deferred.promise; + } + + controlToggle(state) { + const deferred = q.defer(); + this.device.meross.controlToggleX(this.id, state, (error, response) => { + if (error) { + deferred.reject(error); + } + deferred.resolve(response); + }); + return deferred.promise; + } + +} + +module.exports = Switch; + \ No newline at end of file diff --git a/hassio_meross/lib/device.js b/hassio_meross/lib/device.js new file mode 100644 index 0000000..9013f17 --- /dev/null +++ b/hassio_meross/lib/device.js @@ -0,0 +1,132 @@ +const Switch = require('./channel/switch'); +const Sensor = require('./channel/sensor'); + +class Device { + constructor(device) { + this.meross = device; + this.uuid = this.meross.dev.uuid; + this.name = this.meross.dev.devName; + this.deviceType = this.meross.dev.deviceType; + this.fmwareVersion = this.meross.dev.fmwareVersion; + this.hdwareVersion = this.meross.dev.hdwareVersion; + + this.switchs = []; + this.sensors = []; + + this.meross.on('connected', () => { + this.connected(); + }); + + this.meross.on('data', (namespace, payload) => { + switch (namespace) { + case 'Appliance.Control.Toggle': + case 'Appliance.Control.ToggleX': + try { + if (!Array.isArray(payload.togglex)) { + payload.togglex = [payload.togglex]; + } + payload.togglex.map((info) => { + this.switchs[info.channel ? info.channel : 0].state = info.onoff; + }); + } catch (error) { + console.error(`[Device][${this.name}] ${error.message}`); + } + break; + + case 'Appliance.System.Online': + // Not mapped ability Appliance.System.Online for the device . + // { online: { status: 1 } } // ONLINE -- { online: { status: 2 } } // OFFLINE + + try { + if (payload.online.status === 1) { + console.log(`[Device][${this.name}] Online.`); + this.connected(); + return; + } + // console.log(this, payload); + console.log(`[Device][${this.name}] Offline. TODO.`); + } catch (error) { + console.error(`[Device][${this.name}] ${error.message}`); + } + + break; + + // Not mapped ability Appliance.System.Report for the device . + // { report: [ { type: '1', value: '0', timestamp: 1547321502 } ] } + + // Not mapped ability Appliance.Control.ConsumptionX for the device . + // { consumptionx: + // [ { date: '2019-01-01', time: 1546383598, value: 245 }, + // .... + // { date: '2018-12-23', time: 1545597881, value: 6 } ] } + + default: + console.warn(`Not mapped ability ${namespace} for the device ${this.uuid}.`); + console.debug(payload); + break; + } + }); + } + + connected() { + console.log(`[Device][${this.name}] Device ${this.uuid}: connected`); + // console.log(this.meross); + this.switchs = []; + this.meross.dev.channels.map((info, id) => { + this.switchs.push(new Switch(id, this, info)); + }); + + this.sensors = []; + this.meross.getSystemAbilities((error, abilities) => { + this.meross.dev.channels.map((info, id) => { + // console.log(this.name, abilities); + try { + if (error) { + throw new Error(error); + } + + if (!abilities.ability['Appliance.Control.ConsumptionX'] && !abilities.ability['Appliance.Control.Electricity']) { + throw new Error(`The channel ${id} have not abilities to be a sensor`); + } + + this.sensors[id] = []; + + if (abilities.ability['Appliance.Control.ConsumptionX']) { + this.sensors[id].push(new Sensor(id, this, info, 'consumption')); + } + + if (abilities.ability['Appliance.Control.Electricity']) { + this.meross.getControlElectricity((error, electricity) => { + if (error) { + throw new Error(error); + } + for(let ability in electricity.electricity) { + if (ability === 'channel') continue; + this.sensors[id].push(new Sensor(id, this, info, ability)); + } + }); + } + } catch (error) { + console.error(`[Device][${this.name}] ${error.message}`); + return; + } + + }); + + }); + } + + getInfo() { + return { + name: this.name, + model: this.type, + sw_version: this.fmwareVersion, + identifiers: [ + this.uuid + ] + } + } + +} + +module.exports = Device; diff --git a/hassio_meross/lib/meross.js b/hassio_meross/lib/meross.js new file mode 100644 index 0000000..06efbde --- /dev/null +++ b/hassio_meross/lib/meross.js @@ -0,0 +1,57 @@ +const MerossCloud = require('meross-cloud'); + +const Device = require('./device'); + +class Meross { + constructor(config) { + this.meross = new MerossCloud(config); + this.meross.setMaxListeners(0); + this.devices = {}; + + this.meross.connect((error) => { + if (error) { + this.connectionError(error); + } + }); + + this.meross.on('connected', (uuid) => { + this.connected(uuid); + }); + + this.meross.on('deviceInitialized', (uuid, deviceDef, device) => { + this.deviceInitialized(uuid, device); + }); + + this.meross.on('error', (uuid, error) => { + console.error(`Error from the device ${uuid}.`); + console.error(error); + }); + + this.meross.on('close', (uuid, error) => { + console.error(`Connection close from the device ${uuid}.`); + console.error(error); + }); + + this.meross.on('reconnect', (uuid) => { + console.error(`Reconnection from the device ${uuid}.`); + }); + + return this; + } + + connected(uuid) {} + + connectionError(error) { + if(error) { + console.error(error); + // process.exit(1); + } + } + + deviceInitialized(uuid, device) { + console.log(`Initizializing the device ${uuid}.`); + this.devices[uuid] = new Device(device); + } +} + +module.exports = Meross; \ No newline at end of file diff --git a/hassio_meross/lib/mqtt.js b/hassio_meross/lib/mqtt.js new file mode 100644 index 0000000..63aa48d --- /dev/null +++ b/hassio_meross/lib/mqtt.js @@ -0,0 +1,75 @@ +const mqttClient = require('mqtt'); +const q = require('q'); + +function mqtt(config = undefined) { + + if (config || !this.config) { + this.config = config; + } + + if(!this.config.username || !this.config.username.length) { + delete this.config.username; + } + + if(!this.config.password || !this.config.password.length) { + delete this.config.password; + } + + if (!this.client) { + this.config.clean = true; + this.client = mqttClient.connect(this.config); + + this.client.on('connect', () => { + console.log('[MQTT] Connect to the broker.'); + }); + + this.client.on('reconnect', () => { + console.log('[MQTT] Try to reconnect.'); + }); + + this.client.on('close', () => { + console.log('[MQTT] Connection closed.'); + }); + + this.client.on('offline', () => { + console.log('[MQTT] Server offline.'); + }); + + this.client.on('error', (error) => { + console.error('[MQTT] Error!.'); + console.log(error); + }); + + this.client.on('end', () => { + console.log('[MQTT] Connection ended.'); + }); + + this.client.on('message', (topic, message) => { + // console.log(`[MQTT] message on topic ${topic}.`); + }); + + } + + this.publish = (topic, message, options = {}) => { + const deferred = q.defer(); + + if (message) { + message = JSON.stringify(message); + } + + this.client.publish(topic, message, options, (error) => { + if (error) { + deferred.reject(new Error(error)); + } else { + deferred.resolve(); + } + }); + return deferred.promise; + } + + + return this; + +} + +module.exports = mqtt; \ No newline at end of file