fix quota_center calculating negative memory factor for rate limit (#23750)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-04-28 11:32:35 +08:00 committed by GitHub
parent 6f94bfd26d
commit 537e068fb9
2 changed files with 200 additions and 94 deletions


@@ -330,16 +330,11 @@ func (q *QuotaCenter) calculateReadRates() {
return
}
enableQueueProtection := Params.QuotaConfig.QueueProtectionEnabled.GetAsBool()
if !enableQueueProtection {
return
}
limitCollectionSet := typeutil.NewUniqueSet()
enableQueueProtection := Params.QuotaConfig.QueueProtectionEnabled.GetAsBool()
// query latency
queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second)
// queueLatencyThreshold >= 0 means enable queue latency protection
// enableQueueProtection && queueLatencyThreshold >= 0 means enable queue latency protection
if queueLatencyThreshold >= 0 {
for _, metric := range q.queryNodeMetrics {
searchLatency := metric.SearchQueue.AvgQueueDuration
@@ -352,7 +347,7 @@ func (q *QuotaCenter) calculateReadRates() {
// queue length
nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold.GetAsInt64()
if nqInQueueThreshold >= 0 {
if enableQueueProtection && nqInQueueThreshold >= 0 {
// >= 0 means enable queue length protection
sum := func(ri metricsinfo.ReadInfoInQueue) int64 {
return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan
@@ -415,6 +410,7 @@ func (q *QuotaCenter) calculateReadRates() {
// calculateWriteRates calculates and sets dml rates.
func (q *QuotaCenter) calculateWriteRates() error {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
q.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
return nil
@@ -422,123 +418,184 @@ func (q *QuotaCenter) calculateWriteRates() error {
q.checkDiskQuota()
collectionFactor := map[int64]float64{}
for _, collection := range q.writableCollections {
collectionFactor[collection] = 1.0
}
ts, err := q.tsoAllocator.GenerateTSO(1)
if err != nil {
return err
}
maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
curTs, _ := tsoutil.ParseTS(ts)
processTtDelay := func(role string, nodeID int64, vchannel string, minTs time.Time, delay time.Duration, effectedCollections []int64) {
log := log.With(zap.String("role", role),
zap.Int64("nodeID", nodeID),
zap.String("vchannel", vchannel),
zap.Time("curTs", curTs),
zap.Time("minTs", minTs),
zap.Duration("delay", delay),
zap.Duration("MaxDelay", maxDelay),
zap.Int64s("effectedCollections", effectedCollections),
)
factor := float64(maxDelay.Nanoseconds()-delay.Nanoseconds()) / float64(maxDelay.Nanoseconds())
collectionFactors := q.getTimeTickDelayFactor(ts)
memFactors := q.getMemoryFactor()
for collection, factor := range memFactors {
_, ok := collectionFactors[collection]
if !ok || collectionFactors[collection] > factor {
collectionFactors[collection] = factor
}
}
for collection, factor := range collectionFactors {
if factor <= 0 {
log.Warn("QuotaCenter: force deny writing due to long timeTick delay")
q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay, effectedCollections...)
q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay, collection)
}
for _, collection := range effectedCollections {
if factor < collectionFactor[collection] {
collectionFactor[collection] = factor
}
if q.currentRates[collection][internalpb.RateType_DMLInsert] != Inf {
q.currentRates[collection][internalpb.RateType_DMLInsert] *= Limit(factor)
}
if q.currentRates[collection][internalpb.RateType_DMLDelete] != Inf {
q.currentRates[collection][internalpb.RateType_DMLDelete] *= Limit(factor)
}
q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRate.GetAsFloat(), internalpb.RateType_DMLInsert)
q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRate.GetAsFloat(), internalpb.RateType_DMLDelete)
log.RatedDebug(10, "QuotaCenter cool write rates off done",
zap.Int64("collectionID", collection),
zap.Float64("factor", factor))
}
processMemUsage := func(role string, nodeID int64, hms metricsinfo.HardwareMetrics, memLowLevel float64, memHighLevel float64, effectedCollections []int64) {
memoryWaterLevel := float64(hms.MemoryUsage) / float64(hms.Memory)
log := log.With(
zap.String("role", role),
zap.Int64("nodeID", nodeID),
zap.Uint64("usedMem", hms.MemoryUsage),
zap.Uint64("totalMem", hms.Memory),
zap.Float64("memoryWaterLevel", memoryWaterLevel),
zap.Float64("memoryHighWaterLevel", memHighLevel),
zap.Int64s("effectedCollections", effectedCollections),
)
return nil
}
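The loop above applies the merged per-collection factor (the smaller of the time-tick and memory factors) to the DML rates: rates already at Inf are left untouched, everything else is multiplied by the factor, and guaranteeMinRate keeps the result from dropping below the configured DML minimum. A minimal sketch of that scaling with plain float64 values and an illustrative applyFactor helper (not part of the QuotaCenter API):

    // applyFactor scales a current rate by the collection factor while
    // respecting a configured floor, mirroring guaranteeMinRate.
    func applyFactor(rate, factor, minRate float64) float64 {
        scaled := rate * factor // e.g. a 10 MB/s insert rate with factor 0.8 becomes 8 MB/s
        if scaled < minRate {
            return minRate // never throttle below the configured minimum rate
        }
        return scaled
    }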
factor := (memHighLevel - memoryWaterLevel) / (memHighLevel - memLowLevel)
func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) map[int64]float64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if !Params.QuotaConfig.TtProtectionEnabled.GetAsBool() {
return nil
}
if factor <= 0 {
log.Warn("QuotaCenter: force deny writing due to memory reach high water")
q.forceDenyWriting(commonpb.ErrorCode_MemoryQuotaExhausted, effectedCollections...)
maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
if maxDelay < 0 {
// < 0 means disable tt protection
return nil
}
collectionsMaxDelay := make(map[int64]time.Duration)
updateCollectionDelay := func(delay time.Duration, collections []int64) {
for _, collection := range collections {
_, ok := collectionsMaxDelay[collection]
if !ok || collectionsMaxDelay[collection] < delay {
collectionsMaxDelay[collection] = delay
}
}
}
if factor < 0.9 {
log.Warn("QuotaCenter: Node memory to low water level, limit writing rate",
t1, _ := tsoutil.ParseTS(ts)
for nodeID, metric := range q.queryNodeMetrics {
if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
delay := t1.Sub(t2)
updateCollectionDelay(delay, metric.Effect.CollectionIDs)
metrics.RootCoordTtDelay.WithLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
}
}
for nodeID, metric := range q.dataNodeMetrics {
if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
delay := t1.Sub(t2)
updateCollectionDelay(delay, metric.Effect.CollectionIDs)
metrics.RootCoordTtDelay.WithLabelValues(typeutil.DataNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
}
}
collectionFactor := make(map[int64]float64)
for collectionID, curMaxDelay := range collectionsMaxDelay {
if curMaxDelay.Nanoseconds() >= maxDelay.Nanoseconds() {
log.RatedWarn(10, "QuotaCenter force deny writing due to long timeTick delay",
zap.Int64("collectionID", collectionID),
zap.Time("curTs", t1),
zap.Duration("delay", curMaxDelay),
zap.Duration("MaxDelay", maxDelay))
log.RatedInfo(10, "DataNode and QueryNode Metrics",
zap.Any("QueryNodeMetrics", q.queryNodeMetrics),
zap.Any("DataNodeMetrics", q.dataNodeMetrics))
collectionFactor[collectionID] = 0
continue
}
factor := float64(maxDelay.Nanoseconds()-curMaxDelay.Nanoseconds()) / float64(maxDelay.Nanoseconds())
if factor <= 0.9 {
log.RatedWarn(10, "QuotaCenter: limit writing due to long timeTick delay",
zap.Int64("collectionID", collectionID),
zap.Time("curTs", t1),
zap.Duration("delay", curMaxDelay),
zap.Duration("MaxDelay", maxDelay),
zap.Float64("factor", factor))
}
for _, collection := range effectedCollections {
if factor < collectionFactor[collection] {
collectionFactor[collection] = factor
}
}
collectionFactor[collectionID] = factor
}
enableTtProtection := Params.QuotaConfig.TtProtectionEnabled.GetAsBool()
enableMemProtection := Params.QuotaConfig.MemProtectionEnabled.GetAsBool()
return collectionFactor
}
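getTimeTickDelayFactor reduces to a linear ramp: the factor is (maxDelay - delay) / maxDelay, and it is pinned to 0 once the delay reaches maxDelay. With maxDelay = 10s, a 2s delay gives 0.8 and a 9s delay gives 0.1, which is exactly what the new test table further below expects. A standalone sketch of the ramp (hypothetical helper, assuming only the standard time package):

    // ttDelayFactor falls linearly from 1 (no delay) to 0 (delay == maxDelay).
    func ttDelayFactor(delay, maxDelay time.Duration) float64 {
        if delay.Nanoseconds() >= maxDelay.Nanoseconds() {
            return 0 // at or beyond maxDelay the collection is force-denied
        }
        return float64(maxDelay.Nanoseconds()-delay.Nanoseconds()) / float64(maxDelay.Nanoseconds())
    }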
// getMemoryFactor checks whether any node has a memory resource issue,
// and returns the factor according to the max memory water level.
func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if !Params.QuotaConfig.MemProtectionEnabled.GetAsBool() {
return nil
}
dataNodeMemoryLowWaterLevel := Params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat()
dataNodeMemoryHighWaterLevel := Params.QuotaConfig.DataNodeMemoryHighWaterLevel.GetAsFloat()
queryNodeMemoryLowWaterLevel := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel.GetAsFloat()
queryNodeMemoryHighWaterLevel := Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.GetAsFloat()
collectionFactor := make(map[int64]float64)
updateCollectionFactor := func(factor float64, collections []int64) {
for _, collection := range collections {
_, ok := collectionFactor[collection]
if !ok || collectionFactor[collection] > factor {
collectionFactor[collection] = factor
}
}
}
for nodeID, metric := range q.queryNodeMetrics {
if enableTtProtection && maxDelay >= 0 && metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
minTs, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
delay := curTs.Sub(minTs)
processTtDelay(typeutil.QueryNodeRole, nodeID, metric.Fgm.MinFlowGraphChannel, minTs, delay, metric.Effect.CollectionIDs)
metrics.RootCoordTtDelay.WithLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory)
if memoryWaterLevel <= queryNodeMemoryLowWaterLevel {
continue
}
if enableMemProtection {
processMemUsage(typeutil.QueryNodeRole, nodeID, metric.Hms, queryNodeMemoryLowWaterLevel, queryNodeMemoryHighWaterLevel, metric.Effect.CollectionIDs)
if memoryWaterLevel >= queryNodeMemoryHighWaterLevel {
log.RatedWarn(10, "QuotaCenter: QueryNode memory to high water level",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("memoryWaterLevel", memoryWaterLevel),
zap.Float64("memoryHighWaterLevel", queryNodeMemoryHighWaterLevel))
updateCollectionFactor(0, metric.Effect.CollectionIDs)
continue
}
factor := (queryNodeMemoryHighWaterLevel - memoryWaterLevel) / (queryNodeMemoryHighWaterLevel - queryNodeMemoryLowWaterLevel)
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
log.RatedWarn(10, "QuotaCenter: QueryNode memory to low water level, limit writing rate",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("memoryWaterLevel", memoryWaterLevel),
zap.Float64("memoryLowWaterLevel", queryNodeMemoryLowWaterLevel))
}
for nodeID, metric := range q.dataNodeMetrics {
if enableTtProtection && maxDelay >= 0 && metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
minTs, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
delay := curTs.Sub(minTs)
processTtDelay(typeutil.DataNodeRole, nodeID, metric.Fgm.MinFlowGraphChannel, minTs, delay, metric.Effect.CollectionIDs)
metrics.RootCoordTtDelay.WithLabelValues(typeutil.DataNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory)
if memoryWaterLevel <= dataNodeMemoryLowWaterLevel {
continue
}
if enableMemProtection {
processMemUsage(typeutil.QueryNodeRole, nodeID, metric.Hms, dataNodeMemoryLowWaterLevel, dataNodeMemoryHighWaterLevel, metric.Effect.CollectionIDs)
if memoryWaterLevel >= dataNodeMemoryHighWaterLevel {
log.RatedWarn(10, "QuotaCenter: DataNode memory to high water level",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("memoryWaterLevel", memoryWaterLevel),
zap.Float64("memoryHighWaterLevel", dataNodeMemoryHighWaterLevel))
updateCollectionFactor(0, metric.Effect.CollectionIDs)
continue
}
factor := (dataNodeMemoryHighWaterLevel - memoryWaterLevel) / (dataNodeMemoryHighWaterLevel - dataNodeMemoryLowWaterLevel)
log.RatedWarn(10, "QuotaCenter: DataNode memory to low water level, limit writing rate",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("memoryWaterLevel", memoryWaterLevel),
zap.Float64("memoryLowWaterLevel", dataNodeMemoryLowWaterLevel))
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
}
for collection, cf := range collectionFactor {
if q.currentRates[collection][internalpb.RateType_DMLInsert] != Inf && cf > 0 {
q.currentRates[collection][internalpb.RateType_DMLInsert] *= Limit(cf)
}
if q.currentRates[collection][internalpb.RateType_DMLDelete] != Inf && cf > 0 {
q.currentRates[collection][internalpb.RateType_DMLDelete] *= Limit(cf)
}
q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRate.GetAsFloat(), internalpb.RateType_DMLInsert)
q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRate.GetAsFloat(), internalpb.RateType_DMLDelete)
log.Warn("QuotaCenter cool write rates off done",
zap.Int64("collectionID", collection),
zap.Float64("factor", cf))
}
return nil
return collectionFactor
}
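This helper is the core of the fix: the water-level formula (high - level) / (high - low) goes negative once a node's memory usage sits above the high water level, and getMemoryFactor now clamps that case to 0 (triggering force-deny) and skips nodes at or below the low water level, so only factors between 0 and 1 reach the rate calculation. A condensed sketch of the same behaviour, with an illustrative helper name rather than the method above:

    // memoryFactor maps a node's memory water level to a write-rate factor;
    // low/high stand in for the node type's Low/HighWaterLevel settings.
    func memoryFactor(usedMem, totalMem uint64, low, high float64) float64 {
        level := float64(usedMem) / float64(totalMem)
        if level <= low {
            return 1 // below the low water level: no write throttling
        }
        if level >= high {
            return 0 // at or above the high water level: writing is force-denied
        }
        // strictly between the levels: factor falls linearly from 1 to 0
        return (high - level) / (high - low)
    }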
// calculateRates calculates target rates by different strategies.


@@ -138,6 +138,52 @@ func TestQuotaCenter(t *testing.T) {
assert.Error(t, err)
})
t.Run("test getTimeTickDelayFactor factors", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
type ttCase struct {
maxTtDelay time.Duration
curTt time.Time
fgTt time.Time
expectedFactor float64
}
t0 := time.Now()
ttCases := []ttCase{
{10 * time.Second, t0, t0.Add(1 * time.Second), 1},
{10 * time.Second, t0, t0, 1},
{10 * time.Second, t0.Add(1 * time.Second), t0, 0.9},
{10 * time.Second, t0.Add(2 * time.Second), t0, 0.8},
{10 * time.Second, t0.Add(5 * time.Second), t0, 0.5},
{10 * time.Second, t0.Add(7 * time.Second), t0, 0.3},
{10 * time.Second, t0.Add(9 * time.Second), t0, 0.1},
{10 * time.Second, t0.Add(10 * time.Second), t0, 0},
{10 * time.Second, t0.Add(100 * time.Second), t0, 0},
}
backup := Params.QuotaConfig.MaxTimeTickDelay
for _, c := range ttCases {
paramtable.Get().Save(Params.QuotaConfig.MaxTimeTickDelay.Key, fmt.Sprintf("%f", c.maxTtDelay.Seconds()))
fgTs := tsoutil.ComposeTSByTime(c.fgTt, 0)
quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
1: {
Fgm: metricsinfo.FlowGraphMetric{
NumFlowGraph: 1,
MinFlowGraphTt: fgTs,
MinFlowGraphChannel: "dml",
},
},
}
curTs := tsoutil.ComposeTSByTime(c.curTt, 0)
factors := quotaCenter.getTimeTickDelayFactor(curTs)
for _, factor := range factors {
assert.True(t, math.Abs(factor-c.expectedFactor) < 0.01)
}
}
Params.QuotaConfig.MaxTimeTickDelay = backup
})
t.Run("test TimeTickDelayFactor factors", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
@@ -329,8 +375,11 @@ func TestQuotaCenter(t *testing.T) {
},
},
}
quotaCenter.resetAllCurrentRates()
quotaCenter.calculateWriteRates()
factors := quotaCenter.getMemoryFactor()
for _, factor := range factors {
assert.True(t, math.Abs(factor-c.expectedFactor) < 0.01)
}
}
paramtable.Get().Reset(Params.QuotaConfig.QueryNodeMemoryLowWaterLevel.Key)
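The updated assertions above compare getMemoryFactor output against expectedFactor the same way the time-tick test does. For reference, a self-contained, table-driven sketch of the same water-level arithmetic, using only the standard library (math, testing); the low/high levels and expectations are illustrative, not the project's defaults:

    func TestMemoryFactorSketch(t *testing.T) {
        low, high := 0.8, 0.9
        cases := []struct{ level, expected float64 }{
            {0.75, 1.0}, // below low water level: no throttling
            {0.85, 0.5}, // halfway between the levels
            {0.88, 0.2},
            {0.95, 0.0}, // above high water level: clamped to 0, never negative
        }
        for _, c := range cases {
            factor := (high - c.level) / (high - low)
            switch {
            case c.level <= low:
                factor = 1
            case c.level >= high:
                factor = 0
            }
            if math.Abs(factor-c.expected) > 0.01 {
                t.Errorf("level %.2f: got %.2f, want %.2f", c.level, factor, c.expected)
            }
        }
    }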