Skip to content

Commit

Permalink
Merge pull request #77 from mazimkhan/master
Browse files Browse the repository at this point in the history
Hardware reset feature
  • Loading branch information
PrzemekWirkus committed May 9, 2016
2 parents ec5e92a + de63740 commit dfdc23f
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 26 deletions.
4 changes: 2 additions & 2 deletions mbed_host_tests/host_tests/base_host_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ def notify_complete(self, result=None):
if self.__event_queue:
self.__event_queue.put(('__notify_complete', result, time()))

def reset_dut(self):
def reset_dut(self, value):
"""
Reset device under test
:return:
"""
if self.__event_queue:
self.__event_queue.put(('__reset_dut', True, time()))
self.__event_queue.put(('__reset_dut', value, time()))

def notify_conn_lost(self, text):
"""! Notify main even loop that there was a DUT-host test connection error
Expand Down
12 changes: 6 additions & 6 deletions mbed_host_tests/host_tests_conn_proxy/conn_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, port, baudrate, prn_lock, config):
self.target_id = self.config.get('target_id', None)

# Values used to call serial port listener...
self.logger.prn_inf("serial(port=%s, baudrate=%d)"% (self.port, self.baudrate))
self.logger.prn_inf("serial(port=%s, baudrate=%d, timeout=%s)"% (self.port, self.baudrate, self.timeout))

# Check if serial port for given target_id changed
# If it does we will use new port to open connections and make sure reset plugin
Expand Down Expand Up @@ -191,11 +191,11 @@ def conn_process(event_queue, dut_event_queue, prn_lock, config):
break

# Send data to DUT
if not dut_event_queue.empty():
try:
(key, value, _) = dut_event_queue.get(timeout=1)
except QueueEmpty:
continue
try:
(key, value, _) = dut_event_queue.get(timeout=0)
except QueueEmpty:
pass # Check if target sent something
else:
# Return if state machine in host_test_default has finished to end process
if key == '__host_test_finished' and value == True:
connector.finish()
Expand Down
2 changes: 2 additions & 0 deletions mbed_host_tests/host_tests_plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import module_copy_shell
import module_copy_mbed
import module_reset_mbed
import module_power_cycle_mbed

# Additional, non standard platforms
import module_copy_silabs
Expand All @@ -56,6 +57,7 @@
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_silabs.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_stlink.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_stlink.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_power_cycle_mbed.load_plugin())
#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_jn51xx.load_plugin())
#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_jn51xx.load_plugin())

Expand Down
180 changes: 180 additions & 0 deletions mbed_host_tests/host_tests_plugins/module_power_cycle_mbed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""
mbed SDK
Copyright (c) 2011-2015 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Przemyslaw Wirkus <[email protected]>
"""

import os
import json
import time
import requests
from host_test_plugins import HostTestPluginBase


class HostTestPluginPowerCycleResetMethod(HostTestPluginBase):

# Plugin interface
name = 'HostTestPluginPowerCycleResetMethod'
type = 'ResetMethod'
stable = True
capabilities = ['power_cycle']
required_parameters = ['target_id', 'device_info']

def setup(self, *args, **kwargs):
"""! Configure plugin, this function should be called before plugin execute() method is used.
"""
return True

def execute(self, capability, *args, **kwargs):
"""! Executes capability by name
@param capability Capability name
@param args Additional arguments
@param kwargs Additional arguments
@details Each capability e.g. may directly just call some command line program or execute building pythonic function
@return Capability call return value
"""
if 'target_id' not in kwargs or not kwargs['target_id']:
self.print_plugin_error("Error: This plugin requires mbed target_id")
return False

if 'device_info' not in kwargs or type(kwargs['device_info']) is not dict:
self.print_plugin_error("Error: This plugin requires dict parameter 'device_info' passed by the caller.")
return False

result = False
if self.check_parameters(capability, *args, **kwargs) is True:
if capability in HostTestPluginPowerCycleResetMethod.capabilities:
target_id = kwargs['target_id']
device_info = kwargs['device_info']
ret = self.__get_mbed_tas_rm_addr()
if ret:
ip, port = ret
result = self.__hw_reset(ip, port, target_id, device_info)
return result

def __get_mbed_tas_rm_addr(self):
"""
Get IP and Port of mbed tas rm service.
:return:
"""
try:
ip = os.environ['MBED_TAS_RM_IP']
port = os.environ['MBED_TAS_RM_PORT']
return ip, port
except KeyError, e:
self.print_plugin_error("HOST: Failed to read environment variable (" + str(e) + "). Can't perform hardware reset.")

return None

def __hw_reset(self, ip, port, target_id, device_info):
"""
Reset target device using TAS RM API
:param ip:
:param port:
:param target_id:
:param device_info:
:return:
"""

switch_off_req = {
"name": "switchResource",
"sub_requests": [
{
"resource_type": "mbed_platform",
"resource_id": target_id,
"switch_command": "OFF"
}
]
}


switch_on_req = {
"name": "switchResource",
"sub_requests": [
{
"resource_type": "mbed_platform",
"resource_id": target_id,
"switch_command": "ON"
}
]
}

result = False

# reset target
switch_off_req = self.__run_request(ip, port, switch_off_req)
if switch_off_req is None:
self.print_plugin_error("HOST: Failed to communicate with TAS RM!")
return result

if "error" in switch_off_req['sub_requests'][0]:
self.print_plugin_error("HOST: Failed to reset target. error = %s" % switch_off_req['sub_requests'][0]['error'])
return result

def poll_state(required_state):
switch_state_req = {
"name": "switchResource",
"sub_requests": [
{
"resource_type": "mbed_platform",
"resource_id": target_id,
"switch_command": "STATE"
}
]
}
resp = self.__run_request(ip, port, switch_state_req)
start = time.time()
while resp and (resp['sub_requests'][0]['state'] != required_state or (required_state == 'ON' and
resp['sub_requests'][0]["mount_point"] == "Not Connected")) and (time.time() - start) < 300:
time.sleep(2)
resp = self.__run_request(ip, port, resp)
return resp

poll_state("OFF")

self.__run_request(ip, port, switch_on_req)
resp = poll_state("ON")
if resp and resp['sub_requests'][0]['state'] == 'ON' and resp['sub_requests'][0]["mount_point"] != "Not Connected":
for k, v in resp['sub_requests'][0].iteritems():
device_info[k] = v
result = True
else:
self.print_plugin_error("HOST: Failed to reset device %s" % target_id)

return result

@staticmethod
def __run_request(ip, port, request):
"""
:param request:
:return:
"""
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
get_resp = requests.get("http://%s:%s/" % (ip, port), data=json.dumps(request), headers=headers)
resp = get_resp.json()
if get_resp.status_code == 200:
return resp
else:
return None


def load_plugin():
"""! Returns plugin available in this module
"""
return HostTestPluginPowerCycleResetMethod()
20 changes: 18 additions & 2 deletions mbed_host_tests/host_tests_runner/host_test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

class DefaultTestSelector(DefaultTestSelectorBase):
"""! Select default host_test supervision (replaced after auto detection) """
RESET_TYPE_SW_RST = "software_reset"
RESET_TYPE_HW_RST = "hardware_reset"

def __init__(self, options):
"""! ctor
Expand Down Expand Up @@ -191,10 +193,24 @@ def start_conn_process():
result = value
break
elif key == '__reset_dut':
# Disconnecting and re-connecting comm process will reset DUT
# Disconnect to avoid connection lost event
dut_event_queue.put(('__host_test_finished', True, time()))
p.join()
# self.mbed.update_device_info() - This call is commented but left as it would be required in hard reset.

if value == DefaultTestSelector.RESET_TYPE_SW_RST:
self.logger.prn_inf("Performing software reset.")
# Just disconnecting and re-connecting comm process will soft reset DUT
elif value == DefaultTestSelector.RESET_TYPE_HW_RST:
self.logger.prn_inf("Performing hard reset.")
# request hardware reset
self.mbed.hw_reset()
else:
self.logger.prn_err("Invalid reset type (%s). Supported types [%s]." %
(value, ", ".join([DefaultTestSelector.RESET_TYPE_HW_RST,
DefaultTestSelector.RESET_TYPE_SW_RST])))
self.logger.prn_inf("Software reset will be performed.")

# connect to the device
p = start_conn_process()
elif key == '__notify_conn_lost':
# This event is sent by conn_process, DUT connection was lost
Expand Down
28 changes: 12 additions & 16 deletions mbed_host_tests/host_tests_runner/mbed_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
"""

import json
import time
import mbed_lstools
from time import sleep

import mbed_host_tests.host_tests_plugins as ht_plugins


Expand Down Expand Up @@ -117,20 +114,19 @@ def copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None
target_id=self.target_id)
return result

def update_device_info(self):
def hw_reset(self):
"""
Updates device's port and disk using mbedls. Typically used after reset.
Performs hardware reset of target ned device.
:return:
"""
for i in range(3):
mbed_list = mbed_lstools.create().list_mbeds_ext()
if mbed_list:
for mut in mbed_list:
if mut['target_id'] == self.target_id:
self.port = mut['serial_port']
self.disk = mut['mount_point']
return True
print "HOST: Failed to find target after reset. Retrying (%d)" % i
time.sleep(1)
return False
device_info = {}
result = ht_plugins.call_plugin('ResetMethod',
'power_cycle',
target_id=self.target_id,
device_info=device_info)
if result:
self.port = device_info['serial_port']
self.disk = device_info['mount_point']
return result

1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ def read(fname):
},
install_requires=["PySerial>=3.0",
"PrettyTable>=0.7.2",
"requests",
"mbed-ls"])

0 comments on commit dfdc23f

Please sign in to comment.