pgadmin4/web/pgadmin/settings/__init__.py

506 lines
16 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2025, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""Utility functions for storing and retrieving user configuration settings."""
import json
from flask import Response, request, render_template, current_app
from flask_babel import gettext
from flask_login import current_user
from pgadmin.user_login_check import pga_login_required
from pgadmin.utils import PgAdminModule, get_complete_file_path
from pgadmin.utils.ajax import make_json_response, bad_request,\
success_return, internal_server_error, make_response as ajax_response
from pgadmin.utils.menu import MenuItem
from pgadmin.model import db, Setting, ApplicationState
from pgadmin.utils.constants import MIMETYPE_APP_JS
from .utils import get_dialog_type, get_file_type_setting
from cryptography.fernet import Fernet
import hashlib
MODULE_NAME = 'settings'
class SettingsModule(PgAdminModule):
def get_own_menuitems(self):
return {
'file_items': [
MenuItem(
name='mnu_resetlayout',
priority=998,
module="pgAdmin.Settings",
callback='show',
label=gettext('Reset Layout')
)
]
}
def get_exposed_url_endpoints(self):
"""
Returns:
list: a list of url endpoints exposed to the client.
"""
return [
'settings.store', 'settings.store_bulk', 'settings.reset_layout',
'settings.save_tree_state', 'settings.get_tree_state',
'settings.reset_tree_state',
'settings.save_file_format_setting',
'settings.get_file_format_setting',
'settings.save_application_state',
'settings.get_application_state',
'settings.delete_application_state',
'settings.get_tool_data'
]
blueprint = SettingsModule(MODULE_NAME, __name__)
def store_setting(setting, value):
"""Set a configuration setting for the current user."""
data = Setting(user_id=current_user.id, setting=setting, value=value)
db.session.merge(data)
db.session.commit()
def get_setting(setting, default=''):
"""Retrieve a configuration setting for the current user, or return the
default value specified by the caller."""
data = Setting.query.filter_by(
user_id=current_user.id, setting=setting).first()
if not data or data.value is None:
return default
else:
return data.value
def get_workspace_layout():
result = (Setting.query.filter(
Setting.user_id == current_user.id,
Setting.setting.ilike('Workspace/Layout%')).all())
settings = {}
for row in result:
settings[row.setting] = row.value
return settings
def get_layout():
layout = {'Browser/Layout': get_setting('Browser/Layout', default='')}
layout = {**layout, **get_workspace_layout()}
return layout
@blueprint.route("/")
@pga_login_required
def index():
return bad_request(errormsg=gettext("This URL cannot be called directly."))
@blueprint.route("/settings.js")
@pga_login_required
def script():
"""Render the required Javascript"""
return Response(response=render_template("settings/settings.js"),
status=200,
mimetype=MIMETYPE_APP_JS)
@blueprint.route("/store", methods=['POST'], endpoint='store_bulk')
@blueprint.route("/store/<setting>/<value>", methods=['PUT'], endpoint='store')
@pga_login_required
def store(setting=None, value=None):
"""Store a configuration setting, or if this is a POST request and a
count value is present, store multiple settings at once."""
success = 1
errormsg = ''
try:
data = request.form if request.form else json.loads(
request.data.decode('utf-8'))
if request.method == 'POST':
if 'count' in request.form:
for x in range(int(data['count'])):
store_setting(data['setting%d' % (
x + 1)], data['value%d' % (x + 1)])
else:
store_setting(data['setting'], data['value'])
else:
store_setting(setting, value)
except Exception as e:
success = 0
errormsg = str(e)
return make_json_response(success=success,
errormsg=errormsg,
info=gettext("Setting stored"))
@blueprint.route("/layout", methods=['DELETE'], endpoint='reset_layout')
@pga_login_required
def reset_layout():
"""Reset configuration setting"""
try:
if hasattr(request, 'params') and \
request.params['setting'] in [
'Browser/Layout', 'SQLEditor/Layout', 'Debugger/Layout']:
db.session.query(Setting) \
.filter(Setting.user_id == current_user.id) \
.filter((Setting.setting == request.params['setting'])) \
.delete()
else:
db.session.query(Setting) \
.filter(Setting.user_id == current_user.id)\
.filter((Setting.setting == 'Browser/Layout') |
(Setting.setting == 'SQLEditor/Layout') |
(Setting.setting == 'Debugger/Layout'))\
.delete()
db.session.commit()
except Exception as e:
return make_json_response(
status=410, success=0, errormsg=str(e)
)
return make_json_response(result=request.form)
@blueprint.route("/reset_tree_state", methods=['DELETE'],
endpoint='reset_tree_state')
@pga_login_required
def reset_tree_state():
"""Reset the saved tree state."""
data = Setting.query.filter_by(user_id=current_user.id,
setting='browser_tree_state').first()
try:
if data is not None:
db.session.delete(data)
db.session.commit()
except Exception as e:
return make_json_response(
status=410, success=0, errormsg=str(e)
)
return success_return()
@blueprint.route("/save_tree_state/", endpoint="save_tree_state",
methods=['POST'])
@pga_login_required
def save_browser_tree_state():
"""Save the browser tree state."""
data = request.form if request.form else request.data.decode('utf-8')
old_data = get_setting('browser_tree_state')
if old_data and old_data != 'null':
if data:
data = json.loads(data)
old_data = json.loads(old_data)
old_data.update(data)
new_data = json.dumps(old_data)
else:
new_data = data
try:
store_setting('browser_tree_state', new_data)
except Exception as e:
current_app.logger.exception(e)
return internal_server_error(errormsg=str(e))
return success_return()
@blueprint.route("/get_tree_state/", endpoint="get_tree_state",
methods=['GET'])
@pga_login_required
def get_browser_tree_state():
"""Get the browser tree state."""
try:
data = get_setting('browser_tree_state')
except Exception as e:
current_app.logger.exception(e)
return internal_server_error(errormsg=str(e))
return Response(response=data,
status=200,
mimetype="application/json")
@blueprint.route("/save_file_format_setting/",
endpoint="save_file_format_setting",
methods=['POST'])
@pga_login_required
def save_file_format_setting():
"""
This function save the selected file format.save_file_format_setting
:return: save file format response
"""
data = request.form if request.form else json.loads(
request.data.decode('utf-8'))
file_type = get_dialog_type(data['allowed_file_types'])
store_setting(file_type, data['last_selected_format'])
return make_json_response(success=True,
info=data,
result=request.form)
@blueprint.route("/get_file_format_setting/",
endpoint="get_file_format_setting",
methods=['GET'])
@pga_login_required
def get_file_format_setting():
"""
This function return the last selected file format
:return: last selected file format
"""
data = dict()
for k, v in request.args.items():
try:
data[k] = json.loads(v)
except (ValueError, TypeError, KeyError):
data[k] = v
return make_json_response(success=True,
info=get_file_type_setting(list(data.values())))
@blueprint.route(
'/save_application_state',
methods=["POST"], endpoint='save_application_state'
)
@pga_login_required
def save_application_state():
"""
Expose an api to save the application state which stores the data from
query tool, ERD, schema-diff, psql
"""
data = json.loads(request.data)
trans_id = data['trans_id']
fernet = Fernet(current_app.config['SECRET_KEY'].encode())
tool_data = fernet.encrypt(json.dumps(data['tool_data']).encode())
connection_info = data['connection_info'] \
if 'connection_info' in data else None
if (connection_info and 'open_file_name' in connection_info and
connection_info['open_file_name']):
file_path = get_complete_file_path(connection_info['open_file_name'])
connection_info['last_saved_file_hash'] = (
get_last_saved_file_hash(file_path, trans_id))
try:
data_entry = ApplicationState(
uid=current_user.id, id=trans_id,connection_info=connection_info,
tool_data=tool_data)
db.session.merge(data_entry)
db.session.commit()
except Exception as e:
print(e)
db.session.rollback()
return make_json_response(
data={
'status': True,
'msg': 'Success',
})
def get_last_saved_file_hash(file_path, trans_id):
result = db.session \
.query(ApplicationState) \
.filter(ApplicationState.uid == current_user.id,
ApplicationState.id == trans_id).all()
file_hash_update_require = True
last_saved_file_hash = None
for row in result:
connection_info = row.connection_info
if ('open_file_name' in connection_info and
connection_info['open_file_name']):
file_hash_update_require = not connection_info['is_editor_dirty']
last_saved_file_hash = connection_info['last_saved_file_hash']
if file_hash_update_require:
last_saved_file_hash = compute_md5_hash_file(file_path)
return last_saved_file_hash
@blueprint.route(
'/get_application_state',
methods=["GET"], endpoint='get_application_state')
@pga_login_required
def get_application_state():
"""
Returns application state if any stored.
"""
fernet = Fernet(current_app.config['SECRET_KEY'].encode())
result = db.session \
.query(ApplicationState) \
.filter(ApplicationState.uid == current_user.id) \
.all()
res = []
for row in result:
connection_info = row.connection_info
if (connection_info and 'open_file_name' in connection_info and
connection_info['open_file_name']):
file_path = get_complete_file_path(
connection_info['open_file_name'])
file_deleted = False if file_path else True
connection_info['file_deleted'] = file_deleted
if (not file_deleted and connection_info['is_editor_dirty'] and
'last_saved_file_hash' in connection_info and
connection_info['last_saved_file_hash']):
connection_info['external_file_changes'] = \
check_external_file_changes(
file_path, connection_info['last_saved_file_hash'])
res.append({'connection_info': connection_info,
'tool_data': fernet.decrypt(row.tool_data).decode(),
'id': row.id
})
return make_json_response(
data={
'status': True,
'msg': '',
'result': res
}
)
@blueprint.route(
'/get_tool_data/<int:trans_id>',
methods=["GET"], endpoint='get_tool_data')
@pga_login_required
def get_tool_data(trans_id):
fernet = Fernet(current_app.config['SECRET_KEY'].encode())
result = db.session \
.query(ApplicationState) \
.filter(ApplicationState.uid == current_user.id,
ApplicationState.id == trans_id) \
.first()
if result:
connection_info = result.connection_info
tool_data = fernet.decrypt(result.tool_data).decode()
if (connection_info and 'open_file_name' in connection_info and
connection_info['open_file_name']):
file_path = (
get_complete_file_path(connection_info['open_file_name']))
file_deleted = False if file_path else True
connection_info['file_deleted'] = file_deleted
if (not file_deleted and connection_info['is_editor_dirty'] and
'last_saved_file_hash' in connection_info and
connection_info['last_saved_file_hash']):
connection_info['external_file_changes'] = \
check_external_file_changes(
file_path, connection_info['last_saved_file_hash'])
if not (file_deleted or connection_info['is_editor_dirty']):
# Send tool data only if file is deleted or edited
tool_data = None
return make_json_response(
data={
'status': True,
'msg': '',
'result': {
'connection_info': connection_info,
'tool_data': tool_data,
'id': result.id
}
}
)
else:
return (
make_json_response(
success=0,
errormsg=gettext(
'There is no saved content available for this tab.'),
status=404
))
@blueprint.route(
'/delete_application_state/',
methods=["DELETE"], endpoint='delete_application_state')
@pga_login_required
def delete_application_state():
status = False
msg = gettext('Unable to delete application state data.')
if request.data:
data = json.loads(request.data)
if 'trans_ids' in data:
for trans_id in data['trans_ids']:
status, msg = delete_tool_data(trans_id)
else:
trans_id = int(data['panelId'].split('_')[-1])
status, msg = delete_tool_data(trans_id)
return make_json_response(
data={
'status': status,
'msg': msg,
}
)
def delete_tool_data(trans_id=None):
try:
if trans_id:
results = db.session \
.query(ApplicationState) \
.filter(ApplicationState.uid == current_user.id,
ApplicationState.id == trans_id) \
.all()
else:
results = db.session \
.query(ApplicationState) \
.filter(ApplicationState.uid == current_user.id) \
.all()
for result in results:
db.session.delete(result)
db.session.commit()
return True, 'Success'
except Exception as e:
db.session.rollback()
return False, str(e)
def compute_md5_hash_file(file_path, chunk_size=8192):
"""Compute md5 hash for large files by reading in chunks."""
md5_hash = hashlib.md5()
# Open the file in binary mode
with open(file_path, "rb") as file:
# Read and hash in 8 KB chunks (can adjust the chunk size if needed)
for chunk in iter(lambda: file.read(chunk_size), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
def check_external_file_changes(file_path, last_saved_file_hash):
current_file_hash = compute_md5_hash_file(file_path)
if current_file_hash != last_saved_file_hash:
return True
return False