Merge pull request #11347 from influxdata/feat/telemetry
Add optional telemetry that reports every 8 hourspull/11425/head
commit
31ab881c96
|
@ -7,7 +7,7 @@ import (
|
|||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
bolt "github.com/coreos/bbolt"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/rand"
|
||||
"github.com/influxdata/influxdb/snowflake"
|
||||
|
@ -88,6 +88,11 @@ func (c *Client) Open(ctx context.Context) error {
|
|||
// initialize creates Buckets that are missing
|
||||
func (c *Client) initialize(ctx context.Context) error {
|
||||
if err := c.db.Update(func(tx *bolt.Tx) error {
|
||||
// Always create ID bucket.
|
||||
if err := c.initializeID(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Always create Buckets bucket.
|
||||
if err := c.initializeBuckets(ctx, tx); err != nil {
|
||||
return err
|
||||
|
|
|
@ -17,6 +17,18 @@ func init() {
|
|||
}
|
||||
|
||||
func NewTestClient() (*bolt.Client, func(), error) {
|
||||
c, closeFn, err := newTestClient()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := c.Open(context.Background()); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return c, closeFn, nil
|
||||
}
|
||||
|
||||
func newTestClient() (*bolt.Client, func(), error) {
|
||||
c := bolt.NewClient()
|
||||
|
||||
f, err := ioutil.TempFile("", "influxdata-platform-bolt-")
|
||||
|
@ -27,10 +39,6 @@ func NewTestClient() (*bolt.Client, func(), error) {
|
|||
|
||||
c.Path = f.Name()
|
||||
|
||||
if err := c.Open(context.TODO()); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
close := func() {
|
||||
c.Close()
|
||||
os.Remove(c.Path)
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
package bolt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
idsBucket = []byte("idsv1")
|
||||
idKey = []byte("id")
|
||||
errIDNotFound = errors.New("source not found")
|
||||
)
|
||||
|
||||
var _ platform.IDGenerator = (*Client)(nil)
|
||||
|
||||
func (c *Client) initializeID(tx *bolt.Tx) error {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(idsBucket)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := c.getID(tx)
|
||||
if err != nil && err != errIDNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if err == errIDNotFound {
|
||||
if err := c.generateID(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ID retrieves the unique ID for this influx instance.
|
||||
func (c *Client) ID() platform.ID {
|
||||
// if any error occurs return a random number
|
||||
id := platform.ID(rand.Int63())
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
val, err := c.getID(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id = val
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
c.Logger.Error("unable to load id", zap.Error(err))
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func (c *Client) getID(tx *bolt.Tx) (platform.ID, error) {
|
||||
v := tx.Bucket(idsBucket).Get(idKey)
|
||||
if len(v) == 0 {
|
||||
return platform.InvalidID(), errIDNotFound
|
||||
}
|
||||
return decodeID(v)
|
||||
}
|
||||
|
||||
func decodeID(val []byte) (platform.ID, error) {
|
||||
if len(val) < platform.IDLength {
|
||||
// This should not happen.
|
||||
return platform.InvalidID(), fmt.Errorf("provided value is too short to contain an ID. Please report this error")
|
||||
}
|
||||
|
||||
var id platform.ID
|
||||
if err := id.Decode(val[:platform.IDLength]); err != nil {
|
||||
return platform.InvalidID(), err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (c *Client) generateID(tx *bolt.Tx) error {
|
||||
id := c.IDGenerator.ID()
|
||||
encodedID, err := id.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Bucket(idsBucket).Put(idKey, encodedID)
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package bolt_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/mock"
|
||||
)
|
||||
|
||||
func TestID(t *testing.T) {
|
||||
c, closeFn, err := newTestClient()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new bolt client: %v", err)
|
||||
}
|
||||
defer closeFn()
|
||||
|
||||
c.IDGenerator = mock.NewIDGenerator(testIDStr, t)
|
||||
|
||||
if err := c.Open(context.Background()); err != nil {
|
||||
t.Fatalf("failed to open bolt client: %v", err)
|
||||
}
|
||||
|
||||
if got, want := c.ID(), testID; got != want {
|
||||
t.Errorf("Client.ID() = %v, want %v", got, want)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package influxdb
|
||||
|
||||
// BuildInfo represents the information about InfluxDB build.
|
||||
type BuildInfo struct {
|
||||
Version string // Version is the current git tag with v prefix stripped
|
||||
Commit string // Commit is the current git commit SHA
|
||||
Date string // Date is the build date in RFC3339
|
||||
}
|
|
@ -6,7 +6,7 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
nethttp "net/http"
|
||||
_ "net/http/pprof"
|
||||
_ "net/http/pprof" // needed to add pprof to our binary.
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/influxdata/influxdb/kit/prom"
|
||||
influxlogger "github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/nats"
|
||||
infprom "github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/influxdata/influxdb/proto"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
pcontrol "github.com/influxdata/influxdb/query/control"
|
||||
|
@ -37,8 +38,8 @@ import (
|
|||
taskbolt "github.com/influxdata/influxdb/task/backend/bolt"
|
||||
"github.com/influxdata/influxdb/task/backend/coordinator"
|
||||
taskexecutor "github.com/influxdata/influxdb/task/backend/executor"
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsi1"
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsi1" // needed for tsi1
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsm1" // needed for tsm1
|
||||
"github.com/influxdata/influxdb/vault"
|
||||
pzap "github.com/influxdata/influxdb/zap"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
|
@ -53,15 +54,16 @@ type Launcher struct {
|
|||
cancel func()
|
||||
running bool
|
||||
|
||||
logLevel string
|
||||
developerMode bool
|
||||
logLevel string
|
||||
reportingDisabled bool
|
||||
|
||||
httpBindAddress string
|
||||
boltPath string
|
||||
natsPath string
|
||||
developerMode bool
|
||||
enginePath string
|
||||
protosPath string
|
||||
|
||||
secretStore string
|
||||
secretStore string
|
||||
|
||||
boltClient *bolt.Client
|
||||
engine *storage.Engine
|
||||
|
@ -76,10 +78,14 @@ type Launcher struct {
|
|||
scheduler *taskbackend.TickScheduler
|
||||
|
||||
logger *zap.Logger
|
||||
reg *prom.Registry
|
||||
|
||||
Stdin io.Reader
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
|
||||
// BuildInfo contains commit, version and such of influxdb.
|
||||
BuildInfo platform.BuildInfo
|
||||
}
|
||||
|
||||
// NewLauncher returns a new instance of Launcher connected to standard in/out/err.
|
||||
|
@ -91,10 +97,33 @@ func NewLauncher() *Launcher {
|
|||
}
|
||||
}
|
||||
|
||||
// Running returns true if the main Launcher has started running.
|
||||
func (m *Launcher) Running() bool {
|
||||
return m.running
|
||||
}
|
||||
|
||||
// ReportingDisabled is true if opted out of usage stats.
|
||||
func (m *Launcher) ReportingDisabled() bool {
|
||||
return m.reportingDisabled
|
||||
}
|
||||
|
||||
// Registry returns the prometheus metrics registry.
|
||||
func (m *Launcher) Registry() *prom.Registry {
|
||||
return m.reg
|
||||
}
|
||||
|
||||
// Logger returns the launchers logger.
|
||||
func (m *Launcher) Logger() *zap.Logger {
|
||||
return m.logger
|
||||
}
|
||||
|
||||
// SetBuild adds version, commit, and date to prometheus metrics.
|
||||
func (m *Launcher) SetBuild(version, commit, date string) {
|
||||
m.BuildInfo.Version = version
|
||||
m.BuildInfo.Commit = commit
|
||||
m.BuildInfo.Date = date
|
||||
}
|
||||
|
||||
// URL returns the URL to connect to the HTTP server.
|
||||
func (m *Launcher) URL() string {
|
||||
return fmt.Sprintf("http://127.0.0.1:%d", m.httpPort)
|
||||
|
@ -122,7 +151,7 @@ func (m *Launcher) Shutdown(ctx context.Context) {
|
|||
}
|
||||
|
||||
m.logger.Info("Stopping", zap.String("service", "query"))
|
||||
if err := m.queryController.Shutdown(ctx); err != nil {
|
||||
if err := m.queryController.Shutdown(ctx); err != nil && err != context.Canceled {
|
||||
m.logger.Info("Failed closing query service", zap.Error(err))
|
||||
}
|
||||
|
||||
|
@ -198,6 +227,12 @@ func (m *Launcher) Run(ctx context.Context, args ...string) error {
|
|||
Default: filepath.Join(dir, "protos"),
|
||||
Desc: "path to protos on the filesystem",
|
||||
},
|
||||
{
|
||||
DestP: &m.reportingDisabled,
|
||||
Flag: "reporting-disabled",
|
||||
Default: false,
|
||||
Desc: "disable sending telemetry data to https://telemetry.influxdata.com every 8 hours",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -231,10 +266,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
tracer.IDGenerator = snowflake.NewIDGenerator()
|
||||
opentracing.SetGlobalTracer(tracer)
|
||||
|
||||
reg := prom.NewRegistry()
|
||||
reg.MustRegister(prometheus.NewGoCollector())
|
||||
reg.WithLogger(m.logger)
|
||||
|
||||
m.boltClient = bolt.NewClient()
|
||||
m.boltClient.Path = m.boltPath
|
||||
m.boltClient.WithLogger(m.logger.With(zap.String("service", "bolt")))
|
||||
|
@ -244,7 +275,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
reg.MustRegister(m.boltClient)
|
||||
m.reg = prom.NewRegistry()
|
||||
m.reg.MustRegister(
|
||||
prometheus.NewGoCollector(),
|
||||
infprom.NewInfluxCollector(m.boltClient, m.BuildInfo),
|
||||
)
|
||||
m.reg.WithLogger(m.logger)
|
||||
m.reg.MustRegister(m.boltClient)
|
||||
|
||||
var (
|
||||
orgSvc platform.OrganizationService = m.boltClient
|
||||
|
@ -319,7 +356,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
// The Engine's metrics must be registered after it opens.
|
||||
reg.MustRegister(m.engine.PrometheusCollectors()...)
|
||||
m.reg.MustRegister(m.engine.PrometheusCollectors()...)
|
||||
|
||||
pointsWriter = m.engine
|
||||
|
||||
|
@ -343,10 +380,10 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
}
|
||||
|
||||
m.queryController = pcontrol.New(cc)
|
||||
reg.MustRegister(m.queryController.PrometheusCollectors()...)
|
||||
m.reg.MustRegister(m.queryController.PrometheusCollectors()...)
|
||||
}
|
||||
|
||||
var storageQueryService query.ProxyQueryService = readservice.NewProxyQueryService(m.queryController)
|
||||
var storageQueryService = readservice.NewProxyQueryService(m.queryController)
|
||||
var taskSvc platform.TaskService
|
||||
{
|
||||
boltStore, err := taskbolt.New(m.boltClient.DB(), "tasks")
|
||||
|
@ -360,7 +397,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
lw := taskbackend.NewPointLogWriter(pointsWriter)
|
||||
m.scheduler = taskbackend.NewScheduler(boltStore, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
|
||||
m.scheduler.Start(ctx)
|
||||
reg.MustRegister(m.scheduler.PrometheusCollectors()...)
|
||||
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)
|
||||
|
||||
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
|
||||
lr := taskbackend.NewQueryLogReader(queryService)
|
||||
|
@ -451,9 +488,9 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
// HTTP server
|
||||
httpLogger := m.logger.With(zap.String("service", "http"))
|
||||
platformHandler := http.NewPlatformHandler(handlerConfig)
|
||||
reg.MustRegister(platformHandler.PrometheusCollectors()...)
|
||||
m.reg.MustRegister(platformHandler.PrometheusCollectors()...)
|
||||
|
||||
h := http.NewHandlerFromRegistry("platform", reg)
|
||||
h := http.NewHandlerFromRegistry("platform", m.reg)
|
||||
h.Handler = platformHandler
|
||||
h.Logger = httpLogger
|
||||
h.Tracer = opentracing.GlobalTracer()
|
||||
|
|
|
@ -5,21 +5,30 @@ import (
|
|||
"fmt"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influxd/launcher"
|
||||
"github.com/influxdata/influxdb/kit/signals"
|
||||
_ "github.com/influxdata/influxdb/query/builtin"
|
||||
"github.com/influxdata/influxdb/telemetry"
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsi1"
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
)
|
||||
|
||||
var (
|
||||
version = "dev"
|
||||
commit = "none"
|
||||
date = "unknown"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// exit with SIGINT and SIGTERM
|
||||
ctx := context.Background()
|
||||
ctx = signals.WithStandardSignals(ctx)
|
||||
|
||||
m := launcher.NewLauncher()
|
||||
m.SetBuild(version, commit, date)
|
||||
if err := m.Run(ctx, os.Args[1:]...); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
|
@ -27,10 +36,23 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
if !m.ReportingDisabled() {
|
||||
reporter := telemetry.NewReporter(m.Registry())
|
||||
reporter.Interval = 8 * time.Hour
|
||||
reporter.Logger = m.Logger()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
reporter.Report(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
// Attempt clean shutdown.
|
||||
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
m.Shutdown(ctx)
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
## Telemetry Server
|
||||
|
||||
Telemetry server accepts pushed prometheus metrics where it
|
||||
logs them to stdout.
|
||||
|
||||
Telemetry server is very similar to prometheus pushgateway, but,
|
||||
has stores that are configurable rather than just a /metrics
|
||||
endpoint.
|
|
@ -0,0 +1,68 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/kit/cli"
|
||||
influxlogger "github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/influxdata/influxdb/telemetry"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
logger = influxlogger.New(os.Stdout)
|
||||
addr string
|
||||
)
|
||||
|
||||
func main() {
|
||||
prog := &cli.Program{
|
||||
Run: run,
|
||||
Name: "telemetryd",
|
||||
Opts: []cli.Opt{
|
||||
{
|
||||
DestP: &addr,
|
||||
Flag: "bind-addr",
|
||||
Default: ":8080",
|
||||
Desc: "binding address for telemetry server",
|
||||
},
|
||||
},
|
||||
}
|
||||
cmd := cli.NewCommand(prog)
|
||||
|
||||
var exitCode int
|
||||
if err := cmd.Execute(); err != nil {
|
||||
exitCode = 1
|
||||
logger.Error("Command returned error", zap.Error(err))
|
||||
}
|
||||
|
||||
if err := logger.Sync(); err != nil {
|
||||
exitCode = 1
|
||||
fmt.Fprintf(os.Stderr, "Error syncing logs: %v\n", err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
func run() error {
|
||||
logger := logger.With(zap.String("service", "telemetryd"))
|
||||
store := &telemetry.LogStore{
|
||||
Logger: logger,
|
||||
}
|
||||
svc := telemetry.NewPushGateway(logger, store)
|
||||
// Print data as line protocol
|
||||
svc.Encoder = &prometheus.LineProtocol{}
|
||||
|
||||
handler := http.HandlerFunc(svc.Handler)
|
||||
logger.Info("starting telemetryd server", zap.String("addr", addr))
|
||||
|
||||
srv := http.Server{
|
||||
Addr: addr,
|
||||
Handler: handler,
|
||||
ErrorLog: zap.NewStdLog(logger),
|
||||
}
|
||||
return srv.ListenAndServe()
|
||||
}
|
1
go.mod
1
go.mod
|
@ -45,6 +45,7 @@ require (
|
|||
github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b // indirect
|
||||
github.com/gogo/protobuf v1.1.1
|
||||
github.com/golang/gddo v0.0.0-20181116215533-9bd4a3295021
|
||||
github.com/golang/protobuf v1.2.0
|
||||
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
|
||||
github.com/google/go-cmp v0.2.0
|
||||
|
|
|
@ -0,0 +1,224 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"math"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
// Encoder transforms metric families into bytes.
|
||||
type Encoder interface {
|
||||
// Encode encodes metrics into bytes.
|
||||
Encode(mfs []*dto.MetricFamily) ([]byte, error)
|
||||
}
|
||||
|
||||
// Expfmt is encodes metric familes into promtheus exposition format.
|
||||
type Expfmt struct {
|
||||
Format expfmt.Format
|
||||
}
|
||||
|
||||
// Encode encodes metrics into prometheus exposition format bytes.
|
||||
func (e *Expfmt) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
|
||||
return EncodeExpfmt(mfs, e.Format)
|
||||
}
|
||||
|
||||
// DecodeExpfmt decodes the reader of format into metric families.
|
||||
func DecodeExpfmt(r io.Reader, format expfmt.Format) ([]*dto.MetricFamily, error) {
|
||||
dec := expfmt.NewDecoder(r, format)
|
||||
mfs := []*dto.MetricFamily{}
|
||||
for {
|
||||
var mf dto.MetricFamily
|
||||
if err := dec.Decode(&mf); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
mfs = append(mfs, &mf)
|
||||
}
|
||||
return mfs, nil
|
||||
}
|
||||
|
||||
// EncodeExpfmt encodes the metrics family (defaults to expfmt.FmtProtoDelim).
|
||||
func EncodeExpfmt(mfs []*dto.MetricFamily, opts ...expfmt.Format) ([]byte, error) {
|
||||
format := expfmt.FmtProtoDelim
|
||||
if len(opts) != 0 && opts[0] != "" {
|
||||
format = opts[0]
|
||||
}
|
||||
buf := &bytes.Buffer{}
|
||||
enc := expfmt.NewEncoder(buf, format)
|
||||
for _, mf := range mfs {
|
||||
if err := enc.Encode(mf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// JSON is encodes metric familes into JSON.
|
||||
type JSON struct{}
|
||||
|
||||
// Encode encodes metrics JSON bytes. This not always works
|
||||
// as some prometheus values are NaN or Inf.
|
||||
func (j *JSON) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
|
||||
return EncodeJSON(mfs)
|
||||
}
|
||||
|
||||
// DecodeJSON decodes a JSON array of metrics families.
|
||||
func DecodeJSON(r io.Reader) ([]*dto.MetricFamily, error) {
|
||||
dec := json.NewDecoder(r)
|
||||
families := []*dto.MetricFamily{}
|
||||
for {
|
||||
mfs := []*dto.MetricFamily{}
|
||||
|
||||
if err := dec.Decode(&mfs); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
families = append(families, mfs...)
|
||||
}
|
||||
return families, nil
|
||||
}
|
||||
|
||||
// EncodeJSON encodes the metric families to JSON.
|
||||
func EncodeJSON(mfs []*dto.MetricFamily) ([]byte, error) {
|
||||
return json.Marshal(mfs)
|
||||
}
|
||||
|
||||
const (
|
||||
// just in case the definition of time.Nanosecond changes from 1.
|
||||
nsPerMilliseconds = int64(time.Millisecond / time.Nanosecond)
|
||||
)
|
||||
|
||||
// LineProtocol is encodes metric familes into influxdb lineprotocl.
|
||||
type LineProtocol struct{}
|
||||
|
||||
// Encode encodes metrics into line protocol format bytes.
|
||||
func (l *LineProtocol) Encode(mfs []*dto.MetricFamily) ([]byte, error) {
|
||||
return EncodeLineProtocol(mfs)
|
||||
}
|
||||
|
||||
// EncodeLineProtocol converts prometheus metrics into line protocol.
|
||||
func EncodeLineProtocol(mfs []*dto.MetricFamily) ([]byte, error) {
|
||||
var b bytes.Buffer
|
||||
|
||||
pts := points(mfs)
|
||||
for _, p := range pts {
|
||||
if _, err := b.WriteString(p.String()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := b.WriteByte('\n'); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func points(mfs []*dto.MetricFamily) models.Points {
|
||||
pts := make(models.Points, 0, len(mfs))
|
||||
for _, mf := range mfs {
|
||||
mts := make(models.Points, 0, len(mf.Metric))
|
||||
name := mf.GetName()
|
||||
for _, m := range mf.Metric {
|
||||
ts := tags(m.Label)
|
||||
fs := fields(mf.GetType(), m)
|
||||
tm := timestamp(m)
|
||||
|
||||
pt, err := models.NewPoint(name, ts, fs, tm)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
mts = append(mts, pt)
|
||||
}
|
||||
pts = append(pts, mts...)
|
||||
}
|
||||
|
||||
return pts
|
||||
}
|
||||
|
||||
func timestamp(m *dto.Metric) time.Time {
|
||||
var tm time.Time
|
||||
if m.GetTimestampMs() > 0 {
|
||||
tm = time.Unix(0, m.GetTimestampMs()*nsPerMilliseconds)
|
||||
}
|
||||
return tm
|
||||
|
||||
}
|
||||
|
||||
func tags(labels []*dto.LabelPair) models.Tags {
|
||||
ts := make(models.Tags, len(labels))
|
||||
for i, label := range labels {
|
||||
ts[i] = models.NewTag([]byte(label.GetName()), []byte(label.GetValue()))
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
func fields(typ dto.MetricType, m *dto.Metric) models.Fields {
|
||||
switch typ {
|
||||
case dto.MetricType_SUMMARY:
|
||||
return summary(m.GetSummary())
|
||||
case dto.MetricType_HISTOGRAM:
|
||||
return histogram(m.GetHistogram())
|
||||
case dto.MetricType_GAUGE:
|
||||
return value("gauge", m.GetGauge())
|
||||
case dto.MetricType_COUNTER:
|
||||
return value("counter", m.GetCounter())
|
||||
case dto.MetricType_UNTYPED:
|
||||
return value("value", m.GetUntyped())
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func summary(s *dto.Summary) map[string]interface{} {
|
||||
fields := make(map[string]interface{}, len(s.Quantile)+2)
|
||||
for _, q := range s.Quantile {
|
||||
v := q.GetValue()
|
||||
if !math.IsNaN(v) {
|
||||
key := strconv.FormatFloat(q.GetQuantile(), 'f', -1, 64)
|
||||
fields[key] = v
|
||||
}
|
||||
}
|
||||
|
||||
fields["count"] = float64(s.GetSampleCount())
|
||||
fields["sum"] = float64(s.GetSampleSum())
|
||||
return fields
|
||||
}
|
||||
|
||||
func histogram(hist *dto.Histogram) map[string]interface{} {
|
||||
fields := make(map[string]interface{}, len(hist.Bucket)+2)
|
||||
for _, b := range hist.Bucket {
|
||||
k := strconv.FormatFloat(b.GetUpperBound(), 'f', -1, 64)
|
||||
fields[k] = float64(b.GetCumulativeCount())
|
||||
}
|
||||
|
||||
fields["count"] = float64(hist.GetSampleCount())
|
||||
fields["sum"] = float64(hist.GetSampleSum())
|
||||
|
||||
return fields
|
||||
}
|
||||
|
||||
type valuer interface {
|
||||
GetValue() float64
|
||||
}
|
||||
|
||||
func value(typ string, m valuer) models.Fields {
|
||||
vs := make(models.Fields, 1)
|
||||
|
||||
v := m.GetValue()
|
||||
if !math.IsNaN(v) {
|
||||
vs[typ] = v
|
||||
}
|
||||
|
||||
return vs
|
||||
}
|
|
@ -0,0 +1,125 @@
|
|||
package prometheus_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
func Test_CodecExpfmt(t *testing.T) {
|
||||
mf1 := []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))}
|
||||
mf2 := []*dto.MetricFamily{NewCounter("mf2", 1.0, pr.L("n2", "v2"))}
|
||||
|
||||
b1, err := pr.EncodeExpfmt(mf1)
|
||||
if err != nil {
|
||||
t.Fatalf("encodeExpfmt() error = %v", err)
|
||||
}
|
||||
|
||||
got1, err := pr.DecodeExpfmt(bytes.NewBuffer(b1), expfmt.FmtProtoDelim)
|
||||
if err != nil {
|
||||
t.Fatalf("decodeExpfmt() error = %v", err)
|
||||
}
|
||||
|
||||
for i := range got1 {
|
||||
if got1[i].String() != mf1[i].String() {
|
||||
t.Errorf("codec1() = %v, want %v", got1[i].String(), mf1[i].String())
|
||||
}
|
||||
}
|
||||
|
||||
b2, err := pr.EncodeExpfmt(mf2)
|
||||
if err != nil {
|
||||
t.Fatalf("encodeExpfmt() error = %v", err)
|
||||
}
|
||||
|
||||
got2, err := pr.DecodeExpfmt(bytes.NewBuffer(b2), expfmt.FmtProtoDelim)
|
||||
if err != nil {
|
||||
t.Fatalf("decodeExpfmt() error = %v", err)
|
||||
}
|
||||
|
||||
for i := range got2 {
|
||||
if got2[i].String() != mf2[i].String() {
|
||||
t.Errorf("codec2() = %v, want %v", got2[i].String(), mf2[i].String())
|
||||
}
|
||||
}
|
||||
|
||||
b3 := append(b2, b1...)
|
||||
b3 = append(b3, b2...)
|
||||
|
||||
mf3 := []*dto.MetricFamily{
|
||||
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
|
||||
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
|
||||
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
|
||||
}
|
||||
|
||||
got3, err := pr.DecodeExpfmt(bytes.NewBuffer(b3), expfmt.FmtProtoDelim)
|
||||
if err != nil {
|
||||
t.Fatalf("decodeExpfmt() error = %v", err)
|
||||
}
|
||||
|
||||
for i := range got3 {
|
||||
if got3[i].String() != mf3[i].String() {
|
||||
t.Errorf("codec3() = %v, want %v", got3[i].String(), mf3[i].String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_CodecJSON(t *testing.T) {
|
||||
mf1 := []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1")), NewCounter("mf1", 1.0, pr.L("n1", "v1"))}
|
||||
mf2 := []*dto.MetricFamily{NewCounter("mf2", 1.0, pr.L("n2", "v2"))}
|
||||
|
||||
b1, err := pr.EncodeJSON(mf1)
|
||||
if err != nil {
|
||||
t.Fatalf("encodeJSON() error = %v", err)
|
||||
}
|
||||
|
||||
got1, err := pr.DecodeJSON(bytes.NewBuffer(b1))
|
||||
if err != nil {
|
||||
t.Fatalf("decodeJSON() error = %v", err)
|
||||
}
|
||||
|
||||
for i := range got1 {
|
||||
if got1[i].String() != mf1[i].String() {
|
||||
t.Errorf("codec1() = %v, want %v", got1[i].String(), mf1[i].String())
|
||||
}
|
||||
}
|
||||
|
||||
b2, err := pr.EncodeJSON(mf2)
|
||||
if err != nil {
|
||||
t.Fatalf("encodeJSON() error = %v", err)
|
||||
}
|
||||
|
||||
got2, err := pr.DecodeJSON(bytes.NewBuffer(b2))
|
||||
if err != nil {
|
||||
t.Fatalf("decodeJSON() error = %v", err)
|
||||
}
|
||||
|
||||
for i := range got2 {
|
||||
if got2[i].String() != mf2[i].String() {
|
||||
t.Errorf("codec2() = %v, want %v", got2[i].String(), mf2[i].String())
|
||||
}
|
||||
}
|
||||
|
||||
b3 := append(b2, b1...)
|
||||
b3 = append(b3, b2...)
|
||||
|
||||
mf3 := []*dto.MetricFamily{
|
||||
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
|
||||
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
|
||||
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
|
||||
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
|
||||
}
|
||||
|
||||
got3, err := pr.DecodeJSON(bytes.NewBuffer(b3))
|
||||
if err != nil {
|
||||
t.Fatalf("decodeJSON() error = %v", err)
|
||||
}
|
||||
|
||||
for i := range got3 {
|
||||
if got3[i].String() != mf3[i].String() {
|
||||
t.Errorf("codec3() = %v, want %v", got3[i].String(), mf3[i].String())
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
package prometheus_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxdb/prometheus"
|
||||
pr "github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
const metrics = `
|
||||
# HELP go_goroutines Number of goroutines that currently exist.
|
||||
# TYPE go_goroutines gauge
|
||||
go_goroutines 85
|
||||
# HELP go_info Information about the Go environment.
|
||||
# TYPE go_info gauge
|
||||
go_info{version="go1.11.4"} 1
|
||||
# HELP storage_compactions_queued Number of queued compactions.
|
||||
# TYPE storage_compactions_queued gauge
|
||||
storage_compactions_queued{level="1"} 1
|
||||
storage_compactions_queued{level="2"} 2
|
||||
`
|
||||
|
||||
func ExampleFilter_Gather() {
|
||||
mfs, _ := prometheus.DecodeExpfmt(bytes.NewBufferString(metrics), expfmt.FmtText)
|
||||
fmt.Printf("Start with %d metric families\n", len(mfs))
|
||||
fmt.Printf("%s\n", metrics)
|
||||
|
||||
filter := &prometheus.Filter{
|
||||
Gatherer: pr.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return mfs, nil
|
||||
}),
|
||||
Matcher: prometheus.NewMatcher().
|
||||
Family("go_goroutines").
|
||||
Family(
|
||||
"storage_compactions_queued",
|
||||
prometheus.L("level", "2"),
|
||||
),
|
||||
}
|
||||
|
||||
fmt.Printf("Filtering for the entire go_goroutines family and\njust the level=2 label of the storage_compactions_queued family.\n\n")
|
||||
filtered, _ := filter.Gather()
|
||||
b, _ := prometheus.EncodeExpfmt(filtered, expfmt.FmtText)
|
||||
|
||||
fmt.Printf("After filtering:\n\n%s", string(b))
|
||||
|
||||
// Output:
|
||||
// Start with 3 metric families
|
||||
//
|
||||
// # HELP go_goroutines Number of goroutines that currently exist.
|
||||
// # TYPE go_goroutines gauge
|
||||
// go_goroutines 85
|
||||
// # HELP go_info Information about the Go environment.
|
||||
// # TYPE go_info gauge
|
||||
// go_info{version="go1.11.4"} 1
|
||||
// # HELP storage_compactions_queued Number of queued compactions.
|
||||
// # TYPE storage_compactions_queued gauge
|
||||
// storage_compactions_queued{level="1"} 1
|
||||
// storage_compactions_queued{level="2"} 2
|
||||
//
|
||||
// Filtering for the entire go_goroutines family and
|
||||
// just the level=2 label of the storage_compactions_queued family.
|
||||
//
|
||||
// After filtering:
|
||||
//
|
||||
// # HELP go_goroutines Number of goroutines that currently exist.
|
||||
// # TYPE go_goroutines gauge
|
||||
// go_goroutines 85
|
||||
// # HELP storage_compactions_queued Number of queued compactions.
|
||||
// # TYPE storage_compactions_queued gauge
|
||||
// storage_compactions_queued{level="2"} 2
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
var _ prometheus.Gatherer = (*Filter)(nil)
|
||||
|
||||
// Filter filters the metrics from Gather using Matcher.
|
||||
type Filter struct {
|
||||
Gatherer prometheus.Gatherer
|
||||
Matcher Matcher
|
||||
}
|
||||
|
||||
// Gather filters all metrics to only those that match the Matcher.
|
||||
func (f *Filter) Gather() ([]*dto.MetricFamily, error) {
|
||||
mfs, err := f.Gatherer.Gather()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f.Matcher.Match(mfs), nil
|
||||
}
|
||||
|
||||
// Matcher is used to match families of prometheus metrics.
|
||||
type Matcher map[string]Labels // family name to label/value
|
||||
|
||||
// NewMatcher returns a new matcher.
|
||||
func NewMatcher() Matcher {
|
||||
return Matcher{}
|
||||
}
|
||||
|
||||
// Family helps constuct match by adding a metric family to match to.
|
||||
func (m Matcher) Family(name string, lps ...*dto.LabelPair) Matcher {
|
||||
// prometheus metrics labels are sorted by label name.
|
||||
sort.Slice(lps, func(i, j int) bool {
|
||||
return lps[i].GetName() < lps[j].GetName()
|
||||
})
|
||||
|
||||
pairs := &labelPairs{
|
||||
Label: lps,
|
||||
}
|
||||
|
||||
family, ok := m[name]
|
||||
if !ok {
|
||||
family = make(Labels)
|
||||
}
|
||||
|
||||
family[pairs.String()] = true
|
||||
m[name] = family
|
||||
return m
|
||||
}
|
||||
|
||||
// Match returns all metric families that match.
|
||||
func (m Matcher) Match(mfs []*dto.MetricFamily) []*dto.MetricFamily {
|
||||
if len(mfs) == 0 {
|
||||
return mfs
|
||||
}
|
||||
|
||||
filteredFamilies := []*dto.MetricFamily{}
|
||||
for _, mf := range mfs {
|
||||
labels, ok := m[mf.GetName()]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
metrics := []*dto.Metric{}
|
||||
match := false
|
||||
for _, metric := range mf.Metric {
|
||||
if labels.Match(metric) {
|
||||
match = true
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
}
|
||||
if match {
|
||||
filteredFamilies = append(filteredFamilies, &dto.MetricFamily{
|
||||
Name: mf.Name,
|
||||
Help: mf.Help,
|
||||
Type: mf.Type,
|
||||
Metric: metrics,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(familySorter(filteredFamilies))
|
||||
return filteredFamilies
|
||||
}
|
||||
|
||||
// L is used with Family to create a series of label pairs for matching.
|
||||
func L(name, value string) *dto.LabelPair {
|
||||
return &dto.LabelPair{
|
||||
Name: proto.String(name),
|
||||
Value: proto.String(value),
|
||||
}
|
||||
}
|
||||
|
||||
// Labels are string representations of a set of prometheus label pairs that
|
||||
// are used to match to metric.
|
||||
type Labels map[string]bool
|
||||
|
||||
// Match checks if the metric's labels matches this set of labels.
|
||||
func (ls Labels) Match(metric *dto.Metric) bool {
|
||||
lp := &labelPairs{metric.Label}
|
||||
return ls[lp.String()] || ls[""] // match empty string so no labels can be matched.
|
||||
}
|
||||
|
||||
// labelPairs is used to serialize a portion of dto.Metric into a serializable
|
||||
// string.
|
||||
type labelPairs struct {
|
||||
Label []*dto.LabelPair `protobuf:"bytes,1,rep,name=label" json:"label,omitempty"`
|
||||
}
|
||||
|
||||
func (l *labelPairs) Reset() {}
|
||||
func (l *labelPairs) String() string { return proto.CompactTextString(l) }
|
||||
func (*labelPairs) ProtoMessage() {}
|
|
@ -0,0 +1,133 @@
|
|||
package prometheus_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
func TestFilter_Gather(t *testing.T) {
|
||||
type fields struct {
|
||||
Gatherer prometheus.Gatherer
|
||||
Matcher pr.Matcher
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want []*dto.MetricFamily
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "no metrics returns nil",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return nil, nil
|
||||
}),
|
||||
Matcher: pr.NewMatcher().
|
||||
Family("http_api_requests_total",
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "gather error returns error",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return nil, fmt.Errorf("e1")
|
||||
}),
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "no matches returns no metric families",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
mf := &dto.MetricFamily{
|
||||
Name: proto.String("n1"),
|
||||
Help: proto.String("h1"),
|
||||
}
|
||||
return []*dto.MetricFamily{mf}, nil
|
||||
}),
|
||||
Matcher: pr.NewMatcher().
|
||||
Family("http_api_requests_total",
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
want: []*dto.MetricFamily{},
|
||||
},
|
||||
{
|
||||
name: "matching family without metric matches nothing",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
mf := &dto.MetricFamily{
|
||||
Name: proto.String("go_memstats_frees_total"),
|
||||
}
|
||||
return []*dto.MetricFamily{mf}, nil
|
||||
}),
|
||||
Matcher: pr.NewMatcher().
|
||||
Family("go_memstats_frees_total"),
|
||||
},
|
||||
want: []*dto.MetricFamily{},
|
||||
},
|
||||
{
|
||||
name: "matching family with no labels matches",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return []*dto.MetricFamily{NewCounter("go_memstats_frees_total", 1.0)}, nil
|
||||
}),
|
||||
Matcher: pr.NewMatcher().
|
||||
Family("go_memstats_frees_total"),
|
||||
},
|
||||
want: []*dto.MetricFamily{NewCounter("go_memstats_frees_total", 1.0)},
|
||||
},
|
||||
{
|
||||
name: "matching with labels a family with labels matches",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return []*dto.MetricFamily{NewCounter("go_memstats_frees_total", 1.0, pr.L("n1", "v1"))}, nil
|
||||
}),
|
||||
Matcher: pr.NewMatcher().
|
||||
Family("go_memstats_frees_total", pr.L("n1", "v1")),
|
||||
},
|
||||
want: []*dto.MetricFamily{NewCounter("go_memstats_frees_total", 1.0, pr.L("n1", "v1"))},
|
||||
},
|
||||
{
|
||||
name: "matching a family that has no labels with labels matches",
|
||||
fields: fields{
|
||||
Gatherer: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return []*dto.MetricFamily{NewCounter("go_memstats_frees_total", 1.0, pr.L("n1", "v1"))}, nil
|
||||
}),
|
||||
Matcher: pr.NewMatcher().
|
||||
Family("go_memstats_frees_total"),
|
||||
},
|
||||
want: []*dto.MetricFamily{NewCounter("go_memstats_frees_total", 1.0, pr.L("n1", "v1"))},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f := &pr.Filter{
|
||||
Gatherer: tt.fields.Gatherer,
|
||||
Matcher: tt.fields.Matcher,
|
||||
}
|
||||
got, err := f.Gather()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Filter.Gather() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Filter.Gather() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type influxCollector struct {
|
||||
influxInfoDesc *prometheus.Desc
|
||||
|
||||
influxUptimeDesc *prometheus.Desc
|
||||
start time.Time
|
||||
}
|
||||
|
||||
// NewInfluxCollector returns a collector which exports influxdb process metrics.
|
||||
func NewInfluxCollector(procID platform.IDGenerator, build platform.BuildInfo) prometheus.Collector {
|
||||
id := procID.ID().String()
|
||||
|
||||
return &influxCollector{
|
||||
influxInfoDesc: prometheus.NewDesc(
|
||||
"influxdb_info",
|
||||
"Information about the influxdb environment.",
|
||||
nil, prometheus.Labels{
|
||||
"version": build.Version,
|
||||
"commit": build.Commit,
|
||||
"date": build.Date,
|
||||
"os": runtime.GOOS,
|
||||
"arch": runtime.GOARCH,
|
||||
"cpus": strconv.Itoa(runtime.NumCPU()),
|
||||
},
|
||||
),
|
||||
influxUptimeDesc: prometheus.NewDesc(
|
||||
"influxdb_uptime_seconds",
|
||||
"influxdb process uptime in seconds",
|
||||
nil, prometheus.Labels{
|
||||
"id": id,
|
||||
},
|
||||
),
|
||||
start: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Describe returns all descriptions of the collector.
|
||||
func (c *influxCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- c.influxInfoDesc
|
||||
ch <- c.influxUptimeDesc
|
||||
}
|
||||
|
||||
// Collect returns the current state of all metrics of the collector.
|
||||
func (c *influxCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
ch <- prometheus.MustNewConstMetric(c.influxInfoDesc, prometheus.GaugeValue, 1)
|
||||
|
||||
uptime := time.Since(c.start).Seconds()
|
||||
ch <- prometheus.MustNewConstMetric(c.influxUptimeDesc, prometheus.GaugeValue, float64(uptime))
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package prometheus_test
|
||||
|
||||
import (
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
func NewCounter(name string, v float64, ls ...*dto.LabelPair) *dto.MetricFamily {
|
||||
m := &dto.Metric{
|
||||
Label: ls,
|
||||
Counter: &dto.Counter{
|
||||
Value: &v,
|
||||
},
|
||||
}
|
||||
return &dto.MetricFamily{
|
||||
Name: proto.String(name),
|
||||
Type: dto.MetricType_COUNTER.Enum(),
|
||||
Metric: []*dto.Metric{m},
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package prometheus
|
||||
|
||||
import dto "github.com/prometheus/client_model/go"
|
||||
|
||||
// labelPairSorter implements sort.Interface. It is used to sort a slice of
|
||||
// dto.LabelPair pointers.
|
||||
type labelPairSorter []*dto.LabelPair
|
||||
|
||||
func (s labelPairSorter) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s labelPairSorter) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
func (s labelPairSorter) Less(i, j int) bool {
|
||||
return s[i].GetName() < s[j].GetName()
|
||||
}
|
||||
|
||||
// familySorter implements sort.Interface. It is used to sort a slice of
|
||||
// dto.MetricFamily pointers.
|
||||
type familySorter []*dto.MetricFamily
|
||||
|
||||
func (s familySorter) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s familySorter) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
func (s familySorter) Less(i, j int) bool {
|
||||
return s[i].GetName() < s[j].GetName()
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
// Transformer modifies prometheus metrics families.
|
||||
type Transformer interface {
|
||||
// Transform updates the metrics family
|
||||
Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily
|
||||
}
|
||||
|
||||
var _ Transformer = (*AddLabels)(nil)
|
||||
|
||||
// AddLabels adds labels to all metrics. It will overwrite
|
||||
// the label if it already exists.
|
||||
type AddLabels struct {
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
// Transform adds labels to the metrics.
|
||||
func (a *AddLabels) Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily {
|
||||
for i := range mfs {
|
||||
for j, m := range mfs[i].Metric {
|
||||
// Filter out labels to add
|
||||
labels := m.Label[:0]
|
||||
for _, l := range m.Label {
|
||||
if _, ok := a.Labels[l.GetName()]; !ok {
|
||||
labels = append(labels, l)
|
||||
}
|
||||
}
|
||||
|
||||
// Add all new labels to the metric
|
||||
for k, v := range a.Labels {
|
||||
labels = append(labels, L(k, v))
|
||||
}
|
||||
sort.Sort(labelPairSorter(labels))
|
||||
mfs[i].Metric[j].Label = labels
|
||||
}
|
||||
}
|
||||
return mfs
|
||||
}
|
||||
|
||||
var _ Transformer = (*RemoveLabels)(nil)
|
||||
|
||||
// RemoveLabels adds labels to all metrics. It will overwrite
|
||||
// the label if it already exists.
|
||||
type RemoveLabels struct {
|
||||
Labels map[string]struct{}
|
||||
}
|
||||
|
||||
// Transform removes labels from the metrics.
|
||||
func (r *RemoveLabels) Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily {
|
||||
for i := range mfs {
|
||||
for j, m := range mfs[i].Metric {
|
||||
// Filter out labels
|
||||
labels := m.Label[:0]
|
||||
for _, l := range m.Label {
|
||||
if _, ok := r.Labels[l.GetName()]; !ok {
|
||||
labels = append(labels, l)
|
||||
}
|
||||
}
|
||||
mfs[i].Metric[j].Label = labels
|
||||
}
|
||||
}
|
||||
return mfs
|
||||
}
|
||||
|
||||
var _ Transformer = (*RenameFamilies)(nil)
|
||||
|
||||
// RenameFamilies changes the name of families to another name
|
||||
type RenameFamilies struct {
|
||||
FromTo map[string]string
|
||||
}
|
||||
|
||||
// Transform renames metric families names.
|
||||
func (r *RenameFamilies) Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily {
|
||||
renamed := mfs[:0]
|
||||
for _, mf := range mfs {
|
||||
if to, ok := r.FromTo[mf.GetName()]; ok {
|
||||
mf.Name = &to
|
||||
}
|
||||
renamed = append(renamed, mf)
|
||||
|
||||
}
|
||||
sort.Sort(familySorter(renamed))
|
||||
return renamed
|
||||
}
|
|
@ -0,0 +1,241 @@
|
|||
package prometheus_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
func TestAddLabels_Transform(t *testing.T) {
|
||||
type fields struct {
|
||||
Labels map[string]string
|
||||
}
|
||||
type args struct {
|
||||
mfs []*dto.MetricFamily
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []*dto.MetricFamily
|
||||
}{
|
||||
{
|
||||
name: "add label from metric replaces label",
|
||||
fields: fields{
|
||||
Labels: map[string]string{
|
||||
"handler": "influxdb",
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
want: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("handler", "influxdb"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add label from metric replaces label",
|
||||
fields: fields{
|
||||
Labels: map[string]string{
|
||||
"org": "myorg",
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
want: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("org", "myorg"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
a := &pr.AddLabels{
|
||||
Labels: tt.fields.Labels,
|
||||
}
|
||||
if got := a.Transform(tt.args.mfs); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("AddLabels.Transform() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveLabels_Transform(t *testing.T) {
|
||||
type fields struct {
|
||||
Labels map[string]struct{}
|
||||
}
|
||||
type args struct {
|
||||
mfs []*dto.MetricFamily
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []*dto.MetricFamily
|
||||
}{
|
||||
{
|
||||
name: "remove label from metric",
|
||||
fields: fields{
|
||||
Labels: map[string]struct{}{
|
||||
"handler": struct{}{},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
want: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no match removes no labels",
|
||||
fields: fields{
|
||||
Labels: map[string]struct{}{
|
||||
"handler": struct{}{},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
want: []*dto.MetricFamily{
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := &pr.RemoveLabels{
|
||||
Labels: tt.fields.Labels,
|
||||
}
|
||||
if got := r.Transform(tt.args.mfs); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("RemoveLabels.Transform() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenameFamilies_Transform(t *testing.T) {
|
||||
type fields struct {
|
||||
FromTo map[string]string
|
||||
}
|
||||
type args struct {
|
||||
mfs []*dto.MetricFamily
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []*dto.MetricFamily
|
||||
}{
|
||||
{
|
||||
name: "rename metric family in sort order",
|
||||
fields: fields{
|
||||
FromTo: map[string]string{
|
||||
"http_api_requests_total": "api_requests_total",
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{
|
||||
NewCounter("handler", 10,
|
||||
pr.L("handler", "platform"),
|
||||
),
|
||||
NewCounter("http_api_requests_total", 10,
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
},
|
||||
},
|
||||
want: []*dto.MetricFamily{
|
||||
NewCounter("api_requests_total", 10,
|
||||
pr.L("handler", "platform"),
|
||||
pr.L("method", "GET"),
|
||||
pr.L("path", "/api/v2"),
|
||||
pr.L("status", "2XX"),
|
||||
),
|
||||
NewCounter("handler", 10,
|
||||
pr.L("handler", "platform"),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ignored if not found",
|
||||
fields: fields{
|
||||
FromTo: map[string]string{
|
||||
"http_api_requests_total": "api_requests_total",
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{
|
||||
NewCounter("handler", 10,
|
||||
pr.L("handler", "platform"),
|
||||
),
|
||||
},
|
||||
},
|
||||
want: []*dto.MetricFamily{
|
||||
NewCounter("handler", 10,
|
||||
pr.L("handler", "platform"),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := &pr.RenameFamilies{
|
||||
FromTo: tt.fields.FromTo,
|
||||
}
|
||||
if got := r.Transform(tt.args.mfs); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("RenameFamilies.Transform() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
## Telemetry Data
|
||||
|
||||
Telemetry is first collected by retrieving prometheus data from a Gatherer.
|
||||
Next, the collected data is filtered by matching a subset of prometheus families.
|
||||
Finally, the data is transmitted to a prometheus push gateway handler.
|
||||
|
||||
The handler enriches the metrics with the timestamp when the data is
|
||||
received.
|
|
@ -0,0 +1,169 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/influxdata/influxdb/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultTimeout is the length of time servicing the metrics before canceling.
|
||||
DefaultTimeout = 10 * time.Second
|
||||
// DefaultMaxBytes is the largest request body read.
|
||||
DefaultMaxBytes = 1024000
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrMetricsTimestampPresent is returned when the prometheus metrics has timestamps set.
|
||||
// Not sure why, but, pushgateway does not allow timestamps.
|
||||
ErrMetricsTimestampPresent = fmt.Errorf("pushed metrics must not have timestamp")
|
||||
)
|
||||
|
||||
// PushGateway handles receiving prometheus push metrics and forwards them to the Store.
|
||||
// If Format is not set, the format of the inbound metrics are used.
|
||||
type PushGateway struct {
|
||||
Timeout time.Duration // handler returns after this duration with an error; defaults to 5 seconds
|
||||
MaxBytes int64 // maximum number of bytes to read from the body; defaults to 1024000
|
||||
Logger *zap.Logger
|
||||
|
||||
Store Store
|
||||
Transformers []prometheus.Transformer
|
||||
|
||||
Encoder prometheus.Encoder
|
||||
}
|
||||
|
||||
// NewPushGateway constructs the PushGateway.
|
||||
func NewPushGateway(logger *zap.Logger, store Store, xforms ...prometheus.Transformer) *PushGateway {
|
||||
if len(xforms) == 0 {
|
||||
xforms = append(xforms, &AddTimestamps{})
|
||||
}
|
||||
return &PushGateway{
|
||||
Store: store,
|
||||
Transformers: xforms,
|
||||
Logger: logger,
|
||||
Timeout: DefaultTimeout,
|
||||
MaxBytes: DefaultMaxBytes,
|
||||
}
|
||||
}
|
||||
|
||||
// Handler accepts prometheus metrics send via the Push client and sends those
|
||||
// metrics into the store.
|
||||
func (p *PushGateway) Handler(w http.ResponseWriter, r *http.Request) {
|
||||
// redirect to agreement to give our users information about
|
||||
// this collected data.
|
||||
switch r.Method {
|
||||
case http.MethodGet, http.MethodHead:
|
||||
http.Redirect(w, r, "https://www.influxdata.com/legal/data-processing-agreement/", http.StatusSeeOther)
|
||||
return
|
||||
case http.MethodPost, http.MethodPut:
|
||||
default:
|
||||
w.Header().Set("Allow", "GET, HEAD, PUT, POST")
|
||||
http.Error(w,
|
||||
http.StatusText(http.StatusMethodNotAllowed),
|
||||
http.StatusMethodNotAllowed,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if p.Timeout == 0 {
|
||||
p.Timeout = DefaultTimeout
|
||||
}
|
||||
|
||||
if p.MaxBytes == 0 {
|
||||
p.MaxBytes = DefaultMaxBytes
|
||||
}
|
||||
|
||||
if p.Encoder == nil {
|
||||
p.Encoder = &prometheus.Expfmt{
|
||||
Format: expfmt.FmtText,
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(
|
||||
r.Context(),
|
||||
p.Timeout,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
r = r.WithContext(ctx)
|
||||
defer r.Body.Close()
|
||||
|
||||
format, err := metricsFormat(r.Header)
|
||||
if err != nil {
|
||||
p.Logger.Error("metrics format not support", zap.Error(err))
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
mfs, err := decodePostMetricsRequest(r.Body, format, p.MaxBytes)
|
||||
if err != nil {
|
||||
p.Logger.Error("unable to decode metrics", zap.Error(err))
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := valid(mfs); err != nil {
|
||||
p.Logger.Error("invalid metrics", zap.Error(err))
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
for _, transformer := range p.Transformers {
|
||||
mfs = transformer.Transform(mfs)
|
||||
}
|
||||
|
||||
data, err := p.Encoder.Encode(mfs)
|
||||
if err != nil {
|
||||
p.Logger.Error("unable to encode metric families", zap.Error(err))
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.Store.WriteMessage(ctx, data); err != nil {
|
||||
p.Logger.Error("unable to write to store", zap.Error(err))
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
func metricsFormat(headers http.Header) (expfmt.Format, error) {
|
||||
format := expfmt.ResponseFormat(headers)
|
||||
if format == expfmt.FmtUnknown {
|
||||
return "", fmt.Errorf("unknown format metrics format")
|
||||
}
|
||||
return format, nil
|
||||
}
|
||||
|
||||
func decodePostMetricsRequest(body io.Reader, format expfmt.Format, maxBytes int64) ([]*dto.MetricFamily, error) {
|
||||
// protect against reading too many bytes
|
||||
r := io.LimitReader(body, maxBytes)
|
||||
|
||||
mfs, err := prometheus.DecodeExpfmt(r, format)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mfs, nil
|
||||
}
|
||||
|
||||
// prom's pushgateway does not allow timestamps for some reason.
|
||||
func valid(mfs []*dto.MetricFamily) error {
|
||||
// Checks if any timestamps have been specified.
|
||||
for i := range mfs {
|
||||
for j := range mfs[i].Metric {
|
||||
if mfs[i].Metric[j].TimestampMs != nil {
|
||||
return ErrMetricsTimestampPresent
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,305 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestPushGateway_Handler(t *testing.T) {
|
||||
type fields struct {
|
||||
Store *mockStore
|
||||
now func() time.Time
|
||||
}
|
||||
type args struct {
|
||||
w *httptest.ResponseRecorder
|
||||
r *http.Request
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
contentType string
|
||||
wantStatus int
|
||||
want []byte
|
||||
}{
|
||||
|
||||
{
|
||||
name: "unknown content-type is a bad request",
|
||||
fields: fields{
|
||||
Store: &mockStore{},
|
||||
},
|
||||
args: args{
|
||||
w: httptest.NewRecorder(),
|
||||
r: httptest.NewRequest("POST", "/", nil),
|
||||
},
|
||||
wantStatus: http.StatusBadRequest,
|
||||
},
|
||||
|
||||
{
|
||||
name: "bad metric with timestamp is a bad request",
|
||||
fields: fields{
|
||||
Store: &mockStore{},
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
},
|
||||
args: args{
|
||||
w: httptest.NewRecorder(),
|
||||
r: httptest.NewRequest("POST", "/",
|
||||
mustEncode(t,
|
||||
[]*dto.MetricFamily{badMetric()},
|
||||
),
|
||||
),
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
wantStatus: http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
name: "store error is an internal server error",
|
||||
fields: fields{
|
||||
Store: &mockStore{
|
||||
err: fmt.Errorf("e1"),
|
||||
},
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
},
|
||||
args: args{
|
||||
w: httptest.NewRecorder(),
|
||||
r: httptest.NewRequest("POST", "/",
|
||||
mustEncode(t,
|
||||
[]*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
|
||||
),
|
||||
),
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
wantStatus: http.StatusInternalServerError,
|
||||
want: []byte(`[{"name":"mf1","type":0,"metric":[{"label":[{"name":"n1","value":"v1"}],"counter":{"value":1},"timestamp_ms":0}]}]`),
|
||||
},
|
||||
{
|
||||
name: "metric store in store",
|
||||
fields: fields{
|
||||
Store: &mockStore{},
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
},
|
||||
args: args{
|
||||
w: httptest.NewRecorder(),
|
||||
r: httptest.NewRequest("POST", "/",
|
||||
mustEncode(t,
|
||||
[]*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
|
||||
),
|
||||
),
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
wantStatus: http.StatusAccepted,
|
||||
want: []byte(`[{"name":"mf1","type":0,"metric":[{"label":[{"name":"n1","value":"v1"}],"counter":{"value":1},"timestamp_ms":0}]}]`),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
p := NewPushGateway(
|
||||
zaptest.NewLogger(t),
|
||||
tt.fields.Store,
|
||||
&AddTimestamps{
|
||||
now: tt.fields.now,
|
||||
},
|
||||
)
|
||||
p.Encoder = &pr.JSON{}
|
||||
tt.args.r.Header.Set("Content-Type", tt.contentType)
|
||||
p.Handler(tt.args.w, tt.args.r)
|
||||
|
||||
if tt.args.w.Code != http.StatusAccepted {
|
||||
t.Logf("Body: %s", tt.args.w.Body.String())
|
||||
}
|
||||
if got, want := tt.args.w.Code, tt.wantStatus; got != want {
|
||||
t.Errorf("PushGateway.Handler() StatusCode = %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if got, want := tt.fields.Store.data, tt.want; string(got) != string(want) {
|
||||
t.Errorf("PushGateway.Handler() Data = %s, want %s", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_decodePostMetricsRequest(t *testing.T) {
|
||||
type args struct {
|
||||
req *http.Request
|
||||
maxBytes int64
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
contentType string
|
||||
want []*dto.MetricFamily
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "bad body returns no metrics",
|
||||
args: args{
|
||||
req: httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte{0x10})),
|
||||
maxBytes: 10,
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
want: []*dto.MetricFamily{},
|
||||
},
|
||||
{
|
||||
name: "no body returns no metrics",
|
||||
args: args{
|
||||
req: httptest.NewRequest("POST", "/", nil),
|
||||
maxBytes: 10,
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
want: []*dto.MetricFamily{},
|
||||
},
|
||||
{
|
||||
name: "metrics are returned from POST",
|
||||
args: args{
|
||||
req: httptest.NewRequest("POST", "/",
|
||||
mustEncode(t,
|
||||
[]*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
|
||||
),
|
||||
),
|
||||
maxBytes: 31,
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
want: []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
|
||||
},
|
||||
{
|
||||
name: "max bytes limits on record boundary returns a single record",
|
||||
args: args{
|
||||
req: httptest.NewRequest("POST", "/",
|
||||
mustEncode(t,
|
||||
[]*dto.MetricFamily{
|
||||
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
|
||||
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
|
||||
},
|
||||
),
|
||||
),
|
||||
maxBytes: 31,
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
want: []*dto.MetricFamily{NewCounter("mf1", 1.0, pr.L("n1", "v1"))},
|
||||
},
|
||||
{
|
||||
name: "exceeding max bytes returns an error",
|
||||
args: args{
|
||||
req: httptest.NewRequest("POST", "/",
|
||||
mustEncode(t,
|
||||
[]*dto.MetricFamily{
|
||||
NewCounter("mf1", 1.0, pr.L("n1", "v1")),
|
||||
NewCounter("mf2", 1.0, pr.L("n2", "v2")),
|
||||
},
|
||||
),
|
||||
),
|
||||
maxBytes: 33,
|
||||
},
|
||||
contentType: string(expfmt.FmtProtoDelim),
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
tt.args.req.Header.Set("Content-Type", tt.contentType)
|
||||
got, err := decodePostMetricsRequest(tt.args.req.Body, expfmt.Format(tt.contentType), tt.args.maxBytes)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("decodePostMetricsRequest() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("decodePostMetricsRequest() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func badMetric() *dto.MetricFamily {
|
||||
return &dto.MetricFamily{
|
||||
Name: proto.String("bad"),
|
||||
Type: dto.MetricType_COUNTER.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
&dto.Metric{
|
||||
Label: []*dto.LabelPair{pr.L("n1", "v1")},
|
||||
Counter: &dto.Counter{
|
||||
Value: proto.Float64(1.0),
|
||||
},
|
||||
TimestampMs: proto.Int64(1),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func goodMetric() *dto.MetricFamily {
|
||||
return &dto.MetricFamily{
|
||||
Name: proto.String("good"),
|
||||
Type: dto.MetricType_COUNTER.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
&dto.Metric{
|
||||
Label: []*dto.LabelPair{pr.L("n1", "v1")},
|
||||
Counter: &dto.Counter{
|
||||
Value: proto.Float64(1.0),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func Test_valid(t *testing.T) {
|
||||
type args struct {
|
||||
mfs []*dto.MetricFamily
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "metric with timestamp is invalid",
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{badMetric()},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "metric without timestamp is valid",
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{goodMetric()},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if err := valid(tt.args.mfs); (err != nil) != tt.wantErr {
|
||||
t.Errorf("valid() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mockStore struct {
|
||||
data []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockStore) WriteMessage(ctx context.Context, data []byte) error {
|
||||
m.data = data
|
||||
return m.err
|
||||
|
||||
}
|
||||
|
||||
func mustEncode(t *testing.T, mfs []*dto.MetricFamily) io.Reader {
|
||||
b, err := pr.EncodeExpfmt(mfs)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to encode %v", err)
|
||||
}
|
||||
return bytes.NewBuffer(b)
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
)
|
||||
|
||||
var telemetryMatcher = pr.NewMatcher().
|
||||
/*
|
||||
* Runtime stats
|
||||
*/
|
||||
Family("influxdb_info"). // includes version, os, etc.
|
||||
Family("influxdb_uptime_seconds").
|
||||
/*
|
||||
* Resource Counts
|
||||
*/
|
||||
Family("influxdb_organizations_total").
|
||||
Family("influxdb_buckets_total").
|
||||
Family("influxdb_users_total").
|
||||
Family("influxdb_tokens_total").
|
||||
Family("influxdb_dashboards_total").
|
||||
Family("influxdb_scrapers_total").
|
||||
Family("influxdb_telegrafs_total").
|
||||
Family("task_scheduler_claims_active"). // Count of currently active tasks
|
||||
/*
|
||||
* Count of API requests including success and failure
|
||||
*/
|
||||
Family("http_api_requests_total").
|
||||
/*
|
||||
* Count of writes and queries
|
||||
*/
|
||||
Family("storage_wal_writes_total").
|
||||
Family("query_control_requests_total").
|
||||
/*
|
||||
* Query analysis
|
||||
*/
|
||||
Family("query_control_functions_total"). // Count of functions in queries (e.g. mean, median)
|
||||
Family("query_control_all_duration_seconds"). // Total query duration per org.
|
||||
/*
|
||||
* Write analysis
|
||||
*/
|
||||
Family("http_api_request_duration_seconds_bucket",
|
||||
pr.L("path", "/api/v2/write"), // Count only the durations of the /write endpoint.
|
||||
).
|
||||
/*
|
||||
* Storage cardinality
|
||||
*/
|
||||
Family("storage_tsi_index_series_total").
|
||||
/*
|
||||
* Storage disk usage
|
||||
*/
|
||||
Family("storage_series_file_disk_bytes"). // All families need to be aggregated to
|
||||
Family("storage_wal_current_segment_bytes"). // get a true idea of disk usage.
|
||||
Family("storage_tsm_files_disk_bytes")
|
|
@ -0,0 +1,116 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
// Pusher pushes metrics to a prometheus push gateway.
|
||||
type Pusher struct {
|
||||
URL string
|
||||
Gather prometheus.Gatherer
|
||||
Client *http.Client
|
||||
PushFormat expfmt.Format
|
||||
}
|
||||
|
||||
// NewPusher sends usage metrics to a prometheus push gateway.
|
||||
func NewPusher(g prometheus.Gatherer) *Pusher {
|
||||
return &Pusher{
|
||||
//URL: "https://telemetry.influxdata.com/metrics/job/influxdb",
|
||||
URL: "http://127.0.0.1:8080/metrics/job/influxdb",
|
||||
Gather: &pr.Filter{
|
||||
Gatherer: g,
|
||||
Matcher: telemetryMatcher,
|
||||
},
|
||||
Client: &http.Client{
|
||||
Transport: http.DefaultTransport,
|
||||
Timeout: 10 * time.Second,
|
||||
},
|
||||
PushFormat: expfmt.FmtText,
|
||||
}
|
||||
}
|
||||
|
||||
// Push POSTs prometheus metrics in protobuf delimited format to a push gateway.
|
||||
func (p *Pusher) Push(ctx context.Context) error {
|
||||
if p.PushFormat == "" {
|
||||
p.PushFormat = expfmt.FmtText
|
||||
}
|
||||
|
||||
resps := make(chan (error))
|
||||
go func() {
|
||||
resps <- p.push(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-resps:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pusher) push(ctx context.Context) error {
|
||||
r, err := p.encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// when there are no metrics to send, then, no need to POST.
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, p.URL, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
req.Header.Set("Content-Type", string(p.PushFormat))
|
||||
|
||||
res, err := p.Client.Do(req)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusAccepted {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
return fmt.Errorf("unable to POST metrics; received status %s: %s", http.StatusText(res.StatusCode), body)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pusher) encode() (io.Reader, error) {
|
||||
mfs, err := p.Gather.Gather()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(mfs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
b, err := pr.EncodeExpfmt(mfs, p.PushFormat)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return bytes.NewBuffer(b), nil
|
||||
}
|
|
@ -0,0 +1,150 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
func TestPusher_Push(t *testing.T) {
|
||||
type check struct {
|
||||
Method string
|
||||
Body []byte
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
gather prometheus.Gatherer
|
||||
timeout time.Duration
|
||||
status int
|
||||
|
||||
want check
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "no metrics no push",
|
||||
gather: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return nil, nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "timeout while gathering data returns error",
|
||||
gather: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
time.Sleep(time.Hour)
|
||||
return nil, nil
|
||||
}),
|
||||
timeout: time.Millisecond,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "timeout server timeout data returns error",
|
||||
gather: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
mf := &dto.MetricFamily{}
|
||||
return []*dto.MetricFamily{mf}, nil
|
||||
}),
|
||||
timeout: time.Millisecond,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "error gathering metrics returns error",
|
||||
gather: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return nil, fmt.Errorf("e1")
|
||||
}),
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "status code that is not Accepted (202) is an error",
|
||||
gather: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
mf := &dto.MetricFamily{}
|
||||
return []*dto.MetricFamily{mf}, nil
|
||||
}),
|
||||
status: http.StatusInternalServerError,
|
||||
want: check{
|
||||
Method: http.MethodPost,
|
||||
Body: []byte{0x00},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "sending metric are marshalled into delimited protobufs",
|
||||
gather: prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
mf := &dto.MetricFamily{
|
||||
Name: proto.String("n1"),
|
||||
Help: proto.String("h1"),
|
||||
}
|
||||
return []*dto.MetricFamily{mf}, nil
|
||||
}),
|
||||
status: http.StatusAccepted,
|
||||
want: check{
|
||||
Method: http.MethodPost,
|
||||
Body: MustMarshal(&dto.MetricFamily{
|
||||
Name: proto.String("n1"),
|
||||
Help: proto.String("h1"),
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if tt.timeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, tt.timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
var got check
|
||||
srv := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if tt.timeout > 0 { // testing server timeouts
|
||||
r = r.WithContext(ctx)
|
||||
_ = r
|
||||
<-ctx.Done()
|
||||
return
|
||||
}
|
||||
got.Method = r.Method
|
||||
got.Body, _ = ioutil.ReadAll(r.Body)
|
||||
w.WriteHeader(tt.status)
|
||||
}),
|
||||
)
|
||||
defer srv.Close()
|
||||
|
||||
url := srv.URL
|
||||
client := srv.Client()
|
||||
p := &Pusher{
|
||||
URL: url,
|
||||
Gather: tt.gather,
|
||||
Client: client,
|
||||
PushFormat: expfmt.FmtProtoDelim,
|
||||
}
|
||||
if err := p.Push(ctx); (err != nil) != tt.wantErr {
|
||||
t.Errorf("Pusher.Push() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !cmp.Equal(got, tt.want) {
|
||||
t.Errorf("%q. Pusher.Push() = -got/+want %s", tt.name, cmp.Diff(got, tt.want))
|
||||
t.Logf("%v\n%v", got.Body, tt.want.Body)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func MustMarshal(mf *dto.MetricFamily) []byte {
|
||||
buf := &bytes.Buffer{}
|
||||
_, err := pbutil.WriteDelimited(buf, mf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
influxlogger "github.com/influxdata/influxdb/logger"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Reporter reports telemetry metrics to a prometheus push
|
||||
// gateway every interval.
|
||||
type Reporter struct {
|
||||
Pusher *Pusher
|
||||
Logger *zap.Logger
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
// NewReporter reports telemetry every 24 hours.
|
||||
func NewReporter(g prometheus.Gatherer) *Reporter {
|
||||
return &Reporter{
|
||||
Pusher: NewPusher(g),
|
||||
Logger: zap.NewNop(),
|
||||
Interval: 24 * time.Hour,
|
||||
}
|
||||
}
|
||||
|
||||
// Report starts periodic telemetry reporting each interval.
|
||||
func (r *Reporter) Report(ctx context.Context) {
|
||||
logger := r.Logger.With(
|
||||
zap.String("service", "telemetry"),
|
||||
influxlogger.DurationLiteral("interval", r.Interval),
|
||||
)
|
||||
|
||||
logger.Info("Starting")
|
||||
if err := r.Pusher.Push(ctx); err != nil {
|
||||
logger.Debug("failure reporting telemetry metrics", zap.Error(err))
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(r.Interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
logger.Debug("Reporting")
|
||||
if err := r.Pusher.Push(ctx); err != nil {
|
||||
logger.Debug("failure reporting telemetry metrics", zap.Error(err))
|
||||
}
|
||||
case <-ctx.Done():
|
||||
logger.Info("Stopping")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestReport(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
store := newReportingStore()
|
||||
timestamps := &AddTimestamps{
|
||||
now: func() time.Time {
|
||||
return time.Unix(0, 0)
|
||||
},
|
||||
}
|
||||
|
||||
gw := NewPushGateway(logger, store, timestamps)
|
||||
gw.Encoder = &pr.JSON{}
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(gw.Handler))
|
||||
defer ts.Close()
|
||||
|
||||
mfs := []*dto.MetricFamily{NewCounter("influxdb_buckets_total", 1.0)}
|
||||
gatherer := prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
|
||||
return mfs, nil
|
||||
})
|
||||
|
||||
pusher := NewPusher(gatherer)
|
||||
pusher.URL = ts.URL
|
||||
|
||||
reporter := &Reporter{
|
||||
Pusher: pusher,
|
||||
Logger: logger,
|
||||
Interval: 30 * time.Second,
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
defer wg.Wait()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
reporter.Report(ctx)
|
||||
}()
|
||||
|
||||
got := <-store.ch
|
||||
|
||||
// Encode to JSON to make it easier to compare
|
||||
want, _ := pr.EncodeJSON(timestamps.Transform(mfs))
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("Reporter.Report() = %s, want %s", got, want)
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
func newReportingStore() *reportingStore {
|
||||
return &reportingStore{
|
||||
ch: make(chan []byte, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type reportingStore struct {
|
||||
ch chan []byte
|
||||
}
|
||||
|
||||
func (s *reportingStore) WriteMessage(ctx context.Context, data []byte) error {
|
||||
s.ch <- data
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Store records usage data.
|
||||
type Store interface {
|
||||
// WriteMessage stores data into the store.
|
||||
WriteMessage(ctx context.Context, data []byte) error
|
||||
}
|
||||
|
||||
var _ Store = (*LogStore)(nil)
|
||||
|
||||
// LogStore logs data written to the store.
|
||||
type LogStore struct {
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
// WriteMessage logs data at Info level.
|
||||
func (s *LogStore) WriteMessage(ctx context.Context, data []byte) error {
|
||||
s.Logger.Info("write", zap.String("data", string(data)))
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
func NewCounter(name string, v float64, ls ...*dto.LabelPair) *dto.MetricFamily {
|
||||
m := &dto.Metric{
|
||||
Label: ls,
|
||||
Counter: &dto.Counter{
|
||||
Value: &v,
|
||||
},
|
||||
}
|
||||
return &dto.MetricFamily{
|
||||
Name: proto.String(name),
|
||||
Type: dto.MetricType_COUNTER.Enum(),
|
||||
Metric: []*dto.Metric{m},
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
const (
|
||||
// just in case the definition of time.Nanosecond changes from 1.
|
||||
nsPerMillisecond = int64(time.Millisecond / time.Nanosecond)
|
||||
)
|
||||
|
||||
var _ prometheus.Transformer = (*AddTimestamps)(nil)
|
||||
|
||||
// AddTimestamps enriches prometheus metrics by adding timestamps.
|
||||
type AddTimestamps struct {
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// Transform adds now as a timestamp to all metrics.
|
||||
func (a *AddTimestamps) Transform(mfs []*dto.MetricFamily) []*dto.MetricFamily {
|
||||
now := a.now
|
||||
if now == nil {
|
||||
now = time.Now
|
||||
}
|
||||
nowMilliseconds := now().UnixNano() / nsPerMillisecond
|
||||
|
||||
for i := range mfs {
|
||||
for j := range mfs[i].Metric {
|
||||
mfs[i].Metric[j].TimestampMs = &nowMilliseconds
|
||||
}
|
||||
}
|
||||
return mfs
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package telemetry
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
pr "github.com/influxdata/influxdb/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
func goodMetricWithTime() *dto.MetricFamily {
|
||||
return &dto.MetricFamily{
|
||||
Name: proto.String("good"),
|
||||
Type: dto.MetricType_COUNTER.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
&dto.Metric{
|
||||
Label: []*dto.LabelPair{pr.L("n1", "v1")},
|
||||
Counter: &dto.Counter{
|
||||
Value: proto.Float64(1.0),
|
||||
},
|
||||
TimestampMs: proto.Int64(1),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddTimestamps(t *testing.T) {
|
||||
type args struct {
|
||||
mfs []*dto.MetricFamily
|
||||
now func() time.Time
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
}{
|
||||
{
|
||||
args: args{
|
||||
mfs: []*dto.MetricFamily{goodMetric()},
|
||||
now: func() time.Time { return time.Unix(0, int64(time.Millisecond)) },
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ts := AddTimestamps{
|
||||
now: tt.args.now,
|
||||
}
|
||||
got := ts.Transform(tt.args.mfs)
|
||||
want := []*dto.MetricFamily{goodMetricWithTime()}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("AddTimestamps.Transform() = %v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue