// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rootcoord

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"github.com/tidwall/gjson"
	"github.com/tikv/client-go/v2/txnkv"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/coordinator/coordclient"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/kv/tikv"
	"github.com/milvus-io/milvus/internal/metastore"
	kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
	"github.com/milvus-io/milvus/internal/metastore/model"
	streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
	"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
	tso2 "github.com/milvus-io/milvus/internal/tso"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/dependency"
	"github.com/milvus-io/milvus/internal/util/proxyutil"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/streamingutil"
	tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/kv"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	pb "github.com/milvus-io/milvus/pkg/proto/etcdpb"
	"github.com/milvus-io/milvus/pkg/proto/internalpb"
	"github.com/milvus-io/milvus/pkg/proto/proxypb"
	"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
	"github.com/milvus-io/milvus/pkg/util"
	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
	"github.com/milvus-io/milvus/pkg/util/crypto"
	"github.com/milvus-io/milvus/pkg/util/expr"
	"github.com/milvus-io/milvus/pkg/util/funcutil"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/retry"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// UniqueID is an alias of typeutil.UniqueID.
type UniqueID = typeutil.UniqueID // Timestamp is an alias of typeutil.Timestamp type Timestamp = typeutil.Timestamp const InvalidCollectionID = UniqueID(0) var Params *paramtable.ComponentParam = paramtable.Get() type Opt func(*Core) type metaKVCreator func() kv.MetaKv // Core root coordinator core type Core struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup etcdCli *clientv3.Client tikvCli *txnkv.Client address string meta IMetaTable scheduler IScheduler broker Broker ddlTsLockManager DdlTsLockManager garbageCollector GarbageCollector stepExecutor StepExecutor metaKVCreator metaKVCreator proxyCreator proxyutil.ProxyCreator proxyWatcher *proxyutil.ProxyWatcher proxyClientManager proxyutil.ProxyClientManagerInterface metricsCacheManager *metricsinfo.MetricsCacheManager chanTimeTick *timetickSync idAllocator allocator.Interface tsoAllocator tso2.Allocator dataCoord types.DataCoordClient queryCoord types.QueryCoordClient quotaCenter *QuotaCenter stateCode atomic.Int32 initOnce sync.Once startOnce sync.Once session *sessionutil.Session factory dependency.Factory enableActiveStandBy bool activateFunc func() error metricsRequest *metricsinfo.MetricsRequest streamingCoord *streamingcoord.Server } // --------------------- function -------------------------- // NewCore creates a new rootcoord core func NewCore(c context.Context, factory dependency.Factory) (*Core, error) { ctx, cancel := context.WithCancel(c) rand.Seed(time.Now().UnixNano()) core := &Core{ ctx: ctx, cancel: cancel, factory: factory, enableActiveStandBy: Params.RootCoordCfg.EnableActiveStandby.GetAsBool(), metricsRequest: metricsinfo.NewMetricsRequest(), } core.UpdateStateCode(commonpb.StateCode_Abnormal) core.SetProxyCreator(proxyutil.DefaultProxyCreator) expr.Register("rootcoord", core) return core, nil } // UpdateStateCode update state code func (c *Core) UpdateStateCode(code commonpb.StateCode) { c.stateCode.Store(int32(code)) log.Ctx(c.ctx).Info("update rootcoord state", zap.String("state", code.String())) } func (c *Core) GetStateCode() commonpb.StateCode { return commonpb.StateCode(c.stateCode.Load()) } func (c *Core) sendTimeTick(t Timestamp, reason string) error { pc := c.chanTimeTick.listDmlChannels() pt := make([]uint64, len(pc)) for i := 0; i < len(pt); i++ { pt[i] = t } ttMsg := internalpb.ChannelTimeTickMsg{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), commonpbutil.WithTimeStamp(t), commonpbutil.WithSourceID(ddlSourceID), ), ChannelNames: pc, Timestamps: pt, DefaultTimestamp: t, } return c.chanTimeTick.updateTimeTick(&ttMsg, reason) } func (c *Core) sendMinDdlTsAsTt() { if !paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() { return } log := log.Ctx(c.ctx) code := c.GetStateCode() if code != commonpb.StateCode_Healthy { log.Warn("rootCoord is not healthy, skip send timetick") return } minBgDdlTs := c.ddlTsLockManager.GetMinDdlTs() minNormalDdlTs := c.scheduler.GetMinDdlTs() minDdlTs := funcutil.Min(minBgDdlTs, minNormalDdlTs) // zero -> ddlTsLockManager and scheduler not started. if minDdlTs == typeutil.ZeroTimestamp { log.Warn("zero ts was met, this should be only occurred in starting state", zap.Uint64("minBgDdlTs", minBgDdlTs), zap.Uint64("minNormalDdlTs", minNormalDdlTs)) return } // max -> abnormal case, impossible. 
if minDdlTs == typeutil.MaxTimestamp { log.Warn("ddl ts is abnormal, max ts was met", zap.Uint64("minBgDdlTs", minBgDdlTs), zap.Uint64("minNormalDdlTs", minNormalDdlTs)) return } if err := c.sendTimeTick(minDdlTs, "timetick loop"); err != nil { log.Warn("failed to send timetick", zap.Error(err)) } } func (c *Core) startTimeTickLoop() { log := log.Ctx(c.ctx) defer c.wg.Done() ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)) defer ticker.Stop() for { select { case <-c.ctx.Done(): log.Info("rootcoord's timetick loop quit!") return case <-ticker.C: c.sendMinDdlTsAsTt() } } } func (c *Core) tsLoop() { defer c.wg.Done() tsoTicker := time.NewTicker(tso2.UpdateTimestampStep) defer tsoTicker.Stop() ctx, cancel := context.WithCancel(c.ctx) defer cancel() log := log.Ctx(c.ctx) for { select { case <-tsoTicker.C: if err := c.tsoAllocator.UpdateTSO(); err != nil { log.Warn("failed to update tso", zap.Error(err)) continue } ts := c.tsoAllocator.GetLastSavedTime() metrics.RootCoordTimestampSaved.Set(float64(ts.Unix())) case <-ctx.Done(): log.Info("rootcoord's ts loop quit!") return } } } func (c *Core) SetProxyCreator(f func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) { c.proxyCreator = f } func (c *Core) SetDataCoordClient(s types.DataCoordClient) error { if s == nil { return errors.New("null DataCoord interface") } c.dataCoord = s return nil } func (c *Core) SetQueryCoordClient(s types.QueryCoordClient) error { if s == nil { return errors.New("null QueryCoord interface") } c.queryCoord = s return nil } // Register register rootcoord at etcd func (c *Core) Register() error { log := log.Ctx(c.ctx) c.session.Register() afterRegister := func() { metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc() log.Info("RootCoord Register Finished") c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) os.Exit(1) }) } if c.enableActiveStandBy { go func() { if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil { log.Warn("failed to activate standby rootcoord server", zap.Error(err)) panic(err) } afterRegister() }() } else { afterRegister() } return nil } func (c *Core) SetAddress(address string) { c.address = address } // SetEtcdClient sets the etcdCli of Core func (c *Core) SetEtcdClient(etcdClient *clientv3.Client) { c.etcdCli = etcdClient } // SetTiKVClient sets the tikvCli of Core func (c *Core) SetTiKVClient(client *txnkv.Client) { c.tikvCli = client } func (c *Core) initSession() error { c.session = sessionutil.NewSession(c.ctx) if c.session == nil { return fmt.Errorf("session is nil, the etcd client connection may have failed") } c.session.Init(typeutil.RootCoordRole, c.address, true, true) c.session.SetEnableActiveStandBy(c.enableActiveStandBy) return nil } func (c *Core) initKVCreator() { if c.metaKVCreator == nil { if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV { c.metaKVCreator = func() kv.MetaKv { return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(), tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } else { c.metaKVCreator = func() kv.MetaKv { return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(), etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } } } func (c *Core) initStreamingCoord() { 
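// Build the embedded streaming coordinator on top of rootcoord's shared dependencies:
// the etcd client, the meta KV store, the server session, and a future handle to the local rootcoord client.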
c.streamingCoord = streamingcoord.NewServerBuilder(). WithETCD(c.etcdCli). WithMetaKV(c.metaKVCreator()). WithSession(c.session). WithRootCoordClient(coordclient.MustGetLocalRootCoordClientFuture()). Build() } func (c *Core) initMetaTable(initCtx context.Context) error { fn := func() error { var catalog metastore.RootCoordCatalog var err error switch Params.MetaStoreCfg.MetaStoreType.GetValue() { case util.MetaStoreTypeEtcd: log.Ctx(initCtx).Info("Using etcd as meta storage.") var ss *kvmetestore.SuffixSnapshot var err error metaKV := c.metaKVCreator() if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} case util.MetaStoreTypeTiKV: log.Ctx(initCtx).Info("Using tikv as meta storage.") var ss *kvmetestore.SuffixSnapshot var err error metaKV := c.metaKVCreator() if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} default: return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", Params.MetaStoreCfg.MetaStoreType.GetValue())) } if c.meta, err = NewMetaTable(c.ctx, catalog, c.tsoAllocator); err != nil { return err } return nil } return retry.Do(initCtx, fn, retry.Attempts(10)) } func (c *Core) initIDAllocator(initCtx context.Context) error { var tsoKV kv.TxnKV var kvPath string if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV { kvPath = Params.TiKVCfg.KvRootPath.GetValue() tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, kvPath, globalIDAllocatorSubPath) } else { kvPath = Params.EtcdCfg.KvRootPath.GetValue() tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, kvPath, globalIDAllocatorSubPath) } idAllocator := allocator.NewGlobalIDAllocator(globalIDAllocatorKey, tsoKV) if err := idAllocator.Initialize(); err != nil { return err } c.idAllocator = idAllocator log.Ctx(initCtx).Info("id allocator initialized", zap.String("root_path", kvPath), zap.String("sub_path", globalIDAllocatorSubPath), zap.String("key", globalIDAllocatorKey)) return nil } func (c *Core) initTSOAllocator(initCtx context.Context) error { var tsoKV kv.TxnKV var kvPath string if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV { kvPath = Params.TiKVCfg.KvRootPath.GetValue() tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, Params.TiKVCfg.KvRootPath.GetValue(), globalIDAllocatorSubPath) } else { kvPath = Params.EtcdCfg.KvRootPath.GetValue() tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath.GetValue(), globalIDAllocatorSubPath) } tsoAllocator := tso2.NewGlobalTSOAllocator(globalTSOAllocatorKey, tsoKV) if err := tsoAllocator.Initialize(); err != nil { return err } c.tsoAllocator = tsoAllocator log.Ctx(initCtx).Info("tso allocator initialized", zap.String("root_path", kvPath), zap.String("sub_path", globalIDAllocatorSubPath), zap.String("key", globalIDAllocatorKey)) return nil } func (c *Core) initInternal() error { initCtx, initSpan := log.NewIntentContext(typeutil.RootCoordRole, "initInternal") defer initSpan.End() log := log.Ctx(initCtx) c.UpdateStateCode(commonpb.StateCode_Initializing) if err := c.initIDAllocator(initCtx); err != nil { return err } if err := c.initTSOAllocator(initCtx); err != nil { return err } if err := c.initMetaTable(initCtx); err != nil { return err } c.scheduler = newScheduler(c.ctx, c.idAllocator, 
c.tsoAllocator) c.factory.Init(Params) chanMap := c.meta.ListCollectionPhysicalChannels(c.ctx) c.chanTimeTick = newTimeTickSync(initCtx, c.ctx, c.session.ServerID, c.factory, chanMap) log.Info("create TimeTick sync done") c.proxyClientManager = proxyutil.NewProxyClientManager(c.proxyCreator) c.broker = newServerBroker(c) c.ddlTsLockManager = newDdlTsLockManager(c.tsoAllocator) c.garbageCollector = newBgGarbageCollector(c) c.stepExecutor = newBgStepExecutor(c.ctx) if err := c.streamingCoord.Start(c.ctx); err != nil { log.Info("start streaming coord failed", zap.Error(err)) return err } if !streamingutil.IsStreamingServiceEnabled() { c.proxyWatcher = proxyutil.NewProxyWatcher( c.etcdCli, c.chanTimeTick.initSessions, c.proxyClientManager.AddProxyClients, ) c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient) c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient) } else { c.proxyWatcher = proxyutil.NewProxyWatcher( c.etcdCli, c.proxyClientManager.AddProxyClients, ) c.proxyWatcher.AddSessionFunc(c.proxyClientManager.AddProxyClient) c.proxyWatcher.DelSessionFunc(c.proxyClientManager.DelProxyClient) } log.Info("init proxy manager done") c.metricsCacheManager = metricsinfo.NewMetricsCacheManager() c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator, c.meta) log.Debug("RootCoord init QuotaCenter done") if err := c.initCredentials(initCtx); err != nil { return err } log.Info("init credentials done") if err := c.initRbac(initCtx); err != nil { return err } log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address)) return nil } func (c *Core) registerMetricsRequest() { c.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return c.getSystemInfoMetrics(ctx, req) }) log.Ctx(c.ctx).Info("register metrics actions finished") } // Init initialize routine func (c *Core) Init() error { log := log.Ctx(c.ctx) var initError error c.registerMetricsRequest() c.factory.Init(Params) if err := c.initSession(); err != nil { return err } c.initKVCreator() c.initStreamingCoord() if c.enableActiveStandBy { c.activateFunc = func() error { log.Info("RootCoord switch from standby to active, activating") var err error c.initOnce.Do(func() { if err = c.initInternal(); err != nil { log.Error("RootCoord init failed", zap.Error(err)) } }) if err != nil { return err } c.startOnce.Do(func() { if err = c.startInternal(); err != nil { log.Error("RootCoord start failed", zap.Error(err)) } }) log.Info("RootCoord startup success", zap.String("address", c.session.Address)) return err } c.UpdateStateCode(commonpb.StateCode_StandBy) log.Info("RootCoord enter standby mode successfully") } else { c.initOnce.Do(func() { initError = c.initInternal() }) } return initError } func (c *Core) initCredentials(initCtx context.Context) error { credInfo, _ := c.meta.GetCredential(initCtx, util.UserRoot) if credInfo == nil { encryptedRootPassword, err := crypto.PasswordEncrypt(Params.CommonCfg.DefaultRootPassword.GetValue()) if err != nil { log.Ctx(initCtx).Warn("RootCoord init user root failed", zap.Error(err)) return err } log.Ctx(initCtx).Info("RootCoord init user root") err = c.meta.AddCredential(initCtx, &internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword}) if err != nil { log.Ctx(initCtx).Warn("RootCoord init user root 
failed", zap.Error(err)) return err } return nil } return nil } func (c *Core) initRbac(initCtx context.Context) error { var err error // create default roles, including admin, public for _, role := range util.DefaultRoles { err = c.meta.CreateRole(initCtx, util.DefaultTenant, &milvuspb.RoleEntity{Name: role}) if err != nil && !common.IsIgnorableError(err) { return errors.Wrap(err, "failed to create role") } } if Params.ProxyCfg.EnablePublicPrivilege.GetAsBool() { err = c.initPublicRolePrivilege(initCtx) if err != nil { return err } } if Params.RoleCfg.Enabled.GetAsBool() { return c.initBuiltinRoles() } return nil } func (c *Core) initPublicRolePrivilege(initCtx context.Context) error { // grant privileges for the public role globalPrivileges := []string{ commonpb.ObjectPrivilege_PrivilegeDescribeCollection.String(), commonpb.ObjectPrivilege_PrivilegeListAliases.String(), } collectionPrivileges := []string{ commonpb.ObjectPrivilege_PrivilegeIndexDetail.String(), } var err error for _, globalPrivilege := range globalPrivileges { err = c.meta.OperatePrivilege(initCtx, util.DefaultTenant, &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: util.RolePublic}, Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Global.String()}, ObjectName: util.AnyWord, DbName: util.AnyWord, Grantor: &milvuspb.GrantorEntity{ User: &milvuspb.UserEntity{Name: util.UserRoot}, Privilege: &milvuspb.PrivilegeEntity{Name: globalPrivilege}, }, }, milvuspb.OperatePrivilegeType_Grant) if err != nil && !common.IsIgnorableError(err) { return errors.Wrap(err, "failed to grant global privilege") } } for _, collectionPrivilege := range collectionPrivileges { err = c.meta.OperatePrivilege(initCtx, util.DefaultTenant, &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: util.RolePublic}, Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Collection.String()}, ObjectName: util.AnyWord, DbName: util.AnyWord, Grantor: &milvuspb.GrantorEntity{ User: &milvuspb.UserEntity{Name: util.UserRoot}, Privilege: &milvuspb.PrivilegeEntity{Name: collectionPrivilege}, }, }, milvuspb.OperatePrivilegeType_Grant) if err != nil && !common.IsIgnorableError(err) { return errors.Wrap(err, "failed to grant collection privilege") } } return nil } func (c *Core) initBuiltinRoles() error { log := log.Ctx(c.ctx) rolePrivilegesMap := Params.RoleCfg.Roles.GetAsRoleDetails() for role, privilegesJSON := range rolePrivilegesMap { err := c.meta.CreateRole(c.ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: role}) if err != nil && !common.IsIgnorableError(err) { log.Error("create a builtin role fail", zap.String("roleName", role), zap.Error(err)) return errors.Wrapf(err, "failed to create a builtin role: %s", role) } for _, privilege := range privilegesJSON[util.RoleConfigPrivileges] { privilegeName := privilege[util.RoleConfigPrivilege] if !util.IsAnyWord(privilege[util.RoleConfigPrivilege]) { dbPrivName, err := c.getMetastorePrivilegeName(c.ctx, privilege[util.RoleConfigPrivilege]) if err != nil { return errors.Wrapf(err, "failed to get metastore privilege name for: %s", privilege[util.RoleConfigPrivilege]) } privilegeName = dbPrivName } err := c.meta.OperatePrivilege(c.ctx, util.DefaultTenant, &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: role}, Object: &milvuspb.ObjectEntity{Name: privilege[util.RoleConfigObjectType]}, ObjectName: privilege[util.RoleConfigObjectName], DbName: privilege[util.RoleConfigDBName], Grantor: &milvuspb.GrantorEntity{ User: &milvuspb.UserEntity{Name: util.UserRoot}, Privilege: 
&milvuspb.PrivilegeEntity{Name: privilegeName},
				},
			}, milvuspb.OperatePrivilegeType_Grant)
			if err != nil && !common.IsIgnorableError(err) {
				log.Error("grant privilege to builtin role fail", zap.String("roleName", role), zap.Any("privilege", privilege), zap.Error(err))
				return errors.Wrapf(err, "failed to grant privilege: <%s, %s, %s> of db: %s to role: %s",
					privilege[util.RoleConfigObjectType], privilege[util.RoleConfigObjectName], privilege[util.RoleConfigPrivilege], privilege[util.RoleConfigDBName], role)
			}
		}
		util.BuiltinRoles = append(util.BuiltinRoles, role)
		log.Info("init a builtin role successfully", zap.String("roleName", role))
	}
	return nil
}

func (c *Core) restore(ctx context.Context) error {
	dbs, err := c.meta.ListDatabases(ctx, typeutil.MaxTimestamp)
	if err != nil {
		return err
	}

	for _, db := range dbs {
		colls, err := c.meta.ListCollections(ctx, db.Name, typeutil.MaxTimestamp, false)
		if err != nil {
			return err
		}
		for _, coll := range colls {
			ts, err := c.tsoAllocator.GenerateTSO(1)
			if err != nil {
				return err
			}
			if coll.Available() {
				for _, part := range coll.Partitions {
					switch part.State {
					case pb.PartitionState_PartitionDropping:
						go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, coll.VirtualChannelNames, part.Clone(), ts)
					case pb.PartitionState_PartitionCreating:
						go c.garbageCollector.RemoveCreatingPartition(coll.DBID, part.Clone(), ts)
					default:
					}
				}
			} else {
				switch coll.State {
				case pb.CollectionState_CollectionDropping:
					go c.garbageCollector.ReDropCollection(coll.Clone(), ts)
				case pb.CollectionState_CollectionCreating:
					go c.garbageCollector.RemoveCreatingCollection(coll.Clone())
				default:
				}
			}
		}
	}
	return nil
}

func (c *Core) startInternal() error {
	log := log.Ctx(c.ctx)
	if err := c.proxyWatcher.WatchProxy(c.ctx); err != nil {
		log.Fatal("rootcoord failed to watch proxy", zap.Error(err))
		// we cannot just get stuck here,
		panic(err)
	}

	if err := c.restore(c.ctx); err != nil {
		panic(err)
	}

	if Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
		c.quotaCenter.Start()
	}

	c.scheduler.Start()
	c.stepExecutor.Start()
	go func() {
		// refresh rbac cache
		if err := retry.Do(c.ctx, func() error {
			if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
				OpType: int32(typeutil.CacheRefresh),
			}); err != nil {
				log.RatedWarn(60, "fail to refresh policy info cache", zap.Error(err))
				return err
			}
			return nil
		}, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
			log.Warn("fail to refresh policy info cache", zap.Error(err))
		}
	}()

	c.startServerLoop()
	c.UpdateStateCode(commonpb.StateCode_Healthy)
	sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
	log.Info("rootcoord startup successfully")

	// Register the core as an append operator for the broadcast service.
	// TODO: should be removed at 2.6.0.
	// Add the wal accesser to the broadcaster registry for making broadcast operations.
	registry.Register(registry.AppendOperatorTypeMsgstream, newMsgStreamAppendOperator(c))
	return nil
}

func (c *Core) startServerLoop() {
	c.wg.Add(1)
	go c.tsLoop()
	if !streamingutil.IsStreamingServiceEnabled() {
		c.wg.Add(2)
		go c.startTimeTickLoop()
		go c.chanTimeTick.startWatch(&c.wg)
	}
}

// Start starts RootCoord.
func (c *Core) Start() error { var err error if !c.enableActiveStandBy { c.startOnce.Do(func() { err = c.startInternal() }) } return err } func (c *Core) stopExecutor() { if c.stepExecutor != nil { c.stepExecutor.Stop() log.Ctx(c.ctx).Info("stop rootcoord executor") } } func (c *Core) stopScheduler() { if c.scheduler != nil { c.scheduler.Stop() log.Ctx(c.ctx).Info("stop rootcoord scheduler") } } func (c *Core) cancelIfNotNil() { if c.cancel != nil { c.cancel() log.Ctx(c.ctx).Info("cancel rootcoord goroutines") } } func (c *Core) revokeSession() { if c.session != nil { // wait at most one second to revoke c.session.Stop() log.Ctx(c.ctx).Info("rootcoord session stop") } } // Stop stops rootCoord. func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) c.stopExecutor() c.stopScheduler() if c.streamingCoord != nil { c.streamingCoord.Stop() } if c.proxyWatcher != nil { c.proxyWatcher.Stop() } if c.quotaCenter != nil { c.quotaCenter.stop() } c.revokeSession() c.cancelIfNotNil() c.wg.Wait() return nil } // GetComponentStates get states of components func (c *Core) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { code := c.GetStateCode() log.Ctx(ctx).Debug("RootCoord current state", zap.String("StateCode", code.String())) nodeID := common.NotRegisteredID if c.session != nil && c.session.Registered() { nodeID = c.session.ServerID } return &milvuspb.ComponentStates{ State: &milvuspb.ComponentInfo{ // NodeID: c.session.ServerID, // will race with Core.Register() NodeID: nodeID, Role: typeutil.RootCoordRole, StateCode: code, ExtraInfo: nil, }, Status: merr.Success(), SubcomponentStates: []*milvuspb.ComponentInfo{ { NodeID: nodeID, Role: typeutil.RootCoordRole, StateCode: code, ExtraInfo: nil, }, }, }, nil } // GetTimeTickChannel get timetick channel name func (c *Core) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: merr.Success(), Value: Params.CommonCfg.RootCoordTimeTick.GetValue(), }, nil } // GetStatisticsChannel get statistics channel name func (c *Core) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: merr.Success(), Value: Params.CommonCfg.RootCoordStatistics.GetValue(), }, nil } func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } method := "CreateDatabase" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("CreateDatabase") log.Ctx(ctx).Info("received request to create database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) t := &createDatabaseTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to create database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to create database", zap.String("role", typeutil.RootCoordRole), 
zap.Error(err), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } method := "DropDatabase" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DropDatabase") log.Ctx(ctx).Info("received request to drop database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) t := &dropDatabaseTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to drop database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to drop database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.CleanupRootCoordDBMetrics(in.GetDbName()) log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } func (c *Core) ListDatabases(ctx context.Context, in *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { ret := &milvuspb.ListDatabasesResponse{Status: merr.Status(err)} return ret, nil } method := "ListDatabases" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("ListDatabases") log := log.Ctx(ctx).With(zap.Int64("msgID", in.GetBase().GetMsgID())) log.Info("received request to list databases") t := &listDatabaseTask{ baseTask: newBaseTask(ctx, c), Req: in, Resp: &milvuspb.ListDatabasesResponse{}, } if err := c.scheduler.AddTask(t); err != nil { log.Info("failed to enqueue request to list databases", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return &milvuspb.ListDatabasesResponse{ Status: merr.Status(err), }, nil } if err := t.WaitToFinish(); err != nil { log.Info("failed to list databases", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, 
metrics.FailLabel).Inc() return &milvuspb.ListDatabasesResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to list databases", zap.Int("num of databases", len(t.Resp.GetDbNames()))) return t.Resp, nil } // CreateCollection create collection func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("CreateCollection") log.Ctx(ctx).Info("received request to create collection", zap.String("dbName", in.GetDbName()), zap.String("name", in.GetCollectionName()), zap.String("role", typeutil.RootCoordRole)) t := &createCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to create collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to create collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateCollection").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to create collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // DropCollection drop collection func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DropCollection") log.Ctx(ctx).Info("received request to drop collection", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.String("name", in.GetCollectionName())) t := &dropCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to drop collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to drop collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc() return 
merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropCollection").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // HasCollection check collection existence func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("HasCollection") ts := getTravelTs(in) log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()), zap.Uint64("ts", ts)) t := &hasCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, Rsp: &milvuspb.BoolResponse{}, } if err := c.scheduler.AddTask(t); err != nil { log.Info("failed to enqueue request to has collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil } if err := t.WaitToFinish(); err != nil { log.Info("failed to has collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasCollection").Observe(float64(t.queueDur.Milliseconds())) return t.Rsp, nil } // getCollectionIDStr get collectionID string to avoid the alias name func (c *Core) getCollectionIDStr(ctx context.Context, db, collectionName string, collectionID int64) string { // When neither the collection name nor the collectionID exists, no error is returned at this point. // An error will be returned during the execution phase. 
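// Prefer the explicitly provided collectionID; otherwise resolve the (possibly aliased) name through the meta table, falling back to "-1" when the lookup fails.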
if collectionID != 0 { return strconv.FormatInt(collectionID, 10) } coll, err := c.meta.GetCollectionByName(ctx, db, collectionName, typeutil.MaxTimestamp) if err != nil { return "-1" } return strconv.FormatInt(coll.CollectionID, 10) } func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) { ts := getTravelTs(in) if in.GetCollectionName() != "" { return c.meta.GetCollectionByName(ctx, in.GetDbName(), in.GetCollectionName(), ts) } return c.meta.GetCollectionByID(ctx, in.GetDbName(), in.GetCollectionID(), ts, allowUnavailable) } func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName string) *milvuspb.DescribeCollectionResponse { resp := &milvuspb.DescribeCollectionResponse{ Status: merr.Success(), DbName: dbName, } resp.Schema = &schemapb.CollectionSchema{ Name: collInfo.Name, Description: collInfo.Description, AutoID: collInfo.AutoID, Fields: model.MarshalFieldModels(collInfo.Fields), Functions: model.MarshalFunctionModels(collInfo.Functions), EnableDynamicField: collInfo.EnableDynamicField, Properties: collInfo.Properties, } resp.CollectionID = collInfo.CollectionID resp.VirtualChannelNames = collInfo.VirtualChannelNames resp.PhysicalChannelNames = collInfo.PhysicalChannelNames if collInfo.ShardsNum == 0 { collInfo.ShardsNum = int32(len(collInfo.VirtualChannelNames)) } resp.ShardsNum = collInfo.ShardsNum resp.ConsistencyLevel = collInfo.ConsistencyLevel resp.CreatedTimestamp = collInfo.CreateTime createdPhysicalTime, _ := tsoutil.ParseHybridTs(collInfo.CreateTime) resp.CreatedUtcTimestamp = uint64(createdPhysicalTime) resp.Aliases = aliases resp.StartPositions = collInfo.StartPositions resp.CollectionName = resp.Schema.Name resp.Properties = collInfo.Properties resp.NumPartitions = int64(len(collInfo.Partitions)) resp.DbId = collInfo.DBID return resp } func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DescribeCollection") ts := getTravelTs(in) log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()), zap.String("dbName", in.GetDbName()), zap.Int64("id", in.GetCollectionID()), zap.Uint64("ts", ts), zap.Bool("allowUnavailable", allowUnavailable)) t := &describeCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, Rsp: &milvuspb.DescribeCollectionResponse{Status: merr.Success()}, allowUnavailable: allowUnavailable, } if err := c.scheduler.AddTask(t); err != nil { log.Info("failed to enqueue request to describe collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), }, nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to describe collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.SuccessLabel).Inc() 
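// Record both the end-to-end request latency and the time the task spent waiting in the scheduler queue.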
metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DescribeCollection").Observe(float64(t.queueDur.Milliseconds())) return t.Rsp, nil } // DescribeCollection return collection info func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { return c.describeCollectionImpl(ctx, in, false) } // DescribeCollectionInternal same to DescribeCollection, but will return unavailable collections and // only used in internal RPC. // When query cluster tried to do recovery, it'll be healthy until all collections' targets were recovered, // so during this time, releasing request generated by rootcoord's recovery won't succeed. So in theory, rootcoord goes // to be healthy, querycoord recovers all collections' targets, and then querycoord serves the releasing request sent // by rootcoord, eventually, the dropping collections will be released. func (c *Core) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { return c.describeCollectionImpl(ctx, in, true) } // ShowCollections list all collection names func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("ShowCollections") ts := getTravelTs(in) log := log.Ctx(ctx).With(zap.String("dbname", in.GetDbName()), zap.Uint64("ts", ts)) t := &showCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, Rsp: &milvuspb.ShowCollectionsResponse{}, } if err := c.scheduler.AddTask(t); err != nil { log.Info("failed to enqueue request to show collections", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc() return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), }, nil } if err := t.WaitToFinish(); err != nil { log.Info("failed to show collections", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc() return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowCollections").Observe(float64(t.queueDur.Milliseconds())) return t.Rsp, nil } func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("AlterCollection") log.Ctx(ctx).Info("received request to alter collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Any("props", in.Properties), zap.Any("delete_keys", in.DeleteKeys), ) t := &alterCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Warn("failed to enqueue request to alter 
collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to alter collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to alter collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } func (c *Core) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("AlterCollectionField") log.Ctx(ctx).Info("received request to alter collection field", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.String("fieldName", in.GetFieldName()), zap.Any("props", in.Properties), ) t := &alterCollectionFieldTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Warn("failed to enqueue request to alter collection field", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName()), zap.String("fieldName", in.GetFieldName())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to alter collection", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollectionField", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollectionField").Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to alter collection field", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.String("fieldName", in.GetFieldName())) return merr.Success(), nil } func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } method := "AlterDatabase" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) log.Ctx(ctx).Info("received request to alter database", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetDbName()), zap.Any("props", in.Properties)) t := &alterDatabaseTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { 
log.Warn("failed to enqueue request to alter database", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetDbName()), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to alter database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("name", in.GetDbName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to alter database", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetDbName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // CreatePartition create partition func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("CreatePartition") log.Ctx(ctx).Info("received request to create partition", zap.String("role", typeutil.RootCoordRole), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName())) t := &createPartitionTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to create partition", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to create partition", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreatePartition").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to create partition", zap.String("role", typeutil.RootCoordRole), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // DropPartition drop partition func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DropPartition") log.Ctx(ctx).Info("received request to drop partition", zap.String("role", typeutil.RootCoordRole), 
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName())) t := &dropPartitionTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to drop partition", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to drop partition", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropPartition").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to drop partition", zap.String("role", typeutil.RootCoordRole), zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // HasPartition check partition existence func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("HasPartition") // TODO(longjiquan): why HasPartitionRequest doesn't contain Timestamp but other requests do. 
ts := typeutil.MaxTimestamp log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", ts)) t := &hasPartitionTask{ baseTask: newBaseTask(ctx, c), Req: in, Rsp: &milvuspb.BoolResponse{}, } if err := c.scheduler.AddTask(t); err != nil { log.Info("failed to enqueue request to has partition", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil } if err := t.WaitToFinish(); err != nil { log.Info("failed to has partition", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasPartition").Observe(float64(t.queueDur.Milliseconds())) return t.Rsp, nil } func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitionsRequest, allowUnavailable bool) (*milvuspb.ShowPartitionsResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("ShowPartitions") log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()), zap.Int64("collection_id", in.GetCollectionID()), zap.Strings("partitions", in.GetPartitionNames()), zap.Bool("allowUnavailable", allowUnavailable)) t := &showPartitionTask{ baseTask: newBaseTask(ctx, c), Req: in, Rsp: &milvuspb.ShowPartitionsResponse{}, allowUnavailable: allowUnavailable, } if err := c.scheduler.AddTask(t); err != nil { log.Info("failed to enqueue request to show partitions", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc() return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), // Status: common.StatusFromError(err), }, nil } if err := t.WaitToFinish(); err != nil { log.Info("failed to show partitions", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc() return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), // Status: common.StatusFromError(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowPartitions").Observe(float64(t.queueDur.Milliseconds())) return t.Rsp, nil } // ShowPartitions list all partition names func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { return c.showPartitionsImpl(ctx, in, false) } // ShowPartitionsInternal same to ShowPartitions, only used in internal RPC. 
func (c *Core) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
	return c.showPartitionsImpl(ctx, in, true)
}

// ShowSegments list all segments
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
	// ShowSegments is only used in GetPersistentSegmentInfo, which has been deprecated for a long time.
	// Though we keep the current logic, it is not quite correct, since RootCoord only contains indexed segments.
	return &milvuspb.ShowSegmentsResponse{Status: merr.Success()}, nil
}

// GetPChannelInfo get pchannel info.
func (c *Core) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &rootcoordpb.GetPChannelInfoResponse{
			Status: merr.Status(err),
		}, nil
	}
	return c.meta.GetPChannelInfo(ctx, in.GetPchannel()), nil
}

// AllocTimestamp alloc timestamp
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &rootcoordpb.AllocTimestampResponse{
			Status: merr.Status(err),
		}, nil
	}

	if in.BlockTimestamp > 0 {
		blockTime, _ := tsoutil.ParseTS(in.BlockTimestamp)
		lastTime := c.tsoAllocator.GetLastSavedTime()
		deltaDuration := blockTime.Sub(lastTime)
		if deltaDuration > 0 {
			log.Info("wait for block timestamp", zap.Time("blockTime", blockTime), zap.Time("lastTime", lastTime), zap.Duration("delta", deltaDuration))
			time.Sleep(deltaDuration + time.Millisecond*200)
		}
	}

	ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
	if err != nil {
		log.Ctx(ctx).Error("failed to allocate timestamp", zap.String("role", typeutil.RootCoordRole), zap.Error(err))
		return &rootcoordpb.AllocTimestampResponse{
			Status: merr.Status(err),
		}, nil
	}

	// return first available timestamp
	ts = ts - uint64(in.GetCount()) + 1
	metrics.RootCoordTimestamp.Set(float64(ts))
	return &rootcoordpb.AllocTimestampResponse{
		Status:    merr.Success(),
		Timestamp: ts,
		Count:     in.GetCount(),
	}, nil
}

// AllocID alloc ids
func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &rootcoordpb.AllocIDResponse{
			Status: merr.Status(err),
		}, nil
	}
	start, _, err := c.idAllocator.Alloc(in.Count)
	if err != nil {
		log.Ctx(ctx).Error("failed to allocate id", zap.String("role", typeutil.RootCoordRole), zap.Error(err))
		return &rootcoordpb.AllocIDResponse{
			Status: merr.Status(err),
			Count:  in.Count,
		}, nil
	}

	metrics.RootCoordIDAllocCounter.Add(float64(in.Count))
	return &rootcoordpb.AllocIDResponse{
		Status: merr.Success(),
		ID:     start,
		Count:  in.Count,
	}, nil
}

// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
	log := log.Ctx(ctx)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Error(err))
		return merr.Status(err), nil
	}
	if in.Base.MsgType != commonpb.MsgType_TimeTick {
		log.Warn("failed to updateTimeTick because base message is not timetick", zap.Any("base message type", in.Base.MsgType))
		return merr.Status(merr.WrapErrParameterInvalid(commonpb.MsgType_TimeTick.String(), in.Base.MsgType.String(), "invalid message type")),
nil } err := c.chanTimeTick.updateTimeTick(in, "gRPC") if err != nil { log.Warn("failed to updateTimeTick", zap.String("role", typeutil.RootCoordRole), zap.Error(err)) return merr.Status(err), nil } return merr.Success(), nil } // InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies. func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in) if err != nil { return merr.Status(err), nil } return merr.Success(), nil } // ShowConfigurations returns the configurations of RootCoord matching req.Pattern func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &internalpb.ShowConfigurationsResponse{ Status: merr.Status(err), Configuations: nil, }, nil } configList := make([]*commonpb.KeyValuePair, 0) for key, value := range Params.GetComponentConfigurations("rootcoord", req.Pattern) { configList = append(configList, &commonpb.KeyValuePair{ Key: key, Value: value, }) } return &internalpb.ShowConfigurationsResponse{ Status: merr.Success(), Configuations: configList, }, nil } // GetMetrics get metrics func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.GetMetricsResponse{ Status: merr.Status(err), Response: "", }, nil } resp := &milvuspb.GetMetricsResponse{ Status: merr.Success(), ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, paramtable.GetNodeID()), } ret, err := c.metricsRequest.ExecuteMetricsRequest(ctx, in) if err != nil { resp.Status = merr.Status(err) return resp, nil } resp.Response = ret return resp, nil } // CreateAlias create collection alias func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("CreateAlias") log.Ctx(ctx).Info("received request to create alias", zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName())) t := &createAliasTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to create alias", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to create alias", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc() 
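// NOTE: the DDL handlers in this file share the same bookkeeping pattern around the scheduler:
// a Total counter is bumped on entry, a Fail counter on enqueue/WaitToFinish errors, and on success
// a Success counter plus two latency observations are recorded: RootCoordDDLReqLatency covers the
// whole request (measured by the TimeRecorder created on entry), while RootCoordDDLReqLatencyInQueue
// covers only the time the task spent queued before execution (taken from the task's queueDur).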
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateAlias").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to create alias", zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // DropAlias drop collection alias func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DropAlias") log.Ctx(ctx).Info("received request to drop alias", zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias())) t := &dropAliasTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to drop alias", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("alias", in.GetAlias())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to drop alias", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("alias", in.GetAlias()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropAlias").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to drop alias", zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // AlterAlias alter collection alias func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("AlterAlias") log.Ctx(ctx).Info("received request to alter alias", zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName())) t := &alterAliasTask{ baseTask: newBaseTask(ctx, c), Req: in, } if err := c.scheduler.AddTask(t); err != nil { log.Ctx(ctx).Info("failed to enqueue request to alter alias", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Info("failed to alter alias", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() return merr.Status(err), nil }
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterAlias").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to alter alias", zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } // DescribeAlias describe collection alias func (c *Core) DescribeAlias(ctx context.Context, in *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.DescribeAliasResponse{ Status: merr.Status(err), }, nil } log := log.Ctx(ctx).With( zap.String("role", typeutil.RootCoordRole), zap.String("db", in.GetDbName()), zap.String("alias", in.GetAlias())) method := "DescribeAlias" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DescribeAlias") log.Info("received request to describe alias") if in.GetAlias() == "" { return &milvuspb.DescribeAliasResponse{ Status: merr.Status(merr.WrapErrParameterMissing("alias", "no input alias")), }, nil } collectionName, err := c.meta.DescribeAlias(ctx, in.GetDbName(), in.GetAlias(), 0) if err != nil { log.Warn("fail to DescribeAlias", zap.Error(err)) return &milvuspb.DescribeAliasResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to describe alias") return &milvuspb.DescribeAliasResponse{ Status: merr.Status(nil), DbName: in.GetDbName(), Alias: in.GetAlias(), Collection: collectionName, }, nil } // ListAliases list aliases func (c *Core) ListAliases(ctx context.Context, in *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.ListAliasesResponse{ Status: merr.Status(err), }, nil } method := "ListAliases" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) log := log.Ctx(ctx).With( zap.String("role", typeutil.RootCoordRole), zap.String("db", in.GetDbName()), zap.String("collectionName", in.GetCollectionName())) log.Info("received request to list aliases") aliases, err := c.meta.ListAliases(ctx, in.GetDbName(), in.GetCollectionName(), 0) if err != nil { log.Warn("fail to ListAliases", zap.Error(err)) return &milvuspb.ListAliasesResponse{ Status: merr.Status(err), }, nil } metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to list aliases") return &milvuspb.ListAliasesResponse{ Status: merr.Status(nil), DbName: in.GetDbName(), CollectionName: in.GetCollectionName(), Aliases: aliases, }, nil } // ExpireCredCache will call invalidate credential cache func (c *Core) ExpireCredCache(ctx context.Context, username string) error { req := proxypb.InvalidateCredCacheRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithSourceID(c.session.ServerID), ), Username: username, } return c.proxyClientManager.InvalidateCredentialCache(ctx, &req) } // UpdateCredCache will call update credential cache 
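// The update is fanned out through proxyClientManager so every registered proxy refreshes its
// locally cached credential for the user; ExpireCredCache above uses the same mechanism to
// invalidate an entry instead of updating it.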
func (c *Core) UpdateCredCache(ctx context.Context, credInfo *internalpb.CredentialInfo) error { req := proxypb.UpdateCredCacheRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithSourceID(c.session.ServerID), ), Username: credInfo.Username, Password: credInfo.Sha256Password, } return c.proxyClientManager.UpdateCredentialCache(ctx, &req) } // CreateCredential create new user and password // 1. decode ciphertext password to raw password // 2. encrypt raw password // 3. save in to etcd func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) { method := "CreateCredential" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } // insert to db err := c.meta.AddCredential(ctx, credInfo) if err != nil { ctxLog.Warn("CreateCredential save credential failed", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateCredentialFailure), nil } // update proxy's local cache err = c.UpdateCredCache(ctx, credInfo) if err != nil { ctxLog.Warn("CreateCredential add cache failed", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() } ctxLog.Debug("CreateCredential success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCredentials.Inc() return merr.Success(), nil } // GetCredential get credential by username func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) { method := "GetCredential" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &rootcoordpb.GetCredentialResponse{Status: merr.Status(err)}, nil } credInfo, err := c.meta.GetCredential(ctx, in.Username) if err != nil { ctxLog.Warn("GetCredential query credential failed", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return &rootcoordpb.GetCredentialResponse{ Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_GetCredentialFailure), }, nil } ctxLog.Debug("GetCredential success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &rootcoordpb.GetCredentialResponse{ Status: merr.Success(), Username: credInfo.Username, Password: credInfo.EncryptedPassword, }, nil } // UpdateCredential update password for a user func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) { method := "UpdateCredential" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", 
credInfo.Username)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } // update data on storage err := c.meta.AlterCredential(ctx, credInfo) if err != nil { ctxLog.Warn("UpdateCredential save credential failed", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil } // update proxy's local cache err = c.UpdateCredCache(ctx, credInfo) if err != nil { ctxLog.Warn("UpdateCredential update cache failed", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil } ctxLog.Debug("UpdateCredential success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return merr.Success(), nil } // DeleteCredential delete a user func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) { method := "DeleteCredential" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } var status *commonpb.Status defer func() { if status.Code != 0 { metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() } }() err := executeDeleteCredentialTaskSteps(ctx, c, in.Username) if err != nil { errMsg := "fail to execute task when deleting the user" ctxLog.Warn(errMsg, zap.Error(err)) status = merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DeleteCredentialFailure) return status, nil } ctxLog.Debug("DeleteCredential success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCredentials.Dec() status = merr.Success() return status, nil } // ListCredUsers list all usernames func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) { method := "ListCredUsers" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.ListCredUsersResponse{Status: merr.Status(err)}, nil } credInfo, err := c.meta.ListCredentialUsernames(ctx) if err != nil { ctxLog.Warn("ListCredUsers query usernames failed", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() status := merr.Status(err) return &milvuspb.ListCredUsersResponse{Status: status}, nil } ctxLog.Debug("ListCredUsers success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.ListCredUsersResponse{ Status: merr.Success(), Usernames: credInfo.Usernames, }, nil } // CreateRole create role // - check the node health // - check if 
the role already exists // - check if the number of roles has reached the limit // - create the role by the meta api func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (*commonpb.Status, error) { method := "CreateRole" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method + " begin") if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } entity := in.Entity err := c.meta.CreateRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}) if err != nil { errMsg := "fail to create role" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateRoleFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfRoles.Inc() return merr.Success(), nil } // DropRole drop role // - check the node health // - check if the role name exists // - check if the role still has any grant info // - get all role mappings of this role // - drop these role mappings // - drop the role by the meta api func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*commonpb.Status, error) { method := "DropRole" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("role_name", in.RoleName)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } if util.IsBuiltinRole(in.GetRoleName()) { err := merr.WrapErrPrivilegeNotPermitted("the role[%s] is a builtin role, which can't be dropped", in.GetRoleName()) return merr.Status(err), nil } if _, err := c.meta.SelectRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil { errMsg := "not found the role, maybe the role isn't existed or internal system error" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil } if !in.ForceDrop { grantEntities, err := c.meta.SelectGrant(ctx, util.DefaultTenant, &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: in.RoleName}, DbName: "*", }) if len(grantEntities) != 0 { errMsg := "fail to drop a role that still has granted privileges.
Use REVOKE API to revoke privileges" ctxLog.Warn(errMsg, zap.Any("grants", grantEntities), zap.Error(err)) return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil } } err := executeDropRoleTaskSteps(ctx, c, in.RoleName, in.ForceDrop) if err != nil { errMsg := "fail to execute task when dropping the role" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil } ctxLog.Debug(method+" success", zap.String("role_name", in.RoleName)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfRoles.Dec() return merr.Success(), nil } // OperateUserRole operate the relationship between a user and a role // - check the node health // - check if the role is valid // - check if the user is valid // - operate the user-role by the meta api // - update the policy cache func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) { method := "OperateUserRole-" + in.Type.String() metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } if _, err := c.meta.SelectRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil { errMsg := "not found the role, maybe the role isn't existed or internal system error" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil } if in.Type != milvuspb.OperateUserRoleType_RemoveUserFromRole { if _, err := c.meta.SelectUser(ctx, util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, false); err != nil { errMsg := "not found the user, maybe the user isn't existed or internal system error" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil } } err := executeOperateUserRoleTaskSteps(ctx, c, in) if err != nil { errMsg := "fail to execute task when operate the user and role" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return merr.Success(), nil } // SelectRole select role // - check the node health // - check if the role is valid when this param is provided // - select role by the meta api func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) { method := "SelectRole" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.SelectRoleResponse{Status: merr.Status(err)}, nil } if in.Role != nil { if _, err := c.meta.SelectRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, 
false); err != nil { if errors.Is(err, merr.ErrIoKeyNotFound) { return &milvuspb.SelectRoleResponse{ Status: merr.Success(), }, nil } errMsg := "fail to select the role to check the role name" ctxLog.Warn(errMsg, zap.Error(err)) return &milvuspb.SelectRoleResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure), }, nil } } roleResults, err := c.meta.SelectRole(ctx, util.DefaultTenant, in.Role, in.IncludeUserInfo) if err != nil { errMsg := "fail to select the role" ctxLog.Warn(errMsg, zap.Error(err)) return &milvuspb.SelectRoleResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure), }, nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.SelectRoleResponse{ Status: merr.Success(), Results: roleResults, }, nil } // SelectUser select user // - check the node health // - check if the user is valid when this param is provided // - select user by the meta api func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) { method := "SelectUser" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.SelectUserResponse{Status: merr.Status(err)}, nil } if in.User != nil { if _, err := c.meta.SelectUser(ctx, util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil { if errors.Is(err, merr.ErrIoKeyNotFound) { return &milvuspb.SelectUserResponse{ Status: merr.Success(), }, nil } errMsg := "fail to select the user to check the username" ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err)) return &milvuspb.SelectUserResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure), }, nil } } userResults, err := c.meta.SelectUser(ctx, util.DefaultTenant, in.User, in.IncludeRoleInfo) if err != nil { errMsg := "fail to select the user" ctxLog.Warn(errMsg, zap.Error(err)) return &milvuspb.SelectUserResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure), }, nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.SelectUserResponse{ Status: merr.Success(), Results: userResults, }, nil } func (c *Core) isValidRole(entity *milvuspb.RoleEntity) error { if entity == nil { return errors.New("the role entity is nil") } if entity.Name == "" { return errors.New("the name in the role entity is empty") } if _, err := c.meta.SelectRole(c.ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}, false); err != nil { log.Warn("fail to select the role", zap.String("role_name", entity.Name), zap.Error(err)) return errors.New("not found the role, maybe the role isn't existed or internal system error") } return nil } func (c *Core) isValidObject(entity *milvuspb.ObjectEntity) error { if entity == nil { return errors.New("the object entity is nil") } if _, ok := commonpb.ObjectType_value[entity.Name]; !ok { return fmt.Errorf("not found the object 
type[name: %s], supported the object types: %v", entity.Name, lo.Keys(commonpb.ObjectType_value)) } return nil } func (c *Core) isValidPrivilege(ctx context.Context, privilegeName string, object string) error { if util.IsAnyWord(privilegeName) { return nil } customPrivGroup, err := c.meta.IsCustomPrivilegeGroup(ctx, privilegeName) if err != nil { return err } if customPrivGroup { return fmt.Errorf("can not operate the custom privilege group [%s]", privilegeName) } if lo.Contains(Params.RbacConfig.GetDefaultPrivilegeGroupNames(), privilegeName) { return fmt.Errorf("can not operate the built-in privilege group [%s]", privilegeName) } // check object privileges for built-in privileges if util.IsPrivilegeNameDefined(privilegeName) { privileges, ok := util.ObjectPrivileges[object] if !ok { return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", object, lo.Keys(commonpb.ObjectType_value)) } for _, privilege := range privileges { if privilege == privilegeName { return nil } } } return fmt.Errorf("not found the privilege name[%s] in object[%s]", privilegeName, object) } func (c *Core) isValidPrivilegeV2(ctx context.Context, privilegeName string) error { if util.IsAnyWord(privilegeName) { return nil } customPrivGroup, err := c.meta.IsCustomPrivilegeGroup(ctx, privilegeName) if err != nil { return err } if customPrivGroup { return nil } if util.IsPrivilegeNameDefined(privilegeName) { return nil } return fmt.Errorf("not found the privilege name[%s]", privilegeName) } // OperatePrivilege operate the privilege, including grant and revoke // - check the node health // - check if the operating type is valid // - check if the entity is nil // - check if the params, including the resource entity, the principal entity, the grantor entity, is valid // - operate the privilege by the meta api // - update the policy cache func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) { method := "OperatePrivilege" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := c.operatePrivilegeCommonCheck(ctx, in); err != nil { return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil } privName := in.Entity.Grantor.Privilege.Name switch in.Version { case "v2": if err := c.isValidPrivilegeV2(ctx, privName); err != nil { ctxLog.Error("", zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil } if err := c.validatePrivilegeGroupParams(ctx, privName, in.Entity.DbName, in.Entity.ObjectName); err != nil { ctxLog.Error("", zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil } // set up object type for metastore, to be compatible with v1 version in.Entity.Object.Name = util.GetObjectType(privName) default: if err := c.isValidPrivilege(ctx, privName, in.Entity.Object.Name); err != nil { ctxLog.Error("", zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil } // set up object name if it is global object type and not built in privilege group if in.Entity.Object.Name == commonpb.ObjectType_Global.String() && !util.IsBuiltinPrivilegeGroup(in.Entity.Grantor.Privilege.Name) { in.Entity.ObjectName = util.AnyWord } } err := executeOperatePrivilegeTaskSteps(ctx, c, in) if err != nil { errMsg := 
"fail to execute task when operating the privilege" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return merr.Success(), nil } func (c *Core) operatePrivilegeCommonCheck(ctx context.Context, in *milvuspb.OperatePrivilegeRequest) error { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return err } if in.Type != milvuspb.OperatePrivilegeType_Grant && in.Type != milvuspb.OperatePrivilegeType_Revoke { errMsg := fmt.Sprintf("invalid operate privilege type, current type: %s, valid value: [%s, %s]", in.Type, milvuspb.OperatePrivilegeType_Grant, milvuspb.OperatePrivilegeType_Revoke) return errors.New(errMsg) } if in.Entity == nil { errMsg := "the grant entity in the request is nil" return errors.New(errMsg) } if err := c.isValidObject(in.Entity.Object); err != nil { return errors.New("the object entity in the request is nil or invalid") } if err := c.isValidRole(in.Entity.Role); err != nil { return err } entity := in.Entity.Grantor if entity == nil { return errors.New("the grantor entity is nil") } if entity.User == nil || entity.User.Name == "" { return errors.New("the user entity in the grantor entity is nil or empty") } if _, err := c.meta.SelectUser(ctx, util.DefaultTenant, &milvuspb.UserEntity{Name: entity.User.Name}, false); err != nil { log.Ctx(ctx).Warn("fail to select the user", zap.String("username", entity.User.Name), zap.Error(err)) return errors.New("not found the user, maybe the user isn't existed or internal system error") } if entity.Privilege == nil { return errors.New("the privilege entity in the grantor entity is nil") } return nil } func (c *Core) validatePrivilegeGroupParams(ctx context.Context, entity string, dbName string, collectionName string) error { allGroups, err := c.getDefaultAndCustomPrivilegeGroups(ctx) if err != nil { return err } groups := lo.SliceToMap(allGroups, func(group *milvuspb.PrivilegeGroupInfo) (string, []*milvuspb.PrivilegeEntity) { return group.GroupName, group.Privileges }) privs, exists := groups[entity] if !exists || len(privs) == 0 { // it is a privilege, no need to check with other params return nil } // since all privileges are same level in a group, just check the first privilege level := util.GetPrivilegeLevel(privs[0].GetName()) switch level { case milvuspb.PrivilegeLevel_Cluster.String(): if !util.IsAnyWord(dbName) || !util.IsAnyWord(collectionName) { return merr.WrapErrParameterInvalidMsg("dbName and collectionName should be * for the cluster level privilege: %s", entity) } return nil case milvuspb.PrivilegeLevel_Database.String(): if collectionName != "" && collectionName != util.AnyWord { return merr.WrapErrParameterInvalidMsg("collectionName should be * for the database level privilege: %s", entity) } return nil case milvuspb.PrivilegeLevel_Collection.String(): if util.IsAnyWord(dbName) && !util.IsAnyWord(collectionName) && collectionName != "" { return merr.WrapErrParameterInvalidMsg("please specify database name for the collection level privilege: %s", entity) } return nil default: return errors.New("not found the privilege level") } } func (c *Core) getMetastorePrivilegeName(ctx context.Context, privName string) (string, error) { // if it is built-in privilege, return the privilege name directly if 
util.IsPrivilegeNameDefined(privName) { return util.PrivilegeNameForMetastore(privName), nil } // return the privilege group name if it is a custom privilege group customGroup, err := c.meta.IsCustomPrivilegeGroup(ctx, privName) if err != nil { return "", err } if customGroup { return util.PrivilegeGroupNameForMetastore(privName), nil } return "", errors.New("not found the privilege name") } // SelectGrant select grant // - check the node health // - check if the principal entity is valid // - check if the resource entity which is provided by the user is valid // - select grant by the meta api func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) { method := "SelectGrant" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.SelectGrantResponse{ Status: merr.Status(err), }, nil } if in.Entity == nil { errMsg := "the grant entity in the request is nil" ctxLog.Warn(errMsg) return &milvuspb.SelectGrantResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure), }, nil } if err := c.isValidRole(in.Entity.Role); err != nil { ctxLog.Warn("", zap.Error(err)) return &milvuspb.SelectGrantResponse{ Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure), }, nil } if in.Entity.Object != nil { if err := c.isValidObject(in.Entity.Object); err != nil { ctxLog.Warn("", zap.Error(err)) return &milvuspb.SelectGrantResponse{ Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure), }, nil } } grantEntities, err := c.meta.SelectGrant(ctx, util.DefaultTenant, in.Entity) if errors.Is(err, merr.ErrIoKeyNotFound) { return &milvuspb.SelectGrantResponse{ Status: merr.Success(), Entities: grantEntities, }, nil } if err != nil { errMsg := "fail to select the grant" ctxLog.Warn(errMsg, zap.Error(err)) return &milvuspb.SelectGrantResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure), }, nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.SelectGrantResponse{ Status: merr.Success(), Entities: grantEntities, }, nil } func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { method := "PolicyList" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &internalpb.ListPolicyResponse{ Status: merr.Status(err), }, nil } policies, err := c.meta.ListPolicy(ctx, util.DefaultTenant) if err != nil { errMsg := "fail to list policy" ctxLog.Warn(errMsg, zap.Error(err)) return &internalpb.ListPolicyResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure), }, nil } // expand privilege groups and turn to policies allGroups, err := c.getDefaultAndCustomPrivilegeGroups(ctx) if err != nil { errMsg := "fail to get privilege groups" ctxLog.Warn(errMsg, 
zap.Error(err)) return &internalpb.ListPolicyResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure), }, nil } groups := lo.SliceToMap(allGroups, func(group *milvuspb.PrivilegeGroupInfo) (string, []*milvuspb.PrivilegeEntity) { return group.GroupName, group.Privileges }) expandGrants, err := c.expandPrivilegeGroups(ctx, policies, groups) if err != nil { errMsg := "fail to expand privilege groups" ctxLog.Warn(errMsg, zap.Error(err)) return &internalpb.ListPolicyResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure), }, nil } expandPolicies := lo.Map(expandGrants, func(r *milvuspb.GrantEntity, _ int) string { return funcutil.PolicyForPrivilege(r.Role.Name, r.Object.Name, r.ObjectName, r.Grantor.Privilege.Name, r.DbName) }) userRoles, err := c.meta.ListUserRole(ctx, util.DefaultTenant) if err != nil { errMsg := "fail to list user-role" ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err)) return &internalpb.ListPolicyResponse{ Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure), }, nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &internalpb.ListPolicyResponse{ Status: merr.Success(), PolicyInfos: expandPolicies, UserRoles: userRoles, PrivilegeGroups: allGroups, }, nil } func (c *Core) BackupRBAC(ctx context.Context, in *milvuspb.BackupRBACMetaRequest) (*milvuspb.BackupRBACMetaResponse, error) { method := "BackupRBAC" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.BackupRBACMetaResponse{ Status: merr.Status(err), }, nil } rbacMeta, err := c.meta.BackupRBAC(ctx, util.DefaultTenant) if err != nil { return &milvuspb.BackupRBACMetaResponse{ Status: merr.Status(err), }, nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.BackupRBACMetaResponse{ Status: merr.Success(), RBACMeta: rbacMeta, }, nil } func (c *Core) RestoreRBAC(ctx context.Context, in *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) { method := "RestoreRBAC" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } err := executeRestoreRBACTaskSteps(ctx, c, in) if err != nil { errMsg := "fail to execute task when restore rbac meta data" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return merr.Success(), nil } func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { if 
err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } log := log.Ctx(ctx).With(zap.String("oldCollectionName", req.GetOldName()), zap.String("newCollectionName", req.GetNewName())) log.Info("received request to rename collection") metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("RenameCollection") t := &renameCollectionTask{ baseTask: newBaseTask(ctx, c), Req: req, } if err := c.scheduler.AddTask(t); err != nil { log.Warn("failed to enqueue request to rename collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to rename collection", zap.Uint64("ts", t.GetTs()), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("RenameCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to rename collection", zap.Uint64("ts", t.GetTs())) return merr.Success(), nil } func (c *Core) DescribeDatabase(ctx context.Context, req *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil } log := log.Ctx(ctx).With(zap.String("dbName", req.GetDbName())) log.Info("received request to describe database ") metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DescribeDatabase") t := &describeDBTask{ baseTask: newBaseTask(ctx, c), Req: req, } if err := c.scheduler.AddTask(t); err != nil { log.Warn("failed to enqueue request to describe database", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.FailLabel).Inc() return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to describe database", zap.Uint64("ts", t.GetTs()), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.FailLabel).Inc() return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeDatabase").Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to describe database", zap.Uint64("ts", t.GetTs())) return t.Rsp, nil } func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.CheckHealthResponse{ Status: merr.Status(err), IsHealthy: false, Reasons: []string{fmt.Sprintf("serverID=%d: %v", c.session.ServerID, err)}, }, nil } group, ctx := errgroup.WithContext(ctx) errs := typeutil.NewConcurrentSet[error]() proxyClients := c.proxyClientManager.GetProxyClients() proxyClients.Range(func(key int64, value types.ProxyClient) bool { nodeID := key proxyClient := value group.Go(func() error { sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) if err != nil { errs.Insert(err) return err } err = 
merr.AnalyzeState("Proxy", nodeID, sta) if err != nil { errs.Insert(err) } return err }) return true }) maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) if maxDelay > 0 { group.Go(func() error { err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay) if err != nil { errs.Insert(err) } return err }) } err := group.Wait() if err != nil { return &milvuspb.CheckHealthResponse{ Status: merr.Success(), IsHealthy: false, Reasons: lo.Map(errs.Collect(), func(e error, i int) string { return e.Error() }), }, nil } return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil } func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) { method := "CreatePrivilegeGroup" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreatePrivilegeGroupFailure), nil } if err := c.meta.CreatePrivilegeGroup(ctx, in.GroupName); err != nil { ctxLog.Warn("fail to create privilege group", zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreatePrivilegeGroupFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfPrivilegeGroups.Inc() return merr.Success(), nil } func (c *Core) DropPrivilegeGroup(ctx context.Context, in *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) { method := "DropPrivilegeGroup" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.StatusWithErrorCode(err, commonpb.ErrorCode_DropPrivilegeGroupFailure), nil } if err := c.meta.DropPrivilegeGroup(ctx, in.GroupName); err != nil { ctxLog.Warn("fail to drop privilege group", zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_DropPrivilegeGroupFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfPrivilegeGroups.Dec() return merr.Success(), nil } func (c *Core) ListPrivilegeGroups(ctx context.Context, in *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) { method := "ListPrivilegeGroups" metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &milvuspb.ListPrivilegeGroupsResponse{ Status: merr.Status(err), }, nil } privGroups, err := c.meta.ListPrivilegeGroups(ctx) if err != nil { ctxLog.Warn("fail to list privilege group", zap.Error(err)) return &milvuspb.ListPrivilegeGroupsResponse{ Status: merr.StatusWithErrorCode(err,
commonpb.ErrorCode_ListPrivilegeGroupsFailure), }, nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) // append built in privilege groups privGroups = append(privGroups, Params.RbacConfig.GetDefaultPrivilegeGroups()...) return &milvuspb.ListPrivilegeGroupsResponse{ Status: merr.Success(), PrivilegeGroups: privGroups, }, nil } func (c *Core) OperatePrivilegeGroup(ctx context.Context, in *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error) { method := "OperatePrivilegeGroup-" + in.Type.String() metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder(method) ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in)) ctxLog.Debug(method) if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } err := executeOperatePrivilegeGroupTaskSteps(ctx, c, in) if err != nil { errMsg := "fail to execute task when operate privilege group" ctxLog.Warn(errMsg, zap.Error(err)) return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeGroupFailure), nil } ctxLog.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return merr.Success(), nil } func (c *Core) expandPrivilegeGroups(ctx context.Context, grants []*milvuspb.GrantEntity, groups map[string][]*milvuspb.PrivilegeEntity) ([]*milvuspb.GrantEntity, error) { newGrants := []*milvuspb.GrantEntity{} createGrantEntity := func(grant *milvuspb.GrantEntity, privilegeName string) (*milvuspb.GrantEntity, error) { metaName, err := c.getMetastorePrivilegeName(ctx, privilegeName) if err != nil { return nil, err } objectType := &milvuspb.ObjectEntity{ Name: util.GetObjectType(privilegeName), } objectName := grant.ObjectName if objectType.Name == commonpb.ObjectType_Global.String() { objectName = util.AnyWord } return &milvuspb.GrantEntity{ Role: grant.Role, Object: objectType, ObjectName: objectName, Grantor: &milvuspb.GrantorEntity{ User: grant.Grantor.User, Privilege: &milvuspb.PrivilegeEntity{ Name: metaName, }, }, DbName: grant.DbName, }, nil } for _, grant := range grants { privName := grant.Grantor.Privilege.Name privGroup, exists := groups[privName] if !exists { privGroup = []*milvuspb.PrivilegeEntity{{Name: privName}} } for _, priv := range privGroup { newGrant, err := createGrantEntity(grant, priv.Name) if err != nil { return nil, err } newGrants = append(newGrants, newGrant) } } // uniq by role + object + object name + grantor user + privilege name + db name return lo.UniqBy(newGrants, func(g *milvuspb.GrantEntity) string { return fmt.Sprintf("%s-%s-%s-%s-%s-%s", g.Role, g.Object, g.ObjectName, g.Grantor.User, g.Grantor.Privilege.Name, g.DbName) }), nil } // getDefaultAndCustomPrivilegeGroups returns default privilege groups and user-defined privilege groups. func (c *Core) getDefaultAndCustomPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupInfo, error) { allGroups, err := c.meta.ListPrivilegeGroups(ctx) allGroups = append(allGroups, Params.RbacConfig.GetDefaultPrivilegeGroups()...) if err != nil { return nil, err } return allGroups, nil } // RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator. 
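// It simply delegates to the streamingcoord server embedded in Core, which registers its own gRPC
// handlers on the given grpc.Server.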
func (c *Core) RegisterStreamingCoordGRPCService(server *grpc.Server) { c.streamingCoord.RegisterGRPCService(server) }
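// The helper below is an illustrative sketch only (it is not called anywhere in rootcoord, and the
// name describeAllocatedTSORange is made up for illustration): it shows how a caller might interpret
// an AllocTimestampResponse returned by Core.AllocTimestamp above. AllocTimestamp hands back the
// FIRST timestamp of a contiguous block of Count timestamps, so the allocated range is
// [Timestamp, Timestamp+Count-1]; tsoutil.ParseTS splits the hybrid TSO value into its physical
// (wall-clock) and logical parts.
func describeAllocatedTSORange(resp *rootcoordpb.AllocTimestampResponse) string {
	first := resp.GetTimestamp()                // first timestamp of the allocated block
	last := first + uint64(resp.GetCount()) - 1 // last timestamp of the allocated block
	physical, logical := tsoutil.ParseTS(first) // split hybrid ts into wall-clock time + logical counter
	return fmt.Sprintf("allocated %d ts in [%d, %d], physical=%s logical=%d",
		resp.GetCount(), first, last, physical.Format(time.RFC3339Nano), logical)
}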