diff --git a/Gopkg.lock b/Gopkg.lock
index dc8de1dbb5..a6e3656d39 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1295,4 +1295,4 @@
     "gopkg.in/robfig/cron.v2",
   ]
   solver-name = "gps-cdcl"
-  solver-version = 1
+  solver-version = 1
\ No newline at end of file
diff --git a/Gopkg.toml b/Gopkg.toml
index 42790f0ec6..123051ed24 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -98,3 +98,7 @@ required = [
 [[constraint]]
   branch = "master"
   name = "github.com/influxdata/flux"
+
+[[constraint]]
+  name = "github.com/prometheus/prometheus"
+  version = "=2.3.2"
diff --git a/cmd/scrapperd/main.go b/cmd/scrapperd/main.go
new file mode 100644
index 0000000000..2178932426
--- /dev/null
+++ b/cmd/scrapperd/main.go
@@ -0,0 +1,119 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"flag"
+	"log"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/influxdata/platform"
+	"github.com/influxdata/platform/gather"
+	"github.com/influxdata/platform/nats"
+)
+
+const (
+	subjectParse = "sub_parse"
+	subjectStore = "sub_store"
+)
+
+func main() {
+	logger := log.New(os.Stdout, "", log.LstdFlags)
+
+	msg, influxURL := parseFlags()
+	publisher := initNats(logger, influxURL)
+
+	go scheduleGetMetrics(msg, publisher)
+
+	<-make(chan struct{})
+}
+
+func parseFlags() (msg gather.ScrapperRequest, influxURL []string) {
+	orgIDstr := flag.String("orgID", "", "the organization id")
+	bucketIDstr := flag.String("bucketID", "", "the bucket id")
+	hostStr := flag.String("pHost", "", "the prometheus host")
+	influxStr := flag.String("influxURLs", "", "comma-separated influxdb urls")
+	flag.Parse()
+
+	orgID, err := platform.IDFromString(*orgIDstr)
+	if err != nil || orgID == nil || orgID.String() == "" {
+		log.Fatal("Invalid orgID")
+	}
+
+	bucketID, err := platform.IDFromString(*bucketIDstr)
+	if err != nil || bucketID == nil || bucketID.String() == "" {
+		log.Fatal("Invalid bucketID")
+	}
+
+	if *hostStr == "" {
+		log.Fatal("Invalid host")
+	}
+	pURL := *hostStr + "/metrics"
+
+	// strings.Split on an empty string yields [""], not an empty
+	// slice, so check the flag before splitting.
+	influxURL = []string{"http://localhost:8086"}
+	if *influxStr != "" {
+		influxURL = strings.Split(*influxStr, ",")
+	}
+	msg = gather.ScrapperRequest{
+		HostURL:  pURL,
+		OrgID:    *orgID,
+		BucketID: *bucketID,
+	}
+	return msg, influxURL
+}
+
+func initNats(logger *log.Logger, influxURL []string) nats.Publisher {
+	server := nats.NewServer(nats.Config{
+		FilestoreDir: ".",
+	})
+
+	if err := server.Open(); err != nil {
+		log.Fatalf("nats server fatal err %v", err)
+	}
+
+	subscriber := nats.NewQueueSubscriber("nats-subscriber")
+	if err := subscriber.Open(); err != nil {
+		log.Fatalf("nats parse subscriber open issue %v", err)
+	}
+
+	publisher := nats.NewAsyncPublisher("nats-publisher")
+	if err := publisher.Open(); err != nil {
+		log.Fatalf("nats parse publisher open issue %v", err)
+	}
+
+	if err := subscriber.Subscribe(subjectParse, "", &gather.ScrapperHandler{
+		Scrapper: gather.NewPrometheusScrapper(
+			gather.NewNatsStorage(
+				gather.NewInfluxStorage(influxURL),
+				subjectStore,
+				logger,
+				publisher,
+				subscriber,
+			),
+		),
+		Logger: logger,
+	}); err != nil {
+		log.Fatalf("nats subscribe error %v", err)
+	}
+	return publisher
+}
+
+// scheduleGetMetrics publishes the scrapper request to the parse
+// subject every 2 seconds; a loop is used rather than recursion so
+// the stack does not grow without bound.
+func scheduleGetMetrics(msg gather.ScrapperRequest, publisher nats.Publisher) {
+	b, err := json.Marshal(msg)
+	if err != nil {
+		log.Fatalf("scrapper request marshal error %v", err)
+	}
+	for {
+		if err := publisher.Publish(subjectParse, bytes.NewBuffer(b)); err != nil {
+			log.Printf("publish scrapper request error %v", err)
+		}
+		time.Sleep(2 * time.Second)
+	}
+}
diff --git a/gather/metrics.go b/gather/metrics.go
new file mode 100644
index 0000000000..a84bf49838
--- /dev/null
+++ b/gather/metrics.go
@@ -0,0 +1,12 @@
+package gather
+
+import dto "github.com/prometheus/client_model/go"
+
+// Metrics is the default influx-style metric: a named set of tags and fields at a timestamp
+type Metrics struct {
+	Name      string                 `json:"name"`
+	Tags      map[string]string      `json:"tags"`
+	Fields    map[string]interface{} `json:"fields"`
+	Timestamp int64                  `json:"timestamp"`
+	Type      dto.MetricType         `json:"type"`
+}
diff --git a/gather/metrics_test.go b/gather/metrics_test.go
new file mode 100644
index 0000000000..dd1b24c3f4
--- /dev/null
+++ b/gather/metrics_test.go
@@ -0,0 +1,85 @@
+package gather
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	dto "github.com/prometheus/client_model/go"
+)
+
+func TestMetrics(t *testing.T) {
+	cases := []struct {
+		name string
+		ms   []Metrics
+	}{
+		{
+			name: "empty",
+			ms:   make([]Metrics, 0),
+		},
+		{
+			name: "single",
+			ms: []Metrics{
+				{
+					Timestamp: 12345,
+					Tags: map[string]string{
+						"b": "B",
+						"a": "A",
+						"c": "C",
+					},
+					Fields: map[string]interface{}{
+						"x": 12.3,
+						"y": "a long string",
+					},
+					Type: dto.MetricType_SUMMARY,
+				},
+			},
+		},
+		{
+			name: "multiple",
+			ms: []Metrics{
+				{
+					Timestamp: 12345,
+					Tags: map[string]string{
+						"b": "B",
+						"a": "A",
+						"c": "C",
+					},
+					Fields: map[string]interface{}{
+						"x": 12.3,
+						"y": "a long string",
+					},
+					Type: dto.MetricType_SUMMARY,
+				},
+
+				{
+					Timestamp: 12345,
+					Tags: map[string]string{
+						"b": "B2",
+						"a": "A2",
+						"c": "C2",
+					},
+					Fields: map[string]interface{}{
+						"x": 12.5,
+						"y": "a long string2",
+					},
+					Type: dto.MetricType_GAUGE,
+				},
+			},
+		},
+	}
+	for _, c := range cases {
+		b, err := json.Marshal(c.ms)
+		if err != nil {
+			t.Fatalf("error marshaling metrics in case %q: %v", c.name, err)
+		}
+		result := make([]Metrics, 0)
+		err = json.Unmarshal(b, &result)
+		if err != nil {
+			t.Fatalf("error unmarshaling metrics in case %q: b: %s, %v", c.name, string(b), err)
+		}
+		if diff := cmp.Diff(c.ms, result); diff != "" {
+			t.Fatalf("unmarshaled metrics differ in case %q (-want +got):\n%s", c.name, diff)
+		}
+	}
+}
diff --git a/gather/scrapper.go b/gather/scrapper.go
new file mode 100644
index 0000000000..08bc4fa181
--- /dev/null
+++ b/gather/scrapper.go
@@ -0,0 +1,210 @@
+package gather
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"math"
+	"mime"
+	"net/http"
+	"time"
+
+	"github.com/influxdata/platform"
+	"github.com/influxdata/platform/nats"
+	"github.com/matttproud/golang_protobuf_extensions/pbutil"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
+)
+
+/*
+	The intended pipeline:
+	1. Discoverer
+	2. Scheduler
+	3. Queue
+	4. Scrape
+	5. Storage
+*/
+
+// Scrapper gathers metrics from a URL
+type Scrapper interface {
+	Gather(ctx context.Context, orgID, bucketID platform.ID, url string) error
+}
+
+// NewPrometheusScrapper returns a new prometheusScrapper
+// that fetches metrics from a prometheus /metrics endpoint
+func NewPrometheusScrapper(s Storage) Scrapper {
+	return &prometheusScrapper{
+		Storage: s,
+	}
+}
+
+type prometheusScrapper struct {
+	Storage Storage
+}
+
+func (p *prometheusScrapper) Gather(
+	ctx context.Context,
+	orgID,
+	bucketID platform.ID,
+	url string,
+) error {
+	resp, err := http.Get(url)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	return p.parse(resp.Body, resp.Header)
+}
+
+func (p *prometheusScrapper) parse(r io.Reader, header http.Header) error {
+	var parser expfmt.TextParser
+
+	mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
+	if err != nil {
+		return err
+	}
+	// Prepare output
+	metricFamilies := make(map[string]*dto.MetricFamily)
+	if mediatype == "application/vnd.google.protobuf" &&
+		params["encoding"] == "delimited" &&
+		params["proto"] == "io.prometheus.client.MetricFamily" {
+		for {
+			mf := &dto.MetricFamily{}
+			if _, err := pbutil.ReadDelimited(r, mf); err != nil {
+				if err == io.EOF {
+					break
+				}
+				return fmt.Errorf("reading metric family protocol buffer failed: %v", err)
+			}
+			metricFamilies[mf.GetName()] = mf
+		}
+	} else {
+		metricFamilies, err = parser.TextToMetricFamilies(r)
+		if err != nil {
+			return fmt.Errorf("reading text format failed: %v", err)
+		}
+	}
+	ms := make([]Metrics, 0)
+
+	// read metrics
+	for metricName, mf := range metricFamilies {
+		for _, m := range mf.Metric {
+			// reading tags
+			tags := makeLabels(m)
+			// reading fields
+			var fields map[string]interface{}
+			switch mf.GetType() {
+			case dto.MetricType_SUMMARY:
+				// summary metric
+				fields = makeQuantiles(m)
+				fields["count"] = float64(m.GetSummary().GetSampleCount())
+				fields["sum"] = m.GetSummary().GetSampleSum()
+			case dto.MetricType_HISTOGRAM:
+				// histogram metric
+				fields = makeBuckets(m)
+				fields["count"] = float64(m.GetHistogram().GetSampleCount())
+				fields["sum"] = m.GetHistogram().GetSampleSum()
+			default:
+				// standard metric
+				fields = getNameAndValue(m)
+			}
+			if len(fields) > 0 {
+				var t time.Time
+				if m.TimestampMs != nil && *m.TimestampMs > 0 {
+					t = time.Unix(0, *m.TimestampMs*int64(time.Millisecond))
+				} else {
+					t = time.Now()
+				}
+				me := Metrics{
+					Timestamp: t.UnixNano(),
+					Tags:      tags,
+					Fields:    fields,
+					Name:      metricName,
+					Type:      mf.GetType(),
+				}
+				ms = append(ms, me)
+			}
+		}
+	}
+
+	return p.Storage.Record(ms)
+}
+
+// makeLabels gets the tags from the metric's label pairs
+func makeLabels(m *dto.Metric) map[string]string {
+	result := map[string]string{}
+	for _, lp := range m.Label {
+		result[lp.GetName()] = lp.GetValue()
+	}
+	return result
+}
+
+// makeBuckets gets the buckets from a histogram metric
+func makeBuckets(m *dto.Metric) map[string]interface{} {
+	fields := make(map[string]interface{})
+	for _, b := range m.GetHistogram().Bucket {
+		fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
+	}
+	return fields
+}
+
+// getNameAndValue gets the field name and value from a simple metric
+func getNameAndValue(m *dto.Metric) map[string]interface{} {
+	fields := make(map[string]interface{})
+	if m.Gauge != nil {
+		if !math.IsNaN(m.GetGauge().GetValue()) {
+			fields["gauge"] = m.GetGauge().GetValue()
+		}
+	} else if m.Counter != nil {
+		if !math.IsNaN(m.GetCounter().GetValue()) {
+			fields["counter"] = m.GetCounter().GetValue()
+		}
+	} else if m.Untyped != nil {
+		if !math.IsNaN(m.GetUntyped().GetValue()) {
+			fields["value"] = m.GetUntyped().GetValue()
+		}
+	}
+	return fields
+}
+
+// makeQuantiles gets the quantiles from a summary metric
+func makeQuantiles(m *dto.Metric) map[string]interface{} {
+	fields := make(map[string]interface{})
+	for _, q := range m.GetSummary().Quantile {
+		if !math.IsNaN(q.GetValue()) {
+			fields[fmt.Sprint(q.GetQuantile())] = q.GetValue()
+		}
+	}
+	return fields
+}
+
+// ScrapperRequest is the parsing request submitted to nats
+type ScrapperRequest struct {
+	HostURL  string      `json:"host_url"`
+	OrgID    platform.ID `json:"org"`
+	BucketID platform.ID `json:"bucket"`
+}
+
+// ScrapperHandler handles the parsing subscription
+type ScrapperHandler struct {
+	Scrapper Scrapper
+	Logger   *log.Logger
+}
+
+// Process implements the nats Handler interface
+func (h *ScrapperHandler) Process(s nats.Subscription, m nats.Message) {
+	defer m.Ack()
+	msg := new(ScrapperRequest)
+	err := json.Unmarshal(m.Data(), msg)
+	if err != nil {
+		h.Logger.Printf("scrapper processing error %v\n", err)
+		return
+	}
+	err = h.Scrapper.Gather(context.Background(), msg.OrgID, msg.BucketID, msg.HostURL)
+	if err != nil {
+		h.Logger.Println(err.Error())
+	}
+}
diff --git a/gather/scrapper_test.go b/gather/scrapper_test.go
new file mode 100644
index 0000000000..fb44dbb000
--- /dev/null
+++ b/gather/scrapper_test.go
@@ -0,0 +1,226 @@
+package gather
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/influxdata/platform"
+	dto "github.com/prometheus/client_model/go"
+)
+
+type mockStorage struct {
+	Metrics map[int64]Metrics
+}
+
+func (s *mockStorage) Record(ms []Metrics) error {
+	for _, m := range ms {
+		s.Metrics[m.Timestamp] = m
+	}
+	return nil
+}
+
+type mockHTTPHandler struct {
+	unauthorized bool
+	noContent    bool
+	responseMap  map[string]string
+}
+
+func (h mockHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if h.unauthorized {
+		w.WriteHeader(http.StatusUnauthorized)
+		return
+	}
+	if h.noContent {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
+	s, ok := h.responseMap[r.URL.Path]
+	if !ok {
+		w.WriteHeader(http.StatusNotFound)
+		return
+	}
+	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
+	w.Write([]byte(s))
+}
+
+func TestParse(t *testing.T) {
+	option := cmp.Options{
+		cmp.Comparer(func(x, y Metrics) bool {
+			return x.Name == y.Name &&
+				x.Type == y.Type &&
+				reflect.DeepEqual(x.Tags, y.Tags) &&
+				reflect.DeepEqual(x.Fields, y.Fields)
+		}),
+	}
+	orgID, _ := platform.IDFromString("020f755c3c082000")
+	bucketID, _ := platform.IDFromString("020f755c3c082001")
+	cases := []struct {
+		name    string
+		ms      []Metrics
+		handler *mockHTTPHandler
+		hasErr  bool
+	}{
+		{
+			name:   "bad request",
+			hasErr: true,
+		},
+		{
+			name: "empty request",
+			handler: &mockHTTPHandler{
+				responseMap: map[string]string{
+					"/metrics": "",
+				},
+			},
+			hasErr: true,
+		},
+		{
+			name: "regular metrics",
+			handler: &mockHTTPHandler{
+				responseMap: map[string]string{
+					"/metrics": sampleResp,
+				},
+			},
+			ms: []Metrics{
+				{
+					Name: "go_gc_duration_seconds",
+					Type: dto.MetricType_SUMMARY,
+					Fields: map[string]interface{}{
+						"count": float64(326),
+						"sum":   0.07497837,
+						"0":     3.6257e-05,
+						"0.25":  0.0001434,
+						"0.5":   0.000194491,
+						"0.75":  0.000270339,
+						"1":     0.000789365,
+					},
+					Tags: map[string]string{},
+				},
+				{
+					Name: "go_goroutines",
+					Type: dto.MetricType_GAUGE,
+					Tags: map[string]string{},
+					Fields: map[string]interface{}{
+						"gauge": float64(36),
+					},
+				},
+				{
+					Name: "go_info",
+					Type: dto.MetricType_GAUGE,
+					Tags: map[string]string{
+						"version": "go1.10.3",
+					},
+					Fields: map[string]interface{}{
+						"gauge": float64(1),
+					},
+				},
+				{
+					Name: "go_memstats_alloc_bytes",
+					Type: dto.MetricType_GAUGE,
+					Tags: map[string]string{},
+					Fields: map[string]interface{}{
+						"gauge": 2.0091312e+07,
+					},
+				},
+				{
+					Name: "go_memstats_alloc_bytes_total",
+					Type: dto.MetricType_COUNTER,
+					Fields: map[string]interface{}{
+						"counter": 4.183173328e+09,
+					},
+					Tags: map[string]string{},
+				},
+				{
+					Name: "go_memstats_buck_hash_sys_bytes",
+					Type: dto.MetricType_GAUGE,
+					Tags: map[string]string{},
+					Fields: map[string]interface{}{
+						"gauge": 1.533852e+06,
+					},
+				},
+				{
+					Name: "go_memstats_frees_total",
+					Type: dto.MetricType_COUNTER,
+					Tags: map[string]string{},
+					Fields: map[string]interface{}{
+						"counter": 1.8944339e+07,
+					},
+				},
+				{
+					Name: "go_memstats_gc_cpu_fraction",
+					Type: dto.MetricType_GAUGE,
+					Tags: map[string]string{},
+					Fields: map[string]interface{}{
+						"gauge": 1.972734963012756e-05,
+					},
+				},
+			},
+			hasErr: false,
+		},
+	}
+	for _, c := range cases {
+		storage := &mockStorage{
+			Metrics: make(map[int64]Metrics),
+		}
+		scrapper := NewPrometheusScrapper(storage)
+		var url string
+		if c.handler != nil {
+			ts := httptest.NewServer(c.handler)
+			defer ts.Close()
+			url = ts.URL
+		}
+		err := scrapper.Gather(context.Background(), *orgID, *bucketID, url+"/metrics")
+		if err != nil && !c.hasErr {
+			t.Fatalf("scrapper parse err in testing %s: %v", c.name, err)
+		}
+		if len(c.ms) != len(storage.Metrics) {
+			t.Fatalf("scrapper parse metrics incorrect length, want %d, got %d",
+				len(c.ms), len(storage.Metrics))
+		}
+		for _, m := range storage.Metrics {
+			for _, cm := range c.ms {
+				if m.Name == cm.Name {
+					if diff := cmp.Diff(m, cm, option); diff != "" {
+						t.Fatalf("scrapper parse metrics mismatch in %s (-got +want):\n%s", c.name, diff)
+					}
+				}
+			}
+		}
+	}
+}
+
+const sampleResp = `
+# HELP go_gc_duration_seconds A summary of the GC invocation durations.
+# TYPE go_gc_duration_seconds summary
+go_gc_duration_seconds{quantile="0"} 3.6257e-05
+go_gc_duration_seconds{quantile="0.25"} 0.0001434
+go_gc_duration_seconds{quantile="0.5"} 0.000194491
+go_gc_duration_seconds{quantile="0.75"} 0.000270339
+go_gc_duration_seconds{quantile="1"} 0.000789365
+go_gc_duration_seconds_sum 0.07497837
+go_gc_duration_seconds_count 326
+# HELP go_goroutines Number of goroutines that currently exist.
+# TYPE go_goroutines gauge
+go_goroutines 36
+# HELP go_info Information about the Go environment.
+# TYPE go_info gauge
+go_info{version="go1.10.3"} 1
+# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
+# TYPE go_memstats_alloc_bytes gauge
+go_memstats_alloc_bytes 2.0091312e+07
+# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
+# TYPE go_memstats_alloc_bytes_total counter
+go_memstats_alloc_bytes_total 4.183173328e+09
+# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
+# TYPE go_memstats_buck_hash_sys_bytes gauge
+go_memstats_buck_hash_sys_bytes 1.533852e+06
+# HELP go_memstats_frees_total Total number of frees.
+# TYPE go_memstats_frees_total counter
+go_memstats_frees_total 1.8944339e+07
+# HELP go_memstats_gc_cpu_fraction The fraction of this program's available CPU time used by the GC since the program started.
+# TYPE go_memstats_gc_cpu_fraction gauge
+go_memstats_gc_cpu_fraction 1.972734963012756e-05
+`
diff --git a/gather/storage.go b/gather/storage.go
new file mode 100644
index 0000000000..258560ca81
--- /dev/null
+++ b/gather/storage.go
@@ -0,0 +1,106 @@
+package gather
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"log"
+
+	"github.com/influxdata/platform/flux"
+	"github.com/influxdata/platform/nats"
+)
+
+// Storage stores parsed metrics
+type Storage interface {
+	Record([]Metrics) error
+}
+
+// NewNatsStorage uses nats to publish each store request;
+// the subscriber side uses the embedded storage to do the
+// actual write
+func NewNatsStorage(
+	storage Storage,
+	subject string,
+	logger *log.Logger,
+	publisher nats.Publisher,
+	subscriber nats.Subscriber,
+) Storage {
+	s := &natsStorage{
+		storage:    storage,
+		subject:    subject,
+		publisher:  publisher,
+		subscriber: subscriber,
+	}
+	if err := s.subscriber.Subscribe(s.subject, "", &storageHandler{
+		storage: storage,
+		logger:  logger,
+	}); err != nil {
+		logger.Printf("nats storage subscribe error: %v\n", err)
+	}
+	return s
+}
+
+type natsStorage struct {
+	storage    Storage
+	subject    string
+	subscriber nats.Subscriber
+	publisher  nats.Publisher
+}
+
+func (s *natsStorage) Record(ms []Metrics) error {
+	buf := new(bytes.Buffer)
+	b, err := json.Marshal(ms)
+	if err != nil {
+		return fmt.Errorf("scrapper metrics serialization error: %v", err)
+	}
+	_, err = buf.Write(b)
+	if err != nil {
+		return fmt.Errorf("scrapper metrics buffer write error: %v", err)
+	}
+	if err = s.publisher.Publish(s.subject, buf); err != nil {
+		return fmt.Errorf("scrapper publisher publish error: %v", err)
+	}
+	return nil
+}
+
+type storageHandler struct {
+	storage Storage
+	logger  *log.Logger
+}
+
+func (h *storageHandler) Process(s nats.Subscription, m nats.Message) {
+	defer m.Ack()
+	ms := make([]Metrics, 0)
+	err := json.Unmarshal(m.Data(), &ms)
+	if err != nil {
+		h.logger.Printf("storage handler process err: %v\n", err)
+		return
+	}
+	err = h.storage.Record(ms)
+	if err != nil {
+		h.logger.Printf("storage handler store err: %v\n", err)
+	}
+}
+
+// NewInfluxStorage creates a new influx storage
+// that writes data directly to influxdb
+func NewInfluxStorage(urls []string) Storage {
+	return &influxStorage{
+		URLs: urls,
+	}
+}
+
+type influxStorage struct {
+	URLs   []string
+	Client flux.Client
+}
+
+// Record is a stub that prints the metrics it would write,
+// until the flux client write path is wired up
+func (s *influxStorage) Record(ms []Metrics) error {
+	fmt.Println(s.URLs)
+	for _, m := range ms {
+		fmt.Println(m)
+	}
+	return nil
+}
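For local verification, the new gather package can also be exercised without the nats pipeline. A minimal sketch, assuming a prometheus-instrumented target is running locally; the target URL is a placeholder and the IDs are the sample values from scrapper_test.go:

package main

import (
	"context"
	"log"

	"github.com/influxdata/platform"
	"github.com/influxdata/platform/gather"
)

func main() {
	// Placeholder IDs (the sample values used in scrapper_test.go).
	orgID, _ := platform.IDFromString("020f755c3c082000")
	bucketID, _ := platform.IDFromString("020f755c3c082001")

	// Compose the scrapper directly with the influx-backed storage,
	// bypassing the sub_parse/sub_store nats queues used by scrapperd.
	s := gather.NewPrometheusScrapper(
		gather.NewInfluxStorage([]string{"http://localhost:8086"}),
	)

	// Placeholder target: any prometheus /metrics endpoint works.
	err := s.Gather(context.Background(), *orgID, *bucketID, "http://localhost:9999/metrics")
	if err != nil {
		log.Fatal(err)
	}
}

cmd/scrapperd wires the same Scrapper through NewNatsStorage instead, so scraping and writing are decoupled by the sub_store queue.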