Remove primary key duplicated query result on query node (#10834)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Cai Yudong 2021-10-30 15:12:41 +08:00 committed by GitHub
parent 96ac08d9fb
commit 7763615780
3 changed files with 191 additions and 33 deletions
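The heart of the change is mergeRetrieveResults: when per-segment retrieve results are merged on the query node, a row whose primary key has already been seen is now skipped instead of being appended again. Below is a minimal, self-contained sketch of that dedup-merge pattern; partialResult and mergeDedup are illustrative stand-ins invented for this note, while the real code (shown in the diff further down) works on segcorepb.RetrieveResults and copies field values with typeutil.AppendFieldData.

    // Illustrative sketch only, not code from this commit. rowIDs[i] is the
    // primary key of row i and values[i] is that row's field data; the real
    // merge keeps the same pairing across schemapb.FieldData columns.
    package main

    import "fmt"

    type partialResult struct {
        rowIDs []int64
        values []float32
    }

    func mergeDedup(parts []partialResult) partialResult {
        var merged partialResult
        seen := make(map[int64]struct{})
        var skipDupCnt int64
        for _, p := range parts {
            for i, id := range p.rowIDs {
                if _, ok := seen[id]; ok {
                    // this primary key was already merged from another partial result
                    skipDupCnt++
                    continue
                }
                seen[id] = struct{}{}
                merged.rowIDs = append(merged.rowIDs, id)
                merged.values = append(merged.values, p.values[i])
            }
        }
        if skipDupCnt > 0 {
            fmt.Println("skipped duplicated rows:", skipDupCnt)
        }
        return merged
    }

    func main() {
        a := partialResult{rowIDs: []int64{0, 1}, values: []float32{1.1, 2.2}}
        b := partialResult{rowIDs: []int64{1, 2}, values: []float32{2.2, 3.3}}
        // prints "skipped duplicated rows: 1" and then {[0 1 2] [1.1 2.2 3.3]}
        fmt.Println(mergeDedup([]partialResult{a, b}))
    }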


@@ -1323,3 +1323,118 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
     return node, nil
 }
+
+func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType, fieldValue interface{}, dim int64) *schemapb.FieldData {
+    var fieldData *schemapb.FieldData
+    switch fieldType {
+    case schemapb.DataType_Bool:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_Bool,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Scalars{
+                Scalars: &schemapb.ScalarField{
+                    Data: &schemapb.ScalarField_BoolData{
+                        BoolData: &schemapb.BoolArray{
+                            Data: fieldValue.([]bool),
+                        },
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    case schemapb.DataType_Int32:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_Int32,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Scalars{
+                Scalars: &schemapb.ScalarField{
+                    Data: &schemapb.ScalarField_IntData{
+                        IntData: &schemapb.IntArray{
+                            Data: fieldValue.([]int32),
+                        },
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    case schemapb.DataType_Int64:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_Int64,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Scalars{
+                Scalars: &schemapb.ScalarField{
+                    Data: &schemapb.ScalarField_LongData{
+                        LongData: &schemapb.LongArray{
+                            Data: fieldValue.([]int64),
+                        },
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    case schemapb.DataType_Float:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_Float,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Scalars{
+                Scalars: &schemapb.ScalarField{
+                    Data: &schemapb.ScalarField_FloatData{
+                        FloatData: &schemapb.FloatArray{
+                            Data: fieldValue.([]float32),
+                        },
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    case schemapb.DataType_Double:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_Double,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Scalars{
+                Scalars: &schemapb.ScalarField{
+                    Data: &schemapb.ScalarField_DoubleData{
+                        DoubleData: &schemapb.DoubleArray{
+                            Data: fieldValue.([]float64),
+                        },
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    case schemapb.DataType_BinaryVector:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_BinaryVector,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Vectors{
+                Vectors: &schemapb.VectorField{
+                    Dim: dim,
+                    Data: &schemapb.VectorField_BinaryVector{
+                        BinaryVector: fieldValue.([]byte),
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    case schemapb.DataType_FloatVector:
+        fieldData = &schemapb.FieldData{
+            Type:      schemapb.DataType_FloatVector,
+            FieldName: fieldName,
+            Field: &schemapb.FieldData_Vectors{
+                Vectors: &schemapb.VectorField{
+                    Dim: dim,
+                    Data: &schemapb.VectorField_FloatVector{
+                        FloatVector: &schemapb.FloatArray{
+                            Data: fieldValue.([]float32),
+                        },
+                    },
+                },
+            },
+            FieldId: fieldID,
+        }
+    default:
+        log.Error("not supported field type", zap.String("field type", fieldType.String()))
+    }
+    return fieldData
+}
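One caveat about genFieldData above: fieldValue is an interface{} that each case type-asserts, so callers must pass a slice whose element type matches fieldType ([]int64 for Int64, []float32 for FloatVector, and so on) or the assertion panics, and dim only matters for the vector cases. A hypothetical call in the same test package, with made-up field names and IDs:

    // hypothetical usage, not part of the commit
    ageField := genFieldData("age", 101, schemapb.DataType_Int64, []int64{21, 34, 27}, 1)
    embField := genFieldData("embedding", 102, schemapb.DataType_FloatVector, make([]float32, 3*8), 8)
    _ = []*schemapb.FieldData{ageField, embField}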


@@ -1208,38 +1208,60 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
     return nil
 }
 
-func mergeRetrieveResults(dataArr []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {
-    var final *segcorepb.RetrieveResults
-    for _, data := range dataArr {
+func mergeRetrieveResults(retrieveResults []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {
+    var ret *segcorepb.RetrieveResults
+    var skipDupCnt int64 = 0
+    var idSet = make(map[int64]struct{})
+
+    // merge results and remove duplicates
+    for _, rr := range retrieveResults {
         // skip empty result, it will break merge result
-        if data == nil || len(data.Offset) == 0 {
+        if rr == nil || len(rr.Offset) == 0 {
             continue
         }
-        if final == nil {
-            final = proto.Clone(data).(*segcorepb.RetrieveResults)
-            continue
+        if ret == nil {
+            ret = &segcorepb.RetrieveResults{
+                Ids: &schemapb.IDs{
+                    IdField: &schemapb.IDs_IntId{
+                        IntId: &schemapb.LongArray{
+                            Data: []int64{},
+                        },
+                    },
+                },
+                FieldsData: make([]*schemapb.FieldData, len(rr.FieldsData)),
+            }
         }
-        proto.Merge(final.Ids, data.Ids)
-        if len(final.FieldsData) != len(data.FieldsData) {
+        if len(ret.FieldsData) != len(rr.FieldsData) {
             return nil, fmt.Errorf("mismatch FieldData in RetrieveResults")
         }
-        for i := range final.FieldsData {
-            proto.Merge(final.FieldsData[i], data.FieldsData[i])
+        dstIds := ret.Ids.GetIntId()
+        for i, id := range rr.Ids.GetIntId().GetData() {
+            if _, ok := idSet[id]; !ok {
+                dstIds.Data = append(dstIds.Data, id)
+                typeutil.AppendFieldData(ret.FieldsData, rr.FieldsData, int64(i))
+                idSet[id] = struct{}{}
+            } else {
+                // primary keys duplicate
+                skipDupCnt++
+            }
         }
     }
+    if skipDupCnt > 0 {
+        log.Debug("skip duplicated query result", zap.Int64("count", skipDupCnt))
+    }
 
     // not found, return default values indicating not result found
-    if final == nil {
-        final = &segcorepb.RetrieveResults{
-            Ids:        nil,
+    if ret == nil {
+        ret = &segcorepb.RetrieveResults{
+            Ids:        &schemapb.IDs{},
             FieldsData: []*schemapb.FieldData{},
         }
     }
 
-    return final, nil
+    return ret, nil
 }
 
 func (q *queryCollection) publishQueryResult(msg msgstream.TsMsg, collectionID UniqueID) error {
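Two details of the rewritten mergeRetrieveResults are easy to miss: duplicates are detected purely on the int64 primary keys (the IDs_IntId field), and a nil or all-empty input still yields a non-nil, empty result rather than an error, which is what the "not found" branch above provides. Roughly, in the same package:

    // behavior sketch, same package as mergeRetrieveResults
    res, err := mergeRetrieveResults(nil)
    // err == nil; res.Ids is an empty schemapb.IDs and len(res.FieldsData) == 0,
    // signalling "no result" to the caller instead of failing the query
    _, _ = res, err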


@@ -12,6 +12,7 @@ import (
     "github.com/golang/protobuf/proto"
     "github.com/stretchr/testify/assert"
 
+    "github.com/milvus-io/milvus/internal/common"
     etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
     "github.com/milvus-io/milvus/internal/msgstream"
     "github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -488,31 +489,51 @@ func TestQueryCollection_waitNewTSafe(t *testing.T) {
 }
 
 func TestQueryCollection_mergeRetrieveResults(t *testing.T) {
-    fieldData := []*schemapb.FieldData{
-        {
-            Type:      schemapb.DataType_FloatVector,
-            FieldName: defaultVecFieldName,
-            FieldId:   simpleVecField.id,
-            Field: &schemapb.FieldData_Vectors{
-                Vectors: &schemapb.VectorField{
-                    Dim: defaultDim,
-                    Data: &schemapb.VectorField_FloatVector{
-                        FloatVector: &schemapb.FloatArray{
-                            Data: []float32{1.1, 2.2, 3.3, 4.4},
-                        },
-                    },
-                },
-            },
-        },
-    }
+    const (
+        Dim                  = 8
+        Int64FieldName       = "Int64Field"
+        FloatVectorFieldName = "FloatVectorField"
+        Int64FieldID         = common.StartOfUserFieldID + 1
+        FloatVectorFieldID   = common.StartOfUserFieldID + 2
+    )
+    Int64Array := []int64{11, 22}
+    FloatVector := []float32{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 11.0, 22.0, 33.0, 44.0, 55.0, 66.0, 77.0, 88.0}
 
-    result := &segcorepb.RetrieveResults{
-        Ids:        &schemapb.IDs{},
-        Offset:     []int64{0},
-        FieldsData: fieldData,
+    var fieldDataArray1 []*schemapb.FieldData
+    fieldDataArray1 = append(fieldDataArray1, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:2], 1))
+    fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:16], Dim))
+
+    var fieldDataArray2 []*schemapb.FieldData
+    fieldDataArray2 = append(fieldDataArray2, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:2], 1))
+    fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:16], Dim))
+
+    result1 := &segcorepb.RetrieveResults{
+        Ids: &schemapb.IDs{
+            IdField: &schemapb.IDs_IntId{
+                IntId: &schemapb.LongArray{
+                    Data: []int64{0, 1},
+                },
+            },
+        },
+        Offset:     []int64{0, 1},
+        FieldsData: fieldDataArray1,
+    }
+    result2 := &segcorepb.RetrieveResults{
+        Ids: &schemapb.IDs{
+            IdField: &schemapb.IDs_IntId{
+                IntId: &schemapb.LongArray{
+                    Data: []int64{0, 1},
+                },
+            },
+        },
+        Offset:     []int64{0, 1},
+        FieldsData: fieldDataArray2,
     }
 
-    _, err := mergeRetrieveResults([]*segcorepb.RetrieveResults{result})
+    result, err := mergeRetrieveResults([]*segcorepb.RetrieveResults{result1, result2})
     assert.NoError(t, err)
+    assert.Equal(t, 2, len(result.FieldsData[0].GetScalars().GetLongData().Data))
+    assert.Equal(t, 2*Dim, len(result.FieldsData[1].GetVectors().GetFloatVector().Data))
 
     _, err = mergeRetrieveResults(nil)
     assert.NoError(t, err)
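To run just this test locally, the standard Go tooling applies. The path below assumes the changed files live under internal/querynode (where the query node code sits in Milvus of this era), and since the querynode package links against the segcore C++ library via cgo, the Milvus core typically has to be built first per the repository's developer guide.

    go test -v -run TestQueryCollection_mergeRetrieveResults ./internal/querynode/...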