fix: long buffering causes mq to be unable to receive messages. (#36420)

- issue: #36397

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/36406/head
SimFG 2024-09-23 16:33:18 +08:00 committed by GitHub
parent 1fb8b46db0
commit c50fe71163
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 50 additions and 22 deletions

View File

@ -161,6 +161,7 @@ mq:
enablePursuitMode: true # Default value: "true" enablePursuitMode: true # Default value: "true"
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
pursuitBufferTime: 60 # maximum time to keep buffering messages in pursuit mode before stopping, in seconds
mqBufSize: 16 # MQ client consumer buffer length mqBufSize: 16 # MQ client consumer buffer length
dispatcher: dispatcher:
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge

View File

@ -23,14 +23,13 @@ import (
"sort" "sort"
"time" "time"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/importutilv2"

View File

@ -629,7 +629,7 @@ func isDMLMsg(msg TsMsg) bool {
return msg.Type() == commonpb.MsgType_Insert || msg.Type() == commonpb.MsgType_Delete return msg.Type() == commonpb.MsgType_Insert || msg.Type() == commonpb.MsgType_Delete
} }
func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool { func (ms *MqTtMsgStream) continueBuffering(endTs, size uint64, startTime time.Time) bool {
if ms.ctx.Err() != nil { if ms.ctx.Err() != nil {
return false return false
} }
@ -649,6 +649,10 @@ func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool {
return false return false
} }
if time.Since(startTime) > paramtable.Get().ServiceParam.MQCfg.PursuitBufferTime.GetAsDuration(time.Second) {
return false
}
endTime, _ := tsoutil.ParseTS(endTs) endTime, _ := tsoutil.ParseTS(endTs)
return time.Since(endTime) > paramtable.Get().ServiceParam.MQCfg.PursuitLag.GetAsDuration(time.Second) return time.Since(endTime) > paramtable.Get().ServiceParam.MQCfg.PursuitLag.GetAsDuration(time.Second)
} }
@ -677,10 +681,11 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
// endMsgPositions := make([]*msgpb.MsgPosition, 0) // endMsgPositions := make([]*msgpb.MsgPosition, 0)
startPositions := make(map[string]*msgpb.MsgPosition) startPositions := make(map[string]*msgpb.MsgPosition)
endPositions := make(map[string]*msgpb.MsgPosition) endPositions := make(map[string]*msgpb.MsgPosition)
startBufTime := time.Now()
var endTs uint64 var endTs uint64
var size uint64 var size uint64
for ms.continueBuffering(endTs, size) { for ms.continueBuffering(endTs, size, startBufTime) {
ms.consumerLock.Lock() ms.consumerLock.Lock()
// wait all channels get ttMsg // wait all channels get ttMsg
for _, consumer := range ms.consumers { for _, consumer := range ms.consumers {

View File

@ -1547,6 +1547,7 @@ func TestMqttStream_continueBuffering(t *testing.T) {
endTs uint64 endTs uint64
size uint64 size uint64
expect bool expect bool
startTime time.Time
} }
currTs := tsoutil.ComposeTSByTime(time.Now(), 0) currTs := tsoutil.ComposeTSByTime(time.Now(), 0)
@ -1556,23 +1557,33 @@ func TestMqttStream_continueBuffering(t *testing.T) {
endTs: 0, endTs: 0,
size: 0, size: 0,
expect: true, expect: true,
startTime: time.Now(),
}, },
{ {
tag: "lag_large", tag: "lag_large",
endTs: 1, endTs: 1,
size: 10, size: 10,
expect: false, expect: false,
startTime: time.Now(),
}, },
{ {
tag: "currTs", tag: "currTs",
endTs: currTs, endTs: currTs,
size: 10, size: 10,
expect: false, expect: false,
startTime: time.Now(),
},
{
tag: "bufferTs",
endTs: 10,
size: 0,
expect: false,
startTime: time.Now().Add(-time.Hour),
}, },
} }
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) { t.Run(tc.tag, func(t *testing.T) {
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size)) assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, tc.startTime))
}) })
} }
}) })
@ -1618,7 +1629,7 @@ func TestMqttStream_continueBuffering(t *testing.T) {
} }
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) { t.Run(tc.tag, func(t *testing.T) {
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size)) assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, time.Now()))
}) })
} }
}) })

View File

@ -645,6 +645,8 @@ func TestCachedParam(t *testing.T) {
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
assert.Equal(t, 60, params.ServiceParam.MQCfg.PursuitBufferTime.GetAsInt())
assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64()) assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64())
assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64()) assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64())

View File

@ -484,6 +484,7 @@ type MQConfig struct {
EnablePursuitMode ParamItem `refreshable:"true"` EnablePursuitMode ParamItem `refreshable:"true"`
PursuitLag ParamItem `refreshable:"true"` PursuitLag ParamItem `refreshable:"true"`
PursuitBufferSize ParamItem `refreshable:"true"` PursuitBufferSize ParamItem `refreshable:"true"`
PursuitBufferTime ParamItem `refreshable:"true"`
MQBufSize ParamItem `refreshable:"false"` MQBufSize ParamItem `refreshable:"false"`
ReceiveBufSize ParamItem `refreshable:"false"` ReceiveBufSize ParamItem `refreshable:"false"`
@ -561,6 +562,15 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
} }
p.PursuitBufferSize.Init(base.mgr) p.PursuitBufferSize.Init(base.mgr)
p.PursuitBufferTime = ParamItem{
Key: "mq.pursuitBufferTime",
Version: "2.4.12",
DefaultValue: "60", // 60 s
Doc: `pursuit mode buffer time in seconds`,
Export: true,
}
p.PursuitBufferTime.Init(base.mgr)
p.MQBufSize = ParamItem{ p.MQBufSize = ParamItem{
Key: "mq.mqBufSize", Key: "mq.mqBufSize",
Version: "2.3.0", Version: "2.3.0",