tsm1: implement predicate matcher from protobufs

pull/13371/head
Jeff Wendling 2019-04-28 21:39:21 -06:00 committed by Jeff Wendling
parent 4096f93891
commit 4fb7bf1730
2 changed files with 758 additions and 4 deletions

View File

@ -1,6 +1,13 @@
package tsm1
import "errors"
import (
"bytes"
"fmt"
"regexp"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
// Predicate is something that can match on a series key.
type Predicate interface {
@ -8,9 +15,591 @@ type Predicate interface {
Marshal() ([]byte, error)
}
// UnmarshalPredicate takes stored predicate bytes from a Marshal call and returns a Predicate.
//
// Empty input yields a nil Predicate and a nil error. Otherwise the first byte is a
// format tag that must be zero, and the remaining bytes are decoded as a protobuf
// datatypes.Predicate and compiled with NewProtobufPredicate. An error is returned
// for an unknown tag byte, for bytes that fail to decode, and for a decoded
// predicate that has no root node.
func UnmarshalPredicate(data []byte) (Predicate, error) {
	if len(data) == 0 {
		return nil, nil
	}
	if data[0] != '\x00' {
		return nil, fmt.Errorf("unknown tag byte: %x", data[0])
	}

	pred := new(datatypes.Predicate)
	if err := pred.Unmarshal(data[1:]); err != nil {
		return nil, err
	}

	// A predicate without a root has nothing to evaluate; reject it here instead
	// of handing an empty tree to the compiler.
	if pred.Root == nil {
		return nil, errors.New("predicate has no root node")
	}
	return NewProtobufPredicate(pred)
}
//
// Design
//
// Predicates lazily evaluate with memoization so that we can walk a series key
// by the tags without parsing them into a structure and allocating. Each node
// in a predicate tree keeps a cache if it has enough information to have a
// definite value. The predicate state keeps track of all of the tag key/value
// pairs passed to it, and has a reset function to start over for a new series key.
//
// For example, imagine a query like
//
// ("tag1" == "val1" AND "tag2" == "val2") OR "tag3" == "val3"
//
// The state would have tag values set on it like
//
// state.Set("tag1", "val1") => NeedMore
// state.Set("tag2", "not-val2") => NeedMore
// state.Set("tag3", "val3") => True
//
// where after the first Set, the AND and OR clauses are both NeedMore, after
// the second Set, the AND clause is False and the OR clause is NeedMore, and
// after the last Set, the AND clause is still False, and the OR clause is True.
//
// Fast resetting is achieved by having each cache maintain a pointer to the state
// and both having a generation number. When the state resets, it bumps the generation
// number, and when the value is set in the cache, it is set with the current generation
// of the state. When querying the cache, it checks if the generation still matches.
//
// Protobuf Implementation
//
// NewProtobufPredicate returns a Predicate that matches based on the comparison structure
// described by the incoming protobuf.
//
// It first walks the tree to assign every referenced tag key (and the field key, if any
// field ref is present) a slot in the shared predicateState, then compiles the tree into
// predicateNodes that read from that state. An error is returned if pred or its root is
// nil, or if buildPredicateNode rejects any part of the tree.
func NewProtobufPredicate(pred *datatypes.Predicate) (Predicate, error) {
	if pred == nil || pred.Root == nil {
		return nil, errors.New("predicate has no root node")
	}

	// Walk the predicate to collect the tag refs. A key keeps the index it was given
	// the first time it is seen: assigning len(locs) again for a repeated key would
	// produce an index equal to the final map size, which is out of range for the
	// values slice that newPredicateState sizes from len(locs).
	locs := make(map[string]int)
	addLoc := func(key string) {
		if _, ok := locs[key]; !ok {
			locs[key] = len(locs)
		}
	}
	walkPredicateNodes(pred.Root, func(node *datatypes.Node) {
		switch node.GetNodeType() {
		case datatypes.NodeTypeTagRef:
			addLoc(node.GetTagRefValue())
		case datatypes.NodeTypeFieldRef:
			addLoc(models.FieldKeyTagKey)
		}
	})

	// Construct the shared state and root predicate node.
	state := newPredicateState(locs)
	root, err := buildPredicateNode(state, pred.Root)
	if err != nil {
		return nil, err
	}

	return &predicateMatcher{
		pred:  pred,
		state: state,
		root:  root,
	}, nil
}
// predicateMatcher implements Predicate for a protobuf. It is built by
// NewProtobufPredicate.
type predicateMatcher struct {
// pred is the source protobuf; Marshal serializes it.
pred *datatypes.Predicate
// state receives the tag key/value pairs of the key being matched.
state *predicateState
// root is the compiled predicate tree that Matches asks for a response.
root predicateNode
}
// Matches reports whether key satisfies the predicate. It resets the shared state,
// then feeds the key's tag=value pairs into it one at a time, asking the root node
// for a response after each relevant pair and returning as soon as that response is
// definite.
func (p *predicateMatcher) Matches(key []byte) bool {
	p.state.Reset()

	// A key without a backslash has no escapes, so the cheaper splitter can be used.
	pop := predicatePopTagEscape
	if bytes.IndexByte(key, '\\') == -1 {
		pop = predicatePopTag
	}

	for len(key) > 0 {
		var name, val []byte
		name, val, key = pop(key)

		// Skip segments that did not split into a pair, and tags the state does not track.
		if name == nil {
			continue
		}
		if !p.state.Set(name, val) {
			continue
		}

		switch p.root.Update() {
		case predicateResponse_true:
			return true
		case predicateResponse_false:
			return false
		}
	}

	// The root never reached a definite response, which counts as no match.
	return false
}
// Marshal returns a buffer representing the protobuf predicate: a single zero tag
// byte followed by the serialized protobuf. On a serialization error it returns a
// nil buffer together with that error.
func (p *predicateMatcher) Marshal() ([]byte, error) {
	// Prefix it with a zero byte so that we can change in the future if necessary
	buf := make([]byte, 1+p.pred.Size())
	if _, err := p.pred.MarshalTo(buf[1:]); err != nil {
		// Do not hand back a partially written buffer alongside an error.
		return nil, err
	}
	return buf, nil
}
// walkPredicateNodes recursively calls the function for each node, visiting a node
// before its children. Nil nodes (a nil root or a nil entry in a Children slice)
// are skipped rather than dereferenced.
func walkPredicateNodes(node *datatypes.Node, fn func(node *datatypes.Node)) {
	if node == nil {
		return
	}
	fn(node)
	for _, ch := range node.Children {
		walkPredicateNodes(ch, fn)
	}
}
// buildPredicateNode takes a protobuf node and converts it into a predicateNode. It is strict
// in what it accepts:
//
//   - Comparison expressions must have exactly two children. The left child may be a
//     field ref, a tag ref, or a string literal. The right child may additionally be a
//     regex literal, which is compiled here. A regex literal must be present on the
//     right if and only if the comparison is a regex or not-regex comparison.
//   - Logical expressions must have exactly two children, each of which is built
//     recursively, and must be an And or an Or.
//
// Any other node type, or any violation of the above, results in an error.
func buildPredicateNode(state *predicateState, node *datatypes.Node) (predicateNode, error) {
	switch node.GetNodeType() {
	case datatypes.NodeTypeComparisonExpression:
		children := node.GetChildren()
		if len(children) != 2 {
			return nil, fmt.Errorf("invalid number of children for comparison expression: %v", len(children))
		}
		left, right := children[0], children[1]

		comp := &predicateNodeComparison{
			predicateCache: newPredicateCache(state),
			comp:           node.GetComparison(),
		}

		// Fill in the left side of the comparison
		switch left.GetNodeType() {

		// Field refs are actually for the '\xff' tag
		case datatypes.NodeTypeFieldRef:
			idx, ok := state.locs[models.FieldKeyTagKey]
			if !ok {
				return nil, fmt.Errorf("invalid field ref in comparison: %v", left)
			}
			comp.leftIndex = idx

		// Tag refs look up the location of the tag in the state
		case datatypes.NodeTypeTagRef:
			idx, ok := state.locs[left.GetTagRefValue()]
			if !ok {
				return nil, fmt.Errorf("invalid tag ref in comparison: %v", left.GetTagRefValue())
			}
			comp.leftIndex = idx

		// Left literals are only allowed to be strings
		case datatypes.NodeTypeLiteral:
			lit, ok := left.GetValue().(*datatypes.Node_StringValue)
			if !ok {
				return nil, fmt.Errorf("invalid left literal in comparison: %v", left.GetValue())
			}
			comp.leftLiteral = []byte(lit.StringValue)

		default:
			return nil, fmt.Errorf("invalid left node in comparison: %v", left.GetNodeType())
		}

		// Fill in the right side of the comparison
		switch right.GetNodeType() {

		// Field refs are actually for the '\xff' tag
		case datatypes.NodeTypeFieldRef:
			idx, ok := state.locs[models.FieldKeyTagKey]
			if !ok {
				return nil, fmt.Errorf("invalid field ref in comparison: %v", right)
			}
			comp.rightIndex = idx

		// Tag refs look up the location of the tag in the state
		case datatypes.NodeTypeTagRef:
			idx, ok := state.locs[right.GetTagRefValue()]
			if !ok {
				return nil, fmt.Errorf("invalid tag ref in comparison: %v", right.GetTagRefValue())
			}
			comp.rightIndex = idx

		// Right literals are allowed to be regexes as well as strings
		case datatypes.NodeTypeLiteral:
			switch lit := right.GetValue().(type) {
			case *datatypes.Node_StringValue:
				comp.rightLiteral = []byte(lit.StringValue)

			case *datatypes.Node_RegexValue:
				reg, err := regexp.Compile(lit.RegexValue)
				if err != nil {
					return nil, err
				}
				comp.rightReg = reg

			default:
				return nil, fmt.Errorf("invalid right literal in comparison: %v", right.GetValue())
			}

		default:
			return nil, fmt.Errorf("invalid right node in comparison: %v", right.GetNodeType())
		}

		// Ensure that a regex is set on the right if and only if the comparison is a regex
		isRegexComp := comp.comp == datatypes.ComparisonRegex || comp.comp == datatypes.ComparisonNotRegex
		if comp.rightReg == nil && isRegexComp {
			return nil, fmt.Errorf("invalid comparison involving regex: %v", node)
		}
		if comp.rightReg != nil && !isRegexComp {
			return nil, fmt.Errorf("invalid comparison not against regex: %v", node)
		}

		return comp, nil

	case datatypes.NodeTypeLogicalExpression:
		children := node.GetChildren()
		if len(children) != 2 {
			return nil, fmt.Errorf("invalid number of children for logical expression: %v", len(children))
		}

		left, err := buildPredicateNode(state, children[0])
		if err != nil {
			return nil, err
		}
		right, err := buildPredicateNode(state, children[1])
		if err != nil {
			return nil, err
		}

		switch node.GetLogical() {
		case datatypes.LogicalAnd:
			return &predicateNodeAnd{
				predicateCache: newPredicateCache(state),
				left:           left,
				right:          right,
			}, nil

		case datatypes.LogicalOr:
			return &predicateNodeOr{
				predicateCache: newPredicateCache(state),
				left:           left,
				right:          right,
			}, nil

		default:
			return nil, fmt.Errorf("unknown logical type: %v", node.GetLogical())
		}

	default:
		return nil, fmt.Errorf("unsupported predicate type: %v", node.GetNodeType())
	}
}
//
// Predicate Responses
//
// predicateResponse is the answer a predicate node gives when asked to Update.
type predicateResponse uint8

// The possible responses. needMore is the zero value.
const (
	predicateResponse_needMore predicateResponse = 0
	predicateResponse_true     predicateResponse = 1
	predicateResponse_false    predicateResponse = 2
)
//
// Predicate State
//
// predicateState keeps track of tag key=>value mappings with cheap methods
// to reset to a blank state.
type predicateState struct {
// gen is the current generation. Reset increments it, and a predicateCache
// entry is only valid while its stored generation equals this one.
gen uint64
// locs maps a tag key to its index in values.
locs map[string]int
// values holds, per index, the value passed to the most recent Set for that
// key; Reset sets every entry back to nil.
values [][]byte
}
// newPredicateState creates a predicateState given a map of keys to indexes into
// an array. The values slice is sized to hold one entry per key in locs.
func newPredicateState(locs map[string]int) *predicateState {
	state := new(predicateState)
	state.gen = 1 // so that caches start out unfilled since they start at 0
	state.locs = locs
	state.values = make([][]byte, len(locs))
	return state
}
// Reset clears any set values for the state and advances the generation, so that
// responses cached under the previous generation are no longer reported as valid.
func (p *predicateState) Reset() {
	for idx := 0; idx < len(p.values); idx++ {
		p.values[idx] = nil
	}
	p.gen++
}
// Set records value for key when key is one of the keys the state tracks. It
// returns true if the key is tracked and false (storing nothing) otherwise.
func (p *predicateState) Set(key, value []byte) bool {
	idx, tracked := p.locs[string(key)]
	if !tracked {
		return false
	}
	p.values[idx] = value
	return true
}
//
// Predicate Cache
//
// predicateCache interacts with the predicateState to keep determined responses
// memoized until the state has been Reset to avoid recomputing nodes.
type predicateCache struct {
// state is the shared state whose generation decides whether resp is valid.
state *predicateState
// gen is the state generation that was current when resp was stored.
gen uint64
// resp is the memoized response; Cached reports it as valid only while
// gen equals state.gen.
resp predicateResponse
}
// newPredicateCache constructs a predicateCache bound to the provided state. The
// cache starts at generation 0 with a needMore response.
func newPredicateCache(state *predicateState) predicateCache {
	var cache predicateCache
	cache.state = state
	cache.gen = 0
	cache.resp = predicateResponse_needMore
	return cache
}
// Cached returns the stored response together with a flag that is true only when
// the response was stored during the state's current generation.
func (p *predicateCache) Cached() (predicateResponse, bool) {
	valid := p.state.gen == p.gen
	return p.resp, valid
}
// Store memoizes resp and stamps it with the state's current generation, so it
// stays valid until the state is Reset.
func (p *predicateCache) Store(resp predicateResponse) {
	p.resp, p.gen = resp, p.state.gen
}
//
// Predicate Nodes
//
// predicateNode is the interface that every part of a predicate tree implements.
type predicateNode interface {
// Update informs the node that the state has been updated and asks it to return
// a response: true, false, or needMore when it cannot yet decide.
Update() predicateResponse
}
// predicateNodeAnd combines two predicate nodes with an And.
type predicateNodeAnd struct {
// predicateCache memoizes this node's response once it is determined.
predicateCache
// left and right are the two operands of the And.
left, right predicateNode
}
// Update checks if both of the left and right nodes are true. If either is false
// then the node is definitely false. Otherwise, it needs more information.
// Definite responses (true or false) are memoized until the state is Reset.
func (p *predicateNodeAnd) Update() predicateResponse {
	if resp, ok := p.Cached(); ok {
		return resp
	}

	left := p.left.Update()
	if left == predicateResponse_false {
		p.Store(predicateResponse_false)
		return predicateResponse_false
	}

	// Ask the right side even when the left is still undetermined: a false on
	// either side settles the And, so there is no reason to wait for the left.
	right := p.right.Update()
	if right == predicateResponse_false {
		p.Store(predicateResponse_false)
		return predicateResponse_false
	}

	if left == predicateResponse_true && right == predicateResponse_true {
		p.Store(predicateResponse_true)
		return predicateResponse_true
	}

	return predicateResponse_needMore
}
// predicateNodeOr combines two predicate nodes with an Or.
type predicateNodeOr struct {
// predicateCache memoizes this node's response once it is determined.
predicateCache
// left and right are the two operands of the Or.
left, right predicateNode
}
// Update checks if either the left or the right node is true. If both nodes are
// false, then the node is definitely false. Otherwise, it needs more information.
// Definite responses are memoized until the state is Reset.
func (p *predicateNodeOr) Update() predicateResponse {
	if cached, valid := p.Cached(); valid {
		return cached
	}

	lhs := p.left.Update()
	if lhs == predicateResponse_true {
		p.Store(predicateResponse_true)
		return predicateResponse_true
	}

	rhs := p.right.Update()
	switch {
	case rhs == predicateResponse_true:
		p.Store(predicateResponse_true)
		return predicateResponse_true
	case lhs == predicateResponse_false && rhs == predicateResponse_false:
		p.Store(predicateResponse_false)
		return predicateResponse_false
	default:
		return predicateResponse_needMore
	}
}
// predicateNodeComparison compares values of tags.
type predicateNodeComparison struct {
// predicateCache memoizes this node's response once it is determined.
predicateCache
// comp is the comparison operator taken from the protobuf node.
comp datatypes.Node_Comparison
// rightReg is set by buildPredicateNode when the right operand is a regex literal.
rightReg *regexp.Regexp
// leftLiteral and rightLiteral are set by buildPredicateNode when the
// corresponding operand is a string literal.
leftLiteral []byte
rightLiteral []byte
// leftIndex and rightIndex are set by buildPredicateNode from predicateState.locs
// when the corresponding operand is a tag ref or field ref.
leftIndex int
rightIndex int
}
// Update checks if both sides of the comparison are determined, and if so, evaluates
// the comparison to a determined truth value and memoizes it.
//
// A side is determined when it is a literal, when it is the regex on the right, or
// when the state holds a non-nil value at that side's index. If a side that must
// come from the state has no value yet, the response is needMore.
func (p *predicateNodeComparison) Update() predicateResponse {
	if resp, ok := p.Cached(); ok {
		return resp
	}

	left := p.leftLiteral
	if left == nil {
		left = p.state.values[p.leftIndex]
		if left == nil {
			return predicateResponse_needMore
		}
	}

	// The right side only comes from the state when it is neither a string literal
	// nor a regex; for a regex comparison rightIndex is unset and must not be read.
	right := p.rightLiteral
	if right == nil && p.rightReg == nil {
		right = p.state.values[p.rightIndex]
		if right == nil {
			return predicateResponse_needMore
		}
	}

	resp := predicateResponse_false
	if predicateEval(p.comp, left, right, p.rightReg) {
		resp = predicateResponse_true
	}
	p.Store(resp)
	return resp
}
// predicateEval applies the comparison selected by comp to left and right. The regex
// comparisons match left against rightReg and ignore right; the starts-with
// comparison tests whether right is a prefix of left; the ordering comparisons are
// bytewise lexicographic. An unrecognized comparison evaluates to false.
func predicateEval(comp datatypes.Node_Comparison, left, right []byte, rightReg *regexp.Regexp) bool {
	// Comparisons that do not reduce to a bytewise ordering.
	switch comp {
	case datatypes.ComparisonRegex:
		return rightReg.Match(left)
	case datatypes.ComparisonNotRegex:
		return !rightReg.Match(left)
	case datatypes.ComparisonStartsWith:
		return bytes.HasPrefix(left, right)
	}

	// Everything else is decided by how left orders against right.
	order := bytes.Compare(left, right)
	switch comp {
	case datatypes.ComparisonEqual:
		return order == 0
	case datatypes.ComparisonNotEqual:
		return order != 0
	case datatypes.ComparisonLess:
		return order < 0
	case datatypes.ComparisonLessEqual:
		return order <= 0
	case datatypes.ComparisonGreater:
		return order > 0
	case datatypes.ComparisonGreaterEqual:
		return order >= 0
	}
	return false
}
//
// Popping Tags
//
// The models package has some of this logic as well, but doesn't export ways to get
// at individual tags one at a time. In the common case where there are no escape
// characters, popping the first tag off of a series key takes around 10ns.
// predicatePopTag splits the first tag=value pair off the front of series. Everything
// after the first ',' is returned in rest (nil when there is no ','). The leading
// segment is split at its first '='; when it has none, tag and value are both nil.
// It assumes there are no escaped characters in the series.
func predicatePopTag(series []byte) (tag, value []byte, rest []byte) {
	pair := series
	if comma := bytes.IndexByte(series, ','); comma != -1 {
		pair, rest = series[:comma], series[comma+1:]
	}

	if eq := bytes.IndexByte(pair, '='); eq != -1 {
		tag, value = pair[:eq], pair[eq+1:]
	}

	return tag, value, rest
}
// predicatePopTagEscape pops a tag=value pair from the front of series, returning the
// remainder in rest. It assumes there are possibly/likely escaped characters in the
// series: a ',' or '=' immediately preceded by a backslash is not treated as a
// separator, and the returned tag and value have the backslash removed from any
// escaped ',', ' ' or '='.
//
// rest is nil when series has no unescaped ','. tag and value are nil when the
// leading segment has no unescaped '='.
func predicatePopTagEscape(series []byte) (tag, value []byte, rest []byte) {
	// Split the first pair off at the first unescaped ','.
	if i := predicateIndexUnescaped(series, ','); i >= 0 {
		series, rest = series[:i], series[i+1:]
	}

	// Split the pair at the first unescaped '='.
	if i := predicateIndexUnescaped(series, '='); i >= 0 {
		tag, value = series[:i], series[i+1:]
	}

	return predicateUnescape(tag), predicateUnescape(value), rest
}

// predicateIndexUnescaped returns the index of the first occurrence of c in b that is
// not immediately preceded by a backslash, or -1 if there is no such occurrence.
func predicateIndexUnescaped(b []byte, c byte) int {
	for start := 0; start < len(b); {
		i := bytes.IndexByte(b[start:], c)
		if i < 0 {
			return -1
		}
		// IndexByte reports a position within b[start:]; convert it to a position
		// within b before inspecting the preceding byte or advancing the search.
		i += start
		if i > 0 && b[i-1] == '\\' {
			start = i + 1
			continue
		}
		return i
	}
	return -1
}

// predicateUnescape returns b with the backslash dropped from every backslash that is
// directly followed by ',', ' ' or '='. The input may refer to read-only memory, so
// it is never modified: b itself is returned when it holds no backslash, and a copy
// is built otherwise.
func predicateUnescape(b []byte) []byte {
	if bytes.IndexByte(b, '\\') == -1 {
		return b
	}
	out := make([]byte, 0, len(b))
	for i, c := range b {
		if c == '\\' && i+1 < len(b) {
			if next := b[i+1]; next == ',' || next == ' ' || next == '=' {
				continue
			}
		}
		out = append(out, c)
	}
	return out
}

165
tsdb/tsm1/predicate_test.go Normal file
View File

@ -0,0 +1,165 @@
package tsm1
import (
"fmt"
"testing"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
// TestPredicate compiles each predicate with NewProtobufPredicate and checks that
// Matches returns the expected result for the case's series key.
func TestPredicate(t *testing.T) {
	// eq builds the comparison `tag == value`.
	eq := func(tag, value string) *datatypes.Node {
		return comparisonNode(datatypes.ComparisonEqual, tagNode(tag), stringNode(value))
	}

	// compound builds `("foo" == "bar" AND "baz" == "no") OR "tag3" == "val3"`.
	compound := func() *datatypes.Predicate {
		return predicate(
			orNode(
				andNode(eq("foo", "bar"), eq("baz", "no")),
				eq("tag3", "val3"),
			),
		)
	}

	type testCase struct {
		Name      string
		Predicate *datatypes.Predicate
		Key       string
		Matches   bool
	}

	tests := []testCase{
		{
			Name:      "Basic Matching",
			Predicate: predicate(eq("tag3", "val3")),
			Key:       "bucketorg,tag3=val3",
			Matches:   true,
		},
		{
			Name:      "Basic Unmatching",
			Predicate: predicate(eq("tag3", "val3")),
			Key:       "bucketorg,tag3=val2",
			Matches:   false,
		},
		{
			Name:      "Compound Logical Matching",
			Predicate: compound(),
			Key:       "bucketorg,foo=bar,baz=bif,tag3=val3",
			Matches:   true,
		},
		{
			Name:      "Compound Logical Unmatching",
			Predicate: compound(),
			Key:       "bucketorg,foo=bar,baz=bif,tag3=val2",
			Matches:   false,
		},
	}

	for _, tc := range tests {
		t.Run(tc.Name, func(t *testing.T) {
			matcher, err := NewProtobufPredicate(tc.Predicate)
			if err != nil {
				t.Fatal("compile failure:", err)
			}

			got, exp := matcher.Matches([]byte(tc.Key)), tc.Matches
			if got != exp {
				t.Fatal("match failure:", "got", got, "!=", "exp", exp)
			}
		})
	}
}
// BenchmarkPredicate measures Matches against a series key carrying ten tags
// (tag0=val0 through tag9=val9) for a single comparison and for a compound predicate.
func BenchmarkPredicate(b *testing.B) {
	// eq builds the comparison `tag == value`.
	eq := func(tag, value string) *datatypes.Node {
		return comparisonNode(datatypes.ComparisonEqual, tagNode(tag), stringNode(value))
	}

	bench := func(b *testing.B, proto *datatypes.Predicate) {
		matcher, err := NewProtobufPredicate(proto)
		if err != nil {
			b.Fatal(err)
		}

		// Build "bucketorg,tag0=val0,...,tag9=val9".
		key := []byte("bucketorg")
		for n := 0; n < 10; n++ {
			key = append(key, fmt.Sprintf(",tag%d=val%d", n, n)...)
		}

		b.SetBytes(int64(len(key)))
		b.ReportAllocs()
		b.ResetTimer()

		for n := 0; n < b.N; n++ {
			matcher.Matches(key)
		}
	}

	b.Run("Basic", func(b *testing.B) {
		bench(b, predicate(eq("tag5", "val5")))
	})

	b.Run("Compound", func(b *testing.B) {
		bench(b, predicate(
			orNode(
				andNode(eq("tag0", "val0"), eq("tag6", "val5")),
				eq("tag5", "val5"),
			),
		))
	})
}
//
// Helpers to create predicate protobufs
//
// tagNode returns a tag ref node that refers to the tag key s.
func tagNode(s string) *datatypes.Node {
	node := new(datatypes.Node)
	node.NodeType = datatypes.NodeTypeTagRef
	node.Value = &datatypes.Node_TagRefValue{TagRefValue: s}
	return node
}
// stringNode returns a literal node holding the string value s.
func stringNode(s string) *datatypes.Node {
	node := new(datatypes.Node)
	node.NodeType = datatypes.NodeTypeLiteral
	node.Value = &datatypes.Node_StringValue{StringValue: s}
	return node
}
// comparisonNode returns a comparison expression node that applies comp to the
// left and right operands, in that order.
func comparisonNode(comp datatypes.Node_Comparison, left, right *datatypes.Node) *datatypes.Node {
	node := new(datatypes.Node)
	node.NodeType = datatypes.NodeTypeComparisonExpression
	node.Value = &datatypes.Node_Comparison_{Comparison: comp}
	node.Children = []*datatypes.Node{left, right}
	return node
}
// andNode returns a logical expression node that combines left and right with And.
func andNode(left, right *datatypes.Node) *datatypes.Node {
	node := new(datatypes.Node)
	node.NodeType = datatypes.NodeTypeLogicalExpression
	node.Value = &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd}
	node.Children = []*datatypes.Node{left, right}
	return node
}
// orNode returns a logical expression node that combines left and right with Or.
func orNode(left, right *datatypes.Node) *datatypes.Node {
	node := new(datatypes.Node)
	node.NodeType = datatypes.NodeTypeLogicalExpression
	node.Value = &datatypes.Node_Logical_{Logical: datatypes.LogicalOr}
	node.Children = []*datatypes.Node{left, right}
	return node
}
// predicate wraps root in a datatypes.Predicate.
func predicate(root *datatypes.Node) *datatypes.Predicate {
	pred := new(datatypes.Predicate)
	pred.Root = root
	return pred
}