diff --git a/services/meta/data.go b/services/meta/data.go index ab7c04ce4e..31a21fcc04 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -870,10 +870,12 @@ func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo { // ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp. func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo { for i := range rpi.ShardGroups { - if rpi.ShardGroups[i].Contains(timestamp) && !rpi.ShardGroups[i].Deleted() { + sgi := &rpi.ShardGroups[i] + if sgi.Contains(timestamp) && !sgi.Deleted() && (!sgi.Truncated() || timestamp.Before(sgi.TruncatedAt)) { return &rpi.ShardGroups[i] } } + return nil } @@ -997,11 +999,12 @@ func normalisedShardDuration(sgd, d time.Duration) time.Duration { // to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can // safely delete any associated shards. type ShardGroupInfo struct { - ID uint64 - StartTime time.Time - EndTime time.Time - DeletedAt time.Time - Shards []ShardInfo + ID uint64 + StartTime time.Time + EndTime time.Time + DeletedAt time.Time + Shards []ShardInfo + TruncatedAt time.Time } // ShardGroupInfos implements sort.Interface on []ShardGroupInfo, based @@ -1027,6 +1030,11 @@ func (sgi *ShardGroupInfo) Deleted() bool { return !sgi.DeletedAt.IsZero() } +// Truncated returns true if this ShardGroup has been truncated (no new writes) +func (sgi *ShardGroupInfo) Truncated() bool { + return !sgi.TruncatedAt.IsZero() +} + // clone returns a deep copy of sgi. func (sgi ShardGroupInfo) clone() ShardGroupInfo { other := sgi @@ -1055,6 +1063,10 @@ func (sgi *ShardGroupInfo) marshal() *internal.ShardGroupInfo { DeletedAt: proto.Int64(MarshalTime(sgi.DeletedAt)), } + if !sgi.TruncatedAt.IsZero() { + pb.TruncatedAt = proto.Int64(MarshalTime(sgi.TruncatedAt)) + } + pb.Shards = make([]*internal.ShardInfo, len(sgi.Shards)) for i := range sgi.Shards { pb.Shards[i] = sgi.Shards[i].marshal() @@ -1070,6 +1082,10 @@ func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) { sgi.EndTime = UnmarshalTime(pb.GetEndTime()) sgi.DeletedAt = UnmarshalTime(pb.GetDeletedAt()) + if pb != nil && pb.TruncatedAt != nil { + sgi.TruncatedAt = UnmarshalTime(pb.GetTruncatedAt()) + } + if len(pb.GetShards()) > 0 { sgi.Shards = make([]ShardInfo, len(pb.GetShards())) for i, x := range pb.GetShards() { diff --git a/services/meta/internal/meta.pb.go b/services/meta/internal/meta.pb.go index 4743e67a4c..61bd4f7ef2 100644 --- a/services/meta/internal/meta.pb.go +++ b/services/meta/internal/meta.pb.go @@ -408,6 +408,7 @@ type ShardGroupInfo struct { EndTime *int64 `protobuf:"varint,3,req,name=EndTime" json:"EndTime,omitempty"` DeletedAt *int64 `protobuf:"varint,4,req,name=DeletedAt" json:"DeletedAt,omitempty"` Shards []*ShardInfo `protobuf:"bytes,5,rep,name=Shards" json:"Shards,omitempty"` + TruncatedAt *int64 `protobuf:"varint,6,opt,name=TruncatedAt" json:"TruncatedAt,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -450,6 +451,13 @@ func (m *ShardGroupInfo) GetShards() []*ShardInfo { return nil } +func (m *ShardGroupInfo) GetTruncatedAt() int64 { + if m != nil && m.TruncatedAt != nil { + return *m.TruncatedAt + } + return 0 +} + type ShardInfo struct { ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` OwnerIDs []uint64 `protobuf:"varint,2,rep,name=OwnerIDs" json:"OwnerIDs,omitempty"` diff --git a/services/meta/internal/meta.proto b/services/meta/internal/meta.proto index e220dac855..2fc5128351 100644 --- a/services/meta/internal/meta.proto +++ b/services/meta/internal/meta.proto @@ -52,6 +52,7 @@ message ShardGroupInfo { required int64 EndTime = 3; required int64 DeletedAt = 4; repeated ShardInfo Shards = 5; + optional int64 TruncatedAt = 6; } message ShardInfo {