From 1972d07c07695cad5b66716fe80b903a6fdacb6a Mon Sep 17 00:00:00 2001 From: Shreyash Sinha Date: Thu, 14 Mar 2024 23:09:24 +0530 Subject: [PATCH] [libgpiod] test_all_input sample test working, cleaned up GPIO initialization part in gpio.py --- lib/python/TI/GPIO/gpio.py | 130 +++++----- lib/python/TI/GPIO/gpio_event.py | 377 ---------------------------- lib/python/TI/GPIO/gpio_pin_data.py | 18 +- 3 files changed, 75 insertions(+), 450 deletions(-) delete mode 100644 lib/python/TI/GPIO/gpio_event.py diff --git a/lib/python/TI/GPIO/gpio.py b/lib/python/TI/GPIO/gpio.py index 8f6d1f1..d7e2d97 100644 --- a/lib/python/TI/GPIO/gpio.py +++ b/lib/python/TI/GPIO/gpio.py @@ -20,13 +20,12 @@ # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -from TI.GPIO import gpio_event as event from TI.GPIO import gpio_pin_data import TI.GPIO as GPIO import os import time import warnings -from threading import Thread +import threading try: import thread @@ -89,9 +88,10 @@ # GPIOD variables MAX_EVENTS = 64 line_request = None -channelLineRequest = dict() +channelLineRequest = defaultdict(lambda: None) eventCallbacks = defaultdict(list) + def _validate_mode_set(): if _gpio_mode is None: raise RuntimeError( @@ -135,6 +135,7 @@ def _channels_to_infos(channels, need_gpio=False, need_pwm=False): for c in _make_iterable(channels) ] + def channel_configuration(ch_info): """Return the current configuration of a channel as reported by sysfs. Any of IN, OUT, PWM, or None may be returned.""" @@ -157,20 +158,21 @@ def channel_configuration(ch_info): else: return None + def _app_channel_configuration(ch_info): """Return the current configuration of a channel as requested by this module in this process. Any of IN, OUT, or None may be returned.""" return _channel_configuration.get(ch_info.channel, None) -def _reconfigure_lines(line_request, ch_info, direction, initial): +def _reconfigure_lines(line_request, ch_info, direction, initial): gpiod_direction = Direction.AS_IS gpiod_value = Value.INACTIVE - if (direction == OUT): + if direction == OUT: gpiod_direction = Direction.OUTPUT - if (initial == HIGH): + if initial == HIGH: gpiod_value = Value.ACTIVE else: @@ -185,63 +187,65 @@ def _reconfigure_lines(line_request, ch_info, direction, initial): ) _channel_configuration[ch_info.channel] = direction -def _setup_single_out(ch_info, initial=None): +def _setup_single_out(ch_info, initial=None): chip_path = "/dev/gpiochip" + str(ch_info.gpiochip) gpiod_val = Value.INACTIVE - if (initial == HIGH): + if initial == HIGH: gpiod_val = Value.ACTIVE line_request = gpiod.request_lines( - chip_path, - consumer= None, - config={ - ch_info.gpio: gpiod.LineSettings( - direction=Direction.OUTPUT, output_value=gpiod_val - ) - }, - ) + chip_path, + consumer=None, + config={ + ch_info.gpio: gpiod.LineSettings( + direction=Direction.OUTPUT, output_value=gpiod_val + ) + }, + ) channelLineRequest[ch_info.gpio] = line_request _channel_configuration[ch_info.channel] = OUT def _setup_single_in(ch_info): - chip_path = "/dev/gpiochip" + str(ch_info.gpiochip) line_request = gpiod.request_lines( - chip_path, - consumer=None, - config={ch_info.gpio: gpiod.LineSettings(direction=Direction.INPUT)}, - ) + chip_path, + consumer=None, + config={ch_info.gpio: gpiod.LineSettings(direction=Direction.INPUT)}, + ) channelLineRequest[ch_info.gpio] = line_request _channel_configuration[ch_info.channel] = IN -def callback_handler (channel): +def callback_handler(channel): ch_info = _channel_to_info(channel, need_gpio=True) while _run_loop: noEvents = channelLineRequest[ch_info.gpio].read_edge_events(MAX_EVENTS) for event in noEvents: for callback in eventCallbacks[ch_info.gpio]: - callback( channel ) + callback(channel) + -def start_thread( channel ): +def start_thread(channel): global _run_loop - _run_loop = True + _run_loop = True thread.start_new_thread(callback_handler, (channel,)) + def stop_thread(): _run_loop = False -def event_cleanup ( ch_info ): +def event_cleanup(ch_info): stop_thread() eventCallbacks[ch_info.gpio].clear() + def _pwm_path(ch_info): return ch_info.pwm_chip_dir + "/pwm" + str(ch_info.pwm_id) @@ -330,6 +334,7 @@ def _cleanup_one(ch_info): del _channel_configuration[ch_info.channel] + def _cleanup_all(): global _gpio_mode @@ -432,20 +437,17 @@ def setup(channels, direction, pull_up_down=_Default(PUD_OFF), initial=None): ) for ch_info in ch_infos: - app_cfg = _app_channel_configuration(ch_info) - if app_cfg is not None: - _reconfigure_lines( channelLineRequest[ch_info.gpio], ch_info, direction, initial ) + if channelLineRequest[ch_info.gpio] is not None: + _reconfigure_lines( + channelLineRequest[ch_info.gpio], ch_info, direction, initial + ) - if direction == OUT: - initial = _make_iterable(initial, len(ch_infos)) - if len(initial) != len(ch_infos): - raise RuntimeError("Number of values != number of channels") - for ch_info, init in zip(ch_infos, initial): - _setup_single_out(ch_info, init) - else: - if initial is not None: - raise ValueError("initial parameter is not valid for inputs") - for ch_info in ch_infos: + elif direction == OUT: + _setup_single_out(ch_info, initial) + + else: + if initial is not None: + raise ValueError("initial parameter is not valid for inputs") _setup_single_in(ch_info) @@ -486,7 +488,7 @@ def input(channel): raise RuntimeError("You must setup() the GPIO channel first") value_read = channelLineRequest[ch_info.gpio].get_value(ch_info.gpio) - if (value_read == Value.ACTIVE): + if value_read == Value.ACTIVE: return HIGH else: return LOW @@ -507,14 +509,12 @@ def output(channels, values): raise RuntimeError("The GPIO channel has not been set up as an " "OUTPUT") for ch_info, value in zip(ch_infos, values): - if (value == HIGH): + if value == HIGH: channelLineRequest[ch_info.gpio].set_value(ch_info.gpio, Value.ACTIVE) else: channelLineRequest[ch_info.gpio].set_value(ch_info.gpio, Value.INACTIVE) - - # Function used to check if an event occurred on the specified channel. # Param channel must be an integer. # This function return True or False @@ -549,6 +549,7 @@ def add_event_callback(channel, callback): eventCallbacks[ch_info.gpio].append(callback) + # Function used to add threaded event detection for a specified gpio channel. # Param gpio must be an integer specifying the channel, edge must be RISING, # FALLING or BOTH. A callback function to be called when the event is detected @@ -582,21 +583,21 @@ def add_event_detect(channel, edge, callback=None, bouncetime=None): else: gpiod_edge = Edge.BOTH - if (bouncetime != None): + if bouncetime != None: channelLineRequest[ch_info.gpio].reconfigure_lines( - config={ - ch_info.gpio: gpiod.LineSettings( - edge_detection=gpiod_edge, debounce_period=timedelta(milliseconds=bouncetime), - + config={ + ch_info.gpio: gpiod.LineSettings( + edge_detection=gpiod_edge, + debounce_period=timedelta(milliseconds=bouncetime), ) } ) else: channelLineRequest[ch_info.gpio].reconfigure_lines( - config={ - ch_info.gpio: gpiod.LineSettings( - edge_detection=gpiod_edge, debounce_period=timedelta(), - + config={ + ch_info.gpio: gpiod.LineSettings( + edge_detection=gpiod_edge, + debounce_period=timedelta(), ) } ) @@ -612,6 +613,7 @@ def remove_event_detect(channel): ch_info = _channel_to_info(channel, need_gpio=True) eventCallbacks[ch_info.gpio].clear() + # Function used to perform a blocking wait until the specified edge # is detected for the param channel. Channel must be an integer and edge must # be either RISING, FALLING or BOTH. @@ -652,27 +654,29 @@ def wait_for_edge(channel, edge, bouncetime=None, timeout=None): else: gpiod_edge = Edge.BOTH - if (bouncetime != None): + if bouncetime != None: channelLineRequest[ch_info.gpio].reconfigure_lines( - config={ - ch_info.gpio: gpiod.LineSettings( - edge_detection=gpiod_edge, debounce_period=timedelta(milliseconds=bouncetime), - + config={ + ch_info.gpio: gpiod.LineSettings( + edge_detection=gpiod_edge, + debounce_period=timedelta(milliseconds=bouncetime), ) } ) else: channelLineRequest[ch_info.gpio].reconfigure_lines( - config={ - ch_info.gpio: gpiod.LineSettings( - edge_detection=gpiod_edge, debounce_period=timedelta(), - + config={ + ch_info.gpio: gpiod.LineSettings( + edge_detection=gpiod_edge, + debounce_period=timedelta(), ) } ) - if (timeout != None): - status = channelLineRequest[ch_info.gpio].wait_edge_events(timedelta(milliseconds=timeout)) + if timeout != None: + status = channelLineRequest[ch_info.gpio].wait_edge_events( + timedelta(milliseconds=timeout) + ) else: status = channelLineRequest[ch_info.gpio].wait_edge_events(None) diff --git a/lib/python/TI/GPIO/gpio_event.py b/lib/python/TI/GPIO/gpio_event.py deleted file mode 100644 index effc05f..0000000 --- a/lib/python/TI/GPIO/gpio_event.py +++ /dev/null @@ -1,377 +0,0 @@ -# Copyright (c) 2012-2017 Ben Croston . -# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. -# Copyright (c) 2021-2023, Texas Instruments Incorporated. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the "Software"), -# to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, -# and/or sell copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. - -# Python2 has module thread. Renamed to _thread in Python3 -try: - import thread -except: - import _thread as thread - -from select import epoll, EPOLLIN, EPOLLET, EPOLLPRI -from datetime import datetime - -try: - InterruptedError = InterruptedError -except: - InterruptedError = IOError - -# sysfs root -ROOT = "/sys/class/gpio" - -# Edge possibilities -NO_EDGE = 0 -RISING_EDGE = 1 -FALLING_EDGE = 2 -BOTH_EDGE = 3 - -# epoll thread object -_epoll_fd_thread = None - -# epoll blocking wait object -_epoll_fd_blocking = None - -# dictionary of GPIO class objects. key = gpio number, -# value = GPIO class object -_gpio_event_list = {} - -# variable to keep track of thread state -_thread_running = False - -# string representations for edges to write to sysfs -_edge_str = ["none", "rising", "falling", "both"] - -# lock object for thread -_mutex = thread.allocate_lock() - - -class _Gpios: - def __init__(self, gpio_name, edge=None, bouncetime=None): - self.edge = edge - self.value_fd = open(ROOT + "/" + gpio_name + "/value", "r") - self.initial_thread = True - self.initial_wait = True - self.thread_added = False - self.bouncetime = bouncetime - self.callbacks = [] - self.lastcall = 0 - self.event_occurred = False - - def __del__(self): - self.value_fd.close() - del self.callbacks - - -def add_edge_detect(gpio, gpio_name, edge, bouncetime): - global _epoll_fd_thread - gpios = None - res = gpio_event_added(gpio) - - # event not added - if not res: - gpios = _Gpios(gpio_name, edge, bouncetime) - _set_edge(gpio_name, edge) - - # event already added - elif res == edge: - gpios = _get_gpio_object(gpio) - if ( - bouncetime is not None and gpios.bouncetime != bouncetime - ) or gpios.thread_added: - return 1 - else: - return 1 - # create epoll object for fd if not already open - if _epoll_fd_thread is None: - _epoll_fd_thread = epoll() - if _epoll_fd_thread is None: - return 2 - - # add eventmask and fd to epoll object - try: - _epoll_fd_thread.register(gpios.value_fd, EPOLLIN | EPOLLET | EPOLLPRI) - except IOError: - remove_edge_detect(gpio, gpio_name) - return 2 - - gpios.thread_added = 1 - _gpio_event_list[gpio] = gpios - - # create and start poll thread if not already running - if not _thread_running: - try: - thread.start_new_thread(_poll_thread, ()) - except RuntimeError: - remove_edge_detect(gpio, gpio_name) - return 2 - return 0 - - -def remove_edge_detect(gpio, gpio_name): - if gpio not in _gpio_event_list: - return - - if _epoll_fd_thread is not None: - _epoll_fd_thread.unregister(_gpio_event_list[gpio].value_fd) - - _set_edge(gpio_name, NO_EDGE) - - _mutex.acquire() - del _gpio_event_list[gpio] - _mutex.release() - - -def add_edge_callback(gpio, callback): - if gpio not in _gpio_event_list or not _gpio_event_list[gpio].thread_added: - return - - _gpio_event_list[gpio].callbacks.append(callback) - - -def edge_event_detected(gpio): - retval = False - if gpio in _gpio_event_list: - _mutex.acquire() - if _gpio_event_list[gpio].event_occurred: - _gpio_event_list[gpio].event_occurred = False - retval = True - _mutex.release() - return retval - - -def gpio_event_added(gpio): - if gpio not in _gpio_event_list: - return NO_EDGE - - return _gpio_event_list[gpio].edge - - -def _get_gpio_object(gpio): - if gpio not in _gpio_event_list: - return None - return _gpio_event_list[gpio] - - -def _set_edge(gpio_name, edge): - edge_path = ROOT + "/" + gpio_name + "/edge" - - with open(edge_path, "w") as edge_file: - edge_file.write(_edge_str[edge]) - - -def _get_gpio_obj_key(fd): - for key in _gpio_event_list: - if _gpio_event_list[key].value_fd == fd: - return key - return None - - -def _get_gpio_file_object(fileno): - for key in _gpio_event_list: - if _gpio_event_list[key].value_fd.fileno() == fileno: - return _gpio_event_list[key].value_fd - return None - - -def _poll_thread(): - global _thread_running - - _thread_running = True - while _thread_running: - try: - events = _epoll_fd_thread.poll(maxevents=1) - fd = events[0][0] - _mutex.acquire() - - # check if file object has been deleted or closed from main thread - fd = _get_gpio_file_object(fd) - if fd is None or fd.closed: - continue - - # read file to make sure event is valid - fd.seek(0) - if len(fd.read().rstrip()) != 1: - _thread_running = False - thread.exit() - - # check key to make sure gpio object has not been deleted - # from main thread - key = _get_gpio_obj_key(fd) - if key is None: - continue - - gpio_obj = _gpio_event_list[key] - - # ignore first epoll trigger - if gpio_obj.initial_thread: - gpio_obj.initial_thread = False - _gpio_event_list[key] = gpio_obj - else: - # debounce the input event for the specified bouncetime - time = datetime.now() - time = time.second * 1e6 + time.microsecond - if ( - gpio_obj.bouncetime is None - or (time - gpio_obj.lastcall > gpio_obj.bouncetime * 1000) - or (gpio_obj.lastcall == 0) - or gpio_obj.lastcall > time - ): - gpio_obj.lastcall = time - gpio_obj.event_occurred = True - _gpio_event_list[key] = gpio_obj - _mutex.release() - for cb_func in gpio_obj.callbacks: - cb_func() - - # if interrupted by a signal, continue to start of the loop - except InterruptedError: - continue - except AttributeError: - break - finally: - if _mutex.locked(): - _mutex.release() - thread.exit() - - -def blocking_wait_for_edge(gpio, gpio_name, edge, bouncetime, timeout): - global _epoll_fd_blocking - gpio_obj = None - finished = False - res = None - initial_edge = True - - if timeout is None: - timeout = -1 - else: - timeout = float(timeout) / 1000 - - if gpio in _gpio_event_list: - if _gpio_event_list[gpio].callbacks: - return -1 - - # check if gpio edge already added. Add if not already added - added_edge = gpio_event_added(gpio) - - # get existing record - if added_edge == edge: - gpio_obj = _get_gpio_object(gpio) - if gpio_obj.bouncetime is not None and gpio_obj.bouncetime != bouncetime: - return -1 - - # not added. create new record - elif added_edge == NO_EDGE: - gpio_obj = _Gpios(gpio_name, edge, bouncetime) - _set_edge(gpio_name, edge) - _gpio_event_list[gpio] = gpio_obj - - # added_edge != edge. Event is for different edge - else: - _mutex.acquire() - gpio_obj = _get_gpio_object(gpio) - _set_edge(gpio_name, edge) - gpio_obj.edge = edge - gpio_obj.bouncetime = bouncetime - gpio_obj.initial_wait = True - _gpio_event_list[gpio] = gpio_obj - _mutex.release() - - # create epoll blocking object if not already created - if _epoll_fd_blocking is None: - _epoll_fd_blocking = epoll() - if _epoll_fd_blocking is None: - return -2 - - # register gpio value fd with epoll - try: - _epoll_fd_blocking.register(gpio_obj.value_fd, EPOLLIN | EPOLLET | EPOLLPRI) - except IOError: - print("IOError occured while register epoll blocking for GPIO %s" % gpio) - return -2 - - while not finished: - # retry polling if interrupted by signal - try: - res = _epoll_fd_blocking.poll(timeout, maxevents=1) - except InterruptedError: - continue - - # First trigger is with current state so ignore - if initial_edge: - initial_edge = False - continue - - # debounce input for specified time - else: - time = datetime.now() - time = time.second * 1e6 + time.microsecond - if ( - (gpio_obj.bouncetime is None) - or (time - gpio_obj.lastcall > gpio_obj.bouncetime * 1000) - or (gpio_obj.lastcall == 0) - or (gpio_obj.lastcall > time) - ): - gpio_obj.lastcall = time - _mutex.acquire() - _gpio_event_list[gpio] = gpio_obj - _mutex.release() - finished = True - - # check if the event detected was valid - if res: - fileno = res[0][0] - fd = gpio_obj.value_fd - if fileno != fd.fileno(): - _epoll_fd_blocking.unregister(gpio_obj.value_fd) - print("File object not found after wait for GPIO %s" % gpio) - return -2 - else: - _mutex.acquire() - fd.seek(0) - value_str = fd.read().rstrip() - _mutex.release() - if len(value_str) != 1: - _epoll_fd_blocking.unregister(gpio_obj.value_fd) - print("Length of value string was not 1 for GPIO %s" % gpio) - return -2 - - _epoll_fd_blocking.unregister(gpio_obj.value_fd) - - # 0 if timeout occured - res == [] - # 1 if event was valid - return int(res != []) - - -def event_cleanup(gpio, gpio_name): - global _epoll_fd_thread, _epoll_fd_blocking, _thread_running - - _thread_running = False - if gpio in _gpio_event_list: - remove_edge_detect(gpio, gpio_name) - - if _gpio_event_list == {}: - if _epoll_fd_blocking is not None: - _epoll_fd_blocking.close() - _epoll_fd_blocking = None - - if _epoll_fd_thread is not None: - _epoll_fd_thread.close() - _epoll_fd_thread = None diff --git a/lib/python/TI/GPIO/gpio_pin_data.py b/lib/python/TI/GPIO/gpio_pin_data.py index a751389..61393a7 100644 --- a/lib/python/TI/GPIO/gpio_pin_data.py +++ b/lib/python/TI/GPIO/gpio_pin_data.py @@ -182,10 +182,10 @@ (40, 2, "600000.gpio", 32, 12, "GPIO0_40", None, None), (10, 3, "601000.gpio", 33, 13, "GPIO1_10", "23010000.pwm", 1), (13, 3, "601000.gpio", 35, 19, "GPIO1_13", "23000000.pwm", 0), - (9, 3, "601000.gpio", 36, 16, "GPIO1_09", "23010000.pwm", 0), + (9, 3, "601000.gpio", 36, 16, "GPIO1_09", "23010000.pwm", 0), (41, 2, "600000.gpio", 37, 26, "GPIO0_41", None, None), - (8, 3, "601000.gpio", 38, 20, "GPIO1_08", None, None), - (7, 3, "601000.gpio", 40, 21, "GPIO1_07", None, None), + (8, 3, "601000.gpio", 38, 20, "GPIO1_08", None, None), + (7, 3, "601000.gpio", 40, 21, "GPIO1_07", None, None), ] compats_am62ask = ( @@ -217,10 +217,10 @@ (40, 1, "600000.gpio", 32, 12, "GPIO0_40", None, None), (10, 2, "601000.gpio", 33, 13, "GPIO1_10", "23010000.pwm", 1), (13, 2, "601000.gpio", 35, 19, "GPIO1_13", None, None), - (9, 2, "601000.gpio", 36, 16, "GPIO1_09", "23010000.pwm", 0), + (9, 2, "601000.gpio", 36, 16, "GPIO1_09", "23010000.pwm", 0), (41, 1, "600000.gpio", 37, 26, "GPIO0_41", None, None), - (8, 2, "601000.gpio", 38, 20, "GPIO1_08", None, None), - (7, 2, "601000.gpio", 40, 21, "GPIO1_07", None, None), + (8, 2, "601000.gpio", 38, 20, "GPIO1_08", None, None), + (7, 2, "601000.gpio", 40, 21, "GPIO1_07", None, None), ] compats_am62psk = ( @@ -283,9 +283,7 @@ class ChannelInfo(object): - def __init__( - self, channel, gpiochip, gpio, pwm_chip_dir, pwm_id - ): + def __init__(self, channel, gpiochip, gpio, pwm_chip_dir, pwm_id): self.channel = channel self.gpiochip = gpiochip self.gpio = gpio @@ -362,7 +360,7 @@ def model_data(key_col, pin_defs): x[GPIO_CHIP_ENTRY], x[OFFSET_ENTRY], pwm_chip_dir=pwm_dirs.get(x[PWM_SYSFS_DIR_ENTRY], None), - pwm_id=x[PWM_ID_ENTRY] + pwm_id=x[PWM_ID_ENTRY], ) for x in pin_defs }