Use pulsar go client in query node and write node

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2020-08-27 16:45:43 +08:00 committed by yefu.chen
parent f1d6b62e1d
commit 3b10f74ce1
6 changed files with 553 additions and 432 deletions

View File

@ -1,43 +1,191 @@
package pulsar
import "suvlim/pulsar/schema"
func Send(msg schema.Message) {
}
func BatchSend(msgs []schema.Message) {
}
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"log"
"suvlim/pulsar/schema"
"sync"
)
var (
InsertSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
wg sync.WaitGroup
//wgJob sync.WaitGroup
//wgQuery sync.WaitGroup
//wgWrite sync.WaitGroup
OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
"{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" +
"{\"name\":\"EntityId\",\"type\":\"int64\"}" +
"{\"name\":\"PartitionTag\",\"type\":\"string\"}" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
"]}"
DeleteSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
"{\"name\":\"EntityId\",\"type\":\"int64\"}" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
"]}"
SearchSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
"{\"name\":\"PartitionTag\",\"type\":\"string\"}" +
"{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" +
"{\"name\":\"Segments\",\"type\":\"[]string\"}" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
"]}"
TimeSyncSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
SyncEofSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
)
type MessageClient struct {
//message channel
insertChan chan *schema.InsertMsg
deleteChan chan *schema.DeleteMsg
searchChan chan *schema.SearchMsg
timeSyncChan chan *schema.TimeSyncMsg
key2SegChan chan *schema.Key2SegMsg
// pulsar
client pulsar.Client
syncInsertProducer pulsar.Producer
syncDeleteProducer pulsar.Producer
key2segProducer pulsar.Producer
consumer pulsar.Consumer
// batch messages
insertMsg []*schema.InsertMsg
deleteMsg []*schema.DeleteMsg
searchMsg []*schema.SearchMsg
timeMsg []*schema.TimeSyncMsg
key2segMsg []*schema.Key2SegMsg
}
func (mc *MessageClient) ReceiveMessage() {
for {
pulsarMessage := schema.PulsarMessage{}
msg, err := mc.consumer.Receive(context.Background())
err = msg.GetValue(&pulsarMessage)
if err != nil {
log.Fatal(err)
}
msgType := pulsarMessage.MsgType
switch msgType {
case schema.Insert:
IMsgObj := schema.InsertMsg{}
mc.insertChan <- &IMsgObj
case schema.Delete:
DMsgObj := schema.DeleteMsg{}
mc.deleteChan <- &DMsgObj
case schema.Search:
SMsgObj := schema.SearchMsg{}
mc.searchChan <- &SMsgObj
case schema.TimeSync:
TMsgObj := schema.TimeSyncMsg{}
mc.timeSyncChan <- &TMsgObj
case schema.Key2Seg:
KMsgObj := schema.Key2SegMsg{}
mc.key2SegChan <- &KMsgObj
}
}
}
func (mc *MessageClient) CreatProducer(schemaDef string, topicName string) pulsar.Producer{
schema := pulsar.NewProtoSchema(schemaDef, nil)
producer, err := mc.client.CreateProducerWithSchema(pulsar.ProducerOptions{
Topic: topicName,
}, schema)
defer producer.Close()
if err != nil {
log.Fatal(err)
}
return producer
}
func (mc *MessageClient) CreateConsumer(schemaDef string, topics []string) pulsar.Consumer {
originMsgSchema := pulsar.NewProtoSchema(schemaDef, nil)
consumer, err := mc.client.SubscribeWithSchema(pulsar.ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
}, originMsgSchema)
defer consumer.Close()
if err != nil {
log.Fatal(err)
}
return consumer
}
func (mc *MessageClient) CreateClient(url string) pulsar.Client {
// create client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: url,
})
defer client.Close()
if err != nil {
log.Fatal(err)
}
return client
}
func (mc *MessageClient) InitClient(url string,topics []string) {
//create client
mc.client = mc.CreateClient(url)
//create producer
mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "insert")
mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete")
mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "key2seg")
//create consumer
mc.consumer = mc.CreateConsumer(OriginMsgSchema, topics)
// init channel
mc.insertChan = make(chan *schema.InsertMsg, 1000)
mc.deleteChan = make(chan *schema.DeleteMsg, 1000)
mc.searchChan = make(chan *schema.SearchMsg, 1000)
mc.timeSyncChan = make(chan *schema.TimeSyncMsg, 1000)
mc.key2SegChan = make(chan *schema.Key2SegMsg, 1000)
}
type JobType int
const (
OpInQueryNode JobType = 0
OpInWriteNode JobType = 1
)
func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
// assume the channel not full
mc.insertMsg = make([]*schema.InsertMsg, 1000)
mc.deleteMsg = make([]*schema.DeleteMsg, 1000)
mc.searchMsg = make([]*schema.SearchMsg, 1000)
mc.timeMsg = make([]*schema.TimeSyncMsg, 1000)
mc.key2segMsg = make([]*schema.Key2SegMsg, 1000)
// get the length of every channel
insertLen := len(mc.insertChan)
deleteLen := len(mc.deleteChan)
searchLen := len(mc.searchChan)
timeLen := len(mc.timeSyncChan)
key2segLen := len(mc.key2SegChan)
// get message from channel to slice
for i := 0; i < insertLen; i++ {
msg := <- mc.insertChan
mc.insertMsg[i] = msg
}
for i := 0; i < deleteLen; i++ {
msg := <- mc.deleteChan
mc.deleteMsg[i] = msg
}
for i := 0; i < timeLen; i++ {
msg := <- mc.timeSyncChan
mc.timeMsg[i] = msg
}
if jobType == OpInQueryNode {
for i := 0; i < key2segLen; i++ {
msg := <-mc.key2SegChan
mc.key2segMsg[i] = msg
}
for i := 0; i < searchLen; i++ {
msg := <-mc.searchChan
mc.searchMsg[i] = msg
}
}
}

View File

@ -9,18 +9,14 @@ It is generated from these files:
It has these top-level messages:
Status
KeyValuePair
SegmentRecord
VectorRowRecord
AttrRecord
VectorRecord
VectorParam
FieldValue
Entities
InsertParam
EntityIds
DeleteByIDParam
SearchParam
QueryResult
PulsarMessage
PulsarMessages
*/
package pb
@ -174,6 +170,39 @@ func (x DataType) String() string {
}
func (DataType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type OpType int32
const (
OpType_Insert OpType = 0
OpType_Delete OpType = 1
OpType_Search OpType = 2
OpType_TimeSync OpType = 3
OpType_Key2Seg OpType = 4
OpType_Statistics OpType = 5
)
var OpType_name = map[int32]string{
0: "Insert",
1: "Delete",
2: "Search",
3: "TimeSync",
4: "Key2Seg",
5: "Statistics",
}
var OpType_value = map[string]int32{
"Insert": 0,
"Delete": 1,
"Search": 2,
"TimeSync": 3,
"Key2Seg": 4,
"Statistics": 5,
}
func (x OpType) String() string {
return proto.EnumName(OpType_name, int32(x))
}
func (OpType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type Status struct {
ErrorCode ErrorCode `protobuf:"varint,1,opt,name=error_code,json=errorCode,enum=pb.ErrorCode" json:"error_code,omitempty"`
Reason string `protobuf:"bytes,2,opt,name=reason" json:"reason,omitempty"`
@ -198,28 +227,20 @@ func (m *Status) GetReason() string {
return ""
}
type KeyValuePair struct {
Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
type SegmentRecord struct {
SegInfo []string `protobuf:"bytes,1,rep,name=seg_info,json=segInfo" json:"seg_info,omitempty"`
}
func (m *KeyValuePair) Reset() { *m = KeyValuePair{} }
func (m *KeyValuePair) String() string { return proto.CompactTextString(m) }
func (*KeyValuePair) ProtoMessage() {}
func (*KeyValuePair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *SegmentRecord) Reset() { *m = SegmentRecord{} }
func (m *SegmentRecord) String() string { return proto.CompactTextString(m) }
func (*SegmentRecord) ProtoMessage() {}
func (*SegmentRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *KeyValuePair) GetKey() string {
func (m *SegmentRecord) GetSegInfo() []string {
if m != nil {
return m.Key
return m.SegInfo
}
return ""
}
func (m *KeyValuePair) GetValue() string {
if m != nil {
return m.Value
}
return ""
return nil
}
type VectorRowRecord struct {
@ -366,392 +387,252 @@ func (m *FieldValue) GetVectorRecord() *VectorRecord {
return nil
}
type Entities struct {
Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
Ids []int64 `protobuf:"varint,2,rep,packed,name=ids" json:"ids,omitempty"`
ValidRow []bool `protobuf:"varint,3,rep,packed,name=valid_row,json=validRow" json:"valid_row,omitempty"`
Fields []*FieldValue `protobuf:"bytes,4,rep,name=fields" json:"fields,omitempty"`
type PulsarMessage struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"`
EntityId int64 `protobuf:"varint,3,opt,name=entity_id,json=entityId" json:"entity_id,omitempty"`
PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"`
VectorParam *VectorParam `protobuf:"bytes,5,opt,name=vector_param,json=vectorParam" json:"vector_param,omitempty"`
Segments *SegmentRecord `protobuf:"bytes,6,opt,name=segments" json:"segments,omitempty"`
Timestamp int64 `protobuf:"varint,7,opt,name=timestamp" json:"timestamp,omitempty"`
ClientId int64 `protobuf:"varint,8,opt,name=client_id,json=clientId" json:"client_id,omitempty"`
MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"`
}
func (m *Entities) Reset() { *m = Entities{} }
func (m *Entities) String() string { return proto.CompactTextString(m) }
func (*Entities) ProtoMessage() {}
func (*Entities) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *PulsarMessage) Reset() { *m = PulsarMessage{} }
func (m *PulsarMessage) String() string { return proto.CompactTextString(m) }
func (*PulsarMessage) ProtoMessage() {}
func (*PulsarMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *Entities) GetStatus() *Status {
if m != nil {
return m.Status
}
return nil
}
func (m *Entities) GetIds() []int64 {
if m != nil {
return m.Ids
}
return nil
}
func (m *Entities) GetValidRow() []bool {
if m != nil {
return m.ValidRow
}
return nil
}
func (m *Entities) GetFields() []*FieldValue {
if m != nil {
return m.Fields
}
return nil
}
type InsertParam struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"`
EntityIdArray []int64 `protobuf:"varint,3,rep,packed,name=entity_id_array,json=entityIdArray" json:"entity_id_array,omitempty"`
PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"`
Timestamp []int64 `protobuf:"varint,5,rep,packed,name=timestamp" json:"timestamp,omitempty"`
GrpcServerClientId int64 `protobuf:"varint,6,opt,name=grpc_server_client_id,json=grpcServerClientId" json:"grpc_server_client_id,omitempty"`
ExtraParams []*KeyValuePair `protobuf:"bytes,7,rep,name=extra_params,json=extraParams" json:"extra_params,omitempty"`
}
func (m *InsertParam) Reset() { *m = InsertParam{} }
func (m *InsertParam) String() string { return proto.CompactTextString(m) }
func (*InsertParam) ProtoMessage() {}
func (*InsertParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *InsertParam) GetCollectionName() string {
func (m *PulsarMessage) GetCollectionName() string {
if m != nil {
return m.CollectionName
}
return ""
}
func (m *InsertParam) GetFields() []*FieldValue {
func (m *PulsarMessage) GetFields() []*FieldValue {
if m != nil {
return m.Fields
}
return nil
}
func (m *InsertParam) GetEntityIdArray() []int64 {
func (m *PulsarMessage) GetEntityId() int64 {
if m != nil {
return m.EntityIdArray
return m.EntityId
}
return nil
return 0
}
func (m *InsertParam) GetPartitionTag() string {
func (m *PulsarMessage) GetPartitionTag() string {
if m != nil {
return m.PartitionTag
}
return ""
}
func (m *InsertParam) GetTimestamp() []int64 {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *InsertParam) GetGrpcServerClientId() int64 {
if m != nil {
return m.GrpcServerClientId
}
return 0
}
func (m *InsertParam) GetExtraParams() []*KeyValuePair {
if m != nil {
return m.ExtraParams
}
return nil
}
type EntityIds struct {
Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
EntityIdArray []int64 `protobuf:"varint,2,rep,packed,name=entity_id_array,json=entityIdArray" json:"entity_id_array,omitempty"`
}
func (m *EntityIds) Reset() { *m = EntityIds{} }
func (m *EntityIds) String() string { return proto.CompactTextString(m) }
func (*EntityIds) ProtoMessage() {}
func (*EntityIds) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
func (m *EntityIds) GetStatus() *Status {
if m != nil {
return m.Status
}
return nil
}
func (m *EntityIds) GetEntityIdArray() []int64 {
if m != nil {
return m.EntityIdArray
}
return nil
}
type DeleteByIDParam struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
IdArray []int64 `protobuf:"varint,2,rep,packed,name=id_array,json=idArray" json:"id_array,omitempty"`
Timestamp []int64 `protobuf:"varint,3,rep,packed,name=timestamp" json:"timestamp,omitempty"`
GrpcServerClientId int64 `protobuf:"varint,4,opt,name=grpc_server_client_id,json=grpcServerClientId" json:"grpc_server_client_id,omitempty"`
}
func (m *DeleteByIDParam) Reset() { *m = DeleteByIDParam{} }
func (m *DeleteByIDParam) String() string { return proto.CompactTextString(m) }
func (*DeleteByIDParam) ProtoMessage() {}
func (*DeleteByIDParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (m *DeleteByIDParam) GetCollectionName() string {
if m != nil {
return m.CollectionName
}
return ""
}
func (m *DeleteByIDParam) GetIdArray() []int64 {
if m != nil {
return m.IdArray
}
return nil
}
func (m *DeleteByIDParam) GetTimestamp() []int64 {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *DeleteByIDParam) GetGrpcServerClientId() int64 {
if m != nil {
return m.GrpcServerClientId
}
return 0
}
type SearchParam struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
PartitionTagArray []string `protobuf:"bytes,2,rep,name=partition_tag_array,json=partitionTagArray" json:"partition_tag_array,omitempty"`
VectorParam []*VectorParam `protobuf:"bytes,3,rep,name=vector_param,json=vectorParam" json:"vector_param,omitempty"`
Dsl string `protobuf:"bytes,4,opt,name=dsl" json:"dsl,omitempty"`
Timestamp []int64 `protobuf:"varint,5,rep,packed,name=timestamp" json:"timestamp,omitempty"`
GrpcServerClientId int64 `protobuf:"varint,6,opt,name=grpc_server_client_id,json=grpcServerClientId" json:"grpc_server_client_id,omitempty"`
ExtraParams []*KeyValuePair `protobuf:"bytes,7,rep,name=extra_params,json=extraParams" json:"extra_params,omitempty"`
}
func (m *SearchParam) Reset() { *m = SearchParam{} }
func (m *SearchParam) String() string { return proto.CompactTextString(m) }
func (*SearchParam) ProtoMessage() {}
func (*SearchParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
func (m *SearchParam) GetCollectionName() string {
if m != nil {
return m.CollectionName
}
return ""
}
func (m *SearchParam) GetPartitionTagArray() []string {
if m != nil {
return m.PartitionTagArray
}
return nil
}
func (m *SearchParam) GetVectorParam() []*VectorParam {
func (m *PulsarMessage) GetVectorParam() *VectorParam {
if m != nil {
return m.VectorParam
}
return nil
}
func (m *SearchParam) GetDsl() string {
func (m *PulsarMessage) GetSegments() *SegmentRecord {
if m != nil {
return m.Dsl
return m.Segments
}
return nil
}
func (m *PulsarMessage) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *PulsarMessage) GetClientId() int64 {
if m != nil {
return m.ClientId
}
return 0
}
func (m *PulsarMessage) GetMsgType() OpType {
if m != nil {
return m.MsgType
}
return OpType_Insert
}
type PulsarMessages struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"`
EntityId []int64 `protobuf:"varint,3,rep,packed,name=entity_id,json=entityId" json:"entity_id,omitempty"`
PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"`
VectorParam []*VectorParam `protobuf:"bytes,5,rep,name=vector_param,json=vectorParam" json:"vector_param,omitempty"`
Segments []*SegmentRecord `protobuf:"bytes,6,rep,name=segments" json:"segments,omitempty"`
Timestamp []int64 `protobuf:"varint,7,rep,packed,name=timestamp" json:"timestamp,omitempty"`
ClientId []int64 `protobuf:"varint,8,rep,packed,name=client_id,json=clientId" json:"client_id,omitempty"`
MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"`
}
func (m *PulsarMessages) Reset() { *m = PulsarMessages{} }
func (m *PulsarMessages) String() string { return proto.CompactTextString(m) }
func (*PulsarMessages) ProtoMessage() {}
func (*PulsarMessages) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *PulsarMessages) GetCollectionName() string {
if m != nil {
return m.CollectionName
}
return ""
}
func (m *SearchParam) GetTimestamp() []int64 {
func (m *PulsarMessages) GetFields() []*FieldValue {
if m != nil {
return m.Fields
}
return nil
}
func (m *PulsarMessages) GetEntityId() []int64 {
if m != nil {
return m.EntityId
}
return nil
}
func (m *PulsarMessages) GetPartitionTag() string {
if m != nil {
return m.PartitionTag
}
return ""
}
func (m *PulsarMessages) GetVectorParam() []*VectorParam {
if m != nil {
return m.VectorParam
}
return nil
}
func (m *PulsarMessages) GetSegments() []*SegmentRecord {
if m != nil {
return m.Segments
}
return nil
}
func (m *PulsarMessages) GetTimestamp() []int64 {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *SearchParam) GetGrpcServerClientId() int64 {
func (m *PulsarMessages) GetClientId() []int64 {
if m != nil {
return m.GrpcServerClientId
}
return 0
}
func (m *SearchParam) GetExtraParams() []*KeyValuePair {
if m != nil {
return m.ExtraParams
return m.ClientId
}
return nil
}
type QueryResult struct {
Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
Entities *Entities `protobuf:"bytes,2,opt,name=entities" json:"entities,omitempty"`
RowNum int64 `protobuf:"varint,3,opt,name=row_num,json=rowNum" json:"row_num,omitempty"`
Scores []float32 `protobuf:"fixed32,4,rep,packed,name=scores" json:"scores,omitempty"`
Distances []float32 `protobuf:"fixed32,5,rep,packed,name=distances" json:"distances,omitempty"`
ExtraParams []*KeyValuePair `protobuf:"bytes,6,rep,name=extra_params,json=extraParams" json:"extra_params,omitempty"`
}
func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
func (m *QueryResult) GetStatus() *Status {
func (m *PulsarMessages) GetMsgType() OpType {
if m != nil {
return m.Status
return m.MsgType
}
return nil
}
func (m *QueryResult) GetEntities() *Entities {
if m != nil {
return m.Entities
}
return nil
}
func (m *QueryResult) GetRowNum() int64 {
if m != nil {
return m.RowNum
}
return 0
}
func (m *QueryResult) GetScores() []float32 {
if m != nil {
return m.Scores
}
return nil
}
func (m *QueryResult) GetDistances() []float32 {
if m != nil {
return m.Distances
}
return nil
}
func (m *QueryResult) GetExtraParams() []*KeyValuePair {
if m != nil {
return m.ExtraParams
}
return nil
return OpType_Insert
}
func init() {
proto.RegisterType((*Status)(nil), "pb.Status")
proto.RegisterType((*KeyValuePair)(nil), "pb.KeyValuePair")
proto.RegisterType((*SegmentRecord)(nil), "pb.SegmentRecord")
proto.RegisterType((*VectorRowRecord)(nil), "pb.VectorRowRecord")
proto.RegisterType((*AttrRecord)(nil), "pb.AttrRecord")
proto.RegisterType((*VectorRecord)(nil), "pb.VectorRecord")
proto.RegisterType((*VectorParam)(nil), "pb.VectorParam")
proto.RegisterType((*FieldValue)(nil), "pb.FieldValue")
proto.RegisterType((*Entities)(nil), "pb.Entities")
proto.RegisterType((*InsertParam)(nil), "pb.InsertParam")
proto.RegisterType((*EntityIds)(nil), "pb.EntityIds")
proto.RegisterType((*DeleteByIDParam)(nil), "pb.DeleteByIDParam")
proto.RegisterType((*SearchParam)(nil), "pb.SearchParam")
proto.RegisterType((*QueryResult)(nil), "pb.QueryResult")
proto.RegisterType((*PulsarMessage)(nil), "pb.PulsarMessage")
proto.RegisterType((*PulsarMessages)(nil), "pb.PulsarMessages")
proto.RegisterEnum("pb.ErrorCode", ErrorCode_name, ErrorCode_value)
proto.RegisterEnum("pb.DataType", DataType_name, DataType_value)
proto.RegisterEnum("pb.OpType", OpType_name, OpType_value)
}
func init() { proto.RegisterFile("pulsar.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 1248 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdd, 0x72, 0xdb, 0x44,
0x14, 0xae, 0x2c, 0xc7, 0xb1, 0x8e, 0x9c, 0x64, 0xb3, 0x49, 0x1a, 0x77, 0x0a, 0xd3, 0x60, 0x66,
0x8a, 0xa7, 0x03, 0xe9, 0xd4, 0x29, 0x19, 0x6e, 0xb8, 0x70, 0xa4, 0x75, 0xab, 0xa9, 0x2c, 0xa5,
0x2b, 0xb9, 0x3f, 0x57, 0x9a, 0x8d, 0xb5, 0x14, 0x81, 0x6d, 0x79, 0x56, 0x72, 0x82, 0x1f, 0x80,
0x07, 0x80, 0x47, 0xe0, 0x82, 0x7b, 0x98, 0xe1, 0x79, 0x78, 0x15, 0x66, 0x57, 0x52, 0xec, 0x96,
0x96, 0x49, 0xef, 0xb8, 0x3b, 0xfb, 0x9d, 0x9f, 0xfd, 0xce, 0x77, 0xd6, 0x47, 0x86, 0xd6, 0x7c,
0x31, 0xc9, 0x98, 0x38, 0x9e, 0x8b, 0x34, 0x4f, 0x71, 0x6d, 0x7e, 0xd1, 0xf1, 0xa0, 0x11, 0xe4,
0x2c, 0x5f, 0x64, 0xf8, 0x4b, 0x00, 0x2e, 0x44, 0x2a, 0xa2, 0x71, 0x1a, 0xf3, 0xb6, 0x76, 0xa4,
0x75, 0xb7, 0x7b, 0x5b, 0xc7, 0xf3, 0x8b, 0x63, 0x22, 0x51, 0x2b, 0x8d, 0x39, 0x35, 0x78, 0x65,
0xe2, 0xdb, 0xd0, 0x10, 0x9c, 0x65, 0xe9, 0xac, 0x5d, 0x3b, 0xd2, 0xba, 0x06, 0x2d, 0x4f, 0x9d,
0x53, 0x68, 0x3d, 0xe3, 0xcb, 0x17, 0x6c, 0xb2, 0xe0, 0xe7, 0x2c, 0x11, 0x18, 0x81, 0xfe, 0x23,
0x5f, 0xaa, 0x72, 0x06, 0x95, 0x26, 0xde, 0x87, 0x8d, 0x4b, 0xe9, 0x2e, 0x13, 0x8b, 0x43, 0xe7,
0x39, 0xec, 0xbc, 0xe0, 0xe3, 0x3c, 0x15, 0x34, 0xbd, 0xa2, 0x7c, 0x9c, 0x8a, 0x18, 0x7f, 0x0a,
0xf0, 0xdd, 0x24, 0x65, 0x79, 0x14, 0xb3, 0x9c, 0xb5, 0xb5, 0x23, 0xbd, 0x5b, 0xa3, 0x86, 0x42,
0x6c, 0x96, 0x33, 0x7c, 0x0f, 0xcc, 0x8b, 0x64, 0xc6, 0xc4, 0xb2, 0xf0, 0xcb, 0x6a, 0x2d, 0x0a,
0x05, 0x24, 0x03, 0x3a, 0xbf, 0x6a, 0x00, 0xfd, 0x3c, 0x17, 0x65, 0xb9, 0x7b, 0x60, 0x26, 0xb3,
0xfc, 0xa4, 0x17, 0x15, 0xb7, 0xcb, 0x7a, 0x1b, 0x14, 0x14, 0xa4, 0xe8, 0x96, 0x01, 0xa7, 0x8f,
0xa3, 0x8a, 0x9e, 0xde, 0xd5, 0x55, 0xc0, 0xe9, 0xe3, 0xeb, 0x80, 0x82, 0x50, 0x11, 0xa0, 0x2b,
0x46, 0x05, 0xc7, 0x22, 0xe0, 0x33, 0x68, 0xc5, 0xe9, 0xe2, 0x62, 0xc2, 0xcb, 0x88, 0xfa, 0x91,
0xde, 0xd5, 0xa8, 0x59, 0x60, 0x2a, 0xa4, 0xf3, 0x2d, 0xb4, 0xca, 0x3e, 0x0b, 0x56, 0x5f, 0xc1,
0xa6, 0x50, 0x56, 0xa6, 0x18, 0x99, 0xbd, 0x3d, 0x29, 0xf9, 0x3b, 0x52, 0xd0, 0x2a, 0xa6, 0x43,
0xc1, 0x2c, 0x7c, 0xe7, 0x4c, 0xb0, 0x29, 0xc6, 0x50, 0xff, 0x41, 0xce, 0xa0, 0x90, 0x57, 0xd9,
0xf8, 0x21, 0x80, 0x48, 0xaf, 0xa2, 0x22, 0x43, 0xc9, 0x62, 0xf6, 0xd0, 0x5a, 0xd1, 0xa2, 0xa2,
0x21, 0xaa, 0xe2, 0x9d, 0xbf, 0x34, 0x80, 0x41, 0xc2, 0x27, 0x71, 0xd1, 0x84, 0x94, 0x5d, 0x9e,
0xa2, 0x19, 0x9b, 0xf2, 0xb2, 0xb2, 0xa1, 0x10, 0x8f, 0x4d, 0x39, 0x3e, 0x82, 0x7a, 0xbe, 0x9c,
0x17, 0xd3, 0xdb, 0xee, 0xb5, 0x64, 0x61, 0xa9, 0x76, 0xb8, 0x9c, 0x73, 0xaa, 0x3c, 0xf8, 0x21,
0x98, 0x2c, 0xcf, 0x45, 0xc5, 0x40, 0x57, 0x0c, 0xb6, 0x65, 0xe0, 0x6a, 0x1a, 0x14, 0xd8, 0x6a,
0x32, 0x5f, 0xc3, 0xd6, 0xa5, 0xe2, 0x56, 0xa5, 0xd4, 0x3f, 0x40, 0xba, 0x75, 0xb9, 0x76, 0xea,
0xfc, 0xac, 0x41, 0x93, 0xcc, 0xf2, 0x24, 0x4f, 0x78, 0x86, 0x3b, 0xd0, 0xc8, 0xd4, 0x3b, 0x56,
0x8c, 0xcd, 0x1e, 0xc8, 0xe4, 0xe2, 0x65, 0xd3, 0xd2, 0x23, 0xdf, 0x62, 0x12, 0x67, 0xe5, 0x60,
0xa5, 0x89, 0xef, 0x82, 0x71, 0xc9, 0x26, 0x49, 0x1c, 0x89, 0xf4, 0x4a, 0xcd, 0xb3, 0x49, 0x9b,
0x0a, 0xa0, 0xe9, 0x15, 0xbe, 0x0f, 0x0d, 0xd5, 0x76, 0xa6, 0xe6, 0x58, 0xb6, 0xb0, 0x12, 0x8a,
0x96, 0xde, 0xce, 0x1f, 0x35, 0x30, 0x9d, 0x59, 0xc6, 0x45, 0x5e, 0x0c, 0xe5, 0x0b, 0xd8, 0x19,
0xa7, 0x93, 0x09, 0x1f, 0xe7, 0x49, 0x3a, 0x5b, 0x57, 0x71, 0x7b, 0x05, 0x2b, 0x29, 0x57, 0x17,
0xd4, 0xfe, 0xeb, 0x02, 0x7c, 0x1f, 0x76, 0xb8, 0xec, 0x73, 0x19, 0x25, 0x71, 0xc4, 0x84, 0x60,
0x4b, 0xc5, 0x55, 0xa7, 0x5b, 0x05, 0xec, 0xc4, 0x7d, 0x09, 0xe2, 0xcf, 0x61, 0x6b, 0xce, 0x84,
0x14, 0x24, 0x9d, 0x45, 0x39, 0x7b, 0xa3, 0x74, 0x34, 0x68, 0xeb, 0x1a, 0x0c, 0xd9, 0x1b, 0xfc,
0x09, 0x18, 0x79, 0x32, 0xe5, 0x59, 0xce, 0xa6, 0xf3, 0xf6, 0x86, 0x2a, 0xb3, 0x02, 0xf0, 0x23,
0x38, 0x78, 0x23, 0xe6, 0xe3, 0x28, 0xe3, 0xe2, 0x92, 0x8b, 0x68, 0x3c, 0x49, 0xf8, 0x2c, 0x8f,
0x92, 0xb8, 0xdd, 0x38, 0xd2, 0xba, 0x3a, 0xc5, 0xd2, 0x19, 0x28, 0x9f, 0xa5, 0x5c, 0x4e, 0x8c,
0x4f, 0xa0, 0xc5, 0x7f, 0xca, 0x05, 0x8b, 0xe6, 0xb2, 0xfb, 0xac, 0xbd, 0xa9, 0x7a, 0x51, 0xc3,
0x5b, 0xdf, 0x04, 0xd4, 0x54, 0x51, 0x4a, 0xa2, 0xac, 0xf3, 0x12, 0x0c, 0x52, 0x72, 0xbf, 0xd9,
0xec, 0xde, 0xa3, 0x41, 0xed, 0x3d, 0x1a, 0x74, 0x7e, 0xd7, 0x60, 0xc7, 0xe6, 0x13, 0x9e, 0xf3,
0xb3, 0xa5, 0x63, 0x7f, 0xe4, 0x40, 0xee, 0x40, 0xf3, 0x9d, 0xea, 0x9b, 0x49, 0xa9, 0xed, 0x5b,
0xb2, 0xe9, 0x37, 0x96, 0xad, 0xfe, 0x21, 0xd9, 0x3a, 0x7f, 0xd6, 0xc0, 0x0c, 0x38, 0x13, 0xe3,
0xef, 0x3f, 0x92, 0xe4, 0x31, 0xec, 0xbd, 0x35, 0xe5, 0x35, 0xbe, 0x06, 0xdd, 0x5d, 0x9f, 0x75,
0xc1, 0xbc, 0x07, 0xe5, 0xcf, 0xa6, 0x18, 0x90, 0x22, 0x6f, 0xf6, 0x76, 0x56, 0x3f, 0x2e, 0x75,
0x3f, 0x35, 0x2f, 0xd7, 0xf6, 0x0a, 0x02, 0x3d, 0xce, 0x26, 0xe5, 0xfb, 0x91, 0xe6, 0xff, 0xe4,
0xd9, 0xfc, 0xad, 0x81, 0xf9, 0x7c, 0xc1, 0xc5, 0x92, 0xf2, 0x6c, 0x31, 0xc9, 0x6f, 0xf4, 0x72,
0xba, 0xd0, 0xe4, 0xe5, 0x96, 0x28, 0xb7, 0xa1, 0x5a, 0x5a, 0xd5, 0xe6, 0xa0, 0xd7, 0x5e, 0x7c,
0x08, 0x9b, 0x72, 0x73, 0xce, 0x16, 0x53, 0xb5, 0xb4, 0x74, 0xda, 0x10, 0xe9, 0x95, 0xb7, 0x98,
0xca, 0x8f, 0x5d, 0x36, 0x4e, 0x05, 0x2f, 0x36, 0x41, 0x8d, 0x96, 0x27, 0x29, 0x4a, 0x9c, 0x64,
0x39, 0x9b, 0x8d, 0x79, 0xa6, 0x44, 0xa9, 0xd1, 0x15, 0xf0, 0xaf, 0x0e, 0x1b, 0x37, 0xe8, 0xf0,
0xc1, 0x6f, 0x75, 0x30, 0xae, 0x3f, 0xb8, 0xd8, 0x84, 0xcd, 0x60, 0x64, 0x59, 0x24, 0x08, 0xd0,
0x2d, 0xbc, 0x0f, 0x68, 0xe4, 0x91, 0x57, 0xe7, 0xc4, 0x0a, 0x89, 0x1d, 0x11, 0x4a, 0x7d, 0x8a,
0x34, 0x8c, 0x61, 0xdb, 0xf2, 0x3d, 0x8f, 0x58, 0x61, 0x34, 0xe8, 0x3b, 0x2e, 0xb1, 0x51, 0x0d,
0x1f, 0xc0, 0xee, 0x39, 0xa1, 0x43, 0x27, 0x08, 0x1c, 0xdf, 0x8b, 0x6c, 0xe2, 0x39, 0xc4, 0x46,
0x3a, 0xbe, 0x03, 0x07, 0x96, 0xef, 0xba, 0xc4, 0x0a, 0x25, 0xec, 0xf9, 0x61, 0x44, 0x5e, 0x39,
0x41, 0x18, 0xa0, 0xba, 0xac, 0xed, 0xb8, 0x2e, 0x79, 0xd2, 0x77, 0xa3, 0x3e, 0x7d, 0x32, 0x1a,
0x12, 0x2f, 0x44, 0x1b, 0xb2, 0x4e, 0x85, 0xda, 0xce, 0x90, 0x78, 0xb2, 0x1c, 0xda, 0xc4, 0xb7,
0x01, 0x57, 0xb0, 0xe3, 0xd9, 0xe4, 0x55, 0x14, 0xbe, 0x3e, 0x27, 0xa8, 0x89, 0xef, 0xc2, 0x61,
0x85, 0xaf, 0xdf, 0xd3, 0x1f, 0x12, 0x64, 0x60, 0x04, 0xad, 0xca, 0x19, 0xfa, 0xe7, 0xcf, 0x10,
0xac, 0x57, 0xa7, 0xfe, 0x4b, 0x4a, 0x2c, 0x9f, 0xda, 0xc8, 0x5c, 0x87, 0x5f, 0x10, 0x2b, 0xf4,
0x69, 0xe4, 0xd8, 0xa8, 0x25, 0xc9, 0x57, 0x70, 0x40, 0xfa, 0xd4, 0x7a, 0x1a, 0x51, 0x12, 0x8c,
0xdc, 0x10, 0x6d, 0x49, 0x09, 0x06, 0x8e, 0x4b, 0x54, 0x47, 0x03, 0x7f, 0xe4, 0xd9, 0x68, 0x1b,
0xef, 0x80, 0x39, 0x24, 0x61, 0xbf, 0xd2, 0x64, 0x47, 0xde, 0x6f, 0xf5, 0xad, 0xa7, 0xa4, 0x42,
0x10, 0x6e, 0xc3, 0xbe, 0xd5, 0xf7, 0x64, 0x92, 0x45, 0x49, 0x3f, 0x24, 0xd1, 0xc0, 0x77, 0x6d,
0x42, 0xd1, 0xae, 0x6c, 0xf0, 0x1d, 0x8f, 0xe3, 0x12, 0x84, 0xd7, 0x32, 0x6c, 0xe2, 0x92, 0x55,
0xc6, 0xde, 0x5a, 0x46, 0xe5, 0x91, 0x19, 0xfb, 0xb2, 0x99, 0xb3, 0x91, 0xe3, 0xda, 0xa5, 0x50,
0xc5, 0xd0, 0x0e, 0xf0, 0x2e, 0x6c, 0x55, 0xcd, 0x78, 0xae, 0x13, 0x84, 0xe8, 0x36, 0x3e, 0x84,
0xbd, 0x0a, 0x1a, 0x92, 0x90, 0x3a, 0x56, 0xa1, 0xea, 0xa1, 0x8c, 0xf5, 0x47, 0x61, 0xe4, 0x0f,
0xa2, 0x21, 0x19, 0xfa, 0xf4, 0x35, 0x6a, 0x3f, 0xf8, 0x45, 0x83, 0x66, 0xf5, 0xd1, 0xc5, 0x4d,
0xa8, 0x7b, 0xbe, 0x47, 0xd0, 0x2d, 0x69, 0x9d, 0xf9, 0xbe, 0x8b, 0x34, 0x69, 0x39, 0x5e, 0xf8,
0x0d, 0xaa, 0x61, 0x03, 0x36, 0x1c, 0x2f, 0x7c, 0x74, 0x8a, 0xf4, 0xd2, 0x3c, 0xe9, 0xa1, 0x7a,
0x69, 0x9e, 0x3e, 0x46, 0x1b, 0xd2, 0x1c, 0xb8, 0x7e, 0x3f, 0x44, 0x80, 0x01, 0x1a, 0xb6, 0x3f,
0x3a, 0x73, 0x09, 0x32, 0xa5, 0x1d, 0x84, 0xd4, 0xf1, 0x9e, 0xa0, 0x7d, 0xc9, 0xa0, 0x9c, 0xc4,
0x99, 0xe3, 0xf5, 0xe9, 0x6b, 0x14, 0x4b, 0x35, 0x4b, 0xa8, 0x48, 0xe6, 0x17, 0x0d, 0xf5, 0x9f,
0xf2, 0xe4, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3c, 0x17, 0xca, 0xe4, 0x63, 0x0a, 0x00, 0x00,
// 1101 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46,
0x17, 0x0d, 0x45, 0xfd, 0xf1, 0x52, 0x92, 0xc7, 0x13, 0x27, 0x51, 0x90, 0xef, 0x43, 0x54, 0x15,
0x6d, 0x0d, 0xa3, 0x49, 0x50, 0x25, 0x35, 0xfa, 0xd2, 0x07, 0x9a, 0x1c, 0x25, 0x44, 0x28, 0x52,
0x1d, 0x52, 0x8e, 0xfd, 0x44, 0xd0, 0xd2, 0x58, 0x65, 0x21, 0x91, 0x02, 0x39, 0x76, 0xa0, 0x65,
0xb4, 0x4b, 0xe8, 0x1e, 0xba, 0xa6, 0x76, 0x19, 0xc5, 0x0c, 0x49, 0x4b, 0x09, 0xd0, 0x02, 0x29,
0xda, 0xb7, 0xcb, 0x33, 0xe7, 0xde, 0x39, 0xe7, 0x5c, 0x52, 0x10, 0x74, 0x36, 0x37, 0xab, 0x3c,
0xca, 0x9e, 0x6f, 0xb2, 0x94, 0xa7, 0xb8, 0xb6, 0xb9, 0x1a, 0xba, 0xd0, 0xf4, 0x79, 0xc4, 0x6f,
0x72, 0xfc, 0x35, 0x00, 0xcb, 0xb2, 0x34, 0x0b, 0xe7, 0xe9, 0x82, 0xf5, 0x95, 0x81, 0x72, 0xdc,
0x1b, 0x75, 0x9f, 0x6f, 0xae, 0x9e, 0x13, 0x81, 0x9a, 0xe9, 0x82, 0x51, 0x8d, 0x55, 0x25, 0x7e,
0x08, 0xcd, 0x8c, 0x45, 0x79, 0x9a, 0xf4, 0x6b, 0x03, 0xe5, 0x58, 0xa3, 0xe5, 0xd3, 0xf0, 0x04,
0xba, 0x3e, 0x5b, 0xae, 0x59, 0xc2, 0x29, 0x9b, 0xa7, 0xd9, 0x02, 0x3f, 0x86, 0x76, 0xce, 0x96,
0x61, 0x9c, 0x5c, 0xa7, 0x7d, 0x65, 0xa0, 0x1e, 0x6b, 0xb4, 0x95, 0xb3, 0xa5, 0x9d, 0x5c, 0xa7,
0xc3, 0x1f, 0xe0, 0xe0, 0x9c, 0xcd, 0x79, 0x9a, 0xd1, 0xf4, 0x7d, 0xc9, 0xfe, 0x3f, 0xc0, 0xf5,
0x2a, 0x8d, 0x78, 0xb8, 0x88, 0x78, 0x24, 0xf9, 0x35, 0xaa, 0x49, 0xc4, 0x8a, 0x78, 0x84, 0x9f,
0x82, 0x7e, 0x15, 0x27, 0x51, 0xb6, 0x2d, 0xce, 0xc5, 0xd5, 0x1d, 0x0a, 0x05, 0x24, 0x08, 0xc3,
0x5f, 0x14, 0x00, 0x83, 0xf3, 0xac, 0x1c, 0xf7, 0x14, 0xf4, 0x38, 0xe1, 0x2f, 0x47, 0xe1, 0x6d,
0xb4, 0xba, 0x61, 0x72, 0x5e, 0x83, 0x82, 0x84, 0xce, 0x05, 0x52, 0x12, 0x4e, 0x5f, 0x95, 0x84,
0xda, 0x40, 0x3d, 0x56, 0x25, 0xe1, 0xf4, 0xd5, 0x1d, 0xa1, 0x10, 0x54, 0x10, 0x54, 0xa9, 0xa8,
0xd0, 0x58, 0x10, 0x3e, 0x83, 0xce, 0x22, 0xbd, 0xb9, 0x5a, 0xb1, 0x92, 0x51, 0x1f, 0xa8, 0xc7,
0x0a, 0xd5, 0x0b, 0x4c, 0x52, 0x86, 0xdf, 0x43, 0xa7, 0xf4, 0x59, 0xa8, 0x7a, 0x06, 0xad, 0x4c,
0x56, 0xb9, 0x54, 0xa4, 0x8f, 0xee, 0x8b, 0x98, 0x3f, 0x8a, 0x82, 0x56, 0x9c, 0x21, 0x05, 0xbd,
0x38, 0x9b, 0x46, 0x59, 0xb4, 0xc6, 0x18, 0xea, 0x3f, 0x89, 0xdc, 0x15, 0x99, 0xbb, 0xac, 0xf1,
0x0b, 0x80, 0x2c, 0x7d, 0x1f, 0x16, 0x1d, 0x32, 0x16, 0x7d, 0x84, 0xf6, 0x86, 0x16, 0x13, 0xb5,
0xac, 0x1a, 0x3e, 0xfc, 0x4d, 0x01, 0x18, 0xc7, 0x6c, 0xb5, 0x28, 0x4c, 0x88, 0xd8, 0xc5, 0x53,
0x98, 0x44, 0x6b, 0x56, 0x4e, 0xd6, 0x24, 0xe2, 0x46, 0x6b, 0x86, 0x07, 0x50, 0xe7, 0xdb, 0x0d,
0x93, 0x83, 0x7b, 0xa3, 0x8e, 0x18, 0x2c, 0xd2, 0x0e, 0xb6, 0x1b, 0x46, 0xe5, 0x09, 0x7e, 0x01,
0x7a, 0xc4, 0x79, 0x56, 0x29, 0x50, 0xa5, 0x82, 0x9e, 0x20, 0xee, 0xb6, 0x41, 0x21, 0xda, 0x6d,
0xe6, 0x5b, 0xe8, 0xde, 0x4a, 0x6d, 0x55, 0x4b, 0xfd, 0x2f, 0x44, 0x77, 0x6e, 0xf7, 0x9e, 0x86,
0xbf, 0xd7, 0xa0, 0x3b, 0x95, 0xef, 0xf0, 0x84, 0xe5, 0x79, 0xb4, 0x64, 0xf8, 0x2b, 0x38, 0x98,
0xa7, 0xab, 0x15, 0x9b, 0xf3, 0x38, 0x4d, 0xf6, 0xf5, 0xf7, 0x76, 0xb0, 0x34, 0xf1, 0x25, 0x34,
0xa5, 0xa3, 0x5c, 0x6e, 0xb9, 0x54, 0xb7, 0xcb, 0x80, 0x96, 0xa7, 0xf8, 0x09, 0x68, 0x2c, 0xe1,
0x31, 0xdf, 0x86, 0x71, 0x61, 0x44, 0xa5, 0xed, 0x02, 0xb0, 0x17, 0xf8, 0x73, 0xe8, 0x6e, 0xa2,
0x8c, 0xc7, 0xf2, 0x32, 0x1e, 0x2d, 0xa5, 0x6c, 0x8d, 0x76, 0xee, 0xc0, 0x20, 0x5a, 0xe2, 0x11,
0x94, 0xa2, 0xc3, 0x8d, 0xd8, 0x58, 0xbf, 0x21, 0xad, 0x1d, 0xec, 0xac, 0xc9, 0x45, 0x52, 0xfd,
0x76, 0x6f, 0xab, 0xcf, 0xe4, 0x67, 0x22, 0xbe, 0x9b, 0xbc, 0xdf, 0x94, 0xfc, 0x43, 0xc1, 0xff,
0xe0, 0x5b, 0xa2, 0x77, 0x14, 0xfc, 0x3f, 0xd0, 0x78, 0xbc, 0x66, 0x39, 0x8f, 0xd6, 0x9b, 0x7e,
0x4b, 0x8a, 0xdc, 0x01, 0xc2, 0xc2, 0x7c, 0x15, 0xb3, 0x84, 0x0b, 0x0b, 0xed, 0xc2, 0x42, 0x01,
0xd8, 0x0b, 0xfc, 0x05, 0xb4, 0xd7, 0xf9, 0x32, 0x94, 0x0b, 0xd5, 0xe4, 0x42, 0x41, 0xdc, 0xe4,
0x6d, 0xe4, 0x3a, 0x5b, 0xeb, 0x7c, 0x29, 0x8a, 0xe1, 0x1f, 0x35, 0xe8, 0x7d, 0x90, 0x74, 0xfe,
0x9f, 0x47, 0xad, 0xfe, 0x1b, 0x51, 0xab, 0x9f, 0x18, 0xb5, 0xfa, 0x89, 0x51, 0xab, 0x7f, 0x1b,
0xb5, 0xfa, 0x0f, 0xa2, 0x3e, 0xf9, 0xb5, 0x0e, 0xda, 0xdd, 0x8f, 0x2c, 0xd6, 0xa1, 0xe5, 0xcf,
0x4c, 0x93, 0xf8, 0x3e, 0xba, 0x87, 0x8f, 0x00, 0xcd, 0x5c, 0x72, 0x31, 0x25, 0x66, 0x40, 0xac,
0x90, 0x50, 0xea, 0x51, 0xa4, 0x60, 0x0c, 0x3d, 0xd3, 0x73, 0x5d, 0x62, 0x06, 0xe1, 0xd8, 0xb0,
0x1d, 0x62, 0xa1, 0x1a, 0x7e, 0x00, 0x87, 0x53, 0x42, 0x27, 0xb6, 0xef, 0xdb, 0x9e, 0x1b, 0x5a,
0xc4, 0xb5, 0x89, 0x85, 0x54, 0xfc, 0x18, 0x1e, 0x98, 0x9e, 0xe3, 0x10, 0x33, 0x10, 0xb0, 0xeb,
0x05, 0x21, 0xb9, 0xb0, 0xfd, 0xc0, 0x47, 0x75, 0x31, 0xdb, 0x76, 0x1c, 0xf2, 0xda, 0x70, 0x42,
0x83, 0xbe, 0x9e, 0x4d, 0x88, 0x1b, 0xa0, 0x86, 0x98, 0x53, 0xa1, 0x96, 0x3d, 0x21, 0xae, 0x18,
0x87, 0x5a, 0xf8, 0x21, 0xe0, 0x0a, 0xb6, 0x5d, 0x8b, 0x5c, 0x84, 0xc1, 0xe5, 0x94, 0xa0, 0x36,
0x7e, 0x02, 0x8f, 0x2a, 0x7c, 0xff, 0x1e, 0x63, 0x42, 0x90, 0x86, 0x11, 0x74, 0xaa, 0xc3, 0xc0,
0x9b, 0xbe, 0x45, 0xb0, 0x3f, 0x9d, 0x7a, 0xef, 0x28, 0x31, 0x3d, 0x6a, 0x21, 0x7d, 0x1f, 0x3e,
0x27, 0x66, 0xe0, 0xd1, 0xd0, 0xb6, 0x50, 0x47, 0x88, 0xaf, 0x60, 0x9f, 0x18, 0xd4, 0x7c, 0x13,
0x52, 0xe2, 0xcf, 0x9c, 0x00, 0x75, 0x45, 0x04, 0x63, 0xdb, 0x21, 0xd2, 0xd1, 0xd8, 0x9b, 0xb9,
0x16, 0xea, 0xe1, 0x03, 0xd0, 0x27, 0x24, 0x30, 0xaa, 0x4c, 0x0e, 0xc4, 0xfd, 0xa6, 0x61, 0xbe,
0x21, 0x15, 0x82, 0x70, 0x1f, 0x8e, 0x4c, 0xc3, 0x15, 0x4d, 0x26, 0x25, 0x46, 0x40, 0xc2, 0xb1,
0xe7, 0x58, 0x84, 0xa2, 0x43, 0x61, 0xf0, 0xa3, 0x13, 0xdb, 0x21, 0x08, 0xef, 0x75, 0x58, 0xc4,
0x21, 0xbb, 0x8e, 0xfb, 0x7b, 0x1d, 0xd5, 0x89, 0xe8, 0x38, 0x12, 0x66, 0xce, 0x66, 0xb6, 0x63,
0x95, 0x41, 0x15, 0x4b, 0x7b, 0x80, 0x0f, 0xa1, 0x5b, 0x99, 0x71, 0x1d, 0xdb, 0x0f, 0xd0, 0x43,
0xfc, 0x08, 0xee, 0x57, 0xd0, 0x84, 0x04, 0xd4, 0x36, 0x8b, 0x54, 0x1f, 0x09, 0xae, 0x37, 0x0b,
0x42, 0x6f, 0x1c, 0x4e, 0xc8, 0xc4, 0xa3, 0x97, 0xa8, 0x7f, 0xf2, 0xb3, 0x02, 0xed, 0xea, 0x47,
0x17, 0xb7, 0xa1, 0xee, 0x7a, 0x2e, 0x41, 0xf7, 0x44, 0x75, 0xe6, 0x79, 0x0e, 0x52, 0x44, 0x65,
0xbb, 0xc1, 0x77, 0xa8, 0x86, 0x35, 0x68, 0xd8, 0x6e, 0xf0, 0xcd, 0x29, 0x52, 0xcb, 0xf2, 0xe5,
0x08, 0xd5, 0xcb, 0xf2, 0xf4, 0x15, 0x6a, 0x88, 0x72, 0xec, 0x78, 0x46, 0x80, 0x00, 0x03, 0x34,
0x2d, 0x6f, 0x76, 0xe6, 0x10, 0xa4, 0x8b, 0xda, 0x0f, 0xa8, 0xed, 0xbe, 0x46, 0x47, 0x42, 0x41,
0xb9, 0x89, 0x33, 0xdb, 0x35, 0xe8, 0x25, 0x5a, 0x88, 0x34, 0x4b, 0xa8, 0x68, 0x66, 0x27, 0xef,
0xa0, 0x59, 0xbc, 0xcb, 0xa2, 0xd5, 0x4e, 0x72, 0x96, 0x71, 0x74, 0x4f, 0x8e, 0x64, 0x2b, 0xc6,
0x19, 0x52, 0xe4, 0x48, 0x16, 0x65, 0xf3, 0x1f, 0x51, 0x0d, 0x77, 0xa0, 0x1d, 0xc4, 0x6b, 0xe6,
0x6f, 0x93, 0x39, 0x52, 0xc5, 0x6b, 0xfe, 0x96, 0x6d, 0x47, 0x3e, 0x5b, 0xa2, 0x3a, 0xee, 0x01,
0x88, 0x7f, 0x21, 0x71, 0xce, 0xe3, 0x79, 0x8e, 0x1a, 0x57, 0x4d, 0xf9, 0x07, 0xe5, 0xe5, 0x9f,
0x01, 0x00, 0x00, 0xff, 0xff, 0x27, 0x2a, 0xd3, 0x11, 0xb0, 0x08, 0x00, 0x00,
}

View File

@ -52,9 +52,17 @@ enum DataType {
VECTOR_FLOAT = 101;
}
message KeyValuePair {
string key = 1;
string value = 2;
enum OpType {
Insert = 0;
Delete = 1;
Search = 2;
TimeSync = 3;
Key2Seg = 4;
Statistics = 5;
}
message SegmentRecord {
repeated string seg_info = 1;
}
message VectorRowRecord {
@ -85,50 +93,28 @@ message FieldValue {
VectorRecord vector_record = 4;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated FieldValue fields = 4;
}
message InsertParam {
message PulsarMessage {
string collection_name = 1;
repeated FieldValue fields = 2;
repeated int64 entity_id_array = 3; //optional
int64 entity_id = 3;
string partition_tag = 4;
repeated int64 timestamp = 5;
int64 grpc_server_client_id = 6;
repeated KeyValuePair extra_params = 7;
VectorParam vector_param =5;
SegmentRecord segments = 6;
int64 timestamp = 7;
int64 client_id = 8;
OpType msg_type = 9;
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message DeleteByIDParam {
message PulsarMessages {
string collection_name = 1;
repeated int64 id_array = 2;
repeated int64 timestamp = 3;
int64 grpc_server_client_id = 4;
repeated FieldValue fields = 2;
repeated int64 entity_id = 3;
string partition_tag = 4;
repeated VectorParam vector_param =5;
repeated SegmentRecord segments = 6;
repeated int64 timestamp = 7;
repeated int64 client_id = 8;
OpType msg_type = 9;
}
message SearchParam {
string collection_name = 1;
repeated string partition_tag_array = 2;
repeated VectorParam vector_param = 3;
string dsl = 4;
repeated int64 timestamp = 5;
int64 grpc_server_client_id = 6;
repeated KeyValuePair extra_params = 7;
}
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}

60
pulsar/query_node.go Normal file
View File

@ -0,0 +1,60 @@
package pulsar
import (
"fmt"
"suvlim/pulsar/schema"
"sync"
"time"
)
type QueryNode struct {
mc MessageClient
}
func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) {
wg.Add(3)
go qn.insert_query(qn.mc.insertMsg, wg)
go qn.delete_query(qn.mc.deleteMsg, wg)
go qn.search_query(qn.mc.searchMsg, wg)
wg.Wait()
}
func (qn *QueryNode) PrepareBatchMsg() {
qn.mc.PrepareBatchMsg(JobType(0))
}
func main() {
mc := MessageClient{}
topics := []string{"insert", "delete"}
mc.InitClient("pulsar://localhost:6650", topics)
go mc.ReceiveMessage()
qn := QueryNode{mc}
for {
time.Sleep(200 * time.Millisecond)
qn.PrepareBatchMsg()
qn.doQueryNode(wg)
fmt.Println("do a batch in 200ms")
}
}
func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg sync.WaitGroup) schema.Status{
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}

View File

@ -83,6 +83,10 @@ type VectorParam struct {
RowRecord *VectorRecord
}
type SegmentRecord struct {
segInfo []string
}
type OpType int
const (
@ -92,8 +96,21 @@ const (
TimeSync OpType = 3
Key2Seg OpType = 4
Statistics OpType = 5
EOF OpType = 6
)
type PulsarMessage struct {
CollectionName string
Fields []*FieldValue
EntityId int64
PartitionTag string
VectorParam *VectorParam
Segments []*SegmentRecord
Timestamp int64
ClientId int64
MsgType OpType
}
type Message interface {
GetType() OpType
Serialization() []byte
@ -135,7 +152,7 @@ type TimeSyncMsg struct {
type Key2SegMsg struct {
EntityId int64
Segments []string
Segments []*SegmentRecord
MsgType OpType
}
@ -170,3 +187,7 @@ func (tms *TimeSyncMsg) GetType() OpType {
func (kms *Key2SegMsg) GetType() OpType {
return kms.MsgType
}
type SyncEofMsg struct {
MsgType OpType
}

View File

@ -1,27 +1,52 @@
package pulsar
import (
"fmt"
"suvlim/pulsar/schema"
"sync"
"time"
)
func BeforeSend() schema.Message {
segs := make([]string, 2, 2)
segs[0] = "seg1"
segs[1] = "seg2"
var msg schema.Message = &schema.Key2SegMsg{EntityId: 1, Segments: segs, MsgType: schema.OpType(4)}
return msg
type WriteNode struct {
mc MessageClient
}
func insert([]*schema.InsertMsg) schema.Status{
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
var wg sync.WaitGroup
func delete([]*schema.DeleteMsg) schema.Status{
msg := BeforeSend()
go Send(msg)
func (wn *WriteNode)doWriteNode(wg sync.WaitGroup) {
wg.Add(2)
go wn.insert_write(wn.mc.insertMsg, wg)
go wn.delete_write(wn.mc.deleteMsg, wg)
wg.Wait()
}
func (wn *WriteNode) PrepareBatchMsg() {
wn.mc.PrepareBatchMsg(JobType(1))
}
func main() {
mc := MessageClient{}
topics := []string{"insert", "delete"}
mc.InitClient("pulsar://localhost:6650", topics)
go mc.ReceiveMessage()
wn := WriteNode{mc}
for {
time.Sleep(200 * time.Millisecond)
wn.PrepareBatchMsg()
wn.doWriteNode(wg)
fmt.Println("do a batch in 200ms")
}
}
func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}