From 2ea2abb00110bcc2ba1cade29cb46fb2f85ec286 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 26 Oct 2017 14:31:27 +0100 Subject: [PATCH] Remove possibility of race when dropping shards Fixes #8819. Previously, the process of dropping expired shards according to the retention policy duration, was managed by two independent goroutines in the retention policy service. This behaviour was introduced in #2776, at a time when there were both data and meta nodes in the OSS codebase. The idea was that only the leader meta node would run the meta data deletions in the first goroutine, and all other nodes would run the local deletions in the second goroutine. InfluxDB no longer operates in that way and so we ended up with two independent goroutines that were carrying out an action that was really dependent on each other. If the second goroutine runs before the first then it may not see the meta data changes indicating shards should be deleted and it won't delete any shards locally. Shortly after this the first goroutine will run and remove the meta data for the shard groups. This results in a situation where it looks like the shards have gone, but in fact they remain on disk (and importantly, their series within the index) until the next time the second goroutine runs. By default that's 30 minutes. In the case where the shards to be removed would have removed the last occurences of some series, then it's possible that if the database was already at its maximum series limit (or tag limit for that matter), no further new series can be inserted. --- CHANGELOG.md | 1 + internal/tsdb_store.go | 2 +- services/retention/service.go | 64 ++++++++++-------------------- services/retention/service_test.go | 20 ++++++++-- 4 files changed, 39 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1380268e26..a2dd22b442 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ - [#8983](https://github.com/influxdata/influxdb/issues/8983): Remove the pidfile after the server has exited. - [#9005](https://github.com/influxdata/influxdb/pull/9005): Return `query.ErrQueryInterrupted` for successful read on `InterruptCh`. - [#8989](https://github.com/influxdata/influxdb/issues/8989): Fix race inside Measurement index. +- [#8819](https://github.com/influxdata/influxdb/issues/8819): Ensure retention service always removes local shards. ## v1.3.4 [unreleased] diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index c5cec1276c..92b5c40362 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -8,7 +8,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/tsdb" - "go.uber.org/zap" + "github.com/uber-go/zap" ) // TSDBStoreMock is a mockable implementation of tsdb.Store. diff --git a/services/retention/service.go b/services/retention/service.go index 58875ef4f2..53301ddb0e 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -44,12 +44,10 @@ func (s *Service) Open() error { } s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.config.CheckInterval)) - s.done = make(chan struct{}) - s.wg.Add(2) - go s.deleteShardGroups() - go s.deleteShards() + s.wg.Add(1) + go func() { defer s.wg.Done(); s.run() }() return nil } @@ -59,7 +57,7 @@ func (s *Service) Close() error { return nil } - s.logger.Info("retention policy enforcement terminating") + s.logger.Info("Retention policy enforcement service closing.") close(s.done) s.wg.Wait() @@ -72,9 +70,7 @@ func (s *Service) WithLogger(log zap.Logger) { s.logger = log.With(zap.String("service", "retention")) } -func (s *Service) deleteShardGroups() { - defer s.wg.Done() - +func (s *Service) run() { ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) defer ticker.Stop() for { @@ -83,46 +79,26 @@ func (s *Service) deleteShardGroups() { return case <-ticker.C: - dbs := s.MetaClient.Databases() - for _, d := range dbs { - for _, r := range d.RetentionPolicies { - for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { - if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { - s.logger.Info(fmt.Sprintf("failed to delete shard group %d from database %s, retention policy %s: %s", - g.ID, d.Name, r.Name, err.Error())) - } else { - s.logger.Info(fmt.Sprintf("deleted shard group %d from database %s, retention policy %s", - g.ID, d.Name, r.Name)) - } - } - } - } - } - } -} - -func (s *Service) deleteShards() { - defer s.wg.Done() - - ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) - defer ticker.Stop() - for { - select { - case <-s.done: - return - - case <-ticker.C: - s.logger.Info("retention policy shard deletion check commencing") + s.logger.Info("Retention policy shard deletion check commencing.") type deletionInfo struct { db string rp string } deletedShardIDs := make(map[uint64]deletionInfo, 0) + dbs := s.MetaClient.Databases() for _, d := range dbs { for _, r := range d.RetentionPolicies { - for _, g := range r.DeletedShardGroups() { + for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { + if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { + s.logger.Info(fmt.Sprintf("Failed to delete shard group %d from database %s, retention policy %s: %v. Retry in %v.", g.ID, d.Name, r.Name, err, s.config.CheckInterval)) + continue + } + + s.logger.Info(fmt.Sprintf("Deleted shard group %d from database %s, retention policy %s.", g.ID, d.Name, r.Name)) + + // Store all the shard IDs that may possibly need to be removed locally. for _, sh := range g.Shards { deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} } @@ -130,19 +106,19 @@ func (s *Service) deleteShards() { } } + // Remove shards if we store them locally for _, id := range s.TSDBStore.ShardIDs() { if info, ok := deletedShardIDs[id]; ok { if err := s.TSDBStore.DeleteShard(id); err != nil { - s.logger.Error(fmt.Sprintf("failed to delete shard ID %d from database %s, retention policy %s: %s", - id, info.db, info.rp, err.Error())) + s.logger.Error(fmt.Sprintf("Failed to delete shard ID %d from database %s, retention policy %s: %v. Will retry in %v", id, info.db, info.rp, err, s.config.CheckInterval)) continue } - s.logger.Info(fmt.Sprintf("shard ID %d from database %s, retention policy %s, deleted", - id, info.db, info.rp)) + s.logger.Info(fmt.Sprintf("Shard ID %d from database %s, retention policy %s, deleted.", id, info.db, info.rp)) } } + if err := s.MetaClient.PruneShardGroups(); err != nil { - s.logger.Info(fmt.Sprintf("error pruning shard groups: %s", err)) + s.logger.Info(fmt.Sprintf("Problem pruning shard groups: %s. Will retry in %v", err, s.config.CheckInterval)) } } } diff --git a/services/retention/service_test.go b/services/retention/service_test.go index 3b22d96e84..a1b3705a9d 100644 --- a/services/retention/service_test.go +++ b/services/retention/service_test.go @@ -3,6 +3,7 @@ package retention_test import ( "bytes" "fmt" + "reflect" "sync" "testing" "time" @@ -65,10 +66,14 @@ func TestService_8819_repro(t *testing.T) { t.Fatal(err) } - // Wait for service to run. + // Wait for service to run one sweep of all dbs/rps/shards. if err := <-errC; err != nil { t.Fatalf("%dth iteration: %v", i, err) } + + if err := s.Close(); err != nil { + t.Fatal(err) + } } } @@ -76,7 +81,7 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) { c := retention.NewConfig() c.CheckInterval = toml.Duration(time.Millisecond) s := NewService(c) - errC := make(chan error) + errC := make(chan error, 1) // Buffer Important to prevent deadlock. // A database and a bunch of shards var mu sync.Mutex @@ -102,7 +107,6 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) { }, }, }, - // TODO - add expired stuff }, } @@ -115,10 +119,13 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) { s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error { if database != "db0" { errC <- fmt.Errorf("wrong db name: %s", database) + return nil } else if policy != "autogen" { errC <- fmt.Errorf("wrong rp name: %s", policy) + return nil } else if id != 1 { errC <- fmt.Errorf("wrong shard group id: %d", id) + return nil } // remove the associated shards (3 and 9) from the shards slice... @@ -156,8 +163,15 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) { if !found { errC <- fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards) + return nil } } + + // We should have removed shards 3 and 9 + if !reflect.DeepEqual(localShards, []uint64{5, 8, 11}) { + errC <- fmt.Errorf("removed shards still present locally: %v", localShards) + return nil + } errC <- nil return nil }