From f18dfb4ff690b805dc57ef38f6a587b95808f1c6 Mon Sep 17 00:00:00 2001 From: yukun Date: Thu, 20 May 2021 15:02:31 +0800 Subject: [PATCH] Add RetrieveTask implementation (#5313) Resolves: #5257 Signed-off-by: fishpenguin --- internal/core/src/pb/common.pb.cc | 23 ++-- internal/core/src/pb/common.pb.h | 2 + internal/msgstream/msg.go | 98 +++++++++++++++++ internal/proto/common.proto | 2 + internal/proto/commonpb/common.pb.go | 155 ++++++++++++++------------- internal/proxynode/paramtable.go | 32 +++--- internal/proxynode/task.go | 132 +++++++++++++++++++++++ 7 files changed, 345 insertions(+), 99 deletions(-) diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index ae3aa427d2..e505b99270 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -225,7 +225,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "StateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgress\020" "\002\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004*X\n\014SegmentSt" "ate\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExist\020\001\022" - "\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004*\226\006" + "\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004*\272\006" "\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020CreateCollec" "tion\020d\022\022\n\016DropCollection\020e\022\021\n\rHasCollect" "ion\020f\022\026\n\022DescribeCollection\020g\022\023\n\017ShowCol" @@ -241,14 +241,15 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetIn" "dexState\020\366\003\022\032\n\025GetIndexBuildProgress\020\367\003\022" "\034\n\027GetCollectionStatistics\020\370\003\022\033\n\026GetPart" - "itionStatistics\020\371\003\022\020\n\013SegmentInfo\020\330\004\022\r\n\010" - "TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\tLoad" - "Index\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestTSO\020\264" - "\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentStatis" - "tics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t*\"\n\007DslTyp" - "e\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001B5Z3github.com" - "/milvus-io/milvus/internal/proto/commonp" - "bb\006proto3" + "itionStatistics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016Ret" + "rieveResult\020\373\003\022\020\n\013SegmentInfo\020\330\004\022\r\n\010Time" + "Tick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\tLoadInde" + "x\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n" + "\017AllocateSegment\020\265\t\022\026\n\021SegmentStatistics" + "\020\266\t\022\025\n\020SegmentFlushDone\020\267\t*\"\n\007DslType\022\007\n" + "\003Dsl\020\000\022\016\n\nBoolExprV1\020\001B5Z3github.com/mil" + "vus-io/milvus/internal/proto/commonpbb\006p" + "roto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { }; @@ -263,7 +264,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2009, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2045, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 6, 0, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 6, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -382,6 +383,8 @@ bool MsgType_IsValid(int value) { case 503: case 504: case 505: + case 506: + case 507: case 600: case 1200: case 1201: diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index 2f86ab3bce..335fbf3b9c 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -227,6 +227,8 @@ enum MsgType : int { GetIndexBuildProgress = 503, GetCollectionStatistics = 504, GetPartitionStatistics = 505, + Retrieve = 506, + RetrieveResult = 507, SegmentInfo = 600, TimeTick = 1200, QueryNodeStats = 1201, diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index 17af1fcf79..80d9c17dc2 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -342,6 +342,104 @@ func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { return searchResultMsg, nil } +////////////////////////////////////////Retrieve///////////////////////////////////////// +type RetrieveMsg struct { + BaseMsg + internalpb.RetrieveRequest +} + +func (rm *RetrieveMsg) TraceCtx() context.Context { + return rm.BaseMsg.Ctx +} + +func (rm *RetrieveMsg) SetTraceCtx(ctx context.Context) { + rm.BaseMsg.Ctx = ctx +} + +func (rm *RetrieveMsg) ID() UniqueID { + return rm.Base.MsgID +} + +func (rm *RetrieveMsg) Type() MsgType { + return rm.Base.MsgType +} + +func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) { + retrieveTask := input.(*RetrieveMsg) + retrieveRequest := &retrieveTask.RetrieveRequest + mb, err := proto.Marshal(retrieveRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (rm *RetrieveMsg) Unmarshal(input MarshalType) (TsMsg, error) { + retrieveRequest := internalpb.RetrieveRequest{} + in, err := ConvertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, &retrieveRequest) + if err != nil { + return nil, err + } + retrieveMsg := &RetrieveMsg{RetrieveRequest: retrieveRequest} + retrieveMsg.BeginTimestamp = retrieveMsg.Base.Timestamp + retrieveMsg.EndTimestamp = retrieveMsg.Base.Timestamp + + return retrieveMsg, nil +} + +//////////////////////////////////////RetrieveResult/////////////////////////////////////// +type RetrieveResultMsg struct { + BaseMsg + internalpb.RetrieveResults +} + +func (rrm *RetrieveResultMsg) TraceCtx() context.Context { + return rrm.BaseMsg.Ctx +} + +func (rrm *RetrieveResultMsg) SetTraceCtx(ctx context.Context) { + rrm.BaseMsg.Ctx = ctx +} + +func (rrm *RetrieveResultMsg) ID() UniqueID { + return rrm.Base.MsgID +} + +func (rrm *RetrieveResultMsg) Type() MsgType { + return rrm.Base.MsgType +} + +func (rrm *RetrieveResultMsg) Marshal(input TsMsg) (MarshalType, error) { + retrieveResultTask := input.(*RetrieveResultMsg) + retrieveResultRequest := &retrieveResultTask.RetrieveResults + mb, err := proto.Marshal(retrieveResultRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (rrm *RetrieveResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { + retrieveResultRequest := internalpb.RetrieveResults{} + in, err := ConvertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, &retrieveResultRequest) + if err != nil { + return nil, err + } + retrieveResultMsg := &RetrieveResultMsg{RetrieveResults: retrieveResultRequest} + retrieveResultMsg.BeginTimestamp = retrieveResultMsg.Base.Timestamp + retrieveResultMsg.EndTimestamp = retrieveResultMsg.Base.Timestamp + + return retrieveResultMsg, nil +} + /////////////////////////////////////////TimeTick////////////////////////////////////////// type TimeTickMsg struct { BaseMsg diff --git a/internal/proto/common.proto b/internal/proto/common.proto index 042b2ba7d3..63ce3fcf15 100644 --- a/internal/proto/common.proto +++ b/internal/proto/common.proto @@ -112,6 +112,8 @@ enum MsgType { GetIndexBuildProgress = 503; GetCollectionStatistics = 504; GetPartitionStatistics = 505; + Retrieve = 506; + RetrieveResult = 507; /* DATA SERVICE */ SegmentInfo = 600; diff --git a/internal/proto/commonpb/common.pb.go b/internal/proto/commonpb/common.pb.go index bf1283237e..c900eabe60 100644 --- a/internal/proto/commonpb/common.pb.go +++ b/internal/proto/commonpb/common.pb.go @@ -225,6 +225,8 @@ const ( MsgType_GetIndexBuildProgress MsgType = 503 MsgType_GetCollectionStatistics MsgType = 504 MsgType_GetPartitionStatistics MsgType = 505 + MsgType_Retrieve MsgType = 506 + MsgType_RetrieveResult MsgType = 507 // DATA SERVICE MsgType_SegmentInfo MsgType = 600 // SYSTEM CONTROL @@ -269,6 +271,8 @@ var MsgType_name = map[int32]string{ 503: "GetIndexBuildProgress", 504: "GetCollectionStatistics", 505: "GetPartitionStatistics", + 506: "Retrieve", + 507: "RetrieveResult", 600: "SegmentInfo", 1200: "TimeTick", 1201: "QueryNodeStats", @@ -311,6 +315,8 @@ var MsgType_value = map[string]int32{ "GetIndexBuildProgress": 503, "GetCollectionStatistics": 504, "GetPartitionStatistics": 505, + "Retrieve": 506, + "RetrieveResult": 507, "SegmentInfo": 600, "TimeTick": 1200, "QueryNodeStats": 1201, @@ -655,78 +661,79 @@ func init() { func init() { proto.RegisterFile("common.proto", fileDescriptor_555bd8c177793206) } var fileDescriptor_555bd8c177793206 = []byte{ - // 1164 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x55, 0xcb, 0x6e, 0xdb, 0x46, - 0x17, 0x36, 0x25, 0x45, 0x32, 0x8f, 0x64, 0x79, 0x3c, 0xbe, 0xc4, 0xc9, 0x6f, 0xfc, 0x08, 0xbc, - 0x0a, 0x0c, 0xc4, 0x6e, 0x1b, 0xb4, 0x5d, 0x65, 0x11, 0x8b, 0xb1, 0x23, 0x24, 0xbe, 0x54, 0x72, - 0x82, 0xa0, 0x9b, 0x80, 0x26, 0x8f, 0xe5, 0x69, 0xc8, 0x19, 0x75, 0x66, 0x98, 0x58, 0x6f, 0xd1, - 0x66, 0xd1, 0x37, 0xe8, 0xae, 0x2d, 0x7a, 0xef, 0x2b, 0xf4, 0xbe, 0xee, 0x23, 0xf4, 0x01, 0x7a, - 0xef, 0xa6, 0x38, 0x43, 0x4a, 0x64, 0x81, 0x74, 0x37, 0xe7, 0x9b, 0x33, 0xdf, 0xf9, 0xce, 0x8d, - 0x84, 0x4e, 0xa4, 0xd2, 0x54, 0xc9, 0xed, 0xb1, 0x56, 0x56, 0xf1, 0xe5, 0x54, 0x24, 0x4f, 0x33, - 0x93, 0x5b, 0xdb, 0xf9, 0xd5, 0xe6, 0x63, 0x68, 0x0e, 0x6d, 0x68, 0x33, 0xc3, 0x6f, 0x01, 0xa0, - 0xd6, 0x4a, 0x3f, 0x8e, 0x54, 0x8c, 0xeb, 0xde, 0x35, 0xef, 0x7a, 0xf7, 0x95, 0xff, 0x6f, 0xbf, - 0xe0, 0xcd, 0xf6, 0x1d, 0x72, 0xeb, 0xa9, 0x18, 0x07, 0x3e, 0x4e, 0x8f, 0x7c, 0x0d, 0x9a, 0x1a, - 0x43, 0xa3, 0xe4, 0x7a, 0xed, 0x9a, 0x77, 0xdd, 0x1f, 0x14, 0xd6, 0xe6, 0x6b, 0xd0, 0xb9, 0x87, - 0x93, 0x87, 0x61, 0x92, 0xe1, 0x71, 0x28, 0x34, 0x67, 0x50, 0x7f, 0x82, 0x13, 0xc7, 0xef, 0x0f, - 0xe8, 0xc8, 0x57, 0xe0, 0xd2, 0x53, 0xba, 0x2e, 0x1e, 0xe6, 0xc6, 0xe6, 0x06, 0x34, 0x76, 0x13, - 0x75, 0x5a, 0xde, 0xd2, 0x8b, 0xce, 0xf4, 0xf6, 0x06, 0xb4, 0x6e, 0xc7, 0xb1, 0x46, 0x63, 0x78, - 0x17, 0x6a, 0x62, 0x5c, 0xf0, 0xd5, 0xc4, 0x98, 0x73, 0x68, 0x8c, 0x95, 0xb6, 0x8e, 0xad, 0x3e, - 0x70, 0xe7, 0xcd, 0xe7, 0x1e, 0xb4, 0x0e, 0xcc, 0x68, 0x37, 0x34, 0xc8, 0x5f, 0x87, 0xf9, 0xd4, - 0x8c, 0x1e, 0xdb, 0xc9, 0x78, 0x9a, 0xe5, 0xc6, 0x0b, 0xb3, 0x3c, 0x30, 0xa3, 0x93, 0xc9, 0x18, - 0x07, 0xad, 0x34, 0x3f, 0x90, 0x92, 0xd4, 0x8c, 0xfa, 0x41, 0xc1, 0x9c, 0x1b, 0x7c, 0x03, 0x7c, - 0x2b, 0x52, 0x34, 0x36, 0x4c, 0xc7, 0xeb, 0xf5, 0x6b, 0xde, 0xf5, 0xc6, 0xa0, 0x04, 0xf8, 0x55, - 0x98, 0x37, 0x2a, 0xd3, 0x11, 0xf6, 0x83, 0xf5, 0x86, 0x7b, 0x36, 0xb3, 0x37, 0x6f, 0x81, 0x7f, - 0x60, 0x46, 0x77, 0x31, 0x8c, 0x51, 0xf3, 0x97, 0xa0, 0x71, 0x1a, 0x9a, 0x5c, 0x51, 0xfb, 0xbf, - 0x15, 0x51, 0x06, 0x03, 0xe7, 0xb9, 0xf5, 0x7e, 0x03, 0xfc, 0x59, 0x27, 0x78, 0x1b, 0x5a, 0xc3, - 0x2c, 0x8a, 0xd0, 0x18, 0x36, 0xc7, 0x97, 0x61, 0xf1, 0x81, 0xc4, 0x8b, 0x31, 0x46, 0x16, 0x63, - 0xe7, 0xc3, 0x3c, 0xbe, 0x04, 0x0b, 0x3d, 0x25, 0x25, 0x46, 0x76, 0x2f, 0x14, 0x09, 0xc6, 0xac, - 0xc6, 0x57, 0x80, 0x1d, 0xa3, 0x4e, 0x85, 0x31, 0x42, 0xc9, 0x00, 0xa5, 0xc0, 0x98, 0xd5, 0xf9, - 0x65, 0x58, 0xee, 0xa9, 0x24, 0xc1, 0xc8, 0x0a, 0x25, 0x0f, 0x95, 0xbd, 0x73, 0x21, 0x8c, 0x35, - 0xac, 0x41, 0xb4, 0xfd, 0x24, 0xc1, 0x51, 0x98, 0xdc, 0xd6, 0xa3, 0x2c, 0x45, 0x69, 0xd9, 0x25, - 0xe2, 0x28, 0xc0, 0x40, 0xa4, 0x28, 0x89, 0x89, 0xb5, 0x2a, 0x68, 0x5f, 0xc6, 0x78, 0x41, 0xf5, - 0x63, 0xf3, 0xfc, 0x0a, 0xac, 0x16, 0x68, 0x25, 0x40, 0x98, 0x22, 0xf3, 0xf9, 0x22, 0xb4, 0x8b, - 0xab, 0x93, 0xa3, 0xe3, 0x7b, 0x0c, 0x2a, 0x0c, 0x03, 0xf5, 0x6c, 0x80, 0x91, 0xd2, 0x31, 0x6b, - 0x57, 0x24, 0x3c, 0xc4, 0xc8, 0x2a, 0xdd, 0x0f, 0x58, 0x87, 0x04, 0x17, 0xe0, 0x10, 0x43, 0x1d, - 0x9d, 0x0f, 0xd0, 0x64, 0x89, 0x65, 0x0b, 0x9c, 0x41, 0x67, 0x4f, 0x24, 0x78, 0xa8, 0xec, 0x9e, - 0xca, 0x64, 0xcc, 0xba, 0xbc, 0x0b, 0x70, 0x80, 0x36, 0x2c, 0x2a, 0xb0, 0x48, 0x61, 0x7b, 0x61, - 0x74, 0x8e, 0x05, 0xc0, 0xf8, 0x1a, 0xf0, 0x5e, 0x28, 0xa5, 0xb2, 0x3d, 0x8d, 0xa1, 0xc5, 0x3d, - 0x95, 0xc4, 0xa8, 0xd9, 0x12, 0xc9, 0xf9, 0x17, 0x2e, 0x12, 0x64, 0xbc, 0xf4, 0x0e, 0x30, 0xc1, - 0x99, 0xf7, 0x72, 0xe9, 0x5d, 0xe0, 0xe4, 0xbd, 0x42, 0xe2, 0x77, 0x33, 0x91, 0xc4, 0xae, 0x24, - 0x79, 0x5b, 0x56, 0x49, 0x63, 0x21, 0xfe, 0xf0, 0x7e, 0x7f, 0x78, 0xc2, 0xd6, 0xf8, 0x2a, 0x2c, - 0x15, 0xc8, 0x01, 0x5a, 0x2d, 0x22, 0x57, 0xbc, 0xcb, 0x24, 0xf5, 0x28, 0xb3, 0x47, 0x67, 0x07, - 0x98, 0x2a, 0x3d, 0x61, 0xeb, 0xd4, 0x50, 0xc7, 0x34, 0x6d, 0x11, 0xbb, 0xc2, 0x39, 0x2c, 0x04, - 0xc1, 0x00, 0xdf, 0xce, 0xd0, 0xd8, 0x41, 0x18, 0x21, 0xfb, 0xb9, 0xb5, 0xf5, 0x08, 0xc0, 0xb9, - 0xd1, 0x9a, 0x23, 0xe7, 0xd0, 0x2d, 0xad, 0x43, 0x25, 0x91, 0xcd, 0xf1, 0x0e, 0xcc, 0x3f, 0x90, - 0xc2, 0x98, 0x0c, 0x63, 0xe6, 0x51, 0x89, 0xfa, 0xf2, 0x58, 0xab, 0x11, 0x6d, 0x17, 0xab, 0xd1, - 0xed, 0x9e, 0x90, 0xc2, 0x9c, 0xbb, 0xe1, 0x00, 0x68, 0x16, 0xb5, 0x6a, 0x6c, 0x3d, 0x82, 0xce, - 0x10, 0x47, 0x34, 0x07, 0x39, 0xf7, 0x0a, 0xb0, 0xaa, 0x5d, 0xb2, 0xcf, 0x14, 0x7a, 0x34, 0xa7, - 0xfb, 0x5a, 0x3d, 0x13, 0x72, 0xc4, 0x6a, 0x44, 0x36, 0xc4, 0x30, 0x71, 0xc4, 0x6d, 0x68, 0xed, - 0x25, 0x99, 0x8b, 0xd2, 0xd8, 0x7a, 0xaf, 0xe9, 0xf6, 0xd5, 0xad, 0xdd, 0x02, 0xf8, 0x0f, 0x64, - 0x8c, 0x67, 0x42, 0x62, 0xcc, 0xe6, 0x5c, 0x69, 0x5d, 0x0b, 0xca, 0x11, 0x62, 0x31, 0xa5, 0x15, - 0x68, 0x35, 0xae, 0x60, 0x48, 0xf5, 0xb9, 0x1b, 0x9a, 0x0a, 0x74, 0x46, 0xfd, 0x0a, 0xd0, 0x44, - 0x5a, 0x9c, 0x56, 0x9f, 0x8f, 0xa8, 0x33, 0xc3, 0x73, 0xf5, 0xac, 0xc4, 0x0c, 0x3b, 0xa7, 0x48, - 0xfb, 0x68, 0x87, 0x13, 0x63, 0x31, 0xed, 0x29, 0x79, 0x26, 0x46, 0x86, 0x09, 0x8a, 0x74, 0x5f, - 0x85, 0x71, 0xe5, 0xf9, 0x5b, 0xd4, 0xb1, 0x01, 0x26, 0x18, 0x9a, 0x2a, 0xeb, 0x13, 0xbe, 0x02, - 0x8b, 0xb9, 0xd4, 0xe3, 0x50, 0x5b, 0xe1, 0xc0, 0xaf, 0x3d, 0xd7, 0x23, 0xad, 0xc6, 0x25, 0xf6, - 0x0d, 0xed, 0x66, 0xe7, 0x6e, 0x68, 0x4a, 0xe8, 0x5b, 0x8f, 0xaf, 0xc1, 0xd2, 0x54, 0x6a, 0x89, - 0x7f, 0xe7, 0xf1, 0x65, 0xe8, 0x92, 0xd4, 0x19, 0x66, 0xd8, 0xf7, 0x0e, 0x24, 0x51, 0x15, 0xf0, - 0x07, 0xc7, 0x50, 0xa8, 0xaa, 0xe0, 0x3f, 0xba, 0x60, 0xc4, 0x50, 0xb4, 0xca, 0xb0, 0xbf, 0x3c, - 0x52, 0x3a, 0x0d, 0x56, 0xc0, 0xec, 0x6f, 0x8f, 0x33, 0x68, 0xe7, 0xfa, 0xdd, 0xc4, 0xb0, 0x0f, - 0x6a, 0x4e, 0x7b, 0xe1, 0x97, 0x63, 0x1f, 0xd6, 0x78, 0x17, 0x7c, 0xca, 0x27, 0xb7, 0x3f, 0xaa, - 0xf1, 0x36, 0x34, 0xfb, 0xd2, 0xa0, 0xb6, 0xec, 0x1d, 0xea, 0x6a, 0x33, 0x5f, 0x01, 0xf6, 0x2e, - 0xcd, 0xce, 0x25, 0xd7, 0x62, 0xf6, 0xdc, 0x5d, 0xe4, 0xcb, 0xca, 0x7e, 0xa9, 0x3b, 0x45, 0xd5, - 0xcd, 0xfd, 0xb5, 0x4e, 0x91, 0xf6, 0xd1, 0x96, 0xa3, 0xca, 0x7e, 0xab, 0xf3, 0xab, 0xb0, 0x3a, - 0xc5, 0xdc, 0x1e, 0xcd, 0x86, 0xf4, 0xf7, 0x3a, 0xdf, 0x80, 0xcb, 0xfb, 0x68, 0xcb, 0xf2, 0xd3, - 0x23, 0x61, 0xac, 0x88, 0x0c, 0xfb, 0xa3, 0xce, 0xff, 0x07, 0x6b, 0xfb, 0x68, 0x67, 0x65, 0xa8, - 0x5c, 0xfe, 0x59, 0xa7, 0x34, 0x8b, 0xa4, 0xfb, 0xf2, 0x4c, 0xb1, 0x9f, 0x1a, 0x7c, 0x01, 0xe6, - 0x4f, 0x44, 0x8a, 0x27, 0x22, 0x7a, 0xc2, 0x3e, 0xf6, 0xa9, 0xba, 0x6f, 0x64, 0xa8, 0x27, 0x87, - 0x2a, 0x46, 0x7a, 0x6a, 0xd8, 0x27, 0x3e, 0xa5, 0x4d, 0x25, 0xcf, 0xd3, 0xfe, 0xd4, 0xd9, 0xc5, - 0xe2, 0xf5, 0x03, 0xf6, 0x19, 0x7d, 0xd0, 0xa0, 0xb0, 0x4f, 0x86, 0x47, 0xec, 0x73, 0x9f, 0x6a, - 0x7c, 0x3b, 0x49, 0x54, 0x14, 0xda, 0x59, 0x8d, 0xbf, 0xf0, 0xa9, 0x49, 0x95, 0x9d, 0x29, 0x44, - 0x7d, 0xe9, 0xf3, 0xd5, 0xd9, 0x2e, 0xb9, 0x92, 0x05, 0xb4, 0x4b, 0x5f, 0xf9, 0x5b, 0x9b, 0xd0, - 0x0a, 0x4c, 0xe2, 0xf6, 0xa2, 0x05, 0xf5, 0xc0, 0x24, 0x6c, 0x8e, 0x16, 0x76, 0x57, 0xa9, 0xe4, - 0xce, 0xc5, 0x58, 0x3f, 0x7c, 0x99, 0x79, 0xbb, 0xaf, 0xbe, 0x79, 0x73, 0x24, 0xec, 0x79, 0x76, - 0x4a, 0xff, 0x8d, 0x9d, 0xfc, 0x47, 0x72, 0x43, 0xa8, 0xe2, 0xb4, 0x23, 0xa4, 0x45, 0x2d, 0xc3, - 0x64, 0xc7, 0xfd, 0x5b, 0x76, 0xf2, 0x7f, 0xcb, 0xf8, 0xf4, 0xb4, 0xe9, 0xec, 0x9b, 0xff, 0x04, - 0x00, 0x00, 0xff, 0xff, 0x2d, 0x7f, 0xd4, 0x22, 0x35, 0x08, 0x00, 0x00, + // 1183 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x55, 0xcb, 0x6e, 0x1b, 0xc7, + 0x12, 0xd5, 0x90, 0x34, 0xa9, 0x29, 0x52, 0x54, 0xab, 0xf5, 0xb0, 0xec, 0x2b, 0x5c, 0x18, 0x5a, + 0x19, 0x02, 0x2c, 0xdd, 0x7b, 0x8d, 0x9b, 0xac, 0xbc, 0xb0, 0x38, 0x96, 0x4c, 0xd8, 0x7a, 0x64, + 0x28, 0x1b, 0x46, 0x36, 0xc6, 0x68, 0xa6, 0x44, 0x75, 0x3c, 0xd3, 0xcd, 0x74, 0xf7, 0xd8, 0xe2, + 0x5f, 0x24, 0xfe, 0x87, 0xec, 0x92, 0x20, 0xef, 0x00, 0xf9, 0x82, 0xbc, 0xd7, 0xf9, 0x84, 0x7c, + 0x40, 0x9e, 0xf6, 0x26, 0xa8, 0x9e, 0x21, 0x39, 0x01, 0x9c, 0x5d, 0xd7, 0xe9, 0xaa, 0xd3, 0xa7, + 0xab, 0xfa, 0xcc, 0x40, 0x27, 0x56, 0x59, 0xa6, 0xe4, 0xf6, 0x48, 0x2b, 0xab, 0xf8, 0x72, 0x26, + 0xd2, 0xa7, 0xb9, 0x29, 0xa2, 0xed, 0x62, 0x6b, 0xf3, 0x31, 0x34, 0x07, 0x36, 0xb2, 0xb9, 0xe1, + 0xb7, 0x00, 0x50, 0x6b, 0xa5, 0x1f, 0xc7, 0x2a, 0xc1, 0x75, 0xef, 0x9a, 0x77, 0xbd, 0xfb, 0xbf, + 0x7f, 0x6f, 0xbf, 0xa2, 0x66, 0xfb, 0x0e, 0xa5, 0xf5, 0x54, 0x82, 0xa1, 0x8f, 0x93, 0x25, 0x5f, + 0x83, 0xa6, 0xc6, 0xc8, 0x28, 0xb9, 0x5e, 0xbb, 0xe6, 0x5d, 0xf7, 0xc3, 0x32, 0xda, 0x7c, 0x0d, + 0x3a, 0xf7, 0x70, 0xfc, 0x30, 0x4a, 0x73, 0x3c, 0x8e, 0x84, 0xe6, 0x0c, 0xea, 0x4f, 0x70, 0xec, + 0xf8, 0xfd, 0x90, 0x96, 0x7c, 0x05, 0x2e, 0x3d, 0xa5, 0xed, 0xb2, 0xb0, 0x08, 0x36, 0x37, 0xa0, + 0xb1, 0x9b, 0xaa, 0xd3, 0xd9, 0x2e, 0x55, 0x74, 0x26, 0xbb, 0x37, 0xa0, 0x75, 0x3b, 0x49, 0x34, + 0x1a, 0xc3, 0xbb, 0x50, 0x13, 0xa3, 0x92, 0xaf, 0x26, 0x46, 0x9c, 0x43, 0x63, 0xa4, 0xb4, 0x75, + 0x6c, 0xf5, 0xd0, 0xad, 0x37, 0x9f, 0x7b, 0xd0, 0x3a, 0x30, 0xc3, 0xdd, 0xc8, 0x20, 0x7f, 0x1d, + 0xe6, 0x33, 0x33, 0x7c, 0x6c, 0xc7, 0xa3, 0xc9, 0x2d, 0x37, 0x5e, 0x79, 0xcb, 0x03, 0x33, 0x3c, + 0x19, 0x8f, 0x30, 0x6c, 0x65, 0xc5, 0x82, 0x94, 0x64, 0x66, 0xd8, 0x0f, 0x4a, 0xe6, 0x22, 0xe0, + 0x1b, 0xe0, 0x5b, 0x91, 0xa1, 0xb1, 0x51, 0x36, 0x5a, 0xaf, 0x5f, 0xf3, 0xae, 0x37, 0xc2, 0x19, + 0xc0, 0xaf, 0xc2, 0xbc, 0x51, 0xb9, 0x8e, 0xb1, 0x1f, 0xac, 0x37, 0x5c, 0xd9, 0x34, 0xde, 0xbc, + 0x05, 0xfe, 0x81, 0x19, 0xde, 0xc5, 0x28, 0x41, 0xcd, 0xff, 0x03, 0x8d, 0xd3, 0xc8, 0x14, 0x8a, + 0xda, 0xff, 0xac, 0x88, 0x6e, 0x10, 0xba, 0xcc, 0xad, 0xf7, 0x1a, 0xe0, 0x4f, 0x27, 0xc1, 0xdb, + 0xd0, 0x1a, 0xe4, 0x71, 0x8c, 0xc6, 0xb0, 0x39, 0xbe, 0x0c, 0x8b, 0x0f, 0x24, 0x5e, 0x8c, 0x30, + 0xb6, 0x98, 0xb8, 0x1c, 0xe6, 0xf1, 0x25, 0x58, 0xe8, 0x29, 0x29, 0x31, 0xb6, 0x7b, 0x91, 0x48, + 0x31, 0x61, 0x35, 0xbe, 0x02, 0xec, 0x18, 0x75, 0x26, 0x8c, 0x11, 0x4a, 0x06, 0x28, 0x05, 0x26, + 0xac, 0xce, 0x2f, 0xc3, 0x72, 0x4f, 0xa5, 0x29, 0xc6, 0x56, 0x28, 0x79, 0xa8, 0xec, 0x9d, 0x0b, + 0x61, 0xac, 0x61, 0x0d, 0xa2, 0xed, 0xa7, 0x29, 0x0e, 0xa3, 0xf4, 0xb6, 0x1e, 0xe6, 0x19, 0x4a, + 0xcb, 0x2e, 0x11, 0x47, 0x09, 0x06, 0x22, 0x43, 0x49, 0x4c, 0xac, 0x55, 0x41, 0xfb, 0x32, 0xc1, + 0x0b, 0xea, 0x1f, 0x9b, 0xe7, 0x57, 0x60, 0xb5, 0x44, 0x2b, 0x07, 0x44, 0x19, 0x32, 0x9f, 0x2f, + 0x42, 0xbb, 0xdc, 0x3a, 0x39, 0x3a, 0xbe, 0xc7, 0xa0, 0xc2, 0x10, 0xaa, 0x67, 0x21, 0xc6, 0x4a, + 0x27, 0xac, 0x5d, 0x91, 0xf0, 0x10, 0x63, 0xab, 0x74, 0x3f, 0x60, 0x1d, 0x12, 0x5c, 0x82, 0x03, + 0x8c, 0x74, 0x7c, 0x1e, 0xa2, 0xc9, 0x53, 0xcb, 0x16, 0x38, 0x83, 0xce, 0x9e, 0x48, 0xf1, 0x50, + 0xd9, 0x3d, 0x95, 0xcb, 0x84, 0x75, 0x79, 0x17, 0xe0, 0x00, 0x6d, 0x54, 0x76, 0x60, 0x91, 0x8e, + 0xed, 0x45, 0xf1, 0x39, 0x96, 0x00, 0xe3, 0x6b, 0xc0, 0x7b, 0x91, 0x94, 0xca, 0xf6, 0x34, 0x46, + 0x16, 0xf7, 0x54, 0x9a, 0xa0, 0x66, 0x4b, 0x24, 0xe7, 0x6f, 0xb8, 0x48, 0x91, 0xf1, 0x59, 0x76, + 0x80, 0x29, 0x4e, 0xb3, 0x97, 0x67, 0xd9, 0x25, 0x4e, 0xd9, 0x2b, 0x24, 0x7e, 0x37, 0x17, 0x69, + 0xe2, 0x5a, 0x52, 0x8c, 0x65, 0x95, 0x34, 0x96, 0xe2, 0x0f, 0xef, 0xf7, 0x07, 0x27, 0x6c, 0x8d, + 0xaf, 0xc2, 0x52, 0x89, 0x1c, 0xa0, 0xd5, 0x22, 0x76, 0xcd, 0xbb, 0x4c, 0x52, 0x8f, 0x72, 0x7b, + 0x74, 0x76, 0x80, 0x99, 0xd2, 0x63, 0xb6, 0x4e, 0x03, 0x75, 0x4c, 0x93, 0x11, 0xb1, 0x2b, 0x9c, + 0xc3, 0x42, 0x10, 0x84, 0xf8, 0x76, 0x8e, 0xc6, 0x86, 0x51, 0x8c, 0xec, 0xe7, 0xd6, 0xd6, 0x23, + 0x00, 0x97, 0x46, 0x36, 0x47, 0xce, 0xa1, 0x3b, 0x8b, 0x0e, 0x95, 0x44, 0x36, 0xc7, 0x3b, 0x30, + 0xff, 0x40, 0x0a, 0x63, 0x72, 0x4c, 0x98, 0x47, 0x2d, 0xea, 0xcb, 0x63, 0xad, 0x86, 0xe4, 0x2e, + 0x56, 0xa3, 0xdd, 0x3d, 0x21, 0x85, 0x39, 0x77, 0x8f, 0x03, 0xa0, 0x59, 0xf6, 0xaa, 0xb1, 0xf5, + 0x08, 0x3a, 0x03, 0x1c, 0xd2, 0x3b, 0x28, 0xb8, 0x57, 0x80, 0x55, 0xe3, 0x19, 0xfb, 0x54, 0xa1, + 0x47, 0xef, 0x74, 0x5f, 0xab, 0x67, 0x42, 0x0e, 0x59, 0x8d, 0xc8, 0x06, 0x18, 0xa5, 0x8e, 0xb8, + 0x0d, 0xad, 0xbd, 0x34, 0x77, 0xa7, 0x34, 0xb6, 0xbe, 0x6a, 0x3a, 0xbf, 0x3a, 0xdb, 0x2d, 0x80, + 0xff, 0x40, 0x26, 0x78, 0x26, 0x24, 0x26, 0x6c, 0xce, 0xb5, 0xd6, 0x8d, 0x60, 0xf6, 0x84, 0x58, + 0x42, 0xd7, 0x0a, 0xb4, 0x1a, 0x55, 0x30, 0xa4, 0xfe, 0xdc, 0x8d, 0x4c, 0x05, 0x3a, 0xa3, 0x79, + 0x05, 0x68, 0x62, 0x2d, 0x4e, 0xab, 0xe5, 0x43, 0x9a, 0xcc, 0xe0, 0x5c, 0x3d, 0x9b, 0x61, 0x86, + 0x9d, 0xd3, 0x49, 0xfb, 0x68, 0x07, 0x63, 0x63, 0x31, 0xeb, 0x29, 0x79, 0x26, 0x86, 0x86, 0x09, + 0x3a, 0xe9, 0xbe, 0x8a, 0x92, 0x4a, 0xf9, 0x5b, 0x34, 0xb1, 0x10, 0x53, 0x8c, 0x4c, 0x95, 0xf5, + 0x09, 0x5f, 0x81, 0xc5, 0x42, 0xea, 0x71, 0xa4, 0xad, 0x70, 0xe0, 0xd7, 0x9e, 0x9b, 0x91, 0x56, + 0xa3, 0x19, 0xf6, 0x0d, 0x79, 0xb3, 0x73, 0x37, 0x32, 0x33, 0xe8, 0x5b, 0x8f, 0xaf, 0xc1, 0xd2, + 0x44, 0xea, 0x0c, 0xff, 0xce, 0xe3, 0xcb, 0xd0, 0x25, 0xa9, 0x53, 0xcc, 0xb0, 0xef, 0x1d, 0x48, + 0xa2, 0x2a, 0xe0, 0x0f, 0x8e, 0xa1, 0x54, 0x55, 0xc1, 0x7f, 0x74, 0x87, 0x11, 0x43, 0x39, 0x2a, + 0xc3, 0x5e, 0x78, 0xa4, 0x74, 0x72, 0x58, 0x09, 0xb3, 0x97, 0x1e, 0x67, 0xd0, 0x2e, 0xf4, 0xbb, + 0x17, 0xc3, 0xde, 0xaf, 0x39, 0xed, 0x65, 0x5e, 0x81, 0x7d, 0x50, 0xe3, 0x5d, 0xf0, 0xe9, 0x3e, + 0x45, 0xfc, 0x61, 0x8d, 0xb7, 0xa1, 0xd9, 0x97, 0x06, 0xb5, 0x65, 0xef, 0xd0, 0x54, 0x9b, 0x85, + 0x05, 0xd8, 0xbb, 0xf4, 0x76, 0x2e, 0xb9, 0x11, 0xb3, 0xe7, 0x6e, 0xa3, 0x30, 0x2b, 0xfb, 0xa5, + 0xee, 0x14, 0x55, 0x9d, 0xfb, 0x6b, 0x9d, 0x4e, 0xda, 0x47, 0x3b, 0x7b, 0xaa, 0xec, 0xb7, 0x3a, + 0xbf, 0x0a, 0xab, 0x13, 0xcc, 0xf9, 0x68, 0xfa, 0x48, 0x7f, 0xaf, 0xf3, 0x0d, 0xb8, 0xbc, 0x8f, + 0x76, 0xd6, 0x7e, 0x2a, 0x12, 0xc6, 0x8a, 0xd8, 0xb0, 0x3f, 0xea, 0xfc, 0x5f, 0xb0, 0xb6, 0x8f, + 0x76, 0xda, 0x86, 0xca, 0xe6, 0x9f, 0x75, 0xbe, 0x00, 0xf3, 0x21, 0x19, 0x0d, 0x9f, 0x22, 0x7b, + 0x51, 0xa7, 0x5e, 0x4e, 0xc2, 0x52, 0xce, 0xcb, 0x3a, 0xb5, 0xa2, 0x6c, 0x4c, 0x5f, 0x9e, 0x29, + 0xf6, 0x53, 0x83, 0xaa, 0x4e, 0x44, 0x86, 0x27, 0x22, 0x7e, 0xc2, 0x3e, 0xf2, 0xa9, 0xea, 0x8d, + 0x1c, 0xf5, 0xf8, 0x50, 0x25, 0x48, 0xf4, 0x86, 0x7d, 0xec, 0x53, 0x6b, 0x68, 0x2c, 0x45, 0x6b, + 0x3e, 0x71, 0x71, 0x69, 0xce, 0x7e, 0xc0, 0x3e, 0xa5, 0x8f, 0x1e, 0x94, 0xf1, 0xc9, 0xe0, 0x88, + 0x7d, 0xe6, 0xd3, 0x1c, 0x6e, 0xa7, 0xa9, 0x8a, 0x23, 0x3b, 0x9d, 0xc3, 0xe7, 0x3e, 0x0d, 0xb2, + 0xe2, 0xab, 0x52, 0xf8, 0x17, 0x3e, 0x5f, 0x9d, 0xfa, 0xcd, 0xb5, 0x35, 0x20, 0xbf, 0x7d, 0xe9, + 0x6f, 0x6d, 0x42, 0x2b, 0x30, 0xa9, 0xf3, 0x4e, 0x0b, 0xea, 0x81, 0x49, 0xd9, 0x1c, 0x99, 0x7a, + 0x57, 0xa9, 0xf4, 0xce, 0xc5, 0x48, 0x3f, 0xfc, 0x2f, 0xf3, 0x76, 0xff, 0xff, 0xe6, 0xcd, 0xa1, + 0xb0, 0xe7, 0xf9, 0x29, 0xfd, 0x5b, 0x76, 0x8a, 0x9f, 0xcd, 0x0d, 0xa1, 0xca, 0xd5, 0x8e, 0x90, + 0x16, 0xb5, 0x8c, 0xd2, 0x1d, 0xf7, 0xff, 0xd9, 0x29, 0xfe, 0x3f, 0xa3, 0xd3, 0xd3, 0xa6, 0x8b, + 0x6f, 0xfe, 0x15, 0x00, 0x00, 0xff, 0xff, 0x77, 0x76, 0x64, 0xb2, 0x59, 0x08, 0x00, 0x00, } diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index a5e9b497a5..877c14ebf4 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -47,21 +47,23 @@ type ParamTable struct { MasterAddress string PulsarAddress string - QueryNodeNum int - QueryNodeIDList []UniqueID - ProxyID UniqueID - TimeTickInterval time.Duration - K2SChannelNames []string - SearchChannelNames []string - SearchResultChannelNames []string - ProxySubName string - ProxyTimeTickChannelNames []string - MsgStreamTimeTickBufSize int64 - MaxNameLength int64 - MaxFieldNum int64 - MaxDimension int64 - DefaultPartitionName string - DefaultIndexName string + QueryNodeNum int + QueryNodeIDList []UniqueID + ProxyID UniqueID + TimeTickInterval time.Duration + K2SChannelNames []string + SearchChannelNames []string + SearchResultChannelNames []string + RetrieveChannelNames []string + RetrieveResultChannelNames []string + ProxySubName string + ProxyTimeTickChannelNames []string + MsgStreamTimeTickBufSize int64 + MaxNameLength int64 + MaxFieldNum int64 + MaxDimension int64 + DefaultPartitionName string + DefaultIndexName string PulsarMaxMessageSize int Log log.Config diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index f25ded4789..3ecbb01eaa 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -936,6 +936,138 @@ func (st *SearchTask) PostExecute(ctx context.Context) error { } } +type RetrieveTask struct { + Condition + *internalpb.RetrieveRequest + ctx context.Context + queryMsgStream msgstream.MsgStream + resultBuf chan []*internalpb.RetrieveResults + result *milvuspb.RetrieveResults + retrieve *milvuspb.RetrieveRequest +} + +func (rt *RetrieveTask) TraceCtx() context.Context { + return rt.ctx +} + +func (rt *RetrieveTask) ID() UniqueID { + return rt.Base.MsgID +} + +func (rt *RetrieveTask) SetID(uid UniqueID) { + rt.Base.MsgID = uid +} + +func (rt *RetrieveTask) Name() string { + return RetrieveTaskName +} + +func (rt *RetrieveTask) Type() commonpb.MsgType { + return rt.Base.MsgType +} + +func (rt *RetrieveTask) BeginTs() Timestamp { + return rt.Base.Timestamp +} + +func (rt *RetrieveTask) EndTs() Timestamp { + return rt.Base.Timestamp +} + +func (rt *RetrieveTask) SetTs(ts Timestamp) { + rt.Base.Timestamp = ts +} + +func (rt *RetrieveTask) OnEnqueue() error { + rt.Base.MsgType = commonpb.MsgType_Retrieve + return nil +} + +func (rt *RetrieveTask) PreExecute(ctx context.Context) error { + rt.Base.MsgType = commonpb.MsgType_Retrieve + rt.Base.SourceID = Params.ProxyID + + collectionName := rt.retrieve.CollectionName + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + if err != nil { + return err + } + + if err := ValidateCollectionName(rt.retrieve.CollectionName); err != nil { + return err + } + + for _, tag := range rt.retrieve.PartitionNames { + if err := ValidatePartitionTag(tag, false); err != nil { + return err + } + } + + rt.Base.MsgType = commonpb.MsgType_Retrieve + rt.Ids = rt.retrieve.Ids + rt.OutputFields = rt.retrieve.OutputFields + + rt.ResultChannelID = Params.RetrieveChannelNames[0] + rt.DbID = 0 // todo(yukun) + + rt.CollectionID = collectionID + rt.PartitionIDs = make([]UniqueID, 0) + + partitionsMap, err := globalMetaCache.GetPartitions(ctx, collectionName) + if err != nil { + return err + } + + partitionsRecord := make(map[UniqueID]bool) + for _, partitionName := range rt.retrieve.PartitionNames { + pattern := fmt.Sprintf("^%s$", partitionName) + re, err := regexp.Compile(pattern) + if err != nil { + return errors.New("invalid partition names") + } + found := false + for name, pID := range partitionsMap { + if re.MatchString(name) { + if _, exist := partitionsRecord[pID]; !exist { + rt.PartitionIDs = append(rt.PartitionIDs, pID) + partitionsRecord[pID] = true + } + found = true + } + } + if !found { + errMsg := fmt.Sprintf("PartitonName: %s not found", partitionName) + return errors.New(errMsg) + } + } + + return nil +} + +func (rt *RetrieveTask) Execute(ctx context.Context) error { + var tsMsg msgstream.TsMsg = &msgstream.RetrieveMsg{ + RetrieveRequest: *rt.RetrieveRequest, + BaseMsg: msgstream.BaseMsg{ + Ctx: ctx, + HashValues: []uint32{uint32(Params.ProxyID)}, + BeginTimestamp: rt.Base.Timestamp, + EndTimestamp: rt.Base.Timestamp, + }, + } + msgPack := msgstream.MsgPack{ + BeginTs: rt.Base.Timestamp, + EndTs: rt.Base.Timestamp, + Msgs: make([]msgstream.TsMsg, 1), + } + msgPack.Msgs[0] = tsMsg + err := rt.queryMsgStream.Produce(&msgPack) + log.Debug("proxynode", zap.Int("length of retrieveMsg", len(msgPack.Msgs))) + if err != nil { + log.Debug("proxynode", zap.String("send retrieve request failed", err.Error())) + } + return err +} + type HasCollectionTask struct { Condition *milvuspb.HasCollectionRequest