Create a new interrupt iterator that will stop emitting points after an interrupt
Use of the iterator is spread out into both `IteratorCreators` and inside of the iterators themselves. Part of the interrupt must be handled inside of the engine so it stops trying to emit points when an interrupt is found and another part of the interrupt has to happen when combining the iterators so it doesn't just start reading the next shard.pull/5950/head
parent
3e580bcf04
commit
6655ca7769
|
@ -411,7 +411,7 @@ func (e *QueryExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordU
|
|||
func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, chunkSize, statementID int, results chan *influxql.Result, closing <-chan struct{}) error {
|
||||
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
|
||||
now := time.Now().UTC()
|
||||
opt := influxql.SelectOptions{}
|
||||
opt := influxql.SelectOptions{InterruptCh: closing}
|
||||
|
||||
// Replace instances of "now()" with the current time, and check the resultant times.
|
||||
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
|
||||
|
|
|
@ -579,8 +579,9 @@ func BenchmarkCallIterator_Min_Float(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
|
||||
itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{
|
||||
Expr: MustParseExpr("min(value)"),
|
||||
Interval: influxql.Interval{Duration: 1 * time.Hour},
|
||||
Expr: MustParseExpr("min(value)"),
|
||||
Interval: influxql.Interval{Duration: 1 * time.Hour},
|
||||
InterruptCh: make(chan struct{}),
|
||||
})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
|
|
|
@ -570,6 +570,39 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
|
|||
return p
|
||||
}
|
||||
|
||||
// floatInterruptIterator represents a float implementation of InterruptIterator.
|
||||
type floatInterruptIterator struct {
|
||||
input FloatIterator
|
||||
closing <-chan struct{}
|
||||
count int
|
||||
}
|
||||
|
||||
func newFloatInterruptIterator(input FloatIterator, closing <-chan struct{}) *floatInterruptIterator {
|
||||
return &floatInterruptIterator{input: input, closing: closing}
|
||||
}
|
||||
|
||||
func (itr *floatInterruptIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *floatInterruptIterator) Next() *FloatPoint {
|
||||
// Only check if the channel is closed every 256 points. This
|
||||
// intentionally checks on both 0 and 256 so that if the iterator
|
||||
// has been interrupted before the first point is emitted it will
|
||||
// not emit any points.
|
||||
if itr.count&0x100 == 0 {
|
||||
select {
|
||||
case <-itr.closing:
|
||||
return nil
|
||||
default:
|
||||
// Reset iterator count to zero and fall through to emit the next point.
|
||||
itr.count = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Increment the counter for every point read.
|
||||
itr.count++
|
||||
return itr.input.Next()
|
||||
}
|
||||
|
||||
// floatAuxIterator represents a float implementation of AuxIterator.
|
||||
type floatAuxIterator struct {
|
||||
input *bufFloatIterator
|
||||
|
@ -1894,6 +1927,39 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
|
|||
return p
|
||||
}
|
||||
|
||||
// integerInterruptIterator represents a integer implementation of InterruptIterator.
|
||||
type integerInterruptIterator struct {
|
||||
input IntegerIterator
|
||||
closing <-chan struct{}
|
||||
count int
|
||||
}
|
||||
|
||||
func newIntegerInterruptIterator(input IntegerIterator, closing <-chan struct{}) *integerInterruptIterator {
|
||||
return &integerInterruptIterator{input: input, closing: closing}
|
||||
}
|
||||
|
||||
func (itr *integerInterruptIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *integerInterruptIterator) Next() *IntegerPoint {
|
||||
// Only check if the channel is closed every 256 points. This
|
||||
// intentionally checks on both 0 and 256 so that if the iterator
|
||||
// has been interrupted before the first point is emitted it will
|
||||
// not emit any points.
|
||||
if itr.count&0x100 == 0 {
|
||||
select {
|
||||
case <-itr.closing:
|
||||
return nil
|
||||
default:
|
||||
// Reset iterator count to zero and fall through to emit the next point.
|
||||
itr.count = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Increment the counter for every point read.
|
||||
itr.count++
|
||||
return itr.input.Next()
|
||||
}
|
||||
|
||||
// integerAuxIterator represents a integer implementation of AuxIterator.
|
||||
type integerAuxIterator struct {
|
||||
input *bufIntegerIterator
|
||||
|
@ -3215,6 +3281,39 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
|
|||
return p
|
||||
}
|
||||
|
||||
// stringInterruptIterator represents a string implementation of InterruptIterator.
|
||||
type stringInterruptIterator struct {
|
||||
input StringIterator
|
||||
closing <-chan struct{}
|
||||
count int
|
||||
}
|
||||
|
||||
func newStringInterruptIterator(input StringIterator, closing <-chan struct{}) *stringInterruptIterator {
|
||||
return &stringInterruptIterator{input: input, closing: closing}
|
||||
}
|
||||
|
||||
func (itr *stringInterruptIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *stringInterruptIterator) Next() *StringPoint {
|
||||
// Only check if the channel is closed every 256 points. This
|
||||
// intentionally checks on both 0 and 256 so that if the iterator
|
||||
// has been interrupted before the first point is emitted it will
|
||||
// not emit any points.
|
||||
if itr.count&0x100 == 0 {
|
||||
select {
|
||||
case <-itr.closing:
|
||||
return nil
|
||||
default:
|
||||
// Reset iterator count to zero and fall through to emit the next point.
|
||||
itr.count = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Increment the counter for every point read.
|
||||
itr.count++
|
||||
return itr.input.Next()
|
||||
}
|
||||
|
||||
// stringAuxIterator represents a string implementation of AuxIterator.
|
||||
type stringAuxIterator struct {
|
||||
input *bufStringIterator
|
||||
|
@ -4536,6 +4635,39 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
|
|||
return p
|
||||
}
|
||||
|
||||
// booleanInterruptIterator represents a boolean implementation of InterruptIterator.
|
||||
type booleanInterruptIterator struct {
|
||||
input BooleanIterator
|
||||
closing <-chan struct{}
|
||||
count int
|
||||
}
|
||||
|
||||
func newBooleanInterruptIterator(input BooleanIterator, closing <-chan struct{}) *booleanInterruptIterator {
|
||||
return &booleanInterruptIterator{input: input, closing: closing}
|
||||
}
|
||||
|
||||
func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *booleanInterruptIterator) Next() *BooleanPoint {
|
||||
// Only check if the channel is closed every 256 points. This
|
||||
// intentionally checks on both 0 and 256 so that if the iterator
|
||||
// has been interrupted before the first point is emitted it will
|
||||
// not emit any points.
|
||||
if itr.count&0x100 == 0 {
|
||||
select {
|
||||
case <-itr.closing:
|
||||
return nil
|
||||
default:
|
||||
// Reset iterator count to zero and fall through to emit the next point.
|
||||
itr.count = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Increment the counter for every point read.
|
||||
itr.count++
|
||||
return itr.input.Next()
|
||||
}
|
||||
|
||||
// booleanAuxIterator represents a boolean implementation of AuxIterator.
|
||||
type booleanAuxIterator struct {
|
||||
input *bufBooleanIterator
|
||||
|
|
|
@ -569,6 +569,39 @@ func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
|
|||
return p
|
||||
}
|
||||
|
||||
// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
|
||||
type {{$k.name}}InterruptIterator struct {
|
||||
input {{$k.Name}}Iterator
|
||||
closing <-chan struct{}
|
||||
count int
|
||||
}
|
||||
|
||||
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
|
||||
return &{{$k.name}}InterruptIterator{input: input, closing: closing}
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *{{$k.name}}InterruptIterator) Next() *{{$k.Name}}Point {
|
||||
// Only check if the channel is closed every 256 points. This
|
||||
// intentionally checks on both 0 and 256 so that if the iterator
|
||||
// has been interrupted before the first point is emitted it will
|
||||
// not emit any points.
|
||||
if itr.count&0x100 == 0 {
|
||||
select {
|
||||
case <-itr.closing:
|
||||
return nil
|
||||
default:
|
||||
// Reset iterator count to zero and fall through to emit the next point.
|
||||
itr.count = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Increment the counter for every point read.
|
||||
itr.count++
|
||||
return itr.input.Next()
|
||||
}
|
||||
|
||||
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
|
||||
type {{$k.name}}AuxIterator struct {
|
||||
input *buf{{$k.Name}}Iterator
|
||||
|
|
|
@ -222,6 +222,23 @@ func NewIntervalIterator(input Iterator, opt IteratorOptions) Iterator {
|
|||
}
|
||||
}
|
||||
|
||||
// NewInterruptIterator returns an iterator that will stop producing output when a channel
|
||||
// has been closed on the passed in channel.
|
||||
func NewInterruptIterator(input Iterator, closing <-chan struct{}) Iterator {
|
||||
switch input := input.(type) {
|
||||
case FloatIterator:
|
||||
return newFloatInterruptIterator(input, closing)
|
||||
case IntegerIterator:
|
||||
return newIntegerInterruptIterator(input, closing)
|
||||
case StringIterator:
|
||||
return newStringInterruptIterator(input, closing)
|
||||
case BooleanIterator:
|
||||
return newBooleanInterruptIterator(input, closing)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported fill iterator type: %T", input))
|
||||
}
|
||||
}
|
||||
|
||||
// AuxIterator represents an iterator that can split off separate auxilary iterators.
|
||||
type AuxIterator interface {
|
||||
Iterator
|
||||
|
@ -466,7 +483,11 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
|
|||
|
||||
// Merge into a single iterator.
|
||||
if opt.MergeSorted() {
|
||||
return NewSortedMergeIterator(itrs, opt), nil
|
||||
itr := NewSortedMergeIterator(itrs, opt)
|
||||
if opt.InterruptCh != nil {
|
||||
itr = NewInterruptIterator(itr, opt.InterruptCh)
|
||||
}
|
||||
return itr, nil
|
||||
}
|
||||
|
||||
itr := NewMergeIterator(itrs, opt)
|
||||
|
@ -478,6 +499,10 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if opt.InterruptCh != nil {
|
||||
itr = NewInterruptIterator(itr, opt.InterruptCh)
|
||||
}
|
||||
return NewCallIterator(itr, opt)
|
||||
}
|
||||
|
||||
|
@ -604,6 +629,10 @@ type IteratorOptions struct {
|
|||
|
||||
// Removes duplicate rows from raw queries.
|
||||
Dedupe bool
|
||||
|
||||
// If this channel is set and is closed, the iterator should try to exit
|
||||
// and close as soon as possible.
|
||||
InterruptCh <-chan struct{}
|
||||
}
|
||||
|
||||
// newIteratorOptionsStmt creates the iterator options from stmt.
|
||||
|
@ -655,6 +684,9 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite
|
|||
opt.Fill, opt.FillValue = stmt.Fill, stmt.FillValue
|
||||
opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
|
||||
opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
|
||||
if sopt != nil {
|
||||
opt.InterruptCh = sopt.InterruptCh
|
||||
}
|
||||
|
||||
return opt, nil
|
||||
}
|
||||
|
|
|
@ -14,6 +14,10 @@ type SelectOptions struct {
|
|||
|
||||
// The upper bound for a select call.
|
||||
MaxTime time.Time
|
||||
|
||||
// An optional channel that, if closed, signals that the select should be
|
||||
// interrupted.
|
||||
InterruptCh <-chan struct{}
|
||||
}
|
||||
|
||||
// Select executes stmt against ic and returns a list of iterators to stream from.
|
||||
|
|
|
@ -686,14 +686,23 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt)
|
||||
|
||||
input := influxql.NewMergeIterator(inputs, opt)
|
||||
if opt.InterruptCh != nil {
|
||||
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
|
||||
}
|
||||
return influxql.NewCallIterator(input, opt)
|
||||
}
|
||||
|
||||
itrs, err := e.createVarRefIterator(opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return influxql.NewSortedMergeIterator(itrs, opt), nil
|
||||
itr := influxql.NewSortedMergeIterator(itrs, opt)
|
||||
if opt.InterruptCh != nil {
|
||||
itr = influxql.NewInterruptIterator(itr, opt.InterruptCh)
|
||||
}
|
||||
return itr, nil
|
||||
}
|
||||
|
||||
func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
|
|
Loading…
Reference in New Issue