Stylize utils.py

pull/2458/head
Jimmy Brisson 2016-07-27 11:30:47 -05:00
parent b396436432
commit 99f1284a81
1 changed files with 238 additions and 106 deletions

View File

@ -21,18 +21,26 @@ import argparse
import math import math
from os import listdir, remove, makedirs from os import listdir, remove, makedirs
from shutil import copyfile from shutil import copyfile
from os.path import isdir, join, exists, split, relpath, splitext, abspath, commonprefix, normpath from os.path import isdir, join, exists, split, relpath, splitext, abspath
from os.path import commonprefix, normpath
from subprocess import Popen, PIPE, STDOUT, call from subprocess import Popen, PIPE, STDOUT, call
import json import json
from collections import OrderedDict from collections import OrderedDict
import logging import logging
def compile_worker(job): def compile_worker(job):
"""Standard task runner used for compiling
Positional argumets:
job - a dict containing a list of commands and the remaining arguments
to run_cmd
"""
results = [] results = []
for command in job['commands']: for command in job['commands']:
try: try:
_, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'], chroot=job['chroot']) _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'],
except KeyboardInterrupt as e: chroot=job['chroot'])
except KeyboardInterrupt:
raise ToolException raise ToolException
results.append({ results.append({
@ -48,96 +56,143 @@ def compile_worker(job):
'results': results 'results': results
} }
def cmd(l, check=True, verbose=False, shell=False, cwd=None): def cmd(command, check=True, verbose=False, shell=False, cwd=None):
text = l if shell else ' '.join(l) """A wrapper to run a command as a blocking job"""
text = command if shell else ' '.join(command)
if verbose: if verbose:
print text print text
rc = call(l, shell=shell, cwd=cwd) return_code = call(command, shell=shell, cwd=cwd)
if check and rc != 0: if check and return_code != 0:
raise Exception('ERROR %d: "%s"' % (rc, text)) raise Exception('ERROR %d: "%s"' % (return_code, text))
def run_cmd(command, work_dir=None, chroot=None, redirect=False): def run_cmd(command, work_dir=None, chroot=None, redirect=False):
"""Run a command in the forground
Positional arguments:
command - the command to run
Keyword arguments:
work_dir - the working directory to run the command in
chroot - the chroot to run the command in
redirect - redirect the stderr to a pipe to be used later
"""
if chroot: if chroot:
# Conventions managed by the web team for the mbed.org build system # Conventions managed by the web team for the mbed.org build system
chroot_cmd = [ chroot_cmd = [
'/usr/sbin/chroot', '--userspec=33:33', chroot '/usr/sbin/chroot', '--userspec=33:33', chroot
] ]
for c in command: for element in command:
chroot_cmd += [c.replace(chroot, '')] chroot_cmd += [element.replace(chroot, '')]
logging.debug("Running command %s"%' '.join(chroot_cmd)) logging.debug("Running command %s", ' '.join(chroot_cmd))
command = chroot_cmd command = chroot_cmd
work_dir = None work_dir = None
try: try:
p = Popen(command, stdout=PIPE, stderr=STDOUT if redirect else PIPE, cwd=work_dir) process = Popen(command, stdout=PIPE,
_stdout, _stderr = p.communicate() stderr=STDOUT if redirect else PIPE, cwd=work_dir)
except OSError as e: _stdout, _stderr = process.communicate()
except OSError:
print "[OS ERROR] Command: "+(' '.join(command)) print "[OS ERROR] Command: "+(' '.join(command))
raise raise
return _stdout, _stderr, p.returncode return _stdout, _stderr, process.returncode
def run_cmd_ext(command): def run_cmd_ext(command):
""" A version of run command that checks if the command exists befor running
Positional arguments:
command - the command line you are trying to invoke
"""
assert is_cmd_valid(command[0]) assert is_cmd_valid(command[0])
p = Popen(command, stdout=PIPE, stderr=PIPE) process = Popen(command, stdout=PIPE, stderr=PIPE)
_stdout, _stderr = p.communicate() _stdout, _stderr = process.communicate()
return _stdout, _stderr, p.returncode return _stdout, _stderr, process.returncode
def is_cmd_valid(cmd): def is_cmd_valid(command):
""" Verify that a command exists and is executable
Positional arguments:
command - the command to check
"""
caller = get_caller_name() caller = get_caller_name()
abspath = find_cmd_abspath(cmd) cmd_path = find_cmd_abspath(command)
if not abspath: if not cmd_path:
error("%s: Command '%s' can't be found" % (caller, cmd)) error("%s: Command '%s' can't be found" % (caller, command))
if not is_exec(abspath): if not is_exec(cmd_path):
error("%s: Command '%s' resolves to file '%s' which is not executable" % (caller, cmd, abspath)) error("%s: Command '%s' resolves to file '%s' which is not executable"
% (caller, command, cmd_path))
return True return True
def is_exec(path): def is_exec(path):
"""A simple check to verify that a path to an executable exists
Positional arguments:
path - the executable
"""
return os.access(path, os.X_OK) or os.access(path+'.exe', os.X_OK) return os.access(path, os.X_OK) or os.access(path+'.exe', os.X_OK)
def find_cmd_abspath(cmd): def find_cmd_abspath(command):
""" Returns the absolute path to a command. """ Returns the absolute path to a command.
None is returned if no absolute path was found. None is returned if no absolute path was found.
Positional arguhments:
command - the command to find the path of
""" """
if exists(cmd) or exists(cmd + '.exe'): if exists(command) or exists(command + '.exe'):
return os.path.abspath(cmd) return os.path.abspath(command)
if not 'PATH' in os.environ: if not 'PATH' in os.environ:
raise Exception("Can't find command path for current platform ('%s')" % sys.platform) raise Exception("Can't find command path for current platform ('%s')"
PATH=os.environ['PATH'] % sys.platform)
for path in PATH.split(os.pathsep): path_env = os.environ['PATH']
abspath = '%s/%s' % (path, cmd) for path in path_env.split(os.pathsep):
if exists(abspath) or exists(abspath + '.exe'): cmd_path = '%s/%s' % (path, command)
return abspath if exists(cmd_path) or exists(cmd_path + '.exe'):
return cmd_path
def mkdir(path): def mkdir(path):
""" a wrapped makedirs that only tries to create a directory if it does not
exist already
Positional arguments:
path - the path to maybe create
"""
if not exists(path): if not exists(path):
makedirs(path) makedirs(path)
def copy_file(src, dst): def copy_file(src, dst):
""" Implement the behaviour of "shutil.copy(src, dst)" without copying the """ Implement the behaviour of "shutil.copy(src, dst)" without copying the
permissions (this was causing errors with directories mounted with samba) permissions (this was causing errors with directories mounted with samba)
Positional arguments:
src - the source of the copy operation
dst - the destination of the copy operation
""" """
if isdir(dst): if isdir(dst):
_, file = split(src) _, base = split(src)
dst = join(dst, file) dst = join(dst, base)
copyfile(src, dst) copyfile(src, dst)
def delete_dir_files(dir): def delete_dir_files(directory):
if not exists(dir): """ A function that does rm -rf
Positional arguments:
directory - the directory to remove
"""
if not exists(directory):
return return
for f in listdir(dir): for element in listdir(directory):
file = join(dir, f) to_remove = join(directory, element)
if not isdir(file): if not isdir(to_remove):
remove(file) remove(file)
@ -145,34 +200,58 @@ def get_caller_name(steps=2):
""" """
When called inside a function, it returns the name When called inside a function, it returns the name
of the caller of that function. of the caller of that function.
Keyword arguments:
steps - the number of steps up the stack the calling function is
""" """
return inspect.stack()[steps][3] return inspect.stack()[steps][3]
def error(msg): def error(msg):
"""Fatal error, abort hard
Positional arguments:
msg - the message to print before crashing
"""
print("ERROR: %s" % msg) print("ERROR: %s" % msg)
sys.exit(1) sys.exit(1)
def rel_path(path, base, dot=False): def rel_path(path, base, dot=False):
p = relpath(path, base) """Relative path calculation that optionaly always starts with a dot
if dot and not p.startswith('.'):
p = './' + p Positional arguments:
return p path - the path to make relative
base - what to make the path relative to
Keyword arguments:
dot - if True, the path will always start with a './'
"""
final_path = relpath(path, base)
if dot and not final_path.startswith('.'):
final_path = './' + final_path
return final_path
class ToolException(Exception): class ToolException(Exception):
"""A class representing an exception throw by the tools"""
pass pass
class NotSupportedException(Exception): class NotSupportedException(Exception):
"""A class a toolchain not supporting a particular target"""
pass pass
class InvalidReleaseTargetException(Exception): class InvalidReleaseTargetException(Exception):
pass pass
def split_path(path): def split_path(path):
base, file = split(path) """spilt a file name into it's directory name, base name, and extension
name, ext = splitext(file)
Positional arguments:
path - the file name to split
"""
base, has_ext = split(path)
name, ext = splitext(has_ext)
return base, name, ext return base, name, ext
@ -181,12 +260,15 @@ def get_path_depth(path):
This roughly translates to the number of path separators (os.sep) + 1. This roughly translates to the number of path separators (os.sep) + 1.
Ex. Given "path/to/dir", this would return 3 Ex. Given "path/to/dir", this would return 3
Special cases: "." and "/" return 0 Special cases: "." and "/" return 0
Positional arguments:
path - the path to calculate the depth of
""" """
normalized_path = normpath(path) normalized_path = normpath(path)
path_depth = 0 path_depth = 0
head, tail = split(normalized_path) head, tail = split(normalized_path)
while(tail and tail != '.'): while tail and tail != '.':
path_depth += 1 path_depth += 1
head, tail = split(head) head, tail = split(head)
@ -194,18 +276,28 @@ def get_path_depth(path):
def args_error(parser, message): def args_error(parser, message):
"""Abort with an error that was generated by the arguments to a CLI program
Positional arguments:
parser - the ArgumentParser object that parsed the command line
message - what went wrong
"""
print "\n\n%s\n\n" % message print "\n\n%s\n\n" % message
parser.print_help() parser.print_help()
sys.exit() sys.exit()
def construct_enum(**enums): def construct_enum(**enums):
""" Create your own pseudo-enums """ """ Create your own pseudo-enums
Keyword arguments:
* - a member of the Enum you are creating and it's value
"""
return type('Enum', (), enums) return type('Enum', (), enums)
def check_required_modules(required_modules, verbose=True): def check_required_modules(required_modules, verbose=True):
""" Function checks for Python modules which should be "importable" (installed) """ Function checks for Python modules which should be "importable"
before test suite can be used. before test suite can be used.
@return returns True if all modules are installed already @return returns True if all modules are installed already
""" """
@ -214,63 +306,84 @@ def check_required_modules(required_modules, verbose=True):
for module_name in required_modules: for module_name in required_modules:
try: try:
imp.find_module(module_name) imp.find_module(module_name)
except ImportError as e: except ImportError:
# We also test against a rare case: module is an egg file # We also test against a rare case: module is an egg file
try: try:
__import__(module_name) __import__(module_name)
except ImportError as e: except ImportError as exc:
not_installed_modules.append(module_name) not_installed_modules.append(module_name)
if verbose: if verbose:
print "Error: %s" % e print "Error: %s" % exc
if verbose: if verbose:
if not_installed_modules: if not_installed_modules:
print "Warning: Module(s) %s not installed. Please install required module(s) before using this script."% (', '.join(not_installed_modules)) print ("Warning: Module(s) %s not installed. Please install " + \
"required module(s) before using this script.")\
% (', '.join(not_installed_modules))
if not_installed_modules: if not_installed_modules:
return False return False
else: else:
return True return True
# Utility function: traverse a dictionary and change all the strings in the dictionary to def dict_to_ascii(dictionary):
# ASCII from Unicode. Useful when reading ASCII JSON data, because the JSON decoder always """ Utility function: traverse a dictionary and change all the strings in
# returns Unicode string. the dictionary to ASCII from Unicode. Useful when reading ASCII JSON data,
# Based on http://stackoverflow.com/a/13105359 because the JSON decoder always returns Unicode string. Based on
def dict_to_ascii(input): http://stackoverflow.com/a/13105359
if isinstance(input, dict):
return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value)) for key, value in input.iteritems()]) Positional arguments:
elif isinstance(input, list): dictionary - The dict that contains some Unicode that should be ASCII
return [dict_to_ascii(element) for element in input] """
elif isinstance(input, unicode): if isinstance(dictionary, dict):
return input.encode('ascii') return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value))
else: for key, value in dictionary.iteritems()])
return input elif isinstance(dictionary, list):
return [dict_to_ascii(element) for element in dictionary]
elif isinstance(dictionary, unicode):
return dictionary.encode('ascii')
else:
return dictionary
# Read a JSON file and return its Python representation, transforming all the strings from Unicode
# to ASCII. The order of keys in the JSON file is preserved.
def json_file_to_dict(fname): def json_file_to_dict(fname):
""" Read a JSON file and return its Python representation, transforming all
the strings from Unicode to ASCII. The order of keys in the JSON file is
preserved.
Positional arguments:
fname - the name of the file to parse
"""
try: try:
with open(fname, "rt") as f: with open(fname, "r") as file_obj:
return dict_to_ascii(json.load(f, object_pairs_hook=OrderedDict)) return dict_to_ascii(json.load(file_obj,
object_pairs_hook=OrderedDict))
except (ValueError, IOError): except (ValueError, IOError):
sys.stderr.write("Error parsing '%s':\n" % fname) sys.stderr.write("Error parsing '%s':\n" % fname)
raise raise
# Wowza, double closure # Wowza, double closure
def argparse_type(casedness, prefer_hyphen=False) : def argparse_type(casedness, prefer_hyphen=False):
def middle(list, type_name): def middle(lst, type_name):
# validate that an argument passed in (as string) is a member of the list of possible
# arguments. Offer a suggestion if the case of the string, or the hyphens/underscores
# do not match the expected style of the argument.
def parse_type(string): def parse_type(string):
if prefer_hyphen: newstring = casedness(string).replace("_","-") """ validate that an argument passed in (as string) is a member of
else: newstring = casedness(string).replace("-","_") the list of possible arguments. Offer a suggestion if the case of
if string in list: the string, or the hyphens/underscores do not match the expected
return string style of the argument.
elif string not in list and newstring in list: """
raise argparse.ArgumentTypeError("{0} is not a supported {1}. Did you mean {2}?".format(string, type_name, newstring)) if prefer_hyphen:
newstring = casedness(string).replace("_", "-")
else: else:
raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list))) newstring = casedness(string).replace("-", "_")
if string in lst:
return string
elif string not in lst and newstring in lst:
raise argparse.ArgumentTypeError(
"{0} is not a supported {1}. Did you mean {2}?".format(
string, type_name, newstring))
else:
raise argparse.ArgumentTypeError(
"{0} is not a supported {1}. Supported {1}s are:\n{2}".
format(string, type_name, columnate(lst)))
return parse_type return parse_type
return middle return middle
@ -281,15 +394,19 @@ argparse_uppercase_hyphen_type = argparse_type(str.upper, True)
argparse_lowercase_hyphen_type = argparse_type(str.lower, True) argparse_lowercase_hyphen_type = argparse_type(str.lower, True)
def argparse_force_type(case): def argparse_force_type(case):
def middle(list, type_name): """ validate that an argument passed in (as string) is a member of the list
# validate that an argument passed in (as string) is a member of the list of possible of possible arguments after converting it's case.
# arguments after converting it's case. Offer a suggestion if the hyphens/underscores """
# do not match the expected style of the argument. def middle(lst, type_name):
""" The parser type generator"""
def parse_type(string): def parse_type(string):
for option in list: """ The parser type"""
for option in lst:
if case(string) == case(option): if case(string) == case(option):
return option return option
raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list))) raise argparse.ArgumentTypeError(
"{0} is not a supported {1}. Supported {1}s are:\n{2}".
format(string, type_name, columnate(lst)))
return parse_type return parse_type
return middle return middle
@ -297,30 +414,42 @@ def argparse_force_type(case):
argparse_force_uppercase_type = argparse_force_type(str.upper) argparse_force_uppercase_type = argparse_force_type(str.upper)
argparse_force_lowercase_type = argparse_force_type(str.lower) argparse_force_lowercase_type = argparse_force_type(str.lower)
# An argument parser combinator that takes in an argument parser and creates a new parser that def argparse_many(func):
# accepts a comma separated list of the same thing. """ An argument parser combinator that takes in an argument parser and
def argparse_many(fn): creates a new parser that accepts a comma separated list of the same thing.
"""
def wrap(string): def wrap(string):
return [fn(s) for s in string.split(",")] """ The actual parser"""
return [func(s) for s in string.split(",")]
return wrap return wrap
# An argument parser that verifies that a string passed in corresponds to a file def argparse_filestring_type(string):
def argparse_filestring_type(string) : """ An argument parser that verifies that a string passed in corresponds
if exists(string) : to a file"""
if exists(string):
return string return string
else : else:
raise argparse.ArgumentTypeError("{0}"" does not exist in the filesystem.".format(string)) raise argparse.ArgumentTypeError(
"{0}"" does not exist in the filesystem.".format(string))
# render a list of strings as a in a bunch of columns def columnate(strings, separator=", ", chars=80):
def columnate(strings, seperator=", ", chars=80): """ render a list of strings as a in a bunch of columns
Positional arguments:
strings - the strings to columnate
Keyword arguments;
separator - the separation between the columns
chars - the maximum with of a row
"""
col_width = max(len(s) for s in strings) col_width = max(len(s) for s in strings)
total_width = col_width + len(seperator) total_width = col_width + len(separator)
columns = math.floor(chars / total_width) columns = math.floor(chars / total_width)
output = "" output = ""
for i, s in zip(range(len(strings)), strings): for i, string in zip(range(len(strings)), strings):
append = s append = string
if i != len(strings) - 1: if i != len(strings) - 1:
append += seperator append += separator
if i % columns == columns - 1: if i % columns == columns - 1:
append += "\n" append += "\n"
else: else:
@ -328,13 +457,16 @@ def columnate(strings, seperator=", ", chars=80):
output += append output += append
return output return output
# fail if argument provided is a parent of the specified directory
def argparse_dir_not_parent(other): def argparse_dir_not_parent(other):
"""fail if argument provided is a parent of the specified directory"""
def parse_type(not_parent): def parse_type(not_parent):
"""The parser type"""
abs_other = abspath(other) abs_other = abspath(other)
abs_not_parent = abspath(not_parent) abs_not_parent = abspath(not_parent)
if abs_not_parent == commonprefix([abs_not_parent, abs_other]): if abs_not_parent == commonprefix([abs_not_parent, abs_other]):
raise argparse.ArgumentTypeError("{0} may not be a parent directory of {1}".format(not_parent, other)) raise argparse.ArgumentTypeError(
"{0} may not be a parent directory of {1}".format(
not_parent, other))
else: else:
return not_parent return not_parent
return parse_type return parse_type