Add support for LISTEN/NOTIFY in the query tool. Fixes #3204

pull/17/head
Akshay Joshi 2018-05-30 21:58:28 -04:00 committed by Dave Page
parent 2b4605a9d3
commit 38ee39ae7a
11 changed files with 335 additions and 19 deletions

View File

@ -11,6 +11,7 @@ Features
********
| `Bug #1447 <https://redmine.postgresql.org/issues/1447>`_ - Add support for SSH tunneled connections
| `Bug #3204 <https://redmine.postgresql.org/issues/3204>`_ - Add support for LISTEN/NOTIFY in the query tool
Bug fixes
*********

View File

@ -10,6 +10,9 @@
from __future__ import print_function
import time
import sys
from selenium.common.exceptions import StaleElementReferenceException
import config
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
@ -55,7 +58,7 @@ class QueryToolFeatureTest(BaseFeatureTest):
# explain query with verbose and cost
print("Explain query with verbose and cost... ",
file=sys.stderr, end="")
if self._test_explain_plan_feature():
if self._supported_server_version():
self._query_tool_explain_with_verbose_and_cost()
print("OK.", file=sys.stderr)
self._clear_query_tool()
@ -65,7 +68,7 @@ class QueryToolFeatureTest(BaseFeatureTest):
# explain analyze query with buffers and timing
print("Explain analyze query with buffers and timing... ",
file=sys.stderr, end="")
if self._test_explain_plan_feature():
if self._supported_server_version():
self._query_tool_explain_analyze_with_buffers_and_timing()
print("OK.", file=sys.stderr)
self._clear_query_tool()
@ -96,6 +99,11 @@ class QueryToolFeatureTest(BaseFeatureTest):
print("OK.", file=sys.stderr)
self._clear_query_tool()
# Notify Statements.
print("Capture Notify Statements... ", file=sys.stderr, end="")
self._query_tool_notify_statements()
self._clear_query_tool()
def after(self):
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(
@ -144,8 +152,8 @@ class QueryToolFeatureTest(BaseFeatureTest):
self.page.click_element(
self.page.find_by_xpath("//*[@id='btn-clear-dropdown']")
)
ActionChains(self.driver)\
.move_to_element(self.page.find_by_xpath("//*[@id='btn-clear']"))\
ActionChains(self.driver) \
.move_to_element(self.page.find_by_xpath("//*[@id='btn-clear']")) \
.perform()
self.page.click_element(
self.page.find_by_xpath("//*[@id='btn-clear']")
@ -579,7 +587,7 @@ SELECT 1, pg_sleep(300)"""
# have 'auto-rollback fa fa-check visibility-hidden' classes
if 'auto-rollback fa fa-check' == str(
auto_rollback_check.get_attribute('class')):
auto_rollback_check.get_attribute('class')):
auto_rollback_btn.click()
auto_commit_btn = self.page.find_by_id("btn-auto-commit")
@ -592,7 +600,7 @@ SELECT 1, pg_sleep(300)"""
# have 'auto-commit fa fa-check visibility-hidden' classes
if 'auto-commit fa fa-check visibility-hidden' == str(
auto_commit_check.get_attribute('class')):
auto_commit_check.get_attribute('class')):
auto_commit_btn.click()
self.page.find_by_id("btn-flash").click()
@ -605,7 +613,7 @@ SELECT 1, pg_sleep(300)"""
'contains(string(), "canceling statement due to user request")]'
)
def _test_explain_plan_feature(self):
def _supported_server_version(self):
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
@ -615,3 +623,58 @@ SELECT 1, pg_sleep(300)"""
self.server['sslmode']
)
return connection.server_version > 90100
def _query_tool_notify_statements(self):
wait = WebDriverWait(self.page.driver, 60)
print("\n\tListen on an event... ", file=sys.stderr, end="")
self.page.fill_codemirror_area_with("LISTEN foo;")
self.page.find_by_id("btn-flash").click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
wait.until(EC.text_to_be_present_in_element(
(By.CSS_SELECTOR, ".sql-editor-message"), "LISTEN")
)
print("OK.", file=sys.stderr)
self._clear_query_tool()
print("\tNotify event without data... ", file=sys.stderr, end="")
self.page.fill_codemirror_area_with("NOTIFY foo;")
self.page.find_by_id("btn-flash").click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Notifications')
wait.until(EC.text_to_be_present_in_element(
(By.CSS_SELECTOR, "td.channel"), "foo")
)
print("OK.", file=sys.stderr)
self._clear_query_tool()
print("\tNotify event with data... ", file=sys.stderr, end="")
if self._supported_server_version():
self.page.fill_codemirror_area_with("SELECT pg_notify('foo', "
"'Hello')")
self.page.find_by_id("btn-flash").click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Notifications')
wait.until(WaitForAnyElementWithText(
(By.CSS_SELECTOR, 'td.payload'), "Hello"))
print("OK.", file=sys.stderr)
else:
print("Skipped.", file=sys.stderr)
class WaitForAnyElementWithText(object):
def __init__(self, locator, text):
self.locator = locator
self.text = text
def __call__(self, driver):
try:
elements = EC._find_elements(driver, self.locator)
for elem in elements:
if self.text in elem.text:
return True
return False
except StaleElementReferenceException:
return False

View File

@ -82,6 +82,8 @@ class ExecuteQuery {
self.loadingScreen.hide();
self.enableSQLEditorButtons();
self.sqlServerObject.update_msg_history(false, httpMessageData.data.result);
if ('notifies' in httpMessageData.data)
self.sqlServerObject.update_notifications(httpMessageData.data.notifies);
// Highlight the error in the sql panel
self.sqlServerObject._highlight_error(httpMessageData.data.result);
@ -116,6 +118,8 @@ class ExecuteQuery {
self.loadingScreen.setMessage('Loading data from the database server and rendering...');
self.sqlServerObject.call_render_after_poll(httpMessage.data.data);
if ('notifies' in httpMessage.data.data)
self.sqlServerObject.update_notifications(httpMessage.data.data.notifies);
} else if (ExecuteQuery.isQueryStillRunning(httpMessage)) {
// If status is Busy then poll the result by recursive call to the poll function
this.delayedPoll();

View File

@ -0,0 +1,131 @@
import gettext from 'sources/gettext';
import Backgrid from 'pgadmin.backgrid';
import Backbone from 'backbone';
import Alertify from 'pgadmin.alertifyjs';
let NotificationsModel = Backbone.Model.extend({
defaults: {
recorded_time: undefined,
event: undefined,
pid: undefined,
payload: undefined,
},
schema: [{
id: 'recorded_time',
label: gettext('Recorded time'),
cell: 'string',
type: 'text',
editable: false,
cellHeaderClasses: 'width_percent_20',
headerCell: Backgrid.Extension.CustomHeaderCell,
},{
id: 'channel',
label: gettext('Event'),
cell: 'string',
type: 'text',
editable: false,
cellHeaderClasses: 'width_percent_20',
headerCell: Backgrid.Extension.CustomHeaderCell,
},{
id: 'pid',
label: gettext('Process ID'),
cell: 'string',
type: 'text',
editable: false,
cellHeaderClasses: 'width_percent_20',
headerCell: Backgrid.Extension.CustomHeaderCell,
},{
id: 'payload',
label: gettext('Payload'),
cell: 'string',
type: 'text',
editable: false,
cellHeaderClasses: 'width_percent_40',
headerCell: Backgrid.Extension.CustomHeaderCell,
}],
});
let NotificationCollection = Backbone.Collection.extend({
model: NotificationsModel,
});
let queryToolNotifications = {
collection: null,
/* This function is responsible to create and render the
* new backgrid for the notification tab.
*/
renderNotificationsGrid: function(notifications_panel) {
if (!queryToolNotifications.collection)
queryToolNotifications.collection = new NotificationCollection();
let gridCols = [{
name: 'recorded_time',
label: gettext('Recorded time'),
type: 'text',
editable: false,
cell: 'string',
}, {
name: 'channel',
label: gettext('Event'),
type: 'text',
editable: false,
cell: 'string',
}, {
name: 'pid',
label: gettext('Process ID'),
type: 'text',
editable: false,
cell: 'string',
}, {
name: 'payload',
label: gettext('Payload'),
type: 'text',
editable: false,
cell: 'string',
}];
// Set up the grid
let notifications_grid = new Backgrid.Grid({
columns: gridCols,
collection: queryToolNotifications.collection,
className: 'backgrid table-bordered presentation table backgrid-striped',
});
// Render the grid
if (notifications_grid)
notifications_panel.$container.append(notifications_grid.render().el);
},
// This function is used to raise notify messages and update the
// notification grid.
updateNotifications: function(notify_messages) {
if (notify_messages != null && notify_messages.length > 0) {
for (let i in notify_messages) {
let notify_msg = '';
if (notify_messages[i].payload != '') {
notify_msg = gettext('Asynchronous notification "')
+ notify_messages[i].channel
+ gettext('" with payload "')
+ notify_messages[i].payload
+ gettext('" received from server process with PID ')
+ notify_messages[i].pid;
}
else {
notify_msg = gettext('Asynchronous notification "')
+ notify_messages[i].channel
+ gettext('" received from server process with PID ')
+ notify_messages[i].pid;
}
Alertify.info(notify_msg);
}
// Add notify messages to the collection.
queryToolNotifications.collection.add(notify_messages);
}
},
};
module.exports = queryToolNotifications;

View File

@ -72,6 +72,9 @@ define(['jquery', 'sources/gettext', 'sources/url_for'],
$el.data('panel-visible') !== 'visible' ) {
return;
}
let sqleditor_obj = target;
// Start polling..
$.ajax({
url: url,
@ -82,6 +85,9 @@ define(['jquery', 'sources/gettext', 'sources/url_for'],
msg = res.data.message,
is_status_changed = false;
// Raise notify messages comes from database server.
sqleditor_obj.update_notifications(res.data.notifies);
// Inject CSS as required
switch(status) {
// Busy

View File

@ -543,10 +543,12 @@ def poll(trans_id):
# There may be additional messages even if result is present
# eg: Function can provide result as well as RAISE messages
additional_messages = None
notifies = None
if status == 'Success':
messages = conn.messages()
if messages:
additional_messages = ''.join(messages)
notifies = conn.get_notifies()
# Procedure/Function output may comes in the form of Notices from the
# database server, so we need to append those outputs with the
@ -564,6 +566,7 @@ def poll(trans_id):
'rows_fetched_from': rows_fetched_from,
'rows_fetched_to': rows_fetched_to,
'additional_messages': additional_messages,
'notifies': notifies,
'has_more_rows': has_more_rows,
'colinfo': columns_info,
'primary_keys': primary_keys,
@ -1476,12 +1479,18 @@ def query_tool_status(trans_id):
if conn and trans_obj and session_obj:
status = conn.transaction_status()
# Check for the asynchronous notifies statements.
conn.check_notifies(True)
notifies = conn.get_notifies()
return make_json_response(
data={
'status': status,
'message': gettext(
CONNECTION_STATUS_MESSAGE_MAPPING.get(status)
)
CONNECTION_STATUS_MESSAGE_MAPPING.get(status),
),
'notifies': notifies
}
)
else:

View File

@ -20,6 +20,7 @@ define('tools.querytool', [
'react', 'react-dom',
'sources/keyboard_shortcuts',
'sources/sqleditor/query_tool_actions',
'sources/sqleditor/query_tool_notifications',
'pgadmin.datagrid',
'sources/modify_animation',
'sources/sqleditor/calculate_query_run_time',
@ -36,8 +37,8 @@ define('tools.querytool', [
pgExplain, GridSelector, ActiveCellCapture, clipboard, copyData, RangeSelectionHelper, handleQueryOutputKeyboardEvent,
XCellSelectionModel, setStagedRows, SqlEditorUtils, ExecuteQuery, httpErrorHandler, FilterHandler,
HistoryBundle, queryHistory, React, ReactDOM,
keyboardShortcuts, queryToolActions, Datagrid, modifyAnimation,
calculateQueryRunTime, callRenderAfterPoll) {
keyboardShortcuts, queryToolActions, queryToolNotifications, Datagrid,
modifyAnimation, calculateQueryRunTime, callRenderAfterPoll) {
/* Return back, this has been called more than once */
if (pgAdmin.SqlEditor)
return pgAdmin.SqlEditor;
@ -242,19 +243,32 @@ define('tools.querytool', [
content: '<div id ="history_grid" class="sql-editor-history-container" tabindex: "0"></div>',
});
var notifications = new pgAdmin.Browser.Panel({
name: 'notifications',
title: gettext('Notifications'),
width: '100%',
height: '100%',
isCloseable: false,
isPrivate: true,
content: '<div id ="notification_grid" class="sql-editor-notifications" tabindex: "0"></div>',
});
// Load all the created panels
data_output.load(main_docker);
explain.load(main_docker);
messages.load(main_docker);
history.load(main_docker);
notifications.load(main_docker);
// Add all the panels to the docker
self.data_output_panel = main_docker.addPanel('data_output', wcDocker.DOCK.BOTTOM, sql_panel_obj);
self.explain_panel = main_docker.addPanel('explain', wcDocker.DOCK.STACKED, self.data_output_panel);
self.messages_panel = main_docker.addPanel('messages', wcDocker.DOCK.STACKED, self.data_output_panel);
self.history_panel = main_docker.addPanel('history', wcDocker.DOCK.STACKED, self.data_output_panel);
self.notifications_panel = main_docker.addPanel('notifications', wcDocker.DOCK.STACKED, self.data_output_panel);
self.render_history_grid();
queryToolNotifications.renderNotificationsGrid(self.notifications_panel);
if (!self.handler.is_new_browser_tab) {
// Listen on the panel closed event and notify user to save modifications.
@ -3832,6 +3846,12 @@ define('tools.querytool', [
}
});
},
/* This function is used to raise notify messages and update
* the notification grid.
*/
update_notifications: function (notifications) {
queryToolNotifications.updateNotifications(notifications);
},
});
pgAdmin.SqlEditor = {

View File

@ -47,6 +47,7 @@ class StartRunningQuery:
transaction_object = pickle.loads(session_obj['command_obj'])
can_edit = False
can_filter = False
notifies = None
if transaction_object is not None and session_obj is not None:
# set fetched row count to 0 as we are executing query again.
transaction_object.update_fetched_row_cnt(0)
@ -88,6 +89,8 @@ class StartRunningQuery:
can_edit = transaction_object.can_edit()
can_filter = transaction_object.can_filter()
# Get the notifies
notifies = conn.get_notifies()
else:
status = False
result = gettext(
@ -97,7 +100,8 @@ class StartRunningQuery:
'status': status, 'result': result,
'can_edit': can_edit, 'can_filter': can_filter,
'info_notifier_timeout':
self.blueprint_object.info_notifier_timeout.get()
self.blueprint_object.info_notifier_timeout.get(),
'notifies': notifies
}
)

View File

@ -117,7 +117,8 @@ class StartRunningQueryTest(BaseTestGenerator):
'not found.',
can_edit=False,
can_filter=False,
info_notifier_timeout=5
info_notifier_timeout=5,
notifies=None
)
),
expect_internal_server_error_called_with=None,
@ -276,7 +277,8 @@ class StartRunningQueryTest(BaseTestGenerator):
result='async function result output',
can_edit=True,
can_filter=True,
info_notifier_timeout=5
info_notifier_timeout=5,
notifies=None
)
),
expect_internal_server_error_called_with=None,
@ -319,7 +321,8 @@ class StartRunningQueryTest(BaseTestGenerator):
result='async function result output',
can_edit=True,
can_filter=True,
info_notifier_timeout=5
info_notifier_timeout=5,
notifies=None
)
),
expect_internal_server_error_called_with=None,
@ -362,7 +365,8 @@ class StartRunningQueryTest(BaseTestGenerator):
result='async function result output',
can_edit=True,
can_filter=True,
info_notifier_timeout=5
info_notifier_timeout=5,
notifies=None
)
),
expect_internal_server_error_called_with=None,
@ -406,7 +410,8 @@ class StartRunningQueryTest(BaseTestGenerator):
result='async function result output',
can_edit=True,
can_filter=True,
info_notifier_timeout=5
info_notifier_timeout=5,
notifies=None
)
),
expect_internal_server_error_called_with=None,
@ -511,8 +516,10 @@ class StartRunningQueryTest(BaseTestGenerator):
connect=MagicMock(),
execute_async=MagicMock(),
execute_void=MagicMock(),
get_notifies=MagicMock(),
)
self.connection.connect.return_value = self.connection_connect_return
self.connection.get_notifies.return_value = None
self.connection.execute_async.return_value = \
self.execute_async_return_value
if self.manager_connection_exception is None:

View File

@ -16,6 +16,7 @@ object.
import random
import select
import sys
import datetime
from collections import deque
import simplejson as json
import psycopg2
@ -136,6 +137,13 @@ class Connection(BaseConnection):
formatted error message if flag is set to true else return
normal error message.
* check_notifies(required_polling)
- Check for the notify messages by polling the connection or after
execute is there in notifies.
* get_notifies()
- This function will returns list of notifies received from database
server.
"""
def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0,
@ -155,6 +163,7 @@ class Connection(BaseConnection):
self.execution_aborted = False
self.row_count = 0
self.__notices = None
self.__notifies = None
self.password = None
# This flag indicates the connection status (connected/disconnected).
self.wasConnected = False
@ -891,6 +900,7 @@ WHERE
try:
self.__notices = []
self.__notifies = []
self.execution_aborted = False
cur.execute(query, params)
res = self._wait_timeout(cur.connection)
@ -908,6 +918,9 @@ WHERE
)
)
# Check for the asynchronous notifies.
self.check_notifies()
if self.is_disconnected(pe):
raise ConnectionLost(
self.manager.sid,
@ -1366,6 +1379,9 @@ Failed to reset the connection to the server due to following error:
self.__notices.extend(self.conn.notices)
self.conn.notices.clear()
# Check for the asynchronous notifies.
self.check_notifies()
# We also need to fetch notices before we return from function in case
# of any Exception, To avoid code duplication we will return after
# fetching the notices in case of any Exception
@ -1542,6 +1558,21 @@ Failed to reset the connection to the server due to following error:
resp = []
while self.__notices:
resp.append(self.__notices.pop(0))
for notify in self.__notifies:
if notify.payload is not None and notify.payload is not '':
notify_msg = gettext(
"Asynchronous notification \"{0}\" with payload \"{1}\" "
"received from server process with PID {2}\n"
).format(notify.channel, notify.payload, notify.pid)
else:
notify_msg = gettext(
"Asynchronous notification \"{0}\" received from "
"server process with PID {1}\n"
).format(notify.channel, notify.pid)
resp.append(notify_msg)
return resp
def decode_to_utf8(self, value):
@ -1711,3 +1742,34 @@ Failed to reset the connection to the server due to following error:
return False
return True
def check_notifies(self, required_polling=False):
"""
Check for the notify messages by polling the connection or after
execute is there in notifies.
"""
if self.conn and required_polling:
self.conn.poll()
if self.conn and hasattr(self.conn, 'notifies') and \
len(self.conn.notifies) > 0:
self.__notifies.extend(self.conn.notifies)
self.conn.notifies = []
else:
self.__notifies = []
def get_notifies(self):
"""
This function will returns list of notifies received from database
server.
"""
notifies = None
# Convert list of Notify objects into list of Dict.
if self.__notifies is not None and len(self.__notifies) > 0:
notifies = [{'recorded_time': str(datetime.datetime.now()),
'channel': notify.channel,
'payload': notify.payload,
'pid': notify.pid
} for notify in self.__notifies
]
return notifies

View File

@ -43,6 +43,7 @@ describe('ExecuteQuery', () => {
'saveState',
'initTransaction',
'handle_connection_lost',
'update_notifications',
]);
sqlEditorMock.transId = 123;
sqlEditorMock.rows_affected = 1000;
@ -76,7 +77,7 @@ describe('ExecuteQuery', () => {
describe('when query was successful', () => {
beforeEach(() => {
response = {
data: {status: 'Success'},
data: {status: 'Success', notifies: [{'pid': 100}]},
};
networkMock.onGet('/sqleditor/query_tool/poll/123').reply(200, response);
@ -97,7 +98,15 @@ describe('ExecuteQuery', () => {
it('should render the results', (done) => {
setTimeout(() => {
expect(sqlEditorMock.call_render_after_poll)
.toHaveBeenCalledWith({status: 'Success'});
.toHaveBeenCalledWith({status: 'Success', notifies: [{'pid': 100}]});
done();
}, 0);
});
it('should update the notification panel', (done) => {
setTimeout(() => {
expect(sqlEditorMock.update_notifications)
.toHaveBeenCalledWith([{'pid': 100}]);
done();
}, 0);
});