share some more code with reads

pull/10616/head
Jeff Wendling 2018-10-08 15:12:16 -06:00
parent 6256926563
commit 5fa04a8d56
4 changed files with 276 additions and 285 deletions

View File

@ -3,15 +3,23 @@ package reads
import (
"bytes"
"fmt"
"regexp"
"strconv"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb"
"github.com/pkg/errors"
)
const (
fieldKey = "_field"
measurementKey = "_measurement"
valueKey = "_value"
)
// NodeVisitor can be called by Walk to traverse the Node hierarchy.
// The Visit() function is called once per node.
type NodeVisitor interface {
@ -244,12 +252,6 @@ func toStoragePredicateHelper(n semantic.Expression, objectName string) (*dataty
},
}, nil
case *semantic.MemberExpression:
const (
fieldKey = "_field"
measurementKey = "_measurement"
valueKey = "_value"
)
// Sanity check that the object is the objectName identifier
if ident, ok := n.Object.(*semantic.IdentifierExpression); !ok || ident.Name != objectName {
return nil, fmt.Errorf("unknown object %q", n.Object)
@ -317,3 +319,266 @@ func toComparisonOperator(o ast.OperatorKind) (datatypes.Node_Comparison, error)
return 0, fmt.Errorf("unknown operator %v", o)
}
}
var measurementRemap = map[string]string{"_measurement": "_name"}
// NodeToExpr transforms a predicate node to an influxql.Expr.
func NodeToExpr(node *datatypes.Node, remap map[string]string) (influxql.Expr, error) {
v := &nodeToExprVisitor{remap: remap}
WalkNode(v, node)
if err := v.Err(); err != nil {
return nil, err
}
if len(v.exprs) > 1 {
return nil, errors.New("invalid expression")
}
if len(v.exprs) == 0 {
return nil, nil
}
// TODO(edd): It would be preferable if RewriteRegexConditions was a
// package level function in influxql.
stmt := &influxql.SelectStatement{
Condition: v.exprs[0],
}
stmt.RewriteRegexConditions()
return stmt.Condition, nil
}
type nodeToExprVisitor struct {
remap map[string]string
exprs []influxql.Expr
err error
}
func (v *nodeToExprVisitor) Visit(n *datatypes.Node) NodeVisitor {
if v.err != nil {
return nil
}
switch n.NodeType {
case datatypes.NodeTypeLogicalExpression:
if len(n.Children) > 1 {
op := influxql.AND
if n.GetLogical() == datatypes.LogicalOr {
op = influxql.OR
}
WalkNode(v, n.Children[0])
if v.err != nil {
return nil
}
for i := 1; i < len(n.Children); i++ {
WalkNode(v, n.Children[i])
if v.err != nil {
return nil
}
if len(v.exprs) >= 2 {
lhs, rhs := v.pop2()
v.exprs = append(v.exprs, &influxql.BinaryExpr{LHS: lhs, Op: op, RHS: rhs})
}
}
return nil
}
case datatypes.NodeTypeParenExpression:
if len(n.Children) != 1 {
v.err = errors.New("ParenExpression expects one child")
return nil
}
WalkNode(v, n.Children[0])
if v.err != nil {
return nil
}
if len(v.exprs) > 0 {
v.exprs = append(v.exprs, &influxql.ParenExpr{Expr: v.pop()})
}
return nil
case datatypes.NodeTypeComparisonExpression:
WalkChildren(v, n)
if len(v.exprs) < 2 {
v.err = errors.New("ComparisonExpression expects two children")
return nil
}
lhs, rhs := v.pop2()
be := &influxql.BinaryExpr{LHS: lhs, RHS: rhs}
switch n.GetComparison() {
case datatypes.ComparisonEqual:
be.Op = influxql.EQ
case datatypes.ComparisonNotEqual:
be.Op = influxql.NEQ
case datatypes.ComparisonStartsWith:
// TODO(sgc): rewrite to anchored RE, as index does not support startsWith yet
v.err = errors.New("startsWith not implemented")
return nil
case datatypes.ComparisonRegex:
be.Op = influxql.EQREGEX
case datatypes.ComparisonNotRegex:
be.Op = influxql.NEQREGEX
case datatypes.ComparisonLess:
be.Op = influxql.LT
case datatypes.ComparisonLessEqual:
be.Op = influxql.LTE
case datatypes.ComparisonGreater:
be.Op = influxql.GT
case datatypes.ComparisonGreaterEqual:
be.Op = influxql.GTE
default:
v.err = errors.New("invalid comparison operator")
return nil
}
v.exprs = append(v.exprs, be)
return nil
case datatypes.NodeTypeTagRef:
ref := n.GetTagRefValue()
if v.remap != nil {
if nk, ok := v.remap[ref]; ok {
ref = nk
}
}
v.exprs = append(v.exprs, &influxql.VarRef{Val: ref, Type: influxql.Tag})
return nil
case datatypes.NodeTypeFieldRef:
v.exprs = append(v.exprs, &influxql.VarRef{Val: "$"})
return nil
case datatypes.NodeTypeLiteral:
switch val := n.Value.(type) {
case *datatypes.Node_StringValue:
v.exprs = append(v.exprs, &influxql.StringLiteral{Val: val.StringValue})
case *datatypes.Node_RegexValue:
// TODO(sgc): consider hashing the RegexValue and cache compiled version
re, err := regexp.Compile(val.RegexValue)
if err != nil {
v.err = err
}
v.exprs = append(v.exprs, &influxql.RegexLiteral{Val: re})
return nil
case *datatypes.Node_IntegerValue:
v.exprs = append(v.exprs, &influxql.IntegerLiteral{Val: val.IntegerValue})
case *datatypes.Node_UnsignedValue:
v.exprs = append(v.exprs, &influxql.UnsignedLiteral{Val: val.UnsignedValue})
case *datatypes.Node_FloatValue:
v.exprs = append(v.exprs, &influxql.NumberLiteral{Val: val.FloatValue})
case *datatypes.Node_BooleanValue:
v.exprs = append(v.exprs, &influxql.BooleanLiteral{Val: val.BooleanValue})
default:
v.err = errors.New("unexpected literal type")
return nil
}
return nil
default:
return v
}
return nil
}
func (v *nodeToExprVisitor) Err() error {
return v.err
}
func (v *nodeToExprVisitor) pop() influxql.Expr {
if len(v.exprs) == 0 {
panic("stack empty")
}
var top influxql.Expr
top, v.exprs = v.exprs[len(v.exprs)-1], v.exprs[:len(v.exprs)-1]
return top
}
func (v *nodeToExprVisitor) pop2() (influxql.Expr, influxql.Expr) {
if len(v.exprs) < 2 {
panic("stack empty")
}
rhs := v.exprs[len(v.exprs)-1]
lhs := v.exprs[len(v.exprs)-2]
v.exprs = v.exprs[:len(v.exprs)-2]
return lhs, rhs
}
func IsTrueBooleanLiteral(expr influxql.Expr) bool {
b, ok := expr.(*influxql.BooleanLiteral)
if ok {
return b.Val
}
return false
}
func RewriteExprRemoveFieldValue(expr influxql.Expr) influxql.Expr {
return influxql.RewriteExpr(expr, func(expr influxql.Expr) influxql.Expr {
if be, ok := expr.(*influxql.BinaryExpr); ok {
if ref, ok := be.LHS.(*influxql.VarRef); ok {
if ref.Val == "$" {
return &influxql.BooleanLiteral{Val: true}
}
}
}
return expr
})
}
type hasRefs struct {
refs []string
found []bool
}
func (v *hasRefs) allFound() bool {
for _, val := range v.found {
if !val {
return false
}
}
return true
}
func (v *hasRefs) Visit(node influxql.Node) influxql.Visitor {
if v.allFound() {
return nil
}
if n, ok := node.(*influxql.VarRef); ok {
for i, r := range v.refs {
if !v.found[i] && r == n.Val {
v.found[i] = true
if v.allFound() {
return nil
}
}
}
}
return v
}
func HasFieldValueKey(expr influxql.Expr) bool {
refs := hasRefs{refs: []string{valueKey}, found: make([]bool, 1)}
influxql.Walk(&refs, expr)
return refs.found[0]
}

View File

@ -5,7 +5,6 @@ import "github.com/influxdata/platform/models"
const (
fieldKey = "_field"
measurementKey = "_measurement"
valueKey = "_value"
)
var (

View File

@ -51,16 +51,16 @@ func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.R
mi := tsdb.NewMeasurementSliceIterator([][]byte{m})
if root := req.Predicate.GetRoot(); root != nil {
if p.cond, err = nodeToExpr(root, nil); err != nil {
if p.cond, err = reads.NodeToExpr(root, nil); err != nil {
return nil, err
}
p.hasValueExpr = hasFieldValueKey(p.cond)
p.hasValueExpr = reads.HasFieldValueKey(p.cond)
if !p.hasValueExpr {
opt.Condition = p.cond
} else {
opt.Condition = influxql.Reduce(rewriteExprRemoveFieldValue(influxql.CloneExpr(p.cond)), nil)
if isTrueBooleanLiteral(opt.Condition) {
opt.Condition = influxql.Reduce(reads.RewriteExprRemoveFieldValue(influxql.CloneExpr(p.cond)), nil)
if reads.IsTrueBooleanLiteral(opt.Condition) {
opt.Condition = nil
}
}
@ -121,7 +121,7 @@ func (c *indexSeriesCursor) Next() *reads.SeriesRow {
if c.cond != nil && c.hasValueExpr {
// TODO(sgc): lazily evaluate valueCond
c.row.ValueCond = influxql.Reduce(c.cond, c)
if isTrueBooleanLiteral(c.row.ValueCond) {
if reads.IsTrueBooleanLiteral(c.row.ValueCond) {
// we've reduced the expression to "true"
c.row.ValueCond = nil
}

View File

@ -1,273 +0,0 @@
package readservice
import (
"errors"
"regexp"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
)
var measurementRemap = map[string]string{"_measurement": "_name"}
// nodeToExpr transforms a predicate node to an influxql.Expr.
func nodeToExpr(node *datatypes.Node, remap map[string]string) (influxql.Expr, error) {
v := &nodeToExprVisitor{remap: remap}
reads.WalkNode(v, node)
if err := v.Err(); err != nil {
return nil, err
}
if len(v.exprs) > 1 {
return nil, errors.New("invalid expression")
}
if len(v.exprs) == 0 {
return nil, nil
}
// TODO(edd): It would be preferable if RewriteRegexConditions was a
// package level function in influxql.
stmt := &influxql.SelectStatement{
Condition: v.exprs[0],
}
stmt.RewriteRegexConditions()
return stmt.Condition, nil
}
type nodeToExprVisitor struct {
remap map[string]string
exprs []influxql.Expr
err error
}
func (v *nodeToExprVisitor) Visit(n *datatypes.Node) reads.NodeVisitor {
if v.err != nil {
return nil
}
switch n.NodeType {
case datatypes.NodeTypeLogicalExpression:
if len(n.Children) > 1 {
op := influxql.AND
if n.GetLogical() == datatypes.LogicalOr {
op = influxql.OR
}
reads.WalkNode(v, n.Children[0])
if v.err != nil {
return nil
}
for i := 1; i < len(n.Children); i++ {
reads.WalkNode(v, n.Children[i])
if v.err != nil {
return nil
}
if len(v.exprs) >= 2 {
lhs, rhs := v.pop2()
v.exprs = append(v.exprs, &influxql.BinaryExpr{LHS: lhs, Op: op, RHS: rhs})
}
}
return nil
}
case datatypes.NodeTypeParenExpression:
if len(n.Children) != 1 {
v.err = errors.New("ParenExpression expects one child")
return nil
}
reads.WalkNode(v, n.Children[0])
if v.err != nil {
return nil
}
if len(v.exprs) > 0 {
v.exprs = append(v.exprs, &influxql.ParenExpr{Expr: v.pop()})
}
return nil
case datatypes.NodeTypeComparisonExpression:
reads.WalkChildren(v, n)
if len(v.exprs) < 2 {
v.err = errors.New("ComparisonExpression expects two children")
return nil
}
lhs, rhs := v.pop2()
be := &influxql.BinaryExpr{LHS: lhs, RHS: rhs}
switch n.GetComparison() {
case datatypes.ComparisonEqual:
be.Op = influxql.EQ
case datatypes.ComparisonNotEqual:
be.Op = influxql.NEQ
case datatypes.ComparisonStartsWith:
// TODO(sgc): rewrite to anchored RE, as index does not support startsWith yet
v.err = errors.New("startsWith not implemented")
return nil
case datatypes.ComparisonRegex:
be.Op = influxql.EQREGEX
case datatypes.ComparisonNotRegex:
be.Op = influxql.NEQREGEX
case datatypes.ComparisonLess:
be.Op = influxql.LT
case datatypes.ComparisonLessEqual:
be.Op = influxql.LTE
case datatypes.ComparisonGreater:
be.Op = influxql.GT
case datatypes.ComparisonGreaterEqual:
be.Op = influxql.GTE
default:
v.err = errors.New("invalid comparison operator")
return nil
}
v.exprs = append(v.exprs, be)
return nil
case datatypes.NodeTypeTagRef:
ref := n.GetTagRefValue()
if v.remap != nil {
if nk, ok := v.remap[ref]; ok {
ref = nk
}
}
v.exprs = append(v.exprs, &influxql.VarRef{Val: ref, Type: influxql.Tag})
return nil
case datatypes.NodeTypeFieldRef:
v.exprs = append(v.exprs, &influxql.VarRef{Val: "$"})
return nil
case datatypes.NodeTypeLiteral:
switch val := n.Value.(type) {
case *datatypes.Node_StringValue:
v.exprs = append(v.exprs, &influxql.StringLiteral{Val: val.StringValue})
case *datatypes.Node_RegexValue:
// TODO(sgc): consider hashing the RegexValue and cache compiled version
re, err := regexp.Compile(val.RegexValue)
if err != nil {
v.err = err
}
v.exprs = append(v.exprs, &influxql.RegexLiteral{Val: re})
return nil
case *datatypes.Node_IntegerValue:
v.exprs = append(v.exprs, &influxql.IntegerLiteral{Val: val.IntegerValue})
case *datatypes.Node_UnsignedValue:
v.exprs = append(v.exprs, &influxql.UnsignedLiteral{Val: val.UnsignedValue})
case *datatypes.Node_FloatValue:
v.exprs = append(v.exprs, &influxql.NumberLiteral{Val: val.FloatValue})
case *datatypes.Node_BooleanValue:
v.exprs = append(v.exprs, &influxql.BooleanLiteral{Val: val.BooleanValue})
default:
v.err = errors.New("unexpected literal type")
return nil
}
return nil
default:
return v
}
return nil
}
func (v *nodeToExprVisitor) Err() error {
return v.err
}
func (v *nodeToExprVisitor) pop() influxql.Expr {
if len(v.exprs) == 0 {
panic("stack empty")
}
var top influxql.Expr
top, v.exprs = v.exprs[len(v.exprs)-1], v.exprs[:len(v.exprs)-1]
return top
}
func (v *nodeToExprVisitor) pop2() (influxql.Expr, influxql.Expr) {
if len(v.exprs) < 2 {
panic("stack empty")
}
rhs := v.exprs[len(v.exprs)-1]
lhs := v.exprs[len(v.exprs)-2]
v.exprs = v.exprs[:len(v.exprs)-2]
return lhs, rhs
}
func isTrueBooleanLiteral(expr influxql.Expr) bool {
b, ok := expr.(*influxql.BooleanLiteral)
if ok {
return b.Val
}
return false
}
func rewriteExprRemoveFieldValue(expr influxql.Expr) influxql.Expr {
return influxql.RewriteExpr(expr, func(expr influxql.Expr) influxql.Expr {
if be, ok := expr.(*influxql.BinaryExpr); ok {
if ref, ok := be.LHS.(*influxql.VarRef); ok {
if ref.Val == "$" {
return &influxql.BooleanLiteral{Val: true}
}
}
}
return expr
})
}
type hasRefs struct {
refs []string
found []bool
}
func (v *hasRefs) allFound() bool {
for _, val := range v.found {
if !val {
return false
}
}
return true
}
func (v *hasRefs) Visit(node influxql.Node) influxql.Visitor {
if v.allFound() {
return nil
}
if n, ok := node.(*influxql.VarRef); ok {
for i, r := range v.refs {
if !v.found[i] && r == n.Val {
v.found[i] = true
if v.allFound() {
return nil
}
}
}
}
return v
}
func hasFieldValueKey(expr influxql.Expr) bool {
refs := hasRefs{refs: []string{valueKey}, found: make([]bool, 1)}
influxql.Walk(&refs, expr)
return refs.found[0]
}