Refine kafka consumer (#19846)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Xiaofan 2022-10-25 13:23:30 +08:00 committed by GitHub
parent 85ef7331ee
commit 1f170ce1ba
28 changed files with 323 additions and 275 deletions


@ -478,9 +478,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
ttMsgStream, err := s.factory.NewMsgStream(ctx)
if err != nil {
log.Error("DataCoord failed to create timetick channel", zap.Error(err))
return
panic(err)
}
ttMsgStream.AsConsumerWithPosition([]string{Params.CommonCfg.DataCoordTimeTick},
ttMsgStream.AsConsumer([]string{Params.CommonCfg.DataCoordTimeTick},
Params.CommonCfg.DataCoordSubName, mqwrapper.SubscriptionPositionLatest)
log.Info("DataCoord creates the timetick channel consumer",
zap.String("timeTickChannel", Params.CommonCfg.DataCoordTimeTick),


@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
@ -379,7 +380,7 @@ func (dsService *dataSyncService) getChannelLatestMsgID(ctx context.Context, cha
zap.String("pChannelName", pChannelName),
zap.String("subscription", subName),
)
dmlStream.AsConsumer([]string{pChannelName}, subName)
dmlStream.AsConsumer([]string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
id, err := dmlStream.GetLatestMsgID(pChannelName)
if err != nil {
return nil, err
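getChannelLatestMsgID only needs metadata from the stream, which is exactly what the new SubscriptionPositionUnknown is for: no offset is assigned up front. A hedged sketch of that pattern, with invented names and the same imports as the previous sketch:

func latestMsgID(ctx context.Context, factory msgstream.Factory, pchannel, subName string) (msgstream.MessageID, error) {
	stream, err := factory.NewMsgStream(ctx)
	if err != nil {
		return nil, err
	}
	defer stream.Close()
	// Unknown assigns no offset, which is all a metadata-only call needs
	stream.AsConsumer([]string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown)
	return stream.GetLatestMsgID(pchannel)
}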


@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -44,11 +45,8 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
// MsgStream needs a physical channel name, but the channel name in the seek position from DataCoord
// is a virtual channel name, so we need to convert the vchannel name into a pchannel name here.
pchannelName := funcutil.ToPhysicalChannel(dmNodeConfig.vChannelName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
log.Info("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))
if seekPos != nil {
insertStream.AsConsumer([]string{pchannelName}, consumeSubName, mqwrapper.SubscriptionPositionUnknown)
seekPos.ChannelName = pchannelName
start := time.Now()
log.Info("datanode begin to seek", zap.ByteString("seek msgID", seekPos.GetMsgID()), zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID))
@ -57,7 +55,11 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
return nil, err
}
log.Info("datanode seek successfully", zap.ByteString("seek msgID", seekPos.GetMsgID()), zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID), zap.Duration("elapse", time.Since(start)))
} else {
insertStream.AsConsumer([]string{pchannelName}, consumeSubName, mqwrapper.SubscriptionPositionEarliest)
}
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
log.Info("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))
name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
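The reordered branch makes the policy explicit: a channel with a checkpoint subscribes with Unknown and then seeks, while a fresh channel starts from Earliest. The same decision in isolation, as an illustrative sketch rather than the datanode code (internalpb comes from github.com/milvus-io/milvus/internal/proto/internalpb):

func subscribeDml(stream msgstream.MsgStream, pchannel, subName string, seekPos *internalpb.MsgPosition) error {
	if seekPos == nil {
		// no checkpoint: replay the channel from the beginning
		stream.AsConsumer([]string{pchannel}, subName, mqwrapper.SubscriptionPositionEarliest)
		return nil
	}
	// checkpoint present: the initial position is irrelevant, Seek decides
	stream.AsConsumer([]string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown)
	seekPos.ChannelName = pchannel
	return stream.Seek([]*internalpb.MsgPosition{seekPos})
}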


@ -71,9 +71,8 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
return make(chan *msgstream.MsgPack, 100)
}
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
}
func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
func (mtm *mockTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {


@ -16,6 +16,7 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
@ -83,7 +84,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest {
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
@ -102,7 +103,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err
}
if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest {
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err


@ -16,6 +16,7 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/stretchr/testify/assert"
)
@ -80,7 +81,7 @@ func TestClient_Subscribe(t *testing.T) {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Error(t, err)
assert.Nil(t, consumer)
@ -98,7 +99,7 @@ func TestClient_Subscribe(t *testing.T) {
opt := ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
}
consumer1, err := client1.Subscribe(opt)
assert.NoError(t, err)
@ -110,7 +111,7 @@ func TestClient_Subscribe(t *testing.T) {
opt1 := ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionLatest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionLatest,
}
consumer3, err := client1.Subscribe(opt1)
assert.NoError(t, err)
@ -141,7 +142,7 @@ func TestClient_SeekLatest(t *testing.T) {
opt := ConsumerOptions{
Topic: topicName,
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
}
consumer1, err := client.Subscribe(opt)
assert.NoError(t, err)
@ -168,7 +169,7 @@ func TestClient_SeekLatest(t *testing.T) {
opt1 := ConsumerOptions{
Topic: topicName,
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionLatest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionLatest,
}
consumer2, err := client.Subscribe(opt1)
assert.NoError(t, err)
@ -217,7 +218,7 @@ func TestClient_consume(t *testing.T) {
opt := ConsumerOptions{
Topic: topicName,
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
}
consumer, err := client.Subscribe(opt)
assert.NoError(t, err)


@ -13,20 +13,12 @@ package client
import (
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)
// SubscriptionInitialPosition is the initial subscription position
type SubscriptionInitialPosition int
// UniqueID is the type of message ID
type UniqueID = server.UniqueID
// List 2 kinds of SubscriptionInitialPosition
const (
SubscriptionPositionLatest SubscriptionInitialPosition = iota
SubscriptionPositionEarliest
)
// EarliestMessageID is used to get the earliest message ID, default -1
func EarliestMessageID() UniqueID {
return -1
@ -42,7 +34,7 @@ type ConsumerOptions struct {
// InitialPosition at which the cursor will be set when subscribing
// Default is `Latest`
SubscriptionInitialPosition
mqwrapper.SubscriptionInitialPosition
// Message for this consumer
// When a message is received, it will be pushed to this channel for consumption
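With the rocksmq-local enum deleted, consumer options are now built from the shared mqwrapper constants; the embedded field keeps the old literal syntax. A minimal sketch with invented topic and subscription names:

import (
	rocksmq "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/client"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)

func exampleOptions() rocksmq.ConsumerOptions {
	return rocksmq.ConsumerOptions{
		Topic:                       "demo-topic",
		SubscriptionName:            "demo-sub",
		SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
	}
}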


@ -15,6 +15,7 @@ import (
"os"
"testing"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/stretchr/testify/assert"
)
@ -24,7 +25,7 @@ func TestConsumer_newConsumer(t *testing.T) {
consumer, err := newConsumer(nil, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, consumer)
assert.NotNil(t, err)
@ -62,7 +63,7 @@ func TestConsumer_newConsumer(t *testing.T) {
consumer1, err := newConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: consumerName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.NoError(t, err)
assert.NotNil(t, consumer1)
@ -85,7 +86,7 @@ func TestConsumer_newConsumer(t *testing.T) {
consumer4, err := getExistedConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
}, nil)
assert.NoError(t, err)
assert.NotNil(t, consumer4)
@ -99,7 +100,7 @@ func TestConsumer_newConsumer(t *testing.T) {
consumer6, err := getExistedConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: "",
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
}, nil)
assert.Error(t, err)
assert.Nil(t, consumer6)
@ -111,7 +112,7 @@ func TestConsumer_Subscription(t *testing.T) {
consumer, err := newConsumer(newMockClient(), ConsumerOptions{
Topic: topicName,
SubscriptionName: consumerName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, consumer)
assert.NotNil(t, err)
@ -135,7 +136,7 @@ func TestConsumer_Seek(t *testing.T) {
consumer, err := newConsumer(client, ConsumerOptions{
Topic: topicName,
SubscriptionName: consumerName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.NoError(t, err)
assert.NotNil(t, consumer)


@ -23,6 +23,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
kafkawrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka"
puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar"
rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq"
@ -103,7 +104,7 @@ func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st
if err != nil {
return err
}
msgstream.AsConsumer([]string{channel}, subname)
msgstream.AsConsumer(channels, subname, mqwrapper.SubscriptionPositionUnknown)
msgstream.Close()
}
}
@ -152,7 +153,7 @@ func (f *RmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st
if err != nil {
return err
}
msgstream.AsConsumer(channels, subname)
msgstream.AsConsumer(channels, subname, mqwrapper.SubscriptionPositionUnknown)
msgstream.Close()
return nil
}
@ -199,7 +200,7 @@ func (f *KmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st
if err != nil {
return err
}
msgstream.AsConsumer(channels, subname)
msgstream.AsConsumer(channels, subname, mqwrapper.SubscriptionPositionUnknown)
msgstream.Close()
return nil
}


@ -131,7 +131,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
// pick a seekPosition
var seekPosition *internalpb.MsgPosition
outputStream := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName)
outputStream := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
for i := 0; i < 10; i++ {
result := consumer(ctx, outputStream)
assert.Equal(t, result.Msgs[0].ID(), int64(i))
@ -143,7 +143,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
outputStream.Close()
// create a consumer that can consume data from the seek position to the last msg
outputStream2 := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName)
outputStream2 := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
lastMsgID, err := outputStream2.GetLatestMsgID(c)
defer outputStream2.Close()
assert.Nil(t, err)
@ -412,7 +412,7 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
factory := ProtoUDFactory{}
kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumerWithPosition(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream.Start()
var wg sync.WaitGroup
@ -464,11 +464,11 @@ func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChann
return inputStream
}
func getKafkaOutputStream(ctx context.Context, kafkaAddress string, consumerChannels []string, consumerSubName string) MsgStream {
func getKafkaOutputStream(ctx context.Context, kafkaAddress string, consumerChannels []string, consumerSubName string, position mqwrapper.SubscriptionInitialPosition) MsgStream {
factory := ProtoUDFactory{}
kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
outputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, position)
outputStream.Start()
return outputStream
}
@ -477,7 +477,7 @@ func getKafkaTtOutputStream(ctx context.Context, kafkaAddress string, consumerCh
factory := ProtoUDFactory{}
kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
return outputStream
}
@ -490,7 +490,7 @@ func getKafkaTtOutputStreamAndSeek(ctx context.Context, kafkaAddress string, pos
for _, c := range positions {
consumerName = append(consumerName, c.ChannelName)
}
outputStream.AsConsumer(consumerName, funcutil.RandomString(8))
outputStream.AsConsumer(consumerName, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown)
outputStream.Seek(positions)
outputStream.Start()
return outputStream


@ -125,11 +125,6 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
}
}
// AsConsumer Create consumer to receive message from channels
func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
ms.AsConsumerWithPosition(channels, subName, mqwrapper.SubscriptionPositionEarliest)
}
func (ms *mqMsgStream) GetLatestMsgID(channel string) (MessageID, error) {
lastMsg, err := ms.consumers[channel].GetLatestMsgID()
if err != nil {
@ -141,7 +136,7 @@ func (ms *mqMsgStream) GetLatestMsgID(channel string) (MessageID, error) {
// AsConsumer creates a consumer to receive messages from channels, with an initial position.
// If the initial position is set to latest, the last message in the channel is exclusive.
func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
func (ms *mqMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
for _, channel := range channels {
if _, ok := ms.consumers[channel]; ok {
continue
@ -622,13 +617,8 @@ func (ms *MqTtMsgStream) addConsumer(consumer mqwrapper.Consumer, channel string
ms.chanTtMsgTime[consumer] = 0
}
// AsConsumer subscribes channels as consumer for a MsgStream
func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
ms.AsConsumerWithPosition(channels, subName, mqwrapper.SubscriptionPositionEarliest)
}
// AsConsumer subscribes channels as consumer for a MsgStream, starting from the given initial position.
func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
for _, channel := range channels {
if _, ok := ms.consumers[channel]; ok {
continue
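Note the `continue` guard: a channel that already has a consumer is skipped, so calling AsConsumer repeatedly is a harmless no-op, which TestMqMsgStream_AsConsumer below exercises. As a two-line fragment, assuming stream is any MsgStream:

	stream.AsConsumer([]string{"a"}, "b", mqwrapper.SubscriptionPositionUnknown)
	stream.AsConsumer([]string{"a"}, "b", mqwrapper.SubscriptionPositionUnknown) // second call subscribes nothing new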


@ -179,8 +179,8 @@ func TestMqMsgStream_AsConsumer(t *testing.T) {
assert.Nil(t, err)
// repeat calling AsConsumer
m.AsConsumer([]string{"a"}, "b")
m.AsConsumer([]string{"a"}, "b")
m.AsConsumer([]string{"a"}, "b", mqwrapper.SubscriptionPositionUnknown)
m.AsConsumer([]string{"a"}, "b", mqwrapper.SubscriptionPositionUnknown)
}(parameters[i].client)
}
}
@ -596,7 +596,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
var output MsgStream = outputStream
@ -651,7 +651,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
var output MsgStream = outputStream
@ -685,7 +685,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
var output MsgStream = outputStream
@ -835,7 +835,7 @@ func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName)
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
lastMsgID, err := outputStream2.GetLatestMsgID(c)
defer outputStream2.Close()
assert.Nil(t, err)
@ -1241,7 +1241,7 @@ func TestStream_MqMsgStream_Seek(t *testing.T) {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName)
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
outputStream2.Start()
@ -1284,7 +1284,7 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, funcutil.RandomString(8))
outputStream2.AsConsumer(consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionEarliest)
defer outputStream2.Close()
messageID, _ := pulsar.DeserializeMessageID(seekPosition.MsgID)
// try to seek to a position that has not been written yet
@ -1342,7 +1342,7 @@ func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) {
factory := ProtoUDFactory{}
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, funcutil.RandomString(8))
outputStream2.AsConsumer(consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown)
id := common.Endian.Uint64(seekPosition.MsgID) + 10
bs := make([]byte, 8)
@ -1395,7 +1395,7 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumerWithPosition(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream2.Start()
msgPack.Msgs = nil
@ -1468,7 +1468,7 @@ func initRmqStream(ctx context.Context,
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
outputStream, _ := NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerGroupName)
outputStream.AsConsumer(consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
var output MsgStream = outputStream
@ -1493,7 +1493,7 @@ func initRmqTtStream(ctx context.Context,
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerGroupName)
outputStream.AsConsumer(consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
var output MsgStream = outputStream
@ -1602,8 +1602,7 @@ func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) {
rmqClient, _ := rmq.NewClientWithDefaultOptions()
outputStream, _ = NewMqTtMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
consumerSubName = funcutil.RandomString(8)
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
outputStream.Seek(receivedMsg.StartPositions)
outputStream.Start()
seekMsg := consumer(ctx, outputStream)
@ -1708,7 +1707,7 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) {
rmqClient, _ := rmq.NewClientWithDefaultOptions()
outputStream, _ = NewMqTtMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
consumerSubName = funcutil.RandomString(8)
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
outputStream.Seek(receivedMsg3.StartPositions)
outputStream.Start()
@ -2111,7 +2110,7 @@ func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerCh
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
return outputStream
}
@ -2120,7 +2119,7 @@ func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumer
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
return outputStream
}
@ -2133,7 +2132,7 @@ func getPulsarTtOutputStreamAndSeek(ctx context.Context, pulsarAddress string, p
for _, c := range positions {
consumerName = append(consumerName, c.ChannelName)
}
outputStream.AsConsumer(consumerName, funcutil.RandomString(8))
outputStream.AsConsumer(consumerName, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown)
outputStream.Seek(positions)
outputStream.Start()
return outputStream
@ -2202,7 +2201,7 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumerWithPosition(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream.Start()
inputStream.Produce(getTimeTickMsgPack(1000))


@ -25,6 +25,9 @@ const (
// SubscriptionPositionEarliest is the earliest position, meaning consumption starts from the first message
SubscriptionPositionEarliest
// SubscriptionPositionUnknown indicates that the initial position does not matter, since the caller will either seek explicitly or only use metadata APIs
SubscriptionPositionUnknown
)
const DefaultPartitionIdx = 0
@ -53,7 +56,7 @@ type Consumer interface {
// returns the subscription for the consumer
Subscription() string
// Message channel
// Chan returns the message channel; once it has been called, Seek can no longer be used
Chan() <-chan Message
// Seek to the uniqueID position
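Read together, the three constants map onto caller intent. One way to express the choice, as a sketch rather than code from this PR:

func positionFor(willSeek, replayHistory bool) mqwrapper.SubscriptionInitialPosition {
	switch {
	case willSeek:
		// a Seek will follow, or only metadata APIs will be used
		return mqwrapper.SubscriptionPositionUnknown
	case replayHistory:
		return mqwrapper.SubscriptionPositionEarliest
	default:
		return mqwrapper.SubscriptionPositionLatest
	}
}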


@ -25,10 +25,10 @@ type kafkaClient struct {
func getBasicConfig(address string) kafka.ConfigMap {
return kafka.ConfigMap{
"bootstrap.servers": address,
"socket.timeout.ms": 300000,
"socket.max.fails": 3,
"api.version.request": true,
"bootstrap.servers": address,
"api.version.request": true,
"reconnect.backoff.ms": 20,
"reconnect.backoff.max.ms": 5000,
}
}
@ -119,7 +119,8 @@ func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
// max message size: 10M (10485760 bytes)
newConf.SetKey("message.max.bytes", 10485760)
newConf.SetKey("compression.codec", "zstd")
newConf.SetKey("linger.ms", 20)
// keep linger low to ensure time tick messages are sent out as soon as possible
newConf.SetKey("linger.ms", 2)
//special producer config
kc.specialExtraConfig(newConf, kc.producerConfig)
@ -130,24 +131,12 @@ func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.SubscriptionInitialPosition) *kafka.ConfigMap {
newConf := cloneKafkaConfig(kc.basicConfig)
if offset == mqwrapper.SubscriptionPositionEarliest {
newConf.SetKey("auto.offset.reset", "earliest")
} else {
newConf.SetKey("auto.offset.reset", "latest")
}
newConf.SetKey("session.timeout.ms", 180000)
newConf.SetKey("group.id", group)
newConf.SetKey("enable.auto.commit", false)
//By default, Kafka will not create a topic if the consumer's topic does not exist.
//To stay compatible with the other MQs, we need to enable the following configuration;
//meanwhile, some callers also try to consume topics that may not exist yet, such as dataCoordTimeTick.
newConf.SetKey("allow.auto.create.topics", true)
//newConf.SetKey("enable.partition.eof", true)
newConf.SetKey("go.events.channel.enable", true)
kc.specialExtraConfig(newConf, kc.consumerConfig)
return newConf
@ -166,8 +155,11 @@ func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrap
func (kc *kafkaClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) {
config := kc.newConsumerConfig(options.SubscriptionName, options.SubscriptionInitialPosition)
consumer, err := newKafkaConsumer(config, options.Topic, options.SubscriptionName)
return consumer, err
consumer, err := newKafkaConsumer(config, options.Topic, options.SubscriptionName, options.SubscriptionInitialPosition)
if err != nil {
return nil, err
}
return consumer, nil
}
func (kc *kafkaClient) EarliestMessageID() mqwrapper.MessageID {
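After this refactor the consumer config no longer carries auto.offset.reset or go.events.channel.enable; initial positioning is handled by Assign in the consumer itself, and consumption by ReadMessage. Roughly the ConfigMap the method now yields, with placeholder broker address and group, and assuming no special extra config:

func exampleConsumerConfig() kafka.ConfigMap {
	return kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092", // placeholder
		"api.version.request":      true,
		"reconnect.backoff.ms":     20,
		"reconnect.backoff.max.ms": 5000,
		"session.timeout.ms":       180000,
		"group.id":                 "demo-group", // placeholder
		"enable.auto.commit":       false,
		"allow.auto.create.topics": true,
	}
}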


@ -12,34 +12,84 @@ import (
)
type Consumer struct {
c *kafka.Consumer
config *kafka.ConfigMap
msgChannel chan mqwrapper.Message
hasSeek bool
hasConsume bool
skipMsg bool
latestMsgOffset kafka.Offset
topic string
groupID string
closeCh chan struct{}
chanOnce sync.Once
closeOnce sync.Once
c *kafka.Consumer
config *kafka.ConfigMap
msgChannel chan mqwrapper.Message
hasAssign bool
skipMsg bool
topic string
groupID string
chanOnce sync.Once
closeOnce sync.Once
closeCh chan struct{}
}
func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) (*Consumer, error) {
closeCh := make(chan struct{})
msgChannel := make(chan mqwrapper.Message, 256)
const timeout = 3000
kafkaConsumer := &Consumer{
func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string, position mqwrapper.SubscriptionInitialPosition) (*Consumer, error) {
msgChannel := make(chan mqwrapper.Message, 256)
kc := &Consumer{
config: config,
msgChannel: msgChannel,
topic: topic,
groupID: groupID,
closeCh: closeCh,
closeCh: make(chan struct{}),
}
err := kafkaConsumer.createKafkaConsumer()
return kafkaConsumer, err
err := kc.createKafkaConsumer()
if err != nil {
return nil, err
}
// if the position is unknown, leave the partition assignment to a later Seek
if position != mqwrapper.SubscriptionPositionUnknown {
var offset kafka.Offset
if position == mqwrapper.SubscriptionPositionEarliest {
offset, err = kafka.NewOffset("earliest")
if err != nil {
return nil, err
}
} else {
latestMsgID, err := kc.GetLatestMsgID()
if err != nil {
switch v := err.(type) {
case kafka.Error:
if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart {
log.Warn("get latest msg ID failed, topic or partition does not exists!",
zap.String("topic", kc.topic),
zap.String("err msg", v.String()))
offset, err = kafka.NewOffset("earliest")
if err != nil {
return nil, err
}
}
default:
log.Error("kafka get latest msg ID failed", zap.String("topic", kc.topic), zap.Error(err))
return nil, err
}
} else {
offset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
kc.skipMsg = true
}
}
start := time.Now()
topicPartition := []kafka.TopicPartition{{Topic: &topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
err = kc.c.Assign(topicPartition)
if err != nil {
log.Error("kafka consumer assign failed ", zap.String("topic name", topic), zap.Any("Msg position", position), zap.Error(err))
return nil, err
}
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("kafka consumer assign take too long!", zap.String("topic name", topic), zap.Any("Msg position", position), zap.Int64("time cost(ms)", cost))
}
kc.hasAssign = true
}
return kc, nil
}
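The latest-position branch is easiest to read in terms of offsets: GetLatestMsgID returns high-1, the offset of the last message already written, and assigning there would re-deliver that message. Because "latest" is exclusive, skipMsg drops exactly one delivery. A toy restatement of the arithmetic, under the same watermark semantics (not the shipped code):

// given watermarks from QueryWatermarkOffsets, where high is the next offset to be written
func latestAssign(low, high int64) (offset int64, skipFirst bool) {
	if high <= low {
		// empty topic: start at the watermark, nothing to skip
		return high, false
	}
	// high-1 is the last written message; assign there, but skip its delivery
	return high - 1, true
}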
func (kc *Consumer) createKafkaConsumer() error {
@ -49,25 +99,6 @@ func (kc *Consumer) createKafkaConsumer() error {
log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
return err
}
latestMsgID, err := kc.GetLatestMsgID()
if err != nil {
switch v := err.(type) {
case kafka.Error:
if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart {
log.Warn("get latest msg ID failed, topic or partition does not exists!",
zap.String("topic", kc.topic),
zap.String("err msg", v.String()))
kc.latestMsgOffset = kafka.OffsetBeginning
}
default:
log.Error("get latest msg ID failed", zap.String("topic", kc.topic), zap.Error(err))
return err
}
} else {
kc.latestMsgOffset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
}
return nil
}
@ -80,111 +111,72 @@ func (kc *Consumer) Subscription() string {
// the channel-based consumer API has already been deprecated; see more details at
// https://github.com/confluentinc/confluent-kafka-go.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
if !kc.hasAssign {
log.Error("can not chan with not assigned channel", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
panic("failed to chan a kafka consumer without assign")
}
kc.chanOnce.Do(func() {
if !kc.hasSeek {
offsetStr, err := kc.config.Get("auto.offset.reset", "earliest")
if err != nil {
log.Error("get auto.offset.reset config fail in kafka consumer", zap.String("topic name", kc.topic), zap.Error(err))
panic(err)
}
offset, err := kafka.NewOffset(offsetStr)
if err != nil {
log.Error("Invalid kafka offset", zap.String("topic name", kc.topic), zap.Error(err))
panic(err)
}
// we assume that case is Chan starting before producing message with auto create topic config,
// consuming messages will fail that error is 'Subscribed topic not available'
// if invoke Subscribe method of kafka, so we use Assign instead of Subscribe.
var tps []kafka.TopicPartition
if offset == kafka.OffsetEnd && kc.latestMsgOffset != kafka.OffsetBeginning {
// kafka consumer will start when assign invoked, in order to guarantee the latest message
// position is same with created consumer time, there will use a seek to the latest to
// replace consuming from the latest position.
if err := kc.internalSeek(kc.latestMsgOffset, false); err != nil {
log.Error("kafka consumer subscribe failed ",
zap.String("topic name", kc.topic),
zap.Any("latestMsgOffset", kc.latestMsgOffset),
zap.Any("offset", offset),
zap.Error(err))
panic(err)
}
} else {
tps = []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
if err := kc.c.Assign(tps); err != nil {
log.Error("kafka consumer subscribe failed ",
zap.String("topic name", kc.topic),
zap.Any("latestMsgOffset", kc.latestMsgOffset),
zap.Any("offset", offset),
zap.Error(err))
panic(err)
}
}
log.Debug("starting kafka consume",
zap.String("topic name", kc.topic),
zap.Any("latestMsgOffset", kc.latestMsgOffset),
zap.Any("offset", offset))
}
go func() {
// the loop ends when the consumer is closed
for ev := range kc.c.Events() {
switch e := ev.(type) {
case *kafka.Message:
if kc.skipMsg {
kc.skipMsg = false
continue
for {
select {
case <-kc.closeCh:
log.Info("close consumer ", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
start := time.Now()
err := kc.c.Close()
if err != nil {
log.Warn("failed to close ", zap.String("topic", kc.topic), zap.Error(err))
}
kc.msgChannel <- &kafkaMessage{msg: e}
case kafka.Error:
log.Error("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(e))
if ev.(kafka.Error).IsFatal() {
panic(e)
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
}
if kc.msgChannel != nil {
close(kc.msgChannel)
}
return
default:
e, err := kc.c.ReadMessage(30 * time.Second)
if err != nil {
// if no message is read within 30 seconds, print a warning, since a time tick should always arrive
log.Warn("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
} else {
if kc.skipMsg {
kc.skipMsg = false
continue
}
kc.msgChannel <- &kafkaMessage{msg: e}
}
}
}
if kc.msgChannel != nil {
close(kc.msgChannel)
}
}()
kc.hasConsume = true
})
return kc.msgChannel
}
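The receive goroutine replaces the deprecated go.events.channel API with a poll on ReadMessage using a 30-second timeout, checking closeCh between reads so Close can stop the loop deterministically. The skeleton of that pattern, simplified and with error classification omitted, assuming the confluent-kafka-go import used in this file:

func receiveLoop(c *kafka.Consumer, out chan<- *kafka.Message, closeCh <-chan struct{}) {
	defer close(out)
	for {
		select {
		case <-closeCh:
			return
		default:
			msg, err := c.ReadMessage(30 * time.Second)
			if err != nil {
				continue // timeout or transient error; the real loop logs a warning
			}
			out <- msg
		}
	}
}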
func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
if kc.hasAssign {
return errors.New("kafka consumer is already assigned, can not seek again")
}
offset := kafka.Offset(id.(*kafkaID).messageID)
return kc.internalSeek(offset, inclusive)
}
func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
if kc.hasSeek {
return errors.New("unsupported multiple seek with the same kafka consumer")
}
if kc.hasConsume {
return errors.New("unsupported seek after consume message with the same kafka consumer")
}
log.Debug("kafka consumer seek start", zap.String("topic name", kc.topic),
log.Info("kafka consumer seek start", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
start := time.Now()
err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
if err != nil {
log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
log.Warn("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
return err
}
cost := time.Since(start).Milliseconds()
if cost > 100 {
log.Debug("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
if cost > 200 {
log.Warn("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
}
@ -194,23 +186,24 @@ func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
if err := kc.c.Seek(kafka.TopicPartition{
Topic: &kc.topic,
Partition: mqwrapper.DefaultPartitionIdx,
Offset: offset}, 1000); err != nil {
Offset: offset}, timeout); err != nil {
return err
}
cost = time.Since(start).Milliseconds()
log.Debug("kafka consumer seek finished", zap.String("topic name", kc.topic),
log.Info("kafka consumer seek finished", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
kc.hasSeek = true
kc.hasAssign = true
return nil
}
func (kc *Consumer) Ack(message mqwrapper.Message) {
kc.c.Commit()
kafkaMsg, _ := message.(*kafkaMessage)
kc.c.CommitMessage(kafkaMsg.msg)
}
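Ack now commits the offset of the specific message: CommitMessage records offset+1 for that message's partition only, whereas the old Commit committed the consumer's current positions wholesale, so an Ack records exactly what was consumed. Equivalent minimal form, as a sketch:

func ack(c *kafka.Consumer, m *kafka.Message) error {
	_, err := c.CommitMessage(m) // commits m's offset+1 on m's partition only
	return err
}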
func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
low, high, err := kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, -1)
low, high, err := kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, timeout)
if err != nil {
return nil, err
}
@ -221,17 +214,12 @@ func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
high = high - 1
}
log.Debug("get latest msg ID ", zap.Any("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
log.Info("get latest msg ID ", zap.Any("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
return &kafkaID{messageID: high}, nil
}
func (kc *Consumer) Close() {
kc.closeOnce.Do(func() {
start := time.Now()
kc.c.Close()
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
}
close(kc.closeCh)
})
}


@ -8,6 +8,7 @@ import (
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/stretchr/testify/assert"
)
@ -17,19 +18,19 @@ func TestKafkaConsumer_Subscription(t *testing.T) {
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
kc, err := newKafkaConsumer(config, topic, groupID)
kc, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer kc.Close()
assert.Equal(t, kc.Subscription(), groupID)
}
func TestKafkaConsumer_Chan(t *testing.T) {
func TestKafkaConsumer_SeekExclusive(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
@ -47,13 +48,37 @@ func TestKafkaConsumer_Chan(t *testing.T) {
assert.True(t, len(msg.Properties()) == 0)
}
func TestKafkaConsumer_SeekInclusive(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
data := []int{111, 222, 333}
testKafkaConsumerProduceData(t, topic, data)
msgID := &kafkaID{messageID: 1}
err = consumer.Seek(msgID, true)
assert.Nil(t, err)
msg := <-consumer.Chan()
assert.Equal(t, 222, BytesToInt(msg.Payload()))
assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 0)
}
func TestKafkaConsumer_GetSeek(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
@ -64,13 +89,49 @@ func TestKafkaConsumer_GetSeek(t *testing.T) {
assert.Error(t, consumer.Seek(msgID, false))
}
func TestKafkaConsumer_ChanWithNoAssign(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
data := []int{111}
testKafkaConsumerProduceData(t, topic, data)
assert.Panics(t, func() {
<-consumer.Chan()
})
}
type mockMsgID struct {
}
func (m2 mockMsgID) AtEarliestPosition() bool {
return false
}
func (m2 mockMsgID) LessOrEqualThan(msgID []byte) (bool, error) {
return false, nil
}
func (m2 mockMsgID) Equal(msgID []byte) (bool, error) {
return false, nil
}
func (m2 mockMsgID) Serialize() []byte {
return nil
}
func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
assert.NoError(t, err)
defer consumer.Close()
@ -79,9 +140,8 @@ func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
msg := <-consumer.Chan()
assert.Equal(t, 111, BytesToInt(msg.Payload()))
assert.Panics(t, func() {
consumer.Seek(nil, false)
})
err = consumer.Seek(mockMsgID{}, false)
assert.Error(t, err)
}
func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
@ -90,7 +150,7 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
@ -115,11 +175,9 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
testKafkaConsumerProduceData(t, topic, data)
config := createConfig(groupID)
config.SetKey("auto.offset.reset", "latest")
consumer, err := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionLatest)
assert.NoError(t, err)
defer consumer.Close()
data = []int{444, 555}
testKafkaConsumerProduceData(t, topic, data)
@ -129,6 +187,31 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
assert.Equal(t, 555, BytesToInt(msg.Payload()))
}
func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
data := []int{111, 222, 333}
testKafkaConsumerProduceData(t, topic, data)
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
assert.NoError(t, err)
msg := <-consumer.Chan()
assert.Equal(t, 111, BytesToInt(msg.Payload()))
consumer.Ack(msg)
defer consumer.Close()
config = createConfig(groupID)
consumer2, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
assert.NoError(t, err)
msg = <-consumer2.Chan()
assert.Equal(t, 111, BytesToInt(msg.Payload()))
consumer2.Ack(msg)
defer consumer2.Close()
}
func TestKafkaConsumer_createKafkaConsumer(t *testing.T) {
consumer := &Consumer{config: &kafka.ConfigMap{}}
err := consumer.createKafkaConsumer()
@ -150,10 +233,8 @@ func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
func createConfig(groupID string) *kafka.ConfigMap {
kafkaAddress := getKafkaBrokerList()
return &kafka.ConfigMap{
"bootstrap.servers": kafkaAddress,
"group.id": groupID,
"auto.offset.reset": "earliest",
"api.version.request": "true",
"go.events.channel.enable": true,
"bootstrap.servers": kafkaAddress,
"group.id": groupID,
"api.version.request": "true",
}
}


@ -67,7 +67,7 @@ func (rc *rmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con
Topic: options.Topic,
SubscriptionName: options.SubscriptionName,
MessageChannel: receiveChannel,
SubscriptionInitialPosition: client.SubscriptionInitialPosition(options.SubscriptionInitialPosition),
SubscriptionInitialPosition: options.SubscriptionInitialPosition,
})
if err != nil {
return nil, err


@ -65,8 +65,7 @@ type MsgStream interface {
Broadcast(*MsgPack) error
BroadcastMark(*MsgPack) (map[string][]MessageID, error)
AsConsumer(channels []string, subName string)
AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
Chan() <-chan *MsgPack
Seek(offset []*MsgPosition) error


@ -256,10 +256,7 @@ func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack {
func (ms *simpleMockMsgStream) AsProducer(channels []string) {
}
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *simpleMockMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
}
func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {


@ -197,7 +197,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSu
if q.dmlStream == nil {
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumer([]string{channel}, subName)
q.dmlStream.AsConsumer([]string{channel}, subName, mqwrapper.SubscriptionPositionUnknown)
log.Info("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
@ -213,7 +213,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
if q.dmlStream == nil {
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumerWithPosition([]string{channel}, subName, mqwrapper.SubscriptionPositionLatest)
q.dmlStream.AsConsumer([]string{channel}, subName, mqwrapper.SubscriptionPositionLatest)
log.Info("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
@ -225,8 +225,8 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
}
// consumeFlowGraphFromPosition subscribes and then seeks to the given position
func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error {
q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup)
func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.MsgPosition) error {
q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup, mqwrapper.SubscriptionPositionUnknown)
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
ts, _ := tsoutil.ParseTS(position.GetTimestamp())


@ -75,7 +75,7 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
MsgGroup: defaultSubName,
Timestamp: 0,
}
err = fg.seekQueryNodeFlowGraph(position)
err = fg.consumeFlowGraphFromPosition(position)
assert.Error(t, err)
fg.close()


@ -1615,16 +1615,6 @@ func checkSearchResult(nq int64, plan *SearchPlan, searchResult *SearchResult) e
return nil
}
func initConsumer(ctx context.Context, queryResultChannel Channel) (msgstream.MsgStream, error) {
stream, err := genQueryMsgStream(ctx)
if err != nil {
return nil, err
}
stream.AsConsumer([]string{queryResultChannel}, defaultSubName)
stream.Start()
return stream, nil
}
func genSimpleSegmentInfo() *querypb.SegmentInfo {
return &querypb.SegmentInfo{
SegmentID: defaultSegmentID,

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -716,13 +717,18 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
pChannelName := funcutil.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName
stream.AsConsumer([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.GetNodeID(), collectionID))
stream.AsConsumer([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.GetNodeID(), collectionID), mqwrapper.SubscriptionPositionUnknown)
// make sure the seek position is earlier than the latest msg ID
lastMsgID, err := stream.GetLatestMsgID(pChannelName)
if err != nil {
return err
}
reachLatest, _ := lastMsgID.Equal(position.MsgID)
reachLatest, err := lastMsgID.Equal(position.MsgID)
if err != nil {
return err
}
if reachLatest || lastMsgID.AtEarliestPosition() {
log.Info("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
return nil


@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
@ -836,7 +837,7 @@ type LoadDeleteMsgStream struct {
func (ms *LoadDeleteMsgStream) Close() {
}
func (ms *LoadDeleteMsgStream) AsConsumer(channels []string, subName string) {
func (ms *LoadDeleteMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
}
func (ms *LoadDeleteMsgStream) Chan() <-chan *msgstream.MsgPack {


@ -262,10 +262,13 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
}
}()
// So far, we don't support enabling two different channels on one node
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.GetNodeID())
// group channels by whether they need to seek or just consume
channel2SeekPosition := make(map[string]*internalpb.MsgPosition)
// for channels with no seek position
channel2AsConsumerPosition := make(map[string]*internalpb.MsgPosition)
for _, info := range w.req.Infos {
if info.SeekPosition == nil || len(info.SeekPosition.MsgID) == 0 {
@ -370,7 +373,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
pos.MsgGroup = consumeSubName
// use pChannel to seek
pos.ChannelName = VPChannels[channel]
err = fg.seekQueryNodeFlowGraph(pos)
err = fg.consumeFlowGraphFromPosition(pos)
if err != nil {
log.Error("msgStream seek failed for dmChannels", zap.Int64("collectionID", collectionID), zap.String("vChannel", channel))
break


@ -209,13 +209,12 @@ type FailMsgStream struct {
errBroadcast bool
}
func (ms *FailMsgStream) Start() {}
func (ms *FailMsgStream) Close() {}
func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil }
func (ms *FailMsgStream) AsProducer(channels []string) {}
func (ms *FailMsgStream) AsConsumer(channels []string, subName string) {}
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
func (ms *FailMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
func (ms *FailMsgStream) Start() {}
func (ms *FailMsgStream) Close() {}
func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil }
func (ms *FailMsgStream) AsProducer(channels []string) {}
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
func (ms *FailMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
}
func (ms *FailMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
func (ms *FailMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { return nil }


@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/util/dependency"
)
@ -31,7 +32,7 @@ func TestInputNode(t *testing.T) {
msgStream, _ := factory.NewMsgStream(context.TODO())
channels := []string{"cc"}
msgStream.AsConsumer(channels, "sub")
msgStream.AsConsumer(channels, "sub", mqwrapper.SubscriptionPositionEarliest)
msgStream.Start()
msgPack := generateMsgPack()


@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/dependency"
)
@ -61,7 +62,7 @@ func TestNodeCtx_Start(t *testing.T) {
msgStream, _ := factory.NewMsgStream(context.TODO())
channels := []string{"cc"}
msgStream.AsConsumer(channels, "sub")
msgStream.AsConsumer(channels, "sub", mqwrapper.SubscriptionPositionEarliest)
produceStream, _ := factory.NewMsgStream(context.TODO())
produceStream.AsProducer(channels)