Implemented a server-side cursor to enhance performance when retrieving large datasets. #5797

pull/8911/head
Khushboo Vashi 2025-07-02 07:17:01 +00:00 committed by GitHub
parent 1280cf5108
commit f207818afa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 391 additions and 147 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

View File

@ -552,6 +552,10 @@ Use the fields on the *Options* panel to manage editor preferences.
will warn upon clicking the *Execute Query* button in the query tool. The warning
will appear only if *Underline query at cursor?* is set to *False*.
* When the *Use server cursor?* switch is set to *True*, the dataset will be fetched
using a server-side cursor after the query is executed.
.. image:: images/preferences_sql_results_grid.png
:alt: Preferences sql results grid section
:align: center

View File

@ -558,3 +558,31 @@ To execute a macro, simply select the appropriate shortcut keys, or select it fr
.. image:: images/query_output_data.png
:alt: Query Tool Macros Execution
:align: center
Server Side Cursor
******************
Server-side cursors allow partial retrieval of large datasets, making them particularly useful when working with
very large result sets. However, they may offer lower performance in typical, everyday usage scenarios.
To enable server-side cursors:
* Go to Preferences > Query Tool > Options and set "Use server cursor?" to True.
* Alternatively, you can enable it on a per-session basis via the Query Tools Execute menu.
.. image:: images/query_tool_server_cursor_execute_menu.png
:alt: Query Tool Server Cursor
:align: center
Limitations:
1. Transaction Requirement: Server-side cursors work only in transaction mode.
If enabled pgAdmin will automatically ensure queries run within a transaction.
2. Limited Use Case: Use server-side cursors only when fetching large datasets.
3. Pagination Limitation: In the Result Grid, the First and Last page buttons will be disabled,
as server-side cursors do not return a total row count. Consequently, the total number of rows
will not be displayed after execution.

View File

@ -134,6 +134,7 @@ export class FileTreeItem extends React.Component<IItemRendererXProps & IItemRen
}
private readonly setActiveFile = async (FileOrDir): Promise<void> => {
this.props.changeDirectoryCount(FileOrDir.parent);
if(FileOrDir._loaded !== true) {
this.events.dispatch(FileTreeXEvent.onTreeEvents, window.event, 'added', FileOrDir);

View File

@ -73,7 +73,7 @@ export default function ObjectExplorerToolbar() {
<Box display="flex" alignItems="center" gap="2px">
<PgButtonGroup size="small">
<ToolbarButton icon={<QueryToolIcon />} menuItem={menus['query_tool']} shortcut={browserPref?.sub_menu_query_tool} />
<ToolbarButton icon={<ViewDataIcon />} menuItem={menus['view_all_rows_context'] ??
<ToolbarButton icon={<ViewDataIcon />} menuItem={menus['view_all_rows_context'] ??
{label :gettext('All Rows')}}
shortcut={browserPref?.sub_menu_view_data} />
<ToolbarButton icon={<RowFilterIcon />} menuItem={menus['view_filtered_rows_context'] ?? { label : gettext('Filtered Rows...')}} />

View File

@ -146,7 +146,8 @@ class SqlEditorModule(PgAdminModule):
'sqleditor.get_new_connection_user',
'sqleditor._check_server_connection_status',
'sqleditor.get_new_connection_role',
'sqleditor.connect_server'
'sqleditor.connect_server',
'sqleditor.server_cursor',
]
def on_logout(self):
@ -203,9 +204,15 @@ def initialize_viewdata(trans_id, cmd_type, obj_type, sgid, sid, did, obj_id):
"""
if request.data:
filter_sql = json.loads(request.data)
_data = json.loads(request.data)
else:
filter_sql = request.args or request.form
_data = request.args or request.form
filter_sql = _data['filter_sql'] if 'filter_sql' in _data else None
server_cursor = _data['server_cursor'] if\
'server_cursor' in _data and (
_data['server_cursor'] == 'true' or _data['server_cursor'] is True
) else False
# Create asynchronous connection using random connection id.
conn_id = str(secrets.choice(range(1, 9999999)))
@ -242,8 +249,9 @@ def initialize_viewdata(trans_id, cmd_type, obj_type, sgid, sid, did, obj_id):
command_obj = ObjectRegistry.get_object(
obj_type, conn_id=conn_id, sgid=sgid, sid=sid,
did=did, obj_id=obj_id, cmd_type=cmd_type,
sql_filter=filter_sql
sql_filter=filter_sql, server_cursor=server_cursor
)
except ObjectGone:
raise
except Exception as e:
@ -354,6 +362,8 @@ def panel(trans_id):
if 'database_name' in params:
params['database_name'] = (
underscore_escape(params['database_name']))
params['server_cursor'] = params[
'server_cursor'] if 'server_cursor' in params else False
return render_template(
"sqleditor/index.html",
@ -485,6 +495,8 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, dbname=None, **kwargs):
kwargs['auto_commit'] = pref.preference('auto_commit').get()
if kwargs.get('auto_rollback', None) is None:
kwargs['auto_rollback'] = pref.preference('auto_rollback').get()
if kwargs.get('server_cursor', None) is None:
kwargs['server_cursor'] = pref.preference('server_cursor').get()
try:
conn = manager.connection(conn_id=conn_id,
@ -544,6 +556,7 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, dbname=None, **kwargs):
# Set the value of auto commit and auto rollback specified in Preferences
command_obj.set_auto_commit(kwargs['auto_commit'])
command_obj.set_auto_rollback(kwargs['auto_rollback'])
command_obj.set_server_cursor(kwargs['server_cursor'])
# Set the value of database name, that will be used later
command_obj.dbname = dbname if dbname else None
@ -909,8 +922,15 @@ def start_view_data(trans_id):
update_session_grid_transaction(trans_id, session_obj)
if trans_obj.server_cursor:
conn.release_async_cursor()
conn.execute_void("BEGIN;")
# Execute sql asynchronously
status, result = conn.execute_async(sql)
status, result = conn.execute_async(
sql,
server_cursor=trans_obj.server_cursor)
else:
status = False
result = error_msg
@ -947,6 +967,7 @@ def start_query_tool(trans_id):
)
connect = 'connect' in request.args and request.args['connect'] == '1'
is_error, errmsg = check_and_upgrade_to_qt(trans_id, connect)
if is_error:
return make_json_response(success=0, errormsg=errmsg,
@ -1209,6 +1230,7 @@ def poll(trans_id):
'transaction_status': transaction_status,
'data_obj': data_obj,
'pagination': pagination,
'server_cursor': trans_obj.server_cursor,
}
)
@ -1837,11 +1859,59 @@ def check_and_upgrade_to_qt(trans_id, connect):
'conn_id': data.conn_id
}
is_error, errmsg, _, _ = _init_sqleditor(
trans_id, connect, data.sgid, data.sid, data.did, **kwargs)
trans_id, connect, data.sgid, data.sid, data.did,
**kwargs)
return is_error, errmsg
def set_pref_options(trans_id, operation):
if request.data:
_data = json.loads(request.data)
else:
_data = request.args or request.form
connect = 'connect' in request.args and request.args['connect'] == '1'
is_error, errmsg = check_and_upgrade_to_qt(trans_id, connect)
if is_error:
return make_json_response(success=0, errormsg=errmsg,
info=ERROR_MSG_FAIL_TO_PROMOTE_QT,
status=404)
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = \
check_transaction_status(trans_id)
if error_msg == ERROR_MSG_TRANS_ID_NOT_FOUND:
return make_json_response(success=0, errormsg=error_msg,
info='DATAGRID_TRANSACTION_REQUIRED',
status=404)
if (status and conn is not None and
trans_obj is not None and session_obj is not None):
res = None
if operation == 'auto_commit':
# Call the set_auto_commit method of transaction object
trans_obj.set_auto_commit(_data)
elif operation == 'auto_rollback':
trans_obj.set_auto_rollback(_data)
elif operation == 'server_cursor':
trans_obj.set_server_cursor(_data)
# As we changed the transaction object we need to
# restore it and update the session variable.
session_obj['command_obj'] = pickle.dumps(trans_obj, -1)
update_session_grid_transaction(trans_id, session_obj)
else:
status = False
res = error_msg
return make_json_response(data={'status': status, 'result': res})
@blueprint.route(
'/auto_commit/<int:trans_id>',
methods=["PUT", "POST"], endpoint='auto_commit'
@ -1854,45 +1924,7 @@ def set_auto_commit(trans_id):
Args:
trans_id: unique transaction id
"""
if request.data:
auto_commit = json.loads(request.data)
else:
auto_commit = request.args or request.form
connect = 'connect' in request.args and request.args['connect'] == '1'
is_error, errmsg = check_and_upgrade_to_qt(trans_id, connect)
if is_error:
return make_json_response(success=0, errormsg=errmsg,
info=ERROR_MSG_FAIL_TO_PROMOTE_QT,
status=404)
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = \
check_transaction_status(trans_id)
if error_msg == ERROR_MSG_TRANS_ID_NOT_FOUND:
return make_json_response(success=0, errormsg=error_msg,
info='DATAGRID_TRANSACTION_REQUIRED',
status=404)
if status and conn is not None and \
trans_obj is not None and session_obj is not None:
res = None
# Call the set_auto_commit method of transaction object
trans_obj.set_auto_commit(auto_commit)
# As we changed the transaction object we need to
# restore it and update the session variable.
session_obj['command_obj'] = pickle.dumps(trans_obj, -1)
update_session_grid_transaction(trans_id, session_obj)
else:
status = False
res = error_msg
return make_json_response(data={'status': status, 'result': res})
return set_pref_options(trans_id, 'auto_commit')
@blueprint.route(
@ -1902,50 +1934,27 @@ def set_auto_commit(trans_id):
@pga_login_required
def set_auto_rollback(trans_id):
"""
This method is used to set the value for auto commit .
This method is used to set the value for auto rollback .
Args:
trans_id: unique transaction id
"""
if request.data:
auto_rollback = json.loads(request.data)
else:
auto_rollback = request.args or request.form
return set_pref_options(trans_id, 'auto_rollback')
connect = 'connect' in request.args and request.args['connect'] == '1'
is_error, errmsg = check_and_upgrade_to_qt(trans_id, connect)
if is_error:
return make_json_response(success=0, errormsg=errmsg,
info=ERROR_MSG_FAIL_TO_PROMOTE_QT,
status=404)
@blueprint.route(
'/server_cursor/<int:trans_id>',
methods=["PUT", "POST"], endpoint='server_cursor'
)
@pga_login_required
def set_server_cursor(trans_id):
"""
This method is used to set the value for server cursor.
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = \
check_transaction_status(trans_id)
if error_msg == ERROR_MSG_TRANS_ID_NOT_FOUND:
return make_json_response(success=0, errormsg=error_msg,
info='DATAGRID_TRANSACTION_REQUIRED',
status=404)
if status and conn is not None and \
trans_obj is not None and session_obj is not None:
res = None
# Call the set_auto_rollback method of transaction object
trans_obj.set_auto_rollback(auto_rollback)
# As we changed the transaction object we need to
# restore it and update the session variable.
session_obj['command_obj'] = pickle.dumps(trans_obj, -1)
update_session_grid_transaction(trans_id, session_obj)
else:
status = False
res = error_msg
return make_json_response(data={'status': status, 'result': res})
Args:
trans_id: unique transaction id
"""
return set_pref_options(trans_id, 'server_cursor')
@blueprint.route(
@ -2181,12 +2190,18 @@ def start_query_download_tool(trans_id):
if not sql:
sql = trans_obj.get_sql(sync_conn)
if sql and query_commited:
if trans_obj.server_cursor:
sync_conn.release_async_cursor()
sync_conn.execute_void("BEGIN;")
# Re-execute the query to ensure the latest data is included
sync_conn.execute_async(sql)
sync_conn.execute_async(sql, server_cursor=trans_obj.server_cursor)
# This returns generator of records.
status, gen, conn_obj = \
sync_conn.execute_on_server_as_csv(records=10)
if trans_obj.server_cursor and query_commited:
sync_conn.execute_void("COMMIT;")
if not status:
return make_json_response(
data={

View File

@ -365,6 +365,8 @@ class GridCommand(BaseCommand, SQLFilter, FetchedRowTracker):
self.limit = 100
self.thread_native_id = None
self.server_cursor = kwargs['server_cursor'] if\
'server_cursor' in kwargs else None
def get_primary_keys(self, *args, **kwargs):
return None, None
@ -425,6 +427,9 @@ class GridCommand(BaseCommand, SQLFilter, FetchedRowTracker):
def set_thread_native_id(self, thread_native_id):
self.thread_native_id = thread_native_id
def set_server_cursor(self, server_cursor):
self.server_cursor = server_cursor
class TableCommand(GridCommand):
"""
@ -816,6 +821,7 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
self.table_has_oids = False
self.columns_types = None
self.thread_native_id = None
self.server_cursor = False
def get_sql(self, default_conn=None):
return None
@ -917,6 +923,9 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
def set_auto_commit(self, auto_commit):
self.auto_commit = auto_commit
def set_server_cursor(self, server_cursor):
self.server_cursor = server_cursor
def __set_updatable_results_attrs(self, sql_path,
table_oid, conn):
# Set template path for sql scripts and the table object id

View File

@ -125,7 +125,7 @@ export default class SQLEditor {
priority: 101,
label: gettext('All Rows'),
permission: AllPermissionTypes.TOOLS_QUERY_TOOL,
}, {
},{
name: 'view_first_100_rows_context_' + supportedNode,
node: supportedNode,
module: this,

View File

@ -130,12 +130,14 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN
connected_once: false,
connection_status: null,
connection_status_msg: '',
server_cursor: preferencesStore.getPreferencesForModule('sqleditor').server_cursor === true,
params: {
...params,
title: _.unescape(params.title),
is_query_tool: params.is_query_tool == 'true',
node_name: retrieveNodeName(selectedNodeInfo),
dbname: _.unescape(params.database_name) || getDatabaseLabel(selectedNodeInfo)
dbname: _.unescape(params.database_name) || getDatabaseLabel(selectedNodeInfo),
server_cursor: preferencesStore.getPreferencesForModule('sqleditor').server_cursor === true,
},
connection_list: [{
sgid: params.sgid,
@ -318,7 +320,7 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN
setQtStatePartial({ editor_disabled: false });
};
const initializeQueryTool = (password, explainObject=null, macroSQL='', executeCursor=false, reexecute=false)=>{
const initializeQueryTool = (password, explainObject=null, macroSQL='', executeCursor=false, executeServerCursor=false, reexecute=false)=>{
let selectedConn = _.find(qtState.connection_list, (c)=>c.is_selected);
let baseUrl = '';
if(qtState.params.is_query_tool) {
@ -336,12 +338,14 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN
...qtState.params,
});
}
eventBus.current.fireEvent(QUERY_TOOL_EVENTS.SERVER_CURSOR, executeServerCursor);
api.post(baseUrl, qtState.params.is_query_tool ? {
user: selectedConn.user,
role: selectedConn.role,
password: password,
dbname: selectedConn.database_name
} : qtState.params.sql_filter)
} : {sql_filter: qtState.params.sql_filter,
server_cursor: qtState.params.server_cursor})
.then(()=>{
setQtStatePartial({
connected: true,
@ -350,7 +354,7 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN
});
//this condition works if user is in View/Edit Data or user does not saved server or tunnel password and disconnected the server and executing the query
if(!qtState.params.is_query_tool || reexecute) {
eventBus.current.fireEvent(QUERY_TOOL_EVENTS.TRIGGER_EXECUTION, explainObject, macroSQL, executeCursor);
eventBus.current.fireEvent(QUERY_TOOL_EVENTS.TRIGGER_EXECUTION, explainObject, macroSQL, executeCursor, executeServerCursor);
let msg = `${selectedConn['server_name']}/${selectedConn['database_name']} - Database connected`;
pgAdmin.Browser.notifier.success(_.escape(msg));
}
@ -856,6 +860,7 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN
api: api,
modal: modal,
params: qtState.params,
server_cursor: qtState.server_cursor,
preferences: qtState.preferences,
mainContainerRef: containerRef,
editor_disabled: qtState.editor_disabled,
@ -892,7 +897,11 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN
};
});
},
}), [qtState.params, qtState.preferences, containerRef.current, qtState.editor_disabled, qtState.eol, qtState.current_file]);
updateServerCursor: (state) => {
setQtStatePartial(state);
},
}), [qtState.params, qtState.preferences, containerRef.current, qtState.editor_disabled, qtState.eol, qtState.current_file, qtState.server_cursor]);
const queryToolConnContextValue = React.useMemo(()=>({
connected: qtState.connected,
@ -952,6 +961,7 @@ QueryToolComponent.propTypes = {
bgcolor: PropTypes.string,
fgcolor: PropTypes.string,
is_query_tool: PropTypes.oneOfType([PropTypes.bool, PropTypes.string]).isRequired,
server_cursor: PropTypes.oneOfType([PropTypes.bool, PropTypes.string]),
user: PropTypes.string,
role: PropTypes.string,
server_name: PropTypes.string,

View File

@ -46,7 +46,7 @@ const StyledBox = styled(Box)(({theme}) => ({
...theme.mixins.panelBorder.bottom,
}));
function autoCommitRollback(type, api, transId, value) {
function changeQueryExecutionSettings(type, api, transId, value) {
let url = url_for(`sqleditor.${type}`, {
'trans_id': transId,
});
@ -123,8 +123,11 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT
const checkMenuClick = useCallback((e)=>{
setCheckedMenuItems((prev)=>{
let newVal = !prev[e.value];
if(e.value === 'auto_commit' || e.value === 'auto_rollback') {
autoCommitRollback(e.value, queryToolCtx.api, queryToolCtx.params.trans_id, newVal)
if (e.value === 'server_cursor') {
queryToolCtx.updateServerCursor({server_cursor: newVal});
}
if(e.value === 'auto_commit' || e.value === 'auto_rollback' || e.value === 'server_cursor') {
changeQueryExecutionSettings(e.value, queryToolCtx.api, queryToolCtx.params.trans_id, newVal)
.catch ((error)=>{
newVal = prev[e.value];
eventBus.fireEvent(QUERY_TOOL_EVENTS.HANDLE_API_ERROR, error, {
@ -264,8 +267,8 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT
};
useEffect(()=>{
if(isInTxn()) {
setDisableButton('commit', false);
setDisableButton('rollback', false);
setDisableButton('commit', queryToolCtx.params.server_cursor && !queryToolCtx.params.is_query_tool ?true:false);
setDisableButton('rollback', queryToolCtx.params.server_cursor && !queryToolCtx.params.is_query_tool ?true:false);
setDisableButton('execute-options', true);
} else {
setDisableButton('commit', true);
@ -338,6 +341,7 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT
explain_settings: queryToolPref.explain_settings,
explain_wal: queryToolPref.explain_wal,
open_in_new_tab: queryToolPref.open_in_new_tab,
server_cursor: queryToolPref.server_cursor,
});
}
}
@ -625,6 +629,8 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT
onClick={checkMenuClick}>{gettext('Auto commit?')}</PgMenuItem>
<PgMenuItem hasCheck value="auto_rollback" checked={checkedMenuItems['auto_rollback']}
onClick={checkMenuClick}>{gettext('Auto rollback on error?')}</PgMenuItem>
<PgMenuItem hasCheck value="server_cursor" checked={checkedMenuItems['server_cursor']}
onClick={checkMenuClick}>{gettext('Use server cursor?')}</PgMenuItem>
</PgMenu>
<PgMenu
anchorRef={explainMenuRef}

View File

@ -477,7 +477,6 @@ export class ResultSetUtils {
async saveResultsToFile(fileName, onProgress) {
try {
this.hasQueryCommitted = false;
await DownloadUtils.downloadFileStream({
url: url_for('sqleditor.query_tool_download', {
'trans_id': this.transId,
@ -724,6 +723,10 @@ export class ResultSetUtils {
let retMsg, tabMsg;
retMsg = tabMsg = gettext('Query returned successfully in %s.', this.queryRunTime());
if (httpMessage.data.data?.server_cursor) {
this.eventBus.fireEvent(QUERY_TOOL_EVENTS.SERVER_CURSOR, httpMessage.data.data?.server_cursor);
}
if(this.hasResultsToDisplay(httpMessage.data.data)) {
let msg1 = gettext('Successfully run. Total query runtime: %s.', this.queryRunTime());
let msg2 = gettext('%s rows affected.', httpMessage.data.data?.rows_affected);
@ -839,7 +842,6 @@ export function ResultSet() {
// NONE - no select, PAGE - show select all, ALL - select all.
const [allRowsSelect, setAllRowsSelect] = useState('NONE');
const modalId = MODAL_DIALOGS.QT_CONFIRMATIONS;
// We'll use this track if any changes were saved.
// It will help to decide whether results refresh is required or not on page change.
const pageDataDirty = useRef(false);

View File

@ -99,7 +99,7 @@ ShowDataOutputQueryPopup.propTypes = {
};
function PaginationInputs({pagination, totalRowCount, clearSelection}) {
function PaginationInputs({pagination, totalRowCount, clearSelection, serverCursor=false}) {
const eventBus = useContext(QueryToolEventsContext);
const [editPageRange, setEditPageRange] = useState(false);
const [errorInputs, setErrorInputs] = useState({
@ -117,7 +117,7 @@ function PaginationInputs({pagination, totalRowCount, clearSelection}) {
const goToPage = (pageNo)=>{
const from = (pageNo-1) * pagination.page_size + 1;
const to = from + pagination.page_size - 1;
eventBus.fireEvent(QUERY_TOOL_EVENTS.FETCH_WINDOW, from, to);
eventBus.fireEvent(QUERY_TOOL_EVENTS.FETCH_WINDOW, from, to, serverCursor);
clearSelection();
};
@ -205,16 +205,16 @@ function PaginationInputs({pagination, totalRowCount, clearSelection}) {
/>
</Box> : <span>{gettext('Showing rows: %s to %s', inputs.from, inputs.to)}</span>}
<PgButtonGroup>
{editPageRange && <PgIconButton size="xs"
{!serverCursor && editPageRange && <PgIconButton size="xs"
title={editPageRange ? gettext('Apply (or press Enter on input)') : gettext('Edit range')}
onClick={()=>eventBus.fireEvent(QUERY_TOOL_EVENTS.FETCH_WINDOW, inputs.from, inputs.to)}
disabled={errorInputs.from || errorInputs.to} icon={<CheckRoundedIcon />}
/>}
<PgIconButton size="xs"
{!serverCursor && <PgIconButton size="xs"
title={editPageRange ? gettext('Cancel edit') : gettext('Edit range')}
onClick={()=>setEditPageRange((prev)=>!prev)}
icon={editPageRange ? <EditOffRoundedIcon /> : <EditRoundedIcon />}
/>
/>}
</PgButtonGroup>
<div className='PaginationInputs-divider'>&nbsp;</div>
<span>{gettext('Page No:')}</span>
@ -228,15 +228,16 @@ function PaginationInputs({pagination, totalRowCount, clearSelection}) {
value={inputs.pageNo}
onChange={(value)=>onInputChange('pageNo', value)}
onKeyDown={onInputKeydownPageNo}
disabled={serverCursor}
error={errorInputs['pageNo']}
/>
<span> {gettext('of')} {pagination.page_count}</span>
<div className='PaginationInputs-divider'>&nbsp;</div>
<PgButtonGroup size="small">
<PgIconButton title={gettext('First Page')} disabled={pagination.page_no <= 1} onClick={()=>goToPage(1)} icon={<SkipPreviousRoundedIcon />}/>
<PgIconButton title={gettext('First Page')} disabled={pagination.page_no <= 1 || serverCursor} onClick={()=>goToPage(1)} icon={<SkipPreviousRoundedIcon />}/>
<PgIconButton title={gettext('Previous Page')} disabled={pagination.page_no <= 1} onClick={()=>goToPage(pagination.page_no-1)} icon={<FastRewindRoundedIcon />}/>
<PgIconButton title={gettext('Next Page')} disabled={pagination.page_no == pagination.page_count} onClick={()=>goToPage(pagination.page_no+1)} icon={<FastForwardRoundedIcon />}/>
<PgIconButton title={gettext('Last Page')} disabled={pagination.page_no == pagination.page_count} onClick={()=>goToPage(pagination.page_count)} icon={<SkipNextRoundedIcon />} />
<PgIconButton title={gettext('Next Page')} disabled={pagination.page_no == pagination.page_count && !serverCursor} onClick={()=>goToPage(pagination.page_no+1)} icon={<FastForwardRoundedIcon />}/>
<PgIconButton title={gettext('Last Page')} disabled={pagination.page_no == pagination.page_count || serverCursor} onClick={()=>goToPage(pagination.page_count)} icon={<SkipNextRoundedIcon />} />
</PgButtonGroup>
</Box>
);
@ -245,6 +246,7 @@ PaginationInputs.propTypes = {
pagination: PropTypes.object,
totalRowCount: PropTypes.number,
clearSelection: PropTypes.func,
serverCursor: PropTypes.bool,
};
export function ResultSetToolbar({query, canEdit, totalRowCount, pagination, allRowsSelect}) {
const eventBus = useContext(QueryToolEventsContext);
@ -450,7 +452,7 @@ export function ResultSetToolbar({query, canEdit, totalRowCount, pagination, all
</Box>
{totalRowCount > 0 &&
<Box>
<PaginationInputs key={JSON.stringify(pagination)} pagination={pagination} totalRowCount={totalRowCount} clearSelection={clearSelection} />
<PaginationInputs key={JSON.stringify(pagination)} pagination={pagination} totalRowCount={totalRowCount} clearSelection={clearSelection} serverCursor={queryToolCtx.server_cursor}/>
</Box>}
</StyledDiv>
<PgMenu

View File

@ -55,6 +55,7 @@ export function StatusBar({eol, handleEndOfLineChange}) {
const {openMenuName, toggleMenu, onMenuClose} = usePgMenuGroup();
// NONE - no select, PAGE - show select all, ALL - select all.
const [allRowsSelect, setAllRowsSelect] = useState('NONE');
const [serverCursor, setServerCursor] = useState(false);
useEffect(()=>{
eventBus.registerListener(QUERY_TOOL_EVENTS.CURSOR_ACTIVITY, (newPos)=>{
@ -82,6 +83,9 @@ export function StatusBar({eol, handleEndOfLineChange}) {
eventBus.registerListener(QUERY_TOOL_EVENTS.SELECTED_ROWS_COLS_CELL_CHANGED, (rows)=>{
setSelectedRowsCount(rows);
});
eventBus.registerListener(QUERY_TOOL_EVENTS.SERVER_CURSOR, (server_cursor)=>{
setServerCursor(server_cursor);
});
}, []);
useEffect(()=>{
@ -111,7 +115,7 @@ export function StatusBar({eol, handleEndOfLineChange}) {
return (
<StyledBox>
<Box className='StatusBar-padding StatusBar-divider'>{gettext('Total rows: %s', rowsCount)}</Box>
<Box className='StatusBar-padding StatusBar-divider'>{serverCursor && gettext('Query executed with server cursor')} {!serverCursor && gettext('Total rows: %s', rowsCount)}</Box>
{lastTaskText &&
<Box className='StatusBar-padding StatusBar-divider'>{lastTaskText} {hours.toString().padStart(2, '0')}:{minutes.toString().padStart(2, '0')}:{seconds.toString().padStart(2, '0')}.{msec.toString().padStart(3, '0')}</Box>
}

View File

@ -77,7 +77,8 @@ export function showViewData(
connectionData,
treeIdentifier,
transId,
filter=false
filter=false,
server_cursor=false
) {
const node = pgBrowser.tree.findNodeByDomElement(treeIdentifier);
if (node === undefined || !node.getData()) {
@ -100,7 +101,7 @@ export function showViewData(
return;
}
const gridUrl = generateUrl(transId, connectionData, node.getData(), parentData);
const gridUrl = generateUrl(transId, connectionData, node.getData(), parentData, server_cursor);
const queryToolTitle = generateViewDataTitle(pgBrowser, treeIdentifier);
if(filter) {
@ -109,7 +110,7 @@ export function showViewData(
showFilterDialog(pgBrowser, treeIdentifier, queryToolMod, transId, gridUrl,
queryToolTitle, validateUrl);
} else {
queryToolMod.launch(transId, gridUrl, false, queryToolTitle);
queryToolMod.launch(transId, gridUrl, false, queryToolTitle, {server_cursor: server_cursor});
}
}
@ -145,7 +146,7 @@ export function retrieveNodeName(parentData) {
return '';
}
function generateUrl(trans_id, connectionData, nodeData, parentData) {
function generateUrl(trans_id, connectionData, nodeData, parentData, server_cursor=false) {
let url_endpoint = url_for('sqleditor.panel', {
'trans_id': trans_id,
});
@ -157,7 +158,8 @@ function generateUrl(trans_id, connectionData, nodeData, parentData) {
+`&sgid=${parentData.server_group._id}`
+`&sid=${parentData.server._id}`
+`&did=${parentData.database._id}`
+`&server_type=${parentData.server.server_type}`;
+`&server_type=${parentData.server.server_type}`
+`&server_cursor=${server_cursor}`;
if(!parentData.server.username && parentData.server.user?.name) {
url_endpoint += `&user=${parentData.server.user?.name}`;

View File

@ -0,0 +1,111 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2025, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from regression.python_test_utils import test_utils
import json
from pgadmin.utils import server_utils
import secrets
import config
from pgadmin.tools.sqleditor.tests.execute_query_test_utils \
import async_poll
class TestExecuteServerCursor(BaseTestGenerator):
"""
This class validates download csv
"""
scenarios = [
(
'Execute with server cursor',
dict(
sql='SELECT 1',
init_url='/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}',
)
)
]
def setUp(self):
self._db_name = 'server_cursor_' + str(
secrets.choice(range(10000, 65535)))
self._sid = self.server_information['server_id']
server_utils.connect_server(self, self._sid)
self._did = test_utils.create_database(
self.server, self._db_name
)
# This method is responsible for initiating query hit at least once,
# so that download csv works
def initiate_sql_query_tool(self, trans_id, sql_query):
# This code is to ensure to create a async cursor so that downloading
# csv can work.
# Start query tool transaction
url = '/sqleditor/query_tool/start/{0}'.format(trans_id)
response = self.tester.post(url, data=json.dumps({"sql": sql_query}),
content_type='html/json')
self.assertEqual(response.status_code, 200)
return async_poll(tester=self.tester,
poll_url='/sqleditor/poll/{0}'.format(trans_id))
def set_server_cursor(self, server_cursor):
_url = '/sqleditor/server_cursor/{0}'.format(self.trans_id)
res = self.tester.post(_url, data=json.dumps(server_cursor))
self.assertEqual(res.status_code, 200)
def runTest(self):
db_con = database_utils.connect_database(self,
test_utils.SERVER_GROUP,
self._sid,
self._did)
if db_con["info"] != "Database connected.":
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = self.init_url.format(
self.trans_id, test_utils.SERVER_GROUP, self._sid, self._did)
res = self.tester.post(url, data=json.dumps({
"dbname": self._db_name
}))
self.assertEqual(res.status_code, 200)
self.set_server_cursor(True)
response = self.initiate_sql_query_tool(self.trans_id, self.sql)
self.assertEqual(response.status_code, 200)
_resp = json.loads(response.data.decode())
self.assertTrue(_resp['data']['server_cursor'])
self.set_server_cursor(False)
# Close query tool
url = '/sqleditor/close/{0}'.format(self.trans_id)
response = self.tester.delete(url)
self.assertEqual(response.status_code, 200)
database_utils.disconnect_database(self, self._sid, self._did)
def tearDown(self):
main_conn = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(main_conn, self._db_name)

View File

@ -80,6 +80,17 @@ def register_query_tool_preferences(self):
'Tool tabs.')
)
self.server_cursor = self.preference.register(
'Options', 'server_cursor',
gettext("Use server cursor?"), 'boolean', False,
category_label=PREF_LABEL_OPTIONS,
help_str=gettext('If set to True, the dataset will be fetched using a'
' server-side cursor after the query is executed.'
' This allows controlled data transfer to the client,'
' enabling examination of large datasets without'
' loading them entirely into memory.')
)
self.show_prompt_save_query_changes = self.preference.register(
'Options', 'prompt_save_query_changes',
gettext("Prompt to save unsaved query changes?"), 'boolean', True,

View File

@ -101,7 +101,7 @@ class StartRunningQuery:
session_obj,
effective_sql_statement,
trans_id,
transaction_object
transaction_object,
)
can_edit = transaction_object.can_edit()
@ -137,18 +137,20 @@ class StartRunningQuery:
StartRunningQuery.save_transaction_in_session(session_obj,
trans_id, trans_obj)
# If auto commit is False and transaction status is Idle
# then call is_begin_not_required() function to check BEGIN
# is required or not.
if trans_obj.server_cursor and sql != 'COMMIT;' and sql != 'ROLLBACK;':
conn.release_async_cursor()
if StartRunningQuery.is_begin_required_for_sql_query(trans_obj,
conn, sql):
conn, sql
):
conn.execute_void("BEGIN;")
is_rollback_req = StartRunningQuery.is_rollback_statement_required(
trans_obj,
conn)
trans_obj.set_thread_native_id(None)
@copy_current_request_context
def asyn_exec_query(conn, sql, trans_obj, is_rollback_req,
app):
@ -156,9 +158,15 @@ class StartRunningQuery:
# and formatted_error is True.
with app.app_context():
try:
_, _ = conn.execute_async(sql)
# # If the transaction aborted for some reason and
# # Auto RollBack is True then issue a rollback to cleanup.
if trans_obj.server_cursor and (sql == 'COMMIT;' or
sql == 'ROLLBACK;'):
conn.execute_void(sql)
else:
_, _ = conn.execute_async(
sql, server_cursor=trans_obj.server_cursor)
# If the transaction aborted for some reason and
# Auto RollBack is True then issue a rollback
# to cleanup.
if is_rollback_req:
conn.execute_void("ROLLBACK;")
except Exception as e:
@ -178,10 +186,12 @@ class StartRunningQuery:
@staticmethod
def is_begin_required_for_sql_query(trans_obj, conn, sql):
return (not trans_obj.auto_commit and
conn.transaction_status() == TX_STATUS_IDLE and
is_begin_required(sql)
)
return ((trans_obj.server_cursor and trans_obj.auto_commit) or (
not trans_obj.auto_commit and
conn.transaction_status() == TX_STATUS_IDLE and
is_begin_required(sql)
))
@staticmethod
def is_rollback_statement_required(trans_obj, conn):

View File

@ -17,6 +17,7 @@ import os
import secrets
import datetime
import asyncio
import copy
from collections import deque
import psycopg
from flask import g, current_app
@ -30,7 +31,7 @@ from pgadmin.model import User
from pgadmin.utils.exception import ConnectionLost, CryptKeyMissing
from pgadmin.utils import get_complete_file_path
from ..abstract import BaseConnection
from .cursor import DictCursor, AsyncDictCursor
from .cursor import DictCursor, AsyncDictCursor, AsyncDictServerCursor
from .typecast import register_global_typecasters,\
register_string_typecasters, register_binary_typecasters, \
register_array_to_string_typecasters, ALL_JSON_TYPES
@ -186,6 +187,7 @@ class Connection(BaseConnection):
self.use_binary_placeholder = use_binary_placeholder
self.array_to_string = array_to_string
self.qtLiteral = get_driver(config.PG_DEFAULT_DRIVER).qtLiteral
self._autocommit = True
super(Connection, self).__init__()
@ -358,6 +360,7 @@ class Connection(BaseConnection):
prepare_threshold=manager.prepare_threshold
)
pg_conn = asyncio.run(connectdbserver())
pg_conn.server_cursor_factory = AsyncDictServerCursor
else:
pg_conn = psycopg.Connection.connect(
connection_string,
@ -704,9 +707,10 @@ WHERE db.datname = current_database()""")
self.conn_id.encode('utf-8')
), None)
if self.connected() and cur and not cur.closed and \
(not server_cursor or (server_cursor and cur.name)):
return True, cur
if self.connected() and cur and not cur.closed:
if not server_cursor or (
server_cursor and type(cur) is AsyncDictServerCursor):
return True, cur
if not self.connected():
errmsg = ""
@ -732,8 +736,10 @@ WHERE db.datname = current_database()""")
if server_cursor:
# Providing name to cursor will create server side cursor.
cursor_name = "CURSOR:{0}".format(self.conn_id)
self.conn.server_cursor_factory = AsyncDictServerCursor
cur = self.conn.cursor(
name=cursor_name
name=cursor_name,
scrollable=scrollable
)
else:
cur = self.conn.cursor(scrollable=scrollable)
@ -893,7 +899,10 @@ WHERE db.datname = current_database()""")
def gen(conn_obj, trans_obj, quote='strings', quote_char="'",
field_separator=',', replace_nulls_with=None):
cur.scroll(0, mode='absolute')
try:
cur.scroll(0, mode='absolute')
except Exception as e:
print(str(e))
results = cur.fetchmany(records)
if not results:
yield gettext('The query executed did not return any data.')
@ -1037,7 +1046,15 @@ WHERE db.datname = current_database()""")
return True, None
def execute_async(self, query, params=None, formatted_exception_msg=True):
def release_async_cursor(self):
if self.__async_cursor and not self.__async_cursor.closed:
try:
self.__async_cursor.close_cursor()
except Exception as e:
print("EXception==", str(e))
def execute_async(self, query, params=None, formatted_exception_msg=True,
server_cursor=False):
"""
This function executes the given query asynchronously and returns
result.
@ -1048,10 +1065,11 @@ WHERE db.datname = current_database()""")
formatted_exception_msg: if True then function return the
formatted exception message
"""
self.__async_cursor = None
self.__async_query_error = None
status, cur = self.__cursor(scrollable=True)
status, cur = self.__cursor(scrollable=True,
server_cursor=server_cursor)
if not status:
return False, str(cur)
@ -1501,7 +1519,7 @@ Failed to reset the connection to the server due to following error:
else:
status = 1
if not cur:
if not cur or cur.closed:
return False, self.CURSOR_NOT_FOUND
result = None
@ -1533,7 +1551,6 @@ Failed to reset the connection to the server due to following error:
result = []
try:
result = cur.fetchall(_tupples=True)
except psycopg.ProgrammingError:
result = None
except psycopg.Error:

View File

@ -15,10 +15,9 @@ result.
import asyncio
from collections import OrderedDict
import psycopg
from flask import g, current_app
from psycopg import Cursor as _cursor, AsyncCursor as _async_cursor
from typing import Any, Sequence
from psycopg import (Cursor as _cursor, AsyncCursor as _async_cursor,
AsyncServerCursor as _async_server_cursor)
from psycopg.rows import dict_row, tuple_row
from psycopg._encodings import py_codecs as encodings
from .encoding import configure_driver_encodings
@ -220,6 +219,7 @@ class AsyncDictCursor(_async_cursor):
def __init__(self, *args, **kwargs):
self._odt_desc = None
_async_cursor.__init__(self, *args, row_factory=dict_row)
self.cursor = _async_cursor
def _dict_tuple(self, tup):
"""
@ -234,8 +234,8 @@ class AsyncDictCursor(_async_cursor):
Transform the regular description to wrapper object, which handles
duplicate column name.
"""
self._odt_desc = _async_cursor.__getattribute__(self, 'description')
pgresult = _async_cursor.__getattribute__(self, 'pgresult')
self._odt_desc = self.cursor.__getattribute__(self, 'description')
pgresult = self.cursor.__getattribute__(self, 'pgresult')
desc = self._odt_desc
if desc is None or len(desc) == 0:
@ -289,21 +289,21 @@ class AsyncDictCursor(_async_cursor):
if params is not None and len(params) == 0:
params = None
return await _async_cursor.execute(self, query, params)
return await self.cursor.execute(self, query, params)
def executemany(self, query, params=None):
"""
Execute many function of regular cursor.
"""
self._odt_desc = None
return _async_cursor.executemany(self, query, params)
return self.cursor.executemany(self, query, params)
async def _close_cursor(self):
"""
Close the cursor.
"""
await _async_cursor.close(self)
await self.cursor.close(self)
def close_cursor(self):
"""
@ -328,13 +328,13 @@ class AsyncDictCursor(_async_cursor):
"""
Fetch many tuples as ordered dictionary list.
"""
return await _async_cursor.fetchmany(self, size)
return await self.cursor.fetchmany(self, size)
async def _fetchall(self):
"""
Fetch all tuples as ordered dictionary list.
"""
return await _async_cursor.fetchall(self)
return await self.cursor.fetchall(self)
def fetchall(self, _tupples=False):
"""
@ -353,7 +353,7 @@ class AsyncDictCursor(_async_cursor):
"""
Fetch all tuples as ordered dictionary list.
"""
return await _async_cursor.fetchone(self)
return await self.cursor.fetchone(self)
def fetchone(self):
"""
@ -382,7 +382,7 @@ class AsyncDictCursor(_async_cursor):
"""
Fetch all tuples as ordered dictionary list.
"""
return await _async_cursor.scroll(self, position, mode=mode)
return await self.cursor.scroll(self, position, mode=mode)
def scroll(self, position, mode="absolute"):
"""
@ -395,3 +395,15 @@ class AsyncDictCursor(_async_cursor):
return self.pgresult.ntuples
else:
return -1
class AsyncDictServerCursor(AsyncDictCursor, _async_server_cursor):
def __init__(self, *args, name=None, **kwargs):
self._odt_desc = None
_async_server_cursor.__init__(self, name=name, *args,
row_factory=dict_row)
self.cursor = _async_server_cursor
def get_rowcount(self):
return 1