From eb2c5546018bc60777a3ae54be8b2f03e3b15bf4 Mon Sep 17 00:00:00 2001 From: Nikhil Mohite Date: Mon, 3 Aug 2020 12:46:34 +0530 Subject: [PATCH] Fixed cognitive complexity issues reported by SonarQube. --- web/pgadmin/tools/maintenance/__init__.py | 38 +++++++--- .../sqleditor/utils/query_tool_preferences.py | 2 +- web/pgadmin/tools/user_management/__init__.py | 76 ++++++++++++------- web/pgadmin/utils/route.py | 30 +++++--- 4 files changed, 96 insertions(+), 50 deletions(-) diff --git a/web/pgadmin/tools/maintenance/__init__.py b/web/pgadmin/tools/maintenance/__init__.py index d19e32d30..833f7f4ee 100644 --- a/web/pgadmin/tools/maintenance/__init__.py +++ b/web/pgadmin/tools/maintenance/__init__.py @@ -97,10 +97,12 @@ class Message(IProcessDesc): def type_desc(self): return _("Maintenance") - def details(self, cmd, args): - + def _check_for_vacuum(self): + """ + Check for VACUUM in data and return format response. + :return: response. + """ res = None - if self.data['op'] == "VACUUM": res = _('VACUUM ({0})') @@ -113,6 +115,10 @@ class Message(IProcessDesc): opts.append(_('VERBOSE')) res = res.format(', '.join(str(x) for x in opts)) + return res + + def details(self, cmd, args): + res = self._check_for_vacuum() if self.data['op'] == "ANALYZE": res = _('ANALYZE') @@ -162,6 +168,23 @@ def script(): ) +def get_index_name(data): + """ + Check and get index name from constraints. + :param data: Data. + :return: index_name. + """ + index_name = None + if 'primary_key' in data and data['primary_key']: + index_name = data['primary_key'] + elif 'unique_constraint' in data and data['unique_constraint']: + index_name = data['unique_constraint'] + elif 'index' in data and data['index']: + index_name = data['index'] + + return index_name + + @blueprint.route( '/job//', methods=['POST'], endpoint='create_job' ) @@ -182,14 +205,7 @@ def create_maintenance_job(sid, did): else: data = json.loads(request.data, encoding='utf-8') - index_name = None - - if 'primary_key' in data and data['primary_key']: - index_name = data['primary_key'] - elif 'unique_constraint' in data and data['unique_constraint']: - index_name = data['unique_constraint'] - elif 'index' in data and data['index']: - index_name = data['index'] + index_name = get_index_name(data) # Fetch the server details like hostname, port, roles etc server = Server.query.filter_by( diff --git a/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py b/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py index e4d580c1a..b70e22102 100644 --- a/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py +++ b/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py @@ -137,7 +137,7 @@ def register_query_tool_preferences(self): gettext("Plain text mode?"), 'boolean', False, category_label=gettext('Editor'), help_str=gettext( - 'When set to True, keywords won''t be highlighted and code ' + 'When set to True, keywords won\'t be highlighted and code ' 'folding will be disabled. Plain text mode will improve editor ' 'performance with large files.' ) diff --git a/web/pgadmin/tools/user_management/__init__.py b/web/pgadmin/tools/user_management/__init__.py index 6b8d84e2a..96cf4016f 100644 --- a/web/pgadmin/tools/user_management/__init__.py +++ b/web/pgadmin/tools/user_management/__init__.py @@ -85,6 +85,22 @@ blueprint = UserManagementModule( ) +def validate_password(data, new_data): + """ + Check password new and confirm password match. If both passwords are not + match raise exception. + :param data: Data. + :param new_data: new data dict. + """ + if ('newPassword' in data and data['newPassword'] != "" and + 'confirmPassword' in data and data['confirmPassword'] != ""): + + if data['newPassword'] == data['confirmPassword']: + new_data['password'] = encrypt_password(data['newPassword']) + else: + raise Exception(_("Passwords do not match.")) + + def validate_user(data): new_data = dict() email_filter = re.compile( @@ -93,13 +109,7 @@ def validate_user(data): "(?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$" ) - if ('newPassword' in data and data['newPassword'] != "" and - 'confirmPassword' in data and data['confirmPassword'] != ""): - - if data['newPassword'] == data['confirmPassword']: - new_data['password'] = encrypt_password(data['newPassword']) - else: - raise Exception(_("Passwords do not match.")) + validate_password(data, new_data) if 'email' in data and data['email'] and data['email'] != "": if email_filter.match(data['email']): @@ -240,6 +250,36 @@ def create(): ) +def _create_new_user(new_data): + """ + Create new user. + :param new_data: Data from user creation. + :return: Return new created user. + """ + auth_source = new_data['auth_source'] if 'auth_source' in new_data \ + else current_app.PGADMIN_DEFAULT_AUTH_SOURCE + username = new_data['username'] if \ + 'username' in new_data and auth_source != \ + current_app.PGADMIN_DEFAULT_AUTH_SOURCE else new_data['email'] + email = new_data['email'] if 'email' in new_data else None + password = new_data['password'] if 'password' in new_data else None + + usr = User(username=username, + email=email, + roles=new_data['roles'], + active=new_data['active'], + password=password, + auth_source=auth_source) + db.session.add(usr) + db.session.commit() + # Add default server group for new user. + server_group = ServerGroup(user_id=usr.id, name="Servers") + db.session.add(server_group) + db.session.commit() + + return usr + + def create_user(data): if 'auth_source' in data and data['auth_source'] != \ current_app.PGADMIN_DEFAULT_AUTH_SOURCE: @@ -264,27 +304,7 @@ def create_user(data): return False, str(e) try: - - auth_source = new_data['auth_source'] if 'auth_source' in new_data \ - else current_app.PGADMIN_DEFAULT_AUTH_SOURCE - username = new_data['username'] if \ - 'username' in new_data and auth_source !=\ - current_app.PGADMIN_DEFAULT_AUTH_SOURCE else new_data['email'] - email = new_data['email'] if 'email' in new_data else None - password = new_data['password'] if 'password' in new_data else None - - usr = User(username=username, - email=email, - roles=new_data['roles'], - active=new_data['active'], - password=password, - auth_source=auth_source) - db.session.add(usr) - db.session.commit() - # Add default server group for new user. - server_group = ServerGroup(user_id=usr.id, name="Servers") - db.session.add(server_group) - db.session.commit() + usr = _create_new_user(new_data) except Exception as e: return False, str(e) diff --git a/web/pgadmin/utils/route.py b/web/pgadmin/utils/route.py index 0d646bb45..cd996aa70 100644 --- a/web/pgadmin/utils/route.py +++ b/web/pgadmin/utils/route.py @@ -94,16 +94,26 @@ class TestsGeneratorRegistry(ABCMeta): traceback.print_exc(file=sys.stderr) else: # Check for SERVER mode - for module_name in all_modules: - try: - if "tests." in str(module_name) and not any( - str(module_name).startswith( - 'pgadmin.' + str(exclude_pkg) - ) for exclude_pkg in exclude_pkgs - ): - import_module(module_name) - except ImportError: - traceback.print_exc(file=sys.stderr) + TestsGeneratorRegistry._check_server_mode(all_modules, + exclude_pkgs) + + @staticmethod + def _check_server_mode(all_modules, exclude_pkgs): + """ + This function check for server mode test cases. + :param all_modules: all modules. + :param exclude_pkgs: exclude package list. + """ + for module_name in all_modules: + try: + if "tests." in str(module_name) and not any( + str(module_name).startswith( + 'pgadmin.' + str(exclude_pkg) + ) for exclude_pkg in exclude_pkgs + ): + import_module(module_name) + except ImportError: + traceback.print_exc(file=sys.stderr) @six.add_metaclass(TestsGeneratorRegistry)