Improve code coverage and API test cases for Views and Materialized Views. Fixes #5337

pull/37/head
Yogesh Mahajan 2020-11-20 10:04:30 +05:30 committed by Akshay Joshi
parent 29c3c0cf7f
commit ce14696165
16 changed files with 2369 additions and 563 deletions

View File

@ -13,6 +13,7 @@ New features
Housekeeping
************
| `Issue #5337 <https://redmine.postgresql.org/issues/5337>`_ - Improve code coverage and API test cases for Views and Materialized Views.
Bug fixes
*********

View File

@ -73,10 +73,11 @@ class CompoundTriggersAddTestCase(BaseTestGenerator):
"ALTER TABLE %s.%s OWNER TO %s"
self.view_name = \
"view_compound_trigger_%s" % (str(uuid.uuid4())[1:8])
self.view_id = view_utils.create_view(self.server, self.db_name,
self.schema_name,
view_sql,
self.view_name)
self.view_id = compound_trigger_utils.create_view(self.server,
self.db_name,
self.schema_name,
view_sql,
self.view_name)
def create_compound_trigger(self, object_id):
return self.tester.post(

View File

@ -74,6 +74,49 @@ def create_compound_trigger(server, db_name, schema_name, table_name,
raise
def create_view(server, db_name, schema_name, sql_query, view_name):
"""
This function creates a table under provided schema.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param sql_query: sql query to create view
:type sql_query: str
:param view_name: view name
:type view_name: str
:return view_id: view id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = sql_query % (schema_name, view_name, schema_name, view_name,
server['username'])
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get 'oid' from newly created view
pg_cursor.execute("select oid from pg_class where relname='%s'" %
view_name)
view = pg_cursor.fetchone()
view_id = view[0]
connection.close()
return view_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_compound_trigger(server, db_name, trigger_name):
"""
This function verifies table exist in database or not.

View File

@ -1,188 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class MViewsUpdateParameterTestCase(BaseTestGenerator):
"""This class will update the view/mview under schema node."""
scenarios = [
# Fetching default URL for table node.
('Enable custom auto vacuum and set the parameters for table '
'without autovacuum_enabled',
dict(url='/browser/mview/obj/',
api_data={
'autovacuum_custom': True,
'vacuum_table': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 20},
{'name': 'autovacuum_vacuum_threshold',
'value': 20}
]
}}
)
),
('Change a parameter to zero value '
'without autovacuum_enabled',
dict(url='/browser/mview/obj/',
api_data={
'vacuum_table': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 0}
]
}}
)
),
('Enable autovacuum_enabled',
dict(url='/browser/mview/obj/',
api_data={'autovacuum_enabled': 't'}
)
),
('Reset individual parameters for table',
dict(url='/browser/mview/obj/',
api_data={
'autovacuum_enabled': 'x',
'vacuum_table': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': None},
]
}}
)
),
('Reset custom auto vacuum',
dict(url='/browser/mview/obj/',
api_data={'autovacuum_custom': False}
)
),
('Enable toast custom auto vacuum and set the parameters for table '
'without autovacuum_enabled',
dict(url='/browser/mview/obj/',
api_data={
'toast_autovacuum': True,
'vacuum_toast': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 20},
{'name': 'autovacuum_vacuum_threshold',
'value': 20}
]
}}
)
),
('Change a toast parameter to zero value '
'without autovacuum_enabled',
dict(url='/browser/mview/obj/',
api_data={
'vacuum_toast': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 0}
]
}}
)
),
('Enable toast.autovacuum_enabled',
dict(url='/browser/mview/obj/',
api_data={'toast_autovacuum_enabled': 't'}
)
),
('Reset individual toast parameters for table',
dict(url='/browser/mview/obj/',
api_data={
'toast_autovacuum_enabled': 'x',
'vacuum_toast': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': None},
]
}}
)
),
('Reset auto vacuum',
dict(url='/browser/mview/obj/',
api_data={'toast_autovacuum': False}
)
),
]
m_view_name = "test_mview_put_%s" % (str(uuid.uuid4())[1:8])
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to update a mview.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to update a mview.")
self.m_view_id = views_utils.get_view_id(self.server, self.db_name,
self.m_view_name)
if self.m_view_id is None:
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE " \
"pg_default AS SELECT 'test_pgadmin' WITH NO " \
"DATA;ALTER TABLE %s.%s OWNER TO %s"
self.m_view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
m_view_sql,
self.m_view_name)
def runTest(self):
"""This function will update the view/mview under schema node."""
mview_response = views_utils.verify_view(self.server, self.db_name,
self.m_view_name)
if not mview_response:
raise Exception("Could not find the mview to update.")
data = self.api_data
data['oid'] = self.m_view_id
response = self.tester.put(self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.m_view_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEqual(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -20,50 +20,28 @@ from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
MVIEW_CHECK_UTILITY_URL = 'browser/mview/check_utility_exists/'
MVIEW_REFRESH_URL = 'browser/mview/refresh_data/'
IS_UTILITY_EXISTS = True
class MViewsUpdateParameterTestCase(BaseTestGenerator):
"""This class will check materialized view refresh functionality."""
scenarios = [
('Check utility route',
dict(type='check_utility')
),
('Refresh materialized view with invalid oid',
dict(type='invalid')
),
('Refresh materialized view with data',
dict(type='with_data')
),
('Refresh materialized view with no data',
dict(type='with_no_data')
),
('Refresh materialized view with data (concurrently)',
dict(type='with_data_concurrently')
),
('Refresh materialized view with no data (concurrently)',
dict(type='with_no_data_concurrently')
),
]
# Generates scenarios
scenarios = utils.generate_scenarios("mview_refresh",
views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to update a mview.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -72,75 +50,55 @@ class MViewsUpdateParameterTestCase(BaseTestGenerator):
if not schema_response:
raise Exception("Could not find the schema to update a mview.")
self.m_view_name = "test_mview_put_%s" % (str(uuid.uuid4())[1:8])
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default " \
"AS SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE " \
"%s.%s OWNER TO %s"
query = self.inventory_data['query']
self.m_view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
m_view_sql,
self.m_view_name)
self.m_view_name = "test_mview_put_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.m_view_name,
query)
def runTest(self):
"""This class will check materialized view refresh functionality"""
mview_response = views_utils.verify_view(self.server, self.db_name,
self.m_view_name)
if not mview_response:
raise Exception("Could not find the mview to update.")
data = None
is_put_request = True
if self.is_put_request:
# Check utility
url_from_test_data = self.url
self.url = 'browser/mview/check_utility_exists/'
response = views_utils.api_get(self)
if response.json['success'] == 0:
self.skipTest("Couldn't check materialized view refresh "
"functionality because utility/binary does "
"not exists.")
# reset self.url value
self.url = url_from_test_data
if self.type == 'check_utility':
is_put_request = False
elif self.type == 'invalid':
data = dict({'concurrent': 'false', 'with_data': 'false'})
elif self.type == 'with_data':
data = dict({'concurrent': 'false', 'with_data': 'true'})
elif self.type == 'with_no_data':
data = dict({'concurrent': 'false', 'with_data': 'false'})
elif self.type == 'with_data_concurrently':
data = dict({'concurrent': 'true', 'with_data': 'true'})
elif self.type == 'with_no_data_concurrently':
data = dict({'concurrent': 'true', 'with_data': 'false'})
if self.is_positive_test:
response = views_utils.api_put(self)
response = self.tester.get(
MVIEW_CHECK_UTILITY_URL + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.m_view_id),
content_type='html/json'
)
self.assertEqual(response.status_code, 200)
if is_put_request and response.json['success'] == 0:
self.skipTest(
"Couldn't check materialized view refresh"
" functionality because utility/binary does not exists."
)
if is_put_request:
mvid = self.m_view_id
if self.type == 'invalid':
mvid = 99999
response = self.tester.put(
MVIEW_REFRESH_URL + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(mvid),
data=json.dumps(data),
follow_redirects=True
)
if self.type == 'invalid':
self.assertEqual(response.status_code, 410)
else:
self.assertEqual(response.status_code, 200)
# On success we get job_id from server
# Assert response
utils.assert_status_code(self, response)
self.assertTrue('job_id' in response.json['data'])
else:
if 'm_view_id' in self.data:
self.view_id = self.data['m_view_id']
response = views_utils.api_put(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
else:
# only check utility
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -6,9 +6,8 @@
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
@ -17,68 +16,31 @@ from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class ViewsAddTestCase(BaseTestGenerator):
"""This class will add new view under schema node."""
view_name = "test_view_add_%s" % (str(uuid.uuid4())[1:8])
v_data = {"schema": "",
"owner": "",
"datacl": [],
"seclabels": [],
"name": view_name,
"definition": "SELECT 'Hello World';"
}
m_view_name = "test_mview_add_%s" % (str(uuid.uuid4())[1:8])
m_view_data = {"spcname": "pg_default",
"toast_autovacuum_enabled": False,
"autovacuum_enabled": False,
"schema": "",
"owner": "",
"vacuum_table": [
{"name": "autovacuum_analyze_scale_factor"},
{"name": "autovacuum_analyze_threshold"},
{"name": "autovacuum_freeze_max_age"},
{"name": "autovacuum_vacuum_cost_delay"},
{"name": "autovacuum_vacuum_cost_limit"},
{"name": "autovacuum_vacuum_scale_factor"},
{"name": "autovacuum_vacuum_threshold"},
{"name": "autovacuum_freeze_min_age"},
{"name": "autovacuum_freeze_table_age"}],
"vacuum_toast": [{"name": "autovacuum_freeze_max_age"},
{"name": "autovacuum_vacuum_cost_delay"},
{"name": "autovacuum_vacuum_cost_limit"},
{"name": "autovacuum_vacuum_scale_factor"},
{"name": "autovacuum_vacuum_threshold"},
{"name": "autovacuum_freeze_min_age"},
{"name": "autovacuum_freeze_table_age"}],
"datacl": [],
"seclabels": [],
"name": m_view_name,
"definition": "SELECT 'test_pgadmin';"}
scenarios = [
('Add view under schema node', dict(url='/browser/view/obj/',
data=v_data)),
('Add materialized view under schema node',
dict(url='/browser/mview/obj/', data=m_view_data))
]
# Generates scenarios
scenarios = utils.generate_scenarios("view_create",
views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -90,13 +52,44 @@ class ViewsAddTestCase(BaseTestGenerator):
def runTest(self):
"""This function will add view under schema node."""
db_user = self.server["username"]
self.data["schema"] = self.schema_name
self.data["schema"] = self.schema_id
self.data["owner"] = db_user
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
'/' + str(self.db_id) + '/' + str(self.schema_id) + '/',
data=json.dumps(self.data), content_type='html/json')
self.assertEqual(response.status_code, 200)
if "name" in self.data:
view_name = \
self.data["name"] + (str(uuid.uuid4())[1:8])
self.data["name"] = view_name
if self.is_positive_test:
response = views_utils.api_create(self)
# Assert response
utils.assert_status_code(self, response)
# Verify in backend
cross_check_res = views_utils.verify_view(self.server,
self.db_name,
self.data["name"])
self.assertIsNotNone(cross_check_res, "Could not find the newly"
" created check view.")
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = views_utils.api_create(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = views_utils.api_create(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -8,6 +8,7 @@
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -22,38 +23,26 @@ from . import utils as views_utils
class ViewsDeleteTestCase(BaseTestGenerator):
"""This class will delete the view/mview under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
scenarios = [
('Delete view under schema node', dict(
url='/browser/view/obj/',
view_name="test_view_delete_%s" % (str(uuid.uuid4())[1:8]),
sql_query=view_sql)),
('Delete materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name="test_mview_delete_%s" % (str(uuid.uuid4())[1:8]),
sql_query=m_view_sql))
]
# Generates scenarios
scenarios = utils.generate_scenarios("view_delete",
views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -61,26 +50,69 @@ class ViewsDeleteTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete the view.")
# Create view
query = self.inventory_data['query']
self.view_name = "test_view_delete_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name)
self.view_name,
query)
def runTest(self):
"""This function will delete the view/mview under schema node."""
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name)
if not view_response:
raise Exception("Could not find the view to delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id
),
follow_redirects=True
)
self.assertEqual(response.status_code, 200)
if self.is_list:
self.view_name_2 = "test_view_delete_%s" % (str(uuid.uuid4())[1:8])
self.view_id_2 = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name_2,
query)
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name_2)
if not view_response:
raise Exception("Could not find the view to delete.")
# list to delete views
self.data['ids'] = [self.view_id, self.view_id_2]
def runTest(self):
"""This function will delete the view/mview under schema node."""
if self.is_positive_test:
if self.is_list:
response = views_utils.api_delete(self, '')
else:
response = views_utils.api_delete(self)
# Assert response
utils.assert_status_code(self, response)
# Verify in backend
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name)
self.assertIsNone(view_response, "Deleted view still present")
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = views_utils.api_delete(self)
elif 'view_id' in self.data:
self.view_id = self.data["view_id"]
response = views_utils.api_delete(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -1,104 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import json
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class ViewsDeleteMultipleTestCase(BaseTestGenerator):
"""This class will delete the view/mview under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
scenarios = [
('Delete multiple view under schema node', dict(
url='/browser/view/obj/',
view_name=["test_view_delete_%s" % (str(uuid.uuid4())[1:8]),
"test_view_delete_%s" % (str(uuid.uuid4())[1:8])],
sql_query=view_sql)),
('Delete multiple materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name=["test_mview_delete_%s" % (str(uuid.uuid4())[1:8]),
"test_mview_delete_%s" % (str(uuid.uuid4())[1:8])],
sql_query=m_view_sql))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete view.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete the view.")
self.view_ids = [views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name[0]),
views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name[1])
]
def runTest(self):
"""This function will delete the view/mview under schema node."""
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name[0])
if not view_response:
raise Exception("Could not find the view to delete.")
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name[1])
if not view_response:
raise Exception("Could not find the view to delete.")
data = {'ids': self.view_ids}
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id
),
follow_redirects=True,
data=json.dumps(data),
content_type='html/json'
)
self.assertEqual(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -9,6 +9,7 @@
import uuid
import json
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -23,52 +24,25 @@ from . import utils as views_utils
class ViewsGetTestCase(BaseTestGenerator):
"""This class will fetch the view under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
view_sql_with_bracket = "CREATE OR REPLACE VIEW %s.%s AS " \
"SELECT CASE WHEN (pg_db.datistemplate = false " \
"AND pg_db.datallowconn = true AND " \
"(pg_db.datconnlimit = -1 OR " \
"pg_db.datacl is null)) then true else false " \
"end as res FROM pg_database pg_db; " \
"ALTER TABLE %s.%s OWNER TO %s"
scenarios = [
('Get view under schema node', dict(
url='/browser/view/obj/',
view_name="test_view_get_%s" % (str(uuid.uuid4())[1:8]),
sql_query=view_sql,
type='view_without_conditions')),
('Get materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name="test_mview_get_%s" % (str(uuid.uuid4())[1:8]),
sql_query=m_view_sql,
type='m_view_without_conditions')),
('Get view having brackets in script under schema node', dict(
url='/browser/view/obj/',
view_name="test_view_get_%s" % (str(uuid.uuid4())[1:8]),
sql_query=view_sql_with_bracket,
type='view_with_conditions'))
]
# Generates scenarios
scenarios = utils.generate_scenarios("view_get", views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch the view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -76,25 +50,56 @@ class ViewsGetTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch the view.")
# Create view
query = self.inventory_data['query']
self.view_name = "test_view_get_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name)
self.view_name,
query)
# In case of multiple views
if self.is_list:
self.view_name_2 = "test_view_get_%s" % (str(uuid.uuid4())[1:8])
self.view_id_2 = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name_2,
query)
def runTest(self):
"""This function will fetch the view/mview under schema node."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id
),
follow_redirects=True
)
self.assertEqual(response.status_code, 200)
if self.type == 'view_with_conditions':
response_data = json.loads(response.data.decode('utf-8'))
self.assertIn('((pg_db.datistemplate', response_data['definition'])
if self.is_positive_test:
if self.is_list:
response = views_utils.api_get(self, '')
else:
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
# Check definition data
test_result_data = self.expected_data["test_result_data"]
if bool(test_result_data):
response_data = json.loads(response.data.decode('utf-8'))
self.assertIn(test_result_data["definition"],
response_data['definition'])
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = views_utils.api_get(self)
elif 'view_id' in self.data:
# Non-existing view id
self.view_id = self.data["view_id"]
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -0,0 +1,82 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import json
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class ViewsDependeciesDependentsGetTestCase(BaseTestGenerator):
"""This class will fetch dependenciey & dependents for
the view under schema node."""
# Generates scenarios
scenarios = utils.generate_scenarios("view_dependecies_dependents",
views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch the view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch the view.")
# Create view
query = self.inventory_data['query']
self.view_name = "test_view_get_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name,
query)
def runTest(self):
"""This function will fetch dependenciey & dependents for
the view/mview under schema node."""
if self.is_positive_test:
if self.is_dependent:
self.url = self.url + 'dependent/'
response = views_utils.api_get(self)
else:
self.url = self.url + 'dependency/'
response = views_utils.api_get(self)
utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,90 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import json
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class ViewsGetMsqlTestCase(BaseTestGenerator):
"""This class will fetch the modified view/mview sql under schema node."""
# Generates scenarios
scenarios = utils.generate_scenarios("view_get_msql",
views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch the view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch the view.")
# Create view
query = self.inventory_data['query']
self.view_name = "test_view_get_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name,
query)
def runTest(self):
"""This function will fetch the modified view/mview sql under
schema node."""
if self.is_positive_test:
url_encode_data = {"oid": self.view_id,
"comment": self.data['comment']}
response = views_utils.api_get_msql(self, url_encode_data)
# Assert response
utils.assert_status_code(self, response)
else:
if 'view_id' in self.data:
# Non-existing view id
self.view_id = self.data["view_id"]
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,99 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import json
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class ViewsGetNodesTestCase(BaseTestGenerator):
"""This class will fetch the view nodes under schema node."""
# Generates scenarios
scenarios = utils.generate_scenarios("view_get_nodes",
views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch the view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch the view.")
# Create view
query = self.inventory_data['query']
self.view_name = "test_view_get_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name,
query)
# In case of multiple views
if self.is_list:
self.view_name_2 = "test_view_get_%s" % (str(uuid.uuid4())[1:8])
self.check_constraint_id_2 = views_utils.\
create_view(self.server, self.db_name, self.schema_name,
self.view_name_2, query)
def runTest(self):
"""This function will fetch the view/mview nodes under schema node."""
if self.is_positive_test:
if self.is_list:
response = views_utils.api_get(self, '')
else:
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = views_utils.api_get(self)
elif 'view_id' in self.data:
# Non-existing view/mview id
self.view_id = self.data["view_id"]
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -9,6 +9,7 @@
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -23,38 +24,24 @@ from . import utils as views_utils
class ViewsUpdateTestCase(BaseTestGenerator):
"""This class will update the view/mview under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
scenarios = [
('Update view under schema node', dict(
url='/browser/view/obj/',
view_name="test_view_put_%s" % (str(uuid.uuid4())[1:8]),
sql_query=view_sql)),
('Update materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name="test_mview_put_%s" % (str(uuid.uuid4())[1:8]),
sql_query=m_view_sql))
]
# Generates scenarios
scenarios = utils.generate_scenarios("view_put", views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to update a view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -62,11 +49,18 @@ class ViewsUpdateTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to update a view.")
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
query = self.inventory_data['query']
self.view_name = "test_view_update_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.get_view_id(self.server, self.db_name,
self.view_name)
if self.view_id is None:
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name, query)
def runTest(self):
"""This function will update the view/mview under schema node."""
@ -74,17 +68,23 @@ class ViewsUpdateTestCase(BaseTestGenerator):
self.view_name)
if not view_response:
raise Exception("Could not find the view to update.")
data = {"id": self.view_id,
"comment": "This is test comment"
}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id
),
data=json.dumps(data),
follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.data["oid"] = self.view_id
if self.is_positive_test:
response = views_utils.api_put(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = views_utils.api_put(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -0,0 +1,119 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import json
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as views_utils
class ViewsSqlTestCase(BaseTestGenerator):
"""This class will fetch the view/mview sql under schema node."""
# Generates scenarios
scenarios = utils.generate_scenarios("view_sql", views_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
# Check DB version
if "server_min_version" in self.data:
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to check version")
if "type" in server_con["data"] and \
server_con["data"]["type"] == "pg":
self.skipTest("Compound Triggers are not supported by PG.")
elif server_con["data"]["type"] == "ppas" \
and server_con["data"]["version"] < self.data[
"server_min_version"]:
self.skipTest(self.data["skip_msg"])
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch the view.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch the view.")
# Create view
query = self.inventory_data['query']
self.view_name = "test_view_sql_%s" % (str(uuid.uuid4())[1:8])
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.view_name,
query)
if hasattr(self, "trigger_fun_required"):
self.func_name = "trigger_func_get_%s" % str(uuid.uuid4())[1:8]
self.function_info = views_utils.\
create_trigger_function_with_trigger(self.server, self.db_name,
self.schema_name,
self.func_name)
self.trigger_name = \
"test_trigger_get_%s" % (str(uuid.uuid4())[1:8])
self.trigger_id = views_utils.create_trigger(self.server,
self.db_name,
self.schema_name,
self.view_name,
self.trigger_name,
self.func_name,
"a")
def runTest(self):
"""This function will fetch the view/mview sql under schema node."""
if self.is_positive_test:
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = views_utils.api_get(self)
elif 'view_id' in self.data:
# Non-existing view id
self.view_id = self.data["view_id"]
response = views_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -10,11 +10,150 @@
import sys
import traceback
import os
import json
from regression.python_test_utils import test_utils as utils
from urllib.parse import urlencode
# Load test data from json file.
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + "/view_test_data.json") as data_file:
test_cases = json.load(data_file)
def create_view(server, db_name, schema_name, sql_query, view_name):
# api call methods
def api_create(self):
return self.tester.post("{0}{1}/{2}/{3}/{4}/".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id
),
data=json.dumps(self.data),
content_type='html/json')
def api_get(self, view_id=None):
if view_id is None:
view_id = self.view_id
return self.tester.get("{0}{1}/{2}/{3}/{4}/{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, view_id),
follow_redirects=True)
def api_get_msql(self, url_encode_data):
return self.tester.get("{0}{1}/{2}/{3}/{4}/{5}?{6}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id,
urlencode(url_encode_data)),
follow_redirects=True
)
def api_delete(self, view_id=None):
if view_id is None:
view_id = self.view_id
return self.tester.delete("{0}{1}/{2}/{3}/{4}/{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, view_id),
data=json.dumps(self.data),
follow_redirects=True)
def api_put(self):
return self.tester.put("{0}{1}/{2}/{3}/{4}/{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id),
data=json.dumps(self.data),
follow_redirects=True)
def create_trigger_function_with_trigger(server, db_name, schema_name,
func_name):
"""This function add the trigger function to schema"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
pg_cursor = connection.cursor()
query = "CREATE FUNCTION " + schema_name + "." + func_name + \
"()" \
" RETURNS trigger LANGUAGE 'plpgsql' STABLE LEAKPROOF" \
" SECURITY DEFINER SET enable_sort=true AS $BODY$ BEGIN" \
" NULL; END; $BODY$"
pg_cursor.execute(query)
connection.commit()
# Get 'oid' from newly created function
pg_cursor.execute("SELECT pro.oid, pro.proname FROM"
" pg_proc pro WHERE pro.proname='%s'" %
func_name)
functions = pg_cursor.fetchone()
connection.close()
return functions
except Exception:
traceback.print_exc(file=sys.stderr)
def create_trigger(server, db_name, schema_name, table_name, trigger_name,
trigger_func_name, trigger_func_arg=None):
"""
This function creates a column under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param trigger_name: trigger name
:type trigger_name: str
:param trigger_func_name: trigger function name
:type trigger_func_name: str
:return trigger_id: trigger id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "CREATE TRIGGER %s INSTEAD OF DELETE ON %s.%s FOR EACH ROW " \
"EXECUTE PROCEDURE %s.%s(%s)" % (trigger_name, schema_name,
table_name, schema_name,
trigger_func_name,
trigger_func_arg)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
pg_cursor.execute("SELECT oid FROM pg_trigger where tgname='%s'" %
trigger_name)
trigger = pg_cursor.fetchone()
trigger_id = ''
if trigger:
trigger_id = trigger[0]
connection.close()
return trigger_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def create_view(server, db_name, schema_name, view_name, sql_query=None,
mview_index=None):
"""
This function creates a table under provided schema.
:param server: server details
@ -40,8 +179,7 @@ def create_view(server, db_name, schema_name, sql_query, view_name):
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = sql_query % (schema_name, view_name, schema_name, view_name,
server['username'])
query = eval(sql_query)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()