Improve code coverage and API test cases for the CAST module. Fixes #5049.

pull/27/head
Yogesh Mahajan 2020-01-16 14:34:51 +05:30 committed by Akshay Joshi
parent 8c3bba65e5
commit 4ab3bbeb82
13 changed files with 1472 additions and 114 deletions

View File

@ -14,6 +14,7 @@ New features
Housekeeping
************
| `Issue #5049 <https://redmine.postgresql.org/issues/5049>`_ - Improve code coverage and API test cases for the CAST module.
| `Issue #5071 <https://redmine.postgresql.org/issues/5071>`_ - Improve the test framework to run for multiple classes defined in a single file.
| `Issue #5072 <https://redmine.postgresql.org/issues/5072>`_ - Updated wcDocker package which includes aria-label accessibility improvements.

View File

@ -0,0 +1,763 @@
{
"cast_create": [
{
"name": "TC_01 - Create cast: With valid source & target type of implicit_type.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - Create cast: With valid source & target type of explict_type.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {
"castcontext": "EXPLICIT",
"encoding": "UTF8",
"name": "timestamp with time zone->bigint",
"srctyp": "timestamp with time zone",
"trgtyp": "bigint"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_03 - Create cast: With invalid source type.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money123",
"trgtyp": "bigint"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 500,
"error_msg": "ERROR: type \"money123\" does not exist",
"test_result_data": {}
}
},
{
"name": "TC_04 - Create cast: With insufficient parameters missing target type.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": "Could not find the required parameter (trgtyp)",
"test_result_data": {}
}
},
{
"name": "TC_05 - Create cast: With valid data while server down",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
},
{
"name": "TC_06 - Create cast: With valid data while exception.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "Exception('Mocked Exception Message')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Exception Message",
"test_result_data": {}
}
}
],
"cast_create_get_functions": [
{
"name": "TC_01 - From create cast dialogue, get available cast functions for valid source & target type",
"is_positive_test": true,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "integer->bigint",
"srctyp": "integer",
"trgtyp": "bigint"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - From create cast dialogue, get available cast functions when server is down",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "integer->bigint",
"srctyp": "integer",
"trgtyp": "bigint"
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
],
"cast_create_get_type": [
{
"name": "TC_01 - From create cast dialogue get available cast types",
"is_positive_test": true,
"inventory_data": {},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - From create cast dialogue get available cast types while server is down",
"is_positive_test": false,
"inventory_data": {},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
],
"cast_delete": [
{
"name": "TC_01 - Delete existing cast using cast id",
"is_positive_test": true,
"inventory_data": {
"castcontext": "EXPLICIT",
"encoding": "UTF8",
"name": "timestamp with time zone->bigint",
"srctyp": "timestamp with time zone",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - Delete non-existing cast using cast id",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": "",
"test_result_data": {}
}
},
{
"name": "TC_03 - Delete existing cast using cast id while server down",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
],
"cast_delete_multiple": [
{
"name": "TC_01 - Delete multiple existing casts using cast ids",
"is_positive_test": true,
"inventory_data": {
"castcontext": "EXPLICIT",
"encoding": "UTF8",
"name": "timestamp with time zone->bigint",
"srctyp": "timestamp with time zone",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - Delete multiple existing casts using cast ids while server down",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
},
{
"name": "TC_03 - Delete multiple existing casts using cast ids while exception",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "Exception('Mocked Exception Message')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Exception Message",
"test_result_data": {}
}
}
],
"cast_get": [
{
"name": "TC_01 - Get cast details: With existing cast id.",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "TC_02 - Get casts list: With existing db id.",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": true
},
{
"name": "TC_03 - Get cast details: With non existing db id",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "TC_04 - Get cast details: With existing cast id while server is down.",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
},
{
"name": "TC_05 - Get casts list: With existing db id while server is down.",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": true
}
],
"cast_get_dependencies_dependants": [
{
"name": "TC_01 - Get cast dependents with existing cast id",
"is_positive_test": true,
"inventory_data": {"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_dependant": true
},
{
"name": "TC_02 - Get cast dependencies with existing cast id",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_dependant": true
}
],
"cast_get_node": [
{
"name": "TC_01 - Get cast node details: With existing cast id.",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "TC_02 - Get casts nodes list: With existing db id.",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": true
},
{
"name": "TC_03 - Get cast node details: With non existing db id",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "TC_04 - Get cast node details: With existing cast id while server is down.",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
},
{
"name": "TC_05 - Get casts list: With existing db id while server is down.",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": true
}
],
"cast_get_sql": [
{
"name": "TC_01 - Get cast sql for existing cast id",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - Get cast sql for non-existing cast id",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 500,
"error_msg": " ",
"test_result_data": {}
}
},
{
"name": "TC_03 - Get cast sql for existing cast id while server is down",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
},
{
"name": "TC_04 - Get cast sql for existing cast id while exception",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "Exception('Mocked Exception Message')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Exception Message",
"test_result_data": {}
}
}
],
"cast_put": [
{
"name": "TC_01 - Update existing cast with valid parameter Value ",
"is_positive_test": true,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {
"description": "This is cast update comment"
},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_02 - Update existing cast with invalid parameter value ",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {
"trgtyp": "bigint344"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 500,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "TC_03 - Update existing cast with valid parameter value while server is down",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
},
{
"name": "TC_04 - Update non-existing cast with valid parameter value",
"is_positive_test": false,
"inventory_data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"test_data": {},
"mocking_required": false,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
]
}

View File

@ -0,0 +1,76 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from __future__ import print_function
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsCreateTestCase(BaseTestGenerator):
skip_on_database = ['gpdb']
url = '/browser/cast/obj/'
# Generates scenarios from cast_test_data.json file
scenarios = cast_utils.generate_scenarios("cast_create")
def setUp(self):
""" This function will get data required to create cast."""
super(CastsCreateTestCase, self).runTest()
self.data = self.test_data
def runTest(self):
""" This function will add cast under test database. """
self.server_data = parent_node_dict["database"][-1]
self.server_id = self.server_data["server_id"]
self.db_id = self.server_data['db_id']
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
if self.is_positive_test:
response = cast_utils.api_create_cast(self)
cast_utils.assert_status_code(self, response)
cast_utils.assert_cast_created(self)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = cast_utils.api_create_cast(self)
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
else:
response = cast_utils.api_create_cast(self)
cast_utils.assert_status_code(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
if self.is_positive_test:
connection = utils.get_db_connection(self.server_data['db_name'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
cast_utils.drop_cast(connection, self.data["srctyp"],
self.data["trgtyp"])
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -0,0 +1,62 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from __future__ import print_function
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsCreateGetFunctionsTestCase(BaseTestGenerator):
skip_on_database = ['gpdb']
url = '/browser/cast/'
scenarios = cast_utils.generate_scenarios("cast_create_get_functions")
def runTest(self):
""" This function will add cast under test database. """
super(CastsCreateGetFunctionsTestCase, self).runTest()
self.data = self.test_data
self.server_data = parent_node_dict["database"][-1]
self.server_id = self.server_data["server_id"]
self.db_id = self.server_data['db_id']
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
if self.is_positive_test:
response = cast_utils.api_create_cast_get_functions(self)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
return_value_object = eval(self.mock_data["return_value"])
with patch(self.mock_data["function_name"],
side_effect=[return_value_object]):
response = cast_utils.api_create_cast_get_functions(self)
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
if self.is_positive_test:
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -8,9 +8,6 @@
##########################################################################
from __future__ import print_function
import json
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
@ -18,17 +15,22 @@ from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
class CastsAddTestCase(BaseTestGenerator):
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsCreateGettypeTestCase(BaseTestGenerator):
skip_on_database = ['gpdb']
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
url = '/browser/cast/'
scenarios = cast_utils.generate_scenarios("cast_create_get_type")
def runTest(self):
""" This function will add cast under test database. """
super(CastsAddTestCase, self).runTest()
super(CastsCreateGettypeTestCase, self).runTest()
self.server_data = parent_node_dict["database"][-1]
self.server_id = self.server_data["server_id"]
self.db_id = self.server_data['db_id']
@ -39,24 +41,18 @@ class CastsAddTestCase(BaseTestGenerator):
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
self.data = cast_utils.get_cast_data()
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(
self.db_id) + '/',
data=json.dumps(self.data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
if self.is_positive_test:
response = cast_utils.api_create_cast_get_type(self)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = cast_utils.api_create_cast_get_type(self)
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
connection = utils.get_db_connection(self.server_data['db_name'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
cast_utils.drop_cast(connection, self.data["srctyp"],
self.data["trgtyp"])
""" This function disconnect the test database and drop added cast. """
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -16,25 +16,30 @@ from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsDeleteTestCase(BaseTestGenerator):
url = '/browser/cast/obj/'
""" This class will delete the cast node added under database node. """
skip_on_database = ['gpdb']
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
scenarios = cast_utils.generate_scenarios("cast_delete")
def setUp(self):
super(CastsDeleteTestCase, self).setUp()
self.inv_data = self.inventory_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.source_type = 'money'
self.target_type = 'bigint'
self.cast_id = cast_utils.create_cast(self.server, self.source_type,
self.target_type)
self.cast_id = cast_utils.create_cast(self.server,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
def runTest(self):
""" This function will delete added cast."""
@ -52,18 +57,32 @@ class CastsDeleteTestCase(BaseTestGenerator):
self.server['host'],
self.server['port'],
self.server['sslmode'])
response = cast_utils.verify_cast(connection, self.source_type,
self.target_type)
if len(response) == 0:
casts_exists = cast_utils.verify_cast(connection,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
if not casts_exists:
raise Exception("Could not find cast.")
delete_response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.cast_id),
follow_redirects=True)
self.assertEquals(delete_response.status_code, 200)
if self.is_positive_test:
response = cast_utils.api_delete_cast(self, self.cast_id)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = cast_utils.api_delete_cast(self, self.cast_id)
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
else:
response = cast_utils.api_delete_cast(self, 12398)
cast_utils.assert_status_code(self, response)
def tearDown(self):
""" Actually Delete cast """
if not self.is_positive_test:
cast_utils.api_delete_cast(self, self.cast_id)
"""This function will disconnect test database."""
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -17,25 +17,30 @@ from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsMultipleDeleteTestCase(BaseTestGenerator):
""" This class will delete the cast node added under database node. """
skip_on_database = ['gpdb']
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
url = '/browser/cast/obj/'
scenarios = cast_utils.generate_scenarios("cast_delete_multiple")
def setUp(self):
super(CastsMultipleDeleteTestCase, self).setUp()
self.inv_data = self.inventory_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.source_type = 'money'
self.target_type = 'bigint'
self.cast_id = cast_utils.create_cast(self.server, self.source_type,
self.target_type)
self.cast_id = cast_utils.create_cast(self.server,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
def runTest(self):
""" This function will delete added cast."""
@ -53,20 +58,32 @@ class CastsMultipleDeleteTestCase(BaseTestGenerator):
self.server['host'],
self.server['port'],
self.server['sslmode'])
response = cast_utils.verify_cast(connection, self.source_type,
self.target_type)
if len(response) == 0:
casts_exists = cast_utils.verify_cast(connection,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
if not casts_exists:
raise Exception("Could not find cast.")
delete_response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/',
data=json.dumps({'ids': [self.cast_id]}),
content_type='html/json',
follow_redirects=True)
self.assertEquals(delete_response.status_code, 200)
if self.is_positive_test:
response = cast_utils.api_delete_casts(self, [self.cast_id])
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = cast_utils.api_delete_casts(self,
[self.cast_id])
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
else:
response = cast_utils.api_delete_casts(self, [self.cast_id])
cast_utils.assert_status_code(self, response)
def tearDown(self):
""" Actually Delete cast """
if not self.is_positive_test:
cast_utils.api_delete_casts(self, [self.cast_id])
"""This function will disconnect test database."""
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -8,7 +8,6 @@
##########################################################################
from __future__ import print_function
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
@ -16,26 +15,31 @@ from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsGetTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
skip_on_database = ['gpdb']
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
url = '/browser/cast/obj/'
scenarios = cast_utils.generate_scenarios("cast_get")
def setUp(self):
""" This function will create cast."""
super(CastsGetTestCase, self).setUp()
self.inv_data = self.inventory_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.source_type = 'money'
self.target_type = 'bigint'
self.cast_id = cast_utils.create_cast(self.server, self.source_type,
self.target_type)
self.cast_id = cast_utils.create_cast(self.server,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
def runTest(self):
""" This function will fetch added cast."""
@ -47,12 +51,31 @@ class CastsGetTestCase(BaseTestGenerator):
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id) + '/' + str(self.cast_id),
content_type='html/json')
self.assertEquals(response.status_code, 200)
if self.is_positive_test:
if self.is_list:
response = cast_utils.api_get_cast(self, "")
cast_utils.assert_status_code(self, response)
else:
response = cast_utils.api_get_cast(self, self.cast_id)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
if self.is_list:
response = cast_utils.api_get_cast(self, "")
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
else:
response = cast_utils.api_get_cast(self, self.cast_id)
cast_utils.assert_status_code(self, response)
else:
self.cast_id = 12893
response = cast_utils.api_get_cast(self, self.cast_id)
cast_utils.assert_status_code(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
@ -62,8 +85,8 @@ class CastsGetTestCase(BaseTestGenerator):
self.server['host'],
self.server['port'],
self.server['sslmode'])
cast_utils.drop_cast(connection, self.source_type,
self.target_type)
cast_utils.drop_cast(connection, self.inv_data["srctyp"],
self.inv_data["trgtyp"])
database_utils.disconnect_database(self, self.server_id,
self.db_id)
self.server['db'] = self.default_db

View File

@ -0,0 +1,80 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from __future__ import print_function
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsGetDependentsAndDependencyTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
skip_on_database = ['gpdb']
url = '/browser/cast/'
scenarios = cast_utils.generate_scenarios(
"cast_get_dependencies_dependants")
def setUp(self):
""" This function will create cast."""
super(CastsGetDependentsAndDependencyTestCase, self).setUp()
self.inv_data = self.inventory_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.cast_id = cast_utils.create_cast(self.server,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
def runTest(self):
""" This function will fetch added cast."""
self.server_id = self.database_info["server_id"]
self.db_id = self.database_info['db_id']
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
if self.is_positive_test:
if self.is_dependant:
response = cast_utils.api_get_cast_node_dependent(self)
cast_utils.assert_status_code(self, response)
else:
response = cast_utils.api_get_cast_node_dependencies(self)
cast_utils.assert_status_code(self, response)
# TODO
# Check weather to add -ve tests or not
# as -ve scenarios NOT handled in code
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
cast_utils.drop_cast(connection, self.inv_data["srctyp"],
self.inv_data["trgtyp"])
database_utils.disconnect_database(self, self.server_id,
self.db_id)
self.server['db'] = self.default_db

View File

@ -0,0 +1,95 @@
from __future__ import print_function
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsGetNodeTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
skip_on_database = ['gpdb']
url = '/browser/cast/nodes/'
scenarios = cast_utils.generate_scenarios("cast_get_node")
def setUp(self):
""" This function will create cast."""
super(CastsGetNodeTestCase, self).setUp()
self.inv_data = self.inventory_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.cast_id = cast_utils.create_cast(self.server,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
def runTest(self):
""" This function will fetch added cast."""
self.server_id = self.database_info["server_id"]
self.db_id = self.database_info['db_id']
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
if self.is_positive_test:
if self.is_list:
response = cast_utils.api_get_cast_node(self, "")
cast_utils.assert_status_code(self, response)
else:
response = cast_utils.api_get_cast_node(self, self.cast_id)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
return_value_object = eval(self.mock_data["return_value"])
with patch(self.mock_data["function_name"],
side_effect=[return_value_object]):
if self.is_list:
response = cast_utils.api_get_cast_node(self, "")
cast_utils.assert_status_code(self, response)
# act_res = response.status_code
# exp_res = self.expected_data["status_code"]
# self.assertEquals(act_res, exp_res)
cast_utils.assert_error_message(self, response)
# act_res = response.json["errormsg"]
# exp_res = self.expected_data["error_msg"]
# self.assertEquals(act_res, exp_res)
else:
response = cast_utils.api_get_cast_node(self,
self.cast_id)
cast_utils.assert_status_code(self, response)
# act_res = response.status_code
# exp_res = self.expected_data["status_code"]
# self.assertEquals(act_res, exp_res)
else:
self.cast_id = 12893
response = cast_utils.api_get_cast(self, self.cast_id)
cast_utils.assert_status_code(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
cast_utils.drop_cast(connection, self.inv_data["srctyp"],
self.inv_data["trgtyp"])
database_utils.disconnect_database(self, self.server_id,
self.db_id)
self.server['db'] = self.default_db

View File

@ -0,0 +1,81 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from __future__ import print_function
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsGetSqlTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
skip_on_database = ['gpdb']
url = '/browser/cast/'
scenarios = cast_utils.generate_scenarios("cast_get_sql")
def setUp(self):
""" This function will create cast."""
super(CastsGetSqlTestCase, self).setUp()
self.inv_data = self.inventory_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.cast_id = cast_utils.create_cast(self.server,
self.inv_data["srctyp"],
self.inv_data["trgtyp"])
def runTest(self):
""" This function will fetch added cast."""
self.server_id = self.database_info["server_id"]
self.db_id = self.database_info['db_id']
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
if self.is_positive_test:
response = cast_utils.api_get_cast_sql(self)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = cast_utils.api_get_cast_sql(self)
cast_utils.assert_status_code(self, response)
else:
self.cast_id = 34091
response = cast_utils.api_get_cast_sql(self)
cast_utils.assert_status_code(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
cast_utils.drop_cast(connection, self.inv_data["srctyp"],
self.inv_data["trgtyp"])
database_utils.disconnect_database(self, self.server_id,
self.db_id)
self.server['db'] = self.default_db

View File

@ -17,25 +17,31 @@ from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as cast_utils
import sys
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class CastsPutTestCase(BaseTestGenerator):
url = '/browser/cast/obj/'
""" This class will fetch the cast node added under database node. """
skip_on_database = ['gpdb']
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
scenarios = cast_utils.generate_scenarios("cast_put")
def setUp(self):
""" This function will create cast."""
super(CastsPutTestCase, self).setUp()
self.inv_data = self.inventory_data
self.data = self.test_data
self.default_db = self.server["db"]
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.source_type = 'character'
self.target_type = 'cidr'
self.source_type = self.inv_data["srctyp"]
self.target_type = self.inv_data["trgtyp"]
self.cast_id = cast_utils.create_cast(self.server, self.source_type,
self.target_type)
@ -54,22 +60,27 @@ class CastsPutTestCase(BaseTestGenerator):
self.server['db_password'],
self.server['host'],
self.server['port'])
response = cast_utils.verify_cast(connection, self.source_type,
casts_exists = cast_utils.verify_cast(connection, self.source_type,
self.target_type)
if len(response) == 0:
if not casts_exists:
raise Exception("Could not find cast.")
data = {
"description": "This is cast update comment",
"id": self.cast_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(
self.db_id) +
'/' + str(self.cast_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
self.data["id"] = self.cast_id
if self.is_positive_test:
response = cast_utils.api_update_cast(self)
cast_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = cast_utils.api_update_cast(self)
cast_utils.assert_status_code(self, response)
cast_utils.assert_error_message(self, response)
else:
if len(self.data) == 1:
self.cast_id = 109822
response = cast_utils.api_update_cast(self)
cast_utils.assert_status_code(self, response)
def tearDown(self):
"""This function disconnect the test database and drop added cast."""

View File

@ -8,22 +8,153 @@
##########################################################################
from __future__ import print_function
from regression.python_test_utils.test_utils import *
import os
import json
import sys
import traceback
from regression.python_test_utils.test_utils import get_db_connection
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + "/cast_test_data.json") as data_file:
test_cases = json.load(data_file)
def get_cast_data():
data = {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint",
}
return data
def generate_scenarios(key):
"""
This function generates the test case scenarios according to key given
to it, e.g. key=ADD, key=update etc.
:param key: for which operation generate the test scenario
:type key: str
:return: scenarios
:rtype: list
"""
scenarios = []
for scenario in test_cases[key]:
test_name = scenario["name"]
scenario.pop("name")
tup = (test_name, dict(scenario))
scenarios.append(tup)
return scenarios
def api_get_cast(self, cast_id):
return self.tester.get(
self.url + str(SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id) + '/' + str(cast_id),
content_type='html/json')
def api_create_cast(self):
return self.tester.post(
self.url + str(SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(
self.db_id) + '/',
data=json.dumps(self.data),
content_type='html/json')
def api_get_cast_node(self, cast_id):
return self.tester.get(
self.url + str(SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id) + '/' + str(cast_id),
content_type='html/json')
def api_create_cast_get_functions(self):
return self.tester.post(
self.url + 'get_functions/' + str(SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(
self.db_id) + '/',
data=json.dumps(self.data),
content_type='html/json')
def api_delete_cast(self, cast_id):
return self.tester.delete(
self.url + str(SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(cast_id),
follow_redirects=True)
def api_delete_casts(self, list_of_cast_ids):
return self.tester.delete(
self.url + str(SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/',
data=json.dumps({'ids': list_of_cast_ids}),
content_type='html/json',
follow_redirects=True)
def api_create_cast_get_type(self):
return self.tester.get(
self.url + 'get_type/' + str(SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(
self.db_id) + '/',
content_type='html/json')
def api_get_cast_node_dependent(self):
return self.tester.get(
self.url + 'dependent/' + str(SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id) + '/' + str(self.cast_id),
content_type='html/json')
def api_get_cast_node_dependencies(self):
return self.tester.get(
self.url + 'dependency/' + str(SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id) + '/' + str(self.cast_id),
content_type='html/json')
def api_get_cast_sql(self):
return self.tester.get(
self.url + 'sql/' + str(SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id) + '/' + str(self.cast_id),
content_type='html/json')
def api_update_cast(self):
return self.tester.put(
self.url + str(SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(
self.db_id) +
'/' + str(self.cast_id),
data=json.dumps(self.data),
follow_redirects=True)
def get_database_connection(self):
return get_db_connection(self.server_data['db_name'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
def assert_status_code(self, response):
act_res = response.status_code
exp_res = self.expected_data["status_code"]
return self.assertEquals(act_res, exp_res)
def assert_error_message(self, response):
act_res = response.json["errormsg"]
exp_res = self.expected_data["error_msg"]
return self.assertEquals(act_res, exp_res)
def assert_cast_created(self):
source_type = self.data["srctyp"]
target_type = self.data["trgtyp"]
con = get_database_connection(self)
act_res = verify_cast(con, source_type, target_type)
return self.assertTrue(act_res)
def create_cast(server, source_type, target_type):
@ -82,7 +213,10 @@ def verify_cast(connection, source_type, target_type):
"format_type(t.oid, NULL) = '%s')" % (source_type, target_type))
casts = pg_cursor.fetchall()
connection.close()
return casts
if len(casts) > 0:
return True
else:
return False
except Exception:
traceback.print_exc(file=sys.stderr)