mirror of https://github.com/ARMmbed/mbed-os.git
Merge pull request #719 from PrzemekWirkus/host-test-improvements-part-2
Tools: Host test plugin improvements, part 2: Copy pluginspull/718/merge
commit
eadd2013c2
|
|
@ -4,13 +4,18 @@
|
|||
#include "test_env.h"
|
||||
|
||||
namespace {
|
||||
const char *HTTP_SERVER_NAME = "http://developer.mbed.org";
|
||||
// Test connection information
|
||||
const char *HTTP_SERVER_NAME = "developer.mbed.org";
|
||||
const char *HTTP_SERVER_FILE_PATH = "/media/uploads/mbed_official/hello.txt";
|
||||
const int HTTP_SERVER_PORT = 80;
|
||||
const int RECV_BUFFER_SIZE = 512;
|
||||
|
||||
// Test related data
|
||||
const char *HTTP_OK_STR = "200 OK";
|
||||
const char *HTTP_HELLO_STR = "Hello world!";
|
||||
|
||||
// Test buffers
|
||||
char buffer[RECV_BUFFER_SIZE] = {0};
|
||||
}
|
||||
|
||||
bool find_substring(const char *first, const char *last, const char *s_first, const char *s_last) {
|
||||
|
|
@ -19,39 +24,54 @@ bool find_substring(const char *first, const char *last, const char *s_first, co
|
|||
}
|
||||
|
||||
int main() {
|
||||
bool result = false;
|
||||
EthernetInterface eth;
|
||||
eth.init(); //Use DHCP
|
||||
eth.connect();
|
||||
printf("TCP client IP Address is %s\n", eth.getIPAddress());
|
||||
printf("TCP client IP Address is %s\r\n", eth.getIPAddress());
|
||||
|
||||
TCPSocketConnection sock;
|
||||
sock.connect(HTTP_SERVER_NAME, HTTP_SERVER_PORT);
|
||||
if (sock.connect(HTTP_SERVER_NAME, HTTP_SERVER_PORT) == 0) {
|
||||
printf("HTTP: Connected to %s:%d\r\n", HTTP_SERVER_NAME, HTTP_SERVER_PORT);
|
||||
|
||||
char http_cmd[] = "GET /media/uploads/mbed_official/hello.txt HTTP/1.0\n\n";
|
||||
sock.send_all(http_cmd, sizeof(http_cmd));
|
||||
// We are constructing GET command like this:
|
||||
// GET http://developer.mbed.org/media/uploads/mbed_official/hello.txt HTTP/1.0\n\n
|
||||
strcpy(buffer, "GET http://");
|
||||
strcat(buffer, HTTP_SERVER_NAME);
|
||||
strcat(buffer, HTTP_SERVER_FILE_PATH);
|
||||
strcat(buffer, " HTTP/1.0\n\n");
|
||||
// Send GET command
|
||||
sock.send_all(buffer, strlen(buffer));
|
||||
|
||||
char buffer[RECV_BUFFER_SIZE] = {0};
|
||||
bool result = true;
|
||||
while (true) {
|
||||
const int ret = sock.receive(buffer, sizeof(buffer) - 1);
|
||||
if (ret <= 0)
|
||||
break;
|
||||
buffer[ret] = '\0';
|
||||
// Server will respond with HTTP GET's success code
|
||||
bool found_200_ok = false;
|
||||
{
|
||||
const int ret = sock.receive(buffer, sizeof(buffer) - 1);
|
||||
buffer[ret] = '\0';
|
||||
// Find 200 OK HTTP status in reply
|
||||
found_200_ok = find_substring(buffer, buffer + ret, HTTP_OK_STR, HTTP_OK_STR + strlen(HTTP_OK_STR));
|
||||
printf("HTTP: Received %d chars from server\r\n", ret);
|
||||
printf("HTTP: Received 200 OK status ... %s\r\n", found_200_ok ? "[OK]" : "[FAIL]");
|
||||
printf("HTTP: Received massage:\r\n\r\n");
|
||||
printf("%s", buffer);
|
||||
}
|
||||
|
||||
// Find 200 OK HTTP status in reply
|
||||
bool found_200_ok = find_substring(buffer, buffer + ret, HTTP_OK_STR, HTTP_OK_STR + strlen(HTTP_OK_STR));
|
||||
result = result && found_200_ok;
|
||||
// Server will respond with requested file content
|
||||
bool found_hello = false;
|
||||
{
|
||||
const int ret = sock.receive(buffer, sizeof(buffer) - 1);
|
||||
buffer[ret] = '\0';
|
||||
// Find Hello World! in reply
|
||||
found_hello = find_substring(buffer, buffer + ret, HTTP_HELLO_STR, HTTP_HELLO_STR + strlen(HTTP_HELLO_STR));
|
||||
printf("HTTP: Received %d chars from server\r\n", ret);
|
||||
printf("HTTP: Received '%s' status ... %s\r\n", HTTP_HELLO_STR, found_hello ? "[OK]" : "[FAIL]");
|
||||
printf("HTTP: Received massage:\r\n\r\n");
|
||||
printf("%s", buffer);
|
||||
}
|
||||
|
||||
// Find Hello World! in reply
|
||||
bool found_hello = find_substring(buffer, buffer + ret, HTTP_HELLO_STR, HTTP_HELLO_STR + strlen(HTTP_HELLO_STR));
|
||||
result = result && found_hello;
|
||||
|
||||
// Print results
|
||||
printf("HTTP: Received %d chars from server\r\n", ret);
|
||||
printf("HTTP: Received 200 OK status ... %s\r\n", found_200_ok ? "[OK]" : "[FAIL]");
|
||||
printf("HTTP: Received '%s' status ... %s\r\n", HTTP_HELLO_STR, found_hello ? "[OK]" : "[FAIL]");
|
||||
printf("HTTP: Received massage:\r\n\r\n");
|
||||
printf("%s", buffer);
|
||||
if (found_200_ok && found_hello) {
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
|
||||
sock.close();
|
||||
|
|
|
|||
|
|
@ -24,12 +24,6 @@ NTP Client header file
|
|||
#ifndef NTPCLIENT_H_
|
||||
#define NTPCLIENT_H_
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
using std::uint8_t;
|
||||
using std::uint16_t;
|
||||
using std::uint32_t;
|
||||
|
||||
#include "UDPSocket.h"
|
||||
|
||||
#define NTP_DEFAULT_PORT 123
|
||||
|
|
@ -95,8 +89,6 @@ private:
|
|||
} __attribute__ ((packed));
|
||||
|
||||
UDPSocket m_sock;
|
||||
|
||||
};
|
||||
|
||||
|
||||
#endif /* NTPCLIENT_H_ */
|
||||
|
|
|
|||
|
|
@ -18,28 +18,25 @@ limitations under the License.
|
|||
import sys
|
||||
import uuid
|
||||
from sys import stdout
|
||||
from host_test import TestResults, Test
|
||||
from host_test import HostTestResults, Test
|
||||
|
||||
|
||||
class EchoTest(Test):
|
||||
""" This host test will use mbed serial port with
|
||||
""" This host test will use mbed serial port with
|
||||
baudrate 115200 to perform echo test on that port.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Constructors
|
||||
TestResults.__init__(self)
|
||||
HostTestResults.__init__(self)
|
||||
Test.__init__(self)
|
||||
|
||||
|
||||
# Test parameters
|
||||
self.TEST_SERIAL_BAUDRATE = 115200
|
||||
self.TEST_LOOP_COUNT = 50
|
||||
|
||||
# Initializations
|
||||
serial_init_res = self.mbed.init_serial(self.TEST_SERIAL_BAUDRATE)
|
||||
if not serial_init_res:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
self.mbed.reset()
|
||||
# Custom initialization for echo test
|
||||
self.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE)
|
||||
|
||||
def test(self):
|
||||
""" Test function, return True or False to get standard test notification on stdout
|
||||
|
|
@ -47,7 +44,6 @@ class EchoTest(Test):
|
|||
c = self.mbed.serial_readline() # '{{start}}'
|
||||
if c is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
self.notify(c.strip())
|
||||
|
||||
self.mbed.flush()
|
||||
self.notify("HOST: Starting the ECHO test")
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ class Mbed:
|
|||
|
||||
parser.add_option("-p", "--port",
|
||||
dest="port",
|
||||
help="The serial port of the target mbed (ie: COM3)",
|
||||
help="The serial port of the target mbed",
|
||||
metavar="PORT")
|
||||
|
||||
parser.add_option("-d", "--disk",
|
||||
|
|
@ -56,16 +56,27 @@ class Mbed:
|
|||
help="The target disk path",
|
||||
metavar="DISK_PATH")
|
||||
|
||||
parser.add_option("-f", "--image-path",
|
||||
dest="image_path",
|
||||
help="Path with target's image",
|
||||
metavar="IMAGE_PATH")
|
||||
|
||||
parser.add_option("-c", "--copy",
|
||||
dest="copy_method",
|
||||
help="Copy method selector",
|
||||
metavar="COPY_METHOD")
|
||||
|
||||
parser.add_option("-C", "--program_cycle_s",
|
||||
dest="program_cycle_s",
|
||||
help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target",
|
||||
type="float",
|
||||
metavar="COPY_METHOD")
|
||||
|
||||
parser.add_option("-t", "--timeout",
|
||||
dest="timeout",
|
||||
help="Timeout",
|
||||
metavar="TIMEOUT")
|
||||
|
||||
parser.add_option("-e", "--extra",
|
||||
dest="extra",
|
||||
help="Extra serial port (used by some tests)",
|
||||
metavar="EXTRA")
|
||||
|
||||
parser.add_option("-r", "--reset",
|
||||
dest="forced_reset_type",
|
||||
help="Forces different type of reset")
|
||||
|
|
@ -84,32 +95,47 @@ class Mbed:
|
|||
if self.options.port is None:
|
||||
raise Exception("The serial port of the target mbed have to be provided as command line arguments")
|
||||
|
||||
# Options related to copy / reset mbed device
|
||||
self.port = self.options.port
|
||||
self.disk = self.options.disk
|
||||
self.extra_port = self.options.extra
|
||||
self.extra_serial = None
|
||||
self.serial = None
|
||||
self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
|
||||
print 'Host test instrumentation on port: "%s" and disk: "%s"' % (self.port, self.disk)
|
||||
self.image_path = self.options.image_path.strip('"')
|
||||
self.copy_method = self.options.copy_method
|
||||
self.program_cycle_s = float(self.options.program_cycle_s)
|
||||
|
||||
def init_serial(self, baud=9600, extra_baud=9600):
|
||||
""" Initialize serial port. Function will return error is port can't be opened or initialized
|
||||
self.serial = None
|
||||
self.serial_baud = 9600
|
||||
self.serial_timeout = 1
|
||||
|
||||
self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
|
||||
print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk)
|
||||
|
||||
def init_serial_params(self, serial_baud=9600, serial_timeout=1):
|
||||
""" Initialize port parameters.
|
||||
This parameters will be used by self.init_serial() function to open serial port
|
||||
"""
|
||||
self.serial_baud = serial_baud
|
||||
self.serial_timeout = serial_timeout
|
||||
|
||||
def init_serial(self, serial_baud=None, serial_timeout=None):
|
||||
""" Initialize serial port.
|
||||
Function will return error is port can't be opened or initialized
|
||||
"""
|
||||
# Overload serial port configuration from default to parameters' values if they are specified
|
||||
serial_baud = serial_baud if serial_baud is not None else self.serial_baud
|
||||
serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout
|
||||
|
||||
result = True
|
||||
try:
|
||||
self.serial = Serial(self.port, timeout=1)
|
||||
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
|
||||
except Exception as e:
|
||||
print "MBED: %s"% str(e)
|
||||
result = False
|
||||
# Port can be opened
|
||||
if result:
|
||||
self.serial.setBaudrate(baud)
|
||||
if self.extra_port:
|
||||
self.extra_serial = Serial(self.extra_port, timeout = 1)
|
||||
self.extra_serial.setBaudrate(extra_baud)
|
||||
self.flush()
|
||||
return result
|
||||
|
||||
def serial_timeout(self, timeout):
|
||||
def set_serial_timeout(self, timeout):
|
||||
""" Wraps self.mbed.serial object timeout property
|
||||
"""
|
||||
result = None
|
||||
|
|
@ -139,7 +165,8 @@ class Mbed:
|
|||
try:
|
||||
c = self.serial.read(1)
|
||||
result += c
|
||||
except:
|
||||
except Exception as e:
|
||||
print "MBED: %s"% str(e)
|
||||
result = None
|
||||
break
|
||||
if c == '\n':
|
||||
|
|
@ -157,12 +184,6 @@ class Mbed:
|
|||
result = None
|
||||
return result
|
||||
|
||||
def touch_file(self, path):
|
||||
""" Touch file and set timestamp to items
|
||||
"""
|
||||
with open(path, 'a'):
|
||||
os.utime(path, None)
|
||||
|
||||
def reset_timeout(self, timeout):
|
||||
""" Timeout executed just after reset command is issued
|
||||
"""
|
||||
|
|
@ -176,25 +197,51 @@ class Mbed:
|
|||
# Flush serials to get only input after reset
|
||||
self.flush()
|
||||
if self.options.forced_reset_type:
|
||||
host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
|
||||
result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
|
||||
else:
|
||||
host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
|
||||
result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
|
||||
# Give time to wait for the image loading
|
||||
reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
|
||||
self.reset_timeout(reset_tout_s)
|
||||
return result
|
||||
|
||||
def copy_image(self, image_path=None, disk=None, copy_method=None):
|
||||
""" Closure for copy_image_raw() method.
|
||||
Method which is actually copying image to mbed
|
||||
"""
|
||||
# Set closure environment
|
||||
image_path = image_path if image_path is not None else self.image_path
|
||||
disk = disk if disk is not None else self.disk
|
||||
copy_method = copy_method if copy_method is not None else self.copy_method
|
||||
# Call proper copy method
|
||||
result = self.copy_image_raw(image_path, disk, copy_method)
|
||||
sleep(self.program_cycle_s)
|
||||
return result
|
||||
|
||||
def copy_image_raw(self, image_path=None, disk=None, copy_method=None):
|
||||
""" Copy file depending on method you want to use. Handles exception
|
||||
and return code from shell copy commands.
|
||||
"""
|
||||
if copy_method is not None:
|
||||
# image_path - Where is binary with target's firmware
|
||||
result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
|
||||
else:
|
||||
copy_method = 'default'
|
||||
result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
|
||||
return result;
|
||||
|
||||
def flush(self):
|
||||
""" Flush serial ports
|
||||
"""
|
||||
result = False
|
||||
if self.serial:
|
||||
self.serial.flushInput()
|
||||
self.serial.flushOutput()
|
||||
if self.extra_serial:
|
||||
self.extra_serial.flushInput()
|
||||
self.extra_serial.flushOutput()
|
||||
result = True
|
||||
return result
|
||||
|
||||
|
||||
class TestResults:
|
||||
class HostTestResults:
|
||||
""" Test results set by host tests
|
||||
"""
|
||||
def __init__(self):
|
||||
|
|
@ -202,9 +249,12 @@ class TestResults:
|
|||
self.RESULT_FAILURE = 'failure'
|
||||
self.RESULT_ERROR = 'error'
|
||||
self.RESULT_IO_SERIAL = 'ioerr_serial'
|
||||
self.RESULT_NO_IMAGE = 'no_image'
|
||||
self.RESULT_IOERR_COPY = "ioerr_copy"
|
||||
self.RESULT_PASSIVE = "passive"
|
||||
|
||||
|
||||
class Test(TestResults):
|
||||
class Test(HostTestResults):
|
||||
""" Base class for host test's test runner
|
||||
"""
|
||||
def __init__(self):
|
||||
|
|
@ -214,15 +264,38 @@ class Test(TestResults):
|
|||
""" Test runner for host test. This function will start executing
|
||||
test and forward test result via serial port to test suite
|
||||
"""
|
||||
# Copy image to device
|
||||
self.notify("HOST: Copy image onto target...")
|
||||
result = self.mbed.copy_image()
|
||||
if not result:
|
||||
self.print_result(self.RESULT_IOERR_COPY)
|
||||
|
||||
# Initialize and open target's serial port (console)
|
||||
self.notify("HOST: Initialize serial port...")
|
||||
result = self.mbed.init_serial()
|
||||
if not result:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
|
||||
# Reset device
|
||||
self.notify("HOST: Reset target...")
|
||||
result = self.mbed.reset()
|
||||
if not result:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
|
||||
# Run test
|
||||
try:
|
||||
result = self.test()
|
||||
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
|
||||
if result is not None:
|
||||
self.print_result(result)
|
||||
else:
|
||||
self.notify("HOST: Passive mode...")
|
||||
except Exception, e:
|
||||
print str(e)
|
||||
self.print_result(self.RESULT_ERROR)
|
||||
|
||||
def setup(self):
|
||||
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened
|
||||
""" Setup and check if configuration for test is
|
||||
correct. E.g. if serial port can be opened.
|
||||
"""
|
||||
result = True
|
||||
if not self.mbed.serial:
|
||||
|
|
@ -246,30 +319,27 @@ class DefaultTest(Test):
|
|||
""" Test class with serial port initialization
|
||||
"""
|
||||
def __init__(self):
|
||||
TestResults.__init__(self)
|
||||
HostTestResults.__init__(self)
|
||||
Test.__init__(self)
|
||||
serial_init_res = self.mbed.init_serial()
|
||||
if not serial_init_res:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
self.mbed.reset()
|
||||
|
||||
|
||||
class Simple(DefaultTest):
|
||||
""" Simple, basic host test's test runner waiting for serial port
|
||||
output from MUT, no supervision over test running in MUT is executed.
|
||||
Just waiting for result
|
||||
"""
|
||||
def run(self):
|
||||
def test(self):
|
||||
result = self.RESULT_SUCCESS
|
||||
try:
|
||||
while True:
|
||||
c = self.mbed.serial_read(512)
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
break
|
||||
return self.RESULT_IO_SERIAL
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
except KeyboardInterrupt, _:
|
||||
self.notify("\r\n[CTRL+C] exit")
|
||||
result = self.RESULT_ERROR
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -78,9 +78,9 @@ class HostTestPluginBase:
|
|||
try:
|
||||
ret = call(cmd, shell=shell)
|
||||
if ret:
|
||||
self.print_plugin_error("[ret=%d] Command: %s"% (self.name, self.type, ret, ' '.join(cmd)))
|
||||
except Exception, e:
|
||||
self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd))
|
||||
except Exception as e:
|
||||
result = False
|
||||
self.print_plugin_error("[ret=%d] Command: %s"% (self.name, self.type, ret, " ".join(cmd)))
|
||||
self.print_plugin_error("%s::%s: " + str(e))
|
||||
self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd))
|
||||
self.print_plugin_error(str(e))
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -32,7 +32,8 @@ class HostTestPluginCopyMethod_Mbed(HostTestPluginBase):
|
|||
try:
|
||||
copy(image_path, destination_disk)
|
||||
except Exception, e:
|
||||
self.print_plugin_error("shutil.copy(%s, %s) failed: %s"% (image_path, destination_disk, str(e)))
|
||||
self.print_plugin_error("shutil.copy('%s', '%s')"% (image_path, destination_disk))
|
||||
self.print_plugin_error("Error: %s"% str(e))
|
||||
result = False
|
||||
return result
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
"""
|
||||
|
||||
from shutil import copy
|
||||
import re
|
||||
from os.path import join
|
||||
from host_test_plugins import HostTestPluginBase
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class StdioTest(DefaultTest):
|
|||
self.mbed.serial_write(str(random_integer) + "\n")
|
||||
|
||||
serial_stdio_msg = self.mbed.serial_readline()
|
||||
if c is None:
|
||||
if serial_stdio_msg is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
delay_time = time() - start
|
||||
self.notify(serial_stdio_msg.strip())
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ limitations under the License.
|
|||
import sys
|
||||
import socket
|
||||
from sys import stdout
|
||||
from host_test import Test
|
||||
from host_test import HostTestResults, Test
|
||||
from SocketServer import BaseRequestHandler, TCPServer
|
||||
|
||||
|
||||
|
|
@ -28,15 +28,12 @@ SERVER_PORT = 7
|
|||
|
||||
class TCPEchoClientTest(Test):
|
||||
def __init__(self):
|
||||
HostTestResults.__init__(self)
|
||||
Test.__init__(self)
|
||||
self.mbed.init_serial()
|
||||
|
||||
def send_server_ip_port(self, ip_address, port_no):
|
||||
""" Set up network host. Reset target and and send server IP via serial to Mbed
|
||||
"""
|
||||
self.notify("HOST: Resetting target...")
|
||||
self.mbed.reset()
|
||||
|
||||
c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
|
|
@ -57,10 +54,15 @@ class TCPEchoClientTest(Test):
|
|||
return
|
||||
self.notify(c.strip())
|
||||
|
||||
def test(self):
|
||||
# Returning none will suppress host test from printing success code
|
||||
return None
|
||||
|
||||
|
||||
class TCPEchoClient_Handler(BaseRequestHandler):
|
||||
def handle(self):
|
||||
""" One handle per connection """
|
||||
""" One handle per connection
|
||||
"""
|
||||
print "HOST: Connection received...",
|
||||
count = 1;
|
||||
while True:
|
||||
|
|
@ -78,9 +80,10 @@ class TCPEchoClient_Handler(BaseRequestHandler):
|
|||
|
||||
|
||||
server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
|
||||
print "HOST: Listening for connections: " + SERVER_IP + ":" + str(SERVER_PORT)
|
||||
print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
|
||||
|
||||
mbed_test = TCPEchoClientTest();
|
||||
mbed_test.run()
|
||||
mbed_test.send_server_ip_port(SERVER_IP, SERVER_PORT)
|
||||
|
||||
server.serve_forever()
|
||||
|
|
|
|||
|
|
@ -32,12 +32,11 @@ class TCPEchoServerTest(DefaultTest):
|
|||
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
|
||||
|
||||
def run(self):
|
||||
def test(self):
|
||||
result = False
|
||||
c = self.mbed.serial_readline()
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
return
|
||||
return self.RESULT_IO_SERIAL
|
||||
self.notify(c)
|
||||
|
||||
m = self.re_detect_server_ip.search(c)
|
||||
|
|
@ -52,15 +51,20 @@ class TCPEchoServerTest(DefaultTest):
|
|||
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
|
||||
except Exception, e:
|
||||
self.s = None
|
||||
self.notify("HOST: Error: %s"% e)
|
||||
self.print_result(self.RESULT_ERROR)
|
||||
exit(-1)
|
||||
self.notify("HOST: Socket error: %s"% e)
|
||||
return self.RESULT_ERROR
|
||||
|
||||
print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs,
|
||||
for i in range(0, self.ECHO_LOOPs):
|
||||
TEST_STRING = str(uuid.uuid4())
|
||||
self.s.sendall(TEST_STRING)
|
||||
data = self.s.recv(128)
|
||||
try:
|
||||
self.s.sendall(TEST_STRING)
|
||||
data = self.s.recv(128)
|
||||
except Exception, e:
|
||||
self.s = None
|
||||
self.notify("HOST: Socket error: %s"% e)
|
||||
return self.RESULT_ERROR
|
||||
|
||||
received_str = repr(data)[1:-1]
|
||||
if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
|
||||
sys.stdout.write('.')
|
||||
|
|
@ -77,22 +81,10 @@ class TCPEchoServerTest(DefaultTest):
|
|||
if self.s is not None:
|
||||
self.s.close()
|
||||
else:
|
||||
print "HOST: TCP Server not found"
|
||||
self.notify("HOST: TCP Server not found")
|
||||
result = False
|
||||
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
|
||||
|
||||
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
|
||||
|
||||
# Receiving
|
||||
try:
|
||||
while True:
|
||||
c = self.mbed.serial_read(512)
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
break
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
except KeyboardInterrupt, _:
|
||||
print "\n[CTRL+c] exit"
|
||||
|
||||
if __name__ == '__main__':
|
||||
TCPEchoServerTest().run()
|
||||
|
|
|
|||
|
|
@ -57,13 +57,13 @@ def udp_packet_recv(threadName, server_ip, server_port):
|
|||
|
||||
|
||||
class UDPEchoServerTest(DefaultTest):
|
||||
ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts
|
||||
ECHO_PORT = 0 # UDP port for datagram bursts
|
||||
CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters
|
||||
s = None # Socket
|
||||
ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts
|
||||
ECHO_PORT = 0 # UDP port for datagram bursts
|
||||
CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters
|
||||
s = None # Socket
|
||||
|
||||
TEST_PACKET_COUNT = 1000 # how many packets should be send
|
||||
TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms
|
||||
TEST_PACKET_COUNT = 1000 # how many packets should be send
|
||||
TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms
|
||||
PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in %
|
||||
|
||||
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
|
|
@ -81,11 +81,10 @@ class UDPEchoServerTest(DefaultTest):
|
|||
s.close()
|
||||
return data
|
||||
|
||||
def run(self):
|
||||
def test(self):
|
||||
serial_ip_msg = self.mbed.serial_readline()
|
||||
if serial_ip_msg is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
return
|
||||
return self.RESULT_IO_SERIAL
|
||||
stdout.write(serial_ip_msg)
|
||||
stdout.flush()
|
||||
# Searching for IP address and port prompted by server
|
||||
|
|
@ -101,8 +100,7 @@ class UDPEchoServerTest(DefaultTest):
|
|||
except Exception, e:
|
||||
self.s = None
|
||||
self.notify("HOST: Error: %s"% e)
|
||||
self.print_result(self.RESULT_ERROR)
|
||||
return
|
||||
return self.RESULT_ERROR
|
||||
|
||||
# UDP replied receiver works in background to get echoed datagrams
|
||||
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
|
||||
|
|
@ -123,38 +121,25 @@ class UDPEchoServerTest(DefaultTest):
|
|||
|
||||
# Wait 5 seconds for packets to come
|
||||
result = True
|
||||
print
|
||||
print "HOST: Test Summary:"
|
||||
self.notify("HOST: Test Summary:")
|
||||
for d in range(5):
|
||||
sleep(1.0)
|
||||
summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
|
||||
self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR))
|
||||
self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
|
||||
summary_datagram_success,
|
||||
len(dict_udp_recv_datagrams),
|
||||
self.TEST_PACKET_COUNT,
|
||||
self.TEST_STRESS_FACTOR))
|
||||
result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
|
||||
stdout.flush()
|
||||
|
||||
# Getting control data from test
|
||||
print
|
||||
print "HOST: Mbed Summary:"
|
||||
self.notify("...")
|
||||
self.notify("HOST: Mbed Summary:")
|
||||
mbed_stats = self.get_control_data()
|
||||
print mbed_stats
|
||||
print
|
||||
stdout.flush()
|
||||
self.notify(mbed_stats)
|
||||
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
|
||||
|
||||
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
|
||||
|
||||
# Receiving serial data from mbed
|
||||
print
|
||||
print "HOST: Remaining mbed serial port data:"
|
||||
try:
|
||||
while True:
|
||||
c = self.mbed.serial_read(512)
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
break
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
except KeyboardInterrupt, _:
|
||||
print "\n[CTRL+c] exit"
|
||||
|
||||
if __name__ == '__main__':
|
||||
UDPEchoServerTest().run()
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ limitations under the License.
|
|||
import sys
|
||||
import socket
|
||||
from sys import stdout
|
||||
from host_test import Test
|
||||
from host_test import HostTestResults, Test
|
||||
from SocketServer import BaseRequestHandler, UDPServer
|
||||
|
||||
|
||||
|
|
@ -28,20 +28,17 @@ SERVER_PORT = 7
|
|||
|
||||
class UDPEchoClientTest(Test):
|
||||
def __init__(self):
|
||||
HostTestResults.__init__(self)
|
||||
Test.__init__(self)
|
||||
self.mbed.init_serial()
|
||||
|
||||
def send_server_ip_port(self, ip_address, port_no):
|
||||
print "HOST: Resetting target..."
|
||||
self.mbed.reset()
|
||||
|
||||
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
return
|
||||
self.notify(c.strip())
|
||||
|
||||
print "HOST: Sending server IP Address to target..."
|
||||
self.notify("HOST: Sending server IP Address to target...")
|
||||
connection_str = ip_address + ":" + str(port_no) + "\n"
|
||||
self.mbed.serial_write(connection_str)
|
||||
|
||||
|
|
@ -50,6 +47,11 @@ class UDPEchoClientTest(Test):
|
|||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
return
|
||||
self.notify(c.strip())
|
||||
return self.RESULT_PASSIVE
|
||||
|
||||
def test(self):
|
||||
# Returning none will suppress host test from printing success code
|
||||
return None
|
||||
|
||||
|
||||
class UDPEchoClient_Handler(BaseRequestHandler):
|
||||
|
|
@ -67,9 +69,10 @@ class UDPEchoClient_Handler(BaseRequestHandler):
|
|||
|
||||
|
||||
server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
|
||||
print "HOST: Listening for connections..."
|
||||
print "HOST: Listening for UDP connections..."
|
||||
|
||||
mbed_test = UDPEchoClientTest();
|
||||
mbed_test.run()
|
||||
mbed_test.send_server_ip_port(SERVER_IP, SERVER_PORT)
|
||||
|
||||
server.serve_forever()
|
||||
|
|
|
|||
|
|
@ -31,12 +31,11 @@ class UDPEchoServerTest(DefaultTest):
|
|||
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
|
||||
|
||||
def run(self):
|
||||
def test(self):
|
||||
result = True
|
||||
serial_ip_msg = self.mbed.serial_readline()
|
||||
if serial_ip_msg is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
return
|
||||
return self.RESULT_IO_SERIAL
|
||||
self.notify(serial_ip_msg)
|
||||
# Searching for IP address and port prompted by server
|
||||
m = self.re_detect_server_ip.search(serial_ip_msg)
|
||||
|
|
@ -50,9 +49,8 @@ class UDPEchoServerTest(DefaultTest):
|
|||
self.s = socket(AF_INET, SOCK_DGRAM)
|
||||
except Exception, e:
|
||||
self.s = None
|
||||
print "HOST: Error: %s" % e
|
||||
self.print_result(self.RESULT_ERROR)
|
||||
exit(-1)
|
||||
self.notify("HOST: Socket error: %s"% e)
|
||||
return self.RESULT_ERROR
|
||||
|
||||
for i in range(0, 100):
|
||||
TEST_STRING = str(uuid.uuid4())
|
||||
|
|
@ -69,20 +67,8 @@ class UDPEchoServerTest(DefaultTest):
|
|||
|
||||
if self.s is not None:
|
||||
self.s.close()
|
||||
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
|
||||
|
||||
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
|
||||
|
||||
# Receiving
|
||||
try:
|
||||
while True:
|
||||
c = self.mbed.serial_read(512)
|
||||
if c is None:
|
||||
self.print_result(self.RESULT_IO_SERIAL)
|
||||
break
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
except KeyboardInterrupt, _:
|
||||
print "\n[CTRL+c] exit"
|
||||
|
||||
if __name__ == '__main__':
|
||||
UDPEchoServerTest().run()
|
||||
|
|
|
|||
|
|
@ -23,10 +23,14 @@ class WaitusTest(DefaultTest):
|
|||
""" This test is reading single characters from stdio
|
||||
and measures time between their occurrences.
|
||||
"""
|
||||
TICK_LOOP_COUNTER = 13
|
||||
TICK_LOOP_SUCCESSFUL_COUNTS = 10
|
||||
DEVIATION = 0.10 # +/-10%
|
||||
|
||||
def test(self):
|
||||
test_result = True
|
||||
# First character to start test (to know after reset when test starts)
|
||||
if self.mbed.serial_timeout(None) is None:
|
||||
if self.mbed.set_serial_timeout(None) is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
c = self.mbed.serial_read(1)
|
||||
if c is None:
|
||||
|
|
@ -38,29 +42,32 @@ class WaitusTest(DefaultTest):
|
|||
c = self.mbed.serial_read(1) # Re-read first 'tick'
|
||||
if c is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
self.notify("Test started")
|
||||
start_serial_pool = time()
|
||||
start = time()
|
||||
for i in range(0, 10):
|
||||
|
||||
success_counter = 0
|
||||
|
||||
for i in range(0, self.TICK_LOOP_COUNTER):
|
||||
c = self.mbed.serial_read(1)
|
||||
if c is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
if i > 2: # we will ignore first few measurements
|
||||
delta = time() - start
|
||||
deviation = abs(delta - 1)
|
||||
# Round values
|
||||
delta = round(delta, 2)
|
||||
deviation = round(deviation, 2)
|
||||
# Check if time measurements are in given range
|
||||
deviation_ok = True if delta > 0 and deviation <= 0.10 else False # +/-10%
|
||||
test_result = test_result and deviation_ok
|
||||
msg = "OK" if deviation_ok else "FAIL"
|
||||
self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg))
|
||||
else:
|
||||
self.notify(". skipped")
|
||||
delta = time() - start
|
||||
deviation = abs(delta - 1)
|
||||
# Round values
|
||||
delta = round(delta, 2)
|
||||
deviation = round(deviation, 2)
|
||||
# Check if time measurements are in given range
|
||||
deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False
|
||||
success_counter = success_counter+1 if deviation_ok else 0
|
||||
msg = "OK" if deviation_ok else "FAIL"
|
||||
self.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
|
||||
start = time()
|
||||
if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS:
|
||||
break
|
||||
measurement_time = time() - start_serial_pool
|
||||
self.notify("Consecutive OK timer reads: %d"% success_counter)
|
||||
self.notify("Completed in %.2f sec" % (measurement_time))
|
||||
test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False
|
||||
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -438,7 +438,8 @@ class SingleTestRunner(object):
|
|||
inc_dirs=INC_DIRS,
|
||||
jobs=self.opts_jobs)
|
||||
except ToolException:
|
||||
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name))
|
||||
project_name_str = project_name if project_name is not None else test_id
|
||||
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name_str))
|
||||
return test_summary, self.shuffle_random_seed, test_summary_ext, test_suite_properties_ext
|
||||
if self.opts_only_build_tests:
|
||||
# With this option we are skipping testing phase
|
||||
|
|
@ -598,30 +599,6 @@ class SingleTestRunner(object):
|
|||
result = self.TEST_LOOPS_DICT[test_id]
|
||||
return result
|
||||
|
||||
def image_copy_method_selector(self, target_name, image_path, disk, copy_method,
|
||||
images_config=None, image_dest=None, verbose=False):
|
||||
""" Function copied image file and fiddles with image configuration files in needed.
|
||||
This function will select proper image configuration (modify image config file
|
||||
if needed) after image is copied.
|
||||
"""
|
||||
image_dest = image_dest if image_dest is not None else ''
|
||||
_copy_res, _err_msg, _copy_method = self.file_copy_method_selector(image_path, disk, copy_method, image_dest=image_dest, verbose=verbose)
|
||||
return _copy_res, _err_msg, _copy_method
|
||||
|
||||
def file_copy_method_selector(self, image_path, disk, copy_method, image_dest='', verbose=False):
|
||||
""" Copy file depending on method you want to use. Handles exception
|
||||
and return code from shell copy commands.
|
||||
"""
|
||||
result = False
|
||||
resutl_msg = '' # TODO: pass result_msg from plugin to test suite
|
||||
if copy_method is not None:
|
||||
# image_path - Where is binary with target's firmware
|
||||
result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
|
||||
else:
|
||||
copy_method = 'default'
|
||||
result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
|
||||
return result, resutl_msg, copy_method
|
||||
|
||||
def delete_file(self, file_path):
|
||||
""" Remove file from the system
|
||||
"""
|
||||
|
|
@ -690,7 +667,7 @@ class SingleTestRunner(object):
|
|||
# Host test execution
|
||||
start_host_exec_time = time()
|
||||
|
||||
single_test_result = self.TEST_RESULT_UNDEF # singe test run result
|
||||
single_test_result = self.TEST_RESULT_UNDEF # single test run result
|
||||
_copy_method = selected_copy_method
|
||||
|
||||
if not exists(image_path):
|
||||
|
|
@ -699,31 +676,19 @@ class SingleTestRunner(object):
|
|||
single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path)
|
||||
print single_test_output
|
||||
else:
|
||||
# Choose one method of copy files to mbed MSD drive
|
||||
_copy_res, _err_msg, _copy_method = self.image_copy_method_selector(target_name, image_path, disk, selected_copy_method,
|
||||
images_config, image_dest)
|
||||
# Host test execution
|
||||
start_host_exec_time = time()
|
||||
|
||||
if not _copy_res: # copy error to mbed MSD
|
||||
single_test_result = self.TEST_RESULT_IOERR_COPY
|
||||
single_test_output = self.logger.log_line(self.logger.LogType.ERROR, "Copy method '%s' failed. Reason: %s"% (_copy_method, _err_msg))
|
||||
print single_test_output
|
||||
else:
|
||||
# Copy Extra Files
|
||||
if not target_by_mcu.is_disk_virtual and test.extra_files:
|
||||
for f in test.extra_files:
|
||||
copy(f, disk)
|
||||
|
||||
sleep(target_by_mcu.program_cycle_s())
|
||||
# Host test execution
|
||||
start_host_exec_time = time()
|
||||
|
||||
host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose
|
||||
host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type
|
||||
single_test_result, single_test_output = self.run_host_test(test.host_test, disk, port, duration,
|
||||
micro=target_name,
|
||||
verbose=host_test_verbose,
|
||||
reset=host_test_reset,
|
||||
reset_tout=reset_tout)
|
||||
host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose
|
||||
host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type
|
||||
single_test_result, single_test_output = self.run_host_test(test.host_test,
|
||||
image_path, disk, port, duration,
|
||||
micro=target_name,
|
||||
verbose=host_test_verbose,
|
||||
reset=host_test_reset,
|
||||
reset_tout=reset_tout,
|
||||
copy_method=selected_copy_method,
|
||||
program_cycle_s=target_by_mcu.program_cycle_s())
|
||||
|
||||
# Store test result
|
||||
test_all_result.append(single_test_result)
|
||||
|
|
@ -799,7 +764,9 @@ class SingleTestRunner(object):
|
|||
result = test_all_result[0]
|
||||
return result
|
||||
|
||||
def run_host_test(self, name, disk, port, duration, micro=None, reset=None, reset_tout=None, verbose=False, extra_serial=None):
|
||||
def run_host_test(self, name, image_path, disk, port, duration,
|
||||
micro=None, reset=None, reset_tout=None,
|
||||
verbose=False, copy_method=None, program_cycle_s=None):
|
||||
""" Function creates new process with host test configured with particular test case.
|
||||
Function also is pooling for serial port activity from process to catch all data
|
||||
printed by test runner and host test during test execution
|
||||
|
|
@ -833,13 +800,19 @@ class SingleTestRunner(object):
|
|||
return result
|
||||
|
||||
# print "{%s} port:%s disk:%s" % (name, port, disk),
|
||||
cmd = ["python", "%s.py" % name, '-p', port, '-d', disk, '-t', str(duration)]
|
||||
cmd = ["python",
|
||||
'%s.py'% name,
|
||||
'-d', disk,
|
||||
'-f', '"%s"'% image_path,
|
||||
'-p', port,
|
||||
'-t', str(duration),
|
||||
'-C', str(program_cycle_s)]
|
||||
|
||||
# Add extra parameters to host_test
|
||||
if copy_method is not None:
|
||||
cmd += ["-c", copy_method]
|
||||
if micro is not None:
|
||||
cmd += ["-m", micro]
|
||||
if extra_serial is not None:
|
||||
cmd += ["-e", extra_serial]
|
||||
if reset is not None:
|
||||
cmd += ["-r", reset]
|
||||
if reset_tout is not None:
|
||||
|
|
@ -854,7 +827,7 @@ class SingleTestRunner(object):
|
|||
start_time = time()
|
||||
line = ''
|
||||
output = []
|
||||
while (time() - start_time) < duration:
|
||||
while (time() - start_time) < (2 * duration):
|
||||
c = get_char_from_queue(obs)
|
||||
|
||||
if c:
|
||||
|
|
|
|||
|
|
@ -657,7 +657,7 @@ TESTS = [
|
|||
"peripherals": ["ethernet"],
|
||||
},
|
||||
{
|
||||
"id": "NET_2", "description": "UDP client hello world",
|
||||
"id": "NET_2", "description": "NIST Internet Time Service",
|
||||
"source_dir": join(TEST_DIR, "net", "helloworld", "udpclient"),
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
|
||||
"duration": 15,
|
||||
|
|
@ -697,7 +697,7 @@ TESTS = [
|
|||
"peripherals": ["ethernet"],
|
||||
},
|
||||
{
|
||||
"id": "NET_7", "description": "HTTP client",
|
||||
"id": "NET_7", "description": "HTTP client hello world",
|
||||
"source_dir": join(TEST_DIR, "net", "protocols", "HTTPClient_HelloWorld"),
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
|
||||
"automated": True,
|
||||
|
|
|
|||
Loading…
Reference in New Issue