Fix tsi TAG KEYS iterator.

pull/7913/head
Ben Johnson 2016-12-06 10:30:41 -07:00
parent 759ff4ab80
commit 183418dcbd
No known key found for this signature in database
GPG Key ID: 81741CD251883081
7 changed files with 152 additions and 63 deletions

View File

@ -55,7 +55,7 @@ type Engine interface {
DeleteMeasurement(name []byte) error
// TagKeys(name []byte) ([][]byte, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
// InfluxQL iterators

View File

@ -307,7 +307,7 @@ func (e *Engine) ForEachMeasurementSeriesByExpr(name []byte, condition influxql.
return e.index.ForEachMeasurementSeriesByExpr(name, condition, fn)
}
func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
return e.index.MeasurementTagKeysByExpr(name, expr)
}

View File

@ -30,7 +30,7 @@ type Index interface {
Dereference(b []byte)
TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
// InfluxQL system iterators

View File

@ -209,7 +209,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name string) *tsdb.Measurement
}
// MeasurementTagKeyByExpr returns an ordered set of tag keys filtered by an expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
i.mu.RLock()
defer i.mu.RUnlock()
@ -217,20 +217,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]b
if mm == nil {
return nil, nil
}
keySet, _, err := mm.TagKeysByExpr(expr)
if err != nil {
return nil, err
}
// Convert string set to byte slice list.
a := make([][]byte, 0, len(keySet))
for key := range keySet {
a = append(a, []byte(key))
}
bytesutil.Sort(a)
return a, nil
return mm.TagKeysByExpr(expr)
}
// ForEachMeasurementTagKey iterates over all tag keys for a measurement.

View File

@ -523,9 +523,89 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
// Dereference is a nop.
func (i *Index) Dereference([]byte) {}
// MeasurementTagKeyByExpr returns an ordered set of tag keys filtered by an expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
panic("TODO")
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok {
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
} else if tag.Val != "_tagKey" {
return nil, nil
}
if influxql.IsRegexOp(e.Op) {
re, ok := e.RHS.(*influxql.RegexLiteral)
if !ok {
return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
}
return i.tagKeysByFilter(name, e.Op, nil, re.Val), nil
}
s, ok := e.RHS.(*influxql.StringLiteral)
if !ok {
return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
}
return i.tagKeysByFilter(name, e.Op, []byte(s.Val), nil), nil
case influxql.AND, influxql.OR:
lhs, err := i.MeasurementTagKeysByExpr(name, e.LHS)
if err != nil {
return nil, err
}
rhs, err := i.MeasurementTagKeysByExpr(name, e.RHS)
if err != nil {
return nil, err
}
if lhs != nil && rhs != nil {
if e.Op == influxql.OR {
return unionStringSets(lhs, rhs), nil
}
return intersectStringSets(lhs, rhs), nil
} else if lhs != nil {
return lhs, nil
} else if rhs != nil {
return rhs, nil
}
return nil, nil
default:
return nil, fmt.Errorf("invalid operator")
}
case *influxql.ParenExpr:
return i.MeasurementTagKeysByExpr(name, e.Expr)
}
return nil, fmt.Errorf("%#v", expr)
}
// tagKeysByFilter will filter the tag keys for the measurement.
func (i *Index) tagKeysByFilter(name []byte, op influxql.Token, val []byte, regex *regexp.Regexp) map[string]struct{} {
ss := make(map[string]struct{})
itr := i.TagKeyIterator(name)
for e := itr.Next(); e != nil; e = itr.Next() {
var matched bool
switch op {
case influxql.EQ:
matched = bytes.Equal(e.Key(), val)
case influxql.NEQ:
matched = !bytes.Equal(e.Key(), val)
case influxql.EQREGEX:
matched = regex.Match(e.Key())
case influxql.NEQREGEX:
matched = !regex.Match(e.Key())
}
if !matched {
continue
}
ss[string(e.Key())] = struct{}{}
}
return ss
}
// TagKeySeriesIterator returns a series iterator for all values across a single key.
@ -1051,3 +1131,30 @@ func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) {
return &itr.point, nil
}
}
// unionStringSets returns the union of two sets
func unionStringSets(a, b map[string]struct{}) map[string]struct{} {
other := make(map[string]struct{})
for k := range a {
other[k] = struct{}{}
}
for k := range b {
other[k] = struct{}{}
}
return other
}
// intersectStringSets returns the intersection of two sets.
func intersectStringSets(a, b map[string]struct{}) map[string]struct{} {
if len(a) < len(b) {
a, b = b, a
}
other := make(map[string]struct{})
for k := range a {
if _, ok := b[k]; ok {
other[k] = struct{}{}
}
}
return other
}

View File

@ -812,68 +812,63 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error
}
// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) {
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok {
return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
}
if tag.Val != "_tagKey" {
return nil, false, nil
}
tf := TagFilter{
Op: e.Op,
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
} else if tag.Val != "_tagKey" {
return nil, nil
}
if influxql.IsRegexOp(e.Op) {
re, ok := e.RHS.(*influxql.RegexLiteral)
if !ok {
return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
}
tf.Regex = re.Val
} else {
return m.tagKeysByFilter(e.Op, "", re.Val), nil
}
s, ok := e.RHS.(*influxql.StringLiteral)
if !ok {
return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
}
tf.Value = s.Val
}
return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil
return m.tagKeysByFilter(e.Op, s.Val, nil), nil
case influxql.AND, influxql.OR:
lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS)
lhs, err := m.TagKeysByExpr(e.LHS)
if err != nil {
return nil, false, err
return nil, err
}
rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS)
rhs, err := m.TagKeysByExpr(e.RHS)
if err != nil {
return nil, false, err
return nil, err
}
if lhsOk && rhsOk {
if lhs != nil && rhs != nil {
if e.Op == influxql.OR {
return lhsKeys.union(rhsKeys), true, nil
return stringSet(lhs).union(rhs), nil
}
return stringSet(lhs).intersect(rhs), nil
} else if lhs != nil {
return lhs, nil
} else if rhs != nil {
return rhs, nil
}
return nil, nil
default:
return nil, fmt.Errorf("invalid operator")
}
return lhsKeys.intersect(rhsKeys), true, nil
} else if lhsOk {
return lhsKeys, true, nil
} else if rhsOk {
return rhsKeys, true, nil
}
return nil, false, nil
default:
return nil, false, fmt.Errorf("invalid operator")
}
case *influxql.ParenExpr:
return m.TagKeysByExpr(e.Expr)
}
return nil, false, fmt.Errorf("%#v", expr)
return nil, fmt.Errorf("%#v", expr)
}
// tagKeysByFilter will filter the tag keys for the measurement.

View File

@ -903,20 +903,20 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
}
for _, name := range names {
/*
// Determine a list of keys from condition.
keySet, err := sh.engine.MeasurementTagKeysByExpr(name, cond)
if err != nil {
return nil, err
}
*/
// Loop over all keys for each series.
m := make(map[KeyValue]struct{})
if err := sh.engine.ForEachMeasurementSeriesByExpr(name, filterExpr, func(tags models.Tags) error {
for _, t := range tags {
if _, ok := keySet[string(t.Key)]; ok {
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
}
}
return nil
}); err != nil {
return nil, err