Remove host_tests from mbed-os/tools

host tests (mbedhtrun) is maintained as part of mbed-os-tools but there
is a redundant duplicate of host_tests under mbed-os/tools/ directory
this PR changes to remove those duplicates and update test_api py modules
to use host tests from mbed-host-tests
pull/14932/head
Rajkumar Kanagaraj 2021-07-19 04:41:22 -07:00
parent d147abc3e5
commit e75b67fada
52 changed files with 2 additions and 3717 deletions

View File

@ -1,68 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from .host_registry import HostRegistry
# Host test supervisors
from .echo import EchoTest
from .rtc_auto import RTCTest
from .stdio_auto import StdioTest
from .hello_auto import HelloTest
from .detect_auto import DetectPlatformTest
from .default_auto import DefaultAuto
from .dev_null_auto import DevNullTest
from .wait_us_auto import WaitusTest
from .tcpecho_server_auto import TCPEchoServerTest
from .udpecho_server_auto import UDPEchoServerTest
from .tcpecho_client_auto import TCPEchoClientTest
from .udpecho_client_auto import UDPEchoClientTest
from .wfi_auto import WFITest
from .serial_nc_rx_auto import SerialNCRXTest
from .serial_nc_tx_auto import SerialNCTXTest
from .serial_complete_auto import SerialCompleteTest
# Populate registry with supervising objects
HOSTREGISTRY = HostRegistry()
HOSTREGISTRY.register_host_test("echo", EchoTest())
HOSTREGISTRY.register_host_test("default", DefaultAuto())
HOSTREGISTRY.register_host_test("rtc_auto", RTCTest())
HOSTREGISTRY.register_host_test("hello_auto", HelloTest())
HOSTREGISTRY.register_host_test("stdio_auto", StdioTest())
HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest())
HOSTREGISTRY.register_host_test("default_auto", DefaultAuto())
HOSTREGISTRY.register_host_test("wait_us_auto", WaitusTest())
HOSTREGISTRY.register_host_test("dev_null_auto", DevNullTest())
HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest())
HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest())
HOSTREGISTRY.register_host_test("tcpecho_client_auto", TCPEchoClientTest())
HOSTREGISTRY.register_host_test("udpecho_client_auto", UDPEchoClientTest())
HOSTREGISTRY.register_host_test("wfi_auto", WFITest())
HOSTREGISTRY.register_host_test("serial_nc_rx_auto", SerialNCRXTest())
HOSTREGISTRY.register_host_test("serial_nc_tx_auto", SerialNCTXTest())
HOSTREGISTRY.register_host_test("serial_complete_auto", SerialCompleteTest())
###############################################################################
# Functional interface for test supervisor registry
###############################################################################
def get_host_test(ht_name):
return HOSTREGISTRY.get_host_test(ht_name)
def is_host_test(ht_name):
return HOSTREGISTRY.is_host_test(ht_name)

View File

@ -1,38 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from sys import stdout
class DefaultAuto(object):
""" Simple, basic host test's test runner waiting for serial port
output from MUT, no supervision over test running in MUT is executed.
"""
def test(self, selftest):
result = selftest.RESULT_SUCCESS
try:
while True:
c = selftest.mbed.serial_read(512)
if c is None:
return selftest.RESULT_IO_SERIAL
stdout.write(c)
stdout.flush()
except KeyboardInterrupt:
selftest.notify("\r\n[CTRL+C] exit")
result = selftest.RESULT_ERROR
return result

View File

@ -1,56 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
class DetectPlatformTest(object):
PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
def test(self, selftest):
result = True
c = selftest.mbed.serial_readline() # {{start}} preamble
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
selftest.notify("HOST: Detecting target name...")
c = selftest.mbed.serial_readline()
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
# Check for target name
m = self.re_detect_micro_name.search(c)
if m and len(m.groups()):
micro_name = m.groups()[0]
micro_cmp = selftest.mbed.options.micro == micro_name
result = result and micro_cmp
selftest.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name,
selftest.mbed.options.micro,
"OK" if micro_cmp else "FAIL"))
for i in range(0, 2):
c = selftest.mbed.serial_readline()
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -1,51 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class DevNullTest(object):
def check_readline(self, selftest, text):
""" Reads line from serial port and checks if text was part of read string
"""
result = False
c = selftest.mbed.serial_readline()
if c and text in c:
result = True
return result
def test(self, selftest):
result = True
# Test should print some text and later stop printing
# 'MBED: re-routing stdout to /null'
res = self.check_readline(selftest, "re-routing stdout to /null")
if not res:
# We haven't read preamble line
result = False
else:
# Check if there are printed characters
str = ''
for i in range(3):
c = selftest.mbed.serial_read(32)
if c is None:
return selftest.RESULT_IO_SERIAL
else:
str += c
if len(str) > 0:
result = False
break
selftest.notify("Received %d bytes: %s"% (len(str), str))
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -1,60 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import uuid
from sys import stdout
class EchoTest(object):
# Test parameters
TEST_SERIAL_BAUDRATE = 115200
TEST_LOOP_COUNT = 50
def test(self, selftest):
""" This host test will use mbed serial port with
baudrate 115200 to perform echo test on that port.
"""
# Custom initialization for echo test
selftest.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE)
selftest.mbed.init_serial()
# Test function, return True or False to get standard test notification on stdout
selftest.mbed.flush()
selftest.notify("HOST: Starting the ECHO test")
result = True
""" This ensures that there are no parasites left in the serial buffer.
"""
for i in range(0, 2):
selftest.mbed.serial_write("\n")
c = selftest.mbed.serial_readline()
for i in range(0, self.TEST_LOOP_COUNT):
TEST_STRING = str(uuid.uuid4()) + "\n"
selftest.mbed.serial_write(TEST_STRING)
c = selftest.mbed.serial_readline()
if c is None:
return selftest.RESULT_IO_SERIAL
if c.strip() != TEST_STRING.strip():
selftest.notify('HOST: "%s" != "%s"'% (c, TEST_STRING))
result = False
else:
sys.stdout.write('.')
stdout.flush()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -1,49 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from .host_test import Test
class EchoTest(Test):
def __init__(self):
Test.__init__(self)
self.mbed.init_serial()
self.mbed.extra_serial.rtscts = True
self.mbed.reset()
def test(self):
self.mbed.flush()
self.notify("Starting the ECHO test")
TEST="longer serial test"
check = True
for i in range(1, 100):
self.mbed.extra_serial.write(TEST + "\n")
l = self.mbed.extra_serial.readline().strip()
if not l: continue
if l != TEST:
check = False
self.notify('"%s" != "%s"' % (l, TEST))
else:
if (i % 10) == 0:
self.notify('.')
return check
if __name__ == '__main__':
EchoTest().run()

View File

@ -1,26 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
BROADCAST_PORT = 58083
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('0.0.0.0', BROADCAST_PORT))
while True:
print s.recvfrom(256)

View File

@ -1,31 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
from time import sleep, time
BROADCAST_PORT = 58083
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while True:
print "Broadcasting..."
data = 'Hello World: ' + repr(time()) + '\n'
s.sendto(data, ('<broadcast>', BROADCAST_PORT))
sleep(1)

View File

@ -1,32 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
import struct
MCAST_GRP = '224.1.1.1'
MCAST_PORT = 5007
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
print sock.recv(10240)

View File

@ -1,31 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
from time import sleep, time
MCAST_GRP = '224.1.1.1'
MCAST_PORT = 5007
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
while True:
print "Multicast to group: %s\n" % MCAST_GRP
data = 'Hello World: ' + repr(time()) + '\n'
sock.sendto(data, (MCAST_GRP, MCAST_PORT))
sleep(1)

View File

@ -1,29 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
ECHO_SERVER_ADDRESS = "10.2.202.45"
ECHO_PORT = 7
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ECHO_SERVER_ADDRESS, ECHO_PORT))
s.sendall('Hello, world')
data = s.recv(1024)
s.close()
print 'Received', repr(data)

View File

@ -1,31 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 7))
s.listen(1)
while True:
conn, addr = s.accept()
print 'Connected by', addr
while True:
data = conn.recv(1024)
if not data: break
conn.sendall(data)
conn.close()

View File

@ -1,29 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
ECHO_SERVER_ADDRESS = '10.2.202.45'
ECHO_PORT = 7
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto("Hello World\n", (ECHO_SERVER_ADDRESS, ECHO_PORT))
response = sock.recv(256)
sock.close()
print response

View File

@ -1,28 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
ECHO_PORT = 7
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHO_PORT))
while True:
data, address = sock.recvfrom(256)
print "datagram from", address
sock.sendto(data, address)

View File

@ -1,17 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -1,35 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class HelloTest(object):
HELLO_WORLD = "Hello World"
def test(self, selftest):
c = selftest.mbed.serial_readline()
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify("Read %d bytes:"% len(c))
selftest.notify(c.strip())
result = True
# Because we can have targetID here let's try to decode
if len(c) < len(self.HELLO_WORLD):
result = False
else:
result = self.HELLO_WORLD in c
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -1,37 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class HostRegistry(object):
""" Class stores registry with host tests and objects representing them
"""
HOST_TESTS = {} # host_test_name -> host_test_ojbect
def register_host_test(self, ht_name, ht_object):
if ht_name not in self.HOST_TESTS:
self.HOST_TESTS[ht_name] = ht_object
def unregister_host_test(self):
if ht_name in HOST_TESTS:
self.HOST_TESTS[ht_name] = None
def get_host_test(self, ht_name):
return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None
def is_host_test(self, ht_name):
return ht_name in self.HOST_TESTS

View File

@ -1,427 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Check if 'serial' module is installed
try:
from serial import Serial
except ImportError as e:
print("Error: Can't import 'serial' module: %s"% e)
exit(-1)
import os
import re
import types
from sys import stdout
from time import sleep, time
from optparse import OptionParser
from . import host_tests_plugins
# This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from tools.test_api import get_autodetected_MUTS_list
from tools.test_api import get_module_avail
class Mbed(object):
""" Base class for a host driven test
"""
def __init__(self):
parser = OptionParser()
parser.add_option("-m", "--micro",
dest="micro",
help="The target microcontroller",
metavar="MICRO")
parser.add_option("-p", "--port",
dest="port",
help="The serial port of the target mbed",
metavar="PORT")
parser.add_option("-d", "--disk",
dest="disk",
help="The target disk path",
metavar="DISK_PATH")
parser.add_option("-f", "--image-path",
dest="image_path",
help="Path with target's image",
metavar="IMAGE_PATH")
parser.add_option("-c", "--copy",
dest="copy_method",
help="Copy method selector",
metavar="COPY_METHOD")
parser.add_option("-C", "--program_cycle_s",
dest="program_cycle_s",
help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target",
type="float",
metavar="COPY_METHOD")
parser.add_option("-t", "--timeout",
dest="timeout",
help="Timeout",
metavar="TIMEOUT")
parser.add_option("-r", "--reset",
dest="forced_reset_type",
help="Forces different type of reset")
parser.add_option("-R", "--reset-timeout",
dest="forced_reset_timeout",
metavar="NUMBER",
type="int",
help="When forcing a reset using option -r you can set up after reset timeout in seconds")
parser.add_option('', '--auto',
dest='auto_detect',
metavar=False,
action="store_true",
help='Use mbed-ls module to detect all connected mbed devices')
(self.options, _) = parser.parse_args()
self.DEFAULT_RESET_TOUT = 0
self.DEFAULT_TOUT = 10
if self.options.port is None:
raise Exception("The serial port of the target mbed have to be provided as command line arguments")
# Options related to copy / reset mbed device
self.port = self.options.port
self.disk = self.options.disk
self.image_path = self.options.image_path.strip('"')
self.copy_method = self.options.copy_method
self.program_cycle_s = float(self.options.program_cycle_s)
self.serial = None
self.serial_baud = 9600
self.serial_timeout = 1
self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
print('MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk))
def init_serial_params(self, serial_baud=9600, serial_timeout=1):
""" Initialize port parameters.
This parameters will be used by self.init_serial() function to open serial port
"""
self.serial_baud = serial_baud
self.serial_timeout = serial_timeout
def init_serial(self, serial_baud=None, serial_timeout=None):
""" Initialize serial port.
Function will return error is port can't be opened or initialized
"""
# Overload serial port configuration from default to parameters' values if they are specified
serial_baud = serial_baud if serial_baud is not None else self.serial_baud
serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout
if get_module_avail('mbed_lstools') and self.options.auto_detect:
# Ensure serial port is up-to-date (try to find it 60 times)
found = False
for i in range(0, 60):
print('Looking for %s with MBEDLS' % self.options.micro)
muts_list = get_autodetected_MUTS_list(platform_name_filter=[self.options.micro])
if 1 in muts_list:
mut = muts_list[1]
self.port = mut['port']
found = True
break
else:
sleep(3)
if not found:
return False
# Clear serial port
if self.serial:
self.serial.close()
self.serial = None
# We will pool for serial to be re-mounted if it was unmounted after device reset
result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking
# Port can be opened
if result:
self.flush()
return result
def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25):
""" Functions pools for serial port readiness
"""
result = True
last_error = None
# This loop is used to check for serial port availability due to
# some delays and remounting when devices are being flashed with new software.
for i in range(pooling_loops):
sleep(loop_delay if i else init_delay)
try:
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
except Exception as e:
result = False
last_error = "MBED: %s"% str(e)
stdout.write('.')
stdout.flush()
else:
print("...port ready!")
result = True
break
if not result and last_error:
print(last_error)
return result
def set_serial_timeout(self, timeout):
""" Wraps self.mbed.serial object timeout property
"""
result = None
if self.serial:
self.serial.timeout = timeout
result = True
return result
def serial_read(self, count=1):
""" Wraps self.mbed.serial object read method
"""
result = None
if self.serial:
try:
result = self.serial.read(count)
except:
result = None
return result
def serial_readline(self, timeout=5):
""" Wraps self.mbed.serial object read method to read one line from serial port
"""
result = ''
start = time()
while (time() - start) < timeout:
if self.serial:
try:
c = self.serial.read(1)
result += c
except Exception as e:
print("MBED: %s"% str(e))
result = None
break
if c == '\n':
break
return result
def serial_write(self, write_buffer):
""" Wraps self.mbed.serial object write method
"""
result = None
if self.serial:
try:
result = self.serial.write(write_buffer)
except:
result = None
return result
def reset_timeout(self, timeout):
""" Timeout executed just after reset command is issued
"""
for n in range(0, timeout):
sleep(1)
def reset(self):
""" Calls proper reset plugin to do the job.
Please refer to host_test_plugins functionality
"""
# Flush serials to get only input after reset
self.flush()
if self.options.forced_reset_type:
result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
else:
result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
# Give time to wait for the image loading
reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
self.reset_timeout(reset_tout_s)
return result
def copy_image(self, image_path=None, disk=None, copy_method=None):
""" Closure for copy_image_raw() method.
Method which is actually copying image to mbed
"""
# Set closure environment
image_path = image_path if image_path is not None else self.image_path
disk = disk if disk is not None else self.disk
copy_method = copy_method if copy_method is not None else self.copy_method
# Call proper copy method
result = self.copy_image_raw(image_path, disk, copy_method)
return result
def copy_image_raw(self, image_path=None, disk=None, copy_method=None):
""" Copy file depending on method you want to use. Handles exception
and return code from shell copy commands.
"""
# image_path - Where is binary with target's firmware
if copy_method is not None:
# We override 'default' method with 'shell' method
if copy_method == 'default':
copy_method = 'shell'
else:
copy_method = 'shell'
result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk, program_cycle_s=self.program_cycle_s, target_mcu=self.options.micro)
return result;
def flush(self):
""" Flush serial ports
"""
result = False
if self.serial:
self.serial.flushInput()
self.serial.flushOutput()
result = True
return result
class HostTestResults(object):
""" Test results set by host tests
"""
def __init__(self):
self.RESULT_SUCCESS = 'success'
self.RESULT_FAILURE = 'failure'
self.RESULT_ERROR = 'error'
self.RESULT_IO_SERIAL = 'ioerr_serial'
self.RESULT_NO_IMAGE = 'no_image'
self.RESULT_IOERR_COPY = "ioerr_copy"
self.RESULT_PASSIVE = "passive"
self.RESULT_NOT_DETECTED = "not_detected"
self.RESULT_MBED_ASSERT = "mbed_assert"
import tools.host_tests as host_tests
class Test(HostTestResults):
""" Base class for host test's test runner
"""
# Select default host_test supervision (replaced after autodetection)
test_supervisor = host_tests.get_host_test("default")
def __init__(self):
self.mbed = Mbed()
def detect_test_config(self, verbose=False):
""" Detects test case configuration
"""
result = {}
while True:
line = self.mbed.serial_readline()
if "{start}" in line:
self.notify("HOST: Start test...")
break
else:
# Detect if this is property from TEST_ENV print
m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1])
if m and len(m.groups()) == 2:
# This is most likely auto-detection property
result[m.group(1)] = m.group(2)
if verbose:
self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2)))
else:
# We can check if this is TArget Id in mbed specific format
m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1])
if m2 and len(m2.groups()) == 2:
if verbose:
target_id = m2.group(1) + m2.group(2)
self.notify("HOST: TargetID '%s'"% target_id)
self.notify(line[len(target_id):-1])
else:
self.notify("HOST: Unknown property: %s"% line.strip())
return result
def run(self):
""" Test runner for host test. This function will start executing
test and forward test result via serial port to test suite
"""
# Copy image to device
self.notify("HOST: Copy image onto target...")
result = self.mbed.copy_image()
if not result:
self.print_result(self.RESULT_IOERR_COPY)
# Initialize and open target's serial port (console)
self.notify("HOST: Initialize serial port...")
result = self.mbed.init_serial()
if not result:
self.print_result(self.RESULT_IO_SERIAL)
# Reset device
self.notify("HOST: Reset target...")
result = self.mbed.reset()
if not result:
self.print_result(self.RESULT_IO_SERIAL)
# Run test
try:
CONFIG = self.detect_test_config(verbose=True) # print CONFIG
if "host_test_name" in CONFIG:
if host_tests.is_host_test(CONFIG["host_test_name"]):
self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"])
result = self.test_supervisor.test(self) #result = self.test()
if result is not None:
self.print_result(result)
else:
self.notify("HOST: Passive mode...")
except Exception as e:
print(str(e))
self.print_result(self.RESULT_ERROR)
def setup(self):
""" Setup and check if configuration for test is
correct. E.g. if serial port can be opened.
"""
result = True
if not self.mbed.serial:
result = False
self.print_result(self.RESULT_IO_SERIAL)
return result
def notify(self, message):
""" On screen notification function
"""
print(message)
stdout.flush()
def print_result(self, result):
""" Test result unified printing function
"""
self.notify("\r\n{{%s}}\r\n{{end}}" % result)
class DefaultTestSelector(Test):
""" Test class with serial port initialization
"""
def __init__(self):
HostTestResults.__init__(self)
Test.__init__(self)
if __name__ == '__main__':
DefaultTestSelector().run()

View File

@ -1,82 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from . import host_test_registry
# This plugins provide 'flashing' methods to host test scripts
from . import module_copy_mbed
from . import module_copy_shell
from . import module_copy_silabs
try:
from . import module_copy_smart
except:
pass
#import module_copy_firefox
from . import module_copy_mps2
# Plugins used to reset certain platform
from . import module_reset_mbed
from . import module_reset_silabs
from . import module_reset_mps2
# Plugin registry instance
HOST_TEST_PLUGIN_REGISTRY = host_test_registry.HostTestRegistry()
# Static plugin registration
# Some plugins are commented out if they are not stable or not commonly used
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mbed.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_shell.load_plugin())
try:
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_smart.load_plugin())
except:
pass
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mbed.load_plugin())
#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_firefox.load_plugin())
# Extra platforms support
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mps2.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mps2.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_silabs.load_plugin())
HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_silabs.load_plugin())
# TODO: extend plugin loading to files with name module_*.py loaded ad-hoc
###############################################################################
# Functional interface for host test plugin registry
###############################################################################
def call_plugin(type, capability, *args, **kwargs):
""" Interface to call plugin registry functional way
"""
return HOST_TEST_PLUGIN_REGISTRY.call_plugin(type, capability, *args, **kwargs)
def get_plugin_caps(type):
""" Returns list of all capabilities for plugin family with the same type.
If there are no capabilities empty list is returned
"""
return HOST_TEST_PLUGIN_REGISTRY.get_plugin_caps(type)
def print_plugin_info():
""" Prints plugins' information in user friendly way
"""
print(HOST_TEST_PLUGIN_REGISTRY)

View File

@ -1,118 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from os import access, F_OK
from sys import stdout
from time import sleep
from subprocess import call
class HostTestPluginBase:
""" Base class for all plug-ins used with host tests.
"""
###########################################################################
# Interface:
###########################################################################
###########################################################################
# Interface attributes defining plugin name, type etc.
###########################################################################
name = "HostTestPluginBase" # Plugin name, can be plugin class name
type = "BasePlugin" # Plugin type: ResetMethod, Copymethod etc.
capabilities = [] # Capabilities names: what plugin can achieve
# (e.g. reset using some external command line tool)
stable = False # Determine if plugin is stable and can be used
###########################################################################
# Interface methods
###########################################################################
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
return False
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability e.g. may directly just call some command line
program or execute building pythonic function
"""
return False
###########################################################################
# Interface helper methods - overload only if you need to have custom behaviour
###########################################################################
def print_plugin_error(self, text):
""" Function prints error in console and exits always with False
"""
print("Plugin error: %s::%s: %s" % (self.name, self.type, text))
return False
def print_plugin_info(self, text, NL=True):
""" Function prints notification in console and exits always with True
"""
print("Plugin info: %s::%s: %s"% (self.name, self.type, text))
return True
def print_plugin_char(self, char):
""" Function prints char on stdout
"""
stdout.write(char)
stdout.flush()
return True
def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25):
""" Checks if destination_disk is ready and can be accessed by e.g. copy commands
@init_delay - Initial delay time before first access check
@loop_delay - pooling delay for access check
"""
if not access(destination_disk, F_OK):
self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False)
sleep(init_delay)
while not access(destination_disk, F_OK):
sleep(loop_delay)
self.print_plugin_char('.')
def check_parameters(self, capabilitity, *args, **kwargs):
""" This function should be ran each time we call execute()
to check if none of the required parameters is missing.
"""
missing_parameters = []
for parameter in self.required_parameters:
if parameter not in kwargs:
missing_parameters.append(parameter)
if len(missing_parameters) > 0:
self.print_plugin_error("execute parameter(s) '%s' missing!"% (', '.join(parameter)))
return False
return True
def run_command(self, cmd, shell=True):
""" Runs command from command line.
"""
result = True
ret = 0
try:
ret = call(cmd, shell=shell)
if ret:
self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd))
return False
except Exception as e:
result = False
self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd))
self.print_plugin_error(str(e))
return result

View File

@ -1,91 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
class HostTestRegistry:
""" Simple class used to register and store
host test plugins for further usage
"""
# Here we actually store all the plugins
PLUGINS = {} # 'Plugin Name' : Plugin Object
def print_error(self, text):
print("Plugin load failed. Reason: %s" % text)
def register_plugin(self, plugin):
""" Registers and stores plugin inside registry for further use.
Method also calls plugin's setup() function to configure plugin if needed.
Note: Different groups of plugins may demand different extra parameter. Plugins
should be at least for one type of plugin configured with the same parameters
because we do not know which of them will actually use particular parameter.
"""
# TODO:
# - check for unique caps for specified type
if plugin.name not in self.PLUGINS:
if plugin.setup(): # Setup plugin can be completed without errors
self.PLUGINS[plugin.name] = plugin
return True
else:
self.print_error("%s setup failed"% plugin.name)
else:
self.print_error("%s already loaded"% plugin.name)
return False
def call_plugin(self, type, capability, *args, **kwargs):
""" Execute plugin functionality respectively to its purpose
"""
for plugin_name in self.PLUGINS:
plugin = self.PLUGINS[plugin_name]
if plugin.type == type and capability in plugin.capabilities:
return plugin.execute(capability, *args, **kwargs)
return False
def get_plugin_caps(self, type):
""" Returns list of all capabilities for plugin family with the same type.
If there are no capabilities empty list is returned
"""
result = []
for plugin_name in self.PLUGINS:
plugin = self.PLUGINS[plugin_name]
if plugin.type == type:
result.extend(plugin.capabilities)
return sorted(result)
def load_plugin(self, name):
""" Used to load module from
"""
mod = __import__("module_%s"% name)
return mod
def __str__(self):
""" User friendly printing method to show hooked plugins
"""
from prettytable import PrettyTable
column_names = ['name', 'type', 'capabilities', 'stable']
pt = PrettyTable(column_names)
for column in column_names:
pt.align[column] = 'l'
for plugin_name in sorted(self.PLUGINS.keys()):
name = self.PLUGINS[plugin_name].name
type = self.PLUGINS[plugin_name].type
stable = self.PLUGINS[plugin_name].stable
capabilities = ', '.join(self.PLUGINS[plugin_name].capabilities)
row = [name, type, capabilities, stable]
pt.add_row(row)
return pt.get_string()

View File

@ -1,77 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from os.path import join, basename
from host_test_plugins import HostTestPluginBase
class HostTestPluginCopyMethod_Firefox(HostTestPluginBase):
def file_store_firefox(self, file_path, dest_disk):
try:
from selenium import webdriver
profile = webdriver.FirefoxProfile()
profile.set_preference('browser.download.folderList', 2) # custom location
profile.set_preference('browser.download.manager.showWhenStarting', False)
profile.set_preference('browser.download.dir', dest_disk)
profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'application/octet-stream')
# Launch browser with profile and get file
browser = webdriver.Firefox(profile)
browser.get(file_path)
browser.close()
except:
return False
return True
# Plugin interface
name = 'HostTestPluginCopyMethod_Firefox'
type = 'CopyMethod'
capabilities = ['firefox']
required_parameters = ['image_path', 'destination_disk']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
try:
from selenium import webdriver
except ImportError, e:
self.print_plugin_error("Error: firefox copy method requires selenium library. %s"% e)
return False
return True
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capabilitity, *args, **kwargs) is True:
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
# Prepare correct command line parameter values
image_base_name = basename(image_path)
destination_path = join(destination_disk, image_base_name)
if capabilitity == 'firefox':
self.file_store_firefox(image_path, destination_path)
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginCopyMethod_Firefox()

View File

@ -1,80 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from shutil import copy
from .host_test_plugins import HostTestPluginBase
from time import sleep
class HostTestPluginCopyMethod_Mbed(HostTestPluginBase):
def generic_mbed_copy(self, image_path, destination_disk):
""" Generic mbed copy method for "mbed enabled" devices.
It uses standard python shuitl function to copy
image_file (target specific binary) to device's disk.
"""
result = True
if not destination_disk.endswith('/') and not destination_disk.endswith('\\'):
destination_disk += '/'
try:
copy(image_path, destination_disk)
except Exception as e:
self.print_plugin_error("shutil.copy('%s', '%s')"% (image_path, destination_disk))
self.print_plugin_error("Error: %s"% str(e))
result = False
return result
# Plugin interface
name = 'HostTestPluginCopyMethod_Mbed'
type = 'CopyMethod'
stable = True
capabilities = ['shutil', 'default']
required_parameters = ['image_path', 'destination_disk', 'program_cycle_s']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
return True
def execute(self, capability, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capability, *args, **kwargs) is True:
# Capability 'default' is a dummy capability
if capability == 'shutil':
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
program_cycle_s = kwargs['program_cycle_s']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
result = self.generic_mbed_copy(image_path, destination_disk)
# Allow mbed to cycle
sleep(program_cycle_s)
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginCopyMethod_Mbed()

View File

@ -1,152 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import re
import os, shutil
from os.path import join
from time import sleep
from .host_test_plugins import HostTestPluginBase
class HostTestPluginCopyMethod_MPS2(HostTestPluginBase):
# MPS2 specific flashing / binary setup funcitons
def mps2_set_board_image_file(self, disk, images_cfg_path, image0file_path, image_name='images.txt'):
""" This function will alter image cfg file.
Main goal of this function is to change number of images to 1, comment all
existing image entries and append at the end of file new entry with test path.
@return True when all steps succeed.
"""
MBED_SDK_TEST_STAMP = 'test suite entry'
image_path = join(disk, images_cfg_path, image_name)
new_file_lines = [] # New configuration file lines (entries)
# Check each line of the image configuration file
try:
with open(image_path, 'r') as file:
for line in file:
if re.search('^TOTALIMAGES', line):
# Check number of total images, should be 1
new_file_lines.append(re.sub('^TOTALIMAGES:[\t ]*[\d]+', 'TOTALIMAGES: 1', line))
elif re.search('; - %s[\n\r]*$'% MBED_SDK_TEST_STAMP, line):
# Look for test suite entries and remove them
pass # Omit all test suite entries
elif re.search('^IMAGE[\d]+FILE', line):
# Check all image entries and mark the ';'
new_file_lines.append(';' + line) # Comment non test suite lines
else:
# Append line to new file
new_file_lines.append(line)
except IOError as e:
return False
# Add new image entry with proper commented stamp
new_file_lines.append('IMAGE0FILE: %s ; - %s\r\n'% (image0file_path, MBED_SDK_TEST_STAMP))
# Write all lines to file
try:
with open(image_path, 'w') as file:
for line in new_file_lines:
file.write(line),
except IOError:
return False
return True
def mps2_select_core(self, disk, mobo_config_name=""):
""" Function selects actual core
"""
# TODO: implement core selection
pass
def mps2_switch_usb_auto_mounting_after_restart(self, disk, usb_config_name=""):
""" Function alters configuration to allow USB MSD to be mounted after restarts
"""
# TODO: implement USB MSD restart detection
pass
def copy_file(self, file, disk):
if not file:
return
_, ext = os.path.splitext(file)
ext = ext.lower()
dfile = disk + "/SOFTWARE/mbed" + ext
if os.path.isfile(dfile):
print('Remove old binary %s' % dfile)
os.remove(dfile)
shutil.copy(file, dfile)
return True
def touch_file(self, file):
""" Touch file and set timestamp to items
"""
tfile = file+'.tmp'
fhandle = open(tfile, 'a')
try:
fhandle.close()
finally:
os.rename(tfile, file)
return True
# Plugin interface
name = 'HostTestPluginCopyMethod_MPS2'
type = 'CopyMethod'
capabilities = ['mps2-copy']
required_parameters = ['image_path', 'destination_disk']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
return True
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capabilitity, *args, **kwargs) is True:
file = kwargs['image_path']
disk = kwargs['destination_disk']
""" Add a delay in case there a test just finished
Prevents interface firmware hiccups
"""
sleep(20)
if capabilitity == 'mps2-copy' and self.copy_file(file, disk):
sleep(3)
if self.touch_file(disk + 'reboot.txt'):
""" Add a delay after the board was rebooted.
The actual reboot time is 20 seconds, but using 15 seconds
allows us to open the COM port and save a board reset.
This also prevents interface firmware hiccups.
"""
sleep(7)
result = True
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginCopyMethod_MPS2()

View File

@ -1,76 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import os
from os.path import join, basename
from time import sleep
from .host_test_plugins import HostTestPluginBase
class HostTestPluginCopyMethod_Shell(HostTestPluginBase):
# Plugin interface
name = 'HostTestPluginCopyMethod_Shell'
type = 'CopyMethod'
stable = True
capabilities = ['shell', 'cp', 'copy', 'xcopy']
required_parameters = ['image_path', 'destination_disk', 'program_cycle_s']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
return True
def execute(self, capability, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capability, *args, **kwargs) is True:
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
program_cycle_s = kwargs['program_cycle_s']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
# Prepare correct command line parameter values
image_base_name = basename(image_path)
destination_path = join(destination_disk, image_base_name)
if capability == 'shell':
if os.name == 'nt': capability = 'copy'
elif os.name == 'posix': capability = 'cp'
if capability == 'cp' or capability == 'copy' or capability == 'copy':
copy_method = capability
cmd = [copy_method, image_path, destination_path]
if os.name == 'posix':
result = self.run_command(cmd, shell=False)
result = self.run_command(["sync"])
else:
result = self.run_command(cmd)
# Allow mbed to cycle
sleep(program_cycle_s)
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginCopyMethod_Shell()

View File

@ -1,69 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from time import sleep
from .host_test_plugins import HostTestPluginBase
class HostTestPluginCopyMethod_Silabs(HostTestPluginBase):
# Plugin interface
name = 'HostTestPluginCopyMethod_Silabs'
type = 'CopyMethod'
capabilities = ['eACommander', 'eACommander-usb']
required_parameters = ['image_path', 'destination_disk', 'program_cycle_s']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
self.EACOMMANDER_CMD = 'eACommander.exe'
return True
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capabilitity, *args, **kwargs) is True:
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
program_cycle_s = kwargs['program_cycle_s']
if capabilitity == 'eACommander':
cmd = [self.EACOMMANDER_CMD,
'--serialno', destination_disk,
'--flash', image_path,
'--resettype', '2', '--reset']
result = self.run_command(cmd)
elif capabilitity == 'eACommander-usb':
cmd = [self.EACOMMANDER_CMD,
'--usb', destination_disk,
'--flash', image_path]
result = self.run_command(cmd)
# Allow mbed to cycle
sleep(program_cycle_s)
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginCopyMethod_Silabs()

View File

@ -1,119 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import sys
from os.path import join, basename, exists, abspath, dirname
from time import sleep
from host_test_plugins import HostTestPluginBase
sys.path.append(abspath(join(dirname(__file__), "../../../")))
import tools.test_api
class HostTestPluginCopyMethod_Smart(HostTestPluginBase):
# Plugin interface
name = 'HostTestPluginCopyMethod_Smart'
type = 'CopyMethod'
stable = True
capabilities = ['smart']
required_parameters = ['image_path', 'destination_disk', 'target_mcu']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
return True
def execute(self, capability, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capability, *args, **kwargs) is True:
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
target_mcu = kwargs['target_mcu']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
# Prepare correct command line parameter values
image_base_name = basename(image_path)
destination_path = join(destination_disk, image_base_name)
if capability == 'smart':
if os.name == 'posix':
cmd = ['cp', image_path, destination_path]
result = self.run_command(cmd, shell=False)
cmd = ['sync']
result = self.run_command(cmd, shell=False)
elif os.name == 'nt':
cmd = ['copy', image_path, destination_path]
result = self.run_command(cmd, shell=True)
# Give the OS and filesystem time to settle down
sleep(3)
platform_name_filter = [target_mcu]
muts_list = {}
remount_complete = False
for i in range(0, 60):
print('Looking for %s with MBEDLS' % target_mcu)
muts_list = tools.test_api.get_autodetected_MUTS_list(platform_name_filter=platform_name_filter)
if 1 in muts_list:
mut = muts_list[1]
destination_disk = mut['disk']
destination_path = join(destination_disk, image_base_name)
if mut['mcu'] == 'LPC1768' or mut['mcu'] == 'LPC11U24':
if exists(destination_disk) and exists(destination_path):
remount_complete = True
break;
else:
if exists(destination_disk) and not exists(destination_path):
remount_complete = True
break;
sleep(1)
if remount_complete:
print('Remount complete')
else:
print('Remount FAILED')
if exists(destination_disk):
print('Disk exists')
else:
print('Disk does not exist')
if exists(destination_path):
print('Image exists')
else:
print('Image does not exist')
result = None
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginCopyMethod_Smart()

View File

@ -1,74 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from .host_test_plugins import HostTestPluginBase
class HostTestPluginResetMethod_Mbed(HostTestPluginBase):
def safe_sendBreak(self, serial):
""" Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
Traceback (most recent call last):
File "make.py", line 189, in <module>
serial.sendBreak()
File "/usr/lib/python2.7/dist-packages/serial/serialposix.py", line 511, in sendBreak
termios.tcsendbreak(self.fd, int(duration/0.25))
error: (32, 'Broken pipe')
"""
result = True
try:
serial.sendBreak()
except:
# In linux a termios.error is raised in sendBreak and in setBreak.
# The following setBreak() is needed to release the reset signal on the target mcu.
try:
serial.setBreak(False)
except:
result = False
return result
# Plugin interface
name = 'HostTestPluginResetMethod_Mbed'
type = 'ResetMethod'
stable = True
capabilities = ['default']
required_parameters = ['serial']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
return True
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capabilitity, *args, **kwargs) is True:
if capabilitity == 'default':
serial = kwargs['serial']
result = self.safe_sendBreak(serial)
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginResetMethod_Mbed()

View File

@ -1,80 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import os
from time import sleep
from .host_test_plugins import HostTestPluginBase
# Note: This plugin is not fully functional, needs improvements
class HostTestPluginResetMethod_MPS2(HostTestPluginBase):
""" Plugin used to reset ARM_MPS2 platform
Supports:
reboot.txt - startup from standby state, reboots when in run mode.
shutdown.txt - shutdown from run mode.
reset.txt - reset FPGA during run mode.
"""
def touch_file(self, file):
""" Touch file and set timestamp to items
"""
tfile = file+'.tmp'
fhandle = open(tfile, 'a')
try:
fhandle.close()
finally:
os.rename(tfile, file)
return True
# Plugin interface
name = 'HostTestPluginResetMethod_MPS2'
type = 'ResetMethod'
capabilities = ['mps2-reboot', 'mps2-reset']
required_parameters = ['disk']
def setup(self, *args, **kwargs):
""" Prepare / configure plugin to work.
This method can receive plugin specific parameters by kwargs and
ignore other parameters which may affect other plugins.
"""
return True
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
return True
result = False
if self.check_parameters(capabilitity, *args, **kwargs) is True:
disk = kwargs['disk']
if capabilitity == 'mps2-reboot' and self.touch_file(disk + 'reboot.txt'):
sleep(20)
result = True
elif capabilitity == 'mps2-reset' and self.touch_file(disk + 'reboot.txt'):
sleep(20)
result = True
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginResetMethod_MPS2()

View File

@ -1,68 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from .host_test_plugins import HostTestPluginBase
class HostTestPluginResetMethod_SiLabs(HostTestPluginBase):
# Plugin interface
name = 'HostTestPluginResetMethod_SiLabs'
type = 'ResetMethod'
stable = True
capabilities = ['eACommander', 'eACommander-usb']
required_parameters = ['disk']
def setup(self, *args, **kwargs):
""" Configure plugin, this function should be called before plugin execute() method is used.
"""
# Note you need to have eACommander.exe on your system path!
self.EACOMMANDER_CMD = 'eACommander.exe'
return True
def execute(self, capabilitity, *args, **kwargs):
""" Executes capability by name.
Each capability may directly just call some command line
program or execute building pythonic function
"""
result = False
if self.check_parameters(capabilitity, *args, **kwargs) is True:
disk = kwargs['disk'].rstrip('/\\')
if capabilitity == 'eACommander':
# For this copy method 'disk' will be 'serialno' for eACommander command line parameters
# Note: Commands are executed in the order they are specified on the command line
cmd = [self.EACOMMANDER_CMD,
'--serialno', disk,
'--resettype', '2', '--reset',]
result = self.run_command(cmd)
elif capabilitity == 'eACommander-usb':
# For this copy method 'disk' will be 'usb address' for eACommander command line parameters
# Note: Commands are executed in the order they are specified on the command line
cmd = [self.EACOMMANDER_CMD,
'--usb', disk,
'--resettype', '2', '--reset',]
result = self.run_command(cmd)
return result
def load_plugin():
""" Returns plugin available in this module
"""
return HostTestPluginResetMethod_SiLabs()

View File

@ -1,227 +0,0 @@
# mbedRPC.py - mbed RPC interface for Python
#
##Copyright (c) 2010 ARM Ltd
##
##Permission is hereby granted, free of charge, to any person obtaining a copy
##of this software and associated documentation files (the "Software"), to deal
##in the Software without restriction, including without limitation the rights
##to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
##copies of the Software, and to permit persons to whom the Software is
##furnished to do so, subject to the following conditions:
##
##The above copyright notice and this permission notice shall be included in
##all copies or substantial portions of the Software.
##
##THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
##IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
##FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
##AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
##LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
##OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
##THE SOFTWARE.
#
# Example:
# >from mbedRPC import*
# >mbed = SerialRPC("COM5",9600)
# >myled = DigitalOut(mbed,"myled") <--- Where the text in quotations matches your RPC pin definition's second parameter, in this case it could be RpcDigitalOut myled(LED1,"myled");
# >myled.write(1)
# >
from future import standard_library
standard_library.install_aliases()
import serial, urllib.request, time
# mbed super class
class mbed(object):
def __init__(self):
print("This will work as a demo but no transport mechanism has been selected")
def rpc(self, name, method, args):
print("Superclass method not overridden")
# Transport mechanisms, derived from mbed
class SerialRPC(mbed):
def __init__(self, port, baud):
self.ser = serial.Serial(port)
self.ser.setBaudrate(baud)
def rpc(self, name, method, args):
# creates the command to be sent serially - /name/method arg1 arg2 arg3 ... argN
str = "/" + name + "/" + method + " " + " ".join(args) + "\n"
# prints the command being executed
print(str)
# writes the command to serial
self.ser.write(str)
# strips trailing characters from the line just written
ret_val = self.ser.readline().strip()
return ret_val
class HTTPRPC(mbed):
def __init__(self, ip):
self.host = "http://" + ip
def rpc(self, name, method, args):
response = urllib.request.urlopen(self.host + "/rpc/" + name + "/" + method + "%20" + "%20".join(args))
return response.read().strip()
# generic mbed interface super class
class mbed_interface(object):
# initialize an mbed interface with a transport mechanism and pin name
def __init__(self, this_mbed, mpin):
self.mbed = this_mbed
if isinstance(mpin, str):
self.name = mpin
def __del__(self):
r = self.mbed.rpc(self.name, "delete", [])
def new(self, class_name, name, pin1, pin2 = "", pin3 = ""):
args = [arg for arg in [pin1,pin2,pin3,name] if arg != ""]
r = self.mbed.rpc(class_name, "new", args)
# generic read
def read(self):
r = self.mbed.rpc(self.name, "read", [])
return int(r)
# for classes that need write functionality - inherits from the generic reading interface
class mbed_interface_write(mbed_interface):
def __init__(self, this_mbed, mpin):
mbed_interface.__init__(self, this_mbed, mpin)
# generic write
def write(self, value):
r = self.mbed.rpc(self.name, "write", [str(value)])
# mbed interfaces
class DigitalOut(mbed_interface_write):
def __init__(self, this_mbed, mpin):
mbed_interface_write.__init__(self, this_mbed, mpin)
class AnalogIn(mbed_interface):
def __init__(self, this_mbed, mpin):
mbed_interface.__init__(self, this_mbed, mpin)
def read_u16(self):
r = self.mbed.rpc(self.name, "read_u16", [])
return int(r)
class AnalogOut(mbed_interface_write):
def __init__(self, this_mbed, mpin):
mbed_interface_write.__init__(self, this_mbed, mpin)
def write_u16(self, value):
self.mbed.rpc(self.name, "write_u16", [str(value)])
def read(self):
r = self.mbed.rpc(self.name, "read", [])
return float(r)
class DigitalIn(mbed_interface):
def __init__(self, this_mbed, mpin):
mbed_interface.__init__(self, this_mbed, mpin)
class PwmOut(mbed_interface_write):
def __init__(self, this_mbed, mpin):
mbed_interface_write.__init__(self, this_mbed, mpin)
def read(self):
r = self.mbed.rpc(self.name, "read", [])
return r
def period(self, value):
self.mbed.rpc(self.name, "period", [str(value)])
def period_ms(self, value):
self.mbed.rpc(self.name, "period_ms", [str(value)])
def period_us(self, value):
self.mbed.rpc(self.name, "period_us", [str(value)])
def pulsewidth(self, value):
self.mbed.rpc(self.name, "pulsewidth", [str(value)])
def pulsewidth_ms(self, value):
self.mbed.rpc(self.name, "pulsewidth_ms", [str(value)])
def pulsewidth_us(self, value):
self.mbed.rpc(self.name, "pulsewidth_us", [str(value)])
class RPCFunction(mbed_interface):
def __init__(self, this_mbed, name):
mbed_interface.__init__(self, this_mbed, name)
def run(self, input):
r = self.mbed.rpc(self.name, "run", [input])
return r
class RPCVariable(mbed_interface_write):
def __init__(self, this_mbed, name):
mbed_interface_write.__init__(self, this_mbed, name)
def read(self):
r = self.mbed.rpc(self.name, "read", [])
return r
class Timer(mbed_interface):
def __init__(self, this_mbed, name):
mbed_interface.__init__(self, this_mbed, name)
def start(self):
r = self.mbed.rpc(self.name, "start", [])
def stop(self):
r = self.mbed.rpc(self.name, "stop", [])
def reset(self):
r = self.mbed.rpc(self.name, "reset", [])
def read(self):
r = self.mbed.rpc(self.name, "read", [])
return float(re.search('\d+\.*\d*', r).group(0))
def read_ms(self):
r = self.mbed.rpc(self.name, "read_ms", [])
return float(re.search('\d+\.*\d*', r).group(0))
def read_us(self):
r = self.mbed.rpc(self.name, "read_us", [])
return float(re.search('\d+\.*\d*', r).group(0))
# Serial
class Serial(object):
def __init__(self, this_mbed, tx, rx=""):
self.mbed = this_mbed
if isinstance(tx, str):
self.name = tx
def __del__(self):
r = self.mbed.rpc(self.name, "delete", [])
def baud(self, value):
r = self.mbed.rpc(self.name, "baud", [str(value)])
def putc(self, value):
r = self.mbed.rpc(self.name, "putc", [str(value)])
def puts(self, value):
r = self.mbed.rpc(self.name, "puts", ["\"" + str(value) + "\""])
def getc(self):
r = self.mbed.rpc(self.name, "getc", [])
return int(r)
def wait(s):
time.sleep(s)

View File

@ -1,90 +0,0 @@
"""
Copyright (c) 2016 ARM Limited. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import sys
import re
import time
import mido
from mido import Message
def test_midi_in(port):
expected_messages_count=0
while expected_messages_count < 7:
for message in port.iter_pending():
if message.type in ('note_on', 'note_off', 'program_change', 'sysex'):
yield message
expected_messages_count+=1
time.sleep(0.1)
def test_midi_loopback(input_port):
expected_messages_count=0
while expected_messages_count < 1:
for message in input_port.iter_pending():
print('Test MIDI OUT loopback received {}'.format(message.hex()))
expected_messages_count+=1
def test_midi_out_loopback(output_port,input_port):
print("Test MIDI OUT loopback")
output_port.send(Message('program_change', program=1))
test_midi_loopback(input_port)
output_port.send(Message('note_on', note=21))
test_midi_loopback(input_port)
output_port.send(Message('note_off', note=21))
test_midi_loopback(input_port)
output_port.send(Message('sysex', data=[0x7E,0x7F,0x09,0x01]))
test_midi_loopback(input_port)
output_port.send(Message('sysex', data=[0x7F,0x7F,0x04,0x01,0x7F,0x7F]))
test_midi_loopback(input_port)
output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x7F,0x00,0x41]))
test_midi_loopback(input_port)
output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x04,0x7F,0x3D]))
test_midi_loopback(input_port)
portname=""
while portname=="":
print("Wait for MIDI IN plug ...")
for name in mido.get_input_names():
matchObj = re.match( r'Mbed', name)
if matchObj:
portname=name
time.sleep( 1 )
try:
input_port = mido.open_input(portname)
output_port = mido.open_output(portname)
print('Using {}'.format(input_port))
print("Test MIDI IN")
for message in test_midi_in(input_port):
print('Test MIDI IN received {}'.format(message.hex()))
test_midi_out_loopback(output_port,input_port)
except KeyboardInterrupt:
pass

View File

@ -1,28 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from .host_test import Test, Simple
from sys import stdout
class NETTest(Simple):
def __init__(self):
Test.__init__(self)
self.mbed.init_serial(115200)
self.mbed.reset()
if __name__ == '__main__':
NETTest().run()

View File

@ -1,57 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from .host_test import Test
from .mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin
class RpcTest(Test):
def test(self):
self.notify("RPC Test")
s = SerialRPC(self.mbed.port, debug=True)
self.notify("Init remote objects")
p_out = pin("p10")
p_in = pin("p11")
if hasattr(self.mbed.options, 'micro'):
if self.mbed.options.micro == 'M0+':
print("Freedom Board: PTA12 <-> PTC4")
p_out = pin("PTA12")
p_in = pin("PTC4")
self.output = DigitalOut(s, p_out);
self.input = DigitalIn(s, p_in);
self.check = True
self.write_read_test(1)
self.write_read_test(0)
return self.check
def write_read_test(self, v):
self.notify("Check %d" % v)
self.output.write(v)
if self.input.read() != v:
self.notify("ERROR")
self.check = False
else:
self.notify("OK")
if __name__ == '__main__':
RpcTest().run()

View File

@ -1,51 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
from time import time, strftime, gmtime
class RTCTest(object):
PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]"
re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)
def test(self, selftest):
test_result = True
start = time()
sec_prev = 0
for i in range(0, 5):
# Timeout changed from default: we need to wait longer for some boards to start-up
c = selftest.mbed.serial_readline(timeout=10)
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
delta = time() - start
m = self.re_detect_rtc_value.search(c)
if m and len(m.groups()):
sec = int(m.groups()[0])
time_str = m.groups()[1]
correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec)))
single_result = time_str == correct_time_str and sec > 0 and sec > sec_prev
test_result = test_result and single_result
result_msg = "OK" if single_result else "FAIL"
selftest.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg))
sec_prev = sec
else:
test_result = False
break
start = time()
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -1,44 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import uuid
import time
import string
from sys import stdout
class SerialCompleteTest(object):
def test(self, selftest):
strip_chars = string.whitespace + "\0"
out_str = selftest.mbed.serial_readline()
selftest.notify("HOST: " + out_str)
if not out_str:
selftest.notify("HOST: No output detected")
return selftest.RESULT_IO_SERIAL
out_str_stripped = out_str.strip(strip_chars)
if out_str_stripped != "123456789":
selftest.notify("HOST: Unexpected output. '123456789' Expected. but received '%s'" % out_str_stripped)
return selftest.RESULT_FAILURE
else:
return selftest.RESULT_SUCCESS

View File

@ -1,88 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import uuid
import time
import string
from sys import stdout
class SerialNCRXTest(object):
def test(self, selftest):
selftest.mbed.flush();
# Wait 0.5 seconds to ensure mbed is listening
time.sleep(0.5)
#handshake with target to sync test start
selftest.mbed.serial_write("S");
strip_chars = string.whitespace + "\0"
out_str = selftest.mbed.serial_readline()
if not out_str:
selftest.notify("HOST: No output detected")
return selftest.RESULT_IO_SERIAL
out_str_stripped = out_str.strip(strip_chars)
if out_str_stripped != "RX OK - Start NC test":
selftest.notify("HOST: Unexpected output. Expected 'RX OK - Expected' but received '%s'" % out_str_stripped)
return selftest.RESULT_FAILURE
# Wait 0.5 seconds to ensure mbed is listening
time.sleep(0.5)
selftest.mbed.serial_write("E");
strip_chars = string.whitespace + "\0"
out_str = selftest.mbed.serial_readline()
if not out_str:
selftest.notify("HOST: No output detected")
return selftest.RESULT_IO_SERIAL
out_str_stripped = out_str.strip(strip_chars)
if out_str_stripped != "RX OK - Expected":
selftest.notify("HOST: Unexpected output. Expected 'RX OK - Expected' but received '%s'" % out_str_stripped)
return selftest.RESULT_FAILURE
# Wait 0.5 seconds to ensure mbed is listening
time.sleep(0.5)
# Send character, mbed shouldn't receive
selftest.mbed.serial_write("U");
out_str = selftest.mbed.serial_readline()
# If no characters received, pass the test
if not out_str:
selftest.notify("HOST: No further output detected")
return selftest.RESULT_SUCCESS
else:
out_str_stripped = out_str.strip(strip_chars)
if out_str_stripped == "RX OK - Unexpected":
selftest.notify("HOST: Unexpected output returned indicating RX still functioning")
else:
selftest.notify("HOST: Extraneous output '%s' detected indicating unknown error" % out_str_stripped)
return selftest.RESULT_FAILURE

View File

@ -1,63 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import uuid
import time
import string
from sys import stdout
class SerialNCTXTest(object):
def test(self, selftest):
selftest.mbed.flush();
# Wait 0.5 seconds to ensure mbed is listening
time.sleep(0.5)
selftest.mbed.serial_write("S");
strip_chars = string.whitespace + "\0"
out_str = selftest.mbed.serial_readline()
selftest.notify("HOST: " + out_str)
if not out_str:
selftest.notify("HOST: No output detected")
return selftest.RESULT_IO_SERIAL
out_str_stripped = out_str.strip(strip_chars)
if out_str_stripped != "TX OK - Expected":
selftest.notify("HOST: Unexpected output. Expected 'TX OK - Expected' but received '%s'" % out_str_stripped)
return selftest.RESULT_FAILURE
out_str = selftest.mbed.serial_readline()
# If no characters received, pass the test
if not out_str:
selftest.notify("HOST: No further output detected")
return selftest.RESULT_SUCCESS
else:
out_str_stripped = out_str.strip(strip_chars)
if out_str_stripped == "TX OK - Unexpected":
selftest.notify("HOST: Unexpected output returned indicating TX still functioning")
else:
selftest.notify("HOST: Extraneous output '%s' detected indicating unknown error" % out_str_stripped)
return selftest.RESULT_FAILURE

View File

@ -1,57 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
import random
from time import time
class StdioTest(object):
PATTERN_INT_VALUE = "Your value was: (-?\d+)"
re_detect_int_value = re.compile(PATTERN_INT_VALUE)
def test(self, selftest):
test_result = True
c = selftest.mbed.serial_readline() # {{start}} preamble
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(c)
for i in range(0, 10):
random_integer = random.randint(-99999, 99999)
selftest.notify("HOST: Generated number: " + str(random_integer))
start = time()
selftest.mbed.serial_write(str(random_integer) + "\n")
serial_stdio_msg = selftest.mbed.serial_readline()
if serial_stdio_msg is None:
return selftest.RESULT_IO_SERIAL
delay_time = time() - start
selftest.notify(serial_stdio_msg.strip())
# Searching for reply with scanned values
m = self.re_detect_int_value.search(serial_stdio_msg)
if m and len(m.groups()):
int_value = m.groups()[0]
int_value_cmp = random_integer == int(int_value)
test_result = test_result and int_value_cmp
selftest.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL"))
else:
test_result = False
break
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -1,58 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
import string, random
from time import time
from mbed_settings import SERVER_ADDRESS
ECHO_PORT = 7
LEN_PACKET = 127
N_PACKETS = 5000
TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
MEGA = float(1024 * 1024)
UPDATE_STEP = (N_PACKETS // 10)
class TCP_EchoClient(object):
def __init__(self, host):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((host, ECHO_PORT))
self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
def __packet(self):
# Comment out the checks when measuring the throughput
# self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
self.s.send(self.packet)
data = self.s.recv(LEN_PACKET)
# assert self.packet == data, "packet error:\n%s\n%s\n" % (self.packet, data)
def test(self):
start = time()
for i in range(N_PACKETS):
if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.))
self.__packet()
t = time() - start
print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA))
def __del__(self):
self.s.close()
while True:
e = TCP_EchoClient(SERVER_ADDRESS)
e.test()

View File

@ -1,93 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import sys
import socket
from sys import stdout
try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
class TCPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
"""
print("HOST: Connection received...")
count = 1;
while True:
data = self.request.recv(1024)
if not data: break
self.request.sendall(data)
if '{{end}}' in str(data):
print()
print(str(data))
else:
if not count % 10:
sys.stdout.write('.')
count += 1
stdout.flush()
class TCPEchoClientTest(object):
def send_server_ip_port(self, selftest, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""
c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None:
self.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
selftest.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
selftest.mbed.serial_write(connection_str)
selftest.notify(connection_str)
# Two more strings about connection should be sent by MBED
for i in range(0, 2):
c = selftest.mbed.serial_readline()
if c is None:
selftest.print_result(self.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
def test(self, selftest):
# We need to discover SERVEP_IP and set up SERVER_PORT
# Note: Port 7 is Echo Protocol:
#
# Port number rationale:
#
# The Echo Protocol is a service in the Internet Protocol Suite defined
# in RFC 862. It was originally proposed for testing and measurement
# of round-trip times[citation needed] in IP networks.
#
# A host may connect to a server that supports the Echo Protocol using
# the Transmission Control Protocol (TCP) or the User Datagram Protocol
# (UDP) on the well-known port number 7. The server sends back an
# identical copy of the data it received.
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = 7
# Returning none will suppress host test from printing success code
server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
print("HOST: Listening for TCP connections: %s:%s" %
(SERVER_IP, str(SERVER_PORT)))
self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
server.serve_forever()

View File

@ -1,54 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
from time import time
from mbed_settings import LOCALHOST
MAX_INDEX = 126
MEGA = float(1024 * 1024)
class TCP_EchoHandler(BaseRequestHandler):
def handle(self):
print("\nconnection received")
start = time()
bytes = 0
index = 0
while True:
data = self.request.recv(1024)
if not data: break
bytes += len(data)
for n in map(ord, data):
if n != index:
print("data error %d != %d" % (n , index))
index += 1
if index > MAX_INDEX:
index = 0
self.request.sendall(data)
t = time() - start
b = float(bytes * 8) * 2
print("Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA))
server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
print("listening for connections")
server.serve_forever()

View File

@ -1,86 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import re
import sys
import uuid
import socket
from sys import stdout
class TCPEchoServerTest(object):
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0
ECHO_LOOPs = 100
s = None # Socket
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def test(self, selftest):
result = False
c = selftest.mbed.serial_readline()
if c is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(c)
m = self.re_detect_server_ip.search(c)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# We assume this test fails so can't send 'error' message to server
try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
except Exception as e:
self.s = None
selftest.notify("HOST: Socket error: %s"% e)
return selftest.RESULT_ERROR
print('HOST: Sending %d echo strings...'% self.ECHO_LOOPs,)
for i in range(0, self.ECHO_LOOPs):
TEST_STRING = str(uuid.uuid4())
try:
self.s.sendall(TEST_STRING)
data = self.s.recv(128)
except Exception as e:
self.s = None
selftest.notify("HOST: Socket error: %s"% e)
return selftest.RESULT_ERROR
received_str = repr(data)[1:-1]
if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
sys.stdout.write('.')
stdout.flush()
result = True
else:
print("Expected: ")
print("'%s'"% TEST_STRING)
print("received: ")
print("'%s'"% received_str)
result = False
break
if self.s is not None:
self.s.close()
else:
selftest.notify("HOST: TCP Server not found")
result = False
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -1,45 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
# Be sure that the tools directory is in the search path
import sys
from os.path import join, abspath, dirname
ROOT = abspath(join(dirname(__file__), "..", ".."))
sys.path.insert(0, ROOT)
from mbed_settings import LOCALHOST
try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
class TCP_EchoHandler(BaseRequestHandler):
def handle(self):
print("\nHandle connection from:", self.client_address)
while True:
data = self.request.recv(1024)
if not data: break
self.request.sendall(data)
self.request.close()
print("socket closed")
if __name__ == '__main__':
server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
print("listening for connections on:", (LOCALHOST, 7))
server.serve_forever()

View File

@ -1,155 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
How to use:
make.py -m LPC1768 -t ARM -d E:\ -n NET_14
udp_link_layer_auto.py -p COM20 -d E:\ -t 10
"""
import re
import uuid
import socket
try:
# Python 3
import _thread as thread
except ImportError:
# Python 2
import thread
from sys import stdout
from time import time, sleep
from .host_test import DefaultTest
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
# Received datagrams (with time)
dict_udp_recv_datagrams = dict()
# Sent datagrams (with time)
dict_udp_sent_datagrams = dict()
class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
"""
_data, _socket = self.request
# Process received datagram
data_str = repr(_data)[1:-1]
dict_udp_recv_datagrams[data_str] = time()
def udp_packet_recv(threadName, server_ip, server_port):
""" This function will receive packet stream from mbed device
"""
server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
print("[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port))
server.serve_forever()
class UDPEchoServerTest(DefaultTest):
ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts
ECHO_PORT = 0 # UDP port for datagram bursts
CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters
s = None # Socket
TEST_PACKET_COUNT = 1000 # how many packets should be send
TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms
PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in %
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def get_control_data(self, command="stat\n"):
BUFFER_SIZE = 256
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
except Exception as e:
data = None
s.send(command)
data = s.recv(BUFFER_SIZE)
s.close()
return data
def test(self):
serial_ip_msg = self.mbed.serial_readline()
if serial_ip_msg is None:
return self.RESULT_IO_SERIAL
stdout.write(serial_ip_msg)
stdout.flush()
# Searching for IP address and port prompted by server
m = self.re_detect_server_ip.search(serial_ip_msg)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# Open client socket to burst datagrams to UDP server in mbed
try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except Exception as e:
self.s = None
self.notify("HOST: Error: %s"% e)
return self.RESULT_ERROR
# UDP replied receiver works in background to get echoed datagrams
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = self.ECHO_PORT + 1
thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
sleep(0.5)
# Burst part
for no in range(self.TEST_PACKET_COUNT):
TEST_STRING = str(uuid.uuid4())
payload = str(no) + "__" + TEST_STRING
self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
dict_udp_sent_datagrams[payload] = time()
sleep(self.TEST_STRESS_FACTOR)
if self.s is not None:
self.s.close()
# Wait 5 seconds for packets to come
result = True
self.notify("HOST: Test Summary:")
for d in range(5):
sleep(1.0)
summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
summary_datagram_success,
len(dict_udp_recv_datagrams),
self.TEST_PACKET_COUNT,
self.TEST_STRESS_FACTOR))
result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
stdout.flush()
# Getting control data from test
self.notify("...")
self.notify("HOST: Mbed Summary:")
mbed_stats = self.get_control_data()
self.notify(mbed_stats)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
UDPEchoServerTest().run()

View File

@ -1,56 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from socket import socket, AF_INET, SOCK_DGRAM
import string, random
from time import time
from mbed_settings import CLIENT_ADDRESS
ECHO_PORT = 7
LEN_PACKET = 127
N_PACKETS = 5000
TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
MEGA = float(1024 * 1024)
UPDATE_STEP = (N_PACKETS // 10)
class UDP_EchoClient(object):
s = socket(AF_INET, SOCK_DGRAM)
def __init__(self, host):
self.host = host
self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
def __packet(self):
# Comment out the checks when measuring the throughput
# packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
UDP_EchoClient.s.sendto(packet, (self.host, ECHO_PORT))
data = UDP_EchoClient.s.recv(LEN_PACKET)
# assert packet == data, "packet error:\n%s\n%s\n" % (packet, data)
def test(self):
start = time()
for i in range(N_PACKETS):
if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.))
self.__packet()
t = time() - start
print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA))
while True:
e = UDP_EchoClient(CLIENT_ADDRESS)
e.test()

View File

@ -1,81 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import sys
import socket
from sys import stdout
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
"""
data, socket = self.request
socket.sendto(data, self.client_address)
if '{{end}}' in data:
print("\n%s" % data)
else:
sys.stdout.write('.')
stdout.flush()
class UDPEchoClientTest(object):
def send_server_ip_port(self, selftest, ip_address, port_no):
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
selftest.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
selftest.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
selftest.mbed.serial_write(connection_str)
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
return selftest.RESULT_PASSIVE
def test(self, selftest):
# We need to discover SERVEP_IP and set up SERVER_PORT
# Note: Port 7 is Echo Protocol:
#
# Port number rationale:
#
# The Echo Protocol is a service in the Internet Protocol Suite defined
# in RFC 862. It was originally proposed for testing and measurement
# of round-trip times[citation needed] in IP networks.
#
# A host may connect to a server that supports the Echo Protocol using
# the Transmission Control Protocol (TCP) or the User Datagram Protocol
# (UDP) on the well-known port number 7. The server sends back an
# identical copy of the data it received.
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = 7
# Returning none will suppress host test from printing success code
server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
print("HOST: Listening for UDP connections...")
self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
server.serve_forever()

View File

@ -1,34 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
from mbed_settings import SERVER_ADDRESS
class UDP_EchoHandler(BaseRequestHandler):
def handle(self):
data, socket = self.request
print("client:", self.client_address)
print("data:", data)
socket.sendto(data, self.client_address)
server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler)
print("listening for connections")
server.serve_forever()

View File

@ -1,70 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import re
import sys
import uuid
from sys import stdout
from socket import socket, AF_INET, SOCK_DGRAM
class UDPEchoServerTest(object):
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0
s = None # Socket
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def test(self, selftest):
result = True
serial_ip_msg = selftest.mbed.serial_readline()
if serial_ip_msg is None:
return selftest.RESULT_IO_SERIAL
selftest.notify(serial_ip_msg)
# Searching for IP address and port prompted by server
m = self.re_detect_server_ip.search(serial_ip_msg)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# We assume this test fails so can't send 'error' message to server
try:
self.s = socket(AF_INET, SOCK_DGRAM)
except Exception as e:
self.s = None
selftest.notify("HOST: Socket error: %s"% e)
return selftest.RESULT_ERROR
for i in range(0, 100):
TEST_STRING = str(uuid.uuid4())
self.s.sendto(TEST_STRING, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
data = self.s.recv(len(TEST_STRING))
received_str = repr(data)[1:-1]
if TEST_STRING != received_str:
result = False
break
sys.stdout.write('.')
stdout.flush()
else:
result = False
if self.s is not None:
self.s.close()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -1,70 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from time import time
class WaitusTest(object):
""" This test is reading single characters from stdio
and measures time between their occurrences.
"""
TICK_LOOP_COUNTER = 13
TICK_LOOP_SUCCESSFUL_COUNTS = 10
DEVIATION = 0.10 # +/-10%
def test(self, selftest):
test_result = True
# First character to start test (to know after reset when test starts)
if selftest.mbed.set_serial_timeout(None) is None:
return selftest.RESULT_IO_SERIAL
c = selftest.mbed.serial_read(1)
if c is None:
return selftest.RESULT_IO_SERIAL
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID
if selftest.mbed.serial_read(39) is None:
return selftest.RESULT_IO_SERIAL
c = selftest.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
return selftest.RESULT_IO_SERIAL
start_serial_pool = time()
start = time()
success_counter = 0
for i in range(0, self.TICK_LOOP_COUNTER):
c = selftest.mbed.serial_read(1)
if c is None:
return selftest.RESULT_IO_SERIAL
delta = time() - start
deviation = abs(delta - 1)
# Round values
delta = round(delta, 2)
deviation = round(deviation, 2)
# Check if time measurements are in given range
deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False
success_counter = success_counter+1 if deviation_ok else 0
msg = "OK" if deviation_ok else "FAIL"
selftest.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
start = time()
if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS:
break
measurement_time = time() - start_serial_pool
selftest.notify("Consecutive OK timer reads: %d"% success_counter)
selftest.notify("Completed in %.2f sec" % (measurement_time))
test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -1,46 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import uuid
import time
from sys import stdout
class WFITest(object):
def test(self, selftest):
c = selftest.mbed.serial_readline()
if c == None:
selftest.notify("HOST: No output detected")
return selftest.RESULT_IO_SERIAL
if c.strip() != "0":
selftest.notify("HOST: Unexpected output. Expected '0' but received '%s'" % c.strip())
return selftest.RESULT_FAILURE
# Wait 10 seconds to allow serial prints (indicating failure)
selftest.mbed.set_serial_timeout(10)
# If no characters received, pass the test
if not selftest.mbed.serial_readline():
selftest.notify("HOST: No further output detected")
return selftest.RESULT_SUCCESS
else:
selftest.notify("HOST: Extra output detected")
return selftest.RESULT_FAILURE

View File

@ -71,8 +71,7 @@ from tools.utils import argparse_lowercase_type
from tools.utils import argparse_many
from tools.notifier.mock import MockNotifier
from tools.notifier.term import TerminalNotifier
import tools.host_tests.host_tests_plugins as host_tests_plugins
from mbed_host_tests import host_tests_plugins
try:
import mbed_lstools
@ -2168,7 +2167,7 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name,
'toolchain_paths': TOOLCHAIN_PATHS,
'stats_depth': stats_depth,
'notify': MockNotifier(),
'coverage_patterns': coverage_patterns,
'coverage_patterns': coverage_patterns,
'resource_filter': resource_filter
}