PointsWriter now drops in-flight writes to the subscriber service when it is closed

pull/4686/head
Nathaniel Cook 2015-11-05 15:27:00 -07:00
parent 780df574bb
commit 1719a6107c
5 changed files with 73 additions and 14 deletions

CHANGELOG.md

@@ -97,6 +97,7 @@
 - [#4625](https://github.com/influxdb/influxdb/pull/4625): Correctly handle bad write requests. Thanks @oiooj.
 - [#4650](https://github.com/influxdb/influxdb/issues/4650): Importer should skip empty lines
 - [#4651](https://github.com/influxdb/influxdb/issues/4651): Importer doesn't flush out last batch
+- [#4602](https://github.com/influxdb/influxdb/issues/4602): Fixes data race between PointsWriter and Subscriber services.
 
 ## v0.9.4 [2015-09-14]

cluster/points_writer.go

@@ -112,6 +112,7 @@ type PointsWriter struct {
 	Subscriber interface {
 		Points() chan<- *WritePointsRequest
 	}
+	subPoints chan<- *WritePointsRequest
 
 	statMap *expvar.Map
 }
@@ -155,8 +156,9 @@ func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
 func (w *PointsWriter) Open() error {
 	w.mu.Lock()
 	defer w.mu.Unlock()
-	if w.closing == nil {
-		w.closing = make(chan struct{})
-	}
+	w.closing = make(chan struct{})
+	if w.Subscriber != nil {
+		w.subPoints = w.Subscriber.Points()
+	}
 	return nil
 }
@@ -167,7 +169,12 @@ func (w *PointsWriter) Close() error {
 	defer w.mu.Unlock()
 	if w.closing != nil {
 		close(w.closing)
-		w.closing = nil
 	}
+	if w.subPoints != nil {
+		// 'nil' channels always block so this makes the
+		// select statement in WritePoints hit its default case,
+		// dropping any in-flight writes.
+		w.subPoints = nil
+	}
 	return nil
 }
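
The comment above leans on a Go language rule: a send on a nil channel blocks forever, so inside a select that has a default case it can never be chosen. A minimal standalone sketch of that behavior (not part of the commit):

```go
package main

import "fmt"

func main() {
	var ch chan<- int // a nil channel; sends on it block forever

	select {
	case ch <- 1:
		// Never taken: a send on a nil channel can never proceed.
		fmt.Println("sent")
	default:
		// A select with a default case never blocks,
		// so the write is silently dropped instead.
		fmt.Println("dropped")
	}
}
```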
@@ -252,13 +259,19 @@ func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
 	}
 
 	// Send points to subscriptions if possible.
 	if w.Subscriber != nil {
-		select {
-		case w.Subscriber.Points() <- p:
-			w.statMap.Add(statSubWriteOK, 1)
-		default:
-			w.statMap.Add(statSubWriteDrop, 1)
-		}
+		ok := false
+		// We need to lock just in case the channel is about to be nil'ed
+		w.mu.RLock()
+		select {
+		case w.subPoints <- p:
+			ok = true
+		default:
+		}
+		w.mu.RUnlock()
+		if ok {
+			w.statMap.Add(statSubWriteOK, 1)
+		} else {
+			w.statMap.Add(statSubWriteDrop, 1)
+		}
 	}
 
 	for range shardMappings.Points {
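
Taken together, the three hunks above implement one pattern: a non-blocking send guarded by an RWMutex, where Close takes the write lock and nils the channel so that any later or in-flight send falls through to the default case. Here is a self-contained sketch of that pattern, using hypothetical names (writer, send, out) rather than the commit's own:

```go
package main

import (
	"fmt"
	"sync"
)

type writer struct {
	mu  sync.RWMutex
	out chan<- string
}

func (w *writer) send(msg string) bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	select {
	case w.out <- msg: // blocks forever if out is nil, so default wins
		return true
	default:
		return false
	}
}

func (w *writer) close() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.out = nil // subsequent sends hit the default case and are dropped
}

func main() {
	ch := make(chan string, 1)
	w := &writer{out: ch}
	fmt.Println(w.send("a")) // true: the buffered send succeeds
	fmt.Println(w.send("b")) // false: buffer full, write dropped
	w.close()
	fmt.Println(w.send("c")) // false: channel nil'ed, write dropped
}
```

Because send only takes the read lock, concurrent writers do not serialize against each other; they only exclude close's write lock, which is what removes the race without making the hot write path contend on a full mutex.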

cluster/points_writer_test.go

@@ -322,6 +322,9 @@ func TestPointsWriter_WritePoints(t *testing.T) {
 		c.HintedHandoff = hh
+		c.Subscriber = sub
+		c.Open()
+		defer c.Close()
 
 		err := c.WritePoints(pr)
 		if err == nil && test.expErr != nil {
 			t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)

cmd/influxd/run/server.go

@@ -385,10 +385,6 @@ func (s *Server) Open() error {
 		// Wait for the store to initialize.
 		<-s.MetaStore.Ready()
 
-		if err := s.Monitor.Open(); err != nil {
-			return fmt.Errorf("open monitor: %v", err)
-		}
-
 		// Open TSDB store.
 		if err := s.TSDBStore.Open(); err != nil {
 			return fmt.Errorf("open tsdb store: %s", err)
@@ -404,6 +400,16 @@
 			return fmt.Errorf("open subscriber: %s", err)
 		}
 
+		// Open the points writer service
+		if err := s.PointsWriter.Open(); err != nil {
+			return fmt.Errorf("open points writer: %s", err)
+		}
+
+		// Open the monitor service
+		if err := s.Monitor.Open(); err != nil {
+			return fmt.Errorf("open monitor: %v", err)
+		}
+
 		for _, service := range s.Services {
 			if err := service.Open(); err != nil {
 				return fmt.Errorf("open service: %s", err)
@@ -444,6 +450,10 @@
 		s.Monitor.Close()
 	}
 
+	if s.PointsWriter != nil {
+		s.PointsWriter.Close()
+	}
+
 	if s.HintedHandoff != nil {
 		s.HintedHandoff.Close()
 	}

cmd/influxd/run/server_test.go

@@ -8,6 +8,8 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	"github.com/influxdb/influxdb/cluster"
 )
 
 // Ensure that HTTP responses include the InfluxDB version.
@@ -5294,3 +5296,33 @@ func TestServer_Query_IntoTarget(t *testing.T) {
 		}
 	}
 }
+
+// This test reproduces a data race between closing the
+// Subscriber points channel and writes in-flight in the PointsWriter.
+func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
+	t.Parallel()
+	s := OpenDefaultServer(NewConfig(), "")
+	defer s.Close()
+
+	// goroutine to write points
+	done := make(chan struct{})
+	go func() {
+		for {
+			select {
+			case <-done:
+				return
+			default:
+				wpr := &cluster.WritePointsRequest{
+					Database:        "db0",
+					RetentionPolicy: "rp0",
+				}
+				s.PointsWriter.WritePoints(wpr)
+			}
+		}
+	}()
+
+	time.Sleep(10 * time.Millisecond)
+
+	close(done)
+	// Race occurs on s.Close()
+}
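
To observe the race this test guards against, one would run it under Go's race detector against the pre-fix code, with something like go test -race -run TestServer_ConcurrentPointsWriter_Subscriber ./cmd/influxd/run (the package path is assumed from the file layout above). Without the locking in WritePoints, the detector flags the concurrent write to subPoints during s.Close().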