From 183418dcbdd7d725ffb8c5ed4fc23c77d970b470 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 6 Dec 2016 10:30:41 -0700 Subject: [PATCH] Fix tsi TAG KEYS iterator. --- tsdb/engine.go | 2 +- tsdb/engine/tsm1/engine.go | 2 +- tsdb/index.go | 2 +- tsdb/index/inmem/inmem.go | 17 +----- tsdb/index/tsi1/index.go | 113 ++++++++++++++++++++++++++++++++++++- tsdb/meta.go | 63 ++++++++++----------- tsdb/store.go | 16 +++--- 7 files changed, 152 insertions(+), 63 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index 965d4dc6c6..fa6ad12604 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -55,7 +55,7 @@ type Engine interface { DeleteMeasurement(name []byte) error // TagKeys(name []byte) ([][]byte, error) - MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) + MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error // InfluxQL iterators diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index bf87675c02..43abf3dc7c 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -307,7 +307,7 @@ func (e *Engine) ForEachMeasurementSeriesByExpr(name []byte, condition influxql. return e.index.ForEachMeasurementSeriesByExpr(name, condition, fn) } -func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { +func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { return e.index.MeasurementTagKeysByExpr(name, expr) } diff --git a/tsdb/index.go b/tsdb/index.go index 0fc7b122d0..cb2dc47b60 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -30,7 +30,7 @@ type Index interface { Dereference(b []byte) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) - MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) + MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error // InfluxQL system iterators diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index d2440cc7d2..d6b137f083 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -209,7 +209,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name string) *tsdb.Measurement } // MeasurementTagKeyByExpr returns an ordered set of tag keys filtered by an expression. -func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { +func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { i.mu.RLock() defer i.mu.RUnlock() @@ -217,20 +217,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]b if mm == nil { return nil, nil } - - keySet, _, err := mm.TagKeysByExpr(expr) - if err != nil { - return nil, err - } - - // Convert string set to byte slice list. - a := make([][]byte, 0, len(keySet)) - for key := range keySet { - a = append(a, []byte(key)) - } - bytesutil.Sort(a) - - return a, nil + return mm.TagKeysByExpr(expr) } // ForEachMeasurementTagKey iterates over all tag keys for a measurement. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index ee06d7029e..e66df0182a 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -523,9 +523,89 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro // Dereference is a nop. func (i *Index) Dereference([]byte) {} -// MeasurementTagKeyByExpr returns an ordered set of tag keys filtered by an expression. -func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - panic("TODO") +// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. +func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } else if tag.Val != "_tagKey" { + return nil, nil + } + + if influxql.IsRegexOp(e.Op) { + re, ok := e.RHS.(*influxql.RegexLiteral) + if !ok { + return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + } + return i.tagKeysByFilter(name, e.Op, nil, re.Val), nil + } + + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + return i.tagKeysByFilter(name, e.Op, []byte(s.Val), nil), nil + + case influxql.AND, influxql.OR: + lhs, err := i.MeasurementTagKeysByExpr(name, e.LHS) + if err != nil { + return nil, err + } + + rhs, err := i.MeasurementTagKeysByExpr(name, e.RHS) + if err != nil { + return nil, err + } + + if lhs != nil && rhs != nil { + if e.Op == influxql.OR { + return unionStringSets(lhs, rhs), nil + } + return intersectStringSets(lhs, rhs), nil + } else if lhs != nil { + return lhs, nil + } else if rhs != nil { + return rhs, nil + } + return nil, nil + default: + return nil, fmt.Errorf("invalid operator") + } + + case *influxql.ParenExpr: + return i.MeasurementTagKeysByExpr(name, e.Expr) + } + + return nil, fmt.Errorf("%#v", expr) +} + +// tagKeysByFilter will filter the tag keys for the measurement. +func (i *Index) tagKeysByFilter(name []byte, op influxql.Token, val []byte, regex *regexp.Regexp) map[string]struct{} { + ss := make(map[string]struct{}) + itr := i.TagKeyIterator(name) + for e := itr.Next(); e != nil; e = itr.Next() { + var matched bool + switch op { + case influxql.EQ: + matched = bytes.Equal(e.Key(), val) + case influxql.NEQ: + matched = !bytes.Equal(e.Key(), val) + case influxql.EQREGEX: + matched = regex.Match(e.Key()) + case influxql.NEQREGEX: + matched = !regex.Match(e.Key()) + } + + if !matched { + continue + } + ss[string(e.Key())] = struct{}{} + } + return ss } // TagKeySeriesIterator returns a series iterator for all values across a single key. @@ -1051,3 +1131,30 @@ func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) { return &itr.point, nil } } + +// unionStringSets returns the union of two sets +func unionStringSets(a, b map[string]struct{}) map[string]struct{} { + other := make(map[string]struct{}) + for k := range a { + other[k] = struct{}{} + } + for k := range b { + other[k] = struct{}{} + } + return other +} + +// intersectStringSets returns the intersection of two sets. +func intersectStringSets(a, b map[string]struct{}) map[string]struct{} { + if len(a) < len(b) { + a, b = b, a + } + + other := make(map[string]struct{}) + for k := range a { + if _, ok := b[k]; ok { + other[k] = struct{}{} + } + } + return other +} diff --git a/tsdb/meta.go b/tsdb/meta.go index c512df98d3..9f03cc1516 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -812,68 +812,63 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error } // tagKeysByExpr extracts the tag keys wanted by the expression. -func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { +func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) { switch e := expr.(type) { case *influxql.BinaryExpr: switch e.Op { case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: tag, ok := e.LHS.(*influxql.VarRef) if !ok { - return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) - } - - if tag.Val != "_tagKey" { - return nil, false, nil - } - - tf := TagFilter{ - Op: e.Op, + return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } else if tag.Val != "_tagKey" { + return nil, nil } if influxql.IsRegexOp(e.Op) { re, ok := e.RHS.(*influxql.RegexLiteral) if !ok { - return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) } - tf.Regex = re.Val - } else { - s, ok := e.RHS.(*influxql.StringLiteral) - if !ok { - return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) - } - tf.Value = s.Val + return m.tagKeysByFilter(e.Op, "", re.Val), nil } - return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil + + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + return m.tagKeysByFilter(e.Op, s.Val, nil), nil + case influxql.AND, influxql.OR: - lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS) + lhs, err := m.TagKeysByExpr(e.LHS) if err != nil { - return nil, false, err + return nil, err } - rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS) + rhs, err := m.TagKeysByExpr(e.RHS) if err != nil { - return nil, false, err + return nil, err } - if lhsOk && rhsOk { + if lhs != nil && rhs != nil { if e.Op == influxql.OR { - return lhsKeys.union(rhsKeys), true, nil + return stringSet(lhs).union(rhs), nil } - - return lhsKeys.intersect(rhsKeys), true, nil - } else if lhsOk { - return lhsKeys, true, nil - } else if rhsOk { - return rhsKeys, true, nil + return stringSet(lhs).intersect(rhs), nil + } else if lhs != nil { + return lhs, nil + } else if rhs != nil { + return rhs, nil } - return nil, false, nil + return nil, nil default: - return nil, false, fmt.Errorf("invalid operator") + return nil, fmt.Errorf("invalid operator") } + case *influxql.ParenExpr: return m.TagKeysByExpr(e.Expr) } - return nil, false, fmt.Errorf("%#v", expr) + + return nil, fmt.Errorf("%#v", expr) } // tagKeysByFilter will filter the tag keys for the measurement. diff --git a/tsdb/store.go b/tsdb/store.go index 8071996518..4bda5a84c1 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -903,19 +903,19 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err } for _, name := range names { - /* - // Determine a list of keys from condition. - keySet, err := sh.engine.MeasurementTagKeysByExpr(name, cond) - if err != nil { - return nil, err - } - */ + // Determine a list of keys from condition. + keySet, err := sh.engine.MeasurementTagKeysByExpr(name, cond) + if err != nil { + return nil, err + } // Loop over all keys for each series. m := make(map[KeyValue]struct{}) if err := sh.engine.ForEachMeasurementSeriesByExpr(name, filterExpr, func(tags models.Tags) error { for _, t := range tags { - m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{} + if _, ok := keySet[string(t.Key)]; ok { + m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{} + } } return nil }); err != nil {