Merge pull request #3931 from influxdb/no_past_shards
Don't precreate shard groups entirely in pastpull/3993/head
commit
3a8b02afc3
|
@ -26,6 +26,7 @@ With this release InfluxDB is moving to Go 1.5.
|
||||||
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Unblock relaxed write consistency level Thanks @takayuki!
|
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Unblock relaxed write consistency level Thanks @takayuki!
|
||||||
- [#3950](https://github.com/influxdb/influxdb/pull/3950): Limit bz1 quickcheck tests to 10 iterations on CI
|
- [#3950](https://github.com/influxdb/influxdb/pull/3950): Limit bz1 quickcheck tests to 10 iterations on CI
|
||||||
- [#3977](https://github.com/influxdb/influxdb/pull/3977): Silence wal logging during testing
|
- [#3977](https://github.com/influxdb/influxdb/pull/3977): Silence wal logging during testing
|
||||||
|
- [#3931](https://github.com/influxdb/influxdb/pull/3931): Don't precreate shard groups entirely in the past
|
||||||
|
|
||||||
## v0.9.3 [2015-08-26]
|
## v0.9.3 [2015-08-26]
|
||||||
|
|
||||||
|
|
|
@ -1389,38 +1389,34 @@ func (s *Store) UserCount() (count int, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrecreateShardGroups creates shard groups whose endtime is before the cutoff time passed in. This
|
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
|
||||||
// avoid the need for these shards to be created when data for the corresponding time range arrives.
|
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
|
||||||
// Shard creation involves Raft consensus, and precreation avoids taking the hit at write-time.
|
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
|
||||||
func (s *Store) PrecreateShardGroups(cutoff time.Time) error {
|
// avoids taking the hit at write-time.
|
||||||
|
func (s *Store) PrecreateShardGroups(from, to time.Time) error {
|
||||||
s.read(func(data *Data) error {
|
s.read(func(data *Data) error {
|
||||||
for _, di := range data.Databases {
|
for _, di := range data.Databases {
|
||||||
for _, rp := range di.RetentionPolicies {
|
for _, rp := range di.RetentionPolicies {
|
||||||
for _, g := range rp.ShardGroups {
|
if len(rp.ShardGroups) == 0 {
|
||||||
// Check to see if it is not deleted and going to end before our interval
|
// No data was ever written to this group, or all groups have been deleted.
|
||||||
if !g.Deleted() && g.EndTime.Before(cutoff) {
|
continue
|
||||||
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
|
}
|
||||||
|
g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
|
||||||
|
if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
|
||||||
|
// Group is not deleted, will end before the future time, but is still yet to expire.
|
||||||
|
// This last check is important, so the system doesn't create shards groups wholly
|
||||||
|
// in the past.
|
||||||
|
|
||||||
// Check if successive shard group exists.
|
// Create successive shard group.
|
||||||
if sgi, err := s.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); err != nil {
|
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
|
||||||
s.Logger.Printf("failed to check if successive shard group for group exists %d: %s",
|
if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil {
|
||||||
g.ID, err.Error())
|
s.Logger.Printf("failed to create successive shard group for group %d: %s",
|
||||||
continue
|
g.ID, err.Error())
|
||||||
} else if sgi != nil && !sgi.Deleted() {
|
} else {
|
||||||
continue
|
s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s",
|
||||||
}
|
newGroup.ID, di.Name, rp.Name)
|
||||||
|
|
||||||
// It doesn't. Create it.
|
|
||||||
if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil {
|
|
||||||
s.Logger.Printf("failed to create successive shard group for group %d: %s",
|
|
||||||
g.ID, err.Error())
|
|
||||||
} else {
|
|
||||||
s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s",
|
|
||||||
newGroup.ID, di.Name, rp.Name)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -489,30 +489,57 @@ func TestStore_PrecreateShardGroup(t *testing.T) {
|
||||||
s := MustOpenStore()
|
s := MustOpenStore()
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
// Create node, database, policy, & group.
|
// Create node, database, policy, & groups.
|
||||||
if _, err := s.CreateNode("host0"); err != nil {
|
if _, err := s.CreateNode("host0"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if _, err := s.CreateDatabase("db0"); err != nil {
|
} else if _, err := s.CreateDatabase("db0"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
|
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
|
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp1", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
|
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp2", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2001, time.January, 1, 1, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if _, err := s.CreateShardGroup("db0", "rp1", time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(2001, time.January, 1, 3, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// rp0 should undergo precreation.
|
||||||
groups, err := s.ShardGroups("db0", "rp0")
|
groups, err := s.ShardGroups("db0", "rp0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(groups) != 2 {
|
if len(groups) != 2 {
|
||||||
t.Fatalf("shard group precreation failed to create new shard group")
|
t.Fatalf("shard group precreation failed to create new shard group for rp0")
|
||||||
}
|
}
|
||||||
if groups[1].StartTime != time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC) {
|
if groups[1].StartTime != time.Date(2001, time.January, 1, 2, 0, 0, 0, time.UTC) {
|
||||||
t.Fatalf("precreated shard group has wrong start time, exp %s, got %s",
|
t.Fatalf("precreated shard group has wrong start time, exp %s, got %s",
|
||||||
time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime)
|
time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rp1 should not undergo precreation since it is completely in the past.
|
||||||
|
groups, err = s.ShardGroups("db0", "rp1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(groups) != 1 {
|
||||||
|
t.Fatalf("shard group precreation created new shard group for rp1")
|
||||||
|
}
|
||||||
|
|
||||||
|
// rp2 should not undergo precreation since it has no shards.
|
||||||
|
groups, err = s.ShardGroups("db0", "rp2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(groups) != 0 {
|
||||||
|
t.Fatalf("shard group precreation created new shard group for rp2")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the store can create a new continuous query.
|
// Ensure the store can create a new continuous query.
|
||||||
|
|
|
@ -18,7 +18,7 @@ type Service struct {
|
||||||
|
|
||||||
MetaStore interface {
|
MetaStore interface {
|
||||||
IsLeader() bool
|
IsLeader() bool
|
||||||
PrecreateShardGroups(cutoff time.Time) error
|
PrecreateShardGroups(now, cutoff time.Time) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,9 +91,9 @@ func (s *Service) runPrecreation() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// precreate performs actual resource precreation.
|
// precreate performs actual resource precreation.
|
||||||
func (s *Service) precreate(t time.Time) error {
|
func (s *Service) precreate(now time.Time) error {
|
||||||
cutoff := t.Add(s.advancePeriod).UTC()
|
cutoff := now.Add(s.advancePeriod).UTC()
|
||||||
if err := s.MetaStore.PrecreateShardGroups(cutoff); err != nil {
|
if err := s.MetaStore.PrecreateShardGroups(now, cutoff); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -18,7 +18,7 @@ func Test_ShardPrecreation(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
ms := metaStore{
|
ms := metaStore{
|
||||||
PrecreateShardGroupsFn: func(u time.Time) error {
|
PrecreateShardGroupsFn: func(v, u time.Time) error {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
if u != now.Add(advancePeriod) {
|
if u != now.Add(advancePeriod) {
|
||||||
t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now)
|
t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now)
|
||||||
|
@ -47,13 +47,13 @@ func Test_ShardPrecreation(t *testing.T) {
|
||||||
|
|
||||||
// PointsWriter represents a mock impl of PointsWriter.
|
// PointsWriter represents a mock impl of PointsWriter.
|
||||||
type metaStore struct {
|
type metaStore struct {
|
||||||
PrecreateShardGroupsFn func(cutoff time.Time) error
|
PrecreateShardGroupsFn func(now, cutoff time.Time) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m metaStore) IsLeader() bool {
|
func (m metaStore) IsLeader() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m metaStore) PrecreateShardGroups(timestamp time.Time) error {
|
func (m metaStore) PrecreateShardGroups(now, cutoff time.Time) error {
|
||||||
return m.PrecreateShardGroupsFn(timestamp)
|
return m.PrecreateShardGroupsFn(now, cutoff)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue