diff --git a/.gitignore b/.gitignore index 91631d7..93d9338 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,7 @@ # Cache data unittests /.pytest_cache +/.coverage + +# proto folders/scripts +/examples_development diff --git a/examples/update_scripts/README.md b/examples/update_scripts/README.md index c9651df..271bc4c 100644 --- a/examples/update_scripts/README.md +++ b/examples/update_scripts/README.md @@ -45,3 +45,31 @@ Go to the `examples/update_scripts` folder and run the script with following arg Go to the `examples/update_scripts` folder and run the script with following arguments $ python o2x5xx_multi_fw_updater.py -i O2x5xx_Firmware_1.30.10629.swu -H 192.168.0.69 + +### Build stand-alone application + +Navigate to the project folder and build the application with following command: + +``` +python -m PyInstaller --onefile --windowed --name O2X5xxO3D3xxMultiFWUpdater o2x5xx_multi_fw_updater.py +``` + +You will find the stand-alone application LogTracesExtractor.exe in the dist folder. + +Usage of O2X5xxO3D3xxMultiFWUpdater.exe stand-alone application: + + usage: O2X5xxO3D3xxMultiFWUpdater.exe [-h] -i INPUT [INPUT ...] -H HOST [HOST ...] + [-b BACKUP] [-l LOG] [-r] + + example: O2X5xxO3D3xxMultiFWUpdater.exe -i O2x5xx_Firmware_1.30.10629.swu -H 192.168.0.69 + + optional arguments: + -h, --help show this help message and exit + -i INPUT [INPUT ...], --input INPUT [INPUT ...] + specify input SWU file(s) + -H HOST [HOST ...], --host HOST [HOST ...] + specify host IPs + -b BACKUP, --backup BACKUP + path for config backup folder + -l LOG, --log LOG path for log file folder + -r, --remove remove config backup folder after application finished diff --git a/setup.py b/setup.py index 1387bff..ad98522 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def read_requires(): setup( name='o2x5xx', - version='0.4', + version='0.3.0-beta', description='A Python library for ifm O2x5xx (O2D5xx / O2I5xx) devices', author='Michael Gann', author_email='support.efector.object-ident@ifm.com', diff --git a/source/device/client.py b/source/device/client.py index fc908a3..22ff28e 100644 --- a/source/device/client.py +++ b/source/device/client.py @@ -5,12 +5,10 @@ class O2x5xxDevice(O2x5xxPCICDevice): def __init__(self, address="192.168.0.69", port=50010, autoconnect=True, timeout=SOCKET_TIMEOUT): self._address = address - self._port = port - self._autoconnect = autoconnect + self._timeout = timeout self._rpc = None - if autoconnect: - self._rpc = O2x5xxRPCDevice(address=self._address) - super(O2x5xxPCICDevice, self).__init__(address, port, autoconnect, timeout) + self._rpc = O2x5xxRPCDevice(address=address, timeout=timeout) + super(O2x5xxPCICDevice, self).__init__(address=address, port=port, autoconnect=autoconnect, timeout=timeout) def __enter__(self): return self @@ -22,7 +20,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._rpc = None @property - def rpc(self): + def rpc(self) -> O2x5xxRPCDevice: + if not self._rpc: + self._rpc = O2x5xxRPCDevice(address=self._address, timeout=self._timeout) return self._rpc @@ -32,25 +32,29 @@ def __init__(self, address="192.168.0.69", port=50010, autoconnect=True, timeout self._port = port self._timeout = timeout self._autoconnect = autoconnect - self._pcic = None - self._rpc = None + self._rpc = O2x5xxRPCDevice(address=address, timeout=timeout) + self._pcic = O2x5xxPCICDevice(address=address, port=port, autoconnect=autoconnect, timeout=timeout) def __enter__(self): - self._pcic = O2x5xxPCICDevice(address=self._address, port=self._port, autoconnect=self._autoconnect, - timeout=self._timeout) - self._rpc = O2x5xxRPCDevice(address=self._address) return self def __exit__(self, exc_type, exc_val, exc_tb): - self._pcic.close() - self._rpc.mainProxy.close() - self._pcic = None + if self._rpc: + self._rpc.mainProxy.close() + if self._pcic: + self._pcic.close() self._rpc = None + self._pcic = None @property def rpc(self) -> O2x5xxRPCDevice: + if not self._rpc: + self._rpc = O2x5xxRPCDevice(address=self._address, timeout=self._timeout) return self._rpc @property def pcic(self) -> O2x5xxPCICDevice: + if not self._pcic: + self._pcic = O2x5xxPCICDevice(address=self._address, port=self._port, + autoconnect=self._autoconnect, timeout=self._timeout) return self._pcic diff --git a/source/pcic/client.py b/source/pcic/client.py index 99175bf..21b4eaf 100644 --- a/source/pcic/client.py +++ b/source/pcic/client.py @@ -1,5 +1,4 @@ from ..static.formats import error_codes, serialization_format -from .utils import socket_exception_handler import matplotlib.image as mpimg import binascii import socket @@ -18,7 +17,7 @@ def __init__(self, address, port, autoconnect=True, timeout=SOCKET_TIMEOUT): self.address = address self.port = port self.autoconnect = autoconnect - self.timeout = timeout + self._timeout = timeout self.pcicSocket = None self.connected = False if self.autoconnect: @@ -27,7 +26,6 @@ def __init__(self, address, port, autoconnect=True, timeout=SOCKET_TIMEOUT): self.debug = False self.debugFull = False - @socket_exception_handler(timeout=SOCKET_TIMEOUT) def connect(self): """ Open the socket session with the device. @@ -35,11 +33,35 @@ def connect(self): :return: None """ if not self.connected: - self.pcicSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.pcicSocket.settimeout(self.timeout) - self.pcicSocket.connect((self.address, self.port)) + self.pcicSocket = socket.create_connection((self.address, self.port), timeout=self.timeout) self.connected = True + @property + def timeout(self): + """ + Get the current timeout value on blocking socket operations. If no socket instance available + the preset timeout value will be returned. + + :return: (float) socket timeout in seconds + """ + if self.pcicSocket: + return self.pcicSocket.gettimeout() + return self._timeout + + @timeout.setter + def timeout(self, value): + """ + Set a timeout on blocking socket operations. If a non-zero value is given, subsequent socket operations will + raise a timeout exception if the timeout period value has elapsed before the operation has completed. + If zero is given, the socket is put in non-blocking mode. If None is given, the socket is put in blocking mode. + + :param value: (float) non-negative socket timeout in seconds + :return: None + """ + self._timeout = value + if self.pcicSocket: + self.pcicSocket.settimeout(self._timeout) + def disconnect(self): """ Close the socket session with the device. @@ -81,27 +103,6 @@ def recv(self, number_bytes): fragments.append(chunk) return b''.join(fragments) - @property - def pcic_socket_timeout(self) -> float: - """ - Getter for timeout value of lowlevel connection socket. - - :return: (float) socket timeout in seconds - """ - return self.timeout - - @pcic_socket_timeout.setter - def pcic_socket_timeout(self, value: float) -> None: - """ - Setter for timeout value of lowlevel connection socket. - - :param value: (float) in seconds. - :return: None - """ - if self.pcicSocket: - self.pcicSocket.settimeout(value) - self.timeout = value - class PCICV3Client(Client): DEFAULT_TICKET = "1000" @@ -154,7 +155,7 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.disconnect() + self.close() def activate_application(self, application_number: [str, int]) -> str: """ @@ -241,6 +242,7 @@ def retrieve_current_process_interface_configuration(self): """ result = self.send_command('C?') result = result.decode() + return result def request_current_error_state(self): @@ -494,27 +496,6 @@ def set_logic_state_of_an_id(self, io_id, state): result = result.decode() return result - # def set_logic_state_of_an_id2(self, io_id, state): - # """ - # This is a reST style. - # - # :param io_id : (int) - # 2 digits for digital output - # * "01": IO1 - # * "02": IO2 - # :param state : (str) - # this is a second param - # * "01": IO1 - # * "02": IO2 - # :returns: this is a description of what is returned - # :raises keyError: raises an exception - # """ - # if str(io_id).isnumeric(): - # io_id = str(io_id).zfill(2) - # result = self.send_command('o{io_id}{state}'.format(io_id=io_id, state=str(state))) - # result = result.decode() - # return result - def request_state_of_an_id(self, io_id): """ Requests the state of a specific ID. diff --git a/source/pcic/utils.py b/source/pcic/utils.py deleted file mode 100644 index 1250833..0000000 --- a/source/pcic/utils.py +++ /dev/null @@ -1,43 +0,0 @@ -import functools -import multiprocessing.pool - - -def socket_exception_handler(timeout): - """Timeout decorator, parameter in seconds.""" - - def exception_decorator(item): - """Wrap the original function.""" - - def close_socket_on_exception(session): - pcicSocket = getattr(session, "pcicSocket") - if pcicSocket: - pcicSocket.close() - - @functools.wraps(item) - def func_wrapper(*args, **kwargs): - """Closure for function.""" - max_timeout = getattr(args[0], "timeout") - try: - pool = multiprocessing.pool.ThreadPool(processes=1) - async_result = pool.apply_async(item, args, kwargs) - pool.close() - # raises a TimeoutError if execution exceeds max_timeout - return async_result.get(max_timeout) - - except multiprocessing.context.TimeoutError: - close_socket_on_exception(session=args[0]) - raise TimeoutError( - "Unable to establish socket connection to device with IP {}. " - "Execution time exceeded max timeout of {} seconds." - .format(args[0].address, max_timeout)) - - except ConnectionRefusedError: - close_socket_on_exception(session=args[0]) - raise ConnectionRefusedError( - "Device with IP {} refuses socket connection with defined port {}. " - "Is the port ok? Default port number is 50010." - .format(args[0].address, args[0].port)) - - return func_wrapper - - return exception_decorator diff --git a/source/rpc/application.py b/source/rpc/application.py index 76e16aa..86aa5d3 100644 --- a/source/rpc/application.py +++ b/source/rpc/application.py @@ -1,6 +1,7 @@ import json import os import warnings +from .proxy import ImagerProxy class Application(object): @@ -12,6 +13,25 @@ def __init__(self, applicationProxy, device): self._applicationProxy = applicationProxy self._device = device + def editImage(self, imager_index: int): + """ + Requests an Imager object specified by the image index. + + :param imager_index: (int) index of image from ImagerConfigList + :return: Imager (object) + """ + imageIDs = [int(x["Id"]) for x in self.getImagerConfigList()] + if imager_index not in imageIDs: + raise ValueError("Image index {} not available. Choose one imageIndex from following" + "ImagerConfigList or create a new one with method createImagerConfig():\n{}" + .format(imager_index, self.getImagerConfigList())) + + _imagerURL = self._applicationProxy.baseURL + 'imager_{0:03d}/'.format(imager_index) + setattr(self._device, "_imagerURL", _imagerURL) + _imagerProxy = ImagerProxy(url=_imagerURL, device=self._device) + setattr(self._device, "_imagerProxy", _imagerProxy) + return self._device.imager + def getAllParameters(self): """ Returns all parameters of the object in one data-structure. For an overview which parameters can be request use @@ -19,7 +39,7 @@ def getAllParameters(self): :return: (dict) name contains parameter-name, value the stringified parameter-value """ - result = self._applicationProxy.getAllParameters() + result = self._applicationProxy.proxy.getAllParameters() return result def getParameter(self, value): @@ -29,7 +49,7 @@ def getParameter(self, value): :param value: (str) parameter name :return: (str) """ - result = self._applicationProxy.getParameter(value) + result = self._applicationProxy.proxy.getParameter(value) return result def getAllParameterLimits(self) -> dict: @@ -40,7 +60,7 @@ def getAllParameterLimits(self) -> dict: :return: (dict) """ - result = self._applicationProxy.getAllParameterLimits() + result = self._applicationProxy.proxy.getAllParameterLimits() return result @property @@ -74,7 +94,7 @@ def Name(self, value: str) -> None: max_chars = 64 if value.__len__() > max_chars: raise ValueError("Max. {} characters".format(max_chars)) - self._applicationProxy.setParameter("Name", value) + self._applicationProxy.proxy.setParameter("Name", value) self.waitForConfigurationDone() @property @@ -98,7 +118,7 @@ def Description(self, value: str) -> None: max_chars = 500 if value.__len__() > 500: raise ValueError("Max. {} characters".format(max_chars)) - self._applicationProxy.setParameter("Description", value) + self._applicationProxy.proxy.setParameter("Description", value) self.waitForConfigurationDone() @property @@ -139,7 +159,7 @@ def TriggerMode(self, value: int) -> None: if value not in range(int(limits["min"]), int(limits["max"]), 1): raise ValueError("RPC Trigger value not available. Available range: {}\n" "For more help take a look on the docstring documentation.".format(limits)) - self._applicationProxy.setParameter("TriggerMode", value) + self._applicationProxy.proxy.setParameter("TriggerMode", value) self.waitForConfigurationDone() @property @@ -164,7 +184,7 @@ def FrameRate(self, value: float) -> None: if not float(limits["min"]) <= float(value) <= float(limits["max"]): raise ValueError("FrameRate value not available. Available range: {}" .format(self.getAllParameterLimits()["FrameRate"])) - self._applicationProxy.setParameter("FrameRate", value) + self._applicationProxy.proxy.setParameter("FrameRate", value) self.waitForConfigurationDone() @property @@ -174,7 +194,7 @@ def HWROI(self) -> dict: :return: """ - result = eval(self.getParameter("HWROI")) + result = json.loads(self.getParameter("HWROI")) return result @HWROI.setter @@ -228,7 +248,7 @@ def getValueErrorListNumber(): lower=width_lower, upper=width_upper)) if valueErrorList: raise ValueError("".join(valueErrorList)) - self._applicationProxy.setParameter("HWROI", json.dumps(value)) + self._applicationProxy.proxy.setParameter("HWROI", json.dumps(value)) self.waitForConfigurationDone() @property @@ -238,7 +258,7 @@ def Rotate180Degree(self) -> bool: :return: (bool) True / False """ - result = self._applicationProxy.getParameter("Rotate180Degree") + result = self._applicationProxy.proxy.getParameter("Rotate180Degree") if result == "false": return False return True @@ -251,7 +271,7 @@ def Rotate180Degree(self, value: bool) -> None: :param value: (bool) True / False :return: None """ - self._applicationProxy.setParameter("Rotate180Degree", value) + self._applicationProxy.proxy.setParameter("Rotate180Degree", value) self.waitForConfigurationDone() @property @@ -261,7 +281,7 @@ def FocusDistance(self) -> float: :return: (float) current focus distance in meter """ - result = float(self._applicationProxy.getParameter("FocusDistance")) + result = float(self._applicationProxy.proxy.getParameter("FocusDistance")) return result @FocusDistance.setter @@ -276,8 +296,9 @@ def FocusDistance(self, value: float) -> None: if not float(limits["min"]) <= float(value) <= float(limits["max"]): raise ValueError("FocusDistance value not available. Available range: {}" .format(self.getAllParameterLimits()["FocusDistance"])) - self._applicationProxy.setParameter("FocusDistance", value) + self._applicationProxy.proxy.setParameter("FocusDistance", value) # TODO: Wird hier geblockt? Wird der Focus Distance direkt nach dem setzen angefahren? + # Edit: Kein Error, jedoch sind die Bilder unscharf wenn direkt danach das Bild angefordert wird: Fokus wird während requestImage im PCIC noch angefahren! self.waitForConfigurationDone() @property @@ -288,7 +309,7 @@ def ImageEvaluationOrder(self) -> str: :return: (str) """ - result = self._applicationProxy.getParameter("ImageEvaluationOrder") + result = self._applicationProxy.proxy.getParameter("ImageEvaluationOrder") return result @ImageEvaluationOrder.setter @@ -300,7 +321,7 @@ def ImageEvaluationOrder(self, value: list) -> None: :param value: (list) a whitespace separated list of ImagerConfig ids :return: None """ - self._applicationProxy.setParameter("ImageEvaluationOrder", value) + self._applicationProxy.proxy.setParameter("ImageEvaluationOrder", value) self.waitForConfigurationDone() @property @@ -309,7 +330,7 @@ def PcicTcpResultSchema(self) -> str: The PCIC TCP/IP Schema defines which result-data will be sent via TCP/IP. :return: (str) pcic tcp/ip schema config """ - return self._applicationProxy.getParameter("PcicTcpResultSchema") + return self._applicationProxy.proxy.getParameter("PcicTcpResultSchema") @PcicTcpResultSchema.setter def PcicTcpResultSchema(self, schema: str) -> None: @@ -318,7 +339,7 @@ def PcicTcpResultSchema(self, schema: str) -> None: :param schema: (str) pcic tcp/ip schema config :return: None """ - self._applicationProxy.setParameter("PcicTcpResultSchema", schema) + self._applicationProxy.proxy.setParameter("PcicTcpResultSchema", schema) validation = self.validate() if validation: warnings.warn(str(validation), UserWarning) @@ -331,7 +352,7 @@ def LogicGraph(self) -> str: JSON string describing a flow-graph which allows to program the logic between model-results and output-pins. :return: (str) JSON string flow-graph of Logic Layer """ - return self._applicationProxy.getParameter("LogicGraph") + return self._applicationProxy.proxy.getParameter("LogicGraph") @LogicGraph.setter def LogicGraph(self, schema: str) -> None: @@ -340,7 +361,7 @@ def LogicGraph(self, schema: str) -> None: :param schema: (str) JSON string flow-graph of Logic Layer :return: None """ - self._applicationProxy.setParameter("LogicGraph", schema) + self._applicationProxy.proxy.setParameter("LogicGraph", schema) validation = self.validate() if validation: warnings.warn(str(validation), UserWarning) @@ -419,7 +440,7 @@ def save(self) -> None: :return: None """ - self._applicationProxy.save() + self._applicationProxy.proxy.save() self.waitForConfigurationDone() def validate(self) -> list: @@ -428,7 +449,7 @@ def validate(self) -> list: :return: Array of fault-structs (Id: int, Text: string) """ - result = self._applicationProxy.validate() + result = self._applicationProxy.proxy.validate() return result def getImagerConfigList(self) -> list: @@ -437,7 +458,7 @@ def getImagerConfigList(self) -> list: :return: (list) Array of strings """ - result = self._applicationProxy.getImagerConfigList() + result = self._applicationProxy.proxy.getImagerConfigList() return result def availableImagerConfigTypes(self) -> list: @@ -446,7 +467,7 @@ def availableImagerConfigTypes(self) -> list: :return: (list) Array of strings """ - result = self._applicationProxy.availableImagerConfigTypes() + result = self._applicationProxy.proxy.availableImagerConfigTypes() return result def createImagerConfig(self, imagerType='normal', addToEval=True): @@ -458,7 +479,7 @@ def createImagerConfig(self, imagerType='normal', addToEval=True): not be activated for the image acquisition/evaluation run :return: (int) ID of new image-config """ - imagerIndex = self._applicationProxy.createImagerConfig(imagerType) + imagerIndex = self._applicationProxy.proxy.createImagerConfig(imagerType) if addToEval: imageEvalOrder = self.ImageEvaluationOrder imageEvalOrder += "{} ".format(imagerIndex) @@ -473,7 +494,7 @@ def copyImagerConfig(self, imagerIndex: int) -> int: :param imagerIndex: (int) ID of other Imager config :return: (int) ID of new image-config """ - imagerIndex = self._applicationProxy.copyImagerConfig(imagerIndex) + imagerIndex = self._applicationProxy.proxy.copyImagerConfig(imagerIndex) self.waitForConfigurationDone() return imagerIndex @@ -485,7 +506,7 @@ def deleteImagerConfig(self, imagerIndex: int) -> None: :param imagerIndex: (int) ID of image-config that should be removed :return: None """ - self._applicationProxy.deleteImagerConfig(imagerIndex) + self._applicationProxy.proxy.deleteImagerConfig(imagerIndex) self.waitForConfigurationDone() def isConfigurationDone(self) -> bool: @@ -495,7 +516,7 @@ def isConfigurationDone(self) -> bool: :return: (bool) True or False """ - result = self._applicationProxy.isConfigurationDone() + result = self._applicationProxy.proxy.isConfigurationDone() return result def waitForConfigurationDone(self): @@ -506,4 +527,4 @@ def waitForConfigurationDone(self): :return: None """ - self._applicationProxy.waitForConfigurationDone() + self._applicationProxy.proxy.waitForConfigurationDone() diff --git a/source/rpc/client.py b/source/rpc/client.py index a40596f..ce1069a 100644 --- a/source/rpc/client.py +++ b/source/rpc/client.py @@ -1,28 +1,32 @@ +from .proxy import MainProxy, SessionProxy, EditProxy, ApplicationProxy, ImagerProxy +from .session import Session +from .edit import Edit +from .application import Application +from .imager import Imager +from ..static.devices import DevicesMeta import xmlrpc.client import json import io -import time import numpy as np import matplotlib.image as mpimg -from .proxy import MainProxy, SessionProxy, EditProxy, ApplicationProxy, ImagerProxy -from .proxy import Session, Edit, Application, Imager -from .utils import timeout -from ..device.client import O2x5xxPCICDevice -from ..static.devices import DevicesMeta + + +SOCKET_TIMEOUT = 10 class O2x5xxRPCDevice(object): """ Main API class """ - def __init__(self, address="192.168.0.69", api_path="/api/rpc/v1/"): + def __init__(self, address="192.168.0.69", api_path="/api/rpc/v1/", timeout=SOCKET_TIMEOUT): self.address = address self.api_path = api_path + self.timeout = timeout self.baseURL = "http://" + self.address + self.api_path self.mainURL = self.baseURL + "com.ifm.efector/" - self.mainProxy = MainProxy(url=self.mainURL, device=self) - self.tcpIpPort = int(self.getParameter("PcicTcpPort")) + self.mainProxy = MainProxy(url=self.mainURL, timeout=self.timeout, device=self) self.deviceMeta = self._getDeviceMeta() + self._session = None def __enter__(self): return self @@ -52,12 +56,12 @@ def imagerProxy(self) -> ImagerProxy: @property def session(self) -> Session: if self.sessionProxy: - return getattr(self, "_session") + return Session(sessionProxy=self.sessionProxy, device=self) @property def edit(self) -> Edit: if self.editProxy: - return getattr(self, "_edit") + return Edit(editProxy=self.editProxy, device=self) else: raise AttributeError("No editProxy available! Please first create an editProxy " "with method self.device.session.requestOperatingMode(Mode=1) before using Edit!") @@ -65,12 +69,12 @@ def edit(self) -> Edit: @property def application(self) -> Application: if self.applicationProxy: - return getattr(self, "_application") + return Application(applicationProxy=self.applicationProxy, device=self) @property def imager(self) -> Imager: if self.imagerProxy: - return getattr(self, "_imager") + return Imager(imagerProxy=self.imagerProxy, device=self) def _getDeviceMeta(self): _deviceType = self.getParameter(value="DeviceType") @@ -90,7 +94,7 @@ def getParameter(self, value: str) -> str: :return: (str) value of parameter """ try: - result = self.mainProxy.getParameter(value) + result = self.mainProxy.proxy.getParameter(value) return result except xmlrpc.client.Fault as e: if e.faultCode == 101000: @@ -104,7 +108,7 @@ def getAllParameters(self) -> dict: :return: (dict) name contains parameter-name, value the stringified parameter-value """ - result = self.mainProxy.getAllParameters() + result = self.mainProxy.proxy.getAllParameters() return result def getSWVersion(self) -> dict: @@ -113,7 +117,7 @@ def getSWVersion(self) -> dict: :return: (dict) struct of strings """ - result = self.mainProxy.getSWVersion() + result = self.mainProxy.proxy.getSWVersion() return result def getHWInfo(self) -> dict: @@ -122,7 +126,7 @@ def getHWInfo(self) -> dict: :return: (dict) struct of strings """ - result = self.mainProxy.getHWInfo() + result = self.mainProxy.proxy.getHWInfo() return result def getDmesgData(self) -> str: @@ -131,7 +135,7 @@ def getDmesgData(self) -> str: :return: (str) List of kernel messages """ - result = self.mainProxy.getDmesgData() + result = self.mainProxy.proxy.getDmesgData() return result def getClientCompatibilityList(self) -> list: @@ -140,7 +144,7 @@ def getClientCompatibilityList(self) -> list: :return: (list) Array of strings """ - result = self.mainProxy.getClientCompatibilityList() + result = self.mainProxy.proxy.getClientCompatibilityList() return result def getApplicationList(self) -> list: @@ -149,7 +153,7 @@ def getApplicationList(self) -> list: :return: (dict) array list of structs """ - result = self.mainProxy.getApplicationList() + result = self.mainProxy.proxy.getApplicationList() return result def reboot(self, mode: int = 0) -> None: @@ -163,7 +167,7 @@ def reboot(self, mode: int = 0) -> None: """ if mode == 0: print("Rebooting sensor {} ...".format(self.getParameter(value="Name"))) - self.mainProxy.reboot(mode) + self.mainProxy.proxy.reboot(mode) else: raise ValueError("Reboot mode {} not available.".format(str(mode))) @@ -174,7 +178,7 @@ def switchApplication(self, applicationIndex: int) -> None: :param applicationIndex: (int) Index of new application (Range 1-32) :return: None """ - self.mainProxy.switchApplication(applicationIndex) + self.mainProxy.proxy.switchApplication(applicationIndex) self.waitForConfigurationDone() def getTraceLogs(self, nLogs: int = 0) -> list: @@ -185,7 +189,7 @@ def getTraceLogs(self, nLogs: int = 0) -> list: 0: all logs are fetched :return: (list) Array of strings """ - result = self.mainProxy.getTraceLogs(nLogs) + result = self.mainProxy.proxy.getTraceLogs(nLogs) return result def getApplicationStatisticData(self, applicationIndex: int) -> dict: @@ -197,7 +201,7 @@ def getApplicationStatisticData(self, applicationIndex: int) -> dict: :param applicationIndex: (int) Index of application (Range 1-32) :return: (dict) """ - result = eval(self.mainProxy.getApplicationStatisticData(applicationIndex)) + result = json.loads(self.mainProxy.proxy.getApplicationStatisticData(applicationIndex)) return result def getReferenceImage(self) -> np.ndarray: @@ -207,7 +211,7 @@ def getReferenceImage(self) -> np.ndarray: :return: (np.ndarray) a JPEG decompressed image """ b = bytearray() - b.extend(map(ord, str(self.mainProxy.getReferenceImage()))) + b.extend(map(ord, str(self.mainProxy.proxy.getReferenceImage()))) result = mpimg.imread(io.BytesIO(b), format='jpg') return result @@ -218,7 +222,7 @@ def isConfigurationDone(self) -> bool: :return: (bool) True or False """ - result = self.mainProxy.isConfigurationDone() + result = self.mainProxy.proxy.isConfigurationDone() return result def waitForConfigurationDone(self): @@ -229,7 +233,7 @@ def waitForConfigurationDone(self): :return: None """ - self.mainProxy.waitForConfigurationDone() + self.mainProxy.proxy.waitForConfigurationDone() def measure(self, measureInput: dict) -> dict: """ @@ -239,29 +243,17 @@ def measure(self, measureInput: dict) -> dict: :return: (dict) measure result """ input_stringified = json.dumps(measureInput) - result = eval(self.mainProxy.measure(input_stringified)) + result = json.loads(self.mainProxy.proxy.measure(input_stringified)) return result - def trigger(self) -> str: + def trigger(self): """ Executes trigger and read answer. :return: (str) process interface output (TCP/IP) """ - with O2x5xxPCICDevice(address=self.address, port=self.tcpIpPort) as pcicDevice: - while self.getParameter("OperatingMode") != "0": - Warning("Sensor is not in Run Mode. Please finish parametrization first.") - time.sleep(0.1) - self.mainProxy.trigger() - # This is required since there is no lock for application evaluation process within the trigger()-method. - # After an answer is provided by the PCIC interface you can be sure, - # that the trigger count was incremented correctly and the evaluation process finished. - ticket, answer = pcicDevice.read_next_answer() - self.waitForConfigurationDone() - pcicDevice.close() - return answer.decode() - - @timeout(2) + self.mainProxy.proxy.trigger() + def doPing(self) -> str: """ Ping sensor device and check reachability in network. @@ -269,5 +261,24 @@ def doPing(self) -> str: :return: - "up" sensor is reachable through network - "down" sensor is not reachable through network """ - result = self.mainProxy.doPing() + result = self.mainProxy.proxy.doPing() return result + + def requestSession(self, password='', session_id='0' * 32) -> Session: + """ + Request a session-object for access to the configuration and for changing device operating-mode. + This should block parallel editing and allows to put editing behind password. + The ID could optionally be defined from the external system, but it must be the defined format (32char "hex"). + If it is called with only one parameter, the device will generate a SessionID. + + :param password: (str) session password (optional) + :param session_id: (str) session ID (optional) + :return: Session object + """ + _sessionId = self.mainProxy.proxy.requestSession(password, session_id) + setattr(self, "_sessionId", _sessionId) + _sessionURL = self.mainURL + 'session_' + _sessionId + '/' + setattr(self, "_sessionURL", _sessionURL) + _sessionProxy = SessionProxy(url=_sessionURL, device=self) + setattr(self, "_sessionProxy", _sessionProxy) + return self.session diff --git a/source/rpc/edit.py b/source/rpc/edit.py index b3143e8..7a83ed2 100644 --- a/source/rpc/edit.py +++ b/source/rpc/edit.py @@ -1,3 +1,6 @@ +from .proxy import ApplicationProxy + + class Edit(object): """ Edit object @@ -7,6 +10,19 @@ def __init__(self, editProxy, device): self._editProxy = editProxy self._device = device + def editApplication(self, app_index): + """Generator for editApplication to be used in with statement. + + Args: + app_index (int): application index + """ + self._editProxy.proxy.editApplication(app_index) + _applicationURL = self._editProxy.baseURL + "application/" + setattr(self._device, "_applicationURL", _applicationURL) + _applicationProxy = ApplicationProxy(url=_applicationURL, device=self._device) + setattr(self._device, "_applicationProxy", _applicationProxy) + return self._device.application + def createApplication(self, deviceType="WithModels") -> int: """ Creates an "empty" application. @@ -16,7 +32,7 @@ def createApplication(self, deviceType="WithModels") -> int: """ if deviceType not in ["Camera", "WithModels"]: raise AttributeError("Device type must be either value \"Camera\" or \"WithModels\"!") - appIndex = self._editProxy.createApplication(deviceType) + appIndex = self._editProxy.proxy.createApplication(deviceType) return appIndex def copyApplication(self, applicationIndex: int) -> int: @@ -27,7 +43,7 @@ def copyApplication(self, applicationIndex: int) -> int: :param applicationIndex: (int) Index of application which should be copied :return: (int) Index of new application """ - appIndex = self._editProxy.copyApplication(applicationIndex) + appIndex = self._editProxy.proxy.copyApplication(applicationIndex) return appIndex def deleteApplication(self, applicationIndex: int) -> None: @@ -38,7 +54,7 @@ def deleteApplication(self, applicationIndex: int) -> None: :param applicationIndex: (int) application index :return: None """ - self._editProxy.deleteApplication(applicationIndex) + self._editProxy.proxy.deleteApplication(applicationIndex) def changeNameAndDescription(self, applicationIndex: int, name: str = "", description: str = "") -> None: """ @@ -55,7 +71,7 @@ def changeNameAndDescription(self, applicationIndex: int, name: str = "", descri max_chars = 500 if description.__len__() > 500: raise ValueError("Max. {} characters for description".format(max_chars)) - self._editProxy.changeNameAndDescription(applicationIndex, name, description) + self._editProxy.proxy.changeNameAndDescription(applicationIndex, name, description) def moveApplications(self, applicationIndexFrom: int, applicationIndexTo: int) -> None: """ @@ -65,10 +81,10 @@ def moveApplications(self, applicationIndexFrom: int, applicationIndexTo: int) - :param applicationIndexTo: (int) desired application id in application list :return: None """ - app_list = self._device.mainProxy.getApplicationList() + app_list = self._device.getApplicationList() move_list = [] for app in app_list: if int(app["Index"]) == int(applicationIndexFrom): app["Index"] = int(applicationIndexTo) move_list.append({'Id': app['Id'], 'Index': app['Index']}) - self._editProxy.moveApplications(move_list) + self._editProxy.proxy.moveApplications(move_list) diff --git a/source/rpc/imageQualityCheck.py b/source/rpc/imageQualityCheck.py index 874419a..f5751a9 100644 --- a/source/rpc/imageQualityCheck.py +++ b/source/rpc/imageQualityCheck.py @@ -27,7 +27,7 @@ def enabled(self) -> bool: :return: (bool) True / False """ - if self._imagerProxy.getAllParameters()["QualityCheckConfig"]: + if self._imagerProxy.proxy.getAllParameters()["QualityCheckConfig"]: return True return False @@ -40,9 +40,9 @@ def enabled(self, value: bool) -> None: :return: None """ if value: - self._imagerProxy.setParameter("QualityCheckConfig", True) + self._imagerProxy.proxy.setParameter("QualityCheckConfig", True) else: - self._imagerProxy.setParameter("QualityCheckConfig", "") + self._imagerProxy.proxy.setParameter("QualityCheckConfig", "") while self._device.isConfigurationDone() < 1.0: time.sleep(1) @@ -55,7 +55,7 @@ def _QualityCheckConfig(self) -> [dict, None]: """ if not self.enabled: return None - result = self._imagerProxy.getAllParameters()["QualityCheckConfig"] + result = self._imagerProxy.proxy.getAllParameters()["QualityCheckConfig"] result = ast.literal_eval(result) return result @@ -69,7 +69,7 @@ def _QualityCheckConfig(self, inputDict): """ if not self.enabled: self.enabled = True - self._imagerProxy.setParameter("QualityCheckConfig", json.dumps(inputDict)) + self._imagerProxy.proxy.setParameter("QualityCheckConfig", json.dumps(inputDict)) @property def _QualityCheckConfigSchema(self) -> dict: @@ -78,7 +78,7 @@ def _QualityCheckConfigSchema(self) -> dict: :return: (dict) schema of Image Quality Check """ - # ip = urlparse(self._imagerProxy.baseURL).netloc + # ip = urlparse(self._imagerProxy.proxy.baseURL).netloc # with urlopen("http://{}/schema/ParamImageFeatures.json".format(ip)) as url: # data = json.load(url) # return data diff --git a/source/rpc/imager.py b/source/rpc/imager.py index afb47da..2967169 100644 --- a/source/rpc/imager.py +++ b/source/rpc/imager.py @@ -31,7 +31,7 @@ def getAllParameters(self): :return: (dict) name contains parameter-name, value the stringified parameter-value """ - result = self._imagerProxy.getAllParameters() + result = self._imagerProxy.proxy.getAllParameters() return result def getParameter(self, value): @@ -41,7 +41,7 @@ def getParameter(self, value): :param value: (str) parameter name :return: (str) """ - result = self._imagerProxy.getParameter(value) + result = self._imagerProxy.proxy.getParameter(value) return result def getAllParameterLimits(self): @@ -52,7 +52,7 @@ def getAllParameterLimits(self): :return: (dict) """ - result = self._imagerProxy.getAllParameterLimits() + result = self._imagerProxy.proxy.getAllParameterLimits() return result @property @@ -85,7 +85,7 @@ def Name(self, value: str) -> None: max_chars = 64 if value.__len__() > max_chars: raise ValueError("Max. {} characters".format(max_chars)) - self._imagerProxy.setParameter("Name", value) + self._imagerProxy.proxy.setParameter("Name", value) self._device.waitForConfigurationDone() @property @@ -118,7 +118,7 @@ def Illumination(self, value: int) -> None: if value not in range(int(limits["min"]), int(limits["max"]), 1): raise ValueError("Illumination value not available. Available range: {}" .format(self.getAllParameterLimits()["Illumination"])) - self._imagerProxy.setParameter("Illumination", value) + self._imagerProxy.proxy.setParameter("Illumination", value) self._device.waitForConfigurationDone() @property @@ -159,7 +159,7 @@ def IlluInternalSegments(self, inputDict: dict) -> None: value += inputDict["upper-right"] * 0x02 value += inputDict["lower-left"] * 0x04 value += inputDict["lower-right"] * 0x08 - self._imagerProxy.setParameter("IlluInternalSegments", value) + self._imagerProxy.proxy.setParameter("IlluInternalSegments", value) self._device.waitForConfigurationDone() @property @@ -196,7 +196,7 @@ def Color(self, value: int) -> None: if not int(limits["min"]) <= value <= int(limits["max"]): raise ValueError("Color value not available. Available range: {}" .format(self.getAllParameterLimits()["Color"])) - self._imagerProxy.setParameter("Color", value) + self._imagerProxy.proxy.setParameter("Color", value) else: articleNumber = self._device.getParameter(value="ArticleNumber") raise TypeError("Color attribute not available for sensor {}.".format(articleNumber)) @@ -224,7 +224,7 @@ def ExposureTime(self, value: int) -> None: if not int(limits["min"]) <= int(value) <= int(limits["max"]): raise ValueError("ExposureTime value not available. Available range: {}" .format(self.getAllParameterLimits()["ExposureTime"])) - self._imagerProxy.setParameter("ExposureTime", value) + self._imagerProxy.proxy.setParameter("ExposureTime", value) self._device.waitForConfigurationDone() @property @@ -249,7 +249,7 @@ def AnalogGainFactor(self, value: int) -> None: if str(value) not in limits["values"]: raise ValueError("AnalogGainFactor value not available. Available values: {}" .format(self.getAllParameterLimits()["AnalogGainFactor"])) - self._imagerProxy.setParameter("AnalogGainFactor", value) + self._imagerProxy.proxy.setParameter("AnalogGainFactor", value) self._device.waitForConfigurationDone() @property @@ -284,7 +284,7 @@ def FilterType(self, value: int) -> None: if not int(limits["min"]) <= int(value) <= int(limits["max"]): raise ValueError("FilterType value not available. Available range: {}" .format(self.getAllParameterLimits()["FilterType"])) - self._imagerProxy.setParameter("FilterType", value) + self._imagerProxy.proxy.setParameter("FilterType", value) self._device.waitForConfigurationDone() @property @@ -311,7 +311,7 @@ def FilterStrength(self, value: int): if not int(limits["min"]) <= int(value) <= int(limits["max"]): raise ValueError("FilterStrength value not available. Available range: {}" .format(self.getAllParameterLimits()["FilterStrength"])) - self._imagerProxy.setParameter("FilterStrength", value) + self._imagerProxy.proxy.setParameter("FilterStrength", value) self._device.waitForConfigurationDone() @property @@ -321,7 +321,7 @@ def FilterInvert(self) -> bool: :return: (bool) True or False """ - result = self._imagerProxy.getParameter("FilterInvert") + result = self._imagerProxy.proxy.getParameter("FilterInvert") if result == "false": return False return True @@ -334,33 +334,37 @@ def FilterInvert(self, value: bool) -> None: :param value: (bool) True or False :return: None """ - self._imagerProxy.setParameter("FilterInvert", value) + self._imagerProxy.proxy.setParameter("FilterInvert", value) self._device.waitForConfigurationDone() def startCalculateExposureTime(self, minAnalogGainFactor: int = None, maxAnalogGainFactor: int = None, - saturatedRatio: [float, list] = None, ROIs: list = None, RODs: list = None) -> None: + saturatedRatio: float = None, ROIs: list = None, RODs: list = None) -> None: """ Starting calculation "auto exposure time" with analog gain factor, saturation ratio and ROI/ROD-definition. :param minAnalogGainFactor: Min. Analog Gain Factor upper limit. Possible values: 1, 2, 4 or 8 :param maxAnalogGainFactor: Max. Analog Gain Factor upper limit. Possible values: 1, 2, 4 or 8 - :param saturatedRatio: (float/array) maximum acceptable ratio of saturated pixels + :param saturatedRatio: (float/array) maximum acceptable ratio of saturated pixels. Possible range: [0.0, 1.0] :param ROIs: Auto-Exposure is calculated on these set of ROIs :param RODs: RODs are subtracted from the ROI union set :return: None """ inputAutoExposure = {} if minAnalogGainFactor: - inputAutoExposure.update({"minAnalogGainFactor": minAnalogGainFactor}) + inputAutoExposure["minAnalogGainFactor"] = minAnalogGainFactor if maxAnalogGainFactor: - inputAutoExposure.update({"maxAnalogGainFactor": maxAnalogGainFactor}) + inputAutoExposure["maxAnalogGainFactor"] = maxAnalogGainFactor if saturatedRatio: - inputAutoExposure.update({"saturatedRatio": saturatedRatio}) + inputAutoExposure["saturatedRatio"] = saturatedRatio if ROIs: - inputAutoExposure.update({"ROIs": ROIs}) + inputAutoExposure["ROIs"] = ROIs + if RODs and not ROIs: + defaultROIsZone = [{"id": 0, "group": 0, "type": "Rect", "width": 1280, + "height": 960, "angle": 0, "center_x": 640, "center_y": 480}] + inputAutoExposure["ROIs"] = defaultROIsZone if RODs: - inputAutoExposure.update({"RODs": RODs}) - self._imagerProxy.startCalculateExposureTime(json.dumps(inputAutoExposure)) + inputAutoExposure["RODs"] = RODs + self._imagerProxy.proxy.startCalculateExposureTime(json.dumps(inputAutoExposure)) while self.getProgressCalculateExposureTime() < 1.0: time.sleep(1) @@ -371,19 +375,26 @@ def getProgressCalculateExposureTime(self) -> float: :return: (float) progress (0.0 to 1.0) """ - result = self._imagerProxy.getProgressCalculateExposureTime() + result = self._imagerProxy.proxy.getProgressCalculateExposureTime() return result - def startCalculateAutofocus(self) -> None: + def startCalculateAutofocus(self, ROIs: list = None, RODs: list = None) -> None: """ Starting "autofocus" calculation with ROI-definition. The autofocus will be optimized for the center of the image (HWROI). :return: None """ - # This is required due to the long autofocus progress which may take longer than 10 seconds (default) - # self._device.sessionProxy.heartbeat(heartbeatInterval=300) - self._imagerProxy.startCalculateAutofocus() + inputAutoFocus = {} + if ROIs: + inputAutoFocus["ROIs"] = ROIs + if RODs and not ROIs: + defaultROIsZone = [{"id": 0, "group": 0, "type": "Rect", "width": 1280, + "height": 960, "angle": 0, "center_x": 640, "center_y": 480}] + inputAutoFocus["ROIs"] = defaultROIsZone + if RODs: + inputAutoFocus["RODs"] = RODs + self._imagerProxy.proxy.startCalculateAutofocus(json.dumps(inputAutoFocus)) while self.getProgressCalculateAutofocus() < 1.0: time.sleep(1) @@ -395,7 +406,7 @@ def stopCalculateAutofocus(self) -> None: :return: None """ - self._imagerProxy.stopCalculateAutofocus() + self._imagerProxy.proxy.stopCalculateAutofocus() def getProgressCalculateAutofocus(self) -> float: """ @@ -404,7 +415,7 @@ def getProgressCalculateAutofocus(self) -> float: :return: (float) progress (0.0 to 1.0) """ - result = self._imagerProxy.getProgressCalculateAutofocus() + result = self._imagerProxy.proxy.getProgressCalculateAutofocus() return result def getAutofocusDistances(self) -> list: @@ -413,7 +424,7 @@ def getAutofocusDistances(self) -> list: :return: a list of floating point values, separated by comma """ - result = self._imagerProxy.getAutofocusDistances() + result = self._imagerProxy.proxy.getAutofocusDistances() if result: if "," not in result: return [float(result)] @@ -427,7 +438,7 @@ def getAutoExposureResult(self) -> [dict, None]: :return: (dict) json with algo run result as a string """ - result = self._imagerProxy.getAutoExposureResult() + result = self._imagerProxy.proxy.getAutoExposureResult() if result: data = json.loads(result) return data diff --git a/source/rpc/proxy.py b/source/rpc/proxy.py index 6c631af..b3857f6 100644 --- a/source/rpc/proxy.py +++ b/source/rpc/proxy.py @@ -1,37 +1,44 @@ import xmlrpc.client from contextlib import contextmanager from threading import Timer -from .session import Session -from .edit import Edit -from .application import Application -from .imager import Imager + +SOCKET_TIMEOUT = 10 class BaseProxy(object): """Base class for all proxies.""" - def __init__(self, url): + def __init__(self, url, device, timeout=SOCKET_TIMEOUT): """Initialize the actual xmlrpc.client.ServerProxy from given url. Args: url (str): url for xmlrpc.client.ServerProxy + device (obj): device + timeout (float): Timeout values which is valid for the BaseProxy. + Argument can be a non-negative floating point number expressing seconds, or None. + If None, SOCKET_TIMEOUT value is used as default """ - self.__transport = xmlrpc.client.Transport() - self.__proxy = xmlrpc.client.ServerProxy(uri=url, transport=self.__transport) + try: + self.__proxy = xmlrpc.client.ServerProxy(uri=url, allow_none=True) + self.__transport = self.__proxy("transport") + self.__transport.make_connection(host=device.address) + getattr(self.__transport, "_connection")[1].timeout = timeout + except TimeoutError: + self.close() @property - def proxy(self): - return self.__proxy + def timeout(self): + if getattr(self.__transport, "_connection")[1]: + return getattr(self.__transport, "_connection")[1].timeout - def __getattr__(self, name): - """Pass given name to the actual xmlrpc.client.ServerProxy. + @timeout.setter + def timeout(self, value): + if getattr(self.__transport, "_connection")[1]: + getattr(self.__transport, "_connection")[1].timeout = value - Args: - name (str): name of attribute - Returns: - Attribute of xmlrpc.client.ServerProxy - """ - return self.__proxy.__getattr__(name) + @property + def proxy(self): + return self.__proxy def close(self): self.__transport.close() @@ -42,32 +49,37 @@ def close(self): class MainProxy(BaseProxy): """Proxy representing mainProxy.""" - def __init__(self, url, device): + def __init__(self, url, device, timeout): """Initialize main proxy member, device and baseURL. Args: url (str): url for BaseProxy device (obj): device + timeout (float): Timeout values which is valid for the MainProxy. + Argument can be a non-negative floating point number expressing seconds, or None. + If None, SOCKET_TIMEOUT value is used as default """ self.baseURL = url self.device = device - super(MainProxy, self).__init__(url) + super(MainProxy, self).__init__(url, device, timeout) @contextmanager - def requestSession(self, password='', session_id='0' * 32): + def requestSession(self, password='', session_id='0' * 32, timeout=SOCKET_TIMEOUT): """Generator for requestSession to be used in with statement. Args: password (str): password for session session_id (str): session id - - Initializes various proxies and calls installAdditionalProxies(). + timeout (float): Timeout values which is valid for the SessionProxy. + Argument can be a non-negative floating point number expressing seconds, or None. + If None, SOCKET_TIMEOUT value is used as default """ try: - self.device._sessionId = self.__getattr__('requestSession')(password, session_id) + self.device._sessionId = self.proxy.requestSession(password, session_id) self.device._sessionURL = self.baseURL + 'session_' + session_id + '/' - self.device._sessionProxy = SessionProxy(url=self.device._sessionURL, device=self.device) + self.device._sessionProxy = SessionProxy(url=self.device._sessionURL, + device=self.device, timeout=timeout) yield finally: try: @@ -75,7 +87,7 @@ def requestSession(self, password='', session_id='0' * 32): self.device._sessionProxy.autoHeartbeatTimer.cancel() except AttributeError: pass - self.device._sessionProxy.cancelSession() + self.device._sessionProxy.proxy.cancelSession() self.device._sessionProxy.close() self.device._sessionProxy = None self.device._sessionURL = None @@ -85,13 +97,13 @@ def requestSession(self, password='', session_id='0' * 32): class SessionProxy(BaseProxy): """Proxy representing sessionProxy.""" - def __init__(self, url, device, autoHeartbeat=True, autoHeartbeatInterval=30): + def __init__(self, url, device, timeout=SOCKET_TIMEOUT, autoHeartbeat=True, autoHeartbeatInterval=30): self.baseURL = url self.device = device self.autoHeartbeat = autoHeartbeat self.autoHeartbeatInterval = autoHeartbeatInterval - super().__init__(url) + super().__init__(url, device, timeout) if self.autoHeartbeat: self.heartbeat(self.autoHeartbeatInterval) @@ -100,8 +112,6 @@ def __init__(self, url, device, autoHeartbeat=True, autoHeartbeatInterval=30): else: self.heartbeat(300) - self.device._session = Session(sessionProxy=self.proxy, device=self.device) - def heartbeat(self, heartbeatInterval: int) -> int: """ Extend the live time of edit-session If the given value is outside the range of "SessionTimeout", @@ -127,20 +137,22 @@ def doAutoHeartbeat(self) -> None: self.autoHeartbeatTimer.start() @contextmanager - def setOperatingMode(self, mode): + def setOperatingMode(self, mode, timeout=SOCKET_TIMEOUT): """Generator for setOperatingMode to be used in with statement. Args: mode (int): operating mode + timeout (float): Timeout values which is valid for the EditProxy. + Argument can be a non-negative floating point number expressing seconds, or None. + If None, SOCKET_TIMEOUT value is used as default """ try: - self.__getattr__('setOperatingMode')(mode) + self.proxy.setOperatingMode(mode) self.device._editURL = self.baseURL + 'edit/' - self.device._editProxy = EditProxy(url=self.device._editURL, - device=self.device) + self.device._editProxy = EditProxy(url=self.device._editURL, device=self.device, timeout=timeout) yield finally: - self.__getattr__('setOperatingMode')(0) + self.proxy.setOperatingMode(0) self.device._editProxy.close() self.device._editURL = None self.device._editProxy = None @@ -149,30 +161,30 @@ def setOperatingMode(self, mode): class EditProxy(BaseProxy): """Proxy representing editProxy.""" - def __init__(self, url, device): + def __init__(self, url, device, timeout=SOCKET_TIMEOUT): self.baseURL = url self.device = device - super().__init__(url) - - self.device._edit = Edit(editProxy=self.proxy, - device=self.device) + super().__init__(url, device, timeout) @contextmanager - def editApplication(self, app_index): + def editApplication(self, app_index, timeout=SOCKET_TIMEOUT): """Generator for editApplication to be used in with statement. Args: app_index (int): application index + timeout (float): Timeout values which is valid for the ApplicationProxy. + Argument can be a non-negative floating point number expressing seconds, or None. + If None, SOCKET_TIMEOUT value is used as default """ try: - self.__getattr__('editApplication')(app_index) + self.proxy.editApplication(app_index) self.device._applicationURL = self.baseURL + "application/" - self.device._applicationProxy = ApplicationProxy(url=self.device._applicationURL, - device=self.device) + self.device._applicationProxy = ApplicationProxy(url=self.device._applicationURL, device=self.device, + timeout=timeout) yield finally: - self.__getattr__('stopEditingApplication')() + self.proxy.stopEditingApplication() self.device._applicationProxy.close() self.device._applicationURL = None self.device._applicationProxy = None @@ -181,21 +193,21 @@ def editApplication(self, app_index): class ApplicationProxy(BaseProxy): """Proxy representing editProxy.""" - def __init__(self, url, device): + def __init__(self, url, device, timeout=SOCKET_TIMEOUT): self.baseURL = url self.device = device - super().__init__(url) - - self.device._application = Application(applicationProxy=self.proxy, - device=self.device) + super().__init__(url, device, timeout) @contextmanager - def editImager(self, imager_index): + def editImager(self, imager_index, timeout=SOCKET_TIMEOUT): """Generator for editImager to be used in with statement. Args: imager_index (int): imager index + timeout (float): Timeout values which is valid for the ImagerProxy. + Argument can be a non-negative floating point number expressing seconds, or None. + If None, SOCKET_TIMEOUT value is used as default """ try: imager_IDs = [int(x["Id"]) for x in self.proxy.getImagerConfigList()] @@ -204,7 +216,8 @@ def editImager(self, imager_index): "ImagerConfigList or create a new one with method createImagerConfig():\n{}" .format(imager_index, self.proxy.getImagerConfigList())) self.device._imagerURL = self.baseURL + 'imager_{0:03d}/'.format(imager_index) - self.device._imagerProxy = ImagerProxy(url=self.device._imagerURL, device=self.device) + self.device._imagerProxy = ImagerProxy(url=self.device._imagerURL, device=self.device, + timeout=timeout) yield finally: self.device._imagerProxy.close() @@ -215,11 +228,8 @@ def editImager(self, imager_index): class ImagerProxy(BaseProxy): """Proxy representing editProxy.""" - def __init__(self, url, device): + def __init__(self, url, device, timeout=SOCKET_TIMEOUT): self.baseURL = url self.device = device - super().__init__(url) - - self.device._imager = Imager(imagerProxy=self.proxy, - device=self.device) + super().__init__(url, device, timeout) diff --git a/source/rpc/session.py b/source/rpc/session.py index 3fa8d79..93fbe65 100644 --- a/source/rpc/session.py +++ b/source/rpc/session.py @@ -4,6 +4,8 @@ import json import os import base64 +from .edit import Edit +from .proxy import EditProxy class Session(object): @@ -15,6 +17,41 @@ def __init__(self, sessionProxy, device): self._sessionProxy = sessionProxy self._device = device + def startEdit(self) -> Edit: + """ + Starting the edit mode and requesting an Edit object. + + :return: + """ + self.setOperatingMode(1) + _editURL = self._sessionProxy.baseURL + 'edit/' + setattr(self._device, "_editURL", _editURL) + editProxy = EditProxy(url=_editURL, device=self._device) + setattr(self._device, "_editProxy", editProxy) + return self._device.edit + + def stopEdit(self) -> None: + """ + Stopping the edit mode. + + :return: None + """ + self.setOperatingMode(0) + self._device._editURL = None + self._device._editProxy = None + + def setOperatingMode(self, mode) -> [None, Edit]: + """ + Changes the operation mode of the device. Setting this to "edit" will enable the "EditMode"-object on RPC. + + :param mode: 1 digit + 0: run mode + 1: edit mode + 2: simulation mode (Not implemented!) + :return: None or Edit object + """ + self._sessionProxy.proxy.setOperatingMode(mode) + def exportConfig(self) -> bytearray: """ Exports the whole configuration of the sensor-device and stores it at the desired path. @@ -22,14 +59,14 @@ def exportConfig(self) -> bytearray: :return: (bytearray) configuration as one data-blob :binary/base64 """ # increase heartbeat interval which will prevent a closed session after the "long" export progress - self._device.sessionProxy.heartbeat(heartbeatInterval=30) - config = self._sessionProxy.exportConfig() + self._sessionProxy.heartbeat(heartbeatInterval=30) + config = self._sessionProxy.proxy.exportConfig() config_bytes = bytearray() config_bytes.extend(map(ord, str(config))) while self.getExportProgress() < 1.0: time.sleep(1) self.cleanupExport() - self._device.waitForConfigurationDone() + self._device.mainProxy.proxy.waitForConfigurationDone() return config_bytes def importConfig(self, config: str, global_settings=True, network_settings=False, applications=True) -> None: @@ -43,16 +80,16 @@ def importConfig(self, config: str, global_settings=True, network_settings=False :return: None """ # This is required due to the long import progress which may take longer than 10 seconds (default) - self._device.sessionProxy.heartbeat(heartbeatInterval=30) + self._sessionProxy.heartbeat(heartbeatInterval=30) if global_settings: - self._sessionProxy.importConfig(config, 0x0001) + self._sessionProxy.proxy.importConfig(config, 0x0001) if network_settings: - self._sessionProxy.importConfig(config, 0x0002) + self._sessionProxy.proxy.importConfig(config, 0x0002) if applications: - self._sessionProxy.importConfig(config, 0x0010) + self._sessionProxy.proxy.importConfig(config, 0x0010) while self.getImportProgress() < 1.0: time.sleep(1) - self._device.waitForConfigurationDone() + self._device.mainProxy.proxy.waitForConfigurationDone() def exportApplication(self, applicationIndex: int) -> bytearray: """ @@ -62,7 +99,7 @@ def exportApplication(self, applicationIndex: int) -> bytearray: :return: None """ - config = self._sessionProxy.exportApplication(applicationIndex) + config = self._sessionProxy.proxy.exportApplication(applicationIndex) application_bytes = bytearray() application_bytes.extend(map(ord, str(config))) while self.getExportProgress() < 1.0: @@ -79,7 +116,7 @@ def importApplication(self, application: str) -> int: :return: (int) index of new application in list """ - index = int(self._sessionProxy.importApplication(application)) + index = int(self._sessionProxy.proxy.importApplication(application)) while self.getImportProgress() < 1.0: time.sleep(1) return index @@ -92,7 +129,7 @@ def getImportProgress(self) -> float: :return: (float) progress (0.0 to 1.0) """ try: - result = self._sessionProxy.getImportProgress() + result = self._sessionProxy.proxy.getImportProgress() return result except xmlrpc.client.Fault as fault: if fault.faultCode == 101107: @@ -106,7 +143,7 @@ def getExportProgress(self) -> float: :return: (float) progress (0.0 to 1.0) """ try: - result = self._sessionProxy.getExportProgress() + result = self._sessionProxy.proxy.getExportProgress() return result except xmlrpc.client.Fault as fault: if fault.faultCode == 101110: @@ -121,7 +158,7 @@ def cleanupExport(self) -> None: :return: None """ - self._sessionProxy.cleanupExport() + self._sessionProxy.proxy.cleanupExport() def getApplicationDetails(self, applicationIndex: [int, str]) -> dict: """ @@ -131,7 +168,7 @@ def getApplicationDetails(self, applicationIndex: [int, str]) -> dict: :param applicationIndex: (int) application Index :return: (dict) json-string containing application parameters, models and image settings """ - result = json.loads(self._sessionProxy.getApplicationDetails(applicationIndex)) + result = json.loads(self._sessionProxy.proxy.getApplicationDetails(applicationIndex)) return result def resetStatistics(self) -> None: @@ -140,8 +177,8 @@ def resetStatistics(self) -> None: :return: None """ - self._sessionProxy.resetStatistics() - self._device.waitForConfigurationDone() + self._sessionProxy.proxy.resetStatistics() + self._device.mainProxy.proxy.waitForConfigurationDone() def writeApplicationConfigFile(self, applicationName: str, data: bytearray) -> None: """ @@ -209,3 +246,14 @@ def _readConfigFile(self, configFile: str) -> str: return decoded else: raise FileExistsError("File {} does not exist!".format(configFile)) + + def cancelSession(self) -> None: + """ + Method for closing the session. + :return: None + """ + self._sessionProxy.proxy.cancelSession() + self._sessionProxy.close() + self._sessionProxy = None + self._device._sessionURL = None + self._device._sessionId = None diff --git a/source/rpc/simulation.py b/source/rpc/simulation.py new file mode 100644 index 0000000..20d174e --- /dev/null +++ b/source/rpc/simulation.py @@ -0,0 +1,56 @@ +import xmlrpc.client + + +class Simulation(object): + """ + Simulation Mode object + """ + + def __init__(self, editURL, sessionAPI, mainAPI): + self.url = editURL + self.sessionAPI = sessionAPI + self.mainAPI = mainAPI + self.rpc = xmlrpc.client.ServerProxy(self.url) + + @staticmethod + def createPayloadList(image_paths: [str, list]) -> list: + """ + TODO + + :param image_paths: + :return: + """ + payloadList = [] + if isinstance(image_paths, list): + for path in image_paths: + with open(r"{p}".format(p=path), "rb") as fh: + buf = fh.read() + payloadList.append(xmlrpc.client.Binary(buf)) + else: + with open(r"{p}".format(p=image_paths), "rb") as fh: + buf = fh.read() + payloadList.append(xmlrpc.client.Binary(buf)) + return payloadList + + def processImageSequence(self, image_sequence: list, force_trigger: bool) -> None: + """ + Sends an Image Sequence and decode them. + An additional parameter exists to define if a software trigger should be generated. + Notice that the number of images in the image sequence should match the number of image configurations + defined in the currently active application. + An "Image sequence" is a list of images that are meant to be decoded by the device in Simulation-Operating Mode. + Each image in the list should have the same format. Supported format for the images is: + + \* JPEG format as generated in the service report + + \* Size as generated in the service report: 1280 x 960 + + \* Color format as generated in the service report: grayscale 8 bit + + :return: None + """ + self.rpc.processImageSequence(image_sequence, force_trigger) + + def __getattr__(self, name): + # Forward otherwise undefined method calls to XMLRPC proxy + return getattr(self.rpc, name) diff --git a/source/rpc/utils.py b/source/rpc/utils.py index 029f3a3..c74473f 100644 --- a/source/rpc/utils.py +++ b/source/rpc/utils.py @@ -1,24 +1,6 @@ import zipfile import json import warnings -import multiprocessing.pool -import functools - - -def timeout(max_timeout): - """Timeout decorator, parameter in seconds.""" - def timeout_decorator(item): - """Wrap the original function.""" - @functools.wraps(item) - def func_wrapper(*args, **kwargs): - """Closure for function.""" - pool = multiprocessing.pool.ThreadPool(processes=1) - async_result = pool.apply_async(item, args, kwargs) - pool.close() - # raises a TimeoutError if execution exceeds max_timeout - return async_result.get(max_timeout) - return func_wrapper - return timeout_decorator def firmwareWarning(function): @@ -36,37 +18,10 @@ def wrapper(self, *args, **kwargs): raise ImportError("Unknown config file in zip: {}".format(str(zipFiles))) jsonData = json.loads(zipOpen.open(tmp).read()) minConfigFileFirmware = jsonData["Firmware"] - sensorFirmware = self._device.mainProxy.getSWVersion()["IFM_Software"] + sensorFirmware = self._device.getSWVersion()["IFM_Software"] if int(sensorFirmware.replace(".", "")) < int(minConfigFileFirmware.replace(".", "")): message = "Missmatch in firmware versions: Sensor firmware {} is lower than {} firmware {}. " \ "Import of may will fail!".format(sensorFirmware, tmp, minConfigFileFirmware) warnings.warn(message, UserWarning) return function(self, *args, **kwargs) return wrapper - - -def rpc_exception_handler(max_timeout): - """Timeout decorator, parameter in seconds.""" - - def exception_decorator(item): - """Wrap the original function.""" - - @functools.wraps(item) - def func_wrapper(*args, **kwargs): - """Closure for function.""" - try: - pool = multiprocessing.pool.ThreadPool(processes=1) - async_result = pool.apply_async(item, args, kwargs) - # raises a TimeoutError if execution exceeds max_timeout - pool.close() - return async_result.get(max_timeout) - - except multiprocessing.context.TimeoutError as e: - raise TimeoutError( - "Unable to establish XML-RPC connection to device with IP {}. " - "Execution time exceeded max timeout of {} seconds." - .format(args[0].address, max_timeout)) - - return func_wrapper - - return exception_decorator diff --git a/tests/config.py b/tests/config.py index 873e1b6..8da510b 100644 --- a/tests/config.py +++ b/tests/config.py @@ -1,3 +1,6 @@ deviceAddress = '192.168.0.69' pcicTcpPort = 50010 maxNumberContainers = 9 +# Set this to False if the applications for the unittests are already imported on the sensor +importDeviceConfigUnittests = True + diff --git a/tests/test_application.py b/tests/test_application.py index daec4b3..c0efc4b 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -14,23 +14,25 @@ class TestRPC_ApplicationObject(TestCase): @classmethod def setUpClass(cls) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - cls.config_backup = rpc.session.exportConfig() + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: cls.active_application_backup = rpc.getParameter("ActiveApplication") - cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] - cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] - _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) - rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, - applications=True) + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) @classmethod def tearDownClass(cls) -> None: - with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, - applications=True) - if cls.active_application_backup != "0": - rpc.switchApplication(cls.active_application_backup) + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self) -> None: with O2x5xxRPCDevice(deviceAddress) as self.rpc: diff --git a/tests/test_device.py b/tests/test_device.py new file mode 100644 index 0000000..f1e175c --- /dev/null +++ b/tests/test_device.py @@ -0,0 +1,235 @@ +import socket +from unittest import TestCase +from source import O2x5xxDevice +from source import O2x5xxDeviceV2 +from tests.utils import * +from .config import * +import time + + +class TestDeviceV1(TestCase): + device = None + config_file = None + config_backup = None + active_application_backup = None + + @classmethod + def setUpClass(cls) -> None: + with O2x5xxDevice(deviceAddress) as cls.device: + cls.config_file = getImportSetupByPinLayout(rpc=cls.device.rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=cls.device.rpc)['app_import_file'] + if importDeviceConfigUnittests: + cls.active_application_backup = cls.device.rpc.getParameter("ActiveApplication") + with cls.device.rpc.mainProxy.requestSession(): + cls.config_backup = cls.device.rpc.session.exportConfig() + _configFile = cls.device.rpc.session.readDeviceConfigFile(configFile=cls.config_file) + cls.device.rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) + + @classmethod + def tearDownClass(cls) -> None: + if importDeviceConfigUnittests: + with O2x5xxDevice(deviceAddress) as cls.device: + with cls.device.rpc.mainProxy.requestSession(): + cls.device.rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + cls.device.rpc.switchApplication(cls.active_application_backup) + + def setUp(self): + with O2x5xxDevice(deviceAddress, pcicTcpPort) as device: + result = device.activate_application(1) + self.assertEqual(result, "*") + + def tearDown(self): + pass + + def test_device_v1_client_with_multiple_connects(self): + iterations = 100 + for i in range(iterations): + with O2x5xxDevice(deviceAddress, pcicTcpPort, autoconnect=False) as device: + device.connect() + result = device.occupancy_of_application_list() + self.assertNotEqual(result, "!") + self.assertNotEqual(result, "?") + self.assertTrue(result.count('\t') >= 6) + self.assertEqual(result, '006\t01\t01\t02\t03\t04\t05\t06') + + def test_device_v1_client_without_context_manager_with_autoconnect_False(self): + device = O2x5xxDevice(deviceAddress, pcicTcpPort, autoconnect=False) + device.connect() + result = device.occupancy_of_application_list() + self.assertNotEqual(result, "!") + self.assertNotEqual(result, "?") + device.disconnect() + + def test_device_v1_client_without_context_manager_with_autoconnect_True(self): + device = O2x5xxDevice(deviceAddress, pcicTcpPort, autoconnect=True) + result = device.occupancy_of_application_list() + self.assertNotEqual(result, "!") + self.assertNotEqual(result, "?") + device.close() + + def test_RPC_client_device_v1_client_with_wrong_ip_and_autoconnect_true(self): + with self.assertRaises(socket.timeout): + with O2x5xxDevice("192.168.0.5", pcicTcpPort, autoconnect=True) as device: + device.rpc.getParameter("ActiveApplication") + + def test_PCIC_client_device_v1_client_with_wrong_ip_and_autoconnect_true(self): + with self.assertRaises(socket.timeout): + with O2x5xxDevice("192.168.0.5", pcicTcpPort, autoconnect=True) as device: + device.occupancy_of_application_list() + + def test_RPC_client_device_v1_client_with_wrong_ip_and_autoconnect_false(self): + with self.assertRaises(socket.timeout): + with O2x5xxDevice("192.168.0.5", pcicTcpPort, autoconnect=False) as device: + device.rpc.getParameter("ActiveApplication") + + def test_PCIC_client_device_v1_client_with_wrong_ip_and_autoconnect_false(self): + with self.assertRaises(socket.timeout): + with O2x5xxDevice("192.168.0.5", pcicTcpPort, autoconnect=False) as device: + device.connect() + + def test_PCIC_client_device_v1_client_with_wrong_ip_and_autoconnect_false_and_timeout(self): + TIMEOUT_VALUE = 2 + start_time = time.time() + with self.assertRaises(socket.timeout): + with O2x5xxDevice("192.168.0.5", pcicTcpPort, autoconnect=False, timeout=2) as device: + device.connect() + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, TIMEOUT_VALUE+1) + + def test_RPC_client_device_v1_client_with_wrong_ip_and_autoconnect_false_and_timeout(self): + TIMEOUT_VALUE = 2 + start_time = time.time() + with self.assertRaises(socket.timeout): + with O2x5xxDevice("192.168.0.5", pcicTcpPort, autoconnect=False, timeout=2) as device: + device.rpc.getParameter("ActiveApplication") + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, TIMEOUT_VALUE+1) + + def test_device_v1_client_RPC_and_PCIC_client_properties_after_exit_method(self): + device = O2x5xxDevice(deviceAddress, pcicTcpPort) + with device: + device.rpc.getParameter("ActiveApplication") + device.request_device_information() + device.rpc.getParameter("ActiveApplication") + with self.assertRaises(AttributeError): + device.request_device_information() + device.connect() + device.request_device_information() + + +class TestDeviceV2(TestCase): + device = None + config_file = None + config_backup = None + active_application_backup = None + + @classmethod + def setUpClass(cls) -> None: + with O2x5xxDeviceV2(deviceAddress) as cls.device: + cls.config_file = getImportSetupByPinLayout(rpc=cls.device.rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=cls.device.rpc)['app_import_file'] + if importDeviceConfigUnittests: + cls.active_application_backup = cls.device.rpc.getParameter("ActiveApplication") + with cls.device.rpc.mainProxy.requestSession(): + cls.config_backup = cls.device.rpc.session.exportConfig() + _configFile = cls.device.rpc.session.readDeviceConfigFile(configFile=cls.config_file) + cls.device.rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) + + @classmethod + def tearDownClass(cls) -> None: + if importDeviceConfigUnittests: + with O2x5xxDeviceV2(deviceAddress) as cls.device: + with cls.device.rpc.mainProxy.requestSession(): + cls.device.rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + cls.device.rpc.switchApplication(cls.active_application_backup) + + def setUp(self): + with O2x5xxDeviceV2(deviceAddress, pcicTcpPort) as device: + result = device.pcic.activate_application(1) + self.assertEqual(result, "*") + + def tearDown(self): + pass + + def test_device_v2_client_with_multiple_connects_with_autoconnect_False(self): + iterations = 100 + for i in range(iterations): + with O2x5xxDeviceV2(deviceAddress, pcicTcpPort, autoconnect=False) as device: + device.pcic.connect() + result = device.pcic.occupancy_of_application_list() + self.assertNotEqual(result, "!") + self.assertNotEqual(result, "?") + device.pcic.disconnect() + + def test_device_v2_client_without_context_manager_with_autoconnect_False(self): + device = O2x5xxDeviceV2(deviceAddress, pcicTcpPort, autoconnect=False) + device.pcic.connect() + result = device.pcic.occupancy_of_application_list() + self.assertNotEqual(result, "!") + self.assertNotEqual(result, "?") + device.pcic.disconnect() + device.rpc.mainProxy.close() + + def test_device_v2_client_without_context_manager_with_autoconnect_True(self): + device = O2x5xxDeviceV2(deviceAddress, pcicTcpPort, autoconnect=True) + result = device.pcic.occupancy_of_application_list() + self.assertNotEqual(result, "!") + self.assertNotEqual(result, "?") + device.pcic.close() + device.rpc.mainProxy.close() + + def test_RPC_client_device_v2_client_with_wrong_ip_and_autoconnect_true(self): + with self.assertRaises(socket.timeout): + with O2x5xxDeviceV2("192.168.0.5", pcicTcpPort, autoconnect=True) as device: + device.rpc.getParameter("ActiveApplication") + + def test_PCIC_client_device_v2_client_with_wrong_ip_and_autoconnect_true(self): + with self.assertRaises(socket.timeout): + with O2x5xxDeviceV2("192.168.0.5", pcicTcpPort, autoconnect=True) as device: + device.pcic.occupancy_of_application_list() + + def test_RPC_client_device_v2_client_with_wrong_ip_and_autoconnect_false(self): + with self.assertRaises(socket.timeout): + with O2x5xxDeviceV2("192.168.0.5", pcicTcpPort, autoconnect=False) as device: + device.rpc.getParameter("ActiveApplication") + + def test_PCIC_client_device_v2_client_with_wrong_ip_and_autoconnect_false(self): + with self.assertRaises(socket.timeout): + with O2x5xxDeviceV2("192.168.0.5", pcicTcpPort, autoconnect=False) as device: + device.pcic.connect() + + def test_PCIC_client_device_v2_client_with_wrong_ip_and_autoconnect_false_and_timeout(self): + TIMEOUT_VALUE = 2 + start_time = time.time() + with self.assertRaises(socket.timeout): + with O2x5xxDeviceV2("192.168.0.5", pcicTcpPort, autoconnect=False, timeout=2) as device: + device.pcic.connect() + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, TIMEOUT_VALUE+1) + + def test_RPC_client_device_v2_client_with_wrong_ip_and_autoconnect_false_and_timeout(self): + TIMEOUT_VALUE = 2 + start_time = time.time() + with self.assertRaises(socket.timeout): + with O2x5xxDeviceV2("192.168.0.5", pcicTcpPort, autoconnect=False, timeout=2) as device: + device.rpc.getParameter("ActiveApplication") + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, TIMEOUT_VALUE+1) + + def test_device_v2_client_RPC_and_PCIC_client_properties_after_exit_method(self): + device = O2x5xxDeviceV2(deviceAddress, pcicTcpPort) + with device: + device.rpc.getParameter("ActiveApplication") + device.pcic.request_device_information() + device.rpc.getParameter("ActiveApplication") + device.pcic.request_device_information() diff --git a/tests/test_edit.py b/tests/test_edit.py index 8317a3c..43cfcb8 100644 --- a/tests/test_edit.py +++ b/tests/test_edit.py @@ -13,23 +13,25 @@ class TestRPC_EditObject(TestCase): @classmethod def setUpClass(cls) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - cls.config_backup = rpc.session.exportConfig() + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: cls.active_application_backup = rpc.getParameter("ActiveApplication") - cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] - cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] - _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) - rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, - applications=True) + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) @classmethod def tearDownClass(cls) -> None: - with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, - applications=True) - if cls.active_application_backup != "0": - rpc.switchApplication(cls.active_application_backup) + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self) -> None: with O2x5xxRPCDevice(deviceAddress) as self.rpc: diff --git a/tests/test_imageQualityCheck.py b/tests/test_imageQualityCheck.py index 5ba3952..df7cda3 100644 --- a/tests/test_imageQualityCheck.py +++ b/tests/test_imageQualityCheck.py @@ -13,23 +13,25 @@ class TestRPC_ImageQualityObject(TestCase): @classmethod def setUpClass(cls) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - cls.config_backup = rpc.session.exportConfig() + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: cls.active_application_backup = rpc.getParameter("ActiveApplication") - cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] - cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] - _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) - rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, - applications=True) + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) @classmethod def tearDownClass(cls) -> None: - with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, - applications=True) - if cls.active_application_backup != "0": - rpc.switchApplication(cls.active_application_backup) + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self) -> None: with O2x5xxRPCDevice(deviceAddress) as self.rpc: diff --git a/tests/test_imager.py b/tests/test_imager.py index a758e25..51d5c30 100644 --- a/tests/test_imager.py +++ b/tests/test_imager.py @@ -1,6 +1,6 @@ from unittest import TestCase -from source import O2x5xxRPCDevice from tests.utils import * +from source import O2x5xxRPCDevice from .config import * @@ -13,23 +13,25 @@ class TestRPC_ImagerObject(TestCase): @classmethod def setUpClass(cls) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - cls.config_backup = rpc.session.exportConfig() + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: cls.active_application_backup = rpc.getParameter("ActiveApplication") - cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] - cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] - _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) - rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, - applications=True) + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) @classmethod def tearDownClass(cls) -> None: - with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, - applications=True) - if cls.active_application_backup != "0": - rpc.switchApplication(cls.active_application_backup) + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self) -> None: with O2x5xxRPCDevice(deviceAddress) as self.rpc: @@ -252,7 +254,7 @@ def test_startCalculateExposureTimeWithArguments(self): {"x": 100, "y": 100}, {"x": 334, "y": 300}, {"x": 250, "y": 400}]}] self.rpc.imager.startCalculateExposureTime(minAnalogGainFactor=2, maxAnalogGainFactor=8, - saturatedRatio=[0.25], + saturatedRatio=0.25, ROIs=exposureROIsZone, RODs=exposureRODsZone) self.assertEqual(self.rpc.imager.getProgressCalculateExposureTime(), 1.0) @@ -265,6 +267,27 @@ def test_startCalculateAutofocus(self): self.rpc.imager.startCalculateAutofocus() self.assertEqual(self.rpc.imager.getProgressCalculateAutofocus(), 1.0) + def test_startCalculateAutofocusWithArguments(self): + with O2x5xxRPCDevice(deviceAddress) as self.rpc, self.rpc.mainProxy.requestSession(): + with self.rpc.sessionProxy.setOperatingMode(mode=1), self.rpc.editProxy.editApplication( + app_index=self.newApplicationIndex): + with self.rpc.applicationProxy.editImager(imager_index=1): + focusROIsZone = [{"id": 0, "group": 0, "type": "Rect", "width": 100, + "height": 50, "angle": 45, "center_x": 123, "center_y": 42}, + {"id": 0, "group": 0, "type": "Ellipse", "width": 50, + "height": 100, "angle": 45, "center_x": 42, "center_y": 123}, + {"id": 0, "group": 0, "type": "Poly", "points": [ + {"x": 100, "y": 100}, {"x": 234, "y": 100}, {"x": 150, "y": 300}]}] + focusRODsZone = [{"id": 0, "group": 0, "type": "Rect", "width": 30, + "height": 40, "angle": 45, "center_x": 123, "center_y": 42}, + {"id": 0, "group": 0, "type": "Ellipse", "width": 20, + "height": 100, "angle": 90, "center_x": 42, "center_y": 153}, + {"id": 0, "group": 0, "type": "Poly", "points": [ + {"x": 100, "y": 100}, {"x": 334, "y": 300}, {"x": 250, "y": 400}]}] + self.rpc.imager.startCalculateAutofocus(ROIs=focusROIsZone, + RODs=focusRODsZone) + self.assertEqual(self.rpc.imager.getProgressCalculateAutofocus(), 1.0) + def test_getAutofocusDistances(self): with O2x5xxRPCDevice(deviceAddress) as self.rpc, self.rpc.mainProxy.requestSession(): with self.rpc.sessionProxy.setOperatingMode(mode=1), self.rpc.editProxy.editApplication( diff --git a/tests/test_pcic.py b/tests/test_pcic.py index 11fd948..7b37420 100644 --- a/tests/test_pcic.py +++ b/tests/test_pcic.py @@ -1,4 +1,5 @@ import ast +import socket import time from unittest import TestCase from source import O2x5xxPCICDevice @@ -12,30 +13,33 @@ class TestPCIC(TestCase): pcic = None rpc = None session = None + config_file = None config_backup = None active_application_backup = None pin_layout = None @classmethod - def setUpClass(cls): - with O2x5xxRPCDevice(deviceAddress) as cls.rpc: - with cls.rpc.mainProxy.requestSession(): - cls.config_backup = cls.rpc.session.exportConfig() - cls.active_application_backup = cls.rpc.getParameter("ActiveApplication") - configFile = getImportSetupByPinLayout(rpc=cls.rpc)['config_file'] - configFile = cls.rpc.session.readDeviceConfigFile(configFile=configFile) - cls.rpc.session.importConfig(configFile, global_settings=True, network_settings=False, + def setUpClass(cls) -> None: + with O2x5xxRPCDevice(deviceAddress) as rpc: + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: + cls.active_application_backup = rpc.getParameter("ActiveApplication") + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, applications=True) - cls.rpc.switchApplication(1) @classmethod - def tearDownClass(cls): - with O2x5xxRPCDevice(deviceAddress) as cls.rpc: - with cls.rpc.mainProxy.requestSession(): - cls.rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + def tearDownClass(cls) -> None: + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, applications=True) - if cls.active_application_backup != "0": - cls.rpc.switchApplication(cls.active_application_backup) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self): with O2x5xxPCICDevice(deviceAddress, pcicTcpPort) as pcic: @@ -45,6 +49,42 @@ def setUp(self): def tearDown(self): pass + def test_PCIC_client_timeout_property_and_autoconnect_true(self): + timeout_values = [2, 6, 3, 9, 5, 7] + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort, autoconnect=True) as pcic: + for _, x in enumerate(timeout_values): + pcic.timeout = x + self.assertEqual(pcic.timeout, x) + + def test_PCIC_client_timeout_property_and_autoconnect_false(self): + timeout_values = [2, 6, 3, 9, 5, 7] + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort, autoconnect=False) as pcic: + pcic.connect() + for _, x in enumerate(timeout_values): + pcic.timeout = x + self.assertEqual(pcic.timeout, x) + + def test_PCIC_client_connect_timeout_with_wrong_ip_and_autoconnect_false(self): + TIMEOUT_VALUE = 2 + start_time = time.time() + with O2x5xxPCICDevice("192.168.0.5", pcicTcpPort, autoconnect=False, timeout=TIMEOUT_VALUE) as pcic: + self.assertEqual(pcic.timeout, TIMEOUT_VALUE) + with self.assertRaises(socket.timeout): + pcic.connect() + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, TIMEOUT_VALUE+1) + + def test_PCIC_client_connect_timeout_with_wrong_ip_and_autoconnect_true(self): + TIMEOUT_VALUE = 2 + start_time = time.time() + with self.assertRaises(socket.timeout): + with O2x5xxPCICDevice("192.168.0.5", pcicTcpPort, autoconnect=True, timeout=TIMEOUT_VALUE) as pcic: + pcic.occupancy_of_application_list() + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, TIMEOUT_VALUE+1) + def test_PCIC_client_with_multiple_connects(self): iterations = 100 for i in range(iterations): @@ -64,7 +104,7 @@ def test_PCIC_client_without_context_manager_with_autoconnect_False(self): self.assertNotEqual(result, "?") self.assertTrue(result.count('\t') >= 6) self.assertEqual(result, '006\t01\t01\t02\t03\t04\t05\t06') - device.disconnect() + device.close() def test_PCIC_client_without_context_manager_with_autoconnect_True(self): device = O2x5xxPCICDevice(deviceAddress, pcicTcpPort, autoconnect=True) @@ -196,7 +236,7 @@ def test_return_a_list_of_available_commands(self): def test_request_last_image_taken(self): with O2x5xxPCICDevice(deviceAddress, pcicTcpPort) as pcic: result = pcic.request_last_image_taken(1) - self.assertIsInstance(result, bytearray) + self.assertIsInstance(result, bytes) self.assertTrue(len(result) > 1000) def test_request_multiple_images_taken(self): @@ -206,7 +246,7 @@ def test_request_multiple_images_taken(self): result = pcic.execute_asynchronous_trigger() self.assertEqual(result, "*") result = pcic.request_last_image_taken(1) - self.assertIsInstance(result, bytearray) + self.assertIsInstance(result, bytes) self.assertTrue(len(result) > 10000) def test_request_multiple_images_taken_deserialized(self): @@ -295,7 +335,7 @@ def test_turn_process_interface_output_on_or_off(self): result = pcic.turn_process_interface_output_on_or_off(7) self.assertEqual(result, "*") ticket, answer = pcic.read_next_answer() - self.assertIsInstance(answer, bytearray) + self.assertIsInstance(answer, bytes) result = pcic.turn_process_interface_output_on_or_off(0) self.assertEqual(result, "*") @@ -338,3 +378,28 @@ def test_request_current_protocol_version(self): self.assertEqual(result, "*") result = pcic.request_current_protocol_version() self.assertEqual(result, "0{} 01 02 03".format(str(initialPCICVersion))) + + def test_timeout(self): + TIMEOUT_VALUES = range(1, 5) + + # Passing timeout value to constructor + for timeout_value in TIMEOUT_VALUES: + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort, timeout=timeout_value) as pcic: + self.assertEqual(pcic.timeout, timeout_value) + + # Passing timeout value to constructor with autoconnect = False + for timeout_value in TIMEOUT_VALUES: + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort, timeout=timeout_value, autoconnect=False) as pcic: + self.assertEqual(pcic.timeout, timeout_value) + + # Passing timeout value to property + for timeout_value in TIMEOUT_VALUES: + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort) as pcic: + pcic.timeout = timeout_value + self.assertEqual(pcic.timeout, timeout_value) + + # Passing timeout value to property with autoconnect = False + for timeout_value in TIMEOUT_VALUES: + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort, autoconnect=False) as pcic: + pcic.timeout = timeout_value + self.assertEqual(pcic.timeout, timeout_value) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index f59cdb1..b82b21c 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -1,35 +1,39 @@ import time from unittest import TestCase import numpy as np +import socket from source import O2x5xxRPCDevice from tests.utils import * from .config import * class TestRPC_MainAPI(TestCase): + config_file = None config_backup = None active_application_backup = None @classmethod def setUpClass(cls) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - cls.config_backup = rpc.session.exportConfig() + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: cls.active_application_backup = rpc.getParameter("ActiveApplication") - configFile = getImportSetupByPinLayout(rpc=rpc)['config_file'] - configFile = rpc.session.readDeviceConfigFile(configFile=configFile) - rpc.session.importConfig(configFile, global_settings=True, network_settings=False, - applications=True) - rpc.switchApplication(1) + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) @classmethod def tearDownClass(cls) -> None: - with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, - applications=True) - if cls.active_application_backup != "0": - rpc.switchApplication(cls.active_application_backup) + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: @@ -38,6 +42,19 @@ def setUp(self) -> None: def tearDown(self) -> None: pass + def test_timeout_with_invalid_ip(self): + TIMEOUT_VALUES = range(1, 6) + for timeout_value in TIMEOUT_VALUES: + start_time = time.time() + with self.assertRaises(socket.timeout): + with O2x5xxRPCDevice("192.168.1.5", timeout=timeout_value) as device: + device.rpc.getParameter("ActiveApplication") + self.assertEqual(device.mainProxy.timeout, timeout_value) + end_time = time.time() + duration_secs = end_time - start_time + self.assertLess(duration_secs, timeout_value+1) + self.assertGreater(duration_secs, timeout_value-1) + def test_getParameter(self): with O2x5xxRPCDevice(deviceAddress) as rpc: result = rpc.getParameter(value="DeviceType") @@ -90,10 +107,10 @@ def test_switchApplication(self): self.assertEqual(int(rpc.getParameter("ActiveApplication")), 1) else: rpc.switchApplication(applicationIndex=2) + rpc.trigger() while rpc.getParameter("OperatingMode") != "0": time.sleep(1) self.assertEqual(int(rpc.getParameter("ActiveApplication")), 2) - time.sleep(5) # Switch back to initial application rpc.switchApplication(applicationIndex=initial_application) while rpc.getParameter("OperatingMode") != "0": @@ -181,19 +198,6 @@ def test_measure(self): self.assertIsInstance(result, dict) self.assertTrue(result) - def test_trigger(self): - with O2x5xxRPCDevice(deviceAddress) as rpc: - number_trigger = 10 - application_active = rpc.getParameter(value="ActiveApplication") - initial_application_stats = rpc.getApplicationStatisticData(applicationIndex=int(application_active)) - initial_number_of_frames = initial_application_stats['number_of_frames'] - for i in range(number_trigger): - answer = rpc.trigger() - self.assertTrue(answer) - application_stats = rpc.getApplicationStatisticData(applicationIndex=int(application_active)) - number_of_frames = application_stats['number_of_frames'] - self.assertEqual(number_of_frames, initial_number_of_frames + number_trigger) - def test_doPing(self): with O2x5xxRPCDevice(deviceAddress) as rpc: result = rpc.doPing() diff --git a/tests/test_session.py b/tests/test_session.py index c6b0344..814c990 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,5 +1,5 @@ from unittest import TestCase -from source import O2x5xxRPCDevice +from source import O2x5xxRPCDevice, O2x5xxPCICDevice from tests.utils import * from .config import * @@ -13,23 +13,25 @@ class TestRPC_SessionObject(TestCase): @classmethod def setUpClass(cls) -> None: with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - cls.config_backup = rpc.session.exportConfig() + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: cls.active_application_backup = rpc.getParameter("ActiveApplication") - cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] - cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] - _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) - rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, - applications=True) + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) @classmethod def tearDownClass(cls) -> None: - with O2x5xxRPCDevice(deviceAddress) as rpc: - with rpc.mainProxy.requestSession(): - rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, - applications=True) - if cls.active_application_backup != "0": - rpc.switchApplication(cls.active_application_backup) + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) def setUp(self) -> None: with O2x5xxRPCDevice(deviceAddress) as self.rpc: @@ -82,9 +84,10 @@ def test_resetStatistics(self): triggerNum = 20 self.rpc.switchApplication(applicationIndex=2) self.rpc.session.resetStatistics() - for i in range(triggerNum): - answer = self.rpc.trigger() - self.assertTrue(answer) + with O2x5xxPCICDevice(deviceAddress, pcicTcpPort) as pcic: + for i in range(triggerNum): + answer = pcic.execute_synchronous_trigger() + self.assertTrue(answer) result = self.rpc.getApplicationStatisticData(applicationIndex=2) self.assertEqual(result['number_of_frames'], triggerNum) self.rpc.session.resetStatistics()