mirror of https://github.com/ARMmbed/mbed-os.git
Refactored UDP echo server
parent
111b55784e
commit
f81509c933
|
@ -1,4 +1,5 @@
|
|||
#include "mbed.h"
|
||||
#include "test_env.h"
|
||||
#include "EthernetInterface.h"
|
||||
|
||||
namespace {
|
||||
|
@ -7,6 +8,11 @@ namespace {
|
|||
}
|
||||
|
||||
int main (void) {
|
||||
TEST_TIMEOUT(20);
|
||||
TEST_HOSTTEST(udpecho_server_auto);
|
||||
TEST_DESCRIPTION(UDP echo server);
|
||||
TEST_START("NET_5");
|
||||
|
||||
EthernetInterface eth;
|
||||
eth.init(); //Use DHCP
|
||||
eth.connect();
|
||||
|
|
|
@ -25,6 +25,7 @@ from rtc_auto import RTCTest
|
|||
from echo import EchoTest
|
||||
from detect_auto import DetectPlatformTest
|
||||
from tcpecho_server_auto import TCPEchoServerTest
|
||||
from udpecho_server_auto import UDPEchoServerTest
|
||||
|
||||
|
||||
HOSTREGISTRY = HostRegistry()
|
||||
|
@ -38,6 +39,7 @@ HOSTREGISTRY.register_host_test("rtc_auto", RTCTest())
|
|||
HOSTREGISTRY.register_host_test("echo", EchoTest())
|
||||
HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest())
|
||||
HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest())
|
||||
HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest())
|
||||
|
||||
###############################################################################
|
||||
# Functional interface for test supervisor registry
|
||||
|
|
|
@ -19,11 +19,9 @@ import re
|
|||
import sys
|
||||
import uuid
|
||||
from sys import stdout
|
||||
from host_test import DefaultTest
|
||||
from socket import socket, AF_INET, SOCK_DGRAM
|
||||
|
||||
|
||||
class UDPEchoServerTest(DefaultTest):
|
||||
class UDPEchoServerTest():
|
||||
ECHO_SERVER_ADDRESS = ""
|
||||
ECHO_PORT = 0
|
||||
s = None # Socket
|
||||
|
@ -31,26 +29,26 @@ class UDPEchoServerTest(DefaultTest):
|
|||
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
|
||||
|
||||
def test(self):
|
||||
def test(self, selftest):
|
||||
result = True
|
||||
serial_ip_msg = self.mbed.serial_readline()
|
||||
serial_ip_msg = selftest.mbed.serial_readline()
|
||||
if serial_ip_msg is None:
|
||||
return self.RESULT_IO_SERIAL
|
||||
self.notify(serial_ip_msg)
|
||||
return selftest.RESULT_IO_SERIAL
|
||||
selftest.notify(serial_ip_msg)
|
||||
# Searching for IP address and port prompted by server
|
||||
m = self.re_detect_server_ip.search(serial_ip_msg)
|
||||
if m and len(m.groups()):
|
||||
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
|
||||
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
|
||||
self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
|
||||
selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
|
||||
|
||||
# We assume this test fails so can't send 'error' message to server
|
||||
try:
|
||||
self.s = socket(AF_INET, SOCK_DGRAM)
|
||||
except Exception, e:
|
||||
self.s = None
|
||||
self.notify("HOST: Socket error: %s"% e)
|
||||
return self.RESULT_ERROR
|
||||
selftest.notify("HOST: Socket error: %s"% e)
|
||||
return selftest.RESULT_ERROR
|
||||
|
||||
for i in range(0, 100):
|
||||
TEST_STRING = str(uuid.uuid4())
|
||||
|
@ -67,8 +65,4 @@ class UDPEchoServerTest(DefaultTest):
|
|||
|
||||
if self.s is not None:
|
||||
self.s.close()
|
||||
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
UDPEchoServerTest().run()
|
||||
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
|
||||
|
|
|
@ -698,7 +698,7 @@ TESTS = [
|
|||
"source_dir": join(TEST_DIR, "net", "echo", "udp_server"),
|
||||
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
|
||||
"automated": True,
|
||||
"host_test" : "udpecho_server_auto",
|
||||
#"host_test" : "udpecho_server_auto",
|
||||
"peripherals": ["ethernet"]
|
||||
},
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue