feat(tsi): optimize series iteration (#22316)
When using queries like `select count(_seriesKey) from bigmeasurement`, we should iterate over the TSI structures to serve the query instead of loading all the series into memory up front.

Co-authored-by: Sam Arnold <sarnold@influxdata.com>

pull/22331/head
parent
309a05a1d3
commit
df448c654b
|
@ -73,6 +73,7 @@ Because of the version bump to `go`, the macOS build for this release requires a
|
|||
1. [21910](https://github.com/influxdata/influxdb/pull/21910): Added `--ui-disabled` option to `influxd` to allow for running with the UI disabled.
|
||||
1. [21958](https://github.com/influxdata/influxdb/pull/21958): Telemetry improvements: Do not record telemetry data for non-existent paths; replace invalid static asset paths with a slug.
|
||||
1. [22023](https://github.com/influxdata/influxdb/pull/22023): Upgrade Flux to v0.124.0.
|
||||
1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data.
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
|
|
|
@ -2390,6 +2390,40 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
|
|||
return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
|
||||
}
|
||||
|
||||
// createSeriesIterator creates an optimized series iterator if possible.
|
||||
// We exclude less-common cases for now as not worth implementing.
|
||||
func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef, is tsdb.IndexSet, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
// Main check to see if we are trying to create a seriesKey iterator
|
||||
if ref == nil || ref.Val != "_seriesKey" || len(opt.Aux) != 0 {
|
||||
return nil, nil
|
||||
}
|
||||
// Check some other cases that we could maybe handle, but don't
|
||||
if len(opt.Dimensions) > 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if opt.SLimit != 0 || opt.SOffset != 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if opt.StripName {
|
||||
return nil, nil
|
||||
}
|
||||
if opt.Ordered {
|
||||
return nil, nil
|
||||
}
|
||||
// Actual creation of the iterator
|
||||
seriesCursor, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), opt.Condition, opt.Authorizer)
|
||||
if err != nil {
|
||||
seriesCursor.Close()
|
||||
return nil, err
|
||||
}
|
||||
var seriesIterator query.Iterator
|
||||
seriesIterator = newSeriesIterator(measurement, seriesCursor)
|
||||
if opt.InterruptCh != nil {
|
||||
seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
|
||||
}
|
||||
return seriesIterator, nil
|
||||
}
|
||||
|
||||
func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
|
||||
ref, _ := call.Args[0].(*influxql.VarRef)
|
||||
|
||||
|
@ -2399,6 +2433,28 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// check for optimized series iteration for tsi index
|
||||
if e.index.Type() == tsdb.TSI1IndexName {
|
||||
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
||||
seriesOpt := opt
|
||||
if len(opt.Dimensions) == 0 && call.Name == "count" {
|
||||
// no point ordering the series if we are just counting all of them
|
||||
seriesOpt.Ordered = false
|
||||
}
|
||||
seriesIterator, err := e.createSeriesIterator(measurement, ref, indexSet, seriesOpt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if seriesIterator != nil {
|
||||
callIterator, err := query.NewCallIterator(seriesIterator, opt)
|
||||
if err != nil {
|
||||
seriesIterator.Close()
|
||||
return nil, err
|
||||
}
|
||||
return []query.Iterator{callIterator}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Determine tagsets for this measurement based on dimensions and filters.
|
||||
var (
|
||||
tagSets []*query.TagSet
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"github.com/influxdata/influxql"
|
||||
tassert "github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -2023,6 +2024,80 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure engine can create an descending iterator for cached values.
|
||||
func TestEngine_CreateIterator_SeriesKey(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) {
|
||||
assert := tassert.New(t)
|
||||
e := MustOpenEngine(t, index)
|
||||
defer e.Close()
|
||||
|
||||
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
|
||||
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}))
|
||||
e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}))
|
||||
e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}))
|
||||
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=west"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "west"}))
|
||||
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A,region=east value=1.1 1000000001`,
|
||||
`cpu,host=B,region=east value=1.2 1000000002`,
|
||||
`cpu,host=A,region=east value=1.3 1000000003`,
|
||||
`cpu,host=C,region=east value=1.4 1000000004`,
|
||||
`cpu,host=A,region=west value=1.5 1000000005`,
|
||||
); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
opts := query.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`_seriesKey`),
|
||||
Dimensions: []string{},
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
Condition: influxql.MustParseExpr(`host = 'A'`),
|
||||
}
|
||||
|
||||
itr, err := e.CreateIterator(context.Background(), "cpu", opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
stringItr, ok := itr.(query.StringIterator)
|
||||
assert.True(ok, "series iterator must be of type string")
|
||||
expectedSeries := map[string]struct{}{
|
||||
"cpu,host=A,region=west": struct{}{},
|
||||
"cpu,host=A,region=east": struct{}{},
|
||||
}
|
||||
var str *query.StringPoint
|
||||
for str, err = stringItr.Next(); err == nil && str != (*query.StringPoint)(nil); str, err = stringItr.Next() {
|
||||
_, ok := expectedSeries[str.Value]
|
||||
assert.True(ok, "Saw bad key "+str.Value)
|
||||
delete(expectedSeries, str.Value)
|
||||
}
|
||||
assert.NoError(err)
|
||||
assert.NoError(itr.Close())
|
||||
|
||||
countOpts := opts
|
||||
countOpts.Expr = influxql.MustParseExpr(`count(_seriesKey)`)
|
||||
itr, err = e.CreateIterator(context.Background(), "cpu", countOpts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
integerIter, ok := itr.(query.IntegerIterator)
|
||||
assert.True(ok, "series count iterator must be of type integer")
|
||||
i, err := integerIter.Next()
|
||||
assert.NoError(err)
|
||||
assert.Equal(int64(2), i.Value, "must count 2 series with host=A")
|
||||
i, err = integerIter.Next()
|
||||
assert.NoError(err)
|
||||
assert.Equal((*query.IntegerPoint)(nil), i, "count iterator has only one output")
|
||||
assert.NoError(itr.Close())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func makeBlockTypeSlice(n int) []byte {
|
||||
r := make([]byte, n)
|
||||
b := tsm1.BlockFloat64
|
||||
|
|
|
@ -3,6 +3,7 @@ package tsm1
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||
"github.com/influxdata/influxdb/v2/pkg/metrics"
|
||||
|
@ -216,3 +217,71 @@ func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iter
|
|||
panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr))
|
||||
}
|
||||
}
|
||||
|
||||
// seriesIterator adapts a tsdb.SeriesKeyIterator to the query iterator
// interface, emitting each series key as a string point.
type seriesIterator struct {
	cur   tsdb.SeriesKeyIterator
	point query.StringPoint // reusable buffer

	statsLock sync.Mutex
	stats     query.IteratorStats // published snapshot, read under statsLock
	statsBuf  query.IteratorStats // accumulated locally; copied to stats periodically
}
|
||||
|
||||
func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) *seriesIterator {
|
||||
itr := &seriesIterator{
|
||||
cur: cur,
|
||||
point: query.StringPoint{
|
||||
Name: name,
|
||||
Tags: query.NewTags(nil),
|
||||
},
|
||||
}
|
||||
itr.stats = itr.statsBuf
|
||||
return itr
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *seriesIterator) Next() (*query.StringPoint, error) {
|
||||
// Read from the main cursor
|
||||
b, err := itr.cur.Next()
|
||||
if err != nil {
|
||||
itr.copyStats()
|
||||
return nil, err
|
||||
}
|
||||
itr.point.Value = string(b)
|
||||
|
||||
// Exit if we have no more points or we are outside our time range.
|
||||
if b == nil {
|
||||
itr.copyStats()
|
||||
return nil, nil
|
||||
}
|
||||
// Track points returned.
|
||||
itr.statsBuf.PointN++
|
||||
itr.statsBuf.SeriesN++
|
||||
|
||||
// Copy buffer to stats periodically.
|
||||
if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 {
|
||||
itr.copyStats()
|
||||
}
|
||||
|
||||
return &itr.point, nil
|
||||
}
|
||||
|
||||
// copyStats copies from the itr stats buffer to the stats under lock.
|
||||
func (itr *seriesIterator) copyStats() {
|
||||
itr.statsLock.Lock()
|
||||
itr.stats = itr.statsBuf
|
||||
itr.statsLock.Unlock()
|
||||
}
|
||||
|
||||
// Stats returns stats on the points processed.
|
||||
func (itr *seriesIterator) Stats() query.IteratorStats {
|
||||
itr.statsLock.Lock()
|
||||
stats := itr.stats
|
||||
itr.statsLock.Unlock()
|
||||
return stats
|
||||
}
|
||||
|
||||
// Close closes the iterator by closing the underlying series cursor.
func (itr *seriesIterator) Close() error {
	return itr.cur.Close()
}
|
||||
|
|
|
@ -211,6 +211,12 @@ type SeriesIDIterator interface {
|
|||
Close() error
|
||||
}
|
||||
|
||||
// SeriesKeyIterator represents an iterator over a list of SeriesKeys.
type SeriesKeyIterator interface {
	// Next returns the next series key, or nil when iteration is complete.
	Next() ([]byte, error)
	// Close releases any resources held by the iterator.
	Close() error
}
|
||||
|
||||
// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
|
||||
type SeriesIDSetIterator interface {
|
||||
SeriesIDIterator
|
||||
|
@ -2295,6 +2301,93 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex
|
|||
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
|
||||
}
|
||||
|
||||
// measurementSeriesKeyByExprIterator lazily resolves matching series IDs to
// full series keys, applying leftover filters and authorization on the way.
type measurementSeriesKeyByExprIterator struct {
	ids      SeriesIDIterator // underlying iterator of matching series IDs
	is       IndexSet         // resolves IDs back to series keys
	auth     query.Authorizer // optional read authorizer; may be nil
	once     sync.Once        // guards releaser so Close is idempotent
	releaser func()           // releases the series-file reference taken at creation
}
|
||||
|
||||
// Next returns the next matching series key, or nil when iteration is done.
// It skips series that have been deleted, series rejected by leftover
// (non-tag) filter expressions, and series the authorizer denies.
// Safe to call on a nil receiver.
func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
	if itr == nil {
		return nil, nil
	}
	for {
		e, err := itr.ids.Next()
		if err != nil {
			return nil, err
		} else if e.SeriesID == 0 {
			// A zero series ID marks the end of the underlying iterator.
			return nil, nil
		}

		seriesKey := itr.is.SeriesFile.SeriesKey(e.SeriesID)
		if len(seriesKey) == 0 {
			// Key no longer resolvable (e.g. deleted); skip it.
			continue
		}

		name, tags := ParseSeriesKey(seriesKey)

		// Check leftover filters. All fields that might be filtered default to zero values
		if e.Expr != nil {
			if v, ok := e.Expr.(*influxql.BooleanLiteral); ok {
				// A literal false filters everything out; literal true passes.
				if !v.Val {
					continue
				}
			} else {
				// Evaluate the residual expression against the tag values only.
				values := make(map[string]interface{}, len(tags))
				for _, t := range tags {
					values[string(t.Key)] = string(t.Value)
				}
				if !influxql.EvalBool(e.Expr, values) {
					continue
				}
			}
		}

		// Enforce per-series read authorization when an authorizer is set.
		if itr.auth != nil && !itr.auth.AuthorizeSeriesRead(itr.is.Database(), name, tags) {
			continue
		}

		out := models.MakeKey(name, tags)
		// ensure nil is only returned when we are done (or for errors)
		if out == nil {
			out = []byte{}
		}
		return out, nil
	}
}
|
||||
|
||||
func (itr *measurementSeriesKeyByExprIterator) Close() error {
|
||||
if itr == nil {
|
||||
return nil
|
||||
}
|
||||
itr.once.Do(itr.releaser)
|
||||
return itr.ids.Close()
|
||||
}
|
||||
|
||||
// MeasurementSeriesKeyByExprIterator iterates through series, filtered by an expression on the tags.
|
||||
// Any non-tag expressions will be filtered as if the field had the zero value.
|
||||
func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.Authorizer) (SeriesKeyIterator, error) {
|
||||
release := is.SeriesFile.Retain()
|
||||
// Create iterator for all matching series.
|
||||
ids, err := is.measurementSeriesByExprIterator(name, expr)
|
||||
if err != nil {
|
||||
release()
|
||||
return nil, err
|
||||
}
|
||||
if ids == nil {
|
||||
release()
|
||||
return nil, nil
|
||||
}
|
||||
return &measurementSeriesKeyByExprIterator{
|
||||
ids: ids,
|
||||
releaser: release,
|
||||
auth: auth,
|
||||
is: is,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
|
||||
func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
|
||||
release := is.SeriesFile.Retain()
|
||||
|
|
|
@ -266,7 +266,7 @@ func (f *SeriesFile) SeriesCount() uint64 {
|
|||
return n
|
||||
}
|
||||
|
||||
// SeriesIDIterator returns an iterator over all the series.
|
||||
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
|
||||
var ids []uint64
|
||||
for _, p := range f.partitions {
|
||||
|
|
Loading…
Reference in New Issue