Watch query channel to near the latest position (#13682)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/13863/head
xige-16 2021-12-21 13:50:54 +08:00 committed by GitHub
parent 83624d5850
commit 4e4ff54d98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 297 additions and 418 deletions

View File

@ -150,9 +150,8 @@ message AddQueryChannelRequest {
int64 collectionID = 3; int64 collectionID = 3;
string query_channel = 4; string query_channel = 4;
string query_result_channel = 5; string query_result_channel = 5;
repeated int64 global_sealed_segmentID = 6; internal.MsgPosition seek_position = 6;
internal.MsgPosition seek_position = 7; repeated SegmentInfo global_sealed_segments = 7;
repeated SegmentInfo global_sealed_segments = 8;
} }
message RemoveQueryChannelRequest { message RemoveQueryChannelRequest {

View File

@ -947,17 +947,16 @@ func (m *GetSegmentInfoResponse) GetInfos() []*SegmentInfo {
//-----------------query node grpc request and response proto---------------- //-----------------query node grpc request and response proto----------------
type AddQueryChannelRequest struct { type AddQueryChannelRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
NodeID int64 `protobuf:"varint,2,opt,name=nodeID,proto3" json:"nodeID,omitempty"` NodeID int64 `protobuf:"varint,2,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
QueryChannel string `protobuf:"bytes,4,opt,name=query_channel,json=queryChannel,proto3" json:"query_channel,omitempty"` QueryChannel string `protobuf:"bytes,4,opt,name=query_channel,json=queryChannel,proto3" json:"query_channel,omitempty"`
QueryResultChannel string `protobuf:"bytes,5,opt,name=query_result_channel,json=queryResultChannel,proto3" json:"query_result_channel,omitempty"` QueryResultChannel string `protobuf:"bytes,5,opt,name=query_result_channel,json=queryResultChannel,proto3" json:"query_result_channel,omitempty"`
GlobalSealedSegmentID []int64 `protobuf:"varint,6,rep,packed,name=global_sealed_segmentID,json=globalSealedSegmentID,proto3" json:"global_sealed_segmentID,omitempty"` SeekPosition *internalpb.MsgPosition `protobuf:"bytes,6,opt,name=seek_position,json=seekPosition,proto3" json:"seek_position,omitempty"`
SeekPosition *internalpb.MsgPosition `protobuf:"bytes,7,opt,name=seek_position,json=seekPosition,proto3" json:"seek_position,omitempty"` GlobalSealedSegments []*SegmentInfo `protobuf:"bytes,7,rep,name=global_sealed_segments,json=globalSealedSegments,proto3" json:"global_sealed_segments,omitempty"`
GlobalSealedSegments []*SegmentInfo `protobuf:"bytes,8,rep,name=global_sealed_segments,json=globalSealedSegments,proto3" json:"global_sealed_segments,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"`
XXX_sizecache int32 `json:"-"`
} }
func (m *AddQueryChannelRequest) Reset() { *m = AddQueryChannelRequest{} } func (m *AddQueryChannelRequest) Reset() { *m = AddQueryChannelRequest{} }
@ -1020,13 +1019,6 @@ func (m *AddQueryChannelRequest) GetQueryResultChannel() string {
return "" return ""
} }
func (m *AddQueryChannelRequest) GetGlobalSealedSegmentID() []int64 {
if m != nil {
return m.GlobalSealedSegmentID
}
return nil
}
func (m *AddQueryChannelRequest) GetSeekPosition() *internalpb.MsgPosition { func (m *AddQueryChannelRequest) GetSeekPosition() *internalpb.MsgPosition {
if m != nil { if m != nil {
return m.SeekPosition return m.SeekPosition
@ -2228,145 +2220,144 @@ func init() {
func init() { proto.RegisterFile("query_coord.proto", fileDescriptor_aab7cc9a69ed26e8) } func init() { proto.RegisterFile("query_coord.proto", fileDescriptor_aab7cc9a69ed26e8) }
var fileDescriptor_aab7cc9a69ed26e8 = []byte{ var fileDescriptor_aab7cc9a69ed26e8 = []byte{
// 2209 bytes of a gzipped FileDescriptorProto // 2193 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x19, 0x4d, 0x73, 0xdc, 0x48, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x19, 0x4d, 0x73, 0xdc, 0x48,
0xd5, 0x9a, 0x2f, 0x7b, 0xde, 0x8c, 0x67, 0xe4, 0xf6, 0xc7, 0x8e, 0x87, 0x24, 0xeb, 0xd5, 0xae, 0xd5, 0x9a, 0x2f, 0x7b, 0xde, 0x8c, 0x67, 0xe4, 0x4e, 0x62, 0xc6, 0x43, 0x92, 0xf5, 0x6a, 0xd7,
0xb3, 0xc1, 0xcb, 0xda, 0xc1, 0x01, 0x8a, 0x2d, 0xe0, 0x10, 0xdb, 0xc4, 0x6b, 0x76, 0xe3, 0x98, 0xd9, 0xe0, 0x65, 0xed, 0xe0, 0x40, 0x15, 0x5b, 0xc0, 0x21, 0xb6, 0x89, 0xd7, 0xec, 0xc6, 0x31,
0xb1, 0xb3, 0x14, 0xa9, 0x54, 0x09, 0xcd, 0xa8, 0x3d, 0x56, 0x45, 0x52, 0x4f, 0xd4, 0x9a, 0xb5, 0x63, 0x67, 0x29, 0x52, 0xa9, 0x12, 0x9a, 0x51, 0x7b, 0xac, 0x8a, 0xa4, 0x9e, 0xa8, 0x35, 0x6b,
0x1d, 0xae, 0x1c, 0x96, 0x03, 0xc5, 0x1f, 0xa0, 0xb8, 0x00, 0x05, 0xa9, 0x62, 0x8f, 0xdc, 0x73, 0x3b, 0x5c, 0x39, 0x2c, 0x07, 0x8a, 0x3f, 0x40, 0x71, 0x01, 0x0a, 0x52, 0xc5, 0xfe, 0x87, 0x5c,
0xe1, 0x6f, 0x50, 0x1c, 0xe0, 0x27, 0x70, 0xa7, 0xba, 0xd5, 0xd2, 0x48, 0x9a, 0x96, 0x2d, 0xdb, 0xf8, 0x1b, 0x14, 0x07, 0xf8, 0x09, 0x1c, 0xa9, 0xa2, 0xba, 0xd5, 0xd2, 0xe8, 0xa3, 0x65, 0xcb,
0xe5, 0x24, 0x45, 0x71, 0x93, 0x5e, 0xbf, 0xd7, 0xef, 0xf5, 0xfb, 0xee, 0x7e, 0x30, 0xf3, 0x7c, 0x76, 0x79, 0x93, 0xa2, 0xb8, 0x49, 0xaf, 0xdf, 0xeb, 0xf7, 0xfa, 0x7d, 0x77, 0x3f, 0x98, 0x7b,
0x88, 0xbd, 0x53, 0xbd, 0x47, 0x88, 0x67, 0xae, 0x0e, 0x3c, 0xe2, 0x13, 0x84, 0x1c, 0xcb, 0xfe, 0x31, 0xc6, 0xde, 0x89, 0x3e, 0x20, 0xc4, 0x33, 0x57, 0x46, 0x1e, 0xf1, 0x09, 0x42, 0x8e, 0x65,
0x72, 0x48, 0x83, 0xbf, 0x55, 0xbe, 0xde, 0xae, 0xf7, 0x88, 0xe3, 0x10, 0x37, 0x80, 0xb5, 0xeb, 0x7f, 0x31, 0xa6, 0xc1, 0xdf, 0x0a, 0x5f, 0xef, 0x36, 0x07, 0xc4, 0x71, 0x88, 0x1b, 0xc0, 0xba,
0x71, 0x8c, 0x76, 0xc3, 0x72, 0x7d, 0xec, 0xb9, 0x86, 0x1d, 0xae, 0xd2, 0xde, 0x11, 0x76, 0x0c, 0xcd, 0x38, 0x46, 0xb7, 0x65, 0xb9, 0x3e, 0xf6, 0x5c, 0xc3, 0x0e, 0x57, 0xe9, 0xe0, 0x10, 0x3b,
0xf1, 0xa7, 0x9a, 0x86, 0x6f, 0xc4, 0xf7, 0x6f, 0xcf, 0x58, 0xae, 0x89, 0x4f, 0xe2, 0x20, 0xed, 0x86, 0xf8, 0x53, 0x4d, 0xc3, 0x37, 0xe2, 0xfb, 0x77, 0xe7, 0x2c, 0xd7, 0xc4, 0xc7, 0x71, 0x90,
0x57, 0x0a, 0x2c, 0xec, 0x1f, 0x91, 0xe3, 0x4d, 0x62, 0xdb, 0xb8, 0xe7, 0x5b, 0xc4, 0xa5, 0x1d, 0xf6, 0x2b, 0x05, 0xe6, 0xf7, 0x0e, 0xc9, 0xd1, 0x06, 0xb1, 0x6d, 0x3c, 0xf0, 0x2d, 0xe2, 0xd2,
0xfc, 0x7c, 0x88, 0xa9, 0x8f, 0xee, 0x42, 0xa9, 0x6b, 0x50, 0xdc, 0x52, 0x96, 0x94, 0x3b, 0xb5, 0x1e, 0x7e, 0x31, 0xc6, 0xd4, 0x47, 0xf7, 0xa0, 0xd2, 0x37, 0x28, 0xee, 0x28, 0x8b, 0xca, 0xdd,
0xf5, 0x1b, 0xab, 0x09, 0xe1, 0x84, 0x54, 0x0f, 0x69, 0x7f, 0xc3, 0xa0, 0xb8, 0xc3, 0x31, 0x11, 0xc6, 0xda, 0xcd, 0x95, 0x84, 0x70, 0x42, 0xaa, 0x47, 0x74, 0xb8, 0x6e, 0x50, 0xdc, 0xe3, 0x98,
0x82, 0x92, 0xd9, 0xdd, 0xd9, 0x6a, 0x15, 0x96, 0x94, 0x3b, 0xc5, 0x0e, 0xff, 0x46, 0x1f, 0xc0, 0x08, 0x41, 0xc5, 0xec, 0x6f, 0x6f, 0x76, 0x4a, 0x8b, 0xca, 0xdd, 0x72, 0x8f, 0x7f, 0xa3, 0xf7,
0x74, 0x2f, 0xda, 0x7b, 0x67, 0x8b, 0xb6, 0x8a, 0x4b, 0xc5, 0x3b, 0xc5, 0x4e, 0x12, 0xa8, 0xfd, 0x61, 0x76, 0x10, 0xed, 0xbd, 0xbd, 0x49, 0x3b, 0xe5, 0xc5, 0xf2, 0xdd, 0x72, 0x2f, 0x09, 0xd4,
0x59, 0x81, 0x77, 0xc6, 0xc4, 0xa0, 0x03, 0xe2, 0x52, 0x8c, 0xee, 0x41, 0x85, 0xfa, 0x86, 0x3f, 0xfe, 0xac, 0xc0, 0x37, 0x32, 0x62, 0xd0, 0x11, 0x71, 0x29, 0x46, 0xf7, 0xa1, 0x46, 0x7d, 0xc3,
0xa4, 0x42, 0x92, 0x6f, 0x48, 0x25, 0xd9, 0xe7, 0x28, 0x1d, 0x81, 0x3a, 0xce, 0xb6, 0x20, 0x61, 0x1f, 0x53, 0x21, 0xc9, 0x37, 0xa5, 0x92, 0xec, 0x71, 0x94, 0x9e, 0x40, 0xcd, 0xb2, 0x2d, 0x49,
0x8b, 0xbe, 0x0d, 0x73, 0x96, 0xfb, 0x10, 0x3b, 0xc4, 0x3b, 0xd5, 0x07, 0xd8, 0xeb, 0x61, 0xd7, 0xd8, 0xa2, 0xef, 0xc0, 0x75, 0xcb, 0x7d, 0x84, 0x1d, 0xe2, 0x9d, 0xe8, 0x23, 0xec, 0x0d, 0xb0,
0x37, 0xfa, 0x38, 0x94, 0x71, 0x36, 0x5c, 0xdb, 0x1b, 0x2d, 0x69, 0x7f, 0x52, 0x60, 0x9e, 0x49, 0xeb, 0x1b, 0x43, 0x1c, 0xca, 0x78, 0x2d, 0x5c, 0xdb, 0x9d, 0x2c, 0x69, 0x7f, 0x52, 0xe0, 0x06,
0xba, 0x67, 0x78, 0xbe, 0x75, 0x0d, 0xfa, 0xd2, 0xa0, 0x1e, 0x97, 0xb1, 0x55, 0xe4, 0x6b, 0x09, 0x93, 0x74, 0xd7, 0xf0, 0x7c, 0xeb, 0x0a, 0xf4, 0xa5, 0x41, 0x33, 0x2e, 0x63, 0xa7, 0xcc, 0xd7,
0x18, 0xc3, 0x19, 0x84, 0xec, 0xd9, 0xd9, 0x4a, 0x5c, 0xdc, 0x04, 0x4c, 0xfb, 0xa3, 0x30, 0x6c, 0x12, 0x30, 0x86, 0x33, 0x0a, 0xd9, 0xb3, 0xb3, 0x55, 0xb8, 0xb8, 0x09, 0x98, 0xf6, 0x47, 0x61,
0x5c, 0xce, 0xab, 0x28, 0x34, 0xcd, 0xb3, 0x30, 0xce, 0xf3, 0x32, 0xea, 0x7c, 0xa5, 0xc0, 0xfc, 0xd8, 0xb8, 0x9c, 0x97, 0x51, 0x68, 0x9a, 0x67, 0x29, 0xcb, 0xf3, 0x22, 0xea, 0x7c, 0xad, 0xc0,
0xe7, 0xc4, 0x30, 0x47, 0x86, 0x7f, 0xfd, 0xea, 0xfc, 0x11, 0x54, 0x82, 0xc0, 0x69, 0x95, 0x38, 0x8d, 0xcf, 0x88, 0x61, 0x4e, 0x0c, 0xff, 0xf5, 0xab, 0xf3, 0x47, 0x50, 0x0b, 0x02, 0xa7, 0x53,
0xaf, 0xe5, 0x24, 0x2f, 0x11, 0x54, 0x23, 0x09, 0xf7, 0x39, 0xa0, 0x23, 0x88, 0xb4, 0xdf, 0x29, 0xe1, 0xbc, 0x96, 0x92, 0xbc, 0x44, 0x50, 0x4d, 0x24, 0xdc, 0xe3, 0x80, 0x9e, 0x20, 0xd2, 0x7e,
0xd0, 0xea, 0x60, 0x1b, 0x1b, 0x14, 0xbf, 0xc9, 0x53, 0x2c, 0x40, 0xc5, 0x25, 0x26, 0xde, 0xd9, 0xa7, 0x40, 0xa7, 0x87, 0x6d, 0x6c, 0x50, 0xfc, 0x26, 0x4f, 0x31, 0x0f, 0x35, 0x97, 0x98, 0x78,
0xe2, 0xa7, 0x28, 0x76, 0xc4, 0x9f, 0xf6, 0x2f, 0xa1, 0xe1, 0xb7, 0xdc, 0x61, 0x63, 0x56, 0x28, 0x7b, 0x93, 0x9f, 0xa2, 0xdc, 0x13, 0x7f, 0xda, 0x3f, 0x85, 0x86, 0xdf, 0x72, 0x87, 0x8d, 0x59,
0x5f, 0xc6, 0x0a, 0xaf, 0x46, 0x56, 0x78, 0xdb, 0x4f, 0x3a, 0xb2, 0x54, 0x39, 0x61, 0xa9, 0x9f, 0xa1, 0x7a, 0x11, 0x2b, 0xbc, 0x9e, 0x58, 0xe1, 0x6d, 0x3f, 0xe9, 0xc4, 0x52, 0xd5, 0x84, 0xa5,
0xc3, 0xe2, 0xa6, 0x87, 0x0d, 0x1f, 0xff, 0x94, 0x65, 0xfe, 0xcd, 0x23, 0xc3, 0x75, 0xb1, 0x1d, 0x7e, 0x0e, 0x0b, 0x1b, 0x1e, 0x36, 0x7c, 0xfc, 0x53, 0x96, 0xf9, 0x37, 0x0e, 0x0d, 0xd7, 0xc5,
0x1e, 0x21, 0xcd, 0x5c, 0x91, 0x30, 0x6f, 0xc1, 0xe4, 0xc0, 0x23, 0x27, 0xa7, 0x91, 0xdc, 0xe1, 0x76, 0x78, 0x84, 0x34, 0x73, 0x45, 0xc2, 0xbc, 0x03, 0xd3, 0x23, 0x8f, 0x1c, 0x9f, 0x44, 0x72,
0xaf, 0xf6, 0x17, 0x05, 0xda, 0xb2, 0xbd, 0xaf, 0x92, 0x11, 0xde, 0x87, 0x69, 0x51, 0xc2, 0x82, 0x87, 0xbf, 0xda, 0x5f, 0x14, 0xe8, 0xca, 0xf6, 0xbe, 0x4c, 0x46, 0x78, 0x0f, 0x66, 0x45, 0x09,
0xdd, 0x38, 0xcf, 0x6a, 0xa7, 0xfe, 0x3c, 0xc6, 0x01, 0xdd, 0x85, 0xb9, 0x00, 0xc9, 0xc3, 0x74, 0x0b, 0x76, 0xe3, 0x3c, 0xeb, 0xbd, 0xe6, 0x8b, 0x18, 0x07, 0x74, 0x0f, 0xae, 0x07, 0x48, 0x1e,
0x68, 0xfb, 0x11, 0x6e, 0x91, 0xe3, 0x22, 0xbe, 0xd6, 0xe1, 0x4b, 0x82, 0x42, 0x7b, 0xa9, 0xc0, 0xa6, 0x63, 0xdb, 0x8f, 0x70, 0xcb, 0x1c, 0x17, 0xf1, 0xb5, 0x1e, 0x5f, 0x12, 0x14, 0xda, 0x2b,
0xe2, 0x36, 0xf6, 0x23, 0x23, 0x32, 0xae, 0xf8, 0x2d, 0x4d, 0xb2, 0x5f, 0x2b, 0xd0, 0x96, 0xc9, 0x05, 0x16, 0xb6, 0xb0, 0x1f, 0x19, 0x91, 0x71, 0xc5, 0x6f, 0x69, 0x92, 0xfd, 0x4a, 0x81, 0xae,
0x7a, 0x15, 0xb5, 0x3e, 0x81, 0x85, 0x88, 0x87, 0x6e, 0x62, 0xda, 0xf3, 0xac, 0x01, 0x77, 0x66, 0x4c, 0xd6, 0xcb, 0xa8, 0xf5, 0x29, 0xcc, 0x47, 0x3c, 0x74, 0x13, 0xd3, 0x81, 0x67, 0x8d, 0xb8,
0x9e, 0x72, 0x6b, 0xeb, 0xef, 0xaf, 0x8e, 0x77, 0x09, 0xab, 0x69, 0x09, 0xe6, 0xa3, 0x2d, 0xb6, 0x33, 0xf3, 0x94, 0xdb, 0x58, 0x7b, 0x6f, 0x25, 0xdb, 0x25, 0xac, 0xa4, 0x25, 0xb8, 0x11, 0x6d,
0x62, 0x3b, 0x68, 0xbf, 0x51, 0x60, 0x7e, 0x1b, 0xfb, 0xfb, 0xb8, 0xef, 0x60, 0xd7, 0xdf, 0x71, 0xb1, 0x19, 0xdb, 0x41, 0xfb, 0x8d, 0x02, 0x37, 0xb6, 0xb0, 0xbf, 0x87, 0x87, 0x0e, 0x76, 0xfd,
0x0f, 0xc9, 0xe5, 0xf5, 0x7a, 0x0b, 0x80, 0x8a, 0x7d, 0xa2, 0x72, 0x10, 0x83, 0xe4, 0xd1, 0x31, 0x6d, 0xf7, 0x80, 0x5c, 0x5c, 0xaf, 0xb7, 0x01, 0xa8, 0xd8, 0x27, 0x2a, 0x07, 0x31, 0x48, 0x11,
0xef, 0x3e, 0xd2, 0xf2, 0x5c, 0x45, 0x77, 0xdf, 0x85, 0xb2, 0xe5, 0x1e, 0x92, 0x50, 0x55, 0xef, 0x1d, 0xf3, 0xee, 0x23, 0x2d, 0xcf, 0x65, 0x74, 0xf7, 0x3d, 0xa8, 0x5a, 0xee, 0x01, 0x09, 0x55,
0xca, 0x54, 0x15, 0x67, 0x16, 0x60, 0x6b, 0x7f, 0x2b, 0xc2, 0xc2, 0x7d, 0xd3, 0x94, 0x85, 0xdd, 0xf5, 0x8e, 0x4c, 0x55, 0x71, 0x66, 0x01, 0xb6, 0xf6, 0x9f, 0x12, 0xcc, 0x3f, 0x30, 0x4d, 0x59,
0xc5, 0xf5, 0x32, 0x8a, 0xee, 0x42, 0x3c, 0xba, 0x73, 0xf9, 0xdc, 0x58, 0x48, 0x95, 0x2e, 0x10, 0xd8, 0x9d, 0x5f, 0x2f, 0x93, 0xe8, 0x2e, 0xc5, 0xa3, 0xbb, 0x90, 0xcf, 0x65, 0x42, 0xaa, 0x72,
0x52, 0xe5, 0xac, 0x90, 0x42, 0xdf, 0x83, 0x77, 0xfa, 0x36, 0xe9, 0x1a, 0xb6, 0x4e, 0xb1, 0x61, 0x8e, 0x90, 0xaa, 0xe6, 0x85, 0x14, 0xda, 0x82, 0x59, 0x8a, 0xf1, 0x73, 0x7d, 0x44, 0x28, 0xf7,
0x63, 0x53, 0x8f, 0xcc, 0xd4, 0xaa, 0x70, 0xbb, 0xcd, 0x07, 0xcb, 0xfb, 0x7c, 0x35, 0x54, 0xd0, 0x89, 0x4e, 0x8d, 0x9f, 0x46, 0x4b, 0x9e, 0x26, 0x6a, 0x26, 0x1f, 0xd1, 0xe1, 0xae, 0xc0, 0xec,
0x16, 0xda, 0x86, 0x69, 0x8a, 0xf1, 0x33, 0x7d, 0x40, 0x28, 0xf7, 0xa5, 0xd6, 0x24, 0xd7, 0x82, 0x35, 0x19, 0x61, 0xf8, 0x87, 0x9e, 0xc0, 0xfc, 0xd0, 0x26, 0x7d, 0xc3, 0xd6, 0x29, 0x36, 0x6c,
0x96, 0xd4, 0x42, 0xd4, 0x84, 0x3e, 0xa4, 0xfd, 0x3d, 0x81, 0xd9, 0xa9, 0x33, 0xc2, 0xf0, 0x0f, 0x6c, 0xea, 0xc2, 0xde, 0xb4, 0x33, 0x5d, 0x4c, 0xe1, 0xd7, 0x03, 0xf2, 0x3d, 0x4e, 0x2d, 0x16,
0x3d, 0x86, 0x05, 0xa9, 0x00, 0xb4, 0x35, 0x95, 0xcf, 0x50, 0x73, 0x12, 0x01, 0xa9, 0xf6, 0x4f, 0xa8, 0xf6, 0x0f, 0x05, 0x16, 0x7a, 0xd8, 0x21, 0x5f, 0xe0, 0xff, 0x55, 0x13, 0x68, 0xff, 0x2a,
0x05, 0x16, 0x3b, 0xd8, 0x21, 0x5f, 0xe2, 0xff, 0x55, 0xd3, 0x69, 0xff, 0x2e, 0xc0, 0xc2, 0xcf, 0xc1, 0xfc, 0xcf, 0x0c, 0x7f, 0x70, 0xb8, 0xe9, 0x08, 0x10, 0x7d, 0x33, 0xe7, 0x2b, 0x52, 0xa0,
0x0c, 0xbf, 0x77, 0xb4, 0xe5, 0x08, 0x10, 0x7d, 0x33, 0xe7, 0xcb, 0x53, 0xd8, 0xa2, 0xf0, 0x2b, 0xa2, 0x30, 0xaa, 0xca, 0xac, 0xca, 0xae, 0x15, 0x2b, 0x9f, 0x8b, 0x23, 0xc7, 0xc2, 0x28, 0x56,
0xcb, 0xac, 0xca, 0xae, 0x23, 0xab, 0x5f, 0x88, 0x23, 0xc7, 0xc2, 0x2f, 0x56, 0xf9, 0x2b, 0x97, 0xc1, 0x6b, 0x17, 0xa8, 0xe0, 0x68, 0x03, 0x66, 0xf1, 0xf1, 0xc0, 0x1e, 0x9b, 0x58, 0x0f, 0xb8,
0xa8, 0xfc, 0x68, 0x13, 0xa6, 0xf1, 0x49, 0xcf, 0x1e, 0x9a, 0x58, 0x0f, 0xb8, 0x4f, 0x72, 0xee, 0x07, 0x3e, 0x75, 0x5b, 0xc2, 0x3d, 0xee, 0x52, 0x4d, 0x41, 0xb4, 0xcd, 0x43, 0xf9, 0xb5, 0x02,
0xb7, 0x24, 0xdc, 0xe3, 0x2e, 0x55, 0x17, 0x44, 0x3b, 0x3c, 0x05, 0xbc, 0x52, 0x60, 0x31, 0xd0, 0x0b, 0x81, 0x9e, 0xb1, 0xed, 0x1b, 0x6f, 0x56, 0xd5, 0x91, 0x1a, 0x2b, 0xe7, 0x51, 0xa3, 0xf6,
0x33, 0xb6, 0x7d, 0xe3, 0xcd, 0xaa, 0x3a, 0x52, 0x63, 0xe9, 0x22, 0x6a, 0xd4, 0xfe, 0x50, 0x82, 0x87, 0x0a, 0xb4, 0xc5, 0x01, 0x59, 0xdf, 0xc6, 0x96, 0xd0, 0x4d, 0xa8, 0x47, 0xa9, 0x55, 0x94,
0xa6, 0x38, 0x20, 0xeb, 0xf7, 0xd8, 0x12, 0xba, 0x01, 0xd5, 0x51, 0xac, 0x07, 0x2d, 0xc3, 0x08, 0xfe, 0x09, 0x00, 0x2d, 0x42, 0x23, 0x66, 0x3f, 0x21, 0x69, 0x1c, 0x54, 0x48, 0xdc, 0xb0, 0x50,
0x80, 0x96, 0xa0, 0x16, 0xb3, 0x9f, 0x90, 0x34, 0x0e, 0xca, 0x25, 0x6e, 0x58, 0x60, 0x4b, 0xb1, 0x56, 0x62, 0x85, 0xf2, 0x16, 0xc0, 0x81, 0x3d, 0xa6, 0x87, 0xba, 0x6f, 0x39, 0x58, 0xb4, 0x2b,
0x02, 0x7b, 0x13, 0xe0, 0xd0, 0x1e, 0xd2, 0x23, 0xdd, 0xb7, 0x1c, 0x2c, 0xda, 0x9c, 0x2a, 0x87, 0x75, 0x0e, 0xd9, 0xb7, 0x1c, 0x8c, 0x1e, 0x40, 0xb3, 0x6f, 0xb9, 0x36, 0x19, 0xea, 0x23, 0xc3,
0x1c, 0x58, 0x0e, 0x46, 0xf7, 0xa1, 0xde, 0xb5, 0x5c, 0x9b, 0xf4, 0xf5, 0x81, 0xe1, 0x1f, 0x51, 0x3f, 0xa4, 0x9d, 0x5a, 0xae, 0xc5, 0x1e, 0x5a, 0xd8, 0x36, 0xd7, 0x39, 0x6e, 0xaf, 0x11, 0xd0,
0x9e, 0x85, 0xe4, 0x16, 0x7b, 0x60, 0x61, 0xdb, 0xdc, 0xe0, 0xb8, 0x9d, 0x5a, 0x40, 0xb3, 0xc7, 0xec, 0x32, 0x12, 0x74, 0x1b, 0x1a, 0xee, 0xd8, 0xd1, 0xc9, 0x81, 0xee, 0x91, 0x23, 0x66, 0x73,
0x48, 0xd0, 0x2d, 0xa8, 0xb9, 0x43, 0x47, 0x27, 0x87, 0xba, 0x47, 0x8e, 0x29, 0xcf, 0x4c, 0xc5, 0xce, 0xc2, 0x1d, 0x3b, 0x8f, 0x0f, 0x7a, 0xe4, 0x88, 0xa2, 0x1f, 0x42, 0x9d, 0x25, 0x77, 0x6a,
0x4e, 0xd5, 0x1d, 0x3a, 0x8f, 0x0e, 0x3b, 0xe4, 0x98, 0xa2, 0x1f, 0x42, 0x95, 0x15, 0x05, 0x6a, 0x93, 0x21, 0xed, 0xcc, 0x14, 0xda, 0x7f, 0x42, 0xc0, 0xa8, 0x4d, 0xe6, 0x08, 0x9c, 0xba, 0x5e,
0x93, 0x7e, 0x98, 0x65, 0xce, 0xdb, 0x7f, 0x44, 0xc0, 0xa8, 0x4d, 0xe6, 0x08, 0x9c, 0xba, 0x9a, 0x8c, 0x3a, 0x22, 0x40, 0x77, 0xa0, 0x35, 0x20, 0xce, 0xc8, 0xe0, 0x1a, 0x7a, 0xe8, 0x11, 0xa7,
0x8f, 0x3a, 0x22, 0x40, 0xb7, 0xa1, 0xd1, 0x23, 0xce, 0xc0, 0xe0, 0x1a, 0x7a, 0xe0, 0x11, 0xa7, 0x03, 0x3c, 0x5a, 0x52, 0x50, 0xf4, 0x2e, 0x34, 0xb1, 0x6b, 0xf4, 0x6d, 0xe6, 0xb8, 0x26, 0x3e,
0x05, 0x3c, 0x5a, 0x52, 0x50, 0xf4, 0x1e, 0xd4, 0xb1, 0x6b, 0x74, 0x6d, 0xe6, 0xb8, 0x26, 0x3e, 0xee, 0x34, 0x16, 0x95, 0xbb, 0x33, 0xbd, 0x46, 0x00, 0xdb, 0x66, 0x20, 0xf4, 0x18, 0xd4, 0xe0,
0x69, 0xd5, 0x96, 0x94, 0x3b, 0x53, 0x9d, 0x5a, 0x00, 0xdb, 0x61, 0x20, 0xf4, 0x08, 0xd4, 0xe0, 0xf2, 0xcd, 0x14, 0x25, 0xfc, 0xbb, 0xc9, 0xe5, 0x59, 0x4a, 0x67, 0x61, 0x13, 0x1f, 0xaf, 0x70,
0xd2, 0xce, 0x14, 0x25, 0xfc, 0xbb, 0xce, 0xe5, 0x59, 0x4e, 0x67, 0x61, 0x13, 0x9f, 0xac, 0x72, 0xa2, 0x87, 0x96, 0x8d, 0x99, 0x92, 0xb8, 0x73, 0xb4, 0xf8, 0x42, 0xf8, 0x4b, 0xb5, 0x57, 0x25,
0xa2, 0x07, 0x96, 0x8d, 0x99, 0x92, 0xb8, 0x73, 0x34, 0xf8, 0x42, 0xf8, 0x4b, 0xb5, 0x97, 0x05, 0xb8, 0xc6, 0xdc, 0x23, 0x4c, 0xa2, 0x17, 0x77, 0xf1, 0x5b, 0x00, 0x26, 0xf5, 0xf5, 0x84, 0x9b,
0x98, 0x65, 0xee, 0x11, 0x26, 0xd1, 0xcb, 0xbb, 0xf8, 0x4d, 0x00, 0x93, 0xfa, 0x7a, 0xc2, 0xcd, 0xd7, 0x4d, 0xea, 0xef, 0x04, 0x9e, 0xfe, 0x71, 0xe8, 0xc5, 0xe5, 0xfc, 0xf6, 0x23, 0xe5, 0xae,
0xab, 0x26, 0xf5, 0x77, 0x03, 0x4f, 0xff, 0x24, 0xf4, 0xe2, 0x62, 0x76, 0xdb, 0x92, 0x72, 0xd7, 0xd9, 0x84, 0x70, 0x91, 0x8b, 0x15, 0x4b, 0xc5, 0x94, 0x8c, 0xbd, 0x01, 0xd6, 0x13, 0xed, 0x72,
0xf1, 0x84, 0x70, 0x99, 0x0b, 0x19, 0x4b, 0xc5, 0x94, 0x0c, 0xbd, 0x1e, 0xd6, 0x13, 0x6d, 0x76, 0x33, 0x00, 0xee, 0xc8, 0x03, 0xb1, 0x26, 0x69, 0x33, 0xfe, 0xae, 0xc0, 0xbc, 0xb8, 0x1b, 0x5c,
0x3d, 0x00, 0xee, 0xca, 0x03, 0xb1, 0x22, 0x69, 0x4f, 0xfe, 0xa1, 0xc0, 0x82, 0xb8, 0x53, 0x5c, 0x5e, 0x5d, 0x79, 0x19, 0x21, 0x0c, 0x9f, 0xf2, 0x29, 0x7d, 0x66, 0xa5, 0x40, 0x42, 0xae, 0x4a,
0x5d, 0x5d, 0x59, 0x19, 0x21, 0x0c, 0x9f, 0xe2, 0x19, 0xfd, 0x69, 0x29, 0x47, 0x42, 0x2e, 0x4b, 0x12, 0x72, 0xb2, 0xd7, 0xaa, 0xa5, 0x7b, 0x2d, 0xed, 0xb7, 0x0a, 0xcc, 0x7f, 0x62, 0xb8, 0x26,
0x12, 0x72, 0xb2, 0x47, 0xab, 0xa4, 0x7b, 0x34, 0xed, 0xb7, 0x0a, 0x2c, 0x7c, 0x6a, 0xb8, 0x26, 0x39, 0x38, 0xb8, 0xfc, 0x01, 0x37, 0xa0, 0x49, 0x27, 0xf9, 0xb5, 0x70, 0x2f, 0x95, 0x20, 0xd2,
0x39, 0x3c, 0xbc, 0xfa, 0x01, 0x37, 0xa1, 0x4e, 0x47, 0xf9, 0x35, 0x77, 0x0f, 0x96, 0x20, 0xd2, 0xbe, 0x2c, 0x01, 0x62, 0xee, 0xb0, 0x6e, 0xd8, 0x86, 0x3b, 0xc0, 0x17, 0x97, 0x66, 0x09, 0x5a,
0xbe, 0x2a, 0x00, 0x62, 0xee, 0xb0, 0x61, 0xd8, 0x86, 0xdb, 0xc3, 0x97, 0x97, 0x66, 0x19, 0x1a, 0x09, 0x27, 0x88, 0x5e, 0x72, 0xe2, 0x5e, 0x40, 0xd1, 0xa7, 0xd0, 0xea, 0x07, 0xac, 0x74, 0x0f,
0x09, 0x27, 0x88, 0x5e, 0x80, 0xe2, 0x5e, 0x40, 0xd1, 0x67, 0xd0, 0xe8, 0x06, 0xac, 0x74, 0x0f, 0x1b, 0x94, 0xb8, 0xdc, 0x0e, 0xad, 0xb5, 0xf7, 0x65, 0x62, 0xef, 0x7b, 0xd6, 0x70, 0x88, 0xbd,
0x1b, 0x94, 0xb8, 0xdc, 0x0e, 0x8d, 0xf5, 0x0f, 0x64, 0x62, 0x1f, 0x78, 0x56, 0xbf, 0x8f, 0xbd, 0x0d, 0xe2, 0x9a, 0x41, 0x97, 0x33, 0xdb, 0x0f, 0xc5, 0x64, 0xa4, 0xe8, 0x1d, 0x68, 0x4c, 0x22,
0x4d, 0xe2, 0x9a, 0x41, 0x97, 0x33, 0xdd, 0x0d, 0xc5, 0x64, 0xa4, 0xe8, 0x5d, 0xa8, 0x8d, 0x22, 0x22, 0x2c, 0x91, 0x10, 0x85, 0x04, 0x45, 0x1f, 0xc2, 0x5c, 0xb2, 0x01, 0x9a, 0x18, 0x4e, 0xa5,
0x22, 0x2c, 0x91, 0x10, 0x85, 0x04, 0x45, 0x1f, 0xc1, 0x4c, 0xba, 0x03, 0x0b, 0x0d, 0xa7, 0xd2, 0xf1, 0xde, 0x86, 0x19, 0xe7, 0x97, 0x80, 0xa2, 0xa2, 0xcf, 0x4b, 0x13, 0xcf, 0xe8, 0x45, 0xee,
0x64, 0xf3, 0x45, 0xb5, 0x5f, 0x02, 0x8a, 0x8a, 0x3e, 0x2f, 0x4d, 0x3c, 0xa3, 0xe7, 0xb9, 0x07, 0x73, 0x37, 0xa1, 0x6e, 0x86, 0x94, 0xe2, 0x76, 0x35, 0x01, 0xb0, 0xf0, 0x08, 0x24, 0xd4, 0x6d,
0xde, 0x80, 0xaa, 0x19, 0x52, 0x8a, 0x5b, 0xd9, 0x08, 0xc0, 0xc2, 0x23, 0x90, 0x50, 0xb7, 0x89, 0x62, 0x98, 0xd8, 0x0c, 0x93, 0x7a, 0x00, 0xfc, 0x8c, 0xc3, 0xb4, 0xaf, 0x4a, 0xa0, 0xc6, 0x9b,
0x61, 0x62, 0x33, 0x4c, 0xea, 0x01, 0xf0, 0x73, 0x0e, 0xd3, 0xbe, 0x2e, 0x80, 0x1a, 0x6f, 0xaa, 0xaa, 0xc2, 0xbc, 0xaf, 0xe6, 0x76, 0x77, 0x4a, 0x07, 0x59, 0xb9, 0x44, 0x07, 0x99, 0xed, 0x70,
0x72, 0xf3, 0xbe, 0x9e, 0x5b, 0xe1, 0x19, 0x1d, 0x64, 0xe9, 0x0a, 0x1d, 0xe4, 0x78, 0x87, 0x5b, 0xab, 0x17, 0xeb, 0x70, 0xb5, 0xdf, 0x2b, 0xd0, 0x4e, 0x5d, 0xa6, 0xd2, 0xe5, 0x55, 0xc9, 0x96,
0xbe, 0x5c, 0x87, 0xab, 0xfd, 0x5e, 0x81, 0x66, 0xea, 0x12, 0x96, 0x2e, 0xaf, 0xca, 0x78, 0x79, 0xd7, 0xef, 0x43, 0x95, 0xd5, 0x1c, 0xcc, 0x95, 0xd4, 0x4a, 0xb3, 0x95, 0x5d, 0xd1, 0x7a, 0x01,
0xfd, 0x3e, 0x94, 0x59, 0xcd, 0xc1, 0x5c, 0x49, 0x8d, 0x34, 0x5b, 0xd9, 0xd5, 0xae, 0x13, 0x10, 0x01, 0x5a, 0x85, 0x6b, 0x92, 0x27, 0x33, 0x61, 0x4a, 0x94, 0x7d, 0x31, 0xd3, 0xfe, 0x5a, 0x81,
0xa0, 0x35, 0x98, 0x95, 0x3c, 0xb5, 0x09, 0x53, 0xa2, 0xf1, 0x97, 0x36, 0xed, 0xaf, 0x25, 0xa8, 0x46, 0x4c, 0x1f, 0x67, 0x74, 0x06, 0x69, 0x4b, 0x97, 0x24, 0x96, 0x4e, 0x1d, 0xaf, 0x9c, 0x3d,
0xc5, 0xf4, 0x71, 0x4e, 0x67, 0x90, 0xb6, 0x74, 0x41, 0x62, 0xe9, 0xd4, 0xf1, 0x8a, 0xe3, 0xc7, 0x5e, 0xce, 0xd3, 0x12, 0x5a, 0x80, 0x19, 0x07, 0x3b, 0x3a, 0xb5, 0x5e, 0x86, 0xbd, 0xc1, 0xb4,
0xcb, 0x78, 0x92, 0x42, 0x8b, 0x30, 0xe5, 0x60, 0x47, 0xa7, 0xd6, 0x8b, 0xb0, 0x37, 0x98, 0x74, 0x83, 0x9d, 0x3d, 0xeb, 0x25, 0x66, 0x4b, 0xac, 0xac, 0xf3, 0x9a, 0x1e, 0xa4, 0xe4, 0x69, 0x77,
0xb0, 0xb3, 0x6f, 0xbd, 0xc0, 0x6c, 0x89, 0x95, 0x75, 0x5e, 0xd3, 0x83, 0x94, 0x3c, 0xe9, 0x0e, 0xec, 0xf0, 0x8a, 0x7e, 0x0b, 0x20, 0x28, 0x85, 0xae, 0xe1, 0x60, 0x5e, 0xf0, 0xeb, 0xbd, 0x3a,
0x1d, 0x5e, 0xd1, 0x6f, 0x02, 0x04, 0xa5, 0xd0, 0x35, 0x1c, 0xcc, 0x0b, 0x7e, 0xb5, 0x53, 0xe5, 0x87, 0xec, 0x18, 0x0e, 0x46, 0x1d, 0x98, 0xe6, 0x3f, 0xdb, 0x9b, 0x9d, 0x99, 0x80, 0x50, 0xfc,
0x90, 0x5d, 0xc3, 0xc1, 0xa8, 0x05, 0x93, 0xfc, 0x67, 0x67, 0xab, 0x35, 0x15, 0x10, 0x8a, 0xdf, 0x26, 0xc3, 0xa1, 0x9e, 0x0e, 0x87, 0xa2, 0xc5, 0xfa, 0x1e, 0x5c, 0x1b, 0xf0, 0x97, 0x10, 0x73,
0x64, 0x38, 0x54, 0xd3, 0xe1, 0x90, 0xb7, 0x58, 0xdf, 0x85, 0xd9, 0x1e, 0x7f, 0x41, 0x31, 0x37, 0xfd, 0x64, 0x23, 0x5a, 0x12, 0x35, 0x5b, 0xb6, 0x84, 0x1e, 0x32, 0xe7, 0xe2, 0x1a, 0xd5, 0x03,
0x4e, 0x37, 0xa3, 0x25, 0x51, 0xb3, 0x65, 0x4b, 0xe8, 0x01, 0x73, 0x2e, 0xae, 0x51, 0x3d, 0xb0, 0x2b, 0x37, 0xb9, 0x95, 0xdf, 0x95, 0xdf, 0x48, 0x03, 0xcc, 0xc0, 0xc8, 0x61, 0x4e, 0xe4, 0x7f,
0x72, 0x9d, 0x5b, 0xf9, 0x3d, 0xf9, 0x4d, 0x36, 0xc0, 0x0c, 0x8c, 0x1c, 0xe6, 0x44, 0xfe, 0x37, 0x99, 0x36, 0x61, 0xb6, 0x58, 0x9b, 0xd0, 0xba, 0x4c, 0x9b, 0xf0, 0x65, 0x19, 0x5a, 0x93, 0x02,
0xd6, 0x26, 0x4c, 0xe7, 0x6b, 0x13, 0x1a, 0x57, 0x69, 0x13, 0xbe, 0x2a, 0x42, 0x63, 0x54, 0x60, 0x5b, 0x38, 0xfa, 0x8b, 0xbc, 0xf6, 0xee, 0x80, 0x3a, 0x79, 0xa8, 0xe0, 0x8a, 0x39, 0xb5, 0x47,
0x73, 0x47, 0x7f, 0x9e, 0x57, 0xe2, 0x5d, 0x50, 0x47, 0x0f, 0x1c, 0x5c, 0x31, 0x67, 0xf6, 0x08, 0x48, 0x3f, 0x51, 0xb4, 0x47, 0xa9, 0x30, 0xfb, 0x18, 0xea, 0x2c, 0x91, 0xe9, 0xfe, 0xc9, 0x08,
0xe9, 0xa7, 0x8d, 0xe6, 0x20, 0x15, 0x66, 0x9f, 0x40, 0x95, 0x25, 0x32, 0xdd, 0x3f, 0x1d, 0x60, 0x73, 0x47, 0x6b, 0xa5, 0x0b, 0x44, 0xb0, 0x11, 0xcb, 0x6c, 0xfb, 0x27, 0x23, 0xdc, 0x9b, 0xb1,
0xee, 0x68, 0x8d, 0x74, 0x81, 0x08, 0x36, 0x62, 0x99, 0xed, 0xe0, 0x74, 0x80, 0x3b, 0x53, 0xb6, 0xc5, 0xd7, 0x25, 0xdf, 0x0e, 0xd1, 0x7d, 0xb8, 0xe1, 0x05, 0xed, 0x81, 0xa9, 0x27, 0x8e, 0x1d,
0xf8, 0xba, 0xe2, 0x9b, 0x23, 0xba, 0x07, 0xf3, 0x5e, 0xd0, 0x1e, 0x98, 0x7a, 0xe2, 0xd8, 0x41, 0x54, 0xda, 0xeb, 0xe1, 0xe2, 0x6e, 0xfc, 0xf8, 0x39, 0x91, 0x3b, 0x9d, 0x1b, 0xb9, 0xff, 0x56,
0xa5, 0x9d, 0x0b, 0x17, 0xf7, 0xe2, 0xc7, 0xcf, 0x88, 0xdc, 0xc9, 0xcc, 0xc8, 0xfd, 0x8f, 0x02, 0x60, 0x4e, 0x78, 0x07, 0xf3, 0xd9, 0x21, 0xbf, 0xb1, 0xb0, 0x3c, 0x4b, 0x5c, 0xdb, 0x72, 0xa3,
0x33, 0xc2, 0x3b, 0x98, 0xcf, 0xf6, 0xf9, 0x8d, 0x85, 0xe5, 0x59, 0xe2, 0xda, 0x96, 0x1b, 0x35, 0x26, 0x47, 0x98, 0x23, 0x00, 0x8a, 0x26, 0xe7, 0x13, 0x68, 0x0b, 0xa4, 0x28, 0x5d, 0x16, 0xac,
0x39, 0xc2, 0x1c, 0x01, 0x50, 0x34, 0x39, 0x9f, 0x42, 0x53, 0x20, 0x45, 0xe9, 0x32, 0x67, 0x55, 0xca, 0xad, 0x80, 0x2e, 0x4a, 0x94, 0x4b, 0xd0, 0x22, 0x07, 0x07, 0x71, 0x7e, 0x41, 0xbc, 0xcf,
0x6e, 0x04, 0x74, 0x51, 0xa2, 0x5c, 0x86, 0x06, 0x39, 0x3c, 0x8c, 0xf3, 0x0b, 0xe2, 0x7d, 0x5a, 0x0a, 0xa8, 0x60, 0xf8, 0x13, 0x50, 0x43, 0xb4, 0xf3, 0x26, 0xe8, 0xb6, 0x20, 0x8c, 0x6e, 0xf7,
0x40, 0x05, 0xc3, 0x9f, 0x80, 0x1a, 0xa2, 0x5d, 0x34, 0x41, 0x37, 0x05, 0x61, 0x74, 0xbb, 0xff, 0xbf, 0x56, 0xa0, 0x93, 0x4c, 0xd7, 0xb1, 0xe3, 0x9f, 0xbf, 0x21, 0xf8, 0x41, 0xf2, 0x8d, 0x67,
0xb5, 0x02, 0xad, 0x64, 0xba, 0x8e, 0x1d, 0xff, 0xe2, 0x0d, 0xc1, 0x0f, 0x92, 0x6f, 0x43, 0xcb, 0xe9, 0x14, 0x79, 0x26, 0x7c, 0x44, 0x47, 0xba, 0xfc, 0x12, 0x5a, 0x49, 0x3f, 0x44, 0x4d, 0x98,
0x67, 0xc8, 0x33, 0xe2, 0x23, 0x3a, 0xd2, 0x95, 0x17, 0xd0, 0x48, 0xfa, 0x21, 0xaa, 0xc3, 0xd4, 0xd9, 0x21, 0xfe, 0x8f, 0x8f, 0x2d, 0xea, 0xab, 0x53, 0xa8, 0x05, 0xb0, 0x43, 0xfc, 0x5d, 0x0f,
0x2e, 0xf1, 0x7f, 0x7c, 0x62, 0x51, 0x5f, 0x9d, 0x40, 0x0d, 0x80, 0x5d, 0xe2, 0xef, 0x79, 0x98, 0x53, 0xec, 0xfa, 0xaa, 0x82, 0x00, 0x6a, 0x8f, 0xdd, 0x4d, 0x8b, 0x3e, 0x57, 0x4b, 0xe8, 0x9a,
0x62, 0xd7, 0x57, 0x15, 0x04, 0x50, 0x79, 0xe4, 0x6e, 0x59, 0xf4, 0x99, 0x5a, 0x40, 0xb3, 0xa2, 0xa8, 0x0c, 0x86, 0xbd, 0x2d, 0x8c, 0xab, 0x96, 0x19, 0x79, 0xf4, 0x57, 0x41, 0x2a, 0x34, 0x23,
0x32, 0x18, 0xf6, 0x8e, 0x30, 0xae, 0x5a, 0x64, 0xe4, 0xd1, 0x5f, 0x09, 0xa9, 0x50, 0x8f, 0x50, 0x94, 0xad, 0xdd, 0x27, 0x6a, 0x15, 0xd5, 0xa1, 0x1a, 0x7c, 0xd6, 0x96, 0x4d, 0x50, 0xd3, 0x8d,
0xb6, 0xf7, 0x1e, 0xab, 0x65, 0x54, 0x85, 0x72, 0xf0, 0x59, 0x59, 0x31, 0x41, 0x4d, 0x37, 0x1e, 0x07, 0xdb, 0xf3, 0x89, 0xfb, 0xa9, 0x4b, 0x8e, 0x22, 0x90, 0x3a, 0x85, 0x1a, 0x30, 0x2d, 0x9a,
0x6c, 0xcf, 0xc7, 0xee, 0x67, 0x2e, 0x39, 0x8e, 0x40, 0xea, 0x04, 0xaa, 0xc1, 0xa4, 0x68, 0xe6, 0x39, 0x55, 0x41, 0x6d, 0x68, 0xc4, 0xfa, 0x28, 0xb5, 0xc4, 0x00, 0x5b, 0xde, 0x68, 0x20, 0x3a,
0x54, 0x05, 0x35, 0xa1, 0x16, 0xeb, 0xa3, 0xd4, 0x02, 0x03, 0x6c, 0x7b, 0x83, 0x9e, 0xe8, 0xa8, 0xaa, 0x40, 0x04, 0x66, 0xb5, 0x4d, 0x72, 0xe4, 0xaa, 0x95, 0xe5, 0x07, 0x30, 0x13, 0x06, 0x08,
0x02, 0x11, 0x98, 0xd5, 0xb6, 0xc8, 0xb1, 0xab, 0x96, 0x56, 0xee, 0xc3, 0x54, 0x18, 0x20, 0xec, 0x3b, 0x4d, 0xb0, 0x3b, 0xfb, 0x53, 0xa7, 0xd0, 0x1c, 0xcc, 0x26, 0x26, 0x01, 0xaa, 0x82, 0x10,
0x34, 0xc1, 0xee, 0xec, 0x4f, 0x9d, 0x40, 0x33, 0x30, 0x9d, 0x98, 0x20, 0xa8, 0x0a, 0x42, 0xd0, 0xb4, 0xec, 0xc4, 0xf8, 0x45, 0x2d, 0xad, 0xfd, 0xad, 0x01, 0x10, 0xf4, 0x0c, 0x84, 0x78, 0x26,
0xb0, 0x13, 0x63, 0x1b, 0xb5, 0xb0, 0xfe, 0xf7, 0x1a, 0x40, 0xd0, 0x33, 0x10, 0xe2, 0x99, 0x68, 0x1a, 0x01, 0xda, 0xc2, 0x3e, 0xcb, 0x87, 0xc4, 0x0d, 0x73, 0x19, 0x45, 0xf7, 0x72, 0x4a, 0x6b,
0x00, 0x68, 0x1b, 0xfb, 0x2c, 0x1f, 0x12, 0x37, 0xcc, 0x65, 0x14, 0xdd, 0xcd, 0x28, 0xad, 0xe3, 0x16, 0x55, 0x48, 0xda, 0xbd, 0x93, 0x43, 0x91, 0x42, 0xd7, 0xa6, 0x90, 0xc3, 0x39, 0xb2, 0x1b,
0xa8, 0x42, 0xd2, 0xf6, 0xed, 0x0c, 0x8a, 0x14, 0xba, 0x36, 0x81, 0x1c, 0xce, 0x91, 0xdd, 0x30, 0xe6, 0xbe, 0x35, 0x78, 0x1e, 0x35, 0x1b, 0xf9, 0x1c, 0x53, 0xa8, 0x21, 0xc7, 0x54, 0x1e, 0x12,
0x0f, 0xac, 0xde, 0xb3, 0xa8, 0xd9, 0xc8, 0xe6, 0x98, 0x42, 0x0d, 0x39, 0xa6, 0xf2, 0x90, 0xf8, 0x3f, 0x7b, 0xbe, 0x67, 0xb9, 0xc3, 0xf0, 0xa1, 0x51, 0x9b, 0x42, 0x2f, 0xe0, 0xfa, 0x16, 0xe6,
0xd9, 0xf7, 0x3d, 0xcb, 0xed, 0x87, 0x0f, 0x94, 0xda, 0x04, 0x7a, 0x0e, 0x73, 0xdb, 0x98, 0x73, 0xdc, 0x2d, 0xea, 0x5b, 0x03, 0x1a, 0x32, 0x5c, 0xcb, 0x67, 0x98, 0x41, 0x3e, 0x27, 0x4b, 0x1b,
0xb7, 0xa8, 0x6f, 0xf5, 0x68, 0xc8, 0x70, 0x3d, 0x9b, 0xe1, 0x18, 0xf2, 0x05, 0x59, 0xda, 0xd0, 0xda, 0xa9, 0x71, 0x27, 0x5a, 0x96, 0x3a, 0xb2, 0x74, 0x34, 0xdb, 0xfd, 0xb0, 0x10, 0x6e, 0xc4,
0x4c, 0x8d, 0x49, 0xd1, 0x8a, 0xd4, 0x91, 0xa5, 0x23, 0xdd, 0xf6, 0x47, 0xb9, 0x70, 0x23, 0x6e, 0xcd, 0x82, 0x56, 0x72, 0x14, 0x88, 0xbe, 0x95, 0xb7, 0x41, 0x66, 0x76, 0xd2, 0x5d, 0x2e, 0x82,
0x16, 0x34, 0x92, 0x23, 0x44, 0xf4, 0xcd, 0xac, 0x0d, 0xc6, 0x66, 0x2e, 0xed, 0x95, 0x3c, 0xa8, 0x1a, 0xb1, 0x7a, 0x0a, 0xad, 0xe4, 0xb0, 0x49, 0xce, 0x4a, 0x3a, 0x90, 0xea, 0x9e, 0xf6, 0xc6,
0x11, 0xab, 0x27, 0xd0, 0x48, 0x0e, 0xa9, 0xe4, 0xac, 0xa4, 0x83, 0xac, 0xf6, 0x59, 0x6f, 0xc3, 0xab, 0x4d, 0xa1, 0x5f, 0xc0, 0x5c, 0x66, 0xc2, 0x83, 0xbe, 0x2d, 0xdb, 0x3e, 0x6f, 0x10, 0x74,
0xda, 0x04, 0xfa, 0x05, 0xcc, 0x8c, 0x4d, 0x86, 0xd0, 0xb7, 0x64, 0xdb, 0x67, 0x0d, 0x90, 0xce, 0x16, 0x07, 0x21, 0xfd, 0x44, 0x8b, 0xf9, 0xd2, 0x67, 0x46, 0x7d, 0xc5, 0xa5, 0x8f, 0x6d, 0x7f,
0xe3, 0x20, 0xa4, 0x1f, 0x69, 0x31, 0x5b, 0xfa, 0xb1, 0x11, 0x61, 0x7e, 0xe9, 0x63, 0xdb, 0x9f, 0x9a, 0xf4, 0xe7, 0xe6, 0x30, 0x06, 0x94, 0x9d, 0xf1, 0xa0, 0x8f, 0x64, 0x2c, 0x72, 0xe7, 0x4c,
0x25, 0xfd, 0x85, 0x39, 0x0c, 0x01, 0x8d, 0xcf, 0x86, 0xd0, 0xc7, 0x32, 0x16, 0x99, 0xf3, 0xa9, 0xdd, 0x95, 0xa2, 0xe8, 0x91, 0xc9, 0xc7, 0x3c, 0x5a, 0xd3, 0x4d, 0xb3, 0x94, 0x6d, 0xee, 0x5c,
0xf6, 0x6a, 0x5e, 0xf4, 0xc8, 0xe4, 0x43, 0x1e, 0xad, 0xe9, 0xa6, 0x59, 0xca, 0x36, 0x73, 0x1e, 0x47, 0xce, 0x36, 0x7f, 0xb4, 0x12, 0x38, 0x75, 0x72, 0x74, 0x20, 0xb7, 0x95, 0x74, 0xdc, 0x21,
0x24, 0x67, 0x9b, 0x3d, 0x92, 0x09, 0x9c, 0x3a, 0x39, 0x72, 0x90, 0xdb, 0x4a, 0x3a, 0x26, 0x91, 0x77, 0x6a, 0xf9, 0x24, 0x42, 0x9b, 0x42, 0xfb, 0x89, 0x1c, 0x8c, 0xee, 0xe4, 0xf9, 0x44, 0xf2,
0x3b, 0xb5, 0x7c, 0x82, 0xa1, 0x4d, 0xa0, 0x83, 0x44, 0x0e, 0x46, 0xb7, 0xb3, 0x7c, 0x22, 0x79, 0xb2, 0x7b, 0x96, 0xb9, 0x74, 0x80, 0x2d, 0xec, 0x3f, 0xc2, 0xbe, 0x67, 0x0d, 0x68, 0x7a, 0x53,
0xd9, 0x3d, 0xcf, 0x5c, 0x3a, 0xc0, 0x36, 0xf6, 0x1f, 0x62, 0xdf, 0xb3, 0x7a, 0x34, 0xbd, 0xa9, 0xf1, 0x33, 0x41, 0x08, 0x37, 0xfd, 0xe0, 0x4c, 0xbc, 0x50, 0xec, 0xb5, 0x57, 0x00, 0x75, 0x6e,
0xf8, 0x19, 0x21, 0x84, 0x9b, 0x7e, 0x78, 0x2e, 0x5e, 0x28, 0xf6, 0xfa, 0x4b, 0x80, 0x2a, 0xb7, 0x33, 0x56, 0x1e, 0xfe, 0x9f, 0xc6, 0xaf, 0x20, 0x8d, 0x3f, 0x83, 0x76, 0x6a, 0x6e, 0x24, 0x4f,
0x19, 0x2b, 0x0f, 0xff, 0x4f, 0xe3, 0xd7, 0x90, 0xc6, 0x9f, 0x42, 0x33, 0x35, 0x6f, 0x92, 0xa7, 0xe3, 0xf2, 0xe1, 0xd2, 0x59, 0x0e, 0xd2, 0x07, 0x94, 0x9d, 0x8a, 0xc8, 0x03, 0x2b, 0x77, 0x7a,
0x71, 0xf9, 0x50, 0xea, 0x3c, 0x07, 0xe9, 0x02, 0x1a, 0x9f, 0x8a, 0xc8, 0x03, 0x2b, 0x73, 0x7a, 0x72, 0x16, 0x8f, 0x67, 0xd0, 0x4e, 0x8d, 0x25, 0xe4, 0x27, 0x90, 0xcf, 0x2e, 0x0a, 0x9c, 0x20,
0x72, 0x1e, 0x8f, 0xa7, 0xd0, 0x4c, 0x8d, 0x25, 0xe4, 0x27, 0x90, 0xcf, 0x2e, 0x72, 0x9c, 0x60, 0xfb, 0x18, 0x2f, 0x3f, 0x41, 0xee, 0xa3, 0xfd, 0x59, 0x3c, 0x3e, 0x87, 0x66, 0xfc, 0x1d, 0x14,
0xfc, 0x31, 0x5e, 0x7e, 0x82, 0xcc, 0x47, 0xfb, 0xf3, 0x78, 0x7c, 0x01, 0xf5, 0xf8, 0x3b, 0x28, 0x7d, 0x90, 0x17, 0x9d, 0xa9, 0x97, 0xb1, 0x37, 0x9f, 0xaf, 0xaf, 0xbe, 0x9e, 0x3d, 0x83, 0x76,
0xfa, 0x30, 0x2b, 0x3a, 0x53, 0x2f, 0x63, 0x6f, 0x3e, 0x5f, 0x5f, 0x7f, 0x3d, 0x7b, 0x0a, 0xcd, 0xea, 0xdd, 0x53, 0x6e, 0x5d, 0xf9, 0xe3, 0xe8, 0x59, 0xbb, 0x7f, 0x8d, 0x19, 0xf8, 0xaa, 0x73,
0xd4, 0xbb, 0xa7, 0xdc, 0xba, 0xf2, 0xc7, 0xd1, 0xf3, 0x76, 0x7f, 0x8d, 0x19, 0xf8, 0xba, 0x73, 0xe5, 0xfa, 0x77, 0x9f, 0xae, 0x0d, 0x2d, 0xff, 0x70, 0xdc, 0x67, 0xa7, 0x5c, 0x0d, 0x30, 0x3f,
0xe5, 0xc6, 0x77, 0x9e, 0xac, 0xf7, 0x2d, 0xff, 0x68, 0xd8, 0x65, 0xa7, 0x5c, 0x0b, 0x30, 0x3f, 0xb2, 0x88, 0xf8, 0x5a, 0x0d, 0x93, 0xc6, 0x2a, 0xdf, 0x69, 0x95, 0x4b, 0x3b, 0xea, 0xf7, 0x6b,
0xb6, 0x88, 0xf8, 0x5a, 0x0b, 0x93, 0xc6, 0x1a, 0xdf, 0x69, 0x8d, 0x4b, 0x3b, 0xe8, 0x76, 0x2b, 0xfc, 0xf7, 0xfe, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc8, 0x7f, 0xb6, 0xed, 0xcc, 0x27, 0x00,
0xfc, 0xf7, 0xde, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x78, 0x64, 0xb3, 0xad, 0x04, 0x28, 0x00,
0x00, 0x00,
} }

View File

@ -340,9 +340,22 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in
c.RUnlock() c.RUnlock()
if targetNode != nil { if targetNode != nil {
err := targetNode.addQueryChannel(ctx, in) emptyChangeInfo := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
}
msgPosition, err := c.clusterMeta.sendSealedSegmentChangeInfos(in.CollectionID, in.QueryChannel, emptyChangeInfo)
if err != nil { if err != nil {
log.Debug("addQueryChannel: queryNode add query channel error", zap.String("error", err.Error())) log.Error("addQueryChannel: get latest messageID of query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err))
return err
}
// update watch position to latest
in.SeekPosition = msgPosition
err = targetNode.addQueryChannel(ctx, in)
if err != nil {
log.Error("addQueryChannel: queryNode add query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err))
return err return err
} }
return nil return nil

View File

@ -33,6 +33,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio" minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -463,8 +464,19 @@ func TestGrpcRequest(t *testing.T) {
clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints) clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true) clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true)
clusterSession.Register() clusterSession.Register()
meta, err := newMeta(baseCtx, kv, nil, nil) factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarAddress,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = factory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
idAllocator := func() (UniqueID, error) {
return 0, nil
}
meta, err := newMeta(baseCtx, kv, factory, idAllocator)
assert.Nil(t, err)
cluster := &queryNodeCluster{ cluster := &queryNodeCluster{
ctx: baseCtx, ctx: baseCtx,
cancel: cancel, cancel: cancel,
@ -532,8 +544,7 @@ func TestGrpcRequest(t *testing.T) {
}) })
t.Run("Test AddQueryChannel", func(t *testing.T) { t.Run("Test AddQueryChannel", func(t *testing.T) {
info, err := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID) info := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID)
assert.Nil(t, err)
addQueryChannelReq := &querypb.AddQueryChannelRequest{ addQueryChannelReq := &querypb.AddQueryChannelRequest{
NodeID: nodeID, NodeID: nodeID,
CollectionID: defaultCollectionID, CollectionID: defaultCollectionID,
@ -545,8 +556,7 @@ func TestGrpcRequest(t *testing.T) {
}) })
t.Run("Test RemoveQueryChannel", func(t *testing.T) { t.Run("Test RemoveQueryChannel", func(t *testing.T) {
info, err := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID) info := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID)
assert.Nil(t, err)
removeQueryChannelReq := &querypb.RemoveQueryChannelRequest{ removeQueryChannelReq := &querypb.RemoveQueryChannelRequest{
NodeID: nodeID, NodeID: nodeID,
CollectionID: defaultCollectionID, CollectionID: defaultCollectionID,

View File

@ -701,19 +701,7 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat
} }
collectionID := req.CollectionID collectionID := req.CollectionID
info, err := qc.meta.getQueryChannelInfoByID(collectionID) info := qc.meta.getQueryChannelInfoByID(collectionID)
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
log.Error("createQueryChannel end with error",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Error(err))
return &querypb.CreateQueryChannelResponse{
Status: status,
}, nil
}
log.Debug("createQueryChannelRequest completed", log.Debug("createQueryChannelRequest completed",
zap.String("role", typeutil.QueryCoordRole), zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID), zap.Int64("collectionID", collectionID),

View File

@ -760,25 +760,6 @@ func TestGrpcTaskBeforeHealthy(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
} }
func Test_GrpcGetQueryChannelFail(t *testing.T) {
kv := &testKv{
returnFn: failedResult,
}
meta, err := newMeta(context.Background(), kv, nil, nil)
assert.Nil(t, err)
queryCoord := &QueryCoord{
meta: meta,
}
queryCoord.stateCode.Store(internalpb.StateCode_Healthy)
res, err := queryCoord.CreateQueryChannel(context.Background(), &querypb.CreateQueryChannelRequest{
CollectionID: defaultCollectionID,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, res.Status.ErrorCode)
}
func TestQueryCoord_GetComponentStates(t *testing.T) { func TestQueryCoord_GetComponentStates(t *testing.T) {
n := &QueryCoord{} n := &QueryCoord{}
n.stateCode.Store(internalpb.StateCode_Healthy) n.stateCode.Store(internalpb.StateCode_Healthy)

View File

@ -37,15 +37,12 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/mqclient"
) )
const ( const (
collectionMetaPrefix = "queryCoord-collectionMeta" collectionMetaPrefix = "queryCoord-collectionMeta"
dmChannelMetaPrefix = "queryCoord-dmChannelWatchInfo" dmChannelMetaPrefix = "queryCoord-dmChannelWatchInfo"
queryChannelMetaPrefix = "queryCoord-queryChannel" deltaChannelMetaPrefix = "queryCoord-deltaChannel"
deltaChannelMetaPrefix = "queryCoord-deltaChannel"
globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition"
) )
type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo
@ -80,15 +77,15 @@ type Meta interface {
getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error) getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error)
setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error
getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) getQueryChannelInfoByID(collectionID UniqueID) *querypb.QueryChannelInfo
getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error) getQueryStreamByID(collectionID UniqueID, queryChannel string) (msgstream.MsgStream, error)
setLoadType(collectionID UniqueID, loadType querypb.LoadType) error setLoadType(collectionID UniqueID, loadType querypb.LoadType) error
setLoadPercentage(collectionID UniqueID, partitionID UniqueID, percentage int64, loadType querypb.LoadType) error setLoadPercentage(collectionID UniqueID, partitionID UniqueID, percentage int64, loadType querypb.LoadType) error
//printMeta() //printMeta()
saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error)
removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error)
sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error)
} }
// MetaReplica records the current load information on all querynodes // MetaReplica records the current load information on all querynodes
@ -113,7 +110,6 @@ type MetaReplica struct {
queryStreams map[UniqueID]msgstream.MsgStream queryStreams map[UniqueID]msgstream.MsgStream
streamMu sync.RWMutex streamMu sync.RWMutex
globalSeekPosition *internalpb.MsgPosition
//partitionStates map[UniqueID]*querypb.PartitionStates //partitionStates map[UniqueID]*querypb.PartitionStates
} }
@ -125,7 +121,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo) deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo)
dmChannelInfos := make(map[string]*querypb.DmChannelWatchInfo) dmChannelInfos := make(map[string]*querypb.DmChannelWatchInfo)
queryMsgStream := make(map[UniqueID]msgstream.MsgStream) queryMsgStream := make(map[UniqueID]msgstream.MsgStream)
position := &internalpb.MsgPosition{}
m := &MetaReplica{ m := &MetaReplica{
ctx: childCtx, ctx: childCtx,
@ -134,13 +129,12 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
msFactory: factory, msFactory: factory,
idAllocator: idAllocator, idAllocator: idAllocator,
collectionInfos: collectionInfos, collectionInfos: collectionInfos,
segmentInfos: segmentInfos, segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos, queryChannelInfos: queryChannelInfos,
deltaChannelInfos: deltaChannelInfos, deltaChannelInfos: deltaChannelInfos,
dmChannelInfos: dmChannelInfos, dmChannelInfos: dmChannelInfos,
queryStreams: queryMsgStream, queryStreams: queryMsgStream,
globalSeekPosition: position,
} }
err := m.reloadFromKV() err := m.reloadFromKV()
@ -187,23 +181,6 @@ func (m *MetaReplica) reloadFromKV() error {
m.segmentInfos[segmentID] = segmentInfo m.segmentInfos[segmentID] = segmentInfo
} }
queryChannelKeys, queryChannelValues, err := m.client.LoadWithPrefix(queryChannelMetaPrefix)
if err != nil {
return nil
}
for index := range queryChannelKeys {
collectionID, err := strconv.ParseInt(filepath.Base(queryChannelKeys[index]), 10, 64)
if err != nil {
return err
}
queryChannelInfo := &querypb.QueryChannelInfo{}
err = proto.Unmarshal([]byte(queryChannelValues[index]), queryChannelInfo)
if err != nil {
return err
}
m.queryChannelInfos[collectionID] = queryChannelInfo
}
deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix) deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix)
if err != nil { if err != nil {
return nil return nil
@ -236,15 +213,6 @@ func (m *MetaReplica) reloadFromKV() error {
m.dmChannelInfos[dmChannel] = dmChannelWatchInfo m.dmChannelInfos[dmChannel] = dmChannelWatchInfo
} }
globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix)
if err == nil {
position := &internalpb.MsgPosition{}
err = proto.Unmarshal([]byte(globalSeekPosValue), position)
if err != nil {
return err
}
m.globalSeekPosition = position
}
//TODO::update partition states //TODO::update partition states
log.Debug("reload from kv finished") log.Debug("reload from kv finished")
@ -520,26 +488,14 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
} }
queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo) queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
var globalSeekPositionTmp *internalpb.MsgPosition
for collectionID, segmentChangeInfos := range col2SegmentChangeInfos { for collectionID, segmentChangeInfos := range col2SegmentChangeInfos {
// get msgStream to produce sealedSegmentChangeInfos to query channel // get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos) queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
msgPosition, err := m.sendSealedSegmentChangeInfos(collectionID, queryChannelInfo.QueryChannel, segmentChangeInfos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// len(messageIDs) == 1 queryChannelInfo.SeekPosition = msgPosition
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannel]
if !ok || len(messageIDs) == 0 {
return col2SegmentChangeInfos, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
if queryChannelInfo.SeekPosition == nil {
queryChannelInfo.SeekPosition = &internalpb.MsgPosition{
ChannelName: queryChannelInfo.QueryChannel,
}
}
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()
// update segmentInfo, queryChannelInfo meta to cache and etcd // update segmentInfo, queryChannelInfo meta to cache and etcd
seg2Info := make(map[UniqueID]*querypb.SegmentInfo) seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
@ -560,7 +516,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
} }
queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
queryChannelInfosMap[collectionID] = queryChannelInfo queryChannelInfosMap[collectionID] = queryChannelInfo
globalSeekPositionTmp = queryChannelInfo.SeekPosition
} }
// save segmentInfo to etcd // save segmentInfo to etcd
@ -591,22 +546,8 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
} }
} }
// save queryChannelInfo and sealedSegmentsChangeInfo to etcd // save sealedSegmentsChangeInfo to etcd
saveKvs := make(map[string]string) saveKvs := make(map[string]string)
for collectionID, queryChannelInfo := range queryChannelInfosMap {
channelInfoBytes, err := proto.Marshal(queryChannelInfo)
if err != nil {
return col2SegmentChangeInfos, err
}
channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
saveKvs[channelKey] = string(channelInfoBytes)
}
seekPos, err := proto.Marshal(globalSeekPositionTmp)
if err != nil {
return col2SegmentChangeInfos, err
}
saveKvs[globalQuerySeekPositionPrefix] = string(seekPos)
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed // avoid the produce process success but save meta to etcd failed
// then the msgID key will not exist, and changeIndo will be ignored by query node // then the msgID key will not exist, and changeIndo will be ignored by query node
@ -620,7 +561,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
saveKvs[changeInfoKey] = string(changeInfoBytes) saveKvs[changeInfoKey] = string(changeInfoBytes)
} }
err = m.client.MultiSave(saveKvs) err := m.client.MultiSave(saveKvs)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -641,7 +582,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
for collectionID, channelInfo := range queryChannelInfosMap { for collectionID, channelInfo := range queryChannelInfosMap {
m.queryChannelInfos[collectionID] = channelInfo m.queryChannelInfos[collectionID] = channelInfo
} }
m.globalSeekPosition = globalSeekPositionTmp
m.channelMu.Unlock() m.channelMu.Unlock()
return col2SegmentChangeInfos, nil return col2SegmentChangeInfos, nil
@ -669,23 +609,13 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo) segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
} }
// get msgStream to produce sealedSegmentChangeInfos to query channel // produce sealedSegmentChangeInfos to query channel
queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos) queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
msgPosition, err := m.sendSealedSegmentChangeInfos(collectionID, queryChannelInfo.QueryChannel, segmentChangeInfos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// len(messageIDs) = 1 queryChannelInfo.SeekPosition = msgPosition
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannel]
if !ok || len(messageIDs) == 0 {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
if queryChannelInfo.SeekPosition == nil {
queryChannelInfo.SeekPosition = &internalpb.MsgPosition{
ChannelName: queryChannelInfo.QueryChannel,
}
}
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()
// update segmentInfo, queryChannelInfo meta to cache and etcd // update segmentInfo, queryChannelInfo meta to cache and etcd
seg2Info := make(map[UniqueID]*querypb.SegmentInfo) seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
@ -714,20 +644,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
} }
} }
// save meta to etcd
saveKvs := make(map[string]string) saveKvs := make(map[string]string)
channelInfoBytes, err := proto.Marshal(queryChannelInfo)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
saveKvs[channelKey] = string(channelInfoBytes)
seekPos, err := proto.Marshal(queryChannelInfo.SeekPosition)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
saveKvs[globalQuerySeekPositionPrefix] = string(seekPos)
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed // avoid the produce process success but save meta to etcd failed
// then the msgID key will not exist, and changeIndo will be ignored by query node // then the msgID key will not exist, and changeIndo will be ignored by query node
@ -752,25 +669,18 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
m.channelMu.Lock() m.channelMu.Lock()
m.queryChannelInfos[collectionID] = queryChannelInfo m.queryChannelInfos[collectionID] = queryChannelInfo
m.globalSeekPosition = queryChannelInfo.SeekPosition
m.channelMu.Unlock() m.channelMu.Unlock()
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil
} }
// send sealed segment change infos into query channels // send sealed segment change infos into query channels
func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) { func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error) {
// get msgStream to produce sealedSegmentChangeInfos to query channel // get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID) queryStream, err := m.getQueryStreamByID(collectionID, queryChannel)
if err != nil { if err != nil {
log.Error("updateGlobalSealedSegmentInfos: get query channel info failed", zap.Int64("collectionID", collectionID), zap.Error(err)) log.Error("sendSealedSegmentChangeInfos: get query stream failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err return nil, err
}
queryStream, err := m.getQueryStreamByID(collectionID)
if err != nil {
log.Error("updateGlobalSealedSegmentInfos: get query stream failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err
} }
var msgPack = &msgstream.MsgPack{ var msgPack = &msgstream.MsgPack{
@ -778,8 +688,8 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, change
} }
id, err := m.idAllocator() id, err := m.idAllocator()
if err != nil { if err != nil {
log.Error("allocator trigger taskID failed", zap.Error(err)) log.Error("sendSealedSegmentChangeInfos: allocator trigger taskID failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err return nil, err
} }
changeInfos.Base.MsgID = id changeInfos.Base.MsgID = id
segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{ segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{
@ -792,12 +702,25 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, change
messageIDInfos, err := queryStream.ProduceMark(msgPack) messageIDInfos, err := queryStream.ProduceMark(msgPack)
if err != nil { if err != nil {
log.Error("updateGlobalSealedSegmentInfos: send sealed segment change info failed", zap.Int64("collectionID", collectionID), zap.Error(err)) log.Error("sendSealedSegmentChangeInfos: send sealed segment change info failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err return nil, err
} }
log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack))
return queryChannelInfo, messageIDInfos, nil messageIDs, ok := messageIDInfos[queryChannel]
if !ok {
return nil, fmt.Errorf("sendSealedSegmentChangeInfos: send sealed segment change info to wrong query channel, collectionID = %d, query channel = %s", collectionID, queryChannel)
}
// len(messageIDs) = 1
if len(messageIDs) != 1 {
return nil, fmt.Errorf("sendSealedSegmentChangeInfos: length of the positions in stream is not correct, collectionID = %d, query channel = %s, len = %d", collectionID, queryChannel, len(messageIDs))
}
log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack))
return &internalpb.MsgPosition{
ChannelName: queryChannel,
MsgID: messageIDs[0].Serialize(),
}, nil
} }
func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo { func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo {
@ -814,11 +737,12 @@ func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []Uni
if len(partitionIDs) == 0 { if len(partitionIDs) == 0 {
return segmentInfos return segmentInfos
} }
partitionIDMap := getCompareMapFromSlice(partitionIDs)
for _, info := range segmentInfos { for _, info := range segmentInfos {
for _, partitionID := range partitionIDs { partitionID := info.PartitionID
if info.PartitionID == partitionID { if _, ok := partitionIDMap[partitionID]; ok {
results = append(results, info) results = append(results, info)
}
} }
} }
return results return results
@ -905,21 +829,26 @@ func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannel
return nil return nil
} }
func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo { func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo {
// TODO::to remove
// all collection use the same query channel
colIDForAssignChannel := UniqueID(0)
searchPrefix := Params.SearchChannelPrefix searchPrefix := Params.SearchChannelPrefix
searchResultPrefix := Params.SearchResultChannelPrefix searchResultPrefix := Params.SearchResultChannelPrefix
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(collectionID, 10) allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(collectionID, 10) allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel)) log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel))
seekPosition := &internalpb.MsgPosition{ seekPosition := &internalpb.MsgPosition{
ChannelName: allocatedQueryChannel, ChannelName: allocatedQueryChannel,
} }
segmentInfos := m.showSegmentInfos(collectionID, nil)
info := &querypb.QueryChannelInfo{ info := &querypb.QueryChannelInfo{
CollectionID: collectionID, CollectionID: collectionID,
QueryChannel: allocatedQueryChannel, QueryChannel: allocatedQueryChannel,
QueryResultChannel: allocatedQueryResultChannel, QueryResultChannel: allocatedQueryResultChannel,
GlobalSealedSegments: []*querypb.SegmentInfo{}, GlobalSealedSegments: segmentInfos,
SeekPosition: seekPosition, SeekPosition: seekPosition,
} }
@ -957,57 +886,42 @@ func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.Vch
} }
// Get Query channel info for collection, so far all the collection share the same query channel 0 // Get Query channel info for collection, so far all the collection share the same query channel 0
func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) { func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) *querypb.QueryChannelInfo {
m.channelMu.Lock() m.channelMu.Lock()
defer m.channelMu.Unlock() defer m.channelMu.Unlock()
var channelInfo *querypb.QueryChannelInfo
if info, ok := m.queryChannelInfos[collectionID]; ok { if info, ok := m.queryChannelInfos[collectionID]; ok {
return proto.Clone(info).(*querypb.QueryChannelInfo), nil channelInfo = proto.Clone(info).(*querypb.QueryChannelInfo)
} else {
channelInfo = m.createQueryChannel(collectionID)
m.queryChannelInfos[collectionID] = channelInfo
} }
// TODO::to remove return proto.Clone(channelInfo).(*querypb.QueryChannelInfo)
// all collection use the same query channel
colIDForAssignChannel := UniqueID(0)
info := createQueryChannel(colIDForAssignChannel)
err := saveQueryChannelInfo(info, m.client)
if err != nil {
log.Error("getQueryChannel: save channel to etcd error", zap.Error(err))
return nil, err
}
// set info.collectionID from 0 to realID
info.CollectionID = collectionID
m.queryChannelInfos[collectionID] = info
info.SeekPosition = m.globalSeekPosition
if info.SeekPosition != nil {
info.SeekPosition.ChannelName = info.QueryChannel
}
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
} }
func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error) { func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID, queryChannel string) (msgstream.MsgStream, error) {
m.streamMu.Lock() m.streamMu.Lock()
defer m.streamMu.Unlock() defer m.streamMu.Unlock()
info, err := m.getQueryChannelInfoByID(collectionID) var queryStream msgstream.MsgStream
if err != nil { var err error
return nil, err if stream, ok := m.queryStreams[collectionID]; ok {
} queryStream = stream
} else {
stream, ok := m.queryStreams[collectionID] queryStream, err = m.msFactory.NewMsgStream(m.ctx)
if !ok {
stream, err = m.msFactory.NewMsgStream(m.ctx)
if err != nil { if err != nil {
log.Error("updateGlobalSealedSegmentInfos: create msgStream failed", zap.Error(err)) log.Error("updateGlobalSealedSegmentInfos: create msgStream failed", zap.Error(err))
return nil, err return nil, err
} }
queryChannel := info.QueryChannel queryStream.AsProducer([]string{queryChannel})
stream.AsProducer([]string{queryChannel}) m.queryStreams[collectionID] = queryStream
m.queryStreams[collectionID] = stream
log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID)) log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID))
} }
return stream, nil return queryStream, nil
} }
func (m *MetaReplica) setLoadType(collectionID UniqueID, loadType querypb.LoadType) error { func (m *MetaReplica) setLoadType(collectionID UniqueID, loadType querypb.LoadType) error {
@ -1107,16 +1021,6 @@ func saveGlobalCollectionInfo(collectionID UniqueID, info *querypb.CollectionInf
return kv.Save(key, string(infoBytes)) return kv.Save(key, string(infoBytes))
} }
func saveQueryChannelInfo(info *querypb.QueryChannelInfo, kv kv.MetaKv) error {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, info.CollectionID)
return kv.Save(key, string(infoBytes))
}
func saveDeltaChannelInfo(collectionID UniqueID, infos []*datapb.VchannelInfo, kv kv.MetaKv) error { func saveDeltaChannelInfo(collectionID UniqueID, infos []*datapb.VchannelInfo, kv kv.MetaKv) error {
kvs := make(map[string]string) kvs := make(map[string]string)
for _, info := range infos { for _, info := range infos {

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util"
) )
@ -106,12 +105,11 @@ func TestMetaFunc(t *testing.T) {
NodeID: nodeID, NodeID: nodeID,
} }
meta := &MetaReplica{ meta := &MetaReplica{
client: kv, client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: segmentInfos, segmentInfos: segmentInfos,
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{}, dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
globalSeekPosition: &internalpb.MsgPosition{},
} }
dmChannels := []string{"testDm1", "testDm2"} dmChannels := []string{"testDm1", "testDm2"}
@ -149,9 +147,8 @@ func TestMetaFunc(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("Test GetQueryChannelInfoByIDFail", func(t *testing.T) { t.Run("Test GetQueryChannelInfoByIDFirst", func(t *testing.T) {
res, err := meta.getQueryChannelInfoByID(defaultCollectionID) res := meta.getQueryChannelInfoByID(defaultCollectionID)
assert.Nil(t, err)
assert.NotNil(t, res) assert.NotNil(t, res)
}) })
@ -245,11 +242,10 @@ func TestMetaFunc(t *testing.T) {
assert.Equal(t, defaultSegmentID, infos[0].SegmentID) assert.Equal(t, defaultSegmentID, infos[0].SegmentID)
}) })
t.Run("Test getQueryChannel", func(t *testing.T) { t.Run("Test getQueryChannelSecond", func(t *testing.T) {
info, err := meta.getQueryChannelInfoByID(defaultCollectionID) info := meta.getQueryChannelInfoByID(defaultCollectionID)
assert.NotNil(t, info.QueryChannel) assert.NotNil(t, info.QueryChannel)
assert.NotNil(t, info.QueryResultChannel) assert.NotNil(t, info.QueryResultChannel)
assert.Nil(t, err)
}) })
t.Run("Test GetSegmentInfoByID", func(t *testing.T) { t.Run("Test GetSegmentInfoByID", func(t *testing.T) {
@ -317,14 +313,6 @@ func TestReloadMetaFromKV(t *testing.T) {
segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, defaultSegmentID) segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, defaultSegmentID)
kvs[segmentKey] = string(segmentBlobs) kvs[segmentKey] = string(segmentBlobs)
queryChannelInfo := &querypb.QueryChannelInfo{
CollectionID: defaultCollectionID,
}
queryChannelBlobs, err := proto.Marshal(queryChannelInfo)
assert.Nil(t, err)
queryChannelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, defaultCollectionID)
kvs[queryChannelKey] = string(queryChannelBlobs)
deltaChannel1 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel1"} deltaChannel1 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel1"}
deltaChannel2 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel2"} deltaChannel2 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel2"}
@ -354,27 +342,8 @@ func TestReloadMetaFromKV(t *testing.T) {
assert.Equal(t, 1, len(meta.collectionInfos)) assert.Equal(t, 1, len(meta.collectionInfos))
assert.Equal(t, 1, len(meta.segmentInfos)) assert.Equal(t, 1, len(meta.segmentInfos))
assert.Equal(t, 1, len(meta.queryChannelInfos))
_, ok := meta.collectionInfos[defaultCollectionID] _, ok := meta.collectionInfos[defaultCollectionID]
assert.Equal(t, true, ok) assert.Equal(t, true, ok)
_, ok = meta.segmentInfos[defaultSegmentID] _, ok = meta.segmentInfos[defaultSegmentID]
assert.Equal(t, true, ok) assert.Equal(t, true, ok)
_, ok = meta.queryChannelInfos[defaultCollectionID]
assert.Equal(t, true, ok)
t.Run("test no global query seek position", func(t *testing.T) {
err = kv.Remove(globalQuerySeekPositionPrefix)
assert.NoError(t, err)
err = meta.reloadFromKV()
assert.NoError(t, err)
})
t.Run("test wrong global query seek position", func(t *testing.T) {
err = kv.Save(globalQuerySeekPositionPrefix, "&%*&^*^(&%*&%&^%")
assert.NoError(t, err)
err = meta.reloadFromKV()
assert.Error(t, err)
})
} }

View File

@ -123,7 +123,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
assert.Equal(t, node2ID, firstReq.DstNodeID) assert.Equal(t, node2ID, firstReq.DstNodeID)
assert.Equal(t, node2ID, secondReq.DstNodeID) assert.Equal(t, node2ID, secondReq.DstNodeID)
err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, true, nil, nil) err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, meta, true, nil, nil)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, node2ID, firstReq.DstNodeID) assert.Equal(t, node2ID, firstReq.DstNodeID)

View File

@ -2061,11 +2061,7 @@ func assignInternalTask(ctx context.Context,
for nodeID, collectionIDs := range watchQueryChannelInfo { for nodeID, collectionIDs := range watchQueryChannelInfo {
for _, collectionID := range collectionIDs { for _, collectionID := range collectionIDs {
queryChannelInfo, err := meta.getQueryChannelInfoByID(collectionID) queryChannelInfo := meta.getQueryChannelInfoByID(collectionID)
if err != nil {
return nil, err
}
msgBase := proto.Clone(parentTask.msgBase()).(*commonpb.MsgBase) msgBase := proto.Clone(parentTask.msgBase()).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchQueryChannels msgBase.MsgType = commonpb.MsgType_WatchQueryChannels
addQueryChannelRequest := &querypb.AddQueryChannelRequest{ addQueryChannelRequest := &querypb.AddQueryChannelRequest{
@ -2075,7 +2071,6 @@ func assignInternalTask(ctx context.Context,
QueryChannel: queryChannelInfo.QueryChannel, QueryChannel: queryChannelInfo.QueryChannel,
QueryResultChannel: queryChannelInfo.QueryResultChannel, QueryResultChannel: queryChannelInfo.QueryResultChannel,
GlobalSealedSegments: queryChannelInfo.GlobalSealedSegments, GlobalSealedSegments: queryChannelInfo.GlobalSealedSegments,
SeekPosition: queryChannelInfo.SeekPosition,
} }
baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask := newBaseTask(ctx, parentTask.getTriggerCondition())
baseTask.setParentTask(parentTask) baseTask.setParentTask(parentTask)

View File

@ -925,7 +925,8 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
rollBackSegmentChangeInfoErr := retry.Do(ctx, func() error { rollBackSegmentChangeInfoErr := retry.Do(ctx, func() error {
rollBackChangeInfos := reverseSealedSegmentChangeInfo(sealedSegmentChangeInfos) rollBackChangeInfos := reverseSealedSegmentChangeInfo(sealedSegmentChangeInfos)
for collectionID, infos := range rollBackChangeInfos { for collectionID, infos := range rollBackChangeInfos {
_, _, sendErr := meta.sendSealedSegmentChangeInfos(collectionID, infos) channelInfo := meta.getQueryChannelInfoByID(collectionID)
_, sendErr := meta.sendSealedSegmentChangeInfos(collectionID, channelInfo.QueryChannel, infos)
if sendErr != nil { if sendErr != nil {
return sendErr return sendErr
} }
@ -934,8 +935,10 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
}, retry.Attempts(20)) }, retry.Attempts(20))
if rollBackSegmentChangeInfoErr != nil { if rollBackSegmentChangeInfoErr != nil {
log.Error("scheduleLoop: Restore the information of global sealed segments in query node failed", zap.Error(rollBackSegmentChangeInfoErr)) log.Error("scheduleLoop: Restore the information of global sealed segments in query node failed", zap.Error(rollBackSegmentChangeInfoErr))
panic(rollBackSegmentChangeInfoErr)
} else {
log.Info("Successfully roll back segment info change")
} }
log.Info("Successfully roll back segment info change")
return err return err
} }

View File

@ -0,0 +1,26 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
func getCompareMapFromSlice(sliceData []int64) map[int64]struct{} {
compareMap := make(map[int64]struct{})
for _, data := range sliceData {
compareMap[data] = struct{}{}
}
return compareMap
}