Add feature test framework, using selenium and chromedriver for UI testing.

Written by both George and Atira at Pivotal.
pull/3/head
Atira Odhner 2017-02-22 12:41:28 +00:00 committed by Dave Page
parent 89137f57b2
commit fe1aec5de0
12 changed files with 357 additions and 8 deletions

View File

@ -59,6 +59,12 @@ if 'PGADMIN_PORT' in globals():
globals()['PGADMIN_PORT'])
server_port = int(globals()['PGADMIN_PORT'])
PGADMIN_RUNTIME = True
elif 'PGADMIN_PORT' in os.environ:
port = os.environ['PGADMIN_PORT']
app.logger.debug(
'Not running under the desktop runtime, port: %s',
port)
server_port = int(port)
else:
app.logger.debug(
'Not running under the desktop runtime, port: %s',

View File

@ -0,0 +1,76 @@
#############################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##############################################################
from selenium.webdriver import ActionChains
import config as app_config
from regression import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
class ConnectsToServerFeatureTest(BaseFeatureTest):
"""
Tests that a database connection can be created from the UI
"""
def setUp(self):
super(ConnectsToServerFeatureTest, self).setUp()
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
test_utils.create_table(self.server, "acceptance_test_db", "test_table")
def runTest(self):
self.assertEqual(app_config.APP_NAME, self.page.driver.title)
self.page.wait_for_spinner_to_disappear()
self._connects_to_server()
self._tables_node_expandable()
def tearDown(self):
self.page.remove_server(self.server)
self.app_starter.stop_app()
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
test_utils.drop_database(connection, "acceptance_test_db")
def _connects_to_server(self):
self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click()
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Create")) \
.perform()
self.page.find_by_partial_link_text("Server...").click()
server_config = self.server
self.page.fill_input_by_field_name("name", server_config['name'])
self.page.find_by_partial_link_text("Connection").click()
self.page.fill_input_by_field_name("host", server_config['host'])
self.page.fill_input_by_field_name("port", server_config['port'])
self.page.fill_input_by_field_name("username", server_config['username'])
self.page.fill_input_by_field_name("password", server_config['db_password'])
self.page.find_by_xpath("//button[contains(.,'Save')]").click()
def _tables_node_expandable(self):
self.page.toggle_open_tree_item(self.server['name'])
self.page.toggle_open_tree_item('Databases')
self.page.toggle_open_tree_item('acceptance_test_db')
self.page.toggle_open_tree_item('Schemas')
self.page.toggle_open_tree_item('public')
self.page.toggle_open_tree_item('Tables')
self.page.toggle_open_tree_item('test_table')

View File

@ -0,0 +1,53 @@
from selenium import webdriver
from selenium.webdriver import ActionChains
from regression import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
class TemplateSelectionFeatureTest(BaseFeatureTest):
def setUp(self):
super(TemplateSelectionFeatureTest, self).setUp()
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
self.page.add_server(self.server)
def runTest(self):
test_utils.create_table(self.server, "acceptance_test_db", "test_table")
self.page.toggle_open_tree_item(self.server['name'])
self.page.toggle_open_tree_item('Databases')
self.page.toggle_open_tree_item('acceptance_test_db')
self.page.toggle_open_tree_item('Schemas')
self.page.toggle_open_tree_item('public')
self.page.find_by_xpath("//*[@id='tree']//*[@class='aciTreeText' and .='Trigger Functions']").click()
self.page.find_by_partial_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Create")) \
.perform()
self.page.find_by_partial_link_text("Trigger function...").click()
self.page.fill_input_by_field_name("name", "test-trigger-function")
self.page.find_by_partial_link_text("Definition").click()
self.page.fill_codemirror_area_with("some-trigger-function-content")
self.page.find_by_partial_link_text("SQL").click()
self.page.find_by_xpath("//*[contains(@class,'CodeMirror-lines') and contains(.,'LEAKPROOF')]")
def tearDown(self):
self.page.find_by_xpath("//button[contains(.,'Cancel')]").click()
self.page.remove_server(self.server)
self.app_starter.stop_app()
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
test_utils.drop_database(connection, "acceptance_test_db")

View File

@ -48,7 +48,7 @@ class TestsGeneratorRegistry(ABCMeta):
# Register this type of module, based on the module name
# Avoid registering the BaseDriver itself
if name != 'BaseTestGenerator':
if name != 'BaseTestGenerator' and name != 'BaseFeatureTest':
TestsGeneratorRegistry.registry[d['__module__']] = cls
ABCMeta.__init__(cls, name, bases, d)

View File

@ -1,4 +1,5 @@
parent_id.pkl
regression.log
test_greenplum_config.json
test_advanced_config.json
test_config.json

View File

@ -54,10 +54,10 @@ General Information
pgadmin4/web/regression/test_config.json
2b) After creating the server and test configuration file, add (or modify)
parameter values as per requirements. The pgAdmin4 regression framework expects
to find the files in the directory '/<installation dir>/web/regression/'.
If you move the file to another location, you must create a symbolic
link that specifies the new location.
parameter values as per requirements. The pgAdmin4 regression framework
expects to find the files in the directory
'/<installation dir>/web/regression/'. If you move the file to another
location, you must create a symbolic link that specifies the new location.
2c) Specifying Server Configuration file:
@ -103,6 +103,11 @@ Test Data Details
Execution:
-----------
- For feature tests to run as part of the entire test suite, Chrome and
chromedriver need to be installed; get chromedriver from
https://sites.google.com/a/chromium.org/chromedriver/downloads or a
package manager and make sure it is in the PATH
- The test framework is modular and pluggable and dynamically locates tests
for modules which are discovered at runtime. All test cases are found
and registered automatically by its module name in
@ -126,15 +131,15 @@ Execution:
- Execute test framework for single node at a time
Example 1) Run test framework for 'browser' node
run 'python runtests.py --pkg browser.tests'
run 'python runtests.py --pkg browser.server_groups.tests'
Example 2) Run test framework for 'database' node
run 'python runtests.py --pkg browser.server_groups.servers.databases.tests'
- Exclude a package and its subpackages when running tests:
Example: exclude acceptance tests but run all others:
run 'python runtests.py --exclude acceptance'
Example: exclude feature tests but run all others:
run 'python runtests.py --exclude feature_tests'
Example: exclude multiple packages:
run 'python runtests.py --exclude browser.server_groups.servers.databases,browser.server_groups.servers.tablespaces'

View File

View File

@ -0,0 +1,34 @@
import os
import subprocess
import signal
import random
class AppStarter:
"""
Helper for starting the full pgadmin4 app and loading the page via selenium
"""
def __init__(self, driver, app_config):
self.driver = driver
self.app_config = app_config
def start_app(self):
random_server_port = str(random.randint(10000, 65535))
env = {"PGADMIN_PORT": random_server_port}
env.update(os.environ)
self.pgadmin_process = subprocess.Popen(["python", "pgAdmin4.py", "magic-portal", random_server_port],
shell=False,
preexec_fn=os.setsid,
stderr=open(os.devnull, 'w'),
env=env)
self.driver.set_window_size(1024, 1024)
print("opening browser")
self.driver.get("http://" + self.app_config.DEFAULT_SERVER + ":" + random_server_port)
def stop_app(self):
self.driver.close()
os.killpg(os.getpgid(self.pgadmin_process.pid), signal.SIGTERM)

View File

@ -0,0 +1,26 @@
from selenium import webdriver
import config as app_config
from pgadmin.utils.route import BaseTestGenerator
from regression.feature_utils.app_starter import AppStarter
from regression.feature_utils.pgadmin_page import PgadminPage
class BaseFeatureTest(BaseTestGenerator):
def setUp(self):
if app_config.SERVER_MODE:
self.skipTest("Currently, config is set to start pgadmin in server mode. "
"This test doesn't know username and password so doesn't work in server mode")
driver = webdriver.Chrome()
self.app_starter = AppStarter(driver, app_config)
self.page = PgadminPage(driver, app_config)
self.app_starter.start_app()
self.page.wait_for_app()
def failureException(self, *args, **kwargs):
self.page.driver.save_screenshot('/tmp/feature_test_failure.png')
return AssertionError(*args, **kwargs)
def runTest(self):
pass

View File

@ -0,0 +1,124 @@
import time
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys
class PgadminPage:
"""
Helper class for interacting with the page, given a selenium driver
"""
def __init__(self, driver, app_config):
self.driver = driver
self.app_config = app_config
def add_server(self, server_config):
self.wait_for_spinner_to_disappear()
self.find_by_xpath("//*[@class='aciTreeText' and contains(.,'Servers')]").click()
self.driver.find_element_by_link_text("Object").click()
ActionChains(self.driver) \
.move_to_element(self.driver.find_element_by_link_text("Create")) \
.perform()
self.find_by_partial_link_text("Server...").click()
self.fill_input_by_field_name("name", server_config['name'])
self.find_by_partial_link_text("Connection").click()
self.fill_input_by_field_name("host", server_config['host'])
self.fill_input_by_field_name("port", server_config['port'])
self.fill_input_by_field_name("username", server_config['username'])
self.fill_input_by_field_name("password", server_config['db_password'])
self.find_by_xpath("//button[contains(.,'Save')]").click()
self.find_by_xpath("//*[@id='tree']//*[.='" + server_config['name'] + "']")
def remove_server(self, server_config):
self.find_by_xpath("//*[@id='tree']//*[.='" + server_config['name'] + "' and @class='aciTreeItem']").click()
self.find_by_partial_link_text("Object").click()
self.find_by_partial_link_text("Delete/Drop").click()
time.sleep(0.5)
self.find_by_xpath("//button[contains(.,'OK')]").click()
def toggle_open_tree_item(self, tree_item_text):
self.find_by_xpath("//*[@id='tree']//*[.='" + tree_item_text + "']/../*[@class='aciTreeButton']").click()
def find_by_xpath(self, xpath):
return self.wait_for_element(lambda: self.driver.find_element_by_xpath(xpath))
def find_by_id(self, element_id):
return self.wait_for_element(lambda: self.driver.find_element_by_id(element_id))
def find_by_partial_link_text(self, link_text):
return self.wait_for_element(lambda: self.driver.find_element_by_partial_link_text(link_text))
def fill_input_by_field_name(self, field_name, field_content):
field = self.find_by_xpath("//input[@name='" + field_name + "']")
backspaces = [Keys.BACKSPACE]*len(field.get_attribute('value'))
field.click()
field.send_keys(backspaces)
field.send_keys(str(field_content))
self.wait_for_input_field_content(field_name, field_content)
def fill_codemirror_area_with(self, field_content):
self.find_by_xpath(
"//pre[contains(@class,'CodeMirror-line')]/../../../*[contains(@class,'CodeMirror-code')]").click()
ActionChains(self.driver).send_keys(field_content).perform()
def click_tab(self, tab_name):
self.find_by_xpath("//*[contains(@class,'wcPanelTab') and contains(.,'" + tab_name + "')]").click()
def wait_for_input_field_content(self, field_name, content):
def input_field_has_content():
element = self.driver.find_element_by_xpath(
"//input[@name='" + field_name + "']")
return str(content) == element.get_attribute('value')
return self._wait_for("field to contain '" + str(content) + "'", input_field_has_content)
def wait_for_element(self, find_method_with_args):
def element_if_it_exists():
try:
element = find_method_with_args()
if element.is_displayed() & element.is_enabled():
return element
except NoSuchElementException:
return False
return self._wait_for("element to exist", element_if_it_exists)
def wait_for_spinner_to_disappear(self):
def spinner_has_disappeared():
try:
self.driver.find_element_by_id("pg-spinner")
return False
except NoSuchElementException:
return True
self._wait_for("spinner to disappear", spinner_has_disappeared)
def wait_for_app(self):
def page_shows_app():
if self.driver.title == self.app_config.APP_NAME:
return True
else:
self.driver.refresh()
return False
self._wait_for("app to start", page_shows_app)
def _wait_for(self, waiting_for_message, condition_met_function):
timeout = 10
time_waited = 0
sleep_time = 0.01
while time_waited < timeout:
result = condition_met_function()
if result:
return result
time_waited += sleep_time
time.sleep(sleep_time)
raise AssertionError("timed out waiting for " + waiting_for_message)

View File

@ -153,10 +153,34 @@ def create_table(server, db_name, table_name):
traceback.print_exc(file=sys.stderr)
def create_table(server, db_name, table_name):
try:
connection = get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''CREATE TABLE "%s" (name VARCHAR, value NUMERIC)''' % table_name)
pg_cursor.execute('''INSERT INTO "%s" VALUES ('Some-Name', 6)''' % table_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
except Exception:
traceback.print_exc(file=sys.stderr)
def drop_database(connection, database_name):
"""This function used to drop the database"""
if database_name not in ["postgres", "template1", "template0"]:
pg_cursor = connection.cursor()
pg_cursor.execute(
"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity "
"WHERE pg_stat_activity.datname ='%s' and pid <> pg_backend_pid();" % database_name
)
pg_cursor.execute("SELECT * FROM pg_database db WHERE"
" db.datname='%s'" % database_name)
if pg_cursor.fetchall():