influxdb/influxql/engine_test.go

package influxql_test

import (
	"bytes"
	"encoding/json"
	"fmt"
	"os"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/influxdb/influxdb/influxql"
)

// Ensure a mapper can process intervals across multiple iterators.
func TestMapper_Map(t *testing.T) {
	m := influxql.NewMapper(influxql.MapSum, []influxql.Iterator{
		NewIterator([]string{"foo"}, []Point{
			{"2000-01-01T00:00:00Z", float64(10)}, // first minute
			{"2000-01-01T00:00:30Z", float64(20)},
			{"2000-01-01T00:01:00Z", float64(30)}, // second minute
			{"2000-01-01T00:01:30Z", float64(40)},
			{"2000-01-01T00:03:00Z", float64(50)}, // fourth minute (skip third)
		}),
		NewIterator([]string{"bar"}, []Point{
			{"2000-01-01T00:01:00Z", float64(100)}, // second minute
			{"2000-01-01T00:01:30Z", float64(200)},
		}),
	}, 1*time.Minute)
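	// Each emitted map is keyed by an influxql.Key pairing the interval start
	// (nanoseconds since epoch; 946684800000000000 is 2000-01-01T00:00:00Z) with
	// the iterator's encoded tag bytes. The "\x00\x03foo" form appears to be the
	// length-prefixed encoding produced by influxql.MarshalStrings in NewIterator.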
	ch := m.Map().C()

	if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684800000000000, Values: "\x00\x03foo"}: float64(30)}) {
		t.Fatalf("unexpected data(0/foo): %#v", data)
	} else if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684800000000000, Values: "\x00\x03bar"}: float64(0)}) {
		t.Fatalf("unexpected data(0/bar): %#v", data)
	}

	if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684860000000000, Values: "\x00\x03foo"}: float64(70)}) {
		t.Fatalf("unexpected data(1/foo): %#v", data)
	} else if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684860000000000, Values: "\x00\x03bar"}: float64(300)}) {
		t.Fatalf("unexpected data(1/bar): %#v", data)
	}

	if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684920000000000, Values: "\x00\x03foo"}: float64(0)}) {
		t.Fatalf("unexpected data(2/foo): %#v", data)
	} else if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684920000000000, Values: "\x00\x03bar"}: float64(0)}) {
		t.Fatalf("unexpected data(2/bar): %#v", data)
	}

	if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684980000000000, Values: "\x00\x03foo"}: float64(50)}) {
		t.Fatalf("unexpected data(3/foo): %#v", data)
	} else if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684980000000000, Values: "\x00\x03bar"}: float64(0)}) {
		t.Fatalf("unexpected data(3/bar): %#v", data)
	}
}

// Ensure a reducer can combine data received from a mapper.
func TestReducer_Reduce(t *testing.T) {
	m := influxql.NewMapper(influxql.MapSum, []influxql.Iterator{
		NewIterator([]string{"foo"}, []Point{
			{"2000-01-01T00:00:00Z", float64(10)},
			{"2000-01-01T00:01:00Z", float64(20)},
		}),
		NewIterator([]string{"bar"}, []Point{
			{"2000-01-01T00:00:00Z", float64(100)},
			{"2000-01-01T00:01:00Z", float64(200)},
		}),
		NewIterator([]string{"foo"}, []Point{
			{"2000-01-01T00:00:00Z", float64(1000)},
			{"2000-01-01T00:01:00Z", float64(2000)},
		}),
	}, 1*time.Minute)

	r := influxql.NewReducer(influxql.ReduceSum, []*influxql.Mapper{m})
	ch := r.Reduce().C()
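
	// The assertions below expect one per-tag result at a time within each
	// interval, in tag order (bar before foo); that ordering is assumed from
	// the reducer's observed output rather than documented behavior.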
	if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684800000000000, Values: "\x00\x03bar"}: float64(100)}) {
		t.Fatalf("unexpected data(0/bar): %#v", data)
	} else if data = <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684800000000000, Values: "\x00\x03foo"}: float64(1010)}) {
		t.Fatalf("unexpected data(0/foo): %#v", data)
	}

	if data := <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684860000000000, Values: "\x00\x03bar"}: float64(200)}) {
		t.Fatalf("unexpected data(1/bar): %#v", data)
	} else if data = <-ch; !reflect.DeepEqual(data, map[influxql.Key]interface{}{influxql.Key{Timestamp: 946684860000000000, Values: "\x00\x03foo"}: float64(2020)}) {
		t.Fatalf("unexpected data(1/foo): %#v", data)
	}

	if data, ok := <-ch; data != nil {
		t.Fatalf("unexpected data(end): %#v", data)
	} else if ok {
		t.Fatalf("expected channel close")
	}
}

// Ensure the planner can plan and execute a simple count query.
func TestPlanner_Plan_Count(t *testing.T) {
	tx := NewTx()
	tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
		return []influxql.Iterator{
			NewIterator(nil, []Point{
				{"2000-01-01T00:00:00Z", float64(100)},
				{"2000-01-01T00:00:10Z", float64(90)},
				{"2000-01-01T00:00:20Z", float64(80)},
			}),
			NewIterator(nil, []Point{
				{"2000-01-01T00:00:30Z", float64(70)},
				{"2000-01-01T00:00:40Z", float64(60)},
				{"2000-01-01T00:00:50Z", float64(50)},
			})}, nil
	}

	// Expected resultset.
	exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)

	// Execute and compare.
	rs := MustPlanAndExecute(NewDB(tx), `2000-01-01T12:00:00Z`,
		`SELECT count(value) FROM cpu WHERE time >= '2000-01-01'`)
	if act := minify(jsonify(rs)); exp != act {
		t.Fatalf("unexpected resultset: %s", act)
	}
}

// Ensure the planner can plan and execute a mean query.
func TestPlanner_Plan_Mean(t *testing.T) {
	tx := NewTx()
	tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
		return []influxql.Iterator{
			NewIterator(nil, []Point{
				{"2000-01-01T00:00:00Z", float64(100)},
				{"2000-01-01T00:00:10Z", float64(90)},
				{"2000-01-01T00:00:20Z", float64(80)},
			}),
			NewIterator(nil, []Point{
				{"2000-01-01T00:00:00Z", float64(80)},
				{"2000-01-01T00:00:10Z", float64(80)},
				{"2000-01-01T00:00:20Z", float64(50)},
			}),
			NewIterator(nil, []Point{
				{"2000-01-01T00:01:30Z", float64(70)},
				{"2000-01-01T00:01:40Z", float64(60)},
				{"2000-01-01T00:01:50Z", float64(50)},
			})}, nil
	}

	// Expected resultset.
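	// Row timestamps appear to be microseconds since epoch
	// (946684800000000 = 2000-01-01T00:00:00Z, 946684860000000 = 00:01:00Z).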
	exp := minify(`[{"name":"cpu","columns":["time","mean"],"values":[[946684800000000,80],[946684860000000,60]]}]`)

	// Execute and compare.
	rs := MustPlanAndExecute(NewDB(tx), `2000-01-01T12:00:00Z`,
		`SELECT mean(value) FROM cpu WHERE time >= '2000-01-01' GROUP BY time(1m)`)
	if act := minify(jsonify(rs)); exp != act {
		t.Fatalf("unexpected resultset: %s", act)
	}
}

// Ensure the planner can plan and execute a sum query grouped into 30-minute intervals.
func TestPlanner_Plan_GroupByInterval(t *testing.T) {
	tx := NewTx()
	tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
		//min, max, interval := MustTimeRangeAndInterval(stmt, `2000-01-01T12:00:00Z`)
		return []influxql.Iterator{
			NewIterator(nil, []Point{
				{"2000-01-01T09:00:00Z", float64(100)},
				{"2000-01-01T09:00:00Z", float64(90)},
				{"2000-01-01T09:30:00Z", float64(80)},
			}),
			NewIterator(nil, []Point{
				{"2000-01-01T11:00:00Z", float64(70)},
				{"2000-01-01T11:00:00Z", float64(60)},
				{"2000-01-01T11:30:00Z", float64(50)},
			})}, nil
	}

	// Expected resultset.
	exp := minify(`[{
		"name":"cpu",
		"columns":["time","sum"],
		"values":[
			[946717200000000,190],
			[946719000000000,80],
			[946720800000000,0],
			[946722600000000,0],
			[946724400000000,130],
			[946726200000000,50]
		]
	}]`)

	// Query for data since 3 hours ago until now, grouped every 30 minutes.
	rs := MustPlanAndExecute(NewDB(tx), "2000-01-01T12:00:00Z", `
		SELECT sum(value)
		FROM cpu
		WHERE time >= now() - 3h
		GROUP BY time(30m)`)

	// Compare resultsets.
	if act := jsonify(rs); exp != act {
		t.Fatalf("unexpected resultset: %s", indent(act))
	}
}

// Ensure the planner can plan and execute a query grouped by interval and tag.
func TestPlanner_Plan_GroupByIntervalAndTag(t *testing.T) {
	tx := NewTx()
	tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
		//min, max, interval := MustTimeRangeAndInterval(stmt, `2000-01-01T12:00:00Z`)
		return []influxql.Iterator{
			NewIterator([]string{"servera"}, []Point{
				{"2000-01-01T09:00:00Z", float64(10)},
				{"2000-01-01T09:30:00Z", float64(20)},
				{"2000-01-01T11:00:00Z", float64(30)},
				{"2000-01-01T11:30:00Z", float64(40)},
			}),
			NewIterator([]string{"serverb"}, []Point{
				{"2000-01-01T09:00:00Z", float64(1)},
				{"2000-01-01T11:00:00Z", float64(2)},
			})}, nil
	}

	// Query for data since 3 hours ago until now, grouped every hour by host.
	rs := MustPlanAndExecute(NewDB(tx), "2000-01-01T12:00:00Z", `
		SELECT sum(value)
		FROM cpu
		WHERE time >= now() - 3h
		GROUP BY time(1h), host`)

	// Expected resultset.
	exp := minify(`[{
		"name":"cpu",
		"tags":{"host":"servera"},
		"columns":["time","sum"],
		"values":[
			[946717200000000,30],
			[946720800000000,0],
			[946724400000000,70]
		]
	},{
		"name":"cpu",
		"tags":{"host":"serverb"},
		"columns":["time","sum"],
		"values":[
			[946717200000000,1],
			[946720800000000,0],
			[946724400000000,2]
		]
	}]`)

	// Compare resultsets.
	if act := jsonify(rs); exp != act {
		t.Fatalf("unexpected resultset:\n\nexp=%s\n\ngot=%s\n\n", exp, act)
	}
}

// Ensure the planner sends the correct simplified statements to the iterator creator.
func TestPlanner_CreateIterators(t *testing.T) {
	var flag0, flag1 bool

	tx := NewTx()
	tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
		switch stmt.String() {
		case `SELECT cpu.0.value FROM cpu.0 GROUP BY time(10s)`:
			flag0 = true
		case `SELECT cpu.1.value FROM cpu.1 GROUP BY time(10s)`:
			flag1 = true
		default:
			t.Fatalf("unexpected stmt passed to iterator creator: %s", stmt.String())
		}
		return nil, nil
	}

	stmt := MustParseSelectStatement(`
		SELECT sum(cpu.0.value) + sum(cpu.1.value) AS sum
		FROM JOIN(cpu.0, cpu.1)
		WHERE time >= '2000-01-01 00:00:00' AND time < '2000-01-01 00:01:00'
		GROUP BY time(10s)`)

	p := influxql.NewPlanner(NewDB(tx))
	p.Now = func() time.Time { return mustParseTime("2000-01-01T12:00:00Z") }
	if _, err := p.Plan(stmt); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	// Verify correct statements were passed through.
	if !flag0 {
		t.Error("cpu.0 substatement not passed in")
	}
	if !flag1 {
		t.Error("cpu.1 substatement not passed in")
	}
}

// DB represents a mockable database.
type DB struct {
	BeginFunc func() (influxql.Tx, error)
}

// NewDB returns a mock database that returns a transaction for Begin().
func NewDB(tx influxql.Tx) *DB {
	return &DB{
		BeginFunc: func() (influxql.Tx, error) { return tx, nil },
	}
}

func (db *DB) Begin() (influxql.Tx, error) { return db.BeginFunc() }

// Tx represents a mockable transaction.
type Tx struct {
	OpenFunc   func() error
	CloseFunc  func() error
	SetNowFunc func(time.Time)

	CreateIteratorsFunc func(*influxql.SelectStatement) ([]influxql.Iterator, error)
}

// NewTx returns a new mock Tx.
func NewTx() *Tx {
	return &Tx{
		OpenFunc:   func() error { return nil },
		CloseFunc:  func() error { return nil },
		SetNowFunc: func(_ time.Time) {},
	}
}
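
// Typical wiring in the planner tests above: create a Tx with NewTx, override
// CreateIteratorsFunc to return canned iterators, wrap it with NewDB, and pass
// the result to MustPlanAndExecute.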

func (tx *Tx) Open() error          { return tx.OpenFunc() }
func (tx *Tx) Close() error         { return tx.CloseFunc() }
func (tx *Tx) SetNow(now time.Time) { tx.SetNowFunc(now) }

func (tx *Tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
	return tx.CreateIteratorsFunc(stmt)
}

// Iterator represents an in-memory implementation of influxql.Iterator.
type Iterator struct {
	tags   string  // encoded dimensional tag values
	points []Point // underlying point data
	index  int     // current point index
}

// NewIterator returns a new iterator.
func NewIterator(tags []string, points []Point) *Iterator {
	return &Iterator{tags: string(influxql.MarshalStrings(tags)), points: points}
}

// Tags returns the encoded dimensional tag values.
func (i *Iterator) Tags() string { return i.tags }

// Next returns the next point's timestamp and field value.
func (i *Iterator) Next() (key int64, value interface{}) {
	// If the index is beyond the points range then return nil.
	if i.index > len(i.points)-1 {
		return 0, nil
	}

	// Retrieve point and extract value.
	p := i.points[i.index]
	i.index++
	return p.Time(), p.Value
}

// Point represents a single value at a given time.
type Point struct {
	Timestamp string // ISO-8601 formatted timestamp.
	Value     interface{}
}

// Time returns the Timestamp as nanoseconds since epoch.
func (p Point) Time() int64 { return mustParseTime(p.Timestamp).UnixNano() }

// PlanAndExecute plans, executes, and retrieves all rows.
func PlanAndExecute(db influxql.DB, now string, querystring string) ([]*influxql.Row, error) {
	// Plan statement.
	p := influxql.NewPlanner(db)
	p.Now = func() time.Time { return mustParseTime(now) }
	e, err := p.Plan(MustParseSelectStatement(querystring))
	if err != nil {
		return nil, err
	}

	// Execute plan.
	ch, err := e.Execute()
	if err != nil {
		return nil, err
	}

	// Collect resultset.
	var rs []*influxql.Row
	for row := range ch {
		rs = append(rs, row)
	}

	return rs, nil
}

// MustPlanAndExecute plans, executes, and retrieves all rows. Panic on error.
func MustPlanAndExecute(db influxql.DB, now string, querystring string) []*influxql.Row {
	rs, err := PlanAndExecute(db, now, querystring)
	if err != nil {
		panic(err.Error())
	}
	return rs
}

// MustTimeRangeAndInterval returns the time range & interval of the query.
// Max is set to defaultMax if zero. Panic on error.
func MustTimeRangeAndInterval(stmt *influxql.SelectStatement, defaultMax string) (time.Time, time.Time, time.Duration) {
	min, max := influxql.TimeRange(stmt.Condition)
	interval, _, err := stmt.Dimensions.Normalize()
	if err != nil {
		panic(err.Error())
	}

	if max.IsZero() {
		max = mustParseTime(defaultMax)
	}

	return min, max, interval
}
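
// Example usage, matching the commented-out lines in the planner tests above:
//
//	min, max, interval := MustTimeRangeAndInterval(stmt, `2000-01-01T12:00:00Z`)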

func warn(v ...interface{})               { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{})  { fmt.Fprintf(os.Stderr, msg+"\n", v...) }

// mustParseTime parses an ISO-8601 (RFC 3339) string. Panic on error.
func mustParseTime(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err.Error())
	}
	return t
}

// mustMarshalJSON encodes a value to JSON.
func mustMarshalJSON(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		panic("marshal json: " + err.Error())
	}
	return b
}

// jsonify encodes a value to JSON and returns it as a string.
func jsonify(v interface{}) string { return string(mustMarshalJSON(v)) }

// indent indents a JSON string.
func indent(s string) string {
	var buf bytes.Buffer
	json.Indent(&buf, []byte(s), "", " ")
	return buf.String()
}

// minify removes tabs and newlines.
func minify(s string) string { return strings.NewReplacer("\n", "", "\t", "").Replace(s) }

func unix(nsec int64) time.Time { return time.Unix(0, nsec).UTC() }