feat(query): log panics with their stacktraces within the query executor
The logger is now threaded into the query controller, the executor, and the dispatcher so that panics can be logged. Panics are logged at the info level because they do not crash the system or make it unusable.
pull/10616/head
parent
769791d366
commit
1129552475
|
@ -535,12 +535,12 @@
|
|||
revision = "0b12d6b5"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:78867fbb25230fd7dacdc62eab84f78593d74adee03ce974038cfd8299615167"
|
||||
digest = "1:e788f8a9bd113b97bf37e03cb98be2df72dff19714ee17fdd887417c46bd46dd"
|
||||
name = "github.com/jsternberg/zap-logfmt"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "ac4bd917e18a4548ce6e0e765b29a4e7f397b0b6"
|
||||
version = "v1.0.0"
|
||||
revision = "9a5340c241253a529cbf359db5038e5b3549bd53"
|
||||
version = "v1.1.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:86228f0e4ff0f954420afeee30b44fedebc2ceebc78228e4f9dd0a4e0c93bf68"
|
||||
|
@ -864,7 +864,7 @@
|
|||
version = "v1.1.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:973def4d3d414173bbc236b72680b76ec9509a62433b1814fcfd70c0386e272f"
|
||||
digest = "1:ec84c73fae08866389c8ffb608073553cdbe0eea4c3236fddbe33a2f114e2453"
|
||||
name = "go.uber.org/zap"
|
||||
packages = [
|
||||
".",
|
||||
|
@ -872,7 +872,9 @@
|
|||
"internal/bufferpool",
|
||||
"internal/color",
|
||||
"internal/exit",
|
||||
"internal/ztest",
|
||||
"zapcore",
|
||||
"zaptest",
|
||||
"zaptest/observer",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
|
@ -1105,6 +1107,8 @@
|
|||
"github.com/uber/jaeger-client-go/log",
|
||||
"github.com/uber/jaeger-lib/metrics",
|
||||
"go.uber.org/zap",
|
||||
"go.uber.org/zap/zapcore",
|
||||
"go.uber.org/zap/zaptest",
|
||||
"go.uber.org/zap/zaptest/observer",
|
||||
"golang.org/x/net/context",
|
||||
"golang.org/x/oauth2",
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Controller provides a central location to manage all incoming queries.
|
||||
|
@ -54,6 +55,7 @@ type Controller struct {
|
|||
lplanner plan.LogicalPlanner
|
||||
pplanner plan.Planner
|
||||
executor execute.Executor
|
||||
logger *zap.Logger
|
||||
|
||||
maxConcurrency int
|
||||
availableConcurrency int
|
||||
|
@ -64,12 +66,17 @@ type Config struct {
|
|||
ConcurrencyQuota int
|
||||
MemoryBytesQuota int64
|
||||
ExecutorDependencies execute.Dependencies
|
||||
Logger *zap.Logger
|
||||
Verbose bool
|
||||
}
|
||||
|
||||
type QueryID uint64
|
||||
|
||||
func New(c Config) *Controller {
|
||||
logger := c.Logger
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
ctrl := &Controller{
|
||||
newQueries: make(chan *Query),
|
||||
queries: make(map[QueryID]*Query),
|
||||
|
@ -80,7 +87,8 @@ func New(c Config) *Controller {
|
|||
availableMemory: c.MemoryBytesQuota,
|
||||
lplanner: plan.NewLogicalPlanner(),
|
||||
pplanner: plan.NewPlanner(),
|
||||
executor: execute.NewExecutor(c.ExecutorDependencies),
|
||||
executor: execute.NewExecutor(c.ExecutorDependencies, logger),
|
||||
logger: logger,
|
||||
metrics: newControllerMetrics(),
|
||||
verbose: c.Verbose,
|
||||
}
|
||||
|
|
|
@ -5,6 +5,9 @@ import (
|
|||
"fmt"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// Dispatcher schedules work for a query.
|
||||
|
@ -31,14 +34,17 @@ type poolDispatcher struct {
|
|||
wg sync.WaitGroup
|
||||
err error
|
||||
errC chan error
|
||||
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func newPoolDispatcher(throughput int) *poolDispatcher {
|
||||
func newPoolDispatcher(throughput int, logger *zap.Logger) *poolDispatcher {
|
||||
return &poolDispatcher{
|
||||
throughput: throughput,
|
||||
work: make(chan ScheduleFunc, 100),
|
||||
closing: make(chan struct{}),
|
||||
errC: make(chan error, 1),
|
||||
logger: logger.With(zap.String("component", "dispatcher")),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,6 +71,10 @@ func (d *poolDispatcher) Start(n int, ctx context.Context) {
|
|||
err = fmt.Errorf("%v", e)
|
||||
}
|
||||
d.setErr(fmt.Errorf("panic: %v\n%s", err, debug.Stack()))
|
||||
if entry := d.logger.Check(zapcore.InfoLevel, "Dispatcher panic"); entry != nil {
|
||||
entry.Stack = string(debug.Stack())
|
||||
entry.Write(zap.Error(err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
d.run(ctx)
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/plan"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
type Executor interface {
|
||||
|
@ -18,12 +20,17 @@ type Executor interface {
|
|||
}
|
||||
|
||||
type executor struct {
|
||||
deps Dependencies
|
||||
deps Dependencies
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewExecutor(deps Dependencies) Executor {
|
||||
func NewExecutor(deps Dependencies, logger *zap.Logger) Executor {
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
e := &executor{
|
||||
deps: deps,
|
||||
deps: deps,
|
||||
logger: logger,
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
@ -58,6 +65,7 @@ type executionState struct {
|
|||
transports []Transport
|
||||
|
||||
dispatcher *poolDispatcher
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (e *executor) Execute(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *Allocator) (map[string]query.Result, error) {
|
||||
|
@ -65,6 +73,7 @@ func (e *executor) Execute(ctx context.Context, orgID platform.ID, p *plan.PlanS
|
|||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to initialize execute state")
|
||||
}
|
||||
es.logger = e.logger
|
||||
es.do(ctx)
|
||||
return es.results, nil
|
||||
}
|
||||
|
@ -90,7 +99,7 @@ func (e *executor) createExecutionState(ctx context.Context, orgID platform.ID,
|
|||
resources: p.Resources,
|
||||
results: make(map[string]query.Result, len(p.Results)),
|
||||
// TODO(nathanielc): Have the planner specify the dispatcher throughput
|
||||
dispatcher: newPoolDispatcher(10),
|
||||
dispatcher: newPoolDispatcher(10, e.logger),
|
||||
}
|
||||
nodes := make(map[plan.ProcedureID]Node, len(p.Procedures))
|
||||
for name, yield := range p.Results {
|
||||
|
@ -200,6 +209,10 @@ func (es *executionState) do(ctx context.Context) {
|
|||
err = fmt.Errorf("%v", e)
|
||||
}
|
||||
es.abort(fmt.Errorf("panic: %v\n%s", err, debug.Stack()))
|
||||
if entry := es.logger.Check(zapcore.InfoLevel, "Execute source panic"); entry != nil {
|
||||
entry.Stack = string(debug.Stack())
|
||||
entry.Write(zap.Error(err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
src.Run(ctx)
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/plan"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
var epoch = time.Unix(0, 0)
|
||||
|
@ -488,7 +489,7 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
exe := execute.NewExecutor(nil)
|
||||
exe := execute.NewExecutor(nil, zaptest.NewLogger(t))
|
||||
results, err := exe.Execute(context.Background(), orgID, tc.plan, executetest.UnlimitedAllocator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
Loading…
Reference in New Issue