Add rocksmq retention (#6617)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/7124/head
yukun 2021-08-16 18:46:10 +08:00 committed by GitHub
parent 671b2737d2
commit 847586eb95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1048 additions and 70 deletions

View File

@ -38,9 +38,9 @@ import (
"github.com/milvus-io/milvus/internal/util/trace"
)
func newMsgFactory(localMsg bool, rocksmqPath string) msgstream.Factory {
func newMsgFactory(localMsg bool) msgstream.Factory {
if localMsg {
return msgstream.NewRmsFactory(rocksmqPath)
return msgstream.NewRmsFactory()
}
return msgstream.NewPmsFactory()
}
@ -87,7 +87,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
defer log.Sync()
}
factory := newMsgFactory(localMsg, rootcoord.Params.RocksmqPath)
factory := newMsgFactory(localMsg)
var err error
rc, err = components.NewRootCoord(ctx, factory)
if err != nil {
@ -116,7 +116,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string
defer log.Sync()
}
factory := newMsgFactory(localMsg, proxy.Params.RocksmqPath)
factory := newMsgFactory(localMsg)
var err error
pn, err = components.NewProxy(ctx, factory)
if err != nil {
@ -144,9 +144,7 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
defer log.Sync()
}
// FIXME(yukun): newMsgFactory requires parameter rocksmqPath, but won't be used here
// so hardcode the path to /tmp/invalid_milvus_rdb
factory := newMsgFactory(localMsg, "/tmp/invalid_milvus_rdb")
factory := newMsgFactory(localMsg)
var err error
qs, err = components.NewQueryCoord(ctx, factory)
if err != nil {
@ -175,7 +173,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st
defer log.Sync()
}
factory := newMsgFactory(localMsg, querynode.Params.RocksmqPath)
factory := newMsgFactory(localMsg)
var err error
qn, err = components.NewQueryNode(ctx, factory)
if err != nil {
@ -203,7 +201,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
defer log.Sync()
}
factory := newMsgFactory(localMsg, datacoord.Params.RocksmqPath)
factory := newMsgFactory(localMsg)
var err error
ds, err = components.NewDataCoord(ctx, factory)
if err != nil {
@ -232,7 +230,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str
defer log.Sync()
}
factory := newMsgFactory(localMsg, datanode.Params.RocksmqPath)
factory := newMsgFactory(localMsg)
var err error
dn, err = components.NewDataNode(ctx, factory)
if err != nil {

View File

@ -36,6 +36,8 @@ pulsar:
rocksmq:
path: /var/lib/milvus/rdb_data
retentionTimeInMinutes: 4320
retentionSizeInMB: 0
rootCoord:
address: localhost

View File

@ -382,6 +382,9 @@ type GlobalParamsTable struct {
MasterAddress string
PulsarAddress string
RocksmqPath string
RocksmqRetentionTimeInMinutes int64
RocksmqRetentionSizeInMB int64
ProxyID UniqueID
TimeTickInterval time.Duration

View File

@ -16,10 +16,10 @@ import (
)
type RocksdbKV struct {
opts *gorocksdb.Options
db *gorocksdb.DB
writeOptions *gorocksdb.WriteOptions
readOptions *gorocksdb.ReadOptions
Opts *gorocksdb.Options
DB *gorocksdb.DB
WriteOptions *gorocksdb.WriteOptions
ReadOptions *gorocksdb.ReadOptions
name string
}
@ -39,16 +39,16 @@ func NewRocksdbKV(name string) (*RocksdbKV, error) {
return nil, err
}
return &RocksdbKV{
opts: opts,
db: db,
writeOptions: wo,
readOptions: ro,
Opts: opts,
DB: db,
WriteOptions: wo,
ReadOptions: ro,
name: name,
}, nil
}
func (kv *RocksdbKV) Close() {
kv.db.Close()
kv.DB.Close()
}
func (kv *RocksdbKV) GetName() string {
@ -56,22 +56,22 @@ func (kv *RocksdbKV) GetName() string {
}
func (kv *RocksdbKV) Load(key string) (string, error) {
value, err := kv.db.Get(kv.readOptions, []byte(key))
value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
defer value.Free()
return string(value.Data()), err
}
func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) {
kv.readOptions.SetPrefixSameAsStart(true)
kv.db.Close()
kv.opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(key)))
kv.ReadOptions.SetPrefixSameAsStart(true)
kv.DB.Close()
kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(key)))
var err error
kv.db, err = gorocksdb.OpenDb(kv.opts, kv.GetName())
kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName())
if err != nil {
return nil, nil, err
}
iter := kv.db.NewIterator(kv.readOptions)
iter := kv.DB.NewIterator(kv.ReadOptions)
defer iter.Close()
keys := make([]string, 0)
values := make([]string, 0)
@ -93,7 +93,7 @@ func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) {
func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) {
values := make([]string, 0, len(keys))
for _, key := range keys {
value, err := kv.db.Get(kv.readOptions, []byte(key))
value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
if err != nil {
return []string{}, err
}
@ -103,7 +103,7 @@ func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) {
}
func (kv *RocksdbKV) Save(key, value string) error {
err := kv.db.Put(kv.writeOptions, []byte(key), []byte(value))
err := kv.DB.Put(kv.WriteOptions, []byte(key), []byte(value))
return err
}
@ -113,26 +113,26 @@ func (kv *RocksdbKV) MultiSave(kvs map[string]string) error {
for k, v := range kvs {
writeBatch.Put([]byte(k), []byte(v))
}
err := kv.db.Write(kv.writeOptions, writeBatch)
err := kv.DB.Write(kv.WriteOptions, writeBatch)
return err
}
func (kv *RocksdbKV) RemoveWithPrefix(prefix string) error {
kv.readOptions.SetPrefixSameAsStart(true)
kv.db.Close()
kv.opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(prefix)))
kv.ReadOptions.SetPrefixSameAsStart(true)
kv.DB.Close()
kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(prefix)))
var err error
kv.db, err = gorocksdb.OpenDb(kv.opts, kv.GetName())
kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName())
if err != nil {
return err
}
iter := kv.db.NewIterator(kv.readOptions)
iter := kv.DB.NewIterator(kv.ReadOptions)
defer iter.Close()
iter.Seek([]byte(prefix))
for ; iter.Valid(); iter.Next() {
key := iter.Key()
err := kv.db.Delete(kv.writeOptions, key.Data())
err := kv.DB.Delete(kv.WriteOptions, key.Data())
if err != nil {
return nil
}
@ -144,7 +144,7 @@ func (kv *RocksdbKV) RemoveWithPrefix(prefix string) error {
}
func (kv *RocksdbKV) Remove(key string) error {
err := kv.db.Delete(kv.writeOptions, []byte(key))
err := kv.DB.Delete(kv.WriteOptions, []byte(key))
return err
}
@ -154,7 +154,7 @@ func (kv *RocksdbKV) MultiRemove(keys []string) error {
for _, key := range keys {
writeBatch.Delete([]byte(key))
}
err := kv.db.Write(kv.writeOptions, writeBatch)
err := kv.DB.Write(kv.WriteOptions, writeBatch)
return err
}
@ -167,13 +167,29 @@ func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []stri
for _, key := range removals {
writeBatch.Delete([]byte(key))
}
err := kv.db.Write(kv.writeOptions, writeBatch)
err := kv.DB.Write(kv.WriteOptions, writeBatch)
return err
}
// DeleteRange removes every key in the half-open range [startKey, endKey)
// with a single write batch. An empty startKey means "start from the first
// key currently in the database".
func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error {
	writeBatch := gorocksdb.NewWriteBatch()
	defer writeBatch.Clear()
	if len(startKey) == 0 {
		// Resolve the real first key so DeleteRange gets a valid lower bound.
		iter := kv.DB.NewIterator(kv.ReadOptions)
		defer iter.Close()
		iter.SeekToFirst()
		if !iter.Valid() {
			// Database is empty: calling Key() on an invalid iterator is
			// undefined, and there is nothing to delete anyway.
			return nil
		}
		key := iter.Key()
		startKey = string(key.Data())
		// Free the rocksdb slice immediately after copying its contents.
		key.Free()
	}
	writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
	return kv.DB.Write(kv.WriteOptions, writeBatch)
}
// MultiRemoveWithPrefix is part of the kv interface but is not supported by
// RocksdbKV; calling it always panics.
func (kv *RocksdbKV) MultiRemoveWithPrefix(keys []string) error {
	panic("not implement")
}
// MultiSaveAndRemoveWithPrefix is part of the kv interface but is not
// supported by RocksdbKV; calling it always panics.
func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
	panic("not implement")
}

View File

@ -12,6 +12,8 @@
package rocksdbkv_test
import (
"strconv"
"sync"
"testing"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
@ -103,3 +105,28 @@ func TestRocksdbKV_Prefix(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, val, "1234555")
}
// TestRocksdbKV_Goroutines verifies that concurrent Save/Load pairs on a
// single RocksdbKV instance round-trip correctly from multiple goroutines.
func TestRocksdbKV_Goroutines(t *testing.T) {
	name := "/tmp/rocksdb"
	// Named "kv" so the local variable does not shadow the rocksdbkv package.
	kv, err := rocksdbkv.NewRocksdbKV(name)
	assert.Nil(t, err)
	defer kv.Close()
	defer kv.RemoveWithPrefix("")

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := "key_" + strconv.Itoa(i)
			val := "val_" + strconv.Itoa(i)
			err := kv.Save(key, val)
			assert.Nil(t, err)
			getVal, err := kv.Load(key)
			assert.Nil(t, err)
			assert.Equal(t, getVal, val)
		}(i)
	}
	wg.Wait()
}

View File

@ -15,7 +15,6 @@ import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
rocksmqserver "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
@ -106,14 +105,13 @@ func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func NewRmsFactory(rocksmqPath string) Factory {
func NewRmsFactory() Factory {
f := &RmsFactory{
dispatcherFactory: ProtoUDFactory{},
ReceiveBufSize: 1024,
RmqBufSize: 1024,
}
log.Debug("RocksmqPath=" + rocksmqPath)
rocksmqserver.InitRocksMQ(rocksmqPath)
rocksmqserver.InitRocksMQ()
return f
}

View File

@ -1043,10 +1043,12 @@ func initRmq(name string) *etcdkv.EtcdKV {
}
// Close tears down the streams and kv used by a rocksmq-backed test and
// removes the on-disk rocksdb directories (the message store and its
// "_meta_kv" companion). Cleanup is best-effort; the RemoveAll result is
// only logged.
// (Fixes the "intputStream" typo in the parameter name; callers pass
// positionally, so the rename is safe.)
func Close(rocksdbName string, inputStream, outputStream MsgStream, etcdKV *etcdkv.EtcdKV) {
	rocksmq.CloseRocksMQ()
	inputStream.Close()
	outputStream.Close()
	etcdKV.Close()
	err := os.RemoveAll(rocksdbName)
	// The meta kv lives next to the store under the "_meta_kv" suffix.
	_ = os.RemoveAll(rocksdbName + "_meta_kv")
	log.Println(err)
}

View File

@ -29,14 +29,16 @@ type ParamTable struct {
Address string
Port int
PulsarAddress string
RocksmqPath string
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
MsgChannelSubName string
TimeTickChannel string
StatisticsChannel string
PulsarAddress string
RocksmqPath string
RocksmqRetentionSizeInMinutes int64
RocksmqRetentionSizeInMB int64
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
MsgChannelSubName string
TimeTickChannel string
StatisticsChannel string
MaxPartitionNum int64
DefaultPartitionName string
@ -99,6 +101,10 @@ func (p *ParamTable) initRocksmqPath() {
p.RocksmqPath = path
}
// initRocksmqRetentionTimeInMinutes loads the rocksmq retention limit into
// the param table.
// NOTE(review): the function name says "Time" but it assigns
// RocksmqRetentionSizeInMinutes and reads the config key
// "rootcoord.RocksmqRetentionSizeInMinutes", while the yaml config exposes
// "rocksmq.retentionTimeInMinutes". The field/key naming looks inconsistent —
// confirm which parameter this is meant to load.
func (p *ParamTable) initRocksmqRetentionTimeInMinutes() {
	p.RocksmqRetentionSizeInMinutes = p.ParseInt64("rootcoord.RocksmqRetentionSizeInMinutes")
}
func (p *ParamTable) initEtcdEndpoints() {
endpoints, err := p.Load("_EtcdEndpoints")
if err != nil {

View File

@ -27,8 +27,8 @@ var Params paramtable.BaseTable
func TestMain(m *testing.M) {
Params.Init()
rocksdbName := "/tmp/rocksdb_mqclient"
_ = rocksmq1.InitRocksMQ(rocksdbName)
os.Setenv("ROCKSMQ_PATH", "/tmp/milvus/rdb_data")
_ = rocksmq1.InitRocksMQ()
exitCode := m.Run()
defer rocksmq1.CloseRocksMQ()
os.Exit(exitCode)

View File

@ -31,4 +31,5 @@ func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) error
}
func (rp *rmqProducer) Close() {
}

View File

@ -147,7 +147,7 @@ func (c *client) Close() {
log.Debug("Close" + opt.Topic + "+" + opt.SubscriptionName)
_ = c.server.DestroyConsumerGroup(opt.Topic, opt.SubscriptionName)
//TODO(yukun): Should topic be closed?
//_ = c.server.DestroyTopic(opt.Topic)
_ = c.server.DestroyTopic(opt.Topic)
}
c.cancel()
}

View File

@ -63,4 +63,7 @@ type Consumer interface {
// Seek to the uniqueID position
Seek(UniqueID) error //nolint:govet
// Close consumer
Close()
}

View File

@ -11,6 +11,11 @@
package rocksmq
import (
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type consumer struct {
topic string
client *client
@ -101,3 +106,10 @@ func (c *consumer) Seek(id UniqueID) error { //nolint:govet
c.client.server.Notify(c.topic, c.consumerName)
return nil
}
// Close destroys this consumer's group on the rocksmq server. A failure is
// only logged at debug level, since there is nothing the caller can do
// about it.
func (c *consumer) Close() {
	if err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName); err != nil {
		log.Debug("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err))
	}
}

View File

@ -25,4 +25,7 @@ type Producer interface {
// publish a message
Send(message *ProducerMessage) error
// Close a producer
Close()
}

View File

@ -12,7 +12,9 @@
package rocksmq
import (
"github.com/milvus-io/milvus/internal/log"
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"go.uber.org/zap"
)
type producer struct {
@ -47,3 +49,10 @@ func (p *producer) Send(message *ProducerMessage) error {
},
})
}
// Close tears the producer down by destroying its topic on the rocksmq
// server. A failure is only logged at debug level and otherwise ignored.
func (p *producer) Close() {
	if err := p.c.server.DestroyTopic(p.topic); err != nil {
		log.Debug("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err))
	}
}

View File

@ -16,12 +16,16 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
)
var Rmq *rocksmq
var once sync.Once
var params paramtable.BaseTable
func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
var err error
@ -29,10 +33,13 @@ func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
return err
}
func InitRocksMQ(rocksdbName string) error {
func InitRocksMQ() error {
var err error
once.Do(func() {
_, err := os.Stat(rocksdbName)
params.Init()
rocksdbName, _ := params.Load("_RocksmqPath")
log.Debug("RocksmqPath=" + rocksdbName)
_, err = os.Stat(rocksdbName)
if os.IsNotExist(err) {
err = os.MkdirAll(rocksdbName, os.ModePerm)
if err != nil {
@ -49,6 +56,9 @@ func InitRocksMQ(rocksdbName string) error {
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
RocksmqRetentionTimeInMinutes = params.ParseInt64("rocksmq.retentionTimeInMinutes")
RocksmqRetentionSizeInMB = params.ParseInt64("rocksmq.retentionSizeInMB")
log.Debug("Rocksmq retention: ", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes), zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB))
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
if err != nil {
panic(err)
@ -58,7 +68,11 @@ func InitRocksMQ(rocksdbName string) error {
}
func CloseRocksMQ() {
if Rmq != nil && Rmq.store != nil {
Rmq.store.Close()
log.Debug("Close Rocksmq!")
if Rmq != nil {
Rmq.stopRetention()
if Rmq.store != nil {
Rmq.store.Close()
}
}
}

View File

@ -16,6 +16,7 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/kv"
@ -23,7 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/tecbot/gorocksdb"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
)
type UniqueID = typeutil.UniqueID
@ -32,6 +33,17 @@ const (
DefaultMessageID = "-1"
FixedChannelNameLen = 320
RocksDBLRUCacheCapacity = 3 << 30
RocksmqPageSize = 2 << 30
kvSuffix = "_meta_kv"
MessageSizeTitle = "message_size/"
PageMsgSizeTitle = "page_message_size/"
TopicBeginIDTitle = "topic_begin_id/"
BeginIDTitle = "begin_id/"
AckedTsTitle = "acked_ts/"
AckedSizeTitle = "acked_size/"
LastRetTsTitle = "last_retention_ts/"
)
/**
@ -63,13 +75,35 @@ func combKey(channelName string, id UniqueID) (string, error) {
return fixName + "/" + strconv.FormatInt(id, 10), nil
}
/**
* Construct table name and fixed channel name to be a key with length of FixedChannelNameLen,
* used for meta infos
*/
// constructKey pads metaName+topic with '*' characters up to
// FixedChannelNameLen so every meta key has the same fixed length (needed
// so suffixes like "/<id>" start at a known offset). Returns an error when
// the combined name already exceeds the limit.
func constructKey(metaName, topic string) (string, error) {
	base := metaName + topic
	if len(base) > FixedChannelNameLen {
		return "", errors.New("Topic name exceeds limit")
	}
	padding := make([]byte, FixedChannelNameLen-len(base))
	for i := range padding {
		padding[i] = byte('*')
	}
	return base + string(padding), nil
}
var topicMu sync.Map = sync.Map{}
type rocksmq struct {
store *gorocksdb.DB
kv kv.BaseKV
idAllocator allocator.GIDAllocator
channelMu sync.Map
consumers sync.Map
ackedMu sync.Map
consumers sync.Map
retentionInfo *retentionInfo
}
func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
@ -85,18 +119,38 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
return nil, err
}
mkv := memkv.NewMemoryKV()
kvName := name + kvSuffix
kv, err := rocksdbkv.NewRocksdbKV(kvName)
if err != nil {
return nil, err
}
rmq := &rocksmq{
store: db,
kv: mkv,
kv: kv,
idAllocator: idAllocator,
consumers: sync.Map{},
ackedMu: sync.Map{},
}
rmq.channelMu = sync.Map{}
rmq.consumers = sync.Map{}
ri, err := initRetentionInfo(kv, db)
if err != nil {
return nil, err
}
rmq.retentionInfo = ri
err = rmq.retentionInfo.startRetentionInfo()
if err != nil {
return nil, err
}
return rmq, nil
}
// stopRetention is intended to stop the background retention goroutine.
// NOTE(review): ctx.Done() merely returns the cancellation channel — it does
// not cancel the context — so this call is effectively a no-op and the
// retention goroutine keeps running. retentionInfo would need to store a
// context.CancelFunc (from context.WithCancel) for this to actually stop it.
func (rmq *rocksmq) stopRetention() {
	rmq.retentionInfo.ctx.Done()
}
func (rmq *rocksmq) checkKeyExist(key string) bool {
val, _ := rmq.kv.Load(key)
return val != ""
@ -123,12 +177,56 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
log.Debug("RocksMQ: save " + endKey + " failed.")
return err
}
rmq.channelMu.Store(topicName, new(sync.Mutex))
if _, ok := topicMu.Load(topicName); !ok {
topicMu.Store(topicName, new(sync.Mutex))
}
if _, ok := rmq.ackedMu.Load(topicName); !ok {
rmq.ackedMu.Store(topicName, new(sync.Mutex))
}
// Initialize retention infos
// Initialize acked size to 0 for topic
ackedSizeKey := AckedSizeTitle + topicName
err = rmq.kv.Save(ackedSizeKey, "0")
if err != nil {
return err
}
// Initialize topic begin id to defaultMessageID
topicBeginIDKey := TopicBeginIDTitle + topicName
err = rmq.kv.Save(topicBeginIDKey, DefaultMessageID)
if err != nil {
return err
}
// Initialize topic message size to 0
msgSizeKey := MessageSizeTitle + topicName
err = rmq.kv.Save(msgSizeKey, "0")
if err != nil {
return err
}
// Initialize last retention timestamp to time_now
lastRetentionTsKey := LastRetTsTitle + topicName
timeNow := time.Now().Unix()
err = rmq.kv.Save(lastRetentionTsKey, strconv.FormatInt(timeNow, 10))
if err != nil {
return nil
}
rmq.retentionInfo.topics = append(rmq.retentionInfo.topics, topicName)
rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{
pageEndID: make([]UniqueID, 0),
pageMsgSize: map[UniqueID]int64{},
})
rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow)
rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{
ackedTs: map[UniqueID]int64{},
})
return nil
}
func (rmq *rocksmq) DestroyTopic(topicName string) error {
log.Debug("In DestroyTopic")
beginKey := topicName + "/begin_id"
endKey := topicName + "/end_id"
@ -145,13 +243,50 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
}
rmq.consumers.Delete(topicName)
log.Debug("DestroyTopic: " + topicName)
ackedSizeKey := AckedSizeTitle + topicName
err = rmq.kv.Remove(ackedSizeKey)
if err != nil {
return err
}
topicBeginIDKey := TopicBeginIDTitle + topicName
err = rmq.kv.Remove(topicBeginIDKey)
if err != nil {
return err
}
lastRetTsKey := LastRetTsTitle + topicName
err = rmq.kv.Remove(lastRetTsKey)
if err != nil {
return err
}
msgSizeKey := MessageSizeTitle + topicName
err = rmq.kv.Remove(msgSizeKey)
if err != nil {
return err
}
topicMu.Delete(topicName)
rmq.retentionInfo.ackedInfo.Delete(topicName)
rmq.retentionInfo.lastRetentionTime.Delete(topicName)
rmq.retentionInfo.pageInfo.Delete(topicName)
return nil
}
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) {
key := groupName + "/" + topicName + "/current_id"
// keyExist := false
// if ll, ok := topicMu.Load(topicName); !ok {
// keyExist = rmq.checkKeyExist(key)
// } else {
// if lock, lok := ll.(*sync.Mutex); lok {
// lock.Lock()
// defer lock.Unlock()
// keyExist = rmq.checkKeyExist(key)
// } else {
// keyExist = rmq.checkKeyExist(key)
// }
// }
if rmq.checkKeyExist(key) {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
@ -216,13 +351,11 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
}
}
log.Debug("DestroyConsumerGroup: " + topicName + "+" + groupName)
return nil
}
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error {
ll, ok := rmq.channelMu.Load(topicName)
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
}
@ -248,6 +381,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
/* Step I: Insert data to store system */
batch := gorocksdb.NewWriteBatch()
msgSizes := make(map[UniqueID]int64)
for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
key, err := combKey(topicName, idStart+UniqueID(i))
if err != nil {
@ -256,6 +390,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
}
batch.Put([]byte(key), messages[i].Payload)
msgSizes[idStart+UniqueID(i)] = int64(len(messages[i].Payload))
}
err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch)
@ -299,11 +434,68 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
}
}
}
// Update message page info
// TODO(yukun): Should this be in a go routine
err = rmq.UpdatePageInfo(topicName, msgSizes)
if err != nil {
return err
}
return nil
}
// UpdatePageInfo folds the sizes of newly produced messages (msgSizes maps
// message id -> payload size) into the topic's page accounting. The running
// "message_size/<topic>" counter accumulates until adding a message would
// exceed RocksmqPageSize; that message's id then becomes a page end id, the
// page's total size is persisted under "page_message_size/…", the in-memory
// retention page info is extended, and the counter resets to 0.
// NOTE(review): msgSizes is iterated in Go's randomized map order, so when a
// batch spans a page boundary the chosen page end id is nondeterministic —
// confirm whether iteration should be in ascending id order.
func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64) error {
	// Current running size of the open page for this topic.
	msgSizeKey := MessageSizeTitle + topicName
	msgSizeVal, err := rmq.kv.Load(msgSizeKey)
	if err != nil {
		return err
	}
	curMsgSize, err := strconv.ParseInt(msgSizeVal, 10, 64)
	if err != nil {
		return err
	}
	fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topicName)
	if err != nil {
		return err
	}
	for k, v := range msgSizes {
		if curMsgSize+v > RocksmqPageSize {
			// Current page is full
			newPageSize := curMsgSize + v
			pageEndID := k
			// Update page message size for current page. key is page end ID
			pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
			err := rmq.kv.Save(pageMsgSizeKey, strconv.FormatInt(newPageSize, 10))
			if err != nil {
				return err
			}
			// Keep the in-memory retention page info in sync with the kv store.
			if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok {
				pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID)
				pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize
				rmq.retentionInfo.pageInfo.Store(topicName, pageInfo)
			}
			// Update message size to 0
			err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(0, 10))
			if err != nil {
				return err
			}
			curMsgSize = 0
		} else {
			curMsgSize += v
			// Update message size to current message size
			err := rmq.kv.Save(msgSizeKey, strconv.FormatInt(curMsgSize, 10))
			if err != nil {
				return err
			}
		}
	}
	return nil
}
func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
ll, ok := rmq.channelMu.Load(topicName)
ll, ok := topicMu.Load(topicName)
if !ok {
return nil, fmt.Errorf("topic name = %s not exist", topicName)
}
@ -372,7 +564,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
// When already consume to last mes, an empty slice will be returned
if len(consumerMessage) == 0 {
//log.Debug("RocksMQ: consumerMessage is empty")
// log.Debug("RocksMQ: consumerMessage is empty")
return consumerMessage, nil
}
@ -383,6 +575,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
return nil, err
}
msgSize := len(consumerMessage[len(consumerMessage)-1].Payload)
go rmq.UpdateAckedInfo(topicName, groupName, newID, int64(msgSize))
return consumerMessage, nil
}
@ -431,3 +626,86 @@ func (rmq *rocksmq) Notify(topicName, groupName string) {
}
}
}
// UpdateAckedInfo records that consumer group groupName on topicName has
// consumed up to newID (msgSize is the payload size of the last consumed
// message) and refreshes the retention bookkeeping that depends on it: the
// group's begin_id, the topic-wide begin id, the acked timestamp of that id
// and — when the topic begin id reached newID — the persisted acked size.
// It is launched from Consume as a goroutine whose error result is ignored.
func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error {
	// Serialize acked-info updates per topic via the topic's ackedMu mutex.
	ll, ok := rmq.ackedMu.Load(topicName)
	if !ok {
		return fmt.Errorf("topic name = %s not exist", topicName)
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return fmt.Errorf("get mutex failed, topic name = %s", topicName)
	}
	lock.Lock()
	defer lock.Unlock()

	fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName)
	if err != nil {
		return err
	}
	// Update begin_id for the consumer_group
	beginIDKey := fixedBeginIDKey + "/" + groupName
	err = rmq.kv.Save(beginIDKey, strconv.FormatInt(newID, 10))
	if err != nil {
		return err
	}

	// Update begin_id for topic
	if vals, ok := rmq.consumers.Load(topicName); ok {
		// NOTE(review): despite its name, this loop computes the MAXIMUM
		// begin id across all consumer groups (curBeginID > minBeginID). A
		// retention lower bound would normally use the minimum (the slowest
		// consumer) — confirm whether taking the max is intentional.
		var minBeginID int64 = -1
		for _, v := range vals.([]*Consumer) {
			curBeginIDKey := fixedBeginIDKey + "/" + v.GroupName
			curBeginIDVal, err := rmq.kv.Load(curBeginIDKey)
			if err != nil {
				return err
			}
			curBeginID, err := strconv.ParseInt(curBeginIDVal, 10, 64)
			if err != nil {
				return err
			}
			if curBeginID > minBeginID {
				minBeginID = curBeginID
			}
		}
		topicBeginIDKey := TopicBeginIDTitle + topicName
		err = rmq.kv.Save(topicBeginIDKey, strconv.FormatInt(minBeginID, 10))
		if err != nil {
			return err
		}

		// Update acked info for msg of begin id
		fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName)
		if err != nil {
			return err
		}
		// Stamp the begin id with the current wall-clock time; retention later
		// compares these timestamps against the retention window.
		ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(minBeginID, 10)
		ts := time.Now().Unix()
		err = rmq.kv.Save(ackedTsKey, strconv.FormatInt(ts, 10))
		if err != nil {
			return err
		}

		if minBeginID == newID {
			// Means the begin_id of topic update to newID, so needs to update acked size
			ackedSizeKey := AckedSizeTitle + topicName
			ackedSizeVal, err := rmq.kv.Load(ackedSizeKey)
			if err != nil {
				return err
			}
			ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64)
			if err != nil {
				return err
			}
			ackedSize += msgSize
			err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10))
			if err != nil {
				return err
			}
			// Mirror the persisted acked size into the in-memory retention info.
			if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
				ackedInfo := info.(*topicAckedInfo)
				ackedInfo.ackedSize = ackedSize
				rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
			}
		}
	}
	return nil
}

View File

@ -53,6 +53,9 @@ func TestRocksMQ(t *testing.T) {
name := "/tmp/rocksmq"
_ = os.RemoveAll(name)
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
@ -93,6 +96,7 @@ func TestRocksMQ(t *testing.T) {
assert.Equal(t, len(cMsgs), 2)
assert.Equal(t, string(cMsgs[0].Payload), "b_message")
assert.Equal(t, string(cMsgs[1].Payload), "c_message")
rmq.stopRetention()
}
func TestRocksMQ_Loop(t *testing.T) {
@ -106,6 +110,9 @@ func TestRocksMQ_Loop(t *testing.T) {
name := "/tmp/rocksmq_1"
_ = os.RemoveAll(name)
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
@ -157,6 +164,7 @@ func TestRocksMQ_Loop(t *testing.T) {
cMsgs, err = rmq.Consume(channelName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 0)
rmq.stopRetention()
}
func TestRocksMQ_Goroutines(t *testing.T) {
@ -169,6 +177,9 @@ func TestRocksMQ_Goroutines(t *testing.T) {
name := "/tmp/rocksmq_2"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
@ -214,6 +225,7 @@ func TestRocksMQ_Goroutines(t *testing.T) {
}(&wg, rmq)
}
wg.Wait()
rmq.stopRetention()
}
/**
@ -236,6 +248,9 @@ func TestRocksMQ_Throughout(t *testing.T) {
name := "/tmp/rocksmq_3"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
@ -274,6 +289,7 @@ func TestRocksMQ_Throughout(t *testing.T) {
ct1 := time.Now().UnixNano() / int64(time.Millisecond)
cDuration := ct1 - ct0
log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration)
rmq.stopRetention()
}
func TestRocksMQ_MultiChan(t *testing.T) {
@ -286,6 +302,9 @@ func TestRocksMQ_MultiChan(t *testing.T) {
name := "/tmp/rocksmq_multichan"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
@ -319,4 +338,5 @@ func TestRocksMQ_MultiChan(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0))
rmq.stopRetention()
}

View File

@ -0,0 +1,495 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rocksmq
import (
"context"
"fmt"
"strconv"
"sync"
"time"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"
)
var RocksmqRetentionTimeInMinutes int64
var RocksmqRetentionSizeInMB int64
var TickerTimeInMinutes int64 = 1
var CheckTimeInterval int64 = 6
const (
MB = 2 << 20
MINUTE = 60
)
type topicPageInfo struct {
pageEndID []UniqueID
pageMsgSize map[UniqueID]int64
}
type topicAckedInfo struct {
topicBeginID UniqueID
// TODO(yukun): may need to delete ackedTs
ackedTs map[UniqueID]UniqueID
ackedSize int64
}
type retentionInfo struct {
ctx context.Context
topics []string
// pageInfo map[string]*topicPageInfo
pageInfo sync.Map
// ackedInfo map[string]*topicAckedInfo
ackedInfo sync.Map
// Key is last_retention_time/${topic}
// lastRetentionTime map[string]int64
lastRetentionTime sync.Map
kv *rocksdbkv.RocksdbKV
db *gorocksdb.DB
}
// Interface LoadWithPrefix() in rocksdbkv needs to close db instance first and then reopen,
// which will cause crash when other goroutines operate the db instance. So here implement a
// prefixLoad without reopen db instance.
// prefixLoad scans db for every key beginning with prefix and returns the
// matched keys and values as parallel string slices. Unlike
// RocksdbKV.LoadWithPrefix it does not close and reopen the db, so it is
// safe to call while other goroutines use the same instance.
func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
	readOpts := gorocksdb.NewDefaultReadOptions()
	defer readOpts.Destroy()
	readOpts.SetPrefixSameAsStart(true)
	iter := db.NewIterator(readOpts)
	defer iter.Close()
	keys := make([]string, 0)
	values := make([]string, 0)
	for iter.Seek([]byte(prefix)); iter.Valid(); iter.Next() {
		key := iter.Key()
		value := iter.Value()
		// Copy the data out, then free the rocksdb slices immediately.
		// (The original deferred Free inside the loop, which pinned every
		// slice in memory until the function returned.)
		keys = append(keys, string(key.Data()))
		values = append(values, string(value.Data()))
		key.Free()
		value.Free()
	}
	if err := iter.Err(); err != nil {
		return nil, nil, err
	}
	return keys, values, nil
}
// initRetentionInfo builds a retentionInfo backed by the given meta kv store
// and message db. It recovers the set of known topics from the persisted
// "topic_begin_id/" keys and registers a per-topic mutex in topicMu for each.
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
	info := &retentionInfo{
		ctx:               context.Background(),
		topics:            make([]string, 0),
		pageInfo:          sync.Map{},
		ackedInfo:         sync.Map{},
		lastRetentionTime: sync.Map{},
		kv:                kv,
		db:                db,
	}
	// Every topic has a "topic_begin_id/<topic>" entry; rebuild the topic
	// list from those keys.
	beginKeys, _, err := info.kv.LoadWithPrefix(TopicBeginIDTitle)
	if err != nil {
		return nil, err
	}
	for _, k := range beginKeys {
		topicName := k[len(TopicBeginIDTitle):]
		info.topics = append(info.topics, topicName)
		topicMu.Store(topicName, new(sync.Mutex))
	}
	return info, nil
}
// startRetentionInfo loads the persisted retention state of every known
// topic concurrently, waits for all loads to finish, then launches the
// background retention goroutine.
func (ri *retentionInfo) startRetentionInfo() error {
	var loaders sync.WaitGroup
	loaders.Add(len(ri.topics))
	for _, topic := range ri.topics {
		// Load all page infos
		go ri.loadRetentionInfo(topic, &loaders)
	}
	loaders.Wait()
	go ri.retention()
	return nil
}
// loadRetentionInfo restores one topic's persisted retention state from the
// meta kv store into ri's in-memory maps: page info (page end ids and
// per-page sizes), acked info (topic begin id, per-id acked timestamps,
// acked size) and the last retention timestamp. It runs as one goroutine per
// topic from startRetentionInfo; on any error the load for this topic is
// aborted, leaving ri without an entry for it.
func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
	// TODO(yukun): If there needs to add lock
	// ll, ok := topicMu.Load(topic)
	// if !ok {
	// 	return fmt.Errorf("topic name = %s not exist", topic)
	// }
	// lock, ok := ll.(*sync.Mutex)
	// if !ok {
	// 	return fmt.Errorf("get mutex failed, topic name = %s", topic)
	// }
	// lock.Lock()
	// defer lock.Unlock()
	defer wg.Done()

	// ---- Page info: every "page_message_size/<topic>…/<endID>" entry ----
	pageEndID := make([]UniqueID, 0)
	pageMsgSize := make(map[int64]UniqueID)
	fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic)
	if err != nil {
		log.Debug("ConstructKey failed", zap.Any("error", err))
		return
	}
	pageMsgSizePrefix := fixedPageSizeKey + "/"
	// prefixLoad instead of kv.LoadWithPrefix: the latter closes and reopens
	// the db, which is unsafe while other goroutines use it.
	pageMsgSizeKeys, pageMsgSizeVals, err := prefixLoad(ri.kv.DB, pageMsgSizePrefix)
	if err != nil {
		log.Debug("PrefixLoad failed", zap.Any("error", err))
		return
	}
	for i, key := range pageMsgSizeKeys {
		// The page end id follows the fixed-length key prefix and the '/'.
		endID, err := strconv.ParseInt(key[FixedChannelNameLen+1:], 10, 64)
		if err != nil {
			log.Debug("ParseInt failed", zap.Any("error", err))
			return
		}
		pageEndID = append(pageEndID, endID)

		msgSize, err := strconv.ParseInt(pageMsgSizeVals[i], 10, 64)
		if err != nil {
			log.Debug("ParseInt failed", zap.Any("error", err))
			return
		}
		pageMsgSize[endID] = msgSize
	}
	topicPageInfo := &topicPageInfo{
		pageEndID:   pageEndID,
		pageMsgSize: pageMsgSize,
	}

	// ---- Acked info: topic begin id plus per-id acked timestamps ----
	// Load all acked infos
	ackedTs := make(map[UniqueID]UniqueID)
	topicBeginIDKey := TopicBeginIDTitle + topic
	topicBeginIDVal, err := ri.kv.Load(topicBeginIDKey)
	if err != nil {
		// NOTE(review): error dropped without logging here — confirm intended.
		return
	}
	topicBeginID, err := strconv.ParseInt(topicBeginIDVal, 10, 64)
	if err != nil {
		log.Debug("ParseInt failed", zap.Any("error", err))
		return
	}
	ackedTsPrefix, err := constructKey(AckedTsTitle, topic)
	if err != nil {
		log.Debug("ConstructKey failed", zap.Any("error", err))
		return
	}
	keys, vals, err := prefixLoad(ri.kv.DB, ackedTsPrefix)
	if err != nil {
		log.Debug("PrefixLoad failed", zap.Any("error", err))
		return
	}
	if len(keys) != len(vals) {
		log.Debug("LoadWithPrefix return unequal value length of keys and values")
		return
	}
	for i, key := range keys {
		// Acked message id follows the fixed-length key prefix and the '/'.
		offset := FixedChannelNameLen + 1
		ackedID, err := strconv.ParseInt((key)[offset:], 10, 64)
		if err != nil {
			log.Debug("RocksMQ: parse int " + key[offset:] + " failed")
			return
		}
		ts, err := strconv.ParseInt(vals[i], 10, 64)
		if err != nil {
			// NOTE(review): error dropped without logging here — confirm intended.
			return
		}
		ackedTs[ackedID] = ts
	}
	ackedSizeKey := AckedSizeTitle + topic
	ackedSizeVal, err := ri.kv.Load(ackedSizeKey)
	if err != nil {
		log.Debug("Load failed", zap.Any("error", err))
		return
	}
	ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64)
	if err != nil {
		log.Debug("PrefixLoad failed", zap.Any("error", err))
		return
	}
	ackedInfo := &topicAckedInfo{
		topicBeginID: topicBeginID,
		ackedTs:      ackedTs,
		ackedSize:    ackedSize,
	}

	// Load last retention timestamp
	lastRetentionTsKey := LastRetTsTitle + topic
	lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey)
	if err != nil {
		log.Debug("Load failed", zap.Any("error", err))
		return
	}
	lastRetentionTs, err := strconv.ParseInt(lastRetentionTsVal, 10, 64)
	if err != nil {
		log.Debug("ParseInt failed", zap.Any("error", err))
		return
	}

	// Publish everything at once so a partially-loaded topic never appears.
	ri.ackedInfo.Store(topic, ackedInfo)
	ri.pageInfo.Store(topic, topicPageInfo)
	ri.lastRetentionTime.Store(topic, lastRetentionTs)
}
// retention runs the periodic retention loop. On every tick it scans all
// topics and triggers expiredCleanUp for those whose last retention run is
// older than the check interval. It returns only when ri.ctx is cancelled.
func (ri *retentionInfo) retention() error {
	log.Debug("Rocksmq retention goroutine start!")
	// Tick at 1/10 of the configured ticker period.
	ticker := time.NewTicker(time.Duration(TickerTimeInMinutes * int64(time.Minute) / 10))
	// Fix: the ticker was never stopped; release it when the loop exits.
	defer ticker.Stop()
	for {
		select {
		case <-ri.ctx.Done():
			return nil
		case t := <-ticker.C:
			timeNow := t.Unix()
			checkTime := RocksmqRetentionTimeInMinutes * 60 / 10
			log.Debug("In ticker: ", zap.Any("ticker", timeNow))
			ri.lastRetentionTime.Range(func(k, v interface{}) bool {
				if v.(int64)+checkTime < timeNow {
					// NOTE(review): a cleanup failure currently takes the whole
					// process down; consider logging instead of panicking.
					if err := ri.expiredCleanUp(k.(string)); err != nil {
						panic(err)
					}
				}
				return true
			})
		}
	}
}
// expiredCleanUp purges a topic's expired messages together with their
// page-size and acked-ts bookkeeping in rocksdb, then refreshes the in-memory
// acked/page info and the last retention timestamp. A page is dropped
// wholesale when its last message exceeds the time-based retention window;
// further pages are dropped while the remaining acked size still exceeds the
// size-based cap. Runs under the per-topic mutex.
func (ri *retentionInfo) expiredCleanUp(topic string) error {
	var ackedInfo *topicAckedInfo
	if info, ok := ri.ackedInfo.Load(topic); ok {
		ackedInfo = info.(*topicAckedInfo)
	} else {
		log.Debug("Topic " + topic + " doesn't have acked infos")
		return nil
	}
	ll, ok := topicMu.Load(topic)
	if !ok {
		return fmt.Errorf("topic name = %s not exist", topic)
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return fmt.Errorf("get mutex failed, topic name = %s", topic)
	}
	lock.Lock()
	defer lock.Unlock()
	readOpts := gorocksdb.NewDefaultReadOptions()
	defer readOpts.Destroy()
	readOpts.SetPrefixSameAsStart(true)
	iter := ri.kv.DB.NewIterator(readOpts)
	defer iter.Close()
	// Fix: write options wrap C-allocated memory and used to be created inline
	// at each Write call and leaked; create them once and destroy on exit.
	writeOpts := gorocksdb.NewDefaultWriteOptions()
	defer writeOpts.Destroy()
	// The acked-ts key prefix is loop-invariant; build it once up front.
	fixedAckedTsKey, err := constructKey(AckedTsTitle, topic)
	if err != nil {
		return err
	}
	iter.Seek([]byte(fixedAckedTsKey))
	if !iter.Valid() {
		// No acked entries at all: nothing to clean.
		return nil
	}
	var startID UniqueID
	var endID UniqueID
	endID = 0
	startID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
	if err != nil {
		return err
	}
	var deletedAckedSize int64 = 0
	pageRetentionOffset := 0
	var pageInfo *topicPageInfo
	if info, ok := ri.pageInfo.Load(topic); ok {
		pageInfo = info.(*topicPageInfo)
	}
	if pageInfo != nil {
		for i, pageEndID := range pageInfo.pageEndID {
			// Clean by RocksmqRetentionTimeInMinutes.
			if msgTimeExpiredCheck(ackedInfo.ackedTs[pageEndID]) {
				// The whole page expired: advance endID past it and move the
				// iterator to the page boundary.
				endID = pageEndID
				newKey := fixedAckedTsKey + "/" + strconv.Itoa(int(pageEndID))
				iter.Seek([]byte(newKey))
				pageRetentionOffset = i + 1
				deletedAckedSize += pageInfo.pageMsgSize[pageEndID]
				delete(pageInfo.pageMsgSize, pageEndID)
			}
		}
	}
	log.Debug("In expiredCleanUp: ", zap.Any("topic", topic), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
	pageEndID := endID
	// The end msg of the page is not expired; find the last expired msg in
	// this page.
	for ; iter.Valid(); iter.Next() {
		ackedTs, err := strconv.ParseInt(string(iter.Value().Data()), 10, 64)
		if err != nil {
			return err
		}
		if !msgTimeExpiredCheck(ackedTs) {
			break
		}
		endID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
		if err != nil {
			return err
		}
	}
	if endID == 0 {
		log.Debug("All messages are not expired")
		return nil
	}
	// Delete page message size entries in rocksdb_kv.
	if pageInfo != nil {
		// Judge expiry by accumulated acked size.
		if msgSizeExpiredCheck(deletedAckedSize, ackedInfo.ackedSize) {
			// Fix: the slice expression was pageEndID[pageRetentionOffset:0],
			// which is always empty (or panics for a non-zero offset), so
			// size-based retention never actually ran.
			for _, pEndID := range pageInfo.pageEndID[pageRetentionOffset:] {
				curDeletedSize := deletedAckedSize + pageInfo.pageMsgSize[pEndID]
				if !msgSizeExpiredCheck(curDeletedSize, ackedInfo.ackedSize) {
					break
				}
				endID = pEndID
				pageEndID = pEndID
				deletedAckedSize = curDeletedSize
				delete(pageInfo.pageMsgSize, pEndID)
			}
		}
		if pageEndID > 0 && len(pageInfo.pageEndID) > 0 {
			pageStartID := pageInfo.pageEndID[0]
			fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic)
			if err != nil {
				return err
			}
			pageStartKey := fixedPageSizeKey + "/" + strconv.Itoa(int(pageStartID))
			pageEndKey := fixedPageSizeKey + "/" + strconv.Itoa(int(pageEndID))
			pageWriteBatch := gorocksdb.NewWriteBatch()
			// Fix: Destroy (not just Clear) releases the underlying C object.
			defer pageWriteBatch.Destroy()
			log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID))
			if pageStartID == pageEndID {
				pageWriteBatch.Delete([]byte(pageStartKey))
			} else {
				// NOTE(review): DeleteRange excludes pageEndKey itself — confirm
				// keeping the end page's entry is intended.
				pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey))
			}
			// Fix: the Write error used to be silently discarded.
			if err := ri.kv.DB.Write(writeOpts, pageWriteBatch); err != nil {
				return err
			}
			pageInfo.pageEndID = pageInfo.pageEndID[pageRetentionOffset:]
		}
		ri.pageInfo.Store(topic, pageInfo)
	}
	// Delete acked_ts entries in rocksdb_kv.
	ackedStartIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(startID))
	ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(endID))
	ackedTsWriteBatch := gorocksdb.NewWriteBatch()
	defer ackedTsWriteBatch.Destroy()
	if startID == endID {
		ackedTsWriteBatch.Delete([]byte(ackedStartIDKey))
	} else {
		ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
	}
	// Fix: the Write error used to be silently discarded.
	if err := ri.kv.DB.Write(writeOpts, ackedTsWriteBatch); err != nil {
		return err
	}
	// Update last retention ts.
	lastRetentionTsKey := LastRetTsTitle + topic
	err = ri.kv.Save(lastRetentionTsKey, strconv.FormatInt(time.Now().Unix(), 10))
	if err != nil {
		return err
	}
	// Update acked_size in rocksdb_kv and in memory.
	ackedInfo.ackedSize -= deletedAckedSize
	ackedSizeKey := AckedSizeTitle + topic
	err = ri.kv.Save(ackedSizeKey, strconv.FormatInt(ackedInfo.ackedSize, 10))
	if err != nil {
		return err
	}
	for k := range ackedInfo.ackedTs {
		if k < endID {
			delete(ackedInfo.ackedTs, k)
		}
	}
	ri.ackedInfo.Store(topic, ackedInfo)
	// Finally drop the expired messages themselves.
	return DeleteMessages(ri.db, topic, startID, endID)
}
// DeleteMessages deletes the messages of the given topic whose ids fall in
// [startID, endID) from db. When startID == endID only that single message is
// removed; otherwise DeleteRange is used, which excludes the endID key itself.
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
	// Delete msgs by the range [startID, endID).
	startKey, err := combKey(topic, startID)
	if err != nil {
		log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(startID, 10) + ")")
		return err
	}
	endKey, err := combKey(topic, endID)
	if err != nil {
		log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(endID, 10) + ")")
		return err
	}
	writeBatch := gorocksdb.NewWriteBatch()
	// Fix: Destroy (not just Clear) releases the underlying C object.
	defer writeBatch.Destroy()
	log.Debug("Delete messages by range", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID))
	if startID == endID {
		writeBatch.Delete([]byte(startKey))
	} else {
		writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
	}
	// Fix: the write options wrap C-allocated memory and were leaked before;
	// destroy them once the write is done.
	writeOpts := gorocksdb.NewDefaultWriteOptions()
	defer writeOpts.Destroy()
	err = db.Write(writeOpts, writeBatch)
	if err != nil {
		return err
	}
	log.Debug("Delete message for topic: "+topic, zap.Any("startID", startID), zap.Any("endID", endID))
	return nil
}
// msgTimeExpiredCheck reports whether a message acked at ackedTs (unix
// seconds) has outlived the configured time-based retention window.
func msgTimeExpiredCheck(ackedTs int64) bool {
	expireAt := ackedTs + RocksmqRetentionTimeInMinutes*MINUTE
	return expireAt < time.Now().Unix()
}
// msgSizeExpiredCheck reports whether the acked data left over after the
// pending deletion still exceeds the configured size-based retention cap.
func msgSizeExpiredCheck(deletedAckedSize, ackedSize int64) bool {
	remaining := ackedSize - deletedAckedSize
	return remaining > RocksmqRetentionSizeInMB*MB
}

View File

@ -0,0 +1,91 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rocksmq
import (
"os"
"strconv"
"testing"
"time"
"github.com/milvus-io/milvus/internal/allocator"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/stretchr/testify/assert"
)
// InitIDAllocator creates a rocksdb-backed global id allocator rooted at
// kvPath. It is a test helper and panics on any setup failure.
func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
	rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
	if err != nil {
		panic(err)
	}
	idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
	// Fix: the Initialize error was silently discarded (`_ =`), inconsistent
	// with the panic-on-error handling just above; fail loudly instead.
	if err = idAllocator.Initialize(); err != nil {
		panic(err)
	}
	return idAllocator
}
// TestRmqRetention produces and consumes a batch of messages, waits past the
// retention check interval, then seeks back to an already-consumed position
// and verifies the expired messages were cleaned up (the subsequent Consume
// returns nothing).
func TestRmqRetention(t *testing.T) {
	kvPath := "/tmp/rocksmq_idAllocator_kv"
	defer os.RemoveAll(kvPath)
	idAllocator := InitIDAllocator(kvPath)

	rocksdbPath := "/tmp/rocksmq_test"
	defer os.RemoveAll(rocksdbPath)
	// NOTE(review): presumably NewRocksMQ creates a side meta kv at this
	// path — confirm; kept only for cleanup.
	metaPath := rocksdbPath + "_meta_kv"
	defer os.RemoveAll(metaPath)

	rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
	assert.Nil(t, err)

	topicName := "topic_a"
	err = rmq.CreateTopic(topicName)
	assert.Nil(t, err)
	defer rmq.DestroyTopic(topicName)

	msgNum := 100
	pMsgs := make([]ProducerMessage, msgNum)
	for i := 0; i < msgNum; i++ {
		msg := "message_" + strconv.Itoa(i)
		pMsgs[i] = ProducerMessage{Payload: []byte(msg)}
	}
	err = rmq.Produce(topicName, pMsgs)
	assert.Nil(t, err)

	groupName := "test_group"
	_ = rmq.DestroyConsumerGroup(topicName, groupName)
	err = rmq.CreateConsumerGroup(topicName, groupName)
	// Fix: this error was only asserted after RegisterConsumer had already
	// run; check it immediately so a failed group creation is reported first.
	assert.Nil(t, err)
	consumer := &Consumer{
		Topic:     topicName,
		GroupName: groupName,
	}
	rmq.RegisterConsumer(consumer)

	cMsgs := make([]ConsumerMessage, 0)
	for i := 0; i < msgNum; i++ {
		cMsg, err := rmq.Consume(topicName, groupName, 1)
		assert.Nil(t, err)
		cMsgs = append(cMsgs, cMsg[0])
	}
	assert.Equal(t, len(cMsgs), msgNum)

	// Wait long enough for the retention ticker to fire at least once.
	time.Sleep(time.Duration(CheckTimeInterval+1) * time.Second)
	// Seek to a previously consumed message; it should have been cleaned up,
	// so consuming from there yields nothing.
	err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID)
	assert.Nil(t, err)
	newRes, err := rmq.Consume(topicName, groupName, 1)
	assert.Nil(t, err)
	assert.Equal(t, len(newRes), 0)
}