mbed-os/tools/test_mysql.py

272 lines
12 KiB
Python

"""
mbed SDK
Copyright (c) 2011-2014 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""
import re
import MySQLdb as mdb
# Imports from TEST API
from tools.test_db import BaseDBAccess
class MySQLDBAccess(BaseDBAccess):
""" Wrapper for MySQL DB access for common test suite interface
"""
def __init__(self):
BaseDBAccess.__init__(self)
self.DB_TYPE = 'mysql'
def detect_database(self, verbose=False):
""" detect database and return VERION data structure or string (verbose=True)
"""
query = 'SHOW VARIABLES LIKE "%version%"'
rows = self.select_all(query)
if verbose:
result = []
for row in rows:
result.append("\t%s: %s"% (row['Variable_name'], row['Value']))
result = "\n".join(result)
else:
result = rows
return result
def parse_db_connection_string(self, str):
""" Parsing SQL DB connection string. String should contain:
- DB Name, user name, password, URL (DB host), name
Function should return tuple with parsed (host, user, passwd, db) or None if error
E.g. connection string: 'mysql://username:password@127.0.0.1/db_name'
"""
result = BaseDBAccess().parse_db_connection_string(str)
if result is not None:
(db_type, username, password, host, db_name) = result
if db_type != 'mysql':
result = None
return result
def is_connected(self):
""" Returns True if we are connected to database
"""
return self.db_object is not None
def connect(self, host, user, passwd, db):
""" Connects to DB and returns DB object
"""
try:
self.db_object = mdb.connect(host=host, user=user, passwd=passwd, db=db)
# Let's remember connection credentials
self.db_type = self.DB_TYPE
self.host = host
self.user = user
self.passwd = passwd
self.db = db
except mdb.Error, e:
print "Error %d: %s"% (e.args[0], e.args[1])
self.db_object = None
self.db_type = None
self.host = None
self.user = None
self.passwd = None
self.db = None
def connect_url(self, db_url):
""" Connects to database using db_url (database url parsing),
store host, username, password, db_name
"""
result = self.parse_db_connection_string(db_url)
if result is not None:
(db_type, username, password, host, db_name) = result
if db_type == self.DB_TYPE:
self.connect(host, username, password, db_name)
def reconnect(self):
""" Reconnects to DB and returns DB object using stored host name,
database name and credentials (user name and password)
"""
self.connect(self.host, self.user, self.passwd, self.db)
def disconnect(self):
""" Close DB connection
"""
if self.db_object:
self.db_object.close()
self.db_object = None
self.db_type = None
def escape_string(self, str):
""" Escapes string so it can be put in SQL query between quotes
"""
con = self.db_object
result = con.escape_string(str)
return result if result else ''
def select_all(self, query):
""" Execute SELECT query and get all results
"""
con = self.db_object
cur = con.cursor(mdb.cursors.DictCursor)
cur.execute(query)
rows = cur.fetchall()
return rows
def insert(self, query, commit=True):
""" Execute INSERT query, define if you want to commit
"""
con = self.db_object
cur = con.cursor()
cur.execute(query)
if commit:
con.commit()
return cur.lastrowid
def get_next_build_id(self, name, desc='', location='', type=None, status=None):
""" Insert new build_id (DB unique build like ID number to send all test results)
"""
if status is None:
status = self.BUILD_ID_STATUS_STARTED
if type is None:
type = self.BUILD_ID_TYPE_TEST
query = """INSERT INTO `%s` (%s_name, %s_desc, %s_location, %s_type_fk, %s_status_fk)
VALUES ('%s', '%s', '%s', %d, %d)"""% (self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.TABLE_BUILD_ID,
self.escape_string(name),
self.escape_string(desc),
self.escape_string(location),
type,
status)
index = self.insert(query) # Provide inserted record PK
return index
def get_table_entry_pk(self, table, column, value, update_db=True):
""" Checks for entries in tables with two columns (<TABLE_NAME>_pk, <column>)
If update_db is True updates table entry if value in specified column doesn't exist
"""
# TODO: table buffering
result = None
table_pk = '%s_pk'% table
query = """SELECT `%s`
FROM `%s`
WHERE `%s`='%s'"""% (table_pk,
table,
column,
self.escape_string(value))
rows = self.select_all(query)
if len(rows) == 1:
result = rows[0][table_pk]
elif len(rows) == 0 and update_db:
# Update DB with new value
result = self.update_table_entry(table, column, value)
return result
def update_table_entry(self, table, column, value):
""" Updates table entry if value in specified column doesn't exist
Locks table to perform atomic read + update
"""
result = None
con = self.db_object
cur = con.cursor()
cur.execute("LOCK TABLES `%s` WRITE"% table)
table_pk = '%s_pk'% table
query = """SELECT `%s`
FROM `%s`
WHERE `%s`='%s'"""% (table_pk,
table,
column,
self.escape_string(value))
cur.execute(query)
rows = cur.fetchall()
if len(rows) == 0:
query = """INSERT INTO `%s` (%s)
VALUES ('%s')"""% (table,
column,
self.escape_string(value))
cur.execute(query)
result = cur.lastrowid
con.commit()
cur.execute("UNLOCK TABLES")
return result
def update_build_id_info(self, build_id, **kw):
""" Update additional data inside build_id table
Examples:
db.update_build_id_info(build_id, _status_fk=self.BUILD_ID_STATUS_COMPLETED, _shuffle_seed=0.0123456789):
"""
if len(kw):
con = self.db_object
cur = con.cursor()
# Prepare UPDATE query
# ["`mtest_build_id_pk`=[value-1]", "`mtest_build_id_name`=[value-2]", "`mtest_build_id_desc`=[value-3]"]
set_list = []
for col_sufix in kw:
assign_str = "`%s%s`='%s'"% (self.TABLE_BUILD_ID, col_sufix, self.escape_string(str(kw[col_sufix])))
set_list.append(assign_str)
set_str = ', '.join(set_list)
query = """UPDATE `%s`
SET %s
WHERE `mtest_build_id_pk`=%d"""% (self.TABLE_BUILD_ID,
set_str,
build_id)
cur.execute(query)
con.commit()
def insert_test_entry(self, build_id, target, toolchain, test_type, test_id, test_result, test_output, test_time, test_timeout, test_loop, test_extra=''):
""" Inserts test result entry to database. All checks regarding existing
toolchain names in DB are performed.
If some data is missing DB will be updated
"""
# Get all table FK and if entry is new try to insert new value
target_fk = self.get_table_entry_pk(self.TABLE_TARGET, self.TABLE_TARGET + '_name', target)
toolchain_fk = self.get_table_entry_pk(self.TABLE_TOOLCHAIN, self.TABLE_TOOLCHAIN + '_name', toolchain)
test_type_fk = self.get_table_entry_pk(self.TABLE_TEST_TYPE, self.TABLE_TEST_TYPE + '_name', test_type)
test_id_fk = self.get_table_entry_pk(self.TABLE_TEST_ID, self.TABLE_TEST_ID + '_name', test_id)
test_result_fk = self.get_table_entry_pk(self.TABLE_TEST_RESULT, self.TABLE_TEST_RESULT + '_name', test_result)
con = self.db_object
cur = con.cursor()
query = """ INSERT INTO `%s` (`mtest_build_id_fk`,
`mtest_target_fk`,
`mtest_toolchain_fk`,
`mtest_test_type_fk`,
`mtest_test_id_fk`,
`mtest_test_result_fk`,
`mtest_test_output`,
`mtest_test_time`,
`mtest_test_timeout`,
`mtest_test_loop_no`,
`mtest_test_result_extra`)
VALUES (%d, %d, %d, %d, %d, %d, '%s', %.2f, %.2f, %d, '%s')"""% (self.TABLE_TEST_ENTRY,
build_id,
target_fk,
toolchain_fk,
test_type_fk,
test_id_fk,
test_result_fk,
self.escape_string(test_output),
test_time,
test_timeout,
test_loop,
self.escape_string(test_extra))
cur.execute(query)
con.commit()