From e7940cc556a03731c500e91eab9347dfb99ad286 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 28 Nov 2016 09:59:36 -0700 Subject: [PATCH] Add tsi1 series system iterator. --- tsdb/engine.go | 3 + tsdb/engine/tsm1/engine.go | 61 +++--------- tsdb/index.go | 8 +- tsdb/index/inmem/inmem.go | 96 ++++++++++++++++++- tsdb/index/tsi1/index.go | 108 ++++++++++++++++++++- tsdb/index/tsi1/log_file.go | 25 ++++- tsdb/index/tsi1/tsi1.go | 82 ++++++++-------- tsdb/shard.go | 181 ++++++++++++++---------------------- 8 files changed, 362 insertions(+), 202 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index 4f5cb97f32..ad135ea87a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -55,6 +55,9 @@ type Engine interface { MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) MeasurementFields(measurement string) *MeasurementFields + // InfluxQL system iterators + SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) + // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic LastModified() time.Time diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 322a8e4f58..104503b49d 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -101,9 +101,8 @@ type Engine struct { traceLogger zap.Logger // Logger to be used when trace-logging is on. traceLogging bool - index tsdb.Index - fieldsMu sync.RWMutex - measurementFields map[string]*tsdb.MeasurementFields + index tsdb.Index + fieldset *tsdb.MeasurementFieldSet WAL *WAL Cache *Cache @@ -149,7 +148,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb. logOutput: os.Stderr, traceLogging: opt.Config.TraceLoggingEnabled, - measurementFields: make(map[string]*tsdb.MeasurementFields), + fieldset: tsdb.NewMeasurementFieldSet(), WAL: w, Cache: cache, @@ -167,6 +166,9 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb. stats: &EngineStatistics{}, } + // Attach fieldset to index. + e.index.SetFieldSet(e.fieldset) + if e.traceLogging { fs.enableTraceLogging(true) w.enableTraceLogging(true) @@ -302,22 +304,7 @@ func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { // MeasurementFields returns the measurement fields for a measurement. func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields { - e.fieldsMu.RLock() - m := e.measurementFields[measurement] - e.fieldsMu.RUnlock() - - if m != nil { - return m - } - - e.fieldsMu.Lock() - m = e.measurementFields[measurement] - if m == nil { - m = tsdb.NewMeasurementFields() - e.measurementFields[measurement] = m - } - e.fieldsMu.Unlock() - return m + return e.fieldset.CreateFieldsIfNotExists(measurement) } func (e *Engine) SeriesN() (uint64, error) { @@ -655,14 +642,7 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType, inde seriesKey, field := SeriesAndFieldFromCompositeKey(key) name := tsdb.MeasurementFromSeriesKey(string(seriesKey)) - e.fieldsMu.Lock() - mf := e.measurementFields[name] - if mf == nil { - mf = tsdb.NewMeasurementFields() - e.measurementFields[name] = mf - } - e.fieldsMu.Unlock() - + mf := e.fieldset.CreateFieldsIfNotExists(name) if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { return err } @@ -848,9 +828,7 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { // DeleteMeasurement deletes a measurement and all related series. func (e *Engine) DeleteMeasurement(name []byte) error { - e.mu.Lock() - delete(e.measurementFields, string(name)) - e.mu.Unlock() + e.fieldset.Delete(string(name)) // Attempt to find the series keys. m, err := e.Measurement(name) @@ -1335,18 +1313,8 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo var itrs []influxql.Iterator if err := func() error { for _, name := range influxql.Sources(opt.Sources).Names() { - // Retrieve measurement fields. - e.mu.Lock() - mf := e.measurementFields[name] - e.mu.Unlock() - - // Skip if there are no fields. - if mf == nil { - continue - } - // Generate tag sets from index. - tagSets, err := e.index.TagSets([]byte(name), opt.Dimensions, opt.Condition, mf) + tagSets, err := e.index.TagSets([]byte(name), opt.Dimensions, opt.Condition) if err != nil { return err } @@ -1594,10 +1562,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, s // buildCursor creates an untyped cursor for a field. func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef, opt influxql.IteratorOptions) cursor { // Look up fields for measurement. - e.fieldsMu.RLock() - mf := e.measurementFields[measurement] - e.fieldsMu.RUnlock() - + mf := e.fieldset.Fields(measurement) if mf == nil { return nil } @@ -1671,6 +1636,10 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt in return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor) } +func (e *Engine) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return e.index.SeriesPointIterator(opt) +} + // SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID. func SeriesFieldKey(seriesKey, field string) string { return seriesKey + keyFieldSeparator + field diff --git a/tsdb/index.go b/tsdb/index.go index ffeaf73669..f101a5264e 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -31,7 +31,13 @@ type Index interface { Dereference(b []byte) - TagSets(name []byte, dimensions []string, condition influxql.Expr, mf *MeasurementFields) ([]*influxql.TagSet, error) + TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) + + // InfluxQL system iterators + SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) + + // Sets a shared fieldset from the engine. + SetFieldSet(fs *MeasurementFieldSet) // To be removed w/ tsi1. SetFieldName(measurement, name string) diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 4ace46e105..099afb6fa8 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -493,7 +493,7 @@ func (i *Index) Dereference(b []byte) { } // TagSets returns a list of tag sets. -func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr, mf *tsdb.MeasurementFields) ([]*influxql.TagSet, error) { +func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { i.mu.RLock() defer i.mu.RUnlock() @@ -520,8 +520,102 @@ func (i *Index) SeriesKeys() []string { return s } +// SetFieldSet sets a shared field set from the engine. +func (i *Index) SetFieldSet(*tsdb.MeasurementFieldSet) {} + // SetFieldName adds a field name to a measurement. func (i *Index) SetFieldName(measurement, name string) { m := i.CreateMeasurementIndexIfNotExists(measurement) m.SetFieldName(name) } + +// SeriesPointIterator returns an influxql iterator over all series. +func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + // Read and sort all measurements. + mms := make(tsdb.Measurements, 0, len(i.measurements)) + for _, mm := range i.measurements { + mms = append(mms, mm) + } + sort.Sort(mms) + + return &seriesPointIterator{ + mms: mms, + point: influxql.FloatPoint{ + Aux: make([]interface{}, len(opt.Aux)), + }, + opt: opt, + }, nil +} + +// seriesPointIterator emits series as influxql points. +type seriesPointIterator struct { + mms tsdb.Measurements + keys struct { + buf []string + i int + } + + point influxql.FloatPoint // reusable point + opt influxql.IteratorOptions +} + +// Stats returns stats about the points processed. +func (itr *seriesPointIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} } + +// Close closes the iterator. +func (itr *seriesPointIterator) Close() error { return nil } + +// Next emits the next point in the iterator. +func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) { + for { + // Load next measurement's keys if there are no more remaining. + if itr.keys.i >= len(itr.keys.buf) { + if err := itr.nextKeys(); err != nil { + return nil, err + } + if len(itr.keys.buf) == 0 { + return nil, nil + } + } + + // Read the next key. + key := itr.keys.buf[itr.keys.i] + itr.keys.i++ + + // Write auxiliary fields. + for i, f := range itr.opt.Aux { + switch f.Val { + case "key": + itr.point.Aux[i] = key + } + } + return &itr.point, nil + } +} + +// nextKeys reads all keys for the next measurement. +func (itr *seriesPointIterator) nextKeys() error { + for { + // Ensure previous keys are cleared out. + itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0] + + // Read next measurement. + if len(itr.mms) == 0 { + return nil + } + mm := itr.mms[0] + itr.mms = itr.mms[1:] + + // Read all series keys. + ids, err := mm.SeriesIDsAllOrByExpr(itr.opt.Condition) + if err != nil { + return err + } else if len(ids) == 0 { + continue + } + itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids) + sort.Strings(itr.keys.buf) + + return nil + } +} diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index e70c16e0cd..d656e2c37e 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -37,6 +37,10 @@ type Index struct { mu sync.RWMutex logFiles []*LogFile indexFiles IndexFiles + + // Fieldset shared with engine. + // TODO: Move field management into index. + fieldset *tsdb.MeasurementFieldSet } // Open opens the index. @@ -124,6 +128,13 @@ func (i *Index) Close() error { return nil } +// SetFieldSet sets a shared field set from the engine. +func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { + i.mu.Lock() + i.fieldset = fs + i.mu.Unlock() +} + // SetLogFiles explicitly sets log files. // TEMPORARY: For testing only. func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a } @@ -149,6 +160,19 @@ func (i *Index) files() []File { return a } +// SeriesIterator returns an iterator over all series in the index. +func (i *Index) SeriesIterator() SeriesIterator { + a := make([]SeriesIterator, 0, i.FileN()) + for _, f := range i.files() { + itr := f.SeriesIterator() + if itr == nil { + continue + } + a = append(a, itr) + } + return MergeSeriesIterators(a...) +} + // Measurement retrieves a measurement by name. func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) { return i.measurement(name), nil @@ -587,8 +611,8 @@ func (i *Index) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, va // TagSets returns an ordered list of tag sets for a measurement by dimension // and filtered by an optional conditional expression. -func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr, mf *tsdb.MeasurementFields) ([]*influxql.TagSet, error) { - itr, err := i.MeasurementSeriesByExprIterator(name, condition, mf) +func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { + itr, err := i.MeasurementSeriesByExprIterator(name, condition) if err != nil { return nil, err } else if itr == nil { @@ -646,12 +670,12 @@ func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Exp // MeasurementSeriesByExprIterator returns a series iterator for a measurement // that is filtered by expr. If expr only contains time expressions then this // call is equivalent to MeasurementSeriesIterator(). -func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { +func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) { // Return all series for the measurement if there are no tag expressions. if expr == nil || influxql.OnlyTimeExpr(expr) { return i.MeasurementSeriesIterator(name), nil } - return i.seriesByExprIterator(name, expr, mf) + return i.seriesByExprIterator(name, expr, i.fieldset.CreateFieldsIfNotExists(string(name))) } func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { @@ -808,6 +832,11 @@ func (i *Index) RemoveShard(shardID uint64) {} func (i *Index) AssignShard(k string, shardID uint64) {} func (i *Index) UnassignShard(k string, shardID uint64) {} +// SeriesPointIterator returns an influxql iterator over all series. +func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return newSeriesPointIterator(i, opt), nil +} + // File represents a log or index file. type File interface { Measurement(name []byte) MeasurementElem @@ -815,6 +844,8 @@ type File interface { TagValueIterator(name, key []byte) (itr TagValueIterator, deleted bool) + SeriesIterator() SeriesIterator + MeasurementSeriesIterator(name []byte) SeriesIterator TagKeySeriesIterator(name, key []byte) (itr SeriesIterator, deleted bool) TagValueSeriesIterator(name, key, value []byte) (itr SeriesIterator, deleted bool) } @@ -838,3 +869,72 @@ func (fe FilterExprs) Len() int { } return len(fe) } + +// seriesPointIterator adapts SeriesIterator to an influxql.Iterator. +type seriesPointIterator struct { + index *Index + mitr MeasurementIterator + sitr SeriesIterator + opt influxql.IteratorOptions + + point influxql.FloatPoint // reusable point +} + +// newSeriesPointIterator returns a new instance of seriesPointIterator. +func newSeriesPointIterator(index *Index, opt influxql.IteratorOptions) *seriesPointIterator { + return &seriesPointIterator{ + index: index, + mitr: index.MeasurementIterator(), + point: influxql.FloatPoint{ + Aux: make([]interface{}, len(opt.Aux)), + }, + opt: opt, + } +} + +// Stats returns stats about the points processed. +func (itr *seriesPointIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} } + +// Close closes the iterator. +func (itr *seriesPointIterator) Close() error { return nil } + +// Next emits the next point in the iterator. +func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) { + for { + // Create new series iterator, if necessary. + // Exit if there are no measurements remaining. + if itr.sitr == nil { + m := itr.mitr.Next() + if m == nil { + return nil, nil + } + + sitr, err := itr.index.MeasurementSeriesByExprIterator(m.Name(), itr.opt.Condition) + if err != nil { + return nil, err + } else if sitr == nil { + continue + } + itr.sitr = sitr + } + + // Read next series element. + e := itr.sitr.Next() + if e == nil { + itr.sitr = nil + continue + } + + // Convert to a key. + key := string(models.MakeKey(e.Name(), e.Tags())) + + // Write auxiliary fields. + for i, f := range itr.opt.Aux { + switch f.Val { + case "key": + itr.point.Aux[i] = key + } + } + return &itr.point, nil + } +} diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 33dad95f34..07ec6a5dab 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -429,6 +429,29 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { f.mms[string(e.Name)] = mm } +// SeriesIterator returns an iterator over all series in the log file. +func (f *LogFile) SeriesIterator() SeriesIterator { + f.mu.RLock() + defer f.mu.RUnlock() + + // Sort measurement names determine total series count. + var n int + names := make([][]byte, 0, len(f.mms)) + for _, mm := range f.mms { + names = append(names, mm.name) + n += len(mm.series) + } + sort.Sort(byteSlices(names)) + + // Combine series across all measurements. + series := make(logSeries, 0, n) + for _, name := range names { + series = append(series, f.mms[string(name)].series...) + } + + return newLogSeriesIterator(series) +} + // measurement returns a measurement by name. func (f *LogFile) measurement(name []byte) logMeasurement { mm, ok := f.mms[string(name)] @@ -451,7 +474,7 @@ func (f *LogFile) MeasurementIterator() MeasurementIterator { return &itr } -// MeasurementSeriesIterator returns an iterator over all series in the log file. +// MeasurementSeriesIterator returns an iterator over all series for a measurement. func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator { f.mu.RLock() defer f.mu.RUnlock() diff --git a/tsdb/index/tsi1/tsi1.go b/tsdb/index/tsi1/tsi1.go index cc2d5d9051..304c56aaae 100644 --- a/tsdb/index/tsi1/tsi1.go +++ b/tsdb/index/tsi1/tsi1.go @@ -473,48 +473,50 @@ type seriesIntersectIterator struct { // Next returns the next element which occurs in both iterators. func (itr *seriesIntersectIterator) Next() (e SeriesElem) { - // Fill buffers. - if itr.buf[0] == nil { - itr.buf[0] = itr.itrs[0].Next() - } - if itr.buf[1] == nil { - itr.buf[1] = itr.itrs[1].Next() - } - - // Exit if either buffer is still empty. - if itr.buf[0] == nil || itr.buf[1] == nil { - return nil - } - - // Return lesser series. - if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 { - e, itr.buf[0] = itr.buf[0], nil - return e - } else if cmp == 1 { - e, itr.buf[1] = itr.buf[1], nil - return e - } - - // Merge series together if equal. - itr.e.SeriesElem = itr.buf[0] - - // Attach expression. - expr0 := itr.buf[0].Expr() - expr1 := itr.buf[0].Expr() - if expr0 == nil { - itr.e.expr = expr1 - } else if expr1 == nil { - itr.e.expr = expr0 - } else { - itr.e.expr = &influxql.BinaryExpr{ - Op: influxql.AND, - LHS: expr0, - RHS: expr1, + for { + // Fill buffers. + if itr.buf[0] == nil { + itr.buf[0] = itr.itrs[0].Next() + } + if itr.buf[1] == nil { + itr.buf[1] = itr.itrs[1].Next() } - } - itr.buf[0], itr.buf[1] = nil, nil - return &itr.e + // Exit if either buffer is still empty. + if itr.buf[0] == nil || itr.buf[1] == nil { + return nil + } + + // Skip if both series are not equal. + if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 { + itr.buf[0] = nil + continue + } else if cmp == 1 { + itr.buf[1] = nil + continue + } + + // Merge series together if equal. + itr.e.SeriesElem = itr.buf[0] + + // Attach expression. + expr0 := itr.buf[0].Expr() + expr1 := itr.buf[0].Expr() + if expr0 == nil { + itr.e.expr = expr1 + } else if expr1 == nil { + itr.e.expr = expr0 + } else { + itr.e.expr = &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: expr0, + RHS: expr1, + } + } + + itr.buf[0], itr.buf[1] = nil, nil + return &itr.e + } } // UnionSeriesIterators returns an iterator that returns series from both diff --git a/tsdb/shard.go b/tsdb/shard.go index c19a5f534d..3e9b08514b 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -705,7 +705,7 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite case "_fieldKeys": return NewFieldKeysIterator(s, opt) case "_series": - return NewSeriesIterator(s, opt) + return s.createSeriesIterator(opt) case "_tagKeys": return NewTagKeysIterator(s, opt) default: @@ -713,6 +713,28 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite } } +// createSeriesIterator returns a new instance of SeriesIterator. +func (s *Shard) createSeriesIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + // Only equality operators are allowed. + var err error + influxql.WalkFunc(opt.Condition, func(n influxql.Node) { + switch n := n.(type) { + case *influxql.BinaryExpr: + switch n.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, + influxql.OR, influxql.AND: + default: + err = errors.New("invalid tag comparison operator") + } + } + }) + if err != nil { + return nil, err + } + + return s.engine.SeriesPointIterator(opt) +} + // FieldDimensions returns unique sets of fields and dimensions across a list of sources. func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { if err := s.ready(); err != nil { @@ -1033,6 +1055,55 @@ func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { return fields } +// MeasurementFieldSet represents a collection of fields by measurement. +// This safe for concurrent use. +type MeasurementFieldSet struct { + mu sync.RWMutex + fields map[string]*MeasurementFields +} + +// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet. +func NewMeasurementFieldSet() *MeasurementFieldSet { + return &MeasurementFieldSet{ + fields: make(map[string]*MeasurementFields), + } +} + +// Fields returns fields for a measurement by name. +func (fs *MeasurementFieldSet) Fields(name string) *MeasurementFields { + fs.mu.RLock() + mf := fs.fields[name] + fs.mu.RUnlock() + return mf +} + +// CreateFieldsIfNotExists returns fields for a measurement by name. +func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name string) *MeasurementFields { + fs.mu.RLock() + mf := fs.fields[name] + fs.mu.RUnlock() + + if mf != nil { + return mf + } + + fs.mu.Lock() + mf = fs.fields[name] + if mf == nil { + mf = NewMeasurementFields() + fs.fields[name] = mf + } + fs.mu.Unlock() + return mf +} + +// Delete removes a field set for a measurement. +func (fs *MeasurementFieldSet) Delete(name string) { + fs.mu.Lock() + delete(fs.fields, name) + fs.mu.Unlock() +} + // Field represents a series field. type Field struct { ID uint8 `json:"id,omitempty"` @@ -1159,114 +1230,6 @@ func (itr *fieldKeysIterator) Next() (*influxql.FloatPoint, error) { } } -// seriesIterator emits series ids. -type seriesIterator struct { - mms Measurements - keys struct { - buf []string - i int - } - - point influxql.FloatPoint // reusable point - opt influxql.IteratorOptions -} - -// NewSeriesIterator returns a new instance of SeriesIterator. -func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { - // Only equality operators are allowed. - var err error - influxql.WalkFunc(opt.Condition, func(n influxql.Node) { - switch n := n.(type) { - case *influxql.BinaryExpr: - switch n.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, - influxql.OR, influxql.AND: - default: - err = errors.New("invalid tag comparison operator") - } - } - }) - if err != nil { - return nil, err - } - - // Read and sort all measurements. - mms, err := sh.engine.Measurements() - if err != nil { - return nil, err - } - sort.Sort(mms) - - return &seriesIterator{ - mms: mms, - point: influxql.FloatPoint{ - Aux: make([]interface{}, len(opt.Aux)), - }, - opt: opt, - }, nil -} - -// Stats returns stats about the points processed. -func (itr *seriesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} } - -// Close closes the iterator. -func (itr *seriesIterator) Close() error { return nil } - -// Next emits the next point in the iterator. -func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) { - for { - // Load next measurement's keys if there are no more remaining. - if itr.keys.i >= len(itr.keys.buf) { - if err := itr.nextKeys(); err != nil { - return nil, err - } - if len(itr.keys.buf) == 0 { - return nil, nil - } - } - - // Read the next key. - key := itr.keys.buf[itr.keys.i] - itr.keys.i++ - - // Write auxiliary fields. - for i, f := range itr.opt.Aux { - switch f.Val { - case "key": - itr.point.Aux[i] = key - } - } - return &itr.point, nil - } -} - -// nextKeys reads all keys for the next measurement. -func (itr *seriesIterator) nextKeys() error { - for { - // Ensure previous keys are cleared out. - itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0] - - // Read next measurement. - if len(itr.mms) == 0 { - return nil - } - mm := itr.mms[0] - itr.mms = itr.mms[1:] - - // Read all series keys. - ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition) - if err != nil { - return err - } else if len(ids) == 0 { - continue - } - itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids) - sort.Strings(itr.keys.buf) - - return nil - } -} - // NewTagKeysIterator returns a new instance of TagKeysIterator. func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { fn := func(m *Measurement) []string {