Refactor datacoord segment status (#6039)

* Refactor datacoord segment status

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Remove lastExpireTime

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2021-06-24 14:20:10 +08:00 committed by GitHub
parent 99f61b8caf
commit f6761bbc33
4 changed files with 196 additions and 130 deletions


@@ -14,6 +14,8 @@ package datacoord
 import (
     "sort"
+    "github.com/milvus-io/milvus/internal/proto/commonpb"
+    "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/schemapb"
     "github.com/milvus-io/milvus/internal/util/typeutil"
 )
@@ -60,47 +62,45 @@ type sealPolicy interface {
 }
 // segmentSealPolicy seal policy applied to a single segment
-type segmentSealPolicy func(*segmentStatus, Timestamp) bool
+type segmentSealPolicy func(status *segmentStatus, info *datapb.SegmentInfo, ts Timestamp) bool
 // channelSealPolicy seal policy applied to a channel
-type channelSealPolicy func(string, []*segmentStatus, Timestamp) []*segmentStatus
+type channelSealPolicy func(string, []*datapb.SegmentInfo, Timestamp) []*datapb.SegmentInfo
 // getSegmentCapacityPolicy gets a segmentSealPolicy based on the segment size factor
 func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
-    return func(status *segmentStatus, ts Timestamp) bool {
+    return func(status *segmentStatus, info *datapb.SegmentInfo, ts Timestamp) bool {
         var allocSize int64
         for _, allocation := range status.allocations {
             allocSize += allocation.numOfRows
         }
         // max, written, allocated := status.total, status.currentRows, allocSize
         // float64(writtenCount) >= Params.SegmentSizeFactor*float64(maxCount)
-        return float64(status.currentRows) >= sizeFactor*float64(status.total)
+        return float64(status.currentRows) >= sizeFactor*float64(info.MaxRowNum)
     }
 }
 // getLastExpiresLifetimePolicy gets a segmentSealPolicy with a lifetime limit, comparing ts - segment.lastExpireTime against it
 func getLastExpiresLifetimePolicy(lifetime uint64) segmentSealPolicy {
-    return func(status *segmentStatus, ts Timestamp) bool {
-        return (ts - status.lastExpireTime) > lifetime
+    return func(status *segmentStatus, info *datapb.SegmentInfo, ts Timestamp) bool {
+        return (ts - info.LastExpireTime) > lifetime
     }
 }
 // getChannelOpenSegCapacityPolicy gets a channelSealPolicy that caps the number of open segments per channel
 func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
-    return func(channel string, segs []*segmentStatus, ts Timestamp) []*segmentStatus {
+    return func(channel string, segs []*datapb.SegmentInfo, ts Timestamp) []*datapb.SegmentInfo {
         if len(segs) <= limit {
-            return []*segmentStatus{}
+            return []*datapb.SegmentInfo{}
         }
-        sortSegStatusByLastExpires(segs)
+        sortSegmentsByLastExpires(segs)
         offLen := len(segs) - limit
         return segs[0:offLen]
     }
 }
 // sortSegmentsByLastExpires sorts segments by lastExpireTime in ascending order
-func sortSegStatusByLastExpires(segs []*segmentStatus) {
+func sortSegmentsByLastExpires(segs []*datapb.SegmentInfo) {
     sort.Slice(segs, func(i, j int) bool {
-        return segs[i].lastExpireTime < segs[j].lastExpireTime
+        return segs[i].LastExpireTime < segs[j].LastExpireTime
    })
 }
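Note: with the new signature, volatile counters still come from segmentStatus while hard limits come from the SegmentInfo clone fetched from meta. A worked example of the capacity check, as it could be written inside the datacoord package (the 0.75 factor and the row counts are illustrative, not defaults from this patch; assumes fmt is imported):

func ExampleCapacityPolicy() {
    policy := getSegmentCapacityPolicy(0.75)
    status := &segmentStatus{id: 1, currentRows: 950000}
    info := &datapb.SegmentInfo{ID: 1, MaxRowNum: 1000000}
    // seals because 950000 >= 0.75 * 1000000
    fmt.Println(policy(status, info, 0)) // Output: true
}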
@@ -116,14 +116,14 @@ func newSealPolicyV1() sealPolicy {
 }
 type flushPolicy interface {
-    apply(status *segmentStatus, t Timestamp) bool
+    apply(info *datapb.SegmentInfo, t Timestamp) bool
 }
 type flushPolicyV1 struct {
 }
-func (p *flushPolicyV1) apply(status *segmentStatus, t Timestamp) bool {
-    return status.sealed && status.lastExpireTime <= t
+func (p *flushPolicyV1) apply(info *datapb.SegmentInfo, t Timestamp) bool {
+    return info.State == commonpb.SegmentState_Sealed && info.LastExpireTime <= t
 }
 func newFlushPolicyV1() flushPolicy {
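Note: flushPolicyV1 now derives flushability purely from meta: a segment must be sealed and its lastExpireTime must sit at or before the flush timestamp, meaning no outstanding allocation can still accept rows. A hedged helper sketch around the new interface (isFlushable is illustrative, not part of the patch):

func isFlushable(info *datapb.SegmentInfo, ts Timestamp) bool {
    policy := newFlushPolicyV1()
    // sealed, and every granted allocation expired at or before ts
    return policy.apply(info, ts)
}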


@@ -53,25 +53,24 @@ type Manager interface {
     GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
     // UpdateSegmentStats updates segment statistics
     UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates)
+    // ExpireAllocations notifies segment status to expire old allocations
+    ExpireAllocations(channel string, ts Timestamp) error
 }
 // segmentStatus stores allocation entries and the temporary in-memory row count
 type segmentStatus struct {
-    id UniqueID
-    collectionID UniqueID
-    partitionID UniqueID
-    sealed bool
-    total int64
-    insertChannel string
-    allocations []*allocation
-    lastExpireTime Timestamp
-    currentRows int64
+    id UniqueID
+    allocations []*allocation
+    currentRows int64
 }
 // allocation entry for a segment allocation record
 type allocation struct {
     numOfRows int64
     expireTime Timestamp
 }
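Note: the slimmed segmentStatus keeps only volatile allocation-time state; everything durable (collection, partition, channel, seal state, MaxRowNum, LastExpireTime) now has a single source of truth in meta. A package-internal sketch of the two-store lookup the rest of this patch repeats everywhere (segmentView is a hypothetical helper, not in the patch):

func (s *SegmentManager) segmentView(segID UniqueID) (*segmentStatus, *datapb.SegmentInfo, error) {
    status := s.stats[segID]              // volatile: pending allocations, currentRows
    info, err := s.meta.GetSegment(segID) // durable: state, channel, limits
    if err != nil {
        return nil, nil, err // callers below log and skip on this error
    }
    return status, info, nil
}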
 // SegmentManager handles segment related logic
 type SegmentManager struct {
     meta *meta
     mu sync.RWMutex
@@ -85,68 +84,68 @@ type SegmentManager struct {
     segmentSealPolicies []segmentSealPolicy
     channelSealPolicies []channelSealPolicy
     flushPolicy flushPolicy
+    allocPool sync.Pool
 }
 type allocHelper struct {
     afterCreateSegment func(segment *datapb.SegmentInfo) error
 }
-type allocOption struct {
-    apply func(manager *SegmentManager)
+// allocOption is an allocation option applied to `SegmentManager`
+type allocOption interface {
+    apply(manager *SegmentManager)
 }
+// allocFunc is a function shortcut implementing allocOption
+type allocFunc func(manager *SegmentManager)
+// implement allocOption
+func (f allocFunc) apply(manager *SegmentManager) {
+    f(manager)
+}
 // get allocOption with allocHelper setting
 func withAllocHelper(helper allocHelper) allocOption {
-    return allocOption{
-        apply: func(manager *SegmentManager) { manager.helper = helper },
-    }
+    return allocFunc(func(manager *SegmentManager) { manager.helper = helper })
 }
 // get default allocHelper, which does nothing
 func defaultAllocHelper() allocHelper {
     return allocHelper{
         afterCreateSegment: func(segment *datapb.SegmentInfo) error { return nil },
     }
 }
 // get allocOption with estimatePolicy
 func withCalUpperLimitPolicy(policy calUpperLimitPolicy) allocOption {
-    return allocOption{
-        apply: func(manager *SegmentManager) { manager.estimatePolicy = policy },
-    }
+    return allocFunc(func(manager *SegmentManager) { manager.estimatePolicy = policy })
 }
 // get allocOption with allocPolicy
 func withAllocPolicy(policy allocatePolicy) allocOption {
-    return allocOption{
-        apply: func(manager *SegmentManager) { manager.allocPolicy = policy },
-    }
+    return allocFunc(func(manager *SegmentManager) { manager.allocPolicy = policy })
 }
 // func withSealPolicy(policy sealPolicy) allocOption {
 //     return allocOption{
 //         apply: func(manager *SegmentManager) { manager.sealPolicy = policy },
 //     }
 // }
 // get allocOption with segmentSealPolicies
 func withSegmentSealPolices(policies ...segmentSealPolicy) allocOption {
-    return allocOption{
-        apply: func(manager *SegmentManager) {
-            // do override instead of append, to override default options
-            manager.segmentSealPolicies = policies
-        },
-    }
+    return allocFunc(func(manager *SegmentManager) {
+        // override instead of append, so custom policies replace the defaults
+        manager.segmentSealPolicies = policies
+    })
 }
 // get allocOption with channelSealPolicies
 func withChannelSealPolices(policies ...channelSealPolicy) allocOption {
-    return allocOption{
-        apply: func(manager *SegmentManager) {
-            // do override instead of append, to override default options
-            manager.channelSealPolicies = policies
-        },
-    }
+    return allocFunc(func(manager *SegmentManager) {
+        // override instead of append, so custom policies replace the defaults
+        manager.channelSealPolicies = policies
+    })
 }
 // get allocOption with flushPolicy
 func withFlushPolicy(policy flushPolicy) allocOption {
-    return allocOption{
-        apply: func(manager *SegmentManager) { manager.flushPolicy = policy },
-    }
+    return allocFunc(func(manager *SegmentManager) { manager.flushPolicy = policy })
 }
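Note: allocOption changes from a one-field struct to an interface plus the allocFunc adapter, the usual Go functional-options shape; any func(*SegmentManager) can now be cast into an option. A hedged construction sketch (buildManager and the 0.75 factor are illustrative; meta and alloc stand for an initialized meta store and allocator):

func buildManager(meta *meta, alloc allocator) *SegmentManager {
    // ad hoc option through the allocFunc adapter, no named helper required
    quiet := allocFunc(func(m *SegmentManager) { m.channelSealPolicies = nil })
    return newSegmentManager(meta, alloc,
        withCalUpperLimitPolicy(defaultCalUpperLimitPolicy()),
        withSegmentSealPolices(getSegmentCapacityPolicy(0.75)), // overrides the default set
        withFlushPolicy(newFlushPolicyV1()),
        quiet,
    )
}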
 func defaultCalUpperLimitPolicy() calUpperLimitPolicy {
@@ -169,6 +168,7 @@ func defaultFlushPolicy() flushPolicy {
     return newFlushPolicyV1()
 }
+// newSegmentManager should be the only way to retrieve SegmentManager
 func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager {
     manager := &SegmentManager{
         meta: meta,
@@ -181,6 +181,12 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
         segmentSealPolicies: []segmentSealPolicy{defaultSegmentSealPolicy()}, // default only segment size policy
         channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
         flushPolicy: defaultFlushPolicy(),
+        allocPool: sync.Pool{
+            New: func() interface{} {
+                return &allocation{}
+            },
+        },
     }
     for _, opt := range opts {
         opt.apply(manager)
@@ -189,25 +195,44 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
     return manager
 }
 // loadSegmentsFromMeta generates a corresponding segment status for each segment in meta
 func (s *SegmentManager) loadSegmentsFromMeta() {
     segments := s.meta.GetUnFlushedSegments()
     ids := make([]UniqueID, 0, len(segments))
     for _, seg := range segments {
         ids = append(ids, seg.ID)
         stat := &segmentStatus{
-            id: seg.ID,
-            collectionID: seg.CollectionID,
-            partitionID: seg.PartitionID,
-            total: seg.MaxRowNum,
-            allocations: []*allocation{},
-            insertChannel: seg.InsertChannel,
-            lastExpireTime: seg.LastExpireTime,
-            sealed: seg.State == commonpb.SegmentState_Sealed,
+            id: seg.ID,
+            allocations: make([]*allocation, 0, 16),
         }
         s.stats[seg.ID] = stat
     }
     log.Debug("Restore segment allocation", zap.Int64s("segments", ids))
 }
+// getAllocation is the unified way to retrieve an allocation struct, reusing pooled entries
+func (s *SegmentManager) getAllocation(numOfRows int64, expireTs uint64) *allocation {
+    v := s.allocPool.Get()
+    if v == nil {
+        return &allocation{
+            numOfRows: numOfRows,
+            expireTime: expireTs,
+        }
+    }
+    a, ok := v.(*allocation)
+    if !ok {
+        a = &allocation{}
+    }
+    a.numOfRows, a.expireTime = numOfRows, expireTs
+    return a
+}
+// putAllocation returns an allocation to the pool for recycling
+func (s *SegmentManager) putAllocation(a *allocation) {
+    s.allocPool.Put(a)
+}
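Note: getAllocation and putAllocation wrap a sync.Pool so the allocation records churned on every AllocSegment call are recycled rather than repeatedly garbage-collected. A standalone sketch of the same pattern (not code from this patch):

package main

import (
    "fmt"
    "sync"
)

type allocation struct {
    numOfRows  int64
    expireTime uint64
}

var allocPool = sync.Pool{
    New: func() interface{} { return &allocation{} },
}

func main() {
    a := allocPool.Get().(*allocation) // a recycled entry, or a fresh one from New
    a.numOfRows, a.expireTime = 100, 42
    fmt.Printf("%+v\n", *a)
    allocPool.Put(a) // hand it back once the allocation expires
}

Because newSegmentManager seeds the pool with a New function, Get never returns nil there; the nil branch in getAllocation only matters for a SegmentManager built without that default.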
 // AllocSegment allocates rows from a segment for the requested collection, partition and channel
 func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID,
     partitionID UniqueID, channelName string, requestRows int64) (segID UniqueID, retCount int64, expireTime Timestamp, err error) {
     sp, _ := trace.StartSpanFromContext(ctx)
@@ -217,12 +242,18 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
     var success bool
     var status *segmentStatus
+    var info *datapb.SegmentInfo
     for _, segStatus := range s.stats {
-        if segStatus.sealed || segStatus.collectionID != collectionID ||
-            segStatus.partitionID != partitionID || segStatus.insertChannel != channelName {
+        info, err = s.meta.GetSegment(segStatus.id)
+        if err != nil {
+            log.Warn("Failed to get seg info from meta", zap.Int64("id", segStatus.id), zap.Error(err))
             continue
         }
-        success, err = s.alloc(segStatus, requestRows)
+        if info.State == commonpb.SegmentState_Sealed || info.CollectionID != collectionID ||
+            info.PartitionID != partitionID || info.InsertChannel != channelName {
+            continue
+        }
+        success, err = s.alloc(segStatus, info, requestRows)
         if err != nil {
             return
         }
@@ -237,7 +268,12 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
     if err != nil {
         return
     }
-    success, err = s.alloc(status, requestRows)
+    info, err = s.meta.GetSegment(status.id)
+    if err != nil {
+        log.Warn("Failed to get seg info from meta", zap.Int64("id", status.id), zap.Error(err))
+        return
+    }
+    success, err = s.alloc(status, info, requestRows)
     if err != nil {
         return
     }
@@ -249,16 +285,17 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
     segID = status.id
     retCount = requestRows
-    expireTime = status.lastExpireTime
+    expireTime = info.LastExpireTime
     return
 }
-func (s *SegmentManager) alloc(status *segmentStatus, numOfRows int64) (bool, error) {
+func (s *SegmentManager) alloc(status *segmentStatus, info *datapb.SegmentInfo, numOfRows int64) (bool, error) {
     var allocSize int64
     for _, allocItem := range status.allocations {
         allocSize += allocItem.numOfRows
     }
-    if !s.allocPolicy.apply(status.total, status.currentRows, allocSize, numOfRows) {
+    if !s.allocPolicy.apply(info.MaxRowNum, status.currentRows, allocSize, numOfRows) {
         return false, nil
     }
@@ -267,11 +304,10 @@ func (s *SegmentManager) alloc(status *segmentStatus, numOfRows int64) (bool, er
         return false, err
     }
-    alloc := &allocation{
-        numOfRows: numOfRows,
-        expireTime: expireTs,
-    }
-    status.lastExpireTime = expireTs
+    alloc := s.getAllocation(numOfRows, expireTs)
+    // safe here since info is a clone, only used to pass expireTs out
+    info.LastExpireTime = expireTs
     status.allocations = append(status.allocations, alloc)
     if err := s.meta.SetLastExpireTime(status.id, expireTs); err != nil {
@@ -303,15 +339,9 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
         return nil, err
     }
     status := &segmentStatus{
-        id: id,
-        collectionID: collectionID,
-        partitionID: partitionID,
-        sealed: false,
-        total: int64(maxNumOfRows),
-        insertChannel: channelName,
-        allocations: []*allocation{},
-        lastExpireTime: 0,
-        currentRows: 0,
+        id: id,
+        allocations: make([]*allocation, 0, 16),
+        currentRows: 0,
     }
     s.stats[id] = status
@@ -358,6 +388,12 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
     defer sp.Finish()
     s.mu.Lock()
     defer s.mu.Unlock()
+    stat, ok := s.stats[segmentID]
+    if ok && stat != nil {
+        for _, allocation := range stat.allocations {
+            s.putAllocation(allocation)
+        }
+    }
     delete(s.stats, segmentID)
 }
@@ -368,17 +404,21 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
     defer s.mu.Unlock()
     ret := make([]UniqueID, 0)
     for _, status := range s.stats {
-        if status.collectionID != collectionID {
+        info, err := s.meta.GetSegment(status.id)
+        if err != nil {
+            log.Warn("Failed to get seg info from meta", zap.Int64("id", status.id), zap.Error(err))
             continue
         }
-        if status.sealed {
+        if info.CollectionID != collectionID {
+            continue
+        }
+        if info.State == commonpb.SegmentState_Sealed {
             ret = append(ret, status.id)
             continue
         }
         if err := s.meta.SealSegment(status.id); err != nil {
             return nil, err
         }
-        status.sealed = true
         ret = append(ret, status.id)
     }
     return ret, nil
@@ -394,12 +434,18 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
         return nil, err
     }
-    ret := make([]UniqueID, 0)
+    segments := s.meta.GetSegmentsByChannel(channel)
+    mIDSegment := make(map[UniqueID]*datapb.SegmentInfo)
+    for _, segment := range segments {
+        mIDSegment[segment.ID] = segment
+    }
+    ret := make([]UniqueID, 0, len(segments))
     for _, status := range s.stats {
-        if status.insertChannel != channel {
+        info, has := mIDSegment[status.id]
+        if !has {
             continue
         }
-        if s.flushPolicy.apply(status, t) {
+        if s.flushPolicy.apply(info, t) {
             ret = append(ret, status.id)
         }
     }
@@ -407,6 +453,7 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
     return ret, nil
 }
+// UpdateSegmentStats updates the number of rows in memory for a segment
 func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates) {
     s.mu.Lock()
     defer s.mu.Unlock()
@@ -417,51 +464,71 @@ func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUp
     segment.currentRows = stat.NumRows
 }
-// tryToSealSegment applies segment & channel seal policies
-func (s *SegmentManager) tryToSealSegment(ts Timestamp) error {
-    channelInfo := make(map[string][]*segmentStatus)
+// ExpireAllocations notifies segment status to expire old allocations
+func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    segments := s.meta.GetSegmentsByChannel(channel)
+    mIDSeg := make(map[UniqueID]struct{})
+    for _, segment := range segments {
+        mIDSeg[segment.ID] = struct{}{}
+    }
     for _, status := range s.stats {
-        channelInfo[status.insertChannel] = append(channelInfo[status.insertChannel], status)
-        if status.sealed {
+        _, ok := mIDSeg[status.id]
+        if !ok {
             continue
         }
-        // change shouldSeal to segment seal policy logic
-        for _, policy := range s.segmentSealPolicies {
-            if policy(status, ts) {
-                if err := s.meta.SealSegment(status.id); err != nil {
-                    return err
-                }
-                status.sealed = true
-                break
-            }
-        }
-    }
-    for channel, segmentStats := range channelInfo {
-        for _, policy := range s.channelSealPolicies {
-            vs := policy(channel, segmentStats, ts)
-            for _, status := range vs {
-                if status.sealed {
-                    continue
-                }
-                if err := s.meta.SealSegment(status.id); err != nil {
-                    return err
-                }
-                status.sealed = true
+        for i := 0; i < len(status.allocations); i++ {
+            if status.allocations[i].expireTime <= ts {
+                a := status.allocations[i]
+                status.allocations = append(status.allocations[:i], status.allocations[i+1:]...)
+                s.putAllocation(a)
+                i-- // the removal shifted the next entry into index i; re-check it
             }
         }
     }
     return nil
 }
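Note: ExpireAllocations deletes expired entries while walking the slice by index (with the index stepped back after each removal). An equivalent sweep using Go's in-place filter idiom, shown here as a hedged alternative rather than code from the patch:

func (s *SegmentManager) expireOne(status *segmentStatus, ts Timestamp) {
    kept := status.allocations[:0] // reuses the backing array
    for _, a := range status.allocations {
        if a.expireTime <= ts {
            s.putAllocation(a) // recycle the expired record
            continue
        }
        kept = append(kept, a)
    }
    status.allocations = kept
}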
-// func (s *SegmentManager) shouldSeal(segStatus *segmentStatus) (bool, error) {
-//     var allocSize int64
-//     for _, allocation := range segStatus.allocations {
-//         allocSize += allocation.rowNums
-//     }
-//     ret := s.sealPolicy.apply(segStatus.total, segStatus.currentRows, allocSize)
-//     return ret, nil
-// }
+// tryToSealSegment applies segment & channel seal policies
+func (s *SegmentManager) tryToSealSegment(ts Timestamp) error {
+    channelInfo := make(map[string][]*datapb.SegmentInfo)
+    mIDSegment := make(map[UniqueID]*datapb.SegmentInfo)
+    for _, status := range s.stats {
+        info, err := s.meta.GetSegment(status.id)
+        if err != nil {
+            log.Warn("Failed to get seg info from meta", zap.Int64("id", status.id), zap.Error(err))
+            continue
+        }
+        mIDSegment[status.id] = info
+        channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
+        if info.State == commonpb.SegmentState_Sealed {
+            continue
+        }
+        // change shouldSeal to segment seal policy logic
+        for _, policy := range s.segmentSealPolicies {
+            if policy(status, info, ts) {
+                if err := s.meta.SealSegment(status.id); err != nil {
+                    return err
+                }
+                break
+            }
+        }
+    }
+    for channel, segmentInfos := range channelInfo {
+        for _, policy := range s.channelSealPolicies {
+            vs := policy(channel, segmentInfos, ts)
+            for _, info := range vs {
+                if info.State == commonpb.SegmentState_Sealed {
+                    continue
+                }
+                if err := s.meta.SealSegment(info.ID); err != nil {
+                    return err
+                }
+            }
+        }
+    }
+    return nil
+}
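Note: channel seal policies now take and return []*datapb.SegmentInfo, so they can be expressed against meta state alone. A hypothetical extra policy in that shape (sealExpiredSegmentsPolicy is illustrative, not part of the patch); registered through withChannelSealPolices it would run in tryToSealSegment's second loop next to getChannelOpenSegCapacityPolicy:

func sealExpiredSegmentsPolicy() channelSealPolicy {
    return func(channel string, segs []*datapb.SegmentInfo, ts Timestamp) []*datapb.SegmentInfo {
        ret := make([]*datapb.SegmentInfo, 0, len(segs))
        for _, info := range segs {
            // every allocation granted so far expired before ts
            if info.LastExpireTime != 0 && info.LastExpireTime < ts {
                ret = append(ret, info)
            }
        }
        return ret
    }
}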
 // only for test
 func (s *SegmentManager) SealSegment(ctx context.Context, segmentID UniqueID) error {
@@ -472,7 +539,6 @@ func (s *SegmentManager) SealSegment(ctx context.Context, segmentID UniqueID) er
     if err := s.meta.SealSegment(segmentID); err != nil {
         return err
     }
-    s.stats[segmentID].sealed = true
     return nil
 }


@@ -141,6 +141,5 @@ func TestSaveSegmentsToMeta(t *testing.T) {
     segment, err := meta.GetSegment(segID)
     assert.Nil(t, err)
     assert.EqualValues(t, segment.LastExpireTime, expireTs)
-    assert.EqualValues(t, segStatus.total, segment.MaxRowNum)
     assert.EqualValues(t, commonpb.SegmentState_Sealed, segment.State)
 }


@@ -344,6 +344,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
             if len(segmentInfos) > 0 {
                 s.cluster.flush(segmentInfos)
             }
+            s.segmentManager.ExpireAllocations(ch, ts)
         }
     }
 }