Add rocksmq_path in config and paramtable (#6099)

* Add rocksmq_path in config and paramtable

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Add rocksdbPath in NewRmsFactory

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Change rdb default path to /vat/lib/milvus/rdb_data

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/6130/head
yukun 2021-06-25 19:44:11 +08:00 committed by GitHub
parent 1d9c10bead
commit 39614aa8eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 86 additions and 10 deletions

View File

@ -38,9 +38,9 @@ import (
"github.com/milvus-io/milvus/internal/util/trace"
)
func newMsgFactory(localMsg bool) msgstream.Factory {
func newMsgFactory(localMsg bool, rocksmqPath string) msgstream.Factory {
if localMsg {
return msgstream.NewRmsFactory()
return msgstream.NewRmsFactory(rocksmqPath)
}
return msgstream.NewPmsFactory()
}
@ -87,7 +87,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
defer log.Sync()
}
factory := newMsgFactory(localMsg)
factory := newMsgFactory(localMsg, rootcoord.Params.RocksmqPath)
var err error
rc, err = components.NewRootCoord(ctx, factory)
if err != nil {
@ -116,7 +116,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string
defer log.Sync()
}
factory := newMsgFactory(localMsg)
factory := newMsgFactory(localMsg, proxy.Params.RocksmqPath)
var err error
pn, err = components.NewProxy(ctx, factory)
if err != nil {
@ -144,7 +144,9 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
defer log.Sync()
}
factory := newMsgFactory(localMsg)
// FIXME(yukun): newMsgFactory requires parameter rocksmqPath, but won't be used here
// so hardcode the path to /tmp/invalid_milvus_rdb
factory := newMsgFactory(localMsg, "/tmp/invalid_milvus_rdb")
var err error
qs, err = components.NewQueryCoord(ctx, factory)
if err != nil {
@ -173,7 +175,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st
defer log.Sync()
}
factory := newMsgFactory(localMsg)
factory := newMsgFactory(localMsg, querynode.Params.RocksmqPath)
var err error
qn, err = components.NewQueryNode(ctx, factory)
if err != nil {
@ -201,7 +203,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
defer log.Sync()
}
factory := newMsgFactory(localMsg)
factory := newMsgFactory(localMsg, datacoord.Params.RocksmqPath)
var err error
ds, err = components.NewDataCoord(ctx, factory)
if err != nil {
@ -230,7 +232,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str
defer log.Sync()
}
factory := newMsgFactory(localMsg)
factory := newMsgFactory(localMsg, datanode.Params.RocksmqPath)
var err error
dn, err = components.NewDataNode(ctx, factory)
if err != nil {

View File

@ -34,6 +34,9 @@ pulsar:
port: 6650
maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes
rocksmq:
path: /var/lib/milvus/rdb_data
rootCoord:
address: localhost
port: 53100

View File

@ -381,6 +381,7 @@ type GlobalParamsTable struct {
MasterAddress string
PulsarAddress string
RocksmqPath string
ProxyID UniqueID
TimeTickInterval time.Duration

View File

@ -38,6 +38,9 @@ type ParamTable struct {
// --- Pulsar ---
PulsarAddress string
// --- Rocksmq ---
RocksmqPath string
FlushStreamPosSubPath string
StatsStreamPosSubPath string
@ -75,6 +78,7 @@ func (p *ParamTable) Init() {
p.initCollectionBinlogSubPath()
p.initPulsarAddress()
p.initRocksmqPath()
p.initSegmentMaxSize()
p.initSegmentSealProportion()
@ -107,6 +111,14 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = addr
}
func (p *ParamTable) initRocksmqPath() {
path, err := p.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {

View File

@ -40,6 +40,9 @@ type ParamTable struct {
// --- Pulsar ---
PulsarAddress string
// --- Rocksmq ---
RocksmqPath string
// - seg statistics channel -
SegmentStatisticsChannelName string
@ -88,6 +91,8 @@ func (p *ParamTable) Init() {
// --- Pulsar ---
p.initPulsarAddress()
p.initRocksmqPath()
// - seg statistics channel -
p.initSegmentStatisticsChannelName()
@ -148,6 +153,14 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = url
}
func (p *ParamTable) initRocksmqPath() {
path, err := p.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *ParamTable) initSegmentStatisticsChannelName() {
path, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")

View File

@ -15,6 +15,7 @@ import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
rocksmqserver "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
@ -103,13 +104,14 @@ func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
return NewMemMsgStream(ctx, f.ReceiveBufSize)
}
func NewRmsFactory() Factory {
func NewRmsFactory(rocksmqPath string) Factory {
f := &RmsFactory{
dispatcherFactory: ProtoUDFactory{},
ReceiveBufSize: 1024,
RmqBufSize: 1024,
}
rocksmqserver.InitRocksMQ("/tmp/milvus_rdb")
log.Debug("RocksmqPath=" + rocksmqPath)
rocksmqserver.InitRocksMQ(rocksmqPath)
return f
}

View File

@ -40,6 +40,7 @@ type ParamTable struct {
MetaRootPath string
RootCoordAddress string
PulsarAddress string
RocksmqPath string
ProxyID UniqueID
TimeTickInterval time.Duration
@ -78,6 +79,7 @@ func (pt *ParamTable) initParams() {
pt.initEtcdEndpoints()
pt.initMetaRootPath()
pt.initPulsarAddress()
pt.initRocksmqPath()
pt.initTimeTickInterval()
pt.initProxySubName()
pt.initProxyTimeTickChannelNames()
@ -104,6 +106,14 @@ func (pt *ParamTable) initPulsarAddress() {
pt.PulsarAddress = ret
}
func (pt *ParamTable) initRocksmqPath() {
path, err := pt.Load("_RocksmqPath")
if err != nil {
panic(err)
}
pt.RocksmqPath = path
}
func (pt *ParamTable) initTimeTickInterval() {
intervalStr, err := pt.Load("proxy.timeTickInterval")
if err != nil {

View File

@ -26,6 +26,7 @@ type ParamTable struct {
paramtable.BaseTable
PulsarAddress string
RocksmqPath string
EtcdEndpoints []string
MetaRootPath string
@ -94,6 +95,7 @@ func (p *ParamTable) Init() {
p.initMinioBucketName()
p.initPulsarAddress()
p.initRocksmqPath()
p.initEtcdEndpoints()
p.initMetaRootPath()
@ -175,6 +177,14 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = url
}
func (p *ParamTable) initRocksmqPath() {
path, err := p.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
// advanced params
// stats
func (p *ParamTable) initStatsPublishInterval() {

View File

@ -30,6 +30,7 @@ type ParamTable struct {
Port int
PulsarAddress string
RocksmqPath string
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
@ -61,6 +62,7 @@ func (p *ParamTable) Init() {
}
p.initPulsarAddress()
p.initRocksmqPath()
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
@ -91,6 +93,14 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = addr
}
func (p *ParamTable) initRocksmqPath() {
path, err := p.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *ParamTable) initEtcdEndpoints() {
endpoints, err := p.Load("_EtcdEndpoints")
if err != nil {

View File

@ -119,6 +119,19 @@ func (gp *BaseTable) tryloadFromEnv() {
panic(err)
}
rocksmqPath := os.Getenv("ROCKSMQ_PATH")
if rocksmqPath == "" {
path, err := gp.Load("rocksmq.path")
if err != nil {
panic(err)
}
rocksmqPath = path
}
err = gp.Save("_RocksmqPath", rocksmqPath)
if err != nil {
panic(err)
}
rootCoordAddress := os.Getenv("ROOT_COORD_ADDRESS")
if rootCoordAddress == "" {
rootCoordHost, err := gp.Load("rootCoord.address")