From a41e539458ee0bb6edac31b5fce4ae675e618879 Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Sun, 30 Nov 2014 15:52:00 -0700
Subject: [PATCH] Initial query planner.

---
 database_test.go |   9 ++
 engine.go        | 276 +++++++++++++++++++++++++++++++++++++++++++++++
 engine_test.go   | 115 ++++++++++++++++++++++++
 influxql/ast.go  |  11 +++
 4 files changed, 411 insertions(+)
 create mode 100644 engine.go
 create mode 100644 engine_test.go

diff --git a/database_test.go b/database_test.go
index 221eb56bb4..b3ce5c4273 100644
--- a/database_test.go
+++ b/database_test.go
@@ -436,3 +436,12 @@ func mustParseQuery(s string) *influxql.Query {
 	}
 	return q
 }
+
+// mustParseSelectStatement parses s as one SELECT statement; it panics on a parse error or a non-SELECT statement.
+func mustParseSelectStatement(s string) *influxql.SelectStatement {
+	stmt, err := influxql.NewParser(strings.NewReader(s)).ParseStatement()
+	if err != nil {
+		panic(err.Error())
+	}
+	return stmt.(*influxql.SelectStatement)
+}
diff --git a/engine.go b/engine.go
new file mode 100644
index 0000000000..c5a9a45714
--- /dev/null
+++ b/engine.go
@@ -0,0 +1,276 @@
+package influxdb
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/influxdb/influxdb/influxql"
+)
+
+// Planner creates an execution plan for an InfluxQL statement.
+// NewIteratorFunc returns the iterators for a series name and set of tags.
+type Planner struct {
+	NewIteratorFunc func(name string, tags map[string]string) []Iterator
+}
+
+// Plan generates an executable plan for a SELECT statement.
+//
+// Each field gets a mapper; count() and sum() calls also get a reducer.
+// Fields that are not calls are evaluated per point and have no reducer.
+// Plan panics on any other call name.
+func (p *Planner) Plan(stmt *influxql.SelectStatement) Executor {
+	// Create a new executor.
+	e := &executor{
+		stmt:            stmt,
+		newIteratorFunc: p.NewIteratorFunc,
+	}
+
+	// Generate mappers and reducers for each field.
+	e.mappers = make([]mapper, len(stmt.Fields))
+	e.reducers = make([]reducer, len(stmt.Fields))
+
+	for i, f := range stmt.Fields {
+		switch expr := f.Expr.(type) {
+		case *influxql.Call:
+			switch strings.ToLower(expr.Name) {
+			case "count":
+				e.mappers[i] = &countMapper{}
+				e.reducers[i] = &sumReducer{}
+			case "sum":
+				e.mappers[i] = &evalMapper{expr: expr.Args[0]}
+				e.reducers[i] = &sumReducer{}
+			default:
+				panic("pending: non-count calls")
+			}
+		default:
+			e.mappers[i] = &evalMapper{expr: f.Expr}
+		}
+	}
+
+	return e
+}
+
+// Executor represents an execution plan that can be run.
+type Executor interface {
+	Execute() (<-chan []interface{}, error)
+}
+
+// executor is the Executor built by Planner.Plan.
+// mappers and reducers are indexed by field position in stmt.Fields.
+type executor struct {
+	stmt            *influxql.SelectStatement
+	mappers         []mapper
+	reducers        []reducer
+	newIteratorFunc func(name string, tags map[string]string) []Iterator
+}
+
+// Execute runs the plan in a background goroutine and returns the channel
+// that result rows are streamed on. The channel is closed after the last row.
+// An error is returned if the statement source is not a *influxql.Series.
+func (e *executor) Execute() (<-chan []interface{}, error) {
+	// Retrieve a list of iterators.
+	// TODO: Support multiple sources.
+	series, ok := e.stmt.Source.(*influxql.Series)
+	if !ok {
+		return nil, fmt.Errorf("unsupported source type: %T", e.stmt.Source)
+	}
+	iterators := e.newIteratorFunc(series.Name, nil)
+
+	// Reduce intermediate data to our final dataset.
+	// The channel is unbuffered, so the goroutine blocks until each row is read.
+	result := make(chan []interface{})
+	go func() {
+		// Close the result channel to notify the caller of the end.
+		defer close(result)
+
+		// Execute the mappers for every element in the iterator.
+		intermediate := e.executeMappers(iterators)
+
+		// TODO: Sort, if specified.
+
+		// If the statement is aggregated then execute reducers.
+		// Otherwise stream raw data rows as-is.
+		if e.stmt.Aggregated() {
+			e.executeReducers(intermediate, result)
+			return
+		}
+		for _, values := range intermediate {
+			for _, row := range values {
+				result <- row
+			}
+		}
+	}()
+
+	return result, nil
+}
+
+// executeMappers runs every mapper against every point of every iterator
+// and returns the mapped rows grouped by key.
+func (e *executor) executeMappers(iterators []Iterator) map[string][][]interface{} {
+	intermediate := make(map[string][][]interface{})
+	for _, itr := range iterators {
+		for p := itr.Next(); p != nil; p = itr.Next() {
+			// Generate an intermediate row with the mappers.
+			value := make([]interface{}, len(e.mappers))
+			for i, m := range e.mappers {
+				value[i] = m.Map(p)
+			}
+
+			// Append row to the key in the intermediate data.
+			var key = "" // TODO: Generate key.
+			intermediate[key] = append(intermediate[key], value)
+		}
+	}
+	return intermediate
+}
+
+// executeReducers combines the rows of each key into a single row and sends
+// it to the result channel.
+func (e *executor) executeReducers(intermediate map[string][][]interface{}, result chan []interface{}) {
+	for _, values := range intermediate {
+		row := make([]interface{}, len(e.reducers))
+		for i, mr := range e.reducers {
+			// Non-call fields have no reducer; leave their column nil
+			// instead of calling a method on a nil interface.
+			if mr == nil {
+				continue
+			}
+			row[i] = mr.Reduce(values, i)
+		}
+		result <- row
+	}
+}
+
+// Iterator represents an object that can iterate over raw points.
+type Iterator interface {
+	Next() Point
+}
+
+// Point represents a timeseries data point with a timestamp and values.
+type Point interface {
+	Timestamp() int64
+	Value(name string) interface{}
+}
+
+// mapper computes one field's intermediate value for a single point.
+type mapper interface {
+	Map(Point) interface{}
+}
+
+// countMapper emits 1 for every point so that summing the values counts them.
+type countMapper struct{}
+
+func (m *countMapper) Map(_ Point) interface{} { return 1 }
+
+// evalMapper emits the value of expr evaluated against the point.
+type evalMapper struct {
+	expr influxql.Expr
+}
+
+func (m *evalMapper) Map(p Point) interface{} { return eval(p, m.expr) }
+
+// reducer combines the column at index across all rows into a single value.
+type reducer interface {
+	Reduce(values [][]interface{}, index int) interface{}
+}
+
+// sumReducer adds up the int and float64 values of a column.
+type sumReducer struct{}
+
+// Reduce returns an int when every summed value is an int (e.g. counts) and
+// a float64 as soon as any value is a float64. Other types are skipped.
+func (r *sumReducer) Reduce(values [][]interface{}, index int) interface{} {
+	var n int
+	var f float64
+	var hasFloat bool
+	for _, value := range values {
+		switch v := value[index].(type) {
+		case int:
+			n += v
+		case float64:
+			f += v
+			hasFloat = true
+		}
+	}
+	if hasFloat {
+		return f + float64(n)
+	}
+	return n
+}
+
+// eval computes the value of an expression for a given point.
+// It panics on call expressions and on unsupported expression types.
+func eval(p Point, expr influxql.Expr) interface{} {
+	switch expr := expr.(type) {
+	case *influxql.VarRef:
+		return p.Value(expr.Val)
+	case *influxql.Call:
+		panic("not implemented: eval: call")
+	case *influxql.NumberLiteral:
+		return expr.Val
+	case *influxql.StringLiteral:
+		return expr.Val
+	case *influxql.BooleanLiteral:
+		return expr.Val
+	case *influxql.TimeLiteral:
+		return expr.Val
+	case *influxql.DurationLiteral:
+		return expr.Val
+	case *influxql.BinaryExpr:
+		return evalBinaryExpr(p, expr)
+	case *influxql.ParenExpr:
+		return eval(p, expr.Expr)
+	}
+	panic("unsupported expression type")
+}
+
+// evalBinaryExpr evaluates both operands against the point and applies the
+// operator. Arithmetic and ordering operators assert float64 operands and
+// logical operators assert bool operands; EQ and NEQ compare the raw values.
+func evalBinaryExpr(p Point, expr *influxql.BinaryExpr) interface{} {
+	// Compute the left and right hand side values.
+	lhs := eval(p, expr.LHS)
+	rhs := eval(p, expr.RHS)
+
+	// Execute them with the appropriate types.
+	switch expr.Op {
+	case influxql.ADD:
+		return lhs.(float64) + rhs.(float64)
+	case influxql.SUB:
+		return lhs.(float64) - rhs.(float64)
+	case influxql.MUL:
+		return lhs.(float64) * rhs.(float64)
+	case influxql.DIV:
+		// A float64 operand never equals the untyped constant 0 (an int once
+		// boxed), so the zero check must also compare against float64(0).
+		if rhs == float64(0) || rhs == 0 {
+			return float64(0)
+		}
+		return lhs.(float64) / rhs.(float64)
+
+	case influxql.AND:
+		return lhs.(bool) && rhs.(bool)
+	case influxql.OR:
+		return lhs.(bool) || rhs.(bool)
+
+	case influxql.EQ:
+		return lhs == rhs
+	case influxql.NEQ:
+		return lhs != rhs
+	case influxql.LT:
+		return lhs.(float64) < rhs.(float64)
+	case influxql.LTE:
+		return lhs.(float64) <= rhs.(float64)
+	case influxql.GT:
+		return lhs.(float64) > rhs.(float64)
+	case influxql.GTE:
+		return lhs.(float64) >= rhs.(float64)
+
+	default:
+		panic("invalid binary expr operator:" + expr.Op.String())
+	}
+}
+
+// EXAMPLE: SELECT COUNT(value) FROM some_series GROUP BY TIME(5m) HAVING COUNT(value) > 23
+// EXAMPLE: SELECT * FROM cpu GROUP BY TIME(1h), host HAVING TOP(value, 10) WHERE time > NOW()
+// EXAMPLE: SELECT MAX(value) AS max_value, host FROM cpu GROUP BY TIME(1h), host HAVING TOP(max_value, 13)
diff --git a/engine_test.go b/engine_test.go
new file mode 100644
index 0000000000..072e94c80a
--- /dev/null
+++ b/engine_test.go
@@ -0,0 +1,115 @@
+package influxdb_test
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/influxdb/influxdb"
+)
+
+// Ensure the planner can generate an appropriate executor.
+func TestPlanner(t *testing.T) {
+	// Create a planner backed by a mock database with multiple series:
+	//
+	// 1. "cpu" - cpu usage
+	// 2. "visits" - page view tracking
+	// 3. "errors" - system errors
+	//
+	// Each series has a "host" tag (NOTE(review): no point below sets one — confirm).
+	var p influxdb.Planner
+	p.NewIteratorFunc = func(name string, tags map[string]string) []influxdb.Iterator {
+		switch name {
+		case "cpu":
+			return []influxdb.Iterator{&sliceIterator{Points: []influxdb.Point{
+				&point{"timestamp": mustParseTime("2000-01-01T00:00:00Z"), "value": float64(10)},
+				&point{"timestamp": mustParseTime("2000-01-01T00:00:00Z"), "value": float64(60)},
+				&point{"timestamp": mustParseTime("2000-01-01T00:01:30Z"), "value": float64(50)},
+			}}}
+		case "visits":
+			return []influxdb.Iterator{&sliceIterator{Points: []influxdb.Point{
+				&point{"timestamp": mustParseTime("2000-01-01T00:00:00Z"), "path": "/", "user_id": 123},
+				&point{"timestamp": mustParseTime("2000-01-01T00:01:00Z"), "path": "/signup", "user_id": 456},
+				&point{"timestamp": mustParseTime("2000-01-01T00:01:00Z"), "path": "/login", "user_id": 123},
+			}}}
+		case "errors": // no points defined; control reaches the panic below
+		}
+		panic("series not found: " + name)
+	}
+
+	// Set up a list of example queries with their expected result set.
+	var tests = []struct {
+		q   string
+		res [][]interface{}
+	}{
+		// 0. Retrieve raw data.
+		{
+			q:   `SELECT value FROM cpu`,
+			res: [][]interface{}{{float64(10)}, {float64(60)}, {float64(50)}},
+		},
+
+		// 1. Simple count.
+		{
+			q:   `SELECT count() FROM cpu`,
+			res: [][]interface{}{{3}},
+		},
+
+		// 2. Sum grouped by time.
+		{
+			q:   `SELECT sum(value) FROM cpu GROUP BY time(1m)`,
+			res: [][]interface{}{{-1}}, // NOTE(review): -1 looks like a placeholder expectation — confirm
+		},
+	}
+
+	// Iterate over each test, parse the query, plan & execute the statement.
+	// Retrieve all the result rows and compare with the expected result.
+	for i, tt := range tests {
+		// Plan and execute.
+		q := mustParseSelectStatement(tt.q)
+		ch, err := p.Plan(q).Execute()
+		if err != nil {
+			t.Errorf("%d. %q: execute error: %s", i, tt.q, err)
+			continue
+		}
+
+		// Collect all the results; the loop ends when the executor closes the channel.
+		var res [][]interface{}
+		for row := range ch {
+			res = append(res, row)
+		}
+
+		// Compare the results to what is expected.
+		if !reflect.DeepEqual(tt.res, res) {
+			t.Errorf("%d. %q: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.res, res)
+			continue
+		}
+	}
+}
+
+// sliceIterator iterates over a slice of points, tracking its position in Index.
+type sliceIterator struct {
+	Points []influxdb.Point
+	Index  int
+}
+
+// Next returns the next point in the iterator, or nil once every point has been returned.
+func (i *sliceIterator) Next() (p influxdb.Point) {
+	if i.Index < len(i.Points) {
+		p = i.Points[i.Index]
+		i.Index++
+	}
+	return // p is still nil when Index has reached len(Points)
+}
+
+// point represents a single timeseries data point.
+// The "timestamp" key is reserved for the timestamp.
+type point map[string]interface{}
+
+// Timestamp returns the time on the point in nanoseconds since epoch.
+// Panics if the "timestamp" key does not hold a time.Time.
+func (p point) Timestamp() int64 {
+	return p["timestamp"].(time.Time).UnixNano()
+}
+
+// Value returns a value by name; a missing name yields nil.
+func (p point) Value(name string) interface{} { return p[name] }
diff --git a/influxql/ast.go b/influxql/ast.go
index f60935e58f..1b3fbdf95e 100644
--- a/influxql/ast.go
+++ b/influxql/ast.go
@@ -118,6 +118,17 @@ type SelectStatement struct {
 	Ascending bool
 }
 
+// Aggregated returns true if any node under the statement's fields is a function call.
+func (s *SelectStatement) Aggregated() bool {
+	var v bool
+	WalkFunc(s.Fields, func(n Node) {
+		if _, ok := n.(*Call); ok {
+			v = true
+		}
+	})
+	return v
+}
+
 // DeleteStatement represents a command for removing data from the database.
 type DeleteStatement struct {
 	// Data source that values are removed from.