fix(influxdb): better error message when range not pushed down

Fixes https://github.com/influxdata/flux/issues/1224.
pull/13862/head
jlapacik 2019-05-03 15:13:12 -07:00
parent 2d5879b6d6
commit dc59629f69
3 changed files with 57 additions and 13 deletions

View File

@ -121,6 +121,27 @@ func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
return ns
}
func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
// FromProcedureSpec is a logical operation representing any read
// from storage. However as a logical operation, it doesn't specify
// how data is to be read from storage. It is the query planner's
// job to determine the optimal read strategy and to convert this
// logical operation into the appropriate physical operation.
//
// Logical operations cannot be executed by the query engine. So if
// this operation is still around post physical planning, it means
// that a 'range' could not be pushed down to storage. Storage does
// not support unbounded reads, and so this query must not be
// validated.
var bucket string
if len(s.Bucket) > 0 {
bucket = s.Bucket
} else {
bucket = s.BucketID
}
return fmt.Errorf("cannot submit unbounded read to %q; try bounding 'from' with a call to 'range'", bucket)
}
func InjectFromDependencies(depsMap execute.Dependencies, deps Dependencies) error {
if err := deps.Validate(); err != nil {
return err

View File

@ -7,6 +7,8 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/flux/stdlib/universe"
platform "github.com/influxdata/influxdb"
@ -154,3 +156,37 @@ func TestFromOpSpec_BucketsAccessed(t *testing.T) {
})
}
}
func TestFromValidation(t *testing.T) {
spec := plantest.PlanSpec{
// from |> group (cannot query an infinite time range)
Nodes: []plan.Node{
plan.CreateLogicalNode("from", &influxdb.FromProcedureSpec{
Bucket: "my-bucket",
}),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement", "_field"},
}),
},
Edges: [][2]int{
{0, 1},
},
}
ps := plantest.CreatePlanSpec(&spec)
pp := plan.NewPhysicalPlanner(plan.OnlyPhysicalRules(
influxdb.PushDownRangeRule{},
influxdb.PushDownFilterRule{},
influxdb.PushDownGroupRule{},
))
_, err := pp.Plan(ps)
if err == nil {
t.Error("Expected query with no call to range to fail physical planning")
}
want := `cannot submit unbounded read to "my-bucket"; try bounding 'from' with a call to 'range'`
got := err.Error()
if want != got {
t.Errorf("unexpected error; -want/+got\n- %s\n+ %s", want, got)
}
}

View File

@ -79,19 +79,6 @@ func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec {
return ns
}
func (s *ReadRangePhysSpec) PostPhysicalValidate(id plan.NodeID) error {
if s.Bounds.Start.IsZero() && s.Bounds.Stop.IsZero() {
var bucket string
if len(s.Bucket) > 0 {
bucket = s.Bucket
} else {
bucket = s.BucketID
}
return fmt.Errorf(`%s: results from "%s" must be bounded`, id, bucket)
}
return nil
}
func (s *ReadRangePhysSpec) LookupBucketID(ctx context.Context, orgID influxdb.ID, buckets BucketLookup) (influxdb.ID, error) {
// Determine bucketID
switch {