Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microcontroller script improvements #37

Merged
merged 3 commits into from
Oct 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions build/figurines/microcontroller_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@

# =========================== IMPORTS ================================== #

import serial.tools.list_ports
import threading
import time
from time import sleep
import requests
import socket
from json import JSONDecodeError
import json
import logging

import serial.tools.list_ports
import requests
from serial import SerialException, SerialTimeoutException
# =========================== GLOBAL VALUES ================================== #

POLL_DELAY = 15
COM_PORT_PREFIX = "cu.usbmodem" # usbmodem1101 and usbmodem2101
MAX_CONNECTION_COUNT = 10

global connect_status, read_state, port, serialPort
global serial_count # Optimized counter, used to clear the serial input buffer
serial_count = 0
Expand All @@ -30,8 +33,8 @@

# =========================== SERIAL HANDLING ================================== #

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)

def initialise_serial(port):
global serialPort, connect_status
Expand Down Expand Up @@ -106,8 +109,11 @@ def check_presence(port, interval=0.01):


def start_thread():
"""Starts a thread that checks serial port presence"""
global port, connect_status
_LOGGER.info("Starting thread at port %s", port)
# target - callable object invoked by the start() or run() command
# args - arguments passed to check_presence command
port_controller = threading.Thread(target=check_presence, args=(port, 0.1))
port_controller.daemon = True
port_controller.start()
Expand All @@ -117,31 +123,69 @@ def start_thread():


def main():
"""Main script loop that will attempt to connect to the microcontroller.
If so, it will send data via serial to move the servo motors
"""

global port, connect_status
figurine_status = {}
# figurine_status = {}

connection_count = 0

while True:
if connection_count >= MAX_CONNECTION_COUNT:
_LOGGER.error("Could not find valid serial connection and timed out, try again")
return

if not connect_status:
port_device = None
port_device = None
ports = serial.tools.list_ports.comports()
for port in ports:
if COM_PORT_PREFIX in port.name:
_LOGGER.info("Trying port - %s", port.device)
if port.name is not None and COM_PORT_PREFIX in port.name:
port_device = port[0]
break
connection_count += 1
if port_device is not None:
initialise_serial(port_device)
start_thread()
try:
initialise_serial(port_device)
except ValueError as e:
_LOGGER.error("Parameter given is out of range")
_LOGGER.debug(e)
except SerialException as e:
# SerialException derives from IOError
_LOGGER.error("Device can not be found or configured")
_LOGGER.debug(e)

try:
start_thread()
except RuntimeError as e:
_LOGGER.error("start() method called more than once for same thread object")
_LOGGER.debug(e)

else:
connect_status = False
continue
if serialPort.is_open:
try:
response = json.loads(
requests.get('http://127.0.0.1:8000/figurines', timeout=5).text
)
if response != figurine_status:
figurine_status = response
send_to_controller(figurine_status)
response_body = requests.get('http://127.0.0.1:8000/figurines', timeout=5).text
if response_body is not None:
try:
response = json.loads(response_body)
except JSONDecodeError as e:
_LOGGER.error("Not valid JSON document")
_LOGGER.debug(e)
else:
_LOGGER.Error("Got nothing from the api")
continue
try:
send_to_controller(response)
except SerialTimeoutException as e:
_LOGGER.error("Write timeout")
_LOGGER.debug(e)
# if response != figurine_status:
# figurine_status = response
# send_to_controller(figurine_status)
sleep(POLL_DELAY)
except requests.exceptions.ConnectionError as e:
_LOGGER.info("Error making HTTP request")
Expand Down