diff --git a/README.md b/README.md index f509c1fa7..d2c503884 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ simple - adapt as required for your distribution: ``` 4. Ensure that a PostgreSQL installation's bin/ directory is in the path (so - pg_config can be found for building psycopg2), and install the required + pg_config can be found for building psycopg3), and install the required packages: ```bash diff --git a/web/config.py b/web/config.py index dbfbe8429..e93f6defd 100644 --- a/web/config.py +++ b/web/config.py @@ -303,10 +303,7 @@ LOG_ROTATION_MAX_LOG_FILES = 90 # Maximum number of backups to retain ########################################################################## # The default driver used for making connection with PostgreSQL -if sys.version_info < (3, 7): - PG_DEFAULT_DRIVER = 'psycopg2' -else: - PG_DEFAULT_DRIVER = 'psycopg3' +PG_DEFAULT_DRIVER = 'psycopg3' # Maximum allowed idle time in minutes before which releasing the connection # for the particular session. 
(in minutes) diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/alter_seq_props_msql_psycopg2.sql b/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/alter_seq_props_msql_psycopg2.sql deleted file mode 100644 index 62c9b8b84..000000000 --- a/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/alter_seq_props_msql_psycopg2.sql +++ /dev/null @@ -1,8 +0,0 @@ -SELECT setval('public."Seq1_$%{}[]()&*^!@""''`\/#"', 7, true); - -ALTER SEQUENCE IF EXISTS public."Seq1_$%{}[]()&*^!@""'`\/#" - INCREMENT 12 - MINVALUE 2 - MAXVALUE 9992 - CACHE 2 - CYCLE; diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/test_sequences_pg.json b/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/test_sequences_pg.json index 771a8d479..80cba6cf0 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/test_sequences_pg.json +++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/pg/default/test_sequences_pg.json @@ -70,25 +70,14 @@ "current_value": "7", "increment": "12", "minimum": "2", "maximum": "9992", "cache": "2", "cycled": true }, "expected_sql_file": "alter_seq_props.sql", - "expected_msql_file": "alter_seq_props_msql_psycopg2.sql", - "pg_driver": "psycopg2" - },{ - "type": "alter", - "name": "Alter Sequence properties", - "endpoint": "NODE-sequence.obj_id", - "sql_endpoint": "NODE-sequence.sql_id", - "msql_endpoint": "NODE-sequence.msql_id", - "data": { - "current_value": "7", "increment": "12", "minimum": "2", "maximum": "9992", "cache": "2", "cycled": true - }, - "expected_sql_file": "alter_seq_props.sql", - "expected_msql_file": "alter_seq_props_msql.sql", - "pg_driver": "psycopg3" + "expected_msql_file": "alter_seq_props_msql.sql" },{ "type": "alter", "name": "Alter Sequence add privileges", "endpoint": 
"NODE-sequence.obj_id", "sql_endpoint": "NODE-sequence.sql_id", "msql_endpoint": "NODE-sequence.msql_id", "data": { "relacl": { diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/ppas/default/alter_seq_props_msql_psycopg2.sql b/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/ppas/default/alter_seq_props_msql_psycopg2.sql deleted file mode 100644 index 62c9b8b84..000000000 --- a/web/pgadmin/browser/server_groups/servers/databases/schemas/sequences/tests/ppas/default/alter_seq_props_msql_psycopg2.sql +++ /dev/null @@ -1,8 +0,0 @@ -SELECT setval('public."Seq1_$%{}[]()&*^!@""''`\/#"', 7, true); - -ALTER SEQUENCE IF EXISTS public."Seq1_$%{}[]()&*^!@""'`\/#" - INCREMENT 12 - MINVALUE 2 - MAXVALUE 9992 - CACHE 2 - CYCLE; diff --git a/web/pgadmin/tools/debugger/__init__.py b/web/pgadmin/tools/debugger/__init__.py index f9e62f6e3..6ed88ad68 100644 --- a/web/pgadmin/tools/debugger/__init__.py +++ b/web/pgadmin/tools/debugger/__init__.py @@ -36,7 +36,6 @@ from pgadmin.browser.server_groups.servers.databases.extensions.utils \ from pgadmin.utils.constants import PREF_LABEL_KEYBOARD_SHORTCUTS, \ SERVER_CONNECTION_CLOSED from pgadmin.preferences import preferences -from pgadmin.utils.constants import PSYCOPG2 MODULE_NAME = 'debugger' @@ -1318,11 +1317,6 @@ def messages(trans_id): if conn.connected(): status = 'Busy' - if PG_DEFAULT_DRIVER == PSYCOPG2: - # psycopg3 doesn't require polling to get the - # messages as debugger connection is already open - # Remove this block while removing psucopg2 completely - _, result = conn.poll() notify = conn.messages() if notify: # In notice message we need to find "PLDBGBREAK" string to find diff --git a/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py b/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py index 042f3edd4..db0619f96 100644 --- 
a/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py +++ b/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py @@ -10,7 +10,6 @@ from unittest.mock import patch from pgadmin.utils.route import BaseTestGenerator -from pgadmin.utils.constants import PSYCOPG2 from pgadmin.browser.server_groups.servers.databases.tests import utils as \ database_utils from regression.python_test_utils import test_utils @@ -153,13 +152,8 @@ class TestDownloadCSV(BaseTestGenerator): # Disable the console logging from Flask logger self.app.logger.disabled = True if not self.is_valid and self.is_valid_tx: - if config.PG_DEFAULT_DRIVER == PSYCOPG2: - # When user enters wrong query, poll will throw 500, - # so expecting 500, as poll is never called for a wrong query. - self.assertEqual(res.status_code, 500) - else: - # The result will be null but status code will be 200 - self.assertEqual(res.status_code, 200) + # The result will be null but status code will be 200 + self.assertEqual(res.status_code, 200) elif self.filename is None: if self.download_as_txt: with patch('pgadmin.tools.sqleditor.blueprint.' 
diff --git a/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py b/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py index bf0c5e52c..e03a56308 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py +++ b/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py @@ -2,17 +2,19 @@ # # pgAdmin 4 - PostgreSQL Tools # -# Copyright (C) 2013 - 2023, The pgAdmin Development Team +# Copyright (C) 2013 - 2023, The pgAdmin Development Team # This software is released under the PostgreSQL Licence # ########################################################################## import secrets +import json from pgadmin.utils.route import BaseTestGenerator -from pgadmin.utils.constants import PSYCOPG3 from regression.python_test_utils import test_utils from pgadmin.utils import server_utils +from pgadmin.browser.server_groups.servers.databases.tests import utils as \ + database_utils import config @@ -66,9 +68,6 @@ class TestSQLASCIIEncoding(BaseTestGenerator): ] def setUp(self): - if config.PG_DEFAULT_DRIVER == PSYCOPG3: - self.skipTest('SQL_ASCII encoding: skipping for psycopg3.') - self.encode_db_name = 'test_encoding_' + self.db_encoding + \ str(secrets.choice(range(1000, 65535))) self.encode_sid = self.server_information['server_id'] @@ -84,38 +83,43 @@ class TestSQLASCIIEncoding(BaseTestGenerator): self.server, self.encode_db_name, (self.db_encoding, self.lc_collate)) - test_utils.create_table_with_query( - self.server, - self.encode_db_name, - """CREATE TABLE {0}( - name character varying(200) COLLATE pg_catalog."default") - """.format(self.table_name)) - def runTest(self): - db_con = test_utils.get_db_connection( - self.encode_db_name, - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode'] - ) + db_con = database_utils.connect_database(self, + test_utils.SERVER_GROUP, + self.encode_sid, + self.encode_did) + if not db_con["info"] == "Database connected.": + raise 
Exception("Could not connect to the database.") - old_isolation_level = db_con.isolation_level - test_utils.set_isolation_level(db_con, 0) - pg_cursor = db_con.cursor() - pg_cursor.execute("SET client_encoding='{0}'".format(self.db_encoding)) - query = """INSERT INTO {0} VALUES('{1}')""".format( - self.table_name, self.test_str) - pg_cursor.execute(query) - test_utils.set_isolation_level(db_con, old_isolation_level) - db_con.commit() + # Initialize query tool + self.trans_id = str(secrets.choice(range(1, 9999999))) + url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'\ + .format(self.trans_id, test_utils.SERVER_GROUP, self.encode_sid, + self.encode_did) + response = self.tester.post(url) + self.assertEqual(response.status_code, 200) - query = """SELECT * FROM {0}""".format(self.table_name) - pg_cursor.execute(query) - resp = pg_cursor.fetchone() + # Check character + url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) + sql = "select '{0}';".format(self.test_str) + response = self.tester.post(url, data=json.dumps({"sql": sql}), + content_type='html/json') + self.assertEqual(response.status_code, 200) + url = '/sqleditor/poll/{0}'.format(self.trans_id) + response = self.tester.get(url) + self.assertEqual(response.status_code, 200) + response_data = json.loads(response.data) + self.assertEqual(response_data['data']['rows_fetched_to'], 1) + result = response_data['data']['result'][0][0] + self.assertEqual(result, self.test_str) - self.assertEqual(resp[0], self.test_str) + # Close query tool + url = '/sqleditor/close/{0}'.format(self.trans_id) + response = self.tester.delete(url) + self.assertEqual(response.status_code, 200) + + database_utils.disconnect_database(self, self.encode_sid, + self.encode_did) def tearDown(self): main_conn = test_utils.get_db_connection( diff --git a/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding_psycopg3.py b/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding_psycopg3.py deleted file mode 100644 index 
ed49a8315..000000000 --- a/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding_psycopg3.py +++ /dev/null @@ -1,137 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2022, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -import secrets -import json - -from pgadmin.utils.route import BaseTestGenerator -from pgadmin.utils.constants import PSYCOPG2 -from regression.python_test_utils import test_utils -from pgadmin.utils import server_utils -from pgadmin.browser.server_groups.servers.databases.tests import utils as \ - database_utils -import config - - -class TestSQLASCIIEncodingPsycopg3(BaseTestGenerator): - """ - This class validates character support in pgAdmin4 for - SQL_ASCII encodings - """ - scenarios = [ - ( - 'Test SQL_ASCII data with multiple backslashes', - dict( - table_name='test_sql_ascii', - db_encoding='SQL_ASCII', - lc_collate='C', - test_str='\\\\Four\\\Three\\Two\One' - )), - ( - 'Test SQL_ASCII data with file path', - dict( - table_name='test_sql_ascii', - db_encoding='SQL_ASCII', - lc_collate='C', - test_str='\\test\Documents\2017\12\19\AD93E646-' - 'E5FE-11E7-85AE-EB2E217F96F0.tif' - )), - ( - 'Test SQL_ASCII data with multiple forward slashes', - dict( - table_name='test_sql_ascii', - db_encoding='SQL_ASCII', - lc_collate='C', - test_str='////4///3//2/1' - )), - ( - 'Test SQL_ASCII data with blob string', - dict( - table_name='test_sql_ascii', - db_encoding='SQL_ASCII', - lc_collate='C', - test_str='Blob: \xf4\xa5\xa3\xa5' - )), - ( - 'Test SQL_ASCII data with blob string & ascii table name', - dict( - table_name='ΓΌ', - db_encoding='SQL_ASCII', - lc_collate='C', - test_str='Blob: \xf4\xa5\xa3\xa5' - )), - ] - - def setUp(self): - if config.PG_DEFAULT_DRIVER == PSYCOPG2: - self.skipTest('SQL_ASCII encoding: skipping for 
psycopg2.') - - self.encode_db_name = 'test_encoding_' + self.db_encoding + \ - str(secrets.choice(range(1000, 65535))) - self.encode_sid = self.server_information['server_id'] - - server_con = server_utils.connect_server(self, self.encode_sid) - if hasattr(self, 'skip_on_database'): - if 'data' in server_con and 'type' in server_con['data']: - if server_con['data']['type'] in self.skip_on_database: - self.skipTest('cannot run in: %s' % - server_con['data']['type']) - - self.encode_did = test_utils.create_database( - self.server, self.encode_db_name, - (self.db_encoding, self.lc_collate)) - - def runTest(self): - db_con = database_utils.connect_database(self, - test_utils.SERVER_GROUP, - self.encode_sid, - self.encode_did) - if not db_con["info"] == "Database connected.": - raise Exception("Could not connect to the database.") - - # Initialize query tool - self.trans_id = str(secrets.choice(range(1, 9999999))) - url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'\ - .format(self.trans_id, test_utils.SERVER_GROUP, self.encode_sid, - self.encode_did) - response = self.tester.post(url) - self.assertEqual(response.status_code, 200) - - # Check character - url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) - sql = "select '{0}';".format(self.test_str) - response = self.tester.post(url, data=json.dumps({"sql": sql}), - content_type='html/json') - self.assertEqual(response.status_code, 200) - url = '/sqleditor/poll/{0}'.format(self.trans_id) - response = self.tester.get(url) - self.assertEqual(response.status_code, 200) - response_data = json.loads(response.data) - self.assertEqual(response_data['data']['rows_fetched_to'], 1) - result = response_data['data']['result'][0][0] - self.assertEqual(result, self.test_str) - - # Close query tool - url = '/sqleditor/close/{0}'.format(self.trans_id) - response = self.tester.delete(url) - self.assertEqual(response.status_code, 200) - - database_utils.disconnect_database(self, self.encode_sid, - self.encode_did) - - 
def tearDown(self): - main_conn = test_utils.get_db_connection( - self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode'] - ) - test_utils.drop_database(main_conn, self.encode_db_name) diff --git a/web/pgadmin/utils/constants.py b/web/pgadmin/utils/constants.py index 59638242e..400a44c82 100644 --- a/web/pgadmin/utils/constants.py +++ b/web/pgadmin/utils/constants.py @@ -118,7 +118,6 @@ USER_NOT_FOUND = gettext("The specified user ID (%s) could not be found.") DATABASE_LAST_SYSTEM_OID = 16383 # Drivers -PSYCOPG2 = 'psycopg2' PSYCOPG3 = 'psycopg3' # Shared storage diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py deleted file mode 100644 index 8a958e8b6..000000000 --- a/web/pgadmin/utils/driver/psycopg2/__init__.py +++ /dev/null @@ -1,415 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -""" -Implementation of Driver class -It is a wrapper around the actual psycopg2 driver, and connection -object. - -""" -import datetime -import re -from flask import session -from flask_login import current_user -from werkzeug.exceptions import InternalServerError -import psycopg2 -from psycopg2.extensions import adapt -from threading import Lock - -import config -from pgadmin.model import Server -from .keywords import scan_keyword -from ..abstract import BaseDriver -from .connection import Connection -from .server_manager import ServerManager - -connection_restore_lock = Lock() - - -class Driver(BaseDriver): - """ - class Driver(BaseDriver): - - This driver acts as a wrapper around psycopg2 connection driver - implementation. 
We will be using psycopg2 for makeing connection with - the PostgreSQL/EDB Postgres Advanced Server (EnterpriseDB). - - Properties: - ---------- - - * Version (string): - Version of psycopg2 driver - - Methods: - ------- - * get_connection(sid, database, conn_id, auto_reconnect) - - It returns a Connection class object, which may/may not be connected - to the database server for this sesssion - - * release_connection(seid, database, conn_id) - - It releases the connection object for the given conn_id/database for this - session. - - * connection_manager(sid, reset) - - It returns the server connection manager for this session. - """ - - def __init__(self, **kwargs): - self.managers = dict() - - super().__init__() - - def _restore_connections_from_session(self): - """ - Used internally by connection_manager to restore connections - from sessions. - """ - if session.sid not in self.managers: - self.managers[session.sid] = managers = dict() - if '__pgsql_server_managers' in session: - session_managers = \ - session['__pgsql_server_managers'].copy() - for server in \ - Server.query.filter_by( - user_id=current_user.id): - manager = managers[str(server.id)] = \ - ServerManager(server) - if server.id in session_managers: - manager._restore(session_managers[server.id]) - manager.update_session() - return managers - - return {} - - def connection_manager(self, sid=None): - """ - connection_manager(...) - - Returns the ServerManager object for the current session. It will - create new ServerManager object (if necessary). 
- - Parameters: - sid - - Server ID - """ - assert (sid is not None and isinstance(sid, int)) - managers = None - - server_data = Server.query.filter_by(id=sid).first() - if server_data is None: - return None - - if session.sid not in self.managers: - with connection_restore_lock: - # The wait is over but the object might have been loaded - # by some other thread check again - managers = self._restore_connections_from_session() - else: - managers = self.managers[session.sid] - if str(sid) in managers: - manager = managers[str(sid)] - with connection_restore_lock: - manager._restore_connections() - manager.update_session() - - managers['pinged'] = datetime.datetime.now() - if str(sid) not in managers: - s = Server.query.filter_by(id=sid).first() - - if not s: - return None - - managers[str(sid)] = ServerManager(s) - - return managers[str(sid)] - - return managers[str(sid)] - - def version(self): - """ - version(...) - - Returns the current version of psycopg2 driver - """ - _version = getattr(psycopg2, '__version__', None) - - if _version: - return _version - - raise InternalServerError( - "Driver Version information for psycopg2 is not available!" - ) - - def libpq_version(self): - """ - Returns the loaded libpq version - """ - version = getattr(psycopg2, '__libpq_version__', None) - if version: - return version - - raise InternalServerError( - "libpq version information is not available!" - ) - - def get_connection( - self, sid, database=None, conn_id=None, auto_reconnect=True - ): - """ - get_connection(...) - - Returns the connection object for the certain connection-id/database - for the specific server, identified by sid. Create a new Connection - object (if require). - - Parameters: - sid - - Server ID - database - - Database, on which the connection needs to be made - If provided none, maintenance_db for the server will be used, - while generating new Connection object. 
- conn_id - - Identification String for the Connection This will be used by - certain tools, which will require a dedicated connection for it. - i.e. Debugger, Query Tool, etc. - auto_reconnect - - This parameters define, if we should attempt to reconnect the - database server automatically, when connection has been lost for - any reason. Certain tools like Debugger will require a permenant - connection, and it stops working on disconnection. - - """ - manager = self.connection_manager(sid) - - return manager.connection(database=database, conn_id=conn_id, - auto_reconnect=auto_reconnect) - - def release_connection(self, sid, database=None, conn_id=None): - """ - Release the connection for the given connection-id/database in this - session. - """ - return self.connection_manager(sid).release(database, conn_id) - - def delete_manager(self, sid): - """ - Delete manager for given server id. - """ - manager = self.connection_manager(sid) - if manager is not None: - manager.release() - if session.sid in self.managers and \ - str(sid) in self.managers[session.sid]: - del self.managers[session.sid][str(sid)] - - def gc_timeout(self): - """ - Release the connections for the sessions, which have not pinged the - server for more than config.MAX_SESSION_IDLE_TIME. - """ - - # Minimum session idle is 20 minutes - max_idle_time = max(config.MAX_SESSION_IDLE_TIME or 60, 20) - session_idle_timeout = datetime.timedelta(minutes=max_idle_time) - - curr_time = datetime.datetime.now() - - for sess in self.managers: - sess_mgr = self.managers[sess] - - if sess == session.sid: - sess_mgr['pinged'] = curr_time - continue - if curr_time - sess_mgr['pinged'] >= session_idle_timeout: - for mgr in [ - m for m in sess_mgr.values() if isinstance(m, - ServerManager) - ]: - mgr.release() - - def gc_own(self): - """ - Release the connections for current session - This is useful when (eg. 
logout) we want to release all - connections (except dedicated connections created by utilities - like backup, restore etc) of all servers for current user. - """ - - sess_mgr = self.managers.get(session.sid, None) - - if sess_mgr: - for mgr in ( - m for m in sess_mgr.values() if isinstance(m, ServerManager) - ): - mgr.release() - - @staticmethod - def qtLiteral(value, force_quote=False): - adapted = adapt(value) - - # Not all adapted objects have encoding - # e.g. - # psycopg2.extensions.BOOLEAN - # psycopg2.extensions.FLOAT - # psycopg2.extensions.INTEGER - # etc... - if hasattr(adapted, 'encoding'): - adapted.encoding = 'utf8' - res = adapted.getquoted() - - if isinstance(res, bytes): - res = res.decode('utf-8') - - if force_quote is True: - # Convert the input to the string to use the startsWith(...) - res = str(res) - if not res.startswith("'"): - return "'" + res + "'" - - return res - - @staticmethod - def ScanKeywordExtraLookup(key): - # UNRESERVED_KEYWORD 0 - # COL_NAME_KEYWORD 1 - # TYPE_FUNC_NAME_KEYWORD 2 - # RESERVED_KEYWORD 3 - extra_keywords = { - 'connect': 3, - 'convert': 3, - 'distributed': 0, - 'exec': 3, - 'log': 0, - 'long': 3, - 'minus': 3, - 'nocache': 3, - 'number': 3, - 'package': 3, - 'pls_integer': 3, - 'raw': 3, - 'return': 3, - 'smalldatetime': 3, - 'smallfloat': 3, - 'smallmoney': 3, - 'sysdate': 3, - 'systimestap': 3, - 'tinyint': 3, - 'tinytext': 3, - 'varchar2': 3 - } - - return extra_keywords.get(key, None) or scan_keyword(key) - - @staticmethod - def needsQuoting(key, for_types): - value = key - val_noarray = value - - # check if the string is number or not - if isinstance(value, int): - return True - # certain types should not be quoted even though it contains a space. - # Evilness. 
- elif for_types and value[-2:] == "[]": - val_noarray = value[:-2] - - if for_types and val_noarray.lower() in [ - 'bit varying', - '"char"', - 'character varying', - 'double precision', - 'timestamp without time zone', - 'timestamp with time zone', - 'time without time zone', - 'time with time zone', - '"trigger"', - '"unknown"' - ]: - return False - - # If already quoted?, If yes then do not quote again - if for_types and val_noarray and \ - (val_noarray.startswith('"') or val_noarray.endswith('"')): - return False - - if '0' <= val_noarray[0] <= '9': - return True - - if re.search('[^a-z_0-9]+', val_noarray): - return True - - # check string is keywaord or not - category = Driver.ScanKeywordExtraLookup(value) - - if category is None: - return False - - # UNRESERVED_KEYWORD - if category == 0: - return False - - # COL_NAME_KEYWORD - if for_types and category == 1: - return False - - return True - - @staticmethod - def qtTypeIdent(conn, *args): - # We're not using the conn object at the moment, but - we will - # modify the - # logic to use the server version specific keywords later. - res = None - value = None - - for val in args: - # DataType doesn't have len function then convert it to string - if not hasattr(val, '__len__'): - val = str(val) - - if len(val) == 0: - continue - value = val - - if Driver.needsQuoting(val, True): - value = value.replace("\"", "\"\"") - value = "\"" + value + "\"" - - res = ((res and res + '.') or '') + value - - return res - - @staticmethod - def qtIdent(conn, *args): - # We're not using the conn object at the moment, but - we will - # modify the logic to use the server version specific keywords later. 
- res = None - value = None - - for val in args: - if isinstance(val, list): - return map(lambda w: Driver.qtIdent(conn, w), val) - - # DataType doesn't have len function then convert it to string - if not hasattr(val, '__len__'): - val = str(val) - - if len(val) == 0: - continue - - value = val - - if Driver.needsQuoting(val, False): - value = value.replace("\"", "\"\"") - value = "\"" + value + "\"" - - res = ((res and res + '.') or '') + value - - return res diff --git a/web/pgadmin/utils/driver/psycopg2/connection.py b/web/pgadmin/utils/driver/psycopg2/connection.py deleted file mode 100644 index b717e18f2..000000000 --- a/web/pgadmin/utils/driver/psycopg2/connection.py +++ /dev/null @@ -1,1994 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -""" -Implementation of Connection. -It is a wrapper around the actual psycopg2 driver, and connection -object. 
-""" - -import secrets -import select -import datetime -from collections import deque -import psycopg2 -from flask import g, current_app -from flask_babel import gettext -from flask_security import current_user -from pgadmin.utils.crypto import decrypt -from psycopg2.extensions import encodings - -import config -from pgadmin.model import User -from pgadmin.utils.exception import ConnectionLost, CryptKeyMissing -from pgadmin.utils import get_complete_file_path -from ..abstract import BaseConnection -from .cursor import DictCursor -from .typecast import numeric_typecasters, register_global_typecasters,\ - register_string_typecasters, register_binary_typecasters, \ - unregister_numeric_typecasters, \ - register_array_to_string_typecasters, ALL_JSON_TYPES -from .encoding import get_encoding, configure_driver_encodings -from pgadmin.utils import csv -from pgadmin.utils.master_password import get_crypt_key -from io import StringIO -from pgadmin.utils.locker import ConnectionLocker - -_ = gettext - -# Register global type caster which will be applicable to all connections. -register_global_typecasters() -configure_driver_encodings(encodings) - - -class Connection(BaseConnection): - """ - class Connection() - - A wrapper class, which wraps the psycopg2 connection object, and - delegate the execution to the actual connection object, when required. - - Methods: - ------- - * connect(**kwargs) - - Connect the PostgreSQL/EDB Postgres Advanced Server using the psycopg2 - driver - - * execute_scalar(query, params, formatted_exception_msg) - - Execute the given query and returns single datum result - - * execute_async(query, params, formatted_exception_msg) - - Execute the given query asynchronously and returns result. - - * execute_void(query, params, formatted_exception_msg) - - Execute the given query with no result. - - * execute_2darray(query, params, formatted_exception_msg) - - Execute the given query and returns the result as a 2 dimensional - array. 
- - * execute_dict(query, params, formatted_exception_msg) - - Execute the given query and returns the result as an array of dict - (column name -> value) format. - - * connected() - - Get the status of the connection. - Returns True if connected, otherwise False. - - * reset() - - Reconnect the database server (if possible) - - * transaction_status() - - Transaction Status - - * ping() - - Ping the server. - - * _release() - - Release the connection object of psycopg2 - - * _reconnect() - - Attempt to reconnect to the database - - * _wait(conn) - - This method is used to wait for asynchronous connection. This is a - blocking call. - - * _wait_timeout(conn) - - This method is used to wait for asynchronous connection with timeout. - This is a non blocking call. - - * poll(formatted_exception_msg) - - This method is used to poll the data of query running on asynchronous - connection. - - * status_message() - - Returns the status message returned by the last command executed on - the server. - - * rows_affected() - - Returns the no of rows affected by the last command executed on - the server. - - * cancel_transaction(conn_id, did=None) - - This method is used to cancel the transaction for the - specified connection id and database id. - - * messages() - - Returns the list of messages/notices sends from the PostgreSQL database - server. - - * _formatted_exception_msg(exception_obj, formatted_msg) - - This method is used to parse the psycopg2.Error object and returns the - formatted error message if flag is set to true else return - normal error message. - - * check_notifies(required_polling) - - Check for the notify messages by polling the connection or after - execute is there in notifies. - - * get_notifies() - - This function will returns list of notifies received from database - server. - - * pq_encrypt_password_conn() - - This function will return the encrypted password for database server - - greater than or equal to 10. 
- """ - UNAUTHORIZED_REQUEST = gettext("Unauthorized request.") - CURSOR_NOT_FOUND = \ - gettext("Cursor could not be found for the async connection.") - ARGS_STR = "{0}#{1}" - - def __init__(self, manager, conn_id, db, **kwargs): - assert manager is not None - assert conn_id is not None - - auto_reconnect = kwargs.get('auto_reconnect', True) - async_ = kwargs.get('async_', 0) - use_binary_placeholder = kwargs.get('use_binary_placeholder', False) - array_to_string = kwargs.get('array_to_string', False) - - self.conn_id = conn_id - self.manager = manager - self.db = db if db is not None else manager.db - self.conn = None - self.auto_reconnect = auto_reconnect - self.async_ = async_ - self.__async_cursor = None - self.__async_query_id = None - self.__backend_pid = None - self.execution_aborted = False - self.row_count = 0 - self.__notices = None - self.__notifies = None - self.password = None - # This flag indicates the connection status (connected/disconnected). - self.wasConnected = False - # This flag indicates the connection reconnecting status. - self.reconnecting = False - self.use_binary_placeholder = use_binary_placeholder - self.array_to_string = array_to_string - super().__init__() - - def as_dict(self): - """ - Returns the dictionary object representing this object. - """ - # In case, it cannot be auto reconnectable, or already been released, - # then we will return None. 
- if not self.auto_reconnect and not self.conn: - return None - - res = dict() - res['conn_id'] = self.conn_id - res['database'] = self.db - res['async_'] = self.async_ - res['wasConnected'] = self.wasConnected - res['auto_reconnect'] = self.auto_reconnect - res['use_binary_placeholder'] = self.use_binary_placeholder - res['array_to_string'] = self.array_to_string - - return res - - def __repr__(self): - return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format( - self.conn_id, self.db, - 'Connected' if self.conn and not self.conn.closed else - "Disconnected", - self.async_ - ) - - def __str__(self): - return self.__repr__() - - def _check_user_password(self, kwargs): - """ - Check user and password. - """ - password = None - encpass = None - is_update_password = True - - if 'user' in kwargs and kwargs['password']: - password = kwargs['password'] - kwargs.pop('password') - is_update_password = False - else: - encpass = kwargs['password'] if 'password' in kwargs else None - - return password, encpass, is_update_password - - def _decode_password(self, encpass, manager, password, crypt_key): - if encpass: - # Fetch Logged in User Details. 
- user = User.query.filter_by(id=current_user.id).first() - - if user is None: - return True, self.UNAUTHORIZED_REQUEST, password - - try: - password = decrypt(encpass, crypt_key) - # password is in bytes, for python3 we need it in string - if isinstance(password, bytes): - password = password.decode() - except Exception as e: - manager.stop_ssh_tunnel() - current_app.logger.exception(e) - return True, \ - _( - "Failed to decrypt the saved password.\nError: {0}" - ).format(str(e)), password - return False, '', password - - def connect(self, **kwargs): - if self.conn: - if self.conn.closed: - self.conn = None - else: - return True, None - - manager = self.manager - crypt_key_present, crypt_key = get_crypt_key() - - password, encpass, is_update_password = self._check_user_password( - kwargs) - - passfile = kwargs['passfile'] if 'passfile' in kwargs else None - tunnel_password = kwargs['tunnel_password'] if 'tunnel_password' in \ - kwargs else '' - - # Check SSH Tunnel needs to be created - if manager.use_ssh_tunnel == 1 and not manager.tunnel_created: - status, error = manager.create_ssh_tunnel(tunnel_password) - if not status: - return False, error - - # Check SSH Tunnel is alive or not. - if manager.use_ssh_tunnel == 1: - manager.check_ssh_tunnel_alive() - - if is_update_password: - if encpass is None: - encpass = self.password or getattr(manager, 'password', None) - - self.password = encpass - - # Reset the existing connection password - if self.reconnecting is not False: - self.password = None - - if not crypt_key_present: - raise CryptKeyMissing() - - is_error, errmsg, password = self._decode_password(encpass, manager, - password, crypt_key) - if is_error: - return False, errmsg - - # If no password credential is found then connect request might - # come from Query tool, ViewData grid, debugger etc tools. 
- # we will check for pgpass file availability from connection manager - # if it's present then we will use it - if not password and not encpass and not passfile: - passfile = manager.get_connection_param_value('passfile') - if manager.passexec: - password = manager.passexec.get() - - try: - database = self.db - if 'user' in kwargs and kwargs['user']: - user = kwargs['user'] - else: - user = manager.user - conn_id = self.conn_id - - import os - os.environ['PGAPPNAME'] = '{0} - {1}'.format( - config.APP_NAME, conn_id) - - ssl_key = get_complete_file_path( - manager.get_connection_param_value('sslkey')) - sslmode = manager.get_connection_param_value('sslmode') - if ssl_key and sslmode in \ - ['require', 'verify-ca', 'verify-full']: - ssl_key_file_permission = \ - int(oct(os.stat(ssl_key).st_mode)[-3:]) - if ssl_key_file_permission > 600: - os.chmod(ssl_key, 0o600) - - with ConnectionLocker(manager.kerberos_conn): - # Create the connection string - connection_string = manager.create_connection_string( - database, user, password) - - pg_conn = psycopg2.connect(connection_string, - async_=self.async_) - - # If connection is asynchronous then we will have to wait - # until the connection is ready to use. 
- if self.async_ == 1: - self._wait(pg_conn) - - except psycopg2.Error as e: - manager.stop_ssh_tunnel() - if e.pgerror: - msg = e.pgerror - elif e.diag.message_detail: - msg = e.diag.message_detail - else: - msg = str(e) - current_app.logger.info( - "Failed to connect to the database server(#{server_id}) for " - "connection ({conn_id}) with error message as below" - ":{msg}".format( - server_id=self.manager.sid, - conn_id=conn_id, - msg=msg - ) - ) - return False, msg - - # Overwrite connection notice attr to support - # more than 50 notices at a time - pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH) - - self.conn = pg_conn - self.wasConnected = True - try: - status, msg = self._initialize(conn_id, **kwargs) - except Exception as e: - manager.stop_ssh_tunnel() - current_app.logger.exception(e) - self.conn = None - if not self.reconnecting: - self.wasConnected = False - raise e - - if status and is_update_password: - manager._update_password(encpass) - else: - if not self.reconnecting and is_update_password: - self.wasConnected = False - - return status, msg - - def _set_auto_commit(self, kwargs): - """ - autocommit flag does not work with asynchronous connections. - By default asynchronous connection runs in autocommit mode. 
- :param kwargs: - :return: - """ - if self.async_ == 0: - if 'autocommit' in kwargs and kwargs['autocommit'] is False: - self.conn.autocommit = False - else: - self.conn.autocommit = True - - def _set_role(self, manager, cur, conn_id, **kwargs): - """ - Set role - :param manager: - :param cur: - :param conn_id: - :return: - """ - is_set_role = False - role = None - - if 'role' in kwargs and kwargs['role']: - is_set_role = True - role = kwargs['role'] - elif manager.role: - is_set_role = True - role = manager.role - - if is_set_role: - status = self._execute(cur, "SET ROLE TO %s", [role]) - - if status is not None: - self.conn.close() - self.conn = None - current_app.logger.error( - "Connect to the database server (#{server_id}) for " - "connection ({conn_id}), but - failed to setup the role " - "with error message as below:{msg}".format( - server_id=self.manager.sid, - conn_id=conn_id, - msg=status - ) - ) - return True, \ - _( - "Failed to setup the role with error message:\n{0}" - ).format(status) - return False, '' - - def _execute(self, cur, query, params=None): - formatted_exception_msg = self._formatted_exception_msg - try: - self.__internal_blocking_execute(cur, query, params) - except psycopg2.Error as pe: - cur.close() - return formatted_exception_msg(pe, False) - return None - - def _initialize(self, conn_id, **kwargs): - self.execution_aborted = False - self.__backend_pid = self.conn.get_backend_pid() - - setattr(g, self.ARGS_STR.format( - self.manager.sid, - self.conn_id.encode('utf-8') - ), None) - - status, cur = self.__cursor() - - manager = self.manager - - # autocommit flag does not work with asynchronous connections. - # By default asynchronous connection runs in autocommit mode. - self._set_auto_commit(kwargs) - - register_string_typecasters(self.conn) - - if self.array_to_string: - register_array_to_string_typecasters(self.conn) - - # Register type casters for binary data only after registering array to - # string type casters. 
- if self.use_binary_placeholder: - register_binary_typecasters(self.conn) - - postgres_encoding, self.python_encoding, typecast_encoding = \ - get_encoding(self.conn.encoding) - - # Note that we use 'UPDATE pg_settings' for setting bytea_output as a - # convenience hack for those running on old, unsupported versions of - # PostgreSQL 'cos we're nice like that. - status = self._execute( - cur, - "SET DateStyle=ISO; " - "SET client_min_messages=notice; " - "SELECT set_config('bytea_output','hex',false) FROM pg_settings" - " WHERE name = 'bytea_output'; " - "SET client_encoding='{0}';".format(postgres_encoding) - ) - - if status is not None: - self.conn.close() - self.conn = None - - return False, status - - is_error, errmsg = self._set_role(manager, cur, conn_id, **kwargs) - if is_error: - return False, errmsg - - # Check database version every time on reconnection - status = self._execute(cur, "SELECT version()") - - if status is not None: - self.conn.close() - self.conn = None - self.wasConnected = False - current_app.logger.error( - "Failed to fetch the version information on the " - "established connection to the database server " - "(#{server_id}) for '{conn_id}' with below error " - "message:{msg}".format( - server_id=self.manager.sid, - conn_id=conn_id, - msg=status) - ) - return False, status - - if cur.rowcount > 0: - row = cur.fetchmany(1)[0] - manager.ver = row['version'] - manager.sversion = self.conn.server_version - - status = self._execute(cur, """ -SELECT - db.oid as did, db.datname, db.datallowconn, - pg_encoding_to_char(db.encoding) AS serverencoding, - has_database_privilege(db.oid, 'CREATE') as cancreate, - datistemplate -FROM - pg_catalog.pg_database db -WHERE db.datname = current_database()""") - - if status is None: - manager.db_info = manager.db_info or dict() - if cur.rowcount > 0: - res = cur.fetchmany(1)[0] - manager.db_info[res['did']] = res.copy() - - # We do not have database oid for the maintenance database. 
- if len(manager.db_info) == 1: - manager.did = res['did'] - - if manager.sversion >= 120000: - status = self._execute(cur, """ - SELECT - gss_authenticated, encrypted - FROM - pg_catalog.pg_stat_gssapi - WHERE pid = pg_backend_pid()""") - if status is None and cur.rowcount > 0: - res_enc = cur.fetchmany(1)[0] - manager.db_info[res['did']]['gss_authenticated'] =\ - res_enc['gss_authenticated'] - manager.db_info[res['did']]['gss_encrypted'] = \ - res_enc['encrypted'] - - if len(manager.db_info) == 1: - manager.gss_authenticated = \ - res_enc['gss_authenticated'] - manager.gss_encrypted = res_enc['encrypted'] - - self._set_user_info(cur, manager, **kwargs) - - self._set_server_type_and_password(kwargs, manager) - - manager.update_session() - - return True, None - - def _set_user_info(self, cur, manager, **kwargs): - """ - Set user info. - :param cur: - :param manager: - :return: - """ - status = self._execute(cur, """ - SELECT - roles.oid as id, roles.rolname as name, - roles.rolsuper as is_superuser, - CASE WHEN roles.rolsuper THEN true ELSE roles.rolcreaterole END as - can_create_role, - CASE WHEN roles.rolsuper THEN true - ELSE roles.rolcreatedb END as can_create_db, - CASE WHEN 'pg_signal_backend'=ANY(ARRAY(WITH RECURSIVE cte AS ( - SELECT pg_roles.oid,pg_roles.rolname FROM pg_roles - WHERE pg_roles.oid = roles.oid - UNION ALL - SELECT m.roleid,pgr.rolname FROM cte cte_1 - JOIN pg_auth_members m ON m.member = cte_1.oid - JOIN pg_roles pgr ON pgr.oid = m.roleid) - SELECT rolname FROM cte)) THEN True - ELSE False END as can_signal_backend - FROM - pg_catalog.pg_roles as roles - WHERE - rolname = current_user""") - - if status is None and 'user' not in kwargs: - manager.user_info = dict() - if cur.rowcount > 0: - manager.user_info = cur.fetchmany(1)[0] - - def _set_server_type_and_password(self, kwargs, manager): - """ - Set server type - :param kwargs: - :param manager: - :return: - """ - if 'password' in kwargs: - manager.password = kwargs['password'] - - 
server_types = None - if 'server_types' in kwargs and isinstance( - kwargs['server_types'], list): - server_types = manager.server_types = kwargs['server_types'] - - if server_types is None: - from pgadmin.browser.server_groups.servers.types import ServerType - server_types = ServerType.types() - - for st in server_types: - if st.stype == 'ppas': - if st.instance_of(manager.ver): - manager.server_type = st.stype - manager.server_cls = st - break - else: - if st.instance_of(): - manager.server_type = st.stype - manager.server_cls = st - break - - def __cursor(self, server_cursor=False): - - if not get_crypt_key()[0]: - raise CryptKeyMissing() - - # Check SSH Tunnel is alive or not. If used by the database - # server for the connection. - if self.manager.use_ssh_tunnel == 1: - self.manager.check_ssh_tunnel_alive() - - if self.wasConnected is False: - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - cur = getattr(g, self.ARGS_STR.format( - self.manager.sid, - self.conn_id.encode('utf-8') - ), None) - - if self.connected() and cur and not cur.closed and \ - (not server_cursor or (server_cursor and cur.name)): - return True, cur - - if not self.connected(): - errmsg = "" - - current_app.logger.warning( - "Connection to database server (#{server_id}) for the " - "connection - '{conn_id}' has been lost.".format( - server_id=self.manager.sid, - conn_id=self.conn_id - ) - ) - - if self.auto_reconnect and not self.reconnecting: - self.__attempt_execution_reconnect(None) - else: - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - - try: - if server_cursor: - # Providing name to cursor will create server side cursor. 
- cursor_name = "CURSOR:{0}".format(self.conn_id) - cur = self.conn.cursor( - name=cursor_name, cursor_factory=DictCursor - ) - else: - cur = self.conn.cursor(cursor_factory=DictCursor) - except psycopg2.Error as pe: - current_app.logger.exception(pe) - errmsg = gettext( - "Failed to create cursor for psycopg2 connection with error " - "message for the server#{1}:{2}:\n{0}" - ).format( - str(pe), self.manager.sid, self.db - ) - - current_app.logger.error(errmsg) - if self.conn.closed: - self.conn = None - if self.auto_reconnect and not self.reconnecting: - current_app.logger.info( - gettext( - "Attempting to reconnect to the database server " - "(#{server_id}) for the connection - '{conn_id}'." - ).format( - server_id=self.manager.sid, - conn_id=self.conn_id - ) - ) - return self.__attempt_execution_reconnect( - self.__cursor, server_cursor - ) - else: - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' - else self.conn_id[5:] - ) - - setattr( - g, self.ARGS_STR.format( - self.manager.sid, self.conn_id.encode('utf-8') - ), cur - ) - - return True, cur - - def reset_cursor_at(self, position): - """ - This function is used to reset the cursor at the given position - """ - cur = self.__async_cursor - if not cur: - current_app.logger.log( - 25, - 'Cursor not found in reset_cursor_at method') - - try: - cur.scroll(position, mode='absolute') - except psycopg2.Error: - # bypassing the error as cursor tried to scroll on the - # specified position, but end of records found - current_app.logger.log( - 25, - 'Failed to reset cursor in reset_cursor_at method') - - def escape_params_sqlascii(self, params): - # The data is unescaped using string_typecasters when selected - # We need to esacpe the data so that it does not fail when - # it is encoded with python ascii - # unicode_escape helps in escaping and unescaping - if self.conn and \ - self.conn.encoding in ('SQL_ASCII', 'SQLASCII', - 'MULE_INTERNAL', 'MULEINTERNAL')\ - and params is 
not None and isinstance(params, dict): - for key, val in params.items(): - modified_val = val - # "unicode_escape" will convert single backslash to double - # backslash, so we will have to replace/revert them again - # to store the correct value into the database. - if isinstance(val, str): - modified_val = val.encode('unicode_escape')\ - .decode('raw_unicode_escape')\ - .replace("\\\\", "\\") - - params[key] = modified_val - - return params - - def __internal_blocking_execute(self, cur, query, params): - """ - This function executes the query using cursor's execute function, - but in case of asynchronous connection we need to wait for the - transaction to be completed. If self.async_ is 1 then it is a - blocking call. - - Args: - cur: Cursor object - query: SQL query to run. - params: Extra parameters - """ - - query = query.encode(self.python_encoding) - - params = self.escape_params_sqlascii(params) - cur.execute(query, params) - if self.async_ == 1: - self._wait(cur.connection) - - def execute_on_server_as_csv(self, formatted_exception_msg=False, - records=2000): - """ - To fetch query result and generate CSV output - - Args: - params: Additional parameters - formatted_exception_msg: For exception - records: Number of initial records - Returns: - Generator response - """ - cur = self.__async_cursor - if not cur: - return False, self.CURSOR_NOT_FOUND - - if self.conn.isexecuting(): - return False, gettext( - "Asynchronous query execution/operation underway." 
- ) - - encoding = self.python_encoding - query = None - try: - query = str(cur.query, encoding) \ - if cur and cur.query is not None else None - except Exception: - current_app.logger.warning('Error encoding query') - - dsn = self.conn.get_dsn_parameters() - current_app.logger.log( - 25, - "Execute (with server cursor) by {pga_user} on " - "{db_user}@{db_host}/{db_name} #{server_id} - " - "{conn_id} (Query-id: {query_id}):\n{query}".format( - pga_user=current_user.email, - db_user=dsn['user'], - db_host=dsn['host'], - db_name=dsn['dbname'], - server_id=self.manager.sid, - conn_id=self.conn_id, - query=query, - query_id=self.__async_query_id - ) - ) - try: - # Unregistering type casting for large size data types. - unregister_numeric_typecasters(self.conn) - if self.async_ == 1: - self._wait(cur.connection) - except psycopg2.Error as pe: - cur.close() - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - current_app.logger.error( - "failed to execute query ((with server cursor) " - "for the server #{server_id} - {conn_id} " - "(query-id: {query_id}):\n" - "error message:{errmsg}".format( - server_id=self.manager.sid, - conn_id=self.conn_id, - errmsg=errmsg, - query_id=self.__async_query_id - ) - ) - return False, errmsg - - # http://initd.org/psycopg/docs/cursor.html#cursor.description - # to avoid no-op - if cur.description is None: - return False, \ - gettext('The query executed did not return any data.') - - def handle_null_values(results, replace_nulls_with): - """ - This function is used to replace null values with the given string - - :param results: - :param replace_nulls_with: null values will be replaced by this - string. 
- :return: modified result - """ - - temp_results = [] - for row in results: - res = dict() - for k, v in row.items(): - if v is None: - res[k] = replace_nulls_with - else: - res[k] = v - temp_results.append(res) - results = temp_results - - return results - - def gen(conn_obj, trans_obj, quote='strings', quote_char="'", - field_separator=',', replace_nulls_with=None): - - cur.scroll(0, mode='absolute') - results = cur.fetchmany(records) - if not results: - yield gettext('The query executed did not return any data.') - return - - # Type cast the numeric values - results = numeric_typecasters(results, conn_obj) - - header = [] - json_columns = [] - - for c in cur.ordered_description(): - # This is to handle the case in which column name is non-ascii - column_name = c.to_dict()['name'] - header.append(column_name) - if c.to_dict()['type_code'] in ALL_JSON_TYPES: - json_columns.append(column_name) - - res_io = StringIO() - - if quote == 'strings': - quote = csv.QUOTE_NONNUMERIC - elif quote == 'all': - quote = csv.QUOTE_ALL - else: - quote = csv.QUOTE_NONE - - csv_writer = csv.DictWriter( - res_io, fieldnames=header, delimiter=field_separator, - quoting=quote, - quotechar=quote_char, - replace_nulls_with=replace_nulls_with - ) - - csv_writer.writeheader() - # Replace the null values with given string if configured. - if replace_nulls_with is not None: - results = handle_null_values(results, replace_nulls_with) - csv_writer.writerows(results) - - yield res_io.getvalue() - - while True: - results = cur.fetchmany(records) - - if not results: - break - res_io = StringIO() - - csv_writer = csv.DictWriter( - res_io, fieldnames=header, delimiter=field_separator, - quoting=quote, - quotechar=quote_char, - replace_nulls_with=replace_nulls_with - ) - - # Replace the null values with given string if configured. 
- if replace_nulls_with is not None: - results = handle_null_values(results, replace_nulls_with) - csv_writer.writerows(results) - yield res_io.getvalue() - - try: - # try to reset the cursor scroll back to where it was, - # bypass error, if cannot scroll back - rows_fetched_from = trans_obj.get_fetched_row_cnt() - cur.scroll(rows_fetched_from, mode='absolute') - except psycopg2.Error: - # bypassing the error as cursor tried to scroll on the - # specified position, but end of records found - pass - - # Registering back type caster for large size data types to string - # which was unregistered at starting - register_string_typecasters(self.conn) - return True, gen, self - - def execute_scalar(self, query, params=None, - formatted_exception_msg=False): - status, cur = self.__cursor() - self.row_count = 0 - - if not status: - return False, str(cur) - query_id = secrets.choice(range(1, 9999999)) - - dsn = self.conn.get_dsn_parameters() - current_app.logger.log( - 25, - "Execute (scalar) by {pga_user} on " - "{db_user}@{db_host}/{db_name} #{server_id} - " - "{conn_id} (Query-id: {query_id}):\n{query}".format( - pga_user=current_user.email, - db_user=dsn['user'], - db_host=dsn['host'], - db_name=dsn['dbname'], - server_id=self.manager.sid, - conn_id=self.conn_id, - query=query, - query_id=query_id - ) - ) - try: - self.__internal_blocking_execute(cur, query, params) - except psycopg2.Error as pe: - cur.close() - if not self.connected(): - if self.auto_reconnect and not self.reconnecting: - return self.__attempt_execution_reconnect( - self.execute_scalar, query, params, - formatted_exception_msg - ) - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - - if errmsg == "can\'t execute an empty query": - return True, {'rows': []} - current_app.logger.error( - "Failed to execute query (execute_scalar) for the server " - "#{server_id} - 
{conn_id} (Query-id: {query_id}):\n" - "Error Message:{errmsg}".format( - server_id=self.manager.sid, - conn_id=self.conn_id, - errmsg=errmsg, - query_id=query_id - ) - ) - return False, errmsg - - self.row_count = cur.rowcount - if cur.rowcount > 0: - res = cur.fetchone() - if len(res) > 0: - return True, res[0] - - return True, None - - def execute_async(self, query, params=None, formatted_exception_msg=True): - """ - This function executes the given query asynchronously and returns - result. - - Args: - query: SQL query to run. - params: extra parameters to the function - formatted_exception_msg: if True then function return the - formatted exception message - """ - - # Convert the params based on python_encoding - params = self.escape_params_sqlascii(params) - - self.__async_cursor = None - status, cur = self.__cursor() - - if not status: - return False, str(cur) - query_id = secrets.choice(range(1, 9999999)) - - encoding = self.python_encoding - - query = query.encode(encoding) - - dsn = self.conn.get_dsn_parameters() - current_app.logger.log( - 25, - "Execute (async) by {pga_user} on {db_user}@{db_host}/{db_name} " - "#{server_id} - {conn_id} (Query-id: " - "{query_id}):\n{query}".format( - pga_user=current_user.username, - db_user=dsn['user'], - db_host=dsn['host'], - db_name=dsn['dbname'], - server_id=self.manager.sid, - conn_id=self.conn_id, - query=query.decode(encoding), - query_id=query_id - ) - ) - - try: - self.__notices = [] - self.__notifies = [] - self.execution_aborted = False - cur.execute(query, params) - res = self._wait_timeout(cur.connection) - except psycopg2.Error as pe: - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - current_app.logger.error( - "Failed to execute query (execute_async) for the server " - "#{server_id} - {conn_id}(Query-id: {query_id}):\n" - "Error Message:{errmsg}".format( - server_id=self.manager.sid, - conn_id=self.conn_id, - errmsg=errmsg, - query_id=query_id - ) - ) - - # Check for the 
asynchronous notifies. - self.check_notifies() - - if self.is_disconnected(pe): - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - return False, errmsg - - self.__async_cursor = cur - self.__async_query_id = query_id - - return True, res - - def execute_void(self, query, params=None, formatted_exception_msg=False): - """ - This function executes the given query with no result. - - Args: - query: SQL query to run. - params: extra parameters to the function - formatted_exception_msg: if True then function return the - formatted exception message - """ - status, cur = self.__cursor() - - if not status: - return False, str(cur) - query_id = secrets.choice(range(1, 9999999)) - - dsn = self.conn.get_dsn_parameters() - current_app.logger.log( - 25, - "Execute (void) by {pga_user} on " - "{db_user}@{db_host}/{db_name} #{server_id} - " - "{conn_id} (Query-id: {query_id}):\n{query}".format( - pga_user=current_user.email, - db_user=dsn['user'], - db_host=dsn['host'], - db_name=dsn['dbname'], - server_id=self.manager.sid, - conn_id=self.conn_id, - query=query, - query_id=query_id - ) - ) - - try: - self.__internal_blocking_execute(cur, query, params) - except psycopg2.Error as pe: - cur.close() - if not self.connected(): - if self.auto_reconnect and not self.reconnecting: - return self.__attempt_execution_reconnect( - self.execute_void, query, params, - formatted_exception_msg - ) - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - current_app.logger.error( - "Failed to execute query (execute_void) for the server " - "#{server_id} - {conn_id}(Query-id: {query_id}):\n" - "Error Message:{errmsg}".format( - server_id=self.manager.sid, - conn_id=self.conn_id, - errmsg=errmsg, - query_id=query_id - ) - ) - return False, errmsg - - return True, None - - def 
__attempt_execution_reconnect(self, fn, *args, **kwargs): - self.reconnecting = True - setattr(g, self.ARGS_STR.format( - self.manager.sid, - self.conn_id.encode('utf-8') - ), None) - try: - status, res = self.connect() - if status: - if fn: - status, res = fn(*args, **kwargs) - self.reconnecting = False - return status, res - except Exception as e: - current_app.logger.exception(e) - self.reconnecting = False - - current_app.logger.warning( - "Failed to reconnect the database server " - "(Server #{server_id}, Connection #{conn_id})".format( - server_id=self.manager.sid, - conn_id=self.conn_id - ) - ) - self.reconnecting = False - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - - def execute_2darray(self, query, params=None, - formatted_exception_msg=False): - status, cur = self.__cursor() - self.row_count = 0 - - if not status: - return False, str(cur) - - query_id = secrets.choice(range(1, 9999999)) - dsn = self.conn.get_dsn_parameters() - current_app.logger.log( - 25, - "Execute (2darray) by {pga_user} on " - "{db_user}@{db_host}/{db_name} #{server_id} - " - "{conn_id} (Query-id: {query_id}):\n{query}".format( - pga_user=current_user.email, - db_user=dsn['user'], - db_host=dsn['host'], - db_name=dsn['dbname'], - server_id=self.manager.sid, - conn_id=self.conn_id, - query=query, - query_id=query_id - ) - ) - try: - self.__internal_blocking_execute(cur, query, params) - except psycopg2.Error as pe: - cur.close() - if not self.connected() and self.auto_reconnect and \ - not self.reconnecting: - return self.__attempt_execution_reconnect( - self.execute_2darray, query, params, - formatted_exception_msg - ) - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - current_app.logger.error( - "Failed to execute query (execute_2darray) for the server " - "#{server_id} - {conn_id} (Query-id: {query_id}):\n" - "Error Message:{errmsg}".format( - server_id=self.manager.sid, - 
conn_id=self.conn_id, - errmsg=errmsg, - query_id=query_id - ) - ) - return False, errmsg - - # Get Resultset Column Name, Type and size - columns = cur.description and [ - desc.to_dict() for desc in cur.ordered_description() - ] or [] - - rows = [] - self.row_count = cur.rowcount - if cur.rowcount > 0: - for row in cur: - rows.append(row) - - return True, {'columns': columns, 'rows': rows} - - def execute_dict(self, query, params=None, formatted_exception_msg=False): - status, cur = self.__cursor() - self.row_count = 0 - - if not status: - return False, str(cur) - query_id = secrets.choice(range(1, 9999999)) - dsn = self.conn.get_dsn_parameters() - current_app.logger.log( - 25, - "Execute (dict) by {pga_user} on " - "{db_user}@{db_host}/{db_name} #{server_id} - " - "{conn_id} (Query-id: {query_id}):\n{query}".format( - pga_user=current_user.email, - db_user=dsn['user'], - db_host=dsn['host'], - db_name=dsn['dbname'], - server_id=self.manager.sid, - conn_id=self.conn_id, - query=query, - query_id=query_id - ) - ) - try: - self.__internal_blocking_execute(cur, query, params) - except psycopg2.Error as pe: - cur.close() - if not self.connected(): - if self.auto_reconnect and not self.reconnecting: - return self.__attempt_execution_reconnect( - self.execute_dict, query, params, - formatted_exception_msg - ) - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] - ) - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - if errmsg == "can\'t execute an empty query": - return True, {'rows': []} - current_app.logger.error( - "Failed to execute query (execute_dict) for the server " - "#{server_id}- {conn_id} (Query-id: {query_id}):\n" - "Error Message:{errmsg}".format( - server_id=self.manager.sid, - conn_id=self.conn_id, - query_id=query_id, - errmsg=errmsg - ) - ) - return False, errmsg - - # Get Resultset Column Name, Type and size - columns = cur.description and [ - desc.to_dict() for desc in 
cur.ordered_description() - ] or [] - - rows = [] - self.row_count = cur.rowcount - if cur.rowcount > 0: - for row in cur: - rows.append(dict(row)) - - return True, {'columns': columns, 'rows': rows} - - def async_fetchmany_2darray(self, records=2000, - formatted_exception_msg=False): - """ - User should poll and check if status is ASYNC_OK before calling this - function - Args: - records: no of records to fetch. use -1 to fetchall. - formatted_exception_msg: - for_download: if True, will fetch all records and reset the cursor - - Returns: - - """ - cur = self.__async_cursor - if not cur: - return False, self.CURSOR_NOT_FOUND - - if self.conn.isexecuting(): - return False, gettext( - "Asynchronous query execution/operation underway." - ) - - if self.row_count > 0: - result = [] - # For DDL operation, we may not have result. - # - # Because - there is not direct way to differentiate DML and - # DDL operations, we need to rely on exception to figure - # that out at the moment. - try: - if records == -1: - res = cur.fetchall() - else: - res = cur.fetchmany(records) - for row in res: - new_row = [] - for col in self.column_info: - new_row.append(row[col['name']]) - result.append(new_row) - except psycopg2.ProgrammingError: - result = None - else: - # User performed operation which dose not produce record/s as - # result. - # for eg. DDL operations. - return True, None - - return True, result - - def connected(self): - if self.conn: - if not self.conn.closed: - return True - self.conn = None - return False - - def _decrypt_password(self, manager): - """ - Decrypt password - :param manager: Manager for get password. - :return: - """ - password = getattr(manager, 'password', None) - if password: - # Fetch Logged in User Details. 
- user = User.query.filter_by(id=current_user.id).first() - - if user is None: - return False, self.UNAUTHORIZED_REQUEST, password - - crypt_key_present, crypt_key = get_crypt_key() - if not crypt_key_present: - return False, crypt_key, password - - password = decrypt(password, crypt_key).decode() - return True, '', password - - def reset(self): - if self.conn and self.conn.closed: - self.conn = None - manager = self.manager - - is_return, return_value, password = self._decrypt_password(manager) - if is_return: - return False, return_value - - try: - with ConnectionLocker(manager.kerberos_conn): - # Create the connection string - connection_string = manager.create_connection_string( - self.db, manager.user, password) - - pg_conn = psycopg2.connect(connection_string) - - except psycopg2.Error as e: - if e.pgerror: - msg = e.pgerror - elif e.message: - msg = e.message - elif e.diag.message_detail: - msg = e.diag.message_detail - else: - msg = str(e) - - current_app.logger.error( - gettext( - """ -Failed to reset the connection to the server due to following error: -{0}""" - ).Format(msg) - ) - return False, msg - - pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH) - self.conn = pg_conn - self.__backend_pid = pg_conn.get_backend_pid() - - return True, None - - def transaction_status(self): - if self.conn: - return self.conn.get_transaction_status() - return None - - def ping(self): - return self.execute_scalar('SELECT 1') - - def _release(self): - if self.wasConnected: - if self.conn: - self.conn.close() - self.conn = None - self.password = None - self.wasConnected = False - - def _wait(self, conn): - """ - This function is used for the asynchronous connection, - it will call poll method in a infinite loop till poll - returns psycopg2.extensions.POLL_OK. This is a blocking - call. 
- - Args: - conn: connection object - """ - while True: - state = conn.poll() - if state == psycopg2.extensions.POLL_OK: - break - elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [conn.fileno()], [], - self.ASYNC_WAIT_TIMEOUT) - elif state == psycopg2.extensions.POLL_READ: - select.select([conn.fileno()], [], [], - self.ASYNC_WAIT_TIMEOUT) - else: - raise psycopg2.OperationalError( - "poll() returned %s from _wait function" % state) - - def _wait_timeout(self, conn): - """ - This function is used for the asynchronous connection, - it will call poll method and return the status. If state is - psycopg2.extensions.POLL_WRITE and psycopg2.extensions.POLL_READ - function will wait for the given timeout.This is not a blocking call. - - Args: - conn: connection object - time: wait time - """ - - while True: - state = conn.poll() - - if state == psycopg2.extensions.POLL_OK: - return self.ASYNC_OK - elif state == psycopg2.extensions.POLL_WRITE: - # Wait for the given time and then check the return status - # If three empty lists are returned then the time-out is - # reached. - timeout_status = select.select( - [], [conn.fileno()], [], self.ASYNC_TIMEOUT - ) - if timeout_status == ([], [], []): - return self.ASYNC_WRITE_TIMEOUT - elif state == psycopg2.extensions.POLL_READ: - # Wait for the given time and then check the return status - # If three empty lists are returned then the time-out is - # reached. - timeout_status = select.select( - [conn.fileno()], [], [], self.ASYNC_TIMEOUT - ) - if timeout_status == ([], [], []): - return self.ASYNC_READ_TIMEOUT - else: - raise psycopg2.OperationalError( - "poll() returned %s from _wait_timeout function" % state - ) - - def poll(self, formatted_exception_msg=False, no_result=False): - """ - This function is a wrapper around connection's poll function. - It internally uses the _wait_timeout method to poll the - result on the connection object. In case of success it - returns the result of the query. 
- - Args: - formatted_exception_msg: if True then function return the formatted - exception message, otherwise error string. - no_result: If True then only poll status will be returned. - """ - - cur = self.__async_cursor - if not cur: - return False, self.CURSOR_NOT_FOUND - - current_app.logger.log( - 25, - "Polling result for (Query-id: {query_id})".format( - query_id=self.__async_query_id - ) - ) - - is_error = False - try: - status = self._wait_timeout(self.conn) - except psycopg2.OperationalError as op_er: - errmsg = \ - self._formatted_exception_msg(op_er, formatted_exception_msg) - is_error = True - except psycopg2.Error as pe: - errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) - is_error = True - if self.conn.closed: - raise ConnectionLost( - self.manager.sid, - self.db, - self.conn_id[5:] - ) - except OSError as e: - # Bad File descriptor - if e.errno == 9: - raise ConnectionLost( - self.manager.sid, - self.db, - self.conn_id[5:] - ) - else: - raise e - - if self.conn.notices and self.__notices is not None: - self.__notices.extend(self.conn.notices) - self.conn.notices.clear() - - # Check for the asynchronous notifies. 
- self.check_notifies() - - # We also need to fetch notices before we return from function in case - # of any Exception, To avoid code duplication we will return after - # fetching the notices in case of any Exception - if is_error: - return False, errmsg - - result = None - self.row_count = 0 - self.column_info = None - - if status == self.ASYNC_OK: - - # if user has cancelled the transaction then changed the status - if self.execution_aborted: - status = self.ASYNC_EXECUTION_ABORTED - self.execution_aborted = False - return status, result - - # Fetch the column information - if cur.description is not None: - self.column_info = [ - desc.to_dict() for desc in cur.ordered_description() - ] - - pos = 0 - for col in self.column_info: - col['pos'] = pos - pos += 1 - - self.row_count = cur.rowcount - if not no_result and cur.rowcount > 0: - result = [] - # For DDL operation, we may not have result. - # - # Because - there is not direct way to differentiate DML - # and DDL operations, we need to rely on exception to - # figure that out at the moment. - try: - for row in cur: - new_row = [] - for col in self.column_info: - new_row.append(row[col['name']]) - result.append(new_row) - - except psycopg2.ProgrammingError: - result = None - - return status, result - - def status_message(self): - """ - This function will return the status message returned by the last - command executed on the server. - """ - cur = self.__async_cursor - if not cur: - return self.CURSOR_NOT_FOUND - - current_app.logger.log( - 25, - "Status message for (Query-id: {query_id})".format( - query_id=self.__async_query_id - ) - ) - - return cur.statusmessage - - def rows_affected(self): - """ - This function will return the no of rows affected by the last command - executed on the server. - """ - - return self.row_count - - def get_column_info(self): - """ - This function will returns list of columns for last async sql command - executed on the server. 
- """ - - return self.column_info - - def cancel_transaction(self, conn_id, did=None): - """ - This function is used to cancel the running transaction - of the given connection id and database id using - PostgreSQL's pg_cancel_backend. - - Args: - conn_id: Connection id - did: Database id (optional) - """ - cancel_conn = self.manager.connection(did=did, conn_id=conn_id) - query = """SELECT pg_cancel_backend({0});""".format( - cancel_conn.__backend_pid) - - status = True - msg = '' - - # if backend pid is same then create a new connection - # to cancel the query and release it. - if cancel_conn.__backend_pid == self.__backend_pid: - password = getattr(self.manager, 'password', None) - if password: - # Fetch Logged in User Details. - user = User.query.filter_by(id=current_user.id).first() - if user is None: - return False, self.UNAUTHORIZED_REQUEST - - crypt_key_present, crypt_key = get_crypt_key() - if not crypt_key_present: - return False, crypt_key - - password = decrypt(password, crypt_key)\ - .decode() - - try: - with ConnectionLocker(self.manager.kerberos_conn): - # Create the connection string - connection_string = \ - self.manager.create_connection_string( - self.db, self.manager.user, password) - - pg_conn = psycopg2.connect(connection_string) - - # Get the cursor and run the query - cur = pg_conn.cursor() - cur.execute(query) - - # Close the connection - pg_conn.close() - - except psycopg2.Error as e: - status = False - if e.pgerror: - msg = e.pgerror - elif e.diag.message_detail: - msg = e.diag.message_detail - else: - msg = str(e) - return status, msg - else: - if self.connected(): - status, msg = self.execute_void(query) - - if status: - cancel_conn.execution_aborted = True - else: - status = False - msg = gettext("Not connected to the database server.") - - return status, msg - - def messages(self): - """ - Returns the list of the messages/notices send from the database server. 
- """ - resp = [] - - if self.__notices is not None: - while self.__notices: - resp.append(self.__notices.pop(0)) - - if self.__notifies is None: - return resp - - for notify in self.__notifies: - if notify.payload is not None and notify.payload != '': - notify_msg = gettext( - "Asynchronous notification \"{0}\" with payload \"{1}\" " - "received from server process with PID {2}\n" - ).format(notify.channel, notify.payload, notify.pid) - - else: - notify_msg = gettext( - "Asynchronous notification \"{0}\" received from " - "server process with PID {1}\n" - ).format(notify.channel, notify.pid) - resp.append(notify_msg) - - return resp - - def _formatted_exception_msg(self, exception_obj, formatted_msg): - """ - This method is used to parse the psycopg2.Error object and returns the - formatted error message if flag is set to true else return - normal error message. - - Args: - exception_obj: exception object - formatted_msg: if True then function return the formatted exception - message - - """ - if exception_obj.pgerror: - errmsg = exception_obj.pgerror - elif exception_obj.diag.message_detail: - errmsg = exception_obj.diag.message_detail - else: - errmsg = str(exception_obj) - - # if formatted_msg is false then return from the function - if not formatted_msg: - notices = self.get_notices() - return errmsg if notices == '' else notices + '\n' + errmsg - - # Do not append if error starts with `ERROR:` as most pg related - # error starts with `ERROR:` - if not errmsg.startswith('ERROR:'): - errmsg = gettext('ERROR: ') + errmsg + '\n\n' - - if exception_obj.diag.severity is not None \ - and exception_obj.diag.message_primary is not None: - ex_diag_message = "{0}: {1}".format( - exception_obj.diag.severity, - exception_obj.diag.message_primary - ) - # If both errors are different then only append it - if errmsg and ex_diag_message and \ - ex_diag_message.strip().strip('\n').lower() not in \ - errmsg.strip().strip('\n').lower(): - errmsg += ex_diag_message - elif 
exception_obj.diag.message_primary is not None: - message_primary = exception_obj.diag.message_primary - if message_primary.lower() not in errmsg.lower(): - errmsg += message_primary - - if exception_obj.diag.sqlstate is not None: - if not errmsg.endswith('\n'): - errmsg += '\n' - errmsg += gettext('SQL state: ') - errmsg += exception_obj.diag.sqlstate - - if exception_obj.diag.message_detail is not None and \ - 'Detail:'.lower() not in errmsg.lower(): - if not errmsg.endswith('\n'): - errmsg += '\n' - errmsg += gettext('Detail: ') - errmsg += exception_obj.diag.message_detail - - if exception_obj.diag.message_hint is not None and \ - 'Hint:'.lower() not in errmsg.lower(): - if not errmsg.endswith('\n'): - errmsg += '\n' - errmsg += gettext('Hint: ') - errmsg += exception_obj.diag.message_hint - - if exception_obj.diag.statement_position is not None and \ - 'Character:'.lower() not in errmsg.lower(): - if not errmsg.endswith('\n'): - errmsg += '\n' - errmsg += gettext('Character: ') - errmsg += exception_obj.diag.statement_position - - if exception_obj.diag.context is not None and \ - 'Context:'.lower() not in errmsg.lower(): - if not errmsg.endswith('\n'): - errmsg += '\n' - errmsg += gettext('Context: ') - errmsg += exception_obj.diag.context - - notices = self.get_notices() - return errmsg if notices == '' else notices + '\n' + errmsg - - ##### - # As per issue reported on pgsycopg2 github repository link is shared below - # conn.closed is not reliable enough to identify the disconnection from the - # database server for some unknown reasons. - # - # (https://github.com/psycopg/psycopg2/issues/263) - # - # In order to resolve the issue, sqlalchamey follows the below logic to - # identify the disconnection. It relies on exception message to identify - # the error. 
- # - # Reference (MIT license): - # https://github.com/zzzeek/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/psycopg2.py - # - def is_disconnected(self, err): - if not self.conn.closed: - # checks based on strings. in the case that .closed - # didn't cut it, fall back onto these. - str_e = str(err).partition("\n")[0] - for msg in [ - # these error messages from libpq: interfaces/libpq/fe-misc.c - # and interfaces/libpq/fe-secure.c. - 'terminating connection', - 'closed the connection', - 'connection not open', - 'could not receive data from server', - 'could not send data to server', - # psycopg2 client errors, psycopg2/conenction.h, - # psycopg2/cursor.h - 'connection already closed', - 'cursor already closed', - # not sure where this path is originally from, it may - # be obsolete. It really says "losed", not "closed". - 'losed the connection unexpectedly', - # these can occur in newer SSL - 'connection has been closed unexpectedly', - 'SSL SYSCALL error: Bad file descriptor', - 'SSL SYSCALL error: EOF detected', - ]: - idx = str_e.find(msg) - if idx >= 0 and '"' not in str_e[:idx]: - return True - - return False - return True - - def check_notifies(self, required_polling=False): - """ - Check for the notify messages by polling the connection or after - execute is there in notifies. - """ - if self.conn and required_polling: - self.conn.poll() - - if self.conn and hasattr(self.conn, 'notifies') and \ - len(self.conn.notifies) > 0: - self.__notifies.extend(self.conn.notifies) - self.conn.notifies = [] - else: - self.__notifies = [] - - def get_notifies(self): - """ - This function will returns list of notifies received from database - server. - """ - notifies = None - # Convert list of Notify objects into list of Dict. 
- if self.__notifies is not None and len(self.__notifies) > 0: - notifies = [{'recorded_time': str(datetime.datetime.now()), - 'channel': notify.channel, - 'payload': notify.payload, - 'pid': notify.pid - } for notify in self.__notifies - ] - return notifies - - def get_notices(self): - """ - This function will returns the notices as string. - :return: - """ - notices = '' - # Check for notices. - if self.conn.notices and self.__notices is not None: - self.__notices.extend(self.conn.notices) - self.conn.notices.clear() - - while self.__notices: - notices += self.__notices.pop(0) - - return notices - - def pq_encrypt_password_conn(self, password, user): - """ - This function will return the encrypted password for database server - greater than or equal to 10 - :param password: password to be encrypted - :param user: user of the database server - :return: - """ - enc_password = None - if psycopg2.__libpq_version__ >= 100000 and \ - hasattr(psycopg2.extensions, 'encrypt_password'): - if self.connected(): - status, enc_algorithm = \ - self.execute_scalar("SHOW password_encryption") - if status: - enc_password = psycopg2.extensions.encrypt_password( - password=password, user=user, scope=self.conn, - algorithm=enc_algorithm - ) - elif psycopg2.__libpq_version__ < 100000: - current_app.logger.warning( - "To encrypt passwords the required libpq version is " - "greater than or equal to 100000. Current libpq version " - "is {curr_ver}".format( - curr_ver=psycopg2.__libpq_version__ - ) - ) - elif not hasattr(psycopg2.extensions, 'encrypt_password'): - current_app.logger.warning( - "The psycopg2.extensions module does not have the" - "'encrypt_password' method." 
- ) - - return enc_password - - def mogrify(self, query, parameters): - """ - This function will return the sql query after parameters binding - :param query: sql query before parameters (variables) binding - :param parameters: query parameters / variables - :return: - """ - status, cursor = self.__cursor() - if not status: - return None - else: - - if parameters: - mogrified_sql = cursor.mogrify(query, parameters) - if isinstance(mogrified_sql, bytes): - mogrified_sql = mogrified_sql.decode(self.python_encoding) - return mogrified_sql - else: - return query diff --git a/web/pgadmin/utils/driver/psycopg2/cursor.py b/web/pgadmin/utils/driver/psycopg2/cursor.py deleted file mode 100644 index f6104e1c6..000000000 --- a/web/pgadmin/utils/driver/psycopg2/cursor.py +++ /dev/null @@ -1,235 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -""" -Implementation of an extended cursor, which returns ordered dictionary when -fetching results from it, and also takes care of the duplicate column name in -result. -""" - - -from collections import OrderedDict -import psycopg2 -from psycopg2.extensions import cursor as _cursor, encodings -from .encoding import configure_driver_encodings - -configure_driver_encodings(encodings) - - -class _WrapperColumn(): - """ - class _WrapperColumn() - - A wrapper class, which wraps the individual description column object, - to allow identify the duplicate column name, created by PostgreSQL database - server implicitly during query execution. - - Methods: - ------- - * __init__(_col, _name) - - Initialize the wrapper around the description column object, which will - present the dummy name when available instead of the duplicate name. 
- - * __getattribute__(name) - - Get attributes from the original column description (which is a named - tuple) except for few of the attributes of this object (i.e. orig_col, - dummy_name, __class__, to_dict) are part of this object. - - * __getitem__(idx) - - Get the item from the original object except for the 0th index item, - which is for 'name'. - - * __setitem__(idx, value) - * __delitem__(idx) - - Override them to make the operations on original object. - - * to_dict() - - Converts original objects data as OrderedDict (except the name will same - as dummy name (if available), and one more parameter as 'display_name'. - """ - - def __init__(self, _col, _name): - """Initializer for _WrapperColumn""" - self.orig_col = _col - self.dummy_name = _name - - def __getattribute__(self, name): - """Getting the attributes from the original object. (except few)""" - if (name == 'orig_col' or name == 'dummy_name' or - name == '__class__' or name == 'to_dict'): - return object.__getattribute__(self, name) - elif name == 'name': - res = object.__getattribute__(self, 'dummy_name') - if res is not None: - return res - return self.orig_col.__getattribute__(name) - - def __getitem__(self, idx): - """Overrides __getitem__ to fetch item from original object""" - if idx == 0 and self.dummy_name is not None: - return self.dummy_name - return self.orig_col.__getitem__(idx) - - def __setitem__(self, *args, **kwargs): - """Orverrides __setitem__ to do the operations on original object.""" - return self.orig_col.__setitem__(*args, **kwargs) - - def __delitem__(self, *args, **kwargs): - """Orverrides __delitem__ to do the operations on original object.""" - return self.orig_col.__delitem__(*args, **kwargs) - - def to_dict(self): - """ - Generates an OrderedDict from the fields of the original objects - with avoiding the duplicate name. - """ - - # In psycopg2 2.8, the description of one result column, - # exposed as items of the cursor.description sequence. 
- # Before psycopg2 2.8 the description attribute was a sequence - # of simple tuples or namedtuples. - ores = OrderedDict() - ores['name'] = self.orig_col.name - ores['type_code'] = self.orig_col.type_code - ores['display_size'] = self.orig_col.display_size - ores['internal_size'] = self.orig_col.internal_size - ores['precision'] = self.orig_col.precision - ores['scale'] = self.orig_col.scale - ores['null_ok'] = self.orig_col.null_ok - ores['table_oid'] = self.orig_col.table_oid - ores['table_column'] = self.orig_col.table_column - - name = ores['name'] - if self.dummy_name: - ores['name'] = self.dummy_name - ores['display_name'] = name - return ores - - -class DictCursor(_cursor): - """ - DictCursor - - A class to generate the dictionary from the tuple, and also takes care of - the duplicate column name in result description. - - Methods: - ------- - * __init__() - - Initialize the cursor object - - * _dict_tuple(tuple) - - Generate a dictionary object from a tuple, based on the column - description. - - * _ordered_description() - - Generates the _WrapperColumn object from the description column, and - identifies duplicate column name - """ - - def __init__(self, *args, **kwargs): - """ - Initialize the cursor object. - """ - self._odt_desc = None - _cursor.__init__(self, *args, **kwargs) - - def _dict_tuple(self, tup): - """ - Transform the tuple into a dictionary object. - """ - if self._odt_desc is None: - self._ordered_description() - return dict((k[0], v) for k, v in zip(self._odt_desc, tup)) - - def _ordered_description(self): - """ - Transform the regular description to wrapper object, which handles - duplicate column name. 
- """ - self._odt_desc = _cursor.__getattribute__(self, 'description') - desc = self._odt_desc - - if desc is None or len(desc) == 0: - return - - res = list() - od = dict((d[0], 0) for d in desc) - for d in desc: - dummy = None - idx = od[d.name] - if idx == 0: - od[d.name] = 1 - else: - name = d.name - while name in od: - idx += 1 - name = ("%s-%s" % (d.name, idx)) - od[d.name] = idx - dummy = name - res.append(_WrapperColumn(d, dummy)) - self._odt_desc = tuple(res) - - def ordered_description(self): - """ - Use this to fetch the description - """ - if self._odt_desc is None: - self._ordered_description() - return self._odt_desc - - def execute(self, query, params=None): - """ - Execute function - """ - self._odt_desc = None - if params is not None and len(params) == 0: - params = None - - return _cursor.execute(self, query, params) - - def executemany(self, query, params=None): - """ - Execute many function of regular cursor. - """ - self._odt_desc = None - return _cursor.executemany(self, query, params) - - def callproc(self, proname, params=None): - """ - Call a procedure by a name. - """ - self._odt_desc = None - return _cursor.callproc(self, proname, params) - - def fetchmany(self, size=None): - """ - Fetch many tuples as ordered dictionary list. - """ - tuples = _cursor.fetchmany(self, size) - if tuples is not None: - return [self._dict_tuple(t) for t in tuples] - return None - - def fetchall(self): - """ - Fetch all tuples as ordered dictionary list. 
- """ - tuples = _cursor.fetchall(self) - if tuples is not None: - return [self._dict_tuple(t) for t in tuples] - - def __iter__(self): - it = _cursor.__iter__(self) - try: - yield self._dict_tuple(next(it)) - while True: - yield self._dict_tuple(next(it)) - except StopIteration: - pass diff --git a/web/pgadmin/utils/driver/psycopg2/encoding.py b/web/pgadmin/utils/driver/psycopg2/encoding.py deleted file mode 100644 index 99e0fb08b..000000000 --- a/web/pgadmin/utils/driver/psycopg2/encoding.py +++ /dev/null @@ -1,83 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -# Get Postgres and Python encoding - -encode_dict = { - 'SQL_ASCII': ['SQL_ASCII', 'raw_unicode_escape', 'unicode_escape'], - 'SQLASCII': ['SQL_ASCII', 'raw_unicode_escape', 'unicode_escape'], - 'MULE_INTERNAL': ['MULE_INTERNAL', 'raw_unicode_escape', 'unicode_escape'], - 'MULEINTERNAL': ['MULEINTERNAL', 'raw_unicode_escape', 'unicode_escape'], - 'LATIN1': ['LATIN1', 'latin1', 'latin1'], - 'LATIN2': ['LATIN2', 'latin2', 'latin2'], - 'LATIN3': ['LATIN3', 'latin3', 'latin3'], - 'LATIN4': ['LATIN4', 'latin4', 'latin4'], - 'LATIN5': ['LATIN5', 'latin5', 'latin5'], - 'LATIN6': ['LATIN6', 'latin6', 'latin6'], - 'LATIN7': ['LATIN7', 'latin7', 'latin7'], - 'LATIN8': ['LATIN8', 'latin8', 'latin8'], - 'LATIN9': ['LATIN9', 'latin9', 'latin9'], - 'LATIN10': ['LATIN10', 'latin10', 'latin10'], - 'WIN866': ['WIN866', 'cp866', 'cp866'], - 'WIN874': ['WIN874', 'cp874', 'cp874'], - 'WIN1250': ['WIN1250', 'cp1250', 'cp1250'], - 'WIN1251': ['WIN1251', 'cp1251', 'cp1251'], - 'WIN1252': ['WIN1252', 'cp1252', 'cp1252'], - 'WIN1253': ['WIN1253', 'cp1253', 'cp1253'], - 'WIN1254': ['WIN1254', 'cp1254', 'cp1254'], - 'WIN1255': ['WIN1255', 'cp1255', 'cp1255'], - 
'WIN1256': ['WIN1256', 'cp1256', 'cp1256'], - 'WIN1257': ['WIN1257', 'cp1257', 'cp1257'], - 'WIN1258': ['WIN1258', 'cp1258', 'cp1258'], - 'EUC_JIS_2004': ['EUC_JIS_2004', 'eucjis2004', 'eucjis2004'], - 'EUCJIS2004': ['EUCJIS2004', 'eucjis2004', 'eucjis2004'], - 'EUC_CN': ['EUC_CN', 'euc-cn', 'euc-cn'], - 'EUCCN': ['EUCCN', 'euc-cn', 'euc-cn'], - 'EUC_JP': ['EUC_JP', 'euc_jp', 'euc_jp'], - 'EUCJP': ['EUCJP', 'euc_jp', 'euc_jp'], - 'EUC_KR': ['EUC_KR', 'euc_kr', 'euc_kr'], - 'EUCKR': ['EUCKR', 'euc_kr', 'euc_kr'], - 'EUC_TW': ['BIG5', 'big5', 'big5'], - 'EUCTW': ['BIG5', 'big5', 'big5'], - 'ISO_8859_5': ['ISO_8859_5', 'iso8859_5', 'iso8859_5'], - 'ISO88595': ['ISO88595', 'iso8859_5', 'iso8859_5'], - 'ISO_8859_6': ['ISO_8859_6', 'iso8859_6', 'iso8859_6'], - 'ISO88596': ['ISO88596', 'iso8859_6', 'iso8859_6'], - 'ISO_8859_7': ['ISO_8859_7', 'iso8859_7', 'iso8859_7'], - 'ISO88597': ['ISO88597', 'iso8859_7', 'iso8859_7'], - 'ISO_8859_8': ['ISO_8859_8', 'iso8859_8', 'iso8859_8'], - 'ISO88598': ['ISO88598', 'iso8859_8', 'iso8859_8'], - 'KOI8R': ['KOI8R', 'koi8_r', 'koi8_r'], - 'KOI8U': ['KOI8U', 'koi8_u', 'koi8_u'], - -} - - -def get_encoding(key): - """ - :param key: Database Encoding - :return: - [Postgres_encoding, Python_encoding, typecast_encoding] - - Postgres encoding, Python encoding, type cast encoding - """ - # - # Reference: https://www.postgresql.org/docs/11/multibyte.html - - return encode_dict.get(key, ['UNICODE', 'utf-8', 'utf-8']) - - -def configure_driver_encodings(encodings): - # Replace the python encoding for original name and renamed encodings - # psycopg2 removes the underscore in conn.encoding - # Setting the encodings dict value will only help for select statements - # because for parameterized DML, param values are converted based on - # python encoding of pyscopg2s internal encodings dict. 
- for key, val in encode_dict.items(): - postgres_encoding, python_encoding, typecast_encoding = val - encodings[key] = python_encoding diff --git a/web/pgadmin/utils/driver/psycopg2/generate_keywords.py b/web/pgadmin/utils/driver/psycopg2/generate_keywords.py deleted file mode 100644 index 775466124..000000000 --- a/web/pgadmin/utils/driver/psycopg2/generate_keywords.py +++ /dev/null @@ -1,63 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -# This allows us to generate to keywords.py for PostgreSQL for used by -# qtIdent and qtTypeIdent functions for scanning the keywords type. -# -# In order to generate keywords.py for specific version of PostgreSQL, put -# pg_config executable in the PATH. -# -########################################################################## - -import os -import re - -if __name__ == '__main__': - include_dir = os.popen('pg_config --includedir').read().rstrip() - version = os.popen('pg_config --version').read().rstrip() - - keywords_file = open('keywords.py', 'w') - - keywords_file.write("""########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## -""") - keywords_file.write('# ScanKeyword function for ' + version) - keywords_file.write('\n\ndef ScanKeyword(key):') - keywords_file.write('\n keywordDict = {\n') - - idx = 0 - - with open(include_dir + "/postgresql/server/parser/kwlist.h", "rb") as ins: - - pattern = re.compile(r'"([^"]+)",\s*[^,]*\s*,\s*(.*)$') - keyword_types = [ - 'UNRESERVED_KEYWORD', 'COL_NAME_KEYWORD', - 'TYPE_FUNC_NAME_KEYWORD', 'RESERVED_KEYWORD' - ] - - for line in ins: - line = 
line.decode().rstrip() - if line[0:11] == 'PG_KEYWORD(' and line[-1] == ')': - match = pattern.match(line[11:-1]) - if idx != 0: - keywords_file.write(", ") - else: - keywords_file.write(" ") - keywords_file.write( - '"' + match.group(1) + '": ' + - str(keyword_types.index(match.group(2))) - ) - idx += 1 - keywords_file.write('\n }\n') - keywords_file.write( - ' return (key in keywordDict and keywordDict[key]) or None') diff --git a/web/pgadmin/utils/driver/psycopg2/keywords.py b/web/pgadmin/utils/driver/psycopg2/keywords.py deleted file mode 100644 index 6b3324186..000000000 --- a/web/pgadmin/utils/driver/psycopg2/keywords.py +++ /dev/null @@ -1,432 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -# ScanKeyword function for PostgreSQL 9.5rc1 - - -def scan_keyword(key): - keywords = { - 'abort': 0, - 'absolute': 0, - 'access': 0, - 'action': 0, - 'add': 0, - 'admin': 0, - 'after': 0, - 'aggregate': 0, - 'all': 3, - 'also': 0, - 'alter': 0, - 'always': 0, - 'analyze': 3, - 'and': 3, - 'any': 3, - 'array': 3, - 'as': 3, - 'asc': 3, - 'assertion': 0, - 'assignment': 0, - 'asymmetric': 3, - 'at': 0, - 'attribute': 0, - 'authorization': 2, - 'backward': 0, - 'before': 0, - 'begin': 0, - 'between': 1, - 'bigint': 1, - 'binary': 2, - 'bit': 1, - 'boolean': 1, - 'both': 3, - 'by': 0, - 'cache': 0, - 'called': 0, - 'cascade': 0, - 'cascaded': 0, - 'case': 3, - 'cast': 3, - 'catalog': 0, - 'chain': 0, - 'char': 1, - 'character': 1, - 'characteristics': 0, - 'check': 3, - 'checkpoint': 0, - 'class': 0, - 'close': 0, - 'cluster': 0, - 'coalesce': 1, - 'collate': 3, - 'collation': 2, - 'column': 3, - 'comment': 0, - 'comments': 0, - 'commit': 0, - 'committed': 0, - 'concurrently': 2, - 'configuration': 0, - 
'conflict': 0, - 'connection': 0, - 'constraint': 3, - 'constraints': 0, - 'content': 0, - 'continue': 0, - 'conversion': 0, - 'copy': 0, - 'cost': 0, - 'create': 3, - 'cross': 2, - 'csv': 0, - 'cube': 0, - 'current': 0, - 'current_catalog': 3, - 'current_date': 3, - 'current_role': 3, - 'current_schema': 2, - 'current_time': 3, - 'current_timestamp': 3, - 'current_user': 3, - 'cursor': 0, - 'cycle': 0, - 'data': 0, - 'database': 0, - 'day': 0, - 'deallocate': 0, - 'dec': 1, - 'decimal': 1, - 'declare': 0, - 'default': 3, - 'defaults': 0, - 'deferrable': 3, - 'deferred': 0, - 'definer': 0, - 'delete': 0, - 'delimiter': 0, - 'delimiters': 0, - 'desc': 3, - 'dictionary': 0, - 'disable': 0, - 'discard': 0, - 'distinct': 3, - 'do': 3, - 'document': 0, - 'domain': 0, - 'double': 0, - 'drop': 0, - 'each': 0, - 'else': 3, - 'enable': 0, - 'encoding': 0, - 'encrypted': 0, - 'end': 3, - 'enum': 0, - 'escape': 0, - 'event': 0, - 'except': 3, - 'exclude': 0, - 'excluding': 0, - 'exclusive': 0, - 'execute': 0, - 'exists': 1, - 'explain': 0, - 'extension': 0, - 'external': 0, - 'extract': 1, - 'false': 3, - 'family': 0, - 'fetch': 3, - 'filter': 0, - 'first': 0, - 'float': 1, - 'following': 0, - 'for': 3, - 'force': 0, - 'foreign': 3, - 'forward': 0, - 'freeze': 2, - 'from': 3, - 'full': 2, - 'function': 0, - 'functions': 0, - 'global': 0, - 'grant': 3, - 'granted': 0, - 'greatest': 1, - 'group': 3, - 'grouping': 1, - 'handler': 0, - 'having': 3, - 'header': 0, - 'hold': 0, - 'hour': 0, - 'identity': 0, - 'if': 0, - 'ilike': 2, - 'immediate': 0, - 'immutable': 0, - 'implicit': 0, - 'import': 0, - 'in': 3, - 'including': 0, - 'increment': 0, - 'index': 0, - 'indexes': 0, - 'inherit': 0, - 'inherits': 0, - 'initially': 3, - 'inline': 0, - 'inner': 2, - 'inout': 1, - 'input': 0, - 'insensitive': 0, - 'insert': 0, - 'instead': 0, - 'int': 1, - 'integer': 1, - 'intersect': 3, - 'interval': 1, - 'into': 3, - 'invoker': 0, - 'is': 2, - 'isnull': 2, - 'isolation': 0, - 'join': 2, - 
'key': 0, - 'label': 0, - 'language': 0, - 'large': 0, - 'last': 0, - 'lateral': 3, - 'leading': 3, - 'leakproof': 0, - 'least': 1, - 'left': 2, - 'level': 0, - 'like': 2, - 'limit': 3, - 'listen': 0, - 'load': 0, - 'local': 0, - 'localtime': 3, - 'localtimestamp': 3, - 'location': 0, - 'lock': 0, - 'locked': 0, - 'logged': 0, - 'mapping': 0, - 'match': 0, - 'materialized': 0, - 'maxvalue': 0, - 'minute': 0, - 'minvalue': 0, - 'mode': 0, - 'month': 0, - 'move': 0, - 'name': 0, - 'names': 0, - 'national': 1, - 'natural': 2, - 'nchar': 1, - 'next': 0, - 'no': 0, - 'none': 1, - 'not': 3, - 'nothing': 0, - 'notify': 0, - 'notnull': 2, - 'nowait': 0, - 'null': 3, - 'nullif': 1, - 'nulls': 0, - 'numeric': 1, - 'object': 0, - 'of': 0, - 'off': 0, - 'offset': 3, - 'oids': 0, - 'on': 3, - 'only': 3, - 'operator': 0, - 'option': 0, - 'options': 0, - 'or': 3, - 'order': 3, - 'ordinality': 0, - 'out': 1, - 'outer': 2, - 'over': 0, - 'overlaps': 2, - 'overlay': 1, - 'owned': 0, - 'owner': 0, - 'parser': 0, - 'partial': 0, - 'partition': 0, - 'passing': 0, - 'password': 0, - 'placing': 3, - 'plans': 0, - 'policy': 0, - 'position': 1, - 'preceding': 0, - 'precision': 1, - 'prepare': 0, - 'prepared': 0, - 'preserve': 0, - 'primary': 3, - 'prior': 0, - 'privileges': 0, - 'procedural': 0, - 'procedure': 0, - 'program': 0, - 'quote': 0, - 'range': 0, - 'read': 0, - 'real': 1, - 'reassign': 0, - 'recheck': 0, - 'recursive': 0, - 'ref': 0, - 'references': 3, - 'refresh': 0, - 'reindex': 0, - 'relative': 0, - 'release': 0, - 'rename': 0, - 'repeatable': 0, - 'replace': 0, - 'replica': 0, - 'reset': 0, - 'restart': 0, - 'restrict': 0, - 'returning': 3, - 'returns': 0, - 'revoke': 0, - 'right': 2, - 'role': 0, - 'rollback': 0, - 'rollup': 0, - 'row': 1, - 'rows': 0, - 'rule': 0, - 'savepoint': 0, - 'schema': 0, - 'scroll': 0, - 'search': 0, - 'second': 0, - 'security': 0, - 'select': 3, - 'sequence': 0, - 'sequences': 0, - 'serializable': 0, - 'server': 0, - 'session': 0, - 
'session_user': 3, - 'set': 0, - 'setof': 1, - 'sets': 0, - 'share': 0, - 'show': 0, - 'similar': 2, - 'simple': 0, - 'skip': 0, - 'smallint': 1, - 'snapshot': 0, - 'some': 3, - 'sql': 0, - 'stable': 0, - 'standalone': 0, - 'start': 0, - 'statement': 0, - 'statistics': 0, - 'stdin': 0, - 'stdout': 0, - 'storage': 0, - 'strict': 0, - 'strip': 0, - 'substring': 1, - 'symmetric': 3, - 'sysid': 0, - 'system': 0, - 'table': 3, - 'tables': 0, - 'tablesample': 2, - 'tablespace': 0, - 'temp': 0, - 'template': 0, - 'temporary': 0, - 'text': 0, - 'then': 3, - 'time': 1, - 'timestamp': 1, - 'to': 3, - 'trailing': 3, - 'transaction': 0, - 'transform': 0, - 'treat': 1, - 'trigger': 0, - 'trim': 1, - 'true': 3, - 'truncate': 0, - 'trusted': 0, - 'type': 0, - 'types': 0, - 'unbounded': 0, - 'uncommitted': 0, - 'unencrypted': 0, - 'union': 3, - 'unique': 3, - 'unknown': 0, - 'unlisten': 0, - 'unlogged': 0, - 'until': 0, - 'update': 0, - 'user': 3, - 'using': 3, - 'vacuum': 0, - 'valid': 0, - 'validate': 0, - 'validator': 0, - 'value': 0, - 'values': 1, - 'varchar': 1, - 'variadic': 3, - 'varying': 0, - 'verbose': 2, - 'version': 0, - 'view': 0, - 'views': 0, - 'volatile': 0, - 'when': 3, - 'where': 3, - 'whitespace': 0, - 'window': 3, - 'with': 3, - 'within': 0, - 'without': 0, - 'work': 0, - 'wrapper': 0, - 'write': 0, - 'xml': 0, - 'xmlattributes': 1, - 'xmlconcat': 1, - 'xmlelement': 1, - 'xmlexists': 1, - 'xmlforest': 1, - 'xmlparse': 1, - 'xmlpi': 1, - 'xmlroot': 1, - 'xmlserialize': 1, - 'year': 0, - 'yes': 0, - 'zone': 0, - } - - return keywords.get(key, None) diff --git a/web/pgadmin/utils/driver/psycopg2/server_manager.py b/web/pgadmin/utils/driver/psycopg2/server_manager.py deleted file mode 100644 index 300d05f4a..000000000 --- a/web/pgadmin/utils/driver/psycopg2/server_manager.py +++ /dev/null @@ -1,677 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin 
Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -""" -Implementation of ServerManager -""" -import os -import datetime -import config -import logging -from flask import current_app, session -from flask_security import current_user -from flask_babel import gettext -from werkzeug.exceptions import InternalServerError - -from pgadmin.utils import get_complete_file_path -from pgadmin.utils.crypto import decrypt -from pgadmin.utils.master_password import process_masterpass_disabled -from .connection import Connection -from pgadmin.model import Server, User -from pgadmin.utils.exception import ConnectionLost, SSHTunnelConnectionLost,\ - CryptKeyMissing -from pgadmin.utils.master_password import get_crypt_key -from pgadmin.utils.exception import ObjectGone -from pgadmin.utils.passexec import PasswordExec -from psycopg2.extensions import make_dsn - -if config.SUPPORT_SSH_TUNNEL: - from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError - - -class ServerManager(): - """ - class ServerManager - - This class contains the information about the given server. - And, acts as connection manager for that particular session. 
- """ - _INFORMATION_MSG = gettext("Information is not available.") - - def __init__(self, server): - self.connections = dict() - self.local_bind_host = '127.0.0.1' - self.local_bind_port = None - self.tunnel_object = None - self.tunnel_created = False - self.display_connection_string = '' - - self.update(server) - - def update(self, server): - assert server is not None - assert isinstance(server, Server) - - self.ver = None - self.sversion = None - self.server_type = None - self.server_cls = None - self.password = None - self.tunnel_password = None - - self.sid = server.id - self.host = server.host - self.port = server.port - self.db = server.maintenance_db - self.shared = server.shared - self.did = None - self.user = server.username - self.password = server.password - self.role = server.role - self.pinged = datetime.datetime.now() - self.db_info = dict() - self.server_types = None - self.db_res = server.db_res - self.passexec = \ - PasswordExec(server.passexec_cmd, server.passexec_expiration) \ - if server.passexec_cmd else None - self.service = server.service - if config.SUPPORT_SSH_TUNNEL: - self.use_ssh_tunnel = server.use_ssh_tunnel - self.tunnel_host = server.tunnel_host - self.tunnel_port = \ - 22 if server.tunnel_port is None else server.tunnel_port - self.tunnel_username = server.tunnel_username - self.tunnel_authentication = 0 \ - if server.tunnel_authentication is None \ - else server.tunnel_authentication - self.tunnel_identity_file = server.tunnel_identity_file - self.tunnel_password = server.tunnel_password - else: - self.use_ssh_tunnel = 0 - self.tunnel_host = None - self.tunnel_port = 22 - self.tunnel_username = None - self.tunnel_authentication = None - self.tunnel_identity_file = None - self.tunnel_password = None - - self.kerberos_conn = server.kerberos_conn - self.gss_authenticated = False - self.gss_encrypted = False - self.connection_params = server.connection_params - self.create_connection_string(self.db, self.user) - - for con in 
self.connections: - self.connections[con]._release() - - self.update_session() - - self.connections = dict() - - def _set_password(self, res): - """ - Set password for server manager object. - :param res: response dict. - :return: - """ - if hasattr(self, 'password') and self.password: - if hasattr(self.password, 'decode'): - res['password'] = self.password.decode('utf-8') - else: - res['password'] = str(self.password) - else: - res['password'] = self.password - - def as_dict(self): - """ - Returns a dictionary object representing the server manager. - """ - if self.ver is None or len(self.connections) == 0: - return None - - res = dict() - res['sid'] = self.sid - res['ver'] = self.ver - res['sversion'] = self.sversion - - self._set_password(res) - - if self.use_ssh_tunnel: - if hasattr(self, 'tunnel_password') and self.tunnel_password: - if hasattr(self.tunnel_password, 'decode'): - res['tunnel_password'] = \ - self.tunnel_password.decode('utf-8') - else: - res['tunnel_password'] = str(self.tunnel_password) - else: - res['tunnel_password'] = self.tunnel_password - - connections = res['connections'] = dict() - - for conn_id in self.connections: - conn = self.connections[conn_id].as_dict() - - if conn is not None: - connections[conn_id] = conn - - return res - - def server_version(self): - return self.ver - - @property - def version(self): - return self.sversion - - def major_version(self): - if self.sversion is not None: - return int(self.sversion / 10000) - raise InternalServerError(self._INFORMATION_MSG) - - def minor_version(self): - if self.sversion: - return int(int(self.sversion / 100) % 100) - raise InternalServerError(self._INFORMATION_MSG) - - def patch_version(self): - if self.sversion: - return int(int(self.sversion / 100) / 100) - raise InternalServerError(self._INFORMATION_MSG) - - def connection(self, **kwargs): - database = kwargs.get('database', None) - conn_id = kwargs.get('conn_id', None) - auto_reconnect = kwargs.get('auto_reconnect', True) - did 
= kwargs.get('did', None) - async_ = kwargs.get('async_', None) - use_binary_placeholder = kwargs.get('use_binary_placeholder', False) - array_to_string = kwargs.get('array_to_string', False) - - if database is not None: - if did is not None and did in self.db_info: - self.db_info[did]['datname'] = database - else: - if did is None: - database = self.db - elif did in self.db_info: - database = self.db_info[did]['datname'] - else: - maintenance_db_id = 'DB:{0}'.format(self.db) - if maintenance_db_id in self.connections: - conn = self.connections[maintenance_db_id] - # try to connect maintenance db if not connected - if not conn.connected(): - conn.connect() - - if conn.connected(): - status, res = conn.execute_dict(""" -SELECT - db.oid as did, db.datname, db.datallowconn, - pg_catalog.pg_encoding_to_char(db.encoding) AS serverencoding, - pg_catalog.has_database_privilege(db.oid, 'CREATE') as cancreate, - datistemplate -FROM - pg_catalog.pg_database db -WHERE db.oid = {0}""".format(did)) - - if status and len(res['rows']) > 0: - for row in res['rows']: - self.db_info[did] = row - database = self.db_info[did]['datname'] - - if did not in self.db_info: - raise ObjectGone(gettext( - "Could not find the specified database." - )) - - if not get_crypt_key()[0]: - # the reason its not connected might be missing key - raise CryptKeyMissing() - - if database is None: - # Check SSH Tunnel is alive or not. 
- if self.use_ssh_tunnel == 1: - self.check_ssh_tunnel_alive() - else: - raise ConnectionLost(self.sid, None, None) - - my_id = ('CONN:{0}'.format(conn_id)) if conn_id is not None else \ - ('DB:{0}'.format(database)) - - self.pinged = datetime.datetime.now() - - if my_id in self.connections: - return self.connections[my_id] - else: - if async_ is None: - async_ = 1 if conn_id is not None else 0 - else: - async_ = 1 if async_ is True else 0 - self.connections[my_id] = Connection( - self, my_id, database, auto_reconnect=auto_reconnect, - async_=async_, - use_binary_placeholder=use_binary_placeholder, - array_to_string=array_to_string - ) - - return self.connections[my_id] - - @staticmethod - def _get_password_to_conn(data, masterpass_processed): - """ - Get password for connect to server with simple and ssh connection. - :param data: Data. - :param masterpass_processed: - :return: - """ - # The data variable is a copy so is not automatically synced - # update here - if masterpass_processed and 'password' in data: - data['password'] = None - if masterpass_processed and 'tunnel_password' in data: - data['tunnel_password'] = None - - def _get_server_type(self): - """ - Get server type and server cls. - :return: - """ - from pgadmin.browser.server_groups.servers.types import ServerType - - if self.ver and not self.server_type: - for st in ServerType.types(): - if st.instance_of(self.ver): - self.server_type = st.stype - self.server_cls = st - break - - def _check_and_reconnect_server(self, conn, conn_info, data): - """ - Check and try to reconnect the server if server previously connected - and auto_reconnect is true. 
- :param conn: - :type conn: - :param conn_info: - :type conn_info: - :param data: - :type data: - :return: - :rtype: - """ - from pgadmin.browser.server_groups.servers.types import ServerType - if conn_info['wasConnected'] and conn_info['auto_reconnect']: - try: - # Check SSH Tunnel needs to be created - if self.use_ssh_tunnel == 1 and \ - not self.tunnel_created: - status, error = self.create_ssh_tunnel( - data['tunnel_password']) - - # Check SSH Tunnel is alive or not. - self.check_ssh_tunnel_alive() - - conn.connect( - password=data['password'], - server_types=ServerType.types() - ) - # This will also update wasConnected flag in - # connection so no need to update the flag manually. - except CryptKeyMissing: - # maintain the status as this will help to restore once - # the key is available - conn.wasConnected = conn_info['wasConnected'] - conn.auto_reconnect = conn_info['auto_reconnect'] - except Exception as e: - current_app.logger.exception(e) - self.connections.pop(conn_info['conn_id']) - raise - - def _restore(self, data): - """ - Helps restoring to reconnect the auto-connect connections smoothly on - reload/restart of the app server.. - """ - # restore server version from flask session if flask server was - # restarted. As we need server version to resolve sql template paths. - masterpass_processed = process_masterpass_disabled() - - ServerManager._get_password_to_conn(data, masterpass_processed) - # Get server type. - self._get_server_type() - - # We need to know about the existing server variant supports during - # first connection for identifications. 
- self.pinged = datetime.datetime.now() - try: - if 'password' in data and data['password'] and \ - hasattr(data['password'], 'encode'): - data['password'] = data['password'].encode('utf-8') - if 'tunnel_password' in data and data['tunnel_password']: - data['tunnel_password'] = \ - data['tunnel_password'].encode('utf-8') - except Exception as e: - current_app.logger.exception(e) - - connections = data['connections'] - - for conn_id in connections: - conn_info = connections[conn_id] - if conn_info['conn_id'] in self.connections: - conn = self.connections[conn_info['conn_id']] - else: - conn = self.connections[conn_info['conn_id']] = Connection( - self, conn_info['conn_id'], conn_info['database'], - auto_reconnect=conn_info['auto_reconnect'], - async_=conn_info['async_'], - use_binary_placeholder=conn_info[ - 'use_binary_placeholder'], - array_to_string=conn_info['array_to_string'] - ) - - # only try to reconnect - self._check_and_reconnect_server(conn, conn_info, data) - - def _restore_connections(self): - for conn_id in self.connections: - conn = self.connections[conn_id] - # only try to reconnect if connection was connected previously - # and auto_reconnect is true. - wasConnected = conn.wasConnected - auto_reconnect = conn.auto_reconnect - if conn.wasConnected and conn.auto_reconnect: - try: - # Check SSH Tunnel needs to be created - if self.use_ssh_tunnel == 1 and \ - not self.tunnel_created: - status, error = self.create_ssh_tunnel( - self.tunnel_password - ) - - # Check SSH Tunnel is alive or not. - self.check_ssh_tunnel_alive() - - conn.connect() - # This will also update wasConnected flag in - # connection so no need to update the flag manually. 
- except CryptKeyMissing: - # maintain the status as this will help to restore once - # the key is available - conn.wasConnected = wasConnected - conn.auto_reconnect = auto_reconnect - except Exception as e: - self.connections.pop(conn_id) - current_app.logger.exception(e) - raise - - def _stop_ssh_tunnel(self, did, database, conn_id): - """ - Stop ssh tunnel connection if function call without any parameter. - :param did: Database Id. - :param database: Database. - :param conn_id: COnnection Id. - :return: - """ - if database is None and conn_id is None and did is None: - self.stop_ssh_tunnel() - - def _check_db_info(self, did, conn_id, database): - """ - Check did is not none and it is resent in db_info. - :param did: Database Id. - :param conn_id: Connection Id. - :return: - """ - if database is None and conn_id is None and did is None: - self.stop_ssh_tunnel() - - my_id = None - if did is not None: - if did in self.db_info and 'datname' in self.db_info[did]: - database = self.db_info[did]['datname'] - if database is None: - return True, False, my_id - else: - return True, False, my_id - - if conn_id is not None: - my_id = 'CONN:{0}'.format(conn_id) - elif database is not None: - my_id = 'DB:{0}'.format(database) - - return False, True, my_id - - def release(self, database=None, conn_id=None, did=None): - # Stop the SSH tunnel if release() function calls without - # any parameter. 
- is_return, return_value, my_id = self._check_db_info(did, conn_id, - database) - if is_return: - return return_value - - if my_id is not None: - if my_id in self.connections: - self.connections[my_id]._release() - del self.connections[my_id] - if did is not None: - del self.db_info[did] - - if len(self.connections) == 0: - self.ver = None - self.sversion = None - self.server_type = None - self.server_cls = None - self.password = None - - self.update_session() - - return True - else: - return False - - for con_key in list(self.connections.keys()): - conn = self.connections[con_key] - # Cancel the ongoing transaction before closing the connection - # as it may hang forever - if conn.connected() and conn.conn_id is not None and \ - conn.conn_id.startswith('CONN:'): - conn.cancel_transaction(conn.conn_id[5:]) - conn._release() - - self.connections = dict() - self.ver = None - self.sversion = None - self.server_type = None - self.server_cls = None - self.password = None - - self.update_session() - - return True - - def _update_password(self, passwd): - self.password = passwd - for conn_id in self.connections: - conn = self.connections[conn_id] - if conn.conn is not None or conn.wasConnected is True: - conn.password = passwd - - def update_session(self): - managers = session['__pgsql_server_managers'] \ - if '__pgsql_server_managers' in session else dict() - updated_mgr = self.as_dict() - - if not updated_mgr: - if self.sid in managers: - managers.pop(self.sid) - else: - managers[self.sid] = updated_mgr - session['__pgsql_server_managers'] = managers - session.force_write = True - - def utility(self, operation): - """ - utility(operation) - - Returns: name of the utility which used for the operation - """ - if self.server_cls is not None: - return self.server_cls.utility(operation, self.sversion) - - return None - - def export_password_env(self, env): - if self.password: - crypt_key_present, crypt_key = get_crypt_key() - if not crypt_key_present: - return False, 
crypt_key - - password = decrypt(self.password, crypt_key).decode() - os.environ[str(env)] = password - - def create_ssh_tunnel(self, tunnel_password): - """ - This method is used to create ssh tunnel and update the IP Address and - IP Address and port to localhost and the local bind port return by the - SSHTunnelForwarder class. - :return: True if tunnel is successfully created else error message. - """ - # Fetch Logged in User Details. - user = User.query.filter_by(id=current_user.id).first() - if user is None: - return False, gettext("Unauthorized request.") - - if tunnel_password is not None and tunnel_password != '': - crypt_key_present, crypt_key = get_crypt_key() - if not crypt_key_present: - raise CryptKeyMissing() - - try: - tunnel_password = decrypt(tunnel_password, crypt_key) - # password is in bytes, for python3 we need it in string - if isinstance(tunnel_password, bytes): - tunnel_password = tunnel_password.decode() - - except Exception as e: - current_app.logger.exception(e) - return False, gettext("Failed to decrypt the SSH tunnel " - "password.\nError: {0}").format(str(e)) - - try: - # If authentication method is 1 then it uses identity file - # and password - ssh_logger = None - if current_app.debug: - ssh_logger = logging.getLogger('sshtunnel') - ssh_logger.setLevel(logging.DEBUG) - for h in current_app.logger.handlers: - ssh_logger.addHandler(h) - if self.tunnel_authentication == 1: - self.tunnel_object = SSHTunnelForwarder( - (self.tunnel_host, int(self.tunnel_port)), - ssh_username=self.tunnel_username, - ssh_pkey=get_complete_file_path(self.tunnel_identity_file), - ssh_private_key_password=tunnel_password, - remote_bind_address=(self.host, self.port), - logger=ssh_logger - ) - else: - self.tunnel_object = SSHTunnelForwarder( - (self.tunnel_host, int(self.tunnel_port)), - ssh_username=self.tunnel_username, - ssh_password=tunnel_password, - remote_bind_address=(self.host, self.port), - logger=ssh_logger - ) - # flag tunnel threads in daemon mode 
to fix hang issue. - self.tunnel_object.daemon_forward_servers = True - self.tunnel_object.start() - self.tunnel_created = True - except BaseSSHTunnelForwarderError as e: - current_app.logger.exception(e) - return False, gettext("Failed to create the SSH tunnel.\n" - "Error: {0}").format(str(e)) - - # Update the port to communicate locally - self.local_bind_port = self.tunnel_object.local_bind_port - - return True, None - - def check_ssh_tunnel_alive(self): - # Check SSH Tunnel is alive or not. if it is not then - # raise the ConnectionLost exception. - if self.tunnel_object is None or not self.tunnel_object.is_active: - self.tunnel_created = False - raise SSHTunnelConnectionLost(self.tunnel_host) - - def stop_ssh_tunnel(self): - # Stop the SSH tunnel if created. - if self.tunnel_object and self.tunnel_object.is_active: - self.tunnel_object.stop() - self.local_bind_port = None - self.tunnel_object = None - self.tunnel_created = False - - def get_connection_param_value(self, param_name): - """ - This function return the value of param_name if found in the - connection parameter. - """ - value = None - if self.connection_params and param_name in self.connection_params: - value = self.connection_params[param_name] - - return value - - def create_connection_string(self, database, user, password=None): - """ - This function is used to create connection string based on the - parameters. - """ - dsn_args = dict() - dsn_args['host'] = \ - self.local_bind_host if self.use_ssh_tunnel else self.host - dsn_args['port'] = \ - self.local_bind_port if self.use_ssh_tunnel else self.port - dsn_args['dbname'] = database - dsn_args['user'] = user - if self.service is not None: - dsn_args['service'] = self.service - - # Make a copy to display the connection string on GUI. - display_dsn_args = dsn_args.copy() - # Password should not be visible into the connection string, so - # setting the value with password to 'xxxxxxx'. 
- if password: - display_dsn_args['password'] = 'xxxxxxx' - dsn_args['password'] = password - - # Loop through all the connection parameters set in the server dialog. - if self.connection_params and isinstance(self.connection_params, dict): - for key, value in self.connection_params.items(): - with_complete_path = False - orig_value = value - # Getting complete file path if the key is one of the below. - if key in ['passfile', 'sslcert', 'sslkey', 'sslrootcert', - 'sslcrl', 'sslcrldir']: - with_complete_path = True - value = get_complete_file_path(value) - - # In case of host address need to check ssh tunnel flag. - if key == 'hostaddr': - value = self.local_bind_host if self.use_ssh_tunnel else \ - value - - dsn_args[key] = value - display_dsn_args[key] = orig_value if with_complete_path else \ - value - - self.display_connection_string = make_dsn(**display_dsn_args) - - return make_dsn(**dsn_args) diff --git a/web/pgadmin/utils/driver/psycopg2/typecast.py b/web/pgadmin/utils/driver/psycopg2/typecast.py deleted file mode 100644 index 9d618d6b9..000000000 --- a/web/pgadmin/utils/driver/psycopg2/typecast.py +++ /dev/null @@ -1,265 +0,0 @@ -########################################################################## -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## - -""" -Typecast various data types so that they can be compatible with Javascript -data types. -""" - -from psycopg2 import STRING as _STRING -from psycopg2.extensions import FLOAT as _FLOAT -from psycopg2.extensions import DECIMAL as _DECIMAL, encodings -import psycopg2 -from psycopg2.extras import Json as psycopg2_json - -from .encoding import configure_driver_encodings, get_encoding - -configure_driver_encodings(encodings) - -# OIDs of data types which need to typecast as string to avoid JavaScript -# compatibility issues. 
-# e.g JavaScript does not support 64 bit integers. It has 64-bit double -# giving only 53 bits of integer range (IEEE 754) -# So to avoid loss of remaining 11 bits (64-53) we need to typecast bigint to -# string. - -TO_STRING_DATATYPES = ( - # To cast bytea, interval type - 17, 1186, - - # date, timestamp, timestamp with zone, time without time zone - 1082, 1114, 1184, 1083 -) - -TO_STRING_NUMERIC_DATATYPES = ( - # Real, double precision, numeric, bigint - 700, 701, 1700, 20 -) - -# OIDs of array data types which need to typecast to array of string. -# This list may contain: -# OIDs of data types from PSYCOPG_SUPPORTED_ARRAY_DATATYPES as they need to be -# typecast to array of string. -# Also OIDs of data types which psycopg2 does not typecast array of that -# data type. e.g: uuid, bit, varbit, etc. - -TO_ARRAY_OF_STRING_DATATYPES = ( - # To cast bytea[] type - 1001, - - # bigint[] - 1016, - - # double precision[], real[] - 1022, 1021, - - # bit[], varbit[] - 1561, 1563, -) - -# OID of record array data type -RECORD_ARRAY = (2287,) - -# OIDs of builtin array datatypes supported by psycopg2 -# OID reference psycopg2/psycopg/typecast_builtins.c -# -# For these array data types psycopg2 returns result in list. -# For all other array data types psycopg2 returns result as string (string -# representing array literal) -# e.g: -# -# For below two sql psycopg2 returns result in different formats. 
-# SELECT '{foo,bar}'::text[]; -# print('type of {} ==> {}'.format(res[0], type(res[0]))) -# SELECT '{foo,bar}'::xml[]; -# print('type of {} ==> {}'.format(res[0], type(res[0]))) -# -# Output: -# type of ['foo', 'bar'] ==> -# type of {foo,bar} ==> - -PSYCOPG_SUPPORTED_BUILTIN_ARRAY_DATATYPES = ( - 1016, 1005, 1006, 1007, 1021, 1022, 1231, - 1002, 1003, 1009, 1014, 1015, 1009, 1014, - 1015, 1000, 1115, 1185, 1183, 1270, 1182, - 1187, 1001, 1028, 1013, 1041, 651, 1040 -) - -# json, jsonb -# OID reference psycopg2/lib/_json.py -PSYCOPG_SUPPORTED_JSON_TYPES = (114, 3802) - -# json[], jsonb[] -PSYCOPG_SUPPORTED_JSON_ARRAY_TYPES = (199, 3807) - -ALL_JSON_TYPES = PSYCOPG_SUPPORTED_JSON_TYPES +\ - PSYCOPG_SUPPORTED_JSON_ARRAY_TYPES - -# INET[], CIDR[] -# OID reference psycopg2/lib/_ipaddress.py -PSYCOPG_SUPPORTED_IPADDRESS_ARRAY_TYPES = (1041, 651) - -# uuid[] -# OID reference psycopg2/lib/extras.py -PSYCOPG_SUPPORTED_IPADDRESS_ARRAY_TYPES = (2951,) - -# int4range, int8range, numrange, daterange tsrange, tstzrange[] -# OID reference psycopg2/lib/_range.py -PSYCOPG_SUPPORTED_RANGE_TYPES = (3904, 3926, 3906, 3912, 3908, 3910) - -# int4range[], int8range[], numrange[], daterange[] tsrange[], tstzrange[] -# OID reference psycopg2/lib/_range.py -PSYCOPG_SUPPORTED_RANGE_ARRAY_TYPES = (3905, 3927, 3907, 3913, 3909, 3911) - - -def register_global_typecasters(): - unicode_type_for_record = psycopg2.extensions.new_type( - (2249,), - "RECORD", - psycopg2.extensions.UNICODE - ) - - unicode_array_type_for_record_array = psycopg2.extensions.new_array_type( - RECORD_ARRAY, - "ARRAY_RECORD", - unicode_type_for_record - ) - - # This registers a unicode type caster for datatype 'RECORD'. - psycopg2.extensions.register_type(unicode_type_for_record) - - # This registers a array unicode type caster for datatype 'ARRAY_RECORD'. 
- psycopg2.extensions.register_type(unicode_array_type_for_record_array) - - # define type caster to convert various pg types into string type - pg_types_to_string_type = psycopg2.extensions.new_type( - TO_STRING_DATATYPES + TO_STRING_NUMERIC_DATATYPES + - PSYCOPG_SUPPORTED_RANGE_TYPES, 'TYPECAST_TO_STRING', _STRING - ) - - # define type caster to convert pg array types of above types into - # array of string type - pg_array_types_to_array_of_string_type = \ - psycopg2.extensions.new_array_type( - TO_ARRAY_OF_STRING_DATATYPES, - 'TYPECAST_TO_ARRAY_OF_STRING', pg_types_to_string_type - ) - - # This registers a type caster to convert various pg types into string type - psycopg2.extensions.register_type(pg_types_to_string_type) - - # This registers a type caster to convert various pg array types into - # array of string type - psycopg2.extensions.register_type(pg_array_types_to_array_of_string_type) - - # Treat JSON data as text because converting it to dict alters the data - # which should not happen as per postgres docs - psycopg2.extras.register_default_json(loads=lambda x: x) - psycopg2.extras.register_default_jsonb(loads=lambda x: x) - - # pysycopg2 adapt does not support dict by default. Need to register - # Used http://initd.org/psycopg/docs/extras.html#json-adaptation - psycopg2.extensions.register_adapter(dict, psycopg2_json) - - -def register_string_typecasters(connection): - # raw_unicode_escape used for SQL ASCII will escape the - # characters. Here we unescape them using unicode_escape - # and send ahead. When insert update is done, the characters - # are escaped again and sent to the DB. 
- - postgres_encoding, python_encoding, typecast_encoding = \ - get_encoding(connection.encoding) - if postgres_encoding != 'UNICODE': - def non_ascii_escape(value, cursor): - if value is None: - return None - return bytes( - value, encodings[cursor.connection.encoding] - ).decode(typecast_encoding, errors='replace') - - unicode_type = psycopg2.extensions.new_type( - # "char", name, text, character, character varying - (19, 18, 25, 1042, 1043, 0), - 'UNICODE', non_ascii_escape) - - unicode_array_type = psycopg2.extensions.new_array_type( - # "char"[], name[], text[], character[], character varying[] - (1002, 1003, 1009, 1014, 1015, 0 - ), 'UNICODEARRAY', unicode_type) - - psycopg2.extensions.register_type(unicode_type, connection) - psycopg2.extensions.register_type(unicode_array_type, connection) - - -def numeric_typecasters(results, conn_obj): - # This function is to convert pg types to numeic type caster - - numeric_cols = [] - for obj_type in conn_obj.column_info: - if obj_type['type_code'] in TO_STRING_NUMERIC_DATATYPES: - numeric_cols.append(obj_type['name']) - - for result in results: - for key, value in result.items(): - if isinstance(result[key], - str) and key in numeric_cols and not value.isdigit(): - result[key] = float(result[key]) - elif isinstance(result[key], str) and key in numeric_cols: - result[key] = int(result[key]) - return results - - -def register_binary_typecasters(connection): - psycopg2.extensions.register_type( - psycopg2.extensions.new_type( - ( - # To cast bytea type - 17, - ), - 'BYTEA_PLACEHOLDER', - # Only show placeholder if data actually exists. - lambda value, cursor: 'binary data' - if value is not None else None), - connection - ) - - psycopg2.extensions.register_type( - psycopg2.extensions.new_type( - ( - # To cast bytea[] type - 1001, - ), - 'BYTEA_ARRAY_PLACEHOLDER', - # Only show placeholder if data actually exists. 
- lambda value, cursor: 'binary data[]' - if value is not None else None), - connection - ) - - -def register_array_to_string_typecasters(connection): - psycopg2.extensions.register_type( - psycopg2.extensions.new_type( - PSYCOPG_SUPPORTED_BUILTIN_ARRAY_DATATYPES + - PSYCOPG_SUPPORTED_JSON_ARRAY_TYPES + - PSYCOPG_SUPPORTED_IPADDRESS_ARRAY_TYPES + - PSYCOPG_SUPPORTED_RANGE_ARRAY_TYPES + - TO_ARRAY_OF_STRING_DATATYPES, - 'ARRAY_TO_STRING', - _STRING), - connection - ) - - -def unregister_numeric_typecasters(connection): - string_type_to_decimal = \ - psycopg2.extensions.new_type(TO_STRING_NUMERIC_DATATYPES, - 'TYPECAST_TO_DECIMAL', _DECIMAL) - psycopg2.extensions.register_type(string_type_to_decimal, connection) diff --git a/web/pgadmin/utils/driver/registry.py b/web/pgadmin/utils/driver/registry.py index 23b940d09..2a8352297 100644 --- a/web/pgadmin/utils/driver/registry.py +++ b/web/pgadmin/utils/driver/registry.py @@ -9,7 +9,6 @@ from abc import ABCMeta -from pgadmin.utils.constants import PSYCOPG2 from pgadmin.utils.dynamic_registry import create_registry_metaclass import config @@ -18,12 +17,8 @@ import config def load_modules(cls, app=None): submodules = [] - if config.PG_DEFAULT_DRIVER == PSYCOPG2: - from . import psycopg2 as module - submodules.append(module) - else: - from . import psycopg3 as module - submodules.append(module) + from . import psycopg3 as module + submodules.append(module) from . import abstract as module submodules.append(module) diff --git a/web/pgadmin/utils/route.py b/web/pgadmin/utils/route.py index 47ba73407..173348cc8 100644 --- a/web/pgadmin/utils/route.py +++ b/web/pgadmin/utils/route.py @@ -14,7 +14,7 @@ from importlib import import_module from werkzeug.utils import find_modules from pgadmin.utils import server_utils -from pgadmin.utils.constants import PSYCOPG2, PSYCOPG3 +from pgadmin.utils.constants import PSYCOPG3 from .. 
import socketio import unittest @@ -69,9 +69,6 @@ class TestsGeneratorRegistry(ABCMeta): try: for module_name in find_modules(pkg_root, False, True): - if module_name.find(PSYCOPG2) != -1: - print("Skipping ", module_name) - continue all_modules.append(module_name) except Exception: pass diff --git a/web/pgadmin/utils/tests/test_encoding.py b/web/pgadmin/utils/tests/test_encoding.py deleted file mode 100644 index 36668f275..000000000 --- a/web/pgadmin/utils/tests/test_encoding.py +++ /dev/null @@ -1,227 +0,0 @@ -####################################################################### -# -# pgAdmin 4 - PostgreSQL Tools -# -# Copyright (C) 2013 - 2023, The pgAdmin Development Team -# This software is released under the PostgreSQL Licence -# -########################################################################## -import config -from pgadmin.utils.driver.psycopg2.encoding import get_encoding -from pgadmin.utils.route import BaseTestGenerator -from pgadmin.utils.constants import PSYCOPG3 - - -class TestEncoding(BaseTestGenerator): - scenarios = [ - ( - 'When the database encoding is SQL_ASCII', - dict( - db_encoding='raw_unicode_escape', - expected_return_value=['SQL_ASCII', 'raw-unicode-escape', - 'unicode-escape'] - ) - ), ( - 'When the database encoding is LATIN1', - dict( - db_encoding='latin1', - expected_return_value=['LATIN1', 'iso8859-1', 'iso8859-1'] - ) - ), ( - 'When the database encoding is LATIN2', - dict( - db_encoding='latin2', - expected_return_value=['LATIN2', 'iso8859-2', 'iso8859-2'] - ) - ), ( - 'When the database encoding is LATIN3', - dict( - db_encoding='latin3', - expected_return_value=['LATIN3', 'iso8859-3', 'iso8859-3'] - ) - ), ( - 'When the database encoding is LATIN4', - dict( - db_encoding='latin4', - expected_return_value=['LATIN4', 'iso8859-4', 'iso8859-4'] - ) - ), ( - 'When the database encoding is LATIN5', - dict( - db_encoding='latin5', - expected_return_value=['LATIN5', 'iso8859-9', 'iso8859-9'] - ) - ), ( - 'When the database 
encoding is LATIN6', - dict( - db_encoding='latin6', - expected_return_value=['LATIN6', 'iso8859-10', 'iso8859-10'] - ) - ), ( - 'When the database encoding is LATIN7', - dict( - db_encoding='latin7', - expected_return_value=['LATIN7', 'iso8859-13', 'iso8859-13'] - ) - ), ( - 'When the database encoding is LATIN8', - dict( - db_encoding='latin8', - expected_return_value=['LATIN8', 'iso8859-14', 'iso8859-14'] - ) - ), ( - 'When the database encoding is LATIN9', - dict( - db_encoding='latin9', - expected_return_value=['LATIN9', 'iso8859-15', 'iso8859-15'] - ) - ), ( - 'When the database encoding is LATIN10', - dict( - db_encoding='latin10', - expected_return_value=['LATIN10', 'iso8859-16', 'iso8859-16'] - ) - ), ( - 'When the database encoding is WIN1250', - dict( - db_encoding='cp1250', - expected_return_value=['WIN1250', 'cp1250', 'cp1250'] - ) - ), ( - 'When the database encoding is WIN1251', - dict( - db_encoding='cp1251', - expected_return_value=['WIN1251', 'cp1251', 'cp1251'] - ) - ), ( - 'When the database encoding is WIN1252', - dict( - db_encoding='cp1252', - expected_return_value=['WIN1252', 'cp1252', 'cp1252'] - ) - ), ( - 'When the database encoding is WIN1253', - dict( - db_encoding='cp1253', - expected_return_value=['WIN1253', 'cp1253', 'cp1253'] - ) - ), ( - 'When the database encoding is WIN1254', - dict( - db_encoding='cp1254', - expected_return_value=['WIN1254', 'cp1254', 'cp1254'] - ) - ), ( - 'When the database encoding is WIN1255', - dict( - db_encoding='cp1255', - expected_return_value=['WIN1255', 'cp1255', 'cp1255'] - ) - ), ( - 'When the database encoding is WIN1256', - dict( - db_encoding='cp1256', - expected_return_value=['WIN1256', 'cp1256', 'cp1256'] - ) - ), ( - 'When the database encoding is WIN1257', - dict( - db_encoding='cp1257', - expected_return_value=['WIN1257', 'cp1257', 'cp1257'] - ) - ), ( - 'When the database encoding is WIN1258', - dict( - db_encoding='cp1258', - expected_return_value=['WIN1258', 'cp1258', 'cp1258'] - ) - ), ( 
- 'When the database encoding is EUC_JIS_2004', - dict( - db_encoding='eucjis2004', - expected_return_value=['EUC_JIS_2004', 'euc_jis_2004', - 'euc_jis_2004'] - ) - ), ( - 'When the database encoding is EUC_CN', - dict( - db_encoding='euc-cn', - expected_return_value=['EUC_CN', 'gb2312', 'gb2312'] - ) - ), ( - 'When the database encoding is EUC_JP', - dict( - db_encoding='euc_jp', - expected_return_value=['EUC_JP', 'euc_jp', 'euc_jp'] - ) - ), ( - 'When the database encoding is EUC_KR', - dict( - db_encoding='euc_kr', - expected_return_value=['EUC_KR', 'euc_kr', 'euc_kr'] - ) - ), ( - 'When the database encoding is EUC_TW', - dict( - db_encoding='big5', - expected_return_value=['BIG5', 'big5', 'big5'] - ) - ), ( - 'When the database encoding is ISO_8859_5', - dict( - db_encoding='iso8859_5', - expected_return_value=['ISO_8859_5', 'iso8859-5', 'iso8859-5'] - ) - ), ( - 'When the database encoding is ISO_8859_6', - dict( - db_encoding='iso8859_6', - expected_return_value=['ISO_8859_6', 'iso8859-6', 'iso8859-6'] - ) - ), ( - 'When the database encoding is ISO_8859_7', - dict( - db_encoding='iso8859_7', - expected_return_value=['ISO_8859_7', 'iso8859-7', 'iso8859-7'] - ) - ), ( - 'When the database encoding is ISO_8859_8', - dict( - db_encoding='iso8859_8', - expected_return_value=['ISO_8859_8', 'iso8859-8', 'iso8859-8'] - ) - ), ( - 'When the database encoding is KOI8R', - dict( - db_encoding='koi8_r', - expected_return_value=['KOI8R', 'koi8-r', 'koi8-r'] - ) - ), ( - 'When the database encoding is KOI8U', - dict( - db_encoding='koi8_u', - expected_return_value=['KOI8U', 'koi8-u', 'koi8-u'] - ) - ), ( - 'When the database encoding is WIN866', - dict( - db_encoding='cp866', - expected_return_value=['WIN866', 'cp866', 'cp866'] - ) - ), ( - 'When the database encoding is WIN874', - dict( - db_encoding='cp874', - expected_return_value=['WIN874', 'cp874', 'cp874'] - ) - ), - ] - - def setUp(self): - if config.PG_DEFAULT_DRIVER == PSYCOPG3: - self.skipTest('Skipping for 
psycopg3 ' - 'as we get the mapping from the driver itself.') - - def runTest(self): - result = get_encoding(self.db_encoding) - self.assertEqual(result, self.expected_return_value) diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 6d156fc64..d237bde4b 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -39,12 +39,8 @@ from pgadmin.utils.constants import BINARY_PATHS, PSYCOPG3 from pgadmin.utils import set_binary_path from functools import wraps +import psycopg -# Remove this condition, once psycopg2 will be removed completely -if config.PG_DEFAULT_DRIVER == PSYCOPG3: - import psycopg -else: - import psycopg2 as psycopg CURRENT_PATH = os.path.abspath(os.path.join(os.path.dirname( os.path.realpath(__file__)), "../")) diff --git a/web/regression/re_sql/tests/test_resql.py b/web/regression/re_sql/tests/test_resql.py index 34c14a86d..056c353a6 100644 --- a/web/regression/re_sql/tests/test_resql.py +++ b/web/regression/re_sql/tests/test_resql.py @@ -236,9 +236,6 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator): elif self.check_precondition( scenario['precondition_sql'], False): skip_test_case = False - elif 'pg_driver' in scenario and\ - scenario['pg_driver'] != PG_DEFAULT_DRIVER: - skip_test_case = True else: skip_test_case = False