add independent nq limit option (#24632)

Signed-off-by: chyezh <ye.zhen@zilliz.com>
pull/24682/head
chyezh 2023-06-07 10:38:36 +08:00 committed by GitHub
parent d0c2fa5d19
commit 1593278f9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 76 additions and 42 deletions

View File

@ -18,8 +18,8 @@ func createMilvusReducer(ctx context.Context, params *queryParams, req *internal
return &cntReducer{}
} else if req.GetIterationExtensionReduceRate() > 0 {
params.limit = params.limit * req.GetIterationExtensionReduceRate()
if params.limit > Params.CommonCfg.TopKLimit.GetAsInt64() {
params.limit = Params.CommonCfg.TopKLimit.GetAsInt64()
if params.limit > Params.QuotaConfig.TopKLimit.GetAsInt64() {
params.limit = Params.QuotaConfig.TopKLimit.GetAsInt64()
}
}
return newDefaultLimitReducer(ctx, params, req, schema, collectionName)

View File

@ -147,7 +147,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
return nil, fmt.Errorf("%s [%s] is invalid", LimitKey, limitStr)
}
if limit != 0 {
if err := validateLimit(limit); err != nil {
if err := validateTopKLimit(limit); err != nil {
return nil, fmt.Errorf("%s [%d] is invalid, %w", LimitKey, limit, err)
}
}
@ -161,13 +161,13 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
}
if offset != 0 {
if err := validateLimit(offset); err != nil {
if err := validateTopKLimit(offset); err != nil {
return nil, fmt.Errorf("%s [%d] is invalid, %w", OffsetKey, offset, err)
}
}
}
if err = validateLimit(limit + offset); err != nil {
if err = validateTopKLimit(limit + offset); err != nil {
return nil, fmt.Errorf("invalid limit[%d] + offset[%d], %w", limit, offset, err)
}

View File

@ -118,7 +118,7 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair) (*planpb.QueryIn
if err != nil {
return nil, 0, fmt.Errorf("%s [%s] is invalid", TopKKey, topKStr)
}
if err := validateLimit(topK); err != nil {
if err := validateTopKLimit(topK); err != nil {
return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err)
}
@ -131,14 +131,14 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair) (*planpb.QueryIn
}
if offset != 0 {
if err := validateLimit(offset); err != nil {
if err := validateTopKLimit(offset); err != nil {
return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", OffsetKey, offset, err)
}
}
}
queryTopK := topK + offset
if err := validateLimit(queryTopK); err != nil {
if err := validateTopKLimit(queryTopK); err != nil {
return nil, 0, fmt.Errorf("%s+%s [%d] is invalid, %w", OffsetKey, TopKKey, queryTopK, err)
}
@ -244,7 +244,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
log.Ctx(ctx).Debug("translate output fields",
zap.Strings("output fields", t.request.GetOutputFields()))
//fetch search_growing from search param
// fetch search_growing from search param
var ignoreGrowing bool
for i, kv := range t.request.GetSearchParams() {
if kv.GetKey() == IgnoreGrowingKey {
@ -265,7 +265,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
}
// Check if nq is valid:
// https://milvus.io/docs/limitations.md
if err := validateLimit(nq); err != nil {
if err := validateNQLimit(nq); err != nil {
return fmt.Errorf("%s [%d] is invalid, %w", NQKey, nq, err)
}
t.SearchRequest.Nq = nq
@ -374,7 +374,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
guaranteeTs = parseGuaranteeTsFromConsistency(guaranteeTs, t.BeginTs(), consistencyLevel)
} else {
consistencyLevel = t.request.GetConsistencyLevel()
//Compatibility logic, parse guarantee timestamp
// Compatibility logic, parse guarantee timestamp
if consistencyLevel == 0 && guaranteeTs > 0 {
guaranteeTs = parseGuaranteeTs(guaranteeTs, t.BeginTs())
} else {
@ -814,7 +814,7 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb
log.Ctx(ctx).Warn("invalid search results", zap.Error(err))
return ret, err
}
//printSearchResultData(sData, strconv.FormatInt(int64(i), 10))
// printSearchResultData(sData, strconv.FormatInt(int64(i), 10))
}
var (

View File

@ -81,9 +81,18 @@ func isNumber(c uint8) bool {
return true
}
func validateLimit(limit int64) error {
if limit <= 0 || limit > Params.CommonCfg.TopKLimit.GetAsInt64() {
return fmt.Errorf("should be in range [1, %d], but got %d", Params.CommonCfg.TopKLimit.GetAsInt64(), limit)
func validateTopKLimit(limit int64) error {
topKLimit := Params.QuotaConfig.TopKLimit.GetAsInt64()
if limit <= 0 || limit > topKLimit {
return fmt.Errorf("should be in range [1, %d], but got %d", topKLimit, limit)
}
return nil
}
func validateNQLimit(limit int64) error {
nqLimit := Params.QuotaConfig.NQLimit.GetAsInt64()
if limit <= 0 || limit > nqLimit {
return fmt.Errorf("nq (number of search vector per search request) should be in range [1, %d], but got %d", nqLimit, limit)
}
return nil
}
@ -302,7 +311,7 @@ func validateFieldType(schema *schemapb.CollectionSchema) error {
// ValidateFieldAutoID call after validatePrimaryKey
func ValidateFieldAutoID(coll *schemapb.CollectionSchema) error {
var idx = -1
idx := -1
for i, field := range coll.Fields {
if field.AutoID {
if idx != -1 {
@ -863,7 +872,6 @@ func translateOutputFields(outputFields []string, schema *schemapb.CollectionSch
return nil, nil, fmt.Errorf("field %s not exist", outputFieldName)
}
}
}
}
@ -957,7 +965,7 @@ func fillFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstr
neededFieldsNum := 0
isPrimaryKeyNum := 0
var dataNameSet = typeutil.NewSet[string]()
dataNameSet := typeutil.NewSet[string]()
for _, data := range insertMsg.FieldsData {
fieldName := data.GetFieldName()
if dataNameSet.Contain(fieldName) {

View File

@ -1749,3 +1749,19 @@ func Test_ParseGuaranteeTsFromConsistency(t *testing.T) {
assert.Equal(t, tsNow, parseGuaranteeTsFromConsistency(tsNow, tsMax, customized))
assert.Equal(t, tsEventually, parseGuaranteeTsFromConsistency(tsDefault, tsMax, eventually))
}
func Test_NQLimit(t *testing.T) {
paramtable.Init()
assert.Nil(t, validateNQLimit(16384))
assert.Nil(t, validateNQLimit(1))
assert.Error(t, validateNQLimit(16385))
assert.Error(t, validateNQLimit(0))
}
func Test_TopKLimit(t *testing.T) {
paramtable.Init()
assert.Nil(t, validateTopKLimit(16384))
assert.Nil(t, validateTopKLimit(1))
assert.Error(t, validateTopKLimit(16385))
assert.Error(t, validateTopKLimit(0))
}

View File

@ -34,11 +34,11 @@ const (
// DefaultIndexSliceSize defines the default slice size of index file when serializing.
DefaultIndexSliceSize = 16
DefaultGracefulTime = 5000 //ms
DefaultGracefulTime = 5000 // ms
DefaultGracefulStopTimeout = 30 // s
DefaultThreadCoreCoefficient = 10
DefaultSessionTTL = 20 //s
DefaultSessionTTL = 20 // s
DefaultSessionRetryTimes = 30
DefaultMaxDegree = 56
@ -199,7 +199,6 @@ type commonConfig struct {
GracefulTime ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"`
TopKLimit ParamItem `refreshable:"true"`
StorageType ParamItem `refreshable:"false"`
SimdType ParamItem `refreshable:"false"`
@ -521,17 +520,6 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
}
p.LoadNumThreadRatio.Init(base.mgr)
p.TopKLimit = ParamItem{
Key: "common.topKLimit",
Version: "2.2.1",
DefaultValue: "16384",
Doc: `Search limit, which applies on:
maximum # of results to return (topK), and
maximum # of search requests (nq).
Check https://milvus.io/docs/limitations.md for more details.`,
}
p.TopKLimit.Init(base.mgr)
p.BeamWidthRatio = ParamItem{
Key: "common.DiskIndex.BeamWidthRatio",
Version: "2.0.0",
@ -860,7 +848,6 @@ func (p *rootCoordConfig) init(base *BaseTable) {
Export: true,
}
p.EnableActiveStandby.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
@ -1110,9 +1097,9 @@ please adjust in embedded Milvus: false`,
// /////////////////////////////////////////////////////////////////////////////
// --- querycoord ---
type queryCoordConfig struct {
//Deprecated: Since 2.2.0
// Deprecated: Since 2.2.0
RetryNum ParamItem `refreshable:"true"`
//Deprecated: Since 2.2.0
// Deprecated: Since 2.2.0
RetryInterval ParamItem `refreshable:"true"`
TaskMergeCap ParamItem `refreshable:"false"`
TaskExecutionCap ParamItem `refreshable:"true"`
@ -1136,7 +1123,7 @@ type queryCoordConfig struct {
DistPullInterval ParamItem `refreshable:"false"`
HeartbeatAvailableInterval ParamItem `refreshable:"true"`
LoadTimeoutSeconds ParamItem `refreshable:"true"`
//Deprecated: Since 2.2.2, QueryCoord do not use HandOff logic anymore
// Deprecated: Since 2.2.2, QueryCoord do not use HandOff logic anymore
CheckHandoffInterval ParamItem `refreshable:"true"`
EnableActiveStandby ParamItem `refreshable:"false"`
@ -1413,7 +1400,7 @@ type queryNodeConfig struct {
FlowGraphMaxParallelism ParamItem `refreshable:"false"`
// stats
//Deprecated: Never used
// Deprecated: Never used
StatsPublishInterval ParamItem `refreshable:"true"`
// segcore
@ -1786,7 +1773,6 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
// /////////////////////////////////////////////////////////////////////////////
// --- datacoord ---
type dataCoordConfig struct {
// --- CHANNEL ---
WatchTimeoutInterval ParamItem `refreshable:"false"`
ChannelBalanceSilentDuration ParamItem `refreshable:"true"`

View File

@ -89,6 +89,8 @@ type quotaConfig struct {
// limits
MaxCollectionNum ParamItem `refreshable:"true"`
MaxCollectionNumPerDB ParamItem `refreshable:"true"`
TopKLimit ParamItem `refreshable:"true"`
NQLimit ParamItem `refreshable:"true"`
// limit writing
ForceDenyWriting ParamItem `refreshable:"true"`
@ -294,7 +296,7 @@ The maximum rate will not be greater than ` + "max" + `.`,
if !p.DMLLimitEnabled.GetAsBool() {
return max
}
var rate = getAsFloat(v)
rate := getAsFloat(v)
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
rate = megaBytes2Bytes(rate)
}
@ -338,7 +340,7 @@ The maximum rate will not be greater than ` + "max" + `.`,
if !p.DMLLimitEnabled.GetAsBool() {
return max
}
var rate = getAsFloat(v)
rate := getAsFloat(v)
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
rate = megaBytes2Bytes(rate)
}
@ -546,7 +548,6 @@ The maximum rate will not be greater than ` + "max" + `.`,
return min
}
return fmt.Sprintf("%f", rate)
},
}
p.DMLMinBulkLoadRatePerCollection.Init(base.mgr)
@ -737,6 +738,30 @@ The maximum rate will not be greater than ` + "max" + `.`,
}
p.MaxCollectionNumPerDB.Init(base.mgr)
p.TopKLimit = ParamItem{
Key: "quotaAndLimits.limits.topK",
Version: "2.2.1",
DefaultValue: "16384",
FallbackKeys: []string{
"common.topKLimit",
},
Doc: `Search limit, which applies on:
maximum # of results to return (topK).
Check https://milvus.io/docs/limitations.md for more details.`,
}
p.TopKLimit.Init(base.mgr)
p.NQLimit = ParamItem{
Key: "quotaAndLimits.limits.nq",
Version: "2.3.0",
DefaultValue: "16384",
FallbackKeys: []string{},
Doc: `Search limit, which applies on:
maximum # of search requests (nq).
Check https://milvus.io/docs/limitations.md for more details.`,
}
p.NQLimit.Init(base.mgr)
// limit writing
p.ForceDenyWriting = ParamItem{
Key: "quotaAndLimits.limitWriting.forceDeny",
@ -1100,7 +1125,6 @@ MB/s, default no limit`,
Export: true,
}
p.CoolOffSpeed.Init(base.mgr)
}
func megaBytes2Bytes(f float64) float64 {