mirror of https://github.com/ARMmbed/mbed-os.git
Added automation tests' implementation for TCP_Server, UDP_Server and STDIO
parent
53ce32875f
commit
34e19e31e5
|
@ -5,19 +5,11 @@
|
|||
* In the past we had an issue where the stdio retargeting was not linked in.
|
||||
*/
|
||||
|
||||
int main() {
|
||||
int Value = -1;
|
||||
|
||||
printf("\r\n\r\nGCC4MBED Test Suite\r\n");
|
||||
printf("Standard I/O Unit Tests\r\n");
|
||||
|
||||
printf("Test 1: printf() test\r\n");
|
||||
|
||||
printf("Test 2: scanf() test\r\n");
|
||||
printf(" Type number and press Enter: \n");
|
||||
/*
|
||||
scanf("%d", &Value);
|
||||
printf("\n Your value was: %d\r\n", Value);
|
||||
|
||||
fprintf(stdout, "Test 3: fprintf(stdout, ...) test\r\n");
|
||||
fprintf(stderr, "Test 4: fprintf(stderr, ...) test\r\n");
|
||||
fscanf(stdin, "%d", &Value);
|
||||
fprintf(stdout, "Test 3: fprintf(stdout, ...) test\r\n");
|
||||
|
||||
fprintf(stderr, "Test 4: fprintf(stderr, ...) test\r\n");
|
||||
|
@ -28,4 +20,26 @@ int main() {
|
|||
printf("\n Your value was: %d\r\n", Value);
|
||||
|
||||
printf("Test complete\r\n");
|
||||
|
||||
*/
|
||||
|
||||
int main() {
|
||||
union {
|
||||
int value_int;
|
||||
};
|
||||
|
||||
while (true)
|
||||
{
|
||||
// SCANFm PRINTF family
|
||||
value_int = 0;
|
||||
scanf("%d", &value_int);
|
||||
printf("Your value was: %d\r\n", value_int);
|
||||
|
||||
// FSCANF, FPRINTF family
|
||||
value_int = 0;
|
||||
fscanf(stdin, "%d", &value_int);
|
||||
fprintf(stdout, "Your value was: %d\r\n", value_int);
|
||||
|
||||
//...
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,34 +1,38 @@
|
|||
#include "mbed.h"
|
||||
#include "EthernetInterface.h"
|
||||
|
||||
#define ECHO_SERVER_PORT 7
|
||||
namespace {
|
||||
const int ECHO_SERVER_PORT = 7;
|
||||
const int BUFFER_SIZE = 256;
|
||||
}
|
||||
|
||||
int main (void) {
|
||||
EthernetInterface eth;
|
||||
eth.init(); //Use DHCP
|
||||
eth.connect();
|
||||
printf("IP Address is %s\n", eth.getIPAddress());
|
||||
|
||||
printf("Server IP Address is %s:%d\n", eth.getIPAddress(), ECHO_SERVER_PORT);
|
||||
|
||||
TCPSocketServer server;
|
||||
server.bind(ECHO_SERVER_PORT);
|
||||
server.listen();
|
||||
|
||||
|
||||
while (true) {
|
||||
printf("\nWait for new connection...\n");
|
||||
printf("Wait for new connection...\n");
|
||||
TCPSocketConnection client;
|
||||
server.accept(client);
|
||||
client.set_blocking(false, 1500); // Timeout after (1.5)s
|
||||
|
||||
printf("Connection from: %s\n", client.get_address());
|
||||
char buffer[256];
|
||||
|
||||
while (true) {
|
||||
char buffer[BUFFER_SIZE] = {0};
|
||||
int n = client.receive(buffer, sizeof(buffer));
|
||||
if (n <= 0) break;
|
||||
|
||||
const int buffer_string_end_index = n >= BUFFER_SIZE ? BUFFER_SIZE-1 : n;
|
||||
buffer[buffer_string_end_index] = '\0';
|
||||
printf("Server received: %s\n", buffer);
|
||||
client.send_all(buffer, n);
|
||||
if (n <= 0) break;
|
||||
}
|
||||
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,24 +1,31 @@
|
|||
#include "mbed.h"
|
||||
#include "EthernetInterface.h"
|
||||
|
||||
#define ECHO_SERVER_PORT 7
|
||||
namespace {
|
||||
const int ECHO_SERVER_PORT = 7;
|
||||
const int BUFFER_SIZE = 256;
|
||||
}
|
||||
|
||||
int main (void) {
|
||||
EthernetInterface eth;
|
||||
eth.init(); //Use DHCP
|
||||
eth.connect();
|
||||
printf("IP Address is %s\n", eth.getIPAddress());
|
||||
printf("Server IP Address is %s:%d\n", eth.getIPAddress(), ECHO_SERVER_PORT);
|
||||
|
||||
UDPSocket server;
|
||||
server.bind(ECHO_SERVER_PORT);
|
||||
|
||||
Endpoint client;
|
||||
char buffer[256];
|
||||
char buffer[BUFFER_SIZE] = {0};
|
||||
while (true) {
|
||||
printf("\nWait for packet...\n");
|
||||
printf("Wait for packet...\n");
|
||||
int n = server.receiveFrom(client, buffer, sizeof(buffer));
|
||||
|
||||
printf("Received packet from: %s\n", client.get_address());
|
||||
server.sendTo(client, buffer, n);
|
||||
if (n > 0) {
|
||||
printf("Received packet from: %s\n", client.get_address());
|
||||
const int buffer_string_end_index = n >= BUFFER_SIZE ? BUFFER_SIZE-1 : n;
|
||||
buffer[buffer_string_end_index] = '\0';
|
||||
printf("Server received: %s\n", buffer);
|
||||
server.sendTo(client, buffer, n);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
"""
|
||||
mbed SDK
|
||||
Copyright (c) 2011-2013 ARM Limited
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import random
|
||||
import re
|
||||
from host_test import Test, DefaultTest
|
||||
from time import time
|
||||
from sys import stdout
|
||||
|
||||
class StdioTest(DefaultTest):
|
||||
|
||||
pattern_int_value = "^Your value was: (-?\d+)"
|
||||
re_detect_int_value = re.compile(pattern_int_value)
|
||||
|
||||
def print_result(self, result):
|
||||
print "\n{%s}\n{end}" % result
|
||||
|
||||
def run(self):
|
||||
|
||||
test_result = True
|
||||
|
||||
for i in range(1, 5):
|
||||
random_integer = random.randint(-10000, 10000)
|
||||
print "Generated number: " + str(random_integer)
|
||||
self.mbed.serial.write(str(random_integer) + "\n")
|
||||
serial_stdio_msg = ""
|
||||
|
||||
ip_msg_timeout = self.mbed.options.timeout
|
||||
start_serial_pool = time();
|
||||
while (time() - start_serial_pool) < ip_msg_timeout:
|
||||
c = self.mbed.serial.read(512)
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
serial_stdio_msg += c
|
||||
# Searching for reply with scanned values
|
||||
m = self.re_detect_int_value.search(serial_stdio_msg)
|
||||
if m and len(m.groups()):
|
||||
duration = time() - start_serial_pool
|
||||
print "Number: " + str(m.groups()[0])
|
||||
test_result = test_result and (random_integer == int(m.groups()[0]))
|
||||
stdout.flush()
|
||||
break
|
||||
else:
|
||||
print "Error: No IP and port information sent from server"
|
||||
self.print_result('error')
|
||||
exit(-2)
|
||||
|
||||
if test_result: # All numbers are the same
|
||||
self.print_result('success')
|
||||
else:
|
||||
self.print_result('failure')
|
||||
stdout.flush()
|
||||
|
||||
if __name__ == '__main__':
|
||||
StdioTest().run()
|
|
@ -0,0 +1,90 @@
|
|||
"""
|
||||
mbed SDK
|
||||
Copyright (c) 2011-2013 ARM Limited
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import socket
|
||||
import re
|
||||
from host_test import Test, DefaultTest
|
||||
from time import time
|
||||
from sys import stdout
|
||||
|
||||
class TCPEchoServerTest(DefaultTest):
|
||||
ECHO_SERVER_ADDRESS = ""
|
||||
ECHO_PORT = 0
|
||||
s = None # Socket
|
||||
|
||||
pattern_server_ip = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(pattern_server_ip)
|
||||
|
||||
def print_result(self, result):
|
||||
print "\n{%s}\n{end}" % result
|
||||
|
||||
def run(self):
|
||||
ip_msg_timeout = self.mbed.options.timeout
|
||||
serial_ip_msg = ""
|
||||
start_serial_pool = time();
|
||||
while (time() - start_serial_pool) < ip_msg_timeout:
|
||||
c = self.mbed.serial.read(512)
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
serial_ip_msg += c
|
||||
# Searching for IP address and port prompted by server
|
||||
m = self.re_detect_server_ip.search(serial_ip_msg)
|
||||
if m and len(m.groups()):
|
||||
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
|
||||
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
|
||||
duration = time() - start_serial_pool
|
||||
print "TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) + " after " + "%.2f" % duration + " sec"
|
||||
stdout.flush()
|
||||
break
|
||||
else:
|
||||
print "Error: No IP and port information sent from server"
|
||||
self.print_result('error')
|
||||
exit(-2)
|
||||
|
||||
# We assume this test fails so can't send 'error' message to server
|
||||
try:
|
||||
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
|
||||
except Exception, e:
|
||||
print "Error: %s" % e
|
||||
self.print_result('error')
|
||||
exit(-1)
|
||||
|
||||
TEST_STRING = 'Hello, world !!!'
|
||||
self.s.sendall(TEST_STRING)
|
||||
|
||||
data = self.s.recv(1024)
|
||||
received_str = repr(data)[1:-1]
|
||||
|
||||
if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
|
||||
print 'Received data: ' + received_str
|
||||
self.print_result('success')
|
||||
else:
|
||||
self.print_result('failure')
|
||||
self.s.close()
|
||||
|
||||
# Receiving
|
||||
try:
|
||||
while True:
|
||||
c = self.mbed.serial.read(512)
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
except KeyboardInterrupt, _:
|
||||
print "\n[CTRL+c] exit"
|
||||
|
||||
if __name__ == '__main__':
|
||||
TCPEchoServerTest().run()
|
|
@ -0,0 +1,89 @@
|
|||
"""
|
||||
mbed SDK
|
||||
Copyright (c) 2011-2013 ARM Limited
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
from socket import socket, AF_INET, SOCK_DGRAM
|
||||
import re
|
||||
from host_test import Test, DefaultTest
|
||||
from time import time
|
||||
from sys import stdout
|
||||
|
||||
class UDPEchoServerTest(DefaultTest):
|
||||
ECHO_SERVER_ADDRESS = ""
|
||||
ECHO_PORT = 0
|
||||
s = None # Socket
|
||||
|
||||
pattern_server_ip = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(pattern_server_ip)
|
||||
|
||||
def print_result(self, result):
|
||||
print "\n{%s}\n{end}" % result
|
||||
|
||||
def run(self):
|
||||
ip_msg_timeout = self.mbed.options.timeout
|
||||
serial_ip_msg = ""
|
||||
start_serial_pool = time();
|
||||
while (time() - start_serial_pool) < ip_msg_timeout:
|
||||
c = self.mbed.serial.read(512)
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
serial_ip_msg += c
|
||||
# Searching for IP address and port prompted by server
|
||||
m = self.re_detect_server_ip.search(serial_ip_msg)
|
||||
if m and len(m.groups()):
|
||||
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
|
||||
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
|
||||
duration = time() - start_serial_pool
|
||||
print "UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) + " after " + "%.2f" % duration + " sec"
|
||||
stdout.flush()
|
||||
break
|
||||
else:
|
||||
print "Error: No IP and port information sent from server"
|
||||
self.print_result('error')
|
||||
exit(-2)
|
||||
|
||||
# We assume this test fails so can't send 'error' message to server
|
||||
try:
|
||||
self.s = socket(AF_INET, SOCK_DGRAM)
|
||||
except Exception, e:
|
||||
print "Error: %s" % e
|
||||
self.print_result('error')
|
||||
exit(-1)
|
||||
|
||||
TEST_STRING = 'Hello, world !!!'
|
||||
self.s.sendto(TEST_STRING, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
|
||||
|
||||
data = self.s.recv(len(TEST_STRING))
|
||||
received_str = repr(data)[1:-1]
|
||||
|
||||
if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
|
||||
print 'Received data: ' + received_str
|
||||
self.print_result('success')
|
||||
else:
|
||||
self.print_result('failure')
|
||||
self.s.close()
|
||||
|
||||
# Receiving
|
||||
try:
|
||||
while True:
|
||||
c = self.mbed.serial.read(512)
|
||||
stdout.write(c)
|
||||
stdout.flush()
|
||||
except KeyboardInterrupt, _:
|
||||
print "\n[CTRL+c] exit"
|
||||
|
||||
if __name__ == '__main__':
|
||||
UDPEchoServerTest().run()
|
|
@ -249,7 +249,6 @@ class SingleTestRunner(object):
|
|||
return (test_result, target_name, toolchain_name,
|
||||
test_id, test_description, round(elapsed_time, 2), duration)
|
||||
|
||||
|
||||
def run_host_test(self, name, disk, port, duration, extra_serial=""):
|
||||
# print "{%s} port:%s disk:%s" % (name, port, disk),
|
||||
cmd = ["python", "%s.py" % name, '-p', port, '-d', disk, '-t', str(duration), "-e", extra_serial]
|
||||
|
|
Loading…
Reference in New Issue