Add TruncatedAt field to meta.ShardGroupInfo
parent
022479778d
commit
647210c57a
|
@ -870,10 +870,12 @@ func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo {
|
|||
// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp.
|
||||
func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo {
|
||||
for i := range rpi.ShardGroups {
|
||||
if rpi.ShardGroups[i].Contains(timestamp) && !rpi.ShardGroups[i].Deleted() {
|
||||
sgi := &rpi.ShardGroups[i]
|
||||
if sgi.Contains(timestamp) && !sgi.Deleted() && (!sgi.Truncated() || timestamp.Before(sgi.TruncatedAt)) {
|
||||
return &rpi.ShardGroups[i]
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -997,11 +999,12 @@ func normalisedShardDuration(sgd, d time.Duration) time.Duration {
|
|||
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
|
||||
// safely delete any associated shards.
|
||||
type ShardGroupInfo struct {
|
||||
ID uint64
|
||||
StartTime time.Time
|
||||
EndTime time.Time
|
||||
DeletedAt time.Time
|
||||
Shards []ShardInfo
|
||||
ID uint64
|
||||
StartTime time.Time
|
||||
EndTime time.Time
|
||||
DeletedAt time.Time
|
||||
Shards []ShardInfo
|
||||
TruncatedAt time.Time
|
||||
}
|
||||
|
||||
// ShardGroupInfos implements sort.Interface on []ShardGroupInfo, based
|
||||
|
@ -1027,6 +1030,11 @@ func (sgi *ShardGroupInfo) Deleted() bool {
|
|||
return !sgi.DeletedAt.IsZero()
|
||||
}
|
||||
|
||||
// Truncated returns true if this ShardGroup has been truncated (no new writes)
|
||||
func (sgi *ShardGroupInfo) Truncated() bool {
|
||||
return !sgi.TruncatedAt.IsZero()
|
||||
}
|
||||
|
||||
// clone returns a deep copy of sgi.
|
||||
func (sgi ShardGroupInfo) clone() ShardGroupInfo {
|
||||
other := sgi
|
||||
|
@ -1055,6 +1063,10 @@ func (sgi *ShardGroupInfo) marshal() *internal.ShardGroupInfo {
|
|||
DeletedAt: proto.Int64(MarshalTime(sgi.DeletedAt)),
|
||||
}
|
||||
|
||||
if !sgi.TruncatedAt.IsZero() {
|
||||
pb.TruncatedAt = proto.Int64(MarshalTime(sgi.TruncatedAt))
|
||||
}
|
||||
|
||||
pb.Shards = make([]*internal.ShardInfo, len(sgi.Shards))
|
||||
for i := range sgi.Shards {
|
||||
pb.Shards[i] = sgi.Shards[i].marshal()
|
||||
|
@ -1070,6 +1082,10 @@ func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) {
|
|||
sgi.EndTime = UnmarshalTime(pb.GetEndTime())
|
||||
sgi.DeletedAt = UnmarshalTime(pb.GetDeletedAt())
|
||||
|
||||
if pb != nil && pb.TruncatedAt != nil {
|
||||
sgi.TruncatedAt = UnmarshalTime(pb.GetTruncatedAt())
|
||||
}
|
||||
|
||||
if len(pb.GetShards()) > 0 {
|
||||
sgi.Shards = make([]ShardInfo, len(pb.GetShards()))
|
||||
for i, x := range pb.GetShards() {
|
||||
|
|
|
@ -408,6 +408,7 @@ type ShardGroupInfo struct {
|
|||
EndTime *int64 `protobuf:"varint,3,req,name=EndTime" json:"EndTime,omitempty"`
|
||||
DeletedAt *int64 `protobuf:"varint,4,req,name=DeletedAt" json:"DeletedAt,omitempty"`
|
||||
Shards []*ShardInfo `protobuf:"bytes,5,rep,name=Shards" json:"Shards,omitempty"`
|
||||
TruncatedAt *int64 `protobuf:"varint,6,opt,name=TruncatedAt" json:"TruncatedAt,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -450,6 +451,13 @@ func (m *ShardGroupInfo) GetShards() []*ShardInfo {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *ShardGroupInfo) GetTruncatedAt() int64 {
|
||||
if m != nil && m.TruncatedAt != nil {
|
||||
return *m.TruncatedAt
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type ShardInfo struct {
|
||||
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
|
||||
OwnerIDs []uint64 `protobuf:"varint,2,rep,name=OwnerIDs" json:"OwnerIDs,omitempty"`
|
||||
|
|
|
@ -52,6 +52,7 @@ message ShardGroupInfo {
|
|||
required int64 EndTime = 3;
|
||||
required int64 DeletedAt = 4;
|
||||
repeated ShardInfo Shards = 5;
|
||||
optional int64 TruncatedAt = 6;
|
||||
}
|
||||
|
||||
message ShardInfo {
|
||||
|
|
Loading…
Reference in New Issue