feat(scraper): add scraper lib

pull/10616/head
Kelvin Wang 2018-09-07 11:45:28 -04:00
parent ae31bebad5
commit f636c52fa5
20 changed files with 1795 additions and 364 deletions

View File

@ -113,6 +113,11 @@ func (c *Client) initialize(ctx context.Context) error {
return err
}
// Always create Scraper bucket.
if err := c.initializeScraperTargets(ctx, tx); err != nil {
return err
}
return nil
}); err != nil {
return err

114
bolt/scraper.go Normal file
View File

@ -0,0 +1,114 @@
package bolt
import (
"context"
"encoding/json"
"errors"
"fmt"
bolt "github.com/coreos/bbolt"
"github.com/influxdata/platform"
)
var (
	// scraperBucket is the bolt bucket in which scraper targets are
	// stored, keyed by target ID. The "v2" suffix is part of the
	// on-disk bucket name — do not change it without a migration.
	scraperBucket = []byte("scraperv2")
)

// Compile-time check that *Client satisfies the scraper target store
// interface.
var _ platform.ScraperTargetStoreService = (*Client)(nil)
// initializeScraperTargets ensures the scraper bucket exists so later
// reads and writes never observe a nil bucket. Safe to call on every
// startup; CreateBucketIfNotExists is idempotent.
func (c *Client) initializeScraperTargets(ctx context.Context, tx *bolt.Tx) error {
	// scraperBucket is already a []byte; the original wrapped it in a
	// redundant []byte(...) conversion.
	if _, err := tx.CreateBucketIfNotExists(scraperBucket); err != nil {
		return err
	}
	return nil
}
// ListTargets will list all scrape targets.
// It always returns a non-nil (possibly empty) slice so JSON encoding
// yields [] rather than null.
func (c *Client) ListTargets(ctx context.Context) (list []platform.ScraperTarget, err error) {
	list = make([]platform.ScraperTarget, 0)
	err = c.db.View(func(tx *bolt.Tx) error {
		// ForEach visits every key/value pair, equivalent to walking
		// the bucket with a cursor from First to nil.
		return tx.Bucket(scraperBucket).ForEach(func(k, v []byte) error {
			var target platform.ScraperTarget
			if e := json.Unmarshal(v, &target); e != nil {
				return e
			}
			list = append(list, target)
			return nil
		})
	})
	return list, err
}
// AddTarget add a new scraper target into storage.
// The target's ID field is overwritten with a freshly generated ID.
func (c *Client) AddTarget(ctx context.Context, target *platform.ScraperTarget) error {
	return c.db.Update(func(tx *bolt.Tx) error {
		// Assign an ID before persisting; callers receive it via the
		// mutated target.
		target.ID = c.IDGenerator.ID()
		return c.putTarget(ctx, tx, target)
	})
}
// RemoveTarget removes a scraper target from the bucket.
// It returns the lookup error when no target exists under id, rather
// than silently succeeding.
func (c *Client) RemoveTarget(ctx context.Context, id platform.ID) error {
	return c.db.Update(func(tx *bolt.Tx) error {
		// Existence check first: Delete alone would not report a
		// missing key.
		if _, err := c.findTargetByID(ctx, tx, id); err != nil {
			return err
		}
		return tx.Bucket(scraperBucket).Delete(id)
	})
}
// UpdateTarget updates a scraper target, replacing the stored record
// wholesale with *update. It returns an error when update.ID is empty
// or no target exists under that ID; on error the returned target is
// always nil (the original could return a non-nil stale pointer
// alongside a non-nil error).
func (c *Client) UpdateTarget(ctx context.Context, update *platform.ScraperTarget) (*platform.ScraperTarget, error) {
	if len(update.ID) == 0 {
		return nil, errors.New("update scraper: id is empty")
	}
	err := c.db.Update(func(tx *bolt.Tx) error {
		// The lookup result is intentionally discarded — it only
		// verifies the target exists before overwriting it.
		if _, err := c.findTargetByID(ctx, tx, update.ID); err != nil {
			return err
		}
		return c.putTarget(ctx, tx, update)
	})
	if err != nil {
		return nil, err
	}
	return update, nil
}
// GetTargetByID retrieves a scraper target by id.
// Returns a not-found error when no record exists under id.
func (c *Client) GetTargetByID(ctx context.Context, id platform.ID) (*platform.ScraperTarget, error) {
	var target *platform.ScraperTarget
	viewErr := c.db.View(func(tx *bolt.Tx) (e error) {
		target, e = c.findTargetByID(ctx, tx, id)
		return e
	})
	return target, viewErr
}
// findTargetByID loads and decodes the target stored under id inside
// an already-open transaction. A missing key yields a not-found error.
func (c *Client) findTargetByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.ScraperTarget, error) {
	v := tx.Bucket(scraperBucket).Get(id)
	if len(v) == 0 {
		return nil, fmt.Errorf("scraper target is not found")
	}
	target := new(platform.ScraperTarget)
	if err := json.Unmarshal(v, target); err != nil {
		return nil, err
	}
	return target, nil
}
// putTarget JSON-encodes target and writes it into the scraper bucket
// under its ID, within an already-open write transaction.
func (c *Client) putTarget(ctx context.Context, tx *bolt.Tx, target *platform.ScraperTarget) error {
	data, err := json.Marshal(target)
	if err != nil {
		return err
	}
	return tx.Bucket(scraperBucket).Put(target.ID, data)
}
// PutTarget will put a scraper target without setting an ID.
// The caller supplies target.ID (used e.g. by tests to seed fixtures);
// contrast with AddTarget, which generates the ID.
func (c *Client) PutTarget(ctx context.Context, target *platform.ScraperTarget) error {
	return c.db.Update(func(tx *bolt.Tx) error {
		return c.putTarget(ctx, tx, target)
	})
}

51
bolt/scraper_test.go Normal file
View File

@ -0,0 +1,51 @@
package bolt_test
import (
"context"
"testing"
"github.com/influxdata/platform"
platformtesting "github.com/influxdata/platform/testing"
)
// initScraperTargetStoreService seeds a bolt-backed store with
// f.Targets and returns it together with a cleanup function that
// removes the seeded targets and closes the client.
func initScraperTargetStoreService(f platformtesting.TargetFields, t *testing.T) (platform.ScraperTargetStoreService, func()) {
	c, closeFn, err := NewTestClient()
	if err != nil {
		t.Fatalf("failed to create new bolt client: %v", err)
	}
	c.IDGenerator = f.IDGenerator
	ctx := context.TODO()
	for _, target := range f.Targets {
		// Shadow the loop variable before taking its address so each
		// iteration passes a distinct copy (pre-Go 1.22 semantics).
		target := target
		if err := c.PutTarget(ctx, &target); err != nil {
			// Original message said "failed to populate users" and
			// dropped the error; report targets and the cause.
			t.Fatalf("failed to populate scraper targets: %v", err)
		}
	}
	return c, func() {
		defer closeFn()
		for _, target := range f.Targets {
			if err := c.RemoveTarget(ctx, target.ID); err != nil {
				t.Logf("failed to remove targets: %v", err)
			}
		}
	}
}
// TestScraperTargetStoreService_AddTarget runs the shared
// platformtesting AddTarget suite against the bolt implementation.
func TestScraperTargetStoreService_AddTarget(t *testing.T) {
	platformtesting.AddTarget(initScraperTargetStoreService, t)
}

// TestScraperTargetStoreService_ListTargets runs the shared
// ListTargets suite against the bolt implementation.
func TestScraperTargetStoreService_ListTargets(t *testing.T) {
	platformtesting.ListTargets(initScraperTargetStoreService, t)
}

// TestScraperTargetStoreService_RemoveTarget runs the shared
// RemoveTarget suite against the bolt implementation.
func TestScraperTargetStoreService_RemoveTarget(t *testing.T) {
	platformtesting.RemoveTarget(initScraperTargetStoreService, t)
}

// TestScraperTargetStoreService_UpdateTarget runs the shared
// UpdateTarget suite against the bolt implementation.
func TestScraperTargetStoreService_UpdateTarget(t *testing.T) {
	platformtesting.UpdateTarget(initScraperTargetStoreService, t)
}

// TestScraperTargetStoreService_GetTargetByID runs the shared
// GetTargetByID suite against the bolt implementation.
func TestScraperTargetStoreService_GetTargetByID(t *testing.T) {
	platformtesting.GetTargetByID(initScraperTargetStoreService, t)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/influxdata/platform"
"github.com/influxdata/platform/bolt"
"github.com/influxdata/platform/chronograf/server"
"github.com/influxdata/platform/gather"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/nats"
@ -45,8 +46,8 @@ func main() {
}
const (
// NatsSubject is the subject that subscribers and publishers use for writing and consuming line protocol
NatsSubject = "ingress"
// IngressSubject is the subject that subscribers and publishers use for writing and consuming line protocol
IngressSubject = "ingress"
// IngressGroup is the Nats Streaming Subscriber group, allowing multiple subscribers to distribute work
IngressGroup = "ingress"
)
@ -119,6 +120,7 @@ var platformCmd = &cobra.Command{
}
func platformF(cmd *cobra.Command, args []string) {
ctx := context.Background()
// Create top level logger
logger := influxlogger.New(os.Stdout)
@ -129,7 +131,7 @@ func platformF(cmd *cobra.Command, args []string) {
c := bolt.NewClient()
c.Path = boltPath
if err := c.Open(context.TODO()); err != nil {
if err := c.Open(ctx); err != nil {
logger.Error("failed opening bolt", zap.Error(err))
os.Exit(1)
}
@ -208,7 +210,9 @@ func platformF(cmd *cobra.Command, args []string) {
// see issue #563
}
chronografSvc, err := server.NewServiceV2(context.TODO(), c.DB())
var scraperTargetSvc platform.ScraperTargetStoreService = c
chronografSvc, err := server.NewServiceV2(ctx, c.DB())
if err != nil {
logger.Error("failed creating chronograf service", zap.Error(err))
os.Exit(1)
@ -239,11 +243,20 @@ func platformF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
if err := subscriber.Subscribe(NatsSubject, IngressGroup, &nats.LogHandler{Logger: logger}); err != nil {
if err := subscriber.Subscribe(IngressSubject, IngressGroup, &nats.LogHandler{Logger: logger}); err != nil {
logger.Error("failed to create nats subscriber", zap.Error(err))
os.Exit(1)
}
scraperScheduler, err := gather.NewScheduler(10, logger, scraperTargetSvc, publisher, subscriber, 0, 0)
if err != nil {
logger.Error("failed to create scraper subscriber", zap.Error(err))
os.Exit(1)
}
go func() {
errc <- scraperScheduler.Run(ctx)
}()
httpServer := &nethttp.Server{
Addr: httpBindAddress,
}
@ -285,7 +298,7 @@ func platformF(cmd *cobra.Command, args []string) {
taskHandler.TaskService = taskSvc
publishFn := func(r io.Reader) error {
return publisher.Publish(NatsSubject, r)
return publisher.Publish(IngressSubject, r)
}
writeHandler := http.NewWriteHandler(publishFn)
@ -335,7 +348,7 @@ func platformF(cmd *cobra.Command, args []string) {
logger.Fatal("unable to start platform", zap.Error(err))
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
httpServer.Shutdown(ctx)
}

View File

@ -1,114 +0,0 @@
package main
import (
"bytes"
"encoding/json"
"flag"
"log"
"strings"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/gather"
"github.com/influxdata/platform/nats"
)
const (
subjectParse = "sub_parse"
subjectStore = "sub_store"
)
func main() {
logger := new(log.Logger)
msg, influxURL := parseFlags()
publisher := initNats(logger, influxURL)
go scheduleGetMetrics(msg, publisher)
<-make(chan struct{})
}
func parseFlags() (msg gather.ScrapperRequest, influxURL []string) {
orgIDstr := flag.String("orgID", "", "the organization id")
bucketIDstr := flag.String("bucketID", "", "the bucket id")
hostStr := flag.String("pHost", "", "the promethus host")
influxStr := flag.String("influxURLs", "", "comma seperated urls")
flag.Parse()
orgID, err := platform.IDFromString(*orgIDstr)
if err != nil || orgID == nil || orgID.String() == "" {
log.Fatal("Invalid orgID")
}
bucketID, err := platform.IDFromString(*bucketIDstr)
if err != nil || bucketID == nil || bucketID.String() == "" {
log.Fatal("Invalid bucketID")
}
if *hostStr == "" {
log.Fatal("Invalid host")
}
pURL := *hostStr + "/metrics"
influxURL = strings.Split(*influxStr, ",")
if len(influxURL) == 0 {
influxURL = []string{
"http://localhost:8086",
}
}
msg = gather.ScrapperRequest{
HostURL: pURL,
OrgID: *orgID,
BucketID: *bucketID,
}
return msg, influxURL
}
func initNats(logger *log.Logger, influxURL []string) nats.Publisher {
server := nats.NewServer(nats.Config{
FilestoreDir: ".",
})
if err := server.Open(); err != nil {
log.Fatalf("nats server fatal err %v", err)
}
subscriber := nats.NewQueueSubscriber("nats-subscriber")
if err := subscriber.Open(); err != nil {
log.Fatalf("nats parse subscriber open issue %v", err)
}
publisher := nats.NewAsyncPublisher("nats-publisher")
if err := publisher.Open(); err != nil {
log.Fatalf("nats parse publisher open issue %v", err)
}
if err := subscriber.Subscribe(subjectParse, "", &gather.ScrapperHandler{
Scrapper: gather.NewPrometheusScrapper(
gather.NewNatsStorage(
gather.NewInfluxStorage(influxURL),
subjectStore,
logger,
publisher,
subscriber,
),
),
Logger: logger,
}); err != nil {
log.Fatalf("nats subscribe error")
}
return publisher
}
// scheduleGetMetrics will send the scraperRequest to publisher
// for every 2 second
func scheduleGetMetrics(msg gather.ScrapperRequest, publisher nats.Publisher) {
buf := new(bytes.Buffer)
b, _ := json.Marshal(msg)
buf.Write(b)
publisher.Publish(subjectParse, buf)
time.Sleep(2 * time.Second)
scheduleGetMetrics(msg, publisher)
}

50
gather/handler.go Normal file
View File

@ -0,0 +1,50 @@
package gather
import (
"bytes"
"context"
"encoding/json"
"github.com/influxdata/platform"
"github.com/influxdata/platform/nats"
"go.uber.org/zap"
)
// handler implements the nats Handler interface. It consumes scraper
// jobs from the target queue, gathers metrics via Scraper, and
// republishes the results through Publisher.
type handler struct {
	Scraper   Scraper
	Publisher nats.Publisher
	Logger    *zap.Logger
}
// Process consumes scraper target from scraper target queue,
// call the scraper to gather, and publish to metrics queue.
// Every exit path acknowledges the message; failures are logged and
// the message is dropped rather than redelivered.
func (h *handler) Process(s nats.Subscription, m nats.Message) {
	defer m.Ack()

	var req platform.ScraperTarget
	if err := json.Unmarshal(m.Data(), &req); err != nil {
		h.Logger.Error("unable to unmarshal json", zap.Error(err))
		return
	}

	ms, err := h.Scraper.Gather(context.TODO(), req)
	if err != nil {
		h.Logger.Error("unable to gather", zap.Error(err))
		return
	}

	// Encode the gathered metrics and hand them to the storage queue.
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(ms); err != nil {
		h.Logger.Error("unable to marshal json", zap.Error(err))
		return
	}
	if err := h.Publisher.Publish(MetricsSubject, &buf); err != nil {
		h.Logger.Error("unable to publish scraper metrics", zap.Error(err))
	}
}

View File

@ -1,12 +1,56 @@
package gather
import dto "github.com/prometheus/client_model/go"
import (
"github.com/gogo/protobuf/proto"
)
// Metrics is the default influx based metrics
// Metrics is the default influx based metrics.
type Metrics struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
Timestamp int64 `json:"timestamp"`
Type dto.MetricType `json:"type"`
Type MetricType `json:"type"`
}
// MetricType is prometheus metrics type.
type MetricType int

// The set of metric types. Values mirror the prometheus
// client_model MetricType enum ordering used by metricTypeValue below.
const (
	MetricTypeCounter MetricType = iota
	MetricTypeGauge
	MetricTypeSummary
	MetricTypeUntyped
	// MetricTypeHistogrm is kept for backward compatibility.
	//
	// Deprecated: use MetricTypeHistogram instead.
	MetricTypeHistogrm
	// MetricTypeHistogram is the correctly spelled alias of
	// MetricTypeHistogrm.
	MetricTypeHistogram MetricType = MetricTypeHistogrm
)

// metricTypeName maps a MetricType value to its wire name; indexed by
// the constant's integer value.
var metricTypeName = []string{
	"COUNTER",
	"GAUGE",
	"SUMMARY",
	"UNTYPED",
	"HISTOGRAM",
}

// metricTypeValue is the inverse of metricTypeName, used when
// decoding JSON enum values.
var metricTypeValue = map[string]int32{
	"COUNTER":   0,
	"GAUGE":     1,
	"SUMMARY":   2,
	"UNTYPED":   3,
	"HISTOGRAM": 4,
}
// String returns the string value of MetricType, or "UNKNOWN" for a
// value outside the defined range. The original indexed the name
// table unchecked and would panic on an out-of-range value.
func (x MetricType) String() string {
	if x < 0 || int(x) >= len(metricTypeName) {
		return "UNKNOWN"
	}
	return metricTypeName[x]
}
// UnmarshalJSON implements the unmarshaler interface.
// It delegates to proto.UnmarshalJSONEnum with the metricTypeValue
// table, then stores the decoded enum value into x.
func (x *MetricType) UnmarshalJSON(data []byte) error {
	value, err := proto.UnmarshalJSONEnum(metricTypeValue, data, "MetricType")
	if err != nil {
		return err
	}
	*x = MetricType(value)
	return nil
}

View File

@ -5,7 +5,6 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
dto "github.com/prometheus/client_model/go"
)
func TestMetrics(t *testing.T) {
@ -31,7 +30,7 @@ func TestMetrics(t *testing.T) {
"x": 12.3,
"y": "a long string",
},
Type: dto.MetricType_SUMMARY,
Type: MetricTypeSummary,
},
},
},
@ -49,7 +48,7 @@ func TestMetrics(t *testing.T) {
"x": 12.3,
"y": "a long string",
},
Type: dto.MetricType_SUMMARY,
Type: MetricTypeSummary,
},
{
@ -63,7 +62,7 @@ func TestMetrics(t *testing.T) {
"x": 12.5,
"y": "a long string2",
},
Type: dto.MetricType_GAUGE,
Type: MetricTypeGauge,
},
},
},

View File

@ -2,72 +2,45 @@ package gather
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"math"
"mime"
"net/http"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/nats"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
/*
1. Discoverer
2. Scheduler
3. Queue
4. Scrap
5. Storage
*/
// prometheusScraper handles parsing prometheus metrics.
// implements Scraper interfaces.
type prometheusScraper struct{}
// Scrapper gathers metrics from an url
type Scrapper interface {
Gather(ctx context.Context, orgID, BucketID platform.ID, url string) error
}
// NewPrometheusScrapper returns a new prometheusScraper
// to fetch metrics from prometheus /metrics
func NewPrometheusScrapper(s Storage) Scrapper {
return &prometheusScrapper{
Storage: s,
}
}
type prometheusScrapper struct {
Storage Storage
}
func (p *prometheusScrapper) Gather(
ctx context.Context,
orgID,
BucketID platform.ID,
url string,
) error {
resp, err := http.Get(url)
// Gather parse metrics from a scraper target url.
func (p *prometheusScraper) Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error) {
resp, err := http.Get(target.URL)
if err != nil {
return err
return ms, err
}
defer resp.Body.Close()
return p.parse(resp.Body, resp.Header)
}
func (p *prometheusScrapper) parse(r io.Reader, header http.Header) error {
func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, error) {
var parser expfmt.TextParser
now := time.Now()
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
return err
return nil, err
}
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
if mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
@ -76,61 +49,60 @@ func (p *prometheusScrapper) parse(r io.Reader, header http.Header) error {
if err == io.EOF {
break
}
return fmt.Errorf("reading metric family protocol buffer failed: %s", err)
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(r)
if err != nil {
return fmt.Errorf("reading text format failed: %s", err)
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}
ms := make([]Metrics, 0)
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
for name, family := range metricFamilies {
for _, m := range family.Metric {
// reading tags
tags := makeLabels(m)
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
switch family.GetType() {
case dto.MetricType_SUMMARY:
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
case dto.MetricType_HISTOGRAM:
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
} else {
default:
// standard metric
fields = getNameAndValue(m)
}
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
me := Metrics{
Timestamp: t.UnixNano(),
Tags: tags,
Fields: fields,
Name: metricName,
Type: mf.GetType(),
}
ms = append(ms, me)
if len(fields) == 0 {
continue
}
tm := now
if m.TimestampMs != nil && *m.TimestampMs > 0 {
tm = time.Unix(0, *m.TimestampMs*1000000)
}
me := Metrics{
Timestamp: tm.UnixNano(),
Tags: tags,
Fields: fields,
Name: name,
Type: MetricType(family.GetType()),
}
ms = append(ms, me)
}
}
fmt.Println(len(ms))
return p.Storage.Record(ms)
return ms, nil
}
// Get labels from metric
@ -142,7 +114,7 @@ func makeLabels(m *dto.Metric) map[string]string {
return result
}
// Get Buckets from histogram metric
// Get Buckets from histogram metric
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
@ -180,31 +152,3 @@ func makeQuantiles(m *dto.Metric) map[string]interface{} {
}
return fields
}
// ScrapperRequest is the parsing request submited to nats
type ScrapperRequest struct {
HostURL string `json:"host_url"`
OrgID platform.ID `json:"org"`
BucketID platform.ID `json:"bucket"`
}
// ScrapperHandler handles parsing subscription
type ScrapperHandler struct {
Scrapper Scrapper
Logger *log.Logger
}
// Process implents nats Handler interface
func (h *ScrapperHandler) Process(s nats.Subscription, m nats.Message) {
defer m.Ack()
msg := new(ScrapperRequest)
err := json.Unmarshal(m.Data(), msg)
if err != nil {
h.Logger.Printf("scrapper processing error %v\n", err)
return
}
err = h.Scrapper.Gather(context.Background(), msg.OrgID, msg.BucketID, msg.HostURL)
if err != nil {
h.Logger.Println(err.Error())
}
}

107
gather/scheduler.go Normal file
View File

@ -0,0 +1,107 @@
package gather
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/nats"
"go.uber.org/zap"
)
const (
	// MetricsSubject is the nats subject on which gathered metrics
	// are published for the storage handlers to consume.
	MetricsSubject = "metrics"
	// promTargetSubject is the nats subject carrying prometheus
	// scrape jobs to the handler workers.
	promTargetSubject = "promTarget"
)
// Scheduler is struct to run scrape jobs.
type Scheduler struct {
	// Targets is the store queried for the current scraper targets
	// on every scheduling tick.
	Targets platform.ScraperTargetStoreService
	// Interval is between each metrics gathering event.
	Interval time.Duration
	// Timeout is the maximum time duration allowed by each TCP request.
	Timeout time.Duration
	// Publisher will send the gather requests and gathered metrics to the queue.
	Publisher nats.Publisher
	// Logger receives scheduling and publishing errors.
	Logger *zap.Logger
}
// NewScheduler creates a new Scheduler and subscriptions for scraper jobs.
// Zero interval/timeout values fall back to 60s and 30s respectively.
// numScrapers worker subscriptions are registered on the prometheus
// job subject; the error from any failed subscription is returned.
func NewScheduler(
	numScrapers int,
	l *zap.Logger,
	targets platform.ScraperTargetStoreService,
	p nats.Publisher,
	s nats.Subscriber,
	interval time.Duration,
	timeout time.Duration,
) (*Scheduler, error) {
	const (
		defaultInterval = 60 * time.Second
		defaultTimeout  = 30 * time.Second
	)
	if interval == 0 {
		interval = defaultInterval
	}
	if timeout == 0 {
		timeout = defaultTimeout
	}

	sched := &Scheduler{
		Targets:   targets,
		Interval:  interval,
		Timeout:   timeout,
		Publisher: p,
		Logger:    l,
	}

	// Register the pool of scrape workers.
	for i := 0; i < numScrapers; i++ {
		worker := &handler{
			Scraper:   new(prometheusScraper),
			Publisher: p,
			Logger:    l,
		}
		if err := s.Subscribe(promTargetSubject, "", worker); err != nil {
			return nil, err
		}
	}
	return sched, nil
}
// Run will retrieve scraper targets from the target storage,
// and publish them to nats job queue for gather. It blocks until ctx
// is canceled, and returns nil on cancellation.
func (s *Scheduler) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(s.Interval):
			// Per-tick work lives in a helper so its deferred
			// cancel fires at the end of each tick. The original
			// deferred cancel inside this loop, which accumulated
			// one live timeout context per tick until Run returned.
			s.scrapeOnce(ctx)
		}
	}
}

// scrapeOnce lists all targets and publishes one scrape job per
// target, bounded by s.Timeout. Errors are logged, not returned, so a
// bad tick does not stop the scheduler.
func (s *Scheduler) scrapeOnce(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, s.Timeout)
	defer cancel()
	targets, err := s.Targets.ListTargets(ctx)
	if err != nil {
		s.Logger.Error("cannot list targets", zap.Error(err))
		return
	}
	for _, target := range targets {
		if err := requestScrape(target, s.Publisher); err != nil {
			s.Logger.Error("json encoding error", zap.Error(err))
		}
	}
}
// requestScrape JSON-encodes t and publishes it on the subject that
// matches its scraper type; unrecognized types produce an error.
func requestScrape(t platform.ScraperTarget, publisher nats.Publisher) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(t); err != nil {
		return err
	}
	if t.Type == platform.PrometheusScraperType {
		return publisher.Publish(promTargetSubject, &buf)
	}
	return fmt.Errorf("unsupported target scrape type: %s", t.Type)
}

109
gather/scheduler_test.go Normal file
View File

@ -0,0 +1,109 @@
// +build !race
package gather
import (
"context"
"io/ioutil"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/google/go-cmp/cmp"
influxlogger "github.com/influxdata/influxdb/logger"
"github.com/influxdata/platform"
"github.com/influxdata/platform/nats"
"go.uber.org/zap"
)
// TestScheduler wires a real nats server, a mock target store, and a
// local HTTP endpoint together, lets the scheduler tick for ~80ms, and
// verifies the stored metrics match the sample response.
func TestScheduler(t *testing.T) {
	_, publisher, subscriber := newTestingNats(t)

	// Create top level logger
	logger := influxlogger.New(os.Stdout)
	ts := httptest.NewServer(&mockHTTPHandler{
		responseMap: map[string]string{
			"/metrics": sampleRespSmall,
		},
	})
	defer ts.Close()

	storage := &mockStorage{
		Metrics: make(map[int64]Metrics),
		Targets: []platform.ScraperTarget{
			{
				Type: platform.PrometheusScraperType,
				URL:  ts.URL + "/metrics",
			},
		},
	}
	subscriber.Subscribe(MetricsSubject, "", &StorageHandler{
		Logger:  logger,
		Storage: storage,
	})

	scheduler, err := NewScheduler(10, logger,
		storage, publisher, subscriber, time.Millisecond*25, time.Microsecond*15)
	// The original ignored this error and could have used a nil
	// scheduler below.
	if err != nil {
		t.Fatal(err)
	}

	// Cancel when the test finishes so Run returns and the goroutine
	// does not leak past the test (the original used context.TODO()
	// and never stopped it).
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		// t.Fatal must not be called from a non-test goroutine;
		// t.Error is the safe way to report from here.
		if err := scheduler.Run(ctx); err != nil {
			t.Error(err)
		}
	}()

	// let scheduler run for 80 milliseconds.
	<-time.After(time.Millisecond * 80)
	want := Metrics{
		Name: "go_goroutines",
		Type: MetricTypeGauge,
		Tags: map[string]string{},
		Fields: map[string]interface{}{
			"gauge": float64(36),
		},
	}
	if len(storage.Metrics) < 3 {
		t.Fatalf("no metrics stored, len %d", len(storage.Metrics))
	}
	for _, v := range storage.Metrics {
		if diff := cmp.Diff(v, want, metricsCmpOption); diff != "" {
			t.Fatalf("scraper parse metrics want %v, got %v", want, v)
		}
	}
}
const sampleRespSmall = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 36
`
// newTestingNats boots an in-process nats streaming server backed by a
// fresh temp directory and returns it together with an opened
// publisher and queue subscriber. Any setup failure aborts the test.
// NOTE(review): the temp directory and server are not cleaned up here;
// callers/process exit are relied on — consider adding cleanup.
func newTestingNats(t *testing.T) (
	server *nats.Server,
	publisher *nats.AsyncPublisher,
	subscriber *nats.QueueSubscriber,
) {
	dir, err := ioutil.TempDir("", "influxdata-platform-nats-")
	if err != nil {
		t.Fatal("unable to open temporary nats folder")
	}

	// NATS streaming server
	server = nats.NewServer(nats.Config{FilestoreDir: dir})
	if err := server.Open(); err != nil {
		t.Fatal("failed to start nats streaming server ", dir, zap.Error(err))
	}

	publisher = nats.NewAsyncPublisher("nats-testing-publisher")
	if err = publisher.Open(); err != nil {
		t.Fatal("failed to connect to streaming server", zap.Error(err))
	}

	subscriber = nats.NewQueueSubscriber("nats-testing-subscriber")
	if err := subscriber.Open(); err != nil {
		t.Fatal("failed to connect to streaming server", zap.Error(err))
	}
	return server, publisher, subscriber
}

12
gather/scraper.go Normal file
View File

@ -0,0 +1,12 @@
package gather
import (
"context"
"github.com/influxdata/platform"
)
// Scraper gathers metrics from a scraper target.
type Scraper interface {
	// Gather fetches the target's endpoint and returns the parsed
	// metrics, or an error if the fetch or parse fails.
	Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error)
}

View File

@ -9,55 +9,9 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform"
dto "github.com/prometheus/client_model/go"
)
type mockStorage struct {
Metrics map[int64]Metrics
}
func (s *mockStorage) Record(ms []Metrics) error {
for _, m := range ms {
s.Metrics[m.Timestamp] = m
}
return nil
}
type mockHTTPHandler struct {
unauthorized bool
noContent bool
responseMap map[string]string
}
func (h mockHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.unauthorized {
w.WriteHeader(http.StatusUnauthorized)
return
}
if h.noContent {
w.WriteHeader(http.StatusNoContent)
return
}
s, ok := h.responseMap[r.URL.Path]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
w.Write([]byte(s))
}
func TestParse(t *testing.T) {
option := cmp.Options{
cmp.Comparer(func(x, y Metrics) bool {
return x.Name == y.Name &&
x.Type == y.Type &&
reflect.DeepEqual(x.Tags, y.Tags) &&
reflect.DeepEqual(x.Fields, y.Fields)
}),
}
orgID, _ := platform.IDFromString("020f755c3c082000")
bucketID, _ := platform.IDFromString("020f755c3c082001")
func TestPrometheusScraper(t *testing.T) {
cases := []struct {
name string
ms []Metrics
@ -87,7 +41,7 @@ func TestParse(t *testing.T) {
ms: []Metrics{
{
Name: "go_gc_duration_seconds",
Type: dto.MetricType_SUMMARY,
Type: MetricTypeSummary,
Fields: map[string]interface{}{
"count": float64(326),
"sum": 0.07497837,
@ -101,7 +55,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_goroutines",
Type: dto.MetricType_GAUGE,
Type: MetricTypeGauge,
Tags: map[string]string{},
Fields: map[string]interface{}{
"gauge": float64(36),
@ -109,7 +63,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_info",
Type: dto.MetricType_GAUGE,
Type: MetricTypeGauge,
Tags: map[string]string{
"version": "go1.10.3",
},
@ -119,7 +73,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_memstats_alloc_bytes",
Type: dto.MetricType_GAUGE,
Type: MetricTypeGauge,
Tags: map[string]string{},
Fields: map[string]interface{}{
"gauge": 2.0091312e+07,
@ -127,7 +81,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_memstats_alloc_bytes_total",
Type: dto.MetricType_COUNTER,
Type: MetricTypeCounter,
Fields: map[string]interface{}{
"counter": 4.183173328e+09,
},
@ -135,7 +89,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_memstats_buck_hash_sys_bytes",
Type: dto.MetricType_GAUGE,
Type: MetricTypeGauge,
Tags: map[string]string{},
Fields: map[string]interface{}{
"gauge": 1.533852e+06,
@ -143,7 +97,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_memstats_frees_total",
Type: dto.MetricType_COUNTER,
Type: MetricTypeCounter,
Tags: map[string]string{},
Fields: map[string]interface{}{
"counter": 1.8944339e+07,
@ -151,7 +105,7 @@ func TestParse(t *testing.T) {
},
{
Name: "go_memstats_gc_cpu_fraction",
Type: dto.MetricType_GAUGE,
Type: MetricTypeGauge,
Tags: map[string]string{},
Fields: map[string]interface{}{
"gauge": 1.972734963012756e-05,
@ -162,29 +116,30 @@ func TestParse(t *testing.T) {
},
}
for _, c := range cases {
storage := &mockStorage{
Metrics: make(map[int64]Metrics),
}
scrapper := NewPrometheusScrapper(storage)
scraper := new(prometheusScraper)
var url string
if c.handler != nil {
ts := httptest.NewServer(c.handler)
defer ts.Close()
url = ts.URL
}
err := scrapper.Gather(context.Background(), *orgID, *bucketID, url+"/metrics")
results, err := scraper.Gather(context.Background(), platform.ScraperTarget{
URL: url + "/metrics",
OrgName: "org1",
BucketName: "bucket1",
})
if err != nil && !c.hasErr {
t.Fatalf("scrapper parse err in testing %s: %v", c.name, err)
t.Fatalf("scraper parse err in testing %s: %v", c.name, err)
}
if len(c.ms) != len(storage.Metrics) {
t.Fatalf("scrapper parse metrics incorrect length, want %d, got %d",
len(c.ms), len(storage.Metrics))
if len(c.ms) != len(results) {
t.Fatalf("scraper parse metrics incorrect length, want %d, got %d",
len(c.ms), len(results))
}
for _, m := range storage.Metrics {
for _, m := range results {
for _, cm := range c.ms {
if m.Name == cm.Name {
if diff := cmp.Diff(m, cm, option); diff != "" {
t.Fatalf("scrapper parse metrics want %v, got %v", cm, m)
if diff := cmp.Diff(m, cm, metricsCmpOption); diff != "" {
t.Fatalf("scraper parse metrics want %v, got %v", cm, m)
}
}
}
@ -225,3 +180,101 @@ go_memstats_frees_total 1.8944339e+07
# TYPE go_memstats_gc_cpu_fraction gauge
go_memstats_gc_cpu_fraction 1.972734963012756e-05
`
// mockStorage implements the storage interface
// and platform.ScraperTargetStoreService interface for tests.
// Metrics records gathered metrics keyed by timestamp; Targets is the
// in-memory target list served to the scheduler.
type mockStorage struct {
	Metrics map[int64]Metrics
	Targets []platform.ScraperTarget
}
// Record stores every metric in s.Metrics keyed by its timestamp;
// metrics sharing a timestamp overwrite each other.
func (s *mockStorage) Record(ms []Metrics) error {
	for i := range ms {
		s.Metrics[ms[i].Timestamp] = ms[i]
	}
	return nil
}
// ListTargets returns the in-memory target list, lazily initializing
// it to an empty (non-nil) slice first.
func (s *mockStorage) ListTargets(ctx context.Context) (targets []platform.ScraperTarget, err error) {
	if s.Targets == nil {
		s.Targets = make([]platform.ScraperTarget, 0)
	}
	return s.Targets, nil
}
// AddTarget appends a copy of *t to the in-memory target list,
// lazily initializing the slice. Unlike the bolt store, it does not
// assign an ID.
func (s *mockStorage) AddTarget(ctx context.Context, t *platform.ScraperTarget) error {
	if s.Targets == nil {
		s.Targets = make([]platform.ScraperTarget, 0)
	}
	s.Targets = append(s.Targets, *t)
	return nil
}
// RemoveTarget deletes the first target whose ID matches id; absent
// ids and a nil target list are silent no-ops.
func (s *mockStorage) RemoveTarget(ctx context.Context, id platform.ID) error {
	if s.Targets == nil {
		return nil
	}
	for i := range s.Targets {
		if s.Targets[i].ID.String() == id.String() {
			s.Targets = append(s.Targets[:i], s.Targets[i+1:]...)
			break
		}
	}
	return nil
}
// GetTargetByID returns a pointer into s.Targets for the matching id.
// NOTE(review): returns (nil, nil) when the id is absent — callers
// must nil-check the target; consider returning a not-found error to
// match the bolt implementation.
func (s *mockStorage) GetTargetByID(ctx context.Context, id platform.ID) (target *platform.ScraperTarget, err error) {
	for k, v := range s.Targets {
		if v.ID.String() == id.String() {
			target = &s.Targets[k]
			break
		}
	}
	return target, err
}
// UpdateTarget replaces the stored target whose ID matches update.ID
// and returns update. The original compared v.ID.String() against
// string(update.ID) — two different encodings of the ID — so the
// match could silently never fire; every other method here compares
// canonical String() forms, and this now does too.
func (s *mockStorage) UpdateTarget(ctx context.Context, update *platform.ScraperTarget) (*platform.ScraperTarget, error) {
	for k, v := range s.Targets {
		if v.ID.String() == update.ID.String() {
			s.Targets[k] = *update
			break
		}
	}
	return update, nil
}
// mockHTTPHandler serves canned responses for tests: it can simulate
// 401, 204, 404, or serve a text-format metrics body per path.
type mockHTTPHandler struct {
	unauthorized bool
	noContent    bool
	responseMap  map[string]string
}

// ServeHTTP implements http.Handler. Flag checks take priority over
// the path lookup; unknown paths yield 404.
func (h mockHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch {
	case h.unauthorized:
		w.WriteHeader(http.StatusUnauthorized)
	case h.noContent:
		w.WriteHeader(http.StatusNoContent)
	default:
		body, ok := h.responseMap[r.URL.Path]
		if !ok {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
		w.Write([]byte(body))
	}
}
// metricsCmpOption compares Metrics by name, type, tags, and fields,
// deliberately ignoring Timestamp so gather time does not affect
// test equality.
var metricsCmpOption = cmp.Options{
	cmp.Comparer(func(x, y Metrics) bool {
		return x.Name == y.Name &&
			x.Type == y.Type &&
			reflect.DeepEqual(x.Tags, y.Tags) &&
			reflect.DeepEqual(x.Fields, y.Fields)
	}),
}

View File

@ -1,102 +1,36 @@
package gather
import (
"bytes"
"encoding/json"
"fmt"
"log"
"github.com/influxdata/platform/flux"
"github.com/influxdata/platform/nats"
"go.uber.org/zap"
)
// Storage stores the metrics of a time based
// Storage stores the metrics of a time based.
type Storage interface {
//Subscriber nats.Subscriber
Record([]Metrics) error
}
// NewNatsStorage use nats to publish each store request
// and the subscriber will use the embeded storage to do the store
// activity
func NewNatsStorage(
storage Storage,
subject string,
logger *log.Logger,
publisher nats.Publisher,
subscriber nats.Subscriber,
) Storage {
s := &natsStorage{
storage: storage,
subject: subject,
publisher: publisher,
subscriber: subscriber,
}
s.subscriber.Subscribe(s.subject, "", &storageHandler{
storage: storage,
logger: logger,
})
return s
// StorageHandler implements nats.Handler interface.
type StorageHandler struct {
Storage Storage
Logger *zap.Logger
}
type natsStorage struct {
storage Storage
subject string
subscriber nats.Subscriber
publisher nats.Publisher
}
func (s *natsStorage) Record(ms []Metrics) error {
buf := new(bytes.Buffer)
b, err := json.Marshal(ms)
if err != nil {
return fmt.Errorf("scrapper metrics serialization error: %v", err)
}
_, err = buf.Write(b)
if err != nil {
return fmt.Errorf("scrapper metrics buffer write error: %v", err)
}
if err = s.publisher.Publish(s.subject, buf); err != nil {
return fmt.Errorf("scrapper publisher publish error: %v", err)
}
return nil
}
type storageHandler struct {
storage Storage
logger *log.Logger
}
func (h *storageHandler) Process(s nats.Subscription, m nats.Message) {
// Process consumes job queue, and use storage to record.
func (h *StorageHandler) Process(s nats.Subscription, m nats.Message) {
defer m.Ack()
ms := make([]Metrics, 0)
err := json.Unmarshal(m.Data(), &ms)
if err != nil {
h.logger.Printf("storage handler process err: %v\n", err)
h.Logger.Error(fmt.Sprintf("storage handler process err: %v", err))
return
}
err = h.storage.Record(ms)
err = h.Storage.Record(ms)
if err != nil {
h.logger.Printf("storage handler store err: %v\n", err)
h.Logger.Error(fmt.Sprintf("storage handler store err: %v", err))
}
}
// NewInfluxStorage creates a Storage implementation that writes
// gathered metrics directly to influxdb.
func NewInfluxStorage(urls []string) Storage {
	s := &influxStorage{URLs: urls}
	return s
}
// influxStorage writes metrics to influxdb endpoints.
type influxStorage struct {
	// URLs are the influxdb endpoints to write to.
	URLs []string
	// Client is the flux client — not yet used by Record (see below).
	Client flux.Client
}
// Record is currently a stub: it only prints the configured URLs and
// the received metrics.
// TODO: actually write the metrics to influxdb via s.Client.
func (s *influxStorage) Record(ms []Metrics) error {
	// Replaced builtin println (implementation-specific, cannot format
	// slices) with fmt.Println.
	fmt.Println(s.URLs)
	for _, m := range ms {
		fmt.Println(m)
	}
	return nil
}

392
http/scraper_service.go Normal file
View File

@ -0,0 +1,392 @@
package http
import (
"bytes"
"context"
"encoding/json"
"net/http"
"path"
"github.com/influxdata/platform"
kerrors "github.com/influxdata/platform/kit/errors"
"github.com/julienschmidt/httprouter"
)
// ScraperHandler represents an HTTP API handler for scraper targets.
type ScraperHandler struct {
	*httprouter.Router
	// ScraperStorageService is the backing store; the caller must set it
	// before the handler serves requests.
	ScraperStorageService platform.ScraperTargetStoreService
}
const (
	// targetPath is the base route for scraper-target CRUD endpoints.
	targetPath = "/v2/scrapertargets"
)
// NewScraperHandler returns a new instance of ScraperHandler with all
// scraper-target routes registered.
func NewScraperHandler() *ScraperHandler {
	h := &ScraperHandler{Router: httprouter.New()}
	idPath := targetPath + "/:id"
	h.HandlerFunc("POST", targetPath, h.handlePostScraperTarget)
	h.HandlerFunc("GET", targetPath, h.handleGetScraperTargets)
	h.HandlerFunc("GET", idPath, h.handleGetScraperTarget)
	h.HandlerFunc("PATCH", idPath, h.handlePatchScraperTarget)
	h.HandlerFunc("DELETE", idPath, h.handleDeleteScraperTarget)
	return h
}
// handlePostScraperTarget is HTTP handler for the POST /v2/scrapertargets route.
// It decodes the request body, stores the new target, and replies with
// 201 Created plus the stored target.
func (h *ScraperHandler) handlePostScraperTarget(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	target, err := decodeScraperTargetAddRequest(ctx, r)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	if err = h.ScraperStorageService.AddTarget(ctx, target); err != nil {
		EncodeError(ctx, err, w)
		return
	}
	if err = encodeResponse(ctx, w, http.StatusCreated, newTargetResponse(*target)); err != nil {
		EncodeError(ctx, err, w)
	}
}
// handleDeleteScraperTarget is the HTTP handler for the DELETE /v2/scrapertargets/:id route.
// On success it replies with 202 Accepted and no body.
func (h *ScraperHandler) handleDeleteScraperTarget(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	id, err := decodeScraperTargetIDRequest(ctx, r)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	if err = h.ScraperStorageService.RemoveTarget(ctx, *id); err != nil {
		EncodeError(ctx, err, w)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}
// handlePatchScraperTarget is the HTTP handler for the PATCH /v2/scrapertargets/:id route.
// It applies the decoded update and replies with the post-update target.
func (h *ScraperHandler) handlePatchScraperTarget(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	upd, err := decodeScraperTargetUpdateRequest(ctx, r)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	updated, err := h.ScraperStorageService.UpdateTarget(ctx, upd)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	if err = encodeResponse(ctx, w, http.StatusOK, newTargetResponse(*updated)); err != nil {
		EncodeError(ctx, err, w)
	}
}
// handleGetScraperTarget is the HTTP handler for the GET /v2/scrapertargets/:id route.
func (h *ScraperHandler) handleGetScraperTarget(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	id, err := decodeScraperTargetIDRequest(ctx, r)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	found, err := h.ScraperStorageService.GetTargetByID(ctx, *id)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	if err = encodeResponse(ctx, w, http.StatusOK, newTargetResponse(*found)); err != nil {
		EncodeError(ctx, err, w)
	}
}
// handleGetScraperTargets is the HTTP handler for the GET /v2/scrapertargets route.
// It replies with the full list of stored scraper targets.
func (h *ScraperHandler) handleGetScraperTargets(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	list, err := h.ScraperStorageService.ListTargets(ctx)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	if err = encodeResponse(ctx, w, http.StatusOK, newListTargetsResponse(list)); err != nil {
		EncodeError(ctx, err, w)
	}
}
// decodeScraperTargetUpdateRequest parses the PATCH body into a
// ScraperTarget and stamps it with the :id path parameter.
func decodeScraperTargetUpdateRequest(ctx context.Context, r *http.Request) (
	*platform.ScraperTarget, error) {
	target := new(platform.ScraperTarget)
	if err := json.NewDecoder(r.Body).Decode(target); err != nil {
		return nil, err
	}
	id, err := decodeScraperTargetIDRequest(ctx, r)
	if err != nil {
		return nil, err
	}
	target.ID = *id
	return target, nil
}
// decodeScraperTargetAddRequest parses the POST body into a new
// ScraperTarget.
func decodeScraperTargetAddRequest(ctx context.Context, r *http.Request) (*platform.ScraperTarget, error) {
	target := new(platform.ScraperTarget)
	err := json.NewDecoder(r.Body).Decode(target)
	if err != nil {
		return nil, err
	}
	return target, nil
}
// decodeScraperTargetIDRequest extracts and decodes the :id path
// parameter from the request context.
func decodeScraperTargetIDRequest(ctx context.Context, r *http.Request) (*platform.ID, error) {
	raw := httprouter.ParamsFromContext(ctx).ByName("id")
	if raw == "" {
		return nil, kerrors.InvalidDataf("url missing id")
	}
	var id platform.ID
	if err := id.DecodeFromString(raw); err != nil {
		return nil, err
	}
	return &id, nil
}
// ScraperService connects to Influx via HTTP using tokens to manage scraper targets.
type ScraperService struct {
	// Addr is the base address of the platform HTTP API.
	Addr string
	// Token authenticates every request.
	Token string
	// InsecureSkipVerify disables TLS certificate verification.
	InsecureSkipVerify bool
}
// ListTargets returns a list of all scraper targets.
func (s *ScraperService) ListTargets(ctx context.Context) ([]platform.ScraperTarget, error) {
	url, err := newURL(s.Addr, targetPath)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("GET", url.String(), nil)
	if err != nil {
		return nil, err
	}
	SetToken(s.Token, req)
	hc := newClient(url.Scheme, s.InsecureSkipVerify)
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	// Close the body on every path so the transport can reuse the
	// connection (it was never closed before).
	defer resp.Body.Close()
	if err := CheckError(resp); err != nil {
		return nil, err
	}
	var targetsResp getTargetsResponse
	if err := json.NewDecoder(resp.Body).Decode(&targetsResp); err != nil {
		return nil, err
	}
	// Unwrap the link-bearing response items into plain targets.
	targets := make([]platform.ScraperTarget, len(targetsResp.Targets))
	for i, item := range targetsResp.Targets {
		targets[i] = item.ScraperTarget
	}
	return targets, nil
}
// UpdateTarget updates a single scraper target with changeset.
// Returns the new target state after update.
func (s *ScraperService) UpdateTarget(ctx context.Context, update *platform.ScraperTarget) (*platform.ScraperTarget, error) {
	url, err := newURL(s.Addr, targetIDPath(update.ID))
	if err != nil {
		return nil, err
	}
	octets, err := json.Marshal(update)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("PATCH", url.String(), bytes.NewReader(octets))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	SetToken(s.Token, req)
	hc := newClient(url.Scheme, s.InsecureSkipVerify)
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	// Defer the close immediately after Do: the old placement (after the
	// decode) leaked the body on the CheckError/decode error paths.
	defer resp.Body.Close()
	if err := CheckError(resp); err != nil {
		return nil, err
	}
	var targetResp targetResponse
	if err := json.NewDecoder(resp.Body).Decode(&targetResp); err != nil {
		return nil, err
	}
	return &targetResp.ScraperTarget, nil
}
// AddTarget creates a new scraper target.
// NOTE(review): target is passed by value, so the server-assigned ID
// decoded below never reaches the caller, and this differs from the
// pointer-taking platform.ScraperTargetStoreService interface — confirm
// the intended signature.
func (s *ScraperService) AddTarget(ctx context.Context, target platform.ScraperTarget) error {
	url, err := newURL(s.Addr, targetPath)
	if err != nil {
		return err
	}
	octets, err := json.Marshal(target)
	if err != nil {
		return err
	}
	req, err := http.NewRequest("POST", url.String(), bytes.NewReader(octets))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	SetToken(s.Token, req)
	hc := newClient(url.Scheme, s.InsecureSkipVerify)
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	// Body was never closed before; close it on every remaining path.
	defer resp.Body.Close()
	// TODO(jsternberg): Should this check for a 201 explicitly?
	if err := CheckError(resp); err != nil {
		return err
	}
	targetResp := new(targetResponse)
	if err := json.NewDecoder(resp.Body).Decode(targetResp); err != nil {
		return err
	}
	return nil
}
// RemoveTarget removes a scraper target by ID.
func (s *ScraperService) RemoveTarget(ctx context.Context, id platform.ID) error {
	url, err := newURL(s.Addr, targetIDPath(id))
	if err != nil {
		return err
	}
	req, err := http.NewRequest("DELETE", url.String(), nil)
	if err != nil {
		return err
	}
	SetToken(s.Token, req)
	hc := newClient(url.Scheme, s.InsecureSkipVerify)
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	// Body was never closed before, preventing connection reuse.
	defer resp.Body.Close()
	return CheckError(resp)
}
// GetTargetByID returns a single target by ID.
func (s *ScraperService) GetTargetByID(ctx context.Context, id platform.ID) (*platform.ScraperTarget, error) {
	url, err := newURL(s.Addr, targetIDPath(id))
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("GET", url.String(), nil)
	if err != nil {
		return nil, err
	}
	SetToken(s.Token, req)
	hc := newClient(url.Scheme, s.InsecureSkipVerify)
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	// Defer the close immediately after Do: the old placement (after the
	// decode) leaked the body on the CheckError/decode error paths.
	defer resp.Body.Close()
	if err := CheckError(resp); err != nil {
		return nil, err
	}
	var targetResp targetResponse
	if err := json.NewDecoder(resp.Body).Decode(&targetResp); err != nil {
		return nil, err
	}
	return &targetResp.ScraperTarget, nil
}
// targetIDPath builds the route for a single scraper target,
// e.g. /v2/scrapertargets/<id>.
func targetIDPath(id platform.ID) string {
	return path.Join(targetPath, id.String())
}
// getTargetsLinks holds hypermedia links for the list response.
type getTargetsLinks struct {
	Self string `json:"self"`
}

// getTargetsResponse is the JSON body for GET /v2/scrapertargets.
type getTargetsResponse struct {
	Links   getTargetsLinks  `json:"links"`
	Targets []targetResponse `json:"scraper_targets"`
}

// targetLinks holds hypermedia links for a single target.
type targetLinks struct {
	Self string `json:"self"`
}

// targetResponse is a ScraperTarget decorated with its self link.
type targetResponse struct {
	platform.ScraperTarget
	Links targetLinks `json:"links"`
}
// newListTargetsResponse wraps targets in a links-bearing list payload.
func newListTargetsResponse(targets []platform.ScraperTarget) getTargetsResponse {
	items := make([]targetResponse, 0, len(targets))
	for _, t := range targets {
		items = append(items, newTargetResponse(t))
	}
	return getTargetsResponse{
		Links:   getTargetsLinks{Self: targetPath},
		Targets: items,
	}
}
// newTargetResponse wraps a single target with its self link.
func newTargetResponse(target platform.ScraperTarget) targetResponse {
	res := targetResponse{ScraperTarget: target}
	res.Links.Self = targetIDPath(target.ID)
	return res
}

98
inmem/scraper.go Normal file
View File

@ -0,0 +1,98 @@
package inmem
import (
"context"
"errors"
"fmt"
"github.com/influxdata/platform"
)
var (
	// errScraperTargetNotFound is returned when a target id is not in
	// the in-memory store.
	errScraperTargetNotFound = fmt.Errorf("scraper target is not found")
)

// Service must satisfy the scraper-target store interface.
var _ platform.ScraperTargetStoreService = (*Service)(nil)
// loadScraperTarget fetches a single target from the in-memory KV map,
// returning errScraperTargetNotFound when the id is absent.
func (s *Service) loadScraperTarget(id platform.ID) (*platform.ScraperTarget, error) {
	v, ok := s.scraperTargetKV.Load(id.String())
	if !ok {
		return nil, errScraperTargetNotFound
	}
	target, ok := v.(platform.ScraperTarget)
	if !ok {
		return nil, fmt.Errorf("type %T is not a scraper target", v)
	}
	return &target, nil
}
// forEachScraperTarget invokes fn for every stored target until fn
// returns false or a malformed entry is encountered.
func (s *Service) forEachScraperTarget(ctx context.Context, fn func(b platform.ScraperTarget) bool) error {
	var iterErr error
	s.scraperTargetKV.Range(func(_, v interface{}) bool {
		target, ok := v.(platform.ScraperTarget)
		if !ok {
			iterErr = fmt.Errorf("type %T is not a scraper target", v)
			return false
		}
		return fn(target)
	})
	return iterErr
}
// ListTargets will list all scrape targets.
// Reuses the forEachScraperTarget helper instead of duplicating the
// Range/type-assert loop inline, keeping the two code paths consistent.
func (s *Service) ListTargets(ctx context.Context) (list []platform.ScraperTarget, err error) {
	list = make([]platform.ScraperTarget, 0)
	err = s.forEachScraperTarget(ctx, func(b platform.ScraperTarget) bool {
		list = append(list, b)
		return true
	})
	return list, err
}
// AddTarget assigns a freshly generated ID to target and stores it.
func (s *Service) AddTarget(ctx context.Context, target *platform.ScraperTarget) error {
	target.ID = s.IDGenerator.ID()
	return s.PutTarget(ctx, target)
}
// RemoveTarget deletes the target with the given id; it is an error if
// the id does not exist.
func (s *Service) RemoveTarget(ctx context.Context, id platform.ID) error {
	_, err := s.loadScraperTarget(id)
	if err != nil {
		return err
	}
	s.scraperTargetKV.Delete(id.String())
	return nil
}
// UpdateTarget replaces the stored target identified by update.ID with
// update, returning the stored value. Fails if the id is empty or the
// target does not exist.
func (s *Service) UpdateTarget(ctx context.Context, update *platform.ScraperTarget) (target *platform.ScraperTarget, err error) {
	if len(update.ID) == 0 {
		return nil, errors.New("update scraper: id is empty")
	}
	// Existence check only — the previous code loaded the stored target
	// and then immediately discarded it with `target = update`.
	if _, err = s.loadScraperTarget(update.ID); err != nil {
		return nil, err
	}
	if err = s.PutTarget(ctx, update); err != nil {
		return nil, err
	}
	return update, nil
}
// GetTargetByID retrieves a scraper target by id.
func (s *Service) GetTargetByID(ctx context.Context, id platform.ID) (*platform.ScraperTarget, error) {
	target, err := s.loadScraperTarget(id)
	return target, err
}
// PutTarget will put a scraper target without setting an ID.
// The stored value is a copy of *target keyed by its current ID.
func (s *Service) PutTarget(ctx context.Context, target *platform.ScraperTarget) error {
	s.scraperTargetKV.Store(target.ID.String(), *target)
	return nil
}

41
inmem/scraper_test.go Normal file
View File

@ -0,0 +1,41 @@
package inmem
import (
"context"
"testing"
"github.com/influxdata/platform"
platformtesting "github.com/influxdata/platform/testing"
)
// initScraperTargetStoreService seeds an in-memory Service with
// f.Targets and returns it along with a no-op cleanup func.
func initScraperTargetStoreService(f platformtesting.TargetFields, t *testing.T) (platform.ScraperTargetStoreService, func()) {
	s := NewService()
	s.IDGenerator = f.IDGenerator
	ctx := context.TODO()
	for _, target := range f.Targets {
		target := target // copy the loop variable before taking its address
		if err := s.PutTarget(ctx, &target); err != nil {
			// Include the underlying error (it was silently dropped before).
			t.Fatalf("failed to populate scraper targets: %v", err)
		}
	}
	return s, func() {}
}
// The tests below delegate to the shared conformance suite in
// platform/testing, running it against the in-memory implementation.

func TestScraperTargetStoreService_AddTarget(t *testing.T) {
	platformtesting.AddTarget(initScraperTargetStoreService, t)
}

func TestScraperTargetStoreService_ListTargets(t *testing.T) {
	platformtesting.ListTargets(initScraperTargetStoreService, t)
}

func TestScraperTargetStoreService_RemoveTarget(t *testing.T) {
	platformtesting.RemoveTarget(initScraperTargetStoreService, t)
}

func TestScraperTargetStoreService_UpdateTarget(t *testing.T) {
	platformtesting.UpdateTarget(initScraperTargetStoreService, t)
}

func TestScraperTargetStoreService_GetTargetByID(t *testing.T) {
	platformtesting.GetTargetByID(initScraperTargetStoreService, t)
}

View File

@ -18,6 +18,7 @@ type Service struct {
viewKV sync.Map
dbrpMappingKV sync.Map
userResourceMappingKV sync.Map
scraperTargetKV sync.Map
TokenGenerator platform.TokenGenerator
IDGenerator platform.IDGenerator

53
scraper.go Normal file
View File

@ -0,0 +1,53 @@
package platform
import (
"context"
)
// ScraperTarget is a target to scrape.
type ScraperTarget struct {
	ID   ID          `json:"id"`
	Name string      `json:"name"`
	Type ScraperType `json:"type"`
	// URL is the endpoint to scrape.
	URL string `json:"url"`
	// OrgName is the destination organization (JSON key "org").
	OrgName string `json:"org"`
	// BucketName is the destination bucket (JSON key "bucket").
	BucketName string `json:"bucket"`
}
// ScraperTargetStoreService defines the crud service for ScraperTarget.
type ScraperTargetStoreService interface {
	// ListTargets returns all stored scraper targets.
	ListTargets(ctx context.Context) ([]ScraperTarget, error)
	// AddTarget stores t, assigning it a new ID.
	AddTarget(ctx context.Context, t *ScraperTarget) error
	// GetTargetByID returns the target with the given id.
	GetTargetByID(ctx context.Context, id ID) (*ScraperTarget, error)
	// RemoveTarget deletes the target with the given id.
	RemoveTarget(ctx context.Context, id ID) error
	// UpdateTarget replaces the target identified by t.ID and returns
	// the post-update state.
	UpdateTarget(ctx context.Context, t *ScraperTarget) (*ScraperTarget, error)
}
// ScraperTargetFilter represents a set of filters that restrict the
// returned results. Nil fields mean "no restriction".
type ScraperTargetFilter struct {
	ID   *ID     `json:"id"`
	Name *string `json:"name"`
}
// ScraperType defines the scraper methods.
type ScraperType string

// Scraper types.
const (
	// PrometheusScraperType parses metrics from a prometheus endpoint.
	PrometheusScraperType = "prometheus"
)

// validScraperTypes records the set of supported scraper types.
// Fixed: prometheus was mapped to false, contradicting ValidScraperType.
var validScraperTypes = map[ScraperType]bool{
	PrometheusScraperType: true,
}

// ValidScraperType returns true if the type string is valid.
func ValidScraperType(s string) bool {
	return validScraperTypes[ScraperType(s)]
}

525
testing/scraper_target.go Normal file
View File

@ -0,0 +1,525 @@
package testing
import (
"bytes"
"context"
"fmt"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/pkg/errors"
)
const (
	// Fixed test IDs used by the fixtures below.
	targetOneID   = "020f755c3c082000"
	targetTwoID   = "020f755c3c082001"
	targetThreeID = "020f755c3c082002"
)

// TargetFields will include the IDGenerator, and targets
type TargetFields struct {
	IDGenerator platform.IDGenerator
	Targets     []platform.ScraperTarget
}

// targetCmpOptions compares targets byte-wise and order-insensitively:
// both sides are sorted by ID (descending string order) before diffing.
var targetCmpOptions = cmp.Options{
	cmp.Comparer(func(x, y []byte) bool {
		return bytes.Equal(x, y)
	}),
	cmp.Transformer("Sort", func(in []platform.ScraperTarget) []platform.ScraperTarget {
		out := append([]platform.ScraperTarget(nil), in...) // Copy input to avoid mutating it
		sort.Slice(out, func(i, j int) bool {
			return out[i].ID.String() > out[j].ID.String()
		})
		return out
	}),
}
// AddTarget testing.
// Verifies that AddTarget assigns the ID produced by the store's
// IDGenerator and that the resulting target set matches expectations,
// starting from both an empty and a pre-populated store.
func AddTarget(
	init func(TargetFields, *testing.T) (platform.ScraperTargetStoreService, func()),
	t *testing.T,
) {
	type args struct {
		target *platform.ScraperTarget
	}
	type wants struct {
		err     error
		targets []platform.ScraperTarget
	}
	tests := []struct {
		name   string
		fields TargetFields
		args   args
		wants  wants
	}{
		{
			name: "create targets with empty set",
			fields: TargetFields{
				IDGenerator: mock.NewIDGenerator(targetOneID, t),
				Targets:     []platform.ScraperTarget{},
			},
			args: args{
				target: &platform.ScraperTarget{
					Name:       "name1",
					Type:       platform.PrometheusScraperType,
					OrgName:    "org1",
					BucketName: "bucket1",
					URL:        "url1",
				},
			},
			wants: wants{
				targets: []platform.ScraperTarget{
					{
						Name:       "name1",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org1",
						BucketName: "bucket1",
						URL:        "url1",
						ID:         idFromString(t, targetOneID),
					},
				},
			},
		},
		{
			name: "basic create target",
			fields: TargetFields{
				IDGenerator: mock.NewIDGenerator(targetTwoID, t),
				Targets: []platform.ScraperTarget{
					{
						Name:       "name1",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org1",
						BucketName: "bucket1",
						URL:        "url1",
						ID:         idFromString(t, targetOneID),
					},
				},
			},
			args: args{
				target: &platform.ScraperTarget{
					Name:       "name2",
					Type:       platform.PrometheusScraperType,
					OrgName:    "org2",
					BucketName: "bucket2",
					URL:        "url2",
				},
			},
			wants: wants{
				targets: []platform.ScraperTarget{
					{
						Name:       "name1",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org1",
						BucketName: "bucket1",
						URL:        "url1",
						ID:         idFromString(t, targetOneID),
					},
					{
						Name:       "name2",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org2",
						BucketName: "bucket2",
						URL:        "url2",
						ID:         idFromString(t, targetTwoID),
					},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s, done := init(tt.fields, t)
			defer done()
			ctx := context.TODO()
			err := s.AddTarget(ctx, tt.args.target)
			if (err != nil) != (tt.wants.err != nil) {
				t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
			}
			if err != nil && tt.wants.err != nil {
				if err.Error() != tt.wants.err.Error() {
					t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
				}
			}
			// The deferred remove runs when this subtest closure returns,
			// i.e. after the List comparison below.
			defer s.RemoveTarget(ctx, tt.args.target.ID)
			targets, err := s.ListTargets(ctx)
			if err != nil {
				t.Fatalf("failed to retrieve scraper targets: %v", err)
			}
			if diff := cmp.Diff(targets, tt.wants.targets, targetCmpOptions...); diff != "" {
				t.Errorf("scraper targets are different -got/+want\ndiff %s", diff)
			}
		})
	}
}
// ListTargets testing.
// Verifies that every seeded target is returned (order-insensitive, see
// targetCmpOptions).
func ListTargets(
	init func(TargetFields, *testing.T) (platform.ScraperTargetStoreService, func()),
	t *testing.T,
) {
	type wants struct {
		targets []platform.ScraperTarget
		err     error
	}
	tests := []struct {
		name   string
		fields TargetFields
		wants  wants
	}{
		{
			name: "find all targets",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						Name:       "name1",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org1",
						BucketName: "bucket1",
						URL:        "url1",
						ID:         idFromString(t, targetOneID),
					},
					{
						Name:       "name2",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org2",
						BucketName: "bucket2",
						URL:        "url2",
						ID:         idFromString(t, targetTwoID),
					},
				},
			},
			wants: wants{
				targets: []platform.ScraperTarget{
					{
						Name:       "name1",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org1",
						BucketName: "bucket1",
						URL:        "url1",
						ID:         idFromString(t, targetOneID),
					},
					{
						Name:       "name2",
						Type:       platform.PrometheusScraperType,
						OrgName:    "org2",
						BucketName: "bucket2",
						URL:        "url2",
						ID:         idFromString(t, targetTwoID),
					},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s, done := init(tt.fields, t)
			defer done()
			ctx := context.TODO()
			targets, err := s.ListTargets(ctx)
			if (err != nil) != (tt.wants.err != nil) {
				t.Fatalf("expected errors to be equal '%v' got '%v'", tt.wants.err, err)
			}
			if err != nil && tt.wants.err != nil {
				if err.Error() != tt.wants.err.Error() {
					t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
				}
			}
			if diff := cmp.Diff(targets, tt.wants.targets, targetCmpOptions...); diff != "" {
				t.Errorf("targets are different -got/+want\ndiff %s", diff)
			}
		})
	}
}
// GetTargetByID testing.
// Verifies that lookup by id returns exactly the matching seeded target.
func GetTargetByID(
	init func(TargetFields, *testing.T) (platform.ScraperTargetStoreService, func()),
	t *testing.T,
) {
	type args struct {
		id platform.ID
	}
	type wants struct {
		err    error
		target *platform.ScraperTarget
	}
	tests := []struct {
		name   string
		fields TargetFields
		args   args
		wants  wants
	}{
		{
			name: "basic find target by id",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						ID:   idFromString(t, targetOneID),
						Name: "target1",
					},
					{
						ID:   idFromString(t, targetTwoID),
						Name: "target2",
					},
				},
			},
			args: args{
				id: idFromString(t, targetTwoID),
			},
			wants: wants{
				target: &platform.ScraperTarget{
					ID:   idFromString(t, targetTwoID),
					Name: "target2",
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s, done := init(tt.fields, t)
			defer done()
			ctx := context.TODO()
			target, err := s.GetTargetByID(ctx, tt.args.id)
			if (err != nil) != (tt.wants.err != nil) {
				t.Fatalf("expected errors to be equal '%v' got '%v'", tt.wants.err, err)
			}
			if err != nil && tt.wants.err != nil {
				if err.Error() != tt.wants.err.Error() {
					t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
				}
			}
			if diff := cmp.Diff(target, tt.wants.target, targetCmpOptions...); diff != "" {
				t.Errorf("target is different -got/+want\ndiff %s", diff)
			}
		})
	}
}
// RemoveTarget testing.
// Verifies removal of an existing id and the not-found error for a
// missing id, then checks the surviving target set.
func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetStoreService, func()),
	t *testing.T) {
	type args struct {
		ID platform.ID
	}
	type wants struct {
		err     error
		targets []platform.ScraperTarget
	}
	tests := []struct {
		name   string
		fields TargetFields
		args   args
		wants  wants
	}{
		{
			name: "delete targets using exist id",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						ID: idFromString(t, targetOneID),
					},
					{
						ID: idFromString(t, targetTwoID),
					},
				},
			},
			args: args{
				ID: idFromString(t, targetOneID),
			},
			wants: wants{
				targets: []platform.ScraperTarget{
					{
						ID: idFromString(t, targetTwoID),
					},
				},
			},
		},
		{
			name: "delete targets using id that does not exist",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						ID: idFromString(t, targetOneID),
					},
					{
						ID: idFromString(t, targetTwoID),
					},
				},
			},
			args: args{
				ID: idFromString(t, targetThreeID),
			},
			wants: wants{
				// Error message must match the store's not-found error.
				err: fmt.Errorf("scraper target is not found"),
				targets: []platform.ScraperTarget{
					{
						ID: idFromString(t, targetOneID),
					},
					{
						ID: idFromString(t, targetTwoID),
					},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s, done := init(tt.fields, t)
			defer done()
			ctx := context.TODO()
			err := s.RemoveTarget(ctx, tt.args.ID)
			if (err != nil) != (tt.wants.err != nil) {
				t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
			}
			if err != nil && tt.wants.err != nil {
				if err.Error() != tt.wants.err.Error() {
					t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
				}
			}
			targets, err := s.ListTargets(ctx)
			if err != nil {
				t.Fatalf("failed to retrieve targets: %v", err)
			}
			if diff := cmp.Diff(targets, tt.wants.targets, targetCmpOptions...); diff != "" {
				t.Errorf("targets are different -got/+want\ndiff %s", diff)
			}
		})
	}
}
// UpdateTarget testing.
// Covers three cases: empty id (rejected), non-existent id (not found),
// and a successful URL update.
func UpdateTarget(
	init func(TargetFields, *testing.T) (platform.ScraperTargetStoreService, func()),
	t *testing.T,
) {
	type args struct {
		url string
		id  platform.ID
	}
	type wants struct {
		err    error
		target *platform.ScraperTarget
	}
	tests := []struct {
		name   string
		fields TargetFields
		args   args
		wants  wants
	}{
		{
			name: "update url with blank id",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						ID:  idFromString(t, targetOneID),
						URL: "url1",
					},
					{
						ID:  idFromString(t, targetTwoID),
						URL: "url2",
					},
				},
			},
			args: args{
				url: "changed",
			},
			wants: wants{
				err: errors.New("update scraper: id is empty"),
			},
		},
		{
			name: "update url with non exist id",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						ID:  idFromString(t, targetOneID),
						URL: "url1",
					},
					{
						ID:  idFromString(t, targetTwoID),
						URL: "url2",
					},
				},
			},
			args: args{
				id:  idFromString(t, targetThreeID),
				url: "changed",
			},
			wants: wants{
				err: errors.New("scraper target is not found"),
			},
		},
		{
			name: "update url",
			fields: TargetFields{
				Targets: []platform.ScraperTarget{
					{
						ID:  idFromString(t, targetOneID),
						URL: "url1",
					},
					{
						ID:  idFromString(t, targetTwoID),
						URL: "url2",
					},
				},
			},
			args: args{
				id:  idFromString(t, targetOneID),
				url: "changed",
			},
			wants: wants{
				target: &platform.ScraperTarget{
					ID:  idFromString(t, targetOneID),
					URL: "changed",
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s, done := init(tt.fields, t)
			defer done()
			ctx := context.TODO()
			upd := &platform.ScraperTarget{
				ID:  tt.args.id,
				URL: tt.args.url,
			}
			target, err := s.UpdateTarget(ctx, upd)
			if (err != nil) != (tt.wants.err != nil) {
				t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
			}
			if err != nil && tt.wants.err != nil {
				if err.Error() != tt.wants.err.Error() {
					t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
				}
			}
			if diff := cmp.Diff(target, tt.wants.target, targetCmpOptions...); diff != "" {
				t.Errorf("scraper target is different -got/+want\ndiff %s", diff)
			}
		})
	}
}