2019-05-29 14:04:34 +00:00
// Package control keeps track of resources and manages queries.
//
// The Controller manages the resources available to each query by
// managing the memory allocation and concurrency usage of each query.
// The Controller will compile a program by using the passed in language
// and it will start the program using the ResourceManager.
//
// It will guarantee that each program that is started has at least
// one goroutine that it can use with the dispatcher and it will
// ensure a minimum amount of memory is available before the program
// runs.
//
// Additional goroutines and memory usage are at the discretion of the
// specific resource strategy that the Controller is using.
//
// The Controller also provides visibility into the lifetime of the query
// and its current resource usage.
2018-09-11 22:56:51 +00:00
package control
import (
"context"
2019-05-29 14:04:34 +00:00
"fmt"
2019-06-10 17:23:12 +00:00
"runtime/debug"
2019-05-29 14:04:34 +00:00
"sync"
"sync/atomic"
2018-09-11 22:56:51 +00:00
"github.com/influxdata/flux"
2019-07-18 16:43:15 +00:00
"github.com/influxdata/flux/codes"
2020-09-17 00:59:15 +00:00
"github.com/influxdata/flux/execute/table"
2019-05-29 14:04:34 +00:00
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
2020-09-17 00:59:15 +00:00
"github.com/influxdata/flux/runtime"
2020-04-03 17:39:20 +00:00
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/errors"
2020-09-17 00:59:15 +00:00
"github.com/influxdata/influxdb/v2/kit/feature"
2020-04-03 17:39:20 +00:00
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/tracing"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/query"
2019-05-29 14:04:34 +00:00
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
2019-06-10 17:23:12 +00:00
"go.uber.org/zap/zapcore"
2018-09-11 22:56:51 +00:00
)
// orgLabel is the metric label key the controller uses for the organization.
// Controller.Query stores the request's organization ID on the context under
// this key, and New appends it to the configured metric label keys.
const (
	orgLabel = "org"
)
2019-05-29 14:04:34 +00:00
// Controller provides a central location to manage all incoming queries.
// The controller is responsible for compiling, queueing, and executing queries.
2018-09-11 22:56:51 +00:00
type Controller struct {
2020-03-31 08:52:55 +00:00
config Config
2019-05-29 14:04:34 +00:00
lastID uint64
queriesMu sync . RWMutex
queries map [ QueryID ] * Query
queryQueue chan * Query
wg sync . WaitGroup
shutdown bool
done chan struct { }
abortOnce sync . Once
abort chan struct { }
2019-10-09 22:27:10 +00:00
memory * memoryManager
2019-05-29 14:04:34 +00:00
metrics * controllerMetrics
labelKeys [ ] string
2019-12-04 23:10:23 +00:00
log * zap . Logger
2019-05-29 14:04:34 +00:00
2019-09-19 15:01:17 +00:00
dependencies [ ] flux . Dependency
2018-09-11 22:56:51 +00:00
}
2019-05-29 14:04:34 +00:00
type Config struct {
// ConcurrencyQuota is the number of queries that are allowed to execute concurrently.
2020-11-24 15:14:16 +00:00
//
// This value is limited to an int32 because it's used to set the initial delta on the
// controller's WaitGroup, and WG deltas have an effective limit of math.MaxInt32.
// See: https://github.com/golang/go/issues/20687
ConcurrencyQuota int32
2019-10-09 22:27:10 +00:00
// InitialMemoryBytesQuotaPerQuery is the initial number of bytes allocated for a query
// when it is started. If this is unset, then the MemoryBytesQuotaPerQuery will be used.
InitialMemoryBytesQuotaPerQuery int64
2019-05-29 14:04:34 +00:00
// MemoryBytesQuotaPerQuery is the maximum number of bytes (in table memory) a query is allowed to use at
// any given time.
//
2019-10-09 22:27:10 +00:00
// A query may not be able to use its entire quota of memory if requesting more memory would conflict
// with the maximum amount of memory that the controller can request.
2019-05-29 14:04:34 +00:00
MemoryBytesQuotaPerQuery int64
2019-10-09 22:27:10 +00:00
// MaxMemoryBytes is the maximum amount of memory the controller is allowed to
// allocated to queries.
//
// If this is unset, then this number is ConcurrencyQuota * MemoryBytesQuotaPerQuery.
// This number must be greater than or equal to the ConcurrencyQuota * InitialMemoryBytesQuotaPerQuery.
// This number may be less than the ConcurrencyQuota * MemoryBytesQuotaPerQuery.
MaxMemoryBytes int64
2019-05-29 14:04:34 +00:00
// QueueSize is the number of queries that are allowed to be awaiting execution before new queries are
// rejected.
QueueSize int
Logger * zap . Logger
// MetricLabelKeys is a list of labels to add to the metrics produced by the controller.
// The value for a given key will be read off the context.
// The context value must be a string or an implementation of the Stringer interface.
MetricLabelKeys [ ] string
2019-09-19 15:01:17 +00:00
ExecutorDependencies [ ] flux . Dependency
2019-05-29 14:04:34 +00:00
}
2019-10-09 22:27:10 +00:00
// complete returns a copy of c with defaults filled in, validated as a
// completed configuration. An unset InitialMemoryBytesQuotaPerQuery
// defaults to MemoryBytesQuotaPerQuery. On error the zero Config is returned.
func (c *Config) complete() (Config, error) {
	filled := *c
	if filled.InitialMemoryBytesQuotaPerQuery == 0 {
		filled.InitialMemoryBytesQuotaPerQuery = filled.MemoryBytesQuotaPerQuery
	}

	if err := filled.validate(true); err != nil {
		return Config{}, err
	}
	return filled, nil
}
func ( c * Config ) validate ( isComplete bool ) error {
2019-05-29 14:04:34 +00:00
if c . ConcurrencyQuota <= 0 {
return errors . New ( "ConcurrencyQuota must be positive" )
}
if c . MemoryBytesQuotaPerQuery <= 0 {
return errors . New ( "MemoryBytesQuotaPerQuery must be positive" )
2019-04-22 19:11:32 +00:00
}
2019-10-09 22:27:10 +00:00
if c . InitialMemoryBytesQuotaPerQuery < 0 || ( isComplete && c . InitialMemoryBytesQuotaPerQuery == 0 ) {
return errors . New ( "InitialMemoryBytesQuotaPerQuery must be positive" )
}
if c . MaxMemoryBytes < 0 {
return errors . New ( "MaxMemoryBytes must be positive" )
}
if c . MaxMemoryBytes != 0 {
if minMemory := int64 ( c . ConcurrencyQuota ) * c . InitialMemoryBytesQuotaPerQuery ; c . MaxMemoryBytes < minMemory {
return fmt . Errorf ( "MaxMemoryBytes must be greater than or equal to the ConcurrencyQuota * InitialMemoryBytesQuotaPerQuery: %d < %d (%d * %d)" , c . MaxMemoryBytes , minMemory , c . ConcurrencyQuota , c . InitialMemoryBytesQuotaPerQuery )
}
}
2019-05-29 14:04:34 +00:00
if c . QueueSize <= 0 {
return errors . New ( "QueueSize must be positive" )
}
return nil
}
2019-10-09 22:27:10 +00:00
// Validate reports whether the controller configuration is valid. Unlike
// New, it does not fill in defaults first, so an unset
// InitialMemoryBytesQuotaPerQuery is not an error here.
func (c *Config) Validate() error {
	const defaultsFilled = false
	return c.validate(defaultsFilled)
}
2019-05-29 14:04:34 +00:00
// QueryID is the ephemeral identifier a Controller assigns to each query.
type QueryID uint64
2019-10-09 22:27:10 +00:00
func New ( config Config ) ( * Controller , error ) {
c , err := config . complete ( )
if err != nil {
2019-05-29 14:04:34 +00:00
return nil , errors . Wrap ( err , "invalid controller config" )
}
2020-08-27 19:46:32 +00:00
c . MetricLabelKeys = append ( c . MetricLabelKeys , orgLabel )
2019-05-29 14:04:34 +00:00
logger := c . Logger
if logger == nil {
logger = zap . NewNop ( )
}
logger . Info ( "Starting query controller" ,
2020-11-24 15:14:16 +00:00
zap . Int32 ( "concurrency_quota" , c . ConcurrencyQuota ) ,
2019-10-09 22:27:10 +00:00
zap . Int64 ( "initial_memory_bytes_quota_per_query" , c . InitialMemoryBytesQuotaPerQuery ) ,
2019-05-29 14:04:34 +00:00
zap . Int64 ( "memory_bytes_quota_per_query" , c . MemoryBytesQuotaPerQuery ) ,
2019-10-09 22:27:10 +00:00
zap . Int64 ( "max_memory_bytes" , c . MaxMemoryBytes ) ,
2019-05-29 14:04:34 +00:00
zap . Int ( "queue_size" , c . QueueSize ) )
2019-10-09 22:27:10 +00:00
mm := & memoryManager {
initialBytesQuotaPerQuery : c . InitialMemoryBytesQuotaPerQuery ,
memoryBytesQuotaPerQuery : c . MemoryBytesQuotaPerQuery ,
}
if c . MaxMemoryBytes > 0 {
mm . unusedMemoryBytes = c . MaxMemoryBytes - ( int64 ( c . ConcurrencyQuota ) * c . InitialMemoryBytesQuotaPerQuery )
} else {
mm . unlimited = true
}
2019-05-29 14:04:34 +00:00
ctrl := & Controller {
2020-03-31 08:52:55 +00:00
config : c ,
2019-10-09 22:27:10 +00:00
queries : make ( map [ QueryID ] * Query ) ,
queryQueue : make ( chan * Query , c . QueueSize ) ,
done : make ( chan struct { } ) ,
abort : make ( chan struct { } ) ,
memory : mm ,
2019-12-04 23:10:23 +00:00
log : logger ,
2019-10-09 22:27:10 +00:00
metrics : newControllerMetrics ( c . MetricLabelKeys ) ,
labelKeys : c . MetricLabelKeys ,
dependencies : c . ExecutorDependencies ,
2019-05-29 14:04:34 +00:00
}
2020-11-24 15:14:16 +00:00
quota := int ( c . ConcurrencyQuota )
ctrl . wg . Add ( quota )
for i := 0 ; i < quota ; i ++ {
2019-05-29 14:04:34 +00:00
go func ( ) {
defer ctrl . wg . Done ( )
ctrl . processQueryQueue ( )
} ( )
}
return ctrl , nil
2018-09-11 22:56:51 +00:00
}
2018-12-11 15:57:32 +00:00
// Query satisfies the AsyncQueryService while ensuring the request is propagated on the context.
2018-09-11 22:56:51 +00:00
func ( c * Controller ) Query ( ctx context . Context , req * query . Request ) ( flux . Query , error ) {
2019-03-06 00:18:04 +00:00
span , ctx := tracing . StartSpanFromContext ( ctx )
2019-03-05 00:38:10 +00:00
defer span . Finish ( )
2018-09-11 22:56:51 +00:00
// Set the request on the context so platform specific Flux operations can retrieve it later.
ctx = query . ContextWithRequest ( ctx , req )
// Set the org label value for controller metrics
2019-08-22 02:08:51 +00:00
ctx = context . WithValue ( ctx , orgLabel , req . OrganizationID . String ( ) ) //lint:ignore SA1029 this is a temporary ignore until we have time to create an appropriate type
2019-09-19 15:01:17 +00:00
// The controller injects the dependencies for each incoming request.
for _ , dep := range c . dependencies {
ctx = dep . Inject ( ctx )
}
2020-09-17 00:59:15 +00:00
// Add per-transformation spans if the feature flag is set.
if feature . QueryTracing ( ) . Enabled ( ctx ) {
2020-10-12 21:24:19 +00:00
ctx = flux . WithQueryTracingEnabled ( ctx )
2020-09-17 00:59:15 +00:00
}
2019-05-29 14:04:34 +00:00
q , err := c . query ( ctx , req . Compiler )
2018-12-11 15:57:32 +00:00
if err != nil {
2019-07-18 16:43:15 +00:00
return q , err
2018-12-11 15:57:32 +00:00
}
return q , nil
2018-09-11 22:56:51 +00:00
}
2019-05-29 14:04:34 +00:00
// query submits a query for execution and returns without waiting for it
// to run. Done must be called on any returned Query objects.
//
// If compiling or enqueueing fails, the query is marked with the error,
// removed from the controller, counted under the matching request label,
// and never returned to the caller.
func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
	q, err := c.createQuery(ctx, compiler.CompilerType())
	if err != nil {
		return nil, handleFluxError(err)
	}

	reject := func(cause error, label requestsLabel) (flux.Query, error) {
		q.setErr(cause)
		c.finish(q)
		c.countQueryRequest(q, label)
		return nil, q.Err()
	}

	if err := c.compileQuery(q, compiler); err != nil {
		return reject(err, labelCompileError)
	}
	if err := c.enqueueQuery(q); err != nil {
		return reject(err, labelQueueError)
	}
	return q, nil
}
// createQuery builds a new Query in the Created state for the given
// compiler type and registers it with the controller.
//
// Metric label values are read from ctx using the controller's label keys.
// Values that are neither strings nor fmt.Stringers are recorded as empty
// strings. If the controller has been shut down, the query is not
// registered and a codes.Unavailable error is returned. This is the same
// error whether the shutdown is observed before or after the query is built.
func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Query, error) {
	c.queriesMu.RLock()
	isShutdown := c.shutdown
	c.queriesMu.RUnlock()
	if isShutdown {
		return nil, &flux.Error{
			Code: codes.Unavailable,
			Msg:  "query controller shutdown",
		}
	}

	id := c.nextID()
	labelValues := make([]string, len(c.labelKeys))
	// The compile metrics carry one extra trailing label: the compiler type.
	compileLabelValues := make([]string, len(c.labelKeys)+1)
	for i, k := range c.labelKeys {
		var str string
		switch v := ctx.Value(k).(type) {
		case string:
			str = v
		case fmt.Stringer:
			str = v.String()
		}
		labelValues[i] = str
		compileLabelValues[i] = str
	}
	compileLabelValues[len(compileLabelValues)-1] = string(ct)

	cctx, cancel := context.WithCancel(ctx)
	parentSpan, parentCtx := tracing.StartSpanFromContextWithPromMetrics(
		cctx,
		"all",
		c.metrics.allDur.WithLabelValues(labelValues...),
		c.metrics.all.WithLabelValues(labelValues...),
	)
	q := &Query{
		id:                 id,
		labelValues:        labelValues,
		compileLabelValues: compileLabelValues,
		state:              Created,
		c:                  c,
		results:            make(chan flux.Result),
		parentCtx:          parentCtx,
		parentSpan:         parentSpan,
		cancel:             cancel,
		doneCh:             make(chan struct{}),
	}

	// Hold the write lock for the rest of this method.
	c.queriesMu.Lock()
	defer c.queriesMu.Unlock()
	if c.shutdown {
		// The controller was shut down while the query was being built.
		err := &flux.Error{
			Code: codes.Unavailable,
			Msg:  "query controller shutdown",
		}
		q.setErr(err)
		return nil, err
	}
	c.queries[id] = q
	return q, nil
}
// nextID returns a fresh query ID by atomically incrementing lastID, so it
// is safe to call from multiple goroutines.
func (c *Controller) nextID() QueryID {
	return QueryID(atomic.AddUint64(&c.lastID, 1))
}
// countQueryRequest increments the requests counter for q. The label values
// are q's own label values followed by result.
func (c *Controller) countQueryRequest(q *Query, result requestsLabel) {
	labels := make([]string, 0, len(q.labelValues)+1)
	labels = append(labels, q.labelValues...)
	labels = append(labels, string(result))
	c.metrics.requests.WithLabelValues(labels...).Inc()
}
2019-06-10 17:23:12 +00:00
func ( c * Controller ) compileQuery ( q * Query , compiler flux . Compiler ) ( err error ) {
2019-12-06 15:05:16 +00:00
log := c . log . With ( influxlogger . TraceFields ( q . parentCtx ) ... )
2019-12-03 17:49:25 +00:00
2019-06-10 17:23:12 +00:00
defer func ( ) {
if e := recover ( ) ; e != nil {
var ok bool
err , ok = e . ( error )
if ! ok {
err = fmt . Errorf ( "panic: %v" , e )
}
2019-12-03 17:49:25 +00:00
if entry := log . Check ( zapcore . InfoLevel , "panic during compile" ) ; entry != nil {
2019-06-10 17:23:12 +00:00
entry . Stack = string ( debug . Stack ( ) )
entry . Write ( zap . Error ( err ) )
}
}
} ( )
2019-05-29 14:04:34 +00:00
ctx , ok := q . tryCompile ( )
if ! ok {
2019-07-18 16:43:15 +00:00
return & flux . Error {
Code : codes . Internal ,
Msg : "failed to transition query to compiling state" ,
}
2019-05-29 14:04:34 +00:00
}
2020-09-17 00:59:15 +00:00
prog , err := compiler . Compile ( ctx , runtime . Default )
2019-05-29 14:04:34 +00:00
if err != nil {
2019-07-18 16:43:15 +00:00
return & flux . Error {
Msg : "compilation failed" ,
Err : err ,
}
2019-05-29 14:04:34 +00:00
}
2019-09-19 15:01:17 +00:00
if p , ok := prog . ( lang . LoggingProgram ) ; ok {
2019-12-03 17:49:25 +00:00
p . SetLogger ( log )
2019-05-29 14:04:34 +00:00
}
q . program = prog
return nil
}
func ( c * Controller ) enqueueQuery ( q * Query ) error {
if _ , ok := q . tryQueue ( ) ; ! ok {
2019-07-18 16:43:15 +00:00
return & flux . Error {
Code : codes . Internal ,
Msg : "failed to transition query to queueing state" ,
}
2019-05-29 14:04:34 +00:00
}
select {
case c . queryQueue <- q :
default :
2019-07-18 16:43:15 +00:00
return & flux . Error {
Code : codes . ResourceExhausted ,
Msg : "queue length exceeded" ,
}
2019-05-29 14:04:34 +00:00
}
return nil
}
// processQueryQueue is the loop run by each executor goroutine started in
// New. It executes queued queries one at a time until done is closed.
func (c *Controller) processQueryQueue() {
	for {
		select {
		case q := <-c.queryQueue:
			c.executeQuery(q)
		case <-c.done:
			return
		}
	}
}
2019-10-09 22:27:10 +00:00
// executeQuery will execute a compiled program and wait for its completion.
2019-05-29 14:04:34 +00:00
func ( c * Controller ) executeQuery ( q * Query ) {
2019-12-03 17:49:25 +00:00
2019-10-09 22:27:10 +00:00
defer c . waitForQuery ( q )
2019-06-10 17:23:12 +00:00
defer func ( ) {
if e := recover ( ) ; e != nil {
var ok bool
err , ok := e . ( error )
if ! ok {
err = fmt . Errorf ( "panic: %v" , e )
}
q . setErr ( err )
2019-12-06 15:05:16 +00:00
if entry := c . log . With ( influxlogger . TraceFields ( q . parentCtx ) ... ) .
2019-12-03 17:49:25 +00:00
Check ( zapcore . InfoLevel , "panic during program start" ) ; entry != nil {
2019-06-10 17:23:12 +00:00
entry . Stack = string ( debug . Stack ( ) )
entry . Write ( zap . Error ( err ) )
}
}
} ( )
2019-05-29 14:04:34 +00:00
ctx , ok := q . tryExec ( )
if ! ok {
// This may happen if the query was cancelled (either because the
// client cancelled it, or because the controller is shutting down)
// In the case of cancellation, SetErr() should reset the error to an
// appropriate message.
2019-07-18 16:43:15 +00:00
q . setErr ( & flux . Error {
Code : codes . Internal ,
Msg : "impossible state transition" ,
} )
2020-03-20 23:01:15 +00:00
2019-05-29 14:04:34 +00:00
return
}
2019-10-09 22:27:10 +00:00
q . c . createAllocator ( q )
2020-03-25 17:03:38 +00:00
// Record unused memory before start.
q . recordUnusedMemory ( )
2019-05-29 14:04:34 +00:00
exec , err := q . program . Start ( ctx , q . alloc )
if err != nil {
q . setErr ( err )
return
}
q . exec = exec
q . pump ( exec , ctx . Done ( ) )
}
2019-10-09 22:27:10 +00:00
// waitForQuery blocks until q's done channel or the controller's done
// channel is closed, whichever happens first.
func (c *Controller) waitForQuery(q *Query) {
	select {
	case <-c.done:
	case <-q.doneCh:
	}
}
2019-05-29 14:04:34 +00:00
// finish removes q from the set of active queries. If the controller has
// been shut down and q was the last active query, done is closed.
func (c *Controller) finish(q *Query) {
	c.queriesMu.Lock()
	defer c.queriesMu.Unlock()

	delete(c.queries, q.id)
	if c.shutdown && len(c.queries) == 0 {
		close(c.done)
	}
}
// Queries returns a snapshot of the currently active queries, in no
// particular order.
func (c *Controller) Queries() []*Query {
	c.queriesMu.RLock()
	defer c.queriesMu.RUnlock()

	active := make([]*Query, 0, len(c.queries))
	for _, q := range c.queries {
		active = append(active, q)
	}
	return active
}
// Shutdown will signal to the Controller that it should not accept any
// new queries and that it should finish executing any existing queries.
// This will return once the Controller's run loop has been exited and all
// queries have been finished or until the Context has been canceled.
// Calling Shutdown more than once is safe.
func (c *Controller) Shutdown(ctx context.Context) error {
	// Mark the controller as shut down so it stops accepting new queries.
	// If no queries are active, finish will never run to close done, so it
	// is closed here under the lock. Otherwise the executor goroutines,
	// which only exit once done is closed, would leak and wg.Wait would
	// block forever. The !c.shutdown guard prevents a double close.
	c.queriesMu.Lock()
	if !c.shutdown {
		c.shutdown = true
		if len(c.queries) == 0 {
			close(c.done)
		}
	}
	c.queriesMu.Unlock()

	// Cancel all of the currently active queries.
	c.queriesMu.RLock()
	for _, q := range c.queries {
		q.Cancel()
	}
	c.queriesMu.RUnlock()

	// Wait for query processing goroutines to finish.
	defer c.wg.Wait()

	// Wait for all of the queries to be cleaned up or until the
	// context is done.
	select {
	case <-c.done:
		return nil
	case <-ctx.Done():
		c.abortOnce.Do(func() {
			close(c.abort)
		})
		return ctx.Err()
	}
}
2019-08-02 15:53:14 +00:00
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
2018-09-11 22:56:51 +00:00
func ( c * Controller ) PrometheusCollectors ( ) [ ] prometheus . Collector {
2019-08-02 15:53:14 +00:00
collectors := c . metrics . PrometheusCollectors ( )
2019-09-19 15:01:17 +00:00
for _ , dep := range c . dependencies {
if pc , ok := dep . ( prom . PrometheusCollector ) ; ok {
2019-08-02 15:53:14 +00:00
collectors = append ( collectors , pc . PrometheusCollectors ( ) ... )
}
}
return collectors
2018-09-11 22:56:51 +00:00
}
2018-11-26 17:43:09 +00:00
2020-03-31 08:52:55 +00:00
// GetUnusedMemoryBytes reports the number of bytes the controller's memory
// manager currently considers unused.
func (c *Controller) GetUnusedMemoryBytes() int64 {
	return c.memory.getUnusedMemoryBytes()
}

// GetUsedMemoryBytes reports MaxMemoryBytes minus the unused bytes.
// NOTE(review): when MaxMemoryBytes is 0 (unlimited) this yields 0 minus the
// unused count; confirm that is the intended value.
func (c *Controller) GetUsedMemoryBytes() int64 {
	limit := c.config.MaxMemoryBytes
	return limit - c.GetUnusedMemoryBytes()
}
2019-05-29 14:04:34 +00:00
// Query represents a single request.
type Query struct {
id QueryID
labelValues [ ] string
compileLabelValues [ ] string
c * Controller
// query state. The stateMu protects access for the group below.
stateMu sync . RWMutex
state State
err error
runtimeErrs [ ] error
cancel func ( )
parentCtx context . Context
2020-03-31 14:01:31 +00:00
parentSpan , currentSpan * tracing . Span
2019-05-29 14:04:34 +00:00
stats flux . Statistics
done sync . Once
doneCh chan struct { }
program flux . Program
exec flux . Query
results chan flux . Result
2019-10-09 22:27:10 +00:00
memoryManager * queryMemoryManager
alloc * memory . Allocator
2019-05-29 14:04:34 +00:00
}
2020-09-17 00:59:15 +00:00
// ProfilerResults returns the results of the profilers attached to the
// query's program, combined into a single profiler result.
//
// It returns (nil, nil) when the program is not a *lang.AstProgram (for
// example when compilation never produced one, or a different compiler was
// used) or when no profilers are attached. Previously a non-AST program
// caused an unchecked type assertion to panic.
func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
	p, ok := q.program.(*lang.AstProgram)
	if !ok || p == nil || len(p.Profilers) == 0 {
		return nil, nil
	}

	tables := make([]flux.Table, 0, len(p.Profilers))
	for _, profiler := range p.Profilers {
		result, err := profiler.GetResult(q, q.alloc)
		if err != nil {
			return nil, err
		}
		tables = append(tables, result)
	}
	res := table.NewProfilerResult(tables...)
	return flux.NewSliceResultIterator([]flux.Result{&res}), nil
}
2019-05-29 14:04:34 +00:00
// ID returns the ephemeral identifier the controller assigned to this
// query when it was created.
func (q *Query) ID() QueryID {
	id := q.id
	return id
}
// Cancel stops the query execution by cancelling the query's context, which
// signals that execution should be interrupted.
func (q *Query) Cancel() {
	stop := q.cancel
	stop()
}
// Results returns a channel that will deliver the query results.
//
// The channel may be closed before any results arrive, for example when the
// query or its context is canceled before results are computed. Errors during
// execution are not delivered on this channel; check Err() afterwards.
func (q *Query) Results() <-chan flux.Result {
	var out <-chan flux.Result = q.results
	return out
}
2020-03-25 17:03:38 +00:00
func ( q * Query ) recordUnusedMemory ( ) {
2020-03-31 08:52:55 +00:00
unused := q . c . GetUnusedMemoryBytes ( )
2020-03-25 17:03:38 +00:00
q . c . metrics . memoryUnused . WithLabelValues ( q . labelValues ... ) . Set ( float64 ( unused ) )
}
2019-05-29 14:04:34 +00:00
// Done signals to the Controller that this query is no longer
// being used and resources related to the query may be freed.
func ( q * Query ) Done ( ) {
// This must only be invoked once.
q . done . Do ( func ( ) {
// All done calls should block until the first done call succeeds.
defer close ( q . doneCh )
// Lock the state mutex and transition to the finished state.
// Then force the query to cancel to tell it to stop executing.
// We transition to the new state first so that we do not enter
// the canceled state at any point (as we have not been canceled).
q . stateMu . Lock ( )
q . transitionTo ( Finished )
q . cancel ( )
q . stateMu . Unlock ( )
// Ensure that all of the results have been drained.
// It is ok to read this as the user has already indicated they don't
// care about the results. When this is closed, it tells us an error has
// been set or the results have finished being pumped.
for range q . results {
// Do nothing with the results.
}
// No other goroutines should be modifying state at this point so we
// can do things that would be unsafe in another context.
if q . exec != nil {
// Mark the program as being done and copy out the error if it exists.
q . exec . Done ( )
if q . err == nil {
// TODO(jsternberg): The underlying program never returns
// this so maybe their interface should change?
q . err = q . exec . Err ( )
}
// Merge the metadata from the program into the controller stats.
stats := q . exec . Statistics ( )
q . stats . Metadata = stats . Metadata
}
// Retrieve the runtime errors that have been accumulated.
errMsgs := make ( [ ] string , 0 , len ( q . runtimeErrs ) )
for _ , e := range q . runtimeErrs {
errMsgs = append ( errMsgs , e . Error ( ) )
}
q . stats . RuntimeErrors = errMsgs
// Mark the query as finished so it is removed from the query map.
q . c . finish ( q )
2019-10-09 22:27:10 +00:00
// Release the additional memory associated with this query.
if q . memoryManager != nil {
q . memoryManager . Release ( )
2020-03-25 17:03:38 +00:00
// Record unused memory after finish.
q . recordUnusedMemory ( )
2019-10-09 22:27:10 +00:00
}
2020-03-25 17:03:38 +00:00
// Count query request.
2019-05-29 14:04:34 +00:00
if q . err != nil || len ( q . runtimeErrs ) > 0 {
q . c . countQueryRequest ( q , labelRuntimeError )
} else {
q . c . countQueryRequest ( q , labelSuccess )
}
2020-03-25 17:03:38 +00:00
2019-05-29 14:04:34 +00:00
} )
<- q . doneCh
}
// Statistics reports the statistics for the query.
//
// It should be called after Done, which records the total duration,
// program metadata and runtime errors. It does not block. MaxAllocated is
// filled in from the query's allocator when one was created.
func (q *Query) Statistics() flux.Statistics {
	snapshot := q.stats
	if alloc := q.alloc; alloc != nil {
		snapshot.MaxAllocated = alloc.MaxAllocated()
	}
	return snapshot
}
// State reports the current state of the query.
//
// A query that has not reached a finished state but whose context is done
// is reported as Canceled. The internal state is left untouched; the normal
// execution path performs the real transition.
func (q *Query) State() State {
	q.stateMu.RLock()
	defer q.stateMu.RUnlock()

	current := q.state
	if isFinishedState(current) {
		return current
	}
	select {
	case <-q.parentCtx.Done():
		return Canceled
	default:
		return current
	}
}
// transitionTo moves the query into newState and returns the context to use
// while in that state. If currentState is non-empty, the query must be in
// one of those states or the transition is refused (nil, false).
//
// A transition to a non-finished state is also refused once the query's
// context is done; the query is moved to Canceled instead. Leaving a state
// finishes its span and adds its duration to the matching statistic.
// Entering a finished state cancels the query and finishes the parent span.
// Only Compiling, Queueing and Executing get their own span; every other
// state returns the parent context.
//
// The caller must hold stateMu.
func (q *Query) transitionTo(newState State, currentState ...State) (context.Context, bool) {
	if !isFinishedState(newState) {
		select {
		case <-q.parentCtx.Done():
			_, _ = q.transitionTo(Canceled)
			return nil, false
		default:
		}
	}

	if len(currentState) > 0 {
		permitted := false
		for _, st := range currentState {
			if st == q.state {
				permitted = true
				break
			}
		}
		if !permitted {
			return nil, false
		}
	}

	// Close the span of the state being left and charge its duration.
	if span := q.currentSpan; span != nil {
		span.Finish()
		switch q.state {
		case Compiling:
			q.stats.CompileDuration += span.Duration
		case Queueing:
			q.stats.QueueDuration += span.Duration
		case Executing:
			q.stats.ExecuteDuration += span.Duration
		}
	}
	q.currentSpan = nil

	if isFinishedState(newState) {
		// Cancelling more than once is harmless, and a user who did not read
		// every table may otherwise leave the query running.
		q.cancel()
		if q.parentSpan != nil {
			q.parentSpan.Finish()
			q.stats.TotalDuration = q.parentSpan.Duration
			q.parentSpan = nil
		}
	}

	q.state = newState

	labelValues := q.labelValues
	var (
		dur   *prometheus.HistogramVec
		gauge *prometheus.GaugeVec
	)
	switch newState {
	case Compiling:
		dur, gauge = q.c.metrics.compilingDur, q.c.metrics.compiling
		labelValues = q.compileLabelValues
	case Queueing:
		dur, gauge = q.c.metrics.queueingDur, q.c.metrics.queueing
	case Executing:
		dur, gauge = q.c.metrics.executingDur, q.c.metrics.executing
	default:
		// Untracked state: no span of its own.
		return q.parentCtx, true
	}

	span, stateCtx := tracing.StartSpanFromContextWithPromMetrics(
		q.parentCtx,
		newState.String(),
		dur.WithLabelValues(labelValues...),
		gauge.WithLabelValues(labelValues...),
	)
	q.currentSpan = span
	return stateCtx, true
}
// Err reports any error the query may have encountered.
func ( q * Query ) Err ( ) error {
q . stateMu . Lock ( )
err := q . err
q . stateMu . Unlock ( )
2019-07-18 16:43:15 +00:00
return handleFluxError ( err )
2019-05-29 14:04:34 +00:00
}
// setErr marks this query with an error and closes the results channel to
// report that no results will be sent. setResults must therefore not be
// called after this.
//
// If the query's context is already done, the query moves to Canceled and
// the context's error is recorded instead of err, since err is most likely a
// consequence of the cancellation. Otherwise the query moves to Errored.
func (q *Query) setErr(err error) {
	q.stateMu.Lock()
	defer q.stateMu.Unlock()

	next := Errored
	select {
	case <-q.parentCtx.Done():
		next = Canceled
		err = q.parentCtx.Err()
	default:
	}
	q.transitionTo(next)
	q.err = err

	close(q.results)
}
// addRuntimeError records an error observed while the query's results were
// being consumed. It is safe to call concurrently with other state access.
func (q *Query) addRuntimeError(e error) {
	q.stateMu.Lock()
	q.runtimeErrs = append(q.runtimeErrs, e)
	q.stateMu.Unlock()
}
// pump will read from the executing query results and pump the
// results to our destination.
// When there are no more results, then this will close our own
// results channel.
//
// Each result is wrapped in an errorCollectingResult so table-iteration
// errors are recorded on q. pump returns when exec's results channel is
// closed or when the controller's abort channel is closed; in both cases
// q.results is closed by the deferred close.
//
// Parameters:
//   - exec: the running program whose results are forwarded.
//   - done: closed when the query's context is done; see signalCh below.
func (q *Query) pump(exec flux.Query, done <-chan struct{}) {
	defer close(q.results)

	// When our context is canceled, we need to propagate that cancel
	// signal down to the executing program just in case it is waiting
	// for a cancel signal and is ignoring the passed in context.
	// We want this signal to only be sent once and we want to continue
	// draining the results until the underlying program has actually
	// been finished so we copy this to a new channel and set it to
	// nil when it has been closed.
	signalCh := done
	for {
		select {
		case res, ok := <-exec.Results():
			if !ok {
				return
			}

			// It is possible for the underlying query to misbehave.
			// We have to continue pumping results even if this is the
			// case, but if the query has been canceled or finished with
			// done, nobody is going to read these values so we need
			// to avoid blocking.
			ecr := &errorCollectingResult{
				Result: res,
				q:      q,
			}
			select {
			case <-done:
				// Nobody will read; drop the result.
			case q.results <- ecr:
			}
		case <-signalCh:
			// Signal to the underlying executor that the query
			// has been canceled. Usually, the signal on the context
			// is likely enough, but this explicitly signals just in case.
			exec.Cancel()

			// Set the done channel to nil so we don't do this again
			// and we continue to drain the results. (Receiving from a
			// nil channel blocks forever, disabling this case.)
			signalCh = nil
		case <-q.c.abort:
			// If we get here, then any running queries should have been cancelled
			// in controller.Shutdown().
			return
		}
	}
}
// tryCompile attempts to move the query from Created to Compiling. On
// success it returns the context for the compiling phase and true.
func (q *Query) tryCompile() (context.Context, bool) {
	q.stateMu.Lock()
	defer q.stateMu.Unlock()

	ctx, ok := q.transitionTo(Compiling, Created)
	return ctx, ok
}
// tryQueue attempts to move the query from Compiling to Queueing. On
// success it returns the context for the queueing phase and true.
func (q *Query) tryQueue() (context.Context, bool) {
	q.stateMu.Lock()
	defer q.stateMu.Unlock()

	ctx, ok := q.transitionTo(Queueing, Compiling)
	return ctx, ok
}
// tryExec attempts to move the query from Queueing to Executing. On
// success it returns the context for the executing phase and true.
func (q *Query) tryExec() (context.Context, bool) {
	q.stateMu.Lock()
	defer q.stateMu.Unlock()

	ctx, ok := q.transitionTo(Executing, Queueing)
	return ctx, ok
}
// errorCollectingResult wraps a flux.Result so that errors returned while
// iterating its tables are recorded on q as runtime errors.
type errorCollectingResult struct {
	flux.Result
	q *Query
}

// Tables returns the wrapped result's table iterator, decorated so that its
// errors are collected on the query.
func (r *errorCollectingResult) Tables() flux.TableIterator {
	inner := r.Result.Tables()
	return &errorCollectingTableIterator{TableIterator: inner, q: r.q}
}
type errorCollectingTableIterator struct {
flux . TableIterator
q * Query
}
func ( ti * errorCollectingTableIterator ) Do ( f func ( t flux . Table ) error ) error {
err := ti . TableIterator . Do ( f )
if err != nil {
2019-08-20 17:20:40 +00:00
err = handleFluxError ( err )
2019-05-29 14:04:34 +00:00
ti . q . addRuntimeError ( err )
}
return err
}
// State is the query state.
type State int

// The iota order below defines each state's numeric value; do not reorder.
const (
	// Created indicates the query has been created.
	Created State = iota

	// Compiling indicates that the query is in the process
	// of executing the compiler associated with the query.
	Compiling

	// Queueing indicates the query is waiting inside of the
	// scheduler to be executed.
	Queueing

	// Executing indicates that the query is currently executing.
	Executing

	// Errored indicates that there was an error when attempting
	// to execute a query within any state inside of the controller.
	Errored

	// Finished indicates that the query has been marked as Done
	// and it is awaiting removal from the Controller or has already
	// been removed.
	Finished

	// Canceled indicates that the query was signaled to be
	// canceled. A canceled query must still be released with Done.
	Canceled
)

// String returns the lowercase name of s, or "unknown" for a value outside
// the declared states.
func (s State) String() string {
	names := [...]string{
		Created:   "created",
		Compiling: "compiling",
		Queueing:  "queueing",
		Executing: "executing",
		Errored:   "errored",
		Finished:  "finished",
		Canceled:  "canceled",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}

// isFinishedState reports whether state is terminal: Canceled, Errored or
// Finished.
func isFinishedState(state State) bool {
	return state == Canceled || state == Errored || state == Finished
}
2019-07-18 16:43:15 +00:00
// handleFluxError will take a flux.Error and convert it into an influxdb.Error.
// It will match certain codes to the equivalent in influxdb.
//
// If the error is any other type of error, it will return the error untouched.
//
// TODO(jsternberg): This likely becomes a public function, but this is just an initial
// implementation so playing it safe by making it package local for now.
func handleFluxError ( err error ) error {
ferr , ok := err . ( * flux . Error )
if ! ok {
return err
}
2019-08-27 15:44:46 +00:00
werr := handleFluxError ( ferr . Err )
2019-07-18 16:43:15 +00:00
code := influxdb . EInternal
2019-08-27 15:44:46 +00:00
switch ferr . Code {
case codes . Inherit :
// If we are inheriting the error code, influxdb doesn't
// have an equivalent of this so we need to retrieve
// the error code from the wrapped error which has already
// been translated to an influxdb error (if possible).
if werr != nil {
code = influxdb . ErrorCode ( werr )
}
2019-07-18 16:43:15 +00:00
case codes . NotFound :
code = influxdb . ENotFound
case codes . Invalid :
code = influxdb . EInvalid
// These don't really map correctly, but we want
// them to show up as 4XX so until influxdb error
// codes are updated for more types of failures,
// mapping these to invalid.
case codes . Canceled ,
codes . ResourceExhausted ,
codes . FailedPrecondition ,
codes . Aborted ,
codes . OutOfRange ,
codes . Unimplemented :
code = influxdb . EInvalid
case codes . PermissionDenied :
code = influxdb . EForbidden
case codes . Unauthenticated :
code = influxdb . EUnauthorized
default :
// Everything else is treated as an internal error
// which is set above.
}
return & influxdb . Error {
Code : code ,
Msg : ferr . Msg ,
2019-08-27 15:44:46 +00:00
Err : werr ,
2019-07-18 16:43:15 +00:00
}
}