parent
89d6f9ea3e
commit
42bb664aaf
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/errors"
|
||||
"github.com/influxdata/influxdb/kit/prom"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
|
@ -421,9 +422,15 @@ func (c *Controller) Shutdown(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
|
||||
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
|
||||
func (c *Controller) PrometheusCollectors() []prometheus.Collector {
|
||||
return c.metrics.PrometheusCollectors()
|
||||
collectors := c.metrics.PrometheusCollectors()
|
||||
for _, v := range c.dependencies {
|
||||
if pc, ok := v.(prom.PrometheusCollector); ok {
|
||||
collectors = append(collectors, pc.PrometheusCollectors()...)
|
||||
}
|
||||
}
|
||||
return collectors
|
||||
}
|
||||
|
||||
// Query represents a single request.
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux/execute"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
orgLabel = "org"
|
||||
opLabel = "op"
|
||||
)
|
||||
|
||||
type metrics struct {
|
||||
ctxLabelKeys []string
|
||||
requestDur *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
// NewMetrics produces a new metrics objects for an influxdb source.
|
||||
// Currently it just collects the duration of read requests into a histogram.
|
||||
// ctxLabelKeys is a list of labels to add to the produced metrics.
|
||||
// The value for a given key will be read off the context.
|
||||
// The context value must be a string or an implementation of the Stringer interface.
|
||||
// In addition, produced metrics will be labeled with the orgID and type of operation requested.
|
||||
func NewMetrics(ctxLabelKeys []string) *metrics {
|
||||
labelKeys := make([]string, len(ctxLabelKeys)+2)
|
||||
copy(labelKeys, ctxLabelKeys)
|
||||
labelKeys[len(labelKeys)-2] = orgLabel
|
||||
labelKeys[len(labelKeys)-1] = opLabel
|
||||
|
||||
m := new(metrics)
|
||||
m.requestDur = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "query",
|
||||
Subsystem: "influxdb_source",
|
||||
Name: "read_request_duration_seconds",
|
||||
Help: "Histogram of times spent in read requests",
|
||||
Buckets: prometheus.ExponentialBuckets(1e-3, 5, 7),
|
||||
}, labelKeys)
|
||||
m.ctxLabelKeys = ctxLabelKeys
|
||||
return m
|
||||
}
|
||||
|
||||
// PrometheusCollectors satisfies the PrometheusCollector interface.
|
||||
func (m *metrics) PrometheusCollectors() []prometheus.Collector {
|
||||
if m == nil {
|
||||
// if metrics happens to be nil here (such as for a test), then let's not panic.
|
||||
return nil
|
||||
}
|
||||
return []prometheus.Collector{
|
||||
m.requestDur,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *metrics) getLabelValues(ctx context.Context, orgID platform.ID, op string) []string {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
labelValues := make([]string, len(m.ctxLabelKeys)+2)
|
||||
for i, k := range m.ctxLabelKeys {
|
||||
value := ctx.Value(k)
|
||||
var str string
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
str = v
|
||||
case fmt.Stringer:
|
||||
str = v.String()
|
||||
}
|
||||
labelValues[i] = str
|
||||
}
|
||||
labelValues[len(labelValues)-2] = orgID.String()
|
||||
labelValues[len(labelValues)-1] = op
|
||||
return labelValues
|
||||
}
|
||||
|
||||
func (m *metrics) recordMetrics(labelValues []string, start time.Time) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.requestDur.WithLabelValues(labelValues...).Observe(time.Since(start).Seconds())
|
||||
}
|
||||
|
||||
func getMetricsFromDependencies(depsMap execute.Dependencies) *metrics {
|
||||
return depsMap[FromKind].(Dependencies).Metrics
|
||||
}
|
|
@ -3,6 +3,7 @@ package influxdb
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/codes"
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/flux/plan"
|
||||
"github.com/influxdata/flux/semantic"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
|
@ -34,10 +36,17 @@ type Source struct {
|
|||
stats cursors.CursorStats
|
||||
|
||||
runner runner
|
||||
|
||||
m *metrics
|
||||
orgID platform.ID
|
||||
op string
|
||||
}
|
||||
|
||||
func (s *Source) Run(ctx context.Context) {
|
||||
labelValues := s.m.getLabelValues(ctx, s.orgID, s.op)
|
||||
start := time.Now()
|
||||
err := s.runner.run(ctx)
|
||||
s.m.recordMetrics(labelValues, start)
|
||||
for _, t := range s.ts {
|
||||
t.Finish(s.id, err)
|
||||
}
|
||||
|
@ -105,15 +114,19 @@ type readFilterSource struct {
|
|||
readSpec ReadFilterSpec
|
||||
}
|
||||
|
||||
func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, alloc *memory.Allocator) execute.Source {
|
||||
func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, a execute.Administration) execute.Source {
|
||||
src := new(readFilterSource)
|
||||
|
||||
src.id = id
|
||||
src.alloc = alloc
|
||||
src.alloc = a.Allocator()
|
||||
|
||||
src.reader = r
|
||||
src.readSpec = readSpec
|
||||
|
||||
src.m = getMetricsFromDependencies(a.Dependencies())
|
||||
src.orgID = readSpec.OrganizationID
|
||||
src.op = "readFilter"
|
||||
|
||||
src.runner = src
|
||||
return src
|
||||
}
|
||||
|
@ -174,7 +187,7 @@ func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execut
|
|||
Bounds: *bounds,
|
||||
Predicate: filter,
|
||||
},
|
||||
a.Allocator(),
|
||||
a,
|
||||
), nil
|
||||
}
|
||||
|
||||
|
@ -184,15 +197,19 @@ type readGroupSource struct {
|
|||
readSpec ReadGroupSpec
|
||||
}
|
||||
|
||||
func ReadGroupSource(id execute.DatasetID, r Reader, readSpec ReadGroupSpec, alloc *memory.Allocator) execute.Source {
|
||||
func ReadGroupSource(id execute.DatasetID, r Reader, readSpec ReadGroupSpec, a execute.Administration) execute.Source {
|
||||
src := new(readGroupSource)
|
||||
|
||||
src.id = id
|
||||
src.alloc = alloc
|
||||
src.alloc = a.Allocator()
|
||||
|
||||
src.reader = r
|
||||
src.readSpec = readSpec
|
||||
|
||||
src.m = getMetricsFromDependencies(a.Dependencies())
|
||||
src.orgID = readSpec.OrganizationID
|
||||
src.op = "readGroup"
|
||||
|
||||
src.runner = src
|
||||
return src
|
||||
}
|
||||
|
@ -252,7 +269,7 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute
|
|||
GroupKeys: spec.GroupKeys,
|
||||
AggregateMethod: spec.AggregateMethod,
|
||||
},
|
||||
a.Allocator(),
|
||||
a,
|
||||
), nil
|
||||
}
|
||||
|
||||
|
@ -290,7 +307,7 @@ func createReadTagKeysSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID,
|
|||
Predicate: filter,
|
||||
},
|
||||
},
|
||||
a.Allocator(),
|
||||
a,
|
||||
), nil
|
||||
}
|
||||
|
||||
|
@ -301,13 +318,18 @@ type readTagKeysSource struct {
|
|||
readSpec ReadTagKeysSpec
|
||||
}
|
||||
|
||||
func ReadTagKeysSource(id execute.DatasetID, r Reader, readSpec ReadTagKeysSpec, alloc *memory.Allocator) execute.Source {
|
||||
func ReadTagKeysSource(id execute.DatasetID, r Reader, readSpec ReadTagKeysSpec, a execute.Administration) execute.Source {
|
||||
src := &readTagKeysSource{
|
||||
reader: r,
|
||||
readSpec: readSpec,
|
||||
}
|
||||
src.id = id
|
||||
src.alloc = alloc
|
||||
src.alloc = a.Allocator()
|
||||
|
||||
src.m = getMetricsFromDependencies(a.Dependencies())
|
||||
src.orgID = readSpec.OrganizationID
|
||||
src.op = "readTagKeys"
|
||||
|
||||
src.runner = src
|
||||
return src
|
||||
}
|
||||
|
@ -355,7 +377,7 @@ func createReadTagValuesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID
|
|||
},
|
||||
TagKey: spec.TagKey,
|
||||
},
|
||||
a.Allocator(),
|
||||
a,
|
||||
), nil
|
||||
}
|
||||
|
||||
|
@ -366,13 +388,18 @@ type readTagValuesSource struct {
|
|||
readSpec ReadTagValuesSpec
|
||||
}
|
||||
|
||||
func ReadTagValuesSource(id execute.DatasetID, r Reader, readSpec ReadTagValuesSpec, alloc *memory.Allocator) execute.Source {
|
||||
func ReadTagValuesSource(id execute.DatasetID, r Reader, readSpec ReadTagValuesSpec, a execute.Administration) execute.Source {
|
||||
src := &readTagValuesSource{
|
||||
reader: r,
|
||||
readSpec: readSpec,
|
||||
}
|
||||
src.id = id
|
||||
src.alloc = alloc
|
||||
src.alloc = a.Allocator()
|
||||
|
||||
src.m = getMetricsFromDependencies(a.Dependencies())
|
||||
src.orgID = readSpec.OrganizationID
|
||||
src.op = "readTagValues"
|
||||
|
||||
src.runner = src
|
||||
return src
|
||||
}
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
package influxdb_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/memory"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/prom"
|
||||
"github.com/influxdata/influxdb/kit/prom/promtest"
|
||||
"github.com/influxdata/influxdb/mock"
|
||||
"github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
"github.com/influxdata/influxdb/uuid"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type mockTableIterator struct {
|
||||
}
|
||||
|
||||
func (mockTableIterator) Do(f func(flux.Table) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockTableIterator) Statistics() cursors.CursorStats {
|
||||
return cursors.CursorStats{}
|
||||
}
|
||||
|
||||
type mockReader struct {
|
||||
}
|
||||
|
||||
func (mockReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
|
||||
return &mockTableIterator{}, nil
|
||||
}
|
||||
|
||||
func (mockReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
|
||||
return &mockTableIterator{}, nil
|
||||
}
|
||||
|
||||
func (mockReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
|
||||
return &mockTableIterator{}, nil
|
||||
}
|
||||
|
||||
func (mockReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
|
||||
return &mockTableIterator{}, nil
|
||||
}
|
||||
|
||||
func (mockReader) Close() {
|
||||
}
|
||||
|
||||
type mockAdministration struct {
|
||||
DependenciesFn func() execute.Dependencies
|
||||
}
|
||||
|
||||
func (mockAdministration) Context() context.Context {
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
func (mockAdministration) ResolveTime(qt flux.Time) execute.Time {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (mockAdministration) StreamContext() execute.StreamContext {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockAdministration) Allocator() *memory.Allocator {
|
||||
return &memory.Allocator{}
|
||||
}
|
||||
|
||||
func (mockAdministration) Parents() []execute.DatasetID {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a mockAdministration) Dependencies() execute.Dependencies {
|
||||
return a.DependenciesFn()
|
||||
}
|
||||
|
||||
const (
|
||||
labelKey = "key1"
|
||||
labelValue = "value1"
|
||||
)
|
||||
|
||||
// TestMetrics ensures that the metrics collected by an influxdb source are recorded.
|
||||
func TestMetrics(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
|
||||
orgID, err := platform.IDFromString("deadbeefbeefdead")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var a execute.Administration
|
||||
{
|
||||
fromDeps := influxdb.Dependencies{
|
||||
Reader: &mockReader{},
|
||||
BucketLookup: mock.BucketLookup{},
|
||||
OrganizationLookup: mock.OrganizationLookup{},
|
||||
Metrics: influxdb.NewMetrics([]string{labelKey}),
|
||||
}
|
||||
a = &mockAdministration{
|
||||
DependenciesFn: func() execute.Dependencies {
|
||||
deps := make(map[string]interface{})
|
||||
deps[influxdb.FromKind] = fromDeps
|
||||
return deps
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range a.Dependencies() {
|
||||
if pc, ok := v.(prom.PrometheusCollector); ok {
|
||||
reg.MustRegister(pc.PrometheusCollectors()...)
|
||||
}
|
||||
}
|
||||
|
||||
// This key/value pair added to the context will appear as a label in the prometheus histogram.
|
||||
ctx := context.WithValue(context.Background(), labelKey, labelValue)
|
||||
rfs := influxdb.ReadFilterSource(
|
||||
execute.DatasetID(uuid.FromTime(time.Now())),
|
||||
&mockReader{},
|
||||
influxdb.ReadFilterSpec{
|
||||
OrganizationID: *orgID,
|
||||
},
|
||||
a,
|
||||
)
|
||||
rfs.Run(ctx)
|
||||
|
||||
// Verify that we sampled the execution of the source by checking the prom registry.
|
||||
mfs := promtest.MustGather(t, reg)
|
||||
expectedLabels := map[string]string{
|
||||
"org": "deadbeefbeefdead",
|
||||
"key1": "value1",
|
||||
"op": "readFilter",
|
||||
}
|
||||
m := promtest.MustFindMetric(t, mfs, "query_influxdb_source_read_request_duration_seconds", expectedLabels)
|
||||
if want, got := uint64(1), *(m.Histogram.SampleCount); want != got {
|
||||
t.Fatalf("expected sample count of %v, got %v", want, got)
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type HostLookup interface {
|
||||
|
@ -32,6 +33,7 @@ type Dependencies struct {
|
|||
Reader Reader
|
||||
BucketLookup BucketLookup
|
||||
OrganizationLookup OrganizationLookup
|
||||
Metrics *metrics
|
||||
}
|
||||
|
||||
func (d Dependencies) Validate() error {
|
||||
|
@ -47,6 +49,14 @@ func (d Dependencies) Validate() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PrometheusCollectors satisfies the PrometheusCollector interface.
|
||||
func (d Dependencies) PrometheusCollectors() []prometheus.Collector {
|
||||
if d.Metrics != nil {
|
||||
return d.Metrics.PrometheusCollectors()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type StaticLookup struct {
|
||||
hosts []string
|
||||
}
|
||||
|
|
|
@ -27,12 +27,13 @@ func AddControllerConfigDependencies(
|
|||
) error {
|
||||
bucketLookupSvc := query.FromBucketService(bucketSvc)
|
||||
orgLookupSvc := query.FromOrganizationService(orgSvc)
|
||||
err := influxdb.InjectFromDependencies(cc.ExecutorDependencies, influxdb.Dependencies{
|
||||
metrics := influxdb.NewMetrics(cc.MetricLabelKeys)
|
||||
if err := influxdb.InjectFromDependencies(cc.ExecutorDependencies, influxdb.Dependencies{
|
||||
Reader: reads.NewReader(newStore(engine)),
|
||||
BucketLookup: bucketLookupSvc,
|
||||
OrganizationLookup: orgLookupSvc,
|
||||
})
|
||||
if err != nil {
|
||||
Metrics: metrics,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue