influxdb/gather/prometheus.go

155 lines
3.9 KiB
Go
Raw Normal View History

2018-08-21 20:16:15 +00:00
package gather
import (
"context"
"fmt"
"io"
"math"
"mime"
"net/http"
"time"
"github.com/influxdata/platform"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
2018-09-07 15:45:28 +00:00
// prometheusScraper handles parsing prometheus metrics.
// It implements the Scraper interface and holds no state of its own.
type prometheusScraper struct{}
2018-08-21 20:16:15 +00:00
2018-09-07 15:45:28 +00:00
// Gather parse metrics from a scraper target url.
func (p *prometheusScraper) Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error) {
resp, err := http.Get(target.URL)
2018-08-21 20:16:15 +00:00
if err != nil {
2018-09-07 15:45:28 +00:00
return ms, err
2018-08-21 20:16:15 +00:00
}
defer resp.Body.Close()
return p.parse(resp.Body, resp.Header)
}
2018-09-07 15:45:28 +00:00
func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, error) {
2018-08-21 20:16:15 +00:00
var parser expfmt.TextParser
2018-09-07 15:45:28 +00:00
now := time.Now()
2018-08-21 20:16:15 +00:00
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
2018-09-07 15:45:28 +00:00
return nil, err
2018-08-21 20:16:15 +00:00
}
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
2018-09-07 15:45:28 +00:00
if mediatype == "application/vnd.google.protobuf" &&
2018-08-21 20:16:15 +00:00
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, err := pbutil.ReadDelimited(r, mf); err != nil {
if err == io.EOF {
break
}
2018-09-07 15:45:28 +00:00
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
2018-08-21 20:16:15 +00:00
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(r)
if err != nil {
2018-09-07 15:45:28 +00:00
return nil, fmt.Errorf("reading text format failed: %s", err)
2018-08-21 20:16:15 +00:00
}
}
ms := make([]Metrics, 0)
// read metrics
2018-09-07 15:45:28 +00:00
for name, family := range metricFamilies {
for _, m := range family.Metric {
2018-08-21 20:16:15 +00:00
// reading tags
tags := makeLabels(m)
// reading fields
fields := make(map[string]interface{})
2018-09-07 15:45:28 +00:00
switch family.GetType() {
case dto.MetricType_SUMMARY:
2018-08-21 20:16:15 +00:00
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
2018-09-07 15:45:28 +00:00
case dto.MetricType_HISTOGRAM:
2018-08-21 20:16:15 +00:00
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
2018-09-07 15:45:28 +00:00
default:
2018-08-21 20:16:15 +00:00
// standard metric
fields = getNameAndValue(m)
}
2018-09-07 15:45:28 +00:00
if len(fields) == 0 {
continue
}
tm := now
if m.TimestampMs != nil && *m.TimestampMs > 0 {
tm = time.Unix(0, *m.TimestampMs*1000000)
}
me := Metrics{
Timestamp: tm.UnixNano(),
Tags: tags,
Fields: fields,
Name: name,
Type: MetricType(family.GetType()),
2018-08-21 20:16:15 +00:00
}
2018-09-07 15:45:28 +00:00
ms = append(ms, me)
2018-08-21 20:16:15 +00:00
}
}
2018-09-07 15:45:28 +00:00
return ms, nil
2018-08-21 20:16:15 +00:00
}
// makeLabels collects the label pairs of m into a name -> value map.
// A metric without labels yields an empty, non-nil map.
func makeLabels(m *dto.Metric) map[string]string {
	labels := make(map[string]string)
	for i := range m.Label {
		pair := m.Label[i]
		labels[pair.GetName()] = pair.GetValue()
	}
	return labels
}
2018-09-07 15:45:28 +00:00
// Get Buckets from histogram metric
2018-08-21 20:16:15 +00:00
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
}
return fields
}
// getNameAndValue returns a single-entry field map for a gauge, counter or
// untyped metric, keyed "gauge", "counter" or "value" respectively. Gauge is
// checked first, then Counter, then Untyped. The map is empty when none of
// them is set or when the value is NaN.
func getNameAndValue(m *dto.Metric) map[string]interface{} {
	var (
		key string
		val float64
	)
	switch {
	case m.Gauge != nil:
		key, val = "gauge", m.GetGauge().GetValue()
	case m.Counter != nil:
		key, val = "counter", m.GetCounter().GetValue()
	case m.Untyped != nil:
		key, val = "value", m.GetUntyped().GetValue()
	default:
		return map[string]interface{}{}
	}

	out := map[string]interface{}{}
	if !math.IsNaN(val) {
		out[key] = val
	}
	return out
}
// makeQuantiles turns the quantiles of a summary metric into fields: each key
// is the quantile formatted with fmt.Sprint, each value is the quantile's
// value. Quantiles whose value is NaN are skipped.
func makeQuantiles(m *dto.Metric) map[string]interface{} {
	quantiles := m.GetSummary().Quantile
	out := map[string]interface{}{}
	for _, quantile := range quantiles {
		value := quantile.GetValue()
		if math.IsNaN(value) {
			continue
		}
		out[fmt.Sprint(quantile.GetQuantile())] = value
	}
	return out
}