Add shard group "DeletedAt" timestamps
parent
37f9a886b7
commit
8600e2e036
26
meta/data.go
26
meta/data.go
|
@ -240,7 +240,14 @@ func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error)
|
|||
} else if rpi == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
}
|
||||
return rpi.ShardGroups, nil
|
||||
groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
|
||||
for _, g := range rpi.ShardGroups {
|
||||
if g.Deleted() {
|
||||
continue
|
||||
}
|
||||
groups = append(groups, g)
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp.
|
||||
|
@ -331,10 +338,10 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
|
|||
return ErrRetentionPolicyNotFound
|
||||
}
|
||||
|
||||
// Find shard group by ID and remove it.
|
||||
// Find shard group by ID and set its deletion timestamp.
|
||||
for i := range rpi.ShardGroups {
|
||||
if rpi.ShardGroups[i].ID == id {
|
||||
rpi.ShardGroups = append(rpi.ShardGroups[:i], rpi.ShardGroups[i+1:]...)
|
||||
rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -568,7 +575,7 @@ func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo {
|
|||
// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp.
|
||||
func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo {
|
||||
for i := range rpi.ShardGroups {
|
||||
if rpi.ShardGroups[i].Contains(timestamp) {
|
||||
if rpi.ShardGroups[i].Contains(timestamp) && !rpi.ShardGroups[i].Deleted() {
|
||||
return &rpi.ShardGroups[i]
|
||||
}
|
||||
}
|
||||
|
@ -609,12 +616,16 @@ func shardGroupDuration(d time.Duration) time.Duration {
|
|||
return 1 * time.Hour
|
||||
}
|
||||
|
||||
// ShardGroupInfo represents metadata about a shard group.
|
||||
// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important
|
||||
// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
|
||||
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
|
||||
// safely delete any associated shards.
|
||||
type ShardGroupInfo struct {
|
||||
ID uint64
|
||||
StartTime time.Time
|
||||
EndTime time.Time
|
||||
Shards []ShardInfo
|
||||
DeletedAt time.Time
|
||||
}
|
||||
|
||||
// Contains return true if the shard group contains data for the timestamp.
|
||||
|
@ -627,6 +638,11 @@ func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool {
|
|||
return !sgi.StartTime.After(max) && sgi.EndTime.After(min)
|
||||
}
|
||||
|
||||
// Deleted returns whether this ShardGroup has been deleted.
|
||||
func (sgi *ShardGroupInfo) Deleted() bool {
|
||||
return !sgi.DeletedAt.IsZero()
|
||||
}
|
||||
|
||||
// clone returns a deep copy of sgi.
|
||||
func (sgi ShardGroupInfo) clone() ShardGroupInfo {
|
||||
other := sgi
|
||||
|
|
|
@ -365,8 +365,9 @@ func TestData_DeleteShardGroup(t *testing.T) {
|
|||
|
||||
if err := data.DeleteShardGroup("db0", "rp0", 1); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(data.Databases[0].RetentionPolicies[0].ShardGroups) != 0 {
|
||||
t.Fatalf("unexpected shard groups: %#v", data.Databases[0].RetentionPolicies[0].ShardGroups)
|
||||
}
|
||||
if sg := data.Databases[0].RetentionPolicies[0].ShardGroups[0]; !sg.Deleted() {
|
||||
t.Fatalf("shard group not correctly flagged as deleted")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -525,7 +525,7 @@ func (s *Store) CreateShardGroupIfNotExists(database, policy string, timestamp t
|
|||
// Try to find shard group locally first.
|
||||
if sgi, err := s.ShardGroupByTimestamp(database, policy, timestamp); err != nil {
|
||||
return nil, err
|
||||
} else if sgi != nil {
|
||||
} else if sgi != nil && !sgi.Deleted() {
|
||||
return sgi, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -107,16 +107,17 @@ func (s *Service) deleteShards() {
|
|||
case <-ticker.C:
|
||||
s.logger.Println("retention policy shard deletion commencing")
|
||||
|
||||
// Build a list of all shard IDs that exist.
|
||||
shardIDs := make(map[uint64]struct{}, 0)
|
||||
deletedShardIDs := make(map[uint64]struct{}, 0)
|
||||
s.MetaStore.VisitShardGroups(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo, g meta.ShardGroupInfo) {
|
||||
for _, sh := range g.Shards {
|
||||
shardIDs[sh.ID] = struct{}{}
|
||||
if g.Deleted() {
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
for _, id := range s.TSDBStore.ShardIDs() {
|
||||
if _, ok := shardIDs[id]; !ok {
|
||||
if _, ok := deletedShardIDs[id]; ok {
|
||||
if err := s.TSDBStore.DeleteShard(id); err != nil {
|
||||
s.logger.Printf("failed to delete shard ID %d: %s", id, err.Error())
|
||||
continue
|
||||
|
|
Loading…
Reference in New Issue