Host test plugins: First draft with working pluging for mbed (serial sendBreak command for reset)

pull/597/head
Przemek Wirkus 2014-10-22 15:12:24 +01:00
parent 9ace6d40a4
commit fd2142f3bf
7 changed files with 320 additions and 69 deletions

View File

@ -23,9 +23,12 @@ except ImportError, e:
exit(-1)
import os
from optparse import OptionParser
from time import sleep, time
from sys import stdout
from time import sleep, time
from optparse import OptionParser
from subprocess import call
import host_tests_plugins
# This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do
@ -156,27 +159,6 @@ class Mbed:
result = None
return result
def safe_sendBreak(self, serial):
""" Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
Traceback (most recent call last):
File "make.py", line 189, in <module>
serial.sendBreak()
File "/usr/lib/python2.7/dist-packages/serial/serialposix.py", line 511, in sendBreak
termios.tcsendbreak(self.fd, int(duration/0.25))
error: (32, 'Broken pipe')
"""
result = True
try:
serial.sendBreak()
except:
# In linux a termios.error is raised in sendBreak and in setBreak.
# The following setBreak() is needed to release the reset signal on the target mcu.
try:
serial.setBreak(False)
except:
result = False
return result
def touch_file(self, path):
""" Touch file and set timestamp to items
"""
@ -190,56 +172,15 @@ class Mbed:
sleep(1)
def reset(self):
""" Reset function.
Supports:
- 'standard' send break command via Mbed's CDC,
- also handles other reset modes:
- E.g. reset by touching file with specific file name:
reboot.txt - startup from standby state, reboots when in run mode.
shutdown.txt - shutdown from run mode
reset.txt - reset FPGA during run mode
- eACommander for reset of SiLabs Gecko baords.
""" Calls proper reset plugin to do the job.
Please refer to host_test_plugins functionality
"""
if self.options.forced_reset_type:
if self.options.forced_reset_type == 'eACommander':
# For this copy method 'disk' will be 'serialno' for eACommander command line parameters
# Note: Commands are executed in the order they are specified on the command line
cmd = [EACOMMANDER_CMD,
'--serialno', self.disk.rstrip('/\\'),
'--resettype', '2', '--reset',]
try:
self.flush()
ret = call(cmd, shell=True)
if ret:
resutl_msg = "Return code: %d. Command: "% ret + " ".join(cmd)
result = False
except Exception, e:
resutl_msg = e
result = False
elif self.options.forced_reset_type == 'eACommander-usb':
# For this copy method 'disk' will be 'usb address' for eACommander command line parameters
# Note: Commands are executed in the order they are specified on the command line
cmd = [EACOMMANDER_CMD,
'--usb', self.disk.rstrip('/\\'),
'--resettype', '2', '--reset',]
try:
self.flush()
ret = call(cmd, shell=True)
if ret:
resutl_msg = "Return code: %d. Command: "% ret + " ".join(cmd)
result = False
except Exception, e:
resutl_msg = e
result = False
elif self.options.forced_reset_type.endswith('.txt'):
reset_file_path = os.path.join(self.disk, self.options.forced_reset_type.lower())
self.touch_file(reset_file_path)
self.flush()
host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
else:
self.safe_sendBreak(self.serial) # Instead of serial.sendBreak()
self.flush()
host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
# Flush serials to get only input after reset
#self.flush()
self.flush()
# Give time to wait for the image loading
reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
self.reset_timeout(reset_tout_s)

View File

@ -0,0 +1,38 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import host_test_registry
import module_reset_mbed
import module_reset_mps2
import module_reset_silabs
# Plugin registry instance
HOST_TEST_PLUGIN_REGISTRY = host_test_registry.HostTestRegistry()
# Static plugin registration
# TODO: extend to files with name module_*.py loaded ad-hoc
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mbed.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mps2.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_silabs.load_plugin())
def call_plugin(type, capability, *args, **kwargs):
""" Interface to call plugin registry functional way
"""
HOST_TEST_PLUGIN_REGISTRY.call_plugin(type, capability, *args, **kwargs)
#print HOST_TEST_PLUGIN_REGISTRY.get_string()

View File

@ -0,0 +1,24 @@
# Interfaces and utils for host test plugin architecture
def construct_enum(**enums):
""" Create your own pseudo-enums
"""
return type('Enum', (), enums)
class HostTestPluginBase:
""" Base class for all plug-ins used with host tests.
"""
def register(self, *args, **kwargs):
pass
def unregister(self):
pass
# Attributes defining plugin name, type etc.
# This values across plugin should be unique
name = "HostTestPluginBase"
type = "BasePlugin"
capabilities = []

View File

@ -0,0 +1,65 @@
# Registry storing plugins
"""
module_example.py:
def plugin_main(*args, **kwargs):
print args, kwargs
loader.py:
def load_plugin(name):
mod = __import__("module_%s" % name)
return mod
def call_plugin(name, *args, **kwargs):
plugin = load_plugin(name)
plugin.plugin_main(*args, **kwargs)
call_plugin("example", 1234)
"""
class HostTestRegistry:
""" Simple class used to register and store
host test plugins for further usage
"""
# Here we actually store all the plugins
PLUGINS = {}
def print_error(self, text):
print "Plugin load failed. Reason: %s"% text
def register_plugin(self, plugin):
# TODO:
# - check for unique caps for specified type
if plugin.name not in self.PLUGINS:
plugin.setup() # Setup plugin
self.PLUGINS[plugin.name] = plugin
else:
self.print_error("%s already loaded"% plugin.name)
def call_plugin(self, type, capability, *args, **kwargs):
""" Execute plugin functionality respectively to its purpose
"""
for plugin_name in self.PLUGINS:
plugin = self.PLUGINS[plugin_name]
if plugin.type == type and capability in plugin.capabilities:
return plugin.execute(capability, *args, **kwargs)
return False
def get_string(self):
""" User friendly printing method to show hooked plugins
"""
from prettytable import PrettyTable
column_names = ['name', 'type', 'capabilities']
pt = PrettyTable(column_names)
for column in column_names:
pt.align[column] = 'l'
for plugin_name in self.PLUGINS:
name = self.PLUGINS[plugin_name].name
type = self.PLUGINS[plugin_name].type
capabilities = ', '.join(self.PLUGINS[plugin_name].capabilities)
row = [name, type, capabilities]
pt.add_row(row)
return pt.get_string()

View File

@ -0,0 +1,60 @@
# Interfaces and utils for host test plugin architecture
from subprocess import call
from host_test_plugins import HostTestPluginBase
class HostTestPluginResetMethod_Mbed(HostTestPluginBase):
def safe_sendBreak(self, serial):
""" Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
Traceback (most recent call last):
File "make.py", line 189, in <module>
serial.sendBreak()
File "/usr/lib/python2.7/dist-packages/serial/serialposix.py", line 511, in sendBreak
termios.tcsendbreak(self.fd, int(duration/0.25))
error: (32, 'Broken pipe')
"""
result = True
try:
serial.sendBreak()
except:
# In linux a termios.error is raised in sendBreak and in setBreak.
# The following setBreak() is needed to release the reset signal on the target mcu.
try:
serial.setBreak(False)
except:
result = False
return result
# Plugin interface
name = 'HostTestPluginResetMethod_Mbed'
type = 'ResetMethod'
capabilities = ['default']
required_parameters = ['serial']
def setup(self, *args, **kwargs):
pass
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
for parameter in self.required_parameters:
if parameter not in kwargs:
print "Plugin parameter '%s' missing"% parameter
return False
if capabilitity == 'default':
serial = kwargs['serial']
self.safe_sendBreak(serial)
else:
return False
return True
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginResetMethod_Mbed()

View File

@ -0,0 +1,59 @@
# Interfaces and utils for host test plugin architecture
import os
from host_test_plugins import HostTestPluginBase
class HostTestPluginResetMethod_MPS2(HostTestPluginBase):
"""
Supports:
reboot.txt - startup from standby state, reboots when in run mode.
shutdown.txt - shutdown from run mode.
reset.txt - reset FPGA during run mode.
"""
def touch_file(self, path):
""" Touch file and set timestamp to items
"""
with open(path, 'a'):
os.utime(path, None)
# Plugin interface
name = 'HostTestPluginResetMethod_MPS2'
type = 'ResetMethod'
capabilities = ['reboot.txt', 'shutdown.txt', 'reset.txt']
required_parameters = ['disk']
def setup(self, *args, **kwargs):
""" Prepare / configure plugin to work.
This method can receive plugin specific parameters by kwargs and
ignore other parameters which may affect other plugins.
"""
pass
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
for parameter in self.required_parameters:
if parameter not in kwargs:
print "%s. Plugin parameter '%s' missing"% (self.name, parameter)
return False
if capabilitity == 'reboot.txt':
pass
elif capabilitity == 'shutdown.txt':
pass
elif capabilitity == 'reset.txt':
pass
else:
return False
return True
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginResetMethod_MPS2()

View File

@ -0,0 +1,64 @@
# Interfaces and utils for host test plugin architecture
from subprocess import call
from host_test_plugins import HostTestPluginBase
class HostTestPluginResetMethod_SiLabs(HostTestPluginBase):
# Plugin interface
name = 'HostTestPluginResetMethod_SiLabs'
type = 'ResetMethod'
capabilities = ['eACommander', 'eACommander-usb']
required_parameters = ['disk']
def setup(self, *args, **kwargs):
self.EACOMMANDER_CMD = 'c:/SiliconLabs/SimplicityStudio/v2/commander/eACommander.exe'
pass
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
for parameter in self.required_parameters:
if parameter not in kwargs:
print "Plugin parameter '%s' missing"% parameter
return False
disk = kwargs['disk'].rstrip('/\\')
if capabilitity == 'eACommander':
# For this copy method 'disk' will be 'serialno' for eACommander command line parameters
# Note: Commands are executed in the order they are specified on the command line
cmd = [self.EACOMMANDER_CMD,
'--serialno', disk,
'--resettype', '2', '--reset',]
try:
ret = call(cmd, shell=True)
if ret:
print "Return code: %d. Command: "% ret + " ".join(cmd)
except Exception, e:
print str(e)
elif capabilitity == 'eACommander-usb':
# For this copy method 'disk' will be 'usb address' for eACommander command line parameters
# Note: Commands are executed in the order they are specified on the command line
cmd = [self.EACOMMANDER_CMD,
'--usb', disk,
'--resettype', '2', '--reset',]
try:
ret = call(cmd, shell=True)
if ret:
print "Return code: %d. Command: "% ret + " ".join(cmd)
except Exception, e:
print(e)
else:
return False
return True
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginResetMethod_SiLabs()