influxdb/gather/scheduler.go

package gather

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/kit/tracing"
	"github.com/influxdata/influxdb/nats"
	"go.uber.org/zap"
)

// nats subjects
const (
	MetricsSubject    = "metrics"
	promTargetSubject = "promTarget"
)
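
// Rough message flow, as inferred from this file: the Scheduler JSON-encodes
// each scraper target and publishes it on promTargetSubject; the handlers
// subscribed in NewScheduler scrape those targets and publish the gathered
// metrics onward, presumably on the exported MetricsSubject.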

// Scheduler is a struct to run scrape jobs.
type Scheduler struct {
	Targets influxdb.ScraperTargetStoreService

	// Interval is the time between each metrics gathering event.
	Interval time.Duration

	// Timeout is the maximum time duration allowed for each TCP request.
	Timeout time.Duration

	// Publisher will send the gather requests and gathered metrics to the queue.
	Publisher nats.Publisher

	log *zap.Logger

	gather chan struct{}
}

// NewScheduler creates a new Scheduler and subscriptions for scraper jobs.
func NewScheduler(
	log *zap.Logger,
	numScrapers int,
	targets influxdb.ScraperTargetStoreService,
	p nats.Publisher,
	s nats.Subscriber,
	interval time.Duration,
	timeout time.Duration,
) (*Scheduler, error) {
	if interval == 0 {
		interval = 60 * time.Second
	}
	if timeout == 0 {
		timeout = 30 * time.Second
	}

	scheduler := &Scheduler{
		Targets:   targets,
		Interval:  interval,
		Timeout:   timeout,
		Publisher: p,
		log:       log,

		// Buffered so scheduled ticks can queue up (to a depth of 100)
		// rather than blocking the timer goroutine while a gather is
		// still in flight.
		gather: make(chan struct{}, 100),
	}

	for i := 0; i < numScrapers; i++ {
		err := s.Subscribe(promTargetSubject, "metrics", &handler{
			Scraper:   new(prometheusScraper),
			Publisher: p,
			log:       log,
		})
		if err != nil {
			return nil, err
		}
	}

	return scheduler, nil
}
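
// A hypothetical wiring sketch (the logger/publisher/subscriber/store values
// stand in for whatever the caller already has; this is not the canonical
// setup):
//
//	sched, err := NewScheduler(logger, 10, targetStore, publisher, subscriber, 0, 0)
//	if err != nil {
//		return err
//	}
//	// Run blocks until ctx is canceled, so it is typically started in its
//	// own goroutine with a cancelable context.
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go sched.Run(ctx)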

// Run will retrieve scraper targets from the target storage,
// and publish them to the nats job queue for gathering.
func (s *Scheduler) Run(ctx context.Context) error {
	go func(s *Scheduler, ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(s.Interval): // TODO: change to ticker because of garbage collection
				s.gather <- struct{}{}
			}
		}
	}(s, ctx)
	return s.run(ctx)
}
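
// A sketch of the ticker variant the TODO above refers to: time.NewTicker
// reuses a single runtime timer, whereas time.After allocates a new timer
// (and its channel) on every iteration, leaving more work for the garbage
// collector.
//
//	ticker := time.NewTicker(s.Interval)
//	defer ticker.Stop()
//	for {
//		select {
//		case <-ctx.Done():
//			return
//		case <-ticker.C:
//			s.gather <- struct{}{}
//		}
//	}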

func (s *Scheduler) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-s.gather:
			s.doGather(ctx)
		}
	}
}

func (s *Scheduler) doGather(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, s.Timeout)
	defer cancel()
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	targets, err := s.Targets.ListTargets(ctx, influxdb.ScraperTargetFilter{})
	if err != nil {
		s.log.Error("Cannot list targets", zap.Error(err))
		tracing.LogError(span, err)
		return
	}
	for _, target := range targets {
		if err := requestScrape(target, s.Publisher); err != nil {
			s.log.Error("JSON encoding error", zap.Error(err))
			tracing.LogError(span, err)
		}
	}
}
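
// Each target published by doGather is consumed by one of the numScrapers
// handlers subscribed in NewScheduler (the "metrics" argument there is most
// likely a NATS queue-group name, so targets are load-balanced across the
// handlers rather than broadcast to all of them). A sketch of the payload
// for a Prometheus target, assuming the usual influxdb.ScraperTarget JSON
// tags:
//
//	{"name":"node","type":"prometheus","url":"http://localhost:9100/metrics"}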

func requestScrape(t influxdb.ScraperTarget, publisher nats.Publisher) error {
	buf := new(bytes.Buffer)
	err := json.NewEncoder(buf).Encode(t)
	if err != nil {
		return err
	}
	switch t.Type {
	case influxdb.PrometheusScraperType:
		return publisher.Publish(promTargetSubject, buf)
	}
	return fmt.Errorf("unsupported target scrape type: %s", t.Type)
}