// milvus/internal/coordinator/mix_coord.go
//
// 1099 lines
// 43 KiB
// Go

package coordinator
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/tidwall/gjson"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/datacoord"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/querycoordv2"
"github.com/milvus-io/milvus/internal/rootcoord"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// Params is the package-level handle to the component configuration obtained
// from paramtable.Get(); the methods in this file read their settings through it.
var Params *paramtable.ComponentParam = paramtable.Get()
// mixCoordImpl bundles the rootcoord, querycoord, datacoord and streaming
// coordinator servers behind a single object and forwards each RPC to the
// server that owns it.
type mixCoordImpl struct {
// Embedded coordinator servers that the RPC methods delegate to.
rootcoordServer *rootcoord.Core
queryCoordServer *querycoordv2.Server
datacoordServer *datacoord.Server
// streamingCoord is built by initStreamingCoord and set back to nil by GracefulStop.
streamingCoord *streamingcoord.Server
// ctx is derived from the constructor's context; cancel is invoked at the end of Stop.
ctx context.Context
cancel context.CancelFunc
// NOTE(review): wg is not used in the visible part of the file — confirm before relying on it.
wg sync.WaitGroup
// Metadata store clients; which one backs metaKVCreator is chosen in initKVCreator.
etcdCli *clientv3.Client
tikvCli *txnkv.Client
// address is stored by SetAddress and passed to the session in initSession.
address string
// NOTE(review): the proxy and metrics-cache fields below are not referenced in
// the visible part of the file — their usage should be confirmed elsewhere.
proxyCreator proxyutil.ProxyCreator
proxyWatcher *proxyutil.ProxyWatcher
proxyClientManager proxyutil.ProxyClientManagerInterface
metricsCacheManager *metricsinfo.MetricsCacheManager
// stateCode holds a commonpb.StateCode stored as int32 (see UpdateStateCode / GetStateCode).
stateCode atomic.Int32
// initOnce guards the call to initInternal made from Init or from activateFunc.
initOnce sync.Once
// NOTE(review): startOnce is not used in the visible part of the file.
startOnce sync.Once
// session is created in initSession and shared with the three embedded servers.
session *sessionutil.Session
factory dependency.Factory
// enableActiveStandBy is read from Params.MixCoordCfg.EnableActiveStandby at construction.
enableActiveStandBy bool
// activateFunc is assigned in Init when active-standby is enabled and handed to
// session.ProcessActiveStandBy in Register.
activateFunc func() error
// NOTE(review): metricsRequest is not used in the visible part of the file.
metricsRequest *metricsinfo.MetricsRequest
// metaKVCreator builds a MetaKv on demand; it is set by initKVCreator unless already non-nil.
metaKVCreator func() kv.MetaKv
// mixCoordClient is injected via SetMixCoordClient and handed to the streaming coordinator builder.
mixCoordClient types.MixCoordClient
}
// NewMixCoordServer constructs a mixCoordImpl together with its embedded
// rootcoord, querycoord and datacoord servers.
//
// A cancellable context is derived from c and stored on the returned value;
// its cancel function is called by Stop. The rootcoord core is created with
// the derived context, while the querycoord and datacoord servers receive the
// parent context c.
//
// If either sub-server constructor reports an error, the derived context is
// cancelled so it does not leak, and the wrapped error is returned with a nil
// server.
func NewMixCoordServer(c context.Context, factory dependency.Factory) (*mixCoordImpl, error) {
	ctx, cancel := context.WithCancel(c)

	rootCoordServer, err := rootcoord.NewCore(ctx, factory)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("create rootcoord core: %w", err)
	}

	queryCoordServer, err := querycoordv2.NewQueryCoord(c)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("create querycoord server: %w", err)
	}

	dataCoordServer := datacoord.CreateServer(c, factory)

	return &mixCoordImpl{
		ctx:                 ctx,
		cancel:              cancel,
		rootcoordServer:     rootCoordServer,
		queryCoordServer:    queryCoordServer,
		datacoordServer:     dataCoordServer,
		enableActiveStandBy: Params.MixCoordCfg.EnableActiveStandby.GetAsBool(),
		factory:             factory,
	}, nil
}
// Register registers the mixcoord session and arranges the post-registration
// work: bumping the NumNodes metric and starting a liveness check whose
// callback logs and exits the process with status 1.
//
// Without active-standby the post-registration work runs synchronously. With
// active-standby it runs in a background goroutine after
// session.ProcessActiveStandBy returns; a failure there is logged and then
// panics. Register itself always returns nil.
func (s *mixCoordImpl) Register() error {
	logger := log.Ctx(s.ctx)
	s.session.Register()

	onRegistered := func() {
		nodeLabel := fmt.Sprint(paramtable.GetNodeID())
		metrics.NumNodes.WithLabelValues(nodeLabel, typeutil.MixCoordRole).Inc()
		logger.Info("MixCoord Register Finished")
		s.session.LivenessCheck(s.ctx, func() {
			logger.Error("MixCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID()))
			os.Exit(1)
		})
	}

	if !s.enableActiveStandBy {
		onRegistered()
		return nil
	}

	go func() {
		err := s.session.ProcessActiveStandBy(s.activateFunc)
		if err != nil {
			logger.Error("failed to activate standby server", zap.Error(err))
			panic(err)
		}
		onRegistered()
	}()
	return nil
}
// Init prepares the session, the dependency factory, the meta KV creator and
// the streaming coordinator, and then either initializes the embedded
// coordinators immediately or defers that until activation.
//
// With active-standby disabled, initInternal runs once (guarded by initOnce)
// and its error is returned. With active-standby enabled, Init only installs
// activateFunc, which performs that guarded initialization later, switches the
// state code to StandBy and returns nil.
func (s *mixCoordImpl) Init() error {
	logger := log.Ctx(s.ctx)

	if err := s.initSession(); err != nil {
		return err
	}
	s.factory.Init(Params)
	s.initKVCreator()
	s.initStreamingCoord()

	if !s.enableActiveStandBy {
		var initErr error
		s.initOnce.Do(func() {
			initErr = s.initInternal()
			if initErr != nil {
				logger.Error("mixCoord init failed", zap.Error(initErr))
			}
		})
		return initErr
	}

	s.activateFunc = func() error {
		logger.Info("mixCoord switch from standby to active, activating")
		var activateErr error
		s.initOnce.Do(func() {
			activateErr = s.initInternal()
			if activateErr != nil {
				logger.Error("mixCoord init failed", zap.Error(activateErr))
			}
		})
		if activateErr != nil {
			return activateErr
		}
		logger.Info("mixCoord startup success", zap.String("address", s.session.GetAddress()))
		return nil
	}
	s.UpdateStateCode(commonpb.StateCode_StandBy)
	logger.Info("MixCoord enter standby mode successfully")
	return nil
}
// initInternal hands this mixCoordImpl to the three embedded coordinators via
// SetMixCoord and then brings the components up in a fixed order: streaming
// coordinator start, rootcoord init/start, datacoord init/start, querycoord
// init/start. The first failing step is logged and its error returned; later
// steps are not attempted.
func (s *mixCoordImpl) initInternal() error {
	logger := log.Ctx(s.ctx)

	s.rootcoordServer.SetMixCoord(s)
	s.datacoordServer.SetMixCoord(s)
	s.queryCoordServer.SetMixCoord(s)

	// Ordered startup sequence; each entry pairs the step with the message
	// logged when it fails.
	steps := []struct {
		failMsg string
		run     func() error
	}{
		{"streamCoord start failed", func() error { return s.streamingCoord.Start(s.ctx) }},
		{"rootCoord init failed", s.rootcoordServer.Init},
		{"rootCoord start failed", s.rootcoordServer.Start},
		{"dataCoord init failed", s.datacoordServer.Init},
		{"dataCoord start failed", s.datacoordServer.Start},
		{"queryCoord init failed", s.queryCoordServer.Init},
		{"queryCoord start failed", s.queryCoordServer.Start},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			logger.Error(step.failMsg, zap.Error(err))
			return err
		}
	}
	return nil
}
// initKVCreator installs metaKVCreator unless one is already set. The creator
// builds a TiKV-backed MetaKv when the configured meta store type equals
// util.MetaStoreTypeTiKV and an etcd-backed MetaKv otherwise. Root path and
// request timeout are read from configuration each time the creator is called.
func (s *mixCoordImpl) initKVCreator() {
	if s.metaKVCreator != nil {
		return
	}

	useTiKV := Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV
	if useTiKV {
		s.metaKVCreator = func() kv.MetaKv {
			rootPath := Params.TiKVCfg.MetaRootPath.GetValue()
			timeout := paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)
			return tikv.NewTiKV(s.tikvCli, rootPath, tikv.WithRequestTimeout(timeout))
		}
		return
	}

	s.metaKVCreator = func() kv.MetaKv {
		rootPath := Params.EtcdCfg.MetaRootPath.GetValue()
		timeout := paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)
		return etcdkv.NewEtcdKV(s.etcdCli, rootPath, etcdkv.WithRequestTimeout(timeout))
	}
}
// Start marks the mixcoord as Healthy. It performs no other work and always
// returns nil.
func (s *mixCoordImpl) Start() error {
	s.UpdateStateCode(commonpb.StateCode_Healthy)
	return nil
}
// Stop shuts the mixcoord down: it first runs GracefulStop, then stops the
// querycoord, datacoord and rootcoord servers in that order, and finally
// cancels the mixcoord context. Failures from the individual servers are
// logged but do not abort the sequence; Stop always returns nil.
func (s *mixCoordImpl) Stop() error {
	log.Info("graceful stop")
	s.GracefulStop()
	log.Info("graceful stop done")

	// Ordered shutdown sequence; each entry pairs the stop call with the
	// message logged when it fails.
	shutdowns := []struct {
		failMsg string
		stop    func() error
	}{
		{"Failed to stop queryCoord", s.queryCoordServer.Stop},
		{"Failed to stop dataCoord", s.datacoordServer.Stop},
		{"Failed to stop rootCoord", s.rootcoordServer.Stop},
	}
	for _, sd := range shutdowns {
		if err := sd.stop(); err != nil {
			log.Error(sd.failMsg, zap.Error(err))
		}
	}

	s.cancel()
	return nil
}
// initStreamingCoord builds the streaming coordinator server from the etcd
// client, a MetaKv produced by metaKVCreator, the session, and a future that
// is immediately resolved with the current mixCoordClient.
func (s *mixCoordImpl) initStreamingCoord() {
	clientFuture := syncutil.NewFuture[types.MixCoordClient]()
	clientFuture.Set(s.mixCoordClient)

	builder := streamingcoord.NewServerBuilder().
		WithETCD(s.etcdCli).
		WithMetaKV(s.metaKVCreator())
	s.streamingCoord = builder.
		WithSession(s.session).
		WithMixCoordClient(clientFuture).
		Build()
}
// initSession creates the mixcoord session, initializes it with the mixcoord
// role and the configured address, applies the active-standby flag, and shares
// the same session with the rootcoord, datacoord and querycoord servers. It
// always returns nil.
func (s *mixCoordImpl) initSession() error {
	sess := sessionutil.NewSession(s.ctx)
	s.session = sess

	sess.Init(typeutil.MixCoordRole, s.address, true, true)
	sess.SetEnableActiveStandBy(s.enableActiveStandBy)

	s.rootcoordServer.SetSession(sess)
	s.datacoordServer.SetSession(sess)
	s.queryCoordServer.SetSession(sess)
	return nil
}
// startHealthCheck is an empty placeholder: its body contains no statements.
func (s *mixCoordImpl) startHealthCheck() {
}
// SetAddress records the address on the mixcoord and propagates it to the
// rootcoord, datacoord and querycoord servers.
func (s *mixCoordImpl) SetAddress(address string) {
s.address = address
s.rootcoordServer.SetAddress(address)
s.datacoordServer.SetAddress(address)
s.queryCoordServer.SetAddress(address)
}
// SetEtcdClient stores the etcd client on the mixcoord and passes the same
// client to the rootcoord, datacoord and querycoord servers.
func (s *mixCoordImpl) SetEtcdClient(client *clientv3.Client) {
s.etcdCli = client
s.rootcoordServer.SetEtcdClient(client)
s.datacoordServer.SetEtcdClient(client)
s.queryCoordServer.SetEtcdClient(client)
}
// SetTiKVClient stores the TiKV client on the mixcoord and passes the same
// client to the rootcoord, datacoord and querycoord servers.
func (s *mixCoordImpl) SetTiKVClient(client *txnkv.Client) {
s.tikvCli = client
s.rootcoordServer.SetTiKVClient(client)
s.datacoordServer.SetTiKVClient(client)
s.queryCoordServer.SetTiKVClient(client)
}
// SetMixCoordClient stores the mixcoord client; initStreamingCoord later hands
// it to the streaming coordinator builder.
func (s *mixCoordImpl) SetMixCoordClient(client types.MixCoordClient) {
s.mixCoordClient = client
}
// GetServerID returns the node ID reported by paramtable.GetNodeID().
func (s *mixCoordImpl) GetServerID() int64 {
return paramtable.GetNodeID()
}
// UpdateStateCode atomically stores the given state code (as int32).
func (s *mixCoordImpl) UpdateStateCode(code commonpb.StateCode) {
s.stateCode.Store(int32(code))
}
// GetStateCode atomically loads the current state code.
func (s *mixCoordImpl) GetStateCode() commonpb.StateCode {
return commonpb.StateCode(s.stateCode.Load())
}
// GracefulStop stops the streaming coordinator, if one is present, and clears
// the reference so that a subsequent call does nothing.
func (s *mixCoordImpl) GracefulStop() {
	if s.streamingCoord == nil {
		return
	}
	s.streamingCoord.Stop()
	s.streamingCoord = nil
}
// RootCoordServer

// CreateCollection delegates to the rootcoord server's CreateCollection and
// returns its result unchanged.
func (s *mixCoordImpl) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
return s.rootcoordServer.CreateCollection(ctx, req)
}
// DropCollection delegates to the rootcoord server's DropCollection and
// returns its result unchanged.
func (s *mixCoordImpl) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return s.rootcoordServer.DropCollection(ctx, req)
}
// HasCollection delegates to the rootcoord server's HasCollection and returns
// its result unchanged.
func (s *mixCoordImpl) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
return s.rootcoordServer.HasCollection(ctx, req)
}
// DescribeCollection delegates to the rootcoord server's DescribeCollection
// and returns its result unchanged.
func (s *mixCoordImpl) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return s.rootcoordServer.DescribeCollection(ctx, req)
}
// ShowCollections delegates to the rootcoord server's ShowCollections and
// returns its result unchanged.
func (s *mixCoordImpl) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
return s.rootcoordServer.ShowCollections(ctx, req)
}
// ShowCollectionIDs delegates to the rootcoord server's ShowCollectionIDs and
// returns its result unchanged.
func (s *mixCoordImpl) ShowCollectionIDs(ctx context.Context, req *rootcoordpb.ShowCollectionIDsRequest) (*rootcoordpb.ShowCollectionIDsResponse, error) {
return s.rootcoordServer.ShowCollectionIDs(ctx, req)
}
// AlterCollection delegates to the rootcoord server's AlterCollection and
// returns its result unchanged.
func (s *mixCoordImpl) AlterCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
return s.rootcoordServer.AlterCollection(ctx, req)
}
// AlterCollectionField delegates to the rootcoord server's
// AlterCollectionField and returns its result unchanged.
func (s *mixCoordImpl) AlterCollectionField(ctx context.Context, req *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) {
return s.rootcoordServer.AlterCollectionField(ctx, req)
}
// CreatePartition delegates to the rootcoord server's CreatePartition and
// returns its result unchanged.
func (s *mixCoordImpl) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
return s.rootcoordServer.CreatePartition(ctx, req)
}
// DropPartition delegates to the rootcoord server's DropPartition and returns
// its result unchanged.
func (s *mixCoordImpl) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
return s.rootcoordServer.DropPartition(ctx, req)
}
// HasPartition delegates to the rootcoord server's HasPartition and returns
// its result unchanged.
func (s *mixCoordImpl) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
return s.rootcoordServer.HasPartition(ctx, req)
}
// ShowPartitions delegates to the rootcoord server's ShowPartitions and
// returns its result unchanged.
func (s *mixCoordImpl) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
return s.rootcoordServer.ShowPartitions(ctx, req)
}
// ShowPartitionsInternal delegates to the rootcoord server's
// ShowPartitionsInternal and returns its result unchanged.
func (s *mixCoordImpl) ShowPartitionsInternal(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
return s.rootcoordServer.ShowPartitionsInternal(ctx, req)
}
// AllocTimestamp delegates to the rootcoord server's AllocTimestamp and
// returns its result unchanged.
func (s *mixCoordImpl) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
return s.rootcoordServer.AllocTimestamp(ctx, req)
}
// AllocID delegates to the rootcoord server's AllocID and returns its result
// unchanged.
func (s *mixCoordImpl) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
return s.rootcoordServer.AllocID(ctx, req)
}
// UpdateChannelTimeTick delegates to the rootcoord server's
// UpdateChannelTimeTick and returns its result unchanged.
func (s *mixCoordImpl) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
return s.rootcoordServer.UpdateChannelTimeTick(ctx, req)
}
// ShowSegments delegates to the rootcoord server's ShowSegments and returns
// its result unchanged.
func (s *mixCoordImpl) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
return s.rootcoordServer.ShowSegments(ctx, req)
}
// GetPChannelInfo delegates to the rootcoord server's GetPChannelInfo and
// returns its result unchanged.
func (s *mixCoordImpl) GetPChannelInfo(ctx context.Context, req *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
return s.rootcoordServer.GetPChannelInfo(ctx, req)
}
// InvalidateCollectionMetaCache delegates to the rootcoord server's
// InvalidateCollectionMetaCache and returns its result unchanged.
func (s *mixCoordImpl) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return s.rootcoordServer.InvalidateCollectionMetaCache(ctx, req)
}
// ShowConfigurations delegates to the rootcoord server's ShowConfigurations
// and returns its result unchanged.
func (s *mixCoordImpl) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
return s.rootcoordServer.ShowConfigurations(ctx, req)
}
// CreateAlias delegates to the rootcoord server's CreateAlias and returns its
// result unchanged.
func (s *mixCoordImpl) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
return s.rootcoordServer.CreateAlias(ctx, in)
}
// DescribeCollectionInternal delegates to the rootcoord server's
// DescribeCollectionInternal and returns its result unchanged.
func (s *mixCoordImpl) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return s.rootcoordServer.DescribeCollectionInternal(ctx, in)
}
// DropAlias drops a collection alias by delegating to the rootcoord server's
// DropAlias; the response and error are returned unchanged.
func (s *mixCoordImpl) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	return s.rootcoordServer.DropAlias(ctx, in)
}
// AlterAlias alters a collection alias by delegating to the rootcoord server's
// AlterAlias; the response and error are returned unchanged.
func (s *mixCoordImpl) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	return s.rootcoordServer.AlterAlias(ctx, in)
}
// DescribeAlias describes a collection alias by delegating to the rootcoord
// server's DescribeAlias; the response and error are returned unchanged.
func (s *mixCoordImpl) DescribeAlias(ctx context.Context, in *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) {
	return s.rootcoordServer.DescribeAlias(ctx, in)
}
// ListAliases lists aliases by delegating to the rootcoord server's
// ListAliases; the response and error are returned unchanged.
func (s *mixCoordImpl) ListAliases(ctx context.Context, in *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) {
	return s.rootcoordServer.ListAliases(ctx, in)
}
// AddCollectionField delegates to the rootcoord server's AddCollectionField
// and returns its result unchanged.
func (s *mixCoordImpl) AddCollectionField(ctx context.Context, in *milvuspb.AddCollectionFieldRequest) (*commonpb.Status, error) {
	return s.rootcoordServer.AddCollectionField(ctx, in)
}
// CreateCredential delegates to the rootcoord server's CreateCredential and
// returns its result unchanged.
func (s *mixCoordImpl) CreateCredential(ctx context.Context, req *internalpb.CredentialInfo) (*commonpb.Status, error) {
return s.rootcoordServer.CreateCredential(ctx, req)
}
// GetCredential delegates to the rootcoord server's GetCredential and returns
// its result unchanged.
func (s *mixCoordImpl) GetCredential(ctx context.Context, req *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
return s.rootcoordServer.GetCredential(ctx, req)
}
// UpdateCredential delegates to the rootcoord server's UpdateCredential and
// returns its result unchanged.
func (s *mixCoordImpl) UpdateCredential(ctx context.Context, req *internalpb.CredentialInfo) (*commonpb.Status, error) {
return s.rootcoordServer.UpdateCredential(ctx, req)
}
// DeleteCredential delegates to the rootcoord server's DeleteCredential and
// returns its result unchanged.
func (s *mixCoordImpl) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
return s.rootcoordServer.DeleteCredential(ctx, req)
}
// ListCredUsers delegates to the rootcoord server's ListCredUsers and returns
// its result unchanged.
func (s *mixCoordImpl) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
return s.rootcoordServer.ListCredUsers(ctx, req)
}
// CreateRole delegates to the rootcoord server's CreateRole and returns its
// result unchanged.
func (s *mixCoordImpl) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
return s.rootcoordServer.CreateRole(ctx, req)
}
// DropRole delegates to the rootcoord server's DropRole and returns its result
// unchanged.
func (s *mixCoordImpl) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
return s.rootcoordServer.DropRole(ctx, req)
}
// OperateUserRole delegates to the rootcoord server's OperateUserRole and
// returns its result unchanged.
func (s *mixCoordImpl) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
return s.rootcoordServer.OperateUserRole(ctx, req)
}
// SelectRole delegates to the rootcoord server's SelectRole and returns its
// result unchanged.
func (s *mixCoordImpl) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
return s.rootcoordServer.SelectRole(ctx, req)
}
// SelectUser delegates to the rootcoord server's SelectUser and returns its
// result unchanged.
func (s *mixCoordImpl) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
return s.rootcoordServer.SelectUser(ctx, req)
}
// OperatePrivilege delegates to the rootcoord server's OperatePrivilege and
// returns its result unchanged.
func (s *mixCoordImpl) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
return s.rootcoordServer.OperatePrivilege(ctx, req)
}
// SelectGrant delegates to the rootcoord server's SelectGrant and returns its
// result unchanged.
func (s *mixCoordImpl) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
return s.rootcoordServer.SelectGrant(ctx, req)
}
// ListPolicy delegates to the rootcoord server's ListPolicy and returns its
// result unchanged.
func (s *mixCoordImpl) ListPolicy(ctx context.Context, req *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
return s.rootcoordServer.ListPolicy(ctx, req)
}
// CheckHealth delegates to the rootcoord server's CheckHealth and returns its
// result unchanged. Only the rootcoord server is consulted here.
func (s *mixCoordImpl) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.rootcoordServer.CheckHealth(ctx, req)
}
// RenameCollection delegates to the rootcoord server's RenameCollection and
// returns its result unchanged.
func (s *mixCoordImpl) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
return s.rootcoordServer.RenameCollection(ctx, req)
}
// CreateDatabase delegates to the rootcoord server's CreateDatabase and
// returns its result unchanged.
func (s *mixCoordImpl) CreateDatabase(ctx context.Context, req *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) {
return s.rootcoordServer.CreateDatabase(ctx, req)
}
// DropDatabase delegates to the rootcoord server's DropDatabase and returns
// its result unchanged.
func (s *mixCoordImpl) DropDatabase(ctx context.Context, req *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) {
return s.rootcoordServer.DropDatabase(ctx, req)
}
// ListDatabases delegates to the rootcoord server's ListDatabases and returns
// its result unchanged.
func (s *mixCoordImpl) ListDatabases(ctx context.Context, req *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) {
return s.rootcoordServer.ListDatabases(ctx, req)
}
// DescribeDatabase delegates to the rootcoord server's DescribeDatabase and
// returns its result unchanged.
func (s *mixCoordImpl) DescribeDatabase(ctx context.Context, req *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) {
return s.rootcoordServer.DescribeDatabase(ctx, req)
}
// AlterDatabase delegates to the rootcoord server's AlterDatabase and returns
// its result unchanged.
func (s *mixCoordImpl) AlterDatabase(ctx context.Context, req *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) {
return s.rootcoordServer.AlterDatabase(ctx, req)
}
// BackupRBAC delegates to the rootcoord server's BackupRBAC and returns its
// result unchanged.
func (s *mixCoordImpl) BackupRBAC(ctx context.Context, req *milvuspb.BackupRBACMetaRequest) (*milvuspb.BackupRBACMetaResponse, error) {
return s.rootcoordServer.BackupRBAC(ctx, req)
}
// RestoreRBAC delegates to the rootcoord server's RestoreRBAC and returns its
// result unchanged.
func (s *mixCoordImpl) RestoreRBAC(ctx context.Context, req *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) {
return s.rootcoordServer.RestoreRBAC(ctx, req)
}
// CreatePrivilegeGroup delegates to the rootcoord server's
// CreatePrivilegeGroup and returns its result unchanged.
func (s *mixCoordImpl) CreatePrivilegeGroup(ctx context.Context, req *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
return s.rootcoordServer.CreatePrivilegeGroup(ctx, req)
}
// DropPrivilegeGroup delegates to the rootcoord server's DropPrivilegeGroup
// and returns its result unchanged.
func (s *mixCoordImpl) DropPrivilegeGroup(ctx context.Context, req *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) {
return s.rootcoordServer.DropPrivilegeGroup(ctx, req)
}
// ListPrivilegeGroups delegates to the rootcoord server's ListPrivilegeGroups
// and returns its result unchanged.
func (s *mixCoordImpl) ListPrivilegeGroups(ctx context.Context, req *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) {
return s.rootcoordServer.ListPrivilegeGroups(ctx, req)
}
// OperatePrivilegeGroup delegates to the rootcoord server's
// OperatePrivilegeGroup and returns its result unchanged.
func (s *mixCoordImpl) OperatePrivilegeGroup(ctx context.Context, req *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error) {
return s.rootcoordServer.OperatePrivilegeGroup(ctx, req)
}
// GetComponentStates reports the mixcoord's current state code. The node ID is
// the session's ServerID once the session exists and is registered, and
// common.NotRegisteredID otherwise. The same component info is reported both
// as the top-level State and as the single entry of SubcomponentStates (as two
// separate values). The returned error is always nil.
func (s *mixCoordImpl) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
	state := s.GetStateCode()
	log.Ctx(ctx).Debug("Mix coord current state", zap.String("StateCode", state.String()))

	id := common.NotRegisteredID
	if sess := s.session; sess != nil && sess.Registered() {
		id = sess.ServerID
	}

	// newInfo builds a fresh ComponentInfo on each call so the two places it
	// is used do not share a pointer.
	newInfo := func() *milvuspb.ComponentInfo {
		return &milvuspb.ComponentInfo{
			NodeID:    id,
			Role:      typeutil.MixCoordRole,
			StateCode: state,
			ExtraInfo: nil,
		}
	}

	states := &milvuspb.ComponentStates{
		State:              newInfo(),
		Status:             merr.Success(),
		SubcomponentStates: []*milvuspb.ComponentInfo{newInfo()},
	}
	return states, nil
}
// GetTimeTickChannel returns the time-tick channel name, read from
// Params.CommonCfg.RootCoordTimeTick, with a success status. The returned
// error is always nil.
func (s *mixCoordImpl) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: merr.Success(),
		Value:  Params.CommonCfg.RootCoordTimeTick.GetValue(),
	}, nil
}
// GetStatisticsChannel returns the statistics channel name, read from
// Params.CommonCfg.RootCoordStatistics, with a success status. The returned
// error is always nil.
func (s *mixCoordImpl) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
	status := merr.Success()
	channel := Params.CommonCfg.RootCoordStatistics.GetValue()

	resp := &milvuspb.StringResponse{
		Status: status,
		Value:  channel,
	}
	return resp, nil
}
// GetMetrics answers a metrics request.
//
// If the request names querycoord or datacoord as the processing role, the
// request is handed to that coordinator alone. Otherwise metrics are fetched
// from rootcoord, datacoord and querycoord, and their topologies are merged
// into one system topology: each coordinator that answered and whose response
// could be unmarshalled becomes a node with Forward edges to the other
// available coordinators, and datacoord/querycoord additionally get one node
// and one CoordConnectToNode edge per connected worker node.
//
// The returned error is always nil on the merged path; a marshalling failure
// is reported through the response Status.
func (s *mixCoordImpl) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	topology := metricsinfo.SystemTopology{
		NodesInfo: make([]metricsinfo.SystemTopologyNode, 0),
	}

	// The parse error is discarded; only an exact role match redirects the
	// request to a single coordinator.
	parsed := gjson.Parse(in.GetRequest())
	role, _ := metricsinfo.ParseMetricProcessInRole(parsed)
	if len(role) > 0 {
		switch role {
		case typeutil.QueryCoordRole:
			return s.GetQcMetrics(ctx, in)
		case typeutil.DataCoordRole:
			return s.GetDcMetrics(ctx, in)
		}
	}

	// ids maps a component name to the identifier used in the topology graph.
	ids := make(map[string]int)

	var rootTopo metricsinfo.RootCoordTopology
	rootName := ""
	rootResp, rootErr := s.rootcoordServer.GetMetrics(ctx, in)
	if rootErr == nil && rootResp != nil {
		rootName = rootResp.GetComponentName()
		rootErr = metricsinfo.UnmarshalTopology(rootResp.GetResponse(), &rootTopo)
		ids[rootName] = int(rootTopo.Self.ID)
	}

	var dataTopo metricsinfo.DataCoordTopology
	dataName := ""
	dataResp, dataErr := s.datacoordServer.GetMetrics(ctx, in)
	if dataErr == nil && dataResp != nil {
		dataName = dataResp.GetComponentName()
		dataErr = metricsinfo.UnmarshalTopology(dataResp.GetResponse(), &dataTopo)
		ids[dataName] = int(dataTopo.Cluster.Self.ID)
	}

	var queryTopo metricsinfo.QueryCoordTopology
	queryName := ""
	queryResp, queryErr := s.queryCoordServer.GetMetrics(ctx, in)
	if queryErr == nil && queryResp != nil {
		queryName = queryResp.GetComponentName()
		queryErr = metricsinfo.UnmarshalTopology(queryResp.GetResponse(), &queryTopo)
		ids[queryName] = int(queryTopo.Cluster.Self.ID)
	}

	// A coordinator takes part in the graph only if both the fetch and the
	// unmarshal succeeded; none of these values change past this point.
	rootOK := rootErr == nil && rootResp != nil
	dataOK := dataErr == nil && dataResp != nil
	queryOK := queryErr == nil && queryResp != nil

	// forward builds a coordinator-to-coordinator edge; the identifier is
	// looked up at call time.
	forward := func(name string, targetRole string) metricsinfo.ConnectionEdge {
		return metricsinfo.ConnectionEdge{
			ConnectedIdentifier: ids[name],
			Type:                metricsinfo.Forward,
			TargetType:          targetRole,
		}
	}

	if rootOK {
		rootNode := metricsinfo.SystemTopologyNode{
			Identifier: ids[rootName],
			Connected:  make([]metricsinfo.ConnectionEdge, 0),
			Infos:      &rootTopo.Self,
		}
		if dataOK {
			rootNode.Connected = append(rootNode.Connected, forward(dataName, typeutil.DataCoordRole))
		}
		if queryOK {
			rootNode.Connected = append(rootNode.Connected, forward(queryName, typeutil.QueryCoordRole))
		}
		topology.NodesInfo = append(topology.NodesInfo, rootNode)
	}

	if dataOK {
		dataCoordNode := metricsinfo.SystemTopologyNode{
			Identifier: ids[dataName],
			Connected:  make([]metricsinfo.ConnectionEdge, 0),
			Infos:      &dataTopo.Cluster.Self,
		}
		if rootOK {
			dataCoordNode.Connected = append(dataCoordNode.Connected, forward(rootName, typeutil.RootCoordRole))
		}
		if queryOK {
			dataCoordNode.Connected = append(dataCoordNode.Connected, forward(queryName, typeutil.QueryCoordRole))
		}
		for _, dn := range dataTopo.Cluster.ConnectedDataNodes {
			info := dn // per-iteration copy so &info stays distinct for every node
			id := int(info.ID)
			ids[dn.Name] = id
			topology.NodesInfo = append(topology.NodesInfo, metricsinfo.SystemTopologyNode{
				Identifier: id,
				Connected:  nil,
				Infos:      &info,
			})
			dataCoordNode.Connected = append(dataCoordNode.Connected, metricsinfo.ConnectionEdge{
				ConnectedIdentifier: id,
				Type:                metricsinfo.CoordConnectToNode,
				TargetType:          typeutil.DataNodeRole,
			})
		}
		topology.NodesInfo = append(topology.NodesInfo, dataCoordNode)
	}

	if queryOK {
		queryCoordNode := metricsinfo.SystemTopologyNode{
			Identifier: ids[queryName],
			Connected:  make([]metricsinfo.ConnectionEdge, 0),
			Infos:      &queryTopo.Cluster.Self,
		}
		if rootOK {
			queryCoordNode.Connected = append(queryCoordNode.Connected, forward(rootName, typeutil.RootCoordRole))
		}
		if dataOK {
			queryCoordNode.Connected = append(queryCoordNode.Connected, forward(dataName, typeutil.DataCoordRole))
		}
		for _, qn := range queryTopo.Cluster.ConnectedNodes {
			info := qn // per-iteration copy so &info stays distinct for every node
			id := int(info.ID)
			ids[qn.Name] = id
			topology.NodesInfo = append(topology.NodesInfo, metricsinfo.SystemTopologyNode{
				Identifier: id,
				Connected:  nil,
				Infos:      &info,
			})
			queryCoordNode.Connected = append(queryCoordNode.Connected, metricsinfo.ConnectionEdge{
				ConnectedIdentifier: id,
				Type:                metricsinfo.CoordConnectToNode,
				TargetType:          typeutil.QueryNodeRole,
			})
		}
		topology.NodesInfo = append(topology.NodesInfo, queryCoordNode)
	}

	payload, err := metricsinfo.MarshalTopology(topology)
	result := &milvuspb.GetMetricsResponse{
		ComponentName: metricsinfo.ConstructComponentName(typeutil.MixCoordRole, paramtable.GetNodeID()),
	}
	if err != nil {
		result.Status = merr.Status(err)
		result.Response = ""
		return result, nil
	}
	result.Status = merr.Success()
	result.Response = payload
	return result, nil
}
// GetDcMetrics delegates the metrics request to the datacoord server's
// GetMetrics and returns its result unchanged.
func (s *mixCoordImpl) GetDcMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
return s.datacoordServer.GetMetrics(ctx, in)
}
// GetQcMetrics delegates the metrics request to the querycoord server's
// GetMetrics and returns its result unchanged.
func (s *mixCoordImpl) GetQcMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
return s.queryCoordServer.GetMetrics(ctx, in)
}
// QueryCoordServer

// ActivateChecker delegates to the querycoord server's ActivateChecker and
// returns its result unchanged.
func (s *mixCoordImpl) ActivateChecker(ctx context.Context, req *querypb.ActivateCheckerRequest) (*commonpb.Status, error) {
return s.queryCoordServer.ActivateChecker(ctx, req)
}
// DeactivateChecker delegates to the querycoord server's DeactivateChecker and
// returns its result unchanged.
func (s *mixCoordImpl) DeactivateChecker(ctx context.Context, req *querypb.DeactivateCheckerRequest) (*commonpb.Status, error) {
return s.queryCoordServer.DeactivateChecker(ctx, req)
}
// ListCheckers delegates to the querycoord server's ListCheckers and returns
// its result unchanged.
func (s *mixCoordImpl) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error) {
return s.queryCoordServer.ListCheckers(ctx, req)
}
// ShowLoadCollections delegates to the querycoord server's ShowLoadCollections
// and returns its result unchanged.
func (s *mixCoordImpl) ShowLoadCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return s.queryCoordServer.ShowLoadCollections(ctx, req)
}
// LoadCollection delegates to the querycoord server's LoadCollection and
// returns its result unchanged.
func (s *mixCoordImpl) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
return s.queryCoordServer.LoadCollection(ctx, req)
}
// ReleaseCollection delegates to the querycoord server's ReleaseCollection and
// returns its result unchanged.
func (s *mixCoordImpl) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
return s.queryCoordServer.ReleaseCollection(ctx, req)
}
// ShowLoadPartitions delegates to the querycoord server's ShowLoadPartitions
// and returns its result unchanged.
func (s *mixCoordImpl) ShowLoadPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return s.queryCoordServer.ShowLoadPartitions(ctx, req)
}
// LoadPartitions delegates to the querycoord server's LoadPartitions and
// returns its result unchanged.
func (s *mixCoordImpl) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
return s.queryCoordServer.LoadPartitions(ctx, req)
}
// ReleasePartitions delegates to the querycoord server's ReleasePartitions and
// returns its result unchanged.
func (s *mixCoordImpl) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
return s.queryCoordServer.ReleasePartitions(ctx, req)
}
// SyncNewCreatedPartition delegates to the querycoord server's
// SyncNewCreatedPartition and returns its result unchanged.
func (s *mixCoordImpl) SyncNewCreatedPartition(ctx context.Context, req *querypb.SyncNewCreatedPartitionRequest) (*commonpb.Status, error) {
return s.queryCoordServer.SyncNewCreatedPartition(ctx, req)
}
// GetPartitionStates delegates to the querycoord server's GetPartitionStates
// and returns its result unchanged.
func (s *mixCoordImpl) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
return s.queryCoordServer.GetPartitionStates(ctx, req)
}
// GetLoadSegmentInfo delegates to the querycoord server's GetLoadSegmentInfo
// and returns its result unchanged.
func (s *mixCoordImpl) GetLoadSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
return s.queryCoordServer.GetLoadSegmentInfo(ctx, req)
}
// LoadBalance delegates to the querycoord server's LoadBalance and returns its
// result unchanged.
func (s *mixCoordImpl) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
return s.queryCoordServer.LoadBalance(ctx, req)
}
// GetReplicas delegates to the querycoord server's GetReplicas and returns its
// result unchanged.
func (s *mixCoordImpl) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return s.queryCoordServer.GetReplicas(ctx, req)
}
// GetShardLeaders delegates to the querycoord server's GetShardLeaders and
// returns its result unchanged.
func (s *mixCoordImpl) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return s.queryCoordServer.GetShardLeaders(ctx, req)
}
// CreateResourceGroup delegates to the querycoord server's CreateResourceGroup
// and returns its result unchanged.
func (s *mixCoordImpl) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
return s.queryCoordServer.CreateResourceGroup(ctx, req)
}
// UpdateResourceGroups delegates to the querycoord server's
// UpdateResourceGroups and returns its result unchanged.
func (s *mixCoordImpl) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
return s.queryCoordServer.UpdateResourceGroups(ctx, req)
}
// DropResourceGroup delegates to the querycoord server's DropResourceGroup and
// returns its result unchanged.
func (s *mixCoordImpl) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
return s.queryCoordServer.DropResourceGroup(ctx, req)
}
// TransferNode delegates to the querycoord server's TransferNode and returns
// its result unchanged.
func (s *mixCoordImpl) TransferNode(ctx context.Context, req *milvuspb.TransferNodeRequest) (*commonpb.Status, error) {
return s.queryCoordServer.TransferNode(ctx, req)
}
// TransferReplica delegates to the querycoord server's TransferReplica and
// returns its result unchanged.
func (s *mixCoordImpl) TransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) (*commonpb.Status, error) {
return s.queryCoordServer.TransferReplica(ctx, req)
}
// ListResourceGroups delegates to the querycoord server's ListResourceGroups
// and returns its result unchanged.
func (s *mixCoordImpl) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
return s.queryCoordServer.ListResourceGroups(ctx, req)
}
// DescribeResourceGroup delegates to the querycoord server's
// DescribeResourceGroup and returns its result unchanged.
func (s *mixCoordImpl) DescribeResourceGroup(ctx context.Context, req *querypb.DescribeResourceGroupRequest) (*querypb.DescribeResourceGroupResponse, error) {
return s.queryCoordServer.DescribeResourceGroup(ctx, req)
}
// ListQueryNode delegates to the querycoord server's ListQueryNode and returns
// its result unchanged.
func (s *mixCoordImpl) ListQueryNode(ctx context.Context, req *querypb.ListQueryNodeRequest) (*querypb.ListQueryNodeResponse, error) {
return s.queryCoordServer.ListQueryNode(ctx, req)
}
// GetQueryNodeDistribution delegates to the querycoord server's
// GetQueryNodeDistribution and returns its result unchanged.
func (s *mixCoordImpl) GetQueryNodeDistribution(ctx context.Context, req *querypb.GetQueryNodeDistributionRequest) (*querypb.GetQueryNodeDistributionResponse, error) {
return s.queryCoordServer.GetQueryNodeDistribution(ctx, req)
}
// SuspendBalance delegates to the querycoord server's SuspendBalance and
// returns its result unchanged.
func (s *mixCoordImpl) SuspendBalance(ctx context.Context, req *querypb.SuspendBalanceRequest) (*commonpb.Status, error) {
return s.queryCoordServer.SuspendBalance(ctx, req)
}
// ResumeBalance delegates to the querycoord server's ResumeBalance and returns
// its result unchanged.
func (s *mixCoordImpl) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRequest) (*commonpb.Status, error) {
return s.queryCoordServer.ResumeBalance(ctx, req)
}
// CheckBalanceStatus delegates to the querycoord server's CheckBalanceStatus
// and returns its result unchanged.
func (s *mixCoordImpl) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) {
return s.queryCoordServer.CheckBalanceStatus(ctx, req)
}
// SuspendNode delegates to the querycoord server's SuspendNode and returns its
// result unchanged.
func (s *mixCoordImpl) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) {
return s.queryCoordServer.SuspendNode(ctx, req)
}
// ResumeNode delegates to the querycoord server's ResumeNode and returns its
// result unchanged.
func (s *mixCoordImpl) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest) (*commonpb.Status, error) {
return s.queryCoordServer.ResumeNode(ctx, req)
}
// TransferSegment hands the request to the query coordinator server
// (s.queryCoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) TransferSegment(ctx context.Context, req *querypb.TransferSegmentRequest) (*commonpb.Status, error) {
	qc := s.queryCoordServer
	return qc.TransferSegment(ctx, req)
}
// TransferChannel hands the request to the query coordinator server
// (s.queryCoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) TransferChannel(ctx context.Context, req *querypb.TransferChannelRequest) (*commonpb.Status, error) {
	qc := s.queryCoordServer
	return qc.TransferChannel(ctx, req)
}
// CheckQueryNodeDistribution hands the request to the query coordinator server
// (s.queryCoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) CheckQueryNodeDistribution(ctx context.Context, req *querypb.CheckQueryNodeDistributionRequest) (*commonpb.Status, error) {
	qc := s.queryCoordServer
	return qc.CheckQueryNodeDistribution(ctx, req)
}
// UpdateLoadConfig hands the request to the query coordinator server
// (s.queryCoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error) {
	qc := s.queryCoordServer
	return qc.UpdateLoadConfig(ctx, req)
}
// DataCoordServer

// GetSegmentInfo hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
	dc := s.datacoordServer
	return dc.GetSegmentInfo(ctx, req)
}
// Flush hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
	dc := s.datacoordServer
	return dc.Flush(ctx, req)
}
// AssignSegmentID hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
	dc := s.datacoordServer
	return dc.AssignSegmentID(ctx, req)
}
// GetSegmentStates hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
	dc := s.datacoordServer
	return dc.GetSegmentStates(ctx, req)
}
// GetInsertBinlogPaths hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
	dc := s.datacoordServer
	return dc.GetInsertBinlogPaths(ctx, req)
}
// GetCollectionStatistics hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
	dc := s.datacoordServer
	return dc.GetCollectionStatistics(ctx, req)
}
// GetPartitionStatistics hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
	dc := s.datacoordServer
	return dc.GetPartitionStatistics(ctx, req)
}
// GetSegmentInfoChannel hands the request to the data coordinator server
// (s.datacoordServer) and returns its string response and error unchanged.
func (s *mixCoordImpl) GetSegmentInfoChannel(ctx context.Context, req *datapb.GetSegmentInfoChannelRequest) (*milvuspb.StringResponse, error) {
	dc := s.datacoordServer
	return dc.GetSegmentInfoChannel(ctx, req)
}
// SaveBinlogPaths hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.SaveBinlogPaths(ctx, req)
}
// GetRecoveryInfo hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
	dc := s.datacoordServer
	return dc.GetRecoveryInfo(ctx, req)
}
// GetRecoveryInfoV2 hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryInfoRequestV2) (*datapb.GetRecoveryInfoResponseV2, error) {
	dc := s.datacoordServer
	return dc.GetRecoveryInfoV2(ctx, req)
}
// GetChannelRecoveryInfo hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChannelRecoveryInfoRequest) (*datapb.GetChannelRecoveryInfoResponse, error) {
	dc := s.datacoordServer
	return dc.GetChannelRecoveryInfo(ctx, req)
}
// GetFlushedSegments hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
	dc := s.datacoordServer
	return dc.GetFlushedSegments(ctx, req)
}
// GetSegmentsByStates hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegmentsByStatesRequest) (*datapb.GetSegmentsByStatesResponse, error) {
	dc := s.datacoordServer
	return dc.GetSegmentsByStates(ctx, req)
}
// ManualCompaction hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	dc := s.datacoordServer
	return dc.ManualCompaction(ctx, req)
}
// GetCompactionState hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	dc := s.datacoordServer
	return dc.GetCompactionState(ctx, req)
}
// GetCompactionStateWithPlans hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	dc := s.datacoordServer
	return dc.GetCompactionStateWithPlans(ctx, req)
}
// WatchChannels hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
	dc := s.datacoordServer
	return dc.WatchChannels(ctx, req)
}
// GetFlushState hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	dc := s.datacoordServer
	return dc.GetFlushState(ctx, req)
}
// GetFlushAllState hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
	dc := s.datacoordServer
	return dc.GetFlushAllState(ctx, req)
}
// DropVirtualChannel hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
	dc := s.datacoordServer
	return dc.DropVirtualChannel(ctx, req)
}
// SetSegmentState hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
	dc := s.datacoordServer
	return dc.SetSegmentState(ctx, req)
}
// UpdateSegmentStatistics hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.UpdateSegmentStatistics(ctx, req)
}
// UpdateChannelCheckpoint hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.UpdateChannelCheckpoint(ctx, req)
}
// MarkSegmentsDropped hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.MarkSegmentsDropped(ctx, req)
}
// BroadcastAlteredCollection hands the alter-collection request to the data
// coordinator server (s.datacoordServer) and returns its status and error
// unchanged.
func (s *mixCoordImpl) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.BroadcastAlteredCollection(ctx, req)
}
// GcConfirm hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
	dc := s.datacoordServer
	return dc.GcConfirm(ctx, req)
}
// CreateIndex hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.CreateIndex(ctx, req)
}
// AlterIndex hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.AlterIndex(ctx, req)
}
// GetIndexState hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
	dc := s.datacoordServer
	return dc.GetIndexState(ctx, req)
}
// GetSegmentIndexState hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
	dc := s.datacoordServer
	return dc.GetSegmentIndexState(ctx, req)
}
// GetIndexInfos hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
	dc := s.datacoordServer
	return dc.GetIndexInfos(ctx, req)
}
// DescribeIndex hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
	dc := s.datacoordServer
	return dc.DescribeIndex(ctx, req)
}
// GetIndexStatistics hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexStatisticsRequest) (*indexpb.GetIndexStatisticsResponse, error) {
	dc := s.datacoordServer
	return dc.GetIndexStatistics(ctx, req)
}
// DropIndex hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.DropIndex(ctx, req)
}
// GetIndexBuildProgress hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
	dc := s.datacoordServer
	return dc.GetIndexBuildProgress(ctx, req)
}
// ReportDataNodeTtMsgs hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.ReportDataNodeTtMsgs(ctx, req)
}
// GcControl hands the request to the data coordinator server
// (s.datacoordServer) and returns its status and error unchanged.
func (s *mixCoordImpl) GcControl(ctx context.Context, req *datapb.GcControlRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.GcControl(ctx, req)
}
// ImportV2 hands the internal import request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) ImportV2(ctx context.Context, req *internalpb.ImportRequestInternal) (*internalpb.ImportResponse, error) {
	dc := s.datacoordServer
	return dc.ImportV2(ctx, req)
}
// GetImportProgress hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetImportProgress(ctx context.Context, req *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
	dc := s.datacoordServer
	return dc.GetImportProgress(ctx, req)
}
// ListImports hands the internal list-imports request to the data coordinator
// server (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) ListImports(ctx context.Context, req *internalpb.ListImportsRequestInternal) (*internalpb.ListImportsResponse, error) {
	dc := s.datacoordServer
	return dc.ListImports(ctx, req)
}
// ListIndexes hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) ListIndexes(ctx context.Context, req *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) {
	dc := s.datacoordServer
	return dc.ListIndexes(ctx, req)
}
// AllocSegment hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
	dc := s.datacoordServer
	return dc.AllocSegment(ctx, req)
}
// NotifyDropPartition passes the channel name and partition IDs to the data
// coordinator server (s.datacoordServer) and returns whatever error it reports.
func (s *mixCoordImpl) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error {
	dc := s.datacoordServer
	return dc.NotifyDropPartition(ctx, channel, partitionIDs)
}
// RegisterStreamingCoordGRPCService registers the streaming coordinator's gRPC
// service on the given server by calling RegisterGRPCService on s.streamingCoord.
func (s *mixCoordImpl) RegisterStreamingCoordGRPCService(server *grpc.Server) {
	sc := s.streamingCoord
	sc.RegisterGRPCService(server)
}
// GetQuotaMetrics hands the request to the root coordinator server
// (s.rootcoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQuotaMetricsRequest) (*internalpb.GetQuotaMetricsResponse, error) {
	rc := s.rootcoordServer
	return rc.GetQuotaMetrics(ctx, req)
}
// ListLoadedSegments hands the request to the query coordinator server
// (s.queryCoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
	qc := s.queryCoordServer
	return qc.ListLoadedSegments(ctx, req)
}
// FlushAll hands the request to the data coordinator server
// (s.datacoordServer) and returns its response and error unchanged.
func (s *mixCoordImpl) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*datapb.FlushAllResponse, error) {
	dc := s.datacoordServer
	return dc.FlushAll(ctx, req)
}
// AddFileResource adds a file resource by handing the request to the data
// coordinator server (s.datacoordServer); its status and error are returned
// unchanged.
func (s *mixCoordImpl) AddFileResource(ctx context.Context, req *milvuspb.AddFileResourceRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.AddFileResource(ctx, req)
}
// RemoveFileResource removes a file resource by handing the request to the
// data coordinator server (s.datacoordServer); its status and error are
// returned unchanged.
func (s *mixCoordImpl) RemoveFileResource(ctx context.Context, req *milvuspb.RemoveFileResourceRequest) (*commonpb.Status, error) {
	dc := s.datacoordServer
	return dc.RemoveFileResource(ctx, req)
}
// ListFileResources lists file resources by handing the request to the data
// coordinator server (s.datacoordServer); its response and error are returned
// unchanged.
func (s *mixCoordImpl) ListFileResources(ctx context.Context, req *milvuspb.ListFileResourcesRequest) (*milvuspb.ListFileResourcesResponse, error) {
	dc := s.datacoordServer
	return dc.ListFileResources(ctx, req)
}