From e34acf5f4eb1b27ed3f6d7cafe0c782f555ee9da Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Wed, 9 Nov 2022 22:11:04 +0800 Subject: [PATCH] Fix rootcoord metrics (#20427) (#20447) Signed-off-by: longjiquan Signed-off-by: longjiquan --- internal/metastore/model/collection.go | 8 + internal/metastore/model/collection_test.go | 15 ++ internal/rootcoord/meta_table.go | 43 +++- internal/rootcoord/meta_table_test.go | 230 ++++++++++++++++++++ 4 files changed, 292 insertions(+), 4 deletions(-) diff --git a/internal/metastore/model/collection.go b/internal/metastore/model/collection.go index ef61041af9..2bd39ae7e0 100644 --- a/internal/metastore/model/collection.go +++ b/internal/metastore/model/collection.go @@ -5,6 +5,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/common" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/samber/lo" ) type Collection struct { @@ -51,6 +52,13 @@ func (c Collection) Clone() *Collection { } } +func (c Collection) GetPartitionNum(filterUnavailable bool) int { + if !filterUnavailable { + return len(c.Partitions) + } + return lo.CountBy(c.Partitions, func(p *Partition) bool { return p.Available() }) +} + func (c Collection) Equal(other Collection) bool { return c.TenantID == other.TenantID && CheckPartitionsEqual(c.Partitions, other.Partitions) && diff --git a/internal/metastore/model/collection_test.go b/internal/metastore/model/collection_test.go index d82baaf01c..d96446d22d 100644 --- a/internal/metastore/model/collection_test.go +++ b/internal/metastore/model/collection_test.go @@ -102,3 +102,18 @@ func TestUnmarshalCollectionModel(t *testing.T) { func TestMarshalCollectionModel(t *testing.T) { assert.Nil(t, MarshalCollectionModel(nil)) } + +func TestCollection_GetPartitionNum(t *testing.T) { + coll := &Collection{ + Partitions: []*Partition{ + {State: pb.PartitionState_PartitionCreated}, + {State: pb.PartitionState_PartitionCreating}, + {State: pb.PartitionState_PartitionCreated}, + {State: pb.PartitionState_PartitionDropping}, + {State: pb.PartitionState_PartitionCreated}, + {State: pb.PartitionState_PartitionDropped}, + }, + } + assert.Equal(t, 3, coll.GetPartitionNum(true)) + assert.Equal(t, 6, coll.GetPartitionNum(false)) +} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 6b6c155f26..3641332c55 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -22,21 +22,20 @@ import ( "fmt" "sync" - "github.com/milvus-io/milvus/internal/common" - pb "github.com/milvus-io/milvus/internal/proto/etcdpb" - "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/contextutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/zap" ) const ( @@ -140,6 +139,9 @@ func (mt *MetaTable) reload() error { mt.collName2ID = make(map[string]UniqueID) mt.collAlias2ID = make(map[string]UniqueID) + collectionNum := int64(0) + partitionNum := int64(0) + // max ts means listing latest resources, meta table should always cache the latest version of catalog. collections, err := mt.catalog.ListCollections(mt.ctx, typeutil.MaxTimestamp) if err != nil { @@ -148,6 +150,11 @@ func (mt *MetaTable) reload() error { for name, collection := range collections { mt.collID2Meta[collection.CollectionID] = collection mt.collName2ID[name] = collection.CollectionID + + if collection.Available() { + collectionNum++ + partitionNum += int64(collection.GetPartitionNum(true)) + } } // max ts means listing latest resources, meta table should always cache the latest version of catalog. @@ -159,6 +166,9 @@ func (mt *MetaTable) reload() error { mt.collAlias2ID[alias.Name] = alias.CollectionID } + metrics.RootCoordNumOfCollections.Set(float64(collectionNum)) + metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(partitionNum)) + return nil } @@ -199,6 +209,16 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni return err } mt.collID2Meta[collectionID] = clone + + switch state { + case pb.CollectionState_CollectionCreated: + metrics.RootCoordNumOfCollections.Inc() + metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true))) + default: + metrics.RootCoordNumOfCollections.Dec() + metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true))) + } + log.Info("change collection state", zap.Int64("collection", collectionID), zap.String("state", state.String()), zap.Uint64("ts", ts)) @@ -457,9 +477,13 @@ func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partitio return err } mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone()) + + metrics.RootCoordNumOfPartitions.WithLabelValues().Inc() + log.Info("add partition to meta table", zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName), zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp)) + return nil } @@ -480,9 +504,20 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq return err } mt.collID2Meta[collectionID].Partitions[idx] = clone + + switch state { + case pb.PartitionState_PartitionCreated: + log.Warn("[should not happen] change partition to created", + zap.String("collection", coll.Name), zap.Int64("collection id", coll.CollectionID), + zap.String("partition", clone.PartitionName), zap.Int64("partition id", clone.PartitionID)) + default: + metrics.RootCoordNumOfPartitions.WithLabelValues().Dec() + } + log.Info("change partition state", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.String("state", state.String()), zap.Uint64("ts", ts)) + return nil } } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 483e14595d..4001a3936e 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -1020,3 +1020,233 @@ func TestMetaTable_RemoveCollection(t *testing.T) { assert.NoError(t, err) }) } + +func TestMetaTable_reload(t *testing.T) { + t.Run("failed to list collections", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("ListCollections", + mock.Anything, // context.Context + mock.AnythingOfType("uint64"), + ).Return(nil, errors.New("error mock ListCollections")) + meta := &MetaTable{catalog: catalog} + err := meta.reload() + assert.Error(t, err) + assert.Empty(t, meta.collID2Meta) + assert.Empty(t, meta.collName2ID) + assert.Empty(t, meta.collAlias2ID) + }) + + t.Run("failed to list aliases", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("ListCollections", + mock.Anything, // context.Context + mock.AnythingOfType("uint64"), + ).Return( + map[string]*model.Collection{"test": {CollectionID: 100, Name: "test"}}, + nil) + catalog.On("ListAliases", + mock.Anything, // context.Context + mock.AnythingOfType("uint64"), + ).Return(nil, errors.New("error mock ListAliases")) + meta := &MetaTable{catalog: catalog} + err := meta.reload() + assert.Error(t, err) + assert.Empty(t, meta.collAlias2ID) + }) + + t.Run("normal case", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + + catalog.On("ListCollections", + mock.Anything, // context.Context + mock.AnythingOfType("uint64"), + ).Return( + map[string]*model.Collection{"test": {CollectionID: 100, Name: "test"}}, + nil) + + catalog.On("ListAliases", + mock.Anything, // context.Context + mock.AnythingOfType("uint64"), + ).Return( + []*model.Alias{{Name: "alias", CollectionID: 100}}, + nil) + + meta := &MetaTable{catalog: catalog} + err := meta.reload() + assert.NoError(t, err) + assert.Equal(t, 1, len(meta.collID2Meta)) + assert.Equal(t, 1, len(meta.collName2ID)) + assert.Equal(t, 1, len(meta.collAlias2ID)) + }) +} + +func TestMetaTable_ChangeCollectionState(t *testing.T) { + t.Run("not exist", func(t *testing.T) { + meta := &MetaTable{} + err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 100) + assert.NoError(t, err) + }) + + t.Run("failed to alter collection", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("AlterCollection", + mock.Anything, // context.Context + mock.Anything, // *model.Collection + mock.Anything, // *model.Collection + mock.Anything, // metastore.AlterType + mock.AnythingOfType("uint64"), + ).Return(errors.New("error mock AlterCollection")) + meta := &MetaTable{ + catalog: catalog, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100}, + }, + } + err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("AlterCollection", + mock.Anything, // context.Context + mock.Anything, // *model.Collection + mock.Anything, // *model.Collection + mock.Anything, // metastore.AlterType + mock.AnythingOfType("uint64"), + ).Return(nil) + meta := &MetaTable{ + catalog: catalog, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100}, + }, + } + err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000) + assert.NoError(t, err) + err = meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionDropping, 1000) + assert.NoError(t, err) + }) +} + +func TestMetaTable_AddPartition(t *testing.T) { + t.Run("collection not available", func(t *testing.T) { + meta := &MetaTable{} + err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100}) + assert.Error(t, err) + }) + + t.Run("add not-created partition", func(t *testing.T) { + meta := &MetaTable{ + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: { + Name: "test", + CollectionID: 100, + }, + }, + } + err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100, State: pb.PartitionState_PartitionDropping}) + assert.Error(t, err) + }) + + t.Run("failed to create partition", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("CreatePartition", + mock.Anything, // context.Context + mock.Anything, // *model.Partition + mock.AnythingOfType("uint64"), + ).Return(errors.New("error mock CreatePartition")) + meta := &MetaTable{ + catalog: catalog, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100}, + }, + } + err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100, State: pb.PartitionState_PartitionCreated}) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("CreatePartition", + mock.Anything, // context.Context + mock.Anything, // *model.Partition + mock.AnythingOfType("uint64"), + ).Return(nil) + meta := &MetaTable{ + catalog: catalog, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100}, + }, + } + err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100, State: pb.PartitionState_PartitionCreated}) + assert.NoError(t, err) + }) +} + +func TestMetaTable_ChangePartitionState(t *testing.T) { + t.Run("collection not exist", func(t *testing.T) { + meta := &MetaTable{} + err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000) + assert.NoError(t, err) + }) + + t.Run("partition not exist", func(t *testing.T) { + meta := &MetaTable{ + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100}, + }, + } + err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000) + assert.Error(t, err) + }) + + t.Run("failed to alter partition", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("AlterPartition", + mock.Anything, // context.Context + mock.Anything, // *model.Partition + mock.Anything, // *model.Partition + mock.Anything, // metastore.AlterType + mock.AnythingOfType("uint64"), + ).Return(errors.New("error mock AlterPartition")) + meta := &MetaTable{ + catalog: catalog, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: { + Name: "test", CollectionID: 100, + Partitions: []*model.Partition{ + {CollectionID: 100, PartitionID: 500}, + }, + }, + }, + } + err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("AlterPartition", + mock.Anything, // context.Context + mock.Anything, // *model.Partition + mock.Anything, // *model.Partition + mock.Anything, // metastore.AlterType + mock.AnythingOfType("uint64"), + ).Return(nil) + meta := &MetaTable{ + catalog: catalog, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: { + Name: "test", CollectionID: 100, + Partitions: []*model.Partition{ + {CollectionID: 100, PartitionID: 500}, + }, + }, + }, + } + err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionCreated, 1000) + assert.NoError(t, err) + err = meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000) + assert.NoError(t, err) + }) +}