enhance: Reduce segmentManager lock granularity (#37836)

Use a channel-level key lock for segments in segmentManager, replacing the manager-wide mutex, and track growing/sealed segments per channel so that operations on different channels no longer contend. A minimal sketch of the per-channel locking pattern follows the commit metadata below.

issue: https://github.com/milvus-io/milvus/issues/37633,
https://github.com/milvus-io/milvus/issues/37630

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-12-17 14:12:52 +08:00 committed by GitHub
parent dd4f33ae19
commit d4dab3c62f
6 changed files with 344 additions and 304 deletions
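
Context for the sketch: the diff swaps `mu lock.RWMutex` for `channelLock *lock.KeyLock[string]` plus per-channel `channel2Growing`/`channel2Sealed` sets, so AllocSegment, SealAllSegments, ExpireAllocations, and friends only serialize when they target the same channel. The standalone Go program below illustrates the per-key locking pattern in isolation; `keyLock`, `newKeyLock`, and the demo `main` are hypothetical stand-ins and deliberately simpler than Milvus's `pkg/util/lock.KeyLock` (which, as I understand it, also releases lock entries for idle keys).

package main

import (
    "fmt"
    "sync"
)

// keyLock is a minimal per-key mutex: callers contend only when they lock the
// same key (here, the same vchannel name). Entries are never freed, unlike the
// real pkg/util/lock.KeyLock.
type keyLock[K comparable] struct {
    mu    sync.Mutex
    locks map[K]*sync.Mutex
}

func newKeyLock[K comparable]() *keyLock[K] {
    return &keyLock[K]{locks: make(map[K]*sync.Mutex)}
}

func (kl *keyLock[K]) Lock(key K) {
    kl.mu.Lock()
    m, ok := kl.locks[key]
    if !ok {
        m = &sync.Mutex{}
        kl.locks[key] = m
    }
    kl.mu.Unlock()
    m.Lock()
}

// Unlock must be paired with a prior Lock on the same key.
func (kl *keyLock[K]) Unlock(key K) {
    kl.mu.Lock()
    m := kl.locks[key]
    kl.mu.Unlock()
    m.Unlock()
}

func main() {
    channelLock := newKeyLock[string]()
    var wg sync.WaitGroup
    // Work on the same channel serializes; different channels proceed in parallel.
    for _, ch := range []string{"ch1", "ch2", "ch1"} {
        wg.Add(1)
        go func(channel string) {
            defer wg.Done()
            channelLock.Lock(channel)
            defer channelLock.Unlock(channel)
            fmt.Println("allocating/sealing on", channel)
        }(ch)
    }
    wg.Wait()
}

Under the old design every segmentManager call took the single RWMutex; with the key lock, only same-channel calls serialize, which is the granularity reduction the title refers to. The per-channel growing/sealed sets serve the same goal: each operation touches only its own channel's bookkeeping instead of scanning one global segment slice.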

View File

@ -145,9 +145,9 @@ func (_c *MockManager_AllocSegment_Call) RunAndReturn(run func(context.Context,
return _c
}
// DropSegment provides a mock function with given fields: ctx, segmentID
func (_m *MockManager) DropSegment(ctx context.Context, segmentID int64) {
_m.Called(ctx, segmentID)
// DropSegment provides a mock function with given fields: ctx, channel, segmentID
func (_m *MockManager) DropSegment(ctx context.Context, channel string, segmentID int64) {
_m.Called(ctx, channel, segmentID)
}
// MockManager_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment'
@ -157,14 +157,15 @@ type MockManager_DropSegment_Call struct {
// DropSegment is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - segmentID int64
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)}
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, channel interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, channel, segmentID)}
}
func (_c *MockManager_DropSegment_Call) Run(run func(ctx context.Context, segmentID int64)) *MockManager_DropSegment_Call {
func (_c *MockManager_DropSegment_Call) Run(run func(ctx context.Context, channel string, segmentID int64)) *MockManager_DropSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
run(args[0].(context.Context), args[1].(string), args[2].(int64))
})
return _c
}
@ -174,7 +175,7 @@ func (_c *MockManager_DropSegment_Call) Return() *MockManager_DropSegment_Call {
return _c
}
func (_c *MockManager_DropSegment_Call) RunAndReturn(run func(context.Context, int64)) *MockManager_DropSegment_Call {
func (_c *MockManager_DropSegment_Call) RunAndReturn(run func(context.Context, string, int64)) *MockManager_DropSegment_Call {
_c.Call.Return(run)
return _c
}
@ -214,21 +215,8 @@ func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.
}
// ExpireAllocations provides a mock function with given fields: ctx, channel, ts
func (_m *MockManager) ExpireAllocations(ctx context.Context, channel string, ts uint64) error {
ret := _m.Called(ctx, channel, ts)
if len(ret) == 0 {
panic("no return value specified for ExpireAllocations")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, uint64) error); ok {
r0 = rf(ctx, channel, ts)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockManager) ExpireAllocations(ctx context.Context, channel string, ts uint64) {
_m.Called(ctx, channel, ts)
}
// MockManager_ExpireAllocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExpireAllocations'
@ -251,12 +239,12 @@ func (_c *MockManager_ExpireAllocations_Call) Run(run func(ctx context.Context,
return _c
}
func (_c *MockManager_ExpireAllocations_Call) Return(_a0 error) *MockManager_ExpireAllocations_Call {
_c.Call.Return(_a0)
func (_c *MockManager_ExpireAllocations_Call) Return() *MockManager_ExpireAllocations_Call {
_c.Call.Return()
return _c
}
func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(context.Context, string, uint64) error) *MockManager_ExpireAllocations_Call {
func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(context.Context, string, uint64)) *MockManager_ExpireAllocations_Call {
_c.Call.Return(run)
return _c
}
@ -321,9 +309,9 @@ func (_c *MockManager_GetFlushableSegments_Call) RunAndReturn(run func(context.C
return _c
}
// SealAllSegments provides a mock function with given fields: ctx, collectionID, segIDs
func (_m *MockManager) SealAllSegments(ctx context.Context, collectionID int64, segIDs []int64) ([]int64, error) {
ret := _m.Called(ctx, collectionID, segIDs)
// SealAllSegments provides a mock function with given fields: ctx, channel, segIDs
func (_m *MockManager) SealAllSegments(ctx context.Context, channel string, segIDs []int64) ([]int64, error) {
ret := _m.Called(ctx, channel, segIDs)
if len(ret) == 0 {
panic("no return value specified for SealAllSegments")
@ -331,19 +319,19 @@ func (_m *MockManager) SealAllSegments(ctx context.Context, collectionID int64,
var r0 []int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, []int64) ([]int64, error)); ok {
return rf(ctx, collectionID, segIDs)
if rf, ok := ret.Get(0).(func(context.Context, string, []int64) ([]int64, error)); ok {
return rf(ctx, channel, segIDs)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, []int64) []int64); ok {
r0 = rf(ctx, collectionID, segIDs)
if rf, ok := ret.Get(0).(func(context.Context, string, []int64) []int64); ok {
r0 = rf(ctx, channel, segIDs)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, []int64) error); ok {
r1 = rf(ctx, collectionID, segIDs)
if rf, ok := ret.Get(1).(func(context.Context, string, []int64) error); ok {
r1 = rf(ctx, channel, segIDs)
} else {
r1 = ret.Error(1)
}
@ -358,15 +346,15 @@ type MockManager_SealAllSegments_Call struct {
// SealAllSegments is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - channel string
// - segIDs []int64
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)}
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, channel interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, channel, segIDs)}
}
func (_c *MockManager_SealAllSegments_Call) Run(run func(ctx context.Context, collectionID int64, segIDs []int64)) *MockManager_SealAllSegments_Call {
func (_c *MockManager_SealAllSegments_Call) Run(run func(ctx context.Context, channel string, segIDs []int64)) *MockManager_SealAllSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].([]int64))
run(args[0].(context.Context), args[1].(string), args[2].([]int64))
})
return _c
}
@ -376,7 +364,7 @@ func (_c *MockManager_SealAllSegments_Call) Return(_a0 []int64, _a1 error) *Mock
return _c
}
func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, int64, []int64) ([]int64, error)) *MockManager_SealAllSegments_Call {
func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, string, []int64) ([]int64, error)) *MockManager_SealAllSegments_Call {
_c.Call.Return(run)
return _c
}

View File

@ -80,14 +80,14 @@ type Manager interface {
AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
DropSegment(ctx context.Context, channel string, segmentID UniqueID)
// SealAllSegments seals all segments of collection with collectionID and return sealed segments.
// If segIDs is not empty, also seals segments in segIDs.
SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error)
SealAllSegments(ctx context.Context, channel string, segIDs []UniqueID) ([]UniqueID, error)
// GetFlushableSegments returns flushable segment ids
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
// ExpireAllocations notifies segment status to expire old allocations
ExpireAllocations(ctx context.Context, channel string, ts Timestamp) error
ExpireAllocations(ctx context.Context, channel string, ts Timestamp)
// DropSegmentsOfChannel drops all segments in a channel
DropSegmentsOfChannel(ctx context.Context, channel string)
}
@ -109,11 +109,15 @@ var _ Manager = (*SegmentManager)(nil)
// SegmentManager handles L1 segment related logic
type SegmentManager struct {
meta *meta
mu lock.RWMutex
allocator allocator.Allocator
helper allocHelper
segments []UniqueID
meta *meta
allocator allocator.Allocator
helper allocHelper
channelLock *lock.KeyLock[string]
channel2Growing *typeutil.ConcurrentMap[string, typeutil.UniqueSet]
channel2Sealed *typeutil.ConcurrentMap[string, typeutil.UniqueSet]
// Policies
estimatePolicy calUpperLimitPolicy
allocPolicy AllocatePolicy
segmentSealPolicies []SegmentSealPolicy
@ -214,7 +218,9 @@ func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocO
meta: meta,
allocator: allocator,
helper: defaultAllocHelper(),
segments: make([]UniqueID, 0),
channelLock: lock.NewKeyLock[string](),
channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAllocatePolicy(),
segmentSealPolicies: defaultSegmentSealPolicy(),
@ -224,49 +230,57 @@ func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocO
for _, opt := range opts {
opt.apply(manager)
}
manager.loadSegmentsFromMeta()
if err := manager.maybeResetLastExpireForSegments(); err != nil {
latestTs, err := manager.genLastExpireTsForSegments()
if err != nil {
return nil, err
}
manager.loadSegmentsFromMeta(latestTs)
return manager, nil
}
// loadSegmentsFromMeta generate corresponding segment status for each segment from meta
func (s *SegmentManager) loadSegmentsFromMeta() {
segments := s.meta.GetUnFlushedSegments()
segmentsID := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
if segment.Level != datapb.SegmentLevel_L0 {
segmentsID = append(segmentsID, segment.GetID())
func (s *SegmentManager) loadSegmentsFromMeta(latestTs Timestamp) {
unflushed := s.meta.GetUnFlushedSegments()
unflushed = lo.Filter(unflushed, func(segment *SegmentInfo, _ int) bool {
return segment.Level != datapb.SegmentLevel_L0
})
channel2Segments := lo.GroupBy(unflushed, func(segment *SegmentInfo) string {
return segment.GetInsertChannel()
})
for channel, segmentInfos := range channel2Segments {
growing := typeutil.NewUniqueSet()
sealed := typeutil.NewUniqueSet()
for _, segment := range segmentInfos {
// for all sealed and growing segments, need to reset last expire
if segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(segment.GetID(), latestTs)
growing.Insert(segment.GetID())
}
if segment != nil && segment.GetState() == commonpb.SegmentState_Sealed {
sealed.Insert(segment.GetID())
}
}
s.channel2Growing.Insert(channel, growing)
s.channel2Sealed.Insert(channel, sealed)
}
s.segments = segmentsID
}
func (s *SegmentManager) maybeResetLastExpireForSegments() error {
// for all sealed and growing segments, need to reset last expire
if len(s.segments) > 0 {
var latestTs uint64
allocateErr := retry.Do(context.Background(), func() error {
ts, tryErr := s.genExpireTs(context.Background())
func (s *SegmentManager) genLastExpireTsForSegments() (Timestamp, error) {
var latestTs uint64
allocateErr := retry.Do(context.Background(), func() error {
ts, tryErr := s.genExpireTs(context.Background())
if tryErr != nil {
log.Warn("failed to get ts from rootCoord for globalLastExpire", zap.Error(tryErr))
if tryErr != nil {
return tryErr
}
latestTs = ts
return nil
}, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond))
if allocateErr != nil {
log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr))
return errors.New("global max expire ts is unavailable for segment manager")
}
for _, sID := range s.segments {
if segment := s.meta.GetSegment(context.TODO(), sID); segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(sID, latestTs)
}
return tryErr
}
latestTs = ts
return nil
}, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond))
if allocateErr != nil {
log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr))
return 0, errors.New("global max expire ts is unavailable for segment manager")
}
return nil
return latestTs, nil
}
// AllocSegment allocates segments for the requested collection, partition, channel and row count
@ -280,38 +294,33 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
With(zap.Int64("requestRows", requestRows))
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Alloc-Segment")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
s.channelLock.Lock(channelName)
defer s.channelLock.Unlock(channelName)
// filter segments
validSegments := make(map[UniqueID]struct{})
invalidSegments := make(map[UniqueID]struct{})
segments := make([]*SegmentInfo, 0)
for _, segmentID := range s.segments {
segment := s.meta.GetHealthySegment(context.TODO(), segmentID)
segmentInfos := make([]*SegmentInfo, 0)
growing, _ := s.channel2Growing.Get(channelName)
growing.Range(func(segmentID int64) bool {
segment := s.meta.GetHealthySegment(ctx, segmentID)
if segment == nil {
invalidSegments[segmentID] = struct{}{}
continue
log.Warn("failed to get segment, remove it", zap.String("channel", channelName), zap.Int64("segmentID", segmentID))
growing.Remove(segmentID)
return true
}
validSegments[segmentID] = struct{}{}
if !satisfy(segment, collectionID, partitionID, channelName) || !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 {
continue
if segment.GetPartitionID() != partitionID {
return true
}
segments = append(segments, segment)
}
if len(invalidSegments) > 0 {
log.Warn("Failed to get segments infos from meta, clear them", zap.Int64s("segmentIDs", lo.Keys(invalidSegments)))
}
s.segments = lo.Keys(validSegments)
segmentInfos = append(segmentInfos, segment)
return true
})
// Apply allocation policy.
maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
return nil, err
}
newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segments,
newSegmentAllocations, existedSegmentAllocations := s.allocPolicy(segmentInfos,
requestRows, int64(maxCountPerSegment), datapb.SegmentLevel_L1)
// create new segments and add allocations
@ -344,15 +353,6 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return allocations, nil
}
func satisfy(segment *SegmentInfo, collectionID, partitionID UniqueID, channel string) bool {
return segment.GetCollectionID() == collectionID && segment.GetPartitionID() == partitionID &&
segment.GetInsertChannel() == channel
}
func isGrowing(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing
}
func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
ts, err := s.allocator.AllocTimestamp(ctx)
if err != nil {
@ -366,6 +366,8 @@ func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
// AllocNewGrowingSegment allocates segment for streaming node.
func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
s.channelLock.Lock(channelName)
defer s.channelLock.Unlock(channelName)
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, segmentID, channelName)
}
@ -404,7 +406,8 @@ func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, c
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
s.segments = append(s.segments, segmentID)
growing, _ := s.channel2Growing.GetOrInsert(channelName, typeutil.NewUniqueSet())
growing.Insert(segmentID)
log.Info("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
@ -424,18 +427,21 @@ func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error
}
// DropSegment drop the segment from manager.
func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmentID UniqueID) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Drop-Segment")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
for i, id := range s.segments {
if id == segmentID {
s.segments = append(s.segments[:i], s.segments[i+1:]...)
break
}
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
if growing, ok := s.channel2Growing.Get(channel); ok {
growing.Remove(segmentID)
}
segment := s.meta.GetHealthySegment(context.TODO(), segmentID)
if sealed, ok := s.channel2Sealed.Get(channel); ok {
sealed.Remove(segmentID)
}
segment := s.meta.GetHealthySegment(ctx, segmentID)
if segment == nil {
log.Warn("Failed to get segment", zap.Int64("id", segmentID))
return
@ -447,30 +453,46 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
func (s *SegmentManager) SealAllSegments(ctx context.Context, channel string, segIDs []UniqueID) ([]UniqueID, error) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Seal-Segments")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
var ret []UniqueID
segCandidates := s.segments
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())
growing, _ := s.channel2Growing.Get(channel)
var (
sealedSegments []int64
growingSegments []int64
)
if len(segIDs) != 0 {
segCandidates = segIDs
sealedSegments = s.meta.GetSegments(segIDs, func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
})
growingSegments = s.meta.GetSegments(segIDs, func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
})
} else {
sealedSegments = s.meta.GetSegments(sealed.Collect(), func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment)
})
growingSegments = s.meta.GetSegments(growing.Collect(), func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment)
})
}
sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
})
growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
})
var ret []UniqueID
ret = append(ret, sealedSegments...)
for _, id := range growingSegments {
if err := s.meta.SetState(ctx, id, commonpb.SegmentState_Sealed); err != nil {
return nil, err
}
sealed.Insert(id)
growing.Remove(id)
ret = append(ret, id)
}
return ret, nil
@ -480,37 +502,54 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string, t Timestamp) ([]UniqueID, error) {
_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Get-Segments")
defer sp.End()
s.mu.Lock()
defer s.mu.Unlock()
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
// TODO: move tryToSealSegment and dropEmptySealedSegment outside
if err := s.tryToSealSegment(ctx, t, channel); err != nil {
return nil, err
}
// TODO: It's too frequent; perhaps each channel could check once per minute instead.
s.cleanupSealedSegment(ctx, t, channel)
ret := make([]UniqueID, 0, len(s.segments))
for _, id := range s.segments {
info := s.meta.GetHealthySegment(ctx, id)
if info == nil || info.InsertChannel != channel {
continue
sealed, ok := s.channel2Sealed.Get(channel)
if !ok {
return nil, nil
}
ret := make([]UniqueID, 0, sealed.Len())
sealed.Range(func(segmentID int64) bool {
info := s.meta.GetHealthySegment(ctx, segmentID)
if info == nil {
return true
}
if s.flushPolicy(info, t) {
ret = append(ret, id)
ret = append(ret, segmentID)
}
}
return true
})
return ret, nil
}
// ExpireAllocations notify segment status to expire old allocations
func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string, ts Timestamp) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, id := range s.segments {
func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string, ts Timestamp) {
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return
}
growing.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(ctx, id)
if segment == nil || segment.InsertChannel != channel {
continue
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
growing.Remove(id)
return true
}
allocations := make([]*Allocation, 0, len(segment.allocations))
for i := 0; i < len(segment.allocations); i++ {
@ -522,76 +561,89 @@ func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string,
}
}
s.meta.SetAllocations(segment.GetID(), allocations)
}
return nil
return true
})
}
func (s *SegmentManager) cleanupSealedSegment(ctx context.Context, ts Timestamp, channel string) {
valids := make([]int64, 0, len(s.segments))
for _, id := range s.segments {
segment := s.meta.GetHealthySegment(ctx, id)
if segment == nil || segment.InsertChannel != channel {
valids = append(valids, id)
continue
}
if isEmptySealedSegment(segment, ts) {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
s.meta.SetState(ctx, id, commonpb.SegmentState_Dropped)
continue
}
valids = append(valids, id)
sealed, ok := s.channel2Sealed.Get(channel)
if !ok {
return
}
s.segments = valids
}
func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
return segment.GetState() == commonpb.SegmentState_Sealed && segment.GetLastExpireTime() <= ts && segment.currRows == 0
sealed.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(ctx, id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
sealed.Remove(id)
return true
}
// Check if segment is empty
if segment.GetLastExpireTime() <= ts && segment.currRows == 0 {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
if err := s.meta.SetState(ctx, id, commonpb.SegmentState_Dropped); err != nil {
log.Warn("failed to set segment state to dropped", zap.String("channel", channel),
zap.Int64("segmentID", id), zap.Error(err))
} else {
sealed.Remove(id)
}
}
return true
})
}
// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ctx context.Context, ts Timestamp, channel string) error {
channelInfo := make(map[string][]*SegmentInfo)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return nil
}
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())
channelSegmentInfos := make([]*SegmentInfo, 0, len(growing))
sealedSegments := make(map[int64]struct{})
for _, id := range s.segments {
var setStateErr error
growing.Range(func(id int64) bool {
info := s.meta.GetHealthySegment(ctx, id)
if info == nil || info.InsertChannel != channel {
continue
}
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State != commonpb.SegmentState_Growing {
continue
if info == nil {
return true
}
channelSegmentInfos = append(channelSegmentInfos, info)
// change shouldSeal to segment seal policy logic
for _, policy := range s.segmentSealPolicies {
if shouldSeal, reason := policy.ShouldSeal(info, ts); shouldSeal {
log.Info("Seal Segment for policy matched", zap.Int64("segmentID", info.GetID()), zap.String("reason", reason))
if err := s.meta.SetState(ctx, id, commonpb.SegmentState_Sealed); err != nil {
return err
setStateErr = err
return false
}
sealedSegments[id] = struct{}{}
sealed.Insert(id)
growing.Remove(id)
break
}
}
return true
})
if setStateErr != nil {
return setStateErr
}
for channel, segmentInfos := range channelInfo {
for _, policy := range s.channelSealPolicies {
vs, reason := policy(channel, segmentInfos, ts)
for _, info := range vs {
if _, ok := sealedSegments[info.GetID()]; ok {
continue
}
if info.State != commonpb.SegmentState_Growing {
continue
}
if err := s.meta.SetState(ctx, info.GetID(), commonpb.SegmentState_Sealed); err != nil {
return err
}
log.Info("seal segment for channel seal policy matched",
zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason))
sealedSegments[info.GetID()] = struct{}{}
for _, policy := range s.channelSealPolicies {
vs, reason := policy(channel, channelSegmentInfos, ts)
for _, info := range vs {
if _, ok := sealedSegments[info.GetID()]; ok {
continue
}
if err := s.meta.SetState(ctx, info.GetID(), commonpb.SegmentState_Sealed); err != nil {
return err
}
log.Info("seal segment for channel seal policy matched",
zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason))
sealedSegments[info.GetID()] = struct{}{}
sealed.Insert(info.GetID())
growing.Remove(info.GetID())
}
}
return nil
@ -599,24 +651,26 @@ func (s *SegmentManager) tryToSealSegment(ctx context.Context, ts Timestamp, cha
// DropSegmentsOfChannel drops all segments in a channel
func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
s.mu.Lock()
defer s.mu.Unlock()
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
validSegments := make([]int64, 0, len(s.segments))
for _, sid := range s.segments {
s.channel2Sealed.Remove(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return
}
growing.Range(func(sid int64) bool {
segment := s.meta.GetHealthySegment(ctx, sid)
if segment == nil {
continue
}
if segment.GetInsertChannel() != channel {
validSegments = append(validSegments, sid)
continue
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid))
growing.Remove(sid)
return true
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
}
s.segments = validSegments
return true
})
s.channel2Growing.Remove(channel)
}

View File

@ -35,7 +35,9 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestManagerOptions(t *testing.T) {
@ -145,19 +147,25 @@ func TestAllocSegment(t *testing.T) {
})
t.Run("alloc clear unhealthy segment", func(t *testing.T) {
allocations1, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
vchannel := "c1"
partitionID := int64(100)
allocations1, err := segmentManager.AllocSegment(ctx, collID, partitionID, vchannel, 100)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations1))
assert.EqualValues(t, 1, len(segmentManager.segments))
segments, ok := segmentManager.channel2Growing.Get(vchannel)
assert.True(t, ok)
assert.EqualValues(t, 1, segments.Len())
err = meta.SetState(context.TODO(), allocations1[0].SegmentID, commonpb.SegmentState_Dropped)
assert.NoError(t, err)
allocations2, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
allocations2, err := segmentManager.AllocSegment(ctx, collID, partitionID, vchannel, 100)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations2))
// clear old healthy and alloc new
assert.EqualValues(t, 1, len(segmentManager.segments))
segments, ok = segmentManager.channel2Growing.Get(vchannel)
assert.True(t, ok)
assert.EqualValues(t, 1, segments.Len())
assert.NotEqual(t, allocations1[0].SegmentID, allocations2[0].SegmentID)
})
}
@ -220,7 +228,8 @@ func TestLastExpireReset(t *testing.T) {
meta.SetCurrentRows(segmentID1, bigRows)
meta.SetCurrentRows(segmentID2, bigRows)
meta.SetCurrentRows(segmentID3, smallRows)
segmentManager.tryToSealSegment(context.TODO(), expire1, channelName)
err = segmentManager.tryToSealSegment(context.TODO(), expire1, channelName)
assert.NoError(t, err)
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(context.TODO(), segmentID1).GetState())
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(context.TODO(), segmentID2).GetState())
assert.Equal(t, commonpb.SegmentState_Growing, meta.GetSegment(context.TODO(), segmentID3).GetState())
@ -273,11 +282,14 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
vchannel := "ch0"
partitionID := int64(100)
sealedSegment := &datapb.SegmentInfo{
ID: 1,
CollectionID: collID,
PartitionID: 0,
InsertChannel: "",
PartitionID: partitionID,
InsertChannel: vchannel,
State: commonpb.SegmentState_Sealed,
MaxRowNum: 100,
LastExpireTime: 1000,
@ -285,8 +297,8 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
growingSegment := &datapb.SegmentInfo{
ID: 2,
CollectionID: collID,
PartitionID: 0,
InsertChannel: "",
PartitionID: partitionID,
InsertChannel: vchannel,
State: commonpb.SegmentState_Growing,
MaxRowNum: 100,
LastExpireTime: 1000,
@ -294,8 +306,8 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
flushedSegment := &datapb.SegmentInfo{
ID: 3,
CollectionID: collID,
PartitionID: 0,
InsertChannel: "",
PartitionID: partitionID,
InsertChannel: vchannel,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 100,
LastExpireTime: 1000,
@ -307,9 +319,14 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
err = meta.AddSegment(context.TODO(), NewSegmentInfo(flushedSegment))
assert.NoError(t, err)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
segments := segmentManager.segments
assert.EqualValues(t, 2, len(segments))
segmentManager, err := newSegmentManager(meta, mockAllocator)
assert.NoError(t, err)
growing, ok := segmentManager.channel2Growing.Get(vchannel)
assert.True(t, ok)
assert.EqualValues(t, 1, growing.Len())
sealed, ok := segmentManager.channel2Sealed.Get(vchannel)
assert.True(t, ok)
assert.EqualValues(t, 1, sealed.Len())
}
func TestSaveSegmentsToMeta(t *testing.T) {
@ -326,7 +343,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, nil)
_, err = segmentManager.SealAllSegments(context.Background(), "c1", nil)
assert.NoError(t, err)
segment := meta.GetHealthySegment(context.TODO(), allocations[0].SegmentID)
assert.NotNil(t, segment)
@ -348,7 +365,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, []int64{allocations[0].SegmentID})
_, err = segmentManager.SealAllSegments(context.Background(), "c1", []int64{allocations[0].SegmentID})
assert.NoError(t, err)
segment := meta.GetHealthySegment(context.TODO(), allocations[0].SegmentID)
assert.NotNil(t, segment)
@ -367,14 +384,14 @@ func TestDropSegment(t *testing.T) {
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
segID := allocations[0].SegmentID
segment := meta.GetHealthySegment(context.TODO(), segID)
assert.NotNil(t, segment)
segmentManager.DropSegment(context.Background(), segID)
segmentManager.DropSegment(context.Background(), "c1", segID)
segment = meta.GetHealthySegment(context.TODO(), segID)
assert.NotNil(t, segment)
}
@ -436,8 +453,7 @@ func TestExpireAllocation(t *testing.T) {
segment := meta.GetHealthySegment(context.TODO(), id)
assert.NotNil(t, segment)
assert.EqualValues(t, 100, len(segment.allocations))
err = segmentManager.ExpireAllocations(context.TODO(), "ch1", maxts)
assert.NoError(t, err)
segmentManager.ExpireAllocations(context.TODO(), "ch1", maxts)
segment = meta.GetHealthySegment(context.TODO(), id)
assert.NotNil(t, segment)
assert.EqualValues(t, 0, len(segment.allocations))
@ -459,7 +475,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
ids, err := segmentManager.SealAllSegments(context.TODO(), collID, nil)
ids, err := segmentManager.SealAllSegments(context.TODO(), "c1", nil)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
@ -753,6 +769,7 @@ func TestAllocationPool(t *testing.T) {
}
func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
partitionID := int64(100)
type fields struct {
meta *meta
segments []UniqueID
@ -775,15 +792,17 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
State: commonpb.SegmentState_Sealed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
PartitionID: partitionID,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
State: commonpb.SegmentState_Growing,
},
},
},
@ -805,13 +824,15 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
State: commonpb.SegmentState_Sealed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
PartitionID: partitionID,
InsertChannel: "ch2",
State: commonpb.SegmentState_Growing,
},
@ -830,11 +851,36 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &SegmentManager{
meta: tt.fields.meta,
segments: tt.fields.segments,
meta: tt.fields.meta,
channelLock: lock.NewKeyLock[string](),
channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
}
for _, segmentID := range tt.fields.segments {
segmentInfo := tt.fields.meta.GetSegment(context.Background(), segmentID)
channel := tt.args.channel
if segmentInfo != nil {
channel = segmentInfo.GetInsertChannel()
}
if segmentInfo == nil || segmentInfo.GetState() == commonpb.SegmentState_Growing {
growing, _ := s.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet())
growing.Insert(segmentID)
} else if segmentInfo.GetState() == commonpb.SegmentState_Sealed {
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())
sealed.Insert(segmentID)
}
}
s.DropSegmentsOfChannel(context.TODO(), tt.args.channel)
assert.ElementsMatch(t, tt.want, s.segments)
all := make([]int64, 0)
s.channel2Sealed.Range(func(_ string, segments typeutil.UniqueSet) bool {
all = append(all, segments.Collect()...)
return true
})
s.channel2Growing.Range(func(_ string, segments typeutil.UniqueSet) bool {
all = append(all, segments.Collect()...)
return true
})
assert.ElementsMatch(t, tt.want, all)
})
}
}

View File

@ -820,56 +820,10 @@ func TestServer_getSystemInfoMetrics(t *testing.T) {
}
}
type spySegmentManager struct {
spyCh chan struct{}
}
// AllocSegment allocates rows and record the allocation.
func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
return nil, nil
}
func (s *spySegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
return nil, nil
}
func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error) {
return nil, nil
}
// DropSegment drops the segment from manager.
func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
// FlushImportSegments set importing segment state to Flushed.
func (s *spySegmentManager) FlushImportSegments(ctx context.Context, collectionID UniqueID, segmentIDs []UniqueID) error {
return nil
}
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *spySegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
return nil, nil
}
// GetFlushableSegments returns flushable segment ids
func (s *spySegmentManager) GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error) {
return nil, nil
}
// ExpireAllocations notifies segment status to expire old allocations
func (s *spySegmentManager) ExpireAllocations(ctx context.Context, channel string, ts Timestamp) error {
return nil
}
// DropSegmentsOfChannel drops all segments in a channel
func (s *spySegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
s.spyCh <- struct{}{}
}
func TestDropVirtualChannel(t *testing.T) {
t.Run("normal DropVirtualChannel", func(t *testing.T) {
spyCh := make(chan struct{}, 1)
svr := newTestServer(t, WithSegmentManager(&spySegmentManager{spyCh: spyCh}))
segmentManager := NewMockManager(t)
svr := newTestServer(t, WithSegmentManager(segmentManager))
defer closeTestServer(t, svr)
@ -996,12 +950,11 @@ func TestDropVirtualChannel(t *testing.T) {
}
req.Segments = append(req.Segments, seg2Drop)
}
segmentManager.EXPECT().DropSegmentsOfChannel(mock.Anything, mock.Anything).Return()
resp, err := svr.DropVirtualChannel(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
<-spyCh
// resend
resp, err = svr.DropVirtualChannel(ctx, req)
assert.NoError(t, err)

View File

@ -113,19 +113,21 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
}
timeOfSeal, _ := tsoutil.ParseTS(ts)
sealedSegmentIDs := make([]int64, 0)
if !streamingutil.IsStreamingServiceEnabled() {
var err error
if sealedSegmentIDs, err = s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs()); err != nil {
return &datapb.FlushResponse{
Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
req.GetCollectionID())),
}, nil
}
}
sealedSegmentsIDDict := make(map[UniqueID]bool)
for _, sealedSegmentID := range sealedSegmentIDs {
sealedSegmentsIDDict[sealedSegmentID] = true
if !streamingutil.IsStreamingServiceEnabled() {
for _, channel := range coll.VChannelNames {
sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, channel, req.GetSegmentIDs())
if err != nil {
return &datapb.FlushResponse{
Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
req.GetCollectionID())),
}, nil
}
for _, sealedSegmentID := range sealedSegmentIDs {
sealedSegmentsIDDict[sealedSegmentID] = true
}
}
}
segments := s.meta.GetSegmentsOfCollection(ctx, req.GetCollectionID())
@ -172,7 +174,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
log.Info("flush response with segments",
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("sealSegments", sealedSegmentIDs),
zap.Int64s("sealSegments", lo.Keys(sealedSegmentsIDDict)),
zap.Int("flushedSegmentsCount", len(flushSegmentIDs)),
zap.Time("timeOfSeal", timeOfSeal),
zap.Uint64("flushTs", ts),
@ -182,7 +184,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
Status: merr.Success(),
DbID: req.GetDbID(),
CollectionID: req.GetCollectionID(),
SegmentIDs: sealedSegmentIDs,
SegmentIDs: lo.Keys(sealedSegmentsIDDict),
TimeOfSeal: timeOfSeal.Unix(),
FlushSegmentIDs: flushSegmentIDs,
FlushTs: ts,
@ -540,10 +542,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
// Set segment state
if req.GetDropped() {
// segmentManager manages growing segments
s.segmentManager.DropSegment(ctx, req.GetSegmentID())
s.segmentManager.DropSegment(ctx, req.GetChannel(), req.GetSegmentID())
operators = append(operators, UpdateStatusOperator(req.GetSegmentID(), commonpb.SegmentState_Dropped))
} else if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.GetSegmentID())
s.segmentManager.DropSegment(ctx, req.GetChannel(), req.GetSegmentID())
// set segment to SegmentState_Flushing
operators = append(operators, UpdateStatusOperator(req.GetSegmentID(), commonpb.SegmentState_Flushing))
}
@ -1493,10 +1495,7 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
s.updateSegmentStatistics(ctx, segmentStats)
if err := s.segmentManager.ExpireAllocations(ctx, channel, ts); err != nil {
log.Warn("failed to expire allocations", zap.Error(err))
return err
}
s.segmentManager.ExpireAllocations(ctx, channel, ts)
flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, channel, ts)
if err != nil {

View File

@ -499,7 +499,7 @@ func (s *ServerSuite) TestFlush_NormalCase() {
s.testServer.cluster = mockCluster
schema := newTestSchema()
s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}, VChannelNames: []string{"channel-1"}})
allocations, err := s.testServer.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
s.NoError(err)
s.EqualValues(1, len(allocations))