2019-01-19 00:57:26 +00:00
|
|
|
package telemetry
|
2019-01-18 13:38:58 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
"net/http"
|
|
|
|
"time"
|
|
|
|
|
2019-01-19 01:14:10 +00:00
|
|
|
pr "github.com/influxdata/influxdb/prometheus"
|
2019-01-18 13:38:58 +00:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/prometheus/common/expfmt"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Pusher pushes metrics to a prometheus push gateway.
|
|
|
|
type Pusher struct {
|
|
|
|
URL string
|
|
|
|
Gather prometheus.Gatherer
|
|
|
|
Client *http.Client
|
|
|
|
}
|
|
|
|
|
2019-01-20 20:10:40 +00:00
|
|
|
// NewPusher sends usage metrics to a prometheus push gateway.
|
|
|
|
func NewPusher(g prometheus.Gatherer) *Pusher {
|
2019-01-18 13:38:58 +00:00
|
|
|
return &Pusher{
|
2019-01-21 17:43:05 +00:00
|
|
|
//URL: "https://telemetry.influxdata.com/metrics/job/influxdb",
|
|
|
|
URL: "http://127.0.0.1:8080/metrics/job/influxdb",
|
2019-01-19 01:14:10 +00:00
|
|
|
Gather: &pr.Filter{
|
|
|
|
Gatherer: g,
|
|
|
|
Matcher: telemetryMatcher,
|
|
|
|
},
|
2019-01-18 13:38:58 +00:00
|
|
|
Client: &http.Client{
|
|
|
|
Transport: http.DefaultTransport,
|
|
|
|
Timeout: 10 * time.Second,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Push POSTs prometheus metrics in protobuf delimited format to a push gateway.
|
|
|
|
func (p *Pusher) Push(ctx context.Context) error {
|
|
|
|
resps := make(chan (error))
|
|
|
|
go func() {
|
|
|
|
resps <- p.push(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case err := <-resps:
|
|
|
|
return err
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Pusher) push(ctx context.Context) error {
|
|
|
|
r, err := p.encode()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// when there are no metrics to send, then, no need to POST.
|
|
|
|
if r == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
req, err := http.NewRequest(http.MethodPost, p.URL, r)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
req = req.WithContext(ctx)
|
|
|
|
|
|
|
|
req.Header.Set("Content-Type", string(expfmt.FmtProtoDelim))
|
|
|
|
|
|
|
|
res, err := p.Client.Do(req)
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer res.Body.Close()
|
|
|
|
if res.StatusCode != http.StatusAccepted {
|
|
|
|
body, _ := ioutil.ReadAll(res.Body)
|
|
|
|
return fmt.Errorf("unable to POST metrics; received status %s: %s", http.StatusText(res.StatusCode), body)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Pusher) encode() (io.Reader, error) {
|
|
|
|
mfs, err := p.Gather.Gather()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(mfs) == 0 {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
buf := &bytes.Buffer{}
|
|
|
|
enc := expfmt.NewEncoder(buf, expfmt.FmtProtoDelim)
|
|
|
|
for _, mf := range mfs {
|
|
|
|
if err := enc.Encode(mf); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return buf, nil
|
|
|
|
}
|