mirror of https://github.com/milvus-io/milvus.git
Fix RocksDB Slow (#14614)
parent
ca70b3fb9c
commit
98e4ff33a8
|
@ -0,0 +1,108 @@
|
|||
package rocksdbkv
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
)
|
||||
|
||||
/**
 * A wrapper of go rocksdb iterator
 * it helps on 1) reserve the upperBound array to avoid garbage collection
 * 2) do a leakage check of iterator
 */
type RocksIterator struct {
	it *gorocksdb.Iterator
	// upperBound pins the byte slice handed to
	// ReadOptions.SetIterateUpperBound so the Go GC cannot reclaim it
	// while the native iterator may still read it.
	upperBound []byte
	// close records whether Close() has been called; the finalizer set
	// by the constructors checks it to report leaked (never-closed)
	// iterators.
	close bool
}
|
||||
|
||||
func NewRocksIterator(db *gorocksdb.DB, opts *gorocksdb.ReadOptions) *RocksIterator {
|
||||
iter := db.NewIterator(opts)
|
||||
it := &RocksIterator{iter, nil, false}
|
||||
runtime.SetFinalizer(it, func(rocksit *RocksIterator) {
|
||||
if !rocksit.close {
|
||||
log.Error("iterator is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return it
|
||||
}
|
||||
|
||||
func NewRocksIteratorWithUpperBound(db *gorocksdb.DB, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator {
|
||||
upperBound := []byte(upperBoundString)
|
||||
opts.SetIterateUpperBound(upperBound)
|
||||
iter := db.NewIterator(opts)
|
||||
it := &RocksIterator{iter, upperBound, false}
|
||||
runtime.SetFinalizer(it, func(rocksit *RocksIterator) {
|
||||
if !rocksit.close {
|
||||
log.Error("iterator is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return it
|
||||
}
|
||||
|
||||
// Valid returns false only when an Iterator has iterated past either the
|
||||
// first or the last key in the database.
|
||||
func (iter *RocksIterator) Valid() bool {
|
||||
return iter.it.Valid()
|
||||
}
|
||||
|
||||
// ValidForPrefix returns false only when an Iterator has iterated past the
|
||||
// first or the last key in the database or the specified prefix.
|
||||
func (iter *RocksIterator) ValidForPrefix(prefix []byte) bool {
|
||||
return iter.it.ValidForPrefix(prefix)
|
||||
}
|
||||
|
||||
// Key returns the key the iterator currently holds.
|
||||
func (iter *RocksIterator) Key() *gorocksdb.Slice {
|
||||
return iter.it.Key()
|
||||
}
|
||||
|
||||
// Value returns the value in the database the iterator currently holds.
|
||||
func (iter *RocksIterator) Value() *gorocksdb.Slice {
|
||||
return iter.it.Value()
|
||||
}
|
||||
|
||||
// Next moves the iterator to the next sequential key in the database.
|
||||
func (iter *RocksIterator) Next() {
|
||||
iter.it.Next()
|
||||
}
|
||||
|
||||
// Prev moves the iterator to the previous sequential key in the database.
|
||||
func (iter *RocksIterator) Prev() {
|
||||
iter.it.Prev()
|
||||
}
|
||||
|
||||
// SeekToFirst moves the iterator to the first key in the database.
|
||||
func (iter *RocksIterator) SeekToFirst() {
|
||||
iter.it.SeekToFirst()
|
||||
}
|
||||
|
||||
// SeekToLast moves the iterator to the last key in the database.
|
||||
func (iter *RocksIterator) SeekToLast() {
|
||||
iter.it.SeekToLast()
|
||||
}
|
||||
|
||||
// Seek moves the iterator to the position greater than or equal to the key.
|
||||
func (iter *RocksIterator) Seek(key []byte) {
|
||||
iter.it.Seek(key)
|
||||
}
|
||||
|
||||
// SeekForPrev moves the iterator to the last key that less than or equal
|
||||
// to the target key, in contrast with Seek.
|
||||
func (iter *RocksIterator) SeekForPrev(key []byte) {
|
||||
iter.it.SeekForPrev(key)
|
||||
}
|
||||
|
||||
// Err returns nil if no errors happened during iteration, or the actual
|
||||
// error otherwise.
|
||||
func (iter *RocksIterator) Err() error {
|
||||
return iter.it.Err()
|
||||
}
|
||||
|
||||
// Close closes the iterator.
|
||||
func (iter *RocksIterator) Close() {
|
||||
iter.close = true
|
||||
iter.it.Close()
|
||||
}
|
|
@ -116,12 +116,12 @@ func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
|||
}
|
||||
option := gorocksdb.NewDefaultReadOptions()
|
||||
defer option.Destroy()
|
||||
iter := kv.DB.NewIterator(option)
|
||||
iter := NewRocksIteratorWithUpperBound(kv.DB, typeutil.AddOne(prefix), option)
|
||||
defer iter.Close()
|
||||
|
||||
var keys, values []string
|
||||
iter.Seek([]byte(prefix))
|
||||
for ; iter.ValidForPrefix([]byte(prefix)); iter.Next() {
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
key := iter.Key()
|
||||
value := iter.Value()
|
||||
keys = append(keys, string(key.Data()))
|
||||
|
@ -193,7 +193,7 @@ func (kv *RocksdbKV) RemoveWithPrefix(prefix string) error {
|
|||
// better to use drop column family, but as we use default column family, we just delete ["",lastKey+1)
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
iter := kv.DB.NewIterator(readOpts)
|
||||
iter := NewRocksIterator(kv.DB, readOpts)
|
||||
defer iter.Close()
|
||||
// seek to the last key
|
||||
iter.SeekToLast()
|
||||
|
|
|
@ -891,7 +891,6 @@ func (it *insertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
|
|||
}
|
||||
|
||||
threshold := Params.ProxyCfg.PulsarMaxMessageSize
|
||||
log.Debug("Proxy", zap.Int("threshold of message size: ", threshold))
|
||||
// not accurate
|
||||
/* #nosec G103 */
|
||||
getFixedSizeOfInsertMsg := func(msg *msgstream.InsertMsg) int {
|
||||
|
|
|
@ -143,11 +143,14 @@ func (c *client) consume(consumer *consumer) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *client) deliver(consumer *consumer, batchMin int) {
|
||||
func (c *client) deliver(consumer *consumer, batchMax int) {
|
||||
for {
|
||||
n := cap(consumer.messageCh) - len(consumer.messageCh)
|
||||
if n < batchMin { // batch min size
|
||||
n = batchMin
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
if n > batchMax { // batch min size
|
||||
n = batchMax
|
||||
}
|
||||
msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n)
|
||||
if err != nil {
|
||||
|
|
|
@ -83,7 +83,7 @@ func InitRocksMQ() error {
|
|||
if err == nil {
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60)
|
||||
} else {
|
||||
log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value 3 days")
|
||||
log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value")
|
||||
}
|
||||
}
|
||||
rawRmqRetentionSizeInMB, err := params.Load("rocksmq.retentionSizeInMB")
|
||||
|
|
|
@ -147,16 +147,22 @@ type rocksmq struct {
|
|||
// 3. Start retention goroutine
|
||||
func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
|
||||
// TODO we should use same rocksdb instance with different cfs
|
||||
maxProcs := runtime.GOMAXPROCS(0)
|
||||
parallelism := 1
|
||||
if maxProcs > 32 {
|
||||
parallelism = 4
|
||||
} else if maxProcs > 8 {
|
||||
parallelism = 2
|
||||
}
|
||||
log.Debug("Start rocksmq ", zap.Int("max proc", maxProcs), zap.Int("parallism", parallelism))
|
||||
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
|
||||
bbto.SetCacheIndexAndFilterBlocks(true)
|
||||
bbto.SetPinL0FilterAndIndexBlocksInCache(true)
|
||||
bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))
|
||||
optsKV := gorocksdb.NewDefaultOptions()
|
||||
optsKV.SetBlockBasedTableFactory(bbto)
|
||||
optsKV.SetCreateIfMissing(true)
|
||||
// by default there are only 1 thread for flush compaction, which may block each other.
|
||||
// increase to a reasonable thread numbers
|
||||
optsKV.IncreaseParallelism(runtime.NumCPU())
|
||||
optsKV.IncreaseParallelism(parallelism)
|
||||
// enable back ground flush
|
||||
optsKV.SetMaxBackgroundFlushes(1)
|
||||
|
||||
|
@ -174,7 +180,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
|
|||
optsStore.SetCreateIfMissing(true)
|
||||
// by default there are only 1 thread for flush compaction, which may block each other.
|
||||
// increase to a reasonable thread numbers
|
||||
optsStore.IncreaseParallelism(runtime.NumCPU())
|
||||
optsStore.IncreaseParallelism(parallelism)
|
||||
// enable back ground flush
|
||||
optsStore.SetMaxBackgroundFlushes(1)
|
||||
|
||||
|
@ -219,16 +225,19 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
|
|||
go func() {
|
||||
for {
|
||||
time.Sleep(5 * time.Minute)
|
||||
log.Info("Rocksmq memory usage",
|
||||
zap.String("rockskv kv cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
|
||||
zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.cur-size-all-mem-tables")),
|
||||
log.Info("Rocksmq stats",
|
||||
zap.String("cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
|
||||
zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.size-all-mem-tables")),
|
||||
zap.String("rockskv table readers", kv.DB.GetProperty("rocksdb.estimate-table-readers-mem")),
|
||||
zap.String("rockskv pinned", kv.DB.GetProperty("rocksdb.block-cache-pinned-usage")),
|
||||
|
||||
zap.String("store kv cache", db.GetProperty("rocksdb.block-cache-usage")),
|
||||
zap.String("store memtable ", db.GetProperty("rocksdb.cur-size-all-mem-tables")),
|
||||
zap.String("store memtable ", db.GetProperty("rocksdb.size-all-mem-tables")),
|
||||
zap.String("store table readers", db.GetProperty("rocksdb.estimate-table-readers-mem")),
|
||||
zap.String("store pinned", db.GetProperty("rocksdb.block-cache-pinned-usage")),
|
||||
zap.String("store l0 file num", db.GetProperty("rocksdb.num-files-at-level0")),
|
||||
zap.String("store l1 file num", db.GetProperty("rocksdb.num-files-at-level1")),
|
||||
zap.String("store l2 file num", db.GetProperty("rocksdb.num-files-at-level2")),
|
||||
zap.String("store l3 file num", db.GetProperty("rocksdb.num-files-at-level3")),
|
||||
zap.String("store l4 file num", db.GetProperty("rocksdb.num-files-at-level4")),
|
||||
)
|
||||
}
|
||||
}()
|
||||
|
@ -644,7 +653,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
prefix := topicName + "/"
|
||||
iter := rmq.store.NewIterator(readOpts)
|
||||
iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(prefix), readOpts)
|
||||
defer iter.Close()
|
||||
|
||||
var dataKey string
|
||||
|
@ -654,10 +663,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10))
|
||||
}
|
||||
iter.Seek([]byte(dataKey))
|
||||
|
||||
consumerMessage := make([]ConsumerMessage, 0, n)
|
||||
offset := 0
|
||||
for ; iter.ValidForPrefix([]byte(prefix)) && offset < n; iter.Next() {
|
||||
for ; iter.Valid() && offset < n; iter.Next() {
|
||||
key := iter.Key()
|
||||
val := iter.Value()
|
||||
strKey := string(key.Data())
|
||||
|
@ -795,7 +803,7 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
|
|||
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
iter := rmq.store.NewIterator(readOpts)
|
||||
iter := rocksdbkv.NewRocksIterator(rmq.store, readOpts)
|
||||
defer iter.Close()
|
||||
|
||||
prefix := topicName + "/"
|
||||
|
@ -863,11 +871,11 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
|
|||
defer readOpts.Destroy()
|
||||
pageMsgFirstKey := pageMsgPrefix + strconv.FormatInt(firstID, 10)
|
||||
|
||||
iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts)
|
||||
iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.kv.(*rocksdbkv.RocksdbKV).DB, typeutil.AddOne(pageMsgPrefix), readOpts)
|
||||
defer iter.Close()
|
||||
var pageIDs []UniqueID
|
||||
|
||||
for iter.Seek([]byte(pageMsgFirstKey)); iter.ValidForPrefix([]byte(pageMsgPrefix)); iter.Next() {
|
||||
for iter.Seek([]byte(pageMsgFirstKey)); iter.Valid(); iter.Next() {
|
||||
key := iter.Key()
|
||||
pageID, err := parsePageID(string(key.Data()))
|
||||
if key != nil {
|
||||
|
@ -940,7 +948,7 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
|
|||
}
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := rmq.store.NewIterator(readOpts)
|
||||
iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(topicName+"/"), readOpts)
|
||||
dataKey := path.Join(topicName, strconv.FormatInt(startMsgID, 10))
|
||||
iter.Seek([]byte(dataKey))
|
||||
// if iterate fail
|
||||
|
@ -957,7 +965,6 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
|
|||
reader := &rocksmqReader{
|
||||
store: rmq.store,
|
||||
topic: topicName,
|
||||
prefix: []byte(topicName + "/"),
|
||||
readerName: readerName,
|
||||
readOpts: readOpts,
|
||||
iter: iter,
|
||||
|
|
|
@ -18,18 +18,19 @@ import (
|
|||
"path"
|
||||
"strconv"
|
||||
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
)
|
||||
|
||||
type rocksmqReader struct {
|
||||
store *gorocksdb.DB
|
||||
topic string
|
||||
prefix []byte
|
||||
readerName string
|
||||
|
||||
readOpts *gorocksdb.ReadOptions
|
||||
iter *gorocksdb.Iterator
|
||||
iter *rocksdbkv.RocksIterator
|
||||
|
||||
currentID UniqueID
|
||||
messageIDInclusive bool
|
||||
|
@ -77,7 +78,7 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
|
|||
iter.Next()
|
||||
rr.currentID = msgID
|
||||
}
|
||||
if iter.ValidForPrefix(rr.prefix) {
|
||||
if iter.Valid() {
|
||||
getMsg()
|
||||
return msg, err
|
||||
}
|
||||
|
@ -92,11 +93,11 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
|
|||
return nil, fmt.Errorf("reader Mutex closed")
|
||||
}
|
||||
rr.iter.Close()
|
||||
rr.iter = rr.store.NewIterator(rr.readOpts)
|
||||
rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
|
||||
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
|
||||
iter = rr.iter
|
||||
iter.Seek([]byte(dataKey))
|
||||
if !iter.ValidForPrefix(rr.prefix) {
|
||||
if !iter.Valid() {
|
||||
return nil, errors.New("reader iterater is still invalid after receive mutex")
|
||||
}
|
||||
getMsg()
|
||||
|
@ -105,7 +106,7 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
|
|||
}
|
||||
|
||||
func (rr *rocksmqReader) HasNext() bool {
|
||||
if rr.iter.ValidForPrefix(rr.prefix) {
|
||||
if rr.iter.Valid() {
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -115,10 +116,10 @@ func (rr *rocksmqReader) HasNext() bool {
|
|||
return false
|
||||
}
|
||||
rr.iter.Close()
|
||||
rr.iter = rr.store.NewIterator(rr.readOpts)
|
||||
rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
|
||||
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
|
||||
rr.iter.Seek([]byte(dataKey))
|
||||
return rr.iter.ValidForPrefix(rr.prefix)
|
||||
return rr.iter.Valid()
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -133,7 +134,6 @@ func (ri *retentionInfo) Stop() {
|
|||
// 3. delete acked info by range of page id;
|
||||
// 4. delete message by range of page id;
|
||||
func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
|
||||
start := time.Now()
|
||||
var deletedAckedSize int64
|
||||
var pageCleaned UniqueID
|
||||
|
@ -148,17 +148,18 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||
}
|
||||
// Quick Path, No page to check
|
||||
if totalAckedSize == 0 {
|
||||
log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic))
|
||||
log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic),
|
||||
zap.Any("time taken", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
pageReadOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer pageReadOpts.Destroy()
|
||||
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
|
||||
|
||||
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
|
||||
pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB, typeutil.AddOne(pageMsgPrefix), pageReadOpts)
|
||||
defer pageIter.Close()
|
||||
pageIter.Seek([]byte(pageMsgPrefix))
|
||||
for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
|
||||
for ; pageIter.Valid(); pageIter.Next() {
|
||||
pKey := pageIter.Key()
|
||||
pageID, err := parsePageID(string(pKey.Data()))
|
||||
if pKey != nil {
|
||||
|
@ -201,7 +202,8 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||
}
|
||||
|
||||
log.Debug("Expired check by retention time", zap.Any("topic", topic),
|
||||
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), zap.Any("pageCleaned", pageCleaned))
|
||||
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
|
||||
zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", time.Since(start).Milliseconds()))
|
||||
|
||||
for ; pageIter.Valid(); pageIter.Next() {
|
||||
pValue := pageIter.Value()
|
||||
|
@ -234,7 +236,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||
}
|
||||
|
||||
if pageEndID == 0 {
|
||||
log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic))
|
||||
log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic), zap.Any("time taken", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
expireTime := time.Since(start).Milliseconds()
|
||||
|
@ -251,11 +253,11 @@ func (ri *retentionInfo) calculateTopicAckedSize(topic string) (int64, error) {
|
|||
defer pageReadOpts.Destroy()
|
||||
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
|
||||
// ensure the iterator won't iterate to other topics
|
||||
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
|
||||
pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB, typeutil.AddOne(pageMsgPrefix), pageReadOpts)
|
||||
defer pageIter.Close()
|
||||
pageIter.Seek([]byte(pageMsgPrefix))
|
||||
var ackedSize int64
|
||||
for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
|
||||
for ; pageIter.Valid(); pageIter.Next() {
|
||||
key := pageIter.Key()
|
||||
pageID, err := parsePageID(string(key.Data()))
|
||||
if key != nil {
|
||||
|
|
Loading…
Reference in New Issue