From 32d904058ad2e9cdcc4eef68d462fde63486eb5e Mon Sep 17 00:00:00 2001 From: Shubham Agarwal Date: Mon, 3 Aug 2020 11:44:27 +0530 Subject: [PATCH] Fixed SonarQube issues. --- .../python_test_utils/test_utils.py | 225 +++++++++--------- 1 file changed, 116 insertions(+), 109 deletions(-) diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 01ccb548c..2f36b5e81 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -551,32 +551,29 @@ def drop_tablespace(connection): def create_server(server): """This function is used to create server""" - try: - conn = sqlite3.connect(config.TEST_SQLITE_PATH) - # Create the server - cur = conn.cursor() - server_details = (1, SERVER_GROUP, server['name'], server['host'], - server['port'], server['db'], server['username'], - server['role'], server['sslmode'], server['comment']) - cur.execute('INSERT INTO server (user_id, servergroup_id, name, host, ' - 'port, maintenance_db, username, role, ssl_mode,' - ' comment) VALUES (?,?,?,?,?,?,?,?,?,?)', server_details) - server_id = cur.lastrowid - conn.commit() - conn.close() + conn = sqlite3.connect(config.TEST_SQLITE_PATH) + # Create the server + cur = conn.cursor() + server_details = (1, SERVER_GROUP, server['name'], server['host'], + server['port'], server['db'], server['username'], + server['role'], server['sslmode'], server['comment']) + cur.execute('INSERT INTO server (user_id, servergroup_id, name, host, ' + 'port, maintenance_db, username, role, ssl_mode,' + ' comment) VALUES (?,?,?,?,?,?,?,?,?,?)', server_details) + server_id = cur.lastrowid + conn.commit() + conn.close() - server['type'] = get_server_type(server) - # Add server info to parent_node_dict - regression.parent_node_dict["server"].append( - { - "server_id": server_id, - "server": server - } - ) + server['type'] = get_server_type(server) + # Add server info to parent_node_dict + regression.parent_node_dict["server"].append( + { + "server_id": server_id, + "server": server + } + ) - return server_id - except Exception as exception: - raise Exception("Error while creating server. %s" % exception) + return server_id def delete_server_with_api(tester, sid): @@ -746,6 +743,12 @@ def get_db_server(sid): def configure_preferences(default_binary_path=None): conn = sqlite3.connect(config.TEST_SQLITE_PATH) cur = conn.cursor() + insert_preferences_query = \ + 'INSERT INTO user_preferences(pid, uid, value) VALUES (?,?,?)' + select_preference_query = \ + 'SELECT pid, uid FROM user_preferences where pid=?' + update_preference_query = 'UPDATE user_preferences' \ + ' SET VALUE = ? WHERE PID = ?' if default_binary_path is not None: paths_pref = Preferences.module('paths') @@ -766,8 +769,7 @@ def configure_preferences(default_binary_path=None): else: params = (pref_bin_path.pid, 1, default_binary_path[server]) cur.execute( - 'INSERT INTO user_preferences(pid, uid, value)' - ' VALUES (?,?,?)', params + insert_preferences_query, params ) browser_pref = Preferences.module('browser') @@ -777,20 +779,16 @@ def configure_preferences(default_binary_path=None): browser_pref.preference('browser_tree_state_save_interval') user_pref = cur.execute( - 'SELECT pid, uid FROM user_preferences ' - 'where pid=?', (pref_tree_state_save_interval.pid,) + select_preference_query, (pref_tree_state_save_interval.pid,) ) if len(user_pref.fetchall()) == 0: - cur.execute( - 'INSERT INTO user_preferences(pid, uid, value)' - ' VALUES (?,?,?)', (pref_tree_state_save_interval.pid, 1, -1) - ) + cur.execute(insert_preferences_query, + (pref_tree_state_save_interval.pid, 1, -1) + ) else: cur.execute( - 'UPDATE user_preferences' - ' SET VALUE = ?' - ' WHERE PID = ?', (-1, pref_tree_state_save_interval.pid) + update_preference_query, (-1, pref_tree_state_save_interval.pid) ) # Disable auto expand sole children tree state for tests @@ -798,20 +796,18 @@ def configure_preferences(default_binary_path=None): browser_pref.preference('auto_expand_sole_children') user_pref = cur.execute( - 'SELECT pid, uid FROM user_preferences ' - 'where pid=?', (pref_auto_expand_sol_children.pid,) + select_preference_query, (pref_auto_expand_sol_children.pid,) ) if len(user_pref.fetchall()) == 0: cur.execute( - 'INSERT INTO user_preferences(pid, uid, value)' - ' VALUES (?,?,?)', (pref_auto_expand_sol_children.pid, 1, 'False') + insert_preferences_query, + (pref_auto_expand_sol_children.pid, 1, 'False') ) else: cur.execute( - 'UPDATE user_preferences' - ' SET VALUE = ?' - ' WHERE PID = ?', ('False', pref_auto_expand_sol_children.pid) + update_preference_query, + ('False', pref_auto_expand_sol_children.pid) ) # Disable reload warning on browser @@ -819,20 +815,18 @@ def configure_preferences(default_binary_path=None): browser_pref.preference('confirm_on_refresh_close') user_pref = cur.execute( - 'SELECT pid, uid FROM user_preferences ' - 'where pid=?', (pref_confirm_on_refresh_close.pid,) + select_preference_query, (pref_confirm_on_refresh_close.pid,) ) if len(user_pref.fetchall()) == 0: cur.execute( - 'INSERT INTO user_preferences(pid, uid, value)' - ' VALUES (?,?,?)', (pref_confirm_on_refresh_close.pid, 1, 'False') + insert_preferences_query, + (pref_confirm_on_refresh_close.pid, 1, 'False') ) else: cur.execute( - 'UPDATE user_preferences' - ' SET VALUE = ?' - ' WHERE PID = ?', ('False', pref_confirm_on_refresh_close.pid) + update_preference_query, + ('False', pref_confirm_on_refresh_close.pid) ) conn.commit() @@ -1135,9 +1129,9 @@ def check_binary_path_or_skip_test(cls, utility_name): cls.server['default_binary_paths'][cls.server['type']], utility_name ) - retVal = does_utility_exist(binary_path) - if retVal is not None: - cls.skipTest(retVal) + ret_val = does_utility_exist(binary_path) + if ret_val is not None: + cls.skipTest(ret_val) def get_watcher_dialogue_status(self): @@ -1253,17 +1247,7 @@ def get_selenium_grid_status_and_browser_list(selenoid_url, arguments): if selenoid_status: # Get available browsers from selenoid available_browsers = selenoid_status["browsers"] - - # Get browser list provided in input by user - if 'default_browser' in arguments and \ - arguments['default_browser'] is not None: - default_browser = arguments['default_browser'].lower() - list_of_browsers = [{"name": default_browser, - "version": None}] - else: - list_of_browsers = test_setup.config_data['selenoid_config'][ - 'browsers_list'] - + list_of_browsers = get_selenoid_browsers_list(arguments) for browser in list_of_browsers: if browser["name"].lower() in available_browsers.keys(): versions = available_browsers[(browser["name"].lower())] @@ -1333,7 +1317,7 @@ def launch_url_in_browser(driver_instance, url, title='pgAdmin 4', timeout=50): exception_msg = 'Web-page title did not match to {0}. ' \ 'Please check url {1} accessible on ' \ 'internet.'.format(title, url) - raise Exception(exception_msg) + raise WebDriverException(exception_msg) def get_remote_webdriver(hub_url, browser, browser_ver, test_name): @@ -1500,57 +1484,15 @@ def delete_server(tester, server_information=None): try: parent_node_dict = regression.parent_node_dict test_servers = parent_node_dict["server"] - test_databases = parent_node_dict["database"] - test_roles = regression.node_info_dict["lrid"] test_table_spaces = regression.node_info_dict["tsid"] for server in test_servers: if server["server_id"] == server_information['server_id']: srv_id = server["server_id"] servers_dict = server["server"] - deleted_db = [] - for database in test_databases: - if database['server_id'] == srv_id: - connection = get_db_connection( - servers_dict['db'], - servers_dict['username'], - servers_dict['db_password'], - servers_dict['host'], - servers_dict['port'], - servers_dict['sslmode'] - ) - # Drop database - drop_database(connection, database["db_name"]) - deleted_db.append(database) - - if len(deleted_db) > 0: - print("Deleted DB {0}".format(deleted_db), - file=sys.stderr) - for ele in deleted_db: - regression.parent_node_dict["database"].remove(ele) - - deleted_roles = [] - for role in test_roles: - if role['server_id'] == srv_id: - connection = get_db_connection( - servers_dict['db'], - servers_dict['username'], - servers_dict['db_password'], - servers_dict['host'], - servers_dict['port'], - servers_dict['sslmode'] - ) - # Delete role - regression.roles_utils.delete_role( - connection, role["role_name"] - ) - deleted_roles.append(role) - - if len(deleted_roles) > 0: - print("Deleted Roles {0}".format(deleted_roles), - file=sys.stderr) - for ele in deleted_roles: - regression.node_info_dict["lrid"].remove(ele) + # Delete databases and roles + delete_database(srv_id, servers_dict) + delete_roles(srv_id, servers_dict) for tablespace in test_table_spaces: if tablespace['server_id'] == srv_id: @@ -1578,3 +1520,68 @@ def delete_server(tester, server_information=None): except Exception: traceback.print_exc(file=sys.stderr) raise + + +def delete_database(server_id, servers_dict): + """This function will delete all the databases from the server""" + parent_node_dict = regression.parent_node_dict + test_databases = parent_node_dict["database"] + deleted_db = [] + for database in test_databases: + if database['server_id'] == server_id: + connection = get_db_connection( + servers_dict['db'], + servers_dict['username'], + servers_dict['db_password'], + servers_dict['host'], + servers_dict['port'], + servers_dict['sslmode'] + ) + # Drop database + drop_database(connection, database["db_name"]) + deleted_db.append(database) + + if len(deleted_db) > 0: + print("Deleted DB {0}".format(deleted_db), + file=sys.stderr) + for ele in deleted_db: + regression.parent_node_dict["database"].remove(ele) + + +def delete_roles(server_id, servers_dict): + test_roles = regression.node_info_dict["lrid"] + deleted_roles = [] + for role in test_roles: + if role['server_id'] == server_id: + connection = get_db_connection( + servers_dict['db'], + servers_dict['username'], + servers_dict['db_password'], + servers_dict['host'], + servers_dict['port'], + servers_dict['sslmode'] + ) + # Delete role + regression.roles_utils.delete_role( + connection, role["role_name"] + ) + deleted_roles.append(role) + + if len(deleted_roles) > 0: + print("Deleted Roles {0}".format(deleted_roles), + file=sys.stderr) + for ele in deleted_roles: + regression.node_info_dict["lrid"].remove(ele) + + +def get_selenoid_browsers_list(arguments): + """This function will return the list of all the browsers from selenoid""" + if 'default_browser' in arguments and \ + arguments['default_browser'] is not None: + default_browser = arguments['default_browser'].lower() + list_of_browsers = [{"name": default_browser, + "version": None}] + else: + list_of_browsers = test_setup.config_data['selenoid_config'][ + 'browsers_list'] + return list_of_browsers