Merge pull request #3596 from influxdata/flux/limit-responses
Improved handling of large Flux query responsespull/3599/head
commit
0e9552f495
|
@ -4,6 +4,7 @@ import AJAX from 'src/utils/ajax'
|
|||
import {Service, FluxTable} from 'src/types'
|
||||
import {updateService} from 'src/shared/apis'
|
||||
import {parseResponse} from 'src/shared/parsing/flux/response'
|
||||
import {MAX_RESPONSE_BYTES} from 'src/flux/constants'
|
||||
|
||||
export const getSuggestions = async (url: string) => {
|
||||
try {
|
||||
|
@ -39,23 +40,36 @@ export const getAST = async (request: ASTRequest) => {
|
|||
}
|
||||
}
|
||||
|
||||
interface GetTimeSeriesResult {
|
||||
didTruncate: boolean
|
||||
tables: FluxTable[]
|
||||
}
|
||||
|
||||
export const getTimeSeries = async (
|
||||
service: Service,
|
||||
script: string
|
||||
): Promise<FluxTable[]> => {
|
||||
): Promise<GetTimeSeriesResult> => {
|
||||
const and = encodeURIComponent('&')
|
||||
const mark = encodeURIComponent('?')
|
||||
const garbage = script.replace(/\s/g, '') // server cannot handle whitespace
|
||||
const url = `${
|
||||
service.links.proxy
|
||||
}?path=/v1/query${mark}orgName=defaulorgname${and}q=${garbage}`
|
||||
|
||||
try {
|
||||
const {data} = await AJAX({
|
||||
method: 'POST',
|
||||
url: `${
|
||||
service.links.proxy
|
||||
}?path=/v1/query${mark}orgName=defaulorgname${and}q=${garbage}`,
|
||||
})
|
||||
// We are using the `fetch` API here since the `AJAX` utility lacks support
|
||||
// for limiting response size. The `AJAX` utility depends on
|
||||
// `axios.request` which _does_ have a `maxContentLength` option, though it
|
||||
// seems to be broken at the moment. We might use this option instead of
|
||||
// the `fetch` API in the future, if it is ever fixed. See
|
||||
// https://github.com/axios/axios/issues/1491.
|
||||
const resp = await fetch(url, {method: 'POST'})
|
||||
const {body, byteLength} = await decodeFluxRespWithLimit(resp)
|
||||
|
||||
return parseResponse(data)
|
||||
return {
|
||||
tables: parseResponse(body),
|
||||
didTruncate: byteLength >= MAX_RESPONSE_BYTES,
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Problem fetching data', error)
|
||||
|
||||
|
@ -114,3 +128,43 @@ export const updateScript = async (service: Service, script: string) => {
|
|||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
interface DecodeFluxRespWithLimitResult {
|
||||
body: string
|
||||
byteLength: number
|
||||
}
|
||||
|
||||
const decodeFluxRespWithLimit = async (
|
||||
resp: Response
|
||||
): Promise<DecodeFluxRespWithLimitResult> => {
|
||||
const reader = resp.body.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
|
||||
let bytesRead = 0
|
||||
let body = ''
|
||||
let currentRead = await reader.read()
|
||||
|
||||
while (!currentRead.done) {
|
||||
const currentText = decoder.decode(currentRead.value)
|
||||
|
||||
bytesRead += currentRead.value.byteLength
|
||||
|
||||
if (bytesRead >= MAX_RESPONSE_BYTES) {
|
||||
// Discard last line since it may be partially read
|
||||
const lines = currentText.split('\n')
|
||||
body += lines.slice(0, lines.length - 1).join('\n')
|
||||
|
||||
reader.cancel()
|
||||
|
||||
return {body, byteLength: bytesRead}
|
||||
} else {
|
||||
body += currentText
|
||||
}
|
||||
|
||||
currentRead = await reader.read()
|
||||
}
|
||||
|
||||
reader.cancel()
|
||||
|
||||
return {body, byteLength: bytesRead}
|
||||
}
|
||||
|
|
|
@ -8,25 +8,46 @@ import {vis} from 'src/flux/constants'
|
|||
|
||||
const NUM_FIXED_ROWS = 1
|
||||
|
||||
const filterTable = (table: FluxTable): FluxTable => {
|
||||
const IGNORED_COLUMNS = ['', 'result', 'table', '_start', '_stop']
|
||||
const header = table.data[0]
|
||||
const indices = IGNORED_COLUMNS.map(name => header.indexOf(name))
|
||||
const data = table.data.map(row =>
|
||||
row.filter((__, i) => !indices.includes(i))
|
||||
)
|
||||
|
||||
return {
|
||||
...table,
|
||||
data,
|
||||
}
|
||||
}
|
||||
|
||||
interface Props {
|
||||
table: FluxTable
|
||||
}
|
||||
|
||||
interface State {
|
||||
scrollLeft: number
|
||||
filteredTable: FluxTable
|
||||
}
|
||||
|
||||
@ErrorHandling
|
||||
export default class TimeMachineTable extends PureComponent<Props, State> {
|
||||
public static getDerivedStateFromProps({table}: Props) {
|
||||
return {filteredTable: filterTable(table)}
|
||||
}
|
||||
|
||||
constructor(props) {
|
||||
super(props)
|
||||
|
||||
this.state = {
|
||||
scrollLeft: 0,
|
||||
filteredTable: filterTable(props.table),
|
||||
}
|
||||
}
|
||||
|
||||
public render() {
|
||||
const {scrollLeft} = this.state
|
||||
const {scrollLeft, filteredTable} = this.state
|
||||
|
||||
return (
|
||||
<div style={{flex: '1 1 auto'}}>
|
||||
|
@ -73,7 +94,7 @@ export default class TimeMachineTable extends PureComponent<Props, State> {
|
|||
cellRenderer={this.cellRenderer}
|
||||
rowHeight={vis.TABLE_ROW_HEIGHT}
|
||||
height={height - this.headerOffset}
|
||||
rowCount={this.table.data.length - NUM_FIXED_ROWS}
|
||||
rowCount={filteredTable.data.length - NUM_FIXED_ROWS}
|
||||
/>
|
||||
)}
|
||||
</ColumnSizer>
|
||||
|
@ -93,7 +114,9 @@ export default class TimeMachineTable extends PureComponent<Props, State> {
|
|||
}
|
||||
|
||||
private get columnCount(): number {
|
||||
return _.get(this.table, 'data.0', []).length
|
||||
const {filteredTable} = this.state
|
||||
|
||||
return _.get(filteredTable, 'data.0', []).length
|
||||
}
|
||||
|
||||
private get headerOffset(): number {
|
||||
|
@ -109,13 +132,15 @@ export default class TimeMachineTable extends PureComponent<Props, State> {
|
|||
key,
|
||||
style,
|
||||
}: GridCellProps): React.ReactNode => {
|
||||
const {filteredTable} = this.state
|
||||
|
||||
return (
|
||||
<div
|
||||
key={key}
|
||||
style={{...style, display: 'flex', alignItems: 'center'}}
|
||||
className="table-graph-cell table-graph-cell__fixed-row"
|
||||
>
|
||||
{this.table.data[0][columnIndex]}
|
||||
{filteredTable.data[0][columnIndex]}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
@ -126,25 +151,12 @@ export default class TimeMachineTable extends PureComponent<Props, State> {
|
|||
rowIndex,
|
||||
style,
|
||||
}: GridCellProps): React.ReactNode => {
|
||||
const {filteredTable} = this.state
|
||||
|
||||
return (
|
||||
<div key={key} style={style} className="table-graph-cell">
|
||||
{this.table.data[rowIndex + NUM_FIXED_ROWS][columnIndex]}
|
||||
{filteredTable.data[rowIndex + NUM_FIXED_ROWS][columnIndex]}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
private get table(): FluxTable {
|
||||
const IGNORED_COLUMNS = ['', 'result', 'table', '_start', '_stop']
|
||||
const {table} = this.props
|
||||
const header = table.data[0]
|
||||
const indices = IGNORED_COLUMNS.map(name => header.indexOf(name))
|
||||
const data = table.data.map(row =>
|
||||
row.filter((__, i) => !indices.includes(i))
|
||||
)
|
||||
|
||||
return {
|
||||
...table,
|
||||
data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,4 +6,15 @@ import * as builder from 'src/flux/constants/builder'
|
|||
import * as vis from 'src/flux/constants/vis'
|
||||
import * as explorer from 'src/flux/constants/explorer'
|
||||
|
||||
export {ast, funcNames, argTypes, editor, builder, vis, explorer}
|
||||
const MAX_RESPONSE_BYTES = 1e7 // 10 MB
|
||||
|
||||
export {
|
||||
ast,
|
||||
funcNames,
|
||||
argTypes,
|
||||
editor,
|
||||
builder,
|
||||
vis,
|
||||
explorer,
|
||||
MAX_RESPONSE_BYTES,
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import KeyboardShortcuts from 'src/shared/components/KeyboardShortcuts'
|
|||
import {
|
||||
analyzeSuccess,
|
||||
fluxTimeSeriesError,
|
||||
fluxResponseTruncatedError,
|
||||
} from 'src/shared/copy/notifications'
|
||||
import {UpdateScript} from 'src/flux/actions'
|
||||
|
||||
|
@ -452,8 +453,13 @@ export class FluxPage extends PureComponent<Props, State> {
|
|||
}
|
||||
|
||||
try {
|
||||
const data = await getTimeSeries(this.service, script)
|
||||
this.setState({data})
|
||||
const {tables, didTruncate} = await getTimeSeries(this.service, script)
|
||||
|
||||
this.setState({data: tables})
|
||||
|
||||
if (didTruncate) {
|
||||
notify(fluxResponseTruncatedError())
|
||||
}
|
||||
} catch (error) {
|
||||
this.setState({data: []})
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// and ensuring stylistic consistency
|
||||
|
||||
import {FIVE_SECONDS, TEN_SECONDS, INFINITE} from 'src/shared/constants/index'
|
||||
import {MAX_RESPONSE_BYTES} from 'src/flux/constants'
|
||||
|
||||
const defaultErrorNotification = {
|
||||
type: 'error',
|
||||
|
@ -680,3 +681,13 @@ export const fluxTimeSeriesError = (message: string) => ({
|
|||
...defaultErrorNotification,
|
||||
message: `Could not get data: ${message}`,
|
||||
})
|
||||
|
||||
export const fluxResponseTruncatedError = () => {
|
||||
const BYTES_TO_MB = 1 / 1e6
|
||||
const APPROX_MAX_RESPONSE_MB = +(MAX_RESPONSE_BYTES * BYTES_TO_MB).toFixed(2)
|
||||
|
||||
return {
|
||||
...defaultErrorNotification,
|
||||
message: `Large response truncated to first ${APPROX_MAX_RESPONSE_MB} MB`,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,12 @@ export const parseTables = (responseChunk: string): FluxTable[] => {
|
|||
throw new Error('Unable to extract annotation data')
|
||||
}
|
||||
|
||||
if (_.isEmpty(nonAnnotationLines)) {
|
||||
// A response may be truncated on an arbitrary line. This guards against
|
||||
// the case where a response is truncated on annotation data
|
||||
return []
|
||||
}
|
||||
|
||||
const nonAnnotationData = Papa.parse(nonAnnotationLines).data
|
||||
const annotationData = Papa.parse(annotationLines).data
|
||||
const headerRow = nonAnnotationData[0]
|
||||
|
|
|
@ -134,3 +134,15 @@ export const CSV_TO_DYGRAPH_MISMATCHED = `
|
|||
,,1,2018-06-04T17:12:21.025984999Z,2018-06-04T17:13:00Z,2018-06-05T17:12:25Z,10,available,mem,bertrand.local
|
||||
,,1,2018-06-04T17:12:21.025984999Z,2018-06-04T17:13:00Z,2018-06-05T17:12:35Z,11,available,mem,bertrand.local
|
||||
`
|
||||
|
||||
export const TRUNCATED_RESPONSE = `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
|
||||
#partition,false,false,false,false,false,false,true,true,true,true
|
||||
#default,_result,,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
|
||||
,,0,1677-09-21T00:12:43.145224192Z,2018-05-22T22:39:17.042276772Z,2018-05-22T22:39:12.584Z,0,usage_guest,cpu,cpu-total,WattsInfluxDB
|
||||
,,1,1677-09-21T00:12:43.145224192Z,2018-05-22T22:39:17.042276772Z,2018-05-22T22:39:12.584Z,0,usage_guest_nice,cpu,cpu-total,WattsInfluxDB
|
||||
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string,string,string,string
|
||||
#partition,false,false,false,false,false,false,true,true,true,true,true,true,true
|
||||
#default,_result,,,,,,,,,,,,`
|
||||
|
|
|
@ -4,6 +4,7 @@ import {
|
|||
RESPONSE_METADATA,
|
||||
MULTI_SCHEMA_RESPONSE,
|
||||
EXPECTED_COLUMNS,
|
||||
TRUNCATED_RESPONSE,
|
||||
} from 'test/shared/parsing/flux/constants'
|
||||
|
||||
describe('Flux results parser', () => {
|
||||
|
@ -37,4 +38,12 @@ describe('Flux results parser', () => {
|
|||
expect(actual).toEqual(expected)
|
||||
})
|
||||
})
|
||||
|
||||
describe('partial responses', () => {
|
||||
it('should discard tables without any non-annotation rows', () => {
|
||||
const actual = parseResponse(TRUNCATED_RESPONSE)
|
||||
|
||||
expect(actual).toHaveLength(2)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue