mirror of https://github.com/milvus-io/milvus.git
1367 lines
46 KiB
Go
1367 lines
46 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rootcoord
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/exp/maps"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
|
"github.com/milvus-io/milvus/internal/metastore"
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
"github.com/milvus-io/milvus/internal/tso"
|
|
"github.com/milvus-io/milvus/pkg/common"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/metrics"
|
|
"github.com/milvus-io/milvus/pkg/util"
|
|
"github.com/milvus-io/milvus/pkg/util/contextutil"
|
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
)
|
|
|
|
//go:generate mockery --name=IMetaTable --outpkg=mockrootcoord --filename=meta_table.go --with-expecter
|
|
type IMetaTable interface {
|
|
GetDatabaseByID(ctx context.Context, dbID int64, ts Timestamp) (*model.Database, error)
|
|
GetDatabaseByName(ctx context.Context, dbName string, ts Timestamp) (*model.Database, error)
|
|
CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error
|
|
DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error
|
|
ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error)
|
|
|
|
AddCollection(ctx context.Context, coll *model.Collection) error
|
|
ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error
|
|
RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error
|
|
GetCollectionByName(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error)
|
|
GetCollectionByID(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error)
|
|
ListCollections(ctx context.Context, dbName string, ts Timestamp, onlyAvail bool) ([]*model.Collection, error)
|
|
ListAllAvailCollections(ctx context.Context) map[int64][]int64
|
|
ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string
|
|
GetCollectionVirtualChannels(colID int64) []string
|
|
AddPartition(ctx context.Context, partition *model.Partition) error
|
|
ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error
|
|
RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error
|
|
CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error
|
|
DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error
|
|
AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error
|
|
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp) error
|
|
RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error
|
|
|
|
// TODO: it'll be a big cost if we handle the time travel logic, since we should always list all aliases in catalog.
|
|
IsAlias(db, name string) bool
|
|
ListAliasesByID(collID UniqueID) []string
|
|
|
|
// TODO: better to accept ctx.
|
|
GetPartitionNameByID(collID UniqueID, partitionID UniqueID, ts Timestamp) (string, error) // serve for bulk insert.
|
|
GetPartitionByName(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) // serve for bulk insert.
|
|
|
|
// TODO: better to accept ctx.
|
|
AddCredential(credInfo *internalpb.CredentialInfo) error
|
|
GetCredential(username string) (*internalpb.CredentialInfo, error)
|
|
DeleteCredential(username string) error
|
|
AlterCredential(credInfo *internalpb.CredentialInfo) error
|
|
ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error)
|
|
|
|
// TODO: better to accept ctx.
|
|
CreateRole(tenant string, entity *milvuspb.RoleEntity) error
|
|
DropRole(tenant string, roleName string) error
|
|
OperateUserRole(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error
|
|
SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error)
|
|
SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error)
|
|
OperatePrivilege(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error
|
|
SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error)
|
|
DropGrant(tenant string, role *milvuspb.RoleEntity) error
|
|
ListPolicy(tenant string) ([]string, error)
|
|
ListUserRole(tenant string) ([]string, error)
|
|
}
|
|
|
|
type MetaTable struct {
|
|
ctx context.Context
|
|
catalog metastore.RootCoordCatalog
|
|
|
|
tsoAllocator tso.Allocator
|
|
|
|
dbName2Meta map[string]*model.Database // database name -> db meta
|
|
collID2Meta map[typeutil.UniqueID]*model.Collection // collection id -> collection meta
|
|
|
|
// collections *collectionDb
|
|
names *nameDb
|
|
aliases *nameDb
|
|
|
|
ddLock sync.RWMutex
|
|
permissionLock sync.RWMutex
|
|
}
|
|
|
|
func NewMetaTable(ctx context.Context, catalog metastore.RootCoordCatalog, tsoAllocator tso.Allocator) (*MetaTable, error) {
|
|
mt := &MetaTable{
|
|
ctx: contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()),
|
|
catalog: catalog,
|
|
tsoAllocator: tsoAllocator,
|
|
}
|
|
if err := mt.reload(); err != nil {
|
|
return nil, err
|
|
}
|
|
return mt, nil
|
|
}
|
|
|
|
func (mt *MetaTable) reload() error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
record := timerecord.NewTimeRecorder("rootcoord")
|
|
mt.dbName2Meta = make(map[string]*model.Database)
|
|
mt.collID2Meta = make(map[UniqueID]*model.Collection)
|
|
mt.names = newNameDb()
|
|
mt.aliases = newNameDb()
|
|
|
|
collectionNum := int64(0)
|
|
partitionNum := int64(0)
|
|
|
|
metrics.RootCoordNumOfCollections.Set(float64(0))
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(0))
|
|
|
|
// recover databases.
|
|
dbs, err := mt.catalog.ListDatabases(mt.ctx, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("recover databases", zap.Int("num of dbs", len(dbs)))
|
|
for _, db := range dbs {
|
|
mt.dbName2Meta[db.Name] = db
|
|
}
|
|
dbNames := maps.Keys(mt.dbName2Meta)
|
|
// create default database.
|
|
if !funcutil.SliceContain(dbNames, util.DefaultDBName) {
|
|
if err := mt.createDefaultDb(); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
mt.names.createDbIfNotExist(util.DefaultDBName)
|
|
mt.aliases.createDbIfNotExist(util.DefaultDBName)
|
|
}
|
|
|
|
// in order to support backward compatibility with meta of the old version, it also
|
|
// needs to reload collections that have no database
|
|
if err := mt.reloadWithNonDatabase(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// recover collections from db namespace
|
|
for dbName, db := range mt.dbName2Meta {
|
|
mt.names.createDbIfNotExist(dbName)
|
|
collections, err := mt.catalog.ListCollections(mt.ctx, db.ID, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, collection := range collections {
|
|
mt.collID2Meta[collection.CollectionID] = collection
|
|
if collection.Available() {
|
|
mt.names.insert(dbName, collection.Name, collection.CollectionID)
|
|
collectionNum++
|
|
partitionNum += int64(collection.GetPartitionNum(true))
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Info("recover collections from db", zap.Int64("collection_num", collectionNum), zap.Int64("partition_num", partitionNum))
|
|
|
|
// recover aliases from db namespace
|
|
for dbName, db := range mt.dbName2Meta {
|
|
mt.aliases.createDbIfNotExist(dbName)
|
|
aliases, err := mt.catalog.ListAliases(mt.ctx, db.ID, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, alias := range aliases {
|
|
mt.aliases.insert(dbName, alias.Name, alias.CollectionID)
|
|
}
|
|
}
|
|
|
|
metrics.RootCoordNumOfCollections.Add(float64(collectionNum))
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum))
|
|
log.Info("RootCoord meta table reload done", zap.Duration("duration", record.ElapseSpan()))
|
|
return nil
|
|
}
|
|
|
|
// insert into default database if the collections doesn't inside some database
|
|
func (mt *MetaTable) reloadWithNonDatabase() error {
|
|
collectionNum := int64(0)
|
|
partitionNum := int64(0)
|
|
oldCollections, err := mt.catalog.ListCollections(mt.ctx, util.NonDBID, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, collection := range oldCollections {
|
|
mt.collID2Meta[collection.CollectionID] = collection
|
|
if collection.Available() {
|
|
mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID)
|
|
collectionNum++
|
|
partitionNum += int64(collection.GetPartitionNum(true))
|
|
}
|
|
}
|
|
|
|
if collectionNum > 0 {
|
|
log.Info("recover collections without db", zap.Int64("collection_num", collectionNum), zap.Int64("partition_num", partitionNum))
|
|
}
|
|
|
|
aliases, err := mt.catalog.ListAliases(mt.ctx, util.NonDBID, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, alias := range aliases {
|
|
mt.aliases.insert(util.DefaultDBName, alias.Name, alias.CollectionID)
|
|
}
|
|
|
|
metrics.RootCoordNumOfCollections.Add(float64(collectionNum))
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum))
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) createDefaultDb() error {
|
|
ts, err := mt.tsoAllocator.GenerateTSO(1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return mt.createDatabasePrivate(mt.ctx, model.NewDefaultDatabase(), ts)
|
|
}
|
|
|
|
func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
return mt.createDatabasePrivate(ctx, db, ts)
|
|
}
|
|
|
|
func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
|
|
dbName := db.Name
|
|
if mt.names.exist(dbName) || mt.aliases.exist(dbName) {
|
|
return fmt.Errorf("database already exist: %s", dbName)
|
|
}
|
|
|
|
if err := mt.catalog.CreateDatabase(ctx, db, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
mt.names.createDbIfNotExist(dbName)
|
|
mt.aliases.createDbIfNotExist(dbName)
|
|
mt.dbName2Meta[dbName] = db
|
|
log.Ctx(ctx).Info("create database", zap.String("db", dbName), zap.Uint64("ts", ts))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
if dbName == util.DefaultDBName {
|
|
return fmt.Errorf("can not drop default database")
|
|
}
|
|
|
|
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
log.Warn("not found database", zap.String("db", dbName))
|
|
return nil
|
|
}
|
|
|
|
colls, err := mt.listCollectionFromCache(dbName, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(colls) > 0 {
|
|
return fmt.Errorf("database:%s not empty, must drop all collections before drop database", dbName)
|
|
}
|
|
|
|
if err := mt.catalog.DropDatabase(ctx, db.ID, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
mt.names.dropDb(dbName)
|
|
mt.aliases.dropDb(dbName)
|
|
delete(mt.dbName2Meta, dbName)
|
|
log.Ctx(ctx).Info("drop database", zap.String("db", dbName), zap.Uint64("ts", ts))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
return maps.Values(mt.dbName2Meta), nil
|
|
}
|
|
|
|
func (mt *MetaTable) GetDatabaseByID(ctx context.Context, dbID int64, ts Timestamp) (*model.Database, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
return mt.getDatabaseByIDInternal(ctx, dbID, ts)
|
|
}
|
|
|
|
func (mt *MetaTable) getDatabaseByIDInternal(ctx context.Context, dbID int64, ts Timestamp) (*model.Database, error) {
|
|
for _, db := range maps.Values(mt.dbName2Meta) {
|
|
if db.ID == dbID {
|
|
return db, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("database dbID:%d not found", dbID)
|
|
}
|
|
|
|
func (mt *MetaTable) GetDatabaseByName(ctx context.Context, dbName string, ts Timestamp) (*model.Database, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
return mt.getDatabaseByNameInternal(ctx, dbName, ts)
|
|
}
|
|
|
|
func (mt *MetaTable) getDatabaseByNameInternal(ctx context.Context, dbName string, ts Timestamp) (*model.Database, error) {
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty")
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
db, ok := mt.dbName2Meta[dbName]
|
|
if !ok {
|
|
return nil, fmt.Errorf("database:%s not found", dbName)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
// Note:
|
|
// 1, idempotency check was already done outside;
|
|
// 2, no need to check time travel logic, since ts should always be the latest;
|
|
|
|
db, err := mt.getDatabaseByIDInternal(ctx, coll.DBID, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if coll.State != pb.CollectionState_CollectionCreating {
|
|
return fmt.Errorf("collection state should be creating, collection name: %s, collection id: %d, state: %s", coll.Name, coll.CollectionID, coll.State)
|
|
}
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.CreateCollection(ctx1, coll, coll.CreateTime); err != nil {
|
|
return err
|
|
}
|
|
|
|
mt.collID2Meta[coll.CollectionID] = coll.Clone()
|
|
mt.names.insert(db.Name, coll.Name, coll.CollectionID)
|
|
|
|
log.Ctx(ctx).Info("add collection to meta table",
|
|
zap.Int64("dbID", coll.DBID),
|
|
zap.String("collection", coll.Name),
|
|
zap.Int64("id", coll.CollectionID),
|
|
zap.Uint64("ts", coll.CreateTime),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
clone := coll.Clone()
|
|
clone.State = state
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.AlterCollection(ctx1, coll, clone, metastore.MODIFY, ts); err != nil {
|
|
return err
|
|
}
|
|
mt.collID2Meta[collectionID] = clone
|
|
|
|
switch state {
|
|
case pb.CollectionState_CollectionCreated:
|
|
metrics.RootCoordNumOfCollections.Inc()
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true)))
|
|
default:
|
|
metrics.RootCoordNumOfCollections.Dec()
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true)))
|
|
}
|
|
|
|
log.Ctx(ctx).Info("change collection state", zap.Int64("collection", collectionID),
|
|
zap.String("state", state.String()), zap.Uint64("ts", ts))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) removeIfNameMatchedInternal(collectionID UniqueID, name string) {
|
|
mt.names.removeIf(func(db string, collection string, id UniqueID) bool {
|
|
return collectionID == id
|
|
})
|
|
}
|
|
|
|
func (mt *MetaTable) removeIfAliasMatchedInternal(collectionID UniqueID, alias string) {
|
|
mt.aliases.removeIf(func(db string, collection string, id UniqueID) bool {
|
|
return collectionID == id
|
|
})
|
|
}
|
|
|
|
func (mt *MetaTable) removeIfMatchedInternal(collectionID UniqueID, name string) {
|
|
mt.removeIfNameMatchedInternal(collectionID, name)
|
|
mt.removeIfAliasMatchedInternal(collectionID, name)
|
|
}
|
|
|
|
func (mt *MetaTable) removeAllNamesIfMatchedInternal(collectionID UniqueID, names []string) {
|
|
for _, name := range names {
|
|
mt.removeIfMatchedInternal(collectionID, name)
|
|
}
|
|
}
|
|
|
|
func (mt *MetaTable) removeCollectionByIDInternal(collectionID UniqueID) {
|
|
delete(mt.collID2Meta, collectionID)
|
|
}
|
|
|
|
func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
// Note: we cannot handle case that dropping collection with `ts1` but a collection exists in catalog with newer ts
|
|
// which is bigger than `ts1`. So we assume that ts should always be the latest.
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok {
|
|
log.Warn("not found collection, skip remove", zap.Int64("collectionID", collectionID))
|
|
return nil
|
|
}
|
|
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
aliases := mt.listAliasesByID(collectionID)
|
|
newColl := &model.Collection{CollectionID: collectionID, Aliases: aliases, DBID: coll.DBID}
|
|
if err := mt.catalog.DropCollection(ctx1, newColl, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
allNames := common.CloneStringList(aliases)
|
|
allNames = append(allNames, coll.Name)
|
|
|
|
// We cannot delete the name directly, since newly collection with same name may be created.
|
|
mt.removeAllNamesIfMatchedInternal(collectionID, allNames)
|
|
mt.removeCollectionByIDInternal(collectionID)
|
|
|
|
log.Ctx(ctx).Info("remove collection",
|
|
zap.Int64("dbID", coll.DBID),
|
|
zap.String("name", coll.Name),
|
|
zap.Int64("id", collectionID),
|
|
zap.Strings("aliases", aliases),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func filterUnavailable(coll *model.Collection) *model.Collection {
|
|
clone := coll.Clone()
|
|
// pick available partitions.
|
|
clone.Partitions = nil
|
|
for _, partition := range coll.Partitions {
|
|
if partition.Available() {
|
|
clone.Partitions = append(clone.Partitions, partition.Clone())
|
|
}
|
|
}
|
|
return clone
|
|
}
|
|
|
|
// getLatestCollectionByIDInternal should be called with ts = typeutil.MaxTimestamp
|
|
func (mt *MetaTable) getLatestCollectionByIDInternal(ctx context.Context, collectionID UniqueID, allowAvailable bool) (*model.Collection, error) {
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok || coll == nil {
|
|
return nil, merr.WrapErrCollectionNotFound(collectionID)
|
|
}
|
|
if allowAvailable {
|
|
return coll.Clone(), nil
|
|
}
|
|
if !coll.Available() {
|
|
return nil, merr.WrapErrCollectionNotFound(collectionID)
|
|
}
|
|
return filterUnavailable(coll), nil
|
|
}
|
|
|
|
// getCollectionByIDInternal get collection by collection id without lock.
|
|
func (mt *MetaTable) getCollectionByIDInternal(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error) {
|
|
if isMaxTs(ts) {
|
|
return mt.getLatestCollectionByIDInternal(ctx, collectionID, allowUnavailable)
|
|
}
|
|
|
|
var coll *model.Collection
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok || coll == nil || !coll.Available() || coll.CreateTime > ts {
|
|
// travel meta information from catalog.
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
coll, err = mt.catalog.GetCollectionByID(ctx1, db.ID, ts, collectionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if coll == nil {
|
|
// use coll.Name to match error message of regression. TODO: remove this after error code is ready.
|
|
return nil, merr.WrapErrCollectionNotFound(collectionID)
|
|
}
|
|
|
|
if allowUnavailable {
|
|
return coll.Clone(), nil
|
|
}
|
|
|
|
if !coll.Available() {
|
|
// use coll.Name to match error message of regression. TODO: remove this after error code is ready.
|
|
return nil, merr.WrapErrCollectionNotFound(dbName, coll.Name)
|
|
}
|
|
|
|
return filterUnavailable(coll), nil
|
|
}
|
|
|
|
func (mt *MetaTable) GetCollectionByName(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
return mt.getCollectionByNameInternal(ctx, dbName, collectionName, ts)
|
|
}
|
|
|
|
func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) {
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty", zap.String("collectionName", collectionName), zap.Uint64("ts", ts))
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
collectionID, ok := mt.aliases.get(dbName, collectionName)
|
|
if ok {
|
|
return mt.getCollectionByIDInternal(ctx, dbName, collectionID, ts, false)
|
|
}
|
|
|
|
collectionID, ok = mt.names.get(dbName, collectionName)
|
|
if ok {
|
|
return mt.getCollectionByIDInternal(ctx, dbName, collectionID, ts, false)
|
|
}
|
|
|
|
if isMaxTs(ts) {
|
|
return nil, merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName)
|
|
}
|
|
|
|
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// travel meta information from catalog. No need to check time travel logic again, since catalog already did.
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
coll, err := mt.catalog.GetCollectionByName(ctx1, db.ID, collectionName, ts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if coll == nil || !coll.Available() {
|
|
return nil, merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName)
|
|
}
|
|
return filterUnavailable(coll), nil
|
|
}
|
|
|
|
func (mt *MetaTable) GetCollectionByID(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
return mt.getCollectionByIDInternal(ctx, dbName, collectionID, ts, allowUnavailable)
|
|
}
|
|
|
|
func (mt *MetaTable) ListAllAvailCollections(ctx context.Context) map[int64][]int64 {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
ret := make(map[int64][]int64, len(mt.dbName2Meta))
|
|
for _, dbMeta := range mt.dbName2Meta {
|
|
ret[dbMeta.ID] = make([]int64, 0)
|
|
}
|
|
|
|
for collID, collMeta := range mt.collID2Meta {
|
|
if !collMeta.Available() {
|
|
continue
|
|
}
|
|
dbID := collMeta.DBID
|
|
if dbID == util.NonDBID {
|
|
ret[util.DefaultDBID] = append(ret[util.DefaultDBID], collID)
|
|
continue
|
|
}
|
|
ret[dbID] = append(ret[dbID], collID)
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func (mt *MetaTable) ListCollections(ctx context.Context, dbName string, ts Timestamp, onlyAvail bool) ([]*model.Collection, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
if isMaxTs(ts) {
|
|
return mt.listCollectionFromCache(dbName, onlyAvail)
|
|
}
|
|
|
|
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// list collections should always be loaded from catalog.
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
colls, err := mt.catalog.ListCollections(ctx1, db.ID, ts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
onlineCollections := make([]*model.Collection, 0, len(colls))
|
|
for _, coll := range colls {
|
|
if onlyAvail && !coll.Available() {
|
|
continue
|
|
}
|
|
onlineCollections = append(onlineCollections, coll)
|
|
}
|
|
return onlineCollections, nil
|
|
}
|
|
|
|
func (mt *MetaTable) listCollectionFromCache(dbName string, onlyAvail bool) ([]*model.Collection, error) {
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty")
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
db, ok := mt.dbName2Meta[dbName]
|
|
if !ok {
|
|
return nil, fmt.Errorf("database:%s not found", dbName)
|
|
}
|
|
|
|
collectionFromCache := make([]*model.Collection, 0, len(mt.collID2Meta))
|
|
for _, collMeta := range mt.collID2Meta {
|
|
if (collMeta.DBID != util.NonDBID && db.ID == collMeta.DBID) ||
|
|
(collMeta.DBID == util.NonDBID && dbName == util.DefaultDBName) {
|
|
if onlyAvail && !collMeta.Available() {
|
|
continue
|
|
}
|
|
|
|
collectionFromCache = append(collectionFromCache, collMeta)
|
|
}
|
|
}
|
|
return collectionFromCache, nil
|
|
}
|
|
|
|
// ListCollectionPhysicalChannels list physical channels of all collections.
|
|
func (mt *MetaTable) ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
chanMap := make(map[UniqueID][]string)
|
|
|
|
for id, collInfo := range mt.collID2Meta {
|
|
chanMap[id] = common.CloneStringList(collInfo.PhysicalChannelNames)
|
|
}
|
|
|
|
return chanMap
|
|
}
|
|
|
|
func (mt *MetaTable) AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.AlterCollection(ctx1, oldColl, newColl, metastore.MODIFY, ts); err != nil {
|
|
return err
|
|
}
|
|
mt.collID2Meta[oldColl.CollectionID] = newColl
|
|
log.Info("alter collection finished", zap.Int64("collectionID", oldColl.CollectionID), zap.Uint64("ts", ts))
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
ctx = contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
log := log.Ctx(ctx).With(
|
|
zap.String("oldDBName", dbName),
|
|
zap.String("oldName", oldName),
|
|
zap.String("newDBName", newDBName),
|
|
zap.String("newName", newName),
|
|
)
|
|
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty")
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
if newDBName == "" {
|
|
log.Warn("target db name is empty")
|
|
newDBName = dbName
|
|
}
|
|
|
|
// check target db
|
|
targetDB, ok := mt.dbName2Meta[newDBName]
|
|
if !ok {
|
|
return fmt.Errorf("target database:%s not found", newDBName)
|
|
}
|
|
|
|
// old collection should not be an alias
|
|
_, ok = mt.aliases.get(dbName, oldName)
|
|
if ok {
|
|
log.Warn("unsupported use a alias to rename collection")
|
|
return fmt.Errorf("unsupported use an alias to rename collection, alias:%s", oldName)
|
|
}
|
|
|
|
// check new collection already exists
|
|
newColl, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts)
|
|
if newColl != nil {
|
|
log.Warn("check new collection fail")
|
|
return fmt.Errorf("duplicated new collection name %s:%s with other collection name or alias", newDBName, newName)
|
|
}
|
|
if err != nil && !errors.Is(err, merr.ErrCollectionNotFound) {
|
|
log.Warn("check new collection name fail")
|
|
return err
|
|
}
|
|
|
|
// get old collection meta
|
|
oldColl, err := mt.getCollectionByNameInternal(ctx, dbName, oldName, ts)
|
|
if err != nil {
|
|
log.Warn("get old collection fail")
|
|
return err
|
|
}
|
|
|
|
// unsupported rename collection while the collection has aliases
|
|
aliases := mt.listAliasesByID(oldColl.CollectionID)
|
|
if len(aliases) > 0 && oldColl.DBID != targetDB.ID {
|
|
return fmt.Errorf("fail to rename db name, must drop all aliases of this collection before rename")
|
|
}
|
|
|
|
newColl = oldColl.Clone()
|
|
newColl.Name = newName
|
|
newColl.DBID = targetDB.ID
|
|
if err := mt.catalog.AlterCollection(ctx, oldColl, newColl, metastore.MODIFY, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
mt.names.insert(newDBName, newName, oldColl.CollectionID)
|
|
mt.names.remove(dbName, oldName)
|
|
|
|
mt.collID2Meta[oldColl.CollectionID] = newColl
|
|
|
|
log.Info("rename collection finished")
|
|
return nil
|
|
}
|
|
|
|
// GetCollectionVirtualChannels returns virtual channels of a given collection.
|
|
func (mt *MetaTable) GetCollectionVirtualChannels(colID int64) []string {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
for id, collInfo := range mt.collID2Meta {
|
|
if id == colID {
|
|
return common.CloneStringList(collInfo.VirtualChannelNames)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partition) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
coll, ok := mt.collID2Meta[partition.CollectionID]
|
|
if !ok || !coll.Available() {
|
|
return fmt.Errorf("collection not exists: %d", partition.CollectionID)
|
|
}
|
|
if partition.State != pb.PartitionState_PartitionCreating {
|
|
return fmt.Errorf("partition state is not created, collection: %d, partition: %d, state: %s", partition.CollectionID, partition.PartitionID, partition.State)
|
|
}
|
|
if err := mt.catalog.CreatePartition(ctx, coll.DBID, partition, partition.PartitionCreatedTimestamp); err != nil {
|
|
return err
|
|
}
|
|
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())
|
|
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
|
|
|
|
log.Ctx(ctx).Info("add partition to meta table",
|
|
zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName),
|
|
zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
for idx, part := range coll.Partitions {
|
|
if part.PartitionID == partitionID {
|
|
clone := part.Clone()
|
|
clone.State = state
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.AlterPartition(ctx1, coll.DBID, part, clone, metastore.MODIFY, ts); err != nil {
|
|
return err
|
|
}
|
|
mt.collID2Meta[collectionID].Partitions[idx] = clone
|
|
|
|
switch state {
|
|
case pb.PartitionState_PartitionCreated:
|
|
// support Dynamic load/release partitions
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
|
|
default:
|
|
metrics.RootCoordNumOfPartitions.WithLabelValues().Dec()
|
|
}
|
|
|
|
log.Ctx(ctx).Info("change partition state", zap.Int64("collection", collectionID),
|
|
zap.Int64("partition", partitionID), zap.String("state", state.String()),
|
|
zap.Uint64("ts", ts))
|
|
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf("partition not exist, collection: %d, partition: %d", collectionID, partitionID)
|
|
}
|
|
|
|
func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.DropPartition(ctx1, dbID, collectionID, partitionID, ts); err != nil {
|
|
return err
|
|
}
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
loc := -1
|
|
for idx, part := range coll.Partitions {
|
|
if part.PartitionID == partitionID {
|
|
loc = idx
|
|
break
|
|
}
|
|
}
|
|
if loc != -1 {
|
|
coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...)
|
|
}
|
|
log.Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts))
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName))
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
// It's ok that we don't read from catalog when cache missed.
|
|
// Since cache always keep the latest version, and the ts should always be the latest.
|
|
|
|
if !mt.names.exist(dbName) {
|
|
return fmt.Errorf("database %s not found", dbName)
|
|
}
|
|
|
|
if collID, ok := mt.names.get(dbName, alias); ok {
|
|
coll, ok := mt.collID2Meta[collID]
|
|
if !ok {
|
|
return fmt.Errorf("meta error, name mapped non-exist collection id")
|
|
}
|
|
// allow alias with dropping&dropped
|
|
if coll.State != pb.CollectionState_CollectionDropping && coll.State != pb.CollectionState_CollectionDropped {
|
|
return fmt.Errorf("cannot alter alias, collection already exists with same name: %s", alias)
|
|
}
|
|
}
|
|
|
|
collectionID, ok := mt.names.get(dbName, collectionName)
|
|
if !ok {
|
|
// you cannot alias to a non-existent collection.
|
|
return fmt.Errorf("collection not exists: %s", collectionName)
|
|
}
|
|
|
|
// check if alias exists.
|
|
aliasedCollectionID, ok := mt.aliases.get(dbName, alias)
|
|
if ok && aliasedCollectionID == collectionID {
|
|
log.Warn("add duplicate alias", zap.String("alias", alias), zap.String("collection", collectionName), zap.Uint64("ts", ts))
|
|
return nil
|
|
} else if ok {
|
|
// TODO: better to check if aliasedCollectionID exist or is available, though not very possible.
|
|
aliasedColl := mt.collID2Meta[aliasedCollectionID]
|
|
return fmt.Errorf("alias exists and already aliased to another collection, alias: %s, collection: %s, other collection: %s", alias, collectionName, aliasedColl.Name)
|
|
}
|
|
// alias didn't exist.
|
|
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok || !coll.Available() {
|
|
// you cannot alias to a non-existent collection.
|
|
return fmt.Errorf("collection not exists: %s", collectionName)
|
|
}
|
|
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.CreateAlias(ctx1, &model.Alias{
|
|
Name: alias,
|
|
CollectionID: collectionID,
|
|
CreatedTime: ts,
|
|
State: pb.AliasState_AliasCreated,
|
|
DbID: coll.DBID,
|
|
}, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
mt.aliases.insert(dbName, alias, collectionID)
|
|
|
|
log.Ctx(ctx).Info("create alias",
|
|
zap.String("db", dbName),
|
|
zap.String("alias", alias),
|
|
zap.String("collection", collectionName),
|
|
zap.Int64("id", coll.CollectionID),
|
|
zap.Uint64("ts", ts),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty", zap.String("alias", alias), zap.Uint64("ts", ts))
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := mt.catalog.DropAlias(ctx1, db.ID, alias, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
mt.aliases.remove(dbName, alias)
|
|
|
|
log.Ctx(ctx).Info("drop alias",
|
|
zap.String("db", dbName),
|
|
zap.String("alias", alias),
|
|
zap.Uint64("ts", ts),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error {
|
|
mt.ddLock.Lock()
|
|
defer mt.ddLock.Unlock()
|
|
// backward compatibility for rolling upgrade
|
|
if dbName == "" {
|
|
log.Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName))
|
|
dbName = util.DefaultDBName
|
|
}
|
|
|
|
// It's ok that we don't read from catalog when cache missed.
|
|
// Since cache always keep the latest version, and the ts should always be the latest.
|
|
|
|
if !mt.names.exist(dbName) {
|
|
return fmt.Errorf("database not found: %s", dbName)
|
|
}
|
|
|
|
if collID, ok := mt.names.get(dbName, alias); ok {
|
|
coll := mt.collID2Meta[collID]
|
|
// allow alias with dropping&dropped
|
|
if coll.State != pb.CollectionState_CollectionDropping && coll.State != pb.CollectionState_CollectionDropped {
|
|
return fmt.Errorf("cannot alter alias, collection already exists with same name: %s", alias)
|
|
}
|
|
}
|
|
|
|
collectionID, ok := mt.names.get(dbName, collectionName)
|
|
if !ok {
|
|
// you cannot alias to a non-existent collection.
|
|
return fmt.Errorf("collection not exists: %s", collectionName)
|
|
}
|
|
|
|
coll, ok := mt.collID2Meta[collectionID]
|
|
if !ok || !coll.Available() {
|
|
// you cannot alias to a non-existent collection.
|
|
return fmt.Errorf("collection not exists: %s", collectionName)
|
|
}
|
|
|
|
// check if alias exists.
|
|
_, ok = mt.aliases.get(dbName, alias)
|
|
if !ok {
|
|
//
|
|
return fmt.Errorf("failed to alter alias, alias does not exist: %s", alias)
|
|
}
|
|
|
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
|
if err := mt.catalog.AlterAlias(ctx1, &model.Alias{
|
|
Name: alias,
|
|
CollectionID: collectionID,
|
|
CreatedTime: ts,
|
|
State: pb.AliasState_AliasCreated,
|
|
DbID: coll.DBID,
|
|
}, ts); err != nil {
|
|
return err
|
|
}
|
|
|
|
// alias switch to another collection anyway.
|
|
mt.aliases.insert(dbName, alias, collectionID)
|
|
|
|
log.Ctx(ctx).Info("alter alias",
|
|
zap.String("db", dbName),
|
|
zap.String("alias", alias),
|
|
zap.String("collection", collectionName),
|
|
zap.Uint64("ts", ts),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mt *MetaTable) IsAlias(db, name string) bool {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
_, ok := mt.aliases.get(db, name)
|
|
return ok
|
|
}
|
|
|
|
func (mt *MetaTable) listAliasesByID(collID UniqueID) []string {
|
|
ret := make([]string, 0)
|
|
mt.aliases.iterate(func(db string, collection string, id UniqueID) bool {
|
|
if collID == id {
|
|
ret = append(ret, collection)
|
|
}
|
|
return true
|
|
})
|
|
return ret
|
|
}
|
|
|
|
func (mt *MetaTable) ListAliasesByID(collID UniqueID) []string {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
return mt.listAliasesByID(collID)
|
|
}
|
|
|
|
// GetPartitionNameByID serve for bulk insert.
|
|
func (mt *MetaTable) GetPartitionNameByID(collID UniqueID, partitionID UniqueID, ts Timestamp) (string, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
coll, ok := mt.collID2Meta[collID]
|
|
if ok && coll.Available() && coll.CreateTime <= ts {
|
|
// cache hit.
|
|
for _, partition := range coll.Partitions {
|
|
if partition.Available() && partition.PartitionID == partitionID && partition.PartitionCreatedTimestamp <= ts {
|
|
// cache hit.
|
|
return partition.PartitionName, nil
|
|
}
|
|
}
|
|
}
|
|
// cache miss, get from catalog anyway.
|
|
coll, err := mt.catalog.GetCollectionByID(mt.ctx, coll.DBID, ts, collID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !coll.Available() {
|
|
return "", fmt.Errorf("collection not exist: %d", collID)
|
|
}
|
|
for _, partition := range coll.Partitions {
|
|
// no need to check time travel logic again, since catalog already did.
|
|
if partition.Available() && partition.PartitionID == partitionID {
|
|
return partition.PartitionName, nil
|
|
}
|
|
}
|
|
return "", fmt.Errorf("partition not exist: %d", partitionID)
|
|
}
|
|
|
|
// GetPartitionByName serve for bulk insert.
|
|
func (mt *MetaTable) GetPartitionByName(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) {
|
|
mt.ddLock.RLock()
|
|
defer mt.ddLock.RUnlock()
|
|
|
|
coll, ok := mt.collID2Meta[collID]
|
|
if ok && coll.Available() && coll.CreateTime <= ts {
|
|
// cache hit.
|
|
for _, partition := range coll.Partitions {
|
|
if partition.Available() && partition.PartitionName == partitionName && partition.PartitionCreatedTimestamp <= ts {
|
|
// cache hit.
|
|
return partition.PartitionID, nil
|
|
}
|
|
}
|
|
}
|
|
// cache miss, get from catalog anyway.
|
|
coll, err := mt.catalog.GetCollectionByID(mt.ctx, coll.DBID, ts, collID)
|
|
if err != nil {
|
|
return common.InvalidPartitionID, err
|
|
}
|
|
if !coll.Available() {
|
|
return common.InvalidPartitionID, fmt.Errorf("collection not exist: %d", collID)
|
|
}
|
|
for _, partition := range coll.Partitions {
|
|
// no need to check time travel logic again, since catalog already did.
|
|
if partition.Available() && partition.PartitionName == partitionName {
|
|
return partition.PartitionID, nil
|
|
}
|
|
}
|
|
|
|
log.Error("partition ID not found for partition name", zap.String("partitionName", partitionName),
|
|
zap.Int64("collectionID", collID), zap.String("collectionName", coll.Name))
|
|
return common.InvalidPartitionID, fmt.Errorf("partition ID not found for partition name '%s' in collection '%s'",
|
|
partitionName, coll.Name)
|
|
}
|
|
|
|
// AddCredential add credential
|
|
func (mt *MetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error {
|
|
if credInfo.Username == "" {
|
|
return fmt.Errorf("username is empty")
|
|
}
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
usernames, err := mt.catalog.ListCredentials(mt.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(usernames) >= Params.ProxyCfg.MaxUserNum.GetAsInt() {
|
|
errMsg := "unable to add user because the number of users has reached the limit"
|
|
log.Error(errMsg, zap.Int("max_user_num", Params.ProxyCfg.MaxUserNum.GetAsInt()))
|
|
return errors.New(errMsg)
|
|
}
|
|
|
|
if origin, _ := mt.catalog.GetCredential(mt.ctx, credInfo.Username); origin != nil {
|
|
return fmt.Errorf("user already exists: %s", credInfo.Username)
|
|
}
|
|
|
|
credential := &model.Credential{
|
|
Username: credInfo.Username,
|
|
EncryptedPassword: credInfo.EncryptedPassword,
|
|
}
|
|
return mt.catalog.CreateCredential(mt.ctx, credential)
|
|
}
|
|
|
|
// AlterCredential update credential
|
|
func (mt *MetaTable) AlterCredential(credInfo *internalpb.CredentialInfo) error {
|
|
if credInfo.Username == "" {
|
|
return fmt.Errorf("username is empty")
|
|
}
|
|
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
credential := &model.Credential{
|
|
Username: credInfo.Username,
|
|
EncryptedPassword: credInfo.EncryptedPassword,
|
|
}
|
|
return mt.catalog.AlterCredential(mt.ctx, credential)
|
|
}
|
|
|
|
// GetCredential get credential by username
|
|
func (mt *MetaTable) GetCredential(username string) (*internalpb.CredentialInfo, error) {
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
credential, err := mt.catalog.GetCredential(mt.ctx, username)
|
|
return model.MarshalCredentialModel(credential), err
|
|
}
|
|
|
|
// DeleteCredential delete credential
|
|
func (mt *MetaTable) DeleteCredential(username string) error {
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
return mt.catalog.DropCredential(mt.ctx, username)
|
|
}
|
|
|
|
// ListCredentialUsernames list credential usernames
|
|
func (mt *MetaTable) ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error) {
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
usernames, err := mt.catalog.ListCredentials(mt.ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list credential usernames err:%w", err)
|
|
}
|
|
return &milvuspb.ListCredUsersResponse{Usernames: usernames}, nil
|
|
}
|
|
|
|
// CreateRole create role
|
|
func (mt *MetaTable) CreateRole(tenant string, entity *milvuspb.RoleEntity) error {
|
|
if funcutil.IsEmptyString(entity.Name) {
|
|
return fmt.Errorf("the role name in the role info is empty")
|
|
}
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
results, err := mt.catalog.ListRole(mt.ctx, tenant, nil, false)
|
|
if err != nil {
|
|
log.Warn("fail to list roles", zap.Error(err))
|
|
return err
|
|
}
|
|
for _, result := range results {
|
|
if result.GetRole().GetName() == entity.Name {
|
|
log.Info("role already exists", zap.String("role", entity.Name))
|
|
return common.NewIgnorableError(errors.Newf("role [%s] already exists", entity))
|
|
}
|
|
}
|
|
if len(results) >= Params.ProxyCfg.MaxRoleNum.GetAsInt() {
|
|
errMsg := "unable to create role because the number of roles has reached the limit"
|
|
log.Warn(errMsg, zap.Int("max_role_num", Params.ProxyCfg.MaxRoleNum.GetAsInt()))
|
|
return errors.New(errMsg)
|
|
}
|
|
|
|
return mt.catalog.CreateRole(mt.ctx, tenant, entity)
|
|
}
|
|
|
|
// DropRole drop role info
|
|
func (mt *MetaTable) DropRole(tenant string, roleName string) error {
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
return mt.catalog.DropRole(mt.ctx, tenant, roleName)
|
|
}
|
|
|
|
// OperateUserRole operate the relationship between a user and a role, including adding a user to a role and removing a user from a role
|
|
func (mt *MetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error {
|
|
if funcutil.IsEmptyString(userEntity.Name) {
|
|
return fmt.Errorf("username in the user entity is empty")
|
|
}
|
|
if funcutil.IsEmptyString(roleEntity.Name) {
|
|
return fmt.Errorf("role name in the role entity is empty")
|
|
}
|
|
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
return mt.catalog.AlterUserRole(mt.ctx, tenant, userEntity, roleEntity, operateType)
|
|
}
|
|
|
|
// SelectRole select role.
|
|
// Enter the role condition by the entity param. And this param is nil, which means selecting all roles.
|
|
// Get all users that are added to the role by setting the includeUserInfo param to true.
|
|
func (mt *MetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
return mt.catalog.ListRole(mt.ctx, tenant, entity, includeUserInfo)
|
|
}
|
|
|
|
// SelectUser select user.
|
|
// Enter the user condition by the entity param. And this param is nil, which means selecting all users.
|
|
// Get all roles that are added the user to by setting the includeRoleInfo param to true.
|
|
func (mt *MetaTable) SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
return mt.catalog.ListUser(mt.ctx, tenant, entity, includeRoleInfo)
|
|
}
|
|
|
|
// OperatePrivilege grant or revoke privilege by setting the operateType param
|
|
func (mt *MetaTable) OperatePrivilege(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error {
|
|
if funcutil.IsEmptyString(entity.ObjectName) {
|
|
return fmt.Errorf("the object name in the grant entity is empty")
|
|
}
|
|
if entity.Object == nil || funcutil.IsEmptyString(entity.Object.Name) {
|
|
return fmt.Errorf("the object entity in the grant entity is invalid")
|
|
}
|
|
if entity.Role == nil || funcutil.IsEmptyString(entity.Role.Name) {
|
|
return fmt.Errorf("the role entity in the grant entity is invalid")
|
|
}
|
|
if entity.Grantor == nil {
|
|
return fmt.Errorf("the grantor in the grant entity is empty")
|
|
}
|
|
if entity.Grantor.Privilege == nil || funcutil.IsEmptyString(entity.Grantor.Privilege.Name) {
|
|
return fmt.Errorf("the privilege name in the grant entity is empty")
|
|
}
|
|
if entity.Grantor.User == nil || funcutil.IsEmptyString(entity.Grantor.User.Name) {
|
|
return fmt.Errorf("the grantor name in the grant entity is empty")
|
|
}
|
|
if !funcutil.IsRevoke(operateType) && !funcutil.IsGrant(operateType) {
|
|
return fmt.Errorf("the operate type in the grant entity is invalid")
|
|
}
|
|
if entity.DbName == "" {
|
|
entity.DbName = util.DefaultDBName
|
|
}
|
|
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
return mt.catalog.AlterGrant(mt.ctx, tenant, entity, operateType)
|
|
}
|
|
|
|
// SelectGrant select grant
|
|
// The principal entity MUST be not empty in the grant entity
|
|
// The resource entity and the resource name are optional, and the two params should be not empty together when you select some grants about the resource kind.
|
|
func (mt *MetaTable) SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error) {
|
|
var entities []*milvuspb.GrantEntity
|
|
if entity == nil {
|
|
return entities, fmt.Errorf("the grant entity is nil")
|
|
}
|
|
|
|
if entity.Role == nil || funcutil.IsEmptyString(entity.Role.Name) {
|
|
return entities, fmt.Errorf("the role entity in the grant entity is invalid")
|
|
}
|
|
if entity.DbName == "" {
|
|
entity.DbName = util.DefaultDBName
|
|
}
|
|
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
return mt.catalog.ListGrant(mt.ctx, tenant, entity)
|
|
}
|
|
|
|
func (mt *MetaTable) DropGrant(tenant string, role *milvuspb.RoleEntity) error {
|
|
if role == nil || funcutil.IsEmptyString(role.Name) {
|
|
return fmt.Errorf("the role entity is invalid when dropping the grant")
|
|
}
|
|
mt.permissionLock.Lock()
|
|
defer mt.permissionLock.Unlock()
|
|
|
|
return mt.catalog.DeleteGrant(mt.ctx, tenant, role)
|
|
}
|
|
|
|
func (mt *MetaTable) ListPolicy(tenant string) ([]string, error) {
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
return mt.catalog.ListPolicy(mt.ctx, tenant)
|
|
}
|
|
|
|
func (mt *MetaTable) ListUserRole(tenant string) ([]string, error) {
|
|
mt.permissionLock.RLock()
|
|
defer mt.permissionLock.RUnlock()
|
|
|
|
return mt.catalog.ListUserRole(mt.ctx, tenant)
|
|
}
|