package coordinator import ( "context" "fmt" "os" "sync" "time" "github.com/tidwall/gjson" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/datacoord" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/querycoordv2" "github.com/milvus-io/milvus/internal/rootcoord" streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/proxypb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) var Params *paramtable.ComponentParam = paramtable.Get() type mixCoordImpl struct { rootcoordServer *rootcoord.Core queryCoordServer *querycoordv2.Server datacoordServer *datacoord.Server streamingCoord *streamingcoord.Server ctx context.Context cancel context.CancelFunc wg sync.WaitGroup etcdCli *clientv3.Client tikvCli *txnkv.Client address string proxyCreator proxyutil.ProxyCreator proxyWatcher *proxyutil.ProxyWatcher proxyClientManager proxyutil.ProxyClientManagerInterface metricsCacheManager *metricsinfo.MetricsCacheManager stateCode atomic.Int32 initOnce sync.Once startOnce sync.Once session *sessionutil.Session factory dependency.Factory enableActiveStandBy bool activateFunc func() error metricsRequest *metricsinfo.MetricsRequest metaKVCreator func() kv.MetaKv mixCoordClient types.MixCoordClient } func NewMixCoordServer(c context.Context, factory dependency.Factory) (*mixCoordImpl, error) { ctx, cancel := context.WithCancel(c) rootCoordServer, _ := rootcoord.NewCore(ctx, factory) queryCoordServer, _ := querycoordv2.NewQueryCoord(c) dataCoordServer := datacoord.CreateServer(c, factory) return &mixCoordImpl{ ctx: ctx, cancel: cancel, rootcoordServer: rootCoordServer, queryCoordServer: queryCoordServer, datacoordServer: dataCoordServer, enableActiveStandBy: Params.MixCoordCfg.EnableActiveStandby.GetAsBool(), factory: factory, }, nil } // Register register mixcoord at etcd func (s *mixCoordImpl) Register() error { log := log.Ctx(s.ctx) s.session.Register() afterRegister := func() { metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.MixCoordRole).Inc() log.Info("MixCoord Register Finished") s.session.LivenessCheck(s.ctx, func() { log.Error("MixCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID())) os.Exit(1) }) } if s.enableActiveStandBy { go func() { if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil { log.Error("failed to activate standby server", zap.Error(err)) panic(err) } afterRegister() }() } else { afterRegister() } return nil } func (s *mixCoordImpl) Init() error { log := log.Ctx(s.ctx) var initErr error if initErr = s.initSession(); initErr != nil { return initErr } s.factory.Init(Params) s.initKVCreator() s.initStreamingCoord() if s.enableActiveStandBy { s.activateFunc = func() error { log.Info("mixCoord switch from standby to active, activating") var err error s.initOnce.Do(func() { if err = s.initInternal(); err != nil { log.Error("mixCoord init failed", zap.Error(err)) } }) if err != nil { return err } log.Info("mixCoord startup success", zap.String("address", s.session.GetAddress())) return err } s.UpdateStateCode(commonpb.StateCode_StandBy) log.Info("MixCoord enter standby mode successfully") } else { s.initOnce.Do(func() { if initErr = s.initInternal(); initErr != nil { log.Error("mixCoord init failed", zap.Error(initErr)) } }) } return initErr } func (s *mixCoordImpl) initInternal() error { log := log.Ctx(s.ctx) s.rootcoordServer.SetMixCoord(s) s.datacoordServer.SetMixCoord(s) s.queryCoordServer.SetMixCoord(s) if err := s.streamingCoord.Start(s.ctx); err != nil { log.Error("streamCoord start failed", zap.Error(err)) return err } if err := s.rootcoordServer.Init(); err != nil { log.Error("rootCoord init failed", zap.Error(err)) return err } if err := s.rootcoordServer.Start(); err != nil { log.Error("rootCoord start failed", zap.Error(err)) return err } if err := s.datacoordServer.Init(); err != nil { log.Error("dataCoord init failed", zap.Error(err)) return err } if err := s.datacoordServer.Start(); err != nil { log.Error("dataCoord start failed", zap.Error(err)) return err } if err := s.queryCoordServer.Init(); err != nil { log.Error("queryCoord init failed", zap.Error(err)) return err } if err := s.queryCoordServer.Start(); err != nil { log.Error("queryCoord start failed", zap.Error(err)) return err } return nil } func (s *mixCoordImpl) initKVCreator() { if s.metaKVCreator == nil { if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV { s.metaKVCreator = func() kv.MetaKv { return tikv.NewTiKV(s.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(), tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } else { s.metaKVCreator = func() kv.MetaKv { return etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(), etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } } } func (s *mixCoordImpl) Start() error { s.UpdateStateCode(commonpb.StateCode_Healthy) var startErr error return startErr } func (s *mixCoordImpl) Stop() error { log.Info("graceful stop") s.GracefulStop() log.Info("graceful stop done") if err := s.queryCoordServer.Stop(); err != nil { log.Error("Failed to stop queryCoord", zap.Error(err)) } if err := s.datacoordServer.Stop(); err != nil { log.Error("Failed to stop dataCoord", zap.Error(err)) } if err := s.rootcoordServer.Stop(); err != nil { log.Error("Failed to stop rootCoord", zap.Error(err)) } s.cancel() return nil } func (s *mixCoordImpl) initStreamingCoord() { fMixcoord := syncutil.NewFuture[types.MixCoordClient]() fMixcoord.Set(s.mixCoordClient) s.streamingCoord = streamingcoord.NewServerBuilder(). WithETCD(s.etcdCli). WithMetaKV(s.metaKVCreator()). WithSession(s.session). WithMixCoordClient(fMixcoord). Build() } func (s *mixCoordImpl) initSession() error { s.session = sessionutil.NewSession(s.ctx) s.session.Init(typeutil.MixCoordRole, s.address, true, true) s.session.SetEnableActiveStandBy(s.enableActiveStandBy) s.rootcoordServer.SetSession(s.session) s.datacoordServer.SetSession(s.session) s.queryCoordServer.SetSession(s.session) return nil } func (s *mixCoordImpl) startHealthCheck() { } func (s *mixCoordImpl) SetAddress(address string) { s.address = address s.rootcoordServer.SetAddress(address) s.datacoordServer.SetAddress(address) s.queryCoordServer.SetAddress(address) } func (s *mixCoordImpl) SetEtcdClient(client *clientv3.Client) { s.etcdCli = client s.rootcoordServer.SetEtcdClient(client) s.datacoordServer.SetEtcdClient(client) s.queryCoordServer.SetEtcdClient(client) } func (s *mixCoordImpl) SetTiKVClient(client *txnkv.Client) { s.tikvCli = client s.rootcoordServer.SetTiKVClient(client) s.datacoordServer.SetTiKVClient(client) s.queryCoordServer.SetTiKVClient(client) } func (s *mixCoordImpl) SetMixCoordClient(client types.MixCoordClient) { s.mixCoordClient = client } func (s *mixCoordImpl) GetServerID() int64 { return paramtable.GetNodeID() } func (s *mixCoordImpl) UpdateStateCode(code commonpb.StateCode) { s.stateCode.Store(int32(code)) } func (s *mixCoordImpl) GetStateCode() commonpb.StateCode { return commonpb.StateCode(s.stateCode.Load()) } func (s *mixCoordImpl) GracefulStop() { if s.streamingCoord != nil { s.streamingCoord.Stop() s.streamingCoord = nil } } // RootCoordServer func (s *mixCoordImpl) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { return s.rootcoordServer.CreateCollection(ctx, req) } func (s *mixCoordImpl) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { return s.rootcoordServer.DropCollection(ctx, req) } func (s *mixCoordImpl) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { return s.rootcoordServer.HasCollection(ctx, req) } func (s *mixCoordImpl) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { return s.rootcoordServer.DescribeCollection(ctx, req) } func (s *mixCoordImpl) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { return s.rootcoordServer.ShowCollections(ctx, req) } func (s *mixCoordImpl) ShowCollectionIDs(ctx context.Context, req *rootcoordpb.ShowCollectionIDsRequest) (*rootcoordpb.ShowCollectionIDsResponse, error) { return s.rootcoordServer.ShowCollectionIDs(ctx, req) } func (s *mixCoordImpl) AlterCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { return s.rootcoordServer.AlterCollection(ctx, req) } func (s *mixCoordImpl) AlterCollectionField(ctx context.Context, req *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) { return s.rootcoordServer.AlterCollectionField(ctx, req) } func (s *mixCoordImpl) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { return s.rootcoordServer.CreatePartition(ctx, req) } func (s *mixCoordImpl) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { return s.rootcoordServer.DropPartition(ctx, req) } func (s *mixCoordImpl) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { return s.rootcoordServer.HasPartition(ctx, req) } func (s *mixCoordImpl) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { return s.rootcoordServer.ShowPartitions(ctx, req) } func (s *mixCoordImpl) ShowPartitionsInternal(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { return s.rootcoordServer.ShowPartitionsInternal(ctx, req) } func (s *mixCoordImpl) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { return s.rootcoordServer.AllocTimestamp(ctx, req) } func (s *mixCoordImpl) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { return s.rootcoordServer.AllocID(ctx, req) } func (s *mixCoordImpl) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { return s.rootcoordServer.UpdateChannelTimeTick(ctx, req) } func (s *mixCoordImpl) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { return s.rootcoordServer.ShowSegments(ctx, req) } func (s *mixCoordImpl) GetPChannelInfo(ctx context.Context, req *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) { return s.rootcoordServer.GetPChannelInfo(ctx, req) } func (s *mixCoordImpl) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { return s.rootcoordServer.InvalidateCollectionMetaCache(ctx, req) } func (s *mixCoordImpl) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { return s.rootcoordServer.ShowConfigurations(ctx, req) } func (s *mixCoordImpl) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { return s.rootcoordServer.CreateAlias(ctx, in) } func (s *mixCoordImpl) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { return s.rootcoordServer.DescribeCollectionInternal(ctx, in) } // DropAlias drop collection alias func (c *mixCoordImpl) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) { return c.rootcoordServer.DropAlias(ctx, in) } // AlterAlias alter collection alias func (c *mixCoordImpl) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { return c.rootcoordServer.AlterAlias(ctx, in) } // DescribeAlias describe collection alias func (c *mixCoordImpl) DescribeAlias(ctx context.Context, in *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) { return c.rootcoordServer.DescribeAlias(ctx, in) } // ListAliases list aliases func (c *mixCoordImpl) ListAliases(ctx context.Context, in *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) { return c.rootcoordServer.ListAliases(ctx, in) } func (c *mixCoordImpl) AddCollectionField(ctx context.Context, in *milvuspb.AddCollectionFieldRequest) (*commonpb.Status, error) { return c.rootcoordServer.AddCollectionField(ctx, in) } func (s *mixCoordImpl) CreateCredential(ctx context.Context, req *internalpb.CredentialInfo) (*commonpb.Status, error) { return s.rootcoordServer.CreateCredential(ctx, req) } func (s *mixCoordImpl) GetCredential(ctx context.Context, req *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) { return s.rootcoordServer.GetCredential(ctx, req) } func (s *mixCoordImpl) UpdateCredential(ctx context.Context, req *internalpb.CredentialInfo) (*commonpb.Status, error) { return s.rootcoordServer.UpdateCredential(ctx, req) } func (s *mixCoordImpl) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) { return s.rootcoordServer.DeleteCredential(ctx, req) } func (s *mixCoordImpl) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) { return s.rootcoordServer.ListCredUsers(ctx, req) } func (s *mixCoordImpl) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) { return s.rootcoordServer.CreateRole(ctx, req) } func (s *mixCoordImpl) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) { return s.rootcoordServer.DropRole(ctx, req) } func (s *mixCoordImpl) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) { return s.rootcoordServer.OperateUserRole(ctx, req) } func (s *mixCoordImpl) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) { return s.rootcoordServer.SelectRole(ctx, req) } func (s *mixCoordImpl) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) { return s.rootcoordServer.SelectUser(ctx, req) } func (s *mixCoordImpl) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) { return s.rootcoordServer.OperatePrivilege(ctx, req) } func (s *mixCoordImpl) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) { return s.rootcoordServer.SelectGrant(ctx, req) } func (s *mixCoordImpl) ListPolicy(ctx context.Context, req *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { return s.rootcoordServer.ListPolicy(ctx, req) } func (s *mixCoordImpl) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { return s.rootcoordServer.CheckHealth(ctx, req) } func (s *mixCoordImpl) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { return s.rootcoordServer.RenameCollection(ctx, req) } func (s *mixCoordImpl) CreateDatabase(ctx context.Context, req *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) { return s.rootcoordServer.CreateDatabase(ctx, req) } func (s *mixCoordImpl) DropDatabase(ctx context.Context, req *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) { return s.rootcoordServer.DropDatabase(ctx, req) } func (s *mixCoordImpl) ListDatabases(ctx context.Context, req *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) { return s.rootcoordServer.ListDatabases(ctx, req) } func (s *mixCoordImpl) DescribeDatabase(ctx context.Context, req *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) { return s.rootcoordServer.DescribeDatabase(ctx, req) } func (s *mixCoordImpl) AlterDatabase(ctx context.Context, req *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { return s.rootcoordServer.AlterDatabase(ctx, req) } func (s *mixCoordImpl) BackupRBAC(ctx context.Context, req *milvuspb.BackupRBACMetaRequest) (*milvuspb.BackupRBACMetaResponse, error) { return s.rootcoordServer.BackupRBAC(ctx, req) } func (s *mixCoordImpl) RestoreRBAC(ctx context.Context, req *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) { return s.rootcoordServer.RestoreRBAC(ctx, req) } func (s *mixCoordImpl) CreatePrivilegeGroup(ctx context.Context, req *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) { return s.rootcoordServer.CreatePrivilegeGroup(ctx, req) } func (s *mixCoordImpl) DropPrivilegeGroup(ctx context.Context, req *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) { return s.rootcoordServer.DropPrivilegeGroup(ctx, req) } func (s *mixCoordImpl) ListPrivilegeGroups(ctx context.Context, req *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) { return s.rootcoordServer.ListPrivilegeGroups(ctx, req) } func (s *mixCoordImpl) OperatePrivilegeGroup(ctx context.Context, req *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error) { return s.rootcoordServer.OperatePrivilegeGroup(ctx, req) } // GetComponentStates get states of components func (s *mixCoordImpl) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { code := s.GetStateCode() log.Ctx(ctx).Debug("Mix coord current state", zap.String("StateCode", code.String())) nodeID := common.NotRegisteredID if s.session != nil && s.session.Registered() { nodeID = s.session.ServerID } return &milvuspb.ComponentStates{ State: &milvuspb.ComponentInfo{ NodeID: nodeID, Role: typeutil.MixCoordRole, StateCode: code, ExtraInfo: nil, }, Status: merr.Success(), SubcomponentStates: []*milvuspb.ComponentInfo{ { NodeID: nodeID, Role: typeutil.MixCoordRole, StateCode: code, ExtraInfo: nil, }, }, }, nil } // GetTimeTickChannel get timetick channel name func (sc *mixCoordImpl) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: merr.Success(), Value: Params.CommonCfg.RootCoordTimeTick.GetValue(), }, nil } // GetStatisticsChannel get statistics channel name func (s *mixCoordImpl) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: merr.Success(), Value: Params.CommonCfg.RootCoordStatistics.GetValue(), }, nil } // GetMetrics get metrics func (s *mixCoordImpl) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { systemTopology := metricsinfo.SystemTopology{ NodesInfo: make([]metricsinfo.SystemTopologyNode, 0), } // If a processing role is specified, the corresponding role will be used for processing ret := gjson.Parse(in.GetRequest()) processRole, _ := metricsinfo.ParseMetricProcessInRole(ret) if len(processRole) > 0 && processRole == typeutil.QueryCoordRole { return s.GetQcMetrics(ctx, in) } else if len(processRole) > 0 && processRole == typeutil.DataCoordRole { return s.GetDcMetrics(ctx, in) } identifierMap := make(map[string]int) rootCoordResp, rootCoordErr := s.rootcoordServer.GetMetrics(ctx, in) var rootCoordTopology metricsinfo.RootCoordTopology rootCoordRoleName := "" if rootCoordErr == nil && rootCoordResp != nil { rootCoordRoleName = rootCoordResp.GetComponentName() rootCoordErr = metricsinfo.UnmarshalTopology(rootCoordResp.GetResponse(), &rootCoordTopology) identifierMap[rootCoordRoleName] = int(rootCoordTopology.Self.ID) } dataCoordResp, dataCoordErr := s.datacoordServer.GetMetrics(ctx, in) var dataCoordTopology metricsinfo.DataCoordTopology dataCoordRoleName := "" if dataCoordErr == nil && dataCoordResp != nil { dataCoordRoleName = dataCoordResp.GetComponentName() dataCoordErr = metricsinfo.UnmarshalTopology(dataCoordResp.GetResponse(), &dataCoordTopology) identifierMap[dataCoordRoleName] = int(dataCoordTopology.Cluster.Self.ID) } queryCoordResp, queryCoordErr := s.queryCoordServer.GetMetrics(ctx, in) var queryCoordTopology metricsinfo.QueryCoordTopology queryCoordRoleName := "" if queryCoordErr == nil && queryCoordResp != nil { queryCoordRoleName = queryCoordResp.GetComponentName() queryCoordErr = metricsinfo.UnmarshalTopology(queryCoordResp.GetResponse(), &queryCoordTopology) identifierMap[queryCoordRoleName] = int(queryCoordTopology.Cluster.Self.ID) } if rootCoordErr == nil && rootCoordResp != nil { rootCoordTopologyNode := metricsinfo.SystemTopologyNode{ Identifier: identifierMap[rootCoordRoleName], Connected: make([]metricsinfo.ConnectionEdge, 0), Infos: &rootCoordTopology.Self, } if dataCoordErr == nil && dataCoordResp != nil { rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[dataCoordRoleName], Type: metricsinfo.Forward, TargetType: typeutil.DataCoordRole, }) } if queryCoordErr == nil && queryCoordResp != nil { rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[queryCoordRoleName], Type: metricsinfo.Forward, TargetType: typeutil.QueryCoordRole, }) } systemTopology.NodesInfo = append(systemTopology.NodesInfo, rootCoordTopologyNode) } if dataCoordErr == nil && dataCoordResp != nil { dataCoordTopologyNode := metricsinfo.SystemTopologyNode{ Identifier: identifierMap[dataCoordRoleName], Connected: make([]metricsinfo.ConnectionEdge, 0), Infos: &dataCoordTopology.Cluster.Self, } if rootCoordErr == nil && rootCoordResp != nil { dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[rootCoordRoleName], Type: metricsinfo.Forward, TargetType: typeutil.RootCoordRole, }) } if queryCoordErr == nil && queryCoordResp != nil { dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[queryCoordRoleName], Type: metricsinfo.Forward, TargetType: typeutil.QueryCoordRole, }) } for _, dataNode := range dataCoordTopology.Cluster.ConnectedDataNodes { node := dataNode identifier := int(node.ID) identifierMap[dataNode.Name] = identifier dataNodeTopologyNode := metricsinfo.SystemTopologyNode{ Identifier: identifier, Connected: nil, Infos: &node, } systemTopology.NodesInfo = append(systemTopology.NodesInfo, dataNodeTopologyNode) dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifier, Type: metricsinfo.CoordConnectToNode, TargetType: typeutil.DataNodeRole, }) } systemTopology.NodesInfo = append(systemTopology.NodesInfo, dataCoordTopologyNode) } if queryCoordErr == nil && queryCoordResp != nil { queryCoordTopologyNode := metricsinfo.SystemTopologyNode{ Identifier: identifierMap[queryCoordRoleName], Connected: make([]metricsinfo.ConnectionEdge, 0), Infos: &queryCoordTopology.Cluster.Self, } if rootCoordErr == nil && rootCoordResp != nil { queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[rootCoordRoleName], Type: metricsinfo.Forward, TargetType: typeutil.RootCoordRole, }) } if dataCoordErr == nil && dataCoordResp != nil { queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[dataCoordRoleName], Type: metricsinfo.Forward, TargetType: typeutil.DataCoordRole, }) } for _, queryNode := range queryCoordTopology.Cluster.ConnectedNodes { node := queryNode identifier := int(node.ID) identifierMap[queryNode.Name] = identifier queryNodeTopologyNode := metricsinfo.SystemTopologyNode{ Identifier: identifier, Connected: nil, Infos: &node, } systemTopology.NodesInfo = append(systemTopology.NodesInfo, queryNodeTopologyNode) queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifier, Type: metricsinfo.CoordConnectToNode, TargetType: typeutil.QueryNodeRole, }) } systemTopology.NodesInfo = append(systemTopology.NodesInfo, queryCoordTopologyNode) } resp, err := metricsinfo.MarshalTopology(systemTopology) if err != nil { return &milvuspb.GetMetricsResponse{ Status: merr.Status(err), Response: "", ComponentName: metricsinfo.ConstructComponentName(typeutil.MixCoordRole, paramtable.GetNodeID()), }, nil } return &milvuspb.GetMetricsResponse{ Status: merr.Success(), Response: resp, ComponentName: metricsinfo.ConstructComponentName(typeutil.MixCoordRole, paramtable.GetNodeID()), }, nil } // GetMetrics get metrics func (s *mixCoordImpl) GetDcMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { return s.datacoordServer.GetMetrics(ctx, in) } func (s *mixCoordImpl) GetQcMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { return s.queryCoordServer.GetMetrics(ctx, in) } // QueryCoordServer func (s *mixCoordImpl) ActivateChecker(ctx context.Context, req *querypb.ActivateCheckerRequest) (*commonpb.Status, error) { return s.queryCoordServer.ActivateChecker(ctx, req) } func (s *mixCoordImpl) DeactivateChecker(ctx context.Context, req *querypb.DeactivateCheckerRequest) (*commonpb.Status, error) { return s.queryCoordServer.DeactivateChecker(ctx, req) } func (s *mixCoordImpl) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error) { return s.queryCoordServer.ListCheckers(ctx, req) } func (s *mixCoordImpl) ShowLoadCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { return s.queryCoordServer.ShowLoadCollections(ctx, req) } func (s *mixCoordImpl) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { return s.queryCoordServer.LoadCollection(ctx, req) } func (s *mixCoordImpl) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { return s.queryCoordServer.ReleaseCollection(ctx, req) } func (s *mixCoordImpl) ShowLoadPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { return s.queryCoordServer.ShowLoadPartitions(ctx, req) } func (s *mixCoordImpl) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) { return s.queryCoordServer.LoadPartitions(ctx, req) } func (s *mixCoordImpl) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { return s.queryCoordServer.ReleasePartitions(ctx, req) } func (s *mixCoordImpl) SyncNewCreatedPartition(ctx context.Context, req *querypb.SyncNewCreatedPartitionRequest) (*commonpb.Status, error) { return s.queryCoordServer.SyncNewCreatedPartition(ctx, req) } func (s *mixCoordImpl) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) { return s.queryCoordServer.GetPartitionStates(ctx, req) } func (s *mixCoordImpl) GetLoadSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { return s.queryCoordServer.GetLoadSegmentInfo(ctx, req) } func (s *mixCoordImpl) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) { return s.queryCoordServer.LoadBalance(ctx, req) } func (s *mixCoordImpl) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) { return s.queryCoordServer.GetReplicas(ctx, req) } func (s *mixCoordImpl) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) { return s.queryCoordServer.GetShardLeaders(ctx, req) } func (s *mixCoordImpl) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { return s.queryCoordServer.CreateResourceGroup(ctx, req) } func (s *mixCoordImpl) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error) { return s.queryCoordServer.UpdateResourceGroups(ctx, req) } func (s *mixCoordImpl) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) { return s.queryCoordServer.DropResourceGroup(ctx, req) } func (s *mixCoordImpl) TransferNode(ctx context.Context, req *milvuspb.TransferNodeRequest) (*commonpb.Status, error) { return s.queryCoordServer.TransferNode(ctx, req) } func (s *mixCoordImpl) TransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) (*commonpb.Status, error) { return s.queryCoordServer.TransferReplica(ctx, req) } func (s *mixCoordImpl) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) { return s.queryCoordServer.ListResourceGroups(ctx, req) } func (s *mixCoordImpl) DescribeResourceGroup(ctx context.Context, req *querypb.DescribeResourceGroupRequest) (*querypb.DescribeResourceGroupResponse, error) { return s.queryCoordServer.DescribeResourceGroup(ctx, req) } func (s *mixCoordImpl) ListQueryNode(ctx context.Context, req *querypb.ListQueryNodeRequest) (*querypb.ListQueryNodeResponse, error) { return s.queryCoordServer.ListQueryNode(ctx, req) } func (s *mixCoordImpl) GetQueryNodeDistribution(ctx context.Context, req *querypb.GetQueryNodeDistributionRequest) (*querypb.GetQueryNodeDistributionResponse, error) { return s.queryCoordServer.GetQueryNodeDistribution(ctx, req) } func (s *mixCoordImpl) SuspendBalance(ctx context.Context, req *querypb.SuspendBalanceRequest) (*commonpb.Status, error) { return s.queryCoordServer.SuspendBalance(ctx, req) } func (s *mixCoordImpl) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRequest) (*commonpb.Status, error) { return s.queryCoordServer.ResumeBalance(ctx, req) } func (s *mixCoordImpl) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) { return s.queryCoordServer.CheckBalanceStatus(ctx, req) } func (s *mixCoordImpl) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) { return s.queryCoordServer.SuspendNode(ctx, req) } func (s *mixCoordImpl) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest) (*commonpb.Status, error) { return s.queryCoordServer.ResumeNode(ctx, req) } func (s *mixCoordImpl) TransferSegment(ctx context.Context, req *querypb.TransferSegmentRequest) (*commonpb.Status, error) { return s.queryCoordServer.TransferSegment(ctx, req) } func (s *mixCoordImpl) TransferChannel(ctx context.Context, req *querypb.TransferChannelRequest) (*commonpb.Status, error) { return s.queryCoordServer.TransferChannel(ctx, req) } func (s *mixCoordImpl) CheckQueryNodeDistribution(ctx context.Context, req *querypb.CheckQueryNodeDistributionRequest) (*commonpb.Status, error) { return s.queryCoordServer.CheckQueryNodeDistribution(ctx, req) } func (s *mixCoordImpl) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error) { return s.queryCoordServer.UpdateLoadConfig(ctx, req) } // DataCoordServer func (s *mixCoordImpl) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { return s.datacoordServer.GetSegmentInfo(ctx, req) } func (s *mixCoordImpl) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) { return s.datacoordServer.Flush(ctx, req) } func (s *mixCoordImpl) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { return s.datacoordServer.AssignSegmentID(ctx, req) } func (s *mixCoordImpl) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { return s.datacoordServer.GetSegmentStates(ctx, req) } func (s *mixCoordImpl) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) { return s.datacoordServer.GetInsertBinlogPaths(ctx, req) } func (s *mixCoordImpl) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) { return s.datacoordServer.GetCollectionStatistics(ctx, req) } func (s *mixCoordImpl) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) { return s.datacoordServer.GetPartitionStatistics(ctx, req) } func (s *mixCoordImpl) GetSegmentInfoChannel(ctx context.Context, req *datapb.GetSegmentInfoChannelRequest) (*milvuspb.StringResponse, error) { return s.datacoordServer.GetSegmentInfoChannel(ctx, req) } func (s *mixCoordImpl) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { return s.datacoordServer.SaveBinlogPaths(ctx, req) } func (s *mixCoordImpl) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { return s.datacoordServer.GetRecoveryInfo(ctx, req) } func (s *mixCoordImpl) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryInfoRequestV2) (*datapb.GetRecoveryInfoResponseV2, error) { return s.datacoordServer.GetRecoveryInfoV2(ctx, req) } func (s *mixCoordImpl) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChannelRecoveryInfoRequest) (*datapb.GetChannelRecoveryInfoResponse, error) { return s.datacoordServer.GetChannelRecoveryInfo(ctx, req) } func (s *mixCoordImpl) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { return s.datacoordServer.GetFlushedSegments(ctx, req) } func (s *mixCoordImpl) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegmentsByStatesRequest) (*datapb.GetSegmentsByStatesResponse, error) { return s.datacoordServer.GetSegmentsByStates(ctx, req) } func (s *mixCoordImpl) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) { return s.datacoordServer.ManualCompaction(ctx, req) } func (s *mixCoordImpl) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) { return s.datacoordServer.GetCompactionState(ctx, req) } func (s *mixCoordImpl) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) { return s.datacoordServer.GetCompactionStateWithPlans(ctx, req) } func (s *mixCoordImpl) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) { return s.datacoordServer.WatchChannels(ctx, req) } func (s *mixCoordImpl) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) { return s.datacoordServer.GetFlushState(ctx, req) } func (s *mixCoordImpl) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) { return s.datacoordServer.GetFlushAllState(ctx, req) } func (s *mixCoordImpl) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { return s.datacoordServer.DropVirtualChannel(ctx, req) } func (s *mixCoordImpl) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) { return s.datacoordServer.SetSegmentState(ctx, req) } func (s *mixCoordImpl) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) { return s.datacoordServer.UpdateSegmentStatistics(ctx, req) } func (s *mixCoordImpl) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) { return s.datacoordServer.UpdateChannelCheckpoint(ctx, req) } func (s *mixCoordImpl) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) { return s.datacoordServer.MarkSegmentsDropped(ctx, req) } func (s *mixCoordImpl) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) { return s.datacoordServer.BroadcastAlteredCollection(ctx, req) } func (s *mixCoordImpl) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { return s.datacoordServer.GcConfirm(ctx, req) } func (s *mixCoordImpl) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { return s.datacoordServer.CreateIndex(ctx, req) } func (s *mixCoordImpl) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) { return s.datacoordServer.AlterIndex(ctx, req) } func (s *mixCoordImpl) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) { return s.datacoordServer.GetIndexState(ctx, req) } func (s *mixCoordImpl) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) { return s.datacoordServer.GetSegmentIndexState(ctx, req) } func (s *mixCoordImpl) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) { return s.datacoordServer.GetIndexInfos(ctx, req) } func (s *mixCoordImpl) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) { return s.datacoordServer.DescribeIndex(ctx, req) } func (s *mixCoordImpl) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexStatisticsRequest) (*indexpb.GetIndexStatisticsResponse, error) { return s.datacoordServer.GetIndexStatistics(ctx, req) } func (s *mixCoordImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { return s.datacoordServer.DropIndex(ctx, req) } func (s *mixCoordImpl) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) { return s.datacoordServer.GetIndexBuildProgress(ctx, req) } func (s *mixCoordImpl) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) { return s.datacoordServer.ReportDataNodeTtMsgs(ctx, req) } func (s *mixCoordImpl) GcControl(ctx context.Context, req *datapb.GcControlRequest) (*commonpb.Status, error) { return s.datacoordServer.GcControl(ctx, req) } func (s *mixCoordImpl) ImportV2(ctx context.Context, req *internalpb.ImportRequestInternal) (*internalpb.ImportResponse, error) { return s.datacoordServer.ImportV2(ctx, req) } func (s *mixCoordImpl) GetImportProgress(ctx context.Context, req *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) { return s.datacoordServer.GetImportProgress(ctx, req) } func (s *mixCoordImpl) ListImports(ctx context.Context, req *internalpb.ListImportsRequestInternal) (*internalpb.ListImportsResponse, error) { return s.datacoordServer.ListImports(ctx, req) } func (s *mixCoordImpl) ListIndexes(ctx context.Context, req *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) { return s.datacoordServer.ListIndexes(ctx, req) } func (s *mixCoordImpl) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) { return s.datacoordServer.AllocSegment(ctx, req) } func (s *mixCoordImpl) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error { return s.datacoordServer.NotifyDropPartition(ctx, channel, partitionIDs) } // RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator. func (s *mixCoordImpl) RegisterStreamingCoordGRPCService(server *grpc.Server) { s.streamingCoord.RegisterGRPCService(server) } func (s *mixCoordImpl) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQuotaMetricsRequest) (*internalpb.GetQuotaMetricsResponse, error) { return s.rootcoordServer.GetQuotaMetrics(ctx, req) } func (s *mixCoordImpl) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) { return s.queryCoordServer.ListLoadedSegments(ctx, req) } func (s *mixCoordImpl) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*datapb.FlushAllResponse, error) { return s.datacoordServer.FlushAll(ctx, req) } // AddFileResource add file resource func (s *mixCoordImpl) AddFileResource(ctx context.Context, req *milvuspb.AddFileResourceRequest) (*commonpb.Status, error) { return s.datacoordServer.AddFileResource(ctx, req) } // RemoveFileResource remove file resource func (s *mixCoordImpl) RemoveFileResource(ctx context.Context, req *milvuspb.RemoveFileResourceRequest) (*commonpb.Status, error) { return s.datacoordServer.RemoveFileResource(ctx, req) } // ListFileResources list file resources func (s *mixCoordImpl) ListFileResources(ctx context.Context, req *milvuspb.ListFileResourcesRequest) (*milvuspb.ListFileResourcesResponse, error) { return s.datacoordServer.ListFileResources(ctx, req) }