fix(flux): add durations to Flux logging (#19697)

pull/19777/head
Christopher M. Wolff 2020-10-13 10:59:39 -07:00 committed by GitHub
parent 563e6c3d1a
commit 25fb1077e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 186 additions and 9 deletions

View File

@ -1,6 +1,10 @@
v1.8.4 [unreleased]
-------------------
### Bugfixes
- [#19696](https://github.com/influxdata/influxdb/pull/19697): fix(flux): add durations to Flux logging
v1.8.3 [2020-09-30]
-------------------

View File

@ -2,6 +2,7 @@ package control
import (
"context"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
@ -16,6 +17,59 @@ import (
type MetaClient = coordinator.MetaClient
type Authorizer = influxdb.Authorizer
// query is a wrapper that lets us accumulate statistics about
// how long it took to compile and execute a query.
type query struct {
flux.Query
// The time that this query was requested by invoking Controller.Query
requestStart time.Time
// The duration of compiling this query
compileDuration time.Duration
// The time this query began execution
execStart time.Time
stats flux.Statistics
}
func (q *query) Done() {
q.Query.Done()
q.stats = q.Query.Statistics()
q.stats.CompileDuration = q.compileDuration
q.stats.ExecuteDuration = time.Since(q.execStart)
q.stats.TotalDuration = time.Since(q.requestStart)
}
func (q *query) Statistics() flux.Statistics {
return q.stats
}
// program is a wrapper that lets us return a wrapped flux.Query
// that let's us accumulate statistics about how long it took to
// compile and execute a query.
type program struct {
flux.Program
requestStart time.Time
compileDuration time.Duration
}
func (p *program) Start(ctx context.Context, allocator *memory.Allocator) (flux.Query, error) {
start := time.Now()
q, err := p.Program.Start(ctx, allocator)
if err != nil {
return nil, err
}
return &query{
Query: q,
requestStart: p.requestStart,
compileDuration: p.compileDuration,
execStart: start,
}, nil
}
func NewController(mc MetaClient, reader influxdb.Reader, auth Authorizer, authEnabled bool, logger *zap.Logger) *Controller {
builtin.Initialize()
@ -36,22 +90,38 @@ type Controller struct {
}
func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
requestStart := time.Now()
for _, dep := range c.deps {
ctx = dep.Inject(ctx)
}
p, err := compiler.Compile(ctx)
p, err := c.compile(ctx, compiler, requestStart)
if err != nil {
return nil, err
}
if p, ok := p.(lang.LoggingProgram); ok {
p.SetLogger(c.logger)
}
alloc := &memory.Allocator{}
return p.Start(ctx, alloc)
}
func (c *Controller) compile(ctx context.Context, compiler flux.Compiler, requestStart time.Time) (flux.Program, error) {
start := time.Now()
p, err := compiler.Compile(ctx)
if err != nil {
return nil, err
}
p = &program{
Program: p,
requestStart: requestStart,
compileDuration: time.Since(start),
}
if p, ok := p.(lang.LoggingProgram); ok {
p.SetLogger(c.logger)
}
return p, nil
}
func (c *Controller) PrometheusCollectors() []prometheus.Collector {
return nil
}

View File

@ -0,0 +1,69 @@
package control_test
import (
"context"
"testing"
"github.com/influxdata/flux"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/mock"
"github.com/influxdata/influxdb/flux/control"
"github.com/influxdata/influxdb/internal"
imock "github.com/influxdata/influxdb/mock"
"go.uber.org/zap/zaptest"
)
func TestController_Query(t *testing.T) {
mc := &internal.MetaClientMock{}
reader := &imock.Reader{}
ctrl := control.NewController(mc, reader, nil, false, zaptest.NewLogger(t))
t.Run("stats", func(t *testing.T) {
ctx := context.Background()
compiler := &mock.Compiler{
Type: "mock",
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (*mock.Query, error) {
ch := make(chan flux.Result)
close(ch)
q := &mock.Query{
ResultsCh: ch,
}
q.SetStatistics(flux.Statistics{
MaxAllocated: 1025,
TotalAllocated: 2049,
})
return q, nil
},
}, nil
},
}
q, err := ctrl.Query(ctx, compiler)
if err != nil {
t.Fatal(err)
}
for range q.Results() {
}
q.Done()
gotStats := q.Statistics()
if w, g := int64(1025), gotStats.MaxAllocated; w != g {
t.Errorf("wanted %d max bytes allocated, got %d", w, g)
}
if w, g := int64(2049), gotStats.TotalAllocated; w != g {
t.Errorf("wanted %d total bytes allocated, got %d", w, g)
}
if g := gotStats.CompileDuration; g <= 0 {
t.Errorf("wanted compile duration to be greater than zero, got %d", g)
}
if g := gotStats.ExecuteDuration; g <= 0 {
t.Errorf("wanted execute duration to be greater than zero, got %d", g)
}
if g, w := gotStats.TotalDuration, gotStats.CompileDuration+gotStats.ExecuteDuration; g <= w {
t.Errorf("wanted total duration to be greater than or equal to %d, got %d", w, g)
}
})
}

37
mock/flux.go Normal file
View File

@ -0,0 +1,37 @@
package mock
import (
"context"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
)
// Reader is a mock implementation of flux/stdlib/influxdata/influxdb.Reader
type Reader struct {
ReadFilterFn func(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
CloseFn func()
}
func (m Reader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadFilterFn(ctx, spec, alloc)
}
func (m Reader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadGroupFn(ctx, spec, alloc)
}
func (m Reader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadTagKeysFn(ctx, spec, alloc)
}
func (m Reader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadTagValuesFn(ctx, spec, alloc)
}
func (m Reader) Close() {
m.CloseFn()
}

View File

@ -1442,12 +1442,9 @@ func (h *Handler) logFluxQuery(n int64, stats flux.Statistics, compiler flux.Com
zap.Error(err),
zap.Duration("stat_total_duration", stats.TotalDuration),
zap.Duration("stat_compile_duration", stats.CompileDuration),
zap.Duration("stat_queue_duration", stats.QueueDuration),
zap.Duration("stat_plan_duration", stats.PlanDuration),
zap.Duration("stat_requeue_duration", stats.RequeueDuration),
zap.Duration("stat_execute_duration", stats.ExecuteDuration),
zap.Int64("stat_max_allocated", stats.MaxAllocated),
zap.Int("stat_concurrency", stats.Concurrency),
zap.Int64("stat_total_allocated", stats.TotalAllocated),
)
}