Added NET_4, NET_6 and NET_13 tests to autodetection pool

pull/900/head
Przemek Wirkus 2015-01-30 09:57:36 +00:00
parent 605ee4a0b1
commit 035714e409
7 changed files with 81 additions and 97 deletions

View File

@ -1,8 +1,8 @@
#include "mbed.h"
#include "test_env.h"
#include "EthernetInterface.h"
struct s_ip_address
{
struct s_ip_address {
int ip_1;
int ip_2;
int ip_3;
@ -10,6 +10,11 @@ struct s_ip_address
};
int main() {
TEST_TIMEOUT(20);
TEST_HOSTTEST(tcpecho_client_auto);
TEST_DESCRIPTION(TCP echo client);
TEST_START("NET_4");
char buffer[256] = {0};
char out_buffer[] = "Hello World\n";
char out_success[] = "{{success}}\n{{end}}\n";
@ -17,20 +22,20 @@ int main() {
s_ip_address ip_addr = {0, 0, 0, 0};
int port = 0;
printf("TCPCllient waiting for server IP and port...\r\n");
printf("TCPCllient waiting for server IP and port..." NL);
scanf("%d.%d.%d.%d:%d", &ip_addr.ip_1, &ip_addr.ip_2, &ip_addr.ip_3, &ip_addr.ip_4, &port);
printf("Address received:%d.%d.%d.%d:%d\r\n", ip_addr.ip_1, ip_addr.ip_2, ip_addr.ip_3, ip_addr.ip_4, port);
printf("Address received:%d.%d.%d.%d:%d" NL, ip_addr.ip_1, ip_addr.ip_2, ip_addr.ip_3, ip_addr.ip_4, port);
EthernetInterface eth;
eth.init(); //Use DHCP
eth.connect();
printf("TCPClient IP Address is %s\r\n", eth.getIPAddress());
printf("TCPClient IP Address is %s" NL, eth.getIPAddress());
sprintf(buffer, "%d.%d.%d.%d", ip_addr.ip_1, ip_addr.ip_2, ip_addr.ip_3, ip_addr.ip_4);
TCPSocketConnection socket;
while (socket.connect(buffer, port) < 0) {
printf("TCPCllient unable to connect to %s:%d\r\n", buffer, port);
printf("TCPCllient unable to connect to %s:%d" NL, buffer, port);
wait(1);
}

View File

@ -21,8 +21,12 @@ char char_rand() {
return (rand() % ASCII_MAX) + ' ';
}
int main() {
TEST_TIMEOUT(20);
TEST_HOSTTEST(tcpecho_client_auto);
TEST_DESCRIPTION(TCP client echo loop);
TEST_START("NET_13");
char buffer[BUFFER_SIZE] = {0};
char out_buffer[BUFFER_SIZE] = {0};
s_ip_address ip_addr = {0, 0, 0, 0};
@ -70,6 +74,5 @@ int main() {
}
socket.close();
eth.disconnect();
notify_completion(result);
return 0;
TEST_RESULT(result);
}

View File

@ -11,8 +11,7 @@ namespace {
const int MAX_ECHO_LOOPS = 100;
const char ASCII_MAX = '~' - ' ';
struct s_ip_address
{
struct s_ip_address {
int ip_1;
int ip_2;
int ip_3;
@ -24,8 +23,12 @@ char char_rand() {
return (rand() % ASCII_MAX) + ' ';
}
int main() {
TEST_TIMEOUT(20);
TEST_HOSTTEST(udpecho_client_auto);
TEST_DESCRIPTION(UDP echo client);
TEST_START("NET_6");
char buffer[BUFFER_SIZE] = {0};
char out_buffer[BUFFER_SIZE] = {0};
s_ip_address ip_addr = {0, 0, 0, 0};
@ -75,6 +78,5 @@ int main() {
socket.close();
eth.disconnect();
notify_completion(result);
return 0;
TEST_RESULT(result);
}

View File

@ -26,7 +26,8 @@ from echo import EchoTest
from detect_auto import DetectPlatformTest
from tcpecho_server_auto import TCPEchoServerTest
from udpecho_server_auto import UDPEchoServerTest
from tcpecho_client_auto import TCPEchoClientTest
from udpecho_client_auto import UDPEchoClientTest
HOSTREGISTRY = HostRegistry()
HOSTREGISTRY.register_host_test("default", DefaultAuto())
@ -40,6 +41,8 @@ HOSTREGISTRY.register_host_test("echo", EchoTest())
HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest())
HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest())
HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest())
HOSTREGISTRY.register_host_test("tcpecho_client_auto", TCPEchoClientTest())
HOSTREGISTRY.register_host_test("udpecho_client_auto", UDPEchoClientTest())
###############################################################################
# Functional interface for test supervisor registry

View File

@ -18,47 +18,11 @@ limitations under the License.
import sys
import socket
from sys import stdout
from host_test import HostTestResults, Test
from SocketServer import BaseRequestHandler, TCPServer
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = 7
class TCPEchoClientTest(Test):
def __init__(self):
HostTestResults.__init__(self)
Test.__init__(self)
def send_server_ip_port(self, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""
c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
self.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial_write(connection_str)
self.notify(connection_str)
# Two more strings about connection should be sent by MBED
for i in range(0, 2):
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
def test(self):
# Returning none will suppress host test from printing success code
return None
class TCPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
@ -78,12 +42,33 @@ class TCPEchoClient_Handler(BaseRequestHandler):
count += 1
stdout.flush()
class TCPEchoClientTest():
def send_server_ip_port(self, selftest, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""
c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None:
self.print_result(selftest.RESULT_IO_SERIAL)
return
server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
selftest.notify(c.strip())
selftest.notify("HOST: Sending server IP Address to target...")
mbed_test = TCPEchoClientTest();
mbed_test.run()
mbed_test.send_server_ip_port(SERVER_IP, SERVER_PORT)
connection_str = ip_address + ":" + str(port_no) + "\n"
selftest.mbed.serial_write(connection_str)
selftest.notify(connection_str)
server.serve_forever()
# Two more strings about connection should be sent by MBED
for i in range(0, 2):
c = selftest.mbed.serial_readline()
if c is None:
selftest.print_result(self.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
def test(self, selftest):
# Returning none will suppress host test from printing success code
server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
server.serve_forever()

View File

@ -18,42 +18,11 @@ limitations under the License.
import sys
import socket
from sys import stdout
from host_test import HostTestResults, Test
from SocketServer import BaseRequestHandler, UDPServer
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = 7
class UDPEchoClientTest(Test):
def __init__(self):
HostTestResults.__init__(self)
Test.__init__(self)
def send_server_ip_port(self, ip_address, port_no):
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
self.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial_write(connection_str)
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
return self.RESULT_PASSIVE
def test(self):
# Returning none will suppress host test from printing success code
return None
class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
@ -67,12 +36,29 @@ class UDPEchoClient_Handler(BaseRequestHandler):
sys.stdout.write('.')
stdout.flush()
class UDPEchoClientTest():
server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
print "HOST: Listening for UDP connections..."
def send_server_ip_port(self, selftest, ip_address, port_no):
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
selftest.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
mbed_test = UDPEchoClientTest();
mbed_test.run()
mbed_test.send_server_ip_port(SERVER_IP, SERVER_PORT)
selftest.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
selftest.mbed.serial_write(connection_str)
server.serve_forever()
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
return selftest.RESULT_PASSIVE
def test(self, selftest):
# Returning none will suppress host test from printing success code
server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
print "HOST: Listening for UDP connections..."
self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
server.serve_forever()

View File

@ -690,7 +690,7 @@ TESTS = [
"source_dir": join(TEST_DIR, "net", "echo", "tcp_client"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"host_test": "tcpecho_client_auto",
#"host_test": "tcpecho_client_auto",
"peripherals": ["ethernet"]
},
{
@ -706,7 +706,7 @@ TESTS = [
"source_dir": join(TEST_DIR, "net", "echo", "udp_client"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"host_test" : "udpecho_client_auto",
#"host_test" : "udpecho_client_auto",
"peripherals": ["ethernet"],
},
{
@ -754,7 +754,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"duration": 15,
"host_test": "tcpecho_client_auto",
#"host_test": "tcpecho_client_auto",
"peripherals": ["ethernet"],
},
{