Test suite enhancements:

1. The user will specify the tablespace path in test_config.json.in
2.  If tablespace path not found, skip the test cases for that server(Only tablespace test cases)
3.  Add the skipped test summary in the test result. (Now it's showing on console + in log file, but need to update in a final enhanced test summary report. Which is research point we will work on that after finishing all nodes API test cases)
4.  Removed the test_ prefix from the values in the config files.
5. Add tablespace and roles tests
pull/3/head
Navnath Gadakh 2016-08-09 16:05:40 +01:00 committed by Dave Page
parent 2b13d55016
commit 81e2bc1e80
34 changed files with 1685 additions and 625 deletions

View File

@ -7,10 +7,11 @@
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabaseAddTestCase(BaseTestGenerator):
@ -24,7 +25,8 @@ class DatabaseAddTestCase(BaseTestGenerator):
('Check Databases Node URL', dict(url='/browser/database/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
@ -32,27 +34,23 @@ class DatabaseAddTestCase(BaseTestGenerator):
"""
# Add the server
utils.add_server(self.tester)
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
def runTest(self):
""" This function will add database under 1st server of tree node. """
server_connect_response, server_group, server_ids = \
utils.connect_server(self.tester)
database_utils.add_database(self.tester, self.server_connect_response,
self.server_ids)
for server_connect, server_id in zip(server_connect_response,
server_ids):
if server_connect['data']['connected']:
data = utils.get_db_data(server_connect)
db_response = self.tester.post(self.url + str(server_group) +
"/" + server_id + "/",
data=json.dumps(data),
content_type='html/json')
self.assertTrue(db_response.status_code, 200)
response_data = json.loads(db_response.data.decode('utf-8'))
utils.write_db_parent_id(response_data)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the added database, added server and the
'parent_id.pkl' file which is created in setup()
@ -60,6 +58,6 @@ class DatabaseAddTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_database(self.tester)
utils.delete_server(self.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -7,12 +7,10 @@
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data
from regression.test_utils import get_ids
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabaseDeleteTestCase(BaseTestGenerator):
@ -23,43 +21,38 @@ class DatabaseDeleteTestCase(BaseTestGenerator):
('Check Databases Node URL', dict(url='/browser/database/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
utils.add_server(self.tester)
# Secondly, connect to server/database
utils.connect_server(self.tester)
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will delete the database."""
srv_grp = config_data['test_server_group']
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
database_utils.delete_database(self.tester)
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = utils.verify_database(self.tester, srv_grp, server_id,
db_id)
if len(db_con) == 0:
raise Exception("No database(s) to delete for server id %s"
% server_id)
response = self.tester.delete(self.url + str(srv_grp) + '/' +
str(server_id) + '/' + str(db_id),
follow_redirects=True)
response_data = json.loads(response.data.decode('utf-8'))
self.assertTrue(response_data['success'], 1)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
@ -67,5 +60,5 @@ class DatabaseDeleteTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_server(self.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -11,6 +11,8 @@ from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data
from regression.test_utils import get_ids
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabasesGetTestCase(BaseTestGenerator):
@ -20,22 +22,33 @@ class DatabasesGetTestCase(BaseTestGenerator):
scenarios = [
# Fetching default URL for database node.
('Check Databases Node URL', dict(url='/browser/database/obj/'))
('Check Dat abases Node URL', dict(url='/browser/database/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
utils.add_server(self.tester)
# Secondly, connect to server/database
utils.connect_server(self.tester)
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will fetch added database. """
@ -44,11 +57,12 @@ class DatabasesGetTestCase(BaseTestGenerator):
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
srv_grp = config_data['test_server_group']
srv_grp = config_data['server_group']
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = utils.verify_database(self.tester, srv_grp, server_id,
db_con = database_utils.verify_database(self.tester, srv_grp,
server_id,
db_id)
if db_con["info"] == "Database connected.":
response = self.tester.get(
@ -56,7 +70,8 @@ class DatabasesGetTestCase(BaseTestGenerator):
str(db_id), follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the added database, added server
and the 'parent_id.pkl' file which is created in setup() function.
@ -64,6 +79,6 @@ class DatabasesGetTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_database(self.tester)
utils.delete_server(self.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -11,8 +11,10 @@ import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data, advanced_config_data
from regression.test_setup import advanced_config_data
from regression.test_utils import get_ids
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabasesUpdateTestCase(BaseTestGenerator):
@ -25,44 +27,58 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
('Check Databases Node', dict(url='/browser/database/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
utils.add_server(self.tester)
# Secondly, connect to server/database
utils.connect_server(self.tester)
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will update the comments field of database."""
srv_grp = config_data['test_server_group']
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = utils.verify_database(self.tester, srv_grp, server_id,
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id,
db_id)
if db_con["info"] == "Database connected.":
data = {
"comments": advanced_config_data["test_db_update_data"][0]
["test_comment"],
"comments": advanced_config_data["db_update_data"][0]
["comment"],
"id": db_id
}
put_response = self.tester.put(
self.url + str(srv_grp) + '/' + str(server_id) + '/' +
self.url + str(utils.SERVER_GROUP) + '/' + str(
server_id) + '/' +
str(db_id), data=json.dumps(data), follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
def tearDown(self):
@classmethod
def tearDownClass(self):
"""
This function deletes the added server and 'parent_id.pkl' file
which is created in setup() function.
@ -70,5 +86,6 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_server(self.tester)
database_utils.delete_database(self.tester)
server_utils.delete_server(self.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,176 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
import json
import os
import pickle
import uuid
from regression.test_setup import pickle_path, config_data, advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_utils as utils
DATABASE_URL = '/browser/database/obj/'
DATABASE_CONNECT_URL = 'browser/database/connect/'
def get_db_data(server_connect_data):
"""
This function is used to get advance config test data for appropriate
server
:param server_connect_data: list of server details
:return data: database details
:rtype: dict
"""
adv_config_data = None
data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['add_database_data']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
data = {
"datacl": adv_config_data['privileges_acl'],
"datconnlimit": adv_config_data['conn_limit'],
"datowner": adv_config_data['owner'],
"deffuncacl": adv_config_data['fun_acl'],
"defseqacl": adv_config_data['seq_acl'],
"deftblacl": adv_config_data['tbl_acl'],
"deftypeacl": adv_config_data['type_acl'],
"encoding": adv_config_data['encoding'],
"name": str(uuid.uuid4())[1:8],
"privileges": adv_config_data['privileges'],
"securities": adv_config_data['securities'],
"variables": adv_config_data['variables']
}
return data
def write_db_id(response_data):
"""
This function writes the server and database related data like server
name, server id , database name, database id etc.
:param response_data: server and databases details
:type response_data: dict
:return: None
"""
db_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'did' in pickle_id_dict:
if pickle_id_dict['did']:
# Add the db_id as value in dict
pickle_id_dict["did"][0].update({server_id: db_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["did"].append({server_id: db_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def add_database(tester, server_connect_response, server_ids):
"""
This function add the database into servers
:param tester: flask test client
:type tester: flask test object
:param server_connect_response: server response
:type server_connect_response: dict
:param server_ids: server ids
:type server_ids: list
:return: None
"""
for server_connect, server_id in zip(server_connect_response, server_ids):
if server_connect['data']['connected']:
data = get_db_data(server_connect)
db_response = tester.post(DATABASE_URL + str(utils.SERVER_GROUP) +
"/" + server_id + "/",
data=json.dumps(data),
content_type='html/json')
assert db_response.status_code == 200
response_data = json.loads(db_response.data.decode('utf-8'))
write_db_id(response_data)
def verify_database(tester, server_group, server_id, db_id):
"""
This function verifies that database is exists and whether it connect
successfully or not
:param tester: test client
:type tester: flask test client object
:param server_group: server group id
:type server_group: int
:param server_id: server id
:type server_id: str
:param db_id: database id
:type db_id: str
:return: temp_db_con
:rtype: list
"""
# Verify servers
server_utils.verify_server(tester,server_group,server_id)
# Connect to database
con_response = tester.post('{0}{1}/{2}/{3}'.format(
DATABASE_CONNECT_URL, server_group, server_id, db_id),
follow_redirects=True)
temp_db_con = json.loads(con_response.data.decode('utf-8'))
return temp_db_con
def delete_database(tester):
"""
This function used to delete the added databases
:param tester: test client object
:return: None
"""
server_ids = None
db_ids_dict = None
all_id = utils.get_ids()
if "sid" and "did" in all_id.keys():
server_ids = all_id["sid"]
if all_id['did']:
db_ids_dict = all_id['did'][0]
else:
raise Exception("Keys are not found in pickle dict: {}".format(["sid", "did"]))
if server_ids and db_ids_dict is not None:
for server_id in server_ids:
server_response = server_utils.verify_server(tester, utils.SERVER_GROUP, server_id)
if server_response["data"]["connected"]:
db_id = db_ids_dict[int(server_id)]
response = tester.delete(DATABASE_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id),
follow_redirects=True)
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
assert response_data['success'] == 1
else:
raise Exception("No servers/databases found.")

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class RoleGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return

View File

@ -0,0 +1,55 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as roles_utils
class LoginRoleAddTestCase(BaseTestGenerator):
"""This class has add role scenario"""
scenarios = [
# Fetching default URL for roles node.
('Check Role Node', dict(url='/browser/role/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the roles!!!")
def runTest(self):
"""This function test the add role scenario"""
roles_utils.add_role(self.tester, self.server_connect_response,
self.server_group, self.server_ids)
@classmethod
def tearDownClass(cls):
"""This function deletes the role,server and parent id file"""
roles_utils.delete_role(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,58 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as roles_utils
class LoginRoleDeleteTestCase(BaseTestGenerator):
"""This class has delete role scenario"""
scenarios = [
# Fetching default URL for roles node.
('Check Role Node', dict(url='/browser/role/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever and roles
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the roles!!!")
# Add the role
roles_utils.add_role(cls.tester, cls.server_connect_response,
cls.server_group, cls.server_ids)
def runTest(self):
"""This function tests the delete role scenario"""
roles_utils.delete_role(self.tester)
@classmethod
def tearDownClass(self):
"""This function deletes the role,server and parent id file"""
server_utils.delete_server(self.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,67 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as roles_utils
class LoginRoleGetTestCase(BaseTestGenerator):
"""This class tests the get role scenario"""
scenarios = [
# Fetching default URL for roles node.
('Check Role Node', dict(url='/browser/role/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever and roles
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to get the roles!!!")
# Add the role
roles_utils.add_role(cls.tester, cls.server_connect_response,
cls.server_group, cls.server_ids)
def runTest(self):
"""This function test the get role scenario"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
role_ids_dict = all_id["lrid"][0]
for server_id in server_ids:
role_id = role_ids_dict[int(server_id)]
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(role_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the role,server and parent id file"""
roles_utils.delete_role(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,84 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression.test_setup import advanced_config_data
from . import utils as roles_utils
class LoginRolePutTestCase(BaseTestGenerator):
"""This class has update role scenario"""
scenarios = [
# Fetching default URL for roles node.
('Check Role Node', dict(url='/browser/role/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever and roles
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to get the roles!!!")
# Add the role
roles_utils.add_role(cls.tester, cls.server_connect_response,
cls.server_group, cls.server_ids)
def runTest(self):
"""This function tests the update role data scenario"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
role_ids_dict = all_id["lrid"][0]
for server_id in server_ids:
role_id = role_ids_dict[int(server_id)]
role_response = roles_utils.verify_role(self.tester,
utils.SERVER_GROUP,
server_id,
role_id)
if len(role_response) == 0:
raise Exception("No roles(s) to update!!!")
data = {
"description": advanced_config_data["lr_update_data"]
["comment"],
"lrid": role_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(role_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the role,server and parent id file"""
roles_utils.delete_role(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,187 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import os
import pickle
import uuid
from regression.test_setup import pickle_path, config_data, advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_utils as utils
ROLE_URL = '/browser/role/obj/'
def verify_role(tester, server_group, server_id, role_id):
"""
This function calls the GET API for role to verify
:param tester: test client
:type tester: flask test client object
:param server_group: server group id
:type server_group: int
:param server_id: server id
:type server_id: str
:param role_id: role id
:type role_id: int
:return: dict/None
"""
srv_connect = server_utils.verify_server(tester, server_group, server_id)
if srv_connect['data']['connected']:
response = tester.get(
'{0}{1}/{2}/{3}'.format(ROLE_URL, server_group, server_id,
role_id),
content_type='html/json')
temp_response = json.loads(response.data.decode('utf-8'))
return temp_response
else:
return None
def test_getrole(tester):
if not tester:
return None
all_id = utils.get_ids()
server_ids = all_id["sid"]
role_ids_dict = all_id["lrid"][0]
server_group = config_data['server_group']
role_response_data = []
for server_id in server_ids:
role_id = role_ids_dict[int(server_id)]
role_response_data.append(
verify_role(tester, server_group, server_id, role_id))
return role_response_data
def get_role_data():
"""This function returns the role data from config file"""
data = {
"rolcanlogin": advanced_config_data['lr_credentials']
['can_login'],
"rolconnlimit": advanced_config_data['lr_credentials']
['conn_limit'],
"rolcreaterole": advanced_config_data['lr_credentials']
['create_role'],
"rolinherit": advanced_config_data['lr_credentials']
['role_inherit'],
"rolmembership": advanced_config_data['lr_credentials']
['role_membership'],
"rolname": str(uuid.uuid4())[1:8],
"rolpassword": advanced_config_data['lr_credentials']
['lr_password'],
"rolvaliduntil": advanced_config_data['lr_credentials']
['lr_validity'],
"seclabels": advanced_config_data['lr_credentials']
['sec_lable'],
"variables": advanced_config_data['lr_credentials']
['variable']
}
return data
def add_role(tester, server_connect_response, server_group, server_ids):
"""
This function is used to add the roles to server
:param tester: flask test client
:type tester: flask test client object
:param server_connect_response: server connect API response
:type server_connect_response: dict
:param server_group: server group
:type server_group: int
:param server_ids: list of server id
:type server_ids: list
:return: None
"""
for server_connect, server_id in zip(server_connect_response,
server_ids):
if server_connect['data']['connected']:
data = get_role_data()
response = tester.post(ROLE_URL + str(server_group) + '/'
+ server_id + '/', data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_role_id(response_data)
def write_role_id(response_data):
"""
:param response_data:
:return:
"""
lr_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
pickle_id_dict = utils.get_pickle_id_dict()
# TODO: modify logic to write in file / file exists or create new check
# old file
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'lrid' in pickle_id_dict:
if pickle_id_dict['lrid']:
# Add the db_id as value in dict
pickle_id_dict["lrid"][0].update({server_id: lr_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["lrid"].append({server_id: lr_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def delete_role(tester):
"""
This function use to delete the existing roles in the servers
:param tester: flask test client
:type tester: flask test object
:return: None
"""
server_ids = None
role_ids_dict = None
all_id = utils.get_ids()
if "sid" and "lrid" in all_id.keys():
server_ids = all_id["sid"]
if all_id['lrid']:
role_ids_dict = all_id['lrid'][0]
else:
raise Exception("Keys are not found: {}".format(["sid", "lrid"]))
if server_ids and role_ids_dict is not None:
for server_id in server_ids:
server_response = server_utils.verify_server(tester,
utils.SERVER_GROUP,
server_id)
if server_response["data"]["connected"]:
role_id = role_ids_dict[int(server_id)]
response = tester.delete(
ROLE_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(role_id),
follow_redirects=True)
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
assert response_data['success'] == 1
else:
raise Exception("No servers/roles found.")

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class TblspaceGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return

View File

@ -0,0 +1,55 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as tablespace_utils
class TableSpaceAddTestCase(BaseTestGenerator):
"""This class will add tablespace node under server"""
scenarios = [
# Fetching default URL for tablespace node.
('Check Tablespace Node', dict(url='/browser/tablespace/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the roles!!!")
def runTest(self):
"""This function test the add tablespace scenario"""
tablespace_status = tablespace_utils.add_table_space(
self.tester, self.server_connect_response, self.server_group,
self.server_ids)
@classmethod
def tearDownClass(cls):
"""This function deletes the tablespaces,server and parent_id file"""
tablespace_utils.delete_table_space(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,58 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as tablespace_utils
class TableSpaceDeleteTestCase(BaseTestGenerator):
"""This class has delete table space scenario"""
scenarios = [
# Fetching default URL for tablespace node.
('Check Tablespace Node', dict(url='/browser/tablespace/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the roles!!!")
# Add tablespace
tablespace_utils.add_table_space(cls.tester,
cls.server_connect_response,
cls.server_group, cls.server_ids)
def runTest(self):
"""This function tests the delete table space scenario"""
tablespace_utils.delete_table_space(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the server and parent id file"""
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,78 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as tablespace_utils
class TablespaceGetTestCase(BaseTestGenerator):
"""This class tests the get table space scenario"""
scenarios = [
# Fetching default URL for roles node.
('Check Tablespace Node', dict(url='/browser/tablespace/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the roles!!!")
# Add tablespace
tablespace_utils.add_table_space(cls.tester,
cls.server_connect_response,
cls.server_group, cls.server_ids)
def runTest(self):
"""This function test the get table space scenario"""
tablespace_ids_dict = None
all_id = utils.get_ids()
server_ids = all_id["sid"]
if "tsid" in all_id and all_id["tsid"]:
tablespace_ids_dict = all_id["tsid"][0]
if tablespace_ids_dict:
for server_id in server_ids:
tablespace_id = tablespace_ids_dict[int(server_id)]
server_response = server_utils.verify_server(self.tester,
utils.SERVER_GROUP,
server_id)
if server_response['data']['connected']:
tablespace_utils.verify_table_space(
self.tester, utils.SERVER_GROUP, server_id, tablespace_id)
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(
tablespace_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the tablespaces,server and parent id file"""
tablespace_utils.delete_table_space(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,90 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from test_setup import advanced_config_data
from . import utils as tablespace_utils
class TableSpaceUpdateTestCase(BaseTestGenerator):
"""This class has update tablespace scenario"""
scenarios = [
# Fetching default URL for roles node.
('Check Tablespace Node', dict(url='/browser/tablespace/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the roles!!!")
# Add tablespace
tablespace_utils.add_table_space(cls.tester,
cls.server_connect_response,
cls.server_group, cls.server_ids)
def runTest(self):
"""This function tests the update tablespace data scenario"""
tablespace_ids_dict = None
all_id = utils.get_ids()
server_ids = all_id["sid"]
if "tsid" in all_id and all_id["tsid"]:
tablespace_ids_dict = all_id["tsid"][0]
if tablespace_ids_dict:
for server_id in server_ids:
tablespace_id = tablespace_ids_dict[int(server_id)]
tablespace_response = tablespace_utils.verify_table_space(
self.tester,
utils.SERVER_GROUP, server_id,
tablespace_id)
if len(tablespace_response) == 0:
raise Exception("No tablespace(s) to update!!!")
data = {
"description": advanced_config_data["tbspc_update_data"]
["comment"],
"table_space_id": tablespace_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(
tablespace_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the tablespaces,server and parent id file"""
tablespace_utils.delete_table_space(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,195 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from __future__ import print_function
import json
import os
import pickle
import uuid
import sys
from regression.test_setup import pickle_path, config_data, \
advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_utils as utils
TABLE_SPACE_URL = '/browser/tablespace/obj/'
# Tablespace utility
def get_tablespace_data(server_connect):
"""This function returns the tablespace data from config file"""
adv_config_data = None
data = None
db_user = server_connect['data']['user']['name']
server_config_data = config_data['server_credentials']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['tablespc_credentials']:
if db_user == config_test_data['spc_user']:
# Add the tablespace path from server config
server_config = (item for item in server_config_data if
item["db_username"] == db_user).next()
if "tablespace_path" in server_config:
config_test_data['spc_location'] = server_config['tablespace_path']
adv_config_data = config_test_data
else:
config_test_data['spc_location'] = None
if adv_config_data is not None:
data = {
"name": str(uuid.uuid4())[1:8],
"seclabels": adv_config_data["spc_seclable"],
"spcacl": adv_config_data["spc_acl"],
"spclocation": adv_config_data["spc_location"],
"spcoptions": adv_config_data["spc_opts"],
"spcuser": adv_config_data["spc_user"]
}
return data
def write_tablespace_id(response_data):
"""
This function write the table space id to parent_id.pkl
:param response_data: create table space API response data
:type response_data: dict
:return: None
"""
table_space_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'tsid' in pickle_id_dict:
if pickle_id_dict['tsid']:
# Add the db_id as value in dict
pickle_id_dict["tsid"][0].update(
{server_id: table_space_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["tsid"].append(
{server_id: table_space_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def add_table_space(tester, server_connect_response, server_group, server_ids):
"""
This function is used to add the roles to server
:param tester: flask test client
:type tester: flask test client object
:param server_connect_response: server connect API response
:type server_connect_response: dict
:param server_group: server group
:type server_group: int
:param server_ids: list of server id
:type server_ids: list
:return: None
"""
total_servers_count = len(server_ids)
servers_without_tablespace_path = []
for server_connect, server_id in zip(server_connect_response,
server_ids):
tablespace_path = server_connect['tablespace_path']
# Skip the test case if tablespace_path does not exist
if not tablespace_path or tablespace_path is None:
file_name = os.path.basename(
sys._getframe().f_back.f_code.co_filename)
servers_without_tablespace_path.append(server_id)
if total_servers_count == len(servers_without_tablespace_path):
print("Skipping tablespaces test cases for the file <{0}>, "
"Tablespace path not configured for the servers: "
"{1}".format(file_name, server_ids), file=sys.stderr)
return
else:
print("Skipping tablespaces test case for the file <{0}>: "
"Tablespace path not configured for server: {1}".format(
file_name, server_id), file=sys.stderr)
continue
if server_connect['data']['connected']:
data = get_tablespace_data(server_connect)
response = tester.post(TABLE_SPACE_URL + str(server_group) + '/'
+ server_id + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_tablespace_id(response_data)
def verify_table_space(tester, server_group, server_id, tablespace_id):
"""
This function calls the GET API for role to verify
:param tester: test client
:type tester: flask test client object
:param server_group: server group id
:type server_group: int
:param server_id: server id
:type server_id: str
:param tablespace_id: table space id
:type tablespace_id: int
:return: dict/None
"""
srv_connect = server_utils.verify_server(tester, server_group, server_id)
if srv_connect['data']['connected']:
response = tester.get(
'{0}{1}/{2}/{3}'.format(TABLE_SPACE_URL, server_group, server_id,
tablespace_id),
content_type='html/json')
assert response.status_code == 200
temp_response = json.loads(response.data.decode('utf-8'))
return temp_response
else:
return None
def delete_table_space(tester):
"""
This function use to delete the existing tablespace in the servers
:param tester: flask test client
:type tester: flask test object
:return: None
"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
if "tsid" in all_id and all_id["tsid"]:
tablespace_ids_dict = all_id["tsid"][0]
else:
tablespace_ids_dict = None
if tablespace_ids_dict is not None:
for server_id in server_ids:
tablespace_id = tablespace_ids_dict[int(server_id)]
role_response = verify_table_space(tester, utils.SERVER_GROUP,
server_id,
tablespace_id)
if len(role_response) == 0:
raise Exception("No tablespace(s) to delete!!!")
response = tester.delete(
TABLE_SPACE_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(tablespace_id),
follow_redirects=True)
assert response.status_code == 200
delete_response_data = json.loads(response.data.decode('utf-8'))
assert delete_response_data['success'] == 1

View File

@ -7,10 +7,9 @@
#
# ##########################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
class ServersAddTestCase(BaseTestGenerator):
@ -21,24 +20,17 @@ class ServersAddTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
pass
def runTest(self):
""" This function will add the server under default server group."""
server_group, config_data, pickle_id_dict = utils.get_config_data()
for server_data in config_data:
url = "{0}{1}/".format(self.url, server_group)
response = self.tester.post(url, data=json.dumps(server_data),
content_type='html/json')
server_utils.add_server(self.tester)
self.assertTrue(response.status_code, 200)
response_data = json.loads(response.data.decode())
utils.write_parent_id(response_data, pickle_id_dict)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
@ -46,5 +38,5 @@ class ServersAddTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_server(self.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -7,12 +7,11 @@
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data
from regression.test_utils import get_ids
from . import utils as server_utils
class ServerDeleteTestCase(BaseTestGenerator):
@ -23,7 +22,8 @@ class ServerDeleteTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function is used to add the server
@ -31,28 +31,16 @@ class ServerDeleteTestCase(BaseTestGenerator):
"""
# Firstly, add the server
utils.add_server(self.tester)
server_utils.add_server(cls.tester)
def runTest(self):
""" This function will get all available servers under object browser
and delete the last server using server id."""
srv_grp = config_data['test_server_group']
all_id = get_ids()
server_ids = all_id["sid"]
server_utils.delete_server(self.tester)
url = self.url + str(srv_grp) + "/"
if len(server_ids) == 0:
raise Exception("No server(s) to delete!!!")
# Call api to delete the servers
for server_id in server_ids:
response = self.tester.delete(url + str(server_id))
self.assertTrue(response.status_code, 200)
response_data = json.loads(response.data.decode())
self.assertTrue(response_data['success'], 1)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the 'parent_id.pkl' file which is created in
setup() function.

View File

@ -9,8 +9,7 @@
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data
from regression.test_utils import get_ids
from . import utils as server_utils
class ServersGetTestCase(BaseTestGenerator):
@ -24,28 +23,23 @@ class ServersGetTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function is used to add the server
:return: None
"""
utils.add_server(self.tester)
server_utils.add_server(cls.tester)
def runTest(self):
""" This function will fetch the added servers to object browser. """
all_id = get_ids()
server_ids = all_id["sid"]
srv_grp = config_data['test_server_group']
server_utils.get_server(self.tester)
for server_id in server_ids:
url = "{0}{1}/{2}".format(self.url, srv_grp, server_id)
response = self.tester.get(url, content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
@ -53,5 +47,5 @@ class ServersGetTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_server(self.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -8,10 +8,9 @@
# ##########################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data
from . import utils as server_utils
class ServerUpdateTestCase(BaseTestGenerator):
@ -22,7 +21,8 @@ class ServerUpdateTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
This function perform the four tasks
1. Add the test server
@ -33,13 +33,16 @@ class ServerUpdateTestCase(BaseTestGenerator):
"""
# Firstly, add the server
utils.add_server(self.tester)
server_utils.add_server(cls.tester)
# Get the server
utils.get_server(self.tester)
server_utils.get_server(cls.tester)
# Connect to server
self.server_connect, self.server_group, self.server_ids = \
utils.connect_server(self.tester)
if len(self.server_connect) == 0:
cls.server_connect, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect) == 0:
raise Exception("No Server(s) connected to update!!!")
def runTest(self):
@ -48,7 +51,8 @@ class ServerUpdateTestCase(BaseTestGenerator):
for server_id in self.server_ids:
data = {
"comment":
config_data['test_server_update_data'][0]['test_comment'],
server_utils.config_data['server_update_data'][0][
'comment'],
"id": server_id
}
put_response = self.tester.put(
@ -60,7 +64,8 @@ class ServerUpdateTestCase(BaseTestGenerator):
response_data = json.loads(put_response.data.decode())
self.assertTrue(response_data['success'], 1)
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
@ -68,5 +73,5 @@ class ServerUpdateTestCase(BaseTestGenerator):
:return: None
"""
utils.delete_server(self.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,160 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
import json
import os
import pickle
from regression import test_utils as utils
from regression.test_setup import pickle_path, config_data
SERVER_URL = '/browser/server/obj/'
SERVER_CONNECT_URL = 'browser/server/connect/'
def write_server_id(response_data, pickle_id_dict):
"""
This function writes the server's details to file parent_id.pkl
:param response_data: server's data
:type response_data: list of dictionary
:param pickle_id_dict: contains ids of server,database,tables etc.
:type pickle_id_dict: dict
:return: None
"""
server_id = response_data['node']['_id']
if os.path.isfile(pickle_path):
existed_server_id = open(pickle_path, 'rb')
pickle_id_dict = pickle.load(existed_server_id)
pickle_id_dict["sid"].append(str(server_id))
output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, output)
output.close()
def add_server(tester):
"""
This function add the server in the existing server group
:param tester: test object
:type tester: flask test object
:return:None
"""
server_group, db_data, pickle_id_dict = utils.get_config_data()
url = "{0}{1}/".format(SERVER_URL, server_group)
for db_detail in db_data:
response = tester.post(url, data=json.dumps(db_detail),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_server_id(response_data, pickle_id_dict)
def get_server(tester):
"""
This function gets the added serer details
:param tester: test client object
:type tester: flask test object
:return: response_data
:rtype: list
"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
for server_id in server_ids:
response = tester.get(SERVER_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id),
follow_redirects=True)
assert response.status_code == 200
def connect_server(tester):
"""
This function used to connect added server
:param tester:test client object
:type tester: flask test object
:param add_db_flag: flag for db add test case
:type add_db_flag: bool
:return: server_connect, server_group, server_id
:rtype: server_connect:dict, server_group:dict, server_id:str
"""
server_connect = []
servers = []
server_config = None
srv_id = utils.get_ids()
server_ids = srv_id["sid"]
# Connect to all servers
for server_id in server_ids:
response = tester.post(SERVER_CONNECT_URL + str(utils.SERVER_GROUP) +
'/' + server_id,
data=dict(
password=config_data
['server_credentials'][0]
['db_password']),
follow_redirects=True)
server_connect_detail = json.loads(response.data.decode('utf-8'))
db_user = server_connect_detail['data']['user']['name']
server_connect_detail['tablespace_path'] = None
# Get the server config of appropriate db user
for config in config_data['server_credentials']:
if db_user == config['db_username']:
server_config = config
if "tablespace_path" in server_config:
server_connect_detail['tablespace_path'] = \
server_config['tablespace_path']
server_connect.append(server_connect_detail)
servers.append(server_id)
return server_connect, utils.SERVER_GROUP, servers
def verify_server(tester, server_group, server_id):
"""This function verifies that server is connecting or not"""
response = tester.post(
'{0}{1}/{2}'.format(SERVER_CONNECT_URL, server_group, server_id),
data=dict(password=config_data
['server_credentials'][0]
['db_password']),
follow_redirects=True)
srv_connect = json.loads(response.data.decode('utf-8'))
return srv_connect
def delete_server(tester):
"""
This function used to delete the added servers
:param tester: test client object
:return: None
"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
url = SERVER_URL + str(utils.SERVER_GROUP) + "/"
if len(server_ids) == 0:
raise Exception("No server(s) to delete!!!")
# Call api to delete the servers
for server_id in server_ids:
response = tester.delete(url + str(server_id))
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
assert response_data['success'] == 1

View File

@ -26,7 +26,7 @@ class SgNodeTestCase(BaseTestGenerator):
def runTest(self):
"""This function will check available server groups."""
server_group_id = config_data['test_server_group']
server_group_id = config_data['server_group']
response = self.tester.get(self.url + str(server_group_id),
content_type='html/json')
self.assertTrue(response.status_code, 200)

View File

@ -8,9 +8,12 @@
# ##########################################################################
import uuid
import json
from pgadmin.utils.route import BaseTestGenerator
from regression.test_setup import config_data
from regression import test_utils as utils
from utils import change_password
class ChangePasswordTestCase(BaseTestGenerator):
@ -24,9 +27,9 @@ class ChangePasswordTestCase(BaseTestGenerator):
# This testcase validates invalid confirmation password
('TestCase for Validating Incorrect_New_Password', dict(
password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
['login_password']),
new_password=(config_data['pgAdmin4_login_credentials']
['test_new_password']),
['new_password']),
new_password_confirm=str(uuid.uuid4())[4:8],
respdata='Passwords do not match')),
@ -34,7 +37,7 @@ class ChangePasswordTestCase(BaseTestGenerator):
# minimum length
('TestCase for Validating New_Password_Less_Than_Min_Length',
dict(password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
['login_password']),
new_password=str(uuid.uuid4())[4:8],
new_password_confirm=str(uuid.uuid4())[4:8],
respdata='Password must be at least 6 characters')),
@ -42,7 +45,7 @@ class ChangePasswordTestCase(BaseTestGenerator):
# This testcase validates if both password fields are left blank
('TestCase for Validating Empty_New_Password', dict(
password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
['login_password']),
new_password='', new_password_confirm='',
respdata='Password not provided')),
@ -50,57 +53,66 @@ class ChangePasswordTestCase(BaseTestGenerator):
('TestCase for Validating Incorrect_Current_Password', dict(
password=str(uuid.uuid4())[4:8],
new_password=(config_data['pgAdmin4_login_credentials']
['test_new_password']),
['new_password']),
new_password_confirm=(
config_data['pgAdmin4_login_credentials']
['test_new_password']),
['new_password']),
respdata='Invalid password')),
# This testcase checks for valid password
# This test case checks for valid password
('TestCase for Changing Valid_Password', dict(
password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
new_password=(config_data['pgAdmin4_login_credentials']
['test_new_password']),
valid_password='reassigning_password',
username=(config_data['pgAdmin4_test_user_credentials']
['login_username']),
password=(config_data['pgAdmin4_test_user_credentials']
['login_password']),
new_password=(config_data['pgAdmin4_test_user_credentials']
['new_password']),
new_password_confirm=(
config_data['pgAdmin4_login_credentials']
['test_new_password']),
respdata='You successfully changed your password.')),
('Reassigning_Password', dict(
test_case='reassigning_password',
password=(config_data['pgAdmin4_login_credentials']
['test_new_password']),
new_password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
new_password_confirm=(
config_data['pgAdmin4_login_credentials']
['test_login_password']),
config_data['pgAdmin4_test_user_credentials']
['new_password']),
respdata='You successfully changed your password.'))
]
@classmethod
def setUpClass(cls):
pass
def runTest(self):
"""This function will check change password functionality."""
# Check for 'test_case' exists in self For reassigning the password.
# Password gets change in change password test case.
if 'test_case' in dir(self):
email = \
config_data['pgAdmin4_login_credentials'][
'test_login_username']
password = \
config_data['pgAdmin4_login_credentials'][
'test_new_password']
response = self.tester.post('/login', data=dict(
email=email, password=password), follow_redirects=True)
response = self.tester.get('/change', follow_redirects=True)
self.assertIn('pgAdmin 4 Password Change', response.data.decode(
'utf-8'))
response = self.tester.post('/change', data=dict(
password=self.password,
new_password=self.new_password,
new_password_confirm=self.new_password_confirm),
# Check for 'valid_password' exists in self to test 'valid password'
# test case
if 'valid_password' in dir(self):
response = self.tester.post('/user_management/user/', data=dict(
email=self.username, newPassword=self.password,
confirmPassword=self.password, active=1, role="2"),
follow_redirects=True)
self.assertIn(self.respdata, response.data.decode('utf-8'))
user_id = json.loads(response.data.decode('utf-8'))['id']
# Logout the Administrator before login normal user
utils.logout_tester_account(self.tester)
response = self.tester.post('/login', data=dict(
email=self.username, password=self.password),
follow_redirects=True)
assert response.status_code == 200
# test the 'change password' test case
change_password(self)
# Delete the normal user after changing it's password
utils.logout_tester_account(self.tester)
# Login the Administrator before deleting normal user
utils.login_tester_account(self.tester)
response = self.tester.delete(
'/user_management/user/' + str(user_id),
follow_redirects=True)
assert response.status_code == 200
else:
change_password(self)
@classmethod
def tearDownClass(cls):
utils.login_tester_account(cls.tester)

View File

@ -24,20 +24,20 @@ class LoginTestCase(BaseTestGenerator):
# This test case validates the invalid/incorrect password
('TestCase for Checking Invalid_Password', dict(
email=(config_data['pgAdmin4_login_credentials']
['test_login_username']),
['login_username']),
password=str(uuid.uuid4())[4:8],
respdata='Invalid password')),
# This test case validates the empty password field
('Empty_Password', dict(
email=(config_data['pgAdmin4_login_credentials']
['test_login_username']), password='',
['login_username']), password='',
respdata='Password not provided')),
# This test case validates blank email field
('Empty_Email', dict(
email='', password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
['login_password']),
respdata='Email not provided')),
# This test case validates empty email and password
@ -49,7 +49,7 @@ class LoginTestCase(BaseTestGenerator):
('Invalid_Email', dict(
email=str(uuid.uuid4())[1:6] + '@xyz.com',
password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
['login_password']),
respdata='Specified user does not exist')),
# This test case validates invalid email and password
@ -62,21 +62,22 @@ class LoginTestCase(BaseTestGenerator):
# to login pgAdmin 4
('Valid_Credentials', dict(
email=(config_data['pgAdmin4_login_credentials']
['test_login_username']),
['login_username']),
password=(config_data['pgAdmin4_login_credentials']
['test_login_password']),
['login_password']),
respdata='Gravatar image for %s' %
config_data['pgAdmin4_login_credentials']
['test_login_username']))
['login_username']))
]
def setUp(self):
@classmethod
def setUpClass(cls):
"""
We need to logout the test client as we are testing scenarios of
logging in the client like invalid password, invalid emails,
empty credentials etc.
"""
utils.logout_tester_account(self.tester)
utils.logout_tester_account(cls.tester)
def runTest(self):
"""This function checks login functionality."""
@ -86,9 +87,10 @@ class LoginTestCase(BaseTestGenerator):
follow_redirects=True)
self.assertIn(self.respdata, response.data.decode('utf8'))
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
We need to again login the test client as soon as test scenarios
finishes.
"""
utils.login_tester_account(self.tester)
utils.login_tester_account(cls.tester)

View File

@ -24,7 +24,8 @@ class LogoutTest(BaseTestGenerator):
('Logging Out', dict(respdata='Redirecting...'))
]
def setUp(self):
@classmethod
def setUpClass(cls):
pass
def runTest(self):
@ -33,9 +34,10 @@ class LogoutTest(BaseTestGenerator):
response = self.tester.get('/logout')
self.assertIn(self.respdata, response.data.decode('utf8'))
def tearDown(self):
@classmethod
def tearDownClass(cls):
"""
We need to again login the test client as soon as test scenarios
finishes.
"""
utils.login_tester_account(self.tester)
utils.login_tester_account(cls.tester)

View File

@ -34,11 +34,12 @@ class ResetPasswordTestCase(BaseTestGenerator):
# This test case validates the valid email id
('TestCase for Validating Valid_Email', dict(
email=config_data['pgAdmin4_login_credentials']
['test_login_username'], respdata='pgAdmin 4'))
['login_username'], respdata='pgAdmin 4'))
]
def setUp(self):
logout_tester_account(self.tester)
@classmethod
def setUpClass(cls):
logout_tester_account(cls.tester)
def runTest(self):
"""This function checks reset password functionality."""
@ -51,5 +52,6 @@ class ResetPasswordTestCase(BaseTestGenerator):
follow_redirects=True)
self.assertIn(self.respdata, response.data.decode('utf-8'))
def tearDown(self):
login_tester_account(self.tester)
@classmethod
def tearDownClass(cls):
login_tester_account(cls.tester)

View File

@ -0,0 +1,21 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
def change_password(self):
response = self.tester.get('/change', follow_redirects=True)
self.assertIn('pgAdmin 4 Password Change', response.data.decode(
'utf-8'))
response = self.tester.post('/change', data=dict(
password=self.password,
new_password=self.new_password,
new_password_confirm=self.new_password_confirm),
follow_redirects=True)
self.assertIn(self.respdata, response.data.decode('utf-8'))

View File

@ -84,5 +84,6 @@ class BaseTestGenerator(unittest.TestCase):
self.app = app
# Initializing test_client.
def setTestClient(self, test_client):
self.tester = test_client
@classmethod
def setTestClient(cls, test_client):
cls.tester = test_client

View File

@ -75,22 +75,29 @@ General Information
Test Data Details
-----------------
"pgAdmin4 Login Credentials":
"pgAdmin4 Login Credentials" (used to login to pgAdmin):
test_login_username = login id
test_login_password = login password
test_new_password = new login password
login_username = login id
login_password = login password
new_password = new login password
"test_server_credentials":
"pgAdmin4 Test User Credentials" (dummy user used for testing user management):
test_name = Server/database Name
test_db_username = Database Username
test_host = IP Address of Server
test_db_password = Database Password
test_db_port = Database Port
test_maintenance_db = Maintenance Database
test_sslmode = SSL Mode
test_comment = Any Comments to add
login_username = login id
login_password = login password
new_password = new login password
"server_credentials":
name = Server/database Name
db_username = Database Username
host = IP Address of Server
db_password = Database Password
db_port = Database Port
maintenance_db = Maintenance Database
sslmode = SSL Mode
comment = Any Comments to add
tablespace_path = A path under which a tablespace can be created
Execution:

View File

@ -1,7 +1,7 @@
{
"test_add_database_data": [
"add_database_data": [
{
"test_privileges_acl": [
"privileges_acl": [
{
"grantee": "postgres",
"grantor": "postgres",
@ -19,9 +19,9 @@
]
}
],
"test_conn_limit": -1,
"test_owner": "postgres",
"test_fun_acl": [
"conn_limit": -1,
"owner": "postgres",
"fun_acl": [
{
"grantee": "postgres",
"grantor": "postgres",
@ -34,7 +34,7 @@
]
}
],
"test_seq_acl": [
"seq_acl": [
{
"grantee": "postgres",
"grantor": "postgres",
@ -57,7 +57,7 @@
]
}
],
"test_tbl_acl": [
"tbl_acl": [
{
"grantee": "postgres",
"grantor": "postgres",
@ -75,7 +75,7 @@
]
}
],
"test_type_acl": [
"type_acl": [
{
"grantee": "postgres",
"grantor": "postgres",
@ -88,112 +88,60 @@
]
}
],
"test_encoding": "UTF8",
"test_name": "test_db_automation",
"test_privileges": [],
"test_securities": [],
"test_variables": []
},
{
"test_privileges_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "C",
"privilege": true,
"with_grant": true
},
{
"privilege_type": "T",
"privilege": true,
"with_grant": false
}
]
"encoding": "UTF8",
"name": "test_db_automation",
"privileges": [],
"securities": [],
"variables": []
}
],
"test_conn_limit": -1,
"test_owner": "enterprisedb",
"test_fun_acl": [
"db_update_data": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "X",
"privilege": true,
"with_grant": false
}
]
}
],
"test_seq_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "r",
"privilege": true,
"with_grant": false
},
{
"privilege_type": "w",
"privilege": true,
"with_grant": false
},
{
"privilege_type": "U",
"privilege": true,
"with_grant": false
}
]
}
],
"test_tbl_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "a",
"privilege": true,
"with_grant": true
},
{
"privilege_type": "r",
"privilege": true,
"with_grant": false
}
]
}
],
"test_type_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "U",
"privilege": true,
"with_grant": false
}
]
}
],
"test_encoding": "UTF8",
"test_name": "test_db_automation",
"test_privileges": [],
"test_securities": [],
"test_variables": []
"comment": "This is db update comment"
}
],
"test_db_update_data": [
"lr_credentials": {
"can_login": "true",
"conn_limit": -1,
"create_role": "true",
"role_inherit": "true",
"role_membership": [],
"lr_name": "testlrg1",
"lr_password": "edb",
"lr_validity": "12/27/2016",
"sec_lable": [],
"variable":[
{"name":"work_mem",
"database":"postgres",
"value":65
}]
},
"lr_update_data": {
"comment": "This is db update comment"
},
"tablespc_credentials":[{
"tblspace_name": "test_tablespace",
"spc_seclable": [],
"spc_acl": [
{
"test_comment": "This is db update comment"
"grantee":"postgres",
"grantor":"postgres",
"privileges":[
{
"privilege_type":"C",
"privilege":true,
"with_grant":false
}
]
}
],
"spc_opts": [],
"spc_user": "postgres"
}],
"tbspc_update_data": {
"comment": "This is tablespace update comment"
}
}

View File

@ -1,35 +1,31 @@
{
"pgAdmin4_login_credentials": {
"test_new_password": "NEWPASSWORD",
"test_login_password": "PASSWORD",
"test_login_username": "USER@EXAMPLE.COM"
"new_password": "NEWPASSWORD",
"login_password": "PASSWORD",
"login_username": "USER@EXAMPLE.COM"
},
"test_server_group": 1,
"test_server_credentials": [
{
"test_name": "PostgreSQL 9.4",
"test_comment": "PostgreSQL 9.4 Server (EDB Installer)",
"test_db_username": "postgres",
"test_host": "localhost",
"test_db_password": "PASSWORD",
"test_db_port": 5432,
"test_maintenance_db": "postgres",
"test_sslmode": "prefer"
"pgAdmin4_test_user_credentials": {
"new_password": "NEWPASSWORD",
"login_password": "PASSWORD",
"login_username": "USER@EXAMPLE.COM"
},
"server_group": 1,
"server_credentials": [
{
"test_name": "Postgres Plus Advanced Server 9.4",
"test_comment": "Postgres Plus Advanced 9.4 Server (EDB Installer)",
"test_db_username": "enterprisedb",
"test_host": "localhost",
"test_db_password": "edb",
"test_db_port": 5444,
"test_maintenance_db": "edb",
"test_sslmode": "prefer"
"name": "PostgreSQL 9.4",
"comment": "PostgreSQL 9.4 Server (EDB Installer)",
"db_username": "postgres",
"host": "localhost",
"db_password": "PASSWORD",
"db_port": 5432,
"maintenance_db": "postgres",
"sslmode": "prefer",
"tablespace_path": ""
}
],
"test_server_update_data": [
"server_update_data": [
{
"test_comment": "This is test update comment"
"comment": "This is test update comment"
}
]
}

View File

@ -12,10 +12,24 @@ import os
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + '/test_config.json') as data_file:
# with open(CURRENT_PATH + '/test_config.json') as data_file:
# config_data = json.load(data_file)
#
# with open(CURRENT_PATH + '/test_advanced_config.json') as data_file:
# advanced_config_data = json.load(data_file)
try:
with open(CURRENT_PATH + '/test_config.json') as data_file:
config_data = json.load(data_file)
except:
with open(CURRENT_PATH + '/test_config.json.in') as data_file:
config_data = json.load(data_file)
with open(CURRENT_PATH + '/test_advanced_config.json') as data_file:
try:
with open(CURRENT_PATH + '/test_advanced_config.json') as data_file:
advanced_config_data = json.load(data_file)
except:
with open(CURRENT_PATH + '/test_advanced_config.json.in') as data_file:
advanced_config_data = json.load(data_file)
pickle_path = os.path.join(CURRENT_PATH, 'parent_id.pkl')

View File

@ -9,16 +9,24 @@
import os
import pickle
import json
import uuid
from test_setup import config_data, advanced_config_data, \
pickle_path
from test_setup import config_data, pickle_path
SERVER_URL = '/browser/server/obj/'
SERVER_CONNECT_URL = 'browser/server/connect/'
DATABASE_URL = '/browser/database/obj/'
DATABASE_CONNECT_URL = 'browser/database/connect/'
SERVER_GROUP = config_data['server_group']
def get_pickle_id_dict():
"""This function returns the empty dict of server config data"""
pickle_id_dict = {
"sid": [], # server
"did": [], # database
"lrid": [], # role
"tsid": [], # tablespace
"scid": [] # schema
}
return pickle_id_dict
def get_ids(url=pickle_path):
@ -34,43 +42,9 @@ def get_ids(url=pickle_path):
output = open(url, 'rb')
ids = pickle.load(output)
output.close()
return ids
def verify_database(tester, server_group, server_id, db_id):
"""
This function verifies that database is exists and whether it connect
successfully or not
:param tester: test client
:type tester: flask test client object
:param server_group: server group id
:type server_group: int
:param server_id: server id
:type server_id: str
:param db_id: database id
:type db_id: str
:return: temp_db_con
:rtype: list
"""
# Connect to server
response = tester.post('{0}{1}/{2}'.format(
SERVER_CONNECT_URL, server_group, server_id), data=dict(
password=config_data['test_server_credentials'][0][
'test_db_password']),
follow_redirects=True)
# Connect to database
con_response = tester.post('{0}{1}/{2}/{3}'.format(
DATABASE_CONNECT_URL, server_group, server_id, db_id),
follow_redirects=True)
temp_db_con = json.loads(con_response.data.decode('utf-8'))
return temp_db_con
def test_getnodes(tester=None):
# Connect to server and database.
@ -81,53 +55,14 @@ def test_getnodes(tester=None):
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
srv_grp = config_data['test_server_group']
db_con = []
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con.append(verify_database(tester, srv_grp, server_id, db_id))
db_con.append(verify_database(tester, SERVER_GROUP, server_id, db_id))
return db_con
def get_db_data(server_connect_data):
"""
This function is used to get advance config test data for appropriate
server
:param server_connect_data: list of server details
:return data: database details
:rtype: dict
"""
adv_config_data = None
data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['test_add_database_data']:
if db_user == config_test_data['test_owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
data = {
"datacl": adv_config_data['test_privileges_acl'],
"datconnlimit": adv_config_data['test_conn_limit'],
"datowner": adv_config_data['test_owner'],
"deffuncacl": adv_config_data['test_fun_acl'],
"defseqacl": adv_config_data['test_seq_acl'],
"deftblacl": adv_config_data['test_tbl_acl'],
"deftypeacl": adv_config_data['test_type_acl'],
"encoding": adv_config_data['test_encoding'],
"name": str(uuid.uuid4())[1:8],
"privileges": adv_config_data['test_privileges'],
"securities": adv_config_data['test_securities'],
"variables": adv_config_data['test_variables']
}
return data
def login_tester_account(tester):
"""
This function login the test account using credentials mentioned in
@ -139,9 +74,9 @@ def login_tester_account(tester):
"""
email = \
config_data['pgAdmin4_login_credentials']['test_login_username']
config_data['pgAdmin4_login_credentials']['login_username']
password = \
config_data['pgAdmin4_login_credentials']['test_login_password']
config_data['pgAdmin4_login_credentials']['login_password']
response = tester.post('/login', data=dict(
email=email, password=password), follow_redirects=True)
@ -158,6 +93,7 @@ def logout_tester_account(tester):
response = tester.get('/logout')
# Config data for parent_id.pkl
def get_config_data():
"""
This function get the data related to server group and database
@ -168,23 +104,18 @@ def get_config_data():
"""
db_data = []
pickle_id_dict = get_pickle_id_dict()
server_group = config_data['server_group']
pickle_id_dict = {
"sid": [], # server
"did": [] # database
}
server_group = config_data['test_server_group']
for srv in config_data['test_server_credentials']:
data = {"name": srv['test_name'],
for srv in config_data['server_credentials']:
data = {"name": srv['name'],
"comment": "",
"host": srv['test_host'],
"port": srv['test_db_port'],
"db": srv['test_maintenance_db'],
"username": srv['test_db_username'],
"host": srv['host'],
"port": srv['db_port'],
"db": srv['maintenance_db'],
"username": srv['db_username'],
"role": "",
"sslmode": srv['test_sslmode']}
"sslmode": srv['sslmode']}
db_data.append(data)
return server_group, db_data, pickle_id_dict
@ -205,40 +136,12 @@ def write_parent_id(response_data, pickle_id_dict):
existed_server_id = open(pickle_path, 'rb')
pickle_id_dict = pickle.load(existed_server_id)
pickle_id_dict["sid"].append(server_id)
pickle_id_dict["sid"].append(str(server_id))
output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, output)
output.close()
def write_db_parent_id(response_data):
"""
This function writes the server and database related data like server
name, server id , database name, database id etc.
:param response_data: server and databases details
:type response_data: dict
:return: None
"""
db_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'did' in pickle_id_dict:
if pickle_id_dict['did']:
# Add the db_id as value in dict
pickle_id_dict["did"][0].update({server_id: db_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["did"].append({server_id: db_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def delete_parent_id_file():
"""
This function deletes the file parent_id.pkl which contains server and
@ -250,148 +153,3 @@ def delete_parent_id_file():
if os.path.isfile(pickle_path):
os.remove(pickle_path)
def add_server(tester):
"""
This function add the server in the existing server group
:param tester: test object
:type tester: flask test object
:return:None
"""
server_group, db_data, pickle_id_dict = get_config_data()
url = "{0}{1}/".format(SERVER_URL, server_group)
for db_detail in db_data:
response = tester.post(url, data=json.dumps(db_detail),
content_type='html/json')
response_data = json.loads(response.data.decode())
write_parent_id(response_data, pickle_id_dict)
def get_server(tester):
"""
This function gets the added serer details
:param tester: test client object
:type tester: flask test object
:return: response_data
:rtype: list
"""
all_id = get_ids()
server_ids = all_id["sid"]
server_group = config_data['test_server_group']
for server_id in server_ids:
response = tester.get(SERVER_URL + str(server_group) + '/' +
str(server_id),
follow_redirects=True)
response_data = json.loads(response.data.decode())
def connect_server(tester):
"""
This function used to connect added server
:param tester:test client object
:type tester: flask test object
:return: server_connect, server_group, server_id
:rtype: server_connect:dict, server_group:dict, server_id:str
"""
server_connect = []
servers = []
srv_id = get_ids()
server_ids = srv_id["sid"]
server_group = config_data['test_server_group']
# Connect to all servers
for server_id in server_ids:
response = tester.post(SERVER_CONNECT_URL + str(server_group) +
'/' + server_id,
data=dict(
password=config_data
['test_server_credentials'][0]
['test_db_password']),
follow_redirects=True)
server_connect_detail = json.loads(response.data.decode())
connect_database(tester, server_connect_detail, server_id,
server_group)
server_connect.append(server_connect_detail)
servers.append(server_id)
return server_connect, server_group, servers
def connect_database(tester, server_connect, server_id, server_group):
"""
This function is used to connect database and writes it's details to
file 'parent_id.pkl'
:param tester: test client object
:type tester: flask test client object
:param server_connect: server's data
:type server_connect: dict
:param server_id: server id
:type server_id: str
:param server_group: server group name
:type server_group: str
:return: None
"""
if server_connect['data']['connected']:
db_data = get_db_data(server_connect)
db_response = tester.post(
DATABASE_URL + str(server_group) + "/" + server_id + "/",
data=json.dumps(db_data),
content_type='html/json')
response_data = json.loads(db_response.data.decode())
write_db_parent_id(response_data)
def delete_server(tester):
"""
This function used to delete the added servers
:param tester: test client object
:return: None
"""
srv_grp = config_data['test_server_group']
all_id = get_ids()
server_ids = all_id["sid"]
url = SERVER_URL + str(srv_grp) + "/"
# Call api to delete the servers
for server_id in server_ids:
response = tester.delete(url + str(server_id))
assert response.status_code == 200
response_data = json.loads(response.data.decode())
assert response_data['success'] == 1
def delete_database(tester):
"""
This function used to delete the added databases
:param tester: test client object
:return: None
"""
srv_grp = config_data['test_server_group']
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id['did'][0]
db_con = test_getnodes(tester)
if len(db_con) == 0:
raise Exception("No database(s) to delete.")
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
response = tester.delete(DATABASE_URL + str(srv_grp) + '/' +
str(server_id) + '/' + str(db_id),
follow_redirects=True)
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
assert response_data['success'] == 1