Add historical and streaming module in querynode (#5469)

* add historical and streaming

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* fix GetSegmentInfo

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* pass regression test

Signed-off-by: xige-16 <xi.ge@zilliz.com>

Co-authored-by: bigsheeper <yihao.dai@zilliz.com>
pull/5471/head
xige-16 2021-05-28 10:26:30 +08:00 committed by GitHub
parent c63524deb9
commit fce792b8bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 560 additions and 802 deletions

View File

@ -542,6 +542,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
ChannelName: tempBuffer[0].Position().ChannelName,
MsgID: tempBuffer[0].Position().MsgID,
Timestamp: timeStamp,
MsgGroup: consumer.Subscription(),
}
endMsgPositions = append(endMsgPositions, newPos)
} else {
@ -549,6 +550,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
ChannelName: timeTickMsg.Position().ChannelName,
MsgID: timeTickMsg.Position().MsgID,
Timestamp: timeStamp,
MsgGroup: consumer.Subscription(),
}
endMsgPositions = append(endMsgPositions, newPos)
}

View File

@ -247,11 +247,11 @@ func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPositi
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
//outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
for _, pos := range positions {
pos.MsgGroup = funcutil.RandomString(4)
outputStream.Seek(pos)
}
outputStream.Start()
//outputStream.Start()
return outputStream
}

View File

@ -140,6 +140,9 @@ message SearchResults {
string result_channelID = 3;
string metric_type = 4;
repeated bytes hits = 5;
repeated int64 sealed_segmentIDs_searched = 6;
repeated string channelIDs_searched = 7;
repeated int64 global_sealed_segmentIDs = 8;
}
message RetrieveRequest {

View File

@ -1140,14 +1140,17 @@ func (m *SearchRequest) GetSerializedExprPlan() []byte {
}
type SearchResults struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
ResultChannelID string `protobuf:"bytes,3,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"`
MetricType string `protobuf:"bytes,4,opt,name=metric_type,json=metricType,proto3" json:"metric_type,omitempty"`
Hits [][]byte `protobuf:"bytes,5,rep,name=hits,proto3" json:"hits,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
ResultChannelID string `protobuf:"bytes,3,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"`
MetricType string `protobuf:"bytes,4,opt,name=metric_type,json=metricType,proto3" json:"metric_type,omitempty"`
Hits [][]byte `protobuf:"bytes,5,rep,name=hits,proto3" json:"hits,omitempty"`
SealedSegmentIDsSearched []int64 `protobuf:"varint,6,rep,packed,name=sealed_segmentIDs_searched,json=sealedSegmentIDsSearched,proto3" json:"sealed_segmentIDs_searched,omitempty"`
ChannelIDsSearched []string `protobuf:"bytes,7,rep,name=channelIDs_searched,json=channelIDsSearched,proto3" json:"channelIDs_searched,omitempty"`
GlobalSealedSegmentIDs []int64 `protobuf:"varint,8,rep,packed,name=global_sealed_segmentIDs,json=globalSealedSegmentIDs,proto3" json:"global_sealed_segmentIDs,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// Reset restores the message to its zero value (protobuf-generated method).
func (m *SearchResults) Reset() { *m = SearchResults{} }
@ -1210,6 +1213,27 @@ func (m *SearchResults) GetHits() [][]byte {
return nil
}
// GetSealedSegmentIDsSearched returns the sealed_segmentIDs_searched field,
// or nil when the receiver itself is nil.
func (m *SearchResults) GetSealedSegmentIDsSearched() []int64 {
	if m == nil {
		return nil
	}
	return m.SealedSegmentIDsSearched
}
// GetChannelIDsSearched returns the channelIDs_searched field,
// or nil when the receiver itself is nil.
func (m *SearchResults) GetChannelIDsSearched() []string {
	if m == nil {
		return nil
	}
	return m.ChannelIDsSearched
}
// GetGlobalSealedSegmentIDs returns the global_sealed_segmentIDs field,
// or nil when the receiver itself is nil.
func (m *SearchResults) GetGlobalSealedSegmentIDs() []int64 {
	if m == nil {
		return nil
	}
	return m.GlobalSealedSegmentIDs
}
type RetrieveRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
ResultChannelID string `protobuf:"bytes,2,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"`
@ -2075,112 +2099,115 @@ func init() {
// init registers the gzipped FileDescriptorProto for internal.proto with the
// protobuf runtime so descriptor lookups by filename resolve to this file.
func init() { proto.RegisterFile("internal.proto", fileDescriptor_41f4a519b878ee3b) }
var fileDescriptor_41f4a519b878ee3b = []byte{
// 1697 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x58, 0xcd, 0x8f, 0x1c, 0x47,
0x15, 0xa7, 0x67, 0x66, 0x77, 0x66, 0x5e, 0xf7, 0xae, 0xc7, 0x65, 0x3b, 0x69, 0x7f, 0x24, 0x99,
0x74, 0xf8, 0x58, 0x12, 0x61, 0x5b, 0x1b, 0x20, 0x88, 0x8b, 0x13, 0xef, 0x24, 0x66, 0xe4, 0xac,
0xb5, 0xf4, 0x38, 0x91, 0xe0, 0xd2, 0xaa, 0xe9, 0x2e, 0xcf, 0x74, 0xd2, 0x5f, 0x54, 0x55, 0xdb,
0x3b, 0x39, 0x71, 0xe0, 0x04, 0x02, 0x21, 0x24, 0x4e, 0xfc, 0x0f, 0xfc, 0x09, 0x80, 0x72, 0x42,
0xe2, 0x8e, 0xc4, 0xbf, 0xc1, 0x91, 0x13, 0xaa, 0x57, 0xd5, 0x3d, 0x1f, 0xee, 0x5d, 0x26, 0x6b,
0x21, 0x88, 0xe0, 0xd6, 0xf5, 0xde, 0xab, 0xaa, 0xf7, 0xfb, 0xbd, 0x57, 0xef, 0x55, 0x35, 0xec,
0xc7, 0x99, 0x64, 0x3c, 0xa3, 0xc9, 0xed, 0x82, 0xe7, 0x32, 0x27, 0xd7, 0xd2, 0x38, 0x79, 0x5a,
0x0a, 0x3d, 0xba, 0x5d, 0x29, 0x6f, 0x38, 0x61, 0x9e, 0xa6, 0x79, 0xa6, 0xc5, 0x37, 0x1c, 0x11,
0xce, 0x59, 0x4a, 0xf5, 0xc8, 0xfb, 0x83, 0x05, 0x7b, 0x47, 0x79, 0x5a, 0xe4, 0x19, 0xcb, 0xe4,
0x38, 0x7b, 0x92, 0x93, 0x97, 0x60, 0x37, 0xcb, 0x23, 0x36, 0x1e, 0xb9, 0xd6, 0xd0, 0x3a, 0x68,
0xfb, 0x66, 0x44, 0x08, 0x74, 0x78, 0x9e, 0x30, 0xb7, 0x35, 0xb4, 0x0e, 0xfa, 0x3e, 0x7e, 0x93,
0x7b, 0x00, 0x42, 0x52, 0xc9, 0x82, 0x30, 0x8f, 0x98, 0xdb, 0x1e, 0x5a, 0x07, 0xfb, 0x87, 0xc3,
0xdb, 0x8d, 0x5e, 0xdc, 0x9e, 0x28, 0xc3, 0xa3, 0x3c, 0x62, 0x7e, 0x5f, 0x54, 0x9f, 0xe4, 0x5d,
0x00, 0x76, 0x2a, 0x39, 0x0d, 0xe2, 0xec, 0x49, 0xee, 0x76, 0x86, 0xed, 0x03, 0xfb, 0xf0, 0xf5,
0xf5, 0x05, 0x8c, 0xf3, 0x0f, 0xd9, 0xe2, 0x63, 0x9a, 0x94, 0xec, 0x84, 0xc6, 0xdc, 0xef, 0xe3,
0x24, 0xe5, 0xae, 0xf7, 0x37, 0x0b, 0x2e, 0xd5, 0x00, 0x70, 0x0f, 0x41, 0xbe, 0x0f, 0x3b, 0xb8,
0x05, 0x22, 0xb0, 0x0f, 0xbf, 0x7a, 0x86, 0x47, 0x6b, 0xb8, 0x7d, 0x3d, 0x85, 0x7c, 0x04, 0x57,
0x44, 0x39, 0x0d, 0x2b, 0x55, 0x80, 0x52, 0xe1, 0xb6, 0xd0, 0xb5, 0xed, 0x56, 0x22, 0xab, 0x0b,
0x18, 0x97, 0xde, 0x86, 0x5d, 0xb5, 0x52, 0x29, 0x90, 0x25, 0xfb, 0xf0, 0x66, 0x23, 0xc8, 0x09,
0x9a, 0xf8, 0xc6, 0xd4, 0xbb, 0x09, 0xd7, 0x1f, 0x30, 0xb9, 0x81, 0xce, 0x67, 0x3f, 0x29, 0x99,
0x90, 0x46, 0xf9, 0x38, 0x4e, 0xd9, 0xe3, 0x38, 0xfc, 0xf4, 0x68, 0x4e, 0xb3, 0x8c, 0x25, 0x95,
0xf2, 0x15, 0xb8, 0xf9, 0x80, 0xe1, 0x84, 0x58, 0xc8, 0x38, 0x14, 0x1b, 0xea, 0x6b, 0x70, 0xe5,
0x01, 0x93, 0xa3, 0x68, 0x43, 0xfc, 0x31, 0xf4, 0x1e, 0xa9, 0x60, 0xab, 0x34, 0xf8, 0x2e, 0x74,
0x69, 0x14, 0x71, 0x26, 0x84, 0x61, 0xf1, 0x56, 0xa3, 0xc7, 0xef, 0x69, 0x1b, 0xbf, 0x32, 0x6e,
0x4a, 0x13, 0xef, 0x13, 0x80, 0x71, 0x16, 0xcb, 0x13, 0xca, 0x69, 0x2a, 0xce, 0x4c, 0xb0, 0x11,
0x38, 0x42, 0x52, 0x2e, 0x83, 0x02, 0xed, 0x0c, 0xe5, 0x5b, 0x64, 0x83, 0x8d, 0xd3, 0xf4, 0xea,
0xde, 0x8f, 0x00, 0x26, 0x92, 0xc7, 0xd9, 0xec, 0xc3, 0x58, 0x48, 0xb5, 0xd7, 0x53, 0x65, 0xa7,
0x40, 0xb4, 0x0f, 0xfa, 0xbe, 0x19, 0xad, 0x84, 0xa3, 0xb5, 0x7d, 0x38, 0xee, 0x81, 0x5d, 0xd1,
0x7d, 0x2c, 0x66, 0xe4, 0x2e, 0x74, 0xa6, 0x54, 0xb0, 0x73, 0xe9, 0x39, 0x16, 0xb3, 0xfb, 0x54,
0x30, 0x1f, 0x2d, 0xbd, 0xcf, 0x5b, 0xf0, 0xf2, 0x11, 0x67, 0x98, 0xfc, 0x49, 0xc2, 0x42, 0x19,
0xe7, 0x99, 0xe1, 0xfe, 0x8b, 0xaf, 0x46, 0x5e, 0x86, 0x6e, 0x34, 0x0d, 0x32, 0x9a, 0x56, 0x64,
0xef, 0x46, 0xd3, 0x47, 0x34, 0x65, 0xe4, 0xeb, 0xb0, 0x1f, 0xd6, 0xeb, 0x2b, 0x09, 0xe6, 0x5c,
0xdf, 0xdf, 0x90, 0xaa, 0x50, 0x45, 0xd3, 0xf1, 0xc8, 0xed, 0x60, 0x18, 0xf0, 0x9b, 0x78, 0xe0,
0x2c, 0xad, 0xc6, 0x23, 0x77, 0x07, 0x75, 0x6b, 0x32, 0x45, 0xaa, 0xae, 0x21, 0xee, 0xee, 0xd0,
0x3a, 0x70, 0x7c, 0x33, 0x22, 0x77, 0xe1, 0xca, 0xd3, 0x98, 0xcb, 0x92, 0x26, 0x26, 0xaf, 0xd4,
0x2e, 0xc2, 0xed, 0x22, 0xf3, 0x4d, 0x2a, 0x72, 0x08, 0x57, 0x8b, 0xf9, 0x42, 0xc4, 0xe1, 0xc6,
0x94, 0x1e, 0x4e, 0x69, 0xd4, 0x79, 0x9f, 0x5b, 0x70, 0x6d, 0xc4, 0xf3, 0xe2, 0xcb, 0x4c, 0xa1,
0xf7, 0xcb, 0x16, 0xbc, 0xa4, 0x33, 0xe1, 0x84, 0x72, 0x19, 0xff, 0x9b, 0x50, 0x7c, 0x03, 0x2e,
0x2d, 0x77, 0xd5, 0x06, 0xcd, 0x30, 0xbe, 0x06, 0xfb, 0x45, 0xe5, 0x87, 0xb6, 0xeb, 0xa0, 0xdd,
0x5e, 0x2d, 0x5d, 0x43, 0xbb, 0x73, 0x0e, 0xda, 0xdd, 0x86, 0x84, 0x19, 0x82, 0x5d, 0x2f, 0x34,
0x1e, 0xb9, 0x5d, 0x34, 0x59, 0x15, 0x79, 0xbf, 0x68, 0xc1, 0x55, 0x15, 0xd4, 0xff, 0xb3, 0xa1,
0xd8, 0xf8, 0x63, 0x0b, 0x88, 0xce, 0x8e, 0x71, 0x16, 0xb1, 0xd3, 0xff, 0x24, 0x17, 0xaf, 0x00,
0x3c, 0x89, 0x59, 0x12, 0xad, 0xf2, 0xd0, 0x47, 0xc9, 0x0b, 0x71, 0xe0, 0x42, 0x17, 0x17, 0xa9,
0xf1, 0x57, 0x43, 0xd5, 0x05, 0xf4, 0x8d, 0xc0, 0x74, 0x81, 0xde, 0xd6, 0x5d, 0x00, 0xa7, 0x99,
0x2e, 0xf0, 0xfb, 0x36, 0xec, 0x8d, 0x33, 0xc1, 0xb8, 0xfc, 0x5f, 0x4e, 0x24, 0x72, 0x0b, 0xfa,
0x82, 0xcd, 0x52, 0x75, 0x31, 0x19, 0xb9, 0x3d, 0xd4, 0x2f, 0x05, 0x4a, 0x1b, 0xea, 0xca, 0x3a,
0x1e, 0xb9, 0x7d, 0x1d, 0xda, 0x5a, 0x40, 0x5e, 0x05, 0x90, 0x71, 0xca, 0x84, 0xa4, 0x69, 0x21,
0x5c, 0x18, 0xb6, 0x0f, 0x3a, 0xfe, 0x8a, 0x44, 0x75, 0x01, 0x9e, 0x3f, 0x1b, 0x8f, 0x84, 0x6b,
0x0f, 0xdb, 0xaa, 0x8d, 0xeb, 0x11, 0xf9, 0x36, 0xf4, 0x78, 0xfe, 0x2c, 0x88, 0xa8, 0xa4, 0xae,
0x83, 0xc1, 0xbb, 0xde, 0x48, 0xf6, 0xfd, 0x24, 0x9f, 0xfa, 0x5d, 0x9e, 0x3f, 0x1b, 0x51, 0x49,
0xbd, 0xbf, 0xb7, 0x60, 0x6f, 0xc2, 0x28, 0x0f, 0xe7, 0x17, 0x0f, 0xd8, 0x37, 0x61, 0xc0, 0x99,
0x28, 0x13, 0x19, 0x2c, 0x61, 0xe9, 0xc8, 0x5d, 0xd2, 0xf2, 0xa3, 0x1a, 0x5c, 0x45, 0x79, 0xfb,
0x1c, 0xca, 0x3b, 0x0d, 0x94, 0x7b, 0xe0, 0xac, 0xf0, 0x2b, 0xdc, 0x1d, 0x84, 0xbe, 0x26, 0x23,
0x03, 0x68, 0x47, 0x22, 0xc1, 0x88, 0xf5, 0x7d, 0xf5, 0x49, 0xde, 0x82, 0xcb, 0x45, 0x42, 0x43,
0x36, 0xcf, 0x93, 0x88, 0xf1, 0x60, 0xc6, 0xf3, 0xb2, 0xc0, 0x70, 0x39, 0xfe, 0x60, 0x45, 0xf1,
0x40, 0xc9, 0xc9, 0x3b, 0xd0, 0x8b, 0x44, 0x12, 0xc8, 0x45, 0xc1, 0x30, 0x64, 0xfb, 0x67, 0x60,
0x1f, 0x89, 0xe4, 0xf1, 0xa2, 0x60, 0x7e, 0x37, 0xd2, 0x1f, 0xe4, 0x2e, 0x5c, 0x15, 0x8c, 0xc7,
0x34, 0x89, 0x3f, 0x63, 0x51, 0xc0, 0x4e, 0x0b, 0x1e, 0x14, 0x09, 0xcd, 0x30, 0xb2, 0x8e, 0x4f,
0x96, 0xba, 0xf7, 0x4f, 0x0b, 0x7e, 0x92, 0xd0, 0xcc, 0xfb, 0xab, 0xb5, 0x24, 0x5d, 0xf1, 0x23,
0x2e, 0x40, 0xfa, 0x45, 0x6e, 0x52, 0x8d, 0x91, 0x6a, 0x37, 0x47, 0xea, 0x35, 0xb0, 0x53, 0x26,
0x79, 0x1c, 0x6a, 0x46, 0xf4, 0x01, 0x02, 0x2d, 0x42, 0xd8, 0x04, 0x3a, 0xf3, 0x58, 0xea, 0x50,
0x38, 0x3e, 0x7e, 0x7b, 0xbf, 0x6b, 0xc1, 0x25, 0x5f, 0x99, 0xb0, 0xa7, 0xec, 0x4b, 0x9f, 0x4f,
0x6f, 0x42, 0x3b, 0x8e, 0x04, 0xe6, 0x93, 0x7d, 0xe8, 0xae, 0xfb, 0x6d, 0xde, 0x72, 0xe3, 0x91,
0xf0, 0x95, 0x11, 0x79, 0x03, 0xf6, 0xf2, 0x52, 0x16, 0xa5, 0x0c, 0xb0, 0x9e, 0x56, 0x97, 0x2f,
0x47, 0x0b, 0x3f, 0x40, 0x99, 0xf7, 0xeb, 0x35, 0x76, 0xfe, 0x5b, 0x03, 0x6f, 0x60, 0x77, 0xb6,
0x81, 0x7d, 0x0f, 0x6c, 0x8d, 0x57, 0x97, 0x9d, 0x1d, 0x2c, 0x3b, 0xaf, 0x36, 0xce, 0x41, 0x0e,
0x54, 0xc9, 0xf1, 0x75, 0x63, 0x13, 0x58, 0x7e, 0xfe, 0x62, 0xc1, 0xde, 0x88, 0x25, 0x4c, 0xbe,
0x40, 0xba, 0x34, 0xb4, 0x85, 0x56, 0x63, 0x5b, 0x58, 0xab, 0xbb, 0xed, 0xf3, 0xeb, 0x6e, 0xe7,
0xb9, 0xba, 0xfb, 0x3a, 0x38, 0x05, 0x8f, 0x53, 0xca, 0x17, 0xc1, 0xa7, 0x6c, 0x51, 0xa5, 0x8c,
0x6d, 0x64, 0x0f, 0xd9, 0x42, 0x78, 0xff, 0xb0, 0xa0, 0xff, 0x61, 0x4e, 0x23, 0xbc, 0x3d, 0x5c,
0x00, 0xc9, 0x5a, 0xdb, 0x68, 0x35, 0xb4, 0x8d, 0xfa, 0x02, 0x50, 0xb9, 0xbf, 0xbc, 0x11, 0xac,
0x74, 0xf6, 0xce, 0x7a, 0x67, 0x7f, 0x0d, 0xec, 0x58, 0x39, 0x14, 0x14, 0x54, 0xce, 0xb5, 0xdf,
0x7d, 0x1f, 0x50, 0x74, 0xa2, 0x24, 0xaa, 0xf5, 0x57, 0x06, 0xd8, 0xfa, 0x77, 0xb7, 0x6e, 0xfd,
0x66, 0x11, 0x6c, 0xfd, 0x7f, 0x6a, 0x81, 0x3b, 0xd1, 0xce, 0x2e, 0xdf, 0xbf, 0x1f, 0x15, 0x11,
0x3e, 0xc3, 0x6f, 0x41, 0x7f, 0x52, 0x23, 0xd3, 0xcf, 0xcf, 0xa5, 0x40, 0x51, 0x7f, 0xcc, 0xd2,
0x9c, 0x2f, 0x26, 0xf1, 0x67, 0xcc, 0x00, 0x5f, 0x91, 0x28, 0x6c, 0x8f, 0xca, 0xd4, 0xcf, 0x9f,
0x09, 0x73, 0xd0, 0xab, 0xa1, 0xc2, 0x16, 0xe2, 0x85, 0x2d, 0x50, 0x91, 0x42, 0xe4, 0x1d, 0x1f,
0xb4, 0x48, 0xbd, 0x19, 0xc9, 0x75, 0xe8, 0xb1, 0x2c, 0xd2, 0xda, 0x1d, 0xd4, 0x76, 0x59, 0x16,
0xa1, 0x6a, 0x0c, 0xfb, 0xe6, 0xdd, 0x9b, 0x0b, 0x3c, 0xf4, 0xe6, 0xa8, 0x7b, 0x67, 0xfc, 0x6c,
0x38, 0x16, 0xb3, 0x13, 0x63, 0xe9, 0xef, 0xe9, 0xa7, 0xaf, 0x19, 0x92, 0xf7, 0xc1, 0x51, 0xbb,
0xd4, 0x0b, 0x75, 0xb7, 0x5e, 0xc8, 0x66, 0x59, 0x54, 0x0d, 0xbc, 0xdf, 0x58, 0x70, 0xf9, 0x39,
0x0a, 0x2f, 0x90, 0x47, 0x0f, 0xa1, 0x37, 0x61, 0x33, 0xb5, 0x44, 0xf5, 0x9a, 0xbf, 0x73, 0xd6,
0xcf, 0xa1, 0x33, 0x02, 0xe6, 0xd7, 0x0b, 0x78, 0x9f, 0xd4, 0x61, 0xfd, 0x20, 0x29, 0xc5, 0xfc,
0x28, 0x4f, 0x0b, 0x75, 0x5e, 0xa3, 0x0b, 0x3d, 0xc5, 0xcf, 0x4f, 0x71, 0xef, 0x67, 0x16, 0x00,
0x1e, 0x1e, 0xdc, 0xfa, 0xb9, 0xc4, 0xb4, 0x2e, 0x92, 0x98, 0xaa, 0x3f, 0x67, 0x65, 0x1a, 0x70,
0x96, 0x50, 0xc9, 0xa2, 0xc0, 0xec, 0x26, 0xcc, 0xee, 0x24, 0x2b, 0x53, 0x5f, 0xab, 0x0c, 0x4c,
0xe1, 0xfd, 0xca, 0x02, 0xc0, 0x7a, 0xa5, 0xdd, 0xd8, 0x6c, 0x28, 0xd6, 0xf9, 0x17, 0xeb, 0xd6,
0xfa, 0xf1, 0xbb, 0x5f, 0x1d, 0x3f, 0x81, 0xf1, 0x68, 0x37, 0x61, 0xa8, 0xe3, 0xb1, 0x04, 0x6f,
0x4e, 0xa8, 0x8e, 0xc1, 0x6f, 0x2d, 0x70, 0x56, 0x42, 0x25, 0xd6, 0x69, 0xb4, 0x36, 0x2b, 0x05,
0xf6, 0x6e, 0x75, 0x7a, 0x02, 0xb1, 0x72, 0xa0, 0xd2, 0xe5, 0x81, 0xba, 0x0e, 0x3d, 0xa4, 0x64,
0xe5, 0x44, 0x65, 0xe6, 0x44, 0xbd, 0x05, 0x97, 0x39, 0x0b, 0x59, 0x26, 0x93, 0x45, 0x90, 0xe6,
0x51, 0xfc, 0x24, 0x66, 0x11, 0x9e, 0xab, 0x9e, 0x3f, 0xa8, 0x14, 0xc7, 0x46, 0xee, 0xfd, 0xd9,
0x82, 0xfd, 0x1f, 0x96, 0x8c, 0x2f, 0x1e, 0xe5, 0x11, 0xd3, 0x9e, 0x7d, 0xf1, 0x94, 0x78, 0x17,
0xb1, 0x18, 0x7a, 0x74, 0xba, 0xbe, 0xf1, 0xaf, 0xd3, 0x55, 0xf8, 0x3d, 0x61, 0x52, 0x54, 0x51,
0xac, 0x1f, 0x4b, 0xdb, 0x50, 0xbc, 0x0c, 0xac, 0xe9, 0x44, 0x9a, 0xe2, 0x9f, 0x5a, 0x60, 0xaf,
0x1c, 0x4c, 0x55, 0xee, 0x4d, 0x6f, 0xd0, 0x2d, 0xc5, 0xc2, 0x82, 0x6b, 0x87, 0xcb, 0x5f, 0x22,
0xe4, 0x2a, 0xec, 0xa4, 0x62, 0x66, 0x22, 0xee, 0xf8, 0x7a, 0x40, 0x6e, 0x40, 0x2f, 0x15, 0x33,
0xbc, 0x53, 0x9a, 0x2a, 0x5d, 0x8f, 0x55, 0xd8, 0xea, 0x8e, 0x62, 0x8a, 0xd5, 0x52, 0xe0, 0xfd,
0xdc, 0x02, 0x62, 0xfa, 0xf0, 0x0b, 0xfd, 0xef, 0xc2, 0x84, 0x5d, 0xfd, 0xad, 0xd3, 0xd2, 0x97,
0x91, 0x55, 0xd9, 0x46, 0xbb, 0x6b, 0x6f, 0xb6, 0xbb, 0x37, 0xbf, 0x07, 0xfd, 0xfa, 0xcf, 0x31,
0x19, 0x80, 0x33, 0xce, 0x62, 0x89, 0xf7, 0xd8, 0x38, 0x9b, 0x0d, 0xbe, 0x42, 0x6c, 0xe8, 0xfe,
0x80, 0xd1, 0x44, 0xce, 0x17, 0x03, 0x8b, 0x38, 0xd0, 0x7b, 0x6f, 0x9a, 0xe5, 0x3c, 0xa5, 0xc9,
0xa0, 0x75, 0xff, 0x9d, 0x1f, 0x7f, 0x67, 0x16, 0xcb, 0x79, 0x39, 0x55, 0xce, 0xdd, 0xd1, 0xde,
0x7e, 0x2b, 0xce, 0xcd, 0xd7, 0x9d, 0x2a, 0x10, 0x77, 0x10, 0x40, 0x3d, 0x2c, 0xa6, 0xd3, 0x5d,
0x94, 0xbc, 0xfd, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x45, 0x0a, 0xb0, 0xba, 0x5f, 0x17, 0x00,
0x00,
// 1755 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x58, 0x5b, 0x8f, 0x23, 0x47,
0x15, 0xa6, 0x6d, 0xcf, 0xd8, 0x3e, 0xee, 0x99, 0x9d, 0xad, 0xbd, 0xa4, 0xf7, 0x92, 0xc4, 0xe9,
0x70, 0x19, 0x12, 0xb1, 0xb3, 0x9a, 0x00, 0x89, 0x10, 0xd2, 0x26, 0x3b, 0x4e, 0x16, 0x6b, 0x33,
0xab, 0xa1, 0xbc, 0x89, 0x04, 0x2f, 0xad, 0xb2, 0xbb, 0xd6, 0xee, 0xa4, 0x6f, 0x54, 0x95, 0x77,
0xd7, 0x79, 0xe2, 0x81, 0x27, 0x10, 0x08, 0x21, 0xf1, 0xc4, 0x7f, 0xe0, 0x27, 0x00, 0xca, 0x13,
0x12, 0xbf, 0x80, 0xbf, 0xc1, 0x23, 0x2f, 0xa0, 0x3a, 0x55, 0xdd, 0x6e, 0x7b, 0x7a, 0x06, 0x67,
0x56, 0x08, 0x22, 0x78, 0xeb, 0x3a, 0xe7, 0x54, 0xd5, 0x39, 0xdf, 0xb9, 0x76, 0xc1, 0x6e, 0x94,
0x2a, 0x2e, 0x52, 0x16, 0xdf, 0xc9, 0x45, 0xa6, 0x32, 0x72, 0x2d, 0x89, 0xe2, 0xa7, 0x73, 0x69,
0x56, 0x77, 0x0a, 0xe6, 0x4d, 0x77, 0x92, 0x25, 0x49, 0x96, 0x1a, 0xf2, 0x4d, 0x57, 0x4e, 0x66,
0x3c, 0x61, 0x66, 0xe5, 0xff, 0xc1, 0x81, 0x9d, 0xa3, 0x2c, 0xc9, 0xb3, 0x94, 0xa7, 0x6a, 0x98,
0x3e, 0xc9, 0xc8, 0x75, 0xd8, 0x4e, 0xb3, 0x90, 0x0f, 0x07, 0x9e, 0xd3, 0x77, 0xf6, 0x9b, 0xd4,
0xae, 0x08, 0x81, 0x96, 0xc8, 0x62, 0xee, 0x35, 0xfa, 0xce, 0x7e, 0x97, 0xe2, 0x37, 0xb9, 0x07,
0x20, 0x15, 0x53, 0x3c, 0x98, 0x64, 0x21, 0xf7, 0x9a, 0x7d, 0x67, 0x7f, 0xf7, 0xb0, 0x7f, 0xa7,
0x56, 0x8b, 0x3b, 0x23, 0x2d, 0x78, 0x94, 0x85, 0x9c, 0x76, 0x65, 0xf1, 0x49, 0xde, 0x05, 0xe0,
0xcf, 0x95, 0x60, 0x41, 0x94, 0x3e, 0xc9, 0xbc, 0x56, 0xbf, 0xb9, 0xdf, 0x3b, 0x7c, 0x6d, 0xf5,
0x00, 0xab, 0xfc, 0x43, 0xbe, 0xf8, 0x98, 0xc5, 0x73, 0x7e, 0xc2, 0x22, 0x41, 0xbb, 0xb8, 0x49,
0xab, 0xeb, 0xff, 0xd5, 0x81, 0x4b, 0xa5, 0x01, 0x78, 0x87, 0x24, 0xdf, 0x83, 0x2d, 0xbc, 0x02,
0x2d, 0xe8, 0x1d, 0x7e, 0xf5, 0x0c, 0x8d, 0x56, 0xec, 0xa6, 0x66, 0x0b, 0xf9, 0x08, 0xae, 0xc8,
0xf9, 0x78, 0x52, 0xb0, 0x02, 0xa4, 0x4a, 0xaf, 0x81, 0xaa, 0x6d, 0x76, 0x12, 0xa9, 0x1e, 0x60,
0x55, 0x7a, 0x0b, 0xb6, 0xf5, 0x49, 0x73, 0x89, 0x28, 0xf5, 0x0e, 0x6f, 0xd5, 0x1a, 0x39, 0x42,
0x11, 0x6a, 0x45, 0xfd, 0x5b, 0x70, 0xe3, 0x01, 0x57, 0x6b, 0xd6, 0x51, 0xfe, 0x93, 0x39, 0x97,
0xca, 0x32, 0x1f, 0x47, 0x09, 0x7f, 0x1c, 0x4d, 0x3e, 0x3d, 0x9a, 0xb1, 0x34, 0xe5, 0x71, 0xc1,
0x7c, 0x19, 0x6e, 0x3d, 0xe0, 0xb8, 0x21, 0x92, 0x2a, 0x9a, 0xc8, 0x35, 0xf6, 0x35, 0xb8, 0xf2,
0x80, 0xab, 0x41, 0xb8, 0x46, 0xfe, 0x18, 0x3a, 0x8f, 0xb4, 0xb3, 0x75, 0x18, 0x7c, 0x17, 0xda,
0x2c, 0x0c, 0x05, 0x97, 0xd2, 0xa2, 0x78, 0xbb, 0x56, 0xe3, 0xf7, 0x8c, 0x0c, 0x2d, 0x84, 0xeb,
0xc2, 0xc4, 0xff, 0x04, 0x60, 0x98, 0x46, 0xea, 0x84, 0x09, 0x96, 0xc8, 0x33, 0x03, 0x6c, 0x00,
0xae, 0x54, 0x4c, 0xa8, 0x20, 0x47, 0x39, 0x0b, 0xf9, 0x06, 0xd1, 0xd0, 0xc3, 0x6d, 0xe6, 0x74,
0xff, 0x47, 0x00, 0x23, 0x25, 0xa2, 0x74, 0xfa, 0x61, 0x24, 0x95, 0xbe, 0xeb, 0xa9, 0x96, 0xd3,
0x46, 0x34, 0xf7, 0xbb, 0xd4, 0xae, 0x2a, 0xee, 0x68, 0x6c, 0xee, 0x8e, 0x7b, 0xd0, 0x2b, 0xe0,
0x3e, 0x96, 0x53, 0x72, 0x17, 0x5a, 0x63, 0x26, 0xf9, 0xb9, 0xf0, 0x1c, 0xcb, 0xe9, 0x7d, 0x26,
0x39, 0x45, 0x49, 0xff, 0xf3, 0x06, 0xbc, 0x74, 0x24, 0x38, 0x06, 0x7f, 0x1c, 0xf3, 0x89, 0x8a,
0xb2, 0xd4, 0x62, 0xff, 0xc5, 0x4f, 0x23, 0x2f, 0x41, 0x3b, 0x1c, 0x07, 0x29, 0x4b, 0x0a, 0xb0,
0xb7, 0xc3, 0xf1, 0x23, 0x96, 0x70, 0xf2, 0x75, 0xd8, 0x9d, 0x94, 0xe7, 0x6b, 0x0a, 0xc6, 0x5c,
0x97, 0xae, 0x51, 0xb5, 0xab, 0xc2, 0xf1, 0x70, 0xe0, 0xb5, 0xd0, 0x0d, 0xf8, 0x4d, 0x7c, 0x70,
0x97, 0x52, 0xc3, 0x81, 0xb7, 0x85, 0xbc, 0x15, 0x9a, 0x06, 0xd5, 0xd4, 0x10, 0x6f, 0xbb, 0xef,
0xec, 0xbb, 0xd4, 0xae, 0xc8, 0x5d, 0xb8, 0xf2, 0x34, 0x12, 0x6a, 0xce, 0x62, 0x1b, 0x57, 0xfa,
0x16, 0xe9, 0xb5, 0x11, 0xf9, 0x3a, 0x16, 0x39, 0x84, 0xab, 0xf9, 0x6c, 0x21, 0xa3, 0xc9, 0xda,
0x96, 0x0e, 0x6e, 0xa9, 0xe5, 0xf9, 0x9f, 0x3b, 0x70, 0x6d, 0x20, 0xb2, 0xfc, 0xcb, 0x0c, 0xa1,
0xff, 0xcb, 0x06, 0x5c, 0x37, 0x91, 0x70, 0xc2, 0x84, 0x8a, 0xfe, 0x4d, 0x56, 0x7c, 0x03, 0x2e,
0x2d, 0x6f, 0x35, 0x02, 0xf5, 0x66, 0x7c, 0x0d, 0x76, 0xf3, 0x42, 0x0f, 0x23, 0xd7, 0x42, 0xb9,
0x9d, 0x92, 0xba, 0x62, 0xed, 0xd6, 0x39, 0xd6, 0x6e, 0xd7, 0x04, 0x4c, 0x1f, 0x7a, 0xe5, 0x41,
0xc3, 0x81, 0xd7, 0x46, 0x91, 0x2a, 0xc9, 0xff, 0x45, 0x03, 0xae, 0x6a, 0xa7, 0xfe, 0x1f, 0x0d,
0x8d, 0xc6, 0x1f, 0x1b, 0x40, 0x4c, 0x74, 0x0c, 0xd3, 0x90, 0x3f, 0xff, 0x4f, 0x62, 0xf1, 0x32,
0xc0, 0x93, 0x88, 0xc7, 0x61, 0x15, 0x87, 0x2e, 0x52, 0x5e, 0x08, 0x03, 0x0f, 0xda, 0x78, 0x48,
0x69, 0x7f, 0xb1, 0xd4, 0x5d, 0xc0, 0x4c, 0x04, 0xb6, 0x0b, 0x74, 0x36, 0xee, 0x02, 0xb8, 0xcd,
0x76, 0x81, 0xdf, 0x37, 0x61, 0x67, 0x98, 0x4a, 0x2e, 0xd4, 0xff, 0x72, 0x20, 0x91, 0xdb, 0xd0,
0x95, 0x7c, 0x9a, 0xe8, 0xc1, 0x64, 0xe0, 0x75, 0x90, 0xbf, 0x24, 0x68, 0xee, 0xc4, 0x54, 0xd6,
0xe1, 0xc0, 0xeb, 0x1a, 0xd7, 0x96, 0x04, 0xf2, 0x0a, 0x80, 0x8a, 0x12, 0x2e, 0x15, 0x4b, 0x72,
0xe9, 0x41, 0xbf, 0xb9, 0xdf, 0xa2, 0x15, 0x8a, 0xee, 0x02, 0x22, 0x7b, 0x36, 0x1c, 0x48, 0xaf,
0xd7, 0x6f, 0xea, 0x36, 0x6e, 0x56, 0xe4, 0xdb, 0xd0, 0x11, 0xd9, 0xb3, 0x20, 0x64, 0x8a, 0x79,
0x2e, 0x3a, 0xef, 0x46, 0x2d, 0xd8, 0xf7, 0xe3, 0x6c, 0x4c, 0xdb, 0x22, 0x7b, 0x36, 0x60, 0x8a,
0xf9, 0x7f, 0x6b, 0xc0, 0xce, 0x88, 0x33, 0x31, 0x99, 0x5d, 0xdc, 0x61, 0xdf, 0x84, 0x3d, 0xc1,
0xe5, 0x3c, 0x56, 0xc1, 0xd2, 0x2c, 0xe3, 0xb9, 0x4b, 0x86, 0x7e, 0x54, 0x1a, 0x57, 0x40, 0xde,
0x3c, 0x07, 0xf2, 0x56, 0x0d, 0xe4, 0x3e, 0xb8, 0x15, 0x7c, 0xa5, 0xb7, 0x85, 0xa6, 0xaf, 0xd0,
0xc8, 0x1e, 0x34, 0x43, 0x19, 0xa3, 0xc7, 0xba, 0x54, 0x7f, 0x92, 0x37, 0xe1, 0x72, 0x1e, 0xb3,
0x09, 0x9f, 0x65, 0x71, 0xc8, 0x45, 0x30, 0x15, 0xd9, 0x3c, 0x47, 0x77, 0xb9, 0x74, 0xaf, 0xc2,
0x78, 0xa0, 0xe9, 0xe4, 0x6d, 0xe8, 0x84, 0x32, 0x0e, 0xd4, 0x22, 0xe7, 0xe8, 0xb2, 0xdd, 0x33,
0x6c, 0x1f, 0xc8, 0xf8, 0xf1, 0x22, 0xe7, 0xb4, 0x1d, 0x9a, 0x0f, 0x72, 0x17, 0xae, 0x4a, 0x2e,
0x22, 0x16, 0x47, 0x9f, 0xf1, 0x30, 0xe0, 0xcf, 0x73, 0x11, 0xe4, 0x31, 0x4b, 0xd1, 0xb3, 0x2e,
0x25, 0x4b, 0xde, 0xfb, 0xcf, 0x73, 0x71, 0x12, 0xb3, 0xd4, 0xff, 0x47, 0x05, 0x74, 0x8d, 0x8f,
0xbc, 0x00, 0xe8, 0x17, 0x99, 0xa4, 0x6a, 0x3d, 0xd5, 0xac, 0xf7, 0xd4, 0xab, 0xd0, 0x4b, 0xb8,
0x12, 0xd1, 0xc4, 0x20, 0x62, 0x12, 0x08, 0x0c, 0x09, 0xcd, 0x26, 0xd0, 0x9a, 0x45, 0xca, 0xb8,
0xc2, 0xa5, 0xf8, 0x4d, 0xbe, 0x0f, 0x37, 0x25, 0x67, 0x31, 0x0f, 0x83, 0x32, 0xda, 0x65, 0x20,
0xd1, 0x52, 0x1e, 0x7a, 0xdb, 0xe8, 0x34, 0xcf, 0x48, 0x8c, 0x4a, 0x81, 0x91, 0xe5, 0x93, 0x03,
0xb8, 0x52, 0xaa, 0x55, 0xd9, 0x66, 0xe6, 0x18, 0xb2, 0x64, 0x95, 0x1b, 0xde, 0x01, 0x6f, 0x1a,
0x67, 0x63, 0x16, 0x07, 0xa7, 0x6e, 0xc5, 0xfa, 0xd5, 0xa4, 0xd7, 0x0d, 0x7f, 0xb4, 0x76, 0xa5,
0xff, 0xbb, 0x06, 0x5c, 0xa2, 0xda, 0x16, 0xfe, 0x94, 0x7f, 0xe9, 0x03, 0xff, 0x0d, 0x68, 0x46,
0xa1, 0xc4, 0xc0, 0xef, 0x1d, 0x7a, 0xab, 0x7a, 0xdb, 0x9f, 0xce, 0xe1, 0x40, 0x52, 0x2d, 0x44,
0x5e, 0x87, 0x9d, 0x6c, 0xae, 0xf2, 0xb9, 0x0a, 0xb0, 0xf0, 0x17, 0x53, 0xa2, 0x6b, 0x88, 0x1f,
0x20, 0xcd, 0xff, 0xf5, 0x0a, 0x3a, 0xff, 0xad, 0x11, 0x6a, 0xcd, 0x6e, 0x6d, 0x62, 0xf6, 0x3d,
0xe8, 0x19, 0x7b, 0x4d, 0x7d, 0xdc, 0xc2, 0xfa, 0xf8, 0x4a, 0xed, 0x1e, 0xc4, 0x40, 0xd7, 0x46,
0x6a, 0x3a, 0xb0, 0xc4, 0x3a, 0xf9, 0x17, 0x07, 0x76, 0x06, 0x3c, 0xe6, 0xea, 0x05, 0xc2, 0xa5,
0xa6, 0x7f, 0x35, 0x6a, 0xfb, 0xd7, 0x4a, 0x83, 0x68, 0x9e, 0xdf, 0x20, 0x5a, 0xa7, 0x1a, 0xc4,
0x6b, 0xe0, 0xe6, 0x22, 0x4a, 0x98, 0x58, 0x04, 0x9f, 0xf2, 0x45, 0x11, 0x32, 0x3d, 0x4b, 0x7b,
0xc8, 0x17, 0xd2, 0xff, 0xbb, 0x03, 0xdd, 0x0f, 0x33, 0x16, 0xe2, 0x98, 0x73, 0x01, 0x4b, 0x56,
0xfa, 0x5b, 0xa3, 0xa6, 0xbf, 0x95, 0x93, 0x4a, 0xa1, 0xfe, 0x72, 0x74, 0xa9, 0x8c, 0x20, 0xad,
0xd5, 0x11, 0xe4, 0x55, 0xe8, 0x45, 0x5a, 0xa1, 0x20, 0x67, 0x6a, 0x66, 0xf4, 0xee, 0x52, 0x40,
0xd2, 0x89, 0xa6, 0xe8, 0x19, 0xa5, 0x10, 0xc0, 0x19, 0x65, 0x7b, 0xe3, 0x19, 0xc5, 0x1e, 0x82,
0x33, 0xca, 0x9f, 0x1a, 0xe0, 0xd9, 0x52, 0xb0, 0xfc, 0x51, 0xff, 0x28, 0x0f, 0xf1, 0xbd, 0xe0,
0x36, 0x74, 0xcb, 0x32, 0x61, 0xff, 0x93, 0x97, 0x04, 0x0d, 0xfd, 0x31, 0x4f, 0x32, 0xb1, 0x18,
0x45, 0x9f, 0x71, 0x6b, 0x78, 0x85, 0xa2, 0x6d, 0x7b, 0x34, 0x4f, 0x68, 0xf6, 0x4c, 0xda, 0x44,
0x2f, 0x96, 0xda, 0xb6, 0x09, 0x4e, 0x96, 0x81, 0xf6, 0x14, 0x5a, 0xde, 0xa2, 0x60, 0x48, 0xfa,
0xe7, 0x96, 0xdc, 0x80, 0x0e, 0x4f, 0x43, 0xc3, 0xdd, 0x42, 0x6e, 0x9b, 0xa7, 0x21, 0xb2, 0x86,
0xb0, 0x6b, 0x7f, 0xd0, 0x33, 0x89, 0x49, 0x6f, 0x53, 0xdd, 0x3f, 0xe3, 0x55, 0xe4, 0x58, 0x4e,
0x4f, 0xac, 0x24, 0xdd, 0x31, 0xff, 0xe8, 0x76, 0x49, 0xde, 0x07, 0x57, 0xdf, 0x52, 0x1e, 0xd4,
0xde, 0xf8, 0xa0, 0x1e, 0x4f, 0xc3, 0x62, 0xe1, 0xff, 0xc6, 0x81, 0xcb, 0xa7, 0x20, 0xbc, 0x40,
0x1c, 0x3d, 0x84, 0xce, 0x88, 0x4f, 0xf5, 0x11, 0xc5, 0xb3, 0xc3, 0xc1, 0x59, 0xaf, 0x58, 0x67,
0x38, 0x8c, 0x96, 0x07, 0xf8, 0x9f, 0x94, 0x6e, 0xfd, 0x20, 0x9e, 0xcb, 0xd9, 0x51, 0x96, 0xe4,
0x3a, 0x5f, 0xc3, 0x0b, 0xbd, 0x19, 0x9c, 0x1f, 0xe2, 0xfe, 0xcf, 0x1c, 0x00, 0x4c, 0x1e, 0xbc,
0xfa, 0x54, 0x60, 0x3a, 0x17, 0x09, 0x4c, 0x3d, 0x48, 0xa4, 0xf3, 0x24, 0x10, 0x3c, 0x66, 0x6a,
0xd9, 0xcc, 0xa4, 0xbd, 0x9d, 0xa4, 0xf3, 0x84, 0x1a, 0x96, 0x35, 0x53, 0xfa, 0xbf, 0x72, 0x00,
0xb0, 0x5e, 0x19, 0x35, 0xd6, 0x1b, 0x8a, 0x73, 0xfe, 0x1f, 0x40, 0x63, 0x35, 0xfd, 0xee, 0x17,
0xe9, 0x27, 0xd1, 0x1f, 0xcd, 0x3a, 0x1b, 0x4a, 0x7f, 0x2c, 0x8d, 0xb7, 0x19, 0x6a, 0x7c, 0xf0,
0x5b, 0x07, 0xdc, 0x8a, 0xab, 0xe4, 0x2a, 0x8c, 0xce, 0x7a, 0xa5, 0xc0, 0x21, 0x43, 0x67, 0x4f,
0x20, 0x2b, 0x09, 0x95, 0x2c, 0x13, 0xea, 0x06, 0x74, 0x10, 0x92, 0x4a, 0x46, 0xa5, 0x36, 0xa3,
0xde, 0x84, 0xcb, 0x82, 0x4f, 0x78, 0xaa, 0xe2, 0x45, 0x90, 0x64, 0x61, 0xf4, 0x24, 0xe2, 0x21,
0xe6, 0x55, 0x87, 0xee, 0x15, 0x8c, 0x63, 0x4b, 0xf7, 0xff, 0xec, 0xc0, 0xee, 0x0f, 0xe7, 0x5c,
0x2c, 0x1e, 0x65, 0x21, 0x37, 0x9a, 0x7d, 0xf1, 0x90, 0x78, 0x17, 0x6d, 0xb1, 0xf0, 0x98, 0x70,
0x7d, 0xfd, 0x5f, 0x87, 0xab, 0xa4, 0x1d, 0x69, 0x43, 0x54, 0x43, 0x6c, 0xfe, 0xea, 0x36, 0x81,
0x78, 0xe9, 0x58, 0xdb, 0x89, 0x0c, 0xc4, 0x3f, 0x75, 0xa0, 0x57, 0x49, 0x4c, 0x5d, 0xee, 0x6d,
0x6f, 0x30, 0x2d, 0xc5, 0xc1, 0x82, 0xdb, 0x9b, 0x2c, 0xdf, 0x6e, 0xc8, 0x55, 0xd8, 0x4a, 0xe4,
0xd4, 0x7a, 0xdc, 0xa5, 0x66, 0x41, 0x6e, 0x42, 0x27, 0x91, 0x53, 0x1c, 0x7e, 0x6d, 0x95, 0x2e,
0xd7, 0xda, 0x6d, 0x65, 0x47, 0xb1, 0xc5, 0x6a, 0x49, 0xf0, 0x7f, 0xee, 0x00, 0xb1, 0x7d, 0xf8,
0x85, 0x1e, 0xe6, 0x30, 0x60, 0xab, 0xef, 0x4f, 0x0d, 0x33, 0x8c, 0x54, 0x69, 0x6b, 0xed, 0xae,
0xb9, 0xde, 0xee, 0xde, 0x78, 0x07, 0xba, 0xe5, 0x13, 0x37, 0xd9, 0x03, 0x77, 0x98, 0x46, 0x0a,
0x07, 0xee, 0x28, 0x9d, 0xee, 0x7d, 0x85, 0xf4, 0xa0, 0xfd, 0x03, 0xce, 0x62, 0x35, 0x5b, 0xec,
0x39, 0xc4, 0x85, 0xce, 0x7b, 0xe3, 0x34, 0x13, 0x09, 0x8b, 0xf7, 0x1a, 0xf7, 0xdf, 0xfe, 0xf1,
0x77, 0xa6, 0x91, 0x9a, 0xcd, 0xc7, 0x5a, 0xb9, 0x03, 0xa3, 0xed, 0xb7, 0xa2, 0xcc, 0x7e, 0x1d,
0x14, 0x8e, 0x38, 0x40, 0x03, 0xca, 0x65, 0x3e, 0x1e, 0x6f, 0x23, 0xe5, 0xad, 0x7f, 0x06, 0x00,
0x00, 0xff, 0xff, 0xa3, 0xc1, 0xe7, 0x20, 0x08, 0x18, 0x00, 0x00,
}

View File

@ -90,6 +90,10 @@ type ReplicaInterface interface {
getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
replaceGrowingSegmentBySealedSegment(segment *Segment) error
//channels
addWatchedDmChannels(channels []string)
getWatchedDmChannels() []string
getTSafe(collectionID UniqueID) tSafer
addTSafe(collectionID UniqueID)
removeTSafe(collectionID UniqueID)
@ -105,6 +109,7 @@ type collectionReplica struct {
segments map[UniqueID]*Segment
excludedSegments map[UniqueID][]UniqueID // map[collectionID]segmentIDs
watchedChannels []string
}
//----------------------------------------------------------------------------------------------------- collection
@ -707,11 +712,20 @@ func (colReplica *collectionReplica) getSegmentsToLoadBySegmentType(segType segm
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}
// addWatchedDmChannels records the given DM channel names by appending each
// one to the replica's watched-channel list.
func (colReplica *collectionReplica) addWatchedDmChannels(channels []string) {
	for _, channel := range channels {
		colReplica.watchedChannels = append(colReplica.watchedChannels, channel)
	}
}
// getWatchedDmChannels returns the replica's watched DM channel names.
// NOTE(review): the internal slice is returned without copying, so callers
// share its backing array with the replica — confirm this aliasing is intended.
func (colReplica *collectionReplica) getWatchedDmChannels() []string {
return colReplica.watchedChannels
}
func newCollectionReplica() ReplicaInterface {
collections := make(map[UniqueID]*Collection)
partitions := make(map[UniqueID]*Partition)
segments := make(map[UniqueID]*Segment)
excludedSegments := make(map[UniqueID][]UniqueID)
watchedChannels := make([]string, 0)
var replica ReplicaInterface = &collectionReplica{
collections: collections,
@ -719,6 +733,7 @@ func newCollectionReplica() ReplicaInterface {
segments: segments,
excludedSegments: excludedSegments,
watchedChannels: watchedChannels,
tSafes: make(map[UniqueID]tSafer),
}

View File

@ -21,7 +21,7 @@ import (
func TestCollectionReplica_getCollectionNum(t *testing.T) {
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
assert.Equal(t, node.replica.getCollectionNum(), 1)
assert.Equal(t, node.historical.replica.getCollectionNum(), 1)
err := node.Stop()
assert.NoError(t, err)
}
@ -36,11 +36,11 @@ func TestCollectionReplica_addCollection(t *testing.T) {
func TestCollectionReplica_removeCollection(t *testing.T) {
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
assert.Equal(t, node.replica.getCollectionNum(), 1)
assert.Equal(t, node.historical.replica.getCollectionNum(), 1)
err := node.replica.removeCollection(0)
err := node.historical.replica.removeCollection(0)
assert.NoError(t, err)
assert.Equal(t, node.replica.getCollectionNum(), 0)
assert.Equal(t, node.historical.replica.getCollectionNum(), 0)
err = node.Stop()
assert.NoError(t, err)
}
@ -49,7 +49,7 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
targetCollection, err := node.replica.getCollectionByID(collectionID)
targetCollection, err := node.historical.replica.getCollectionByID(collectionID)
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.ID(), collectionID)
@ -62,9 +62,9 @@ func TestCollectionReplica_hasCollection(t *testing.T) {
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
hasCollection := node.replica.hasCollection(collectionID)
hasCollection := node.historical.replica.hasCollection(collectionID)
assert.Equal(t, hasCollection, true)
hasCollection = node.replica.hasCollection(UniqueID(1))
hasCollection = node.historical.replica.hasCollection(UniqueID(1))
assert.Equal(t, hasCollection, false)
err := node.Stop()
@ -79,14 +79,14 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
partitionIDs := []UniqueID{1, 2, 3}
for _, id := range partitionIDs {
err := node.replica.addPartition(collectionID, id)
err := node.historical.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByID(id)
partition, err := node.historical.replica.getPartitionByID(id)
assert.NoError(t, err)
assert.Equal(t, partition.ID(), id)
}
partitionNum := node.replica.getPartitionNum()
partitionNum := node.historical.replica.getPartitionNum()
assert.Equal(t, partitionNum, len(partitionIDs)+1)
err := node.Stop()
assert.NoError(t, err)
@ -99,9 +99,9 @@ func TestCollectionReplica_addPartition(t *testing.T) {
partitionIDs := []UniqueID{1, 2, 3}
for _, id := range partitionIDs {
err := node.replica.addPartition(collectionID, id)
err := node.historical.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByID(id)
partition, err := node.historical.replica.getPartitionByID(id)
assert.NoError(t, err)
assert.Equal(t, partition.ID(), id)
}
@ -117,12 +117,12 @@ func TestCollectionReplica_removePartition(t *testing.T) {
partitionIDs := []UniqueID{1, 2, 3}
for _, id := range partitionIDs {
err := node.replica.addPartition(collectionID, id)
err := node.historical.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByID(id)
partition, err := node.historical.replica.getPartitionByID(id)
assert.NoError(t, err)
assert.Equal(t, partition.ID(), id)
err = node.replica.removePartition(id)
err = node.historical.replica.removePartition(id)
assert.NoError(t, err)
}
err := node.Stop()
@ -137,9 +137,9 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
for _, id := range collectionMeta.PartitionIDs {
err := node.replica.addPartition(collectionID, id)
err := node.historical.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByID(id)
partition, err := node.historical.replica.getPartitionByID(id)
assert.NoError(t, err)
assert.Equal(t, partition.ID(), id)
assert.NotNil(t, partition)
@ -154,11 +154,11 @@ func TestCollectionReplica_hasPartition(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
collectionMeta := genTestCollectionMeta(collectionID, false)
err := node.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0])
err := node.historical.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0])
assert.NoError(t, err)
hasPartition := node.replica.hasPartition(defaultPartitionID)
hasPartition := node.historical.replica.hasPartition(defaultPartitionID)
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(defaultPartitionID + 1)
hasPartition = node.historical.replica.hasPartition(defaultPartitionID + 1)
assert.Equal(t, hasPartition, false)
err = node.Stop()
assert.NoError(t, err)
@ -172,9 +172,9 @@ func TestCollectionReplica_addSegment(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
}
@ -191,12 +191,12 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
err = node.replica.removeSegment(UniqueID(i))
err = node.historical.replica.removeSegment(UniqueID(i))
assert.NoError(t, err)
}
@ -212,9 +212,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
}
@ -231,14 +231,14 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
hasSeg := node.replica.hasSegment(UniqueID(i))
hasSeg := node.historical.replica.hasSegment(UniqueID(i))
assert.Equal(t, hasSeg, true)
hasSeg = node.replica.hasSegment(UniqueID(i + 100))
hasSeg = node.historical.replica.hasSegment(UniqueID(i + 100))
assert.Equal(t, hasSeg, false)
}
@ -261,28 +261,28 @@ func TestReplaceGrowingSegmentBySealedSegment(t *testing.T) {
segmentID := UniqueID(520)
initTestMeta(t, node, collectionID, segmentID)
_, _, segIDs := node.replica.getSegmentsBySegmentType(segmentTypeGrowing)
_, _, segIDs := node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing)
assert.Equal(t, len(segIDs), 1)
collection, err := node.replica.getCollectionByID(collectionID)
collection, err := node.historical.replica.getCollectionByID(collectionID)
assert.NoError(t, err)
ns := newSegment(collection, segmentID, defaultPartitionID, collectionID, segmentTypeSealed)
err = node.replica.replaceGrowingSegmentBySealedSegment(ns)
err = node.historical.replica.replaceGrowingSegmentBySealedSegment(ns)
assert.NoError(t, err)
err = node.replica.setSegmentEnableIndex(segmentID, true)
err = node.historical.replica.setSegmentEnableIndex(segmentID, true)
assert.NoError(t, err)
segmentNums := node.replica.getSegmentNum()
segmentNums := node.historical.replica.getSegmentNum()
assert.Equal(t, segmentNums, 1)
segment, err := node.replica.getSegmentByID(segmentID)
segment, err := node.historical.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
assert.Equal(t, segment.getType(), segmentTypeSealed)
_, _, segIDs = node.replica.getSegmentsBySegmentType(segmentTypeGrowing)
_, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing)
assert.Equal(t, len(segIDs), 0)
_, _, segIDs = node.replica.getSegmentsBySegmentType(segmentTypeSealed)
_, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeSealed)
assert.Equal(t, len(segIDs), 1)
err = node.Stop()

View File

@ -116,8 +116,8 @@ func TestDataSyncService_Start(t *testing.T) {
assert.Nil(t, err)
// dataSync
node.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID)
go node.dataSyncServices[collectionID].start()
node.streaming.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.streaming.replica, msFactory, collectionID)
go node.streaming.dataSyncServices[collectionID].start()
<-node.queryNodeLoopCtx.Done()
node.Stop()

View File

@ -0,0 +1,54 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
)
// historical groups the query node components that serve sealed (historical)
// segments: the in-memory replica plus the services that load segment data
// and publish segment statistics.
type historical struct {
	replica      ReplicaInterface // collection/partition/segment state for sealed segments
	loadService  *loadService     // loads segment field data and indexes into the replica
	statsService *statsService    // forwards field stats from the index loader's fieldStatsChan
}
// newHistorical wires up the historical module: a fresh collection replica,
// a load service bound to that replica, and a stats service consuming the
// load service's field-statistics channel.
func newHistorical(ctx context.Context,
	masterService types.MasterService,
	dataService types.DataService,
	indexService types.IndexService,
	factory msgstream.Factory) *historical {
	h := &historical{replica: newCollectionReplica()}
	h.loadService = newLoadService(ctx, masterService, dataService, indexService, h.replica)
	h.statsService = newStatsService(ctx, h.replica, h.loadService.segLoader.indexLoader.fieldStatsChan, factory)
	return h
}
// start launches the load service and the stats service for this module.
func (h *historical) start() {
	h.loadService.start()
	h.statsService.start()
}
// close shuts down the load and stats services, then releases every
// collection, partition and segment held by the replica.
func (h *historical) close() {
	h.loadService.close()
	h.statsService.close()

	// free collectionReplica
	h.replica.freeAll()
}

View File

@ -298,7 +298,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
ErrorCode: commonpb.ErrorCode_Success,
}
for _, id := range in.SegmentIDs {
err2 := node.loadService.segLoader.replica.removeSegment(id)
err2 := node.historical.loadService.segLoader.replica.removeSegment(id)
if err2 != nil {
// not return, try to release all segments
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@ -310,11 +310,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
infos := make([]*queryPb.SegmentInfo, 0)
for _, id := range in.SegmentIDs {
segment, err := node.replica.getSegmentByID(id)
if err != nil {
continue
}
getSegmentInfo := func(segment *Segment) *queryPb.SegmentInfo {
var indexName string
var indexID int64
// TODO:: segment has multi vec column
@ -334,6 +330,24 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
IndexName: indexName,
IndexID: indexID,
}
return info
}
// get info from historical
for _, id := range in.SegmentIDs {
segment, err := node.historical.replica.getSegmentByID(id)
if err != nil {
continue
}
info := getSegmentInfo(segment)
infos = append(infos, info)
}
// get info from streaming
for _, id := range in.SegmentIDs {
segment, err := node.streaming.replica.getSegmentByID(id)
if err != nil {
continue
}
info := getSegmentInfo(segment)
infos = append(infos, info)
}
return &queryPb.GetSegmentInfoResponse{

View File

@ -1038,26 +1038,26 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
defer node.Stop()
ctx := node.queryNodeLoopCtx
node.loadService = newLoadService(ctx, nil, nil, nil, node.replica)
node.historical.loadService = newLoadService(ctx, nil, nil, nil, node.historical.replica)
initTestMeta(t, node, collectionID, 0)
err := node.replica.addPartition(collectionID, partitionID)
err := node.historical.replica.addPartition(collectionID, partitionID)
assert.NoError(t, err)
err = node.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeSealed)
err = node.historical.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeSealed)
assert.NoError(t, err)
paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
assert.NoError(t, err)
fieldsMap, _ := node.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
fieldsMap, _ := node.historical.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
assert.Equal(t, len(fieldsMap), 4)
segment, err := node.replica.getSegmentByID(segmentID)
segment, err := node.historical.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
err = node.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap)
err = node.historical.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap)
assert.NoError(t, err)
indexPaths, err := generateIndex(segmentID)
@ -1070,7 +1070,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
err = segment.setIndexInfo(100, indexInfo)
assert.NoError(t, err)
err = node.loadService.segLoader.indexLoader.loadIndex(segment, 100)
err = node.historical.loadService.segLoader.indexLoader.loadIndex(segment, 100)
assert.NoError(t, err)
// do search
@ -1098,7 +1098,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
assert.NoError(t, err)
searchTimestamp := Timestamp(1020)
collection, err := node.replica.getCollectionByID(collectionID)
collection, err := node.historical.replica.getCollectionByID(collectionID)
assert.NoError(t, err)
plan, err := createPlan(*collection, dslString)
assert.NoError(t, err)

View File

@ -1,252 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"fmt"
"path"
"reflect"
"strings"
"time"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/retry"
)
const (
CollectionPrefix = "/collection/"
SegmentPrefix = "/segment/"
)
// metaService restores query node state from etcd: it reads persisted
// collection and segment metadata stored under Params.MetaRootPath and
// replays it into the local replica.
type metaService struct {
	ctx     context.Context
	kvBase  *etcdkv.EtcdKV   // etcd-backed KV store rooted at Params.MetaRootPath
	replica ReplicaInterface // destination replica for loaded collections/segments
}
// newMetaService connects to etcd (retrying until the endpoint answers) and
// returns a metaService that loads metadata under Params.MetaRootPath into
// replica. It panics if the connection cannot be established after every
// retry attempt is exhausted.
func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService {
	etcdAddr := Params.EtcdAddress
	metaRootPath := Params.MetaRootPath

	var etcdClient *clientv3.Client
	connectEtcd := func() error {
		var err error
		etcdClient, err = clientv3.New(clientv3.Config{
			Endpoints:   []string{etcdAddr},
			DialTimeout: 5 * time.Second,
		})
		return err
	}
	if err := retry.Retry(100000, time.Millisecond*200, connectEtcd); err != nil {
		panic(err)
	}

	return &metaService{
		ctx:     ctx,
		kvBase:  etcdkv.NewEtcdKV(etcdClient, metaRootPath),
		replica: replica,
	}
}
// start replays persisted collection and segment metadata from etcd into the
// replica. Failures are logged but not fatal: the node continues with
// whatever state loaded successfully.
func (mService *metaService) start() {
	// init from meta
	if err := mService.loadCollections(); err != nil {
		// include the cause; the bare message alone is not actionable
		log.Error("metaService loadCollections failed", zap.Error(err))
	}
	if err := mService.loadSegments(); err != nil {
		log.Error("metaService loadSegments failed", zap.Error(err))
	}
}
// GetCollectionObjID strips the collection key prefix
// ("<MetaRootPath>/collection/") from key; keys without that prefix are
// returned unchanged.
func GetCollectionObjID(key string) string {
	collectionKeyPrefix := path.Join(Params.MetaRootPath, CollectionPrefix) + "/"
	return strings.TrimPrefix(key, collectionKeyPrefix)
}
// GetSegmentObjID strips the segment key prefix
// ("<MetaRootPath>/segment/") from key; keys without that prefix are
// returned unchanged.
func GetSegmentObjID(key string) string {
	segmentKeyPrefix := path.Join(Params.MetaRootPath, SegmentPrefix) + "/"
	return strings.TrimPrefix(key, segmentKeyPrefix)
}
// isCollectionObj reports whether key is a collection metadata key, i.e.
// whether it starts with "<MetaRootPath>/collection/".
func isCollectionObj(key string) bool {
	ETCDRootPath := Params.MetaRootPath

	prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
	prefix = strings.TrimSpace(prefix)
	// strings.HasPrefix is the idiomatic and cheaper form of
	// strings.Index(key, prefix) == 0: it stops after len(prefix) bytes.
	return strings.HasPrefix(key, prefix)
}
// isSegmentObj reports whether key is a segment metadata key, i.e. whether
// it starts with "<MetaRootPath>/segment/".
func isSegmentObj(key string) bool {
	ETCDRootPath := Params.MetaRootPath

	prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
	prefix = strings.TrimSpace(prefix)
	// idiomatic prefix test instead of strings.Index(key, prefix) == 0
	return strings.HasPrefix(key, prefix)
}
// printCollectionStruct logs every field of the CollectionInfo at debug
// level via reflection, skipping the GrpcMarshalString field.
func printCollectionStruct(obj *etcdpb.CollectionInfo) {
	value := reflect.Indirect(reflect.ValueOf(obj))
	structType := value.Type()
	for i := 0; i < value.NumField(); i++ {
		field := structType.Field(i)
		if field.Name == "GrpcMarshalString" {
			continue
		}
		log.Debug("Field", zap.String("field name", field.Name), zap.String("field", fmt.Sprintln(value.Field(i).Interface())))
	}
}
// printSegmentStruct logs every field of the SegmentInfo at debug level
// via reflection.
func printSegmentStruct(obj *datapb.SegmentInfo) {
	value := reflect.Indirect(reflect.ValueOf(obj))
	structType := value.Type()
	for i := 0; i < value.NumField(); i++ {
		fieldName := structType.Field(i).Name
		log.Debug("Field", zap.String("field name", fieldName), zap.String("field", fmt.Sprintln(value.Field(i).Interface())))
	}
}
// processCollectionCreate registers the collection described by the
// text-proto value, together with all of its partitions, in the replica.
// A value that fails to unmarshal (already logged by collectionUnmarshal)
// results in a no-op.
func (mService *metaService) processCollectionCreate(id string, value string) {
	col := mService.collectionUnmarshal(value)
	if col == nil {
		return
	}
	if err := mService.replica.addCollection(col.ID, col.Schema); err != nil {
		log.Error(err.Error())
	}
	for _, partitionID := range col.PartitionIDs {
		if err := mService.replica.addPartition(col.ID, partitionID); err != nil {
			log.Error(err.Error())
		}
	}
}
// processSegmentCreate adds the segment described by the text-proto value
// to the replica as a growing segment. A value that fails to unmarshal
// (already logged by segmentUnmarshal) results in a no-op.
func (mService *metaService) processSegmentCreate(id string, value string) {
	//println("Create Segment: ", id)
	seg := mService.segmentUnmarshal(value)
	// TODO: what if seg == nil? We need to notify master and return rpc request failed
	if seg == nil {
		return
	}
	// TODO: get partition id from segment meta
	if err := mService.replica.addSegment(seg.ID, seg.PartitionID, seg.CollectionID, segmentTypeGrowing); err != nil {
		log.Error(err.Error())
	}
}
// processCreate dispatches a created etcd key/value pair to the collection
// or segment handler based on the key's prefix; unrecognized keys are
// logged and ignored.
func (mService *metaService) processCreate(key string, msg string) {
	//println("process create", key)
	switch {
	case isCollectionObj(key):
		mService.processCollectionCreate(GetCollectionObjID(key), msg)
	case isSegmentObj(key):
		mService.processSegmentCreate(GetSegmentObjID(key), msg)
	default:
		// use the structured logger instead of bare println so the message
		// reaches the node's normal log stream like the rest of this file
		log.Warn("metaService can not process create msg", zap.String("key", key))
	}
}
// loadCollections replays every persisted collection entry under
// CollectionPrefix into the replica.
func (mService *metaService) loadCollections() error {
	keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix)
	if err != nil {
		return err
	}

	for i, key := range keys {
		mService.processCollectionCreate(GetCollectionObjID(key), values[i])
	}

	return nil
}
// loadSegments replays every persisted segment entry under SegmentPrefix
// into the replica.
func (mService *metaService) loadSegments() error {
	keys, values, err := mService.kvBase.LoadWithPrefix(SegmentPrefix)
	if err != nil {
		return err
	}

	for i, key := range keys {
		mService.processSegmentCreate(GetSegmentObjID(key), values[i])
	}

	return nil
}
//----------------------------------------------------------------------- Unmarshal and Marshal
// collectionUnmarshal decodes a text-proto CollectionInfo; on failure it
// logs the error and returns nil.
func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionInfo {
	var col etcdpb.CollectionInfo
	if err := proto.UnmarshalText(value, &col); err != nil {
		log.Error(fmt.Errorf("QueryNode metaService UnmarshalText etcdpb.CollectionInfo err:%w", err).Error())
		return nil
	}
	return &col
}
// collectionMarshal encodes a CollectionInfo as a text proto; on failure it
// logs the error and returns the empty string.
func (mService *metaService) collectionMarshal(col *etcdpb.CollectionInfo) string {
	if value := proto.MarshalTextString(col); value != "" {
		return value
	}
	log.Error("marshal collection failed")
	return ""
}
// segmentUnmarshal decodes a text-proto SegmentInfo; on failure it logs the
// error and returns nil.
func (mService *metaService) segmentUnmarshal(value string) *datapb.SegmentInfo {
	var seg datapb.SegmentInfo
	if err := proto.UnmarshalText(value, &seg); err != nil {
		log.Error(fmt.Errorf("QueryNode metaService UnmarshalText datapb.SegmentInfo err:%w", err).Error())
		return nil
	}
	return &seg
}
// segmentMarshal encodes a SegmentMeta as a text proto; on failure it logs
// the error and returns the empty string.
func (mService *metaService) segmentMarshal(seg *etcdpb.SegmentMeta) string {
	if value := proto.MarshalTextString(seg); value != "" {
		return value
	}
	log.Error("marshal segment failed")
	return ""
}

View File

@ -1,227 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
// TestMetaService_start exercises the full start path (loadCollections +
// loadSegments) against a mock query node backed by a live etcd instance.
func TestMetaService_start(t *testing.T) {
	node := newQueryNodeMock()
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
	node.metaService.start()
	node.Stop()
}
// TestMetaService_getCollectionObjId checks prefix stripping: the stripped
// prefix includes Params.MetaRootPath, so a bare "/collection/..." key (or
// any non-matching key) is returned unchanged.
func TestMetaService_getCollectionObjId(t *testing.T) {
	var key = "/collection/collection0"
	var collectionObjID1 = GetCollectionObjID(key)
	assert.Equal(t, collectionObjID1, "/collection/collection0")

	key = "fakeKey"
	var collectionObjID2 = GetCollectionObjID(key)
	assert.Equal(t, collectionObjID2, "fakeKey")
}
// TestMetaService_getSegmentObjId mirrors the collection test for segment
// keys: keys lacking the full "<MetaRootPath>/segment/" prefix pass through
// unchanged.
func TestMetaService_getSegmentObjId(t *testing.T) {
	var key = "/segment/segment0"
	var segmentObjID1 = GetSegmentObjID(key)
	assert.Equal(t, segmentObjID1, "/segment/segment0")

	key = "fakeKey"
	var segmentObjID2 = GetSegmentObjID(key)
	assert.Equal(t, segmentObjID2, "fakeKey")
}
// TestMetaService_isCollectionObj verifies that only keys under
// "<MetaRootPath>/collection/" are classified as collection objects.
func TestMetaService_isCollectionObj(t *testing.T) {
	var key = Params.MetaRootPath + "/collection/collection0"
	var b1 = isCollectionObj(key)
	assert.Equal(t, b1, true)

	key = Params.MetaRootPath + "/segment/segment0"
	var b2 = isCollectionObj(key)
	assert.Equal(t, b2, false)
}
// TestMetaService_isSegmentObj verifies that only keys under
// "<MetaRootPath>/segment/" are classified as segment objects.
func TestMetaService_isSegmentObj(t *testing.T) {
	var key = Params.MetaRootPath + "/segment/segment0"
	var b1 = isSegmentObj(key)
	assert.Equal(t, b1, true)

	key = Params.MetaRootPath + "/collection/collection0"
	var b2 = isSegmentObj(key)
	assert.Equal(t, b2, false)
}
// TestMetaService_printCollectionStruct is a smoke test: printing a
// generated collection meta must not panic (no assertions on output).
func TestMetaService_printCollectionStruct(t *testing.T) {
	collectionID := UniqueID(0)
	collectionMeta := genTestCollectionMeta(collectionID, false)
	printCollectionStruct(collectionMeta)
}
// TestMetaService_printSegmentStruct is a smoke test: printing a hand-built
// SegmentInfo must not panic (no assertions on output).
func TestMetaService_printSegmentStruct(t *testing.T) {
	var s = datapb.SegmentInfo{
		ID:           UniqueID(0),
		CollectionID: UniqueID(0),
		PartitionID:  defaultPartitionID,
		NumRows:      UniqueID(0),
	}

	printSegmentStruct(&s)
}
// TestMetaService_processCollectionCreate feeds a text-proto CollectionInfo
// through processCollectionCreate and checks the collection (ID 0) appears
// in the replica.
func TestMetaService_processCollectionCreate(t *testing.T) {
	node := newQueryNodeMock()
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
	id := "0"
	// text-proto serialized etcdpb.CollectionInfo: one vector field, one
	// scalar field, one partition
	value := `schema: <
name: "test"
autoID: true
fields: <
fieldID:100
name: "vec"
data_type: FloatVector
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: Int32
type_params: <
key: "dim"
value: "1"
>
>
>
partitionIDs: 2021
`

	node.metaService.processCollectionCreate(id, value)

	collectionNum := node.replica.getCollectionNum()
	assert.Equal(t, collectionNum, 1)

	collection, err := node.replica.getCollectionByID(UniqueID(0))
	assert.NoError(t, err)
	assert.Equal(t, collection.ID(), UniqueID(0))
	node.Stop()
}
// TestMetaService_processSegmentCreate feeds a text-proto SegmentInfo
// through processSegmentCreate (after seeding collection meta via
// initTestMeta) and checks the segment (ID 0) appears in the replica.
func TestMetaService_processSegmentCreate(t *testing.T) {
	node := newQueryNodeMock()
	collectionID := UniqueID(0)
	initTestMeta(t, node, collectionID, 0)

	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
	id := "0"
	value := `partitionID: 2021
`

	(*node.metaService).processSegmentCreate(id, value)
	s, err := node.replica.getSegmentByID(UniqueID(0))
	assert.NoError(t, err)
	assert.Equal(t, s.segmentID, UniqueID(0))
	node.Stop()
}
// TestMetaService_processCreate drives the key-based dispatcher: a
// collection key must create the collection, then a segment key must create
// the segment, both verified through the replica.
func TestMetaService_processCreate(t *testing.T) {
	node := newQueryNodeMock()
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
	// collection key under MetaRootPath -> routed to processCollectionCreate
	key1 := Params.MetaRootPath + "/collection/0"
	msg1 := `schema: <
name: "test"
autoID: true
fields: <
fieldID:100
name: "vec"
data_type: FloatVector
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: Int32
type_params: <
key: "dim"
value: "1"
>
>
>
partitionIDs: 2021
`

	(*node.metaService).processCreate(key1, msg1)
	collectionNum := node.replica.getCollectionNum()
	assert.Equal(t, collectionNum, 1)

	collection, err := node.replica.getCollectionByID(UniqueID(0))
	assert.NoError(t, err)
	assert.Equal(t, collection.ID(), UniqueID(0))

	// segment key under MetaRootPath -> routed to processSegmentCreate
	key2 := Params.MetaRootPath + "/segment/0"
	msg2 := `partitionID: 2021
`

	(*node.metaService).processCreate(key2, msg2)
	s, err := node.replica.getSegmentByID(UniqueID(0))
	assert.NoError(t, err)
	assert.Equal(t, s.segmentID, UniqueID(0))
	node.Stop()
}
// TestMetaService_loadCollections checks that replaying the collection
// prefix from etcd completes without error on a mock node.
func TestMetaService_loadCollections(t *testing.T) {
	node := newQueryNodeMock()
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)

	err2 := (*node.metaService).loadCollections()
	assert.Nil(t, err2)
	node.Stop()
}
// TestMetaService_loadSegments checks that replaying the segment prefix
// from etcd completes without error on a mock node.
func TestMetaService_loadSegments(t *testing.T) {
	node := newQueryNodeMock()
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)

	err2 := (*node.metaService).loadSegments()
	assert.Nil(t, err2)
	node.Stop()
}

View File

@ -30,7 +30,6 @@ import (
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
@ -53,15 +52,12 @@ type QueryNode struct {
QueryNodeID UniqueID
stateCode atomic.Value
replica ReplicaInterface
// internal components
historical *historical
streaming *streaming
// internal services
metaService *metaService
searchService *searchService
loadService *loadService
statsService *statsService
dsServicesMu sync.Mutex // guards dataSyncServices
dataSyncServices map[UniqueID]*dataSyncService
searchService *searchService
// clients
masterService types.MasterService
@ -82,18 +78,13 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F
queryNodeLoopCtx: ctx1,
queryNodeLoopCancel: cancel,
QueryNodeID: queryNodeID,
dataSyncServices: make(map[UniqueID]*dataSyncService),
metaService: nil,
searchService: nil,
statsService: nil,
msFactory: factory,
searchService: nil,
msFactory: factory,
}
node.scheduler = newTaskScheduler(ctx1)
node.replica = newCollectionReplica()
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
}
@ -102,17 +93,11 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
node := &QueryNode{
queryNodeLoopCtx: ctx1,
queryNodeLoopCancel: cancel,
dataSyncServices: make(map[UniqueID]*dataSyncService),
metaService: nil,
searchService: nil,
statsService: nil,
msFactory: factory,
searchService: nil,
msFactory: factory,
}
node.scheduler = newTaskScheduler(ctx1)
node.replica = newCollectionReplica()
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
@ -129,6 +114,13 @@ func (node *QueryNode) Register() error {
func (node *QueryNode) Init() error {
ctx := context.Background()
node.historical = newHistorical(node.queryNodeLoopCtx,
node.masterService,
node.dataService,
node.indexService,
node.msFactory)
node.streaming = newStreaming()
C.SegcoreInit()
registerReq := &queryPb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
@ -192,17 +184,15 @@ func (node *QueryNode) Start() error {
}
// init services and manager
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
// TODO: pass node.streaming.replica to search service
node.searchService = newSearchService(node.queryNodeLoopCtx, node.historical.replica, node.streaming.replica, node.msFactory)
// start task scheduler
go node.scheduler.Start()
// start services
go node.searchService.start()
go node.loadService.start()
go node.statsService.start()
go node.historical.start()
node.UpdateStateCode(internalpb.StateCode_Healthy)
return nil
}
@ -211,24 +201,16 @@ func (node *QueryNode) Stop() error {
node.UpdateStateCode(internalpb.StateCode_Abnormal)
node.queryNodeLoopCancel()
// free collectionReplica
node.replica.freeAll()
// close services
for _, dsService := range node.dataSyncServices {
if dsService != nil {
dsService.close()
}
if node.historical != nil {
node.historical.close()
}
if node.streaming != nil {
node.streaming.close()
}
if node.searchService != nil {
node.searchService.close()
}
if node.loadService != nil {
node.loadService.close()
}
if node.statsService != nil {
node.statsService.close()
}
return nil
}
@ -267,29 +249,3 @@ func (node *QueryNode) SetDataService(data types.DataService) error {
node.dataService = data
return nil
}
func (node *QueryNode) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) {
node.dsServicesMu.Lock()
defer node.dsServicesMu.Unlock()
ds, ok := node.dataSyncServices[collectionID]
if !ok {
return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID))
}
return ds, nil
}
func (node *QueryNode) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error {
node.dsServicesMu.Lock()
defer node.dsServicesMu.Unlock()
if _, ok := node.dataSyncServices[collectionID]; ok {
return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID))
}
node.dataSyncServices[collectionID] = ds
return nil
}
func (node *QueryNode) removeDataSyncService(collectionID UniqueID) {
node.dsServicesMu.Lock()
defer node.dsServicesMu.Unlock()
delete(node.dataSyncServices, collectionID)
}

View File

@ -121,18 +121,18 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI
}
collectionMeta := genTestCollectionMeta(collectionID, isBinary)
var err = node.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
var err = node.historical.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
assert.NoError(t, err)
collection, err := node.replica.getCollectionByID(collectionID)
collection, err := node.historical.replica.getCollectionByID(collectionID)
assert.NoError(t, err)
assert.Equal(t, collection.ID(), collectionID)
assert.Equal(t, node.replica.getCollectionNum(), 1)
assert.Equal(t, node.historical.replica.getCollectionNum(), 1)
err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0])
err = node.historical.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0])
assert.NoError(t, err)
err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segmentTypeGrowing)
err = node.historical.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segmentTypeGrowing)
assert.NoError(t, err)
}
@ -179,6 +179,8 @@ func newQueryNodeMock() *QueryNode {
if err != nil {
panic(err)
}
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory)
svr.streaming = newStreaming()
return svr
}

View File

@ -34,8 +34,9 @@ type searchCollection struct {
releaseCtx context.Context
cancel context.CancelFunc
collectionID UniqueID
replica ReplicaInterface
collectionID UniqueID
historicalReplica ReplicaInterface
streamingReplica ReplicaInterface
msgBuffer chan *msgstream.SearchMsg
unsolvedMSgMu sync.Mutex // guards unsolvedMsg
@ -52,7 +53,12 @@ type searchCollection struct {
type ResultEntityIds []UniqueID
func newSearchCollection(releaseCtx context.Context, cancel context.CancelFunc, collectionID UniqueID, replica ReplicaInterface, searchResultStream msgstream.MsgStream) *searchCollection {
func newSearchCollection(releaseCtx context.Context,
cancel context.CancelFunc,
collectionID UniqueID,
historicalReplica ReplicaInterface,
streamingReplica ReplicaInterface,
searchResultStream msgstream.MsgStream) *searchCollection {
receiveBufSize := Params.SearchReceiveBufSize
msgBuffer := make(chan *msgstream.SearchMsg, receiveBufSize)
unsolvedMsg := make([]*msgstream.SearchMsg, 0)
@ -61,8 +67,9 @@ func newSearchCollection(releaseCtx context.Context, cancel context.CancelFunc,
releaseCtx: releaseCtx,
cancel: cancel,
collectionID: collectionID,
replica: replica,
collectionID: collectionID,
historicalReplica: historicalReplica,
streamingReplica: streamingReplica,
msgBuffer: msgBuffer,
unsolvedMsg: unsolvedMsg,
@ -80,8 +87,8 @@ func (s *searchCollection) start() {
}
func (s *searchCollection) register(collectionID UniqueID) {
s.replica.addTSafe(collectionID)
tSafe := s.replica.getTSafe(collectionID)
s.streamingReplica.addTSafe(collectionID)
tSafe := s.streamingReplica.getTSafe(collectionID)
s.tSafeMutex.Lock()
s.tSafeWatcher = newTSafeWatcher()
s.tSafeMutex.Unlock()
@ -105,7 +112,7 @@ func (s *searchCollection) popAllUnsolvedMsg() []*msgstream.SearchMsg {
func (s *searchCollection) waitNewTSafe() (Timestamp, error) {
// block until dataSyncService updating tSafe
s.tSafeWatcher.hasUpdate()
ts := s.replica.getTSafe(s.collectionID)
ts := s.streamingReplica.getTSafe(s.collectionID)
if ts != nil {
return ts.get(), nil
}
@ -271,7 +278,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
searchTimestamp := searchMsg.Base.Timestamp
collectionID := searchMsg.CollectionID
collection, err := s.replica.getCollectionByID(collectionID)
collection, err := s.historicalReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
@ -302,25 +309,35 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
matchedSegments := make([]*Segment, 0)
//log.Debug("search msg's partitionID = ", partitionIDsInQuery)
partitionIDsInCol, err := s.replica.getPartitionIDs(collectionID)
if err != nil {
return err
partitionIDsInHistoricalCol, err1 := s.historicalReplica.getPartitionIDs(collectionID)
partitionIDsInStreamingCol, err2 := s.streamingReplica.getPartitionIDs(collectionID)
if err1 != nil && err2 != nil {
return err2
}
var searchPartitionIDs []UniqueID
var searchPartitionIDsInHistorical []UniqueID
var searchPartitionIDsInStreaming []UniqueID
partitionIDsInQuery := searchMsg.PartitionIDs
if len(partitionIDsInQuery) == 0 {
if len(partitionIDsInCol) == 0 {
if len(partitionIDsInHistoricalCol) == 0 {
return errors.New("none of this collection's partition has been loaded")
}
searchPartitionIDs = partitionIDsInCol
searchPartitionIDsInHistorical = partitionIDsInHistoricalCol
searchPartitionIDsInStreaming = partitionIDsInStreamingCol
} else {
for _, id := range partitionIDsInQuery {
_, err2 := s.replica.getPartitionByID(id)
if err2 != nil {
_, err1 = s.historicalReplica.getPartitionByID(id)
if err1 == nil {
searchPartitionIDsInHistorical = append(searchPartitionIDsInHistorical, id)
}
_, err2 = s.streamingReplica.getPartitionByID(id)
if err2 == nil {
searchPartitionIDsInStreaming = append(searchPartitionIDsInStreaming, id)
}
if err1 != nil && err2 != nil {
return err2
}
}
searchPartitionIDs = partitionIDsInQuery
}
if searchMsg.GetDslType() == commonpb.DslType_BoolExprV1 {
@ -332,13 +349,37 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
oplog.Object("nq", queryNum),
oplog.Object("dsl", searchMsg.Dsl))
}
for _, partitionID := range searchPartitionIDs {
segmentIDs, err := s.replica.getSegmentIDs(partitionID)
sealedSegmentSearched := make([]UniqueID, 0)
for _, partitionID := range searchPartitionIDsInHistorical {
segmentIDs, err := s.historicalReplica.getSegmentIDs(partitionID)
if err != nil {
return err
}
for _, segmentID := range segmentIDs {
segment, err := s.replica.getSegmentByID(segmentID)
segment, err := s.historicalReplica.getSegmentByID(segmentID)
if err != nil {
return err
}
searchResult, err := segment.segmentSearch(plan, searchRequests, []Timestamp{searchTimestamp})
if err != nil {
return err
}
searchResults = append(searchResults, searchResult)
matchedSegments = append(matchedSegments, segment)
sealedSegmentSearched = append(sealedSegmentSearched, segmentID)
}
}
//TODO:: get searched channels
for _, partitionID := range searchPartitionIDsInStreaming {
segmentIDs, err := s.streamingReplica.getSegmentIDs(partitionID)
if err != nil {
return err
}
for _, segmentID := range segmentIDs {
segment, err := s.streamingReplica.getSegmentByID(segmentID)
if err != nil {
return err
}
@ -456,10 +497,14 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
Timestamp: searchTimestamp,
SourceID: searchMsg.Base.SourceID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: searchMsg.ResultChannelID,
Hits: hits,
MetricType: plan.getMetricType(),
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: searchMsg.ResultChannelID,
Hits: hits,
MetricType: plan.getMetricType(),
SealedSegmentIDsSearched: sealedSegmentSearched,
ChannelIDsSearched: s.streamingReplica.getWatchedDmChannels(),
//TODO:: get global sealed segment from etcd
GlobalSealedSegmentIDs: sealedSegmentSearched,
},
}

View File

@ -26,7 +26,8 @@ type searchService struct {
ctx context.Context
cancel context.CancelFunc
replica ReplicaInterface
historicalReplica ReplicaInterface
streamingReplica ReplicaInterface
searchMsgStream msgstream.MsgStream
searchResultMsgStream msgstream.MsgStream
@ -36,7 +37,10 @@ type searchService struct {
emptySearchCollection *searchCollection
}
func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService {
func newSearchService(ctx context.Context,
historicalReplica ReplicaInterface,
streamingReplica ReplicaInterface,
factory msgstream.Factory) *searchService {
searchStream, _ := factory.NewQueryMsgStream(ctx)
searchResultStream, _ := factory.NewQueryMsgStream(ctx)
@ -56,7 +60,8 @@ func newSearchService(ctx context.Context, replica ReplicaInterface, factory msg
ctx: searchServiceCtx,
cancel: searchServiceCancel,
replica: replica,
historicalReplica: historicalReplica,
streamingReplica: streamingReplica,
searchMsgStream: searchStream,
searchResultMsgStream: searchResultStream,
@ -75,7 +80,7 @@ func (s *searchService) start() {
func (s *searchService) collectionCheck(collectionID UniqueID) error {
// check if collection exists
if ok := s.replica.hasCollection(collectionID); !ok {
if ok := s.historicalReplica.hasCollection(collectionID); !ok {
err := errors.New("no collection found, collectionID = " + strconv.FormatInt(collectionID, 10))
log.Error(err.Error())
return err
@ -139,14 +144,14 @@ func (s *searchService) close() {
func (s *searchService) startSearchCollection(collectionID UniqueID) {
ctx1, cancel := context.WithCancel(s.ctx)
sc := newSearchCollection(ctx1, cancel, collectionID, s.replica, s.searchResultMsgStream)
sc := newSearchCollection(ctx1, cancel, collectionID, s.historicalReplica, s.streamingReplica, s.searchResultMsgStream)
s.searchCollections[collectionID] = sc
sc.start()
}
func (s *searchService) startEmptySearchCollection() {
ctx1, cancel := context.WithCancel(s.ctx)
sc := newSearchCollection(ctx1, cancel, UniqueID(-1), s.replica, s.searchResultMsgStream)
sc := newSearchCollection(ctx1, cancel, UniqueID(-1), s.historicalReplica, s.streamingReplica, s.searchResultMsgStream)
s.emptySearchCollection = sc
sc.start()
}

View File

@ -141,26 +141,26 @@ func TestSearch_Search(t *testing.T) {
assert.NoError(t, err)
// start dataSync
newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID)
err = node.addDataSyncService(collectionID, newDS)
newDS := newDataSyncService(node.queryNodeLoopCtx, node.streaming.replica, msFactory, collectionID)
err = node.streaming.addDataSyncService(collectionID, newDS)
assert.NoError(t, err)
ds, err := node.getDataSyncService(collectionID)
ds, err := node.streaming.getDataSyncService(collectionID)
assert.NoError(t, err)
go ds.start()
// start search service
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory)
node.searchService = newSearchService(node.queryNodeLoopCtx, node.historical.replica, node.streaming.replica, msFactory)
go node.searchService.start()
node.searchService.startSearchCollection(collectionID)
tSafe := node.replica.getTSafe(collectionID)
tSafe := node.streaming.replica.getTSafe(collectionID)
assert.NotNil(t, tSafe)
tSafe.set(1000)
// load segment
err = node.replica.addSegment(segmentID, defaultPartitionID, collectionID, segmentTypeSealed)
err = node.historical.replica.addSegment(segmentID, defaultPartitionID, collectionID, segmentTypeSealed)
assert.NoError(t, err)
segment, err := node.replica.getSegmentByID(segmentID)
segment, err := node.historical.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
err = loadFields(segment, DIM, N)
assert.NoError(t, err)
@ -189,33 +189,33 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
assert.NoError(t, err)
// start dataSync
newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID)
err = node.addDataSyncService(collectionID, newDS)
newDS := newDataSyncService(node.queryNodeLoopCtx, node.streaming.replica, msFactory, collectionID)
err = node.streaming.addDataSyncService(collectionID, newDS)
assert.NoError(t, err)
ds, err := node.getDataSyncService(collectionID)
ds, err := node.streaming.getDataSyncService(collectionID)
assert.NoError(t, err)
go ds.start()
// start search service
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory)
node.searchService = newSearchService(node.queryNodeLoopCtx, node.streaming.replica, node.streaming.replica, msFactory)
go node.searchService.start()
node.searchService.startSearchCollection(collectionID)
tSafe := node.replica.getTSafe(collectionID)
tSafe := node.streaming.replica.getTSafe(collectionID)
assert.NotNil(t, tSafe)
tSafe.set(1000)
// load segments
err = node.replica.addSegment(segmentID1, defaultPartitionID, collectionID, segmentTypeSealed)
err = node.historical.replica.addSegment(segmentID1, defaultPartitionID, collectionID, segmentTypeSealed)
assert.NoError(t, err)
segment1, err := node.replica.getSegmentByID(segmentID1)
segment1, err := node.historical.replica.getSegmentByID(segmentID1)
assert.NoError(t, err)
err = loadFields(segment1, DIM, N)
assert.NoError(t, err)
err = node.replica.addSegment(segmentID2, defaultPartitionID, collectionID, segmentTypeSealed)
err = node.historical.replica.addSegment(segmentID2, defaultPartitionID, collectionID, segmentTypeSealed)
assert.NoError(t, err)
segment2, err := node.replica.getSegmentByID(segmentID2)
segment2, err := node.historical.replica.getSegmentByID(segmentID2)
assert.NoError(t, err)
err = loadFields(segment2, DIM, N)
assert.NoError(t, err)

View File

@ -29,8 +29,8 @@ func TestStatsService_start(t *testing.T) {
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
msFactory.SetParams(m)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory)
node.statsService.start()
node.historical.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, nil, msFactory)
node.historical.statsService.start()
node.Stop()
}
@ -57,11 +57,11 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
var statsMsgStream msgstream.MsgStream = statsStream
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory)
node.statsService.statsStream = statsMsgStream
node.statsService.statsStream.Start()
node.historical.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, nil, msFactory)
node.historical.statsService.statsStream = statsMsgStream
node.historical.statsService.statsStream.Start()
// send stats
node.statsService.publicStatistic(nil)
node.historical.statsService.publicStatistic(nil)
node.Stop()
}

View File

@ -0,0 +1,75 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"errors"
"fmt"
"sync"
)
// streaming is the querynode module that manages growing (streaming)
// segment data arriving from the DML channels, together with the
// per-collection dataSyncService instances that consume those channels.
type streaming struct {
	// replica holds the in-memory streaming segments and collection metadata.
	replica ReplicaInterface

	dsServicesMu sync.Mutex // guards dataSyncServices
	// dataSyncServices maps a collection ID to its running dataSyncService.
	dataSyncServices map[UniqueID]*dataSyncService
}
// newStreaming constructs a streaming module backed by a fresh
// collection replica and an empty dataSyncService registry.
func newStreaming() *streaming {
	return &streaming{
		replica:          newCollectionReplica(),
		dataSyncServices: make(map[UniqueID]*dataSyncService),
	}
}
// start launches the streaming module's background services.
// Currently a stub: statistics reporting is not yet wired up.
func (s *streaming) start() {
	// TODO: start stats
}
// close shuts down the streaming module: it stops every registered
// dataSyncService, resets the registry, and frees all data held by the
// collection replica.
func (s *streaming) close() {
	// TODO: stop stats

	// Hold dsServicesMu while iterating and replacing the map: the struct
	// documents it as the guard for dataSyncServices, and the other
	// accessors (add/get/removeDataSyncService) may run concurrently.
	s.dsServicesMu.Lock()
	for _, ds := range s.dataSyncServices {
		ds.close()
	}
	s.dataSyncServices = make(map[UniqueID]*dataSyncService)
	s.dsServicesMu.Unlock()

	// free collectionReplica
	s.replica.freeAll()
}
// getDataSyncService returns the dataSyncService registered for
// collectionID, or an error when no service exists for that collection.
func (s *streaming) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) {
	s.dsServicesMu.Lock()
	defer s.dsServicesMu.Unlock()
	ds, ok := s.dataSyncServices[collectionID]
	if !ok {
		// fmt.Errorf avoids the trailing newline that fmt.Sprintln used to
		// embed in the error message.
		return nil, fmt.Errorf("cannot find dataSyncService, collectionID = %d", collectionID)
	}
	return ds, nil
}
// addDataSyncService registers ds for collectionID. It returns an error
// if a dataSyncService is already registered for that collection.
func (s *streaming) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error {
	s.dsServicesMu.Lock()
	defer s.dsServicesMu.Unlock()
	if _, ok := s.dataSyncServices[collectionID]; ok {
		// fmt.Sprint (not Sprintln) keeps the error message free of a
		// trailing newline.
		return errors.New("dataSyncService already exists, collectionID = " + fmt.Sprint(collectionID))
	}
	s.dataSyncServices[collectionID] = ds
	return nil
}
// removeDataSyncService unregisters the dataSyncService of collectionID.
// Removing an absent entry is a no-op; the service itself is not closed here.
func (s *streaming) removeDataSyncService(collectionID UniqueID) {
	s.dsServicesMu.Lock()
	defer s.dsServicesMu.Unlock()
	delete(s.dataSyncServices, collectionID)
}

View File

@ -111,7 +111,7 @@ func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error {
func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs)))
collectionID := w.req.CollectionID
ds, err := w.node.getDataSyncService(collectionID)
ds, err := w.node.streaming.getDataSyncService(collectionID)
if err != nil || ds.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID)
log.Error(errMsg)
@ -148,7 +148,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
toSeekInfo = append(toSeekInfo, info.Pos)
log.Debug("prevent inserting segments", zap.String("segmentIDs", fmt.Sprintln(info.ExcludedSegments)))
err := w.node.replica.addExcludedSegments(collectionID, info.ExcludedSegments)
err := w.node.streaming.replica.addExcludedSegments(collectionID, info.ExcludedSegments)
if err != nil {
log.Error(err.Error())
return err
@ -164,6 +164,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
return errors.New(errMsg)
}
}
w.node.streaming.replica.addWatchedDmChannels(w.req.ChannelIDs)
log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs)))
return nil
@ -205,32 +206,48 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req)))
hasCollection := l.node.replica.hasCollection(collectionID)
hasPartition := l.node.replica.hasPartition(partitionID)
if !hasCollection {
hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID)
hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID)
if !hasCollectionInHistorical {
// loading init
err := l.node.replica.addCollection(collectionID, schema)
err := l.node.historical.replica.addCollection(collectionID, schema)
if err != nil {
return err
}
l.node.replica.initExcludedSegments(collectionID)
newDS := newDataSyncService(l.node.queryNodeLoopCtx, l.node.replica, l.node.msFactory, collectionID)
hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID)
if !hasCollectionInStreaming {
err = l.node.streaming.replica.addCollection(collectionID, schema)
if err != nil {
return err
}
}
l.node.streaming.replica.initExcludedSegments(collectionID)
newDS := newDataSyncService(l.node.queryNodeLoopCtx, l.node.streaming.replica, l.node.msFactory, collectionID)
// ignore duplicated dataSyncService error
_ = l.node.addDataSyncService(collectionID, newDS)
ds, err := l.node.getDataSyncService(collectionID)
_ = l.node.streaming.addDataSyncService(collectionID, newDS)
ds, err := l.node.streaming.getDataSyncService(collectionID)
if err != nil {
return err
}
go ds.start()
l.node.searchService.startSearchCollection(collectionID)
}
if !hasPartition {
err := l.node.replica.addPartition(collectionID, partitionID)
if !hasPartitionInHistorical {
err := l.node.historical.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID)
if !hasPartitionInStreaming {
err = l.node.streaming.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
}
err := l.node.replica.enablePartition(partitionID)
err := l.node.streaming.replica.enablePartition(partitionID)
if err != nil {
return err
}
@ -239,7 +256,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
return nil
}
err = l.node.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs)
err = l.node.historical.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs)
if err != nil {
return err
}
@ -275,21 +292,32 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
}
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
ds, err := r.node.getDataSyncService(r.req.CollectionID)
ds, err := r.node.streaming.getDataSyncService(r.req.CollectionID)
if err == nil && ds != nil {
ds.close()
r.node.removeDataSyncService(r.req.CollectionID)
r.node.replica.removeTSafe(r.req.CollectionID)
r.node.replica.removeExcludedSegments(r.req.CollectionID)
r.node.streaming.removeDataSyncService(r.req.CollectionID)
r.node.streaming.replica.removeTSafe(r.req.CollectionID)
r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID)
}
if r.node.searchService.hasSearchCollection(r.req.CollectionID) {
r.node.searchService.stopSearchCollection(r.req.CollectionID)
}
err = r.node.replica.removeCollection(r.req.CollectionID)
if err != nil {
return err
hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID)
if hasCollectionInHistorical {
err := r.node.historical.replica.removeCollection(r.req.CollectionID)
if err != nil {
return err
}
}
hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID)
if hasCollectionInStreaming {
err := r.node.streaming.replica.removePartition(r.req.CollectionID)
if err != nil {
return err
}
}
log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
@ -324,10 +352,21 @@ func (r *releasePartitionsTask) PreExecute(ctx context.Context) error {
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
for _, id := range r.req.PartitionIDs {
err := r.node.loadService.segLoader.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Error(err.Error())
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
if hasPartitionInHistorical {
err := r.node.historical.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Error(err.Error())
}
}
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id)
if hasPartitionInStreaming {
err := r.node.streaming.replica.removePartition(id)
if err != nil {
log.Error(err.Error())
}
}
}
return nil