From 2d00eb18c52e4e49503bb71324c11bbb5a03ec6b Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Sun, 20 Jan 2019 14:10:40 -0600 Subject: [PATCH] feat(telemetry): add telemetry handler --- cmd/influxd/launcher/launcher.go | 40 +--- cmd/influxd/main.go | 5 +- telemetry/README.md | 8 + telemetry/handler.go | 134 +++++++++++++ telemetry/handler_test.go | 311 +++++++++++++++++++++++++++++++ telemetry/push.go | 4 +- telemetry/reporter.go | 55 ++++++ telemetry/reporter_test.go | 80 ++++++++ telemetry/store.go | 40 ++++ telemetry/telemetry_test.go | 20 ++ telemetry/timestamps.go | 36 ++++ telemetry/timestamps_test.go | 57 ++++++ 12 files changed, 757 insertions(+), 33 deletions(-) create mode 100644 telemetry/README.md create mode 100644 telemetry/handler.go create mode 100644 telemetry/handler_test.go create mode 100644 telemetry/reporter.go create mode 100644 telemetry/reporter_test.go create mode 100644 telemetry/store.go create mode 100644 telemetry/telemetry_test.go create mode 100644 telemetry/timestamps.go create mode 100644 telemetry/timestamps_test.go diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 84927bc9d2..3ae9974e9a 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -38,7 +38,6 @@ import ( taskbolt "github.com/influxdata/influxdb/task/backend/bolt" "github.com/influxdata/influxdb/task/backend/coordinator" taskexecutor "github.com/influxdata/influxdb/task/backend/executor" - "github.com/influxdata/influxdb/telemetry" _ "github.com/influxdata/influxdb/tsdb/tsi1" // needed for tsi1 _ "github.com/influxdata/influxdb/tsdb/tsm1" // needed for tsm1 "github.com/influxdata/influxdb/vault" @@ -108,6 +107,16 @@ func (m *Launcher) ReportingDisabled() bool { return m.reportingDisabled } +// Registry returns the prometheus metrics registry. +func (m *Launcher) Registry() *prom.Registry { + return m.reg +} + +// Logger returns the launchers logger. +func (m *Launcher) Logger() *zap.Logger { + return m.logger +} + // SetBuild adds version, commit, and date to prometheus metrics. func (m *Launcher) SetBuild(version, commit, date string) { m.BuildInfo.Version = version @@ -512,32 +521,3 @@ func (m *Launcher) run(ctx context.Context) (err error) { return nil } - -// ReportUsageStats starts periodic server reporting. -func (m *Launcher) ReportUsageStats(ctx context.Context, interval time.Duration) { - pusher := telemetry.NewUsagePusher(m.reg) - logger := m.logger.With( - zap.String("service", "reporting"), - influxlogger.DurationLiteral("interval", interval), - ) - - logger.Info("Starting") - if err := pusher.Push(ctx); err != nil { - logger.Debug("failure pushing usage metrics", zap.Error(err)) - } - - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - logger.Debug("Pushing") - if err := pusher.Push(ctx); err != nil { - logger.Debug("failure pushing reporting usage", zap.Error(err)) - } - case <-ctx.Done(): - logger.Info("Stopping") - return - } - } -} diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index eb16a6a6db..9cb840336a 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/cmd/influxd/launcher" "github.com/influxdata/influxdb/kit/signals" _ "github.com/influxdata/influxdb/query/builtin" + "github.com/influxdata/influxdb/telemetry" _ "github.com/influxdata/influxdb/tsdb/tsi1" _ "github.com/influxdata/influxdb/tsdb/tsm1" ) @@ -35,7 +36,9 @@ func main() { } if !m.ReportingDisabled() { - go m.ReportUsageStats(ctx, 8*time.Hour) + reporter := telemetry.NewReporter(m.Registry()) + reporter.Interval = 8 * time.Hour + reporter.Logger = m.Logger() } <-ctx.Done() diff --git a/telemetry/README.md b/telemetry/README.md new file mode 100644 index 0000000000..c14a2c1efd --- /dev/null +++ b/telemetry/README.md @@ -0,0 +1,8 @@ +## Telemetry Data + +Telemetry is first collected by retrieving prometheus data from a Gatherer. +Next, the collected data is filtered by matching a subset of prometheus families. +Finally, the data is transmitted to a prometheus push gateway handler. + +The handler enriches the metrics with the timestamp when the data is +received. diff --git a/telemetry/handler.go b/telemetry/handler.go new file mode 100644 index 0000000000..ea0fb22078 --- /dev/null +++ b/telemetry/handler.go @@ -0,0 +1,134 @@ +package telemetry + +import ( + "context" + "fmt" + "io" + "net/http" + "time" + + "go.uber.org/zap" + + "github.com/influxdata/influxdb/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +const ( + // DefaultTimeout is the length of time servicing the metrics before canceling. + DefaultTimeout = 10 * time.Second + // DefaultMaxBytes is the largest request body read. + DefaultMaxBytes = 1024000 +) + +var ( + // ErrMetricsTimestampPresent is returned when the prometheus metrics has timestamps set. + // Not sure why, but, pushgateway does not allow timestamps. + ErrMetricsTimestampPresent = fmt.Errorf("pushed metrics must not have timestamp") +) + +// PushGateway handles receiving prometheus push metrics and forwards them to the Store. +type PushGateway struct { + Timeout time.Duration // handler returns after this duration with an error; defaults to 5 seconds + MaxBytes int64 // maximum number of bytes to read from the body; defaults to 1024000 + Logger *zap.Logger + + Store Store + Transformers []prometheus.Transformer +} + +// NewPushGateway constructs the PushGateway. +func NewPushGateway(logger *zap.Logger, store Store, xforms ...prometheus.Transformer) *PushGateway { + if len(xforms) == 0 { + xforms = append(xforms, &AddTimestamps{}) + } + return &PushGateway{ + Store: store, + Transformers: xforms, + Logger: logger, + Timeout: DefaultTimeout, + MaxBytes: DefaultMaxBytes, + } +} + +// Handler accepts prometheus metrics send via the Push client and sends those +// metrics into the store. +func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) { + if p.Timeout == 0 { + p.Timeout = DefaultTimeout + } + + if p.MaxBytes == 0 { + p.MaxBytes = DefaultMaxBytes + } + + ctx, cancel := context.WithTimeout( + r.Context(), + p.Timeout, + ) + defer cancel() + + r = r.WithContext(ctx) + + mfs, err := decodePostMetricsRequest(r, p.MaxBytes) + if err != nil { + p.Logger.Error("unable to decode metrics", zap.Error(err)) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := valid(mfs); err != nil { + p.Logger.Error("invalid metrics", zap.Error(err)) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + for _, transformer := range p.Transformers { + mfs = transformer.Transform(mfs) + } + + data, err := prometheus.EncodeJSON(mfs) + if err != nil { + p.Logger.Error("unable to encode metric families", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := p.Store.WriteMessage(ctx, data); err != nil { + p.Logger.Error("unable to write to store", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusAccepted) +} + +func decodePostMetricsRequest(req *http.Request, maxBytes int64) ([]*dto.MetricFamily, error) { + format := expfmt.ResponseFormat(req.Header) + if format == expfmt.FmtUnknown { + return nil, fmt.Errorf("unknown format metrics format") + } + + // protect against reading too many bytes + r := io.LimitReader(req.Body, maxBytes) + defer req.Body.Close() + + mfs, err := prometheus.DecodeExpfmt(r, format) + if err != nil { + return nil, err + } + return mfs, nil +} + +// prom's pushgateway does not allow timestamps for some reason. +func valid(mfs []*dto.MetricFamily) error { + // Checks if any timestamps have been specified. + for i := range mfs { + for j := range mfs[i].Metric { + if mfs[i].Metric[j].TimestampMs != nil { + return ErrMetricsTimestampPresent + } + } + } + return nil +} diff --git a/telemetry/handler_test.go b/telemetry/handler_test.go new file mode 100644 index 0000000000..0263f76a1b --- /dev/null +++ b/telemetry/handler_test.go @@ -0,0 +1,311 @@ +package telemetry + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + pr "github.com/influxdata/influxdb/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "go.uber.org/zap/zaptest" +) + +func TestPushGateway_Handler(t *testing.T) { + type fields struct { + Store *mockStore + now func() time.Time + } + type args struct { + w *httptest.ResponseRecorder + r *http.Request + } + tests := []struct { + name string + fields fields + args args + contentType string + wantStatus int + want []byte + }{ + + { + name: "unknown content-type is a bad request", + fields: fields{ + Store: &mockStore{}, + }, + args: args{ + w: httptest.NewRecorder(), + r: httptest.NewRequest("POST", "/", nil), + }, + wantStatus: http.StatusBadRequest, + }, + + { + name: "bad metric with timestamp is a bad request", + fields: fields{ + Store: &mockStore{}, + now: func() time.Time { return time.Unix(0, 0) }, + }, + args: args{ + w: httptest.NewRecorder(), + r: httptest.NewRequest("POST", "/", + mustEncode(t, + []*dto.MetricFamily{badMetric()}, + ), + ), + }, + contentType: string(expfmt.FmtProtoDelim), + wantStatus: http.StatusBadRequest, + }, + { + name: "store error is an internal server error", + fields: fields{ + Store: &mockStore{ + err: fmt.Errorf("e1"), + }, + now: func() time.Time { return time.Unix(0, 0) }, + }, + args: args{ + w: httptest.NewRecorder(), + r: httptest.NewRequest("POST", "/", + mustEncode(t, + []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))}, + ), + ), + }, + contentType: string(expfmt.FmtProtoDelim), + wantStatus: http.StatusInternalServerError, + want: []byte(`[{"name":"mf1","type":0,"metric":[{"label":[{"name":"n1","value":"v1"}],"counter":{"value":1},"timestamp_ms":0}]}]`), + }, + { + name: "metric store in store", + fields: fields{ + Store: &mockStore{}, + now: func() time.Time { return time.Unix(0, 0) }, + }, + args: args{ + w: httptest.NewRecorder(), + r: httptest.NewRequest("POST", "/", + mustEncode(t, + []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))}, + ), + ), + }, + contentType: string(expfmt.FmtProtoDelim), + wantStatus: http.StatusAccepted, + want: []byte(`[{"name":"mf1","type":0,"metric":[{"label":[{"name":"n1","value":"v1"}],"counter":{"value":1},"timestamp_ms":0}]}]`), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := NewPushGateway( + zaptest.NewLogger(t), + tt.fields.Store, + &AddTimestamps{ + now: tt.fields.now, + }, + ) + tt.args.r.Header.Set("Content-Type", tt.contentType) + p.Handler(tt.args.w, tt.args.r) + + if tt.args.w.Code != http.StatusAccepted { + t.Logf("Body: %s", tt.args.w.Body.String()) + } + if got, want := tt.args.w.Code, tt.wantStatus; got != want { + t.Errorf("PushGateway.Handler() StatusCode = %v, want %v", got, want) + } + + if got, want := tt.fields.Store.data, tt.want; string(got) != string(want) { + t.Errorf("PushGateway.Handler() Data = %s, want %s", got, want) + } + }) + } +} + +func Test_decodePostMetricsRequest(t *testing.T) { + type args struct { + req *http.Request + maxBytes int64 + } + tests := []struct { + name string + args args + contentType string + want []*dto.MetricFamily + wantErr bool + }{ + { + name: "invalid response format is an error", + args: args{ + req: httptest.NewRequest("POST", "/", nil), + }, + wantErr: true, + }, + { + name: "bad body returns no metrics", + args: args{ + req: httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte{0x10})), + maxBytes: 10, + }, + contentType: string(expfmt.FmtProtoDelim), + want: []*dto.MetricFamily{}, + }, + { + name: "no body returns no metrics", + args: args{ + req: httptest.NewRequest("POST", "/", nil), + maxBytes: 10, + }, + contentType: string(expfmt.FmtProtoDelim), + want: []*dto.MetricFamily{}, + }, + { + name: "metrics are returned from POST", + args: args{ + req: httptest.NewRequest("POST", "/", + mustEncode(t, + []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))}, + ), + ), + maxBytes: 31, + }, + contentType: string(expfmt.FmtProtoDelim), + want: []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))}, + }, + { + name: "max bytes limits on record boundary returns a single record", + args: args{ + req: httptest.NewRequest("POST", "/", + mustEncode(t, + []*dto.MetricFamily{ + NewCounter("mf1", 1.0, pr.L("n1", "v1")), + NewCounter("mf2", 1.0, pr.L("n2", "v2")), + }, + ), + ), + maxBytes: 31, + }, + contentType: string(expfmt.FmtProtoDelim), + want: []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))}, + }, + { + name: "exceeding max bytes returns an error", + args: args{ + req: httptest.NewRequest("POST", "/", + mustEncode(t, + []*dto.MetricFamily{ + NewCounter("mf1", 1.0, pr.L("n1", "v1")), + NewCounter("mf2", 1.0, pr.L("n2", "v2")), + }, + ), + ), + maxBytes: 33, + }, + contentType: string(expfmt.FmtProtoDelim), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.args.req.Header.Set("Content-Type", tt.contentType) + got, err := decodePostMetricsRequest(tt.args.req, tt.args.maxBytes) + if (err != nil) != tt.wantErr { + t.Errorf("decodePostMetricsRequest() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("decodePostMetricsRequest() = %v, want %v", got, tt.want) + } + }) + } +} + +func badMetric() *dto.MetricFamily { + return &dto.MetricFamily{ + Name: proto.String("bad"), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{ + &dto.Metric{ + Label: []*dto.LabelPair{pr.L("n1", "v1")}, + Counter: &dto.Counter{ + Value: proto.Float64(1.0), + }, + TimestampMs: proto.Int64(1), + }, + }, + } +} + +func goodMetric() *dto.MetricFamily { + return &dto.MetricFamily{ + Name: proto.String("good"), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{ + &dto.Metric{ + Label: []*dto.LabelPair{pr.L("n1", "v1")}, + Counter: &dto.Counter{ + Value: proto.Float64(1.0), + }, + }, + }, + } +} + +func Test_valid(t *testing.T) { + type args struct { + mfs []*dto.MetricFamily + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "metric with timestamp is invalid", + args: args{ + mfs: []*dto.MetricFamily{badMetric()}, + }, + wantErr: true, + }, + { + name: "metric without timestamp is valid", + args: args{ + mfs: []*dto.MetricFamily{goodMetric()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := valid(tt.args.mfs); (err != nil) != tt.wantErr { + t.Errorf("valid() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +type mockStore struct { + data []byte + err error +} + +func (m *mockStore) WriteMessage(ctx context.Context, data []byte) error { + m.data = data + return m.err + +} + +func mustEncode(t *testing.T, mfs []*dto.MetricFamily) io.Reader { + b, err := pr.EncodeExpfmt(mfs) + if err != nil { + t.Fatalf("unable to encode %v", err) + } + return bytes.NewBuffer(b) +} diff --git a/telemetry/push.go b/telemetry/push.go index f84a04f1ea..c612d82850 100644 --- a/telemetry/push.go +++ b/telemetry/push.go @@ -21,8 +21,8 @@ type Pusher struct { Client *http.Client } -// NewUsagePusher sends usage metrics to influxcloud. -func NewUsagePusher(g prometheus.Gatherer) *Pusher { +// NewPusher sends usage metrics to a prometheus push gateway. +func NewPusher(g prometheus.Gatherer) *Pusher { return &Pusher{ //URL: "https://0czqg3djc8.execute-api.us-east-1.amazonaws.com/prod", URL: "http://localhost:8080/metrics/job/influxdb", diff --git a/telemetry/reporter.go b/telemetry/reporter.go new file mode 100644 index 0000000000..75901d59c9 --- /dev/null +++ b/telemetry/reporter.go @@ -0,0 +1,55 @@ +package telemetry + +import ( + "context" + "time" + + influxlogger "github.com/influxdata/influxdb/logger" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +// Reporter reports telemetry metrics to a prometheus push +// gateway every interval. +type Reporter struct { + Pusher *Pusher + Logger *zap.Logger + Interval time.Duration +} + +// NewReporter reports telemetry every 24 hours. +func NewReporter(g prometheus.Gatherer) *Reporter { + return &Reporter{ + Pusher: NewPusher(g), + Logger: zap.NewNop(), + Interval: 24 * time.Hour, + } +} + +// Report starts periodic telemetry reporting each interval. +func (r *Reporter) Report(ctx context.Context) { + logger := r.Logger.With( + zap.String("service", "telemetry"), + influxlogger.DurationLiteral("interval", r.Interval), + ) + + logger.Info("Starting") + if err := r.Pusher.Push(ctx); err != nil { + logger.Debug("failure reporting telemetry metrics", zap.Error(err)) + } + + ticker := time.NewTicker(r.Interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + logger.Debug("Reporting") + if err := r.Pusher.Push(ctx); err != nil { + logger.Debug("failure reporting telemetry metrics", zap.Error(err)) + } + case <-ctx.Done(): + logger.Info("Stopping") + return + } + } +} diff --git a/telemetry/reporter_test.go b/telemetry/reporter_test.go new file mode 100644 index 0000000000..0c785f14d0 --- /dev/null +++ b/telemetry/reporter_test.go @@ -0,0 +1,80 @@ +package telemetry + +import ( + "context" + "net/http" + "net/http/httptest" + "reflect" + "sync" + "testing" + "time" + + pr "github.com/influxdata/influxdb/prometheus" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "go.uber.org/zap/zaptest" +) + +func TestReport(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + logger := zaptest.NewLogger(t) + store := newReportingStore() + timestamps := &AddTimestamps{ + now: func() time.Time { + return time.Unix(0, 0) + }, + } + + gw := NewPushGateway(logger, store, timestamps) + + ts := httptest.NewServer(http.HandlerFunc(gw.Handler)) + defer ts.Close() + + mfs := []*dto.MetricFamily{NewCounter("influxdb_buckets_total", 1.0)} + gatherer := prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { + return mfs, nil + }) + + pusher := NewPusher(gatherer) + pusher.URL = ts.URL + + reporter := &Reporter{ + Pusher: pusher, + Logger: logger, + Interval: 30 * time.Second, + } + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + go func() { + defer wg.Done() + reporter.Report(ctx) + }() + + got := <-store.ch + + // Encode to JSON to make it easier to compare + want, _ := pr.EncodeJSON(timestamps.Transform(mfs)) + if !reflect.DeepEqual(got, want) { + t.Errorf("Reporter.Report() = %s, want %s", got, want) + } + + cancel() +} + +func newReportingStore() *reportingStore { + return &reportingStore{ + ch: make(chan []byte, 1), + } +} + +type reportingStore struct { + ch chan []byte +} + +func (s *reportingStore) WriteMessage(ctx context.Context, data []byte) error { + s.ch <- data + return nil +} diff --git a/telemetry/store.go b/telemetry/store.go new file mode 100644 index 0000000000..3972a11d60 --- /dev/null +++ b/telemetry/store.go @@ -0,0 +1,40 @@ +package telemetry + +import ( + "bytes" + "context" + "encoding/json" + + "github.com/influxdata/influxdb/prometheus" + "go.uber.org/zap" +) + +// Store records usage data. +type Store interface { + // WriteMessage stores data into the store. + WriteMessage(ctx context.Context, data []byte) error +} + +var _ Store = (*LogStore)(nil) + +// LogStore logs data written to the store. +type LogStore struct { + Logger *zap.Logger +} + +// WriteMessage logs data at Info level. +func (s *LogStore) WriteMessage(ctx context.Context, data []byte) error { + buf := bytes.NewBuffer(data) + mfs, err := prometheus.DecodeJSON(buf) + if err != nil { + s.Logger.Error("error decoding metrics", zap.Error(err)) + return err + } + b, err := json.Marshal(mfs) + if err != nil { + s.Logger.Error("error marshaling metrics", zap.Error(err)) + return err + } + s.Logger.Info("write", zap.String("data", string(b))) + return nil +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go new file mode 100644 index 0000000000..c88e85f0f3 --- /dev/null +++ b/telemetry/telemetry_test.go @@ -0,0 +1,20 @@ +package telemetry + +import ( + proto "github.com/golang/protobuf/proto" + dto "github.com/prometheus/client_model/go" +) + +func NewCounter(name string, v float64, ls ...*dto.LabelPair) *dto.MetricFamily { + m := &dto.Metric{ + Label: ls, + Counter: &dto.Counter{ + Value: &v, + }, + } + return &dto.MetricFamily{ + Name: proto.String(name), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{m}, + } +} diff --git a/telemetry/timestamps.go b/telemetry/timestamps.go new file mode 100644 index 0000000000..d0944ef379 --- /dev/null +++ b/telemetry/timestamps.go @@ -0,0 +1,36 @@ +package telemetry + +import ( + "time" + + "github.com/influxdata/influxdb/prometheus" + dto "github.com/prometheus/client_model/go" +) + +const ( + // just in case the definition of time.Nanosecond changes from 1. + nsPerMillisecond = int64(time.Millisecond / time.Nanosecond) +) + +var _ prometheus.Transformer = (*AddTimestamps)(nil) + +// AddTimestamps enriches prometheus metrics by adding timestamps. +type AddTimestamps struct { + now func() time.Time +} + +// Transform adds now as a timestamp to all metrics. +func (a *AddTimestamps) Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily { + now := a.now + if now == nil { + now = time.Now + } + nowMilliseconds := now().UnixNano() / nsPerMillisecond + + for i := range mfs { + for j := range mfs[i].Metric { + mfs[i].Metric[j].TimestampMs = &nowMilliseconds + } + } + return mfs +} diff --git a/telemetry/timestamps_test.go b/telemetry/timestamps_test.go new file mode 100644 index 0000000000..537bcd1e99 --- /dev/null +++ b/telemetry/timestamps_test.go @@ -0,0 +1,57 @@ +package telemetry + +import ( + "reflect" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + pr "github.com/influxdata/influxdb/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func goodMetricWithTime() *dto.MetricFamily { + return &dto.MetricFamily{ + Name: proto.String("good"), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{ + &dto.Metric{ + Label: []*dto.LabelPair{pr.L("n1", "v1")}, + Counter: &dto.Counter{ + Value: proto.Float64(1.0), + }, + TimestampMs: proto.Int64(1), + }, + }, + } +} + +func TestAddTimestamps(t *testing.T) { + type args struct { + mfs []*dto.MetricFamily + now func() time.Time + } + tests := []struct { + name string + args args + }{ + { + args: args{ + mfs: []*dto.MetricFamily{goodMetric()}, + now: func() time.Time { return time.Unix(0, int64(time.Millisecond)) }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts := AddTimestamps{ + now: tt.args.now, + } + got := ts.Transform(tt.args.mfs) + want := []*dto.MetricFamily{goodMetricWithTime()} + if !reflect.DeepEqual(got, want) { + t.Errorf("AddTimestamps.Transform() = %v, want %v", got, want) + } + }) + } +}