Fix rocksmq load with prefix (#7678)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/7686/head
yukun 2021-09-10 10:22:01 +08:00 committed by GitHub
parent 34228eb74b
commit 5a303e7672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 3 deletions

View File

@ -90,6 +90,14 @@ func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) {
return keys, values, nil
}
func (kv *RocksdbKV) ResetPrefixLength(len int) error {
kv.DB.Close()
kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len))
var err error
kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName())
return err
}
func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) {
values := make([]string, 0, len(keys))
for _, key := range keys {

View File

@ -113,7 +113,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen + 1))
opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen))
db, err := gorocksdb.OpenDb(opts, name)
if err != nil {

View File

@ -76,10 +76,10 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
for ; iter.Valid(); iter.Next() {
key := iter.Key()
value := iter.Value()
defer key.Free()
defer value.Free()
keys = append(keys, string(key.Data()))
key.Free()
values = append(values, string(value.Data()))
value.Free()
}
if err := iter.Err(); err != nil {
return nil, nil, err
@ -112,6 +112,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
func (ri *retentionInfo) startRetentionInfo() error {
var wg sync.WaitGroup
ri.kv.ResetPrefixLength(FixedChannelNameLen)
for _, topic := range ri.topics {
log.Debug("Start load retention info", zap.Any("topic", topic))
// Load all page infos