mirror of https://github.com/milvus-io/milvus.git
Fix broken kafka UT (#20743)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com> Signed-off-by: yun.zhang <yun.zhang@zilliz.com>pull/20727/head
parent
234a4577cf
commit
c761de19bc
|
@ -10,6 +10,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/config"
|
||||||
|
|
||||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/common"
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
|
@ -102,7 +104,7 @@ func Consume2(ctx context.Context, t *testing.T, kc *kafkaClient, topic string,
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
SubscriptionName: subName,
|
SubscriptionName: subName,
|
||||||
BufSize: 1024,
|
BufSize: 1024,
|
||||||
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
|
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionUnknown,
|
||||||
})
|
})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.NotNil(t, consumer)
|
assert.NotNil(t, consumer)
|
||||||
|
@ -221,7 +223,7 @@ func TestKafkaClient_SeekPosition(t *testing.T) {
|
||||||
data := []int{1, 2, 3}
|
data := []int{1, 2, 3}
|
||||||
ids := produceData(ctx, t, producer, data)
|
ids := produceData(ctx, t, producer, data)
|
||||||
|
|
||||||
consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest)
|
consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionUnknown)
|
||||||
defer consumer.Close()
|
defer consumer.Close()
|
||||||
|
|
||||||
err := consumer.Seek(ids[2], true)
|
err := consumer.Seek(ids[2], true)
|
||||||
|
@ -297,60 +299,56 @@ func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) {
|
||||||
assert.Nil(t, msgID)
|
assert.Nil(t, msgID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createParamItem(v string) paramtable.ParamItem {
|
||||||
|
item := paramtable.ParamItem{
|
||||||
|
Formatter: func(originValue string) string { return v },
|
||||||
|
}
|
||||||
|
item.Init(&config.Manager{})
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) {
|
func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) {
|
||||||
config1 := ¶mtable.KafkaConfig{
|
config1 := ¶mtable.KafkaConfig{
|
||||||
Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }},
|
Address: createParamItem("addr"),
|
||||||
SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }},
|
SaslPassword: createParamItem("password"),
|
||||||
}
|
}
|
||||||
assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config1) })
|
assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config1) })
|
||||||
|
|
||||||
config2 := ¶mtable.KafkaConfig{
|
config2 := ¶mtable.KafkaConfig{
|
||||||
Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }},
|
Address: createParamItem("addr"),
|
||||||
SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }},
|
SaslUsername: createParamItem("username"),
|
||||||
}
|
}
|
||||||
assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config2) })
|
assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config2) })
|
||||||
|
|
||||||
config3 := ¶mtable.KafkaConfig{
|
producerConfig := make(map[string]string)
|
||||||
Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }},
|
producerConfig["client.id"] = "dc1"
|
||||||
SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }},
|
consumerConfig := make(map[string]string)
|
||||||
SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }},
|
consumerConfig["client.id"] = "dc"
|
||||||
|
|
||||||
|
config := ¶mtable.KafkaConfig{
|
||||||
|
Address: createParamItem("addr"),
|
||||||
|
SaslUsername: createParamItem("username"),
|
||||||
|
SaslPassword: createParamItem("password"),
|
||||||
|
SaslMechanisms: createParamItem("sasl"),
|
||||||
|
SecurityProtocol: createParamItem("plain"),
|
||||||
|
ConsumerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return consumerConfig }},
|
||||||
|
ProducerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return producerConfig }},
|
||||||
}
|
}
|
||||||
client := NewKafkaClientInstanceWithConfig(config3)
|
client := NewKafkaClientInstanceWithConfig(config)
|
||||||
assert.NotNil(t, client)
|
assert.NotNil(t, client)
|
||||||
assert.NotNil(t, client.basicConfig)
|
assert.NotNil(t, client.basicConfig)
|
||||||
|
|
||||||
consumerConfig := make(map[string]string)
|
assert.Equal(t, "dc", client.consumerConfig["client.id"])
|
||||||
consumerConfig["client.id"] = "dc"
|
newConsumerConfig := client.newConsumerConfig("test", 0)
|
||||||
config4 := ¶mtable.KafkaConfig{
|
|
||||||
Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }},
|
|
||||||
SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }},
|
|
||||||
SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }},
|
|
||||||
ConsumerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return consumerConfig }},
|
|
||||||
}
|
|
||||||
client4 := NewKafkaClientInstanceWithConfig(config4)
|
|
||||||
assert.Equal(t, "dc", client4.consumerConfig["client.id"])
|
|
||||||
|
|
||||||
newConsumerConfig := client4.newConsumerConfig("test", 0)
|
|
||||||
clientID, err := newConsumerConfig.Get("client.id", "")
|
clientID, err := newConsumerConfig.Get("client.id", "")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, "dc", clientID)
|
assert.Equal(t, "dc", clientID)
|
||||||
|
|
||||||
producerConfig := make(map[string]string)
|
assert.Equal(t, "dc1", client.producerConfig["client.id"])
|
||||||
producerConfig["client.id"] = "dc1"
|
newProducerConfig := client.newProducerConfig()
|
||||||
config5 := ¶mtable.KafkaConfig{
|
|
||||||
Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }},
|
|
||||||
SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }},
|
|
||||||
SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }},
|
|
||||||
ProducerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return producerConfig }},
|
|
||||||
}
|
|
||||||
client5 := NewKafkaClientInstanceWithConfig(config5)
|
|
||||||
assert.Equal(t, "dc1", client5.producerConfig["client.id"])
|
|
||||||
|
|
||||||
newProducerConfig := client5.newProducerConfig()
|
|
||||||
pClientID, err := newProducerConfig.Get("client.id", "")
|
pClientID, err := newProducerConfig.Get("client.id", "")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, pClientID, "dc1")
|
assert.Equal(t, pClientID, "dc1")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func createKafkaClient(t *testing.T) *kafkaClient {
|
func createKafkaClient(t *testing.T) *kafkaClient {
|
||||||
|
|
|
@ -45,7 +45,7 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) {
|
||||||
|
|
||||||
func TestKafkaID_Equal(t *testing.T) {
|
func TestKafkaID_Equal(t *testing.T) {
|
||||||
rid1 := &kafkaID{messageID: 0}
|
rid1 := &kafkaID{messageID: 0}
|
||||||
rid2 := &kafkaID{messageID: 0}
|
rid2 := &kafkaID{messageID: 1}
|
||||||
|
|
||||||
{
|
{
|
||||||
ret, err := rid1.Equal(rid1.Serialize())
|
ret, err := rid1.Equal(rid1.Serialize())
|
||||||
|
|
Loading…
Reference in New Issue