mirror of https://github.com/milvus-io/milvus.git
enhance: [2.4] allow to delete data when disk quota exhausted (#37139)
- issue: #37133 - pr: #37134 Signed-off-by: SimFG <bang.fu@zilliz.com>pull/37161/head
parent
2dc89b1cad
commit
ae4ce9bbba
|
@ -109,6 +109,12 @@ var dqlRateTypes = typeutil.NewSet(
|
|||
internalpb.RateType_DQLQuery,
|
||||
)
|
||||
|
||||
type LimiterRange struct {
|
||||
RateScope internalpb.RateScope
|
||||
OpType opType
|
||||
ExcludeRateTypes typeutil.Set[internalpb.RateType]
|
||||
}
|
||||
|
||||
// QuotaCenter manages the quota and limitations of the whole cluster,
|
||||
// it receives metrics info from DataNodes, QueryNodes and Proxies, and
|
||||
// notifies Proxies to limit rate of requests from clients or reject
|
||||
|
@ -223,18 +229,19 @@ func initLimiter(limiterFunc func(internalpb.RateType) *ratelimitutil.Limiter, r
|
|||
return rateLimiters
|
||||
}
|
||||
|
||||
func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, rateScope internalpb.RateScope, opType opType) {
|
||||
func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, limiterRange *LimiterRange) {
|
||||
if node == nil {
|
||||
log.Warn("update limiter failed, node is nil", zap.Any("rateScope", rateScope), zap.Any("opType", opType))
|
||||
log.Warn("update limiter failed, node is nil", zap.Any("rateScope", limiterRange.RateScope), zap.Any("opType", limiterRange.OpType))
|
||||
return
|
||||
}
|
||||
limiters := node.GetLimiters()
|
||||
getRateTypes(rateScope, opType).Range(func(rt internalpb.RateType) bool {
|
||||
getRateTypes(limiterRange.RateScope, limiterRange.OpType).
|
||||
Complement(limiterRange.ExcludeRateTypes).Range(func(rt internalpb.RateType) bool {
|
||||
originLimiter, ok := limiters.Get(rt)
|
||||
if !ok {
|
||||
log.Warn("update limiter failed, limiter not found",
|
||||
zap.Any("rateScope", rateScope),
|
||||
zap.Any("opType", opType),
|
||||
zap.Any("rateScope", limiterRange.RateScope),
|
||||
zap.Any("opType", limiterRange.OpType),
|
||||
zap.Any("rateType", rt))
|
||||
return true
|
||||
}
|
||||
|
@ -552,9 +559,17 @@ func (q *QuotaCenter) collectMetrics() error {
|
|||
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
|
||||
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error {
|
||||
log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0)
|
||||
var excludeRange typeutil.Set[internalpb.RateType]
|
||||
if errorCode == commonpb.ErrorCode_DiskQuotaExhausted {
|
||||
excludeRange = typeutil.NewSet(internalpb.RateType_DMLDelete)
|
||||
}
|
||||
if cluster {
|
||||
clusterLimiters := q.rateLimiter.GetRootLimiters()
|
||||
updateLimiter(clusterLimiters, GetEarliestLimiter(), internalpb.RateScope_Cluster, dml)
|
||||
updateLimiter(clusterLimiters, GetEarliestLimiter(), &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Cluster,
|
||||
OpType: dml,
|
||||
ExcludeRateTypes: excludeRange,
|
||||
})
|
||||
clusterLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
|
||||
}
|
||||
|
||||
|
@ -564,7 +579,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo
|
|||
log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID))
|
||||
continue
|
||||
}
|
||||
updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dml)
|
||||
updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Database,
|
||||
OpType: dml,
|
||||
ExcludeRateTypes: excludeRange,
|
||||
})
|
||||
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
|
||||
}
|
||||
|
||||
|
@ -581,7 +600,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo
|
|||
zap.Int64("collectionID", collectionID))
|
||||
continue
|
||||
}
|
||||
updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dml)
|
||||
updateLimiter(collectionLimiter, GetEarliestLimiter(), &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Collection,
|
||||
OpType: dml,
|
||||
ExcludeRateTypes: excludeRange,
|
||||
})
|
||||
collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
|
||||
}
|
||||
|
||||
|
@ -600,7 +623,11 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster boo
|
|||
zap.Int64("partitionID", partitionID))
|
||||
continue
|
||||
}
|
||||
updateLimiter(partitionLimiter, GetEarliestLimiter(), internalpb.RateScope_Partition, dml)
|
||||
updateLimiter(partitionLimiter, GetEarliestLimiter(), &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Partition,
|
||||
OpType: dml,
|
||||
ExcludeRateTypes: excludeRange,
|
||||
})
|
||||
partitionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
|
||||
}
|
||||
}
|
||||
|
@ -624,7 +651,10 @@ func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, cluster boo
|
|||
for dbID, collectionIDToPartIDs := range q.readableCollections {
|
||||
for collectionID := range collectionIDToPartIDs {
|
||||
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collectionID)
|
||||
updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dql)
|
||||
updateLimiter(collectionLimiter, GetEarliestLimiter(), &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Collection,
|
||||
OpType: dql,
|
||||
})
|
||||
collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode)
|
||||
collectionIDs = append(collectionIDs, collectionID)
|
||||
}
|
||||
|
@ -642,7 +672,10 @@ func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, cluster boo
|
|||
log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID))
|
||||
continue
|
||||
}
|
||||
updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dql)
|
||||
updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Database,
|
||||
OpType: dql,
|
||||
})
|
||||
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode)
|
||||
mlog.RatedWarn(10, "QuotaCenter force to deny reading",
|
||||
zap.Int64s("dbIDs", dbIDs),
|
||||
|
|
|
@ -393,6 +393,65 @@ func TestQuotaCenter(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("disk quota exhausted", func(t *testing.T) {
|
||||
qc := mocks.NewMockQueryCoordClient(t)
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.EXPECT().
|
||||
GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).
|
||||
Return(nil, merr.ErrCollectionNotFound).
|
||||
Maybe()
|
||||
|
||||
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
|
||||
quotaCenter.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]()
|
||||
quotaCenter.collectionIDToDBID.Insert(1, 0)
|
||||
quotaCenter.collectionIDToDBID.Insert(2, 0)
|
||||
|
||||
quotaCenter.writableCollections = map[int64]map[int64][]int64{
|
||||
0: collectionIDToPartitionIDs,
|
||||
}
|
||||
quotaCenter.writableCollections[0][1] = append(quotaCenter.writableCollections[0][1], 1000)
|
||||
|
||||
err := quotaCenter.resetAllCurrentRates()
|
||||
assert.NoError(t, err)
|
||||
|
||||
updateLimit := func(node *interalratelimitutil.RateLimiterNode, rateType internalpb.RateType, limit int64) {
|
||||
limiter, ok := node.GetLimiters().Get(rateType)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
limiter.SetLimit(Limit(limit))
|
||||
}
|
||||
assertLimit := func(node *interalratelimitutil.RateLimiterNode, rateType internalpb.RateType, expectValue int64) {
|
||||
limiter, ok := node.GetLimiters().Get(rateType)
|
||||
if !ok {
|
||||
assert.FailNow(t, "limiter not found")
|
||||
return
|
||||
}
|
||||
assert.EqualValues(t, expectValue, limiter.Limit())
|
||||
}
|
||||
|
||||
updateLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLInsert, 10)
|
||||
updateLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLDelete, 9)
|
||||
updateLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLInsert, 10)
|
||||
updateLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLDelete, 9)
|
||||
updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLInsert, 10)
|
||||
updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLDelete, 9)
|
||||
updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLInsert, 10)
|
||||
updateLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLDelete, 9)
|
||||
|
||||
err = quotaCenter.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, []int64{0}, []int64{1}, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assertLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLInsert, 0)
|
||||
assertLimit(quotaCenter.rateLimiter.GetRootLimiters(), internalpb.RateType_DMLDelete, 9)
|
||||
assertLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLInsert, 0)
|
||||
assertLimit(quotaCenter.rateLimiter.GetDatabaseLimiters(0), internalpb.RateType_DMLDelete, 9)
|
||||
assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLInsert, 0)
|
||||
assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 1), internalpb.RateType_DMLDelete, 9)
|
||||
assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLInsert, 10)
|
||||
assertLimit(quotaCenter.rateLimiter.GetCollectionLimiters(0, 2), internalpb.RateType_DMLDelete, 9)
|
||||
})
|
||||
|
||||
t.Run("test calculateRates", func(t *testing.T) {
|
||||
forceBak := Params.QuotaConfig.ForceDenyWriting.GetValue()
|
||||
paramtable.Get().Save(Params.QuotaConfig.ForceDenyWriting.Key, "false")
|
||||
|
@ -854,7 +913,7 @@ func TestQuotaCenter(t *testing.T) {
|
|||
b, _ := limiters.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.Equal(t, Limit(0), b.Limit())
|
||||
c, _ := limiters.Get(internalpb.RateType_DMLDelete)
|
||||
assert.Equal(t, Limit(0), c.Limit())
|
||||
assert.NotEqual(t, Limit(0), c.Limit())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1473,14 +1532,20 @@ func TestQuotaCenterSuite(t *testing.T) {
|
|||
|
||||
func TestUpdateLimiter(t *testing.T) {
|
||||
t.Run("nil node", func(t *testing.T) {
|
||||
updateLimiter(nil, nil, internalpb.RateScope_Database, dql)
|
||||
updateLimiter(nil, nil, &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Collection,
|
||||
OpType: dql,
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("normal op", func(t *testing.T) {
|
||||
node := interalratelimitutil.NewRateLimiterNode(internalpb.RateScope_Collection)
|
||||
node.GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(5, 5))
|
||||
newLimit := ratelimitutil.NewLimiter(10, 10)
|
||||
updateLimiter(node, newLimit, internalpb.RateScope_Collection, dql)
|
||||
updateLimiter(node, newLimit, &LimiterRange{
|
||||
RateScope: internalpb.RateScope_Collection,
|
||||
OpType: dql,
|
||||
})
|
||||
|
||||
searchLimit, _ := node.GetLimiters().Get(internalpb.RateType_DQLSearch)
|
||||
assert.Equal(t, Limit(10), searchLimit.Limit())
|
||||
|
|
|
@ -67,6 +67,9 @@ func (set Set[T]) Union(other Set[T]) Set[T] {
|
|||
|
||||
// Complement returns the complement with the given set
|
||||
func (set Set[T]) Complement(other Set[T]) Set[T] {
|
||||
if other == nil {
|
||||
return set
|
||||
}
|
||||
ret := NewSet(set.Collect()...)
|
||||
ret.Remove(other.Collect()...)
|
||||
return ret
|
||||
|
|
Loading…
Reference in New Issue