mirror of https://github.com/milvus-io/milvus.git
fix: [2.4]L0 brings its own start pos when syncing (#40899)
See also: #40388, #40207 pr: #40663 pr: #40664 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/41047/head
parent
5a6ed8161c
commit
4898b9d3c8
|
@ -60,12 +60,19 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
|
|||
Position: pack.checkpoint,
|
||||
})
|
||||
|
||||
startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
|
||||
return &datapb.SegmentStartPosition{
|
||||
SegmentID: info.SegmentID(),
|
||||
StartPosition: info.StartPosition(),
|
||||
}
|
||||
})
|
||||
startPos := lo.Map(pack.metacache.GetSegmentsBy(
|
||||
metacache.WithStartPosNotRecorded(), metacache.WithLevel(datapb.SegmentLevel_L1)),
|
||||
func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
|
||||
return &datapb.SegmentStartPosition{
|
||||
SegmentID: info.SegmentID(),
|
||||
StartPosition: info.StartPosition(),
|
||||
}
|
||||
})
|
||||
// L0 brings its own start position
|
||||
if segment.Level() == datapb.SegmentLevel_L0 {
|
||||
startPos = append(startPos, &datapb.SegmentStartPosition{SegmentID: pack.segmentID, StartPosition: pack.StartPosition()})
|
||||
}
|
||||
|
||||
getBinlogNum := func(fBinlog *datapb.FieldBinlog) int { return len(fBinlog.GetBinlogs()) }
|
||||
log.Info("SaveBinlogPath",
|
||||
zap.Int64("SegmentID", pack.segmentID),
|
||||
|
|
|
@ -39,7 +39,7 @@ func (s *MetaWriterSuite) TestNormalSave() {
|
|||
bfs := metacache.NewBloomFilterSet()
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
task := NewSyncTask()
|
||||
|
@ -55,7 +55,7 @@ func (s *MetaWriterSuite) TestReturnError() {
|
|||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
task := NewSyncTask()
|
||||
task.WithMetaCache(s.metacache)
|
||||
err := s.writer.UpdateSync(task)
|
||||
|
@ -69,7 +69,7 @@ func (s *MetaWriterSuite) TestNormalSaveV2() {
|
|||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
task := NewSyncTaskV2()
|
||||
task.WithMetaCache(s.metacache)
|
||||
err := s.writer.UpdateSyncV2(task)
|
||||
|
@ -83,7 +83,7 @@ func (s *MetaWriterSuite) TestReturnErrorV2() {
|
|||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
task := NewSyncTaskV2()
|
||||
task.WithMetaCache(s.metacache)
|
||||
err := s.writer.UpdateSyncV2(task)
|
||||
|
|
|
@ -156,7 +156,7 @@ func (s *SyncManagerSuite) TestSubmit() {
|
|||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
manager, err := NewSyncManager(s.chunkManager, s.allocator)
|
||||
|
@ -199,7 +199,7 @@ func (s *SyncManagerSuite) TestCompacted() {
|
|||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
manager, err := NewSyncManager(s.chunkManager, s.allocator)
|
||||
|
|
|
@ -185,7 +185,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
|
|||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
seg.GetBloomFilterSet().Roll()
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
s.Run("without_data", func() {
|
||||
|
@ -269,7 +269,7 @@ func (s *SyncTaskSuite) TestRunL0Segment() {
|
|||
bfs := metacache.NewBloomFilterSet()
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
s.Run("pure_delete_l0_flush", func() {
|
||||
|
@ -309,7 +309,7 @@ func (s *SyncTaskSuite) TestRunError() {
|
|||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet())
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
|
||||
s.Run("allocate_id_fail", func() {
|
||||
mockAllocator := allocator.NewMockAllocator(s.T())
|
||||
|
|
Loading…
Reference in New Issue