add grpc interface to process NewSegment And FlushSegmentCompleted (#6247)

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/6252/head
neza2017 2021-07-01 14:58:17 +08:00 committed by GitHub
parent 580e3a57cf
commit 6019c193fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 264 additions and 46 deletions

View File

@ -295,6 +295,12 @@ func (m *mockRootCoordService) UpdateChannelTimeTick(ctx context.Context, req *i
func (m *mockRootCoordService) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoordService) SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
type mockStartupPolicy struct {
}

View File

@ -22,6 +22,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@ -306,3 +307,15 @@ func (c *GrpcClient) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.Re
})
return ret.(*commonpb.Status), err
}
func (c *GrpcClient) SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.SegmentFlushCompleted(ctx, in)
})
return ret.(*commonpb.Status), err
}
func (c *GrpcClient) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.AddNewSegment(ctx, in)
})
return ret.(*commonpb.Status), err
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@ -349,3 +350,9 @@ func (s *Server) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequ
func (s *Server) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
return s.rootCoord.ReleaseDQLMessageStream(ctx, in)
}
func (s *Server) SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
return s.rootCoord.SegmentFlushCompleted(ctx, in)
}
func (s *Server) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
return s.rootCoord.AddNewSegment(ctx, in)
}

View File

@ -7,6 +7,7 @@ import "common.proto";
import "milvus.proto";
import "internal.proto";
import "proxy.proto";
import "data_coord.proto";
service RootCoord {
rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {}
@ -96,6 +97,9 @@ service RootCoord {
rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {}
rpc UpdateChannelTimeTick(internal.ChannelTimeTickMsg) returns (common.Status) {}
rpc ReleaseDQLMessageStream(proxy.ReleaseDQLMessageStreamRequest) returns (common.Status) {}
rpc SegmentFlushCompleted(internal.SegmentFlushCompletedMsg) returns (common.Status) {}
rpc AddNewSegment(data.SegmentMsg) returns (common.Status) {}
}
message AllocTimestampRequest {

View File

@ -8,6 +8,7 @@ import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
commonpb "github.com/milvus-io/milvus/internal/proto/commonpb"
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
milvuspb "github.com/milvus-io/milvus/internal/proto/milvuspb"
proxypb "github.com/milvus-io/milvus/internal/proto/proxypb"
@ -242,53 +243,56 @@ func init() {
func init() { proto.RegisterFile("root_coord.proto", fileDescriptor_4513485a144f6b06) }
var fileDescriptor_4513485a144f6b06 = []byte{
// 732 bytes of a gzipped FileDescriptorProto
// 783 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0xdb, 0x4f, 0xdb, 0x30,
0x14, 0xc6, 0x69, 0x61, 0x4c, 0x1c, 0xda, 0x82, 0x2c, 0x60, 0xa8, 0xe3, 0x81, 0x75, 0x1a, 0xb4,
0x5c, 0x52, 0x04, 0xd2, 0xb4, 0xd7, 0xd1, 0x4a, 0x50, 0x69, 0x48, 0x23, 0x05, 0x69, 0x37, 0x54,
0xb9, 0xc1, 0x6a, 0x23, 0x12, 0x3b, 0xc4, 0xee, 0x60, 0x8f, 0xfb, 0xb7, 0xf7, 0x34, 0xe5, 0x62,
0x37, 0x49, 0x93, 0x10, 0xb4, 0xbd, 0xc5, 0xf1, 0xcf, 0xdf, 0xe7, 0x73, 0x91, 0x8e, 0x61, 0xd5,
0x65, 0x4c, 0x0c, 0x0c, 0xc6, 0xdc, 0x5b, 0xcd, 0x71, 0x99, 0x60, 0x68, 0xc3, 0x36, 0xad, 0x9f,
0x13, 0x1e, 0xac, 0x34, 0x6f, 0xdb, 0xdf, 0xad, 0x57, 0x0c, 0x66, 0xdb, 0x8c, 0x06, 0xff, 0xeb,
0x95, 0x28, 0x55, 0xaf, 0x99, 0x54, 0x10, 0x97, 0x62, 0x2b, 0x5c, 0x2f, 0x3b, 0x2e, 0x7b, 0xfc,
0x15, 0x2c, 0x1a, 0x03, 0x58, 0xff, 0x68, 0x59, 0xcc, 0xb8, 0x32, 0x6d, 0xc2, 0x05, 0xb6, 0x1d,
0x9d, 0xdc, 0x4f, 0x08, 0x17, 0xe8, 0x08, 0x16, 0x86, 0x98, 0x93, 0xcd, 0xd2, 0x76, 0xa9, 0xb9,
0x7c, 0xbc, 0xa5, 0xc5, 0x8c, 0x43, 0xb7, 0x0b, 0x3e, 0x3a, 0xc5, 0x9c, 0xe8, 0x3e, 0x89, 0xd6,
0xe0, 0x85, 0xc1, 0x26, 0x54, 0x6c, 0xce, 0x6f, 0x97, 0x9a, 0x55, 0x3d, 0x58, 0x34, 0x7e, 0x97,
0x60, 0x23, 0xe9, 0xc0, 0x1d, 0x46, 0x39, 0x41, 0x27, 0xb0, 0xc8, 0x05, 0x16, 0x13, 0x1e, 0x9a,
0xbc, 0x4e, 0x35, 0xe9, 0xfb, 0x88, 0x1e, 0xa2, 0x68, 0x0b, 0x96, 0x84, 0x54, 0xda, 0x2c, 0x6f,
0x97, 0x9a, 0x0b, 0xfa, 0xf4, 0x47, 0xc6, 0x1d, 0xbe, 0x40, 0xcd, 0xbf, 0x42, 0xaf, 0xfb, 0x1f,
0xa2, 0x2b, 0x47, 0x95, 0x2d, 0x58, 0x51, 0xca, 0xff, 0x12, 0x55, 0x0d, 0xca, 0xbd, 0xae, 0x2f,
0x3d, 0xaf, 0x97, 0x7b, 0xdd, 0xf4, 0x38, 0x8e, 0xff, 0xac, 0xc2, 0x92, 0xce, 0x98, 0xe8, 0x78,
0x35, 0x47, 0x0e, 0xa0, 0x33, 0x22, 0x3a, 0xcc, 0x76, 0x18, 0x25, 0x54, 0x78, 0x8a, 0x84, 0xa3,
0xa3, 0xb8, 0x9d, 0xaa, 0xfd, 0x2c, 0x1a, 0xe6, 0xa2, 0xbe, 0x93, 0x71, 0x22, 0x81, 0x37, 0xe6,
0x90, 0xed, 0x3b, 0x7a, 0x85, 0xbc, 0x32, 0x8d, 0xbb, 0xce, 0x18, 0x53, 0x4a, 0xac, 0x3c, 0xc7,
0x04, 0x2a, 0x1d, 0xdf, 0xc6, 0x4f, 0x84, 0x8b, 0xbe, 0x70, 0x4d, 0x3a, 0x92, 0x79, 0x6c, 0xcc,
0xa1, 0x7b, 0x58, 0x3b, 0x23, 0xbe, 0xbb, 0xc9, 0x85, 0x69, 0x70, 0x69, 0x78, 0x9c, 0x6d, 0x38,
0x03, 0x3f, 0xd3, 0x72, 0x00, 0xab, 0x1d, 0x97, 0x60, 0x41, 0x3a, 0xcc, 0xb2, 0x88, 0x21, 0x4c,
0x46, 0xd1, 0x41, 0xea, 0xd1, 0x24, 0x26, 0x8d, 0xf2, 0xca, 0xdd, 0x98, 0x43, 0xdf, 0xa1, 0xd6,
0x75, 0x99, 0x13, 0x91, 0xdf, 0x4b, 0x95, 0x8f, 0x43, 0x05, 0xc5, 0x07, 0x50, 0x3d, 0xc7, 0x3c,
0xa2, 0xdd, 0x4a, 0xd5, 0x8e, 0x31, 0x52, 0xfa, 0x4d, 0x2a, 0x7a, 0xca, 0x98, 0x15, 0x49, 0xcf,
0x03, 0xa0, 0x2e, 0xe1, 0x86, 0x6b, 0x0e, 0xa3, 0x09, 0xd2, 0xd2, 0x23, 0x98, 0x01, 0xa5, 0x55,
0xbb, 0x30, 0xaf, 0x8c, 0x29, 0xac, 0xf4, 0xc7, 0xec, 0x61, 0xba, 0xc7, 0xd1, 0x7e, 0x7a, 0x45,
0xe3, 0x94, 0xb4, 0x3c, 0x28, 0x06, 0x2b, 0xbf, 0x1b, 0x58, 0x09, 0x0a, 0xfc, 0x19, 0xbb, 0xc2,
0xf4, 0xa3, 0xdc, 0xcf, 0x69, 0x03, 0x45, 0x15, 0x2c, 0xd4, 0x57, 0xa8, 0x7a, 0x05, 0x9e, 0x8a,
0xb7, 0x32, 0x9b, 0xe0, 0xb9, 0xd2, 0x37, 0x50, 0x39, 0xc7, 0x7c, 0xaa, 0xdc, 0xcc, 0x6a, 0x81,
0x19, 0xe1, 0x42, 0x1d, 0x70, 0x07, 0x35, 0x2f, 0x6b, 0xea, 0x30, 0xcf, 0xe8, 0xdf, 0x38, 0x24,
0x2d, 0xf6, 0x0b, 0xb1, 0xd1, 0xaa, 0xcb, 0xae, 0xe8, 0x93, 0x91, 0x4d, 0xa8, 0xc8, 0xa8, 0x42,
0x82, 0xca, 0xaf, 0xfa, 0x0c, 0xac, 0xfc, 0x08, 0x54, 0xbc, 0xbb, 0x84, 0x1b, 0x3c, 0x23, 0x77,
0x51, 0x44, 0x3a, 0xb5, 0x0a, 0x90, 0xca, 0xe6, 0x1a, 0x96, 0x83, 0xb6, 0xe9, 0xd1, 0x5b, 0xf2,
0x88, 0x76, 0x73, 0x1a, 0xcb, 0x27, 0x0a, 0x56, 0x7e, 0x0c, 0x55, 0x19, 0x5a, 0x20, 0xdc, 0xca,
0x0d, 0x3f, 0x26, 0xbd, 0x57, 0x04, 0x55, 0x01, 0x5c, 0xc2, 0x92, 0xd7, 0x9a, 0x81, 0xcb, 0xbb,
0xcc, 0xd6, 0x7d, 0xce, 0xe5, 0xef, 0xc3, 0x11, 0xad, 0x5e, 0x09, 0xe8, 0x50, 0x4b, 0x7f, 0xeb,
0x68, 0xa9, 0xef, 0x95, 0xba, 0x56, 0x14, 0x57, 0x51, 0xfc, 0x80, 0x97, 0xe1, 0xec, 0x46, 0x3b,
0xb9, 0x87, 0xd5, 0xb3, 0xa1, 0xbe, 0xfb, 0x24, 0xa7, 0xd4, 0x31, 0xac, 0x5f, 0x3b, 0xb7, 0xde,
0x88, 0x08, 0x06, 0x91, 0x1c, 0x85, 0xc9, 0xaa, 0x4c, 0xc7, 0x6d, 0x9c, 0xbb, 0xe0, 0xa3, 0xa7,
0x72, 0x66, 0xc1, 0x2b, 0x9d, 0x58, 0x04, 0x73, 0xd2, 0xbd, 0xfc, 0x74, 0x41, 0x38, 0xc7, 0x23,
0xd2, 0x17, 0x2e, 0xc1, 0x76, 0x72, 0x44, 0x06, 0x2f, 0xbe, 0x0c, 0xb8, 0x58, 0x85, 0x4e, 0x3f,
0x7c, 0x7b, 0x3f, 0x32, 0xc5, 0x78, 0x32, 0xf4, 0x76, 0xda, 0x01, 0x7a, 0x68, 0xb2, 0xf0, 0xab,
0x2d, 0xe3, 0x68, 0xfb, 0xa7, 0xdb, 0x2a, 0x35, 0xce, 0x70, 0xb8, 0xe8, 0xff, 0x3a, 0xf9, 0x1b,
0x00, 0x00, 0xff, 0xff, 0xa5, 0x6f, 0xe1, 0xba, 0xcf, 0x0a, 0x00, 0x00,
0x14, 0xc6, 0x69, 0x61, 0x4c, 0x1c, 0xda, 0x82, 0x2c, 0x60, 0xa8, 0x63, 0x12, 0xeb, 0x34, 0x68,
0xb9, 0xb4, 0x08, 0xa4, 0x69, 0xaf, 0xd0, 0x6a, 0x50, 0x69, 0x9d, 0x46, 0x0a, 0xd2, 0x6e, 0xa8,
0x72, 0xd3, 0xa3, 0x36, 0x22, 0x89, 0x43, 0xec, 0x0e, 0xf6, 0xb8, 0x7f, 0x69, 0x7f, 0xe1, 0x94,
0x8b, 0xd3, 0xa4, 0x4d, 0x42, 0xd0, 0xf6, 0x56, 0xc7, 0x3f, 0x7f, 0x9f, 0x8f, 0xbf, 0x53, 0xd9,
0xb0, 0x6a, 0x33, 0x26, 0x7a, 0x2a, 0x63, 0xf6, 0xa0, 0x6e, 0xd9, 0x4c, 0x30, 0xb2, 0x61, 0x68,
0xfa, 0xcf, 0x31, 0xf7, 0x46, 0x75, 0x67, 0xda, 0x9d, 0x2d, 0x17, 0x54, 0x66, 0x18, 0xcc, 0xf4,
0xbe, 0x97, 0x0b, 0x61, 0xaa, 0x5c, 0xd2, 0x4c, 0x81, 0xb6, 0x49, 0x75, 0x7f, 0xbc, 0x6c, 0xd9,
0xec, 0xe1, 0x97, 0x3f, 0x58, 0x1d, 0x50, 0x41, 0xc3, 0x16, 0x95, 0x1e, 0xac, 0x9f, 0xea, 0x3a,
0x53, 0xaf, 0x34, 0x03, 0xb9, 0xa0, 0x86, 0xa5, 0xe0, 0xdd, 0x18, 0xb9, 0x20, 0x47, 0xb0, 0xd0,
0xa7, 0x1c, 0x37, 0x73, 0xdb, 0xb9, 0xea, 0xf2, 0xf1, 0x56, 0x3d, 0xb2, 0x15, 0xdf, 0xbf, 0xc3,
0x87, 0x67, 0x94, 0xa3, 0xe2, 0x92, 0x64, 0x0d, 0x9e, 0xa9, 0x6c, 0x6c, 0x8a, 0xcd, 0xf9, 0xed,
0x5c, 0xb5, 0xa8, 0x78, 0x83, 0xca, 0xef, 0x1c, 0x6c, 0x4c, 0x3b, 0x70, 0x8b, 0x99, 0x1c, 0xc9,
0x09, 0x2c, 0x72, 0x41, 0xc5, 0x98, 0xfb, 0x26, 0x2f, 0x63, 0x4d, 0xba, 0x2e, 0xa2, 0xf8, 0x28,
0xd9, 0x82, 0x25, 0x21, 0x95, 0x36, 0xf3, 0xdb, 0xb9, 0xea, 0x82, 0x32, 0xf9, 0x90, 0xb0, 0x87,
0x2f, 0x50, 0x72, 0xb7, 0xd0, 0x6e, 0xfd, 0x87, 0xea, 0xf2, 0x61, 0x65, 0x1d, 0x56, 0x02, 0xe5,
0x7f, 0xa9, 0xaa, 0x04, 0xf9, 0x76, 0xcb, 0x95, 0x9e, 0x57, 0xf2, 0xed, 0x56, 0x7c, 0x1d, 0xc7,
0x7f, 0x08, 0x2c, 0x29, 0x8c, 0x89, 0xa6, 0x13, 0x20, 0xb1, 0x80, 0x9c, 0xa3, 0x68, 0x32, 0xc3,
0x62, 0x26, 0x9a, 0xc2, 0x51, 0x44, 0x4e, 0x8e, 0xa2, 0x76, 0x41, 0x37, 0xcc, 0xa2, 0xfe, 0x59,
0x94, 0x77, 0x12, 0x56, 0x4c, 0xe1, 0x95, 0x39, 0x62, 0xb8, 0x8e, 0x4e, 0x90, 0x57, 0x9a, 0x7a,
0xdb, 0x1c, 0x51, 0xd3, 0x44, 0x3d, 0xcd, 0x71, 0x0a, 0x95, 0x8e, 0x6f, 0xa2, 0x2b, 0xfc, 0x41,
0x57, 0xd8, 0x9a, 0x39, 0x94, 0xe7, 0x58, 0x99, 0x23, 0x77, 0xb0, 0x76, 0x8e, 0xae, 0xbb, 0xc6,
0x85, 0xa6, 0x72, 0x69, 0x78, 0x9c, 0x6c, 0x38, 0x03, 0x3f, 0xd1, 0xb2, 0x07, 0xab, 0x4d, 0x1b,
0xa9, 0xc0, 0x26, 0xd3, 0x75, 0x54, 0x85, 0xc6, 0x4c, 0x72, 0x10, 0xbb, 0x74, 0x1a, 0x93, 0x46,
0x69, 0x71, 0x57, 0xe6, 0xc8, 0x77, 0x28, 0xb5, 0x6c, 0x66, 0x85, 0xe4, 0xf7, 0x62, 0xe5, 0xa3,
0x50, 0x46, 0xf1, 0x1e, 0x14, 0x2f, 0x28, 0x0f, 0x69, 0xd7, 0x62, 0xb5, 0x23, 0x8c, 0x94, 0x7e,
0x1d, 0x8b, 0x9e, 0x31, 0xa6, 0x87, 0x8e, 0xe7, 0x1e, 0x48, 0x0b, 0xb9, 0x6a, 0x6b, 0xfd, 0xf0,
0x01, 0xd5, 0xe3, 0x2b, 0x98, 0x01, 0xa5, 0x55, 0x23, 0x33, 0x1f, 0x18, 0x9b, 0xb0, 0xd2, 0x1d,
0xb1, 0xfb, 0xc9, 0x1c, 0x27, 0xfb, 0xf1, 0x89, 0x46, 0x29, 0x69, 0x79, 0x90, 0x0d, 0x0e, 0xfc,
0x6e, 0x60, 0xc5, 0x0b, 0xf8, 0x33, 0xb5, 0x85, 0xe6, 0x56, 0xb9, 0x9f, 0xd2, 0x06, 0x01, 0x95,
0x31, 0xa8, 0xaf, 0x50, 0x74, 0x02, 0x9e, 0x88, 0xd7, 0x12, 0x9b, 0xe0, 0xa9, 0xd2, 0x37, 0x50,
0xb8, 0xa0, 0x7c, 0xa2, 0x5c, 0x4d, 0x6a, 0x81, 0x19, 0xe1, 0x4c, 0x1d, 0x70, 0x0b, 0x25, 0xe7,
0xd4, 0x82, 0xc5, 0x3c, 0xa1, 0x7f, 0xa3, 0x90, 0xb4, 0xd8, 0xcf, 0xc4, 0x86, 0x53, 0x97, 0x5d,
0xd1, 0xc5, 0xa1, 0x81, 0xa6, 0x48, 0x48, 0x61, 0x8a, 0x4a, 0x4f, 0x7d, 0x06, 0x0e, 0xfc, 0x10,
0x0a, 0xce, 0x5e, 0xfc, 0x09, 0x9e, 0x70, 0x76, 0x61, 0x44, 0x3a, 0xd5, 0x32, 0x90, 0x81, 0xcd,
0x35, 0x2c, 0x7b, 0x6d, 0xd3, 0x36, 0x07, 0xf8, 0x40, 0x76, 0x53, 0x1a, 0xcb, 0x25, 0x32, 0x26,
0x3f, 0x82, 0xa2, 0x2c, 0xcd, 0x13, 0xae, 0xa5, 0x96, 0x1f, 0x91, 0xde, 0xcb, 0x82, 0x06, 0x05,
0x5c, 0xc2, 0x92, 0xd3, 0x9a, 0x9e, 0xcb, 0xdb, 0xc4, 0xd6, 0x7d, 0xca, 0xe6, 0xef, 0xfc, 0x2b,
0x3a, 0x78, 0x25, 0x90, 0xc3, 0x7a, 0xfc, 0xeb, 0xa7, 0x1e, 0xfb, 0x5e, 0x29, 0xd7, 0xb3, 0xe2,
0x41, 0x15, 0x3f, 0xe0, 0xb9, 0x7f, 0x77, 0x93, 0x9d, 0xd4, 0xc5, 0xc1, 0xb3, 0xa1, 0xbc, 0xfb,
0x28, 0x17, 0xa8, 0x53, 0x58, 0xbf, 0xb6, 0x06, 0xce, 0x15, 0xe1, 0x5d, 0x44, 0xf2, 0x2a, 0x9c,
0x4e, 0x65, 0x72, 0xdd, 0x46, 0xb9, 0x0e, 0x1f, 0x3e, 0x76, 0x66, 0x3a, 0xbc, 0x50, 0x50, 0x47,
0xca, 0xb1, 0x75, 0xf9, 0xb1, 0x83, 0x9c, 0xd3, 0x21, 0x76, 0x85, 0x8d, 0xd4, 0x98, 0xbe, 0x22,
0xbd, 0x37, 0x60, 0x02, 0x9c, 0x31, 0xa1, 0x21, 0xac, 0xfb, 0xbd, 0xfc, 0x41, 0x1f, 0xf3, 0x91,
0xf3, 0x3a, 0xd0, 0x51, 0xe0, 0x80, 0x34, 0x12, 0x0a, 0x8a, 0xa5, 0x33, 0x94, 0xd5, 0x81, 0xe2,
0xe9, 0x60, 0xf0, 0x09, 0xe5, 0x5f, 0x87, 0xbc, 0x8a, 0xf2, 0xce, 0x1b, 0x56, 0x8a, 0x3f, 0x2e,
0x77, 0xf6, 0xfe, 0xdb, 0xbb, 0xa1, 0x26, 0x46, 0xe3, 0xbe, 0x33, 0xd3, 0xf0, 0xd0, 0x43, 0x8d,
0xf9, 0xbf, 0x1a, 0x72, 0xbb, 0x0d, 0x77, 0x75, 0x23, 0x88, 0xd4, 0xea, 0xf7, 0x17, 0xdd, 0x4f,
0x27, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x50, 0xce, 0xa8, 0x25, 0x99, 0x0b, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -370,6 +374,8 @@ type RootCoordClient interface {
AllocID(ctx context.Context, in *AllocIDRequest, opts ...grpc.CallOption) (*AllocIDResponse, error)
UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg, opts ...grpc.CallOption) (*commonpb.Status, error)
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg, opts ...grpc.CallOption) (*commonpb.Status, error)
AddNewSegment(ctx context.Context, in *datapb.SegmentMsg, opts ...grpc.CallOption) (*commonpb.Status, error)
}
type rootCoordClient struct {
@ -569,6 +575,24 @@ func (c *rootCoordClient) ReleaseDQLMessageStream(ctx context.Context, in *proxy
return out, nil
}
func (c *rootCoordClient) SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg, opts ...grpc.CallOption) (*commonpb.Status, error) {
out := new(commonpb.Status)
err := c.cc.Invoke(ctx, "/milvus.proto.rootcoord.RootCoord/SegmentFlushCompleted", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *rootCoordClient) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg, opts ...grpc.CallOption) (*commonpb.Status, error) {
out := new(commonpb.Status)
err := c.cc.Invoke(ctx, "/milvus.proto.rootcoord.RootCoord/AddNewSegment", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RootCoordServer is the server API for RootCoord service.
type RootCoordServer interface {
GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error)
@ -638,6 +662,8 @@ type RootCoordServer interface {
AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error)
UpdateChannelTimeTick(context.Context, *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error)
ReleaseDQLMessageStream(context.Context, *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
SegmentFlushCompleted(context.Context, *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error)
AddNewSegment(context.Context, *datapb.SegmentMsg) (*commonpb.Status, error)
}
// UnimplementedRootCoordServer can be embedded to have forward compatible implementations.
@ -707,6 +733,12 @@ func (*UnimplementedRootCoordServer) UpdateChannelTimeTick(ctx context.Context,
func (*UnimplementedRootCoordServer) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReleaseDQLMessageStream not implemented")
}
func (*UnimplementedRootCoordServer) SegmentFlushCompleted(ctx context.Context, req *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
return nil, status.Errorf(codes.Unimplemented, "method SegmentFlushCompleted not implemented")
}
func (*UnimplementedRootCoordServer) AddNewSegment(ctx context.Context, req *datapb.SegmentMsg) (*commonpb.Status, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddNewSegment not implemented")
}
func RegisterRootCoordServer(s *grpc.Server, srv RootCoordServer) {
s.RegisterService(&_RootCoord_serviceDesc, srv)
@ -1090,6 +1122,42 @@ func _RootCoord_ReleaseDQLMessageStream_Handler(srv interface{}, ctx context.Con
return interceptor(ctx, in, info, handler)
}
func _RootCoord_SegmentFlushCompleted_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(internalpb.SegmentFlushCompletedMsg)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RootCoordServer).SegmentFlushCompleted(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.rootcoord.RootCoord/SegmentFlushCompleted",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RootCoordServer).SegmentFlushCompleted(ctx, req.(*internalpb.SegmentFlushCompletedMsg))
}
return interceptor(ctx, in, info, handler)
}
func _RootCoord_AddNewSegment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(datapb.SegmentMsg)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RootCoordServer).AddNewSegment(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.rootcoord.RootCoord/AddNewSegment",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RootCoordServer).AddNewSegment(ctx, req.(*datapb.SegmentMsg))
}
return interceptor(ctx, in, info, handler)
}
var _RootCoord_serviceDesc = grpc.ServiceDesc{
ServiceName: "milvus.proto.rootcoord.RootCoord",
HandlerType: (*RootCoordServer)(nil),
@ -1178,6 +1246,14 @@ var _RootCoord_serviceDesc = grpc.ServiceDesc{
MethodName: "ReleaseDQLMessageStream",
Handler: _RootCoord_ReleaseDQLMessageStream_Handler,
},
{
MethodName: "SegmentFlushCompleted",
Handler: _RootCoord_SegmentFlushCompleted_Handler,
},
{
MethodName: "AddNewSegment",
Handler: _RootCoord_AddNewSegment_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "root_coord.proto",

View File

@ -1907,3 +1907,113 @@ func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseD
}
return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in)
}
func (c *Core) SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
if in.Base.MsgType != commonpb.MsgType_SegmentFlushDone {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("SegmentFlushDone with incorrect msgtype = %s", commonpb.MsgType_name[int32(in.Base.MsgType)]),
}, nil
}
segID := in.SegmentID
log.Debug("flush segment", zap.Int64("id", segID))
coll, err := c.MetaTable.GetCollectionBySegmentID(segID)
if err != nil {
log.Warn("GetCollectionBySegmentID error", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("GetCollectionBySegmentID error = %v", err),
}, nil
}
err = c.MetaTable.AddFlushedSegment(segID)
if err != nil {
log.Warn("AddFlushedSegment error", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("AddFlushedSegment error = %v", err),
}, nil
}
if len(coll.FieldIndexes) == 0 {
log.Debug("no index params on collection", zap.String("collection_name", coll.Schema.Name))
}
var segIdxInfos []*etcdpb.SegmentIndexInfo
for _, f := range coll.FieldIndexes {
fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
if err != nil {
log.Warn("field schema not found", zap.Int64("field id", f.FiledID))
continue
}
idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID)
if err != nil {
log.Warn("index not found", zap.Int64("index id", f.IndexID))
continue
}
info := etcdpb.SegmentIndexInfo{
SegmentID: segID,
FieldID: fieldSch.FieldID,
IndexID: idxInfo.IndexID,
EnableIndex: false,
}
info.BuildID, err = c.BuildIndex(ctx, segID, fieldSch, idxInfo, true)
if err == nil && info.BuildID != 0 {
info.EnableIndex = true
} else {
log.Error("build index fail", zap.Int64("buildid", info.BuildID), zap.Error(err))
}
segIdxInfos = append(segIdxInfos, &info)
}
if len(segIdxInfos) > 0 {
_, err = c.MetaTable.AddIndex(segIdxInfos, "", "")
if err != nil {
log.Error("AddIndex fail", zap.String("err", err.Error()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("AddIndex error = %v", err),
}, nil
}
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
}, nil
}
func (c *Core) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
if in.Base.MsgType != commonpb.MsgType_SegmentInfo {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("AddNewSegment with incorrect msgtype = %s", commonpb.MsgType_name[int32(in.Base.MsgType)]),
}, nil
}
if _, err := c.MetaTable.AddSegment([]*datapb.SegmentInfo{in.Segment}, "", ""); err != nil {
log.Debug("add segment info meta table failed ", zap.String("error", err.Error()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("add segment info meta table failed, error = %v", err),
}, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
}, nil
}

View File

@ -109,6 +109,8 @@ type RootCoord interface {
DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
SegmentFlushCompleted(ctx context.Context, in *internalpb.SegmentFlushCompletedMsg) (*commonpb.Status, error)
AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error)
}
// RootCoordComponent is used by grpc server of RootCoord