From e739afb77d17c5d776c571ddd3658beccac24431 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Tue, 23 Oct 2018 11:14:23 +0800 Subject: [PATCH 01/11] add tsi1.Index methods These methods are copied from tsdb.IndexSet and modified slightly. This fix is to remove tsdb.IndexSet to resolve #886. --- tsdb/index.go | 19 +- tsdb/tsi1/index.go | 566 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 575 insertions(+), 10 deletions(-) diff --git a/tsdb/index.go b/tsdb/index.go index df0bb2d7c0..9f9fab8a71 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -362,29 +362,29 @@ func (itr *filterUndeletedSeriesIDIterator) Next() (SeriesIDElem, error) { } // seriesIDExprIterator is an iterator that attaches an associated expression. -type seriesIDExprIterator struct { +type SeriesIDExprIterator struct { itr SeriesIDIterator expr influxql.Expr } // newSeriesIDExprIterator returns a new instance of seriesIDExprIterator. -func newSeriesIDExprIterator(itr SeriesIDIterator, expr influxql.Expr) SeriesIDIterator { +func NewSeriesIDExprIterator(itr SeriesIDIterator, expr influxql.Expr) SeriesIDIterator { if itr == nil { return nil } - return &seriesIDExprIterator{ + return &SeriesIDExprIterator{ itr: itr, expr: expr, } } -func (itr *seriesIDExprIterator) Close() error { +func (itr *SeriesIDExprIterator) Close() error { return itr.itr.Close() } // Next returns the next element in the iterator. -func (itr *seriesIDExprIterator) Next() (SeriesIDElem, error) { +func (itr *SeriesIDExprIterator) Next() (SeriesIDElem, error) { elem, err := itr.itr.Next() if err != nil { return SeriesIDElem{}, err @@ -733,6 +733,7 @@ func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) { type seriesPointIterator struct { once sync.Once indexSet IndexSet + // index *tsi1.Index mitr MeasurementIterator keys [][]byte opt query.IteratorOptions @@ -2452,8 +2453,8 @@ func assert(condition bool, msg string, v ...interface{}) { } } -type byTagKey []*query.TagSet +type ByTagKey []*query.TagSet -func (t byTagKey) Len() int { return len(t) } -func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } -func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t ByTagKey) Len() int { return len(t) } +func (t ByTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } +func (t ByTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index 6e42c0ed2f..840d6d4ede 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -13,7 +13,11 @@ import ( "sync/atomic" "unsafe" + "bytes" + "sort" + "github.com/cespare/xxhash" + "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" "github.com/influxdata/platform/models" "github.com/influxdata/platform/pkg/estimator" @@ -534,8 +538,52 @@ func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) { return tsdb.MergeMeasurementIterators(itrs...), nil } -// MeasurementSeriesIDIterator returns an iterator over all series in a measurement. +func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) { + return i.measurementSeriesByExprIterator(name, expr) +} + +// measurementSeriesByExprIterator returns a series iterator for a measurement +// that is filtered by expr. See MeasurementSeriesByExprIterator for more details. +// +// measurementSeriesByExprIterator guarantees to never take any locks on the +// series file. +func (i *Index) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) { + // Return all series for the measurement if there are no tag expressions. + + release := i.sfile.Retain() + defer release() + + if expr == nil { + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil + } + + itr, err := i.seriesByExprIterator(name, expr) + if err != nil { + return nil, err + } + + return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil +} + +// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series +// for the provided measurement. func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) { + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + + release := i.sfile.Retain() + defer release() + return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil +} + +// measurementSeriesIDIterator returns an iterator over all series in a measurement. +func (i *Index) measurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) { itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) for _, p := range i.partitions { itr, err := p.MeasurementSeriesIDIterator(name) @@ -945,6 +993,18 @@ func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error // TagKeySeriesIDIterator returns a series iterator for all values across a single key. func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { + release := i.sfile.Retain() + defer release() + + itr, err := i.tagKeySeriesIDIterator(name, key) + if err != nil { + return nil, err + } + return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil +} + +// tagKeySeriesIDIterator returns a series iterator for all values across a single key. +func (i *Index) tagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) for _, p := range i.partitions { itr := p.TagKeySeriesIDIterator(name, key) @@ -957,6 +1017,18 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, // TagValueSeriesIDIterator returns a series iterator for a single tag value. func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) { + release := i.sfile.Retain() + defer release() + + itr, err := i.tagValueSeriesIDIterator(name, key, value) + if err != nil { + return nil, err + } + return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil +} + +// tagValueSeriesIDIterator returns a series iterator for a single tag value. +func (i *Index) tagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) { // Check series ID set cache... if EnableBitsetCache { if ss := i.tagValueCache.Get(name, key, value); ss != nil { @@ -989,6 +1061,114 @@ func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID return itr, nil } +func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { + release := i.sfile.Retain() + defer release() + + itr, err := i.MeasurementSeriesByExprIterator(name, opt.Condition) + if err != nil { + return nil, err + } else if itr == nil { + return nil, nil + } + defer itr.Close() + // measurementSeriesByExprIterator filters deleted series IDs; no need to + // do so here. + + var dims []string + if len(opt.Dimensions) > 0 { + dims = make([]string, len(opt.Dimensions)) + copy(dims, opt.Dimensions) + sort.Strings(dims) + } + + // For every series, get the tag values for the requested tag keys i.e. + // dimensions. This is the TagSet for that series. Series with the same + // TagSet are then grouped together, because for the purpose of GROUP BY + // they are part of the same composite series. + tagSets := make(map[string]*query.TagSet, 64) + var seriesN, maxSeriesN int + + if opt.MaxSeriesN > 0 { + maxSeriesN = opt.MaxSeriesN + } else { + maxSeriesN = int(^uint(0) >> 1) + } + + // The tag sets require a string for each series key in the set, The series + // file formatted keys need to be parsed into models format. Since they will + // end up as strings we can re-use an intermediate buffer for this process. + var keyBuf []byte + var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration. + for { + se, err := itr.Next() + if err != nil { + return nil, err + } else if se.SeriesID.IsZero() { + break + } + + // Skip if the series has been tombstoned. + key := i.sfile.SeriesKey(se.SeriesID) + if len(key) == 0 { + continue + } + + if seriesN&0x3fff == 0x3fff { + // check every 16384 series if the query has been canceled + select { + case <-opt.InterruptCh: + return nil, query.ErrQueryInterrupted + default: + } + } + + if seriesN > maxSeriesN { + return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) + } + + // NOTE - must not escape this loop iteration. + _, tagsBuf = tsdb.ParseSeriesKeyInto(key, tagsBuf) + var tagsAsKey []byte + if len(dims) > 0 { + tagsAsKey = tsdb.MakeTagsKey(dims, tagsBuf) + } + + tagSet, ok := tagSets[string(tagsAsKey)] + if !ok { + // This TagSet is new, create a new entry for it. + tagSet = &query.TagSet{ + Tags: nil, + Key: tagsAsKey, + } + } + + // Associate the series and filter with the Tagset. + keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf) + tagSet.AddFilter(string(keyBuf), se.Expr) + keyBuf = keyBuf[:0] + + // Ensure it's back in the map. + tagSets[string(tagsAsKey)] = tagSet + seriesN++ + } + + // Sort the series in each tag set. + for _, t := range tagSets { + sort.Sort(t) + } + + // The TagSets have been created, as a map of TagSets. Just send + // the values back as a slice, sorting for consistency. + sortedTagsSets := make([]*query.TagSet, 0, len(tagSets)) + for _, v := range tagSets { + sortedTagsSets = append(sortedTagsSets, v) + } + sort.Sort(tsdb.ByTagKey(sortedTagsSets)) + + return sortedTagsSets, nil +} + // MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { n := i.availableThreads() @@ -1092,6 +1272,390 @@ func (i *Index) MeasurementCardinalityStats() MeasurementCardinalityStats { return stats } +func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) { + switch expr := expr.(type) { + case *influxql.BinaryExpr: + switch expr.Op { + case influxql.AND, influxql.OR: + // Get the series IDs and filter expressions for the LHS. + litr, err := i.seriesByExprIterator(name, expr.LHS) + if err != nil { + return nil, err + } + + // Get the series IDs and filter expressions for the RHS. + ritr, err := i.seriesByExprIterator(name, expr.RHS) + if err != nil { + if litr != nil { + litr.Close() + } + return nil, err + } + + // Intersect iterators if expression is "AND". + if expr.Op == influxql.AND { + return tsdb.IntersectSeriesIDIterators(litr, ritr), nil + } + + // Union iterators if expression is "OR". + return tsdb.UnionSeriesIDIterators(litr, ritr), nil + + default: + return i.seriesByBinaryExprIterator(name, expr) + } + + case *influxql.ParenExpr: + return i.seriesByExprIterator(name, expr.Expr) + + case *influxql.BooleanLiteral: + if expr.Val { + return i.measurementSeriesIDIterator(name) + } + return nil, nil + + default: + return nil, nil + } +} + +// seriesByBinaryExprIterator returns a series iterator and a filtering expression. +func (i *Index) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (tsdb.SeriesIDIterator, error) { + // If this binary expression has another binary expression, then this + // is some expression math and we should just pass it to the underlying query. + if _, ok := n.LHS.(*influxql.BinaryExpr); ok { + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(itr, n), nil + } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(itr, n), nil + } + + // Retrieve the variable reference from the correct side of the expression. + key, ok := n.LHS.(*influxql.VarRef) + value := n.RHS + if !ok { + key, ok = n.RHS.(*influxql.VarRef) + if !ok { + // This is an expression we do not know how to evaluate. Let the + // query engine take care of this. + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(itr, n), nil + } + value = n.LHS + } + + // For fields, return all series from this measurement. + if key.Val != "_name" && (key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) { + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(itr, n), nil + } else if value, ok := value.(*influxql.VarRef); ok { + // Check if the RHS is a variable and if it is a field. + if value.Val != "_name" && (key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(itr, n), nil + } + } + + // Create iterator based on value type. + switch value := value.(type) { + case *influxql.StringLiteral: + return i.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op) + case *influxql.RegexLiteral: + return i.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op) + case *influxql.VarRef: + return i.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op) + default: + // We do not know how to evaluate this expression so pass it + // on to the query engine. + itr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(itr, n), nil + } +} + +func (i *Index) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (tsdb.SeriesIDIterator, error) { + // Special handling for "_name" to match measurement name. + if bytes.Equal(key, []byte("_name")) { + if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { + return i.measurementSeriesIDIterator(name) + } + return nil, nil + } + + if op == influxql.EQ { + // Match a specific value. + if len(value) != 0 { + return i.tagValueSeriesIDIterator(name, key, value) + } + + mitr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + + kitr, err := i.tagKeySeriesIDIterator(name, key) + if err != nil { + if mitr != nil { + mitr.Close() + } + return nil, err + } + + // Return all measurement series that have no values from this tag key. + return tsdb.DifferenceSeriesIDIterators(mitr, kitr), nil + } + + // Return all measurement series without this tag value. + if len(value) != 0 { + mitr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + + vitr, err := i.tagValueSeriesIDIterator(name, key, value) + if err != nil { + if mitr != nil { + mitr.Close() + } + return nil, err + } + + return tsdb.DifferenceSeriesIDIterators(mitr, vitr), nil + } + + // Return all series across all values of this tag key. + return i.tagKeySeriesIDIterator(name, key) +} + +func (i *Index) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (tsdb.SeriesIDIterator, error) { + // Special handling for "_name" to match measurement name. + if bytes.Equal(key, []byte("_name")) { + match := value.Match(name) + if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) { + mitr, err := i.measurementSeriesIDIterator(name) + if err != nil { + return nil, err + } + return tsdb.NewSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil + } + return nil, nil + } + return i.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX) +} + +func (i *Index) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (tsdb.SeriesIDIterator, error) { + itr0, err := i.tagKeySeriesIDIterator(name, key) + if err != nil { + return nil, err + } + + itr1, err := i.tagKeySeriesIDIterator(name, []byte(value.Val)) + if err != nil { + if itr0 != nil { + itr0.Close() + } + return nil, err + } + + if op == influxql.EQ { + return tsdb.IntersectSeriesIDIterators(itr0, itr1), nil + } + return tsdb.DifferenceSeriesIDIterators(itr0, itr1), nil +} + +// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value. +// If matches is false, returns iterators which do not match value. +func (i *Index) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (tsdb.SeriesIDIterator, error) { + release := i.sfile.Retain() + defer release() + + itr, err := i.matchTagValueSeriesIDIterator(name, key, value, matches) + if err != nil { + return nil, err + } + return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil +} + +// matchTagValueSeriesIDIterator returns a series iterator for tags which match +// value. See MatchTagValueSeriesIDIterator for more details. +// +// It guarantees to never take any locks on the underlying series file. +func (i *Index) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (tsdb.SeriesIDIterator, error) { + matchEmpty := value.MatchString("") + if matches { + if matchEmpty { + return i.matchTagValueEqualEmptySeriesIDIterator(name, key, value) + } + return i.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value) + } + + if matchEmpty { + return i.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value) + } + return i.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value) +} + +func (i *Index) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) { + vitr, err := i.TagValueIterator(name, key) + if err != nil { + return nil, err + } else if vitr == nil { + return i.measurementSeriesIDIterator(name) + } + defer vitr.Close() + + var itrs []tsdb.SeriesIDIterator + if err := func() error { + for { + e, err := vitr.Next() + if err != nil { + return err + } else if e == nil { + break + } + + if !value.Match(e) { + itr, err := i.tagValueSeriesIDIterator(name, key, e) + if err != nil { + return err + } else if itr != nil { + itrs = append(itrs, itr) + } + } + } + return nil + }(); err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } + + mitr, err := i.measurementSeriesIDIterator(name) + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } + + return tsdb.DifferenceSeriesIDIterators(mitr, tsdb.MergeSeriesIDIterators(itrs...)), nil +} + +func (i *Index) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) { + vitr, err := i.TagValueIterator(name, key) + if err != nil { + return nil, err + } else if vitr == nil { + return nil, nil + } + defer vitr.Close() + + var itrs []tsdb.SeriesIDIterator + for { + e, err := vitr.Next() + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } else if e == nil { + break + } + + if value.Match(e) { + itr, err := i.tagValueSeriesIDIterator(name, key, e) + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } else if itr != nil { + itrs = append(itrs, itr) + } + } + } + return tsdb.MergeSeriesIDIterators(itrs...), nil +} + +func (i *Index) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) { + vitr, err := i.TagValueIterator(name, key) + if err != nil { + return nil, err + } else if vitr == nil { + return nil, nil + } + defer vitr.Close() + + var itrs []tsdb.SeriesIDIterator + for { + e, err := vitr.Next() + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } else if e == nil { + break + } + + if !value.Match(e) { + itr, err := i.tagValueSeriesIDIterator(name, key, e) + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } else if itr != nil { + itrs = append(itrs, itr) + } + } + } + return tsdb.MergeSeriesIDIterators(itrs...), nil +} + +func (i *Index) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) { + vitr, err := i.TagValueIterator(name, key) + if err != nil { + return nil, err + } else if vitr == nil { + return i.measurementSeriesIDIterator(name) + } + defer vitr.Close() + + var itrs []tsdb.SeriesIDIterator + for { + e, err := vitr.Next() + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } else if e == nil { + break + } + if value.Match(e) { + itr, err := i.tagValueSeriesIDIterator(name, key, e) + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } else if itr != nil { + itrs = append(itrs, itr) + } + } + } + + mitr, err := i.measurementSeriesIDIterator(name) + if err != nil { + tsdb.SeriesIDIterators(itrs).Close() + return nil, err + } + return tsdb.DifferenceSeriesIDIterators(mitr, tsdb.MergeSeriesIDIterators(itrs...)), nil +} + // IsIndexDir returns true if directory contains at least one partition directory. func IsIndexDir(path string) (bool, error) { fis, err := ioutil.ReadDir(path) From 28fecc1f6f4fe4d7bcaca21a1565c848492031e8 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Tue, 23 Oct 2018 15:32:11 +0800 Subject: [PATCH 02/11] replace tsdb.IndexSet with tsi1.Index This fix is to remove tsdb.IndexSet to resolve #886. --- storage/engine.go | 3 +-- storage/series_cursor.go | 15 ++++++++------- tsdb/index.go | 14 +++++++------- tsdb/tsm1/engine.go | 7 ++----- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/storage/engine.go b/storage/engine.go index e0698e4842..d88ad97cc2 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -256,8 +256,7 @@ func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest if e.closing == nil { return nil, ErrEngineClosed } - // TODO(edd): remove IndexSet - return newSeriesCursor(req, tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}, cond) + return newSeriesCursor(req, e.index, cond) } func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator, error) { diff --git a/storage/series_cursor.go b/storage/series_cursor.go index 4528ea76a4..4599bd2257 100644 --- a/storage/series_cursor.go +++ b/storage/series_cursor.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxql" "github.com/influxdata/platform/models" "github.com/influxdata/platform/tsdb" + "github.com/influxdata/platform/tsdb/tsi1" ) type SeriesCursor interface { @@ -19,10 +20,10 @@ type SeriesCursorRequest struct { Measurements tsdb.MeasurementIterator } -// seriesCursor is an implementation of SeriesCursor over an IndexSet. +// seriesCursor is an implementation of SeriesCursor over an tsi1.Index. type seriesCursor struct { once sync.Once - indexSet tsdb.IndexSet + index *tsi1.Index mitr tsdb.MeasurementIterator keys [][]byte ofs int @@ -36,7 +37,7 @@ type SeriesCursorRow struct { } // newSeriesCursor returns a new instance of SeriesCursor. -func newSeriesCursor(req SeriesCursorRequest, indexSet tsdb.IndexSet, cond influxql.Expr) (_ SeriesCursor, err error) { +func newSeriesCursor(req SeriesCursorRequest, index *tsi1.Index, cond influxql.Expr) (_ SeriesCursor, err error) { // Only equality operators are allowed. influxql.WalkFunc(cond, func(node influxql.Node) { switch n := node.(type) { @@ -54,14 +55,14 @@ func newSeriesCursor(req SeriesCursorRequest, indexSet tsdb.IndexSet, cond influ mitr := req.Measurements if mitr == nil { - mitr, err = indexSet.MeasurementIterator() + mitr, err = index.MeasurementIterator() if err != nil { return nil, err } } return &seriesCursor{ - indexSet: indexSet, + index: index, mitr: mitr, cond: cond, }, nil @@ -103,7 +104,7 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) { } func (cur *seriesCursor) readSeriesKeys(name []byte) error { - sitr, err := cur.indexSet.MeasurementSeriesByExprIterator(name, cur.cond) + sitr, err := cur.index.MeasurementSeriesByExprIterator(name, cur.cond) if err != nil { return err } else if sitr == nil { @@ -122,7 +123,7 @@ func (cur *seriesCursor) readSeriesKeys(name []byte) error { break } - key := cur.indexSet.SeriesFile.SeriesKey(elem.SeriesID) + key := cur.index.SeriesFile().SeriesKey(elem.SeriesID) if len(key) == 0 { continue } diff --git a/tsdb/index.go b/tsdb/index.go index 9f9fab8a71..9e0a2b22ed 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/platform/pkg/estimator" "github.com/influxdata/platform/pkg/slices" "go.uber.org/zap" + "github.com/influxdata/platform/tsdb/tsi1" ) // Available index types. @@ -732,8 +733,7 @@ func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) { // seriesPointIterator adapts SeriesIterator to an influxql.Iterator. type seriesPointIterator struct { once sync.Once - indexSet IndexSet - // index *tsi1.Index + index *tsi1.Index mitr MeasurementIterator keys [][]byte opt query.IteratorOptions @@ -742,7 +742,7 @@ type seriesPointIterator struct { } // newSeriesPointIterator returns a new instance of seriesPointIterator. -func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) { +func NewSeriesPointIterator(index *tsi1.Index, opt query.IteratorOptions) (_ query.Iterator, err error) { // Only equality operators are allowed. influxql.WalkFunc(opt.Condition, func(n influxql.Node) { switch n := n.(type) { @@ -759,13 +759,13 @@ func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ que return nil, err } - mitr, err := indexSet.MeasurementIterator() + mitr, err := index.MeasurementIterator() if err != nil { return nil, err } return &seriesPointIterator{ - indexSet: indexSet, + index: index, mitr: mitr, point: query.FloatPoint{ Aux: make([]interface{}, len(opt.Aux)), @@ -825,7 +825,7 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { } func (itr *seriesPointIterator) readSeriesKeys(name []byte) error { - sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition) + sitr, err := itr.index.MeasurementSeriesByExprIterator(name, itr.opt.Condition) if err != nil { return err } else if sitr == nil { @@ -852,7 +852,7 @@ func (itr *seriesPointIterator) readSeriesKeys(name []byte) error { } } - key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID) + key := itr.index.SeriesFile().SeriesKey(elem.SeriesID) if len(key) == 0 { continue } diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index fe1570d541..9add48afc2 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -1514,8 +1514,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error { // DeleteMeasurement deletes a measurement and all related series. func (e *Engine) deleteMeasurement(name []byte) error { // Attempt to find the series keys. - indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - itr, err := indexSet.MeasurementSeriesByExprIterator(name, nil) + itr, err := e.index.MeasurementSeriesIDIterator(name) if err != nil { return err } else if itr == nil { @@ -2049,9 +2048,7 @@ func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (qu return query.IteratorCost{}, nil } - // Determine all of the tag sets for this query. - indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt) + tagSets, err := e.index.TagSets([]byte(measurement), opt) if err != nil { return query.IteratorCost{}, err } From 427d719af802000190eb83ccec952b3bc76fe3a3 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Tue, 23 Oct 2018 16:28:26 +0800 Subject: [PATCH 03/11] remove tsdb.IndexSet tests This fix is to remove tsdb.IndexSet to resolve #886. --- tsdb/index_test.go | 60 ++-------------------------------------- tsdb/tsm1/engine_test.go | 23 ++++++--------- 2 files changed, 11 insertions(+), 72 deletions(-) diff --git a/tsdb/index_test.go b/tsdb/index_test.go index 05a6f954c4..3705b6aac0 100644 --- a/tsdb/index_test.go +++ b/tsdb/index_test.go @@ -14,7 +14,6 @@ import ( "github.com/influxdata/influxql" "github.com/influxdata/platform/logger" "github.com/influxdata/platform/models" - "github.com/influxdata/platform/pkg/slices" "github.com/influxdata/platform/tsdb" "github.com/influxdata/platform/tsdb/tsi1" ) @@ -62,51 +61,6 @@ func TestMergeSeriesIDIterators(t *testing.T) { } } -func TestIndexSet_MeasurementNamesByExpr(t *testing.T) { - // Setup indexes - indexes := map[string]*Index{} - for _, name := range tsdb.RegisteredIndexes() { - idx := MustOpenNewIndex(tsi1.NewConfig()) - idx.AddSeries("cpu", map[string]string{"region": "east"}, models.Integer) - idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"}, models.Integer) - idx.AddSeries("disk", map[string]string{"secret": "foo"}, models.Integer) - idx.AddSeries("mem", map[string]string{"region": "west"}, models.Integer) - idx.AddSeries("gpu", map[string]string{"region": "east"}, models.Integer) - idx.AddSeries("pci", map[string]string{"region": "east", "secret": "foo"}, models.Integer) - indexes[name] = idx - defer idx.Close() - } - - type example struct { - name string - expr influxql.Expr - expected [][]byte - } - - examples := []example{ - {name: "all", expected: slices.StringsToBytes("cpu", "disk", "gpu", "mem", "pci")}, - {name: "EQ", expr: influxql.MustParseExpr(`region = 'west'`), expected: slices.StringsToBytes("cpu", "mem")}, - {name: "NEQ", expr: influxql.MustParseExpr(`region != 'west'`), expected: slices.StringsToBytes("gpu", "pci")}, - {name: "EQREGEX", expr: influxql.MustParseExpr(`region =~ /.*st/`), expected: slices.StringsToBytes("cpu", "gpu", "mem", "pci")}, - {name: "NEQREGEX", expr: influxql.MustParseExpr(`region !~ /.*est/`), expected: slices.StringsToBytes("gpu", "pci")}, - } - - for _, idx := range tsdb.RegisteredIndexes() { - t.Run(idx, func(t *testing.T) { - for _, example := range examples { - t.Run(example.name, func(t *testing.T) { - names, err := indexes[idx].IndexSet().MeasurementNamesByExpr(nil, example.expr) - if err != nil { - t.Fatal(err) - } else if !reflect.DeepEqual(names, example.expected) { - t.Fatalf("got names: %v, expected %v", slices.BytesToStrings(names), slices.BytesToStrings(example.expected)) - } - }) - } - }) - } -} - // Index wraps a series file and index. type Index struct { rootPath string @@ -166,10 +120,6 @@ func (i *Index) MustOpen() { } } -func (idx *Index) IndexSet() *tsdb.IndexSet { - return &tsdb.IndexSet{Indexes: []tsdb.Index{idx.Index}, SeriesFile: idx.sfile} -} - func (idx *Index) AddSeries(name string, tags map[string]string, typ models.FieldType) error { t := models.NewTags(tags) key := fmt.Sprintf("%s,%s", name, t.HashKey()) @@ -215,8 +165,8 @@ func (i *Index) Close() error { // // Typical results on an i7 laptop. // -// BenchmarkIndexSet_TagSets/1M_series/tsi1-8 100 18995530 ns/op 5221180 B/op 20379 allocs/op -func BenchmarkIndexSet_TagSets(b *testing.B) { +// BenchmarkIndex_TagSets/1M_series/tsi1-8 100 18995530 ns/op 5221180 B/op 20379 allocs/op +func BenchmarkIndex_TagSets(b *testing.B) { // Read line-protocol and coerce into tsdb format. // 1M series generated with: // $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1 @@ -269,14 +219,10 @@ func BenchmarkIndexSet_TagSets(b *testing.B) { name := []byte("m4") opt := query.IteratorOptions{Condition: influxql.MustParseExpr(`"tag5"::tag = 'value0'`)} - indexSet := tsdb.IndexSet{ - SeriesFile: idx.sfile, - Indexes: []tsdb.Index{idx.Index}, - } // For TSI implementation var ts func() ([]*query.TagSet, error) ts = func() ([]*query.TagSet, error) { - return indexSet.TagSets(idx.sfile, name, opt) + return idx.Index.TagSets(name, opt) } b.Run(indexType, func(b *testing.B) { diff --git a/tsdb/tsm1/engine_test.go b/tsdb/tsm1/engine_test.go index 9266602762..df48803bdc 100644 --- a/tsdb/tsm1/engine_test.go +++ b/tsdb/tsm1/engine_test.go @@ -753,8 +753,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { } // Check that the series still exists in the index - indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) + iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) if err != nil { t.Fatalf("iterator error: %v", err) } @@ -785,8 +784,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { t.Fatalf("failed to delete series: %v", err) } - indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { + if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { t.Fatalf("iterator error: %v", err) } if iter == nil { @@ -878,8 +876,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { } // Check that the series still exists in the index - indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) + iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) if err != nil { t.Fatalf("iterator error: %v", err) } @@ -910,8 +907,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { t.Fatalf("failed to delete series: %v", err) } - indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { + if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { t.Fatalf("iterator error: %v", err) } if iter == nil { @@ -984,8 +980,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) { } // Check that the series still exists in the index - indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) + iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) if err != nil { t.Fatalf("iterator error: %v", err) } else if iter == nil { @@ -1000,7 +995,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) { } // Check that disk series still exists - iter, err = indexSet.MeasurementSeriesIDIterator([]byte("disk")) + iter, err = e.index.MeasurementSeriesIDIterator([]byte("disk")) if err != nil { t.Fatalf("iterator error: %v", err) } else if iter == nil { @@ -1091,8 +1086,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) { } // Check that the series still exists in the index - indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) + iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) if err != nil { t.Fatalf("iterator error: %v", err) } @@ -1123,8 +1117,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) { t.Fatalf("failed to delete series: %v", err) } - indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} - if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { + if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { t.Fatalf("iterator error: %v", err) } if iter == nil { From c1e732782eab2be3b33c29b8e8f737ff612565e2 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Tue, 23 Oct 2018 16:43:12 +0800 Subject: [PATCH 04/11] remove tsdb.IndexSet This fix is to resolve #886. --- tsdb/index.go | 1224 ------------------------------------------------- 1 file changed, 1224 deletions(-) diff --git a/tsdb/index.go b/tsdb/index.go index 9e0a2b22ed..6f516b7641 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -11,9 +11,7 @@ import ( "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" "github.com/influxdata/platform/models" - "github.com/influxdata/platform/pkg/bytesutil" "github.com/influxdata/platform/pkg/estimator" - "github.com/influxdata/platform/pkg/slices" "go.uber.org/zap" "github.com/influxdata/platform/tsdb/tsi1" ) @@ -1180,1228 +1178,6 @@ func (itr *tagValueMergeIterator) Next() (_ []byte, err error) { return value, nil } -// IndexSet represents a list of indexes, all belonging to one database. -type IndexSet struct { - Indexes []Index // The set of indexes comprising this IndexSet. - SeriesFile *SeriesFile // The Series File associated with the db for this set. -} - -// HasInmemIndex returns true if any in-memory index is in use. -func (is IndexSet) HasInmemIndex() bool { - // TODO(edd): Remove - return false -} - -// Database returns the database name of the first index. -func (is IndexSet) Database() string { - if len(is.Indexes) == 0 { - return "" - } - return is.Indexes[0].Database() -} - -// DedupeInmemIndexes returns an index set which removes duplicate indexes. -// Useful because inmem indexes are shared by shards per database. -func (is IndexSet) DedupeInmemIndexes() IndexSet { - other := IndexSet{ - Indexes: make([]Index, 0, len(is.Indexes)), - SeriesFile: is.SeriesFile, - } - - uniqueIndexes := make(map[uintptr]Index) - for _, idx := range is.Indexes { - uniqueIndexes[idx.UniqueReferenceID()] = idx - } - - for _, idx := range uniqueIndexes { - other.Indexes = append(other.Indexes, idx) - } - - return other -} - -// MeasurementNamesByExpr returns a slice of measurement names matching the -// provided condition. If no condition is provided then all names are returned. -func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { - release := is.SeriesFile.Retain() - defer release() - - // Return filtered list if expression exists. - if expr != nil { - return is.measurementNamesByExpr(auth, expr) - } - - itr, err := is.measurementIterator() - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - - // Iterate over all measurements if no condition exists. - var names [][]byte - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e == nil { - break - } - names = append(names, e) - } - return slices.CopyChunkedByteSlices(names, 1000), nil -} - -func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { - if expr == nil { - return nil, nil - } - - switch e := expr.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok { - return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) - } - - // Retrieve value or regex expression from RHS. - var value string - var regex *regexp.Regexp - if influxql.IsRegexOp(e.Op) { - re, ok := e.RHS.(*influxql.RegexLiteral) - if !ok { - return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) - } - regex = re.Val - } else { - s, ok := e.RHS.(*influxql.StringLiteral) - if !ok { - return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) - } - value = s.Val - } - - // Match on name, if specified. - if tag.Val == "_name" { - return is.measurementNamesByNameFilter(auth, e.Op, value, regex) - } else if influxql.IsSystemName(tag.Val) { - return nil, nil - } - return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex) - - case influxql.OR, influxql.AND: - lhs, err := is.measurementNamesByExpr(auth, e.LHS) - if err != nil { - return nil, err - } - - rhs, err := is.measurementNamesByExpr(auth, e.RHS) - if err != nil { - return nil, err - } - - if e.Op == influxql.OR { - return bytesutil.Union(lhs, rhs), nil - } - return bytesutil.Intersect(lhs, rhs), nil - - default: - return nil, fmt.Errorf("invalid tag comparison operator") - } - - case *influxql.ParenExpr: - return is.measurementNamesByExpr(auth, e.Expr) - default: - return nil, fmt.Errorf("%#v", expr) - } -} - -// measurementNamesByNameFilter returns matching measurement names in sorted order. -func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { - itr, err := is.measurementIterator() - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - - var names [][]byte - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e == nil { - break - } - - var matched bool - switch op { - case influxql.EQ: - matched = string(e) == val - case influxql.NEQ: - matched = string(e) != val - case influxql.EQREGEX: - matched = regex.Match(e) - case influxql.NEQREGEX: - matched = !regex.Match(e) - } - - if matched { - names = append(names, e) - } - } - bytesutil.Sort(names) - return names, nil -} - -func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { - var names [][]byte - - mitr, err := is.measurementIterator() - if err != nil { - return nil, err - } else if mitr == nil { - return nil, nil - } - defer mitr.Close() - - // valEqual determines if the provided []byte is equal to the tag value - // to be filtered on. - valEqual := regex.Match - if op == influxql.EQ || op == influxql.NEQ { - vb := []byte(val) - valEqual = func(b []byte) bool { return bytes.Equal(vb, b) } - } - - var tagMatch bool - var authorized bool - for { - me, err := mitr.Next() - if err != nil { - return nil, err - } else if me == nil { - break - } - // If the measurement doesn't have the tag key, then it won't be considered. - if ok, err := is.hasTagKey(me, []byte(key)); err != nil { - return nil, err - } else if !ok { - continue - } - tagMatch = false - // Authorization must be explicitly granted when an authorizer is present. - authorized = true - - vitr, err := is.tagValueIterator(me, []byte(key)) - if err != nil { - return nil, err - } - - if vitr != nil { - defer vitr.Close() - for { - ve, err := vitr.Next() - if err != nil { - return nil, err - } else if ve == nil { - break - } - if !valEqual(ve) { - continue - } - - tagMatch = true - break - } - if err := vitr.Close(); err != nil { - return nil, err - } - } - - // For negation operators, to determine if the measurement is authorized, - // an authorized series belonging to the measurement must be located. - // Then, the measurement can be added iff !tagMatch && authorized. - if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch { - authorized = true - } - - // tags match | operation is EQ | measurement matches - // -------------------------------------------------- - // True | True | True - // True | False | False - // False | True | False - // False | False | True - if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized { - names = append(names, me) - continue - } - } - - bytesutil.Sort(names) - return names, nil -} - -// HasTagKey returns true if the tag key exists in any index for the provided -// measurement. -func (is IndexSet) HasTagKey(name, key []byte) (bool, error) { - return is.hasTagKey(name, key) -} - -// hasTagKey returns true if the tag key exists in any index for the provided -// measurement, and guarantees to never take a lock on the series file. -func (is IndexSet) hasTagKey(name, key []byte) (bool, error) { - for _, idx := range is.Indexes { - if ok, err := idx.HasTagKey(name, key); err != nil { - return false, err - } else if ok { - return true, nil - } - } - return false, nil -} - -// HasTagValue returns true if the tag value exists in any index for the provided -// measurement and tag key. -func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) { - for _, idx := range is.Indexes { - if ok, err := idx.HasTagValue(name, key, value); err != nil { - return false, err - } else if ok { - return true, nil - } - } - return false, nil -} - -// MeasurementIterator returns an iterator over all measurements in the index. -func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) { - return is.measurementIterator() -} - -// measurementIterator returns an iterator over all measurements in the index. -// It guarantees to never take any locks on the underlying series file. -func (is IndexSet) measurementIterator() (MeasurementIterator, error) { - a := make([]MeasurementIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.MeasurementIterator() - if err != nil { - MeasurementIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeMeasurementIterators(a...), nil -} - -// TagKeyIterator returns a key iterator for a measurement. -func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) { - return is.tagKeyIterator(name) -} - -// tagKeyIterator returns a key iterator for a measurement. It guarantees to never -// take any locks on the underlying series file. -func (is IndexSet) tagKeyIterator(name []byte) (TagKeyIterator, error) { - a := make([]TagKeyIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.TagKeyIterator(name) - if err != nil { - TagKeyIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeTagKeyIterators(a...), nil -} - -// TagValueIterator returns a value iterator for a tag key. -func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) { - return is.tagValueIterator(name, key) -} - -// tagValueIterator returns a value iterator for a tag key. It guarantees to never -// take any locks on the underlying series file. -func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) { - a := make([]TagValueIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.TagValueIterator(name, key) - if err != nil { - TagValueIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeTagValueIterators(a...), nil -} - -// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series -// for the provided measurement. -func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - -// measurementSeriesIDIterator does not provide any locking on the Series file. -// -// See MeasurementSeriesIDIterator for more details. -func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { - a := make([]SeriesIDIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.MeasurementSeriesIDIterator(name) - if err != nil { - SeriesIDIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeSeriesIDIterators(a...), nil -} - -// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies -// the provided function. -func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagKeyIterator(name) - if err != nil { - return err - } else if itr == nil { - return nil - } - defer itr.Close() - - for { - key, err := itr.Next() - if err != nil { - return err - } else if key == nil { - return nil - } - - if err := fn(key); err != nil { - return err - } - } -} - -// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. -func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { - release := is.SeriesFile.Retain() - defer release() - - keys := make(map[string]struct{}) - for _, idx := range is.Indexes { - m, err := idx.MeasurementTagKeysByExpr(name, expr) - if err != nil { - return nil, err - } - for k := range m { - keys[k] = struct{}{} - } - } - return keys, nil -} - -// TagKeySeriesIDIterator returns a series iterator for all values across a single key. -func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagKeySeriesIDIterator(name, key) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - -// tagKeySeriesIDIterator returns a series iterator for all values across a -// single key. -// -// It guarantees to never take any locks on the series file. -func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { - a := make([]SeriesIDIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.TagKeySeriesIDIterator(name, key) - if err != nil { - SeriesIDIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeSeriesIDIterators(a...), nil -} - -// TagValueSeriesIDIterator returns a series iterator for a single tag value. -func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagValueSeriesIDIterator(name, key, value) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - -// tagValueSeriesIDIterator does not provide any locking on the Series File. -// -// See TagValueSeriesIDIterator for more details. -func (is IndexSet) tagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { - a := make([]SeriesIDIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.TagValueSeriesIDIterator(name, key, value) - if err != nil { - SeriesIDIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeSeriesIDIterators(a...), nil -} - -// MeasurementSeriesByExprIterator returns a series iterator for a measurement -// that is filtered by expr. If expr only contains time expressions then this -// call is equivalent to MeasurementSeriesIDIterator(). -func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - return is.measurementSeriesByExprIterator(name, expr) -} - -// measurementSeriesByExprIterator returns a series iterator for a measurement -// that is filtered by expr. See MeasurementSeriesByExprIterator for more details. -// -// measurementSeriesByExprIterator guarantees to never take any locks on the -// series file. -func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { - // Return all series for the measurement if there are no tag expressions. - if expr == nil { - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil - } - - itr, err := is.seriesByExprIterator(name, expr) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - -// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. -func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - release := is.SeriesFile.Retain() - defer release() - - // Create iterator for all matching series. - itr, err := is.measurementSeriesByExprIterator(name, expr) - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - - // measurementSeriesByExprIterator filters deleted series; no need to do so here. - - // Iterate over all series and generate keys. - var keys [][]byte - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e.SeriesID.IsZero() { - break - } - - // Check for unsupported field filters. - // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). - if e.Expr != nil { - if v, ok := e.Expr.(*influxql.BooleanLiteral); !ok || !v.Val { - return nil, errors.New("fields not supported in WHERE clause during deletion") - } - } - - seriesKey := is.SeriesFile.SeriesKey(e.SeriesID) - if len(seriesKey) == 0 { - continue - } - - name, tags := ParseSeriesKey(seriesKey) - keys = append(keys, models.MakeKey(name, tags)) - } - - bytesutil.Sort(keys) - - return keys, nil -} - -func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { - switch expr := expr.(type) { - case *influxql.BinaryExpr: - switch expr.Op { - case influxql.AND, influxql.OR: - // Get the series IDs and filter expressions for the LHS. - litr, err := is.seriesByExprIterator(name, expr.LHS) - if err != nil { - return nil, err - } - - // Get the series IDs and filter expressions for the RHS. - ritr, err := is.seriesByExprIterator(name, expr.RHS) - if err != nil { - if litr != nil { - litr.Close() - } - return nil, err - } - - // Intersect iterators if expression is "AND". - if expr.Op == influxql.AND { - return IntersectSeriesIDIterators(litr, ritr), nil - } - - // Union iterators if expression is "OR". - return UnionSeriesIDIterators(litr, ritr), nil - - default: - return is.seriesByBinaryExprIterator(name, expr) - } - - case *influxql.ParenExpr: - return is.seriesByExprIterator(name, expr.Expr) - - case *influxql.BooleanLiteral: - if expr.Val { - return is.measurementSeriesIDIterator(name) - } - return nil, nil - - default: - return nil, nil - } -} - -// seriesByBinaryExprIterator returns a series iterator and a filtering expression. -func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIDIterator, error) { - // If this binary expression has another binary expression, then this - // is some expression math and we should just pass it to the underlying query. - if _, ok := n.LHS.(*influxql.BinaryExpr); ok { - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(itr, n), nil - } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(itr, n), nil - } - - // Retrieve the variable reference from the correct side of the expression. - key, ok := n.LHS.(*influxql.VarRef) - value := n.RHS - if !ok { - key, ok = n.RHS.(*influxql.VarRef) - if !ok { - // This is an expression we do not know how to evaluate. Let the - // query engine take care of this. - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(itr, n), nil - } - value = n.LHS - } - - // For fields, return all series from this measurement. - if key.Val != "_name" && (key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) { - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(itr, n), nil - } else if value, ok := value.(*influxql.VarRef); ok { - // Check if the RHS is a variable and if it is a field. - if value.Val != "_name" && (key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(itr, n), nil - } - } - - // Create iterator based on value type. - switch value := value.(type) { - case *influxql.StringLiteral: - return is.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op) - case *influxql.RegexLiteral: - return is.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op) - case *influxql.VarRef: - return is.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op) - default: - // We do not know how to evaluate this expression so pass it - // on to the query engine. - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(itr, n), nil - } -} - -func (is IndexSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIDIterator, error) { - // Special handling for "_name" to match measurement name. - if bytes.Equal(key, []byte("_name")) { - if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { - return is.measurementSeriesIDIterator(name) - } - return nil, nil - } - - if op == influxql.EQ { - // Match a specific value. - if len(value) != 0 { - return is.tagValueSeriesIDIterator(name, key, value) - } - - mitr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - - kitr, err := is.tagKeySeriesIDIterator(name, key) - if err != nil { - if mitr != nil { - mitr.Close() - } - return nil, err - } - - // Return all measurement series that have no values from this tag key. - return DifferenceSeriesIDIterators(mitr, kitr), nil - } - - // Return all measurement series without this tag value. - if len(value) != 0 { - mitr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - - vitr, err := is.tagValueSeriesIDIterator(name, key, value) - if err != nil { - if mitr != nil { - mitr.Close() - } - return nil, err - } - - return DifferenceSeriesIDIterators(mitr, vitr), nil - } - - // Return all series across all values of this tag key. - return is.tagKeySeriesIDIterator(name, key) -} - -func (is IndexSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIDIterator, error) { - // Special handling for "_name" to match measurement name. - if bytes.Equal(key, []byte("_name")) { - match := value.Match(name) - if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) { - mitr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return newSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil - } - return nil, nil - } - return is.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX) -} - -func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIDIterator, error) { - itr0, err := is.tagKeySeriesIDIterator(name, key) - if err != nil { - return nil, err - } - - itr1, err := is.tagKeySeriesIDIterator(name, []byte(value.Val)) - if err != nil { - if itr0 != nil { - itr0.Close() - } - return nil, err - } - - if op == influxql.EQ { - return IntersectSeriesIDIterators(itr0, itr1), nil - } - return DifferenceSeriesIDIterators(itr0, itr1), nil -} - -// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value. -// If matches is false, returns iterators which do not match value. -func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - itr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - -// matchTagValueSeriesIDIterator returns a series iterator for tags which match -// value. See MatchTagValueSeriesIDIterator for more details. -// -// It guarantees to never take any locks on the underlying series file. -func (is IndexSet) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { - matchEmpty := value.MatchString("") - if matches { - if matchEmpty { - return is.matchTagValueEqualEmptySeriesIDIterator(name, key, value) - } - return is.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value) - } - - if matchEmpty { - return is.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value) - } - return is.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value) -} - -func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { - vitr, err := is.tagValueIterator(name, key) - if err != nil { - return nil, err - } else if vitr == nil { - return is.measurementSeriesIDIterator(name) - } - defer vitr.Close() - - var itrs []SeriesIDIterator - if err := func() error { - for { - e, err := vitr.Next() - if err != nil { - return err - } else if e == nil { - break - } - - if !value.Match(e) { - itr, err := is.tagValueSeriesIDIterator(name, key, e) - if err != nil { - return err - } else if itr != nil { - itrs = append(itrs, itr) - } - } - } - return nil - }(); err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } - - mitr, err := is.measurementSeriesIDIterator(name) - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } - - return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil -} - -func (is IndexSet) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { - vitr, err := is.tagValueIterator(name, key) - if err != nil { - return nil, err - } else if vitr == nil { - return nil, nil - } - defer vitr.Close() - - var itrs []SeriesIDIterator - for { - e, err := vitr.Next() - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if e == nil { - break - } - - if value.Match(e) { - itr, err := is.tagValueSeriesIDIterator(name, key, e) - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if itr != nil { - itrs = append(itrs, itr) - } - } - } - return MergeSeriesIDIterators(itrs...), nil -} - -func (is IndexSet) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { - vitr, err := is.tagValueIterator(name, key) - if err != nil { - return nil, err - } else if vitr == nil { - return nil, nil - } - defer vitr.Close() - - var itrs []SeriesIDIterator - for { - e, err := vitr.Next() - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if e == nil { - break - } - - if !value.Match(e) { - itr, err := is.tagValueSeriesIDIterator(name, key, e) - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if itr != nil { - itrs = append(itrs, itr) - } - } - } - return MergeSeriesIDIterators(itrs...), nil -} - -func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { - vitr, err := is.tagValueIterator(name, key) - if err != nil { - return nil, err - } else if vitr == nil { - return is.measurementSeriesIDIterator(name) - } - defer vitr.Close() - - var itrs []SeriesIDIterator - for { - e, err := vitr.Next() - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if e == nil { - break - } - if value.Match(e) { - itr, err := is.tagValueSeriesIDIterator(name, key, e) - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if itr != nil { - itrs = append(itrs, itr) - } - } - } - - mitr, err := is.measurementSeriesIDIterator(name) - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } - return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil -} - -// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys. -// -// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the -// position of the tag key in the keys argument. -// -// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending -// lexicographic order. -func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { - release := is.SeriesFile.Retain() - defer release() - return is.tagValuesByKeyAndExpr(auth, name, keys, expr) -} - -// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See -// TagValuesByKeyAndExpr for more details. -// -// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying -// series file. -func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { - valueExpr := influxql.CloneExpr(expr) - valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || tag.Val != "value" { - return nil - } - } - } - return e - }), nil) - - itr, err := is.seriesByExprIterator(name, expr) - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) - defer itr.Close() - - keyIdxs := make(map[string]int, len(keys)) - for ki, key := range keys { - keyIdxs[key] = ki - - // Check that keys are in order. - if ki > 0 && key < keys[ki-1] { - return nil, fmt.Errorf("keys %v are not in ascending order", keys) - } - } - - resultSet := make([]map[string]struct{}, len(keys)) - for i := 0; i < len(resultSet); i++ { - resultSet[i] = make(map[string]struct{}) - } - - // Iterate all series to collect tag values. - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e.SeriesID.IsZero() { - break - } - - buf := is.SeriesFile.SeriesKey(e.SeriesID) - if len(buf) == 0 { - continue - } - - _, buf = ReadSeriesKeyLen(buf) - _, buf = ReadSeriesKeyMeasurement(buf) - tagN, buf := ReadSeriesKeyTagN(buf) - for i := 0; i < tagN; i++ { - var key, value []byte - key, value, buf = ReadSeriesKeyTag(buf) - if valueExpr != nil { - if !influxql.EvalBool(valueExpr, map[string]interface{}{"value": string(value)}) { - continue - } - } - - if idx, ok := keyIdxs[string(key)]; ok { - resultSet[idx][string(value)] = struct{}{} - } else if string(key) > keys[len(keys)-1] { - // The tag key is > the largest key we're interested in. - break - } - } - } - return resultSet, nil -} - -// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. -func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - if len(keys) == 0 { - return nil, nil - } - - results := make([][]string, len(keys)) - // If the keys are not sorted, then sort them. - if !keysSorted { - sort.Strings(keys) - } - - release := is.SeriesFile.Retain() - defer release() - - // No expression means that the values shouldn't be filtered; so fetch them - // all. - if expr == nil { - for ki, key := range keys { - vitr, err := is.tagValueIterator(name, []byte(key)) - if err != nil { - return nil, err - } else if vitr == nil { - break - } - defer vitr.Close() - - for { - val, err := vitr.Next() - if err != nil { - return nil, err - } else if val == nil { - break - } - results[ki] = append(results[ki], string(val)) - } - } - return results, nil - } - - // This is the case where we have filtered series by some WHERE condition. - // We only care about the tag values for the keys given the - // filtered set of series ids. - resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr) - if err != nil { - return nil, err - } - - // Convert result sets into []string - for i, s := range resultSet { - values := make([]string, 0, len(s)) - for v := range s { - values = append(values, v) - } - sort.Strings(values) - results[i] = values - } - return results, nil -} - -// TagSets returns an ordered list of tag sets for a measurement by dimension -// and filtered by an optional conditional expression. -func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.measurementSeriesByExprIterator(name, opt.Condition) - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - // measurementSeriesByExprIterator filters deleted series IDs; no need to - // do so here. - - var dims []string - if len(opt.Dimensions) > 0 { - dims = make([]string, len(opt.Dimensions)) - copy(dims, opt.Dimensions) - sort.Strings(dims) - } - - // For every series, get the tag values for the requested tag keys i.e. - // dimensions. This is the TagSet for that series. Series with the same - // TagSet are then grouped together, because for the purpose of GROUP BY - // they are part of the same composite series. - tagSets := make(map[string]*query.TagSet, 64) - var seriesN, maxSeriesN int - - if opt.MaxSeriesN > 0 { - maxSeriesN = opt.MaxSeriesN - } else { - maxSeriesN = int(^uint(0) >> 1) - } - - // The tag sets require a string for each series key in the set, The series - // file formatted keys need to be parsed into models format. Since they will - // end up as strings we can re-use an intermediate buffer for this process. - var keyBuf []byte - var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration. - for { - se, err := itr.Next() - if err != nil { - return nil, err - } else if se.SeriesID.IsZero() { - break - } - - // Skip if the series has been tombstoned. - key := sfile.SeriesKey(se.SeriesID) - if len(key) == 0 { - continue - } - - if seriesN&0x3fff == 0x3fff { - // check every 16384 series if the query has been canceled - select { - case <-opt.InterruptCh: - return nil, query.ErrQueryInterrupted - default: - } - } - - if seriesN > maxSeriesN { - return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) - } - - // NOTE - must not escape this loop iteration. - _, tagsBuf = ParseSeriesKeyInto(key, tagsBuf) - var tagsAsKey []byte - if len(dims) > 0 { - tagsAsKey = MakeTagsKey(dims, tagsBuf) - } - - tagSet, ok := tagSets[string(tagsAsKey)] - if !ok { - // This TagSet is new, create a new entry for it. - tagSet = &query.TagSet{ - Tags: nil, - Key: tagsAsKey, - } - } - - // Associate the series and filter with the Tagset. - keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf) - tagSet.AddFilter(string(keyBuf), se.Expr) - keyBuf = keyBuf[:0] - - // Ensure it's back in the map. - tagSets[string(tagsAsKey)] = tagSet - seriesN++ - } - - // Sort the series in each tag set. - for _, t := range tagSets { - sort.Sort(t) - } - - // The TagSets have been created, as a map of TagSets. Just send - // the values back as a slice, sorting for consistency. - sortedTagsSets := make([]*query.TagSet, 0, len(tagSets)) - for _, v := range tagSets { - sortedTagsSets = append(sortedTagsSets, v) - } - sort.Sort(byTagKey(sortedTagsSets)) - - return sortedTagsSets, nil -} - // IndexFormat represents the format for an index. type IndexFormat int From c89c79dc024cdca53dd396dc0bb339dfbb53250b Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Tue, 23 Oct 2018 17:49:15 +0800 Subject: [PATCH 05/11] replace tsdb.Index interface with tsi1.Index instance This fix is to remove tsdb.Index interface to resolve #886. --- storage/engine.go | 2 +- tsdb/engine.go | 5 +++-- tsdb/index.go | 2 ++ tsdb/shard.go | 12 ++++-------- tsdb/tsm1/engine.go | 4 ++-- tsdb/tsm1/engine_test.go | 4 ++-- 6 files changed, 14 insertions(+), 15 deletions(-) diff --git a/storage/engine.go b/storage/engine.go index d88ad97cc2..fe24352c07 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -109,7 +109,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { // Initialise Engine // TODO(edd): should just be able to use the config values for data/wal. - engine := tsm1.NewEngine(0, tsdb.Index(e.index), filepath.Join(path, "data"), filepath.Join(path, "wal"), e.sfile, c.EngineOptions) + engine := tsm1.NewEngine(0, e.index, filepath.Join(path, "data"), filepath.Join(path, "wal"), e.sfile, c.EngineOptions) // TODO(edd): Once the tsdb.Engine abstraction is gone, this won't be needed. e.engine = engine.(*tsm1.Engine) diff --git a/tsdb/engine.go b/tsdb/engine.go index e29cec7f8a..8eaa34d1a9 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/platform/pkg/estimator" "github.com/influxdata/platform/pkg/limiter" "go.uber.org/zap" + "github.com/influxdata/platform/tsdb/tsi1" ) var ( @@ -80,7 +81,7 @@ type SeriesIDSets interface { } // NewEngineFunc creates a new engine. -type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine +type NewEngineFunc func(id uint64, i *tsi1.Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine // newEngineFuncs is a lookup of engine constructors by name. var newEngineFuncs = make(map[string]NewEngineFunc) @@ -105,7 +106,7 @@ func RegisteredEngines() []string { // NewEngine returns an instance of an engine based on its format. // If the path does not exist then the DefaultFormat is used. -func NewEngine(id uint64, i Index, path string, sfile *SeriesFile, options EngineOptions) (Engine, error) { +func NewEngine(id uint64, i *tsi1.Index, path string, sfile *SeriesFile, options EngineOptions) (Engine, error) { // Create a new engine if _, err := os.Stat(path); os.IsNotExist(err) { // TODO(jeff): remove walPath argument diff --git a/tsdb/index.go b/tsdb/index.go index 6f516b7641..fad9e18f93 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -1189,6 +1189,7 @@ const ( TSI1Format IndexFormat = 2 ) +// TODO(@zlc): remove // NewIndexFunc creates a new index. type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index @@ -1209,6 +1210,7 @@ func RegisteredIndexes() []string { return []string{TSI1IndexName} } +// TODO(@zlc): remove // NewIndex returns an instance of an index based on its format. // If the path does not exist then the DefaultFormat is used. func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) { diff --git a/tsdb/shard.go b/tsdb/shard.go index 815fc1637a..9867869605 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/platform/models" "go.uber.org/zap" + "github.com/influxdata/platform/tsdb/tsi1" ) // SeriesFileDirectory is the name of the directory containing series files for @@ -32,7 +33,7 @@ type Shard struct { mu sync.RWMutex _engine Engine - index Index + index *tsi1.Index enabled bool baseLogger *zap.Logger @@ -105,15 +106,10 @@ func (s *Shard) Open() error { return nil } - seriesIDSet := NewSeriesIDSet() - // Initialize underlying index. ipath := filepath.Join(s.path, "index") - idx, err := NewIndex(s.id, "remove-me", ipath, seriesIDSet, s.sfile, s.options) - if err != nil { - return err - } + idx := tsi1.NewIndex(s.sfile, "remove me", tsi1.NewConfig(), tsi1.WithPath(filepath.Join(ipath, tsi1.DefaultIndexDirectoryName))) idx.WithLogger(s.baseLogger) // Open index. @@ -204,7 +200,7 @@ func (s *Shard) engineNoLock() (Engine, error) { // Index returns a reference to the underlying index. It returns an error if // the index is nil. -func (s *Shard) Index() (Index, error) { +func (s *Shard) Index() (*tsi1.Index, error) { s.mu.RLock() defer s.mu.RUnlock() if err := s.ready(); err != nil { diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 9add48afc2..b4985e0bc2 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -121,7 +121,7 @@ const ( type Engine struct { mu sync.RWMutex - index tsdb.Index + index *tsi1.Index // The following group of fields is used to track the state of level compactions within the // Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is @@ -177,7 +177,7 @@ type Engine struct { } // NewEngine returns a new instance of Engine. -func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine { +func NewEngine(id uint64, idx *tsi1.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine { fs := NewFileStore(path) fs.openLimiter = opt.OpenLimiter if opt.FileStoreObserver != nil { diff --git a/tsdb/tsm1/engine_test.go b/tsdb/tsm1/engine_test.go index df48803bdc..b441052b8c 100644 --- a/tsdb/tsm1/engine_test.go +++ b/tsdb/tsm1/engine_test.go @@ -1497,7 +1497,7 @@ type Engine struct { *tsm1.Engine root string indexPath string - index tsdb.Index + index *tsi1.Index sfile *tsdb.SeriesFile } @@ -1669,7 +1669,7 @@ func (e *Engine) MustWriteSnapshot() { } } -func MustOpenIndex(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, options tsdb.EngineOptions) tsdb.Index { +func MustOpenIndex(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, options tsdb.EngineOptions) *tsi1.Index { idx := tsi1.NewIndex(sfile, database, tsi1.NewConfig(), tsi1.WithPath(path)) if err := idx.Open(); err != nil { panic(err) From 0e9185f764d4880931bc6454d5e7abdacc849dff Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Tue, 23 Oct 2018 17:56:14 +0800 Subject: [PATCH 06/11] remove tsdb.Index interface This fix is to resolve #886. --- tsdb/index.go | 90 --------------------------------------------------- 1 file changed, 90 deletions(-) diff --git a/tsdb/index.go b/tsdb/index.go index fad9e18f93..2b778812f0 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -4,68 +4,18 @@ import ( "bytes" "errors" "fmt" - "regexp" "sort" "sync" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" "github.com/influxdata/platform/models" - "github.com/influxdata/platform/pkg/estimator" - "go.uber.org/zap" "github.com/influxdata/platform/tsdb/tsi1" ) // Available index types. const TSI1IndexName = "tsi1" -type Index interface { - Open() error // used by this package - Close() error // used by this package - WithLogger(*zap.Logger) // used by this package - - Database() string // used by this package - MeasurementExists(name []byte) (bool, error) // used by engine - MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) // used by engine - ForEachMeasurementName(fn func(name []byte) error) error // used by engine - - InitializeSeries(*SeriesCollection) error // used by engine - CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error // used by engine - CreateSeriesListIfNotExists(*SeriesCollection) error // used by engine - DropSeries(seriesID SeriesID, key []byte, cascade bool) error // used by engine - DropMeasurementIfSeriesNotExist(name []byte) error // used by engine - - MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) // used by engine - SeriesN() int64 // used by engine - SeriesSketches() (estimator.Sketch, estimator.Sketch, error) // used by engine - SeriesIDSet() *SeriesIDSet // used by idpe - - HasTagKey(name, key []byte) (bool, error) // used by this package - HasTagValue(name, key, value []byte) (bool, error) // used by this package - - MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) // used by this package - - TagKeyCardinality(name, key []byte) int // used by engine - - // InfluxQL system iterators - MeasurementIterator() (MeasurementIterator, error) // used by this package - TagKeyIterator(name []byte) (TagKeyIterator, error) // used by this package - TagValueIterator(name, key []byte) (TagValueIterator, error) // used by this package - MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) // used by this package - TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) // used by this package - TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) // used by this package - - // To be removed w/ tsi1. - SetFieldName(measurement []byte, name string) // used by this package - - Type() string // used by this package - // Returns a unique reference ID to the index instance. - // For inmem, returns a reference to the backing Index, not ShardIndex. - UniqueReferenceID() uintptr // used by this package - - Rebuild() // used by engine -} - // SeriesElem represents a generic series element. type SeriesElem interface { Name() []byte @@ -1178,52 +1128,12 @@ func (itr *tagValueMergeIterator) Next() (_ []byte, err error) { return value, nil } -// IndexFormat represents the format for an index. -type IndexFormat int - -const ( - // InMemFormat is the format used by the original in-memory shared index. - InMemFormat IndexFormat = 1 - - // TSI1Format is the format used by the tsi1 index. - TSI1Format IndexFormat = 2 -) - -// TODO(@zlc): remove -// NewIndexFunc creates a new index. -type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index - -// newIndexFuncs is a lookup of index constructors by name. -var newIndexFuncs = make(map[string]NewIndexFunc) - -// RegisterIndex registers a storage index initializer by name. -func RegisterIndex(name string, fn NewIndexFunc) { - if _, ok := newIndexFuncs[name]; ok { - panic("index already registered: " + name) - } - newIndexFuncs[name] = fn -} - // RegisteredIndexes returns the slice of currently registered indexes. func RegisteredIndexes() []string { // TODO(edd): This can be removed, cleaning up test code in the process. return []string{TSI1IndexName} } -// TODO(@zlc): remove -// NewIndex returns an instance of an index based on its format. -// If the path does not exist then the DefaultFormat is used. -func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) { - // Lookup index by format. - fn := newIndexFuncs[TSI1IndexName] - if fn == nil { - return nil, fmt.Errorf("invalid index format: %q", TSI1IndexName) - } - - // TODO(jeff): remove database argument - return fn(id, "remove-me", path, seriesIDSet, sfile, options), nil -} - // assert will panic with a given formatted message if the given condition is false. func assert(condition bool, msg string, v ...interface{}) { if !condition { From 5d66bbed4801f61475b71d017f70e7beb17a5244 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Sat, 27 Oct 2018 00:52:31 +0800 Subject: [PATCH 07/11] remove functions for registering engine This fix is to resolve import cycle --- tsdb/config.go | 11 -------- tsdb/engine.go | 65 --------------------------------------------- tsdb/tsm1/engine.go | 4 --- 3 files changed, 80 deletions(-) diff --git a/tsdb/config.go b/tsdb/config.go index 83f8728084..4d2eb4b06d 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -105,17 +105,6 @@ func (c *Config) Validate() error { } valid := false - for _, e := range RegisteredEngines() { - if e == c.Engine { - valid = true - break - } - } - if !valid { - return fmt.Errorf("unrecognized engine %s", c.Engine) - } - - valid = false for _, e := range RegisteredIndexes() { if e == c.Index { valid = true diff --git a/tsdb/engine.go b/tsdb/engine.go index 8eaa34d1a9..5cb2469a25 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -3,12 +3,9 @@ package tsdb import ( "context" "errors" - "fmt" "io" - "os" "regexp" "runtime" - "sort" "time" "github.com/influxdata/influxql" @@ -16,7 +13,6 @@ import ( "github.com/influxdata/platform/pkg/estimator" "github.com/influxdata/platform/pkg/limiter" "go.uber.org/zap" - "github.com/influxdata/platform/tsdb/tsi1" ) var ( @@ -80,67 +76,6 @@ type SeriesIDSets interface { ForEach(f func(ids *SeriesIDSet)) error } -// NewEngineFunc creates a new engine. -type NewEngineFunc func(id uint64, i *tsi1.Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine - -// newEngineFuncs is a lookup of engine constructors by name. -var newEngineFuncs = make(map[string]NewEngineFunc) - -// RegisterEngine registers a storage engine initializer by name. -func RegisterEngine(name string, fn NewEngineFunc) { - if _, ok := newEngineFuncs[name]; ok { - panic("engine already registered: " + name) - } - newEngineFuncs[name] = fn -} - -// RegisteredEngines returns the slice of currently registered engines. -func RegisteredEngines() []string { - a := make([]string, 0, len(newEngineFuncs)) - for k := range newEngineFuncs { - a = append(a, k) - } - sort.Strings(a) - return a -} - -// NewEngine returns an instance of an engine based on its format. -// If the path does not exist then the DefaultFormat is used. -func NewEngine(id uint64, i *tsi1.Index, path string, sfile *SeriesFile, options EngineOptions) (Engine, error) { - // Create a new engine - if _, err := os.Stat(path); os.IsNotExist(err) { - // TODO(jeff): remove walPath argument - engine := newEngineFuncs[options.EngineVersion](id, i, path, "", sfile, options) - if options.OnNewEngine != nil { - options.OnNewEngine(engine) - } - return engine, nil - } - - // If it's a dir then it's a tsm1 engine - format := DefaultEngine - if fi, err := os.Stat(path); err != nil { - return nil, err - } else if !fi.Mode().IsDir() { - return nil, ErrUnknownEngineFormat - } else { - format = "tsm1" - } - - // Lookup engine by format. - fn := newEngineFuncs[format] - if fn == nil { - return nil, fmt.Errorf("invalid engine format: %q", format) - } - - // TODO(jeff): remove walPath argument - engine := fn(id, i, path, "", sfile, options) - if options.OnNewEngine != nil { - options.OnNewEngine(engine) - } - return engine, nil -} - // EngineOptions represents the options used to initialize the engine. type EngineOptions struct { EngineVersion string diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index b4985e0bc2..2d4ee43607 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -40,10 +40,6 @@ import ( //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl -func init() { - tsdb.RegisterEngine("tsm1", NewEngine) -} - var ( // Ensure Engine implements the interface. _ tsdb.Engine = &Engine{} From 9d29874e20414d0f38cc1f5c7b9638b34d25cebd Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Sat, 27 Oct 2018 01:33:31 +0800 Subject: [PATCH 08/11] move SeriesFileDirectory constant to defaults package --- cmd/influx_inspect/buildtsi/buildtsi.go | 4 ++-- storage/engine.go | 2 +- tsdb/config.go | 1 + tsdb/defaults/defaults.go | 5 +++++ tsdb/index_test.go | 2 +- tsdb/shard.go | 4 ---- tsdb/tsm1/engine_test.go | 2 +- 7 files changed, 11 insertions(+), 9 deletions(-) diff --git a/cmd/influx_inspect/buildtsi/buildtsi.go b/cmd/influx_inspect/buildtsi/buildtsi.go index 8981ce4290..954125c2aa 100644 --- a/cmd/influx_inspect/buildtsi/buildtsi.go +++ b/cmd/influx_inspect/buildtsi/buildtsi.go @@ -115,7 +115,7 @@ func (cmd *Command) run(dataDir, walDir string) error { func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error { cmd.Logger.Info("Rebuilding database", zap.String("name", dbName)) - sfile := tsdb.NewSeriesFile(filepath.Join(dataDir, tsdb.SeriesFileDirectory)) + sfile := tsdb.NewSeriesFile(filepath.Join(dataDir, tsdb.DefaultSeriesFileDirectory)) sfile.Logger = cmd.Logger if err := sfile.Open(); err != nil { return err @@ -131,7 +131,7 @@ func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error { rpName := fi.Name() if !fi.IsDir() { continue - } else if rpName == tsdb.SeriesFileDirectory { + } else if rpName == tsdb.DefaultSeriesFileDirectory { continue } else if cmd.retentionFilter != "" && rpName != cmd.retentionFilter { continue diff --git a/storage/engine.go b/storage/engine.go index fe24352c07..a24ab79d30 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -97,7 +97,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { e := &Engine{ config: c, path: path, - sfile: tsdb.NewSeriesFile(filepath.Join(path, tsdb.SeriesFileDirectory)), + sfile: tsdb.NewSeriesFile(filepath.Join(path, tsdb.DefaultSeriesFileDirectory)), logger: zap.NewNop(), } diff --git a/tsdb/config.go b/tsdb/config.go index 4d2eb4b06d..2d72161519 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -26,6 +26,7 @@ const ( // See the defaults package for explanations of what these mean DefaultCompactThroughputBurst = defaults.DefaultCompactThroughputBurst DefaultMaxPointsPerBlock = defaults.DefaultMaxPointsPerBlock DefaultMaxConcurrentCompactions = defaults.DefaultMaxConcurrentCompactions + DefaultSeriesFileDirectory = defaults.DefaultSeriesFileDirectory ) // Config holds the configuration for the tsbd package. diff --git a/tsdb/defaults/defaults.go b/tsdb/defaults/defaults.go index ac97d2f7da..67aea4b86b 100644 --- a/tsdb/defaults/defaults.go +++ b/tsdb/defaults/defaults.go @@ -48,4 +48,9 @@ const ( // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions // that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. DefaultMaxConcurrentCompactions = 0 + + // DefaultSeriesFileDirectory is the name of the directory containing series files for + // a database. + DefaultSeriesFileDirectory = "_series" + ) diff --git a/tsdb/index_test.go b/tsdb/index_test.go index 3705b6aac0..09394999eb 100644 --- a/tsdb/index_test.go +++ b/tsdb/index_test.go @@ -80,7 +80,7 @@ func MustNewIndex(c tsi1.Config) *Index { panic(err) } - seriesPath, err := ioutil.TempDir(rootPath, tsdb.SeriesFileDirectory) + seriesPath, err := ioutil.TempDir(rootPath, tsdb.DefaultSeriesFileDirectory) if err != nil { panic(err) } diff --git a/tsdb/shard.go b/tsdb/shard.go index 9867869605..f418498fb2 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -11,10 +11,6 @@ import ( "github.com/influxdata/platform/tsdb/tsi1" ) -// SeriesFileDirectory is the name of the directory containing series files for -// a database. -const SeriesFileDirectory = "_series" - var ( // Static objects to prevent small allocs. timeBytes = []byte("time") diff --git a/tsdb/tsm1/engine_test.go b/tsdb/tsm1/engine_test.go index b441052b8c..cf6bbd47a0 100644 --- a/tsdb/tsm1/engine_test.go +++ b/tsdb/tsm1/engine_test.go @@ -1516,7 +1516,7 @@ func NewEngine() (*Engine, error) { } // Setup series file. - sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory)) + sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.DefaultSeriesFileDirectory)) sfile.Logger = logger.New(os.Stdout) if err = sfile.Open(); err != nil { return nil, err From f6104a7e78a39f9dc5a4221b5b47a5e766b63490 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Sat, 27 Oct 2018 01:39:36 +0800 Subject: [PATCH 09/11] remove unused Shard --- tsdb/shard.go | 297 -------------------------------------------------- 1 file changed, 297 deletions(-) delete mode 100644 tsdb/shard.go diff --git a/tsdb/shard.go b/tsdb/shard.go deleted file mode 100644 index f418498fb2..0000000000 --- a/tsdb/shard.go +++ /dev/null @@ -1,297 +0,0 @@ -package tsdb - -import ( - "context" - "fmt" - "path/filepath" - "sync" - - "github.com/influxdata/platform/models" - "go.uber.org/zap" - "github.com/influxdata/platform/tsdb/tsi1" -) - -var ( - // Static objects to prevent small allocs. - timeBytes = []byte("time") -) - -// Shard represents a self-contained time series database. An inverted index of -// the measurement and tag data is kept along with the raw time series data. -// Data can be split across many shards. The query engine in TSDB is responsible -// for combining the output of many shards into a single query result. -type Shard struct { - path string - id uint64 - - sfile *SeriesFile - options EngineOptions - - mu sync.RWMutex - _engine Engine - index *tsi1.Index - enabled bool - - baseLogger *zap.Logger - logger *zap.Logger - - EnableOnOpen bool - - // CompactionDisabled specifies the shard should not schedule compactions. - // This option is intended for offline tooling. - CompactionDisabled bool -} - -// NewShard returns a new initialized Shard. -func NewShard(id uint64, path string, sfile *SeriesFile, opt EngineOptions) *Shard { - logger := zap.NewNop() - - s := &Shard{ - id: id, - path: path, - sfile: sfile, - options: opt, - - logger: logger, - baseLogger: logger, - EnableOnOpen: true, - } - return s -} - -// WithLogger sets the logger on the shard. It must be called before Open. -func (s *Shard) WithLogger(log *zap.Logger) { - s.baseLogger = log - engine, err := s.Engine() - if err == nil { - engine.WithLogger(s.baseLogger) - s.index.WithLogger(s.baseLogger) - } - s.logger = s.baseLogger.With(zap.String("service", "shard")) -} - -// SetEnabled enables the shard for queries and write. When disabled, all -// writes and queries return an error and compactions are stopped for the shard. -func (s *Shard) SetEnabled(enabled bool) { - s.mu.Lock() - // Prevent writes and queries - s.enabled = enabled - if s._engine != nil && !s.CompactionDisabled { - // Disable background compactions and snapshotting - s._engine.SetEnabled(enabled) - } - s.mu.Unlock() -} - -// ID returns the shards ID. -func (s *Shard) ID() uint64 { - return s.id -} - -// Path returns the path set on the shard when it was created. -func (s *Shard) Path() string { return s.path } - -// Open initializes and opens the shard's store. -func (s *Shard) Open() error { - if err := func() error { - s.mu.Lock() - defer s.mu.Unlock() - - // Return if the shard is already open - if s._engine != nil { - return nil - } - - // Initialize underlying index. - ipath := filepath.Join(s.path, "index") - - idx := tsi1.NewIndex(s.sfile, "remove me", tsi1.NewConfig(), tsi1.WithPath(filepath.Join(ipath, tsi1.DefaultIndexDirectoryName))) - idx.WithLogger(s.baseLogger) - - // Open index. - if err := idx.Open(); err != nil { - return err - } - s.index = idx - - // Initialize underlying engine. - e, err := NewEngine(s.id, idx, s.path, s.sfile, s.options) - if err != nil { - return err - } - - // Set log output on the engine. - e.WithLogger(s.baseLogger) - - // Disable compactions while loading the index - e.SetEnabled(false) - - // Open engine. - if err := e.Open(); err != nil { - return err - } - - s._engine = e - - return nil - }(); err != nil { - s.close() - return NewShardError(s.id, err) - } - - if s.EnableOnOpen { - // enable writes, queries and compactions - s.SetEnabled(true) - } - - return nil -} - -// Close shuts down the shard's store. -func (s *Shard) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - return s.close() -} - -// close closes the shard an removes reference to the shard from associated -// indexes, unless clean is false. -func (s *Shard) close() error { - if s._engine == nil { - return nil - } - - err := s._engine.Close() - if err == nil { - s._engine = nil - } - - if e := s.index.Close(); e == nil { - s.index = nil - } - return err -} - -// ready determines if the Shard is ready for queries or writes. -// It returns nil if ready, otherwise ErrShardClosed or ErrShardDisabled -func (s *Shard) ready() error { - return nil // TODO(edd)remove -} - -// Engine returns a reference to the currently loaded engine. -func (s *Shard) Engine() (Engine, error) { - s.mu.RLock() - defer s.mu.RUnlock() - return s.engineNoLock() -} - -// engineNoLock is similar to calling Engine(), but the caller must guarantee -// that they already hold an appropriate lock. -func (s *Shard) engineNoLock() (Engine, error) { - if err := s.ready(); err != nil { - return nil, err - } - return s._engine, nil -} - -// Index returns a reference to the underlying index. It returns an error if -// the index is nil. -func (s *Shard) Index() (*tsi1.Index, error) { - s.mu.RLock() - defer s.mu.RUnlock() - if err := s.ready(); err != nil { - return nil, err - } - return s.index, nil -} - -// WritePoints will write the raw data points and any new metadata to the index in the shard. -func (s *Shard) WritePoints(points []models.Point) error { - collection := NewSeriesCollection(points) - - j := 0 - for iter := collection.Iterator(); iter.Next(); { - tags := iter.Tags() - - // Filter out any tags with key equal to "time": they are invalid. - if tags.Get(timeBytes) != nil { - if collection.Reason == "" { - collection.Reason = fmt.Sprintf( - "invalid tag key: input tag %q on measurement %q is invalid", - timeBytes, iter.Name()) - } - collection.Dropped++ - collection.DroppedKeys = append(collection.DroppedKeys, iter.Key()) - continue - } - - // Drop any series with invalid unicode characters in the key. - if s.options.Config.ValidateKeys && !models.ValidKeyTokens(string(iter.Name()), tags) { - if collection.Reason == "" { - collection.Reason = fmt.Sprintf( - "key contains invalid unicode: %q", - iter.Key()) - } - collection.Dropped++ - collection.DroppedKeys = append(collection.DroppedKeys, iter.Key()) - continue - } - - // TODO(jeff): do we have to filter entries where the only field is "time"? - - collection.Copy(j, iter.Index()) - j++ - } - collection.Truncate(j) - - s.mu.RLock() - defer s.mu.RUnlock() - - engine, err := s.engineNoLock() - if err != nil { - return err - } - - // make sure the series exist - if err := engine.CreateSeriesListIfNotExists(collection); err != nil { - // ignore PartialWriteErrors. The collection captures it. - if _, ok := err.(PartialWriteError); !ok { - return err - } - } - - // Write to the engine. - if err := engine.WritePoints(collection.Points); err != nil { - return fmt.Errorf("engine: %s", err) - } - - return collection.PartialWriteError() -} - -// DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive) -// for which predicate() returns true. If predicate() is nil, then all values in range are deleted. -func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error { - engine, err := s.Engine() - if err != nil { - return err - } - return engine.DeleteSeriesRangeWithPredicate(itr, predicate) -} - -// SeriesN returns the unique number of series in the shard. -func (s *Shard) SeriesN() int64 { - engine, err := s.Engine() - if err != nil { - return 0 - } - return engine.SeriesN() -} - -// CreateCursorIterator creates a CursorIterator for the shard. -func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) { - engine, err := s.Engine() - if err != nil { - return nil, err - } - return engine.CreateCursorIterator(ctx) -} From 268832ee6489e227c5b0da31b61bede40a63e2e0 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Sat, 27 Oct 2018 01:49:37 +0800 Subject: [PATCH 10/11] remove unused seriesPointIterator --- tsdb/index.go | 137 -------------------------------------------------- 1 file changed, 137 deletions(-) diff --git a/tsdb/index.go b/tsdb/index.go index 2b778812f0..d7a1d1159c 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -2,15 +2,12 @@ package tsdb import ( "bytes" - "errors" "fmt" - "sort" "sync" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" "github.com/influxdata/platform/models" - "github.com/influxdata/platform/tsdb/tsi1" ) // Available index types. @@ -678,140 +675,6 @@ func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) { } } -// seriesPointIterator adapts SeriesIterator to an influxql.Iterator. -type seriesPointIterator struct { - once sync.Once - index *tsi1.Index - mitr MeasurementIterator - keys [][]byte - opt query.IteratorOptions - - point query.FloatPoint // reusable point -} - -// newSeriesPointIterator returns a new instance of seriesPointIterator. -func NewSeriesPointIterator(index *tsi1.Index, opt query.IteratorOptions) (_ query.Iterator, err error) { - // Only equality operators are allowed. - influxql.WalkFunc(opt.Condition, func(n influxql.Node) { - switch n := n.(type) { - case *influxql.BinaryExpr: - switch n.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, - influxql.OR, influxql.AND: - default: - err = errors.New("invalid tag comparison operator") - } - } - }) - if err != nil { - return nil, err - } - - mitr, err := index.MeasurementIterator() - if err != nil { - return nil, err - } - - return &seriesPointIterator{ - index: index, - mitr: mitr, - point: query.FloatPoint{ - Aux: make([]interface{}, len(opt.Aux)), - }, - opt: opt, - }, nil -} - -// Stats returns stats about the points processed. -func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.IteratorStats{} } - -// Close closes the iterator. -func (itr *seriesPointIterator) Close() (err error) { - itr.once.Do(func() { - if itr.mitr != nil { - err = itr.mitr.Close() - } - }) - return err -} - -// Next emits the next point in the iterator. -func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { - for { - // Read series keys for next measurement if no more keys remaining. - // Exit if there are no measurements remaining. - if len(itr.keys) == 0 { - m, err := itr.mitr.Next() - if err != nil { - return nil, err - } else if m == nil { - return nil, nil - } - - if err := itr.readSeriesKeys(m); err != nil { - return nil, err - } - continue - } - - name, tags := ParseSeriesKey(itr.keys[0]) - itr.keys = itr.keys[1:] - - // Convert to a key. - key := string(models.MakeKey(name, tags)) - - // Write auxiliary fields. - for i, f := range itr.opt.Aux { - switch f.Val { - case "key": - itr.point.Aux[i] = key - } - } - - return &itr.point, nil - } -} - -func (itr *seriesPointIterator) readSeriesKeys(name []byte) error { - sitr, err := itr.index.MeasurementSeriesByExprIterator(name, itr.opt.Condition) - if err != nil { - return err - } else if sitr == nil { - return nil - } - defer sitr.Close() - - // Slurp all series keys. - itr.keys = itr.keys[:0] - for i := 0; ; i++ { - elem, err := sitr.Next() - if err != nil { - return err - } else if elem.SeriesID.IsZero() { - break - } - - // Periodically check for interrupt. - if i&0xFF == 0xFF { - select { - case <-itr.opt.InterruptCh: - return itr.Close() - default: - } - } - - key := itr.index.SeriesFile().SeriesKey(elem.SeriesID) - if len(key) == 0 { - continue - } - itr.keys = append(itr.keys, key) - } - - // Sort keys. - sort.Sort(seriesKeys(itr.keys)) - return nil -} - // MeasurementIterator represents a iterator over a list of measurements. type MeasurementIterator interface { Close() error From 1dd0d33b1eec3188d9efc440ec83600d94b549cd Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Sat, 27 Oct 2018 01:57:18 +0800 Subject: [PATCH 11/11] fix type assertion err --- tsdb/tsm1/engine.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 2d4ee43607..1b07f28a92 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -1189,17 +1189,16 @@ func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, predica // Ensure that the index does not compact away the measurement or series we're // going to delete before we're done with them. - if tsiIndex, ok := e.index.(*tsi1.Index); ok { - tsiIndex.DisableCompactions() - defer tsiIndex.EnableCompactions() - tsiIndex.Wait() + e.index.DisableCompactions() + defer e.index.EnableCompactions() + e.index.Wait() - fs, err := tsiIndex.RetainFileSet() - if err != nil { - return err - } - defer fs.Release() + fs, err := e.index.RetainFileSet() + if err != nil { + return err } + defer fs.Release() + var ( sz int