feat(telemetry): encode metrics into store in various formats

pull/11347/head
Chris Goller 2019-01-21 22:02:24 -06:00
parent f30c5358d7
commit 781e305aff
5 changed files with 170 additions and 14 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/influxdb/kit/cli"
influxlogger "github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/prometheus"
"github.com/influxdata/influxdb/telemetry"
"go.uber.org/zap"
)
@ -52,6 +53,9 @@ func run() error {
Logger: logger,
}
svc := telemetry.NewPushGateway(logger, store)
// Print data as line protocol
svc.Encoder = &prometheus.LineProtocol{}
handler := http.HandlerFunc(svc.Handler)
logger.Info("starting telemetryd server", zap.String("addr", addr))

View File

@ -4,11 +4,31 @@ import (
"bytes"
"encoding/json"
"io"
"math"
"strconv"
"time"
"github.com/influxdata/influxdb/models"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// Encoder transforms metric families into bytes.
type Encoder interface {
// Encode encodes metrics into bytes.
Encode(mfs []*dto.MetricFamily) ([]byte, error)
}
// Expfmt is encodes metric familes into promtheus exposition format.
type Expfmt struct {
Format expfmt.Format
}
// Encode encodes metrics into prometheus exposition format bytes.
func (e *Expfmt) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
return EncodeExpfmt(mfs, e.Format)
}
// DecodeExpfmt decodes the reader of format into metric families.
func DecodeExpfmt(r io.Reader, format expfmt.Format) ([]*dto.MetricFamily, error) {
dec := expfmt.NewDecoder(r, format)
@ -31,7 +51,7 @@ func DecodeExpfmt(r io.Reader, format expfmt.Format) ([]*dto.MetricFamily, error
// EncodeExpfmt encodes the metrics family (defaults to expfmt.FmtProtoDelim).
func EncodeExpfmt(mfs []*dto.MetricFamily, opts ...expfmt.Format) ([]byte, error) {
format := expfmt.FmtProtoDelim
if len(opts) != 0 {
if len(opts) != 0 && opts[0] != "" {
format = opts[0]
}
buf := &bytes.Buffer{}
@ -44,6 +64,15 @@ func EncodeExpfmt(mfs []*dto.MetricFamily, opts ...expfmt.Format) ([]byte, error
return buf.Bytes(), nil
}
// JSON is encodes metric familes into JSON.
type JSON struct{}
// Encode encodes metrics JSON bytes. This not always works
// as some prometheus values are NaN or Inf.
func (j *JSON) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
return EncodeJSON(mfs)
}
// DecodeJSON decodes a JSON array of metrics families.
func DecodeJSON(r io.Reader) ([]*dto.MetricFamily, error) {
dec := json.NewDecoder(r)
@ -65,3 +94,131 @@ func DecodeJSON(r io.Reader) ([]*dto.MetricFamily, error) {
func EncodeJSON(mfs []*dto.MetricFamily) ([]byte, error) {
return json.Marshal(mfs)
}
const (
// just in case the definition of time.Nanosecond changes from 1.
nsPerMilliseconds = int64(time.Millisecond / time.Nanosecond)
)
// LineProtocol is encodes metric familes into influxdb lineprotocl.
type LineProtocol struct{}
// Encode encodes metrics into line protocol format bytes.
func (l *LineProtocol) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
return EncodeLineProtocol(mfs)
}
// EncodeLineProtocol converts prometheus metrics into line protocol.
func EncodeLineProtocol(mfs []*dto.MetricFamily) ([]byte, error) {
var b bytes.Buffer
pts := points(mfs)
for _, p := range pts {
if _, err := b.WriteString(p.String()); err != nil {
return nil, err
}
if err := b.WriteByte('\n'); err != nil {
return nil, err
}
}
return b.Bytes(), nil
}
func points(mfs []*dto.MetricFamily) models.Points {
pts := make(models.Points, 0, len(mfs))
for _, mf := range mfs {
mts := make(models.Points, 0, len(mf.Metric))
name := mf.GetName()
for _, m := range mf.Metric {
ts := tags(m.Label)
fs := fields(mf.GetType(), m)
tm := timestamp(m)
pt, err := models.NewPoint(name, ts, fs, tm)
if err != nil {
continue
}
mts = append(mts, pt)
}
pts = append(pts, mts...)
}
return pts
}
func timestamp(m *dto.Metric) time.Time {
var tm time.Time
if m.GetTimestampMs() > 0 {
tm = time.Unix(0, m.GetTimestampMs()*nsPerMilliseconds)
}
return tm
}
func tags(labels []*dto.LabelPair) models.Tags {
ts := make(models.Tags, len(labels))
for i, label := range labels {
ts[i] = models.NewTag([]byte(label.GetName()), []byte(label.GetValue()))
}
return ts
}
func fields(typ dto.MetricType, m *dto.Metric) models.Fields {
switch typ {
case dto.MetricType_SUMMARY:
return summary(m.GetSummary())
case dto.MetricType_HISTOGRAM:
return histogram(m.GetHistogram())
case dto.MetricType_GAUGE:
return value("gauge", m.GetGauge())
case dto.MetricType_COUNTER:
return value("counter", m.GetCounter())
case dto.MetricType_UNTYPED:
return value("value", m.GetUntyped())
default:
return nil
}
}
func summary(s *dto.Summary) map[string]interface{} {
fields := make(map[string]interface{}, len(s.Quantile)+2)
for _, q := range s.Quantile {
v := q.GetValue()
if !math.IsNaN(v) {
key := strconv.FormatFloat(q.GetQuantile(), 'f', -1, 64)
fields[key] = v
}
}
fields["count"] = float64(s.GetSampleCount())
fields["sum"] = float64(s.GetSampleSum())
return fields
}
func histogram(hist *dto.Histogram) map[string]interface{} {
fields := make(map[string]interface{}, len(hist.Bucket)+2)
for _, b := range hist.Bucket {
k := strconv.FormatFloat(b.GetUpperBound(), 'f', -1, 64)
fields[k] = float64(b.GetCumulativeCount())
}
fields["count"] = float64(hist.GetSampleCount())
fields["sum"] = float64(hist.GetSampleSum())
return fields
}
type valuer interface {
GetValue() float64
}
func value(typ string, m valuer) models.Fields {
vs := make(models.Fields, 1)
v := m.GetValue()
if !math.IsNaN(v) {
vs[typ] = v
}
return vs
}

View File

@ -37,7 +37,7 @@ type PushGateway struct {
Store Store
Transformers []prometheus.Transformer
Format expfmt.Format
Encoder prometheus.Encoder
}
// NewPushGateway constructs the PushGateway.
@ -81,8 +81,10 @@ func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) {
p.MaxBytes = DefaultMaxBytes
}
if p.Format == "" {
p.Format = expfmt.FmtText
if p.Encoder == nil {
p.Encoder = &prometheus.Expfmt{
Format: expfmt.FmtText,
}
}
ctx, cancel := context.WithTimeout(
@ -118,14 +120,7 @@ func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) {
mfs = transformer.Transform(mfs)
}
var data []byte
switch p.Format {
case "JSON":
data, err = prometheus.EncodeJSON(mfs)
default:
data, err = prometheus.EncodeExpfmt(mfs, p.Format)
}
data, err := p.Encoder.Encode(mfs)
if err != nil {
p.Logger.Error("unable to encode metric families", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -113,7 +113,7 @@ func TestPushGateway_Handler(t *testing.T) {
now: tt.fields.now,
},
)
p.Format = "JSON"
p.Encoder = &pr.JSON{}
tt.args.r.Header.Set("Content-Type", tt.contentType)
p.Handler(tt.args.w, tt.args.r)

View File

@ -27,7 +27,7 @@ func TestReport(t *testing.T) {
}
gw := NewPushGateway(logger, store, timestamps)
gw.Format = "JSON"
gw.Encoder = &pr.JSON{}
ts := httptest.NewServer(http.HandlerFunc(gw.Handler))
defer ts.Close()