Fix high CPU usage caused by proto.Size (#27054)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/27159/head
xige-16 2023-09-17 20:55:21 +08:00 committed by GitHub
parent c162c6a4c8
commit 488b423e1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 17 deletions

View File

@ -557,6 +557,8 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
}
}
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; j++ {
sel := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 {
@ -565,7 +567,7 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
pk := typeutil.GetPK(validRetrieveResults[sel].GetIds(), cursors[sel])
if _, ok := idSet[pk]; !ok {
typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
idSet[pk] = struct{}{}
} else {
// primary keys duplicate
@ -573,8 +575,8 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
}
// limit retrieve result to avoid oom
if int64(proto.Size(ret)) > paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64())
if retSize > maxOutputSize {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize)
}
cursors[sel]++

View File

@ -828,6 +828,8 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb
realTopK int64 = -1
)
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
// reducing nq * topk results
for i := int64(0); i < nq; i++ {
@ -865,7 +867,7 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb
// remove duplicates
if _, ok := idSet[id]; !ok {
typeutil.AppendFieldData(ret.Results.FieldsData, subSearchResultData[subSearchIdx].FieldsData, resultDataIdx)
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subSearchResultData[subSearchIdx].FieldsData, resultDataIdx)
typeutil.AppendPKs(ret.Results.Ids, id)
ret.Results.Scores = append(ret.Results.Scores, score)
idSet[id] = struct{}{}
@ -884,8 +886,8 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb
ret.Results.Topks = append(ret.Results.Topks, realTopK)
// limit search result to avoid oom
if int64(proto.Size(ret)) > paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() {
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit %d", paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64())
if retSize > maxOutputSize {
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit %d", maxOutputSize)
}
}
log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))

View File

@ -123,6 +123,8 @@ func ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.Se
}
var skipDupCnt int64
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for i := int64(0); i < nq; i++ {
offsets := make([]int64, len(searchResultData))
@ -140,7 +142,7 @@ func ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.Se
// remove duplicates
if _, ok := idSet[id]; !ok {
typeutil.AppendFieldData(ret.FieldsData, searchResultData[sel].FieldsData, idx)
retSize += typeutil.AppendFieldData(ret.FieldsData, searchResultData[sel].FieldsData, idx)
typeutil.AppendPKs(ret.Ids, id)
ret.Scores = append(ret.Scores, score)
idSet[id] = struct{}{}
@ -159,8 +161,8 @@ func ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.Se
ret.Topks = append(ret.Topks, j)
// limit search result to avoid oom
if int64(proto.Size(ret)) > paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() {
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit %d", paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64())
if retSize > maxOutputSize {
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit %d", maxOutputSize)
}
}
log.Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
@ -273,6 +275,9 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].GetFieldsData()))
idTsMap := make(map[interface{}]uint64)
cursors := make([]int64, len(validRetrieveResults))
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; j++ {
sel := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 {
@ -283,7 +288,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
ts := getTS(validRetrieveResults[sel], cursors[sel])
if _, ok := idTsMap[pk]; !ok {
typeutil.AppendPKs(ret.Ids, pk)
typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
idTsMap[pk] = ts
} else {
// primary keys duplicate
@ -291,13 +296,13 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
if ts != 0 && ts > idTsMap[pk] {
idTsMap[pk] = ts
typeutil.DeleteFieldData(ret.FieldsData)
typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
}
}
// limit retrieve result to avoid oom
if int64(proto.Size(ret)) > paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64())
if retSize > maxOutputSize {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize)
}
cursors[sel]++
@ -373,6 +378,9 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].GetFieldsData()))
idSet := make(map[interface{}]struct{})
cursors := make([]int64, len(validRetrieveResults))
var retSize int64
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; j++ {
sel := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 {
@ -382,7 +390,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
pk := typeutil.GetPK(validRetrieveResults[sel].GetIds(), cursors[sel])
if _, ok := idSet[pk]; !ok {
typeutil.AppendPKs(ret.Ids, pk)
typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
idSet[pk] = struct{}{}
} else {
// primary keys duplicate
@ -390,8 +398,8 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
}
// limit retrieve result to avoid oom
if int64(proto.Size(ret)) > paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64())
if retSize > maxOutputSize {
return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize)
}
cursors[sel]++

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math"
"strconv"
"unsafe"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -382,7 +383,7 @@ func IsVariableDataType(dataType schemapb.DataType) bool {
}
// AppendFieldData appends fields data of specified index from src to dst
func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx int64) {
func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx int64) (appendSize int64) {
for i, fieldData := range src {
switch fieldType := fieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
@ -409,6 +410,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetBoolData().Data = append(dstScalar.GetBoolData().Data, srcScalar.BoolData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.BoolData.Data[idx]))
case *schemapb.ScalarField_IntData:
if dstScalar.GetIntData() == nil {
dstScalar.Data = &schemapb.ScalarField_IntData{
@ -419,6 +422,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetIntData().Data = append(dstScalar.GetIntData().Data, srcScalar.IntData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.IntData.Data[idx]))
case *schemapb.ScalarField_LongData:
if dstScalar.GetLongData() == nil {
dstScalar.Data = &schemapb.ScalarField_LongData{
@ -429,6 +434,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetLongData().Data = append(dstScalar.GetLongData().Data, srcScalar.LongData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.LongData.Data[idx]))
case *schemapb.ScalarField_FloatData:
if dstScalar.GetFloatData() == nil {
dstScalar.Data = &schemapb.ScalarField_FloatData{
@ -439,6 +446,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetFloatData().Data = append(dstScalar.GetFloatData().Data, srcScalar.FloatData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.FloatData.Data[idx]))
case *schemapb.ScalarField_DoubleData:
if dstScalar.GetDoubleData() == nil {
dstScalar.Data = &schemapb.ScalarField_DoubleData{
@ -449,6 +458,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetDoubleData().Data = append(dstScalar.GetDoubleData().Data, srcScalar.DoubleData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.DoubleData.Data[idx]))
case *schemapb.ScalarField_StringData:
if dstScalar.GetStringData() == nil {
dstScalar.Data = &schemapb.ScalarField_StringData{
@ -459,6 +470,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetStringData().Data = append(dstScalar.GetStringData().Data, srcScalar.StringData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.StringData.Data[idx]))
case *schemapb.ScalarField_ArrayData:
if dstScalar.GetArrayData() == nil {
dstScalar.Data = &schemapb.ScalarField_ArrayData{
@ -469,6 +482,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetArrayData().Data = append(dstScalar.GetArrayData().Data, srcScalar.ArrayData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.ArrayData.Data[idx]))
case *schemapb.ScalarField_JsonData:
if dstScalar.GetJsonData() == nil {
dstScalar.Data = &schemapb.ScalarField_JsonData{
@ -479,6 +494,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstScalar.GetJsonData().Data = append(dstScalar.GetJsonData().Data, srcScalar.JsonData.Data[idx])
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcScalar.JsonData.Data[idx]))
default:
log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
}
@ -509,6 +526,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
dstBinaryVector := dstVector.Data.(*schemapb.VectorField_BinaryVector)
dstBinaryVector.BinaryVector = append(dstBinaryVector.BinaryVector, srcVector.BinaryVector[idx*(dim/8):(idx+1)*(dim/8)]...)
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcVector.BinaryVector[idx*(dim/8) : (idx+1)*(dim/8)]))
case *schemapb.VectorField_FloatVector:
if dstVector.GetFloatVector() == nil {
srcToCopy := srcVector.FloatVector.Data[idx*dim : (idx+1)*dim]
@ -521,6 +540,8 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
} else {
dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data[idx*dim:(idx+1)*dim]...)
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcVector.FloatVector.Data[idx*dim : (idx+1)*dim]))
case *schemapb.VectorField_Float16Vector:
if dstVector.GetFloat16Vector() == nil {
srcToCopy := srcVector.Float16Vector[idx*(dim*2) : (idx+1)*(dim*2)]
@ -532,11 +553,15 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i
dstFloat16Vector := dstVector.Data.(*schemapb.VectorField_Float16Vector)
dstFloat16Vector.Float16Vector = append(dstFloat16Vector.Float16Vector, srcVector.Float16Vector[idx*(dim*2):(idx+1)*(dim*2)]...)
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcVector.Float16Vector[idx*(dim*2) : (idx+1)*(dim*2)]))
default:
log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
}
}
}
return
}
// DeleteFieldData delete fields data appended last time