first commit
parent
5854f1b2fb
commit
ae31bebad5
|
@ -1295,4 +1295,4 @@
|
|||
"gopkg.in/robfig/cron.v2",
|
||||
]
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
solver-version = 1
|
|
@ -98,3 +98,7 @@ required = [
|
|||
[[constraint]]
|
||||
branch = "master"
|
||||
name = "github.com/influxdata/flux"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/prometheus/prometheus"
|
||||
version = "=2.3.2"
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/gather"
|
||||
"github.com/influxdata/platform/nats"
|
||||
)
|
||||
|
||||
const (
|
||||
subjectParse = "sub_parse"
|
||||
subjectStore = "sub_store"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger := new(log.Logger)
|
||||
|
||||
msg, influxURL := parseFlags()
|
||||
publisher := initNats(logger, influxURL)
|
||||
|
||||
go scheduleGetMetrics(msg, publisher)
|
||||
|
||||
<-make(chan struct{})
|
||||
}
|
||||
|
||||
func parseFlags() (msg gather.ScrapperRequest, influxURL []string) {
|
||||
orgIDstr := flag.String("orgID", "", "the organization id")
|
||||
bucketIDstr := flag.String("bucketID", "", "the bucket id")
|
||||
hostStr := flag.String("pHost", "", "the promethus host")
|
||||
influxStr := flag.String("influxURLs", "", "comma seperated urls")
|
||||
flag.Parse()
|
||||
|
||||
orgID, err := platform.IDFromString(*orgIDstr)
|
||||
if err != nil || orgID == nil || orgID.String() == "" {
|
||||
log.Fatal("Invalid orgID")
|
||||
}
|
||||
|
||||
bucketID, err := platform.IDFromString(*bucketIDstr)
|
||||
if err != nil || bucketID == nil || bucketID.String() == "" {
|
||||
log.Fatal("Invalid bucketID")
|
||||
}
|
||||
|
||||
if *hostStr == "" {
|
||||
log.Fatal("Invalid host")
|
||||
}
|
||||
pURL := *hostStr + "/metrics"
|
||||
|
||||
influxURL = strings.Split(*influxStr, ",")
|
||||
if len(influxURL) == 0 {
|
||||
influxURL = []string{
|
||||
"http://localhost:8086",
|
||||
}
|
||||
}
|
||||
msg = gather.ScrapperRequest{
|
||||
HostURL: pURL,
|
||||
OrgID: *orgID,
|
||||
BucketID: *bucketID,
|
||||
}
|
||||
return msg, influxURL
|
||||
}
|
||||
|
||||
func initNats(logger *log.Logger, influxURL []string) nats.Publisher {
|
||||
server := nats.NewServer(nats.Config{
|
||||
FilestoreDir: ".",
|
||||
})
|
||||
|
||||
if err := server.Open(); err != nil {
|
||||
log.Fatalf("nats server fatal err %v", err)
|
||||
}
|
||||
|
||||
subscriber := nats.NewQueueSubscriber("nats-subscriber")
|
||||
if err := subscriber.Open(); err != nil {
|
||||
log.Fatalf("nats parse subscriber open issue %v", err)
|
||||
}
|
||||
|
||||
publisher := nats.NewAsyncPublisher("nats-publisher")
|
||||
if err := publisher.Open(); err != nil {
|
||||
log.Fatalf("nats parse publisher open issue %v", err)
|
||||
}
|
||||
|
||||
if err := subscriber.Subscribe(subjectParse, "", &gather.ScrapperHandler{
|
||||
Scrapper: gather.NewPrometheusScrapper(
|
||||
gather.NewNatsStorage(
|
||||
gather.NewInfluxStorage(influxURL),
|
||||
subjectStore,
|
||||
logger,
|
||||
publisher,
|
||||
subscriber,
|
||||
),
|
||||
),
|
||||
Logger: logger,
|
||||
}); err != nil {
|
||||
log.Fatalf("nats subscribe error")
|
||||
}
|
||||
return publisher
|
||||
}
|
||||
|
||||
// scheduleGetMetrics will send the scraperRequest to publisher
|
||||
// for every 2 second
|
||||
func scheduleGetMetrics(msg gather.ScrapperRequest, publisher nats.Publisher) {
|
||||
buf := new(bytes.Buffer)
|
||||
b, _ := json.Marshal(msg)
|
||||
buf.Write(b)
|
||||
publisher.Publish(subjectParse, buf)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
scheduleGetMetrics(msg, publisher)
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package gather
|
||||
|
||||
import dto "github.com/prometheus/client_model/go"
|
||||
|
||||
// Metrics is the default influx based metrics
|
||||
type Metrics struct {
|
||||
Name string `json:"name"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Fields map[string]interface{} `json:"fields"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Type dto.MetricType `json:"type"`
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
package gather
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
func TestMetrics(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
ms []Metrics
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
ms: make([]Metrics, 0),
|
||||
},
|
||||
{
|
||||
name: "single",
|
||||
ms: []Metrics{
|
||||
{
|
||||
Timestamp: 12345,
|
||||
Tags: map[string]string{
|
||||
"b": "B",
|
||||
"a": "A",
|
||||
"c": "C",
|
||||
},
|
||||
Fields: map[string]interface{}{
|
||||
"x": 12.3,
|
||||
"y": "a long string",
|
||||
},
|
||||
Type: dto.MetricType_SUMMARY,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multiple",
|
||||
ms: []Metrics{
|
||||
{
|
||||
Timestamp: 12345,
|
||||
Tags: map[string]string{
|
||||
"b": "B",
|
||||
"a": "A",
|
||||
"c": "C",
|
||||
},
|
||||
Fields: map[string]interface{}{
|
||||
"x": 12.3,
|
||||
"y": "a long string",
|
||||
},
|
||||
Type: dto.MetricType_SUMMARY,
|
||||
},
|
||||
|
||||
{
|
||||
Timestamp: 12345,
|
||||
Tags: map[string]string{
|
||||
"b": "B2",
|
||||
"a": "A2",
|
||||
"c": "C2",
|
||||
},
|
||||
Fields: map[string]interface{}{
|
||||
"x": 12.5,
|
||||
"y": "a long string2",
|
||||
},
|
||||
Type: dto.MetricType_GAUGE,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
b, err := json.Marshal(c.ms)
|
||||
if err != nil {
|
||||
t.Fatalf("error in marshaling metrics: %v", err)
|
||||
}
|
||||
result := make([]Metrics, 0)
|
||||
err = json.Unmarshal(b, &result)
|
||||
if err != nil {
|
||||
t.Fatalf("error in unmarshaling metrics: b: %s, %v", string(b), err)
|
||||
}
|
||||
if diff := cmp.Diff(c.ms, result, nil); diff != "" {
|
||||
t.Fatalf("unmarshaling metrics is incorrect, want %v, got %v", c.ms, result)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,210 @@
|
|||
package gather
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"mime"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/nats"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
/*
|
||||
1. Discoverer
|
||||
2. Scheduler
|
||||
3. Queue
|
||||
4. Scrap
|
||||
5. Storage
|
||||
*/
|
||||
|
||||
// Scrapper gathers metrics from an url
|
||||
type Scrapper interface {
|
||||
Gather(ctx context.Context, orgID, BucketID platform.ID, url string) error
|
||||
}
|
||||
|
||||
// NewPrometheusScrapper returns a new prometheusScraper
|
||||
// to fetch metrics from prometheus /metrics
|
||||
func NewPrometheusScrapper(s Storage) Scrapper {
|
||||
return &prometheusScrapper{
|
||||
Storage: s,
|
||||
}
|
||||
}
|
||||
|
||||
type prometheusScrapper struct {
|
||||
Storage Storage
|
||||
}
|
||||
|
||||
func (p *prometheusScrapper) Gather(
|
||||
ctx context.Context,
|
||||
orgID,
|
||||
BucketID platform.ID,
|
||||
url string,
|
||||
) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return p.parse(resp.Body, resp.Header)
|
||||
}
|
||||
|
||||
func (p *prometheusScrapper) parse(r io.Reader, header http.Header) error {
|
||||
var parser expfmt.TextParser
|
||||
|
||||
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Prepare output
|
||||
metricFamilies := make(map[string]*dto.MetricFamily)
|
||||
if err == nil && mediatype == "application/vnd.google.protobuf" &&
|
||||
params["encoding"] == "delimited" &&
|
||||
params["proto"] == "io.prometheus.client.MetricFamily" {
|
||||
for {
|
||||
mf := &dto.MetricFamily{}
|
||||
if _, err := pbutil.ReadDelimited(r, mf); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return fmt.Errorf("reading metric family protocol buffer failed: %s", err)
|
||||
}
|
||||
metricFamilies[mf.GetName()] = mf
|
||||
}
|
||||
} else {
|
||||
metricFamilies, err = parser.TextToMetricFamilies(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading text format failed: %s", err)
|
||||
}
|
||||
}
|
||||
ms := make([]Metrics, 0)
|
||||
|
||||
// read metrics
|
||||
for metricName, mf := range metricFamilies {
|
||||
for _, m := range mf.Metric {
|
||||
// reading tags
|
||||
tags := makeLabels(m)
|
||||
// reading fields
|
||||
fields := make(map[string]interface{})
|
||||
if mf.GetType() == dto.MetricType_SUMMARY {
|
||||
// summary metric
|
||||
fields = makeQuantiles(m)
|
||||
fields["count"] = float64(m.GetSummary().GetSampleCount())
|
||||
fields["sum"] = float64(m.GetSummary().GetSampleSum())
|
||||
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
|
||||
// histogram metric
|
||||
fields = makeBuckets(m)
|
||||
fields["count"] = float64(m.GetHistogram().GetSampleCount())
|
||||
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
|
||||
} else {
|
||||
// standard metric
|
||||
fields = getNameAndValue(m)
|
||||
}
|
||||
if len(fields) > 0 {
|
||||
var t time.Time
|
||||
if m.TimestampMs != nil && *m.TimestampMs > 0 {
|
||||
t = time.Unix(0, *m.TimestampMs*1000000)
|
||||
} else {
|
||||
t = time.Now()
|
||||
}
|
||||
me := Metrics{
|
||||
Timestamp: t.UnixNano(),
|
||||
Tags: tags,
|
||||
Fields: fields,
|
||||
Name: metricName,
|
||||
Type: mf.GetType(),
|
||||
}
|
||||
ms = append(ms, me)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
fmt.Println(len(ms))
|
||||
|
||||
return p.Storage.Record(ms)
|
||||
}
|
||||
|
||||
// Get labels from metric
|
||||
func makeLabels(m *dto.Metric) map[string]string {
|
||||
result := map[string]string{}
|
||||
for _, lp := range m.Label {
|
||||
result[lp.GetName()] = lp.GetValue()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Get Buckets from histogram metric
|
||||
func makeBuckets(m *dto.Metric) map[string]interface{} {
|
||||
fields := make(map[string]interface{})
|
||||
for _, b := range m.GetHistogram().Bucket {
|
||||
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// Get name and value from metric
|
||||
func getNameAndValue(m *dto.Metric) map[string]interface{} {
|
||||
fields := make(map[string]interface{})
|
||||
if m.Gauge != nil {
|
||||
if !math.IsNaN(m.GetGauge().GetValue()) {
|
||||
fields["gauge"] = float64(m.GetGauge().GetValue())
|
||||
}
|
||||
} else if m.Counter != nil {
|
||||
if !math.IsNaN(m.GetCounter().GetValue()) {
|
||||
fields["counter"] = float64(m.GetCounter().GetValue())
|
||||
}
|
||||
} else if m.Untyped != nil {
|
||||
if !math.IsNaN(m.GetUntyped().GetValue()) {
|
||||
fields["value"] = float64(m.GetUntyped().GetValue())
|
||||
}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// Get Quantiles from summary metric
|
||||
func makeQuantiles(m *dto.Metric) map[string]interface{} {
|
||||
fields := make(map[string]interface{})
|
||||
for _, q := range m.GetSummary().Quantile {
|
||||
if !math.IsNaN(q.GetValue()) {
|
||||
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
|
||||
}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// ScrapperRequest is the parsing request submited to nats
|
||||
type ScrapperRequest struct {
|
||||
HostURL string `json:"host_url"`
|
||||
OrgID platform.ID `json:"org"`
|
||||
BucketID platform.ID `json:"bucket"`
|
||||
}
|
||||
|
||||
// ScrapperHandler handles parsing subscription
|
||||
type ScrapperHandler struct {
|
||||
Scrapper Scrapper
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// Process implents nats Handler interface
|
||||
func (h *ScrapperHandler) Process(s nats.Subscription, m nats.Message) {
|
||||
defer m.Ack()
|
||||
msg := new(ScrapperRequest)
|
||||
err := json.Unmarshal(m.Data(), msg)
|
||||
if err != nil {
|
||||
h.Logger.Printf("scrapper processing error %v\n", err)
|
||||
return
|
||||
}
|
||||
err = h.Scrapper.Gather(context.Background(), msg.OrgID, msg.BucketID, msg.HostURL)
|
||||
if err != nil {
|
||||
h.Logger.Println(err.Error())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,227 @@
|
|||
package gather
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/platform"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
type mockStorage struct {
|
||||
Metrics map[int64]Metrics
|
||||
}
|
||||
|
||||
func (s *mockStorage) Record(ms []Metrics) error {
|
||||
for _, m := range ms {
|
||||
s.Metrics[m.Timestamp] = m
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockHTTPHandler struct {
|
||||
unauthorized bool
|
||||
noContent bool
|
||||
responseMap map[string]string
|
||||
}
|
||||
|
||||
func (h mockHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if h.unauthorized {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if h.noContent {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
s, ok := h.responseMap[r.URL.Path]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
||||
w.Write([]byte(s))
|
||||
}
|
||||
|
||||
func TestParse(t *testing.T) {
|
||||
option := cmp.Options{
|
||||
cmp.Comparer(func(x, y Metrics) bool {
|
||||
return x.Name == y.Name &&
|
||||
x.Type == y.Type &&
|
||||
reflect.DeepEqual(x.Tags, y.Tags) &&
|
||||
reflect.DeepEqual(x.Fields, y.Fields)
|
||||
}),
|
||||
}
|
||||
orgID, _ := platform.IDFromString("020f755c3c082000")
|
||||
bucketID, _ := platform.IDFromString("020f755c3c082001")
|
||||
cases := []struct {
|
||||
name string
|
||||
ms []Metrics
|
||||
handler *mockHTTPHandler
|
||||
hasErr bool
|
||||
}{
|
||||
{
|
||||
name: "bad request",
|
||||
hasErr: true,
|
||||
},
|
||||
{
|
||||
name: "empty request",
|
||||
handler: &mockHTTPHandler{
|
||||
responseMap: map[string]string{
|
||||
"/metrics": "",
|
||||
},
|
||||
},
|
||||
hasErr: true,
|
||||
},
|
||||
{
|
||||
name: "regular metrics",
|
||||
handler: &mockHTTPHandler{
|
||||
responseMap: map[string]string{
|
||||
"/metrics": sampleResp,
|
||||
},
|
||||
},
|
||||
ms: []Metrics{
|
||||
{
|
||||
Name: "go_gc_duration_seconds",
|
||||
Type: dto.MetricType_SUMMARY,
|
||||
Fields: map[string]interface{}{
|
||||
"count": float64(326),
|
||||
"sum": 0.07497837,
|
||||
"0": 3.6257e-05,
|
||||
"0.25": 0.0001434,
|
||||
"0.5": 0.000194491,
|
||||
"0.75": 0.000270339,
|
||||
"1": 0.000789365,
|
||||
},
|
||||
Tags: map[string]string{},
|
||||
},
|
||||
{
|
||||
Name: "go_goroutines",
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"gauge": float64(36),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "go_info",
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Tags: map[string]string{
|
||||
"version": "go1.10.3",
|
||||
},
|
||||
Fields: map[string]interface{}{
|
||||
"gauge": float64(1),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "go_memstats_alloc_bytes",
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"gauge": 2.0091312e+07,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "go_memstats_alloc_bytes_total",
|
||||
Type: dto.MetricType_COUNTER,
|
||||
Fields: map[string]interface{}{
|
||||
"counter": 4.183173328e+09,
|
||||
},
|
||||
Tags: map[string]string{},
|
||||
},
|
||||
{
|
||||
Name: "go_memstats_buck_hash_sys_bytes",
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"gauge": 1.533852e+06,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "go_memstats_frees_total",
|
||||
Type: dto.MetricType_COUNTER,
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"counter": 1.8944339e+07,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "go_memstats_gc_cpu_fraction",
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"gauge": 1.972734963012756e-05,
|
||||
},
|
||||
},
|
||||
},
|
||||
hasErr: false,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
storage := &mockStorage{
|
||||
Metrics: make(map[int64]Metrics),
|
||||
}
|
||||
scrapper := NewPrometheusScrapper(storage)
|
||||
var url string
|
||||
if c.handler != nil {
|
||||
ts := httptest.NewServer(c.handler)
|
||||
defer ts.Close()
|
||||
url = ts.URL
|
||||
}
|
||||
err := scrapper.Gather(context.Background(), *orgID, *bucketID, url+"/metrics")
|
||||
if err != nil && !c.hasErr {
|
||||
t.Fatalf("scrapper parse err in testing %s: %v", c.name, err)
|
||||
}
|
||||
if len(c.ms) != len(storage.Metrics) {
|
||||
t.Fatalf("scrapper parse metrics incorrect length, want %d, got %d",
|
||||
len(c.ms), len(storage.Metrics))
|
||||
}
|
||||
for _, m := range storage.Metrics {
|
||||
for _, cm := range c.ms {
|
||||
if m.Name == cm.Name {
|
||||
if diff := cmp.Diff(m, cm, option); diff != "" {
|
||||
t.Fatalf("scrapper parse metrics want %v, got %v", cm, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const sampleResp = `
|
||||
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
|
||||
# TYPE go_gc_duration_seconds summary
|
||||
go_gc_duration_seconds{quantile="0"} 3.6257e-05
|
||||
go_gc_duration_seconds{quantile="0.25"} 0.0001434
|
||||
go_gc_duration_seconds{quantile="0.5"} 0.000194491
|
||||
go_gc_duration_seconds{quantile="0.75"} 0.000270339
|
||||
go_gc_duration_seconds{quantile="1"} 0.000789365
|
||||
go_gc_duration_seconds_sum 0.07497837
|
||||
go_gc_duration_seconds_count 326
|
||||
# HELP go_goroutines Number of goroutines that currently exist.
|
||||
# TYPE go_goroutines gauge
|
||||
go_goroutines 36
|
||||
# HELP go_info Information about the Go environment.
|
||||
# TYPE go_info gauge
|
||||
go_info{version="go1.10.3"} 1
|
||||
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
|
||||
# TYPE go_memstats_alloc_bytes gauge
|
||||
go_memstats_alloc_bytes 2.0091312e+07
|
||||
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
|
||||
# TYPE go_memstats_alloc_bytes_total counter
|
||||
go_memstats_alloc_bytes_total 4.183173328e+09
|
||||
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
|
||||
# TYPE go_memstats_buck_hash_sys_bytes gauge
|
||||
go_memstats_buck_hash_sys_bytes 1.533852e+06
|
||||
# HELP go_memstats_frees_total Total number of frees.
|
||||
# TYPE go_memstats_frees_total counter
|
||||
go_memstats_frees_total 1.8944339e+07
|
||||
# HELP go_memstats_gc_cpu_fraction The fraction of this program's available CPU time used by the GC since the program started.
|
||||
# TYPE go_memstats_gc_cpu_fraction gauge
|
||||
go_memstats_gc_cpu_fraction 1.972734963012756e-05
|
||||
`
|
|
@ -0,0 +1,102 @@
|
|||
package gather
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/influxdata/platform/flux"
|
||||
"github.com/influxdata/platform/nats"
|
||||
)
|
||||
|
||||
// Storage stores the metrics of a time based
|
||||
type Storage interface {
|
||||
//Subscriber nats.Subscriber
|
||||
Record([]Metrics) error
|
||||
}
|
||||
|
||||
// NewNatsStorage use nats to publish each store request
|
||||
// and the subscriber will use the embeded storage to do the store
|
||||
// activity
|
||||
func NewNatsStorage(
|
||||
storage Storage,
|
||||
subject string,
|
||||
logger *log.Logger,
|
||||
publisher nats.Publisher,
|
||||
subscriber nats.Subscriber,
|
||||
) Storage {
|
||||
s := &natsStorage{
|
||||
storage: storage,
|
||||
subject: subject,
|
||||
publisher: publisher,
|
||||
subscriber: subscriber,
|
||||
}
|
||||
s.subscriber.Subscribe(s.subject, "", &storageHandler{
|
||||
storage: storage,
|
||||
logger: logger,
|
||||
})
|
||||
return s
|
||||
}
|
||||
|
||||
type natsStorage struct {
|
||||
storage Storage
|
||||
subject string
|
||||
subscriber nats.Subscriber
|
||||
publisher nats.Publisher
|
||||
}
|
||||
|
||||
func (s *natsStorage) Record(ms []Metrics) error {
|
||||
buf := new(bytes.Buffer)
|
||||
b, err := json.Marshal(ms)
|
||||
if err != nil {
|
||||
return fmt.Errorf("scrapper metrics serialization error: %v", err)
|
||||
}
|
||||
_, err = buf.Write(b)
|
||||
if err != nil {
|
||||
return fmt.Errorf("scrapper metrics buffer write error: %v", err)
|
||||
}
|
||||
if err = s.publisher.Publish(s.subject, buf); err != nil {
|
||||
return fmt.Errorf("scrapper publisher publish error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type storageHandler struct {
|
||||
storage Storage
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
func (h *storageHandler) Process(s nats.Subscription, m nats.Message) {
|
||||
defer m.Ack()
|
||||
ms := make([]Metrics, 0)
|
||||
err := json.Unmarshal(m.Data(), &ms)
|
||||
if err != nil {
|
||||
h.logger.Printf("storage handler process err: %v\n", err)
|
||||
}
|
||||
err = h.storage.Record(ms)
|
||||
if err != nil {
|
||||
h.logger.Printf("storage handler store err: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// NewInfluxStorage create a new influx storage
|
||||
// which will storage data directly to influxdb
|
||||
func NewInfluxStorage(urls []string) Storage {
|
||||
return &influxStorage{
|
||||
URLs: urls,
|
||||
}
|
||||
}
|
||||
|
||||
type influxStorage struct {
|
||||
URLs []string
|
||||
Client flux.Client
|
||||
}
|
||||
|
||||
func (s *influxStorage) Record(ms []Metrics) error {
|
||||
println(s.URLs)
|
||||
for _, m := range ms {
|
||||
fmt.Println(m)
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue