From aa052e1a5734d66f9b698a963371f14ec2bc59f8 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 28 Aug 2018 15:50:01 -0500 Subject: [PATCH] feat(query/plan): make it possible to configure the default memory limit for queries The previous default was just to have no limit at all. This adds a configuration option to the planner so a static value can be set for the memory limit on each individual query. --- query/control/controller.go | 3 +- query/plan/options.go | 20 +++++++++++ query/plan/options_test.go | 71 +++++++++++++++++++++++++++++++++++++ query/plan/physical.go | 16 ++++++--- 4 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 query/plan/options.go create mode 100644 query/plan/options_test.go diff --git a/query/control/controller.go b/query/control/controller.go index ac868e280e..d24d07709a 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -66,6 +66,7 @@ type Config struct { ConcurrencyQuota int MemoryBytesQuota int64 ExecutorDependencies execute.Dependencies + PlannerOptions []plan.Option Logger *zap.Logger Verbose bool } @@ -86,7 +87,7 @@ func New(c Config) *Controller { availableConcurrency: c.ConcurrencyQuota, availableMemory: c.MemoryBytesQuota, lplanner: plan.NewLogicalPlanner(), - pplanner: plan.NewPlanner(), + pplanner: plan.NewPlanner(c.PlannerOptions...), executor: execute.NewExecutor(c.ExecutorDependencies, logger), logger: logger, metrics: newControllerMetrics(), diff --git a/query/plan/options.go b/query/plan/options.go new file mode 100644 index 0000000000..9cd8ac24b8 --- /dev/null +++ b/query/plan/options.go @@ -0,0 +1,20 @@ +package plan + +// Option is an option to configure the behavior of the planner. +type Option interface { + apply(*planner) +} + +type optionFunc func(*planner) + +func (opt optionFunc) apply(p *planner) { + opt(p) +} + +// WithDefaultMemoryLimit sets the default memory limit for plans generated by the planner. +// If the query spec explicitly sets a memory limit, that limit is used instead of the default. +func WithDefaultMemoryLimit(memBytes int64) Option { + return optionFunc(func(p *planner) { + p.defaultMemoryLimit = memBytes + }) +} diff --git a/query/plan/options_test.go b/query/plan/options_test.go new file mode 100644 index 0000000000..4faa385566 --- /dev/null +++ b/query/plan/options_test.go @@ -0,0 +1,71 @@ +package plan_test + +import ( + "testing" + "time" + + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/functions" + "github.com/influxdata/platform/query/plan" +) + +func TestPhysicalPlanner_DefaultMemoryLimit(t *testing.T) { + // Simple logical plan taken from the planner tests. + lp := &plan.LogicalPlanSpec{ + Now: time.Now(), + Resources: query.ResourceManagement{ + ConcurrencyQuota: 1, + }, + Procedures: map[plan.ProcedureID]*plan.Procedure{ + plan.ProcedureIDFromOperationID("from"): { + ID: plan.ProcedureIDFromOperationID("from"), + Spec: &functions.FromProcedureSpec{ + Database: "mydb", + }, + Parents: nil, + Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")}, + }, + plan.ProcedureIDFromOperationID("range"): { + ID: plan.ProcedureIDFromOperationID("range"), + Spec: &functions.RangeProcedureSpec{ + Bounds: query.Bounds{ + Start: query.Time{ + IsRelative: true, + Relative: -1 * time.Hour, + }, + Stop: query.Now, + }, + TimeCol: "_time", + }, + Parents: []plan.ProcedureID{ + plan.ProcedureIDFromOperationID("from"), + }, + Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("count")}, + }, + plan.ProcedureIDFromOperationID("count"): { + ID: plan.ProcedureIDFromOperationID("count"), + Spec: &functions.CountProcedureSpec{}, + Parents: []plan.ProcedureID{ + (plan.ProcedureIDFromOperationID("range")), + }, + Children: nil, + }, + }, + Order: []plan.ProcedureID{ + plan.ProcedureIDFromOperationID("from"), + plan.ProcedureIDFromOperationID("range"), + plan.ProcedureIDFromOperationID("count"), + }, + } + + planner := plan.NewPlanner(plan.WithDefaultMemoryLimit(1024)) + spec, err := planner.Plan(lp, nil) + if err != nil { + t.Fatal(err) + } + + // The plan spec should have 1024 set for the memory limits. + if got, exp := spec.Resources.MemoryBytesQuota, int64(1024); got != exp { + t.Fatalf("unexpected memory bytes quota: exp=%d got=%d", exp, got) + } +} diff --git a/query/plan/physical.go b/query/plan/physical.go index f8c668f561..375099dd39 100644 --- a/query/plan/physical.go +++ b/query/plan/physical.go @@ -58,13 +58,21 @@ type PlanRewriter interface { } type planner struct { - plan *PlanSpec + plan *PlanSpec + defaultMemoryLimit int64 modified bool } -func NewPlanner() Planner { - return new(planner) +// NewPlanner constructs a new physical planner while applying the given options. +func NewPlanner(opts ...Option) Planner { + p := &planner{ + defaultMemoryLimit: math.MaxInt64, + } + for _, opt := range opts { + opt.apply(p) + } + return p } func resolveTime(qt query.Time, now time.Time) values.Time { @@ -236,7 +244,7 @@ func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) { } // Update memory quota if p.plan.Resources.MemoryBytesQuota == 0 { - p.plan.Resources.MemoryBytesQuota = math.MaxInt64 + p.plan.Resources.MemoryBytesQuota = p.defaultMemoryLimit } return p.plan, nil