package tsm1 import ( "context" "fmt" "sync" "github.com/influxdata/influxdb/pkg/metrics" "github.com/influxdata/influxdb/pkg/tracing" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/tsdb" "go.uber.org/zap" ) func newLimitIterator(input query.Iterator, opt query.IteratorOptions) query.Iterator { switch input := input.(type) { case query.FloatIterator: return newFloatLimitIterator(input, opt) case query.IntegerIterator: return newIntegerLimitIterator(input, opt) case query.UnsignedIterator: return newUnsignedLimitIterator(input, opt) case query.StringIterator: return newStringLimitIterator(input, opt) case query.BooleanIterator: return newBooleanLimitIterator(input, opt) default: panic(fmt.Sprintf("unsupported limit iterator type: %T", input)) } } type floatCastIntegerCursor struct { cursor integerCursor } func (c *floatCastIntegerCursor) close() error { return c.cursor.close() } func (c *floatCastIntegerCursor) next() (t int64, v interface{}) { return c.nextFloat() } func (c *floatCastIntegerCursor) nextFloat() (int64, float64) { t, v := c.cursor.nextInteger() return t, float64(v) } type floatCastUnsignedCursor struct { cursor unsignedCursor } func (c *floatCastUnsignedCursor) close() error { return c.cursor.close() } func (c *floatCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextFloat() } func (c *floatCastUnsignedCursor) nextFloat() (int64, float64) { t, v := c.cursor.nextUnsigned() return t, float64(v) } type integerCastFloatCursor struct { cursor floatCursor } func (c *integerCastFloatCursor) close() error { return c.cursor.close() } func (c *integerCastFloatCursor) next() (t int64, v interface{}) { return c.nextInteger() } func (c *integerCastFloatCursor) nextInteger() (int64, int64) { t, v := c.cursor.nextFloat() return t, int64(v) } type integerCastUnsignedCursor struct { cursor unsignedCursor } func (c *integerCastUnsignedCursor) close() error { return c.cursor.close() } func (c *integerCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextInteger() } func (c *integerCastUnsignedCursor) nextInteger() (int64, int64) { t, v := c.cursor.nextUnsigned() return t, int64(v) } type unsignedCastFloatCursor struct { cursor floatCursor } func (c *unsignedCastFloatCursor) close() error { return c.cursor.close() } func (c *unsignedCastFloatCursor) next() (t int64, v interface{}) { return c.nextUnsigned() } func (c *unsignedCastFloatCursor) nextUnsigned() (int64, uint64) { t, v := c.cursor.nextFloat() return t, uint64(v) } type unsignedCastIntegerCursor struct { cursor integerCursor } func (c *unsignedCastIntegerCursor) close() error { return c.cursor.close() } func (c *unsignedCastIntegerCursor) next() (t int64, v interface{}) { return c.nextUnsigned() } func (c *unsignedCastIntegerCursor) nextUnsigned() (int64, uint64) { t, v := c.cursor.nextInteger() return t, uint64(v) } // literalValueCursor represents a cursor that always returns a single value. // It doesn't not have a time value so it can only be used with nextAt(). type literalValueCursor struct { value interface{} } func (c *literalValueCursor) close() error { return nil } func (c *literalValueCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *literalValueCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *literalValueCursor) nextAt(seek int64) interface{} { return c.value } // preallocate and cast to cursorAt to avoid allocations var ( nilFloatLiteralValueCursor cursorAt = &literalValueCursor{value: (*float64)(nil)} nilIntegerLiteralValueCursor cursorAt = &literalValueCursor{value: (*int64)(nil)} nilUnsignedLiteralValueCursor cursorAt = &literalValueCursor{value: (*uint64)(nil)} nilStringLiteralValueCursor cursorAt = &literalValueCursor{value: (*string)(nil)} nilBooleanLiteralValueCursor cursorAt = &literalValueCursor{value: (*bool)(nil)} ) // stringSliceCursor is a cursor that outputs a slice of string values. type stringSliceCursor struct { values []string } func (c *stringSliceCursor) close() error { return nil } func (c *stringSliceCursor) next() (int64, interface{}) { return c.nextString() } func (c *stringSliceCursor) nextString() (int64, string) { if len(c.values) == 0 { return tsdb.EOF, "" } value := c.values[0] c.values = c.values[1:] return 0, value } type cursorsAt []cursorAt func (c cursorsAt) close() { for _, cur := range c { cur.close() } } // newMergeFinalizerIterator creates a new Merge iterator from the inputs. If the call to Merge succeeds, // the resulting Iterator will be wrapped in a finalizer iterator. // If Merge returns an error, the inputs will be closed. func newMergeFinalizerIterator(ctx context.Context, inputs []query.Iterator, opt query.IteratorOptions, log *zap.Logger) (query.Iterator, error) { itr, err := query.Iterators(inputs).Merge(opt) if err != nil { query.Iterators(inputs).Close() return nil, err } return newInstrumentedIterator(ctx, newFinalizerIterator(itr, log)), nil } // newFinalizerIterator creates a new iterator that installs a runtime finalizer // to ensure close is eventually called if the iterator is garbage collected. // This additional guard attempts to protect against clients of CreateIterator not // correctly closing them and leaking cursors. func newFinalizerIterator(itr query.Iterator, log *zap.Logger) query.Iterator { if itr == nil { return nil } switch inner := itr.(type) { case query.FloatIterator: return newFloatFinalizerIterator(inner, log) case query.IntegerIterator: return newIntegerFinalizerIterator(inner, log) case query.UnsignedIterator: return newUnsignedFinalizerIterator(inner, log) case query.StringIterator: return newStringFinalizerIterator(inner, log) case query.BooleanIterator: return newBooleanFinalizerIterator(inner, log) default: panic(fmt.Sprintf("unsupported finalizer iterator type: %T", itr)) } } func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iterator { if itr == nil { return nil } span := tracing.SpanFromContext(ctx) grp := metrics.GroupFromContext(ctx) if span == nil || grp == nil { return itr } switch inner := itr.(type) { case query.FloatIterator: return newFloatInstrumentedIterator(inner, span, grp) case query.IntegerIterator: return newIntegerInstrumentedIterator(inner, span, grp) case query.UnsignedIterator: return newUnsignedInstrumentedIterator(inner, span, grp) case query.StringIterator: return newStringInstrumentedIterator(inner, span, grp) case query.BooleanIterator: return newBooleanInstrumentedIterator(inner, span, grp) default: panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr)) } } type seriesIterator struct { cur tsdb.SeriesKeyIterator point query.StringPoint // reusable buffer statsLock sync.Mutex stats query.IteratorStats statsBuf query.IteratorStats } func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) (*seriesIterator, error) { if cur == nil { return nil, fmt.Errorf("internal error: cannot create series iterator from nil iterator") } itr := &seriesIterator{ cur: cur, point: query.StringPoint{ Name: name, Tags: query.NewTags(nil), }, } itr.stats = itr.statsBuf return itr, nil } // Next returns the next point from the iterator. func (itr *seriesIterator) Next() (*query.StringPoint, error) { for { // Read from the main cursor b, err := itr.cur.Next() if err != nil { itr.copyStats() return nil, err } itr.point.Value = string(b) // Exit if we have no more points or we are outside our time range. if b == nil { itr.copyStats() return nil, nil } // Track points returned. itr.statsBuf.PointN++ itr.statsBuf.SeriesN++ // Copy buffer to stats periodically. if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { itr.copyStats() } return &itr.point, nil } } // copyStats copies from the itr stats buffer to the stats under lock. func (itr *seriesIterator) copyStats() { itr.statsLock.Lock() itr.stats = itr.statsBuf itr.statsLock.Unlock() } // Stats returns stats on the points processed. func (itr *seriesIterator) Stats() query.IteratorStats { itr.statsLock.Lock() stats := itr.stats itr.statsLock.Unlock() return stats } // Close closes the iterator. func (itr *seriesIterator) Close() error { return itr.cur.Close() }