mirror of https://github.com/milvus-io/milvus.git
Fix search hang after querynode restart (#6212)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
Branch: pull/6234/head
Parent commit: ffb32875c1
Commit: f146d3825f
@@ -31,7 +31,6 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"

@@ -59,7 +58,6 @@ type Server struct {
dataCoord *dsc.Client
rootCoord *rcc.GrpcClient
indexCoord *isc.Client
queryCoord *qcc.Client

closer io.Closer
}

@@ -101,35 +99,6 @@ func (s *Server) init() error {
if err := s.querynode.Register(); err != nil {
return err
}
// --- QueryCoord ---
log.Debug("QueryNode start to new QueryCoordClient", zap.Any("QueryCoordAddress", Params.QueryCoordAddress))
queryCoord, err := qcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
if err != nil {
log.Debug("QueryNode new QueryCoordClient failed", zap.Error(err))
panic(err)
}

if err = queryCoord.Init(); err != nil {
log.Debug("QueryNode QueryCoordClient Init failed", zap.Error(err))
panic(err)
}

if err = queryCoord.Start(); err != nil {
log.Debug("QueryNode QueryCoordClient Start failed", zap.Error(err))
panic(err)
}

log.Debug("QueryNode start to wait for QueryCoord ready")
err = funcutil.WaitForComponentInitOrHealthy(s.ctx, queryCoord, "QueryCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for QueryCoord ready failed", zap.Error(err))
panic(err)
}
log.Debug("QueryNode report QueryCoord is ready")

if err := s.SetQueryCoord(queryCoord); err != nil {
panic(err)
}

// --- RootCoord Client ---
//ms.Params.Init()

@@ -315,10 +284,6 @@ func (s *Server) SetRootCoord(rootCoord types.RootCoord) error {
return s.querynode.SetRootCoord(rootCoord)
}

func (s *Server) SetQueryCoord(queryCoord types.QueryCoord) error {
return s.querynode.SetQueryCoord(queryCoord)
}

func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error {
return s.querynode.SetIndexCoord(indexCoord)
}

@@ -109,11 +109,11 @@ func (c *queryNodeCluster) GetComponentInfos(ctx context.Context) ([]*internalpb
c.RLock()
defer c.RUnlock()
subComponentInfos := make([]*internalpb.ComponentInfo, 0)
nodeIDs, err := c.getOnServiceNodeIDs()
nodes, err := c.getOnServiceNodes()
if err != nil {
return nil, err
}
for _, nodeID := range nodeIDs {
for nodeID := range nodes {
node := c.nodes[nodeID]
componentStates, err := node.client.GetComponentStates(ctx)
if err != nil {

@@ -228,7 +228,7 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
collectionID := in.CollectionID
//c.clusterMeta.addCollection(collectionID, in.Schema)
//c.clusterMeta.addDmChannel(collectionID, nodeID, channels)
c.clusterMeta.addDmChannel(collectionID, nodeID, channels)

node.addCollection(collectionID, in.Schema)
node.addDmChannel(collectionID, channels)

@@ -328,12 +328,12 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
defer c.Unlock()

segmentInfos := make([]*querypb.SegmentInfo, 0)
nodes, err := c.getOnServiceNodeIDs()
nodes, err := c.getOnServiceNodes()
if err != nil {
log.Warn(err.Error())
return segmentInfos, nil
}
for _, nodeID := range nodes {
for nodeID := range nodes {
res, err := c.nodes[nodeID].client.GetSegmentInfo(ctx, in)
if err != nil {
return nil, err

@@ -407,6 +417,17 @@ func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionuti
return fmt.Errorf("node %d alredy exists in cluster", id)
}

func (c *queryNodeCluster) getNodeByID(nodeID int64) (*queryNode, error) {
c.RLock()
defer c.RUnlock()

if node, ok := c.nodes[nodeID]; ok {
return node, nil
}

return nil, fmt.Errorf("query node %d not exist", nodeID)
}

func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
c.Lock()
defer c.Unlock()

@@ -427,25 +438,36 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
return nil
}

func (c *queryNodeCluster) onServiceNodeIDs() ([]int64, error) {
c.Lock()
defer c.Unlock()
func (c *queryNodeCluster) onServiceNodes() (map[int64]*queryNode, error) {
c.RLock()
defer c.RUnlock()

return c.getOnServiceNodeIDs()
return c.getOnServiceNodes()
}

func (c *queryNodeCluster) getOnServiceNodeIDs() ([]int64, error) {
nodeIDs := make([]int64, 0)
func (c *queryNodeCluster) getOnServiceNodes() (map[int64]*queryNode, error) {
nodes := make(map[int64]*queryNode)
for nodeID, node := range c.nodes {
if node.isOnService() {
nodeIDs = append(nodeIDs, nodeID)
nodes[nodeID] = node
}
}
if len(nodeIDs) == 0 {
if len(nodes) == 0 {
return nil, errors.New("no queryNode is alive")
}

return nodeIDs, nil
return nodes, nil
}

func (c *queryNodeCluster) isOnService(nodeID int64) (bool, error) {
c.Lock()
defer c.Unlock()

if node, ok := c.nodes[nodeID]; ok {
return node.isOnService(), nil
}

return false, fmt.Errorf("query node %d not exist", nodeID)
}

func (c *queryNodeCluster) printMeta() {

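Note on the hunks above: the cluster helpers switch from returning a slice of node IDs (getOnServiceNodeIDs) to returning the on-service node objects themselves (getOnServiceNodes), taken as a snapshot under the read lock, and getNodeByID/isOnService give callers a lookup that can fail gracefully instead of indexing the shared map. A minimal, self-contained Go sketch of that snapshot pattern, using hypothetical node/cluster types rather than the Milvus ones:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// node and cluster are hypothetical stand-ins for the Milvus queryNode and
// queryNodeCluster types; only the locking/snapshot pattern is the point.
type node struct{ onService bool }

type cluster struct {
	sync.RWMutex
	nodes map[int64]*node
}

// onServiceNodes returns a snapshot of the currently serving nodes, taken
// under the read lock.
func (c *cluster) onServiceNodes() (map[int64]*node, error) {
	c.RLock()
	defer c.RUnlock()

	out := make(map[int64]*node, len(c.nodes))
	for id, n := range c.nodes {
		if n.onService {
			out[id] = n
		}
	}
	if len(out) == 0 {
		return nil, errors.New("no query node is alive")
	}
	return out, nil
}

func main() {
	c := &cluster{nodes: map[int64]*node{1: {onService: true}, 2: {onService: false}}}
	nodes, err := c.onServiceNodes()
	if err != nil {
		panic(err)
	}
	// Iterating the snapshot never indexes c.nodes again, so a node that
	// drops out (e.g. restarts) between collection and use cannot hand the
	// caller a nil entry.
	for id := range nodes {
		fmt.Println("on-service node:", id)
	}
}
```
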
@@ -15,6 +15,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"

@@ -307,10 +308,11 @@ func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error {
lct.meta.addCollection(collectionID, lct.Schema)
if lct.result.ErrorCode != commonpb.ErrorCode_Success {
lct.childTasks = make([]task, 0)
for nodeID, node := range lct.cluster.nodes {
if !node.isOnService() {
continue
}
nodes, err := lct.cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
}
for nodeID := range nodes {
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,

@@ -382,10 +384,11 @@ func (rct *ReleaseCollectionTask) Execute(ctx context.Context) error {
}

if rct.NodeID <= 0 {
for nodeID, node := range rct.cluster.nodes {
if !node.isOnService() {
continue
}
nodes, err := rct.cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
}
for nodeID := range nodes {
req := proto.Clone(rct.ReleaseCollectionRequest).(*querypb.ReleaseCollectionRequest)
req.NodeID = nodeID
releaseCollectionTask := &ReleaseCollectionTask{

@@ -552,10 +555,11 @@ func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error {
if lpt.result.ErrorCode != commonpb.ErrorCode_Success {
lpt.childTasks = make([]task, 0)
if lpt.addCol {
for nodeID, node := range lpt.cluster.nodes {
if !node.isOnService() {
continue
}
nodes, err := lpt.cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
}
for nodeID := range nodes {
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,

@@ -580,10 +584,11 @@ func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error {
log.Debug("loadPartitionTask: add a releaseCollectionTask to loadPartitionTask's childTask", zap.Any("task", releaseCollectionTask))
}
} else {
for nodeID, node := range lpt.cluster.nodes {
if !node.isOnService() {
continue
}
nodes, err := lpt.cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
}
for nodeID := range nodes {
req := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,

@@ -661,10 +666,11 @@ func (rpt *ReleasePartitionTask) Execute(ctx context.Context) error {
}

if rpt.NodeID <= 0 {
for nodeID, node := range rpt.cluster.nodes {
if !node.isOnService() {
continue
}
nodes, err := rpt.cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
}
for nodeID := range nodes {
req := proto.Clone(rpt.ReleasePartitionsRequest).(*querypb.ReleasePartitionsRequest)
req.NodeID = nodeID
releasePartitionTask := &ReleasePartitionTask{

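The four hunks above make the same change in the release/rollback paths: instead of ranging over cluster.nodes and skipping entries that are not on service, they ask the cluster for its on-service snapshot once and fan one request out per returned node ID. A trimmed sketch of that shape (releaseRequest and the map parameter are hypothetical stand-ins for the querypb request types and the snapshot returned by onServiceNodes):

```go
package main

import "fmt"

// releaseRequest is invented for illustration; the real tasks build
// querypb.ReleaseCollectionRequest / ReleasePartitionsRequest values.
type releaseRequest struct {
	NodeID       int64
	CollectionID int64
}

// fanOutRelease builds one request per on-service node from the snapshot map.
func fanOutRelease(onServiceNodes map[int64]struct{}, collectionID int64) []releaseRequest {
	reqs := make([]releaseRequest, 0, len(onServiceNodes))
	for nodeID := range onServiceNodes {
		reqs = append(reqs, releaseRequest{NodeID: nodeID, CollectionID: collectionID})
	}
	return reqs
}

func main() {
	nodes := map[int64]struct{}{7: {}, 8: {}}
	fmt.Println(fanOutRelease(nodes, 100))
}
```
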
@@ -734,7 +740,12 @@ func (lst *LoadSegmentTask) Marshal() string {
}

func (lst *LoadSegmentTask) IsValid() bool {
return lst.ctx != nil && lst.cluster.nodes[lst.NodeID].isOnService()
onService, err := lst.cluster.isOnService(lst.NodeID)
if err != nil {
return false
}

return lst.ctx != nil && onService
}

func (lst *LoadSegmentTask) Type() commonpb.MsgType {

@@ -854,7 +865,11 @@ func (rst *ReleaseSegmentTask) Marshal() string {
}

func (rst *ReleaseSegmentTask) IsValid() bool {
return rst.ctx != nil && rst.cluster.nodes[rst.NodeID].isOnService()
onService, err := rst.cluster.isOnService(rst.NodeID)
if err != nil {
return false
}
return rst.ctx != nil && onService
}

func (rst *ReleaseSegmentTask) Type() commonpb.MsgType {

@@ -912,7 +927,11 @@ func (wdt *WatchDmChannelTask) Marshal() string {
}

func (wdt *WatchDmChannelTask) IsValid() bool {
return wdt.ctx != nil && wdt.cluster.nodes[wdt.NodeID].isOnService()
onService, err := wdt.cluster.isOnService(wdt.NodeID)
if err != nil {
return false
}
return wdt.ctx != nil && onService
}

func (wdt *WatchDmChannelTask) Type() commonpb.MsgType {

@@ -1036,7 +1055,12 @@ func (wqt *WatchQueryChannelTask) Marshal() string {
}

func (wqt *WatchQueryChannelTask) IsValid() bool {
return wqt.ctx != nil && wqt.cluster.nodes[wqt.NodeID].isOnService()
onService, err := wqt.cluster.isOnService(wqt.NodeID)
if err != nil {
return false
}

return wqt.ctx != nil && onService
}

func (wqt *WatchQueryChannelTask) Type() commonpb.MsgType {

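The IsValid hunks above apply the same fix to four task types: the old one-liner indexed cluster.nodes[NodeID] directly, which yields a nil *queryNode once the node has dropped out after a restart and can panic inside isOnService; the new version goes through cluster.isOnService, which returns an error for an unknown node, so the task is simply treated as invalid. A self-contained sketch of the pattern with hypothetical stand-in types:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// nodeStub and clusterStub are hypothetical stand-ins for queryNode and
// queryNodeCluster; only the validity-check shape is the point.
type nodeStub struct{ onService bool }

type clusterStub struct {
	sync.RWMutex
	nodes map[int64]*nodeStub
}

// isOnService reports whether nodeID is registered and serving. Unlike
// indexing the map directly, a vanished node yields an error instead of a
// nil pointer.
func (c *clusterStub) isOnService(nodeID int64) (bool, error) {
	c.RLock()
	defer c.RUnlock()
	if n, ok := c.nodes[nodeID]; ok {
		return n.onService, nil
	}
	return false, errors.New("query node not exist")
}

// taskStub mirrors the IsValid shape the four tasks adopt above.
type taskStub struct {
	nodeID  int64
	cluster *clusterStub
	hasCtx  bool
}

func (t *taskStub) IsValid() bool {
	onService, err := t.cluster.isOnService(t.nodeID)
	if err != nil {
		return false
	}
	return t.hasCtx && onService
}

func main() {
	c := &clusterStub{nodes: map[int64]*nodeStub{1: {onService: true}}}
	fmt.Println((&taskStub{nodeID: 1, cluster: c, hasCtx: true}).IsValid()) // true
	fmt.Println((&taskStub{nodeID: 2, cluster: c, hasCtx: true}).IsValid()) // false: node 2 is gone
}
```
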
@@ -1127,18 +1151,27 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {

if lbt.triggerCondition == querypb.TriggerCondition_nodeDown {
for _, nodeID := range lbt.SourceNodeIDs {
node, err := lbt.cluster.getNodeByID(nodeID)
if err != nil {
log.Error(err.Error())
continue
}
lbt.meta.deleteSegmentInfoByNodeID(nodeID)
collectionInfos := lbt.cluster.nodes[nodeID].collectionInfos
collectionInfos := node.collectionInfos
for collectionID, info := range collectionInfos {
loadCollection := lbt.meta.collectionInfos[collectionID].LoadCollection
schema := lbt.meta.collectionInfos[collectionID].Schema
metaInfo, err := lbt.meta.getCollectionInfoByID(collectionID)
if err != nil {
log.Error(err.Error())
continue
}
loadCollection := metaInfo.LoadCollection
schema := metaInfo.Schema
partitionIDs := info.PartitionIDs

segmentsToLoad := make([]UniqueID, 0)
segment2BingLog := make(map[UniqueID]*querypb.SegmentLoadInfo)
loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
channelsToWatch := make([]string, 0)
watchRequestsInPartition := make([]*querypb.WatchDmChannelsRequest, 0)
watchRequestsInCollection := make(map[string]*querypb.WatchDmChannelsRequest)
watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)

dmChannels, err := lbt.meta.getDmChannelsByNodeID(collectionID, nodeID)
if err != nil {

@@ -1162,148 +1195,68 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {
return err
}

for _, segmentBingLog := range recoveryInfo.Binlogs {
segmentID := segmentBingLog.SegmentID
segmentLoadInfo := &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: partitionID,
CollectionID: collectionID,
BinlogPaths: segmentBingLog.FieldBinlogs,
}

loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: lbt.Base,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: schema,
LoadCondition: querypb.TriggerCondition_nodeDown,
}

segmentsToLoad = append(segmentsToLoad, segmentID)
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}

for _, channelInfo := range recoveryInfo.Channels {
for _, channel := range dmChannels {
if channelInfo.ChannelName == channel {
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lbt.Base,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{channelInfo},
Schema: schema,
}
if loadCollection {
if _, ok := watchRequestsInCollection[channel]; !ok {
watchRequestsInCollection[channel] = watchRequest
merged := false
for index, channelName := range channelsToWatch {
if channel == channelName {
merged = true
oldInfo := watchDmChannelReqs[index].Infos[0]
newInfo := mergeVChannelInfo(oldInfo, channelInfo)
watchDmChannelReqs[index].Infos = []*datapb.VchannelInfo{newInfo}
break
}
}
if !merged {
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lbt.Base,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{channelInfo},
Schema: schema,
}
channelsToWatch = append(channelsToWatch, channel)
} else {
oldInfo := watchRequestsInCollection[channel].Infos[0]
newInfo := mergeVChannelInfo(oldInfo, channelInfo)
watchRequestsInCollection[channel].Infos = []*datapb.VchannelInfo{newInfo}
watchDmChannelReqs = append(watchDmChannelReqs, watchRequest)
}
} else {
watchRequest.PartitionID = partitionID
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lbt.Base,
CollectionID: collectionID,
PartitionID: partitionID,
Infos: []*datapb.VchannelInfo{channelInfo},
Schema: schema,
}
channelsToWatch = append(channelsToWatch, channel)
watchRequestsInPartition = append(watchRequestsInPartition, watchRequest)
watchDmChannelReqs = append(watchDmChannelReqs, watchRequest)
}
break
}
}
}

for _, binlog := range recoveryInfo.Binlogs {
segmentID := binlog.SegmentID
if lbt.meta.hasSegmentInfo(segmentID) {
continue
}
segmentLoadInfo := &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: partitionID,
CollectionID: collectionID,
BinlogPaths: make([]*datapb.FieldBinlog, 0),
}
segmentLoadInfo.BinlogPaths = append(segmentLoadInfo.BinlogPaths, binlog.FieldBinlogs...)
segmentsToLoad = append(segmentsToLoad, segmentID)
segment2BingLog[segmentID] = segmentLoadInfo
}
}

segment2Nodes := shuffleSegmentsToQueryNode(segmentsToLoad, lbt.cluster)
watchRequest2Nodes := shuffleChannelsToQueryNode(channelsToWatch, lbt.cluster)

watchQueryChannelInfo := make(map[int64]bool)
node2Segments := make(map[int64][]*querypb.SegmentLoadInfo)
for index, id := range segment2Nodes {
if _, ok := node2Segments[id]; !ok {
node2Segments[id] = make([]*querypb.SegmentLoadInfo, 0)
}
segmentID := segmentsToLoad[index]
node2Segments[id] = append(node2Segments[id], segment2BingLog[segmentID])
if lbt.cluster.hasWatchedQueryChannel(lbt.ctx, id, collectionID) {
watchQueryChannelInfo[id] = true
continue
}
watchQueryChannelInfo[id] = false
}
for _, id := range watchRequest2Nodes {
if lbt.cluster.hasWatchedQueryChannel(lbt.ctx, id, collectionID) {
watchQueryChannelInfo[id] = true
continue
}
watchQueryChannelInfo[id] = false
}

for id, segmentInfos := range node2Segments {
loadSegmentsRequest := &querypb.LoadSegmentsRequest{
Base: lbt.Base,
NodeID: id,
Infos: segmentInfos,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
}

loadSegmentTask := &LoadSegmentTask{
BaseTask: BaseTask{
ctx: lbt.ctx,
Condition: NewTaskCondition(lbt.ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},

LoadSegmentsRequest: loadSegmentsRequest,
meta: lbt.meta,
cluster: lbt.cluster,
}
lbt.AddChildTask(loadSegmentTask)
log.Debug("LoadBalanceTask: add a loadSegmentTask to loadBalanceTask's childTask", zap.Any("task", loadSegmentTask))
}

for index, id := range watchRequest2Nodes {
var watchRequest *querypb.WatchDmChannelsRequest
if loadCollection {
channel := channelsToWatch[index]
watchRequest = watchRequestsInCollection[channel]
} else {
watchRequest = watchRequestsInPartition[index]
}
watchRequest.NodeID = id
watchDmChannelTask := &WatchDmChannelTask{
BaseTask: BaseTask{
ctx: lbt.ctx,
Condition: NewTaskCondition(lbt.ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
WatchDmChannelsRequest: watchRequest,
meta: lbt.meta,
cluster: lbt.cluster,
}
lbt.AddChildTask(watchDmChannelTask)
log.Debug("LoadBalanceTask: add a watchDmChannelTask to loadBalanceTask's childTask", zap.Any("task", watchDmChannelTask))
}

for id, watched := range watchQueryChannelInfo {
if !watched {
queryChannel, queryResultChannel := lbt.meta.GetQueryChannel(collectionID)

addQueryChannelRequest := &querypb.AddQueryChannelRequest{
Base: lbt.Base,
NodeID: id,
CollectionID: collectionID,
RequestChannelID: queryChannel,
ResultChannelID: queryResultChannel,
}
watchQueryChannelTask := &WatchQueryChannelTask{
BaseTask: BaseTask{
ctx: lbt.ctx,
Condition: NewTaskCondition(lbt.ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},

AddQueryChannelRequest: addQueryChannelRequest,
cluster: lbt.cluster,
}
lbt.AddChildTask(watchQueryChannelTask)
log.Debug("LoadBalanceTask: add a watchQueryChannelTask to loadBalanceTask's childTask", zap.Any("task", watchQueryChannelTask))
}
}
assignInternalTask(collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs)
log.Debug("loadBalanceTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
}
}
}

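In the rewritten branch above, watch requests for the load-collection case are merged per channel: if a channel already has a pending WatchDmChannelsRequest, the new VchannelInfo is folded into it with mergeVChannelInfo rather than producing a second request, and the per-node task construction is then delegated to assignInternalTask. A simplified sketch of the merge-by-channel step (vchannelInfo and this mergeVChannelInfo are trimmed, hypothetical stand-ins for the datapb types and the real helper):

```go
package main

import "fmt"

// vchannelInfo is a hypothetical, trimmed stand-in for datapb.VchannelInfo.
type vchannelInfo struct {
	ChannelName string
	SegmentIDs  []int64
}

// mergeVChannelInfo here only concatenates segment IDs, which is enough to
// show the merge shape; the real helper combines more fields.
func mergeVChannelInfo(a, b vchannelInfo) vchannelInfo {
	return vchannelInfo{
		ChannelName: a.ChannelName,
		SegmentIDs:  append(append([]int64{}, a.SegmentIDs...), b.SegmentIDs...),
	}
}

// mergeByChannel keeps one entry per channel: a second info for an already
// seen channel is merged into the existing one instead of producing a second
// watch request.
func mergeByChannel(infos []vchannelInfo) []vchannelInfo {
	channels := make([]string, 0)
	merged := make([]vchannelInfo, 0)
	for _, info := range infos {
		found := false
		for i, name := range channels {
			if name == info.ChannelName {
				merged[i] = mergeVChannelInfo(merged[i], info)
				found = true
				break
			}
		}
		if !found {
			channels = append(channels, info.ChannelName)
			merged = append(merged, info)
		}
	}
	return merged
}

func main() {
	infos := []vchannelInfo{
		{ChannelName: "dml-0", SegmentIDs: []int64{1}},
		{ChannelName: "dml-1", SegmentIDs: []int64{2}},
		{ChannelName: "dml-0", SegmentIDs: []int64{3}},
	}
	fmt.Println(mergeByChannel(infos)) // two entries; dml-0 carries segments 1 and 3
}
```
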
@@ -1336,10 +1289,19 @@ func (lbt *LoadBalanceTask) PostExecute(context.Context) error {

func shuffleChannelsToQueryNode(dmChannels []string, cluster *queryNodeCluster) []int64 {
maxNumChannels := 0
for nodeID, node := range cluster.nodes {
if !node.onService {
nodes := make(map[int64]*queryNode)
var err error
for {
nodes, err = cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
time.Sleep(1 * time.Second)
continue
}
break
}

for nodeID := range nodes {
numChannels, _ := cluster.getNumDmChannels(nodeID)
if numChannels > maxNumChannels {
maxNumChannels = numChannels

@@ -1355,26 +1317,20 @@ func shuffleChannelsToQueryNode(dmChannels []string, cluster *queryNodeCluster)
for {
lastOffset := offset
if !loopAll {
for id, node := range cluster.nodes {
if !node.isOnService() {
continue
}
numSegments, _ := cluster.getNumSegments(id)
for nodeID := range nodes {
numSegments, _ := cluster.getNumSegments(nodeID)
if numSegments >= maxNumChannels {
continue
}
res = append(res, id)
res = append(res, nodeID)
offset++
if offset == len(dmChannels) {
return res
}
}
} else {
for id, node := range cluster.nodes {
if !node.isOnService() {
continue
}
res = append(res, id)
for nodeID := range nodes {
res = append(res, nodeID)
offset++
if offset == len(dmChannels) {
return res

@@ -1389,10 +1345,18 @@ func shuffleChannelsToQueryNode(dmChannels []string, cluster *queryNodeCluster)

func shuffleSegmentsToQueryNode(segmentIDs []UniqueID, cluster *queryNodeCluster) []int64 {
maxNumSegments := 0
for nodeID, node := range cluster.nodes {
if !node.isOnService() {
nodes := make(map[int64]*queryNode)
var err error
for {
nodes, err = cluster.onServiceNodes()
if err != nil {
log.Debug(err.Error())
time.Sleep(1 * time.Second)
continue
}
break
}
for nodeID := range nodes {
numSegments, _ := cluster.getNumSegments(nodeID)
if numSegments > maxNumSegments {
maxNumSegments = numSegments

@@ -1409,26 +1373,20 @@ func shuffleSegmentsToQueryNode(segmentIDs []UniqueID, cluster *queryNodeCluster
for {
lastOffset := offset
if !loopAll {
for id, node := range cluster.nodes {
if !node.isOnService() {
continue
}
numSegments, _ := cluster.getNumSegments(id)
for nodeID := range nodes {
numSegments, _ := cluster.getNumSegments(nodeID)
if numSegments >= maxNumSegments {
continue
}
res = append(res, id)
res = append(res, nodeID)
offset++
if offset == len(segmentIDs) {
return res
}
}
} else {
for id, node := range cluster.nodes {
if !node.isOnService() {
continue
}
res = append(res, id)
for nodeID := range nodes {
res = append(res, nodeID)
offset++
if offset == len(segmentIDs) {
return res

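Both shuffle helpers above now wait until the cluster reports at least one on-service node before assigning channels or segments, instead of silently iterating whatever happens to be in cluster.nodes. A sketch of that retry loop (fetch stands in for cluster.onServiceNodes; the names are hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForOnServiceNodes retries until at least one node is serving, the shape
// the shuffle helpers adopt above. fetch is a stand-in for the cluster's
// onServiceNodes method.
func waitForOnServiceNodes(fetch func() (map[int64]struct{}, error)) map[int64]struct{} {
	for {
		nodes, err := fetch()
		if err != nil {
			// No query node is alive yet (e.g. right after a restart):
			// wait and retry instead of returning an empty assignment.
			time.Sleep(1 * time.Second)
			continue
		}
		return nodes
	}
}

func main() {
	calls := 0
	fetch := func() (map[int64]struct{}, error) {
		calls++
		if calls < 3 {
			return nil, errors.New("no query node is alive")
		}
		return map[int64]struct{}{1: {}}, nil
	}
	fmt.Println(waitForOnServiceNodes(fetch)) // succeeds on the third attempt
}
```
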
@@ -116,6 +116,7 @@ func (q *queryCollection) register() {
return
}

//TODO:: can't add new vChannel to selectCase
q.watcherSelectCase = make([]reflect.SelectCase, 0)
log.Debug("register tSafe watcher and init watcher select case",
zap.Any("collectionID", collection.ID()),

@@ -61,7 +61,6 @@ type QueryNode struct {
queryService *queryService

// clients
queryCoord types.QueryCoord
rootCoord types.RootCoord
indexCoord types.IndexCoord
dataCoord types.DataCoord

@@ -147,43 +146,6 @@ func (node *QueryNode) Init() error {
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)

C.SegcoreInit()
//registerReq := &queryPb.RegisterNodeRequest{
// Base: &commonpb.MsgBase{
// SourceID: Params.QueryNodeID,
// },
// Address: &commonpb.Address{
// Ip: Params.QueryNodeIP,
// Port: Params.QueryNodePort,
// },
//}
//
//resp, err := node.queryCoord.RegisterNode(ctx, registerReq)
//if err != nil {
// log.Debug("QueryNode RegisterNode failed", zap.Error(err))
// panic(err)
//}
//if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
// log.Debug("QueryNode RegisterNode failed", zap.Any("Reason", resp.Status.Reason))
// panic(resp.Status.Reason)
//}
//log.Debug("QueryNode RegisterNode success")
//
//for _, kv := range resp.InitParams.StartParams {
// switch kv.Key {
// case "StatsChannelName":
// Params.StatsChannelName = kv.Value
// case "TimeTickChannelName":
// Params.QueryTimeTickChannelName = kv.Value
// case "SearchChannelName":
// Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
// case "SearchResultChannelName":
// Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
// default:
// return fmt.Errorf("Invalid key: %v", kv.Key)
// }
//}
//
//log.Debug("QueryNode Init ", zap.Int64("QueryNodeID", Params.QueryNodeID), zap.Any("searchChannelNames", Params.SearchChannelNames))

if node.rootCoord == nil {
log.Error("null root coordinator detected")

@@ -259,14 +221,6 @@ func (node *QueryNode) SetRootCoord(rc types.RootCoord) error {
return nil
}

func (node *QueryNode) SetQueryCoord(query types.QueryCoord) error {
if query == nil {
return errors.New("null query coordinator interface")
}
node.queryCoord = query
return nil
}

func (node *QueryNode) SetIndexCoord(index types.IndexCoord) error {
if index == nil {
return errors.New("null index coordinator interface")

@@ -175,10 +175,6 @@ func newQueryNodeMock() *QueryNode {
panic(err)
}
svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory)
err = svr.SetQueryCoord(&queryCoordMock{})
if err != nil {
panic(err)
}
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory, etcdKV)
svr.streaming = newStreaming(ctx, msFactory, etcdKV)