diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1380268e26..a2dd22b442 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -84,6 +84,7 @@
 - [#8983](https://github.com/influxdata/influxdb/issues/8983): Remove the pidfile after the server has exited.
 - [#9005](https://github.com/influxdata/influxdb/pull/9005): Return `query.ErrQueryInterrupted` for successful read on `InterruptCh`.
 - [#8989](https://github.com/influxdata/influxdb/issues/8989): Fix race inside Measurement index.
+- [#8819](https://github.com/influxdata/influxdb/issues/8819): Ensure retention service always removes local shards.
 
 ## v1.3.4 [unreleased]
 
diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go
index c5cec1276c..92b5c40362 100644
--- a/internal/tsdb_store.go
+++ b/internal/tsdb_store.go
@@ -8,7 +8,7 @@ import (
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/query"
 	"github.com/influxdata/influxdb/tsdb"
-	"go.uber.org/zap"
+	"github.com/uber-go/zap"
 )
 
 // TSDBStoreMock is a mockable implementation of tsdb.Store.
diff --git a/services/retention/service.go b/services/retention/service.go
index 58875ef4f2..53301ddb0e 100644
--- a/services/retention/service.go
+++ b/services/retention/service.go
@@ -44,12 +44,11 @@ func (s *Service) Open() error {
 	}
 
 	s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.config.CheckInterval))
 
 	s.done = make(chan struct{})
 
-	s.wg.Add(2)
-	go s.deleteShardGroups()
-	go s.deleteShards()
+	s.wg.Add(1)
+	go func() { defer s.wg.Done(); s.run() }()
 
 	return nil
 }
@@ -59,7 +58,7 @@ func (s *Service) Close() error {
 		return nil
 	}
 
-	s.logger.Info("retention policy enforcement terminating")
+	s.logger.Info("Retention policy enforcement service closing.")
 	close(s.done)
 	s.wg.Wait()
 
@@ -72,9 +71,7 @@ func (s *Service) WithLogger(log zap.Logger) {
 	s.logger = log.With(zap.String("service", "retention"))
 }
 
-func (s *Service) deleteShardGroups() {
-	defer s.wg.Done()
-
+func (s *Service) run() {
 	ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
 	defer ticker.Stop()
 	for {
@@ -83,46 +80,26 @@ func (s *Service) deleteShardGroups() {
 			return
 
 		case <-ticker.C:
-			dbs := s.MetaClient.Databases()
-			for _, d := range dbs {
-				for _, r := range d.RetentionPolicies {
-					for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
-						if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
-							s.logger.Info(fmt.Sprintf("failed to delete shard group %d from database %s, retention policy %s: %s",
-								g.ID, d.Name, r.Name, err.Error()))
-						} else {
-							s.logger.Info(fmt.Sprintf("deleted shard group %d from database %s, retention policy %s",
-								g.ID, d.Name, r.Name))
-						}
-					}
-				}
-			}
-		}
-	}
-}
-
-func (s *Service) deleteShards() {
-	defer s.wg.Done()
-
-	ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
-	defer ticker.Stop()
-	for {
-		select {
-		case <-s.done:
-			return
-
-		case <-ticker.C:
-			s.logger.Info("retention policy shard deletion check commencing")
+			s.logger.Info("Retention policy shard deletion check commencing.")
 
 			type deletionInfo struct {
 				db string
 				rp string
 			}
 
 			deletedShardIDs := make(map[uint64]deletionInfo, 0)
+			dbs := s.MetaClient.Databases()
 			for _, d := range dbs {
 				for _, r := range d.RetentionPolicies {
-					for _, g := range r.DeletedShardGroups() {
+					for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
+						if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
+							s.logger.Info(fmt.Sprintf("Failed to delete shard group %d from database %s, retention policy %s: %v. Retry in %v.", g.ID, d.Name, r.Name, err, s.config.CheckInterval))
+							continue
+						}
+
+						s.logger.Info(fmt.Sprintf("Deleted shard group %d from database %s, retention policy %s.", g.ID, d.Name, r.Name))
+
+						// Store all the shard IDs that may possibly need to be removed locally.
 						for _, sh := range g.Shards {
 							deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
 						}
@@ -130,19 +107,19 @@ func (s *Service) deleteShards() {
 				}
 			}
 
+			// Remove shards if we store them locally.
 			for _, id := range s.TSDBStore.ShardIDs() {
 				if info, ok := deletedShardIDs[id]; ok {
 					if err := s.TSDBStore.DeleteShard(id); err != nil {
-						s.logger.Error(fmt.Sprintf("failed to delete shard ID %d from database %s, retention policy %s: %s",
-							id, info.db, info.rp, err.Error()))
+						s.logger.Error(fmt.Sprintf("Failed to delete shard ID %d from database %s, retention policy %s: %v. Will retry in %v.", id, info.db, info.rp, err, s.config.CheckInterval))
 						continue
 					}
-					s.logger.Info(fmt.Sprintf("shard ID %d from database %s, retention policy %s, deleted",
-						id, info.db, info.rp))
+					s.logger.Info(fmt.Sprintf("Shard ID %d from database %s, retention policy %s, deleted.", id, info.db, info.rp))
 				}
 			}
+
 			if err := s.MetaClient.PruneShardGroups(); err != nil {
-				s.logger.Info(fmt.Sprintf("error pruning shard groups: %s", err))
+				s.logger.Info(fmt.Sprintf("Problem pruning shard groups: %s. Will retry in %v.", err, s.config.CheckInterval))
 			}
 		}
 	}
diff --git a/services/retention/service_test.go b/services/retention/service_test.go
index 3b22d96e84..a1b3705a9d 100644
--- a/services/retention/service_test.go
+++ b/services/retention/service_test.go
@@ -3,6 +3,7 @@ package retention_test
 import (
 	"bytes"
 	"fmt"
+	"reflect"
 	"sync"
 	"testing"
 	"time"
@@ -65,10 +66,14 @@ func TestService_8819_repro(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		// Wait for service to run.
+		// Wait for service to run one sweep of all dbs/rps/shards.
 		if err := <-errC; err != nil {
 			t.Fatalf("%dth iteration: %v", i, err)
 		}
+
+		if err := s.Close(); err != nil {
+			t.Fatal(err)
+		}
 	}
 }
 
@@ -76,7 +81,7 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
 	c := retention.NewConfig()
 	c.CheckInterval = toml.Duration(time.Millisecond)
 	s := NewService(c)
-	errC := make(chan error)
+	errC := make(chan error, 1) // Buffered; important to prevent deadlock.
 
 	// A database and a bunch of shards
 	var mu sync.Mutex
@@ -102,7 +107,6 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
 				},
 			},
 		},
-		// TODO - add expired stuff
 	},
 }
 
@@ -115,10 +119,13 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
 	s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
 		if database != "db0" {
 			errC <- fmt.Errorf("wrong db name: %s", database)
+			return nil
 		} else if policy != "autogen" {
 			errC <- fmt.Errorf("wrong rp name: %s", policy)
+			return nil
 		} else if id != 1 {
 			errC <- fmt.Errorf("wrong shard group id: %d", id)
+			return nil
 		}
 
 		// remove the associated shards (3 and 9) from the shards slice...
@@ -156,8 +163,15 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
 		if !found {
 			errC <- fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards)
+			return nil
 		}
 	}
+
+	// We should have removed shards 3 and 9.
+	if !reflect.DeepEqual(localShards, []uint64{5, 8, 11}) {
+		errC <- fmt.Errorf("removed shards still present locally: %v", localShards)
+		return nil
+	}
 
 	errC <- nil
 	return nil
 }
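
Note for reviewers (not part of the patch): the shape of the consolidated loop can be hard to see across the hunks above, so here is a minimal, self-contained sketch of the pattern the patch adopts. The `sweep` function, the interval, and the shard IDs are invented for illustration; this is not the service's actual code. The point is that meta-store deletion and local shard deletion now happen in the same tick of a single goroutine, so a sweep always removes the local copies of the shard groups it just deleted, rather than leaving that to a second, independently ticking goroutine — the old `deleteShardGroups`/`deleteShards` split that let local shards be missed (#8819). Collapsing the two tickers is also why `wg.Add(2)` becomes `wg.Add(1)`: `Close` has only one goroutine left to wait for.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sweep stands in for the body of the ticker case in Service.run():
// phase 1 "deletes" expired shard groups from the meta store and records
// the IDs of the shards they contained; phase 2 deletes any of those
// shards that are held locally. All IDs here are invented.
func sweep() {
	deleted := map[uint64]struct{}{3: {}, 9: {}} // shard IDs from deleted groups

	for _, id := range []uint64{3, 5, 8, 9, 11} { // locally held shards
		if _, ok := deleted[id]; ok {
			fmt.Printf("deleting local shard %d\n", id)
		}
	}
}

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup

	// Open(): a single goroutine drives both phases from one ticker.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				sweep()
			}
		}
	}()

	time.Sleep(35 * time.Millisecond)

	// Close(): signal the loop, then wait for it to exit.
	close(done)
	wg.Wait()
}
```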
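Likewise a sketch rather than the test itself: why `testService_8819_repro` buffers `errC`. The mock callbacks send on `errC` from the retention service's goroutine, while the test goroutine receives only once per iteration. With an unbuffered channel, a send made while the test is not at the receive point would block the sweep, and the subsequent `Close`/`wg.Wait` would then hang. A one-slot buffer lets the callback deposit its verdict and move on; the names below are invented for the demonstration.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// One-slot buffer: the sending goroutine (standing in for a mock
	// callback on the service goroutine) completes its send immediately,
	// even though the receiver is busy elsewhere at that moment.
	errC := make(chan error, 1)

	go func() {
		errC <- nil // with make(chan error), this would block until the receive below
		fmt.Println("sweep finished without blocking on errC")
	}()

	time.Sleep(10 * time.Millisecond) // receiver busy elsewhere
	if err := <-errC; err != nil {
		fmt.Println("iteration failed:", err)
	}
}
```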