Modified the creation logic of custom schema and added support for schema id in re_sql test framework.

pull/25/head
Khushboo Vashi 2019-07-09 13:09:32 +05:30 committed by Akshay Joshi
parent a17687e27c
commit f4ac1e804e
1 changed files with 48 additions and 26 deletions

View File

@ -58,13 +58,7 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
self, utils.SERVER_GROUP, self.server_information['server_id'],
self.server_information['db_id'])
self.database_info = parent_node_dict["database"][-1]
self.db_name = self.database_info["db_name"]
self.connection = utils.get_db_connection(self.db_name,
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
self.get_db_connection()
if not self.db_con['info'] == "Database connected.":
raise Exception("Could not connect to database.")
@ -73,8 +67,13 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
self.apppath = os.getcwd()
def runTest(self):
# Create the module list on which reverse engineering sql test
# cases will be executed.
""" Create the module list on which reverse engineering sql test
cases will be executed."""
# Schema ID placeholder in JSON file which needs to be replaced
# while running the test cases
self.JSON_PLACEHOLDERS = {'schema_id': '<SCHEMA_ID>'}
server_info = self.server_information
resql_module_list = create_resql_module_list(
BaseTestGenerator.re_sql_module_list,
@ -104,6 +103,21 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
self, self.server_information['server_id'],
self.server_information['db_id'])
def get_db_connection(self):
"""Get the database connection."""
self.database_info = parent_node_dict["database"][-1]
self.db_name = self.database_info["db_name"]
if (not hasattr(self, 'connection')) or \
(hasattr(self, 'connection') and self.connection.closed == 1):
self.connection = utils.get_db_connection(
self.db_name,
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port']
)
def get_url(self, endpoint, object_id=None):
"""
This function is used to get the url.
@ -148,25 +162,33 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
for scenario in scenarios:
print(scenario['name'])
if 'data' in scenario and 'schema' in scenario['data']:
# If schema is already exist then fetch the oid
schema = regression.schema_utils.verify_schemas(
self.server, self.db_name,
scenario['data']['schema'])
if schema:
self.schema_id = schema[0]
else:
# If schema doesn't exist then create it
schema = regression.schema_utils.create_schema(
self.connection,
scenario['data']['schema'])
self.schema_id = schema[0]
else:
self.schema_id = self.server_information['schema_id']
if 'type' in scenario and scenario['type'] == 'create':
# Get the url and create the specific node.
if 'data' in scenario and 'schema' in scenario['data']:
# If schema is already exist then fetch the oid
self.get_db_connection()
schema = regression.schema_utils.verify_schemas(
self.server, self.db_name,
scenario['data']['schema']
)
if schema:
self.schema_id = schema[0]
else:
# If schema doesn't exist then create it
schema = regression.schema_utils.create_schema(
self.connection,
scenario['data']['schema'])
self.schema_id = schema[0]
else:
self.schema_id = self.server_information['schema_id']
if 'data' in scenario and 'schema_id' in scenario['data'] and \
scenario['data']['schema_id'] == \
self.JSON_PLACEHOLDERS['schema_id']:
scenario['data']['schema'] = self.schema_id
create_url = self.get_url(scenario['endpoint'])
response = self.tester.post(create_url,
data=json.dumps(scenario['data']),