Skip to content

Commit

Permalink
Merge pull request #15 from myDevicesIoT/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
jburhenn authored Jul 18, 2018
2 parents 0ab787f + 0fcecf5 commit 1b71c6c
Show file tree
Hide file tree
Showing 19 changed files with 710 additions and 1,012 deletions.
7 changes: 1 addition & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,10 @@ Requirements

sudo apt-get install python3-setuptools

* libiw-dev - Wireless tools development file package. Via `apt-get` this can be installed with:
::

sudo apt-get install libiw-dev

All of the above packages can be installed at once via `apt-get` by running:
::

sudo apt-get install python3-pip python3-dev python3-setuptools libiw-dev
sudo apt-get install python3-pip python3-dev python3-setuptools

***************
Getting Started
Expand Down
2 changes: 1 addition & 1 deletion myDevices/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
This package contains the Cayenne agent, which is a full featured client for the Cayenne IoT project builder: https://cayenne.mydevices.com. It sends system information as well as sensor and actuator data and responds to actuator messages initiated from the Cayenne dashboard and mobile apps.
"""
__version__ = '2.0.0'
__version__ = '2.0.1'
16 changes: 11 additions & 5 deletions myDevices/cloud/apiclient.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from myDevices.requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor
import json
from myDevices.utils.logger import error, exception
from myDevices.system.hardware import Hardware
from myDevices.system.systeminfo import SystemInfo
from concurrent.futures import ThreadPoolExecutor

from myDevices import __version__
from myDevices.cloud import cayennemqtt
from myDevices.devices.digital.gpio import NativeGPIO
from myDevices.requests_futures.sessions import FuturesSession
from myDevices.system.hardware import Hardware
from myDevices.system.systeminfo import SystemInfo
from myDevices.utils.config import Config, APP_SETTINGS
from myDevices.utils.logger import error, exception


class CayenneApiClient:
def __init__(self, host):
Expand Down Expand Up @@ -56,6 +60,8 @@ def getMessageBody(self, inviteCode):
system_data = []
cayennemqtt.DataChannel.add(system_data, cayennemqtt.SYS_HARDWARE_MAKE, value=hardware.getManufacturer(), type='string', unit='utf8')
cayennemqtt.DataChannel.add(system_data, cayennemqtt.SYS_HARDWARE_MODEL, value=hardware.getModel(), type='string', unit='utf8')
config = Config(APP_SETTINGS)
cayennemqtt.DataChannel.add(system_data, cayennemqtt.AGENT_VERSION, value=config.get('Agent', 'Version', __version__))
system_info = SystemInfo()
capacity_data = system_info.getMemoryInfo((cayennemqtt.CAPACITY,))
capacity_data += system_info.getDiskInfo((cayennemqtt.CAPACITY,))
Expand Down
41 changes: 26 additions & 15 deletions myDevices/cloud/cayennemqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
COMMAND_TOPIC = 'cmd'
COMMAND_JSON_TOPIC = 'cmd.json'
COMMAND_RESPONSE_TOPIC = 'response'
JOBS_TOPIC = 'jobs.json'

# Data Channels
SYS_HARDWARE_MAKE = 'sys:hw:make'
Expand All @@ -30,6 +31,7 @@
AGENT_VERSION = 'agent:version'
AGENT_DEVICES = 'agent:devices'
AGENT_MANAGE = 'agent:manage'
AGENT_SCHEDULER = 'agent:scheduler'
DEV_SENSOR = 'dev'

# Channel Suffixes
Expand Down Expand Up @@ -68,7 +70,7 @@ def add(data_list, prefix, channel=None, suffix=None, value=None, type=None, uni
class CayenneMQTTClient:
"""Cayenne MQTT Client class.
This is the main client class for connecting to Cayenne and sending and recFUeiving data.
This is the main client class for connecting to Cayenne and sending and receiving data.
Standard usage:
* Set on_message callback, if you are receiving data.
Expand Down Expand Up @@ -150,6 +152,26 @@ def disconnect_callback(self, client, userdata, rc):
print("Reconnect failed, retrying")
time.sleep(5)

def transform_command(self, command, payload=[], channel=[]):
"""Transform a command message into an object.
command is the command object that will be transformed in place.
payload is an optional list of payload data items.
channel is an optional list containing channel and suffix data.
"""
if not payload:
command['payload'] = command.pop('value')
channel = command['channel'].split('/')[-1].split(';')
else:
if len(payload) > 1:
command['cmdId'] = payload[0]
command['payload'] = payload[1]
else:
command['payload'] = payload[0]
command['channel'] = channel[0]
if len(channel) > 1:
command['suffix'] = channel[1]

def message_callback(self, client, userdata, msg):
"""The callback for when a message is received from the server.
Expand All @@ -160,21 +182,10 @@ def message_callback(self, client, userdata, msg):
try:
message = {}
if msg.topic[-len(COMMAND_JSON_TOPIC):] == COMMAND_JSON_TOPIC:
payload = loads(msg.payload.decode())
message['payload'] = payload['value']
message['cmdId'] = payload['cmdId']
channel = payload['channel'].split('/')[-1].split(';')
message = loads(msg.payload.decode())
self.transform_command(message)
else:
payload = msg.payload.decode().split(',')
if len(payload) > 1:
message['cmdId'] = payload[0]
message['payload'] = payload[1]
else:
message['payload'] = payload[0]
channel = msg.topic.split('/')[-1].split(';')
message['channel'] = channel[0]
if len(channel) > 1:
message['suffix'] = channel[1]
self.transform_command(message, msg.payload.decode().split(','), msg.topic.split('/')[-1].split(';'))
debug('message_callback: {}'.format(message))
if self.on_message:
self.on_message(message)
Expand Down
124 changes: 97 additions & 27 deletions myDevices/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from myDevices.utils.logger import exception, info, warn, error, debug, logJson
from myDevices.sensors import sensors
from myDevices.system.hardware import Hardware
# from myDevices.cloud.scheduler import SchedulerEngine
from myDevices.cloud.scheduler import SchedulerEngine
from myDevices.cloud.download_speed import DownloadSpeed
from myDevices.cloud.updater import Updater
from myDevices.system.systemconfig import SystemConfig
Expand Down Expand Up @@ -110,16 +110,24 @@ def run(self):
if self.cloudClient.mqttClient.connected == False:
info('WriterThread mqttClient not connected')
continue
got_packet = False
topic, message = self.cloudClient.DequeuePacket()
if message:
# debug('WriterThread, topic: {} {}'.format(topic, message))
if not isinstance(message, str):
message = dumps(message)
self.cloudClient.mqttClient.publish_packet(topic, message)
message = None
self.cloudClient.writeQueue.task_done()
if topic or message:
got_packet = True
try:
if message or topic == cayennemqtt.JOBS_TOPIC:
# debug('WriterThread, topic: {} {}'.format(topic, message))
if not isinstance(message, str):
message = dumps(message)
self.cloudClient.mqttClient.publish_packet(topic, message)
message = None
except:
exception("WriterThread publish packet error")
finally:
if got_packet:
self.cloudClient.writeQueue.task_done()
except:
exception("WriterThread Unexpected error")
exception("WriterThread unexpected error")
return

def stop(self):
Expand Down Expand Up @@ -187,7 +195,7 @@ def Start(self):
if not self.Connect():
error('Error starting agent')
return
# self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
self.sensorsClient = sensors.SensorsClient()
self.readQueue = Queue()
self.writeQueue = Queue()
Expand All @@ -206,6 +214,8 @@ def Start(self):
TimerThread(self.SendSystemState, 30, 5)
self.updater = Updater(self.config)
self.updater.start()
events = self.schedulerEngine.get_scheduled_events()
self.EnqueuePacket(events, cayennemqtt.JOBS_TOPIC)
# self.sentHistoryData = {}
# self.historySendFails = 0
# self.historyThread = Thread(target=self.SendHistoryData)
Expand Down Expand Up @@ -359,8 +369,12 @@ def OnMessage(self, message):

def RunAction(self, action):
"""Run a specified action"""
debug('RunAction')
self.ExecuteMessage(action)
debug('RunAction: {}'.format(action))
result = True
command = action.copy()
self.mqttClient.transform_command(command)
result = self.ExecuteMessage(command)
return result

def ProcessMessage(self):
"""Process a message from the server"""
Expand All @@ -373,28 +387,36 @@ def ProcessMessage(self):
self.ExecuteMessage(messageObject)

def ExecuteMessage(self, message):
"""Execute an action described in a message object"""
"""Execute an action described in a message object
Returns: True if action was executed, False otherwise."""
result = False
if not message:
return
return result
channel = message['channel']
info('ExecuteMessage: {}'.format(message))
if channel in (cayennemqtt.SYS_POWER_RESET, cayennemqtt.SYS_POWER_HALT):
self.ProcessPowerCommand(message)
result = self.ProcessPowerCommand(message)
elif channel.startswith(cayennemqtt.DEV_SENSOR):
self.ProcessSensorCommand(message)
result = self.ProcessSensorCommand(message)
elif channel.startswith(cayennemqtt.SYS_GPIO):
self.ProcessGpioCommand(message)
result = self.ProcessGpioCommand(message)
elif channel == cayennemqtt.AGENT_DEVICES:
self.ProcessDeviceCommand(message)
result = self.ProcessDeviceCommand(message)
elif channel in (cayennemqtt.SYS_I2C, cayennemqtt.SYS_SPI, cayennemqtt.SYS_UART, cayennemqtt.SYS_ONEWIRE):
self.ProcessConfigCommand(message)
result = self.ProcessConfigCommand(message)
elif channel == cayennemqtt.AGENT_MANAGE:
self.ProcessAgentCommand(message)
result = self.ProcessAgentCommand(message)
elif channel == cayennemqtt.AGENT_SCHEDULER:
result = self.ProcessSchedulerCommand(message)
else:
info('Unknown message')
return result

def ProcessPowerCommand(self, message):
"""Process command to reboot/shutdown the system"""
"""Process command to reboot/shutdown the system
Returns: True if command was processed, False otherwise."""
error_message = None
try:
self.EnqueueCommandResponse(message, error_message)
Expand All @@ -405,6 +427,7 @@ def ProcessPowerCommand(self, message):
cayennemqtt.DataChannel.add(data, message['channel'], value=1)
self.EnqueuePacket(data)
self.writeQueue.join()
info('Calling execute: {}'.format(commands[message['channel']]))
output, result = executeCommand(commands[message['channel']])
debug('ProcessPowerCommand: {}, result: {}, output: {}'.format(message, result, output))
if result != 0:
Expand All @@ -416,9 +439,13 @@ def ProcessPowerCommand(self, message):
data = []
cayennemqtt.DataChannel.add(data, message['channel'], value=0)
self.EnqueuePacket(data)
raise ExecuteMessageError(error_message)
return error_message == None

def ProcessAgentCommand(self, message):
"""Process command to manage the agent state"""
"""Process command to manage the agent state
Returns: True if command was processed, False otherwise."""
error = None
try:
if message['suffix'] == 'uninstall':
Expand All @@ -439,9 +466,14 @@ def ProcessAgentCommand(self, message):
except Exception as ex:
error = '{}: {}'.format(type(ex).__name__, ex)
self.EnqueueCommandResponse(message, error)
if error:
raise ExecuteMessageError(error)
return error == None

def ProcessConfigCommand(self, message):
"""Process system configuration command"""
"""Process system configuration command
Returns: True if command was processed, False otherwise."""
error = None
try:
value = 1 - int(message['payload']) #Invert the value since the config script uses 0 for enable and 1 for disable
Expand All @@ -453,9 +485,12 @@ def ProcessConfigCommand(self, message):
except Exception as ex:
error = '{}: {}'.format(type(ex).__name__, ex)
self.EnqueueCommandResponse(message, error)

return error == None

def ProcessGpioCommand(self, message):
"""Process GPIO command"""
"""Process GPIO command
Returns: True if command was processed, False otherwise."""
error = None
try:
channel = int(message['channel'].replace(cayennemqtt.SYS_GPIO + ':', ''))
Expand All @@ -466,9 +501,12 @@ def ProcessGpioCommand(self, message):
except Exception as ex:
error = '{}: {}'.format(type(ex).__name__, ex)
self.EnqueueCommandResponse(message, error)
return error == None

def ProcessSensorCommand(self, message):
"""Process sensor command"""
"""Process sensor command
Returns: True if command was processed, False otherwise."""
error = None
try:
sensor_info = message['channel'].replace(cayennemqtt.DEV_SENSOR + ':', '').split(':')
Expand All @@ -483,9 +521,12 @@ def ProcessSensorCommand(self, message):
except Exception as ex:
error = '{}: {}'.format(type(ex).__name__, ex)
self.EnqueueCommandResponse(message, error)
return error == None

def ProcessDeviceCommand(self, message):
"""Process a device command to add/edit/remove a sensor"""
"""Process a device command to add/edit/remove a sensor
Returns: True if command was processed, False otherwise."""
error = None
try:
payload = message['payload']
Expand All @@ -504,9 +545,38 @@ def ProcessDeviceCommand(self, message):
except Exception as ex:
error = '{}: {}'.format(type(ex).__name__, ex)
self.EnqueueCommandResponse(message, error)
return error == None

def ProcessSchedulerCommand(self, message):
"""Process command to add/edit/remove a scheduled action
Returns: True if command was processed, False otherwise."""
error = None
try:
if message['suffix'] == 'add':
result = self.schedulerEngine.add_scheduled_event(message['payload'], True)
elif message['suffix'] == 'edit':
result = self.schedulerEngine.update_scheduled_event(message['payload'])
elif message['suffix'] == 'delete':
result = self.schedulerEngine.remove_scheduled_event(message['payload'])
elif message['suffix'] == 'get':
events = self.schedulerEngine.get_scheduled_events()
self.EnqueuePacket(events, cayennemqtt.JOBS_TOPIC)
else:
error = 'Unknown schedule command: {}'.format(message['suffix'])
debug('ProcessSchedulerCommand result: {}'.format(result))
if result is False:
error = 'Schedule command failed'
except Exception as ex:
error = '{}: {}'.format(type(ex).__name__, ex)
self.EnqueueCommandResponse(message, error)
return error == None

def EnqueueCommandResponse(self, message, error):
"""Send response after processing a command message"""
if not 'cmdId' in message:
# If there is no command id we assume this is a scheduled command and don't send a response message.
return
debug('EnqueueCommandResponse error: {}'.format(error))
if error:
response = 'error,{}={}'.format(message['cmdId'], error)
Expand Down
2 changes: 1 addition & 1 deletion myDevices/cloud/dbmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,6 @@ def test():
cursor = connection.cursor()
except Exception as ex:
error('DbManager failed to initialize: ' + str(ex))
DbManager.CreateTable('scheduled_settings', "id TEXT PRIMARY KEY, data TEXT", ['id', 'data'])
# DbManager.CreateTable('scheduled_events', "id TEXT PRIMARY KEY, data TEXT", ['id', 'data'])
DbManager.CreateTable('disabled_sensors', "id TEXT PRIMARY KEY", ['id'])
DbManager.CreateTable('historical_averages', "id INTEGER PRIMARY KEY, data TEXT, count INTEGER, start TIMESTAMP, end TIMESTAMP, interval TEXT, send TEXT, count_sensor TEXT", ['id', 'data', 'count', 'start', 'end', 'interval', 'send', 'count_sensor'])
Loading

0 comments on commit 1b71c6c

Please sign in to comment.