mirror of https://github.com/milvus-io/milvus.git
Add Buffer for consumer channel (#9578)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/10000/head
parent
da5195aab8
commit
d845153de4
|
@ -19,6 +19,7 @@ package datanode
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
|
@ -47,11 +48,13 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
|
||||||
|
|
||||||
if seekPos != nil {
|
if seekPos != nil {
|
||||||
seekPos.ChannelName = pchannelName
|
seekPos.ChannelName = pchannelName
|
||||||
log.Debug("datanode Seek", zap.String("channelName", seekPos.GetChannelName()))
|
start := time.Now()
|
||||||
|
log.Debug("datanode begin to seek: " + seekPos.GetChannelName())
|
||||||
err = insertStream.Seek([]*internalpb.MsgPosition{seekPos})
|
err = insertStream.Seek([]*internalpb.MsgPosition{seekPos})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debug("datanode Seek successfully: "+seekPos.GetChannelName(), zap.Int64("elapse ", time.Since(start).Milliseconds()))
|
||||||
}
|
}
|
||||||
|
|
||||||
node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
|
node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
|
||||||
|
|
|
@ -874,11 +874,6 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
|
||||||
}
|
}
|
||||||
ms.addConsumer(consumer, mp.ChannelName)
|
ms.addConsumer(consumer, mp.ChannelName)
|
||||||
|
|
||||||
//TODO: May cause problem
|
|
||||||
//if len(consumer.Chan()) == 0 {
|
|
||||||
// return nil
|
|
||||||
//}
|
|
||||||
|
|
||||||
runLoop := true
|
runLoop := true
|
||||||
for runLoop {
|
for runLoop {
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (pc *pulsarConsumer) Subscription() string {
|
||||||
func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
|
func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
|
||||||
if pc.msgChannel == nil {
|
if pc.msgChannel == nil {
|
||||||
pc.once.Do(func() {
|
pc.once.Do(func() {
|
||||||
pc.msgChannel = make(chan ConsumerMessage)
|
pc.msgChannel = make(chan ConsumerMessage, 256)
|
||||||
// this part handles msgstream expectation when the consumer is not seeked
|
// this part handles msgstream expectation when the consumer is not seeked
|
||||||
// pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked
|
// pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked
|
||||||
// yet, our message stream is to setting to the very start point of the topic
|
// yet, our message stream is to setting to the very start point of the topic
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (rc *RmqConsumer) Subscription() string {
|
||||||
func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
|
func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
|
||||||
if rc.msgChannel == nil {
|
if rc.msgChannel == nil {
|
||||||
rc.once.Do(func() {
|
rc.once.Do(func() {
|
||||||
rc.msgChannel = make(chan ConsumerMessage)
|
rc.msgChannel = make(chan ConsumerMessage, 256)
|
||||||
go func() {
|
go func() {
|
||||||
for { //nolint:gosimple
|
for { //nolint:gosimple
|
||||||
select {
|
select {
|
||||||
|
|
Loading…
Reference in New Issue