Disable pulsar batch and change background flush goroutine to larger interval (#18888)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/19128/head
Xiaofan 2022-09-09 11:26:35 +08:00 committed by GitHub
parent ceea04c274
commit eabdc2b114
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 6 deletions

View File

@ -346,10 +346,12 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
ctx := context.Background() ctx := context.Background()
inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels)
defer inputStream1.Close()
msgPacks1 := createRandMsgPacks(3, 10, 10) msgPacks1 := createRandMsgPacks(3, 10, 10)
assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels) inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels)
defer inputStream2.Close()
msgPacks2 := createRandMsgPacks(5, 10, 10) msgPacks2 := createRandMsgPacks(5, 10, 10)
assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2))
@ -365,17 +367,17 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
} else { } else {
outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, rcvMsgPacks[msgCount-1].EndPositions) outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, rcvMsgPacks[msgCount-1].EndPositions)
} }
defer outputStream.Close()
msgPack := consumer(ctx, outputStream) msgPack := consumer(ctx, outputStream)
rcvMsgPacks = append(rcvMsgPacks, msgPack) rcvMsgPacks = append(rcvMsgPacks, msgPack)
if len(msgPack.Msgs) > 0 { if len(msgPack.Msgs) > 0 {
for _, msg := range msgPack.Msgs { for _, msg := range msgPack.Msgs {
log.Println("msg type: ", msg.Type(), ", msg value: ", msg) log.Println("TestStream_KafkaTtMsgStream_2 msg type: ", msg.Type(), ", msg value: ", msg)
assert.Greater(t, msg.BeginTs(), msgPack.BeginTs) assert.Greater(t, msg.BeginTs(), msgPack.BeginTs)
assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs) assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs)
} }
log.Println("================") log.Println("================")
} }
outputStream.Close()
return len(rcvMsgPacks[msgCount].Msgs) return len(rcvMsgPacks[msgCount].Msgs)
} }
@ -387,8 +389,6 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs)
assert.Equal(t, (cnt1 + cnt2), msgCount) assert.Equal(t, (cnt1 + cnt2), msgCount)
inputStream1.Close()
inputStream2.Close()
} }
func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {

View File

@ -550,7 +550,7 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
return err return err
} }
log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", messageID)) log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
err = consumer.Seek(messageID, false) err = consumer.Seek(messageID, false)
if err != nil { if err != nil {
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
@ -880,7 +880,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if err != nil { if err != nil {
return err return err
} }
log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", seekMsgID)) log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
err = consumer.Seek(seekMsgID, true) err = consumer.Seek(seekMsgID, true)
if err != nil { if err != nil {
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))

View File

@ -20,6 +20,7 @@ import (
"errors" "errors"
"strings" "strings"
"sync" "sync"
"time"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
@ -57,6 +58,10 @@ func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwra
opts.CompressionType = pulsar.ZSTD opts.CompressionType = pulsar.ZSTD
opts.CompressionLevel = pulsar.Faster opts.CompressionLevel = pulsar.Faster
} }
// disable automatic batching
opts.DisableBatching = true
// change the batching max publish delay higher to avoid extra cpu consumption
opts.BatchingMaxPublishDelay = 1 * time.Minute
pp, err := pc.client.CreateProducer(opts) pp, err := pc.client.CreateProducer(opts)
if err != nil { if err != nil {