pgadmin4/web/pgadmin/utils/driver/psycopg3/connection.py

1913 lines
66 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2025, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Implementation of Connection.
It is a wrapper around the actual psycopg3 driver, and connection
object.
"""
import os
import secrets
import datetime
import asyncio
import copy
from collections import deque
import psycopg
from flask import g, current_app
from flask_babel import gettext
from flask_security import current_user
from pgadmin.utils.crypto import decrypt
from psycopg._encodings import py_codecs as encodings
import config
from pgadmin.model import User
from pgadmin.utils.exception import ConnectionLost, CryptKeyMissing
from pgadmin.utils import get_complete_file_path
from ..abstract import BaseConnection
from .cursor import DictCursor, AsyncDictCursor, AsyncDictServerCursor
from .typecast import register_global_typecasters,\
register_string_typecasters, register_binary_typecasters, \
register_array_to_string_typecasters, ALL_JSON_TYPES
from .encoding import get_encoding, configure_driver_encodings
from pgadmin.utils import csv_lib as csv
from pgadmin.utils.master_password import get_crypt_key
from io import StringIO
from pgadmin.utils.locker import ConnectionLocker
from pgadmin.utils.driver import get_driver
# On Windows, Psycopg is not compatible with the default ProactorEventLoop,
# so the selector-based event loop policy is installed at import time.
if os.name == 'nt':
asyncio.set_event_loop_policy(
asyncio.WindowsSelectorEventLoopPolicy()
)
# Short alias for gettext, used for translatable messages in this module.
_ = gettext
# Register global type caster which will be applicable to all connections.
register_global_typecasters()
# Pass psycopg's codec table (imported above as `encodings`) to the
# project's driver-encoding configuration helper.
configure_driver_encodings(encodings)
class Connection(BaseConnection):
"""
class Connection(BaseConnection)
A wrapper class, which wraps the psycopg3 connection object, and
delegate the execution to the actual connection object, when required.
Methods:
-------
* connect(**kwargs)
- Connect the PostgreSQL/EDB Postgres Advanced Server using the psycopg3
driver
* execute_scalar(query, params, formatted_exception_msg)
- Execute the given query and returns single datum result
* execute_async(query, params, formatted_exception_msg)
- Execute the given query asynchronously and returns result.
* execute_void(query, params, formatted_exception_msg)
- Execute the given query with no result.
* execute_2darray(query, params, formatted_exception_msg)
- Execute the given query and returns the result as a 2 dimensional
array.
* execute_dict(query, params, formatted_exception_msg)
- Execute the given query and returns the result as an array of dict
(column name -> value) format.
* connected()
- Get the status of the connection.
Returns True if connected, otherwise False.
* reset()
- Reconnect the database server (if possible)
* transaction_status()
- Transaction Status
* ping()
- Ping the server.
* _release()
- Release the connection object of psycopg3
* _reconnect()
- Attempt to reconnect to the database
* _wait(conn)
- This method is used to wait for asynchronous connection. This is a
blocking call.
* _wait_timeout(conn)
- This method is used to wait for asynchronous connection with timeout.
This is a non blocking call.
* poll(formatted_exception_msg)
- This method is used to poll the data of query running on asynchronous
connection.
* status_message()
- Returns the status message returned by the last command executed on
the server.
* rows_affected()
- Returns the number of rows affected by the last command executed on
the server.
* cancel_transaction(conn_id, did=None)
- This method is used to cancel the transaction for the
specified connection id and database id.
* messages()
- Returns the list of messages/notices sent by the PostgreSQL database
server.
* _formatted_exception_msg(exception_obj, formatted_msg)
- This method is used to parse the psycopg.Error object and returns the
formatted error message if flag is set to true else return
normal error message.
* check_notifies(required_polling)
- Check for notify messages by polling the connection, or check whether
any are present in notifies after an execute.
* get_notifies()
- Returns the list of notifies received from the database
server.
* pq_encrypt_password_conn()
- Returns the encrypted password for database server versions
greater than or equal to 10.
"""
# Translated message returned when the logged-in user cannot be resolved
# while decoding a saved password.
UNAUTHORIZED_REQUEST = gettext("Unauthorized request.")
# Translated message returned when no async cursor has been stored on the
# connection object.
CURSOR_NOT_FOUND = \
gettext("Cursor could not be found for the async connection.")
# Template for the attribute name on flask's `g` under which the current
# cursor is kept; it is filled with the server id and the connection id.
ARGS_STR = "{0}#{1}"
def __init__(self, manager, conn_id, db, **kwargs):
    """
    Set up the wrapper state; no database connection is opened here.

    Args:
        manager: Server manager owning this connection (must not be None).
        conn_id: Identifier of this connection (must not be None).
        db: Database name; manager.db is used when this is None.
        **kwargs: Optional flags - 'auto_reconnect' (default True),
            'async_' (default 0), 'use_binary_placeholder' (default
            False) and 'array_to_string' (default False).
    """
    assert (manager is not None)
    assert (conn_id is not None)

    # Identity of the connection and its owner.
    self.conn_id = conn_id
    self.manager = manager
    self.db = manager.db if db is None else db

    # Behaviour switches supplied by the caller.
    self.auto_reconnect = kwargs.get('auto_reconnect', True)
    self.async_ = kwargs.get('async_', 0)
    self.use_binary_placeholder = kwargs.get(
        'use_binary_placeholder', False)
    self.array_to_string = kwargs.get('array_to_string', False)

    # Driver-level handles and per-query bookkeeping.
    self.conn = None
    self.__async_cursor = None
    self.__async_query_id = None
    self.__async_query_error = None
    self.__backend_pid = None
    self.__notices = None
    self.__notifies = None
    self.execution_aborted = False
    self.row_count = 0
    self.password = None

    # This flag indicates the connection status (connected/disconnected).
    self.wasConnected = False
    # This flag indicates the connection reconnecting status.
    self.reconnecting = False

    self.qtLiteral = get_driver(config.PG_DEFAULT_DRIVER).qtLiteral
    self._autocommit = True

    super(Connection, self).__init__()
def as_dict(self):
"""
Returns the dictionary object representing this object.
"""
# In case, it cannot be auto reconnectable, or already been released,
# then we will return None.
if not self.auto_reconnect and not self.conn:
return None
res = dict()
res['conn_id'] = self.conn_id
res['database'] = self.db
res['async_'] = self.async_
res['wasConnected'] = self.wasConnected
res['auto_reconnect'] = self.auto_reconnect
res['use_binary_placeholder'] = self.use_binary_placeholder
res['array_to_string'] = self.array_to_string
return res
def __repr__(self):
return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
self.conn_id, self.db,
'Connected' if self.conn and not self.conn.closed else
"Disconnected",
self.async_
)
def __str__(self):
return self.__repr__()
def _check_user_password(self, kwargs):
"""
Check user and password.
"""
password = None
encpass = None
is_update_password = True
if 'user' in kwargs and kwargs['password']:
password = kwargs['password']
kwargs.pop('password')
is_update_password = False
else:
if 'encpass' in kwargs:
encpass = kwargs['encpass']
else:
encpass = kwargs['password'] if 'password' in kwargs else None
return password, encpass, is_update_password
def _decode_password(self, encpass, manager, password, crypt_key):
if encpass:
# Fetch Logged in User Details.
user = User.query.filter_by(id=current_user.id).first()
if user is None:
return True, self.UNAUTHORIZED_REQUEST, password
try:
password = decrypt(encpass, crypt_key)
# password is in bytes, for python3 we need it in string
if isinstance(password, bytes):
password = password.decode()
except Exception as e:
manager.stop_ssh_tunnel()
current_app.logger.exception(e)
return True, \
_(
"Failed to decrypt the saved password.\nError: {0}"
).format(str(e)), password
return False, '', password
def connect(self, **kwargs):
# Open the psycopg connection for this wrapper (a no-op when one is already
# open) and then run the post-connect initialisation.
#
# Keyword arguments read here: 'user', 'password', 'encpass', 'passfile',
# 'tunnel_password' and 'auto_commit'; the complete kwargs are also
# forwarded to self._initialize().
#
# Returns a (status, message) pair: (True, None) when an open connection
# already exists; (False, <error>) when creating the SSH tunnel, decoding
# the password or the driver connect fails; otherwise whatever
# self._initialize() returns.
# Raises CryptKeyMissing when get_crypt_key() reports no key, and re-raises
# any exception coming out of self._initialize().
if self.conn:
if self.conn.closed:
# Drop the stale handle so that a fresh connection is made below.
self.conn = None
else:
# Already connected - nothing to do.
return True, None
manager = self.manager
crypt_key_present, crypt_key = get_crypt_key()
if not crypt_key_present:
raise CryptKeyMissing()
password, encpass, is_update_password = \
self._check_user_password(kwargs)
passfile = kwargs['passfile'] if 'passfile' in kwargs else None
tunnel_password = kwargs['tunnel_password'] if 'tunnel_password' in \
kwargs else ''
# Check SSH Tunnel needs to be created
if manager.use_ssh_tunnel == 1 and not manager.tunnel_created:
status, error = manager.create_ssh_tunnel(tunnel_password)
if not status:
return False, error
# Check SSH Tunnel is alive or not.
if manager.use_ssh_tunnel == 1:
manager.check_ssh_tunnel_alive()
if is_update_password:
if encpass is None:
# Fall back to the password cached on this object, then to the one on
# the manager.
encpass = self.password or getattr(manager, 'password', None)
self.password = encpass
# Reset the existing connection password
if self.reconnecting is not False:
self.password = None
# NOTE(review): crypt_key_present was already checked above and is not
# reassigned in between, so this second check looks unreachable.
if not crypt_key_present:
raise CryptKeyMissing()
is_error, errmsg, password = self._decode_password(
encpass, manager, password, crypt_key)
if is_error:
return False, errmsg
# If no password credential is found then connect request might
# come from Query tool, ViewData grid, debugger etc tools.
# we will check for pgpass file availability from connection manager
# if it's present then we will use it
if not password and not encpass and not passfile:
passfile = manager.get_connection_param_value('passfile')
if manager.passexec:
password = manager.passexec.get()
try:
database = self.db
if 'user' in kwargs and kwargs['user']:
user = kwargs['user']
else:
user = manager.user
conn_id = self.conn_id
# NOTE(review): `os` is already imported at module level; this local
# import is redundant.
import os
# The application name is handed to the driver through the process
# environment (PGAPPNAME).
os.environ['PGAPPNAME'] = '{0} - {1}'.format(
config.APP_NAME, conn_id)
ssl_key = get_complete_file_path(
manager.get_connection_param_value('sslkey'))
sslmode = manager.get_connection_param_value('sslmode')
if ssl_key and sslmode in \
['require', 'verify-ca', 'verify-full']:
# Takes the last three octal digits of the key file mode and compares
# them as a decimal integer; anything above 600 gets the file changed
# to mode 0o600.
# NOTE(review): a mode such as 0o577 compares below 600 and is left
# untouched - confirm whether that is intended.
ssl_key_file_permission = \
int(oct(os.stat(ssl_key).st_mode)[-3:])
if ssl_key_file_permission > 600:
os.chmod(ssl_key, 0o600)
with ConnectionLocker(manager.kerberos_conn):
# Create the connection string
connection_string = manager.create_connection_string(
database, user, password)
if self.async_:
# Async connections default to autocommit unless the caller passes
# 'auto_commit'.
autocommit = True
if 'auto_commit' in kwargs:
autocommit = kwargs['auto_commit']
async def connectdbserver():
return await psycopg.AsyncConnection.connect(
connection_string,
cursor_factory=AsyncDictCursor,
autocommit=autocommit,
prepare_threshold=manager.prepare_threshold
)
# The coroutine is driven to completion synchronously.
pg_conn = asyncio.run(connectdbserver())
pg_conn.server_cursor_factory = AsyncDictServerCursor
else:
pg_conn = psycopg.Connection.connect(
connection_string,
cursor_factory=DictCursor,
prepare_threshold=manager.prepare_threshold)
except psycopg.Error as e:
manager.stop_ssh_tunnel()
# Pick the most specific error text that is available on the exception.
if hasattr(e, 'pgerror'):
msg = e.pgerror
elif e.diag.message_detail:
msg = e.diag.message_detail
else:
msg = str(e)
current_app.logger.info(
"Failed to connect to the database server(#{server_id}) for "
"connection ({conn_id}) with error message as below"
":{msg}".format(
server_id=self.manager.sid,
conn_id=conn_id,
msg=msg
)
)
return False, msg
# Overwrite connection notice attr to support
# more than 50 notices at a time
pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)
pg_conn.add_notify_handler(self.check_notifies)
pg_conn.add_notice_handler(self.get_notices)
self.conn = pg_conn
self.wasConnected = True
try:
status, msg = self._initialize(conn_id, **kwargs)
except Exception as e:
# Initialisation blew up: tear down the tunnel, forget the connection
# and let the exception propagate.
manager.stop_ssh_tunnel()
current_app.logger.exception(e)
self.conn = None
if not self.reconnecting:
self.wasConnected = False
raise e
if status and is_update_password:
# Remember the working encrypted password on the manager.
manager._update_password(encpass)
else:
if not self.reconnecting and is_update_password:
self.wasConnected = False
return status, msg
def _set_auto_commit(self, kwargs):
"""
autocommit flag does not work with asynchronous connections.
By default asynchronous connection runs in autocommit mode.
:param kwargs:
:return:
"""
if self.async_ == 0:
if 'autocommit' in kwargs and kwargs['autocommit'] is False:
self.conn.autocommit = False
else:
self.conn.autocommit = True
def _set_role(self, manager, cur, conn_id, **kwargs):
"""
Set role
:param manager:
:param cur:
:param conn_id:
:return:
"""
is_set_role = False
role = None
status = None
if 'role' in kwargs and kwargs['role']:
is_set_role = True
role = kwargs['role']
elif manager.role:
is_set_role = True
role = manager.role
if is_set_role:
_query = "SELECT rolname from pg_roles WHERE rolname = {0}" \
"".format(self.qtLiteral(role, self.conn))
_status, res = self.execute_scalar(_query)
if res:
status = self._execute(cur, "SET ROLE TO {0}".format(
self.qtLiteral(role, self.conn)))
else:
# If role is not found then set the status to role
# for showing the proper error message
status = role
if status is not None:
self.conn.close()
self.conn = None
current_app.logger.error(
"Connect to the database server (#{server_id}) for "
"connection ({conn_id}), but - failed to setup the role "
" {msg}".format(
server_id=self.manager.sid,
conn_id=conn_id,
msg=status
)
)
return True, \
_(
"Failed to setup the role \n{0}"
).format(status)
return False, ''
def _execute(self, cur, query, params=None):
formatted_exception_msg = self._formatted_exception_msg
try:
self.__internal_blocking_execute(cur, query, params)
except psycopg.Error as pe:
cur.close_cursor()
return formatted_exception_msg(pe, False)
return None
def _initialize(self, conn_id, **kwargs):
# Configure the freshly opened connection (type casters, session settings,
# role) and collect server/database/user details onto the manager.
#
# Returns (True, ret_msg) where ret_msg is the value returned by
# execute_post_connection_sql(), or (False, <error>) when a session setup
# statement, the role setup or the version query fails.
self.execution_aborted = False
self.__backend_pid = self.conn.info.backend_pid
# Forget any cursor cached on flask's `g` for this server/connection.
setattr(g, self.ARGS_STR.format(
self.manager.sid,
self.conn_id.encode('utf-8')
), None)
register_string_typecasters(self.conn)
manager = self.manager
# autocommit flag does not work with asynchronous connections.
# By default asynchronous connection runs in autocommit mode.
self._set_auto_commit(kwargs)
if self.array_to_string:
register_array_to_string_typecasters(self.conn)
# Register type casters for binary data only after registering array to
# string type casters.
if self.use_binary_placeholder:
register_binary_typecasters(self.conn)
#
postgres_encoding, self.python_encoding = \
get_encoding(self.conn.info.encoding)
status, cur = self.__cursor()
# bytea_output is set via set_config() over pg_show_all_settings() filtered
# by name, so the statement yields no rows (instead of failing) when the
# server has no such setting - a convenience for those running on old,
# unsupported versions of PostgreSQL.
status = self._execute(
cur,
"SET DateStyle=ISO; "
"SET client_min_messages=notice; "
"SELECT set_config('bytea_output','hex',false)"
" FROM pg_show_all_settings()"
" WHERE name = 'bytea_output'; "
"SET client_encoding='{0}';".format(postgres_encoding)
)
if status is not None:
self.conn.close()
self.conn = None
return False, status
# _set_role() closes and drops the connection itself on failure.
is_error, errmsg = self._set_role(manager, cur, conn_id, **kwargs)
if is_error:
return False, errmsg
# Check database version every time on reconnection
status = self._execute(cur, "SELECT version()")
if status is not None:
self.conn.close()
self.conn = None
self.wasConnected = False
current_app.logger.error(
"Failed to fetch the version information on the "
"established connection to the database server "
"(#{server_id}) for '{conn_id}' with below error "
"message:{msg}".format(
server_id=self.manager.sid,
conn_id=conn_id,
msg=status)
)
return False, status
if cur.rowcount > 0:
row = cur.fetchmany(1)[0]
# Full version string and numeric server version are kept on the manager.
manager.ver = row['version']
manager.sversion = self.conn.info.server_version
status = self._execute(cur, """
SELECT
db.oid as did, db.datname, db.datallowconn,
pg_encoding_to_char(db.encoding) AS serverencoding,
has_database_privilege(db.oid, 'CREATE') as cancreate,
datistemplate
FROM
pg_catalog.pg_database db
WHERE db.datname = current_database()""")
if status is None:
manager.db_info = manager.db_info or dict()
if cur.rowcount > 0:
res = cur.fetchmany(1)[0]
# Database details are cached on the manager, keyed by database oid.
manager.db_info[res['did']] = res.copy()
# We do not have database oid for the maintenance database.
if len(manager.db_info) == 1:
manager.did = res['did']
# GSSAPI status is only queried for server version 12 and above.
# NOTE(review): `res` below is only assigned when the database query above
# succeeded and returned a row; the original nesting is not visible here,
# so confirm that this section cannot run with `res` unbound.
if manager.sversion >= 120000:
status = self._execute(cur, """
SELECT
gss_authenticated, encrypted
FROM
pg_catalog.pg_stat_gssapi
WHERE pid = pg_backend_pid()""")
if status is None and cur.get_rowcount() > 0:
res_enc = cur.fetchmany(1)[0]
manager.db_info[res['did']]['gss_authenticated'] =\
res_enc['gss_authenticated']
manager.db_info[res['did']]['gss_encrypted'] = \
res_enc['encrypted']
if len(manager.db_info) == 1:
manager.gss_authenticated = \
res_enc['gss_authenticated']
manager.gss_encrypted = res_enc['encrypted']
self._set_user_info(cur, manager, **kwargs)
self._set_server_type_and_password(kwargs, manager)
# A failure of the post connection SQL is reported through ret_msg but the
# method still returns True.
ret_msg = self.execute_post_connection_sql(cur, manager)
manager.update_session()
return True, ret_msg
def _set_user_info(self, cur, manager, **kwargs):
"""
Set user info.
:param cur:
:param manager:
:return:
"""
status = self._execute(cur, """
SELECT
roles.oid as id, roles.rolname as name,
roles.rolsuper as is_superuser,
CASE WHEN roles.rolsuper THEN true ELSE roles.rolcreaterole END as
can_create_role,
CASE WHEN roles.rolsuper THEN true
ELSE roles.rolcreatedb END as can_create_db,
CASE WHEN 'pg_signal_backend'=ANY(ARRAY(WITH RECURSIVE cte AS (
SELECT pg_roles.oid,pg_roles.rolname FROM pg_roles
WHERE pg_roles.oid = roles.oid
UNION ALL
SELECT m.roleid,pgr.rolname FROM cte cte_1
JOIN pg_auth_members m ON m.member = cte_1.oid
JOIN pg_roles pgr ON pgr.oid = m.roleid)
SELECT rolname FROM cte)) THEN True
ELSE False END as can_signal_backend
FROM
pg_catalog.pg_roles as roles
WHERE
rolname = current_user""")
if status is None and 'user' not in kwargs:
manager.user_info = dict()
if cur.get_rowcount() > 0:
manager.user_info = cur.fetchmany(1)[0]
def _set_server_type_and_password(self, kwargs, manager):
"""
Set server type
:param kwargs:
:param manager:
:return:
"""
if 'password' in kwargs:
manager.password = kwargs['password']
server_types = None
if 'server_types' in kwargs and isinstance(
kwargs['server_types'], list):
server_types = manager.server_types = kwargs['server_types']
if server_types is None:
from pgadmin.browser.server_groups.servers.types import ServerType
server_types = ServerType.types()
for st in server_types:
if st.stype == 'ppas':
if st.instance_of(manager.ver):
manager.server_type = st.stype
manager.server_cls = st
break
else:
if st.instance_of():
manager.server_type = st.stype
manager.server_cls = st
break
def execute_post_connection_sql(self, cur, manager):
# Execute post connection SQL if provided in the server dialog
errmsg = None
if manager.post_connection_sql and manager.post_connection_sql != '':
status = self._execute(cur, manager.post_connection_sql)
if status is not None:
errmsg = gettext(("Failed to execute the post connection SQL "
"with below error message:\n{msg}").format(
msg=status))
current_app.logger.error(errmsg)
return errmsg
def __cursor(self, server_cursor=False, scrollable=False):
# Return a usable cursor for this connection as (True, cursor), reusing the
# one cached on flask's `g` when possible and creating a new one otherwise.
#
# server_cursor: when true, a named (server side) cursor is created.
# scrollable: passed through to the driver's cursor() call.
#
# Raises CryptKeyMissing when no crypt key is available in server mode, and
# ConnectionLost when the connection was never established or cannot be
# re-established.
if not get_crypt_key()[0] and config.SERVER_MODE:
raise CryptKeyMissing()
# Check SSH Tunnel is alive or not. If used by the database
# server for the connection.
if self.manager.use_ssh_tunnel == 1:
self.manager.check_ssh_tunnel_alive()
if self.wasConnected is False:
# NOTE(review): the third argument drops the first five characters of
# conn_id unless it starts with 'DB:' - presumably a fixed prefix; confirm
# against the callers that build conn_id.
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
)
# Look up the cursor cached for this server/connection, if any.
cur = getattr(g, self.ARGS_STR.format(
self.manager.sid,
self.conn_id.encode('utf-8')
), None)
if self.connected() and cur and not cur.closed:
# The cached cursor is reused unless a server cursor was requested and
# the cached one is of a different type.
if not server_cursor or (
server_cursor and type(cur) is AsyncDictServerCursor):
return True, cur
if not self.connected():
# NOTE(review): this local is only assigned again in the except branch
# below; the empty value set here is not read in the visible code.
errmsg = ""
current_app.logger.warning(
"Connection to database server (#{server_id}) for the "
"connection - '{conn_id}' has been lost.".format(
server_id=self.manager.sid,
conn_id=self.conn_id
)
)
if self.auto_reconnect and not self.reconnecting:
# Reconnect only; the return value is not used here and cursor creation
# continues below.
self.__attempt_execution_reconnect(None)
else:
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
)
try:
if server_cursor:
# Providing name to cursor will create server side cursor.
cursor_name = "CURSOR:{0}".format(self.conn_id)
self.conn.server_cursor_factory = AsyncDictServerCursor
cur = self.conn.cursor(
name=cursor_name,
scrollable=scrollable
)
else:
cur = self.conn.cursor(scrollable=scrollable)
except psycopg.Error as pe:
current_app.logger.exception(pe)
errmsg = gettext(
"Failed to create cursor for psycopg3 connection with error "
"message for the server#{1}:{2}:\n{0}"
).format(
str(pe), self.manager.sid, self.db
)
current_app.logger.error(errmsg)
if self.conn.closed:
self.conn = None
if self.auto_reconnect and not self.reconnecting:
current_app.logger.info(
gettext(
"Attempting to reconnect to the database server "
"(#{server_id}) for the connection - '{conn_id}'."
).format(
server_id=self.manager.sid,
conn_id=self.conn_id
)
)
# Reconnect and then retry this method with the same server_cursor
# argument.
return self.__attempt_execution_reconnect(
self.__cursor, server_cursor
)
else:
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == 'DB:'
else self.conn_id[5:]
)
# Cache the new cursor on flask's `g` for later calls.
setattr(
g, self.ARGS_STR.format(
self.manager.sid, self.conn_id.encode('utf-8')
), cur
)
return True, cur
def reset_cursor_at(self, position):
"""
This function is used to reset the cursor at the given position
"""
cur = self.__async_cursor
if not cur:
current_app.logger.log(
25,
'Cursor not found in reset_cursor_at method')
try:
cur.scroll(position, mode='absolute')
except psycopg.Error:
# bypassing the error as cursor tried to scroll on the
# specified position, but end of records found
current_app.logger.log(
25,
'Failed to reset cursor in reset_cursor_at method')
except IndexError as e:
current_app.logger.log(
25,
'Psycopg3 Cursor: {0}'.format(str(e)))
def __internal_blocking_execute(self, cur, query, params):
"""
This function executes the query using cursor's execute function,
but in case of asynchronous connection we need to wait for the
transaction to be completed. If self.async_ is 1 then it is a
blocking call.
Args:
cur: Cursor object
query: SQL query to run.
params: Extra parameters
"""
query = query.encode(self.python_encoding)
cur.execute(query, params)
def execute_on_server_as_csv(self, records=2000):
"""
Prepare CSV output for the result of the query held by the stored
async cursor.
Args:
records: Number of records fetched per batch
Returns:
(False, message) when no async cursor is stored, an asynchronous
operation is still underway, or the query returned no data;
otherwise (True, gen, self) where gen is a generator function that
yields the CSV text in chunks.
"""
cur = self.__async_cursor
if not cur:
return False, self.CURSOR_NOT_FOUND
# NOTE(review): 3 is presumably the libpq "polling OK" state - confirm
# against the driver's polling status constants.
if self.conn.pgconn.connect_poll() != 3:
return False, gettext(
"Asynchronous query execution/operation underway."
)
encoding = self.python_encoding
query = None
try:
# The query text is decoded for logging only.
query = str(cur.query, encoding) \
if cur and cur.query is not None else None
except Exception:
current_app.logger.warning('Error encoding query with {0}'.format(
encoding))
current_app.logger.log(
25,
"Execute (with server cursor) by {pga_user} on "
"{db_user}@{db_host}/{db_name} #{server_id} - "
"{conn_id} (Query-id: {query_id}):\n{query}".format(
pga_user=current_user.email,
db_user=self.conn.info.user,
db_host=self.conn.info.host,
db_name=self.conn.info.dbname,
server_id=self.manager.sid,
conn_id=self.conn_id,
query=query,
query_id=self.__async_query_id
)
)
# A cursor without a description has no result set to export, so bail
# out early to avoid a no-op.
if cur.description is None:
return False, \
gettext('The query executed did not return any data.')
def handle_null_values(results, replace_nulls_with):
"""
This function is used to replace null values with the given string
:param results:
:param replace_nulls_with: null values will be replaced by this
string.
:return: modified result
"""
temp_results = []
for row in results:
res = dict()
for k, v in row.items():
if v is None:
res[k] = replace_nulls_with
else:
res[k] = v
temp_results.append(res)
results = temp_results
return results
def gen(conn_obj, trans_obj, quote='strings', quote_char="'",
field_separator=',', replace_nulls_with=None):
# Generator producing the CSV text: the header plus the first batch of
# rows, then one chunk per further batch of `records` rows.
try:
# Start from the first row regardless of what was fetched before.
cur.scroll(0, mode='absolute')
except Exception as e:
# NOTE(review): the failure is written to stdout rather than to the
# application logger.
print(str(e))
results = cur.fetchmany(records)
if not results:
yield gettext('The query executed did not return any data.')
return
header = []
json_columns = []
for c in cur.ordered_description():
# This is to handle the case in which column name is non-ascii
column_name = c.to_dict()['name']
header.append(column_name)
if c.to_dict()['type_code'] in ALL_JSON_TYPES:
json_columns.append(column_name)
res_io = StringIO()
# Map the textual quoting option onto the csv module constants.
if quote == 'strings':
quote = csv.QUOTE_NONNUMERIC
elif quote == 'all':
quote = csv.QUOTE_ALL
else:
quote = csv.QUOTE_NONE
csv_writer = csv.DictWriter(
res_io, fieldnames=header, delimiter=field_separator,
quoting=quote,
quotechar=quote_char,
replace_nulls_with=replace_nulls_with
)
csv_writer.writeheader()
# Replace the null values with given string if configured.
if replace_nulls_with is not None:
results = handle_null_values(results, replace_nulls_with)
csv_writer.writerows(results)
yield res_io.getvalue()
while True:
results = cur.fetchmany(records)
if not results:
break
# A fresh buffer and writer per batch, so each chunk holds only its own
# rows and no header.
res_io = StringIO()
csv_writer = csv.DictWriter(
res_io, fieldnames=header, delimiter=field_separator,
quoting=quote,
quotechar=quote_char,
replace_nulls_with=replace_nulls_with
)
# Replace the null values with given string if configured.
if replace_nulls_with is not None:
results = handle_null_values(results, replace_nulls_with)
csv_writer.writerows(results)
yield res_io.getvalue()
try:
# try to reset the cursor scroll back to where it was,
# bypass error, if cannot scroll back
rows_fetched_from = trans_obj.get_fetched_row_cnt()
cur.scroll(rows_fetched_from, mode='absolute')
except psycopg.Error:
# bypassing the error as cursor tried to scroll on the
# specified position, but end of records found
pass
except Exception:
pass
# Registering back type caster for large size data types to string
# which was unregistered at starting
register_string_typecasters(self.conn)
return True, gen, self
def execute_scalar(self, query, params=None,
                   formatted_exception_msg=False):
    """
    Execute the given query and return a single datum.

    When several statements are run, the result of the last one is used.

    Args:
        query: SQL query to run.
        params: extra parameters to the function
        formatted_exception_msg: if True then function return the
            formatted exception message

    Returns:
        (True, value) with the first column of the first row, or
        (True, None) when there are no rows; (False, error_message) when
        the cursor cannot be obtained or execution fails.

    Raises:
        ConnectionLost: when the connection is gone and cannot be
            re-established.
    """
    status, cur = self.__cursor()
    self.row_count = 0

    if not status:
        return False, str(cur)
    query_id = str(secrets.choice(range(1, 9999999)))

    current_app.logger.log(
        25,
        "Execute (scalar) by {pga_user} on "
        "{db_user}@{db_host}/{db_name} #{server_id} - "
        "{conn_id} (Query-id: {query_id}):\n{query}".format(
            pga_user=current_user.email,
            db_user=self.conn.info.user,
            db_host=self.conn.info.host,
            db_name=self.conn.info.dbname,
            server_id=self.manager.sid,
            conn_id=self.conn_id,
            query=query,
            query_id=query_id
        )
    )
    try:
        self.__internal_blocking_execute(cur, query, params)
    except psycopg.Error as pe:
        cur.close_cursor()
        if not self.connected():
            if self.auto_reconnect and not self.reconnecting:
                return self.__attempt_execution_reconnect(
                    self.execute_scalar, query, params,
                    formatted_exception_msg
                )
            raise ConnectionLost(
                self.manager.sid,
                self.db,
                None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
            )
        errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
        current_app.logger.error(
            "Failed to execute query (execute_scalar) for the server "
            "#{server_id} - {conn_id} (Query-id: {query_id}):\n"
            "Error Message:{errmsg}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                errmsg=errmsg,
                query_id=query_id
            )
        )
        return False, errmsg
    except Exception as e:
        # A non-driver failure (for example while encoding the query)
        # means the query did not run; report it instead of reading
        # whatever result the cursor still holds from an earlier call.
        current_app.logger.exception(e)
        return False, str(e)

    # If multiple queries are run, make sure to reach
    # the last query result
    while cur.nextset():
        pass  # This loop is empty

    self.row_count = cur.get_rowcount()
    if self.row_count > 0:
        res = cur.fetchone()
        # fetchone() yields None when no row is available.
        if res is not None and len(res) > 0:
            return True, res[0]

    return True, None
def release_async_cursor(self):
if self.__async_cursor and not self.__async_cursor.closed:
try:
self.__async_cursor.close_cursor()
except Exception as e:
print("EXception==", str(e))
def execute_async(self, query, params=None, formatted_exception_msg=True,
                  server_cursor=False):
    """
    Start the given query on a scrollable cursor and remember that
    cursor as the connection's async cursor.

    Args:
        query: SQL query to run.
        params: extra parameters to the function
        formatted_exception_msg: if True then function return the
            formatted exception message
        server_cursor: if True a server side cursor is requested

    Returns:
        (True, None) on success, (False, error_message) otherwise.

    Raises:
        ConnectionLost: when the failure shows the connection is gone.
    """
    # Forget whatever the previous async query left behind.
    self.__async_cursor = None
    self.__async_query_error = None

    ok, cursor = self.__cursor(scrollable=True,
                               server_cursor=server_cursor)
    if not ok:
        return False, str(cursor)

    qid = str(secrets.choice(range(1, 9999999)))
    codec = self.python_encoding
    query = query.encode(codec)

    self.__async_cursor = cursor
    self.__async_query_id = qid

    log_line = (
        "Execute (async) by {pga_user} on {db_user}@{db_host}/{db_name} "
        "#{server_id} - {conn_id} (Query-id: "
        "{query_id}):\n{query}"
    ).format(
        pga_user=current_user.username,
        db_user=self.conn.info.user,
        db_host=self.conn.info.host,
        db_name=self.conn.info.dbname,
        server_id=self.manager.sid,
        conn_id=self.conn_id,
        query=query.decode(codec),
        query_id=qid
    )
    current_app.logger.log(25, log_line)

    # Fresh notice/notify buffers for this execution.
    self.__notices = []
    self.__notifies = []
    self.execution_aborted = False
    try:
        cursor.execute(query, params)
    except psycopg.Error as exc:
        message = self._formatted_exception_msg(
            exc, formatted_exception_msg)
        current_app.logger.error(
            "Failed to execute query (execute_async) for the server "
            "#{server_id} - {conn_id}(Query-id: {query_id}):\n"
            "Error Message:{errmsg}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                errmsg=message,
                query_id=qid
            )
        )
        self.__async_query_error = message

        if (self.conn and self.conn.closed) or self.is_disconnected(exc):
            raise ConnectionLost(
                self.manager.sid,
                self.db,
                None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
            )
        return False, message

    return True, None
def execute_void(self, query, params=None, formatted_exception_msg=False):
    """
    Run the given query when no result is expected back.

    Args:
        query: SQL query to run.
        params: extra parameters to the function
        formatted_exception_msg: if True then function return the
            formatted exception message

    Returns:
        (True, None) on success, (False, error_message) otherwise.

    Raises:
        ConnectionLost: when the connection is gone and an automatic
            reconnect is not possible.
    """
    ok, cursor = self.__cursor()
    if not ok:
        return False, str(cursor)

    qid = str(secrets.choice(range(1, 9999999)))
    log_line = (
        "Execute (void) by {pga_user} on "
        "{db_user}@{db_host}/{db_name} #{server_id} - "
        "{conn_id} (Query-id: {query_id}):\n{query}"
    ).format(
        pga_user=current_user.email,
        db_user=self.conn.info.user,
        db_host=self.conn.info.host,
        db_name=self.conn.info.dbname,
        server_id=self.manager.sid,
        conn_id=self.conn_id,
        query=query,
        query_id=qid
    )
    current_app.logger.log(25, log_line)

    try:
        self.__internal_blocking_execute(cursor, query, params)
    except psycopg.Error as exc:
        cursor.close_cursor()

        if not self.connected():
            # Connection dropped: retry once through a reconnect when
            # that is allowed, otherwise report the loss.
            if self.auto_reconnect and not self.reconnecting:
                return self.__attempt_execution_reconnect(
                    self.execute_void, query, params,
                    formatted_exception_msg
                )
            raise ConnectionLost(
                self.manager.sid,
                self.db,
                None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
            )

        message = self._formatted_exception_msg(
            exc, formatted_exception_msg)
        current_app.logger.error(
            "Failed to execute query (execute_void) for the server "
            "#{server_id} - {conn_id}(Query-id: {query_id}):\n"
            "Error Message:{errmsg}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                errmsg=message,
                query_id=qid
            )
        )
        return False, message

    return True, None
def __attempt_execution_reconnect(self, fn, *args, **kwargs):
    """
    Reconnect to the server and, optionally, re-run an operation.

    Args:
        fn: callable to invoke after a successful reconnect, or None to
            reconnect only.
        *args, **kwargs: arguments passed through to fn.

    Returns:
        The (status, result) pair from fn when it is given, otherwise
        the pair returned by connect().

    Raises:
        ConnectionLost: when connect() reports failure, or when
            reconnecting or fn raises.
    """
    self.reconnecting = True
    # Drop the cursor cached on flask's `g`; it belongs to the dead
    # connection.
    setattr(g, self.ARGS_STR.format(
        self.manager.sid,
        self.conn_id.encode('utf-8')
    ), None)

    try:
        status, res = self.connect()
        if status:
            if fn:
                status, res = fn(*args, **kwargs)
            return status, res
    except Exception as e:
        current_app.logger.exception(e)
        current_app.logger.warning(
            "Failed to reconnect the database server "
            "(Server #{server_id}, Connection #{conn_id})".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id
            )
        )
    finally:
        # Always clear the flag, including after a successful reconnect
        # without fn, so that later connection losses can trigger a new
        # reconnect attempt.
        self.reconnecting = False

    raise ConnectionLost(
        self.manager.sid,
        self.db,
        None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
    )
def execute_2darray(self, query, params=None,
                    formatted_exception_msg=False):
    """
    Execute the given query and return its columns and rows.

    Args:
        query: SQL query to run.
        params: extra parameters to the function
        formatted_exception_msg: if True then function return the
            formatted exception message

    Returns:
        (True, {'columns': [...], 'rows': [...]}) on success,
        (False, error_message) otherwise.
    """
    ok, cursor = self.__cursor()
    self.row_count = 0
    if not ok:
        return False, str(cursor)

    qid = str(secrets.choice(range(1, 9999999)))
    log_line = (
        "Execute (2darray) by {pga_user} on "
        "{db_user}@{db_host}/{db_name} #{server_id} - "
        "{conn_id} (Query-id: {query_id}):\n{query}"
    ).format(
        pga_user=current_user.email,
        db_user=self.conn.info.user,
        db_host=self.conn.info.host,
        db_name=self.conn.info.dbname,
        server_id=self.manager.sid,
        conn_id=self.conn_id,
        query=query,
        query_id=qid
    )
    current_app.logger.log(25, log_line)

    try:
        self.__internal_blocking_execute(cursor, query, params)
    except psycopg.Error as exc:
        cursor.close_cursor()

        # Retry through a reconnect when the connection dropped and an
        # automatic reconnect is allowed.
        if not self.connected() and self.auto_reconnect and \
                not self.reconnecting:
            return self.__attempt_execution_reconnect(
                self.execute_2darray, query, params,
                formatted_exception_msg
            )

        message = self._formatted_exception_msg(
            exc, formatted_exception_msg)
        current_app.logger.error(
            "Failed to execute query (execute_2darray) for the server "
            "#{server_id} - {conn_id} (Query-id: {query_id}):\n"
            "Error Message:{errmsg}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                errmsg=message,
                query_id=qid
            )
        )
        return False, message

    # Get Resultset Column Name, Type and size
    if cursor.description:
        columns = [
            desc.to_dict() for desc in cursor.ordered_description()
        ]
    else:
        columns = []

    self.row_count = cursor.get_rowcount()
    rows = cursor.fetchall() if cursor.get_rowcount() > 0 else []

    return True, {'columns': columns, 'rows': rows}
def execute_dict(self, query, params=None, formatted_exception_msg=False):
    """
    Execute a query with the blocking execute helper and return the
    columns and the rows of its last result set.

    Args:
        query: SQL text to execute (may contain several statements).
        params: optional parameters handed to the driver with the query.
        formatted_exception_msg: passed to _formatted_exception_msg() to
            select the detailed form of the error message on failure.

    Returns:
        (False, message) when no cursor could be obtained or the query
        failed while the connection is still up;
        (True, {'columns': [...], 'rows': [...]}) on success. When the
        failure left the connection disconnected and a reconnect is
        allowed, the result of the reconnect-and-retry helper is returned.

    Raises:
        ConnectionLost: the query failed, the connection is gone and a
            reconnect is not allowed (auto-reconnect disabled or a
            reconnect is already in progress).
    """
    status, cur = self.__cursor()
    self.row_count = 0
    if not status:
        return False, str(cur)

    query_id = str(secrets.choice(range(1, 9999999)))
    start_message = (
        "Execute (dict) by {pga_user} on "
        "{db_user}@{db_host}/{db_name} #{server_id} - "
        "{conn_id} (Query-id: {query_id}):\n{query}".format(
            pga_user=current_user.email,
            db_user=self.conn.info.user,
            db_host=self.conn.info.host,
            db_name=self.conn.info.dbname,
            server_id=self.manager.sid,
            conn_id=self.conn_id,
            query=query,
            query_id=query_id
        )
    )
    current_app.logger.log(25, start_message)

    try:
        self.__internal_blocking_execute(cur, query, params)
    except psycopg.Error as pe:
        cur.close_cursor()
        if not self.connected():
            if not self.auto_reconnect or self.reconnecting:
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
                )
            return self.__attempt_execution_reconnect(
                self.execute_dict, query, params,
                formatted_exception_msg
            )

        errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
        failure_message = (
            "Failed to execute query (execute_dict) for the server "
            "#{server_id}- {conn_id} (Query-id: {query_id}):\n"
            "Error Message:{errmsg}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query_id=query_id,
                errmsg=errmsg
            )
        )
        current_app.logger.error(failure_message)
        return False, errmsg

    # Column name, type and size, read before advancing over result sets.
    if cur.description:
        columns = [desc.to_dict() for desc in cur.ordered_description()]
    else:
        columns = []

    self.row_count = cur.rowcount

    # When several statements were run, move on to the last result set
    # before fetching rows.
    while cur.nextset():
        pass

    rows = cur.fetchall() if cur.get_rowcount() > 0 else []

    return True, {'columns': columns, 'rows': rows}
def async_fetchmany_2darray(self, records=2000,
                            from_rownum=0, to_rownum=0,
                            formatted_exception_msg=False):
    """
    Fetch rows from the asynchronous cursor.

    The caller should poll and check that the status is ASYNC_OK before
    calling this function.

    Args:
        records: number of rows to fetch. -1 fetches every row of the
            result set; None fetches the window given by
            from_rownum/to_rownum.
        from_rownum: first row of the window, used only when records is
            None.
        to_rownum: last row of the window, used only when records is None.
        formatted_exception_msg: not used by this method.

    Returns:
        (False, message) when there is no asynchronous cursor or the
        connection is busy; (True, None) as soon as a result set without
        rows is met (e.g. a DDL statement); otherwise (True, rows) for the
        last result set, where rows is None if the driver raised a
        ProgrammingError while fetching.

    Raises:
        ConnectionLost: there is no connection object.
    """
    cur = self.__async_cursor
    if not cur:
        return False, self.CURSOR_NOT_FOUND

    if not self.conn:
        raise ConnectionLost(
            self.manager.sid,
            self.db,
            None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
        )

    if self.conn.pgconn.is_busy():
        return False, gettext(
            "Asynchronous query execution/operation underway."
        )

    while True:
        if cur.get_rowcount() <= 0:
            # The statement did not produce any record, e.g. a DDL
            # operation.
            return True, None

        try:
            if records == -1:
                result = cur.fetchwindow(
                    from_rownum=0, to_rownum=cur.get_rowcount() - 1,
                    _tupples=True)
            elif records is None:
                result = cur.fetchwindow(from_rownum=from_rownum,
                                         to_rownum=to_rownum,
                                         _tupples=True)
            else:
                result = cur.fetchmany(records, _tupples=True)
        except psycopg.ProgrammingError:
            result = None

        if not cur.nextset():
            break

    return True, result
def connected(self):
    """
    Tell whether this object holds a connection that is not closed.

    A connection object that reports itself as closed is dropped
    (self.conn is set to None) before False is returned.
    """
    conn = self.conn
    if not conn:
        return False
    if conn.closed:
        self.conn = None
        return False
    return True
def _decrypt_password(self, manager):
"""
Decrypt password
:param manager: Manager for get password.
:return:
"""
password = getattr(manager, 'password', None)
if password:
# Fetch Logged in User Details.
user = User.query.filter_by(id=current_user.id).first()
if user is None:
return False, self.UNAUTHORIZED_REQUEST, password
crypt_key_present, crypt_key = get_crypt_key()
if not crypt_key_present:
return False, crypt_key, password
password = decrypt(password, crypt_key).decode()
return True, '', password
def reset(self):
    """
    Open a fresh connection to the server and make it this object's
    connection.

    Returns:
        (True, None) when the new connection was established;
        (False, message) when the stored password could not be decrypted
        or the driver failed to connect.
    """
    if self.conn and self.conn.closed:
        self.conn = None

    manager = self.manager

    # _decrypt_password() returns True as its first item on success, so
    # give up only when it reports a failure.
    decrypted, errmsg, password = self._decrypt_password(manager)
    if not decrypted:
        return False, errmsg

    try:
        with ConnectionLocker(manager.kerberos_conn):
            # Create the connection string
            connection_string = manager.create_connection_string(
                self.db, manager.user, password)

            pg_conn = psycopg.connect(connection_string,
                                      cursor_factory=DictCursor)
    except psycopg.Error as e:
        if hasattr(e, 'pgerror'):
            msg = e.pgerror
        elif hasattr(e, 'message'):
            msg = e.message
        elif e.diag.message_detail:
            msg = e.diag.message_detail
        else:
            msg = str(e)

        current_app.logger.error(
            gettext(
                """
Failed to reset the connection to the server due to following error:
{0}"""
            ).format(msg)
        )
        return False, msg

    pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)
    self.conn = pg_conn
    self.__backend_pid = pg_conn.info.backend_pid

    return True, None
def transaction_status(self):
    """
    Return the transaction status reported by the connection info, or
    None when there is no connection (or it exposes no info).
    """
    if not (self.conn and self.conn.info):
        return None
    return self.conn.info.transaction_status
def async_query_error(self):
    """
    Return the error currently stored for the asynchronous query on this
    connection.
    """
    stored_error = self.__async_query_error
    return stored_error
def ping(self):
    """
    Run ``SELECT 1`` through execute_scalar() and return its result
    unchanged.
    """
    probe_result = self.execute_scalar('SELECT 1')
    return probe_result
def _release(self):
if self.wasConnected:
if self.conn:
if self.async_ == 0:
self.conn.close()
elif self.async_ == 1:
self._close_async()
self.conn = None
self.password = None
self.wasConnected = False
def _close_async(self):
async def _close_conn(conn):
if conn:
await conn.close()
asyncio.run(_close_conn(self.conn))
def _wait(self, conn):
pass # This function is empty
def _wait_timeout(self, conn, time):
pass # This function is empty
def poll(self, formatted_exception_msg=False, no_result=False):
    """
    Collect the outcome of the asynchronous query from its cursor.

    Walks over every result set of the cursor, refreshing
    ``self.column_info`` and ``self.row_count`` from each of them.

    Args:
        formatted_exception_msg: not used by this method.
        no_result: when True, rows are not fetched.

    Returns:
        (False, message) when an asynchronous query error is stored, the
        libpq connection reports an error message, or there is no open
        cursor. Otherwise (status, result): status is 3 when the
        connection's transaction status equals 1 and 1 in every other
        case; result holds the rows of the last result set that had rows
        (None if nothing was fetched or the driver raised while fetching).
    """
    cur = self.__async_cursor

    in_transaction = \
        self.conn and self.conn.info.transaction_status == 1
    if not in_transaction:
        if self.__async_query_error:
            return False, self.__async_query_error
        if self.conn and self.conn.pgconn.error_message:
            return False, self.conn.pgconn.error_message
    status = 3 if in_transaction else 1

    if not cur or cur.closed:
        return False, self.CURSOR_NOT_FOUND

    result = None
    self.row_count = 0
    self.column_info = None

    poll_message = "Polling result for (Query-id: {query_id})".format(
        query_id=self.__async_query_id
    )
    current_app.logger.log(25, poll_message)

    while True:
        if self.conn:
            if cur.description is not None:
                self.column_info = [
                    desc.to_dict() for desc in cur.ordered_description()
                ]
                # Record each column's ordinal position.
                for pos, col in enumerate(self.column_info):
                    col['pos'] = pos
            else:
                self.column_info = None

            self.row_count = cur.get_rowcount()
            if not no_result and cur.get_rowcount() > 0:
                try:
                    result = cur.fetchall(_tupples=True)
                except psycopg.Error:
                    # Covers ProgrammingError as well (it is a subclass).
                    result = None

        if not cur.nextset():
            break

    return status, result
def status_message(self):
    """
    Return the status message of the last command executed on the
    asynchronous cursor, or the CURSOR_NOT_FOUND text when there is no
    such cursor.
    """
    cur = self.__async_cursor
    if not cur:
        return self.CURSOR_NOT_FOUND

    log_message = "Status message for (Query-id: {query_id})".format(
        query_id=self.__async_query_id
    )
    current_app.logger.log(25, log_message)

    return cur.statusmessage
def rows_affected(self):
    """
    Return the row count recorded on this connection for the last
    command executed on the server.
    """
    affected = self.row_count
    return affected
@property
def total_rows(self):
    """
    Row count reported by the asynchronous cursor, or 0 when there is no
    such cursor.
    """
    cur = self.__async_cursor
    return 0 if cur is None else cur.rowcount
def get_column_info(self):
    """
    Return the list of columns recorded for the last asynchronous SQL
    command executed on the server (None when nothing is recorded).
    """
    columns = self.column_info
    return columns
def cancel_transaction(self, conn_id, did=None):
    """
    Cancel the transaction running on the given connection using
    PostgreSQL's pg_cancel_backend().

    Args:
        conn_id: Connection id
        did: Database id (optional)

    Returns:
        (status, message). status is False, with an explanatory message,
        when the password could not be decrypted, the temporary
        connection or the cancel query failed, or this connection is not
        connected to the server.
    """
    cancel_conn = self.manager.connection(did=did, conn_id=conn_id)
    query = """SELECT pg_cancel_backend({0});""".format(
        cancel_conn.__backend_pid)

    status = True
    msg = ''

    # If the backend pid is the same, the query cannot be cancelled from
    # this connection: open a new connection, run the cancel there and
    # release it.
    if cancel_conn.__backend_pid == self.__backend_pid:
        decrypted, errmsg, password = \
            self._decrypt_password(self.manager)
        if not decrypted:
            return False, errmsg

        pg_conn = None
        try:
            with ConnectionLocker(self.manager.kerberos_conn):
                connection_string = \
                    self.manager.create_connection_string(
                        self.db, self.manager.user, password)
                pg_conn = psycopg.connect(connection_string,
                                          cursor_factory=DictCursor)

            # Get the cursor and run the query
            cur = pg_conn.cursor()
            cur.execute(query)
        except psycopg.Error as e:
            status = False
            if hasattr(e, 'pgerror'):
                msg = e.pgerror
            elif e.diag.message_detail:
                msg = e.diag.message_detail
            else:
                msg = str(e)
            return status, msg
        finally:
            # Always release the temporary connection, including when the
            # cancel query itself failed.
            if pg_conn is not None:
                pg_conn.close()
    else:
        if self.connected():
            status, msg = self.execute_void(query)

            if status:
                cancel_conn.execution_aborted = True
        else:
            status = False
            msg = gettext("Not connected to the database server.")

    return status, msg
def messages(self):
    """
    Return the list of messages/notices sent from the database server.

    Stored notices are moved into the returned list (the store is left
    empty); a line is then added for every stored asynchronous
    notification.
    """
    resp = []

    pending_notices = self.__notices
    if pending_notices is not None:
        # Drain the stored notices, oldest first.
        resp.extend(pending_notices)
        del pending_notices[:]

    pending_notifies = self.__notifies
    if pending_notifies is None:
        return resp

    for notify in pending_notifies:
        has_payload = notify.payload is not None and notify.payload != ''
        if has_payload:
            notify_msg = gettext(
                "Asynchronous notification \"{0}\" with payload \"{1}\" "
                "received from server process with PID {2}\n"
            ).format(notify.channel, notify.payload, notify.pid)
        else:
            notify_msg = gettext(
                "Asynchronous notification \"{0}\" received from "
                "server process with PID {1}\n"
            ).format(notify.channel, notify.pid)
        resp.append(notify_msg)

    return resp
def _formatted_exception_msg(self, exception_obj, formatted_msg):
"""
This method is used to parse the psycopg.Error object and returns the
formatted error message if flag is set to true else return
normal error message.
Args:
exception_obj: exception object
formatted_msg: if True then function return the formatted exception
message
"""
if hasattr(exception_obj, 'pgerror'):
errmsg = exception_obj.pgerror
elif hasattr(exception_obj, 'diag') and \
hasattr(exception_obj.diag, 'message_detail') and\
exception_obj.diag.message_detail is not None:
errmsg = exception_obj.diag.message_primary + '\n' + \
exception_obj.diag.message_detail
else:
errmsg = str(exception_obj)
# if formatted_msg is false then return from the function
if not formatted_msg:
notices = self.get_notices()
return errmsg if notices == '' else notices + '\n' + errmsg
# Do not append if error starts with `ERROR:` as most pg related
# error starts with `ERROR:`
if not errmsg.startswith('ERROR:'):
errmsg = gettext('ERROR: ') + errmsg + ' \n\n'
if exception_obj.diag.severity is not None \
and exception_obj.diag.message_primary is not None:
ex_diag_message = "{0}: {1}".format(
exception_obj.diag.severity,
exception_obj.diag.message_primary
)
# If both errors are different then only append it
if errmsg and ex_diag_message and \
ex_diag_message.strip().strip('\n').lower() not in \
errmsg.strip().strip('\n').lower():
errmsg += ex_diag_message
elif exception_obj.diag.message_primary is not None:
message_primary = exception_obj.diag.message_primary
if message_primary.lower() not in errmsg.lower():
errmsg += message_primary
if exception_obj.diag.sqlstate is not None:
if not errmsg.endswith('\n'):
errmsg += '\n'
errmsg += gettext('SQL state: ')
errmsg += exception_obj.diag.sqlstate
if exception_obj.diag.message_detail is not None and \
'Detail:'.lower() not in errmsg.lower():
if not errmsg.endswith('\n'):
errmsg += '\n'
errmsg += gettext('Detail: ')
errmsg += exception_obj.diag.message_detail
if exception_obj.diag.message_hint is not None and \
'Hint:'.lower() not in errmsg.lower():
if not errmsg.endswith('\n'):
errmsg += '\n'
errmsg += gettext('Hint: ')
errmsg += exception_obj.diag.message_hint
if exception_obj.diag.statement_position is not None and \
'Character:'.lower() not in errmsg.lower():
if not errmsg.endswith('\n'):
errmsg += '\n'
errmsg += gettext('Character: ')
errmsg += exception_obj.diag.statement_position
if exception_obj.diag.context is not None and \
'Context:'.lower() not in errmsg.lower():
if not errmsg.endswith('\n'):
errmsg += '\n'
errmsg += gettext('Context: ')
errmsg += exception_obj.diag.context
notices = self.get_notices()
return errmsg if notices == '' else notices + '\n' + errmsg
#####
# As per the issue reported on the psycopg2 GitHub repository (link shared
# below), conn.closed is not reliable enough to identify a disconnection from
# the database server, for some unknown reason.
#
# (https://github.com/psycopg/psycopg2/issues/263)
#
# In order to resolve the issue, SQLAlchemy follows the logic below to
# identify the disconnection. It relies on the exception message to identify
# the error.
#
# Reference (MIT license):
# https://github.com/zzzeek/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/psycopg2.py
#
def is_disconnected(self, err):
    """
    Decide whether ``err`` means the server connection has been lost.

    Without an open connection object the answer is always True.
    Otherwise the first line of the error text is searched for known
    disconnection messages; a match only counts when no double quote
    appears before it on that line.
    """
    if not self.conn or self.conn.closed:
        return True

    # String-based checks, for the cases where .closed did not cut it.
    first_line = str(err).partition("\n")[0]
    known_messages = (
        # these error messages from libpq: interfaces/libpq/fe-misc.c
        # and interfaces/libpq/fe-secure.c.
        'terminating connection',
        'closed the connection',
        'connection not open',
        'could not receive data from server',
        'could not send data to server',
        'connection already closed',
        'cursor already closed',
        # not sure where this path is originally from, it may
        # be obsolete. It really says "losed", not "closed".
        'losed the connection unexpectedly',
        # these can occur in newer SSL
        'connection has been closed unexpectedly',
        'SSL SYSCALL error: Bad file descriptor',
        'SSL SYSCALL error: EOF detected',
        'terminating connection due to administrator command'
    )

    def _reports_disconnect(message):
        idx = first_line.find(message)
        return idx >= 0 and '"' not in first_line[:idx]

    return any(_reports_disconnect(message) for message in known_messages)
def check_notifies(self, n=None):
    """
    Store a notification received while polling the connection or after
    an execute. A falsy ``n`` is ignored.
    """
    if not n:
        return

    if self.__notifies is None:
        self.__notifies = []
    self.__notifies.append(n)
def get_notifies(self):
    """
    Return the notifications received from the database server as a list
    of dicts (``recorded_time``, ``channel``, ``payload``, ``pid``), or
    None when there are none. The stored notifications are reset.
    """
    pending = self.__notifies
    notifies = None

    # Convert the list of Notify objects into a list of dicts.
    if pending is not None and len(pending) > 0:
        notifies = []
        for notify in pending:
            notifies.append({
                'recorded_time': str(datetime.datetime.now()),
                'channel': notify.channel,
                'payload': notify.payload,
                'pid': notify.pid
            })

    self.__notifies = None
    return notifies
def get_notices(self, diag=None):
    """
    Record a notice, or hand back the notices recorded so far.

    :param diag: diagnostic object of a notice. When given (and it has a
        ``message_primary``), "<severity>: <message_primary>" is stored
        and an empty string is returned.
    :return: when ``diag`` is None, all stored notices joined into one
        string (the store is emptied); otherwise an empty string.
    """
    # Check for notices.
    if diag and hasattr(diag, 'message_primary'):
        if self.__notices is None:
            self.__notices = []
        self.__notices.append(
            f"{diag.severity}: {diag.message_primary}\n")

    if diag is not None:
        return ''

    pending = self.__notices
    if not pending:
        return ''

    # Hand over everything stored so far and leave the store empty.
    notices = ''.join(pending)
    del pending[:]
    return notices
def pq_encrypt_password_conn(self, password, user):
    """
    Return the encrypted form of a password for the database server.

    The algorithm is read from the server with
    ``SHOW password_encryption`` and the encryption itself is done by the
    connection's ``encrypt_password``.

    :param password: password to be encrypted
    :param user: user of the database server
    :return: the encrypted password, or None when not connected or the
        algorithm could not be read.
    """
    if not self.connected():
        return None

    status, enc_algorithm = \
        self.execute_scalar("SHOW password_encryption")
    if not status:
        return None

    encoding = self.conn.info.encoding
    encrypted = self.conn.pgconn.encrypt_password(
        password.encode(encoding), user.encode(encoding),
        enc_algorithm.encode(encoding)
    )
    return encrypted.decode()
def mogrify(self, query, parameters):
    """
    Return the SQL query after parameter binding.

    :param query: sql query before parameters (variables) binding
    :param parameters: query parameters / variables
    :return: None when no cursor could be obtained; the query unchanged
        when there are no parameters; otherwise the query with the
        parameters bound by a client-side cursor.
    """
    status, _cursor_or_error = self.__cursor()
    if not status:
        return None

    if not parameters:
        return query

    with psycopg.ClientCursor(self.conn) as _cur:
        return _cur.mogrify(query, parameters)