feat: support general capacity restrict for cloud-side resource control (#29845)

related: #29844

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/29987/head
MrPresent-Han 2024-01-16 16:32:53 +08:00 committed by GitHub
parent 750166fd97
commit 2a0eb1d2e6
9 changed files with 150 additions and 5 deletions


@@ -180,6 +180,7 @@ rootCoord:
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
maxGeneralCapacity: 65536
# Related configuration of proxy, used to validate client requests and reduce the returned results.
proxy:
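The new maxGeneralCapacity key caps the cluster-wide sum, over all collections, of partition number times shard number. As a worked example: 1024 collections with 32 partitions and 2 shards each give 1024 * 32 * 2 = 65536, exactly the default ceiling, so one further collection would be rejected.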


@@ -16,6 +16,13 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
// TODO: better to make them configurable; use the default value if no config was set, since we never exposed these before.
globalIDAllocatorKey = "idTimestamp"
@@ -23,3 +30,42 @@ const (
globalTSOAllocatorKey = "timestamp"
globalTSOAllocatorSubPath = "tso"
)
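// checkGeneralCapacity verifies that the cluster-wide sum of
// partitionNum * shardNum over all available collections, plus the
// capacity requested by the pending DDL, stays within
// rootCoord.maxGeneralCapacity.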
func checkGeneralCapacity(ctx context.Context, newColNum int,
newParNum int64,
newShardNum int32,
core *Core,
ts typeutil.Timestamp,
) error {
var addedNum int64 = 0
if newColNum > 0 && newParNum > 0 && newShardNum > 0 {
// create collections scenarios
addedNum += int64(newColNum) * newParNum * int64(newShardNum)
} else if newColNum == 0 && newShardNum == 0 && newParNum > 0 {
// add partitions to existing collections
addedNum += newParNum
}
var generalNum int64 = 0
collectionsMap := core.meta.ListAllAvailCollections(ctx)
for dbId, collectionIds := range collectionsMap {
db, err := core.meta.GetDatabaseByID(ctx, dbId, ts)
if err == nil {
for _, collectionId := range collectionIds {
collection, err := core.meta.GetCollectionByID(ctx, db.Name, collectionId, ts, true)
if err == nil {
partNum := int64(collection.GetPartitionNum(false))
shardNum := int64(collection.ShardsNum)
generalNum += partNum * shardNum
}
}
}
}
generalNum += addedNum
if generalNum > Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64() {
return merr.WrapGeneralCapacityExceed(generalNum, Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64(),
"failed checking constraint: sum_collections(parition*shard) exceeding the max general capacity:")
}
return nil
}
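A minimal sketch (not part of the diff) of how the call sites below exercise the two branches; ctx, core, and ts stand for the values already in scope at those call sites:

// Creating a collection with 4 partitions on 2 shards requests
// 1 * 4 * 2 = 8 units of general capacity:
if err := checkGeneralCapacity(ctx, 1, 4, 2, core, ts); err != nil {
	return err
}
// Adding one partition requests exactly 1 unit, taking the
// newColNum == 0 && newShardNum == 0 branch:
if err := checkGeneralCapacity(ctx, 0, 1, 0, core, ts); err != nil {
	return err
}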


@@ -66,8 +66,8 @@ func (t *createCollectionTask) validate() error {
return err
}
// 1. check shard number
shardsNum := t.Req.GetShardsNum()
cfgMaxShardNum := Params.RootCoordCfg.DmlChannelNum.GetAsInt32()
if shardsNum > cfgMaxShardNum {
return fmt.Errorf("shard num (%d) exceeds max configuration (%d)", shardsNum, cfgMaxShardNum)
@@ -78,6 +78,7 @@ func (t *createCollectionTask) validate() error {
return fmt.Errorf("shard num (%d) exceeds system limit (%d)", shardsNum, cfgShardLimit)
}
// 2. check db-collection capacity
db2CollIDs := t.core.meta.ListAllAvailCollections(t.ctx)
collIDs, ok := db2CollIDs[t.dbID]
@@ -92,6 +93,7 @@ func (t *createCollectionTask) validate() error {
return merr.WrapErrCollectionNumLimitExceeded(maxColNumPerDB, "max number of collection has reached the limit in DB")
}
// 3. check total collection number
totalCollections := 0
for _, collIDs := range db2CollIDs {
totalCollections += len(collIDs)
@@ -102,7 +104,13 @@ func (t *createCollectionTask) validate() error {
log.Warn("unable to create collection because the number of collection has reached the limit", zap.Int("max_collection_num", maxCollectionNum))
return merr.WrapErrCollectionNumLimitExceeded(maxCollectionNum, "max number of collection has reached the limit")
}
// 4. check collection * shard * partition
var newPartNum int64 = 1
if t.Req.GetNumPartitions() > 0 {
newPartNum = t.Req.GetNumPartitions()
}
return checkGeneralCapacity(t.ctx, 1, newPartNum, t.Req.GetShardsNum(), t.core, t.ts)
}
func checkDefaultValue(schema *schemapb.CollectionSchema) error {

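Note that a collection created without NumPartitions set still occupies capacity: newPartNum is floored at 1, so a plain collection with N shards consumes 1 * N units toward the limit.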

@@ -183,6 +183,48 @@ func Test_createCollectionTask_validate(t *testing.T) {
assert.Error(t, err)
})
t.Run("collection general number exceeds limit", func(t *testing.T) {
paramtable.Get().Save(Params.RootCoordCfg.MaxGeneralCapacity.Key, strconv.Itoa(1))
defer paramtable.Get().Reset(Params.RootCoordCfg.MaxGeneralCapacity.Key)
meta := mockrootcoord.NewIMetaTable(t)
meta.On("ListAllAvailCollections",
mock.Anything,
).Return(map[int64][]int64{
1: {1, 2},
}, nil)
meta.On("GetDatabaseByID",
mock.Anything, mock.Anything, mock.Anything,
).Return(&model.Database{
Name: "default",
}, nil)
meta.On("GetCollectionByID",
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything,
).Return(&model.Collection{
Name: "default",
ShardsNum: 2,
Partitions: []*model.Partition{
{
PartitionID: 1,
},
},
}, nil)
core := newTestCore(withMeta(meta))
task := createCollectionTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
NumPartitions: 256,
ShardsNum: 2,
},
dbID: util.DefaultDBID,
}
err := task.validate()
assert.ErrorIs(t, err, merr.ErrGeneralCapacityExceeded)
})
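The arithmetic behind this case: the mocked meta reports two existing collections, each with 1 partition and 2 shards (2 * 1 * 2 = 4 units), and the request adds 1 * 256 * 2 = 512 more; the total of 516 exceeds the capacity saved above (1, which the param's Formatter floors to 512), so validate() must fail with ErrGeneralCapacityExceeded.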
t.Run("normal case", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("ListAllAvailCollections",
@@ -190,12 +232,16 @@ func Test_createCollectionTask_validate(t *testing.T) {
).Return(map[int64][]int64{
1: {1, 2},
}, nil)
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
core := newTestCore(withMeta(meta))
task := createCollectionTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
NumPartitions: 2,
ShardsNum: 2,
},
dbID: 1,
}
@@ -526,6 +572,8 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
})
t.Run("invalid schema", func(t *testing.T) {
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
core := newTestCore(withMeta(meta))
collectionName := funcutil.GenRandomStr()
task := &createCollectionTask{
@@ -554,7 +602,8 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
}
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
core := newTestCore(withInvalidIDAllocator(), withMeta(meta))
task := createCollectionTask{
@@ -577,6 +626,8 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
field1 := funcutil.GenRandomStr()
ticker := newRocksMqTtSynchronizer()
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
core := newTestCore(withValidIDAllocator(), withTtSynchronizer(ticker), withMeta(meta))
@@ -912,6 +963,8 @@ func Test_createCollectionTask_PartitionKey(t *testing.T) {
).Return(map[int64][]int64{
util.DefaultDBID: {1, 2},
}, nil)
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
paramtable.Get().Save(Params.QuotaConfig.MaxCollectionNum.Key, strconv.Itoa(math.MaxInt64))
defer paramtable.Get().Reset(Params.QuotaConfig.MaxCollectionNum.Key)
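The GetDatabaseByID stubs returning errors.New("mock") are added to these pre-existing tests because validate() now always runs checkGeneralCapacity: the mocked error makes the capacity scan skip each database, so the running total stays at the new collection's own contribution and the default limit is never hit.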


@@ -44,7 +44,7 @@ func (t *createPartitionTask) Prepare(ctx context.Context) error {
return err
}
t.collMeta = collMeta
return checkGeneralCapacity(ctx, 0, 1, 0, t.core, t.ts)
}
func (t *createPartitionTask) Execute(ctx context.Context) error {

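Prepare() now ends by requesting a single unit of general capacity (newColNum=0, newParNum=1, newShardNum=0), which the partition branch of checkGeneralCapacity adds directly to the cluster-wide total.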

@@ -20,6 +20,7 @@ import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -61,6 +62,14 @@ func Test_createPartitionTask_Prepare(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(coll.Clone(), nil)
meta.On("ListAllAvailCollections",
mock.Anything,
).Return(map[int64][]int64{
1: {1, 2},
}, nil)
meta.On("GetDatabaseByID",
mock.Anything, mock.Anything, mock.Anything,
).Return(nil, errors.New("mock"))
core := newTestCore(withMeta(meta))
task := &createPartitionTask{

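As in the collection tests, ListAllAvailCollections and GetDatabaseByID must now be stubbed because Prepare() calls checkGeneralCapacity; with GetDatabaseByID failing, the scanned total stays at zero and the single added partition passes the default capacity of 65536.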

@@ -57,6 +57,9 @@ var (
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false)
ErrPartitionNotFullyLoaded = newMilvusError("partition not fully loaded", 202, true)
// General capacity related
ErrGeneralCapacityExceeded = newMilvusError("general capacity exceeded", 250, false)
// ResourceGroup related
ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false)


@@ -540,6 +540,15 @@ func WrapErrPartitionNotFullyLoaded(partition any, msg ...string) error {
return err
}
func WrapGeneralCapacityExceed(newGeneralSize any, generalCapacity any, msg ...string) error {
err := wrapFields(ErrGeneralCapacityExceeded, value("newGeneralSize", newGeneralSize),
value("generalCapacity", generalCapacity))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
// ResourceGroup related
func WrapErrResourceGroupNotFound(rg any, msg ...string) error {
err := wrapFields(ErrResourceGroupNotFound, value("rg", rg))

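A minimal usage sketch, assuming the import paths used elsewhere in this commit (github.com/milvus-io/milvus/pkg/util/merr and github.com/cockroachdb/errors):

// Wrap the sentinel with the observed size and the configured capacity.
err := merr.WrapGeneralCapacityExceed(70000, 65536, "create collection rejected")
// Callers and tests match it with errors.Is, as the new test case above does:
if errors.Is(err, merr.ErrGeneralCapacityExceeded) {
	// reject the DDL and surface the quota failure to the client
}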

@@ -869,6 +869,7 @@ type rootCoordConfig struct {
ImportTaskSubPath ParamItem `refreshable:"true"`
EnableActiveStandby ParamItem `refreshable:"false"`
MaxDatabaseNum ParamItem `refreshable:"false"`
MaxGeneralCapacity ParamItem `refreshable:"true"`
}
func (p *rootCoordConfig) init(base *BaseTable) {
@@ -948,6 +949,21 @@ func (p *rootCoordConfig) init(base *BaseTable) {
Export: true,
}
p.MaxDatabaseNum.Init(base.mgr)
p.MaxGeneralCapacity = ParamItem{
Key: "rootCoord.maxGeneralCapacity",
Version: "2.3.5",
DefaultValue: "65536",
Doc: "upper limit for the sum of of product of partitionNumber and shardNumber",
Export: true,
Formatter: func(v string) string {
if getAsInt(v) < 512 {
return "512"
}
return v
},
}
p.MaxGeneralCapacity.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
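Because MaxGeneralCapacity is declared refreshable:"true", the limit can be retuned at runtime. The Formatter clamps any configured value below 512 back up to 512, so, for example, setting rootCoord.maxGeneralCapacity to 100 takes effect as 512.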