Fix bug :GetTimeTick not return error if TimeBarrier is closed

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2020-11-07 13:19:31 +08:00 committed by yefu.chen
parent 86d2d36053
commit 7e182a230a
10 changed files with 450 additions and 657 deletions

View File

@ -1173,7 +1173,7 @@ type softTimeTickBarrier struct {
ctx context.Context
}
func (ttBarrier *softTimeTickBarrier) GetTimeTick() Timestamp
func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp,error)
func (ttBarrier *softTimeTickBarrier) Start() error
func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId, minTtInterval Timestamp) *softTimeTickBarrier
@ -1193,7 +1193,7 @@ type hardTimeTickBarrier struct {
ctx context.Context
}
func (ttBarrier *hardTimeTickBarrier) GetTimeTick() Timestamp
func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp,error)
func (ttBarrier *hardTimeTickBarrier) Start() error
func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId) *softTimeTickBarrier

View File

@ -1,169 +0,0 @@
package msgstream
import (
"github.com/golang/protobuf/proto"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type TsMsgMarshaler interface {
Marshal(input *TsMsg) ([]byte, commonPb.Status)
Unmarshal(input []byte) (*TsMsg, commonPb.Status)
}
func GetMarshalers(inputMsgType MsgType, outputMsgType MsgType) (*TsMsgMarshaler, *TsMsgMarshaler) {
return GetMarshaler(inputMsgType), GetMarshaler(outputMsgType)
}
func GetMarshaler(MsgType MsgType) *TsMsgMarshaler {
switch MsgType {
case internalPb.MsgType_kInsert:
insertMarshaler := &InsertMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = insertMarshaler
return &tsMsgMarshaller
case internalPb.MsgType_kDelete:
deleteMarshaler := &DeleteMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = deleteMarshaler
return &tsMsgMarshaller
case internalPb.MsgType_kSearch:
searchMarshaler := &SearchMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = searchMarshaler
return &tsMsgMarshaller
case internalPb.MsgType_kSearchResult:
searchResultMarshler := &SearchResultMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = searchResultMarshler
return &tsMsgMarshaller
case internalPb.MsgType_kTimeTick:
timeTickMarshaler := &TimeTickMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = timeTickMarshaler
return &tsMsgMarshaller
default:
return nil
}
}
//////////////////////////////////////Insert///////////////////////////////////////////////
type InsertMarshaler struct{}
func (im *InsertMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
insertTask := (*input).(*InsertMsg)
insertRequest := &insertTask.InsertRequest
mb, err := proto.Marshal(insertRequest)
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
func (im *InsertMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &InsertMsg{InsertRequest: insertRequest}
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
var tsMsg TsMsg = insertMsg
return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////Delete//////////////////////////////////////////////
type DeleteMarshaler struct{}
func (dm *DeleteMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
deleteMsg := (*input).(*DeleteMsg)
deleteRequest := &deleteMsg.DeleteRequest
mb, err := proto.Marshal(deleteRequest)
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
func (dm *DeleteMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
deleteRequest := internalPb.DeleteRequest{}
err := proto.Unmarshal(input, &deleteRequest)
deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest}
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
var tsMsg TsMsg = deleteMsg
return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////Search///////////////////////////////////////////////
type SearchMarshaler struct{}
func (sm *SearchMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
searchMsg := (*input).(*SearchMsg)
searchRequest := &searchMsg.SearchRequest
mb, err := proto.Marshal(searchRequest)
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
func (sm *SearchMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
searchRequest := internalPb.SearchRequest{}
err := proto.Unmarshal(input, &searchRequest)
searchMsg := &SearchMsg{SearchRequest: searchRequest}
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
var tsMsg TsMsg = searchMsg
return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////SearchResult///////////////////////////////////////////////
type SearchResultMarshaler struct{}
func (srm *SearchResultMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
searchResultMsg := (*input).(*SearchResultMsg)
searchResult := &searchResultMsg.SearchResult
mb, err := proto.Marshal(searchResult)
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
func (srm *SearchResultMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
searchResult := internalPb.SearchResult{}
err := proto.Unmarshal(input, &searchResult)
searchResultMsg := &SearchResultMsg{SearchResult: searchResult}
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
var tsMsg TsMsg = searchResultMsg
return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////TimeTick///////////////////////////////////////////////
type TimeTickMarshaler struct{}
func (tm *TimeTickMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
timeTickMsg := (*input).(*TimeTickMsg)
timeTick := &timeTickMsg.TimeTickMsg
mb, err := proto.Marshal(timeTick)
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
func (tm *TimeTickMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
timeTickMsg := internalPb.TimeTickMsg{}
err := proto.Unmarshal(input, &timeTickMsg)
timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg}
if err != nil {
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
}
var tsMsg TsMsg = timeTick
return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}

View File

@ -2,13 +2,13 @@ package msgstream
import (
"context"
"github.com/gogo/protobuf/proto"
"log"
"sync"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/apache/pulsar-client-go/pulsar"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -28,45 +28,53 @@ type MsgStream interface {
Start()
Close()
SetRepackFunc(repackFunc RepackFunc)
SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
Produce(*MsgPack) commonPb.Status
Consume() *MsgPack // message can be consumed exactly once
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Consume() *MsgPack
}
type PulsarMsgStream struct {
client *pulsar.Client
producers []*pulsar.Producer
consumers []*pulsar.Consumer
repackFunc RepackFunc // return a map from produceChannel idx to *MsgPack
ctx context.Context
client *pulsar.Client
producers []*pulsar.Producer
consumers []*pulsar.Consumer
repackFunc RepackFunc
unmarshal *UnmarshalDispatcher
receiveBuf chan *MsgPack
receiveBufSize int64
wait sync.WaitGroup
}
receiveBuf chan *MsgPack
msgMarshaler *TsMsgMarshaler
msgUnmarshaler *TsMsgMarshaler
inputChannel chan *MsgPack
outputChannel chan *MsgPack
func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64) *PulsarMsgStream{
return &PulsarMsgStream{
ctx: ctx,
receiveBufSize: receiveBufSize,
}
}
func (ms *PulsarMsgStream) SetPulsarCient(address string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
if err != nil {
log.Printf("connect pulsar failed, %v", err)
log.Printf("Set pulsar client failed, error = %v", err)
}
ms.client = &client
}
func (ms *PulsarMsgStream) SetProducers(channels []string) {
func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) {
for i := 0; i < len(channels); i++ {
pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]})
if err != nil {
log.Printf("failed to create reader producer %s, error = %v", channels[i], err)
log.Printf("Failed to create reader producer %s, error = %v", channels[i], err)
}
ms.producers = append(ms.producers, &pp)
}
}
func (ms *PulsarMsgStream) SetConsumers(channels []string, subName string, pulsarBufSize int64) {
func (ms *PulsarMsgStream) CreatePulsarConsumers(channels []string,
subName string,
unmarshal *UnmarshalDispatcher,
pulsarBufSize int64) {
ms.unmarshal = unmarshal
for i := 0; i < len(channels); i++ {
receiveChannel := make(chan pulsar.ConsumerMessage, pulsarBufSize)
pc, err := (*ms.client).Subscribe(pulsar.ConsumerOptions{
@ -77,22 +85,18 @@ func (ms *PulsarMsgStream) SetConsumers(channels []string, subName string, pulsa
MessageChannel: receiveChannel,
})
if err != nil {
log.Printf("failed to subscribe topic, error = %v", err)
log.Printf("Failed to subscribe topic, error = %v", err)
}
ms.consumers = append(ms.consumers, &pc)
}
}
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) {
ms.msgMarshaler = marshal
ms.msgUnmarshaler = unmarshal
}
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) {
ms.repackFunc = repackFunc
}
func (ms *PulsarMsgStream) Start() {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
@ -110,17 +114,14 @@ func (ms *PulsarMsgStream) Close() {
if ms.client != nil {
(*ms.client).Close()
}
ms.wait.Wait()
}
func (ms *PulsarMsgStream) InitMsgPackBuf(msgPackBufSize int64) {
ms.receiveBuf = make(chan *MsgPack, msgPackBufSize)
}
func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) commonPb.Status {
func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
if len(tsMsgs) <= 0 {
log.Println("receive empty msgPack")
return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
log.Printf("Warning: Receive empty msgPack")
return nil
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelId, tsMsg := range tsMsgs {
@ -151,46 +152,41 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) commonPb.Status {
}
for k, v := range result {
for i := 0; i < len(v.Msgs); i++ {
mb, status := (*ms.msgMarshaler).Marshal(v.Msgs[i])
if status.ErrorCode != commonPb.ErrorCode_SUCCESS {
log.Printf("Marshal ManipulationReqMsg failed, error ")
continue
mb, err := (*v.Msgs[i]).Marshal(v.Msgs[i])
if err != nil {
return err
}
if _, err := (*ms.producers[k]).Send(
context.Background(),
&pulsar.ProducerMessage{Payload: mb},
); err != nil {
log.Printf("post into pulsar filed, error = %v", err)
return err
}
}
}
return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
return nil
}
func (ms *PulsarMsgStream) BroadCast(msgPack *MsgPack) commonPb.Status {
func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, status := (*ms.msgMarshaler).Marshal(v)
if status.ErrorCode != commonPb.ErrorCode_SUCCESS {
log.Printf("Marshal ManipulationReqMsg failed, error ")
continue
mb, err := (*v).Marshal(v)
if err != nil {
return err
}
for i := 0; i < producerLen; i++ {
if _, err := (*ms.producers[i]).Send(
context.Background(),
&pulsar.ProducerMessage{Payload: mb},
); err != nil {
log.Printf("post into pulsar filed, error = %v", err)
return err
}
}
}
return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
return nil
}
func (ms *PulsarMsgStream) Consume() *MsgPack {
ctx := context.Background()
for {
select {
case cm, ok := <-ms.receiveBuf:
@ -199,34 +195,56 @@ func (ms *PulsarMsgStream) Consume() *MsgPack {
return nil
}
return cm
case <-ctx.Done():
case <-ms.ctx.Done():
log.Printf("context closed")
return nil
}
}
}
func (ms *PulsarMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
ms.receiveBuf = make(chan *MsgPack, ms.receiveBufSize)
for {
tsMsgList := make([]*TsMsg, 0)
for i := 0; i < len(ms.consumers); i++ {
consumerChan := (*ms.consumers[i]).Chan()
chanLen := len(consumerChan)
for l := 0; l < chanLen; l++ {
pulsarMsg, ok := <-consumerChan
if ok == false {
log.Printf("channel closed")
select {
case <-ms.ctx.Done():
return
default:
tsMsgList := make([]*TsMsg, 0)
for i := 0; i < len(ms.consumers); i++ {
consumerChan := (*ms.consumers[i]).Chan()
chanLen := len(consumerChan)
for l := 0; l < chanLen; l++ {
pulsarMsg, ok := <-consumerChan
if ok == false {
log.Printf("channel closed")
return
}
(*ms.consumers[i]).AckID(pulsarMsg.ID())
headerMsg := internalPb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal message header, error = %v", err)
continue
}
unMarshalFunc, ok:= (*ms.unmarshal).tempMap[headerMsg.MsgType]
if ok == false {
log.Printf("Not set unmarshalFunc for messageType %v", headerMsg.MsgType)
continue
}
tsMsg, err := unMarshalFunc(pulsarMsg.Payload())
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
continue
}
tsMsgList = append(tsMsgList, tsMsg)
}
(*ms.consumers[i]).AckID(pulsarMsg.ID())
tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload())
if status.ErrorCode != commonPb.ErrorCode_SUCCESS {
log.Printf("Marshal ManipulationReqMsg failed, error ")
}
tsMsgList = append(tsMsgList, tsMsg)
}
}
if len(tsMsgList) > 0 {
msgPack := MsgPack{Msgs: tsMsgList}
ms.receiveBuf <- &msgPack
if len(tsMsgList) > 0 {
msgPack := MsgPack{Msgs: tsMsgList}
ms.receiveBuf <- &msgPack
}
}
}
}
@ -235,73 +253,101 @@ type PulsarTtMsgStream struct {
PulsarMsgStream
inputBuf []*TsMsg
unsolvedBuf []*TsMsg
msgPacks []*MsgPack
lastTimeStamp Timestamp
}
func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64) *PulsarTtMsgStream {
pulsarMsgStream := PulsarMsgStream{
ctx: ctx,
receiveBufSize: receiveBufSize,
}
return &PulsarTtMsgStream{
PulsarMsgStream: pulsarMsgStream,
}
}
func (ms *PulsarTtMsgStream) Start() {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
wg := sync.WaitGroup{}
wg.Add(len(ms.consumers))
eofMsgTimeStamp := make(map[int]Timestamp)
mu := sync.Mutex{}
for i := 0; i < len(ms.consumers); i++ {
go ms.findTimeTick(context.Background(), i, eofMsgTimeStamp, &wg, &mu)
}
wg.Wait()
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp)
if ok == false {
log.Fatal("timeTick err")
}
defer ms.wait.Done()
ms.receiveBuf = make(chan *MsgPack, ms.receiveBufSize)
ms.unsolvedBuf = make([]*TsMsg, 0)
ms.inputBuf = make([]*TsMsg, 0)
for {
select {
case <-ms.ctx.Done():
return
default:
wg := sync.WaitGroup{}
wg.Add(len(ms.consumers))
eofMsgTimeStamp := make(map[int]Timestamp)
mu := sync.Mutex{}
for i := 0; i < len(ms.consumers); i++ {
go ms.findTimeTick(i, eofMsgTimeStamp, &wg, &mu)
}
wg.Wait()
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp)
if ok == false {
log.Printf("timeTick err")
}
timeTickBuf := make([]*TsMsg, 0)
ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...)
ms.unsolvedBuf = ms.unsolvedBuf[:0]
for _, v := range ms.inputBuf {
if (*v).EndTs() >= timeStamp {
timeTickBuf = append(timeTickBuf, v)
} else {
ms.unsolvedBuf = append(ms.unsolvedBuf, v)
timeTickBuf := make([]*TsMsg, 0)
ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...)
ms.unsolvedBuf = ms.unsolvedBuf[:0]
for _, v := range ms.inputBuf {
if (*v).EndTs() <= timeStamp {
timeTickBuf = append(timeTickBuf, v)
} else {
ms.unsolvedBuf = append(ms.unsolvedBuf, v)
}
}
ms.inputBuf = ms.inputBuf[:0]
msgPack := MsgPack{
BeginTs: ms.lastTimeStamp,
EndTs: timeStamp,
Msgs: timeTickBuf,
}
ms.receiveBuf <- &msgPack
ms.lastTimeStamp = timeStamp
}
}
ms.inputBuf = ms.inputBuf[:0]
msgPack := MsgPack{
BeginTs: ms.lastTimeStamp,
EndTs: timeStamp,
Msgs: timeTickBuf,
}
ms.receiveBuf <- &msgPack
}
func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context,
channelIndex int,
func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
eofMsgMap map[int]Timestamp,
wg *sync.WaitGroup,
mu *sync.Mutex) {
defer wg.Done()
for {
select {
case <-ctx.Done():
case <-ms.ctx.Done():
return
case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan():
if ok == false {
log.Fatal("consumer closed!")
continue
}
(*ms.consumers[channelIndex]).Ack(pulsarMsg)
tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload())
// TODO:: Find the EOF
if (*tsMsg).Type() == internalPb.MsgType_kTimeTick {
eofMsgMap[channelIndex] = (*tsMsg).EndTs()
wg.Done()
log.Printf("consumer closed!")
return
}
if status.ErrorCode != commonPb.ErrorCode_SUCCESS {
log.Printf("Marshal ManipulationReqMsg failed, error ")
(*ms.consumers[channelIndex]).Ack(pulsarMsg)
headerMsg := internalPb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
}
unMarshalFunc := (*ms.unmarshal).tempMap[headerMsg.MsgType]
tsMsg, err := unMarshalFunc(pulsarMsg.Payload())
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
}
if headerMsg.MsgType == internalPb.MsgType_kTimeTick {
eofMsgMap[channelIndex] = (*tsMsg).(*TimeTickMsg).Timestamp
return
}
mu.Lock()
ms.inputBuf = append(ms.inputBuf, tsMsg)

View File

@ -1,6 +1,7 @@
package msgstream
import (
"context"
"fmt"
"testing"
@ -27,7 +28,9 @@ func repackFunc(msgs []*TsMsg, hashKeys [][]int32) map[int32]*MsgPack {
func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
HashValues: []int32{hashValue},
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{hashValue},
}
switch msgType {
case internalPb.MsgType_kInsert:
@ -104,42 +107,82 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
return &tsMsg
}
func initStream(pulsarAddress string,
func getTimeTickMsg(msgType MsgType, reqId UniqueID, hashValue int32, time uint64) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{hashValue},
}
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: reqId,
Timestamp: time,
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
return &tsMsg
}
func initPulsarStream(pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
msgPack *MsgPack,
inputMsgType MsgType,
outputMsgType MsgType,
broadCast bool) {
opts ...RepackFunc) (*MsgStream, *MsgStream) {
// set input stream
inputStream := PulsarMsgStream{}
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetMsgMarshaler(GetMarshaler(inputMsgType), nil)
inputStream.SetProducers(producerChannels)
inputStream.SetRepackFunc(repackFunc)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
var input MsgStream = inputStream
// set output stream
outputStream := PulsarMsgStream{}
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetMsgMarshaler(nil, GetMarshaler(outputMsgType))
outputStream.SetConsumers(consumerChannels, consumerSubName, 100)
outputStream.InitMsgPackBuf(100)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
//send msgPack
if broadCast {
inputStream.BroadCast(msgPack)
} else {
inputStream.Produce(msgPack)
//outputStream.Start()
return &input, &output
}
func initPulsarTtStream(pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
opts ...RepackFunc) (*MsgStream, *MsgStream) {
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
var input MsgStream = inputStream
// receive msg
// set output stream
outputStream := NewPulsarTtMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
return &input, &output
}
func receiveMsg(outputStream *MsgStream, msgCount int) {
receiveCount := 0
for {
result := outputStream.Consume()
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
@ -147,19 +190,13 @@ func initStream(pulsarAddress string,
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if broadCast {
if receiveCount >= len(msgPack.Msgs)*len(producerChannels) {
break
}
} else {
if receiveCount >= len(msgPack.Msgs) {
break
}
if receiveCount >= msgCount {
break
}
}
}
func TestStream_Insert(t *testing.T) {
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
@ -169,11 +206,12 @@ func TestStream_Insert(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kInsert, internalPb.MsgType_kInsert, false)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Produce(&msgPack)
receiveMsg(outputStream, len(msgPack.Msgs))
}
func TestStream_Delete(t *testing.T) {
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"delete"}
consumerChannels := []string{"delete"}
@ -183,11 +221,12 @@ func TestStream_Delete(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kDelete, internalPb.MsgType_kDelete, false)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Produce(&msgPack)
receiveMsg(outputStream, len(msgPack.Msgs))
}
func TestStream_Search(t *testing.T) {
func TestStream_PulsarMsgStream_Search(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"search"}
consumerChannels := []string{"search"}
@ -197,11 +236,12 @@ func TestStream_Search(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kSearch, internalPb.MsgType_kSearch, false)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Produce(&msgPack)
receiveMsg(outputStream, len(msgPack.Msgs))
}
func TestStream_SearchResult(t *testing.T) {
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"search"}
consumerChannels := []string{"search"}
@ -211,11 +251,12 @@ func TestStream_SearchResult(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kSearchResult, internalPb.MsgType_kSearchResult, false)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Produce(&msgPack)
receiveMsg(outputStream, len(msgPack.Msgs))
}
func TestStream_TimeTick(t *testing.T) {
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"search"}
consumerChannels := []string{"search"}
@ -225,11 +266,12 @@ func TestStream_TimeTick(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kTimeTick, internalPb.MsgType_kTimeTick, false)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Produce(&msgPack)
receiveMsg(outputStream, len(msgPack.Msgs))
}
func TestStream_BroadCast(t *testing.T) {
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
@ -239,6 +281,47 @@ func TestStream_BroadCast(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kTimeTick, internalPb.MsgType_kTimeTick, true)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Broadcast(&msgPack)
receiveMsg(outputStream, len(consumerChannels)*len(msgPack.Msgs))
}
func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
(*inputStream).Produce(&msgPack)
receiveMsg(outputStream, len(msgPack.Msgs))
}
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack0 := MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(internalPb.MsgType_kTimeTick, 0, 0, 0))
msgPack1 := MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
msgPack2 := MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(internalPb.MsgType_kTimeTick, 5, 5, 5))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
(*inputStream).Broadcast(&msgPack0)
(*inputStream).Produce(&msgPack1)
(*inputStream).Broadcast(&msgPack2)
receiveMsg(outputStream, len(msgPack1.Msgs))
outputTtStream := (*outputStream).(*PulsarTtMsgStream)
fmt.Printf("timestamp = %v", outputTtStream.lastTimeStamp)
}

View File

@ -1,71 +0,0 @@
package msgstream
func NewInputStream(pulsarAddress string,
producerChannels []string,
timeTick bool) *MsgStream {
var stream MsgStream
if timeTick {
pulsarTtStream := PulsarTtMsgStream{}
pulsarTtStream.SetPulsarCient(pulsarAddress)
pulsarTtStream.SetProducers(producerChannels)
stream = &pulsarTtStream
} else {
pulsarStream := PulsarMsgStream{}
pulsarStream.SetPulsarCient(pulsarAddress)
pulsarStream.SetProducers(producerChannels)
stream = &pulsarStream
}
return &stream
}
func NewOutputStream(pulsarAddress string,
pulsarBufSize int64,
consumerChannelSize int64,
consumerChannels []string,
consumerSubName string,
timeTick bool) *MsgStream {
var stream MsgStream
if timeTick {
pulsarTtStream := PulsarTtMsgStream{}
pulsarTtStream.SetPulsarCient(pulsarAddress)
pulsarTtStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize)
pulsarTtStream.InitMsgPackBuf(consumerChannelSize)
stream = &pulsarTtStream
} else {
pulsarStream := PulsarMsgStream{}
pulsarStream.SetPulsarCient(pulsarAddress)
pulsarStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize)
pulsarStream.InitMsgPackBuf(consumerChannelSize)
stream = &pulsarStream
}
return &stream
}
func NewPipeStream(pulsarAddress string,
pulsarBufSize int64,
consumerChannelSize int64,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
timeTick bool) *MsgStream {
var stream MsgStream
if timeTick {
pulsarTtStream := PulsarTtMsgStream{}
pulsarTtStream.SetPulsarCient(pulsarAddress)
pulsarTtStream.SetProducers(producerChannels)
pulsarTtStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize)
pulsarTtStream.InitMsgPackBuf(consumerChannelSize)
stream = &pulsarTtStream
} else {
pulsarStream := PulsarMsgStream{}
pulsarStream.SetPulsarCient(pulsarAddress)
pulsarStream.SetProducers(producerChannels)
pulsarStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize)
pulsarStream.InitMsgPackBuf(consumerChannelSize)
stream = &pulsarStream
}
return &stream
}

View File

@ -1,252 +0,0 @@
package msgstream
import (
"fmt"
"testing"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func TestNewStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kInsert), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kInsert))
(*outputStream).Start()
//send msgPack
(*inputStream).Produce(&msgPack)
//(*outputStream).Start()
// receive msg
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
}
func TestNewStream_Delete(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"delete1", "delete2"}
consumerChannels := []string{"delete1", "delete2"}
consumerSubName := "subDelete"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 3, 3))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kDelete), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kDelete))
(*outputStream).Start()
//send msgPack
(*inputStream).Produce(&msgPack)
//(*outputStream).Start()
// receive msg
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
}
func TestNewStream_Search(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"search1", "search2"}
consumerChannels := []string{"search1", "search2"}
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 3, 3))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kSearch), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kSearch))
(*outputStream).Start()
//send msgPack
(*inputStream).Produce(&msgPack)
//(*outputStream).Start()
// receive msg
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
}
func TestNewStream_SearchResult(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"searchResult1", "searchResult2"}
consumerChannels := []string{"searchResult1", "searchResult2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kSearchResult), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kSearchResult))
(*outputStream).Start()
//send msgPack
(*inputStream).Produce(&msgPack)
//(*outputStream).Start()
// receive msg
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
}
func TestNewStream_TimeTick(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"timeSync1", "timeSync2"}
consumerChannels := []string{"timeSync1", "timeSync2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kTimeTick), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kTimeTick))
(*outputStream).Start()
//send msgPack
(*inputStream).Produce(&msgPack)
// receive msg
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
}
func TestNewTtStream_Insert_TimeSync(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert"}
consumerChannels := []string{"insert"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kTimeTick,
ReqId: 2,
CollectionName: "Collection",
PartitionTag: "Partition",
SegmentId: 1,
ChannelId: 1,
ProxyId: 1,
Timestamps: []Timestamp{1},
}
insertMsg := &InsertMsg{
BaseMsg: BaseMsg{HashValues: []int32{2}},
InsertRequest: insertRequest,
}
var tsMsg TsMsg = insertMsg
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, true)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kInsert), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kInsert))
(*outputStream).Start()
//send msgPack
(*inputStream).Produce(&msgPack)
// receive msg
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount+1 >= len(msgPack.Msgs) {
break
}
}
}

View File

@ -1,6 +1,7 @@
package msgstream
import (
"github.com/gogo/protobuf/proto"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
@ -11,6 +12,8 @@ type TsMsg interface {
EndTs() Timestamp
Type() MsgType
HashKeys() []int32
Marshal(*TsMsg) ([]byte, error)
Unmarshal([]byte) (*TsMsg, error)
}
type BaseMsg struct {
@ -41,6 +44,28 @@ func (it *InsertMsg) Type() MsgType {
return it.MsgType
}
func (it *InsertMsg) Marshal(input *TsMsg) ([]byte, error) {
insertMsg := (*input).(*InsertMsg)
insertRequest := &insertMsg.InsertRequest
mb, err := proto.Marshal(insertRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &InsertMsg{InsertRequest: insertRequest}
if err != nil {
return nil, err
}
var tsMsg TsMsg = insertMsg
return &tsMsg, nil
}
/////////////////////////////////////////Delete//////////////////////////////////////////
type DeleteMsg struct {
BaseMsg
@ -51,6 +76,28 @@ func (dt *DeleteMsg) Type() MsgType {
return dt.MsgType
}
func (dt *DeleteMsg) Marshal(input *TsMsg) ([]byte, error) {
deleteTask := (*input).(*DeleteMsg)
deleteRequest := &deleteTask.DeleteRequest
mb, err := proto.Marshal(deleteRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) {
deleteRequest := internalPb.DeleteRequest{}
err := proto.Unmarshal(input, &deleteRequest)
deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest}
if err != nil {
return nil, err
}
var tsMsg TsMsg = deleteMsg
return &tsMsg, nil
}
/////////////////////////////////////////Search//////////////////////////////////////////
type SearchMsg struct {
BaseMsg
@ -61,6 +108,28 @@ func (st *SearchMsg) Type() MsgType {
return st.MsgType
}
func (st *SearchMsg) Marshal(input *TsMsg) ([]byte, error) {
searchTask := (*input).(*SearchMsg)
searchRequest := &searchTask.SearchRequest
mb, err := proto.Marshal(searchRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) {
searchRequest := internalPb.SearchRequest{}
err := proto.Unmarshal(input, &searchRequest)
searchMsg := &SearchMsg{SearchRequest: searchRequest}
if err != nil {
return nil, err
}
var tsMsg TsMsg = searchMsg
return &tsMsg, nil
}
/////////////////////////////////////////SearchResult//////////////////////////////////////////
type SearchResultMsg struct {
BaseMsg
@ -71,6 +140,28 @@ func (srt *SearchResultMsg) Type() MsgType {
return srt.MsgType
}
func (srt *SearchResultMsg) Marshal(input *TsMsg) ([]byte, error) {
searchResultTask := (*input).(*SearchResultMsg)
searchResultRequest := &searchResultTask.SearchResult
mb, err := proto.Marshal(searchResultRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) {
searchResultRequest := internalPb.SearchResult{}
err := proto.Unmarshal(input, &searchResultRequest)
searchResultMsg := &SearchResultMsg{SearchResult: searchResultRequest}
if err != nil {
return nil, err
}
var tsMsg TsMsg = searchResultMsg
return &tsMsg, nil
}
/////////////////////////////////////////TimeTick//////////////////////////////////////////
type TimeTickMsg struct {
BaseMsg
@ -81,6 +172,28 @@ func (tst *TimeTickMsg) Type() MsgType {
return tst.MsgType
}
func (tst *TimeTickMsg) Marshal(input *TsMsg) ([]byte, error) {
timeTickTask := (*input).(*TimeTickMsg)
timeTick := &timeTickTask.TimeTickMsg
mb, err := proto.Marshal(timeTick)
if err != nil {
return nil, err
}
return mb, nil
}
func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) {
timeTickMsg := internalPb.TimeTickMsg{}
err := proto.Unmarshal(input, &timeTickMsg)
timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg}
if err != nil {
return nil, err
}
var tsMsg TsMsg = timeTick
return &tsMsg, nil
}
///////////////////////////////////////////Key2Seg//////////////////////////////////////////
//type Key2SegMsg struct {
// BaseMsg

View File

@ -0,0 +1,41 @@
package msgstream
import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type MarshalFunc func(*TsMsg) ([]byte, error)
type UnmarshalFunc func([]byte) (*TsMsg, error)
type UnmarshalDispatcher struct {
tempMap map[internalPb.MsgType]UnmarshalFunc
}
func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType internalPb.MsgType) (*TsMsg, error) {
unmarshalFunc := dispatcher.tempMap[msgType]
return unmarshalFunc(input)
}
func (dispatcher *UnmarshalDispatcher) AddMsgTemplate(msgType internalPb.MsgType, unmarshal UnmarshalFunc) {
dispatcher.tempMap[msgType] = unmarshal
}
func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() {
insertMsg := InsertMsg{}
deleteMsg := DeleteMsg{}
searchMsg := SearchMsg{}
searchResultMsg := SearchResultMsg{}
timeTickMsg := TimeTickMsg{}
dispatcher.tempMap = make(map[internalPb.MsgType]UnmarshalFunc)
dispatcher.tempMap[internalPb.MsgType_kInsert] = insertMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kDelete] = deleteMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kSearch] = searchMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kSearchResult] = searchResultMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kTimeTick] = timeTickMsg.Unmarshal
}
func NewUnmarshalDispatcher() *UnmarshalDispatcher {
unmarshalDispatcher := UnmarshalDispatcher{}
unmarshalDispatcher.addDefaultMsgTemplates()
return &unmarshalDispatcher
}

View File

@ -2,7 +2,6 @@ package reader
import (
"context"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
@ -36,10 +35,12 @@ func (dmService *manipulationService) start() {
consumerChannels := []string{"insert"}
consumerSubName := "subInsert"
outputStream := msgstream.NewOutputStream(dmService.pulsarURL, pulsarBufSize, consumerChannelSize, consumerChannels, consumerSubName, true)
(*outputStream).SetMsgMarshaler(nil, msgstream.GetMarshaler(internalPb.MsgType_kInsert))
go (*outputStream).Start()
// TODO:: load receiveBufSize from config file
outputStream := msgstream.NewPulsarTtMsgStream(dmService.ctx, 100)
outputStream.SetPulsarCient(dmService.pulsarURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, pulsarBufSize)
(*outputStream).Start()
dmService.initNodes()
go dmService.fg.Start()

View File

@ -64,6 +64,7 @@ func TestManipulationService_Start(t *testing.T) {
for i := 0; i < msgLength; i++ {
var msg msgstream.TsMsg = &msgstream.InsertMsg{
InsertRequest: internalPb.InsertRequest{
//MsgType: internalPb.MsgType_kInsert,
ReqId: int64(0),
CollectionName: "collection0",
PartitionTag: "default",