From d8f9f011ff37ff615d12677bc7f881cdc1b6069d Mon Sep 17 00:00:00 2001 From: Delmer Reed Date: Thu, 17 Jan 2019 15:00:10 -0500 Subject: [PATCH] Support true time series query cancellation Previously, outdated queries in the `TimeSeries` and `QueryBuilderFetcher` would run to completion even though their results were ignored. Now, pending but outdated queries will be truly canceled via `XmlHttpRequest#abort`. This frees up server and network resources. Closes #10801 Co-authored-by: Delmer Reed --- .../components/verifyStep/DataListening.tsx | 2 +- ui/src/shared/actions/v2/queryBuilder.ts | 6 +- ui/src/shared/apis/v2/query.ts | 14 ++- ui/src/shared/apis/v2/queryBuilder.ts | 98 +++++++++---------- ui/src/shared/components/TimeSeries.tsx | 49 +++++++--- ui/src/types/promises.ts | 2 + ui/src/utils/restartable.test.ts | 3 +- ui/src/utils/restartable.ts | 2 +- 8 files changed, 105 insertions(+), 71 deletions(-) diff --git a/ui/src/onboarding/components/verifyStep/DataListening.tsx b/ui/src/onboarding/components/verifyStep/DataListening.tsx index 69b944bf69..1438116104 100644 --- a/ui/src/onboarding/components/verifyStep/DataListening.tsx +++ b/ui/src/onboarding/components/verifyStep/DataListening.tsx @@ -130,7 +130,7 @@ class DataListening extends PureComponent { '/api/v2/query', script, InfluxLanguage.Flux - ) + ).promise rowCount = response.rowCount timePassed = Number(new Date()) - this.startTime } catch (err) { diff --git a/ui/src/shared/actions/v2/queryBuilder.ts b/ui/src/shared/actions/v2/queryBuilder.ts index 7bd3d2f355..adff361efa 100644 --- a/ui/src/shared/actions/v2/queryBuilder.ts +++ b/ui/src/shared/actions/v2/queryBuilder.ts @@ -1,8 +1,5 @@ // APIs -import { - QueryBuilderFetcher, - CancellationError, -} from 'src/shared/apis/v2/queryBuilder' +import {QueryBuilderFetcher} from 'src/shared/apis/v2/queryBuilder' import {bucketsAPI} from 'src/utils/api' @@ -17,6 +14,7 @@ import { import {Dispatch} from 'redux-thunk' import {GetState} from 'src/types/v2' import {RemoteDataState} from 'src/types' +import {CancellationError} from 'src/types/promises' const fetcher = new QueryBuilderFetcher() diff --git a/ui/src/shared/apis/v2/query.ts b/ui/src/shared/apis/v2/query.ts index 73af17da9b..0951eebd85 100644 --- a/ui/src/shared/apis/v2/query.ts +++ b/ui/src/shared/apis/v2/query.ts @@ -3,6 +3,7 @@ import _ from 'lodash' import Deferred from 'src/utils/Deferred' import {InfluxLanguage} from 'src/types/v2/dashboards' +import {WrappedCancelablePromise, CancellationError} from 'src/types/promises' const CHECK_LIMIT_INTERVAL = 200 const MAX_ROWS = 50000 @@ -17,11 +18,11 @@ interface XHRError extends Error { xhr?: XMLHttpRequest } -export const executeQuery = async ( +export const executeQuery = ( url: string, query: string, language: InfluxLanguage = InfluxLanguage.Flux -): Promise => { +): WrappedCancelablePromise => { // We're using `XMLHttpRequest` directly here rather than through `axios` so // that we can poll the response size as it comes back. If the response size // is greater than a predefined limit, we close the HTTP connection and @@ -130,5 +131,12 @@ export const executeQuery = async ( xhr.setRequestHeader('Content-Type', 'application/json') xhr.send(body) - return deferred.promise + return { + promise: deferred.promise, + cancel: () => { + clearTimeout(interval) + xhr.abort() + deferred.reject(new CancellationError()) + }, + } } diff --git a/ui/src/shared/apis/v2/queryBuilder.ts b/ui/src/shared/apis/v2/queryBuilder.ts index a0c4204246..1305aebe61 100644 --- a/ui/src/shared/apis/v2/queryBuilder.ts +++ b/ui/src/shared/apis/v2/queryBuilder.ts @@ -1,6 +1,5 @@ // Libraries import {get} from 'lodash' -import uuid from 'uuid' // APIs import {executeQuery, ExecuteFluxQueryResult} from 'src/shared/apis/v2/query' @@ -8,27 +7,32 @@ import {parseResponse} from 'src/shared/parsing/flux/response' // Types import {InfluxLanguage, BuilderConfig} from 'src/types/v2' +import {WrappedCancelablePromise} from 'src/types/promises' export const SEARCH_DURATION = '30d' export const LIMIT = 200 -async function findBuckets(url: string): Promise { +type CancelableQuery = WrappedCancelablePromise + +function findBuckets(url: string): CancelableQuery { const query = `buckets() |> sort(columns: ["name"]) |> limit(n: ${LIMIT})` - const resp = await executeQuery(url, query, InfluxLanguage.Flux) - const parsed = extractCol(resp, 'name') + const {promise, cancel} = executeQuery(url, query, InfluxLanguage.Flux) - return parsed + return { + promise: promise.then(resp => extractCol(resp, 'name')), + cancel, + } } -async function findKeys( +function findKeys( url: string, bucket: string, tagsSelections: BuilderConfig['tags'], searchTerm: string = '' -): Promise { +): CancelableQuery { const tagFilters = formatTagFilterPredicate(tagsSelections) const searchFilter = formatSearchFilterCall(searchTerm) const previousKeyFilter = formatTagKeyFilterCall(tagsSelections) @@ -46,19 +50,21 @@ async function findKeys( |> limit(n: ${LIMIT}) ` - const resp = await executeQuery(url, query, InfluxLanguage.Flux) - const parsed = extractCol(resp, '_value') + const {promise, cancel} = executeQuery(url, query, InfluxLanguage.Flux) - return parsed + return { + promise: promise.then(resp => extractCol(resp, '_value')), + cancel, + } } -async function findValues( +function findValues( url: string, bucket: string, tagsSelections: BuilderConfig['tags'], key: string, searchTerm: string = '' -): Promise { +): CancelableQuery { const tagFilters = formatTagFilterPredicate(tagsSelections) const searchFilter = formatSearchFilterCall(searchTerm) @@ -69,10 +75,12 @@ async function findValues( |> limit(n: ${LIMIT}) ` - const resp = await executeQuery(url, query, InfluxLanguage.Flux) - const parsed = extractCol(resp, '_value') + const {promise, cancel} = executeQuery(url, query, InfluxLanguage.Flux) - return parsed + return { + promise: promise.then(resp => extractCol(resp, '_value')), + cancel, + } } function extractCol(resp: ExecuteFluxQueryResult, colName: string): string[] { @@ -118,25 +126,19 @@ function formatSearchFilterCall(searchTerm: string) { return `\n |> filter(fn: (r) => r._value =~ /(?i:${searchTerm})/)` } -export class CancellationError extends Error {} - export class QueryBuilderFetcher { - private findBucketsToken: string = '' - private findKeysTokens = [] - private findValuesTokens = [] + private findBucketsQuery: CancelableQuery + private findKeysQueries: CancelableQuery[] = [] + private findValuesQueries: CancelableQuery[] = [] public async findBuckets(url: string): Promise { - const token = uuid.v4() - - this.findBucketsToken = token - - const result = await findBuckets(url) - - if (token !== this.findBucketsToken) { - throw new CancellationError() + if (this.findBucketsQuery) { + this.findBucketsQuery.cancel() } - return result + this.findBucketsQuery = findBuckets(url) + + return this.findBucketsQuery.promise } public async findKeys( @@ -146,21 +148,21 @@ export class QueryBuilderFetcher { tagsSelections: BuilderConfig['tags'], searchTerm: string = '' ): Promise { - const token = uuid.v4() + this.cancelFindKeys(index) - this.findKeysTokens[index] = token - - const result = await findKeys(url, bucket, tagsSelections, searchTerm) - - if (token !== this.findKeysTokens[index]) { - throw new CancellationError() - } - - return result + this.findKeysQueries[index] = findKeys( + url, + bucket, + tagsSelections, + searchTerm + ) + return this.findKeysQueries[index].promise } public cancelFindKeys(index) { - this.findKeysTokens[index] = uuid.v4() + if (this.findKeysQueries[index]) { + this.findKeysQueries[index].cancel() + } } public async findValues( @@ -171,11 +173,9 @@ export class QueryBuilderFetcher { key: string, searchTerm: string = '' ): Promise { - const token = uuid.v4() + this.cancelFindValues(index) - this.findValuesTokens[index] = token - - const result = await findValues( + this.findValuesQueries[index] = findValues( url, bucket, tagsSelections, @@ -183,15 +183,13 @@ export class QueryBuilderFetcher { searchTerm ) - if (token !== this.findValuesTokens[index]) { - throw new CancellationError() - } - - return result + return this.findValuesQueries[index].promise } public cancelFindValues(index) { - this.findValuesTokens[index] = uuid.v4() + if (this.findValuesQueries[index]) { + this.findValuesQueries[index].cancel() + } } } diff --git a/ui/src/shared/components/TimeSeries.tsx b/ui/src/shared/components/TimeSeries.tsx index 5a2ba31ba4..c4091cf0ce 100644 --- a/ui/src/shared/components/TimeSeries.tsx +++ b/ui/src/shared/components/TimeSeries.tsx @@ -8,7 +8,6 @@ import {executeQuery, ExecuteFluxQueryResult} from 'src/shared/apis/v2/query' // Utils import {parseResponse} from 'src/shared/parsing/flux/response' -import {restartable, CancellationError} from 'src/utils/restartable' import {getSources, getActiveSource} from 'src/sources/selectors' import {renderQuery} from 'src/shared/utils/renderQuery' @@ -16,21 +15,38 @@ import {renderQuery} from 'src/shared/utils/renderQuery' import {RemoteDataState, FluxTable} from 'src/types' import {DashboardQuery} from 'src/types/v2/dashboards' import {AppState, Source} from 'src/types/v2' +import {WrappedCancelablePromise, CancellationError} from 'src/types/promises' type URLQuery = DashboardQuery & {url: string} -const executeQueries = async ( - queries: URLQuery[], +const executeRenderedQuery = ( + {text, type, url}: URLQuery, variables: {[key: string]: string} -): Promise => { - const promises = queries.map(async ({url, text, type}) => { - const renderedQuery = await renderQuery(text, type, variables) - const queryResult = await executeQuery(url, renderedQuery, type) +): WrappedCancelablePromise => { + let isCancelled = false + let cancelExecution - return queryResult + const cancel = () => { + isCancelled = true + + if (cancelExecution) { + cancelExecution() + } + } + + const promise = renderQuery(text, type, variables).then(renderedQuery => { + if (isCancelled) { + return Promise.reject(new CancellationError()) + } + + const pendingResult = executeQuery(url, renderedQuery, type) + + cancelExecution = pendingResult.cancel + + return pendingResult.promise }) - return Promise.all(promises) + return {promise, cancel} } export interface QueriesState { @@ -84,7 +100,9 @@ class TimeSeries extends Component { public state: State = defaultState() - private executeQueries = restartable(executeQueries) + private pendingResults: Array< + WrappedCancelablePromise + > = [] public async componentDidMount() { this.reload() @@ -142,7 +160,16 @@ class TimeSeries extends Component { try { const startTime = Date.now() - const results = await this.executeQueries(queries, variables) + + // Cancel any existing queries + this.pendingResults.forEach(({cancel}) => cancel()) + + // Issue new queries + this.pendingResults = queries.map(q => executeRenderedQuery(q, variables)) + + // Wait for new queries to complete + const results = await Promise.all(this.pendingResults.map(r => r.promise)) + const duration = Date.now() - startTime const tables = flatten(results.map(r => parseResponse(r.csv))) const files = results.map(r => r.csv) diff --git a/ui/src/types/promises.ts b/ui/src/types/promises.ts index 765e1e3885..16b59025a7 100644 --- a/ui/src/types/promises.ts +++ b/ui/src/types/promises.ts @@ -2,3 +2,5 @@ export interface WrappedCancelablePromise { promise: Promise cancel: () => void } + +export class CancellationError extends Error {} diff --git a/ui/src/utils/restartable.test.ts b/ui/src/utils/restartable.test.ts index 6a6d62b380..123b9dd70b 100644 --- a/ui/src/utils/restartable.test.ts +++ b/ui/src/utils/restartable.test.ts @@ -1,4 +1,5 @@ -import {restartable, CancellationError} from 'src/utils/restartable' +import {restartable} from 'src/utils/restartable' +import {CancellationError} from 'src/types/promises' describe('restartable', () => { test('with three concurrent promises', async () => { diff --git a/ui/src/utils/restartable.ts b/ui/src/utils/restartable.ts index 282e52487a..ae888f17c4 100644 --- a/ui/src/utils/restartable.ts +++ b/ui/src/utils/restartable.ts @@ -1,6 +1,6 @@ import Deferred from 'src/utils/Deferred' -export class CancellationError extends Error {} +import {CancellationError} from 'src/types/promises' // `restartable` is a utility that wraps promise-returning functions so that // concurrent calls resolve successfully exactly once, and with the most