diff --git a/docs/en_US/release_notes_4_19.rst b/docs/en_US/release_notes_4_19.rst index c906a58e1..c216fbf5d 100644 --- a/docs/en_US/release_notes_4_19.rst +++ b/docs/en_US/release_notes_4_19.rst @@ -14,6 +14,7 @@ New features Housekeeping ************ +| `Issue #5088 `_ - Improve code coverage and API test cases for the Event Trigger module. | `Issue #5176 `_ - Enhance logging by tracking stdout and stderr of subprocess when log level set to DEBUG. Bug fixes diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/__init__.py index 4206f47cf..369606952 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/__init__.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/__init__.py @@ -676,7 +676,9 @@ class EventTriggerView(PGChildNodeView): if len(res['rows']) == 0: return gone( - _("Could not find the specified event trigger on the server.") + gettext( + "Could not find the specified event trigger on the " + "server.") ) result = res['rows'][0] diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/event_triggers_test_data.json b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/event_triggers_test_data.json new file mode 100644 index 000000000..2a6aab89d --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/event_triggers_test_data.json @@ -0,0 +1,532 @@ +{ + "create_event_trigger": [ + { + "name": "Create DDL_COMMAND_END Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "ddl_command_start", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER" + }, + "parameters_to_compare": [ + "enabled", + "eventname", + "eventowner", + "name" + ], + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Create SQL_DROP Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "ddl_command_start", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER" + }, + "parameters_to_compare": [ + "enabled", + "eventname", + "eventowner", + "name" + ], + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Create DDL_COMMAND_START Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "ddl_command_start", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER" + }, + "parameters_to_compare": [ + "enabled", + "eventname", + "eventowner", + "name" + ], + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while creating a Event Trigger", + "url": "/browser/event_trigger/obj/", + "error_creating_event_trigger": true, + "is_positive_test": false, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "DDL_COMMAND_END", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER", + "providers": [] + }, + "parameters_to_compare": [ + ], + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False, 'Mocked Internal Server Error while creating a event trigger')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Internal server error", + "url": "/browser/event_trigger/obj/", + "internal_server_error": true, + "is_positive_test": false, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "DDL_COMMAND_END", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER", + "providers": [] + }, + "parameters_to_compare": [ + "name", + "eventname", + "eventowner", + "eventfunname" + ], + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(True, True), (False, 'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Error while getting oid of newly created event trigger", + "url": "/browser/event_trigger/obj/", + "error_getting_event_trigger_oid": true, + "is_positive_test": false, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "DDL_COMMAND_END", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER", + "providers": [] + }, + "parameters_to_compare": [ + ], + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(True, True), (True, True), (False, 'Mocked Internal Server Error while getting oid of created event trigger')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Create DDL_COMMAND_END Event Trigger with less parameter", + "url": "/browser/event_trigger/obj/", + "is_positive_test": false, + "parameter_missing": true, + "test_data": { + "enabled": "O", + "eventfunname": "PLACE_HOLDER", + "eventname": "ddl_command_start", + "eventowner": "PLACE_HOLDER", + "name": "PLACE_HOLDER" + }, + "parameters_to_compare": [ + "enabled", + "eventname", + "eventowner", + "name" + ], + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 400 + } + } + ], + "get_event_trigger": [ + { + "name": "Get Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while fetching a Event Trigger", + "url": "/browser/event_trigger/obj/", + "error_fetching_event_trigger": true, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Fetch event trigger using wrong event trigger id", + "url": "/browser/event_trigger/obj/", + "wrong_event_trigger_id": true, + "is_positive_test": false, + "mocking_required": false, + "mock_data": { + }, + "expected_data": { + "status_code": 410 + } + }, + { + "name": "Get Event Trigger list", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "event_trigger_list": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while fetching a Event Trigger list", + "url": "/browser/event_trigger/obj/", + "error_fetching_event_trigger": true, + "is_positive_test": false, + "event_trigger_list": true, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger')" + }, + "expected_data": { + "status_code": 500 + } + } + ], + "delete_event_trigger": [ + { + "name": "delete Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while deleting a Event Trigger", + "url": "/browser/event_trigger/obj/", + "error_deleting_event_trigger": true, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False, 'Mocked Internal Server Error while deleting a event trigger')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "delete event trigger using wrong event trigger id", + "url": "/browser/event_trigger/obj/", + "wrong_event_trigger_id": true, + "is_positive_test": false, + "mocking_required": false, + "mock_data": { + }, + "expected_data": { + "status_code": 410 + } + }, + { + "name": "Error while deleting a created Event Trigger", + "url": "/browser/event_trigger/obj/", + "error_deleting_created_event_trigger": true, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(True, True),(False, 'Mocked Internal Server Error while deleting a event trigger')" + }, + "expected_data": { + "status_code": 500 + } + } + ], + "update_event_trigger": [ + { + "name": "Update Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "mocking_required": false, + "test_data": { + "comment": "This is event trigger update comment", + "id": "PLACE_HOLDER" + }, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while updating a Event Trigger", + "url": "/browser/event_trigger/obj/", + "error_updating_event_trigger": true, + "test_data": { + "comment": "This is event trigger update comment", + "id": "PLACE_HOLDER" + }, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Update event trigger using wrong event trigger id", + "url": "/browser/event_trigger/obj/", + "wrong_event_trigger_id": true, + "is_positive_test": false, + "test_data": { + "comment": "This is event trigger update comment", + "id": "PLACE_HOLDER" + }, + "mocking_required": false, + "mock_data": { + }, + "expected_data": { + "status_code": 410 + } + }, + { + "name": "Error while updating a Event Trigger", + "url": "/browser/event_trigger/obj/", + "error_in_db": true, + "is_positive_test": false, + "test_data": { + "comment": "This is event trigger update comment", + "id": "PLACE_HOLDER" + }, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.browser.server_groups.servers.databases.event_triggers.EventTriggerView.get_sql", + "return_value": "('')" + }, + "expected_data": { + "status_code": 200 + } + } + ], + "dependency_dependent_event_trigger": [ + { + "name": "Get Event Trigger dependency", + "url": "/browser/event_trigger/dependency/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Get Event Trigger dependent", + "url": "/browser/event_trigger/dependent/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + } + ], + "get_event_trigger_nodes_and_node": [ + { + "name": "Get Event Trigger nodes", + "url": "/browser/event_trigger/nodes/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while fetching a Event Trigger nodes", + "url": "/browser/event_trigger/nodes/", + "error_fetching_event_trigger": true, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger nodes')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Get Event Trigger node", + "url": "/browser/event_trigger/nodes/", + "is_positive_test": true, + "node": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while fetching a Event Trigger node", + "url": "/browser/event_trigger/nodes/", + "error_fetching_event_trigger": true, + "is_positive_test": false, + "node": true, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger nodes')" + }, + "expected_data": { + "status_code": 500 + } + } + ], + "get_event_trigger_sql": [ + { + "name": "Get Event Trigger SQL", + "url": "/browser/event_trigger/sql/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while fetching a created Event Trigger sql", + "url": "/browser/event_trigger/sql/", + "error_fetching_event_trigger": true, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger sql')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Error while fetching a DB created Event Trigger sql", + "url": "/browser/event_trigger/sql/", + "error_fetching_event_trigger_db": true, + "is_positive_test": false, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False, 'Mocked Internal Server Error while fetching a DB created Event Trigger sql')" + }, + "expected_data": { + "status_code": 500 + } + }, + { + "name": "Get modified Event Trigger SQL", + "url": "/browser/event_trigger/msql/", + "is_positive_test": true, + "mocking_required": false, + "msql": true, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Get modified Event Trigger SQL when no trigger function passed", + "url": "/browser/event_trigger/msql/", + "is_positive_test": false, + "error_fetching_event_trigger_msql": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410 + } + } + ], + "get_event_trigger_functions": [ + { + "name": "Get Event Trigger functions", + "url": "/browser/event_trigger/fopts/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + }, + { + "name": "Error while fetching a Event Trigger functions", + "url": "/browser/event_trigger/fopts/", + "error_fetching_event_trigger": true, + "is_positive_test": false, + "event_trigger_functions": true, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray", + "return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger functions')" + }, + "expected_data": { + "status_code": 500 + } + } + ], + "delete_multiple_event_trigger" :[ + { + "name": "Delete multiple Event Trigger", + "url": "/browser/event_trigger/obj/", + "is_positive_test": true, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200 + } + } + ] +} diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_add.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_add.py index a671fd353..526aeba76 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_add.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_add.py @@ -9,6 +9,7 @@ import json import uuid +import sys from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ utils as schema_utils @@ -19,15 +20,18 @@ from pgadmin.utils.route import BaseTestGenerator from regression import parent_node_dict from regression import trigger_funcs_utils from regression.python_test_utils import test_utils as utils +from . import utils as event_trigger_utils + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch class EventTriggerAddTestCase(BaseTestGenerator): """ This class will add new event trigger under test schema. """ - scenarios = [ - # Fetching default URL for event trigger node. - ('Fetch Event Trigger Node URL', - dict(url='/browser/event_trigger/obj/')) - ] + scenarios = utils.generate_scenarios('create_event_trigger', + event_trigger_utils.test_cases) def setUp(self): self.schema_data = parent_node_dict['schema'][-1] @@ -53,6 +57,18 @@ class EventTriggerAddTestCase(BaseTestGenerator): self.server, self.db_name, self.schema_name, self.func_name, server_version) + def create_event_trigger(self): + """ + This function create a event trigger and returns the created event + trigger response + :return: created event trigger response + """ + return self.tester.post( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str(self.db_id) + + '/', data=json.dumps(self.test_data), + content_type='html/json') + def runTest(self): """ This function will add event trigger under test database. """ db_con = database_utils.connect_database(self, utils.SERVER_GROUP, @@ -71,20 +87,48 @@ class EventTriggerAddTestCase(BaseTestGenerator): func_name) if not func_response: raise Exception("Could not find the trigger function.") - data = { - "enabled": "O", - "eventfunname": "%s.%s" % (self.schema_name, self.func_name), - "eventname": "DDL_COMMAND_END", - "eventowner": self.db_user, - "name": "event_trigger_add_%s" % (str(uuid.uuid4())[1:8]), - "providers": [] - } - response = self.tester.post( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + str(self.db_id) + - '/', data=json.dumps(data), - content_type='html/json') - self.assertAlmostEquals(response.status_code, 200) + + self.test_data['eventfunname'] = "%s.%s" % ( + self.schema_name, self.func_name) + self.test_data['eventowner'] = self.db_user + self.test_data['name'] = "event_trigger_add_%s" % ( + str(uuid.uuid4())[1:8]) + actual_response_code = True + expected_response_code = False + + if self.is_positive_test: + self.create_event_trigger() + expected_output = event_trigger_utils.verify_event_trigger_node( + self) + del self.test_data['eventfunname'] + self.assertDictEqual(expected_output, self.test_data) + else: + if hasattr(self, "error_creating_event_trigger"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.create_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + if hasattr(self, "error_getting_event_trigger_oid"): + with patch(self.mock_data["function_name"], + side_effect=eval(self.mock_data["return_value"])): + response = self.create_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "internal_server_error"): + with patch(self.mock_data["function_name"], + side_effect=eval(self.mock_data["return_value"])): + response = self.create_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "parameter_missing"): + del self.test_data['eventfunname'] + response = self.create_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) def tearDown(self): # Disconnect database diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete.py index 8122c15a8..ec00d86f4 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete.py @@ -8,6 +8,7 @@ ########################################################################## import uuid +import sys from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ utils as schema_utils @@ -20,14 +21,16 @@ from regression import trigger_funcs_utils from regression.python_test_utils import test_utils as utils from . import utils as event_trigger_utils +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + class EventTriggerDeleteTestCase(BaseTestGenerator): """ This class will delete added event trigger under test database. """ - scenarios = [ - # Fetching default URL for event trigger node. - ('Fetch Event Trigger Node URL', - dict(url='/browser/event_trigger/obj/')) - ] + scenarios = utils.generate_scenarios('delete_event_trigger', + event_trigger_utils.test_cases) def setUp(self): self.schema_data = parent_node_dict['schema'][-1] @@ -58,6 +61,18 @@ class EventTriggerDeleteTestCase(BaseTestGenerator): self.server, self.db_name, self.schema_name, self.func_name, self.trigger_name) + def delete_event_trigger(self): + """ + This function returns the event trigger delete response + :return: event trigger delete response + """ + return self.tester.delete( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + + str(self.db_id) + '/' + + str(self.event_trigger_id), + follow_redirects=True) + def runTest(self): """ This function will delete event trigger under test database. """ db_con = database_utils.connect_database(self, utils.SERVER_GROUP, @@ -81,13 +96,32 @@ class EventTriggerDeleteTestCase(BaseTestGenerator): self.trigger_name) if not trigger_response: raise Exception("Could not find event trigger.") - del_response = self.tester.delete( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + - str(self.db_id) + '/' + - str(self.event_trigger_id), - follow_redirects=True) - self.assertEquals(del_response.status_code, 200) + actual_response_code = True + expected_response_code = False + + if self.is_positive_test: + response = self.delete_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + else: + if hasattr(self, "error_deleting_event_trigger"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.delete_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "error_deleting_created_event_trigger"): + with patch(self.mock_data["function_name"], + side_effect=eval(self.mock_data["return_value"])): + response = self.delete_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "wrong_event_trigger_id"): + self.event_trigger_id = 99999 + response = self.delete_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + self.assertEquals(actual_response_code, expected_response_code) def tearDown(self): # Disconnect the database diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete_multiple.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete_multiple.py index 8893b78a9..b0e3e4eca 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete_multiple.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_delete_multiple.py @@ -24,11 +24,8 @@ from . import utils as event_trigger_utils class EventTriggerMultipleDeleteTestCase(BaseTestGenerator): """ This class will delete added event trigger under test database. """ - scenarios = [ - # Fetching default URL for event trigger node. - ('Fetch Event Trigger Node URL', - dict(url='/browser/event_trigger/obj/')) - ] + scenarios = utils.generate_scenarios('delete_multiple_event_trigger', + event_trigger_utils.test_cases) def setUp(self): self.schema_data = parent_node_dict['schema'][-1] @@ -64,6 +61,20 @@ class EventTriggerMultipleDeleteTestCase(BaseTestGenerator): self.server, self.db_name, self.schema_name, self.func_name, self.trigger_names[1])) + def delete_multiple(self, data): + """ + This function returns multiple event trigger delete response + :param data: event trigger ids to delete + :return: event trigger delete response + """ + return self.tester.delete( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + + str(self.db_id) + '/', + follow_redirects=True, + data=json.dumps(data), + content_type='html/json') + def runTest(self): """ This function will delete event trigger under test database. """ db_con = database_utils.connect_database(self, utils.SERVER_GROUP, @@ -93,14 +104,16 @@ class EventTriggerMultipleDeleteTestCase(BaseTestGenerator): if not trigger_response: raise Exception("Could not find event trigger.") data = {'ids': self.event_trigger_ids} - del_response = self.tester.delete( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + - str(self.db_id) + '/', - follow_redirects=True, - data=json.dumps(data), - content_type='html/json') - self.assertEquals(del_response.status_code, 200) + + actual_response_code = True + expected_response_code = False + + if self.is_positive_test: + response = self.delete_multiple(data) + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) def tearDown(self): # Disconnect the database diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_dependancy_dependent.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_dependancy_dependent.py new file mode 100644 index 000000000..dd9a9284e --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_dependancy_dependent.py @@ -0,0 +1,105 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +import uuid + +from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ + utils as schema_utils +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils import server_utils as server_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression import trigger_funcs_utils +from regression.python_test_utils import test_utils as utils +from . import utils as event_trigger_utils + + +class EventTriggerDependencyDependentTestCase(BaseTestGenerator): + """ This class will fetch added event trigger dependency and dependent + under test database. """ + scenarios = utils.generate_scenarios('dependency_dependent_event_trigger', + event_trigger_utils.test_cases) + + def setUp(self): + self.schema_data = parent_node_dict['schema'][-1] + self.server_id = self.schema_data['server_id'] + self.db_id = self.schema_data['db_id'] + self.schema_name = self.schema_data['schema_name'] + self.schema_id = self.schema_data['schema_id'] + self.extension_name = "postgres_fdw" + self.db_name = parent_node_dict["database"][-1]["db_name"] + self.db_user = self.server["username"] + self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8] + self.trigger_name = "event_trigger_delete_%s" % ( + str(uuid.uuid4())[1:8]) + server_con = server_utils.connect_server(self, self.server_id) + if not server_con["info"] == "Server connected.": + raise Exception("Could not connect to server to add resource " + "groups.") + server_version = 0 + if "type" in server_con["data"]: + if server_con["data"]["version"] < 90300: + message = "Event triggers are not supported by PG9.2 " \ + "and PPAS9.2 and below." + self.skipTest(message) + self.function_info = trigger_funcs_utils.create_trigger_function( + self.server, self.db_name, self.schema_name, self.func_name, + server_version) + self.event_trigger_id = event_trigger_utils.create_event_trigger( + self.server, self.db_name, self.schema_name, self.func_name, + self.trigger_name) + + def runTest(self): + """ This function will delete event trigger under test database. """ + db_con = database_utils.connect_database(self, utils.SERVER_GROUP, + self.server_id, self.db_id) + if not db_con['data']["connected"]: + raise Exception("Could not connect to database.") + schema_response = schema_utils.verify_schemas(self.server, + self.db_name, + self.schema_name) + if not schema_response: + raise Exception("Could not find the schema.") + func_name = self.function_info[1] + func_response = trigger_funcs_utils.verify_trigger_function( + self.server, + self.db_name, + func_name) + if not func_response: + raise Exception("Could not find the trigger function.") + trigger_response = event_trigger_utils.verify_event_trigger( + self.server, self.db_name, + self.trigger_name) + if not trigger_response: + raise Exception("Could not find event trigger.") + actual_response_code = True + expected_response_code = False + + if self.is_positive_test: + response = self.get_dependency_dependent() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + self.assertEquals(actual_response_code, expected_response_code) + + def get_dependency_dependent(self): + """ + This function returns the event trigger dependency and dependent + :return: event trigger dependency and dependent + """ + return self.tester.get( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + + str(self.db_id) + '/' + + str(self.event_trigger_id), + follow_redirects=True) + + def tearDown(self): + # Disconnect the database + database_utils.disconnect_database(self, self.server_id, self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_functions.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_functions.py new file mode 100644 index 000000000..5d965f9a1 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_functions.py @@ -0,0 +1,116 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +import uuid +import sys + +from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ + utils as schema_utils +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils import server_utils as server_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression import trigger_funcs_utils +from regression.python_test_utils import test_utils as utils +from . import utils as event_trigger_utils + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class EventTriggerFunctionsTestCase(BaseTestGenerator): + """ This class will fetch added event trigger functions + under test database. """ + scenarios = utils.generate_scenarios('get_event_trigger_functions', + event_trigger_utils.test_cases) + + def setUp(self): + self.schema_data = parent_node_dict['schema'][-1] + self.server_id = self.schema_data['server_id'] + self.db_id = self.schema_data['db_id'] + self.schema_name = self.schema_data['schema_name'] + self.schema_id = self.schema_data['schema_id'] + self.extension_name = "postgres_fdw" + self.db_name = parent_node_dict["database"][-1]["db_name"] + self.db_user = self.server["username"] + self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8] + self.trigger_name = "event_trigger_delete_%s" % ( + str(uuid.uuid4())[1:8]) + server_con = server_utils.connect_server(self, self.server_id) + if not server_con["info"] == "Server connected.": + raise Exception("Could not connect to server to add resource " + "groups.") + server_version = 0 + if "type" in server_con["data"]: + if server_con["data"]["version"] < 90300: + message = "Event triggers are not supported by PG9.2 " \ + "and PPAS9.2 and below." + self.skipTest(message) + self.function_info = trigger_funcs_utils.create_trigger_function( + self.server, self.db_name, self.schema_name, self.func_name, + server_version) + self.event_trigger_id = event_trigger_utils.create_event_trigger( + self.server, self.db_name, self.schema_name, self.func_name, + self.trigger_name) + + def get_event_trigger_functions(self): + """ + This function returns event trigger functions + :return: event trigger functions + """ + return self.tester.get( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + + str(self.db_id) + '/' + + str(self.event_trigger_id), + follow_redirects=True) + + def runTest(self): + """ This function will delete event trigger under test database. """ + db_con = database_utils.connect_database(self, utils.SERVER_GROUP, + self.server_id, self.db_id) + if not db_con['data']["connected"]: + raise Exception("Could not connect to database.") + schema_response = schema_utils.verify_schemas(self.server, + self.db_name, + self.schema_name) + if not schema_response: + raise Exception("Could not find the schema.") + func_name = self.function_info[1] + func_response = trigger_funcs_utils.verify_trigger_function( + self.server, + self.db_name, + func_name) + if not func_response: + raise Exception("Could not find the trigger function.") + trigger_response = event_trigger_utils.verify_event_trigger( + self.server, self.db_name, + self.trigger_name) + if not trigger_response: + raise Exception("Could not find event trigger.") + + if self.is_positive_test: + response = self.get_event_trigger_functions() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + else: + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.get_event_trigger_functions() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) + + def tearDown(self): + # Disconnect the database + database_utils.disconnect_database(self, self.server_id, self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_get.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_get.py index fcd695803..2b299d151 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_get.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_get.py @@ -8,7 +8,7 @@ ########################################################################## import uuid - +import sys from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ utils as schema_utils from pgadmin.browser.server_groups.servers.databases.tests import \ @@ -20,14 +20,16 @@ from regression import trigger_funcs_utils from regression.python_test_utils import test_utils as utils from . import utils as event_trigger_utils +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + class EventTriggerGetTestCase(BaseTestGenerator): """ This class will fetch added event trigger under test database. """ - scenarios = [ - # Fetching default URL for event trigger node. - ('Fetch Event Trigger Node URL', - dict(url='/browser/event_trigger/obj/')) - ] + scenarios = utils.generate_scenarios('get_event_trigger', + event_trigger_utils.test_cases) def setUp(self): self.schema_data = parent_node_dict['schema'][-1] @@ -57,6 +59,30 @@ class EventTriggerGetTestCase(BaseTestGenerator): self.server, self.db_name, self.schema_name, self.func_name, self.trigger_name) + def get_event_trigger(self): + """ + This functions returns the event trigger details + :return: event trigger details + """ + return self.tester.get( + self.url + + str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' + + str(self.db_id) + '/' + str(self.event_trigger_id), + content_type='html/json' + ) + + def get_event_trigger_list(self): + """ + This functions returns the event trigger list + :return: event trigger list + """ + return self.tester.get( + self.url + + str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' + + str(self.db_id) + '/', + content_type='html/json' + ) + def runTest(self): """ This function will fetch added event trigger under test database. """ @@ -76,13 +102,138 @@ class EventTriggerGetTestCase(BaseTestGenerator): func_name) if not func_response: raise Exception("Could not find the trigger function.") - response = self.tester.get( + + actual_response_code = True + expected_response_code = False + if self.is_positive_test: + if hasattr(self, "event_trigger_list"): + response = self.get_event_trigger_list() + else: + response = self.get_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + else: + if hasattr(self, "error_fetching_event_trigger"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + if hasattr(self, "event_trigger_list"): + response = self.get_event_trigger_list() + else: + response = self.get_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "wrong_event_trigger_id"): + self.event_trigger_id = 99999 + response = self.get_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) + + def tearDown(self): + # Disconnect the database + database_utils.disconnect_database(self, self.server_id, self.db_id) + + +class EventTriggerGetNodesAndNodeTestCase(BaseTestGenerator): + """ This class will fetch added event trigger under test database. """ + scenarios = utils.generate_scenarios('get_event_trigger_nodes_and_node', + event_trigger_utils.test_cases) + + def setUp(self): + self.schema_data = parent_node_dict['schema'][-1] + self.server_id = self.schema_data['server_id'] + self.db_id = self.schema_data['db_id'] + self.schema_name = self.schema_data['schema_name'] + self.schema_id = self.schema_data['schema_id'] + self.extension_name = "postgres_fdw" + self.db_name = parent_node_dict["database"][-1]["db_name"] + self.db_user = self.server["username"] + self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8] + self.trigger_name = "event_trigger_get_%s" % (str(uuid.uuid4())[1:8]) + server_con = server_utils.connect_server(self, self.server_id) + if not server_con["info"] == "Server connected.": + raise Exception("Could not connect to server to add resource " + "groups.") + server_version = 0 + if "type" in server_con["data"]: + if server_con["data"]["version"] < 90300: + message = "Event triggers are not supported by PG9.2 " \ + "and PPAS9.2 and below." + self.skipTest(message) + self.function_info = trigger_funcs_utils.create_trigger_function( + self.server, self.db_name, self.schema_name, self.func_name, + server_version) + self.event_trigger_id = event_trigger_utils.create_event_trigger( + self.server, self.db_name, self.schema_name, self.func_name, + self.trigger_name) + + def get_event_trigger_nodes(self): + """ + This function returns the event trigger nodes details + :return: event trigger nodes details + """ + return self.tester.get( + self.url + + str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' + + str(self.db_id) + '/', + content_type='html/json' + ) + + def get_event_trigger_node(self): + """ + This function returns the event trigger node details + :return: event trigger node details + """ + return self.tester.get( self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' + str(self.db_id) + '/' + str(self.event_trigger_id), content_type='html/json' ) - self.assertEquals(response.status_code, 200) + + def runTest(self): + """ This function will fetch added event trigger under test database. + """ + db_con = database_utils.connect_database(self, utils.SERVER_GROUP, + self.server_id, self.db_id) + if not db_con['data']["connected"]: + raise Exception("Could not connect to database.") + schema_response = schema_utils.verify_schemas(self.server, + self.db_name, + self.schema_name) + if not schema_response: + raise Exception("Could not find the schema.") + func_name = self.function_info[1] + func_response = trigger_funcs_utils.verify_trigger_function( + self.server, + self.db_name, + func_name) + if not func_response: + raise Exception("Could not find the trigger function.") + + actual_response_code = True + expected_response_code = False + + if self.is_positive_test: + if hasattr(self, "node"): + response = self.get_event_trigger_node() + else: + response = self.get_event_trigger_nodes() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + else: + if hasattr(self, "error_fetching_event_trigger"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + if hasattr(self, "node"): + response = self.get_event_trigger_node() + else: + response = self.get_event_trigger_nodes() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) def tearDown(self): # Disconnect the database diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_put.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_put.py index 467ceab1c..79f8ee67b 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_put.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_put.py @@ -9,6 +9,7 @@ import json import uuid +import sys from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ utils as schema_utils @@ -21,14 +22,16 @@ from regression import trigger_funcs_utils from regression.python_test_utils import test_utils as utils from . import utils as event_trigger_utils +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + class EventTriggerPutTestCase(BaseTestGenerator): """ This class will fetch added event trigger under test database. """ - scenarios = [ - # Fetching default URL for event trigger node. - ('Fetch Event Trigger Node URL', - dict(url='/browser/event_trigger/obj/')) - ] + scenarios = utils.generate_scenarios('update_event_trigger', + event_trigger_utils.test_cases) def setUp(self): self.schema_data = parent_node_dict['schema'][-1] @@ -58,6 +61,18 @@ class EventTriggerPutTestCase(BaseTestGenerator): self.server, self.db_name, self.schema_name, self.func_name, self.trigger_name) + def update_event_trigger(self): + """ + This functions update event trigger details + :return: Event trigger update request details + """ + return self.tester.put( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str(self.db_id) + + '/' + str(self.event_trigger_id), + data=json.dumps(self.test_data), + follow_redirects=True) + def runTest(self): """ This function will update event trigger under test database. """ db_con = database_utils.connect_database(self, utils.SERVER_GROUP, @@ -80,17 +95,33 @@ class EventTriggerPutTestCase(BaseTestGenerator): self.server, self.db_name, self.trigger_name) if not trigger_response: raise Exception("Could not find event trigger.") - data = { - "comment": "This is event trigger update comment", - "id": self.event_trigger_id - } - put_response = self.tester.put( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + str(self.db_id) + - '/' + str(self.event_trigger_id), - data=json.dumps(data), - follow_redirects=True) - self.assertEquals(put_response.status_code, 200) + self.test_data['id'] = self.event_trigger_id + actual_response_code = True + expected_response_code = False + if self.is_positive_test: + response = self.update_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + else: + if hasattr(self, "error_updating_event_trigger"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.update_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "wrong_event_trigger_id"): + self.event_trigger_id = 99999 + response = self.update_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "error_in_db"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.update_event_trigger() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) def tearDown(self): # Disconnect the database diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_sql.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_sql.py new file mode 100644 index 000000000..711d1d0d3 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/test_event_trigger_sql.py @@ -0,0 +1,134 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +import uuid +import sys +from pgadmin.browser.server_groups.servers.databases.schemas.tests import \ + utils as schema_utils +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils import server_utils as server_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression import trigger_funcs_utils +from regression.python_test_utils import test_utils as utils +from . import utils as event_trigger_utils + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class EventTriggerGetSqlTestCase(BaseTestGenerator): + """ This class will fetch added event trigger sql under test database. """ + scenarios = utils.generate_scenarios('get_event_trigger_sql', + event_trigger_utils.test_cases) + + def setUp(self): + self.schema_data = parent_node_dict['schema'][-1] + self.server_id = self.schema_data['server_id'] + self.db_id = self.schema_data['db_id'] + self.schema_name = self.schema_data['schema_name'] + self.schema_id = self.schema_data['schema_id'] + self.extension_name = "postgres_fdw" + self.db_name = parent_node_dict["database"][-1]["db_name"] + self.db_user = self.server["username"] + self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8] + self.trigger_name = "event_trigger_get_%s" % (str(uuid.uuid4())[1:8]) + server_con = server_utils.connect_server(self, self.server_id) + if not server_con["info"] == "Server connected.": + raise Exception("Could not connect to server to add resource " + "groups.") + server_version = 0 + if "type" in server_con["data"]: + if server_con["data"]["version"] < 90300: + message = "Event triggers are not supported by PG9.2 " \ + "and PPAS9.2 and below." + self.skipTest(message) + self.function_info = trigger_funcs_utils.create_trigger_function( + self.server, self.db_name, self.schema_name, self.func_name, + server_version) + self.event_trigger_id = event_trigger_utils.create_event_trigger( + self.server, self.db_name, self.schema_name, self.func_name, + self.trigger_name) + + def get_event_trigger_sql(self): + """ + this function fetch event trigger sql + :return: event trigger sql details + """ + return self.tester.get( + self.url + + str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' + + str(self.db_id) + '/' + str(self.event_trigger_id), + content_type='html/json' + ) + + def get_event_trigger_msql(self): + """ + This function returns the modified sql + :return: event trigger msql details + """ + return self.tester.get( + self.url + + str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' + + str(self.db_id) + '/', + content_type='html/json' + ) + + def runTest(self): + """ This function will fetch added event trigger under test database. + """ + db_con = database_utils.connect_database(self, utils.SERVER_GROUP, + self.server_id, self.db_id) + if not db_con['data']["connected"]: + raise Exception("Could not connect to database.") + schema_response = schema_utils.verify_schemas(self.server, + self.db_name, + self.schema_name) + if not schema_response: + raise Exception("Could not find the schema.") + func_name = self.function_info[1] + func_response = trigger_funcs_utils.verify_trigger_function( + self.server, + self.db_name, + func_name) + if not func_response: + raise Exception("Could not find the trigger function.") + + actual_response_code = True, + expected_response_code = False + if self.is_positive_test or hasattr(self, "msql"): + response = self.get_event_trigger_sql() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + else: + if hasattr(self, "error_fetching_event_trigger"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.get_event_trigger_sql() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "error_fetching_event_trigger_db"): + with patch(self.mock_data["function_name"], + return_value=eval(self.mock_data["return_value"])): + response = self.get_event_trigger_sql() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + if hasattr(self, "error_fetching_event_trigger_msql"): + response = self.get_event_trigger_msql() + actual_response_code = response.status_code + expected_response_code = self.expected_data['status_code'] + + self.assertEquals(actual_response_code, expected_response_code) + + def tearDown(self): + # Disconnect the database + database_utils.disconnect_database(self, self.server_id, self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/utils.py index 1e147123f..94ae931cc 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/utils.py +++ b/web/pgadmin/browser/server_groups/servers/databases/event_triggers/tests/utils.py @@ -11,8 +11,15 @@ from __future__ import print_function import sys import traceback +import os +import json from regression.python_test_utils.test_utils import get_db_connection +from regression.python_test_utils import test_utils as utils + +CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) +with open(CURRENT_PATH + "/event_triggers_test_data.json") as data_file: + test_cases = json.load(data_file) def create_event_trigger(server, db_name, schema_name, func_name, @@ -89,3 +96,34 @@ def verify_event_trigger(server, db_name, trigger_name): return event_trigger except Exception: traceback.print_exc(file=sys.stderr) + + +def verify_event_trigger_node(self): + """ + This function verifies the event trigger is present in the database + :param self: server details + :return event_trigger: event trigger's expected details + :rtype event_trigger: dict + """ + try: + connection = get_db_connection(self.db_name, + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode']) + pg_cursor = connection.cursor() + pg_cursor.execute("SELECT evtenabled," + "evtevent, " + "(select rolname from pg_authid where oid " + "= pl.evtowner) as evtowner," + " evtname from pg_event_trigger pl " + "WHERE evtname = '%s'" % self.test_data['name']) + + event_trigger = pg_cursor.fetchone() + expected_output = utils.create_expected_output( + self.parameters_to_compare, list(event_trigger)) + connection.close() + return expected_output + except Exception: + traceback.print_exc(file=sys.stderr) diff --git a/web/pgadmin/browser/server_groups/servers/databases/languages/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/languages/tests/utils.py index f5cc9c35c..f0f9b9239 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/languages/tests/utils.py +++ b/web/pgadmin/browser/server_groups/servers/databases/languages/tests/utils.py @@ -14,6 +14,7 @@ import traceback import os import json from regression.python_test_utils.test_utils import get_db_connection +from regression.python_test_utils import test_utils as utils CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) with open(CURRENT_PATH + "/language_test_data.json") as data_file: @@ -83,8 +84,9 @@ def verify_language(self): "from pg_language pl where lanname='%s'" % self.data["name"]) language = pg_cursor.fetchall() - expected_output = make_dict(self.parameters_to_compare, - list(language[0])) + expected_output = utils.create_expected_output( + self.parameters_to_compare, + list(language[0])) connection.close() diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 88cbfb223..971fc2403 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -1198,3 +1198,21 @@ def generate_scenarios(key, test_cases): tup = (test_name, dict(scenario)) scenarios.append(tup) return scenarios + + +def create_expected_output(parameters, actual_data): + """ + This function creates the dict using given parameter and actual data + :param parameters: + :param actual_data: + :return: expected data + :type: dict + """ + expected_output = {} + + for key in parameters: + for value in actual_data: + expected_output[key] = value + actual_data.remove(value) + break + return expected_output