influxdb/tsdb/guard.go

254 lines
6.3 KiB
Go

package tsdb
import (
"bytes"
"sync"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxql"
)
// guard lets one match a set of points and block until they are done.
type guard struct {
cond *sync.Cond
done bool
min int64
max int64
names map[string]struct{}
expr *exprGuard
}
// newGuard constructs a guard that will match any points in the given min and max
// time range, with the given set of measurement names, or the given expression.
// The expression is optional.
func newGuard(min, max int64, names []string, expr influxql.Expr) *guard {
set := make(map[string]struct{}, len(names))
for _, name := range names {
set[name] = struct{}{}
}
return &guard{
cond: sync.NewCond(new(sync.Mutex)),
min: min,
max: max,
names: set,
expr: newExprGuard(expr),
}
}
// Matches returns true if any of the points match the guard.
func (g *guard) Matches(points []models.Point) bool {
if g == nil {
return true
}
for _, pt := range points {
if t := pt.Time().UnixNano(); t < g.min || t > g.max {
continue
}
if len(g.names) == 0 && g.expr.matches(pt) {
return true
} else if _, ok := g.names[string(pt.Name())]; ok && g.expr.matches(pt) {
return true
}
}
return false
}
// Wait blocks until the guard has been marked Done.
func (g *guard) Wait() {
g.cond.L.Lock()
for !g.done {
g.cond.Wait()
}
g.cond.L.Unlock()
}
// Done signals to anyone waiting on the guard that they can proceed.
func (g *guard) Done() {
g.cond.L.Lock()
g.done = true
g.cond.Broadcast()
g.cond.L.Unlock()
}
// exprGuard is a union of influxql.Expr based guards. a nil exprGuard matches
// everything, while the zero value matches nothing.
type exprGuard struct {
and *[2]*exprGuard
or *[2]*exprGuard
tagMatches *tagGuard
tagExists map[string]struct{}
}
type tagGuard struct {
meas bool
key []byte
op func([]byte) bool
}
// empty returns true if the exprGuard is empty, meaning that it matches no points.
func (e *exprGuard) empty() bool {
return e != nil && e.and == nil && e.or == nil && e.tagMatches == nil && e.tagExists == nil
}
// newExprGuard scrutinizes the expression and returns an efficient guard.
func newExprGuard(expr influxql.Expr) *exprGuard {
if expr == nil {
return nil
}
switch expr := expr.(type) {
case *influxql.ParenExpr:
return newExprGuard(expr.Expr)
case *influxql.BooleanLiteral:
if expr.Val {
return nil // matches everything
}
return new(exprGuard) // matches nothing
case *influxql.BinaryExpr:
switch expr.Op {
case influxql.AND:
lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS)
if lhs == nil { // reduce
return rhs
} else if rhs == nil { // reduce
return lhs
} else if lhs.empty() || rhs.empty() { // short circuit
return new(exprGuard)
} else {
return &exprGuard{and: &[2]*exprGuard{lhs, rhs}}
}
case influxql.OR:
lhs, rhs := newExprGuard(expr.LHS), newExprGuard(expr.RHS)
if lhs.empty() { // reduce
return rhs
} else if rhs.empty() { // reduce
return lhs
} else if lhs == nil || rhs == nil { // short circuit
return nil
} else {
return &exprGuard{or: &[2]*exprGuard{lhs, rhs}}
}
default:
return newBinaryExprGuard(expr)
}
default:
// if we couldn't analyze, match everything
return nil
}
}
// newBinaryExprGuard scrutinizes the binary expression and returns an efficient guard.
func newBinaryExprGuard(expr *influxql.BinaryExpr) *exprGuard {
// if it's a nested binary expression, always match.
if _, ok := expr.LHS.(*influxql.BinaryExpr); ok {
return nil
} else if _, ok := expr.RHS.(*influxql.BinaryExpr); ok {
return nil
}
// ensure one of the expressions is a VarRef, and make that the key.
key, ok := expr.LHS.(*influxql.VarRef)
value := expr.RHS
if !ok {
key, ok = expr.RHS.(*influxql.VarRef)
if !ok {
return nil
}
value = expr.LHS
}
// check the key for situations we know we can't filter.
if key.Val != "_name" && key.Type != influxql.Unknown && key.Type != influxql.Tag {
return nil
}
// scrutinize the value to return an efficient guard.
switch value := value.(type) {
case *influxql.StringLiteral:
val := []byte(value.Val)
g := &exprGuard{tagMatches: &tagGuard{
meas: key.Val == "_name",
key: []byte(key.Val),
}}
switch expr.Op {
case influxql.EQ:
g.tagMatches.op = func(x []byte) bool { return bytes.Equal(val, x) }
case influxql.NEQ:
g.tagMatches.op = func(x []byte) bool { return !bytes.Equal(val, x) }
default: // any other operator isn't valid. conservatively match everything.
return nil
}
return g
case *influxql.RegexLiteral:
// There's a tradeoff between being precise and being fast. For example, if the
// delete includes a very expensive regex, we don't want to run that against every
// incoming point. The decision here is to match any point that has a possibly
// expensive match if there is any overlap on the tags. In other words, expensive
// matches get transformed into trivially matching everything.
return &exprGuard{tagExists: map[string]struct{}{key.Val: {}}}
case *influxql.VarRef:
// We could do a better job here by encoding the two names and checking the points
// against them, but I'm not quite sure how to do that. Be conservative and match
// any points that contain either the key or value.
// since every point has a measurement, always match if either are on the measurement.
if key.Val == "_name" || value.Val == "_name" {
return nil
}
return &exprGuard{tagExists: map[string]struct{}{
key.Val: {},
value.Val: {},
}}
default: // any other value type matches everything
return nil
}
}
// matches checks if the exprGuard matches the point.
func (g *exprGuard) matches(pt models.Point) bool {
switch {
case g == nil:
return true
case g.and != nil:
return g.and[0].matches(pt) && g.and[1].matches(pt)
case g.or != nil:
return g.or[0].matches(pt) || g.or[1].matches(pt)
case g.tagMatches != nil:
if g.tagMatches.meas {
return g.tagMatches.op(pt.Name())
}
for _, tag := range pt.Tags() {
if bytes.Equal(tag.Key, g.tagMatches.key) && g.tagMatches.op(tag.Value) {
return true
}
}
return false
case g.tagExists != nil:
for _, tag := range pt.Tags() {
if _, ok := g.tagExists[string(tag.Key)]; ok {
return true
}
}
return false
default:
return false
}
}