Merge pull request #11921 from madchutney/tools/py3-fixes

Updates to tools for Python 3 compatibility
pull/12019/head
Martin Kojtal 2019-11-27 16:29:50 +01:00 committed by GitHub
commit 57f9a1eea8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 144 additions and 123 deletions

View File

@ -115,11 +115,11 @@ def wrap_init(func):
# Get the currently in-use API key (may come from environment or # Get the currently in-use API key (may come from environment or
# configuration files, which is handled by the cloud SDK) # configuration files, which is handled by the cloud SDK)
api_key_value = accounts.config.get("api_key") api_key_value = accounts.config.get("api_key")
api_key = accounts.list_api_keys( api_key = next(accounts.list_api_keys(
filter={ filter={
"key": api_key_value "key": api_key_value
} }
).next() ))
certificates_owned = list(certs.list_certificates()) certificates_owned = list(certs.list_certificates())
dev_cert_info = None dev_cert_info = None
for certif in certificates_owned: for certif in certificates_owned:

View File

@ -28,6 +28,7 @@ import copy
from tools.targets import TARGET_MAP from tools.targets import TARGET_MAP
from tools.utils import mkdir from tools.utils import mkdir
from tools.resources import FileType, FileRef from tools.resources import FileType, FileRef
from future.utils import with_metaclass
"""Just a template for subclassing""" """Just a template for subclassing"""
@ -57,14 +58,13 @@ def deprecated_exporter(CLS):
CLS.NAME = "%s (DEPRECATED)" % old_name CLS.NAME = "%s (DEPRECATED)" % old_name
return CLS return CLS
class Exporter(object): class Exporter(with_metaclass(ABCMeta, object)):
"""Exporter base class """Exporter base class
This class is meant to be extended by individual exporters, and provides a This class is meant to be extended by individual exporters, and provides a
few helper methods for implementing an exporter with either jinja2 or few helper methods for implementing an exporter with either jinja2 or
progen. progen.
""" """
__metaclass__ = ABCMeta
TEMPLATE_DIR = dirname(__file__) TEMPLATE_DIR = dirname(__file__)
DOT_IN_RELATIVE_PATH = False DOT_IN_RELATIVE_PATH = False
NAME = None NAME = None

View File

@ -134,7 +134,7 @@ class PackFlashAlgo(object):
blob = self.algo_data[:] blob = self.algo_data[:]
pad_size = 0 if len(blob) % 4 == 0 else 4 - len(blob) % 4 pad_size = 0 if len(blob) % 4 == 0 else 4 - len(blob) % 4
blob = blob + "\x00" * pad_size blob = blob + "\x00" * pad_size
integer_list = struct.unpack("<" + "L" * (len(blob) / 4), blob) integer_list = struct.unpack("<" + "L" * (len(blob) // 4), blob)
line_list = [] line_list = []
for pos in range(0, len(integer_list), group_size): for pos in range(0, len(integer_list), group_size):
group = ["0x%08x" % value for value in group = ["0x%08x" % value for value in

View File

@ -18,7 +18,7 @@ from __future__ import print_function
from sys import stdout from sys import stdout
class DefaultAuto(): class DefaultAuto(object):
""" Simple, basic host test's test runner waiting for serial port """ Simple, basic host test's test runner waiting for serial port
output from MUT, no supervision over test running in MUT is executed. output from MUT, no supervision over test running in MUT is executed.
""" """

View File

@ -17,7 +17,7 @@ limitations under the License.
import re import re
class DetectPlatformTest(): class DetectPlatformTest(object):
PATTERN_MICRO_NAME = "Target '(\w+)'" PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)

View File

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
class DevNullTest(): class DevNullTest(object):
def check_readline(self, selftest, text): def check_readline(self, selftest, text):
""" Reads line from serial port and checks if text was part of read string """ Reads line from serial port and checks if text was part of read string

View File

@ -19,7 +19,7 @@ import sys
import uuid import uuid
from sys import stdout from sys import stdout
class EchoTest(): class EchoTest(object):
# Test parameters # Test parameters
TEST_SERIAL_BAUDRATE = 115200 TEST_SERIAL_BAUDRATE = 115200

View File

@ -14,7 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from host_test import Test from .host_test import Test
class EchoTest(Test): class EchoTest(Test):

View File

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
class HelloTest(): class HelloTest(object):
HELLO_WORLD = "Hello World" HELLO_WORLD = "Hello World"
def test(self, selftest): def test(self, selftest):

View File

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
class HostRegistry: class HostRegistry(object):
""" Class stores registry with host tests and objects representing them """ Class stores registry with host tests and objects representing them
""" """
HOST_TESTS = {} # host_test_name -> host_test_ojbect HOST_TESTS = {} # host_test_name -> host_test_ojbect

View File

@ -18,8 +18,8 @@ limitations under the License.
# Check if 'serial' module is installed # Check if 'serial' module is installed
try: try:
from serial import Serial from serial import Serial
except ImportError, e: except ImportError as e:
print "Error: Can't import 'serial' module: %s"% e print("Error: Can't import 'serial' module: %s"% e)
exit(-1) exit(-1)
import os import os
@ -29,7 +29,7 @@ from sys import stdout
from time import sleep, time from time import sleep, time
from optparse import OptionParser from optparse import OptionParser
import host_tests_plugins from . import host_tests_plugins
# This is a little tricky. We need to add upper directory to path so # This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do # we can find packages we want from the same level as other files do
@ -39,7 +39,7 @@ from tools.test_api import get_autodetected_MUTS_list
from tools.test_api import get_module_avail from tools.test_api import get_module_avail
class Mbed: class Mbed(object):
""" Base class for a host driven test """ Base class for a host driven test
""" """
def __init__(self): def __init__(self):
@ -117,7 +117,7 @@ class Mbed:
self.serial_timeout = 1 self.serial_timeout = 1
self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk) print('MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk))
def init_serial_params(self, serial_baud=9600, serial_timeout=1): def init_serial_params(self, serial_baud=9600, serial_timeout=1):
""" Initialize port parameters. """ Initialize port parameters.
@ -183,11 +183,11 @@ class Mbed:
stdout.write('.') stdout.write('.')
stdout.flush() stdout.flush()
else: else:
print "...port ready!" print("...port ready!")
result = True result = True
break break
if not result and last_error: if not result and last_error:
print last_error print(last_error)
return result return result
def set_serial_timeout(self, timeout): def set_serial_timeout(self, timeout):
@ -221,7 +221,7 @@ class Mbed:
c = self.serial.read(1) c = self.serial.read(1)
result += c result += c
except Exception as e: except Exception as e:
print "MBED: %s"% str(e) print("MBED: %s"% str(e))
result = None result = None
break break
if c == '\n': if c == '\n':
@ -298,7 +298,7 @@ class Mbed:
return result return result
class HostTestResults: class HostTestResults(object):
""" Test results set by host tests """ Test results set by host tests
""" """
def __init__(self): def __init__(self):
@ -389,8 +389,8 @@ class Test(HostTestResults):
self.print_result(result) self.print_result(result)
else: else:
self.notify("HOST: Passive mode...") self.notify("HOST: Passive mode...")
except Exception, e: except Exception as e:
print str(e) print(str(e))
self.print_result(self.RESULT_ERROR) self.print_result(self.RESULT_ERROR)
def setup(self): def setup(self):
@ -406,7 +406,7 @@ class Test(HostTestResults):
def notify(self, message): def notify(self, message):
""" On screen notification function """ On screen notification function
""" """
print message print(message)
stdout.flush() stdout.flush()
def print_result(self, result): def print_result(self, result):

View File

@ -27,10 +27,12 @@
# >myled.write(1) # >myled.write(1)
# > # >
import serial, urllib2, time from future import standard_library
standard_library.install_aliases()
import serial, urllib.request, time
# mbed super class # mbed super class
class mbed: class mbed(object):
def __init__(self): def __init__(self):
print("This will work as a demo but no transport mechanism has been selected") print("This will work as a demo but no transport mechanism has been selected")
@ -48,7 +50,7 @@ class SerialRPC(mbed):
# creates the command to be sent serially - /name/method arg1 arg2 arg3 ... argN # creates the command to be sent serially - /name/method arg1 arg2 arg3 ... argN
str = "/" + name + "/" + method + " " + " ".join(args) + "\n" str = "/" + name + "/" + method + " " + " ".join(args) + "\n"
# prints the command being executed # prints the command being executed
print str print(str)
# writes the command to serial # writes the command to serial
self.ser.write(str) self.ser.write(str)
# strips trailing characters from the line just written # strips trailing characters from the line just written
@ -61,12 +63,12 @@ class HTTPRPC(mbed):
self.host = "http://" + ip self.host = "http://" + ip
def rpc(self, name, method, args): def rpc(self, name, method, args):
response = urllib2.urlopen(self.host + "/rpc/" + name + "/" + method + "%20" + "%20".join(args)) response = urllib.request.urlopen(self.host + "/rpc/" + name + "/" + method + "%20" + "%20".join(args))
return response.read().strip() return response.read().strip()
# generic mbed interface super class # generic mbed interface super class
class mbed_interface(): class mbed_interface(object):
# initialize an mbed interface with a transport mechanism and pin name # initialize an mbed interface with a transport mechanism and pin name
def __init__(self, this_mbed, mpin): def __init__(self, this_mbed, mpin):
self.mbed = this_mbed self.mbed = this_mbed
@ -198,7 +200,7 @@ class Timer(mbed_interface):
return float(re.search('\d+\.*\d*', r).group(0)) return float(re.search('\d+\.*\d*', r).group(0))
# Serial # Serial
class Serial(): class Serial(object):
def __init__(self, this_mbed, tx, rx=""): def __init__(self, this_mbed, tx, rx=""):
self.mbed = this_mbed self.mbed = this_mbed
if isinstance(tx, str): if isinstance(tx, str):

View File

@ -14,7 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from host_test import Test, Simple from .host_test import Test, Simple
from sys import stdout from sys import stdout
class NETTest(Simple): class NETTest(Simple):

View File

@ -14,8 +14,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from host_test import Test from .host_test import Test
from mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin from .mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin
class RpcTest(Test): class RpcTest(Test):
@ -30,7 +30,7 @@ class RpcTest(Test):
if hasattr(self.mbed.options, 'micro'): if hasattr(self.mbed.options, 'micro'):
if self.mbed.options.micro == 'M0+': if self.mbed.options.micro == 'M0+':
print "Freedom Board: PTA12 <-> PTC4" print("Freedom Board: PTA12 <-> PTC4")
p_out = pin("PTA12") p_out = pin("PTA12")
p_in = pin("PTC4") p_in = pin("PTC4")

View File

@ -18,7 +18,7 @@ limitations under the License.
import re import re
from time import time, strftime, gmtime from time import time, strftime, gmtime
class RTCTest(): class RTCTest(object):
PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]"
re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE) re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)

View File

@ -21,7 +21,7 @@ import time
import string import string
from sys import stdout from sys import stdout
class SerialCompleteTest(): class SerialCompleteTest(object):
def test(self, selftest): def test(self, selftest):
strip_chars = string.whitespace + "\0" strip_chars = string.whitespace + "\0"

View File

@ -21,7 +21,7 @@ import time
import string import string
from sys import stdout from sys import stdout
class SerialNCRXTest(): class SerialNCRXTest(object):
def test(self, selftest): def test(self, selftest):
selftest.mbed.flush(); selftest.mbed.flush();

View File

@ -21,7 +21,7 @@ import time
import string import string
from sys import stdout from sys import stdout
class SerialNCTXTest(): class SerialNCTXTest(object):
def test(self, selftest): def test(self, selftest):
selftest.mbed.flush(); selftest.mbed.flush();

View File

@ -19,7 +19,7 @@ import re
import random import random
from time import time from time import time
class StdioTest(): class StdioTest(object):
PATTERN_INT_VALUE = "Your value was: (-?\d+)" PATTERN_INT_VALUE = "Your value was: (-?\d+)"
re_detect_int_value = re.compile(PATTERN_INT_VALUE) re_detect_int_value = re.compile(PATTERN_INT_VALUE)

View File

@ -26,9 +26,9 @@ LEN_PACKET = 127
N_PACKETS = 5000 N_PACKETS = 5000
TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2 TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
MEGA = float(1024 * 1024) MEGA = float(1024 * 1024)
UPDATE_STEP = (N_PACKETS/10) UPDATE_STEP = (N_PACKETS // 10)
class TCP_EchoClient: class TCP_EchoClient(object):
def __init__(self, host): def __init__(self, host):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((host, ECHO_PORT)) self.s.connect((host, ECHO_PORT))
@ -44,10 +44,10 @@ class TCP_EchoClient:
def test(self): def test(self):
start = time() start = time()
for i in range(N_PACKETS): for i in range(N_PACKETS):
if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.) if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.))
self.__packet() self.__packet()
t = time() - start t = time() - start
print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA) print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA))
def __del__(self): def __del__(self):
self.s.close() self.s.close()

View File

@ -35,7 +35,7 @@ class TCPEchoClient_Handler(BaseRequestHandler):
if not data: break if not data: break
self.request.sendall(data) self.request.sendall(data)
if '{{end}}' in str(data): if '{{end}}' in str(data):
print print()
print(str(data)) print(str(data))
else: else:
if not count % 10: if not count % 10:
@ -43,7 +43,7 @@ class TCPEchoClient_Handler(BaseRequestHandler):
count += 1 count += 1
stdout.flush() stdout.flush()
class TCPEchoClientTest(): class TCPEchoClientTest(object):
def send_server_ip_port(self, selftest, ip_address, port_no): def send_server_ip_port(self, selftest, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed """ Set up network host. Reset target and and send server IP via serial to Mbed
""" """

View File

@ -14,7 +14,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from SocketServer import BaseRequestHandler, TCPServer try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
from time import time from time import time
from mbed_settings import LOCALHOST from mbed_settings import LOCALHOST
@ -24,7 +27,7 @@ MEGA = float(1024 * 1024)
class TCP_EchoHandler(BaseRequestHandler): class TCP_EchoHandler(BaseRequestHandler):
def handle(self): def handle(self):
print "\nconnection received" print("\nconnection received")
start = time() start = time()
bytes = 0 bytes = 0
index = 0 index = 0
@ -35,7 +38,7 @@ class TCP_EchoHandler(BaseRequestHandler):
bytes += len(data) bytes += len(data)
for n in map(ord, data): for n in map(ord, data):
if n != index: if n != index:
print "data error %d != %d" % (n , index) print("data error %d != %d" % (n , index))
index += 1 index += 1
if index > MAX_INDEX: if index > MAX_INDEX:
index = 0 index = 0
@ -43,8 +46,8 @@ class TCP_EchoHandler(BaseRequestHandler):
self.request.sendall(data) self.request.sendall(data)
t = time() - start t = time() - start
b = float(bytes * 8) * 2 b = float(bytes * 8) * 2
print "Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA) print("Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA))
server = TCPServer((LOCALHOST, 7), TCP_EchoHandler) server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
print "listening for connections" print("listening for connections")
server.serve_forever() server.serve_forever()

View File

@ -22,7 +22,7 @@ import uuid
import socket import socket
from sys import stdout from sys import stdout
class TCPEchoServerTest(): class TCPEchoServerTest(object):
ECHO_SERVER_ADDRESS = "" ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0 ECHO_PORT = 0
ECHO_LOOPs = 100 ECHO_LOOPs = 100

View File

@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from __future__ import print_function
# Be sure that the tools directory is in the search path # Be sure that the tools directory is in the search path
import sys import sys
from os.path import join, abspath, dirname from os.path import join, abspath, dirname
@ -21,20 +22,23 @@ ROOT = abspath(join(dirname(__file__), "..", ".."))
sys.path.insert(0, ROOT) sys.path.insert(0, ROOT)
from mbed_settings import LOCALHOST from mbed_settings import LOCALHOST
from SocketServer import BaseRequestHandler, TCPServer try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
class TCP_EchoHandler(BaseRequestHandler): class TCP_EchoHandler(BaseRequestHandler):
def handle(self): def handle(self):
print "\nHandle connection from:", self.client_address print("\nHandle connection from:", self.client_address)
while True: while True:
data = self.request.recv(1024) data = self.request.recv(1024)
if not data: break if not data: break
self.request.sendall(data) self.request.sendall(data)
self.request.close() self.request.close()
print "socket closed" print("socket closed")
if __name__ == '__main__': if __name__ == '__main__':
server = TCPServer((LOCALHOST, 7), TCP_EchoHandler) server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
print "listening for connections on:", (LOCALHOST, 7) print("listening for connections on:", (LOCALHOST, 7))
server.serve_forever() server.serve_forever()

View File

@ -24,11 +24,20 @@ udp_link_layer_auto.py -p COM20 -d E:\ -t 10
import re import re
import uuid import uuid
import socket import socket
import thread try:
# Python 3
import _thread as thread
except ImportError:
# Python 2
import thread
from sys import stdout from sys import stdout
from time import time, sleep from time import time, sleep
from host_test import DefaultTest from .host_test import DefaultTest
from SocketServer import BaseRequestHandler, UDPServer
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
# Received datagrams (with time) # Received datagrams (with time)
@ -52,7 +61,7 @@ def udp_packet_recv(threadName, server_ip, server_port):
""" This function will receive packet stream from mbed device """ This function will receive packet stream from mbed device
""" """
server = UDPServer((server_ip, server_port), UDPEchoClient_Handler) server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
print "[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port) print("[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port))
server.serve_forever() server.serve_forever()
@ -74,7 +83,7 @@ class UDPEchoServerTest(DefaultTest):
try: try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT)) s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
except Exception, e: except Exception as e:
data = None data = None
s.send(command) s.send(command)
data = s.recv(BUFFER_SIZE) data = s.recv(BUFFER_SIZE)
@ -97,7 +106,7 @@ class UDPEchoServerTest(DefaultTest):
# Open client socket to burst datagrams to UDP server in mbed # Open client socket to burst datagrams to UDP server in mbed
try: try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except Exception, e: except Exception as e:
self.s = None self.s = None
self.notify("HOST: Error: %s"% e) self.notify("HOST: Error: %s"% e)
return self.RESULT_ERROR return self.RESULT_ERROR

View File

@ -26,9 +26,9 @@ LEN_PACKET = 127
N_PACKETS = 5000 N_PACKETS = 5000
TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2 TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
MEGA = float(1024 * 1024) MEGA = float(1024 * 1024)
UPDATE_STEP = (N_PACKETS/10) UPDATE_STEP = (N_PACKETS // 10)
class UDP_EchoClient: class UDP_EchoClient(object):
s = socket(AF_INET, SOCK_DGRAM) s = socket(AF_INET, SOCK_DGRAM)
def __init__(self, host): def __init__(self, host):
@ -45,10 +45,10 @@ class UDP_EchoClient:
def test(self): def test(self):
start = time() start = time()
for i in range(N_PACKETS): for i in range(N_PACKETS):
if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.) if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.))
self.__packet() self.__packet()
t = time() - start t = time() - start
print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA) print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA))
while True: while True:
e = UDP_EchoClient(CLIENT_ADDRESS) e = UDP_EchoClient(CLIENT_ADDRESS)

View File

@ -36,7 +36,7 @@ class UDPEchoClient_Handler(BaseRequestHandler):
sys.stdout.write('.') sys.stdout.write('.')
stdout.flush() stdout.flush()
class UDPEchoClientTest(): class UDPEchoClientTest(object):
def send_server_ip_port(self, selftest, ip_address, port_no): def send_server_ip_port(self, selftest, ip_address, port_no):
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'

View File

@ -14,16 +14,20 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from SocketServer import BaseRequestHandler, UDPServer from __future__ import print_function
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
from mbed_settings import SERVER_ADDRESS from mbed_settings import SERVER_ADDRESS
class UDP_EchoHandler(BaseRequestHandler): class UDP_EchoHandler(BaseRequestHandler):
def handle(self): def handle(self):
data, socket = self.request data, socket = self.request
print "client:", self.client_address print("client:", self.client_address)
print "data:", data print("data:", data)
socket.sendto(data, self.client_address) socket.sendto(data, self.client_address)
server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler) server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler)
print "listening for connections" print("listening for connections")
server.serve_forever() server.serve_forever()

View File

@ -22,7 +22,7 @@ import uuid
from sys import stdout from sys import stdout
from socket import socket, AF_INET, SOCK_DGRAM from socket import socket, AF_INET, SOCK_DGRAM
class UDPEchoServerTest(): class UDPEchoServerTest(object):
ECHO_SERVER_ADDRESS = "" ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0 ECHO_PORT = 0
s = None # Socket s = None # Socket

View File

@ -17,7 +17,7 @@ limitations under the License.
from time import time from time import time
class WaitusTest(): class WaitusTest(object):
""" This test is reading single characters from stdio """ This test is reading single characters from stdio
and measures time between their occurrences. and measures time between their occurrences.
""" """

View File

@ -20,7 +20,7 @@ import uuid
import time import time
from sys import stdout from sys import stdout
class WFITest(): class WFITest(object):
def test(self, selftest): def test(self, selftest):
c = selftest.mbed.serial_readline() c = selftest.mbed.serial_readline()

View File

@ -33,6 +33,7 @@ from collections import defaultdict
from prettytable import PrettyTable, HEADER from prettytable import PrettyTable, HEADER
from jinja2 import FileSystemLoader, StrictUndefined from jinja2 import FileSystemLoader, StrictUndefined
from jinja2.environment import Environment from jinja2.environment import Environment
from future.utils import with_metaclass
# Be sure that the tools directory is in the search path # Be sure that the tools directory is in the search path
@ -46,9 +47,8 @@ from tools.utils import (
) # noqa: E402 ) # noqa: E402
class _Parser(object): class _Parser(with_metaclass(ABCMeta, object)):
"""Internal interface for parsing""" """Internal interface for parsing"""
__metaclass__ = ABCMeta
SECTIONS = ('.text', '.data', '.bss', '.heap', '.stack') SECTIONS = ('.text', '.data', '.bss', '.heap', '.stack')
MISC_FLASH_SECTIONS = ('.interrupts', '.flash_config') MISC_FLASH_SECTIONS = ('.interrupts', '.flash_config')
OTHER_SECTIONS = ('.interrupts_ram', '.init', '.ARM.extab', OTHER_SECTIONS = ('.interrupts_ram', '.init', '.ARM.extab',

View File

@ -16,9 +16,10 @@
from __future__ import print_function, division, absolute_import from __future__ import print_function, division, absolute_import
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from future.utils import with_metaclass
class Notifier(object): class Notifier(with_metaclass(ABCMeta, object)):
""" """
Notifiers send build system events to a front end or may implement a front Notifiers send build system events to a front end or may implement a front
end themselves, displaying warnings and errors for a user. end themselves, displaying warnings and errors for a user.
@ -48,8 +49,6 @@ class Notifier(object):
| communicate the binary location to the online IDE. | communicate the binary location to the online IDE.
""" """
__metaclass__ = ABCMeta
@abstractmethod @abstractmethod
def notify(self, event): def notify(self, event):
""" """

View File

@ -103,14 +103,14 @@ if __name__ == '__main__':
# Print scrip version # Print scrip version
if opts.version: if opts.version:
print parser.description print(parser.description)
print parser.epilog print(parser.epilog)
print "Version %d.%d"% get_version() print("Version %d.%d"% get_version())
exit(0) exit(0)
# Print summary / information about automation test status # Print summary / information about automation test status
if opts.test_automation_report: if opts.test_automation_report:
print get_avail_tests_summary_table(platform_filter=opts.general_filter_regex) print(get_avail_tests_summary_table(platform_filter=opts.general_filter_regex))
exit(0) exit(0)
# Print summary / information about automation test status # Print summary / information about automation test status
@ -122,15 +122,15 @@ if __name__ == '__main__':
'host_test', 'host_test',
'duration', 'duration',
'source_dir'] 'source_dir']
print get_avail_tests_summary_table(cols=test_case_report_cols, print(get_avail_tests_summary_table(cols=test_case_report_cols,
result_summary=False, result_summary=False,
join_delim='\n', join_delim='\n',
platform_filter=opts.general_filter_regex) platform_filter=opts.general_filter_regex))
exit(0) exit(0)
# Only prints matrix of supported toolchains # Only prints matrix of supported toolchains
if opts.supported_toolchains: if opts.supported_toolchains:
print mcu_toolchain_matrix(platform_filter=opts.general_filter_regex) print(mcu_toolchain_matrix(platform_filter=opts.general_filter_regex))
exit(0) exit(0)
test_spec = None test_spec = None
@ -139,14 +139,14 @@ if __name__ == '__main__':
if hasattr(opts, 'auto_detect') and opts.auto_detect: if hasattr(opts, 'auto_detect') and opts.auto_detect:
# If auto_detect attribute is present, we assume other auto-detection # If auto_detect attribute is present, we assume other auto-detection
# parameters like 'toolchains_filter' are also set. # parameters like 'toolchains_filter' are also set.
print "MBEDLS: Detecting connected mbed-enabled devices... " print("MBEDLS: Detecting connected mbed-enabled devices... ")
MUTs = get_autodetected_MUTS_list() MUTs = get_autodetected_MUTS_list()
for mut in MUTs.values(): for mut in MUTs.values():
print "MBEDLS: Detected %s, port: %s, mounted: %s"% (mut['mcu_unique'] if 'mcu_unique' in mut else mut['mcu'], print("MBEDLS: Detected %s, port: %s, mounted: %s"% (mut['mcu_unique'] if 'mcu_unique' in mut else mut['mcu'],
mut['port'], mut['port'],
mut['disk']) mut['disk']))
# Set up parameters for test specification filter function (we need to set toolchains per target here) # Set up parameters for test specification filter function (we need to set toolchains per target here)
use_default_toolchain = 'default' in opts.toolchains_filter if opts.toolchains_filter is not None else True use_default_toolchain = 'default' in opts.toolchains_filter if opts.toolchains_filter is not None else True
@ -179,13 +179,13 @@ if __name__ == '__main__':
exit(-1) exit(-1)
if opts.verbose_test_configuration_only: if opts.verbose_test_configuration_only:
print "MUTs configuration in %s:" % ('auto-detected' if opts.auto_detect else opts.muts_spec_filename) print("MUTs configuration in %s:" % ('auto-detected' if opts.auto_detect else opts.muts_spec_filename))
if MUTs: if MUTs:
print print_muts_configuration_from_json(MUTs, platform_filter=opts.general_filter_regex) print(print_muts_configuration_from_json(MUTs, platform_filter=opts.general_filter_regex))
print print()
print "Test specification in %s:" % ('auto-detected' if opts.auto_detect else opts.test_spec_filename) print("Test specification in %s:" % ('auto-detected' if opts.auto_detect else opts.test_spec_filename))
if test_spec: if test_spec:
print print_test_configuration_from_json(test_spec) print(print_test_configuration_from_json(test_spec))
exit(0) exit(0)
if get_module_avail('mbed_lstools'): if get_module_avail('mbed_lstools'):
@ -201,16 +201,16 @@ if __name__ == '__main__':
report_exporter = ReportExporter(ResultExporterType.JUNIT_OPER) report_exporter = ReportExporter(ResultExporterType.JUNIT_OPER)
report_exporter.report_to_file(test_results, opts.report_junit_file_name) report_exporter.report_to_file(test_results, opts.report_junit_file_name)
else: else:
print "Unknown interoperability test scope name: '%s'" % (opts.operability_checks) print("Unknown interoperability test scope name: '%s'" % (opts.operability_checks))
print "Available test scopes: %s" % (','.join(["'%s'" % n for n in test_scope])) print("Available test scopes: %s" % (','.join(["'%s'" % n for n in test_scope])))
exit(0) exit(0)
# Verbose test specification and MUTs configuration # Verbose test specification and MUTs configuration
if MUTs and opts.verbose: if MUTs and opts.verbose:
print print_muts_configuration_from_json(MUTs) print(print_muts_configuration_from_json(MUTs))
if test_spec and opts.verbose: if test_spec and opts.verbose:
print print_test_configuration_from_json(test_spec) print(print_test_configuration_from_json(test_spec))
if opts.only_build_tests: if opts.only_build_tests:
# We are skipping testing phase, and suppress summary # We are skipping testing phase, and suppress summary

View File

@ -33,7 +33,7 @@ def format_number(number, width):
# convert to string # convert to string
line = format(number, '0%dx' % (width)) line = format(number, '0%dx' % (width))
if len(line) > width: if len(line) > width:
print "[ERROR] 0x%s cannot fit in width %d" % (line, width) print("[ERROR] 0x%s cannot fit in width %d" % (line, width))
sys.exit(-1) sys.exit(-1)
# cut string to list & reverse # cut string to list & reverse
line = [line[i:i+2] for i in range(0, len(line), 2)] line = [line[i:i+2] for i in range(0, len(line), 2)]
@ -58,7 +58,7 @@ def write_padding_bytes(output_name, size):
current_size = os.stat(output_name).st_size current_size = os.stat(output_name).st_size
padcount = size - current_size padcount = size - current_size
if padcount < 0: if padcount < 0:
print "[ERROR] image is larger than expected size" print("[ERROR] image is larger than expected size")
sys.exit(-1) sys.exit(-1)
output = open(output_name, "ab") output = open(output_name, "ab")
output.write('\377' * padcount) output.write('\377' * padcount)
@ -96,7 +96,7 @@ def find_symbol(toolchain, mapfile, symbol):
ret = match.group("addr") ret = match.group("addr")
if not ret: if not ret:
print "[ERROR] cannot find the address of symbol " + symbol print("[ERROR] cannot find the address of symbol " + symbol)
return 0 return 0
return int(ret,16) | 1 return int(ret,16) | 1

View File

@ -1274,7 +1274,7 @@ if args.list:
for f in fnmatch.filter(os.listdir(cubemxdirMCU), "STM32*.xml"): for f in fnmatch.filter(os.listdir(cubemxdirMCU), "STM32*.xml"):
print(f) print(f)
FileCount += 1 FileCount += 1
print print()
print("%i available xml files description" % FileCount) print("%i available xml files description" % FileCount)
quit() quit()
@ -1287,7 +1287,7 @@ if args.boards:
NucleoFileCount += 1 NucleoFileCount += 1
elif "Discovery" in f: elif "Discovery" in f:
DiscoFileCount += 1 DiscoFileCount += 1
print print()
print("%2i available Nucleo files description" % NucleoFileCount) print("%2i available Nucleo files description" % NucleoFileCount)
print("%2i available Disco files description" % DiscoFileCount) print("%2i available Disco files description" % DiscoFileCount)
quit() quit()

View File

@ -646,7 +646,7 @@ class MCU_NRF51Code(object):
binh.write_hex_file(fileout, write_start_addr=False) binh.write_hex_file(fileout, write_start_addr=False)
class NCS36510TargetCode: class NCS36510TargetCode(object):
@staticmethod @staticmethod
def ncs36510_addfib(t_self, resources, elf, binf): def ncs36510_addfib(t_self, resources, elf, binf):
from tools.targets.NCS import add_fib_at_start from tools.targets.NCS import add_fib_at_start
@ -654,7 +654,7 @@ class NCS36510TargetCode:
add_fib_at_start(binf[:-4]) add_fib_at_start(binf[:-4])
class RTL8195ACode: class RTL8195ACode(object):
"""RTL8195A Hooks""" """RTL8195A Hooks"""
@staticmethod @staticmethod
def binary_hook(t_self, resources, elf, binf): def binary_hook(t_self, resources, elf, binf):
@ -662,7 +662,7 @@ class RTL8195ACode:
rtl8195a_elf2bin(t_self, elf, binf) rtl8195a_elf2bin(t_self, elf, binf)
class PSOC6Code: class PSOC6Code(object):
@staticmethod @staticmethod
def complete(t_self, resources, elf, binf): def complete(t_self, resources, elf, binf):
from tools.targets.PSOC6 import complete as psoc6_complete from tools.targets.PSOC6 import complete as psoc6_complete
@ -694,7 +694,7 @@ class PSOC6Code:
from tools.targets.PSOC6 import sign_image as psoc6_sign_image from tools.targets.PSOC6 import sign_image as psoc6_sign_image
psoc6_sign_image(t_self, binf) psoc6_sign_image(t_self, binf)
class ArmMuscaA1Code: class ArmMuscaA1Code(object):
"""Musca-A1 Hooks""" """Musca-A1 Hooks"""
@staticmethod @staticmethod
def binary_hook(t_self, resources, elf, binf): def binary_hook(t_self, resources, elf, binf):
@ -710,7 +710,7 @@ class ArmMuscaA1Code:
musca_tfm_bin(t_self, binf, secure_bin) musca_tfm_bin(t_self, binf, secure_bin)
class LPC55S69Code: class LPC55S69Code(object):
"""LPC55S69 Hooks""" """LPC55S69 Hooks"""
@staticmethod @staticmethod
def binary_hook(t_self, resources, elf, binf): def binary_hook(t_self, resources, elf, binf):
@ -725,7 +725,7 @@ class LPC55S69Code:
) )
lpc55s69_complete(t_self, binf, secure_bin) lpc55s69_complete(t_self, binf, secure_bin)
class M2351Code: class M2351Code(object):
"""M2351 Hooks""" """M2351 Hooks"""
@staticmethod @staticmethod
def merge_secure(t_self, resources, ns_elf, ns_hex): def merge_secure(t_self, resources, ns_elf, ns_hex):

View File

@ -247,14 +247,14 @@ def subcommand(name, *args, **kwargs):
choices=TARGET_MAP.keys(), type=str.upper)) choices=TARGET_MAP.keys(), type=str.upper))
def targets_cmd(mcus=[]): def targets_cmd(mcus=[]):
"""Find and print errors about specific targets""" """Find and print errors about specific targets"""
print dump_all([check_hierarchy(TARGET_MAP[m]) for m in mcus], print(dump_all([check_hierarchy(TARGET_MAP[m]) for m in mcus],
default_flow_style=False) default_flow_style=False))
@subcommand("all-targets") @subcommand("all-targets")
def all_targets_cmd(): def all_targets_cmd():
"""Print all errors about all parts""" """Print all errors about all parts"""
print dump_all([check_hierarchy(m) for m in TARGET_MAP.values()], print(dump_all([check_hierarchy(m) for m in list(TARGET_MAP.values())],
default_flow_style=False) default_flow_style=False))
@subcommand("orphans") @subcommand("orphans")
def orphans_cmd(): def orphans_cmd():
@ -265,7 +265,7 @@ def orphans_cmd():
if name in orphans: if name in orphans:
orphans.remove(name) orphans.remove(name)
if orphans: if orphans:
print dump_all([orphans], default_flow_style=False) print(dump_all([orphans], default_flow_style=False))
return len(orphans) return len(orphans)
def main(): def main():

View File

@ -312,7 +312,7 @@ def main():
# Print memory map summary on screen # Print memory map summary on screen
if build_report: if build_report:
print print()
print(print_build_memory_usage(build_report)) print(print_build_memory_usage(build_report))
print_report_exporter = ReportExporter(ResultExporterType.PRINT, package="build") print_report_exporter = ReportExporter(ResultExporterType.PRINT, package="build")

View File

@ -20,7 +20,7 @@ from mock import patch
from tools.detect_targets import get_interface_version from tools.detect_targets import get_interface_version
class MbedLsToolsMock(): class MbedLsToolsMock(object):
""" """
Mock of mbedls tools Mock of mbedls tools
""" """

View File

@ -1461,11 +1461,11 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','
def progress_bar(percent_progress, saturation=0): def progress_bar(percent_progress, saturation=0):
""" This function creates progress bar with optional simple saturation mark """ This function creates progress bar with optional simple saturation mark
""" """
step = int(percent_progress / 2) # Scale by to (scale: 1 - 50) step = percent_progress // 2 # Scale by to (scale: 1 - 50)
str_progress = '#' * step + '.' * int(50 - step) str_progress = '#' * step + '.' * int(50 - step)
c = '!' if str_progress[38] == '.' else '|' c = '!' if str_progress[38] == '.' else '|'
if saturation > 0: if saturation > 0:
saturation = saturation / 2 saturation = saturation // 2
str_progress = str_progress[:saturation] + c + str_progress[saturation:] str_progress = str_progress[:saturation] + c + str_progress[saturation:]
return str_progress return str_progress
@ -1517,7 +1517,7 @@ def singletest_in_cli_mode(single_test):
# Returns True if no build failures of the test projects or their dependencies # Returns True if no build failures of the test projects or their dependencies
return status return status
class TestLogger(): class TestLogger(object):
""" Super-class for logging and printing ongoing events for test suite pass """ Super-class for logging and printing ongoing events for test suite pass
""" """
def __init__(self, store_log=True): def __init__(self, store_log=True):

View File

@ -29,7 +29,7 @@ ResultExporterType = construct_enum(HTML='Html_Exporter',
PRINT='Print_Exporter') PRINT='Print_Exporter')
class ReportExporter(): class ReportExporter(object):
""" Class exports extended test result Python data structure to """ Class exports extended test result Python data structure to
different formats like HTML, JUnit XML. different formats like HTML, JUnit XML.

View File

@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from past.builtins import cmp
from tools.paths import * from tools.paths import *
from argparse import ArgumentTypeError from argparse import ArgumentTypeError
from tools.utils import columnate from tools.utils import columnate
@ -857,7 +858,7 @@ except:
TEST_GROUPS = {} TEST_GROUPS = {}
GROUPS.update(TEST_GROUPS) GROUPS.update(TEST_GROUPS)
class Test: class Test(object):
DEFAULTS = { DEFAULTS = {
#'mcu': None, #'mcu': None,
'description': None, 'description': None,

View File

@ -49,6 +49,7 @@ from ..config import (ConfigException, RAM_ALL_MEMORIES, ROM_ALL_MEMORIES)
from ..regions import (UPDATE_WHITELIST, merge_region_list) from ..regions import (UPDATE_WHITELIST, merge_region_list)
from ..settings import COMPARE_FIXED from ..settings import COMPARE_FIXED
from ..settings import ARM_PATH, ARMC6_PATH, GCC_ARM_PATH, IAR_PATH from ..settings import ARM_PATH, ARMC6_PATH, GCC_ARM_PATH, IAR_PATH
from future.utils import with_metaclass
TOOLCHAIN_PATHS = { TOOLCHAIN_PATHS = {
@ -109,7 +110,7 @@ CORTEX_SYMBOLS = {
} }
class mbedToolchain: class mbedToolchain(with_metaclass(ABCMeta, object)):
OFFICIALLY_SUPPORTED = False OFFICIALLY_SUPPORTED = False
# Verbose logging # Verbose logging
@ -127,8 +128,6 @@ class mbedToolchain:
PROFILE_FILE_NAME = ".profile" PROFILE_FILE_NAME = ".profile"
__metaclass__ = ABCMeta
profile_template = {'common': [], 'c': [], 'cxx': [], 'asm': [], 'ld': []} profile_template = {'common': [], 'c': [], 'cxx': [], 'asm': [], 'ld': []}
def __init__(self, target, notify=None, macros=None, build_profile=None, def __init__(self, target, notify=None, macros=None, build_profile=None,