mirror of https://github.com/milvus-io/milvus.git
Implement getSegmentInfo interfaces
Signed-off-by: sunby <bingyi.sun@zilliz.com>pull/4973/head^2
parent
1189332786
commit
e7588d3250
|
@ -50,6 +50,7 @@ type DataService interface {
|
|||
GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
|
||||
GetComponentStates() (*internalpb2.ComponentStates, error)
|
||||
GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
|
||||
GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
|
||||
}
|
||||
|
||||
type MasterClient interface {
|
||||
|
@ -728,6 +729,10 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
|
|||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
},
|
||||
}
|
||||
if !s.checkStateIsHealthy() {
|
||||
resp.Status.Reason = "data service is not healthy"
|
||||
return resp, nil
|
||||
}
|
||||
nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
|
||||
if err != nil {
|
||||
resp.Status.Reason = err.Error()
|
||||
|
@ -737,3 +742,27 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
|
|||
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
|
||||
resp := &datapb.SegmentInfoResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
},
|
||||
}
|
||||
if !s.checkStateIsHealthy() {
|
||||
resp.Status.Reason = "data service is not healthy"
|
||||
return resp, nil
|
||||
}
|
||||
infos := make([]*datapb.SegmentInfo, len(req.SegmentIDs))
|
||||
for i, id := range req.SegmentIDs {
|
||||
segmentInfo, err := s.meta.GetSegment(id)
|
||||
if err != nil {
|
||||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
infos[i] = segmentInfo
|
||||
}
|
||||
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
|
||||
resp.Infos = infos
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -137,3 +137,7 @@ func (c *Client) GetSegmentInfoChannel() (string, error) {
|
|||
func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
|
||||
return c.grpcClient.GetCount(context.Background(), req)
|
||||
}
|
||||
|
||||
func (c *Client) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
|
||||
return c.grpcClient.GetSegmentInfo(context.Background(), req)
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ type Service struct {
|
|||
}
|
||||
|
||||
func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
|
||||
panic("implement me")
|
||||
return s.server.GetSegmentInfo(request)
|
||||
}
|
||||
|
||||
func NewGrpcService(ctx context.Context) *Service {
|
||||
|
|
|
@ -322,5 +322,5 @@ func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*mi
|
|||
}
|
||||
|
||||
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
|
||||
panic("implement me")
|
||||
return s.impl.GetPersistentSegmentInfo(request)
|
||||
}
|
||||
|
|
|
@ -249,28 +249,12 @@ func TestMasterService(t *testing.T) {
|
|||
|
||||
msg, ok := <-ddStream.Chan()
|
||||
assert.True(t, ok)
|
||||
assert.True(t, len(msg.Msgs) == 2 || len(msg.Msgs) == 1)
|
||||
|
||||
assert.Equal(t, len(msg.Msgs), 1)
|
||||
createMsg, ok := (msg.Msgs[0]).(*ms.CreateCollectionMsg)
|
||||
assert.True(t, ok)
|
||||
createMeta, err := core.MetaTable.GetCollectionByName("testColl")
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, createMsg.CollectionID, createMeta.ID)
|
||||
assert.Equal(t, len(createMeta.PartitionIDs), 1)
|
||||
|
||||
if len(msg.Msgs) == 2 {
|
||||
createPart, ok := (msg.Msgs[1]).(*ms.CreatePartitionMsg)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, createPart.CollectionName, "testColl")
|
||||
assert.Equal(t, createPart.PartitionID, createMeta.PartitionIDs[0])
|
||||
} else {
|
||||
msg, ok = <-ddStream.Chan()
|
||||
assert.True(t, ok)
|
||||
createPart, ok := (msg.Msgs[0]).(*ms.CreatePartitionMsg)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, createPart.CollectionName, "testColl")
|
||||
assert.Equal(t, createPart.PartitionID, createMeta.PartitionIDs[0])
|
||||
}
|
||||
|
||||
req.Base.MsgID = 101
|
||||
req.Base.Timestamp = 101
|
||||
|
|
|
@ -147,26 +147,6 @@ func (t *CreateCollectionReqTask) Execute() error {
|
|||
return err
|
||||
}
|
||||
|
||||
ddPart := internalpb2.CreatePartitionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kCreatePartition,
|
||||
MsgID: t.Req.Base.MsgID, //TODO, msg id
|
||||
Timestamp: t.Req.Base.Timestamp + 1,
|
||||
SourceID: t.Req.Base.SourceID,
|
||||
},
|
||||
DbName: t.Req.DbName,
|
||||
CollectionName: t.Req.CollectionName,
|
||||
PartitionName: Params.DefaultPartitionName,
|
||||
DbID: 0, //TODO, not used
|
||||
CollectionID: collMeta.ID,
|
||||
PartitionID: partMeta.PartitionID,
|
||||
}
|
||||
|
||||
err = t.core.DdCreatePartitionReq(&ddPart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,10 +2,13 @@ package proxynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
||||
|
@ -590,3 +593,109 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e
|
|||
func (node *NodeImpl) GetDdChannel(request *commonpb.Empty) (*milvuspb.StringResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (node *NodeImpl) GetPersistentSegmentInfo(req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
|
||||
resp := &milvuspb.PersistentSegmentInfoResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
},
|
||||
}
|
||||
segments, err := node.getSegmentsOfCollection(req.DbName, req.CollectionName)
|
||||
if err != nil {
|
||||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
infoResp, err := node.dataServiceClient.GetSegmentInfo(&datapb.SegmentInfoRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kSegmentInfo,
|
||||
MsgID: 0,
|
||||
Timestamp: 0,
|
||||
SourceID: Params.ProxyID,
|
||||
},
|
||||
SegmentIDs: segments,
|
||||
})
|
||||
if err != nil {
|
||||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
if infoResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
resp.Status.Reason = infoResp.Status.Reason
|
||||
return resp, nil
|
||||
}
|
||||
persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
|
||||
for i, info := range infoResp.Infos {
|
||||
persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
|
||||
SegmentID: info.SegmentID,
|
||||
CollectionID: info.CollectionID,
|
||||
PartitionID: info.PartitionID,
|
||||
OpenTime: info.OpenTime,
|
||||
SealedTime: info.SealedTime,
|
||||
FlushedTime: info.FlushedTime,
|
||||
NumRows: info.NumRows,
|
||||
MemSize: info.MemSize,
|
||||
State: info.State,
|
||||
}
|
||||
}
|
||||
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
|
||||
resp.Infos = persistentInfos
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (node *NodeImpl) getSegmentsOfCollection(dbName string, collectionName string) ([]UniqueID, error) {
|
||||
describeCollectionResponse, err := node.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kDescribeCollection,
|
||||
MsgID: 0,
|
||||
Timestamp: 0,
|
||||
SourceID: Params.ProxyID,
|
||||
},
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
return nil, errors.New(describeCollectionResponse.Status.Reason)
|
||||
}
|
||||
collectionID := describeCollectionResponse.CollectionID
|
||||
showPartitionsResp, err := node.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kShowPartitions,
|
||||
MsgID: 0,
|
||||
Timestamp: 0,
|
||||
SourceID: Params.ProxyID,
|
||||
},
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
CollectionID: collectionID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
return nil, errors.New(showPartitionsResp.Status.Reason)
|
||||
}
|
||||
|
||||
ret := make([]UniqueID, 0)
|
||||
for _, partitionID := range showPartitionsResp.PartitionIDs {
|
||||
showSegmentResponse, err := node.masterClient.ShowSegments(&milvuspb.ShowSegmentRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kShowSegment,
|
||||
MsgID: 0,
|
||||
Timestamp: 0,
|
||||
SourceID: Params.ProxyID,
|
||||
},
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
return nil, errors.New(showSegmentResponse.Status.Reason)
|
||||
}
|
||||
ret = append(ret, showSegmentResponse.SegmentIDs...)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ type MasterClient interface {
|
|||
ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
|
||||
CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
|
||||
DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
|
||||
ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
|
||||
}
|
||||
|
||||
type IndexServiceClient interface {
|
||||
|
@ -51,6 +52,7 @@ type DataServiceClient interface {
|
|||
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
|
||||
|
||||
GetComponentStates() (*internalpb2.ComponentStates, error)
|
||||
GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
|
||||
}
|
||||
|
||||
type ProxyServiceClient interface {
|
||||
|
|
Loading…
Reference in New Issue