Add wait group to hinted handoff service
parent
4ec77d8b84
commit
2ccf97e6a0
|
@ -15,6 +15,7 @@ var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")
|
|||
|
||||
type Service struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
closing chan struct{}
|
||||
|
||||
Logger *log.Logger
|
||||
|
@ -56,6 +57,7 @@ func (s *Service) Open() error {
|
|||
|
||||
s.closing = make(chan struct{})
|
||||
|
||||
s.wg.Add(2)
|
||||
go s.retryWrites()
|
||||
go s.expireWrites()
|
||||
|
||||
|
@ -69,6 +71,7 @@ func (s *Service) Close() error {
|
|||
if s.closing != nil {
|
||||
close(s.closing)
|
||||
}
|
||||
s.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -82,6 +85,7 @@ func (s *Service) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error
|
|||
}
|
||||
|
||||
func (s *Service) retryWrites() {
|
||||
defer s.wg.Done()
|
||||
ticker := time.NewTicker(time.Duration(s.cfg.RetryInterval))
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
@ -99,6 +103,7 @@ func (s *Service) retryWrites() {
|
|||
// expireWrites will cause the handoff queues to remove writes that are older
|
||||
// than the configured threshold
|
||||
func (s *Service) expireWrites() {
|
||||
defer s.wg.Done()
|
||||
ticker := time.NewTicker(time.Hour)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
|
Loading…
Reference in New Issue