From 781e305aff8e2adf9ee8c4a93ad9a5e86b3b0b3d Mon Sep 17 00:00:00 2001
From: Chris Goller
Date: Mon, 21 Jan 2019 22:02:24 -0600
Subject: [PATCH] feat(telemetry): encode metrics into store in various formats

---
 cmd/telemetryd/main.go     |   4 +
 prometheus/codec.go        | 159 ++++++++++++++++++++++++++++++++++++-
 telemetry/handler.go       |  17 ++--
 telemetry/handler_test.go  |   2 +-
 telemetry/reporter_test.go |   2 +-
 5 files changed, 170 insertions(+), 14 deletions(-)

diff --git a/cmd/telemetryd/main.go b/cmd/telemetryd/main.go
index 567333aefc..b7c896f6ab 100644
--- a/cmd/telemetryd/main.go
+++ b/cmd/telemetryd/main.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/influxdata/influxdb/kit/cli"
 	influxlogger "github.com/influxdata/influxdb/logger"
+	"github.com/influxdata/influxdb/prometheus"
 	"github.com/influxdata/influxdb/telemetry"
 	"go.uber.org/zap"
 )
@@ -52,6 +53,9 @@ func run() error {
 		Logger: logger,
 	}
 	svc := telemetry.NewPushGateway(logger, store)
+	// Print data as line protocol
+	svc.Encoder = &prometheus.LineProtocol{}
+
 	handler := http.HandlerFunc(svc.Handler)
 
 	logger.Info("starting telemetryd server", zap.String("addr", addr))
diff --git a/prometheus/codec.go b/prometheus/codec.go
index 07d20e252d..0c096ada71 100644
--- a/prometheus/codec.go
+++ b/prometheus/codec.go
@@ -4,11 +4,31 @@ import (
 	"bytes"
 	"encoding/json"
 	"io"
+	"math"
+	"strconv"
+	"time"
+
+	"github.com/influxdata/influxdb/models"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/expfmt"
 )
 
+// Encoder transforms metric families into bytes.
+type Encoder interface {
+	// Encode encodes metrics into bytes.
+	Encode(mfs []*dto.MetricFamily) ([]byte, error)
+}
+
+// Expfmt encodes metric families into the Prometheus exposition format.
+type Expfmt struct {
+	Format expfmt.Format
+}
+
+// Encode encodes metrics into prometheus exposition format bytes.
+func (e *Expfmt) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
+	return EncodeExpfmt(mfs, e.Format)
+}
+
 // DecodeExpfmt decodes the reader of format into metric families.
 func DecodeExpfmt(r io.Reader, format expfmt.Format) ([]*dto.MetricFamily, error) {
 	dec := expfmt.NewDecoder(r, format)
@@ -31,7 +51,7 @@ func DecodeExpfmt(r io.Reader, format expfmt.Format) ([]*dto.MetricFamily, error
 // EncodeExpfmt encodes the metrics family (defaults to expfmt.FmtProtoDelim).
 func EncodeExpfmt(mfs []*dto.MetricFamily, opts ...expfmt.Format) ([]byte, error) {
 	format := expfmt.FmtProtoDelim
-	if len(opts) != 0 {
+	if len(opts) != 0 && opts[0] != "" {
 		format = opts[0]
 	}
 	buf := &bytes.Buffer{}
@@ -44,6 +64,15 @@ func EncodeExpfmt(mfs []*dto.MetricFamily, opts ...expfmt.Format) ([]byte, error
 	return buf.Bytes(), nil
 }
 
+// JSON encodes metric families into JSON.
+type JSON struct{}
+
+// Encode encodes metrics into JSON bytes. This does not always work,
+// as some prometheus values are NaN or Inf, which JSON cannot represent.
+func (j *JSON) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
+	return EncodeJSON(mfs)
+}
+
 // DecodeJSON decodes a JSON array of metrics families.
 func DecodeJSON(r io.Reader) ([]*dto.MetricFamily, error) {
 	dec := json.NewDecoder(r)
@@ -65,3 +94,131 @@ func DecodeJSON(r io.Reader) ([]*dto.MetricFamily, error) {
 func EncodeJSON(mfs []*dto.MetricFamily) ([]byte, error) {
 	return json.Marshal(mfs)
 }
+
+const (
+	// nsPerMilliseconds is the number of nanoseconds per millisecond, in case the definition of time.Nanosecond ever changes from 1.
+	nsPerMilliseconds = int64(time.Millisecond / time.Nanosecond)
+)
+
+// LineProtocol encodes metric families into InfluxDB line protocol.
+type LineProtocol struct{}
+
+// Encode encodes metrics into line protocol format bytes.
+func (l *LineProtocol) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
+	return EncodeLineProtocol(mfs)
+}
+
+// EncodeLineProtocol converts prometheus metrics into line protocol.
+func EncodeLineProtocol(mfs []*dto.MetricFamily) ([]byte, error) {
+	var b bytes.Buffer
+
+	pts := points(mfs)
+	for _, p := range pts {
+		if _, err := b.WriteString(p.String()); err != nil {
+			return nil, err
+		}
+		if err := b.WriteByte('\n'); err != nil {
+			return nil, err
+		}
+	}
+	return b.Bytes(), nil
+}
+
+func points(mfs []*dto.MetricFamily) models.Points {
+	pts := make(models.Points, 0, len(mfs))
+	for _, mf := range mfs {
+		mts := make(models.Points, 0, len(mf.Metric))
+		name := mf.GetName()
+		for _, m := range mf.Metric {
+			ts := tags(m.Label)
+			fs := fields(mf.GetType(), m)
+			tm := timestamp(m)
+
+			pt, err := models.NewPoint(name, ts, fs, tm)
+			if err != nil {
+				continue
+			}
+			mts = append(mts, pt)
+		}
+		pts = append(pts, mts...)
+	}
+
+	return pts
+}
+
+func timestamp(m *dto.Metric) time.Time {
+	var tm time.Time
+	if m.GetTimestampMs() > 0 {
+		tm = time.Unix(0, m.GetTimestampMs()*nsPerMilliseconds)
+	}
+	return tm
+
+}
+
+func tags(labels []*dto.LabelPair) models.Tags {
+	ts := make(models.Tags, len(labels))
+	for i, label := range labels {
+		ts[i] = models.NewTag([]byte(label.GetName()), []byte(label.GetValue()))
+	}
+	return ts
+}
+
+func fields(typ dto.MetricType, m *dto.Metric) models.Fields {
+	switch typ {
+	case dto.MetricType_SUMMARY:
+		return summary(m.GetSummary())
+	case dto.MetricType_HISTOGRAM:
+		return histogram(m.GetHistogram())
+	case dto.MetricType_GAUGE:
+		return value("gauge", m.GetGauge())
+	case dto.MetricType_COUNTER:
+		return value("counter", m.GetCounter())
+	case dto.MetricType_UNTYPED:
+		return value("value", m.GetUntyped())
+	default:
+		return nil
+	}
+}
+
+func summary(s *dto.Summary) map[string]interface{} {
+	fields := make(map[string]interface{}, len(s.Quantile)+2)
+	for _, q := range s.Quantile {
+		v := q.GetValue()
+		if !math.IsNaN(v) {
+			key := strconv.FormatFloat(q.GetQuantile(), 'f', -1, 64)
+			fields[key] = v
+		}
+	}
+
+	fields["count"] = float64(s.GetSampleCount())
+	fields["sum"] = float64(s.GetSampleSum())
+	return fields
+}
+
+func histogram(hist *dto.Histogram) map[string]interface{} {
+	fields := make(map[string]interface{}, len(hist.Bucket)+2)
+	for _, b := range hist.Bucket {
+		k := strconv.FormatFloat(b.GetUpperBound(), 'f', -1, 64)
+		fields[k] = float64(b.GetCumulativeCount())
+	}
+
+	fields["count"] = float64(hist.GetSampleCount())
+	fields["sum"] = float64(hist.GetSampleSum())
+
+	return fields
+}
+
+type valuer interface {
+	GetValue() float64
+}
+
+func value(typ string, m valuer) models.Fields {
+	vs := make(models.Fields, 1)
+
+	v := m.GetValue()
+	if !math.IsNaN(v) {
+		vs[typ] = v
+	}
+
+	return vs
+}
diff --git a/telemetry/handler.go b/telemetry/handler.go
index 922c9221cb..c91a6bbd51 100644
--- a/telemetry/handler.go
+++ b/telemetry/handler.go
@@ -37,7 +37,7 @@ type PushGateway struct {
 	Store        Store
 	Transformers []prometheus.Transformer
 
-	Format expfmt.Format
+	Encoder prometheus.Encoder
 }
 
 // NewPushGateway constructs the PushGateway.
@@ -81,8 +81,10 @@ func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) {
 		p.MaxBytes = DefaultMaxBytes
 	}
 
-	if p.Format == "" {
-		p.Format = expfmt.FmtText
+	if p.Encoder == nil {
+		p.Encoder = &prometheus.Expfmt{
+			Format: expfmt.FmtText,
+		}
 	}
 
 	ctx, cancel := context.WithTimeout(
@@ -118,14 +120,7 @@ func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) {
 		mfs = transformer.Transform(mfs)
 	}
 
-	var data []byte
-	switch p.Format {
-	case "JSON":
-		data, err = prometheus.EncodeJSON(mfs)
-	default:
-		data, err = prometheus.EncodeExpfmt(mfs, p.Format)
-	}
-
+	data, err := p.Encoder.Encode(mfs)
 	if err != nil {
 		p.Logger.Error("unable to encode metric families", zap.Error(err))
 		http.Error(w, err.Error(), http.StatusInternalServerError)
diff --git a/telemetry/handler_test.go b/telemetry/handler_test.go
index 6015c73d6e..40fa9308a5 100644
--- a/telemetry/handler_test.go
+++ b/telemetry/handler_test.go
@@ -113,7 +113,7 @@ func TestPushGateway_Handler(t *testing.T) {
 					now: tt.fields.now,
 				},
 			)
-			p.Format = "JSON"
+			p.Encoder = &pr.JSON{}
 			tt.args.r.Header.Set("Content-Type", tt.contentType)
 			p.Handler(tt.args.w, tt.args.r)
 
diff --git a/telemetry/reporter_test.go b/telemetry/reporter_test.go
index 451743a386..66d8268832 100644
--- a/telemetry/reporter_test.go
+++ b/telemetry/reporter_test.go
@@ -27,7 +27,7 @@ func TestReport(t *testing.T) {
 	}
 
 	gw := NewPushGateway(logger, store, timestamps)
-	gw.Format = "JSON"
+	gw.Encoder = &pr.JSON{}
 
 	ts := httptest.NewServer(http.HandlerFunc(gw.Handler))
 	defer ts.Close()
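
A minimal usage sketch, separate from the patch itself, of how the Encoder interface added in prometheus/codec.go might be exercised directly. It assumes the import paths in this repository; the sample metric family, label name, and value are fabricated for illustration.

package main

import (
	"fmt"
	"log"

	"github.com/golang/protobuf/proto"
	"github.com/influxdata/influxdb/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Hand-build a single counter metric family (fabricated example data).
	mf := &dto.MetricFamily{
		Name: proto.String("http_requests_total"),
		Type: dto.MetricType_COUNTER.Enum(),
		Metric: []*dto.Metric{{
			Label: []*dto.LabelPair{{
				Name:  proto.String("handler"),
				Value: proto.String("/write"),
			}},
			Counter: &dto.Counter{Value: proto.Float64(42)},
		}},
	}

	// Any of the encoders added in this patch satisfies prometheus.Encoder;
	// &prometheus.JSON{} or &prometheus.Expfmt{} can be swapped in the same way.
	var enc prometheus.Encoder = &prometheus.LineProtocol{}

	b, err := enc.Encode([]*dto.MetricFamily{mf})
	if err != nil {
		log.Fatal(err)
	}

	// With no timestamp set on the metric, the encoder should emit a line such as:
	//   http_requests_total,handler=/write counter=42
	fmt.Printf("%s", b)
}

The same hook is what cmd/telemetryd/main.go uses above: assign svc.Encoder before wiring svc.Handler into the HTTP server, so the bytes the push gateway writes to its Store are in the desired format.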