Merge pull request #725 from influxdata/js-default-memory-limit-for-planner
feat(query/plan): make it possible to configure the default memory limit for queriespull/10616/head
commit
5b4506db1d
|
@ -66,6 +66,7 @@ type Config struct {
|
||||||
ConcurrencyQuota int
|
ConcurrencyQuota int
|
||||||
MemoryBytesQuota int64
|
MemoryBytesQuota int64
|
||||||
ExecutorDependencies execute.Dependencies
|
ExecutorDependencies execute.Dependencies
|
||||||
|
PlannerOptions []plan.Option
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
Verbose bool
|
Verbose bool
|
||||||
}
|
}
|
||||||
|
@ -86,7 +87,7 @@ func New(c Config) *Controller {
|
||||||
availableConcurrency: c.ConcurrencyQuota,
|
availableConcurrency: c.ConcurrencyQuota,
|
||||||
availableMemory: c.MemoryBytesQuota,
|
availableMemory: c.MemoryBytesQuota,
|
||||||
lplanner: plan.NewLogicalPlanner(),
|
lplanner: plan.NewLogicalPlanner(),
|
||||||
pplanner: plan.NewPlanner(),
|
pplanner: plan.NewPlanner(c.PlannerOptions...),
|
||||||
executor: execute.NewExecutor(c.ExecutorDependencies, logger),
|
executor: execute.NewExecutor(c.ExecutorDependencies, logger),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
metrics: newControllerMetrics(),
|
metrics: newControllerMetrics(),
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
package plan
|
||||||
|
|
||||||
|
// Option is an option to configure the behavior of the planner.
|
||||||
|
type Option interface {
|
||||||
|
apply(*planner)
|
||||||
|
}
|
||||||
|
|
||||||
|
type optionFunc func(*planner)
|
||||||
|
|
||||||
|
func (opt optionFunc) apply(p *planner) {
|
||||||
|
opt(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithDefaultMemoryLimit sets the default memory limit for plans generated by the planner.
|
||||||
|
// If the query spec explicitly sets a memory limit, that limit is used instead of the default.
|
||||||
|
func WithDefaultMemoryLimit(memBytes int64) Option {
|
||||||
|
return optionFunc(func(p *planner) {
|
||||||
|
p.defaultMemoryLimit = memBytes
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
package plan_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/platform/query"
|
||||||
|
"github.com/influxdata/platform/query/functions"
|
||||||
|
"github.com/influxdata/platform/query/plan"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPhysicalPlanner_DefaultMemoryLimit(t *testing.T) {
|
||||||
|
// Simple logical plan taken from the planner tests.
|
||||||
|
lp := &plan.LogicalPlanSpec{
|
||||||
|
Now: time.Now(),
|
||||||
|
Resources: query.ResourceManagement{
|
||||||
|
ConcurrencyQuota: 1,
|
||||||
|
},
|
||||||
|
Procedures: map[plan.ProcedureID]*plan.Procedure{
|
||||||
|
plan.ProcedureIDFromOperationID("from"): {
|
||||||
|
ID: plan.ProcedureIDFromOperationID("from"),
|
||||||
|
Spec: &functions.FromProcedureSpec{
|
||||||
|
Database: "mydb",
|
||||||
|
},
|
||||||
|
Parents: nil,
|
||||||
|
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
|
||||||
|
},
|
||||||
|
plan.ProcedureIDFromOperationID("range"): {
|
||||||
|
ID: plan.ProcedureIDFromOperationID("range"),
|
||||||
|
Spec: &functions.RangeProcedureSpec{
|
||||||
|
Bounds: query.Bounds{
|
||||||
|
Start: query.Time{
|
||||||
|
IsRelative: true,
|
||||||
|
Relative: -1 * time.Hour,
|
||||||
|
},
|
||||||
|
Stop: query.Now,
|
||||||
|
},
|
||||||
|
TimeCol: "_time",
|
||||||
|
},
|
||||||
|
Parents: []plan.ProcedureID{
|
||||||
|
plan.ProcedureIDFromOperationID("from"),
|
||||||
|
},
|
||||||
|
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("count")},
|
||||||
|
},
|
||||||
|
plan.ProcedureIDFromOperationID("count"): {
|
||||||
|
ID: plan.ProcedureIDFromOperationID("count"),
|
||||||
|
Spec: &functions.CountProcedureSpec{},
|
||||||
|
Parents: []plan.ProcedureID{
|
||||||
|
(plan.ProcedureIDFromOperationID("range")),
|
||||||
|
},
|
||||||
|
Children: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Order: []plan.ProcedureID{
|
||||||
|
plan.ProcedureIDFromOperationID("from"),
|
||||||
|
plan.ProcedureIDFromOperationID("range"),
|
||||||
|
plan.ProcedureIDFromOperationID("count"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
planner := plan.NewPlanner(plan.WithDefaultMemoryLimit(1024))
|
||||||
|
spec, err := planner.Plan(lp, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The plan spec should have 1024 set for the memory limits.
|
||||||
|
if got, exp := spec.Resources.MemoryBytesQuota, int64(1024); got != exp {
|
||||||
|
t.Fatalf("unexpected memory bytes quota: exp=%d got=%d", exp, got)
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,13 +58,21 @@ type PlanRewriter interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type planner struct {
|
type planner struct {
|
||||||
plan *PlanSpec
|
plan *PlanSpec
|
||||||
|
defaultMemoryLimit int64
|
||||||
|
|
||||||
modified bool
|
modified bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPlanner() Planner {
|
// NewPlanner constructs a new physical planner while applying the given options.
|
||||||
return new(planner)
|
func NewPlanner(opts ...Option) Planner {
|
||||||
|
p := &planner{
|
||||||
|
defaultMemoryLimit: math.MaxInt64,
|
||||||
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt.apply(p)
|
||||||
|
}
|
||||||
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func resolveTime(qt query.Time, now time.Time) values.Time {
|
func resolveTime(qt query.Time, now time.Time) values.Time {
|
||||||
|
@ -236,7 +244,7 @@ func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
|
||||||
}
|
}
|
||||||
// Update memory quota
|
// Update memory quota
|
||||||
if p.plan.Resources.MemoryBytesQuota == 0 {
|
if p.plan.Resources.MemoryBytesQuota == 0 {
|
||||||
p.plan.Resources.MemoryBytesQuota = math.MaxInt64
|
p.plan.Resources.MemoryBytesQuota = p.defaultMemoryLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.plan, nil
|
return p.plan, nil
|
||||||
|
|
Loading…
Reference in New Issue