fix: Fix panic due to empty candidate import segments (#35673)

issue: https://github.com/milvus-io/milvus/issues/35662

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/35743/head
yihao.dai 2024-08-27 17:08:59 +08:00 committed by GitHub
parent 4d2f96c760
commit 9868fe4e6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 21 additions and 5 deletions

View File

@ -220,7 +220,10 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
continue
}
partitionID := t.GetPartitionIDs()[partitionIdx]
segmentID := PickSegment(t.req.GetRequestSegments(), channel, partitionID)
segmentID, err := PickSegment(t.req.GetRequestSegments(), channel, partitionID)
if err != nil {
return nil, nil, err
}
syncTask, err := NewSyncTask(t.ctx, t.allocator, t.metaCaches, t.req.GetTs(),
segmentID, partitionID, t.GetCollectionID(), channel, data, nil)
if err != nil {

View File

@ -216,7 +216,10 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
continue
}
partitionID := t.GetPartitionIDs()[0]
segmentID := PickSegment(t.req.GetRequestSegments(), channel, partitionID)
segmentID, err := PickSegment(t.req.GetRequestSegments(), channel, partitionID)
if err != nil {
return nil, nil, err
}
syncTask, err := NewSyncTask(t.ctx, t.allocator, t.metaCaches, t.req.GetTs(),
segmentID, partitionID, t.GetCollectionID(), channel, nil, data)
if err != nil {

View File

@ -111,13 +111,18 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache
}, nil
}
func PickSegment(segments []*datapb.ImportRequestSegment, vchannel string, partitionID int64) int64 {
func PickSegment(segments []*datapb.ImportRequestSegment, vchannel string, partitionID int64) (int64, error) {
candidates := lo.Filter(segments, func(info *datapb.ImportRequestSegment, _ int) bool {
return info.GetVchannel() == vchannel && info.GetPartitionID() == partitionID
})
if len(candidates) == 0 {
return 0, fmt.Errorf("no candidate segments found for channel %s and partition %d",
vchannel, partitionID)
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return candidates[r.Intn(len(candidates))].GetSegmentID()
return candidates[r.Intn(len(candidates))].GetSegmentID(), nil
}
func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData) error {

View File

@ -153,7 +153,8 @@ func Test_PickSegment(t *testing.T) {
batchSize := 1 * 1024 * 1024
for totalSize > 0 {
picked := PickSegment(task.req.GetRequestSegments(), vchannel, partitionID)
picked, err := PickSegment(task.req.GetRequestSegments(), vchannel, partitionID)
assert.NoError(t, err)
importedSize[picked] += batchSize
totalSize -= batchSize
}
@ -167,4 +168,8 @@ func Test_PickSegment(t *testing.T) {
fn(importedSize[int64(101)])
fn(importedSize[int64(102)])
fn(importedSize[int64(103)])
// test no candidate segments found
_, err := PickSegment(task.req.GetRequestSegments(), "ch-2", 20)
assert.Error(t, err)
}