mirror of https://github.com/milvus-io/milvus.git
fix: Correct the last empty l0 views (#31198)
See also: #31191 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/31266/head
parent
126bb52f3d
commit
1fafc72077
|
@ -61,7 +61,8 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
|||
expectedSegID := seg1.ID
|
||||
|
||||
s.Require().Equal(1, len(latestL0Segments))
|
||||
levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
needRefresh, levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
|
@ -107,7 +108,8 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
|||
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
s.Require().NotEmpty(latestL0Segments)
|
||||
levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
needRefresh, levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.Require().True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
|
|
|
@ -169,48 +169,54 @@ func (m *CompactionViewManager) Check(ctx context.Context) (events map[Compactio
|
|||
}
|
||||
|
||||
func (m *CompactionViewManager) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView {
|
||||
var refreshedL0Views []CompactionView
|
||||
var allRefreshedL0Veiws []CompactionView
|
||||
for collID, segments := range latestCollSegs {
|
||||
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
|
||||
return info.GetLevel() == datapb.SegmentLevel_L0
|
||||
})
|
||||
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
changedL0Views := m.getChangedLevelZeroViews(collID, latestL0Segments)
|
||||
if len(changedL0Views) == 0 {
|
||||
continue
|
||||
needRefresh, collRefreshedViews := m.getChangedLevelZeroViews(collID, latestL0Segments)
|
||||
if needRefresh {
|
||||
log.Info("Refresh compaction level zero views",
|
||||
zap.Int64("collectionID", collID),
|
||||
zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string {
|
||||
return view.String()
|
||||
})))
|
||||
|
||||
m.view.collections[collID] = latestL0Segments
|
||||
}
|
||||
|
||||
log.Info("Refresh compaction level zero views",
|
||||
zap.Int64("collectionID", collID),
|
||||
zap.Strings("views", lo.Map(changedL0Views, func(view CompactionView, _ int) string {
|
||||
return view.String()
|
||||
})))
|
||||
|
||||
m.view.collections[collID] = latestL0Segments
|
||||
refreshedL0Views = append(refreshedL0Views, changedL0Views...)
|
||||
if len(collRefreshedViews) > 0 {
|
||||
allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...)
|
||||
}
|
||||
}
|
||||
|
||||
return refreshedL0Views
|
||||
return allRefreshedL0Veiws
|
||||
}
|
||||
|
||||
func (m *CompactionViewManager) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) []CompactionView {
|
||||
latestViews := m.groupL0ViewsByPartChan(collID, LevelZeroViews)
|
||||
func (m *CompactionViewManager) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) {
|
||||
cachedViews := m.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
|
||||
return v.Level == datapb.SegmentLevel_L0
|
||||
})
|
||||
|
||||
var signals []CompactionView
|
||||
if len(LevelZeroViews) == 0 && len(cachedViews) != 0 {
|
||||
needRefresh = true
|
||||
return
|
||||
}
|
||||
|
||||
latestViews := m.groupL0ViewsByPartChan(collID, LevelZeroViews)
|
||||
for _, latestView := range latestViews {
|
||||
views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool {
|
||||
return v.label.Equal(latestView.GetGroupLabel())
|
||||
})
|
||||
|
||||
if !latestView.Equal(views) {
|
||||
signals = append(signals, latestView)
|
||||
refreshed = append(refreshed, latestView)
|
||||
needRefresh = true
|
||||
}
|
||||
}
|
||||
return signals
|
||||
return
|
||||
}
|
||||
|
||||
func (m *CompactionViewManager) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView {
|
||||
|
|
|
@ -233,7 +233,7 @@ func (s *CompactionViewManagerSuite) TestNotifyTrigger() {
|
|||
func (s *CompactionViewManagerSuite) TestCheck() {
|
||||
// nothing in the view before the test
|
||||
ctx := context.Background()
|
||||
s.Empty(s.m.view.collections)
|
||||
s.Require().Empty(s.m.view.collections)
|
||||
events := s.m.Check(ctx)
|
||||
|
||||
s.m.viewGuard.Lock()
|
||||
|
@ -265,6 +265,48 @@ func (s *CompactionViewManagerSuite) TestCheck() {
|
|||
emptyEvents = s.m.Check(ctx)
|
||||
s.Empty(emptyEvents)
|
||||
s.Empty(s.m.view.collections)
|
||||
|
||||
s.Run("check collection for zero l0 segments", func() {
|
||||
s.SetupTest()
|
||||
ctx := context.Background()
|
||||
s.Require().Empty(s.m.view.collections)
|
||||
events := s.m.Check(ctx)
|
||||
|
||||
s.m.viewGuard.Lock()
|
||||
views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil)
|
||||
s.m.viewGuard.Unlock()
|
||||
s.Require().Equal(4, len(views))
|
||||
for _, view := range views {
|
||||
s.EqualValues(s.testLabel, view.label)
|
||||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
s.Equal(commonpb.SegmentState_Flushed, view.State)
|
||||
log.Info("String", zap.String("segment", view.String()))
|
||||
log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString()))
|
||||
}
|
||||
|
||||
s.NotEmpty(events)
|
||||
s.Equal(1, len(events))
|
||||
refreshed, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
s.Require().True(ok)
|
||||
s.Equal(1, len(refreshed))
|
||||
|
||||
// All l0 segments are dropped in the collection
|
||||
// and there're still some L1 segments
|
||||
s.m.meta.Lock()
|
||||
s.m.meta.segments.segments = map[int64]*SegmentInfo{
|
||||
2000: genTestSegmentInfo(s.testLabel, 2000, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped),
|
||||
2001: genTestSegmentInfo(s.testLabel, 2001, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped),
|
||||
2003: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped),
|
||||
3000: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed),
|
||||
}
|
||||
s.m.meta.Unlock()
|
||||
events = s.m.Check(ctx)
|
||||
s.Empty(events)
|
||||
s.m.viewGuard.Lock()
|
||||
views = s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil)
|
||||
s.m.viewGuard.Unlock()
|
||||
s.Equal(0, len(views))
|
||||
})
|
||||
}
|
||||
|
||||
func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo {
|
||||
|
|
Loading…
Reference in New Issue