diff --git a/bolt/scraper.go b/bolt/scraper.go index b04c8e0e45..f44d8c5632 100644 --- a/bolt/scraper.go +++ b/bolt/scraper.go @@ -46,6 +46,20 @@ func (c *Client) ListTargets(ctx context.Context) (list []platform.ScraperTarget // AddTarget add a new scraper target into storage. func (c *Client) AddTarget(ctx context.Context, target *platform.ScraperTarget) (err error) { + if !target.OrgID.Valid() { + return &platform.Error{ + Code: platform.EInvalid, + Msg: "org id is invalid", + Op: OpPrefix + platform.OpAddTarget, + } + } + if !target.BucketID.Valid() { + return &platform.Error{ + Code: platform.EInvalid, + Msg: "bucket id is invalid", + Op: OpPrefix + platform.OpAddTarget, + } + } err = c.db.Update(func(tx *bolt.Tx) error { target.ID = c.IDGenerator.ID() return c.putTarget(ctx, tx, target) @@ -100,6 +114,12 @@ func (c *Client) UpdateTarget(ctx context.Context, update *platform.ScraperTarge if pe != nil { return pe } + if !update.BucketID.Valid() { + update.BucketID = target.BucketID + } + if !update.OrgID.Valid() { + update.OrgID = target.OrgID + } target = update return c.putTarget(ctx, tx, target) }) diff --git a/bolt/scraper_test.go b/bolt/scraper_test.go index c7431602e9..7770199a64 100644 --- a/bolt/scraper_test.go +++ b/bolt/scraper_test.go @@ -18,7 +18,7 @@ func initScraperTargetStoreService(f platformtesting.TargetFields, t *testing.T) ctx := context.Background() for _, target := range f.Targets { if err := c.PutTarget(ctx, target); err != nil { - t.Fatalf("failed to populate users") + t.Fatalf("failed to populate targets: %v", err) } } return c, bolt.OpPrefix, func() { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 727dcd6a0c..4df034719f 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -375,6 +375,12 @@ func (m *Launcher) run(ctx context.Context) (err error) { return err } + subscriber.Subscribe(gather.MetricsSubject, "", &gather.RecorderHandler{ + Logger: m.logger, + Recorder: gather.PointWriter{ + Writer: pointsWriter, + }, + }) scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0) if err != nil { m.logger.Error("failed to create scraper subscriber", zap.Error(err)) diff --git a/gather/README.md b/gather/README.md new file mode 100644 index 0000000000..9f2f5376cc --- /dev/null +++ b/gather/README.md @@ -0,0 +1,53 @@ +# How to use this package + +## Make sure nats is running. Both publisher and subscriber are open + +```go +// NATS streaming server +m.natsServer = nats.NewServer(nats.Config{FilestoreDir: m.natsPath}) +if err := m.natsServer.Open(); err != nil { + m.logger.Error("failed to start nats streaming server", zap.Error(err)) + return err +} + +publisher := nats.NewAsyncPublisher("nats-publisher") +if err := publisher.Open(); err != nil { + m.logger.Error("failed to connect to streaming server", zap.Error(err)) + return err +} + +subscriber := nats.NewQueueSubscriber("nats-subscriber") +if err := subscriber.Open(); err != nil { + m.logger.Error("failed to connect to streaming server", zap.Error(err)) + return err +} +``` + +## Make sure the scraperTargetStorageService is accessible + +```go +scraperTargetSvc influxdb.ScraperTargetStoreService = m.boltClient +``` + +## Setup recorder, Make sure subscriber subscribes use the correct recorder with the correct write service + +```go +recorder := gather.PlatformWriter{ + Timeout: time.Millisecond * 30, + Writer: writer, +} +subscriber.Subscribe(MetricsSubject, "", &RecorderHandler{ + Logger: logger, + Recorder: recorder, +}) +``` + +## Start the scheduler + +```go +scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0) +if err != nil { + m.logger.Error("failed to create scraper subscriber", zap.Error(err)) + return err +} +``` \ No newline at end of file diff --git a/gather/handler.go b/gather/handler.go index 63bad92061..6f55dcd038 100644 --- a/gather/handler.go +++ b/gather/handler.go @@ -5,7 +5,7 @@ import ( "context" "encoding/json" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/nats" "go.uber.org/zap" ) @@ -22,7 +22,7 @@ type handler struct { func (h *handler) Process(s nats.Subscription, m nats.Message) { defer m.Ack() - req := new(platform.ScraperTarget) + req := new(influxdb.ScraperTarget) err := json.Unmarshal(m.Data(), req) if err != nil { h.Logger.Error("unable to unmarshal json", zap.Error(err)) @@ -35,7 +35,7 @@ func (h *handler) Process(s nats.Subscription, m nats.Message) { return } - // send metrics to storage queue + // send metrics to recorder queue buf := new(bytes.Buffer) if err := json.NewEncoder(buf).Encode(ms); err != nil { h.Logger.Error("unable to marshal json", zap.Error(err)) diff --git a/gather/metrics.go b/gather/metrics.go index 8f91f4e9ec..afae39c903 100644 --- a/gather/metrics.go +++ b/gather/metrics.go @@ -1,18 +1,76 @@ package gather import ( + "bytes" + "io" + "time" + "github.com/gogo/protobuf/proto" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/models" ) +// MetricsCollection is the struct including metrics and other requirements. +type MetricsCollection struct { + OrgID influxdb.ID `json:"orgID"` + BucketID influxdb.ID `json:"bucketID"` + MetricsSlice MetricsSlice `json:"metrics"` +} + // Metrics is the default influx based metrics. type Metrics struct { Name string `json:"name"` Tags map[string]string `json:"tags"` Fields map[string]interface{} `json:"fields"` - Timestamp int64 `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` Type MetricType `json:"type"` } +// MetricsSlice is a slice of Metrics +type MetricsSlice []Metrics + +// Points convert the MetricsSlice to model.Points +func (ms MetricsSlice) Points() (models.Points, error) { + ps := make([]models.Point, len(ms)) + for mi, m := range ms { + point, err := models.NewPoint(m.Name, models.NewTags(m.Tags), m.Fields, m.Timestamp) + if err != nil { + return ps, err + } + if m.Type.Valid() { + point.AddTag("type", m.Type.String()) + } + ps[mi] = point + } + return ps, nil +} + +// Reader returns an io.Reader that enumerates the metrics. +// All metrics are allocated into the underlying buffer. +func (ms MetricsSlice) Reader() (io.Reader, error) { + buf := new(bytes.Buffer) + for mi, m := range ms { + point, err := models.NewPoint(m.Name, models.NewTags(m.Tags), m.Fields, m.Timestamp) + if err != nil { + return nil, err + } + if m.Type.Valid() { + point.AddTag("type", m.Type.String()) + } + _, err = buf.WriteString(point.String()) + if err != nil { + return nil, err + } + if mi < len(ms)-1 && len(ms) > 1 { + _, err = buf.WriteString("\n") + if err != nil { + return nil, err + } + } + } + return buf, nil +} + // MetricType is prometheus metrics type. type MetricType int @@ -40,6 +98,11 @@ var metricTypeValue = map[string]int32{ "HISTOGRAM": 4, } +// Valid returns whether the metrics type is valid. +func (x MetricType) Valid() bool { + return x >= MetricTypeCounter && x <= MetricTypeHistogrm +} + // String returns the string value of MetricType. func (x MetricType) String() string { return metricTypeName[x] diff --git a/gather/metrics_test.go b/gather/metrics_test.go index fe28e00106..5c01550ef7 100644 --- a/gather/metrics_test.go +++ b/gather/metrics_test.go @@ -1,13 +1,114 @@ package gather import ( + "bytes" "encoding/json" "testing" + "time" "github.com/google/go-cmp/cmp" ) -func TestMetrics(t *testing.T) { +func TestMetricsReader(t *testing.T) { + cases := []struct { + name string + ms MetricsSlice + wants string + or string + }{ + { + name: "single value only", + ms: []Metrics{ + { + Name: "cpu_load_short", + Fields: map[string]interface{}{ + "value": 0.64, + }, + Type: -1, + Timestamp: time.Unix(0, 1422568543702900257), + }, + }, + wants: "cpu_load_short value=0.64 1422568543702900257", + }, + { + name: "single regular metrics", + ms: []Metrics{ + { + Name: "cpu_load_short", + Tags: map[string]string{ + "host": "server01", + "region": "us-west", + }, + Fields: map[string]interface{}{ + "value": 0.64, + }, + Type: MetricTypeGauge, + Timestamp: time.Unix(0, 1422568543702900257), + }, + }, + wants: "cpu_load_short,host=server01,region=us-west,type=GAUGE value=0.64 1422568543702900257", + }, + { + name: "multiple value only", + ms: []Metrics{ + { + Name: "cpu_load_short", + Fields: map[string]interface{}{ + "value": 0.64, + "region": "us-west", + }, + Type: -1, + Timestamp: time.Unix(0, 1522568543702900257), + }, + }, + wants: `cpu_load_short region="us-west",value=0.64 1522568543702900257`, + }, + { + name: "multiple metrics", + ms: []Metrics{ + { + Name: "cpu_load_short", + Tags: map[string]string{ + "region": "us-west", + }, + Fields: map[string]interface{}{ + "value": 0.64, + }, + Type: -1, + Timestamp: time.Unix(0, 1422568543702900257), + }, + { + Name: "cpu_load_short", + Tags: map[string]string{ + "region": "us-east", + }, + Fields: map[string]interface{}{ + "value": 0.34, + }, + Type: -1, + Timestamp: time.Unix(0, 1522568543702900257), + }, + }, + wants: "cpu_load_short,region=us-west value=0.64 1422568543702900257\ncpu_load_short,region=us-east value=0.34 1522568543702900257", + }, + } + for _, c := range cases { + r, err := c.ms.Reader() + if err != nil { + t.Fatalf("error in convert metrics to reader: %v", err) + } + buf := new(bytes.Buffer) + buf.ReadFrom(r) + + if diff1 := cmp.Diff(c.wants, buf.String(), nil); diff1 != "" { + if diff2 := cmp.Diff(c.or, buf.String(), nil); diff2 != "" { + t.Fatalf("convert metrics is incorrect, diff %s", diff1) + } + } + } +} + +func TestMetricsMarshal(t *testing.T) { cases := []struct { name string ms []Metrics @@ -20,7 +121,7 @@ func TestMetrics(t *testing.T) { name: "single", ms: []Metrics{ { - Timestamp: 12345, + Timestamp: time.Unix(12345, 0), Tags: map[string]string{ "b": "B", "a": "A", @@ -38,7 +139,7 @@ func TestMetrics(t *testing.T) { name: "multiple", ms: []Metrics{ { - Timestamp: 12345, + Timestamp: time.Unix(12345, 0), Tags: map[string]string{ "b": "B", "a": "A", @@ -52,7 +153,7 @@ func TestMetrics(t *testing.T) { }, { - Timestamp: 12345, + Timestamp: time.Unix(12345, 0), Tags: map[string]string{ "b": "B2", "a": "A2", diff --git a/gather/prometheus.go b/gather/prometheus.go index 5a51d54310..a68f2c733b 100644 --- a/gather/prometheus.go +++ b/gather/prometheus.go @@ -9,7 +9,7 @@ import ( "net/http" "time" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" "github.com/matttproud/golang_protobuf_extensions/pbutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" @@ -20,23 +20,23 @@ import ( type prometheusScraper struct{} // Gather parse metrics from a scraper target url. -func (p *prometheusScraper) Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error) { +func (p *prometheusScraper) Gather(ctx context.Context, target influxdb.ScraperTarget) (collected MetricsCollection, err error) { resp, err := http.Get(target.URL) if err != nil { - return ms, err + return collected, err } defer resp.Body.Close() - return p.parse(resp.Body, resp.Header) + return p.parse(resp.Body, resp.Header, target) } -func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, error) { +func (p *prometheusScraper) parse(r io.Reader, header http.Header, target influxdb.ScraperTarget) (collected MetricsCollection, err error) { var parser expfmt.TextParser now := time.Now() mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) if err != nil { - return nil, err + return collected, err } // Prepare output metricFamilies := make(map[string]*dto.MetricFamily) @@ -49,14 +49,14 @@ func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, e if err == io.EOF { break } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) + return collected, fmt.Errorf("reading metric family protocol buffer failed: %s", err) } metricFamilies[mf.GetName()] = mf } } else { metricFamilies, err = parser.TextToMetricFamilies(r) if err != nil { - return nil, fmt.Errorf("reading text format failed: %s", err) + return collected, fmt.Errorf("reading text format failed: %s", err) } } ms := make([]Metrics, 0) @@ -91,7 +91,7 @@ func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, e tm = time.Unix(0, *m.TimestampMs*1000000) } me := Metrics{ - Timestamp: tm.UnixNano(), + Timestamp: tm, Tags: tags, Fields: fields, Name: name, @@ -102,7 +102,13 @@ func (p *prometheusScraper) parse(r io.Reader, header http.Header) ([]Metrics, e } - return ms, nil + collected = MetricsCollection{ + MetricsSlice: ms, + OrgID: target.OrgID, + BucketID: target.BucketID, + } + + return collected, nil } // Get labels from metric diff --git a/gather/recorder.go b/gather/recorder.go new file mode 100644 index 0000000000..eefad9761e --- /dev/null +++ b/gather/recorder.go @@ -0,0 +1,82 @@ +package gather + +import ( + "context" + "encoding/json" + "time" + + "github.com/influxdata/influxdb/tsdb" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/nats" + "github.com/influxdata/influxdb/storage" + "go.uber.org/zap" +) + +// PointWriter will use the storage.PointWriter interface to record metrics. +type PointWriter struct { + Writer storage.PointsWriter +} + +// Record the metrics and write using storage.PointWriter interface. +func (s PointWriter) Record(collected MetricsCollection) error { + ps, err := collected.MetricsSlice.Points() + if err != nil { + return err + } + ps, err = tsdb.ExplodePoints(collected.OrgID, collected.BucketID, ps) + if err != nil { + return err + } + return s.Writer.WritePoints(ps) +} + +// ServiceWriter will use the writer interface to record the metrics. +type ServiceWriter struct { + Writer influxdb.WriteService + Timeout time.Duration +} + +// Record the metrics and write using writer interface. +func (s ServiceWriter) Record(collected MetricsCollection) error { + r, err := collected.MetricsSlice.Reader() + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), s.Timeout) + defer cancel() + s.Writer.Write(ctx, + collected.OrgID, + collected.BucketID, + r, + ) + + return nil +} + +// Recorder record the metrics of a time based. +type Recorder interface { + //Subscriber nats.Subscriber + Record(collected MetricsCollection) error +} + +// RecorderHandler implements nats.Handler interface. +type RecorderHandler struct { + Recorder Recorder + Logger *zap.Logger +} + +// Process consumes job queue, and use recorder to record. +func (h *RecorderHandler) Process(s nats.Subscription, m nats.Message) { + defer m.Ack() + collected := new(MetricsCollection) + err := json.Unmarshal(m.Data(), &collected) + if err != nil { + h.Logger.Error("recorder handler error", zap.Error(err)) + return + } + err = h.Recorder.Record(*collected) + if err != nil { + h.Logger.Error("recorder handler error", zap.Error(err)) + } +} diff --git a/gather/scheduler.go b/gather/scheduler.go index 629ae76a2d..28104bb67b 100644 --- a/gather/scheduler.go +++ b/gather/scheduler.go @@ -7,7 +7,7 @@ import ( "fmt" "time" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/nats" "go.uber.org/zap" ) @@ -20,7 +20,7 @@ const ( // Scheduler is struct to run scrape jobs. type Scheduler struct { - Targets platform.ScraperTargetStoreService + Targets influxdb.ScraperTargetStoreService // Interval is between each metrics gathering event. Interval time.Duration // Timeout is the maxisium time duration allowed by each TCP request @@ -38,7 +38,7 @@ type Scheduler struct { func NewScheduler( numScrapers int, l *zap.Logger, - targets platform.ScraperTargetStoreService, + targets influxdb.ScraperTargetStoreService, p nats.Publisher, s nats.Subscriber, interval time.Duration, @@ -111,14 +111,14 @@ func (s *Scheduler) run(ctx context.Context) error { } } -func requestScrape(t platform.ScraperTarget, publisher nats.Publisher) error { +func requestScrape(t influxdb.ScraperTarget, publisher nats.Publisher) error { buf := new(bytes.Buffer) err := json.NewEncoder(buf).Encode(t) if err != nil { return err } switch t.Type { - case platform.PrometheusScraperType: + case influxdb.PrometheusScraperType: return publisher.Publish(promTargetSubject, buf) } return fmt.Errorf("unsupported target scrape type: %s", t.Type) diff --git a/gather/scheduler_test.go b/gather/scheduler_test.go index cee5eb2914..f0eceebed9 100644 --- a/gather/scheduler_test.go +++ b/gather/scheduler_test.go @@ -8,10 +8,10 @@ import ( "time" "github.com/google/go-cmp/cmp" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" influxlogger "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/mock" - platformtesting "github.com/influxdata/influxdb/testing" + influxdbtesting "github.com/influxdata/influxdb/testing" ) func TestScheduler(t *testing.T) { @@ -29,20 +29,22 @@ func TestScheduler(t *testing.T) { defer cancel() storage := &mockStorage{ - Metrics: make(map[int64]Metrics), - Targets: []platform.ScraperTarget{ + Metrics: make(map[time.Time]Metrics), + Targets: []influxdb.ScraperTarget{ { - ID: platformtesting.MustIDBase16("3a0d0a6365646120"), - Type: platform.PrometheusScraperType, - URL: ts.URL + "/metrics", + ID: influxdbtesting.MustIDBase16("3a0d0a6365646120"), + Type: influxdb.PrometheusScraperType, + URL: ts.URL + "/metrics", + OrgID: *orgID, + BucketID: *bucketID, }, }, TotalGatherJobs: make(chan struct{}, totalGatherJobs), } - subscriber.Subscribe(MetricsSubject, "", &StorageHandler{ - Logger: logger, - Storage: storage, + subscriber.Subscribe(MetricsSubject, "", &RecorderHandler{ + Logger: logger, + Recorder: storage, }) scheduler, err := NewScheduler(10, logger, diff --git a/gather/scraper.go b/gather/scraper.go index c0dc06386d..f1bfc23d1c 100644 --- a/gather/scraper.go +++ b/gather/scraper.go @@ -3,10 +3,10 @@ package gather import ( "context" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" ) // Scraper gathers metrics from a scraper target. type Scraper interface { - Gather(ctx context.Context, target platform.ScraperTarget) (ms []Metrics, err error) + Gather(ctx context.Context, target influxdb.ScraperTarget) (collected MetricsCollection, err error) } diff --git a/gather/scraper_test.go b/gather/scraper_test.go index 9cd5b8418e..6b0649d085 100644 --- a/gather/scraper_test.go +++ b/gather/scraper_test.go @@ -7,9 +7,15 @@ import ( "reflect" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" +) + +var ( + orgID, _ = influxdb.IDFromString("020f755c3c082000") + bucketID, _ = influxdb.IDFromString("020f755c3c082001") ) func TestPrometheusScraper(t *testing.T) { @@ -124,19 +130,19 @@ func TestPrometheusScraper(t *testing.T) { defer ts.Close() url = ts.URL } - results, err := scraper.Gather(context.Background(), platform.ScraperTarget{ - URL: url + "/metrics", - OrgName: "org1", - BucketName: "bucket1", + results, err := scraper.Gather(context.Background(), influxdb.ScraperTarget{ + URL: url + "/metrics", + OrgID: *orgID, + BucketID: *bucketID, }) if err != nil && !c.hasErr { t.Fatalf("scraper parse err in testing %s: %v", c.name, err) } - if len(c.ms) != len(results) { + if len(c.ms) != len(results.MetricsSlice) { t.Fatalf("scraper parse metrics incorrect length, want %d, got %d", - len(c.ms), len(results)) + len(c.ms), len(results.MetricsSlice)) } - for _, m := range results { + for _, m := range results.MetricsSlice { for _, cm := range c.ms { if m.Name == cm.Name { if diff := cmp.Diff(m, cm, metricsCmpOption); diff != "" { @@ -182,47 +188,47 @@ go_memstats_frees_total 1.8944339e+07 go_memstats_gc_cpu_fraction 1.972734963012756e-05 ` -// mockStorage implement storage interface -// and platform.ScraperTargetStoreService interface. +// mockStorage implement recorder interface +// and influxdb.ScraperTargetStoreService interface. type mockStorage struct { sync.RWMutex TotalGatherJobs chan struct{} - Metrics map[int64]Metrics - Targets []platform.ScraperTarget + Metrics map[time.Time]Metrics + Targets []influxdb.ScraperTarget } -func (s *mockStorage) Record(ms []Metrics) error { +func (s *mockStorage) Record(collected MetricsCollection) error { s.Lock() defer s.Unlock() - for _, m := range ms { + for _, m := range collected.MetricsSlice { s.Metrics[m.Timestamp] = m } s.TotalGatherJobs <- struct{}{} return nil } -func (s *mockStorage) ListTargets(ctx context.Context) (targets []platform.ScraperTarget, err error) { +func (s *mockStorage) ListTargets(ctx context.Context) (targets []influxdb.ScraperTarget, err error) { s.RLock() defer s.RUnlock() if s.Targets == nil { s.Lock() - s.Targets = make([]platform.ScraperTarget, 0) + s.Targets = make([]influxdb.ScraperTarget, 0) s.Unlock() } return s.Targets, nil } -func (s *mockStorage) AddTarget(ctx context.Context, t *platform.ScraperTarget) error { +func (s *mockStorage) AddTarget(ctx context.Context, t *influxdb.ScraperTarget) error { s.Lock() defer s.Unlock() if s.Targets == nil { - s.Targets = make([]platform.ScraperTarget, 0) + s.Targets = make([]influxdb.ScraperTarget, 0) } s.Targets = append(s.Targets, *t) return nil } -func (s *mockStorage) RemoveTarget(ctx context.Context, id platform.ID) error { +func (s *mockStorage) RemoveTarget(ctx context.Context, id influxdb.ID) error { s.Lock() defer s.Unlock() @@ -238,7 +244,7 @@ func (s *mockStorage) RemoveTarget(ctx context.Context, id platform.ID) error { return nil } -func (s *mockStorage) GetTargetByID(ctx context.Context, id platform.ID) (target *platform.ScraperTarget, err error) { +func (s *mockStorage) GetTargetByID(ctx context.Context, id influxdb.ID) (target *influxdb.ScraperTarget, err error) { s.RLock() defer s.RUnlock() @@ -253,7 +259,7 @@ func (s *mockStorage) GetTargetByID(ctx context.Context, id platform.ID) (target } -func (s *mockStorage) UpdateTarget(ctx context.Context, update *platform.ScraperTarget) (target *platform.ScraperTarget, err error) { +func (s *mockStorage) UpdateTarget(ctx context.Context, update *influxdb.ScraperTarget) (target *influxdb.ScraperTarget, err error) { s.Lock() defer s.Unlock() diff --git a/gather/storage.go b/gather/storage.go deleted file mode 100644 index 4a80d13bc3..0000000000 --- a/gather/storage.go +++ /dev/null @@ -1,36 +0,0 @@ -package gather - -import ( - "encoding/json" - "fmt" - - "github.com/influxdata/influxdb/nats" - "go.uber.org/zap" -) - -// Storage stores the metrics of a time based. -type Storage interface { - //Subscriber nats.Subscriber - Record([]Metrics) error -} - -// StorageHandler implements nats.Handler interface. -type StorageHandler struct { - Storage Storage - Logger *zap.Logger -} - -// Process consumes job queue, and use storage to record. -func (h *StorageHandler) Process(s nats.Subscription, m nats.Message) { - defer m.Ack() - ms := make([]Metrics, 0) - err := json.Unmarshal(m.Data(), &ms) - if err != nil { - h.Logger.Error(fmt.Sprintf("storage handler process err: %v", err)) - return - } - err = h.Storage.Record(ms) - if err != nil { - h.Logger.Error(fmt.Sprintf("storage handler store err: %v", err)) - } -} diff --git a/http/api_handler.go b/http/api_handler.go index 7a6f9933cf..dd9af07fe4 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -20,6 +20,7 @@ type APIHandler struct { DashboardHandler *DashboardHandler AssetHandler *AssetHandler ChronografHandler *ChronografHandler + ScraperHandler *ScraperHandler SourceHandler *SourceHandler MacroHandler *MacroHandler TaskHandler *TaskHandler @@ -101,6 +102,9 @@ func NewAPIHandler(b *APIBackend) *APIHandler { h.AuthorizationHandler.LookupService = b.LookupService h.AuthorizationHandler.Logger = b.Logger.With(zap.String("handler", "auth")) + h.ScraperHandler = NewScraperHandler() + h.ScraperHandler.ScraperStorageService = b.ScraperTargetStoreService + h.SourceHandler = NewSourceHandler() h.SourceHandler.SourceService = b.SourceService h.SourceHandler.NewBucketService = b.NewBucketService @@ -159,10 +163,11 @@ var apiLinks = map[string]interface{}{ "spec": "/api/v2/query/spec", "suggestions": "/api/v2/query/suggestions", }, - "setup": "/api/v2/setup", - "signin": "/api/v2/signin", - "signout": "/api/v2/signout", - "sources": "/api/v2/sources", + "setup": "/api/v2/setup", + "signin": "/api/v2/signin", + "signout": "/api/v2/signout", + "sources": "/api/v2/sources", + "scrapertargets": "/api/v2/scrapertargets", "system": map[string]string{ "metrics": "/metrics", "debug": "/debug/pprof", @@ -250,6 +255,11 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + if strings.HasPrefix(r.URL.Path, "/api/v2/scrapertargets") { + h.ScraperHandler.ServeHTTP(w, r) + return + } + if strings.HasPrefix(r.URL.Path, "/api/v2/tasks") { h.TaskHandler.ServeHTTP(w, r) return diff --git a/http/scraper_service.go b/http/scraper_service.go index 40a9698ca2..d9d438586c 100644 --- a/http/scraper_service.go +++ b/http/scraper_service.go @@ -8,7 +8,6 @@ import ( "path" platform "github.com/influxdata/influxdb" - kerrors "github.com/influxdata/influxdb/kit/errors" "github.com/julienschmidt/httprouter" "go.uber.org/zap" ) @@ -159,7 +158,10 @@ func decodeScraperTargetIDRequest(ctx context.Context, r *http.Request) (*platfo params := httprouter.ParamsFromContext(ctx) id := params.ByName("id") if id == "" { - return nil, kerrors.InvalidDataf("url missing id") + return nil, &platform.Error{ + Code: platform.EInvalid, + Msg: "url missing id", + } } var i platform.ID @@ -270,6 +272,21 @@ func (s *ScraperService) AddTarget(ctx context.Context, target *platform.Scraper return err } + if !target.OrgID.Valid() { + return &platform.Error{ + Code: platform.EInvalid, + Msg: "org id is invalid", + Op: s.OpPrefix + platform.OpAddTarget, + } + } + if !target.BucketID.Valid() { + return &platform.Error{ + Code: platform.EInvalid, + Msg: "bucket id is invalid", + Op: s.OpPrefix + platform.OpAddTarget, + } + } + octets, err := json.Marshal(target) if err != nil { return err diff --git a/http/scraper_service_test.go b/http/scraper_service_test.go index 6463bfc889..adab83aa8a 100644 --- a/http/scraper_service_test.go +++ b/http/scraper_service_test.go @@ -55,20 +55,20 @@ func TestService_handleGetScraperTargets(t *testing.T) { ListTargetsF: func(ctx context.Context) ([]platform.ScraperTarget, error) { return []platform.ScraperTarget{ { - ID: targetOneID, - Name: "target-1", - Type: platform.PrometheusScraperType, - URL: "www.one.url", - OrgName: "org-name", - BucketName: "bkt-one", + ID: targetOneID, + Name: "target-1", + Type: platform.PrometheusScraperType, + URL: "www.one.url", + OrgID: platformtesting.MustIDBase16("0000000000000211"), + BucketID: platformtesting.MustIDBase16("0000000000000212"), }, { - ID: targetTwoID, - Name: "target-2", - Type: platform.PrometheusScraperType, - URL: "www.two.url", - OrgName: "org-name", - BucketName: "bkt-two", + ID: targetTwoID, + Name: "target-2", + Type: platform.PrometheusScraperType, + URL: "www.two.url", + OrgID: platformtesting.MustIDBase16("0000000000000211"), + BucketID: platformtesting.MustIDBase16("0000000000000212"), }, }, nil }, @@ -88,8 +88,8 @@ func TestService_handleGetScraperTargets(t *testing.T) { { "id": "%s", "name": "target-1", - "bucket": "bkt-one", - "org": "org-name", + "bucketID": "0000000000000212", + "orgID": "0000000000000211", "type": "prometheus", "url": "www.one.url", "links": { @@ -99,8 +99,8 @@ func TestService_handleGetScraperTargets(t *testing.T) { { "id": "%s", "name": "target-2", - "bucket": "bkt-two", - "org": "org-name", + "bucketID": "0000000000000212", + "orgID": "0000000000000211", "type": "prometheus", "url": "www.two.url", "links": { @@ -204,12 +204,12 @@ func TestService_handleGetScraperTarget(t *testing.T) { GetTargetByIDF: func(ctx context.Context, id platform.ID) (*platform.ScraperTarget, error) { if id == targetOneID { return &platform.ScraperTarget{ - ID: targetOneID, - Name: "target-1", - Type: platform.PrometheusScraperType, - URL: "www.some.url", - OrgName: "org-name", - BucketName: "bkt-name", + ID: targetOneID, + Name: "target-1", + Type: platform.PrometheusScraperType, + URL: "www.some.url", + OrgID: platformtesting.MustIDBase16("0000000000000211"), + BucketID: platformtesting.MustIDBase16("0000000000000212"), }, nil } return nil, fmt.Errorf("not found") @@ -229,8 +229,8 @@ func TestService_handleGetScraperTarget(t *testing.T) { "name": "target-1", "type": "prometheus", "url": "www.some.url", - "bucket": "bkt-name", - "org": "org-name", + "bucketID": "0000000000000212", + "orgID": "0000000000000211", "links": { "self": "/api/v2/scrapertargets/%[1]s" } @@ -413,11 +413,11 @@ func TestService_handlePostScraperTarget(t *testing.T) { }, args: args{ target: &platform.ScraperTarget{ - Name: "hello", - Type: platform.PrometheusScraperType, - BucketName: "bkt-name", - OrgName: "org-name", - URL: "www.some.url", + Name: "hello", + Type: platform.PrometheusScraperType, + BucketID: platformtesting.MustIDBase16("0000000000000212"), + OrgID: platformtesting.MustIDBase16("0000000000000211"), + URL: "www.some.url", }, }, wants: wants{ @@ -430,8 +430,8 @@ func TestService_handlePostScraperTarget(t *testing.T) { "name": "hello", "type": "prometheus", "url": "www.some.url", - "org": "org-name", - "bucket": "bkt-name", + "orgID": "0000000000000211", + "bucketID": "0000000000000212", "links": { "self": "/api/v2/scrapertargets/%[1]s" } @@ -513,12 +513,12 @@ func TestService_handlePatchScraperTarget(t *testing.T) { args: args{ id: targetOneIDString, update: &platform.ScraperTarget{ - ID: targetOneID, - Name: "name", - BucketName: "buck", - Type: platform.PrometheusScraperType, - URL: "www.example.url", - OrgName: "orgg", + ID: targetOneID, + Name: "name", + BucketID: platformtesting.MustIDBase16("0000000000000212"), + Type: platform.PrometheusScraperType, + URL: "www.example.url", + OrgID: platformtesting.MustIDBase16("0000000000000211"), }, }, wants: wants{ @@ -531,8 +531,8 @@ func TestService_handlePatchScraperTarget(t *testing.T) { "name":"name", "type":"prometheus", "url":"www.example.url", - "org":"orgg", - "bucket":"buck", + "orgID":"0000000000000211", + "bucketID":"0000000000000212", "links":{ "self":"/api/v2/scrapertargets/%[1]s" } @@ -557,12 +557,12 @@ func TestService_handlePatchScraperTarget(t *testing.T) { args: args{ id: targetOneIDString, update: &platform.ScraperTarget{ - ID: targetOneID, - Name: "name", - BucketName: "buck", - Type: platform.PrometheusScraperType, - URL: "www.example.url", - OrgName: "orgg", + ID: targetOneID, + Name: "name", + BucketID: platformtesting.MustIDBase16("0000000000000212"), + Type: platform.PrometheusScraperType, + URL: "www.example.url", + OrgID: platformtesting.MustIDBase16("0000000000000211"), }, }, wants: wants{ diff --git a/inmem/scraper.go b/inmem/scraper.go index 6bd565efc3..32e129cd51 100644 --- a/inmem/scraper.go +++ b/inmem/scraper.go @@ -53,6 +53,20 @@ func (s *Service) ListTargets(ctx context.Context) (list []platform.ScraperTarge // AddTarget add a new scraper target into storage. func (s *Service) AddTarget(ctx context.Context, target *platform.ScraperTarget) (err error) { target.ID = s.IDGenerator.ID() + if !target.OrgID.Valid() { + return &platform.Error{ + Code: platform.EInvalid, + Msg: "org id is invalid", + Op: OpPrefix + platform.OpAddTarget, + } + } + if !target.BucketID.Valid() { + return &platform.Error{ + Code: platform.EInvalid, + Msg: "bucket id is invalid", + Op: OpPrefix + platform.OpAddTarget, + } + } if err := s.PutTarget(ctx, target); err != nil { return &platform.Error{ Op: OpPrefix + platform.OpAddTarget, @@ -84,13 +98,19 @@ func (s *Service) UpdateTarget(ctx context.Context, update *platform.ScraperTarg Msg: "id is invalid", } } - _, pe := s.loadScraperTarget(update.ID) + oldTarget, pe := s.loadScraperTarget(update.ID) if pe != nil { return nil, &platform.Error{ Op: op, Err: pe, } } + if !update.OrgID.Valid() { + update.OrgID = oldTarget.OrgID + } + if !update.BucketID.Valid() { + update.BucketID = oldTarget.BucketID + } if err = s.PutTarget(ctx, update); err != nil { return nil, &platform.Error{ Op: op, diff --git a/scraper.go b/scraper.go index 12e5cfd80e..c7751c80bc 100644 --- a/scraper.go +++ b/scraper.go @@ -18,12 +18,12 @@ const ( // ScraperTarget is a target to scrape type ScraperTarget struct { - ID ID `json:"id,omitempty"` - Name string `json:"name"` - Type ScraperType `json:"type"` - URL string `json:"url"` - OrgName string `json:"org"` - BucketName string `json:"bucket"` + ID ID `json:"id,omitempty"` + Name string `json:"name"` + Type ScraperType `json:"type"` + URL string `json:"url"` + OrgID ID `json:"orgID,omitempty"` + BucketID ID `json:"bucketID,omitempty"` } // ScraperTargetStoreService defines the crud service for ScraperTarget. diff --git a/testing/scraper_target.go b/testing/scraper_target.go index b9d0799b4c..efc081a330 100644 --- a/testing/scraper_target.go +++ b/testing/scraper_target.go @@ -99,22 +99,106 @@ func AddTarget( }, args: args{ target: &platform.ScraperTarget{ - Name: "name1", - Type: platform.PrometheusScraperType, - OrgName: "org1", - BucketName: "bucket1", - URL: "url1", + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", }, }, wants: wants{ targets: []platform.ScraperTarget{ { - Name: "name1", - Type: platform.PrometheusScraperType, - OrgName: "org1", - BucketName: "bucket1", - URL: "url1", - ID: MustIDBase16(targetOneID), + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), + }, + }, + }, + }, + { + name: "create target with invalid org id", + fields: TargetFields{ + IDGenerator: mock.NewIDGenerator(targetTwoID, t), + Targets: []*platform.ScraperTarget{ + { + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), + }, + }, + }, + args: args{ + target: &platform.ScraperTarget{ + ID: MustIDBase16(targetTwoID), + Name: "name2", + Type: platform.PrometheusScraperType, + BucketID: MustIDBase16(bucketTwoID), + URL: "url2", + }, + }, + wants: wants{ + err: &platform.Error{ + Code: platform.EInvalid, + Msg: "org id is invalid", + Op: platform.OpAddTarget, + }, + targets: []platform.ScraperTarget{ + { + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), + }, + }, + }, + }, + { + name: "create target with invalid bucket id", + fields: TargetFields{ + IDGenerator: mock.NewIDGenerator(targetTwoID, t), + Targets: []*platform.ScraperTarget{ + { + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), + }, + }, + }, + args: args{ + target: &platform.ScraperTarget{ + ID: MustIDBase16(targetTwoID), + Name: "name2", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgTwoID), + URL: "url2", + }, + }, + wants: wants{ + err: &platform.Error{ + Code: platform.EInvalid, + Msg: "bucket id is invalid", + Op: platform.OpAddTarget, + }, + targets: []platform.ScraperTarget{ + { + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), }, }, }, @@ -125,42 +209,42 @@ func AddTarget( IDGenerator: mock.NewIDGenerator(targetTwoID, t), Targets: []*platform.ScraperTarget{ { - Name: "name1", - Type: platform.PrometheusScraperType, - OrgName: "org1", - BucketName: "bucket1", - URL: "url1", - ID: MustIDBase16(targetOneID), + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), }, }, }, args: args{ target: &platform.ScraperTarget{ - ID: MustIDBase16(targetTwoID), - Name: "name2", - Type: platform.PrometheusScraperType, - OrgName: "org2", - BucketName: "bucket2", - URL: "url2", + ID: MustIDBase16(targetTwoID), + Name: "name2", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgTwoID), + BucketID: MustIDBase16(bucketTwoID), + URL: "url2", }, }, wants: wants{ targets: []platform.ScraperTarget{ { - Name: "name1", - Type: platform.PrometheusScraperType, - OrgName: "org1", - BucketName: "bucket1", - URL: "url1", - ID: MustIDBase16(targetOneID), + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), }, { - Name: "name2", - Type: platform.PrometheusScraperType, - OrgName: "org2", - BucketName: "bucket2", - URL: "url2", - ID: MustIDBase16(targetTwoID), + Name: "name2", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgTwoID), + BucketID: MustIDBase16(bucketTwoID), + URL: "url2", + ID: MustIDBase16(targetTwoID), }, }, }, @@ -208,40 +292,40 @@ func ListTargets( fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - Name: "name1", - Type: platform.PrometheusScraperType, - OrgName: "org1", - BucketName: "bucket1", - URL: "url1", - ID: MustIDBase16(targetOneID), + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), }, { - Name: "name2", - Type: platform.PrometheusScraperType, - OrgName: "org2", - BucketName: "bucket2", - URL: "url2", - ID: MustIDBase16(targetTwoID), + Name: "name2", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgTwoID), + BucketID: MustIDBase16(bucketTwoID), + URL: "url2", + ID: MustIDBase16(targetTwoID), }, }, }, wants: wants{ targets: []platform.ScraperTarget{ { - Name: "name1", - Type: platform.PrometheusScraperType, - OrgName: "org1", - BucketName: "bucket1", - URL: "url1", - ID: MustIDBase16(targetOneID), + Name: "name1", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), + URL: "url1", + ID: MustIDBase16(targetOneID), }, { - Name: "name2", - Type: platform.PrometheusScraperType, - OrgName: "org2", - BucketName: "bucket2", - URL: "url2", - ID: MustIDBase16(targetTwoID), + Name: "name2", + Type: platform.PrometheusScraperType, + OrgID: MustIDBase16(orgTwoID), + BucketID: MustIDBase16(bucketTwoID), + URL: "url2", + ID: MustIDBase16(targetTwoID), }, }, }, @@ -286,12 +370,16 @@ func GetTargetByID( fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), - Name: "target1", + ID: MustIDBase16(targetOneID), + Name: "target1", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), - Name: "target2", + ID: MustIDBase16(targetTwoID), + Name: "target2", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -300,8 +388,10 @@ func GetTargetByID( }, wants: wants{ target: &platform.ScraperTarget{ - ID: MustIDBase16(targetTwoID), - Name: "target2", + ID: MustIDBase16(targetTwoID), + Name: "target2", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -310,12 +400,16 @@ func GetTargetByID( fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), - Name: "target1", + ID: MustIDBase16(targetOneID), + Name: "target1", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), - Name: "target2", + ID: MustIDBase16(targetTwoID), + Name: "target2", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -369,10 +463,14 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), + ID: MustIDBase16(targetOneID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), + ID: MustIDBase16(targetTwoID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -382,7 +480,9 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto wants: wants{ targets: []platform.ScraperTarget{ { - ID: MustIDBase16(targetTwoID), + ID: MustIDBase16(targetTwoID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -392,10 +492,14 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), + ID: MustIDBase16(targetOneID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), + ID: MustIDBase16(targetTwoID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -410,10 +514,14 @@ func RemoveTarget(init func(TargetFields, *testing.T) (platform.ScraperTargetSto }, targets: []platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), + ID: MustIDBase16(targetOneID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), + ID: MustIDBase16(targetTwoID), + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -463,12 +571,16 @@ func UpdateTarget( fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), - URL: "url1", + ID: MustIDBase16(targetOneID), + URL: "url1", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), - URL: "url2", + ID: MustIDBase16(targetTwoID), + URL: "url2", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -488,12 +600,16 @@ func UpdateTarget( fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), - URL: "url1", + ID: MustIDBase16(targetOneID), + URL: "url1", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), - URL: "url2", + ID: MustIDBase16(targetTwoID), + URL: "url2", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -514,12 +630,16 @@ func UpdateTarget( fields: TargetFields{ Targets: []*platform.ScraperTarget{ { - ID: MustIDBase16(targetOneID), - URL: "url1", + ID: MustIDBase16(targetOneID), + URL: "url1", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, { - ID: MustIDBase16(targetTwoID), - URL: "url2", + ID: MustIDBase16(targetTwoID), + URL: "url2", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, }, @@ -529,8 +649,10 @@ func UpdateTarget( }, wants: wants{ target: &platform.ScraperTarget{ - ID: MustIDBase16(targetOneID), - URL: "changed", + ID: MustIDBase16(targetOneID), + URL: "changed", + OrgID: MustIDBase16(orgOneID), + BucketID: MustIDBase16(bucketOneID), }, }, },