fix(query): change logging bridge to be a service instead
It is no longer necessary for the query logging to be a bridge as the stats are available for consumption from the ProxyQueryService. This change changes the logging bridge to directly implement the proxy query service instead of implementing a bridge.pull/13332/head
parent
9a5126d29a
commit
f37e65f26c
|
@ -12,8 +12,8 @@ import (
|
|||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/mock"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/query/mock"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -34,7 +34,7 @@ func TestProxyQueryService_Query(t *testing.T) {
|
|||
t.Fatalf("error adding dialect mappings: %v", err)
|
||||
}
|
||||
h.ProxyQueryService = &mock.ProxyQueryService{
|
||||
QueryFn: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||
QueryF: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||
if _, err := io.WriteString(w, "boo"); err != nil {
|
||||
return flux.Statistics{}, err
|
||||
}
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/influxdb/kit/check"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
)
|
||||
|
||||
var _ query.ProxyQueryService = (*ProxyQueryService)(nil)
|
||||
|
||||
// ProxyQueryService is a mock implementation of a query.ProxyQueryService.
|
||||
type ProxyQueryService struct {
|
||||
QueryFn func(context.Context, io.Writer, *query.ProxyRequest) (flux.Statistics, error)
|
||||
}
|
||||
|
||||
// NewProxyQueryService returns a mock of ProxyQueryService where its methods will return zero values.
|
||||
func NewProxyQueryService() *ProxyQueryService {
|
||||
return &ProxyQueryService{
|
||||
QueryFn: func(context.Context, io.Writer, *query.ProxyRequest) (flux.Statistics, error) {
|
||||
return flux.Statistics{}, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Query performs the requested query and encodes the results into w.
|
||||
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||
return s.QueryFn(ctx, w, req)
|
||||
}
|
||||
|
||||
func (*ProxyQueryService) Check(ctx context.Context) check.Response {
|
||||
return check.Response{Name: "Mock Query Service", Status: check.StatusPass}
|
||||
}
|
|
@ -7,18 +7,20 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/iocounter"
|
||||
"github.com/influxdata/influxdb/kit/check"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
)
|
||||
|
||||
// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface.
|
||||
type LoggingServiceBridge struct {
|
||||
QueryService QueryService
|
||||
QueryLogger Logger
|
||||
// LoggingProxyQueryService wraps a ProxyQueryService and logs the queries.
|
||||
type LoggingProxyQueryService struct {
|
||||
ProxyQueryService ProxyQueryService
|
||||
QueryLogger Logger
|
||||
NowFunction func() time.Time
|
||||
}
|
||||
|
||||
// Query executes and logs the query.
|
||||
func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
|
||||
func (s *LoggingProxyQueryService) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -28,42 +30,32 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox
|
|||
if r != nil {
|
||||
err = fmt.Errorf("panic: %v", r)
|
||||
}
|
||||
var now time.Time
|
||||
if s.NowFunction != nil {
|
||||
now = s.NowFunction()
|
||||
} else {
|
||||
now = time.Now()
|
||||
}
|
||||
log := Log{
|
||||
OrganizationID: req.Request.OrganizationID,
|
||||
ProxyRequest: req,
|
||||
ResponseSize: n,
|
||||
Time: time.Now(),
|
||||
Time: now,
|
||||
Statistics: stats,
|
||||
}
|
||||
if err != nil {
|
||||
log.Error = err
|
||||
Error: err,
|
||||
}
|
||||
s.QueryLogger.Log(log)
|
||||
}()
|
||||
|
||||
results, err := s.QueryService.Query(ctx, &req.Request)
|
||||
wc := &iocounter.Writer{Writer: w}
|
||||
stats, err = s.ProxyQueryService.Query(ctx, wc, req)
|
||||
if err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
}
|
||||
// Check if this result iterator reports stats. We call this defer before cancel because
|
||||
// the query needs to be finished before it will have valid statistics.
|
||||
defer func() {
|
||||
results.Release()
|
||||
stats = results.Statistics()
|
||||
}()
|
||||
|
||||
encoder := req.Dialect.Encoder()
|
||||
n, err = encoder.Encode(w, results)
|
||||
if err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
}
|
||||
// The results iterator may have had an error independent of encoding errors.
|
||||
if err = results.Err(); err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
}
|
||||
n = wc.Count()
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (s *LoggingServiceBridge) Check(ctx context.Context) check.Response {
|
||||
return s.QueryService.Check(ctx)
|
||||
func (s *LoggingProxyQueryService) Check(ctx context.Context) check.Response {
|
||||
return s.ProxyQueryService.Check(ctx)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
package query_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/influxdata/flux"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/query/mock"
|
||||
)
|
||||
|
||||
var orgID = MustIDBase16("ba55ba55ba55ba55")
|
||||
|
||||
// MustIDBase16 is an helper to ensure a correct ID is built during testing.
|
||||
func MustIDBase16(s string) platform.ID {
|
||||
id, err := platform.IDFromString(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return *id
|
||||
}
|
||||
|
||||
var opts = []cmp.Option{
|
||||
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
|
||||
cmpopts.IgnoreUnexported(query.Request{}),
|
||||
}
|
||||
|
||||
func TestLoggingProxyQueryService(t *testing.T) {
|
||||
wantStats := flux.Statistics{
|
||||
TotalDuration: time.Second,
|
||||
CompileDuration: time.Second,
|
||||
QueueDuration: time.Second,
|
||||
PlanDuration: time.Second,
|
||||
RequeueDuration: time.Second,
|
||||
ExecuteDuration: time.Second,
|
||||
Concurrency: 2,
|
||||
MaxAllocated: 2048,
|
||||
}
|
||||
wantBytes := 10
|
||||
pqs := &mock.ProxyQueryService{
|
||||
QueryF: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||
w.Write(make([]byte, wantBytes))
|
||||
return wantStats, nil
|
||||
},
|
||||
}
|
||||
var logs []query.Log
|
||||
logger := &mock.QueryLogger{
|
||||
LogFn: func(l query.Log) error {
|
||||
logs = append(logs, l)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
wantTime := time.Now()
|
||||
lpqs := query.LoggingProxyQueryService{
|
||||
ProxyQueryService: pqs,
|
||||
QueryLogger: logger,
|
||||
NowFunction: func() time.Time {
|
||||
return wantTime
|
||||
},
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
req := &query.ProxyRequest{
|
||||
Request: query.Request{
|
||||
Authorization: nil,
|
||||
OrganizationID: orgID,
|
||||
Compiler: nil,
|
||||
},
|
||||
Dialect: nil,
|
||||
}
|
||||
stats, err := lpqs.Query(context.Background(), &buf, req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !cmp.Equal(wantStats, stats, opts...) {
|
||||
t.Errorf("unexpected query stats: -want/+got\n%s", cmp.Diff(wantStats, stats, opts...))
|
||||
}
|
||||
wantLogs := []query.Log{{
|
||||
Time: wantTime,
|
||||
OrganizationID: orgID,
|
||||
Error: nil,
|
||||
ProxyRequest: req,
|
||||
ResponseSize: int64(wantBytes),
|
||||
Statistics: wantStats,
|
||||
}}
|
||||
if !cmp.Equal(wantLogs, logs, opts...) {
|
||||
t.Errorf("unexpected query logs: -want/+got\n%s", cmp.Diff(wantLogs, logs, opts...))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package mock
|
||||
|
||||
import "github.com/influxdata/influxdb/query"
|
||||
|
||||
var _ query.Logger = (*QueryLogger)(nil)
|
||||
|
||||
type QueryLogger struct {
|
||||
LogFn func(query.Log) error
|
||||
}
|
||||
|
||||
func (l *QueryLogger) Log(log query.Log) error {
|
||||
return l.LogFn(log)
|
||||
}
|
Loading…
Reference in New Issue