Introduce a test config database for the regression tests, and track/remove objects that are created during testing.

pull/3/head
Navnath Gadakh 2016-09-14 16:26:12 +01:00 committed by Dave Page
parent 3807ba047b
commit d3d8836f61
18 changed files with 648 additions and 654 deletions

View File

@ -266,6 +266,15 @@ STORAGE_DIR = os.path.join(
'storage'
)
##########################################################################
# Test settings - used primarily by the regression suite, not for users
##########################################################################
# Set default testing mode
TESTING_MODE = False
# The default path for SQLite database for testing
TEST_SQLITE_PATH = os.path.join(DATA_DIR, 'test_pgadmin4.db')
##########################################################################
# Allows flask application to response to the each request asynchronously
##########################################################################

View File

@ -7,57 +7,52 @@
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression import test_server_dict
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabaseAddTestCase(BaseTestGenerator):
"""
This class will check server group node present on the object browser's
tree node by response code.
"""
"""This class will test the ADD database API"""
scenarios = [
# Fetching default URL for database node.
('Check Databases Node URL', dict(url='/browser/database/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
def setUp(self):
pass
def runTest(self):
""" This function will add database under 1st server of tree node. """
server_id = test_server_dict["server"][0]["server_id"]
server_utils.connect_server(self, server_id)
database_utils.add_database(self.tester, self.server_connect_response,
self.server_ids)
data = database_utils.get_db_data()
self.db_name = data['name']
response = self.tester.post(self.url + str(utils.SERVER_GROUP) +
"/" + str(server_id) + "/",
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
db_id = response_data['node']['_id']
db_dict = {"db_id": db_id, "db_name": self.db_name}
utils.write_node_info(int(server_id), "did", db_dict)
@classmethod
def tearDownClass(cls):
def tearDown(self):
"""
This function deletes the added database, added server and the
'parent_id.pkl' file which is created in setup()
:return: None
This function delete the database from server added in SQLite and
clears the node_info_dict
"""
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
utils.drop_database(connection, self.db_name)
utils.clear_node_info_dict()

View File

@ -6,16 +6,14 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression import test_server_dict
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabaseDeleteTestCase(BaseTestGenerator):
""" This class will delete the database under last added server. """
scenarios = [
# Fetching default URL for database node.
('Check Databases Node URL', dict(url='/browser/database/obj/'))
@ -23,42 +21,23 @@ class DatabaseDeleteTestCase(BaseTestGenerator):
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
cls.db_id = utils.create_database(cls.server, "test_db_delete")
def runTest(self):
""" This function will delete the database."""
database_utils.delete_database(self.tester)
server_id = test_server_dict["server"][0]["server_id"]
server_response = server_utils.connect_server(self, server_id)
if server_response["data"]["connected"]:
db_id = self.db_id
response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
else:
raise Exception("Could not connect to server to delete the "
"database.")
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
:return: None
"""
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()
pass

View File

@ -9,9 +9,7 @@
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_setup import config_data
from regression.test_utils import get_ids
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_server_dict
from . import utils as database_utils
@ -19,66 +17,32 @@ class DatabasesGetTestCase(BaseTestGenerator):
"""
This class will fetch database added under last added server.
"""
scenarios = [
# Fetching default URL for database node.
('Check Dat abases Node URL', dict(url='/browser/database/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will fetch added database. """
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
srv_grp = config_data['server_group']
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester, srv_grp,
server_data = test_server_dict["database"][0]
server_id = server_data["server_id"]
db_id = server_data['db_id']
db_con = database_utils.verify_database(self,
utils.SERVER_GROUP,
server_id,
db_id)
if db_con["info"] == "Database connected.":
try:
response = self.tester.get(
self.url + str(srv_grp) + '/' + str(server_id) + '/' +
self.url + str(utils.SERVER_GROUP) + '/' + str(
server_id) + '/' +
str(db_id), follow_redirects=True)
self.assertEquals(response.status_code, 200)
except Exception as exception:
raise Exception("Error while getting database. %s" % exception)
finally:
# Disconnect database to delete it
database_utils.disconnect_database(self, server_id, db_id)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added database, added server
and the 'parent_id.pkl' file which is created in setup() function.
:return: None
"""
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()
else:
raise Exception("Could not connect to database.")

View File

@ -11,17 +11,13 @@ import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression import test_server_dict
from regression.test_setup import advanced_config_data
from regression.test_utils import get_ids
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as database_utils
class DatabasesUpdateTestCase(BaseTestGenerator):
"""
This class will update the database under last added server.
"""
"""This class will update the database under last added server."""
scenarios = [
# Fetching default URL for database node.
('Check Databases Node', dict(url='/browser/database/obj/'))
@ -29,63 +25,46 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server
2. Connect to server
3. Add the databases
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
cls.db_name = "test_db_put"
cls.db_id = utils.create_database(cls.server, cls.db_name)
def runTest(self):
""" This function will update the comments field of database."""
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
server_id = test_server_dict["server"][0]["server_id"]
db_id = self.db_id
db_con = database_utils.verify_database(self,
utils.SERVER_GROUP,
server_id,
db_id)
if db_con["info"] == "Database connected.":
try:
data = {
"comments": advanced_config_data["db_update_data"][0]
["comment"],
"comments": advanced_config_data["db_update_data"]["comment"],
"id": db_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' + str(
response = self.tester.put(self.url + str(utils.SERVER_GROUP) + '/' + str(
server_id) + '/' +
str(db_id), data=json.dumps(data), follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
self.assertEquals(response.status_code, 200)
except Exception as exception:
raise Exception("Error while updating database details. %s" %
exception)
finally:
# Disconnect database to delete it
database_utils.disconnect_database(self, server_id, db_id)
else:
raise Exception("Error while updating database details.")
@classmethod
def tearDownClass(self):
def tearDownClass(cls):
"""
This function deletes the added server and 'parent_id.pkl' file
which is created in setup() function.
:return: None
This function delete the database from server added in SQLite and
clears the node_info_dict
"""
database_utils.delete_database(self.tester)
server_utils.delete_server(self.tester)
utils.delete_parent_id_file()
connection = utils.get_db_connection(cls.server['db'],
cls.server['username'],
cls.server['db_password'],
cls.server['host'],
cls.server['port'])
utils.drop_database(connection, cls.db_name)
utils.clear_node_info_dict()

View File

@ -8,11 +8,9 @@
# ##########################################################################
import json
import os
import pickle
import uuid
from regression.test_setup import pickle_path, advanced_config_data
from regression.test_setup import advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_utils as utils
@ -21,26 +19,11 @@ DATABASE_URL = '/browser/database/obj/'
DATABASE_CONNECT_URL = 'browser/database/connect/'
def get_db_data(server_connect_data):
"""
This function is used to get advance config test data for appropriate
server
:param server_connect_data: list of server details
:return data: database details
:rtype: dict
"""
adv_config_data = None
def get_db_data():
"""This function returns the database details from json file"""
data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['add_database_data']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
if adv_config_data is not None:
if advanced_config_data['add_database_data'] is not None:
adv_config_data = advanced_config_data['add_database_data']
data = {
"datacl": adv_config_data['privileges_acl'],
"datconnlimit": adv_config_data['conn_limit'],
@ -58,67 +41,13 @@ def get_db_data(server_connect_data):
return data
def write_db_id(response_data):
"""
This function writes the server and database related data like server
name, server id , database name, database id etc.
:param response_data: server and databases details
:type response_data: dict
:return: None
"""
db_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'did' in pickle_id_dict:
if pickle_id_dict['did']:
# Add the db_id as value in dict
pickle_id_dict["did"][0].update({server_id: db_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["did"].append({server_id: db_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def add_database(tester, server_connect_response, server_ids):
"""
This function add the database into servers
:param tester: flask test client
:type tester: flask test object
:param server_connect_response: server response
:type server_connect_response: dict
:param server_ids: server ids
:type server_ids: list
:return: None
"""
for server_connect, server_id in zip(server_connect_response, server_ids):
if server_connect['data']['connected']:
data = get_db_data(server_connect)
db_response = tester.post(DATABASE_URL + str(utils.SERVER_GROUP) +
"/" + server_id + "/",
data=json.dumps(data),
content_type='html/json')
assert db_response.status_code == 200
response_data = json.loads(db_response.data.decode('utf-8'))
write_db_id(response_data)
def verify_database(tester, server_group, server_id, db_id):
def verify_database(self, server_group, server_id, db_id):
"""
This function verifies that database is exists and whether it connect
successfully or not
:param tester: test client
:type tester: flask test client object
:param self: class object of test case class
:type self: class
:param server_group: server group id
:type server_group: int
:param server_id: server id
@ -130,15 +59,23 @@ def verify_database(tester, server_group, server_id, db_id):
"""
# Verify servers
server_utils.verify_server(tester, server_group, server_id)
server_utils.connect_server(self, server_id)
# Connect to database
con_response = tester.post('{0}{1}/{2}/{3}'.format(
db_con = self.tester.post('{0}{1}/{2}/{3}'.format(
DATABASE_CONNECT_URL, server_group, server_id, db_id),
follow_redirects=True)
temp_db_con = json.loads(con_response.data.decode('utf-8'))
self.assertEquals(db_con.status_code, 200)
db_con = json.loads(db_con.data.decode('utf-8'))
return db_con
return temp_db_con
def disconnect_database(self, server_id, db_id):
"""This function disconnect the db"""
db_con = self.tester.delete('{0}{1}/{2}/{3}'.format(
'browser/database/connect/', utils.SERVER_GROUP, server_id, db_id),
follow_redirects=True)
self.assertEquals(db_con.status_code, 200)
def delete_database(tester):

View File

@ -7,6 +7,8 @@
#
# ##########################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
@ -26,17 +28,19 @@ class ServersAddTestCase(BaseTestGenerator):
def runTest(self):
""" This function will add the server under default server group."""
server_utils.add_server(self.tester)
url = "{0}{1}/".format(self.url, utils.SERVER_GROUP)
response = self.tester.post(url, data=json.dumps(self.server),
content_type='html/json')
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
server_id = response_data['node']['_id']
utils.write_node_info(int(server_id), "sid", self.server)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
:return: None
This function delete the server from SQLite & clears the node_info_dict
"""
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()
server_id = server_utils.get_server_id()
utils.delete_server(server_id)
utils.clear_node_info_dict()

View File

@ -7,10 +7,10 @@
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
@ -24,28 +24,27 @@ class ServerDeleteTestCase(BaseTestGenerator):
@classmethod
def setUpClass(cls):
"""
This function is used to add the server
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
"""This function add the server to test the DELETE API"""
server_utils.add_server(cls.server)
def runTest(self):
""" This function will get all available servers under object browser
and delete the last server using server id."""
"""This function deletes the added server"""
all_id = utils.get_node_info_dict()
servers_info = all_id["sid"]
url = self.url + str(utils.SERVER_GROUP) + "/"
server_utils.delete_server(self.tester)
if len(servers_info) == 0:
raise Exception("No server to delete!!!")
# Call API to delete the servers
server_id = list(servers_info[0].keys())[0]
response = self.tester.delete(url + str(server_id))
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
self.assertEquals(response_data['success'], 1)
@classmethod
def tearDownClass(cls):
"""
This function deletes the 'parent_id.pkl' file which is created in
setup() function.
:return: None
"""
utils.delete_parent_id_file()
"""This function calls the clear_node_info_dict() function to clears
the node_info_dict"""
utils.clear_node_info_dict()

View File

@ -9,7 +9,7 @@
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
from regression import test_server_dict
class ServersGetTestCase(BaseTestGenerator):
@ -23,29 +23,13 @@ class ServersGetTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function is used to add the server
:return: None
"""
server_utils.add_server(cls.tester)
def runTest(self):
""" This function will fetch the added servers to object browser. """
server_id = test_server_dict["server"][0]["server_id"]
if not server_id:
raise Exception("Server not found to test GET API")
response = self.tester.get(self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
server_utils.get_server(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
:return: None
"""
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -23,32 +23,18 @@ class ServerUpdateTestCase(BaseTestGenerator):
@classmethod
def setUpClass(cls):
"""
This function perform the four tasks
1. Add the test server
2. Get the server
3. Connect to server
:return: None
"""
# Firstly, add the server
server_utils.add_server(cls.tester)
# Get the server
server_utils.get_server(cls.tester)
# Connect to server
cls.server_connect, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect) == 0:
raise Exception("No Server(s) connected to update!!!")
"""This function add the server to test the PUT API"""
server_utils.add_server(cls.server)
def runTest(self):
""" This function will update the server's comment field. """
"""This function update the server details"""
all_id = utils.get_node_info_dict()
servers_info = all_id["sid"]
for server_id in self.server_ids:
if len(servers_info) == 0:
raise Exception("No server to update.")
server_id = list(servers_info[0].keys())[0]
data = {
"comment":
server_utils.config_data['server_update_data'][0][
@ -56,22 +42,16 @@ class ServerUpdateTestCase(BaseTestGenerator):
"id": server_id
}
put_response = self.tester.put(
self.url + str(self.server_group) + '/' +
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id), data=json.dumps(data),
content_type='html/json')
self.assertEquals(put_response.status_code, 200)
response_data = json.loads(put_response.data.decode())
self.assertTrue(response_data['success'], 1)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added server and the 'parent_id.pkl' file
which is created in setup() function.
:return: None
This function delete the server from SQLite & clears the node_info_dict
"""
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()
server_id = server_utils.get_server_id()
utils.delete_server(server_id)
utils.clear_node_info_dict()

View File

@ -7,154 +7,64 @@
#
# ##########################################################################
import json
import os
import pickle
from __future__ import print_function
import sys
import json
import sqlite3
import config
from regression import node_info_dict
from regression import test_utils as utils
from regression.test_setup import pickle_path, config_data
from regression.test_setup import config_data
SERVER_URL = '/browser/server/obj/'
SERVER_CONNECT_URL = 'browser/server/connect/'
def write_server_id(response_data, pickle_id_dict):
"""
This function writes the server's details to file parent_id.pkl
def get_server_id():
"""This function returns the server id from node_info_dict"""
:param response_data: server's data
:type response_data: list of dictionary
:param pickle_id_dict: contains ids of server,database,tables etc.
:type pickle_id_dict: dict
:return: None
"""
server_id = response_data['node']['_id']
if os.path.isfile(pickle_path):
existed_server_id = open(pickle_path, 'rb')
pickle_id_dict = pickle.load(existed_server_id)
pickle_id_dict["sid"].append(str(server_id))
output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, output)
output.close()
server_id = 0
if "sid" in node_info_dict:
if node_info_dict['sid']:
server_id = list(node_info_dict['sid'][0].keys())[0]
return server_id
def add_server(tester):
"""
This function add the server in the existing server group
:param tester: test object
:type tester: flask test object
:return:None
"""
server_group, db_data, pickle_id_dict = utils.get_config_data()
url = "{0}{1}/".format(SERVER_URL, server_group)
for db_detail in db_data:
response = tester.post(url, data=json.dumps(db_detail),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_server_id(response_data, pickle_id_dict)
def get_server(tester):
"""
This function gets the added serer details
:param tester: test client object
:type tester: flask test object
:return: response_data
:rtype: list
"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
for server_id in server_ids:
response = tester.get(SERVER_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id),
follow_redirects=True)
assert response.status_code == 200
def connect_server(tester):
def connect_server(self, server_id):
"""
This function used to connect added server
:param tester:test client object
:type tester: flask test object
:param add_db_flag: flag for db add test case
:type add_db_flag: bool
:return: server_connect, server_group, server_id
:rtype: server_connect:dict, server_group:dict, server_id:str
:param self: class object of server's test class
:type self: class
:param server_id: flag for db add test case
:type server_id: bool
"""
server_connect = []
servers = []
server_config = None
srv_id = utils.get_ids()
server_ids = srv_id["sid"]
# Connect to all servers
for server_id in server_ids:
response = tester.post(SERVER_CONNECT_URL + str(utils.SERVER_GROUP) +
'/' + server_id,
data=dict(
password=config_data
['server_credentials'][0]
['db_password']),
response = self.tester.post(SERVER_CONNECT_URL + str(utils.SERVER_GROUP) +
'/' + str(server_id),
data=dict(password=self.server['db_password']),
follow_redirects=True)
server_connect_detail = json.loads(response.data.decode('utf-8'))
db_user = server_connect_detail['data']['user']['name']
server_connect_detail['tablespace_path'] = None
# Get the server config of appropriate db user
for config in config_data['server_credentials']:
if db_user == config['db_username']:
server_config = config
if "tablespace_path" in server_config:
server_connect_detail['tablespace_path'] = \
server_config['tablespace_path']
server_connect.append(server_connect_detail)
servers.append(server_id)
return server_connect, utils.SERVER_GROUP, servers
def verify_server(tester, server_group, server_id):
"""This function verifies that server is connecting or not"""
response = tester.post(
'{0}{1}/{2}'.format(SERVER_CONNECT_URL, server_group, server_id),
data=dict(password=config_data
['server_credentials'][0]
['db_password']),
follow_redirects=True)
srv_connect = json.loads(response.data.decode('utf-8'))
return srv_connect
def delete_server(tester):
"""
This function used to delete the added servers
:param tester: test client object
:return: None
"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
url = SERVER_URL + str(utils.SERVER_GROUP) + "/"
if len(server_ids) == 0:
raise Exception("No server(s) to delete!!!")
# Call api to delete the servers
for server_id in server_ids:
response = tester.delete(url + str(server_id))
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
assert response_data['success'] == 1
return response_data
def add_server(server):
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
server_details = (
1, utils.SERVER_GROUP, server['name'], server['host'],
server['port'], server['db'], server['username'],
server['role'], server['sslmode'],
server['comment'])
cur.execute(
'INSERT INTO server (user_id, servergroup_id, name, host, '
'port, maintenance_db, username, role, ssl_mode,'
' comment) VALUES (?,?,?,?,?,?,?,?,?,?)', server_details)
server_id = cur.lastrowid
# Add server info to node_info_dict
utils.write_node_info(int(server_id), "sid", server)
conn.commit()
except Exception as err:
raise Exception(err)

View File

@ -7,8 +7,14 @@
#
##############################################################
import sys
import unittest
from abc import ABCMeta, abstractmethod
from importlib import import_module
from werkzeug.utils import find_modules
import config
class TestsGeneratorRegistry(ABCMeta):
@ -43,20 +49,26 @@ class TestsGeneratorRegistry(ABCMeta):
ABCMeta.__init__(cls, name, bases, d)
@staticmethod
def import_app_modules(module_name):
"""As we are running test suite for each server. To catch
the test cases, delete the previously imported module
"""
if str(module_name) in sys.modules.keys():
del sys.modules[module_name]
import_module(module_name)
@classmethod
def load_generators(cls, pkg):
cls.registry = dict()
from importlib import import_module
from werkzeug.utils import find_modules
import config
# Check for SERVER mode
if config.SERVER_MODE:
for module_name in find_modules(pkg, False, True):
try:
module = import_module(module_name)
if "tests." in str(module_name):
cls.import_app_modules(module_name)
except ImportError:
pass
else:
@ -65,7 +77,7 @@ class TestsGeneratorRegistry(ABCMeta):
# Exclude the test cases in browser node if SERVER_MODE
# is False
if "pgadmin.browser.tests" not in module_name:
module = import_module(module_name)
cls.import_app_modules(module_name)
except ImportError:
pass
@ -75,6 +87,11 @@ import six
@six.add_metaclass(TestsGeneratorRegistry)
class BaseTestGenerator(unittest.TestCase):
# Defining abstract method which will override by individual testcase.
@classmethod
def setTestServer(cls, server):
cls.server = server
@abstractmethod
def runTest(self):
pass

View File

@ -0,0 +1,32 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
global node_info_dict
node_info_dict = {
"sid": [], # server
"did": [], # database
"lrid": [], # role
"tsid": [], # tablespace
"scid": [], # schema
"tfnid": [], # trigger functions
"coid": [], # collation
"cid": [], # casts
"etid": [], # event_trigger
"eid": [], # extension
"fid": [], # FDW
"fsid": [], # FRS
"umid": [], # user_mapping
"seid": [] # sequence
}
global test_server_dict
test_server_dict = {
"server": [],
"database": []
}

View File

@ -9,10 +9,13 @@
""" This file collect all modules/files present in tests directory and add
them to TestSuite. """
from __future__ import print_function
import argparse
import os
import sys
import signal
import atexit
import unittest
import logging
@ -25,30 +28,60 @@ root = os.path.dirname(CURRENT_PATH)
if sys.path[0] != root:
sys.path.insert(0, root)
os.chdir(root)
from pgadmin import create_app
import config
import test_setup
# Execute setup.py if test SQLite database doesn't exist.
if os.path.isfile(config.TEST_SQLITE_PATH):
print("The configuration database already exists at '%s'. "
"Please remove the database and re-run the test suite." %
config.TEST_SQLITE_PATH)
sys.exit(1)
else:
config.TESTING_MODE = True
pgadmin_credentials = test_setup.config_data
# Set environment variables for email and password
os.environ['PGADMIN_SETUP_EMAIL'] = ''
os.environ['PGADMIN_SETUP_PASSWORD'] = ''
if pgadmin_credentials:
if 'pgAdmin4_login_credentials' in pgadmin_credentials:
if all(item in pgadmin_credentials['pgAdmin4_login_credentials']
for item in ['login_username', 'login_password']):
pgadmin_credentials = pgadmin_credentials[
'pgAdmin4_login_credentials']
os.environ['PGADMIN_SETUP_EMAIL'] = pgadmin_credentials[
'login_username']
os.environ['PGADMIN_SETUP_PASSWORD'] = pgadmin_credentials[
'login_password']
# Execute the setup file
exec (open("setup.py").read())
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
from pgadmin.model import SCHEMA_VERSION
from test_utils import login_tester_account, logout_tester_account
# Delay the import test_utils as it needs updated config.SQLITE_PATH
import test_utils
config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION
# Override some other defaults
from logging import WARNING
config.CONSOLE_LOG_LEVEL = WARNING
# Create the app
app = create_app()
app.config['WTF_CSRF_ENABLED'] = False
test_client = app.test_client()
# Login the test client
login_tester_account(test_client)
def get_suite(arguments, test_app_client):
def get_suite(arguments, server, test_app_client):
"""
This function loads the all modules in the tests directory into testing
environment.
@ -56,6 +89,8 @@ def get_suite(arguments, test_app_client):
:param arguments: this is command line arguments for module name to
which test suite will run
:type arguments: str
:param server: server details
:type server: dict
:param test_app_client: test client
:type test_app_client: pgadmin app object
:return pgadmin_suite: test suite with test cases
@ -71,18 +106,24 @@ def get_suite(arguments, test_app_client):
if arguments['pkg'] is None or arguments['pkg'] == "all":
TestsGeneratorRegistry.load_generators('pgadmin')
else:
TestsGeneratorRegistry.load_generators('pgadmin.{}.tests'.format(
arguments['pkg']))
TestsGeneratorRegistry.load_generators('pgadmin.%s.tests' %
arguments['pkg'])
# Sort module list so that test suite executes the test cases sequentially
module_list = TestsGeneratorRegistry.registry.items()
module_list = sorted(module_list, key=lambda module_tuple: module_tuple[0])
# Get the each test module and add into list
for key, klass in TestsGeneratorRegistry.registry.items():
for key, klass in module_list:
gen = klass
modules.append(gen)
# Set the test client to each module & generate the scenarios
for module in modules:
obj = module()
obj.setApp(app)
obj.setTestClient(test_app_client)
obj.setTestServer(server)
scenario = generate_scenarios(obj)
pgadmin_suite.addTests(scenario)
@ -106,6 +147,10 @@ def add_arguments():
return arg
def sig_handler(signo, frame):
test_utils.drop_objects()
class StreamToLogger(object):
def __init__(self, logger, log_level=logging.INFO):
self.terminal = sys.stderr
@ -131,6 +176,14 @@ class StreamToLogger(object):
if __name__ == '__main__':
# Register cleanup function to cleanup on exit
atexit.register(test_utils.drop_objects)
# Set signal handler for cleanup
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGABRT, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGQUIT, sig_handler)
# Set basic logging configuration for log file
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s:%(levelname)s:%(name)s:%(message)s'
@ -144,11 +197,25 @@ if __name__ == '__main__':
sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
args = vars(add_arguments())
suite = get_suite(args, test_client)
tests = unittest.TextTestRunner(stream=sys.stderr, descriptions=True,
verbosity=2).run(suite)
servers_info = test_utils.get_config_data()
try:
for server in servers_info:
print("\n=============Running the test cases for '%s'============="
% server['name'], file=sys.stderr)
test_utils.create_test_server(server)
# Login the test client
test_utils.login_tester_account(test_client)
suite = get_suite(args, server, test_client)
tests = unittest.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
# Logout the test client
logout_tester_account(test_client)
test_utils.logout_tester_account(test_client)
test_utils.delete_test_server(server)
except SystemExit:
test_utils.drop_objects()
print("Please check output in file: %s/regression.log " % CURRENT_PATH)

View File

@ -1,5 +1,5 @@
{
"add_database_data": [
"add_database_data":
{
"privileges_acl": [
{
@ -93,13 +93,11 @@
"privileges": [],
"securities": [],
"variables": []
}
],
"db_update_data": [
},
"db_update_data":
{
"comment": "This is db update comment"
}
],
},
"lr_credentials": {
"can_login": "true",
@ -492,5 +490,3 @@
}

View File

@ -10,13 +10,9 @@
import json
import os
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
# with open(CURRENT_PATH + '/test_config.json') as data_file:
# config_data = json.load(data_file)
#
# with open(CURRENT_PATH + '/test_advanced_config.json') as data_file:
# advanced_config_data = json.load(data_file)
try:
with open(CURRENT_PATH + '/test_config.json') as data_file:
@ -31,5 +27,3 @@ try:
except:
with open(CURRENT_PATH + '/test_advanced_config.json.in') as data_file:
advanced_config_data = json.load(data_file)
pickle_path = os.path.join(CURRENT_PATH, 'parent_id.pkl')

View File

@ -6,87 +6,52 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from __future__ import print_function
import os
import pickle
from test_setup import config_data, pickle_path
import sys
import psycopg2
import sqlite3
import config
import test_setup
import regression
SERVER_GROUP = test_setup.config_data['server_group']
SERVER_GROUP = config_data['server_group']
def get_db_connection(db, username, password, host, port):
"""This function retruns the connection object of psycopg"""
connection = psycopg2.connect(database=db,
user=username,
password=password,
host=host,
port=port)
return connection
def get_pickle_id_dict():
"""This function returns the empty dict of server config data"""
pickle_id_dict = {
"sid": [], # server
"did": [], # database
"lrid": [], # role
"tsid": [], # tablespace
"scid": [], # schema
"tfnid": [], # trigger functions
"coid": [], # collation
"cid": [], # casts
"etid": [], # event_trigger
"eid": [], # extension
"fid": [], # FDW
"fsid": [], # FRS
"umid": [], # user_mapping
"seid": [] # sequence
}
return pickle_id_dict
def get_ids(url=pickle_path):
"""
This function read the parent node's id and return it
:param url: file path from which it will red the ids
:type url: str
:return: node ids
:rtype: dict
"""
output = open(url, 'rb')
ids = pickle.load(output)
output.close()
return ids
# def test_getnodes(tester=None):
# # Connect to server and database.
#
# if not tester:
# return None
#
# all_id = get_ids()
#
# server_ids = all_id["sid"]
# db_ids_dict = all_id["did"][0]
#
# db_con = []
# for server_id in server_ids:
# db_id = db_ids_dict[int(server_id)]
# db_con.append(verify_database(tester, SERVER_GROUP, server_id, db_id))
# return db_con
def get_node_info_dict():
return regression.node_info_dict
def login_tester_account(tester):
"""
This function login the test account using credentials mentioned in
config file
This function login the test client using env variables email and password
:param tester: test client
:type tester: flask test client object
:return: None
"""
email = \
config_data['pgAdmin4_login_credentials']['login_username']
password = \
config_data['pgAdmin4_login_credentials']['login_password']
response = tester.post('/login', data=dict(
email=email, password=password), follow_redirects=True)
if os.environ['PGADMIN_SETUP_EMAIL'] and os.environ[
'PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
password = os.environ['PGADMIN_SETUP_PASSWORD']
tester.post('/login', data=dict(email=email, password=password),
follow_redirects=True)
else:
print("Unable to login test client, email and password not found.",
file=sys.stderr)
drop_objects()
sys.exit(1)
def logout_tester_account(tester):
@ -101,63 +66,231 @@ def logout_tester_account(tester):
response = tester.get('/logout')
# Config data for parent_id.pkl
def get_config_data():
"""
This function get the data related to server group and database
like db name, host, port and username etc.
:return: server_group, db_data, pickle_id_dict
:rtype: server_group:dict, db_data:list, pickle_id_dict:dict
"""
db_data = []
pickle_id_dict = get_pickle_id_dict()
server_group = config_data['server_group']
for srv in config_data['server_credentials']:
"""This function reads the server data from config_data"""
server_data = []
for srv in test_setup.config_data['server_credentials']:
data = {"name": srv['name'],
"comment": "",
"host": srv['host'],
"port": srv['db_port'],
"db": srv['maintenance_db'],
"username": srv['db_username'],
"db_password": srv['db_password'],
"role": "",
"sslmode": srv['sslmode']}
db_data.append(data)
return server_group, db_data, pickle_id_dict
"sslmode": srv['sslmode'],
"tablespace_path": srv['tablespace_path']}
server_data.append(data)
return server_data
def write_parent_id(response_data, pickle_id_dict):
def write_node_info(node_id, key, node_info=None):
"""
This function writes the server's details to file parent_id.pkl
This function append the node details to
:param node_id: node id
:type node_id: int
:param key: dict key name to store node info
:type key: str
:param node_info: node details
:type node_info: dict
:return: node_info_dict
:rtype: dict
"""
node_info_dict = regression.node_info_dict
if node_info_dict:
if key in node_info_dict and node_info_dict[key]:
node_info_dict[key].append({node_id: node_info})
else:
node_info_dict[key] = [{node_id: node_info}]
else:
raise Exception("node_info_dict is null.")
:param response_data: server's data
:type response_data: list of dictionary
:param pickle_id_dict: contains ids of server,database,tables etc.
:type pickle_id_dict: dict
def clear_node_info_dict():
"""This function used to clears the node_info_dict variable"""
node_info_dict = regression.node_info_dict
for node in node_info_dict:
del node_info_dict[node][:]
def create_database(server, db_name):
"""This function used to create database and returns the database id"""
try:
connection = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute("CREATE DATABASE %s" % db_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get 'oid' from newly created database
pg_cursor.execute(
"SELECT db.oid from pg_database db WHERE db.datname='%s'" %
db_name)
oid = pg_cursor.fetchone()
db_id = ''
if oid:
db_id = oid[0]
connection.close()
return db_id
except Exception as exception:
raise Exception("Error while creating database. %s" % exception)
def drop_database(connection, db_name):
"""This function used to drop the database"""
try:
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''DROP DATABASE "%s"''' % db_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
connection.close()
except Exception as exception:
raise Exception("Exception while dropping the database. %s" %
exception)
def create_server(server):
"""This function is used to create server"""
try:
conn = sqlite3.connect(config.SQLITE_PATH)
# Create the server
cur = conn.cursor()
server_details = (1, SERVER_GROUP, server['name'], server['host'],
server['port'], server['db'], server['username'],
server['role'], server['sslmode'], server['comment'])
cur.execute('INSERT INTO server (user_id, servergroup_id, name, host, '
'port, maintenance_db, username, role, ssl_mode,'
' comment) VALUES (?,?,?,?,?,?,?,?,?,?)', server_details)
server_id = cur.lastrowid
conn.commit()
return server_id
except Exception as exception:
raise Exception("Error while creating server. %s" % exception)
def delete_server(sid):
"""This function used to delete server from SQLite"""
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
servers = cur.execute('SELECT * FROM server WHERE id=%s' % sid)
servers_count = len(servers.fetchall())
if servers_count:
cur.execute('DELETE FROM server WHERE id=%s' % sid)
conn.commit()
else:
print("No servers found to delete.", file=sys.stderr)
except Exception as err:
raise Exception("Error while deleting server %s" % err)
def create_test_server(server):
"""
This function create the test server which will act as parent server,
the other node will add under this server
:param server: server details
:type server: dict
:return: None
"""
# Create the server
server_id = create_server(server)
server_id = response_data['node']['_id']
if os.path.isfile(pickle_path):
existed_server_id = open(pickle_path, 'rb')
pickle_id_dict = pickle.load(existed_server_id)
# Create test database
test_db_name = "test_db"
db_id = create_database(server, test_db_name)
pickle_id_dict["sid"].append(str(server_id))
output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, output)
output.close()
# Add server info to test_server_dict
regression.test_server_dict["server"].append({"server_id": server_id,
"server": server})
regression.test_server_dict["database"].append({"server_id": server_id,
"db_id": db_id,
"db_name": test_db_name})
def delete_parent_id_file():
"""
This function deletes the file parent_id.pkl which contains server and
database details
def delete_test_server(server):
test_server_dict = regression.test_server_dict
if test_server_dict:
connection = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
db_name = test_server_dict["database"][0]["db_name"]
drop_database(connection, db_name)
# Delete the server
server_id = test_server_dict['server'][0]["server_id"]
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
servers = cur.execute('SELECT * FROM server WHERE id=%s' % server_id)
servers_count = len(servers.fetchall())
if servers_count:
cur.execute('DELETE FROM server WHERE id=%s' % server_id)
conn.commit()
conn.close()
server_dict = regression.test_server_dict["server"]
:return: None
"""
# Pop the server from dict if it's deleted
server_dict = [server_dict.pop(server_dict.index(item))
for item in server_dict
if str(server_id) == str(item["server_id"])]
if os.path.isfile(pickle_path):
os.remove(pickle_path)
# Pop the db from dict if it's deleted
db_dict = regression.test_server_dict["database"]
db_dict = [db_dict.pop(db_dict.index(item)) for item in db_dict
if server_id == item["server_id"]]
def drop_objects():
"""This function use to cleanup the created the objects(servers, databases,
schemas etc) during the test suite run"""
# Cleanup in node_info_dict
servers_info = regression.node_info_dict['sid']
if servers_info:
for server in servers_info:
server_id = server.keys()[0]
server = server.values()[0]
if regression.node_info_dict['did']:
db_conn = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
db_dict = regression.node_info_dict['did'][0]
if int(server_id) in db_dict:
db_name = db_dict[int(server_id)]["db_name"]
drop_database(db_conn, db_name)
delete_server(server_id)
# Cleanup in test_server_dict
servers = regression.test_server_dict["server"]
if servers:
for server in servers:
server_id = server["server_id"]
server = server["server"]
if regression.test_server_dict["database"]:
db_info = regression.test_server_dict["database"]
db_dict = [item for item in db_info
if server_id == item["server_id"]]
if db_dict:
for db in db_dict:
db_name = db["db_name"]
db_conn = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
drop_database(db_conn, db_name)
delete_server(server_id)
# Remove the test SQLite database
if os.path.isfile(config.SQLITE_PATH):
os.remove(config.SQLITE_PATH)

25
web/setup.py Normal file → Executable file
View File

@ -29,6 +29,7 @@ import config
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
from pgadmin.model import SCHEMA_VERSION
config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION
# If script is running under python2 then change the behaviour of functions
@ -50,10 +51,19 @@ def do_setup(app):
else:
print("NOTE: Configuring authentication for SERVER mode.\n")
if all(value in os.environ for value in
['PGADMIN_SETUP_EMAIL', 'PGADMIN_SETUP_PASSWORD']):
email = ''
p1 = ''
if os.environ['PGADMIN_SETUP_EMAIL'] and os.environ[
'PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
p1 = os.environ['PGADMIN_SETUP_PASSWORD']
else:
# Prompt the user for their default username and password.
print("""
Enter the email address and password to use for the initial pgAdmin user \
account:\n""")
Enter the email address and password to use for the initial pgAdmin user \
account:\n""")
email_filter = re.compile(
"^[a-zA-Z0-9.!#$%&'*+\/=?^_`{|}~-]+@[a-zA-Z0-9]"
"(?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9]"
@ -72,7 +82,8 @@ account:\n""")
if p1 != p2:
print('Passwords do not match. Please try again.')
else:
print('Password must be at least 6 characters. Please try again.')
print(
'Password must be at least 6 characters. Please try again.')
p1, p2 = pprompt()
# Setup Flask-Security
@ -91,7 +102,6 @@ account:\n""")
name='User',
description='pgAdmin User Role'
)
user_datastore.create_user(email=email, password=password)
db.session.flush()
user_datastore.add_role_to_user(email, 'Administrator')
@ -338,6 +348,10 @@ ALTER TABLE SERVER
if __name__ == '__main__':
app = Flask(__name__)
app.config.from_object(config)
if config.TESTING_MODE:
config.SQLITE_PATH = config.TEST_SQLITE_PATH
app.config['SQLALCHEMY_DATABASE_URI'] = \
'sqlite:///' + config.SQLITE_PATH.replace('\\', '/')
db.init_app(app)
@ -346,9 +360,10 @@ if __name__ == '__main__':
print("======================================\n")
local_config = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
os.path.dirname(os.path.dirname(__file__)),
'config_local.py'
)
if not os.path.isfile(local_config):
print("""
The configuration file - {0} does not exist.