fix: close rocksmq and natsmq when shutdown (#30706)

related: #30703 #30355

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/30817/head
smellthemoon 2024-02-26 11:38:55 +08:00 committed by GitHub
parent befe0e21fd
commit 9815cf50c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 3 deletions

View File

@ -40,6 +40,7 @@ import (
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/nmq"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/expr"
@ -318,7 +319,15 @@ func (mr *MilvusRoles) Run() {
} else {
paramtable.Init()
}
params := paramtable.Get()
if paramtable.Get().RocksmqEnable() {
defer stopRocksmq()
} else if paramtable.Get().NatsmqEnable() {
defer nmq.CloseNatsMQ()
} else {
panic("only support Rocksmq and Natsmq in standalone mode")
}
if params.EtcdCfg.UseEmbedEtcd.GetAsBool() {
// Start etcd server.
etcd.InitEtcdServer(

View File

@ -174,7 +174,7 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
rocksDBLRUCacheCapacity = calculatedCapacity
}
}
log.Debug("Start rocksmq ", zap.Int("max proc", maxProcs),
log.Debug("Start rocksmq", zap.Int("max proc", maxProcs),
zap.Int("parallism", parallelism), zap.Uint64("lru cache", rocksDBLRUCacheCapacity))
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockSize(64 << 10)
@ -219,11 +219,11 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
optsStore.IncreaseParallelism(parallelism)
// enable back ground flush
optsStore.SetMaxBackgroundFlushes(1)
// use properties as the column families to store trace id
// properties is not used anymore, keep it for upgrading successfully
optsStore.SetCreateIfMissingColumnFamilies(true)
// db, err := gorocksdb.OpenDb(opts, name)
// use properties as the column families to store trace id
// properties is not used anymore, keep it for upgrading successfully
giveColumnFamilies := []string{"default", "properties"}
db, cfHandles, err := gorocksdb.OpenDbColumnFamilies(optsStore, name, giveColumnFamilies, []*gorocksdb.Options{optsStore, optsStore})
if err != nil {