mirror of https://github.com/milvus-io/milvus.git
Remove topic empty restriction when preCreated topic (#27351)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/27618/head
parent
11ddb7c794
commit
a9982aa7c6
|
@ -1151,22 +1151,14 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI
|
|||
}
|
||||
|
||||
func (rmq *rocksmq) CheckTopicValid(topic string) error {
|
||||
// Check if key exists
|
||||
log := log.With(zap.String("topic", topic))
|
||||
|
||||
_, ok := topicMu.Load(topic)
|
||||
if !ok {
|
||||
return merr.WrapErrMqTopicNotFound(topic, "failed to get topic")
|
||||
}
|
||||
|
||||
latestMsgID, err := rmq.GetLatestMsg(topic)
|
||||
_, err := rmq.GetLatestMsg(topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if latestMsgID != DefaultMessageID {
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty")
|
||||
}
|
||||
log.Info("created topic is empty")
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1188,7 +1188,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) {
|
|||
assert.Equal(t, true, errors.Is(err, merr.ErrMqTopicNotFound))
|
||||
|
||||
channelName2 := "topic2"
|
||||
// topic is not empty
|
||||
// allow topic is not empty
|
||||
err = rmq.CreateTopic(channelName2)
|
||||
defer rmq.DestroyTopic(channelName2)
|
||||
assert.NoError(t, err)
|
||||
|
@ -1204,7 +1204,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
err = rmq.CheckTopicValid(channelName2)
|
||||
assert.Equal(t, true, errors.Is(err, merr.ErrMqTopicNotEmpty))
|
||||
assert.NoError(t, err)
|
||||
|
||||
channelName3 := "topic3"
|
||||
// pass
|
||||
|
|
|
@ -187,9 +187,10 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
|
|||
if params.PreCreatedTopicEnabled.GetAsBool() {
|
||||
subName := fmt.Sprintf("pre-created-topic-check-%s", name)
|
||||
ms.AsConsumer(ctx, []string{name}, subName, mqwrapper.SubscriptionPositionUnknown)
|
||||
// check topic exist and check the existed topic whether empty or not
|
||||
// check if topic is existed
|
||||
// kafka and rmq will err if the topic does not yet exist, pulsar will not
|
||||
// if one of the topics is not empty, panic
|
||||
// allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty
|
||||
// if there are any message that not belong to milvus, will skip it
|
||||
err := ms.CheckTopicValid(name)
|
||||
if err != nil {
|
||||
log.Error("created topic is invaild", zap.String("name", name), zap.Error(err))
|
||||
|
|
|
@ -386,9 +386,11 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) {
|
|||
log.Warn("MqMsgStream get msg whose payload is nil")
|
||||
continue
|
||||
}
|
||||
// not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295
|
||||
// if the message not belong to the topic, will skip it
|
||||
tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
|
||||
if err != nil {
|
||||
log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
|
||||
log.Warn("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
pos := tsMsg.Position()
|
||||
|
@ -739,9 +741,11 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) {
|
|||
log.Warn("MqTtMsgStream get msg whose payload is nil")
|
||||
continue
|
||||
}
|
||||
// not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295
|
||||
// if the message not belong to the topic, will skip it
|
||||
tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
|
||||
if err != nil {
|
||||
log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
|
||||
log.Warn("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -222,7 +222,7 @@ func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
|
|||
}
|
||||
|
||||
func (kc *Consumer) CheckTopicValid(topic string) error {
|
||||
latestMsgID, err := kc.GetLatestMsgID()
|
||||
_, err := kc.GetLatestMsgID()
|
||||
log.With(zap.String("topic", kc.topic))
|
||||
// check topic is existed
|
||||
if err != nil {
|
||||
|
@ -237,12 +237,6 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
|
|||
}
|
||||
}
|
||||
|
||||
// check topic is empty
|
||||
if !latestMsgID.AtEarliestPosition() {
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty")
|
||||
}
|
||||
log.Info("created topic is empty")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -149,7 +149,6 @@ func (nc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
|
|||
|
||||
// CheckTopicValid verifies if the given topic is valid for this consumer.
|
||||
// 1. topic should exist.
|
||||
// 2. topic should be empty.
|
||||
func (nc *Consumer) CheckTopicValid(topic string) error {
|
||||
if err := nc.closed(); err != nil {
|
||||
return err
|
||||
|
@ -162,18 +161,13 @@ func (nc *Consumer) CheckTopicValid(topic string) error {
|
|||
}
|
||||
|
||||
// check if topic valid or exist.
|
||||
streamInfo, err := nc.js.StreamInfo(topic)
|
||||
_, err := nc.js.StreamInfo(topic)
|
||||
if errors.Is(err, nats.ErrStreamNotFound) {
|
||||
return merr.WrapErrMqTopicNotFound(topic, err.Error())
|
||||
} else if err != nil {
|
||||
log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err))
|
||||
return errors.Wrap(err, "failed to get stream info of nats jetstream")
|
||||
}
|
||||
|
||||
// check if topic stream is empty.
|
||||
if streamInfo.State.Msgs > 0 {
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "stream in nats is not empty")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
func TestNatsConsumer_Subscription(t *testing.T) {
|
||||
|
@ -219,7 +218,7 @@ func TestCheckTopicValid(t *testing.T) {
|
|||
err = consumer.CheckTopicValid("BadTopic")
|
||||
assert.Error(t, err)
|
||||
|
||||
// non empty topic should fail
|
||||
// not empty topic can pass
|
||||
pub, err := client.CreateProducer(mqwrapper.ProducerOptions{
|
||||
Topic: topic,
|
||||
})
|
||||
|
@ -230,7 +229,7 @@ func TestCheckTopicValid(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
err = consumer.CheckTopicValid(topic)
|
||||
assert.ErrorIs(t, err, merr.ErrMqTopicNotEmpty)
|
||||
assert.NoError(t, err)
|
||||
|
||||
consumer.Close()
|
||||
err = consumer.CheckTopicValid(topic)
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
)
|
||||
|
||||
|
@ -158,16 +157,11 @@ func (pc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
|
|||
}
|
||||
|
||||
func (pc *Consumer) CheckTopicValid(topic string) error {
|
||||
latestMsgID, err := pc.GetLatestMsgID()
|
||||
_, err := pc.GetLatestMsgID()
|
||||
// Pulsar creates that topic under the namespace provided in the topic name automatically
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !latestMsgID.AtEarliestPosition() {
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty")
|
||||
}
|
||||
log.Info("created topic is empty", zap.String("topic", topic))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue