[Feature|Pick] enable scheduler policy and add user-task-polling policy (#24839)

Signed-off-by: chyezh <ye.zhen@zilliz.com>
pull/25287/head
chyezh 2023-07-03 18:24:25 +08:00 committed by GitHub
parent 549c4a0e1b
commit d7d61f529c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1620 additions and 383 deletions

View File

@ -255,6 +255,22 @@ queryNode:
maxReadConcurrentRatio: 1
cpuRatio: 10 # ratio used to estimate read task cpu usage.
maxTimestampLag: 86400
# read task schedule policy: fifo (by default) or user-task-polling.
scheduleReadPolicy:
# fifo: A FIFO queue support the schedule.
# user-task-polling:
# The user's tasks will be polled one by one and scheduled.
# Scheduling is fair on task granularity.
# The policy groups tasks by the authenticated username.
# All requests with an empty username are treated as one user.
# When there is only a single user, the policy degrades to FIFO.
name: fifo
maxPendingTask: 10240
# user-task-polling configure:
taskQueueExpire: 60 # 1 min by default, expire time of inner user task queue since queue is empty.
enableCrossUserGrouping: false # false by default. Enable cross-user grouping when using the user-task-polling policy. (Disable it if one user's tasks cannot be merged with another's.)
maxPendingTaskPerUser: 1024 # 1024 by default, max pending tasks in the scheduler per user.
gracefulStopTimeout: 30
port: 21123
grpc:

View File

@ -101,6 +101,7 @@ message SearchRequest {
int64 topk = 15;
string metricType = 16;
bool ignoreGrowing = 17; // Optional
string username = 18;
}
message SearchResults {
@ -143,6 +144,7 @@ message RetrieveRequest {
bool ignoreGrowing = 12;
bool is_count = 13;
int64 iteration_extension_reduce_rate = 14;
string username = 15;
}

View File

@ -748,6 +748,7 @@ type SearchRequest struct {
Topk int64 `protobuf:"varint,15,opt,name=topk,proto3" json:"topk,omitempty"`
MetricType string `protobuf:"bytes,16,opt,name=metricType,proto3" json:"metricType,omitempty"`
IgnoreGrowing bool `protobuf:"varint,17,opt,name=ignoreGrowing,proto3" json:"ignoreGrowing,omitempty"`
Username string `protobuf:"bytes,18,opt,name=username,proto3" json:"username,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -897,6 +898,13 @@ func (m *SearchRequest) GetIgnoreGrowing() bool {
return false
}
// GetUsername returns the username attached to the search request.
// Following the generated-getter convention, it is safe to call on a
// nil receiver, in which case the empty string is returned.
func (m *SearchRequest) GetUsername() string {
	if m == nil {
		return ""
	}
	return m.Username
}
type SearchResults struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
@ -1104,6 +1112,7 @@ type RetrieveRequest struct {
IgnoreGrowing bool `protobuf:"varint,12,opt,name=ignoreGrowing,proto3" json:"ignoreGrowing,omitempty"`
IsCount bool `protobuf:"varint,13,opt,name=is_count,json=isCount,proto3" json:"is_count,omitempty"`
IterationExtensionReduceRate int64 `protobuf:"varint,14,opt,name=iteration_extension_reduce_rate,json=iterationExtensionReduceRate,proto3" json:"iteration_extension_reduce_rate,omitempty"`
Username string `protobuf:"bytes,15,opt,name=username,proto3" json:"username,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -1232,6 +1241,13 @@ func (m *RetrieveRequest) GetIterationExtensionReduceRate() int64 {
return 0
}
// GetUsername returns the username attached to the retrieve request.
// Following the generated-getter convention, it is safe to call on a
// nil receiver, in which case the empty string is returned.
func (m *RetrieveRequest) GetUsername() string {
	if m == nil {
		return ""
	}
	return m.Username
}
type RetrieveResults struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
@ -1988,125 +2004,126 @@ func init() {
// init registers the compiled descriptor for internal.proto with the
// protobuf runtime so the file's messages are resolvable by name.
func init() { proto.RegisterFile("internal.proto", fileDescriptor_41f4a519b878ee3b) }
var fileDescriptor_41f4a519b878ee3b = []byte{
// 1906 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x58, 0x4b, 0x6f, 0x1c, 0xb9,
0x11, 0x4e, 0xcf, 0x7b, 0x6a, 0x46, 0xd2, 0x88, 0x2b, 0x3b, 0xed, 0xc7, 0xae, 0xb5, 0x9d, 0x20,
0x51, 0x36, 0x58, 0x7b, 0xa3, 0xc5, 0xae, 0x73, 0x08, 0x12, 0xd8, 0x6a, 0xad, 0x30, 0xd8, 0x91,
0x23, 0xf7, 0x18, 0x0b, 0x24, 0x97, 0x06, 0x67, 0xba, 0x34, 0x62, 0xdc, 0x2f, 0x91, 0x6c, 0x3d,
0x7c, 0xce, 0x2d, 0x40, 0x6e, 0xb9, 0x04, 0x48, 0x7e, 0x40, 0x80, 0x9c, 0x93, 0x5b, 0xfe, 0x41,
0x7e, 0x90, 0x4f, 0x01, 0x1f, 0x3d, 0x0f, 0x69, 0x2c, 0x48, 0x72, 0x1e, 0x9b, 0x1b, 0x59, 0xf5,
0xb1, 0x48, 0x56, 0x15, 0x3f, 0x16, 0x09, 0xab, 0x2c, 0x95, 0xc8, 0x53, 0x1a, 0x3f, 0xce, 0x79,
0x26, 0x33, 0x72, 0x27, 0x61, 0xf1, 0x49, 0x21, 0x4c, 0xef, 0x71, 0xa9, 0xbc, 0xdf, 0x1d, 0x67,
0x49, 0x92, 0xa5, 0x46, 0x7c, 0xbf, 0x2b, 0xc6, 0x47, 0x98, 0x50, 0xd3, 0xf3, 0x1e, 0xc0, 0xbd,
0x3d, 0x94, 0xaf, 0x58, 0x82, 0xaf, 0xd8, 0xf8, 0xf5, 0xce, 0x11, 0x4d, 0x53, 0x8c, 0x03, 0x3c,
0x2e, 0x50, 0x48, 0xef, 0x43, 0x78, 0xb0, 0x87, 0x72, 0x28, 0xa9, 0x64, 0x42, 0xb2, 0xb1, 0xb8,
0xa0, 0xbe, 0x03, 0x1f, 0xec, 0xa1, 0xf4, 0xa3, 0x0b, 0xe2, 0x6f, 0xa0, 0xf5, 0x22, 0x8b, 0xb0,
0x9f, 0x1e, 0x66, 0xe4, 0x4b, 0x68, 0xd2, 0x28, 0xe2, 0x28, 0x84, 0xeb, 0x6c, 0x3a, 0x5b, 0x9d,
0xed, 0x87, 0x8f, 0x17, 0xd6, 0x68, 0x57, 0xf6, 0xcc, 0x60, 0x82, 0x12, 0x4c, 0x08, 0xd4, 0x78,
0x16, 0xa3, 0x5b, 0xd9, 0x74, 0xb6, 0xda, 0x81, 0x6e, 0x7b, 0xbf, 0x01, 0xe8, 0xa7, 0x4c, 0x1e,
0x50, 0x4e, 0x13, 0x41, 0xee, 0x42, 0x23, 0x55, 0xb3, 0xf8, 0xda, 0x70, 0x35, 0xb0, 0x3d, 0xe2,
0x43, 0x57, 0x48, 0xca, 0x65, 0x98, 0x6b, 0x9c, 0x5b, 0xd9, 0xac, 0x6e, 0x75, 0xb6, 0x3f, 0x5e,
0x3a, 0xed, 0xd7, 0x78, 0xfe, 0x0d, 0x8d, 0x0b, 0x3c, 0xa0, 0x8c, 0x07, 0x1d, 0x3d, 0xcc, 0x58,
0xf7, 0x7e, 0x05, 0x30, 0x94, 0x9c, 0xa5, 0x93, 0x01, 0x13, 0x52, 0xcd, 0x75, 0xa2, 0x70, 0x6a,
0x13, 0xd5, 0xad, 0x76, 0x60, 0x7b, 0xe4, 0x73, 0x68, 0x08, 0x49, 0x65, 0x21, 0xf4, 0x3a, 0x3b,
0xdb, 0x0f, 0x96, 0xce, 0x32, 0xd4, 0x90, 0xc0, 0x42, 0xbd, 0xbf, 0x56, 0x60, 0x63, 0xc1, 0xab,
0xd6, 0x6f, 0xe4, 0x33, 0xa8, 0x8d, 0xa8, 0xc0, 0x2b, 0x1d, 0xb5, 0x2f, 0x26, 0xcf, 0xa9, 0xc0,
0x40, 0x23, 0x95, 0x97, 0xa2, 0x51, 0xdf, 0xd7, 0xb3, 0x57, 0x03, 0xdd, 0x26, 0x1e, 0x74, 0xc7,
0x59, 0x1c, 0xe3, 0x58, 0xb2, 0x2c, 0xed, 0xfb, 0x6e, 0x55, 0xeb, 0x16, 0x64, 0x0a, 0x93, 0x53,
0x2e, 0x99, 0xe9, 0x0a, 0xb7, 0xb6, 0x59, 0x55, 0x98, 0x79, 0x19, 0xf9, 0x11, 0xf4, 0x24, 0xa7,
0x27, 0x18, 0x87, 0x92, 0x25, 0x28, 0x24, 0x4d, 0x72, 0xb7, 0xbe, 0xe9, 0x6c, 0xd5, 0x82, 0x35,
0x23, 0x7f, 0x55, 0x8a, 0xc9, 0x13, 0xf8, 0x60, 0x52, 0x50, 0x4e, 0x53, 0x89, 0x38, 0x87, 0x6e,
0x68, 0x34, 0x99, 0xaa, 0x66, 0x03, 0x7e, 0x0c, 0xeb, 0x0a, 0x96, 0x15, 0x72, 0x0e, 0xde, 0xd4,
0xf0, 0x9e, 0x55, 0x4c, 0xc1, 0xde, 0xdf, 0x1c, 0xb8, 0x73, 0xc1, 0x5f, 0x22, 0xcf, 0x52, 0x81,
0xb7, 0x70, 0xd8, 0x6d, 0x02, 0x46, 0x9e, 0x42, 0x5d, 0xb5, 0x84, 0x5b, 0xbd, 0x6e, 0x2a, 0x19,
0xbc, 0xf7, 0x67, 0x07, 0xc8, 0x0e, 0x47, 0x2a, 0xf1, 0x59, 0xcc, 0xe8, 0x7b, 0xc4, 0xf9, 0xbb,
0xd0, 0x8c, 0x46, 0x61, 0x4a, 0x93, 0xf2, 0x40, 0x34, 0xa2, 0xd1, 0x0b, 0x9a, 0x20, 0xf9, 0x21,
0xac, 0xcd, 0x02, 0x6b, 0x00, 0x55, 0x0d, 0x58, 0x9d, 0x89, 0x35, 0x70, 0x03, 0xea, 0x54, 0xad,
0xc1, 0xad, 0x69, 0xb5, 0xe9, 0x78, 0x02, 0x7a, 0x3e, 0xcf, 0xf2, 0xff, 0xd4, 0xea, 0xa6, 0x93,
0x56, 0xe7, 0x27, 0xfd, 0x93, 0x03, 0xeb, 0xcf, 0x62, 0x89, 0xfc, 0x5b, 0xea, 0x94, 0x7f, 0x54,
0xca, 0xa8, 0xf5, 0xd3, 0x08, 0xcf, 0xfe, 0x97, 0x0b, 0xfc, 0x10, 0xe0, 0x90, 0x61, 0x1c, 0x19,
0x8c, 0x59, 0x65, 0x5b, 0x4b, 0xb4, 0xba, 0x3c, 0xfe, 0xf5, 0x2b, 0x8e, 0x7f, 0x63, 0xc9, 0xf1,
0x77, 0xa1, 0xa9, 0x8d, 0xf4, 0x7d, 0x7d, 0xe8, 0xaa, 0x41, 0xd9, 0x55, 0xe4, 0x89, 0x67, 0x92,
0xd3, 0x92, 0x3c, 0x5b, 0xd7, 0x26, 0x4f, 0x3d, 0xcc, 0x92, 0xe7, 0xdb, 0x1a, 0xac, 0x0c, 0x91,
0xf2, 0xf1, 0xd1, 0xed, 0x9d, 0xb7, 0x01, 0x75, 0x8e, 0xc7, 0x53, 0x6e, 0x33, 0x9d, 0xe9, 0x8e,
0xab, 0x57, 0xec, 0xb8, 0x76, 0x0d, 0xc2, 0xab, 0x2f, 0x21, 0xbc, 0x1e, 0x54, 0x23, 0x11, 0x6b,
0x87, 0xb5, 0x03, 0xd5, 0x54, 0x34, 0x95, 0xc7, 0x74, 0x8c, 0x47, 0x59, 0x1c, 0x21, 0x0f, 0x27,
0x3c, 0x2b, 0x0c, 0x4d, 0x75, 0x83, 0xde, 0x9c, 0x62, 0x4f, 0xc9, 0xc9, 0x53, 0x68, 0x45, 0x22,
0x0e, 0xe5, 0x79, 0x8e, 0x6e, 0x6b, 0xd3, 0xd9, 0x5a, 0x7d, 0xc7, 0x36, 0x7d, 0x11, 0xbf, 0x3a,
0xcf, 0x31, 0x68, 0x46, 0xa6, 0x41, 0x3e, 0x83, 0x0d, 0x81, 0x9c, 0xd1, 0x98, 0xbd, 0xc1, 0x28,
0xc4, 0xb3, 0x9c, 0x87, 0x79, 0x4c, 0x53, 0xb7, 0xad, 0x27, 0x22, 0x33, 0xdd, 0xee, 0x59, 0xce,
0x0f, 0x62, 0x9a, 0x92, 0x2d, 0xe8, 0x65, 0x85, 0xcc, 0x0b, 0x19, 0xea, 0xb8, 0x89, 0x90, 0x45,
0x2e, 0xe8, 0x1d, 0xad, 0x1a, 0xf9, 0x57, 0x5a, 0xdc, 0x8f, 0x96, 0x92, 0x78, 0xe7, 0x46, 0x24,
0xde, 0xbd, 0x19, 0x89, 0xaf, 0x2c, 0x27, 0x71, 0xb2, 0x0a, 0x95, 0xf4, 0xd8, 0x5d, 0xd5, 0xa1,
0xa9, 0xa4, 0xc7, 0x2a, 0x90, 0x32, 0xcb, 0x5f, 0xbb, 0x6b, 0x26, 0x90, 0xaa, 0x4d, 0x3e, 0x02,
0x48, 0x50, 0x72, 0x36, 0x56, 0x6e, 0x71, 0x7b, 0x3a, 0x0e, 0x73, 0x12, 0xf2, 0x7d, 0x58, 0x61,
0x93, 0x34, 0xe3, 0xb8, 0xc7, 0xb3, 0x53, 0x96, 0x4e, 0xdc, 0xf5, 0x4d, 0x67, 0xab, 0x15, 0x2c,
0x0a, 0xbd, 0x7f, 0xce, 0x25, 0x9f, 0x28, 0x62, 0x29, 0xfe, 0x5b, 0xd7, 0xc4, 0x34, 0x63, 0xab,
0xf3, 0x19, 0xfb, 0x08, 0x3a, 0x66, 0x0b, 0x26, 0x33, 0x6a, 0x97, 0x76, 0xf5, 0x08, 0x3a, 0x69,
0x91, 0x84, 0xc7, 0x05, 0x72, 0x86, 0xc2, 0x9e, 0x65, 0x48, 0x8b, 0xe4, 0xa5, 0x91, 0x90, 0x0f,
0xa0, 0x2e, 0xb3, 0x3c, 0x7c, 0x6d, 0x8f, 0xb2, 0xf2, 0xd5, 0xd7, 0xe4, 0x67, 0x70, 0x5f, 0x20,
0x8d, 0x31, 0x0a, 0x05, 0x4e, 0x12, 0x4c, 0x65, 0xdf, 0x17, 0xa1, 0xd0, 0xdb, 0xc6, 0xc8, 0x6d,
0xea, 0x64, 0x70, 0x0d, 0x62, 0x38, 0x05, 0x0c, 0xad, 0x5e, 0xc5, 0x7a, 0x6c, 0x6a, 0xb6, 0x85,
0x61, 0x2d, 0x5d, 0xdc, 0x90, 0x99, 0x6a, 0x3a, 0xe0, 0xa7, 0xe0, 0x4e, 0xe2, 0x6c, 0x44, 0xe3,
0xf0, 0xd2, 0xac, 0x6e, 0x5b, 0x4f, 0x76, 0xd7, 0xe8, 0x87, 0x17, 0xa6, 0x54, 0xdb, 0x13, 0x31,
0x1b, 0x63, 0x14, 0x8e, 0xe2, 0x6c, 0xe4, 0x82, 0x4e, 0x6a, 0x30, 0xa2, 0xe7, 0x71, 0x36, 0x52,
0xc9, 0x6c, 0x01, 0xca, 0x0d, 0xe3, 0xac, 0x48, 0xa5, 0x4e, 0xd1, 0x6a, 0xb0, 0x6a, 0xe4, 0x2f,
0x8a, 0x64, 0x47, 0x49, 0xc9, 0xf7, 0x60, 0xc5, 0x22, 0xb3, 0xc3, 0x43, 0x81, 0x52, 0xe7, 0x66,
0x35, 0xe8, 0x1a, 0xe1, 0x2f, 0xb5, 0x8c, 0x1c, 0x28, 0x6e, 0x15, 0xf2, 0xd9, 0x64, 0xc2, 0x71,
0x42, 0xd5, 0xd9, 0xd6, 0x39, 0xd9, 0xd9, 0xfe, 0xc1, 0xe3, 0xa5, 0xc5, 0xf1, 0xe3, 0x9d, 0x45,
0x74, 0x70, 0x71, 0xb8, 0x77, 0x0c, 0x6b, 0x17, 0x30, 0x8a, 0x4e, 0xb8, 0x2d, 0x42, 0x54, 0x8a,
0xdb, 0x0a, 0x74, 0x41, 0x46, 0x36, 0xa1, 0x23, 0x90, 0x9f, 0xb0, 0xb1, 0x81, 0x18, 0x1a, 0x9b,
0x17, 0x29, 0x1a, 0x96, 0x99, 0xa4, 0xf1, 0x8b, 0x97, 0x36, 0x65, 0xca, 0xae, 0xf7, 0xf7, 0x1a,
0xac, 0x05, 0x2a, 0x45, 0xf0, 0x04, 0xff, 0x9f, 0x28, 0xf4, 0x5d, 0x54, 0xd6, 0xb8, 0x11, 0x95,
0x35, 0xaf, 0x4d, 0x65, 0xad, 0x1b, 0x51, 0x59, 0xfb, 0x66, 0x54, 0x06, 0xef, 0xa0, 0xb2, 0x0d,
0xa8, 0xc7, 0x2c, 0x61, 0x65, 0x96, 0x9a, 0xce, 0x65, 0x72, 0xea, 0x2e, 0x21, 0x27, 0x72, 0x0f,
0x5a, 0x4c, 0xd8, 0x24, 0x5f, 0xd1, 0x80, 0x26, 0x13, 0x26, 0xbb, 0x77, 0xe1, 0x11, 0x93, 0xc8,
0x75, 0x82, 0x85, 0x78, 0x26, 0x31, 0x15, 0xaa, 0xc5, 0x31, 0x2a, 0xc6, 0x18, 0x72, 0x2a, 0xd1,
0xd2, 0xe7, 0xc3, 0x29, 0x6c, 0xb7, 0x44, 0x05, 0x1a, 0x14, 0x50, 0x89, 0xde, 0xdb, 0xea, 0x7c,
0xea, 0x7c, 0x0b, 0x08, 0xf0, 0x13, 0xa8, 0xb2, 0xc8, 0x94, 0x58, 0x9d, 0x6d, 0x77, 0xd1, 0x8e,
0x7d, 0x89, 0xf6, 0x7d, 0x11, 0x28, 0x10, 0xf9, 0x05, 0x74, 0x6c, 0x1a, 0x44, 0x54, 0x52, 0x9d,
0x62, 0x9d, 0xed, 0x8f, 0x96, 0x8e, 0xd1, 0x79, 0xe1, 0x53, 0x49, 0x03, 0x53, 0x22, 0x09, 0xd5,
0x26, 0x3f, 0x87, 0x07, 0x97, 0x69, 0x91, 0x5b, 0x77, 0x44, 0x6e, 0x43, 0x67, 0xd6, 0xbd, 0x8b,
0xbc, 0x58, 0xfa, 0x2b, 0x22, 0x3f, 0x81, 0x8d, 0x39, 0x62, 0x9c, 0x0d, 0x6c, 0x6a, 0x66, 0x9c,
0x23, 0xcd, 0xd9, 0x90, 0xab, 0xa8, 0xb1, 0x75, 0x25, 0x35, 0xfe, 0xfb, 0xa9, 0xea, 0xad, 0x03,
0xed, 0x41, 0x46, 0x23, 0x5d, 0xb8, 0xde, 0x22, 0xec, 0x0f, 0xa1, 0x3d, 0x5d, 0xbd, 0x65, 0x8d,
0x99, 0x40, 0x69, 0xa7, 0xb5, 0xa7, 0x2d, 0x58, 0xe7, 0x8a, 0xd1, 0xb9, 0xa2, 0xb2, 0xb6, 0x58,
0x54, 0x3e, 0x82, 0x0e, 0x53, 0x0b, 0x0a, 0x73, 0x2a, 0x8f, 0x0c, 0x71, 0xb4, 0x03, 0xd0, 0xa2,
0x03, 0x25, 0x51, 0x55, 0x67, 0x09, 0xd0, 0x55, 0x67, 0xe3, 0xda, 0x55, 0xa7, 0x35, 0xa2, 0xab,
0xce, 0xdf, 0x3a, 0x00, 0x7a, 0xe3, 0x2a, 0x2d, 0x2f, 0x1b, 0x75, 0x6e, 0x63, 0x54, 0x31, 0x9a,
0xba, 0x96, 0x38, 0xc6, 0x54, 0xce, 0x62, 0x2b, 0xac, 0x73, 0x48, 0x5a, 0x24, 0x81, 0x51, 0xd9,
0xb8, 0x0a, 0xef, 0xf7, 0x0e, 0x80, 0x4e, 0x4e, 0xb3, 0x8c, 0x8b, 0xd4, 0xea, 0x5c, 0x5d, 0x8f,
0x57, 0x16, 0x5d, 0xf7, 0xbc, 0x74, 0xdd, 0x15, 0x0f, 0xd0, 0x69, 0x7a, 0xcc, 0x36, 0x6f, 0xbd,
0xab, 0xdb, 0xde, 0x1f, 0x1c, 0xe8, 0xda, 0xd5, 0x99, 0x25, 0x2d, 0x44, 0xd9, 0xb9, 0x18, 0x65,
0x5d, 0xb0, 0x24, 0x19, 0x3f, 0x0f, 0x05, 0x7b, 0x53, 0xde, 0x5b, 0x60, 0x44, 0x43, 0xf6, 0x06,
0x15, 0x87, 0x69, 0x97, 0x64, 0xa7, 0xa2, 0xbc, 0xb7, 0x94, 0x1b, 0xb2, 0x53, 0xa1, 0x78, 0x94,
0xe3, 0x18, 0x53, 0x19, 0x9f, 0x87, 0x49, 0x16, 0xb1, 0x43, 0x86, 0x91, 0xce, 0x86, 0x56, 0xd0,
0x2b, 0x15, 0xfb, 0x56, 0xae, 0xde, 0xf5, 0xc4, 0xfe, 0x1c, 0x95, 0xdf, 0x4f, 0xfb, 0x62, 0x72,
0x8b, 0xac, 0x55, 0x2e, 0x36, 0x76, 0x54, 0x22, 0x9a, 0x1f, 0x9f, 0x76, 0xb0, 0x20, 0x53, 0xb5,
0xe5, 0x94, 0xd9, 0x8d, 0x1f, 0x6b, 0xc1, 0x9c, 0x44, 0xad, 0x3c, 0xc2, 0x43, 0x5a, 0xc4, 0xf3,
0x37, 0x40, 0xcd, 0xdc, 0x00, 0x56, 0xb1, 0xf0, 0x23, 0xb1, 0xba, 0xc3, 0x31, 0xc2, 0x54, 0x32,
0x1a, 0xeb, 0x7f, 0xae, 0xfb, 0xd0, 0x2a, 0x84, 0x0a, 0x83, 0xad, 0x06, 0xda, 0xc1, 0xb4, 0x4f,
0x3e, 0x05, 0x82, 0xe9, 0x98, 0x9f, 0xe7, 0x2a, 0x83, 0x72, 0x2a, 0xc4, 0x69, 0xc6, 0x23, 0xfb,
0x24, 0x5c, 0x9f, 0x6a, 0x0e, 0xac, 0x82, 0xdc, 0x85, 0x86, 0xc4, 0x94, 0xa6, 0xd2, 0x9e, 0x31,
0xdb, 0xb3, 0x77, 0x87, 0x28, 0x72, 0xe4, 0xd6, 0xa7, 0x4d, 0x26, 0x86, 0xaa, 0xab, 0x1e, 0x94,
0xe2, 0x88, 0x6e, 0x7f, 0xf1, 0xe5, 0xcc, 0x7c, 0xdd, 0x3c, 0x28, 0x8d, 0xb8, 0xb4, 0xed, 0xed,
0xc2, 0xfa, 0x80, 0x09, 0x79, 0x90, 0xc5, 0x6c, 0x7c, 0x7e, 0xeb, 0xca, 0xc2, 0xfb, 0x9d, 0x03,
0x64, 0xde, 0x8e, 0xfd, 0x8f, 0x99, 0xdd, 0x1a, 0xce, 0xf5, 0x6f, 0x8d, 0x8f, 0xa1, 0x9b, 0x6b,
0x33, 0x21, 0x4b, 0x0f, 0xb3, 0x32, 0x7a, 0x1d, 0x23, 0x53, 0xbe, 0x15, 0xea, 0x19, 0xac, 0x9c,
0x19, 0xf2, 0x2c, 0x46, 0x13, 0xbc, 0x76, 0xd0, 0x56, 0x92, 0x40, 0x09, 0xbc, 0x09, 0xdc, 0x1b,
0x1e, 0x65, 0xa7, 0x3b, 0x59, 0x7a, 0xc8, 0x26, 0x85, 0xb9, 0x1a, 0xdf, 0xe3, 0x5f, 0xc1, 0x85,
0x66, 0x4e, 0xa5, 0x3a, 0x53, 0x36, 0x46, 0x65, 0xd7, 0xfb, 0xa3, 0x03, 0xf7, 0x97, 0xcd, 0xf4,
0x3e, 0xdb, 0xdf, 0x83, 0x95, 0xb1, 0x31, 0x67, 0xac, 0x5d, 0xff, 0xbf, 0x72, 0x71, 0x9c, 0xb7,
0x0b, 0x35, 0x55, 0x00, 0x90, 0x27, 0x50, 0xe1, 0x52, 0xaf, 0x60, 0x75, 0xfb, 0xd1, 0x3b, 0x98,
0x42, 0x01, 0xf5, 0x23, 0xb4, 0xc2, 0x25, 0xe9, 0x82, 0xc3, 0xf5, 0x4e, 0x9d, 0xc0, 0xe1, 0x9f,
0xfc, 0xc5, 0x81, 0x56, 0xa9, 0x26, 0xeb, 0xb0, 0xe2, 0xfb, 0x83, 0x9d, 0x29, 0x57, 0xf5, 0xbe,
0x43, 0x7a, 0xd0, 0xf5, 0xfd, 0xc1, 0x41, 0x59, 0xf5, 0xf5, 0x1c, 0xd2, 0x85, 0x96, 0xef, 0x0f,
0x34, 0xf9, 0xf4, 0x2a, 0xb6, 0xf7, 0x55, 0x5c, 0x88, 0xa3, 0x5e, 0x75, 0x6a, 0x20, 0xc9, 0xa9,
0x31, 0x50, 0x23, 0x2b, 0xd0, 0xf6, 0xf7, 0x07, 0xfd, 0x54, 0x20, 0x97, 0xbd, 0xba, 0xed, 0xfa,
0x18, 0xa3, 0xc4, 0x5e, 0x83, 0xac, 0x41, 0xc7, 0xdf, 0x1f, 0x3c, 0x2f, 0xe2, 0xd7, 0xea, 0x1e,
0xeb, 0x35, 0xb5, 0xfe, 0xe5, 0xc0, 0x3c, 0x44, 0x7a, 0x2d, 0x6d, 0xfe, 0xe5, 0x40, 0x3d, 0x8d,
0xce, 0x7b, 0xed, 0xe7, 0x4f, 0x7f, 0xfd, 0xc5, 0x84, 0xc9, 0xa3, 0x62, 0xa4, 0x1c, 0xf4, 0xc4,
0xec, 0xf5, 0x53, 0x96, 0xd9, 0xd6, 0x93, 0x72, 0xbf, 0x4f, 0xf4, 0xf6, 0xa7, 0xdd, 0x7c, 0x34,
0x6a, 0x68, 0xc9, 0xe7, 0xff, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x70, 0x7b, 0x95, 0x84, 0x41, 0x17,
0x00, 0x00,
// 1921 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x58, 0x4b, 0x6f, 0x1c, 0xc7,
0x11, 0xce, 0xec, 0x7b, 0x6b, 0x97, 0xe4, 0xb2, 0x4d, 0x29, 0xa3, 0x87, 0x2d, 0x7a, 0x12, 0x24,
0x8c, 0x03, 0x4b, 0x0e, 0x0d, 0x5b, 0x39, 0x04, 0x09, 0x24, 0x8e, 0x4c, 0x2c, 0xbc, 0x52, 0xa8,
0x59, 0xc1, 0x40, 0x72, 0x19, 0xf4, 0xee, 0x14, 0x97, 0x1d, 0xcd, 0x8b, 0xdd, 0x3d, 0x7c, 0xe8,
0x9c, 0x5b, 0x80, 0xdc, 0x92, 0x43, 0x80, 0xe4, 0x07, 0x04, 0xc8, 0xd9, 0xc7, 0xfc, 0x83, 0x9c,
0xf2, 0x6b, 0x7c, 0x0a, 0xfa, 0x31, 0xfb, 0xe2, 0x8a, 0x20, 0xa9, 0x3c, 0x9c, 0xdb, 0x54, 0xd5,
0xd7, 0xd5, 0xdd, 0x55, 0xd5, 0x5f, 0x57, 0x0f, 0xac, 0xb3, 0x54, 0x22, 0x4f, 0x69, 0xfc, 0x30,
0xe7, 0x99, 0xcc, 0xc8, 0xad, 0x84, 0xc5, 0x27, 0x85, 0x30, 0xd2, 0xc3, 0xd2, 0x78, 0xb7, 0x3b,
0xce, 0x92, 0x24, 0x4b, 0x8d, 0xfa, 0x6e, 0x57, 0x8c, 0x8f, 0x30, 0xa1, 0x46, 0xf2, 0xee, 0xc1,
0x9d, 0x7d, 0x94, 0xaf, 0x58, 0x82, 0xaf, 0xd8, 0xf8, 0xf5, 0xde, 0x11, 0x4d, 0x53, 0x8c, 0x03,
0x3c, 0x2e, 0x50, 0x48, 0xef, 0x7d, 0xb8, 0xb7, 0x8f, 0x72, 0x28, 0xa9, 0x64, 0x42, 0xb2, 0xb1,
0x58, 0x32, 0xdf, 0x82, 0xf7, 0xf6, 0x51, 0xfa, 0xd1, 0x92, 0xfa, 0x2b, 0x68, 0xbd, 0xc8, 0x22,
0xec, 0xa7, 0x87, 0x19, 0xf9, 0x1c, 0x9a, 0x34, 0x8a, 0x38, 0x0a, 0xe1, 0x3a, 0xdb, 0xce, 0x4e,
0x67, 0xf7, 0xfe, 0xc3, 0x85, 0x35, 0xda, 0x95, 0x3d, 0x31, 0x98, 0xa0, 0x04, 0x13, 0x02, 0x35,
0x9e, 0xc5, 0xe8, 0x56, 0xb6, 0x9d, 0x9d, 0x76, 0xa0, 0xbf, 0xbd, 0xdf, 0x00, 0xf4, 0x53, 0x26,
0x0f, 0x28, 0xa7, 0x89, 0x20, 0xb7, 0xa1, 0x91, 0xaa, 0x59, 0x7c, 0xed, 0xb8, 0x1a, 0x58, 0x89,
0xf8, 0xd0, 0x15, 0x92, 0x72, 0x19, 0xe6, 0x1a, 0xe7, 0x56, 0xb6, 0xab, 0x3b, 0x9d, 0xdd, 0x0f,
0x57, 0x4e, 0xfb, 0x25, 0x9e, 0x7f, 0x45, 0xe3, 0x02, 0x0f, 0x28, 0xe3, 0x41, 0x47, 0x0f, 0x33,
0xde, 0xbd, 0x5f, 0x01, 0x0c, 0x25, 0x67, 0xe9, 0x64, 0xc0, 0x84, 0x54, 0x73, 0x9d, 0x28, 0x9c,
0xda, 0x44, 0x75, 0xa7, 0x1d, 0x58, 0x89, 0x7c, 0x0a, 0x0d, 0x21, 0xa9, 0x2c, 0x84, 0x5e, 0x67,
0x67, 0xf7, 0xde, 0xca, 0x59, 0x86, 0x1a, 0x12, 0x58, 0xa8, 0xf7, 0xb7, 0x0a, 0x6c, 0x2d, 0x44,
0xd5, 0xc6, 0x8d, 0x7c, 0x02, 0xb5, 0x11, 0x15, 0x78, 0x69, 0xa0, 0x9e, 0x8b, 0xc9, 0x53, 0x2a,
0x30, 0xd0, 0x48, 0x15, 0xa5, 0x68, 0xd4, 0xf7, 0xf5, 0xec, 0xd5, 0x40, 0x7f, 0x13, 0x0f, 0xba,
0xe3, 0x2c, 0x8e, 0x71, 0x2c, 0x59, 0x96, 0xf6, 0x7d, 0xb7, 0xaa, 0x6d, 0x0b, 0x3a, 0x85, 0xc9,
0x29, 0x97, 0xcc, 0x88, 0xc2, 0xad, 0x6d, 0x57, 0x15, 0x66, 0x5e, 0x47, 0x7e, 0x04, 0x3d, 0xc9,
0xe9, 0x09, 0xc6, 0xa1, 0x64, 0x09, 0x0a, 0x49, 0x93, 0xdc, 0xad, 0x6f, 0x3b, 0x3b, 0xb5, 0x60,
0xc3, 0xe8, 0x5f, 0x95, 0x6a, 0xf2, 0x08, 0xde, 0x9b, 0x14, 0x94, 0xd3, 0x54, 0x22, 0xce, 0xa1,
0x1b, 0x1a, 0x4d, 0xa6, 0xa6, 0xd9, 0x80, 0x1f, 0xc3, 0xa6, 0x82, 0x65, 0x85, 0x9c, 0x83, 0x37,
0x35, 0xbc, 0x67, 0x0d, 0x53, 0xb0, 0xf7, 0xb5, 0x03, 0xb7, 0x96, 0xe2, 0x25, 0xf2, 0x2c, 0x15,
0x78, 0x83, 0x80, 0xdd, 0x24, 0x61, 0xe4, 0x31, 0xd4, 0xd5, 0x97, 0x70, 0xab, 0x57, 0x2d, 0x25,
0x83, 0xf7, 0xfe, 0xe2, 0x00, 0xd9, 0xe3, 0x48, 0x25, 0x3e, 0x89, 0x19, 0x7d, 0x87, 0x3c, 0x7f,
0x17, 0x9a, 0xd1, 0x28, 0x4c, 0x69, 0x52, 0x1e, 0x88, 0x46, 0x34, 0x7a, 0x41, 0x13, 0x24, 0x3f,
0x84, 0x8d, 0x59, 0x62, 0x0d, 0xa0, 0xaa, 0x01, 0xeb, 0x33, 0xb5, 0x06, 0x6e, 0x41, 0x9d, 0xaa,
0x35, 0xb8, 0x35, 0x6d, 0x36, 0x82, 0x27, 0xa0, 0xe7, 0xf3, 0x2c, 0xff, 0x4f, 0xad, 0x6e, 0x3a,
0x69, 0x75, 0x7e, 0xd2, 0x3f, 0x3b, 0xb0, 0xf9, 0x24, 0x96, 0xc8, 0xbf, 0xa5, 0x41, 0xf9, 0x7b,
0xa5, 0xcc, 0x5a, 0x3f, 0x8d, 0xf0, 0xec, 0x7f, 0xb9, 0xc0, 0xf7, 0x01, 0x0e, 0x19, 0xc6, 0x91,
0xc1, 0x98, 0x55, 0xb6, 0xb5, 0x46, 0x9b, 0xcb, 0xe3, 0x5f, 0xbf, 0xe4, 0xf8, 0x37, 0x56, 0x1c,
0x7f, 0x17, 0x9a, 0xda, 0x49, 0xdf, 0xd7, 0x87, 0xae, 0x1a, 0x94, 0xa2, 0x22, 0x4f, 0x3c, 0x93,
0x9c, 0x96, 0xe4, 0xd9, 0xba, 0x32, 0x79, 0xea, 0x61, 0x96, 0x3c, 0xff, 0x58, 0x87, 0xb5, 0x21,
0x52, 0x3e, 0x3e, 0xba, 0x79, 0xf0, 0xb6, 0xa0, 0xce, 0xf1, 0x78, 0xca, 0x6d, 0x46, 0x98, 0xee,
0xb8, 0x7a, 0xc9, 0x8e, 0x6b, 0x57, 0x20, 0xbc, 0xfa, 0x0a, 0xc2, 0xeb, 0x41, 0x35, 0x12, 0xb1,
0x0e, 0x58, 0x3b, 0x50, 0x9f, 0x8a, 0xa6, 0xf2, 0x98, 0x8e, 0xf1, 0x28, 0x8b, 0x23, 0xe4, 0xe1,
0x84, 0x67, 0x85, 0xa1, 0xa9, 0x6e, 0xd0, 0x9b, 0x33, 0xec, 0x2b, 0x3d, 0x79, 0x0c, 0xad, 0x48,
0xc4, 0xa1, 0x3c, 0xcf, 0xd1, 0x6d, 0x6d, 0x3b, 0x3b, 0xeb, 0x6f, 0xd9, 0xa6, 0x2f, 0xe2, 0x57,
0xe7, 0x39, 0x06, 0xcd, 0xc8, 0x7c, 0x90, 0x4f, 0x60, 0x4b, 0x20, 0x67, 0x34, 0x66, 0x6f, 0x30,
0x0a, 0xf1, 0x2c, 0xe7, 0x61, 0x1e, 0xd3, 0xd4, 0x6d, 0xeb, 0x89, 0xc8, 0xcc, 0xf6, 0xec, 0x2c,
0xe7, 0x07, 0x31, 0x4d, 0xc9, 0x0e, 0xf4, 0xb2, 0x42, 0xe6, 0x85, 0x0c, 0x75, 0xde, 0x44, 0xc8,
0x22, 0x17, 0xf4, 0x8e, 0xd6, 0x8d, 0xfe, 0x0b, 0xad, 0xee, 0x47, 0x2b, 0x49, 0xbc, 0x73, 0x2d,
0x12, 0xef, 0x5e, 0x8f, 0xc4, 0xd7, 0x56, 0x93, 0x38, 0x59, 0x87, 0x4a, 0x7a, 0xec, 0xae, 0xeb,
0xd4, 0x54, 0xd2, 0x63, 0x95, 0x48, 0x99, 0xe5, 0xaf, 0xdd, 0x0d, 0x93, 0x48, 0xf5, 0x4d, 0x3e,
0x00, 0x48, 0x50, 0x72, 0x36, 0x56, 0x61, 0x71, 0x7b, 0x3a, 0x0f, 0x73, 0x1a, 0xf2, 0x7d, 0x58,
0x63, 0x93, 0x34, 0xe3, 0xb8, 0xcf, 0xb3, 0x53, 0x96, 0x4e, 0xdc, 0xcd, 0x6d, 0x67, 0xa7, 0x15,
0x2c, 0x2a, 0xc9, 0x5d, 0x68, 0x15, 0x42, 0xf5, 0x3d, 0x09, 0xba, 0x44, 0xfb, 0x98, 0xca, 0xde,
0x3f, 0x6a, 0xb3, 0xc2, 0x14, 0x45, 0x2c, 0xc5, 0x7f, 0xeb, 0x0a, 0x99, 0x56, 0x73, 0x75, 0xbe,
0x9a, 0x1f, 0x40, 0xc7, 0x6c, 0xcf, 0x54, 0x4d, 0xed, 0xc2, 0x8e, 0x1f, 0x40, 0x27, 0x2d, 0x92,
0xf0, 0xb8, 0x40, 0xce, 0x50, 0xd8, 0x73, 0x0e, 0x69, 0x91, 0xbc, 0x34, 0x1a, 0xf2, 0x1e, 0xd4,
0x65, 0x96, 0x87, 0xaf, 0xed, 0x31, 0x57, 0x71, 0xfc, 0x92, 0xfc, 0x0c, 0xee, 0x0a, 0xa4, 0x31,
0x46, 0xa1, 0xc0, 0x49, 0x82, 0xa9, 0xec, 0xfb, 0x22, 0x14, 0x7a, 0xdb, 0x18, 0xb9, 0x4d, 0x5d,
0x28, 0xae, 0x41, 0x0c, 0xa7, 0x80, 0xa1, 0xb5, 0xab, 0x3a, 0x18, 0x9b, 0x7e, 0x6e, 0x61, 0x58,
0x4b, 0x37, 0x3e, 0x64, 0x66, 0x9a, 0x0e, 0xf8, 0x29, 0xb8, 0x93, 0x38, 0x1b, 0xd1, 0x38, 0xbc,
0x30, 0xab, 0xdb, 0xd6, 0x93, 0xdd, 0x36, 0xf6, 0xe1, 0xd2, 0x94, 0x6a, 0x7b, 0x22, 0x66, 0x63,
0x8c, 0xc2, 0x51, 0x9c, 0x8d, 0x5c, 0xd0, 0x05, 0x0f, 0x46, 0xf5, 0x34, 0xce, 0x46, 0xaa, 0xd0,
0x2d, 0x40, 0x85, 0x61, 0x9c, 0x15, 0xa9, 0xd4, 0xe5, 0x5b, 0x0d, 0xd6, 0x8d, 0xfe, 0x45, 0x91,
0xec, 0x29, 0x2d, 0xf9, 0x1e, 0xac, 0x59, 0x64, 0x76, 0x78, 0x28, 0x50, 0xea, 0xba, 0xad, 0x06,
0x5d, 0xa3, 0xfc, 0xa5, 0xd6, 0x91, 0x03, 0xc5, 0xbb, 0x42, 0x3e, 0x99, 0x4c, 0x38, 0x4e, 0xa8,
0x3a, 0xf7, 0xba, 0x5e, 0x3b, 0xbb, 0x3f, 0x78, 0xb8, 0xb2, 0x71, 0x7e, 0xb8, 0xb7, 0x88, 0x0e,
0x96, 0x87, 0x7b, 0xc7, 0xb0, 0xb1, 0x84, 0x51, 0x54, 0xc3, 0x6d, 0x83, 0xa2, 0xca, 0xdf, 0x76,
0xa7, 0x0b, 0x3a, 0xb2, 0x0d, 0x1d, 0x81, 0xfc, 0x84, 0x8d, 0x0d, 0xc4, 0x50, 0xdc, 0xbc, 0x4a,
0x51, 0xb4, 0xcc, 0x24, 0x8d, 0x5f, 0xbc, 0xb4, 0x25, 0x53, 0x8a, 0xde, 0x3f, 0x6b, 0xb0, 0x11,
0xa8, 0x12, 0xc1, 0x13, 0xfc, 0x7f, 0xa2, 0xd7, 0xb7, 0xd1, 0x5c, 0xe3, 0x5a, 0x34, 0xd7, 0xbc,
0x32, 0xcd, 0xb5, 0xae, 0x45, 0x73, 0xed, 0xeb, 0xd1, 0x1c, 0xbc, 0x85, 0xe6, 0xb6, 0xa0, 0x1e,
0xb3, 0x84, 0x95, 0x55, 0x6a, 0x84, 0x8b, 0xc4, 0xd5, 0x5d, 0x45, 0x5c, 0x77, 0xa0, 0xc5, 0x84,
0x2d, 0xf2, 0x35, 0x0d, 0x68, 0x32, 0x61, 0xaa, 0xfb, 0x19, 0x3c, 0x60, 0x12, 0xb9, 0x2e, 0xb0,
0x10, 0xcf, 0x24, 0xa6, 0x42, 0x7d, 0x71, 0x8c, 0x8a, 0x31, 0x86, 0x9c, 0x4a, 0xb4, 0xd4, 0x7a,
0x7f, 0x0a, 0x7b, 0x56, 0xa2, 0x02, 0x0d, 0x0a, 0xa8, 0xc4, 0x05, 0x6a, 0xdc, 0x58, 0xa2, 0xc6,
0x6f, 0xaa, 0xf3, 0x65, 0xf5, 0x2d, 0x20, 0xc7, 0x8f, 0xa0, 0xca, 0x22, 0xd3, 0x9a, 0x75, 0x76,
0xdd, 0x45, 0x3f, 0xf6, 0x05, 0xdb, 0xf7, 0x45, 0xa0, 0x40, 0xe4, 0x17, 0xd0, 0xb1, 0x25, 0x12,
0x51, 0x49, 0x75, 0xf9, 0x75, 0x76, 0x3f, 0x58, 0x39, 0x46, 0xd7, 0x8c, 0x4f, 0x25, 0x0d, 0x4c,
0x6b, 0x25, 0xd4, 0x37, 0xf9, 0x39, 0xdc, 0xbb, 0x48, 0x99, 0xdc, 0x86, 0x23, 0x72, 0x1b, 0xba,
0xea, 0xee, 0x2c, 0x73, 0x66, 0x19, 0xaf, 0x88, 0xfc, 0x04, 0xb6, 0xe6, 0x48, 0x73, 0x36, 0xb0,
0xa9, 0x59, 0x73, 0x8e, 0x50, 0x67, 0x43, 0x2e, 0xa3, 0xcd, 0xd6, 0xa5, 0xb4, 0xf9, 0xef, 0xa7,
0xb1, 0x6f, 0x1c, 0x68, 0x0f, 0x32, 0x1a, 0xe9, 0x86, 0xf7, 0x06, 0x69, 0xbf, 0x0f, 0xed, 0xe9,
0xea, 0x2d, 0xa3, 0xcc, 0x14, 0xca, 0x3a, 0xed, 0x59, 0x6d, 0xa3, 0x3b, 0xd7, 0xc4, 0xce, 0x35,
0xa3, 0xb5, 0xc5, 0x66, 0xf4, 0x01, 0x74, 0x98, 0x5a, 0x50, 0x98, 0x53, 0x79, 0x64, 0x48, 0xa5,
0x1d, 0x80, 0x56, 0x1d, 0x28, 0x8d, 0xea, 0x56, 0x4b, 0x80, 0xee, 0x56, 0x1b, 0x57, 0xee, 0x56,
0xad, 0x13, 0xdd, 0xad, 0xfe, 0xd6, 0x01, 0xd0, 0x1b, 0x57, 0x65, 0x79, 0xd1, 0xa9, 0x73, 0x13,
0xa7, 0x8a, 0xed, 0xd4, 0x95, 0xc5, 0x31, 0xa6, 0x72, 0x96, 0x5b, 0x61, 0x83, 0x43, 0xd2, 0x22,
0x09, 0x8c, 0xc9, 0xe6, 0x55, 0x78, 0xbf, 0x77, 0x00, 0x74, 0x71, 0x9a, 0x65, 0x2c, 0xd3, 0xae,
0x73, 0x79, 0x1f, 0x5f, 0x59, 0x0c, 0xdd, 0xd3, 0x32, 0x74, 0x97, 0x3c, 0x5c, 0xa7, 0xe5, 0x31,
0xdb, 0xbc, 0x8d, 0xae, 0xfe, 0xf6, 0xfe, 0xe0, 0x40, 0xd7, 0xae, 0xce, 0x2c, 0x69, 0x21, 0xcb,
0xce, 0x72, 0x96, 0x75, 0x33, 0x93, 0x64, 0xfc, 0x3c, 0x14, 0xec, 0x4d, 0x79, 0xa7, 0x81, 0x51,
0x0d, 0xd9, 0x1b, 0x54, 0xfc, 0xa6, 0x43, 0x92, 0x9d, 0x8a, 0xf2, 0x4e, 0x53, 0x61, 0xc8, 0x4e,
0x85, 0xe2, 0x58, 0x8e, 0x63, 0x4c, 0x65, 0x7c, 0x1e, 0x26, 0x59, 0xc4, 0x0e, 0x19, 0x46, 0xba,
0x1a, 0x5a, 0x41, 0xaf, 0x34, 0x3c, 0xb7, 0x7a, 0xef, 0x6b, 0xf5, 0xaa, 0x36, 0x07, 0xaa, 0xfc,
0x6d, 0xf5, 0x5c, 0x4c, 0x6e, 0x50, 0xb5, 0x2a, 0xc4, 0xc6, 0x8f, 0x2a, 0x44, 0xf3, 0xa7, 0xa8,
0x1d, 0x2c, 0xe8, 0x54, 0x4f, 0x3a, 0x65, 0x7d, 0x13, 0xc7, 0x5a, 0x30, 0xa7, 0x51, 0x2b, 0x8f,
0xf0, 0x90, 0x16, 0xf1, 0xfc, 0xed, 0x50, 0x33, 0xb7, 0x83, 0x35, 0x2c, 0xfc, 0xc9, 0x58, 0xdf,
0xe3, 0x18, 0x61, 0x2a, 0x19, 0x8d, 0xf5, 0xff, 0xb1, 0x79, 0x4a, 0x76, 0x16, 0x29, 0x99, 0x7c,
0x0c, 0x04, 0xd3, 0x31, 0x3f, 0xcf, 0x55, 0x05, 0xe5, 0x54, 0x88, 0xd3, 0x8c, 0x47, 0xf6, 0x29,
0xb9, 0x39, 0xb5, 0x1c, 0x58, 0x03, 0xb9, 0x0d, 0x0d, 0x89, 0x29, 0x4d, 0xa5, 0x3d, 0x63, 0x56,
0xb2, 0xf7, 0x8a, 0x28, 0x72, 0xe4, 0x36, 0xa6, 0x4d, 0x26, 0x86, 0x4a, 0x54, 0x0f, 0x51, 0x71,
0x44, 0x77, 0x3f, 0xfb, 0x7c, 0xe6, 0xbe, 0x6e, 0x1e, 0xa2, 0x46, 0x5d, 0xfa, 0xf6, 0x9e, 0xc1,
0xe6, 0x80, 0x09, 0x79, 0x90, 0xc5, 0x6c, 0x7c, 0x7e, 0xe3, 0xae, 0xc3, 0xfb, 0x9d, 0x03, 0x64,
0xde, 0x8f, 0xfd, 0x8f, 0x33, 0xbb, 0x35, 0x9c, 0xab, 0xdf, 0x1a, 0x1f, 0x42, 0x37, 0xd7, 0x6e,
0x42, 0x96, 0x1e, 0x66, 0x65, 0xf6, 0x3a, 0x46, 0xa7, 0x62, 0x2b, 0xd4, 0xf3, 0x59, 0x05, 0x33,
0xe4, 0x59, 0x8c, 0x26, 0x79, 0xed, 0xa0, 0xad, 0x34, 0x81, 0x52, 0x78, 0x13, 0xb8, 0x33, 0x3c,
0xca, 0x4e, 0xf7, 0xb2, 0xf4, 0x90, 0x4d, 0x0a, 0x73, 0x6d, 0xbe, 0xc3, 0xff, 0x08, 0x17, 0x9a,
0x39, 0x95, 0xea, 0x4c, 0xd9, 0x1c, 0x95, 0xa2, 0xf7, 0x27, 0x07, 0xee, 0xae, 0x9a, 0xe9, 0x5d,
0xb6, 0xbf, 0x0f, 0x6b, 0x63, 0xe3, 0xce, 0x78, 0xbb, 0xfa, 0x7f, 0xce, 0xc5, 0x71, 0xde, 0x33,
0xa8, 0xe9, 0xe6, 0xe0, 0x11, 0x54, 0xb8, 0xd4, 0x2b, 0x58, 0xdf, 0x7d, 0xf0, 0x16, 0xa6, 0x50,
0x40, 0xfd, 0x78, 0xad, 0x70, 0x49, 0xba, 0xe0, 0x70, 0xbd, 0x53, 0x27, 0x70, 0xf8, 0x47, 0x7f,
0x75, 0xa0, 0x55, 0x9a, 0xc9, 0x26, 0xac, 0xf9, 0xfe, 0x60, 0x6f, 0xca, 0x55, 0xbd, 0xef, 0x90,
0x1e, 0x74, 0x7d, 0x7f, 0x70, 0x50, 0x76, 0x84, 0x3d, 0x87, 0x74, 0xa1, 0xe5, 0xfb, 0x03, 0x4d,
0x3e, 0xbd, 0x8a, 0x95, 0xbe, 0x88, 0x0b, 0x71, 0xd4, 0xab, 0x4e, 0x1d, 0x24, 0x39, 0x35, 0x0e,
0x6a, 0x64, 0x0d, 0xda, 0xfe, 0xf3, 0x41, 0x3f, 0x15, 0xc8, 0x65, 0xaf, 0x6e, 0x45, 0x1f, 0x63,
0x94, 0xd8, 0x6b, 0x90, 0x0d, 0xe8, 0xf8, 0xcf, 0x07, 0x4f, 0x8b, 0xf8, 0xb5, 0xba, 0xc7, 0x7a,
0x4d, 0x6d, 0x7f, 0x39, 0x30, 0x8f, 0x94, 0x5e, 0x4b, 0xbb, 0x7f, 0x39, 0x50, 0xcf, 0xa6, 0xf3,
0x5e, 0xfb, 0xe9, 0xe3, 0x5f, 0x7f, 0x36, 0x61, 0xf2, 0xa8, 0x18, 0xa9, 0x00, 0x3d, 0x32, 0x7b,
0xfd, 0x98, 0x65, 0xf6, 0xeb, 0x51, 0xb9, 0xdf, 0x47, 0x7a, 0xfb, 0x53, 0x31, 0x1f, 0x8d, 0x1a,
0x5a, 0xf3, 0xe9, 0xbf, 0x02, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x21, 0x90, 0xae, 0x79, 0x17, 0x00,
0x00,
}

View File

@ -270,7 +270,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
}
log.Debug("Validate partition names.")
//fetch search_growing from query param
// fetch search_growing from query param
var ignoreGrowing bool
for i, kv := range t.request.GetQueryParams() {
if kv.GetKey() == IgnoreGrowingKey {
@ -284,7 +284,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
}
t.RetrieveRequest.IgnoreGrowing = ignoreGrowing
//fetch iteration_extension_reduce_rate from query param
// fetch iteration_extension_reduce_rate from query param
var iterationExtensionReduceRate int64
for i, kv := range t.request.GetQueryParams() {
if kv.GetKey() == IterationExtensionReduceRateKey {
@ -355,6 +355,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
return err
}
// Set username of this query request for features like task scheduling.
if username, _ := GetCurUserFromContext(ctx); username != "" {
t.RetrieveRequest.Username = username
}
if t.request.TravelTimestamp == 0 {
t.TravelTimestamp = t.BeginTs()
} else {
@ -396,7 +401,6 @@ func (t *queryTask) Execute(ctx context.Context) error {
nq: 1,
exec: t.queryShard,
})
if err != nil {
return merr.WrapErrShardDelegatorQueryFailed(err.Error())
}

View File

@ -384,6 +384,11 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
t.SearchRequest.Dsl = t.request.Dsl
t.SearchRequest.PlaceholderGroup = t.request.PlaceholderGroup
// Set username of this search request for feature like task scheduling.
if username, _ := GetCurUserFromContext(ctx); username != "" {
t.SearchRequest.Username = username
}
log.Ctx(ctx).Debug("search PreExecute done.",
zap.Uint64("travel_ts", travelTimestamp), zap.Uint64("guarantee_ts", guaranteeTs),
zap.Bool("use_default_consistency", useDefaultConsistency),

View File

@ -51,6 +51,12 @@ func (c *counter) Dec(label string, value int64) {
}
}
// Set overwrites the value stored under label, replacing any previous
// count (unlike Inc/Dec, which adjust it). The counter's embedded lock
// guards the values map, so Set is safe for concurrent use alongside
// the other counter methods.
func (c *counter) Set(label string, value int64) {
	c.Lock()
	defer c.Unlock()
	c.values[label] = value
}
func (c *counter) Get(label string) int64 {
c.Lock()
defer c.Unlock()

View File

@ -30,10 +30,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -41,7 +40,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -200,44 +198,20 @@ func (node *QueryNode) querySegments(ctx context.Context, req *querypb.QueryRequ
if collection == nil {
return nil, segments.ErrCollectionNotFound
}
// build plan
retrievePlan, err := segments.NewRetrievePlan(
collection,
req.Req.GetSerializedExprPlan(),
req.Req.GetTravelTimestamp(),
req.Req.Base.GetMsgID(),
)
if err != nil {
// Send task to scheduler and wait until it finished.
task := tasks.NewQueryTask(ctx, collection, node.manager, req)
if err := node.scheduler.Add(task); err != nil {
log.Warn("failed to add query task into scheduler", zap.Error(err))
return nil, err
}
defer retrievePlan.Delete()
var results []*segcorepb.RetrieveResults
collector.Counter.Inc(metricsinfo.ExecuteQueueType, 1)
if req.GetScope() == querypb.DataScope_Historical {
results, _, _, err = segments.RetrieveHistorical(ctx, node.manager, retrievePlan, req.Req.CollectionID, nil, req.GetSegmentIDs())
} else {
results, _, _, err = segments.RetrieveStreaming(ctx, node.manager, retrievePlan, req.Req.CollectionID, nil, req.GetSegmentIDs())
}
collector.Counter.Dec(metricsinfo.ExecuteQueueType, 1)
err := task.Wait()
if err != nil {
log.Warn("failed to execute task by node scheduler", zap.Error(err))
return nil, err
}
reducer := segments.CreateSegCoreReducer(req, collection.Schema())
reducedResult, err := reducer.Reduce(ctx, results)
if err != nil {
return nil, err
}
return &internalpb.RetrieveResults{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Ids: reducedResult.Ids,
FieldsData: reducedResult.FieldsData,
}, nil
return task.Result(), nil
}
func (node *QueryNode) optimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, deleg delegator.ShardDelegator) (*querypb.SearchRequest, error) {

View File

@ -106,7 +106,7 @@ type QueryNode struct {
loader segments.Loader
// Search/Query
scheduler *tasks.Scheduler
scheduler tasks.Scheduler
// etcd client
etcdCli *clientv3.Client
@ -280,7 +280,11 @@ func (node *QueryNode) Init() error {
log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))
node.scheduler = tasks.NewScheduler()
schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()
node.scheduler = tasks.NewScheduler(
schedulePolicy,
)
log.Info("queryNode init scheduler", zap.String("policy", schedulePolicy))
node.clusterManager = cluster.NewWorkerManager(func(nodeID int64) (cluster.Worker, error) {
if nodeID == paramtable.GetNodeID() {
@ -345,7 +349,7 @@ func (node *QueryNode) Init() error {
// Start mainly start QueryNode's query service.
func (node *QueryNode) Start() error {
node.startOnce.Do(func() {
go node.scheduler.Schedule(node.ctx)
node.scheduler.Start(node.ctx)
paramtable.SetCreateTime(time.Now())
paramtable.SetUpdateTime(time.Now())

View File

@ -19,7 +19,6 @@ package querynodev2
import (
"context"
"fmt"
"strconv"
"sync"
@ -721,9 +720,8 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
}
task := tasks.NewSearchTask(searchCtx, collection, node.manager, req)
if !node.scheduler.Add(task) {
err := merr.WrapErrTaskQueueFull()
log.Warn("failed to search segments", zap.Error(err))
if err := node.scheduler.Add(task); err != nil {
log.Warn("failed to search channel", zap.Error(err))
return nil, err
}
@ -1254,7 +1252,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
}, nil
}
//translate segment action
// translate segment action
removeActions := make([]*querypb.SyncAction, 0)
addSegments := make(map[int64][]*querypb.SegmentLoadInfo)
for _, action := range req.GetActions() {

View File

@ -0,0 +1,246 @@
package tasks
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"go.uber.org/atomic"
"go.uber.org/zap"
)
const (
	// maxReceiveChanBatchConsumeNum caps how many pending add-requests the
	// schedule loop drains from receiveChan in one batch (see consumeRecvChan),
	// so a flood of Add calls cannot starve the exec-chan branch of the select.
	maxReceiveChanBatchConsumeNum = 100
)
// newScheduler builds a concurrent-safe Scheduler around the given schedule
// policy. Concurrency and channel capacity come from the query-node config.
func newScheduler(policy schedulePolicy) Scheduler {
	cfg := paramtable.Get().QueryNodeCfg
	maxConcurrency := cfg.MaxReadConcurrency.GetAsInt()
	chanSize := cfg.MaxReceiveChanSize.GetAsInt()
	log.Info("query node use concurrent safe scheduler", zap.Int("max_concurrency", maxConcurrency))
	s := &scheduler{
		policy:           policy,
		receiveChan:      make(chan addTaskReq, chanSize),
		execChan:         make(chan Task),
		pool:             conc.NewPool[any](maxConcurrency, conc.WithPreAlloc(true)),
		schedulerCounter: schedulerCounter{},
	}
	return s
}
// addTaskReq is the handoff unit between Add and the schedule goroutine.
// err is buffered by the caller and written exactly once with the enqueue result.
type addTaskReq struct {
	task Task
	err  chan<- error
}
// scheduler is a general concurrent safe scheduler implementation by wrapping a schedule policy.
// The policy itself is only ever touched by the single schedule goroutine,
// which is what makes the wrapper concurrent safe.
type scheduler struct {
	policy      schedulePolicy  // queueing/merging strategy (fifo, user-task-polling, ...)
	receiveChan chan addTaskReq // Add() -> schedule-loop handoff
	execChan    chan Task       // schedule-loop -> exec-loop handoff (unbuffered)
	pool        *conc.Pool[any] // worker pool that runs ready tasks

	schedulerCounter // embedded atomic counters of waiting tasks / NQ
}
// Add hands a new task to the scheduler goroutine and blocks until the
// enqueue result is known; an error is returned if the scheduler hit a limit.
func (s *scheduler) Add(task Task) (err error) {
	done := make(chan error, 1)

	// TODO: add operation should be fast, is UnsolveLen metric unnesscery?
	metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
	defer metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()

	// start a new in queue span and send task to add chan
	s.receiveChan <- addTaskReq{
		task: task,
		err:  done,
	}
	err = <-done
	return
}
// Start schedule the owned task asynchronously and continuously.
// Start should be only call once.
// Both loops run until ctx is canceled; there is no other shutdown path.
func (s *scheduler) Start(ctx context.Context) {
	// Start a background task executing loop.
	go s.exec(ctx)

	// Begin to schedule tasks.
	go s.schedule(ctx)
}
// schedule the owned task asynchronously and continuously.
// It alternates between accepting new Add requests and pushing ready tasks
// to the exec loop, owning at most one popped task at a time.
func (s *scheduler) schedule(ctx context.Context) {
	// task is the single task popped from the policy whose ownership this
	// loop currently holds while waiting for exec to accept it; nil if none.
	var task Task
	for {
		s.setupReadyLenMetric()

		// execChan is nil (so its send case never fires) when no task is ready.
		var execChan chan Task
		task, execChan = s.setupExecListener(task)

		select {
		case <-ctx.Done():
			log.Warn("unexpected quit of schedule loop")
			return
		case req := <-s.receiveChan:
			// Receive add operation request and return the process result.
			// And consume recv chan as much as possible.
			s.consumeRecvChan(req, maxReceiveChanBatchConsumeNum)
		case execChan <- task:
			// Task sent, drop the ownership of sent task.
			// And produce new task into execChan as much as possible.
			task = s.produceExecChan()
		}
	}
}
// consumeRecvChan drains queued add-requests, starting with req, until the
// channel is empty, the batch limit is hit, or the waiting-task cap is reached.
func (s *scheduler) consumeRecvChan(req addTaskReq, limit int) {
	// Check the dynamic wait task limit.
	maxWaitTaskNum := paramtable.Get().QueryNodeCfg.MaxUnsolvedQueueSize.GetAsInt64()
	if !s.handleAddTaskRequest(req, maxWaitTaskNum) {
		return
	}

	// Keep consuming without blocking until the batch limit is reached.
	for consumed := 1; consumed < limit; consumed++ {
		select {
		case next := <-s.receiveChan:
			if !s.handleAddTaskRequest(next, maxWaitTaskNum) {
				return
			}
		default:
			return
		}
	}
}
// handleAddTaskRequest processes one add request: rejects canceled tasks,
// otherwise pushes into the policy and updates the waiting counters.
// It reports whether the caller may keep consuming (queue below its cap).
func (s *scheduler) handleAddTaskRequest(req addTaskReq, maxWaitTaskNum int64) bool {
	if err := req.task.Canceled(); err != nil {
		log.Warn("task canceled before enqueue", zap.Error(err))
		req.err <- err
		return s.GetWaitingTaskTotal() < maxWaitTaskNum
	}

	// Push the task into the policy to schedule and update the counter of the ready queue.
	nq := req.task.NQ()
	newTaskAdded, err := s.policy.Push(req.task)
	if err == nil {
		s.updateWaitingTaskCounter(int64(newTaskAdded), nq)
	}
	req.err <- err

	// Continue processing if the queue isn't reach the max limit.
	return s.GetWaitingTaskTotal() < maxWaitTaskNum
}
// produceExecChan produce task from scheduler into exec chan as much as possible.
// It returns the task whose ownership it still holds (the exec chan was full),
// or nil if the policy ran empty; the schedule loop retries sending it later.
func (s *scheduler) produceExecChan() Task {
	var task Task
	for {
		// execChan is nil (send never selected) when no task is available.
		var execChan chan Task
		task, execChan = s.setupExecListener(task)

		select {
		case execChan <- task:
			// Task sent, drop the ownership of sent task.
			task = nil
		default:
			return task
		}
	}
}
// exec runs ready tasks from execChan in the worker pool continuously until
// ctx is canceled. Canceled or failing-pre-execute tasks are completed
// immediately without occupying a pool worker.
func (s *scheduler) exec(ctx context.Context) {
	log.Info("start execute loop")
	for {
		select {
		case <-ctx.Done():
			log.Warn("unexpected quit of exec loop")
			return
		case t := <-s.execChan:
			// Skip this task if task is canceled.
			if err := t.Canceled(); err != nil {
				log.Warn("task canceled before executing", zap.Error(err))
				t.Done(err)
				continue
			}
			if err := t.PreExecute(); err != nil {
				log.Warn("failed to pre-execute task", zap.Error(err))
				t.Done(err)
				continue
			}

			s.pool.Submit(func() (any, error) {
				// Update concurrency metric and notify task done.
				metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
				collector.Counter.Inc(metricsinfo.ExecuteQueueType, 1)

				err := t.Execute()

				// Update all metric after task finished.
				s.updateWaitingTaskCounter(-1, -t.NQ())
				metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
				// BUGFIX: was Dec(..., -1), which double-negates and *grows* the
				// execute-queue counter on every finished task. Mirror the
				// Inc(..., 1) above (the pre-refactor code also used Dec(..., 1)).
				collector.Counter.Dec(metricsinfo.ExecuteQueueType, 1)

				// Notify task done.
				t.Done(err)
				return nil, err
			})
		}
	}
}
// setupExecListener decides the next task to dispatch and the channel to send
// it on. With no runnable task it returns a nil channel, which disables the
// corresponding send case in the caller's select.
func (s *scheduler) setupExecListener(lastWaitingTask Task) (Task, chan Task) {
	if lastWaitingTask == nil {
		// Nothing in flight: pull a fresh task from the policy.
		lastWaitingTask = s.policy.Pop()
	}
	if lastWaitingTask == nil {
		// Still nothing runnable; nil channel blocks the send case forever.
		return nil, nil
	}
	return lastWaitingTask, s.execChan
}
// setupReadyLenMetric publishes the current waiting-task count to both the
// quota collector and the prometheus ready-length gauge.
func (s *scheduler) setupReadyLenMetric() {
	waiting := s.GetWaitingTaskTotal()

	// Update the ReadyQueue counter for quota.
	collector.Counter.Set(metricsinfo.ReadyQueueType, waiting)
	// Record the waiting task length of policy as ready task metric.
	metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(waiting))
}
// schedulerCounter tracks waiting task/NQ totals; concurrent safe via atomics.
// Embedded in scheduler so both the schedule loop and pool workers may update it.
type schedulerCounter struct {
	waitingTaskTotal   atomic.Int64 // tasks pushed into the policy but not yet finished executing
	waitingTaskTotalNQ atomic.Int64 // sum of NQ of those tasks
}
// GetWaitingTaskTotal returns the count of tasks accepted but not yet finished.
func (s *schedulerCounter) GetWaitingTaskTotal() int64 {
	return s.waitingTaskTotal.Load()
}
// GetWaitingTaskTotalNQ returns the summed NQ of tasks accepted but not yet finished.
func (s *schedulerCounter) GetWaitingTaskTotalNQ() int64 {
	return s.waitingTaskTotalNQ.Load()
}
// updateWaitingTaskCounter adjusts both counters by the given deltas
// (positive on enqueue, negative when a task finishes executing).
func (s *schedulerCounter) updateWaitingTaskCounter(num int64, nq int64) {
	s.waitingTaskTotal.Add(num)
	s.waitingTaskTotalNQ.Add(nq)
}

View File

@ -0,0 +1,81 @@
package tasks
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
)
// TestScheduler runs the generic scheduler smoke test against every
// available schedule-policy implementation.
func TestScheduler(t *testing.T) {
	paramtable.Init()
	t.Run("user-task-polling", func(t *testing.T) {
		testScheduler(t, newUserTaskPollingPolicy())
	})
	t.Run("fifo", func(t *testing.T) {
		testScheduler(t, newFIFOPolicy())
	})
}
// testScheduler pushes two batches of short tasks from random users through a
// scheduler built on the given policy, checking the waiting counters stay
// bounded while queued and drain to zero after execution completes.
func testScheduler(t *testing.T, policy schedulePolicy) {
	// start a new scheduler
	scheduler := newScheduler(policy)
	// NOTE(review): Start already spawns its own goroutines, so the extra
	// `go` here is redundant (though harmless) — confirm before removing.
	go scheduler.Start(context.Background())

	var cnt atomic.Int32
	n := 100
	nq := 0
	userN := 10

	// Batch 1: tasks with growing NQ (task i carries nq=i).
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", rand.Int31n(int32(userN)))
		task := newMockTask(mockTaskConfig{
			username:    username,
			nq:          int64(i),
			executeCost: 10 * time.Millisecond,
			execution: func(ctx context.Context) error {
				cnt.Inc()
				return nil
			},
		})
		nq += i
		assert.NoError(t, scheduler.Add(task))
		// Counters may only lag behind what we pushed (tasks drain concurrently).
		total := int(scheduler.GetWaitingTaskTotal())
		nqNow := int(scheduler.GetWaitingTaskTotalNQ())
		assert.LessOrEqual(t, total, i)
		assert.LessOrEqual(t, nqNow, nq)
	}
	// Wait for the pool to drain, then everything must have executed exactly once.
	time.Sleep(2 * time.Second)
	assert.Equal(t, cnt.Load(), int32(n))
	assert.Equal(t, 0, int(scheduler.GetWaitingTaskTotal()))
	assert.Equal(t, 0, int(scheduler.GetWaitingTaskTotalNQ()))

	// Batch 2: default NQ (1 per task).
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", rand.Int31n(int32(userN)))
		task := newMockTask(mockTaskConfig{
			username:    username,
			executeCost: 10 * time.Millisecond,
			execution: func(ctx context.Context) error {
				cnt.Inc()
				return nil
			},
		})
		assert.NoError(t, scheduler.Add(task))
		total := int(scheduler.GetWaitingTaskTotal())
		nqNow := int(scheduler.GetWaitingTaskTotalNQ())
		assert.LessOrEqual(t, total, i)
		assert.LessOrEqual(t, nqNow, i)
	}
	time.Sleep(2 * time.Second)
	assert.Equal(t, cnt.Load(), int32(2*n))
	assert.Equal(t, 0, int(scheduler.GetWaitingTaskTotal()))
	assert.Equal(t, 0, int(scheduler.GetWaitingTaskTotalNQ()))
}

View File

@ -0,0 +1,48 @@
package tasks
import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ schedulePolicy = &fifoPolicy{}
// newFIFOPolicy returns a FIFO schedule policy backed by a single merge queue.
func newFIFOPolicy() schedulePolicy {
	return &fifoPolicy{queue: newMergeTaskQueue("")}
}
// fifoPolicy is a fifo policy with merge queue.
// All users share the one unnamed queue, so ordering is strict arrival order
// (modulo task merging).
type fifoPolicy struct {
	queue *mergeTaskQueue
}
// Push enqueues a task, first attempting to merge it into an already-queued
// task. Returns the number of newly added queue entries (0 when merged).
func (p *fifoPolicy) Push(task Task) (int, error) {
	// Merge-able tasks may be absorbed by an existing entry within the NQ cap.
	if mt := tryIntoMergeTask(task); mt != nil {
		limit := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt64()
		if p.queue.tryMerge(mt, limit) {
			return 0, nil
		}
	}
	// Otherwise it becomes a fresh queue entry.
	p.queue.push(task)
	return 1, nil
}
// Pop removes and returns the oldest queued task, or nil when empty.
func (p *fifoPolicy) Pop() Task {
	head := p.queue.front()
	p.queue.pop()
	return head
}
// Len get ready task counts.
func (p *fifoPolicy) Len() int {
	return p.queue.len()
}

View File

@ -0,0 +1,115 @@
package tasks
import (
"context"
"math/rand"
"time"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
var (
_ Task = &MockTask{}
_ MergeTask = &MockTask{}
)
// mockTaskConfig configures newMockTask; zero values get defaults
// (Background ctx, nq=1, random 1-4s executeCost, no-op execution).
type mockTaskConfig struct {
	ctx         context.Context
	mergeAble   bool
	nq          int64
	username    string
	executeCost time.Duration
	execution   func(ctx context.Context) error
}
// newMockTask builds a MockTask from c, filling unset fields with defaults.
func newMockTask(c mockTaskConfig) Task {
	ctx := c.ctx
	if ctx == nil {
		ctx = context.Background()
	}
	nq := c.nq
	if nq == 0 {
		nq = 1
	}
	cost := c.executeCost
	if cost == 0 {
		// Random 1-4 second cost when none is specified.
		cost = time.Duration((rand.Int31n(4) + 1) * int32(time.Second))
	}
	return &MockTask{
		ctx:         ctx,
		executeCost: cost,
		notifier:    make(chan error, 1),
		mergeAble:   c.mergeAble,
		nq:          nq,
		username:    c.username,
		execution:   c.execution,
		tr:          timerecord.NewTimeRecorderWithTrace(ctx, "searchTask"),
	}
}
// MockTask is a configurable Task/MergeTask implementation for tests.
type MockTask struct {
	ctx         context.Context
	executeCost time.Duration // simulated Execute duration (sleep)
	notifier    chan error    // buffered(1); Done sends, Wait receives
	mergeAble   bool          // whether MergeWith may absorb/be absorbed
	nq          int64
	username    string
	execution   func(ctx context.Context) error // optional hook run by Execute
	tr          *timerecord.TimeRecorder
}
// QueryTypeMetricLabel Return Metric label for metric label.
func (t *MockTask) QueryTypeMetricLabel() string {
	return "mock"
}
// Username returns the user this task was configured with ("" if none).
func (t *MockTask) Username() string {
	return t.username
}
// TimeRecorder exposes the task's time recorder (started at construction).
func (t *MockTask) TimeRecorder() *timerecord.TimeRecorder {
	return t.tr
}
// PreExecute is a no-op for the mock; always succeeds.
func (t *MockTask) PreExecute() error {
	return nil
}
// Execute sleeps for the configured cost, then runs the optional hook.
func (t *MockTask) Execute() error {
	time.Sleep(t.executeCost)
	if t.execution == nil {
		return nil
	}
	return t.execution(t.ctx)
}
// Done delivers the final result to the waiter (channel is buffered, size 1).
func (t *MockTask) Done(err error) {
	t.notifier <- err
}
// Canceled reports the context's error, non-nil once the ctx is done.
func (t *MockTask) Canceled() error {
	return t.ctx.Err()
}
// Wait blocks until Done is called and returns its error.
func (t *MockTask) Wait() error {
	return <-t.notifier
}
// Return the context of task.
func (t *MockTask) Context() context.Context {
	return t.ctx
}
// MergeWith absorbs t2 into t when both are merge-able MockTasks:
// NQ and execute cost accumulate. Reports whether the merge happened.
func (t *MockTask) MergeWith(t2 Task) bool {
	other, ok := t2.(*MockTask)
	if !ok || !t.mergeAble || !other.mergeAble {
		return false
	}
	t.nq += other.nq
	t.executeCost += other.executeCost
	return true
}
// NQ returns the (possibly merged) query count of this task.
func (t *MockTask) NQ() int64 {
	return t.nq
}

View File

@ -0,0 +1,122 @@
package tasks
import (
"fmt"
"testing"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
)
// TestUserTaskPollingPolicy checks the user-task-polling policy against the
// shared policy suite plus its cross-user merge behavior.
func TestUserTaskPollingPolicy(t *testing.T) {
	paramtable.Init()
	testCommonPolicyOperation(t, newUserTaskPollingPolicy())
	testCrossUserMerge(t, newUserTaskPollingPolicy())
}
// TestFIFOPolicy checks the fifo policy against the shared policy suite.
func TestFIFOPolicy(t *testing.T) {
	paramtable.Init()
	testCommonPolicyOperation(t, newFIFOPolicy())
}
// testCrossUserMerge verifies merge behavior across users: with cross-user
// grouping off, half-maxNQ tasks merge only pairwise within a user; with it
// on, quarter-maxNQ tasks merge four ways across users.
func testCrossUserMerge(t *testing.T, policy schedulePolicy) {
	userN := 10
	maxNQ := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt64()

	// Do not open cross user merge.
	n := userN * 4
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username:  username,
			nq:        maxNQ / 2,
			mergeAble: true,
		})
		policy.Push(task)
	}
	// Each user's 4 half-NQ tasks collapse pairwise -> half remain.
	nAfterMerge := n / 2
	assert.Equal(t, nAfterMerge, policy.Len())
	for i := 1; i <= nAfterMerge; i++ {
		assert.NotNil(t, policy.Pop())
		assert.Equal(t, nAfterMerge-i, policy.Len())
	}

	// Open cross user grouping
	// NOTE(review): this flips a global param without restoring it afterwards,
	// which can leak into later tests — consider a deferred reset.
	paramtable.Get().QueryNodeCfg.SchedulePolicyEnableCrossUserGrouping.SwapTempValue("true")
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username:  username,
			nq:        maxNQ / 4,
			mergeAble: true,
		})
		policy.Push(task)
	}
	// Quarter-NQ tasks now merge four ways regardless of user.
	nAfterMerge = n / 4
	assert.Equal(t, nAfterMerge, policy.Len())
	for i := 1; i <= nAfterMerge; i++ {
		assert.NotNil(t, policy.Pop())
		assert.Equal(t, nAfterMerge-i, policy.Len())
	}
}
// testCommonPolicyOperation exercises behavior every schedulePolicy must share:
// empty-queue semantics, plain push/pop accounting, no merging at/above maxNQ,
// and pairwise merging at half maxNQ.
func testCommonPolicyOperation(t *testing.T, policy schedulePolicy) {
	// Empty policy assertion.
	assert.Equal(t, 0, policy.Len())
	assert.Nil(t, policy.Pop())
	assert.Equal(t, 0, policy.Len())

	// Test no merge push pop.
	n := 50
	userN := 10

	// Test Push: non-merge-able tasks always add an entry.
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username: username,
		})
		policy.Push(task)
		assert.Equal(t, i, policy.Len())
	}
	// Test Pop
	for i := 1; i <= n; i++ {
		assert.NotNil(t, policy.Pop())
		assert.Equal(t, n-i, policy.Len())
	}

	// Test with merge
	maxNQ := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt64()
	// cannot merge if the nq is gte than maxNQ
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username:  username,
			nq:        maxNQ,
			mergeAble: true,
		})
		policy.Push(task)
	}
	assert.Equal(t, n, policy.Len())
	for i := 1; i <= n; i++ {
		assert.NotNil(t, policy.Pop())
		assert.Equal(t, n-i, policy.Len())
	}

	// Merge half MaxNQ: two tasks per user merge into one.
	n = userN * 2
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username:  username,
			nq:        maxNQ / 2,
			mergeAble: true,
		})
		policy.Push(task)
	}
	nAfterMerge := n / 2
	assert.Equal(t, nAfterMerge, policy.Len())
	for i := 1; i <= nAfterMerge; i++ {
		assert.NotNil(t, policy.Pop())
		assert.Equal(t, nAfterMerge-i, policy.Len())
	}
}

View File

@ -0,0 +1,149 @@
package tasks
import (
"context"
"strconv"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
var _ Task = &QueryTask{}
// NewQueryTask constructs a retrieve task for the scheduler; the time
// recorder starts here so PreExecute can measure time spent queued.
func NewQueryTask(ctx context.Context,
	collection *segments.Collection,
	manager *segments.Manager,
	req *querypb.QueryRequest,
) *QueryTask {
	task := &QueryTask{
		ctx:            ctx,
		collection:     collection,
		segmentManager: manager,
		req:            req,
		notifier:       make(chan error, 1),
		tr:             timerecord.NewTimeRecorderWithTrace(ctx, "queryTask"),
	}
	return task
}
// QueryTask is the scheduler Task wrapping one retrieve (query) request.
type QueryTask struct {
	ctx            context.Context
	collection     *segments.Collection
	segmentManager *segments.Manager
	req            *querypb.QueryRequest
	result         *internalpb.RetrieveResults // set by Execute on success
	notifier       chan error                  // buffered(1); Done sends, Wait receives
	tr             *timerecord.TimeRecorder    // measures queue/execution spans
}
// Return the username which task is belong to.
// Return "" if the task do not contain any user info.
func (t *QueryTask) Username() string {
	return t.req.Req.GetUsername()
}
// PreExecute the task, only call once. It records how long the task sat in
// the scheduler queue into prometheus (global and per-user) and the quota
// collector.
func (t *QueryTask) PreExecute() error {
	// Update task wait time metric before execute
	nodeLabel := strconv.FormatInt(paramtable.GetNodeID(), 10)
	queueDur := t.tr.ElapseSpan()
	latencyMs := float64(queueDur.Milliseconds())

	// Update in queue metric for prometheus.
	metrics.QueryNodeSQLatencyInQueue.WithLabelValues(
		nodeLabel,
		metrics.QueryLabel).
		Observe(latencyMs)
	metrics.QueryNodeSQPerUserLatencyInQueue.WithLabelValues(
		nodeLabel,
		metrics.QueryLabel,
		t.Username()).
		Observe(latencyMs)

	// Update collector for query node quota.
	collector.Average.Add(metricsinfo.QueryQueueMetric, float64(queueDur.Microseconds()))
	return nil
}
// Execute the task, only call once.
// Builds a segcore retrieve plan, retrieves from the historical or streaming
// segments named in the request, reduces the per-segment results, and stores
// the final RetrieveResults in t.result (read later via Result()).
func (t *QueryTask) Execute() error {
	// build the serialized expression into a segcore retrieve plan
	retrievePlan, err := segments.NewRetrievePlan(
		t.collection,
		t.req.Req.GetSerializedExprPlan(),
		t.req.Req.GetTravelTimestamp(),
		t.req.Req.Base.GetMsgID(),
	)
	if err != nil {
		return err
	}
	// plan owns cgo-side resources; must be released on every path
	defer retrievePlan.Delete()

	var results []*segcorepb.RetrieveResults
	// scope decides which segment set is queried; non-Historical means streaming
	if t.req.GetScope() == querypb.DataScope_Historical {
		results, _, _, err = segments.RetrieveHistorical(
			t.ctx,
			t.segmentManager,
			retrievePlan,
			t.req.Req.CollectionID,
			nil,
			t.req.GetSegmentIDs(),
		)
	} else {
		results, _, _, err = segments.RetrieveStreaming(
			t.ctx,
			t.segmentManager,
			retrievePlan,
			t.req.Req.CollectionID,
			nil,
			t.req.GetSegmentIDs(),
		)
	}
	if err != nil {
		return err
	}

	// merge the per-segment results into a single response
	reducer := segments.CreateSegCoreReducer(
		t.req,
		t.collection.Schema(),
	)
	reducedResult, err := reducer.Reduce(t.ctx, results)
	if err != nil {
		return err
	}

	t.result = &internalpb.RetrieveResults{
		Status:     &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		Ids:        reducedResult.Ids,
		FieldsData: reducedResult.FieldsData,
	}
	return nil
}
// Done delivers the final result to the waiter (channel is buffered, size 1).
func (t *QueryTask) Done(err error) {
	t.notifier <- err
}
// Canceled reports the request context's error, non-nil once the ctx is done.
func (t *QueryTask) Canceled() error {
	return t.ctx.Err()
}
// Wait blocks until Done is called and returns its error.
func (t *QueryTask) Wait() error {
	return <-t.notifier
}
// Result returns the reduced retrieve results; nil unless Execute succeeded.
func (t *QueryTask) Result() *internalpb.RetrieveResults {
	return t.result
}
// NQ for a retrieve task is always 1 (queries are not NQ-batched).
func (t *QueryTask) NQ() int64 {
	return 1
}

View File

@ -0,0 +1,209 @@
package tasks
import (
"container/ring"
"time"
)
// newMergeTaskQueue returns an empty per-group task queue; the cleanup
// timestamp starts now so a never-used queue can expire.
func newMergeTaskQueue(group string) *mergeTaskQueue {
	q := &mergeTaskQueue{
		name:             group,
		tasks:            []Task{},
		cleanupTimestamp: time.Now(),
	}
	return q
}
// mergeTaskQueue is a FIFO slice of tasks for one group (user) that also
// supports in-place task merging and idle-time expiry.
type mergeTaskQueue struct {
	name             string
	tasks            []Task
	cleanupTimestamp time.Time // last time the queue became empty; drives expire()
}
// len returns the length of taskQueue.
func (q *mergeTaskQueue) len() int {
	return len(q.tasks)
}
// push add a new task to the end of taskQueue.
func (q *mergeTaskQueue) push(t Task) {
	q.tasks = append(q.tasks, t)
}
// front returns the oldest task without removing it, or nil when empty.
func (q *mergeTaskQueue) front() Task {
	if len(q.tasks) == 0 {
		return nil
	}
	return q.tasks[0]
}
// pop discards the oldest task (no-op when empty); when the queue drains it
// stamps cleanupTimestamp so expire() can later reclaim it.
func (q *mergeTaskQueue) pop() {
	if len(q.tasks) == 0 {
		return
	}
	q.tasks = q.tasks[1:]
	if len(q.tasks) == 0 {
		q.cleanupTimestamp = time.Now()
	}
}
// expire reports whether the queue is empty and has stayed empty longer than d.
func (q *mergeTaskQueue) expire(d time.Duration) bool {
	return len(q.tasks) == 0 && time.Since(q.cleanupTimestamp) > d
}
// tryMerge attempts to absorb task into some queued task without the
// combined NQ exceeding maxNQ. Reports whether a merge happened.
func (q *mergeTaskQueue) tryMerge(task MergeTask, maxNQ int64) bool {
	budget := maxNQ - task.NQ()
	// A task already at or over maxNQ can never be merged.
	if budget <= 0 {
		return false
	}
	// Scan newest-first so merges favor recently queued tasks.
	for i := len(q.tasks) - 1; i >= 0; i-- {
		candidate := tryIntoMergeTask(q.tasks[i])
		if candidate == nil {
			continue
		}
		if candidate.NQ() <= budget && candidate.MergeWith(task) {
			return true
		}
	}
	return false
}
// newFairPollingTaskQueue returns an empty fair polling queue
// (no groups, nil checkpoint cursor).
func newFairPollingTaskQueue() *fairPollingTaskQueue {
	return &fairPollingTaskQueue{
		route: make(map[string]*ring.Ring),
	}
}
// fairPollingTaskQueue is a fairly polling queue.
// Each group (user) owns one mergeTaskQueue, linked into a ring that pop()
// walks round-robin; route indexes the ring nodes by group name.
type fairPollingTaskQueue struct {
	count      int                   // total tasks across all group queues
	route      map[string]*ring.Ring // group name -> its ring node
	checkpoint *ring.Ring            // next ring node pop() will visit; nil when ring is empty
}
// len returns the item count in FairPollingQueue.
func (q *fairPollingTaskQueue) len() int {
	return q.count
}
// groupLen returns the number of queued tasks for group, 0 if unknown.
func (q *fairPollingTaskQueue) groupLen(group string) int {
	r, ok := q.route[group]
	if !ok {
		return 0
	}
	return r.Value.(*mergeTaskQueue).len()
}
// tryMergeWithOtherGroup tries to merge the given task into a queued task of
// any *other* group, walking the ring backwards from the checkpoint so the
// groups scheduled furthest in the future are preferred.
func (q *fairPollingTaskQueue) tryMergeWithOtherGroup(group string, task MergeTask, maxNQ int64) bool {
	if q.count == 0 {
		return false
	}

	// Try to merge task into other group before checkpoint.
	node := q.checkpoint.Prev()
	queuesLen := q.checkpoint.Len()
	for i := 0; i < queuesLen; i++ {
		prev := node.Prev()
		queue := node.Value.(*mergeTaskQueue)
		// Skip empty queues and the task's own group, but still advance the
		// cursor. BUGFIX: the previous `continue` skipped the `node = prev`
		// step, so the loop stalled on a skipped node and never inspected
		// the remaining groups.
		if queue.len() != 0 && queue.name != group {
			if queue.tryMerge(task, maxNQ) {
				return true
			}
		}
		node = prev
	}
	return false
}
// tryMergeWithSameGroup tries to merge the given task into a queued task of
// its own group. Reports whether the merge happened.
func (q *fairPollingTaskQueue) tryMergeWithSameGroup(group string, task MergeTask, maxNQ int64) bool {
	if q.count == 0 {
		return false
	}
	// Only the group's own queue is considered here.
	r, ok := q.route[group]
	if !ok {
		return false
	}
	return r.Value.(*mergeTaskQueue).tryMerge(task, maxNQ)
}
// push add a new task into queue, try merge first.
// (Merging itself is done by the callers via tryMergeWith*; push always
// appends.) New groups are linked into the ring just before the checkpoint,
// i.e. at the back of the polling order.
func (q *fairPollingTaskQueue) push(group string, task Task) {
	// Add a new task.
	if r, ok := q.route[group]; ok {
		// Add new task to the back of queue if queue exist.
		r.Value.(*mergeTaskQueue).push(task)
	} else {
		// Create a new task queue, and add it to the route and queues.
		newQueue := newMergeTaskQueue(group)
		newQueue.push(task)
		newRing := ring.New(1)
		newRing.Value = newQueue
		q.route[group] = newRing
		if q.checkpoint == nil {
			// Create new ring if not exist.
			q.checkpoint = newRing
		} else {
			// Add the new ring before the checkpoint.
			q.checkpoint.Prev().Link(newRing)
		}
	}
	q.count++
}
// pop pop next ready task.
// It walks the ring from the checkpoint, returns the front task of the first
// non-empty group queue, and leaves the checkpoint on the following node so
// groups are served round-robin. Empty queues idle longer than queueExpire
// are unlinked from the ring and dropped from route along the way.
func (q *fairPollingTaskQueue) pop(queueExpire time.Duration) (task Task) {
	// Return directly if there's no task exists.
	if q.count == 0 {
		return
	}
	checkpoint := q.checkpoint
	queuesLen := q.checkpoint.Len()
	for i := 0; i < queuesLen; i++ {
		next := checkpoint.Next()
		// Find task in this queue.
		queue := checkpoint.Value.(*mergeTaskQueue)
		// empty task queue for this user.
		if queue.len() == 0 {
			// expire the queue.
			if queue.expire(queueExpire) {
				delete(q.route, queue.name)
				if checkpoint.Len() == 1 {
					// Last node in the ring: the ring becomes empty.
					checkpoint = nil
					break
				} else {
					// Prev().Unlink(1) removes exactly the current node.
					checkpoint.Prev().Unlink(1)
				}
			}
			checkpoint = next
			continue
		}
		task = queue.front()
		queue.pop()
		q.count--
		// Advance past the served group so the next pop starts at its successor.
		checkpoint = next
		break
	}
	// Update checkpoint.
	q.checkpoint = checkpoint
	return
}

View File

@ -0,0 +1,163 @@
package tasks
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestMergeTaskQueue covers mergeTaskQueue: empty-queue behavior, expiry
// timing, push/pop accounting, and NQ-bounded merging.
func TestMergeTaskQueue(t *testing.T) {
	q := newMergeTaskQueue("test_user")
	// Fresh queue: empty, nil front, pop is a no-op.
	assert.Equal(t, 0, q.len())
	assert.Nil(t, q.front())
	q.pop()
	assert.Nil(t, q.front())
	assert.Equal(t, 0, q.len())
	// Not yet idle for 5s, but after sleeping 1s it has been idle for >1s.
	assert.False(t, q.expire(5*time.Second))
	time.Sleep(1 * time.Second)
	assert.True(t, q.expire(1*time.Second))

	// Test push.
	n := 50
	for i := 1; i <= n; i++ {
		task := newMockTask(mockTaskConfig{
			username: "test_user",
		})
		q.push(task)
		assert.Equal(t, i, q.len())
		// Non-empty queues never expire.
		assert.False(t, q.expire(time.Second))
	}
	// Test Pop.
	for i := 0; i < n; i++ {
		q.pop()
		assert.Equal(t, n-(i+1), q.len())
		assert.False(t, q.expire(time.Second))
	}
	time.Sleep(time.Second)
	assert.Equal(t, q.expire(time.Second), true)

	// Test Merge: one nq=1 entry, then merge tasks of growing nq into it
	// until the maxNQ=56 budget (1+2+...+10 = 55 <= 56 fits) is exhausted.
	task := newMockTask(mockTaskConfig{
		username:  "test_user",
		mergeAble: true,
		nq:        1,
	})
	q.push(task)
	for i := 1; i <= 20; i++ {
		task := newMockTask(mockTaskConfig{
			username:  "test_user",
			mergeAble: true,
			nq:        int64(i),
		})
		assert.Equal(t, i <= 10, q.tryMerge(tryIntoMergeTask(task), 56))
	}
	// Two extra nq=1 entries give tryMerge (which scans newest-first)
	// alternative merge targets.
	for i := 1; i <= 2; i++ {
		task := newMockTask(mockTaskConfig{
			username:  "test_user",
			mergeAble: true,
			nq:        int64(1),
		})
		q.push(task)
	}
	for i := 1; i <= 20; i++ {
		task := newMockTask(mockTaskConfig{
			username:  "test_user",
			mergeAble: true,
			nq:        int64(i),
		})
		// 2 + 1 + ... + 9 < 55
		// 1 + 10 + 11 + 12 + 13 < 55
		assert.Equal(t, i <= 13, q.tryMerge(tryIntoMergeTask(task), 55))
	}
}
// TestFairPollingTaskQueue covers fairPollingTaskQueue: empty behavior,
// round-robin pop order across groups, idle-queue expiry during pop, and
// same-group vs cross-group merging.
func TestFairPollingTaskQueue(t *testing.T) {
	q := newFairPollingTaskQueue()
	// Fresh queue: empty, pop returns nil and changes nothing.
	assert.Equal(t, 0, q.len())
	assert.Equal(t, 0, q.groupLen(""))
	assert.Nil(t, q.pop(time.Second))
	assert.Equal(t, 0, q.len())
	assert.Equal(t, 0, q.groupLen(""))

	n := 50
	userN := 10
	// Test Push: tasks round-robin over userN groups.
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username: username,
		})
		q.push(username, task)
		assert.Equal(t, i, q.len())
		assert.Equal(t, (i-1)/10+1, q.groupLen(username))
	}
	assert.Equal(t, userN, len(q.route))

	// Test Pop: fair polling must return users in round-robin order.
	for i := 1; i <= n; i++ {
		task := q.pop(time.Minute)
		assert.NotNil(t, task)
		username := task.Username()
		expectedUserName := fmt.Sprintf("user_%d", (i-1)%userN)
		assert.Equal(t, n-i, q.len())
		assert.Equal(t, expectedUserName, username)
	}

	// Test Expire inner queue: after 1s idle, the next pop with a 1s expiry
	// sweeps all the now-empty user queues, leaving only the fresh one.
	assert.Equal(t, userN, len(q.route))
	time.Sleep(time.Second)
	username := "test_user"
	task := newMockTask(mockTaskConfig{
		username: username,
	})
	q.push(username, task)
	assert.Equal(t, userN+1, len(q.route))
	assert.NotNil(t, q.pop(time.Second))
	assert.Equal(t, 1, len(q.route))

	// Test Merge.
	// test on empty queue.
	task = newMockTask(mockTaskConfig{
		username:  username,
		mergeAble: true,
	})
	assert.False(t, q.tryMergeWithSameGroup(username, tryIntoMergeTask(task), 1))
	assert.False(t, q.tryMergeWithOtherGroup(username, tryIntoMergeTask(task), 1))

	// Add basic user info first.
	for i := 1; i <= userN; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username:  username,
			mergeAble: true,
		})
		q.push(username, task)
		assert.Equal(t, i, q.len())
		assert.Equal(t, 1, q.groupLen(username))
	}
	// Try to merge with same group: budget n/userN allows each group's entry
	// to absorb nq up to that budget; entry counts must not change.
	for i := 1; i <= n; i++ {
		username := fmt.Sprintf("user_%d", (i-1)%userN)
		task := newMockTask(mockTaskConfig{
			username:  username,
			mergeAble: true,
		})
		success := q.tryMergeWithSameGroup(username, tryIntoMergeTask(task), int64(n/userN))
		assert.Equal(t, i+userN <= n, success)
		assert.Equal(t, userN, q.len())
		assert.Equal(t, 1, q.groupLen(username))
	}
	// Try to merge with other group: fails at the exhausted budget,
	// succeeds once the budget is raised by one.
	task = newMockTask(mockTaskConfig{
		username:  username,
		mergeAble: true,
	})
	assert.False(t, q.tryMergeWithOtherGroup(username, tryIntoMergeTask(task), int64(n/userN)))
	assert.True(t, q.tryMergeWithOtherGroup(username, tryIntoMergeTask(task), int64(n/userN)+1))
	assert.Equal(t, 0, q.groupLen(username))
}

View File

@ -1,198 +0,0 @@
package tasks
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"go.uber.org/atomic"
"go.uber.org/zap"
)
const (
MaxProcessTaskNum = 1024 * 10
)
type Scheduler struct {
searchWaitQueue chan *SearchTask
mergingSearchTasks []*SearchTask
mergedSearchTasks chan *SearchTask
pool *conc.Pool[any]
waitingTaskTotalNQ atomic.Int64
}
func NewScheduler() *Scheduler {
maxWaitTaskNum := paramtable.Get().QueryNodeCfg.MaxReceiveChanSize.GetAsInt()
maxReadConcurrency := paramtable.Get().QueryNodeCfg.MaxReadConcurrency.GetAsInt()
return &Scheduler{
searchWaitQueue: make(chan *SearchTask, maxWaitTaskNum),
mergingSearchTasks: make([]*SearchTask, 0),
mergedSearchTasks: make(chan *SearchTask, 1),
pool: conc.NewPool[any](maxReadConcurrency, conc.WithPreAlloc(true)),
waitingTaskTotalNQ: *atomic.NewInt64(0),
}
}
func (s *Scheduler) Add(task Task) bool {
switch t := task.(type) {
case *SearchTask:
t.tr.RecordSpan()
select {
case s.searchWaitQueue <- t:
collector.Counter.Inc(metricsinfo.ReadyQueueType, 1)
s.waitingTaskTotalNQ.Add(t.nq)
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
default:
return false
}
}
return true
}
// schedule all tasks in the order:
// try execute merged tasks
// try execute waiting tasks
func (s *Scheduler) Schedule(ctx context.Context) {
go s.processAll(ctx)
for {
if len(s.mergingSearchTasks) > 0 { // wait for an idle worker or a new task
task := s.mergingSearchTasks[0]
select {
case <-ctx.Done():
return
case task = <-s.searchWaitQueue:
collector.Counter.Dec(metricsinfo.ReadyQueueType, 1)
s.schedule(task)
case s.mergedSearchTasks <- task:
s.mergingSearchTasks = s.mergingSearchTasks[1:]
}
} else { // wait for a new task if no task
select {
case <-ctx.Done():
return
case task := <-s.searchWaitQueue:
collector.Counter.Dec(metricsinfo.ReadyQueueType, 1)
s.schedule(task)
}
}
}
}
func (s *Scheduler) schedule(task Task) {
collector.Counter.Inc(metricsinfo.ExecuteQueueType, 1)
// add this task
if err := task.Canceled(); err != nil {
task.Done(err)
return
}
s.mergeTasks(task)
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
maxNq := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt64()
totalNq := task.(*SearchTask).nq
// try to merge the coming tasks
maxMergingTaskNum := paramtable.Get().QueryNodeCfg.MaxUnsolvedQueueSize.GetAsInt()
outer:
for len(s.mergingSearchTasks) < maxMergingTaskNum && totalNq < maxNq {
select {
case t := <-s.searchWaitQueue:
if err := t.Canceled(); err != nil {
t.Done(err)
continue
}
s.mergeTasks(t)
totalNq += t.nq
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
default:
break outer
}
}
// submit existing tasks to the pool
processedCount := 0
processOuter:
for i := range s.mergingSearchTasks {
select {
case s.mergedSearchTasks <- s.mergingSearchTasks[i]:
processedCount++
default:
break processOuter
}
}
s.mergingSearchTasks = s.mergingSearchTasks[processedCount:]
metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(processedCount))
}
func (s *Scheduler) processAll(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case task := <-s.mergedSearchTasks:
inQueueDuration := task.tr.RecordSpan()
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel).
Observe(float64(inQueueDuration.Milliseconds()))
s.process(task)
}
}
}
// process runs the task's PreExecute synchronously, submits its Execute to
// the worker pool, and decrements the waiting-NQ counter for search tasks.
// Done is always invoked with Execute's result so waiters are released.
func (s *Scheduler) process(t Task) {
	// A PreExecute failure is logged but does not abort the task: it is
	// still submitted so that Done fires and callers are not left hanging.
	if err := t.PreExecute(); err != nil {
		log.Warn("failed to pre-execute task", zap.Error(err))
	}

	s.pool.Submit(func() (any, error) {
		metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
		// Use a closure-local error rather than mutating a variable captured
		// from the enclosing scope: this function runs on a pool goroutine,
		// and sharing the outer `err` across goroutines invites a data race.
		execErr := t.Execute()
		t.Done(execErr)
		metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
		return nil, execErr
	})

	if st, ok := t.(*SearchTask); ok {
		s.waitingTaskTotalNQ.Sub(st.nq)
	}
}
// mergeTasks folds the given task into one of the pending merge groups when
// possible; otherwise it starts a new group. Non-search tasks are ignored.
func (s *Scheduler) mergeTasks(t Task) {
	st, ok := t.(*SearchTask)
	if !ok {
		return
	}
	// Walk the groups newest-first and stop at the first one that accepts it.
	for i := len(s.mergingSearchTasks); i > 0; i-- {
		if s.mergingSearchTasks[i-1].Merge(st) {
			return
		}
	}
	// No existing group could absorb the task; it becomes its own group.
	s.mergingSearchTasks = append(s.mergingSearchTasks, st)
}
// GetWaitingTaskTotalNQ returns the total NQ of tasks still waiting to be
// executed, read from an atomically maintained counter.
func (s *Scheduler) GetWaitingTaskTotalNQ() int64 {
	return s.waitingTaskTotalNQ.Load()
}

View File

@ -1,9 +1,12 @@
package tasks
// TODO: rename this file into search_task.go
import (
"bytes"
"context"
"fmt"
"strconv"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
@ -22,13 +25,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type Task interface {
PreExecute() error
Execute() error
Done(err error)
Canceled() error
Wait() error
}
var (
_ Task = &SearchTask{}
_ MergeTask = &SearchTask{}
)
type SearchTask struct {
ctx context.Context
@ -71,9 +71,34 @@ func NewSearchTask(ctx context.Context,
}
}
// Username returns the name of the user this task belongs to.
// It returns "" when the request carries no user info.
func (t *SearchTask) Username() string {
	return t.req.Req.GetUsername()
}
func (t *SearchTask) PreExecute() error {
// update task wait time metric before execute
collector.Average.Add(metricsinfo.SearchQueueMetric, float64(t.tr.ElapseSpan().Microseconds()))
// Update task wait time metric before execute
nodeID := strconv.FormatInt(paramtable.GetNodeID(), 10)
inQueueDuration := t.tr.ElapseSpan()
// Update in queue metric for prometheus.
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(
nodeID,
metrics.SearchLabel).
Observe(float64(inQueueDuration.Milliseconds()))
username := t.Username()
metrics.QueryNodeSQPerUserLatencyInQueue.WithLabelValues(
nodeID,
metrics.SearchLabel,
username).
Observe(float64(inQueueDuration.Milliseconds()))
// Update collector for query node quota.
collector.Average.Add(metricsinfo.SearchQueueMetric, float64(inQueueDuration.Microseconds()))
// Execute merged task's PreExecute.
for _, subTask := range t.others {
err := subTask.PreExecute()
if err != nil {
@ -241,16 +266,12 @@ func (t *SearchTask) Merge(other *SearchTask) bool {
}
func (t *SearchTask) Done(err error) {
collector.Counter.Dec(metricsinfo.ExecuteQueueType, 1)
if !t.merged {
metrics.QueryNodeSearchGroupSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.groupSize))
metrics.QueryNodeSearchGroupNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.nq))
metrics.QueryNodeSearchGroupTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.topk))
}
select {
case t.notifier <- err:
default:
}
t.notifier <- err
for _, other := range t.others {
other.Done(err)
}
@ -268,6 +289,18 @@ func (t *SearchTask) Result() *internalpb.SearchResults {
return t.result
}
// NQ returns the number of queries (nq) carried by this search task.
func (t *SearchTask) NQ() int64 {
	return t.nq
}
// MergeWith attempts to merge another task into this one, returning true on
// success. Only another *SearchTask can be merged; all other task types are
// rejected.
func (t *SearchTask) MergeWith(other Task) bool {
	if st, ok := other.(*SearchTask); ok {
		return t.Merge(st)
	}
	return false
}
// combinePlaceHolderGroups combine all the placeholder groups.
func (t *SearchTask) combinePlaceHolderGroups() {
if len(t.others) > 0 {
@ -281,6 +314,3 @@ func (t *SearchTask) combinePlaceHolderGroups() {
t.placeholderGroup, _ = proto.Marshal(ret)
}
}
// QueryTask is a placeholder for query (retrieve) tasks.
// NOTE(review): it currently has no fields — implementation pending.
type QueryTask struct {
}

View File

@ -0,0 +1,105 @@
package tasks
import (
"context"
)
// Names of the supported schedule policies, selected via the
// queryNode.scheduler.scheduleReadPolicy.name config item.
const (
	schedulePolicyNameFIFO            = "fifo"              // simple FIFO queue
	schedulePolicyNameUserTaskPolling = "user-task-polling" // fair per-user polling
)
// NewScheduler builds a Scheduler for the given policy name.
// The empty string selects the default FIFO policy; an unknown name panics.
func NewScheduler(policyName string) Scheduler {
	var policy schedulePolicy
	switch policyName {
	case "", schedulePolicyNameFIFO:
		policy = newFIFOPolicy()
	case schedulePolicyNameUserTaskPolling:
		policy = newUserTaskPollingPolicy()
	default:
		panic("invalid schedule task policy")
	}
	return newScheduler(policy)
}
// tryIntoMergeTask converts the given task into a MergeTask when possible.
// It returns nil for tasks that do not support merging.
func tryIntoMergeTask(t Task) MergeTask {
	mt, ok := t.(MergeTask)
	if !ok {
		return nil
	}
	return mt
}
// Scheduler dispatches read tasks (search/query) for execution.
type Scheduler interface {
	// Add puts a new task into the scheduler, under these constraints:
	// 1. It is a non-blocking operation.
	// 2. An error is returned if the scheduler reaches some limit.
	// 3. Concurrent safe.
	Add(task Task) error

	// Start schedules the owned tasks asynchronously and continuously.
	// 1. Processing stops when ctx is canceled.
	// 2. Must be called only once.
	Start(ctx context.Context)

	// GetWaitingTaskTotalNQ returns the total NQ of all waiting tasks.
	GetWaitingTaskTotalNQ() int64

	// GetWaitingTaskTotal returns the number of waiting tasks.
	GetWaitingTaskTotal() int64
}
// schedulePolicy decides the ordering of task execution inside a scheduler.
type schedulePolicy interface {
	// Push adds a new task into the policy's queue.
	// It returns the count of tasks actually added (a task may be chunked,
	// merged into an existing one, or dropped); 0 and an error are returned
	// if the policy reaches some limit.
	Push(task Task) (int, error)

	// Pop gets the task next ready to run.
	Pop() Task

	// Len returns the count of tasks currently queued.
	Len() int
}
// MergeTask is a Task that can be merged with another task.
type MergeTask interface {
	Task

	// MergeWith merges the other task into this one, returning true on
	// success. After a successful merge the absorbed task should be dropped.
	MergeWith(Task) bool
}
// Task is the execution unit handled by a scheduler.
type Task interface {
	// Username returns the name of the user this task belongs to.
	// It returns "" if the task carries no user info.
	Username() string

	// PreExecute prepares the task; called only once, before Execute.
	PreExecute() error

	// Execute runs the task; called only once.
	Execute() error

	// Done notifies waiters that the task finished with the given error.
	Done(err error)

	// Canceled reports whether the task has been canceled.
	// Concurrent safe.
	Canceled() error

	// Wait blocks until the task finishes.
	// Concurrent safe.
	Wait() error

	// NQ returns the query count (NQ) of the task.
	NQ() int64
}

View File

@ -0,0 +1,71 @@
package tasks
import (
"fmt"
"time"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// Compile-time check: userTaskPollingPolicy implements schedulePolicy.
var _ schedulePolicy = &userTaskPollingPolicy{}

// newUserTaskPollingPolicy creates a new user-based polling schedule policy
// backed by a fair polling task queue.
func newUserTaskPollingPolicy() *userTaskPollingPolicy {
	return &userTaskPollingPolicy{
		queue: newFairPollingTaskQueue(),
	}
}
// userTaskPollingPolicy schedules tasks by polling per-user queues, so that
// scheduling is fair across users at task granularity.
type userTaskPollingPolicy struct {
	queue *fairPollingTaskQueue // polling queue grouped by username
}
// Push adds a new task into the policy. Mergeable tasks are first folded
// into an existing group when possible (returning 0 added); otherwise the
// task is enqueued under its user, subject to the per-user pending limit.
func (p *userTaskPollingPolicy) Push(task Task) (int, error) {
	cfg := &paramtable.Get().QueryNodeCfg
	user := task.Username()

	if mergeable := tryIntoMergeTask(task); mergeable != nil {
		nqLimit := cfg.MaxGroupNQ.GetAsInt64()
		// Prefer merging within the same user's group.
		if p.queue.tryMergeWithSameGroup(user, mergeable, nqLimit) {
			return 0, nil
		}
		// Optionally merge across users when the option is enabled.
		if cfg.SchedulePolicyEnableCrossUserGrouping.GetAsBool() &&
			p.queue.tryMergeWithOtherGroup(user, mergeable, nqLimit) {
			return 0, nil
		}
	}

	// Reject when the user's pending queue has reached the configured cap.
	if pending := p.queue.groupLen(user); pending > 0 {
		if limit := cfg.SchedulePolicyMaxPendingTaskPerUser.GetAsInt(); limit > 0 && pending >= limit {
			return 0, merr.WrapErrServiceRequestLimitExceeded(
				int32(limit),
				fmt.Sprintf("limit by %s", cfg.SchedulePolicyMaxPendingTaskPerUser.Key),
			)
		}
	}

	p.queue.push(user, task)
	return 1, nil
}
// Pop gets the task next ready to run. The expiry duration (from the
// taskQueueExpire config item) controls how long an emptied per-user queue
// is retained by the underlying fair polling queue.
func (p *userTaskPollingPolicy) Pop() Task {
	expire := paramtable.Get().QueryNodeCfg.SchedulePolicyTaskQueueExpire.GetAsDuration(time.Second)
	return p.queue.pop(expire)
}
// Len returns the count of tasks currently queued across all users.
func (p *userTaskPollingPolicy) Len() int {
	return p.queue.len()
}

View File

@ -152,6 +152,20 @@ var (
queryTypeLabelName,
})
// QueryNodeSQPerUserLatencyInQueue records, labeled by username, how long a
// search/query request waited in the scheduler queue (observed in
// milliseconds by the task's PreExecute).
QueryNodeSQPerUserLatencyInQueue = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: milvusNamespace,
		Subsystem: typeutil.QueryNodeRole,
		Name:      "sq_queue_user_latency",
		Help:      "latency per user of search or query in queue",
		Buckets:   buckets,
	}, []string{
		nodeIDLabelName,
		queryTypeLabelName,
		usernameLabelName,
	},
)
QueryNodeSQSegmentLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
@ -394,6 +408,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeSQReqLatency)
registry.MustRegister(QueryNodeSQLatencyWaitTSafe)
registry.MustRegister(QueryNodeSQLatencyInQueue)
registry.MustRegister(QueryNodeSQPerUserLatencyInQueue)
registry.MustRegister(QueryNodeSQSegmentLatency)
registry.MustRegister(QueryNodeSQSegmentLatencyInCore)
registry.MustRegister(QueryNodeReduceLatency)
@ -437,5 +452,4 @@ func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
collectionIDLabelName: fmt.Sprint(collectionID),
})
}
}

View File

@ -1479,6 +1479,12 @@ type queryNodeConfig struct {
// loader
IoPoolSize ParamItem `refreshable:"false"`
// schedule task policy.
SchedulePolicyName ParamItem `refreshable:"false"`
SchedulePolicyTaskQueueExpire ParamItem `refreshable:"true"`
SchedulePolicyEnableCrossUserGrouping ParamItem `refreshable:"true"`
SchedulePolicyMaxPendingTaskPerUser ParamItem `refreshable:"true"`
}
func (p *queryNodeConfig) init(base *BaseTable) {
@ -1803,6 +1809,36 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
Doc: "Control how many goroutines will the loader use to pull files, if the given value is non-positive, the value will be set to CpuNum * 8, at least 32, and at most 256",
}
p.IoPoolSize.Init(base.mgr)
// Schedule read task policy (queryNode.scheduler.scheduleReadPolicy.*).
p.SchedulePolicyName = ParamItem{
	Key:          "queryNode.scheduler.scheduleReadPolicy.name",
	Version:      "2.3.0",
	DefaultValue: "fifo",
	Doc:          "Control how to schedule query/search read task in query node",
}
p.SchedulePolicyName.Init(base.mgr)

// Seconds an emptied per-user task queue is retained (user-task-polling only).
p.SchedulePolicyTaskQueueExpire = ParamItem{
	Key:          "queryNode.scheduler.scheduleReadPolicy.taskQueueExpire",
	Version:      "2.3.0",
	DefaultValue: "60",
	Doc:          "Control how long (many seconds) that queue retains since queue is empty",
}
p.SchedulePolicyTaskQueueExpire.Init(base.mgr)

// Whether tasks of different users may be merged (user-task-polling only).
p.SchedulePolicyEnableCrossUserGrouping = ParamItem{
	Key:          "queryNode.scheduler.scheduleReadPolicy.enableCrossUserGrouping",
	Version:      "2.3.0",
	DefaultValue: "false",
	Doc:          "Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other)",
}
p.SchedulePolicyEnableCrossUserGrouping.Init(base.mgr)

// Cap on pending tasks per user; new tasks beyond it are rejected by Push.
p.SchedulePolicyMaxPendingTaskPerUser = ParamItem{
	Key:          "queryNode.scheduler.scheduleReadPolicy.maxPendingTaskPerUser",
	Version:      "2.3.0",
	DefaultValue: "1024",
	Doc:          "Max pending task per user in scheduler",
}
p.SchedulePolicyMaxPendingTaskPerUser.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -16,6 +16,8 @@ import (
"strings"
"time"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util/funcutil"
)
@ -33,6 +35,9 @@ type ParamItem struct {
Forbidden bool
manager *config.Manager
// for unittest.
tempValue atomic.Pointer[string]
}
func (pi *ParamItem) Init(manager *config.Manager) {
@ -44,6 +49,11 @@ func (pi *ParamItem) Init(manager *config.Manager) {
// Get original value with error
func (pi *ParamItem) get() (string, error) {
// For unittest.
if s := pi.tempValue.Load(); s != nil {
return *s, nil
}
if pi.manager == nil {
panic(fmt.Sprintf("manager is nil %s", pi.Key))
}
@ -68,6 +78,16 @@ func (pi *ParamItem) get() (string, error) {
return ret, err
}
// SwapTempValue sets a temporary override value for this ParamItem and
// returns the previous override (nil when none was set).
// Once an override is set, the ParamItem serves it instead of consulting
// the underlying config manager.
// Usage: unittest only; swapping in the empty string removes the override.
func (pi *ParamItem) SwapTempValue(s string) *string {
	if s == "" {
		return pi.tempValue.Swap(nil)
	}
	return pi.tempValue.Swap(&s)
}
func (pi *ParamItem) GetValue() string {
v, _ := pi.get()
return v