Bounds Refactor (#675)

* Refactor bounds in planner
pull/10616/head
Adam Perlin 2018-08-23 15:58:29 -07:00 committed by GitHub
parent 816a11c9a0
commit 302aaee1f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 780 additions and 931 deletions

19
query/bounds.go Normal file
View File

@ -0,0 +1,19 @@
package query
import "time"
type Bounds struct {
Start Time
Stop Time
}
// IsEmpty reports whether the given bounds
// are empty, i.e., if start >= stop.
func (b Bounds) IsEmpty(now time.Time) bool {
return b.Start.Time(now).Equal(b.Stop.Time(now)) || b.Start.Time(now).After(b.Stop.Time(now))
}
// HasZero returns true if the given bounds contain a Go zero time value as either Start or Stop.
func (b Bounds) HasZero() bool {
return b.Start.IsZero() || b.Stop.IsZero()
}

128
query/bounds_test.go Normal file
View File

@ -0,0 +1,128 @@
package query_test
import (
"testing"
"time"
"github.com/influxdata/platform/query"
)
var EmptyBounds = query.Bounds{
Start: query.Now,
Stop: query.Now,
}
func TestBounds_HasZero(t *testing.T) {
tests := []struct {
name string
now time.Time
bounds query.Bounds
want bool
}{
{
name: "single zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Time{},
},
want: true,
},
{
name: "both zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Time{},
Stop: query.Time{},
},
want: true,
},
{
name: "both non-zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.bounds.HasZero()
if got != tt.want {
t.Errorf("unexpected result for bounds.HasZero(): got %t, want %t", got, tt.want)
}
})
}
}
func TestBounds_IsEmpty(t *testing.T) {
tests := []struct {
name string
now time.Time
bounds query.Bounds
want bool
}{
{
name: "empty bounds / start == stop",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Now,
Stop: query.Now,
},
want: true,
},
{
name: "empty bounds / absolute now == relative now",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Now,
Stop: query.Time{
Absolute: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
},
},
want: true,
},
{
name: "start > stop",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: time.Hour,
},
Stop: query.Now,
},
want: true,
},
{
name: "start < stop",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.bounds.IsEmpty(tt.now)
if got != tt.want {
t.Errorf("unexpected result for bounds.IsEmpty(): got %t, want %t", got, tt.want)
}
})
}
}

View File

@ -36,16 +36,16 @@ func NewExecutor(deps Dependencies, logger *zap.Logger) Executor {
}
type streamContext struct {
bounds Bounds
bounds *Bounds
}
func newStreamContext(b Bounds) streamContext {
func newStreamContext(b *Bounds) streamContext {
return streamContext{
bounds: b,
}
}
func (ctx streamContext) Bounds() Bounds {
func (ctx streamContext) Bounds() *Bounds {
return ctx.bounds
}
@ -128,13 +128,19 @@ func (es *executionState) createNode(ctx context.Context, pr *plan.Procedure, no
return n, nil
}
// Add explicit stream context if bounds are set on this node
var streamContext streamContext
if pr.Bounds != nil {
streamContext.bounds = &Bounds{
Start: pr.Bounds.Start,
Stop: pr.Bounds.Stop,
}
}
// Build execution context
ec := executionContext{
es: es,
streamContext: newStreamContext(Bounds{
Start: resolveTime(pr.Bounds.Start, es.p.Now),
Stop: resolveTime(pr.Bounds.Stop, es.p.Now),
}),
es: es,
streamContext: streamContext,
}
if len(pr.Parents) > 0 {

View File

@ -6,6 +6,8 @@ import (
"testing"
"time"
"github.com/influxdata/platform/query/values"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
@ -60,9 +62,9 @@ func TestExecutor_Execute(t *testing.T) {
},
}},
},
Bounds: plan.BoundsSpec{
Start: query.Time{Absolute: time.Unix(0, 1)},
Stop: query.Time{Absolute: time.Unix(0, 5)},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Parents: nil,
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("sum")},
@ -75,6 +77,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: nil,
},
},
@ -126,9 +132,9 @@ func TestExecutor_Execute(t *testing.T) {
},
}},
},
Bounds: plan.BoundsSpec{
Start: query.Time{Absolute: time.Unix(0, 1)},
Stop: query.Time{Absolute: time.Unix(0, 5)},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Parents: nil,
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("sum")},
@ -141,6 +147,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("join")},
},
plan.ProcedureIDFromOperationID("count"): {
@ -151,6 +161,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("join")},
},
plan.ProcedureIDFromOperationID("join"): {
@ -166,6 +180,10 @@ func TestExecutor_Execute(t *testing.T) {
plan.ProcedureIDFromOperationID("sum"),
plan.ProcedureIDFromOperationID("count"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: nil,
},
},
@ -269,7 +287,11 @@ func TestExecutor_Execute(t *testing.T) {
},
},
},
Parents: nil,
Parents: nil,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("sum")},
},
plan.ProcedureIDFromOperationID("sum"): {
@ -280,6 +302,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("join")},
},
plan.ProcedureIDFromOperationID("count"): {
@ -290,6 +316,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("join")},
},
plan.ProcedureIDFromOperationID("join"): {
@ -305,6 +335,10 @@ func TestExecutor_Execute(t *testing.T) {
plan.ProcedureIDFromOperationID("sum"),
plan.ProcedureIDFromOperationID("count"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: nil,
},
},
@ -421,9 +455,9 @@ func TestExecutor_Execute(t *testing.T) {
},
}},
},
Bounds: plan.BoundsSpec{
Start: query.Time{Absolute: time.Unix(0, 1)},
Stop: query.Time{Absolute: time.Unix(0, 5)},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Parents: nil,
Children: []plan.ProcedureID{
@ -439,6 +473,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: nil,
},
plan.ProcedureIDFromOperationID("mean"): {
@ -449,6 +487,10 @@ func TestExecutor_Execute(t *testing.T) {
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("from"),
},
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(time.Unix(0, 1)),
Stop: values.ConvertTime(time.Unix(0, 5)),
},
Children: nil,
},
},

View File

@ -19,7 +19,7 @@ type Transformation interface {
// StreamContext represents necessary context for a single stream of
// query data.
type StreamContext interface {
Bounds() Bounds
Bounds() *Bounds
}
type Administration interface {

View File

@ -78,7 +78,7 @@ func (s *FirstProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
root = dup()
selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.BoundsSet = false
selectSpec.Bounds = plan.BoundsSpec{}
selectSpec.Bounds = query.Bounds{}
selectSpec.LimitSet = false
selectSpec.PointsLimit = 0
selectSpec.SeriesLimit = 0
@ -88,7 +88,7 @@ func (s *FirstProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
return
}
selectSpec.BoundsSet = true
selectSpec.Bounds = plan.BoundsSpec{
selectSpec.Bounds = query.Bounds{
Start: query.MinTime,
Stop: query.Now,
}

View File

@ -95,7 +95,7 @@ func TestFirst_PushDown(t *testing.T) {
want := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},
@ -113,7 +113,7 @@ func TestFirst_PushDown_Duplicate(t *testing.T) {
root := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},

View File

@ -97,7 +97,7 @@ type FromProcedureSpec struct {
Hosts []string
BoundsSet bool
Bounds plan.BoundsSpec
Bounds query.Bounds
FilterSet bool
Filter *semantic.FunctionExpression
@ -139,7 +139,7 @@ func newFromProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Proc
func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
return FromKind
}
func (s *FromProcedureSpec) TimeBounds() plan.BoundsSpec {
func (s *FromProcedureSpec) TimeBounds() query.Bounds {
return s.Bounds
}
func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
@ -184,26 +184,27 @@ func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
spec := prSpec.(*FromProcedureSpec)
var w execute.Window
bounds := a.StreamContext().Bounds()
if bounds == nil {
return nil, errors.New("nil bounds passed to from")
}
if spec.WindowSet {
w = execute.Window{
Every: execute.Duration(spec.Window.Every),
Period: execute.Duration(spec.Window.Period),
Round: execute.Duration(spec.Window.Round),
Start: a.ResolveTime(spec.Window.Start),
Start: bounds.Start,
}
} else {
duration := execute.Duration(a.ResolveTime(spec.Bounds.Stop)) - execute.Duration(a.ResolveTime(spec.Bounds.Start))
duration := execute.Duration(bounds.Stop) - execute.Duration(bounds.Start)
w = execute.Window{
Every: duration,
Period: duration,
Start: a.ResolveTime(spec.Bounds.Start),
Start: bounds.Start,
}
}
currentTime := w.Start + execute.Time(w.Period)
bounds := execute.Bounds{
Start: a.ResolveTime(spec.Bounds.Start),
Stop: a.ResolveTime(spec.Bounds.Stop),
}
deps := a.Dependencies()[FromKind].(storage.Dependencies)
orgID := a.OrganizationID()
@ -241,7 +242,7 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu
GroupKeys: spec.GroupKeys,
AggregateMethod: spec.AggregateMethod,
},
bounds,
*bounds,
w,
currentTime,
), nil

View File

@ -78,7 +78,7 @@ func (s *LastProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Proc
root = dup()
selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.BoundsSet = false
selectSpec.Bounds = plan.BoundsSpec{}
selectSpec.Bounds = query.Bounds{}
selectSpec.LimitSet = false
selectSpec.PointsLimit = 0
selectSpec.SeriesLimit = 0
@ -88,7 +88,7 @@ func (s *LastProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Proc
return
}
selectSpec.BoundsSet = true
selectSpec.Bounds = plan.BoundsSpec{
selectSpec.Bounds = query.Bounds{
Start: query.MinTime,
Stop: query.Now,
}

View File

@ -98,7 +98,7 @@ func TestLast_PushDown(t *testing.T) {
want := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},
@ -116,7 +116,7 @@ func TestLast_PushDown_Duplicate(t *testing.T) {
root := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
"github.com/pkg/errors"
)
const RangeKind = "range"
@ -93,7 +94,7 @@ func (s *RangeOpSpec) Kind() query.OperationKind {
}
type RangeProcedureSpec struct {
Bounds plan.BoundsSpec
Bounds query.Bounds
TimeCol string
StartCol string
StopCol string
@ -111,7 +112,7 @@ func newRangeProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Pro
}
return &RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: spec.Start,
Stop: spec.Stop,
},
@ -139,6 +140,7 @@ func (s *RangeProcedureSpec) PushDownRules() []plan.PushDownRule {
},
}}
}
func (s *RangeProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Procedure) {
selectSpec := root.Spec.(*FromProcedureSpec)
if selectSpec.BoundsSet {
@ -149,14 +151,14 @@ func (s *RangeProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
root = dup()
selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.BoundsSet = false
selectSpec.Bounds = plan.BoundsSpec{}
selectSpec.Bounds = query.Bounds{}
return
}
selectSpec.BoundsSet = true
selectSpec.Bounds = s.Bounds
}
func (s *RangeProcedureSpec) TimeBounds() plan.BoundsSpec {
func (s *RangeProcedureSpec) TimeBounds() query.Bounds {
return s.Bounds
}
@ -168,21 +170,12 @@ func createRangeTransformation(id execute.DatasetID, mode execute.AccumulationMo
cache := execute.NewTableBuilderCache(a.Allocator())
d := execute.NewDataset(id, mode, cache)
// Resolve range transformation bounds against current execution now value if they're relative
start := a.ResolveTime(s.Bounds.Start)
stop := a.ResolveTime(s.Bounds.Stop)
// Range behavior is invalid if start > stop
if start > stop {
return nil, nil, fmt.Errorf("range error: start bound greater than stop")
bounds := a.StreamContext().Bounds()
if bounds == nil {
return nil, nil, errors.New("nil bounds supplied to range")
}
absoluteBounds := execute.Bounds{
Start: start,
Stop: stop,
}
t, err := NewRangeTransformation(d, cache, s, absoluteBounds)
t, err := NewRangeTransformation(d, cache, s, *bounds)
if err != nil {
return nil, nil, err
}

View File

@ -129,7 +129,7 @@ func TestRangeOperation_Marshaling(t *testing.T) {
func TestRange_PushDown(t *testing.T) {
spec := &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Stop: query.Now,
},
}
@ -139,7 +139,7 @@ func TestRange_PushDown(t *testing.T) {
want := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Stop: query.Now,
},
},
@ -161,7 +161,7 @@ func TestRange_Process(t *testing.T) {
{
name: "from csv",
spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -5 * time.Minute,
@ -208,7 +208,7 @@ func TestRange_Process(t *testing.T) {
{
name: "invalid column",
spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -5 * time.Minute,
@ -250,7 +250,7 @@ func TestRange_Process(t *testing.T) {
{
name: "specified column",
spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Minute,
@ -293,7 +293,7 @@ func TestRange_Process(t *testing.T) {
{
name: "group key no overlap",
spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Minute,
@ -338,7 +338,7 @@ func TestRange_Process(t *testing.T) {
{
name: "group key overlap",
spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
Absolute: time.Unix(12*time.Minute.Nanoseconds(), 0),
},
@ -400,6 +400,91 @@ func TestRange_Process(t *testing.T) {
)
},
},
{
name: "empty bounds start == stop",
spec: &functions.RangeProcedureSpec{
Bounds: query.Bounds{
Start: query.Time{
Absolute: time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
TimeCol: "_time",
StartCol: "_start",
StopCol: "_stop",
},
data: []query.Table{&executetest.Table{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(time.Minute.Nanoseconds()), 10.0},
{execute.Time(3 * time.Minute.Nanoseconds()), 9.0},
{execute.Time(7 * time.Minute.Nanoseconds()), 1.0},
{execute.Time(2 * time.Minute.Nanoseconds()), 5.0},
{execute.Time(4 * time.Minute.Nanoseconds()), 4.0},
{execute.Time(6 * time.Minute.Nanoseconds()), 8.0},
{execute.Time(5 * time.Minute.Nanoseconds()), 6.0},
},
}},
want: []*executetest.Table{{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
},
Data: [][]interface{}(nil),
}},
now: values.Time(7 * time.Minute.Nanoseconds()),
},
{
name: "empty bounds start > stop",
spec: &functions.RangeProcedureSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Minute,
},
Stop: query.Time{
IsRelative: true,
Relative: -5 * time.Minute,
},
},
TimeCol: "_time",
StartCol: "_start",
StopCol: "_stop",
},
data: []query.Table{&executetest.Table{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(time.Minute.Nanoseconds()), 10.0},
{execute.Time(3 * time.Minute.Nanoseconds()), 9.0},
{execute.Time(7 * time.Minute.Nanoseconds()), 1.0},
{execute.Time(2 * time.Minute.Nanoseconds()), 5.0},
{execute.Time(4 * time.Minute.Nanoseconds()), 4.0},
{execute.Time(6 * time.Minute.Nanoseconds()), 8.0},
{execute.Time(5 * time.Minute.Nanoseconds()), 6.0},
},
}},
want: []*executetest.Table{{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
},
Data: [][]interface{}(nil),
}},
now: values.Time(7 * time.Minute.Nanoseconds()),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -450,14 +535,14 @@ func TestRange_Process(t *testing.T) {
}
func TestRange_PushDown_Duplicate(t *testing.T) {
spec := &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Stop: query.Now,
},
}
root := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},
@ -473,7 +558,7 @@ func TestRange_PushDown_Duplicate(t *testing.T) {
func TestRange_PushDown_Match(t *testing.T) {
spec := &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Stop: query.Now,
},
TimeCol: "_time",

View File

@ -1,8 +1,8 @@
from(db:"testdb")
|> range(start:2018-05-22T19:53:26Z)
|> group(by: ["_measurement"])
|> window(every: 1s, ignoreGlobalBounds: true)
|> window(every: 1s)
|> mean(timeSrc: "_start")
|> window(every: inf, ignoreGlobalBounds: true)
|> window(every: inf)
|> map(fn: (r) => {_time: r._time, mean: r._value})
|> yield(name:"0")

View File

@ -1,8 +1,8 @@
from(db:"testdb")
|> range(start:2018-05-22T19:53:26Z)
|> group(by: ["_measurement"])
|> window(every: 1s, start: 2, ignoreGlobalBounds: true)
|> window(every: 1s, start: 2)
|> mean(timeSrc: "_start")
|> window(every: inf, ignoreGlobalBounds: true)
|> window(every: inf)
|> map(fn: (r) => {_time: r._time, mean: r._value})
|> yield(name:"0")

View File

@ -15,15 +15,14 @@ import (
const WindowKind = "window"
type WindowOpSpec struct {
Every query.Duration `json:"every"`
Period query.Duration `json:"period"`
Start query.Time `json:"start"`
Round query.Duration `json:"round"`
Triggering query.TriggerSpec `json:"triggering"`
IgnoreGlobalBounds bool `json:"ignoreGlobalBounds"`
TimeCol string `json:"timeCol"`
StopColLabel string `json:"stopColLabel"`
StartColLabel string `json:"startColLabel"`
Every query.Duration `json:"every"`
Period query.Duration `json:"period"`
Start query.Time `json:"start"`
Round query.Duration `json:"round"`
Triggering query.TriggerSpec `json:"triggering"`
TimeCol string `json:"time_col"`
StopColLabel string `json:"stop_col_label"`
StartColLabel string `json:"start_col_label"`
}
var infinityVar = values.NewDurationValue(math.MaxInt64)
@ -35,7 +34,6 @@ func init() {
windowSignature.Params["period"] = semantic.Duration
windowSignature.Params["round"] = semantic.Duration
windowSignature.Params["start"] = semantic.Time
windowSignature.Params["ignoreGlobalBounds"] = semantic.Bool
windowSignature.Params["timeCol"] = semantic.String
windowSignature.Params["startColLabel"] = semantic.String
windowSignature.Params["stopColLabel"] = semantic.String
@ -82,12 +80,6 @@ func createWindowOpSpec(args query.Arguments, a *query.Administration) (query.Op
return nil, errors.New(`window function requires at least one of "every" or "period" to be set`)
}
if v, ok, err := args.GetBool("ignoreGlobalBounds"); err != nil {
return nil, err
} else if ok {
spec.IgnoreGlobalBounds = v
}
if label, ok, err := args.GetString("timeCol"); err != nil {
return nil, err
} else if ok {
@ -129,9 +121,8 @@ func (s *WindowOpSpec) Kind() query.OperationKind {
}
type WindowProcedureSpec struct {
Window plan.WindowSpec
IgnoreGlobalBounds bool
Triggering query.TriggerSpec
Window plan.WindowSpec
Triggering query.TriggerSpec
TimeCol,
StartColLabel,
StopColLabel string
@ -149,11 +140,10 @@ func newWindowProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Pr
Round: s.Round,
Start: s.Start,
},
IgnoreGlobalBounds: s.IgnoreGlobalBounds,
Triggering: s.Triggering,
TimeCol: s.TimeCol,
StartColLabel: s.StartColLabel,
StopColLabel: s.StopColLabel,
Triggering: s.Triggering,
TimeCol: s.TimeCol,
StartColLabel: s.StartColLabel,
StopColLabel: s.StopColLabel,
}
if p.Triggering == nil {
p.Triggering = query.DefaultTrigger
@ -188,17 +178,22 @@ func createWindowTransformation(id execute.DatasetID, mode execute.AccumulationM
} else {
start = a.ResolveTime(s.Window.Start)
}
bounds := a.StreamContext().Bounds()
if bounds == nil {
return nil, nil, errors.New("nil bounds passed to window")
}
t := NewFixedWindowTransformation(
d,
cache,
a.StreamContext().Bounds(),
*bounds,
execute.Window{
Every: execute.Duration(s.Window.Every),
Period: execute.Duration(s.Window.Period),
Round: execute.Duration(s.Window.Round),
Start: start,
},
s.IgnoreGlobalBounds,
s.TimeCol,
s.StartColLabel,
s.StopColLabel,
@ -212,8 +207,7 @@ type fixedWindowTransformation struct {
w execute.Window
bounds execute.Bounds
offset execute.Duration
ignoreGlobalBounds bool
offset execute.Duration
timeCol,
startColLabel,
@ -225,22 +219,20 @@ func NewFixedWindowTransformation(
cache execute.TableBuilderCache,
bounds execute.Bounds,
w execute.Window,
ignoreGlobalBounds bool,
timeCol,
startColLabel,
stopColLabel string,
) execute.Transformation {
offset := execute.Duration(w.Start - w.Start.Truncate(w.Every))
return &fixedWindowTransformation{
d: d,
cache: cache,
w: w,
bounds: bounds,
offset: offset,
ignoreGlobalBounds: ignoreGlobalBounds,
timeCol: timeCol,
startColLabel: startColLabel,
stopColLabel: stopColLabel,
d: d,
cache: cache,
w: w,
bounds: bounds,
offset: offset,
timeCol: timeCol,
startColLabel: startColLabel,
stopColLabel: stopColLabel,
}
}
@ -373,21 +365,17 @@ func (t *fixedWindowTransformation) getWindowBounds(now execute.Time) []execute.
Stop: stop,
}
// Check global bounds
if !t.ignoreGlobalBounds {
if bnds.Stop > t.bounds.Stop {
bnds.Stop = t.bounds.Stop
}
// Check against procedure bounds
if bnds.Stop > t.bounds.Stop {
bnds.Stop = t.bounds.Stop
}
if bnds.Start < t.bounds.Start {
bnds.Start = t.bounds.Start
}
if bnds.Start < t.bounds.Start {
bnds.Start = t.bounds.Start
}
// Check bounds again since we just clamped them.
if bnds.Contains(now) {
bounds = append(bounds, bnds)
}
} else {
// Check bounds again since we just clamped them.
if bnds.Contains(now) {
bounds = append(bounds, bnds)
}

View File

@ -86,7 +86,6 @@ func TestFixedWindow_PassThrough(t *testing.T) {
Every: execute.Duration(time.Minute),
Period: execute.Duration(time.Minute),
},
false,
execute.DefaultTimeColLabel,
execute.DefaultStartColLabel,
execute.DefaultStopColLabel,
@ -95,6 +94,11 @@ func TestFixedWindow_PassThrough(t *testing.T) {
})
}
var EmptyBounds = &execute.Bounds{
Start: execute.Time(0),
Stop: execute.Time(0),
}
func TestFixedWindow_Process(t *testing.T) {
testCases := []struct {
name string
@ -102,6 +106,7 @@ func TestFixedWindow_Process(t *testing.T) {
start execute.Time
every, period execute.Duration
num int
bounds *execute.Bounds
want func(start execute.Time) []*executetest.Table
}{
{
@ -555,13 +560,46 @@ func TestFixedWindow_Process(t *testing.T) {
}
},
},
{
name: "empty bounds start == stop",
valueCol: query.ColMeta{Label: "_value", Type: query.TInt},
start: execute.Time(time.Date(2017, 10, 10, 10, 0, 0, 0, time.UTC).UnixNano()),
every: execute.Duration(time.Minute),
period: execute.Duration(time.Minute),
num: 15,
bounds: EmptyBounds,
want: func(start execute.Time) []*executetest.Table {
return nil
},
},
{
name: "empty bounds start > stop",
valueCol: query.ColMeta{Label: "_value", Type: query.TInt},
start: execute.Time(time.Date(2017, 10, 10, 10, 0, 0, 0, time.UTC).UnixNano()),
every: execute.Duration(time.Minute),
period: execute.Duration(time.Minute),
num: 15,
bounds: &execute.Bounds{
Start: execute.Time(time.Date(2017, 10, 10, 12, 0, 0, 0, time.UTC).UnixNano()),
Stop: execute.Time(time.Date(2017, 10, 10, 10, 0, 0, 0, time.UTC).UnixNano()),
},
want: func(start execute.Time) []*executetest.Table {
return nil
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
start := tc.start
stop := start + execute.Time(time.Hour)
var start, stop execute.Time
if tc.bounds != nil {
start = tc.bounds.Start
stop = tc.bounds.Stop
} else {
start = tc.start
stop = start + execute.Time(time.Hour)
}
d := executetest.NewDataset(executetest.RandomDatasetID())
c := execute.NewTableBuilderCache(executetest.UnlimitedAllocator)
@ -579,7 +617,6 @@ func TestFixedWindow_Process(t *testing.T) {
Period: tc.period,
Start: start,
},
false,
execute.DefaultTimeColLabel,
execute.DefaultStartColLabel,
execute.DefaultStopColLabel,

View File

@ -77,10 +77,10 @@ At this point, generate the `filter` call to evaluate the condition. If there is
We group together the streams based on the `GROUP BY` clause. As an example:
> SELECT mean(usage_user) FROM telegraf..cpu WHERE time >= now() - 5m GROUP BY time(5m), host
... |> group(by: ["_measurement", "_start", "host"]) |> window(every: 5m, ignoreGlobalBounds: true)
... |> group(by: ["_measurement", "_start", "host"]) |> window(every: 5m)
If the `GROUP BY time(...)` doesn't exist, `window()` is skipped. Grouping will have a default of [`_measurement`, `_start`], regardless of whether a GROUP BY clause is present.
If there are keys in the group by clause, they are concatenated with the default list. If a wildcard is used for grouping, then this step is skipped. We also add `ignoreGlobalBounds` to every invocation of `window()` so the boundaries aren't clamped by the `range()` call.
If there are keys in the group by clause, they are concatenated with the default list. If a wildcard is used for grouping, then this step is skipped.
### <a name="evaluate-function"></a> Evaluate the function
@ -105,7 +105,7 @@ If the aggregate is combined with conditions, the column name of `_value` is rep
If there a window operation was added, we then combine each of the function results from the windows back into a single table.
... |> window(every: inf, ignoreGlobalBounds: true)
... |> window(every: inf)
This step is skipped if there was no window function.

View File

@ -231,12 +231,11 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
if interval > 0 {
cur = &groupCursor{
id: t.op("window", &functions.WindowOpSpec{
Every: query.Duration(math.MaxInt64),
Period: query.Duration(math.MaxInt64),
IgnoreGlobalBounds: true,
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
Every: query.Duration(math.MaxInt64),
Period: query.Duration(math.MaxInt64),
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
}, cur.ID()),
cursor: cur,
}
@ -357,12 +356,11 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
if windowEvery > 0 {
windowOp := &functions.WindowOpSpec{
Every: query.Duration(windowEvery),
Period: query.Duration(windowEvery),
IgnoreGlobalBounds: true,
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
Every: query.Duration(windowEvery),
Period: query.Duration(windowEvery),
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
}
if !windowStart.IsZero() {

View File

@ -80,24 +80,22 @@ func init() {
{
ID: "window0",
Spec: &functions.WindowOpSpec{
Every: query.Duration(time.Minute),
Period: query.Duration(time.Minute),
IgnoreGlobalBounds: true,
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
Every: query.Duration(time.Minute),
Period: query.Duration(time.Minute),
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
},
},
&aggregate,
{
ID: "window1",
Spec: &functions.WindowOpSpec{
Every: query.Duration(math.MaxInt64),
Period: query.Duration(math.MaxInt64),
IgnoreGlobalBounds: true,
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
Every: query.Duration(math.MaxInt64),
Period: query.Duration(math.MaxInt64),
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
},
},
{

View File

@ -80,25 +80,23 @@ func init() {
{
ID: "window0",
Spec: &functions.WindowOpSpec{
Every: query.Duration(5 * time.Minute),
Period: query.Duration(5 * time.Minute),
Start: query.Time{Absolute: time.Unix(0, 0).Add(time.Minute * 2)},
IgnoreGlobalBounds: true,
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
Every: query.Duration(5 * time.Minute),
Period: query.Duration(5 * time.Minute),
Start: query.Time{Absolute: time.Unix(0, 0).Add(time.Minute * 2)},
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
},
},
&aggregate,
{
ID: "window1",
Spec: &functions.WindowOpSpec{
Every: query.Duration(math.MaxInt64),
Period: query.Duration(math.MaxInt64),
IgnoreGlobalBounds: true,
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
Every: query.Duration(math.MaxInt64),
Period: query.Duration(math.MaxInt64),
TimeCol: execute.DefaultTimeColLabel,
StartColLabel: execute.DefaultStartColLabel,
StopColLabel: execute.DefaultStopColLabel,
},
},
{

View File

@ -6,370 +6,98 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/values"
)
func TestBoundsIntersect(t *testing.T) {
now := time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC)
tests := []struct {
name string
now time.Time
a, b plan.BoundsSpec
want plan.BoundsSpec
a, b *plan.BoundsSpec
want *plan.BoundsSpec
}{
{
name: "contained",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
a: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
b: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
want: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
},
{
name: "contained sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
a: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
b: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
want: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
},
{
name: "no overlap",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
a: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -3 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
},
want: plan.EmptyBoundsSpec,
},
{
name: "no overlap sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -3 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
b: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-3 * time.Hour)),
Stop: values.ConvertTime(now.Add(-2 * time.Hour)),
},
want: plan.EmptyBoundsSpec,
},
{
name: "overlap",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
a: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
b: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-2 * time.Hour)),
Stop: values.ConvertTime(now.Add(-30 * time.Minute)),
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
want: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now.Add(-30 * time.Minute)),
},
},
{
name: "overlap sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
},
},
{
name: "both start zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Stop: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
},
b: plan.BoundsSpec{
Stop: query.Time{
IsRelative: true,
Relative: -20 * time.Minute,
},
},
want: plan.BoundsSpec{},
},
{
name: "both start zero sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Stop: query.Time{
IsRelative: true,
Relative: -20 * time.Minute,
},
},
b: plan.BoundsSpec{
Stop: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
},
want: plan.BoundsSpec{},
},
{
name: "absolute times",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 3, 0, 0, time.UTC),
},
a: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 3, 0, 0, time.UTC)),
},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 5, 0, 0, time.UTC),
},
b: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 5, 0, 0, time.UTC)),
},
want: plan.EmptyBoundsSpec,
},
{
name: "absolute times sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 5, 0, 0, time.UTC),
},
},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 3, 0, 0, time.UTC),
},
},
want: plan.EmptyBoundsSpec,
},
{
name: "relative bounds future",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: 5 * time.Hour,
},
},
b: plan.BoundsSpec{
Start: query.Now,
Stop: query.Time{
IsRelative: true,
Relative: 3 * time.Hour,
},
},
want: plan.BoundsSpec{
Start: query.Now,
Stop: query.Time{
IsRelative: true,
Relative: 3 * time.Hour,
},
},
},
{
name: "relative bounds 2",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -3 * time.Hour,
},
Stop: query.Now,
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: 2 * time.Hour,
},
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Now,
},
},
{
name: "start stop zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 3, 0, 0, time.UTC),
},
},
want: plan.BoundsSpec{},
},
{
name: "one stops zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -3 * time.Hour,
},
Stop: query.Now,
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
},
want: plan.BoundsSpec{},
},
{
name: "relative/absolute",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Now,
},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 10, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.August, 15, 11, 0, 0, 0, time.UTC),
},
},
want: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Now,
},
},
{
name: "intersect with empty returns empty",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Now,
a: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC)),
Stop: values.ConvertTime(now),
},
b: plan.EmptyBoundsSpec,
want: plan.EmptyBoundsSpec,
@ -378,18 +106,16 @@ func TestBoundsIntersect(t *testing.T) {
name: "intersect with empty returns empty sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.EmptyBoundsSpec,
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Now,
b: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC)),
Stop: values.ConvertTime(now),
},
want: plan.EmptyBoundsSpec,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.a.Intersect(tt.b, tt.now)
got := tt.a.Intersect(tt.b)
if !cmp.Equal(got, tt.want) {
t.Errorf("unexpected bounds -want/+got:\n%s", cmp.Diff(tt.want, got, plantest.CmpOptions...))
}
@ -398,142 +124,33 @@ func TestBoundsIntersect(t *testing.T) {
}
func TestBounds_Union(t *testing.T) {
now := time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC)
tests := []struct {
name string
now time.Time
a, b plan.BoundsSpec
want plan.BoundsSpec
a, b *plan.BoundsSpec
want *plan.BoundsSpec
}{
{
name: "basic case",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 3, 0, 0, time.UTC),
},
a: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 3, 0, 0, time.UTC)),
},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 2, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC),
},
b: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 2, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC)),
},
want: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC),
},
},
},
{
name: "basic case relative",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
},
{
name: "bounds in future",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Now,
},
b: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: 2 * time.Hour,
},
},
want: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Time{
IsRelative: true,
Relative: 2 * time.Hour,
},
},
},
{
name: "one zero, one not",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -2 * time.Hour,
},
Stop: query.Now,
},
b: plan.BoundsSpec{},
want: plan.BoundsSpec{},
},
{
name: "relative/absolute",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 10, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 20, 0, 0, time.UTC),
},
},
want: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 10, 0, 0, time.UTC),
},
Stop: query.Now,
want: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 1, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 4, 0, 0, time.UTC)),
},
},
{
name: "union with empty returns empty",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Now,
a: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC)),
Stop: values.ConvertTime(now),
},
b: plan.EmptyBoundsSpec,
want: plan.EmptyBoundsSpec,
@ -542,47 +159,33 @@ func TestBounds_Union(t *testing.T) {
name: "union with empty returns empty sym",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.EmptyBoundsSpec,
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Now,
b: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC)),
Stop: values.ConvertTime(now),
},
want: plan.EmptyBoundsSpec,
},
{
name: "no overlap",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
a: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 20, 0, 0, time.UTC),
},
a: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 20, 0, 0, time.UTC)),
},
b: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 45, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 50, 0, 0, time.UTC),
},
b: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 45, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 50, 0, 0, time.UTC)),
},
want: plan.BoundsSpec{
Start: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC),
},
Stop: query.Time{
Absolute: time.Date(2018, time.January, 1, 0, 50, 0, 0, time.UTC),
},
want: &plan.BoundsSpec{
Start: values.ConvertTime(time.Date(2018, time.January, 1, 0, 15, 0, 0, time.UTC)),
Stop: values.ConvertTime(time.Date(2018, time.January, 1, 0, 50, 0, 0, time.UTC)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.a.Union(tt.b, tt.now)
got := tt.a.Union(tt.b)
if !cmp.Equal(got, tt.want) {
t.Errorf("unexpected bounds -want/+got:\n%s", cmp.Diff(tt.want, got, plantest.CmpOptions...))
}
@ -590,106 +193,47 @@ func TestBounds_Union(t *testing.T) {
}
}
func TestBounds_HasZero(t *testing.T) {
tests := []struct {
name string
now time.Time
bounds plan.BoundsSpec
want bool
}{
{
name: "single zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Time{},
},
want: true,
},
{
name: "both zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Time{},
Stop: query.Time{},
},
want: true,
},
{
name: "both non-zero",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.bounds.HasZero()
if got != tt.want {
t.Errorf("unexpected result for bounds.HasZero(): got %t, want %t", got, tt.want)
}
})
}
}
func TestBounds_IsEmpty(t *testing.T) {
now := time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC)
tests := []struct {
name string
now time.Time
bounds plan.BoundsSpec
bounds *plan.BoundsSpec
want bool
}{
{
name: "empty bounds / start == stop",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Now,
Stop: query.Now,
bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now),
Stop: values.ConvertTime(now),
},
want: true,
},
{
name: "empty bounds / absolute now == relative now",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Now,
Stop: query.Time{
Absolute: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
},
bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now),
Stop: values.ConvertTime(time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC)),
},
want: true,
},
{
name: "start > stop",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: time.Hour,
},
Stop: query.Now,
bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(time.Hour)),
Stop: values.ConvertTime(now),
},
want: true,
},
{
name: "start < stop",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
want: false,
},
@ -697,10 +241,37 @@ func TestBounds_IsEmpty(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.bounds.IsEmpty(tt.now)
got := tt.bounds.IsEmpty()
if got != tt.want {
t.Errorf("unexpected result for bounds.IsEmpty(): got %t, want %t", got, tt.want)
}
})
}
}
/*
func TestPlanner_ResolveBounds(t *testing.T) {
tests := []struct {
name string
now time.Time
bounds query.Bounds
want plan.BoundsSpec
}{
{
name: "relative bounds",
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
bounds: query.Bounds{
Start: query.Time{
Relative: true,
Start: -1 * time.Hour,
},
Stop: values.ConvertTime(now),
},
want: &plan.BoundsSpec{
Start: values.Time(time.Unix())
}
}
}
}
*/

View File

@ -56,7 +56,7 @@ func TestLogicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("1"): {
ID: plan.ProcedureIDFromOperationID("1"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{Relative: -1 * time.Hour},
},
TimeCol: "_time",
@ -97,7 +97,7 @@ func TestLogicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range0"): {
ID: plan.ProcedureIDFromOperationID("range0"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{Relative: -1 * time.Hour},
},
TimeCol: "_time",
@ -126,7 +126,7 @@ func TestLogicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range1"): {
ID: plan.ProcedureIDFromOperationID("range1"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{Relative: -1 * time.Hour},
},
TimeCol: "_time",

View File

@ -5,6 +5,8 @@ import (
"math"
"time"
"github.com/influxdata/platform/query/values"
"github.com/influxdata/platform/query"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
@ -13,6 +15,11 @@ import (
// DefaultYieldName is the yield name to use in cases where no explicit yield name was specified.
const DefaultYieldName = "_result"
var (
MinTime = values.ConvertTime(query.MinTime.Absolute)
MaxTime = values.ConvertTime(query.MaxTime.Absolute)
)
type PlanSpec struct {
// Now represents the relative current time of the plan.
Now time.Time
@ -60,6 +67,22 @@ func NewPlanner() Planner {
return new(planner)
}
func resolveTime(qt query.Time, now time.Time) values.Time {
return values.ConvertTime(qt.Time(now))
}
func ToBoundsSpec(bounds query.Bounds, now time.Time) (*BoundsSpec, error) {
if bounds.HasZero() {
return nil, errors.New("bounds contain zero time")
}
return &BoundsSpec{
Start: resolveTime(bounds.Start, now),
Stop: resolveTime(bounds.Stop, now),
}, nil
}
// TODO: remove branches with empty bounds
func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
now := lp.Now
@ -136,10 +159,14 @@ func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
// The bounds of the current procedure are always the union
// of the bounds of any parent procedure
pr.DoParents(func(parent *Procedure) {
if pr.Bounds.HasZero() {
pr.Bounds = parent.Bounds
if parent.Bounds == nil {
return
}
if pr.Bounds != nil {
pr.Bounds = pr.Bounds.Union(parent.Bounds)
} else {
pr.Bounds = pr.Bounds.Union(parent.Bounds, now)
pr.Bounds = parent.Bounds
}
})
@ -147,15 +174,15 @@ func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
// the procedure's new bounds are the intersection of any bounds it inherited
// from its parents, and its own bounds.
if bounded, ok := pr.Spec.(BoundedProcedureSpec); ok {
if pr.Bounds.HasZero() {
pr.Bounds = bounded.TimeBounds()
convertedBounds, err := ToBoundsSpec(bounded.TimeBounds(), pr.plan.Now)
if err != nil {
return nil, errors.Wrapf(err, "invalid time bounds from procedure %s", pr.Spec.Kind())
}
if pr.Bounds != nil {
pr.Bounds = pr.Bounds.Intersect(convertedBounds)
} else {
newBounds := pr.Bounds.Intersect(bounded.TimeBounds(), now)
if newBounds != EmptyBoundsSpec {
pr.Bounds = newBounds
} else {
pr.Bounds = bounded.TimeBounds()
}
pr.Bounds = convertedBounds
}
}
@ -191,9 +218,13 @@ func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
}
}
// Check to see if any results are unbounded.
// Since bounds are inherited,
// results will be unbounded only if no bounds were provided
// by any parent procedure node.
for name, yield := range p.plan.Results {
if pr, ok := p.plan.Procedures[yield.ID]; ok {
if pr.Bounds.HasZero() {
if pr.Bounds == nil {
return nil, fmt.Errorf(`result '%s' is unbounded. Add a 'range' call to bound the query.`, name)
}
}

View File

@ -5,6 +5,8 @@ import (
"testing"
"time"
"github.com/influxdata/platform/query/values"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
@ -13,6 +15,7 @@ import (
)
func TestPhysicalPlanner_Plan(t *testing.T) {
now := time.Date(2017, 8, 8, 0, 0, 0, 0, time.UTC)
testCases := []struct {
name string
lp *plan.LogicalPlanSpec
@ -21,6 +24,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "single push down",
lp: &plan.LogicalPlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 1,
MemoryBytesQuota: 10000,
@ -37,7 +41,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -78,7 +82,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -88,12 +92,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
AggregateSet: true,
AggregateMethod: "count",
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{},
@ -110,6 +111,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "single push down with match",
lp: &plan.LogicalPlanSpec{
Now: now,
Procedures: map[plan.ProcedureID]*plan.Procedure{
plan.ProcedureIDFromOperationID("from"): {
ID: plan.ProcedureIDFromOperationID("from"),
@ -145,7 +147,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},
@ -154,9 +156,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
DescendingSet: true,
Descending: true,
},
Bounds: plan.BoundsSpec{
Start: query.MinTime,
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: plan.MinTime,
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{},
@ -173,6 +175,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "multiple push down",
lp: &plan.LogicalPlanSpec{
Now: now,
Procedures: map[plan.ProcedureID]*plan.Procedure{
plan.ProcedureIDFromOperationID("from"): {
ID: plan.ProcedureIDFromOperationID("from"),
@ -185,7 +188,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -237,7 +240,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -247,12 +250,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
LimitSet: true,
PointsLimit: 10,
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("mean")},
@ -263,12 +263,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Parents: []plan.ProcedureID{
(plan.ProcedureIDFromOperationID("from")),
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Children: nil,
},
@ -285,6 +282,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "multiple yield",
lp: &plan.LogicalPlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 1,
MemoryBytesQuota: 10000,
@ -301,7 +299,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -362,7 +360,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -370,12 +368,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Stop: query.Now,
},
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{
@ -386,12 +381,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("stddev"): {
ID: plan.ProcedureIDFromOperationID("stddev"),
Spec: &functions.StddevProcedureSpec{},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
Children: []plan.ProcedureID{},
@ -399,12 +391,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("skew"): {
ID: plan.ProcedureIDFromOperationID("skew"),
Spec: &functions.SkewProcedureSpec{},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
Children: []plan.ProcedureID{},
@ -424,6 +413,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "group with aggregate",
lp: &plan.LogicalPlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 1,
MemoryBytesQuota: 10000,
@ -440,7 +430,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -488,7 +478,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -501,12 +491,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
AggregateSet: true,
AggregateMethod: "sum",
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{
@ -516,12 +503,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromParentID(plan.ProcedureIDFromOperationID("from")): {
ID: plan.ProcedureIDFromParentID(plan.ProcedureIDFromOperationID("from")),
Spec: &functions.SumProcedureSpec{},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
},
@ -538,6 +522,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "group with distinct on tag",
lp: &plan.LogicalPlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 1,
MemoryBytesQuota: 10000,
@ -554,7 +539,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -604,7 +589,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -617,12 +602,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
LimitSet: true,
PointsLimit: -1,
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{
@ -632,12 +614,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("distinct"): {
ID: plan.ProcedureIDFromOperationID("distinct"),
Spec: &functions.DistinctProcedureSpec{Column: "host"},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
},
@ -654,6 +633,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "group with distinct on _value does not optimize",
lp: &plan.LogicalPlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 1,
MemoryBytesQuota: 10000,
@ -670,7 +650,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -720,7 +700,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -731,12 +711,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{
@ -746,12 +723,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("distinct"): {
ID: plan.ProcedureIDFromOperationID("distinct"),
Spec: &functions.DistinctProcedureSpec{Column: "_value"},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
},
@ -768,6 +742,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "group with distinct on non-grouped does not optimize",
lp: &plan.LogicalPlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 1,
MemoryBytesQuota: 10000,
@ -784,7 +759,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -834,7 +809,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -845,12 +820,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: nil,
Children: []plan.ProcedureID{
@ -860,12 +832,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("distinct"): {
ID: plan.ProcedureIDFromOperationID("distinct"),
Spec: &functions.DistinctProcedureSpec{Column: "region"},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
},
@ -882,6 +851,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
{
name: "bounds context",
lp: &plan.LogicalPlanSpec{
Now: now,
Procedures: map[plan.ProcedureID]*plan.Procedure{
plan.ProcedureIDFromOperationID("fromCSV"): {
ID: plan.ProcedureIDFromOperationID("fromCSV"),
@ -894,7 +864,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range1"): {
ID: plan.ProcedureIDFromOperationID("range1"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -911,7 +881,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range2"): {
ID: plan.ProcedureIDFromOperationID("range2"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
@ -952,6 +922,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
},
},
pp: &plan.PlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 5,
MemoryBytesQuota: math.MaxInt64,
@ -968,7 +939,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range1"): {
ID: plan.ProcedureIDFromOperationID("range1"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -977,12 +948,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
},
TimeCol: "_time",
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{
(plan.ProcedureIDFromOperationID("fromCSV")),
@ -992,7 +960,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("range2"): {
ID: plan.ProcedureIDFromOperationID("range2"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
@ -1000,12 +968,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Stop: query.Now,
},
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{
(plan.ProcedureIDFromOperationID("range1")),
@ -1017,12 +982,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
Spec: &functions.LimitProcedureSpec{
N: 10,
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{
plan.ProcedureIDFromOperationID("range2"),
@ -1032,12 +994,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("mean"): {
ID: plan.ProcedureIDFromOperationID("mean"),
Spec: &functions.MeanProcedureSpec{},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -30 * time.Minute,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-30 * time.Minute)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{
(plan.ProcedureIDFromOperationID("limit")),
@ -1067,7 +1026,9 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
}
func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
now := time.Date(2017, 8, 8, 0, 0, 0, 0, time.UTC)
lp := &plan.LogicalPlanSpec{
Now: now,
Procedures: map[plan.ProcedureID]*plan.Procedure{
plan.ProcedureIDFromOperationID("from"): {
ID: plan.ProcedureIDFromOperationID("from"),
@ -1117,6 +1078,7 @@ func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
fromID := plan.ProcedureIDFromOperationID("from")
fromIDDup := plan.ProcedureIDForDuplicate(fromID)
want := &plan.PlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 2,
MemoryBytesQuota: math.MaxInt64,
@ -1127,7 +1089,7 @@ func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},
@ -1136,9 +1098,9 @@ func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
DescendingSet: true,
Descending: true, // last
},
Bounds: plan.BoundsSpec{
Start: query.MinTime,
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: plan.MinTime,
Stop: values.ConvertTime(now),
},
Children: []plan.ProcedureID{},
},
@ -1147,7 +1109,7 @@ func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.MinTime,
Stop: query.Now,
},
@ -1156,9 +1118,9 @@ func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
DescendingSet: true,
Descending: false, // first
},
Bounds: plan.BoundsSpec{
Start: query.MinTime,
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: plan.MinTime,
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{},
Children: []plan.ProcedureID{},
@ -1178,7 +1140,9 @@ func TestPhysicalPlanner_Plan_PushDown_Branch(t *testing.T) {
}
func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
now := time.Date(2017, 8, 8, 0, 0, 0, 0, time.UTC)
lp := &plan.LogicalPlanSpec{
Now: now,
Procedures: map[plan.ProcedureID]*plan.Procedure{
plan.ProcedureIDFromOperationID("from"): {
ID: plan.ProcedureIDFromOperationID("from"),
@ -1191,7 +1155,7 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
plan.ProcedureIDFromOperationID("range"): {
ID: plan.ProcedureIDFromOperationID("range"),
Spec: &functions.RangeProcedureSpec{
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -1246,6 +1210,7 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
fromID := plan.ProcedureIDFromOperationID("from")
fromIDDup := plan.ProcedureIDForDuplicate(fromID)
want := &plan.PlanSpec{
Now: now,
Resources: query.ResourceManagement{
ConcurrencyQuota: 3,
MemoryBytesQuota: math.MaxInt64,
@ -1256,7 +1221,7 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -1266,12 +1231,9 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
AggregateSet: true,
AggregateMethod: "sum",
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{},
Children: []plan.ProcedureID{},
@ -1281,7 +1243,7 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
Spec: &functions.FromProcedureSpec{
Database: "mydb",
BoundsSet: true,
Bounds: plan.BoundsSpec{
Bounds: query.Bounds{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
@ -1289,24 +1251,18 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
Stop: query.Now,
},
},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("mean")},
},
plan.ProcedureIDFromOperationID("mean"): {
ID: plan.ProcedureIDFromOperationID("mean"),
Spec: &functions.MeanProcedureSpec{},
Bounds: plan.BoundsSpec{
Start: query.Time{
IsRelative: true,
Relative: -1 * time.Hour,
},
Stop: query.Now,
Bounds: &plan.BoundsSpec{
Start: values.ConvertTime(now.Add(-1 * time.Hour)),
Stop: values.ConvertTime(now),
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("from")},
Children: []plan.ProcedureID{},
@ -1329,10 +1285,15 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
func PhysicalPlanTestHelper(t *testing.T, lp *plan.LogicalPlanSpec, want *plan.PlanSpec) {
t.Helper()
// Setup expected now time
// Setup expected now time if it doesn't exist
now := time.Now()
lp.Now = now
want.Now = now
if lp.Now.IsZero() {
lp.Now = now
}
if want.Now.IsZero() {
want.Now = now
}
planner := plan.NewPlanner()
got, err := planner.Plan(lp, nil)

View File

@ -2,7 +2,8 @@ package plan
import (
"fmt"
"time"
"github.com/influxdata/platform/query/values"
"github.com/influxdata/platform/query"
uuid "github.com/satori/go.uuid"
@ -22,7 +23,7 @@ type Procedure struct {
Parents []ProcedureID
Children []ProcedureID
Spec ProcedureSpec
Bounds BoundsSpec
Bounds *BoundsSpec
}
func (p *Procedure) Copy() *Procedure {
@ -39,6 +40,11 @@ func (p *Procedure) Copy() *Procedure {
np.Spec = p.Spec.Copy()
if p.Bounds != nil {
bounds := *p.Bounds
np.Bounds = &bounds
}
return np
}
@ -75,7 +81,7 @@ type PushDownProcedureSpec interface {
}
type BoundedProcedureSpec interface {
TimeBounds() BoundsSpec
TimeBounds() query.Bounds
}
type YieldProcedureSpec interface {
@ -102,91 +108,78 @@ type PushDownRule struct {
// ProcedureKind denotes the kind of operations.
type ProcedureKind string
// TODO(adamperlin): make plan.BoundsSpec always use resolved absolute times, and remove
// the `now` parameter from all associated methods.
type BoundsSpec struct {
Start query.Time
Stop query.Time
Start values.Time
Stop values.Time
}
var EmptyBoundsSpec = BoundsSpec{
Start: query.Now,
Stop: query.Now,
var EmptyBoundsSpec = &BoundsSpec{
Start: values.Time(0),
Stop: values.Time(0),
}
// IsEmpty reports whether the given bounds
// are empty, i.e., if start >= stop.
func (b BoundsSpec) IsEmpty(now time.Time) bool {
return b.Start.Time(now).Equal(b.Stop.Time(now)) || b.Start.Time(now).After(b.Stop.Time(now))
}
// HasZero returns true if the given bounds contain a Go zero time value as either Start or Stop.
func (b BoundsSpec) HasZero() bool {
return b.Start.IsZero() || b.Stop.IsZero()
func (b *BoundsSpec) IsEmpty() bool {
return b.Start >= b.Stop
}
// Contains reports whether a given time is contained within the range
// a given BoundsSpec represents
func (b BoundsSpec) Contains(t time.Time, now time.Time) bool {
return (t.After(b.Start.Time(now)) || t.Equal(b.Start.Time(now))) && t.Before(b.Stop.Time(now))
func (b *BoundsSpec) Contains(t values.Time) bool {
return t >= b.Start && t < b.Stop
}
// Overlaps reports whether two given bounds have overlapping time ranges.
func (b BoundsSpec) Overlaps(o BoundsSpec, now time.Time) bool {
return b.Contains(o.Start.Time(now), now) ||
(b.Contains(o.Stop.Time(now), now) && o.Stop.Time(now).After(b.Start.Time(now))) ||
o.Contains(b.Start.Time(now), now)
func (b *BoundsSpec) Overlaps(o *BoundsSpec) bool {
return b.Contains(o.Start) ||
(b.Contains(o.Stop) && o.Stop > b.Start) ||
o.Contains(b.Start)
}
// Union returns the union of two time bounds (the smallest bounds which contain both input bounds)
// If either of the bounds have zeroes, Union will return a zero-valued BoundsSpec.
// Union with EmptyBoundsSpec always returns EmptyBoundsSpec.
func (b BoundsSpec) Union(o BoundsSpec, now time.Time) (u BoundsSpec) {
if b.HasZero() || o.HasZero() {
return
}
if b.IsEmpty(now) || o.IsEmpty(now) {
func (b *BoundsSpec) Union(o *BoundsSpec) *BoundsSpec {
if b.IsEmpty() || o.IsEmpty() {
return EmptyBoundsSpec
}
u := new(BoundsSpec)
u.Start = b.Start
if o.Start.Time(now).Before(b.Start.Time(now)) {
if o.Start < b.Start {
u.Start = o.Start
}
u.Stop = b.Stop
if o.Stop.Time(now).After(b.Stop.Time(now)) {
if o.Stop > b.Stop {
u.Stop = o.Stop
}
return
return u
}
// Intersect returns the intersection of two bounds (the range over which they overlap).
// If either of the bounds have zeroes, it will return a zero-valued BoundsSpec.
// If there is no intersection, EmptyBoundsSpec is returned.
// Intersect with EmptyBoundsSpec will always return EmptyBoundsSpec.
func (b BoundsSpec) Intersect(o BoundsSpec, now time.Time) (i BoundsSpec) {
if b.HasZero() || o.HasZero() {
return
}
if b.IsEmpty(now) || o.IsEmpty(now) || !b.Overlaps(o, now) {
func (b *BoundsSpec) Intersect(o *BoundsSpec) *BoundsSpec {
if b.IsEmpty() || o.IsEmpty() || !b.Overlaps(o) {
return EmptyBoundsSpec
}
i := new(BoundsSpec)
i.Start = b.Start
if o.Start.Time(now).After(b.Start.Time(now)) {
if o.Start > b.Start {
i.Start = o.Start
}
i.Stop = b.Stop
if o.Stop.Time(now).Before(b.Stop.Time(now)) {
if o.Stop < b.Stop {
i.Stop = o.Stop
}
return
return i
}
type WindowSpec struct {