diff --git a/influxql/ast.go b/influxql/ast.go
index fc922315ff..a155410538 100644
--- a/influxql/ast.go
+++ b/influxql/ast.go
@@ -64,7 +64,8 @@ func (_ Fields) node() {}
 func (_ *Field) node() {}
 func (_ Dimensions) node() {}
 func (_ *Dimension) node() {}
-func (_ *Series) node() {}
+func (_ *Measurement) node() {}
+func (_ Measurements) node() {}
 func (_ *Join) node() {}
 func (_ *Merge) node() {}
 func (_ *VarRef) node() {}
@@ -142,9 +143,9 @@ type Source interface {
 	source()
 }
 
-func (_ *Series) source() {}
-func (_ *Join) source()   {}
-func (_ *Merge) source()  {}
+func (_ *Measurement) source() {}
+func (_ *Join) source()        {}
+func (_ *Merge) source()       {}
 
 // SortField represents a field to sort results by.
 type SortField struct {
@@ -277,7 +278,7 @@ func (s *SelectStatement) Substatement(ref *VarRef) (*SelectStatement, error) {
 	}
 
 	// If there is only one series source then return it with the whole condition.
-	if _, ok := s.Source.(*Series); ok {
+	if _, ok := s.Source.(*Measurement); ok {
 		other.Source = s.Source
 		other.Condition = s.Condition
 		return other, nil
@@ -288,7 +289,7 @@ func (s *SelectStatement) Substatement(ref *VarRef) (*SelectStatement, error) {
 	if name == "" {
 		return nil, fmt.Errorf("field source not found: %s", ref.Val)
 	}
-	other.Source = &Series{Name: name}
+	other.Source = &Measurement{Name: name}
 
 	// Filter out conditions.
 	if s.Condition != nil {
@@ -338,23 +339,21 @@ func filterExprBySource(name string, expr Expr) Expr {
 // Returns a blank string if no sources match.
 func MatchSource(src Source, name string) string {
 	switch src := src.(type) {
-	case *Series:
+	case *Measurement:
 		if strings.HasPrefix(name, src.Name) {
 			return src.Name
 		}
 	case *Join:
-		if str := MatchSource(src.LHS, name); str != "" {
-			return str
-		}
-		if str := MatchSource(src.RHS, name); str != "" {
-			return str
+		for _, m := range src.Measurements {
+			if strings.HasPrefix(name, m.Name) {
+				return m.Name
+			}
 		}
 	case *Merge:
-		if str := MatchSource(src.LHS, name); str != "" {
-			return str
-		}
-		if str := MatchSource(src.RHS, name); str != "" {
-			return str
+		for _, m := range src.Measurements {
+			if strings.HasPrefix(name, m.Name) {
+				return m.Name
+			}
 		}
 	}
 	return ""
@@ -709,34 +708,44 @@ type Dimension struct {
 // String returns a string representation of the dimension.
 func (d *Dimension) String() string { return d.Expr.String() }
 
-// Series represents a single series used as a datasource.
-type Series struct {
+// Measurements represents a list of measurements.
+type Measurements []*Measurement
+
+// String returns a string representation of the measurements.
+func (a Measurements) String() string {
+	var str []string
+	for _, m := range a {
+		str = append(str, m.String())
+	}
+	return strings.Join(str, ", ")
+}
+
+// Measurement represents a single measurement used as a datasource.
+type Measurement struct {
 	Name string
 }
 
-// String returns a string representation of the series.
-func (s *Series) String() string { return s.Name }
+// String returns a string representation of the measurement.
+func (m *Measurement) String() string { return QuoteIdent(m.Name) }
 
 // Join represents two datasources joined together.
 type Join struct {
-	LHS Source
-	RHS Source
+	Measurements Measurements
 }
 
 // String returns a string representation of the join.
func (j *Join) String() string {
-	return fmt.Sprintf("%s JOIN %s", j.LHS.String(), j.RHS.String())
+	return fmt.Sprintf("join(%s)", j.Measurements.String())
 }
 
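A rough sketch of the new function-style rendering, reusing the aa/bb names from the tests; the exact output depends on QuoteIdent's quoting rules:

	src := &Join{Measurements: Measurements{{Name: "aa"}, {Name: "bb"}}}
	str := src.String() // "join(aa, bb)", with each name passed through QuoteIdent
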
 // Merge represents a datasource created by merging two datasources.
 type Merge struct {
-	LHS Source
-	RHS Source
+	Measurements Measurements
 }
 
 // String returns a string representation of the merge.
 func (m *Merge) String() string {
-	return fmt.Sprintf("%s JOIN %s", m.LHS.String(), m.RHS.String())
+	return fmt.Sprintf("merge(%s)", m.Measurements.String())
 }
 
 // VarRef represents a reference to a variable.
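Note that MatchSource now resolves a dotted field reference against a multi-measurement source by prefix matching on measurement names. A minimal sketch of the behavior:

	src := &Merge{Measurements: Measurements{{Name: "aa"}, {Name: "bb"}}}
	MatchSource(src, "aa.value") // "aa"
	MatchSource(src, "cc.value") // "" (no source matches)
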
diff --git a/influxql/ast_test.go b/influxql/ast_test.go
index d30b8ae99c..7deab76a7c 100644
--- a/influxql/ast_test.go
+++ b/influxql/ast_test.go
@@ -39,28 +39,28 @@ func TestSelectStatement_Substatement(t *testing.T) {
 
 		// 1. Simple join
 		{
-			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa JOIN bb`,
+			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa,bb)`,
 			expr: &influxql.VarRef{Val: "aa.value"},
 			sub:  `SELECT aa.value FROM aa`,
 		},
 
 		// 2. Simple merge
 		{
-			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa MERGE bb`,
+			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM merge(aa, bb)`,
 			expr: &influxql.VarRef{Val: "bb.value"},
 			sub:  `SELECT bb.value FROM bb`,
 		},
 
 		// 3. Join with condition
 		{
-			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa JOIN bb WHERE aa.host = "servera" AND bb.host = "serverb"`,
+			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE aa.host = "servera" AND bb.host = "serverb"`,
 			expr: &influxql.VarRef{Val: "bb.value"},
 			sub:  `SELECT bb.value FROM bb WHERE bb.host = "serverb"`,
 		},
 
 		// 4. Join with complex condition
 		{
-			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa JOIN bb WHERE aa.host = "servera" AND (bb.host = "serverb" OR bb.host = "serverc") AND 1 = 2`,
+			stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE aa.host = "servera" AND (bb.host = "serverb" OR bb.host = "serverc") AND 1 = 2`,
 			expr: &influxql.VarRef{Val: "bb.value"},
 			sub:  `SELECT bb.value FROM bb WHERE (bb.host = "serverb" OR bb.host = "serverc") AND 1.000 = 2.000`,
 		},
diff --git a/influxql/engine.go b/influxql/engine.go
index 84b82a54ad..94fde06485 100644
--- a/influxql/engine.go
+++ b/influxql/engine.go
@@ -1,8 +1,11 @@
 package influxql
 
 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
+	"hash/fnv"
+	"sort"
 	"strings"
 	"time"
 )
@@ -12,6 +15,9 @@ type DB interface {
 	// Returns a list of series data ids matching a name and tags.
 	MatchSeries(name string, tags map[string]string) []uint32
 
+	// Returns a slice of tag values for a series.
+	SeriesTagValues(seriesID uint32, keys []string) []string
+
 	// Returns the id and data type for a series field.
 	Field(name, field string) (fieldID uint8, typ DataType)
 
@@ -59,11 +65,11 @@ func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
 	e.min, e.max = min, max
 
 	// Determine group by interval.
-	interval, dimensions, err := p.normalizeDimensions(stmt.Dimensions)
+	interval, tags, err := p.normalizeDimensions(stmt.Dimensions)
 	if err != nil {
 		return nil, err
 	}
-	e.interval, e.dimensions = interval, dimensions
+	e.interval, e.tags = interval, tags
 
 	// Generate a processor for each field.
 	for i, f := range stmt.Fields {
@@ -79,7 +85,7 @@ func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
 
 // normalizeDimensions extracts the time interval, if specified.
 // Returns all remaining dimensions.
-func (p *Planner) normalizeDimensions(dimensions Dimensions) (time.Duration, Dimensions, error) {
+func (p *Planner) normalizeDimensions(dimensions Dimensions) (time.Duration, []string, error) {
 	// Ignore if there are no dimensions.
 	if len(dimensions) == 0 {
 		return 0, nil, nil
@@ -97,10 +103,10 @@ func (p *Planner) normalizeDimensions(dimensions Dimensions) (time.Duration, Dim
 		if !ok {
 			return 0, nil, errors.New("time dimension must have one duration argument")
 		}
-		return lit.Val, dimensions[1:], nil
+		return lit.Val, dimensionKeys(dimensions[1:]), nil
 	}
 
-	return 0, dimensions, nil
+	return 0, dimensionKeys(dimensions), nil
 }
 
 // planField returns a processor for field.
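For a clause such as GROUP BY time(1h), host, normalizeDimensions now splits the interval off and reduces the remaining dimensions to their tag key names. A sketch of the expected result inside Plan:

	interval, tags, err := p.normalizeDimensions(stmt.Dimensions)
	// interval == time.Hour, tags == []string{"host"}, err == nil
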
@@ -151,7 +157,7 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
 	if err != nil {
 		return nil, err
 	}
-	name := sub.Source.(*Series).Name
+	name := sub.Source.(*Measurement).Name
 	tags := make(map[string]string) // TODO: Extract tags.
 
 	// Find field.
@@ -174,6 +180,7 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
 		m := newMapper(e, seriesID, fieldID, typ)
 		m.min, m.max = e.min.UnixNano(), e.max.UnixNano()
 		m.interval = int64(e.interval)
+		m.key = append(make([]byte, 8), marshalStrings(p.DB.SeriesTagValues(seriesID, e.tags))...)
 		r.mappers[i] = m
 	}
 
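The mapper key built above reserves its first 8 bytes for a big-endian timestamp and appends the marshaled tag values once at plan time, so emit only has to overwrite the timestamp prefix on each send. A sketch of the layout with hypothetical tag values:

	var timestamp int64 // hypothetical emit time, in nanoseconds
	key := append(make([]byte, 8), marshalStrings([]string{"servera", "us-west"})...)
	binary.BigEndian.PutUint64(key[0:8], uint64(timestamp))
	// key = [8-byte timestamp][2-byte len]["servera"][2-byte len]["us-west"]
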
@@ -199,7 +206,20 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
 // planBinaryExpr generates a processor for a binary expression.
 // A binary expression represents a join operator between two processors.
 func (p *Planner) planBinaryExpr(e *Executor, expr *BinaryExpr) (processor, error) {
-	panic("TODO")
+	// Create processor for LHS.
+	lhs, err := p.planExpr(e, expr.LHS)
+	if err != nil {
+		return nil, fmt.Errorf("lhs: %s", err)
+	}
+
+	// Create processor for RHS.
+	rhs, err := p.planExpr(e, expr.RHS)
+	if err != nil {
+		return nil, fmt.Errorf("rhs: %s", err)
+	}
+
+	// Combine processors.
+	return newBinaryExprEvaluator(e, expr.Op, lhs, rhs), nil
 }
 
 // Executor represents the implementation of Executor.
@@ -210,7 +230,7 @@ type Executor struct {
 	processors []processor   // per-field processors
 	min, max   time.Time     // time range
 	interval   time.Duration // group by duration
-	dimensions Dimensions    // non-interval dimensions
+	tags       []string      // group by tag keys
 }
 
 // Execute begins execution of the query and returns a channel to receive rows.
@@ -231,28 +251,14 @@ func (e *Executor) Execute() (<-chan *Row, error) {
 func (e *Executor) execute(out chan *Row) {
 	// TODO: Support multi-value rows.
 
-	// Initialize row.
-	row := &Row{}
-	row.Name = e.processors[0].name()
-
-	// Create column names.
-	row.Columns = make([]string, 0)
-	if e.interval != 0 {
-		row.Columns = append(row.Columns, "time")
-	}
-	for i, f := range e.stmt.Fields {
-		name := f.Name()
-		if name == "" {
-			name = fmt.Sprintf("col%d", i)
-		}
-		row.Columns = append(row.Columns, name)
-	}
+	// Initialize map of rows by encoded tagset.
+	rows := make(map[string]*Row)
 
 	// Combine values from each processor.
 loop:
 	for {
-		values := make([]interface{}, len(e.processors)+1)
-
+		// Retrieve values from processors and write them to the appropriate
+		// row based on their tagset.
 		for i, p := range e.processors {
 			// Retrieve data from the processor.
 			m, ok := <-p.C()
@@ -262,25 +268,86 @@ loop:
 
 			// Set values on returned row.
 			for k, v := range m {
-				values[0] = k / int64(time.Microsecond) // TODO: Set once per row.
+				// Extract timestamp and tag values from key.
+				b := []byte(k)
+				timestamp := int64(binary.BigEndian.Uint64(b[0:8]))
+
+				// Lookup row values and populate data.
+				values := e.createRowValuesIfNotExists(rows, e.processors[0].name(), b[8:], timestamp)
 				values[i+1] = v
 			}
 		}
-
-		// Remove timestamp if there is no group by interval.
-		if e.interval == 0 {
-			values = values[1:]
-		}
-		row.Values = append(row.Values, values)
 	}
 
-	// Send row to the channel.
-	out <- row
+	// Normalize rows and values.
+	// This converts the timestamps from nanoseconds to microseconds.
+	a := make(Rows, 0, len(rows))
+	for _, row := range rows {
+		for _, values := range row.Values {
+			values[0] = values[0].(int64) / int64(time.Microsecond)
+		}
+		a = append(a, row)
+	}
+	sort.Sort(a)
+
+	// Send rows to the channel.
+	for _, row := range a {
+		out <- row
+	}
 
 	// Mark the end of the output channel.
 	close(out)
 }
 
+// createRowValuesIfNotExists returns the values slice for a given tagset and
+// timestamp, creating a new row and values entry if one does not already exist.
+func (e *Executor) createRowValuesIfNotExists(rows map[string]*Row, name string, tagset []byte, timestamp int64) []interface{} {
+	// TODO: Add "name" to lookup key.
+
+	// Find row by tagset.
+	var row *Row
+	if row = rows[string(tagset)]; row == nil {
+		row = &Row{Name: name}
+
+		// Create tag map.
+		row.Tags = make(map[string]string)
+		for i, v := range unmarshalStrings(tagset) {
+			row.Tags[e.tags[i]] = v
+		}
+
+		// Create column names.
+		row.Columns = make([]string, 1, len(e.stmt.Fields)+1)
+		row.Columns[0] = "time"
+		for i, f := range e.stmt.Fields {
+			name := f.Name()
+			if name == "" {
+				name = fmt.Sprintf("col%d", i)
+			}
+			row.Columns = append(row.Columns, name)
+		}
+
+		// Save to lookup.
+		rows[string(tagset)] = row
+	}
+
+	// If no values exist or the last value doesn't match the timestamp then create new.
+	if len(row.Values) == 0 || row.Values[len(row.Values)-1][0] != timestamp {
+		values := make([]interface{}, len(e.processors)+1)
+		values[0] = timestamp
+		row.Values = append(row.Values, values)
+	}
+
+	return row.Values[len(row.Values)-1]
+}
+
+// dimensionKeys returns a list of tag key names for the dimensions.
+// Each dimension must be a VarRef.
+func dimensionKeys(dimensions Dimensions) (a []string) {
+	for _, d := range dimensions {
+		a = append(a, d.Expr.(*VarRef).Val)
+	}
+	return
+}
+
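Timestamps flow through the mappers in nanoseconds and are divided down to microseconds exactly once, in the normalization pass above. For example, for 2000-01-01T09:00:00Z:

	ts := int64(946717200000000000) // nanoseconds since epoch
	ts /= int64(time.Microsecond)   // 946717200000000, as in the expected test output below
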
 
 // mapper represents an object for processing iterators.
 type mapper struct {
 	executor *Executor // parent executor
@@ -290,9 +357,10 @@ type mapper struct {
 	itr      Iterator  // series iterator
 	min, max int64     // time range
 	interval int64     // group by interval
+	key      []byte    // encoded timestamp + dimensional values
 	fn       mapFunc   // map function
 
-	c    chan map[int64]interface{}
+	c    chan map[string]interface{}
 	done chan chan struct{}
 }
 
@@ -303,7 +371,7 @@ func newMapper(e *Executor, seriesID uint32, fieldID uint8, typ DataType) *mappe
 		seriesID: seriesID,
 		fieldID:  fieldID,
 		typ:      typ,
-		c:        make(chan map[int64]interface{}, 0),
+		c:        make(chan map[string]interface{}, 0),
 		done:     make(chan chan struct{}, 0),
 	}
 }
@@ -319,7 +387,7 @@ func (m *mapper) start() {
 func (m *mapper) stop() { syncClose(m.done) }
 
 // C returns the streaming data channel.
-func (m *mapper) C() <-chan map[int64]interface{} { return m.c }
+func (m *mapper) C() <-chan map[string]interface{} { return m.c }
 
 // run executes the map function against the iterator.
 func (m *mapper) run() {
@@ -329,9 +397,13 @@ func (m *mapper) run() {
 	close(m.c)
 }
 
-// emit sends a value to the reducer's output channel.
+// emit sends a value to the mapper's output channel.
 func (m *mapper) emit(key int64, value interface{}) {
-	m.c <- map[int64]interface{}{key: value}
+	// Encode the timestamp to the beginning of the key.
+	binary.BigEndian.PutUint64(m.key, uint64(key))
+
+	// OPTIMIZE: Collect emit calls and flush all at once.
+	m.c <- map[string]interface{}{string(m.key): value}
 }
 
 // mapFunc represents a function used for mapping iterators.
@@ -355,6 +427,14 @@ func mapSum(itr Iterator, m *mapper) {
 	m.emit(itr.Time(), n)
 }
 
+// processor represents an object for joining reducer output.
+type processor interface {
+	start()
+	stop()
+	name() string
+	C() <-chan map[string]interface{}
+}
+
 // reducer represents an object for processing mapper output.
 // Implements processor.
 type reducer struct {
@@ -363,7 +443,7 @@ type reducer struct {
 	mappers  []*mapper  // child mappers
 	fn       reduceFunc // reduce function
 
-	c    chan map[int64]interface{}
+	c    chan map[string]interface{}
 	done chan chan struct{}
 }
 
@@ -371,7 +451,7 @@ type reducer struct {
 func newReducer(e *Executor) *reducer {
 	return &reducer{
 		executor: e,
-		c:        make(chan map[int64]interface{}, 0),
+		c:        make(chan map[string]interface{}, 0),
 		done:     make(chan chan struct{}, 0),
 	}
 }
@@ -393,17 +473,17 @@ func (r *reducer) stop() {
 }
 
 // C returns the streaming data channel.
-func (r *reducer) C() <-chan map[int64]interface{} { return r.c }
+func (r *reducer) C() <-chan map[string]interface{} { return r.c }
 
 // name returns the source name.
-func (r *reducer) name() string { return r.stmt.Source.(*Series).Name }
+func (r *reducer) name() string { return r.stmt.Source.(*Measurement).Name }
 
 // run runs the reducer loop to read mapper output and reduce it.
 func (r *reducer) run() {
 loop:
 	for {
 		// Combine all data from the mappers.
-		data := make(map[int64][]interface{})
+		data := make(map[string][]interface{})
 		for _, m := range r.mappers {
 			kv, ok := <-m.C()
 			if !ok {
@@ -425,15 +505,15 @@ loop:
 }
 
 // emit sends a value to the reducer's output channel.
-func (r *reducer) emit(key int64, value interface{}) {
-	r.c <- map[int64]interface{}{key: value}
+func (r *reducer) emit(key string, value interface{}) {
+	r.c <- map[string]interface{}{key: value}
 }
 
 // reduceFunc represents a function used for reducing mapper output.
-type reduceFunc func(int64, []interface{}, *reducer)
+type reduceFunc func(string, []interface{}, *reducer)
 
 // reduceSum computes the sum of values for each key.
-func reduceSum(key int64, values []interface{}, r *reducer) {
+func reduceSum(key string, values []interface{}, r *reducer) {
 	var n float64
 	for _, v := range values {
 		n += v.(float64)
@@ -441,18 +521,109 @@ func reduceSum(key int64, values []interface{}, r *reducer) {
 	r.emit(key, n)
 }
 
-// processor represents an object for joining reducer output.
-type processor interface {
-	start()
-	stop()
-	name() string
-	C() <-chan map[int64]interface{}
+// binaryExprEvaluator represents a processor for combining two processors.
+type binaryExprEvaluator struct {
+	executor *Executor // parent executor
+	lhs, rhs processor // processors
+	op       Token     // operation
+
+	c    chan map[string]interface{}
+	done chan chan struct{}
+}
+
+// newBinaryExprEvaluator returns a new instance of binaryExprEvaluator.
+func newBinaryExprEvaluator(e *Executor, op Token, lhs, rhs processor) *binaryExprEvaluator {
+	return &binaryExprEvaluator{
+		executor: e,
+		op:       op,
+		lhs:      lhs,
+		rhs:      rhs,
+		c:        make(chan map[string]interface{}, 0),
+		done:     make(chan chan struct{}, 0),
+	}
+}
+
+// start begins streaming values from the lhs/rhs processors.
+func (e *binaryExprEvaluator) start() {
+	e.lhs.start()
+	e.rhs.start()
+	go e.run()
+}
+
+// stop stops the processor.
+func (e *binaryExprEvaluator) stop() {
+	e.lhs.stop()
+	e.rhs.stop()
+	syncClose(e.done)
+}
+
+// C returns the streaming data channel.
+func (e *binaryExprEvaluator) C() <-chan map[string]interface{} { return e.c }
+
+// name returns the source name.
+func (e *binaryExprEvaluator) name() string { return "" }
+
+// run runs the processor loop to read subprocessor output and combine it.
+func (e *binaryExprEvaluator) run() {
+	for {
+		// Read LHS value.
+		lhs, ok := <-e.lhs.C()
+		if !ok {
+			break
+		}
+
+		// Read RHS value.
+		rhs, ok := <-e.rhs.C()
+		if !ok {
+			break
+		}
+
+		// Merge maps.
+		m := make(map[string]interface{})
+		for k, v := range lhs {
+			m[k] = e.eval(v, rhs[k])
+		}
+		for k, v := range rhs {
+			// Skip value if already processed in lhs loop.
+			if _, ok := m[k]; ok {
+				continue
+			}
+			m[k] = e.eval(float64(0), v)
+		}
+
+		// Return value.
+		e.c <- m
+	}
+
+	// Mark the channel as complete.
+	close(e.c)
+}
+
+// eval evaluates two values using the evaluator's operation.
+func (e *binaryExprEvaluator) eval(lhs, rhs interface{}) interface{} {
+	switch e.op {
+	case ADD:
+		return lhs.(float64) + rhs.(float64)
+	case SUB:
+		return lhs.(float64) - rhs.(float64)
+	case MUL:
+		return lhs.(float64) * rhs.(float64)
+	case DIV:
+		rhs := rhs.(float64)
+		if rhs == 0 {
+			return float64(0)
+		}
+		return lhs.(float64) / rhs
+	default:
+		// TODO: Validate operation & data types.
+		panic("invalid operation: " + e.op.String())
+	}
 }
 
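Keys that appear on only one side are handled asymmetrically: rhs-only keys are folded in with a zero lhs, while lhs-only keys assume a matching rhs entry (the reducers emit a value for every interval bucket, so keys align in practice). A sketch of the rhs-only path:

	// lhs = {}, rhs = {k: 5.0}  =>  m[k] = e.eval(float64(0), 5.0)
	// op == ADD yields 5.0; op == DIV yields 0 (a zero denominator also yields 0)
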
 // literalProcessor represents a processor that continually sends a literal value.
 type literalProcessor struct {
 	val  interface{}
-	c    chan map[int64]interface{}
+	c    chan map[string]interface{}
 	done chan chan struct{}
 }
 
@@ -460,13 +631,13 @@ type literalProcessor struct {
 func newLiteralProcessor(val interface{}) *literalProcessor {
 	return &literalProcessor{
 		val:  val,
-		c:    make(chan map[int64]interface{}, 0),
+		c:    make(chan map[string]interface{}, 0),
 		done: make(chan chan struct{}, 0),
 	}
 }
 
 // C returns the streaming data channel.
-func (p *literalProcessor) C() <-chan map[int64]interface{} { return p.c }
+func (p *literalProcessor) C() <-chan map[string]interface{} { return p.c }
 
 // process continually returns a literal value with a "0" key.
 func (p *literalProcessor) start() { go p.run() }
@@ -478,7 +649,7 @@ func (p *literalProcessor) run() {
 		case ch := <-p.done:
 			close(ch)
 			return
-		case p.c <- map[int64]interface{}{0: p.val}:
+		case p.c <- map[string]interface{}{"": p.val}:
 		}
 	}
 }
@@ -521,8 +692,73 @@ type Row struct {
 	Err     error             `json:"err,omitempty"`
 }
 
-// TODO: Walk field expressions to extract subqueries.
-// TODO: Resolve subqueries to series ids.
-// TODO: send query with all ids to executor (knows to run locally or remote server)
-// TODO: executor creates mapper for each series id.
-// TODO: Create
+// tagsHash returns a hash of tag key/value pairs.
+func (r *Row) tagsHash() uint64 {
+	h := fnv.New64a()
+	keys := r.tagsKeys()
+	for _, k := range keys {
+		h.Write([]byte(k))
+		h.Write([]byte(r.Tags[k]))
+	}
+	return h.Sum64()
+}
+
+// tagsKeys returns a sorted list of tag keys.
+func (r *Row) tagsKeys() []string {
+	a := make([]string, 0, len(r.Tags))
+	for k := range r.Tags {
+		a = append(a, k)
+	}
+	sort.Strings(a)
+	return a
+}
+
+// Rows represents a list of rows that can be sorted consistently by name/tag.
+type Rows []*Row
+
+func (p Rows) Len() int { return len(p) }
+
+func (p Rows) Less(i, j int) bool {
+	// Sort by name first.
+	if p[i].Name != p[j].Name {
+		return p[i].Name < p[j].Name
+	}
+
+	// Sort by tag set hash. Tags don't have a meaningful sort order so we
+	// just compute a hash and sort by that instead. This allows the tests
+	// to receive rows in a predictable order every time.
+	return p[i].tagsHash() < p[j].tagsHash()
+}
+
+func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+
+// marshalStrings encodes an array of strings into a byte slice.
+func marshalStrings(a []string) (ret []byte) {
+	for _, s := range a {
+		// Create a slice for len+data.
+		b := make([]byte, 2+len(s))
+		binary.BigEndian.PutUint16(b[0:2], uint16(len(s)))
+		copy(b[2:], s)
+
+		// Append it to the full byte slice.
+		ret = append(ret, b...)
+	}
+	return
+}
+
+// unmarshalStrings decodes a byte slice into an array of strings.
+func unmarshalStrings(b []byte) (ret []string) {
+	for {
+		// If there's no more data then exit.
+		if len(b) == 0 {
+			return
+		}
+
+		// Decode size + data.
+		n := binary.BigEndian.Uint16(b[0:2])
+		ret = append(ret, string(b[2:n+2]))
+
+		// Move the byte slice forward and retry.
+		b = b[n+2:]
+	}
}
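marshalStrings and unmarshalStrings form a simple length-prefixed encoding (a 2-byte big-endian length before each string), so tag values survive the round trip through a map key. A quick sketch:

	b := marshalStrings([]string{"servera", "us-west"})
	// b = [0x00 0x07 's' 'e' 'r' 'v' 'e' 'r' 'a' 0x00 0x07 'u' 's' '-' 'w' 'e' 's' 't']
	a := unmarshalStrings(b) // []string{"servera", "us-west"}
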
diff --git a/influxql/engine_test.go b/influxql/engine_test.go
index d834078eb1..f380186397 100644
--- a/influxql/engine_test.go
+++ b/influxql/engine_test.go
@@ -25,7 +25,27 @@ func TestPlanner_Plan_Count(t *testing.T) {
 	db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:50Z", map[string]interface{}{"value": float64(50)})
 
 	// Expected resultset.
-	exp := minify(`[{"name":"cpu","columns":["count"],"values":[[6]]}]`)
+	exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)
+
+	// Execute and compare.
+	rs := db.MustPlanAndExecute(`SELECT count(value) FROM cpu`)
+	if act := minify(jsonify(rs)); exp != act {
+		t.Fatalf("unexpected resultset: %s", act)
+	}
+}
+
+// Ensure the planner can plan and execute a count query across multiple series.
+func TestPlanner_Plan_Count_Multiseries(t *testing.T) {
+	db := NewDB("2000-01-01T12:00:00Z")
+	db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(100)})
+	db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(90)})
+	db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(80)})
+	db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(70)})
+	db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:40Z", map[string]interface{}{"value": float64(60)})
+	db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T00:00:50Z", map[string]interface{}{"value": float64(50)})
+
+	// Expected resultset.
+	exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)
 
 	// Execute and compare.
 	rs := db.MustPlanAndExecute(`SELECT count(value) FROM cpu`)
@@ -35,7 +55,7 @@ func TestPlanner_Plan_Count(t *testing.T) {
 }
 
 // Ensure the planner can plan and execute a count query grouped by hour.
-func TestPlanner_Plan_CountByHour(t *testing.T) {
+func TestPlanner_Plan_GroupByInterval(t *testing.T) {
 	db := NewDB("2000-01-01T12:00:00Z")
 	db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(100)})
 	db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(90)})
@@ -71,10 +91,94 @@ func TestPlanner_Plan_CountByHour(t *testing.T) {
 	}
 }
 
+// Ensure the planner can plan and execute a query grouped by interval and tag.
+func TestPlanner_Plan_GroupByIntervalAndTag(t *testing.T) {
+	db := NewDB("2000-01-01T12:00:00Z")
+	db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(10)})
+	db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T09:30:00Z", map[string]interface{}{"value": float64(20)})
+	db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(30)})
+	db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T11:30:00Z", map[string]interface{}{"value": float64(40)})
+
+	db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(1)})
+	db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(2)})
+
+	// Query for data since 3 hours ago until now, grouped every hour by host.
+	rs := db.MustPlanAndExecute(`
+		SELECT sum(value)
+		FROM cpu
+		WHERE time >= now() - 3h
+		GROUP BY time(1h), host`)
+
+	// Expected resultset.
+	exp := minify(`[{
+		"name":"cpu",
+		"tags":{"host":"servera"},
+		"columns":["time","sum"],
+		"values":[
+			[946717200000000,30],
+			[946720800000000,0],
+			[946724400000000,70]
+		]
+	},{
+		"name":"cpu",
+		"tags":{"host":"serverb"},
+		"columns":["time","sum"],
+		"values":[
+			[946717200000000,1],
+			[946720800000000,0],
+			[946724400000000,2]
+		]
+	}]`)
+
+	// Compare resultsets.
+	if act := jsonify(rs); exp != act {
+		t.Fatalf("unexpected resultset: \n\n%s\n\n%s\n\n", exp, act)
+	}
+}
+
+// Ensure the planner can plan and execute a joined query.
+func TestPlanner_Plan_Join(t *testing.T) {
+	db := NewDB("2000-01-01T12:00:00Z")
+	db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(1)})
+	db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(2)})
+	db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(3)})
+	db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(4)})
+
+	db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(10)})
+	db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(20)})
+	db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(40)})
+
+	// Query must join the series and sum the values.
+	rs := db.MustPlanAndExecute(`
+		SELECT sum(cpu.0.value) + sum(cpu.1.value) AS "sum"
+		FROM JOIN(cpu.0, cpu.1)
+		WHERE time >= "2000-01-01 00:00:00" AND time < "2000-01-01 00:01:00"
+		GROUP BY time(10s)`)
+
+	// Expected resultset.
+	exp := minify(`[{
+		"columns":["time","sum"],
+		"values":[
+			[946684800000000,11],
+			[946684810000000,22],
+			[946684820000000,3],
+			[946684830000000,44],
+			[946684840000000,0],
+			[946684850000000,0]
+		]
+	}]`)
+
+	// Compare resultsets.
+	if act := jsonify(rs); exp != act {
+		t.Fatalf("unexpected resultset: %s", indent(act))
+	}
+}
+
 // DB represents an in-memory test database that implements methods for Planner.
 type DB struct {
 	measurements map[string]*Measurement
 	series       map[uint32]*Series
+	maxSeriesID  uint32
 
 	Now time.Time
 }
@@ -167,8 +271,8 @@ func (db *DB) CreateSeriesIfNotExists(name string, tags map[string]string) (*Mea
 	}
 
 	// Create new series.
-	m.maxSeriesID++
-	s := &Series{id: m.maxSeriesID, tags: tags}
+	db.maxSeriesID++
+	s := &Series{id: db.maxSeriesID, tags: tags}
 
 	// Add series to DB and measurement.
 	db.series[s.id] = s
@@ -206,6 +310,23 @@ func (db *DB) MatchSeries(name string, tags map[string]string) []uint32 {
 	return ids
 }
 
+// SeriesTagValues returns a slice of tag values for a given series and tag keys.
+func (db *DB) SeriesTagValues(seriesID uint32, keys []string) (values []string) {
+	values = make([]string, len(keys))
+
+	// Find series.
+	s := db.series[seriesID]
+	if s == nil {
+		return
+	}
+
+	// Loop over keys and set values.
+	for i, key := range keys {
+		values[i] = s.tags[key]
+	}
+	return
+}
+
 // Field returns the field identifier for a given measurement name and field name.
 func (db *DB) Field(name, field string) (fieldID uint8, typ influxql.DataType) {
 	// Find measurement.
@@ -334,8 +455,7 @@ type Measurement struct {
 	maxFieldID uint8
 	fields     map[string]*Field
 
-	maxSeriesID uint32
-	series      map[uint32]*Series
+	series map[uint32]*Series
 }
 
 // NewMeasurement returns a new instance of Measurement.
diff --git a/influxql/parser.go b/influxql/parser.go
index 7b615ba091..9f934a8d97 100644
--- a/influxql/parser.go
+++ b/influxql/parser.go
@@ -566,30 +566,52 @@ func (p *Parser) parseAlias() (string, error) {
 
 // parseSource parses the "FROM" clause of the query.
 func (p *Parser) parseSource() (Source, error) {
-	// Scan the identifier for the series.
+	// The first token can either be the series name or a join/merge call.
 	tok, pos, lit := p.scanIgnoreWhitespace()
 	if tok != IDENT && tok != STRING {
 		return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
 	}
-	lhs := &Series{Name: lit}
 
-	// Check if the next token is JOIN or MERGE.
-	if tok, _, _ := p.scanIgnoreWhitespace(); tok == JOIN {
-		rhs, err := p.parseSource()
-		if err != nil {
-			return nil, err
-		}
-		return &Join{LHS: lhs, RHS: rhs}, nil
-	} else if tok == MERGE {
-		rhs, err := p.parseSource()
-		if err != nil {
-			return nil, err
-		}
-		return &Merge{LHS: lhs, RHS: rhs}, nil
+	// If the token is a string or the next token is not an LPAREN then return a measurement.
+	if next, _, _ := p.scan(); tok == STRING || (tok == IDENT && next != LPAREN) {
+		p.unscan()
+		return &Measurement{Name: lit}, nil
 	}
 
-	p.unscan()
-	return lhs, nil
+	// Verify the source type is join/merge.
+	sourceType := strings.ToLower(lit)
+	if sourceType != "join" && sourceType != "merge" {
+		return nil, &ParseError{Message: "unknown source type: " + sourceType, Pos: pos}
+	}
+
+	// Parse measurement list.
+	var measurements []*Measurement
+	for {
+		// Scan the measurement name.
+		tok, pos, lit := p.scanIgnoreWhitespace()
+		if tok != IDENT && tok != STRING {
+			return nil, newParseError(tokstr(tok, lit), []string{"measurement name"}, pos)
+		}
+		measurements = append(measurements, &Measurement{Name: lit})
+
+		// If there's not a comma next then stop parsing measurements.
+		if tok, _, _ := p.scan(); tok != COMMA {
+			p.unscan()
+			break
+		}
+	}
+
+	// Expect a closing right paren.
+	if tok, pos, lit := p.scanIgnoreWhitespace(); tok != RPAREN {
+		return nil, newParseError(tokstr(tok, lit), []string{")"}, pos)
+	}
+
+	// Return the appropriate source type.
+	if sourceType == "join" {
+		return &Join{Measurements: measurements}, nil
+	}
+	return &Merge{Measurements: measurements}, nil
 }
 
 // parseCondition parses the "WHERE" clause of the query, if it exists.
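With the JOIN and MERGE keywords removed from the grammar, parseSource accepts either a bare measurement or a function-style source list. A few FROM clauses and the sources they should produce under the rules above (names are placeholders):

	FROM cpu               -- *Measurement{Name: "cpu"}
	FROM "cpu.load"        -- *Measurement (a string token never begins a join/merge)
	FROM join(aa, bb, cc)  -- *Join; the check is case-insensitive, so JOIN(...) also parses
	FROM merge(aa, "bb")   -- *Merge
	FROM foo(aa)           -- parse error: unknown source type
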
diff --git a/influxql/parser_test.go b/influxql/parser_test.go
index ec78224419..97f5899a68 100644
--- a/influxql/parser_test.go
+++ b/influxql/parser_test.go
@@ -52,7 +52,7 @@ func TestParser_ParseStatement(t *testing.T) {
 				Fields: influxql.Fields{
 					&influxql.Field{Expr: &influxql.Wildcard{}},
 				},
-				Source: &influxql.Series{Name: "myseries"},
+				Source: &influxql.Measurement{Name: "myseries"},
 			},
 		},
 
@@ -65,7 +65,7 @@ func TestParser_ParseStatement(t *testing.T) {
 					&influxql.Field{Expr: &influxql.VarRef{Val: "field2"}},
 					&influxql.Field{Expr: &influxql.VarRef{Val: "field3"}, Alias: "field_x"},
 				},
-				Source: &influxql.Series{Name: "myseries"},
+				Source: &influxql.Measurement{Name: "myseries"},
 				Condition: &influxql.BinaryExpr{
 					Op:  influxql.EQ,
 					LHS: &influxql.VarRef{Val: "host"},
@@ -83,14 +83,14 @@ func TestParser_ParseStatement(t *testing.T) {
 
 		// SELECT statement with JOIN
 		{
-			s: `SELECT field1 FROM aa JOIN bb JOIN cc`,
+			s: `SELECT field1 FROM join(aa,"bb", cc)`,
 			stmt: &influxql.SelectStatement{
 				Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}}},
 				Source: &influxql.Join{
-					LHS: &influxql.Series{Name: "aa"},
-					RHS: &influxql.Join{
-						LHS: &influxql.Series{Name: "bb"},
-						RHS: &influxql.Series{Name: "cc"},
+					Measurements: influxql.Measurements{
+						{Name: "aa"},
+						{Name: "bb"},
+						{Name: "cc"},
 					},
 				},
 			},
@@ -98,14 +98,13 @@ func TestParser_ParseStatement(t *testing.T) {
 
 		// SELECT statement with MERGE
 		{
-			s: `SELECT field1 FROM aa MERGE bb MERGE cc`,
+			s: `SELECT field1 FROM merge(aa,b.b)`,
 			stmt: &influxql.SelectStatement{
 				Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}}},
 				Source: &influxql.Merge{
-					LHS: &influxql.Series{Name: "aa"},
-					RHS: &influxql.Merge{
-						LHS: &influxql.Series{Name: "bb"},
-						RHS: &influxql.Series{Name: "cc"},
+					Measurements: influxql.Measurements{
+						{Name: "aa"},
+						{Name: "b.b"},
 					},
 				},
 			},
@@ -116,7 +115,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `select my_field from myseries`,
 			stmt: &influxql.SelectStatement{
 				Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "my_field"}}},
-				Source: &influxql.Series{Name: "myseries"},
+				Source: &influxql.Measurement{Name: "myseries"},
 			},
 		},
 
@@ -125,7 +124,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT field1 FROM myseries ORDER BY ASC, field1, field2 DESC LIMIT 10`,
 			stmt: &influxql.SelectStatement{
 				Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}}},
-				Source: &influxql.Series{Name: "myseries"},
+				Source: &influxql.Measurement{Name: "myseries"},
 				SortFields: influxql.SortFields{
 					&influxql.SortField{Ascending: true},
 					&influxql.SortField{Name: "field1"},
@@ -139,7 +138,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `DELETE FROM myseries WHERE host = 'hosta.influxdb.org'`,
 			stmt: &influxql.DeleteStatement{
-				Source: &influxql.Series{Name: "myseries"},
+				Source: &influxql.Measurement{Name: "myseries"},
 				Condition: &influxql.BinaryExpr{
 					Op:  influxql.EQ,
 					LHS: &influxql.VarRef{Val: "host"},
@@ -194,7 +193,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `LIST TAG KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
 			stmt: &influxql.ListTagKeysStatement{
-				Source: &influxql.Series{Name: "src"},
+				Source: &influxql.Measurement{Name: "src"},
 				Condition: &influxql.BinaryExpr{
 					Op:  influxql.EQ,
 					LHS: &influxql.VarRef{Val: "region"},
@@ -213,7 +212,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `LIST TAG VALUES FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
 			stmt: &influxql.ListTagValuesStatement{
-				Source: &influxql.Series{Name: "src"},
+				Source: &influxql.Measurement{Name: "src"},
 				Condition: &influxql.BinaryExpr{
 					Op:  influxql.EQ,
 					LHS: &influxql.VarRef{Val: "region"},
@@ -232,7 +231,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `LIST FIELD KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
 			stmt: &influxql.ListFieldKeysStatement{
-				Source: &influxql.Series{Name: "src"},
+				Source: &influxql.Measurement{Name: "src"},
 				Condition: &influxql.BinaryExpr{
 					Op:  influxql.EQ,
 					LHS: &influxql.VarRef{Val: "region"},
@@ -251,7 +250,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `LIST FIELD VALUES FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
 			stmt: &influxql.ListFieldValuesStatement{
-				Source: &influxql.Series{Name: "src"},
+				Source: &influxql.Measurement{Name: "src"},
 				Condition: &influxql.BinaryExpr{
 					Op:  influxql.EQ,
 					LHS: &influxql.VarRef{Val: "region"},
@@ -285,7 +284,7 @@ func TestParser_ParseStatement(t *testing.T) {
 				Name: "myquery",
 				Source: &influxql.SelectStatement{
 					Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
-					Source: &influxql.Series{Name: "myseries"},
+					Source: &influxql.Measurement{Name: "myseries"},
 				},
 				Target: "foo",
 			},
diff --git a/influxql/scanner_test.go b/influxql/scanner_test.go
index b16837edf9..d826af76da 100644
--- a/influxql/scanner_test.go
+++ b/influxql/scanner_test.go
@@ -109,13 +109,11 @@ func TestScanner_Scan(t *testing.T) {
 		{s: `INNER`, tok: influxql.INNER},
 		{s: `INSERT`, tok: influxql.INSERT},
 		{s: `INTO`, tok: influxql.INTO},
-		{s: `JOIN`, tok: influxql.JOIN},
 		{s: `KEYS`, tok: influxql.KEYS},
 		{s: `LIMIT`, tok: influxql.LIMIT},
 		{s: `LIST`, tok: influxql.LIST},
 		{s: `MEASUREMENT`, tok: influxql.MEASUREMENT},
 		{s: `MEASUREMENTS`, tok: influxql.MEASUREMENTS},
-		{s: `MERGE`, tok: influxql.MERGE},
 		{s: `ORDER`, tok: influxql.ORDER},
 		{s: `QUERIES`, tok: influxql.QUERIES},
 		{s: `SELECT`, tok: influxql.SELECT},
diff --git a/influxql/token.go b/influxql/token.go
index f7daf81934..0755d77c2f 100644
--- a/influxql/token.go
+++ b/influxql/token.go
@@ -65,13 +65,11 @@ const (
 	INNER
 	INSERT
 	INTO
-	JOIN
 	KEYS
 	LIMIT
 	LIST
 	MEASUREMENT
 	MEASUREMENTS
-	MERGE
 	ORDER
 	QUERIES
 	QUERY
@@ -129,13 +127,11 @@ var tokens = [...]string{
 	INNER:        "INNER",
 	INSERT:       "INSERT",
 	INTO:         "INTO",
-	JOIN:         "JOIN",
 	KEYS:         "KEYS",
 	LIMIT:        "LIMIT",
 	LIST:         "LIST",
 	MEASUREMENT:  "MEASUREMENT",
 	MEASUREMENTS: "MEASUREMENTS",
-	MERGE:        "MERGE",
 	ORDER:        "ORDER",
 	QUERIES:      "QUERIES",
 	QUERY:        "QUERY",