add(gather): add writer interface

pull/10957/head
Kelvin Wang 2019-01-10 12:39:37 -05:00
parent da0f5d5ea6
commit ce6004243c
20 changed files with 713 additions and 241 deletions


@ -46,6 +46,20 @@ func (c *Client) ListTargets(ctx context.Context) (list []platform.ScraperTarget
// AddTarget adds a new scraper target into storage.
func (c *Client) AddTarget(ctx context.Context, target *platform.ScraperTarget) (err error) {
if !target.OrgID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "org id is invalid",
Op: OpPrefix + platform.OpAddTarget,
}
}
if !target.BucketID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "bucket id is invalid",
Op: OpPrefix + platform.OpAddTarget,
}
}
err = c.db.Update(func(tx *bolt.Tx) error {
target.ID = c.IDGenerator.ID()
return c.putTarget(ctx, tx, target)
@ -100,6 +114,12 @@ func (c *Client) UpdateTarget(ctx context.Context, update *platform.ScraperTarge
if pe != nil {
return pe
}
if !update.BucketID.Valid() {
update.BucketID = target.BucketID
}
if !update.OrgID.Valid() {
update.OrgID = target.OrgID
}
target = update
return c.putTarget(ctx, tx, target)
})
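The new validation means a target can no longer be created from name-based org/bucket references alone; both IDs must be set. A minimal sketch of the behavior (hypothetical wiring; assumes `c` is an opened `*bolt.Client` and `ctx` a `context.Context`):

```go
// Hypothetical usage sketch: OrgID and BucketID deliberately left unset.
err := c.AddTarget(ctx, &platform.ScraperTarget{
	Name: "name1",
	Type: platform.PrometheusScraperType,
	URL:  "url1",
})
// err is a *platform.Error with Code EInvalid and Msg "org id is invalid";
// with a valid OrgID but no BucketID, the Msg is "bucket id is invalid".
```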


@ -18,7 +18,7 @@ func initScraperTargetStoreService(f platformtesting.TargetFields, t *testing.T)
ctx := context.Background()
for _, target := range f.Targets {
if err := c.PutTarget(ctx, target); err != nil {
t.Fatalf("failed to populate users")
t.Fatalf("failed to populate targets: %v", err)
}
}
return c, bolt.OpPrefix, func() {


@ -375,6 +375,12 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return err
}
subscriber.Subscribe(gather.MetricsSubject, "", &gather.RecorderHandler{
Logger: m.logger,
Recorder: gather.PointWriter{
Writer: pointsWriter,
},
})
scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0)
if err != nil {
m.logger.Error("failed to create scraper scheduler", zap.Error(err))

gather/README.md Normal file

@ -0,0 +1,53 @@
# How to use this package
## Make sure NATS is running, with both the publisher and subscriber open
```go
// NATS streaming server
m.natsServer = nats.NewServer(nats.Config{FilestoreDir: m.natsPath})
if err := m.natsServer.Open(); err != nil {
m.logger.Error("failed to start nats streaming server", zap.Error(err))
return err
}
publisher := nats.NewAsyncPublisher("nats-publisher")
if err := publisher.Open(); err != nil {
m.logger.Error("failed to connect to streaming server", zap.Error(err))
return err
}
subscriber := nats.NewQueueSubscriber("nats-subscriber")
if err := subscriber.Open(); err != nil {
m.logger.Error("failed to connect to streaming server", zap.Error(err))
return err
}
```
## Make sure the ScraperTargetStoreService is accessible
```go
var scraperTargetSvc influxdb.ScraperTargetStoreService = m.boltClient
```
## Set up the recorder, and make sure the subscriber uses the correct recorder with the correct write service
```go
recorder := gather.ServiceWriter{
Timeout: time.Millisecond * 30,
Writer: writer,
}
subscriber.Subscribe(gather.MetricsSubject, "", &gather.RecorderHandler{
Logger: logger,
Recorder: recorder,
})
```
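## Or record straight into storage

`ServiceWriter` records through an `influxdb.WriteService`; when writing directly into the storage engine instead, use `PointWriter`, mirroring the launcher wiring (here `pointsWriter` stands for any `storage.PointsWriter` implementation):

```go
subscriber.Subscribe(gather.MetricsSubject, "", &gather.RecorderHandler{
	Logger: logger,
	Recorder: gather.PointWriter{
		Writer: pointsWriter, // any storage.PointsWriter
	},
})
```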
## Start the scheduler
```go
scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0)
if err != nil {
m.logger.Error("failed to create scraper scheduler", zap.Error(err))
return err
}
```


@ -5,7 +5,7 @@ import (
"context"
"encoding/json"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/nats"
"go.uber.org/zap"
)
@ -22,7 +22,7 @@ type handler struct {
func (h *handler) Process(s nats.Subscription, m nats.Message) {
defer m.Ack()
req := new(platform.ScraperTarget)
req := new(influxdb.ScraperTarget)
err := json.Unmarshal(m.Data(), req)
if err != nil {
h.Logger.Error("unable to unmarshal json", zap.Error(err))
@ -35,7 +35,7 @@ func (h *handler) Process(s nats.Subscription, m nats.Message) {
return
}
// send metrics to storage queue
// send metrics to recorder queue
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(ms); err != nil {
h.Logger.Error("unable to marshal json", zap.Error(err))
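Between the scrape handler and the recorder handler, metrics travel over NATS as a JSON-encoded `MetricsCollection`. A round-trip sketch of that encoding, with NATS itself left out (IDs reuse the scraper test fixtures):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"

	platform "github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/gather"
)

func main() {
	orgID, _ := platform.IDFromString("020f755c3c082000")
	bucketID, _ := platform.IDFromString("020f755c3c082001")
	collected := gather.MetricsCollection{
		OrgID:    *orgID,
		BucketID: *bucketID,
		MetricsSlice: gather.MetricsSlice{{
			Name:      "cpu_load_short",
			Fields:    map[string]interface{}{"value": 0.64},
			Timestamp: time.Unix(0, 1422568543702900257),
		}},
	}

	// What the scrape handler publishes on MetricsSubject...
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(collected); err != nil {
		panic(err)
	}

	// ...and what RecorderHandler.Process decodes before calling Record.
	var got gather.MetricsCollection
	if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
		panic(err)
	}
	fmt.Println(got.MetricsSlice[0].Name) // cpu_load_short
}
```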


@ -1,18 +1,76 @@
package gather
import (
"bytes"
"io"
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
)
// MetricsCollection is a collection of metrics, along with the org and bucket they belong to.
type MetricsCollection struct {
OrgID influxdb.ID `json:"orgID"`
BucketID influxdb.ID `json:"bucketID"`
MetricsSlice MetricsSlice `json:"metrics"`
}
// Metrics is the default influx-based metrics type.
type Metrics struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
Timestamp int64 `json:"timestamp"`
Timestamp time.Time `json:"timestamp"`
Type MetricType `json:"type"`
}
// MetricsSlice is a slice of Metrics
type MetricsSlice []Metrics
// Points converts the MetricsSlice to models.Points.
func (ms MetricsSlice) Points() (models.Points, error) {
ps := make([]models.Point, len(ms))
for mi, m := range ms {
point, err := models.NewPoint(m.Name, models.NewTags(m.Tags), m.Fields, m.Timestamp)
if err != nil {
return ps, err
}
if m.Type.Valid() {
point.AddTag("type", m.Type.String())
}
ps[mi] = point
}
return ps, nil
}
// Reader returns an io.Reader that enumerates the metrics.
// All metrics are allocated into the underlying buffer.
func (ms MetricsSlice) Reader() (io.Reader, error) {
buf := new(bytes.Buffer)
for mi, m := range ms {
point, err := models.NewPoint(m.Name, models.NewTags(m.Tags), m.Fields, m.Timestamp)
if err != nil {
return nil, err
}
if m.Type.Valid() {
point.AddTag("type", m.Type.String())
}
_, err = buf.WriteString(point.String())
if err != nil {
return nil, err
}
if mi < len(ms)-1 {
_, err = buf.WriteString("\n")
if err != nil {
return nil, err
}
}
}
return buf, nil
}
// MetricType is the prometheus metric type.
type MetricType int
@ -40,6 +98,11 @@ var metricTypeValue = map[string]int32{
"HISTOGRAM": 4,
}
// Valid returns whether the metrics type is valid.
func (x MetricType) Valid() bool {
return x >= MetricTypeCounter && x <= MetricTypeHistogrm
}
// String returns the string value of MetricType.
func (x MetricType) String() string {
return metricTypeName[x]
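Taken together, `Points()` feeds the storage write path and `Reader()` feeds the line-protocol path. A self-contained sketch, with values borrowed from the tests below:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"time"

	"github.com/influxdata/influxdb/gather"
)

func main() {
	ms := gather.MetricsSlice{{
		Name:      "cpu_load_short",
		Tags:      map[string]string{"region": "us-west"},
		Fields:    map[string]interface{}{"value": 0.64},
		Type:      -1, // invalid type, so no "type" tag is appended
		Timestamp: time.Unix(0, 1422568543702900257),
	}}

	pts, _ := ms.Points() // models.Points, for a storage.PointsWriter
	fmt.Println(len(pts)) // 1

	r, _ := ms.Reader() // line protocol, for an influxdb.WriteService
	b, _ := ioutil.ReadAll(r)
	fmt.Println(string(b))
	// cpu_load_short,region=us-west value=0.64 1422568543702900257
}
```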


@ -1,13 +1,114 @@
package gather
import (
"bytes"
"encoding/json"
"testing"
"time"
"github.com/google/go-cmp/cmp"
)
func TestMetrics(t *testing.T) {
func TestMetricsReader(t *testing.T) {
cases := []struct {
name string
ms MetricsSlice
wants string
or string
}{
{
name: "single value only",
ms: []Metrics{
{
Name: "cpu_load_short",
Fields: map[string]interface{}{
"value": 0.64,
},
Type: -1,
Timestamp: time.Unix(0, 1422568543702900257),
},
},
wants: "cpu_load_short value=0.64 1422568543702900257",
},
{
name: "single regular metrics",
ms: []Metrics{
{
Name: "cpu_load_short",
Tags: map[string]string{
"host": "server01",
"region": "us-west",
},
Fields: map[string]interface{}{
"value": 0.64,
},
Type: MetricTypeGauge,
Timestamp: time.Unix(0, 1422568543702900257),
},
},
wants: "cpu_load_short,host=server01,region=us-west,type=GAUGE value=0.64 1422568543702900257",
},
{
name: "multiple value only",
ms: []Metrics{
{
Name: "cpu_load_short",
Fields: map[string]interface{}{
"value": 0.64,
"region": "us-west",
},
Type: -1,
Timestamp: time.Unix(0, 1522568543702900257),
},
},
wants: `cpu_load_short region="us-west",value=0.64 1522568543702900257`,
},
{
name: "multiple metrics",
ms: []Metrics{
{
Name: "cpu_load_short",
Tags: map[string]string{
"region": "us-west",
},
Fields: map[string]interface{}{
"value": 0.64,
},
Type: -1,
Timestamp: time.Unix(0, 1422568543702900257),
},
{
Name: "cpu_load_short",
Tags: map[string]string{
"region": "us-east",
},
Fields: map[string]interface{}{
"value": 0.34,
},
Type: -1,
Timestamp: time.Unix(0, 1522568543702900257),
},
},
wants: "cpu_load_short,region=us-west value=0.64 1422568543702900257\ncpu_load_short,region=us-east value=0.34 1522568543702900257",
},
}
for _, c := range cases {
r, err := c.ms.Reader()
if err != nil {
t.Fatalf("failed to convert metrics to reader: %v", err)
}
buf := new(bytes.Buffer)
buf.ReadFrom(r)
if diff1 := cmp.Diff(c.wants, buf.String(), nil); diff1 != "" {
if diff2 := cmp.Diff(c.or, buf.String(), nil); diff2 != "" {
t.Fatalf("converted metrics are incorrect, diff %s", diff1)
}
}
}
}
func TestMetricsMarshal(t *testing.T) {
cases := []struct {
name string
ms []Metrics
@ -20,7 +121,7 @@ func TestMetrics(t *testing.T) {
name: "single",
ms: []Metrics{
{
Timestamp: 12345,
Timestamp: time.Unix(12345, 0),
Tags: map[string]string{
"b": "B",
"a": "A",
@ -38,7 +139,7 @@ func TestMetrics(t *testing.T) {
name: "multiple",
ms: []Metrics{
{
Timestamp: 12345,
Timestamp: time.Unix(12345, 0),
Tags: map[string]string{
"b": "B",
"a": "A",
@ -52,7 +153,7 @@ func TestMetrics(t *testing.T) {
},
{
Timestamp: 12345,
Timestamp: time.Unix(12345, 0),
Tags: map[string]string{
"b": "B2",
"a": "A2",


@ -9,7 +9,7 @@ import (
"net/http"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
@ -20,23 +20,23 @@ import (
type prometheusScraper struct{}
// Gather parses metrics from a scraper target URL.
func (p *prometheusScraper) Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error) {
func (p *prometheusScraper) Gather(ctx context.Context, target influxdb.ScraperTarget) (collected MetricsCollection, err error) {
resp, err := http.Get(target.URL)
if err != nil {
return ms, err
return collected, err
}
defer resp.Body.Close()
return p.parse(resp.Body, resp.Header)
return p.parse(resp.Body, resp.Header, target)
}
func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, error) {
func (p *prometheusScraper) parse(r io.Reader, header http.Header, target influxdb.ScraperTarget) (collected MetricsCollection, err error) {
var parser expfmt.TextParser
now := time.Now()
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
return nil, err
return collected, err
}
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
@ -49,14 +49,14 @@ func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, e
if err == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
return collected, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(r)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
return collected, fmt.Errorf("reading text format failed: %s", err)
}
}
ms := make([]Metrics, 0)
@ -91,7 +91,7 @@ func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, e
tm = time.Unix(0, *m.TimestampMs*1000000)
}
me := Metrics{
Timestamp: tm.UnixNano(),
Timestamp: tm,
Tags: tags,
Fields: fields,
Name: name,
@ -102,7 +102,13 @@ func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, e
}
return ms, nil
collected = MetricsCollection{
MetricsSlice: ms,
OrgID: target.OrgID,
BucketID: target.BucketID,
}
return collected, nil
}
// Get labels from metric

gather/recorder.go Normal file

@ -0,0 +1,82 @@
package gather
import (
"context"
"encoding/json"
"time"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/nats"
"github.com/influxdata/influxdb/storage"
"go.uber.org/zap"
)
// PointWriter will use the storage.PointsWriter interface to record metrics.
type PointWriter struct {
Writer storage.PointsWriter
}
// Record records the metrics and writes them using the storage.PointsWriter interface.
func (s PointWriter) Record(collected MetricsCollection) error {
ps, err := collected.MetricsSlice.Points()
if err != nil {
return err
}
ps, err = tsdb.ExplodePoints(collected.OrgID, collected.BucketID, ps)
if err != nil {
return err
}
return s.Writer.WritePoints(ps)
}
// ServiceWriter will use the influxdb.WriteService to record the metrics.
type ServiceWriter struct {
Writer influxdb.WriteService
Timeout time.Duration
}
// Record records the metrics and writes them using the write service.
func (s ServiceWriter) Record(collected MetricsCollection) error {
r, err := collected.MetricsSlice.Reader()
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), s.Timeout)
defer cancel()
return s.Writer.Write(ctx,
collected.OrgID,
collected.BucketID,
r,
)
}
// Recorder records collected metrics.
type Recorder interface {
//Subscriber nats.Subscriber
Record(collected MetricsCollection) error
}
// RecorderHandler implements nats.Handler interface.
type RecorderHandler struct {
Recorder Recorder
Logger *zap.Logger
}
// Process consumes the job queue and uses the recorder to record.
func (h *RecorderHandler) Process(s nats.Subscription, m nats.Message) {
defer m.Ack()
collected := new(MetricsCollection)
err := json.Unmarshal(m.Data(), &collected)
if err != nil {
h.Logger.Error("recorder handler error", zap.Error(err))
return
}
err = h.Recorder.Record(*collected)
if err != nil {
h.Logger.Error("recorder handler error", zap.Error(err))
}
}
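Any type with a `Record(MetricsCollection) error` method satisfies `Recorder`, so custom sinks plug straight into `RecorderHandler`. A hypothetical in-memory recorder, in the spirit of the `mockStorage` test double further below:

```go
package gatherexample

import (
	"sync"

	"github.com/influxdata/influxdb/gather"
)

// memRecorder is a hypothetical Recorder that just accumulates every
// MetricsCollection it is handed; useful as a test double.
type memRecorder struct {
	mu        sync.Mutex
	collected []gather.MetricsCollection
}

func (m *memRecorder) Record(c gather.MetricsCollection) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.collected = append(m.collected, c)
	return nil
}
```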


@ -7,7 +7,7 @@ import (
"fmt"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/nats"
"go.uber.org/zap"
)
@ -20,7 +20,7 @@ const (
// Scheduler is the struct that runs scrape jobs.
type Scheduler struct {
Targets platform.ScraperTargetStoreService
Targets influxdb.ScraperTargetStoreService
// Interval is the time between each metrics-gathering event.
Interval time.Duration
// Timeout is the maximum time duration allowed for each TCP request
@ -38,7 +38,7 @@ type Scheduler struct {
func NewScheduler(
numScrapers int,
l *zap.Logger,
targets platform.ScraperTargetStoreService,
targets influxdb.ScraperTargetStoreService,
p nats.Publisher,
s nats.Subscriber,
interval time.Duration,
@ -111,14 +111,14 @@ func (s *Scheduler) run(ctx context.Context) error {
}
}
func requestScrape(t platform.ScraperTarget, publisher nats.Publisher) error {
func requestScrape(t influxdb.ScraperTarget, publisher nats.Publisher) error {
buf := new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(t)
if err != nil {
return err
}
switch t.Type {
case platform.PrometheusScraperType:
case influxdb.PrometheusScraperType:
return publisher.Publish(promTargetSubject, buf)
}
return fmt.Errorf("unsupported target scrape type: %s", t.Type)


@ -8,10 +8,10 @@ import (
"time"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
influxlogger "github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/mock"
platformtesting "github.com/influxdata/influxdb/testing"
influxdbtesting "github.com/influxdata/influxdb/testing"
)
func TestScheduler(t *testing.T) {
@ -29,20 +29,22 @@ func TestScheduler(t *testing.T) {
defer cancel()
storage := &mockStorage{
Metrics: make(map[int64]Metrics),
Targets: []platform.ScraperTarget{
Metrics: make(map[time.Time]Metrics),
Targets: []influxdb.ScraperTarget{
{
ID: platformtesting.MustIDBase16("3a0d0a6365646120"),
Type: platform.PrometheusScraperType,
URL: ts.URL + "/metrics",
ID: influxdbtesting.MustIDBase16("3a0d0a6365646120"),
Type: influxdb.PrometheusScraperType,
URL: ts.URL + "/metrics",
OrgID: *orgID,
BucketID: *bucketID,
},
},
TotalGatherJobs: make(chan struct{}, totalGatherJobs),
}
subscriber.Subscribe(MetricsSubject, "", &StorageHandler{
Logger: logger,
Storage: storage,
subscriber.Subscribe(MetricsSubject, "", &RecorderHandler{
Logger: logger,
Recorder: storage,
})
scheduler, err := NewScheduler(10, logger,


@ -3,10 +3,10 @@ package gather
import (
"context"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
)
// Scraper gathers metrics from a scraper target.
type Scraper interface {
Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error)
Gather(ctx context.Context, target influxdb.ScraperTarget) (collected MetricsCollection, err error)
}


@ -7,9 +7,15 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
)
var (
orgID, _ = influxdb.IDFromString("020f755c3c082000")
bucketID, _ = influxdb.IDFromString("020f755c3c082001")
)
func TestPrometheusScraper(t *testing.T) {
@ -124,19 +130,19 @@ func TestPrometheusScraper(t *testing.T) {
defer ts.Close()
url = ts.URL
}
results, err := scraper.Gather(context.Background(), platform.ScraperTarget{
URL: url + "/metrics",
OrgName: "org1",
BucketName: "bucket1",
results, err := scraper.Gather(context.Background(), influxdb.ScraperTarget{
URL: url + "/metrics",
OrgID: *orgID,
BucketID: *bucketID,
})
if err != nil && !c.hasErr {
t.Fatalf("scraper parse err in testing %s: %v", c.name, err)
}
if len(c.ms) != len(results) {
if len(c.ms) != len(results.MetricsSlice) {
t.Fatalf("scraper parse metrics incorrect length, want %d, got %d",
len(c.ms), len(results))
len(c.ms), len(results.MetricsSlice))
}
for _, m := range results {
for _, m := range results.MetricsSlice {
for _, cm := range c.ms {
if m.Name == cm.Name {
if diff := cmp.Diff(m, cm, metricsCmpOption); diff != "" {
@ -182,47 +188,47 @@ go_memstats_frees_total 1.8944339e+07
go_memstats_gc_cpu_fraction 1.972734963012756e-05
`
// mockStorage implement storage interface
// and platform.ScraperTargetStoreService interface.
// mockStorage implements the Recorder interface
// and the influxdb.ScraperTargetStoreService interface.
type mockStorage struct {
sync.RWMutex
TotalGatherJobs chan struct{}
Metrics map[int64]Metrics
Targets []platform.ScraperTarget
Metrics map[time.Time]Metrics
Targets []influxdb.ScraperTarget
}
func (s *mockStorage) Record(ms []Metrics) error {
func (s *mockStorage) Record(collected MetricsCollection) error {
s.Lock()
defer s.Unlock()
for _, m := range ms {
for _, m := range collected.MetricsSlice {
s.Metrics[m.Timestamp] = m
}
s.TotalGatherJobs <- struct{}{}
return nil
}
func (s *mockStorage) ListTargets(ctx context.Context) (targets []platform.ScraperTarget, err error) {
func (s *mockStorage) ListTargets(ctx context.Context) (targets []influxdb.ScraperTarget, err error) {
s.RLock()
defer s.RUnlock()
if s.Targets == nil {
s.Lock()
s.Targets = make([]platform.ScraperTarget, 0)
s.Targets = make([]influxdb.ScraperTarget, 0)
s.Unlock()
}
return s.Targets, nil
}
func (s *mockStorage) AddTarget(ctx context.Context, t *platform.ScraperTarget) error {
func (s *mockStorage) AddTarget(ctx context.Context, t *influxdb.ScraperTarget) error {
s.Lock()
defer s.Unlock()
if s.Targets == nil {
s.Targets = make([]platform.ScraperTarget, 0)
s.Targets = make([]influxdb.ScraperTarget, 0)
}
s.Targets = append(s.Targets, *t)
return nil
}
func (s *mockStorage) RemoveTarget(ctx context.Context, id platform.ID) error {
func (s *mockStorage) RemoveTarget(ctx context.Context, id influxdb.ID) error {
s.Lock()
defer s.Unlock()
@ -238,7 +244,7 @@ func (s *mockStorage) RemoveTarget(ctx context.Context, id platform.ID) error {
return nil
}
func (s *mockStorage) GetTargetByID(ctx context.Context, id platform.ID) (target *platform.ScraperTarget, err error) {
func (s *mockStorage) GetTargetByID(ctx context.Context, id influxdb.ID) (target *influxdb.ScraperTarget, err error) {
s.RLock()
defer s.RUnlock()
@ -253,7 +259,7 @@ func (s *mockStorage) GetTargetByID(ctx context.Context, id platform.ID) (target
}
func (s *mockStorage) UpdateTarget(ctx context.Context, update *platform.ScraperTarget) (target *platform.ScraperTarget, err error) {
func (s *mockStorage) UpdateTarget(ctx context.Context, update *influxdb.ScraperTarget) (target *influxdb.ScraperTarget, err error) {
s.Lock()
defer s.Unlock()


@ -1,36 +0,0 @@
package gather
import (
"encoding/json"
"fmt"
"github.com/influxdata/influxdb/nats"
"go.uber.org/zap"
)
// Storage stores the metrics of a time based.
type Storage interface {
//Subscriber nats.Subscriber
Record([]Metrics) error
}
// StorageHandler implements nats.Handler interface.
type StorageHandler struct {
Storage Storage
Logger *zap.Logger
}
// Process consumes job queue, and use storage to record.
func (h *StorageHandler) Process(s nats.Subscription, m nats.Message) {
defer m.Ack()
ms := make([]Metrics, 0)
err := json.Unmarshal(m.Data(), &ms)
if err != nil {
h.Logger.Error(fmt.Sprintf("storage handler process err: %v", err))
return
}
err = h.Storage.Record(ms)
if err != nil {
h.Logger.Error(fmt.Sprintf("storage handler store err: %v", err))
}
}


@ -20,6 +20,7 @@ type APIHandler struct {
DashboardHandler *DashboardHandler
AssetHandler *AssetHandler
ChronografHandler *ChronografHandler
ScraperHandler *ScraperHandler
SourceHandler *SourceHandler
MacroHandler *MacroHandler
TaskHandler *TaskHandler
@ -101,6 +102,9 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
h.AuthorizationHandler.LookupService = b.LookupService
h.AuthorizationHandler.Logger = b.Logger.With(zap.String("handler", "auth"))
h.ScraperHandler = NewScraperHandler()
h.ScraperHandler.ScraperStorageService = b.ScraperTargetStoreService
h.SourceHandler = NewSourceHandler()
h.SourceHandler.SourceService = b.SourceService
h.SourceHandler.NewBucketService = b.NewBucketService
@ -159,10 +163,11 @@ var apiLinks = map[string]interface{}{
"spec": "/api/v2/query/spec",
"suggestions": "/api/v2/query/suggestions",
},
"setup": "/api/v2/setup",
"signin": "/api/v2/signin",
"signout": "/api/v2/signout",
"sources": "/api/v2/sources",
"setup": "/api/v2/setup",
"signin": "/api/v2/signin",
"signout": "/api/v2/signout",
"sources": "/api/v2/sources",
"scrapertargets": "/api/v2/scrapertargets",
"system": map[string]string{
"metrics": "/metrics",
"debug": "/debug/pprof",
@ -250,6 +255,11 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if strings.HasPrefix(r.URL.Path, "/api/v2/scrapertargets") {
h.ScraperHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/api/v2/tasks") {
h.TaskHandler.ServeHTTP(w, r)
return


@ -8,7 +8,6 @@ import (
"path"
platform "github.com/influxdata/influxdb"
kerrors "github.com/influxdata/influxdb/kit/errors"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
)
@ -159,7 +158,10 @@ func decodeScraperTargetIDRequest(ctx context.Context, r *http.Request) (*platfo
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
return nil, kerrors.InvalidDataf("url missing id")
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: "url missing id",
}
}
var i platform.ID
@ -270,6 +272,21 @@ func (s *ScraperService) AddTarget(ctx context.Context, target *platform.Scraper
return err
}
if !target.OrgID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "org id is invalid",
Op: s.OpPrefix + platform.OpAddTarget,
}
}
if !target.BucketID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "bucket id is invalid",
Op: s.OpPrefix + platform.OpAddTarget,
}
}
octets, err := json.Marshal(target)
if err != nil {
return err


@ -55,20 +55,20 @@ func TestService_handleGetScraperTargets(t *testing.T) {
ListTargetsF: func(ctx context.Context) ([]platform.ScraperTarget, error) {
return []platform.ScraperTarget{
{
ID: targetOneID,
Name: "target-1",
Type: platform.PrometheusScraperType,
URL: "www.one.url",
OrgName: "org-name",
BucketName: "bkt-one",
ID: targetOneID,
Name: "target-1",
Type: platform.PrometheusScraperType,
URL: "www.one.url",
OrgID: platformtesting.MustIDBase16("0000000000000211"),
BucketID: platformtesting.MustIDBase16("0000000000000212"),
},
{
ID: targetTwoID,
Name: "target-2",
Type: platform.PrometheusScraperType,
URL: "www.two.url",
OrgName: "org-name",
BucketName: "bkt-two",
ID: targetTwoID,
Name: "target-2",
Type: platform.PrometheusScraperType,
URL: "www.two.url",
OrgID: platformtesting.MustIDBase16("0000000000000211"),
BucketID: platformtesting.MustIDBase16("0000000000000212"),
},
}, nil
},
@ -88,8 +88,8 @@ func TestService_handleGetScraperTargets(t *testing.T) {
{
"id": "%s",
"name": "target-1",
"bucket": "bkt-one",
"org": "org-name",
"bucketID": "0000000000000212",
"orgID": "0000000000000211",
"type": "prometheus",
"url": "www.one.url",
"links": {
@ -99,8 +99,8 @@ func TestService_handleGetScraperTargets(t *testing.T) {
{
"id": "%s",
"name": "target-2",
"bucket": "bkt-two",
"org": "org-name",
"bucketID": "0000000000000212",
"orgID": "0000000000000211",
"type": "prometheus",
"url": "www.two.url",
"links": {
@ -204,12 +204,12 @@ func TestService_handleGetScraperTarget(t *testing.T) {
GetTargetByIDF: func(ctx context.Context, id platform.ID) (*platform.ScraperTarget, error) {
if id == targetOneID {
return &platform.ScraperTarget{
ID: targetOneID,
Name: "target-1",
Type: platform.PrometheusScraperType,
URL: "www.some.url",
OrgName: "org-name",
BucketName: "bkt-name",
ID: targetOneID,
Name: "target-1",
Type: platform.PrometheusScraperType,
URL: "www.some.url",
OrgID: platformtesting.MustIDBase16("0000000000000211"),
BucketID: platformtesting.MustIDBase16("0000000000000212"),
}, nil
}
return nil, fmt.Errorf("not found")
@ -229,8 +229,8 @@ func TestService_handleGetScraperTarget(t *testing.T) {
"name": "target-1",
"type": "prometheus",
"url": "www.some.url",
"bucket": "bkt-name",
"org": "org-name",
"bucketID": "0000000000000212",
"orgID": "0000000000000211",
"links": {
"self": "/api/v2/scrapertargets/%[1]s"
}
@ -413,11 +413,11 @@ func TestService_handlePostScraperTarget(t *testing.T) {
},
args: args{
target: &platform.ScraperTarget{
Name: "hello",
Type: platform.PrometheusScraperType,
BucketName: "bkt-name",
OrgName: "org-name",
URL: "www.some.url",
Name: "hello",
Type: platform.PrometheusScraperType,
BucketID: platformtesting.MustIDBase16("0000000000000212"),
OrgID: platformtesting.MustIDBase16("0000000000000211"),
URL: "www.some.url",
},
},
wants: wants{
@ -430,8 +430,8 @@ func TestService_handlePostScraperTarget(t *testing.T) {
"name": "hello",
"type": "prometheus",
"url": "www.some.url",
"org": "org-name",
"bucket": "bkt-name",
"orgID": "0000000000000211",
"bucketID": "0000000000000212",
"links": {
"self": "/api/v2/scrapertargets/%[1]s"
}
@ -513,12 +513,12 @@ func TestService_handlePatchScraperTarget(t *testing.T) {
args: args{
id: targetOneIDString,
update: &platform.ScraperTarget{
ID: targetOneID,
Name: "name",
BucketName: "buck",
Type: platform.PrometheusScraperType,
URL: "www.example.url",
OrgName: "orgg",
ID: targetOneID,
Name: "name",
BucketID: platformtesting.MustIDBase16("0000000000000212"),
Type: platform.PrometheusScraperType,
URL: "www.example.url",
OrgID: platformtesting.MustIDBase16("0000000000000211"),
},
},
wants: wants{
@ -531,8 +531,8 @@ func TestService_handlePatchScraperTarget(t *testing.T) {
"name":"name",
"type":"prometheus",
"url":"www.example.url",
"org":"orgg",
"bucket":"buck",
"orgID":"0000000000000211",
"bucketID":"0000000000000212",
"links":{
"self":"/api/v2/scrapertargets/%[1]s"
}
@ -557,12 +557,12 @@ func TestService_handlePatchScraperTarget(t *testing.T) {
args: args{
id: targetOneIDString,
update: &platform.ScraperTarget{
ID: targetOneID,
Name: "name",
BucketName: "buck",
Type: platform.PrometheusScraperType,
URL: "www.example.url",
OrgName: "orgg",
ID: targetOneID,
Name: "name",
BucketID: platformtesting.MustIDBase16("0000000000000212"),
Type: platform.PrometheusScraperType,
URL: "www.example.url",
OrgID: platformtesting.MustIDBase16("0000000000000211"),
},
},
wants: wants{


@ -53,6 +53,20 @@ func (s *Service) ListTargets(ctx context.Context) (list []platform.ScraperTarge
// AddTarget adds a new scraper target into storage.
func (s *Service) AddTarget(ctx context.Context, target *platform.ScraperTarget) (err error) {
if !target.OrgID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "org id is invalid",
Op: OpPrefix + platform.OpAddTarget,
}
}
if !target.BucketID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "bucket id is invalid",
Op: OpPrefix + platform.OpAddTarget,
}
}
target.ID = s.IDGenerator.ID()
if err := s.PutTarget(ctx, target); err != nil {
return &platform.Error{
Op: OpPrefix + platform.OpAddTarget,
@ -84,13 +98,19 @@ func (s *Service) UpdateTarget(ctx context.Context, update *platform.ScraperTarg
Msg: "id is invalid",
}
}
_, pe := s.loadScraperTarget(update.ID)
oldTarget, pe := s.loadScraperTarget(update.ID)
if pe != nil {
return nil, &platform.Error{
Op: op,
Err: pe,
}
}
if !update.OrgID.Valid() {
update.OrgID = oldTarget.OrgID
}
if !update.BucketID.Valid() {
update.BucketID = oldTarget.BucketID
}
if err = s.PutTarget(ctx, update); err != nil {
return nil, &platform.Error{
Op: op,
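The fallback above means a partial update that omits org and bucket keeps the stored IDs. A sketch (hypothetical names; assumes `svc` is an inmem `Service` already holding a target with ID `targetOneID` and valid OrgID/BucketID):

```go
updated, err := svc.UpdateTarget(ctx, &platform.ScraperTarget{
	ID:  targetOneID,
	URL: "changed",
	// OrgID and BucketID left zero: the stored values are kept.
})
// err is nil; updated.OrgID and updated.BucketID equal the stored values.
```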


@ -18,12 +18,12 @@ const (
// ScraperTarget is a target to scrape
type ScraperTarget struct {
ID ID `json:"id,omitempty"`
Name string `json:"name"`
Type ScraperType `json:"type"`
URL string `json:"url"`
OrgName string `json:"org"`
BucketName string `json:"bucket"`
ID ID `json:"id,omitempty"`
Name string `json:"name"`
Type ScraperType `json:"type"`
URL string `json:"url"`
OrgID ID `json:"orgID,omitempty"`
BucketID ID `json:"bucketID,omitempty"`
}
// ScraperTargetStoreService defines the CRUD service for ScraperTarget.
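With this change the wire format references org and bucket by ID rather than by name. A sketch of the resulting JSON, reusing the fixture IDs from the HTTP tests above (error handling elided):

```go
package main

import (
	"encoding/json"
	"fmt"

	platform "github.com/influxdata/influxdb"
)

func main() {
	orgID, _ := platform.IDFromString("0000000000000211")
	bucketID, _ := platform.IDFromString("0000000000000212")
	t := platform.ScraperTarget{
		Name:     "target-1",
		Type:     platform.PrometheusScraperType,
		URL:      "www.one.url",
		OrgID:    *orgID,
		BucketID: *bucketID,
	}
	b, _ := json.Marshal(t)
	fmt.Println(string(b))
	// {"name":"target-1","type":"prometheus","url":"www.one.url",
	//  "orgID":"0000000000000211","bucketID":"0000000000000212"}
}
```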


@ -99,22 +99,106 @@ func AddTarget(
},
args: args{
target: &platform.ScraperTarget{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgName: "org1",
BucketName: "bucket1",
URL: "url1",
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
},
},
wants: wants{
targets: []platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgName: "org1",
BucketName: "bucket1",
URL: "url1",
ID: MustIDBase16(targetOneID),
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
},
},
},
{
name: "create target with invalid org id",
fields: TargetFields{
IDGenerator: mock.NewIDGenerator(targetTwoID, t),
Targets: []*platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
},
},
args: args{
target: &platform.ScraperTarget{
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
BucketID: MustIDBase16(bucketTwoID),
URL: "url2",
},
},
wants: wants{
err: &platform.Error{
Code: platform.EInvalid,
Msg: "org id is invalid",
Op: platform.OpAddTarget,
},
targets: []platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
},
},
},
{
name: "create target with invalid bucket id",
fields: TargetFields{
IDGenerator: mock.NewIDGenerator(targetTwoID, t),
Targets: []*platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
},
},
args: args{
target: &platform.ScraperTarget{
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgTwoID),
URL: "url2",
},
},
wants: wants{
err: &platform.Error{
Code: platform.EInvalid,
Msg: "bucket id is invalid",
Op: platform.OpAddTarget,
},
targets: []platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
},
},
@ -125,42 +209,42 @@ func AddTarget(
IDGenerator: mock.NewIDGenerator(targetTwoID, t),
Targets: []*platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgName: "org1",
BucketName: "bucket1",
URL: "url1",
ID: MustIDBase16(targetOneID),
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
},
},
args: args{
target: &platform.ScraperTarget{
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
OrgName: "org2",
BucketName: "bucket2",
URL: "url2",
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgTwoID),
BucketID: MustIDBase16(bucketTwoID),
URL: "url2",
},
},
wants: wants{
targets: []platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgName: "org1",
BucketName: "bucket1",
URL: "url1",
ID: MustIDBase16(targetOneID),
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
{
Name: "name2",
Type: platform.PrometheusScraperType,
OrgName: "org2",
BucketName: "bucket2",
URL: "url2",
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgTwoID),
BucketID: MustIDBase16(bucketTwoID),
URL: "url2",
ID: MustIDBase16(targetTwoID),
},
},
},
@ -208,40 +292,40 @@ func ListTargets(
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgName: "org1",
BucketName: "bucket1",
URL: "url1",
ID: MustIDBase16(targetOneID),
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
{
Name: "name2",
Type: platform.PrometheusScraperType,
OrgName: "org2",
BucketName: "bucket2",
URL: "url2",
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgTwoID),
BucketID: MustIDBase16(bucketTwoID),
URL: "url2",
ID: MustIDBase16(targetTwoID),
},
},
},
wants: wants{
targets: []platform.ScraperTarget{
{
Name: "name1",
Type: platform.PrometheusScraperType,
OrgName: "org1",
BucketName: "bucket1",
URL: "url1",
ID: MustIDBase16(targetOneID),
Name: "name1",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
},
{
Name: "name2",
Type: platform.PrometheusScraperType,
OrgName: "org2",
BucketName: "bucket2",
URL: "url2",
ID: MustIDBase16(targetTwoID),
Name: "name2",
Type: platform.PrometheusScraperType,
OrgID: MustIDBase16(orgTwoID),
BucketID: MustIDBase16(bucketTwoID),
URL: "url2",
ID: MustIDBase16(targetTwoID),
},
},
},
@ -286,12 +370,16 @@ func GetTargetByID(
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
Name: "target1",
ID: MustIDBase16(targetOneID),
Name: "target1",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
Name: "target2",
ID: MustIDBase16(targetTwoID),
Name: "target2",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -300,8 +388,10 @@ func GetTargetByID(
},
wants: wants{
target: &platform.ScraperTarget{
ID: MustIDBase16(targetTwoID),
Name: "target2",
ID: MustIDBase16(targetTwoID),
Name: "target2",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -310,12 +400,16 @@ func GetTargetByID(
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
Name: "target1",
ID: MustIDBase16(targetOneID),
Name: "target1",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
Name: "target2",
ID: MustIDBase16(targetTwoID),
Name: "target2",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -369,10 +463,14 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
ID: MustIDBase16(targetOneID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
ID: MustIDBase16(targetTwoID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -382,7 +480,9 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto
wants: wants{
targets: []platform.ScraperTarget{
{
ID: MustIDBase16(targetTwoID),
ID: MustIDBase16(targetTwoID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -392,10 +492,14 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
ID: MustIDBase16(targetOneID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
ID: MustIDBase16(targetTwoID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -410,10 +514,14 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto
},
targets: []platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
ID: MustIDBase16(targetOneID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
ID: MustIDBase16(targetTwoID),
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -463,12 +571,16 @@ func UpdateTarget(
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
URL: "url1",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
URL: "url2",
ID: MustIDBase16(targetTwoID),
URL: "url2",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -488,12 +600,16 @@ func UpdateTarget(
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
URL: "url1",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
URL: "url2",
ID: MustIDBase16(targetTwoID),
URL: "url2",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -514,12 +630,16 @@ func UpdateTarget(
fields: TargetFields{
Targets: []*platform.ScraperTarget{
{
ID: MustIDBase16(targetOneID),
URL: "url1",
ID: MustIDBase16(targetOneID),
URL: "url1",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
{
ID: MustIDBase16(targetTwoID),
URL: "url2",
ID: MustIDBase16(targetTwoID),
URL: "url2",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},
@ -529,8 +649,10 @@ func UpdateTarget(
},
wants: wants{
target: &platform.ScraperTarget{
ID: MustIDBase16(targetOneID),
URL: "changed",
ID: MustIDBase16(targetOneID),
URL: "changed",
OrgID: MustIDBase16(orgOneID),
BucketID: MustIDBase16(bucketOneID),
},
},
},