mirror of https://github.com/milvus-io/milvus.git
fix: load operation when segment is on releasing (#31340)
issue: #30857 --------- Signed-off-by: chyezh <chyezh@outlook.com>pull/33836/head
parent
86a36b105a
commit
9b69601dfb
|
@ -283,6 +283,10 @@ type SegmentManager interface {
|
|||
Remove(ctx context.Context, segmentID typeutil.UniqueID, scope querypb.DataScope) (int, int)
|
||||
RemoveBy(ctx context.Context, filters ...SegmentFilter) (int, int)
|
||||
Clear(ctx context.Context)
|
||||
|
||||
// Deprecated: quick fix critical issue: #30857
|
||||
// TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction.
|
||||
Exist(segmentID typeutil.UniqueID, typ SegmentType) bool
|
||||
}
|
||||
|
||||
var _ SegmentManager = (*segmentManager)(nil)
|
||||
|
@ -296,14 +300,18 @@ type segmentManager struct {
|
|||
|
||||
// releaseCallback is the callback function when a segment is released.
|
||||
releaseCallback func(s Segment)
|
||||
|
||||
growingOnReleasingSegments typeutil.UniqueSet
|
||||
sealedOnReleasingSegments typeutil.UniqueSet
|
||||
}
|
||||
|
||||
func NewSegmentManager() *segmentManager {
|
||||
mgr := &segmentManager{
|
||||
growingSegments: make(map[int64]Segment),
|
||||
sealedSegments: make(map[int64]Segment),
|
||||
return &segmentManager{
|
||||
growingSegments: make(map[int64]Segment),
|
||||
sealedSegments: make(map[int64]Segment),
|
||||
growingOnReleasingSegments: typeutil.NewUniqueSet(),
|
||||
sealedOnReleasingSegments: typeutil.NewUniqueSet(),
|
||||
}
|
||||
return mgr
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, segments ...Segment) {
|
||||
|
@ -354,7 +362,7 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
|
|||
if len(replacedSegment) > 0 {
|
||||
go func() {
|
||||
for _, segment := range replacedSegment {
|
||||
mgr.remove(ctx, segment)
|
||||
mgr.release(ctx, segment)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -374,6 +382,29 @@ func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilt
|
|||
return updated
|
||||
}
|
||||
|
||||
// Deprecated:
|
||||
// TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction.
|
||||
func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) bool {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
switch typ {
|
||||
case SegmentTypeGrowing:
|
||||
if _, ok := mgr.growingSegments[segmentID]; ok {
|
||||
return true
|
||||
} else if mgr.growingOnReleasingSegments.Contain(segmentID) {
|
||||
return true
|
||||
}
|
||||
case SegmentTypeSealed:
|
||||
if _, ok := mgr.sealedSegments[segmentID]; ok {
|
||||
return true
|
||||
} else if mgr.sealedOnReleasingSegments.Contain(segmentID) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
@ -639,11 +670,11 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique
|
|||
mgr.mu.Unlock()
|
||||
|
||||
if growing != nil {
|
||||
mgr.remove(ctx, growing)
|
||||
mgr.release(ctx, growing)
|
||||
}
|
||||
|
||||
if sealed != nil {
|
||||
mgr.remove(ctx, sealed)
|
||||
mgr.release(ctx, sealed)
|
||||
}
|
||||
|
||||
return removeGrowing, removeSealed
|
||||
|
@ -655,6 +686,7 @@ func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID type
|
|||
s, ok := mgr.growingSegments[segmentID]
|
||||
if ok {
|
||||
delete(mgr.growingSegments, segmentID)
|
||||
mgr.growingOnReleasingSegments.Insert(segmentID)
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -662,6 +694,7 @@ func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID type
|
|||
s, ok := mgr.sealedSegments[segmentID]
|
||||
if ok {
|
||||
delete(mgr.sealedSegments, segmentID)
|
||||
mgr.sealedOnReleasingSegments.Insert(segmentID)
|
||||
return s
|
||||
}
|
||||
default:
|
||||
|
@ -694,26 +727,34 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte
|
|||
mgr.mu.Unlock()
|
||||
|
||||
for _, s := range removeSegments {
|
||||
mgr.remove(ctx, s)
|
||||
mgr.release(ctx, s)
|
||||
}
|
||||
|
||||
return removeGrowing, removeSealed
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) Clear(ctx context.Context) {
|
||||
mgr.mu.Lock()
|
||||
defer mgr.mu.Unlock()
|
||||
|
||||
for id, segment := range mgr.growingSegments {
|
||||
delete(mgr.growingSegments, id)
|
||||
mgr.remove(ctx, segment)
|
||||
for id := range mgr.growingSegments {
|
||||
mgr.growingOnReleasingSegments.Insert(id)
|
||||
}
|
||||
growingWaitForRelease := mgr.growingSegments
|
||||
mgr.growingSegments = make(map[int64]Segment)
|
||||
|
||||
for id, segment := range mgr.sealedSegments {
|
||||
delete(mgr.sealedSegments, id)
|
||||
mgr.remove(ctx, segment)
|
||||
for id := range mgr.sealedSegments {
|
||||
mgr.sealedOnReleasingSegments.Insert(id)
|
||||
}
|
||||
sealedWaitForRelease := mgr.sealedSegments
|
||||
mgr.sealedSegments = make(map[int64]Segment)
|
||||
mgr.updateMetric()
|
||||
mgr.mu.Unlock()
|
||||
|
||||
for _, segment := range growingWaitForRelease {
|
||||
mgr.release(ctx, segment)
|
||||
}
|
||||
for _, segment := range sealedWaitForRelease {
|
||||
mgr.release(ctx, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// registerReleaseCallback registers the callback function when a segment is released.
|
||||
|
@ -741,7 +782,7 @@ func (mgr *segmentManager) updateMetric() {
|
|||
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len()))
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) remove(ctx context.Context, segment Segment) bool {
|
||||
func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
|
||||
if mgr.releaseCallback != nil {
|
||||
mgr.releaseCallback(segment)
|
||||
log.Ctx(ctx).Info("remove segment from cache", zap.Int64("segmentID", segment.ID()))
|
||||
|
@ -757,5 +798,13 @@ func (mgr *segmentManager) remove(ctx context.Context, segment Segment) bool {
|
|||
segment.Level().String(),
|
||||
).Dec()
|
||||
|
||||
return true
|
||||
mgr.mu.Lock()
|
||||
defer mgr.mu.Unlock()
|
||||
|
||||
switch segment.Type() {
|
||||
case SegmentTypeGrowing:
|
||||
mgr.growingOnReleasingSegments.Remove(segment.ID())
|
||||
case SegmentTypeSealed:
|
||||
mgr.sealedOnReleasingSegments.Remove(segment.ID())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ func (s *ManagerSuite) SetupSuite() {
|
|||
|
||||
func (s *ManagerSuite) SetupTest() {
|
||||
s.mgr = NewSegmentManager()
|
||||
s.segments = nil
|
||||
|
||||
for i, id := range s.segmentIDs {
|
||||
schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true)
|
||||
|
@ -66,6 +67,19 @@ func (s *ManagerSuite) SetupTest() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) TestExist() {
|
||||
for _, segment := range s.segments {
|
||||
s.True(s.mgr.Exist(segment.ID(), segment.Type()))
|
||||
s.mgr.removeSegmentWithType(segment.Type(), segment.ID())
|
||||
s.True(s.mgr.Exist(segment.ID(), segment.Type()))
|
||||
s.mgr.release(context.Background(), segment)
|
||||
s.False(s.mgr.Exist(segment.ID(), segment.Type()))
|
||||
}
|
||||
|
||||
s.False(s.mgr.Exist(10086, SegmentTypeGrowing))
|
||||
s.False(s.mgr.Exist(10086, SegmentTypeSealed))
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) TestGetBy() {
|
||||
for i, partitionID := range s.partitionIDs {
|
||||
segments := s.mgr.GetBy(WithPartition(partitionID))
|
||||
|
|
|
@ -99,6 +99,49 @@ func (_c *MockSegmentManager_Empty_Call) RunAndReturn(run func() bool) *MockSegm
|
|||
return _c
|
||||
}
|
||||
|
||||
// Exist provides a mock function with given fields: segmentID, typ
|
||||
func (_m *MockSegmentManager) Exist(segmentID int64, typ commonpb.SegmentState) bool {
|
||||
ret := _m.Called(segmentID, typ)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(int64, commonpb.SegmentState) bool); ok {
|
||||
r0 = rf(segmentID, typ)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockSegmentManager_Exist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exist'
|
||||
type MockSegmentManager_Exist_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Exist is a helper method to define mock.On call
|
||||
// - segmentID int64
|
||||
// - typ commonpb.SegmentState
|
||||
func (_e *MockSegmentManager_Expecter) Exist(segmentID interface{}, typ interface{}) *MockSegmentManager_Exist_Call {
|
||||
return &MockSegmentManager_Exist_Call{Call: _e.mock.On("Exist", segmentID, typ)}
|
||||
}
|
||||
|
||||
func (_c *MockSegmentManager_Exist_Call) Run(run func(segmentID int64, typ commonpb.SegmentState)) *MockSegmentManager_Exist_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(int64), args[1].(commonpb.SegmentState))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSegmentManager_Exist_Call) Return(_a0 bool) *MockSegmentManager_Exist_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSegmentManager_Exist_Call) RunAndReturn(run func(int64, commonpb.SegmentState) bool) *MockSegmentManager_Exist_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Get provides a mock function with given fields: segmentID
|
||||
func (_m *MockSegmentManager) Get(segmentID int64) Segment {
|
||||
ret := _m.Called(segmentID)
|
||||
|
|
|
@ -761,8 +761,8 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
|
|||
// filter out loaded & loading segments
|
||||
infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
|
||||
for _, segment := range segments {
|
||||
// Not loaded & loading
|
||||
if len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) == 0 &&
|
||||
// Not loaded & loading & releasing.
|
||||
if !loader.manager.Segment.Exist(segment.GetSegmentID(), segmentType) &&
|
||||
!loader.loadingSegments.Contain(segment.GetSegmentID()) {
|
||||
infos = append(infos, segment)
|
||||
loader.loadingSegments.Insert(segment.GetSegmentID(), newLoadResult())
|
||||
|
|
|
@ -773,7 +773,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
|||
idx := 0
|
||||
|
||||
var infos []*querypb.SegmentLoadInfo
|
||||
suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
|
||||
suite.segmentManager.EXPECT().Exist(mock.Anything, mock.Anything).Return(false)
|
||||
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
|
||||
defer func() { idx++ }()
|
||||
if idx == 0 {
|
||||
|
@ -802,7 +802,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
|||
|
||||
var idx int
|
||||
var infos []*querypb.SegmentLoadInfo
|
||||
suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
|
||||
suite.segmentManager.EXPECT().Exist(mock.Anything, mock.Anything).Return(false)
|
||||
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
|
||||
defer func() { idx++ }()
|
||||
if idx == 0 {
|
||||
|
@ -829,7 +829,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
|||
suite.Run("wait_timeout", func() {
|
||||
suite.SetupTest()
|
||||
|
||||
suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
|
||||
suite.segmentManager.EXPECT().Exist(mock.Anything, mock.Anything).Return(false)
|
||||
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
|
||||
return nil
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue