mirror of https://github.com/milvus-io/milvus.git
Add pursuit mode for mqtt msgstream (#26098)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/26130/head
parent
2c6c7749e2
commit
fcde5b8d7f
|
@ -37,6 +37,8 @@ import (
|
|||
|
||||
func TestMain(m *testing.M) {
|
||||
paramtable.Init()
|
||||
pt := paramtable.Get()
|
||||
pt.Save(pt.ServiceParam.MQCfg.EnablePursuitMode.Key, "false")
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
path := "/tmp/milvus/rdb_data"
|
||||
|
|
|
@ -37,10 +37,11 @@ const (
|
|||
dim = 128
|
||||
)
|
||||
|
||||
var Params paramtable.ComponentParam
|
||||
var Params = paramtable.Get()
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
Params.Init()
|
||||
Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false")
|
||||
exitCode := m.Run()
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
|
|
@ -32,7 +32,9 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -548,6 +550,30 @@ func isDMLMsg(msg TsMsg) bool {
|
|||
return msg.Type() == commonpb.MsgType_Insert || msg.Type() == commonpb.MsgType_Delete
|
||||
}
|
||||
|
||||
func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool {
|
||||
if ms.ctx.Err() != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// first run
|
||||
if endTs == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
// pursuit mode not enabled
|
||||
if !paramtable.Get().ServiceParam.MQCfg.EnablePursuitMode.GetAsBool() {
|
||||
return false
|
||||
}
|
||||
|
||||
// buffer full
|
||||
if size > paramtable.Get().ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64() {
|
||||
return false
|
||||
}
|
||||
|
||||
endTime, _ := tsoutil.ParseTS(endTs)
|
||||
return time.Since(endTime) > paramtable.Get().ServiceParam.MQCfg.PursuitLag.GetAsDuration(time.Second)
|
||||
}
|
||||
|
||||
func (ms *MqTtMsgStream) bufMsgPackToChannel() {
|
||||
ms.closeRWMutex.RLock()
|
||||
defer ms.closeRWMutex.RUnlock()
|
||||
|
@ -567,74 +593,78 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
|
|||
case <-ms.ctx.Done():
|
||||
return
|
||||
default:
|
||||
ms.consumerLock.Lock()
|
||||
|
||||
// wait all channels get ttMsg
|
||||
for _, consumer := range ms.consumers {
|
||||
if !chanTtMsgSync[consumer] {
|
||||
ms.chanWaitGroup.Add(1)
|
||||
go ms.consumeToTtMsg(consumer)
|
||||
}
|
||||
}
|
||||
ms.chanWaitGroup.Wait()
|
||||
|
||||
// block here until all channels reach same timetick
|
||||
currTs, ok := ms.allChanReachSameTtMsg(chanTtMsgSync)
|
||||
if !ok || currTs <= ms.lastTimeStamp {
|
||||
//log.Printf("All timeTick's timestamps are inconsistent")
|
||||
ms.consumerLock.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
timeTickBuf := make([]TsMsg, 0)
|
||||
startMsgPosition := make([]*msgpb.MsgPosition, 0)
|
||||
endMsgPositions := make([]*msgpb.MsgPosition, 0)
|
||||
ms.chanMsgBufMutex.Lock()
|
||||
for consumer, msgs := range ms.chanMsgBuf {
|
||||
if len(msgs) == 0 {
|
||||
var endTs uint64
|
||||
var size uint64
|
||||
|
||||
for ms.continueBuffering(endTs, size) {
|
||||
ms.consumerLock.Lock()
|
||||
// wait all channels get ttMsg
|
||||
for _, consumer := range ms.consumers {
|
||||
if !chanTtMsgSync[consumer] {
|
||||
ms.chanWaitGroup.Add(1)
|
||||
go ms.consumeToTtMsg(consumer)
|
||||
}
|
||||
}
|
||||
ms.chanWaitGroup.Wait()
|
||||
|
||||
// block here until all channels reach same timetick
|
||||
currTs, ok := ms.allChanReachSameTtMsg(chanTtMsgSync)
|
||||
if !ok || currTs <= ms.lastTimeStamp {
|
||||
ms.consumerLock.Unlock()
|
||||
continue
|
||||
}
|
||||
tempBuffer := make([]TsMsg, 0)
|
||||
var timeTickMsg TsMsg
|
||||
for _, v := range msgs {
|
||||
if v.Type() == commonpb.MsgType_TimeTick {
|
||||
timeTickMsg = v
|
||||
endTs = currTs
|
||||
|
||||
ms.chanMsgBufMutex.Lock()
|
||||
for consumer, msgs := range ms.chanMsgBuf {
|
||||
if len(msgs) == 0 {
|
||||
continue
|
||||
}
|
||||
if v.EndTs() <= currTs {
|
||||
timeTickBuf = append(timeTickBuf, v)
|
||||
//log.Debug("pack msg", zap.Uint64("curr", v.EndTs()), zap.Uint64("currTs", currTs))
|
||||
} else {
|
||||
tempBuffer = append(tempBuffer, v)
|
||||
tempBuffer := make([]TsMsg, 0)
|
||||
var timeTickMsg TsMsg
|
||||
for _, v := range msgs {
|
||||
if v.Type() == commonpb.MsgType_TimeTick {
|
||||
timeTickMsg = v
|
||||
continue
|
||||
}
|
||||
if v.EndTs() <= currTs {
|
||||
size += uint64(v.Size())
|
||||
timeTickBuf = append(timeTickBuf, v)
|
||||
} else {
|
||||
tempBuffer = append(tempBuffer, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
ms.chanMsgBuf[consumer] = tempBuffer
|
||||
ms.chanMsgBuf[consumer] = tempBuffer
|
||||
|
||||
startMsgPosition = append(startMsgPosition, proto.Clone(ms.chanMsgPos[consumer]).(*msgpb.MsgPosition))
|
||||
var newPos *msgpb.MsgPosition
|
||||
if len(tempBuffer) > 0 {
|
||||
// if tempBuffer is not empty, use tempBuffer[0] to seek
|
||||
newPos = &msgpb.MsgPosition{
|
||||
ChannelName: tempBuffer[0].Position().ChannelName,
|
||||
MsgID: tempBuffer[0].Position().MsgID,
|
||||
Timestamp: currTs,
|
||||
MsgGroup: consumer.Subscription(),
|
||||
startMsgPosition = append(startMsgPosition, proto.Clone(ms.chanMsgPos[consumer]).(*msgpb.MsgPosition))
|
||||
var newPos *msgpb.MsgPosition
|
||||
if len(tempBuffer) > 0 {
|
||||
// if tempBuffer is not empty, use tempBuffer[0] to seek
|
||||
newPos = &msgpb.MsgPosition{
|
||||
ChannelName: tempBuffer[0].Position().ChannelName,
|
||||
MsgID: tempBuffer[0].Position().MsgID,
|
||||
Timestamp: currTs,
|
||||
MsgGroup: consumer.Subscription(),
|
||||
}
|
||||
endMsgPositions = append(endMsgPositions, newPos)
|
||||
} else if timeTickMsg != nil {
|
||||
// if tempBuffer is empty, use timeTickMsg to seek
|
||||
newPos = &msgpb.MsgPosition{
|
||||
ChannelName: timeTickMsg.Position().ChannelName,
|
||||
MsgID: timeTickMsg.Position().MsgID,
|
||||
Timestamp: currTs,
|
||||
MsgGroup: consumer.Subscription(),
|
||||
}
|
||||
endMsgPositions = append(endMsgPositions, newPos)
|
||||
}
|
||||
endMsgPositions = append(endMsgPositions, newPos)
|
||||
} else if timeTickMsg != nil {
|
||||
// if tempBuffer is empty, use timeTickMsg to seek
|
||||
newPos = &msgpb.MsgPosition{
|
||||
ChannelName: timeTickMsg.Position().ChannelName,
|
||||
MsgID: timeTickMsg.Position().MsgID,
|
||||
Timestamp: currTs,
|
||||
MsgGroup: consumer.Subscription(),
|
||||
}
|
||||
endMsgPositions = append(endMsgPositions, newPos)
|
||||
ms.chanMsgPos[consumer] = newPos
|
||||
}
|
||||
ms.chanMsgPos[consumer] = newPos
|
||||
ms.chanMsgBufMutex.Unlock()
|
||||
ms.consumerLock.Unlock()
|
||||
}
|
||||
ms.chanMsgBufMutex.Unlock()
|
||||
ms.consumerLock.Unlock()
|
||||
|
||||
idset := make(typeutil.UniqueSet)
|
||||
uniqueMsgs := make([]TsMsg, 0, len(timeTickBuf))
|
||||
|
@ -647,21 +677,23 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
|
|||
uniqueMsgs = append(uniqueMsgs, msg)
|
||||
}
|
||||
|
||||
msgPack := MsgPack{
|
||||
BeginTs: ms.lastTimeStamp,
|
||||
EndTs: currTs,
|
||||
Msgs: uniqueMsgs,
|
||||
StartPositions: startMsgPosition,
|
||||
EndPositions: endMsgPositions,
|
||||
}
|
||||
// skip endTs = 0 (no run for ctx error)
|
||||
if endTs > 0 {
|
||||
msgPack := MsgPack{
|
||||
BeginTs: ms.lastTimeStamp,
|
||||
EndTs: endTs,
|
||||
Msgs: uniqueMsgs,
|
||||
StartPositions: startMsgPosition,
|
||||
EndPositions: endMsgPositions,
|
||||
}
|
||||
|
||||
//log.Debug("send msg pack", zap.Int("len", len(msgPack.Msgs)), zap.Uint64("currTs", currTs))
|
||||
select {
|
||||
case ms.receiveBuf <- &msgPack:
|
||||
case <-ms.ctx.Done():
|
||||
return
|
||||
select {
|
||||
case ms.receiveBuf <- &msgPack:
|
||||
case <-ms.ctx.Done():
|
||||
return
|
||||
}
|
||||
ms.lastTimeStamp = endTs
|
||||
}
|
||||
ms.lastTimeStamp = currTs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
pulsarwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -46,9 +47,10 @@ const (
|
|||
DefaultPulsarNamespace = "default"
|
||||
)
|
||||
|
||||
var Params paramtable.ComponentParam
|
||||
var Params *paramtable.ComponentParam
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
Params = paramtable.Get()
|
||||
Params.Init()
|
||||
mockKafkaCluster, err := kafka.NewMockCluster(1)
|
||||
defer mockKafkaCluster.Close()
|
||||
|
@ -59,6 +61,8 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
broker := mockKafkaCluster.BootstrapServers()
|
||||
Params.Save("kafka.brokerList", broker)
|
||||
// Disable pursuit mode for unit test by default
|
||||
Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false")
|
||||
|
||||
exitCode := m.Run()
|
||||
os.Exit(exitCode)
|
||||
|
@ -1395,3 +1399,99 @@ type messageID struct {
|
|||
type iface struct {
|
||||
Type, Data unsafe.Pointer
|
||||
}
|
||||
|
||||
func TestMqttStream_continueBuffering(t *testing.T) {
|
||||
defer Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false")
|
||||
|
||||
ms := &MqTtMsgStream{
|
||||
mqMsgStream: &mqMsgStream{
|
||||
ctx: context.Background(),
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("disable_pursuit_mode", func(t *testing.T) {
|
||||
Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false")
|
||||
Params.Save(Params.ServiceParam.MQCfg.PursuitLag.Key, "10")
|
||||
Params.Save(Params.ServiceParam.MQCfg.PursuitBufferSize.Key, "1024")
|
||||
|
||||
type testCase struct {
|
||||
tag string
|
||||
endTs uint64
|
||||
size uint64
|
||||
expect bool
|
||||
}
|
||||
|
||||
currTs := tsoutil.ComposeTSByTime(time.Now(), 0)
|
||||
cases := []testCase{
|
||||
{
|
||||
tag: "first_run",
|
||||
endTs: 0,
|
||||
size: 0,
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
tag: "lag_large",
|
||||
endTs: 1,
|
||||
size: 10,
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
tag: "currTs",
|
||||
endTs: currTs,
|
||||
size: 10,
|
||||
expect: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("disable_pursuit_mode", func(t *testing.T) {
|
||||
Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "true")
|
||||
Params.Save(Params.ServiceParam.MQCfg.PursuitLag.Key, "10")
|
||||
Params.Save(Params.ServiceParam.MQCfg.PursuitBufferSize.Key, "1024")
|
||||
|
||||
type testCase struct {
|
||||
tag string
|
||||
endTs uint64
|
||||
size uint64
|
||||
expect bool
|
||||
}
|
||||
|
||||
currTs := tsoutil.ComposeTSByTime(time.Now(), 0)
|
||||
cases := []testCase{
|
||||
{
|
||||
tag: "first_run",
|
||||
endTs: 0,
|
||||
size: 0,
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
tag: "lag_large",
|
||||
endTs: 1,
|
||||
size: 10,
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
tag: "currTs",
|
||||
endTs: currTs,
|
||||
size: 10,
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
tag: "large_lag_buffer_full",
|
||||
endTs: 1,
|
||||
size: 2048,
|
||||
expect: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size))
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -369,7 +369,10 @@ func (p *MetaDBConfig) Init(base *BaseTable) {
|
|||
|
||||
// MQConfig represents the configuration settings for the message queue.
|
||||
type MQConfig struct {
|
||||
Type ParamItem `refreshable:"false"`
|
||||
Type ParamItem `refreshable:"false"`
|
||||
EnablePursuitMode ParamItem `refreshable:"true"`
|
||||
PursuitLag ParamItem `refreshable:"true"`
|
||||
PursuitBufferSize ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
// Init initializes the MQConfig object with a BaseTable.
|
||||
|
@ -383,6 +386,33 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
|
|||
Export: true,
|
||||
}
|
||||
p.Type.Init(base.mgr)
|
||||
|
||||
p.EnablePursuitMode = ParamItem{
|
||||
Key: "mq.enablePursuitMode",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "true",
|
||||
Doc: `Default value: "true"`,
|
||||
Export: true,
|
||||
}
|
||||
p.EnablePursuitMode.Init(base.mgr)
|
||||
|
||||
p.PursuitLag = ParamItem{
|
||||
Key: "mq.pursuitLag",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "10",
|
||||
Doc: `time tick lag threshold to enter pursuit mode, in seconds`,
|
||||
Export: true,
|
||||
}
|
||||
p.PursuitLag.Init(base.mgr)
|
||||
|
||||
p.PursuitBufferSize = ParamItem{
|
||||
Key: "mq.pursuitBufferSize",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "8388608", // 8 MB
|
||||
Doc: `pursuit mode buffer size in bytes`,
|
||||
Export: true,
|
||||
}
|
||||
p.PursuitBufferSize.Init(base.mgr)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue