Use RocksmqConfig in GlobalParams (#15083)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/15099/head
Cai Yudong 2022-01-10 14:51:34 +08:00 committed by GitHub
parent 8a088f1d3b
commit ee60e95310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 55 deletions

View File

@ -53,7 +53,8 @@ type GlobalParamTable struct {
once sync.Once
BaseParams BaseParamTable
PulsarCfg pulsarConfig
PulsarCfg pulsarConfig
RocksmqCfg rocksmqConfig
//CommonCfg commonConfig
//KnowhereCfg knowhereConfig
//MsgChannelCfg msgChannelConfig
@ -80,6 +81,7 @@ func (p *GlobalParamTable) Init() {
p.BaseParams.Init()
p.PulsarCfg.init(&p.BaseParams)
p.RocksmqCfg.init(&p.BaseParams)
//p.CommonCfg.init(&p.BaseParams)
//p.KnowhereCfg.init(&p.BaseParams)
//p.MsgChannelCfg.init(&p.BaseParams)
@ -139,6 +141,29 @@ func (p *pulsarConfig) initMaxMessageSize() {
}
}
///////////////////////////////////////////////////////////////////////////////
// --- rocksmq ---
type rocksmqConfig struct {
BaseParams *BaseParamTable
Path string
}
func (p *rocksmqConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initPath()
}
func (p *rocksmqConfig) initPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.Path = path
}
///////////////////////////////////////////////////////////////////////////////
// --- common ---
//type commonConfig struct {
// BaseParams *BaseParamTable
@ -449,15 +474,12 @@ type proxyConfig struct {
BaseParams *BaseParamTable
// NetworkPort & IP are not used
NetworkPort int
IP string
NetworkPort int
IP string
NetworkAddress string
Alias string
RocksmqPath string // not used in Proxy
ProxyID UniqueID
TimeTickInterval time.Duration
MsgStreamTimeTickBufSize int64
@ -490,7 +512,6 @@ type proxyConfig struct {
func (p *proxyConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initRocksmqPath()
p.initTimeTickInterval()
// Has to init global msgchannel prefix before other channel names
@ -521,14 +542,6 @@ func (p *proxyConfig) InitAlias(alias string) {
p.Alias = alias
}
func (p *proxyConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *proxyConfig) initTimeTickInterval() {
interval := p.BaseParams.ParseIntWithDefault("proxy.timeTickInterval", 200)
p.TimeTickInterval = time.Duration(interval) * time.Millisecond
@ -863,8 +876,6 @@ func (p *queryCoordConfig) initDeltaChannelName() {
type queryNodeConfig struct {
BaseParams *BaseParamTable
RocksmqPath string
Alias string
QueryNodeIP string
QueryNodePort int64
@ -933,8 +944,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) {
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.initRocksmqPath()
p.initGracefulTime()
p.initFlowGraphMaxQueueLength()
@ -1035,14 +1044,6 @@ func (p *queryNodeConfig) initMinioBucketName() {
p.MinioBucketName = bucketName
}
func (p *queryNodeConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
// advanced params
// stats
func (p *queryNodeConfig) initStatsPublishInterval() {
@ -1158,9 +1159,6 @@ type dataCoordConfig struct {
MinioBucketName string
MinioRootPath string
// --- Rocksmq ---
RocksmqPath string
// --- SEGMENTS ---
SegmentMaxSize float64
SegmentSealProportion float64
@ -1193,8 +1191,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initChannelWatchPrefix()
p.initRocksmqPath()
p.initSegmentMaxSize()
p.initSegmentSealProportion()
p.initSegAssignmentExpiration()
@ -1224,14 +1220,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initGCDropTolerance()
}
func (p *dataCoordConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *dataCoordConfig) initSegmentMaxSize() {
p.SegmentMaxSize = p.BaseParams.ParseFloatWithDefault("dataCoord.segment.maxSize", 512.0)
}
@ -1398,9 +1386,6 @@ type dataNodeConfig struct {
DmlChannelName string
DeltaChannelName string
// Rocksmq path
RocksmqPath string
// Cluster channels
ClusterChannelPrefix string
@ -1434,8 +1419,6 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) {
p.initStatsBinlogRootPath()
p.initDeleteBinlogRootPath()
p.initRocksmqPath()
// Must init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initTimeTickChannelName()
@ -1500,14 +1483,6 @@ func (p *dataNodeConfig) initDeleteBinlogRootPath() {
p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log")
}
func (p *dataNodeConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *dataNodeConfig) initClusterMsgChannelPrefix() {
name, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {

View File

@ -41,6 +41,13 @@ func TestGlobalParamTable(t *testing.T) {
assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize)
})
t.Run("test rocksmqConfig", func(t *testing.T) {
Params := GlobalParams.RocksmqCfg
assert.NotEqual(t, Params.Path, "")
t.Logf("rocksmq path = %s", Params.Path)
})
t.Run("test rootCoordConfig", func(t *testing.T) {
Params := GlobalParams.RootCoordCfg
@ -80,8 +87,6 @@ func TestGlobalParamTable(t *testing.T) {
t.Run("test proxyConfig", func(t *testing.T) {
Params := GlobalParams.ProxyCfg
t.Logf("RocksmqPath: %s", Params.RocksmqPath)
t.Logf("TimeTickInterval: %v", Params.TimeTickInterval)
assert.Equal(t, Params.ProxySubName, "by-dev-proxy-0")