// Package subscriber implements the subscriber service // to forward incoming data to remote services. package subscriber // import "github.com/influxdata/influxdb/services/subscriber" import ( "context" "errors" "fmt" "net/url" "sync" "sync/atomic" "time" "unsafe" "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/services/meta" "go.uber.org/zap" ) // Statistics for the Subscriber service. const ( statCreateFailures = "createFailures" statPointsWritten = "pointsWritten" statWriteFailures = "writeFailures" statMemUsage = "memUsage" ) // WriteRequest is a parsed write request. type WriteRequest struct { Database string RetentionPolicy string // lineProtocol must be valid newline-separated line protocol. lineProtocol []byte // pointOffsets gives the starting index within lineProtocol of each point, // for splitting batches if required. pointOffsets []int } func NewWriteRequest(r *coordinator.WritePointsRequest, log *zap.Logger) (wr WriteRequest, numInvalid int64) { log = log.With(zap.String("database", r.Database), zap.String("retention_policy", r.RetentionPolicy)) // Pre-allocate at least smallPointSize bytes per point. const smallPointSize = 10 writeReq := WriteRequest{ Database: r.Database, RetentionPolicy: r.RetentionPolicy, pointOffsets: make([]int, 0, len(r.Points)), lineProtocol: make([]byte, 0, len(r.Points)*smallPointSize), } numInvalid = 0 for _, p := range r.Points { if err := models.ValidPointStrings(p); err != nil { log.Debug("discarding point", zap.Error(err)) numInvalid++ continue } // We are about to append a point of line protocol, so the new point's start index // is the current length. writeReq.pointOffsets = append(writeReq.pointOffsets, len(writeReq.lineProtocol)) // Append the new point and a newline writeReq.lineProtocol = p.AppendString(writeReq.lineProtocol) writeReq.lineProtocol = append(writeReq.lineProtocol, byte('\n')) } return writeReq, numInvalid } // PointAt uses pointOffsets to slice the lineProtocol buffer and retrieve the i_th point in the request. // It includes the trailing newline. func (w *WriteRequest) PointAt(i int) []byte { start := w.pointOffsets[i] // The end of the last point is the length of the buffer end := len(w.lineProtocol) // For points that are not the last point, the end is the start of the next point if i+1 < len(w.pointOffsets) { end = w.pointOffsets[i+1] } return w.lineProtocol[start:end] } func (w *WriteRequest) Length() int { return len(w.pointOffsets) } func (w *WriteRequest) SizeOf() int { const intSize = unsafe.Sizeof(w.pointOffsets[0]) return len(w.lineProtocol) + len(w.pointOffsets)*int(intSize) + len(w.Database) + len(w.RetentionPolicy) } // PointsWriter is an interface for writing points to a subscription destination. // Only WritePoints() needs to be satisfied. PointsWriter implementations // must be goroutine safe. type PointsWriter interface { WritePointsContext(ctx context.Context, request WriteRequest) (destination string, err error) } // subEntry is a unique set that identifies a given subscription. type subEntry struct { db string rp string name string } // Service manages forking the incoming data from InfluxDB // to defined third party destinations. // Subscriptions are defined per database and retention policy. type Service struct { MetaClient interface { Databases() []meta.DatabaseInfo WaitForDataChanged() chan struct{} } NewPointsWriter func(u url.URL) (PointsWriter, error) Logger *zap.Logger stats *Statistics wg sync.WaitGroup closing chan struct{} mu sync.Mutex conf Config subs map[subEntry]*chanWriter // subscriptionRouter is not locked by mu router *subscriptionRouter } // NewService returns a subscriber service with given settings func NewService(c Config) *Service { stats := &Statistics{} s := &Service{ Logger: zap.NewNop(), stats: stats, conf: c, router: newSubscriptionRouter(stats), } s.NewPointsWriter = s.newPointsWriter return s } // Open starts the subscription service. func (s *Service) Open() error { if !s.conf.Enabled { s.Logger.Info("Service is disabled") return nil // Service disabled. } err := func() error { s.mu.Lock() defer s.mu.Unlock() if s.MetaClient == nil { return errors.New("no meta store") } s.closing = make(chan struct{}) s.wg.Add(1) go func() { defer s.wg.Done() s.waitForMetaUpdates() }() return nil }() if err != nil { return err } // Create all subs with initial metadata s.updateSubs() s.Logger.Info("Opened service") return nil } // Close terminates the subscription service. // It will return an error if Open was not called first. func (s *Service) Close() error { // stop receiving new input s.router.Close() err := func() error { s.mu.Lock() defer s.mu.Unlock() if s.closing == nil { return fmt.Errorf("closing unopened subscription service") } select { case <-s.closing: // already closed return nil default: } close(s.closing) return nil }() if err != nil { return err } // Note this section is safe for concurrent calls to Close - both calls will wait for the exits, one caller // will win the right to close the channel writers, and the other will have to wait at the lock for that to finish. // When the second caller gets the lock subs is nil which is safe. // wait, not under the lock, for waitForMetaUpdates to finish gracefully s.wg.Wait() // close all the subscriptions s.mu.Lock() defer s.mu.Unlock() for _, cw := range s.subs { cw.Close() } s.subs = nil s.Logger.Info("Closed service") return nil } // WithLogger sets the logger on the service. func (s *Service) WithLogger(log *zap.Logger) { s.Logger = log.With(zap.String("service", "subscriber")) s.router.Logger = s.Logger } // Statistics maintains the statistics for the subscriber service. type Statistics struct { CreateFailures int64 PointsWritten int64 WriteFailures int64 } // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { statistics := []models.Statistic{{ Name: "subscriber", Tags: tags, Values: map[string]interface{}{ statCreateFailures: atomic.LoadInt64(&s.stats.CreateFailures), statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten), statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures), }, }} s.mu.Lock() defer s.mu.Unlock() totalSize := int64(0) for _, cw := range s.subs { statistics = append(statistics, cw.Statistics(tags)...) totalSize += atomic.LoadInt64(&cw.queueSize) } statistics[0].Values[statMemUsage] = totalSize return statistics } func (s *Service) waitForMetaUpdates() { ch := s.MetaClient.WaitForDataChanged() for { select { case <-ch: // ch is closed on changes, so fetch the new channel to wait on to ensure we don't miss a new // change while updating ch = s.MetaClient.WaitForDataChanged() s.updateSubs() case <-s.closing: return } } } func (s *Service) createSubscription(se subEntry, mode string, destinations []string) (PointsWriter, error) { var bm BalanceMode switch mode { case "ALL": bm = ALL case "ANY": bm = ANY default: return nil, fmt.Errorf("unknown balance mode %q", mode) } writers := make([]PointsWriter, 0, len(destinations)) stats := make([]writerStats, 0, len(destinations)) // add only valid destinations for _, dest := range destinations { u, err := url.Parse(dest) if err != nil { return nil, fmt.Errorf("failed to parse destination %q: %w", dest, err) } w, err := s.NewPointsWriter(*u) if err != nil { return nil, fmt.Errorf("failed to create writer for destination %q: %w", dest, err) } writers = append(writers, w) stats = append(stats, writerStats{dest: dest}) } return &balancewriter{ bm: bm, writers: writers, stats: stats, defaultTags: models.StatisticTags{ "database": se.db, "retention_policy": se.rp, "name": se.name, "mode": mode, }, }, nil } func (s *Service) Send(request *coordinator.WritePointsRequest) { s.router.Send(request) } func (s *Service) updateSubs() { s.mu.Lock() defer s.mu.Unlock() // check if we're closing while under the lock select { case <-s.closing: return default: } if s.subs == nil { s.subs = make(map[subEntry]*chanWriter) } dbis := s.MetaClient.Databases() allEntries := make(map[subEntry]bool) createdNew := false // Add in new subscriptions for _, dbi := range dbis { for _, rpi := range dbi.RetentionPolicies { for _, si := range rpi.Subscriptions { se := subEntry{ db: dbi.Name, rp: rpi.Name, name: si.Name, } allEntries[se] = true if _, ok := s.subs[se]; ok { continue } createdNew = true s.Logger.Info("Adding new subscription", logger.Database(se.db), logger.RetentionPolicy(se.rp)) sub, err := s.createSubscription(se, si.Mode, si.Destinations) if err != nil { atomic.AddInt64(&s.stats.CreateFailures, 1) s.Logger.Info("Subscription creation failed", zap.String("name", si.Name), zap.Error(err)) continue } s.subs[se] = newChanWriter(s, sub, se) s.Logger.Info("Added new subscription", logger.Database(se.db), logger.RetentionPolicy(se.rp)) } } } toClose := make(map[subEntry]*chanWriter) for se, cw := range s.subs { if !allEntries[se] { toClose[se] = cw delete(s.subs, se) } } if createdNew || len(toClose) > 0 { memoryLimit := int64(0) if s.conf.TotalBufferBytes != 0 { memoryLimit = int64(s.conf.TotalBufferBytes / len(s.subs)) if memoryLimit == 0 { memoryLimit = 1 } } // update the router before we close any subscriptions s.router.Update(s.subs, memoryLimit) } for se, cw := range toClose { s.Logger.Info("Deleting old subscription", logger.Database(se.db), logger.RetentionPolicy(se.rp)) cw.CancelAndClose() s.Logger.Info("Deleted old subscription", logger.Database(se.db), logger.RetentionPolicy(se.rp)) } } // newPointsWriter returns a new PointsWriter from the given URL. func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) { switch u.Scheme { case "udp": return NewUDP(u), nil case "http": return NewHTTP(u.String(), time.Duration(s.conf.HTTPTimeout)) case "https": if s.conf.InsecureSkipVerify { s.Logger.Warn("'insecure-skip-verify' is true. This will skip all certificate verifications.") } return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts, s.conf.TLS) default: return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme) } } // chanWriter sends WritePointsRequest to a PointsWriter received over a channel. type chanWriter struct { writeRequests chan WriteRequest ctx context.Context cancel context.CancelFunc pw PointsWriter pointsWritten *int64 failures *int64 logger *zap.Logger queueSize int64 queueLimit int64 wg sync.WaitGroup } func newChanWriter(s *Service, sub PointsWriter, se subEntry) *chanWriter { ctx, cancel := context.WithCancel(context.Background()) cw := &chanWriter{ writeRequests: make(chan WriteRequest, s.conf.WriteBufferSize), ctx: ctx, cancel: cancel, pw: sub, pointsWritten: &s.stats.PointsWritten, failures: &s.stats.WriteFailures, logger: s.Logger.With(zap.String("name", se.name), logger.Database(se.db), logger.RetentionPolicy(se.rp)), } for i := 0; i < s.conf.WriteConcurrency; i++ { cw.wg.Add(1) go func() { defer cw.wg.Done() cw.Run() }() } return cw } // Write is on the hot path for data ingest (to the whole database, not just subscriptions). // Be extra careful about latency. func (c *chanWriter) Write(wr WriteRequest) { sz := wr.SizeOf() newSize := atomic.AddInt64(&c.queueSize, int64(sz)) limit := atomic.LoadInt64(&c.queueLimit) // If we would add more size than we should hold, reject the write if limit > 0 && newSize > limit { atomic.AddInt64(c.failures, 1) atomic.AddInt64(&c.queueSize, -int64(sz)) c.logger.Warn("Write rejected, queue over byte limit") return } // If the write queue is full, reject the write select { case c.writeRequests <- wr: default: atomic.AddInt64(c.failures, 1) c.logger.Warn("Write rejected, queue full") } } // limitTo sets a new limit on the size of the queue. func (c *chanWriter) limitTo(newLimit int64) { atomic.StoreInt64(&c.queueLimit, newLimit) // We don't immediately evict things if the queue is over the limit, // since they should be shortly evicted in normal operation. } func (c *chanWriter) CancelAndClose() { close(c.writeRequests) c.cancel() c.wg.Wait() } // Close closes the chanWriter. It blocks until all the in-flight write requests are finished. func (c *chanWriter) Close() { close(c.writeRequests) c.wg.Wait() } func (c *chanWriter) Run() { for wr := range c.writeRequests { dest, err := c.pw.WritePointsContext(c.ctx, wr) if err != nil { c.logger.Warn("Write failed", zap.String("destination", dest), zap.Error(err)) atomic.AddInt64(c.failures, 1) } else { atomic.AddInt64(c.pointsWritten, int64(len(wr.pointOffsets))) } atomic.AddInt64(&c.queueSize, -int64(wr.SizeOf())) } } // Statistics returns statistics for periodic monitoring. func (c *chanWriter) Statistics(tags map[string]string) []models.Statistic { if m, ok := c.pw.(monitor.Reporter); ok { return m.Statistics(tags) } return []models.Statistic{} } // BalanceMode specifies what balance mode to use on a subscription. type BalanceMode int const ( // ALL indicates to send writes to all subscriber destinations. ALL BalanceMode = iota // ANY indicates to send writes to a single subscriber destination, round robin. ANY ) type writerStats struct { dest string failures int64 pointsWritten int64 } // balances writes across PointsWriters according to BalanceMode type balancewriter struct { bm BalanceMode writers []PointsWriter stats []writerStats defaultTags models.StatisticTags i int } func (b *balancewriter) WritePointsContext(ctx context.Context, request WriteRequest) (string, error) { var lastErr error var lastDest string for range b.writers { // round-robin through destinations. i := b.i w := b.writers[i] b.i = (b.i + 1) % len(b.writers) // write points to destination. dest, err := w.WritePointsContext(ctx, request) if err != nil { lastErr = err lastDest = dest atomic.AddInt64(&b.stats[i].failures, 1) } else { atomic.AddInt64(&b.stats[i].pointsWritten, int64(len(request.pointOffsets))) if b.bm == ANY { break } } } return lastDest, lastErr } // Statistics returns statistics for periodic monitoring. func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic { statistics := make([]models.Statistic, len(b.stats)) for i := range b.stats { subTags := b.defaultTags.Merge(tags) subTags["destination"] = b.stats[i].dest statistics[i] = models.Statistic{ Name: "subscriber", Tags: subTags, Values: map[string]interface{}{ statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten), statWriteFailures: atomic.LoadInt64(&b.stats[i].failures), }, } } return statistics } type dbrp struct { db string rp string } // subscriptionRouter has a mutex lock on the hot path for database writes - make sure that the lock is very tight. type subscriptionRouter struct { mu sync.RWMutex ready bool m map[dbrp][]*chanWriter writeFailures *int64 Logger *zap.Logger } func newSubscriptionRouter(statistics *Statistics) *subscriptionRouter { return &subscriptionRouter{ ready: true, writeFailures: &statistics.WriteFailures, Logger: zap.NewNop(), } } func (s *subscriptionRouter) Close() { s.mu.Lock() defer s.mu.Unlock() s.ready = false } func (s *subscriptionRouter) Send(request *coordinator.WritePointsRequest) { // serialize points and put on writer s.mu.RLock() defer s.mu.RUnlock() if !s.ready { return } writers := s.m[dbrp{ db: request.Database, rp: request.RetentionPolicy, }] if len(writers) == 0 { return } writeReq, numInvalid := NewWriteRequest(request, s.Logger) atomic.AddInt64(s.writeFailures, numInvalid) for _, w := range writers { w.Write(writeReq) } } func (s *subscriptionRouter) Update(cws map[subEntry]*chanWriter, memoryLimit int64) { s.mu.Lock() defer s.mu.Unlock() if !s.ready { panic("must be created with NewServer before calling update, must not call update after close") } s.m = make(map[dbrp][]*chanWriter) for se, cw := range cws { cw.limitTo(memoryLimit) key := dbrp{ db: se.db, rp: se.rp, } s.m[key] = append(s.m[key], cw) } }