Use different folders for pg vs. ppas RE-SQL tests. Fall back to the tests folder without a subdirectory if neither exists.

pull/25/head
Akshay Joshi 2019-07-05 14:40:51 +01:00 committed by Dave Page
parent e4bf52f355
commit 11a3aa56be
27 changed files with 984 additions and 74 deletions


@@ -98,7 +98,10 @@
},{
"type": "delete",
"name": "Drop FDW",
"endpoint": "NODE-foreign_data_wrapper.delete_id"
"endpoint": "NODE-foreign_data_wrapper.delete_id",
"data": {
"name": "Fdw2_$%{}[]()&*^!@\"'`\\/#"
}
}
]
}


@@ -98,7 +98,10 @@
},{
"type": "delete",
"name": "Drop FDW",
"endpoint": "NODE-foreign_data_wrapper.delete_id"
"endpoint": "NODE-foreign_data_wrapper.delete_id",
"data": {
"name": "Fdw2_$%{}[]()&*^!@\"'`\\/#"
}
}
]
}


@@ -1,9 +1,4 @@
{
"prerequisite": {
"minVer": 90400,
"maxVer": null,
"type": "ppas"
},
"scenarios": [
{
"type": "create",

File diff suppressed because one or more lines are too long

(Binary image file added: 14 KiB)


@@ -0,0 +1,41 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
# Utility functions used by tests
# Executes a query and polls for the results, then returns them
def execute_query(tester, query, start_query_tool_url, poll_url):
# Start query tool and execute sql
response = tester.post(start_query_tool_url,
data=json.dumps({"sql": query}),
content_type='html/json')
if response.status_code != 200:
return False, None
# Poll for results
return poll_for_query_results(tester=tester, poll_url=poll_url)
# Polls for the result of an executed query
def poll_for_query_results(tester, poll_url):
# Poll for results until they are successful
while True:
response = tester.get(poll_url)
if response.status_code != 200:
return False, None
response_data = json.loads(response.data.decode('utf-8'))
status = response_data['data']['status']
if status == 'Success':
return True, response_data
elif status == 'NotConnected' or status == 'Cancel':
return False, None
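
As a rough usage sketch (not part of the patch), the helpers above are driven like this from a test case; the endpoint URLs mirror the ones the new tests build in their setUp, while trans_id and tester are assumed to be supplied by the test framework:

# Hypothetical usage of execute_query(); the transaction id and the Flask
# test client come from the surrounding test case's setUp.
start_query_tool_url = '/sqleditor/query_tool/start/{0}'.format(trans_id)
poll_url = '/sqleditor/poll/{0}'.format(trans_id)
is_success, response_data = execute_query(
    tester=tester,
    query='SELECT 1;',
    start_query_tool_url=start_query_tool_url,
    poll_url=poll_url)
if is_success:
    # The decoded poll response carries the result rows and, after this
    # commit, the primary_keys of an updatable result-set.
    rows = response_data['data']['result']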


@@ -0,0 +1,125 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from .execute_query_utils import execute_query
class TestQueryUpdatableResultset(BaseTestGenerator):
""" This class will test the detection of whether the query
result-set is updatable. """
scenarios = [
('When selecting all columns of the table', dict(
sql='SELECT * FROM test_for_updatable_resultset;',
primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
}
)),
('When selecting all primary keys of the table', dict(
sql='SELECT pk_col1, pk_col2 FROM test_for_updatable_resultset;',
primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
}
)),
('When selecting some of the primary keys of the table', dict(
sql='SELECT pk_col2 FROM test_for_updatable_resultset;',
primary_keys=None
)),
('When selecting none of the primary keys of the table', dict(
sql='SELECT normal_col1 FROM test_for_updatable_resultset;',
primary_keys=None
)),
('When renaming a primary key', dict(
sql='SELECT pk_col1 as some_col, '
'pk_col2 FROM test_for_updatable_resultset;',
primary_keys=None
)),
('When renaming a column to a primary key name', dict(
sql='SELECT pk_col1, pk_col2, normal_col1 as pk_col1 '
'FROM test_for_updatable_resultset;',
primary_keys=None
))
]
def setUp(self):
self._initialize_database_connection()
self._initialize_query_tool()
self._initialize_urls()
self._create_test_table()
def runTest(self):
is_success, response_data = \
execute_query(tester=self.tester,
query=self.sql,
poll_url=self.poll_url,
start_query_tool_url=self.start_query_tool_url)
self.assertEquals(is_success, True)
# Check primary keys
primary_keys = response_data['data']['primary_keys']
self.assertEquals(primary_keys, self.primary_keys)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)
def _initialize_database_connection(self):
database_info = parent_node_dict["database"][-1]
self.server_id = database_info["server_id"]
self.db_id = database_info["db_id"]
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
def _initialize_query_tool(self):
url = '/datagrid/initialize/query_tool/{0}/{1}/{2}'.format(
utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
self.trans_id = response_data['data']['gridTransId']
def _initialize_urls(self):
self.start_query_tool_url = \
'/sqleditor/query_tool/start/{0}'.format(self.trans_id)
self.poll_url = '/sqleditor/poll/{0}'.format(self.trans_id)
def _create_test_table(self):
create_sql = """
DROP TABLE IF EXISTS test_for_updatable_resultset;
CREATE TABLE test_for_updatable_resultset(
pk_col1 SERIAL,
pk_col2 SERIAL,
normal_col1 VARCHAR,
normal_col2 VARCHAR,
PRIMARY KEY(pk_col1, pk_col2)
);
"""
is_success, _ = \
execute_query(tester=self.tester,
query=create_sql,
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)


@@ -0,0 +1,347 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from .execute_query_utils import execute_query
class TestSaveChangedData(BaseTestGenerator):
""" This class tests saving data changes in the grid to the database """
scenarios = [
('When inserting new valid row', dict(
save_payload={
"updated": {},
"added": {
"2": {
"err": False,
"data": {
"pk_col": "3",
"__temp_PK": "2",
"normal_col": "three"
}
}
},
"staged_rows": {},
"deleted": {},
"updated_index": {},
"added_index": {"2": "2"},
"columns": [
{
"name": "pk_col",
"display_name": "pk_col",
"column_type": "[PK] integer",
"column_type_internal": "integer",
"pos": 0,
"label": "pk_col<br>[PK] integer",
"cell": "number",
"can_edit": True,
"type": "integer",
"not_null": True,
"has_default_val": False,
"is_array": False},
{"name": "normal_col",
"display_name": "normal_col",
"column_type": "character varying",
"column_type_internal": "character varying",
"pos": 1,
"label": "normal_col<br>character varying",
"cell": "string",
"can_edit": True,
"type": "character varying",
"not_null": False,
"has_default_val": False,
"is_array": False}
]
},
save_status=True,
check_sql='SELECT * FROM test_for_save_data WHERE pk_col = 3',
check_result=[[3, "three"]]
)),
('When inserting new invalid row', dict(
save_payload={
"updated": {},
"added": {
"2": {
"err": False,
"data": {
"pk_col": "1",
"__temp_PK": "2",
"normal_col": "four"
}
}
},
"staged_rows": {},
"deleted": {},
"updated_index": {},
"added_index": {"2": "2"},
"columns": [
{
"name": "pk_col",
"display_name": "pk_col",
"column_type": "[PK] integer",
"column_type_internal": "integer",
"pos": 0,
"label": "pk_col<br>[PK] integer",
"cell": "number",
"can_edit": True,
"type": "integer",
"not_null": True,
"has_default_val": False,
"is_array": False},
{"name": "normal_col",
"display_name": "normal_col",
"column_type": "character varying",
"column_type_internal": "character varying",
"pos": 1,
"label": "normal_col<br>character varying",
"cell": "string",
"can_edit": True,
"type": "character varying",
"not_null": False,
"has_default_val": False,
"is_array": False}
]
},
save_status=False,
check_sql=None,
check_result=None
)),
('When updating a row in a valid way', dict(
save_payload={
"updated": {
"1":
{"err": False,
"data": {"normal_col": "ONE"},
"primary_keys":
{"pk_col": 1}
}
},
"added": {},
"staged_rows": {},
"deleted": {},
"updated_index": {"1": "1"},
"added_index": {},
"columns": [
{
"name": "pk_col",
"display_name": "pk_col",
"column_type": "[PK] integer",
"column_type_internal": "integer",
"pos": 0,
"label": "pk_col<br>[PK] integer",
"cell": "number",
"can_edit": True,
"type": "integer",
"not_null": True,
"has_default_val": False,
"is_array": False},
{"name": "normal_col",
"display_name": "normal_col",
"column_type": "character varying",
"column_type_internal": "character varying",
"pos": 1,
"label": "normal_col<br>character varying",
"cell": "string",
"can_edit": True,
"type": "character varying",
"not_null": False,
"has_default_val": False,
"is_array": False}
]
},
save_status=True,
check_sql='SELECT * FROM test_for_save_data WHERE pk_col = 1',
check_result=[[1, "ONE"]]
)),
('When updating a row in an invalid way', dict(
save_payload={
"updated": {
"1":
{"err": False,
"data": {"pk_col": "2"},
"primary_keys":
{"pk_col": 1}
}
},
"added": {},
"staged_rows": {},
"deleted": {},
"updated_index": {"1": "1"},
"added_index": {},
"columns": [
{
"name": "pk_col",
"display_name": "pk_col",
"column_type": "[PK] integer",
"column_type_internal": "integer",
"pos": 0,
"label": "pk_col<br>[PK] integer",
"cell": "number",
"can_edit": True,
"type": "integer",
"not_null": True,
"has_default_val": False,
"is_array": False},
{"name": "normal_col",
"display_name": "normal_col",
"column_type": "character varying",
"column_type_internal": "character varying",
"pos": 1,
"label": "normal_col<br>character varying",
"cell": "string",
"can_edit": True,
"type": "character varying",
"not_null": False,
"has_default_val": False,
"is_array": False}
]
},
save_status=False,
check_sql=None,
check_result=None
)),
('When deleting a row', dict(
save_payload={
"updated": {},
"added": {},
"staged_rows": {"1": {"pk_col": 2}},
"deleted": {"1": {"pk_col": 2}},
"updated_index": {},
"added_index": {},
"columns": [
{
"name": "pk_col",
"display_name": "pk_col",
"column_type": "[PK] integer",
"column_type_internal": "integer",
"pos": 0,
"label": "pk_col<br>[PK] integer",
"cell": "number",
"can_edit": True,
"type": "integer",
"not_null": True,
"has_default_val": False,
"is_array": False},
{"name": "normal_col",
"display_name": "normal_col",
"column_type": "character varying",
"column_type_internal": "character varying",
"pos": 1,
"label": "normal_col<br>character varying",
"cell": "string",
"can_edit": True,
"type": "character varying",
"not_null": False,
"has_default_val": False,
"is_array": False}
]
},
save_status=True,
check_sql='SELECT * FROM test_for_save_data WHERE pk_col = 2',
check_result='SELECT 0'
)),
]
def setUp(self):
self._initialize_database_connection()
self._initialize_query_tool()
self._initialize_urls_and_select_sql()
self._create_test_table()
def runTest(self):
# Execute select sql
is_success, _ = \
execute_query(tester=self.tester,
query=self.select_sql,
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)
# Send a request to save changed data
response = self.tester.post(self.save_url,
data=json.dumps(self.save_payload),
content_type='html/json')
self.assertEquals(response.status_code, 200)
# Check that the save is successful
response_data = json.loads(response.data.decode('utf-8'))
save_status = response_data['data']['status']
self.assertEquals(save_status, self.save_status)
if self.check_sql:
# Execute check sql
is_success, response_data = \
execute_query(tester=self.tester,
query=self.check_sql,
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)
# Check table for updates
result = response_data['data']['result']
self.assertEquals(result, self.check_result)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)
def _initialize_database_connection(self):
database_info = parent_node_dict["database"][-1]
self.server_id = database_info["server_id"]
self.db_id = database_info["db_id"]
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
def _initialize_query_tool(self):
url = '/datagrid/initialize/query_tool/{0}/{1}/{2}'.format(
utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
self.trans_id = response_data['data']['gridTransId']
def _initialize_urls_and_select_sql(self):
self.start_query_tool_url = \
'/sqleditor/query_tool/start/{0}'.format(self.trans_id)
self.save_url = '/sqleditor/save/{0}'.format(self.trans_id)
self.poll_url = '/sqleditor/poll/{0}'.format(self.trans_id)
self.select_sql = 'SELECT * FROM test_for_save_data;'
def _create_test_table(self):
create_sql = """
DROP TABLE IF EXISTS test_for_save_data;
CREATE TABLE test_for_save_data(
pk_col INT PRIMARY KEY,
normal_col VARCHAR);
INSERT INTO test_for_save_data VALUES
(1, 'one'),
(2, 'two');
"""
is_success, _ = \
execute_query(tester=self.tester,
query=create_sql,
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)


@@ -0,0 +1,120 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Check if the result-set of a query is updatable. A result-set is
updatable (as of this version) if:
- All columns belong to the same table.
- All the primary key columns of the table are present in the resultset
- No duplicate columns
"""
from flask import render_template
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
def is_query_resultset_updatable(conn, sql_path):
"""
This function is used to check whether the last successful query
produced updatable results.
Args:
conn: Connection object.
sql_path: the path to the sql templates.
"""
columns_info = conn.get_column_info()
if columns_info is None or len(columns_info) < 1:
return return_not_updatable()
table_oid = _check_single_table(columns_info)
if not table_oid:
return return_not_updatable()
if not _check_duplicate_columns(columns_info):
return return_not_updatable()
if conn.connected():
primary_keys, primary_keys_columns, pk_names = \
_get_primary_keys(conn=conn,
table_oid=table_oid,
sql_path=sql_path)
if not _check_primary_keys_uniquely_exist(primary_keys_columns,
columns_info):
return return_not_updatable()
return True, primary_keys, pk_names, table_oid
else:
return return_not_updatable()
def _check_single_table(columns_info):
table_oid = columns_info[0]['table_oid']
for column in columns_info:
if column['table_oid'] != table_oid:
return None
return table_oid
def _check_duplicate_columns(columns_info):
column_numbers = \
[col['table_column'] for col in columns_info]
is_duplicate_columns = len(column_numbers) != len(set(column_numbers))
if is_duplicate_columns:
return False
return True
def _check_primary_keys_uniquely_exist(primary_keys_columns, columns_info):
for pk in primary_keys_columns:
pk_exists = False
for col in columns_info:
if col['table_column'] == pk['column_number']:
pk_exists = True
# If the primary key column is renamed
if col['display_name'] != pk['name']:
return False
# If a normal column is renamed to a primary key column name
elif col['display_name'] == pk['name']:
return False
if not pk_exists:
return False
return True
def _get_primary_keys(sql_path, table_oid, conn):
query = render_template(
"/".join([sql_path, 'primary_keys.sql']),
obj_id=table_oid
)
status, result = conn.execute_dict(query)
if not status:
return return_not_updatable()
primary_keys_columns = []
primary_keys = OrderedDict()
pk_names = []
for row in result['rows']:
primary_keys[row['attname']] = row['typname']
primary_keys_columns.append({
'name': row['attname'],
'column_number': row['attnum']
})
pk_names.append(row['attname'])
return primary_keys, primary_keys_columns, pk_names
def return_not_updatable():
return False, None, None, None
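
To make the rules from the module docstring concrete, here is a self-contained sketch that applies the same three checks to plain dicts; the function name and the dict shapes are illustrative only, not pgAdmin's internal API:

def is_resultset_updatable(columns_info, primary_key_columns):
    """columns_info: [{'table_oid', 'table_column', 'display_name'}, ...]
    primary_key_columns: [{'name', 'column_number'}, ...]"""
    if not columns_info:
        return False
    # Rule 1: every column must come from the same table
    table_oid = columns_info[0]['table_oid']
    if any(col['table_oid'] != table_oid for col in columns_info):
        return False
    # Rule 2: no source column may appear twice in the result-set
    column_numbers = [col['table_column'] for col in columns_info]
    if len(column_numbers) != len(set(column_numbers)):
        return False
    # Rule 3: every primary key column is present, not renamed, and no
    # other column has taken a primary key's name
    for pk in primary_key_columns:
        matches = [col for col in columns_info
                   if col['table_column'] == pk['column_number']]
        if not matches or matches[0]['display_name'] != pk['name']:
            return False
        if any(col['display_name'] == pk['name'] and
               col['table_column'] != pk['column_number']
               for col in columns_info):
            return False
    return True

For example, a query selecting only pk_col2 from test_for_updatable_resultset would fail the primary-key check because pk_col1 is missing, matching the scenarios in the test class above.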


@@ -0,0 +1,317 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from flask import render_template
from pgadmin.tools.sqleditor.utils.constant_definition import TX_STATUS_IDLE
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
def save_changed_data(changed_data, columns_info, conn, command_obj,
client_primary_key, auto_commit=True):
"""
This function is used to save the data into the database.
Depending on condition it will either update or insert the
new row into the database.
Args:
changed_data: Contains data to be saved
command_obj: The transaction object (command_obj or trans_obj)
conn: The connection object
columns_info: session_obj['columns_info']
client_primary_key: session_obj['client_primary_key']
auto_commit: If the changes should be committed automatically.
"""
status = False
res = None
query_res = dict()
count = 0
list_of_rowid = []
operations = ('added', 'updated', 'deleted')
list_of_sql = {}
_rowid = None
is_commit_required = False
pgadmin_alias = {
col_name: col_info['pgadmin_alias']
for col_name, col_info in columns_info.items()
}
if conn.connected():
is_savepoint = False
# Start the transaction if the session is idle
if conn.transaction_status() == TX_STATUS_IDLE:
conn.execute_void('BEGIN;')
else:
conn.execute_void('SAVEPOINT save_data;')
is_savepoint = True
# Iterate total number of records to be updated/inserted
for of_type in changed_data:
# No need to go further if it's not an add/update/delete operation
if of_type not in operations:
continue
# If there is no data to be saved then continue
if len(changed_data[of_type]) < 1:
continue
column_type = {}
column_data = {}
for each_col in columns_info:
if (
columns_info[each_col]['not_null'] and
not columns_info[each_col]['has_default_val']
):
column_data[each_col] = None
column_type[each_col] = \
columns_info[each_col]['type_name']
else:
column_type[each_col] = \
columns_info[each_col]['type_name']
# For newly added rows
if of_type == 'added':
# A Python dict does not preserve insertion order, so to insert
# the rows in order we build an ordered list of the added
# indexes. We don't need this mechanism for updated/deleted
# rows, as order does not matter for those operations.
added_index = OrderedDict(
sorted(
changed_data['added_index'].items(),
key=lambda x: int(x[0])
)
)
list_of_sql[of_type] = []
# When new rows are added, only the changed columns' data is
# sent from the client side. If a column is not-null and has
# no default value, set the column to blank instead of the
# NULL it would otherwise receive by default.
column_data = {}
pk_names, primary_keys = command_obj.get_primary_keys()
has_oids = 'oid' in column_type
for each_row in added_index:
# Get the row index to match with the added rows
# dict key
tmp_row_index = added_index[each_row]
data = changed_data[of_type][tmp_row_index]['data']
# Remove our unique tracking key
data.pop(client_primary_key, None)
data.pop('is_row_copied', None)
list_of_rowid.append(data.get(client_primary_key))
# Update columns value with columns having
# not_null=False and has no default value
column_data.update(data)
sql = render_template(
"/".join([command_obj.sql_path, 'insert.sql']),
data_to_be_saved=column_data,
pgadmin_alias=pgadmin_alias,
primary_keys=None,
object_name=command_obj.object_name,
nsp_name=command_obj.nsp_name,
data_type=column_type,
pk_names=pk_names,
has_oids=has_oids
)
select_sql = render_template(
"/".join([command_obj.sql_path, 'select.sql']),
object_name=command_obj.object_name,
nsp_name=command_obj.nsp_name,
primary_keys=primary_keys,
has_oids=has_oids
)
list_of_sql[of_type].append({
'sql': sql, 'data': data,
'client_row': tmp_row_index,
'select_sql': select_sql
})
# Reset column data
column_data = {}
# For updated rows
elif of_type == 'updated':
list_of_sql[of_type] = []
for each_row in changed_data[of_type]:
data = changed_data[of_type][each_row]['data']
pk_escaped = {
pk: pk_val.replace('%', '%%') if hasattr(
pk_val, 'replace') else pk_val
for pk, pk_val in
changed_data[of_type][each_row]['primary_keys'].items()
}
sql = render_template(
"/".join([command_obj.sql_path, 'update.sql']),
data_to_be_saved=data,
pgadmin_alias=pgadmin_alias,
primary_keys=pk_escaped,
object_name=command_obj.object_name,
nsp_name=command_obj.nsp_name,
data_type=column_type
)
list_of_sql[of_type].append({'sql': sql, 'data': data})
list_of_rowid.append(data.get(client_primary_key))
# For deleted rows
elif of_type == 'deleted':
list_of_sql[of_type] = []
is_first = True
rows_to_delete = []
keys = None
no_of_keys = None
for each_row in changed_data[of_type]:
rows_to_delete.append(changed_data[of_type][each_row])
# Fetch the keys for SQL generation
if is_first:
# We need to convert dict_keys to a normal list in
# Python 3 (in Python 2 it is already a list). We will
# also fetch the column names using the index.
keys = list(
changed_data[of_type][each_row].keys()
)
no_of_keys = len(keys)
is_first = False
# Map index with column name for each row
for row in rows_to_delete:
for k, v in row.items():
# Set the primary key using its label and delete the
# index-based mapped key
try:
row[changed_data['columns']
[int(k)]['name']] = v
except ValueError:
continue
del row[k]
sql = render_template(
"/".join([command_obj.sql_path, 'delete.sql']),
data=rows_to_delete,
primary_key_labels=keys,
no_of_keys=no_of_keys,
object_name=command_obj.object_name,
nsp_name=command_obj.nsp_name
)
list_of_sql[of_type].append({'sql': sql, 'data': {}})
for opr, sqls in list_of_sql.items():
for item in sqls:
if item['sql']:
item['data'] = {
pgadmin_alias[k] if k in pgadmin_alias else k: v
for k, v in item['data'].items()
}
row_added = None
def failure_handle(res):
if is_savepoint:
conn.execute_void('ROLLBACK TO SAVEPOINT '
'save_data;')
msg = 'Query ROLLBACK, but the current ' \
'transaction is still ongoing.'
res += ' Saving ROLLBACK, but the current ' \
'transaction is still ongoing'
else:
conn.execute_void('ROLLBACK;')
msg = 'Transaction ROLLBACK'
# If we rolled everything back then update the
# message for each SQL query.
for val in query_res:
if query_res[val]['status']:
query_res[val]['result'] = msg
# If list is empty set rowid to 1
try:
if list_of_rowid:
_rowid = list_of_rowid[count]
else:
_rowid = 1
except Exception:
_rowid = 0
return status, res, query_res, _rowid,\
is_commit_required
try:
# Fetch oids/primary keys
if 'select_sql' in item and item['select_sql']:
status, res = conn.execute_dict(
item['sql'], item['data'])
else:
status, res = conn.execute_void(
item['sql'], item['data'])
except Exception as _:
failure_handle(res)
raise
if not status:
return failure_handle(res)
# Select added row from the table
if 'select_sql' in item:
status, sel_res = conn.execute_dict(
item['select_sql'], res['rows'][0])
if not status:
if is_savepoint:
conn.execute_void('ROLLBACK TO SAVEPOINT'
' save_data;')
msg = 'Query ROLLBACK, the current' \
' transaction is still ongoing.'
else:
conn.execute_void('ROLLBACK;')
msg = 'Transaction ROLLBACK'
# If we rolled everything back then update
# the message for each SQL query.
for val in query_res:
if query_res[val]['status']:
query_res[val]['result'] = msg
# If list is empty set rowid to 1
try:
if list_of_rowid:
_rowid = list_of_rowid[count]
else:
_rowid = 1
except Exception:
_rowid = 0
return status, sel_res, query_res, _rowid,\
is_commit_required
if 'rows' in sel_res and len(sel_res['rows']) > 0:
row_added = {
item['client_row']: sel_res['rows'][0]}
rows_affected = conn.rows_affected()
# store the result of each query in dictionary
query_res[count] = {
'status': status,
'result': None if row_added else res,
'sql': item['sql'], 'rows_affected': rows_affected,
'row_added': row_added
}
count += 1
# Commit the transaction if no error is found & autocommit is activated
if auto_commit:
conn.execute_void('COMMIT;')
else:
is_commit_required = True
return status, res, query_res, _rowid, is_commit_required
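
One detail of the flow above worth isolating is the transaction handling: an idle session gets its own BEGIN/COMMIT, while a session that is already inside a transaction is protected with a savepoint, so a failed save does not destroy the user's ongoing work. A simplified sketch of that pattern follows; conn here is a hypothetical wrapper exposing execute() and in_transaction(), not pgAdmin's connection class.

def save_rows(conn, statements, auto_commit=True):
    # Start a transaction only if the session is idle; otherwise guard the
    # save with a savepoint inside the user's ongoing transaction.
    started_here = not conn.in_transaction()
    if started_here:
        conn.execute('BEGIN;')
    else:
        conn.execute('SAVEPOINT save_data;')
    try:
        for sql in statements:
            conn.execute(sql)
    except Exception:
        if started_here:
            conn.execute('ROLLBACK;')
        else:
            # Undo only the failed save; the outer transaction survives.
            conn.execute('ROLLBACK TO SAVEPOINT save_data;')
        raise
    if auto_commit:
        conn.execute('COMMIT;')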


@@ -96,69 +96,8 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
filename)
with open(complete_file_name) as jsonfp:
data = json.load(jsonfp)
# CHECK SERVER VERSION & TYPE PRECONDITION
flag = False
if 'prerequisite' in data and \
data['prerequisite'] is not None:
prerequisite_data = data['prerequisite']
module_str = module.replace('_', ' ').capitalize()
db_type = server_info['type'].upper()
min_ver = prerequisite_data['minVer']
max_ver = prerequisite_data['maxVer']
if 'type' in prerequisite_data and \
prerequisite_data['type']:
if server_info['type'] != \
prerequisite_data['type']:
flag = True
print(
"\n\n"
"{0} are not supported by {1} - "
"Skipped".format(
module_str,
db_type
),
file=sys.stderr
)
if 'minVer' in prerequisite_data and \
prerequisite_data['minVer']:
if server_info['server_version'] < \
prerequisite_data['minVer']:
if not flag:
flag = True
print(
"\n\n"
"{0} are not supported by"
" {1} server less than"
" {2} - Skipped".format(
module_str, db_type, min_ver
),
file=sys.stderr
)
if 'maxVer' in prerequisite_data and \
prerequisite_data['maxVer']:
if server_info['server_version'] > \
prerequisite_data['maxVer']:
if not flag:
flag = True
print(
"\n\n"
"{0} are not supported by"
" {1} server greater than"
" {2} - Skipped".format(
module_str, db_type, max_ver
),
file=sys.stderr
)
if not flag:
tests_scenarios = {}
tests_scenarios['scenarios'] = data['scenarios']
for key, scenarios in tests_scenarios.items():
self.execute_test_case(scenarios)
for key, scenarios in data.items():
self.execute_test_case(scenarios)
def tearDown(self):
database_utils.disconnect_database(
@@ -209,7 +148,7 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
for scenario in scenarios:
print(scenario['name'])
if scenario['data'] and 'schema' in scenario['data']:
if 'data' in scenario and 'schema' in scenario['data']:
# If the schema already exists then fetch the oid
schema = regression.schema_utils.verify_schemas(
self.server, self.db_name,
@@ -265,8 +204,16 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
:param module_path: Path of the module to be tested.
:return:
"""
# Join the application path and the module path
absolute_path = os.path.join(self.apppath, module_path)
# Join the application path, module path and tests folder
tests_folder_path = os.path.join(self.apppath, module_path, 'tests')
# A folder name matching the server type (pg, ppas) takes priority, so
# check whether it exists. If so, then check the version
# folder in it, else look directly in the 'tests' folder.
absolute_path = os.path.join(tests_folder_path, self.server['type'])
if not os.path.exists(absolute_path):
absolute_path = tests_folder_path
# Iterate the version mapping directories.
for version_mapping in get_version_mapping_directories(
self.server['type']):
@@ -274,7 +221,7 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
self.server_information['server_version']:
continue
complete_path = os.path.join(absolute_path, 'tests',
complete_path = os.path.join(absolute_path,
version_mapping['name'])
if os.path.exists(complete_path):
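
Condensed, the folder lookup this commit introduces behaves roughly as follows; the standalone helper and its name are illustrative, not code from the patch:

import os

def resolve_resql_tests_folder(app_path, module_path, server_type):
    # Prefer tests/<server_type>/ (pg or ppas) when it exists; otherwise
    # fall back to the plain tests/ folder. Version-specific subfolders
    # are then resolved inside whichever folder was chosen.
    tests_folder_path = os.path.join(app_path, module_path, 'tests')
    typed_path = os.path.join(tests_folder_path, server_type)
    if os.path.exists(typed_path):
        return typed_path
    return tests_folder_path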