fix: reduce didn't handle offset without limit and reduceStopForBest correctly (#32089)

fix https://github.com/milvus-io/milvus/issues/32059

this pr fix two issues:

offset is not handled correctly without specify a limit
reduceStopForBest doesn't guarantee to return limit result even if there
are more result when there is small segment

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
pull/32119/head
Xiaofan 2024-04-10 01:01:18 -07:00 committed by GitHub
parent df208d538c
commit dbab9c5096
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 78 additions and 15 deletions

View File

@ -602,20 +602,25 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
idSet := make(map[interface{}]struct{})
cursors := make([]int64, len(validRetrieveResults))
retrieveLimit := typeutil.Unlimited
if queryParams != nil && queryParams.limit != typeutil.Unlimited {
retrieveLimit = queryParams.limit + queryParams.offset
if !queryParams.reduceStopForBest {
loopEnd = int(queryParams.limit)
}
if queryParams.offset > 0 {
for i := int64(0); i < queryParams.offset; i++ {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
return ret, nil
}
cursors[sel]++
}
// handle offset
if queryParams != nil && queryParams.offset > 0 {
for i := int64(0); i < queryParams.offset; i++ {
sel, drainOneResult := typeutil.SelectMinPK(retrieveLimit, validRetrieveResults, cursors)
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
return ret, nil
}
cursors[sel]++
}
}
reduceStopForBest := false
if queryParams != nil {
reduceStopForBest = queryParams.reduceStopForBest
@ -624,7 +629,7 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; j++ {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
sel, drainOneResult := typeutil.SelectMinPK(retrieveLimit, validRetrieveResults, cursors)
if sel == -1 || (reduceStopForBest && drainOneResult) {
break
}

View File

@ -594,16 +594,45 @@ func TestTaskQuery_functions(t *testing.T) {
assert.InDeltaSlice(t, resultFloat[0:(len)*Dim], result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
})
t.Run("test stop reduce for best for limit and offset", func(t *testing.T) {
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 1, offset: 1, reduceStopForBest: true})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
t.Run("test stop reduce for best for limit and offset", func(t *testing.T) {
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 2, offset: 1, reduceStopForBest: true})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
// we should get 6 result back in total, but only get 4, which means all the result should actually be part of result
assert.Equal(t, []int64{11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
t.Run("test stop reduce for best for unlimited set", func(t *testing.T) {
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: typeutil.Unlimited, reduceStopForBest: true})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 11, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
assert.Equal(t, []int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
len := len(result.GetFieldsData()[0].GetScalars().GetLongData().Data)
assert.InDeltaSlice(t, resultFloat[0:(len)*Dim], result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
})
t.Run("test stop reduce for best for unlimited set amd pffset", func(t *testing.T) {
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceStopForBest: true})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
})
})
}

View File

@ -408,7 +408,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
sel, drainOneResult := typeutil.SelectMinPK(param.limit, validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
break
}
@ -516,7 +516,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; j++ {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
sel, drainOneResult := typeutil.SelectMinPK(param.limit, validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
break
}

View File

@ -524,10 +524,10 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal([]int64{0, 1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{0, 1, 2, 3, 4, 6}, result.GetIds().GetIntId().GetData())
// here, we can only get best result from 0 to 4 without 6, because we can never know whether there is
// one potential 5 in following result1
suite.Equal([]int64{11, 22, 11, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.Equal([]int64{11, 22, 11, 22, 33, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44, 11, 22, 33, 44},
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
})
@ -563,6 +563,34 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
suite.InDeltaSlice([]float32{1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 5, 6, 7, 8, 11, 22, 33, 44},
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
})
suite.Run("test stop internal merge for best with early termination", func() {
result1 := &internalpb.RetrieveResults{
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{0, 4, 7},
},
},
},
FieldsData: fieldDataArray1,
}
result2 := &internalpb.RetrieveResults{
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{2},
},
},
},
FieldsData: fieldDataArray2,
}
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
NewMergeParam(3, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal([]int64{0, 2, 4, 7}, result.GetIds().GetIntId().GetData())
})
}
func (suite *ResultSuite) TestResult_ReduceSearchResultData() {

View File

@ -1324,7 +1324,7 @@ type ResultWithID interface {
}
// SelectMinPK select the index of the minPK in results T of the cursors.
func SelectMinPK[T ResultWithID](results []T, cursors []int64) (int, bool) {
func SelectMinPK[T ResultWithID](limit int64, results []T, cursors []int64) (int, bool) {
var (
sel = -1
drainResult = false
@ -1334,7 +1334,8 @@ func SelectMinPK[T ResultWithID](results []T, cursors []int64) (int, bool) {
minStrPK string
)
for i, cursor := range cursors {
if int(cursor) >= GetSizeOfIDs(results[i].GetIds()) {
// if result size < limit, this means we should ignore the result from this segment
if int(cursor) >= GetSizeOfIDs(results[i].GetIds()) && (GetSizeOfIDs(results[i].GetIds()) == int(limit)) {
drainResult = true
continue
}