2018-09-11 22:56:51 +00:00
|
|
|
package query
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
2019-04-18 15:35:56 +00:00
|
|
|
"runtime/debug"
|
2018-09-11 22:56:51 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/influxdata/flux"
|
2019-04-09 22:14:11 +00:00
|
|
|
"github.com/influxdata/flux/iocounter"
|
2020-04-03 17:39:20 +00:00
|
|
|
"github.com/influxdata/influxdb/v2/kit/check"
|
|
|
|
"github.com/influxdata/influxdb/v2/kit/tracing"
|
2019-04-18 15:35:56 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
"go.uber.org/zap/zapcore"
|
2018-09-11 22:56:51 +00:00
|
|
|
)
|
|
|
|
|
2019-04-09 22:14:11 +00:00
|
|
|
// LoggingProxyQueryService wraps a ProxyQueryService and logs the queries.
|
|
|
|
type LoggingProxyQueryService struct {
|
2019-12-04 23:10:23 +00:00
|
|
|
proxyQueryService ProxyQueryService
|
|
|
|
queryLogger Logger
|
|
|
|
nowFunction func() time.Time
|
|
|
|
log *zap.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewLoggingProxyQueryService(log *zap.Logger, queryLogger Logger, proxyQueryService ProxyQueryService) *LoggingProxyQueryService {
|
|
|
|
return &LoggingProxyQueryService{
|
|
|
|
proxyQueryService: proxyQueryService,
|
|
|
|
queryLogger: queryLogger,
|
|
|
|
nowFunction: time.Now,
|
|
|
|
log: log,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *LoggingProxyQueryService) SetNowFunctionForTesting(nowFunction func() time.Time) {
|
|
|
|
s.nowFunction = nowFunction
|
2018-09-11 22:56:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Query executes and logs the query.
|
2019-04-09 22:14:11 +00:00
|
|
|
func (s *LoggingProxyQueryService) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
|
2019-03-06 00:18:04 +00:00
|
|
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
2019-03-05 00:38:10 +00:00
|
|
|
defer span.Finish()
|
|
|
|
|
2019-03-07 15:32:13 +00:00
|
|
|
var n int64
|
2018-09-11 22:56:51 +00:00
|
|
|
defer func() {
|
2019-04-18 15:35:56 +00:00
|
|
|
if r := recover(); r != nil {
|
2018-09-11 22:56:51 +00:00
|
|
|
err = fmt.Errorf("panic: %v", r)
|
2019-12-04 23:10:23 +00:00
|
|
|
if entry := s.log.Check(zapcore.InfoLevel, "QueryLogging panic"); entry != nil {
|
2019-04-18 15:35:56 +00:00
|
|
|
entry.Stack = string(debug.Stack())
|
|
|
|
entry.Write(zap.Error(err))
|
|
|
|
}
|
2018-09-11 22:56:51 +00:00
|
|
|
}
|
2019-12-18 15:30:38 +00:00
|
|
|
traceID, sampled, _ := tracing.InfoFromContext(ctx)
|
2018-09-11 22:56:51 +00:00
|
|
|
log := Log{
|
|
|
|
OrganizationID: req.Request.OrganizationID,
|
2019-12-05 14:59:41 +00:00
|
|
|
TraceID: traceID,
|
|
|
|
Sampled: sampled,
|
2018-09-11 22:56:51 +00:00
|
|
|
ProxyRequest: req,
|
|
|
|
ResponseSize: n,
|
2019-12-04 23:10:23 +00:00
|
|
|
Time: s.nowFunction(),
|
2018-09-11 22:56:51 +00:00
|
|
|
Statistics: stats,
|
2019-04-09 22:14:11 +00:00
|
|
|
Error: err,
|
2018-09-11 22:56:51 +00:00
|
|
|
}
|
2019-12-04 23:10:23 +00:00
|
|
|
s.queryLogger.Log(log)
|
2018-09-11 22:56:51 +00:00
|
|
|
}()
|
|
|
|
|
2019-04-09 22:14:11 +00:00
|
|
|
wc := &iocounter.Writer{Writer: w}
|
2019-12-04 23:10:23 +00:00
|
|
|
stats, err = s.proxyQueryService.Query(ctx, wc, req)
|
2018-09-11 22:56:51 +00:00
|
|
|
if err != nil {
|
2019-03-05 00:38:10 +00:00
|
|
|
return stats, tracing.LogError(span, err)
|
2018-09-11 22:56:51 +00:00
|
|
|
}
|
2019-04-09 22:14:11 +00:00
|
|
|
n = wc.Count()
|
2019-03-05 00:38:10 +00:00
|
|
|
return stats, nil
|
2018-09-11 22:56:51 +00:00
|
|
|
}
|
2019-03-26 03:05:44 +00:00
|
|
|
|
2019-04-09 22:14:11 +00:00
|
|
|
func (s *LoggingProxyQueryService) Check(ctx context.Context) check.Response {
|
2019-12-04 23:10:23 +00:00
|
|
|
return s.proxyQueryService.Check(ctx)
|
2019-03-26 03:05:44 +00:00
|
|
|
}
|