diff --git a/Gopkg.lock b/Gopkg.lock index 280baa4605..2145ec59df 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -535,12 +535,12 @@ revision = "0b12d6b5" [[projects]] - digest = "1:78867fbb25230fd7dacdc62eab84f78593d74adee03ce974038cfd8299615167" + digest = "1:e788f8a9bd113b97bf37e03cb98be2df72dff19714ee17fdd887417c46bd46dd" name = "github.com/jsternberg/zap-logfmt" packages = ["."] pruneopts = "UT" - revision = "ac4bd917e18a4548ce6e0e765b29a4e7f397b0b6" - version = "v1.0.0" + revision = "9a5340c241253a529cbf359db5038e5b3549bd53" + version = "v1.1.0" [[projects]] digest = "1:86228f0e4ff0f954420afeee30b44fedebc2ceebc78228e4f9dd0a4e0c93bf68" @@ -864,7 +864,7 @@ version = "v1.1.0" [[projects]] - digest = "1:973def4d3d414173bbc236b72680b76ec9509a62433b1814fcfd70c0386e272f" + digest = "1:ec84c73fae08866389c8ffb608073553cdbe0eea4c3236fddbe33a2f114e2453" name = "go.uber.org/zap" packages = [ ".", @@ -872,7 +872,9 @@ "internal/bufferpool", "internal/color", "internal/exit", + "internal/ztest", "zapcore", + "zaptest", "zaptest/observer", ] pruneopts = "UT" @@ -1105,6 +1107,8 @@ "github.com/uber/jaeger-client-go/log", "github.com/uber/jaeger-lib/metrics", "go.uber.org/zap", + "go.uber.org/zap/zapcore", + "go.uber.org/zap/zaptest", "go.uber.org/zap/zaptest/observer", "golang.org/x/net/context", "golang.org/x/oauth2", diff --git a/query/control/controller.go b/query/control/controller.go index fcb94c3533..ac868e280e 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -35,6 +35,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" ) // Controller provides a central location to manage all incoming queries. @@ -54,6 +55,7 @@ type Controller struct { lplanner plan.LogicalPlanner pplanner plan.Planner executor execute.Executor + logger *zap.Logger maxConcurrency int availableConcurrency int @@ -64,12 +66,17 @@ type Config struct { ConcurrencyQuota int MemoryBytesQuota int64 ExecutorDependencies execute.Dependencies + Logger *zap.Logger Verbose bool } type QueryID uint64 func New(c Config) *Controller { + logger := c.Logger + if logger == nil { + logger = zap.NewNop() + } ctrl := &Controller{ newQueries: make(chan *Query), queries: make(map[QueryID]*Query), @@ -80,7 +87,8 @@ func New(c Config) *Controller { availableMemory: c.MemoryBytesQuota, lplanner: plan.NewLogicalPlanner(), pplanner: plan.NewPlanner(), - executor: execute.NewExecutor(c.ExecutorDependencies), + executor: execute.NewExecutor(c.ExecutorDependencies, logger), + logger: logger, metrics: newControllerMetrics(), verbose: c.Verbose, } diff --git a/query/execute/dispatcher.go b/query/execute/dispatcher.go index 42f8169f2f..d061e02452 100644 --- a/query/execute/dispatcher.go +++ b/query/execute/dispatcher.go @@ -5,6 +5,9 @@ import ( "fmt" "runtime/debug" "sync" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // Dispatcher schedules work for a query. @@ -31,14 +34,17 @@ type poolDispatcher struct { wg sync.WaitGroup err error errC chan error + + logger *zap.Logger } -func newPoolDispatcher(throughput int) *poolDispatcher { +func newPoolDispatcher(throughput int, logger *zap.Logger) *poolDispatcher { return &poolDispatcher{ throughput: throughput, work: make(chan ScheduleFunc, 100), closing: make(chan struct{}), errC: make(chan error, 1), + logger: logger.With(zap.String("component", "dispatcher")), } } @@ -65,6 +71,10 @@ func (d *poolDispatcher) Start(n int, ctx context.Context) { err = fmt.Errorf("%v", e) } d.setErr(fmt.Errorf("panic: %v\n%s", err, debug.Stack())) + if entry := d.logger.Check(zapcore.InfoLevel, "Dispatcher panic"); entry != nil { + entry.Stack = string(debug.Stack()) + entry.Write(zap.Error(err)) + } } }() d.run(ctx) diff --git a/query/execute/executor.go b/query/execute/executor.go index c25b936dc8..db6f800075 100644 --- a/query/execute/executor.go +++ b/query/execute/executor.go @@ -11,6 +11,8 @@ import ( "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/plan" "github.com/pkg/errors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) type Executor interface { @@ -18,12 +20,17 @@ type Executor interface { } type executor struct { - deps Dependencies + deps Dependencies + logger *zap.Logger } -func NewExecutor(deps Dependencies) Executor { +func NewExecutor(deps Dependencies, logger *zap.Logger) Executor { + if logger == nil { + logger = zap.NewNop() + } e := &executor{ - deps: deps, + deps: deps, + logger: logger, } return e } @@ -58,6 +65,7 @@ type executionState struct { transports []Transport dispatcher *poolDispatcher + logger *zap.Logger } func (e *executor) Execute(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *Allocator) (map[string]query.Result, error) { @@ -65,6 +73,7 @@ func (e *executor) Execute(ctx context.Context, orgID platform.ID, p *plan.PlanS if err != nil { return nil, errors.Wrap(err, "failed to initialize execute state") } + es.logger = e.logger es.do(ctx) return es.results, nil } @@ -90,7 +99,7 @@ func (e *executor) createExecutionState(ctx context.Context, orgID platform.ID, resources: p.Resources, results: make(map[string]query.Result, len(p.Results)), // TODO(nathanielc): Have the planner specify the dispatcher throughput - dispatcher: newPoolDispatcher(10), + dispatcher: newPoolDispatcher(10, e.logger), } nodes := make(map[plan.ProcedureID]Node, len(p.Procedures)) for name, yield := range p.Results { @@ -200,6 +209,10 @@ func (es *executionState) do(ctx context.Context) { err = fmt.Errorf("%v", e) } es.abort(fmt.Errorf("panic: %v\n%s", err, debug.Stack())) + if entry := es.logger.Check(zapcore.InfoLevel, "Execute source panic"); entry != nil { + entry.Stack = string(debug.Stack()) + entry.Write(zap.Error(err)) + } } }() src.Run(ctx) diff --git a/query/execute/executor_test.go b/query/execute/executor_test.go index 4a5442a3c9..f94bee8066 100644 --- a/query/execute/executor_test.go +++ b/query/execute/executor_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/plan" uuid "github.com/satori/go.uuid" + "go.uber.org/zap/zaptest" ) var epoch = time.Unix(0, 0) @@ -488,7 +489,7 @@ func TestExecutor_Execute(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - exe := execute.NewExecutor(nil) + exe := execute.NewExecutor(nil, zaptest.NewLogger(t)) results, err := exe.Execute(context.Background(), orgID, tc.plan, executetest.UnlimitedAllocator) if err != nil { t.Fatal(err)