feat(query/plan): make it possible to configure the default memory limit for queries
The previous default was just to have no limit at all. This adds a configuration option to the planner so a static value can be set for the memory limit on each individual query.pull/10616/head
parent
2efa84131d
commit
aa052e1a57
|
@ -66,6 +66,7 @@ type Config struct {
|
|||
ConcurrencyQuota int
|
||||
MemoryBytesQuota int64
|
||||
ExecutorDependencies execute.Dependencies
|
||||
PlannerOptions []plan.Option
|
||||
Logger *zap.Logger
|
||||
Verbose bool
|
||||
}
|
||||
|
@ -86,7 +87,7 @@ func New(c Config) *Controller {
|
|||
availableConcurrency: c.ConcurrencyQuota,
|
||||
availableMemory: c.MemoryBytesQuota,
|
||||
lplanner: plan.NewLogicalPlanner(),
|
||||
pplanner: plan.NewPlanner(),
|
||||
pplanner: plan.NewPlanner(c.PlannerOptions...),
|
||||
executor: execute.NewExecutor(c.ExecutorDependencies, logger),
|
||||
logger: logger,
|
||||
metrics: newControllerMetrics(),
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package plan
|
||||
|
||||
// Option is an option to configure the behavior of the planner.
|
||||
type Option interface {
|
||||
apply(*planner)
|
||||
}
|
||||
|
||||
type optionFunc func(*planner)
|
||||
|
||||
func (opt optionFunc) apply(p *planner) {
|
||||
opt(p)
|
||||
}
|
||||
|
||||
// WithDefaultMemoryLimit sets the default memory limit for plans generated by the planner.
|
||||
// If the query spec explicitly sets a memory limit, that limit is used instead of the default.
|
||||
func WithDefaultMemoryLimit(memBytes int64) Option {
|
||||
return optionFunc(func(p *planner) {
|
||||
p.defaultMemoryLimit = memBytes
|
||||
})
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package plan_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/plan"
|
||||
)
|
||||
|
||||
func TestPhysicalPlanner_DefaultMemoryLimit(t *testing.T) {
|
||||
// Simple logical plan taken from the planner tests.
|
||||
lp := &plan.LogicalPlanSpec{
|
||||
Now: time.Now(),
|
||||
Resources: query.ResourceManagement{
|
||||
ConcurrencyQuota: 1,
|
||||
},
|
||||
Procedures: map[plan.ProcedureID]*plan.Procedure{
|
||||
plan.ProcedureIDFromOperationID("from"): {
|
||||
ID: plan.ProcedureIDFromOperationID("from"),
|
||||
Spec: &functions.FromProcedureSpec{
|
||||
Database: "mydb",
|
||||
},
|
||||
Parents: nil,
|
||||
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
|
||||
},
|
||||
plan.ProcedureIDFromOperationID("range"): {
|
||||
ID: plan.ProcedureIDFromOperationID("range"),
|
||||
Spec: &functions.RangeProcedureSpec{
|
||||
Bounds: query.Bounds{
|
||||
Start: query.Time{
|
||||
IsRelative: true,
|
||||
Relative: -1 * time.Hour,
|
||||
},
|
||||
Stop: query.Now,
|
||||
},
|
||||
TimeCol: "_time",
|
||||
},
|
||||
Parents: []plan.ProcedureID{
|
||||
plan.ProcedureIDFromOperationID("from"),
|
||||
},
|
||||
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("count")},
|
||||
},
|
||||
plan.ProcedureIDFromOperationID("count"): {
|
||||
ID: plan.ProcedureIDFromOperationID("count"),
|
||||
Spec: &functions.CountProcedureSpec{},
|
||||
Parents: []plan.ProcedureID{
|
||||
(plan.ProcedureIDFromOperationID("range")),
|
||||
},
|
||||
Children: nil,
|
||||
},
|
||||
},
|
||||
Order: []plan.ProcedureID{
|
||||
plan.ProcedureIDFromOperationID("from"),
|
||||
plan.ProcedureIDFromOperationID("range"),
|
||||
plan.ProcedureIDFromOperationID("count"),
|
||||
},
|
||||
}
|
||||
|
||||
planner := plan.NewPlanner(plan.WithDefaultMemoryLimit(1024))
|
||||
spec, err := planner.Plan(lp, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The plan spec should have 1024 set for the memory limits.
|
||||
if got, exp := spec.Resources.MemoryBytesQuota, int64(1024); got != exp {
|
||||
t.Fatalf("unexpected memory bytes quota: exp=%d got=%d", exp, got)
|
||||
}
|
||||
}
|
|
@ -58,13 +58,21 @@ type PlanRewriter interface {
|
|||
}
|
||||
|
||||
type planner struct {
|
||||
plan *PlanSpec
|
||||
plan *PlanSpec
|
||||
defaultMemoryLimit int64
|
||||
|
||||
modified bool
|
||||
}
|
||||
|
||||
func NewPlanner() Planner {
|
||||
return new(planner)
|
||||
// NewPlanner constructs a new physical planner while applying the given options.
|
||||
func NewPlanner(opts ...Option) Planner {
|
||||
p := &planner{
|
||||
defaultMemoryLimit: math.MaxInt64,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt.apply(p)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func resolveTime(qt query.Time, now time.Time) values.Time {
|
||||
|
@ -236,7 +244,7 @@ func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
|
|||
}
|
||||
// Update memory quota
|
||||
if p.plan.Resources.MemoryBytesQuota == 0 {
|
||||
p.plan.Resources.MemoryBytesQuota = math.MaxInt64
|
||||
p.plan.Resources.MemoryBytesQuota = p.defaultMemoryLimit
|
||||
}
|
||||
|
||||
return p.plan, nil
|
||||
|
|
Loading…
Reference in New Issue