Remove direct Subscriber dependency from PointsWriter
parent
1d8fa27272
commit
71579fcbe9
|
@ -177,7 +177,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
s.PointsWriter = coordinator.NewPointsWriter()
|
||||
s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
|
||||
s.PointsWriter.TSDBStore = s.TSDBStore
|
||||
s.PointsWriter.Subscriber = s.Subscriber
|
||||
s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())
|
||||
|
||||
// Initialize query executor.
|
||||
s.QueryExecutor = influxql.NewQueryExecutor()
|
||||
|
|
|
@ -60,9 +60,6 @@ type PointsWriter struct {
|
|||
WriteToShard(shardID uint64, points []models.Point) error
|
||||
}
|
||||
|
||||
Subscriber interface {
|
||||
Points() chan<- *WritePointsRequest
|
||||
}
|
||||
subPoints []chan<- *WritePointsRequest
|
||||
|
||||
stats *WriteStatistics
|
||||
|
@ -127,9 +124,6 @@ func (w *PointsWriter) Open() error {
|
|||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
w.closing = make(chan struct{})
|
||||
if w.Subscriber != nil {
|
||||
w.AddWriteSubscriber(w.Subscriber.Points())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -315,23 +309,26 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
|
|||
}
|
||||
|
||||
// Send points to subscriptions if possible.
|
||||
ok := false
|
||||
var ok, dropped int64
|
||||
pts := &WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points}
|
||||
// We need to lock just in case the channel is about to be nil'ed
|
||||
w.mu.RLock()
|
||||
for _, ch := range w.subPoints {
|
||||
select {
|
||||
case ch <- pts:
|
||||
ok = true
|
||||
ok++
|
||||
default:
|
||||
dropped++
|
||||
}
|
||||
}
|
||||
w.mu.RUnlock()
|
||||
|
||||
if ok {
|
||||
atomic.AddInt64(&w.stats.SubWriteOK, 1)
|
||||
} else {
|
||||
atomic.AddInt64(&w.stats.SubWriteDrop, 1)
|
||||
if ok > 0 {
|
||||
atomic.AddInt64(&w.stats.SubWriteOK, ok)
|
||||
}
|
||||
|
||||
if dropped > 0 {
|
||||
atomic.AddInt64(&w.stats.SubWriteDrop, dropped)
|
||||
}
|
||||
|
||||
if err == nil && len(shardMappings.Dropped) > 0 {
|
||||
|
|
|
@ -340,7 +340,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
|
|||
c := coordinator.NewPointsWriter()
|
||||
c.MetaClient = ms
|
||||
c.TSDBStore = store
|
||||
c.Subscriber = sub
|
||||
c.AddWriteSubscriber(sub.Points())
|
||||
c.Node = &influxdb.Node{ID: 1}
|
||||
|
||||
c.Open()
|
||||
|
@ -416,7 +416,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
|
|||
c := coordinator.NewPointsWriter()
|
||||
c.MetaClient = ms
|
||||
c.TSDBStore = store
|
||||
c.Subscriber = sub
|
||||
c.AddWriteSubscriber(sub.Points())
|
||||
c.Node = &influxdb.Node{ID: 1}
|
||||
|
||||
c.Open()
|
||||
|
|
Loading…
Reference in New Issue