Updates to `/tools` for Python 3 compatibility

pull/11921/head
Graham Hammond 2019-11-21 15:02:37 +00:00
parent 30bab0942b
commit e723571474
32 changed files with 107 additions and 88 deletions

View File

@ -28,6 +28,7 @@ import copy
from tools.targets import TARGET_MAP
from tools.utils import mkdir
from tools.resources import FileType, FileRef
from future.utils import with_metaclass
"""Just a template for subclassing"""
@ -57,14 +58,13 @@ def deprecated_exporter(CLS):
CLS.NAME = "%s (DEPRECATED)" % old_name
return CLS
class Exporter(object):
class Exporter(with_metaclass(ABCMeta, object)):
"""Exporter base class
This class is meant to be extended by individual exporters, and provides a
few helper methods for implementing an exporter with either jinja2 or
progen.
"""
__metaclass__ = ABCMeta
TEMPLATE_DIR = dirname(__file__)
DOT_IN_RELATIVE_PATH = False
NAME = None

View File

@ -134,7 +134,7 @@ class PackFlashAlgo(object):
blob = self.algo_data[:]
pad_size = 0 if len(blob) % 4 == 0 else 4 - len(blob) % 4
blob = blob + "\x00" * pad_size
integer_list = struct.unpack("<" + "L" * (len(blob) / 4), blob)
integer_list = struct.unpack("<" + "L" * (len(blob) // 4), blob)
line_list = []
for pos in range(0, len(integer_list), group_size):
group = ["0x%08x" % value for value in

View File

@ -14,7 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import Test
from .host_test import Test
class EchoTest(Test):

View File

@ -16,6 +16,7 @@ limitations under the License.
"""
class HelloTest():
class HelloTest(object):
HELLO_WORLD = "Hello World"
def test(self, selftest):

View File

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
class HostRegistry:
class HostRegistry(object):
""" Class stores registry with host tests and objects representing them
"""
HOST_TESTS = {} # host_test_name -> host_test_ojbect

View File

@ -18,8 +18,8 @@ limitations under the License.
# Check if 'serial' module is installed
try:
from serial import Serial
except ImportError, e:
print "Error: Can't import 'serial' module: %s"% e
except ImportError as e:
print("Error: Can't import 'serial' module: %s"% e)
exit(-1)
import os
@ -29,7 +29,7 @@ from sys import stdout
from time import sleep, time
from optparse import OptionParser
import host_tests_plugins
from . import host_tests_plugins
# This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do
@ -39,7 +39,7 @@ from tools.test_api import get_autodetected_MUTS_list
from tools.test_api import get_module_avail
class Mbed:
class Mbed(object):
""" Base class for a host driven test
"""
def __init__(self):
@ -117,7 +117,7 @@ class Mbed:
self.serial_timeout = 1
self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk)
print('MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk))
def init_serial_params(self, serial_baud=9600, serial_timeout=1):
""" Initialize port parameters.
@ -183,11 +183,11 @@ class Mbed:
stdout.write('.')
stdout.flush()
else:
print "...port ready!"
print("...port ready!")
result = True
break
if not result and last_error:
print last_error
print(last_error)
return result
def set_serial_timeout(self, timeout):
@ -221,7 +221,7 @@ class Mbed:
c = self.serial.read(1)
result += c
except Exception as e:
print "MBED: %s"% str(e)
print("MBED: %s"% str(e))
result = None
break
if c == '\n':
@ -298,7 +298,7 @@ class Mbed:
return result
class HostTestResults:
class HostTestResults(object):
""" Test results set by host tests
"""
def __init__(self):
@ -389,8 +389,8 @@ class Test(HostTestResults):
self.print_result(result)
else:
self.notify("HOST: Passive mode...")
except Exception, e:
print str(e)
except Exception as e:
print(str(e))
self.print_result(self.RESULT_ERROR)
def setup(self):
@ -406,7 +406,7 @@ class Test(HostTestResults):
def notify(self, message):
""" On screen notification function
"""
print message
print(message)
stdout.flush()
def print_result(self, result):

View File

@ -27,10 +27,12 @@
# >myled.write(1)
# >
import serial, urllib2, time
from future import standard_library
standard_library.install_aliases()
import serial, urllib.request, urllib.error, urllib.parse, time
# mbed super class
class mbed:
class mbed(object):
def __init__(self):
print("This will work as a demo but no transport mechanism has been selected")
@ -48,7 +50,7 @@ class SerialRPC(mbed):
# creates the command to be sent serially - /name/method arg1 arg2 arg3 ... argN
str = "/" + name + "/" + method + " " + " ".join(args) + "\n"
# prints the command being executed
print str
print(str)
# writes the command to serial
self.ser.write(str)
# strips trailing characters from the line just written
@ -61,12 +63,12 @@ class HTTPRPC(mbed):
self.host = "http://" + ip
def rpc(self, name, method, args):
response = urllib2.urlopen(self.host + "/rpc/" + name + "/" + method + "%20" + "%20".join(args))
response = urllib.request.urlopen(self.host + "/rpc/" + name + "/" + method + "%20" + "%20".join(args))
return response.read().strip()
# generic mbed interface super class
class mbed_interface():
class mbed_interface(object):
# initialize an mbed interface with a transport mechanism and pin name
def __init__(self, this_mbed, mpin):
self.mbed = this_mbed
@ -198,7 +200,7 @@ class Timer(mbed_interface):
return float(re.search('\d+\.*\d*', r).group(0))
# Serial
class Serial():
class Serial(object):
def __init__(self, this_mbed, tx, rx=""):
self.mbed = this_mbed
if isinstance(tx, str):

View File

@ -14,7 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import Test, Simple
from .host_test import Test, Simple
from sys import stdout
class NETTest(Simple):

View File

@ -14,8 +14,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import Test
from mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin
from .host_test import Test
from .mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin
class RpcTest(Test):
@ -30,7 +30,7 @@ class RpcTest(Test):
if hasattr(self.mbed.options, 'micro'):
if self.mbed.options.micro == 'M0+':
print "Freedom Board: PTA12 <-> PTC4"
print("Freedom Board: PTA12 <-> PTC4")
p_out = pin("PTA12")
p_in = pin("PTC4")

View File

@ -21,7 +21,7 @@ import time
import string
from sys import stdout
class SerialCompleteTest():
class SerialCompleteTest(object):
def test(self, selftest):
strip_chars = string.whitespace + "\0"

View File

@ -21,7 +21,7 @@ import time
import string
from sys import stdout
class SerialNCRXTest():
class SerialNCRXTest(object):
def test(self, selftest):
selftest.mbed.flush();

View File

@ -21,7 +21,7 @@ import time
import string
from sys import stdout
class SerialNCTXTest():
class SerialNCTXTest(object):
def test(self, selftest):
selftest.mbed.flush();

View File

@ -19,7 +19,7 @@ import re
import random
from time import time
class StdioTest():
class StdioTest(object):
PATTERN_INT_VALUE = "Your value was: (-?\d+)"
re_detect_int_value = re.compile(PATTERN_INT_VALUE)

View File

@ -26,9 +26,9 @@ LEN_PACKET = 127
N_PACKETS = 5000
TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
MEGA = float(1024 * 1024)
UPDATE_STEP = (N_PACKETS/10)
UPDATE_STEP = N_PACKETS // 10
class TCP_EchoClient:
class TCP_EchoClient(object):
def __init__(self, host):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((host, ECHO_PORT))
@ -44,10 +44,10 @@ class TCP_EchoClient:
def test(self):
start = time()
for i in range(N_PACKETS):
if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.)
if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.))
self.__packet()
t = time() - start
print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA)
print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA))
def __del__(self):
self.s.close()

View File

@ -35,7 +35,7 @@ class TCPEchoClient_Handler(BaseRequestHandler):
if not data: break
self.request.sendall(data)
if '{{end}}' in str(data):
print
print()
print(str(data))
else:
if not count % 10:
@ -43,7 +43,7 @@ class TCPEchoClient_Handler(BaseRequestHandler):
count += 1
stdout.flush()
class TCPEchoClientTest():
class TCPEchoClientTest(object):
def send_server_ip_port(self, selftest, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""

View File

@ -14,7 +14,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from SocketServer import BaseRequestHandler, TCPServer
try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
from time import time
from mbed_settings import LOCALHOST
@ -24,7 +27,7 @@ MEGA = float(1024 * 1024)
class TCP_EchoHandler(BaseRequestHandler):
def handle(self):
print "\nconnection received"
print("\nconnection received")
start = time()
bytes = 0
index = 0
@ -35,7 +38,7 @@ class TCP_EchoHandler(BaseRequestHandler):
bytes += len(data)
for n in map(ord, data):
if n != index:
print "data error %d != %d" % (n , index)
print("data error %d != %d" % (n , index))
index += 1
if index > MAX_INDEX:
index = 0
@ -43,8 +46,8 @@ class TCP_EchoHandler(BaseRequestHandler):
self.request.sendall(data)
t = time() - start
b = float(bytes * 8) * 2
print "Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA)
print("Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA))
server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
print "listening for connections"
print("listening for connections")
server.serve_forever()

View File

@ -22,7 +22,7 @@ import uuid
import socket
from sys import stdout
class TCPEchoServerTest():
class TCPEchoServerTest(object):
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0
ECHO_LOOPs = 100

View File

@ -21,20 +21,23 @@ ROOT = abspath(join(dirname(__file__), "..", ".."))
sys.path.insert(0, ROOT)
from mbed_settings import LOCALHOST
from SocketServer import BaseRequestHandler, TCPServer
try:
from SocketServer import BaseRequestHandler, TCPServer
except ImportError:
from socketserver import BaseRequestHandler, TCPServer
class TCP_EchoHandler(BaseRequestHandler):
def handle(self):
print "\nHandle connection from:", self.client_address
print("\nHandle connection from:", self.client_address)
while True:
data = self.request.recv(1024)
if not data: break
self.request.sendall(data)
self.request.close()
print "socket closed"
print("socket closed")
if __name__ == '__main__':
server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
print "listening for connections on:", (LOCALHOST, 7)
print("listening for connections on:", (LOCALHOST, 7))
server.serve_forever()

View File

@ -24,11 +24,20 @@ udp_link_layer_auto.py -p COM20 -d E:\ -t 10
import re
import uuid
import socket
import thread
try:
# Python 3
import _thread as thread
except ImportError:
# Python 2
import thread
from sys import stdout
from time import time, sleep
from host_test import DefaultTest
from SocketServer import BaseRequestHandler, UDPServer
from .host_test import DefaultTest
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
# Received datagrams (with time)
@ -52,7 +61,7 @@ def udp_packet_recv(threadName, server_ip, server_port):
""" This function will receive packet stream from mbed device
"""
server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
print "[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port)
print("[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port))
server.serve_forever()
@ -74,7 +83,7 @@ class UDPEchoServerTest(DefaultTest):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
except Exception, e:
except Exception as e:
data = None
s.send(command)
data = s.recv(BUFFER_SIZE)
@ -97,7 +106,7 @@ class UDPEchoServerTest(DefaultTest):
# Open client socket to burst datagrams to UDP server in mbed
try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except Exception, e:
except Exception as e:
self.s = None
self.notify("HOST: Error: %s"% e)
return self.RESULT_ERROR

View File

@ -26,9 +26,9 @@ LEN_PACKET = 127
N_PACKETS = 5000
TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
MEGA = float(1024 * 1024)
UPDATE_STEP = (N_PACKETS/10)
UPDATE_STEP = N_PACKETS // 10
class UDP_EchoClient:
class UDP_EchoClient(object):
s = socket(AF_INET, SOCK_DGRAM)
def __init__(self, host):
@ -45,10 +45,10 @@ class UDP_EchoClient:
def test(self):
start = time()
for i in range(N_PACKETS):
if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.)
if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.))
self.__packet()
t = time() - start
print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA)
print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA))
while True:
e = UDP_EchoClient(CLIENT_ADDRESS)

View File

@ -36,7 +36,7 @@ class UDPEchoClient_Handler(BaseRequestHandler):
sys.stdout.write('.')
stdout.flush()
class UDPEchoClientTest():
class UDPEchoClientTest(object):
def send_server_ip_port(self, selftest, ip_address, port_no):
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'

View File

@ -14,16 +14,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from SocketServer import BaseRequestHandler, UDPServer
try:
from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
from socketserver import BaseRequestHandler, UDPServer
from mbed_settings import SERVER_ADDRESS
class UDP_EchoHandler(BaseRequestHandler):
def handle(self):
data, socket = self.request
print "client:", self.client_address
print "data:", data
print("client:", self.client_address)
print("data:", data)
socket.sendto(data, self.client_address)
server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler)
print "listening for connections"
print("listening for connections")
server.serve_forever()

View File

@ -22,7 +22,7 @@ import uuid
from sys import stdout
from socket import socket, AF_INET, SOCK_DGRAM
class UDPEchoServerTest():
class UDPEchoServerTest(object):
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0
s = None # Socket

View File

@ -17,7 +17,7 @@ limitations under the License.
from time import time
class WaitusTest():
class WaitusTest(object):
""" This test is reading single characters from stdio
and measures time between their occurrences.
"""

View File

@ -20,7 +20,7 @@ import uuid
import time
from sys import stdout
class WFITest():
class WFITest(object):
def test(self, selftest):
c = selftest.mbed.serial_readline()

View File

@ -16,9 +16,10 @@
from __future__ import print_function, division, absolute_import
from abc import ABCMeta, abstractmethod
from future.utils import with_metaclass
class Notifier(object):
class Notifier(with_metaclass(ABCMeta, object)):
"""
Notifiers send build system events to a front end or may implement a front
end themselves, displaying warnings and errors for a user.
@ -48,8 +49,6 @@ class Notifier(object):
| communicate the binary location to the online IDE.
"""
__metaclass__ = ABCMeta
@abstractmethod
def notify(self, event):
"""

View File

@ -33,7 +33,7 @@ def format_number(number, width):
# convert to string
line = format(number, '0%dx' % (width))
if len(line) > width:
print "[ERROR] 0x%s cannot fit in width %d" % (line, width)
print("[ERROR] 0x%s cannot fit in width %d" % (line, width))
sys.exit(-1)
# cut string to list & reverse
line = [line[i:i+2] for i in range(0, len(line), 2)]
@ -58,7 +58,7 @@ def write_padding_bytes(output_name, size):
current_size = os.stat(output_name).st_size
padcount = size - current_size
if padcount < 0:
print "[ERROR] image is larger than expected size"
print("[ERROR] image is larger than expected size")
sys.exit(-1)
output = open(output_name, "ab")
output.write('\377' * padcount)
@ -96,7 +96,7 @@ def find_symbol(toolchain, mapfile, symbol):
ret = match.group("addr")
if not ret:
print "[ERROR] cannot find the address of symbol " + symbol
print("[ERROR] cannot find the address of symbol " + symbol)
return 0
return int(ret,16) | 1

View File

@ -1274,7 +1274,7 @@ if args.list:
for f in fnmatch.filter(os.listdir(cubemxdirMCU), "STM32*.xml"):
print(f)
FileCount += 1
print
print()
print("%i available xml files description" % FileCount)
quit()
@ -1287,7 +1287,7 @@ if args.boards:
NucleoFileCount += 1
elif "Discovery" in f:
DiscoFileCount += 1
print
print()
print("%2i available Nucleo files description" % NucleoFileCount)
print("%2i available Disco files description" % DiscoFileCount)
quit()

View File

@ -646,7 +646,7 @@ class MCU_NRF51Code(object):
binh.write_hex_file(fileout, write_start_addr=False)
class NCS36510TargetCode:
class NCS36510TargetCode(object):
@staticmethod
def ncs36510_addfib(t_self, resources, elf, binf):
from tools.targets.NCS import add_fib_at_start
@ -654,7 +654,7 @@ class NCS36510TargetCode:
add_fib_at_start(binf[:-4])
class RTL8195ACode:
class RTL8195ACode(object):
"""RTL8195A Hooks"""
@staticmethod
def binary_hook(t_self, resources, elf, binf):
@ -662,7 +662,7 @@ class RTL8195ACode:
rtl8195a_elf2bin(t_self, elf, binf)
class PSOC6Code:
class PSOC6Code(object):
@staticmethod
def complete(t_self, resources, elf, binf):
from tools.targets.PSOC6 import complete as psoc6_complete
@ -694,7 +694,7 @@ class PSOC6Code:
from tools.targets.PSOC6 import sign_image as psoc6_sign_image
psoc6_sign_image(t_self, binf)
class ArmMuscaA1Code:
class ArmMuscaA1Code(object):
"""Musca-A1 Hooks"""
@staticmethod
def binary_hook(t_self, resources, elf, binf):
@ -710,7 +710,7 @@ class ArmMuscaA1Code:
musca_tfm_bin(t_self, binf, secure_bin)
class LPC55S69Code:
class LPC55S69Code(object):
"""LPC55S69 Hooks"""
@staticmethod
def binary_hook(t_self, resources, elf, binf):
@ -725,7 +725,7 @@ class LPC55S69Code:
)
lpc55s69_complete(t_self, binf, secure_bin)
class M2351Code:
class M2351Code(object):
"""M2351 Hooks"""
@staticmethod
def merge_secure(t_self, resources, ns_elf, ns_hex):

View File

@ -247,14 +247,14 @@ def subcommand(name, *args, **kwargs):
choices=TARGET_MAP.keys(), type=str.upper))
def targets_cmd(mcus=[]):
"""Find and print errors about specific targets"""
print dump_all([check_hierarchy(TARGET_MAP[m]) for m in mcus],
default_flow_style=False)
print(dump_all([check_hierarchy(TARGET_MAP[m]) for m in mcus],
default_flow_style=False))
@subcommand("all-targets")
def all_targets_cmd():
"""Print all errors about all parts"""
print dump_all([check_hierarchy(m) for m in TARGET_MAP.values()],
default_flow_style=False)
print(dump_all([check_hierarchy(m) for m in list(TARGET_MAP.values())],
default_flow_style=False))
@subcommand("orphans")
def orphans_cmd():
@ -265,7 +265,7 @@ def orphans_cmd():
if name in orphans:
orphans.remove(name)
if orphans:
print dump_all([orphans], default_flow_style=False)
print(dump_all([orphans], default_flow_style=False))
return len(orphans)
def main():

View File

@ -20,7 +20,7 @@ from mock import patch
from tools.detect_targets import get_interface_version
class MbedLsToolsMock():
class MbedLsToolsMock(object):
"""
Mock of mbedls tools
"""

View File

@ -49,6 +49,7 @@ from ..config import (ConfigException, RAM_ALL_MEMORIES, ROM_ALL_MEMORIES)
from ..regions import (UPDATE_WHITELIST, merge_region_list)
from ..settings import COMPARE_FIXED
from ..settings import ARM_PATH, ARMC6_PATH, GCC_ARM_PATH, IAR_PATH
from future.utils import with_metaclass
TOOLCHAIN_PATHS = {
@ -109,7 +110,7 @@ CORTEX_SYMBOLS = {
}
class mbedToolchain:
class mbedToolchain(with_metaclass(ABCMeta, object)):
OFFICIALLY_SUPPORTED = False
# Verbose logging
@ -127,8 +128,6 @@ class mbedToolchain:
PROFILE_FILE_NAME = ".profile"
__metaclass__ = ABCMeta
profile_template = {'common': [], 'c': [], 'cxx': [], 'asm': [], 'ld': []}
def __init__(self, target, notify=None, macros=None, build_profile=None,