package tsdb

import (
	"bytes"
	"sync"

	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxql"
)

// guard selects a set of points and allows callers to block until whoever
// owns the guard has marked it as done.
type guard struct {
	cond  *sync.Cond          // signals waiters; its Locker protects done
	done  bool                // set once by Done
	min   int64               // inclusive lower time bound, compared against UnixNano timestamps
	max   int64               // inclusive upper time bound, compared against UnixNano timestamps
	names map[string]struct{} // measurement names to match; empty means any name
	expr  *exprGuard          // optional expression filter; nil matches everything
}

// newGuard builds a guard covering the time range [min, max], restricted to
// the given measurement names (if any) and to points accepted by expr.
// A nil expr places no further restriction on the points.
func newGuard(min, max int64, names []string, expr influxql.Expr) *guard {
	g := &guard{
		cond:  sync.NewCond(new(sync.Mutex)),
		min:   min,
		max:   max,
		names: make(map[string]struct{}, len(names)),
		expr:  newExprGuard(expr),
	}
	for i := range names {
		g.names[names[i]] = struct{}{}
	}
	return g
}

// Matches reports whether at least one of the points is covered by the guard.
// A nil guard covers every point.
func (g *guard) Matches(points []models.Point) bool {
	if g == nil {
		return true
	}

	anyName := len(g.names) == 0
	for _, pt := range points {
		// Skip points that fall outside the guarded time range.
		ts := pt.Time().UnixNano()
		if ts < g.min || ts > g.max {
			continue
		}

		// Skip points whose measurement is not one of the guarded names.
		if !anyName {
			if _, known := g.names[string(pt.Name())]; !known {
				continue
			}
		}

		if g.expr.matches(pt) {
			return true
		}
	}
	return false
}

// Wait blocks the caller until Done has been called on the guard.
func (g *guard) Wait() {
	mu := g.cond.L
	mu.Lock()
	defer mu.Unlock()

	for !g.done {
		g.cond.Wait()
	}
}

// Done marks the guard as finished and wakes up every goroutine blocked in Wait.
func (g *guard) Done() {
	mu := g.cond.L
	mu.Lock()
	defer mu.Unlock()

	g.done = true
	g.cond.Broadcast()
}

// exprGuard is a union of guards derived from an influxql.Expr; the first
// non-nil field decides how a point is evaluated. A nil *exprGuard matches
// every point, while the zero value matches none.
type exprGuard struct {
	and        *[2]*exprGuard      // both operands must match
	or         *[2]*exprGuard      // either operand must match
	tagMatches *tagGuard           // compare a single tag value or the measurement name
	tagExists  map[string]struct{} // match when the point has any of these tag keys
}

// tagGuard applies op to the measurement name when meas is set, and otherwise
// to the value of the tag named key.
type tagGuard struct {
	meas bool
	key  []byte
	op   func([]byte) bool
}

// empty returns true if the exprGuard is empty, meaning that it matches no points.
func (e *exprGuard) empty() bool { return e != nil && e.and == nil && e.or == nil && e.tagMatches == nil && e.tagExists == nil } // newExprGuard scrutinizes the expression and returns an efficient guard. func newExprGuard(expr influxql.Expr) *exprGuard { if expr == nil { return nil } switch expr := expr.(type) { case *influxql.ParenExpr: return newExprGuard(expr.Expr) case *influxql.BooleanLiteral: if expr.Val { return nil // matches everything } return new(exprGuard) // matches nothing case *influxql.BinaryExpr: switch expr.Op { case influxql.AND: lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS) if lhs == nil { // reduce return rhs } else if rhs == nil { // reduce return lhs } else if lhs.empty() || rhs.empty() { // short circuit return new(exprGuard) } else { return &exprGuard{and: &[2]*exprGuard{lhs, rhs}} } case influxql.OR: lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS) if lhs.empty() { // reduce return rhs } else if rhs.empty() { // reduce return lhs } else if lhs == nil || rhs == nil { // short circuit return nil } else { return &exprGuard{or: &[2]*exprGuard{lhs, rhs}} } default: return newBinaryExprGuard(expr) } default: // if we couldn't analyze, match everything return nil } } // newBinaryExprGuard scrutinizes the binary expression and returns an efficient guard. func newBinaryExprGuard(expr *influxql.BinaryExpr) *exprGuard { // if it's a nested binary expression, always match. if _, ok := expr.LHS.(*influxql.BinaryExpr); ok { return nil } else if _, ok := expr.RHS.(*influxql.BinaryExpr); ok { return nil } // ensure one of the expressions is a VarRef, and make that the key. key, ok := expr.LHS.(*influxql.VarRef) value := expr.RHS if !ok { key, ok = expr.RHS.(*influxql.VarRef) if !ok { return nil } value = expr.LHS } // check the key for situations we know we can't filter. if key.Val != "_name" && key.Type != influxql.Unknown && key.Type != influxql.Tag { return nil } // scrutinize the value to return an efficient guard. 
switch value := value.(type) { case *influxql.StringLiteral: val := []byte(value.Val) g := &exprGuard{tagMatches: &tagGuard{ meas: key.Val == "_name", key: []byte(key.Val), }} switch expr.Op { case influxql.EQ: g.tagMatches.op = func(x []byte) bool { return bytes.Equal(val, x) } case influxql.NEQ: g.tagMatches.op = func(x []byte) bool { return !bytes.Equal(val, x) } default: // any other operator isn't valid. conservatively match everything. return nil } return g case *influxql.RegexLiteral: // There's a tradeoff between being precise and being fast. For example, if the // delete includes a very expensive regex, we don't want to run that against every // incoming point. The decision here is to match any point that has a possibly // expensive match if there is any overlap on the tags. In other words, expensive // matches get transformed into trivially matching everything. return &exprGuard{tagExists: map[string]struct{}{key.Val: {}}} case *influxql.VarRef: // We could do a better job here by encoding the two names and checking the points // against them, but I'm not quite sure how to do that. Be conservative and match // any points that contain either the key or value. // since every point has a measurement, always match if either are on the measurement. if key.Val == "_name" || value.Val == "_name" { return nil } return &exprGuard{tagExists: map[string]struct{}{ key.Val: {}, value.Val: {}, }} default: // any other value type matches everything return nil } } // matches checks if the exprGuard matches the point. 
func (g *exprGuard) matches(pt models.Point) bool { switch { case g == nil: return true case g.and != nil: return g.and[0].matches(pt) && g.and[1].matches(pt) case g.or != nil: return g.or[0].matches(pt) || g.or[1].matches(pt) case g.tagMatches != nil: if g.tagMatches.meas { return g.tagMatches.op(pt.Name()) } for _, tag := range pt.Tags() { if bytes.Equal(tag.Key, g.tagMatches.key) && g.tagMatches.op(tag.Value) { return true } } return false case g.tagExists != nil: for _, tag := range pt.Tags() { if _, ok := g.tagExists[string(tag.Key)]; ok { return true } } return false default: return false } }