210 lines
7.1 KiB
Python
210 lines
7.1 KiB
Python
#############################################################
|
|
#
|
|
# pgAdmin 4 - PostgreSQL Tools
|
|
#
|
|
# Copyright (C) 2013 - 2024, The pgAdmin Development Team
|
|
# This software is released under the PostgreSQL Licence
|
|
#
|
|
##############################################################
|
|
|
|
import sys
|
|
import traceback
|
|
from abc import ABCMeta, abstractmethod
|
|
from importlib import import_module
|
|
|
|
from werkzeug.utils import find_modules
|
|
from pgadmin.utils import server_utils
|
|
from pgadmin.utils.constants import PSYCOPG3
|
|
from .. import socketio
|
|
|
|
import unittest
|
|
import config
|
|
|
|
|
|
class TestsGeneratorRegistry(ABCMeta):
|
|
"""
|
|
class TestsGeneratorRegistry()
|
|
Every module will be registered automatically by its module name.
|
|
|
|
Class-level Methods:
|
|
----------- -------
|
|
* __init__(...)
|
|
- This is used to register test modules. You don't need to
|
|
call this function explicitly. This will be automatically executed,
|
|
whenever we create a class and inherit from BaseTestGenerator -
|
|
it will register it as an available module in TestsGeneratorRegistry.
|
|
By setting the __metaclass__ for BaseTestGenerator to
|
|
TestsGeneratorRegistry it will create new instance of this
|
|
TestsGeneratorRegistry per class.
|
|
|
|
* load_generators():
|
|
- This function will load all the modules from __init__()
|
|
present in registry.
|
|
"""
|
|
|
|
registry = dict()
|
|
|
|
def __init__(self, name, bases, d):
|
|
|
|
# Register this type of module, based on the module name
|
|
# Avoid registering the BaseDriver itself
|
|
|
|
if name != 'BaseTestGenerator' and name != 'BaseFeatureTest':
|
|
# Store/append test classes in 'registry' if test modules has
|
|
# multiple classes
|
|
if d['__module__'] in TestsGeneratorRegistry.registry:
|
|
TestsGeneratorRegistry.registry[d['__module__']].append(self)
|
|
else:
|
|
TestsGeneratorRegistry.registry[d['__module__']] = [self]
|
|
|
|
ABCMeta.__init__(self, name, bases, d)
|
|
|
|
@classmethod
|
|
def load_generators(cls, pkg_args, pkg_root, exclude_pkgs, for_modules=[],
|
|
is_resql_only=False):
|
|
|
|
cls.registry = dict()
|
|
|
|
all_modules = []
|
|
|
|
try:
|
|
for module_name in find_modules(pkg_root, False, True):
|
|
all_modules.append(module_name)
|
|
except Exception:
|
|
pass
|
|
|
|
if 'resql' not in exclude_pkgs:
|
|
# Append reverse engineered test case module
|
|
all_modules.append('regression.re_sql.tests.test_resql')
|
|
|
|
if (pkg_args is None or pkg_args == "all") and \
|
|
'feature_tests' not in exclude_pkgs:
|
|
# Append feature tests module
|
|
all_modules += find_modules(
|
|
'regression.feature_tests', False, True)
|
|
|
|
# If specific modules are to be tested, exclude others
|
|
# for modules are handled differently for resql
|
|
if not is_resql_only and len(for_modules) > 0:
|
|
all_modules = [module_name
|
|
for module_name in all_modules
|
|
for fmod in for_modules
|
|
if module_name.endswith(fmod)]
|
|
|
|
# Set the module list and exclude packages in the BaseTestGenerator
|
|
# for Reverse Engineer SQL test cases.
|
|
BaseTestGenerator.setReSQLModuleList(all_modules)
|
|
BaseTestGenerator.setExcludePkgs(exclude_pkgs)
|
|
|
|
# Check if only reverse engineered sql test cases to run
|
|
# if yes then import only that module
|
|
if is_resql_only:
|
|
BaseTestGenerator.setForModules(for_modules)
|
|
# In case of RESQL only clear the registry of modules, as
|
|
# RESQL test cases should be run.
|
|
cls.registry = dict()
|
|
try:
|
|
import_module('regression.re_sql.tests.test_resql')
|
|
except ImportError:
|
|
traceback.print_exc(file=sys.stderr)
|
|
else:
|
|
# Check for SERVER mode
|
|
TestsGeneratorRegistry._exclude_packages(all_modules,
|
|
exclude_pkgs)
|
|
|
|
@staticmethod
|
|
def _exclude_packages(all_modules, exclude_pkgs):
|
|
"""
|
|
This function check for server mode test cases.
|
|
:param all_modules: all modules.
|
|
:param exclude_pkgs: exclude package list.
|
|
"""
|
|
for module_name in all_modules:
|
|
try:
|
|
if "tests." in str(module_name) and not any(
|
|
str(module_name).startswith(
|
|
'pgadmin.' + str(exclude_pkg)
|
|
) for exclude_pkg in exclude_pkgs
|
|
):
|
|
import_module(module_name)
|
|
except ImportError:
|
|
traceback.print_exc(file=sys.stderr)
|
|
|
|
|
|
class BaseTestGenerator(unittest.TestCase, metaclass=TestsGeneratorRegistry):
|
|
# Defining abstract method which will override by individual testcase.
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.server_id = self.server_information["server_id"]
|
|
server_con = server_utils.connect_server(self, self.server_id)
|
|
if hasattr(self, 'skip_on_database') and \
|
|
'data' in server_con and 'type' in server_con['data'] and \
|
|
server_con['data']['type'] in self.skip_on_database:
|
|
self.skipTest('cannot run in: %s' % server_con['data']['type'])
|
|
if hasattr(self, 'mock_data') and 'function_name' in self.mock_data:
|
|
self.mock_data['function_name'] =\
|
|
self.mock_data['function_name'].replace(
|
|
PSYCOPG3, config.PG_DEFAULT_DRIVER)
|
|
|
|
def setTestServer(self, server):
|
|
self.server = server
|
|
|
|
@abstractmethod
|
|
def runTest(self):
|
|
pass
|
|
|
|
# Initializing app.
|
|
def setApp(self, app):
|
|
self.app = app
|
|
|
|
# Initializing test_client.
|
|
@classmethod
|
|
def setTestClient(cls, test_client):
|
|
cls.tester = test_client
|
|
|
|
def setDriver(self, driver):
|
|
self.driver = driver
|
|
|
|
def setParallelUI_tests(self, parallel_ui_tests):
|
|
self.parallel_ui_tests = parallel_ui_tests
|
|
|
|
def setServerInformation(self, server_information):
|
|
self.server_information = server_information
|
|
|
|
def setTestDatabaseName(self, database_name):
|
|
self.test_db = database_name
|
|
|
|
@classmethod
|
|
def setReSQLModuleList(cls, module_list):
|
|
cls.re_sql_module_list = module_list
|
|
|
|
@classmethod
|
|
def setExcludePkgs(cls, exclude_pkgs):
|
|
cls.exclude_pkgs = exclude_pkgs
|
|
|
|
@classmethod
|
|
def setForModules(cls, for_modules):
|
|
cls.for_modules = for_modules
|
|
|
|
|
|
class BaseSocketTestGenerator(BaseTestGenerator):
|
|
SOCKET_NAMESPACE = ""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.tester.get("/")
|
|
self.socket_client = socketio.test_client(
|
|
self.app, namespace=self.SOCKET_NAMESPACE,
|
|
flask_test_client=self.tester)
|
|
self.assertTrue(self.socket_client.is_connected(self.SOCKET_NAMESPACE))
|
|
|
|
def runTest(self):
|
|
super().runTest()
|
|
|
|
def tearDown(self):
|
|
super().tearDown()
|
|
self.socket_client.disconnect(namespace=self.SOCKET_NAMESPACE)
|
|
self.assertFalse(
|
|
self.socket_client.is_connected(self.SOCKET_NAMESPACE))
|