Support true time series query cancellation

Previously, outdated queries in the `TimeSeries` and
`QueryBuilderFetcher` would run to completion even though their results
were ignored.

Now, pending but outdated queries will be truly canceled via
`XmlHttpRequest#abort`. This frees up server and network resources.

Closes #10801

Co-authored-by: Delmer Reed <delmer814+1@gmail.com>
pull/11259/head
Delmer Reed 2019-01-17 15:00:10 -05:00 committed by Chris Henn
parent 250c8df997
commit d8f9f011ff
8 changed files with 105 additions and 71 deletions

View File

@ -130,7 +130,7 @@ class DataListening extends PureComponent<Props, State> {
'/api/v2/query',
script,
InfluxLanguage.Flux
)
).promise
rowCount = response.rowCount
timePassed = Number(new Date()) - this.startTime
} catch (err) {

View File

@ -1,8 +1,5 @@
// APIs
import {
QueryBuilderFetcher,
CancellationError,
} from 'src/shared/apis/v2/queryBuilder'
import {QueryBuilderFetcher} from 'src/shared/apis/v2/queryBuilder'
import {bucketsAPI} from 'src/utils/api'
@ -17,6 +14,7 @@ import {
import {Dispatch} from 'redux-thunk'
import {GetState} from 'src/types/v2'
import {RemoteDataState} from 'src/types'
import {CancellationError} from 'src/types/promises'
const fetcher = new QueryBuilderFetcher()

View File

@ -3,6 +3,7 @@ import _ from 'lodash'
import Deferred from 'src/utils/Deferred'
import {InfluxLanguage} from 'src/types/v2/dashboards'
import {WrappedCancelablePromise, CancellationError} from 'src/types/promises'
const CHECK_LIMIT_INTERVAL = 200
const MAX_ROWS = 50000
@ -17,11 +18,11 @@ interface XHRError extends Error {
xhr?: XMLHttpRequest
}
export const executeQuery = async (
export const executeQuery = (
url: string,
query: string,
language: InfluxLanguage = InfluxLanguage.Flux
): Promise<ExecuteFluxQueryResult> => {
): WrappedCancelablePromise<ExecuteFluxQueryResult> => {
// We're using `XMLHttpRequest` directly here rather than through `axios` so
// that we can poll the response size as it comes back. If the response size
// is greater than a predefined limit, we close the HTTP connection and
@ -130,5 +131,12 @@ export const executeQuery = async (
xhr.setRequestHeader('Content-Type', 'application/json')
xhr.send(body)
return deferred.promise
return {
promise: deferred.promise,
cancel: () => {
clearTimeout(interval)
xhr.abort()
deferred.reject(new CancellationError())
},
}
}

View File

@ -1,6 +1,5 @@
// Libraries
import {get} from 'lodash'
import uuid from 'uuid'
// APIs
import {executeQuery, ExecuteFluxQueryResult} from 'src/shared/apis/v2/query'
@ -8,27 +7,32 @@ import {parseResponse} from 'src/shared/parsing/flux/response'
// Types
import {InfluxLanguage, BuilderConfig} from 'src/types/v2'
import {WrappedCancelablePromise} from 'src/types/promises'
export const SEARCH_DURATION = '30d'
export const LIMIT = 200
async function findBuckets(url: string): Promise<string[]> {
type CancelableQuery = WrappedCancelablePromise<string[]>
function findBuckets(url: string): CancelableQuery {
const query = `buckets()
|> sort(columns: ["name"])
|> limit(n: ${LIMIT})`
const resp = await executeQuery(url, query, InfluxLanguage.Flux)
const parsed = extractCol(resp, 'name')
const {promise, cancel} = executeQuery(url, query, InfluxLanguage.Flux)
return parsed
return {
promise: promise.then(resp => extractCol(resp, 'name')),
cancel,
}
}
async function findKeys(
function findKeys(
url: string,
bucket: string,
tagsSelections: BuilderConfig['tags'],
searchTerm: string = ''
): Promise<string[]> {
): CancelableQuery {
const tagFilters = formatTagFilterPredicate(tagsSelections)
const searchFilter = formatSearchFilterCall(searchTerm)
const previousKeyFilter = formatTagKeyFilterCall(tagsSelections)
@ -46,19 +50,21 @@ async function findKeys(
|> limit(n: ${LIMIT})
`
const resp = await executeQuery(url, query, InfluxLanguage.Flux)
const parsed = extractCol(resp, '_value')
const {promise, cancel} = executeQuery(url, query, InfluxLanguage.Flux)
return parsed
return {
promise: promise.then(resp => extractCol(resp, '_value')),
cancel,
}
}
async function findValues(
function findValues(
url: string,
bucket: string,
tagsSelections: BuilderConfig['tags'],
key: string,
searchTerm: string = ''
): Promise<string[]> {
): CancelableQuery {
const tagFilters = formatTagFilterPredicate(tagsSelections)
const searchFilter = formatSearchFilterCall(searchTerm)
@ -69,10 +75,12 @@ async function findValues(
|> limit(n: ${LIMIT})
`
const resp = await executeQuery(url, query, InfluxLanguage.Flux)
const parsed = extractCol(resp, '_value')
const {promise, cancel} = executeQuery(url, query, InfluxLanguage.Flux)
return parsed
return {
promise: promise.then(resp => extractCol(resp, '_value')),
cancel,
}
}
function extractCol(resp: ExecuteFluxQueryResult, colName: string): string[] {
@ -118,25 +126,19 @@ function formatSearchFilterCall(searchTerm: string) {
return `\n |> filter(fn: (r) => r._value =~ /(?i:${searchTerm})/)`
}
export class CancellationError extends Error {}
export class QueryBuilderFetcher {
private findBucketsToken: string = ''
private findKeysTokens = []
private findValuesTokens = []
private findBucketsQuery: CancelableQuery
private findKeysQueries: CancelableQuery[] = []
private findValuesQueries: CancelableQuery[] = []
public async findBuckets(url: string): Promise<string[]> {
const token = uuid.v4()
this.findBucketsToken = token
const result = await findBuckets(url)
if (token !== this.findBucketsToken) {
throw new CancellationError()
if (this.findBucketsQuery) {
this.findBucketsQuery.cancel()
}
return result
this.findBucketsQuery = findBuckets(url)
return this.findBucketsQuery.promise
}
public async findKeys(
@ -146,21 +148,21 @@ export class QueryBuilderFetcher {
tagsSelections: BuilderConfig['tags'],
searchTerm: string = ''
): Promise<string[]> {
const token = uuid.v4()
this.cancelFindKeys(index)
this.findKeysTokens[index] = token
const result = await findKeys(url, bucket, tagsSelections, searchTerm)
if (token !== this.findKeysTokens[index]) {
throw new CancellationError()
}
return result
this.findKeysQueries[index] = findKeys(
url,
bucket,
tagsSelections,
searchTerm
)
return this.findKeysQueries[index].promise
}
public cancelFindKeys(index) {
this.findKeysTokens[index] = uuid.v4()
if (this.findKeysQueries[index]) {
this.findKeysQueries[index].cancel()
}
}
public async findValues(
@ -171,11 +173,9 @@ export class QueryBuilderFetcher {
key: string,
searchTerm: string = ''
): Promise<string[]> {
const token = uuid.v4()
this.cancelFindValues(index)
this.findValuesTokens[index] = token
const result = await findValues(
this.findValuesQueries[index] = findValues(
url,
bucket,
tagsSelections,
@ -183,15 +183,13 @@ export class QueryBuilderFetcher {
searchTerm
)
if (token !== this.findValuesTokens[index]) {
throw new CancellationError()
}
return result
return this.findValuesQueries[index].promise
}
public cancelFindValues(index) {
this.findValuesTokens[index] = uuid.v4()
if (this.findValuesQueries[index]) {
this.findValuesQueries[index].cancel()
}
}
}

View File

@ -8,7 +8,6 @@ import {executeQuery, ExecuteFluxQueryResult} from 'src/shared/apis/v2/query'
// Utils
import {parseResponse} from 'src/shared/parsing/flux/response'
import {restartable, CancellationError} from 'src/utils/restartable'
import {getSources, getActiveSource} from 'src/sources/selectors'
import {renderQuery} from 'src/shared/utils/renderQuery'
@ -16,21 +15,38 @@ import {renderQuery} from 'src/shared/utils/renderQuery'
import {RemoteDataState, FluxTable} from 'src/types'
import {DashboardQuery} from 'src/types/v2/dashboards'
import {AppState, Source} from 'src/types/v2'
import {WrappedCancelablePromise, CancellationError} from 'src/types/promises'
type URLQuery = DashboardQuery & {url: string}
const executeQueries = async (
queries: URLQuery[],
const executeRenderedQuery = (
{text, type, url}: URLQuery,
variables: {[key: string]: string}
): Promise<ExecuteFluxQueryResult[]> => {
const promises = queries.map(async ({url, text, type}) => {
const renderedQuery = await renderQuery(text, type, variables)
const queryResult = await executeQuery(url, renderedQuery, type)
): WrappedCancelablePromise<ExecuteFluxQueryResult> => {
let isCancelled = false
let cancelExecution
return queryResult
const cancel = () => {
isCancelled = true
if (cancelExecution) {
cancelExecution()
}
}
const promise = renderQuery(text, type, variables).then(renderedQuery => {
if (isCancelled) {
return Promise.reject(new CancellationError())
}
const pendingResult = executeQuery(url, renderedQuery, type)
cancelExecution = pendingResult.cancel
return pendingResult.promise
})
return Promise.all(promises)
return {promise, cancel}
}
export interface QueriesState {
@ -84,7 +100,9 @@ class TimeSeries extends Component<Props, State> {
public state: State = defaultState()
private executeQueries = restartable(executeQueries)
private pendingResults: Array<
WrappedCancelablePromise<ExecuteFluxQueryResult>
> = []
public async componentDidMount() {
this.reload()
@ -142,7 +160,16 @@ class TimeSeries extends Component<Props, State> {
try {
const startTime = Date.now()
const results = await this.executeQueries(queries, variables)
// Cancel any existing queries
this.pendingResults.forEach(({cancel}) => cancel())
// Issue new queries
this.pendingResults = queries.map(q => executeRenderedQuery(q, variables))
// Wait for new queries to complete
const results = await Promise.all(this.pendingResults.map(r => r.promise))
const duration = Date.now() - startTime
const tables = flatten(results.map(r => parseResponse(r.csv)))
const files = results.map(r => r.csv)

View File

@ -2,3 +2,5 @@ export interface WrappedCancelablePromise<T> {
promise: Promise<T>
cancel: () => void
}
export class CancellationError extends Error {}

View File

@ -1,4 +1,5 @@
import {restartable, CancellationError} from 'src/utils/restartable'
import {restartable} from 'src/utils/restartable'
import {CancellationError} from 'src/types/promises'
describe('restartable', () => {
test('with three concurrent promises', async () => {

View File

@ -1,6 +1,6 @@
import Deferred from 'src/utils/Deferred'
export class CancellationError extends Error {}
import {CancellationError} from 'src/types/promises'
// `restartable` is a utility that wraps promise-returning functions so that
// concurrent calls resolve successfully exactly once, and with the most