Add a channel prefix for all channels (#8166)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/8383/head
Xiaofan 2021-09-23 10:53:53 +08:00 committed by GitHub
parent a62bce360b
commit f5173b595f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 588 additions and 356 deletions

View File

@ -12,6 +12,7 @@
msgChannel:
# channel name generation rule: ${namePrefix}-${ChannelIdx}
chanNamePrefix:
cluster: "by-dev"
rootCoordTimeTick: "rootcoord-timetick"
rootCoordStatistics: "rootcoord-statistics"
rootCoordDml: "rootcoord-dml"
@ -29,7 +30,7 @@ msgChannel:
# sub name generation rule: ${subNamePrefix}-${NodeID}
subNamePrefix:
rootCoordSubNamePrefix: "rootcoord"
rootCoordSubNamePrefix: "rootCoord"
proxySubNamePrefix: "proxy"
queryNodeSubNamePrefix: "queryNode"
dataNodeSubNamePrefix: "dataNode"

View File

@ -46,11 +46,13 @@ type ParamTable struct {
FlushStreamPosSubPath string
StatsStreamPosSubPath string
// segment
// --- SEGMENTS ---
SegmentMaxSize float64
SegmentSealProportion float64
SegAssignmentExpiration int64
// --- Channels ---
ClusterChannelPrefix string
InsertChannelPrefixName string
StatisticsChannelName string
TimeTickChannelName string
@ -63,37 +65,46 @@ type ParamTable struct {
var Params ParamTable
var once sync.Once
/* Init params from base table as well as data coord yaml*/
func (p *ParamTable) Init() {
// load yaml
p.BaseTable.Init()
if err := p.LoadYaml("advanced/data_coord.yaml"); err != nil {
panic(err)
}
// set members
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
p.initSegmentBinlogSubPath()
p.initCollectionBinlogSubPath()
p.initPulsarAddress()
p.initRocksmqPath()
p.initSegmentMaxSize()
p.initSegmentSealProportion()
p.initSegAssignmentExpiration()
// Has to init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initInsertChannelPrefixName()
p.initStatisticsChannelName()
p.initTimeTickChannelName()
p.initSegmentInfoChannelName()
p.initDataCoordSubscriptionName()
p.initLogCfg()
p.initFlushStreamPosSubPath()
p.initStatsStreamPosSubPath()
}
// Init once ensure param table is a singleton
func (p *ParamTable) InitOnce() {
once.Do(func() {
// load yaml
p.BaseTable.Init()
if err := p.LoadYaml("advanced/data_coord.yaml"); err != nil {
panic(err)
}
// set members
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
p.initSegmentBinlogSubPath()
p.initCollectionBinlogSubPath()
p.initPulsarAddress()
p.initRocksmqPath()
p.initSegmentMaxSize()
p.initSegmentSealProportion()
p.initSegAssignmentExpiration()
p.initInsertChannelPrefixName()
p.initStatisticsChannelName()
p.initTimeTickChannelName()
p.initSegmentInfoChannelName()
p.initDataCoordSubscriptionName()
p.initLogCfg()
p.initFlushStreamPosSubPath()
p.initStatsStreamPosSubPath()
p.Init()
})
}
@ -173,44 +184,57 @@ func (p *ParamTable) initSegAssignmentExpiration() {
p.SegAssignmentExpiration = p.ParseInt64("datacoord.segment.assignmentExpiration")
}
func (p *ParamTable) initInsertChannelPrefixName() {
var err error
p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel")
func (p *ParamTable) initClusterMsgChannelPrefix() {
config, err := p.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
p.ClusterChannelPrefix = config
}
func (p *ParamTable) initInsertChannelPrefixName() {
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.InsertChannelPrefixName = strings.Join(s, "-")
}
func (p *ParamTable) initStatisticsChannelName() {
var err error
p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.StatisticsChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initTimeTickChannelName() {
var err error
p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.TimeTickChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initSegmentInfoChannelName() {
var err error
p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo")
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.SegmentInfoChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initDataCoordSubscriptionName() {
var err error
p.DataCoordSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix")
config, err := p.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.DataCoordSubscriptionName = strings.Join(s, "-")
}
func (p *ParamTable) initLogCfg() {

View File

@ -0,0 +1,39 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"testing"
"github.com/stretchr/testify/assert"
)
//TODO add more test for other parameters
func TestParamTable(t *testing.T) {
Params.Init()
assert.Equal(t, Params.InsertChannelPrefixName, "by-dev-insert-channel-")
t.Logf("data coord insert channel = %s", Params.InsertChannelPrefixName)
assert.Equal(t, Params.StatisticsChannelName, "by-dev-datacoord-statistics-channel")
t.Logf("data coord stats channel = %s", Params.StatisticsChannelName)
assert.Equal(t, Params.TimeTickChannelName, "by-dev-datacoord-timetick-channel")
t.Logf("data coord timetick channel = %s", Params.TimeTickChannelName)
assert.Equal(t, Params.SegmentInfoChannelName, "by-dev-segment-info-channel")
t.Logf("data coord segment info channel = %s", Params.SegmentInfoChannelName)
assert.Equal(t, Params.DataCoordSubscriptionName, "by-dev-dataCoord")
t.Logf("data coord subscription channel = %s", Params.DataCoordSubscriptionName)
}

View File

@ -1431,8 +1431,8 @@ func TestPostFlush(t *testing.T) {
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.TimeTickChannelName = strconv.Itoa(rand.Int())
Params.StatisticsChannelName = strconv.Itoa(rand.Int())
Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int())
Params.StatisticsChannelName = Params.StatisticsChannelName + strconv.Itoa(rand.Int())
var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{

View File

@ -17,6 +17,7 @@ import (
"math"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"testing"
@ -43,7 +44,9 @@ func TestMain(t *testing.M) {
rand.Seed(time.Now().Unix())
Params.InitAlias("datanode-alias-1")
Params.Init()
refreshChannelNames()
// change to specific channel for test
Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int())
Params.SegmentStatisticsChannelName = Params.SegmentStatisticsChannelName + strconv.Itoa(rand.Int())
code := t.Run()
os.Exit(code)
}

View File

@ -99,11 +99,6 @@ func makeNewChannelNames(names []string, suffix string) []string {
return ret
}
func refreshChannelNames() {
Params.SegmentStatisticsChannelName = "datanode-refresh-segment-statistics"
Params.TimeTickChannelName = "datanode-refresh-hard-timetick"
}
func clearEtcd(rootPath string) error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, rootPath)
if err != nil {

View File

@ -44,6 +44,9 @@ type ParamTable struct {
// --- Rocksmq ---
RocksmqPath string
// --- Cluster channels ---
ClusterChannelPrefix string
// - seg statistics channel -
SegmentStatisticsChannelName string
@ -75,47 +78,54 @@ func (p *ParamTable) InitAlias(alias string) {
p.Alias = alias
}
func (p *ParamTable) Init() {
func (p *ParamTable) InitOnce() {
once.Do(func() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/data_node.yaml")
if err != nil {
panic(err)
}
// === DataNode Internal Components Configs ===
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initFlushInsertBufferSize()
p.initInsertBinlogRootPath()
p.initStatsBinlogRootPath()
p.initLogCfg()
// === DataNode External Components Configs ===
// --- Pulsar ---
p.initPulsarAddress()
p.initRocksmqPath()
// - seg statistics channel -
p.initSegmentStatisticsChannelName()
// - timetick channel -
p.initTimeTickChannelName()
// --- ETCD ---
p.initEtcdEndpoints()
p.initMetaRootPath()
// --- MinIO ---
p.initMinioAddress()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSL()
p.initMinioBucketName()
p.Init()
})
}
func (p *ParamTable) Init() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/data_node.yaml")
if err != nil {
panic(err)
}
// === DataNode Internal Components Configs ===
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initFlushInsertBufferSize()
p.initInsertBinlogRootPath()
p.initStatsBinlogRootPath()
p.initLogCfg()
// === DataNode External Components Configs ===
// --- Pulsar ---
p.initPulsarAddress()
p.initRocksmqPath()
// Has to init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
// - seg statistics channel -
p.initSegmentStatisticsChannelName()
// - timetick channel -
p.initTimeTickChannelName()
// --- ETCD ---
p.initEtcdEndpoints()
p.initMetaRootPath()
// --- MinIO ---
p.initMinioAddress()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSL()
p.initMinioBucketName()
}
// ==== DataNode internal components configs ====
// ---- flowgraph configs ----
func (p *ParamTable) initFlowGraphMaxQueueLength() {
@ -165,30 +175,40 @@ func (p *ParamTable) initRocksmqPath() {
p.RocksmqPath = path
}
func (p *ParamTable) initSegmentStatisticsChannelName() {
path, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
func (p *ParamTable) initClusterMsgChannelPrefix() {
name, err := p.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
p.SegmentStatisticsChannelName = path
p.ClusterChannelPrefix = name
}
func (p *ParamTable) initSegmentStatisticsChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.SegmentStatisticsChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initTimeTickChannelName() {
path, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
if err != nil {
panic(err)
}
p.TimeTickChannelName = path
s := []string{p.ClusterChannelPrefix, config}
p.TimeTickChannelName = strings.Join(s, "-")
}
// - msg channel subname -
func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
config, err := p.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
if err != nil {
panic(err)
}
p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.NodeID, 10)
s := []string{p.ClusterChannelPrefix, config, strconv.FormatInt(p.NodeID, 10)}
p.MsgChannelSubName = strings.Join(s, "-")
}
// --- ETCD ---

View File

@ -15,10 +15,14 @@ import (
"log"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestParamTable_DataNode(t *testing.T) {
func TestParamTable(t *testing.T) {
Params.Init()
Params.NodeID = 2
Params.initMsgChannelSubName()
t.Run("Test NodeID", func(t *testing.T) {
id := Params.NodeID
log.Println("NodeID:", id)
@ -55,18 +59,27 @@ func TestParamTable_DataNode(t *testing.T) {
log.Println("PulsarAddress:", address)
})
t.Run("Test ClusterChannelPrefix", func(t *testing.T) {
path := Params.ClusterChannelPrefix
assert.Equal(t, path, "by-dev")
log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix)
})
t.Run("Test SegmentStatisticsChannelName", func(t *testing.T) {
path := Params.SegmentStatisticsChannelName
assert.Equal(t, path, "by-dev-datacoord-statistics-channel")
log.Println("SegmentStatisticsChannelName:", path)
})
t.Run("Test TimeTickChannelName", func(t *testing.T) {
name := Params.TimeTickChannelName
assert.Equal(t, name, "by-dev-datacoord-timetick-channel")
log.Println("TimeTickChannelName:", name)
})
t.Run("Test msgChannelSubName", func(t *testing.T) {
name := Params.MsgChannelSubName
assert.Equal(t, name, "by-dev-dataNode-2")
log.Println("MsgChannelSubName:", name)
})

View File

@ -69,12 +69,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory, opts ...datacoord
func (s *Server) init() error {
Params.Init()
Params.LoadFromEnv()
closer := trace.InitTracing("datacoord")
s.closer = closer
datacoord.Params.Init()
datacoord.Params.InitOnce()
datacoord.Params.IP = Params.IP
datacoord.Params.Port = Params.Port

View File

@ -48,6 +48,9 @@ func (pt *ParamTable) Init() {
pt.initDataCoordAddress()
pt.initPort()
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})

View File

@ -169,10 +169,8 @@ func (s *Server) Stop() error {
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
Params.LoadFromEnv()
Params.LoadFromArgs()
dn.Params.Init()
dn.Params.InitOnce()
dn.Params.Port = Params.Port
dn.Params.IP = Params.IP

View File

@ -65,7 +65,8 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
indexcoord.Params.Init()
indexcoord.Params.InitOnce()
indexcoord.Params.Address = Params.ServiceAddress
indexcoord.Params.Port = Params.ServicePort

View File

@ -46,6 +46,13 @@ func (pt *ParamTable) Init() {
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("IndexNode init", zap.Any("Port", pt.Port))
}
pt.LoadFromEnv()
pt.LoadFromArgs()
})
}

View File

@ -92,14 +92,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
func (s *Server) init() error {
var err error
Params.Init()
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("IndexNode init", zap.Any("Port", Params.Port))
}
Params.LoadFromEnv()
Params.LoadFromArgs()
indexnode.Params.Init()
indexnode.Params.InitOnce()
indexnode.Params.Port = Params.Port
indexnode.Params.IP = Params.IP
indexnode.Params.Address = Params.Address

View File

@ -47,6 +47,10 @@ func (pt *ParamTable) Init() {
pt.BaseTable.Init()
pt.initParams()
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})

View File

@ -132,11 +132,8 @@ func (s *Server) init() error {
Params.Port = funcutil.GetAvailablePort()
log.Warn("Proxy init", zap.Any("Port", Params.Port))
}
Params.LoadFromEnv()
Params.LoadFromArgs()
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
proxy.Params.Init()
proxy.Params.InitOnce()
log.Debug("init params done ...")
// NetworkPort & IP don't matter here, NetworkAddress matters

View File

@ -87,7 +87,8 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
qc.Params.Init()
qc.Params.InitOnce()
qc.Params.Port = Params.Port
closer := trace.InitTracing("querycoord")

View File

@ -51,6 +51,9 @@ func (pt *ParamTable) Init() {
pt.initDataCoordAddress()
pt.initQueryCoordAddress()
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})

View File

@ -75,10 +75,8 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
func (s *Server) init() error {
Params.Init()
Params.LoadFromEnv()
Params.LoadFromArgs()
qn.Params.Init()
qn.Params.InitOnce()
qn.Params.QueryNodeIP = Params.QueryNodeIP
qn.Params.QueryNodePort = int64(Params.QueryNodePort)
qn.Params.QueryNodeID = Params.QueryNodeID

View File

@ -128,7 +128,7 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
rootcoord.Params.Init()
rootcoord.Params.InitOnce()
rootcoord.Params.Address = Params.Address
rootcoord.Params.Port = Params.Port
log.Debug("grpc init done ...")

View File

@ -112,8 +112,8 @@ func (i *IndexCoord) Register() error {
func (i *IndexCoord) Init() error {
var initErr error = nil
Params.InitOnce()
i.initOnce.Do(func() {
Params.Init()
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
i.UpdateStateCode(internalpb.StateCode_Initializing)

View File

@ -44,17 +44,27 @@ var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
// TODO, load index_node.yaml
/*err := pt.LoadYaml("advanced/index_coord.yaml")
if err != nil {
panic(err)
}*/
pt.initLogCfg()
pt.initEtcdEndpoints()
pt.initMetaRootPath()
pt.initKvRootPath()
pt.initMinIOAddress()
pt.initMinIOAccessKeyID()
pt.initMinIOSecretAccessKey()
pt.initMinIOUseSSL()
pt.initMinioBucketName()
}
func (pt *ParamTable) InitOnce() {
once.Do(func() {
pt.BaseTable.Init()
pt.initLogCfg()
pt.initEtcdEndpoints()
pt.initMetaRootPath()
pt.initKvRootPath()
pt.initMinIOAddress()
pt.initMinIOAccessKeyID()
pt.initMinIOSecretAccessKey()
pt.initMinIOUseSSL()
pt.initMinioBucketName()
pt.Init()
})
}

View File

@ -112,7 +112,6 @@ func (i *IndexNode) initKnowhere() {
}
func (i *IndexNode) Init() error {
Params.Init()
i.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
connectEtcdFn := func() error {

View File

@ -60,14 +60,25 @@ func (pt *ParamTable) InitAlias(alias string) {
}
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
if err := pt.LoadYaml("advanced/knowhere.yaml"); err != nil {
panic(err)
}
// TODO, load index_node.yaml
/*err := pt.LoadYaml("advanced/index_node.yaml")
if err != nil {
panic(err)
}*/
pt.initLogCfg()
pt.initParams()
pt.initKnowhereSimdType()
}
func (pt *ParamTable) InitOnce() {
once.Do(func() {
pt.BaseTable.Init()
if err := pt.LoadYaml("advanced/knowhere.yaml"); err != nil {
panic(err)
}
pt.initLogCfg()
pt.initParams()
pt.initKnowhereSimdType()
pt.Init()
})
}

View File

@ -47,19 +47,24 @@ type ParamTable struct {
RocksmqPath string // not used in Proxy
ProxyID UniqueID
TimeTickInterval time.Duration
ProxyID UniqueID
TimeTickInterval time.Duration
MsgStreamTimeTickBufSize int64
MaxNameLength int64
MaxFieldNum int64
MaxShardNum int32
MaxDimension int64
DefaultPartitionName string
DefaultIndexName string
// --- Channels ---
ClusterChannelPrefix string
ProxyTimeTickChannelNames []string
ProxySubName string
// required from query coord
SearchResultChannelNames []string
RetrieveResultChannelNames []string
ProxySubName string
ProxyTimeTickChannelNames []string
MsgStreamTimeTickBufSize int64
MaxNameLength int64
MaxFieldNum int64
MaxShardNum int32
MaxDimension int64
DefaultPartitionName string
DefaultIndexName string
MaxTaskNum int64
@ -71,24 +76,27 @@ type ParamTable struct {
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
func (pt *ParamTable) InitOnce() {
once.Do(func() {
pt.BaseTable.Init()
err := pt.LoadYaml("advanced/proxy.yaml")
if err != nil {
panic(err)
}
pt.initParams()
pt.Init()
})
}
func (pt *ParamTable) initParams() {
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
err := pt.LoadYaml("advanced/proxy.yaml")
if err != nil {
panic(err)
}
pt.initLogCfg()
pt.initEtcdEndpoints()
pt.initMetaRootPath()
pt.initPulsarAddress()
pt.initRocksmqPath()
pt.initTimeTickInterval()
// Has to init global msgchannel prefix before other channel names
pt.initClusterMsgChannelPrefix()
pt.initProxySubName()
pt.initProxyTimeTickChannelNames()
pt.initMsgStreamTimeTickBufSize()
@ -137,20 +145,30 @@ func (pt *ParamTable) initTimeTickInterval() {
pt.TimeTickInterval = time.Duration(interval) * time.Millisecond
}
func (pt *ParamTable) initProxySubName() {
prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
func (pt *ParamTable) initClusterMsgChannelPrefix() {
config, err := pt.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
pt.ProxySubName = prefix + "-" + strconv.FormatInt(pt.ProxyID, 10)
pt.ClusterChannelPrefix = config
}
func (pt *ParamTable) initProxySubName() {
config, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
if err != nil {
panic(err)
}
s := []string{pt.ClusterChannelPrefix, config, strconv.FormatInt(pt.ProxyID, 10)}
pt.ProxySubName = strings.Join(s, "-")
}
func (pt *ParamTable) initProxyTimeTickChannelNames() {
prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
config, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
if err != nil {
panic(err)
}
prefix += "-0"
s := []string{pt.ClusterChannelPrefix, config, "0"}
prefix := strings.Join(s, "-")
pt.ProxyTimeTickChannelNames = []string{prefix}
}

View File

@ -13,6 +13,8 @@ package proxy
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_Normal(t *testing.T) {
@ -39,10 +41,12 @@ func TestParamTable_Normal(t *testing.T) {
})
t.Run("ProxySubName", func(t *testing.T) {
assert.Equal(t, Params.ProxySubName, "by-dev-proxy-0")
t.Logf("ProxySubName: %s", Params.ProxySubName)
})
t.Run("ProxyTimeTickChannelNames", func(t *testing.T) {
assert.Equal(t, Params.ProxyTimeTickChannelNames, []string{"by-dev-proxyTimeTick-0"})
t.Logf("ProxyTimeTickChannelNames: %v", Params.ProxyTimeTickChannelNames)
})

View File

@ -151,6 +151,8 @@ func (node *Proxy) Init() error {
}
log.Debug("Proxy CreateQueryChannel success")
// TODO SearchResultChannelNames and RetrieveResultChannelNames should not be part in the Param table
// we should maintain a separate map for search result
Params.SearchResultChannelNames = []string{resp.ResultChannel}
Params.RetrieveResultChannelNames = []string{resp.ResultChannel}
log.Debug("Proxy CreateQueryChannel success", zap.Any("SearchResultChannelNames", Params.SearchResultChannelNames))

View File

@ -43,7 +43,8 @@ type ParamTable struct {
Log log.Config
RoleName string
// search
// channels
ClusterChannelPrefix string
SearchChannelPrefix string
SearchResultChannelPrefix string
@ -63,41 +64,47 @@ type ParamTable struct {
var Params ParamTable
var once sync.Once
func (p *ParamTable) Init() {
func (p *ParamTable) InitOnce() {
once.Do(func() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/query_node.yaml")
if err != nil {
panic(err)
}
err = p.LoadYaml("milvus.yaml")
if err != nil {
panic(err)
}
p.initLogCfg()
p.initStatsChannelName()
p.initTimeTickChannelName()
p.initQueryCoordAddress()
p.initRoleName()
p.initSearchChannelPrefix()
p.initSearchResultChannelPrefix()
// --- ETCD ---
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
//--- Minio ----
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.Init()
})
}
func (p *ParamTable) Init() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/query_node.yaml")
if err != nil {
panic(err)
}
err = p.LoadYaml("milvus.yaml")
if err != nil {
panic(err)
}
p.initLogCfg()
p.initQueryCoordAddress()
p.initRoleName()
// --- Channels ---
p.initClusterMsgChannelPrefix()
p.initSearchChannelPrefix()
p.initSearchResultChannelPrefix()
p.initStatsChannelName()
p.initTimeTickChannelName()
// --- ETCD ---
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
//--- Minio ----
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
}
func (p *ParamTable) initLogCfg() {
p.Log = log.Config{}
@ -125,23 +132,6 @@ func (p *ParamTable) initLogCfg() {
}
}
func (p *ParamTable) initStatsChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
p.StatsChannelName = channels
}
func (p *ParamTable) initTimeTickChannelName() {
timeTickChannelName, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
if err != nil {
panic(err)
}
p.TimeTickChannelName = timeTickChannelName
}
func (p *ParamTable) initQueryCoordAddress() {
url, err := p.Load("_QueryCoordAddress")
if err != nil {
@ -154,22 +144,49 @@ func (p *ParamTable) initRoleName() {
p.RoleName = fmt.Sprintf("%s-%d", "QueryCoord", p.NodeID)
}
func (p *ParamTable) initClusterMsgChannelPrefix() {
config, err := p.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
p.ClusterChannelPrefix = config
}
func (p *ParamTable) initSearchChannelPrefix() {
channelName, err := p.Load("msgChannel.chanNamePrefix.search")
config, err := p.Load("msgChannel.chanNamePrefix.search")
if err != nil {
log.Error(err.Error())
}
p.SearchChannelPrefix = channelName
s := []string{p.ClusterChannelPrefix, config}
p.SearchChannelPrefix = strings.Join(s, "-")
}
func (p *ParamTable) initSearchResultChannelPrefix() {
channelName, err := p.Load("msgChannel.chanNamePrefix.searchResult")
config, err := p.Load("msgChannel.chanNamePrefix.searchResult")
if err != nil {
log.Error(err.Error())
}
s := []string{p.ClusterChannelPrefix, config}
p.SearchResultChannelPrefix = strings.Join(s, "-")
}
p.SearchResultChannelPrefix = channelName
func (p *ParamTable) initStatsChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.StatsChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initTimeTickChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.TimeTickChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initEtcdEndpoints() {

View File

@ -0,0 +1,35 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querycoord
import (
"testing"
"github.com/stretchr/testify/assert"
)
//TODO add more test for other parameters
func TestParamTable(t *testing.T) {
Params.Init()
assert.Equal(t, Params.SearchChannelPrefix, "by-dev-search")
t.Logf("query coord search channel = %s", Params.SearchChannelPrefix)
assert.Equal(t, Params.SearchResultChannelPrefix, "by-dev-searchResult")
t.Logf("query coord search result channel = %s", Params.SearchResultChannelPrefix)
assert.Equal(t, Params.StatsChannelName, "by-dev-query-node-stats")
t.Logf("query coord stats channel = %s", Params.StatsChannelName)
assert.Equal(t, Params.TimeTickChannelName, "by-dev-queryTimeTick")
t.Logf("query coord time tick channel = %s", Params.TimeTickChannelName)
}

View File

@ -46,7 +46,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
SearchPulsarBufSize: Params.SearchPulsarBufSize,
SearchResultReceiveBufSize: Params.SearchResultReceiveBufSize,
RetrieveReceiveBufSize: Params.RetrieveReceiveBufSize,
RetrievePulsarBufSize: Params.retrievePulsarBufSize,
RetrievePulsarBufSize: Params.RetrievePulsarBufSize,
RetrieveResultReceiveBufSize: Params.RetrieveResultReceiveBufSize,
},
}

View File

@ -30,11 +30,16 @@ type ParamTable struct {
EtcdEndpoints []string
MetaRootPath string
Alias string
QueryNodeIP string
QueryNodePort int64
QueryNodeID UniqueID
Alias string
QueryNodeIP string
QueryNodePort int64
QueryNodeID UniqueID
// channel prefix
ClusterChannelPrefix string
QueryTimeTickChannelName string
StatsChannelName string
MsgChannelSubName string
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
@ -57,16 +62,14 @@ type ParamTable struct {
RetrieveChannelNames []string
RetrieveResultChannelNames []string
RetrieveReceiveBufSize int64
retrievePulsarBufSize int64
RetrievePulsarBufSize int64
RetrieveResultReceiveBufSize int64
// stats
StatsPublishInterval int
StatsChannelName string
GracefulTime int64
MsgChannelSubName string
SliceIndex int
GracefulTime int64
SliceIndex int
// segcore
ChunkRows int64
@ -82,55 +85,53 @@ func (p *ParamTable) InitAlias(alias string) {
p.Alias = alias
}
func (p *ParamTable) Init() {
func (p *ParamTable) InitOnce() {
once.Do(func() {
p.BaseTable.Init()
if err := p.LoadYaml("advanced/query_node.yaml"); err != nil {
panic(err)
}
if err := p.LoadYaml("advanced/knowhere.yaml"); err != nil {
panic(err)
}
//p.initQueryTimeTickChannelName()
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.initPulsarAddress()
p.initRocksmqPath()
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initGracefulTime()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initSearchReceiveBufSize()
p.initSearchPulsarBufSize()
p.initSearchResultReceiveBufSize()
p.initStatsPublishInterval()
p.initStatsChannelName()
p.initSegcoreChunkRows()
p.initKnowhereSimdType()
p.initLogCfg()
p.Init()
})
}
// ---------------------------------------------------------- query node
func (p *ParamTable) initQueryTimeTickChannelName() {
ch, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
if err != nil {
log.Warn(err.Error())
func (p *ParamTable) Init() {
p.BaseTable.Init()
if err := p.LoadYaml("advanced/query_node.yaml"); err != nil {
panic(err)
}
p.QueryTimeTickChannelName = ch
if err := p.LoadYaml("advanced/knowhere.yaml"); err != nil {
panic(err)
}
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.initPulsarAddress()
p.initRocksmqPath()
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initGracefulTime()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initSearchReceiveBufSize()
p.initSearchPulsarBufSize()
p.initSearchResultReceiveBufSize()
// Has to init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initQueryTimeTickChannelName()
p.initStatsChannelName()
p.initMsgChannelSubName()
p.initStatsPublishInterval()
p.initSegcoreChunkRows()
p.initKnowhereSimdType()
p.initLogCfg()
}
// ---------------------------------------------------------- minio
@ -222,6 +223,44 @@ func (p *ParamTable) initSearchResultReceiveBufSize() {
p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize")
}
// ------------------------ channel names
func (p *ParamTable) initClusterMsgChannelPrefix() {
name, err := p.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
p.ClusterChannelPrefix = name
}
func (p *ParamTable) initQueryTimeTickChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
if err != nil {
log.Warn(err.Error())
}
s := []string{p.ClusterChannelPrefix, config}
p.QueryTimeTickChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initMsgChannelSubName() {
namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
if err != nil {
log.Warn(err.Error())
}
s := []string{p.ClusterChannelPrefix, namePrefix, strconv.FormatInt(p.QueryNodeID, 10)}
p.MsgChannelSubName = strings.Join(s, "-")
}
func (p *ParamTable) initStatsChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.StatsChannelName = strings.Join(s, "-")
}
// ETCD configs
func (p *ParamTable) initEtcdEndpoints() {
endpoints, err := p.Load("_EtcdEndpoints")
if err != nil {
@ -246,23 +285,6 @@ func (p *ParamTable) initGracefulTime() {
p.GracefulTime = p.ParseInt64("queryNode.gracefulTime")
}
func (p *ParamTable) initMsgChannelSubName() {
namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
if err != nil {
log.Warn(err.Error())
}
subName := namePrefix + "-" + strconv.FormatInt(p.QueryNodeID, 10)
p.MsgChannelSubName = subName
}
func (p *ParamTable) initStatsChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
p.StatsChannelName = channels
}
func (p *ParamTable) initSegcoreChunkRows() {
p.ChunkRows = p.ParseInt64("queryNode.segcore.chunkRows")
}

View File

@ -26,13 +26,6 @@ func TestParamTable_PulsarAddress(t *testing.T) {
assert.Equal(t, "6650", split[len(split)-1])
}
func TestParamTable_QueryNode(t *testing.T) {
t.Run("Test time tick channel", func(t *testing.T) {
ch := Params.QueryTimeTickChannelName
assert.Equal(t, ch, "queryTimeTick")
})
}
func TestParamTable_minio(t *testing.T) {
t.Run("Test endPoint", func(t *testing.T) {
endPoint := Params.MinioEndPoint
@ -87,16 +80,22 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
}
func TestParamTable_msgChannelSubName(t *testing.T) {
Params.QueryNodeID = 3
Params.initMsgChannelSubName()
name := Params.MsgChannelSubName
expectName := "queryNode-0"
assert.Equal(t, expectName, name)
assert.Equal(t, name, "by-dev-queryNode-3")
}
func TestParamTable_statsChannelName(t *testing.T) {
Params.Init()
name := Params.StatsChannelName
contains := strings.Contains(name, "query-node-stats")
assert.Equal(t, contains, true)
assert.Equal(t, name, "by-dev-query-node-stats")
}
func TestParamTable_QueryTimeTickChannel(t *testing.T) {
Params.Init()
ch := Params.QueryTimeTickChannelName
assert.Equal(t, ch, "by-dev-queryTimeTick")
}
func TestParamTable_metaRootPath(t *testing.T) {

View File

@ -38,9 +38,6 @@ type queryCoordMock struct {
func setup() {
os.Setenv("QUERY_NODE_ID", "1")
Params.Init()
//Params.QueryNodeID = 1
Params.initQueryTimeTickChannelName()
Params.initStatsChannelName()
Params.MetaRootPath = "/etcd/test/root/querynode"
}
@ -209,11 +206,6 @@ func makeNewChannelNames(names []string, suffix string) []string {
return ret
}
func refreshChannelNames() {
suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10)
Params.StatsChannelName = Params.StatsChannelName + suffix
}
func newMessageStreamFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024
@ -229,7 +221,7 @@ func newMessageStreamFactory() (msgstream.Factory, error) {
func TestMain(m *testing.M) {
setup()
refreshChannelNames()
Params.StatsChannelName = Params.StatsChannelName + strconv.Itoa(rand.Int())
exitCode := m.Run()
os.Exit(exitCode)
}

View File

@ -30,14 +30,16 @@ type ParamTable struct {
Address string
Port int
PulsarAddress string
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
MsgChannelSubName string
TimeTickChannel string
StatisticsChannel string
DmlChannelName string
PulsarAddress string
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
ClusterChannelPrefix string
MsgChannelSubName string
TimeTickChannel string
StatisticsChannel string
DmlChannelName string
DmlChannelNum int64
MaxPartitionNum int64
@ -56,39 +58,45 @@ type ParamTable struct {
RoleName string
}
func (p *ParamTable) Init() {
func (p *ParamTable) InitOnce() {
once.Do(func() {
// load yaml
p.BaseTable.Init()
err := p.LoadYaml("advanced/root_coord.yaml")
if err != nil {
panic(err)
}
p.initPulsarAddress()
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
p.initMsgChannelSubName()
p.initTimeTickChannel()
p.initStatisticsChannelName()
p.initDmlChannelName()
p.initDmlChannelNum()
p.initMaxPartitionNum()
p.initMinSegmentSizeToEnableIndex()
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initTimeout()
p.initTimeTickInterval()
p.initLogCfg()
p.initRoleName()
p.Init()
})
}
func (p *ParamTable) Init() {
// load yaml
p.BaseTable.Init()
err := p.LoadYaml("advanced/root_coord.yaml")
if err != nil {
panic(err)
}
p.initPulsarAddress()
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
// Has to init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initMsgChannelSubName()
p.initTimeTickChannel()
p.initStatisticsChannelName()
p.initDmlChannelName()
p.initDmlChannelNum()
p.initMaxPartitionNum()
p.initMinSegmentSizeToEnableIndex()
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initTimeout()
p.initTimeTickInterval()
p.initLogCfg()
p.initRoleName()
}
func (p *ParamTable) initPulsarAddress() {
addr, err := p.Load("_PulsarAddress")
if err != nil {
@ -129,36 +137,48 @@ func (p *ParamTable) initKvRootPath() {
p.KvRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.rootCoordSubNamePrefix")
func (p *ParamTable) initClusterMsgChannelPrefix() {
config, err := p.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
p.MsgChannelSubName = name
p.ClusterChannelPrefix = config
}
func (p *ParamTable) initMsgChannelSubName() {
config, err := p.Load("msgChannel.subNamePrefix.rootCoordSubNamePrefix")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.MsgChannelSubName = strings.Join(s, "-")
}
func (p *ParamTable) initTimeTickChannel() {
channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordTimeTick")
config, err := p.Load("msgChannel.chanNamePrefix.rootCoordTimeTick")
if err != nil {
panic(err)
}
p.TimeTickChannel = channel
s := []string{p.ClusterChannelPrefix, config}
p.TimeTickChannel = strings.Join(s, "-")
}
func (p *ParamTable) initStatisticsChannelName() {
channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordStatistics")
config, err := p.Load("msgChannel.chanNamePrefix.rootCoordStatistics")
if err != nil {
panic(err)
}
p.StatisticsChannel = channel
s := []string{p.ClusterChannelPrefix, config}
p.StatisticsChannel = strings.Join(s, "-")
}
func (p *ParamTable) initDmlChannelName() {
channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml")
config, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml")
if err != nil {
panic(err)
}
p.DmlChannelName = channel
s := []string{p.ClusterChannelPrefix, config}
p.DmlChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initDmlChannelNum() {

View File

@ -33,15 +33,18 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.KvRootPath, "")
t.Logf("kv root path = %s", Params.KvRootPath)
assert.NotEqual(t, Params.MsgChannelSubName, "")
assert.Equal(t, Params.MsgChannelSubName, "by-dev-rootCoord")
t.Logf("msg channel sub name = %s", Params.MsgChannelSubName)
assert.NotEqual(t, Params.TimeTickChannel, "")
assert.Equal(t, Params.TimeTickChannel, "by-dev-rootcoord-timetick")
t.Logf("master time tick channel = %s", Params.TimeTickChannel)
assert.NotEqual(t, Params.StatisticsChannel, "")
assert.Equal(t, Params.StatisticsChannel, "by-dev-rootcoord-statistics")
t.Logf("master statistics channel = %s", Params.StatisticsChannel)
assert.Equal(t, Params.DmlChannelName, "by-dev-rootcoord-dml")
t.Logf("dml channel = %s", Params.DmlChannelName)
assert.NotEqual(t, Params.MaxPartitionNum, 0)
t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum)