mirror of https://github.com/ARMmbed/mbed-os.git
pyFlakes issues fixed. Simple refactoring
parent
3ce8116fb2
commit
7a5d00bd65
|
@ -23,10 +23,9 @@ udp_link_layer_auto.py -p COM20 -d E:\ -t 10
|
|||
|
||||
import thread
|
||||
from SocketServer import BaseRequestHandler, UDPServer
|
||||
# from socket import socket, AF_INET, SOCK_DGRAM
|
||||
import socket
|
||||
import re
|
||||
from host_test import Test, DefaultTest
|
||||
from host_test import DefaultTest
|
||||
from time import time, sleep
|
||||
from sys import stdout
|
||||
|
||||
|
@ -41,13 +40,10 @@ dict_udp_sent_datagrams = dict()
|
|||
class UDPEchoClient_Handler(BaseRequestHandler):
|
||||
def handle(self):
|
||||
""" One handle per connection """
|
||||
data, socket = self.request
|
||||
_data, _socket = self.request
|
||||
# Process received datagram
|
||||
data_str = repr(data)[1:-1]
|
||||
data_str = repr(_data)[1:-1]
|
||||
dict_udp_recv_datagrams[data_str] = time()
|
||||
# print "[UDP_COUNTER] [" + data_str + "]"
|
||||
# print ".",
|
||||
|
||||
|
||||
|
||||
def udp_packet_recv(threadName, server_ip, server_port):
|
||||
|
@ -61,16 +57,13 @@ class UDPEchoServerTest(DefaultTest):
|
|||
ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts
|
||||
ECHO_PORT = 0 # UDP port for datagram bursts
|
||||
CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters
|
||||
s = None # Socket
|
||||
s = None # Socket
|
||||
|
||||
TEST_PACKET_COUNT = 1000 # how many packets should be send
|
||||
TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms
|
||||
|
||||
pattern_server_ip = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(pattern_server_ip)
|
||||
|
||||
def print_result(self, result):
|
||||
print "\n{%s}\n{end}" % result
|
||||
PATTERN_SERVER_IP = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
|
||||
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
|
||||
|
||||
def get_control_data(self, command="stat\n"):
|
||||
BUFFER_SIZE = 256
|
||||
|
@ -84,7 +77,7 @@ class UDPEchoServerTest(DefaultTest):
|
|||
def run(self):
|
||||
ip_msg_timeout = self.mbed.options.timeout
|
||||
serial_ip_msg = ""
|
||||
start_serial_pool = time();
|
||||
start_serial_pool = time()
|
||||
while (time() - start_serial_pool) < ip_msg_timeout:
|
||||
c = self.mbed.serial.read(512)
|
||||
stdout.write(c)
|
||||
|
@ -115,7 +108,7 @@ class UDPEchoServerTest(DefaultTest):
|
|||
# UDP replied receiver works in background to get echoed datagrams
|
||||
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
|
||||
SERVER_PORT = self.ECHO_PORT + 1
|
||||
thread.start_new_thread( udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT) )
|
||||
thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
|
||||
sleep(0.5)
|
||||
|
||||
# Burst part
|
||||
|
|
Loading…
Reference in New Issue