Fix proxy bug

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2020-11-09 17:52:47 +08:00 committed by yefu.chen
parent be0a2e7e5f
commit 87a4495c59
10 changed files with 155 additions and 5696 deletions

View File

@ -50,7 +50,7 @@ func main() {
<-ctx.Done()
log.Print("Got signal to exit", zap.String("signal", sig.String()))
//svr.Close()
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)

View File

@ -2,10 +2,10 @@ package msgstream
import (
"context"
"github.com/gogo/protobuf/proto"
"log"
"sync"
"github.com/gogo/protobuf/proto"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/apache/pulsar-client-go/pulsar"
@ -42,7 +42,7 @@ type PulsarMsgStream struct {
unmarshal *UnmarshalDispatcher
receiveBuf chan *MsgPack
receiveBufSize int64
wait sync.WaitGroup
wait *sync.WaitGroup
streamCancel func()
}
@ -99,6 +99,7 @@ func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) {
}
func (ms *PulsarMsgStream) Start() {
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
@ -131,13 +132,13 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
return nil
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelId, tsMsg := range tsMsgs {
for channelID, tsMsg := range tsMsgs {
hashValues := (*tsMsg).HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
bucketValues[index] = hashValue % int32(len(ms.producers))
}
reBucketValues[channelId] = bucketValues
reBucketValues[channelID] = bucketValues
}
var result map[int32]*MsgPack
@ -147,13 +148,13 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
result = make(map[int32]*MsgPack)
for i, request := range tsMsgs {
keys := reBucketValues[i]
for _, channelId := range keys {
_, ok := result[channelId]
if ok == false {
for _, channelID := range keys {
_, ok := result[channelID]
if !ok {
msgPack := MsgPack{}
result[channelId] = &msgPack
result[channelID] = &msgPack
}
result[channelId].Msgs = append(result[channelId].Msgs, request)
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
}
@ -223,7 +224,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
chanLen := len(consumerChan)
for l := 0; l < chanLen; l++ {
pulsarMsg, ok := <-consumerChan
if ok == false {
if !ok {
log.Printf("channel closed")
return
}
@ -271,8 +272,11 @@ func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64) *PulsarTtMs
}
func (ms *PulsarTtMsgStream) Start() {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
}
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
@ -294,8 +298,8 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
}
wg.Wait()
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp)
if ok == false {
log.Printf("timeTick err")
if !ok {
log.Printf("All timeTick's timestamps are inconsistent")
}
timeTickBuf := make([]*TsMsg, 0)
@ -333,7 +337,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
case <-ms.ctx.Done():
return
case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan():
if ok == false {
if !ok {
log.Printf("consumer closed!")
return
}
@ -363,10 +367,10 @@ func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
func checkTimeTickMsg(msg map[int]Timestamp) (Timestamp, bool) {
checkMap := make(map[Timestamp]int)
for _, v := range msg {
checkMap[v] += 1
checkMap[v]++
}
if len(checkMap) <= 1 {
for k, _ := range checkMap {
for k := range checkMap {
return k, true
}
}

View File

@ -13,19 +13,19 @@ func repackFunc(msgs []*TsMsg, hashKeys [][]int32) map[int32]*MsgPack {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
for _, channelId := range keys {
_, ok := result[channelId]
for _, channelID := range keys {
_, ok := result[channelID]
if ok == false {
msgPack := MsgPack{}
result[channelId] = &msgPack
result[channelID] = &msgPack
}
result[channelId].Msgs = append(result[channelId].Msgs, request)
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
return result
}
func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
BeginTimestamp: 0,
@ -36,7 +36,7 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
case internalPb.MsgType_kInsert:
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kInsert,
ReqId: reqId,
ReqId: reqID,
CollectionName: "Collection",
PartitionTag: "Partition",
SegmentId: 1,
@ -52,7 +52,7 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
case internalPb.MsgType_kDelete:
deleteRequest := internalPb.DeleteRequest{
MsgType: internalPb.MsgType_kDelete,
ReqId: reqId,
ReqId: reqID,
CollectionName: "Collection",
ChannelId: 1,
ProxyId: 1,
@ -67,7 +67,7 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
case internalPb.MsgType_kSearch:
searchRequest := internalPb.SearchRequest{
MsgType: internalPb.MsgType_kSearch,
ReqId: reqId,
ReqId: reqID,
ProxyId: 1,
Timestamp: 1,
ResultChannelId: 1,
@ -81,7 +81,7 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
searchResult := internalPb.SearchResult{
MsgType: internalPb.MsgType_kSearchResult,
Status: &commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS},
ReqId: reqId,
ReqId: reqID,
ProxyId: 1,
QueryNodeId: 1,
Timestamp: 1,
@ -95,7 +95,7 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
case internalPb.MsgType_kTimeTick:
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: reqId,
PeerId: reqID,
Timestamp: 1,
}
timeTickMsg := &TimeTickMsg{
@ -107,7 +107,7 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
return &tsMsg
}
func getTimeTickMsg(reqId UniqueID, hashValue int32, time uint64) *TsMsg {
func getTimeTickMsg(reqID UniqueID, hashValue int32, time uint64) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
BeginTimestamp: 0,
@ -116,7 +116,7 @@ func getTimeTickMsg(reqId UniqueID, hashValue int32, time uint64) *TsMsg {
}
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: reqId,
PeerId: reqID,
Timestamp: time,
}
timeTickMsg := &TimeTickMsg{
@ -167,6 +167,7 @@ func initPulsarTtStream(pulsarAddress string,
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
var input MsgStream = inputStream
// set output stream

View File

@ -3,9 +3,10 @@ package msgstream
import (
"context"
"fmt"
"testing"
"github.com/gogo/protobuf/proto"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"testing"
)
type InsertTask struct {
@ -26,7 +27,7 @@ func (tt *InsertTask) Marshal(input *TsMsg) ([]byte, error) {
func (tt *InsertTask) Unmarshal(input []byte) (*TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
testMsg := &InsertTask{InsertMsg: InsertMsg{InsertRequest:insertRequest}}
testMsg := &InsertTask{InsertMsg: InsertMsg{InsertRequest: insertRequest}}
testMsg.Tag = testMsg.PartitionTag
if err != nil {
@ -36,7 +37,7 @@ func (tt *InsertTask) Unmarshal(input []byte) (*TsMsg, error) {
return &tsMsg, nil
}
func getMsg(reqId UniqueID, hashValue int32) *TsMsg {
func getMsg(reqID UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
BeginTimestamp: 0,
@ -45,7 +46,7 @@ func getMsg(reqId UniqueID, hashValue int32) *TsMsg {
}
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kInsert,
ReqId: reqId,
ReqId: reqID,
CollectionName: "Collection",
PartitionTag: "Partition",
SegmentId: 1,
@ -59,7 +60,7 @@ func getMsg(reqId UniqueID, hashValue int32) *TsMsg {
}
testTask := InsertTask{
InsertMsg:insertMsg,
InsertMsg: insertMsg,
}
tsMsg = &testTask
return &tsMsg

View File

@ -14,8 +14,8 @@ type UnmarshalDispatcher struct {
func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType internalPb.MsgType) (*TsMsg, error) {
unmarshalFunc, ok := dispatcher.tempMap[msgType]
if ok == false {
return nil, errors.New("Not set unmarshalFunc for this messageType")
if !ok {
return nil, errors.New(string("Not set unmarshalFunc for this messageType."))
}
return unmarshalFunc(input)
}

View File

@ -0,0 +1,67 @@
package msgstream
import (
"context"
"fmt"
"testing"
"github.com/gogo/protobuf/proto"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func newInsertMsgUnmarshal(input []byte) (*TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &InsertMsg{InsertRequest: insertRequest}
fmt.Println("use func newInsertMsgUnmarshal unmarshal")
if err != nil {
return nil, err
}
var tsMsg TsMsg = insertMsg
return &tsMsg, nil
}
func TestStream_unmarshal_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
//add a new unmarshall func for msgType kInsert
unmarshalDispatcher.AddMsgTemplate(internalPb.MsgType_kInsert, newInsertMsgUnmarshal)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
inputStream.Produce(&msgPack)
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v, "msg tag: ")
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
inputStream.Close()
outputStream.Close()
}

View File

@ -1,781 +0,0 @@
syntax = "proto3";
package milvus.grpc;
option go_package="github.com/zilliztech/milvus-distributed/internal/proto/message";
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
Schema schema = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
int64 query_id = 7;
int64 proxy_id = 8;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
int64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
}
enum ReqType {
// general operations
kCmd = 0;
/* collection operations */
kCreateCollection = 100;
kDropCollection = 101;
kHasCollection = 102;
kListCollections = 103;
kGetCollectionInfo = 104;
kGetCollectionStats = 105;
kCountEntities = 106;
/* partition operations */
kCreatePartition = 200;
kDropPartition = 201;
kHasPartition = 202;
kListPartitions = 203;
/* index operations */
kCreateIndex = 300;
kDropIndex = 301;
kDescribeIndex = 302;
/* data operations */
kInsert = 400;
kGetEntityByID = 401;
kDeleteEntityByID = 402;
kSearch = 403;
kListIDInSegment = 404;
/* other operations */
kLoadCollection = 500;
kFlush = 501;
kCompact = 502;
}
message QueryReqMsg {
string collection_name = 1;
repeated VectorParam vector_param = 2;
repeated string partition_tags = 3;
string dsl = 4;
repeated KeyValuePair extra_params = 5;
uint64 timestamp =6;
int64 proxy_id = 7;
int64 query_id = 8;
ReqType req_type = 9;
}
message ManipulationReqMsg {
string collection_name = 1;
string partition_tag = 2;
repeated int64 primary_keys = 3;
repeated RowData rows_data = 4;
uint64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
ReqType req_type = 8;
int64 proxy_id = 9;
repeated KeyValuePair extra_params = 10;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
uint64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
repeated string partition_tag = 3;
int64 uid = 4;
uint64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
repeated string json = 8;
string dsl = 9;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 peer_Id = 1;
uint64 Timestamp = 2;
SyncType sync_type = 3;
}
message Key2SegMsg {
int64 uid = 1;
uint64 timestamp = 2;
repeated int64 segment_id = 3;
}

File diff suppressed because it is too large Load Diff

View File

@ -24,7 +24,6 @@ type Proxy struct {
proxyLoopCancel func()
proxyLoopWg sync.WaitGroup
servicepb.UnimplementedMilvusServiceServer
grpcServer *grpc.Server
masterConn *grpc.ClientConn
masterClient masterpb.MasterClient
@ -32,6 +31,10 @@ type Proxy struct {
manipulationMsgStream *msgstream.PulsarMsgStream
queryMsgStream *msgstream.PulsarMsgStream
queryResultMsgStream *msgstream.PulsarMsgStream
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
}
func CreateProxy(ctx context.Context) (*Proxy, error) {
@ -42,6 +45,25 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
return m, nil
}
// AddStartCallback adds a callback in the startServer phase.
func (s *Proxy) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}
func (s *Proxy) startProxy(ctx context.Context) error {
// Run callbacks
for _, cb := range s.startCallbacks {
cb()
}
return nil
}
// AddCloseCallback adds a callback in the Close phase.
func (s *Proxy) AddCloseCallback(callbacks ...func()) {
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
}
func (p *Proxy) grpcLoop() {
defer p.proxyLoopWg.Done()
@ -56,16 +78,6 @@ func (p *Proxy) grpcLoop() {
if err = p.grpcServer.Serve(lis); err != nil {
log.Fatalf("Proxy grpc server fatal error=%v", err)
}
ctx, cancel := context.WithCancel(p.proxyLoopCtx)
defer cancel()
for {
select {
case <-ctx.Done():
log.Print("proxy is closed...")
return
}
}
}
func (p *Proxy) pulsarMsgStreamLoop() {
@ -164,6 +176,8 @@ func (p *Proxy) queryResultLoop() {
}
}
func (p *Proxy) startProxyLoop(ctx context.Context) {
p.proxyLoopCtx, p.proxyLoopCancel = context.WithCancel(ctx)
p.proxyLoopWg.Add(4)
@ -177,8 +191,26 @@ func (p *Proxy) startProxyLoop(ctx context.Context) {
}
func (p *Proxy) Run() error {
if err := p.startProxy(p.ctx); err != nil {
return err
}
p.startProxyLoop(p.ctx)
p.proxyLoopWg.Wait()
return nil
}
func (p *Proxy) stopProxyLoop() {
if p.grpcServer != nil{
p.grpcServer.GracefulStop()
}
p.proxyLoopCancel()
p.proxyLoopWg.Wait()
}
// Close closes the server.
func (p *Proxy) Close() {
p.stopProxyLoop()
for _, cb := range p.closeCallbacks {
cb()
}
log.Print("proxy closed.")
}

View File

@ -31,6 +31,5 @@ ${protoc} --go_out=plugins=grpc,paths=source_relative:./internalpb internal_msg.
${protoc} --go_out=plugins=grpc,paths=source_relative:./servicepb service_msg.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./servicepb service.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./masterpb master.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./message message.proto
popd