feat(flux): Add support for optionally logging Flux queries

New configuration:

```toml
[http]
flux-log-enabled = true
```

will produce log entries similar to:

```
ts=2019-01-11T14:47:46.124347Z lvl=info msg="Executed Flux query" log_id=0CwLysq0000 service=httpd compiler_type=flux response_size=1467 query="from(bucket:\"test\") |> range(start: -5000h) |> limit(n:5)" stat_total_duration=3.949ms stat_compile_duration=3.183ms stat_queue_duration=0.026ms stat_plan_duration=0.055ms stat_requeue_duration=0.000ms stat_execute_duration=0.668ms stat_max_allocated=3200 stat_concurrency=1
```
pull/11001/head
Stuart Carnie 2019-01-11 07:48:02 -07:00
parent a62e7864bb
commit c47a3ea2af
3 changed files with 53 additions and 2 deletions

View File

@ -235,6 +235,9 @@
# Determines whether the Flux query endpoint is enabled.
# flux-enabled = false
# Determines whether the Flux query logging is enabled.
# flux-log-enabled = false
# The bind address used by the HTTP service.
# bind-address = ":8086"

View File

@ -39,6 +39,7 @@ type Config struct {
SuppressWriteLog bool `toml:"suppress-write-log"`
WriteTracing bool `toml:"write-tracing"`
FluxEnabled bool `toml:"flux-enabled"`
FluxLogEnabled bool `toml:"flux-log-enabled"`
PprofEnabled bool `toml:"pprof-enabled"`
DebugPprofEnabled bool `toml:"debug-pprof-enabled"`
HTTPSEnabled bool `toml:"https-enabled"`
@ -66,6 +67,7 @@ func NewConfig() Config {
return Config{
Enabled: true,
FluxEnabled: false,
FluxLogEnabled: false,
BindAddress: DefaultBindAddress,
LogEnabled: true,
PprofEnabled: true,

View File

@ -25,6 +25,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
@ -1177,6 +1178,18 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
}
pr := req.ProxyRequest()
// Logging
var (
stats flux.Statistics
n int64
)
if h.Config.FluxLogEnabled {
defer func() {
h.logFluxQuery(n, stats, pr.Compiler, err)
}()
}
q, err := h.Controller.Query(ctx, pr.Compiler)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
@ -1206,9 +1219,17 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
}
encoder := pr.Dialect.Encoder()
results := flux.NewResultIteratorFromQuery(q)
n, err := encoder.Encode(w, results)
if h.Config.FluxLogEnabled {
if s, ok := results.(flux.Statisticser); ok {
defer func() {
stats = s.Statistics()
}()
}
}
defer results.Release()
n, err = encoder.Encode(w, results)
if err != nil {
results.Release()
if n == 0 {
// If the encoder did not write anything, we can write an error header.
h.httpError(w, err.Error(), http.StatusInternalServerError)
@ -1217,6 +1238,31 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
}
}
func (h *Handler) logFluxQuery(n int64, stats flux.Statistics, compiler flux.Compiler, err error) {
var q string
switch c := compiler.(type) {
case lang.SpecCompiler:
q = fmt.Sprint(flux.Formatted(c.Spec))
case lang.FluxCompiler:
q = c.Query
}
h.Logger.Info("Executed Flux query",
zap.String("compiler_type", string(compiler.CompilerType())),
zap.Int64("response_size", n),
zap.String("query", q),
zap.Error(err),
zap.Duration("stat_total_duration", stats.TotalDuration),
zap.Duration("stat_compile_duration", stats.CompileDuration),
zap.Duration("stat_queue_duration", stats.QueueDuration),
zap.Duration("stat_plan_duration", stats.PlanDuration),
zap.Duration("stat_requeue_duration", stats.RequeueDuration),
zap.Duration("stat_execute_duration", stats.ExecuteDuration),
zap.Int64("stat_max_allocated", stats.MaxAllocated),
zap.Int("stat_concurrency", stats.Concurrency),
)
}
// serveExpvar serves internal metrics in /debug/vars format over HTTP.
func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
// Retrieve statistics from the monitor.