mirror of https://github.com/ARMmbed/mbed-os.git
Refactored TCP echo server
parent
9fc02738ac
commit
111b55784e
|
@ -1,4 +1,5 @@
|
|||
#include "mbed.h"
|
||||
#include "test_env.h"
|
||||
#include "EthernetInterface.h"
|
||||
|
||||
namespace {
|
||||
|
@ -7,22 +8,27 @@ namespace {
|
|||
}
|
||||
|
||||
int main (void) {
|
||||
TEST_TIMEOUT(20);
|
||||
TEST_HOSTTEST(tcpecho_server_auto);
|
||||
TEST_DESCRIPTION(TCP echo server);
|
||||
TEST_START("NET_3");
|
||||
|
||||
char buffer[BUFFER_SIZE] = {0};
|
||||
EthernetInterface eth;
|
||||
eth.init(); //Use DHCP
|
||||
eth.connect();
|
||||
printf("MBED: Server IP Address is %s:%d\r\n", eth.getIPAddress(), ECHO_SERVER_PORT);
|
||||
printf("MBED: Server IP Address is %s:%d" NL, eth.getIPAddress(), ECHO_SERVER_PORT);
|
||||
|
||||
TCPSocketServer server;
|
||||
server.bind(ECHO_SERVER_PORT);
|
||||
server.listen();
|
||||
|
||||
while (true) {
|
||||
printf("MBED: Wait for new connection...\n");
|
||||
printf("MBED: Wait for new connection..." NL);
|
||||
TCPSocketConnection client;
|
||||
server.accept(client);
|
||||
client.set_blocking(false, 1500); // Timeout after (1.5)s
|
||||
printf("MBED: Connection from: %s\r\n", client.get_address());
|
||||
printf("MBED: Connection from: %s" NL, client.get_address());
|
||||
|
||||
while (true) {
|
||||
const int n = client.receive(buffer, sizeof(buffer));
|
||||
|
|
|
@ -24,6 +24,7 @@ from dev_null_auto import DevNullTest
|
|||
from rtc_auto import RTCTest
|
||||
from echo import EchoTest
|
||||
from detect_auto import DetectPlatformTest
|
||||
from tcpecho_server_auto import TCPEchoServerTest
|
||||
|
||||
|
||||
HOSTREGISTRY = HostRegistry()
|
||||
|
@ -36,6 +37,7 @@ HOSTREGISTRY.register_host_test("dev_null_auto", DevNullTest())
|
|||
HOSTREGISTRY.register_host_test("rtc_auto", RTCTest())
|
||||
HOSTREGISTRY.register_host_test("echo", EchoTest())
|
||||
HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest())
|
||||
HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest())
|
||||
|
||||
###############################################################################
|
||||
# Functional interface for test supervisor registry
|
||||
|
|
|
@ -20,10 +20,8 @@ import sys
|
|||
import uuid
|
||||
import socket
|
||||
from sys import stdout
|
||||
from host_test import DefaultTest
|
||||
|
||||
|
||||
class TCPEchoServerTest(DefaultTest):
|
||||
class TCPEchoServerTest():
|
||||
ECHO_SERVER_ADDRESS = ""
|
||||
ECHO_PORT = 0
|
||||
ECHO_LOOPs = 100
|
||||
|
@ -32,18 +30,18 @@ class TCPEchoServerTest(DefaultTest):
|
|||
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
|
||||
|
||||
def test(self):
|
||||
def test(self, selftest):
|
||||
result = False
|
||||
c = self.mbed.serial_readline()
|
||||
c = selftest.mbed.serial_readline()
|
||||
if c is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
self.notify(c)
|
||||
return selftest.RESULT_IO_SERIAL
|
||||
selftest.notify(c)
|
||||
|
||||
m = self.re_detect_server_ip.search(c)
|
||||
if m and len(m.groups()):
|
||||
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
|
||||
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
|
||||
self.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
|
||||
selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
|
||||
|
||||
# We assume this test fails so can't send 'error' message to server
|
||||
try:
|
||||
|
@ -51,8 +49,8 @@ class TCPEchoServerTest(DefaultTest):
|
|||
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
|
||||
except Exception, e:
|
||||
self.s = None
|
||||
self.notify("HOST: Socket error: %s"% e)
|
||||
return self.RESULT_ERROR
|
||||
selftest.notify("HOST: Socket error: %s"% e)
|
||||
return selftest.RESULT_ERROR
|
||||
|
||||
print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs,
|
||||
for i in range(0, self.ECHO_LOOPs):
|
||||
|
@ -62,8 +60,8 @@ class TCPEchoServerTest(DefaultTest):
|
|||
data = self.s.recv(128)
|
||||
except Exception, e:
|
||||
self.s = None
|
||||
self.notify("HOST: Socket error: %s"% e)
|
||||
return self.RESULT_ERROR
|
||||
selftest.notify("HOST: Socket error: %s"% e)
|
||||
return selftest.RESULT_ERROR
|
||||
|
||||
received_str = repr(data)[1:-1]
|
||||
if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
|
||||
|
@ -81,10 +79,6 @@ class TCPEchoServerTest(DefaultTest):
|
|||
if self.s is not None:
|
||||
self.s.close()
|
||||
else:
|
||||
self.notify("HOST: TCP Server not found")
|
||||
selftest.notify("HOST: TCP Server not found")
|
||||
result = False
|
||||
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TCPEchoServerTest().run()
|
||||
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
|
||||
|
|
|
@ -680,9 +680,9 @@ TESTS = [
|
|||
{
|
||||
"id": "NET_3", "description": "TCP echo server",
|
||||
"source_dir": join(TEST_DIR, "net", "echo", "tcp_server"),
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY],
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
|
||||
"automated": True,
|
||||
"host_test" : "tcpecho_server_auto",
|
||||
#"host_test" : "tcpecho_server_auto",
|
||||
"peripherals": ["ethernet"],
|
||||
},
|
||||
{
|
||||
|
@ -696,7 +696,7 @@ TESTS = [
|
|||
{
|
||||
"id": "NET_5", "description": "UDP echo server",
|
||||
"source_dir": join(TEST_DIR, "net", "echo", "udp_server"),
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY],
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
|
||||
"automated": True,
|
||||
"host_test" : "udpecho_server_auto",
|
||||
"peripherals": ["ethernet"]
|
||||
|
|
Loading…
Reference in New Issue