Move ProxySubName/QueryNodeSubName/DataNodeSubName to msgChannelConfig (#15501)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/15506/head
Cai Yudong 2022-02-09 14:41:45 +08:00 committed by GitHub
parent 307a8ce535
commit 8255b713ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 52 additions and 114 deletions

View File

@ -279,7 +279,7 @@ func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) {
}
func subscriptionGenerator(collectionID int64, nodeID int64) string {
return fmt.Sprintf("%s-%d-%d", Params.DataNodeCfg.DataNodeSubName, nodeID, collectionID)
return fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, nodeID, collectionID)
}
func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error {

View File

@ -213,7 +213,6 @@ func (node *DataNode) Init() error {
log.Error("DataNode init session failed", zap.Error(err))
return err
}
Params.DataNodeCfg.Refresh()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarCfg.Address,
@ -227,7 +226,7 @@ func (node *DataNode) Init() error {
return err
}
log.Debug("DataNode Init",
zap.String("MsgChannelSubName", Params.DataNodeCfg.DataNodeSubName))
zap.String("MsgChannelSubName", Params.MsgChannelCfg.DataNodeSubName))
return nil
}

View File

@ -34,7 +34,7 @@ import (
func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
// subName should be unique, since pchannelName is shared among several collections
// consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
consumeSubName := fmt.Sprintf("%s-%d", Params.DataNodeCfg.DataNodeSubName, dmNodeConfig.collectionID)
consumeSubName := fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, Params.DataNodeCfg.NodeID, dmNodeConfig.collectionID)
insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx)
if err != nil {
return nil, err

View File

@ -154,10 +154,6 @@ func (node *Proxy) Init() error {
}
log.Debug("init session for Proxy done")
log.Debug("refresh configuration of Proxy")
Params.ProxyCfg.Refresh()
log.Debug("refresh configuration of Proxy done")
if node.queryCoord != nil {
log.Debug("create query channel for Proxy")
resp, err := node.queryCoord.CreateQueryChannel(node.ctx, &querypb.CreateQueryChannelRequest{})

View File

@ -362,7 +362,6 @@ func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup) {
p.InitOnce(typeutil.ProxyRole)
Params.InitOnce()
Params.ProxyCfg.NetworkAddress = p.GetAddress()
Params.ProxyCfg.Refresh()
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection

View File

@ -817,9 +817,10 @@ func (sched *taskScheduler) collectResultLoop() {
queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx)
// proxy didn't need to walk through all the search results in channel, because it no longer has client connections.
queryResultMsgStream.AsConsumerWithPosition(Params.ProxyCfg.SearchResultChannelNames, Params.ProxyCfg.ProxySubName, mqclient.SubscriptionPositionLatest)
consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelCfg.ProxySubName, Params.ProxyCfg.ProxyID)
queryResultMsgStream.AsConsumerWithPosition(Params.ProxyCfg.SearchResultChannelNames, consumeSubName, mqclient.SubscriptionPositionLatest)
log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.ProxyCfg.SearchResultChannelNames),
zap.Any("ProxySubName", Params.ProxyCfg.ProxySubName))
zap.Any("consumeSubName", consumeSubName))
queryResultMsgStream.Start()
defer queryResultMsgStream.Close()

View File

@ -132,7 +132,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
nodeID := channelInfo.NodeID
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, nodeID)
subName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))

View File

@ -259,7 +259,6 @@ func (node *QueryNode) Init() error {
initError = err
return
}
Params.QueryNodeCfg.Refresh()
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath)
log.Debug("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))

View File

@ -157,7 +157,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
return err
}
consumeChannels := []string{r.req.QueryChannel}
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 {
@ -301,7 +301,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
}()
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels by to seeking or consuming
channel2SeekPosition := make(map[string]*internalpb.MsgPosition)
@ -525,7 +525,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
}
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// channels as consumer
for _, channel := range vDeltaChannels {
fg := channel2FlowGraph[channel]

View File

@ -29,7 +29,7 @@ const (
DefaultRetentionDuration = 3600 * 24 * 5
)
// ComponentParam is used to quickly and easily access all system configuration.
// ComponentParam is used to quickly and easily access all components' configurations.
type ComponentParam struct {
ServiceParam
once sync.Once
@ -134,6 +134,8 @@ type msgChannelConfig struct {
ClusterPrefix string
ProxySubName string
RootCoordTimeTick string
RootCoordStatistics string
RootCoordDml string
@ -144,11 +146,13 @@ type msgChannelConfig struct {
QueryCoordSearchResult string
QueryCoordTimeTick string
QueryNodeStats string
QueryNodeSubName string
DataCoordStatistic string
DataCoordTimeTick string
DataCoordSegmentInfo string
DataCoordSubName string
DataNodeSubName string
}
func (p *msgChannelConfig) init(base *BaseTable) {
@ -157,6 +161,8 @@ func (p *msgChannelConfig) init(base *BaseTable) {
// must init cluster prefix first
p.initClusterPrefix()
p.initProxySubName()
p.initRootCoordTimeTick()
p.initRootCoordStatistics()
p.initRootCoordDml()
@ -167,11 +173,13 @@ func (p *msgChannelConfig) init(base *BaseTable) {
p.initQueryCoordSearchResult()
p.initQueryCoordTimeTick()
p.initQueryNodeStats()
p.initQueryNodeSubName()
p.initDataCoordStatistic()
p.initDataCoordTimeTick()
p.initDataCoordSegmentInfo()
p.initDataCoordSubName()
p.initDataNodeSubName()
}
func (p *msgChannelConfig) initClusterPrefix() {
@ -191,6 +199,11 @@ func (p *msgChannelConfig) initChanNamePrefix(cfg string) string {
return strings.Join(s, "-")
}
// --- proxy ---
func (p *msgChannelConfig) initProxySubName() {
p.ProxySubName = p.initChanNamePrefix("msgChannel.subNamePrefix.proxySubNamePrefix")
}
// --- rootcoord ---
func (p *msgChannelConfig) initRootCoordTimeTick() {
p.RootCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordTimeTick")
@ -230,6 +243,10 @@ func (p *msgChannelConfig) initQueryNodeStats() {
p.QueryNodeStats = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryNodeStats")
}
func (p *msgChannelConfig) initQueryNodeSubName() {
p.QueryNodeSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
}
// --- datacoord ---
func (p *msgChannelConfig) initDataCoordStatistic() {
p.DataCoordStatistic = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordStatistic")
@ -247,6 +264,10 @@ func (p *msgChannelConfig) initDataCoordSubName() {
p.DataCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataCoordSubNamePrefix")
}
func (p *msgChannelConfig) initDataNodeSubName() {
p.DataNodeSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
}
///////////////////////////////////////////////////////////////////////////////
// --- rootcoord ---
type rootCoordConfig struct {
@ -305,9 +326,6 @@ type proxyConfig struct {
BufFlagExpireTime time.Duration
BufFlagCleanupInterval time.Duration
// --- Channels ---
ProxySubName string
// required from QueryCoord
SearchResultChannelNames []string
RetrieveResultChannelNames []string
@ -323,8 +341,6 @@ func (p *proxyConfig) init(base *BaseTable) {
p.initTimeTickInterval()
p.initProxySubName()
p.initMsgStreamTimeTickBufSize()
p.initMaxNameLength()
p.initMaxFieldNum()
@ -336,11 +352,6 @@ func (p *proxyConfig) init(base *BaseTable) {
p.initBufFlagCleanupInterval()
}
// Refresh is called after session init
func (p *proxyConfig) Refresh() {
p.initProxySubName()
}
// InitAlias initialize Alias member.
func (p *proxyConfig) InitAlias(alias string) {
p.Alias = alias
@ -351,19 +362,6 @@ func (p *proxyConfig) initTimeTickInterval() {
p.TimeTickInterval = time.Duration(interval) * time.Millisecond
}
func (p *proxyConfig) initProxySubName() {
cluster, err := p.Base.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
subname, err := p.Base.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
if err != nil {
panic(err)
}
s := []string{cluster, subname, strconv.FormatInt(p.ProxyID, 10)}
p.ProxySubName = strings.Join(s, "-")
}
func (p *proxyConfig) initMsgStreamTimeTickBufSize() {
p.MsgStreamTimeTickBufSize = p.Base.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512)
}
@ -514,9 +512,6 @@ type queryNodeConfig struct {
// TODO: remove cacheSize
CacheSize int64 // deprecated
// channel prefix
QueryNodeSubName string
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
@ -563,8 +558,6 @@ func (p *queryNodeConfig) init(base *BaseTable) {
p.initSearchPulsarBufSize()
p.initSearchResultReceiveBufSize()
p.initQueryNodeSubName()
p.initStatsPublishInterval()
p.initSegcoreChunkRows()
@ -577,11 +570,6 @@ func (p *queryNodeConfig) InitAlias(alias string) {
p.Alias = alias
}
// Refresh is called after session init
func (p *queryNodeConfig) Refresh() {
p.initQueryNodeSubName()
}
func (p *queryNodeConfig) initCacheSize() {
defer log.Debug("init cacheSize", zap.Any("cacheSize (GB)", p.CacheSize))
@ -631,21 +619,6 @@ func (p *queryNodeConfig) initSearchResultReceiveBufSize() {
p.SearchResultReceiveBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.searchResult.recvBufSize", 64)
}
// ------------------------ channel names
func (p *queryNodeConfig) initQueryNodeSubName() {
cluster, err := p.Base.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
subname, err := p.Base.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
if err != nil {
log.Warn(err.Error())
}
s := []string{cluster, subname}
p.QueryNodeSubName = strings.Join(s, "-")
}
func (p *queryNodeConfig) initGracefulTime() {
p.GracefulTime = p.Base.ParseInt64("queryNode.gracefulTime")
log.Debug("query node init gracefulTime", zap.Any("gracefulTime", p.GracefulTime))
@ -792,9 +765,6 @@ type dataNodeConfig struct {
DeleteBinlogRootPath string
Alias string // Different datanode in one machine
// Channel subscribition name -
DataNodeSubName string
// etcd
ChannelWatchSubPath string
@ -812,15 +782,9 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.initStatsBinlogRootPath()
p.initDeleteBinlogRootPath()
p.initDataNodeSubName()
p.initChannelWatchPath()
}
// Refresh is called after session init
func (p *dataNodeConfig) Refresh() {
p.initDataNodeSubName()
}
// InitAlias init this DataNode alias
func (p *dataNodeConfig) InitAlias(alias string) {
p.Alias = alias
@ -863,19 +827,6 @@ func (p *dataNodeConfig) initDeleteBinlogRootPath() {
p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log")
}
func (p *dataNodeConfig) initDataNodeSubName() {
cluster, err := p.Base.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
panic(err)
}
subname, err := p.Base.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
if err != nil {
panic(err)
}
s := []string{cluster, subname, strconv.FormatInt(p.NodeID, 10)}
p.DataNodeSubName = strings.Join(s, "-")
}
func (p *dataNodeConfig) initChannelWatchPath() {
p.ChannelWatchSubPath = "channelwatch"
}

View File

@ -12,7 +12,6 @@
package paramtable
import (
"log"
"os"
"path"
"testing"
@ -50,9 +49,13 @@ func TestComponentParam(t *testing.T) {
t.Logf("knowhere simd type = %s", Params.SimdType)
})
t.Run("test knowhereConfig", func(t *testing.T) {
t.Run("test msgChannelConfig", func(t *testing.T) {
Params := CParams.MsgChannelCfg
// -- proxy --
assert.Equal(t, Params.ProxySubName, "by-dev-proxy")
t.Logf("ProxySubName: %s", Params.ProxySubName)
// -- rootcoord --
assert.Equal(t, Params.RootCoordTimeTick, "by-dev-rootcoord-timetick")
t.Logf("rootcoord timetick channel = %s", Params.RootCoordTimeTick)
@ -83,6 +86,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, Params.QueryNodeStats, "by-dev-query-node-stats")
t.Logf("querynode stats channel = %s", Params.QueryNodeStats)
assert.Equal(t, Params.QueryNodeSubName, "by-dev-queryNode")
t.Logf("querynode subname = %s", Params.QueryNodeSubName)
// -- datacoord --
assert.Equal(t, Params.DataCoordTimeTick, "by-dev-datacoord-timetick-channel")
t.Logf("datacoord timetick channel = %s", Params.DataCoordTimeTick)
@ -92,6 +98,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, Params.DataCoordSubName, "by-dev-dataCoord")
t.Logf("datacoord subname = %s", Params.DataCoordSubName)
assert.Equal(t, Params.DataNodeSubName, "by-dev-dataNode")
t.Logf("datanode subname = %s", Params.DataNodeSubName)
})
t.Run("test rootCoordConfig", func(t *testing.T) {
@ -114,9 +123,6 @@ func TestComponentParam(t *testing.T) {
t.Logf("TimeTickInterval: %v", Params.TimeTickInterval)
assert.Equal(t, Params.ProxySubName, "by-dev-proxy-0")
t.Logf("ProxySubName: %s", Params.ProxySubName)
t.Logf("MsgStreamTimeTickBufSize: %d", Params.MsgStreamTimeTickBufSize)
t.Logf("MaxNameLength: %d", Params.MaxNameLength)
@ -204,11 +210,6 @@ func TestComponentParam(t *testing.T) {
maxParallelism := Params.FlowGraphMaxParallelism
assert.Equal(t, int32(1024), maxParallelism)
Params.QueryNodeID = 3
Params.initQueryNodeSubName()
name := Params.QueryNodeSubName
assert.Equal(t, name, "by-dev-queryNode")
})
t.Run("test dataCoordConfig", func(t *testing.T) {
@ -219,35 +220,30 @@ func TestComponentParam(t *testing.T) {
Params := CParams.DataNodeCfg
Params.NodeID = 2
Params.Refresh()
id := Params.NodeID
log.Println("NodeID:", id)
t.Logf("NodeID: %d", id)
alias := Params.Alias
log.Println("Alias:", alias)
t.Logf("Alias: %s", alias)
length := Params.FlowGraphMaxQueueLength
log.Println("flowGraphMaxQueueLength:", length)
t.Logf("flowGraphMaxQueueLength: %d", length)
maxParallelism := Params.FlowGraphMaxParallelism
log.Println("flowGraphMaxParallelism:", maxParallelism)
t.Logf("flowGraphMaxParallelism: %d", maxParallelism)
size := Params.FlushInsertBufferSize
log.Println("FlushInsertBufferSize:", size)
t.Logf("FlushInsertBufferSize: %d", size)
path1 := Params.InsertBinlogRootPath
log.Println("InsertBinlogRootPath:", path1)
name := Params.DataNodeSubName
assert.Equal(t, name, "by-dev-dataNode-2")
log.Println("DataNodeSubName:", name)
t.Logf("InsertBinlogRootPath: %s", path1)
Params.CreatedTime = time.Now()
log.Println("CreatedTime: ", Params.CreatedTime)
t.Logf("CreatedTime: %v", Params.CreatedTime)
Params.UpdatedTime = time.Now()
log.Println("UpdatedTime: ", Params.UpdatedTime)
t.Logf("UpdatedTime: %v", Params.UpdatedTime)
assert.Equal(t, path.Join("files", "insert_log"), Params.InsertBinlogRootPath)

View File

@ -30,8 +30,7 @@ const (
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
)
// BaseParamTable is a derived struct of BaseTable. It achieves Composition by
// embedding BaseTable. It is used to quickly and easily access the system configuration.
// ServiceParam is used to quickly and easily access all basic service configurations.
type ServiceParam struct {
BaseTable
@ -41,8 +40,6 @@ type ServiceParam struct {
MinioCfg MinioConfig
}
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (p *ServiceParam) Init() {
p.BaseTable.Init()