From 71e814f796b0a3b8eb05486c898b0035388e5fa4 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Fri, 12 Nov 2021 21:25:08 +0800 Subject: [PATCH] Remove common.yaml and component.yaml (#11661) Signed-off-by: xiaofan-luan --- internal/datacoord/param_table.go | 5 +- internal/datacoord/server.go | 2 +- internal/distributed/datacoord/param_table.go | 45 ++++----- .../distributed/datacoord/param_table_test.go | 3 - internal/distributed/datacoord/service.go | 1 + .../distributed/datanode/client/client.go | 1 + internal/distributed/datanode/param_table.go | 58 +++++------- .../distributed/datanode/param_table_test.go | 6 -- internal/distributed/datanode/service.go | 2 - .../distributed/indexcoord/client/client.go | 1 + .../distributed/indexcoord/param_table.go | 37 +++++--- internal/distributed/indexcoord/service.go | 8 +- .../distributed/indexnode/client/client.go | 1 + internal/distributed/indexnode/param_table.go | 30 ++---- internal/distributed/proxy/param_table.go | 57 +----------- internal/distributed/proxy/service.go | 14 --- .../distributed/querycoord/client/client.go | 1 + .../distributed/querycoord/param_table.go | 59 +++++------- .../querycoord/param_table_test.go | 6 -- internal/distributed/querycoord/service.go | 6 +- internal/distributed/querynode/param_table.go | 74 +++++---------- .../distributed/querynode/param_table_test.go | 14 +-- internal/distributed/querynode/service.go | 15 +-- .../distributed/rootcoord/client/client.go | 1 + internal/distributed/rootcoord/param_table.go | 92 +++++++------------ .../distributed/rootcoord/param_table_test.go | 9 -- internal/proxy/param_table.go | 7 +- internal/querycoord/param_table.go | 9 -- internal/querycoord/query_coord.go | 2 +- internal/util/paramtable/basetable.go | 56 ----------- internal/util/sessionutil/session_util.go | 6 +- 31 files changed, 184 insertions(+), 444 deletions(-) diff --git a/internal/datacoord/param_table.go b/internal/datacoord/param_table.go index f37f4e3325..a355a5615b 100644 --- a/internal/datacoord/param_table.go +++ b/internal/datacoord/param_table.go @@ -32,8 +32,9 @@ type ParamTable struct { NodeID int64 - IP string - Port int + IP string + Port int + Address string // --- ETCD --- EtcdEndpoints []string diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 9d6ebd293f..544302eb49 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -208,7 +208,7 @@ func (s *Server) Register() error { if s.session == nil { return errors.New("failed to initialize session") } - s.session.Init(typeutil.DataCoordRole, Params.IP, true) + s.session.Init(typeutil.DataCoordRole, Params.Address, true) Params.NodeID = s.session.ServerID Params.SetLogger(typeutil.UniqueID(-1)) return nil diff --git a/internal/distributed/datacoord/param_table.go b/internal/distributed/datacoord/param_table.go index 0cbb1ce0a7..d00ef1516e 100644 --- a/internal/distributed/datacoord/param_table.go +++ b/internal/distributed/datacoord/param_table.go @@ -22,9 +22,9 @@ import ( "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" + "go.uber.org/zap" ) // ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by @@ -32,9 +32,9 @@ import ( type ParamTable struct { paramtable.BaseTable - IP string - Port int - RootCoordAddress string + IP string + Port int + Address string ServerMaxSendSize int ServerMaxRecvSize int @@ -49,21 +49,26 @@ var once sync.Once func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() - pt.initPort() pt.initParams() - pt.loadFromEnv() - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) }) } func (pt *ParamTable) initParams() { - pt.initRootCoordAddress() - pt.initDataCoordAddress() + pt.loadFromEnv() + pt.loadFromArgs() + pt.initPort() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() } func (pt *ParamTable) loadFromEnv() { + Params.IP = funcutil.GetLocalIP() +} + +// LoadFromArgs is used to initialize configuration items from args. +func (pt *ParamTable) loadFromArgs() { } @@ -71,22 +76,6 @@ func (pt *ParamTable) initPort() { pt.Port = pt.ParseInt("dataCoord.port") } -func (pt *ParamTable) initRootCoordAddress() { - ret, err := pt.Load("_RootCoordAddress") - if err != nil { - panic(err) - } - pt.RootCoordAddress = ret -} - -func (pt *ParamTable) initDataCoordAddress() { - ret, err := pt.Load("_DataCoordAddress") - if err != nil { - panic(err) - } - pt.IP = ret -} - func (pt *ParamTable) initServerMaxSendSize() { var err error diff --git a/internal/distributed/datacoord/param_table_test.go b/internal/distributed/datacoord/param_table_test.go index 859594a651..d869f513f0 100644 --- a/internal/distributed/datacoord/param_table_test.go +++ b/internal/distributed/datacoord/param_table_test.go @@ -32,9 +32,6 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.Port, 0) t.Logf("DataCoord Port:%d", Params.Port) - assert.NotEqual(t, Params.RootCoordAddress, "") - t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index d4cbfa1811..9fb0d1a5ed 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -84,6 +84,7 @@ func (s *Server) init() error { datacoord.Params.InitOnce() datacoord.Params.IP = Params.IP datacoord.Params.Port = Params.Port + datacoord.Params.Address = Params.Address err := s.dataCoord.Register() if err != nil { diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index b2ee005b4d..c2b6b2c3a0 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -78,6 +78,7 @@ func (c *Client) getGrpcClientFunc() (datapb.DataNodeClient, error) { // if we return nil here, then we should check if client is nil outside, err := c.connect(retry.Attempts(20)) if err != nil { + log.Debug("DatanodeClient try reconnect failed", zap.Error(err)) return nil, err } diff --git a/internal/distributed/datanode/param_table.go b/internal/distributed/datanode/param_table.go index 05f884fd34..7feddecd0c 100644 --- a/internal/distributed/datanode/param_table.go +++ b/internal/distributed/datanode/param_table.go @@ -40,11 +40,9 @@ type ParamTable struct { IP string Port int + Address string listener net.Listener - RootCoordAddress string - DataCoordAddress string - ServerMaxSendSize int ServerMaxRecvSize int } @@ -54,18 +52,27 @@ type ParamTable struct { func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() - pt.initRootCoordAddress() - pt.initDataCoordAddress() - pt.initPort() + pt.initParams() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - pt.loadFromEnv() - pt.loadFromArgs() - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() + listener, err := net.Listen("tcp", pt.Address) + if err != nil { + panic(err) + } + pt.listener = listener }) } +// initParams initializes params of the configuration items. +func (pt *ParamTable) initParams() { + pt.loadFromEnv() + pt.loadFromArgs() + + pt.initPort() + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() +} + func (pt *ParamTable) loadFromArgs() { } @@ -75,31 +82,12 @@ func (pt *ParamTable) loadFromEnv() { } func (pt *ParamTable) initPort() { - - listener, err := net.Listen("tcp", ":0") - if err != nil { - panic(err) + port := pt.ParseInt("dataNode.port") + pt.Port = port + if !funcutil.CheckPortAvailable(pt.Port) { + pt.Port = funcutil.GetAvailablePort() + log.Warn("DataNode init", zap.Any("Port", pt.Port)) } - - pt.Port = listener.Addr().(*net.TCPAddr).Port - pt.listener = listener - log.Info("DataNode", zap.Int("port", pt.Port)) -} - -func (pt *ParamTable) initRootCoordAddress() { - ret, err := pt.Load("_RootCoordAddress") - if err != nil { - panic(err) - } - pt.RootCoordAddress = ret -} - -func (pt *ParamTable) initDataCoordAddress() { - ret, err := pt.Load("_DataCoordAddress") - if err != nil { - panic(err) - } - pt.DataCoordAddress = ret } func (pt *ParamTable) initServerMaxSendSize() { diff --git a/internal/distributed/datanode/param_table_test.go b/internal/distributed/datanode/param_table_test.go index 16e098bcae..02c7e1d497 100644 --- a/internal/distributed/datanode/param_table_test.go +++ b/internal/distributed/datanode/param_table_test.go @@ -39,12 +39,6 @@ func TestParamTable(t *testing.T) { assert.NotNil(t, Params.listener) t.Logf("DataNode listener:%d", Params.listener) - assert.NotEqual(t, Params.DataCoordAddress, "") - t.Logf("DataCoordAddress:%s", Params.DataCoordAddress) - - assert.NotEqual(t, Params.RootCoordAddress, "") - t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 11ed44deaa..6a912cc9ff 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -191,7 +191,6 @@ func (s *Server) init() error { // --- RootCoord Client --- if s.newRootCoordClient != nil { - log.Debug("RootCoord address", zap.String("address", Params.RootCoordAddress)) log.Debug("Init root coord client ...") rootCoordClient, err := s.newRootCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints) if err != nil { @@ -219,7 +218,6 @@ func (s *Server) init() error { // --- Data Server Client --- if s.newDataCoordClient != nil { - log.Debug("Data service address", zap.String("address", Params.DataCoordAddress)) log.Debug("DataNode Init data service client ...") dataCoordClient, err := s.newDataCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints) if err != nil { diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 2b235da1fa..b6f2f9256f 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -72,6 +72,7 @@ func (c *Client) getGrpcClient() (indexpb.IndexCoordClient, error) { // if we return nil here, then we should check if client is nil outside, err := c.connect(retry.Attempts(20)) if err != nil { + log.Debug("IndexcoordClient try reconnect failed", zap.Error(err)) return nil, err } diff --git a/internal/distributed/indexcoord/param_table.go b/internal/distributed/indexcoord/param_table.go index 5d4c0a3ead..ddccfe6ccc 100644 --- a/internal/distributed/indexcoord/param_table.go +++ b/internal/distributed/indexcoord/param_table.go @@ -22,17 +22,18 @@ import ( "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" + "go.uber.org/zap" ) // ParamTable is used to record configuration items. type ParamTable struct { paramtable.BaseTable - ServiceAddress string - ServicePort int + IP string + Port int + Address string ServerMaxSendSize int ServerMaxRecvSize int @@ -47,30 +48,38 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initParams() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) }) } // initParams initializes params of the configuration items. func (pt *ParamTable) initParams() { - pt.initServicePort() - pt.initServiceAddress() + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.initPort() pt.initServerMaxSendSize() pt.initServerMaxRecvSize() } // initServicePort initializes the port of IndexCoord service. -func (pt *ParamTable) initServicePort() { - pt.ServicePort = pt.ParseInt("indexCoord.port") +func (pt *ParamTable) initPort() { + pt.Port = pt.ParseInt("indexCoord.port") } // initServiceAddress initializes the address of IndexCoord service. -func (pt *ParamTable) initServiceAddress() { - ret, err := pt.Load("_IndexCoordAddress") - if err != nil { - panic(err) - } - pt.ServiceAddress = ret +func (pt *ParamTable) LoadFromEnv() { + Params.IP = funcutil.GetLocalIP() +} + +// LoadFromArgs is used to initialize configuration items from args. +func (pt *ParamTable) loadFromArgs() { + +} + +// LoadFromArgs is used to initialize configuration items from args. +func (pt *ParamTable) LoadFromArgs() { + } // initServerMaxSendSize initializes the max send size of IndexCoord service. diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index e01d2360d4..5f550efc0f 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -76,8 +76,8 @@ func (s *Server) init() error { Params.Init() indexcoord.Params.InitOnce() - indexcoord.Params.Address = Params.ServiceAddress - indexcoord.Params.Port = Params.ServicePort + indexcoord.Params.Address = Params.Address + indexcoord.Params.Port = Params.Port closer := trace.InitTracing("IndexCoord") s.closer = closer @@ -88,7 +88,7 @@ func (s *Server) init() error { } s.loopWg.Add(1) - go s.startGrpcLoop(Params.ServicePort) + go s.startGrpcLoop(indexcoord.Params.Port) // wait for grpc IndexCoord loop start if err := <-s.grpcErrChan; err != nil { log.Error("IndexCoord", zap.Any("init error", err)) @@ -181,7 +181,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer s.loopWg.Done() - log.Debug("IndexCoord", zap.String("network address", Params.ServiceAddress), zap.Int("network port", grpcPort)) + log.Debug("IndexCoord", zap.String("network address", Params.IP), zap.Int("network port", grpcPort)) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { log.Warn("IndexCoord", zap.String("GrpcServer:failed to listen", err.Error())) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 38d134f050..104c57d2e3 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -75,6 +75,7 @@ func (c *Client) getGrpcClientFunc() (indexpb.IndexNodeClient, error) { // if we return nil here, then we should check if client is nil outside, err := c.connect(retry.Attempts(20)) if err != nil { + log.Debug("IndexNodeClient try reconnect failed", zap.Error(err)) return nil, err } diff --git a/internal/distributed/indexnode/param_table.go b/internal/distributed/indexnode/param_table.go index 695708be5b..44bcdcd273 100644 --- a/internal/distributed/indexnode/param_table.go +++ b/internal/distributed/indexnode/param_table.go @@ -32,8 +32,6 @@ import ( type ParamTable struct { paramtable.BaseTable - IndexCoordAddress string - IP string Port int Address string @@ -51,16 +49,6 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initParams() - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() - - if !funcutil.CheckPortAvailable(pt.Port) { - pt.Port = funcutil.GetAvailablePort() - log.Warn("IndexNode init", zap.Any("Port", pt.Port)) - } - pt.LoadFromEnv() - pt.LoadFromArgs() }) } @@ -75,22 +63,20 @@ func (pt *ParamTable) LoadFromEnv() { } func (pt *ParamTable) initParams() { + pt.LoadFromEnv() + pt.LoadFromArgs() pt.initPort() - pt.initIndexCoordAddress() -} - -// todo remove and use load from env -func (pt *ParamTable) initIndexCoordAddress() { - ret, err := pt.Load("_IndexCoordAddress") - if err != nil { - panic(err) - } - pt.IndexCoordAddress = ret + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() } func (pt *ParamTable) initPort() { port := pt.ParseInt("indexNode.port") pt.Port = port + if !funcutil.CheckPortAvailable(pt.Port) { + pt.Port = funcutil.GetAvailablePort() + log.Warn("IndexNode init", zap.Any("Port", pt.Port)) + } } func (pt *ParamTable) initServerMaxSendSize() { diff --git a/internal/distributed/proxy/param_table.go b/internal/distributed/proxy/param_table.go index fef5cff71f..76982107c3 100644 --- a/internal/distributed/proxy/param_table.go +++ b/internal/distributed/proxy/param_table.go @@ -33,11 +33,6 @@ import ( type ParamTable struct { paramtable.BaseTable - RootCoordAddress string - IndexCoordAddress string - DataCoordAddress string - QueryCoordAddress string - IP string Port int Address string @@ -56,13 +51,7 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initParams() - - pt.loadFromEnv() - pt.loadFromArgs() pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() }) } @@ -71,51 +60,15 @@ func (pt *ParamTable) loadFromArgs() { } func (pt *ParamTable) loadFromEnv() { - Params.IP = funcutil.GetLocalIP() + pt.IP = funcutil.GetLocalIP() } func (pt *ParamTable) initParams() { + pt.loadFromEnv() + pt.loadFromArgs() pt.initPort() - pt.initRootCoordAddress() - pt.initIndexCoordAddress() - pt.initDataCoordAddress() - pt.initQueryCoordAddress() -} - -// todo remove and use load from env -func (pt *ParamTable) initIndexCoordAddress() { - ret, err := pt.Load("_IndexCoordAddress") - if err != nil { - panic(err) - } - pt.IndexCoordAddress = ret -} - -// todo remove and use load from env -func (pt *ParamTable) initRootCoordAddress() { - ret, err := pt.Load("_RootCoordAddress") - if err != nil { - panic(err) - } - pt.RootCoordAddress = ret -} - -// todo remove and use load from env -func (pt *ParamTable) initDataCoordAddress() { - ret, err := pt.Load("_DataCoordAddress") - if err != nil { - panic(err) - } - pt.DataCoordAddress = ret -} - -// todo remove and use load from env -func (pt *ParamTable) initQueryCoordAddress() { - ret, err := pt.Load("_QueryCoordAddress") - if err != nil { - panic(err) - } - pt.QueryCoordAddress = ret + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() } func (pt *ParamTable) initPort() { diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 7b27bb5195..0b7117309a 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -148,8 +148,6 @@ func (s *Server) init() error { proxy.Params.IP = Params.IP proxy.Params.NetworkAddress = Params.Address - // for purpose of ID Allocator - proxy.Params.RootCoordAddress = Params.RootCoordAddress closer := trace.InitTracing(fmt.Sprintf("proxy ip: %s, port: %d", Params.IP, Params.Port)) s.closer = closer @@ -173,9 +171,6 @@ func (s *Server) init() error { return err } - rootCoordAddr := Params.RootCoordAddress - log.Debug("Proxy", zap.String("RootCoord address", rootCoordAddr)) - if s.rootCoordClient == nil { s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) if err != nil { @@ -196,9 +191,6 @@ func (s *Server) init() error { s.proxy.SetRootCoordClient(s.rootCoordClient) log.Debug("set rootcoord client ...") - dataCoordAddr := Params.DataCoordAddress - log.Debug("Proxy", zap.String("data coordinator address", dataCoordAddr)) - if s.dataCoordClient == nil { s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) if err != nil { @@ -215,9 +207,6 @@ func (s *Server) init() error { s.proxy.SetDataCoordClient(s.dataCoordClient) log.Debug("set data coordinator address ...") - indexCoordAddr := Params.IndexCoordAddress - log.Debug("Proxy", zap.String("index coordinator address", indexCoordAddr)) - if s.indexCoordClient == nil { s.indexCoordClient, err = grpcindexcoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) if err != nil { @@ -234,9 +223,6 @@ func (s *Server) init() error { s.proxy.SetIndexCoordClient(s.indexCoordClient) log.Debug("set index coordinator client ...") - queryCoordAddr := Params.QueryCoordAddress - log.Debug("Proxy", zap.String("query coordinator address", queryCoordAddr)) - if s.queryCooedClient == nil { s.queryCooedClient, err = grpcquerycoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) if err != nil { diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 9493d4347f..bf214a22fb 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -78,6 +78,7 @@ func (c *Client) getGrpcClientFunc() (querypb.QueryCoordClient, error) { // if we return nil here, then we should check if client is nil outside, err := c.connect(retry.Attempts(20)) if err != nil { + log.Warn("QueryCoordClient try connect fail", zap.Error(err)) return nil, err } diff --git a/internal/distributed/querycoord/param_table.go b/internal/distributed/querycoord/param_table.go index 91248ff1b1..f0e1b14b83 100644 --- a/internal/distributed/querycoord/param_table.go +++ b/internal/distributed/querycoord/param_table.go @@ -22,9 +22,9 @@ import ( "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" + "go.uber.org/zap" ) // Params is a package scoped variable of type ParamTable. @@ -35,12 +35,10 @@ var once sync.Once // embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. type ParamTable struct { paramtable.BaseTable - Port int - - RootCoordAddress string - DataCoordAddress string - IndexCoordAddress string + IP string + Port int + Address string ServerMaxSendSize int ServerMaxRecvSize int } @@ -50,44 +48,33 @@ type ParamTable struct { func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() - pt.initPort() - pt.initRootCoordAddress() - pt.initDataCoordAddress() - pt.initIndexCoordAddress() - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() + pt.initParams() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) }) } -func (pt *ParamTable) initRootCoordAddress() { - ret, err := pt.Load("_RootCoordAddress") - if err != nil { - panic(err) - } - pt.RootCoordAddress = ret -} - -func (pt *ParamTable) initDataCoordAddress() { - ret, err := pt.Load("_DataCoordAddress") - if err != nil { - panic(err) - } - pt.DataCoordAddress = ret -} - -func (pt *ParamTable) initIndexCoordAddress() { - ret, err := pt.Load("_IndexCoordAddress") - if err != nil { - panic(err) - } - pt.IndexCoordAddress = ret +// initParams initializes params of the configuration items. +func (pt *ParamTable) initParams() { + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.initPort() + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() } func (pt *ParamTable) initPort() { pt.Port = pt.ParseInt("queryCoord.port") } +func (pt *ParamTable) LoadFromEnv() { + pt.IP = funcutil.GetLocalIP() +} + +// LoadFromArgs is used to initialize configuration items from args. +func (pt *ParamTable) LoadFromArgs() { + +} + func (pt *ParamTable) initServerMaxSendSize() { var err error diff --git a/internal/distributed/querycoord/param_table_test.go b/internal/distributed/querycoord/param_table_test.go index 949a209634..7437a7c817 100644 --- a/internal/distributed/querycoord/param_table_test.go +++ b/internal/distributed/querycoord/param_table_test.go @@ -29,12 +29,6 @@ import ( func TestParamTable(t *testing.T) { Params.Init() - assert.NotEqual(t, Params.DataCoordAddress, "") - t.Logf("DataCoordAddress:%s", Params.DataCoordAddress) - - assert.NotEqual(t, Params.RootCoordAddress, "") - t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index ec34686cd3..efc4614e87 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -101,6 +101,7 @@ func (s *Server) init() error { Params.Init() qc.Params.InitOnce() + qc.Params.Address = Params.Address qc.Params.Port = Params.Port closer := trace.InitTracing("querycoord") @@ -119,8 +120,6 @@ func (s *Server) init() error { } // --- Master Server Client --- - log.Debug("QueryCoord try to new RootCoord client", zap.Any("RootCoordAddress", Params.RootCoordAddress)) - if s.rootCoord == nil { s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints) if err != nil { @@ -152,8 +151,6 @@ func (s *Server) init() error { log.Debug("QueryCoord report RootCoord ready") // --- Data service client --- - log.Debug("QueryCoord try to new DataCoord client", zap.Any("DataCoordAddress", Params.DataCoordAddress)) - if s.dataCoord == nil { s.dataCoord, err = dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints) if err != nil { @@ -182,7 +179,6 @@ func (s *Server) init() error { log.Debug("QueryCoord report DataCoord ready") // --- IndexCoord --- - log.Debug("QueryCoord try to new IndexCoord client", zap.Any("IndexCoordAddress", Params.IndexCoordAddress)) if s.indexCoord == nil { s.indexCoord, err = isc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints) if err != nil { diff --git a/internal/distributed/querynode/param_table.go b/internal/distributed/querynode/param_table.go index 3afcd4a1e8..ab0955ffcd 100644 --- a/internal/distributed/querynode/param_table.go +++ b/internal/distributed/querynode/param_table.go @@ -37,14 +37,10 @@ var once sync.Once type ParamTable struct { paramtable.BaseTable - QueryNodeIP string - QueryNodePort int - QueryNodeID UniqueID - - RootCoordAddress string - IndexCoordAddress string - DataCoordAddress string - QueryCoordAddress string + IP string + Port int + Address string + QueryNodeID UniqueID ServerMaxSendSize int ServerMaxRecvSize int @@ -54,20 +50,20 @@ type ParamTable struct { func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() - pt.initPort() - pt.initRootCoordAddress() - pt.initIndexCoordAddress() - pt.initDataCoordAddress() - pt.initQueryCoordAddress() - - pt.LoadFromEnv() - pt.LoadFromArgs() - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() + pt.initParams() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) }) } +// initParams initializes params of the configuration items. +func (pt *ParamTable) initParams() { + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.initPort() + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() +} + // LoadFromArgs is used to initialize configuration items from args. func (pt *ParamTable) LoadFromArgs() { @@ -75,44 +71,16 @@ func (pt *ParamTable) LoadFromArgs() { // LoadFromEnv is used to initialize configuration items from env. func (pt *ParamTable) LoadFromEnv() { - Params.QueryNodeIP = funcutil.GetLocalIP() -} - -func (pt *ParamTable) initRootCoordAddress() { - ret, err := pt.Load("_RootCoordAddress") - if err != nil { - panic(err) - } - pt.RootCoordAddress = ret -} - -func (pt *ParamTable) initIndexCoordAddress() { - ret, err := pt.Load("_IndexCoordAddress") - if err != nil { - panic(err) - } - pt.IndexCoordAddress = ret -} - -func (pt *ParamTable) initDataCoordAddress() { - ret, err := pt.Load("_DataCoordAddress") - if err != nil { - panic(err) - } - pt.DataCoordAddress = ret -} - -func (pt *ParamTable) initQueryCoordAddress() { - ret, err := pt.Load("_QueryCoordAddress") - if err != nil { - panic(err) - } - pt.QueryCoordAddress = ret + pt.IP = funcutil.GetLocalIP() } func (pt *ParamTable) initPort() { port := pt.ParseInt("queryNode.port") - pt.QueryNodePort = port + pt.Port = port + if !funcutil.CheckPortAvailable(pt.Port) { + pt.Port = funcutil.GetAvailablePort() + log.Warn("QueryNode init", zap.Any("Port", pt.Port)) + } } func (pt *ParamTable) initServerMaxSendSize() { diff --git a/internal/distributed/querynode/param_table_test.go b/internal/distributed/querynode/param_table_test.go index 2bbd6e29c0..84d558e5e1 100644 --- a/internal/distributed/querynode/param_table_test.go +++ b/internal/distributed/querynode/param_table_test.go @@ -30,18 +30,6 @@ import ( func TestParamTable(t *testing.T) { Params.Init() - assert.NotEqual(t, Params.IndexCoordAddress, "") - t.Logf("IndexCoordAddress:%s", Params.IndexCoordAddress) - - assert.NotEqual(t, Params.DataCoordAddress, "") - t.Logf("DataCoordAddress:%s", Params.DataCoordAddress) - - assert.NotEqual(t, Params.RootCoordAddress, "") - t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) - - assert.NotEqual(t, Params.QueryCoordAddress, "") - t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress) - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) @@ -54,5 +42,5 @@ func TestParamTable(t *testing.T) { assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) Params.LoadFromEnv() - assert.Equal(t, Params.QueryNodeIP, funcutil.GetLocalIP()) + assert.Equal(t, Params.IP, funcutil.GetLocalIP()) } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 535f79ca74..3bef39b6ae 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -84,16 +84,16 @@ func (s *Server) init() error { Params.Init() qn.Params.InitOnce() - qn.Params.QueryNodeIP = Params.QueryNodeIP - qn.Params.QueryNodePort = int64(Params.QueryNodePort) + qn.Params.QueryNodeIP = Params.IP + qn.Params.QueryNodePort = int64(Params.Port) qn.Params.QueryNodeID = Params.QueryNodeID - closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) + closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.IP, Params.Port)) s.closer = closer - log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort)) + log.Debug("QueryNode", zap.Int("port", Params.Port)) s.wg.Add(1) - go s.startGrpcLoop(Params.QueryNodePort) + go s.startGrpcLoop(Params.Port) // wait for grpc server loop start err := <-s.grpcErrChan if err != nil { @@ -101,10 +101,6 @@ func (s *Server) init() error { } // --- RootCoord Client --- - //ms.Params.Init() - addr := Params.RootCoordAddress - - log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr)) if s.rootCoord == nil { s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) if err != nil { @@ -135,7 +131,6 @@ func (s *Server) init() error { } // --- IndexCoord --- - log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress)) if s.indexCoord == nil { s.indexCoord, err = isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) if err != nil { diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 256b6d20e0..7b3e01df33 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -180,6 +180,7 @@ func (c *GrpcClient) getGrpcClientFunc() (rootcoordpb.RootCoordClient, error) { // if we return nil here, then we should check if client is nil outside, err := c.connect(retry.Attempts(20)) if err != nil { + log.Debug("RoodCoordClient try connect failed", zap.Error(err)) return nil, err } diff --git a/internal/distributed/rootcoord/param_table.go b/internal/distributed/rootcoord/param_table.go index 0de6d442c6..4594b94de1 100644 --- a/internal/distributed/rootcoord/param_table.go +++ b/internal/distributed/rootcoord/param_table.go @@ -22,9 +22,9 @@ import ( "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" + "go.uber.org/zap" ) // Params is a package scoped variable of type ParamTable. @@ -36,12 +36,9 @@ var once sync.Once type ParamTable struct { paramtable.BaseTable - Address string // ip:port + IP string Port int - - IndexCoordAddress string - QueryCoordAddress string - DataCoordAddress string + Address string ServerMaxSendSize int ServerMaxRecvSize int @@ -49,62 +46,43 @@ type ParamTable struct { // Init is an override method of BaseTable's Init. It mainly calls the // Init of BaseTable and do some other initialization. -func (p *ParamTable) Init() { +func (pt *ParamTable) Init() { once.Do(func() { - p.BaseTable.Init() - p.initAddress() - p.initPort() - p.initIndexCoordAddress() - p.initQueryCoordAddress() - p.initDataCoordAddress() - - p.initServerMaxSendSize() - p.initServerMaxRecvSize() + pt.BaseTable.Init() + pt.initParams() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) }) } -func (p *ParamTable) initAddress() { - ret, err := p.Load("_RootCoordAddress") - if err != nil { - panic(err) - } - p.Address = ret +// initParams initializes params of the configuration items. +func (pt *ParamTable) initParams() { + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.initPort() + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() } -func (p *ParamTable) initPort() { - p.Port = p.ParseInt("rootCoord.port") +// LoadFromEnv is used to initialize configuration items from env. +func (pt *ParamTable) LoadFromEnv() { + pt.IP = funcutil.GetLocalIP() } -func (p *ParamTable) initIndexCoordAddress() { - ret, err := p.Load("_IndexCoordAddress") - if err != nil { - panic(err) - } - p.IndexCoordAddress = ret +// LoadFromArgs is used to initialize configuration items from args. +func (pt *ParamTable) LoadFromArgs() { + } -func (p *ParamTable) initQueryCoordAddress() { - ret, err := p.Load("_QueryCoordAddress") - if err != nil { - panic(err) - } - p.QueryCoordAddress = ret +func (pt *ParamTable) initPort() { + pt.Port = pt.ParseInt("rootCoord.port") } -func (p *ParamTable) initDataCoordAddress() { - ret, err := p.Load("_DataCoordAddress") - if err != nil { - panic(err) - } - p.DataCoordAddress = ret -} - -func (p *ParamTable) initServerMaxSendSize() { +func (pt *ParamTable) initServerMaxSendSize() { var err error - valueStr, err := p.Load("rootCoord.grpc.serverMaxSendSize") + valueStr, err := pt.Load("rootCoord.grpc.serverMaxSendSize") if err != nil { // not set - p.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize } value, err := strconv.Atoi(valueStr) @@ -113,21 +91,21 @@ func (p *ParamTable) initServerMaxSendSize() { zap.String("rootCoord.grpc.serverMaxSendSize", valueStr), zap.Error(err)) - p.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize } else { - p.ServerMaxSendSize = value + pt.ServerMaxSendSize = value } log.Debug("initServerMaxSendSize", - zap.Int("rootCoord.grpc.serverMaxSendSize", p.ServerMaxSendSize)) + zap.Int("rootCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) } -func (p *ParamTable) initServerMaxRecvSize() { +func (pt *ParamTable) initServerMaxRecvSize() { var err error - valueStr, err := p.Load("rootCoord.grpc.serverMaxRecvSize") + valueStr, err := pt.Load("rootCoord.grpc.serverMaxRecvSize") if err != nil { // not set - p.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize } value, err := strconv.Atoi(valueStr) @@ -136,11 +114,11 @@ func (p *ParamTable) initServerMaxRecvSize() { zap.String("rootCoord.grpc.serverMaxRecvSize", valueStr), zap.Error(err)) - p.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize } else { - p.ServerMaxRecvSize = value + pt.ServerMaxRecvSize = value } log.Debug("initServerMaxRecvSize", - zap.Int("rootCoord.grpc.serverMaxRecvSize", p.ServerMaxRecvSize)) + zap.Int("rootCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) } diff --git a/internal/distributed/rootcoord/param_table_test.go b/internal/distributed/rootcoord/param_table_test.go index 82b07d2d26..765f1742bc 100644 --- a/internal/distributed/rootcoord/param_table_test.go +++ b/internal/distributed/rootcoord/param_table_test.go @@ -35,15 +35,6 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.Port, 0) t.Logf("master port = %d", Params.Port) - assert.NotEqual(t, Params.IndexCoordAddress, "") - t.Logf("IndexCoordAddress:%s", Params.IndexCoordAddress) - - assert.NotEqual(t, Params.DataCoordAddress, "") - t.Logf("DataCoordAddress:%s", Params.DataCoordAddress) - - assert.NotEqual(t, Params.QueryCoordAddress, "") - t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress) - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) diff --git a/internal/proxy/param_table.go b/internal/proxy/param_table.go index 0257b3ee1b..3cba838462 100644 --- a/internal/proxy/param_table.go +++ b/internal/proxy/param_table.go @@ -44,10 +44,9 @@ type ParamTable struct { Alias string - EtcdEndpoints []string - MetaRootPath string - RootCoordAddress string - PulsarAddress string + EtcdEndpoints []string + MetaRootPath string + PulsarAddress string RocksmqPath string // not used in Proxy diff --git a/internal/querycoord/param_table.go b/internal/querycoord/param_table.go index 6911cfa54b..0fd5383556 100644 --- a/internal/querycoord/param_table.go +++ b/internal/querycoord/param_table.go @@ -93,7 +93,6 @@ func (p *ParamTable) InitOnce() { func (p *ParamTable) Init() { p.BaseTable.Init() - p.initQueryCoordAddress() p.initRoleName() // --- Channels --- @@ -131,14 +130,6 @@ func (p *ParamTable) Init() { p.initMemoryUsageMaxDifferencePercentage() } -func (p *ParamTable) initQueryCoordAddress() { - url, err := p.Load("_QueryCoordAddress") - if err != nil { - panic(err) - } - p.Address = url -} - func (p *ParamTable) initClusterMsgChannelPrefix() { config, err := p.Load("msgChannel.chanNamePrefix.cluster") if err != nil { diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 19d59fdc22..5539a5c75c 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -159,7 +159,7 @@ func (qc *QueryCoord) Init() error { qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager() }) - + log.Debug("query coordinator init success") return initError } diff --git a/internal/util/paramtable/basetable.go b/internal/util/paramtable/basetable.go index dc0422d2af..10c373a4b4 100644 --- a/internal/util/paramtable/basetable.go +++ b/internal/util/paramtable/basetable.go @@ -179,62 +179,6 @@ func (gp *BaseTable) tryloadFromEnv() { } gp.Save("_RocksmqPath", rocksmqPath) - rootCoordAddress := os.Getenv("ROOT_COORD_ADDRESS") - if rootCoordAddress == "" { - rootCoordHost, err := gp.Load("rootCoord.address") - if err != nil { - panic(err) - } - port, err := gp.Load("rootCoord.port") - if err != nil { - panic(err) - } - rootCoordAddress = rootCoordHost + ":" + port - } - gp.Save("_RootCoordAddress", rootCoordAddress) - - indexCoordAddress := os.Getenv("INDEX_COORD_ADDRESS") - if indexCoordAddress == "" { - indexCoordHost, err := gp.Load("indexCoord.address") - if err != nil { - panic(err) - } - port, err := gp.Load("indexCoord.port") - if err != nil { - panic(err) - } - indexCoordAddress = indexCoordHost + ":" + port - } - gp.Save("_IndexCoordAddress", indexCoordAddress) - - queryCoordAddress := os.Getenv("QUERY_COORD_ADDRESS") - if queryCoordAddress == "" { - serviceHost, err := gp.Load("queryCoord.address") - if err != nil { - panic(err) - } - port, err := gp.Load("queryCoord.port") - if err != nil { - panic(err) - } - queryCoordAddress = serviceHost + ":" + port - } - gp.Save("_QueryCoordAddress", queryCoordAddress) - - dataCoordAddress := os.Getenv("DATA_COORD_ADDRESS") - if dataCoordAddress == "" { - serviceHost, err := gp.Load("dataCoord.address") - if err != nil { - panic(err) - } - port, err := gp.Load("dataCoord.port") - if err != nil { - panic(err) - } - dataCoordAddress = serviceHost + ":" + port - } - gp.Save("_DataCoordAddress", dataCoordAddress) - insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE") if insertBufferFlushSize == "" { insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", "16777216") diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 98239493aa..d1b70a42ee 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -182,7 +182,7 @@ func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error) // it is false. Otherwise, set it to true. func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) { var ch <-chan *clientv3.LeaseKeepAliveResponse - log.Debug("Session Register Begin") + log.Debug("Session Register Begin " + s.ServerName) registerFn := func() error { resp, err := s.etcdCli.Grant(s.ctx, DefaultTTL) if err != nil { @@ -274,7 +274,6 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) if err != nil { return nil, 0, err } - log.Debug("SessionUtil GetSessions", zap.Any("prefix", prefix), zap.Any("resp", resp)) for _, kv := range resp.Kvs { session := &Session{} err = json.Unmarshal(kv.Value, session) @@ -282,6 +281,9 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) return nil, 0, err } _, mapKey := path.Split(string(kv.Key)) + log.Debug("SessionUtil GetSessions ", zap.Any("prefix", prefix), + zap.String("key", mapKey), + zap.Any("address", session.Address)) res[mapKey] = session } return res, resp.Header.Revision, nil