From a2a2b8b888f2f9002f742fcea4a7823b1bf8f9ef Mon Sep 17 00:00:00 2001 From: Murtuza Zabuawala Date: Mon, 10 Apr 2017 14:07:48 +0100 Subject: [PATCH] Ensure the query tool displays but does not render HTML returned by the server in the results grid. Fixes #2330. --- .../xss_checks_panels_and_query_tool_test.py | 189 ++++++++++++++++++ .../xss_checks_pgadmin_debugger_test.py | 119 +++++++++++ .../js/slickgrid/slick.pgadmin.formatters.js | 10 +- web/pgadmin/tools/debugger/__init__.py | 16 +- .../debugger/templates/debugger/js/direct.js | 3 + web/regression/feature_utils/pgadmin_page.py | 5 +- .../python_test_utils/test_utils.py | 77 +++++++ 7 files changed, 405 insertions(+), 14 deletions(-) create mode 100644 web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py create mode 100644 web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py diff --git a/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py b/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py new file mode 100644 index 000000000..526d4e766 --- /dev/null +++ b/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py @@ -0,0 +1,189 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2017, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from selenium.webdriver import ActionChains +from regression.python_test_utils import test_utils +from regression.feature_utils.base_feature_test import BaseFeatureTest +import time + +class CheckForXssFeatureTest(BaseFeatureTest): + """ + Tests to check if pgAdmin4 is vulnerable to XSS. + + Here we will check html source code for escaped characters if we + found them in the code then we are not vulnerable otherwise we might. + + We will cover, + 1) Browser Tree (aciTree) + 2) Properties Tab (BackFrom) + 3) Dependents Tab (BackGrid) + 4) SQL Tab (Code Mirror) + 5) Query Tool (SlickGrid) + """ + + scenarios = [ + ("Test XSS check for panels and query tool", dict()) + ] + + def before(self): + connection = test_utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + test_utils.drop_database(connection, "acceptance_test_db") + test_utils.create_database(self.server, "acceptance_test_db") + test_utils.create_table(self.server, "acceptance_test_db", + "

X") + + # This is needed to test dependents tab (eg: BackGrid) + test_utils.create_constraint(self.server, "acceptance_test_db", + "

X", + "unique", "

Y") + + def runTest(self): + self.page.wait_for_spinner_to_disappear() + self._connects_to_server() + self._tables_node_expandable() + self._check_xss_in_browser_tree() + self._check_xss_in_properties_tab() + self._check_xss_in_sql_tab() + self._check_xss_in_dependents_tab() + + # Query tool + self._check_xss_in_query_tool() + self._close_query_tool() + + def after(self): + time.sleep(1) + self.page.remove_server(self.server) + connection = test_utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + test_utils.drop_database(connection, "acceptance_test_db") + + def _connects_to_server(self): + self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click() + self.page.driver.find_element_by_link_text("Object").click() + ActionChains(self.page.driver) \ + .move_to_element(self.page.driver.find_element_by_link_text("Create")) \ + .perform() + self.page.find_by_partial_link_text("Server...").click() + + server_config = self.server + self.page.fill_input_by_field_name("name", server_config['name']) + self.page.find_by_partial_link_text("Connection").click() + self.page.fill_input_by_field_name("host", server_config['host']) + self.page.fill_input_by_field_name("port", server_config['port']) + self.page.fill_input_by_field_name("username", server_config['username']) + self.page.fill_input_by_field_name("password", server_config['db_password']) + self.page.find_by_xpath("//button[contains(.,'Save')]").click() + + def _tables_node_expandable(self): + self.page.toggle_open_tree_item(self.server['name']) + self.page.toggle_open_tree_item('Databases') + self.page.toggle_open_tree_item('acceptance_test_db') + self.page.toggle_open_tree_item('Schemas') + self.page.toggle_open_tree_item('public') + self.page.toggle_open_tree_item('Tables') + self.page.select_tree_item("

X") + + def _check_xss_in_browser_tree(self): + # Fetch the inner html & check for escaped characters + source_code = self.page.find_by_xpath( + "//*[@id='tree']" + ).get_attribute('innerHTML') + + self._check_escaped_characters( + source_code, + "<h1>X", + "Browser tree" + ) + + def _check_xss_in_properties_tab(self): + self.page.click_tab("Properties") + source_code = self.page.find_by_xpath( + "//span[contains(@class,'uneditable-input')]" + ).get_attribute('innerHTML') + self._check_escaped_characters( + source_code, + "<h1>X", + "Properties tab (Backform Control)" + ) + + + def _check_xss_in_sql_tab(self): + self.page.click_tab("SQL") + # Fetch the inner html & check for escaped characters + source_code = self.page.find_by_xpath( + "//*[contains(@class,'CodeMirror-lines') and contains(.,'CREATE TABLE')]" + ).get_attribute('innerHTML') + + self._check_escaped_characters( + source_code, + "<h1>X", + "SQL tab (Code Mirror)" + ) + + def _check_xss_in_dependents_tab(self): # Create any constraint with xss name to test this + self.page.click_tab("Dependents") + + source_code = self.page.find_by_xpath( + "//*[@id='5']/table/tbody/tr/td/div/div/div[2]/table/tbody/tr/td[2]" + ).get_attribute('innerHTML') + + self._check_escaped_characters( + source_code, + "public.<h1 onmouseover='console.log(2);'>Y", + "Dependents tab (BackGrid)" + ) + + def _check_xss_in_query_tool(self): + self.page.driver.find_element_by_link_text("Tools").click() + self.page.find_by_partial_link_text("Query Tool").click() + time.sleep(3) + self.page.driver.switch_to.frame(self.page.driver.find_element_by_tag_name('iframe')) + self.page.fill_codemirror_area_with("select ''") + time.sleep(1) + self.page.find_by_id("btn-flash").click() + time.sleep(2) + + source_code = self.page.find_by_xpath( + "//*[@id='0']//*[@id='datagrid']/div[5]/div/div[1]/div[2]" + ).get_attribute('innerHTML') + + self._check_escaped_characters( + source_code, + '<img src="x" onerror="console.log(1)">', + "Query tool (SlickGrid)" + ) + + def _close_query_tool(self): + self.page.driver.switch_to_default_content() + self.page.click_element( + self.page.find_by_xpath("//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]") + ) + time.sleep(0.5) + self.page.driver.switch_to.frame(self.page.driver.find_element_by_tag_name('iframe')) + time.sleep(1) + self.page.click_element(self.page.find_by_xpath("//button[contains(.,'Yes')]")) + time.sleep(0.5) + self.page.driver.switch_to_default_content() + + + def _check_escaped_characters(self, source_code, string_to_find, source): + # For XSS we need to search against element's html code + if source_code.find(string_to_find) == -1: + # No escaped characters found + assert False, "{0} might be vulnerable to XSS ".format(source) + else: + # escaped characters found + assert True diff --git a/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py b/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py new file mode 100644 index 000000000..8b018fedc --- /dev/null +++ b/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py @@ -0,0 +1,119 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2017, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from selenium.webdriver import ActionChains +from regression.python_test_utils import test_utils +from regression.feature_utils.base_feature_test import BaseFeatureTest +import time + +class CheckDebuggerForXssFeatureTest(BaseFeatureTest): + """Tests to check if Debugger is vulnerable to XSS.""" + + scenarios = [ + ("Test table DDL generation", dict()) + ] + + def before(self): + # Some test function is needed for debugger + test_utils.create_debug_function(self.server, "postgres", + "test_function") + + def runTest(self): + self.page.wait_for_spinner_to_disappear() + self._connects_to_server() + self._function_node_expandable() + self._debug_function() + + def after(self): + time.sleep(0.5) + test_utils.drop_debug_function(self.server, "postgres", + "test_function") + self.page.remove_server(self.server) + + def _connects_to_server(self): + self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click() + self.page.driver.find_element_by_link_text("Object").click() + ActionChains(self.page.driver) \ + .move_to_element(self.page.driver.find_element_by_link_text("Create")) \ + .perform() + self.page.find_by_partial_link_text("Server...").click() + + server_config = self.server + self.page.fill_input_by_field_name("name", server_config['name']) + self.page.find_by_partial_link_text("Connection").click() + self.page.fill_input_by_field_name("host", server_config['host']) + self.page.fill_input_by_field_name("port", server_config['port']) + self.page.fill_input_by_field_name("username", server_config['username']) + self.page.fill_input_by_field_name("password", server_config['db_password']) + self.page.find_by_xpath("//button[contains(.,'Save')]").click() + + def _function_node_expandable(self): + self.page.toggle_open_tree_item(self.server['name']) + self.page.toggle_open_tree_item('Databases') + self.page.toggle_open_tree_item('postgres') + self.page.toggle_open_tree_item('Schemas') + self.page.toggle_open_tree_item('public') + self.page.toggle_open_tree_item('Functions') + self.page.select_tree_item("test_function()") + + def _debug_function(self): + self.page.driver.find_element_by_link_text("Object").click() + ActionChains(self.page.driver) \ + .move_to_element(self.page.driver.find_element_by_link_text("Debugging")) \ + .perform() + self.page.driver.find_element_by_link_text("Debug").click() + time.sleep(0.5) + # We need to check if debugger plugin is installed or not + try: + is_error = self.page.find_by_xpath( + "//div[contains(@class,'ajs-header')]" + ).text + except Exception as e: + is_error = None + + # If debugger plugin is not found + if is_error and is_error == "Debugger Error": + self.page.click_modal_ok() + self.skipTest("Please make sure that debugger plugin is properly configured") + else: + time.sleep(2) + self.page.driver.switch_to.frame(self.page.driver.find_element_by_tag_name('iframe')) + self.page.click_element(self.page.driver.find_elements_by_xpath("//button")[2]) + time.sleep(2) + + # Only this tab is vulnerable rest are BackGrid & Code Mirror control + # which are already tested in Query tool test case + self.page.click_tab("Messages") + source_code = self.page.find_by_xpath( + "//*[@id='messages']" + ).get_attribute('innerHTML') + + self._check_escaped_characters( + source_code, + 'NOTICE: <img src="x" onerror="console.log(1)">', + 'Debugger' + ) + self._close_debugger() + + def _close_debugger(self): + time.sleep(0.5) + self.page.driver.switch_to_default_content() + time.sleep(0.5) + self.page.click_element( + self.page.find_by_xpath("//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]") + ) + + def _check_escaped_characters(self, source_code, string_to_find, source): + # For XSS we need to search against element's html code + if source_code.find(string_to_find) == -1: + # No escaped characters found + assert False, "{0} might be vulnerable to XSS ".format(source) + else: + # escaped characters found + assert True diff --git a/web/pgadmin/static/js/slickgrid/slick.pgadmin.formatters.js b/web/pgadmin/static/js/slickgrid/slick.pgadmin.formatters.js index b066095e1..290bdddac 100644 --- a/web/pgadmin/static/js/slickgrid/slick.pgadmin.formatters.js +++ b/web/pgadmin/static/js/slickgrid/slick.pgadmin.formatters.js @@ -24,7 +24,7 @@ } else { // Stringify only if it's json object if (typeof value === "object" && !Array.isArray(value)) { - return JSON.stringify(value); + return _.escape(JSON.stringify(value)); } else if (Array.isArray(value)) { var temp = []; $.each(value, function(i, val) { @@ -34,9 +34,9 @@ temp.push(val) } }); - return "[" + temp.join() + "]" + return _.escape("[" + temp.join() + "]") } else { - return value; + return _.escape(value); } } } @@ -49,7 +49,7 @@ return ''; } else { - return "" + value + ""; + return "" + _.escape(value) + ""; } } @@ -70,7 +70,7 @@ return "[null]"; } else { - return value; + return _.escape(value); } } diff --git a/web/pgadmin/tools/debugger/__init__.py b/web/pgadmin/tools/debugger/__init__.py index 03ba9b12e..9c435ff5a 100644 --- a/web/pgadmin/tools/debugger/__init__.py +++ b/web/pgadmin/tools/debugger/__init__.py @@ -1425,10 +1425,10 @@ def poll_end_execution_result(trans_id): status = 'Success' additional_msgs = conn.messages() if len(additional_msgs) > 0: - additional_msgs = [msg.strip("
") for msg in additional_msgs] - additional_msgs = "
".join(additional_msgs) + additional_msgs = [msg.strip("\n") for msg in additional_msgs] + additional_msgs = "\n".join(additional_msgs) if statusmsg: - statusmsg = additional_msgs + "
" + statusmsg + statusmsg = additional_msgs + "\n" + statusmsg else: statusmsg = additional_msgs @@ -1443,10 +1443,10 @@ def poll_end_execution_result(trans_id): status = 'Success' additional_msgs = conn.messages() if len(additional_msgs) > 0: - additional_msgs = [msg.strip("
") for msg in additional_msgs] - additional_msgs = "
".join(additional_msgs) + additional_msgs = [msg.strip("\n") for msg in additional_msgs] + additional_msgs = "\n".join(additional_msgs) if statusmsg: - statusmsg = additional_msgs + "
" + statusmsg + statusmsg = additional_msgs + "\n" + statusmsg else: statusmsg = additional_msgs @@ -1460,9 +1460,9 @@ def poll_end_execution_result(trans_id): additional_msgs = conn.messages() if len(additional_msgs) > 0: additional_msgs = [msg.strip("\n") for msg in additional_msgs] - additional_msgs = "
".join(additional_msgs) + additional_msgs = "\n".join(additional_msgs) if statusmsg: - statusmsg = additional_msgs + "
" + statusmsg + statusmsg = additional_msgs + "\n" + statusmsg else: statusmsg = additional_msgs return make_json_response(data={ diff --git a/web/pgadmin/tools/debugger/templates/debugger/js/direct.js b/web/pgadmin/tools/debugger/templates/debugger/js/direct.js index 971b1135d..8bdfdda65 100644 --- a/web/pgadmin/tools/debugger/templates/debugger/js/direct.js +++ b/web/pgadmin/tools/debugger/templates/debugger/js/direct.js @@ -383,6 +383,9 @@ define( // This function will update messages tab update_messages: function(msg) { + // To prevent xss + msg = _.escape(msg); + var old_msgs='', new_msgs=''; old_msgs = pgTools.DirectDebug.messages_panel.$container.find('.messages').html(); if(old_msgs) { diff --git a/web/regression/feature_utils/pgadmin_page.py b/web/regression/feature_utils/pgadmin_page.py index aa31bc047..3ebd8bcde 100644 --- a/web/regression/feature_utils/pgadmin_page.py +++ b/web/regression/feature_utils/pgadmin_page.py @@ -35,7 +35,10 @@ class PgadminPage: def click_modal_ok(self): time.sleep(0.5) - self.click_element(self.find_by_xpath("//button[contains(.,'OK')]")) + # Find active alertify dialog in case of multiple alertify dialog & click on that dialog + self.click_element( + self.find_by_xpath("//div[contains(@class, 'alertify') and not(contains(@class, 'ajs-hidden'))]//button[.='OK']") + ) def add_server(self, server_config): self.find_by_xpath("//*[@class='aciTreeText' and contains(.,'Servers')]").click() diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 5fa13a648..947bcf3a1 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -158,6 +158,83 @@ def create_table(server, db_name, table_name): except Exception: traceback.print_exc(file=sys.stderr) +def create_constraint( + server, db_name, table_name, + constraint_type="unique", constraint_name="test_unique"): + try: + connection = get_db_connection(db_name, + server['username'], + server['db_password'], + server['host'], + server['port']) + old_isolation_level = connection.isolation_level + connection.set_isolation_level(0) + pg_cursor = connection.cursor() + pg_cursor.execute(''' + ALTER TABLE "%s" + ADD CONSTRAINT "%s" %s (some_column) + ''' % (table_name, constraint_name, constraint_type.upper()) + ) + + connection.set_isolation_level(old_isolation_level) + connection.commit() + + except Exception: + traceback.print_exc(file=sys.stderr) + +def create_debug_function(server, db_name, function_name="test_func"): + try: + connection = get_db_connection(db_name, + server['username'], + server['db_password'], + server['host'], + server['port']) + old_isolation_level = connection.isolation_level + connection.set_isolation_level(0) + pg_cursor = connection.cursor() + pg_cursor.execute(''' + CREATE OR REPLACE FUNCTION public."%s"() + RETURNS text + LANGUAGE 'plpgsql' + COST 100.0 + VOLATILE + AS $function$ + BEGIN + RAISE INFO 'This is a test function'; + RAISE NOTICE ''; + RAISE NOTICE '

'; + RETURN 'Hello, pgAdmin4'; + END; + $function$; + ''' % (function_name) + ) + connection.set_isolation_level(old_isolation_level) + connection.commit() + + except Exception: + traceback.print_exc(file=sys.stderr) + + +def drop_debug_function(server, db_name, function_name="test_func"): + try: + connection = get_db_connection(db_name, + server['username'], + server['db_password'], + server['host'], + server['port']) + old_isolation_level = connection.isolation_level + connection.set_isolation_level(0) + pg_cursor = connection.cursor() + pg_cursor.execute(''' + DROP FUNCTION public."%s"(); + ''' % (function_name) + ) + connection.set_isolation_level(old_isolation_level) + connection.commit() + + except Exception: + traceback.print_exc(file=sys.stderr) + def drop_database(connection, database_name): """This function used to drop the database"""