Remove database code from singletest api

pull/10254/head
Brian Daniels 2019-03-28 11:53:42 -05:00 committed by Jimmy Brisson
parent a40b27322e
commit 6d754fe96c
5 changed files with 1 additions and 555 deletions

View File

@ -38,9 +38,7 @@ Quick navigation:
| `test` | unit tests for tools |
| `test_api.py` | part of pre-greentea greentea |
| `test_configs` | configuration files used by `mbed test` |
| `test_db.py` | part of pre-greentea greentea |
| `test_exporters.py` | part of pre-greentea greentea |
| `test_mysql.py` | part of pre-greentea greentea |
| `tests.py` | implementation of `mbed test --greentea` |
| `toolchains` | API for calling the selected compiler |
| `utils.py` | General purpose utilities like file moving |

View File

@ -65,7 +65,6 @@ from tools.build_api import mcu_toolchain_matrix
# Imports from TEST API
from tools.test_api import SingleTestRunner
from tools.test_api import singletest_in_cli_mode
from tools.test_api import detect_database_verbose
from tools.test_api import get_json_data_from_file
from tools.test_api import get_avail_tests_summary_table
from tools.test_api import get_default_test_options_parser
@ -109,10 +108,6 @@ if __name__ == '__main__':
print "Version %d.%d"% get_version()
exit(0)
if opts.db_url and opts.verbose_test_configuration_only:
detect_database_verbose(opts.db_url)
exit(0)
# Print summary / information about automation test status
if opts.test_automation_report:
print get_avail_tests_summary_table(platform_filter=opts.general_filter_regex)
@ -227,7 +222,6 @@ if __name__ == '__main__':
_clean=opts.clean,
_parser=parser,
_opts=opts,
_opts_db_url=opts.db_url,
_opts_log_file_name=opts.log_file_name,
_opts_report_html_file_name=opts.report_html_file_name,
_opts_report_junit_file_name=opts.report_junit_file_name,

View File

@ -57,7 +57,6 @@ from tools.memap import MemapParser
from tools.targets import TARGET_MAP, Target
from tools.config import Config
import tools.test_configs as TestConfig
from tools.test_db import BaseDBAccess
from tools.build_api import build_project, build_mbed_libs, build_lib
from tools.build_api import get_target_supported_toolchains
from tools.build_api import write_build_report
@ -185,7 +184,6 @@ class SingleTestRunner(object):
_clean=False,
_parser=None,
_opts=None,
_opts_db_url=None,
_opts_log_file_name=None,
_opts_report_html_file_name=None,
_opts_report_junit_file_name=None,
@ -244,7 +242,6 @@ class SingleTestRunner(object):
self.test_spec = _test_spec
# Settings passed e.g. from command line
self.opts_db_url = _opts_db_url
self.opts_log_file_name = _opts_log_file_name
self.opts_report_html_file_name = _opts_report_html_file_name
self.opts_report_junit_file_name = _opts_report_junit_file_name
@ -284,21 +281,6 @@ class SingleTestRunner(object):
# File / screen logger initialization
self.logger = CLITestLogger(file_name=self.opts_log_file_name) # Default test logger
# Database related initializations
self.db_logger = factory_db_logger(self.opts_db_url)
self.db_logger_build_id = None # Build ID (database index of build_id table)
# Let's connect to database to set up credentials and confirm database is ready
if self.db_logger:
self.db_logger.connect_url(self.opts_db_url) # Save db access info inside db_logger object
if self.db_logger.is_connected():
# Get hostname and uname so we can use it as build description
# when creating new build_id in external database
(_hostname, _uname) = self.db_logger.get_hostname()
_host_location = os.path.dirname(os.path.abspath(__file__))
build_id_type = None if self.opts_only_build_tests is None else self.db_logger.BUILD_ID_TYPE_BUILD_ONLY
self.db_logger_build_id = self.db_logger.get_next_build_id(_hostname, desc=_uname, location=_host_location, type=build_id_type)
self.db_logger.disconnect()
def dump_options(self):
""" Function returns data structure with common settings passed to SingelTestRunner
It can be used for example to fill _extra fields in database storing test suite single run data
@ -307,8 +289,7 @@ class SingleTestRunner(object):
or
data_str = json.dumps(self.dump_options())
"""
result = {"db_url" : str(self.opts_db_url),
"log_file_name" : str(self.opts_log_file_name),
result = {"log_file_name" : str(self.opts_log_file_name),
"shuffle_test_order" : str(self.opts_shuffle_test_order),
"shuffle_test_seed" : str(self.opts_shuffle_test_seed),
"test_by_names" : str(self.opts_test_by_names),
@ -416,27 +397,6 @@ class SingleTestRunner(object):
if self.opts_shuffle_test_order:
random.shuffle(test_map_keys, self.shuffle_random_func)
# Update database with shuffle seed f applicable
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
self.db_logger.update_build_id_info(
self.db_logger_build_id,
_shuffle_seed=self.shuffle_random_func())
self.db_logger.disconnect();
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
# Update MUTs and Test Specification in database
self.db_logger.update_build_id_info(
self.db_logger_build_id,
_muts=self.muts, _test_spec=self.test_spec)
# Update Extra information in database (some options passed to test suite)
self.db_logger.update_build_id_info(
self.db_logger_build_id,
_extra=json.dumps(self.dump_options()))
self.db_logger.disconnect();
valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated)
skipped_test_map_keys = self.get_skipped_tests(test_map_keys, valid_test_map_keys)
@ -656,12 +616,6 @@ class SingleTestRunner(object):
self.execute_thread_slice(q, target, toolchains, clean, test_ids, self.build_report, self.build_properties)
q.get()
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
self.db_logger.update_build_id_info(self.db_logger_build_id, _status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED)
self.db_logger.disconnect();
return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext, self.build_report, self.build_properties
def get_valid_tests(self, test_map_keys, target, toolchain, test_ids, include_non_automated):
@ -885,9 +839,6 @@ class SingleTestRunner(object):
mcu = mut['mcu']
copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc.
if self.db_logger:
self.db_logger.reconnect()
selected_copy_method = self.opts_copy_method if copy_method is None else copy_method
# Tests can be looped so test results must be stored for the same test
@ -986,27 +937,10 @@ class SingleTestRunner(object):
single_test_result, target_name_unique, toolchain_name, test_id,
test_description, elapsed_time, single_timeout))
# Update database entries for ongoing test
if self.db_logger and self.db_logger.is_connected():
test_type = 'SingleTest'
self.db_logger.insert_test_entry(self.db_logger_build_id,
target_name,
toolchain_name,
test_type,
test_id,
single_test_result,
single_test_output,
elapsed_time,
single_timeout,
test_index)
# If we perform waterfall test we test until we get OK and we stop testing
if self.opts_waterfall_test and single_test_result == self.TEST_RESULT_OK:
break
if self.db_logger:
self.db_logger.disconnect()
return (self.shape_global_test_loop_result(test_all_result, self.opts_waterfall_test and self.opts_consolidate_waterfall_test),
target_name_unique,
toolchain_name,
@ -1658,46 +1592,6 @@ class CLITestLogger(TestLogger):
pass
return log_line_str
def factory_db_logger(db_url):
""" Factory database driver depending on database type supplied in database connection string db_url
"""
if db_url is not None:
from tools.test_mysql import MySQLDBAccess
connection_info = BaseDBAccess().parse_db_connection_string(db_url)
if connection_info is not None:
(db_type, username, password, host, db_name) = BaseDBAccess().parse_db_connection_string(db_url)
if db_type == 'mysql':
return MySQLDBAccess()
return None
def detect_database_verbose(db_url):
""" uses verbose mode (prints) database detection sequence to check it database connection string is valid
"""
result = BaseDBAccess().parse_db_connection_string(db_url)
if result is not None:
# Parsing passed
(db_type, username, password, host, db_name) = result
#print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result
# Let's try to connect
db_ = factory_db_logger(db_url)
if db_ is not None:
print("Connecting to database '%s'..." % db_url)
db_.connect(host, username, password, db_name)
if db_.is_connected():
print("ok")
print("Detecting database...")
print(db_.detect_database(verbose=True))
print("Disconnecting...")
db_.disconnect()
print("done")
else:
print("Database type '%s' unknown" % db_type)
else:
print("Parse error: '%s' - DB Url error" % db_url)
def get_module_avail(module_name):
""" This function returns True if module_name is already imported module
"""
@ -1987,10 +1881,6 @@ def get_default_test_options_parser():
type=int,
help='You can increase global timeout for each test by specifying additional test timeout in seconds')
parser.add_argument('--db',
dest='db_url',
help='This specifies what database test suite uses to store its state. To pass DB connection info use database connection string. Example: \'mysql://username:password@127.0.0.1/db_name\'')
parser.add_argument('-l', '--log',
dest='log_file_name',
help='Log events to external file (note not all console entries may be visible in log file)')

View File

@ -1,165 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2014 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""
import re
import json
class BaseDBAccess():
""" Class used to connect with test database and store test results
"""
def __init__(self):
self.db_object = None
self.db_type = None
# Connection credentials
self.host = None
self.user = None
self.passwd = None
self.db = None
# Test Suite DB scheme (table names)
self.TABLE_BUILD_ID = 'mtest_build_id'
self.TABLE_BUILD_ID_STATUS = 'mtest_build_id_status'
self.TABLE_BUILD_ID_TYPE = 'mtest_build_id_type'
self.TABLE_TARGET = 'mtest_target'
self.TABLE_TEST_ENTRY = 'mtest_test_entry'
self.TABLE_TEST_ID = 'mtest_test_id'
self.TABLE_TEST_RESULT = 'mtest_test_result'
self.TABLE_TEST_TYPE = 'mtest_test_type'
self.TABLE_TOOLCHAIN = 'mtest_toolchain'
# Build ID status PKs
self.BUILD_ID_STATUS_STARTED = 1 # Started
self.BUILD_ID_STATUS_IN_PROGRESS = 2 # In Progress
self.BUILD_ID_STATUS_COMPLETED = 3 #Completed
self.BUILD_ID_STATUS_FAILED = 4 # Failed
# Build ID type PKs
self.BUILD_ID_TYPE_TEST = 1 # Test
self.BUILD_ID_TYPE_BUILD_ONLY = 2 # Build Only
def get_hostname(self):
""" Useful when creating build_id in database
Function returns (hostname, uname) which can be used as (build_id_name, build_id_desc)
"""
# Get hostname from socket
import socket
hostname = socket.gethostbyaddr(socket.gethostname())[0]
# Get uname from platform resources
import platform
uname = json.dumps(platform.uname())
return (hostname, uname)
def get_db_type(self):
""" Returns database type. E.g. 'mysql', 'sqlLite' etc.
"""
return self.db_type
def detect_database(self, verbose=False):
""" detect database and return VERION data structure or string (verbose=True)
"""
return None
def parse_db_connection_string(self, str):
""" Parsing SQL DB connection string. String should contain:
- DB Name, user name, password, URL (DB host), name
Function should return tuple with parsed (db_type, username, password, host, db_name) or None if error
(db_type, username, password, host, db_name) = self.parse_db_connection_string(db_url)
E.g. connection string: 'mysql://username:password@127.0.0.1/db_name'
"""
result = None
if type(str) == type(''):
PATTERN = '^([\w]+)://([\w]+):([\w]*)@(.*)/([\w]+)'
result = re.match(PATTERN, str)
if result is not None:
result = result.groups() # Tuple (db_name, host, user, passwd, db)
return result # (db_type, username, password, host, db_name)
def is_connected(self):
""" Returns True if we are connected to database
"""
pass
def connect(self, host, user, passwd, db):
""" Connects to DB and returns DB object
"""
pass
def connect_url(self, db_url):
""" Connects to database using db_url (database url parsing),
store host, username, password, db_name
"""
pass
def reconnect(self):
""" Reconnects to DB and returns DB object using stored host name,
database name and credentials (user name and password)
"""
pass
def disconnect(self):
""" Close DB connection
"""
pass
def escape_string(self, str):
""" Escapes string so it can be put in SQL query between quotes
"""
pass
def select_all(self, query):
""" Execute SELECT query and get all results
"""
pass
def insert(self, query, commit=True):
""" Execute INSERT query, define if you want to commit
"""
pass
def get_next_build_id(self, name, desc='', location='', type=None, status=None):
""" Insert new build_id (DB unique build like ID number to send all test results)
"""
pass
def get_table_entry_pk(self, table, column, value, update_db=True):
""" Checks for entries in tables with two columns (<TABLE_NAME>_pk, <column>)
If update_db is True updates table entry if value in specified column doesn't exist
"""
pass
def update_table_entry(self, table, column, value):
""" Updates table entry if value in specified column doesn't exist
Locks table to perform atomic read + update
"""
pass
def update_build_id_info(self, build_id, **kw):
""" Update additional data inside build_id table
Examples:
db.update_build_is(build_id, _status_fk=self.BUILD_ID_STATUS_COMPLETED, _shuffle_seed=0.0123456789):
"""
pass
def insert_test_entry(self, build_id, target, toolchain, test_type, test_id, test_result, test_time, test_timeout, test_loop, test_extra=''):
""" Inserts test result entry to database. All checks regarding existing
toolchain names in DB are performed.
If some data is missing DB will be updated
"""
pass

View File

@ -1,271 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2014 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""
import re
import MySQLdb as mdb
# Imports from TEST API
from tools.test_db import BaseDBAccess
class MySQLDBAccess(BaseDBAccess):
""" Wrapper for MySQL DB access for common test suite interface
"""
def __init__(self):
BaseDBAccess.__init__(self)
self.DB_TYPE = 'mysql'
def detect_database(self, verbose=False):
""" detect database and return VERION data structure or string (verbose=True)
"""
query = 'SHOW VARIABLES LIKE "%version%"'
rows = self.select_all(query)
if verbose:
result = []
for row in rows:
result.append("\t%s: %s"% (row['Variable_name'], row['Value']))
result = "\n".join(result)
else:
result = rows
return result
def parse_db_connection_string(self, str):
""" Parsing SQL DB connection string. String should contain:
- DB Name, user name, password, URL (DB host), name
Function should return tuple with parsed (host, user, passwd, db) or None if error
E.g. connection string: 'mysql://username:password@127.0.0.1/db_name'
"""
result = BaseDBAccess().parse_db_connection_string(str)
if result is not None:
(db_type, username, password, host, db_name) = result
if db_type != 'mysql':
result = None
return result
def is_connected(self):
""" Returns True if we are connected to database
"""
return self.db_object is not None
def connect(self, host, user, passwd, db):
""" Connects to DB and returns DB object
"""
try:
self.db_object = mdb.connect(host=host, user=user, passwd=passwd, db=db)
# Let's remember connection credentials
self.db_type = self.DB_TYPE
self.host = host
self.user = user
self.passwd = passwd
self.db = db
except mdb.Error, e:
print "Error %d: %s"% (e.args[0], e.args[1])
self.db_object = None
self.db_type = None
self.host = None
self.user = None
self.passwd = None
self.db = None
def connect_url(self, db_url):
""" Connects to database using db_url (database url parsing),
store host, username, password, db_name
"""
result = self.parse_db_connection_string(db_url)
if result is not None:
(db_type, username, password, host, db_name) = result
if db_type == self.DB_TYPE:
self.connect(host, username, password, db_name)
def reconnect(self):
""" Reconnects to DB and returns DB object using stored host name,
database name and credentials (user name and password)
"""
self.connect(self.host, self.user, self.passwd, self.db)
def disconnect(self):
""" Close DB connection
"""
if self.db_object:
self.db_object.close()
self.db_object = None
self.db_type = None
def escape_string(self, str):
""" Escapes string so it can be put in SQL query between quotes
"""
con = self.db_object
result = con.escape_string(str)
return result if result else ''
def select_all(self, query):
""" Execute SELECT query and get all results
"""
con = self.db_object
cur = con.cursor(mdb.cursors.DictCursor)
cur.execute(query)
rows = cur.fetchall()
return rows
def insert(self, query, commit=True):
""" Execute INSERT query, define if you want to commit
"""
con = self.db_object
cur = con.cursor()
cur.execute(query)
if commit:
con.commit()
return cur.lastrowid
def get_next_build_id(self, name, desc='', location='', type=None, status=None):
""" Insert new build_id (DB unique build like ID number to send all test results)
"""
if status is None:
status = self.BUILD_ID_STATUS_STARTED
if type is None:
type = self.BUILD_ID_TYPE_TEST
query = """INSERT INTO `%s` (%s_name, %s_desc, %s_location, %s_type_fk, %s_status_fk)
VALUES ('%s', '%s', '%s', %d, %d)"""% (self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.escape_string(name),
self.escape_string(desc),
self.escape_string(location),
type,
status)
index = self.insert(query) # Provide inserted record PK
return index
def get_table_entry_pk(self, table, column, value, update_db=True):
""" Checks for entries in tables with two columns (<TABLE_NAME>_pk, <column>)
If update_db is True updates table entry if value in specified column doesn't exist
"""
# TODO: table buffering
result = None
table_pk = '%s_pk'% table
query = """SELECT `%s`
FROM `%s`
WHERE `%s`='%s'"""% (table_pk,
table,
column,
self.escape_string(value))
rows = self.select_all(query)
if len(rows) == 1:
result = rows[0][table_pk]
elif len(rows) == 0 and update_db:
# Update DB with new value
result = self.update_table_entry(table, column, value)
return result
def update_table_entry(self, table, column, value):
""" Updates table entry if value in specified column doesn't exist
Locks table to perform atomic read + update
"""
result = None
con = self.db_object
cur = con.cursor()
cur.execute("LOCK TABLES `%s` WRITE"% table)
table_pk = '%s_pk'% table
query = """SELECT `%s`
FROM `%s`
WHERE `%s`='%s'"""% (table_pk,
table,
column,
self.escape_string(value))
cur.execute(query)
rows = cur.fetchall()
if len(rows) == 0:
query = """INSERT INTO `%s` (%s)
VALUES ('%s')"""% (table,
column,
self.escape_string(value))
cur.execute(query)
result = cur.lastrowid
con.commit()
cur.execute("UNLOCK TABLES")
return result
def update_build_id_info(self, build_id, **kw):
""" Update additional data inside build_id table
Examples:
db.update_build_id_info(build_id, _status_fk=self.BUILD_ID_STATUS_COMPLETED, _shuffle_seed=0.0123456789):
"""
if len(kw):
con = self.db_object
cur = con.cursor()
# Prepare UPDATE query
# ["`mtest_build_id_pk`=[value-1]", "`mtest_build_id_name`=[value-2]", "`mtest_build_id_desc`=[value-3]"]
set_list = []
for col_sufix in kw:
assign_str = "`%s%s`='%s'"% (self.TABLE_BUILD_ID, col_sufix, self.escape_string(str(kw[col_sufix])))
set_list.append(assign_str)
set_str = ', '.join(set_list)
query = """UPDATE `%s`
SET %s
WHERE `mtest_build_id_pk`=%d"""% (self.TABLE_BUILD_ID,
set_str,
build_id)
cur.execute(query)
con.commit()
def insert_test_entry(self, build_id, target, toolchain, test_type, test_id, test_result, test_output, test_time, test_timeout, test_loop, test_extra=''):
""" Inserts test result entry to database. All checks regarding existing
toolchain names in DB are performed.
If some data is missing DB will be updated
"""
# Get all table FK and if entry is new try to insert new value
target_fk = self.get_table_entry_pk(self.TABLE_TARGET, self.TABLE_TARGET + '_name', target)
toolchain_fk = self.get_table_entry_pk(self.TABLE_TOOLCHAIN, self.TABLE_TOOLCHAIN + '_name', toolchain)
test_type_fk = self.get_table_entry_pk(self.TABLE_TEST_TYPE, self.TABLE_TEST_TYPE + '_name', test_type)
test_id_fk = self.get_table_entry_pk(self.TABLE_TEST_ID, self.TABLE_TEST_ID + '_name', test_id)
test_result_fk = self.get_table_entry_pk(self.TABLE_TEST_RESULT, self.TABLE_TEST_RESULT + '_name', test_result)
con = self.db_object
cur = con.cursor()
query = """ INSERT INTO `%s` (`mtest_build_id_fk`,
`mtest_target_fk`,
`mtest_toolchain_fk`,
`mtest_test_type_fk`,
`mtest_test_id_fk`,
`mtest_test_result_fk`,
`mtest_test_output`,
`mtest_test_time`,
`mtest_test_timeout`,
`mtest_test_loop_no`,
`mtest_test_result_extra`)
VALUES (%d, %d, %d, %d, %d, %d, '%s', %.2f, %.2f, %d, '%s')"""% (self.TABLE_TEST_ENTRY,
build_id,
target_fk,
toolchain_fk,
test_type_fk,
test_id_fk,
test_result_fk,
self.escape_string(test_output),
test_time,
test_timeout,
test_loop,
self.escape_string(test_extra))
cur.execute(query)
con.commit()