fix: overwrite correct selection when pk duplicated (#35826)

Related to #35505

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/36276/head
congqixia 2024-09-14 10:27:08 +08:00 committed by GitHub
parent ef8289ea9b
commit 3bc7d63be9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 45 additions and 31 deletions

View File

@ -390,8 +390,6 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
validRetrieveResults := []*TimestampedRetrieveResult[*segcorepb.RetrieveResults]{} validRetrieveResults := []*TimestampedRetrieveResult[*segcorepb.RetrieveResults]{}
validSegments := make([]Segment, 0, len(segments)) validSegments := make([]Segment, 0, len(segments))
selectedOffsets := make([][]int64, 0, len(retrieveResults))
selectedIndexes := make([][]int64, 0, len(retrieveResults))
hasMoreResult := false hasMoreResult := false
for i, r := range retrieveResults { for i, r := range retrieveResults {
size := typeutil.GetSizeOfIDs(r.GetIds()) size := typeutil.GetSizeOfIDs(r.GetIds())
@ -408,8 +406,6 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
if plan.ignoreNonPk { if plan.ignoreNonPk {
validSegments = append(validSegments, segments[i]) validSegments = append(validSegments, segments[i])
} }
selectedOffsets = append(selectedOffsets, make([]int64, 0, len(r.GetOffset())))
selectedIndexes = append(selectedIndexes, make([]int64, 0, len(r.GetOffset())))
loopEnd += size loopEnd += size
hasMoreResult = r.GetHasMoreResult() || hasMoreResult hasMoreResult = r.GetHasMoreResult() || hasMoreResult
} }
@ -419,8 +415,6 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
return ret, nil return ret, nil
} }
selected := make([]int, 0, ret.GetAllRetrieveCount())
var limit int = -1 var limit int = -1
if param.limit != typeutil.Unlimited && !param.mergeStopForBest { if param.limit != typeutil.Unlimited && !param.mergeStopForBest {
limit = int(param.limit) limit = int(param.limit)
@ -433,6 +427,15 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
var availableCount int var availableCount int
var retSize int64 var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
// selection records one entry picked during the merge loop: which valid
// retrieve result it came from, where it sits inside that result, and the
// offset value that result reports for it.
type selection struct {
batchIndex int // index into validRetrieveResults of the result this entry was taken from
resultIndex int64 // position of the entry inside that result (the cursor value when it was selected)
offset int64 // value of Result.GetOffset() at resultIndex for that result
}
var selections []selection
for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ { for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ {
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors) sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) { if sel == -1 || (param.mergeStopForBest && drainOneResult) {
@ -443,9 +446,11 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
ts := validRetrieveResults[sel].Timestamps[cursors[sel]] ts := validRetrieveResults[sel].Timestamps[cursors[sel]]
if _, ok := idTsMap[pk]; !ok { if _, ok := idTsMap[pk]; !ok {
typeutil.AppendPKs(ret.Ids, pk) typeutil.AppendPKs(ret.Ids, pk)
selected = append(selected, sel) selections = append(selections, selection{
selectedOffsets[sel] = append(selectedOffsets[sel], validRetrieveResults[sel].Result.GetOffset()[cursors[sel]]) batchIndex: sel,
selectedIndexes[sel] = append(selectedIndexes[sel], cursors[sel]) resultIndex: cursors[sel],
offset: validRetrieveResults[sel].Result.GetOffset()[cursors[sel]],
})
idTsMap[pk] = ts idTsMap[pk] = ts
availableCount++ availableCount++
} else { } else {
@ -453,8 +458,21 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
skipDupCnt++ skipDupCnt++
if ts != 0 && ts > idTsMap[pk] { if ts != 0 && ts > idTsMap[pk] {
idTsMap[pk] = ts idTsMap[pk] = ts
selectedOffsets[sel][len(selectedOffsets[sel])-1] = validRetrieveResults[sel].Result.GetOffset()[cursors[sel]] idx := len(selections) - 1
selectedIndexes[sel][len(selectedIndexes[sel])-1] = cursors[sel] for ; idx >= 0; idx-- {
selection := selections[idx]
pkValue := typeutil.GetPK(validRetrieveResults[selection.batchIndex].GetIds(), selection.resultIndex)
if pk == pkValue {
break
}
}
if idx >= 0 {
selections[idx] = selection{
batchIndex: sel,
resultIndex: cursors[sel],
offset: validRetrieveResults[sel].Result.GetOffset()[cursors[sel]],
}
}
} }
} }
@ -470,31 +488,28 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
// judge the `!plan.ignoreNonPk` condition. // judge the `!plan.ignoreNonPk` condition.
_, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData") _, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData")
defer span2.End() defer span2.End()
ret.FieldsData = typeutil.PrepareResultFieldData(validRetrieveResults[0].Result.GetFieldsData(), int64(len(selected))) ret.FieldsData = typeutil.PrepareResultFieldData(validRetrieveResults[0].Result.GetFieldsData(), int64(len(selections)))
cursors = make([]int64, len(validRetrieveResults)) // cursors = make([]int64, len(validRetrieveResults))
for _, sel := range selected { for _, selection := range selections {
// cannot use `cursors[sel]` directly, since some of them may be skipped. // cannot use `cursors[sel]` directly, since some of them may be skipped.
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].Result.GetFieldsData(), selectedIndexes[sel][cursors[sel]]) retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[selection.batchIndex].Result.GetFieldsData(), selection.resultIndex)
// limit retrieve result to avoid oom // limit retrieve result to avoid oom
if retSize > maxOutputSize { if retSize > maxOutputSize {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize) return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize)
} }
cursors[sel]++
} }
} else { } else {
// target entry not retrieved. // target entry not retrieved.
ctx, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-RetrieveByOffsets-AppendFieldData") ctx, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-RetrieveByOffsets-AppendFieldData")
defer span2.End() defer span2.End()
segmentResults := make([]*segcorepb.RetrieveResults, len(validRetrieveResults)) segmentResults := make([]*segcorepb.RetrieveResults, len(validRetrieveResults))
futures := make([]*conc.Future[any], 0, len(validRetrieveResults)) groups := lo.GroupBy(selections, func(sel selection) int {
for i, offsets := range selectedOffsets { return sel.batchIndex
if len(offsets) == 0 { })
log.Ctx(ctx).Debug("skip empty retrieve results", zap.Int64("segment", validSegments[i].ID())) futures := make([]*conc.Future[any], 0, len(groups))
continue for i, selections := range groups {
} idx, theOffsets := i, lo.Map(selections, func(sel selection, _ int) int64 { return sel.offset })
idx, theOffsets := i, offsets
future := GetSQPool().Submit(func() (any, error) { future := GetSQPool().Submit(func() (any, error) {
var r *segcorepb.RetrieveResults var r *segcorepb.RetrieveResults
var err error var err error
@ -515,23 +530,22 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
for _, r := range segmentResults { for _, r := range segmentResults {
if len(r.GetFieldsData()) != 0 { if len(r.GetFieldsData()) != 0 {
ret.FieldsData = typeutil.PrepareResultFieldData(r.GetFieldsData(), int64(len(selected))) ret.FieldsData = typeutil.PrepareResultFieldData(r.GetFieldsData(), int64(len(selections)))
break break
} }
} }
_, span3 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData") _, span3 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData")
defer span3.End() defer span3.End()
cursors = make([]int64, len(segmentResults)) // retrieve result is compacted, use 0,1,2...end
for _, sel := range selected { segmentResOffset := make([]int64, len(segmentResults))
retSize += typeutil.AppendFieldData(ret.FieldsData, segmentResults[sel].GetFieldsData(), cursors[sel]) for _, selection := range selections {
retSize += typeutil.AppendFieldData(ret.FieldsData, segmentResults[selection.batchIndex].GetFieldsData(), segmentResOffset[selection.batchIndex])
segmentResOffset[selection.batchIndex]++
// limit retrieve result to avoid oom // limit retrieve result to avoid oom
if retSize > maxOutputSize { if retSize > maxOutputSize {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize) return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize)
} }
cursors[sel]++
} }
} }