Defer lock's unlock in meta::DropSegment

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-02-04 11:52:10 +08:00 committed by yefu.chen
parent 8d35e76ad1
commit 71b6c88c6e
31 changed files with 270 additions and 259 deletions

View File

@ -82,12 +82,14 @@ func TestDataSyncService_Start(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
insertStream, _ := factory.NewMsgStream(ctx)
insertStream.AsProducer(insertChannels)
factory := msgstream.ProtoUDFactory{}
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsProducer(ddChannels)
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()

View File

@ -664,23 +664,26 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
}
minioPrefix := Params.InsertBinlogRootPath
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
factory := msgstream.ProtoUDFactory{}
//input stream, data node time tick
wTt, _ := factory.NewMsgStream(ctx)
wTt.AsProducer([]string{Params.TimeTickChannelName})
wTt := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
wTt.SetPulsarClient(Params.PulsarAddress)
wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName})
var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start()
// update statistics channel
segS, _ := factory.NewMsgStream(ctx)
segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
segS := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
segS.SetPulsarClient(Params.PulsarAddress)
segS.CreatePulsarProducers([]string{Params.SegmentStatisticsChannelName})
var segStatisticsMsgStream msgstream.MsgStream = segS
segStatisticsMsgStream.Start()
// segment flush completed channel
cf, _ := factory.NewMsgStream(ctx)
cf.AsProducer([]string{Params.CompleteFlushChannelName})
cf := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
cf.SetPulsarClient(Params.PulsarAddress)
cf.CreatePulsarProducers([]string{Params.CompleteFlushChannelName})
var completeFlushStream msgstream.MsgStream = cf
completeFlushStream.Start()

View File

@ -15,9 +15,11 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := Params.InsertChannelNames
consumeSubName := Params.MsgChannelSubName
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
insertStream, _ := factory.NewTtMsgStream(ctx)
insertStream.AsConsumer(consumeChannels, consumeSubName)
factory := msgstream.ProtoUDFactory{}
insertStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(Params.PulsarAddress)
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
var stream msgstream.MsgStream = insertStream
node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
@ -30,9 +32,10 @@ func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
maxParallelism := Params.FlowGraphMaxParallelism
consumeSubName := Params.MsgChannelSubName
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
tmpStream, _ := factory.NewTtMsgStream(ctx)
tmpStream.AsConsumer(Params.DDChannelNames, consumeSubName)
factory := msgstream.ProtoUDFactory{}
tmpStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
tmpStream.SetPulsarClient(Params.PulsarAddress)
tmpStream.CreatePulsarConsumers(Params.DDChannelNames, consumeSubName)
var stream msgstream.MsgStream = tmpStream
node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism)

View File

@ -183,7 +183,7 @@ func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
func (meta *meta) DropSegment(segmentID UniqueID) error {
meta.ddLock.Lock()
meta.ddLock.Unlock()
defer meta.ddLock.Unlock()
if _, ok := meta.segID2Info[segmentID]; !ok {
return newErrSegmentNotFound(segmentID)

View File

@ -71,7 +71,7 @@ type (
}
)
func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl, error) {
func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl {
segmentAllocator := &segmentAllocatorImpl{
mt: meta,
segments: make(map[UniqueID]*segmentStatus),
@ -80,7 +80,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl
segmentThresholdFactor: Params.SegmentSizeFactor,
allocator: allocator,
}
return segmentAllocator, nil
return segmentAllocator
}
func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {

View File

@ -17,8 +17,7 @@ func TestAllocSegment(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
segAllocator := newSegmentAllocator(meta, mockAllocator)
schema := newTestSchema()
collID, err := mockAllocator.allocID()
@ -68,8 +67,7 @@ func TestSealSegment(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
segAllocator := newSegmentAllocator(meta, mockAllocator)
schema := newTestSchema()
collID, err := mockAllocator.allocID()
@ -105,8 +103,7 @@ func TestExpireSegment(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
segAllocator := newSegmentAllocator(meta, mockAllocator)
schema := newTestSchema()
collID, err := mockAllocator.allocID()

View File

@ -134,10 +134,7 @@ func (s *Server) Start() error {
return err
}
s.statsHandler = newStatsHandler(s.meta)
s.segAllocator, err = newSegmentAllocator(s.meta, s.allocator)
if err != nil {
return err
}
s.segAllocator = newSegmentAllocator(s.meta, s.allocator)
s.ddHandler = newDDHandler(s.meta, s.segAllocator)
s.initSegmentInfoChannel()
if err = s.loadMetaFromMaster(); err != nil {
@ -173,22 +170,25 @@ func (s *Server) initMeta() error {
}
func (s *Server) initSegmentInfoChannel() {
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
segmentInfoStream, _ := factory.NewMsgStream(s.ctx)
segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
factory := msgstream.ProtoUDFactory{}
segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
segmentInfoStream.SetPulsarClient(Params.PulsarAddress)
segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName})
s.segmentInfoStream = segmentInfoStream
s.segmentInfoStream.Start()
}
func (s *Server) initMsgProducer() error {
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
ttMsgStream, _ := factory.NewMsgStream(s.ctx)
ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
factory := msgstream.ProtoUDFactory{}
ttMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
ttMsgStream.SetPulsarClient(Params.PulsarAddress)
ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
s.ttMsgStream = ttMsgStream
s.ttMsgStream.Start()
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
k2sStream, _ := factory.NewMsgStream(s.ctx)
k2sStream.AsProducer(Params.K2SChannelNames)
k2sStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
k2sStream.SetPulsarClient(Params.PulsarAddress)
k2sStream.CreatePulsarProducers(Params.K2SChannelNames)
s.k2sMsgStream = k2sStream
s.k2sMsgStream.Start()
k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
@ -308,9 +308,10 @@ func (s *Server) startServerLoop() {
func (s *Server) startStatsChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
statsStream, _ := factory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
factory := msgstream.ProtoUDFactory{}
statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
statsStream.SetPulsarClient(Params.PulsarAddress)
statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
statsStream.Start()
defer statsStream.Close()
for {
@ -334,9 +335,10 @@ func (s *Server) startStatsChannel(ctx context.Context) {
func (s *Server) startSegmentFlushChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
flushStream, _ := factory.NewMsgStream(ctx)
flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
factory := msgstream.ProtoUDFactory{}
flushStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
flushStream.SetPulsarClient(Params.PulsarAddress)
flushStream.CreatePulsarConsumers([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
flushStream.Start()
defer flushStream.Close()
for {
@ -369,9 +371,10 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
func (s *Server) startDDChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
factory := msgstream.ProtoUDFactory{}
ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
ddStream.Start()
defer ddStream.Close()
for {

View File

@ -21,7 +21,7 @@ func TestDataNodeTTWatcher(t *testing.T) {
allocator := newMockAllocator()
meta, err := newMemoryMeta(allocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, allocator)
segAllocator := newSegmentAllocator(meta, allocator)
assert.Nil(t, err)
watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster)

View File

@ -402,6 +402,7 @@ func (c *Core) tsLoop() {
}
}
func (c *Core) setMsgStreams() error {
dispatcherFactory := ms.ProtoUDFactory{}
if Params.PulsarAddress == "" {
return errors.Errorf("PulsarAddress is empty")
@ -414,25 +415,26 @@ func (c *Core) setMsgStreams() error {
if Params.ProxyTimeTickChannel == "" {
return errors.Errorf("ProxyTimeTickChannel is empty")
}
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
proxyTimeTickStream, _ := factory.NewMsgStream(c.ctx)
proxyTimeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName)
proxyTimeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
proxyTimeTickStream.SetPulsarClient(Params.PulsarAddress)
proxyTimeTickStream.CreatePulsarConsumers([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName)
proxyTimeTickStream.Start()
// master time tick channel
if Params.TimeTickChannel == "" {
return errors.Errorf("TimeTickChannel is empty")
}
timeTickStream, _ := factory.NewMsgStream(c.ctx)
timeTickStream.AsProducer([]string{Params.TimeTickChannel})
timeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
timeTickStream.SetPulsarClient(Params.PulsarAddress)
timeTickStream.CreatePulsarProducers([]string{Params.TimeTickChannel})
// master dd channel
if Params.DdChannel == "" {
return errors.Errorf("DdChannel is empty")
}
ddStream, _ := factory.NewMsgStream(c.ctx)
ddStream.AsProducer([]string{Params.DdChannel})
ddStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarProducers([]string{Params.DdChannel})
c.SendTimeTick = func(t typeutil.Timestamp) error {
msgPack := ms.MsgPack{}
@ -565,8 +567,9 @@ func (c *Core) setMsgStreams() error {
if Params.DataServiceSegmentChannel == "" {
return errors.Errorf("DataServiceSegmentChannel is empty")
}
dataServiceStream, _ := factory.NewMsgStream(c.ctx)
dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
dataServiceStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
dataServiceStream.SetPulsarClient(Params.PulsarAddress)
dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
dataServiceStream.Start()
c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024)

View File

@ -146,19 +146,23 @@ func TestMasterService(t *testing.T) {
err = core.Start()
assert.Nil(t, err)
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
proxyTimeTickStream, _ := factory.NewMsgStream(ctx)
proxyTimeTickStream.AsProducer([]string{Params.ProxyTimeTickChannel})
factory := ms.ProtoUDFactory{}
proxyTimeTickStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
proxyTimeTickStream.SetPulsarClient(Params.PulsarAddress)
proxyTimeTickStream.CreatePulsarProducers([]string{Params.ProxyTimeTickChannel})
dataServiceSegmentStream, _ := factory.NewMsgStream(ctx)
dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel})
dataServiceSegmentStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
dataServiceSegmentStream.SetPulsarClient(Params.PulsarAddress)
dataServiceSegmentStream.CreatePulsarProducers([]string{Params.DataServiceSegmentChannel})
timeTickStream, _ := factory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
timeTickStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
timeTickStream.SetPulsarClient(Params.PulsarAddress)
timeTickStream.CreatePulsarConsumers([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
timeTickStream.Start()
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarConsumers([]string{Params.DdChannel}, Params.MsgChannelSubName)
ddStream.Start()
time.Sleep(time.Second)

View File

@ -1,8 +1,6 @@
package msgstream
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -26,17 +24,9 @@ type MsgStream interface {
Start()
Close()
Chan() <-chan *MsgPack
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
SetRepackFunc(repackFunc RepackFunc)
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Consume() *MsgPack
Seek(offset *MsgPosition) error
}
type Factory interface {
NewMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream(ctx context.Context) (MsgStream, error)
}

View File

@ -1,32 +0,0 @@
package pulsarms
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type Factory struct {
dispatcherFactory msgstream.ProtoUDFactory
address string
receiveBufSize int64
pulsarBufSize int64
}
func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return newPulsarMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func (f *Factory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return NewPulsarTtMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory {
f := &Factory{
dispatcherFactory: msgstream.ProtoUDFactory{},
address: address,
receiveBufSize: receiveBufSize,
pulsarBufSize: pulsarBufSize,
}
return f
}

View File

@ -141,16 +141,18 @@ func TestStream_task_Insert(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getInsertTask(3, 3))
factory := msgstream.ProtoUDFactory{}
inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.SetRepackFunc(newRepackFunc)
inputStream.Start()
dispatcher := factory.NewUnmarshalDispatcher()
outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, dispatcher)
outputStream := NewPulsarMsgStream(context.Background(), 100, 100, dispatcher)
outputStream.SetPulsarClient(pulsarAddress)
testTask := InsertTask{}
dispatcher.AddMsgTemplate(commonpb.MsgType_kInsert, testTask.Unmarshal)
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
outputStream.Start()
err := inputStream.Produce(&msgPack)
@ -159,7 +161,7 @@ func TestStream_task_Insert(t *testing.T) {
}
receiveCount := 0
for {
result := outputStream.Consume()
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {

View File

@ -52,12 +52,7 @@ type PulsarMsgStream struct {
pulsarBufSize int64
}
func newPulsarMsgStream(ctx context.Context,
address string,
receiveBufSize int64,
pulsarBufSize int64,
unmarshal UnmarshalDispatcher) (*PulsarMsgStream, error) {
func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64, pulsarBufSize int64, unmarshal UnmarshalDispatcher) *PulsarMsgStream {
streamCtx, streamCancel := context.WithCancel(ctx)
producers := make([]Producer, 0)
consumers := make([]Consumer, 0)
@ -71,17 +66,19 @@ func newPulsarMsgStream(ctx context.Context,
unmarshal: unmarshal,
pulsarBufSize: pulsarBufSize,
}
stream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return stream
}
func (ms *PulsarMsgStream) SetPulsarClient(address string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
if err != nil {
log.Printf("Set pulsar client failed, error = %v", err)
return nil, err
}
stream.client = client
stream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return stream, nil
ms.client = client
}
func (ms *PulsarMsgStream) AsProducer(channels []string) {
func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) {
for i := 0; i < len(channels); i++ {
fn := func() error {
pp, err := ms.client.CreateProducer(pulsar.ProducerOptions{Topic: channels[i]})
@ -103,7 +100,7 @@ func (ms *PulsarMsgStream) AsProducer(channels []string) {
}
}
func (ms *PulsarMsgStream) AsConsumer(channels []string,
func (ms *PulsarMsgStream) CreatePulsarConsumers(channels []string,
subName string) {
for i := 0; i < len(channels); i++ {
fn := func() error {
@ -482,12 +479,7 @@ type PulsarTtMsgStream struct {
lastTimeStamp Timestamp
}
func NewPulsarTtMsgStream(ctx context.Context,
address string,
receiveBufSize int64,
pulsarBufSize int64,
unmarshal msgstream.UnmarshalDispatcher) (*PulsarTtMsgStream, error) {
func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64, pulsarBufSize int64, unmarshal msgstream.UnmarshalDispatcher) *PulsarTtMsgStream {
streamCtx, streamCancel := context.WithCancel(ctx)
pulsarMsgStream := PulsarMsgStream{
ctx: streamCtx,
@ -495,17 +487,10 @@ func NewPulsarTtMsgStream(ctx context.Context,
pulsarBufSize: pulsarBufSize,
unmarshal: unmarshal,
}
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
if err != nil {
log.Printf("Set pulsar client failed, error = %v", err)
return nil, err
}
pulsarMsgStream.client = client
pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize)
return &PulsarTtMsgStream{
PulsarMsgStream: pulsarMsgStream,
}, nil
}
}
func (ms *PulsarTtMsgStream) Start() {

View File

@ -176,8 +176,9 @@ func initPulsarStream(pulsarAddress string,
factory := msgstream.ProtoUDFactory{}
// set input stream
inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
@ -185,8 +186,9 @@ func initPulsarStream(pulsarAddress string,
var input msgstream.MsgStream = inputStream
// set output stream
outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
outputStream.SetPulsarClient(pulsarAddress)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
outputStream.Start()
var output msgstream.MsgStream = outputStream
@ -201,8 +203,9 @@ func initPulsarTtStream(pulsarAddress string,
factory := msgstream.ProtoUDFactory{}
// set input stream
inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
@ -210,8 +213,9 @@ func initPulsarTtStream(pulsarAddress string,
var input msgstream.MsgStream = inputStream
// set output stream
outputStream, _ := NewPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream := NewPulsarTtMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
outputStream.SetPulsarClient(pulsarAddress)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
outputStream.Start()
var output msgstream.MsgStream = outputStream
@ -413,12 +417,14 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
factory := msgstream.ProtoUDFactory{}
inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
outputStream.SetPulsarClient(pulsarAddress)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
outputStream.Start()
var output msgstream.MsgStream = outputStream
@ -464,12 +470,14 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
factory := msgstream.ProtoUDFactory{}
inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
outputStream.SetPulsarClient(pulsarAddress)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
outputStream.Start()
var output msgstream.MsgStream = outputStream
@ -495,12 +503,14 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kQueryNodeStats, 4, 4))
factory := msgstream.ProtoUDFactory{}
inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
outputStream.SetPulsarClient(pulsarAddress)
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
outputStream.Start()
var output msgstream.MsgStream = outputStream

View File

@ -83,6 +83,7 @@ type InsertChannelsMap struct {
func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
factory := msgstream.ProtoUDFactory{}
_, ok := m.collectionID2InsertChannels[collID]
if ok {
@ -100,10 +101,9 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st
}
m.insertChannels = append(m.insertChannels, channels)
m.collectionID2InsertChannels[collID] = len(m.insertChannels) - 1
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamInsertBufSize, 1024)
stream, _ := factory.NewMsgStream(context.Background())
stream.AsProducer(channels)
stream := pulsarms.NewPulsarMsgStream(context.Background(), Params.MsgStreamInsertBufSize, 1024, factory.NewUnmarshalDispatcher())
stream.SetPulsarClient(Params.PulsarAddress)
stream.CreatePulsarProducers(channels)
repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true)
}

View File

@ -54,8 +54,8 @@ type NodeImpl struct {
tsoAllocator *allocator.TimestampAllocator
segAssigner *SegIDAssigner
manipulationMsgStream msgstream.MsgStream
queryMsgStream msgstream.MsgStream
manipulationMsgStream *pulsarms.PulsarMsgStream
queryMsgStream *pulsarms.PulsarMsgStream
tracer opentracing.Tracer
closer io.Closer
@ -106,7 +106,7 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
}
func (node *NodeImpl) Init() error {
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
factory := msgstream.ProtoUDFactory{}
// todo wait for proxyservice state changed to Healthy
@ -195,8 +195,11 @@ func (node *NodeImpl) Init() error {
}
opentracing.SetGlobalTracer(node.tracer)
node.queryMsgStream, _ = factory.NewMsgStream(node.ctx)
node.queryMsgStream.AsProducer(Params.SearchChannelNames)
pulsarAddress := Params.PulsarAddress
node.queryMsgStream = pulsarms.NewPulsarMsgStream(node.ctx, Params.MsgStreamSearchBufSize, 1024, factory.NewUnmarshalDispatcher())
node.queryMsgStream.SetPulsarClient(pulsarAddress)
node.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames)
log.Println("create query message stream ...")
masterAddr := Params.MasterAddress
@ -222,8 +225,9 @@ func (node *NodeImpl) Init() error {
node.segAssigner = segAssigner
node.segAssigner.PeerID = Params.ProxyID
node.manipulationMsgStream, _ = factory.NewMsgStream(node.ctx)
node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
node.manipulationMsgStream = pulsarms.NewPulsarMsgStream(node.ctx, Params.MsgStreamInsertBufSize, 1024, factory.NewUnmarshalDispatcher())
node.manipulationMsgStream.SetPulsarClient(pulsarAddress)
node.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames)
repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@ -409,7 +410,7 @@ func (dct *DropCollectionTask) PostExecute() error {
type SearchTask struct {
Condition
internalpb2.SearchRequest
queryMsgStream msgstream.MsgStream
queryMsgStream *pulsarms.PulsarMsgStream
resultBuf chan []*internalpb2.SearchResults
result *milvuspb.SearchResults
query *milvuspb.SearchRequest

View File

@ -371,10 +371,11 @@ func (sched *TaskScheduler) queryLoop() {
func (sched *TaskScheduler) queryResultLoop() {
defer sched.wg.Done()
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchResultBufSize, 1024)
factory := msgstream.ProtoUDFactory{}
queryResultMsgStream, _ := factory.NewMsgStream(sched.ctx)
queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames,
queryResultMsgStream := pulsarms.NewPulsarMsgStream(sched.ctx, Params.MsgStreamSearchResultBufSize, 1024, factory.NewUnmarshalDispatcher())
queryResultMsgStream.SetPulsarClient(Params.PulsarAddress)
queryResultMsgStream.CreatePulsarConsumers(Params.SearchResultChannelNames,
Params.ProxySubName)
queryNodeNum := Params.QueryNodeNum

View File

@ -26,7 +26,7 @@ type timeTick struct {
pulsarProducer pulsar.Producer
tsoAllocator *allocator.TimestampAllocator
tickMsgStream msgstream.MsgStream
tickMsgStream *pulsarms.PulsarMsgStream
peerID UniqueID
wg sync.WaitGroup
@ -51,9 +51,12 @@ func newTimeTick(ctx context.Context,
checkFunc: checkFunc,
}
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamTimeTickBufSize, 1024)
t.tickMsgStream, _ = factory.NewMsgStream(t.ctx)
t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames)
factory := msgstream.ProtoUDFactory{}
t.tickMsgStream = pulsarms.NewPulsarMsgStream(t.ctx, Params.MsgStreamTimeTickBufSize, 1024, factory.NewUnmarshalDispatcher())
pulsarAddress := Params.PulsarAddress
t.tickMsgStream.SetPulsarClient(pulsarAddress)
t.tickMsgStream.CreatePulsarProducers(Params.ProxyTimeTickChannelNames)
return t
}

View File

@ -10,6 +10,7 @@ import (
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -95,7 +96,7 @@ func (s *ServiceImpl) fillNodeInitParams() error {
}
func (s *ServiceImpl) Init() error {
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
dispatcherFactory := msgstream.ProtoUDFactory{}
err := s.fillNodeInitParams()
if err != nil {
@ -103,8 +104,9 @@ func (s *ServiceImpl) Init() error {
}
log.Println("fill node init params ...")
serviceTimeTickMsgStream, _ := factory.NewTtMsgStream(s.ctx)
serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel})
serviceTimeTickMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
serviceTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
serviceTimeTickMsgStream.CreatePulsarProducers([]string{Params.ServiceTimeTickChannel})
log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
channels := make([]string, Params.InsertChannelNum)
@ -112,12 +114,14 @@ func (s *ServiceImpl) Init() error {
for ; i < Params.InsertChannelNum; i++ {
channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10)
}
insertTickMsgStream, _ := factory.NewMsgStream(s.ctx)
insertTickMsgStream.AsProducer(channels)
insertTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
insertTickMsgStream.SetPulsarClient(Params.PulsarAddress)
insertTickMsgStream.CreatePulsarProducers(channels)
log.Println("create service time tick producer channel: ", channels)
nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx)
nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
nodeTimeTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
nodeTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
nodeTimeTickMsgStream.CreatePulsarConsumers(Params.NodeTimeTickChannel,
"proxyservicesub") // TODO: add config
log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)

View File

@ -109,12 +109,14 @@ func TestDataSyncService_Start(t *testing.T) {
ddChannels := Params.DDChannelNames
pulsarURL := Params.PulsarAddress
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
insertStream.AsProducer(insertChannels)
factory := msgstream.ProtoUDFactory{}
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
ddStream.AsProducer(ddChannels)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()

View File

@ -9,13 +9,18 @@ import (
)
func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode {
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.InsertReceiveBufSize, Params.InsertPulsarBufSize)
factory := msgstream.ProtoUDFactory{}
receiveBufSize := Params.InsertReceiveBufSize
pulsarBufSize := Params.InsertPulsarBufSize
msgStreamURL := Params.PulsarAddress
consumeChannels := Params.InsertChannelNames
consumeSubName := Params.MsgChannelSubName
insertStream, _ := factory.NewTtMsgStream(ctx)
insertStream.AsConsumer(consumeChannels, consumeSubName)
insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(msgStreamURL)
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
var stream msgstream.MsgStream = insertStream
dsService.dmStream = stream
@ -28,13 +33,18 @@ func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph
}
func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode {
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.DDReceiveBufSize, Params.DDPulsarBufSize)
factory := msgstream.ProtoUDFactory{}
receiveBufSize := Params.DDReceiveBufSize
pulsarBufSize := Params.DDPulsarBufSize
msgStreamURL := Params.PulsarAddress
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream, _ := factory.NewTtMsgStream(ctx)
ddStream.AsConsumer(consumeChannels, consumeSubName)
ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(msgStreamURL)
ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
var stream msgstream.MsgStream = ddStream
dsService.ddStream = stream

View File

@ -13,7 +13,7 @@ import (
type serviceTimeNode struct {
baseNode
replica collectionReplica
timeTickMsgStream msgstream.MsgStream
timeTickMsgStream *pulsarms.PulsarMsgStream
}
func (stNode *serviceTimeNode) Name() string {
@ -78,9 +78,10 @@ func newServiceTimeNode(ctx context.Context, replica collectionReplica) *service
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.SearchReceiveBufSize, 1024)
timeTimeMsgStream, _ := factory.NewMsgStream(ctx)
timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName})
factory := msgstream.ProtoUDFactory{}
timeTimeMsgStream := pulsarms.NewPulsarMsgStream(ctx, Params.SearchReceiveBufSize, 1024, factory.NewUnmarshalDispatcher())
timeTimeMsgStream.SetPulsarClient(Params.PulsarAddress)
timeTimeMsgStream.CreatePulsarProducers([]string{Params.QueryTimeTickChannelName})
return &serviceTimeNode{
baseNode: baseNode,

View File

@ -130,10 +130,10 @@ import (
//
// insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// insertStream.SetPulsarClient(Params.PulsarAddress)
// insertStream.AsProducer(insertChannels)
// insertStream.CreatePulsarProducers(insertChannels)
// ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// ddStream.SetPulsarClient(Params.PulsarAddress)
// ddStream.AsProducer(ddChannels)
// ddStream.CreatePulsarProducers(ddChannels)
//
// var insertMsgStream msgstream.MsgStream = insertStream
// insertMsgStream.Start()
@ -206,7 +206,7 @@ import (
// }
// searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// searchStream.SetPulsarClient(Params.PulsarAddress)
// searchStream.AsProducer(newSearchChannelNames)
// searchStream.CreatePulsarProducers(newSearchChannelNames)
// searchStream.Start()
// err = searchStream.Produce(fn(1))
// assert.NoError(t, err)
@ -215,7 +215,7 @@ import (
// searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// searchResultStream.SetPulsarClient(Params.PulsarAddress)
// unmarshalDispatcher := util.NewUnmarshalDispatcher()
// searchResultStream.AsConsumer(newSearchResultChannelNames, "loadIndexTestSubSearchResult", unmarshalDispatcher, receiveBufSize)
// searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult", unmarshalDispatcher, receiveBufSize)
// searchResultStream.Start()
// searchResult := searchResultStream.Consume()
// assert.NotNil(t, searchResult)
@ -295,7 +295,7 @@ import (
// // init message stream consumer and do checks
// statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
// statsMs.SetPulsarClient(Params.PulsarAddress)
// statsMs.AsConsumer([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
// statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
// statsMs.Start()
//
// findFiledStats := false
@ -464,10 +464,10 @@ import (
//
// insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// insertStream.SetPulsarClient(Params.PulsarAddress)
// insertStream.AsProducer(insertChannels)
// insertStream.CreatePulsarProducers(insertChannels)
// ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// ddStream.SetPulsarClient(Params.PulsarAddress)
// ddStream.AsProducer(ddChannels)
// ddStream.CreatePulsarProducers(ddChannels)
//
// var insertMsgStream msgstream.MsgStream = insertStream
// insertMsgStream.Start()
@ -529,7 +529,7 @@ import (
// }
// searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// searchStream.SetPulsarClient(Params.PulsarAddress)
// searchStream.AsProducer(newSearchChannelNames)
// searchStream.CreatePulsarProducers(newSearchChannelNames)
// searchStream.Start()
// err = searchStream.Produce(fn(1))
// assert.NoError(t, err)
@ -538,7 +538,7 @@ import (
// searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
// searchResultStream.SetPulsarClient(Params.PulsarAddress)
// unmarshalDispatcher := util.NewUnmarshalDispatcher()
// searchResultStream.AsConsumer(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
// searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
// searchResultStream.Start()
// searchResult := searchResultStream.Consume()
// assert.NotNil(t, searchResult)
@ -612,7 +612,7 @@ import (
// // init message stream consumer and do checks
// statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
// statsMs.SetPulsarClient(Params.PulsarAddress)
// statsMs.AsConsumer([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
// statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
// statsMs.Start()
//
// findFiledStats := false
@ -1016,13 +1016,15 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
ddChannels := Params.DDChannelNames
pulsarURL := Params.PulsarAddress
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
insertStream, _ := factory.NewMsgStream(ctx)
insertStream.AsProducer(insertChannels)
insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName)
factory := msgstream.ProtoUDFactory{}
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName)
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsProducer(ddChannels)
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
@ -1074,13 +1076,15 @@ func sentTimeTick(ctx context.Context) error {
ddChannels := Params.DDChannelNames
pulsarURL := Params.PulsarAddress
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
insertStream, _ := factory.NewMsgStream(ctx)
insertStream.AsProducer(insertChannels)
insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName)
factory := msgstream.ProtoUDFactory{}
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName)
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsProducer(ddChannels)
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()

View File

@ -333,11 +333,11 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co
// add request channel
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
searchStream.AsConsumer(consumeChannels, consumeSubName)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
// add result channel
producerChannels := []string{in.ResultChannelID}
resultStream.AsProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -382,12 +382,12 @@ func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.AsConsumer(consumeChannels, consumeSubName)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
// remove result channel
producerChannels := []string{in.ResultChannelID}
// TODO: resultStream.RemovePulsarProducer(producerChannels)
resultStream.AsProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -420,7 +420,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
// add request channel
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
fgDMMsgStream.AsConsumer(consumeChannels, consumeSubName)
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,

View File

@ -47,17 +47,19 @@ func newSearchService(ctx context.Context, replica collectionReplica) *searchSer
msgStreamURL := Params.PulsarAddress
factory := pulsarms.NewFactory(msgStreamURL, receiveBufSize, pulsarBufSize)
factory := msgstream.ProtoUDFactory{}
consumeChannels := Params.SearchChannelNames
consumeSubName := Params.MsgChannelSubName
searchStream, _ := factory.NewMsgStream(ctx)
searchStream.AsConsumer(consumeChannels, consumeSubName)
searchStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
searchStream.SetPulsarClient(msgStreamURL)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
var inputStream msgstream.MsgStream = searchStream
producerChannels := Params.SearchResultChannelNames
searchResultStream, _ := factory.NewMsgStream(ctx)
searchResultStream.AsProducer(producerChannels)
searchResultStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
searchResultStream.SetPulsarClient(msgStreamURL)
searchResultStream.CreatePulsarProducers(producerChannels)
var outputStream msgstream.MsgStream = searchResultStream
searchServiceCtx, searchServiceCancel := context.WithCancel(ctx)

View File

@ -93,9 +93,10 @@ func TestSearch_Search(t *testing.T) {
msgPackSearch := msgstream.MsgPack{}
msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
searchStream.AsProducer(searchProducerChannels)
factory := msgstream.ProtoUDFactory{}
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)
searchStream.Start()
err = searchStream.Produce(&msgPackSearch)
assert.NoError(t, err)
@ -179,11 +180,13 @@ func TestSearch_Search(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
insertStream.AsProducer(insertChannels)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
ddStream.AsProducer(ddChannels)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
@ -283,9 +286,10 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
msgPackSearch := msgstream.MsgPack{}
msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
searchStream.AsProducer(searchProducerChannels)
factory := msgstream.ProtoUDFactory{}
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)
searchStream.Start()
err = searchStream.Produce(&msgPackSearch)
assert.NoError(t, err)
@ -373,11 +377,13 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
insertStream.AsProducer(insertChannels)
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertChannels)
ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
ddStream.AsProducer(ddChannels)
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()

View File

@ -36,13 +36,16 @@ func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsC
func (sService *statsService) start() {
sleepTimeInterval := Params.StatsPublishInterval
receiveBufSize := Params.StatsReceiveBufSize
// start pulsar
msgStreamURL := Params.PulsarAddress
producerChannels := []string{Params.StatsChannelName}
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.StatsReceiveBufSize, 1024)
statsStream, _ := factory.NewMsgStream(sService.ctx)
statsStream.AsProducer(producerChannels)
factory := msgstream.ProtoUDFactory{}
statsStream := pulsarms.NewPulsarMsgStream(sService.ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
statsStream.SetPulsarClient(msgStreamURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream

View File

@ -3,7 +3,6 @@ package querynode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
)
@ -27,10 +26,11 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
producerChannels := []string{Params.StatsChannelName}
pulsarURL := Params.PulsarAddress
factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
statsStream, err := factory.NewMsgStream(node.queryNodeLoopCtx)
assert.Nil(t, err)
statsStream.AsProducer(producerChannels)
factory := msgstream.ProtoUDFactory{}
statsStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
statsStream.SetPulsarClient(pulsarURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream

View File

@ -228,10 +228,10 @@ func TestLoadCollection(t *testing.T) {
//
//insertStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
//insertStream.SetPulsarClient(pulsarAddress)
//insertStream.AsProducer(insertChannels)
//insertStream.CreatePulsarProducers(insertChannels)
//ddStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
//ddStream.SetPulsarClient(pulsarAddress)
//ddStream.AsProducer(ddChannels)
//ddStream.CreatePulsarProducers(ddChannels)
//
//var insertMsgStream msgstream.MsgStream = insertStream
//insertMsgStream.Start()
@ -246,7 +246,7 @@ func TestLoadCollection(t *testing.T) {
//consumeStream := pulsarms.NewPulsarTtMsgStream(context.Background(), receiveBufSize)
//consumeStream.SetPulsarClient(pulsarAddress)
//unmarshalDispatcher := util.NewUnmarshalDispatcher()
//consumeStream.AsConsumer(insertChannels, "test", unmarshalDispatcher, pulsarBufSize)
//consumeStream.CreatePulsarConsumers(insertChannels, "test", unmarshalDispatcher, pulsarBufSize)
//consumeStream.Start()
//
//for i := 0; i < 10; i++ {