Skip to content

Commit

Permalink
switch to functional design
Browse files Browse the repository at this point in the history
  • Loading branch information
johnwaalsh committed Jan 2, 2025
1 parent 42657bc commit ebde1dd
Showing 1 changed file with 89 additions and 79 deletions.
168 changes: 89 additions & 79 deletions src/phyto_arm/src/digital_logger_node.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,109 @@
#!/usr/bin/env python3
"""
Functionality for monitoring and controlling a Digital Loggers Web Power Switch Pro model.
"""
import base64
import urllib.request

import rospy

from phyto_arm.msg import OutletStatus

# Global variables relevant to digital logger control. These variables are defined once the digital
# logger node is initialized.
AUTH = ''
ADDRESS = ''
OUTLETS = ''
OUTLET_NAMES = ''

class DigitalLogger:
def __init__(self):
rospy.init_node('digital_logger')

# subscribe to the digital logger control topic
rospy.Subscriber('/digital_logger/control', OutletStatus, self.control_outlet)
def control_outlet(msg):
"""
Send the given msg to the digital logger as an HTTP request.
"""

self.username = rospy.get_param('~username')
self.password = rospy.get_param('~password')
self.address = rospy.get_param('~address')
self.outlets = rospy.get_param('~outlets')
self.outlet_names = {outlet['name']: int(outlet['outlet']) for outlet in self.outlets}
outlet_num = OUTLET_NAMES.get(msg.name)

self.auth = f'Basic {base64.b64encode(f"{self.username}:{self.password}".encode()).decode()}'
data = f'value={str(msg.is_active).lower()}'
url = f'http://{ADDRESS}/restapi/relay/outlets/{outlet_num}/state/'

self.outlet_publishers = []
for outlet_num, _ in enumerate(self.outlets):
self.outlet_publishers.append(rospy.Publisher(f'/digital_logger/outlet/{outlet_num}/status/', OutletStatus, queue_size=10))
# Create a PUT request
req = urllib.request.Request(url, data=data.encode("utf-8"), method="PUT")
req.add_header("Authorization", AUTH)
req.add_header("X-CSRF", 'x')

# Monitor outlets at 1Hz
self.rate = rospy.Rate(1)

def run(self):
"""
Run the digital logger node. Publishes outlet statuses at
/digital_logger/outlets/{outlet num}/status. The outlets can be controlled by publishing a
OutletStatus message to /digital_logger/control.
"""
while not rospy.is_shutdown():
# send a status request for each available outlet
for outlet_index, _ in enumerate(self.outlets):
# Construct request
url = f'http://{self.address}/restapi/relay/outlets/{outlet_index}/state/'
request = urllib.request.Request(url)
request.add_header("Authorization", self.auth)

try:
# Send the GET request
with urllib.request.urlopen(request) as response:
# Read and decode the response
response_data = response.read().decode('utf-8')
except urllib.error.HTTPError as http_err:
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
except urllib.error.URLError as url_err:
raise ValueError(f"URL Error: {url_err.reason}") from url_err

# publish the status of each outlet to its specific topic
outlet_status = OutletStatus()
outlet_status.name = self.outlets[outlet_index]['name']
# DL API uses 'true' and 'false' to denote outlet status, which need to be converted to Python bools
outlet_status.is_active = response_data == 'true'

self.outlet_publishers[outlet_index].publish(outlet_status)

self.rate.sleep()

def control_outlet(self, msg):
"""
Send the given msg to the digital logger as an HTTP request.
"""

outlet_num = self.outlet_names.get(msg.name)

data = f'value={str(msg.is_active).lower()}'
url = f'http://{self.address}/restapi/relay/outlets/{outlet_num}/state/'

# Create a PUT request
req = urllib.request.Request(url, data=data.encode("utf-8"), method="PUT")
req.add_header("Authorization", self.auth)
req.add_header("X-CSRF", 'x')

try:
# Send the PUT request
response = urllib.request.urlopen(req)
except urllib.error.HTTPError as http_err:
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
except urllib.error.URLError as url_err:
raise ValueError(f"URL Error: {url_err.reason}") from url_err

result = response.read().decode('utf-8')

rospy.loginfo(f'sent status={str(msg.is_active).lower()} to {self.address}:{url}, received: code {response.status} : {result}')
try:
# Send the PUT request
response = urllib.request.urlopen(req)
except urllib.error.HTTPError as http_err:
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
except urllib.error.URLError as url_err:
raise ValueError(f"URL Error: {url_err.reason}") from url_err

result = response.read().decode('utf-8')

rospy.loginfo(f'sent status={str(msg.is_active).lower()} to {ADDRESS}:{url}, received: code {response.status} : {result}')


def run_digital_logger_node():
"""
Run the digital logger node. Publishes outlet statuses at
/digital_logger/OUTLETS/{outlet num}/status. The OUTLETS can be controlled by publishing a
OutletStatus message to /digital_logger/control.
"""
global AUTH, ADDRESS, OUTLETS, OUTLET_NAMES

rospy.init_node('digital_logger')

rospy.init_node('digital_logger')
username = rospy.get_param('~username')
password = rospy.get_param('~password')
AUTH = f"Basic {base64.b64encode(f'{username}:{password}'.encode()).decode()}"
ADDRESS = rospy.get_param('~address')
OUTLETS = rospy.get_param('~outlets')
OUTLET_NAMES = {outlet['name']: int(outlet['outlet']) for outlet in OUTLETS}

# subscribe to the digital logger control topic
rospy.Subscriber('/digital_logger/control', OutletStatus, control_outlet)

outlet_publishers = []
for outlet_num, _ in enumerate(OUTLETS):
outlet_publishers.append(rospy.Publisher(f'/digital_logger/outlet/{outlet_num}/status/', OutletStatus, queue_size=10))

# Monitor outlets at 1Hz
rate = rospy.Rate(1)

while not rospy.is_shutdown():
# send a status request for each available outlet
for outlet_index, _ in enumerate(OUTLETS):
# Construct request
url = f'http://{ADDRESS}/restapi/relay/outlets/{outlet_index}/state/'
request = urllib.request.Request(url)
request.add_header("Authorization", AUTH)

try:
# Send the GET request
with urllib.request.urlopen(request) as response:
# Read and decode the response
response_data = response.read().decode('utf-8')
except urllib.error.HTTPError as http_err:
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
except urllib.error.URLError as url_err:
raise ValueError(f"URL Error: {url_err.reason}") from url_err

# publish the status of each outlet to its specific topic
outlet_status = OutletStatus()
outlet_status.name = OUTLETS[outlet_index]['name']
# DL API uses 'true' and 'false' to denote outlet status, which need to be converted to Python booleans
outlet_status.is_active = response_data == 'true'

outlet_publishers[outlet_index].publish(outlet_status)

rate.sleep()


if __name__ == '__main__':
try:
digital_logger = DigitalLogger()
digital_logger.run()
run_digital_logger_node()
except rospy.ROSInterruptException:
pass

0 comments on commit ebde1dd

Please sign in to comment.