Remove possibility of race when dropping shards

Fixes #8819.

Previously, the process of dropping expired shards according to the
retention policy duration was managed by two independent goroutines in
the retention policy service. This behaviour was introduced in #2776,
at a time when there were both data and meta nodes in the OSS codebase.
The idea was that only the leader meta node would run the meta data
deletions in the first goroutine, and all other nodes would run the
local deletions in the second goroutine.

InfluxDB no longer operates in that way, so we ended up with two
independent goroutines carrying out actions that really depended on
each other.

If the second goroutine runs before the first then it may not see the
meta data changes indicating shards should be deleted and it won't
delete any shards locally. Shortly after this the first goroutine will
run and remove the meta data for the shard groups.

This results in a situation where it looks like the shards have gone,
but in fact they remain on disk (and importantly, their series within
the index) until the next time the second goroutine runs. By default
that's 30 minutes.

In the case where the shards to be removed would have removed the last
occurrences of some series, and the database was already at its maximum
series limit (or tag limit, for that matter), then no further new series
could be inserted.
pull/9017/head
Edd Robinson 2017-10-26 14:31:27 +01:00
parent 77977af685
commit 2ea2abb001
4 changed files with 39 additions and 48 deletions

View File

@ -84,6 +84,7 @@
- [#8983](https://github.com/influxdata/influxdb/issues/8983): Remove the pidfile after the server has exited.
- [#9005](https://github.com/influxdata/influxdb/pull/9005): Return `query.ErrQueryInterrupted` for successful read on `InterruptCh`.
- [#8989](https://github.com/influxdata/influxdb/issues/8989): Fix race inside Measurement index.
- [#8819](https://github.com/influxdata/influxdb/issues/8819): Ensure retention service always removes local shards.
## v1.3.4 [unreleased]

View File

@ -8,7 +8,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
"github.com/uber-go/zap"
)
// TSDBStoreMock is a mockable implementation of tsdb.Store.

View File

@ -44,12 +44,10 @@ func (s *Service) Open() error {
}
s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.config.CheckInterval))
s.done = make(chan struct{})
s.wg.Add(2)
go s.deleteShardGroups()
go s.deleteShards()
s.wg.Add(1)
go func() { defer s.wg.Done(); s.run() }()
return nil
}
@ -59,7 +57,7 @@ func (s *Service) Close() error {
return nil
}
s.logger.Info("retention policy enforcement terminating")
s.logger.Info("Retention policy enforcement service closing.")
close(s.done)
s.wg.Wait()
@ -72,9 +70,7 @@ func (s *Service) WithLogger(log zap.Logger) {
s.logger = log.With(zap.String("service", "retention"))
}
func (s *Service) deleteShardGroups() {
defer s.wg.Done()
func (s *Service) run() {
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
defer ticker.Stop()
for {
@ -83,46 +79,26 @@ func (s *Service) deleteShardGroups() {
return
case <-ticker.C:
dbs := s.MetaClient.Databases()
for _, d := range dbs {
for _, r := range d.RetentionPolicies {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Info(fmt.Sprintf("failed to delete shard group %d from database %s, retention policy %s: %s",
g.ID, d.Name, r.Name, err.Error()))
} else {
s.logger.Info(fmt.Sprintf("deleted shard group %d from database %s, retention policy %s",
g.ID, d.Name, r.Name))
}
}
}
}
}
}
}
func (s *Service) deleteShards() {
defer s.wg.Done()
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
defer ticker.Stop()
for {
select {
case <-s.done:
return
case <-ticker.C:
s.logger.Info("retention policy shard deletion check commencing")
s.logger.Info("Retention policy shard deletion check commencing.")
type deletionInfo struct {
db string
rp string
}
deletedShardIDs := make(map[uint64]deletionInfo, 0)
dbs := s.MetaClient.Databases()
for _, d := range dbs {
for _, r := range d.RetentionPolicies {
for _, g := range r.DeletedShardGroups() {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Info(fmt.Sprintf("Failed to delete shard group %d from database %s, retention policy %s: %v. Retry in %v.", g.ID, d.Name, r.Name, err, s.config.CheckInterval))
continue
}
s.logger.Info(fmt.Sprintf("Deleted shard group %d from database %s, retention policy %s.", g.ID, d.Name, r.Name))
// Store all the shard IDs that may possibly need to be removed locally.
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
}
@ -130,19 +106,19 @@ func (s *Service) deleteShards() {
}
}
// Remove shards if we store them locally
for _, id := range s.TSDBStore.ShardIDs() {
if info, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Error(fmt.Sprintf("failed to delete shard ID %d from database %s, retention policy %s: %s",
id, info.db, info.rp, err.Error()))
s.logger.Error(fmt.Sprintf("Failed to delete shard ID %d from database %s, retention policy %s: %v. Will retry in %v", id, info.db, info.rp, err, s.config.CheckInterval))
continue
}
s.logger.Info(fmt.Sprintf("shard ID %d from database %s, retention policy %s, deleted",
id, info.db, info.rp))
s.logger.Info(fmt.Sprintf("Shard ID %d from database %s, retention policy %s, deleted.", id, info.db, info.rp))
}
}
if err := s.MetaClient.PruneShardGroups(); err != nil {
s.logger.Info(fmt.Sprintf("error pruning shard groups: %s", err))
s.logger.Info(fmt.Sprintf("Problem pruning shard groups: %s. Will retry in %v", err, s.config.CheckInterval))
}
}
}

View File

@ -3,6 +3,7 @@ package retention_test
import (
"bytes"
"fmt"
"reflect"
"sync"
"testing"
"time"
@ -65,10 +66,14 @@ func TestService_8819_repro(t *testing.T) {
t.Fatal(err)
}
// Wait for service to run.
// Wait for service to run one sweep of all dbs/rps/shards.
if err := <-errC; err != nil {
t.Fatalf("%dth iteration: %v", i, err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
}
}
@ -76,7 +81,7 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
c := retention.NewConfig()
c.CheckInterval = toml.Duration(time.Millisecond)
s := NewService(c)
errC := make(chan error)
errC := make(chan error, 1) // Buffer Important to prevent deadlock.
// A database and a bunch of shards
var mu sync.Mutex
@ -102,7 +107,6 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
},
},
},
// TODO - add expired stuff
},
}
@ -115,10 +119,13 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
if database != "db0" {
errC <- fmt.Errorf("wrong db name: %s", database)
return nil
} else if policy != "autogen" {
errC <- fmt.Errorf("wrong rp name: %s", policy)
return nil
} else if id != 1 {
errC <- fmt.Errorf("wrong shard group id: %d", id)
return nil
}
// remove the associated shards (3 and 9) from the shards slice...
@ -156,8 +163,15 @@ func testService_8819_repro(t *testing.T) (*Service, chan error) {
if !found {
errC <- fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards)
return nil
}
}
// We should have removed shards 3 and 9
if !reflect.DeepEqual(localShards, []uint64{5, 8, 11}) {
errC <- fmt.Errorf("removed shards still present locally: %v", localShards)
return nil
}
errC <- nil
return nil
}