From 4fb7bf1730a286a7972b221b1a51461b8c548c57 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Sun, 28 Apr 2019 21:39:21 -0600 Subject: [PATCH] tsm1: implement predicate matcher from protobufs --- tsdb/tsm1/predicate.go | 597 +++++++++++++++++++++++++++++++++++- tsdb/tsm1/predicate_test.go | 165 ++++++++++ 2 files changed, 758 insertions(+), 4 deletions(-) create mode 100644 tsdb/tsm1/predicate_test.go diff --git a/tsdb/tsm1/predicate.go b/tsdb/tsm1/predicate.go index f6601b0ec9..05de456078 100644 --- a/tsdb/tsm1/predicate.go +++ b/tsdb/tsm1/predicate.go @@ -1,6 +1,13 @@ package tsm1 -import "errors" +import ( + "bytes" + "fmt" + "regexp" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/storage/reads/datatypes" +) // Predicate is something that can match on a series key. type Predicate interface { @@ -8,9 +15,591 @@ type Predicate interface { Marshal() ([]byte, error) } +// UnmarshalPredicate takes stored predicate bytes from a Marshal call and returns a Predicate. func UnmarshalPredicate(data []byte) (Predicate, error) { - if data != nil { - return nil, errors.New("unimplemented") + if len(data) == 0 { + return nil, nil + } else if data[0] != '\x00' { + return nil, fmt.Errorf("unknown tag byte: %x", data[0]) } - return nil, nil + + pred := new(datatypes.Predicate) + if err := pred.Unmarshal(data[1:]); err != nil { + return nil, err + } + return NewProtobufPredicate(pred) +} + +// +// Design +// + +// Predicates lazily evaluate with memoization so that we can walk a series key +// by the tags without parsing them into a structure and allocating. Each node +// in a predicate tree keeps a cache if it has enough information to have a +// definite value. The predicate state keeps track of all of the tag key/value +// pairs passed to it, and has a reset function to start over for a new series key. +// +// For example, imagine a query like +// +// ("tag1" == "val1" AND "tag2" == "val2") OR "tag3" == "val3" +// +// The state would have tag values set on it like +// +// state.Set("tag1", "val1") => NeedMore +// state.Set("tag2", "not-val2") => NeedMore +// state.Set("tag3", "val3") => True +// +// where after the first Set, the AND and OR clauses are both NeedMore, after +// the second Set, the AND clause is False and the OR clause is NeedMore, and +// after the last Set, the AND clause is still False, and the OR clause is True. +// +// Fast resetting is achieved by having each cache maintain a pointer to the state +// and both having a generation number. When the state resets, it bumps the generation +// number, and when the value is set in the cache, it is set with the current generation +// of the state. When querying the cache, it checks if the generation still matches. + +// +// Protobuf Implementation +// + +// NewProtobufPredicate returns a Predicate that matches based on the comparison structure +// described by the incoming protobuf. +func NewProtobufPredicate(pred *datatypes.Predicate) (Predicate, error) { + // Walk the predicate to collect the tag refs + locs := make(map[string]int) + walkPredicateNodes(pred.Root, func(node *datatypes.Node) { + if node.GetNodeType() == datatypes.NodeTypeTagRef { + locs[node.GetTagRefValue()] = len(locs) + } else if node.GetNodeType() == datatypes.NodeTypeFieldRef { + locs[models.FieldKeyTagKey] = len(locs) + } + }) + + // Construct the shared state and root predicate node. + state := newPredicateState(locs) + root, err := buildPredicateNode(state, pred.Root) + if err != nil { + return nil, err + } + + return &predicateMatcher{ + pred: pred, + state: state, + root: root, + }, nil +} + +// predicateMatcher implements Predicate for a protobuf. +type predicateMatcher struct { + pred *datatypes.Predicate + state *predicateState + root predicateNode +} + +// Matches checks if the key matches the predicate by feeding individual tags into the +// state and returning as soon as the root node has a definite answer. +func (p *predicateMatcher) Matches(key []byte) bool { + p.state.Reset() + + popTag := predicatePopTag + if bytes.IndexByte(key, '\\') != -1 { + popTag = predicatePopTagEscape + } + + var tag, value []byte + for len(key) > 0 { + tag, value, key = popTag(key) + if tag == nil || !p.state.Set(tag, value) { + continue + } + resp := p.root.Update() + if resp == predicateResponse_true { + return true + } else if resp == predicateResponse_false { + return false + } + } + + return false +} + +// Marshal returns a buffer representing the protobuf predicate. +func (p *predicateMatcher) Marshal() ([]byte, error) { + // Prefix it with a zero byte so that we can change in the future if necessary + buf := make([]byte, 1+p.pred.Size()) + _, err := p.pred.MarshalTo(buf[1:]) + return buf, err +} + +// walkPredicateNodes recursively calls the function for each node. +func walkPredicateNodes(node *datatypes.Node, fn func(node *datatypes.Node)) { + fn(node) + for _, ch := range node.Children { + walkPredicateNodes(ch, fn) + } +} + +// buildPredicateNode takes a protobuf node and converts it into a predicateNode. It is strict +// in what it accepts. +func buildPredicateNode(state *predicateState, node *datatypes.Node) (predicateNode, error) { + switch node.GetNodeType() { + case datatypes.NodeTypeComparisonExpression: + children := node.GetChildren() + if len(children) != 2 { + return nil, fmt.Errorf("invalid number of children for logical expression: %v", len(children)) + } + left, right := children[0], children[1] + + comp := &predicateNodeComparison{ + predicateCache: newPredicateCache(state), + comp: node.GetComparison(), + } + + // Fill in the left side of the comparison + switch left.GetNodeType() { + // Field refs are actually for the '\xff' tag + case datatypes.NodeTypeFieldRef: + idx, ok := state.locs[models.FieldKeyTagKey] + if !ok { + return nil, fmt.Errorf("invalid field ref in comparison: %v", left.GetTagRefValue()) + } + comp.leftIndex = idx + + // Tag refs look up the location of the tag in the state + case datatypes.NodeTypeTagRef: + idx, ok := state.locs[left.GetTagRefValue()] + if !ok { + return nil, fmt.Errorf("invalid tag ref in comparison: %v", left.GetTagRefValue()) + } + comp.leftIndex = idx + + // Left literals are only allowed to be strings + case datatypes.NodeTypeLiteral: + lit, ok := left.GetValue().(*datatypes.Node_StringValue) + if !ok { + return nil, fmt.Errorf("invalid left literal in comparison: %v", left.GetValue()) + } + comp.leftLiteral = []byte(lit.StringValue) + + default: + return nil, fmt.Errorf("invalid left node in comparison: %v", left.GetNodeType()) + } + + // Fill in the right side of the comparison + switch right.GetNodeType() { + // Field refs are actually for the '\xff' tag + case datatypes.NodeTypeFieldRef: + idx, ok := state.locs[models.FieldKeyTagKey] + if !ok { + return nil, fmt.Errorf("invalid field ref in comparison: %v", left.GetTagRefValue()) + } + comp.rightIndex = idx + + // Tag refs look up the location of the tag in the state + case datatypes.NodeTypeTagRef: + idx, ok := state.locs[right.GetTagRefValue()] + if !ok { + return nil, fmt.Errorf("invalid tag ref in comparison: %v", right.GetTagRefValue()) + } + comp.rightIndex = idx + + // Right literals are allowed to be regexes as well as strings + case datatypes.NodeTypeLiteral: + switch lit := right.GetValue().(type) { + case *datatypes.Node_StringValue: + comp.rightLiteral = []byte(lit.StringValue) + + case *datatypes.Node_RegexValue: + reg, err := regexp.Compile(lit.RegexValue) + if err != nil { + return nil, err + } + comp.rightReg = reg + + default: + return nil, fmt.Errorf("invalid right literal in comparison: %v", right.GetValue()) + } + + default: + return nil, fmt.Errorf("invalid right node in comparison: %v", right.GetNodeType()) + } + + // Ensure that a regex is set on the right if and only if the comparison is a regex + if comp.rightReg == nil { + if comp.comp == datatypes.ComparisonRegex || comp.comp == datatypes.ComparisonNotRegex { + return nil, fmt.Errorf("invalid comparison involving regex: %v", node) + } + } else { + if comp.comp != datatypes.ComparisonRegex && comp.comp != datatypes.ComparisonNotEqual { + return nil, fmt.Errorf("invalid comparison not against regex: %v", node) + } + } + + return comp, nil + + case datatypes.NodeTypeLogicalExpression: + children := node.GetChildren() + if len(children) != 2 { + return nil, fmt.Errorf("invalid number of children for logical expression: %v", len(children)) + } + + left, err := buildPredicateNode(state, children[0]) + if err != nil { + return nil, err + } + right, err := buildPredicateNode(state, children[1]) + if err != nil { + return nil, err + } + + switch node.GetLogical() { + case datatypes.LogicalAnd: + return &predicateNodeAnd{ + predicateCache: newPredicateCache(state), + left: left, + right: right, + }, nil + + case datatypes.LogicalOr: + return &predicateNodeOr{ + predicateCache: newPredicateCache(state), + left: left, + right: right, + }, nil + + default: + return nil, fmt.Errorf("unknown logical type: %v", node.GetLogical()) + } + + default: + return nil, fmt.Errorf("unsupported predicate type: %v", node.GetNodeType()) + } +} + +// +// Predicate Responses +// + +type predicateResponse uint8 + +const ( + predicateResponse_needMore predicateResponse = iota + predicateResponse_true + predicateResponse_false +) + +// +// Predicate State +// + +// predicateState keeps track of tag key=>value mappings with cheap methods +// to reset to a blank state. +type predicateState struct { + gen uint64 + locs map[string]int + values [][]byte +} + +// newPredicateState creates a predicateState given a map of keys to indexes into an +// an array. +func newPredicateState(locs map[string]int) *predicateState { + return &predicateState{ + gen: 1, // so that caches start out unfilled since they start at 0 + locs: locs, + values: make([][]byte, len(locs)), + } +} + +// Reset clears any set values for the state. +func (p *predicateState) Reset() { + p.gen++ + + for i := range p.values { + p.values[i] = nil + } +} + +// Set sets the key to be the value and returns true if the key is part of the considered +// set of keys. +func (p *predicateState) Set(key, value []byte) bool { + i, ok := p.locs[string(key)] + if ok { + p.values[i] = value + } + return ok +} + +// +// Predicate Cache +// + +// predicateCache interacts with the predicateState to keep determined responses +// memoized until the state has been Reset to avoid recomputing nodes. +type predicateCache struct { + state *predicateState + gen uint64 + resp predicateResponse +} + +// newPredicateCache constructs a predicateCache for the provided state. +func newPredicateCache(state *predicateState) predicateCache { + return predicateCache{ + state: state, + gen: 0, + resp: predicateResponse_needMore, + } +} + +// Cached returns the cached response and a boolean indicating if it is valid. +func (p *predicateCache) Cached() (predicateResponse, bool) { + return p.resp, p.gen == p.state.gen +} + +// Store sets the cache to the provided response until the state is Reset. +func (p *predicateCache) Store(resp predicateResponse) { + p.gen = p.state.gen + p.resp = resp +} + +// +// Predicate Nodes +// + +// predicateNode is the interface that any parts of a predicate tree implement. +type predicateNode interface { + // Update informs the node that the state has been updated and asks it to return + // a response. + Update() predicateResponse +} + +// predicateNodeAnd combines two predicate nodes with an And. +type predicateNodeAnd struct { + predicateCache + left, right predicateNode +} + +// Update checks if both of the left and right nodes are true. If either is false +// then the node is definitely false. Otherwise, it needs more information. +func (p *predicateNodeAnd) Update() predicateResponse { + if resp, ok := p.Cached(); ok { + return resp + } + + left := p.left.Update() + if left == predicateResponse_false { + p.Store(predicateResponse_false) + return predicateResponse_false + } else if left == predicateResponse_needMore { + return predicateResponse_needMore + } + + right := p.right.Update() + if right == predicateResponse_false { + p.Store(predicateResponse_false) + return predicateResponse_false + } else if right == predicateResponse_needMore { + return predicateResponse_needMore + } + + return predicateResponse_true +} + +// predicateNodeOr combines two predicate nodes with an Or. +type predicateNodeOr struct { + predicateCache + left, right predicateNode +} + +// Update checks if either the left and right nodes are true. If both nodes +// are false, then the node is definitely asle. Otherwise, it needs more information. +func (p *predicateNodeOr) Update() predicateResponse { + if resp, ok := p.Cached(); ok { + return resp + } + + left := p.left.Update() + if left == predicateResponse_true { + p.Store(predicateResponse_true) + return predicateResponse_true + } + + right := p.right.Update() + if right == predicateResponse_true { + p.Store(predicateResponse_true) + return predicateResponse_true + } + + if left == predicateResponse_false && right == predicateResponse_false { + p.Store(predicateResponse_false) + return predicateResponse_false + } + + return predicateResponse_needMore +} + +// predicateNodeComparison compares values of tags. +type predicateNodeComparison struct { + predicateCache + comp datatypes.Node_Comparison + rightReg *regexp.Regexp + leftLiteral []byte + rightLiteral []byte + leftIndex int + rightIndex int +} + +// Update checks if both sides of the comparison are determined, and if so, evaluates +// the comparison to a determined truth value. +func (p *predicateNodeComparison) Update() predicateResponse { + if resp, ok := p.Cached(); ok { + return resp + } + + left := p.leftLiteral + if left == nil { + left = p.state.values[p.leftIndex] + if left == nil { + return predicateResponse_needMore + } + } + + right := p.rightLiteral + if right == nil && p.rightReg != nil { + right = p.state.values[p.rightIndex] + if right == nil { + return predicateResponse_needMore + } + } + + if predicateEval(p.comp, left, right, p.rightReg) { + p.Store(predicateResponse_true) + return predicateResponse_true + } else { + p.Store(predicateResponse_false) + return predicateResponse_false + } +} + +// predicateEval is a helper to do the appropriate comparison depending on which comparison +// enumeration value was passed. +func predicateEval(comp datatypes.Node_Comparison, left, right []byte, rightReg *regexp.Regexp) bool { + switch comp { + case datatypes.ComparisonEqual: + return string(left) == string(right) + case datatypes.ComparisonNotEqual: + return string(left) != string(right) + case datatypes.ComparisonStartsWith: + return bytes.HasPrefix(left, right) + case datatypes.ComparisonLess: + return string(left) < string(right) + case datatypes.ComparisonLessEqual: + return string(left) <= string(right) + case datatypes.ComparisonGreater: + return string(left) > string(right) + case datatypes.ComparisonGreaterEqual: + return string(left) >= string(right) + case datatypes.ComparisonRegex: + return rightReg.Match(left) + case datatypes.ComparisonNotRegex: + return !rightReg.Match(left) + } + return false +} + +// +// Popping Tags +// + +// The models package has some of this logic as well, but doesn't export ways to get +// at individual tags one at a time. In the common, no escape characters case, popping +// the first tag off of a series key takes around ~10ns. + +// predicatePopTag pops a tag=value pair from the front of series, returning the +// remainder in rest. it assumes there are no escaped characters in the series. +func predicatePopTag(series []byte) (tag, value []byte, rest []byte) { + // find the first ',' + i := bytes.IndexByte(series, ',') + if i >= 0 && i < len(series) { + series, rest = series[:i], series[i+1:] + } + + // find the first '=' + j := bytes.IndexByte(series, '=') + if j >= 0 && j < len(series) { + tag, value = series[:j], series[j+1:] + } + + return tag, value, rest +} + +// predicatePopTagEscape pops a tag=value pair from the front of series, returning the +// remainder in rest. it assumes there are possibly/likely escaped characters in the series. +func predicatePopTagEscape(series []byte) (tag, value []byte, rest []byte) { + // find the first unescaped ',' + for j := uint(0); j < uint(len(series)); { + i := bytes.IndexByte(series[j:], ',') + if i < 0 { + break + } + ui := uint(i) + + if ui > 0 && ui-1 < uint(len(series)) && series[ui-1] == '\\' { + j = ui + 1 + continue + } + + idx := ui + j + series, rest = series[:idx], series[idx+1:] + break + } + + // find the first unescaped '=' + for j := uint(0); j < uint(len(series)); { + i := bytes.IndexByte(series[j:], '=') + if i < 0 { + break + } + ui := uint(i) + + if ui > 0 && ui-1 < uint(len(series)) && series[ui-1] == '\\' { + j = ui + 1 + continue + } + + idx := ui + j + tag, value = series[:idx], series[idx+1:] + break + } + + // sad time: it's possible this tag/value has escaped characters, so we have to + // find an unescape them. since the byte slice may refer to read-only memory, we + // can't do this in place, so we make copies. + if bytes.IndexByte(tag, '\\') != -1 { + unescapedTag := make([]byte, 0, len(tag)) + for i, c := range tag { + if c == '\\' && i+1 < len(tag) { + if c := tag[i+1]; c == ',' || c == ' ' || c == '=' { + continue + } + } + unescapedTag = append(unescapedTag, c) + } + tag = unescapedTag + } + + if bytes.IndexByte(value, '\\') != -1 { + unescapedValue := make([]byte, 0, len(value)) + for i, c := range value { + if c == '\\' && i+1 < len(value) { + if c := value[i+1]; c == ',' || c == ' ' || c == '=' { + continue + } + } + unescapedValue = append(unescapedValue, c) + } + value = unescapedValue + } + + return tag, value, rest } diff --git a/tsdb/tsm1/predicate_test.go b/tsdb/tsm1/predicate_test.go new file mode 100644 index 0000000000..5bce8802cb --- /dev/null +++ b/tsdb/tsm1/predicate_test.go @@ -0,0 +1,165 @@ +package tsm1 + +import ( + "fmt" + "testing" + + "github.com/influxdata/influxdb/storage/reads/datatypes" +) + +func TestPredicate(t *testing.T) { + cases := []struct { + Name string + Predicate *datatypes.Predicate + Key string + Matches bool + }{ + { + Name: "Basic Matching", + Predicate: predicate( + comparisonNode(datatypes.ComparisonEqual, tagNode("tag3"), stringNode("val3")), + ), + Key: "bucketorg,tag3=val3", + Matches: true, + }, + + { + Name: "Basic Unmatching", + Predicate: predicate( + comparisonNode(datatypes.ComparisonEqual, tagNode("tag3"), stringNode("val3")), + ), + Key: "bucketorg,tag3=val2", + Matches: false, + }, + + { + Name: "Compound Logical Matching", + Predicate: predicate( + orNode( + andNode( + comparisonNode(datatypes.ComparisonEqual, tagNode("foo"), stringNode("bar")), + comparisonNode(datatypes.ComparisonEqual, tagNode("baz"), stringNode("no")), + ), + comparisonNode(datatypes.ComparisonEqual, tagNode("tag3"), stringNode("val3")), + ), + ), + Key: "bucketorg,foo=bar,baz=bif,tag3=val3", + Matches: true, + }, + + { + Name: "Compound Logical Unmatching", + Predicate: predicate( + orNode( + andNode( + comparisonNode(datatypes.ComparisonEqual, tagNode("foo"), stringNode("bar")), + comparisonNode(datatypes.ComparisonEqual, tagNode("baz"), stringNode("no")), + ), + comparisonNode(datatypes.ComparisonEqual, tagNode("tag3"), stringNode("val3")), + ), + ), + Key: "bucketorg,foo=bar,baz=bif,tag3=val2", + Matches: false, + }, + } + + for _, test := range cases { + t.Run(test.Name, func(t *testing.T) { + pred, err := NewProtobufPredicate(test.Predicate) + if err != nil { + t.Fatal("compile failure:", err) + } + + if got, exp := pred.Matches([]byte(test.Key)), test.Matches; got != exp { + t.Fatal("match failure:", "got", got, "!=", "exp", exp) + } + }) + } +} + +func BenchmarkPredicate(b *testing.B) { + run := func(b *testing.B, predicate *datatypes.Predicate) { + pred, err := NewProtobufPredicate(predicate) + if err != nil { + b.Fatal(err) + } + + series := []byte("bucketorg,") + for i := 0; i < 10; i++ { + series = append(series, fmt.Sprintf("tag%d=val%d,", i, i)...) + } + series = series[:len(series)-1] + + b.SetBytes(int64(len(series))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + pred.Matches(series) + } + } + + b.Run("Basic", func(b *testing.B) { + run(b, predicate( + comparisonNode(datatypes.ComparisonEqual, tagNode("tag5"), stringNode("val5")), + )) + }) + + b.Run("Compound", func(b *testing.B) { + run(b, predicate( + orNode( + andNode( + comparisonNode(datatypes.ComparisonEqual, tagNode("tag0"), stringNode("val0")), + comparisonNode(datatypes.ComparisonEqual, tagNode("tag6"), stringNode("val5")), + ), + comparisonNode(datatypes.ComparisonEqual, tagNode("tag5"), stringNode("val5")), + ), + )) + }) +} + +// +// Helpers to create predicate protobufs +// + +func tagNode(s string) *datatypes.Node { + return &datatypes.Node{ + NodeType: datatypes.NodeTypeTagRef, + Value: &datatypes.Node_TagRefValue{TagRefValue: s}, + } +} + +func stringNode(s string) *datatypes.Node { + return &datatypes.Node{ + NodeType: datatypes.NodeTypeLiteral, + Value: &datatypes.Node_StringValue{StringValue: s}, + } +} + +func comparisonNode(comp datatypes.Node_Comparison, left, right *datatypes.Node) *datatypes.Node { + return &datatypes.Node{ + NodeType: datatypes.NodeTypeComparisonExpression, + Value: &datatypes.Node_Comparison_{Comparison: comp}, + Children: []*datatypes.Node{left, right}, + } +} + +func andNode(left, right *datatypes.Node) *datatypes.Node { + return &datatypes.Node{ + NodeType: datatypes.NodeTypeLogicalExpression, + Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd}, + Children: []*datatypes.Node{left, right}, + } +} + +func orNode(left, right *datatypes.Node) *datatypes.Node { + return &datatypes.Node{ + NodeType: datatypes.NodeTypeLogicalExpression, + Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalOr}, + Children: []*datatypes.Node{left, right}, + } +} + +func predicate(root *datatypes.Node) *datatypes.Predicate { + return &datatypes.Predicate{Root: root} +}