Skip to content

Commit

Permalink
[libgpiod] test_all_input sample test working, cleaned up GPIO
Browse files Browse the repository at this point in the history
initialization part in gpio.py
  • Loading branch information
sinha-shreyash authored and ravi-rahul committed Jul 12, 2024
1 parent 58fb113 commit 1972d07
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 450 deletions.
130 changes: 67 additions & 63 deletions lib/python/TI/GPIO/gpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

from TI.GPIO import gpio_event as event
from TI.GPIO import gpio_pin_data
import TI.GPIO as GPIO
import os
import time
import warnings
from threading import Thread
import threading

try:
import thread
Expand Down Expand Up @@ -89,9 +88,10 @@
# GPIOD variables
MAX_EVENTS = 64
line_request = None
channelLineRequest = dict()
channelLineRequest = defaultdict(lambda: None)
eventCallbacks = defaultdict(list)


def _validate_mode_set():
if _gpio_mode is None:
raise RuntimeError(
Expand Down Expand Up @@ -135,6 +135,7 @@ def _channels_to_infos(channels, need_gpio=False, need_pwm=False):
for c in _make_iterable(channels)
]


def channel_configuration(ch_info):
"""Return the current configuration of a channel as reported by sysfs. Any
of IN, OUT, PWM, or None may be returned."""
Expand All @@ -157,20 +158,21 @@ def channel_configuration(ch_info):
else:
return None


def _app_channel_configuration(ch_info):
"""Return the current configuration of a channel as requested by this
module in this process. Any of IN, OUT, or None may be returned."""

return _channel_configuration.get(ch_info.channel, None)

def _reconfigure_lines(line_request, ch_info, direction, initial):

def _reconfigure_lines(line_request, ch_info, direction, initial):
gpiod_direction = Direction.AS_IS
gpiod_value = Value.INACTIVE

if (direction == OUT):
if direction == OUT:
gpiod_direction = Direction.OUTPUT
if (initial == HIGH):
if initial == HIGH:
gpiod_value = Value.ACTIVE

else:
Expand All @@ -185,63 +187,65 @@ def _reconfigure_lines(line_request, ch_info, direction, initial):
)
_channel_configuration[ch_info.channel] = direction

def _setup_single_out(ch_info, initial=None):

def _setup_single_out(ch_info, initial=None):
chip_path = "/dev/gpiochip" + str(ch_info.gpiochip)
gpiod_val = Value.INACTIVE

if (initial == HIGH):
if initial == HIGH:
gpiod_val = Value.ACTIVE

line_request = gpiod.request_lines(
chip_path,
consumer= None,
config={
ch_info.gpio: gpiod.LineSettings(
direction=Direction.OUTPUT, output_value=gpiod_val
)
},
)
chip_path,
consumer=None,
config={
ch_info.gpio: gpiod.LineSettings(
direction=Direction.OUTPUT, output_value=gpiod_val
)
},
)

channelLineRequest[ch_info.gpio] = line_request
_channel_configuration[ch_info.channel] = OUT


def _setup_single_in(ch_info):

chip_path = "/dev/gpiochip" + str(ch_info.gpiochip)
line_request = gpiod.request_lines(
chip_path,
consumer=None,
config={ch_info.gpio: gpiod.LineSettings(direction=Direction.INPUT)},
)
chip_path,
consumer=None,
config={ch_info.gpio: gpiod.LineSettings(direction=Direction.INPUT)},
)

channelLineRequest[ch_info.gpio] = line_request
_channel_configuration[ch_info.channel] = IN


def callback_handler (channel):
def callback_handler(channel):
ch_info = _channel_to_info(channel, need_gpio=True)
while _run_loop:
noEvents = channelLineRequest[ch_info.gpio].read_edge_events(MAX_EVENTS)

for event in noEvents:
for callback in eventCallbacks[ch_info.gpio]:
callback( channel )
callback(channel)


def start_thread( channel ):
def start_thread(channel):
global _run_loop
_run_loop = True
_run_loop = True
thread.start_new_thread(callback_handler, (channel,))


def stop_thread():
_run_loop = False


def event_cleanup ( ch_info ):
def event_cleanup(ch_info):
stop_thread()
eventCallbacks[ch_info.gpio].clear()


def _pwm_path(ch_info):
return ch_info.pwm_chip_dir + "/pwm" + str(ch_info.pwm_id)

Expand Down Expand Up @@ -330,6 +334,7 @@ def _cleanup_one(ch_info):

del _channel_configuration[ch_info.channel]


def _cleanup_all():
global _gpio_mode

Expand Down Expand Up @@ -432,20 +437,17 @@ def setup(channels, direction, pull_up_down=_Default(PUD_OFF), initial=None):
)

for ch_info in ch_infos:
app_cfg = _app_channel_configuration(ch_info)
if app_cfg is not None:
_reconfigure_lines( channelLineRequest[ch_info.gpio], ch_info, direction, initial )
if channelLineRequest[ch_info.gpio] is not None:
_reconfigure_lines(
channelLineRequest[ch_info.gpio], ch_info, direction, initial
)

if direction == OUT:
initial = _make_iterable(initial, len(ch_infos))
if len(initial) != len(ch_infos):
raise RuntimeError("Number of values != number of channels")
for ch_info, init in zip(ch_infos, initial):
_setup_single_out(ch_info, init)
else:
if initial is not None:
raise ValueError("initial parameter is not valid for inputs")
for ch_info in ch_infos:
elif direction == OUT:
_setup_single_out(ch_info, initial)

else:
if initial is not None:
raise ValueError("initial parameter is not valid for inputs")
_setup_single_in(ch_info)


Expand Down Expand Up @@ -486,7 +488,7 @@ def input(channel):
raise RuntimeError("You must setup() the GPIO channel first")

value_read = channelLineRequest[ch_info.gpio].get_value(ch_info.gpio)
if (value_read == Value.ACTIVE):
if value_read == Value.ACTIVE:
return HIGH
else:
return LOW
Expand All @@ -507,14 +509,12 @@ def output(channels, values):
raise RuntimeError("The GPIO channel has not been set up as an " "OUTPUT")

for ch_info, value in zip(ch_infos, values):
if (value == HIGH):
if value == HIGH:
channelLineRequest[ch_info.gpio].set_value(ch_info.gpio, Value.ACTIVE)
else:
channelLineRequest[ch_info.gpio].set_value(ch_info.gpio, Value.INACTIVE)




# Function used to check if an event occurred on the specified channel.
# Param channel must be an integer.
# This function return True or False
Expand Down Expand Up @@ -549,6 +549,7 @@ def add_event_callback(channel, callback):

eventCallbacks[ch_info.gpio].append(callback)


# Function used to add threaded event detection for a specified gpio channel.
# Param gpio must be an integer specifying the channel, edge must be RISING,
# FALLING or BOTH. A callback function to be called when the event is detected
Expand Down Expand Up @@ -582,21 +583,21 @@ def add_event_detect(channel, edge, callback=None, bouncetime=None):
else:
gpiod_edge = Edge.BOTH

if (bouncetime != None):
if bouncetime != None:
channelLineRequest[ch_info.gpio].reconfigure_lines(
config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge, debounce_period=timedelta(milliseconds=bouncetime),

config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge,
debounce_period=timedelta(milliseconds=bouncetime),
)
}
)
else:
channelLineRequest[ch_info.gpio].reconfigure_lines(
config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge, debounce_period=timedelta(),

config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge,
debounce_period=timedelta(),
)
}
)
Expand All @@ -612,6 +613,7 @@ def remove_event_detect(channel):
ch_info = _channel_to_info(channel, need_gpio=True)
eventCallbacks[ch_info.gpio].clear()


# Function used to perform a blocking wait until the specified edge
# is detected for the param channel. Channel must be an integer and edge must
# be either RISING, FALLING or BOTH.
Expand Down Expand Up @@ -652,27 +654,29 @@ def wait_for_edge(channel, edge, bouncetime=None, timeout=None):
else:
gpiod_edge = Edge.BOTH

if (bouncetime != None):
if bouncetime != None:
channelLineRequest[ch_info.gpio].reconfigure_lines(
config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge, debounce_period=timedelta(milliseconds=bouncetime),

config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge,
debounce_period=timedelta(milliseconds=bouncetime),
)
}
)
else:
channelLineRequest[ch_info.gpio].reconfigure_lines(
config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge, debounce_period=timedelta(),

config={
ch_info.gpio: gpiod.LineSettings(
edge_detection=gpiod_edge,
debounce_period=timedelta(),
)
}
)

if (timeout != None):
status = channelLineRequest[ch_info.gpio].wait_edge_events(timedelta(milliseconds=timeout))
if timeout != None:
status = channelLineRequest[ch_info.gpio].wait_edge_events(
timedelta(milliseconds=timeout)
)
else:
status = channelLineRequest[ch_info.gpio].wait_edge_events(None)

Expand Down
Loading

0 comments on commit 1972d07

Please sign in to comment.