pgadmin4/web/pgadmin/llm/__init__.py


##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""A blueprint module implementing LLM/AI configuration."""

import json
import ssl
from flask import request
from flask_babel import gettext
from pgadmin.utils import PgAdminModule
from pgadmin.utils.preferences import Preferences
from pgadmin.utils.ajax import make_json_response, internal_server_error
from pgadmin.user_login_check import pga_login_required
from pgadmin.utils.constants import MIMETYPE_APP_JS
from pgadmin.utils.csrf import pgCSRFProtect
import config

# Try to use certifi for proper SSL certificate handling
try:
import certifi
SSL_CONTEXT = ssl.create_default_context(cafile=certifi.where())
except ImportError:
SSL_CONTEXT = ssl.create_default_context()

# Enforce minimum TLS 1.2 to satisfy security requirements
SSL_CONTEXT.minimum_version = ssl.TLSVersion.TLSv1_2

MODULE_NAME = 'llm'

# Valid LLM providers
LLM_PROVIDERS = ['anthropic', 'openai', 'ollama', 'docker']


class LLMModule(PgAdminModule):
"""LLM configuration module for pgAdmin."""

    def register_preferences(self):
"""
Register preferences for LLM providers.
"""
self.preference = Preferences('ai', gettext('AI'))
# Default Provider Setting
provider_options = [
{'label': gettext('None (Disabled)'), 'value': ''},
{'label': gettext('Anthropic'), 'value': 'anthropic'},
{'label': gettext('OpenAI'), 'value': 'openai'},
{'label': gettext('Ollama'), 'value': 'ollama'},
{'label': gettext('Docker Model Runner'), 'value': 'docker'},
]
# Get default provider from config
default_provider_value = getattr(config, 'DEFAULT_LLM_PROVIDER', '')
self.default_provider = self.preference.register(
'general', 'default_provider',
gettext("Default Provider"), 'options',
default_provider_value,
category_label=gettext('AI Configuration'),
options=provider_options,
help_str=gettext(
'The LLM provider to use for AI features. '
'Select "None (Disabled)" to disable AI features. '
'Note: AI features must also be enabled in the server '
'configuration (LLM_ENABLED) for this setting to take effect.'
),
control_props={'allowClear': False}
)
# Maximum Tool Iterations
max_tool_iterations_default = getattr(
config, 'MAX_LLM_TOOL_ITERATIONS', 20
)
self.max_tool_iterations = self.preference.register(
'general', 'max_tool_iterations',
gettext("Max Tool Iterations"), 'integer',
max_tool_iterations_default,
category_label=gettext('AI Configuration'),
min_val=1,
max_val=100,
help_str=gettext(
'Maximum number of tool call iterations allowed during an AI '
'conversation. Higher values allow more complex queries but '
'may consume more resources. Default is 20.'
)
)
# Anthropic Settings
# Get defaults from config
anthropic_key_file_default = getattr(
config, 'ANTHROPIC_API_KEY_FILE', ''
)
anthropic_model_default = getattr(config, 'ANTHROPIC_API_MODEL', '')
self.anthropic_api_key_file = self.preference.register(
'anthropic', 'anthropic_api_key_file',
gettext("API Key File"), 'text',
anthropic_key_file_default,
category_label=gettext('Anthropic'),
help_str=gettext(
'Path to a file containing your Anthropic API key. '
'The file should contain only the API key.'
)
)
        # No static fallback list; model options are loaded dynamically
        # from the Anthropic API via the optionsUrl endpoint below.
        anthropic_model_options = []
self.anthropic_api_model = self.preference.register(
'anthropic', 'anthropic_api_model',
gettext("Model"), 'options',
anthropic_model_default,
category_label=gettext('Anthropic'),
options=anthropic_model_options,
help_str=gettext(
'The Anthropic model to use. Models are loaded dynamically '
'from your API key. You can also type a custom model name. '
'Leave empty to use the default (Claude Sonnet 4).'
),
control_props={
'allowClear': True,
'creatable': True,
'tags': True,
'placeholder': gettext('Select or type a model name...'),
'optionsUrl': 'llm.models_anthropic',
'optionsRefreshUrl': 'llm.refresh_models_anthropic',
'refreshDepNames': {
'api_key_file': 'anthropic_api_key_file'
}
}
)
# OpenAI Settings
# Get defaults from config
openai_key_file_default = getattr(config, 'OPENAI_API_KEY_FILE', '')
openai_model_default = getattr(config, 'OPENAI_API_MODEL', '')
self.openai_api_key_file = self.preference.register(
'openai', 'openai_api_key_file',
gettext("API Key File"), 'text',
openai_key_file_default,
category_label=gettext('OpenAI'),
help_str=gettext(
'Path to a file containing your OpenAI API key. '
'The file should contain only the API key.'
)
)
        # No static fallback list; model options are loaded dynamically
        # from the OpenAI API via the optionsUrl endpoint below.
        openai_model_options = []
self.openai_api_model = self.preference.register(
'openai', 'openai_api_model',
gettext("Model"), 'options',
openai_model_default,
category_label=gettext('OpenAI'),
options=openai_model_options,
help_str=gettext(
'The OpenAI model to use. Models are loaded dynamically '
'from your API key. You can also type a custom model name. '
'Leave empty to use the default (GPT-4o).'
),
control_props={
'allowClear': True,
'creatable': True,
'tags': True,
'placeholder': gettext('Select or type a model name...'),
'optionsUrl': 'llm.models_openai',
'optionsRefreshUrl': 'llm.refresh_models_openai',
'refreshDepNames': {
'api_key_file': 'openai_api_key_file'
}
}
)
# Ollama Settings
# Get defaults from config
ollama_url_default = getattr(config, 'OLLAMA_API_URL', '')
ollama_model_default = getattr(config, 'OLLAMA_API_MODEL', '')
self.ollama_api_url = self.preference.register(
'ollama', 'ollama_api_url',
gettext("API URL"), 'text',
ollama_url_default,
category_label=gettext('Ollama'),
help_str=gettext(
'URL for the Ollama API endpoint '
'(e.g., http://localhost:11434).'
)
)
        # No static fallback list; model options are loaded dynamically
        # from the Ollama server via the optionsUrl endpoint below.
        ollama_model_options = []
self.ollama_api_model = self.preference.register(
'ollama', 'ollama_api_model',
gettext("Model"), 'options',
ollama_model_default,
category_label=gettext('Ollama'),
options=ollama_model_options,
help_str=gettext(
'The Ollama model to use. Models are loaded dynamically '
'from your Ollama server. You can also type a custom model '
'name. Leave empty to use the default (llama3.2).'
),
control_props={
'allowClear': True,
'creatable': True,
'tags': True,
'placeholder': gettext('Select or type a model name...'),
'optionsUrl': 'llm.models_ollama',
'optionsRefreshUrl': 'llm.refresh_models_ollama',
'refreshDepNames': {
'api_url': 'ollama_api_url'
}
}
)
# Docker Model Runner Settings
# Get defaults from config
docker_url_default = getattr(config, 'DOCKER_API_URL', '')
docker_model_default = getattr(config, 'DOCKER_API_MODEL', '')
self.docker_api_url = self.preference.register(
'docker', 'docker_api_url',
gettext("API URL"), 'text',
docker_url_default,
category_label=gettext('Docker Model Runner'),
help_str=gettext(
'URL for the Docker Model Runner API endpoint '
'(e.g., http://localhost:12434). Available in Docker Desktop '
'4.40 and later.'
)
)
        # No static fallback list; model options are loaded dynamically
        # from the Docker Model Runner via the optionsUrl endpoint below.
        docker_model_options = []
self.docker_api_model = self.preference.register(
'docker', 'docker_api_model',
gettext("Model"), 'options',
docker_model_default,
category_label=gettext('Docker Model Runner'),
options=docker_model_options,
help_str=gettext(
'The Docker model to use. Models are loaded dynamically '
'from your Docker Model Runner. You can also type a custom '
'model name. Leave empty to use the default (ai/qwen3-coder).'
),
control_props={
'allowClear': True,
'creatable': True,
'tags': True,
'placeholder': gettext('Select or type a model name...'),
'optionsUrl': 'llm.models_docker',
'optionsRefreshUrl': 'llm.refresh_models_docker',
'refreshDepNames': {
'api_url': 'docker_api_url'
}
}
)

    def get_exposed_url_endpoints(self):
"""
Returns the list of URLs exposed to the client.
"""
return [
'llm.models_anthropic',
'llm.models_openai',
'llm.models_ollama',
'llm.models_docker',
'llm.refresh_models_anthropic',
'llm.refresh_models_openai',
'llm.refresh_models_ollama',
'llm.refresh_models_docker',
'llm.status',
# Security reports
'llm.security_report',
'llm.database_security_report',
'llm.schema_security_report',
# Security report streams
'llm.security_report_stream',
'llm.database_security_report_stream',
'llm.schema_security_report_stream',
# Performance reports
'llm.performance_report',
'llm.database_performance_report',
# Performance report streams
'llm.performance_report_stream',
'llm.database_performance_report_stream',
# Design reviews
'llm.database_design_report',
'llm.schema_design_report',
# Design report streams
'llm.database_design_report_stream',
'llm.schema_design_report_stream',
]
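
# How these AI preferences are typically read elsewhere in pgAdmin
# (a minimal sketch, assuming the standard Preferences API):
#
#     from pgadmin.utils.preferences import Preferences
#
#     pref = Preferences.module('ai').preference('default_provider')
#     provider = pref.get()  # '' means AI features are disabled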


# Initialise the module
blueprint = LLMModule(MODULE_NAME, __name__)


@blueprint.route("/status", methods=["GET"], endpoint='status')
@pga_login_required
def get_llm_status():
"""
Get the LLM configuration status.
Returns whether LLM is enabled at system and user level,
and the configured provider and model.
"""
from pgadmin.llm.utils import (
is_llm_enabled, is_llm_enabled_system, get_default_provider,
get_anthropic_model, get_openai_model, get_ollama_model,
get_docker_model
)
provider = get_default_provider()
model = None
if provider == 'anthropic':
model = get_anthropic_model()
elif provider == 'openai':
model = get_openai_model()
elif provider == 'ollama':
model = get_ollama_model()
elif provider == 'docker':
model = get_docker_model()
return make_json_response(
success=1,
data={
'enabled': is_llm_enabled(),
'system_enabled': is_llm_enabled_system(),
'provider': provider,
'model': model
}
)
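
# Illustrative /llm/status response (keys as produced by the handler
# above; values are examples only):
#
#     {"success": 1, "data": {"enabled": true, "system_enabled": true,
#                             "provider": "ollama", "model": "llama3.2"}}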


@blueprint.route(
"/models/anthropic", methods=["GET"], endpoint='models_anthropic'
)
@pga_login_required
def get_anthropic_models():
"""
Fetch available Anthropic models.
Returns models that support tool use.
"""
from pgadmin.llm.utils import get_anthropic_api_key
api_key = get_anthropic_api_key()
if not api_key:
return make_json_response(
data={'models': [], 'error': 'No API key configured'},
status=200
)
try:
models = _fetch_anthropic_models(api_key)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)


@blueprint.route(
"/models/anthropic/refresh",
methods=["POST"],
endpoint='refresh_models_anthropic'
)
@pga_login_required
def refresh_anthropic_models():
"""
Fetch available Anthropic models using a provided API key file path.
Used by the preferences refresh button to load models before saving.
"""
from pgadmin.llm.utils import read_api_key_file
data = request.get_json(force=True, silent=True) or {}
api_key_file = data.get('api_key_file', '')
if not api_key_file:
return make_json_response(
data={'models': [], 'error': 'No API key file provided'},
status=200
)
api_key = read_api_key_file(api_key_file)
if not api_key:
return make_json_response(
data={'models': [], 'error': 'Could not read API key from file'},
status=200
)
try:
models = _fetch_anthropic_models(api_key)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)
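
# All four refresh endpoints accept the same style of JSON body. For
# example (path and file name are illustrative):
#
#     POST /llm/models/anthropic/refresh
#     {"api_key_file": "/path/to/anthropic.key"}
#
# The Ollama and Docker variants expect {"api_url": "..."} instead.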
@blueprint.route("/models/openai", methods=["GET"], endpoint='models_openai')
@pga_login_required
def get_openai_models():
"""
Fetch available OpenAI models.
Returns models that support function calling.
"""
from pgadmin.llm.utils import get_openai_api_key
api_key = get_openai_api_key()
if not api_key:
return make_json_response(
data={'models': [], 'error': 'No API key configured'},
status=200
)
try:
models = _fetch_openai_models(api_key)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)


@blueprint.route(
"/models/openai/refresh",
methods=["POST"],
endpoint='refresh_models_openai'
)
@pga_login_required
def refresh_openai_models():
"""
Fetch available OpenAI models using a provided API key file path.
Used by the preferences refresh button to load models before saving.
"""
from pgadmin.llm.utils import read_api_key_file
data = request.get_json(force=True, silent=True) or {}
api_key_file = data.get('api_key_file', '')
if not api_key_file:
return make_json_response(
data={'models': [], 'error': 'No API key file provided'},
status=200
)
api_key = read_api_key_file(api_key_file)
if not api_key:
return make_json_response(
data={'models': [], 'error': 'Could not read API key from file'},
status=200
)
try:
models = _fetch_openai_models(api_key)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)
@blueprint.route("/models/ollama", methods=["GET"], endpoint='models_ollama')
@pga_login_required
def get_ollama_models():
"""
Fetch available Ollama models.
"""
from pgadmin.llm.utils import get_ollama_api_url
api_url = get_ollama_api_url()
if not api_url:
return make_json_response(
data={'models': [], 'error': 'No API URL configured'},
status=200
)
try:
models = _fetch_ollama_models(api_url)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)


@blueprint.route(
"/models/ollama/refresh",
methods=["POST"],
endpoint='refresh_models_ollama'
)
@pga_login_required
def refresh_ollama_models():
"""
Fetch available Ollama models using a provided API URL.
Used by the preferences refresh button to load models before saving.
"""
data = request.get_json(force=True, silent=True) or {}
api_url = data.get('api_url', '')
if not api_url:
return make_json_response(
data={'models': [], 'error': 'No API URL provided'},
status=200
)
try:
models = _fetch_ollama_models(api_url)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)
@blueprint.route("/models/docker", methods=["GET"], endpoint='models_docker')
@pga_login_required
def get_docker_models():
"""
Fetch available Docker Model Runner models.
"""
from pgadmin.llm.utils import get_docker_api_url
api_url = get_docker_api_url()
if not api_url:
return make_json_response(
data={'models': [], 'error': 'No API URL configured'},
status=200
)
try:
models = _fetch_docker_models(api_url)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)


@blueprint.route(
"/models/docker/refresh",
methods=["POST"],
endpoint='refresh_models_docker'
)
@pga_login_required
def refresh_docker_models():
"""
Fetch available Docker models using a provided API URL.
Used by the preferences refresh button to load models before saving.
"""
data = request.get_json(force=True, silent=True) or {}
api_url = data.get('api_url', '')
if not api_url:
return make_json_response(
data={'models': [], 'error': 'No API URL provided'},
status=200
)
try:
models = _fetch_docker_models(api_url)
return make_json_response(data={'models': models}, status=200)
except Exception as e:
return make_json_response(
data={'models': [], 'error': str(e)},
status=200
)


def _fetch_anthropic_models(api_key):
"""
Fetch models from Anthropic API.
Returns a list of model options with label and value.
"""
import urllib.request
import urllib.error
req = urllib.request.Request(
'https://api.anthropic.com/v1/models',
headers={
'x-api-key': api_key,
'anthropic-version': '2023-06-01'
}
)
try:
with urllib.request.urlopen(
req, timeout=30, context=SSL_CONTEXT
) as response:
data = json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        if e.code == 401:
            raise ValueError('Invalid API key') from e
        raise ConnectionError(f'API error: {e.code}') from e
    except urllib.error.URLError as e:
        raise ConnectionError(
            f'Cannot connect to Anthropic: {e.reason}'
        ) from e
models = []
seen = set()
for model in data.get('data', []):
model_id = model.get('id', '')
display_name = model.get('display_name', model_id)
# Skip if already seen or empty
if not model_id or model_id in seen:
continue
seen.add(model_id)
# Create a user-friendly label
if display_name and display_name != model_id:
label = f"{display_name} ({model_id})"
else:
label = model_id
models.append({
'label': label,
'value': model_id
})
# Sort alphabetically by model ID
models.sort(key=lambda x: x['value'])
return models
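
# Example mapping: an Anthropic models API entry such as
#     {"id": "claude-sonnet-4-20250514", "display_name": "Claude Sonnet 4"}
# becomes the option
#     {"label": "Claude Sonnet 4 (claude-sonnet-4-20250514)",
#      "value": "claude-sonnet-4-20250514"}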


def _fetch_openai_models(api_key):
"""
Fetch models from OpenAI API.
Returns a list of model options with label and value.
"""
import urllib.request
import urllib.error
req = urllib.request.Request(
'https://api.openai.com/v1/models',
headers={
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
)
try:
with urllib.request.urlopen(
req, timeout=30, context=SSL_CONTEXT
) as response:
data = json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        if e.code == 401:
            raise ValueError('Invalid API key') from e
        raise ConnectionError(f'API error: {e.code}') from e
    except urllib.error.URLError as e:
        raise ConnectionError(
            f'Cannot connect to OpenAI: {e.reason}'
        ) from e
models = []
seen = set()
for model in data.get('data', []):
model_id = model.get('id', '')
# Skip if already seen or empty
if not model_id or model_id in seen:
continue
seen.add(model_id)
models.append({
'label': model_id,
'value': model_id
})
# Sort alphabetically
models.sort(key=lambda x: x['value'])
return models


def _fetch_ollama_models(api_url):
"""
Fetch models from Ollama API.
Returns a list of model options with label and value.
"""
import urllib.request
import urllib.error
# Normalize URL
api_url = api_url.rstrip('/')
url = f'{api_url}/api/tags'
req = urllib.request.Request(url)
try:
with urllib.request.urlopen(
req, timeout=30, context=SSL_CONTEXT
) as response:
data = json.loads(response.read().decode('utf-8'))
except urllib.error.URLError as e:
raise ConnectionError(
f'Cannot connect to Ollama: {e.reason}'
)
except OSError as e:
raise ConnectionError(
f'Error fetching models: {str(e)}'
)
models = []
for model in data.get('models', []):
name = model.get('name', '')
if name:
# Format size if available
size = model.get('size', 0)
if size:
size_gb = size / (1024 ** 3)
label = f"{name} ({size_gb:.1f} GB)"
else:
label = name
models.append({
'label': label,
'value': name
})
# Sort alphabetically
models.sort(key=lambda x: x['value'])
return models
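
# Example mapping: an /api/tags entry such as
#     {"name": "llama3.2:latest", "size": 2019393189}
# becomes {"label": "llama3.2:latest (1.9 GB)", "value": "llama3.2:latest"}
# (the size is bytes, converted to GiB for the label).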


def _fetch_docker_models(api_url):
"""
Fetch models from Docker Model Runner API.
Returns a list of model options with label and value.
    Docker Model Runner uses an OpenAI-compatible API at /engines/v1/models.
"""
import urllib.request
import urllib.error
# Normalize URL
api_url = api_url.rstrip('/')
url = f'{api_url}/engines/v1/models'
req = urllib.request.Request(url)
try:
with urllib.request.urlopen(
req, timeout=30, context=SSL_CONTEXT
) as response:
data = json.loads(response.read().decode('utf-8'))
except urllib.error.URLError as e:
raise ConnectionError(
f'Cannot connect to Docker Model Runner: '
f'{e.reason}. Is Docker Desktop running '
f'with model runner enabled?'
)
except OSError as e:
raise ConnectionError(
f'Error fetching models: {str(e)}'
)
models = []
seen = set()
for model in data.get('data', []):
model_id = model.get('id', '')
# Skip if already seen or empty
if not model_id or model_id in seen:
continue
seen.add(model_id)
models.append({
'label': model_id,
'value': model_id
})
# Sort alphabetically
models.sort(key=lambda x: x['value'])
return models
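
# Example mapping: Docker Model Runner answers in the OpenAI list format,
# e.g. {"data": [{"id": "ai/qwen3-coder"}]}, which maps to the option
# {"label": "ai/qwen3-coder", "value": "ai/qwen3-coder"}.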


@blueprint.route(
"/security-report/<int:sid>",
methods=["GET"],
endpoint='security_report'
)
@pga_login_required
def generate_security_report(sid):
"""
Generate a security report for the specified server.
Uses the multi-stage pipeline to analyze server configuration.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection()
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Server is not connected.')
)
# Generate report using pipeline
context = {}
success, result = generate_report_sync(
report_type='security',
scope='server',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/security-report/<int:sid>/stream",
methods=["GET"],
endpoint='security_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_security_report_stream(sid):
"""
Stream a security report for the specified server via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection()
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Server is not connected.')
)
context = {}
generator = generate_report_streaming(
report_type='security',
scope='server',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)


def _gather_security_config(conn, manager):
"""
Gather security-related configuration from the PostgreSQL server.
"""
security_info = {
'server_version': manager.ver,
'server_version_num': manager.sversion,
}
# Get security-related settings from pg_settings
settings_query = """
SELECT name, setting, short_desc, context, source
FROM pg_settings
WHERE name IN (
-- Connection settings
'listen_addresses', 'port', 'max_connections',
'superuser_reserved_connections',
-- Authentication
'password_encryption', 'krb_server_keyfile',
'authentication_timeout', 'ssl', 'ssl_ciphers',
'ssl_prefer_server_ciphers', 'ssl_min_protocol_version',
'ssl_max_protocol_version', 'ssl_cert_file', 'ssl_key_file',
'ssl_ca_file', 'ssl_crl_file',
-- Security
'db_user_namespace', 'row_security', 'default_roles_initialized',
-- Logging (security-relevant)
'log_connections', 'log_disconnections',
'log_hostname', 'log_statement', 'log_line_prefix',
'log_duration', 'log_min_duration_statement',
'log_min_error_statement', 'log_replication_commands',
-- Client connection defaults
'client_min_messages', 'search_path',
-- Resource usage (DoS prevention)
'statement_timeout', 'idle_in_transaction_session_timeout',
'idle_session_timeout', 'lock_timeout',
-- Write ahead log
'wal_level', 'archive_mode',
-- Misc
'shared_preload_libraries', 'local_preload_libraries'
)
ORDER BY name
"""
status, result = conn.execute_dict(settings_query)
if status and result:
security_info['settings'] = result.get('rows', [])
else:
security_info['settings'] = []
# Get pg_hba.conf rules (if available via pg_hba_file_rules)
hba_query = """
SELECT line_number, type, database, user_name, address,
netmask, auth_method, options, error
FROM pg_hba_file_rules
ORDER BY line_number
"""
status, result = conn.execute_dict(hba_query)
if status and result:
security_info['hba_rules'] = result.get('rows', [])
else:
# View might not exist or user doesn't have permission
security_info['hba_rules'] = []
security_info['hba_note'] = 'Unable to read pg_hba.conf rules'
# Get superuser roles
superusers_query = """
SELECT rolname, rolcreaterole, rolcreatedb, rolbypassrls,
rolconnlimit, rolvaliduntil
FROM pg_roles
WHERE rolsuper = true
ORDER BY rolname
"""
status, result = conn.execute_dict(superusers_query)
if status and result:
security_info['superusers'] = result.get('rows', [])
else:
security_info['superusers'] = []
# Get roles with special privileges
special_roles_query = """
SELECT rolname, rolsuper, rolcreaterole, rolcreatedb,
rolreplication, rolbypassrls, rolcanlogin, rolconnlimit
FROM pg_roles
WHERE (rolcreaterole OR rolcreatedb OR rolreplication OR rolbypassrls)
AND NOT rolsuper
ORDER BY rolname
"""
status, result = conn.execute_dict(special_roles_query)
if status and result:
security_info['privileged_roles'] = result.get('rows', [])
else:
security_info['privileged_roles'] = []
# Get roles with no password expiry that can login
no_expiry_query = """
SELECT rolname, rolvaliduntil
FROM pg_roles
WHERE rolcanlogin = true
AND (rolvaliduntil IS NULL OR rolvaliduntil = 'infinity')
ORDER BY rolname
"""
status, result = conn.execute_dict(no_expiry_query)
if status and result:
security_info['roles_no_expiry'] = result.get('rows', [])
else:
security_info['roles_no_expiry'] = []
# Check for loaded extensions
extensions_query = """
SELECT extname, extversion
FROM pg_extension
ORDER BY extname
"""
status, result = conn.execute_dict(extensions_query)
if status and result:
security_info['extensions'] = result.get('rows', [])
else:
security_info['extensions'] = []
return security_info
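
# _gather_security_config returns a plain dict with the keys:
#     server_version, server_version_num, settings, hba_rules (plus an
#     hba_note when the rules cannot be read), superusers,
#     privileged_roles, roles_no_expiry, extensions
# Each list value holds the rows returned by the corresponding catalog
# query above.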


# =============================================================================
# Database Security Report
# =============================================================================
@blueprint.route(
"/database-security-report/<int:sid>/<int:did>",
methods=["GET"],
endpoint='database_security_report'
)
@pga_login_required
def generate_database_security_report(sid, did):
"""
Generate a security report for the specified database.
Uses the multi-stage pipeline to analyze database security.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Generate report using pipeline
context = {
'database_name': conn.db
}
success, result = generate_report_sync(
report_type='security',
scope='database',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/database-security-report/<int:sid>/<int:did>/stream",
methods=["GET"],
endpoint='database_security_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_database_security_report_stream(sid, did):
"""
Stream a database security report via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
context = {
'database_name': conn.db
}
generator = generate_report_streaming(
report_type='security',
scope='database',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)


# =============================================================================
# Schema Security Report
# =============================================================================
@blueprint.route(
"/schema-security-report/<int:sid>/<int:did>/<int:scid>",
methods=["GET"],
endpoint='schema_security_report'
)
@pga_login_required
def generate_schema_security_report(sid, did, scid):
"""
Generate a security report for the specified schema.
Uses the multi-stage pipeline to analyze schema security.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Get schema name from scid
schema_query = "SELECT nspname FROM pg_namespace WHERE oid = %s"
status, result = conn.execute_dict(schema_query, [scid])
if not status or not result.get('rows'):
return make_json_response(
success=0,
errormsg=gettext('Schema not found.')
)
schema_name = result['rows'][0]['nspname']
# Generate report using pipeline
context = {
'database_name': conn.db,
'schema_name': schema_name,
'schema_oid': scid
}
success, result = generate_report_sync(
report_type='security',
scope='schema',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/schema-security-report/<int:sid>/<int:did>/<int:scid>/stream",
methods=["GET"],
endpoint='schema_security_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_schema_security_report_stream(sid, did, scid):
"""
Stream a schema security report via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Get schema name from scid
schema_query = "SELECT nspname FROM pg_namespace WHERE oid = %s"
status, result = conn.execute_dict(schema_query, [scid])
if not status or not result.get('rows'):
return make_json_response(
success=0,
errormsg=gettext('Schema not found.')
)
schema_name = result['rows'][0]['nspname']
context = {
'database_name': conn.db,
'schema_name': schema_name,
'schema_oid': scid
}
generator = generate_report_streaming(
report_type='security',
scope='schema',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)


# =============================================================================
# Server Performance Report
# =============================================================================
@blueprint.route(
"/performance-report/<int:sid>",
methods=["GET"],
endpoint='performance_report'
)
@pga_login_required
def generate_performance_report(sid):
"""
Generate a performance report for the specified server.
Uses the multi-stage pipeline to analyze server performance.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection()
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Server is not connected.')
)
# Generate report using pipeline
context = {}
success, result = generate_report_sync(
report_type='performance',
scope='server',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/performance-report/<int:sid>/stream",
methods=["GET"],
endpoint='performance_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_performance_report_stream(sid):
"""
Stream a server performance report via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection()
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Server is not connected.')
)
context = {}
generator = generate_report_streaming(
report_type='performance',
scope='server',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)


# =============================================================================
# Database Performance Report
# =============================================================================
@blueprint.route(
"/database-performance-report/<int:sid>/<int:did>",
methods=["GET"],
endpoint='database_performance_report'
)
@pga_login_required
def generate_database_performance_report(sid, did):
"""
Generate a performance report for the specified database.
Uses the multi-stage pipeline to analyze database performance.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Generate report using pipeline
context = {
'database_name': conn.db
}
success, result = generate_report_sync(
report_type='performance',
scope='database',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/database-performance-report/<int:sid>/<int:did>/stream",
methods=["GET"],
endpoint='database_performance_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_database_performance_report_stream(sid, did):
"""
Stream a database performance report via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
context = {
'database_name': conn.db
}
generator = generate_report_streaming(
report_type='performance',
scope='database',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)


# =============================================================================
# Database Design Review
# =============================================================================
@blueprint.route(
"/database-design-report/<int:sid>/<int:did>",
methods=["GET"],
endpoint='database_design_report'
)
@pga_login_required
def generate_database_design_report(sid, did):
"""
Generate a design review report for the specified database.
Uses the multi-stage pipeline to analyze database schema design.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Generate report using pipeline
context = {
'database_name': conn.db
}
success, result = generate_report_sync(
report_type='design',
scope='database',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/database-design-report/<int:sid>/<int:did>/stream",
methods=["GET"],
endpoint='database_design_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_database_design_report_stream(sid, did):
"""
Stream a database design report via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
context = {
'database_name': conn.db
}
generator = generate_report_streaming(
report_type='design',
scope='database',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)


# =============================================================================
# Schema Design Review
# =============================================================================
@blueprint.route(
"/schema-design-report/<int:sid>/<int:did>/<int:scid>",
methods=["GET"],
endpoint='schema_design_report'
)
@pga_login_required
def generate_schema_design_report(sid, did, scid):
"""
Generate a design review report for the specified schema.
Uses the multi-stage pipeline to analyze schema design.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import generate_report_sync
from pgadmin.utils.driver import get_driver
# Check if LLM is configured
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
# Get database connection
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Get schema name from scid
schema_query = "SELECT nspname FROM pg_namespace WHERE oid = %s"
status, result = conn.execute_dict(schema_query, [scid])
if not status or not result.get('rows'):
return make_json_response(
success=0,
errormsg=gettext('Schema not found.')
)
schema_name = result['rows'][0]['nspname']
# Generate report using pipeline
context = {
'database_name': conn.db,
'schema_name': schema_name,
'schema_oid': scid
}
success, result = generate_report_sync(
report_type='design',
scope='schema',
conn=conn,
manager=manager,
context=context
)
if success:
return make_json_response(
success=1,
data={'report': result}
)
else:
return make_json_response(
success=0,
errormsg=result
)
except Exception as e:
return make_json_response(
success=0,
errormsg=gettext('Failed to generate report: ') + str(e)
)


@blueprint.route(
"/schema-design-report/<int:sid>/<int:did>/<int:scid>/stream",
methods=["GET"],
endpoint='schema_design_report_stream'
)
@pgCSRFProtect.exempt
@pga_login_required
def generate_schema_design_report_stream(sid, did, scid):
"""
Stream a schema design report via SSE.
"""
from pgadmin.llm.utils import is_llm_enabled
from pgadmin.llm.reports.generator import (
generate_report_streaming, create_sse_response
)
from pgadmin.utils.driver import get_driver
if not is_llm_enabled():
return make_json_response(
success=0,
errormsg=gettext(
'LLM is not configured. Please configure an LLM provider '
'in Preferences > AI.'
)
)
try:
driver = get_driver(config.PG_DEFAULT_DRIVER)
manager = driver.connection_manager(sid)
conn = manager.connection(did=did)
if not conn.connected():
return make_json_response(
success=0,
errormsg=gettext('Database is not connected.')
)
# Get schema name from scid
schema_query = "SELECT nspname FROM pg_namespace WHERE oid = %s"
status, result = conn.execute_dict(schema_query, [scid])
if not status or not result.get('rows'):
return make_json_response(
success=0,
errormsg=gettext('Schema not found.')
)
schema_name = result['rows'][0]['nspname']
context = {
'database_name': conn.db,
'schema_name': schema_name,
'schema_oid': scid
}
generator = generate_report_streaming(
report_type='design',
scope='schema',
conn=conn,
manager=manager,
context=context
)
return create_sse_response(generator)
except Exception as e:
return make_json_response(
success=0,
errormsg=str(e)
)