fix: Improvements in response to PR feedback

* Pass context.Context to Service.Open
* Remove redundant comments
* Bind to the retention.Config configuration struct to be consistent with 1.x
pull/19525/head
Stuart Carnie 2020-09-09 09:48:47 -07:00
parent ab31e7763b
commit e265f60b55
6 changed files with 30 additions and 36 deletions

View File

@ -436,7 +436,7 @@ func launcherOpts(l *Launcher) []cli.Opt {
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
{
DestP: &l.StorageConfig.RetentionInterval,
DestP: &l.StorageConfig.RetentionService.CheckInterval,
Flag: "storage-retention-check-interval",
Desc: "The interval of time when retention policy enforcement checks run.",
},

View File

@ -1,33 +1,21 @@
package storage
import (
"time"
"github.com/influxdata/influxdb/v2/toml"
"github.com/influxdata/influxdb/v2/tsdb"
)
// Default configuration values.
const (
DefaultRetentionInterval = time.Hour
DefaultSeriesFileDirectoryName = "_series"
DefaultIndexDirectoryName = "index"
DefaultWALDirectoryName = "wal"
DefaultEngineDirectoryName = "data"
"github.com/influxdata/influxdb/v2/v1/services/retention"
)
// Config holds the configuration for an Engine.
type Config struct {
Data tsdb.Config
// Frequency of retention in seconds.
RetentionInterval toml.Duration `toml:"retention-interval"`
RetentionService retention.Config
}
// NewConfig initialises a new config for an Engine.
func NewConfig() Config {
return Config{
Data: tsdb.NewConfig(),
RetentionInterval: toml.Duration(DefaultRetentionInterval),
Data: tsdb.NewConfig(),
RetentionService: retention.NewConfig(),
}
}

View File

@ -111,7 +111,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
pw.MetaClient = e.metaClient
e.pointsWriter = pw
e.retentionService = retention.NewService(retention.Config{Enabled: true, CheckInterval: c.RetentionInterval})
e.retentionService = retention.NewService(c.RetentionService)
e.retentionService.TSDBStore = e.tsdbStore
e.retentionService.MetaClient = e.metaClient
@ -157,7 +157,7 @@ func (e *Engine) Open(ctx context.Context) (err error) {
return err
}
if err := e.retentionService.Open(); err != nil {
if err := e.retentionService.Open(ctx); err != nil {
return err
}

View File

@ -25,8 +25,6 @@ func (c Config) Validate() error {
return nil
}
// TODO: Should we enforce a minimum interval?
// Polling every nanosecond, for instance, will greatly impact performance.
if c.CheckInterval <= 0 {
return errors.New("check-interval must be positive")
}

View File

@ -25,7 +25,7 @@ type Service struct {
config Config
wg sync.WaitGroup
done chan struct{}
cancel context.CancelFunc
logger *zap.Logger
}
@ -39,31 +39,37 @@ func NewService(c Config) *Service {
}
// Open starts retention policy enforcement.
func (s *Service) Open() error {
if !s.config.Enabled || s.done != nil {
func (s *Service) Open(ctx context.Context) error {
if !s.config.Enabled || s.cancel != nil {
return nil
}
s.logger.Info("Starting retention policy enforcement service",
logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
s.done = make(chan struct{})
ctx, s.cancel = context.WithCancel(ctx)
s.wg.Add(1)
go func() { defer s.wg.Done(); s.run() }()
go func() {
defer s.wg.Done()
s.run(ctx)
}()
return nil
}
// Close stops retention policy enforcement.
func (s *Service) Close() error {
if !s.config.Enabled || s.done == nil {
if !s.config.Enabled || s.cancel == nil {
return nil
}
s.logger.Info("Closing retention policy enforcement service")
close(s.done)
s.cancel()
s.wg.Wait()
s.done = nil
s.cancel = nil
return nil
}
@ -72,12 +78,12 @@ func (s *Service) WithLogger(log *zap.Logger) {
s.logger = log.With(zap.String("service", "retention"))
}
func (s *Service) run() {
func (s *Service) run(ctx context.Context) {
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
defer ticker.Stop()
for {
select {
case <-s.done:
case <-ctx.Done():
return
case <-ticker.C:

View File

@ -2,6 +2,7 @@ package retention_test
import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
@ -21,7 +22,7 @@ func TestService_OpenDisabled(t *testing.T) {
c.Enabled = false
s := NewService(c)
if err := s.Open(); err != nil {
if err := s.Open(context.Background()); err != nil {
t.Fatal(err)
}
@ -34,7 +35,8 @@ func TestService_OpenClose(t *testing.T) {
// Opening a disabled service should be a no-op.
s := NewService(retention.NewConfig())
if err := s.Open(); err != nil {
ctx := context.Background()
if err := s.Open(ctx); err != nil {
t.Fatal(err)
}
@ -43,7 +45,7 @@ func TestService_OpenClose(t *testing.T) {
}
// Reopening is a no-op
if err := s.Open(); err != nil {
if err := s.Open(ctx); err != nil {
t.Fatal(err)
}
@ -171,7 +173,7 @@ func TestService_CheckShards(t *testing.T) {
return nil
}
if err := s.Open(); err != nil {
if err := s.Open(context.Background()); err != nil {
t.Fatalf("unexpected open error: %s", err)
}
defer func() {
@ -211,7 +213,7 @@ func TestService_8819_repro(t *testing.T) {
for i := 0; i < 1000; i++ {
s, errC, done := testService_8819_repro(t)
if err := s.Open(); err != nil {
if err := s.Open(context.Background()); err != nil {
t.Fatal(err)
}