From f146d3825f4850f68c1acfee9639b1c3bf5bd6a8 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Wed, 30 Jun 2021 17:48:19 +0800 Subject: [PATCH] Fix search hang after querynode restart (#6212) Signed-off-by: xige-16 --- internal/distributed/querynode/service.go | 35 --- internal/querycoord/cluster.go | 50 +++- internal/querycoord/task.go | 326 ++++++++++------------ internal/querynode/query_collection.go | 1 + internal/querynode/query_node.go | 46 --- internal/querynode/query_node_test.go | 4 - 6 files changed, 179 insertions(+), 283 deletions(-) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 31b2917302..454df0c356 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -31,7 +31,6 @@ import ( grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client" - qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client" rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" @@ -59,7 +58,6 @@ type Server struct { dataCoord *dsc.Client rootCoord *rcc.GrpcClient indexCoord *isc.Client - queryCoord *qcc.Client closer io.Closer } @@ -101,35 +99,6 @@ func (s *Server) init() error { if err := s.querynode.Register(); err != nil { return err } - // --- QueryCoord --- - log.Debug("QueryNode start to new QueryCoordClient", zap.Any("QueryCoordAddress", Params.QueryCoordAddress)) - queryCoord, err := qcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) - if err != nil { - log.Debug("QueryNode new QueryCoordClient failed", zap.Error(err)) - panic(err) - } - - if err = queryCoord.Init(); err != nil { - log.Debug("QueryNode QueryCoordClient Init failed", zap.Error(err)) - panic(err) - } - - if err = queryCoord.Start(); err != nil { - log.Debug("QueryNode QueryCoordClient Start failed", zap.Error(err)) - panic(err) - } - - log.Debug("QueryNode start to wait for QueryCoord ready") - err = funcutil.WaitForComponentInitOrHealthy(s.ctx, queryCoord, "QueryCoord", 1000000, time.Millisecond*200) - if err != nil { - log.Debug("QueryNode wait for QueryCoord ready failed", zap.Error(err)) - panic(err) - } - log.Debug("QueryNode report QueryCoord is ready") - - if err := s.SetQueryCoord(queryCoord); err != nil { - panic(err) - } // --- RootCoord Client --- //ms.Params.Init() @@ -315,10 +284,6 @@ func (s *Server) SetRootCoord(rootCoord types.RootCoord) error { return s.querynode.SetRootCoord(rootCoord) } -func (s *Server) SetQueryCoord(queryCoord types.QueryCoord) error { - return s.querynode.SetQueryCoord(queryCoord) -} - func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error { return s.querynode.SetIndexCoord(indexCoord) } diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index 2ac944f007..214c25dc00 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -109,11 +109,11 @@ func (c *queryNodeCluster) GetComponentInfos(ctx context.Context) ([]*internalpb c.RLock() defer c.RUnlock() subComponentInfos := make([]*internalpb.ComponentInfo, 0) - nodeIDs, err := c.getOnServiceNodeIDs() + nodes, err := c.getOnServiceNodes() if err != nil { return nil, err } - for _, nodeID := range nodeIDs { + for nodeID := range nodes { node := c.nodes[nodeID] componentStates, err := node.client.GetComponentStates(ctx) if err != nil { @@ -228,7 +228,7 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in if err == nil && status.ErrorCode == commonpb.ErrorCode_Success { collectionID := in.CollectionID //c.clusterMeta.addCollection(collectionID, in.Schema) - //c.clusterMeta.addDmChannel(collectionID, nodeID, channels) + c.clusterMeta.addDmChannel(collectionID, nodeID, channels) node.addCollection(collectionID, in.Schema) node.addDmChannel(collectionID, channels) @@ -328,12 +328,12 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe defer c.Unlock() segmentInfos := make([]*querypb.SegmentInfo, 0) - nodes, err := c.getOnServiceNodeIDs() + nodes, err := c.getOnServiceNodes() if err != nil { log.Warn(err.Error()) return segmentInfos, nil } - for _, nodeID := range nodes { + for nodeID := range nodes { res, err := c.nodes[nodeID].client.GetSegmentInfo(ctx, in) if err != nil { return nil, err @@ -407,6 +407,17 @@ func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionuti return fmt.Errorf("node %d alredy exists in cluster", id) } +func (c *queryNodeCluster) getNodeByID(nodeID int64) (*queryNode, error) { + c.RLock() + defer c.RUnlock() + + if node, ok := c.nodes[nodeID]; ok { + return node, nil + } + + return nil, fmt.Errorf("query node %d not exist", nodeID) +} + func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error { c.Lock() defer c.Unlock() @@ -427,25 +438,36 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error { return nil } -func (c *queryNodeCluster) onServiceNodeIDs() ([]int64, error) { - c.Lock() - defer c.Unlock() +func (c *queryNodeCluster) onServiceNodes() (map[int64]*queryNode, error) { + c.RLock() + defer c.RUnlock() - return c.getOnServiceNodeIDs() + return c.getOnServiceNodes() } -func (c *queryNodeCluster) getOnServiceNodeIDs() ([]int64, error) { - nodeIDs := make([]int64, 0) +func (c *queryNodeCluster) getOnServiceNodes() (map[int64]*queryNode, error) { + nodes := make(map[int64]*queryNode) for nodeID, node := range c.nodes { if node.isOnService() { - nodeIDs = append(nodeIDs, nodeID) + nodes[nodeID] = node } } - if len(nodeIDs) == 0 { + if len(nodes) == 0 { return nil, errors.New("no queryNode is alive") } - return nodeIDs, nil + return nodes, nil +} + +func (c *queryNodeCluster) isOnService(nodeID int64) (bool, error) { + c.Lock() + defer c.Unlock() + + if node, ok := c.nodes[nodeID]; ok { + return node.isOnService(), nil + } + + return false, fmt.Errorf("query node %d not exist", nodeID) } func (c *queryNodeCluster) printMeta() { diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index f6364a205d..ab4d3765b7 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -307,10 +308,11 @@ func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error { lct.meta.addCollection(collectionID, lct.Schema) if lct.result.ErrorCode != commonpb.ErrorCode_Success { lct.childTasks = make([]task, 0) - for nodeID, node := range lct.cluster.nodes { - if !node.isOnService() { - continue - } + nodes, err := lct.cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + } + for nodeID := range nodes { req := &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseCollection, @@ -382,10 +384,11 @@ func (rct *ReleaseCollectionTask) Execute(ctx context.Context) error { } if rct.NodeID <= 0 { - for nodeID, node := range rct.cluster.nodes { - if !node.isOnService() { - continue - } + nodes, err := rct.cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + } + for nodeID := range nodes { req := proto.Clone(rct.ReleaseCollectionRequest).(*querypb.ReleaseCollectionRequest) req.NodeID = nodeID releaseCollectionTask := &ReleaseCollectionTask{ @@ -552,10 +555,11 @@ func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error { if lpt.result.ErrorCode != commonpb.ErrorCode_Success { lpt.childTasks = make([]task, 0) if lpt.addCol { - for nodeID, node := range lpt.cluster.nodes { - if !node.isOnService() { - continue - } + nodes, err := lpt.cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + } + for nodeID := range nodes { req := &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseCollection, @@ -580,10 +584,11 @@ func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error { log.Debug("loadPartitionTask: add a releaseCollectionTask to loadPartitionTask's childTask", zap.Any("task", releaseCollectionTask)) } } else { - for nodeID, node := range lpt.cluster.nodes { - if !node.isOnService() { - continue - } + nodes, err := lpt.cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + } + for nodeID := range nodes { req := &querypb.ReleasePartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleasePartitions, @@ -661,10 +666,11 @@ func (rpt *ReleasePartitionTask) Execute(ctx context.Context) error { } if rpt.NodeID <= 0 { - for nodeID, node := range rpt.cluster.nodes { - if !node.isOnService() { - continue - } + nodes, err := rpt.cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + } + for nodeID := range nodes { req := proto.Clone(rpt.ReleasePartitionsRequest).(*querypb.ReleasePartitionsRequest) req.NodeID = nodeID releasePartitionTask := &ReleasePartitionTask{ @@ -734,7 +740,12 @@ func (lst *LoadSegmentTask) Marshal() string { } func (lst *LoadSegmentTask) IsValid() bool { - return lst.ctx != nil && lst.cluster.nodes[lst.NodeID].isOnService() + onService, err := lst.cluster.isOnService(lst.NodeID) + if err != nil { + return false + } + + return lst.ctx != nil && onService } func (lst *LoadSegmentTask) Type() commonpb.MsgType { @@ -854,7 +865,11 @@ func (rst *ReleaseSegmentTask) Marshal() string { } func (rst *ReleaseSegmentTask) IsValid() bool { - return rst.ctx != nil && rst.cluster.nodes[rst.NodeID].isOnService() + onService, err := rst.cluster.isOnService(rst.NodeID) + if err != nil { + return false + } + return rst.ctx != nil && onService } func (rst *ReleaseSegmentTask) Type() commonpb.MsgType { @@ -912,7 +927,11 @@ func (wdt *WatchDmChannelTask) Marshal() string { } func (wdt *WatchDmChannelTask) IsValid() bool { - return wdt.ctx != nil && wdt.cluster.nodes[wdt.NodeID].isOnService() + onService, err := wdt.cluster.isOnService(wdt.NodeID) + if err != nil { + return false + } + return wdt.ctx != nil && onService } func (wdt *WatchDmChannelTask) Type() commonpb.MsgType { @@ -1036,7 +1055,12 @@ func (wqt *WatchQueryChannelTask) Marshal() string { } func (wqt *WatchQueryChannelTask) IsValid() bool { - return wqt.ctx != nil && wqt.cluster.nodes[wqt.NodeID].isOnService() + onService, err := wqt.cluster.isOnService(wqt.NodeID) + if err != nil { + return false + } + + return wqt.ctx != nil && onService } func (wqt *WatchQueryChannelTask) Type() commonpb.MsgType { @@ -1127,18 +1151,27 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error { if lbt.triggerCondition == querypb.TriggerCondition_nodeDown { for _, nodeID := range lbt.SourceNodeIDs { + node, err := lbt.cluster.getNodeByID(nodeID) + if err != nil { + log.Error(err.Error()) + continue + } lbt.meta.deleteSegmentInfoByNodeID(nodeID) - collectionInfos := lbt.cluster.nodes[nodeID].collectionInfos + collectionInfos := node.collectionInfos for collectionID, info := range collectionInfos { - loadCollection := lbt.meta.collectionInfos[collectionID].LoadCollection - schema := lbt.meta.collectionInfos[collectionID].Schema + metaInfo, err := lbt.meta.getCollectionInfoByID(collectionID) + if err != nil { + log.Error(err.Error()) + continue + } + loadCollection := metaInfo.LoadCollection + schema := metaInfo.Schema partitionIDs := info.PartitionIDs segmentsToLoad := make([]UniqueID, 0) - segment2BingLog := make(map[UniqueID]*querypb.SegmentLoadInfo) + loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) channelsToWatch := make([]string, 0) - watchRequestsInPartition := make([]*querypb.WatchDmChannelsRequest, 0) - watchRequestsInCollection := make(map[string]*querypb.WatchDmChannelsRequest) + watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0) dmChannels, err := lbt.meta.getDmChannelsByNodeID(collectionID, nodeID) if err != nil { @@ -1162,148 +1195,68 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error { return err } + for _, segmentBingLog := range recoveryInfo.Binlogs { + segmentID := segmentBingLog.SegmentID + segmentLoadInfo := &querypb.SegmentLoadInfo{ + SegmentID: segmentID, + PartitionID: partitionID, + CollectionID: collectionID, + BinlogPaths: segmentBingLog.FieldBinlogs, + } + + loadSegmentReq := &querypb.LoadSegmentsRequest{ + Base: lbt.Base, + Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, + Schema: schema, + LoadCondition: querypb.TriggerCondition_nodeDown, + } + + segmentsToLoad = append(segmentsToLoad, segmentID) + loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) + } + for _, channelInfo := range recoveryInfo.Channels { for _, channel := range dmChannels { if channelInfo.ChannelName == channel { - watchRequest := &querypb.WatchDmChannelsRequest{ - Base: lbt.Base, - CollectionID: collectionID, - Infos: []*datapb.VchannelInfo{channelInfo}, - Schema: schema, - } if loadCollection { - if _, ok := watchRequestsInCollection[channel]; !ok { - watchRequestsInCollection[channel] = watchRequest + merged := false + for index, channelName := range channelsToWatch { + if channel == channelName { + merged = true + oldInfo := watchDmChannelReqs[index].Infos[0] + newInfo := mergeVChannelInfo(oldInfo, channelInfo) + watchDmChannelReqs[index].Infos = []*datapb.VchannelInfo{newInfo} + break + } + } + if !merged { + watchRequest := &querypb.WatchDmChannelsRequest{ + Base: lbt.Base, + CollectionID: collectionID, + Infos: []*datapb.VchannelInfo{channelInfo}, + Schema: schema, + } channelsToWatch = append(channelsToWatch, channel) - } else { - oldInfo := watchRequestsInCollection[channel].Infos[0] - newInfo := mergeVChannelInfo(oldInfo, channelInfo) - watchRequestsInCollection[channel].Infos = []*datapb.VchannelInfo{newInfo} + watchDmChannelReqs = append(watchDmChannelReqs, watchRequest) } } else { - watchRequest.PartitionID = partitionID + watchRequest := &querypb.WatchDmChannelsRequest{ + Base: lbt.Base, + CollectionID: collectionID, + PartitionID: partitionID, + Infos: []*datapb.VchannelInfo{channelInfo}, + Schema: schema, + } channelsToWatch = append(channelsToWatch, channel) - watchRequestsInPartition = append(watchRequestsInPartition, watchRequest) + watchDmChannelReqs = append(watchDmChannelReqs, watchRequest) } break } } } - - for _, binlog := range recoveryInfo.Binlogs { - segmentID := binlog.SegmentID - if lbt.meta.hasSegmentInfo(segmentID) { - continue - } - segmentLoadInfo := &querypb.SegmentLoadInfo{ - SegmentID: segmentID, - PartitionID: partitionID, - CollectionID: collectionID, - BinlogPaths: make([]*datapb.FieldBinlog, 0), - } - segmentLoadInfo.BinlogPaths = append(segmentLoadInfo.BinlogPaths, binlog.FieldBinlogs...) - segmentsToLoad = append(segmentsToLoad, segmentID) - segment2BingLog[segmentID] = segmentLoadInfo - } - } - - segment2Nodes := shuffleSegmentsToQueryNode(segmentsToLoad, lbt.cluster) - watchRequest2Nodes := shuffleChannelsToQueryNode(channelsToWatch, lbt.cluster) - - watchQueryChannelInfo := make(map[int64]bool) - node2Segments := make(map[int64][]*querypb.SegmentLoadInfo) - for index, id := range segment2Nodes { - if _, ok := node2Segments[id]; !ok { - node2Segments[id] = make([]*querypb.SegmentLoadInfo, 0) - } - segmentID := segmentsToLoad[index] - node2Segments[id] = append(node2Segments[id], segment2BingLog[segmentID]) - if lbt.cluster.hasWatchedQueryChannel(lbt.ctx, id, collectionID) { - watchQueryChannelInfo[id] = true - continue - } - watchQueryChannelInfo[id] = false - } - for _, id := range watchRequest2Nodes { - if lbt.cluster.hasWatchedQueryChannel(lbt.ctx, id, collectionID) { - watchQueryChannelInfo[id] = true - continue - } - watchQueryChannelInfo[id] = false - } - - for id, segmentInfos := range node2Segments { - loadSegmentsRequest := &querypb.LoadSegmentsRequest{ - Base: lbt.Base, - NodeID: id, - Infos: segmentInfos, - Schema: schema, - LoadCondition: querypb.TriggerCondition_grpcRequest, - } - - loadSegmentTask := &LoadSegmentTask{ - BaseTask: BaseTask{ - ctx: lbt.ctx, - Condition: NewTaskCondition(lbt.ctx), - triggerCondition: querypb.TriggerCondition_grpcRequest, - }, - - LoadSegmentsRequest: loadSegmentsRequest, - meta: lbt.meta, - cluster: lbt.cluster, - } - lbt.AddChildTask(loadSegmentTask) - log.Debug("LoadBalanceTask: add a loadSegmentTask to loadBalanceTask's childTask", zap.Any("task", loadSegmentTask)) - } - - for index, id := range watchRequest2Nodes { - var watchRequest *querypb.WatchDmChannelsRequest - if loadCollection { - channel := channelsToWatch[index] - watchRequest = watchRequestsInCollection[channel] - } else { - watchRequest = watchRequestsInPartition[index] - } - watchRequest.NodeID = id - watchDmChannelTask := &WatchDmChannelTask{ - BaseTask: BaseTask{ - ctx: lbt.ctx, - Condition: NewTaskCondition(lbt.ctx), - triggerCondition: querypb.TriggerCondition_grpcRequest, - }, - WatchDmChannelsRequest: watchRequest, - meta: lbt.meta, - cluster: lbt.cluster, - } - lbt.AddChildTask(watchDmChannelTask) - log.Debug("LoadBalanceTask: add a watchDmChannelTask to loadBalanceTask's childTask", zap.Any("task", watchDmChannelTask)) - } - - for id, watched := range watchQueryChannelInfo { - if !watched { - queryChannel, queryResultChannel := lbt.meta.GetQueryChannel(collectionID) - - addQueryChannelRequest := &querypb.AddQueryChannelRequest{ - Base: lbt.Base, - NodeID: id, - CollectionID: collectionID, - RequestChannelID: queryChannel, - ResultChannelID: queryResultChannel, - } - watchQueryChannelTask := &WatchQueryChannelTask{ - BaseTask: BaseTask{ - ctx: lbt.ctx, - Condition: NewTaskCondition(lbt.ctx), - triggerCondition: querypb.TriggerCondition_grpcRequest, - }, - - AddQueryChannelRequest: addQueryChannelRequest, - cluster: lbt.cluster, - } - lbt.AddChildTask(watchQueryChannelTask) - log.Debug("LoadBalanceTask: add a watchQueryChannelTask to loadBalanceTask's childTask", zap.Any("task", watchQueryChannelTask)) - } } + assignInternalTask(collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs) + log.Debug("loadBalanceTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) } } } @@ -1336,10 +1289,19 @@ func (lbt *LoadBalanceTask) PostExecute(context.Context) error { func shuffleChannelsToQueryNode(dmChannels []string, cluster *queryNodeCluster) []int64 { maxNumChannels := 0 - for nodeID, node := range cluster.nodes { - if !node.onService { + nodes := make(map[int64]*queryNode) + var err error + for { + nodes, err = cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + time.Sleep(1 * time.Second) continue } + break + } + + for nodeID := range nodes { numChannels, _ := cluster.getNumDmChannels(nodeID) if numChannels > maxNumChannels { maxNumChannels = numChannels @@ -1355,26 +1317,20 @@ func shuffleChannelsToQueryNode(dmChannels []string, cluster *queryNodeCluster) for { lastOffset := offset if !loopAll { - for id, node := range cluster.nodes { - if !node.isOnService() { - continue - } - numSegments, _ := cluster.getNumSegments(id) + for nodeID := range nodes { + numSegments, _ := cluster.getNumSegments(nodeID) if numSegments >= maxNumChannels { continue } - res = append(res, id) + res = append(res, nodeID) offset++ if offset == len(dmChannels) { return res } } } else { - for id, node := range cluster.nodes { - if !node.isOnService() { - continue - } - res = append(res, id) + for nodeID := range nodes { + res = append(res, nodeID) offset++ if offset == len(dmChannels) { return res @@ -1389,10 +1345,18 @@ func shuffleChannelsToQueryNode(dmChannels []string, cluster *queryNodeCluster) func shuffleSegmentsToQueryNode(segmentIDs []UniqueID, cluster *queryNodeCluster) []int64 { maxNumSegments := 0 - for nodeID, node := range cluster.nodes { - if !node.isOnService() { + nodes := make(map[int64]*queryNode) + var err error + for { + nodes, err = cluster.onServiceNodes() + if err != nil { + log.Debug(err.Error()) + time.Sleep(1 * time.Second) continue } + break + } + for nodeID := range nodes { numSegments, _ := cluster.getNumSegments(nodeID) if numSegments > maxNumSegments { maxNumSegments = numSegments @@ -1409,26 +1373,20 @@ func shuffleSegmentsToQueryNode(segmentIDs []UniqueID, cluster *queryNodeCluster for { lastOffset := offset if !loopAll { - for id, node := range cluster.nodes { - if !node.isOnService() { - continue - } - numSegments, _ := cluster.getNumSegments(id) + for nodeID := range nodes { + numSegments, _ := cluster.getNumSegments(nodeID) if numSegments >= maxNumSegments { continue } - res = append(res, id) + res = append(res, nodeID) offset++ if offset == len(segmentIDs) { return res } } } else { - for id, node := range cluster.nodes { - if !node.isOnService() { - continue - } - res = append(res, id) + for nodeID := range nodes { + res = append(res, nodeID) offset++ if offset == len(segmentIDs) { return res diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index d691d3b016..db63ff7951 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -116,6 +116,7 @@ func (q *queryCollection) register() { return } + //TODO:: can't add new vChannel to selectCase q.watcherSelectCase = make([]reflect.SelectCase, 0) log.Debug("register tSafe watcher and init watcher select case", zap.Any("collectionID", collection.ID()), diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index a935d7729b..064056fb44 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -61,7 +61,6 @@ type QueryNode struct { queryService *queryService // clients - queryCoord types.QueryCoord rootCoord types.RootCoord indexCoord types.IndexCoord dataCoord types.DataCoord @@ -147,43 +146,6 @@ func (node *QueryNode) Init() error { node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV) C.SegcoreInit() - //registerReq := &queryPb.RegisterNodeRequest{ - // Base: &commonpb.MsgBase{ - // SourceID: Params.QueryNodeID, - // }, - // Address: &commonpb.Address{ - // Ip: Params.QueryNodeIP, - // Port: Params.QueryNodePort, - // }, - //} - // - //resp, err := node.queryCoord.RegisterNode(ctx, registerReq) - //if err != nil { - // log.Debug("QueryNode RegisterNode failed", zap.Error(err)) - // panic(err) - //} - //if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - // log.Debug("QueryNode RegisterNode failed", zap.Any("Reason", resp.Status.Reason)) - // panic(resp.Status.Reason) - //} - //log.Debug("QueryNode RegisterNode success") - // - //for _, kv := range resp.InitParams.StartParams { - // switch kv.Key { - // case "StatsChannelName": - // Params.StatsChannelName = kv.Value - // case "TimeTickChannelName": - // Params.QueryTimeTickChannelName = kv.Value - // case "SearchChannelName": - // Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value) - // case "SearchResultChannelName": - // Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value) - // default: - // return fmt.Errorf("Invalid key: %v", kv.Key) - // } - //} - // - //log.Debug("QueryNode Init ", zap.Int64("QueryNodeID", Params.QueryNodeID), zap.Any("searchChannelNames", Params.SearchChannelNames)) if node.rootCoord == nil { log.Error("null root coordinator detected") @@ -259,14 +221,6 @@ func (node *QueryNode) SetRootCoord(rc types.RootCoord) error { return nil } -func (node *QueryNode) SetQueryCoord(query types.QueryCoord) error { - if query == nil { - return errors.New("null query coordinator interface") - } - node.queryCoord = query - return nil -} - func (node *QueryNode) SetIndexCoord(index types.IndexCoord) error { if index == nil { return errors.New("null index coordinator interface") diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 7b5b1f455d..1b219b0093 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -175,10 +175,6 @@ func newQueryNodeMock() *QueryNode { panic(err) } svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory) - err = svr.SetQueryCoord(&queryCoordMock{}) - if err != nil { - panic(err) - } svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory, etcdKV) svr.streaming = newStreaming(ctx, msFactory, etcdKV)