mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/20469/head
parent
6faa579928
commit
e34acf5f4e
|
@ -5,6 +5,7 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/common"
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||||
|
"github.com/samber/lo"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Collection struct {
|
type Collection struct {
|
||||||
|
@ -51,6 +52,13 @@ func (c Collection) Clone() *Collection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c Collection) GetPartitionNum(filterUnavailable bool) int {
|
||||||
|
if !filterUnavailable {
|
||||||
|
return len(c.Partitions)
|
||||||
|
}
|
||||||
|
return lo.CountBy(c.Partitions, func(p *Partition) bool { return p.Available() })
|
||||||
|
}
|
||||||
|
|
||||||
func (c Collection) Equal(other Collection) bool {
|
func (c Collection) Equal(other Collection) bool {
|
||||||
return c.TenantID == other.TenantID &&
|
return c.TenantID == other.TenantID &&
|
||||||
CheckPartitionsEqual(c.Partitions, other.Partitions) &&
|
CheckPartitionsEqual(c.Partitions, other.Partitions) &&
|
||||||
|
|
|
@ -102,3 +102,18 @@ func TestUnmarshalCollectionModel(t *testing.T) {
|
||||||
func TestMarshalCollectionModel(t *testing.T) {
|
func TestMarshalCollectionModel(t *testing.T) {
|
||||||
assert.Nil(t, MarshalCollectionModel(nil))
|
assert.Nil(t, MarshalCollectionModel(nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCollection_GetPartitionNum(t *testing.T) {
|
||||||
|
coll := &Collection{
|
||||||
|
Partitions: []*Partition{
|
||||||
|
{State: pb.PartitionState_PartitionCreated},
|
||||||
|
{State: pb.PartitionState_PartitionCreating},
|
||||||
|
{State: pb.PartitionState_PartitionCreated},
|
||||||
|
{State: pb.PartitionState_PartitionDropping},
|
||||||
|
{State: pb.PartitionState_PartitionCreated},
|
||||||
|
{State: pb.PartitionState_PartitionDropped},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Equal(t, 3, coll.GetPartitionNum(true))
|
||||||
|
assert.Equal(t, 6, coll.GetPartitionNum(false))
|
||||||
|
}
|
||||||
|
|
|
@ -22,21 +22,20 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/common"
|
|
||||||
|
|
||||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||||
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/metastore"
|
"github.com/milvus-io/milvus/internal/metastore"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
|
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
|
"github.com/milvus-io/milvus/internal/metrics"
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/util/contextutil"
|
"github.com/milvus-io/milvus/internal/util/contextutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -140,6 +139,9 @@ func (mt *MetaTable) reload() error {
|
||||||
mt.collName2ID = make(map[string]UniqueID)
|
mt.collName2ID = make(map[string]UniqueID)
|
||||||
mt.collAlias2ID = make(map[string]UniqueID)
|
mt.collAlias2ID = make(map[string]UniqueID)
|
||||||
|
|
||||||
|
collectionNum := int64(0)
|
||||||
|
partitionNum := int64(0)
|
||||||
|
|
||||||
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
|
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
|
||||||
collections, err := mt.catalog.ListCollections(mt.ctx, typeutil.MaxTimestamp)
|
collections, err := mt.catalog.ListCollections(mt.ctx, typeutil.MaxTimestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -148,6 +150,11 @@ func (mt *MetaTable) reload() error {
|
||||||
for name, collection := range collections {
|
for name, collection := range collections {
|
||||||
mt.collID2Meta[collection.CollectionID] = collection
|
mt.collID2Meta[collection.CollectionID] = collection
|
||||||
mt.collName2ID[name] = collection.CollectionID
|
mt.collName2ID[name] = collection.CollectionID
|
||||||
|
|
||||||
|
if collection.Available() {
|
||||||
|
collectionNum++
|
||||||
|
partitionNum += int64(collection.GetPartitionNum(true))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
|
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
|
||||||
|
@ -159,6 +166,9 @@ func (mt *MetaTable) reload() error {
|
||||||
mt.collAlias2ID[alias.Name] = alias.CollectionID
|
mt.collAlias2ID[alias.Name] = alias.CollectionID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics.RootCoordNumOfCollections.Set(float64(collectionNum))
|
||||||
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(partitionNum))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,6 +209,16 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mt.collID2Meta[collectionID] = clone
|
mt.collID2Meta[collectionID] = clone
|
||||||
|
|
||||||
|
switch state {
|
||||||
|
case pb.CollectionState_CollectionCreated:
|
||||||
|
metrics.RootCoordNumOfCollections.Inc()
|
||||||
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true)))
|
||||||
|
default:
|
||||||
|
metrics.RootCoordNumOfCollections.Dec()
|
||||||
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true)))
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("change collection state", zap.Int64("collection", collectionID),
|
log.Info("change collection state", zap.Int64("collection", collectionID),
|
||||||
zap.String("state", state.String()), zap.Uint64("ts", ts))
|
zap.String("state", state.String()), zap.Uint64("ts", ts))
|
||||||
|
|
||||||
|
@ -457,9 +477,13 @@ func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partitio
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())
|
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())
|
||||||
|
|
||||||
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
|
||||||
|
|
||||||
log.Info("add partition to meta table",
|
log.Info("add partition to meta table",
|
||||||
zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName),
|
zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName),
|
||||||
zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp))
|
zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,9 +504,20 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mt.collID2Meta[collectionID].Partitions[idx] = clone
|
mt.collID2Meta[collectionID].Partitions[idx] = clone
|
||||||
|
|
||||||
|
switch state {
|
||||||
|
case pb.PartitionState_PartitionCreated:
|
||||||
|
log.Warn("[should not happen] change partition to created",
|
||||||
|
zap.String("collection", coll.Name), zap.Int64("collection id", coll.CollectionID),
|
||||||
|
zap.String("partition", clone.PartitionName), zap.Int64("partition id", clone.PartitionID))
|
||||||
|
default:
|
||||||
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Dec()
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("change partition state", zap.Int64("collection", collectionID),
|
log.Info("change partition state", zap.Int64("collection", collectionID),
|
||||||
zap.Int64("partition", partitionID), zap.String("state", state.String()),
|
zap.Int64("partition", partitionID), zap.String("state", state.String()),
|
||||||
zap.Uint64("ts", ts))
|
zap.Uint64("ts", ts))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1020,3 +1020,233 @@ func TestMetaTable_RemoveCollection(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetaTable_reload(t *testing.T) {
|
||||||
|
t.Run("failed to list collections", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("ListCollections",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(nil, errors.New("error mock ListCollections"))
|
||||||
|
meta := &MetaTable{catalog: catalog}
|
||||||
|
err := meta.reload()
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Empty(t, meta.collID2Meta)
|
||||||
|
assert.Empty(t, meta.collName2ID)
|
||||||
|
assert.Empty(t, meta.collAlias2ID)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("failed to list aliases", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("ListCollections",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(
|
||||||
|
map[string]*model.Collection{"test": {CollectionID: 100, Name: "test"}},
|
||||||
|
nil)
|
||||||
|
catalog.On("ListAliases",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(nil, errors.New("error mock ListAliases"))
|
||||||
|
meta := &MetaTable{catalog: catalog}
|
||||||
|
err := meta.reload()
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Empty(t, meta.collAlias2ID)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("normal case", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
|
||||||
|
catalog.On("ListCollections",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(
|
||||||
|
map[string]*model.Collection{"test": {CollectionID: 100, Name: "test"}},
|
||||||
|
nil)
|
||||||
|
|
||||||
|
catalog.On("ListAliases",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(
|
||||||
|
[]*model.Alias{{Name: "alias", CollectionID: 100}},
|
||||||
|
nil)
|
||||||
|
|
||||||
|
meta := &MetaTable{catalog: catalog}
|
||||||
|
err := meta.reload()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, len(meta.collID2Meta))
|
||||||
|
assert.Equal(t, 1, len(meta.collName2ID))
|
||||||
|
assert.Equal(t, 1, len(meta.collAlias2ID))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetaTable_ChangeCollectionState(t *testing.T) {
|
||||||
|
t.Run("not exist", func(t *testing.T) {
|
||||||
|
meta := &MetaTable{}
|
||||||
|
err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 100)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("failed to alter collection", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("AlterCollection",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.Anything, // *model.Collection
|
||||||
|
mock.Anything, // *model.Collection
|
||||||
|
mock.Anything, // metastore.AlterType
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(errors.New("error mock AlterCollection"))
|
||||||
|
meta := &MetaTable{
|
||||||
|
catalog: catalog,
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {Name: "test", CollectionID: 100},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("normal case", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("AlterCollection",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.Anything, // *model.Collection
|
||||||
|
mock.Anything, // *model.Collection
|
||||||
|
mock.Anything, // metastore.AlterType
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(nil)
|
||||||
|
meta := &MetaTable{
|
||||||
|
catalog: catalog,
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {Name: "test", CollectionID: 100},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionDropping, 1000)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetaTable_AddPartition(t *testing.T) {
|
||||||
|
t.Run("collection not available", func(t *testing.T) {
|
||||||
|
meta := &MetaTable{}
|
||||||
|
err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100})
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("add not-created partition", func(t *testing.T) {
|
||||||
|
meta := &MetaTable{
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {
|
||||||
|
Name: "test",
|
||||||
|
CollectionID: 100,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100, State: pb.PartitionState_PartitionDropping})
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("failed to create partition", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("CreatePartition",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.Anything, // *model.Partition
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(errors.New("error mock CreatePartition"))
|
||||||
|
meta := &MetaTable{
|
||||||
|
catalog: catalog,
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {Name: "test", CollectionID: 100},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100, State: pb.PartitionState_PartitionCreated})
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("normal case", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("CreatePartition",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.Anything, // *model.Partition
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(nil)
|
||||||
|
meta := &MetaTable{
|
||||||
|
catalog: catalog,
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {Name: "test", CollectionID: 100},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.AddPartition(context.TODO(), &model.Partition{CollectionID: 100, State: pb.PartitionState_PartitionCreated})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetaTable_ChangePartitionState(t *testing.T) {
|
||||||
|
t.Run("collection not exist", func(t *testing.T) {
|
||||||
|
meta := &MetaTable{}
|
||||||
|
err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("partition not exist", func(t *testing.T) {
|
||||||
|
meta := &MetaTable{
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {Name: "test", CollectionID: 100},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("failed to alter partition", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("AlterPartition",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.Anything, // *model.Partition
|
||||||
|
mock.Anything, // *model.Partition
|
||||||
|
mock.Anything, // metastore.AlterType
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(errors.New("error mock AlterPartition"))
|
||||||
|
meta := &MetaTable{
|
||||||
|
catalog: catalog,
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {
|
||||||
|
Name: "test", CollectionID: 100,
|
||||||
|
Partitions: []*model.Partition{
|
||||||
|
{CollectionID: 100, PartitionID: 500},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("normal case", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("AlterPartition",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.Anything, // *model.Partition
|
||||||
|
mock.Anything, // *model.Partition
|
||||||
|
mock.Anything, // metastore.AlterType
|
||||||
|
mock.AnythingOfType("uint64"),
|
||||||
|
).Return(nil)
|
||||||
|
meta := &MetaTable{
|
||||||
|
catalog: catalog,
|
||||||
|
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||||
|
100: {
|
||||||
|
Name: "test", CollectionID: 100,
|
||||||
|
Partitions: []*model.Partition{
|
||||||
|
{CollectionID: 100, PartitionID: 500},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionCreated, 1000)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = meta.ChangePartitionState(context.TODO(), 100, 500, pb.PartitionState_PartitionDropping, 1000)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue