fix: skip consuming from streaming service message (#40879)

issue: #40532
pr: #40877

Signed-off-by: chyezh <chyezh@outlook.com>
pull/40867/head
Zhen Ye 2025-03-25 15:22:23 +08:00 committed by GitHub
parent ec0c25aaec
commit 53438c751d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 0 deletions

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
@ -465,6 +466,11 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) {
log.Ctx(ms.ctx).Warn("MqMsgStream get msg whose payload is nil")
continue
}
if message.CheckIfMessageFromStreaming(msg.Properties()) {
log.Ctx(ms.ctx).Warn("MqMsgStream can not consume the message from streaming service")
continue
}
// not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295
// if the message not belong to the topic, will skip it
tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
@ -841,6 +847,11 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) {
log.Warn("MqTtMsgStream get msg whose payload is nil")
continue
}
if message.CheckIfMessageFromStreaming(msg.Properties()) {
log.Warn("MqTtMsgStream can not consume the message from streaming service")
continue
}
// not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295
// if the message not belong to the topic, will skip it
tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)

View File

@ -34,3 +34,12 @@ func TestVersion(t *testing.T) {
assert.True(t, VersionV1.GT(VersionOld))
assert.True(t, VersionV2.GT(VersionV1))
}
// TestCheckIfMessageFromStreaming tests CheckIfMessageFromStreaming function.
func TestCheckIfMessageFromStreaming(t *testing.T) {
assert.False(t, CheckIfMessageFromStreaming(nil))
assert.False(t, CheckIfMessageFromStreaming(map[string]string{}))
assert.True(t, CheckIfMessageFromStreaming(map[string]string{
messageVersion: "1",
}))
}

View File

@ -73,3 +73,14 @@ func (prop propertiesImpl) EstimateSize() int {
}
return size
}
// CheckIfMessageFromStreaming checks if the message is from streaming.
func CheckIfMessageFromStreaming(props map[string]string) bool {
if props == nil {
return false
}
if props[messageVersion] != "" {
return true
}
return false
}