milvus/internal/util/paramtable/quota_param.go

730 lines
19 KiB
Go
Raw Normal View History

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package paramtable
import (
"fmt"
"math"
"strconv"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
)
const (
// defaultMax is the default unlimited rate or threshold.
defaultMax = float64(math.MaxFloat64)
// MBSize used to convert megabytes and bytes.
MBSize = 1024.0 * 1024.0
// defaultDiskQuotaInMB is the default disk quota in megabytes.
defaultDiskQuotaInMB = defaultMax / MBSize
// defaultMin is the default minimal rate.
defaultMin = float64(0)
// defaultLowWaterLevel is the default memory low water level.
defaultLowWaterLevel = float64(0.85)
// defaultHighWaterLevel is the default memory low water level.
defaultHighWaterLevel = float64(0.95)
)
// quotaConfig is configuration for quota and limitations.
type quotaConfig struct {
QuotaAndLimitsEnabled ParamItem `refreshable:"false"`
QuotaCenterCollectInterval ParamItem `refreshable:"false"`
// ddl
DDLLimitEnabled ParamItem `refreshable:"true"`
DDLCollectionRate ParamItem `refreshable:"false"`
DDLPartitionRate ParamItem `refreshable:"false"`
IndexLimitEnabled ParamItem `refreshable:"true"`
MaxIndexRate ParamItem `refreshable:"false"`
FlushLimitEnabled ParamItem `refreshable:"true"`
MaxFlushRate ParamItem `refreshable:"false"`
CompactionLimitEnabled ParamItem `refreshable:"true"`
MaxCompactionRate ParamItem `refreshable:"false"`
// dml
DMLLimitEnabled ParamItem `refreshable:"true"`
DMLMaxInsertRate ParamItem `refreshable:"false"`
DMLMinInsertRate ParamItem `refreshable:"false"`
DMLMaxDeleteRate ParamItem `refreshable:"false"`
DMLMinDeleteRate ParamItem `refreshable:"false"`
DMLMaxBulkLoadRate ParamItem `refreshable:"false"`
DMLMinBulkLoadRate ParamItem `refreshable:"false"`
// dql
DQLLimitEnabled ParamItem `refreshable:"true"`
DQLMaxSearchRate ParamItem `refreshable:"false"`
DQLMinSearchRate ParamItem `refreshable:"false"`
DQLMaxQueryRate ParamItem `refreshable:"false"`
DQLMinQueryRate ParamItem `refreshable:"false"`
// limits
MaxCollectionNum ParamItem `refreshable:"true"`
// limit writing
ForceDenyWriting ParamItem `refreshable:"true"`
TtProtectionEnabled ParamItem `refreshable:"true"`
MaxTimeTickDelay ParamItem `refreshable:"true"`
MemProtectionEnabled ParamItem `refreshable:"true"`
DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
DiskProtectionEnabled ParamItem `refreshable:"true"`
DiskQuota ParamItem `refreshable:"true"`
// limit reading
ForceDenyReading ParamItem `refreshable:"true"`
QueueProtectionEnabled ParamItem `refreshable:"true"`
NQInQueueThreshold ParamItem `refreshable:"true"`
QueueLatencyThreshold ParamItem `refreshable:"true"`
ResultProtectionEnabled ParamItem `refreshable:"true"`
MaxReadResultRate ParamItem `refreshable:"true"`
CoolOffSpeed ParamItem `refreshable:"true"`
}
func (p *quotaConfig) init(base *BaseTable) {
p.QuotaAndLimitsEnabled = ParamItem{
Key: "quotaAndLimits.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.QuotaAndLimitsEnabled.Init(base.mgr)
const defaultInterval = "3.0"
p.QuotaCenterCollectInterval = ParamItem{
Key: "quotaAndLimits.quotaCenterCollectInterval",
Version: "2.2.0",
DefaultValue: defaultInterval,
Formatter: func(v string) string {
// (0 ~ 65536)
if getAsInt(v) <= 0 || getAsInt(v) >= 65536 {
return defaultInterval
}
return v
},
}
p.QuotaCenterCollectInterval.Init(base.mgr)
// ddl
max := fmt.Sprintf("%f", defaultMax)
min := fmt.Sprintf("%f", defaultMin)
p.DDLLimitEnabled = ParamItem{
Key: "quotaAndLimits.ddl.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.DDLLimitEnabled.Init(base.mgr)
p.DDLCollectionRate = ParamItem{
Key: "quotaAndLimits.ddl.collectionRate",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DDLLimitEnabled.GetAsBool() {
return max
}
// [0 ~ Inf)
if getAsInt(v) < 0 {
return max
}
return v
},
}
p.DDLCollectionRate.Init(base.mgr)
p.DDLPartitionRate = ParamItem{
Key: "quotaAndLimits.ddl.partitionRate",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DDLLimitEnabled.GetAsBool() {
return max
}
// [0 ~ Inf)
if getAsInt(v) < 0 {
return max
}
return v
},
}
p.DDLPartitionRate.Init(base.mgr)
p.IndexLimitEnabled = ParamItem{
Key: "quotaAndLimits.indexRate.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.IndexLimitEnabled.Init(base.mgr)
p.MaxIndexRate = ParamItem{
Key: "quotaAndLimits.indexRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.IndexLimitEnabled.GetAsBool() {
return max
}
// [0 ~ Inf)
if getAsFloat(v) < 0 {
return max
}
return v
},
}
p.MaxIndexRate.Init(base.mgr)
p.FlushLimitEnabled = ParamItem{
Key: "quotaAndLimits.flushRate.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.FlushLimitEnabled.Init(base.mgr)
p.MaxFlushRate = ParamItem{
Key: "quotaAndLimits.flushRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.FlushLimitEnabled.GetAsBool() {
return max
}
// [0 ~ Inf)
if getAsInt(v) < 0 {
return max
}
return v
},
}
p.MaxFlushRate.Init(base.mgr)
p.CompactionLimitEnabled = ParamItem{
Key: "quotaAndLimits.compactionRate.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.CompactionLimitEnabled.Init(base.mgr)
p.MaxCompactionRate = ParamItem{
Key: "quotaAndLimits.compactionRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.CompactionLimitEnabled.GetAsBool() {
return max
}
// [0 ~ Inf)
if getAsInt(v) < 0 {
return max
}
return v
},
}
p.MaxCompactionRate.Init(base.mgr)
// dml
p.DMLLimitEnabled = ParamItem{
Key: "quotaAndLimits.dml.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.DMLLimitEnabled.Init(base.mgr)
p.DMLMaxInsertRate = ParamItem{
Key: "quotaAndLimits.dml.insertRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DMLLimitEnabled.GetAsBool() {
return max
}
if math.Abs(getAsFloat(v)-defaultMax) > 0.001 { // maxRate != defaultMax
return fmt.Sprintf("%f", megaBytes2Bytes(getAsFloat(v)))
}
// [0, inf)
if getAsInt(v) < 0 {
return max
}
return v
},
}
p.DMLMaxInsertRate.Init(base.mgr)
p.DMLMinInsertRate = ParamItem{
Key: "quotaAndLimits.dml.insertRate.min",
Version: "2.2.0",
DefaultValue: min,
Formatter: func(v string) string {
if !p.DMLLimitEnabled.GetAsBool() {
return min
}
rate := megaBytes2Bytes(getAsFloat(v))
// [0, inf)
if rate < 0 {
return min
}
if !p.checkMinMaxLegal(rate, getAsFloat(v)) {
return min
}
return v
},
}
p.DMLMinInsertRate.Init(base.mgr)
p.DMLMaxDeleteRate = ParamItem{
Key: "quotaAndLimits.dml.deleteRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DMLLimitEnabled.GetAsBool() {
return max
}
rate := getAsFloat(v)
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
return fmt.Sprintf("%f", megaBytes2Bytes(rate))
}
// [0, inf)
if rate < 0 {
return max
}
return v
},
}
p.DMLMaxDeleteRate.Init(base.mgr)
p.DMLMinDeleteRate = ParamItem{
Key: "quotaAndLimits.dml.deleteRate.min",
Version: "2.2.0",
DefaultValue: min,
Formatter: func(v string) string {
if !p.DMLLimitEnabled.GetAsBool() {
return min
}
rate := megaBytes2Bytes(getAsFloat(v))
// [0, inf)
if rate < 0 {
return min
}
if !p.checkMinMaxLegal(rate, getAsFloat(v)) {
return min
}
return v
},
}
p.DMLMinDeleteRate.Init(base.mgr)
p.DMLMaxBulkLoadRate = ParamItem{
Key: "quotaAndLimits.dml.bulkLoadRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DMLLimitEnabled.GetAsBool() {
return max
}
rate := getAsFloat(v)
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
return fmt.Sprintf("%f", megaBytes2Bytes(rate))
}
// [0, inf)
if rate < 0 {
return max
}
return v
},
}
p.DMLMaxBulkLoadRate.Init(base.mgr)
p.DMLMinBulkLoadRate = ParamItem{
Key: "quotaAndLimits.dml.bulkLoadRate.min",
Version: "2.2.0",
DefaultValue: min,
Formatter: func(v string) string {
if !p.DMLLimitEnabled.GetAsBool() {
return min
}
rate := megaBytes2Bytes(getAsFloat(v))
// [0, inf)
if rate < 0 {
return min
}
if !p.checkMinMaxLegal(rate, getAsFloat(v)) {
return min
}
return v
},
}
p.DMLMinBulkLoadRate.Init(base.mgr)
// dql
p.DQLLimitEnabled = ParamItem{
Key: "quotaAndLimits.dql.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.DQLLimitEnabled.Init(base.mgr)
p.DQLMaxSearchRate = ParamItem{
Key: "quotaAndLimits.dql.searchRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DQLLimitEnabled.GetAsBool() {
return max
}
// [0, inf)
if getAsFloat(v) < 0 {
return max
}
return v
},
}
p.DQLMaxSearchRate.Init(base.mgr)
p.DQLMinSearchRate = ParamItem{
Key: "quotaAndLimits.dql.searchRate.min",
Version: "2.2.0",
DefaultValue: min,
Formatter: func(v string) string {
if !p.DQLLimitEnabled.GetAsBool() {
return min
}
rate := getAsFloat(v)
// [0, inf)
if rate < 0 {
return min
}
if !p.checkMinMaxLegal(rate, getAsFloat(v)) {
return min
}
return v
},
}
p.DQLMinSearchRate.Init(base.mgr)
p.DQLMaxQueryRate = ParamItem{
Key: "quotaAndLimits.dql.queryRate.max",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.DQLLimitEnabled.GetAsBool() {
return max
}
// [0, inf)
if getAsFloat(v) < 0 {
return max
}
return v
},
}
p.DQLMaxQueryRate.Init(base.mgr)
p.DQLMinQueryRate = ParamItem{
Key: "quotaAndLimits.dql.queryRate.min",
Version: "2.2.0",
DefaultValue: min,
Formatter: func(v string) string {
if !p.DQLLimitEnabled.GetAsBool() {
return min
}
rate := getAsFloat(v)
// [0, inf)
if rate < 0 {
return min
}
if !p.checkMinMaxLegal(rate, getAsFloat(v)) {
return min
}
return v
},
}
p.DQLMinQueryRate.Init(base.mgr)
// limits
p.MaxCollectionNum = ParamItem{
Key: "quotaAndLimits.limits.collection.maxNum",
Version: "2.2.0",
DefaultValue: "64",
}
p.MaxCollectionNum.Init(base.mgr)
// limit writing
p.ForceDenyWriting = ParamItem{
Key: "quotaAndLimits.limitWriting.forceDeny",
Version: "2.2.0",
DefaultValue: "false",
}
p.ForceDenyWriting.Init(base.mgr)
p.TtProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitWriting.ttProtection.enabled",
Version: "2.2.0",
DefaultValue: "true",
}
p.TtProtectionEnabled.Init(base.mgr)
const defaultMaxTtDelay = "300.0"
p.MaxTimeTickDelay = ParamItem{
Key: "quotaAndLimits.limitWriting.ttProtection.maxTimeTickDelay",
Version: "2.2.0",
DefaultValue: defaultMaxTtDelay,
Formatter: func(v string) string {
if !p.TtProtectionEnabled.GetAsBool() {
return fmt.Sprintf("%d", math.MaxInt64)
}
delay := getAsFloat(v)
// (0, 65536)
if delay <= 0 || delay >= 65536 {
return defaultMaxTtDelay
}
return fmt.Sprintf("%f", delay)
},
}
p.MaxTimeTickDelay.Init(base.mgr)
p.MemProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitWriting.memProtection.enabled",
Version: "2.2.0",
DefaultValue: "true",
}
p.MemProtectionEnabled.Init(base.mgr)
lowWaterLevel := fmt.Sprintf("%f", defaultLowWaterLevel)
highWaterLevel := fmt.Sprintf("%f", defaultHighWaterLevel)
p.DataNodeMemoryLowWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.memProtection.dataNodeMemoryLowWaterLevel",
Version: "2.2.0",
DefaultValue: lowWaterLevel,
Formatter: func(v string) string {
if !p.MemProtectionEnabled.GetAsBool() {
return lowWaterLevel
}
level := getAsFloat(v)
// (0, 1]
if level <= 0 || level > 1 {
return lowWaterLevel
}
return v
},
}
p.DataNodeMemoryLowWaterLevel.Init(base.mgr)
p.DataNodeMemoryHighWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.memProtection.dataNodeMemoryHighWaterLevel",
Version: "2.2.0",
DefaultValue: highWaterLevel,
Formatter: func(v string) string {
if !p.MemProtectionEnabled.GetAsBool() {
return "1"
}
level := getAsFloat(v)
// (0, 1]
if level <= 0 || level > 1 {
// log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.DataNodeMemoryHighWaterLevel), zap.Float64("default", defaultHighWaterLevel))
return highWaterLevel
}
if !p.checkMinMaxLegal(p.DataNodeMemoryLowWaterLevel.GetAsFloat(), getAsFloat(v)) {
return highWaterLevel
}
return v
},
}
p.DataNodeMemoryHighWaterLevel.Init(base.mgr)
p.QueryNodeMemoryLowWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.memProtection.queryNodeMemoryLowWaterLevel",
Version: "2.2.0",
DefaultValue: lowWaterLevel,
Formatter: func(v string) string {
if !p.MemProtectionEnabled.GetAsBool() {
return lowWaterLevel
}
level := getAsFloat(v)
// (0, 1]
if level <= 0 || level > 1 {
// log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.QueryNodeMemoryLowWaterLevel), zap.Float64("default", defaultLowWaterLevel))
return lowWaterLevel
}
return v
},
}
p.QueryNodeMemoryLowWaterLevel.Init(base.mgr)
p.QueryNodeMemoryHighWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.memProtection.queryNodeMemoryHighWaterLevel",
Version: "2.2.0",
DefaultValue: highWaterLevel,
Formatter: func(v string) string {
if !p.MemProtectionEnabled.GetAsBool() {
return highWaterLevel
}
level := getAsFloat(v)
// (0, 1]
if level <= 0 || level > 1 {
// log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.QueryNodeMemoryHighWaterLevel), zap.Float64("default", defaultHighWaterLevel))
return highWaterLevel
}
if !p.checkMinMaxLegal(p.QueryNodeMemoryLowWaterLevel.GetAsFloat(), getAsFloat(v)) {
return highWaterLevel
}
return v
},
}
p.QueryNodeMemoryHighWaterLevel.Init(base.mgr)
p.DiskProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitWriting.diskProtection.enabled",
Version: "2.2.0",
DefaultValue: "true",
}
p.DiskProtectionEnabled.Init(base.mgr)
quota := fmt.Sprintf("%f", defaultDiskQuotaInMB)
p.DiskQuota = ParamItem{
Key: "quotaAndLimits.limitWriting.diskProtection.diskQuota",
Version: "2.2.0",
DefaultValue: quota,
Formatter: func(v string) string {
if !p.DiskProtectionEnabled.GetAsBool() {
return max
}
level := getAsFloat(v)
// (0, +inf)
if level <= 0 {
level = getAsFloat(quota)
}
// megabytes to bytes
return fmt.Sprintf("%f", megaBytes2Bytes(level))
},
}
p.DiskQuota.Init(base.mgr)
// limit reading
p.ForceDenyReading = ParamItem{
Key: "quotaAndLimits.limitReading.forceDeny",
Version: "2.2.0",
DefaultValue: "false",
}
p.ForceDenyReading.Init(base.mgr)
p.QueueProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitReading.queueProtection.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.QueueProtectionEnabled.Init(base.mgr)
p.NQInQueueThreshold = ParamItem{
Key: "quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold",
Version: "2.2.0",
DefaultValue: strconv.FormatInt(math.MaxInt64, 10),
Formatter: func(v string) string {
if !p.QueueProtectionEnabled.GetAsBool() {
return strconv.FormatInt(math.MaxInt64, 10)
}
threshold := getAsFloat(v)
// [0, inf)
if threshold < 0 {
return strconv.FormatInt(math.MaxInt64, 10)
}
return v
},
}
p.NQInQueueThreshold.Init(base.mgr)
p.QueueLatencyThreshold = ParamItem{
Key: "quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.QueueProtectionEnabled.GetAsBool() {
return max
}
level := getAsFloat(v)
// [0, inf)
if level < 0 {
return max
}
return v
},
}
p.QueueLatencyThreshold.Init(base.mgr)
p.ResultProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitReading.resultProtection.enabled",
Version: "2.2.0",
DefaultValue: "false",
}
p.ResultProtectionEnabled.Init(base.mgr)
p.MaxReadResultRate = ParamItem{
Key: "quotaAndLimits.limitReading.resultProtection.maxReadResultRate",
Version: "2.2.0",
DefaultValue: max,
Formatter: func(v string) string {
if !p.ResultProtectionEnabled.GetAsBool() {
return max
}
rate := getAsFloat(v)
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
return fmt.Sprintf("%f", megaBytes2Bytes(rate))
}
// [0, inf)
if rate < 0 {
return max
}
return v
},
}
p.MaxReadResultRate.Init(base.mgr)
const defaultSpeed = "0.9"
p.CoolOffSpeed = ParamItem{
Key: "quotaAndLimits.limitReading.coolOffSpeed",
Version: "2.2.0",
DefaultValue: defaultSpeed,
Formatter: func(v string) string {
// (0, 1]
speed := getAsFloat(v)
if speed <= 0 || speed > 1 {
// log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed))
return defaultSpeed
}
return v
},
}
p.CoolOffSpeed.Init(base.mgr)
}
func megaBytes2Bytes(f float64) float64 {
return f * MBSize
}
func (p *quotaConfig) checkMinMaxLegal(min, max float64) bool {
if min > max {
log.Warn("init QuotaConfig failed, max/high must be greater than or equal to min/low, use default values",
zap.String("msg", fmt.Sprintf("min: %v, max: %v, defaultMin: %v, defaultMax: %v", min, max, defaultMin, defaultMax)))
return false
}
return true
}