Fixed cognitive complexity reported by SonarQube.

pull/41/head
Rahul Shirsat 2021-03-19 11:51:45 +05:30 committed by Akshay Joshi
parent b973d6055d
commit f5cc1d1c7e
2 changed files with 111 additions and 103 deletions

View File

@ -1071,6 +1071,71 @@ class FunctionView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
resp_data['pronamespace'] = self._get_schema(
resp_data['pronamespace'])
def _get_function_definition(self, scid, fnid, resp_data, target_schema):
sql = render_template("/".join([self.sql_template_path,
self._GET_DEFINITION_SQL]
), data=resp_data,
fnid=fnid, scid=scid)
status, res = self.conn.execute_2darray(sql)
if not status:
return internal_server_error(errormsg=res)
elif target_schema:
res['rows'][0]['nspname'] = target_schema
resp_data['pronamespace'] = target_schema
# Add newline and tab before each argument to format
name_with_default_args = self.qtIdent(
self.conn,
res['rows'][0]['nspname'],
res['rows'][0]['proname']
) + '(\n\t' + res['rows'][0]['func_args']. \
replace(', ', ',\n\t') + ')'
# Generate sql for "SQL panel"
# func_def is function signature with default arguments
# query_for - To distinguish the type of call
func_def = render_template("/".join([self.sql_template_path,
self._CREATE_SQL]),
data=resp_data, query_type="create",
func_def=name_with_default_args,
query_for="sql_panel")
return func_def
def _get_procedure_definition(self, scid, fnid, resp_data, target_schema):
sql = render_template("/".join([self.sql_template_path,
self._GET_DEFINITION_SQL]
), data=resp_data,
fnid=fnid, scid=scid)
status, res = self.conn.execute_2darray(sql)
if not status:
return internal_server_error(errormsg=res)
elif target_schema:
res['rows'][0]['nspname'] = target_schema
# Add newline and tab before each argument to format
name_with_default_args = self.qtIdent(
self.conn,
res['rows'][0]['nspname'],
res['rows'][0]['proname']
) + '(\n\t' + res['rows'][0]['func_args'].\
replace(', ', ',\n\t') + ')'
# Generate sql for "SQL panel"
# func_def is procedure signature with default arguments
# query_for - To distinguish the type of call
func_def = render_template("/".join([self.sql_template_path,
self._CREATE_SQL]),
data=resp_data, query_type="create",
func_def=name_with_default_args,
query_for="sql_panel")
return func_def
@check_precondition
def sql(self, gid, sid, did, scid, fnid=None, **kwargs):
"""
@ -1113,6 +1178,7 @@ class FunctionView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
if self.node_type == 'procedure':
object_type = 'procedure'
if 'provolatile' in resp_data:
resp_data['provolatile'] = vol_dict.get(
resp_data['provolatile'], ''
@ -1121,36 +1187,11 @@ class FunctionView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
# Get Schema Name from its OID.
self._get_schema_name_from_oid(resp_data)
sql = render_template("/".join([self.sql_template_path,
self._GET_DEFINITION_SQL]
), data=resp_data,
fnid=fnid, scid=scid)
status, res = self.conn.execute_2darray(sql)
if not status:
return internal_server_error(errormsg=res)
elif target_schema:
res['rows'][0]['nspname'] = target_schema
# Add newline and tab before each argument to format
name_with_default_args = self.qtIdent(
self.conn,
res['rows'][0]['nspname'],
res['rows'][0]['proname']
) + '(\n\t' + res['rows'][0]['func_args'].\
replace(', ', ',\n\t') + ')'
# Parse privilege data
self._parse_privilege_data(resp_data)
# Generate sql for "SQL panel"
# func_def is procedure signature with default arguments
# query_for - To distinguish the type of call
func_def = render_template("/".join([self.sql_template_path,
self._CREATE_SQL]),
data=resp_data, query_type="create",
func_def=name_with_default_args,
query_for="sql_panel")
func_def = self._get_procedure_definition(scid, fnid, resp_data,
target_schema)
else:
object_type = 'function'
@ -1168,34 +1209,12 @@ class FunctionView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
# Parse privilege data
self._parse_privilege_data(resp_data)
sql = render_template("/".join([self.sql_template_path,
self._GET_DEFINITION_SQL]
), data=resp_data,
fnid=fnid, scid=scid)
func_def = self._get_function_definition(scid, fnid, resp_data,
target_schema)
status, res = self.conn.execute_2darray(sql)
if not status:
return internal_server_error(errormsg=res)
elif target_schema:
res['rows'][0]['nspname'] = target_schema
resp_data['pronamespace'] = target_schema
# Add newline and tab before each argument to format
name_with_default_args = self.qtIdent(
self.conn,
res['rows'][0]['nspname'],
res['rows'][0]['proname']
) + '(\n\t' + res['rows'][0]['func_args']. \
replace(', ', ',\n\t') + ')'
# Generate sql for "SQL panel"
# func_def is function signature with default arguments
# query_for - To distinguish the type of call
func_def = render_template("/".join([self.sql_template_path,
self._CREATE_SQL]),
data=resp_data, query_type="create",
func_def=name_with_default_args,
query_for="sql_panel")
# This is to check whether any exception occurred, if yes, then return
if not isinstance(func_def, str) and func_def.status_code is not None:
return func_def
sql_header = """-- {0}: {1}.{2}({3})\n\n""".format(
object_type.upper(), resp_data['pronamespace'],

View File

@ -538,37 +538,19 @@ rolmembership:{
else:
data[key] = val
invalid_msg = self._validate_rolname(kwargs.get('rid', -1), data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
invalid_msg_arr = [
self._validate_rolname(kwargs.get('rid', -1), data),
self._validate_rolvaliduntil(data),
self._validate_rolconnlimit(data),
self._validate_rolemembership(kwargs.get('rid', -1), data),
self._validate_seclabels(kwargs.get('rid', -1), data),
self._validate_variables(kwargs.get('rid', -1), data),
self._validate_rolemembers(kwargs.get('rid', -1), data)
]
invalid_msg = self._validate_rolvaliduntil(data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
invalid_msg = self._validate_rolconnlimit(data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
invalid_msg = self._validate_rolemembership(
kwargs.get('rid', -1), data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
invalid_msg = self._validate_seclabels(
kwargs.get('rid', -1), data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
invalid_msg = self._validate_variables(
kwargs.get('rid', -1), data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
invalid_msg = self._validate_rolemembers(
kwargs.get('rid', -1), data)
if invalid_msg is not None:
return precondition_required(invalid_msg)
for invalid_msg in invalid_msg_arr:
if invalid_msg is not None:
return precondition_required(invalid_msg)
self.request = data
@ -798,6 +780,30 @@ rolmembership:{
return gone()
def _set_seclabels(self, row):
if 'seclabels' in row and row['seclabels'] is not None:
res = []
for sec in row['seclabels']:
sec = re.search(r'([^=]+)=(.*$)', sec)
res.append({
'provider': sec.group(1),
'label': sec.group(2)
})
row['seclabels'] = res
def _set_rolemembership(self, row):
if 'rolmembers' in row:
rolmembers = []
for role in row['rolmembers']:
role = re.search(r'([01])(.+)', role)
rolmembers.append({
'role': role.group(2),
'admin': True if role.group(1) == '1' else False
})
row['rolmembers'] = rolmembers
def transform(self, rset):
for row in rset['rows']:
res = []
@ -810,25 +816,8 @@ rolmembership:{
'admin': True if role.group(1) == '1' else False
})
row['rolmembership'] = res
if 'seclabels' in row and row['seclabels'] is not None:
res = []
for sec in row['seclabels']:
sec = re.search(r'([^=]+)=(.*$)', sec)
res.append({
'provider': sec.group(1),
'label': sec.group(2)
})
row['seclabels'] = res
if 'rolmembers' in row:
rolmembers = []
for role in row['rolmembers']:
role = re.search(r'([01])(.+)', role)
rolmembers.append({
'role': role.group(2),
'admin': True if role.group(1) == '1' else False
})
row['rolmembers'] = rolmembers
self._set_seclabels(row)
self._set_rolemembership(row)
@check_precondition(action='properties')
def properties(self, gid, sid, rid):