From b3c625f135c80f48b3df05bf72e7c1b51ca8de5f Mon Sep 17 00:00:00 2001 From: Ashesh Vashi Date: Tue, 22 Mar 2016 15:35:11 +0000 Subject: [PATCH] Graceful server reconnections. --- web/pgadmin/utils/driver/psycopg2/__init__.py | 152 ++++++++++++++++-- 1 file changed, 136 insertions(+), 16 deletions(-) diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py index 8bcba2170..db938bc08 100644 --- a/web/pgadmin/utils/driver/psycopg2/__init__.py +++ b/web/pgadmin/utils/driver/psycopg2/__init__.py @@ -13,7 +13,8 @@ psycopg2. It is a wrapper around the actual psycopg2 driver, and connection object. """ -from datetime import datetime +import datetime +import config import psycopg2 import psycopg2.extras @@ -118,6 +119,38 @@ class Connection(BaseConnection): super(Connection, self).__init__() + def as_dict(self): + """ + Returns the dictionary object representing this object. + """ + # In case, it can not be auto reconnectable, or already been released, + # then we will return None. + if not self.auto_reconnect and not self.conn: + return None + + res = dict() + res['conn_id'] = self.conn_id + res['database'] = self.db + res['async'] = self.async + + return res + + def __repr__(self): + return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format( + self.conn_id, self.db, + 'Connected' if self.conn and not self.conn.closed else + "Disconnected", + self.async + ) + + def __str__(self): + return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format( + self.conn_id, self.db, + 'Connected' if self.conn and not self.conn.closed else + "Disconnected", + self.async + ) + def connect(self, **kwargs): if self.conn: if self.conn.closed: @@ -156,7 +189,7 @@ class Connection(BaseConnection): try: import os - os.environ['PGAPPNAME'] = 'pgAdmin IV - {0}'.format(self.conn_id) + os.environ['PGAPPNAME'] = '{0} - {1}'.format(config.APP_NAME, self.conn_id) pg_conn = psycopg2.connect( host=mgr.host, port=mgr.port, @@ -192,17 +225,17 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id} self.conn = pg_conn self.__backend_pid = pg_conn.get_backend_pid() - # autocommit and client encoding not worked with asynchronous connection - # By default asynchronous connection runs in autocommit mode + # autocommit flag does not work with asynchronous connections. + # By default asynchronous connection runs in autocommit mode. if self.async == 0: self.conn.autocommit = True - self.conn.set_client_encoding("UNICODE") status, res = self.execute_scalar(""" SET DateStyle=ISO; SET client_min_messages=notice; SET bytea_output=escape; -""") +SET client_encoding='UNICODE';""") + if not status: self.conn.close() self.conn = None @@ -285,6 +318,8 @@ WHERE mgr.server_type = st.stype break + mgr.update_session() + return True, None def __cursor(self): @@ -817,7 +852,6 @@ class ServerManager(object): assert(server is not None) assert(isinstance(server, Server)) - self.module = None self.ver = None self.sversion = None self.server_type = None @@ -831,14 +865,37 @@ class ServerManager(object): self.password = server.password self.role = server.role self.ssl_mode = server.ssl_mode - self.pinged = datetime.now() + self.pinged = datetime.datetime.now() self.db_info = dict() for con in self.connections: self.connections[con]._release() + self.update_session() + self.connections = dict() + def as_dict(self): + """ + Returns a dictionary object representing the server manager. + """ + if self.ver is None or len(self.connections) == 0: + return None + + res = dict() + res['sid'] = self.sid + res['password'] = self.password + + connections = res['connections'] = dict() + + for conn_id in self.connections: + conn = self.connections[conn_id].as_dict() + + if conn is not None: + connections[conn_id] = conn + + return res + def ServerVersion(self): return self.ver @@ -861,7 +918,9 @@ class ServerManager(object): return int(int(self.sversion / 100) / 100) raise Exception("Information is not available!") - def connection(self, database=None, conn_id=None, auto_reconnect=True, did=None): + def connection( + self, database=None, conn_id=None, auto_reconnect=True, did=None + ): msg_active_conn = gettext( "Server has no active connection, please connect it first!" ) @@ -878,7 +937,8 @@ class ServerManager(object): if conn.connected(): status, res = conn.execute_dict(""" SELECT - db.oid as did, db.datname, db.datallowconn, pg_encoding_to_char(db.encoding) AS serverencoding, + db.oid as did, db.datname, db.datallowconn, + pg_encoding_to_char(db.encoding) AS serverencoding, has_database_privilege(db.oid, 'CREATE') as cancreate, datlastsysoid FROM pg_database db @@ -900,7 +960,7 @@ WHERE db.oid = {0}""".format(did)) my_id = ('CONN:' + str(conn_id)) if conn_id is not None else \ ('DB:' + str(database)) - self.pinged = datetime.now() + self.pinged = datetime.datetime.now() if my_id in self.connections: return self.connections[my_id] @@ -912,6 +972,38 @@ WHERE db.oid = {0}""".format(did)) return self.connections[my_id] + def _restore(self, data): + """ + Helps restoring to reconnect the auto-connect connections smoothly on + reload/restart of the app server.. + """ + # Hmm.. we will not honour this request, when I already have + # connections + if len(self.connections) != 0: + return + + # We need to know about the existing server variant supports during + # first connection for identifications. + from pgadmin.browser.server_groups.servers.types import ServerType + self.pinged = datetime.datetime.now() + + connections = data['connections'] + for conn_id in connections: + conn_info = connections[conn_id] + conn = self.connections[conn_info['conn_id']] = Connection( + self, conn_info['conn_id'], conn_info['database'], + True, conn_info['async'] + ) + + try: + conn.connect( + password=data['password'], + server_types=ServerType.types() + ) + except Exception as e: + current_app.logger.exception(e) + self.connections.pop(conn_info['conn_id']) + def release(self, database=None, conn_id=None, did=None): if did is not None: if did in self.db_info and 'datname' in self.db_info[did]: @@ -921,7 +1013,6 @@ WHERE db.oid = {0}""".format(did)) else: return False - my_id = ('CONN:' + str(conn_id)) if conn_id is not None else \ ('DB:' + str(database)) if database is not None else None @@ -936,6 +1027,8 @@ WHERE db.oid = {0}""".format(did)) self.server_type = None self.password = None + self.update_session() + return True else: return False @@ -949,8 +1042,23 @@ WHERE db.oid = {0}""".format(did)) self.server_type = None self.password = None + self.update_session() + return True + def update_session(self): + managers = session['__pgsql_server_managers'] \ + if '__pgsql_server_managers' in session else dict() + updated_mgr = self.as_dict() + + if not updated_mgr: + if self.sid in managers: + managers.pop(self.sid) + else: + managers[self.sid] = updated_mgr + session['__pgsql_server_managers'] = managers + + class Driver(BaseDriver): """ class Driver(BaseDriver): @@ -997,17 +1105,31 @@ class Driver(BaseDriver): assert (sid is not None and isinstance(sid, int)) managers = None - import datetime if session['_id'] not in self.managers: self.managers[session['_id']] = managers = dict() + if '__pgsql_server_managers' in session: + session_managers = session['__pgsql_server_managers'].copy() + session['__pgsql_server_managers'] = dict() + + for server_id in session_managers: + s = Server.query.filter_by(id=server_id).first() + + if not s: + continue + + manager = managers[str(server_id)] = ServerManager(s) + manager._restore(session_managers[server_id]) + manager.update_session() else: managers = self.managers[session['_id']] managers['pinged'] = datetime.datetime.now() if str(sid) not in managers: - from pgadmin.model import Server s = Server.query.filter_by(id=sid).first() + if not s: + return None + managers[str(sid)] = ServerManager(s) return managers[str(sid)] @@ -1073,8 +1195,6 @@ class Driver(BaseDriver): Release the connections for the sessions, which have not pinged the server for more than config.MAX_SESSION_IDLE_TIME. """ - import datetime - import config # Mininum session idle is 20 minutes max_idle_time = max(config.MAX_SESSION_IDLE_TIME or 60, 20)