feat(influxdb): add query/write http event recorders

feat(http): add prometheus counters for tracking write/query usage

feat(http/metric): add metric recoder for recording http metrics

feat(prometheus): implement metric.Recorder for prometheus metrics

fix(prometheus): remove erroneous fmt.Printlns

feat(http): add prometheus registry to API backend

This was done as exposing prometheus metrics to a higher level was quite
difficult. It was much simple to simply pass the registry down to
anything that needs it.

feat(cmd/influxd/launcher): pass prom registry in on api backend

feat(http): collect metrics for write and query endpoints

This was much messier than I would have preferred. Future work is
outlined in TODOs.

review(influxdb): rename metric.Metric to metric.Event
pull/13303/head
Michael Desa 2019-04-10 17:08:22 -04:00
parent 53810fadeb
commit e00c071c2c
No known key found for this signature in database
GPG Key ID: 87002651EC5DFFE6
9 changed files with 220 additions and 25 deletions

View File

@ -614,8 +614,12 @@ func (m *Launcher) run(ctx context.Context) (err error) {
LookupService: lookupSvc,
DocumentService: m.kvService,
OrgLookupService: m.kvService,
WriteEventRecorder: infprom.NewEventRecorder("write"),
QueryEventRecorder: infprom.NewEventRecorder("query"),
}
m.reg.MustRegister(m.apibackend.PrometheusCollectors()...)
// HTTP server
httpLogger := m.logger.With(zap.String("service", "http"))
platformHandler := http.NewPlatformHandler(m.apibackend)

View File

@ -7,8 +7,11 @@ import (
influxdb "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/authorizer"
"github.com/influxdata/influxdb/chronograf/server"
"github.com/influxdata/influxdb/http/metric"
"github.com/influxdata/influxdb/kit/prom"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/storage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -44,6 +47,9 @@ type APIBackend struct {
NewBucketService func(*influxdb.Source) (influxdb.BucketService, error)
NewQueryService func(*influxdb.Source) (query.ProxyQueryService, error)
WriteEventRecorder metric.EventRecorder
QueryEventRecorder metric.EventRecorder
PointsWriter storage.PointsWriter
AuthorizationService influxdb.AuthorizationService
BucketService influxdb.BucketService
@ -73,6 +79,21 @@ type APIBackend struct {
DocumentService influxdb.DocumentService
}
// PrometheusCollectors exposes the prometheus collectors associated with an APIBackend.
func (b *APIBackend) PrometheusCollectors() []prometheus.Collector {
var cs []prometheus.Collector
if pc, ok := b.WriteEventRecorder.(prom.PrometheusCollector); ok {
cs = append(cs, pc.PrometheusCollectors()...)
}
if pc, ok := b.QueryEventRecorder.(prom.PrometheusCollector); ok {
cs = append(cs, pc.PrometheusCollectors()...)
}
return cs
}
// NewAPIHandler constructs all api handlers beneath it and returns an APIHandler
func NewAPIHandler(b *APIBackend) *APIHandler {
h := &APIHandler{}

21
http/metric/recorder.go Normal file
View File

@ -0,0 +1,21 @@
package metric
import (
"context"
"github.com/influxdata/influxdb"
)
// EventRecorder records meta-data associated with http requests.
type EventRecorder interface {
Record(ctx context.Context, e Event)
}
// Event represents the meta data associated with an API request.
type Event struct {
OrgID influxdb.ID
Endpoint string
RequestBytes int
ResponseBytes int
Status int
}

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
@ -306,8 +307,9 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error
return qr, nil
}
func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.OrganizationService) (*QueryRequest, error) {
func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.OrganizationService) (*QueryRequest, int, error) {
var req QueryRequest
body := &countReader{Reader: r.Body}
var contentType = "application/json"
if ct := r.Header.Get("Content-Type"); ct != "" {
@ -315,41 +317,52 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.Organ
}
mt, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, err
return nil, body.bytesRead, err
}
switch mt {
case "application/vnd.flux":
body, err := ioutil.ReadAll(r.Body)
octets, err := ioutil.ReadAll(body)
if err != nil {
return nil, err
return nil, body.bytesRead, err
}
req.Query = string(body)
req.Query = string(octets)
case "application/json":
fallthrough
default:
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
if err := json.NewDecoder(body).Decode(&req); err != nil {
return nil, body.bytesRead, err
}
}
req = req.WithDefaults()
if err := req.Validate(); err != nil {
return nil, err
return nil, body.bytesRead, err
}
req.Org, err = queryOrganization(ctx, r, svc)
return &req, err
return &req, body.bytesRead, err
}
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth influxdb.Authorizer, svc influxdb.OrganizationService) (*query.ProxyRequest, error) {
req, err := decodeQueryRequest(ctx, r, svc)
type countReader struct {
bytesRead int
io.Reader
}
func (r *countReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.bytesRead += n
return n, err
}
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth influxdb.Authorizer, svc influxdb.OrganizationService) (*query.ProxyRequest, int, error) {
req, n, err := decodeQueryRequest(ctx, r, svc)
if err != nil {
return nil, err
return nil, n, err
}
pr, err := req.ProxyRequest()
if err != nil {
return nil, err
return nil, n, err
}
var token *influxdb.Authorization
@ -359,9 +372,9 @@ func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth influxdb
case *influxdb.Session:
token = a.EphemeralAuth(req.Org.ID)
default:
return pr, influxdb.ErrAuthorizerNotSupported
return pr, n, influxdb.ErrAuthorizerNotSupported
}
pr.Request.Authorization = token
return pr, nil
return pr, n, nil
}

View File

@ -19,12 +19,13 @@ import (
"github.com/influxdata/flux/parser"
platform "github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/http/metric"
"github.com/influxdata/influxdb/kit/check"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/query"
"github.com/julienschmidt/httprouter"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom "github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -35,7 +36,8 @@ const (
// FluxBackend is all services and associated parameters required to construct
// the FluxHandler.
type FluxBackend struct {
Logger *zap.Logger
Logger *zap.Logger
QueryEventRecorder metric.EventRecorder
OrganizationService platform.OrganizationService
ProxyQueryService query.ProxyQueryService
@ -44,7 +46,8 @@ type FluxBackend struct {
// NewFluxBackend returns a new instance of FluxBackend.
func NewFluxBackend(b *APIBackend) *FluxBackend {
return &FluxBackend{
Logger: b.Logger.With(zap.String("handler", "query")),
Logger: b.Logger.With(zap.String("handler", "query")),
QueryEventRecorder: b.QueryEventRecorder,
ProxyQueryService: b.FluxService,
OrganizationService: b.OrganizationService,
@ -60,6 +63,8 @@ type FluxHandler struct {
Now func() time.Time
OrganizationService platform.OrganizationService
ProxyQueryService query.ProxyQueryService
EventRecorder metric.EventRecorder
}
// NewFluxHandler returns a new handler at /api/v2/query for flux queries.
@ -71,6 +76,7 @@ func NewFluxHandler(b *FluxBackend) *FluxHandler {
ProxyQueryService: b.ProxyQueryService,
OrganizationService: b.OrganizationService,
EventRecorder: b.QueryEventRecorder,
}
h.HandlerFunc("POST", fluxPath, h.handleQuery)
@ -88,17 +94,34 @@ func (h *FluxHandler) handleQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// TODO(desa): I really don't like how we're recording the usage metrics here
// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
var orgID platform.ID
var requestBytes int
sw := newStatusResponseWriter(w)
w = sw
defer func() {
h.EventRecorder.Record(ctx, metric.Event{
OrgID: orgID,
Endpoint: r.URL.Path, // This should be sufficient for the time being as it should only be single endpoint.
RequestBytes: requestBytes,
ResponseBytes: sw.responseBytes,
Status: sw.code(),
})
}()
a, err := pcontext.GetAuthorizer(ctx)
if err != nil {
EncodeError(ctx, err, w)
return
}
req, err := decodeProxyQueryRequest(ctx, r, a, h.OrganizationService)
req, n, err := decodeProxyQueryRequest(ctx, r, a, h.OrganizationService)
if err != nil && err != platform.ErrAuthorizerNotSupported {
EncodeError(ctx, err, w)
return
}
requestBytes = n
// Transform the context into one with the request's authorization.
ctx = pcontext.SetAuthorizer(ctx, req.Request.Authorization)
@ -316,7 +339,7 @@ func (h *FluxHandler) getFluxSuggestion(w http.ResponseWriter, r *http.Request)
}
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
func (h *FluxHandler) PrometheusCollectors() []prometheus.Collector {
func (h *FluxHandler) PrometheusCollectors() []prom.Collector {
// TODO: gather and return relevant metrics.
return nil
}

View File

@ -478,7 +478,7 @@ func Test_decodeQueryRequest(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := decodeQueryRequest(tt.args.ctx, tt.args.r, tt.args.svc)
got, _, err := decodeQueryRequest(tt.args.ctx, tt.args.r, tt.args.svc)
if (err != nil) != tt.wantErr {
t.Errorf("decodeQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
return
@ -684,7 +684,7 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
cmpOptions := append(cmpOptions, cmpopts.IgnoreFields(lang.ASTCompiler{}, "Now"))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc)
got, _, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc)
if (err != nil) != tt.wantErr {
t.Errorf("decodeProxyQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
return

View File

@ -3,7 +3,8 @@ package http
import "net/http"
type statusResponseWriter struct {
statusCode int
statusCode int
responseBytes int
http.ResponseWriter
}
@ -13,6 +14,12 @@ func newStatusResponseWriter(w http.ResponseWriter) *statusResponseWriter {
}
}
func (w *statusResponseWriter) Write(b []byte) (int, error) {
n, err := w.ResponseWriter.Write(b)
w.responseBytes += n
return n, err
}
// WriteHeader writes the header and captures the status code.
func (w *statusResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
@ -28,6 +35,7 @@ func (w *statusResponseWriter) code() int {
}
return code
}
func (w *statusResponseWriter) statusCodeClass() string {
class := "XXX"
switch w.code() / 100 {

View File

@ -9,6 +9,7 @@ import (
"net/http"
"time"
"github.com/influxdata/influxdb/http/metric"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
@ -23,7 +24,8 @@ import (
// WriteBackend is all services and associated parameters required to construct
// the WriteHandler.
type WriteBackend struct {
Logger *zap.Logger
Logger *zap.Logger
WriteEventRecorder metric.EventRecorder
PointsWriter storage.PointsWriter
BucketService platform.BucketService
@ -33,7 +35,8 @@ type WriteBackend struct {
// NewWriteBackend returns a new instance of WriteBackend.
func NewWriteBackend(b *APIBackend) *WriteBackend {
return &WriteBackend{
Logger: b.Logger.With(zap.String("handler", "write")),
Logger: b.Logger.With(zap.String("handler", "write")),
WriteEventRecorder: b.WriteEventRecorder,
PointsWriter: b.PointsWriter,
BucketService: b.BucketService,
@ -51,6 +54,8 @@ type WriteHandler struct {
OrganizationService platform.OrganizationService
PointsWriter storage.PointsWriter
EventRecorder metric.EventRecorder
}
const (
@ -68,6 +73,7 @@ func NewWriteHandler(b *WriteBackend) *WriteHandler {
PointsWriter: b.PointsWriter,
BucketService: b.BucketService,
OrganizationService: b.OrganizationService,
EventRecorder: b.WriteEventRecorder,
}
h.HandlerFunc("POST", writePath, h.handleWrite)
@ -81,6 +87,22 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
defer r.Body.Close()
// TODO(desa): I really don't like how we're recording the usage metrics here
// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
var orgID platform.ID
var requestBytes int
sw := newStatusResponseWriter(w)
w = sw
defer func() {
h.EventRecorder.Record(ctx, metric.Event{
OrgID: orgID,
Endpoint: r.URL.Path, // This should be sufficient for the time being as it should only be single endpoint.
RequestBytes: requestBytes,
ResponseBytes: sw.responseBytes,
Status: sw.code(),
})
}()
in := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
var err error
@ -132,6 +154,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
org = o
}
orgID = org.ID
var bucket *platform.Bucket
if id, err := platform.IDFromString(req.Bucket); err == nil {
@ -198,6 +221,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
}, w)
return
}
requestBytes = len(data)
points, err := models.ParsePointsWithPrecision(data, time.Now(), req.Precision)
if err != nil {

View File

@ -0,0 +1,81 @@
package prometheus
import (
"context"
"fmt"
"github.com/influxdata/influxdb/http/metric"
"github.com/prometheus/client_golang/prometheus"
)
// EventRecorder implements http/metric.EventRecorder. It is used to collect
// http api metrics.
type EventRecorder struct {
count *prometheus.CounterVec
requestBytes *prometheus.CounterVec
responseBytes *prometheus.CounterVec
}
// NewEventRecorder returns an instance of a metric event recorder. Subsystem is expected to be
// descriptive of the type of metric being recorded. Possible values may include write, query,
// task, dashboard, etc.
//
// The general structure of the metrics produced from the metric recorder should be
//
// http_<subsystem>_request_count{org_id=<org_id>, status=<status>, endpoint=<endpoint>} ...
// http_<subsystem>_request_bytes{org_id=<org_id>, status=<status>, endpoint=<endpoint>} ...
// http_<subsystem>_response_bytes{org_id=<org_id>, status=<status>, endpoint=<endpoint>} ...
func NewEventRecorder(subsystem string) *EventRecorder {
const namespace = "http"
labels := []string{"org_id", "status", "endpoint"}
count := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_count",
Help: "Total number of query requests",
}, labels)
requestBytes := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_bytes",
Help: "Count of bytes received",
}, labels)
responseBytes := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "response_bytes",
Help: "Count of bytes returned",
}, labels)
return &EventRecorder{
count: count,
requestBytes: requestBytes,
responseBytes: responseBytes,
}
}
// Record metric records the request count, response bytes, and request bytes with labels
// for the org, endpoint, and status.
func (r *EventRecorder) Record(ctx context.Context, e metric.Event) {
labels := prometheus.Labels{
"org_id": e.OrgID.String(),
"endpoint": e.Endpoint,
"status": fmt.Sprintf("%d", e.Status),
}
r.count.With(labels).Inc()
r.requestBytes.With(labels).Add(float64(e.RequestBytes))
r.responseBytes.With(labels).Add(float64(e.ResponseBytes))
}
// PrometheusCollectors exposes the prometheus collectors associated with a metric recorder.
func (r *EventRecorder) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
r.count,
r.requestBytes,
r.responseBytes,
}
}