Refine rocksdb (#12743)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/13683/head
Xiaofan 2021-12-17 23:44:42 +08:00 committed by GitHub
parent 2ef2228ad0
commit b6cca25d1f
13 changed files with 907 additions and 646 deletions


@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/tecbot/gorocksdb"
)
@ -37,8 +38,9 @@ const (
LRUCacheSize = 0
)
// NewRocksdbKV returns a rockskv object
// NewRocksdbKV returns a rockskv object, only used in test
func NewRocksdbKV(name string) (*RocksdbKV, error) {
// TODO we should use multiple column family of rocks db rather than init multiple db instance
if name == "" {
return nil, errors.New("rocksdb name is nil")
}
@ -48,12 +50,21 @@ func NewRocksdbKV(name string) (*RocksdbKV, error) {
bbto.SetBlockCache(gorocksdb.NewLRUCache(LRUCacheSize))
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
// by default there is only 1 thread for flush and compaction, which may block each other;
// increase it to a reasonable number of threads
opts.IncreaseParallelism(2)
// enable background flush
opts.SetMaxBackgroundFlushes(1)
opts.SetCreateIfMissing(true)
return NewRocksdbKVWithOpts(name, opts)
}
// NewRocksdbKVWithOpts returns a rockskv object with the given options
func NewRocksdbKVWithOpts(name string, opts *gorocksdb.Options) (*RocksdbKV, error) {
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
wo := gorocksdb.NewDefaultWriteOptions()
// only one column family is used
db, err := gorocksdb.OpenDb(opts, name)
if err != nil {
return nil, err
@ -84,7 +95,9 @@ func (kv *RocksdbKV) Load(key string) (string, error) {
if kv.DB == nil {
return "", fmt.Errorf("rocksdb instance is nil when load %s", key)
}
if key == "" {
return "", errors.New("rocksdb kv does not support load empty key")
}
value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
if err != nil {
return "", err
@ -94,27 +107,20 @@ func (kv *RocksdbKV) Load(key string) (string, error) {
}
// LoadWithPrefix returns a batch values of keys with a prefix
func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) {
if key == "" {
return nil, nil, errors.New("key is nil in LoadWithPrefix")
}
// if prefix is "", then load everything from the database
func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
if kv.DB == nil {
return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", key)
return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", prefix)
}
kv.ReadOptions.SetPrefixSameAsStart(true)
kv.DB.Close()
kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(key)))
var err error
kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName())
if err != nil {
return nil, nil, err
if prefix != "" {
kv.ReadOptions.SetIterateUpperBound([]byte(typeutil.AddOne(prefix)))
}
iter := kv.DB.NewIterator(kv.ReadOptions)
defer iter.Close()
keys := make([]string, 0)
values := make([]string, 0)
iter.Seek([]byte(key))
iter.Seek([]byte(prefix))
for ; iter.Valid(); iter.Next() {
key := iter.Key()
value := iter.Value()
@ -129,15 +135,6 @@ func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) {
return keys, values, nil
}
// ResetPrefixLength will close rocksdb object and open a new rocksdb with new prefix length
func (kv *RocksdbKV) ResetPrefixLength(len int) error {
kv.DB.Close()
kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len))
var err error
kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName())
return err
}
// MultiLoad load a batch of values by keys
func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) {
if kv.DB == nil {
@ -160,6 +157,12 @@ func (kv *RocksdbKV) Save(key, value string) error {
if kv.DB == nil {
return errors.New("rocksdb instance is nil when do save")
}
if key == "" {
return errors.New("rocksdb kv does not support empty key")
}
if value == "" {
return errors.New("rocksdb kv does not support empty value")
}
err := kv.DB.Put(kv.WriteOptions, []byte(key), []byte(value))
return err
}
@ -179,34 +182,27 @@ func (kv *RocksdbKV) MultiSave(kvs map[string]string) error {
}
// RemoveWithPrefix removes a batch of key-values with specified prefix
// If prefix is "", then all data in the rocksdb kv will be deleted
func (kv *RocksdbKV) RemoveWithPrefix(prefix string) error {
if kv.DB == nil {
return errors.New("rocksdb instance is nil when do RemoveWithPrefix")
}
kv.ReadOptions.SetPrefixSameAsStart(true)
kv.DB.Close()
kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(prefix)))
var err error
kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName())
if err != nil {
return err
}
iter := kv.DB.NewIterator(kv.ReadOptions)
defer iter.Close()
iter.Seek([]byte(prefix))
for ; iter.Valid(); iter.Next() {
key := iter.Key()
err := kv.DB.Delete(kv.WriteOptions, key.Data())
key.Free()
if err != nil {
return nil
if len(prefix) == 0 {
// it would be better to drop the column family, but since we use the default column family, we just delete ["", lastKey+1)
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
iter := kv.DB.NewIterator(readOpts)
defer iter.Close()
// seek to the last key
iter.SeekToLast()
if iter.Valid() {
return kv.DeleteRange(prefix, typeutil.AddOne(string(iter.Key().Data())))
}
// nothing in the range, skip
return nil
}
if err := iter.Err(); err != nil {
return err
}
return nil
prefixEnd := typeutil.AddOne(prefix)
return kv.DeleteRange(prefix, prefixEnd)
}
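
Both LoadWithPrefix and RemoveWithPrefix above turn a prefix into a half-open key range [prefix, prefixEnd) by computing an exclusive upper bound with typeutil.AddOne. Below is a minimal illustrative sketch of that idea, not part of this commit; the real typeutil.AddOne may handle edge cases (such as trailing 0xFF bytes) differently.

package main

import "fmt"

// addOne sketches an exclusive upper bound for a prefix scan or delete:
// every key starting with prefix sorts into [prefix, addOne(prefix)).
// Assumes the last byte of the prefix is not 0xFF.
func addOne(prefix string) string {
	if len(prefix) == 0 {
		return ""
	}
	b := []byte(prefix)
	b[len(b)-1]++
	return string(b)
}

func main() {
	// "abc", "abc1", "abczzz" all fall inside ["abc", "abd").
	fmt.Println(addOne("abc")) // abd
}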
// Remove is used to remove a pair of key-value
@ -214,6 +210,9 @@ func (kv *RocksdbKV) Remove(key string) error {
if kv.DB == nil {
return errors.New("rocksdb instance is nil when do Remove")
}
if key == "" {
return errors.New("rocksdb kv does not support empty key")
}
err := kv.DB.Delete(kv.WriteOptions, []byte(key))
return err
}
@ -254,15 +253,11 @@ func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error {
if kv.DB == nil {
return errors.New("Rocksdb instance is nil when do DeleteRange")
}
if startKey >= endKey {
return fmt.Errorf("rockskv delete range startkey must < endkey, startkey %s, endkey %s", startKey, endKey)
}
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
if len(startKey) == 0 {
iter := kv.DB.NewIterator(kv.ReadOptions)
defer iter.Close()
iter.SeekToFirst()
startKey = string(iter.Key().Data())
}
writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
err := kv.DB.Write(kv.WriteOptions, writeBatch)
return err


@ -108,10 +108,9 @@ func TestRocksdbKV_Prefix(t *testing.T) {
keys, vals, err := rocksdbKV.LoadWithPrefix("abc")
assert.Nil(t, err)
assert.Equal(t, len(keys), 1)
assert.Equal(t, len(vals), 1)
//fmt.Println(keys)
//fmt.Println(vals)
err = rocksdbKV.RemoveWithPrefix("abc")
assert.Nil(t, err)
@ -124,6 +123,27 @@ func TestRocksdbKV_Prefix(t *testing.T) {
val, err = rocksdbKV.Load("abddqqq")
assert.Nil(t, err)
assert.Equal(t, val, "1234555")
// test remove ""
err = rocksdbKV.RemoveWithPrefix("")
assert.Nil(t, err)
// test remove from an empty cf
err = rocksdbKV.RemoveWithPrefix("")
assert.Nil(t, err)
val, err = rocksdbKV.Load("abddqqq")
assert.Nil(t, err)
assert.Equal(t, len(val), 0)
// test we can still save after drop
err = rocksdbKV.Save("abcd", "123")
assert.Nil(t, err)
val, err = rocksdbKV.Load("abcd")
assert.Nil(t, err)
assert.Equal(t, val, "123")
}
func TestRocksdbKV_Goroutines(t *testing.T) {
@ -151,7 +171,7 @@ func TestRocksdbKV_Goroutines(t *testing.T) {
wg.Wait()
}
func TestRocksdbKV_Dummy(t *testing.T) {
func TestRocksdbKV_DummyDB(t *testing.T) {
name := "/tmp/rocksdb_dummy"
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
assert.Nil(t, err)
@ -184,3 +204,25 @@ func TestRocksdbKV_Dummy(t *testing.T) {
_, err = rocksdbkv.Load("dummy")
assert.Error(t, err)
}
func TestRocksdbKV_CornerCase(t *testing.T) {
name := "/tmp/rocksdb_corner"
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
assert.Nil(t, err)
defer rocksdbkv.Close()
defer rocksdbkv.RemoveWithPrefix("")
_, err = rocksdbkv.Load("")
assert.Error(t, err)
keys, values, err := rocksdbkv.LoadWithPrefix("")
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
err = rocksdbkv.Save("", "")
assert.Error(t, err)
err = rocksdbkv.Save("test", "")
assert.Error(t, err)
err = rocksdbkv.Remove("")
assert.Error(t, err)
err = rocksdbkv.DeleteRange("a", "a")
assert.Error(t, err)
}


@ -71,7 +71,11 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
if reflect.ValueOf(c.server).IsNil() {
return nil, newError(0, "Rmq server is nil")
}
if exist, con := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName); exist {
exist, con, err := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
}
if exist {
log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.Any("SubscriptionName", options.SubscriptionName))
consumer, err := getExistedConsumer(c, options, con.MsgMutex)
if err != nil {


@ -16,10 +16,8 @@ import (
"os"
"time"
"github.com/milvus-io/milvus/internal/allocator"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
rocksmq "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"go.uber.org/zap"
@ -45,23 +43,9 @@ func newMockClient() *client {
return client
}
func initIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
if err != nil {
panic(err)
}
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
return idAllocator
}
func newRocksMQ(rmqPath string) server.RocksMQ {
kvPath := rmqPath + "_kv"
idAllocator := initIDAllocator(kvPath)
rocksdbPath := rmqPath + "_db"
rmq, _ := rocksmq.NewRocksMQ(rocksdbPath, idAllocator)
rmq, _ := rocksmq.NewRocksMQ(rocksdbPath, nil)
return rmq
}


@ -19,7 +19,6 @@ import (
"sync/atomic"
"github.com/milvus-io/milvus/internal/allocator"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -64,15 +63,6 @@ func InitRocksMQ() error {
}
}
kvname := rocksdbName + "_kv"
var rkv *rocksdbkv.RocksdbKV
rkv, finalErr = rocksdbkv.NewRocksdbKV(kvname)
if finalErr != nil {
return
}
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rkv)
_ = idAllocator.Initialize()
rawRmqPageSize, err := params.Load("rocksmq.rocksmqPageSize")
if err == nil && rawRmqPageSize != "" {
rmqPageSize, err := strconv.ParseInt(rawRmqPageSize, 10, 64)
@ -86,7 +76,7 @@ func InitRocksMQ() error {
if err == nil && rawRmqRetentionTimeInMinutes != "" {
rawRmqRetentionTimeInMinutes, err := strconv.ParseInt(rawRmqRetentionTimeInMinutes, 10, 64)
if err == nil {
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, rawRmqRetentionTimeInMinutes)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60)
} else {
log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value 3 days")
}
@ -100,9 +90,9 @@ func InitRocksMQ() error {
log.Warn("rocksmq.retentionSizeInMB is invalid, using default value 0")
}
}
log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes),
log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", rawRmqRetentionTimeInMinutes),
zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize))
Rmq, finalErr = NewRocksMQ(rocksdbName, idAllocator)
Rmq, finalErr = NewRocksMQ(rocksdbName, nil)
})
return finalErr
}


@ -26,6 +26,7 @@ import (
func Test_InitRmq(t *testing.T) {
name := "/tmp/rmq_init"
defer os.RemoveAll("/tmp/rmq_init")
endpoints := os.Getenv("ETCD_ENDPOINTS")
if endpoints == "" {
endpoints = "localhost:2379"
@ -38,6 +39,8 @@ func Test_InitRmq(t *testing.T) {
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
defer os.RemoveAll(name + kvSuffix)
defer os.RemoveAll(name)
err = InitRmq(name, idAllocator)
defer Rmq.stopRetention()
assert.NoError(t, err)
@ -49,7 +52,7 @@ func Test_InitRocksMQ(t *testing.T) {
rmqPath := "/tmp/milvus/rdb_data_global"
err := os.Setenv("ROCKSMQ_PATH", rmqPath)
assert.Nil(t, err)
defer os.RemoveAll(rmqPath)
defer os.RemoveAll("/tmp/milvus")
err = InitRocksMQ()
defer Rmq.stopRetention()
assert.NoError(t, err)
@ -73,10 +76,15 @@ func Test_InitRocksMQ(t *testing.T) {
func Test_InitRocksMQError(t *testing.T) {
once = sync.Once{}
dummyPath := "/tmp/milvus/dummy"
os.Create(dummyPath)
dir := "/tmp/milvus/"
dummyPath := dir + "dummy"
err := os.MkdirAll(dir, os.ModePerm)
assert.NoError(t, err)
f, err := os.Create(dummyPath)
defer f.Close()
assert.NoError(t, err)
os.Setenv("ROCKSMQ_PATH", dummyPath)
defer os.RemoveAll(dummyPath)
err := InitRocksMQ()
defer os.RemoveAll(dir)
err = InitRocksMQ()
assert.Error(t, err)
}


@ -23,6 +23,7 @@ type Consumer struct {
Topic string
GroupName string
MsgMutex chan struct{}
beginID UniqueID
}
// ConsumerMessage that consumed from rocksdb
@ -40,18 +41,18 @@ type RocksMQ interface {
DestroyConsumerGroup(topicName string, groupName string) error
Close()
RegisterConsumer(consumer *Consumer)
RegisterConsumer(consumer *Consumer) error
Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error)
Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)
Seek(topicName string, groupName string, msgID UniqueID) error
SeekToLatest(topicName, groupName string) error
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer)
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error)
Notify(topicName, groupName string)
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error)
ReaderSeek(topicName string, readerName string, msgID UniqueID)
ReaderSeek(topicName string, readerName string, msgID UniqueID) error
Next(ctx context.Context, topicName string, readerName string) (*ConsumerMessage, error)
HasNext(topicName string, readerName string) bool
CloseReader(topicName string, readerName string)


@ -17,6 +17,7 @@ import (
"fmt"
"math"
"path"
"runtime"
"strconv"
"strings"
"sync"
@ -40,29 +41,42 @@ type UniqueID = typeutil.UniqueID
// RmqState Rocksmq state
type RmqState = int64
// RocksmqPageSize is the size of a message page, default 2GB
var RocksmqPageSize int64 = 2 << 30
// RocksmqPageSize is the size of a message page, default 256MB
var RocksmqPageSize int64 = 256 << 20
// Const variable that will be used in rocksmqs
const (
DefaultMessageID = "-1"
FixedChannelNameLen = 320
RocksDBLRUCacheCapacity = 0
DefaultMessageID = -1
FixedChannelNameLen = 320
// TODO make it configurable
RocksDBLRUCacheCapacity = 1 << 30
kvSuffix = "_meta_kv"
MessageSizeTitle = "message_size/"
PageMsgSizeTitle = "page_message_size/"
TopicBeginIDTitle = "topic_begin_id/"
BeginIDTitle = "begin_id/"
AckedTsTitle = "acked_ts/"
AckedSizeTitle = "acked_size/"
LastRetTsTitle = "last_retention_ts/"
// topic_id/topicName
// topic id records that a topic is valid; created when the topic is created, cleaned up when the topic is destroyed
TopicIDTitle = "topic_id/"
// message_size/topicName records the current page message size; once it exceeds RocksmqPageSize, this value is reset and a new page is opened
// TODO should be cached
MessageSizeTitle = "message_size/"
// page_message_size/topicName/pageId records the message size of each page (keyed by the page's end id); purged either by retention or when the topic is destroyed
PageMsgSizeTitle = "page_message_size/"
// page_ts/topicName/pageId records the last ts of the page, used for TTL functionality
PageTsTitle = "page_ts/"
// acked_ts/topicName/pageId records the latest ack ts of each page; purged by retention or when the topic is destroyed
AckedTsTitle = "acked_ts/"
// only in memory
CurrentIDSuffix = "current_id"
CurrentIDSuffix = "current_id"
ReaderNamePrefix = "reader-"
RmqNotServingErrMsg = "rocksmq is not serving"
RmqNotServingErrMsg = "Rocksmq is not serving"
)
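
For reference, a small sketch (not part of this commit) of what the meta keys described by the constants above could look like, assuming the new constructKey simply concatenates the title prefix with the topic name; the topic name "topic_demo" and page id 42 are made-up examples.

package main

import (
	"fmt"
	"strconv"
)

func main() {
	topic := "topic_demo" // hypothetical topic name
	pageID := strconv.FormatInt(42, 10)

	fmt.Println("topic_id/" + topic)                          // marks the topic as existing
	fmt.Println("message_size/" + topic)                      // size of the currently open page
	fmt.Println("page_message_size/" + topic + "/" + pageID)  // size of a sealed page, keyed by its end id
	fmt.Println("page_ts/" + topic + "/" + pageID)            // last write ts of a sealed page
	fmt.Println("acked_ts/" + topic + "/" + pageID)           // ts when the page became fully acked
}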
const (
@ -74,6 +88,7 @@ const (
/**
* @brief fill with '_' to ensure channel name fixed length
* TODO this is a waste of memory, remove it
*/
func fixChannelName(name string) (string, error) {
if len(name) > FixedChannelNameLen {
@ -109,26 +124,23 @@ func constructCurrentID(topicName, groupName string) string {
}
/**
* Construct table name and fixed channel name to be a key with length of FixedChannelNameLen,
* used for meta infos
* Combine the meta name with the topic name
*/
func constructKey(metaName, topic string) (string, error) {
func constructKey(metaName, topic string) string {
// Check metaName/topic
oldLen := len(metaName + topic)
if oldLen > FixedChannelNameLen {
return "", errors.New("topic name exceeds limit")
}
return metaName + topic
}
nameBytes := make([]byte, FixedChannelNameLen-oldLen)
for i := 0; i < len(nameBytes); i++ {
nameBytes[i] = byte('*')
func parsePageID(key string) (int64, error) {
stringSlice := strings.Split(key, "/")
if len(stringSlice) != 3 {
return 0, fmt.Errorf("Invalid page id %s ", key)
}
return metaName + topic + string(nameBytes), nil
return strconv.ParseInt(stringSlice[2], 10, 64)
}
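
A short usage sketch of the parsePageID helper above (the helper itself is copied from the diff; the keys are hypothetical examples): the page id is expected to be the third "/"-separated segment of a page key.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func parsePageID(key string) (int64, error) {
	stringSlice := strings.Split(key, "/")
	if len(stringSlice) != 3 {
		return 0, fmt.Errorf("Invalid page id %s ", key)
	}
	return strconv.ParseInt(stringSlice[2], 10, 64)
}

func main() {
	id, err := parsePageID("page_message_size/topic_demo/42")
	fmt.Println(id, err) // 42 <nil>

	_, err = parsePageID("page_message_size/topic_demo") // missing the page id segment
	fmt.Println(err != nil)                              // true
}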
func checkRetention() bool {
return RocksmqRetentionTimeInMinutes != -1 && RocksmqRetentionSizeInMB != -1
return RocksmqRetentionTimeInSecs != -1 || RocksmqRetentionSizeInMB != -1
}
func getNowTs(idAllocator allocator.GIDAllocator) (int64, error) {
@ -152,7 +164,7 @@ type rocksmq struct {
idAllocator allocator.GIDAllocator
storeMu *sync.Mutex
consumers sync.Map
ackedMu sync.Map
consumersID sync.Map
retentionInfo *retentionInfo
readers sync.Map
@ -164,33 +176,64 @@ type rocksmq struct {
// 2. Init retention info, load retention info to memory
// 3. Start retention goroutine
func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
// TODO we should use same rocksdb instance with different cfs
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetCacheIndexAndFilterBlocks(true)
bbto.SetPinL0FilterAndIndexBlocksInCache(true)
bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen + 1))
// opts.SetMaxOpenFiles(-1)
optsKV := gorocksdb.NewDefaultOptions()
optsKV.SetBlockBasedTableFactory(bbto)
optsKV.SetCreateIfMissing(true)
// by default there is only 1 thread for flush and compaction, which may block each other;
// increase it to a reasonable number of threads
optsKV.IncreaseParallelism(runtime.NumCPU())
// enable background flush
optsKV.SetMaxBackgroundFlushes(1)
db, err := gorocksdb.OpenDb(opts, name)
// finish rocks KV
kvName := name + kvSuffix
kv, err := rocksdbkv.NewRocksdbKVWithOpts(kvName, optsKV)
if err != nil {
return nil, err
}
kvName := name + kvSuffix
kv, err := rocksdbkv.NewRocksdbKV(kvName)
// finish rocks mq store initialization, rocks mq store has to set the prefix extractor
optsStore := gorocksdb.NewDefaultOptions()
// share block cache with kv
optsStore.SetBlockBasedTableFactory(bbto)
optsStore.SetCreateIfMissing(true)
// by default there is only 1 thread for flush and compaction, which may block each other;
// increase it to a reasonable number of threads
optsStore.IncreaseParallelism(runtime.NumCPU())
// enable background flush
optsStore.SetMaxBackgroundFlushes(1)
// TODO remove fix channel name len logic
optsStore.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen + 1))
db, err := gorocksdb.OpenDb(optsStore, name)
if err != nil {
return nil, err
}
var mqIDAllocator allocator.GIDAllocator
// if the user didn't specify an id allocator, init one backed by the kv store
if idAllocator == nil {
allocator := allocator.NewGlobalIDAllocator("rmq_id", kv)
err = allocator.Initialize()
if err != nil {
return nil, err
}
mqIDAllocator = allocator
} else {
mqIDAllocator = idAllocator
}
rmq := &rocksmq{
store: db,
kv: kv,
idAllocator: idAllocator,
idAllocator: mqIDAllocator,
storeMu: &sync.Mutex{},
consumers: sync.Map{},
ackedMu: sync.Map{},
readers: sync.Map{},
}
@ -204,6 +247,24 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
rmq.retentionInfo.startRetentionInfo()
}
atomic.StoreInt64(&rmq.state, RmqStateHealthy)
// TODO add this to monitor metrics
go func() {
for {
time.Sleep(5 * time.Minute)
log.Info("Rocksmq memory usage",
zap.String("rockskv kv cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.cur-size-all-mem-tables")),
zap.String("rockskv table readers", kv.DB.GetProperty("rocksdb.estimate-table-readers-mem")),
zap.String("rockskv pinned", kv.DB.GetProperty("rocksdb.block-cache-pinned-usage")),
zap.String("store kv cache", db.GetProperty("rocksdb.block-cache-usage")),
zap.String("store memtable ", db.GetProperty("rocksdb.cur-size-all-mem-tables")),
zap.String("store table readers", db.GetProperty("rocksdb.estimate-table-readers-mem")),
zap.String("store pinned", db.GetProperty("rocksdb.block-cache-pinned-usage")),
)
}
}()
return rmq, nil
}
@ -219,24 +280,19 @@ func (rmq *rocksmq) Close() {
atomic.StoreInt64(&rmq.state, RmqStateStopped)
rmq.stopRetention()
rmq.consumers.Range(func(k, v interface{}) bool {
var topic string
// TODO what happens if the server crashes? who handles destroying the consumer group? should we just handle it when rocksmq is created?
// or should we not even make consumer info persistent?
for _, consumer := range v.([]*Consumer) {
err := rmq.DestroyConsumerGroup(consumer.Topic, consumer.GroupName)
err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName)
if err != nil {
log.Warn("Failed to destroy consumer group in rocksmq!", zap.Any("topic", consumer.Topic), zap.Any("groupName", consumer.GroupName), zap.Any("error", err))
}
topic = consumer.Topic
}
if topic != "" {
err := rmq.DestroyTopic(topic)
if err != nil {
log.Warn("Rocksmq DestroyTopic failed!", zap.Any("topic", topic), zap.Any("error", err))
}
}
return true
})
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
rmq.kv.Close()
rmq.store.Close()
}
@ -246,56 +302,26 @@ func (rmq *rocksmq) stopRetention() {
}
}
func (rmq *rocksmq) checkKeyExist(key string) bool {
val, _ := rmq.kv.Load(key)
return val != ""
}
// CreateTopic writes initialized messages for topic in rocksdb
func (rmq *rocksmq) CreateTopic(topicName string) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
start := time.Now()
beginKey := topicName + "/begin_id"
endKey := topicName + "/end_id"
// Check if topic exist
if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) {
log.Warn("RocksMQ: " + beginKey + " or " + endKey + " existed.")
// TopicIDTitle is the only identifier of whether a topic exists or not
topicIDKey := TopicIDTitle + topicName
val, err := rmq.kv.Load(topicIDKey)
if err != nil {
return err
}
if val != "" {
return nil
}
// TODO change rmq kv save logic into a batch
err := rmq.kv.Save(beginKey, "0")
if err != nil {
return err
}
err = rmq.kv.Save(endKey, "0")
if err != nil {
return err
}
if _, ok := topicMu.Load(topicName); !ok {
topicMu.Store(topicName, new(sync.Mutex))
}
if _, ok := rmq.ackedMu.Load(topicName); !ok {
rmq.ackedMu.Store(topicName, new(sync.Mutex))
}
// Initialize retention infos
// Initialize acked size to 0 for topic
ackedSizeKey := AckedSizeTitle + topicName
err = rmq.kv.Save(ackedSizeKey, "0")
if err != nil {
return err
}
// Initialize topic begin id to defaultMessageID
topicBeginIDKey := TopicBeginIDTitle + topicName
err = rmq.kv.Save(topicBeginIDKey, DefaultMessageID)
if err != nil {
return err
}
// Initialize topic message size to 0
msgSizeKey := MessageSizeTitle + topicName
@ -304,9 +330,16 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
return err
}
// Initialize topic id to its creation time; we don't really use it for now
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
err = rmq.kv.Save(topicIDKey, nowTs)
if err != nil {
return err
}
rmq.retentionInfo.mutex.Lock()
defer rmq.retentionInfo.mutex.Unlock()
rmq.retentionInfo.topics.Store(topicName, time.Now().Unix())
rmq.retentionInfo.topicRetetionTime.Store(topicName, time.Now().Unix())
log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
@ -324,29 +357,55 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
}
lock.Lock()
defer lock.Unlock()
beginKey := topicName + "/begin_id"
endKey := topicName + "/end_id"
var removedKeys []string
rmq.consumers.Delete(topicName)
ackedSizeKey := AckedSizeTitle + topicName
topicBeginIDKey := TopicBeginIDTitle + topicName
// clean the topic data itself
fixChanName, err := fixChannelName(topicName)
if err != nil {
return err
}
err = rmq.kv.RemoveWithPrefix(fixChanName)
if err != nil {
return err
}
// just for clean up old topics, for new topics this is not required
lastRetTsKey := LastRetTsTitle + topicName
// clean page size info
pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
err = rmq.kv.RemoveWithPrefix(pageMsgSizeKey)
if err != nil {
return err
}
// clean page ts info
pageMsgTsKey := constructKey(PageTsTitle, topicName)
err = rmq.kv.RemoveWithPrefix(pageMsgTsKey)
if err != nil {
return err
}
// clean acked ts info
ackedTsKey := constructKey(AckedTsTitle, topicName)
err = rmq.kv.RemoveWithPrefix(ackedTsKey)
if err != nil {
return err
}
// topic info
topicIDKey := TopicIDTitle + topicName
// message size of this topic
msgSizeKey := MessageSizeTitle + topicName
removedKeys = append(removedKeys, beginKey, endKey, ackedSizeKey, topicBeginIDKey, lastRetTsKey, msgSizeKey)
var removedKeys []string
removedKeys = append(removedKeys, topicIDKey, msgSizeKey)
// Batch remove, atomic operation
err := rmq.kv.MultiRemove(removedKeys)
err = rmq.kv.MultiRemove(removedKeys)
if err != nil {
return err
}
// clean up retention info
topicMu.Delete(topicName)
rmq.retentionInfo.topics.Delete(topicName)
rmq.retentionInfo.topicRetetionTime.Delete(topicName)
// clean up reader
if val, ok := rmq.readers.LoadAndDelete(topicName); ok {
@ -359,18 +418,19 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
}
// ExistConsumerGroup checks if a consumer group exists and returns the existing consumer
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) {
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
key := constructCurrentID(topicName, groupName)
if rmq.checkKeyExist(key) {
_, ok := rmq.consumersID.Load(key)
if ok {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == groupName {
return true, v
return true, v, nil
}
}
}
}
return false, nil
return false, nil, nil
}
// CreateConsumerGroup creates a nonexistent consumer group for a topic
@ -380,14 +440,11 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
}
start := time.Now()
key := constructCurrentID(topicName, groupName)
if rmq.checkKeyExist(key) {
log.Debug("RMQ CreateConsumerGroup key already exists", zap.String("key", key))
return nil
}
err := rmq.kv.Save(key, DefaultMessageID)
if err != nil {
return err
_, ok := rmq.consumersID.Load(key)
if ok {
return fmt.Errorf("RMQ CreateConsumerGroup key already exists, key = %s", key)
}
rmq.consumersID.Store(key, DefaultMessageID)
log.Debug("Rocksmq create consumer group successfully ", zap.String("topic", topicName),
zap.String("group", groupName),
zap.Int64("elapsed", time.Since(start).Milliseconds()))
@ -395,15 +452,15 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
}
// RegisterConsumer registers a consumer in rocksmq consumers
func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) {
func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
if rmq.isClosed() {
return
return errors.New(RmqNotServingErrMsg)
}
start := time.Now()
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == consumer.GroupName {
return
return nil
}
}
consumers := vals.([]*Consumer)
@ -415,10 +472,19 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) {
rmq.consumers.Store(consumer.Topic, consumers)
}
log.Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
// DestroyConsumerGroup removes a consumer group from rocksdb_kv
func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
return rmq.destroyConsumerGroupInternal(topicName, groupName)
}
// destroyConsumerGroupInternal removes a consumer group without checking the serving state
func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) error {
start := time.Now()
ll, ok := topicMu.Load(topicName)
if !ok {
@ -431,11 +497,7 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
lock.Lock()
defer lock.Unlock()
key := constructCurrentID(topicName, groupName)
err := rmq.kv.Remove(key)
if err != nil {
return err
}
rmq.consumersID.Delete(key)
if vals, ok := rmq.consumers.Load(topicName); ok {
consumers := vals.([]*Consumer)
for index, v := range consumers {
@ -484,7 +546,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
}
/* Step I: Insert data to store system */
// Insert data to store system
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
msgSizes := make(map[UniqueID]int64)
@ -495,7 +557,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
if err != nil {
return []UniqueID{}, err
}
batch.Put([]byte(key), messages[i].Payload)
msgIDs[i] = msgID
msgSizes[msgID] = int64(len(messages[i].Payload))
@ -509,29 +570,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
return []UniqueID{}, err
}
/* Step II: Update meta data to kv system */
kvChannelBeginID := topicName + "/begin_id"
beginIDValue, err := rmq.kv.Load(kvChannelBeginID)
if err != nil {
log.Debug("RocksMQ: load " + kvChannelBeginID + " failed")
return []UniqueID{}, err
}
kvValues := make(map[string]string)
if beginIDValue == "0" {
kvValues[kvChannelBeginID] = strconv.FormatInt(idStart, 10)
}
kvChannelEndID := topicName + "/end_id"
kvValues[kvChannelEndID] = strconv.FormatInt(idEnd, 10)
err = rmq.kv.MultiSave(kvValues)
if err != nil {
log.Debug("RocksMQ: multisave failed")
return []UniqueID{}, err
}
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
select {
@ -554,12 +592,12 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
}
// Update message page info
// TODO(yukun): Should this be in a go routine
err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
if err != nil {
return []UniqueID{}, err
}
// TODO add this to monitor metrics
getProduceTime := time.Since(start).Milliseconds()
if getLockTime > 200 || getProduceTime > 200 {
log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
@ -578,10 +616,10 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
if err != nil {
return err
}
fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topicName)
if err != nil {
return err
}
fixedPageSizeKey := constructKey(PageMsgSizeTitle, topicName)
fixedPageTsKey := constructKey(PageTsTitle, topicName)
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
mutateBuffer := make(map[string]string)
for _, id := range msgIDs {
msgSize := msgSizes[id]
if curMsgSize+msgSize > RocksmqPageSize {
@ -590,17 +628,16 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
pageEndID := id
// Update page message size for current page. key is page end ID
pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
err := rmq.kv.Save(pageMsgSizeKey, strconv.FormatInt(newPageSize, 10))
if err != nil {
return err
}
mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(pageEndID, 10)
mutateBuffer[pageTsKey] = nowTs
curMsgSize = 0
} else {
curMsgSize += msgSize
}
}
// Update message size to current message size
err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(curMsgSize, 10))
mutateBuffer[msgSizeKey] = strconv.FormatInt(curMsgSize, 10)
err = rmq.kv.MultiSave(mutateBuffer)
return err
}
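
The page bookkeeping above can be summarized with a small stand-alone sketch (not from this commit; the topic name, page size, and message sizes are made up): messages accumulate into the current page until the next one would push the page past the size limit, at which point that message id becomes the page's end id and the counters reset, with all updates collected into one map for a single MultiSave.

package main

import (
	"fmt"
	"strconv"
)

func main() {
	const pageSize = int64(100) // stand-in for RocksmqPageSize
	topic := "topic_demo"

	msgIDs := []int64{1, 2, 3}
	msgSizes := map[int64]int64{1: 60, 2: 50, 3: 30}

	curMsgSize := int64(0)
	mutateBuffer := make(map[string]string) // written with a single MultiSave in the real code
	for _, id := range msgIDs {
		size := msgSizes[id]
		if curMsgSize+size > pageSize {
			// seal the current page: this message id becomes the page end id
			newPageSize := curMsgSize + size
			pageID := strconv.FormatInt(id, 10)
			mutateBuffer["page_message_size/"+topic+"/"+pageID] = strconv.FormatInt(newPageSize, 10)
			mutateBuffer["page_ts/"+topic+"/"+pageID] = "now" // a real timestamp in the real code
			curMsgSize = 0
		} else {
			curMsgSize += size
		}
	}
	mutateBuffer["message_size/"+topic] = strconv.FormatInt(curMsgSize, 10)
	fmt.Println(mutateBuffer)
}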
@ -626,12 +663,8 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
getLockTime := time.Since(start).Milliseconds()
metaKey := constructCurrentID(topicName, groupName)
currentID, err := rmq.kv.Load(metaKey)
if err != nil {
log.Debug("RocksMQ: load " + metaKey + " failed")
return nil, err
}
if currentID == "" {
currentID, ok := rmq.consumersID.Load(metaKey)
if !ok {
return nil, fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", topicName, groupName)
}
@ -653,7 +686,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
if currentID == DefaultMessageID {
dataKey = fixChanName + "/"
} else {
dataKey = fixChanName + "/" + currentID
dataKey = fixChanName + "/" + strconv.FormatInt(currentID.(int64), 10)
}
iter.Seek([]byte(dataKey))
@ -666,7 +699,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
offset++
msgID, err := strconv.ParseInt(strKey[FixedChannelNameLen+1:], 10, 64)
if err != nil {
log.Debug("RocksMQ: parse int " + strKey[FixedChannelNameLen+1:] + " failed")
log.Warn("RocksMQ: parse int " + strKey[FixedChannelNameLen+1:] + " failed")
val.Free()
return nil, err
}
@ -684,6 +717,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
consumerMessage = append(consumerMessage, msg)
val.Free()
}
// if iteration fails
if err := iter.Err(); err != nil {
return nil, err
}
// When we have already consumed up to the last message, an empty slice will be returned
if len(consumerMessage) == 0 {
@ -696,12 +733,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
consumedIDs = append(consumedIDs, msg.MsgID)
}
newID := consumedIDs[len(consumedIDs)-1]
err = rmq.moveConsumePos(topicName, groupName, newID+1)
if err != nil {
return nil, err
}
rmq.moveConsumePos(topicName, groupName, newID+1)
go rmq.updateAckedInfo(topicName, groupName, consumedIDs)
rmq.updateAckedInfo(topicName, groupName, consumedIDs)
// TODO add this to monitor metrics
getConsumeTime := time.Since(start).Milliseconds()
if getLockTime > 200 || getConsumeTime > 200 {
log.Warn("rocksmq consume too slowly", zap.String("topic", topicName),
@ -715,10 +750,12 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
key := constructCurrentID(topicName, groupName)
if !rmq.checkKeyExist(key) {
_, ok := rmq.consumersID.Load(key)
if !ok {
log.Warn("RocksMQ: channel " + key + " not exists")
return fmt.Errorf("consumerGroup %s, channel %s not exists", groupName, topicName)
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
}
storeKey, err := combKey(topicName, msgID)
if err != nil {
log.Warn("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(msgID, 10) + ") failed")
@ -727,28 +764,26 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err
opts := gorocksdb.NewDefaultReadOptions()
defer opts.Destroy()
val, err := rmq.store.Get(opts, []byte(storeKey))
defer val.Free()
if err != nil {
log.Warn("RocksMQ: get " + storeKey + " failed")
return err
}
defer val.Free()
if !val.Exists() {
log.Warn("RocksMQ: trying to seek to no exist position, reset current id",
zap.String("topic", topicName), zap.String("group", groupName), zap.Int64("msgId", msgID))
rmq.moveConsumePos(topicName, groupName, DefaultMessageID)
// skip seek if key is not found; this is the same behavior as pulsar
return nil
}
/* Step II: Save current_id in kv */
return rmq.moveConsumePos(topicName, groupName, msgID)
/* Step II: update current_id */
rmq.moveConsumePos(topicName, groupName, msgID)
return nil
}
func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID UniqueID) error {
func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID UniqueID) {
key := constructCurrentID(topicName, groupName)
err := rmq.kv.Save(key, strconv.FormatInt(msgID, 10))
if err != nil {
log.Warn("RocksMQ: save " + key + " failed")
return err
}
return nil
rmq.consumersID.Store(key, msgID)
}
// Seek updates the current id to the given msgID
@ -779,8 +814,9 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
key := constructCurrentID(topicName, groupName)
if !rmq.checkKeyExist(key) {
log.Debug("RocksMQ: channel " + key + " not exists")
_, ok := rmq.consumersID.Load(key)
if !ok {
log.Warn("RocksMQ: channel " + key + " not exists")
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
}
@ -794,6 +830,10 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
// '0' is the ASCII character right after '/'
iter.SeekForPrev([]byte(fixChanName + "0"))
// if iteration fails
if err := iter.Err(); err != nil {
return err
}
// should find the last key we wrote, starting with fixChanName/
// if not found, start from 0
if !iter.Valid() {
@ -813,7 +853,8 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
return err
}
// current msgID should not be included
return rmq.moveConsumePos(topicName, groupName, msgID+1)
rmq.moveConsumePos(topicName, groupName, msgID+1)
return nil
}
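
The SeekForPrev trick above relies on key ordering alone: data keys have the form fixChanName + "/" + msgID, and '0' is the ASCII character immediately after '/', so fixChanName + "0" sorts after every message of this topic but before any later topic. A rough illustration with an in-memory sorted slice follows (not from this commit; channel names and ids are made up, and SeekForPrev is only approximated).

package main

import (
	"fmt"
	"sort"
)

func main() {
	fixChanName := "topic_demo________" // stand-in for a fixed-length channel name
	keys := []string{
		fixChanName + "/100",
		fixChanName + "/101",
		fixChanName + "/102",
		"topic_other_______/1",
	}
	sort.Strings(keys)

	// emulate SeekForPrev(fixChanName + "0"): take the last key below the bound
	bound := fixChanName + "0"
	i := sort.SearchStrings(keys, bound)
	fmt.Println(keys[i-1]) // topic_demo________/102, the latest message of the topic
}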
// Notify sends a mutex in MsgMutex channel to tell consumers to consume
@ -837,48 +878,25 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
if len(ids) == 0 {
return nil
}
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return fmt.Errorf("get mutex failed, topic name = %s", topicName)
}
lock.Lock()
defer lock.Unlock()
firstID := ids[0]
lastID := ids[len(ids)-1]
fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName)
if err != nil {
return err
}
// 1. Update begin_id for the consumer_group
beginIDKey := fixedBeginIDKey + "/" + groupName
err = rmq.kv.Save(beginIDKey, strconv.FormatInt(lastID, 10))
if err != nil {
return err
}
// 2. Try to get the page id between first ID and last ID of ids
pageMsgPrefix, err := constructKey(PageMsgSizeTitle, topicName)
if err != nil {
return err
}
// 1. Try to get the page id between first ID and last ID of ids
pageMsgPrefix := constructKey(PageMsgSizeTitle, topicName)
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
readOpts.SetPrefixSameAsStart(true)
pageMsgFirstKey := pageMsgPrefix + "/" + strconv.FormatInt(firstID, 10)
// set last key by lastID
pageMsgLastKey := pageMsgPrefix + "/" + strconv.FormatInt(lastID+1, 10)
readOpts.SetIterateUpperBound([]byte(pageMsgLastKey))
iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts)
defer iter.Close()
var pageIDs []UniqueID
pageMsgKey := pageMsgPrefix + "/" + strconv.FormatInt(firstID, 10)
for iter.Seek([]byte(pageMsgKey)); iter.Valid(); iter.Next() {
for iter.Seek([]byte(pageMsgFirstKey)); iter.Valid(); iter.Next() {
key := iter.Key()
pageID, err := strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64)
pageID, err := parsePageID(string(key.Data()))
if key != nil {
key.Free()
}
@ -891,80 +909,47 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
break
}
}
if err := iter.Err(); err != nil {
return err
}
if len(pageIDs) == 0 {
return nil
}
fixedAckedTsKey := constructKey(AckedTsTitle, topicName)
fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName)
if err != nil {
return err
}
// 3. Update acked ts and acked size for pageIDs
// 2. Update acked ts and acked size for pageIDs
if vals, ok := rmq.consumers.Load(topicName); ok {
var minBeginID int64 = math.MaxInt64
consumers, ok := vals.([]*Consumer)
if !ok || len(consumers) == 0 {
return nil
}
for _, v := range consumers {
curBeginIDKey := path.Join(fixedBeginIDKey, v.GroupName)
curBeginIDVal, err := rmq.kv.Load(curBeginIDKey)
if err != nil {
return err
// update consumer id
for _, consumer := range consumers {
if consumer.GroupName == groupName {
consumer.beginID = lastID
break
}
curBeginID, err := strconv.ParseInt(curBeginIDVal, 10, 64)
if err != nil {
return err
}
if curBeginID < minBeginID {
minBeginID = curBeginID
}
// find the min begin id across all consumers
var minBeginID int64 = math.MaxInt64
for _, consumer := range consumers {
if consumer.beginID < minBeginID {
minBeginID = consumer.beginID
}
}
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
ackedTsKvs := make(map[string]string)
totalAckMsgSize := int64(0)
fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topicName)
if err != nil {
return err
}
// update ackedTs, if page is all acked, then ackedTs is set
for _, pID := range pageIDs {
if pID <= minBeginID {
// Update acked info for message pID
pageAckedTsKey := path.Join(fixedAckedTsKey, strconv.FormatInt(pID, 10))
ackedTsKvs[pageAckedTsKey] = nowTs
// get current page message size
pageMsgSizeKey := path.Join(fixedPageSizeKey, strconv.FormatInt(pID, 10))
pageMsgSizeVal, err := rmq.kv.Load(pageMsgSizeKey)
if err != nil {
return err
}
pageMsgSize, err := strconv.ParseInt(pageMsgSizeVal, 10, 64)
if err != nil {
return err
}
totalAckMsgSize += pageMsgSize
}
}
err = rmq.kv.MultiSave(ackedTsKvs)
if err != nil {
return err
}
ackedSizeKey := AckedSizeTitle + topicName
ackedSizeVal, err := rmq.kv.Load(ackedSizeKey)
if err != nil {
return err
}
ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64)
if err != nil {
return err
}
ackedSize += totalAckMsgSize
err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10))
err := rmq.kv.MultiSave(ackedTsKvs)
if err != nil {
return err
}
@ -983,9 +968,21 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
readOpts := gorocksdb.NewDefaultReadOptions()
readOpts.SetPrefixSameAsStart(true)
iter := rmq.store.NewIterator(readOpts)
fixChanName, err := fixChannelName(topicName)
if err != nil {
log.Debug("RocksMQ: fixChannelName " + topicName + " failed")
return "", err
}
dataKey := path.Join(fixChanName, strconv.FormatInt(startMsgID, 10))
iter.Seek([]byte(dataKey))
// if iteration fails
if err := iter.Err(); err != nil {
return "", err
}
nowTs, err := getNowTs(rmq.idAllocator)
if err != nil {
return "", errors.New("can't get current ts from rocksmq idAllocator")
return "", errors.New("Can't get current ts from rocksmq idAllocator")
}
readerName := subscriptionRolePrefix + ReaderNamePrefix + strconv.FormatInt(nowTs, 10)
@ -999,7 +996,6 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
messageIDInclusive: messageIDInclusive,
readerMutex: make(chan struct{}, 1),
}
reader.Seek(startMsgID)
if vals, ok := rmq.readers.Load(topicName); ok {
readers := vals.([]*rocksmqReader)
readers = append(readers, reader)
@ -1024,17 +1020,17 @@ func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader {
}
// ReaderSeek seeks a reader to the specified position
func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) {
func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) error {
if rmq.isClosed() {
return
return errors.New(RmqNotServingErrMsg)
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
return
return fmt.Errorf("reader not exist, topic %s, reader %s", topicName, readerName)
}
reader.Seek(msgID)
return nil
}
// Next get the next message of reader


@ -78,9 +78,8 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := rmqPath + dbPathSuffix + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
metaPath := rmqPath + metaPathSuffix + suffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
@ -88,9 +87,14 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
topicName := "topic_register"
groupName := "group_register"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateTopic(topicName)
assert.NoError(t, err)
defer rmq.DestroyTopic(topicName)
err = rmq.CreateConsumerGroup(topicName, groupName)
assert.Nil(t, err)
defer rmq.DestroyConsumerGroup(topicName, groupName)
consumer := &Consumer{
Topic: topicName,
@ -98,10 +102,10 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
MsgMutex: make(chan struct{}),
}
rmq.RegisterConsumer(consumer)
exist, _ := rmq.ExistConsumerGroup(topicName, groupName)
exist, _, _ := rmq.ExistConsumerGroup(topicName, groupName)
assert.Equal(t, exist, true)
dummyGrpName := "group_dummy"
exist, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName)
exist, _, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName)
assert.Equal(t, exist, false)
msgA := "a_message"
@ -111,7 +115,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
_ = idAllocator.UpdateID()
_, err = rmq.Produce(topicName, pMsgs)
assert.Error(t, err)
assert.Nil(t, err)
rmq.Notify(topicName, groupName)
@ -129,25 +133,18 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
MsgMutex: make(chan struct{}),
}
rmq.RegisterConsumer(consumer2)
topicMu.Delete(topicName)
topicMu.Store(topicName, topicName)
assert.Error(t, rmq.DestroyConsumerGroup(topicName, groupName))
err = rmq.DestroyConsumerGroup(topicName, groupName)
assert.Error(t, err)
}
func TestRocksmq(t *testing.T) {
func TestRocksmq_Basic(t *testing.T) {
suffix := "_rmq"
kvPath := rmqPath + kvPathSuffix + suffix
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := rmqPath + dbPathSuffix + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
metaPath := rmqPath + metaPathSuffix + suffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -181,7 +178,7 @@ func TestRocksmq(t *testing.T) {
assert.Nil(t, err)
// double create consumer group
err = rmq.CreateConsumerGroup(channelName, groupName)
assert.Nil(t, err)
assert.Error(t, err)
cMsgs, err := rmq.Consume(channelName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 1)
@ -201,9 +198,8 @@ func TestRocksmq_Dummy(t *testing.T) {
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := rmqPath + dbPathSuffix + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
metaPath := rmqPath + metaPathSuffix + suffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
@ -216,8 +212,9 @@ func TestRocksmq_Dummy(t *testing.T) {
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
defer rmq.DestroyTopic(channelName)
// create topic twice should be ignored
err = rmq.CreateTopic(channelName)
assert.NoError(t, err)
assert.Nil(t, err)
channelName1 := "channel_dummy"
topicMu.Store(channelName1, new(sync.Mutex))
@ -258,7 +255,6 @@ func TestRocksmq_Dummy(t *testing.T) {
_, err = rmq.Consume(channelName, groupName1, 1)
assert.Error(t, err)
}
func TestRocksmq_Seek(t *testing.T) {
@ -268,9 +264,8 @@ func TestRocksmq_Seek(t *testing.T) {
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := rmqPath + dbPathSuffix + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
metaPath := rmqPath + metaPathSuffix + suffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
@ -824,7 +819,6 @@ func TestReader_CornerCase(t *testing.T) {
extraMsgs[0] = ProducerMessage{Payload: []byte(msg)}
extraIds, _ = rmq.Produce(channelName, extraMsgs)
// assert.NoError(t, er)
fmt.Println(extraIds[0])
assert.Equal(t, 1, len(extraIds))
}()


@ -20,30 +20,30 @@ import (
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"
)
// RocksmqRetentionTimeInMinutes is the time of retention
var RocksmqRetentionTimeInMinutes int64 = 10080
var RocksmqRetentionTimeInSecs int64 = 10080 * 60
// RocksmqRetentionSizeInMB is the size of retention
var RocksmqRetentionSizeInMB int64 = 8192
// Const values used for unit conversion
const (
MB = 1024 * 1024
MINUTE = 60
MB = 1024 * 1024
)
// TickerTimeInSeconds is the time of expired check, default 10 minutes
var TickerTimeInSeconds int64 = 10 * MINUTE
var TickerTimeInSeconds int64 = 60
type retentionInfo struct {
// key is topic name, value is last retention type
topics sync.Map
mutex sync.RWMutex
// key is topic name, value is last retention time
topicRetetionTime sync.Map
mutex sync.RWMutex
kv *rocksdbkv.RocksdbKV
db *gorocksdb.DB
@ -55,21 +55,21 @@ type retentionInfo struct {
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
ri := &retentionInfo{
topics: sync.Map{},
mutex: sync.RWMutex{},
kv: kv,
db: db,
closeCh: make(chan struct{}),
closeWg: sync.WaitGroup{},
topicRetetionTime: sync.Map{},
mutex: sync.RWMutex{},
kv: kv,
db: db,
closeCh: make(chan struct{}),
closeWg: sync.WaitGroup{},
}
// Get topics from the topic id keys
beginIDKeys, _, err := ri.kv.LoadWithPrefix(TopicBeginIDTitle)
topicKeys, _, err := ri.kv.LoadWithPrefix(TopicIDTitle)
if err != nil {
return nil, err
}
for _, key := range beginIDKeys {
topic := key[len(TopicBeginIDTitle):]
ri.topics.Store(topic, time.Now().Unix())
for _, key := range topicKeys {
topic := key[len(TopicIDTitle):]
ri.topicRetetionTime.Store(topic, time.Now().Unix())
topicMu.Store(topic, new(sync.Mutex))
}
return ri, nil
@ -79,7 +79,6 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
// Because loadRetentionInfo may take some time, do this asynchronously. Finally, start the retention goroutine.
func (ri *retentionInfo) startRetentionInfo() {
// var wg sync.WaitGroup
ri.kv.ResetPrefixLength(FixedChannelNameLen)
ri.closeWg.Add(1)
go ri.retention()
}
@ -98,9 +97,9 @@ func (ri *retentionInfo) retention() error {
return nil
case t := <-ticker.C:
timeNow := t.Unix()
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInSecs) / 10
ri.mutex.RLock()
ri.topics.Range(func(k, v interface{}) bool {
ri.topicRetetionTime.Range(func(k, v interface{}) bool {
topic, _ := k.(string)
lastRetentionTs, ok := v.(int64)
if !ok {
@ -112,6 +111,7 @@ func (ri *retentionInfo) retention() error {
if err != nil {
log.Warn("Retention expired clean failed", zap.Any("error", err))
}
ri.topicRetetionTime.Store(topic, timeNow)
}
return true
})
@ -136,132 +136,75 @@ func (ri *retentionInfo) Stop() {
func (ri *retentionInfo) expiredCleanUp(topic string) error {
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
var deletedAckedSize int64
var startID UniqueID
var pageStartID UniqueID
var pageCleaned UniqueID
var pageEndID UniqueID
var err error
fixedAckedTsKey, _ := constructKey(AckedTsTitle, topic)
fixedAckedTsKey := constructKey(AckedTsTitle, topic)
// calculate total acked size, simply add all page info
totalAckedSize, err := ri.calculateTopicAckedSize(topic)
if err != nil {
return err
}
// Quick Path, No page to check
if totalAckedSize == 0 {
log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic))
return nil
}
pageReadOpts := gorocksdb.NewDefaultReadOptions()
defer pageReadOpts.Destroy()
pageReadOpts.SetPrefixSameAsStart(true)
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
// ensure the iterator won't iterate to other topics
pageReadOpts.SetIterateUpperBound([]byte(typeutil.AddOne(pageMsgPrefix)))
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
defer pageIter.Close()
pageMsgPrefix, _ := constructKey(PageMsgSizeTitle, topic)
pageIter.Seek([]byte(pageMsgPrefix))
if pageIter.Valid() {
pageStartID, err = strconv.ParseInt(string(pageIter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
for ; pageIter.Valid(); pageIter.Next() {
pKey := pageIter.Key()
pageID, err := parsePageID(string(pKey.Data()))
if pKey != nil {
pKey.Free()
}
if err != nil {
return err
}
for ; pageIter.Valid(); pageIter.Next() {
pKey := pageIter.Key()
pageID, err := strconv.ParseInt(string(pKey.Data())[FixedChannelNameLen+1:], 10, 64)
if pKey != nil {
pKey.Free()
ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
ackedTsVal, err := ri.kv.Load(ackedTsKey)
if err != nil {
return err
}
// not acked page, TODO add TTL info there
if ackedTsVal == "" {
break
}
ackedTs, err := strconv.ParseInt(ackedTsVal, 10, 64)
if err != nil {
return err
}
if msgTimeExpiredCheck(ackedTs) {
pageEndID = pageID
pValue := pageIter.Value()
size, err := strconv.ParseInt(string(pValue.Data()), 10, 64)
if pValue != nil {
pValue.Free()
}
if err != nil {
return err
}
ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
ackedTsVal, err := ri.kv.Load(ackedTsKey)
if err != nil {
return err
}
if ackedTsVal == "" {
break
}
ackedTs, err := strconv.ParseInt(ackedTsVal, 10, 64)
if err != nil {
return err
}
if msgTimeExpiredCheck(ackedTs) {
pageEndID = pageID
pValue := pageIter.Value()
size, err := strconv.ParseInt(string(pValue.Data()), 10, 64)
if pValue != nil {
pValue.Free()
}
if err != nil {
return err
}
deletedAckedSize += size
} else {
break
}
deletedAckedSize += size
pageCleaned++
} else {
break
}
}
// TODO(yukun): Remove ackedTs expiredCheck one by one
// ackedReadOpts := gorocksdb.NewDefaultReadOptions()
// defer ackedReadOpts.Destroy()
// ackedReadOpts.SetPrefixSameAsStart(true)
// ackedIter := ri.kv.DB.NewIterator(ackedReadOpts)
// defer ackedIter.Close()
// if err != nil {
// return err
// }
// ackedIter.Seek([]byte(fixedAckedTsKey))
// if !ackedIter.Valid() {
// return nil
// }
// startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
// if err != nil {
// return err
// }
// if endID > startID {
// newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10)
// ackedIter.Seek([]byte(newPos))
// }
// for ; ackedIter.Valid(); ackedIter.Next() {
// aKey := ackedIter.Key()
// aValue := ackedIter.Value()
// ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64)
// if aValue != nil {
// aValue.Free()
// }
// if err != nil {
// if aKey != nil {
// aKey.Free()
// }
// return err
// }
// if msgTimeExpiredCheck(ackedTs) {
// endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64)
// if aKey != nil {
// aKey.Free()
// }
// if err != nil {
// return err
// }
// } else {
// if aKey != nil {
// aKey.Free()
// }
// break
// }
// }
if pageEndID == 0 {
log.Debug("All messages are not time expired")
}
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize))
ackedSizeKey := AckedSizeTitle + topic
totalAckedSizeVal, err := ri.kv.Load(ackedSizeKey)
if err != nil {
return err
}
totalAckedSize, err := strconv.ParseInt(totalAckedSizeVal, 10, 64)
if err != nil {
if err := pageIter.Err(); err != nil {
return err
}
log.Debug("Expired check by retention time", zap.Any("topic", topic),
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), zap.Any("pageCleaned", pageCleaned))
for ; pageIter.Valid(); pageIter.Next() {
pValue := pageIter.Value()
size, err := strconv.ParseInt(string(pValue.Data()), 10, 64)
@ -278,39 +221,97 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
}
curDeleteSize := deletedAckedSize + size
if msgSizeExpiredCheck(curDeleteSize, totalAckedSize) {
pageEndID, err = strconv.ParseInt(pKeyStr[FixedChannelNameLen+1:], 10, 64)
pageEndID, err = parsePageID(pKeyStr)
if err != nil {
return err
}
deletedAckedSize += size
pageCleaned++
} else {
break
}
}
if err := pageIter.Err(); err != nil {
return err
}
if pageEndID == 0 {
log.Debug("All messages are not expired")
log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic))
return nil
}
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize))
log.Debug("Expired check by message size: ", zap.Any("topic", topic),
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), zap.Any("pageCleaned", pageCleaned))
return ri.cleanData(topic, pageEndID)
}
func (ri *retentionInfo) calculateTopicAckedSize(topic string) (int64, error) {
fixedAckedTsKey := constructKey(AckedTsTitle, topic)
pageReadOpts := gorocksdb.NewDefaultReadOptions()
defer pageReadOpts.Destroy()
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
// ensure the iterator won't iterate to other topics
pageReadOpts.SetIterateUpperBound([]byte(typeutil.AddOne(pageMsgPrefix)))
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
defer pageIter.Close()
pageIter.Seek([]byte(pageMsgPrefix))
var ackedSize int64
for ; pageIter.Valid(); pageIter.Next() {
key := pageIter.Key()
pageID, err := parsePageID(string(key.Data()))
if key != nil {
key.Free()
}
if err != nil {
return -1, err
}
// check if page is acked
ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
ackedTsVal, err := ri.kv.Load(ackedTsKey)
if err != nil {
return -1, err
}
// not acked yet, break
// TODO, Add TTL logic here, mark it as acked if not
if ackedTsVal == "" {
break
}
// Get page size
val := pageIter.Value()
size, err := strconv.ParseInt(string(val.Data()), 10, 64)
if val != nil {
val.Free()
}
if err != nil {
return -1, err
}
ackedSize += size
}
if err := pageIter.Err(); err != nil {
return -1, err
}
return ackedSize, nil
}
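// The scan above is bounded with typeutil.AddOne instead of a fixed prefix
// extractor. A minimal standalone sketch of that pattern follows; it is
// illustrative only (not part of this patch) and assumes an already opened
// *gorocksdb.DB handle and a prefix whose last byte is below 0xFF.
func prefixScanSketch(db *gorocksdb.DB, prefix string) (map[string]string, error) {
	readOpts := gorocksdb.NewDefaultReadOptions()
	defer readOpts.Destroy()
	// For such prefixes every matching key k satisfies prefix <= k < AddOne(prefix),
	// so the iterator can never reach another topic's keys.
	readOpts.SetIterateUpperBound([]byte(typeutil.AddOne(prefix)))
	iter := db.NewIterator(readOpts)
	defer iter.Close()
	result := make(map[string]string)
	for iter.Seek([]byte(prefix)); iter.Valid(); iter.Next() {
		key, value := iter.Key(), iter.Value()
		result[string(key.Data())] = string(value.Data())
		key.Free()
		value.Free()
	}
	return result, iter.Err()
}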
func (ri *retentionInfo) cleanData(topic string, pageEndID UniqueID) error {
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
pageStartIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageStartID, 10)
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
fixedAckedTsKey := constructKey(AckedTsTitle, topic)
pageStartIDKey := pageMsgPrefix + "/"
pageEndIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageEndID+1, 10)
if pageStartID == pageEndID {
if pageStartID != 0 {
writeBatch.Delete([]byte(pageStartIDKey))
}
} else if pageStartID < pageEndID {
writeBatch.DeleteRange([]byte(pageStartIDKey), []byte(pageEndIDKey))
}
writeBatch.DeleteRange([]byte(pageStartIDKey), []byte(pageEndIDKey))
ackedStartIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(startID))
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(pageEndID+1))
if startID > pageEndID {
return nil
}
pageTsPrefix := constructKey(PageTsTitle, topic)
pageTsStartIDKey := pageTsPrefix + "/"
pageTsEndIDKey := pageTsPrefix + "/" + strconv.FormatInt(pageEndID+1, 10)
writeBatch.DeleteRange([]byte(pageTsStartIDKey), []byte(pageTsEndIDKey))
ackedStartIDKey := fixedAckedTsKey + "/"
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageEndID+1, 10)
writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
ll, ok := topicMu.Load(topic)
@ -323,26 +324,18 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
}
lock.Lock()
defer lock.Unlock()
currentAckedSizeVal, err := ri.kv.Load(ackedSizeKey)
if err != nil {
return err
}
currentAckedSize, err := strconv.ParseInt(currentAckedSizeVal, 10, 64)
if err != nil {
return err
}
newAckedSize := currentAckedSize - deletedAckedSize
writeBatch.Put([]byte(ackedSizeKey), []byte(strconv.FormatInt(newAckedSize, 10)))
err = DeleteMessages(ri.db, topic, startID, pageEndID)
err := DeleteMessages(ri.db, topic, 0, pageEndID)
if err != nil {
return err
}
writeOpts := gorocksdb.NewDefaultWriteOptions()
defer writeOpts.Destroy()
ri.kv.DB.Write(writeOpts, writeBatch)
err = ri.kv.DB.Write(writeOpts, writeBatch)
if err != nil {
return err
}
return nil
}
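// cleanData above batches several DeleteRange calls and commits them with a
// single Write so the per-topic metadata disappears atomically. Below is a
// hedged, self-contained sketch of that batch-delete pattern (illustrative
// only, not part of this patch; the key names are made up).
func batchDeleteRangeSketch(db *gorocksdb.DB) error {
	writeBatch := gorocksdb.NewWriteBatch()
	defer writeBatch.Destroy()
	// Each range is [start, end): the end key itself is not deleted.
	writeBatch.DeleteRange([]byte("topic_a/page/"), []byte("topic_a/page/42"))
	writeBatch.DeleteRange([]byte("topic_a/acked/"), []byte("topic_a/acked/42"))
	writeOpts := gorocksdb.NewDefaultWriteOptions()
	defer writeOpts.Destroy()
	// The whole batch is applied atomically: both ranges go, or neither does.
	return db.Write(writeOpts, writeBatch)
}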
@ -359,14 +352,9 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err
log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(endID, 10) + ")")
return err
}
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
if startID == endID {
writeBatch.Delete([]byte(startKey))
} else {
writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
}
writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
opts := gorocksdb.NewDefaultWriteOptions()
defer opts.Destroy()
err = db.Write(opts, writeBatch)
@ -375,14 +363,19 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err
}
log.Debug("Delete message for topic: "+topic, zap.Any("startID", startID), zap.Any("endID", endID))
return nil
}
func msgTimeExpiredCheck(ackedTs int64) bool {
return ackedTs+atomic.LoadInt64(&RocksmqRetentionTimeInMinutes)*MINUTE < time.Now().Unix()
if RocksmqRetentionTimeInSecs < 0 {
return false
}
return ackedTs+atomic.LoadInt64(&RocksmqRetentionTimeInSecs) < time.Now().Unix()
}
func msgSizeExpiredCheck(deletedAckedSize, ackedSize int64) bool {
if RocksmqRetentionSizeInMB < 0 {
return false
}
return ackedSize-deletedAckedSize > atomic.LoadInt64(&RocksmqRetentionSizeInMB)*MB
}
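// A hedged illustration of how the two checks above interact (not part of
// this patch; the values are made up). A negative knob disables that
// dimension of retention, so only the remaining one can expire pages.
func retentionCheckSketch() {
	atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 3600) // keep acked pages for one hour
	atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)     // size-based retention disabled
	ackedTs := time.Now().Unix() - 7200                  // page acked two hours ago
	log.Debug("retention check sketch",
		zap.Bool("timeExpired", msgTimeExpiredCheck(ackedTs)),        // true: older than the one-hour window
		zap.Bool("sizeExpired", msgSizeExpiredCheck(10*MB, 1000*MB))) // false: the size check is switched off
}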

View File

@ -12,7 +12,6 @@
package rocksmq
import (
"math/rand"
"os"
"strconv"
"sync/atomic"
@ -28,43 +27,30 @@ import (
var retentionPath = "/tmp/rmq_retention/"
func TestMain(m *testing.M) {
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirALl error for path", zap.Any("path", retentionPath))
return
}
atomic.StoreInt64(&TickerTimeInSeconds, 6)
code := m.Run()
os.Exit(code)
}
func genRandonName() string {
len := 6
r := rand.New(rand.NewSource(time.Now().UnixNano()))
bytes := make([]byte, len)
for i := 0; i < len; i++ {
b := r.Intn(26) + 65
bytes[i] = byte(b)
// Test write data and wait for retention
func TestRmqRetention_Basic(t *testing.T) {
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirAll error for path", zap.Any("path", retentionPath))
return
}
return string(bytes)
}
func TestRmqRetention(t *testing.T) {
defer os.RemoveAll(retentionPath)
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
defer atomic.StoreInt64(&TickerTimeInSeconds, 6)
kvPath := retentionPath + kvPathSuffix
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := retentionPath + dbPathSuffix
defer os.RemoveAll(rocksdbPath)
metaPath := retentionPath + metaPathSuffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
rmq, err := NewRocksMQ(rocksdbPath, nil)
defer rmq.Close()
assert.Nil(t, err)
defer rmq.stopRetention()
@ -111,13 +97,147 @@ func TestRmqRetention(t *testing.T) {
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
//////////////////////////////////////////////////
// test valid value case
rmq.retentionInfo.topics.Store(topicName, "dummy")
// test that acked size, acked ts, and other meta are updated as expected
msgSizeKey := MessageSizeTitle + topicName
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
assert.NoError(t, err)
assert.Equal(t, msgSizeVal, "0")
pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
keys, values, err := rmq.kv.LoadWithPrefix(pageMsgSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
pageTsSizeKey := constructKey(PageTsTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
aclTsSizeKey := constructKey(AckedTsTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
}
// Messages that have not been acked should not be purged
func TestRmqRetention_NotConsumed(t *testing.T) {
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirAll error for path", zap.Any("path", retentionPath))
return
}
defer os.RemoveAll(retentionPath)
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
rocksdbPath := retentionPath + dbPathSuffix
defer os.RemoveAll(rocksdbPath)
metaPath := retentionPath + metaPathSuffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, nil)
defer rmq.Close()
assert.Nil(t, err)
defer rmq.stopRetention()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
msgNum := 100
pMsgs := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
assert.Nil(t, err)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < 5; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), 5)
id := cMsgs[0].MsgID
aclTsSizeKey := constructKey(AckedTsTitle, topicName)
keys, values, err := rmq.kv.LoadWithPrefix(aclTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 2)
assert.Equal(t, len(values), 2)
// wait for retention
checkTimeInterval := 2
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
// Seek to a previously consumed message; it should already have been cleaned up
err = rmq.Seek(topicName, groupName, cMsgs[1].MsgID)
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 1)
assert.Equal(t, newRes[0].MsgID, id+4)
// test that acked size, acked ts, and other meta are updated as expected
msgSizeKey := MessageSizeTitle + topicName
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
assert.NoError(t, err)
assert.Equal(t, msgSizeVal, "0")
// should only clean 2 pages
pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(pageMsgSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 48)
assert.Equal(t, len(values), 48)
pageTsSizeKey := constructKey(PageTsTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 48)
assert.Equal(t, len(values), 48)
aclTsSizeKey = constructKey(AckedTsTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
}
// Test multiple topics
func TestRmqRetention_MultipleTopic(t *testing.T) {
}
func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirALl error for path", zap.Any("path", retentionPath))
return
}
defer os.RemoveAll(retentionPath)
suffix := "init"
kvPath := retentionPath + kvPathSuffix + suffix
defer os.RemoveAll(kvPath)
@ -133,32 +253,24 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, rmq)
rmq.retentionInfo.kv.DB = nil
_, err = initRetentionInfo(rmq.retentionInfo.kv, rmq.retentionInfo.db)
assert.Error(t, err)
}
func TestRmqRetention_Complex(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 1)
atomic.StoreInt64(&RocksmqPageSize, 10)
kvPath := retentionPath + "kv_com"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := retentionPath + "db_com"
defer os.RemoveAll(rocksdbPath)
metaPath := retentionPath + "meta_kv_com"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
rmq.Close()
rmq, err = NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
assert.NotNil(t, rmq)
assert.Equal(t, rmq.isClosed(), false)
// write some data, restart and check.
topicName = "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
topicName = "topic_b"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
msgNum := 100
pMsgs := make([]ProducerMessage, msgNum)
@ -171,42 +283,22 @@ func TestRmqRetention_Complex(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
assert.Nil(t, err)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
checkTimeInterval := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10
time.Sleep(time.Duration(checkTimeInterval*2) * time.Second)
// Seek to a previous consumed message, the message should be clean up
log.Debug("cMsg", zap.Any("id", cMsgs[10].MsgID))
err = rmq.Seek(topicName, groupName, cMsgs[10].MsgID)
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
//TODO(yukun)
log.Debug("Consume result", zap.Any("result len", len(newRes)))
// assert.NotEqual(t, newRes[0].MsgID, cMsgs[11].MsgID)
rmq.Close()
}
func TestRmqRetention_PageTimeExpire(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirALl error for path", zap.Any("path", retentionPath))
return
}
defer os.RemoveAll(retentionPath)
// no retention by size
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
// retention by secs
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
kvPath := retentionPath + "kv_com1"
os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
@ -218,7 +310,7 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
defer rmq.Close()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
@ -236,6 +328,126 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
assert.NoError(t, err)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
assert.Equal(t, cMsgs[0].MsgID, ids[0])
time.Sleep(time.Duration(3) * time.Second)
// insert another 100 messages which should not be cleaned up
pMsgs2 := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i+100)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs2[i] = pMsg
}
ids2, err := rmq.Produce(topicName, pMsgs2)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs2), len(ids2))
assert.Nil(t, err)
cMsgs = make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
assert.Equal(t, cMsgs[0].MsgID, ids2[0])
time.Sleep(time.Duration(3) * time.Second)
err = rmq.Seek(topicName, groupName, ids[10])
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 1)
// points to the first message that has not been consumed
assert.Equal(t, newRes[0].MsgID, ids2[0])
// test that acked size, acked ts, and other meta are updated as expected
msgSizeKey := MessageSizeTitle + topicName
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
assert.NoError(t, err)
assert.Equal(t, msgSizeVal, "0")
// 100 pages left; each entry is its own page
pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
keys, values, err := rmq.kv.LoadWithPrefix(pageMsgSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 100)
assert.Equal(t, len(values), 100)
pageTsSizeKey := constructKey(PageTsTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 100)
assert.Equal(t, len(values), 100)
aclTsSizeKey := constructKey(AckedTsTitle, topicName)
keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 100)
assert.Equal(t, len(values), 100)
}
func TestRmqRetention_PageSizeExpire(t *testing.T) {
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirALl error for path", zap.Any("path", retentionPath))
return
}
defer os.RemoveAll(retentionPath)
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
kvPath := retentionPath + "kv_com2"
os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := retentionPath + "db_com2"
os.RemoveAll(rocksdbPath)
metaPath := retentionPath + "meta_kv_com2"
os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
// total payload needs to exceed 1 MB (RocksmqRetentionSizeInMB); 100000 short messages add up to roughly 1.2 MB
msgNum := 100000
pMsgs := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
@ -255,14 +467,12 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
}
assert.Equal(t, len(cMsgs), msgNum)
checkTimeInterval := 7
time.Sleep(time.Duration(checkTimeInterval) * time.Second)
// Seek to a previous consumed message, the message should be clean up
log.Debug("cMsg", zap.Any("id", cMsgs[10].MsgID))
err = rmq.Seek(topicName, groupName, cMsgs[len(cMsgs)/2].MsgID)
time.Sleep(time.Duration(2) * time.Second)
err = rmq.Seek(topicName, groupName, ids[0])
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
// assert.NotEqual(t, newRes[0].MsgID, cMsgs[11].MsgID)
assert.Equal(t, len(newRes), 1)
// make sure clean up happens
assert.True(t, newRes[0].MsgID > ids[0])
}

View File

@ -0,0 +1,15 @@
package typeutil
// AddOne increments the last byte of the input string (appending a zero byte when that byte is already 0xFF) to produce a string that sorts after it; an empty input is returned unchanged
func AddOne(data string) string {
if len(data) == 0 {
return data
}
var datab = []byte(data)
if datab[len(datab)-1] != 255 {
datab[len(datab)-1]++
} else {
datab = append(datab, byte(0))
}
return string(datab)
}
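
// A hedged usage sketch (not part of this patch): callers typically feed
// AddOne(prefix) to RocksDB as the exclusive upper bound of a prefix scan.
// The db handle and the key below are illustrative only.
//
//	readOpts := gorocksdb.NewDefaultReadOptions()
//	readOpts.SetIterateUpperBound([]byte(AddOne("topic_a/")))
//	iter := db.NewIterator(readOpts)
//	for iter.Seek([]byte("topic_a/")); iter.Valid(); iter.Next() {
//		// every visited key starts with "topic_a/"
//	}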

View File

@ -0,0 +1,29 @@
package typeutil
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestAddOne(t *testing.T) {
input := ""
output := AddOne(input)
assert.Equal(t, output, "")
input = "a"
output = AddOne(input)
assert.Equal(t, output, "b")
input = "aaa="
output = AddOne(input)
assert.Equal(t, output, "aaa>")
// test the case where the last byte is already 255, so a zero byte is appended
binary := []byte{1, 20, 255}
input = string(binary)
output = AddOne(input)
assert.Equal(t, len(output), 4)
resultb := []byte(output)
assert.Equal(t, resultb, []byte{1, 20, 255, 0})
}