Graceful server reconnections.
parent
3c366fafe7
commit
b3c625f135
|
@ -13,7 +13,8 @@ psycopg2. It is a wrapper around the actual psycopg2 driver, and connection
|
|||
object.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
import datetime
|
||||
import config
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
|
@ -118,6 +119,38 @@ class Connection(BaseConnection):
|
|||
|
||||
super(Connection, self).__init__()
|
||||
|
||||
def as_dict(self):
|
||||
"""
|
||||
Returns the dictionary object representing this object.
|
||||
"""
|
||||
# In case, it can not be auto reconnectable, or already been released,
|
||||
# then we will return None.
|
||||
if not self.auto_reconnect and not self.conn:
|
||||
return None
|
||||
|
||||
res = dict()
|
||||
res['conn_id'] = self.conn_id
|
||||
res['database'] = self.db
|
||||
res['async'] = self.async
|
||||
|
||||
return res
|
||||
|
||||
def __repr__(self):
|
||||
return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
|
||||
self.conn_id, self.db,
|
||||
'Connected' if self.conn and not self.conn.closed else
|
||||
"Disconnected",
|
||||
self.async
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
|
||||
self.conn_id, self.db,
|
||||
'Connected' if self.conn and not self.conn.closed else
|
||||
"Disconnected",
|
||||
self.async
|
||||
)
|
||||
|
||||
def connect(self, **kwargs):
|
||||
if self.conn:
|
||||
if self.conn.closed:
|
||||
|
@ -156,7 +189,7 @@ class Connection(BaseConnection):
|
|||
|
||||
try:
|
||||
import os
|
||||
os.environ['PGAPPNAME'] = 'pgAdmin IV - {0}'.format(self.conn_id)
|
||||
os.environ['PGAPPNAME'] = '{0} - {1}'.format(config.APP_NAME, self.conn_id)
|
||||
pg_conn = psycopg2.connect(
|
||||
host=mgr.host,
|
||||
port=mgr.port,
|
||||
|
@ -192,17 +225,17 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
|
|||
self.conn = pg_conn
|
||||
self.__backend_pid = pg_conn.get_backend_pid()
|
||||
|
||||
# autocommit and client encoding not worked with asynchronous connection
|
||||
# By default asynchronous connection runs in autocommit mode
|
||||
# autocommit flag does not work with asynchronous connections.
|
||||
# By default asynchronous connection runs in autocommit mode.
|
||||
if self.async == 0:
|
||||
self.conn.autocommit = True
|
||||
self.conn.set_client_encoding("UNICODE")
|
||||
|
||||
status, res = self.execute_scalar("""
|
||||
SET DateStyle=ISO;
|
||||
SET client_min_messages=notice;
|
||||
SET bytea_output=escape;
|
||||
""")
|
||||
SET client_encoding='UNICODE';""")
|
||||
|
||||
if not status:
|
||||
self.conn.close()
|
||||
self.conn = None
|
||||
|
@ -285,6 +318,8 @@ WHERE
|
|||
mgr.server_type = st.stype
|
||||
break
|
||||
|
||||
mgr.update_session()
|
||||
|
||||
return True, None
|
||||
|
||||
def __cursor(self):
|
||||
|
@ -817,7 +852,6 @@ class ServerManager(object):
|
|||
assert(server is not None)
|
||||
assert(isinstance(server, Server))
|
||||
|
||||
self.module = None
|
||||
self.ver = None
|
||||
self.sversion = None
|
||||
self.server_type = None
|
||||
|
@ -831,14 +865,37 @@ class ServerManager(object):
|
|||
self.password = server.password
|
||||
self.role = server.role
|
||||
self.ssl_mode = server.ssl_mode
|
||||
self.pinged = datetime.now()
|
||||
self.pinged = datetime.datetime.now()
|
||||
self.db_info = dict()
|
||||
|
||||
for con in self.connections:
|
||||
self.connections[con]._release()
|
||||
|
||||
self.update_session()
|
||||
|
||||
self.connections = dict()
|
||||
|
||||
def as_dict(self):
|
||||
"""
|
||||
Returns a dictionary object representing the server manager.
|
||||
"""
|
||||
if self.ver is None or len(self.connections) == 0:
|
||||
return None
|
||||
|
||||
res = dict()
|
||||
res['sid'] = self.sid
|
||||
res['password'] = self.password
|
||||
|
||||
connections = res['connections'] = dict()
|
||||
|
||||
for conn_id in self.connections:
|
||||
conn = self.connections[conn_id].as_dict()
|
||||
|
||||
if conn is not None:
|
||||
connections[conn_id] = conn
|
||||
|
||||
return res
|
||||
|
||||
def ServerVersion(self):
|
||||
return self.ver
|
||||
|
||||
|
@ -861,7 +918,9 @@ class ServerManager(object):
|
|||
return int(int(self.sversion / 100) / 100)
|
||||
raise Exception("Information is not available!")
|
||||
|
||||
def connection(self, database=None, conn_id=None, auto_reconnect=True, did=None):
|
||||
def connection(
|
||||
self, database=None, conn_id=None, auto_reconnect=True, did=None
|
||||
):
|
||||
msg_active_conn = gettext(
|
||||
"Server has no active connection, please connect it first!"
|
||||
)
|
||||
|
@ -878,7 +937,8 @@ class ServerManager(object):
|
|||
if conn.connected():
|
||||
status, res = conn.execute_dict("""
|
||||
SELECT
|
||||
db.oid as did, db.datname, db.datallowconn, pg_encoding_to_char(db.encoding) AS serverencoding,
|
||||
db.oid as did, db.datname, db.datallowconn,
|
||||
pg_encoding_to_char(db.encoding) AS serverencoding,
|
||||
has_database_privilege(db.oid, 'CREATE') as cancreate, datlastsysoid
|
||||
FROM
|
||||
pg_database db
|
||||
|
@ -900,7 +960,7 @@ WHERE db.oid = {0}""".format(did))
|
|||
my_id = ('CONN:' + str(conn_id)) if conn_id is not None else \
|
||||
('DB:' + str(database))
|
||||
|
||||
self.pinged = datetime.now()
|
||||
self.pinged = datetime.datetime.now()
|
||||
|
||||
if my_id in self.connections:
|
||||
return self.connections[my_id]
|
||||
|
@ -912,6 +972,38 @@ WHERE db.oid = {0}""".format(did))
|
|||
|
||||
return self.connections[my_id]
|
||||
|
||||
def _restore(self, data):
|
||||
"""
|
||||
Helps restoring to reconnect the auto-connect connections smoothly on
|
||||
reload/restart of the app server..
|
||||
"""
|
||||
# Hmm.. we will not honour this request, when I already have
|
||||
# connections
|
||||
if len(self.connections) != 0:
|
||||
return
|
||||
|
||||
# We need to know about the existing server variant supports during
|
||||
# first connection for identifications.
|
||||
from pgadmin.browser.server_groups.servers.types import ServerType
|
||||
self.pinged = datetime.datetime.now()
|
||||
|
||||
connections = data['connections']
|
||||
for conn_id in connections:
|
||||
conn_info = connections[conn_id]
|
||||
conn = self.connections[conn_info['conn_id']] = Connection(
|
||||
self, conn_info['conn_id'], conn_info['database'],
|
||||
True, conn_info['async']
|
||||
)
|
||||
|
||||
try:
|
||||
conn.connect(
|
||||
password=data['password'],
|
||||
server_types=ServerType.types()
|
||||
)
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
self.connections.pop(conn_info['conn_id'])
|
||||
|
||||
def release(self, database=None, conn_id=None, did=None):
|
||||
if did is not None:
|
||||
if did in self.db_info and 'datname' in self.db_info[did]:
|
||||
|
@ -921,7 +1013,6 @@ WHERE db.oid = {0}""".format(did))
|
|||
else:
|
||||
return False
|
||||
|
||||
|
||||
my_id = ('CONN:' + str(conn_id)) if conn_id is not None else \
|
||||
('DB:' + str(database)) if database is not None else None
|
||||
|
||||
|
@ -936,6 +1027,8 @@ WHERE db.oid = {0}""".format(did))
|
|||
self.server_type = None
|
||||
self.password = None
|
||||
|
||||
self.update_session()
|
||||
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
@ -949,8 +1042,23 @@ WHERE db.oid = {0}""".format(did))
|
|||
self.server_type = None
|
||||
self.password = None
|
||||
|
||||
self.update_session()
|
||||
|
||||
return True
|
||||
|
||||
def update_session(self):
|
||||
managers = session['__pgsql_server_managers'] \
|
||||
if '__pgsql_server_managers' in session else dict()
|
||||
updated_mgr = self.as_dict()
|
||||
|
||||
if not updated_mgr:
|
||||
if self.sid in managers:
|
||||
managers.pop(self.sid)
|
||||
else:
|
||||
managers[self.sid] = updated_mgr
|
||||
session['__pgsql_server_managers'] = managers
|
||||
|
||||
|
||||
class Driver(BaseDriver):
|
||||
"""
|
||||
class Driver(BaseDriver):
|
||||
|
@ -997,17 +1105,31 @@ class Driver(BaseDriver):
|
|||
assert (sid is not None and isinstance(sid, int))
|
||||
managers = None
|
||||
|
||||
import datetime
|
||||
if session['_id'] not in self.managers:
|
||||
self.managers[session['_id']] = managers = dict()
|
||||
if '__pgsql_server_managers' in session:
|
||||
session_managers = session['__pgsql_server_managers'].copy()
|
||||
session['__pgsql_server_managers'] = dict()
|
||||
|
||||
for server_id in session_managers:
|
||||
s = Server.query.filter_by(id=server_id).first()
|
||||
|
||||
if not s:
|
||||
continue
|
||||
|
||||
manager = managers[str(server_id)] = ServerManager(s)
|
||||
manager._restore(session_managers[server_id])
|
||||
manager.update_session()
|
||||
else:
|
||||
managers = self.managers[session['_id']]
|
||||
|
||||
managers['pinged'] = datetime.datetime.now()
|
||||
if str(sid) not in managers:
|
||||
from pgadmin.model import Server
|
||||
s = Server.query.filter_by(id=sid).first()
|
||||
|
||||
if not s:
|
||||
return None
|
||||
|
||||
managers[str(sid)] = ServerManager(s)
|
||||
|
||||
return managers[str(sid)]
|
||||
|
@ -1073,8 +1195,6 @@ class Driver(BaseDriver):
|
|||
Release the connections for the sessions, which have not pinged the
|
||||
server for more than config.MAX_SESSION_IDLE_TIME.
|
||||
"""
|
||||
import datetime
|
||||
import config
|
||||
|
||||
# Mininum session idle is 20 minutes
|
||||
max_idle_time = max(config.MAX_SESSION_IDLE_TIME or 60, 20)
|
||||
|
|
Loading…
Reference in New Issue