-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rework raspberry pi driver with digitalio
- Loading branch information
Showing
10 changed files
with
406 additions
and
159 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,4 +11,5 @@ can.md | |
pyserial.md | ||
sdwire.md | ||
ustreamer.md | ||
raspberrypi.md | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# Raspberry Pi drivers | ||
|
||
Raspberry Pi drivers are a set of drivers for the various peripherals on Pi and similar single board computers. | ||
|
||
## Driver configuration | ||
```yaml | ||
export: | ||
my_serial: | ||
type: "jumpstarter_driver_raspberrypi.driver.DigitalIO" | ||
config: | ||
pin: "D3" | ||
``` | ||
### Config parameters | ||
| Parameter | Description | Type | Required | Default | | ||
|-----------|-------------|------|----------|---------| | ||
| pin | Name of the GPIO pin to connect to, in [Adafruit Blinka format](https://docs.circuitpython.org/projects/blinka/en/latest/index.html#usage-example) | str | yes | | | ||
## DigitalIOClient API | ||
```{eval-rst} | ||
.. autoclass:: jumpstarter_driver_raspberrypi.client.DigitalIOClient | ||
:members: | ||
``` | ||
## Examples | ||
Switch pin to push pull output and set output to high | ||
```{testcode} | ||
digitalioclient.switch_to_output(value=False, drive_mode=digitalio.DriveMode.PUSH_PULL) # default to low | ||
digitalioclient.value = True | ||
``` | ||
|
||
Switch pin to input with pull up and read value | ||
```{testcode} | ||
digitalioclient.switch_to_input(pull=digitalio.Pull.UP) | ||
print(digitalioclient.value) | ||
``` |
56 changes: 45 additions & 11 deletions
56
packages/jumpstarter-driver-raspberrypi/jumpstarter_driver_raspberrypi/client.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,55 @@ | ||
from dataclasses import dataclass | ||
|
||
from digitalio import DriveMode, Pull | ||
|
||
from jumpstarter.client import DriverClient | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class DigitalOutputClient(DriverClient): | ||
def off(self): | ||
self.call("off") | ||
class DigitalIOClient(DriverClient): | ||
"""DigitalIO (Digital GPIO) client class | ||
def on(self): | ||
self.call("on") | ||
Client methods for the DigitalIO driver. | ||
""" | ||
|
||
def switch_to_output(self, value: bool = False, drive_mode: DriveMode = DriveMode.PUSH_PULL) -> None: | ||
""" | ||
Switch pin to output mode with given default value and drive mode | ||
""" | ||
|
||
@dataclass(kw_only=True) | ||
class DigitalInputClient(DriverClient): | ||
def wait_for_active(self, timeout: float | None = None): | ||
self.call("wait_for_active", timeout) | ||
match drive_mode: | ||
case DriveMode.PUSH_PULL: | ||
drive_mode = 0 | ||
case DriveMode.OPEN_DRAIN: | ||
drive_mode = 1 | ||
case _: | ||
raise ValueError("unrecognized drive_mode") | ||
self.call("switch_to_output", value, drive_mode) | ||
|
||
def switch_to_input(self, pull: Pull | None = None) -> None: | ||
""" | ||
Switch pin to input mode with given pull up/down mode | ||
""" | ||
|
||
match pull: | ||
case None: | ||
pull = 0 | ||
case Pull.UP: | ||
pull = 1 | ||
case Pull.DOWN: | ||
pull = 2 | ||
case _: | ||
raise ValueError("unrecognized pull") | ||
self.call("switch_to_input", pull) | ||
|
||
@property | ||
def value(self) -> bool: | ||
""" | ||
Current value of the pin | ||
""" | ||
|
||
return self.call("get_value") | ||
|
||
def wait_for_inactive(self, timeout: float | None = None): | ||
self.call("wait_for_inactive", timeout) | ||
@value.setter | ||
def value(self, value: bool) -> None: | ||
self.call("set_value", value) |
125 changes: 95 additions & 30 deletions
125
packages/jumpstarter-driver-raspberrypi/jumpstarter_driver_raspberrypi/driver.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,63 +1,128 @@ | ||
from collections.abc import AsyncGenerator | ||
from dataclasses import dataclass, field | ||
from time import sleep | ||
|
||
from gpiozero import DigitalInputDevice, DigitalOutputDevice, InputDevice | ||
import board | ||
from digitalio import DigitalInOut, DriveMode, Pull | ||
from jumpstarter_driver_power.driver import PowerInterface, PowerReading | ||
|
||
from jumpstarter.driver import Driver, export | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class DigitalOutput(Driver): | ||
pin: int | str | ||
device: InputDevice = field(init=False) # Start as input | ||
class DigitalIO(Driver): | ||
pin: str | ||
device: DigitalInOut = field(init=False) | ||
|
||
@classmethod | ||
def client(cls) -> str: | ||
return "jumpstarter_driver_raspberrypi.client.DigitalOutputClient" | ||
return "jumpstarter_driver_raspberrypi.client.DigitalIOClient" | ||
|
||
def __post_init__(self): | ||
if hasattr(super(), "__post_init__"): | ||
super().__post_init__() | ||
# Initialize as InputDevice first | ||
self.device = InputDevice(pin=self.pin) | ||
# Defaults to input with no pull | ||
try: | ||
self.device = DigitalInOut(pin=getattr(board, self.pin)) | ||
except AttributeError as err: | ||
raise ValueError(f"Invalid pin name: {self.pin}") from err | ||
|
||
def close(self): | ||
if hasattr(self, "device"): | ||
self.device.close() | ||
super().close() | ||
self.device.deinit() | ||
|
||
@export | ||
def off(self): | ||
if not isinstance(self.device, DigitalOutputDevice): | ||
self.device.close() | ||
self.device = DigitalOutputDevice(pin=self.pin, initial_value=None) | ||
self.device.off() | ||
def switch_to_output(self, value: bool = False, drive_mode: int = 0) -> None: | ||
match drive_mode: | ||
case 0: | ||
drive_mode = DriveMode.PUSH_PULL | ||
case 1: | ||
drive_mode = DriveMode.OPEN_DRAIN | ||
case _: | ||
raise ValueError("unrecognized drive_mode") | ||
|
||
self.device.switch_to_output(value, drive_mode) | ||
|
||
@export | ||
def on(self): | ||
if not isinstance(self.device, DigitalOutputDevice): | ||
self.device.close() | ||
self.device = DigitalOutputDevice(pin=self.pin, initial_value=None) | ||
self.device.on() | ||
def switch_to_input(self, pull: int = 0) -> None: | ||
match pull: | ||
case 0: | ||
pull = None | ||
case 1: | ||
pull = Pull.UP | ||
case 2: | ||
pull = Pull.DOWN | ||
case _: | ||
raise ValueError("unrecognized pull") | ||
|
||
self.device.switch_to_input(pull) | ||
|
||
@export | ||
def set_value(self, value: bool) -> None: | ||
self.device.value = value | ||
|
||
@export | ||
def get_value(self) -> bool: | ||
return self.device.value | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class DigitalInput(Driver): | ||
pin: int | str | ||
device: DigitalInputDevice = field(init=False) | ||
class DigitalPowerSwitch(PowerInterface, DigitalIO): | ||
value: bool = False | ||
drive_mode: str = "PUSH_PULL" | ||
|
||
@classmethod | ||
def client(cls) -> str: | ||
return "jumpstarter_driver_raspberrypi.client.DigitalInputClient" | ||
def __post_init__(self): | ||
if hasattr(super(), "__post_init__"): | ||
super().__post_init__() | ||
|
||
try: | ||
self.device.switch_to_output(value=self.value, drive_mode=getattr(DriveMode, self.drive_mode)) | ||
except AttributeError as err: | ||
raise ValueError(f"Invalid drive mode: {self.drive_mode}") from err | ||
|
||
@export | ||
def on(self) -> None: | ||
self.device.value = True | ||
|
||
@export | ||
def off(self) -> None: | ||
self.device.value = False | ||
|
||
@export | ||
def read(self) -> AsyncGenerator[PowerReading, None]: | ||
raise NotImplementedError | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class DigitalPowerButton(PowerInterface, DigitalIO): | ||
value: bool = False | ||
drive_mode: str = "OPEN_DRAIN" | ||
on_press_seconds: int = 1 | ||
off_press_seconds: int = 5 | ||
|
||
def __post_init__(self): | ||
if hasattr(super(), "__post_init__"): | ||
super().__post_init__() | ||
self.device = DigitalInputDevice(pin=self.pin) | ||
|
||
try: | ||
self.device.switch_to_output(value=self.value, drive_mode=getattr(DriveMode, self.drive_mode)) | ||
except AttributeError as err: | ||
raise ValueError(f"Invalid drive mode: {self.drive_mode}") from err | ||
|
||
def press(self, seconds: int) -> None: | ||
self.device.value = self.value | ||
self.device.value = not self.value | ||
sleep(seconds) | ||
self.device.value = self.value | ||
|
||
@export | ||
def on(self) -> None: | ||
self.press(self.on_press_seconds) | ||
|
||
@export | ||
def wait_for_active(self, timeout: float | None = None): | ||
self.device.wait_for_active(timeout) | ||
def off(self) -> None: | ||
self.press(self.off_press_seconds) | ||
|
||
@export | ||
def wait_for_inactive(self, timeout: float | None = None): | ||
self.device.wait_for_inactive(timeout) | ||
def read(self) -> AsyncGenerator[PowerReading, None]: | ||
raise NotImplementedError |
61 changes: 31 additions & 30 deletions
61
packages/jumpstarter-driver-raspberrypi/jumpstarter_driver_raspberrypi/driver_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,47 @@ | ||
from concurrent.futures import ThreadPoolExecutor | ||
|
||
from gpiozero import Device | ||
from gpiozero.pins.mock import MockFactory | ||
|
||
from jumpstarter_driver_raspberrypi.driver import DigitalInput, DigitalOutput | ||
|
||
from jumpstarter.common.utils import serve | ||
|
||
Device.pin_factory = MockFactory() | ||
|
||
def test_drivers_gpio_digital_input(monkeypatch): | ||
monkeypatch.setenv("BLINKA_OS_AGNOSTIC", "1") | ||
|
||
def test_drivers_gpio_digital_output(): | ||
pin_factory = MockFactory() | ||
Device.pin_factory = pin_factory | ||
pin_number = 1 | ||
mock_pin = pin_factory.pin(pin_number) | ||
from digitalio import Pull | ||
|
||
instance = DigitalOutput(pin=pin_number) | ||
from jumpstarter_driver_raspberrypi.driver import DigitalIO | ||
|
||
assert not mock_pin.state | ||
with serve(DigitalIO(pin="Dx_INPUT_TOGGLE")) as client: | ||
client.switch_to_input(pull=Pull.UP) | ||
assert client.value | ||
assert not client.value | ||
assert client.value | ||
|
||
with serve(instance) as client: | ||
client.off() | ||
assert not mock_pin.state | ||
|
||
client.on() | ||
assert mock_pin.state | ||
def test_drivers_gpio_digital_output(monkeypatch): | ||
monkeypatch.setenv("BLINKA_OS_AGNOSTIC", "1") | ||
|
||
client.off() | ||
assert not mock_pin.state | ||
from digitalio import DriveMode | ||
|
||
from jumpstarter_driver_raspberrypi.driver import DigitalIO | ||
|
||
mock_pin.assert_states([False, True, False]) | ||
with serve(DigitalIO(pin="Dx_OUTPUT")) as client: | ||
client.switch_to_output(value=True, drive_mode=DriveMode.PUSH_PULL) | ||
client.value = True | ||
assert client.value | ||
client.value = False | ||
# Dx_OUTPUT is always True | ||
assert client.value | ||
|
||
|
||
def test_drivers_gpio_digital_input(): | ||
instance = DigitalInput(pin=4) | ||
def test_drivers_gpio_power(monkeypatch): | ||
monkeypatch.setenv("BLINKA_OS_AGNOSTIC", "1") | ||
|
||
with serve(instance) as client: | ||
with ThreadPoolExecutor() as pool: | ||
pool.submit(client.wait_for_active) | ||
instance.device.pin.drive_high() | ||
from jumpstarter_driver_raspberrypi.driver import DigitalPowerButton, DigitalPowerSwitch | ||
|
||
with ThreadPoolExecutor() as pool: | ||
pool.submit(client.wait_for_inactive) | ||
instance.device.pin.drive_low() | ||
with serve(DigitalPowerSwitch(pin="Dx_OUTPUT", drive_mode="PUSH_PULL")) as client: | ||
client.off() | ||
client.on() | ||
|
||
with serve(DigitalPowerButton(pin="Dx_OUTPUT", drive_mode="PUSH_PULL", off_press_seconds=1)) as client: | ||
client.off() | ||
client.on() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.