mirror of https://github.com/milvus-io/milvus.git
fix: [2.4] Fix inaccurate general count (#38525)
Checking general count should only count healthy collections and partitions. issue: https://github.com/milvus-io/milvus/issues/37630 pr: https://github.com/milvus-io/milvus/pull/38524 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/38548/head
parent
c24f666f1b
commit
70d2b58533
|
@ -0,0 +1,92 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rootcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metastore/mocks"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
func TestCheckGeneralCapacity(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
catalog := mocks.NewRootCoordCatalog(t)
|
||||
catalog.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
catalog.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
|
||||
catalog.EXPECT().ListAliases(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
|
||||
catalog.EXPECT().CreateDatabase(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
allocator := mocktso.NewAllocator(t)
|
||||
allocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil)
|
||||
|
||||
meta, err := NewMetaTable(ctx, catalog, allocator)
|
||||
assert.NoError(t, err)
|
||||
core := newTestCore(withMeta(meta))
|
||||
assert.Equal(t, 0, meta.GetGeneralCount(ctx))
|
||||
|
||||
Params.Save(Params.RootCoordCfg.MaxGeneralCapacity.Key, "512")
|
||||
defer Params.Reset(Params.RootCoordCfg.MaxGeneralCapacity.Key)
|
||||
|
||||
assert.Equal(t, 0, meta.GetGeneralCount(ctx))
|
||||
err = checkGeneralCapacity(ctx, 1, 2, 256, core)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, meta.GetGeneralCount(ctx))
|
||||
err = checkGeneralCapacity(ctx, 2, 4, 256, core)
|
||||
assert.Error(t, err)
|
||||
|
||||
catalog.EXPECT().CreateCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
err = meta.CreateDatabase(ctx, &model.Database{}, typeutil.MaxTimestamp)
|
||||
assert.NoError(t, err)
|
||||
err = meta.AddCollection(ctx, &model.Collection{
|
||||
CollectionID: 1,
|
||||
State: pb.CollectionState_CollectionCreating,
|
||||
ShardsNum: 256,
|
||||
Partitions: []*model.Partition{
|
||||
{PartitionID: 100, State: pb.PartitionState_PartitionCreated},
|
||||
{PartitionID: 200, State: pb.PartitionState_PartitionCreated},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, meta.GetGeneralCount(ctx))
|
||||
err = checkGeneralCapacity(ctx, 1, 2, 256, core)
|
||||
assert.NoError(t, err)
|
||||
|
||||
catalog.EXPECT().AlterCollection(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionCreated, typeutil.MaxTimestamp)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 512, meta.GetGeneralCount(ctx))
|
||||
err = checkGeneralCapacity(ctx, 1, 1, 1, core)
|
||||
assert.Error(t, err)
|
||||
|
||||
err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionDropping, typeutil.MaxTimestamp)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, meta.GetGeneralCount(ctx))
|
||||
err = checkGeneralCapacity(ctx, 1, 2, 256, core)
|
||||
assert.NoError(t, err)
|
||||
}
|
|
@ -190,11 +190,12 @@ func (mt *MetaTable) reload() error {
|
|||
}
|
||||
for _, collection := range collections {
|
||||
mt.collID2Meta[collection.CollectionID] = collection
|
||||
mt.generalCnt += len(collection.Partitions) * int(collection.ShardsNum)
|
||||
if collection.Available() {
|
||||
mt.names.insert(dbName, collection.Name, collection.CollectionID)
|
||||
pn := collection.GetPartitionNum(true)
|
||||
mt.generalCnt += pn * int(collection.ShardsNum)
|
||||
collectionNum++
|
||||
partitionNum += int64(collection.GetPartitionNum(true))
|
||||
partitionNum += int64(pn)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,8 +235,10 @@ func (mt *MetaTable) reloadWithNonDatabase() error {
|
|||
mt.collID2Meta[collection.CollectionID] = collection
|
||||
if collection.Available() {
|
||||
mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID)
|
||||
pn := collection.GetPartitionNum(true)
|
||||
mt.generalCnt += pn * int(collection.ShardsNum)
|
||||
collectionNum++
|
||||
partitionNum += int64(collection.GetPartitionNum(true))
|
||||
partitionNum += int64(pn)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -419,8 +422,6 @@ func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection)
|
|||
mt.collID2Meta[coll.CollectionID] = coll.Clone()
|
||||
mt.names.insert(db.Name, coll.Name, coll.CollectionID)
|
||||
|
||||
mt.generalCnt += len(coll.Partitions) * int(coll.ShardsNum)
|
||||
|
||||
log.Ctx(ctx).Info("add collection to meta table",
|
||||
zap.Int64("dbID", coll.DBID),
|
||||
zap.String("collection", coll.Name),
|
||||
|
@ -451,13 +452,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni
|
|||
return fmt.Errorf("dbID not found for collection:%d", collectionID)
|
||||
}
|
||||
|
||||
pn := coll.GetPartitionNum(true)
|
||||
|
||||
switch state {
|
||||
case pb.CollectionState_CollectionCreated:
|
||||
mt.generalCnt += pn * int(coll.ShardsNum)
|
||||
metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc()
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true)))
|
||||
default:
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(pn))
|
||||
case pb.CollectionState_CollectionDropping:
|
||||
mt.generalCnt -= pn * int(coll.ShardsNum)
|
||||
metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec()
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true)))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(pn))
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Info("change collection state", zap.Int64("collection", collectionID),
|
||||
|
@ -525,8 +530,6 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID
|
|||
mt.removeAllNamesIfMatchedInternal(collectionID, allNames)
|
||||
mt.removeCollectionByIDInternal(collectionID)
|
||||
|
||||
mt.generalCnt -= len(coll.Partitions) * int(coll.ShardsNum)
|
||||
|
||||
log.Ctx(ctx).Info("remove collection",
|
||||
zap.Int64("dbID", coll.DBID),
|
||||
zap.String("name", coll.Name),
|
||||
|
@ -875,8 +878,6 @@ func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partitio
|
|||
}
|
||||
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())
|
||||
|
||||
mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum
|
||||
|
||||
log.Ctx(ctx).Info("add partition to meta table",
|
||||
zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName),
|
||||
zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp))
|
||||
|
@ -904,9 +905,11 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq
|
|||
|
||||
switch state {
|
||||
case pb.PartitionState_PartitionCreated:
|
||||
mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum
|
||||
// support Dynamic load/release partitions
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
|
||||
default:
|
||||
case pb.PartitionState_PartitionDropping:
|
||||
mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Dec()
|
||||
}
|
||||
|
||||
|
@ -941,7 +944,6 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection
|
|||
}
|
||||
if loc != -1 {
|
||||
coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...)
|
||||
mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum
|
||||
}
|
||||
log.Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts))
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue