Refactor msgstream

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/4973/head^2
Xiangyu Wang 2021-01-20 10:02:59 +08:00 committed by yefu.chen
parent 2ec573d2f9
commit 7ee8623e1a
40 changed files with 1138 additions and 1071 deletions

4
.gitignore vendored
View File

@ -6,8 +6,8 @@ internal/core/output/*
internal/core/build/*
internal/kv/rocksdb/cwrapper/output/*
**/.idea/*
pulsar/client-cpp/build/
pulsar/client-cpp/build/*
internal/msgstream/pulsarms/client-cpp/build/
internal/msgstream/pulsarms/client-cpp/build/*
# vscode generated files
.vscode

View File

@ -14,11 +14,13 @@ type Master interface {
DropCollection(req DropCollectionRequest) error
HasCollection(req HasCollectionRequest) (bool, error)
DescribeCollection(req DescribeCollectionRequest) (DescribeCollectionResponse, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
ShowCollections(req ShowCollectionRequest) (ShowCollectionResponse, error)
CreatePartition(req CreatePartitionRequest) error
DropPartition(req DropPartitionRequest) error
HasPartition(req HasPartitionRequest) (bool, error)
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse, error)
DescribeSegment(req DescribeSegmentRequest) (DescribeSegmentResponse, error)

View File

@ -23,9 +23,6 @@ type DataService interface {
GetInsertBinlogPaths(req InsertBinlogPathRequest) (InsertBinlogPathsResponse, error)
GetInsertChannels(req InsertChannelRequest) ([]string, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
}
```
@ -167,35 +164,6 @@ type InsertChannelRequest struct {
```
* *GetCollectionStatistics*
```go
type CollectionStatsRequest struct {
MsgBase
DbName string
CollectionName string
}
type CollectionStatsResponse struct {
Stats []KeyValuePair
}
```
* *GetPartitionStatistics*
```go
type PartitionStatsRequest struct {
MsgBase
DbName string
CollectionName string
PartitionName string
}
type PartitionStatsResponse struct {
Stats []KeyValuePair
}
```
#### 8.2 Insert Channel
@ -231,6 +199,7 @@ type DataNode interface {
```
* *WatchDmChannels*
```go

View File

@ -14,6 +14,7 @@ import (
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -180,11 +181,11 @@ func TestDataSyncService_Start(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)

View File

@ -19,6 +19,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -42,7 +43,7 @@ type (
minioPrifex string
idAllocator *allocator.IDAllocator
outCh chan *insertFlushSyncMsg
pulsarDataNodeTimeTickStream *msgstream.PulsarMsgStream
pulsarDataNodeTimeTickStream *pulsarms.PulsarMsgStream
replica collectionReplica
}
@ -643,7 +644,7 @@ func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg, re
panic(err)
}
wTt := msgstream.NewPulsarMsgStream(ctx, 1024) //input stream, data node time tick
wTt := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream, data node time tick
wTt.SetPulsarClient(Params.PulsarAddress)
wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName})

View File

@ -4,6 +4,8 @@ import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
@ -16,11 +18,11 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := Params.InsertChannelNames
consumeSubName := Params.MsgChannelSubName
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
// TODO could panic of nil pointer
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
// TODO could panic of nil pointer
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
@ -43,9 +45,9 @@ func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = ddStream

View File

@ -18,6 +18,8 @@ import (
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
@ -113,33 +115,33 @@ func CreateServer(ctx context.Context) (*Master, error) {
if err != nil {
return nil, err
}
pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarProxyStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarProxyStream.SetPulsarClient(pulsarAddr)
pulsarProxyStream.CreatePulsarConsumers(Params.ProxyTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarProxyStream.CreatePulsarConsumers(Params.ProxyTimeTickChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
pulsarProxyStream.Start()
var proxyStream ms.MsgStream = pulsarProxyStream
proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, Params.ProxyIDList, Params.SoftTimeTickBarrierInterval)
tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier)
pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream.SetPulsarClient(pulsarAddr)
pulsarWriteStream.CreatePulsarConsumers(Params.WriteNodeTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.CreatePulsarConsumers(Params.WriteNodeTimeTickChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.Start()
var writeStream ms.MsgStream = pulsarWriteStream
writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList)
tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier)
pulsarDDStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream.SetPulsarClient(pulsarAddr)
pulsarDDStream.CreatePulsarProducers(Params.DDChannelNames)
tsMsgProducer.SetDDSyncStream(pulsarDDStream)
pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream.SetPulsarClient(pulsarAddr)
pulsarDMStream.CreatePulsarProducers(Params.InsertChannelNames)
tsMsgProducer.SetDMSyncStream(pulsarDMStream)
pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream.SetPulsarClient(pulsarAddr)
pulsarK2SStream.CreatePulsarProducers(Params.K2SChannelNames)
tsMsgProducer.SetK2sSyncStream(pulsarK2SStream)
@ -150,9 +152,9 @@ func CreateServer(ctx context.Context) (*Master, error) {
tsMsgProducer.WatchWriteNodeTtBarrier(writeNodeTtBarrierWatcher)
// stats msg stream
statsMs := ms.NewPulsarMsgStream(ctx, 1024)
statsMs := pulsarms.NewPulsarMsgStream(ctx, 1024)
statsMs.SetPulsarClient(pulsarAddr)
statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
statsMs.Start()
m := &Master{

View File

@ -19,6 +19,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -554,9 +556,9 @@ func TestMaster(t *testing.T) {
assert.NoError(t, err)
//consume msg
ddMs := ms.NewPulsarTtMsgStream(ctx, 1024)
ddMs := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
ddMs.SetPulsarClient(pulsarAddr)
ddMs.CreatePulsarConsumers(Params.DDChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
ddMs.CreatePulsarConsumers(Params.DDChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
ddMs.Start()
var consumeMsg ms.MsgStream = ddMs
@ -876,9 +878,9 @@ func TestMaster(t *testing.T) {
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
//consume msg
ddMs := ms.NewPulsarTtMsgStream(ctx, 1024)
ddMs := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
ddMs.SetPulsarClient(pulsarAddr)
ddMs.CreatePulsarConsumers(Params.DDChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
ddMs.CreatePulsarConsumers(Params.DDChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
ddMs.Start()
time.Sleep(1000 * time.Millisecond)
@ -898,29 +900,29 @@ func TestMaster(t *testing.T) {
t.Run("TestBroadCastRequest", func(t *testing.T) {
proxyTimeTickStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
proxyTimeTickStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
proxyTimeTickStream.SetPulsarClient(pulsarAddr)
proxyTimeTickStream.CreatePulsarProducers(Params.ProxyTimeTickChannelNames)
proxyTimeTickStream.Start()
writeNodeStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
writeNodeStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
writeNodeStream.SetPulsarClient(pulsarAddr)
writeNodeStream.CreatePulsarProducers(Params.WriteNodeTimeTickChannelNames)
writeNodeStream.Start()
ddMs := ms.NewPulsarTtMsgStream(ctx, 1024)
ddMs := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
ddMs.SetPulsarClient(pulsarAddr)
ddMs.CreatePulsarConsumers(Params.DDChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
ddMs.CreatePulsarConsumers(Params.DDChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
ddMs.Start()
dMMs := ms.NewPulsarTtMsgStream(ctx, 1024)
dMMs := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
dMMs.SetPulsarClient(pulsarAddr)
dMMs.CreatePulsarConsumers(Params.InsertChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
dMMs.CreatePulsarConsumers(Params.InsertChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
dMMs.Start()
k2sMs := ms.NewPulsarMsgStream(ctx, 1024)
k2sMs := pulsarms.NewPulsarMsgStream(ctx, 1024)
k2sMs.SetPulsarClient(pulsarAddr)
k2sMs.CreatePulsarConsumers(Params.K2SChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
k2sMs.CreatePulsarConsumers(Params.K2SChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
k2sMs.Start()
ttsoftmsgs := [][2]uint64{
@ -1162,7 +1164,7 @@ func TestMaster(t *testing.T) {
// test stats
segID := assignments[0].SegID
pulsarAddress := Params.PulsarAddress
msgStream := ms.NewPulsarMsgStream(ctx, 1024)
msgStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
msgStream.SetPulsarClient(pulsarAddress)
msgStream.CreatePulsarProducers([]string{Params.QueryNodeStatsChannelName})
msgStream.Start()

View File

@ -14,6 +14,8 @@ import (
"github.com/stretchr/testify/assert"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
@ -57,15 +59,15 @@ func TestMaster_Scheduler_Collection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pulsarDDStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream.SetPulsarClient(pulsarAddr)
pulsarDDStream.CreatePulsarProducers(producerChannels)
pulsarDDStream.Start()
defer pulsarDDStream.Close()
consumeMs := ms.NewPulsarTtMsgStream(ctx, 1024)
consumeMs := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
consumeMs.SetPulsarClient(pulsarAddr)
consumeMs.CreatePulsarConsumers(consumerChannels, consumerSubName, ms.NewUnmarshalDispatcher(), 1024)
consumeMs.CreatePulsarConsumers(consumerChannels, consumerSubName, util.NewUnmarshalDispatcher(), 1024)
consumeMs.Start()
defer consumeMs.Close()
@ -217,15 +219,15 @@ func TestMaster_Scheduler_Partition(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pulsarDDStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream.SetPulsarClient(pulsarAddr)
pulsarDDStream.CreatePulsarProducers(producerChannels)
pulsarDDStream.Start()
defer pulsarDDStream.Close()
consumeMs := ms.NewPulsarTtMsgStream(ctx, 1024)
consumeMs := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
consumeMs.SetPulsarClient(pulsarAddr)
consumeMs.CreatePulsarConsumers(consumerChannels, consumerSubName, ms.NewUnmarshalDispatcher(), 1024)
consumeMs.CreatePulsarConsumers(consumerChannels, consumerSubName, util.NewUnmarshalDispatcher(), 1024)
consumeMs.Start()
defer consumeMs.Close()

View File

@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/assert"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
)
type (
@ -41,7 +43,7 @@ func initTestPulsarStream(ctx context.Context, pulsarAddress string,
consumerSubName string, opts ...ms.RepackFunc) (*ms.MsgStream, *ms.MsgStream) {
// set input stream
inputStream := ms.NewPulsarMsgStream(ctx, 100)
inputStream := pulsarms.NewPulsarMsgStream(ctx, 100)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
@ -50,9 +52,9 @@ func initTestPulsarStream(ctx context.Context, pulsarAddress string,
var input ms.MsgStream = inputStream
// set output stream
outputStream := ms.NewPulsarMsgStream(ctx, 100)
outputStream := pulsarms.NewPulsarMsgStream(ctx, 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := ms.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
var output ms.MsgStream = outputStream

View File

@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/require"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -41,15 +43,15 @@ func initPulsarStream(pulsarAddress string,
consumerSubName string) (*ms.MsgStream, *ms.MsgStream) {
// set input stream
inputStream := ms.NewPulsarMsgStream(context.Background(), 100)
inputStream := pulsarms.NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
var input ms.MsgStream = inputStream
// set output stream
outputStream := ms.NewPulsarMsgStream(context.Background(), 100)
outputStream := pulsarms.NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := ms.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output ms.MsgStream = outputStream

View File

@ -11,6 +11,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/errors"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@ -255,18 +257,18 @@ func (c *Core) startTimeTickLoop() {
func (c *Core) setMsgStreams() error {
//proxy time tick stream,
proxyTimeTickStream := ms.NewPulsarMsgStream(c.ctx, 1024)
proxyTimeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024)
proxyTimeTickStream.SetPulsarClient(Params.PulsarAddress)
proxyTimeTickStream.CreatePulsarConsumers([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
proxyTimeTickStream.CreatePulsarConsumers([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
proxyTimeTickStream.Start()
// master time tick channel
timeTickStream := ms.NewPulsarMsgStream(c.ctx, 1024)
timeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024)
timeTickStream.SetPulsarClient(Params.PulsarAddress)
timeTickStream.CreatePulsarProducers([]string{Params.TimeTickChannel})
// master dd channel
ddStream := ms.NewPulsarMsgStream(c.ctx, 1024)
ddStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024)
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarProducers([]string{Params.DdChannel})
@ -399,9 +401,9 @@ func (c *Core) setMsgStreams() error {
}()
//segment channel, data service create segment,or data node flush segment will put msg in this channel
dataServiceStream := ms.NewPulsarMsgStream(c.ctx, 1024)
dataServiceStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024)
dataServiceStream.SetPulsarClient(Params.PulsarAddress)
dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
dataServiceStream.Start()
// receive segment info from msg stream

View File

@ -1,19 +1,8 @@
package msgstream
import (
"context"
"log"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -42,559 +31,6 @@ type MsgStream interface {
Chan() <-chan *MsgPack
}
type PulsarMsgStream struct {
ctx context.Context
client *pulsar.Client
producers []*pulsar.Producer
consumers []*pulsar.Consumer
repackFunc RepackFunc
unmarshal *UnmarshalDispatcher
receiveBuf chan *MsgPack
wait *sync.WaitGroup
streamCancel func()
}
func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64) *PulsarMsgStream {
streamCtx, streamCancel := context.WithCancel(ctx)
stream := &PulsarMsgStream{
ctx: streamCtx,
streamCancel: streamCancel,
}
stream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return stream
}
func (ms *PulsarMsgStream) SetPulsarClient(address string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
if err != nil {
log.Printf("Set pulsar client failed, error = %v", err)
}
ms.client = &client
}
func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) {
for i := 0; i < len(channels); i++ {
fn := func() error {
pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]})
if err != nil {
return err
}
if pp == nil {
return errors.New("pulsar is not ready, producer is nil")
}
ms.producers = append(ms.producers, &pp)
return nil
}
err := Retry(10, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to create producer " + channels[i] + ", error = " + err.Error()
panic(errMsg)
}
}
}
func (ms *PulsarMsgStream) CreatePulsarConsumers(channels []string,
subName string,
unmarshal *UnmarshalDispatcher,
pulsarBufSize int64) {
ms.unmarshal = unmarshal
for i := 0; i < len(channels); i++ {
fn := func() error {
receiveChannel := make(chan pulsar.ConsumerMessage, pulsarBufSize)
pc, err := (*ms.client).Subscribe(pulsar.ConsumerOptions{
Topic: channels[i],
SubscriptionName: subName,
Type: pulsar.KeyShared,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
MessageChannel: receiveChannel,
})
if err != nil {
return err
}
if pc == nil {
return errors.New("pulsar is not ready, consumer is nil")
}
ms.consumers = append(ms.consumers, &pc)
return nil
}
err := Retry(10, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to create consumer " + channels[i] + ", error = " + err.Error()
panic(errMsg)
}
}
}
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) {
ms.repackFunc = repackFunc
}
func (ms *PulsarMsgStream) Start() {
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
}
func (ms *PulsarMsgStream) Close() {
ms.streamCancel()
for _, producer := range ms.producers {
if producer != nil {
(*producer).Close()
}
}
for _, consumer := range ms.consumers {
if consumer != nil {
(*consumer).Close()
}
}
if ms.client != nil {
(*ms.client).Close()
}
}
type propertiesReaderWriter struct {
ppMap map[string]string
}
func (ppRW *propertiesReaderWriter) Set(key, val string) {
// The GRPC HPACK implementation rejects any uppercase keys here.
//
// As such, since the HTTP_HEADERS format is case-insensitive anyway, we
// blindly lowercase the key (which is guaranteed to work in the
// Inject/Extract sense per the OpenTracing spec).
key = strings.ToLower(key)
ppRW.ppMap[key] = val
}
func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) error) error {
for k, val := range ppRW.ppMap {
if err := handler(k, val); err != nil {
return err
}
}
return nil
}
func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
if len(tsMsgs) <= 0 {
log.Printf("Warning: Receive empty msgPack")
return nil
}
if len(ms.producers) <= 0 {
return errors.New("nil producer in msg stream")
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelID, tsMsg := range tsMsgs {
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
if tsMsg.Type() == commonpb.MsgType_kSearchResult {
searchResult := tsMsg.(*SearchResultMsg)
channelID := searchResult.ResultChannelID
channelIDInt, _ := strconv.ParseInt(channelID, 10, 64)
if channelIDInt >= int64(len(ms.producers)) {
return errors.New("Failed to produce pulsar msg to unKnow channel")
}
bucketValues[index] = int32(channelIDInt)
continue
}
bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
}
reBucketValues[channelID] = bucketValues
}
var result map[int32]*MsgPack
var err error
if ms.repackFunc != nil {
result, err = ms.repackFunc(tsMsgs, reBucketValues)
} else {
msgType := (tsMsgs[0]).Type()
switch msgType {
case commonpb.MsgType_kInsert:
result, err = insertRepackFunc(tsMsgs, reBucketValues)
case commonpb.MsgType_kDelete:
result, err = deleteRepackFunc(tsMsgs, reBucketValues)
default:
result, err = defaultRepackFunc(tsMsgs, reBucketValues)
}
}
if err != nil {
return err
}
for k, v := range result {
for i := 0; i < len(v.Msgs); i++ {
mb, err := v.Msgs[i].Marshal(v.Msgs[i])
if err != nil {
return err
}
msg := &pulsar.ProducerMessage{Payload: mb}
var child opentracing.Span
if v.Msgs[i].Type() == commonpb.MsgType_kInsert ||
v.Msgs[i].Type() == commonpb.MsgType_kSearch ||
v.Msgs[i].Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
ctx := v.Msgs[i].GetMsgContext()
if ctx == nil {
ctx = context.Background()
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("start send pulsar msg")
}
child.SetTag("hash keys", v.Msgs[i].HashKeys())
child.SetTag("start time", v.Msgs[i].BeginTs())
child.SetTag("end time", v.Msgs[i].EndTs())
child.SetTag("msg type", v.Msgs[i].Type())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
child.LogFields(oplog.Error(err))
child.Finish()
return err
}
child.LogFields(oplog.String("inject success", "inject success"))
}
if _, err := (*ms.producers[k]).Send(
context.Background(),
msg,
); err != nil {
if child != nil {
child.LogFields(oplog.Error(err))
child.Finish()
}
return err
}
if child != nil {
child.Finish()
}
}
}
return nil
}
func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, err := v.Marshal(v)
if err != nil {
return err
}
msg := &pulsar.ProducerMessage{Payload: mb}
var child opentracing.Span
if v.Type() == commonpb.MsgType_kInsert ||
v.Type() == commonpb.MsgType_kSearch ||
v.Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
ctx := v.GetMsgContext()
if ctx == nil {
ctx = context.Background()
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("start send pulsar msg, start time: %d")
}
child.SetTag("hash keys", v.HashKeys())
child.SetTag("start time", v.BeginTs())
child.SetTag("end time", v.EndTs())
child.SetTag("msg type", v.Type())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
child.LogFields(oplog.Error(err))
child.Finish()
return err
}
child.LogFields(oplog.String("inject success", "inject success"))
}
for i := 0; i < producerLen; i++ {
if _, err := (*ms.producers[i]).Send(
context.Background(),
msg,
); err != nil {
if child != nil {
child.LogFields(oplog.Error(err))
child.Finish()
}
return err
}
}
if child != nil {
child.Finish()
}
}
return nil
}
func (ms *PulsarMsgStream) Consume() *MsgPack {
for {
select {
case cm, ok := <-ms.receiveBuf:
if !ok {
log.Println("buf chan closed")
return nil
}
return cm
case <-ms.ctx.Done():
log.Printf("context closed")
return nil
}
}
}
func (ms *PulsarMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
cases := make([]reflect.SelectCase, len(ms.consumers))
for i := 0; i < len(ms.consumers); i++ {
ch := (*ms.consumers[i]).Chan()
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
for {
select {
case <-ms.ctx.Done():
log.Println("done")
return
default:
tsMsgList := make([]TsMsg, 0)
for {
chosen, value, ok := reflect.Select(cases)
if !ok {
log.Printf("channel closed")
return
}
pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage)
if !ok {
log.Printf("type assertion failed, not consumer message type")
continue
}
(*ms.consumers[chosen]).AckID(pulsarMsg.ID())
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal message header, error = %v", err)
continue
}
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType)
if tsMsg.Type() == commonpb.MsgType_kSearch ||
tsMsg.Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
log.Println(err.Error())
}
span := opentracing.StartSpan("pulsar msg received",
ext.RPCServerOption(spanContext))
span.SetTag("msg type", tsMsg.Type())
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
span.Finish()
}
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
continue
}
tsMsgList = append(tsMsgList, tsMsg)
noMoreMessage := true
for i := 0; i < len(ms.consumers); i++ {
if len((*ms.consumers[i]).Chan()) > 0 {
noMoreMessage = false
}
}
if noMoreMessage {
break
}
}
if len(tsMsgList) > 0 {
msgPack := MsgPack{Msgs: tsMsgList}
ms.receiveBuf <- &msgPack
}
}
}
}
func (ms *PulsarMsgStream) Chan() <-chan *MsgPack {
return ms.receiveBuf
}
type PulsarTtMsgStream struct {
PulsarMsgStream
inputBuf []TsMsg
unsolvedBuf []TsMsg
lastTimeStamp Timestamp
}
func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64) *PulsarTtMsgStream {
streamCtx, streamCancel := context.WithCancel(ctx)
pulsarMsgStream := PulsarMsgStream{
ctx: streamCtx,
streamCancel: streamCancel,
}
pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return &PulsarTtMsgStream{
PulsarMsgStream: pulsarMsgStream,
}
}
func (ms *PulsarTtMsgStream) Start() {
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
}
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
ms.unsolvedBuf = make([]TsMsg, 0)
ms.inputBuf = make([]TsMsg, 0)
isChannelReady := make([]bool, len(ms.consumers))
eofMsgTimeStamp := make(map[int]Timestamp)
spans := make(map[Timestamp]opentracing.Span)
ctxs := make(map[Timestamp]context.Context)
for {
select {
case <-ms.ctx.Done():
return
default:
wg := sync.WaitGroup{}
mu := sync.Mutex{}
findMapMutex := sync.RWMutex{}
for i := 0; i < len(ms.consumers); i++ {
if isChannelReady[i] {
continue
}
wg.Add(1)
go ms.findTimeTick(i, eofMsgTimeStamp, &wg, &mu, &findMapMutex)
}
wg.Wait()
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
if !ok || timeStamp <= ms.lastTimeStamp {
log.Printf("All timeTick's timestamps are inconsistent")
continue
}
timeTickBuf := make([]TsMsg, 0)
ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...)
ms.unsolvedBuf = ms.unsolvedBuf[:0]
for _, v := range ms.inputBuf {
var ctx context.Context
var span opentracing.Span
if v.Type() == commonpb.MsgType_kInsert {
if _, ok := spans[v.BeginTs()]; !ok {
span, ctx = opentracing.StartSpanFromContext(v.GetMsgContext(), "after find time tick")
ctxs[v.BeginTs()] = ctx
spans[v.BeginTs()] = span
}
}
if v.EndTs() <= timeStamp {
timeTickBuf = append(timeTickBuf, v)
if v.Type() == commonpb.MsgType_kInsert {
v.SetMsgContext(ctxs[v.BeginTs()])
spans[v.BeginTs()].Finish()
delete(spans, v.BeginTs())
}
} else {
ms.unsolvedBuf = append(ms.unsolvedBuf, v)
}
}
ms.inputBuf = ms.inputBuf[:0]
msgPack := MsgPack{
BeginTs: ms.lastTimeStamp,
EndTs: timeStamp,
Msgs: timeTickBuf,
}
ms.receiveBuf <- &msgPack
ms.lastTimeStamp = timeStamp
}
}
}
func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
eofMsgMap map[int]Timestamp,
wg *sync.WaitGroup,
mu *sync.Mutex,
findMapMutex *sync.RWMutex) {
defer wg.Done()
for {
select {
case <-ms.ctx.Done():
return
case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan():
if !ok {
log.Printf("consumer closed!")
return
}
(*ms.consumers[channelIndex]).Ack(pulsarMsg)
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
}
unMarshalFunc := (*ms.unmarshal).tempMap[headerMsg.Base.MsgType]
if unMarshalFunc == nil {
panic("null unMarshalFunc for " + headerMsg.Base.MsgType.String() + " msg type")
}
tsMsg, err := unMarshalFunc(pulsarMsg.Payload())
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
}
if tsMsg.Type() == commonpb.MsgType_kInsert {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
log.Println(err.Error())
}
span := opentracing.StartSpan("pulsar msg received",
ext.RPCServerOption(spanContext))
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
span.SetTag("msg type", tsMsg.Type())
tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
span.Finish()
}
if headerMsg.Base.MsgType == commonpb.MsgType_kTimeTick {
findMapMutex.Lock()
eofMsgMap[channelIndex] = tsMsg.(*TimeTickMsg).Base.Timestamp
findMapMutex.Unlock()
return
}
mu.Lock()
ms.inputBuf = append(ms.inputBuf, tsMsg)
mu.Unlock()
}
}
}
//TODO test InMemMsgStream
/*
type InMemMsgStream struct {
@ -634,7 +70,7 @@ func (ms *InMemMsgStream) Chan() <- chan *MsgPack {
}
*/
func checkTimeTickMsg(msg map[int]Timestamp, isChannelReady []bool, mu *sync.RWMutex) (Timestamp, bool) {
func CheckTimeTickMsg(msg map[int]Timestamp, isChannelReady []bool, mu *sync.RWMutex) (Timestamp, bool) {
checkMap := make(map[Timestamp]int)
var maxTime Timestamp = 0
for _, v := range msg {
@ -663,7 +99,7 @@ func checkTimeTickMsg(msg map[int]Timestamp, isChannelReady []bool, mu *sync.RWM
return 0, false
}
func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if request.Type() != commonpb.MsgType_kInsert {
@ -715,7 +151,7 @@ func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
return result, nil
}
func deleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if request.Type() != commonpb.MsgType_kDelete {
@ -764,7 +200,7 @@ func deleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
return result, nil
}
func defaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
keys := hashKeys[i]

View File

@ -1,4 +1,4 @@
package msgstream
package pulsarms
import (
"context"
@ -9,16 +9,18 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type InsertTask struct {
Tag string
InsertMsg
msgstream.InsertMsg
}
func (tt *InsertTask) Marshal(input TsMsg) ([]byte, error) {
func (tt *InsertTask) Marshal(input msgstream.TsMsg) ([]byte, error) {
testMsg := input.(*InsertTask)
insertRequest := &testMsg.InsertRequest
mb, err := proto.Marshal(insertRequest)
@ -28,10 +30,10 @@ func (tt *InsertTask) Marshal(input TsMsg) ([]byte, error) {
return mb, nil
}
func (tt *InsertTask) Unmarshal(input []byte) (TsMsg, error) {
func (tt *InsertTask) Unmarshal(input []byte) (msgstream.TsMsg, error) {
insertRequest := internalpb2.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
testMsg := &InsertTask{InsertMsg: InsertMsg{InsertRequest: insertRequest}}
testMsg := &InsertTask{InsertMsg: msgstream.InsertMsg{InsertRequest: insertRequest}}
testMsg.Tag = testMsg.InsertRequest.PartitionName
if err != nil {
return nil, err
@ -40,8 +42,8 @@ func (tt *InsertTask) Unmarshal(input []byte) (TsMsg, error) {
return testMsg, nil
}
func newRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
func newRepackFunc(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
result := make(map[int32]*msgstream.MsgPack)
for i, request := range tsMsgs {
if request.Type() != commonpb.MsgType_kInsert {
return nil, errors.New(string("msg's must be Insert"))
@ -60,7 +62,7 @@ func newRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, erro
for index, key := range keys {
_, ok := result[key]
if !ok {
msgPack := MsgPack{}
msgPack := msgstream.MsgPack{}
result[key] = &msgPack
}
@ -75,13 +77,13 @@ func newRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, erro
PartitionName: insertRequest.PartitionName,
SegmentID: insertRequest.SegmentID,
ChannelID: insertRequest.ChannelID,
Timestamps: []Timestamp{insertRequest.Timestamps[index]},
Timestamps: []msgstream.Timestamp{insertRequest.Timestamps[index]},
RowIDs: []int64{insertRequest.RowIDs[index]},
RowData: []*commonpb.Blob{insertRequest.RowData[index]},
}
insertMsg := &InsertTask{
InsertMsg: InsertMsg{InsertRequest: sliceRequest},
InsertMsg: msgstream.InsertMsg{InsertRequest: sliceRequest},
}
result[key].Msgs = append(result[key].Msgs, insertMsg)
@ -90,8 +92,8 @@ func newRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, erro
return result, nil
}
func getInsertTask(reqID UniqueID, hashValue uint32) TsMsg {
baseMsg := BaseMsg{
func getInsertTask(reqID msgstream.UniqueID, hashValue uint32) msgstream.TsMsg {
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
@ -107,11 +109,11 @@ func getInsertTask(reqID UniqueID, hashValue uint32) TsMsg {
PartitionName: "Partition",
SegmentID: 1,
ChannelID: "1",
Timestamps: []Timestamp{1},
Timestamps: []msgstream.Timestamp{1},
RowIDs: []int64{1},
RowData: []*commonpb.Blob{{}},
}
insertMsg := InsertMsg{
insertMsg := msgstream.InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
@ -129,7 +131,7 @@ func TestStream_task_Insert(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getInsertTask(1, 1))
msgPack.Msgs = append(msgPack.Msgs, getInsertTask(3, 3))
@ -141,7 +143,7 @@ func TestStream_task_Insert(t *testing.T) {
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
testTask := InsertTask{}
unmarshalDispatcher.AddMsgTemplate(commonpb.MsgType_kInsert, testTask.Unmarshal)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)

View File

@ -0,0 +1,585 @@
package pulsarms
import (
"context"
"log"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type TsMsg = msgstream.TsMsg
type MsgPack = msgstream.MsgPack
type MsgType = msgstream.MsgType
type UniqueID = msgstream.UniqueID
type BaseMsg = msgstream.BaseMsg
type Timestamp = msgstream.Timestamp
type IntPrimaryKey = msgstream.IntPrimaryKey
type TimeTickMsg = msgstream.TimeTickMsg
type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg
type RepackFunc = msgstream.RepackFunc
type PulsarMsgStream struct {
ctx context.Context
client *pulsar.Client
producers []*pulsar.Producer
consumers []*pulsar.Consumer
repackFunc RepackFunc
unmarshal *util.UnmarshalDispatcher
receiveBuf chan *MsgPack
wait *sync.WaitGroup
streamCancel func()
}
func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64) *PulsarMsgStream {
streamCtx, streamCancel := context.WithCancel(ctx)
stream := &PulsarMsgStream{
ctx: streamCtx,
streamCancel: streamCancel,
}
stream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return stream
}
func (ms *PulsarMsgStream) SetPulsarClient(address string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
if err != nil {
log.Printf("Set pulsar client failed, error = %v", err)
}
ms.client = &client
}
func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) {
for i := 0; i < len(channels); i++ {
fn := func() error {
pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]})
if err != nil {
return err
}
if pp == nil {
return errors.New("pulsar is not ready, producer is nil")
}
ms.producers = append(ms.producers, &pp)
return nil
}
err := util.Retry(10, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to create producer " + channels[i] + ", error = " + err.Error()
panic(errMsg)
}
}
}
func (ms *PulsarMsgStream) CreatePulsarConsumers(channels []string,
subName string,
unmarshal *util.UnmarshalDispatcher,
pulsarBufSize int64) {
ms.unmarshal = unmarshal
for i := 0; i < len(channels); i++ {
fn := func() error {
receiveChannel := make(chan pulsar.ConsumerMessage, pulsarBufSize)
pc, err := (*ms.client).Subscribe(pulsar.ConsumerOptions{
Topic: channels[i],
SubscriptionName: subName,
Type: pulsar.KeyShared,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
MessageChannel: receiveChannel,
})
if err != nil {
return err
}
if pc == nil {
return errors.New("pulsar is not ready, consumer is nil")
}
ms.consumers = append(ms.consumers, &pc)
return nil
}
err := util.Retry(10, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to create consumer " + channels[i] + ", error = " + err.Error()
panic(errMsg)
}
}
}
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) {
ms.repackFunc = repackFunc
}
func (ms *PulsarMsgStream) Start() {
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
}
func (ms *PulsarMsgStream) Close() {
ms.streamCancel()
for _, producer := range ms.producers {
if producer != nil {
(*producer).Close()
}
}
for _, consumer := range ms.consumers {
if consumer != nil {
(*consumer).Close()
}
}
if ms.client != nil {
(*ms.client).Close()
}
}
type propertiesReaderWriter struct {
ppMap map[string]string
}
func (ppRW *propertiesReaderWriter) Set(key, val string) {
// The GRPC HPACK implementation rejects any uppercase keys here.
//
// As such, since the HTTP_HEADERS format is case-insensitive anyway, we
// blindly lowercase the key (which is guaranteed to work in the
// Inject/Extract sense per the OpenTracing spec).
key = strings.ToLower(key)
ppRW.ppMap[key] = val
}
func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) error) error {
for k, val := range ppRW.ppMap {
if err := handler(k, val); err != nil {
return err
}
}
return nil
}
func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
if len(tsMsgs) <= 0 {
log.Printf("Warning: Receive empty msgPack")
return nil
}
if len(ms.producers) <= 0 {
return errors.New("nil producer in msg stream")
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelID, tsMsg := range tsMsgs {
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
if tsMsg.Type() == commonpb.MsgType_kSearchResult {
searchResult := tsMsg.(*msgstream.SearchResultMsg)
channelID := searchResult.ResultChannelID
channelIDInt, _ := strconv.ParseInt(channelID, 10, 64)
if channelIDInt >= int64(len(ms.producers)) {
return errors.New("Failed to produce pulsar msg to unKnow channel")
}
bucketValues[index] = int32(channelIDInt)
continue
}
bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
}
reBucketValues[channelID] = bucketValues
}
var result map[int32]*MsgPack
var err error
if ms.repackFunc != nil {
result, err = ms.repackFunc(tsMsgs, reBucketValues)
} else {
msgType := (tsMsgs[0]).Type()
switch msgType {
case commonpb.MsgType_kInsert:
result, err = msgstream.InsertRepackFunc(tsMsgs, reBucketValues)
case commonpb.MsgType_kDelete:
result, err = msgstream.DeleteRepackFunc(tsMsgs, reBucketValues)
default:
result, err = msgstream.DefaultRepackFunc(tsMsgs, reBucketValues)
}
}
if err != nil {
return err
}
for k, v := range result {
for i := 0; i < len(v.Msgs); i++ {
mb, err := v.Msgs[i].Marshal(v.Msgs[i])
if err != nil {
return err
}
msg := &pulsar.ProducerMessage{Payload: mb}
var child opentracing.Span
if v.Msgs[i].Type() == commonpb.MsgType_kInsert ||
v.Msgs[i].Type() == commonpb.MsgType_kSearch ||
v.Msgs[i].Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
ctx := v.Msgs[i].GetMsgContext()
if ctx == nil {
ctx = context.Background()
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("start send pulsar msg")
}
child.SetTag("hash keys", v.Msgs[i].HashKeys())
child.SetTag("start time", v.Msgs[i].BeginTs())
child.SetTag("end time", v.Msgs[i].EndTs())
child.SetTag("msg type", v.Msgs[i].Type())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
child.LogFields(oplog.Error(err))
child.Finish()
return err
}
child.LogFields(oplog.String("inject success", "inject success"))
}
if _, err := (*ms.producers[k]).Send(
context.Background(),
msg,
); err != nil {
if child != nil {
child.LogFields(oplog.Error(err))
child.Finish()
}
return err
}
if child != nil {
child.Finish()
}
}
}
return nil
}
func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, err := v.Marshal(v)
if err != nil {
return err
}
msg := &pulsar.ProducerMessage{Payload: mb}
var child opentracing.Span
if v.Type() == commonpb.MsgType_kInsert ||
v.Type() == commonpb.MsgType_kSearch ||
v.Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
ctx := v.GetMsgContext()
if ctx == nil {
ctx = context.Background()
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("start send pulsar msg, start time: %d")
}
child.SetTag("hash keys", v.HashKeys())
child.SetTag("start time", v.BeginTs())
child.SetTag("end time", v.EndTs())
child.SetTag("msg type", v.Type())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
child.LogFields(oplog.Error(err))
child.Finish()
return err
}
child.LogFields(oplog.String("inject success", "inject success"))
}
for i := 0; i < producerLen; i++ {
if _, err := (*ms.producers[i]).Send(
context.Background(),
msg,
); err != nil {
if child != nil {
child.LogFields(oplog.Error(err))
child.Finish()
}
return err
}
}
if child != nil {
child.Finish()
}
}
return nil
}
func (ms *PulsarMsgStream) Consume() *MsgPack {
for {
select {
case cm, ok := <-ms.receiveBuf:
if !ok {
log.Println("buf chan closed")
return nil
}
return cm
case <-ms.ctx.Done():
log.Printf("context closed")
return nil
}
}
}
func (ms *PulsarMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
cases := make([]reflect.SelectCase, len(ms.consumers))
for i := 0; i < len(ms.consumers); i++ {
ch := (*ms.consumers[i]).Chan()
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
for {
select {
case <-ms.ctx.Done():
log.Println("done")
return
default:
tsMsgList := make([]TsMsg, 0)
for {
chosen, value, ok := reflect.Select(cases)
if !ok {
log.Printf("channel closed")
return
}
pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage)
if !ok {
log.Printf("type assertion failed, not consumer message type")
continue
}
(*ms.consumers[chosen]).AckID(pulsarMsg.ID())
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal message header, error = %v", err)
continue
}
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType)
if tsMsg.Type() == commonpb.MsgType_kSearch ||
tsMsg.Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
log.Println(err.Error())
}
span := opentracing.StartSpan("pulsar msg received",
ext.RPCServerOption(spanContext))
span.SetTag("msg type", tsMsg.Type())
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
span.Finish()
}
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
continue
}
tsMsgList = append(tsMsgList, tsMsg)
noMoreMessage := true
for i := 0; i < len(ms.consumers); i++ {
if len((*ms.consumers[i]).Chan()) > 0 {
noMoreMessage = false
}
}
if noMoreMessage {
break
}
}
if len(tsMsgList) > 0 {
msgPack := MsgPack{Msgs: tsMsgList}
ms.receiveBuf <- &msgPack
}
}
}
}
func (ms *PulsarMsgStream) Chan() <-chan *MsgPack {
return ms.receiveBuf
}
type PulsarTtMsgStream struct {
PulsarMsgStream
inputBuf []TsMsg
unsolvedBuf []TsMsg
lastTimeStamp Timestamp
}
func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64) *PulsarTtMsgStream {
streamCtx, streamCancel := context.WithCancel(ctx)
pulsarMsgStream := PulsarMsgStream{
ctx: streamCtx,
streamCancel: streamCancel,
}
pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return &PulsarTtMsgStream{
PulsarMsgStream: pulsarMsgStream,
}
}
func (ms *PulsarTtMsgStream) Start() {
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
}
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
ms.unsolvedBuf = make([]TsMsg, 0)
ms.inputBuf = make([]TsMsg, 0)
isChannelReady := make([]bool, len(ms.consumers))
eofMsgTimeStamp := make(map[int]Timestamp)
spans := make(map[Timestamp]opentracing.Span)
ctxs := make(map[Timestamp]context.Context)
for {
select {
case <-ms.ctx.Done():
return
default:
wg := sync.WaitGroup{}
mu := sync.Mutex{}
findMapMutex := sync.RWMutex{}
for i := 0; i < len(ms.consumers); i++ {
if isChannelReady[i] {
continue
}
wg.Add(1)
go ms.findTimeTick(i, eofMsgTimeStamp, &wg, &mu, &findMapMutex)
}
wg.Wait()
timeStamp, ok := msgstream.CheckTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
if !ok || timeStamp <= ms.lastTimeStamp {
log.Printf("All timeTick's timestamps are inconsistent")
continue
}
timeTickBuf := make([]TsMsg, 0)
ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...)
ms.unsolvedBuf = ms.unsolvedBuf[:0]
for _, v := range ms.inputBuf {
var ctx context.Context
var span opentracing.Span
if v.Type() == commonpb.MsgType_kInsert {
if _, ok := spans[v.BeginTs()]; !ok {
span, ctx = opentracing.StartSpanFromContext(v.GetMsgContext(), "after find time tick")
ctxs[v.BeginTs()] = ctx
spans[v.BeginTs()] = span
}
}
if v.EndTs() <= timeStamp {
timeTickBuf = append(timeTickBuf, v)
if v.Type() == commonpb.MsgType_kInsert {
v.SetMsgContext(ctxs[v.BeginTs()])
spans[v.BeginTs()].Finish()
delete(spans, v.BeginTs())
}
} else {
ms.unsolvedBuf = append(ms.unsolvedBuf, v)
}
}
ms.inputBuf = ms.inputBuf[:0]
msgPack := MsgPack{
BeginTs: ms.lastTimeStamp,
EndTs: timeStamp,
Msgs: timeTickBuf,
}
ms.receiveBuf <- &msgPack
ms.lastTimeStamp = timeStamp
}
}
}
func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
eofMsgMap map[int]Timestamp,
wg *sync.WaitGroup,
mu *sync.Mutex,
findMapMutex *sync.RWMutex) {
defer wg.Done()
for {
select {
case <-ms.ctx.Done():
return
case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan():
if !ok {
log.Printf("consumer closed!")
return
}
(*ms.consumers[channelIndex]).Ack(pulsarMsg)
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
}
unMarshalFunc := (*ms.unmarshal).TempMap[headerMsg.Base.MsgType]
if unMarshalFunc == nil {
panic("null unMarshalFunc for " + headerMsg.Base.MsgType.String() + " msg type")
}
tsMsg, err := unMarshalFunc(pulsarMsg.Payload())
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
}
if tsMsg.Type() == commonpb.MsgType_kInsert {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
log.Println(err.Error())
}
span := opentracing.StartSpan("pulsar msg received",
ext.RPCServerOption(spanContext))
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
span.SetTag("msg type", tsMsg.Type())
tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
span.Finish()
}
if headerMsg.Base.MsgType == commonpb.MsgType_kTimeTick {
findMapMutex.Lock()
eofMsgMap[channelIndex] = tsMsg.(*TimeTickMsg).Base.Timestamp
findMapMutex.Unlock()
return
}
mu.Lock()
ms.inputBuf = append(ms.inputBuf, tsMsg)
mu.Unlock()
}
}
}

View File

@ -1,4 +1,4 @@
package msgstream
package pulsarms
import (
"context"
@ -7,6 +7,8 @@ import (
"os"
"testing"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
@ -20,156 +22,11 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
for _, channelID := range keys {
_, ok := result[channelID]
if ok == false {
msgPack := MsgPack{}
result[channelID] = &msgPack
}
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
return result, nil
}
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
switch msgType {
case commonpb.MsgType_kInsert:
insertRequest := internalpb2.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
CollectionName: "Collection",
PartitionName: "Partition",
SegmentID: 1,
ChannelID: "0",
Timestamps: []Timestamp{1},
RowIDs: []int64{1},
RowData: []*commonpb.Blob{{}},
}
insertMsg := &InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
return insertMsg
case commonpb.MsgType_kDelete:
deleteRequest := internalpb2.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDelete,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
CollectionName: "Collection",
ChannelID: "1",
Timestamps: []Timestamp{1},
PrimaryKeys: []IntPrimaryKey{1},
}
deleteMsg := &DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
return deleteMsg
case commonpb.MsgType_kSearch:
searchRequest := internalpb2.SearchRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSearch,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
Query: nil,
ResultChannelID: "0",
}
searchMsg := &SearchMsg{
BaseMsg: baseMsg,
SearchRequest: searchRequest,
}
return searchMsg
case commonpb.MsgType_kSearchResult:
searchResult := internalpb2.SearchResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSearchResult,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
ResultChannelID: "0",
}
searchResultMsg := &SearchResultMsg{
BaseMsg: baseMsg,
SearchResults: searchResult,
}
return searchResultMsg
case commonpb.MsgType_kTimeTick:
timeTickResult := internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
case commonpb.MsgType_kQueryNodeStats:
queryNodeSegStats := internalpb2.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kQueryNodeStats,
SourceID: reqID,
},
}
queryNodeSegStatsMsg := &QueryNodeStatsMsg{
BaseMsg: baseMsg,
QueryNodeStats: queryNodeSegStats,
}
return queryNodeSegStatsMsg
}
return nil
}
func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
timeTickResult := internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
}
func initPulsarStream(pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
opts ...RepackFunc) (*MsgStream, *MsgStream) {
opts ...msgstream.RepackFunc) (*msgstream.MsgStream, *msgstream.MsgStream) {
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
@ -179,15 +36,15 @@ func initPulsarStream(pulsarAddress string,
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
var input MsgStream = inputStream
var input msgstream.MsgStream = inputStream
// set output stream
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
var output msgstream.MsgStream = outputStream
return &input, &output
}
@ -196,7 +53,7 @@ func initPulsarTtStream(pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
opts ...RepackFunc) (*MsgStream, *MsgStream) {
opts ...msgstream.RepackFunc) (*msgstream.MsgStream, *msgstream.MsgStream) {
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
@ -206,20 +63,20 @@ func initPulsarTtStream(pulsarAddress string,
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
var input MsgStream = inputStream
var input msgstream.MsgStream = inputStream
// set output stream
outputStream := NewPulsarTtMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
var output msgstream.MsgStream = outputStream
return &input, &output
}
func receiveMsg(outputStream *MsgStream, msgCount int) {
func receiveMsg(outputStream *msgstream.MsgStream, msgCount int) {
receiveCount := 0
for {
result := (*outputStream).Consume()
@ -242,9 +99,9 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
@ -264,8 +121,8 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
consumerChannels := []string{"delete"}
consumerSubName := "subDelete"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 1, 1))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kDelete, 1, 1))
//msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
@ -284,9 +141,9 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
consumerChannels := []string{"search"}
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 3, 3))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kSearch, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kSearch, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
@ -304,9 +161,9 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
consumerChannels := []string{"searchResult"}
consumerSubName := "subSearchResult"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kSearchResult, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
@ -324,9 +181,9 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
consumerChannels := []string{"timeTick"}
consumerSubName := "subTimeTick"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
@ -344,9 +201,9 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Broadcast(&msgPack)
@ -364,11 +221,11 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, util.RepackFunc)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
@ -384,7 +241,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
baseMsg := BaseMsg{
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{1, 3},
@ -401,16 +258,16 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
PartitionName: "Partition",
SegmentID: 1,
ChannelID: "1",
Timestamps: []Timestamp{1, 1},
Timestamps: []msgstream.Timestamp{1, 1},
RowIDs: []int64{1, 3},
RowData: []*commonpb.Blob{{}, {}},
}
insertMsg := &InsertMsg{
insertMsg := &msgstream.InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
msgPack := MsgPack{}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
@ -420,10 +277,10 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
var output msgstream.MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
@ -440,7 +297,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
baseMsg := BaseMsg{
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{1, 3},
@ -455,15 +312,15 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
},
CollectionName: "Collection",
ChannelID: "1",
Timestamps: []Timestamp{1, 1},
Timestamps: []msgstream.Timestamp{1, 1},
PrimaryKeys: []int64{1, 3},
}
deleteMsg := &DeleteMsg{
deleteMsg := &msgstream.DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
msgPack := MsgPack{}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
@ -473,10 +330,10 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
var output msgstream.MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
@ -493,11 +350,11 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 2, 2))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kQueryNodeStats, 4, 4))
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kSearch, 2, 2))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, util.GetTsMsg(commonpb.MsgType_kQueryNodeStats, 4, 4))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarClient(pulsarAddress)
@ -506,10 +363,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
var output msgstream.MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
@ -526,15 +383,15 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack0 := MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
msgPack0 := msgstream.MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, util.GetTimeTickMsg(0, 0, 0))
msgPack1 := MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
msgPack1 := msgstream.MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack1.Msgs = append(msgPack1.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 3, 3))
msgPack2 := MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
msgPack2 := msgstream.MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, util.GetTimeTickMsg(5, 5, 5))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Broadcast(&msgPack0)
@ -560,15 +417,15 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack0 := MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
msgPack0 := msgstream.MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, util.GetTimeTickMsg(0, 0, 0))
msgPack1 := MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
msgPack1 := msgstream.MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack1.Msgs = append(msgPack1.Msgs, util.GetTsMsg(commonpb.MsgType_kInsert, 3, 3))
msgPack2 := MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
msgPack2 := msgstream.MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, util.GetTimeTickMsg(5, 5, 5))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Broadcast(&msgPack0)

View File

@ -1,60 +0,0 @@
package msgstream
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type MarshalFunc func(TsMsg) ([]byte, error)
type UnmarshalFunc func([]byte) (TsMsg, error)
type UnmarshalDispatcher struct {
tempMap map[commonpb.MsgType]UnmarshalFunc
}
func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType commonpb.MsgType) (TsMsg, error) {
unmarshalFunc, ok := dispatcher.tempMap[msgType]
if !ok {
return nil, errors.New(string("Not set unmarshalFunc for this messageType."))
}
return unmarshalFunc(input)
}
func (dispatcher *UnmarshalDispatcher) AddMsgTemplate(msgType commonpb.MsgType, unmarshal UnmarshalFunc) {
dispatcher.tempMap[msgType] = unmarshal
}
func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() {
insertMsg := InsertMsg{}
deleteMsg := DeleteMsg{}
searchMsg := SearchMsg{}
searchResultMsg := SearchResultMsg{}
timeTickMsg := TimeTickMsg{}
createCollectionMsg := CreateCollectionMsg{}
dropCollectionMsg := DropCollectionMsg{}
createPartitionMsg := CreatePartitionMsg{}
dropPartitionMsg := DropPartitionMsg{}
loadIndexMsg := LoadIndexMsg{}
flushMsg := FlushMsg{}
queryNodeSegStatsMsg := QueryNodeStatsMsg{}
dispatcher.tempMap = make(map[commonpb.MsgType]UnmarshalFunc)
dispatcher.tempMap[commonpb.MsgType_kInsert] = insertMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kDelete] = deleteMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kSearch] = searchMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kSearchResult] = searchResultMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kTimeTick] = timeTickMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kQueryNodeStats] = queryNodeSegStatsMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kCreateCollection] = createCollectionMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kDropCollection] = dropCollectionMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kCreatePartition] = createPartitionMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kDropPartition] = dropPartitionMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kLoadIndex] = loadIndexMsg.Unmarshal
dispatcher.tempMap[commonpb.MsgType_kFlush] = flushMsg.Unmarshal
}
func NewUnmarshalDispatcher() *UnmarshalDispatcher {
unmarshalDispatcher := UnmarshalDispatcher{}
unmarshalDispatcher.addDefaultMsgTemplates()
return &unmarshalDispatcher
}

View File

@ -1,72 +0,0 @@
package msgstream
import (
"context"
"fmt"
"log"
"testing"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
func newInsertMsgUnmarshal(input []byte) (TsMsg, error) {
insertRequest := internalpb2.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &InsertMsg{InsertRequest: insertRequest}
fmt.Println("use func newInsertMsgUnmarshal unmarshal")
if err != nil {
return nil, err
}
return insertMsg, nil
}
func TestStream_unmarshal_Insert(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
//add a new unmarshall func for msgType kInsert
unmarshalDispatcher.AddMsgTemplate(commonpb.MsgType_kInsert, newInsertMsgUnmarshal)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", v.Type(), ", msg value: ", v, "msg tag: ")
}
}
if receiveCount >= len(msgPack.Msgs) {
break
}
}
inputStream.Close()
outputStream.Close()
}

View File

@ -1,4 +1,4 @@
package msgstream
package util
import (
"log"

View File

@ -0,0 +1,61 @@
package util
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type MarshalFunc func(msgstream.TsMsg) ([]byte, error)
type UnmarshalFunc func([]byte) (msgstream.TsMsg, error)
type UnmarshalDispatcher struct {
TempMap map[commonpb.MsgType]UnmarshalFunc
}
func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType commonpb.MsgType) (msgstream.TsMsg, error) {
unmarshalFunc, ok := dispatcher.TempMap[msgType]
if !ok {
return nil, errors.New(string("Not set unmarshalFunc for this messageType."))
}
return unmarshalFunc(input)
}
func (dispatcher *UnmarshalDispatcher) AddMsgTemplate(msgType commonpb.MsgType, unmarshal UnmarshalFunc) {
dispatcher.TempMap[msgType] = unmarshal
}
func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() {
insertMsg := msgstream.InsertMsg{}
deleteMsg := msgstream.DeleteMsg{}
searchMsg := msgstream.SearchMsg{}
searchResultMsg := msgstream.SearchResultMsg{}
timeTickMsg := msgstream.TimeTickMsg{}
createCollectionMsg := msgstream.CreateCollectionMsg{}
dropCollectionMsg := msgstream.DropCollectionMsg{}
createPartitionMsg := msgstream.CreatePartitionMsg{}
dropPartitionMsg := msgstream.DropPartitionMsg{}
loadIndexMsg := msgstream.LoadIndexMsg{}
flushMsg := msgstream.FlushMsg{}
queryNodeSegStatsMsg := msgstream.QueryNodeStatsMsg{}
dispatcher.TempMap = make(map[commonpb.MsgType]UnmarshalFunc)
dispatcher.TempMap[commonpb.MsgType_kInsert] = insertMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kDelete] = deleteMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kSearch] = searchMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kSearchResult] = searchResultMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kTimeTick] = timeTickMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kQueryNodeStats] = queryNodeSegStatsMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kCreateCollection] = createCollectionMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kDropCollection] = dropCollectionMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kCreatePartition] = createPartitionMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kDropPartition] = dropPartitionMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kLoadIndex] = loadIndexMsg.Unmarshal
dispatcher.TempMap[commonpb.MsgType_kFlush] = flushMsg.Unmarshal
}
func NewUnmarshalDispatcher() *UnmarshalDispatcher {
unmarshalDispatcher := UnmarshalDispatcher{}
unmarshalDispatcher.addDefaultMsgTemplates()
return &unmarshalDispatcher
}

View File

@ -0,0 +1,85 @@
package util
import (
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
var Params paramtable.BaseTable
func newInsertMsgUnmarshal(input []byte) (msgstream.TsMsg, error) {
insertRequest := internalpb2.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &msgstream.InsertMsg{InsertRequest: insertRequest}
fmt.Println("use func newInsertMsgUnmarshal unmarshal")
if err != nil {
return nil, err
}
return insertMsg, nil
}
func TestStream_unmarshal_Insert(t *testing.T) {
//pulsarAddress, _ := Params.Load("_PulsarAddress")
//producerChannels := []string{"insert1", "insert2"}
//consumerChannels := []string{"insert1", "insert2"}
//consumerSubName := "subInsert"
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, GetTsMsg(commonpb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, GetTsMsg(commonpb.MsgType_kInsert, 3, 3))
//inputStream := pulsarms.NewPulsarMsgStream(context.Background(), 100)
//inputStream.SetPulsarClient(pulsarAddress)
//inputStream.CreatePulsarProducers(producerChannels)
//inputStream.Start()
//outputStream := pulsarms.NewPulsarMsgStream(context.Background(), 100)
//outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
//add a new unmarshall func for msgType kInsert
unmarshalDispatcher.AddMsgTemplate(commonpb.MsgType_kInsert, newInsertMsgUnmarshal)
for _, v := range msgPack.Msgs {
headerMsg := commonpb.MsgHeader{}
payload, err := v.Marshal(v)
assert.Nil(t, err)
err = proto.Unmarshal(payload, &headerMsg)
assert.Nil(t, err)
msg, err := unmarshalDispatcher.Unmarshal(payload, headerMsg.Base.MsgType)
assert.Nil(t, err)
fmt.Println("msg type: ", msg.Type(), ", msg value: ", msg, "msg tag: ")
}
//outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
//outputStream.Start()
//err := inputStream.Produce(&msgPack)
//if err != nil {
// log.Fatalf("produce error = %v", err)
//}
//receiveCount := 0
//for {
// result := (*outputStream).Consume()
// if len(result.Msgs) > 0 {
// msgs := result.Msgs
// for _, v := range msgs {
// receiveCount++
// fmt.Println("msg type: ", v.Type(), ", msg value: ", v, "msg tag: ")
// }
// }
// if receiveCount >= len(msgPack.Msgs) {
// break
// }
//}
//inputStream.Close()
//outputStream.Close()
}

View File

@ -0,0 +1,162 @@
package util
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type TsMsg = msgstream.TsMsg
type MsgPack = msgstream.MsgPack
type MsgType = msgstream.MsgType
type UniqueID = msgstream.UniqueID
type BaseMsg = msgstream.BaseMsg
type Timestamp = msgstream.Timestamp
type IntPrimaryKey = msgstream.IntPrimaryKey
type TimeTickMsg = msgstream.TimeTickMsg
type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg
func RepackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
for _, channelID := range keys {
_, ok := result[channelID]
if !ok {
msgPack := MsgPack{}
result[channelID] = &msgPack
}
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
return result, nil
}
func GetTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
switch msgType {
case commonpb.MsgType_kInsert:
insertRequest := internalpb2.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
CollectionName: "Collection",
PartitionName: "Partition",
SegmentID: 1,
ChannelID: "0",
Timestamps: []Timestamp{1},
RowIDs: []int64{1},
RowData: []*commonpb.Blob{{}},
}
insertMsg := &msgstream.InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
return insertMsg
case commonpb.MsgType_kDelete:
deleteRequest := internalpb2.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDelete,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
CollectionName: "Collection",
ChannelID: "1",
Timestamps: []Timestamp{1},
PrimaryKeys: []IntPrimaryKey{1},
}
deleteMsg := &msgstream.DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
return deleteMsg
case commonpb.MsgType_kSearch:
searchRequest := internalpb2.SearchRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSearch,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
Query: nil,
ResultChannelID: "0",
}
searchMsg := &msgstream.SearchMsg{
BaseMsg: baseMsg,
SearchRequest: searchRequest,
}
return searchMsg
case commonpb.MsgType_kSearchResult:
searchResult := internalpb2.SearchResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSearchResult,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
ResultChannelID: "0",
}
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: baseMsg,
SearchResults: searchResult,
}
return searchResultMsg
case commonpb.MsgType_kTimeTick:
timeTickResult := internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
case commonpb.MsgType_kQueryNodeStats:
queryNodeSegStats := internalpb2.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kQueryNodeStats,
SourceID: reqID,
},
}
queryNodeSegStatsMsg := &QueryNodeStatsMsg{
BaseMsg: baseMsg,
QueryNodeStats: queryNodeSegStats,
}
return queryNodeSegStatsMsg
}
return nil
}
func GetTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
timeTickResult := internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
}

View File

@ -18,6 +18,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
@ -41,8 +42,8 @@ type Proxy struct {
tsoAllocator *allocator.TimestampAllocator
segAssigner *allocator.SegIDAssigner
manipulationMsgStream *msgstream.PulsarMsgStream
queryMsgStream *msgstream.PulsarMsgStream
manipulationMsgStream *pulsarms.PulsarMsgStream
queryMsgStream *pulsarms.PulsarMsgStream
tracer opentracing.Tracer
closer io.Closer
@ -80,7 +81,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
pulsarAddress := Params.PulsarAddress()
p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
p.queryMsgStream = pulsarms.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
p.queryMsgStream.SetPulsarClient(pulsarAddress)
p.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames())
@ -107,7 +108,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
p.segAssigner = segAssigner
p.segAssigner.PeerID = Params.ProxyID()
p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize())
p.manipulationMsgStream = pulsarms.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize())
p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
p.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {

View File

@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/master"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -330,7 +331,7 @@ func TestProxy_Search(t *testing.T) {
defer group.Done()
queryResultChannels := []string{"QueryResult"}
bufSize := 1024
queryResultMsgStream := msgstream.NewPulsarMsgStream(ctx, int64(bufSize))
queryResultMsgStream := pulsarms.NewPulsarMsgStream(ctx, int64(bufSize))
pulsarAddress := Params.PulsarAddress()
queryResultMsgStream.SetPulsarClient(pulsarAddress)
assert.NotEqual(t, queryResultMsgStream, nil, "query result message stream should not be nil!")

View File

@ -13,6 +13,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@ -42,7 +43,7 @@ type InsertTask struct {
BaseInsertTask
Condition
result *servicepb.IntegerRangeResponse
manipulationMsgStream *msgstream.PulsarMsgStream
manipulationMsgStream *pulsarms.PulsarMsgStream
ctx context.Context
rowIDAllocator *allocator.IDAllocator
}
@ -352,7 +353,7 @@ func (dct *DropCollectionTask) PostExecute() error {
type SearchTask struct {
Condition
internalpb2.SearchRequest
queryMsgStream *msgstream.PulsarMsgStream
queryMsgStream *pulsarms.PulsarMsgStream
resultBuf chan []*internalpb2.SearchResults
result *servicepb.QueryResult
ctx context.Context

View File

@ -11,6 +11,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
)
type TaskQueue interface {
@ -364,8 +366,8 @@ func (sched *TaskScheduler) queryLoop() {
func (sched *TaskScheduler) queryResultLoop() {
defer sched.wg.Done()
unmarshal := msgstream.NewUnmarshalDispatcher()
queryResultMsgStream := msgstream.NewPulsarMsgStream(sched.ctx, Params.MsgStreamSearchResultBufSize())
unmarshal := util.NewUnmarshalDispatcher()
queryResultMsgStream := pulsarms.NewPulsarMsgStream(sched.ctx, Params.MsgStreamSearchResultBufSize())
queryResultMsgStream.SetPulsarClient(Params.PulsarAddress())
queryResultMsgStream.CreatePulsarConsumers(Params.SearchResultChannelNames(),
Params.ProxySubName(),

View File

@ -13,6 +13,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
)
type tickCheckFunc = func(Timestamp) bool
@ -25,7 +26,7 @@ type timeTick struct {
pulsarProducer pulsar.Producer
tsoAllocator *allocator.TimestampAllocator
tickMsgStream *msgstream.PulsarMsgStream
tickMsgStream *pulsarms.PulsarMsgStream
peerID UniqueID
wg sync.WaitGroup
@ -50,7 +51,7 @@ func newTimeTick(ctx context.Context,
checkFunc: checkFunc,
}
t.tickMsgStream = msgstream.NewPulsarMsgStream(t.ctx, Params.MsgStreamTimeTickBufSize())
t.tickMsgStream = pulsarms.NewPulsarMsgStream(t.ctx, Params.MsgStreamTimeTickBufSize())
pulsarAddress := Params.PulsarAddress()
t.tickMsgStream.SetPulsarClient(pulsarAddress)

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -13,7 +14,7 @@ type Client struct {
}
func NewQueryNodeClient(ctx context.Context, pulsarAddress string, loadIndexChannels []string) *Client {
loadIndexStream := msgstream.NewPulsarMsgStream(ctx, 0)
loadIndexStream := pulsarms.NewPulsarMsgStream(ctx, 0)
loadIndexStream.SetPulsarClient(pulsarAddress)
loadIndexStream.CreatePulsarProducers(loadIndexChannels)
var input msgstream.MsgStream = loadIndexStream

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -108,11 +109,11 @@ func TestDataSyncService_Start(t *testing.T) {
ddChannels := Params.DDChannelNames
pulsarURL := Params.PulsarAddress
insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)

View File

@ -4,6 +4,8 @@ import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
@ -16,9 +18,9 @@ func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph
consumeChannels := Params.InsertChannelNames
consumeSubName := Params.MsgChannelSubName
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = insertStream
@ -40,9 +42,9 @@ func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = ddStream

View File

@ -12,6 +12,8 @@ import (
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -59,9 +61,9 @@ func newLoadIndexService(ctx context.Context, replica collectionReplica) *loadIn
consumeChannels := Params.LoadIndexChannelNames
consumeSubName := Params.MsgChannelSubName
loadIndexStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
loadIndexStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
loadIndexStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
loadIndexStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = loadIndexStream
@ -153,7 +155,7 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error {
}
return nil
}
err = msgstream.Retry(5, time.Millisecond*200, fn)
err = util.Retry(5, time.Millisecond*200, fn)
if err != nil {
return err
}

View File

@ -16,6 +16,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/indexnode"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
@ -124,10 +126,10 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream.SetPulsarClient(Params.PulsarAddress)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarProducers(ddChannels)
@ -202,7 +204,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
Msgs: []msgstream.TsMsg{searchMsg},
}
}
searchStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream.SetPulsarClient(Params.PulsarAddress)
searchStream.CreatePulsarProducers(newSearchChannelNames)
searchStream.Start()
@ -210,9 +212,9 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
assert.NoError(t, err)
//get search result
searchResultStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchResultStream.SetPulsarClient(Params.PulsarAddress)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult", unmarshalDispatcher, receiveBufSize)
searchResultStream.Start()
searchResult := searchResultStream.Consume()
@ -288,9 +290,9 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams)
// init message stream consumer and do checks
statsMs := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
statsMs.SetPulsarClient(Params.PulsarAddress)
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, msgstream.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
statsMs.Start()
findFiledStats := false
@ -457,10 +459,10 @@ func TestLoadIndexService_BinaryVector(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream.SetPulsarClient(Params.PulsarAddress)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarProducers(ddChannels)
@ -524,7 +526,7 @@ func TestLoadIndexService_BinaryVector(t *testing.T) {
Msgs: []msgstream.TsMsg{searchMsg},
}
}
searchStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream.SetPulsarClient(Params.PulsarAddress)
searchStream.CreatePulsarProducers(newSearchChannelNames)
searchStream.Start()
@ -532,9 +534,9 @@ func TestLoadIndexService_BinaryVector(t *testing.T) {
assert.NoError(t, err)
//get search result
searchResultStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchResultStream.SetPulsarClient(Params.PulsarAddress)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
searchResultStream.Start()
searchResult := searchResultStream.Consume()
@ -604,9 +606,9 @@ func TestLoadIndexService_BinaryVector(t *testing.T) {
client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams)
// init message stream consumer and do checks
statsMs := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
statsMs.SetPulsarClient(Params.PulsarAddress)
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, msgstream.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
statsMs.Start()
findFiledStats := false

View File

@ -20,7 +20,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
@ -167,7 +168,7 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
@ -178,7 +179,7 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
@ -193,7 +194,7 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// add result channel
@ -217,7 +218,7 @@ func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
@ -228,7 +229,7 @@ func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
@ -243,7 +244,7 @@ func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
@ -269,7 +270,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
return status, errors.New(errMsg)
}
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
@ -284,7 +285,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
status := &commonpb.Status{

View File

@ -16,6 +16,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
@ -49,14 +51,14 @@ func newSearchService(ctx context.Context, replica collectionReplica) *searchSer
consumeChannels := Params.SearchChannelNames
consumeSubName := Params.MsgChannelSubName
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var inputStream msgstream.MsgStream = searchStream
producerChannels := Params.SearchResultChannelNames
searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchResultStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
searchResultStream.SetPulsarClient(msgStreamURL)
searchResultStream.CreatePulsarProducers(producerChannels)
var outputStream msgstream.MsgStream = searchResultStream

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
@ -94,7 +95,7 @@ func TestSearch_Search(t *testing.T) {
msgPackSearch := msgstream.MsgPack{}
msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
searchStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)
searchStream.Start()
@ -180,11 +181,11 @@ func TestSearch_Search(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
@ -288,7 +289,7 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
msgPackSearch := msgstream.MsgPack{}
msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
searchStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)
searchStream.Start()
@ -378,11 +379,11 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -41,7 +42,7 @@ func (sService *statsService) start() {
msgStreamURL := Params.PulsarAddress
producerChannels := []string{Params.StatsChannelName}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream := pulsarms.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarClient(msgStreamURL)
statsStream.CreatePulsarProducers(producerChannels)

View File

@ -4,6 +4,7 @@ import (
"testing"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
)
// NOTE: start pulsar before test
@ -26,7 +27,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
pulsarURL := Params.PulsarAddress
statsStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
statsStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
statsStream.SetPulsarClient(pulsarURL)
statsStream.CreatePulsarProducers(producerChannels)

View File

@ -14,6 +14,7 @@ import (
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -181,11 +182,11 @@ func TestDataSyncService_Start(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)

View File

@ -19,6 +19,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -42,7 +43,7 @@ type (
minioPrifex string
idAllocator *allocator.IDAllocator
outCh chan *insertFlushSyncMsg
pulsarWriteNodeTimeTickStream *msgstream.PulsarMsgStream
pulsarWriteNodeTimeTickStream *pulsarms.PulsarMsgStream
replica collectionReplica
}
@ -643,7 +644,7 @@ func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg, re
panic(err)
}
wTt := msgstream.NewPulsarMsgStream(ctx, 1024) //input stream, write node time tick
wTt := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream, write node time tick
wTt.SetPulsarClient(Params.PulsarAddress)
wTt.CreatePulsarProducers([]string{Params.WriteNodeTimeTickChannelName})

View File

@ -4,6 +4,8 @@ import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
@ -16,11 +18,11 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := Params.InsertChannelNames
consumeSubName := Params.MsgChannelSubName
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
// TODO could panic of nil pointer
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
// TODO could panic of nil pointer
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
@ -43,9 +45,9 @@ func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
unmarshalDispatcher := util.NewUnmarshalDispatcher()
ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = ddStream