From ea21588b9efc8056189fba008a575767db3cfee2 Mon Sep 17 00:00:00 2001
From: Jason Wilder <mail@jasonwilder.com>
Date: Tue, 4 Oct 2016 13:44:56 -0600
Subject: [PATCH] Fix subscriber service dropping writes under high write load

The subscriber write goroutine would drop points if the write load
was higher than it could process.  This could happen with a just
a few writers to the server.

Instead, process the channel with multiple writers to avoid dropping
writes so easily.  This also adds some config options to control how
large the channel buffer is as well as how many goroutines are started.

Fixes #7330
---
 CHANGELOG.md                   |  1 +
 etc/config.sample.toml         |  8 +++++---
 services/subscriber/config.go  | 25 ++++++++++++++++++++++---
 services/subscriber/service.go | 17 ++++++++++-------
 4 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e984055b66..b94469db98 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 - [#5878](https://github.com/influxdata/influxdb/issues/5878): Ensure correct shard groups created when retention policy has been altered.
 - [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers
 - [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions
+- [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load
 
 ## v1.0.1 [2016-09-26]
 
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index b99d0ae478..59665187a9 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -49,8 +49,8 @@ reporting-disabled = false
   # These are the WAL settings for the storage engine >= 0.9.3
   wal-dir = "/var/lib/influxdb/wal"
   wal-logging-enabled = true
-  
-  # Trace logging provides more verbose output around the tsm engine. Turning 
+
+  # Trace logging provides more verbose output around the tsm engine. Turning
   # this on can provide more useful output for debugging tsm engine issues.
   # trace-logging-enabled = false
 
@@ -182,7 +182,9 @@ reporting-disabled = false
 
 [subscriber]
   enabled = true
-  http-timeout = "30s"
+  # http-timeout = "30s"
+  # write-concurrency = 40
+  # write-buffer-size = 1000
 
 
 ###
diff --git a/services/subscriber/config.go b/services/subscriber/config.go
index ff92077c2e..e9402629c6 100644
--- a/services/subscriber/config.go
+++ b/services/subscriber/config.go
@@ -8,7 +8,9 @@ import (
 )
 
 const (
-	DefaultHTTPTimeout = 30 * time.Second
+	DefaultHTTPTimeout      = 30 * time.Second
+	DefaultWriteConcurrency = 40
+	DefaultWriteBufferSize  = 1000
 )
 
 // Config represents a configuration of the subscriber service.
@@ -17,13 +19,21 @@ type Config struct {
 	Enabled bool `toml:"enabled"`
 
 	HTTPTimeout toml.Duration `toml:"http-timeout"`
+
+	// The number of writer goroutines processing the write channel.
+	WriteConcurrency int `toml:"write-concurrency"`
+
+	// The number of in-flight writes buffered in the write channel.
+	WriteBufferSize int `toml:"write-buffer-size"`
 }
 
 // NewConfig returns a new instance of a subscriber config.
 func NewConfig() Config {
 	return Config{
-		Enabled:     true,
-		HTTPTimeout: toml.Duration(DefaultHTTPTimeout),
+		Enabled:          true,
+		HTTPTimeout:      toml.Duration(DefaultHTTPTimeout),
+		WriteConcurrency: DefaultWriteConcurrency,
+		WriteBufferSize:  DefaultWriteBufferSize,
 	}
 }
 
@@ -31,5 +41,14 @@ func (c Config) Validate() error {
 	if c.HTTPTimeout <= 0 {
 		return errors.New("http-timeout must be greater than 0")
 	}
+
+	if c.WriteBufferSize <= 0 {
+		return errors.New("write-buffer-size must be greater than 0")
+	}
+
+	if c.WriteConcurrency <= 0 {
+		return errors.New("write-concurrency must be greater than 0")
+	}
+
 	return nil
 }
diff --git a/services/subscriber/service.go b/services/subscriber/service.go
index fae321f2cf..4a5615fb99 100644
--- a/services/subscriber/service.go
+++ b/services/subscriber/service.go
@@ -24,7 +24,8 @@ const (
 )
 
 // PointsWriter is an interface for writing points to a subscription destination.
-// Only WritePoints() needs to be satisfied.
+// Only WritePoints() needs to be satisfied.  PointsWriter implementations
+// must be goroutine safe.
 type PointsWriter interface {
 	WritePoints(p *coordinator.WritePointsRequest) error
 }
@@ -287,17 +288,19 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error {
 					return err
 				}
 				cw := chanWriter{
-					writeRequests: make(chan *coordinator.WritePointsRequest, 100),
+					writeRequests: make(chan *coordinator.WritePointsRequest, s.conf.WriteBufferSize),
 					pw:            sub,
 					pointsWritten: &s.stats.PointsWritten,
 					failures:      &s.stats.WriteFailures,
 					logger:        s.Logger,
 				}
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					cw.Run()
-				}()
+				for i := 0; i < s.conf.WriteConcurrency; i++ {
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						cw.Run()
+					}()
+				}
 				s.subs[se] = cw
 				s.Logger.Println("added new subscription for", se.db, se.rp)
 			}