Add regression tests for FTS nodes.
parent
36c185e9dc
commit
8b91babdc2
|
@ -0,0 +1,16 @@
|
|||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
|
||||
|
||||
class FTSConfigurationTestGenerator(BaseTestGenerator):
|
||||
|
||||
def runTest(self):
|
||||
return []
|
|
@ -0,0 +1,89 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas. \
|
||||
fts_parser.tests import utils as fts_parser_utils
|
||||
|
||||
|
||||
class FTSConfiguraionAddTestCase(BaseTestGenerator):
|
||||
""" This class will add new FTS configuration under test schema. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for fts_configuration node.
|
||||
('Fetch fts_configuration Node URL',
|
||||
dict(url='/browser/fts_configuration/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
""" This function will create parser."""
|
||||
|
||||
schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = schema_data['schema_name']
|
||||
self.schema_id = schema_data['schema_id']
|
||||
self.server_id = schema_data['server_id']
|
||||
self.db_id = schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_parser_name = "fts_parser_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_parser_id = fts_parser_utils.create_fts_parser(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_parser_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add new FTS configuration under test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
data = \
|
||||
{
|
||||
"name": "fts_conf_%s" % str(uuid.uuid4())[1:4],
|
||||
"owner": self.server["username"],
|
||||
"prsname": "%s.%s" % (self.schema_name, self.fts_parser_name),
|
||||
"schema": self.schema_id,
|
||||
"tokens": []
|
||||
}
|
||||
|
||||
response = self.tester.post(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(data),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,84 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_configuration_utils
|
||||
|
||||
|
||||
class FTSConfDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will delete added FTS configuration under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for fts_configuration node.
|
||||
('Fetch FTS configuration Node URL',
|
||||
dict(url='/browser/fts_configuration/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
""" This function will create FTS configuration."""
|
||||
|
||||
schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = schema_data['schema_name']
|
||||
self.schema_id = schema_data['schema_id']
|
||||
self.server_id = schema_data['server_id']
|
||||
self.db_id = schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_conf_name = "fts_conf_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_conf_id = fts_configuration_utils.create_fts_configuration(
|
||||
self.server, self.db_name, self.schema_name, self.fts_conf_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will delete new FTS configuration under test
|
||||
schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
fts_conf_response = fts_configuration_utils.verify_fts_configuration(
|
||||
self.server, self.db_name, self.fts_conf_name
|
||||
)
|
||||
|
||||
if not fts_conf_response:
|
||||
raise Exception("Could not find the FTS Configuration.")
|
||||
|
||||
delete_response = self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_conf_id),
|
||||
follow_redirects=True)
|
||||
self.assertEquals(delete_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,77 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_configuration_utils
|
||||
|
||||
|
||||
class FTSConfGetTestCase(BaseTestGenerator):
|
||||
""" This class will fetch added fts_configuration under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for fts_configuration node.
|
||||
('Fetch FTS configuration Node URL',
|
||||
dict(url='/browser/fts_configuration/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
""" This function will create FTS configuration."""
|
||||
|
||||
schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = schema_data['schema_name']
|
||||
self.schema_id = schema_data['schema_id']
|
||||
self.server_id = schema_data['server_id']
|
||||
self.db_id = schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_conf_name = "fts_conf_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_conf_id = fts_configuration_utils.create_fts_configuration(
|
||||
self.server, self.db_name, self.schema_name, self.fts_conf_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will fetch new FTS configuration under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
get_response = self.tester.get(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_conf_id),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(get_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,92 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_configuration_utils
|
||||
|
||||
|
||||
class FTSConfPutTestCase(BaseTestGenerator):
|
||||
""" This class will update added FTS configuration under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for fts_configuration node.
|
||||
('Fetch FTS configuration Node URL',
|
||||
dict(url='/browser/fts_configuration/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
""" This function will create FTS configuration."""
|
||||
|
||||
schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = schema_data['schema_name']
|
||||
self.schema_id = schema_data['schema_id']
|
||||
self.server_id = schema_data['server_id']
|
||||
self.db_id = schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_conf_name = "fts_conf_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_conf_id = fts_configuration_utils.create_fts_configuration(
|
||||
self.server, self.db_name, self.schema_name, self.fts_conf_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update new FTS configuration under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
fts_conf_response = fts_configuration_utils.verify_fts_configuration(
|
||||
self.server, self.db_name, self.fts_conf_name
|
||||
)
|
||||
|
||||
if not fts_conf_response:
|
||||
raise Exception("Could not find the FTS Configuration.")
|
||||
|
||||
data = \
|
||||
{
|
||||
"description": "This is FTS configuration update comment",
|
||||
"id": self.fts_conf_id
|
||||
}
|
||||
|
||||
put_response = self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_conf_id),
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True)
|
||||
self.assertEquals(put_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,80 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
from __future__ import print_function
|
||||
import traceback
|
||||
import os
|
||||
import sys
|
||||
from regression.test_utils import get_db_connection
|
||||
|
||||
file_name = os.path.basename(__file__)
|
||||
|
||||
|
||||
def create_fts_configuration(server, db_name, schema_name, fts_conf_name):
|
||||
"""This function will add the fts_configuration under test schema using
|
||||
default parser. """
|
||||
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
query = "CREATE TEXT SEARCH CONFIGURATION " + schema_name + "." + fts_conf_name + \
|
||||
"(PARSER=default)"
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
|
||||
# Get 'oid' from newly created configuration
|
||||
pg_cursor.execute("select oid from pg_catalog.pg_ts_config where "
|
||||
"cfgname = '%s' order by oid ASC limit 1"
|
||||
% fts_conf_name)
|
||||
|
||||
oid = pg_cursor.fetchone()
|
||||
fts_conf_id = ''
|
||||
if oid:
|
||||
fts_conf_id = oid[0]
|
||||
connection.close()
|
||||
return fts_conf_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def verify_fts_configuration(server, db_name, fts_conf_name):
|
||||
"""
|
||||
This function will verify current FTS configuration.
|
||||
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param fts_conf_name: FTS configuration name to be added
|
||||
:type fts_conf_name: str
|
||||
:return fts_conf: FTS configuration detail
|
||||
:rtype: tuple
|
||||
"""
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
pg_cursor.execute(
|
||||
"select oid from pg_catalog.pg_ts_config where "
|
||||
"cfgname = '%s' order by oid ASC limit 1"
|
||||
% fts_conf_name)
|
||||
fts_conf = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return fts_conf
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
|
@ -0,0 +1,16 @@
|
|||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
|
||||
|
||||
class ForeignTableTestGenerator(BaseTestGenerator):
|
||||
|
||||
def runTest(self):
|
||||
return []
|
|
@ -0,0 +1,82 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
|
||||
|
||||
class FtsDictionaryAddTestCase(BaseTestGenerator):
|
||||
""" This class will add new FTS dictionary under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS dictionary node.
|
||||
('Fetch FTS dictionary Node URL', dict(
|
||||
url='/browser/fts_dictionary/obj/'))
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
data = \
|
||||
{
|
||||
"name": "fts_dict_%s" % str(uuid.uuid4())[1:4],
|
||||
"options": [
|
||||
{
|
||||
"value": "synonym_sample",
|
||||
"option": "synonyms"
|
||||
}
|
||||
],
|
||||
"owner": self.server["username"],
|
||||
"schema": self.schema_id,
|
||||
"template": "synonym"
|
||||
}
|
||||
|
||||
response = self.tester.post(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(data),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,87 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_dict_utils
|
||||
|
||||
|
||||
class FtsDictionaryDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will delete added FTS Dictionary under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS dictionary node.
|
||||
('Fetch FTS dictionary Node URL', dict(
|
||||
url='/browser/fts_dictionary/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_dict_name = "fts_dict_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_dict_id = fts_dict_utils.create_fts_dictionary(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_dict_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update FTS dictionary present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
dict_response = fts_dict_utils.verify_fts_dict(self.server,
|
||||
self.db_name,
|
||||
self.fts_dict_name)
|
||||
|
||||
if not dict_response:
|
||||
raise Exception("Could not find the FTS dictionary.")
|
||||
|
||||
delete_response = self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_dict_id),
|
||||
follow_redirects=True)
|
||||
|
||||
self.assertEquals(delete_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,77 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_dict_utils
|
||||
|
||||
|
||||
class FtsDictionaryGetTestCase(BaseTestGenerator):
|
||||
""" This class will fetch new FTS dictionary under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS dictionary node.
|
||||
('Fetch FTS dictionary Node URL', dict(
|
||||
url='/browser/fts_dictionary/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_dict_name = "fts_dict_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_dict_id = fts_dict_utils.create_fts_dictionary(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_dict_name)
|
||||
|
||||
def runTest(self):
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
response = self.tester.get(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_dict_id),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,95 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_dict_utils
|
||||
|
||||
|
||||
class FtsDictionaryPutTestCase(BaseTestGenerator):
|
||||
""" This class will update added FTS dictionary under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS dictionary node.
|
||||
('Fetch FTS dictionary Node URL', dict(
|
||||
url='/browser/fts_dictionary/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_dict_name = "fts_dict_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_dict_id = fts_dict_utils.create_fts_dictionary(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_dict_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update FTS dictionary present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
dict_response = fts_dict_utils.verify_fts_dict(self.server,
|
||||
self.db_name,
|
||||
self.fts_dict_name)
|
||||
|
||||
if not dict_response:
|
||||
raise Exception("Could not find the FTS dictionary.")
|
||||
|
||||
data = \
|
||||
{
|
||||
"description": "This is FTS dictionary update comment",
|
||||
"id": self.fts_dict_id
|
||||
|
||||
}
|
||||
put_response = self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_dict_id),
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True)
|
||||
|
||||
self.assertEquals(put_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,80 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import traceback
|
||||
import os
|
||||
import sys
|
||||
from regression.test_utils import get_db_connection
|
||||
|
||||
file_name = os.path.basename(__file__)
|
||||
|
||||
|
||||
def create_fts_dictionary(server, db_name, schema_name, fts_dict_name):
|
||||
"""This function will add the fts_dictionary under test schema. """
|
||||
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
query = "CREATE TEXT SEARCH DICTIONARY %s.%s (TEMPLATE = simple)" % (
|
||||
schema_name, fts_dict_name)
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
|
||||
# Get 'oid' from newly created dictionary
|
||||
pg_cursor.execute("select oid from pg_catalog.pg_ts_dict where "
|
||||
"dictname = '%s' order by oid ASC limit 1"
|
||||
% fts_dict_name)
|
||||
|
||||
oid = pg_cursor.fetchone()
|
||||
fts_dict_id = ''
|
||||
if oid:
|
||||
fts_dict_id = oid[0]
|
||||
connection.close()
|
||||
return fts_dict_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def verify_fts_dict(server, db_name, fts_dict_name):
|
||||
"""
|
||||
This function will verify current FTS dictionary.
|
||||
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param fts_dict_name: FTS dictionary name to be added
|
||||
:type fts_dict_name: str
|
||||
:return fts_dict: FTS dictionary detail
|
||||
:rtype: tuple
|
||||
"""
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
pg_cursor.execute(
|
||||
"select oid from pg_catalog.pg_ts_dict where "
|
||||
"dictname = '%s' order by oid ASC limit 1"
|
||||
% fts_dict_name)
|
||||
fts_dict = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return fts_dict
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
|
@ -0,0 +1,16 @@
|
|||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
|
||||
|
||||
class ForeignTableTestGenerator(BaseTestGenerator):
|
||||
|
||||
def runTest(self):
|
||||
return []
|
|
@ -0,0 +1,78 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
|
||||
|
||||
class FtsParserAddTestCase(BaseTestGenerator):
|
||||
""" This class will add new FTS parser under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS parser node.
|
||||
('Fetch FTS parser Node URL', dict(url='/browser/fts_parser/obj/'))
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.data = \
|
||||
{
|
||||
"name": "fts_parser_%s" % str(uuid.uuid4())[1:4],
|
||||
"schema": self.schema_id,
|
||||
"prsend": "btfloat4sortsupport",
|
||||
"prsheadline": "prsd_headline",
|
||||
"prslextype": "dsynonym_init",
|
||||
"prsstart": "int4_accum",
|
||||
"prstoken": "gist_box_penalty"
|
||||
}
|
||||
|
||||
response = self.tester.post(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(self.data),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,87 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_parser_utils
|
||||
|
||||
|
||||
class FtsParserDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will delete added FTS Parser under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS parser node.
|
||||
('Fetch FTS parser Node URL', dict(url='/browser/fts_parser/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_parser_name = "fts_parser_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_parser_id = fts_parser_utils.create_fts_parser(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_parser_name)
|
||||
|
||||
def runTest(self):
|
||||
|
||||
""" This function will delete FTS parser present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
parser_response = fts_parser_utils.verify_fts_parser(self.server,
|
||||
self.db_name,
|
||||
self.fts_parser_name)
|
||||
|
||||
if not parser_response:
|
||||
raise Exception("Could not find the FTS parser.")
|
||||
|
||||
delete_response = self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_parser_id),
|
||||
follow_redirects=True)
|
||||
|
||||
self.assertEquals(delete_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,76 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_parser_utils
|
||||
|
||||
|
||||
class FtsParserGetTestCase(BaseTestGenerator):
|
||||
""" This class will add new foreign table under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS parser node.
|
||||
('Fetch FTS parser Node URL', dict(url='/browser/fts_parser/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_parser_name = "fts_parser_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_parser_id = fts_parser_utils.create_fts_parser(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_parser_name)
|
||||
|
||||
def runTest(self):
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
response = self.tester.get(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_parser_id),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,96 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_parser_utils
|
||||
|
||||
|
||||
class FtsParserPutTestCase(BaseTestGenerator):
|
||||
""" This class will update added FTS Parser under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS parser node.
|
||||
('Fetch FTS parser Node URL', dict(url='/browser/fts_parser/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_parser_name = "fts_parser_%s" % str(uuid.uuid4())[1:4]
|
||||
|
||||
self.fts_parser_id = fts_parser_utils.create_fts_parser(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_parser_name)
|
||||
|
||||
def runTest(self):
|
||||
|
||||
""" This function will update FTS parser present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
parser_response = fts_parser_utils.verify_fts_parser(self.server,
|
||||
self.db_name,
|
||||
self.fts_parser_name)
|
||||
|
||||
if not parser_response:
|
||||
raise Exception("Could not find the FTS parser.")
|
||||
|
||||
data = \
|
||||
{
|
||||
"description": "This is FTS parser update comment",
|
||||
"id": self.fts_parser_id
|
||||
|
||||
}
|
||||
|
||||
put_response = self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_parser_id),
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True)
|
||||
|
||||
self.assertEquals(put_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,81 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import traceback
|
||||
import os
|
||||
import sys
|
||||
from regression.test_utils import get_db_connection
|
||||
|
||||
file_name = os.path.basename(__file__)
|
||||
|
||||
|
||||
def create_fts_parser(server, db_name, schema_name, fts_parser_name):
|
||||
"""This function will add the fts_parser under test schema. """
|
||||
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
query = "CREATE TEXT SEARCH PARSER " + schema_name + "." + fts_parser_name + \
|
||||
"(START=int4_accum, GETTOKEN=gist_box_penalty, " \
|
||||
"END=btfloat4sortsupport, LEXTYPES=dsynonym_init)"
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
|
||||
# Get 'oid' from newly created parser
|
||||
pg_cursor.execute("select oid from pg_catalog.pg_ts_parser where "
|
||||
"prsname = '%s' order by oid ASC limit 1"
|
||||
% fts_parser_name)
|
||||
|
||||
oid = pg_cursor.fetchone()
|
||||
fts_parser_id = ''
|
||||
if oid:
|
||||
fts_parser_id = oid[0]
|
||||
connection.close()
|
||||
return fts_parser_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def verify_fts_parser(server, db_name, fts_parser_name):
|
||||
"""
|
||||
This function will verify current FTS parser.
|
||||
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param fts_parser_name: FTS parser name to be added
|
||||
:type fts_parser_name: str
|
||||
:return fts_temp: FTS parser detail
|
||||
:rtype: tuple
|
||||
"""
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
pg_cursor.execute(
|
||||
"select oid from pg_catalog.pg_ts_parser where "
|
||||
"prsname = '%s' order by oid ASC limit 1"
|
||||
% fts_parser_name)
|
||||
fts_parser = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return fts_parser
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
|
@ -0,0 +1,16 @@
|
|||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
|
||||
|
||||
class ForeignTableTestGenerator(BaseTestGenerator):
|
||||
|
||||
def runTest(self):
|
||||
return []
|
|
@ -0,0 +1,77 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
|
||||
|
||||
class FtsTemplateAddTestCase(BaseTestGenerator):
|
||||
""" This class will add new FTS template under test schema. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS template node.
|
||||
('Fetch FTS templates Node URL', dict(url='/browser/fts_template/obj/'))
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add FTS template present under
|
||||
test schema. """
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.data = \
|
||||
{
|
||||
"name": "fts_temp_%s" % str(uuid.uuid4())[1:4],
|
||||
"schema": self.schema_id,
|
||||
"tmplinit": "dispell_init",
|
||||
"tmpllexize": "dispell_lexize"
|
||||
}
|
||||
|
||||
response = self.tester.post(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(self.data),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,84 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_temp_utils
|
||||
|
||||
|
||||
class FtsTemplateDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will delete new FTS template under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS template node.
|
||||
('Fetch FTS template Node URL', dict(url='/browser/fts_template/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_temp_name = "fts_temp_%s" % str(uuid.uuid4())[1:4]
|
||||
self.fts_temp_id = fts_temp_utils.create_fts_template(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_temp_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will delete FTS template present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
fts_response = fts_temp_utils.verify_fts_template(self.server,
|
||||
self.db_name,
|
||||
self.fts_temp_name)
|
||||
|
||||
if not fts_response:
|
||||
raise Exception("Could not find the FTS template.")
|
||||
|
||||
delete_response = self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_temp_id),
|
||||
follow_redirects=True)
|
||||
|
||||
self.assertEquals(delete_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,76 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_temp_utils
|
||||
|
||||
|
||||
class FtsTemplateGetTestCase(BaseTestGenerator):
|
||||
""" This class will fetch new FTS template under test schema. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS template node.
|
||||
('Fetch FTS templates Node URL', dict(url='/browser/fts_template/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_temp_name = "fts_temp_%s" % str(uuid.uuid4())[1:4]
|
||||
self.fts_temp_id = fts_temp_utils.create_fts_template(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_temp_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will fetch FTS template present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
response = self.tester.get(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_temp_id),
|
||||
content_type='html/json')
|
||||
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,93 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression import test_utils as utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from . import utils as fts_temp_utils
|
||||
|
||||
|
||||
class FtsTemplatePutTestCase(BaseTestGenerator):
|
||||
""" This class will update new FTS template under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for FTS template node.
|
||||
('Fetch FTS template Node URL', dict(url='/browser/fts_template/obj/'))
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.schema_name = self.schema_data['schema_name']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.fts_temp_name = "fts_temp_%s" % str(uuid.uuid4())[1:4]
|
||||
self.fts_temp_id = fts_temp_utils.create_fts_template(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.fts_temp_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update FTS template present under
|
||||
test schema. """
|
||||
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
fts_response = fts_temp_utils.verify_fts_template(self.server,
|
||||
self.db_name,
|
||||
self.fts_temp_name)
|
||||
|
||||
if not fts_response:
|
||||
raise Exception("Could not find the FTS template.")
|
||||
|
||||
data = \
|
||||
{
|
||||
"description": "This is FTS template update comment",
|
||||
"id": self.fts_temp_id
|
||||
|
||||
}
|
||||
|
||||
put_response = self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.fts_temp_id),
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True)
|
||||
|
||||
self.assertEquals(put_response.status_code, 200)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function disconnect the test database."""
|
||||
|
||||
database_utils.disconnect_database(self, self.server_id,
|
||||
self.db_id)
|
|
@ -0,0 +1,77 @@
|
|||
# #################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
# ##################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
import traceback
|
||||
import os
|
||||
import sys
|
||||
from regression.test_utils import get_db_connection
|
||||
|
||||
file_name = os.path.basename(__file__)
|
||||
|
||||
|
||||
def create_fts_template(server, db_name, schema_name, fts_temp_name):
|
||||
"""This function will add the fts_template under test schema. """
|
||||
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
query = "CREATE TEXT SEARCH TEMPLATE " + schema_name + "." + fts_temp_name + \
|
||||
"(INIT=dispell_init, LEXIZE=dispell_lexize)"
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
|
||||
# Get 'oid' from newly created template
|
||||
pg_cursor.execute("select oid from pg_catalog.pg_ts_template where "
|
||||
"tmplname = '%s' order by oid ASC limit 1" % fts_temp_name)
|
||||
|
||||
oid = pg_cursor.fetchone()
|
||||
fts_temp_id = ''
|
||||
if oid:
|
||||
fts_temp_id = oid[0]
|
||||
connection.close()
|
||||
return fts_temp_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def verify_fts_template(server, db_name, fts_temp_name):
|
||||
"""
|
||||
This function will verify current FTS template.
|
||||
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param fts_temp_name: FTS template name to be added
|
||||
:type fts_temp_name: str
|
||||
:return fts_temp: FTS template detail
|
||||
:rtype: tuple
|
||||
"""
|
||||
try:
|
||||
connection = get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
pg_cursor.execute(
|
||||
"select oid from pg_catalog.pg_ts_template where "
|
||||
"tmplname = '%s' order by oid ASC limit 1" % fts_temp_name)
|
||||
fts_template = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return fts_template
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
Loading…
Reference in New Issue