mirror of https://github.com/milvus-io/milvus.git
Fix bug: search timeout (#5557)
* Fix bug: search timeout
  Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
* Add log and fix unittest bug
  Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/5484/head
parent
c47b157511
commit
b0b8f58192
|
@ -158,10 +158,14 @@ func (node *DataNode) Init() error {
|
|||
|
||||
resp, err := node.dataService.RegisterNode(ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Register node failed: %v", err)
|
||||
err = fmt.Errorf("Register node failed: %v", err)
|
||||
log.Debug("DataNode RegisterNode failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("Receive error when registering data node, msg: %s", resp.Status.Reason)
|
||||
err = fmt.Errorf("Receive error when registering data node, msg: %s", resp.Status.Reason)
|
||||
log.Debug("DataNode RegisterNode failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, kv := range resp.InitParams.StartParams {
|
||||
|
@ -178,6 +182,10 @@ func (node *DataNode) Init() error {
|
|||
return fmt.Errorf("Invalid key: %v", kv.Key)
|
||||
}
|
||||
}
|
||||
log.Debug("DataNode Init", zap.Any("DDChannelName", Params.DDChannelNames),
|
||||
zap.Any("SegmentStatisticsChannelName", Params.SegmentStatisticsChannelName),
|
||||
zap.Any("TimeTickChannelName", Params.TimeTickChannelName),
|
||||
zap.Any("CompleteFlushChannelName", Params.TimeTickChannelName))
|
||||
|
||||
select {
|
||||
case <-time.After(RPCConnectionTimeout):
|
||||
|
|
|
@ -62,7 +62,7 @@ func NewClient(address string, timeout time.Duration) (*Client, error) {
|
|||
func (c *Client) Init() error {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("DataNode connect ", zap.String("address", c.address))
|
||||
log.Debug("DataNodeClient try connect ", zap.String("address", c.address))
|
||||
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
|
@ -77,8 +77,10 @@ func (c *Client) Init() error {
|
|||
|
||||
err := retry.Retry(c.reconnTry, time.Millisecond*500, connectGrpcFunc)
|
||||
if err != nil {
|
||||
log.Debug("DataNodeClient try connect failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNodeClient connect success")
|
||||
c.grpc = datapb.NewDataNodeClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
@ -87,7 +89,7 @@ func (c *Client) reconnect() error {
|
|||
tracer := opentracing.GlobalTracer()
|
||||
var err error
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("DataNode connect ", zap.String("address", c.address))
|
||||
log.Debug("DataNodeClient try reconnect ", zap.String("address", c.address))
|
||||
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
|
@ -102,8 +104,10 @@ func (c *Client) reconnect() error {
|
|||
|
||||
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
|
||||
if err != nil {
|
||||
log.Debug("DataNodeClient try reconnect failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNodeClient reconnect success")
|
||||
c.grpc = datapb.NewDataNodeClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -171,6 +171,7 @@ func (s *Server) init() error {
|
|||
|
||||
err := s.datanode.Register()
|
||||
if err != nil {
|
||||
log.Debug("DataNode Register etcd failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = s.startGrpc()
|
||||
|
@ -184,18 +185,23 @@ func (s *Server) init() error {
|
|||
log.Debug("Init master service client ...")
|
||||
masterServiceClient, err := s.newMasterServiceClient(Params.MasterAddress)
|
||||
if err != nil {
|
||||
log.Debug("DataNode newMasterServiceClient failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = masterServiceClient.Init(); err != nil {
|
||||
log.Debug("DataNode masterServiceClient Init failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = masterServiceClient.Start(); err != nil {
|
||||
log.Debug("DataNode masterServiceClient Start failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
err = funcutil.WaitForComponentHealthy(ctx, masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("DataNode wait masterService ready failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
log.Debug("DataNode masterService is ready")
|
||||
if err = s.SetMasterServiceInterface(masterServiceClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -207,15 +213,19 @@ func (s *Server) init() error {
|
|||
log.Debug("DataNode Init data service client ...")
|
||||
dataServiceClient := s.newDataServiceClient(Params.DataServiceAddress, dn.Params.MetaRootPath, dn.Params.EtcdAddress, 10)
|
||||
if err = dataServiceClient.Init(); err != nil {
|
||||
log.Debug("DataNode newDataServiceClient failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = dataServiceClient.Start(); err != nil {
|
||||
log.Debug("DataNode dataServiceClient Start failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
err = funcutil.WaitForComponentInitOrHealthy(ctx, dataServiceClient, "DataService", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("DataNode wait dataServiceClient ready failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
log.Debug("DataNode dataService is ready")
|
||||
if err = s.SetDataServiceInterface(dataServiceClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -228,6 +238,7 @@ func (s *Server) init() error {
|
|||
log.Warn("datanode init error: ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNode", zap.Any("State", internalpb.StateCode_Initializing))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -47,10 +47,12 @@ func getDataServiceAddress(sess *sessionutil.Session) (string, error) {
|
|||
key := typeutil.DataServiceRole
|
||||
msess, _, err := sess.GetSessions(key)
|
||||
if err != nil {
|
||||
log.Debug("DataServiceClient, getSessions failed", zap.Any("key", key), zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
ms, ok := msess[key]
|
||||
if !ok {
|
||||
log.Debug("DataServiceClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
|
||||
return "", fmt.Errorf("number of master service is incorrect, %d", len(msess))
|
||||
}
|
||||
return ms.Address, nil
|
||||
|
@ -70,9 +72,10 @@ func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duratio
|
|||
|
||||
func (c *Client) Init() error {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
log.Debug("DataServiceClient", zap.Any("c.addr", c.addr))
|
||||
if c.addr != "" {
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("dataservice connect ", zap.String("address", c.addr))
|
||||
log.Debug("DataServiceClient try connect ", zap.String("address", c.addr))
|
||||
conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
|
@ -87,8 +90,10 @@ func (c *Client) Init() error {
|
|||
|
||||
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
|
||||
if err != nil {
|
||||
log.Debug("DataServiceClient connect failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("DataServiceClient connect success")
|
||||
} else {
|
||||
return c.reconnect()
|
||||
}
|
||||
|
@ -109,10 +114,11 @@ func (c *Client) reconnect() error {
|
|||
}
|
||||
err = retry.Retry(c.reconnTry, 3*time.Second, getDataServiceAddressFn)
|
||||
if err != nil {
|
||||
log.Debug("DataServiceClient try reconnect getDataServiceAddressFn failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("DataService connect ", zap.String("address", c.addr))
|
||||
log.Debug("DataServiceClient try reconnect ", zap.String("address", c.addr))
|
||||
conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
|
@ -127,6 +133,7 @@ func (c *Client) reconnect() error {
|
|||
|
||||
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
|
||||
if err != nil {
|
||||
log.Debug("DataService try reconnect failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
c.grpcClient = datapb.NewDataServiceClient(c.conn)
|
||||
|
|
|
@ -51,7 +51,7 @@ func NewClient(ctx context.Context, address string, timeout time.Duration) *Clie
|
|||
func (c *Client) Init() error {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("proxynode connect ", zap.String("address", c.address))
|
||||
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.address))
|
||||
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
|
@ -65,8 +65,10 @@ func (c *Client) Init() error {
|
|||
}
|
||||
err := retry.Retry(c.reconnTry, time.Millisecond*200, connectGrpcFunc)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNodeClient connect failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("ProxyNodeClient connect success", zap.String("address", c.address))
|
||||
c.grpcClient = proxypb.NewProxyNodeServiceClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
@ -74,7 +76,7 @@ func (c *Client) Init() error {
|
|||
func (c *Client) reconnect() error {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("ProxyNode connect ", zap.String("address", c.address))
|
||||
log.Debug("ProxyNodeClient try reconnect ", zap.String("address", c.address))
|
||||
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
|
@ -89,8 +91,10 @@ func (c *Client) reconnect() error {
|
|||
|
||||
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNodeClient try reconnect failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("ProxyNodeClient reconnect success")
|
||||
c.grpcClient = proxypb.NewProxyNodeServiceClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -166,6 +166,7 @@ func (s *Server) init() error {
|
|||
|
||||
err = s.proxynode.Register()
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode Register etcd failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -181,69 +182,77 @@ func (s *Server) init() error {
|
|||
s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
|
||||
err = s.proxyServiceClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode proxyServiceClient init failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.proxynode.SetProxyServiceClient(s.proxyServiceClient)
|
||||
log.Debug("set proxy service client ...")
|
||||
|
||||
masterServiceAddr := Params.MasterAddress
|
||||
log.Debug("proxynode", zap.String("master address", masterServiceAddr))
|
||||
log.Debug("ProxyNode", zap.String("master address", masterServiceAddr))
|
||||
timeout := 3 * time.Second
|
||||
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode new masterServiceClient failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = s.masterServiceClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode new masterServiceClient Init ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
|
||||
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode WaitForComponentHealthy master service failed ", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
s.proxynode.SetMasterClient(s.masterServiceClient)
|
||||
log.Debug("set master client ...")
|
||||
|
||||
dataServiceAddr := Params.DataServiceAddress
|
||||
log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
|
||||
log.Debug("ProxyNode", zap.String("data service address", dataServiceAddr))
|
||||
s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10)
|
||||
err = s.dataServiceClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode dataServiceClient init failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.proxynode.SetDataServiceClient(s.dataServiceClient)
|
||||
log.Debug("set data service address ...")
|
||||
|
||||
indexServiceAddr := Params.IndexServerAddress
|
||||
log.Debug("proxynode", zap.String("index server address", indexServiceAddr))
|
||||
log.Debug("ProxyNode", zap.String("index server address", indexServiceAddr))
|
||||
s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10)
|
||||
err = s.indexServiceClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode indexServiceClient init failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.proxynode.SetIndexServiceClient(s.indexServiceClient)
|
||||
log.Debug("set index service client ...")
|
||||
|
||||
queryServiceAddr := Params.QueryServiceAddress
|
||||
log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
|
||||
log.Debug("ProxyNode", zap.String("query server address", queryServiceAddr))
|
||||
s.queryServiceClient, err = grpcqueryserviceclient.NewClient(ctx, queryServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode new queryServiceClient failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = s.queryServiceClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode queryServiceClient Init failed ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.proxynode.SetQueryServiceClient(s.queryServiceClient)
|
||||
log.Debug("set query service client ...")
|
||||
|
||||
s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
log.Debug("proxynode",
|
||||
zap.Any("state of proxynode", internalpb.StateCode_Initializing))
|
||||
log.Debug("ProxyNode state",
|
||||
zap.Any("State", internalpb.StateCode_Initializing))
|
||||
|
||||
if err := s.proxynode.Init(); err != nil {
|
||||
log.Debug("proxynode", zap.String("proxynode init error", err.Error()))
|
||||
log.Debug("ProxyNode init failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -198,9 +198,9 @@ func (index *CIndexQuery) QueryOnBinaryVecIndex(vectors []byte) (QueryResult, er
|
|||
if len(vectors) <= 0 {
|
||||
return nil, errors.New("nq is zero")
|
||||
}
|
||||
res, err := CreateQueryResult()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
res, err1 := CreateQueryResult()
|
||||
if err1 != nil {
|
||||
return nil, err1
|
||||
}
|
||||
fn := func() C.CStatus {
|
||||
cRes, ok := res.(*CQueryResult)
|
||||
|
@ -210,7 +210,7 @@ func (index *CIndexQuery) QueryOnBinaryVecIndex(vectors []byte) (QueryResult, er
|
|||
}
|
||||
return C.QueryOnBinaryVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0]), &cRes.ptr)
|
||||
}
|
||||
err = TryCatch(fn)
|
||||
err := TryCatch(fn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -79,8 +79,8 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e
|
|||
msFactory: factory,
|
||||
}
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
log.Debug("proxynode",
|
||||
zap.Any("state of proxynode", internalpb.StateCode_Abnormal))
|
||||
log.Debug("ProxyNode",
|
||||
zap.Any("State", "Abnormal"))
|
||||
return node, nil
|
||||
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ func (node *ProxyNode) Init() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("service was ready ...")
|
||||
log.Debug("ProxyService is ready ...")
|
||||
|
||||
request := &proxypb.RegisterNodeRequest{
|
||||
Address: &commonpb.Address{
|
||||
|
@ -112,54 +112,74 @@ func (node *ProxyNode) Init() error {
|
|||
|
||||
response, err := node.proxyService.RegisterNode(ctx, request)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode RegisterNode failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Debug("ProxyNode RegisterNode failed", zap.String("Reason", response.Status.Reason))
|
||||
return errors.New(response.Status.Reason)
|
||||
}
|
||||
|
||||
err = Params.LoadConfigFromInitParams(response.InitParams)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode LoadConfigFromInitParams failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for dataservice state changed to Healthy
|
||||
if node.dataService != nil {
|
||||
log.Debug("ProxyNode wait for dataService ready")
|
||||
err := funcutil.WaitForComponentHealthy(ctx, node.dataService, "DataService", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode wait for dataService ready failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("ProxyNode dataService is ready")
|
||||
}
|
||||
|
||||
// wait for queryService state changed to Healthy
|
||||
if node.queryService != nil {
|
||||
log.Debug("ProxyNode wait for queryService ready")
|
||||
err := funcutil.WaitForComponentHealthy(ctx, node.queryService, "QueryService", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode wait for queryService ready failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("ProxyNode queryService is ready")
|
||||
}
|
||||
|
||||
// wait for indexservice state changed to Healthy
|
||||
if node.indexService != nil {
|
||||
log.Debug("ProxyNode wait for indexService ready")
|
||||
err := funcutil.WaitForComponentHealthy(ctx, node.indexService, "IndexService", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode wait for indexService ready failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("ProxyNode indexService is ready")
|
||||
}
|
||||
|
||||
if node.queryService != nil {
|
||||
resp, err := node.queryService.CreateQueryChannel(ctx)
|
||||
if err != nil {
|
||||
log.Debug("ProxyNode CreateQueryChannel failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Debug("ProxyNode CreateQueryChannel failed", zap.String("reason", resp.Status.Reason))
|
||||
|
||||
return errors.New(resp.Status.Reason)
|
||||
}
|
||||
log.Debug("ProxyNode CreateQueryChannel success")
|
||||
|
||||
Params.SearchChannelNames = []string{resp.RequestChannel}
|
||||
Params.SearchResultChannelNames = []string{resp.ResultChannel}
|
||||
Params.RetrieveChannelNames = []string{resp.RequestChannel}
|
||||
Params.RetrieveResultChannelNames = []string{resp.ResultChannel}
|
||||
log.Debug("ProxyNode CreateQueryChannel success", zap.Any("SearchChannelNames", Params.SearchChannelNames))
|
||||
log.Debug("ProxyNode CreateQueryChannel success", zap.Any("SearchResultChannelNames", Params.SearchResultChannelNames))
|
||||
log.Debug("ProxyNode CreateQueryChannel success", zap.Any("RetrieveChannelNames", Params.RetrieveChannelNames))
|
||||
log.Debug("ProxyNode CreateQueryChannel success", zap.Any("RetrieveResultChannelNames", Params.RetrieveResultChannelNames))
|
||||
}
|
||||
|
||||
// todo
|
||||
|
@ -245,9 +265,8 @@ func (node *ProxyNode) Start() error {
|
|||
}
|
||||
|
||||
node.UpdateStateCode(internalpb.StateCode_Healthy)
|
||||
log.Debug("proxynode",
|
||||
zap.Any("state of proxynode", internalpb.StateCode_Healthy))
|
||||
log.Debug("proxy node is healthy ...")
|
||||
log.Debug("ProxyNode",
|
||||
zap.Any("State", internalpb.StateCode_Healthy))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -889,6 +889,9 @@ func (st *SearchTask) Execute(ctx context.Context) error {
|
|||
msgPack.Msgs[0] = tsMsg
|
||||
err := st.queryMsgStream.Produce(&msgPack)
|
||||
log.Debug("proxynode", zap.Int("length of searchMsg", len(msgPack.Msgs)))
|
||||
log.Debug("proxy node sent one searchMsg",
|
||||
zap.Any("msgID", tsMsg.ID()),
|
||||
zap.Any("collectionID", st.CollectionID))
|
||||
if err != nil {
|
||||
log.Debug("proxynode", zap.String("send search request failed", err.Error()))
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ package querynode
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -71,28 +70,30 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
|
|||
}
|
||||
|
||||
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
|
||||
if node.searchService == nil || node.searchService.searchMsgStream == nil {
|
||||
errMsg := "null search service or null search message stream"
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: errMsg,
|
||||
}
|
||||
//if node.searchService == nil || node.searchService.searchMsgStream == nil {
|
||||
// errMsg := "null search service or null search message stream"
|
||||
// status := &commonpb.Status{
|
||||
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
// Reason: errMsg,
|
||||
// }
|
||||
//
|
||||
// return status, errors.New(errMsg)
|
||||
//}
|
||||
//
|
||||
//// add request channel
|
||||
//consumeChannels := []string{in.RequestChannelID}
|
||||
//consumeSubName := Params.MsgChannelSubName
|
||||
//node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
//node.retrieveService.retrieveMsgStream.AsConsumer(consumeChannels, "RetrieveSubName")
|
||||
//log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
|
||||
//
|
||||
//// add result channel
|
||||
//producerChannels := []string{in.ResultChannelID}
|
||||
//node.searchService.searchResultMsgStream.AsProducer(producerChannels)
|
||||
//node.retrieveService.retrieveResultMsgStream.AsProducer(producerChannels)
|
||||
//log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
|
||||
|
||||
return status, errors.New(errMsg)
|
||||
}
|
||||
|
||||
// add request channel
|
||||
consumeChannels := []string{in.RequestChannelID}
|
||||
consumeSubName := Params.MsgChannelSubName
|
||||
node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
node.retrieveService.retrieveMsgStream.AsConsumer(consumeChannels, "RetrieveSubName")
|
||||
log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
|
||||
|
||||
// add result channel
|
||||
producerChannels := []string{in.ResultChannelID}
|
||||
node.searchService.searchResultMsgStream.AsProducer(producerChannels)
|
||||
node.retrieveService.retrieveResultMsgStream.AsProducer(producerChannels)
|
||||
log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
|
||||
// Do nothing
|
||||
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
|
|
@ -158,7 +158,7 @@ func (node *QueryNode) Init() error {
|
|||
}
|
||||
}
|
||||
|
||||
log.Debug("", zap.Int64("QueryNodeID", Params.QueryNodeID))
|
||||
log.Debug("QueryNode Init ", zap.Int64("QueryNodeID", Params.QueryNodeID), zap.Any("searchChannelNames", Params.SearchChannelNames))
|
||||
|
||||
if node.masterService == nil {
|
||||
log.Error("null master service detected")
|
||||
|
|
|
@ -47,6 +47,7 @@ func newSearchService(ctx context.Context,
|
|||
|
||||
searchStream, _ := factory.NewQueryMsgStream(ctx)
|
||||
searchResultStream, _ := factory.NewQueryMsgStream(ctx)
|
||||
log.Debug("newSearchService", zap.Any("SearchChannelNames", Params.SearchChannelNames), zap.Any("SearchResultChannels", Params.SearchResultChannelNames))
|
||||
|
||||
if len(Params.SearchChannelNames) > 0 && len(Params.SearchResultChannelNames) > 0 {
|
||||
// query node need to consume search channels and produce search result channels when init.
|
||||
|
@ -102,14 +103,24 @@ func (s *searchService) consumeSearch() {
|
|||
default:
|
||||
msgPack := s.searchMsgStream.Consume()
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
msgPackNil := msgPack == nil
|
||||
msgPackEmpty := true
|
||||
if msgPack != nil {
|
||||
msgPackEmpty = len(msgPack.Msgs) <= 0
|
||||
}
|
||||
log.Debug("consume search message failed", zap.Any("msgPack is Nil", msgPackNil),
|
||||
zap.Any("msgPackEmpty", msgPackEmpty))
|
||||
|
||||
continue
|
||||
}
|
||||
for _, msg := range msgPack.Msgs {
|
||||
log.Debug("consume search message", zap.Int64("msgID", msg.ID()))
|
||||
sm, ok := msg.(*msgstream.SearchMsg)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
log.Debug("consume search message",
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
zap.Any("collectionID", sm.CollectionID))
|
||||
sp, ctx := trace.StartSpanFromContext(sm.TraceCtx())
|
||||
sm.SetTraceCtx(ctx)
|
||||
err := s.collectionCheck(sm.CollectionID)
|
||||
|
|
|
@ -76,11 +76,14 @@ func (qs *QueryService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Str
|
|||
}
|
||||
|
||||
func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
|
||||
log.Debug("register query node", zap.String("address", req.Address.String()))
|
||||
// TODO:: add mutex
|
||||
nodeID := req.Base.SourceID
|
||||
log.Debug("register query node", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()))
|
||||
|
||||
if _, ok := qs.queryNodes[nodeID]; ok {
|
||||
err := errors.New("nodeID already exists")
|
||||
log.Debug("register query node Failed nodeID already exist", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()))
|
||||
|
||||
return &querypb.RegisterNodeResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -92,6 +95,8 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN
|
|||
registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
|
||||
client, err := nodeclient.NewClient(registerNodeAddress)
|
||||
if err != nil {
|
||||
log.Debug("register query node new NodeClient failed", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()))
|
||||
|
||||
return &querypb.RegisterNodeResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -100,6 +105,8 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN
|
|||
}, err
|
||||
}
|
||||
if err := client.Init(); err != nil {
|
||||
log.Debug("register query node client init failed", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()))
|
||||
|
||||
return &querypb.RegisterNodeResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -108,6 +115,7 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN
|
|||
}, err
|
||||
}
|
||||
if err := client.Start(); err != nil {
|
||||
log.Debug("register query node client start failed", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()))
|
||||
return nil, err
|
||||
}
|
||||
qs.queryNodes[nodeID] = newQueryNodeInfo(client)
|
||||
|
@ -129,6 +137,7 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN
|
|||
})
|
||||
}
|
||||
qs.qcMutex.Unlock()
|
||||
log.Debug("register query node success", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()), zap.Any("StartParams", startParams))
|
||||
|
||||
return &querypb.RegisterNodeResponse{
|
||||
Status: &commonpb.Status{
|
||||
|
@ -409,18 +418,9 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele
|
|||
}
|
||||
|
||||
func (qs *QueryService) CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error) {
|
||||
channelID := len(qs.queryChannels)
|
||||
searchPrefix := Params.SearchChannelPrefix
|
||||
searchResultPrefix := Params.SearchResultChannelPrefix
|
||||
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
|
||||
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
|
||||
|
||||
qs.qcMutex.Lock()
|
||||
qs.queryChannels = append(qs.queryChannels, &queryChannelInfo{
|
||||
requestChannel: allocatedQueryChannel,
|
||||
responseChannel: allocatedQueryResultChannel,
|
||||
})
|
||||
|
||||
allocatedQueryChannel := qs.queryChannels[0].requestChannel
|
||||
allocatedQueryResultChannel := qs.queryChannels[0].responseChannel
|
||||
addQueryChannelsRequest := &querypb.AddQueryChannelRequest{
|
||||
RequestChannelID: allocatedQueryChannel,
|
||||
ResultChannelID: allocatedQueryResultChannel,
|
||||
|
|
|
@ -14,6 +14,7 @@ package queryservice
|
|||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -65,6 +66,7 @@ func (qs *QueryService) Register() error {
|
|||
}
|
||||
|
||||
func (qs *QueryService) Init() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -91,6 +93,17 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ
|
|||
rand.Seed(time.Now().UnixNano())
|
||||
nodes := make(map[int64]*queryNodeInfo)
|
||||
queryChannels := make([]*queryChannelInfo, 0)
|
||||
channelID := len(queryChannels)
|
||||
searchPrefix := Params.SearchChannelPrefix
|
||||
searchResultPrefix := Params.SearchResultChannelPrefix
|
||||
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
|
||||
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
|
||||
|
||||
queryChannels = append(queryChannels, &queryChannelInfo{
|
||||
requestChannel: allocatedQueryChannel,
|
||||
responseChannel: allocatedQueryResultChannel,
|
||||
})
|
||||
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
replica := newMetaReplica()
|
||||
scheduler := NewTaskScheduler(ctx1)
|
||||
|
|
|
@ -1,30 +1,30 @@
|
|||
cd ..
|
||||
|
||||
echo "starting master"
|
||||
nohup ./bin/masterservice > ~/masterservice.out 2>&1 &
|
||||
nohup ./bin/milvus run master > ~/masterservice.out 2>&1 &
|
||||
|
||||
echo "starting dataservice"
|
||||
nohup ./bin/dataservice > ~/dataservice.out 2>&1 &
|
||||
nohup ./bin/milvus run dataservice > ~/dataservice.out 2>&1 &
|
||||
|
||||
echo "starting datanode"
|
||||
nohup ./bin/datanode > ~/datanode.out 2>&1 &
|
||||
nohup ./bin/milvus run datanode > ~/datanode.out 2>&1 &
|
||||
|
||||
echo "starting proxyservice"
|
||||
nohup ./bin/proxyservice > ~/proxyservice.out 2>&1 &
|
||||
nohup ./bin/milvus run proxyservice > ~/proxyservice.out 2>&1 &
|
||||
|
||||
echo "starting proxynode"
|
||||
nohup ./bin/proxynode > ~/proxynode.out 2>&1 &
|
||||
nohup ./bin/milvus run proxynode > ~/proxynode.out 2>&1 &
|
||||
|
||||
echo "starting queryservice"
|
||||
nohup ./bin/queryservice > ~/queryservice.out 2>&1 &
|
||||
nohup ./bin/milvus run queryservice > ~/queryservice.out 2>&1 &
|
||||
|
||||
echo "starting querynode1"
|
||||
export QUERY_NODE_ID=1
|
||||
nohup ./bin/querynode > ~/querynode1.out 2>&1 &
|
||||
nohup ./bin/milvus run querynode > ~/querynode1.out 2>&1 &
|
||||
|
||||
|
||||
echo "starting indexservice"
|
||||
nohup ./bin/indexservice > ~/indexservice.out 2>&1 &
|
||||
nohup ./bin/milvus run indexservice > ~/indexservice.out 2>&1 &
|
||||
|
||||
echo "starting indexnode"
|
||||
nohup ./bin/indexnode > ~/indexnode.out 2>&1 &
|
||||
nohup ./bin/milvus run indexnode > ~/indexnode.out 2>&1 &
|
||||
|
|
|
@ -1,29 +1,5 @@
|
|||
echo "stopping masterservice"
|
||||
kill -9 $(ps -e | grep masterservice | awk '{print $1}')
|
||||
|
||||
echo "stopping proxyservice"
|
||||
kill -9 $(ps -e | grep proxyservice | awk '{print $1}')
|
||||
|
||||
echo "stopping proxynode"
|
||||
kill -9 $(ps -e | grep proxynode | awk '{print $1}')
|
||||
|
||||
echo "stopping queryservice"
|
||||
kill -9 $(ps -e | grep queryservice | awk '{print $1}')
|
||||
|
||||
echo "stopping querynode"
|
||||
kill -9 $(ps -e | grep querynode | awk '{print $1}')
|
||||
|
||||
echo "stopping dataservice"
|
||||
kill -9 $(ps -e | grep dataservice | awk '{print $1}')
|
||||
|
||||
echo "stopping datanode"
|
||||
kill -9 $(ps -e | grep datanode | awk '{print $1}')
|
||||
|
||||
echo "stopping indexservice"
|
||||
kill -9 $(ps -e | grep indexservice | awk '{print $1}')
|
||||
|
||||
echo "stopping indexnode"
|
||||
kill -9 $(ps -e | grep indexnode | awk '{print $1}')
|
||||
echo "stopping milvus"
|
||||
kill -9 $(ps -e | grep milvus | awk '{print $1}')
|
||||
|
||||
echo "completed"
|
||||
|
||||
|
|
Loading…
Reference in New Issue