feat(telemetry): add telemetry handler

pull/11347/head
Chris Goller 2019-01-20 14:10:40 -06:00
parent 8d289a2818
commit 2d00eb18c5
12 changed files with 757 additions and 33 deletions


@@ -38,7 +38,6 @@ import (
taskbolt "github.com/influxdata/influxdb/task/backend/bolt"
"github.com/influxdata/influxdb/task/backend/coordinator"
taskexecutor "github.com/influxdata/influxdb/task/backend/executor"
- "github.com/influxdata/influxdb/telemetry"
_ "github.com/influxdata/influxdb/tsdb/tsi1" // needed for tsi1
_ "github.com/influxdata/influxdb/tsdb/tsm1" // needed for tsm1
"github.com/influxdata/influxdb/vault"
@@ -108,6 +107,16 @@ func (m *Launcher) ReportingDisabled() bool {
return m.reportingDisabled
}
+ // Registry returns the prometheus metrics registry.
+ func (m *Launcher) Registry() *prom.Registry {
+ return m.reg
+ }
+ // Logger returns the launcher's logger.
+ func (m *Launcher) Logger() *zap.Logger {
+ return m.logger
+ }
// SetBuild adds version, commit, and date to prometheus metrics.
func (m *Launcher) SetBuild(version, commit, date string) {
m.BuildInfo.Version = version
@@ -512,32 +521,3 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return nil
}
- // ReportUsageStats starts periodic server reporting.
- func (m *Launcher) ReportUsageStats(ctx context.Context, interval time.Duration) {
- pusher := telemetry.NewUsagePusher(m.reg)
- logger := m.logger.With(
- zap.String("service", "reporting"),
- influxlogger.DurationLiteral("interval", interval),
- )
- logger.Info("Starting")
- if err := pusher.Push(ctx); err != nil {
- logger.Debug("failure pushing usage metrics", zap.Error(err))
- }
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- logger.Debug("Pushing")
- if err := pusher.Push(ctx); err != nil {
- logger.Debug("failure pushing reporting usage", zap.Error(err))
- }
- case <-ctx.Done():
- logger.Info("Stopping")
- return
- }
- }
- }


@@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/kit/signals"
_ "github.com/influxdata/influxdb/query/builtin"
+ "github.com/influxdata/influxdb/telemetry"
_ "github.com/influxdata/influxdb/tsdb/tsi1"
_ "github.com/influxdata/influxdb/tsdb/tsm1"
)
@@ -35,7 +36,9 @@ func main() {
}
if !m.ReportingDisabled() {
- go m.ReportUsageStats(ctx, 8*time.Hour)
+ reporter := telemetry.NewReporter(m.Registry())
+ reporter.Interval = 8 * time.Hour
+ reporter.Logger = m.Logger()
}
<-ctx.Done()
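The hunk above only configures the reporter; the reporting loop itself lives in Reporter.Report, added in telemetry/reporter.go below. A hedged sketch of how the loop would typically be started from main — the `go reporter.Report(ctx)` line is an assumption and is not shown in this hunk:

```go
if !m.ReportingDisabled() {
	reporter := telemetry.NewReporter(m.Registry())
	reporter.Interval = 8 * time.Hour
	reporter.Logger = m.Logger()
	// Assumption: started as a background goroutine, mirroring the removed
	// `go m.ReportUsageStats(ctx, 8*time.Hour)` call.
	go reporter.Report(ctx)
}
```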

telemetry/README.md Normal file

@@ -0,0 +1,8 @@
## Telemetry Data
Telemetry is first collected by retrieving prometheus metrics from a Gatherer.
Next, the collected data is filtered down to a matching subset of prometheus metric families.
Finally, the data is transmitted to a prometheus push gateway handler.
The handler enriches the metrics with the timestamp at which the data is received.
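A minimal end-to-end sketch of that flow using the types added in this commit. The HTTP wiring, the use of LogStore, and the choice of DefaultGatherer are illustrative assumptions, not part of the commit:

```go
package main

import (
	"context"
	"net/http"

	"github.com/influxdata/influxdb/telemetry"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

func main() {
	logger := zap.NewExample()

	// Receiving side: the push gateway handler validates incoming metric
	// families, stamps them with the receive time, and hands them to a Store.
	gw := telemetry.NewPushGateway(logger, &telemetry.LogStore{Logger: logger})
	go http.ListenAndServe(":8080", http.HandlerFunc(gw.Handler))

	// Sending side: gather from a prometheus Gatherer and push on an interval;
	// the default Pusher URL points at localhost:8080.
	reporter := telemetry.NewReporter(prometheus.DefaultGatherer)
	reporter.Logger = logger
	reporter.Report(context.Background())
}
```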

telemetry/handler.go Normal file

@@ -0,0 +1,134 @@
package telemetry
import (
"context"
"fmt"
"io"
"net/http"
"time"
"go.uber.org/zap"
"github.com/influxdata/influxdb/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
const (
// DefaultTimeout is the length of time the handler will spend servicing the metrics request before canceling.
DefaultTimeout = 10 * time.Second
// DefaultMaxBytes is the maximum number of request body bytes to read.
DefaultMaxBytes = 1024000
)
var (
// ErrMetricsTimestampPresent is returned when the pushed prometheus metrics have timestamps set.
// Not sure why, but the pushgateway does not allow timestamps.
ErrMetricsTimestampPresent = fmt.Errorf("pushed metrics must not have timestamp")
)
// PushGateway handles receiving prometheus push metrics and forwards them to the Store.
type PushGateway struct {
Timeout time.Duration // handler returns after this duration with an error; defaults to 10 seconds
MaxBytes int64 // maximum number of bytes to read from the body; defaults to 1024000
Logger *zap.Logger
Store Store
Transformers []prometheus.Transformer
}
// NewPushGateway constructs the PushGateway.
func NewPushGateway(logger *zap.Logger, store Store, xforms ...prometheus.Transformer) *PushGateway {
if len(xforms) == 0 {
xforms = append(xforms, &AddTimestamps{})
}
return &PushGateway{
Store: store,
Transformers: xforms,
Logger: logger,
Timeout: DefaultTimeout,
MaxBytes: DefaultMaxBytes,
}
}
// Handler accepts prometheus metrics sent via the Push client and sends those
// metrics into the store.
func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) {
if p.Timeout == 0 {
p.Timeout = DefaultTimeout
}
if p.MaxBytes == 0 {
p.MaxBytes = DefaultMaxBytes
}
ctx, cancel := context.WithTimeout(
r.Context(),
p.Timeout,
)
defer cancel()
r = r.WithContext(ctx)
mfs, err := decodePostMetricsRequest(r, p.MaxBytes)
if err != nil {
p.Logger.Error("unable to decode metrics", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := valid(mfs); err != nil {
p.Logger.Error("invalid metrics", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, transformer := range p.Transformers {
mfs = transformer.Transform(mfs)
}
data, err := prometheus.EncodeJSON(mfs)
if err != nil {
p.Logger.Error("unable to encode metric families", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := p.Store.WriteMessage(ctx, data); err != nil {
p.Logger.Error("unable to write to store", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
}
func decodePostMetricsRequest(req *http.Request, maxBytes int64) ([]*dto.MetricFamily, error) {
format := expfmt.ResponseFormat(req.Header)
if format == expfmt.FmtUnknown {
return nil, fmt.Errorf("unknown format metrics format")
}
// protect against reading too many bytes
r := io.LimitReader(req.Body, maxBytes)
defer req.Body.Close()
mfs, err := prometheus.DecodeExpfmt(r, format)
if err != nil {
return nil, err
}
return mfs, nil
}
// prom's pushgateway does not allow timestamps for some reason.
func valid(mfs []*dto.MetricFamily) error {
// Checks if any timestamps have been specified.
for i := range mfs {
for j := range mfs[i].Metric {
if mfs[i].Metric[j].TimestampMs != nil {
return ErrMetricsTimestampPresent
}
}
}
return nil
}
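For reference, a sketch of what a push client sends to this handler: protobuf-delimited metric families with a matching Content-Type header and no timestamps. The endpoint URL here is the default Pusher URL shown further down and is an assumption for illustration:

```go
package main

import (
	"bytes"
	"log"
	"net/http"

	"github.com/golang/protobuf/proto"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

func main() {
	mf := &dto.MetricFamily{
		Name: proto.String("influxdb_buckets_total"),
		Type: dto.MetricType_COUNTER.Enum(),
		Metric: []*dto.Metric{{
			Counter: &dto.Counter{Value: proto.Float64(1)},
			// No TimestampMs: valid() rejects pre-set timestamps.
		}},
	}

	// Encode in the delimited protobuf format that decodePostMetricsRequest
	// negotiates via expfmt.ResponseFormat.
	var buf bytes.Buffer
	if err := expfmt.NewEncoder(&buf, expfmt.FmtProtoDelim).Encode(mf); err != nil {
		log.Fatal(err)
	}

	req, err := http.NewRequest("POST", "http://localhost:8080/metrics/job/influxdb", &buf)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", string(expfmt.FmtProtoDelim))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status) // expect "202 Accepted"
}
```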

telemetry/handler_test.go Normal file

@@ -0,0 +1,311 @@
package telemetry
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/gogo/protobuf/proto"
pr "github.com/influxdata/influxdb/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap/zaptest"
)
func TestPushGateway_Handler(t *testing.T) {
type fields struct {
Store *mockStore
now func() time.Time
}
type args struct {
w *httptest.ResponseRecorder
r *http.Request
}
tests := []struct {
name string
fields fields
args args
contentType string
wantStatus int
want []byte
}{
{
name: "unknown content-type is a bad request",
fields: fields{
Store: &mockStore{},
},
args: args{
w: httptest.NewRecorder(),
r: httptest.NewRequest("POST", "/", nil),
},
wantStatus: http.StatusBadRequest,
},
{
name: "bad metric with timestamp is a bad request",
fields: fields{
Store: &mockStore{},
now: func() time.Time { return time.Unix(0, 0) },
},
args: args{
w: httptest.NewRecorder(),
r: httptest.NewRequest("POST", "/",
mustEncode(t,
[]*dto.MetricFamily{badMetric()},
),
),
},
contentType: string(expfmt.FmtProtoDelim),
wantStatus: http.StatusBadRequest,
},
{
name: "store error is an internal server error",
fields: fields{
Store: &mockStore{
err: fmt.Errorf("e1"),
},
now: func() time.Time { return time.Unix(0, 0) },
},
args: args{
w: httptest.NewRecorder(),
r: httptest.NewRequest("POST", "/",
mustEncode(t,
[]*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
),
),
},
contentType: string(expfmt.FmtProtoDelim),
wantStatus: http.StatusInternalServerError,
want: []byte(`[{"name":"mf1","type":0,"metric":[{"label":[{"name":"n1","value":"v1"}],"counter":{"value":1},"timestamp_ms":0}]}]`),
},
{
name: "metric store in store",
fields: fields{
Store: &mockStore{},
now: func() time.Time { return time.Unix(0, 0) },
},
args: args{
w: httptest.NewRecorder(),
r: httptest.NewRequest("POST", "/",
mustEncode(t,
[]*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
),
),
},
contentType: string(expfmt.FmtProtoDelim),
wantStatus: http.StatusAccepted,
want: []byte(`[{"name":"mf1","type":0,"metric":[{"label":[{"name":"n1","value":"v1"}],"counter":{"value":1},"timestamp_ms":0}]}]`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPushGateway(
zaptest.NewLogger(t),
tt.fields.Store,
&AddTimestamps{
now: tt.fields.now,
},
)
tt.args.r.Header.Set("Content-Type", tt.contentType)
p.Handler(tt.args.w, tt.args.r)
if tt.args.w.Code != http.StatusAccepted {
t.Logf("Body: %s", tt.args.w.Body.String())
}
if got, want := tt.args.w.Code, tt.wantStatus; got != want {
t.Errorf("PushGateway.Handler() StatusCode = %v, want %v", got, want)
}
if got, want := tt.fields.Store.data, tt.want; string(got) != string(want) {
t.Errorf("PushGateway.Handler() Data = %s, want %s", got, want)
}
})
}
}
func Test_decodePostMetricsRequest(t *testing.T) {
type args struct {
req *http.Request
maxBytes int64
}
tests := []struct {
name string
args args
contentType string
want []*dto.MetricFamily
wantErr bool
}{
{
name: "invalid response format is an error",
args: args{
req: httptest.NewRequest("POST", "/", nil),
},
wantErr: true,
},
{
name: "bad body returns no metrics",
args: args{
req: httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte{0x10})),
maxBytes: 10,
},
contentType: string(expfmt.FmtProtoDelim),
want: []*dto.MetricFamily{},
},
{
name: "no body returns no metrics",
args: args{
req: httptest.NewRequest("POST", "/", nil),
maxBytes: 10,
},
contentType: string(expfmt.FmtProtoDelim),
want: []*dto.MetricFamily{},
},
{
name: "metrics are returned from POST",
args: args{
req: httptest.NewRequest("POST", "/",
mustEncode(t,
[]*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
),
),
maxBytes: 31,
},
contentType: string(expfmt.FmtProtoDelim),
want: []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
},
{
name: "max bytes limits on record boundary returns a single record",
args: args{
req: httptest.NewRequest("POST", "/",
mustEncode(t,
[]*dto.MetricFamily{
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
},
),
),
maxBytes: 31,
},
contentType: string(expfmt.FmtProtoDelim),
want: []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
},
{
name: "exceeding max bytes returns an error",
args: args{
req: httptest.NewRequest("POST", "/",
mustEncode(t,
[]*dto.MetricFamily{
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
},
),
),
maxBytes: 33,
},
contentType: string(expfmt.FmtProtoDelim),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.args.req.Header.Set("Content-Type", tt.contentType)
got, err := decodePostMetricsRequest(tt.args.req, tt.args.maxBytes)
if (err != nil) != tt.wantErr {
t.Errorf("decodePostMetricsRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("decodePostMetricsRequest() = %v, want %v", got, tt.want)
}
})
}
}
func badMetric() *dto.MetricFamily {
return &dto.MetricFamily{
Name: proto.String("bad"),
Type: dto.MetricType_COUNTER.Enum(),
Metric: []*dto.Metric{
&dto.Metric{
Label: []*dto.LabelPair{pr.L("n1", "v1")},
Counter: &dto.Counter{
Value: proto.Float64(1.0),
},
TimestampMs: proto.Int64(1),
},
},
}
}
func goodMetric() *dto.MetricFamily {
return &dto.MetricFamily{
Name: proto.String("good"),
Type: dto.MetricType_COUNTER.Enum(),
Metric: []*dto.Metric{
&dto.Metric{
Label: []*dto.LabelPair{pr.L("n1", "v1")},
Counter: &dto.Counter{
Value: proto.Float64(1.0),
},
},
},
}
}
func Test_valid(t *testing.T) {
type args struct {
mfs []*dto.MetricFamily
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "metric with timestamp is invalid",
args: args{
mfs: []*dto.MetricFamily{badMetric()},
},
wantErr: true,
},
{
name: "metric without timestamp is valid",
args: args{
mfs: []*dto.MetricFamily{goodMetric()},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := valid(tt.args.mfs); (err != nil) != tt.wantErr {
t.Errorf("valid() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
type mockStore struct {
data []byte
err error
}
func (m *mockStore) WriteMessage(ctx context.Context, data []byte) error {
m.data = data
return m.err
}
func mustEncode(t *testing.T, mfs []*dto.MetricFamily) io.Reader {
b, err := pr.EncodeExpfmt(mfs)
if err != nil {
t.Fatalf("unable to encode %v", err)
}
return bytes.NewBuffer(b)
}


@@ -21,8 +21,8 @@ type Pusher struct {
Client *http.Client
}
- // NewUsagePusher sends usage metrics to influxcloud.
- func NewUsagePusher(g prometheus.Gatherer) *Pusher {
+ // NewPusher sends usage metrics to a prometheus push gateway.
+ func NewPusher(g prometheus.Gatherer) *Pusher {
return &Pusher{
//URL: "https://0czqg3djc8.execute-api.us-east-1.amazonaws.com/prod",
URL: "http://localhost:8080/metrics/job/influxdb",

telemetry/reporter.go Normal file

@@ -0,0 +1,55 @@
package telemetry
import (
"context"
"time"
influxlogger "github.com/influxdata/influxdb/logger"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// Reporter reports telemetry metrics to a prometheus push
// gateway every interval.
type Reporter struct {
Pusher *Pusher
Logger *zap.Logger
Interval time.Duration
}
// NewReporter creates a Reporter that reports telemetry every 24 hours.
func NewReporter(g prometheus.Gatherer) *Reporter {
return &Reporter{
Pusher: NewPusher(g),
Logger: zap.NewNop(),
Interval: 24 * time.Hour,
}
}
// Report starts periodic telemetry reporting each interval.
func (r *Reporter) Report(ctx context.Context) {
logger := r.Logger.With(
zap.String("service", "telemetry"),
influxlogger.DurationLiteral("interval", r.Interval),
)
logger.Info("Starting")
if err := r.Pusher.Push(ctx); err != nil {
logger.Debug("failure reporting telemetry metrics", zap.Error(err))
}
ticker := time.NewTicker(r.Interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
logger.Debug("Reporting")
if err := r.Pusher.Push(ctx); err != nil {
logger.Debug("failure reporting telemetry metrics", zap.Error(err))
}
case <-ctx.Done():
logger.Info("Stopping")
return
}
}
}
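A usage sketch for the Reporter. The fresh client_golang registry stands in for the launcher's registry, and the timeout-bound context is only there to keep the example finite; both are assumptions for illustration:

```go
package main

import (
	"context"
	"time"

	"github.com/influxdata/influxdb/telemetry"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

func main() {
	// Any prometheus.Gatherer works; the launcher passes its own registry.
	reg := prometheus.NewRegistry()

	reporter := telemetry.NewReporter(reg)
	reporter.Interval = 8 * time.Hour // override the 24h default, as cmd/influxd does
	reporter.Logger = zap.NewExample()

	// Report pushes once immediately, then on every tick until ctx is canceled.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	reporter.Report(ctx)
}
```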


@@ -0,0 +1,80 @@
package telemetry
import (
"context"
"net/http"
"net/http/httptest"
"reflect"
"sync"
"testing"
"time"
pr "github.com/influxdata/influxdb/prometheus"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap/zaptest"
)
func TestReport(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
logger := zaptest.NewLogger(t)
store := newReportingStore()
timestamps := &AddTimestamps{
now: func() time.Time {
return time.Unix(0, 0)
},
}
gw := NewPushGateway(logger, store, timestamps)
ts := httptest.NewServer(http.HandlerFunc(gw.Handler))
defer ts.Close()
mfs := []*dto.MetricFamily{NewCounter("influxdb_buckets_total", 1.0)}
gatherer := prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
return mfs, nil
})
pusher := NewPusher(gatherer)
pusher.URL = ts.URL
reporter := &Reporter{
Pusher: pusher,
Logger: logger,
Interval: 30 * time.Second,
}
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
defer wg.Done()
reporter.Report(ctx)
}()
got := <-store.ch
// Encode to JSON to make it easier to compare
want, _ := pr.EncodeJSON(timestamps.Transform(mfs))
if !reflect.DeepEqual(got, want) {
t.Errorf("Reporter.Report() = %s, want %s", got, want)
}
cancel()
}
func newReportingStore() *reportingStore {
return &reportingStore{
ch: make(chan []byte, 1),
}
}
type reportingStore struct {
ch chan []byte
}
func (s *reportingStore) WriteMessage(ctx context.Context, data []byte) error {
s.ch <- data
return nil
}

telemetry/store.go Normal file

@@ -0,0 +1,40 @@
package telemetry
import (
"bytes"
"context"
"encoding/json"
"github.com/influxdata/influxdb/prometheus"
"go.uber.org/zap"
)
// Store records usage data.
type Store interface {
// WriteMessage stores data into the store.
WriteMessage(ctx context.Context, data []byte) error
}
var _ Store = (*LogStore)(nil)
// LogStore logs data written to the store.
type LogStore struct {
Logger *zap.Logger
}
// WriteMessage logs data at Info level.
func (s *LogStore) WriteMessage(ctx context.Context, data []byte) error {
buf := bytes.NewBuffer(data)
mfs, err := prometheus.DecodeJSON(buf)
if err != nil {
s.Logger.Error("error decoding metrics", zap.Error(err))
return err
}
b, err := json.Marshal(mfs)
if err != nil {
s.Logger.Error("error marshaling metrics", zap.Error(err))
return err
}
s.Logger.Info("write", zap.String("data", string(b)))
return nil
}
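Any sink with a WriteMessage method satisfies Store. A minimal in-memory sketch, hypothetical and similar in spirit to the mockStore used in handler_test.go:

```go
package telemetry

import (
	"context"
	"sync"
)

var _ Store = (*memoryStore)(nil)

// memoryStore keeps only the most recently written payload.
type memoryStore struct {
	mu   sync.Mutex
	last []byte
}

// WriteMessage copies data so later mutations by the caller are not observed.
func (s *memoryStore) WriteMessage(ctx context.Context, data []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.last = append([]byte(nil), data...)
	return nil
}
```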


@@ -0,0 +1,20 @@
package telemetry
import (
proto "github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"
)
// NewCounter constructs a counter metric family with a single metric from the
// given name, value, and labels.
func NewCounter(name string, v float64, ls ...*dto.LabelPair) *dto.MetricFamily {
m := &dto.Metric{
Label: ls,
Counter: &dto.Counter{
Value: &v,
},
}
return &dto.MetricFamily{
Name: proto.String(name),
Type: dto.MetricType_COUNTER.Enum(),
Metric: []*dto.Metric{m},
}
}

telemetry/timestamps.go Normal file

@@ -0,0 +1,36 @@
package telemetry
import (
"time"
"github.com/influxdata/influxdb/prometheus"
dto "github.com/prometheus/client_model/go"
)
const (
// just in case the definition of time.Nanosecond changes from 1.
nsPerMillisecond = int64(time.Millisecond / time.Nanosecond)
)
var _ prometheus.Transformer = (*AddTimestamps)(nil)
// AddTimestamps enriches prometheus metrics by adding timestamps.
type AddTimestamps struct {
now func() time.Time
}
// Transform adds now as a timestamp to all metrics.
func (a *AddTimestamps) Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily {
now := a.now
if now == nil {
now = time.Now
}
nowMilliseconds := now().UnixNano() / nsPerMillisecond
for i := range mfs {
for j := range mfs[i].Metric {
mfs[i].Metric[j].TimestampMs = &nowMilliseconds
}
}
return mfs
}
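A small sketch of the transform in isolation; the counter helper comes from telemetry/metrics.go in this commit, and the metric name is just an example:

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/telemetry"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	mfs := []*dto.MetricFamily{telemetry.NewCounter("influxdb_buckets_total", 1.0)}

	// With a nil now func, Transform stamps every metric with time.Now().
	ts := &telemetry.AddTimestamps{}
	for _, mf := range ts.Transform(mfs) {
		for _, m := range mf.Metric {
			fmt.Println(mf.GetName(), m.GetTimestampMs())
		}
	}
}
```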


@@ -0,0 +1,57 @@
package telemetry
import (
"reflect"
"testing"
"time"
"github.com/gogo/protobuf/proto"
pr "github.com/influxdata/influxdb/prometheus"
dto "github.com/prometheus/client_model/go"
)
func goodMetricWithTime() *dto.MetricFamily {
return &dto.MetricFamily{
Name: proto.String("good"),
Type: dto.MetricType_COUNTER.Enum(),
Metric: []*dto.Metric{
&dto.Metric{
Label: []*dto.LabelPair{pr.L("n1", "v1")},
Counter: &dto.Counter{
Value: proto.Float64(1.0),
},
TimestampMs: proto.Int64(1),
},
},
}
}
func TestAddTimestamps(t *testing.T) {
type args struct {
mfs []*dto.MetricFamily
now func() time.Time
}
tests := []struct {
name string
args args
}{
{
args: args{
mfs: []*dto.MetricFamily{goodMetric()},
now: func() time.Time { return time.Unix(0, int64(time.Millisecond)) },
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts := AddTimestamps{
now: tt.args.now,
}
got := ts.Transform(tt.args.mfs)
want := []*dto.MetricFamily{goodMetricWithTime()}
if !reflect.DeepEqual(got, want) {
t.Errorf("AddTimestamps.Transform() = %v, want %v", got, want)
}
})
}
}