892 lines
24 KiB
892 lines
24 KiB
// Package control keeps track of resources and manages queries.
// The Controller manages the resources available to each query by
// managing the memory allocation and concurrency usage of each query.
// The Controller will compile a program by using the passed in language
// and it will start the program using the ResourceManager.
// It will guarantee that each program that is started has at least
// one goroutine that it can use with the dispatcher and it will
// ensure a minimum amount of memory is available before the program
// runs.
// Any additional goroutines and memory usage are at the discretion of the
// specific resource strategy that the Controller is using.
// The Controller also provides visibility into the lifetime of the query
// and its current resource usage.
package control
import (
platform "github.com/influxdata/influxdb"
opentracing "github.com/opentracing/opentracing-go"
// orgLabel is the metric label to use in the controller for the organization.
// Controller.Query stores the organization ID string on the context under this
// key, and New appends it to the configured metric label keys so that the
// value is read back when a query's label values are collected.
const orgLabel = "org"
// Controller provides a central location to manage all incoming queries.
// The controller is responsible for compiling, queueing, and executing queries.
type Controller struct {
lastID uint64
queriesMu sync.RWMutex
queries map[QueryID]*Query
queryQueue chan *Query
wg sync.WaitGroup
shutdown bool
done chan struct{}
abortOnce sync.Once
abort chan struct{}
memoryBytesQuotaPerQuery int64
metrics *controllerMetrics
labelKeys []string
logger *zap.Logger
dependencies execute.Dependencies
type Config struct {
// ConcurrencyQuota is the number of queries that are allowed to execute concurrently.
ConcurrencyQuota int
// MemoryBytesQuotaPerQuery is the maximum number of bytes (in table memory) a query is allowed to use at
// any given time.
// The maximum amount of memory the controller is allowed to consume is
// ConcurrencyQuota * MemoryBytesQuotaPerQuery
MemoryBytesQuotaPerQuery int64
// QueueSize is the number of queries that are allowed to be awaiting execution before new queries are
// rejected.
QueueSize int
Logger *zap.Logger
// MetricLabelKeys is a list of labels to add to the metrics produced by the controller.
// The value for a given key will be read off the context.
// The context value must be a string or an implementation of the Stringer interface.
MetricLabelKeys []string
ExecutorDependencies execute.Dependencies
func (c *Config) Validate() error {
if c.ConcurrencyQuota <= 0 {
return errors.New("ConcurrencyQuota must be positive")
if c.MemoryBytesQuotaPerQuery <= 0 {
return errors.New("MemoryBytesQuotaPerQuery must be positive")
if c.QueueSize <= 0 {
return errors.New("QueueSize must be positive")
return nil
// QueryID is the identifier the Controller assigns to each query it creates.
// It is the key of the Controller's map of tracked queries.
type QueryID uint64
func New(c Config) (*Controller, error) {
if err := c.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid controller config")
c.MetricLabelKeys = append(c.MetricLabelKeys, orgLabel)
logger := c.Logger
if logger == nil {
logger = zap.NewNop()
logger.Info("Starting query controller",
zap.Int("concurrency_quota", c.ConcurrencyQuota),
zap.Int64("memory_bytes_quota_per_query", c.MemoryBytesQuotaPerQuery),
zap.Int("queue_size", c.QueueSize))
ctrl := &Controller{
queries: make(map[QueryID]*Query),
queryQueue: make(chan *Query, c.QueueSize),
done: make(chan struct{}),
abort: make(chan struct{}),
memoryBytesQuotaPerQuery: c.MemoryBytesQuotaPerQuery,
logger: logger,
metrics: newControllerMetrics(c.MetricLabelKeys),
labelKeys: c.MetricLabelKeys,
dependencies: c.ExecutorDependencies,
for i := 0; i < c.ConcurrencyQuota; i++ {
go func() {
defer ctrl.wg.Done()
return ctrl, nil
// Query satisfies the AsyncQueryService while ensuring the request is propagated on the context.
func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
// Set the request on the context so platform specific Flux operations can retrieve it later.
ctx = query.ContextWithRequest(ctx, req)
// Set the org label value for controller metrics
ctx = context.WithValue(ctx, orgLabel, req.OrganizationID.String())
q, err := c.query(ctx, req.Compiler)
if err != nil {
// If the controller reports an error, it's usually because of a syntax error
// or other problem that the client must fix.
return q, &platform.Error{
Code: platform.EInvalid,
Msg: err.Error(),
return q, nil
// query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
q, err := c.createQuery(ctx, compiler.CompilerType())
if err != nil {
return nil, err
if err := c.compileQuery(q, compiler); err != nil {
c.countQueryRequest(q, labelCompileError)
return nil, q.Err()
if err := c.enqueueQuery(q); err != nil {
c.countQueryRequest(q, labelQueueError)
return nil, q.Err()
return q, nil
// createQuery builds a new Query in the Created state, gives it a fresh ID and
// registers it in the controller's query map.
//
// One label value is read from ctx for each entry of c.labelKeys; a value that
// is neither a string nor a fmt.Stringer is recorded as the empty string.
// compileLabelValues holds the same values followed by the compiler type.
// An error is returned when the controller has been shut down, either before
// the query is built or by the time it is about to be registered.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the arguments of the StartSpanFromContext call, and the
// c.queriesMu.Lock() call that the deferred Unlock below requires. Confirm
// against the original file before relying on this block.
func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Query, error) {
// Reject the request up front when the controller is already shut down.
if c.shutdown {
return nil, errors.New("query controller shutdown")
id := c.nextID()
labelValues := make([]string, len(c.labelKeys))
// One extra slot at the end is reserved for the compiler type.
compileLabelValues := make([]string, len(c.labelKeys)+1)
for i, k := range c.labelKeys {
value := ctx.Value(k)
// str stays empty when the context value has any other type.
var str string
switch v := value.(type) {
case string:
str = v
case fmt.Stringer:
str = v.String()
labelValues[i] = str
compileLabelValues[i] = str
compileLabelValues[len(compileLabelValues)-1] = string(ct)
// cancel is stored on the query below.
cctx, cancel := context.WithCancel(ctx)
parentSpan, parentCtx := StartSpanFromContext(
q := &Query{
id: id,
labelValues: labelValues,
compileLabelValues: compileLabelValues,
state: Created,
c: c,
results: make(chan flux.Result),
parentCtx: parentCtx,
parentSpan: parentSpan,
cancel: cancel,
doneCh: make(chan struct{}),
// Lock the queries mutex for the rest of this method.
defer c.queriesMu.Unlock()
if c.shutdown {
// Query controller was shutdown between when we started
// creating the query and ending it.
err := errors.New("query controller shutdown")
return nil, err
c.queries[id] = q
return q, nil
func (c *Controller) nextID() QueryID {
nextID := atomic.AddUint64(&c.lastID, 1)
return QueryID(nextID)
// countQueryRequest builds the label values for a request metric: the query's
// own label values followed by the result label.
//
// NOTE(review): the statement that consumes lvs is not visible in this listing
// (presumably a counter on c.metrics is incremented with these label values),
// nor is the closing brace — confirm against the original file.
func (c *Controller) countQueryRequest(q *Query, result requestsLabel) {
l := len(q.labelValues)
// Copy into a new slice so the query's own label values are left untouched.
lvs := make([]string, l+1)
copy(lvs, q.labelValues)
lvs[l] = string(result)
// compileQuery moves q into the Compiling state, compiles it with the given
// compiler and stores the resulting program on q.
//
// It returns an error when the state transition is refused or when compilation
// fails. A panic raised while compiling is recovered and turned into the
// returned error through the named result; a non-error panic value is
// formatted as "panic: <value>". The stack is attached to an info-level log
// entry when the logger accepts that level.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the statement that writes the log entry, and the body of the
// lang.DependenciesAwareProgram branch (presumably it passes c.dependencies to
// the program). Confirm against the original file.
func (c *Controller) compileQuery(q *Query, compiler flux.Compiler) (err error) {
defer func() {
if e := recover(); e != nil {
var ok bool
// Assign to the named result so the caller sees the panic as an error.
err, ok = e.(error)
if !ok {
err = fmt.Errorf("panic: %v", e)
if entry := c.logger.Check(zapcore.InfoLevel, "panic during compile"); entry != nil {
entry.Stack = string(debug.Stack())
ctx, ok := q.tryCompile()
if !ok {
return errors.New("failed to transition query to compiling state")
prog, err := compiler.Compile(ctx)
if err != nil {
return errors.Wrap(err, "compilation failed")
if p, ok := prog.(lang.DependenciesAwareProgram); ok {
q.program = prog
return nil
func (c *Controller) enqueueQuery(q *Query) error {
if _, ok := q.tryQueue(); !ok {
return errors.New("failed to transition query to queueing state")
select {
case c.queryQueue <- q:
return errors.New("queue length exceeded")
return nil
func (c *Controller) processQueryQueue() {
for {
select {
case <-c.done:
case q := <-c.queryQueue:
// executeQuery moves q into the Executing state, gives it a memory allocator
// limited to the controller's per-query byte quota, starts its program and
// pumps the program's results to the query's results channel.
//
// A panic raised while starting the program is recovered; a non-error panic
// value is formatted as "panic: <value>" and the stack is attached to an
// info-level log entry when the logger accepts that level.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the statement that writes the log entry, the handling of the
// recovered error, the early return after a refused transition, and the body
// of the error branch after program.Start (presumably the error is recorded
// with setErr and the function returns). Confirm against the original file.
func (c *Controller) executeQuery(q *Query) {
defer func() {
if e := recover(); e != nil {
var ok bool
err, ok := e.(error)
if !ok {
err = fmt.Errorf("panic: %v", e)
if entry := c.logger.Check(zapcore.InfoLevel, "panic during program start"); entry != nil {
entry.Stack = string(debug.Stack())
ctx, ok := q.tryExec()
if !ok {
// This may happen if the query was cancelled (either because the
// client cancelled it, or because the controller is shutting down)
// In the case of cancellation, SetErr() should reset the error to an
// appropriate message.
q.setErr(errors.New("impossible state transition"))
q.alloc = new(memory.Allocator)
// The closure copies the quota so that Limit points at its own variable.
q.alloc.Limit = func(v int64) *int64 { return &v }(c.memoryBytesQuotaPerQuery)
exec, err := q.program.Start(ctx, q.alloc)
if err != nil {
q.exec = exec
// pump is given the execution context's done channel as its cancel signal.
q.pump(exec, ctx.Done())
// finish removes q from the controller's map of tracked queries.
//
// NOTE(review): the body of the if statement, the closing braces and any
// locking of queriesMu are not visible in this listing. Presumably the last
// query to finish after shutdown signals c.done, which Shutdown waits on —
// confirm against the original file.
func (c *Controller) finish(q *Query) {
delete(c.queries, q.id)
if len(c.queries) == 0 && c.shutdown {
// Queries reports the active queries.
func (c *Controller) Queries() []*Query {
defer c.queriesMu.RUnlock()
queries := make([]*Query, 0, len(c.queries))
for _, q := range c.queries {
queries = append(queries, q)
return queries
// Shutdown will signal to the Controller that it should not accept any
// new queries and that it should finish executing any existing queries.
// This will return once the Controller's run loop has been exited and all
// queries have been finished or until the Context has been canceled.
// It returns nil when c.done is signaled, or immediately when no queries are
// tracked, and ctx.Err() when ctx ends first.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the locking of queriesMu around the accesses to shutdown and
// queries, the body of the loop over the active queries (presumably each one
// is canceled) and the body of the abortOnce function (presumably it closes
// c.abort). Confirm against the original file.
func (c *Controller) Shutdown(ctx context.Context) error {
// Mark that the controller is shutdown so it does not
// accept new queries.
c.shutdown = true
if len(c.queries) == 0 {
return nil
// Cancel all of the currently active queries.
for _, q := range c.queries {
// Wait for query processing goroutines to finish.
// Being deferred, this wait runs on every return path below.
defer c.wg.Wait()
// Wait for all of the queries to be cleaned up or until the
// context is done.
select {
case <-c.done:
return nil
case <-ctx.Done():
c.abortOnce.Do(func() {
return ctx.Err()
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
func (c *Controller) PrometheusCollectors() []prometheus.Collector {
return c.metrics.PrometheusCollectors()
// Query represents a single request.
type Query struct {
id QueryID
labelValues []string
compileLabelValues []string
c *Controller
// query state. The stateMu protects access for the group below.
stateMu sync.RWMutex
state State
err error
runtimeErrs []error
cancel func()
parentCtx context.Context
parentSpan, currentSpan *span
stats flux.Statistics
done sync.Once
doneCh chan struct{}
program flux.Program
exec flux.Query
results chan flux.Result
alloc *memory.Allocator
// ID reports an ephemeral unique ID for the query.
func (q *Query) ID() QueryID {
return q.id
// Cancel will stop the query execution.
func (q *Query) Cancel() {
// Call the cancel function to signal that execution should
// be interrupted.
// Results returns a channel that will deliver the query results.
// It's possible that the channel is closed before any results arrive.
// In particular, if a query's context or the query itself is canceled,
// the query may close the results channel before any results are computed.
// The query may also have an error during execution so the Err()
// function should be used to check if an error happened.
func (q *Query) Results() <-chan flux.Result {
return q.results
// Done signals to the Controller that this query is no longer
// being used and resources related to the query may be freed.
// The work is done inside a sync.Once, so only the first call performs it:
// it drains the results channel, copies the error and metadata out of the
// executing program, converts the accumulated runtime errors into statistics
// and counts the request as a success or a runtime error.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the locked transition to the finished state described by the first
// comment in the body, and the call that removes the query from the
// controller (presumably q.c.finish). Confirm against the original file.
func (q *Query) Done() {
// This must only be invoked once.
q.done.Do(func() {
// All done calls should block until the first done call succeeds.
defer close(q.doneCh)
// Lock the state mutex and transition to the finished state.
// Then force the query to cancel to tell it to stop executing.
// We transition to the new state first so that we do not enter
// the canceled state at any point (as we have not been canceled).
// Ensure that all of the results have been drained.
// It is ok to read this as the user has already indicated they don't
// care about the results. When this is closed, it tells us an error has
// been set or the results have finished being pumped.
for range q.results {
// Do nothing with the results.
// No other goroutines should be modifying state at this point so we
// can do things that would be unsafe in another context.
if q.exec != nil {
// Mark the program as being done and copy out the error if it exists.
if q.err == nil {
// TODO(jsternberg): The underlying program never returns
// this so maybe their interface should change?
q.err = q.exec.Err()
// Merge the metadata from the program into the controller stats.
stats := q.exec.Statistics()
q.stats.Metadata = stats.Metadata
// Retrieve the runtime errors that have been accumulated.
errMsgs := make([]string, 0, len(q.runtimeErrs))
for _, e := range q.runtimeErrs {
errMsgs = append(errMsgs, e.Error())
q.stats.RuntimeErrors = errMsgs
// Mark the query as finished so it is removed from the query map.
// count query request
// Either a query error or any runtime error counts as a runtime failure.
if q.err != nil || len(q.runtimeErrs) > 0 {
q.c.countQueryRequest(q, labelRuntimeError)
} else {
q.c.countQueryRequest(q, labelSuccess)
// Statistics reports the statistics for the query.
// It returns a copy of the query's statistics, with MaxAllocated filled in
// from the allocator when one has been assigned.
// This method must be called after Done.
//
// NOTE(review): the original comment says this "will block until the query has
// been finalized unless a context is given", but no blocking statement is
// visible in this listing (presumably a receive on doneCh was lost, along with
// the closing braces). Confirm against the original file.
func (q *Query) Statistics() flux.Statistics {
stats := q.stats
// alloc is only set once execution has started.
if q.alloc != nil {
stats.MaxAllocated = q.alloc.MaxAllocated()
return stats
// State reports the current state of the query.
func (q *Query) State() State {
state := q.state
if !isFinishedState(state) {
// If the query is a non-finished state, check the
// context to see if we have been interrupted.
select {
case <-q.parentCtx.Done():
// The query has been canceled so report to the
// outside world that we have been canceled.
// Do NOT attempt to change the internal state
// variable here. It is a minefield. Leave the
// normal query execution to figure that out.
state = Canceled
// The context has not been canceled.
return state
// transitionTo will transition from one state to another. If a list of current states
// is given, then the query must be in one of those states for the transition to succeed.
// This method must be called with a lock and it must be called from within the run loop.
// It returns the context to use in the new state and true on success, or a nil
// context and false when the transition is refused. Compiling, Queueing and
// Executing each get a metrics span and a context derived from it; the visible
// code returns the parent context for states that are not tracked.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the default branches of the select and switch statements, the logic
// that accepts a matching current state, the calls that finish the current and
// parent spans and invoke cancel, and the arguments of StartSpanFromContext.
// Confirm against the original file before relying on this block.
func (q *Query) transitionTo(newState State, currentState ...State) (context.Context, bool) {
// If we are transitioning to a non-finished state, the query
// may have been canceled. If the query was canceled, then
// we need to transition to the canceled state
if !isFinishedState(newState) {
select {
case <-q.parentCtx.Done():
// Transition to the canceled state and report that
// we failed to transition to the desired state.
_, _ = q.transitionTo(Canceled)
return nil, false
if len(currentState) > 0 {
// Find the current state in the list of current states.
for _, st := range currentState {
if q.state == st {
return nil, false
// We are transitioning to a new state. Close the current span (if it exists).
if q.currentSpan != nil {
// Add the time spent in the state being left to the matching statistic.
switch q.state {
case Compiling:
q.stats.CompileDuration += q.currentSpan.Duration
case Queueing:
q.stats.QueueDuration += q.currentSpan.Duration
case Executing:
q.stats.ExecuteDuration += q.currentSpan.Duration
q.currentSpan = nil
if isFinishedState(newState) {
// Invoke the cancel function to ensure that we have signaled that the query should be done.
// The user is supposed to read the entirety of the tables returned before we end up in a finished
// state, but user error may have caused this not to happen so there's no harm to canceling multiple
// times.
// If we are transitioning to a finished state from a non-finished state, finish the parent span.
if q.parentSpan != nil {
q.stats.TotalDuration = q.parentSpan.Duration
q.parentSpan = nil
// Transition to the new state.
q.state = newState
// Start a new span and set a new context.
var (
dur *prometheus.HistogramVec
gauge *prometheus.GaugeVec
labelValues = q.labelValues
// Pick the duration histogram and gauge that belong to the new state.
switch newState {
case Compiling:
dur, gauge = q.c.metrics.compilingDur, q.c.metrics.compiling
// Compile metrics carry the compiler type as an additional label.
labelValues = q.compileLabelValues
case Queueing:
dur, gauge = q.c.metrics.queueingDur, q.c.metrics.queueing
case Executing:
dur, gauge = q.c.metrics.executingDur, q.c.metrics.executing
// This state is not tracked so do not create a new span or context for it.
// Use the parent context if one is needed.
return q.parentCtx, true
var currentCtx context.Context
q.currentSpan, currentCtx = StartSpanFromContext(
return currentCtx, true
// Err reports any error the query may have encountered.
func (q *Query) Err() error {
err := q.err
return err
// setErr marks this query with an error. If the query was
// canceled, then the error is ignored.
// This will mark the query as ready so setResults must not
// be called if this method is invoked.
// When the query's context has already been canceled, the context's error is
// stored in place of the one passed in.
//
// NOTE(review): lines appear to be missing from this listing — the
// q.stateMu.Lock() call that the deferred Unlock requires, the state
// transitions described by the comments, the default branch of the select,
// the statement that closes the channel mentioned at the end, and the closing
// braces. Confirm against the original file.
func (q *Query) setErr(err error) {
defer q.stateMu.Unlock()
// We may have this get called when the query is canceled.
// If that is the case, transition to the canceled state
// instead and record the error from that since the error
// we received is probably wrong.
select {
case <-q.parentCtx.Done():
err = q.parentCtx.Err()
q.err = err
// Close the ready channel to report that no results
// will be sent.
func (q *Query) addRuntimeError(e error) {
defer q.stateMu.Unlock()
q.runtimeErrs = append(q.runtimeErrs, e)
// pump will read from the executing query results and pump the
// results to our destination.
// When there are no more results, then this will close our own
// results channel.
// Each result is wrapped in an errorCollectingResult tied to this query before
// it is forwarded. The done channel is the cancel signal for the query, and
// the controller's abort channel is watched as well.
//
// NOTE(review): lines appear to be missing from this listing — the closing
// braces, the statement taken when exec.Results() is closed (presumably a
// return), the bodies of the inner select cases, the call that cancels the
// executing program, and the body of the abort case. Confirm against the
// original file before relying on this block.
func (q *Query) pump(exec flux.Query, done <-chan struct{}) {
defer close(q.results)
// When our context is canceled, we need to propagate that cancel
// signal down to the executing program just in case it is waiting
// for a cancel signal and is ignoring the passed in context.
// We want this signal to only be sent once and we want to continue
// draining the results until the underlying program has actually
// been finished so we copy this to a new channel and set it to
// nil when it has been closed.
signalCh := done
for {
select {
case res, ok := <-exec.Results():
if !ok {
// It is possible for the underlying query to misbehave.
// We have to continue pumping results even if this is the
// case, but if the query has been canceled or finished with
// done, nobody is going to read these values so we need
// to avoid blocking.
ecr := &errorCollectingResult{
Result: res,
q: q,
select {
case <-done:
case q.results <- ecr:
case <-signalCh:
// Signal to the underlying executor that the query
// has been canceled. Usually, the signal on the context
// is likely enough, but this explicitly signals just in case.
// Set the done channel to nil so we don't do this again
// and we continue to drain the results.
// A nil channel is never ready, so this case cannot fire again.
signalCh = nil
case <-q.c.abort:
// If we get here, then any running queries should have been cancelled
// in controller.Shutdown().
// tryCompile attempts to transition the query into the Compiling state.
func (q *Query) tryCompile() (context.Context, bool) {
defer q.stateMu.Unlock()
return q.transitionTo(Compiling, Created)
// tryQueue attempts to transition the query into the Queueing state.
func (q *Query) tryQueue() (context.Context, bool) {
defer q.stateMu.Unlock()
return q.transitionTo(Queueing, Compiling)
// tryExec attempts to transition the query into the Executing state.
func (q *Query) tryExec() (context.Context, bool) {
defer q.stateMu.Unlock()
return q.transitionTo(Executing, Queueing)
type errorCollectingResult struct {
q *Query
func (r *errorCollectingResult) Tables() flux.TableIterator {
return &errorCollectingTableIterator{
TableIterator: r.Result.Tables(),
q: r.q,
type errorCollectingTableIterator struct {
q *Query
func (ti *errorCollectingTableIterator) Do(f func(t flux.Table) error) error {
err := ti.TableIterator.Do(f)
if err != nil {
return err
// State is the query state.
type State int

const (
	// Created indicates the query has been created.
	Created State = iota

	// Compiling indicates that the query is in the process
	// of executing the compiler associated with the query.
	Compiling

	// Queueing indicates the query is waiting inside of the
	// scheduler to be executed.
	// TODO(jsternberg): This stage isn't used currently, but
	// it makes sense to readd this once we have a work queue again.
	Queueing

	// Executing indicates that the query is currently executing.
	Executing

	// Errored indicates that there was an error when attempting
	// to execute a query within any state inside of the controller.
	Errored

	// Finished indicates that the query has been marked as Done
	// and it is awaiting removal from the Controller or has already
	// been removed.
	Finished

	// Canceled indicates that the query was signaled to be
	// canceled. A canceled query must still be released with Done.
	Canceled
)

// String returns the lower-case name of the state, or "unknown" for a value
// that is not one of the declared states.
func (s State) String() string {
	switch s {
	case Created:
		return "created"
	case Compiling:
		return "compiling"
	case Queueing:
		return "queueing"
	case Executing:
		return "executing"
	case Errored:
		return "errored"
	case Finished:
		return "finished"
	case Canceled:
		return "canceled"
	default:
		return "unknown"
	}
}

// isFinishedState reports whether state is terminal, that is one of Canceled,
// Errored or Finished.
func isFinishedState(state State) bool {
	switch state {
	case Canceled, Errored, Finished:
		return true
	default:
		return false
	}
}
// span is a simple wrapper around opentracing.Span in order to
// get access to the duration of the span for metrics reporting.
type span struct {
s opentracing.Span
start time.Time
Duration time.Duration
hist prometheus.Observer
gauge prometheus.Gauge
// StartSpanFromContext starts an opentracing span named operationName from
// ctx, using the current time as the span's start time, and returns it wrapped
// in a span together with the context that carries it.
// The histogram and gauge are stored on the returned span.
//
// NOTE(review): no use of gauge other than storing it is visible in this
// listing (presumably it is incremented here), and the closing brace is
// missing — confirm against the original file.
func StartSpanFromContext(ctx context.Context, operationName string, hist prometheus.Observer, gauge prometheus.Gauge) (*span, context.Context) {
// The same timestamp is used for the tracing span and for the wrapper.
start := time.Now()
s, sctx := opentracing.StartSpanFromContext(ctx, operationName, opentracing.StartTime(start))
return &span{
s: s,
start: start,
hist: hist,
gauge: gauge,
}, sctx
func (s *span) Finish() {
finish := time.Now()
s.Duration = finish.Sub(s.start)
FinishTime: finish,