pvt #59002188. add points filtering to the engine package.

pull/17/head
John Shahid 2013-10-16 16:27:26 -04:00
parent af21903e19
commit 938029e3c5
5 changed files with 343 additions and 14 deletions

12
src/common/helpers.go Normal file
View File

@ -0,0 +1,12 @@
package common
import (
"encoding/json"
"protocol"
)
func StringToSeriesArray(seriesString string) ([]*protocol.Series, error) {
series := []*protocol.Series{}
err := json.Unmarshal([]byte(seriesString), &series)
return series, err
}

View File

@ -0,0 +1,171 @@
package engine
import (
"fmt"
"protocol"
"regexp"
)
type BooleanOperation func(leftType, rightType protocol.FieldDefinition_Type, leftValue, rightValue *protocol.FieldValue) (bool, error)
var (
registeredOperators = map[string]BooleanOperation{}
)
func init() {
registeredOperators["=="] = EqualityOperator
registeredOperators["!="] = not(EqualityOperator)
registeredOperators[">="] = GreaterThanOrEqualOperator
registeredOperators[">"] = GreaterThanOperator
registeredOperators["<"] = not(GreaterThanOrEqualOperator)
registeredOperators[">"] = not(GreaterThanOperator)
registeredOperators["~="] = RegexMatcherOperator
}
func not(op BooleanOperation) BooleanOperation {
return func(leftType, rightType protocol.FieldDefinition_Type, leftValue, rightValue *protocol.FieldValue) (bool, error) {
ok, err := op(leftType, rightType, leftValue, rightValue)
return !ok, err
}
}
func commonType(leftType, rightType protocol.FieldDefinition_Type) (protocol.FieldDefinition_Type, error) {
switch leftType {
case protocol.FieldDefinition_INT64, protocol.FieldDefinition_INT32:
switch rightType {
case protocol.FieldDefinition_INT32, protocol.FieldDefinition_INT64:
return protocol.FieldDefinition_INT64, nil
case protocol.FieldDefinition_DOUBLE:
return protocol.FieldDefinition_DOUBLE, nil
}
case protocol.FieldDefinition_DOUBLE:
switch rightType {
case protocol.FieldDefinition_INT64, protocol.FieldDefinition_INT32, protocol.FieldDefinition_DOUBLE:
return protocol.FieldDefinition_DOUBLE, nil
}
case protocol.FieldDefinition_BOOL:
if rightType == protocol.FieldDefinition_BOOL {
return protocol.FieldDefinition_BOOL, nil
}
case protocol.FieldDefinition_STRING:
if rightType == protocol.FieldDefinition_STRING {
return protocol.FieldDefinition_STRING, nil
}
}
return 0, fmt.Errorf("%v and %v cannot be coerced to a common type", leftType, rightType)
}
func coerceValue(value *protocol.FieldValue, fromType, toType protocol.FieldDefinition_Type) *protocol.FieldValue {
switch toType {
case protocol.FieldDefinition_INT64:
switch fromType {
case protocol.FieldDefinition_INT64:
return value
case protocol.FieldDefinition_INT32:
temp := int64(*value.IntValue)
return &protocol.FieldValue{Int64Value: &temp}
}
case protocol.FieldDefinition_DOUBLE:
switch fromType {
case protocol.FieldDefinition_DOUBLE:
return value
case protocol.FieldDefinition_INT64:
temp := float64(*value.Int64Value)
return &protocol.FieldValue{DoubleValue: &temp}
case protocol.FieldDefinition_INT32:
temp := float64(*value.IntValue)
return &protocol.FieldValue{DoubleValue: &temp}
}
}
return value
}
func EqualityOperator(leftType, rightType protocol.FieldDefinition_Type, leftValue, rightValue *protocol.FieldValue) (bool, error) {
cType, err := commonType(leftType, rightType)
if err != nil {
return false, err
}
leftValue = coerceValue(leftValue, leftType, cType)
rightValue = coerceValue(rightValue, rightType, cType)
switch cType {
case protocol.FieldDefinition_STRING:
return *leftValue.StringValue == *rightValue.StringValue, nil
case protocol.FieldDefinition_INT64:
return *leftValue.Int64Value == *rightValue.Int64Value, nil
case protocol.FieldDefinition_DOUBLE:
return *leftValue.DoubleValue == *rightValue.DoubleValue, nil
case protocol.FieldDefinition_BOOL:
return *leftValue.BoolValue == *rightValue.BoolValue, nil
default:
return false, fmt.Errorf("Unknown type %v", cType)
}
}
func RegexMatcherOperator(leftType, rightType protocol.FieldDefinition_Type, leftValue, rightValue *protocol.FieldValue) (bool, error) {
cType, err := commonType(leftType, rightType)
if err != nil {
return false, err
}
leftValue = coerceValue(leftValue, leftType, cType)
rightValue = coerceValue(rightValue, rightType, cType)
switch cType {
case protocol.FieldDefinition_STRING:
// TODO: do case insensitive matching
return regexp.MatchString(*rightValue.StringValue, *leftValue.StringValue)
default:
return false, fmt.Errorf("Cannot use regex matcher with type %v", cType)
}
}
func GreaterThanOrEqualOperator(leftType, rightType protocol.FieldDefinition_Type, leftValue, rightValue *protocol.FieldValue) (bool, error) {
cType, err := commonType(leftType, rightType)
if err != nil {
return false, err
}
leftValue = coerceValue(leftValue, leftType, cType)
rightValue = coerceValue(rightValue, rightType, cType)
switch cType {
case protocol.FieldDefinition_STRING:
return *leftValue.StringValue >= *rightValue.StringValue, nil
case protocol.FieldDefinition_INT64:
return *leftValue.Int64Value >= *rightValue.Int64Value, nil
case protocol.FieldDefinition_DOUBLE:
return *leftValue.DoubleValue >= *rightValue.DoubleValue, nil
default:
return false, fmt.Errorf("Cannot use >= with type %v", cType)
}
}
func GreaterThanOperator(leftType, rightType protocol.FieldDefinition_Type, leftValue, rightValue *protocol.FieldValue) (bool, error) {
cType, err := commonType(leftType, rightType)
if err != nil {
return false, err
}
leftValue = coerceValue(leftValue, leftType, cType)
rightValue = coerceValue(rightValue, rightType, cType)
switch cType {
case protocol.FieldDefinition_STRING:
return *leftValue.StringValue > *rightValue.StringValue, nil
case protocol.FieldDefinition_INT64:
return *leftValue.Int64Value > *rightValue.Int64Value, nil
case protocol.FieldDefinition_DOUBLE:
return *leftValue.DoubleValue > *rightValue.DoubleValue, nil
default:
return false, fmt.Errorf("Cannot use >= with type %v", cType)
}
}

View File

@ -38,15 +38,9 @@ func (self *MockCoordinator) WriteSeriesData(database string, series *protocol.S
return nil
}
func stringToSeriesArray(seriesString string, c *C) []*protocol.Series {
series := []*protocol.Series{}
err := json.Unmarshal([]byte(seriesString), &series)
c.Assert(err, IsNil)
return series
}
func createEngine(c *C, seriesString string) EngineI {
series := stringToSeriesArray(seriesString, c)
series, err := common.StringToSeriesArray(seriesString)
c.Assert(err, IsNil)
engine, err := NewQueryEngine(&MockCoordinator{
series: series,
@ -82,7 +76,8 @@ func runQuery(engine EngineI, query string, c *C, expectedSeries string) {
c.Assert(err, IsNil)
series := stringToSeriesArray(expectedSeries, c)
series, err := common.StringToSeriesArray(expectedSeries)
c.Assert(err, IsNil)
if !reflect.DeepEqual(result, series) {
resultData, _ := json.MarshalIndent(result, "", " ")
@ -910,3 +905,61 @@ func (self *EngineSuite) TestPercentileQueryWithOutOfBoundNumericArguments(c *C)
runQueryRunError(engine, "select percentile(column_one, 0) from foo group by time(1m);", c, err)
runQueryRunError(engine, "select percentile(column_one, 105) from foo group by time(1m);", c, err)
}
func (self *EngineSuite) TestEqualityFiltering(c *C) {
queryStr := "select * from t where column_one == 100 and column_two != 6;"
query, err := parser.ParseQuery(queryStr)
c.Assert(err, IsNil)
series, err := common.StringToSeriesArray(`
[
{
"points": [
{"values": [{"int_value": 100},{"int_value": 5 }], "timestamp": 1381346631, "sequence_number": 1},
{"values": [{"int_value": 100},{"int_value": 6 }], "timestamp": 1381346631, "sequence_number": 1},
{"values": [{"int_value": 90 },{"int_value": 15}], "timestamp": 1381346632, "sequence_number": 1}
],
"name": "t",
"fields": [
{"type": "INT32", "name": "column_one"},
{"type": "INT32", "name": "column_two"}
]
}
]
`)
c.Assert(err, IsNil)
result, err := Filter(query, series[0])
c.Assert(err, IsNil)
c.Assert(result, NotNil)
c.Assert(result.Points, HasLen, 1)
c.Assert(*result.Points[0].Values[0].IntValue, Equals, int32(100))
c.Assert(*result.Points[0].Values[1].IntValue, Equals, int32(5))
}
func (self *EngineSuite) TestInequalityFiltering(c *C) {
queryStr := "select * from t where column_one >= 100 and column_two > 6;"
query, err := parser.ParseQuery(queryStr)
c.Assert(err, IsNil)
series, err := common.StringToSeriesArray(`
[
{
"points": [
{"values": [{"int_value": 100},{"int_value": 7 }], "timestamp": 1381346631, "sequence_number": 1},
{"values": [{"int_value": 100},{"int_value": 6 }], "timestamp": 1381346631, "sequence_number": 1},
{"values": [{"int_value": 90 },{"int_value": 15}], "timestamp": 1381346632, "sequence_number": 1}
],
"name": "t",
"fields": [
{"type": "INT32", "name": "column_one"},
{"type": "INT32", "name": "column_two"}
]
}
]
`)
c.Assert(err, IsNil)
result, err := Filter(query, series[0])
c.Assert(err, IsNil)
c.Assert(result, NotNil)
c.Assert(result.Points, HasLen, 1)
c.Assert(*result.Points[0].Values[0].IntValue, Equals, int32(100))
c.Assert(*result.Points[0].Values[1].IntValue, Equals, int32(6))
}

98
src/engine/filtering.go Normal file
View File

@ -0,0 +1,98 @@
package engine
import (
"fmt"
"parser"
"protocol"
"strconv"
)
func getExpressionValue(expr *parser.Expression, fields []*protocol.FieldDefinition, point *protocol.Point) (
protocol.FieldDefinition_Type, *protocol.FieldValue, error) {
if value, ok := expr.GetLeftValue(); ok {
switch value.Type {
case parser.ValueFunctionCall:
return 0, nil, fmt.Errorf("Cannot process function call %s in expression", value.Name)
case parser.ValueFloat:
value, _ := strconv.ParseFloat(value.Name, 64)
return protocol.FieldDefinition_DOUBLE, &protocol.FieldValue{DoubleValue: &value}, nil
case parser.ValueInt:
value, _ := strconv.ParseInt(value.Name, 10, 64)
return protocol.FieldDefinition_INT64, &protocol.FieldValue{Int64Value: &value}, nil
case parser.ValueString:
return protocol.FieldDefinition_STRING, &protocol.FieldValue{StringValue: &value.Name}, nil
case parser.ValueSimpleName:
// TODO: optimize this so we don't have to lookup the column everytime
fieldIdx := -1
for idx, field := range fields {
if *field.Name == value.Name {
fieldIdx = idx
break
}
}
if fieldIdx == -1 {
return 0, nil, fmt.Errorf("Cannot find column %s", value.Name)
}
return *fields[fieldIdx].Type, point.Values[fieldIdx], nil
}
}
return 0, nil, fmt.Errorf("Cannot evaulate expression")
}
func matchesExpression(expr *parser.BoolExpression, fields []*protocol.FieldDefinition, point *protocol.Point) (bool, error) {
leftType, leftValue, err := getExpressionValue(expr.Left, fields, point)
if err != nil {
return false, err
}
rightType, rightValue, err := getExpressionValue(expr.Right, fields, point)
if err != nil {
return false, err
}
operator := registeredOperators[expr.Operation]
return operator(leftType, rightType, leftValue, rightValue)
}
func matches(condition *parser.WhereCondition, fields []*protocol.FieldDefinition, point *protocol.Point) (bool, error) {
if expr, ok := condition.GetBoolExpression(); ok {
return matchesExpression(expr, fields, point)
}
left, _ := condition.GetLeftWhereCondition()
leftResult, err := matches(left, fields, point)
if err != nil {
return false, err
}
// short circuit
if !leftResult && condition.Operation == "AND" ||
leftResult && condition.Operation == "OR" {
return leftResult, nil
}
return matches(condition.Right, fields, point)
}
func Filter(query *parser.Query, series *protocol.Series) (*protocol.Series, error) {
if query.GetWhereCondition() == nil {
return series, nil
}
points := series.Points
series.Points = nil
for _, point := range points {
ok, err := matches(query.GetWhereCondition(), series.Fields, point)
if err != nil {
return nil, err
}
if ok {
series.Points = append(series.Points, point)
}
}
return series, nil
}

View File

@ -2,7 +2,6 @@ package parser
import (
"fmt"
"protocol"
"sort"
"strconv"
"strings"
@ -22,10 +21,6 @@ var (
}
)
func (self *Query) Filter(series *protocol.Series) *protocol.Series {
return nil
}
// parse time expressions, e.g. now() - 1d
func parseTime(expr *Expression) (int64, error) {
if value, ok := expr.GetLeftValue(); ok {