Merge pull request #1247 from influxdb/group-by

Add group by support for tags.
pull/1254/head
Ben Johnson 2014-12-21 10:06:40 -07:00
commit b3a4d0aca7
8 changed files with 524 additions and 144 deletions
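For reference, the group-by-tag and function-style join/merge syntax exercised by the new planner tests below looks like this:

    SELECT sum(value) FROM cpu WHERE time >= now() - 3h GROUP BY time(1h), host
    SELECT sum(cpu.0.value) + sum(cpu.1.value) AS "sum" FROM join(cpu.0, cpu.1) GROUP BY time(10s)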


@ -64,7 +64,8 @@ func (_ Fields) node() {}
func (_ *Field) node() {}
func (_ Dimensions) node() {}
func (_ *Dimension) node() {}
func (_ *Series) node() {}
func (_ *Measurement) node() {}
func (_ Measurements) node() {}
func (_ *Join) node() {}
func (_ *Merge) node() {}
func (_ *VarRef) node() {}
@ -142,9 +143,9 @@ type Source interface {
source()
}
func (_ *Series) source() {}
func (_ *Join) source() {}
func (_ *Merge) source() {}
func (_ *Measurement) source() {}
func (_ *Join) source() {}
func (_ *Merge) source() {}
// SortField represents a field to sort results by.
type SortField struct {
@ -277,7 +278,7 @@ func (s *SelectStatement) Substatement(ref *VarRef) (*SelectStatement, error) {
}
// If there is only one series source then return it with the whole condition.
if _, ok := s.Source.(*Series); ok {
if _, ok := s.Source.(*Measurement); ok {
other.Source = s.Source
other.Condition = s.Condition
return other, nil
@ -288,7 +289,7 @@ func (s *SelectStatement) Substatement(ref *VarRef) (*SelectStatement, error) {
if name == "" {
return nil, fmt.Errorf("field source not found: %s", ref.Val)
}
other.Source = &Series{Name: name}
other.Source = &Measurement{Name: name}
// Filter out conditions.
if s.Condition != nil {
@ -338,23 +339,21 @@ func filterExprBySource(name string, expr Expr) Expr {
// Returns a blank string if no sources match.
func MatchSource(src Source, name string) string {
switch src := src.(type) {
case *Series:
case *Measurement:
if strings.HasPrefix(name, src.Name) {
return src.Name
}
case *Join:
if str := MatchSource(src.LHS, name); str != "" {
return str
}
if str := MatchSource(src.RHS, name); str != "" {
return str
for _, m := range src.Measurements {
if strings.HasPrefix(name, m.Name) {
return m.Name
}
}
case *Merge:
if str := MatchSource(src.LHS, name); str != "" {
return str
}
if str := MatchSource(src.RHS, name); str != "" {
return str
for _, m := range src.Measurements {
if strings.HasPrefix(name, m.Name) {
return m.Name
}
}
}
return ""
@ -709,34 +708,44 @@ type Dimension struct {
// String returns a string representation of the dimension.
func (d *Dimension) String() string { return d.Expr.String() }
// Series represents a single series used as a datasource.
type Series struct {
// Measurements represents a list of measurements.
type Measurements []*Measurement
// String returns a string representation of the measurements.
func (a Measurements) String() string {
var str []string
for _, m := range a {
str = append(str, m.String())
}
return strings.Join(str, ", ")
}
// Measurement represents a single measurement used as a datasource.
type Measurement struct {
Name string
}
// String returns a string representation of the series.
func (s *Series) String() string { return s.Name }
// String returns a string representation of the measurement.
func (m *Measurement) String() string { return QuoteIdent(m.Name) }
// Join represents two datasources joined together.
type Join struct {
LHS Source
RHS Source
Measurements Measurements
}
// String returns a string representation of the join.
func (j *Join) String() string {
return fmt.Sprintf("%s JOIN %s", j.LHS.String(), j.RHS.String())
return fmt.Sprintf("join(%s)", j.Measurements.String())
}
// Merge represents a datasource created by merging two datasources.
type Merge struct {
LHS Source
RHS Source
Measurements Measurements
}
// String returns a string representation of the merge.
func (m *Merge) String() string {
return fmt.Sprintf("%s JOIN %s", m.LHS.String(), m.RHS.String())
return fmt.Sprintf("merge(%s)", m.Measurements.String())
}
// VarRef represents a reference to a variable.


@ -39,28 +39,28 @@ func TestSelectStatement_Substatement(t *testing.T) {
// 1. Simple join
{
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa JOIN bb`,
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa,bb)`,
expr: &influxql.VarRef{Val: "aa.value"},
sub: `SELECT aa.value FROM aa`,
},
// 2. Simple merge
{
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa MERGE bb`,
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM merge(aa, bb)`,
expr: &influxql.VarRef{Val: "bb.value"},
sub: `SELECT bb.value FROM bb`,
},
// 3. Join with condition
{
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa JOIN bb WHERE aa.host = "servera" AND bb.host = "serverb"`,
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE aa.host = "servera" AND bb.host = "serverb"`,
expr: &influxql.VarRef{Val: "bb.value"},
sub: `SELECT bb.value FROM bb WHERE bb.host = "serverb"`,
},
// 4. Join with complex condition
{
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM aa JOIN bb WHERE aa.host = "servera" AND (bb.host = "serverb" OR bb.host = "serverc") AND 1 = 2`,
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE aa.host = "servera" AND (bb.host = "serverb" OR bb.host = "serverc") AND 1 = 2`,
expr: &influxql.VarRef{Val: "bb.value"},
sub: `SELECT bb.value FROM bb WHERE (bb.host = "serverb" OR bb.host = "serverc") AND 1.000 = 2.000`,
},


@ -1,8 +1,11 @@
package influxql
import (
"encoding/binary"
"errors"
"fmt"
"hash/fnv"
"sort"
"strings"
"time"
)
@ -12,6 +15,9 @@ type DB interface {
// Returns a list of series data ids matching a name and tags.
MatchSeries(name string, tags map[string]string) []uint32
// Returns a slice of tag values for a series.
SeriesTagValues(seriesID uint32, keys []string) []string
// Returns the id and data type for a series field.
Field(name, field string) (fieldID uint8, typ DataType)
@ -59,11 +65,11 @@ func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
e.min, e.max = min, max
// Determine group by interval.
interval, dimensions, err := p.normalizeDimensions(stmt.Dimensions)
interval, tags, err := p.normalizeDimensions(stmt.Dimensions)
if err != nil {
return nil, err
}
e.interval, e.dimensions = interval, dimensions
e.interval, e.tags = interval, tags
// Generate a processor for each field.
for i, f := range stmt.Fields {
@ -79,7 +85,7 @@ func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
// normalizeDimensions extracts the time interval, if specified.
// Returns all remaining dimensions.
func (p *Planner) normalizeDimensions(dimensions Dimensions) (time.Duration, Dimensions, error) {
func (p *Planner) normalizeDimensions(dimensions Dimensions) (time.Duration, []string, error) {
// Ignore if there are no dimensions.
if len(dimensions) == 0 {
return 0, nil, nil
@ -97,10 +103,10 @@ func (p *Planner) normalizeDimensions(dimensions Dimensions) (time.Duration, Dim
if !ok {
return 0, nil, errors.New("time dimension must have one duration argument")
}
return lit.Val, dimensions[1:], nil
return lit.Val, dimensionKeys(dimensions[1:]), nil
}
return 0, dimensions, nil
return 0, dimensionKeys(dimensions), nil
}
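As a worked example of the function above (a sketch only, assuming the time() argument is parsed as a *DurationLiteral, which is what the duration-argument check implies):

	// p is the *Planner from above; values are hypothetical.
	dims := Dimensions{
		{Expr: &Call{Name: "time", Args: []Expr{&DurationLiteral{Val: time.Hour}}}},
		{Expr: &VarRef{Val: "host"}},
	}
	interval, tags, err := p.normalizeDimensions(dims)
	// interval == time.Hour, tags == []string{"host"}, err == nil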
// planField returns a processor for field.
@ -151,7 +157,7 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
if err != nil {
return nil, err
}
name := sub.Source.(*Series).Name
name := sub.Source.(*Measurement).Name
tags := make(map[string]string) // TODO: Extract tags.
// Find field.
@ -174,6 +180,7 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
m := newMapper(e, seriesID, fieldID, typ)
m.min, m.max = e.min.UnixNano(), e.max.UnixNano()
m.interval = int64(e.interval)
m.key = append(make([]byte, 8), marshalStrings(p.DB.SeriesTagValues(seriesID, e.tags))...)
r.mappers[i] = m
}
@ -199,7 +206,20 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
// planBinaryExpr generates a processor for a binary expression.
// A binary expression represents a join operator between two processors.
func (p *Planner) planBinaryExpr(e *Executor, expr *BinaryExpr) (processor, error) {
panic("TODO")
// Create processor for LHS.
lhs, err := p.planExpr(e, expr.LHS)
if err != nil {
return nil, fmt.Errorf("lhs: %s", err)
}
// Create processor for RHS.
rhs, err := p.planExpr(e, expr.RHS)
if err != nil {
return nil, fmt.Errorf("rhs: %s", err)
}
// Combine processors.
return newBinaryExprEvaluator(e, expr.Op, lhs, rhs), nil
}
// Executor represents the implementation of Executor.
@ -210,7 +230,7 @@ type Executor struct {
processors []processor // per-field processors
min, max time.Time // time range
interval time.Duration // group by duration
dimensions Dimensions // non-interval dimensions
tags []string // group by tag keys
}
// Execute begins execution of the query and returns a channel to receive rows.
@ -231,28 +251,14 @@ func (e *Executor) Execute() (<-chan *Row, error) {
func (e *Executor) execute(out chan *Row) {
// TODO: Support multi-value rows.
// Initialize row.
row := &Row{}
row.Name = e.processors[0].name()
// Create column names.
row.Columns = make([]string, 0)
if e.interval != 0 {
row.Columns = append(row.Columns, "time")
}
for i, f := range e.stmt.Fields {
name := f.Name()
if name == "" {
name = fmt.Sprintf("col%d", i)
}
row.Columns = append(row.Columns, name)
}
// Initialize map of rows by encoded tagset.
rows := make(map[string]*Row)
// Combine values from each processor.
loop:
for {
values := make([]interface{}, len(e.processors)+1)
// Retrieve values from processors and write them to the appropriate
// row based on their tagset.
for i, p := range e.processors {
// Retrieve data from the processor.
m, ok := <-p.C()
@ -262,25 +268,86 @@ loop:
// Set values on returned row.
for k, v := range m {
values[0] = k / int64(time.Microsecond) // TODO: Set once per row.
// Extract timestamp and tag values from key.
b := []byte(k)
timestamp := int64(binary.BigEndian.Uint64(b[0:8]))
// Lookup row values and populate data.
values := e.createRowValuesIfNotExists(rows, e.processors[0].name(), b[8:], timestamp)
values[i+1] = v
}
}
// Remove timestamp if there is no group by interval.
if e.interval == 0 {
values = values[1:]
}
row.Values = append(row.Values, values)
}
// Send row to the channel.
out <- row
// Normalize rows and values.
// This converts the timestamps from nanoseconds to microseconds.
a := make(Rows, 0, len(rows))
for _, row := range rows {
for _, values := range row.Values {
values[0] = values[0].(int64) / int64(time.Microsecond)
}
a = append(a, row)
}
sort.Sort(a)
// Send rows to the channel.
for _, row := range a {
out <- row
}
// Mark the end of the output channel.
close(out)
}
// createRowValuesIfNotExists creates a new value set if one does not already exist for a given tagset + timestamp.
func (e *Executor) createRowValuesIfNotExists(rows map[string]*Row, name string, tagset []byte, timestamp int64) []interface{} {
// TODO: Add "name" to lookup key.
// Find row by tagset.
var row *Row
if row = rows[string(tagset)]; row == nil {
row = &Row{Name: name}
// Create tag map.
row.Tags = make(map[string]string)
for i, v := range unmarshalStrings(tagset) {
row.Tags[e.tags[i]] = v
}
// Create column names.
row.Columns = make([]string, 1, len(e.stmt.Fields)+1)
row.Columns[0] = "time"
for i, f := range e.stmt.Fields {
name := f.Name()
if name == "" {
name = fmt.Sprintf("col%d", i)
}
row.Columns = append(row.Columns, name)
}
// Save to lookup.
rows[string(tagset)] = row
}
// If no values exist or last value doesn't match the timestamp then create new.
if len(row.Values) == 0 || row.Values[len(row.Values)-1][0] != timestamp {
values := make([]interface{}, len(e.processors)+1)
values[0] = timestamp
row.Values = append(row.Values, values)
}
return row.Values[len(row.Values)-1]
}
// dimensionKeys returns a list of tag key names for the dimensions.
// Each dimension must be a VarRef.
func dimensionKeys(dimensions Dimensions) (a []string) {
for _, d := range dimensions {
a = append(a, d.Expr.(*VarRef).Val)
}
return
}
// mapper represents an object for processing iterators.
type mapper struct {
executor *Executor // parent executor
@ -290,9 +357,10 @@ type mapper struct {
itr Iterator // series iterator
min, max int64 // time range
interval int64 // group by interval
key []byte // encoded timestamp + dimensional values
fn mapFunc // map function
c chan map[int64]interface{}
c chan map[string]interface{}
done chan chan struct{}
}
@ -303,7 +371,7 @@ func newMapper(e *Executor, seriesID uint32, fieldID uint8, typ DataType) *mappe
seriesID: seriesID,
fieldID: fieldID,
typ: typ,
c: make(chan map[int64]interface{}, 0),
c: make(chan map[string]interface{}, 0),
done: make(chan chan struct{}, 0),
}
}
@ -319,7 +387,7 @@ func (m *mapper) start() {
func (m *mapper) stop() { syncClose(m.done) }
// C returns the streaming data channel.
func (m *mapper) C() <-chan map[int64]interface{} { return m.c }
func (m *mapper) C() <-chan map[string]interface{} { return m.c }
// run executes the map function against the iterator.
func (m *mapper) run() {
@ -329,9 +397,13 @@ func (m *mapper) run() {
close(m.c)
}
// emit sends a value to the reducer's output channel.
// emit sends a value to the mapper's output channel.
func (m *mapper) emit(key int64, value interface{}) {
m.c <- map[int64]interface{}{key: value}
// Encode the timestamp to the beginning of the key.
binary.BigEndian.PutUint64(m.key, uint64(key))
// OPTIMIZE: Collect emit calls and flush all at once.
m.c <- map[string]interface{}{string(m.key): value}
}
// mapFunc represents a function used for mapping iterators.
@ -355,6 +427,14 @@ func mapSum(itr Iterator, m *mapper) {
m.emit(itr.Time(), n)
}
// processor represents an object for joining reducer output.
type processor interface {
start()
stop()
name() string
C() <-chan map[string]interface{}
}
// reducer represents an object for processing mapper output.
// Implements processor.
type reducer struct {
@ -363,7 +443,7 @@ type reducer struct {
mappers []*mapper // child mappers
fn reduceFunc // reduce function
c chan map[int64]interface{}
c chan map[string]interface{}
done chan chan struct{}
}
@ -371,7 +451,7 @@ type reducer struct {
func newReducer(e *Executor) *reducer {
return &reducer{
executor: e,
c: make(chan map[int64]interface{}, 0),
c: make(chan map[string]interface{}, 0),
done: make(chan chan struct{}, 0),
}
}
@ -393,17 +473,17 @@ func (r *reducer) stop() {
}
// C returns the streaming data channel.
func (r *reducer) C() <-chan map[int64]interface{} { return r.c }
func (r *reducer) C() <-chan map[string]interface{} { return r.c }
// name returns the source name.
func (r *reducer) name() string { return r.stmt.Source.(*Series).Name }
func (r *reducer) name() string { return r.stmt.Source.(*Measurement).Name }
// run runs the reducer loop to read mapper output and reduce it.
func (r *reducer) run() {
loop:
for {
// Combine all data from the mappers.
data := make(map[int64][]interface{})
data := make(map[string][]interface{})
for _, m := range r.mappers {
kv, ok := <-m.C()
if !ok {
@ -425,15 +505,15 @@ loop:
}
// emit sends a value to the reducer's output channel.
func (r *reducer) emit(key int64, value interface{}) {
r.c <- map[int64]interface{}{key: value}
func (r *reducer) emit(key string, value interface{}) {
r.c <- map[string]interface{}{key: value}
}
// reduceFunc represents a function used for reducing mapper output.
type reduceFunc func(int64, []interface{}, *reducer)
type reduceFunc func(string, []interface{}, *reducer)
// reduceSum computes the sum of values for each key.
func reduceSum(key int64, values []interface{}, r *reducer) {
func reduceSum(key string, values []interface{}, r *reducer) {
var n float64
for _, v := range values {
n += v.(float64)
@ -441,18 +521,109 @@ func reduceSum(key int64, values []interface{}, r *reducer) {
r.emit(key, n)
}
// processor represents an object for joining reducer output.
type processor interface {
start()
stop()
name() string
C() <-chan map[int64]interface{}
// binaryExprEvaluator represents a processor for combining two processors.
type binaryExprEvaluator struct {
executor *Executor // parent executor
lhs, rhs processor // processors
op Token // operation
c chan map[string]interface{}
done chan chan struct{}
}
// newBinaryExprEvaluator returns a new instance of binaryExprEvaluator.
func newBinaryExprEvaluator(e *Executor, op Token, lhs, rhs processor) *binaryExprEvaluator {
return &binaryExprEvaluator{
executor: e,
op: op,
lhs: lhs,
rhs: rhs,
c: make(chan map[string]interface{}, 0),
done: make(chan chan struct{}, 0),
}
}
// start begins streaming values from the lhs/rhs processors
func (e *binaryExprEvaluator) start() {
e.lhs.start()
e.rhs.start()
go e.run()
}
// stop stops the processor.
func (e *binaryExprEvaluator) stop() {
e.lhs.stop()
e.rhs.stop()
syncClose(e.done)
}
// C returns the streaming data channel.
func (e *binaryExprEvaluator) C() <-chan map[string]interface{} { return e.c }
// name returns the source name.
func (e *binaryExprEvaluator) name() string { return "" }
// run runs the processor loop to read subprocessor output and combine it.
func (e *binaryExprEvaluator) run() {
for {
// Read LHS value.
lhs, ok := <-e.lhs.C()
if !ok {
break
}
// Read RHS value.
rhs, ok := <-e.rhs.C()
if !ok {
break
}
// Merge maps.
m := make(map[string]interface{})
for k, v := range lhs {
m[k] = e.eval(v, rhs[k])
}
for k, v := range rhs {
// Skip value if already processed in lhs loop.
if _, ok := m[k]; ok {
continue
}
m[k] = e.eval(float64(0), v)
}
// Return value.
e.c <- m
}
// Mark the channel as complete.
close(e.c)
}
// eval evaluates two values using the evaluator's operation.
func (e *binaryExprEvaluator) eval(lhs, rhs interface{}) interface{} {
switch e.op {
case ADD:
return lhs.(float64) + rhs.(float64)
case SUB:
return lhs.(float64) - rhs.(float64)
case MUL:
return lhs.(float64) * rhs.(float64)
case DIV:
rhs := rhs.(float64)
if rhs == 0 {
return float64(0)
}
return lhs.(float64) / rhs
default:
// TODO: Validate operation & data types.
panic("invalid operation: " + e.op.String())
}
}
// literalProcessor represents a processor that continually sends a literal value.
type literalProcessor struct {
val interface{}
c chan map[int64]interface{}
c chan map[string]interface{}
done chan chan struct{}
}
@ -460,13 +631,13 @@ type literalProcessor struct {
func newLiteralProcessor(val interface{}) *literalProcessor {
return &literalProcessor{
val: val,
c: make(chan map[int64]interface{}, 0),
c: make(chan map[string]interface{}, 0),
done: make(chan chan struct{}, 0),
}
}
// C returns the streaming data channel.
func (p *literalProcessor) C() <-chan map[int64]interface{} { return p.c }
func (p *literalProcessor) C() <-chan map[string]interface{} { return p.c }
// process continually returns a literal value with a blank key.
func (p *literalProcessor) start() { go p.run() }
@ -478,7 +649,7 @@ func (p *literalProcessor) run() {
case ch := <-p.done:
close(ch)
return
case p.c <- map[int64]interface{}{0: p.val}:
case p.c <- map[string]interface{}{"": p.val}:
}
}
}
@ -521,8 +692,73 @@ type Row struct {
Err error `json:"err,omitempty"`
}
// TODO: Walk field expressions to extract subqueries.
// TODO: Resolve subqueries to series ids.
// TODO: send query with all ids to executor (knows to run locally or remote server)
// TODO: executor creates mapper for each series id.
// TODO: Create
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := fnv.New64a()
keys := r.tagsKeys()
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(r.Tags[k]))
}
return h.Sum64()
}
// tagsKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
a := make([]string, 0, len(r.Tags))
for k := range r.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Rows represents a list of rows that can be sorted consistently by name/tag.
type Rows []*Row
func (p Rows) Len() int { return len(p) }
func (p Rows) Less(i, j int) bool {
// Sort by name first.
if p[i].Name != p[j].Name {
return p[i].Name < p[j].Name
}
// Sort by tag set hash. Tags don't have a meaningful sort order so we
// just compute a hash and sort by that instead. This allows the tests
// to receive rows in a predictable order every time.
return p[i].tagsHash() < p[j].tagsHash()
}
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// marshalStrings encodes an array of strings into a byte slice.
func marshalStrings(a []string) (ret []byte) {
for _, s := range a {
// Create a slice for len+data
b := make([]byte, 2+len(s))
binary.BigEndian.PutUint16(b[0:2], uint16(len(s)))
copy(b[2:], s)
// Append it to the full byte slice.
ret = append(ret, b...)
}
return
}
// unmarshalStrings decodes a byte slice into an array of strings.
func unmarshalStrings(b []byte) (ret []string) {
for {
// If there's no more data then exit.
if len(b) == 0 {
return
}
// Decode size + data.
n := binary.BigEndian.Uint16(b[0:2])
ret = append(ret, string(b[2:n+2]))
// Move the byte slice forward and retry.
b = b[n+2:]
}
}
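To make the composite key format concrete: planCall preallocates 8 bytes for the timestamp, appends the marshaled tag values, and emit overwrites the first 8 bytes per point. A small round-trip sketch (illustrative only, with hypothetical tag values):

	// Encode: 8-byte big-endian timestamp followed by length-prefixed tag values.
	t := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()
	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key[0:8], uint64(t))
	key = append(key, marshalStrings([]string{"servera", "us-west"})...)

	// Decode: split the key back into a timestamp and its tag values,
	// mirroring what createRowValuesIfNotExists does with b[0:8] and b[8:].
	timestamp := int64(binary.BigEndian.Uint64(key[0:8])) // == t
	tagValues := unmarshalStrings(key[8:])                // ["servera", "us-west"]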


@ -25,7 +25,27 @@ func TestPlanner_Plan_Count(t *testing.T) {
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:50Z", map[string]interface{}{"value": float64(50)})
// Expected resultset.
exp := minify(`[{"name":"cpu","columns":["count"],"values":[[6]]}]`)
exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)
// Execute and compare.
rs := db.MustPlanAndExecute(`SELECT count(value) FROM cpu`)
if act := minify(jsonify(rs)); exp != act {
t.Fatalf("unexpected resultset: %s", act)
}
}
// Ensure the planner can plan and execute a count query across multiple series.
func TestPlanner_Plan_Count_Multiseries(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(100)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(90)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(80)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(70)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:40Z", map[string]interface{}{"value": float64(60)})
db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T00:00:50Z", map[string]interface{}{"value": float64(50)})
// Expected resultset.
exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)
// Execute and compare.
rs := db.MustPlanAndExecute(`SELECT count(value) FROM cpu`)
@ -35,7 +55,7 @@ func TestPlanner_Plan_Count(t *testing.T) {
}
// Ensure the planner can plan and execute a count query grouped by hour.
func TestPlanner_Plan_CountByHour(t *testing.T) {
func TestPlanner_Plan_GroupByInterval(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(100)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(90)})
@ -71,10 +91,94 @@ func TestPlanner_Plan_CountByHour(t *testing.T) {
}
}
// Ensure the planner can plan and execute a query grouped by interval and tag.
func TestPlanner_Plan_GroupByIntervalAndTag(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(10)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T09:30:00Z", map[string]interface{}{"value": float64(20)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(30)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T11:30:00Z", map[string]interface{}{"value": float64(40)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(1)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(2)})
// Query data from the last 3 hours, grouped into 1-hour buckets per host.
rs := db.MustPlanAndExecute(`
SELECT sum(value)
FROM cpu
WHERE time >= now() - 3h
GROUP BY time(1h), host`)
// Expected resultset.
exp := minify(`[{
"name":"cpu",
"tags":{"host":"servera"},
"columns":["time","sum"],
"values":[
[946717200000000,30],
[946720800000000,0],
[946724400000000,70]
]
},{
"name":"cpu",
"tags":{"host":"serverb"},
"columns":["time","sum"],
"values":[
[946717200000000,1],
[946720800000000,0],
[946724400000000,2]
]
}]`)
// Compare resultsets.
if act := jsonify(rs); exp != act {
t.Fatalf("unexpected resultset: \n\n%s\n\n%s\n\n", exp, act)
}
}
// Ensure the planner can plan and execute a joined query.
func TestPlanner_Plan_Join(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(1)})
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(2)})
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(3)})
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(4)})
db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(10)})
db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(20)})
db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(40)})
// Query must join the series and sum the values.
rs := db.MustPlanAndExecute(`
SELECT sum(cpu.0.value) + sum(cpu.1.value) AS "sum"
FROM JOIN(cpu.0, cpu.1)
WHERE time >= "2000-01-01 00:00:00" AND time < "2000-01-01 00:01:00"
GROUP BY time(10s)`)
// Expected resultset.
exp := minify(`[{
"columns":["time","sum"],
"values":[
[946684800000000,11],
[946684810000000,22],
[946684820000000,3],
[946684830000000,44],
[946684840000000,0],
[946684850000000,0]
]
}]`)
// Compare resultsets.
if act := jsonify(rs); exp != act {
t.Fatalf("unexpected resultset: %s", indent(act))
}
}
// DB represents an in-memory test database that implements methods for Planner.
type DB struct {
measurements map[string]*Measurement
series map[uint32]*Series
maxSeriesID uint32
Now time.Time
}
@ -167,8 +271,8 @@ func (db *DB) CreateSeriesIfNotExists(name string, tags map[string]string) (*Mea
}
// Create new series.
m.maxSeriesID++
s := &Series{id: m.maxSeriesID, tags: tags}
db.maxSeriesID++
s := &Series{id: db.maxSeriesID, tags: tags}
// Add series to DB and measurement.
db.series[s.id] = s
@ -206,6 +310,23 @@ func (db *DB) MatchSeries(name string, tags map[string]string) []uint32 {
return ids
}
// SeriesTagValues returns a slice of tag values for a given series and tag keys.
func (db *DB) SeriesTagValues(seriesID uint32, keys []string) (values []string) {
values = make([]string, len(keys))
// Find series.
s := db.series[seriesID]
if s == nil {
return
}
// Loop over keys and set values.
for i, key := range keys {
values[i] = s.tags[key]
}
return
}
// Field returns the field identifier and data type for a given measurement name and field name.
func (db *DB) Field(name, field string) (fieldID uint8, typ influxql.DataType) {
// Find measurement.
@ -334,8 +455,7 @@ type Measurement struct {
maxFieldID uint8
fields map[string]*Field
maxSeriesID uint32
series map[uint32]*Series
series map[uint32]*Series
}
// NewMeasurement returns a new instance of Measurement.


@ -566,30 +566,52 @@ func (p *Parser) parseAlias() (string, error) {
// parseSource parses the "FROM" clause of the query.
func (p *Parser) parseSource() (Source, error) {
// Scan the identifier for the series.
// The first token can either be the series name or a join/merge call.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
lhs := &Series{Name: lit}
// Check if the next token is JOIN or MERGE.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == JOIN {
rhs, err := p.parseSource()
if err != nil {
return nil, err
}
return &Join{LHS: lhs, RHS: rhs}, nil
} else if tok == MERGE {
rhs, err := p.parseSource()
if err != nil {
return nil, err
}
return &Merge{LHS: lhs, RHS: rhs}, nil
// If the token is a string or the next token is not an LPAREN then return a measurement.
if next, _, _ := p.scan(); tok == STRING || (tok == IDENT && next != LPAREN) {
p.unscan()
return &Measurement{Name: lit}, nil
}
p.unscan()
return lhs, nil
// Verify the source type is join/merge.
sourceType := strings.ToLower(lit)
if sourceType != "join" && sourceType != "merge" {
return nil, &ParseError{Message: "unknown merge type: " + sourceType, Pos: pos}
}
// Parse measurement list.
var measurements []*Measurement
for {
// Scan the measurement name.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"measurement name"}, pos)
}
measurements = append(measurements, &Measurement{Name: lit})
// If there's not a comma next then stop parsing measurements.
if tok, _, _ := p.scan(); tok != COMMA {
p.unscan()
break
}
}
// Expect a closing right paren.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != RPAREN {
return nil, newParseError(tokstr(tok, lit), []string{")"}, pos)
}
// Return the appropriate source type.
if sourceType == "join" {
return &Join{Measurements: measurements}, nil
} else {
return &Merge{Measurements: measurements}, nil
}
}
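For clarity, a few FROM clauses and the sources the logic above should produce (a sketch, not an exhaustive list):

	// FROM cpu            -> &Measurement{Name: "cpu"}
	// FROM "my series"    -> &Measurement{Name: "my series"}
	// FROM join(aa, bb)   -> &Join{Measurements: Measurements{{Name: "aa"}, {Name: "bb"}}}
	// FROM merge(aa, bb)  -> &Merge{Measurements: Measurements{{Name: "aa"}, {Name: "bb"}}}
	// FROM foo(aa)        -> ParseError: "unknown merge type: foo"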
// parseCondition parses the "WHERE" clause of the query, if it exists.


@ -52,7 +52,7 @@ func TestParser_ParseStatement(t *testing.T) {
Fields: influxql.Fields{
&influxql.Field{Expr: &influxql.Wildcard{}},
},
Source: &influxql.Series{Name: "myseries"},
Source: &influxql.Measurement{Name: "myseries"},
},
},
@ -65,7 +65,7 @@ func TestParser_ParseStatement(t *testing.T) {
&influxql.Field{Expr: &influxql.VarRef{Val: "field2"}},
&influxql.Field{Expr: &influxql.VarRef{Val: "field3"}, Alias: "field_x"},
},
Source: &influxql.Series{Name: "myseries"},
Source: &influxql.Measurement{Name: "myseries"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
@ -83,14 +83,14 @@ func TestParser_ParseStatement(t *testing.T) {
// SELECT statement with JOIN
{
s: `SELECT field1 FROM aa JOIN bb JOIN cc`,
s: `SELECT field1 FROM join(aa,"bb", cc) JOIN cc`,
stmt: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}}},
Source: &influxql.Join{
LHS: &influxql.Series{Name: "aa"},
RHS: &influxql.Join{
LHS: &influxql.Series{Name: "bb"},
RHS: &influxql.Series{Name: "cc"},
Measurements: influxql.Measurements{
{Name: "aa"},
{Name: "bb"},
{Name: "cc"},
},
},
},
@ -98,14 +98,13 @@ func TestParser_ParseStatement(t *testing.T) {
// SELECT statement with MERGE
{
s: `SELECT field1 FROM aa MERGE bb MERGE cc`,
s: `SELECT field1 FROM merge(aa,b.b)`,
stmt: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}}},
Source: &influxql.Merge{
LHS: &influxql.Series{Name: "aa"},
RHS: &influxql.Merge{
LHS: &influxql.Series{Name: "bb"},
RHS: &influxql.Series{Name: "cc"},
Measurements: influxql.Measurements{
{Name: "aa"},
{Name: "b.b"},
},
},
},
@ -116,7 +115,7 @@ func TestParser_ParseStatement(t *testing.T) {
s: `select my_field from myseries`,
stmt: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "my_field"}}},
Source: &influxql.Series{Name: "myseries"},
Source: &influxql.Measurement{Name: "myseries"},
},
},
@ -125,7 +124,7 @@ func TestParser_ParseStatement(t *testing.T) {
s: `SELECT field1 FROM myseries ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}}},
Source: &influxql.Series{Name: "myseries"},
Source: &influxql.Measurement{Name: "myseries"},
SortFields: influxql.SortFields{
&influxql.SortField{Ascending: true},
&influxql.SortField{Name: "field1"},
@ -139,7 +138,7 @@ func TestParser_ParseStatement(t *testing.T) {
{
s: `DELETE FROM myseries WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DeleteStatement{
Source: &influxql.Series{Name: "myseries"},
Source: &influxql.Measurement{Name: "myseries"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
@ -194,7 +193,7 @@ func TestParser_ParseStatement(t *testing.T) {
{
s: `LIST TAG KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ListTagKeysStatement{
Source: &influxql.Series{Name: "src"},
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
@ -213,7 +212,7 @@ func TestParser_ParseStatement(t *testing.T) {
{
s: `LIST TAG VALUES FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ListTagValuesStatement{
Source: &influxql.Series{Name: "src"},
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
@ -232,7 +231,7 @@ func TestParser_ParseStatement(t *testing.T) {
{
s: `LIST FIELD KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ListFieldKeysStatement{
Source: &influxql.Series{Name: "src"},
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
@ -251,7 +250,7 @@ func TestParser_ParseStatement(t *testing.T) {
{
s: `LIST FIELD VALUES FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ListFieldValuesStatement{
Source: &influxql.Series{Name: "src"},
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
@ -285,7 +284,7 @@ func TestParser_ParseStatement(t *testing.T) {
Name: "myquery",
Source: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
Source: &influxql.Series{Name: "myseries"},
Source: &influxql.Measurement{Name: "myseries"},
},
Target: "foo",
},


@ -109,13 +109,11 @@ func TestScanner_Scan(t *testing.T) {
{s: `INNER`, tok: influxql.INNER},
{s: `INSERT`, tok: influxql.INSERT},
{s: `INTO`, tok: influxql.INTO},
{s: `JOIN`, tok: influxql.JOIN},
{s: `KEYS`, tok: influxql.KEYS},
{s: `LIMIT`, tok: influxql.LIMIT},
{s: `LIST`, tok: influxql.LIST},
{s: `MEASUREMENT`, tok: influxql.MEASUREMENT},
{s: `MEASUREMENTS`, tok: influxql.MEASUREMENTS},
{s: `MERGE`, tok: influxql.MERGE},
{s: `ORDER`, tok: influxql.ORDER},
{s: `QUERIES`, tok: influxql.QUERIES},
{s: `SELECT`, tok: influxql.SELECT},


@ -65,13 +65,11 @@ const (
INNER
INSERT
INTO
JOIN
KEYS
LIMIT
LIST
MEASUREMENT
MEASUREMENTS
MERGE
ORDER
QUERIES
QUERY
@ -129,13 +127,11 @@ var tokens = [...]string{
INNER: "INNER",
INSERT: "INSERT",
INTO: "INTO",
JOIN: "JOIN",
KEYS: "KEYS",
LIMIT: "LIMIT",
LIST: "LIST",
MEASUREMENT: "MEASUREMENT",
MEASUREMENTS: "MEASUREMENTS",
MERGE: "MERGE",
ORDER: "ORDER",
QUERIES: "QUERIES",
QUERY: "QUERY",