Add error returned by Init,Start,Stop functions

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-01-19 17:38:01 +08:00 committed by yefu.chen
parent 83c60fd84f
commit be6775671b
22 changed files with 170 additions and 238 deletions

View File

@ -78,9 +78,9 @@ In order to boost throughput, we model Milvus as a stream-driven system.
```go
type Component interface {
Init()
Start()
Stop()
Init() error
Start() error
Stop() error
GetComponentStates() (ComponentStates, error)
GetTimeTickChannel() (string, error)
GetStatisticsChannel() (string, error)

View File

@ -11,15 +11,15 @@ type Client struct {
// GOOSE TODO: add DataNodeClient
}
func (c *Client) Init() {
func (c *Client) Init() error {
panic("implement me")
}
func (c *Client) Start() {
func (c *Client) Start() error {
panic("implement me")
}
func (c *Client) Stop() {
func (c *Client) Stop() error {
panic("implement me")
}

View File

@ -34,7 +34,7 @@ func (g Client) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexS
return g.grpcClient.GetIndexStates(ctx, req)
}
func (g Client) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
func (g Client) GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
ctx := context.TODO()

View File

@ -28,8 +28,9 @@ type Server struct {
loopWg sync.WaitGroup
}
func (s *Server) Init() {
func (s *Server) Init() error {
indexservice.Params.Init()
return nil
}
func (s *Server) Start() error {
@ -37,8 +38,9 @@ func (s *Server) Start() error {
return s.startIndexServer()
}
func (s *Server) Stop() {
func (s *Server) Stop() error {
s.loopWg.Wait()
return nil
}
func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
@ -69,7 +71,7 @@ func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesReq
return s.server.GetIndexStates(req)
}
func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
return s.server.GetIndexFilePaths(req)
}

View File

@ -13,15 +13,15 @@ type Client struct {
grpcClient querypb.QueryNodeClient
}
func (c *Client) Init() {
func (c *Client) Init() error {
panic("implement me")
}
func (c *Client) Start() {
func (c *Client) Start() error {
panic("implement me")
}
func (c *Client) Stop() {
func (c *Client) Stop() error {
panic("implement me")
}

View File

@ -9,15 +9,15 @@ type Client struct {
grpcClient querypb.QueryServiceClient
}
func (c *Client) Init() {
func (c *Client) Init() error {
panic("implement me")
}
func (c *Client) Start() {
func (c *Client) Start() error {
panic("implement me")
}
func (c *Client) Stop() {
func (c *Client) Stop() error {
panic("implement me")
}

View File

@ -13,15 +13,15 @@ type Server struct {
queryService queryServiceImpl.Interface
}
func (s *Server) Init() {
func (s *Server) Init() error {
panic("implement me")
}
func (s *Server) Start() {
func (s *Server) Start() error {
panic("implement me")
}
func (s *Server) Stop() {
func (s *Server) Stop() error {
panic("implement me")
}

View File

@ -133,13 +133,13 @@ func (c *Client) GetIndexStates(indexIDs []UniqueID) (*indexpb.IndexStatesRespon
return response, err
}
func (c *Client) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) {
func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
if c.tryConnect() != nil {
panic("GetIndexFilePaths: failed to connect index builder")
}
ctx := context.TODO()
request := &indexpb.IndexFilePathsRequest{
IndexIDs: indexIDs,
request := &indexpb.IndexFilePathRequest{
IndexID: indexID,
}
response, err := c.client.GetIndexFilePaths(ctx, request)
@ -147,15 +147,5 @@ func (c *Client) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) {
return nil, err
}
var filePaths [][]string
for _, indexID := range indexIDs {
for _, filePathInfo := range response.FilePaths {
if indexID == filePathInfo.IndexID {
filePaths = append(filePaths, filePathInfo.IndexFilePaths)
break
}
}
}
return filePaths, nil
return response.IndexFilePaths, nil
}

View File

@ -74,25 +74,16 @@ func (b *Builder) GetIndexStates(ctx context.Context, request *indexpb.IndexStat
return ret, nil
}
func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
ret := &indexpb.IndexFilePathsResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
IndexID: request.IndexID,
}
var filePathInfos []*indexpb.IndexFilePathInfo
for _, indexID := range request.IndexIDs {
filePathInfo := &indexpb.IndexFilePathInfo{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
IndexID: indexID,
}
filePaths, err := b.metaTable.GetIndexFilePaths(indexID)
if err != nil {
filePathInfo.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
filePathInfo.Status.Reason = err.Error()
}
filePathInfo.IndexFilePaths = filePaths
filePathInfos = append(filePathInfos, filePathInfo)
filePaths, err := b.metaTable.GetIndexFilePaths(request.IndexID)
if err != nil {
ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
ret.Status.Reason = err.Error()
}
ret.FilePaths = filePathInfos
ret.IndexFilePaths = filePaths
return ret, nil
}

View File

@ -45,15 +45,15 @@ type IndexNode struct {
//serviceClient indexservice.Interface // method factory
}
func (i *IndexNode) Init() {
func (i *IndexNode) Init() error {
panic("implement me")
}
func (i *IndexNode) Start() {
func (i *IndexNode) Start() error {
panic("implement me")
}
func (i *IndexNode) Stop() {
func (i *IndexNode) Stop() error {
panic("implement me")
}

View File

@ -124,7 +124,8 @@ func TestBuilder_GRPC(t *testing.T) {
assert.Equal(t, commonpb.IndexState_INPROGRESS, description.States[0].State)
assert.Equal(t, indexID, description.States[0].IndexID)
indexDataPaths, err := buildClient.GetIndexFilePaths([]UniqueID{indexID})
indexDataPaths, err := buildClient.GetIndexFilePaths(indexID)
assert.Nil(t, err)
assert.Nil(t, indexDataPaths[0])
assert.Nil(t, indexDataPaths)
}

View File

@ -46,15 +46,15 @@ type IndexService struct {
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
func (i *IndexService) Init() {
func (i *IndexService) Init() error {
panic("implement me")
}
func (i *IndexService) Start() {
func (i *IndexService) Start() error {
panic("implement me")
}
func (i *IndexService) Stop() {
func (i *IndexService) Stop() error {
panic("implement me")
}
@ -149,7 +149,7 @@ func (i *IndexService) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb
return ret, nil
}
func (i *IndexService) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
func (i *IndexService) GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
panic("implement me")
}

View File

@ -13,6 +13,6 @@ type Interface interface {
RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error)
BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error)
GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error)
GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error)
NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*commonpb.Status, error)
}

View File

@ -65,7 +65,7 @@ func (m *MockWriteNodeClient) GetInsertBinlogPaths(segmentID UniqueID) (map[Uniq
type BuildIndexClient interface {
BuildIndex(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error)
GetIndexStates(indexIDs []UniqueID) (*indexpb.IndexStatesResponse, error)
GetIndexFilePaths(indexID []UniqueID) ([][]string, error)
GetIndexFilePaths(indexID UniqueID) ([]string, error)
}
type MockBuildIndexClient struct {
@ -107,8 +107,8 @@ func (m *MockBuildIndexClient) GetIndexStates(indexIDs []UniqueID) (*indexpb.Ind
return ret, nil
}
func (m *MockBuildIndexClient) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) {
return [][]string{{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}}, nil
func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
return []string{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}, nil
}
type LoadIndexClient interface {

View File

@ -116,11 +116,10 @@ func (scheduler *IndexBuildScheduler) describe() error {
}
if description.States[0].State == commonpb.IndexState_FINISHED {
log.Printf("build index for segment %d field %d is finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
filesPaths, err := scheduler.client.GetIndexFilePaths([]UniqueID{indexID})
filePaths, err := scheduler.client.GetIndexFilePaths(indexID)
if err != nil {
return err
}
filePaths := filesPaths[0]
//TODO: remove fileName
var fieldName string

View File

@ -34,9 +34,9 @@ message IndexStatesResponse {
}
message BuildIndexRequest {
repeated string data_paths = 1;
repeated common.KeyValuePair type_params = 2;
repeated common.KeyValuePair index_params = 3;
repeated string data_paths = 2;
repeated common.KeyValuePair type_params = 3;
repeated common.KeyValuePair index_params = 4;
}
message BuildIndexResponse {
@ -56,19 +56,14 @@ message BuildIndexNotification {
repeated string index_file_paths = 3;
}
message IndexFilePathsRequest {
repeated int64 indexIDs = 1;
}
message IndexFilePathInfo {
common.Status status = 1;
int64 indexID = 2;
repeated string index_file_paths = 3;
message IndexFilePathRequest {
int64 indexID = 1;
}
message IndexFilePathsResponse {
common.Status status = 1;
repeated IndexFilePathInfo file_paths = 2;
int64 indexID = 2;
repeated string index_file_paths = 3;
}
message IndexMeta {
@ -93,7 +88,7 @@ service IndexService {
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {}
rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){}
rpc GetIndexStates(IndexStatesRequest) returns (IndexStatesResponse) {}
rpc GetIndexFilePaths(IndexFilePathsRequest) returns (IndexFilePathsResponse){}
rpc GetIndexFilePaths(IndexFilePathRequest) returns (IndexFilePathsResponse){}
rpc NotifyBuildIndex(BuildIndexNotification) returns (common.Status) {}
}

View File

@ -262,9 +262,9 @@ func (m *IndexStatesResponse) GetStates() []*IndexInfo {
}
type BuildIndexRequest struct {
DataPaths []string `protobuf:"bytes,1,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"`
TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,2,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"`
IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,3,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"`
DataPaths []string `protobuf:"bytes,2,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"`
TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,3,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"`
IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,4,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -465,46 +465,46 @@ func (m *BuildIndexNotification) GetIndexFilePaths() []string {
return nil
}
type IndexFilePathsRequest struct {
IndexIDs []int64 `protobuf:"varint,1,rep,packed,name=indexIDs,proto3" json:"indexIDs,omitempty"`
type IndexFilePathRequest struct {
IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *IndexFilePathsRequest) Reset() { *m = IndexFilePathsRequest{} }
func (m *IndexFilePathsRequest) String() string { return proto.CompactTextString(m) }
func (*IndexFilePathsRequest) ProtoMessage() {}
func (*IndexFilePathsRequest) Descriptor() ([]byte, []int) {
func (m *IndexFilePathRequest) Reset() { *m = IndexFilePathRequest{} }
func (m *IndexFilePathRequest) String() string { return proto.CompactTextString(m) }
func (*IndexFilePathRequest) ProtoMessage() {}
func (*IndexFilePathRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_a5d2036b4df73e0a, []int{9}
}
func (m *IndexFilePathsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_IndexFilePathsRequest.Unmarshal(m, b)
func (m *IndexFilePathRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_IndexFilePathRequest.Unmarshal(m, b)
}
func (m *IndexFilePathsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_IndexFilePathsRequest.Marshal(b, m, deterministic)
func (m *IndexFilePathRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_IndexFilePathRequest.Marshal(b, m, deterministic)
}
func (m *IndexFilePathsRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_IndexFilePathsRequest.Merge(m, src)
func (m *IndexFilePathRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_IndexFilePathRequest.Merge(m, src)
}
func (m *IndexFilePathsRequest) XXX_Size() int {
return xxx_messageInfo_IndexFilePathsRequest.Size(m)
func (m *IndexFilePathRequest) XXX_Size() int {
return xxx_messageInfo_IndexFilePathRequest.Size(m)
}
func (m *IndexFilePathsRequest) XXX_DiscardUnknown() {
xxx_messageInfo_IndexFilePathsRequest.DiscardUnknown(m)
func (m *IndexFilePathRequest) XXX_DiscardUnknown() {
xxx_messageInfo_IndexFilePathRequest.DiscardUnknown(m)
}
var xxx_messageInfo_IndexFilePathsRequest proto.InternalMessageInfo
var xxx_messageInfo_IndexFilePathRequest proto.InternalMessageInfo
func (m *IndexFilePathsRequest) GetIndexIDs() []int64 {
func (m *IndexFilePathRequest) GetIndexID() int64 {
if m != nil {
return m.IndexIDs
return m.IndexID
}
return nil
return 0
}
type IndexFilePathInfo struct {
type IndexFilePathsResponse struct {
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
IndexID int64 `protobuf:"varint,2,opt,name=indexID,proto3" json:"indexID,omitempty"`
IndexFilePaths []string `protobuf:"bytes,3,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"`
@ -513,65 +513,11 @@ type IndexFilePathInfo struct {
XXX_sizecache int32 `json:"-"`
}
func (m *IndexFilePathInfo) Reset() { *m = IndexFilePathInfo{} }
func (m *IndexFilePathInfo) String() string { return proto.CompactTextString(m) }
func (*IndexFilePathInfo) ProtoMessage() {}
func (*IndexFilePathInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_a5d2036b4df73e0a, []int{10}
}
func (m *IndexFilePathInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_IndexFilePathInfo.Unmarshal(m, b)
}
func (m *IndexFilePathInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_IndexFilePathInfo.Marshal(b, m, deterministic)
}
func (m *IndexFilePathInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_IndexFilePathInfo.Merge(m, src)
}
func (m *IndexFilePathInfo) XXX_Size() int {
return xxx_messageInfo_IndexFilePathInfo.Size(m)
}
func (m *IndexFilePathInfo) XXX_DiscardUnknown() {
xxx_messageInfo_IndexFilePathInfo.DiscardUnknown(m)
}
var xxx_messageInfo_IndexFilePathInfo proto.InternalMessageInfo
func (m *IndexFilePathInfo) GetStatus() *commonpb.Status {
if m != nil {
return m.Status
}
return nil
}
func (m *IndexFilePathInfo) GetIndexID() int64 {
if m != nil {
return m.IndexID
}
return 0
}
func (m *IndexFilePathInfo) GetIndexFilePaths() []string {
if m != nil {
return m.IndexFilePaths
}
return nil
}
type IndexFilePathsResponse struct {
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
FilePaths []*IndexFilePathInfo `protobuf:"bytes,2,rep,name=file_paths,json=filePaths,proto3" json:"file_paths,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *IndexFilePathsResponse) Reset() { *m = IndexFilePathsResponse{} }
func (m *IndexFilePathsResponse) String() string { return proto.CompactTextString(m) }
func (*IndexFilePathsResponse) ProtoMessage() {}
func (*IndexFilePathsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_a5d2036b4df73e0a, []int{11}
return fileDescriptor_a5d2036b4df73e0a, []int{10}
}
func (m *IndexFilePathsResponse) XXX_Unmarshal(b []byte) error {
@ -599,9 +545,16 @@ func (m *IndexFilePathsResponse) GetStatus() *commonpb.Status {
return nil
}
func (m *IndexFilePathsResponse) GetFilePaths() []*IndexFilePathInfo {
func (m *IndexFilePathsResponse) GetIndexID() int64 {
if m != nil {
return m.FilePaths
return m.IndexID
}
return 0
}
func (m *IndexFilePathsResponse) GetIndexFilePaths() []string {
if m != nil {
return m.IndexFilePaths
}
return nil
}
@ -623,7 +576,7 @@ func (m *IndexMeta) Reset() { *m = IndexMeta{} }
func (m *IndexMeta) String() string { return proto.CompactTextString(m) }
func (*IndexMeta) ProtoMessage() {}
func (*IndexMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_a5d2036b4df73e0a, []int{12}
return fileDescriptor_a5d2036b4df73e0a, []int{11}
}
func (m *IndexMeta) XXX_Unmarshal(b []byte) error {
@ -703,8 +656,7 @@ func init() {
proto.RegisterType((*BuildIndexResponse)(nil), "milvus.proto.index.BuildIndexResponse")
proto.RegisterType((*BuildIndexCmd)(nil), "milvus.proto.index.BuildIndexCmd")
proto.RegisterType((*BuildIndexNotification)(nil), "milvus.proto.index.BuildIndexNotification")
proto.RegisterType((*IndexFilePathsRequest)(nil), "milvus.proto.index.IndexFilePathsRequest")
proto.RegisterType((*IndexFilePathInfo)(nil), "milvus.proto.index.IndexFilePathInfo")
proto.RegisterType((*IndexFilePathRequest)(nil), "milvus.proto.index.IndexFilePathRequest")
proto.RegisterType((*IndexFilePathsResponse)(nil), "milvus.proto.index.IndexFilePathsResponse")
proto.RegisterType((*IndexMeta)(nil), "milvus.proto.index.IndexMeta")
}
@ -712,57 +664,55 @@ func init() {
func init() { proto.RegisterFile("index_service.proto", fileDescriptor_a5d2036b4df73e0a) }
var fileDescriptor_a5d2036b4df73e0a = []byte{
// 793 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x5d, 0x4f, 0xe3, 0x46,
0x14, 0xc5, 0x18, 0x42, 0x73, 0x13, 0x22, 0x32, 0x69, 0x51, 0x94, 0x16, 0x15, 0x5c, 0x01, 0x29,
0x52, 0x9d, 0x2a, 0x88, 0xf6, 0xb1, 0x22, 0xa0, 0x56, 0x51, 0x05, 0x42, 0xd3, 0xaa, 0x0f, 0xad,
0xaa, 0xc8, 0xb1, 0x6f, 0xc8, 0x48, 0xfe, 0x08, 0x9e, 0x31, 0x5a, 0x78, 0x59, 0xad, 0xb4, 0x8f,
0x2b, 0xad, 0x56, 0xfb, 0x53, 0xf6, 0x75, 0x7f, 0xdc, 0xca, 0xe3, 0xb1, 0x63, 0x43, 0x48, 0x58,
0x81, 0xb4, 0x6f, 0x99, 0xeb, 0x73, 0x3f, 0xe6, 0x9c, 0x33, 0x33, 0x81, 0x06, 0xf3, 0x1d, 0x7c,
0x31, 0xe0, 0x18, 0x5e, 0x33, 0x1b, 0xcd, 0x49, 0x18, 0x88, 0x80, 0x10, 0x8f, 0xb9, 0xd7, 0x11,
0x4f, 0x56, 0xa6, 0x44, 0xb4, 0xaa, 0x76, 0xe0, 0x79, 0x81, 0x9f, 0xc4, 0x5a, 0x35, 0xe6, 0x0b,
0x0c, 0x7d, 0xcb, 0x4d, 0xd6, 0xc6, 0x4b, 0x68, 0x50, 0xbc, 0x64, 0x5c, 0x60, 0x78, 0x1e, 0x38,
0x48, 0xf1, 0x2a, 0x42, 0x2e, 0xc8, 0xcf, 0xb0, 0x32, 0xb4, 0x38, 0x36, 0xb5, 0x6d, 0xad, 0x5d,
0xe9, 0x7e, 0x67, 0x16, 0xea, 0xaa, 0x82, 0x67, 0xfc, 0xb2, 0x67, 0x71, 0xa4, 0x12, 0x49, 0x7e,
0x81, 0x35, 0xcb, 0x71, 0x42, 0xe4, 0xbc, 0xb9, 0x3c, 0x27, 0xe9, 0x38, 0xc1, 0xd0, 0x14, 0x6c,
0xbc, 0xd5, 0xe0, 0xeb, 0xe2, 0x04, 0x7c, 0x12, 0xf8, 0x1c, 0xc9, 0x21, 0x94, 0xb8, 0xb0, 0x44,
0xc4, 0xd5, 0x10, 0xdf, 0xce, 0xac, 0xf7, 0x97, 0x84, 0x50, 0x05, 0x25, 0x3d, 0xa8, 0x30, 0x9f,
0x89, 0xc1, 0xc4, 0x0a, 0x2d, 0x2f, 0x9d, 0x64, 0xc7, 0xbc, 0x43, 0x8b, 0x62, 0xa0, 0xef, 0x33,
0x71, 0x21, 0x81, 0x14, 0x58, 0xf6, 0xdb, 0x30, 0x81, 0xf4, 0x63, 0xe6, 0xe2, 0xd2, 0xc8, 0x53,
0x46, 0x9a, 0xb0, 0x26, 0xf9, 0xec, 0x9f, 0x36, 0xb5, 0x6d, 0xbd, 0xad, 0xd3, 0x74, 0x69, 0x08,
0x28, 0x4b, 0x7c, 0xdf, 0x1f, 0x05, 0xe4, 0x08, 0x56, 0xe3, 0x51, 0x12, 0xe6, 0x6a, 0xdd, 0xef,
0x67, 0x0e, 0x3d, 0x2d, 0x4f, 0x13, 0x74, 0xbe, 0x7a, 0x3c, 0xf3, 0xb4, 0x3a, 0xd9, 0x84, 0x12,
0x45, 0x8b, 0x07, 0x7e, 0x53, 0xdf, 0xd6, 0xda, 0x65, 0xaa, 0x56, 0xc6, 0x2b, 0x0d, 0x1a, 0x85,
0x31, 0x9f, 0x42, 0xdb, 0x51, 0x92, 0x84, 0x31, 0x63, 0x7a, 0xbb, 0xd2, 0xdd, 0x32, 0xef, 0x1b,
0xc9, 0xcc, 0x36, 0x49, 0x15, 0xd8, 0xf8, 0xa8, 0x41, 0xbd, 0x17, 0x31, 0xd7, 0x91, 0x9f, 0x52,
0xa6, 0xb6, 0x00, 0x1c, 0x4b, 0x58, 0x83, 0x89, 0x25, 0xc6, 0x5c, 0x92, 0x55, 0xa6, 0xe5, 0x38,
0x72, 0x11, 0x07, 0x62, 0x89, 0xc4, 0xcd, 0x04, 0xa7, 0x12, 0xe9, 0xf7, 0x25, 0x52, 0x53, 0xfe,
0x89, 0x37, 0xff, 0x58, 0x6e, 0x84, 0x17, 0x16, 0x0b, 0x29, 0xc4, 0x59, 0x89, 0x44, 0xe4, 0x14,
0xaa, 0x89, 0xfd, 0x55, 0x11, 0xfd, 0xb1, 0x45, 0x2a, 0x32, 0x4d, 0x09, 0x6d, 0x03, 0xc9, 0x4f,
0xff, 0x14, 0x02, 0x1f, 0xd4, 0xcf, 0x18, 0xc2, 0xfa, 0xb4, 0xc9, 0x89, 0xe7, 0x14, 0x8d, 0x54,
0x90, 0xfa, 0x57, 0xd0, 0x43, 0xbc, 0x52, 0xa6, 0xdd, 0x9d, 0x25, 0xc1, 0x3d, 0xb2, 0x69, 0x9c,
0x61, 0xbc, 0xd3, 0x60, 0x73, 0xfa, 0xe9, 0x3c, 0x10, 0x6c, 0xc4, 0x6c, 0x4b, 0xb0, 0xc0, 0x7f,
0xe6, 0xdd, 0x90, 0x36, 0x6c, 0x24, 0xc4, 0x8f, 0x98, 0x8b, 0x4a, 0x61, 0x5d, 0x2a, 0x5c, 0x93,
0xf1, 0xdf, 0x99, 0x8b, 0x52, 0x66, 0xe3, 0x10, 0xbe, 0xe9, 0x17, 0x22, 0xa9, 0x3d, 0x5a, 0xf0,
0x95, 0xaa, 0xc6, 0xd5, 0x49, 0xca, 0xd6, 0xc6, 0x1b, 0x0d, 0xea, 0x85, 0x2c, 0x79, 0xa6, 0xbe,
0xd8, 0x1e, 0xde, 0x6b, 0xb0, 0x79, 0x77, 0x13, 0x4f, 0x71, 0xc9, 0x29, 0x40, 0xae, 0x67, 0xe2,
0xfc, 0xdd, 0x07, 0x8f, 0x5a, 0x9e, 0x03, 0x5a, 0x1e, 0x65, 0x53, 0x7d, 0x58, 0x56, 0x17, 0xce,
0x19, 0x0a, 0xeb, 0xf9, 0x2f, 0x9c, 0x2d, 0x00, 0xf4, 0xaf, 0x22, 0x1c, 0x08, 0xe6, 0xa1, 0xbc,
0x74, 0x74, 0x5a, 0x96, 0x91, 0xbf, 0x99, 0x87, 0xe4, 0x07, 0x58, 0xe7, 0xf6, 0x18, 0x9d, 0xc8,
0x55, 0x88, 0x15, 0x89, 0xa8, 0xa6, 0x41, 0x09, 0x32, 0xa1, 0x31, 0x8c, 0xfd, 0x38, 0xb0, 0x03,
0x6f, 0xe2, 0xa2, 0x50, 0xd0, 0x55, 0x09, 0xad, 0xcb, 0x4f, 0x27, 0xea, 0x8b, 0xc4, 0x2b, 0xe7,
0x97, 0x3e, 0xd7, 0xf9, 0x33, 0xb5, 0x5c, 0x9b, 0xa5, 0x65, 0xf7, 0xf5, 0x0a, 0x54, 0x13, 0x1a,
0x92, 0x17, 0x93, 0xd8, 0x50, 0xcd, 0xbf, 0x3b, 0x64, 0x7f, 0x56, 0xdb, 0x19, 0x6f, 0x63, 0xab,
0xbd, 0x18, 0x98, 0x98, 0xc4, 0x58, 0x22, 0xff, 0x03, 0x4c, 0x27, 0x27, 0x8f, 0xdb, 0x59, 0x6b,
0x6f, 0x11, 0x2c, 0x2b, 0x6f, 0x43, 0xed, 0x0f, 0x14, 0xb9, 0x67, 0x80, 0xec, 0x3d, 0x68, 0xa7,
0xc2, 0x73, 0xd6, 0xda, 0x5f, 0x88, 0xcb, 0x9a, 0xb8, 0x50, 0x4f, 0x9b, 0x64, 0x74, 0x92, 0x1f,
0x17, 0xda, 0x36, 0x6b, 0x75, 0xf0, 0x18, 0x68, 0x8e, 0xb1, 0x0d, 0x79, 0x81, 0xdd, 0xe4, 0x78,
0x3b, 0x98, 0x4f, 0x48, 0xfe, 0xc2, 0x6b, 0xcd, 0x3b, 0x88, 0xc6, 0x52, 0xf7, 0x3f, 0x75, 0x76,
0xa4, 0xe4, 0xe7, 0x05, 0x75, 0x76, 0xe6, 0x77, 0x39, 0xf1, 0x9c, 0x05, 0xc5, 0x7b, 0xc7, 0xff,
0xfe, 0x76, 0xc9, 0xc4, 0x38, 0x1a, 0xc6, 0x5f, 0x3a, 0xb7, 0xcc, 0x75, 0xd9, 0xad, 0x40, 0x7b,
0xdc, 0x49, 0xb2, 0x7e, 0x72, 0x18, 0x17, 0x21, 0x1b, 0x46, 0x02, 0x9d, 0x4e, 0xfa, 0x2f, 0xa4,
0x23, 0x4b, 0x75, 0x64, 0xb7, 0xc9, 0x70, 0x58, 0x92, 0xcb, 0xc3, 0x4f, 0x01, 0x00, 0x00, 0xff,
0xff, 0x10, 0x1a, 0x0a, 0xe9, 0xdf, 0x09, 0x00, 0x00,
// 757 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0x5d, 0x4f, 0xdb, 0x4a,
0x10, 0xc5, 0x38, 0x04, 0x65, 0x12, 0x22, 0xd8, 0x20, 0x14, 0xe5, 0x5e, 0x74, 0xc1, 0x57, 0x17,
0x22, 0xa4, 0xeb, 0xa0, 0x20, 0xda, 0xc7, 0x8a, 0x80, 0x5a, 0x45, 0x15, 0x08, 0xb9, 0x55, 0x1f,
0x5a, 0x55, 0x91, 0x63, 0x0f, 0x64, 0x55, 0x7f, 0x04, 0xef, 0x1a, 0x15, 0x5e, 0xaa, 0xaa, 0x3f,
0xa0, 0xea, 0x6f, 0xe9, 0x6b, 0x7f, 0x5c, 0xe5, 0xdd, 0x75, 0x12, 0x83, 0x49, 0x40, 0xd0, 0x37,
0xef, 0xee, 0x99, 0x33, 0xb3, 0xe7, 0xcc, 0xac, 0xa1, 0x46, 0x03, 0x17, 0x3f, 0xf7, 0x18, 0x46,
0x97, 0xd4, 0x41, 0x73, 0x18, 0x85, 0x3c, 0x24, 0xc4, 0xa7, 0xde, 0x65, 0xcc, 0xe4, 0xca, 0x14,
0x88, 0x46, 0xc5, 0x09, 0x7d, 0x3f, 0x0c, 0xe4, 0x5e, 0xa3, 0x4a, 0x03, 0x8e, 0x51, 0x60, 0x7b,
0x72, 0x6d, 0x7c, 0x81, 0x9a, 0x85, 0xe7, 0x94, 0x71, 0x8c, 0x4e, 0x42, 0x17, 0x2d, 0xbc, 0x88,
0x91, 0x71, 0xb2, 0x0b, 0x85, 0xbe, 0xcd, 0xb0, 0xae, 0x6d, 0x68, 0xcd, 0x72, 0xfb, 0x6f, 0x33,
0xc3, 0xab, 0x08, 0x8f, 0xd9, 0x79, 0xc7, 0x66, 0x68, 0x09, 0x24, 0x79, 0x06, 0x8b, 0xb6, 0xeb,
0x46, 0xc8, 0x58, 0x7d, 0x7e, 0x4a, 0xd0, 0x81, 0xc4, 0x58, 0x29, 0xd8, 0xf8, 0xae, 0xc1, 0x6a,
0xb6, 0x02, 0x36, 0x0c, 0x03, 0x86, 0x64, 0x0f, 0x8a, 0x8c, 0xdb, 0x3c, 0x66, 0xaa, 0x88, 0xbf,
0x72, 0xf9, 0xde, 0x08, 0x88, 0xa5, 0xa0, 0xa4, 0x03, 0x65, 0x1a, 0x50, 0xde, 0x1b, 0xda, 0x91,
0xed, 0xa7, 0x95, 0x6c, 0x9a, 0x37, 0x64, 0x51, 0x0a, 0x74, 0x03, 0xca, 0x4f, 0x05, 0xd0, 0x02,
0x3a, 0xfa, 0x36, 0x4c, 0x20, 0xdd, 0x44, 0xb9, 0x84, 0x1a, 0x59, 0xaa, 0x48, 0x1d, 0x16, 0x85,
0x9e, 0xdd, 0xa3, 0xba, 0xb6, 0xa1, 0x37, 0x75, 0x2b, 0x5d, 0x1a, 0x1c, 0x4a, 0x02, 0xdf, 0x0d,
0xce, 0x42, 0xb2, 0x0f, 0x0b, 0x49, 0x29, 0x52, 0xb9, 0x6a, 0xfb, 0x9f, 0xdc, 0xa2, 0xc7, 0xf4,
0x96, 0x44, 0x4f, 0xb2, 0x27, 0x35, 0x8f, 0xd9, 0xc9, 0x1a, 0x14, 0x2d, 0xb4, 0x59, 0x18, 0xd4,
0xf5, 0x0d, 0xad, 0x59, 0xb2, 0xd4, 0xca, 0xf8, 0xaa, 0x41, 0x2d, 0x53, 0xe6, 0x63, 0x64, 0xdb,
0x97, 0x41, 0x98, 0x28, 0xa6, 0x37, 0xcb, 0xed, 0x75, 0xf3, 0x76, 0x23, 0x99, 0xa3, 0x4b, 0x5a,
0x0a, 0x6c, 0xfc, 0xd2, 0x60, 0xa5, 0x13, 0x53, 0xcf, 0x15, 0x47, 0xa9, 0x52, 0xeb, 0x00, 0xae,
0xcd, 0xed, 0xde, 0xd0, 0xe6, 0x03, 0x49, 0x58, 0xb2, 0x4a, 0xc9, 0xce, 0x69, 0xb2, 0x91, 0x58,
0xc4, 0xaf, 0x86, 0x98, 0x5a, 0xa4, 0x8b, 0x84, 0x9b, 0xb9, 0x55, 0xbe, 0xc6, 0xab, 0x77, 0xb6,
0x17, 0xe3, 0xa9, 0x4d, 0x23, 0x0b, 0x92, 0x28, 0x69, 0x11, 0x39, 0x82, 0x8a, 0x6c, 0x7f, 0x45,
0x52, 0xb8, 0x2f, 0x49, 0x59, 0x84, 0x29, 0xa3, 0x1d, 0x20, 0x93, 0xd5, 0x3f, 0x46, 0xc0, 0x3b,
0xfd, 0x33, 0xfa, 0xb0, 0x34, 0x4e, 0x72, 0xe8, 0xbb, 0xd9, 0x46, 0xca, 0x58, 0xfd, 0x1c, 0xf4,
0x08, 0x2f, 0x54, 0xd3, 0xfe, 0x97, 0x67, 0xc1, 0x2d, 0xb1, 0xad, 0x24, 0xc2, 0xf8, 0xa1, 0xc1,
0xda, 0xf8, 0xe8, 0x24, 0xe4, 0xf4, 0x8c, 0x3a, 0x36, 0xa7, 0x61, 0xf0, 0xc4, 0xb7, 0x21, 0x4d,
0x58, 0x96, 0xc2, 0x9f, 0x51, 0x0f, 0x95, 0xc3, 0xba, 0x70, 0xb8, 0x2a, 0xf6, 0x5f, 0x52, 0x0f,
0x85, 0xcd, 0xc6, 0x2e, 0xac, 0x76, 0x27, 0x77, 0x72, 0xe7, 0x28, 0xa3, 0x54, 0x72, 0x8b, 0x4c,
0x08, 0xfb, 0x43, 0x9e, 0x3c, 0xe0, 0x16, 0x3f, 0xe7, 0xd5, 0x70, 0x1f, 0x23, 0xb7, 0x9f, 0x7e,
0xb8, 0xd7, 0x01, 0x30, 0xb8, 0x88, 0xb1, 0xc7, 0xa9, 0x8f, 0x62, 0xc0, 0x75, 0xab, 0x24, 0x76,
0xde, 0x52, 0x1f, 0xc9, 0xbf, 0xb0, 0xc4, 0x9c, 0x01, 0xba, 0xb1, 0xa7, 0x10, 0x05, 0x81, 0xa8,
0xa4, 0x9b, 0x02, 0x64, 0x42, 0xad, 0x9f, 0x78, 0xdf, 0x73, 0x42, 0x7f, 0xe8, 0x21, 0x57, 0xd0,
0x05, 0x01, 0x5d, 0x11, 0x47, 0x87, 0xea, 0x44, 0xe0, 0x55, 0x97, 0x15, 0x1f, 0xda, 0x65, 0xb9,
0xaa, 0x2d, 0xe6, 0xa9, 0xd6, 0xfe, 0x56, 0x80, 0x8a, 0x94, 0x41, 0xfe, 0x9d, 0x88, 0x03, 0x95,
0xc9, 0x37, 0x9e, 0x6c, 0xe7, 0xa5, 0xcd, 0xf9, 0x0f, 0x35, 0x9a, 0xb3, 0x81, 0xb2, 0x45, 0x8c,
0x39, 0xf2, 0x11, 0x60, 0x5c, 0x39, 0xb9, 0xdf, 0xcd, 0x1a, 0x5b, 0xb3, 0x60, 0x23, 0x7a, 0x07,
0xaa, 0xaf, 0x90, 0x4f, 0x3c, 0xb9, 0x64, 0xeb, 0xce, 0x57, 0x32, 0xf3, 0xeb, 0x68, 0x6c, 0xcf,
0xc4, 0x8d, 0x92, 0x7c, 0x82, 0x95, 0x34, 0xc9, 0x48, 0x4e, 0xd2, 0xbc, 0x33, 0xfe, 0xc6, 0x70,
0x35, 0x76, 0x66, 0x22, 0x59, 0x46, 0xb0, 0x65, 0xf1, 0x56, 0x5c, 0x4d, 0xc8, 0xb6, 0x33, 0x5d,
0x8f, 0xc9, 0xb7, 0xa5, 0x31, 0x6d, 0x0a, 0x8d, 0xb9, 0xf6, 0x07, 0x35, 0x3a, 0xc2, 0xf1, 0x93,
0x8c, 0x39, 0x9b, 0xd3, 0xb3, 0x1c, 0xfa, 0xee, 0x0c, 0xf2, 0xce, 0xc1, 0xfb, 0x17, 0xe7, 0x94,
0x0f, 0xe2, 0x7e, 0x72, 0xd2, 0xba, 0xa6, 0x9e, 0x47, 0xaf, 0x39, 0x3a, 0x83, 0x96, 0x8c, 0xfa,
0xdf, 0xa5, 0x8c, 0x47, 0xb4, 0x1f, 0x73, 0x74, 0x5b, 0xe9, 0x0f, 0xbf, 0x25, 0xa8, 0x5a, 0x22,
0xdb, 0xb0, 0xdf, 0x2f, 0x8a, 0xe5, 0xde, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4d, 0x32, 0xc8,
0x07, 0x4a, 0x09, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -786,7 +736,7 @@ type IndexServiceClient interface {
RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error)
BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error)
GetIndexStates(ctx context.Context, in *IndexStatesRequest, opts ...grpc.CallOption) (*IndexStatesResponse, error)
GetIndexFilePaths(ctx context.Context, in *IndexFilePathsRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error)
GetIndexFilePaths(ctx context.Context, in *IndexFilePathRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error)
NotifyBuildIndex(ctx context.Context, in *BuildIndexNotification, opts ...grpc.CallOption) (*commonpb.Status, error)
}
@ -825,7 +775,7 @@ func (c *indexServiceClient) GetIndexStates(ctx context.Context, in *IndexStates
return out, nil
}
func (c *indexServiceClient) GetIndexFilePaths(ctx context.Context, in *IndexFilePathsRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) {
func (c *indexServiceClient) GetIndexFilePaths(ctx context.Context, in *IndexFilePathRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) {
out := new(IndexFilePathsResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexService/GetIndexFilePaths", in, out, opts...)
if err != nil {
@ -854,7 +804,7 @@ type IndexServiceServer interface {
RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error)
BuildIndex(context.Context, *BuildIndexRequest) (*BuildIndexResponse, error)
GetIndexStates(context.Context, *IndexStatesRequest) (*IndexStatesResponse, error)
GetIndexFilePaths(context.Context, *IndexFilePathsRequest) (*IndexFilePathsResponse, error)
GetIndexFilePaths(context.Context, *IndexFilePathRequest) (*IndexFilePathsResponse, error)
NotifyBuildIndex(context.Context, *BuildIndexNotification) (*commonpb.Status, error)
}
@ -871,7 +821,7 @@ func (*UnimplementedIndexServiceServer) BuildIndex(ctx context.Context, req *Bui
func (*UnimplementedIndexServiceServer) GetIndexStates(ctx context.Context, req *IndexStatesRequest) (*IndexStatesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetIndexStates not implemented")
}
func (*UnimplementedIndexServiceServer) GetIndexFilePaths(ctx context.Context, req *IndexFilePathsRequest) (*IndexFilePathsResponse, error) {
func (*UnimplementedIndexServiceServer) GetIndexFilePaths(ctx context.Context, req *IndexFilePathRequest) (*IndexFilePathsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetIndexFilePaths not implemented")
}
func (*UnimplementedIndexServiceServer) NotifyBuildIndex(ctx context.Context, req *BuildIndexNotification) (*commonpb.Status, error) {
@ -937,7 +887,7 @@ func _IndexService_GetIndexStates_Handler(srv interface{}, ctx context.Context,
}
func _IndexService_GetIndexFilePaths_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(IndexFilePathsRequest)
in := new(IndexFilePathRequest)
if err := dec(in); err != nil {
return nil, err
}
@ -949,7 +899,7 @@ func _IndexService_GetIndexFilePaths_Handler(srv interface{}, ctx context.Contex
FullMethod: "/milvus.proto.index.IndexService/GetIndexFilePaths",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(IndexServiceServer).GetIndexFilePaths(ctx, req.(*IndexFilePathsRequest))
return srv.(IndexServiceServer).GetIndexFilePaths(ctx, req.(*IndexFilePathRequest))
}
return interceptor(ctx, in, info, handler)
}

View File

@ -13,15 +13,15 @@ type ProxyService struct {
}
func (s ProxyService) Init() {
func (s ProxyService) Init() error {
panic("implement me")
}
func (s ProxyService) Start() {
func (s ProxyService) Start() error {
panic("implement me")
}
func (s ProxyService) Stop() {
func (s ProxyService) Stop() error {
panic("implement me")
}

View File

@ -16,10 +16,13 @@ import (
"context"
"errors"
"fmt"
queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice"
"io"
"sync/atomic"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
@ -31,9 +34,7 @@ import (
)
type Node interface {
Init()
Start()
Stop()
typeutil.Service
GetComponentStates() (*internalpb2.ComponentStates, error)
GetTimeTickChannel() (string, error)
@ -123,7 +124,7 @@ func Init() {
Params.Init()
}
func (node *QueryNode) Init() {
func (node *QueryNode) Init() error {
registerReq := queryPb.RegisterNodeRequest{
Address: &commonpb.Address{
Ip: Params.QueryNodeIP,
@ -141,9 +142,10 @@ func (node *QueryNode) Init() {
// TODO: use response.initParams
Params.Init()
return nil
}
func (node *QueryNode) Start() {
func (node *QueryNode) Start() error {
// todo add connectMaster logic
// init services and manager
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
@ -162,9 +164,10 @@ func (node *QueryNode) Start() {
node.stateCode.Store(internalpb2.StateCode_HEALTHY)
<-node.queryNodeLoopCtx.Done()
return nil
}
func (node *QueryNode) Stop() {
func (node *QueryNode) Stop() error {
node.stateCode.Store(internalpb2.StateCode_ABNORMAL)
node.queryNodeLoopCancel()
@ -187,6 +190,7 @@ func (node *QueryNode) Stop() {
if node.closer != nil {
node.closer.Close()
}
return nil
}
func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) {

View File

@ -227,15 +227,15 @@ func (s *segmentManager) loadSegmentFieldsData(segmentID UniqueID, targetFields
}
func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) {
indexFilePathRequest := &indexpb.IndexFilePathsRequest{
IndexIDs: []UniqueID{indexID},
indexFilePathRequest := &indexpb.IndexFilePathRequest{
IndexID: indexID,
}
pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), indexFilePathRequest)
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, err
}
return pathResponse.FilePaths[0].IndexFilePaths, nil
return pathResponse.IndexFilePaths, nil
}
func (s *segmentManager) getIndexParam() (indexParam, error) {

View File

@ -9,15 +9,15 @@ type QueryService struct {
}
//serverBase interface
func (qs *QueryService) Init() {
func (qs *QueryService) Init() error {
panic("implement me")
}
func (qs *QueryService) Start() {
func (qs *QueryService) Start() error {
panic("implement me")
}
func (qs *QueryService) Stop() {
func (qs *QueryService) Stop() error {
panic("implement me")
}

View File

@ -5,9 +5,9 @@ import (
)
type Service interface {
Init()
Start()
Stop()
Init() error
Start() error
Stop() error
}
type Component interface {