fix: Return record with largest timestamp for entries with same PK (#33936)

See also #33883

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/33947/head
congqixia 2024-06-18 15:55:59 +08:00 committed by GitHub
parent 5772123ca2
commit 3fdaae8792
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 248 additions and 59 deletions

View File

@ -116,9 +116,9 @@ class OffsetOrderedMap : public OffsetMap {
bool false_filtered_out) const override {
std::shared_lock<std::shared_mutex> lck(mtx_);
if (limit == Unlimited || limit == NoLimit) {
limit = map_.size();
}
// if (limit == Unlimited || limit == NoLimit) {
// limit = map_.size();
// }
// TODO: we can't retrieve pk by offset very conveniently.
// Selectivity should be done outside.
@ -142,6 +142,9 @@ class OffsetOrderedMap : public OffsetMap {
if (!false_filtered_out) {
cnt = size - bitset.count();
}
if (limit == Unlimited || limit == NoLimit) {
limit = cnt;
}
limit = std::min(limit, cnt);
std::vector<int64_t> seg_offsets;
seg_offsets.reserve(limit);

View File

@ -3,10 +3,15 @@ package segments
import (
"context"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type internalReducer interface {
@ -30,3 +35,46 @@ func CreateSegCoreReducer(req *querypb.QueryRequest, schema *schemapb.Collection
}
return newDefaultLimitReducerSegcore(req, schema, manager)
}
// TimestampedRetrieveResult wraps a retrieve result together with the per-row
// timestamps extracted from its timestamp system field, so that merge/reduce
// code can break ties between duplicated primary keys by timestamp.
type TimestampedRetrieveResult[T interface {
	typeutil.ResultWithID
	GetFieldsData() []*schemapb.FieldData
}] struct {
	// Result is the wrapped retrieve result.
	Result T
	// Timestamps holds one timestamp per row, aligned with Result's IDs
	// (the alignment is validated by NewTimestampedRetrieveResult).
	Timestamps []int64
}
// GetIds returns the primary-key IDs of the wrapped retrieve result.
func (r *TimestampedRetrieveResult[T]) GetIds() *schemapb.IDs {
	ids := r.Result.GetIds()
	return ids
}
// GetHasMoreResult reports whether the wrapped result has more matched entries.
func (r *TimestampedRetrieveResult[T]) GetHasMoreResult() bool {
	hasMore := r.Result.GetHasMoreResult()
	return hasMore
}
// GetTimestamps returns the per-row timestamps aligned with the result's IDs.
func (r *TimestampedRetrieveResult[T]) GetTimestamps() []int64 {
	tss := r.Timestamps
	return tss
}
// NewTimestampedRetrieveResult builds a TimestampedRetrieveResult from result.
// It locates the timestamp system field (common.TimeStampField) inside the
// result's field data and validates that the number of timestamps matches the
// number of IDs. An internal-service error is returned when the timestamp
// field is missing or its length does not match the ID count.
func NewTimestampedRetrieveResult[T interface {
	typeutil.ResultWithID
	GetFieldsData() []*schemapb.FieldData
}](result T) (*TimestampedRetrieveResult[T], error) {
	// Scan for the first field whose ID is the timestamp system field.
	var tsField *schemapb.FieldData
	for _, fd := range result.GetFieldsData() {
		if fd.GetFieldId() == common.TimeStampField {
			tsField = fd
			break
		}
	}
	if tsField == nil {
		return nil, merr.WrapErrServiceInternal("RetrieveResult does not have timestamp field")
	}

	tsData := tsField.GetScalars().GetLongData().GetData()
	if typeutil.GetSizeOfIDs(result.GetIds()) != len(tsData) {
		return nil, merr.WrapErrServiceInternal("id length is not equal to timestamp length")
	}

	return &TimestampedRetrieveResult[T]{
		Result:     result,
		Timestamps: tsData,
	}, nil
}

View File

@ -399,7 +399,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
_, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeInternalRetrieveResult")
defer sp.End()
validRetrieveResults := []*internalpb.RetrieveResults{}
validRetrieveResults := []*TimestampedRetrieveResult[*internalpb.RetrieveResults]{}
relatedDataSize := int64(0)
hasMoreResult := false
for _, r := range retrieveResults {
@ -409,7 +409,11 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
if r == nil || len(r.GetFieldsData()) == 0 || size == 0 {
continue
}
validRetrieveResults = append(validRetrieveResults, r)
tr, err := NewTimestampedRetrieveResult(r)
if err != nil {
return nil, err
}
validRetrieveResults = append(validRetrieveResults, tr)
loopEnd += size
hasMoreResult = hasMoreResult || r.GetHasMoreResult()
}
@ -423,23 +427,23 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
loopEnd = int(param.limit)
}
ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].GetFieldsData()))
idTsMap := make(map[interface{}]uint64)
ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].Result.GetFieldsData()))
idTsMap := make(map[interface{}]int64)
cursors := make([]int64, len(validRetrieveResults))
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
break
}
pk := typeutil.GetPK(validRetrieveResults[sel].GetIds(), cursors[sel])
ts := getTS(validRetrieveResults[sel], cursors[sel])
ts := validRetrieveResults[sel].Timestamps[cursors[sel]]
if _, ok := idTsMap[pk]; !ok {
typeutil.AppendPKs(ret.Ids, pk)
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].Result.GetFieldsData(), cursors[sel])
idTsMap[pk] = ts
j++
} else {
@ -448,7 +452,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
if ts != 0 && ts > idTsMap[pk] {
idTsMap[pk] = ts
typeutil.DeleteFieldData(ret.FieldsData)
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].Result.GetFieldsData(), cursors[sel])
}
}
@ -514,7 +518,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
loopEnd int
)
validRetrieveResults := []*segcorepb.RetrieveResults{}
validRetrieveResults := []*TimestampedRetrieveResult[*segcorepb.RetrieveResults]{}
validSegments := make([]Segment, 0, len(segments))
selectedOffsets := make([][]int64, 0, len(retrieveResults))
selectedIndexes := make([][]int64, 0, len(retrieveResults))
@ -526,7 +530,11 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
log.Debug("filter out invalid retrieve result")
continue
}
validRetrieveResults = append(validRetrieveResults, r)
tr, err := NewTimestampedRetrieveResult(r)
if err != nil {
return nil, err
}
validRetrieveResults = append(validRetrieveResults, tr)
if plan.ignoreNonPk {
validSegments = append(validSegments, segments[i])
}
@ -548,29 +556,35 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
limit = int(param.limit)
}
idSet := make(map[interface{}]struct{})
cursors := make([]int64, len(validRetrieveResults))
idTsMap := make(map[any]int64)
var availableCount int
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
break
}
pk := typeutil.GetPK(validRetrieveResults[sel].GetIds(), cursors[sel])
if _, ok := idSet[pk]; !ok {
ts := validRetrieveResults[sel].Timestamps[cursors[sel]]
if _, ok := idTsMap[pk]; !ok {
typeutil.AppendPKs(ret.Ids, pk)
selected = append(selected, sel)
selectedOffsets[sel] = append(selectedOffsets[sel], validRetrieveResults[sel].GetOffset()[cursors[sel]])
selectedOffsets[sel] = append(selectedOffsets[sel], validRetrieveResults[sel].Result.GetOffset()[cursors[sel]])
selectedIndexes[sel] = append(selectedIndexes[sel], cursors[sel])
idSet[pk] = struct{}{}
idTsMap[pk] = ts
availableCount++
} else {
// primary keys duplicate
skipDupCnt++
if ts != 0 && ts > idTsMap[pk] {
idTsMap[pk] = ts
selectedOffsets[sel][len(selectedOffsets[sel])-1] = validRetrieveResults[sel].Result.GetOffset()[cursors[sel]]
selectedIndexes[sel][len(selectedIndexes[sel])-1] = cursors[sel]
}
}
cursors[sel]++
@ -585,11 +599,11 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
// judge the `!plan.ignoreNonPk` condition.
_, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData")
defer span2.End()
ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].GetFieldsData()))
ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].Result.GetFieldsData()))
cursors = make([]int64, len(validRetrieveResults))
for _, sel := range selected {
// cannot use `cursors[sel]` directly, since some of them may be skipped.
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), selectedIndexes[sel][cursors[sel]])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].Result.GetFieldsData(), selectedIndexes[sel][cursors[sel]])
// limit retrieve result to avoid oom
if retSize > maxOutputSize {

View File

@ -22,6 +22,7 @@ import (
"sort"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
@ -33,6 +34,15 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// getFieldData looks up the field data entry with the given field ID in rs,
// returning the first match and a flag reporting whether it was found.
func getFieldData[T interface {
	GetFieldsData() []*schemapb.FieldData
}](rs T, fieldID int64) (*schemapb.FieldData, bool) {
	for _, fd := range rs.GetFieldsData() {
		if fd.GetFieldId() == fieldID {
			return fd, true
		}
	}
	return nil, false
}
type ResultSuite struct {
suite.Suite
}
@ -54,10 +64,12 @@ func (suite *ResultSuite) TestResult_MergeSegcoreRetrieveResults() {
FloatVector := []float32{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 11.0, 22.0, 33.0, 44.0, 55.0, 66.0, 77.0, 88.0}
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{1000, 2000}, 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:2], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:16], Dim))
var fieldDataArray2 []*schemapb.FieldData
fieldDataArray2 = append(fieldDataArray2, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{2000, 3000}, 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:16], Dim))
@ -88,10 +100,14 @@ func (suite *ResultSuite) TestResult_MergeSegcoreRetrieveResults() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, false))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{0, 1}, result.GetIds().GetIntId().GetData())
suite.Equal(Int64Array, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(FloatVector, result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal(Int64Array, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(FloatVector, vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
})
suite.Run("test nil results", func() {
@ -168,11 +184,15 @@ func (suite *ResultSuite) TestResult_MergeSegcoreRetrieveResults() {
suite.Run(test.description, func() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{r1, r2},
NewMergeParam(test.limit, make([]int64, 0), nil, false))
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal(int(test.limit), len(result.GetIds().GetIntId().GetData()))
suite.Equal(resultIDs[0:test.limit], result.GetIds().GetIntId().GetData())
suite.Equal(resultField0[0:test.limit], result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(resultFloat[0:test.limit*Dim], result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal(resultField0[0:test.limit], intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(resultFloat[0:test.limit*Dim], vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
suite.NoError(err)
})
}
@ -211,10 +231,14 @@ func (suite *ResultSuite) TestResult_MergeSegcoreRetrieveResults() {
suite.Run("test int ID", func() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{r1, r2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, false))
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(resultFloat, result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 11, 22, 22}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(resultFloat, vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
suite.NoError(err)
})
@ -238,10 +262,14 @@ func (suite *ResultSuite) TestResult_MergeSegcoreRetrieveResults() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{r1, r2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, false))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]string{"a", "b", "c", "d"}, result.GetIds().GetStrId().GetData())
suite.Equal([]int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(resultFloat, result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 11, 22, 22}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(resultFloat, vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
suite.NoError(err)
})
})
@ -259,10 +287,12 @@ func (suite *ResultSuite) TestResult_MergeInternalRetrieveResults() {
FloatVector := []float32{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 11.0, 22.0, 33.0, 44.0, 55.0, 66.0, 77.0, 88.0}
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{1000, 2000}, 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:2], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:16], Dim))
var fieldDataArray2 []*schemapb.FieldData
fieldDataArray2 = append(fieldDataArray2, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{2000, 3000}, 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:16], Dim))
@ -291,10 +321,14 @@ func (suite *ResultSuite) TestResult_MergeInternalRetrieveResults() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, false))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{0, 1}, result.GetIds().GetIntId().GetData())
suite.Equal(Int64Array, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(FloatVector, result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal(Int64Array, intFieldData.GetScalars().GetLongData().GetData())
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(FloatVector, vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
})
suite.Run("test nil results", func() {
@ -389,11 +423,16 @@ func (suite *ResultSuite) TestResult_MergeInternalRetrieveResults() {
suite.Run(test.description, func() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{r1, r2},
NewMergeParam(test.limit, make([]int64, 0), nil, false))
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal(int(test.limit), len(result.GetIds().GetIntId().GetData()))
suite.Equal(resultIDs[0:test.limit], result.GetIds().GetIntId().GetData())
suite.Equal(resultField0[0:test.limit], result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(resultFloat[0:test.limit*Dim], result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal(resultField0[0:test.limit], intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(resultFloat[0:test.limit*Dim], vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
suite.NoError(err)
})
}
@ -430,10 +469,15 @@ func (suite *ResultSuite) TestResult_MergeInternalRetrieveResults() {
suite.Run("test int ID", func() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{r1, r2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, false))
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(resultFloat, result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 11, 22, 22}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(resultFloat, vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
suite.NoError(err)
})
@ -457,10 +501,14 @@ func (suite *ResultSuite) TestResult_MergeInternalRetrieveResults() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{r1, r2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, false))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]string{"a", "b", "c", "d"}, result.GetIds().GetStrId().GetData())
suite.Equal([]int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
suite.InDeltaSlice(resultFloat, result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 11, 22, 22}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice(resultFloat, vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
suite.NoError(err)
})
})
@ -478,12 +526,14 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
FloatVector := []float32{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 11.0, 22.0, 33.0, 44.0}
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{1000, 2000, 3000}, 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int64FieldName, Int64FieldID,
schemapb.DataType_Int64, Int64Array[0:3], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID,
schemapb.DataType_FloatVector, FloatVector[0:12], Dim))
var fieldDataArray2 []*schemapb.FieldData
fieldDataArray2 = append(fieldDataArray2, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{2000, 3000, 4000}, 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int64FieldName, Int64FieldID,
schemapb.DataType_Int64, Int64Array[0:3], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID,
@ -518,13 +568,17 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
NewMergeParam(3, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
// has more result both, stop reduce when draining one result
// here, we can only get best result from 0 to 4 without 6, because result1 has more results
suite.Equal([]int64{0, 1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{11, 22, 11, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 22, 11, 22, 33}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44},
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
})
suite.Run("merge stop unlimited", func() {
result1.HasMoreResult = false
@ -532,13 +586,17 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
// as result1 and result2 don't have better results neither
// we can reduce all available result into the reduced result
suite.Equal([]int64{0, 1, 2, 3, 4, 6}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{11, 22, 11, 22, 33, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 22, 11, 22, 33, 33}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44, 11, 22, 33, 44},
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
})
suite.Run("merge stop one limited", func() {
result1.HasMoreResult = true
@ -546,12 +604,16 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
// as result1 may have better results, stop reducing when draining it
suite.Equal([]int64{0, 1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{11, 22, 11, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 22, 11, 22, 33}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44},
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
})
})
@ -581,11 +643,15 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
NewMergeParam(3, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{0, 2, 4, 6, 7}, result.GetIds().GetIntId().GetData())
suite.Equal([]int64{11, 11, 22, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
intFieldData, has := getFieldData(result, Int64FieldID)
suite.Require().True(has)
suite.Equal([]int64{11, 11, 22, 22, 33}, intFieldData.GetScalars().GetLongData().Data)
vectorFieldData, has := getFieldData(result, FloatVectorFieldID)
suite.Require().True(has)
suite.InDeltaSlice([]float32{1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 5, 6, 7, 8, 11, 22, 33, 44},
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
vectorFieldData.GetVectors().GetFloatVector().Data, 10e-10)
})
suite.Run("test stop internal merge for best with early termination", func() {
@ -599,6 +665,12 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
},
FieldsData: fieldDataArray1,
}
var drainDataArray2 []*schemapb.FieldData
drainDataArray2 = append(drainDataArray2, genFieldData(common.TimeStampFieldName, common.TimeStampField, schemapb.DataType_Int64, []int64{2000}, 1))
drainDataArray2 = append(drainDataArray2, genFieldData(Int64FieldName, Int64FieldID,
schemapb.DataType_Int64, Int64Array[0:1], 1))
drainDataArray2 = append(drainDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID,
schemapb.DataType_FloatVector, FloatVector[0:4], Dim))
result2 := &internalpb.RetrieveResults{
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
@ -607,7 +679,7 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
},
},
},
FieldsData: fieldDataArray2,
FieldsData: drainDataArray2,
}
suite.Run("test drain one result without more results", func() {
result1.HasMoreResult = false
@ -615,7 +687,7 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
NewMergeParam(3, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{0, 2, 4, 7}, result.GetIds().GetIntId().GetData())
})
suite.Run("test drain one result with more results", func() {
@ -624,7 +696,7 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
NewMergeParam(3, make([]int64, 0), nil, true))
suite.NoError(err)
suite.Equal(2, len(result.GetFieldsData()))
suite.Equal(3, len(result.GetFieldsData()))
suite.Equal([]int64{0, 2}, result.GetIds().GetIntId().GetData())
})
})

View File

@ -1350,6 +1350,10 @@ type ResultWithID interface {
GetHasMoreResult() bool
}
// ResultWithTimestamp is implemented by retrieve results that expose per-row
// timestamps aligned with their primary-key IDs, enabling timestamp-based
// tie-breaking when merging entries that share the same PK.
type ResultWithTimestamp interface {
	GetTimestamps() []int64
}
// SelectMinPK select the index of the minPK in results T of the cursors.
func SelectMinPK[T ResultWithID](results []T, cursors []int64) (int, bool) {
var (
@ -1389,6 +1393,54 @@ func SelectMinPK[T ResultWithID](results []T, cursors []int64) (int, bool) {
return sel, drainResult
}
// SelectMinPKWithTimestamp selects, across all result streams, the stream
// whose current cursor points at the smallest primary key; when several
// streams expose the same minimal PK, the entry carrying the largest
// timestamp wins, so the merged output keeps the freshest record for
// duplicated PKs. It returns the selected stream index (-1 when nothing is
// selectable) and a flag telling the caller to stop reducing because a
// drained stream may still produce better results later.
func SelectMinPKWithTimestamp[T interface {
	ResultWithID
	ResultWithTimestamp
}](results []T, cursors []int64) (int, bool) {
	var (
		sel         = -1
		drainResult = false
		// maxTimestamp tracks the timestamp of the currently selected PK and
		// is used to break ties between equal primary keys.
		maxTimestamp int64 = 0
		minIntPK     int64 = math.MaxInt64
		firstStr           = true
		minStrPK     string
	)
	for i, cursor := range cursors {
		timestamps := results[i].GetTimestamps()
		// if cursor has run out of all results from one result and this result has more matched results
		// in this case we have tell reduce to stop because better results may be retrieved in the following iteration
		if int(cursor) >= GetSizeOfIDs(results[i].GetIds()) && (results[i].GetHasMoreResult()) {
			drainResult = true
			continue
		}

		pkInterface := GetPK(results[i].GetIds(), cursor)
		switch pk := pkInterface.(type) {
		case string:
			ts := timestamps[cursor]
			// smaller PK wins; on an equal PK the larger timestamp wins
			if firstStr || pk < minStrPK || (pk == minStrPK && ts > maxTimestamp) {
				firstStr = false
				minStrPK = pk
				sel = i
				maxTimestamp = ts
			}
		case int64:
			ts := timestamps[cursor]
			// smaller PK wins; on an equal PK the larger timestamp wins
			if pk < minIntPK || (pk == minIntPK && ts > maxTimestamp) {
				minIntPK = pk
				sel = i
				maxTimestamp = ts
			}
		default:
			// nil PK (cursor past the end of a fully-consumed stream) or an
			// unsupported PK type: skip this stream.
			continue
		}
	}
	return sel, drainResult
}
func AppendGroupByValue(dstResData *schemapb.SearchResultData,
groupByVal interface{}, srcDataType schemapb.DataType,
) error {