diff --git a/internal/meta_client.go b/internal/meta_client.go index e87607ebcc..7ab28d815b 100644 --- a/internal/meta_client.go +++ b/internal/meta_client.go @@ -32,6 +32,8 @@ type MetaClientMock struct { OpenFn func() error + PruneShardGroupsFn func() error + RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error) AuthenticateFn func(username, password string) (ui meta.User, err error) @@ -164,3 +166,5 @@ func (c *MetaClientMock) Users() []meta.UserInfo { return c.Use func (c *MetaClientMock) Open() error { return c.OpenFn() } func (c *MetaClientMock) Data() meta.Data { return c.DataFn() } func (c *MetaClientMock) SetData(d *meta.Data) error { return c.SetDataFn(d) } + +func (c *MetaClientMock) PruneShardGroups() error { return c.PruneShardGroupsFn() } diff --git a/services/retention/service.go b/services/retention/service.go index 574367b5b6..58875ef4f2 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -22,9 +22,9 @@ type Service struct { DeleteShard(shardID uint64) error } - checkInterval time.Duration - wg sync.WaitGroup - done chan struct{} + config Config + wg sync.WaitGroup + done chan struct{} logger zap.Logger } @@ -32,15 +32,21 @@ type Service struct { // NewService returns a configured retention policy enforcement service. func NewService(c Config) *Service { return &Service{ - checkInterval: time.Duration(c.CheckInterval), - done: make(chan struct{}), - logger: zap.New(zap.NullEncoder()), + config: c, + logger: zap.New(zap.NullEncoder()), } } // Open starts retention policy enforcement. func (s *Service) Open() error { - s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.checkInterval)) + if !s.config.Enabled || s.done != nil { + return nil + } + + s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.config.CheckInterval)) + + s.done = make(chan struct{}) + s.wg.Add(2) go s.deleteShardGroups() go s.deleteShards() @@ -49,9 +55,15 @@ func (s *Service) Open() error { // Close stops retention policy enforcement. func (s *Service) Close() error { + if !s.config.Enabled || s.done == nil { + return nil + } + s.logger.Info("retention policy enforcement terminating") close(s.done) + s.wg.Wait() + s.done = nil return nil } @@ -63,7 +75,7 @@ func (s *Service) WithLogger(log zap.Logger) { func (s *Service) deleteShardGroups() { defer s.wg.Done() - ticker := time.NewTicker(s.checkInterval) + ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) defer ticker.Stop() for { select { @@ -92,7 +104,7 @@ func (s *Service) deleteShardGroups() { func (s *Service) deleteShards() { defer s.wg.Done() - ticker := time.NewTicker(s.checkInterval) + ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) defer ticker.Stop() for { select { diff --git a/services/retention/service_test.go b/services/retention/service_test.go new file mode 100644 index 0000000000..f348c18a56 --- /dev/null +++ b/services/retention/service_test.go @@ -0,0 +1,78 @@ +package retention_test + +import ( + "bytes" + "testing" + + "github.com/influxdata/influxdb/internal" + "github.com/influxdata/influxdb/services/retention" + "github.com/uber-go/zap" +) + +func TestService_OpenDisabled(t *testing.T) { + // Opening a disabled service should be a no-op. + c := retention.NewConfig() + c.Enabled = false + s := NewService(c) + + if err := s.Open(); err != nil { + t.Fatal(err) + } + + if s.LogBuf.String() != "" { + t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String()) + } +} + +func TestService_OpenClose(t *testing.T) { + // Opening a disabled service should be a no-op. + s := NewService(retention.NewConfig()) + + if err := s.Open(); err != nil { + t.Fatal(err) + } + + if s.LogBuf.String() == "" { + t.Fatal("service didn't log anything on open") + } + + // Reopening is a no-op + if err := s.Open(); err != nil { + t.Fatal(err) + } + + if err := s.Close(); err != nil { + t.Fatal(err) + } + + // Re-closing is a no-op + if err := s.Close(); err != nil { + t.Fatal(err) + } +} + +type Service struct { + MetaClient *internal.MetaClientMock + TSDBStore *internal.TSDBStoreMock + + LogBuf bytes.Buffer + *retention.Service +} + +func NewService(c retention.Config) *Service { + s := &Service{ + MetaClient: &internal.MetaClientMock{}, + TSDBStore: &internal.TSDBStoreMock{}, + Service: retention.NewService(c), + } + + l := zap.New( + zap.NewTextEncoder(), + zap.Output(zap.AddSync(&s.LogBuf)), + ) + s.WithLogger(l) + + s.Service.MetaClient = s.MetaClient + s.Service.TSDBStore = s.TSDBStore + return s +}