influxdb/influxql/engine_test.go

540 lines
16 KiB
Go

package influxql_test
import (
"bytes"
"encoding/json"
"fmt"
"os"
"reflect"
"sort"
"strings"
"testing"
"time"
"github.com/influxdb/influxdb/influxql"
)
// Ensure the planner can plan and execute a simple count query.
func TestPlanner_Plan_Count(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(100)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(90)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(80)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(70)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:40Z", map[string]interface{}{"value": float64(60)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T00:00:50Z", map[string]interface{}{"value": float64(50)})
// Expected resultset.
exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)
// Execute and compare.
rs := db.MustPlanAndExecute(`SELECT count(value) FROM cpu`)
if act := minify(jsonify(rs)); exp != act {
t.Fatalf("unexpected resultset: %s", act)
}
}
// Ensure the planner can plan and execute a count query across multiple series.
func TestPlanner_Plan_Count_Multiseries(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(100)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(90)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(80)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(70)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T00:00:40Z", map[string]interface{}{"value": float64(60)})
db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T00:00:50Z", map[string]interface{}{"value": float64(50)})
// Expected resultset.
exp := minify(`[{"name":"cpu","columns":["time","count"],"values":[[0,6]]}]`)
// Execute and compare.
rs := db.MustPlanAndExecute(`SELECT count(value) FROM cpu`)
if act := minify(jsonify(rs)); exp != act {
t.Fatalf("unexpected resultset: %s", act)
}
}
// Ensure the planner can plan and execute a count query grouped by hour.
func TestPlanner_Plan_GroupByInterval(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(100)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(90)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T09:30:00Z", map[string]interface{}{"value": float64(80)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(70)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(60)})
db.WriteSeries("cpu", map[string]string{}, "2000-01-01T11:30:00Z", map[string]interface{}{"value": float64(50)})
// Expected resultset.
exp := minify(`[{
"name":"cpu",
"columns":["time","sum"],
"values":[
[946717200000000,190],
[946719000000000,80],
[946720800000000,0],
[946722600000000,0],
[946724400000000,130],
[946726200000000,50]
]
}]`)
// Query for data since 3 hours ago until now, grouped every 30 minutes.
rs := db.MustPlanAndExecute(`
SELECT sum(value)
FROM cpu
WHERE time >= now() - 3h
GROUP BY time(30m)`)
// Compare resultsets.
if act := jsonify(rs); exp != act {
t.Fatalf("unexpected resultset: %s", indent(act))
}
}
// Ensure the planner can plan and execute a query grouped by interval and tag.
func TestPlanner_Plan_GroupByIntervalAndTag(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(10)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T09:30:00Z", map[string]interface{}{"value": float64(20)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(30)})
db.WriteSeries("cpu", map[string]string{"host": "servera"}, "2000-01-01T11:30:00Z", map[string]interface{}{"value": float64(40)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(1)})
db.WriteSeries("cpu", map[string]string{"host": "serverb"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(2)})
// Query for data since 3 hours ago until now, grouped every 30 minutes.
rs := db.MustPlanAndExecute(`
SELECT sum(value)
FROM cpu
WHERE time >= now() - 3h
GROUP BY time(1h), host`)
// Expected resultset.
exp := minify(`[{
"name":"cpu",
"tags":{"host":"servera"},
"columns":["time","sum"],
"values":[
[946717200000000,30],
[946720800000000,0],
[946724400000000,70]
]
},{
"name":"cpu",
"tags":{"host":"serverb"},
"columns":["time","sum"],
"values":[
[946717200000000,1],
[946720800000000,0],
[946724400000000,2]
]
}]`)
// Compare resultsets.
if act := jsonify(rs); exp != act {
t.Fatalf("unexpected resultset: \n\n%s\n\n%s\n\n", exp, act)
}
}
// Ensure the planner can plan and execute a joined query.
func TestPlanner_Plan_Join(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(1)})
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(2)})
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:20Z", map[string]interface{}{"value": float64(3)})
db.WriteSeries("cpu.0", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(4)})
db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:00Z", map[string]interface{}{"value": float64(10)})
db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:10Z", map[string]interface{}{"value": float64(20)})
db.WriteSeries("cpu.1", map[string]string{}, "2000-01-01T00:00:30Z", map[string]interface{}{"value": float64(40)})
// Query must join the series and sum the values.
rs := db.MustPlanAndExecute(`
SELECT sum(cpu.0.value) + sum(cpu.1.value) AS "sum"
FROM JOIN(cpu.0, cpu.1)
WHERE time >= "2000-01-01 00:00:00" AND time < "2000-01-01 00:01:00"
GROUP BY time(10s)`)
// Expected resultset.
exp := minify(`[{
"columns":["time","sum"],
"values":[
[946684800000000,11],
[946684810000000,22],
[946684820000000,3],
[946684830000000,44],
[946684840000000,0],
[946684850000000,0]
]
}]`)
// Compare resultsets.
if act := jsonify(rs); exp != act {
t.Fatalf("unexpected resultset: %s", indent(act))
}
}
// DB represents an in-memory test database that implements methods for Planner.
type DB struct {
measurements map[string]*Measurement
series map[uint32]*Series
maxSeriesID uint32
Now time.Time
}
// NewDB returns a new instance of DB at a given time.
func NewDB(now string) *DB {
return &DB{
measurements: make(map[string]*Measurement),
series: make(map[uint32]*Series),
Now: mustParseTime(now),
}
}
// PlanAndExecute plans, executes, and retrieves all rows.
func (db *DB) PlanAndExecute(querystring string) ([]*influxql.Row, error) {
// Plan statement.
p := influxql.NewPlanner(db)
p.Now = func() time.Time { return db.Now }
e, err := p.Plan(MustParseSelectStatement(querystring))
if err != nil {
return nil, err
}
// Execute plan.
ch, err := e.Execute()
if err != nil {
return nil, err
}
// Collect resultset.
var rs []*influxql.Row
for row := range ch {
rs = append(rs, row)
}
return rs, nil
}
// MustPlanAndExecute plans, executes, and retrieves all rows. Panic on error.
func (db *DB) MustPlanAndExecute(querystring string) []*influxql.Row {
rs, err := db.PlanAndExecute(querystring)
if err != nil {
panic(err.Error())
}
return rs
}
// WriteSeries writes a series
func (db *DB) WriteSeries(name string, tags map[string]string, timestamp string, values map[string]interface{}) {
// Find or create measurement & series.
m, s := db.CreateSeriesIfNotExists(name, tags)
// Create point.
p := &point{
timestamp: mustParseTime(timestamp).UTC().UnixNano(),
values: make(map[uint8]interface{}),
}
// Map field names to field ids.
for k, v := range values {
f := m.CreateFieldIfNotExists(k, influxql.InspectDataType(v))
p.values[f.id] = v
}
// Add point to series.
s.points = append(s.points, p)
// Sort series points by time.
sort.Sort(points(s.points))
}
// CreateSeriesIfNotExists returns a measurement & series by name/tagset.
// Creates them if they don't exist.
func (db *DB) CreateSeriesIfNotExists(name string, tags map[string]string) (*Measurement, *Series) {
// Find or create meaurement
m := db.measurements[name]
if m == nil {
m = NewMeasurement(name)
db.measurements[name] = m
}
// Normalize tags and try to match against existing series.
if tags == nil {
tags = make(map[string]string)
}
for _, s := range m.series {
if reflect.DeepEqual(s.tags, tags) {
return m, s
}
}
// Create new series.
db.maxSeriesID++
s := &Series{id: db.maxSeriesID, tags: tags}
// Add series to DB and measurement.
db.series[s.id] = s
m.series[s.id] = s
return m, s
}
// MatchSeries returns the series ids that match a name and tagset.
func (db *DB) MatchSeries(name string, tags map[string]string) []uint32 {
// Find measurement.
m := db.measurements[name]
if m == nil {
return nil
}
// Compare tagsets against each series.
var ids []uint32
for _, s := range m.series {
// Check that each tag value matches the series' tag values.
matched := true
for k, v := range tags {
if s.tags[k] != v {
matched = false
break
}
}
// Append series if all tags match.
if matched {
ids = append(ids, s.id)
}
}
return ids
}
// SeriesTagValues returns a slice of tag values for a given series.
func (db *DB) SeriesTagValues(seriesID uint32, keys []string) (values []string) {
values = make([]string, len(keys))
// Find series.
s := db.series[seriesID]
if s == nil {
return
}
// Loop over keys and set values.
for i, key := range keys {
values[i] = s.tags[key]
}
return
}
// FieldID returns the field identifier for a given measurement name and field name.
func (db *DB) Field(name, field string) (fieldID uint8, typ influxql.DataType) {
// Find measurement.
m := db.measurements[name]
if m == nil {
return
}
// Find field.
f := m.fields[field]
if f == nil {
return
}
return f.id, f.typ
}
// CreateIterator returns a new iterator for a given field.
func (db *DB) CreateIterator(seriesID uint32, fieldID uint8, typ influxql.DataType, min, max time.Time, interval time.Duration) influxql.Iterator {
s := db.series[seriesID]
if s == nil {
panic(fmt.Sprintf("series not found: %d", seriesID))
}
// Create iterator.
i := &iterator{
points: s.points,
fieldID: fieldID,
typ: typ,
imin: -1,
interval: int64(interval),
}
if !min.IsZero() {
i.min = min.UnixNano()
}
if !max.IsZero() {
i.max = max.UnixNano()
}
return i
}
// iterator represents an implementation of Iterator over a set of points.
type iterator struct {
fieldID uint8
typ influxql.DataType
index int
points points
min, max int64 // time range
imin, imax int64 // interval time range
interval int64 // interval duration
}
// NextIterval moves the iterator to the next available interval.
// Returns true if another iterval is available.
func (i *iterator) NextIterval() bool {
// Initialize interval start time if not set.
// If there's no duration then there's only one interval.
// Otherwise increment it by the interval.
if i.imin == -1 {
i.imin = i.min
} else if i.interval == 0 {
return false
} else {
// Update interval start time if it's before iterator end time.
// Otherwise return false.
if imin := i.imin + i.interval; i.max == 0 || imin < i.max {
i.imin = imin
} else {
return false
}
}
// Interval end time should be the start time plus interval duration.
// If the end time is beyond the iterator end time then shorten it.
i.imax = i.imin + i.interval
if max := i.max; i.imax > max {
i.imax = max
}
return true
}
// Next returns the next point's timestamp and field value.
func (i *iterator) Next() (timestamp int64, value interface{}) {
for {
// If index is beyond points range then return nil.
if i.index > len(i.points)-1 {
return 0, nil
}
// Retrieve point and extract value.
p := i.points[i.index]
v := p.values[i.fieldID]
// Move cursor forward.
i.index++
// If timestamp is beyond bucket time range then move index back and exit.
timestamp := p.timestamp
if timestamp >= i.imax && i.imax != 0 {
i.index--
return 0, nil
}
// Return value if it is non-nil.
// Otherwise loop again and try the next point.
if v != nil {
return p.timestamp, v
}
}
}
// Time returns start time of the current interval.
func (i *iterator) Time() int64 { return i.imin }
// Interval returns the group by duration.
func (i *iterator) Interval() time.Duration { return time.Duration(i.interval) }
type Measurement struct {
name string
maxFieldID uint8
fields map[string]*Field
series map[uint32]*Series
}
// NewMeasurement returns a new instance of Measurement.
func NewMeasurement(name string) *Measurement {
return &Measurement{
name: name,
fields: make(map[string]*Field),
series: make(map[uint32]*Series),
}
}
// CreateFieldIfNotExists returns a field by name & data type.
// Creates it if it doesn't exist.
func (m *Measurement) CreateFieldIfNotExists(name string, typ influxql.DataType) *Field {
f := m.fields[name]
if f != nil && f.typ != typ {
panic("field data type mismatch: " + name)
} else if f == nil {
m.maxFieldID++
f = &Field{id: m.maxFieldID, name: name, typ: typ}
m.fields[name] = f
}
return f
}
type Series struct {
id uint32
tags map[string]string
points points
}
type Field struct {
id uint8
name string
typ influxql.DataType
}
type point struct {
timestamp int64
values map[uint8]interface{}
}
type points []*point
func (p points) Len() int { return len(p) }
func (p points) Less(i, j int) bool { return p[i].timestamp < p[j].timestamp }
func (p points) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
// mustParseTime parses an IS0-8601 string. Panic on error.
func mustParseTime(s string) time.Time {
t, err := time.Parse(time.RFC3339, s)
if err != nil {
panic(err.Error())
}
return t
}
// mustMarshalJSON encodes a value to JSON.
func mustMarshalJSON(v interface{}) []byte {
b, err := json.Marshal(v)
if err != nil {
panic("marshal json: " + err.Error())
}
return b
}
// jsonify encodes a value to JSON and returns as a string.
func jsonify(v interface{}) string { return string(mustMarshalJSON(v)) }
// ident indents a JSON string.
func indent(s string) string {
var buf bytes.Buffer
json.Indent(&buf, []byte(s), "", " ")
return buf.String()
}
// minify removes tabs and newlines.
func minify(s string) string { return strings.NewReplacer("\n", "", "\t", "").Replace(s) }