Fix datanode init bug

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2021-02-04 10:25:01 +08:00 committed by yefu.chen
parent 6e7e0b748a
commit bc7ad02df4
7 changed files with 148 additions and 4 deletions

View File

@ -208,8 +208,8 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common
switch {
case node.State != internalpb2.StateCode_HEALTHY:
status.Reason = fmt.Sprintf("DataNode %d not healthy!", node.NodeID)
case node.State != internalpb2.StateCode_INITIALIZING:
status.Reason = fmt.Sprintf("DataNode %d not initializing!", node.NodeID)
return status, errors.New(status.GetReason())
case len(Params.InsertChannelNames) != 0:

View File

@ -50,6 +50,7 @@ type DataService interface {
GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type MasterClient interface {
@ -728,6 +729,10 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
if !s.checkStateIsHealthy() {
resp.Status.Reason = "data service is not healthy"
return resp, nil
}
nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
if err != nil {
resp.Status.Reason = err.Error()
@ -737,3 +742,27 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
return resp, nil
}
func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
resp := &datapb.SegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
if !s.checkStateIsHealthy() {
resp.Status.Reason = "data service is not healthy"
return resp, nil
}
infos := make([]*datapb.SegmentInfo, len(req.SegmentIDs))
for i, id := range req.SegmentIDs {
segmentInfo, err := s.meta.GetSegment(id)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
infos[i] = segmentInfo
}
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
resp.Infos = infos
return resp, nil
}

View File

@ -137,3 +137,7 @@ func (c *Client) GetSegmentInfoChannel() (string, error) {
func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
return c.grpcClient.GetCount(context.Background(), req)
}
func (c *Client) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
return c.grpcClient.GetSegmentInfo(context.Background(), req)
}

View File

@ -25,7 +25,7 @@ type Service struct {
}
func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
panic("implement me")
return s.server.GetSegmentInfo(request)
}
func NewGrpcService(ctx context.Context) *Service {

View File

@ -322,5 +322,5 @@ func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*mi
}
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
panic("implement me")
return s.impl.GetPersistentSegmentInfo(request)
}

View File

@ -2,10 +2,13 @@ package proxynode
import (
"context"
"errors"
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -590,3 +593,109 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e
func (node *NodeImpl) GetDdChannel(request *commonpb.Empty) (*milvuspb.StringResponse, error) {
panic("implement me")
}
func (node *NodeImpl) GetPersistentSegmentInfo(req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
resp := &milvuspb.PersistentSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
segments, err := node.getSegmentsOfCollection(req.DbName, req.CollectionName)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
infoResp, err := node.dataServiceClient.GetSegmentInfo(&datapb.SegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
SegmentIDs: segments,
})
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
if infoResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
resp.Status.Reason = infoResp.Status.Reason
return resp, nil
}
persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
for i, info := range infoResp.Infos {
persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
SegmentID: info.SegmentID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
OpenTime: info.OpenTime,
SealedTime: info.SealedTime,
FlushedTime: info.FlushedTime,
NumRows: info.NumRows,
MemSize: info.MemSize,
State: info.State,
}
}
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
resp.Infos = persistentInfos
return resp, nil
}
func (node *NodeImpl) getSegmentsOfCollection(dbName string, collectionName string) ([]UniqueID, error) {
describeCollectionResponse, err := node.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
})
if err != nil {
return nil, err
}
if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(describeCollectionResponse.Status.Reason)
}
collectionID := describeCollectionResponse.CollectionID
showPartitionsResp, err := node.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowPartitions,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
})
if err != nil {
return nil, err
}
if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(showPartitionsResp.Status.Reason)
}
ret := make([]UniqueID, 0)
for _, partitionID := range showPartitionsResp.PartitionIDs {
showSegmentResponse, err := node.masterClient.ShowSegments(&milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
CollectionID: collectionID,
PartitionID: partitionID,
})
if err != nil {
return nil, err
}
if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(showSegmentResponse.Status.Reason)
}
ret = append(ret, showSegmentResponse.SegmentIDs...)
}
return ret, nil
}

View File

@ -22,6 +22,7 @@ type MasterClient interface {
ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
}
type IndexServiceClient interface {
@ -51,6 +52,7 @@ type DataServiceClient interface {
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type ProxyServiceClient interface {