Add terst cases for packages, and update Synonym cases for recent API changes.

pull/3/head
Priyanka Shendge 2016-10-14 10:59:36 -07:00 committed by Dave Page
parent f089744d6d
commit 22dadacb0f
11 changed files with 510 additions and 33 deletions

View File

@ -0,0 +1,16 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class PackageTestGenerator(BaseTestGenerator):
def runTest(self):
return

View File

@ -0,0 +1,91 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
class PackageAddTestCase(BaseTestGenerator):
""" This class will add new package under test schema. """
scenarios = [
# Fetching default URL for package node.
('Fetch Package Node URL', dict(
url='/browser/package/obj/'))
]
def setUp(self):
schema_info = parent_node_dict["schema"][-1]
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)
def runTest(self):
""" This function will add package under test schema. """
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
data = \
{
"name": "pkg_%s" % str(uuid.uuid4())[1:4],
"owner": self.server["username"],
"pkgacl": [],
"pkgbodysrc": "PROCEDURE p1() is \n"
"begin \n"
"dbms_output.put_line('Test_pkg.Proc...'); "
"\nEND\t;",
"pkgheadsrc": "PROCEDURE p1();",
"schema": self.schema_id
}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
"""This function disconnect the test database."""
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -0,0 +1,94 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as package_utils
class PackageDeleteTestCase(BaseTestGenerator):
""" This class will delete new package under test schema. """
scenarios = [
# Fetching default URL for package node.
('Fetch Package Node URL', dict(
url='/browser/package/obj/'))
]
def setUp(self):
schema_info = parent_node_dict["schema"][-1]
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.pkg_name= "pkg_%s" % str(uuid.uuid4())[1:4]
self.proc_name = "proc_%s" % str(uuid.uuid4())[1:4]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)
self.package_id = package_utils.create_package(self.server,
self.db_name,
self.schema_name,
self.pkg_name,
self.proc_name)
def runTest(self):
""" This function will delete package under test schema. """
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
package_response = package_utils.verify_package(self.server,
self.db_name,
self.schema_name)
if not package_response:
raise Exception("Could not find the package.")
delete_response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.package_id),
follow_redirects=True)
self.assertEquals(delete_response.status_code, 200)
def tearDown(self):
"""This function disconnect the test database."""
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -0,0 +1,85 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as package_utils
class PackageGetTestCase(BaseTestGenerator):
""" This class will fetch new package under test schema. """
scenarios = [
# Fetching default URL for package node.
('Fetch Package Node URL', dict(
url='/browser/package/obj/'))
]
def setUp(self):
schema_info = parent_node_dict["schema"][-1]
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.pkg_name= "pkg_%s" % str(uuid.uuid4())[1:4]
self.proc_name = "proc_%s" % str(uuid.uuid4())[1:4]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)
self.package_id = package_utils.create_package(self.server,
self.db_name,
self.schema_name,
self.pkg_name,
self.proc_name)
def runTest(self):
""" This function will fetch package under test schema. """
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
response = self.tester.get(self.url
+ str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/' +
str(self.schema_id) + '/' + str(self.package_id),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
"""This function disconnect the test database."""
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -0,0 +1,102 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as package_utils
class PackagePutTestCase(BaseTestGenerator):
""" This class will update new package under test schema. """
scenarios = [
# Fetching default URL for package node.
('Fetch Package Node URL', dict(
url='/browser/package/obj/'))
]
def setUp(self):
schema_info = parent_node_dict["schema"][-1]
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.pkg_name= "pkg_%s" % str(uuid.uuid4())[1:4]
self.proc_name = "proc_%s" % str(uuid.uuid4())[1:4]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)
self.package_id = package_utils.create_package(self.server,
self.db_name,
self.schema_name,
self.pkg_name,
self.proc_name)
def runTest(self):
""" This function will update package under test schema. """
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
package_response = package_utils.verify_package(self.server,
self.db_name,
self.schema_name)
if not package_response:
raise Exception("Could not find the package.")
data = {
"description": "This is FTS template update comment",
"id": self.package_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.package_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
def tearDown(self):
"""This function disconnect the test database."""
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -0,0 +1,81 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_package(server, db_name, schema_name, pkg_name, proc_name):
"""
This function create the package on given test schema.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param package_name: package_name
:type package_name: str
:return package_id: synonym_id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
query = "CREATE OR REPLACE PACKAGE %s.%s IS PROCEDURE %s(); END %s; " \
"CREATE OR REPLACE PACKAGE BODY %s.%s IS PROCEDURE %s() IS BEGIN " \
"dbms_output.put_line('Test_pkg.Proc...'); END; END %s;" % \
(schema_name, pkg_name, proc_name, pkg_name, schema_name,
pkg_name, proc_name, pkg_name)
pg_cursor.execute(query)
connection.commit()
# Get 'oid' from newly created package
pg_cursor.execute("SELECT oid FROM pg_namespace WHERE nspname='%s'" %
pkg_name)
package_id = pg_cursor.fetchone()[0]
connection.close()
return package_id
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_package(server, db_name, pkg_name):
"""
This function verify the added package on test schema.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param package_name: package name
:type package_name: str
:return package: package record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT oid FROM pg_namespace WHERE nspname='%s'" %
pkg_name)
package = pg_cursor.fetchone()
connection.close()
return package
except Exception:
traceback.print_exc(file=sys.stderr)

View File

@ -22,7 +22,8 @@ from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
class SynonymAddTestCase(BaseTestGenerator):
"""This class will add new synonym under schema node."""
"""This class will add new synonym under test schema."""
scenarios = [
# Fetching default URL for synonym node.
('Default Node URL', dict(url='/browser/synonym/obj/'))
@ -56,8 +57,10 @@ class SynonymAddTestCase(BaseTestGenerator):
self.server, self.db_name, self.schema_name, self.sequence_name)
def runTest(self):
"""This function will add synonym under schema node."""
"""This function will add synonym under test schema."""
db_user = self.server["username"]
data = {
"owner": db_user,
"schema": self.schema_name,
@ -66,6 +69,7 @@ class SynonymAddTestCase(BaseTestGenerator):
"targettype": "Sequence",
"name": "synonym_add_%s" % (str(uuid.uuid4())[1:6])
}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id)
+ '/' + str(self.db_id) + '/' + str(self.schema_id) + '/',
@ -74,4 +78,5 @@ class SynonymAddTestCase(BaseTestGenerator):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -23,6 +23,7 @@ from . import utils as synonym_utils
class SynonymDeleteTestCase(BaseTestGenerator):
"""This class will delete added synonym under schema node."""
scenarios = [
# Fetching default URL for synonym node.
('Fetch synonym Node URL', dict(url='/browser/synonym/obj/'))
@ -55,26 +56,29 @@ class SynonymDeleteTestCase(BaseTestGenerator):
self.sequence_id = sequence_utils.create_sequences(
self.server, self.db_name, self.schema_name, self.sequence_name)
self.synonym_name = "test_synonym_delete_%s" % str(uuid.uuid4())[1:6]
self.synonym_id = synonym_utils.create_synonym(self.server,
self.db_name,
self.schema_name,
self.synonym_name,
self.sequence_name)
synonym_utils.create_synonym(self.server,
self.db_name,
self.schema_name,
self.synonym_name,
self.sequence_name)
def runTest(self):
"""This function will delete synonym under schema node."""
synonym_response = synonym_utils.verify_synonym(self.server,
self.db_name,
self.synonym_name)
if not synonym_response:
raise Exception("No synonym node to delete.")
response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/' +
str(self.schema_id) + '/' + str(self.synonym_id),
str(self.schema_id) + '/' + str(self.synonym_name),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -14,7 +14,7 @@ from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.sequences.tests\
from pgadmin.browser.server_groups.servers.databases.schemas.sequences.tests \
import utils as sequence_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -23,6 +23,7 @@ from . import utils as synonym_utils
class SynonymGetTestCase(BaseTestGenerator):
"""This class will fetch new synonym under schema node."""
scenarios = [
# Fetching default URL for synonym node.
('Fetch synonym Node URL', dict(url='/browser/synonym/obj/'))
@ -50,26 +51,28 @@ class SynonymGetTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add the synonym.")
self.sequence_name = "test_sequence_synonym_%s" %\
self.sequence_name = "test_sequence_synonym_%s" % \
str(uuid.uuid4())[1:6]
self.sequence_id = sequence_utils.create_sequences(
self.server, self.db_name, self.schema_name, self.sequence_name)
self.synonym_name = "test_synonym_get_%s" % str(uuid.uuid4())[1:6]
self.synonym_id = synonym_utils.create_synonym(self.server,
self.db_name,
self.schema_name,
self.synonym_name,
self.sequence_name)
synonym_utils.create_synonym(self.server,
self.db_name,
self.schema_name,
self.synonym_name,
self.sequence_name)
def runTest(self):
"""This function will fetch synonym under schema node."""
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/' +
str(self.schema_id) + '/' + str(self.synonym_id),
str(self.schema_id) + '/' + str(self.synonym_name),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
""" Disconnect the database. """
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -25,13 +25,15 @@ from . import utils as synonym_utils
class SynonymPutTestCase(BaseTestGenerator):
"""This class will update added synonym under schema node."""
"""This class will update added synonym under test schema."""
scenarios = [
# Fetching default URL for synonym node.
('Fetch synonym Node URL', dict(url='/browser/synonym/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -58,14 +60,15 @@ class SynonymPutTestCase(BaseTestGenerator):
self.sequence_id = sequence_utils.create_sequences(
self.server, self.db_name, self.schema_name, self.sequence_name)
self.synonym_name = "test_synonym_put_%s" % str(uuid.uuid4())[1:6]
self.synonym_id = synonym_utils.create_synonym(self.server,
self.db_name,
self.schema_name,
self.synonym_name,
self.sequence_name)
synonym_utils.create_synonym(self.server,
self.db_name,
self.schema_name,
self.synonym_name,
self.sequence_name)
def runTest(self):
"""This function will update synonym under schema node."""
synonym_response = synonym_utils.verify_synonym(self.server,
self.db_name,
self.synonym_name)
@ -92,5 +95,6 @@ class SynonymPutTestCase(BaseTestGenerator):
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
""" Disconnect the database. """
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -13,7 +13,7 @@ import sys
from regression import test_utils as utils
def create_synonym(server, db_name, schema_name, synonym_name, sequence_name):
def create_synonym(server, db_name, schema_name, synonym_name, sequence_name):
"""
This function create the synonym on given schema node.
:param server: server details
@ -26,8 +26,6 @@ def create_synonym(server, db_name, schema_name, synonym_name, sequence_name):
:type synonym_name: str
:param sequence_name: sequence name
:type sequence_name: str
:return synonym_id: synonym_id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
@ -40,12 +38,6 @@ def create_synonym(server, db_name, schema_name, synonym_name, sequence_name):
schema_name, synonym_name, schema_name, sequence_name)
pg_cursor.execute(query)
connection.commit()
# Get 'oid' from newly created synonym
pg_cursor.execute("SELECT oid FROM pg_synonym WHERE synname='%s'" %
synonym_name)
synonym_id = pg_cursor.fetchone()
connection.close()
return synonym_id
except Exception:
traceback.print_exc(file=sys.stderr)