influxdb/services/subscriber/service.go

640 lines
16 KiB
Go

// Package subscriber implements the subscriber service
// to forward incoming data to remote services.
package subscriber // import "github.com/influxdata/influxdb/services/subscriber"
import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/meta"
"go.uber.org/zap"
)
// Names of the statistics reported by the subscriber service. They are used
// as keys in the Values map of the models.Statistic entries this file emits.
const (
// statCreateFailures is the key for the count of failed subscription creations.
statCreateFailures = "createFailures"
// statPointsWritten is the key for the count of points successfully written.
statPointsWritten = "pointsWritten"
// statWriteFailures is the key for the count of failed or rejected writes.
statWriteFailures = "writeFailures"
// statMemUsage is the key for the summed queue size of all subscriptions.
statMemUsage = "memUsage"
)
// WriteRequest is a parsed write request: the target database and retention
// policy plus the points, already serialized as line protocol.
type WriteRequest struct {
// Database is the name of the database the points were written to.
Database string
// RetentionPolicy is the name of the retention policy the points were written to.
RetentionPolicy string
// lineProtocol must be valid newline-separated line protocol.
lineProtocol []byte
// pointOffsets gives the starting index within lineProtocol of each point,
// for splitting batches if required.
pointOffsets []int
}
// NewWriteRequest converts a coordinator write request into a WriteRequest by
// serializing each point as one newline-terminated line of line protocol.
//
// Points rejected by models.ValidPointStrings are logged at debug level and
// left out of the result; numInvalid reports how many were discarded.
func NewWriteRequest(r *coordinator.WritePointsRequest, log *zap.Logger) (wr WriteRequest, numInvalid int64) {
	// Pre-allocate at least smallPointSize bytes per point.
	const smallPointSize = 10

	log = log.With(zap.String("database", r.Database), zap.String("retention_policy", r.RetentionPolicy))

	offsets := make([]int, 0, len(r.Points))
	buf := make([]byte, 0, len(r.Points)*smallPointSize)

	var discarded int64
	for _, pt := range r.Points {
		err := models.ValidPointStrings(pt)
		if err != nil {
			log.Debug("discarding point", zap.Error(err))
			discarded++
			continue
		}

		// The point is appended at the current end of the buffer, so that is
		// where it starts.
		offsets = append(offsets, len(buf))
		buf = pt.AppendString(buf)
		buf = append(buf, '\n')
	}

	return WriteRequest{
		Database:        r.Database,
		RetentionPolicy: r.RetentionPolicy,
		pointOffsets:    offsets,
		lineProtocol:    buf,
	}, discarded
}
// PointAt returns the i-th point of the request as a slice of the lineProtocol
// buffer, located through pointOffsets. The returned bytes include the point's
// trailing newline. It panics if i is not a valid point index.
func (w *WriteRequest) PointAt(i int) []byte {
	next := i + 1
	if next >= len(w.pointOffsets) {
		// The last point runs to the end of the buffer.
		return w.lineProtocol[w.pointOffsets[i]:]
	}
	// Any other point ends where the following point starts.
	return w.lineProtocol[w.pointOffsets[i]:w.pointOffsets[next]]
}
// Length returns the number of points held by the request, i.e. the number of
// entries in pointOffsets.
func (w *WriteRequest) Length() int {
return len(w.pointOffsets)
}
// SizeOf returns the size in bytes accounted to the request: the length of
// the line protocol buffer, the bytes occupied by the point offsets, and the
// lengths of the database and retention policy names.
func (w *WriteRequest) SizeOf() int {
	// Only the type of offset matters here; unsafe.Sizeof does not evaluate
	// its operand.
	var offset int
	offsetBytes := len(w.pointOffsets) * int(unsafe.Sizeof(offset))
	nameBytes := len(w.Database) + len(w.RetentionPolicy)
	return len(w.lineProtocol) + offsetBytes + nameBytes
}
// PointsWriter is an interface for writing points to a subscription destination.
// Only WritePointsContext() needs to be satisfied. PointsWriter implementations
// must be goroutine safe.
type PointsWriter interface {
// WritePointsContext writes the request to the destination and returns an
// error if the write failed. ctx is supplied by the caller.
WritePointsContext(ctx context.Context, request WriteRequest) error
}
// subEntry identifies a single subscription by database, retention policy and
// subscription name. It is used as a map key for the active subscriptions.
type subEntry struct {
// db is the database name.
db string
// rp is the retention policy name.
rp string
// name is the subscription name.
name string
}
// Service manages forking the incoming data from InfluxDB
// to defined third party destinations.
// Subscriptions are defined per database and retention policy.
type Service struct {
// MetaClient supplies the database metadata (including subscriptions) and a
// channel used to learn that the metadata changed.
MetaClient interface {
Databases() []meta.DatabaseInfo
WaitForDataChanged() chan struct{}
}
// NewPointsWriter builds the writer for a destination URL. NewService sets
// it to the service's own newPointsWriter; it is a field so it can be replaced.
NewPointsWriter func(u url.URL) (PointsWriter, error)
// Logger is the service logger; WithLogger replaces it.
Logger *zap.Logger
// stats holds the service-wide counters, updated with atomic operations.
stats *Statistics
// wg tracks the goroutine that waits for metadata updates.
wg sync.WaitGroup
// closing is created by Open and closed by Close to signal shutdown.
closing chan struct{}
// mu is held while closing and subs are read or modified.
mu sync.Mutex
conf Config
// subs maps each active subscription to the chanWriter serving it.
subs map[subEntry]*chanWriter
// subscriptionRouter is not locked by mu
router *subscriptionRouter
}
// NewService returns a subscriber service configured with c. The service
// starts with a no-op logger, fresh statistics shared with its router, and
// its own newPointsWriter as the writer factory.
func NewService(c Config) *Service {
	counters := new(Statistics)

	svc := new(Service)
	svc.Logger = zap.NewNop()
	svc.stats = counters
	svc.conf = c
	svc.router = newSubscriptionRouter(counters)
	svc.NewPointsWriter = svc.newPointsWriter
	return svc
}
// Open starts the subscription service. It is a no-op when the service is
// disabled in the configuration, and it returns an error when no MetaClient
// has been set. Otherwise it starts the goroutine that watches for metadata
// changes and creates the subscriptions described by the current metadata.
func (s *Service) Open() error {
	if !s.conf.Enabled {
		return nil // Service disabled.
	}

	s.mu.Lock()
	if s.MetaClient == nil {
		s.mu.Unlock()
		return errors.New("no meta store")
	}
	s.closing = make(chan struct{})
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.waitForMetaUpdates()
	}()
	s.mu.Unlock()

	// Create all subs with initial metadata. updateSubs takes the lock itself,
	// so it must run after the lock above has been released.
	s.updateSubs()

	s.Logger.Info("Opened service")
	return nil
}
// Close terminates the subscription service.
// It will return an error if Open was not called first.
//
// Close stops the router from accepting new input, signals the metadata
// watcher to exit, waits for it, and then closes every subscription writer,
// blocking until their in-flight writes have finished. It is safe to call
// Close concurrently or more than once.
func (s *Service) Close() error {
	err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()

		// Stop receiving new input. This is done while holding mu, in the same
		// critical section that closes s.closing: updateSubs checks s.closing
		// under mu before calling router.Update, and Update panics once the
		// router is closed. Closing the router outside the lock would let a
		// concurrent metadata update slip between the two steps and panic.
		s.router.Close()

		if s.closing == nil {
			return fmt.Errorf("closing unopened subscription service")
		}

		select {
		case <-s.closing:
			// already closed
			return nil
		default:
		}

		close(s.closing)
		return nil
	}()
	if err != nil {
		return err
	}

	// Note this section is safe for concurrent calls to Close - both calls will wait for the exits, one caller
	// will win the right to close the channel writers, and the other will have to wait at the lock for that to finish.
	// When the second caller gets the lock subs is nil which is safe.

	// wait, not under the lock, for waitForMetaUpdates to finish gracefully
	s.wg.Wait()

	// close all the subscriptions
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, cw := range s.subs {
		cw.Close()
	}
	s.subs = nil
	s.Logger.Info("Closed service")
	return nil
}
// WithLogger sets the logger on the service and on its router. The logger is
// tagged with the service name before it is stored.
func (s *Service) WithLogger(log *zap.Logger) {
	tagged := log.With(zap.String("service", "subscriber"))
	s.Logger = tagged
	s.router.Logger = s.Logger
}
// Statistics maintains the statistics for the subscriber service.
// The fields are read and updated with sync/atomic operations.
type Statistics struct {
// CreateFailures counts subscriptions that could not be created.
CreateFailures int64
// PointsWritten counts points successfully written to subscriptions.
PointsWritten int64
// WriteFailures counts failed or rejected writes and discarded invalid points.
WriteFailures int64
}
// Statistics returns statistics for periodic monitoring. The first entry
// carries the service-wide counters and the summed queue size of all
// subscriptions; it is followed by whatever each subscription reports.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
	serviceValues := map[string]interface{}{
		statCreateFailures: atomic.LoadInt64(&s.stats.CreateFailures),
		statPointsWritten:  atomic.LoadInt64(&s.stats.PointsWritten),
		statWriteFailures:  atomic.LoadInt64(&s.stats.WriteFailures),
	}
	statistics := []models.Statistic{{
		Name:   "subscriber",
		Tags:   tags,
		Values: serviceValues,
	}}

	s.mu.Lock()
	defer s.mu.Unlock()

	var queued int64
	for _, cw := range s.subs {
		statistics = append(statistics, cw.Statistics(tags)...)
		queued += atomic.LoadInt64(&cw.queueSize)
	}
	// serviceValues is the map stored in statistics[0], so this adds the
	// memory usage to the service-wide entry.
	serviceValues[statMemUsage] = queued
	return statistics
}
// waitForMetaUpdates re-synchronizes the subscriptions every time the meta
// client signals a change, and returns once the service is closing.
func (s *Service) waitForMetaUpdates() {
	changed := s.MetaClient.WaitForDataChanged()
	for {
		select {
		case <-s.closing:
			return
		case <-changed:
			// The channel is closed on changes, so grab the next one before
			// updating; otherwise a change made during the update could be missed.
			changed = s.MetaClient.WaitForDataChanged()
			s.updateSubs()
		}
	}
}
// createSubscription builds the PointsWriter for one subscription: a
// balancewriter over one writer per destination, using the given balance mode
// ("ALL" or "ANY").
//
// It returns an error for an unknown mode, and also as soon as any destination
// fails to parse or its writer cannot be created; no partial subscription is
// returned.
func (s *Service) createSubscription(se subEntry, mode string, destinations []string) (PointsWriter, error) {
	knownModes := map[string]BalanceMode{
		"ALL": ALL,
		"ANY": ANY,
	}
	bm, known := knownModes[mode]
	if !known {
		return nil, fmt.Errorf("unknown balance mode %q", mode)
	}

	count := len(destinations)
	writers := make([]PointsWriter, 0, count)
	stats := make([]writerStats, 0, count)
	for _, dest := range destinations {
		parsed, err := url.Parse(dest)
		if err != nil {
			return nil, fmt.Errorf("failed to parse destination %q: %w", dest, err)
		}
		pw, err := s.NewPointsWriter(*parsed)
		if err != nil {
			return nil, fmt.Errorf("failed to create writer for destination %q: %w", dest, err)
		}
		writers = append(writers, pw)
		stats = append(stats, writerStats{dest: dest})
	}

	tags := models.StatisticTags{
		"database":         se.db,
		"retention_policy": se.rp,
		"name":             se.name,
		"mode":             mode,
	}
	return &balancewriter{
		bm:          bm,
		writers:     writers,
		stats:       stats,
		defaultTags: tags,
	}, nil
}
// Send hands an incoming write request to the subscription router, which
// forwards it to the subscriptions registered for the request's database and
// retention policy.
func (s *Service) Send(request *coordinator.WritePointsRequest) {
s.router.Send(request)
}
// updateSubs reconciles the running subscriptions with the metadata reported
// by MetaClient.Databases: it creates a chanWriter for every subscription that
// is not running yet, re-publishes the routing table (and per-subscription
// memory limit) to the router when anything changed, and finally shuts down
// the writers of subscriptions that no longer exist.
//
// It does nothing once the service is closing.
func (s *Service) updateSubs() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// check if we're closing while under the lock
	select {
	case <-s.closing:
		return
	default:
	}

	if s.subs == nil {
		s.subs = make(map[subEntry]*chanWriter)
	}

	dbis := s.MetaClient.Databases()
	allEntries := make(map[subEntry]bool)

	createdNew := false

	// Add in new subscriptions
	for _, dbi := range dbis {
		for _, rpi := range dbi.RetentionPolicies {
			for _, si := range rpi.Subscriptions {
				se := subEntry{
					db:   dbi.Name,
					rp:   rpi.Name,
					name: si.Name,
				}
				allEntries[se] = true
				if _, ok := s.subs[se]; ok {
					continue
				}
				createdNew = true
				s.Logger.Info("Adding new subscription",
					logger.Database(se.db),
					logger.RetentionPolicy(se.rp))
				sub, err := s.createSubscription(se, si.Mode, si.Destinations)
				if err != nil {
					atomic.AddInt64(&s.stats.CreateFailures, 1)
					s.Logger.Info("Subscription creation failed", zap.String("name", si.Name), zap.Error(err))
					continue
				}
				s.subs[se] = newChanWriter(s, sub)
				s.Logger.Info("Added new subscription",
					logger.Database(se.db),
					logger.RetentionPolicy(se.rp))
			}
		}
	}

	// Collect the subscriptions that disappeared from the metadata. They are
	// removed from s.subs now but only closed after the router stops using them.
	toClose := make(map[subEntry]*chanWriter)
	for se, cw := range s.subs {
		if !allEntries[se] {
			toClose[se] = cw
			delete(s.subs, se)
		}
	}

	if createdNew || len(toClose) > 0 {
		memoryLimit := int64(0)
		// Split the total buffer budget evenly across the live subscriptions.
		// s.subs can be empty here (the last subscription was just removed, or
		// the only new subscription failed to be created); dividing by its
		// length would then panic, and there is nobody to apply a limit to.
		if s.conf.TotalBufferBytes != 0 && len(s.subs) > 0 {
			memoryLimit = int64(s.conf.TotalBufferBytes / len(s.subs))
			if memoryLimit == 0 {
				// A limit of zero means "unlimited", so never round down to it.
				memoryLimit = 1
			}
		}
		// update the router before we close any subscriptions
		s.router.Update(s.subs, memoryLimit)
	}

	for se, cw := range toClose {
		s.Logger.Info("Deleting old subscription",
			logger.Database(se.db),
			logger.RetentionPolicy(se.rp))

		cw.CancelAndClose()

		s.Logger.Info("Deleted old subscription",
			logger.Database(se.db),
			logger.RetentionPolicy(se.rp))
	}
}
// newPointsWriter returns a new PointsWriter for the given URL, chosen by the
// URL scheme: "udp", "http" or "https". Any other scheme is an error. The
// HTTP and HTTPS writers use the configured HTTP timeout; HTTPS additionally
// uses the configured certificate settings.
func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
	if u.Scheme == "udp" {
		return NewUDP(u.Host), nil
	}

	timeout := time.Duration(s.conf.HTTPTimeout)
	if u.Scheme == "http" {
		return NewHTTP(u.String(), timeout)
	}
	if u.Scheme == "https" {
		skipVerify := s.conf.InsecureSkipVerify
		if skipVerify {
			s.Logger.Warn("'insecure-skip-verify' is true. This will skip all certificate verifications.")
		}
		return NewHTTPS(u.String(), timeout, skipVerify, s.conf.CaCerts, s.conf.TLS)
	}

	return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme)
}
// chanWriter sends WritePointsRequest to a PointsWriter received over a channel.
type chanWriter struct {
// writeRequests is the queue of pending requests, consumed by the Run goroutines.
writeRequests chan WriteRequest
// ctx is passed to every write; cancel cancels it.
ctx context.Context
cancel context.CancelFunc
// pw is the destination the queued requests are written to.
pw PointsWriter
// pointsWritten and failures point at shared counters and are updated atomically.
pointsWritten *int64
failures *int64
logger *zap.Logger
// queueSize is the total SizeOf() of the requests currently accounted to the
// queue; it is accessed atomically.
queueSize int64
// queueLimit is the maximum allowed queueSize; a value <= 0 disables the
// check. It is accessed atomically.
queueLimit int64
// wg tracks the Run goroutines started by newChanWriter.
wg sync.WaitGroup
}
// newChanWriter creates a chanWriter that forwards queued requests to sub and
// starts s.conf.WriteConcurrency goroutines running its Run loop. The queue
// holds up to s.conf.WriteBufferSize requests; the writer shares the
// service's logger and its points-written and write-failure counters.
func newChanWriter(s *Service, sub PointsWriter) *chanWriter {
	cw := new(chanWriter)
	cw.writeRequests = make(chan WriteRequest, s.conf.WriteBufferSize)
	cw.ctx, cw.cancel = context.WithCancel(context.Background())
	cw.pw = sub
	cw.pointsWritten = &s.stats.PointsWritten
	cw.failures = &s.stats.WriteFailures
	cw.logger = s.Logger

	for remaining := s.conf.WriteConcurrency; remaining > 0; remaining-- {
		cw.wg.Add(1)
		go func() {
			defer cw.wg.Done()
			cw.Run()
		}()
	}
	return cw
}
// Write is on the hot path for data ingest (to the whole database, not just subscriptions).
// Be extra careful about latency.
//
// Write never blocks. It reserves the request's size in queueSize and then
// tries to enqueue the request. The request is dropped, the failure counter is
// incremented and the reservation is released when either the reservation
// would exceed queueLimit (if a limit is set) or the queue channel is full.
func (c *chanWriter) Write(wr WriteRequest) {
	sz := int64(wr.SizeOf())
	newSize := atomic.AddInt64(&c.queueSize, sz)
	limit := atomic.LoadInt64(&c.queueLimit)

	// If we would add more size than we should hold, reject the write
	if limit > 0 && newSize > limit {
		atomic.AddInt64(&c.queueSize, -sz)
		atomic.AddInt64(c.failures, 1)
		return
	}

	// If the write queue is full, reject the write
	select {
	case c.writeRequests <- wr:
	default:
		// The request never entered the queue, so Run will never subtract its
		// size. Give the reservation back here; otherwise queueSize would only
		// grow and eventually every write would be rejected by the limit.
		atomic.AddInt64(&c.queueSize, -sz)
		atomic.AddInt64(c.failures, 1)
	}
}
// limitTo sets a new limit on the size of the queue.
// The limit is stored atomically and is consulted by Write for each new
// request; a limit <= 0 disables the size check there.
func (c *chanWriter) limitTo(newLimit int64) {
atomic.StoreInt64(&c.queueLimit, newLimit)
// We don't immediately evict things if the queue is over the limit,
// since they should be shortly evicted in normal operation.
}
// CancelAndClose closes the request queue, cancels the context that is passed
// to every write, and then blocks until all Run goroutines have returned.
// No Write may be issued to the chanWriter once this has been called, since
// the queue channel is closed.
func (c *chanWriter) CancelAndClose() {
// Closing the channel lets the Run loops end once the queue is drained.
close(c.writeRequests)
// NOTE(review): presumably this aborts in-flight writes early; that depends
// on the PointsWriter honoring ctx — confirm against the implementations.
c.cancel()
c.wg.Wait()
}
// Close closes the chanWriter. It blocks until all the in-flight write requests are finished.
// Unlike CancelAndClose it does not cancel the write context, so requests
// still in the queue are written before the Run goroutines exit.
func (c *chanWriter) Close() {
// Closing the channel lets the Run loops end once the queue is drained.
close(c.writeRequests)
c.wg.Wait()
}
// Run consumes requests from the queue until the queue channel is closed and
// drained, writing each one to the underlying PointsWriter. A failed write is
// logged and counted as one failure; a successful write adds the request's
// point count to the points-written counter. Either way the request's size is
// released from queueSize afterwards.
func (c *chanWriter) Run() {
	for wr := range c.writeRequests {
		if err := c.pw.WritePointsContext(c.ctx, wr); err == nil {
			atomic.AddInt64(c.pointsWritten, int64(len(wr.pointOffsets)))
		} else {
			c.logger.Info(err.Error())
			atomic.AddInt64(c.failures, 1)
		}
		released := int64(wr.SizeOf())
		atomic.AddInt64(&c.queueSize, -released)
	}
}
// Statistics returns statistics for periodic monitoring. It delegates to the
// underlying PointsWriter when that writer implements monitor.Reporter and
// returns an empty slice otherwise.
func (c *chanWriter) Statistics(tags map[string]string) []models.Statistic {
	reporter, ok := c.pw.(monitor.Reporter)
	if !ok {
		return []models.Statistic{}
	}
	return reporter.Statistics(tags)
}
// BalanceMode specifies what balance mode to use on a subscription.
// It decides whether a write goes to every destination or to one of them.
type BalanceMode int
const (
// ALL indicates to send writes to all subscriber destinations.
ALL BalanceMode = iota
// ANY indicates to send writes to a single subscriber destination, round robin.
ANY
)
// writerStats holds the per-destination counters of a balancewriter.
// The counters are read and updated with sync/atomic operations.
type writerStats struct {
// dest is the destination as given in the subscription definition.
dest string
// failures counts writes to this destination that returned an error.
failures int64
// pointsWritten counts points successfully written to this destination.
pointsWritten int64
}
// balancewriter balances writes across PointsWriters according to its
// BalanceMode. It implements PointsWriter and is safe for concurrent use,
// which is required because several chanWriter goroutines may write through
// the same instance at once.
type balancewriter struct {
	bm          BalanceMode
	writers     []PointsWriter
	stats       []writerStats
	defaultTags models.StatisticTags

	// mu guards i, the round-robin cursor used in ANY mode.
	mu sync.Mutex
	i  int
}

// nextWriter returns the index of the next destination in round-robin order
// and advances the cursor. It must only be called when b.writers is non-empty.
func (b *balancewriter) nextWriter() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	i := b.i
	b.i = (b.i + 1) % len(b.writers)
	return i
}

// WritePointsContext writes the request to the destinations.
//
// In ALL mode the request is written to every destination, in order. In ANY
// mode destinations are tried in round-robin order, continuing from where the
// previous call stopped, until one write succeeds or every destination has
// been tried once.
//
// Per-destination failure and points-written counters are updated for every
// attempt. The returned error is the last error seen, or nil if no attempt
// failed. Note that in ANY mode an error from an earlier attempt is still
// returned when a later destination accepted the write.
func (b *balancewriter) WritePointsContext(ctx context.Context, request WriteRequest) error {
	var lastErr error
	for n := range b.writers {
		// ALL visits each destination exactly once by position. Only ANY uses
		// the shared cursor, so concurrent ALL writes cannot make each other
		// skip or repeat a destination.
		i := n
		if b.bm == ANY {
			i = b.nextWriter()
		}

		// write points to destination.
		err := b.writers[i].WritePointsContext(ctx, request)
		if err != nil {
			lastErr = err
			atomic.AddInt64(&b.stats[i].failures, 1)
			continue
		}

		atomic.AddInt64(&b.stats[i].pointsWritten, int64(len(request.pointOffsets)))
		if b.bm == ANY {
			break
		}
	}
	return lastErr
}
// Statistics returns statistics for periodic monitoring: one entry per
// destination, tagged with the subscription's default tags merged with the
// given tags plus a "destination" tag, carrying that destination's
// points-written and write-failure counters.
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
	statistics := make([]models.Statistic, 0, len(b.stats))
	for i := range b.stats {
		dest := &b.stats[i]

		destTags := b.defaultTags.Merge(tags)
		destTags["destination"] = dest.dest

		values := map[string]interface{}{
			statPointsWritten: atomic.LoadInt64(&dest.pointsWritten),
			statWriteFailures: atomic.LoadInt64(&dest.failures),
		}
		statistics = append(statistics, models.Statistic{
			Name:   "subscriber",
			Tags:   destTags,
			Values: values,
		})
	}
	return statistics
}
// dbrp is a database / retention policy pair. The subscription router uses it
// as the key under which the writers for that pair are stored.
type dbrp struct {
// db is the database name.
db string
// rp is the retention policy name.
rp string
}
// subscriptionRouter has a mutex lock on the hot path for database writes - make sure that the lock is very tight.
// It maps each database / retention policy pair to the writers subscribed to it.
type subscriptionRouter struct {
// mu guards ready and m: Send takes the read lock, Update and Close the write lock.
mu sync.RWMutex
// ready is true from construction until Close; Send is a no-op once it is false.
ready bool
// m holds the writers for each database / retention policy pair; Update replaces it.
m map[dbrp][]*chanWriter
// writeFailures points at a shared counter; Send adds discarded invalid points to it atomically.
writeFailures *int64
Logger *zap.Logger
}
// newSubscriptionRouter returns a router that is ready for use, has no routes
// yet, logs to a no-op logger, and records write failures in the given
// statistics.
func newSubscriptionRouter(statistics *Statistics) *subscriptionRouter {
	router := new(subscriptionRouter)
	router.ready = true
	router.writeFailures = &statistics.WriteFailures
	router.Logger = zap.NewNop()
	return router
}
// Close marks the router as no longer ready. After Close, Send silently drops
// requests and Update panics. Calling Close more than once is harmless.
func (s *subscriptionRouter) Close() {
	s.mu.Lock()
	s.ready = false
	s.mu.Unlock()
}
// Send forwards a write request to every writer subscribed to the request's
// database and retention policy. The request is serialized only when at least
// one writer is subscribed; points discarded as invalid during serialization
// are added to the write-failure counter. Send does nothing after the router
// has been closed.
//
// The read lock is held for the whole call, so this must stay fast: it is on
// the hot path for database writes.
func (s *subscriptionRouter) Send(request *coordinator.WritePointsRequest) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if !s.ready {
		return
	}

	key := dbrp{db: request.Database, rp: request.RetentionPolicy}
	targets := s.m[key]
	if len(targets) == 0 {
		return
	}

	// serialize points and put on writer
	writeReq, numInvalid := NewWriteRequest(request, s.Logger)
	atomic.AddInt64(s.writeFailures, numInvalid)
	for i := range targets {
		targets[i].Write(writeReq)
	}
}
// Update replaces the routing table with one built from cws, grouping the
// writers by database and retention policy, and applies memoryLimit as the
// queue limit of every writer. It panics if the router has been closed.
func (s *subscriptionRouter) Update(cws map[subEntry]*chanWriter, memoryLimit int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.ready {
		panic("must be created with NewServer before calling update, must not call update after close")
	}

	routes := make(map[dbrp][]*chanWriter)
	for entry, writer := range cws {
		writer.limitTo(memoryLimit)
		target := dbrp{db: entry.db, rp: entry.rp}
		routes[target] = append(routes[target], writer)
	}
	s.m = routes
}