Skip to content

Commit

Permalink
atremote3
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-usielski committed Oct 31, 2024
1 parent a835911 commit 5302a16
Show file tree
Hide file tree
Showing 4 changed files with 354 additions and 5 deletions.
2 changes: 1 addition & 1 deletion moler/device/atremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def _prepare_state_hops_with_proxy_pc(self):
AtRemote.proxy_pc: {
AtRemote.unix_remote_root: AtRemote.unix_remote,
AtRemote.at_remote: AtRemote.unix_remote,
AtRemote.not_connected: AtRemote.unix_remote,
AtRemote.not_connected: AtRemote.unix_local,
AtRemote.unix_local_root: AtRemote.unix_local,
}
}
Expand Down
282 changes: 282 additions & 0 deletions moler/device/atremote3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
# -*- coding: utf-8 -*-
"""
Moler's device has 2 main responsibilities:
- be the factory that returns commands of that device
- be the state machine that controls which commands may run in given state
"""

__author__ = 'Grzegorz Latuszek, Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2024, Nokia'
__email__ = '[email protected], [email protected]'

from moler.device.unixremote3 import UnixRemote3
from moler.helpers import call_base_class_method_with_same_name, mark_to_call_base_class_method_with_same_name
from moler.cmd.at.genericat import GenericAtCommand


@call_base_class_method_with_same_name
class AtRemote3(UnixRemote3):
r"""
AtRemote device class.
::
Example of device in yaml configuration file:
-without PROXY_PC:
AT_1:
DEVICE_CLASS: moler.device.atremote3.AtRemote3
CONNECTION_HOPS:
UNIX_LOCAL:
UNIX_REMOTE:
execute_command: ssh # default value
command_params:
expected_prompt: unix_remote_prompt
host: host_ip
login: login
password: password
UNIX_REMOTE:
AT_REMOTE:
execute_command: plink_serial # default value
command_params:
serial_devname: 'COM5'
AT_REMOTE:
UNIX_REMOTE:
execute_command: ctrl_c # default value
"""

at_remote = "AT_REMOTE"

def __init__(self, sm_params, name=None, io_connection=None, io_type=None, variant=None, io_constructor_kwargs=None,
initial_state=None, lazy_cmds_events=False):
"""
Create AT device communicating over io_connection
:param sm_params: dict with parameters of state machine for device
:param name: name of device
:param io_connection: External-IO connection having embedded moler-connection
:param io_type: type of connection - tcp, udp, ssh, telnet, ..
:param variant: connection implementation variant, ex. 'threaded', 'twisted', 'asyncio', ..
(if not given then default one is taken)
:param io_constructor_kwargs: additional parameter into constructor of selected connection type
(if not given then default one is taken)
:param initial_state: name of initial state. State machine tries to enter this state just after creation.
:param lazy_cmds_events: set False to load all commands and events when device is initialized, set True to load
commands and events when they are required for the first time.
"""
initial_state = initial_state if initial_state is not None else AtRemote3.at_remote
super(AtRemote3, self).__init__(name=name, io_connection=io_connection,
io_type=io_type, variant=variant,
io_constructor_kwargs=io_constructor_kwargs,
sm_params=sm_params, initial_state=initial_state,
lazy_cmds_events=lazy_cmds_events)

def _overwrite_prompts(self):
super()._overwrite_prompts()
# copy prompt for AT_REMOTE/ctrl_c from UNIX_REMOTE_ROOT/exit
hops_config = self._configurations[AtRemote3.connection_hops]
remote_ux_root_exit_params = hops_config[AtRemote3.unix_remote_root][AtRemote3.unix_remote]["command_params"]
remote_ux_prompt = remote_ux_root_exit_params["expected_prompt"]
hops_config[AtRemote3.at_remote][AtRemote3.unix_remote]["command_params"]["expected_prompt"] = remote_ux_prompt

@mark_to_call_base_class_method_with_same_name
def _get_default_sm_configuration_with_proxy_pc(self):
"""
Return State Machine default configuration without proxy_pc state.
:return: default sm configuration without proxy_pc state.
"""
config = {
AtRemote3.connection_hops: {
AtRemote3.unix_remote: { # from
AtRemote3.at_remote: { # to
"execute_command": "plink_serial",
"command_params": { # with parameters
"target_newline": "\r\n"
},
"required_command_params": [
"serial_devname"
]
},
},
AtRemote3.at_remote: { # from
AtRemote3.unix_remote: { # to
"execute_command": "ctrl_c", # using command
"command_params": { # with parameters
"expected_prompt": 'remote_prompt', # overwritten in _configure_state_machine
},
"required_command_params": [
]
},
},
}
}
return config

@mark_to_call_base_class_method_with_same_name
def _prepare_transitions_with_proxy_pc(self):
"""
Prepare transitions to change states without proxy_pc state.
:return: transitions without proxy_pc state.
"""
transitions = {
AtRemote3.unix_remote: {
AtRemote3.at_remote: {
"action": [
"_execute_command_to_change_state"
],
}
},
AtRemote3.at_remote: {
AtRemote3.unix_remote: {
"action": [
"_execute_command_to_change_state"
],
}
},
}
return transitions

@mark_to_call_base_class_method_with_same_name
def _prepare_state_prompts_without_proxy_pc(self):
"""
Prepare textual prompt for each state for State Machine without proxy_pc state.
:return: textual prompt for each state without proxy_pc state.
"""
hops_config = self._configurations[AtRemote3.connection_hops]
serial_devname = hops_config[AtRemote3.unix_remote][AtRemote3.at_remote]["command_params"]["serial_devname"]
proxy_prompt = f"{serial_devname}> port READY"
at_cmds_prompt = GenericAtCommand._re_default_at_prompt.pattern # pylint: disable=protected-access
state_prompts = {
AtRemote3.at_remote: f"{proxy_prompt}|{at_cmds_prompt}"
}
return state_prompts

@mark_to_call_base_class_method_with_same_name
def _prepare_state_prompts_with_proxy_pc(self):
"""
Prepare textual prompt for each state for State Machine without proxy_pc state.
:return: textual prompt for each state without proxy_pc state.
"""
hops_config = self._configurations[AtRemote3.connection_hops]
serial_devname = hops_config[AtRemote3.unix_remote][AtRemote3.at_remote]["command_params"]["serial_devname"]
proxy_prompt = f"{serial_devname}> port READY"
at_cmds_prompt = GenericAtCommand._re_default_at_prompt.pattern # pylint: disable=protected-access
state_prompts = {
AtRemote3.at_remote: f"{proxy_prompt}|{at_cmds_prompt}"
}
return state_prompts

@mark_to_call_base_class_method_with_same_name
def _prepare_newline_chars_without_proxy_pc(self):
"""
Prepare newline char for each state for State Machine without proxy_pc state.
:return: newline char for each state without proxy_pc state.
"""
hops_config = self._configurations[AtRemote3.connection_hops]
hops_2_at_remote_config = hops_config[AtRemote3.unix_remote][AtRemote3.at_remote]
newline_chars = {
AtRemote3.at_remote: hops_2_at_remote_config["command_params"]["target_newline"],
}
return newline_chars

@mark_to_call_base_class_method_with_same_name
def _prepare_newline_chars_with_proxy_pc(self):
"""
Prepare newline char for each state for State Machine without proxy_pc state.
:return: newline char for each state without proxy_pc state.
"""
hops_config = self._configurations[AtRemote3.connection_hops]
hops_2_at_remote_config = hops_config[AtRemote3.unix_remote][AtRemote3.at_remote]
newline_chars = {
AtRemote3.at_remote: hops_2_at_remote_config["command_params"]["target_newline"],
}
return newline_chars

@mark_to_call_base_class_method_with_same_name
def _prepare_state_hops_with_proxy_pc(self):
"""
Prepare non direct transitions for each state for State Machine without proxy_pc state.
:return: non direct transitions for each state without proxy_pc state.
"""
state_hops = {
AtRemote3.not_connected: {
AtRemote3.unix_local_root: AtRemote3.unix_local,
AtRemote3.proxy_pc: AtRemote3.unix_local,
AtRemote3.unix_remote: AtRemote3.unix_local,
AtRemote3.unix_remote_root: AtRemote3.unix_local,
AtRemote3.at_remote: AtRemote3.unix_local,
},
AtRemote3.unix_local: {
AtRemote3.unix_remote_root: AtRemote3.proxy_pc,
AtRemote3.at_remote: AtRemote3.proxy_pc,
},
AtRemote3.unix_local_root: {
AtRemote3.not_connected: AtRemote3.unix_local,
AtRemote3.proxy_pc: AtRemote3.unix_local,
AtRemote3.unix_remote: AtRemote3.unix_local,
AtRemote3.unix_remote_root: AtRemote3.unix_local,
AtRemote3.at_remote: AtRemote3.unix_local,
},
AtRemote3.unix_remote: {
AtRemote3.unix_local: AtRemote3.proxy_pc,
AtRemote3.not_connected: AtRemote3.proxy_pc,
AtRemote3.unix_local_root: AtRemote3.proxy_pc,
},
AtRemote3.unix_remote_root: {
AtRemote3.not_connected: AtRemote3.unix_remote,
AtRemote3.proxy_pc: AtRemote3.unix_remote,
AtRemote3.unix_local: AtRemote3.unix_remote,
AtRemote3.unix_local_root: AtRemote3.unix_remote,
AtRemote3.at_remote: AtRemote3.unix_remote,
},
AtRemote3.at_remote: {
AtRemote3.not_connected: AtRemote3.unix_remote,
AtRemote3.proxy_pc: AtRemote3.unix_remote,
AtRemote3.unix_local: AtRemote3.unix_remote,
AtRemote3.unix_local_root: AtRemote3.unix_remote,
AtRemote3.unix_remote_root: AtRemote3.unix_remote,
},
AtRemote3.proxy_pc: {
AtRemote3.unix_remote_root: AtRemote3.unix_remote,
AtRemote3.at_remote: AtRemote3.unix_remote,
AtRemote3.not_connected: AtRemote3.unix_local,
AtRemote3.unix_local_root: AtRemote3.unix_local,
}
}
return state_hops

def _configure_state_machine(self, sm_params):
"""
Configure device State Machine.
:param sm_params: dict with parameters of state machine for device.
:return: None.
"""
super(AtRemote3, self)._configure_state_machine(sm_params)

# copy prompt for AT_REMOTE/ctrl_c from UNIX_REMOTE_ROOT/exit
hops_config = self._configurations[AtRemote3.connection_hops]
remote_ux_root_exit_params = hops_config[AtRemote3.unix_remote_root][AtRemote3.unix_remote]["command_params"]
remote_ux_prompt = remote_ux_root_exit_params["expected_prompt"]
hops_config[AtRemote3.at_remote][AtRemote3.unix_remote]["command_params"]["expected_prompt"] = remote_ux_prompt

def _get_packages_for_state(self, state, observer):
"""
Get available packages containing cmds and events for each state.
:param state: device state.
:param observer: observer type, available: cmd, events
:return: available cmds or events for specific device state.
"""
available = super(AtRemote3, self)._get_packages_for_state(state, observer)

if not available:
if state == AtRemote3.at_remote:
available = {AtRemote3.cmds: ['moler.cmd.at', 'moler.cmd.unix.ctrl_c'],
AtRemote3.events: ['moler.events.shared']}
if available:
return available[observer]
elif state == AtRemote3.unix_remote: # this is unix extended with plink_serial command
if observer == AtRemote3.cmds:
available.append('moler.cmd.at.plink_serial')
available.append('moler.cmd.at.cu')

return available
28 changes: 24 additions & 4 deletions test/device/test_SM_at_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,41 @@

from moler.util.devices_SM import iterate_over_device_states, get_device

at_remotes = ["AT_REMOTE", "AT_REMOTE3"]
at_remotes_proxy_pc = ["AT_REMOTE_PROXY_PC", "AT_REMOTE_PROXY_PC3"]

def test_at_remote_device(device_connection, at_remote_output):
at_remote = get_device(name="AT_REMOTE", connection=device_connection, device_output=at_remote_output,

@pytest.mark.parametrize("device_name", at_remotes)
def test_at_remote_device(device_name, device_connection, at_remote_output):
at_remote = get_device(name=device_name, connection=device_connection, device_output=at_remote_output,
test_file_path=__file__)

iterate_over_device_states(device=at_remote)


def test_at_remote_device_proxy_pc(device_connection, at_remote_output_proxy_pc):
at_remote = get_device(name="AT_REMOTE_PROXY_PC", connection=device_connection, device_output=at_remote_output_proxy_pc,
@pytest.mark.parametrize("device_name", at_remotes_proxy_pc)
def test_at_remote_device_proxy_pc(device_name, device_connection, at_remote_output_proxy_pc):
at_remote = get_device(name=device_name, connection=device_connection, device_output=at_remote_output_proxy_pc,
test_file_path=__file__)

iterate_over_device_states(device=at_remote)


@pytest.mark.parametrize("devices", [at_remotes_proxy_pc, at_remotes])
def test_unix_sm_identity(devices, device_connection, at_remote_output):

dev0 = get_device(name=devices[0], connection=device_connection, device_output=at_remote_output,
test_file_path=__file__)
dev1 = get_device(name=devices[1], connection=device_connection, device_output=at_remote_output,
test_file_path=__file__)

assert dev0._state_hops == dev1._state_hops
assert dev0._configurations == dev1._configurations

assert dev0._stored_transitions == dev1._stored_transitions
assert dev0._state_prompts == dev1._state_prompts
assert dev0._newline_chars == dev1._newline_chars

@pytest.fixture
def at_remote_output():
plink_cmd_string = 'plink -serial COM5 |& awk \'BEGIN {print "COM5> port READY"} {print} END {print "^C"}\''
Expand Down
47 changes: 47 additions & 0 deletions test/resources/device_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,53 @@ DEVICES:
command_params:
serial_devname: 'COM5'

AT_REMOTE3:
DEVICE_CLASS: moler.device.atremote3.AtRemote3
INITIAL_STATE: UNIX_LOCAL
CONNECTION_HOPS:
UNIX_LOCAL:
UNIX_REMOTE:
execute_command: ssh # default value
command_params:
expected_prompt: 'remote#'
host: remote_host
login: remote_login
password: passwd4remote
set_timeout: null
UNIX_REMOTE:
AT_REMOTE:
execute_command: plink_serial # default value
command_params:
serial_devname: 'COM5'

AT_REMOTE_PROXY_PC3:
DEVICE_CLASS: moler.device.atremote3.AtRemote3
INITIAL_STATE: UNIX_LOCAL
CONNECTION_HOPS:
UNIX_LOCAL:
PROXY_PC:
execute_command: ssh
command_params:
expected_prompt: "proxy_pc#"
host: proxy_pc_host
login: proxy_pc_login
password: password
set_timeout: null
PROXY_PC:
UNIX_REMOTE:
execute_command: ssh # default value
command_params:
expected_prompt: 'remote#'
host: remote_host
login: remote_login
password: passwd4remote
set_timeout: null
UNIX_REMOTE:
AT_REMOTE:
execute_command: plink_serial # default value
command_params:
serial_devname: 'COM5'

ADB_REMOTE:
DEVICE_CLASS: moler.device.adbremote.AdbRemote
INITIAL_STATE: UNIX_LOCAL
Expand Down

0 comments on commit 5302a16

Please sign in to comment.