Add unit tests for schemas, collations and trigger functions.

pull/3/head
Dave Page 2016-08-16 12:54:02 +01:00
parent 67f481ab11
commit 99b4a0fe5b
21 changed files with 1773 additions and 19 deletions

View File

@ -0,0 +1,17 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class CollationTestGenerator(BaseTestGenerator):
def generate_tests(self):
return

View File

@ -0,0 +1,74 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as collation_utils
class CollationAddTestCase(BaseTestGenerator):
""" This class will add new collation under schema node. """
scenarios = [
# Fetching default URL for collation node.
('Default Node URL', dict(url='/browser/collation/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will add collation under schema node. """
collation_utils.add_collation(
self.tester, self.server_connect_response, self.server_ids)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added collations, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
collation_utils.delete_collation(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,77 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as collation_utils
class CollationDeleteTestCase(BaseTestGenerator):
""" This class will delete added collation under schema node. """
scenarios = [
# Fetching default URL for collation node.
('Fetch collation Node URL', dict(url='/browser/collation/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
5. Add the collations
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add collations
collation_utils.add_collation(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will delete collation under schema node. """
collation_utils.delete_collation(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,101 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as collation_utils
class CollationGetTestCase(BaseTestGenerator):
""" This class will fetch new collation under schema node. """
scenarios = [
# Fetching default URL for collation node.
('Fetch collation Node URL', dict(url='/browser/collation/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
5. Add the collations
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add collations
collation_utils.add_collation(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will fetch collation under schema node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
collation_ids_dict = all_id["coid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_info = schema_ids_dict[int(server_id)]
schema_response = schema_utils.verify_schemas(
self.tester, server_id, db_id, schema_info[0])
schema_response = json.loads(
schema_response.data.decode('utf-8'))
if len(schema_response) != 0:
collation_id = collation_ids_dict[int(server_id)]
get_response = collation_utils.verify_collation(
self.tester, server_id, db_id, schema_info[0],
collation_id)
self.assertEquals(get_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added collations, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
collation_utils.delete_collation(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,125 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from regression.test_setup import advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as collation_utils
class CollationPutTestCase(BaseTestGenerator):
""" This class will update added collation under schema node. """
scenarios = [
# Fetching default URL for collation node.
('Fetch collation Node URL', dict(url='/browser/collation/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
5. Add the collations
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add collations
collation_utils.add_collation(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will update collation under schema node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
collation_ids_dict = all_id["coid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_info = schema_ids_dict[int(server_id)]
schema_response = schema_utils.verify_schemas(self.tester,
server_id,
db_id,
schema_info[0])
schema_response = json.loads(
schema_response.data.decode('utf-8'))
if len(schema_response) != 0:
collation_id = collation_ids_dict[int(server_id)]
get_response = collation_utils.verify_collation(
self.tester, server_id, db_id, schema_info[0],
collation_id)
get_response_data = json.loads(
get_response.data.decode('utf-8'))
if len(get_response_data) == 0:
raise Exception("No collation node to update.")
data = {
"description":
advanced_config_data['collation_update_data']
['comment'],
"id": collation_id,
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_info[0]) + '/' +
str(collation_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added collations, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
collation_utils.delete_collation(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,153 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
import json
import os
import pickle
from regression.test_setup import pickle_path, advanced_config_data
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from regression import test_utils as utils
COLLATION_URL = '/browser/collation/obj/'
def get_collation_config_data(server_connect_data):
"""This function returns the collation config data"""
adv_config_data = None
data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in \
advanced_config_data['collation_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
data = {
"copy_collation": adv_config_data['copy_collation'],
"name": adv_config_data['name'],
"owner": adv_config_data['owner'],
"schema": adv_config_data['schema']
}
return data
def write_collation_id(response_data, server_id):
"""
This function writes the server and collation id
:param response_data: collation response data
:type response_data: dict
:param server_id: server id
:type server_id: int
:return: None
"""
collation_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'coid' in pickle_id_dict:
if pickle_id_dict['coid']:
# Add the db_id as value in dict
pickle_id_dict["coid"][0].update(
{int(server_id): collation_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["coid"].append(
{int(server_id): collation_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def add_collation(tester, server_connect_response, server_ids):
"""This function add the collation to schemas"""
all_id = utils.get_ids()
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
for server_connect_response, server_id in zip(server_connect_response,
server_ids):
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_info = schema_ids_dict[int(server_id)]
schema_utils.verify_schemas(tester, server_id, db_id,
schema_info[0])
data = get_collation_config_data(server_connect_response)
data['schema'] = schema_info[1]
response = tester.post(
COLLATION_URL + str(utils.SERVER_GROUP) + '/' + str(server_id)
+ '/' + str(db_id) + '/' + str(schema_info[0]) + '/',
data=json.dumps(data), content_type='html/json')
response_data = json.loads(response.data.decode('utf-8'))
write_collation_id(response_data, server_id)
def verify_collation(tester, server_id, db_id, schema_id, collation_id):
"""This function verifies the collation using GET API"""
get_response = tester.get(
COLLATION_URL + str(utils.SERVER_GROUP) + '/' + str(server_id) + '/' +
str(db_id) + '/' + str(schema_id) + '/' + str(collation_id),
content_type='html/json')
return get_response
def delete_collation(tester):
"""This function deletes the collations from schema"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
collation_ids_dict = all_id["coid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_info = schema_ids_dict[int(server_id)]
schema_response = schema_utils.verify_schemas(tester, server_id,
db_id,
schema_info[0])
schema_response = json.loads(schema_response.data.decode('utf-8'))
if len(schema_response) != 0:
collation_id = collation_ids_dict[int(server_id)]
get_response = verify_collation(
tester, server_id, db_id, schema_info[0], collation_id)
get_response_data = json.loads(
get_response.data.decode('utf-8'))
if len(get_response_data) == 0:
raise Exception("No collation node to delete.")
del_response = tester.delete(
COLLATION_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) + '/' +
str(schema_info[0]) + '/' + str(collation_id),
follow_redirects=True)
assert del_response.status_code == 200
del_response_data = json.loads(
del_response.data.decode('utf-8'))
assert del_response_data['success'] == 1

View File

@ -0,0 +1,17 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class TriggerFunctionTestGenerator(BaseTestGenerator):
def generate_tests(self):
return

View File

@ -0,0 +1,74 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as trigger_funcs_utils
class TriggerFuncAddTestCase(BaseTestGenerator):
""" This class will add new trigger function under schema node. """
scenarios = [
# Fetching default URL for trigger function node.
('Fetch Trigger Function Node URL', dict(
url='/browser/trigger_function/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will add trigger function under schema node. """
trigger_funcs_utils.add_trigger_function(
self.tester, self.server_connect_response, self.server_ids)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added trigger function, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
trigger_funcs_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,77 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as trigger_funcs_utils
class TriggerFuncDeleteTestCase(BaseTestGenerator):
""" This class will delete the trigger function under schema node. """
scenarios = [
# Fetching default URL for trigger function node.
('Fetch Trigger Function Node URL',
dict(url='/browser/trigger_function/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
5. Add the trigger functions
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add trigger functions
trigger_funcs_utils.add_trigger_function(
cls.tester, cls.server_connect_response, cls.server_ids)
def runTest(self):
""" This function will delete trigger function under database node. """
trigger_funcs_utils.delete_trigger_function(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added trigger function, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,97 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as trigger_funcs_utils
class TriggerFuncGetTestCase(BaseTestGenerator):
"""This class will fetch added trigger function under schema node."""
scenarios = [
# Fetching default URL for trigger function node.
('Fetch Trigger Function Node URL',
dict(url='/browser/trigger_function/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
5. Add the trigger functions
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add trigger functions
trigger_funcs_utils.add_trigger_function(
cls.tester, cls.server_connect_response, cls.server_ids)
def runTest(self):
""" This function will delete trigger function under database node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
trigger_ids_dict = all_id["tfnid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(
self.tester, utils.SERVER_GROUP, server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)]
schema_response = schema_utils.verify_schemas(
self.tester, server_id, db_id, schema_id)
if schema_response.status_code == 200:
trigger_func_ids = trigger_ids_dict[int(server_id)]
for trigger_func_id in trigger_func_ids:
trigger_response = \
trigger_funcs_utils.verify_trigger_function(
self.tester, server_id, db_id, schema_id,
trigger_func_id)
self.assertTrue(trigger_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added trigger function, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
trigger_funcs_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,119 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from regression.test_setup import advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as trigger_funcs_utils
class TriggerFuncPutTestCase(BaseTestGenerator):
""" This class will update new trigger function under schema node. """
scenarios = [
# Fetching default URL for trigger function node.
('Fetch Trigger Function Node URL',
dict(url='/browser/trigger_function/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
4. Add the schemas
5. Add the trigger functions
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add trigger functions
trigger_funcs_utils.add_trigger_function(
cls.tester, cls.server_connect_response, cls.server_ids)
def runTest(self):
""" This function will update trigger function under database node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
trigger_ids_dict = all_id["tfnid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)]
schema_response = schema_utils.verify_schemas(self.tester,
server_id,
db_id,
schema_id)
if schema_response.status_code == 200:
trigger_func_ids = trigger_ids_dict[int(server_id)]
for trigger_func_id in trigger_func_ids:
trigger_response = \
trigger_funcs_utils.verify_trigger_function(
self.tester, server_id, db_id, schema_id,
trigger_func_id)
if trigger_response.status_code == 200:
data = {
"description": advanced_config_data[
'trigger_func_update_data']['comment'],
"id": trigger_func_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_id) + '/' +
str(trigger_func_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added trigger function, schemas, database,
server and the 'parent_id.pkl' file which is created in setup()
function.
:return: None
"""
trigger_funcs_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,174 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
import json
import os
import pickle
import uuid
from regression.test_setup import pickle_path, advanced_config_data
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from regression import test_utils as utils
TRIGGER_FUNCTIONS_URL = '/browser/trigger_function/obj/'
def get_trigger_func_data(server_connect_data):
"""This function returns the trigger function config data"""
adv_config_data = None
data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in \
advanced_config_data['trigger_function_credentials']:
if db_user == config_test_data['fun_owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
data = {
"acl": adv_config_data['acl'],
"arguments": adv_config_data['args'],
"funcowner": adv_config_data['fun_owner'],
"lanname": adv_config_data['language'],
"name": adv_config_data['name'],
"options": adv_config_data['options'],
"proleakproof": adv_config_data['leak_proof'],
"pronamespace": adv_config_data['namespace'],
"prorettypename": adv_config_data['type'],
"prosecdef": adv_config_data['sec_def'],
"prosrc": adv_config_data['code'],
"provolatile": adv_config_data['volitile'],
"seclabels": adv_config_data['sec_label'],
"variables": adv_config_data['Variable']
}
return data
def write_trigger_func_id(trigger_func_ids_list, server_id):
"""
This function writes the server and trigger function related data like
server id and trigger function name
:param trigger_func_ids_list: list of trigger functions ids
:type trigger_func_ids_list: list
:param server_id: server id
:type server_id: int
:return: None
"""
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'tfnid' in pickle_id_dict:
if pickle_id_dict['tfnid']:
# Add the db_id as value in dict
pickle_id_dict["tfnid"][0].update(
{int(server_id): trigger_func_ids_list})
else:
# Create new dict with server_id and db_id
pickle_id_dict["tfnid"].append(
{int(server_id): trigger_func_ids_list})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def add_trigger_function(tester, server_connect_response, server_ids):
"""This function add the trigger function to schema"""
all_id = utils.get_ids()
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
for server_connect_response, server_id in zip(server_connect_response,
server_ids):
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)][0]
schema_utils.verify_schemas(tester, server_id, db_id,
schema_id)
data = get_trigger_func_data(server_connect_response)
# Get the type from config data. We are adding two types
# i.e. event_trigger and trigger.
trigger_func_types = data['prorettypename'].split('/')
trigger_func_ids_list = []
for func_type in trigger_func_types:
data['prorettypename'] = func_type
data["name"] = str(uuid.uuid4())[1:8]
if schema_id:
data['pronamespace'] = schema_id
else:
schema_id = data['pronamespace']
response = tester.post(
TRIGGER_FUNCTIONS_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) + '/' + str(schema_id)
+ '/', data=json.dumps(data), content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
trigger_func_id = response_data['node']['_id']
trigger_func_ids_list.append(trigger_func_id)
write_trigger_func_id(trigger_func_ids_list, server_id)
def verify_trigger_function(tester, server_id, db_id, schema_id,
trigger_func_id):
"""This function verifies the trigger function with GET API"""
get_response = tester.get(
TRIGGER_FUNCTIONS_URL + str(utils.SERVER_GROUP) + '/'
+ str(server_id) + '/' + str(db_id) + '/' +
str(schema_id) + '/' + str(trigger_func_id),
content_type='html/json')
assert get_response.status_code == 200
return get_response
def delete_trigger_function(tester):
"""This function add the trigger function to schema"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
trigger_ids_dict = all_id["tfnid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)][0]
schema_response = schema_utils.verify_schemas(
tester, server_id, db_id, schema_id)
if schema_response.status_code == 200:
trigger_func_ids = trigger_ids_dict[int(server_id)]
for trigger_func_id in trigger_func_ids:
trigger_response = verify_trigger_function(
tester, server_id, db_id, schema_id, trigger_func_id)
if trigger_response.status_code == 200:
del_response = tester.delete(
TRIGGER_FUNCTIONS_URL + str(
utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) + '/' +
str(schema_id) + '/' + str(trigger_func_id),
follow_redirects=True)
assert del_response.status_code == 200
del_response_data = json.loads(
del_response.data.decode('utf-8'))
assert del_response_data['success'] == 1

View File

@ -0,0 +1,17 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class SchemaTestGenerator(BaseTestGenerator):
def generate_tests(self):
return []

View File

@ -0,0 +1,65 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from . import utils as schema_utils
class SchemaAddTestCase(BaseTestGenerator):
""" This class will add new schema under database node. """
scenarios = [
# Fetching default URL for schema node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will add schema under database node. """
schema_utils.add_schemas(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added schemas, database, server
and the 'parent_id.pkl' file which is created in setup() function.
:return: None
"""
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,66 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from . import utils as schema_utils
class SchemaDeleteTestCase(BaseTestGenerator):
""" This class will add new schema under database node. """
scenarios = [
# Fetching default URL for extension node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will delete schema under database node. """
schema_utils.delete_schema(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added schemas, database, server
and the 'parent_id.pkl' file which is created in setup() function.
:return: None
"""
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,82 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from . import utils as schema_utils
class SchemaGetTestCase(BaseTestGenerator):
""" This class will add new schema under database node. """
scenarios = [
# Fetching default URL for extension node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will delete schema under database node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)][0]
schema_response = schema_utils.verify_schemas(self.tester,
server_id, db_id,
schema_id)
self.assertTrue(schema_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added schemas, database, server
and the 'parent_id.pkl' file which is created in setup() function.
:return: None
"""
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,112 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from regression.test_setup import advanced_config_data
from . import utils as schema_utils
class SchemaPutTestCase(BaseTestGenerator):
""" This class will update the schema under database node. """
scenarios = [
# Fetching default URL for extension node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will delete schema under database node. """
all_id = utils.get_ids()
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
for server_connect_data, server_id in zip(self.server_connect_response,
self.server_ids):
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)][0]
schema_response = schema_utils.verify_schemas(self.tester,
server_id, db_id,
schema_id)
schema_response = json.loads(
schema_response.data.decode('utf-8'))
if not schema_response:
raise Exception("No schema(s) to update.")
adv_config_data = None
data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['schema_update_data']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
data = {
"deffuncacl": adv_config_data["func_acl"],
"defseqacl": adv_config_data["seq_acl"],
"deftblacl": adv_config_data["tbl_acl"],
"id": schema_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' + str(server_id) +
'/' + str(db_id) + '/' + str(schema_id),
data=json.dumps(data), follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
response_data = json.loads(put_response.data.decode('utf-8'))
self.assertTrue(response_data['success'], 1)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added schemas, database, server
and the 'parent_id.pkl' file which is created in setup() function.
:return: None
"""
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,148 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import os
import pickle
import uuid
from regression.test_setup import pickle_path, advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from regression import test_utils as utils
SCHEMA_URL = '/browser/schema/obj/'
def get_schema_config_data(server_connect_response):
"""This function is used to get advance config test data for schema"""
adv_config_data = None
data = None
db_user = server_connect_response['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['schema_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
data = {
"deffuncacl": adv_config_data['func_acl'],
"defseqacl": adv_config_data['seq_acl'],
"deftblacl": adv_config_data['tbl_acl'],
"deftypeacl": adv_config_data['type_acl'],
"name": "test_{0}".format(str(uuid.uuid4())[1:8]),
"namespaceowner": adv_config_data['owner'],
"nspacl": adv_config_data['privilege'],
"seclabels": adv_config_data['sec_label']
}
return data
def write_schema_id(response_data, server_id):
"""
This function writes the schema id into parent_id.pkl
:param response_data: schema add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
schema_id = response_data['node']['_id']
schema_name = str(response_data['node']['label'])
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'scid' in pickle_id_dict:
if pickle_id_dict['scid']:
# Add the schema_id as value in dict
pickle_id_dict["scid"][0].update(
{int(server_id): [schema_id, schema_name]})
else:
# Create new dict with server_id and schema_id
pickle_id_dict["scid"].append(
{int(server_id): [schema_id, schema_name]})
schema_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, schema_output)
schema_output.close()
def add_schemas(tester):
"""This function add the schemas into databases"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
for server_id in server_ids:
server_connect_response = server_utils.verify_server(
tester, str(utils.SERVER_GROUP), server_id)
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
data = get_schema_config_data(
server_connect_response)
response = tester.post(
SCHEMA_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/', data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_schema_id(response_data, server_id)
def verify_schemas(tester, server_id, db_id, schema_id):
"""This function verifies the schema is exists"""
response = tester.get(SCHEMA_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(schema_id),
content_type='html/json')
return response
def delete_schema(tester):
"""This function delete schemas from the databases"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)][0]
schema_response = verify_schemas(tester, server_id, db_id,
schema_id)
schema_response = json.loads(schema_response.data.decode('utf-8'))
if not schema_response:
raise Exception("No schema(s) to delete.")
del_response = tester.delete(
SCHEMA_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_id),
follow_redirects=True)
assert del_response.status_code == 200
response_data = json.loads(del_response.data.decode('utf-8'))
assert response_data['success'] == 1

View File

@ -12,7 +12,7 @@ import os
import pickle
import uuid
from regression.test_setup import pickle_path, config_data, advanced_config_data
from regression.test_setup import pickle_path, advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_utils as utils
@ -130,7 +130,7 @@ def verify_database(tester, server_group, server_id, db_id):
"""
# Verify servers
server_utils.verify_server(tester,server_group,server_id)
server_utils.verify_server(tester, server_group, server_id)
# Connect to database
con_response = tester.post('{0}{1}/{2}/{3}'.format(

View File

@ -143,5 +143,162 @@
"tbspc_update_data": {
"comment": "This is tablespace update comment"
},
"schema_credentials":[
{
"func_acl": [],
"seq_acl": [],
"tbl_acl": [],
"type_acl": [],
"name": "test_schema",
"owner": "postgres",
"privilege":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"C",
"privilege":true,
"with_grant":false
},
{
"privilege_type":"U",
"privilege":true,
"with_grant":false
}
]
}
],
"sec_label": []
}],
"schema_update_data":[
{
"owner": "postgres",
"tbl_acl":
{
"added":
[
{
"grantee": "public",
"grantor": "postgres",
"privileges":
[
{
"privilege_type": "D",
"privilege": true,
"with_grant": false
},
{
"privilege_type": "x",
"privilege": true,
"with_grant": false
}
]
}
]
},
"func_acl":
{
"added":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"X",
"privilege":true,
"with_grant":true
}
]
}
]
},
"seq_acl":
{
"added":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"r",
"privilege":true,
"with_grant":false
},
{
"privilege_type":"w",
"privilege":true,
"with_grant":false
},
{
"privilege_type":"U",
"privilege":true,
"with_grant":false
}
]
}
]
}
}],
"trigger_function_credentials":[
{
"acl":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"X",
"privilege":true,
"with_grant":true
}
]
}
],
"args": [],
"fun_owner": "postgres",
"language": "plpgsql",
"name": "test_abort_any_command",
"options": [],
"leak_proof": true,
"namespace": 2200,
"type": "event_trigger/trigger",
"sec_def": true,
"code": "BEGIN RAISE EXCEPTION 'command % is disabled', tg_tag; END;",
"volitile": "s",
"sec_label": [],
"Variable":
[
{
"name":"enable_sort",
"value":true
}
]
}],
"trigger_func_update_data": {
"comment": "This is trigger function update comment"
},
"collation_credentials":[
{
"copy_collation": "pg_catalog.\"POSIX\"",
"name": "test_collation",
"owner": "postgres",
"schema": ""
}],
"collation_update_data": {
"comment": "This is collation update comment"
}
}

View File

@ -24,7 +24,9 @@ def get_pickle_id_dict():
"did": [], # database
"lrid": [], # role
"tsid": [], # tablespace
"scid": [] # schema
"scid": [], # schema
"tfnid": [], # trigger functions
"coid": [] # collation
}
return pickle_id_dict
@ -45,22 +47,22 @@ def get_ids(url=pickle_path):
return ids
def test_getnodes(tester=None):
# Connect to server and database.
if not tester:
return None
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
db_con = []
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con.append(verify_database(tester, SERVER_GROUP, server_id, db_id))
return db_con
# def test_getnodes(tester=None):
# # Connect to server and database.
#
# if not tester:
# return None
#
# all_id = get_ids()
#
# server_ids = all_id["sid"]
# db_ids_dict = all_id["did"][0]
#
# db_con = []
# for server_id in server_ids:
# db_id = db_ids_dict[int(server_id)]
# db_con.append(verify_database(tester, SERVER_GROUP, server_id, db_id))
# return db_con
def login_tester_account(tester):