Ensure saved shared server passwords are re-encrypted on password change. #9258

pull/9516/head
Yogesh Mahajan 2026-01-06 11:29:42 +05:30 committed by GitHub
parent cc0377fb59
commit 1301e5d2ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 33 additions and 25 deletions

View File

@ -19,7 +19,7 @@ from pgadmin.utils.constants import (
KEY_RING_DESKTOP_USER, SSL_MODES, RESTRICTION_TYPE_DATABASES,
RESTRICTION_TYPE_SQL)
from pgadmin.utils.crypto import encrypt, decrypt
from pgadmin.model import db, Server
from pgadmin.model import db, Server, SharedServer
from flask import current_app
from pgadmin.utils.master_password import set_masterpass_check_text
from pgadmin.utils.driver import get_driver
@ -440,37 +440,45 @@ def migrate_saved_passwords(master_key, master_password):
return passwords_migrated, error
def __reencrpyt_server_password(server, old_key, new_key):
from pgadmin.utils.driver import get_driver
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(server.id)
_password_check(server, manager, old_key, new_key)
if server.tunnel_password is not None:
tunnel_password = decrypt(server.tunnel_password, old_key)
if isinstance(tunnel_password, bytes):
tunnel_password = tunnel_password.decode()
tunnel_password = encrypt(tunnel_password, new_key)
setattr(server, 'tunnel_password', tunnel_password)
manager.tunnel_password = tunnel_password
elif manager.tunnel_password is not None:
tunnel_password = decrypt(manager.tunnel_password, old_key)
if isinstance(tunnel_password, bytes):
tunnel_password = tunnel_password.decode()
tunnel_password = encrypt(tunnel_password, new_key)
manager.tunnel_password = tunnel_password
db.session.commit()
manager.update_session()
def reencrpyt_server_passwords(user_id, old_key, new_key):
"""
This function will decrypt the saved passwords in SQLite with old key
and then encrypt with new key
"""
from pgadmin.utils.driver import get_driver
driver = get_driver(config.PG_DEFAULT_DRIVER)
for server in Server.query.filter_by(user_id=user_id).all():
manager = driver.connection_manager(server.id)
_password_check(server, manager, old_key, new_key)
__reencrpyt_server_password(server, old_key, new_key)
if server.tunnel_password is not None:
tunnel_password = decrypt(server.tunnel_password, old_key)
if isinstance(tunnel_password, bytes):
tunnel_password = tunnel_password.decode()
tunnel_password = encrypt(tunnel_password, new_key)
setattr(server, 'tunnel_password', tunnel_password)
manager.tunnel_password = tunnel_password
elif manager.tunnel_password is not None:
tunnel_password = decrypt(manager.tunnel_password, old_key)
if isinstance(tunnel_password, bytes):
tunnel_password = tunnel_password.decode()
tunnel_password = encrypt(tunnel_password, new_key)
manager.tunnel_password = tunnel_password
db.session.commit()
manager.update_session()
# Ensure saved shared server passwords are re-encrypted.
for server in SharedServer.query.filter_by(user_id=user_id).all():
__reencrpyt_server_password(server, old_key, new_key)
def remove_saved_passwords(user_id):