Add tests to Retention service
parent
a37fca1199
commit
1629ec7f5f
|
@ -32,6 +32,8 @@ type MetaClientMock struct {
|
||||||
|
|
||||||
OpenFn func() error
|
OpenFn func() error
|
||||||
|
|
||||||
|
PruneShardGroupsFn func() error
|
||||||
|
|
||||||
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
|
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
|
||||||
|
|
||||||
AuthenticateFn func(username, password string) (ui meta.User, err error)
|
AuthenticateFn func(username, password string) (ui meta.User, err error)
|
||||||
|
@ -164,3 +166,5 @@ func (c *MetaClientMock) Users() []meta.UserInfo { return c.Use
|
||||||
func (c *MetaClientMock) Open() error { return c.OpenFn() }
|
func (c *MetaClientMock) Open() error { return c.OpenFn() }
|
||||||
func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }
|
func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }
|
||||||
func (c *MetaClientMock) SetData(d *meta.Data) error { return c.SetDataFn(d) }
|
func (c *MetaClientMock) SetData(d *meta.Data) error { return c.SetDataFn(d) }
|
||||||
|
|
||||||
|
func (c *MetaClientMock) PruneShardGroups() error { return c.PruneShardGroupsFn() }
|
||||||
|
|
|
@ -22,7 +22,7 @@ type Service struct {
|
||||||
DeleteShard(shardID uint64) error
|
DeleteShard(shardID uint64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
checkInterval time.Duration
|
config Config
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
|
||||||
|
@ -32,15 +32,21 @@ type Service struct {
|
||||||
// NewService returns a configured retention policy enforcement service.
|
// NewService returns a configured retention policy enforcement service.
|
||||||
func NewService(c Config) *Service {
|
func NewService(c Config) *Service {
|
||||||
return &Service{
|
return &Service{
|
||||||
checkInterval: time.Duration(c.CheckInterval),
|
config: c,
|
||||||
done: make(chan struct{}),
|
|
||||||
logger: zap.New(zap.NullEncoder()),
|
logger: zap.New(zap.NullEncoder()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open starts retention policy enforcement.
|
// Open starts retention policy enforcement.
|
||||||
func (s *Service) Open() error {
|
func (s *Service) Open() error {
|
||||||
s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.checkInterval))
|
if !s.config.Enabled || s.done != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.config.CheckInterval))
|
||||||
|
|
||||||
|
s.done = make(chan struct{})
|
||||||
|
|
||||||
s.wg.Add(2)
|
s.wg.Add(2)
|
||||||
go s.deleteShardGroups()
|
go s.deleteShardGroups()
|
||||||
go s.deleteShards()
|
go s.deleteShards()
|
||||||
|
@ -49,9 +55,15 @@ func (s *Service) Open() error {
|
||||||
|
|
||||||
// Close stops retention policy enforcement.
|
// Close stops retention policy enforcement.
|
||||||
func (s *Service) Close() error {
|
func (s *Service) Close() error {
|
||||||
|
if !s.config.Enabled || s.done == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
s.logger.Info("retention policy enforcement terminating")
|
s.logger.Info("retention policy enforcement terminating")
|
||||||
close(s.done)
|
close(s.done)
|
||||||
|
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
s.done = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +75,7 @@ func (s *Service) WithLogger(log zap.Logger) {
|
||||||
func (s *Service) deleteShardGroups() {
|
func (s *Service) deleteShardGroups() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
ticker := time.NewTicker(s.checkInterval)
|
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -92,7 +104,7 @@ func (s *Service) deleteShardGroups() {
|
||||||
func (s *Service) deleteShards() {
|
func (s *Service) deleteShards() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
ticker := time.NewTicker(s.checkInterval)
|
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
package retention_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/internal"
|
||||||
|
"github.com/influxdata/influxdb/services/retention"
|
||||||
|
"github.com/uber-go/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestService_OpenDisabled(t *testing.T) {
|
||||||
|
// Opening a disabled service should be a no-op.
|
||||||
|
c := retention.NewConfig()
|
||||||
|
c.Enabled = false
|
||||||
|
s := NewService(c)
|
||||||
|
|
||||||
|
if err := s.Open(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.LogBuf.String() != "" {
|
||||||
|
t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestService_OpenClose(t *testing.T) {
|
||||||
|
// Opening a disabled service should be a no-op.
|
||||||
|
s := NewService(retention.NewConfig())
|
||||||
|
|
||||||
|
if err := s.Open(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.LogBuf.String() == "" {
|
||||||
|
t.Fatal("service didn't log anything on open")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reopening is a no-op
|
||||||
|
if err := s.Open(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-closing is a no-op
|
||||||
|
if err := s.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Service struct {
|
||||||
|
MetaClient *internal.MetaClientMock
|
||||||
|
TSDBStore *internal.TSDBStoreMock
|
||||||
|
|
||||||
|
LogBuf bytes.Buffer
|
||||||
|
*retention.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(c retention.Config) *Service {
|
||||||
|
s := &Service{
|
||||||
|
MetaClient: &internal.MetaClientMock{},
|
||||||
|
TSDBStore: &internal.TSDBStoreMock{},
|
||||||
|
Service: retention.NewService(c),
|
||||||
|
}
|
||||||
|
|
||||||
|
l := zap.New(
|
||||||
|
zap.NewTextEncoder(),
|
||||||
|
zap.Output(zap.AddSync(&s.LogBuf)),
|
||||||
|
)
|
||||||
|
s.WithLogger(l)
|
||||||
|
|
||||||
|
s.Service.MetaClient = s.MetaClient
|
||||||
|
s.Service.TSDBStore = s.TSDBStore
|
||||||
|
return s
|
||||||
|
}
|
Loading…
Reference in New Issue