Skip to content

Commit

Permalink
Add connect and disconnect as context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
rickwu666666 committed Apr 15, 2024
1 parent 64a73fa commit 222125c
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 101 deletions.
133 changes: 91 additions & 42 deletions contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/check_gpio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#!/usr/bin/env python3
import argparse
import os
from checkbox_support.snap_utils.snapd import Snapd
from contextlib import contextmanager
# from checkbox_support.snap_utils.snapd import Snapd
from snapd import Snapd
from checkbox_support.snap_utils.system import get_gadget_snap
import requests

Expand All @@ -25,10 +27,7 @@ def list_gpio_slots(snapd, gadget_name):
for slots in snapd.interfaces()["slots"]:
if slots["interface"] == "gpio" and slots["snap"] == gadget_name:
gpio_slot[slots["slot"]] = {"number": slots["attrs"]["number"]}
if gpio_slot:
return gpio_slot
else:
raise SystemExit("Error: Can not find any GPIO slot")
return gpio_slot


def parse_config(config):
Expand Down Expand Up @@ -88,51 +87,99 @@ def check_gpio_list(gpio_list, config):
SystemExit: If any expected GPIO slot is not defined in the gadget
snap.
"""
expect_port = parse_config(config)
for gpio in gpio_list.values():
if gpio["number"] in expect_port:
expect_port.remove(gpio["number"])
if expect_port:
for gpio_slot in expect_port:
print(
"Error: Slot of GPIO {} is not defined in gadget snap".format(
gpio_slot)
if gpio_list:
expect_port = parse_config(config)
for gpio in gpio_list.values():
if gpio["number"] in expect_port:
expect_port.remove(gpio["number"])
if expect_port:
for gpio_slot in expect_port:
print(
"Error: Slot of GPIO {} is not defined in gadget snap".
format(gpio_slot)
)
raise SystemExit(1)
else:
print("All expected GPIO slots have been defined in gadget snap.")
else:
raise SystemExit("Error: No any GPIO slots existed!")


@contextmanager
def interface_test(gpio_slot, gadget_name, timeout=60):
snap = os.environ['SNAP_NAME']
timeout = int(os.environ.get('SNAPD_TASK_TIMEOUT', timeout))
try:
connect_interface(gadget_name,
gpio_slot,
snap,
timeout)
yield
finally:
disconnect_interface(gadget_name,
gpio_slot,
snap,
timeout)


def connect_interface(gadget_name,
gpio_slot,
snap,
timeout):
"""
Connect GPIO plugs of checkbox to GPIO slots of gadget snap.
Args:
gpio_slot: A GPIO slot information.
gadget_name: The name of the gadget snap.
Raises:
SystemExit: If failed to connect any GPIO.
"""

# Get the snap name of checkbox
print("Attempting connect GPIO to {}:{}".format(gadget_name, gpio_slot))
try:
Snapd(task_timeout=timeout).connect(
gadget_name,
gpio_slot,
snap,
"gpio"
)
print("Success")
except requests.HTTPError:
print("Failed to connect {}".format(gpio_slot))
raise SystemExit(1)
else:
print("All expected GPIO slots have been defined in gadget snap.")


def connect_gpio(gpio_slots, gadget_name):
def disconnect_interface(gadget_name,
gpio_slot,
snap,
timeout):
"""
Connect GPIO plugs of checkbox to GPIO slots of gadget snap.
Args:
gpio_slots: A dictionary containing GPIO slot information.
gpio_slot: A GPIO slot information.
gadget_name: The name of the gadget snap.
Raises:
SystemExit: If failed to connect any GPIO.
"""

exit_code = 0
# Get the snap name of checkbox
snap = os.environ['SNAP_NAME']
timeout = int(os.environ.get('SNAPD_TASK_TIMEOUT', 60))
for gpio_num in gpio_slots.keys():
print("Attempting connect gpio to {}:{}".format(gadget_name, gpio_num))
try:
Snapd(task_timeout=timeout).connect(
gadget_name,
gpio_num,
snap,
"gpio"
)
print("Success")
except requests.HTTPError:
print("Failed to connect {}".format(gpio_num))
exit_code = 1
if exit_code == 1:
print("Attempting disconnect GPIO slot {}:{}".
format(gadget_name, gpio_slot))
try:
Snapd(task_timeout=timeout).disconnect(
gadget_name,
gpio_slot,
snap,
"gpio"
)
print("Success")
except requests.HTTPError:
print("Failed to disconnect {}".format(gpio_slot))
raise SystemExit(1)


Expand All @@ -158,7 +205,7 @@ def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(
dest='action',
help="Action in check-gpio, check-node, connect and dump",
help="Action in check-gpio, check-node and dump",
)
check_gpio_subparser = subparsers.add_parser("check-gpio")
check_gpio_subparser.add_argument(
Expand All @@ -176,9 +223,12 @@ def main():
required=True,
help="GPIO number to check if node exported",
)
subparsers.add_parser(
"connect",
help="Connect checkbox GPIO plug and gadget GPIO slots "
check_node_subparser.add_argument(
"-s",
"--slot",
type=str,
required=True,
help="GPIO slot to connect.",
)
subparsers.add_parser(
"dump",
Expand All @@ -190,8 +240,6 @@ def main():
gpio_slots = list_gpio_slots(snapd, gadget_name)
if args.action == "check-gpio":
check_gpio_list(gpio_slots, args.config)
if args.action == "connect":
connect_gpio(gpio_slots, gadget_name)
if args.action == "dump":
for x in gpio_slots:
print(
Expand All @@ -200,7 +248,8 @@ def main():
)
)
if args.action == "check-node":
check_node(args.num)
with interface_test(args.slot, gadget_name):
check_node(args.num)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import patch, MagicMock, call
import requests
import check_gpio
import os


class TestCheckGpio(unittest.TestCase):
Expand Down Expand Up @@ -65,12 +66,11 @@ def test_list_gpio_slots_not_exist(self):
"attrs": {}},
]
}
expected_result = "Error: Can not find any GPIO slot"
with self.assertRaises(SystemExit) as err:
check_gpio.list_gpio_slots(snapd_mock, gadget_name)
self.assertEqual(err.exception.args[0], expected_result)
expected_result = {}
self.assertEqual(check_gpio.list_gpio_slots(snapd_mock, gadget_name),
expected_result)

@patch('builtins.print') # Mock print function to prevent actual printing
@patch('builtins.print')
def test_check_gpio_list_all_defined(self, mock_print):
gpio_list = {
1: {"number": 499},
Expand All @@ -85,7 +85,7 @@ def test_check_gpio_list_all_defined(self, mock_print):
mock_print.assert_called_with(
"All expected GPIO slots have been defined in gadget snap.")

@patch('builtins.print') # Mock print function to prevent actual printing
@patch('builtins.print')
def test_check_gpio_list_missing(self, mock_print):
gpio_list = {
1: {"number": 499},
Expand All @@ -105,65 +105,91 @@ def test_check_gpio_list_missing(self, mock_print):
# Assert that SystemExit is raised with exit code 1
self.assertEqual(context.exception.code, 1)

@patch('check_gpio.os.environ')
@patch('builtins.print')
@patch('check_gpio.Snapd')
def test_connect_gpio_success(self, mock_snapd, mock_environ):
mock_environ.__getitem__.side_effect = lambda x: {
'SNAP_NAME': 'checkbox_snap',
'SNAPD_TASK_TIMEOUT': '30'
}[x]
def test_connect_interface_success(self, mock_snapd, mock_print):
mock_snapd.return_value.connect.side_effect = None

gpio_slots = {
"gpio-499": {"number": 499},
"gpio-500": {"number": 500}
}
gpio_slot = "gpio-499"
gadget_name = "gadget_snap"
snap = "checkbox_snap"
timeout = 30
check_gpio.connect_interface(gadget_name,
gpio_slot,
snap,
timeout)

check_gpio.connect_gpio(gpio_slots, gadget_name)

# Assert that connect is called for each GPIO slot
expected_calls = [call(gadget_name,
'gpio-499',
'checkbox_snap',
'gpio'),
call(gadget_name,
'gpio-500',
'checkbox_snap',
'gpio')
]
gpio_slot,
snap,
'gpio')]
mock_snapd.return_value.connect.assert_has_calls(expected_calls)
mock_print.assert_called_with("Success")

@patch('check_gpio.os.environ')
@patch('builtins.print')
@patch('check_gpio.Snapd')
def test_connect_gpio_fail(self, mock_snapd, mock_environ):
mock_environ.__getitem__.side_effect = lambda x: {
'SNAP_NAME': 'checkbox_snap',
'SNAPD_TASK_TIMEOUT': '30'
}[x]
def test_connect_interface_fail(self, mock_snapd, mock_print):
mock_snapd.return_value.connect.side_effect = requests.HTTPError

gpio_slots = {
"gpio-499": {"number": 499},
"gpio-500": {"number": 500}
}
gpio_slot = "gpio-499"
gadget_name = "gadget_snap"
snap = "checkbox_snap"
timeout = 30
with self.assertRaises(SystemExit) as err:
check_gpio.connect_gpio(gpio_slots, gadget_name)
check_gpio.connect_interface(gadget_name,
gpio_slot,
snap,
timeout)

# Assert that connect is called for each GPIO slot
expected_calls = [call(gadget_name,
'gpio-499',
'checkbox_snap',
'gpio'),
call(gadget_name,
'gpio-500',
'checkbox_snap',
gpio_slot,
snap,
'gpio')]
mock_snapd.return_value.connect.assert_has_calls(expected_calls)
mock_print.assert_called_with("Failed to connect gpio-499")
self.assertEqual(err.exception.code, 1)

@patch('builtins.print') # Mock print function to prevent actual printing
@patch('builtins.print')
@patch('check_gpio.Snapd')
def test_disconnect_interface_success(self, mock_snapd, mock_print):
mock_snapd.return_value.disconnect.side_effect = None
gpio_slot = "gpio-499"
gadget_name = "gadget_snap"
snap = "checkbox_snap"
timeout = 30
check_gpio.disconnect_interface(gadget_name,
gpio_slot,
snap,
timeout)

expected_calls = [call(gadget_name,
gpio_slot,
snap,
'gpio')]
mock_snapd.return_value.disconnect.assert_has_calls(expected_calls)
mock_print.assert_called_with("Success")

@patch('builtins.print')
@patch('check_gpio.Snapd')
def test_disconnect_interface_fail(self, mock_snapd, mock_print):
mock_snapd.return_value.disconnect.side_effect = requests.HTTPError
gpio_slot = "gpio-499"
gadget_name = "gadget_snap"
snap = "checkbox_snap"
timeout = 30
with self.assertRaises(SystemExit) as err:
check_gpio.disconnect_interface(gadget_name,
gpio_slot,
snap,
timeout)

expected_calls = [call(gadget_name,
gpio_slot,
snap,
'gpio')]
mock_snapd.return_value.disconnect.assert_has_calls(expected_calls)
mock_print.assert_called_with("Failed to disconnect gpio-499")
self.assertEqual(err.exception.code, 1)

@patch('builtins.print')
def test_check_node_exists(self, mock_print):
# Mocking os.path.exists to return True
with patch('os.path.exists', return_value=True):
Expand All @@ -180,6 +206,27 @@ def test_check_node_not_exist(self):
self.assertEqual(context.exception.args[0],
"GPIO node of 499 not exist!")

@patch.dict(os.environ, {'SNAP_NAME': 'checkbox_snap'})
@patch.dict(os.environ, {'SNAPD_TASK_TIMEOUT': '30'})
@patch('check_gpio.connect_interface')
@patch('check_gpio.disconnect_interface')
def test_interface_test(self,
mock_disconnect,
mock_connect):
gadget_name = "gadget"
gpio_slot = "gpio-499"
mock_connect.side_effect = None
mock_disconnect.side_effect = None
with check_gpio.interface_test(gpio_slot, gadget_name):
mock_connect.assert_called_once_with(gadget_name,
gpio_slot,
'checkbox_snap',
30)
mock_disconnect.assert_called_once_with(gadget_name,
gpio_slot,
'checkbox_snap',
30)


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 222125c

Please sign in to comment.