writes happen concurrently via chanWriter
parent
663373c4e1
commit
a7cac1337b
|
@ -31,8 +31,8 @@ func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error) {
|
|||
Database: p.Database,
|
||||
RetentionPolicy: p.RetentionPolicy,
|
||||
})
|
||||
for _, p := range p.Points {
|
||||
bp.AddPoint(client.NewPointFrom(p))
|
||||
for _, pt := range p.Points {
|
||||
bp.AddPoint(client.NewPointFrom(pt))
|
||||
}
|
||||
err = h.c.Write(bp)
|
||||
return
|
||||
|
|
|
@ -54,18 +54,22 @@ type Service struct {
|
|||
closing chan struct{}
|
||||
mu sync.Mutex
|
||||
conf Config
|
||||
|
||||
failures *expvar.Int
|
||||
}
|
||||
|
||||
// NewService returns a subscriber service with given settings
|
||||
func NewService(c Config) *Service {
|
||||
s := &Service{
|
||||
Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
|
||||
statMap: influxdb.NewStatistics("subscriber", "subscriber", nil),
|
||||
points: make(chan *coordinator.WritePointsRequest, 100),
|
||||
closed: true,
|
||||
conf: c,
|
||||
Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
|
||||
statMap: influxdb.NewStatistics("subscriber", "subscriber", nil),
|
||||
points: make(chan *coordinator.WritePointsRequest, 100),
|
||||
closed: true,
|
||||
conf: c,
|
||||
failures: &expvar.Int{},
|
||||
}
|
||||
s.NewPointsWriter = s.newPointsWriter
|
||||
s.statMap.Set(statWriteFailures, s.failures)
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -83,14 +87,21 @@ func (s *Service) Open() error {
|
|||
s.update = make(chan struct{})
|
||||
|
||||
s.wg.Add(2)
|
||||
go s.run()
|
||||
go s.waitForMetaUpdates()
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.run()
|
||||
}()
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.waitForMetaUpdates()
|
||||
}()
|
||||
|
||||
s.Logger.Println("opened service")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close terminates the subscription service
|
||||
// Will panic if called multiple times or without first opening the service.
|
||||
func (s *Service) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -111,7 +122,6 @@ func (s *Service) SetLogOutput(w io.Writer) {
|
|||
}
|
||||
|
||||
func (s *Service) waitForMetaUpdates() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
ch := s.MetaClient.WaitForDataChanged()
|
||||
select {
|
||||
|
@ -183,28 +193,30 @@ func (s *Service) Points() chan<- *coordinator.WritePointsRequest {
|
|||
|
||||
// read points off chan and write them
|
||||
func (s *Service) run() {
|
||||
defer s.wg.Done()
|
||||
subs := make(map[subEntry]PointsWriter)
|
||||
var wg sync.WaitGroup
|
||||
subs := make(map[subEntry]chanWriter)
|
||||
// Perform initial update
|
||||
s.updateSubs(subs)
|
||||
s.updateSubs(subs, &wg)
|
||||
for {
|
||||
select {
|
||||
case <-s.update:
|
||||
err := s.updateSubs(subs)
|
||||
err := s.updateSubs(subs, &wg)
|
||||
if err != nil {
|
||||
s.Logger.Println("failed to update subscriptions:", err)
|
||||
}
|
||||
case p, ok := <-s.points:
|
||||
if !ok {
|
||||
// Close out all chanWriters
|
||||
for _, cw := range subs {
|
||||
cw.Close()
|
||||
}
|
||||
// Wait for them to finish
|
||||
wg.Wait()
|
||||
return
|
||||
}
|
||||
for se, sub := range subs {
|
||||
for se, cw := range subs {
|
||||
if p.Database == se.db && p.RetentionPolicy == se.rp {
|
||||
err := sub.WritePoints(p)
|
||||
if err != nil {
|
||||
s.Logger.Println(err)
|
||||
s.statMap.Add(statWriteFailures, 1)
|
||||
}
|
||||
cw.writeRequests <- p
|
||||
}
|
||||
}
|
||||
s.statMap.Add(statPointsWritten, int64(len(p.Points)))
|
||||
|
@ -212,7 +224,7 @@ func (s *Service) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error {
|
||||
func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) error {
|
||||
dbis := s.MetaClient.Databases()
|
||||
allEntries := make(map[subEntry]bool, 0)
|
||||
// Add in new subscriptions
|
||||
|
@ -232,7 +244,18 @@ func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
subs[se] = sub
|
||||
cw := chanWriter{
|
||||
writeRequests: make(chan *coordinator.WritePointsRequest),
|
||||
pw: sub,
|
||||
failures: s.failures,
|
||||
logger: s.Logger,
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
cw.Run()
|
||||
}()
|
||||
subs[se] = cw
|
||||
s.Logger.Println("added new subscription for", se.db, se.rp)
|
||||
}
|
||||
}
|
||||
|
@ -241,6 +264,10 @@ func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error {
|
|||
// Remove deleted subs
|
||||
for se := range subs {
|
||||
if !allEntries[se] {
|
||||
// Close the chanWriter
|
||||
subs[se].Close()
|
||||
|
||||
// Remove it from the set
|
||||
delete(subs, se)
|
||||
s.Logger.Println("deleted old subscription for", se.db, se.rp)
|
||||
}
|
||||
|
@ -261,6 +288,29 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Sends WritePointsRequest to a PointsWriter received over a channel.
|
||||
type chanWriter struct {
|
||||
writeRequests chan *coordinator.WritePointsRequest
|
||||
pw PointsWriter
|
||||
failures *expvar.Int
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// Close the chanWriter
|
||||
func (c chanWriter) Close() {
|
||||
close(c.writeRequests)
|
||||
}
|
||||
|
||||
func (c chanWriter) Run() {
|
||||
for wr := range c.writeRequests {
|
||||
err := c.pw.WritePoints(wr)
|
||||
if err != nil {
|
||||
c.logger.Println(err)
|
||||
c.failures.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BalanceMode sets what balance mode to use on a subscription.
|
||||
// valid options are currently ALL or ANY
|
||||
type BalanceMode int
|
||||
|
|
Loading…
Reference in New Issue