Allow queries to be interrupted during planning
If a bad query is run, kill query and limits would not kick in until after it started executing. Some bad queries that involve high cardinality can cause the server to OOM just from planning which defeats the purpose of the max-select-series limit. This change primarily fixes max-select-series limit so that the query is killed earlier and has the side effect that kill query now can kill a query while it's being planned.pull/8158/head
parent
86ad0a45b6
commit
2d5d899ac2
|
@ -1310,6 +1310,14 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt
|
|||
itrs := make([]influxql.Iterator, 0, len(tagSets))
|
||||
if err := func() error {
|
||||
for _, t := range tagSets {
|
||||
// Abort if the query was killed
|
||||
select {
|
||||
case <-opt.InterruptCh:
|
||||
influxql.Iterators(itrs).Close()
|
||||
return err
|
||||
default:
|
||||
}
|
||||
|
||||
inputs, err := e.createTagSetIterators(ref, mm, t, opt)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1510,6 +1518,14 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
}
|
||||
itrs = append(itrs, itr)
|
||||
|
||||
// Abort if the query was killed
|
||||
select {
|
||||
case <-opt.InterruptCh:
|
||||
influxql.Iterators(itrs).Close()
|
||||
return nil, err
|
||||
default:
|
||||
}
|
||||
|
||||
// Enforce series limit at creation time.
|
||||
if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN {
|
||||
influxql.Iterators(itrs).Close()
|
||||
|
|
14
tsdb/meta.go
14
tsdb/meta.go
|
@ -809,6 +809,13 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf
|
|||
tagSets := make(map[string]*influxql.TagSet, 64)
|
||||
var seriesN int
|
||||
for _, id := range ids {
|
||||
// Abort if the query was killed
|
||||
select {
|
||||
case <-opt.InterruptCh:
|
||||
return nil, influxql.ErrQueryInterrupted
|
||||
default:
|
||||
}
|
||||
|
||||
if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN {
|
||||
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
|
||||
}
|
||||
|
@ -847,6 +854,13 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf
|
|||
|
||||
// Sort the series in each tag set.
|
||||
for _, t := range tagSets {
|
||||
// Abort if the query was killed
|
||||
select {
|
||||
case <-opt.InterruptCh:
|
||||
return nil, influxql.ErrQueryInterrupted
|
||||
default:
|
||||
}
|
||||
|
||||
sort.Sort(t)
|
||||
}
|
||||
|
||||
|
|
|
@ -1046,6 +1046,13 @@ func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions)
|
|||
}
|
||||
itrs = append(itrs, itr)
|
||||
|
||||
select {
|
||||
case <-opt.InterruptCh:
|
||||
influxql.Iterators(itrs).Close()
|
||||
return nil, err
|
||||
default:
|
||||
}
|
||||
|
||||
// Enforce series limit at creation time.
|
||||
if opt.MaxSeriesN > 0 {
|
||||
stats := itr.Stats()
|
||||
|
|
Loading…
Reference in New Issue