mirror of https://github.com/milvus-io/milvus.git
Fix duplicated segment id in bulkinsert task state (#27915)
Signed-off-by: yhmo <yihua.mo@zilliz.com>pull/27897/head
parent
f212158d61
commit
e6d4cde397
|
@ -586,12 +586,7 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
|
|||
// Meta persist should be done before memory objs change.
|
||||
toPersistImportTaskInfo = cloneImportTaskInfo(v)
|
||||
toPersistImportTaskInfo.State.StateCode = ir.GetState()
|
||||
// if is started state, append the new created segment id
|
||||
if v.GetState().GetStateCode() == commonpb.ImportState_ImportStarted {
|
||||
toPersistImportTaskInfo.State.Segments = append(toPersistImportTaskInfo.State.Segments, ir.GetSegments()...)
|
||||
} else {
|
||||
toPersistImportTaskInfo.State.Segments = ir.GetSegments()
|
||||
}
|
||||
toPersistImportTaskInfo.State.Segments = mergeArray(toPersistImportTaskInfo.State.Segments, ir.GetSegments())
|
||||
toPersistImportTaskInfo.State.RowCount = ir.GetRowCount()
|
||||
toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds()
|
||||
for _, kv := range ir.GetInfos() {
|
||||
|
@ -1087,3 +1082,20 @@ func cloneImportTaskInfo(taskInfo *datapb.ImportTaskInfo) *datapb.ImportTaskInfo
|
|||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func mergeArray(arr1 []int64, arr2 []int64) []int64 {
|
||||
reduce := make(map[int64]int)
|
||||
doReduce := func(arr []int64) {
|
||||
for _, v := range arr {
|
||||
reduce[v] = 1
|
||||
}
|
||||
}
|
||||
doReduce(arr1)
|
||||
doReduce(arr2)
|
||||
|
||||
result := make([]int64, 0, len(reduce))
|
||||
for k := range reduce {
|
||||
result = append(result, k)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package rootcoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -1101,3 +1102,33 @@ func TestImportManager_isRowbased(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
assert.False(t, rb)
|
||||
}
|
||||
|
||||
func TestImportManager_mergeArray(t *testing.T) {
|
||||
converter := func(arr []int64) []int {
|
||||
res := make([]int, 0, len(arr))
|
||||
for _, v := range arr {
|
||||
res = append(res, int(v))
|
||||
}
|
||||
sort.Ints(res)
|
||||
return res
|
||||
}
|
||||
|
||||
arr1 := []int64{1, 2, 3}
|
||||
arr2 := []int64{2, 4, 6}
|
||||
res := converter(mergeArray(arr1, arr2))
|
||||
assert.Equal(t, []int{1, 2, 3, 4, 6}, res)
|
||||
|
||||
res = converter(mergeArray(arr1, nil))
|
||||
assert.Equal(t, []int{1, 2, 3}, res)
|
||||
|
||||
res = converter(mergeArray(nil, arr2))
|
||||
assert.Equal(t, []int{2, 4, 6}, res)
|
||||
|
||||
res = converter(mergeArray(nil, nil))
|
||||
assert.Equal(t, []int{}, res)
|
||||
|
||||
arr1 = []int64{1, 2, 3}
|
||||
arr2 = []int64{6, 5, 4}
|
||||
res = converter(mergeArray(arr1, arr2))
|
||||
assert.Equal(t, []int{1, 2, 3, 4, 5, 6}, res)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue