Refine rocksmq (#25031)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/25706/head
aoiasd 2023-07-18 20:18:57 +08:00 committed by GitHub
parent 079cd9dc70
commit 3545b1a608
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 44 deletions

View File

@ -124,6 +124,10 @@ rocksmq:
retentionTimeInMinutes: 4320 # 3 days, 3 * 24 * 60 minutes, The retention time of the message in rocksmq.
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data
# Compression type for each rocksdb level; only 0 and 7 are supported.
# 0 means no compression, 7 means zstd.
# The number of entries determines the number of rocksdb levels.
compressionTypes: [0, 0, 7, 7, 7]
# natsmq configuration.
natsmq:

View File

@ -136,27 +136,25 @@ func (c *client) consume(consumer *consumer) {
if !ok {
return
}
c.deliver(consumer, 100)
c.deliver(consumer)
case _, ok := <-consumer.MsgMutex():
if !ok {
// consumer MsgMutex closed, goroutine exit
log.Debug("Consumer MsgMutex closed")
return
}
c.deliver(consumer, 100)
c.deliver(consumer)
}
}
}
func (c *client) deliver(consumer *consumer, batchMax int) {
func (c *client) deliver(consumer *consumer) {
for {
n := cap(consumer.messageCh) - len(consumer.messageCh)
if n == 0 {
return
}
if n > batchMax { // batch min size
n = batchMax
}
msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n)
if err != nil {
log.Warn("Consumer's goroutine cannot consume from (" + consumer.topic + "," + consumer.consumerName + "): " + err.Error())

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"
@ -120,6 +121,7 @@ type rocksmq struct {
kv kv.BaseKV
idAllocator allocator.Interface
storeMu *sync.Mutex
topicLastID sync.Map
consumers sync.Map
consumersID sync.Map
@ -128,11 +130,31 @@ type rocksmq struct {
state RmqState
}
// parseCompressionType converts the configured rocksmq compression type list
// (params.RocksmqCfg.CompressionTypes) into per-level rocksdb compression types.
//
// Every entry must parse as an integer and be one of the supported values
// listed in validType. Parsing stops at the first bad entry; in that case a nil
// slice is returned together with the error, so callers never observe a
// partially valid result.
func parseCompressionType(params *paramtable.ComponentParam) ([]gorocksdb.CompressionType, error) {
	validType := []int{0, 7}
	sTypes := params.RocksmqCfg.CompressionTypes.GetAsStrings()
	types := make([]gorocksdb.CompressionType, 0, len(sTypes))
	for _, sType := range sTypes {
		iType, err := strconv.Atoi(sType)
		if err != nil {
			// %w keeps the strconv error in the chain while producing the same message text.
			return nil, fmt.Errorf("invalid rocksmq compression type: %w", err)
		}
		if !lo.Contains(validType, iType) {
			return nil, fmt.Errorf("invalid rocksmq compression type, should in %v", validType)
		}
		types = append(types, gorocksdb.CompressionType(iType))
	}
	return types, nil
}
// NewRocksMQ step:
// 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname
// 2. Init retention info, load retention info to memory
// 3. Start retention goroutine
func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) {
params := paramtable.Get()
// TODO we should use same rocksdb instance with different cfs
maxProcs := runtime.GOMAXPROCS(0)
parallelism := 1
@ -145,7 +167,6 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
// default rocks db cache is set with memory
rocksDBLRUCacheCapacity := RocksDBLRUCacheMinCapacity
if memoryCount > 0 {
params := paramtable.Get()
ratio := params.RocksmqCfg.LRUCacheRatio.GetAsFloat()
calculatedCapacity := uint64(float64(memoryCount) * ratio)
if calculatedCapacity < RocksDBLRUCacheMinCapacity {
@ -162,11 +183,16 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
bbto.SetBlockSize(64 << 10)
bbto.SetBlockCache(gorocksdb.NewLRUCache(rocksDBLRUCacheCapacity))
compressionTypes, err := parseCompressionType(params)
if err != nil {
return nil, err
}
optsKV := gorocksdb.NewDefaultOptions()
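// Per-level compression comes from rocksmq.compressionTypes (0: no compression, 7: ZSTD);
// the number of configured entries is also used as the number of levels.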
optsKV.SetNumLevels(3)
optsKV.SetCompressionPerLevel([]gorocksdb.CompressionType{0, 7, 7})
optsKV.SetNumLevels(len(compressionTypes))
optsKV.SetCompressionPerLevel(compressionTypes)
optsKV.SetBlockBasedTableFactory(bbto)
optsKV.SetTargetFileSizeMultiplier(2)
optsKV.SetCreateIfMissing(true)
@ -186,8 +212,8 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
// finish rocks mq store initialization, rocks mq store has to set the prefix extractor
optsStore := gorocksdb.NewDefaultOptions()
// share block cache with kv
optsKV.SetNumLevels(3)
optsStore.SetCompressionPerLevel([]gorocksdb.CompressionType{0, 7, 7})
optsStore.SetNumLevels(len(compressionTypes))
optsStore.SetCompressionPerLevel(compressionTypes)
optsStore.SetBlockBasedTableFactory(bbto)
optsStore.SetTargetFileSizeMultiplier(2)
optsStore.SetCreateIfMissing(true)
@ -222,6 +248,7 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
storeMu: &sync.Mutex{},
consumers: sync.Map{},
readers: sync.Map{},
topicLastID: sync.Map{},
}
ri, err := initRetentionInfo(kv, db)
@ -299,14 +326,13 @@ func (rmq *rocksmq) Info() bool {
minConsumerPosition := UniqueID(-1)
minConsumerGroupName := ""
for _, consumer := range consumerList {
consumerKey := constructCurrentID(consumer.Topic, consumer.GroupName)
consumerPosition, ok := rmq.consumersID.Load(consumerKey)
consumerPosition, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName)
if !ok {
log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName))
continue
}
if minConsumerPosition == UniqueID(-1) || consumerPosition.(UniqueID) < minConsumerPosition {
minConsumerPosition = consumerPosition.(UniqueID)
if minConsumerPosition == UniqueID(-1) || consumerPosition < minConsumerPosition {
minConsumerPosition = consumerPosition
minConsumerGroupName = consumer.GroupName
}
}
@ -658,6 +684,8 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
zap.Int64("produce total elapse", getProduceTime),
)
}
rmq.topicLastID.Store(topicName, msgIDs[len(msgIDs)-1])
return msgIDs, nil
}
@ -697,6 +725,22 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
return err
}
// getCurrentID looks up the consume position stored in consumersID for the
// given topic and consumer group. The second return value is false when no
// position has been stored under that topic/group key.
func (rmq *rocksmq) getCurrentID(topicName, groupName string) (int64, bool) {
	key := constructCurrentID(topicName, groupName)
	if pos, found := rmq.consumersID.Load(key); found {
		return pos.(int64), true
	}
	return 0, false
}
// getLastID returns the last message ID recorded for topicName in topicLastID,
// which Produce updates after each successful write. The second return value
// is false when nothing has been recorded for the topic yet.
func (rmq *rocksmq) getLastID(topicName string) (int64, bool) {
	// Read from topicLastID, not consumersID: consumersID is keyed by
	// topic/group and never holds an entry under the bare topic name.
	lastID, ok := rmq.topicLastID.Load(topicName)
	if !ok {
		return 0, false
	}
	return lastID.(int64), true
}
// Consume steps:
// 1. Consume n messages from rocksdb
// 2. Update current_id to the last consumed message
@ -710,20 +754,26 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
if !ok {
return nil, fmt.Errorf("topic name = %s not exist", topicName)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return nil, fmt.Errorf("get mutex failed, topic name = %s", topicName)
}
lock.Lock()
defer lock.Unlock()
getLockTime := time.Since(start).Milliseconds()
metaKey := constructCurrentID(topicName, groupName)
currentID, ok := rmq.consumersID.Load(metaKey)
currentID, ok := rmq.getCurrentID(topicName, groupName)
if !ok {
return nil, fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", topicName, groupName)
}
// return if don't have new message
lastID, ok := rmq.getLastID(topicName)
if ok && currentID > lastID {
return []ConsumerMessage{}, nil
}
getLockTime := time.Since(start).Milliseconds()
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
prefix := topicName + "/"
@ -734,7 +784,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
if currentID == DefaultMessageID {
dataKey = prefix
} else {
dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10))
dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10))
}
iter.Seek([]byte(dataKey))
consumerMessage := make([]ConsumerMessage, 0, n)
@ -844,27 +894,26 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err
}
func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID UniqueID) error {
key := constructCurrentID(topicName, groupName)
oldPos, ok := rmq.consumersID.Load(key)
oldPos, ok := rmq.getCurrentID(topicName, groupName)
if !ok {
return errors.New("move unknown consumer")
}
if msgID < oldPos.(UniqueID) {
if msgID < oldPos {
log.Warn("RocksMQ: trying to move Consume position backward",
zap.String("key", key), zap.Int64("oldPos", oldPos.(UniqueID)), zap.Int64("newPos", msgID))
zap.String("topic", topicName), zap.String("group", groupName), zap.Int64("oldPos", oldPos), zap.Int64("newPos", msgID))
panic("move consume position backward")
}
//update ack if position move forward
err := rmq.updateAckedInfo(topicName, groupName, oldPos.(UniqueID), msgID-1)
err := rmq.updateAckedInfo(topicName, groupName, oldPos, msgID-1)
if err != nil {
log.Warn("failed to update acked info ", zap.String("topic", topicName),
zap.String("groupName", groupName), zap.Error(err))
return err
}
rmq.consumersID.Store(key, msgID)
rmq.consumersID.Store(constructCurrentID(topicName, groupName), msgID)
return nil
}
@ -876,7 +925,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
/* Step I: Check if key exists */
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
@ -902,7 +951,7 @@ func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID
/* Step I: Check if key exists */
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
@ -1058,13 +1107,12 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI
var minBeginID UniqueID = lastID
for _, consumer := range consumers {
if consumer.GroupName != groupName {
key := constructCurrentID(consumer.Topic, consumer.GroupName)
beginID, ok := rmq.consumersID.Load(key)
beginID, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName)
if !ok {
return fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", consumer.Topic, consumer.GroupName)
}
if beginID.(UniqueID) < minBeginID {
minBeginID = beginID.(UniqueID)
if beginID < minBeginID {
minBeginID = beginID
}
}
}

View File

@ -261,16 +261,6 @@ func TestRocksmq_Basic(t *testing.T) {
_, err = rmq.Produce(channelName, pMsgs)
assert.NoError(t, err)
// before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
// it aims to test if produce before 2.2.0, but consume after 2.2.0
msgD := "d_message"
tMsgs := make([]producerMessageBefore, 1)
tMsgD := producerMessageBefore{Payload: []byte(msgD)}
tMsgs[0] = tMsgD
_, err = rmq.produceBefore(channelName, tMsgs)
assert.NoError(t, err)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
err = rmq.CreateConsumerGroup(channelName, groupName)
@ -297,12 +287,47 @@ func TestRocksmq_Basic(t *testing.T) {
_, ok = cMsgs[1].Properties[common.TraceIDKey]
assert.True(t, ok)
assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c")
}
cMsgs, err = rmq.Consume(channelName, groupName, 1)
func TestRocksmq_Compatibility(t *testing.T) {
suffix := "rmq_compatibility"
kvPath := rmqPath + kvPathSuffix + suffix
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
channelName := "channel_rocks"
err = rmq.CreateTopic(channelName)
assert.NoError(t, err)
defer rmq.DestroyTopic(channelName)
// Before 2.2.0, ProducerMessage and ConsumerMessage in rocksmq had no properties.
// This tests that messages produced in the pre-2.2.0 format can be consumed after 2.2.0.
msgD := "d_message"
tMsgs := make([]producerMessageBefore, 1)
tMsgD := producerMessageBefore{Payload: []byte(msgD)}
tMsgs[0] = tMsgD
_, err = rmq.produceBefore(channelName, tMsgs)
assert.NoError(t, err)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
err = rmq.CreateConsumerGroup(channelName, groupName)
assert.NoError(t, err)
cMsgs, err := rmq.Consume(channelName, groupName, 1)
assert.NoError(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "d_message")
_, ok = cMsgs[0].Properties[common.TraceIDKey]
_, ok := cMsgs[0].Properties[common.TraceIDKey]
assert.False(t, ok)
// it will be set empty map if produce message has no properties field
expect := make(map[string]string)
@ -576,7 +601,7 @@ func TestRocksmq_Goroutines(t *testing.T) {
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_2"
name := "/tmp/rocksmq_goroutines"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
@ -1275,3 +1300,16 @@ func TestRocksmq_Info(t *testing.T) {
rmq.kv = &rocksdbkv.RocksdbKV{}
assert.False(t, rmq.Info())
}
// TestRocksmq_ParseCompressionTypeError verifies that parseCompressionType
// returns an error for non-numeric and for unsupported compression type values.
func TestRocksmq_ParseCompressionTypeError(t *testing.T) {
	params := paramtable.Get()
	params.Init()

	key := params.RocksmqCfg.CompressionTypes.Key
	// Put a valid value back once the test is done.
	defer params.Save(key, "0,0,7")

	for _, badValue := range []string{"invalid,1", "-1,-1"} {
		params.Save(key, badValue)
		_, err := parseCompressionType(params)
		assert.Error(t, err)
	}
}

View File

@ -588,6 +588,11 @@ type RocksmqConfig struct {
CompactionInterval ParamItem `refreshable:"false"`
// TickerTimeInSeconds is the time of expired check, default 10 minutes
TickerTimeInSeconds ParamItem `refreshable:"false"`
// CompressionTypes is the compression type of each rocksdb level.
// The length of CompressionTypes determines the number of rocksdb levels.
// Only {0,7} are supported: 0 means no compression, 7 means zstd.
// Default is [0,0,7,7,7].
CompressionTypes ParamItem `refreshable:"false"`
}
func (r *RocksmqConfig) Init(base *BaseTable) {
@ -651,6 +656,13 @@ please adjust in embedded Milvus: /tmp/milvus/rdb_data`,
Version: "2.2.2",
}
r.TickerTimeInSeconds.Init(base.mgr)
r.CompressionTypes = ParamItem{
Key: "rocksmq.compressionTypes",
DefaultValue: "0,0,7,7,7",
Version: "2.2.12",
}
r.CompressionTypes.Init(base.mgr)
}
// NatsmqConfig describes the configuration options for the Nats message queue