let master use session.ServerID as nodeID (#5385)

let master use session.ServerID as nodeID

also see #5386 

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/5402/head^2
neza2017 2021-05-25 15:06:05 +08:00 committed by GitHub
parent 88b42304ef
commit c28c34e852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 194 additions and 68 deletions

View File

@ -125,6 +125,14 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
} }
} }
// Register register data node at etcd
func (node *DataNode) Register() error {
node.session = sessionutil.NewSession(node.ctx, []string{Params.EtcdAddress})
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = node.session.ServerID
return nil
}
// Init function supposes data service is in INITIALIZING state. // Init function supposes data service is in INITIALIZING state.
// //
// In Init process, data node will register itself to data service with its node id // In Init process, data node will register itself to data service with its node id
@ -138,9 +146,6 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
func (node *DataNode) Init() error { func (node *DataNode) Init() error {
ctx := context.Background() ctx := context.Background()
node.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress})
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
req := &datapb.RegisterNodeRequest{ req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
SourceID: node.NodeID, SourceID: node.NodeID,

View File

@ -84,6 +84,10 @@ func (c *mockDataNodeClient) Start() error {
return nil return nil
} }
func (c *mockDataNodeClient) Register() error {
return nil
}
func (c *mockDataNodeClient) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *mockDataNodeClient) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return &internalpb.ComponentStates{ return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{ State: &internalpb.ComponentInfo{
@ -134,6 +138,10 @@ func (m *mockMasterService) Stop() error {
return nil return nil
} }
func (m *mockMasterService) Register() error {
return nil
}
func (m *mockMasterService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (m *mockMasterService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return &internalpb.ComponentStates{ return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{ State: &internalpb.ComponentInfo{

View File

@ -107,9 +107,15 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) {
s.masterClient = masterClient s.masterClient = masterClient
} }
func (s *Server) Init() error { // Register register data service at etcd
func (s *Server) Register() error {
s.session = sessionutil.NewSession(s.ctx, []string{Params.EtcdAddress}) s.session = sessionutil.NewSession(s.ctx, []string{Params.EtcdAddress})
s.session.Init(typeutil.DataServiceRole, Params.IP, true) s.session.Init(typeutil.DataServiceRole, Params.IP, true)
Params.NodeID = s.session.ServerID
return nil
}
func (s *Server) Init() error {
return nil return nil
} }

View File

@ -75,6 +75,11 @@ func (c *Client) Stop() error {
return c.conn.Close() return c.conn.Close()
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpc.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpc.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -169,7 +169,11 @@ func (s *Server) init() error {
addr := Params.IP + ":" + strconv.Itoa(Params.Port) addr := Params.IP + ":" + strconv.Itoa(Params.Port)
log.Debug("DataNode address", zap.String("address", addr)) log.Debug("DataNode address", zap.String("address", addr))
err := s.startGrpc() err := s.datanode.Register()
if err != nil {
return err
}
err = s.startGrpc()
if err != nil { if err != nil {
return err return err
} }

View File

@ -75,6 +75,11 @@ func (c *Client) Stop() error {
return c.conn.Close() return c.conn.Close()
} }
// Register dumy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -91,7 +91,12 @@ func (s *Server) init() error {
dataservice.Params.IP = Params.IP dataservice.Params.IP = Params.IP
dataservice.Params.Port = Params.Port dataservice.Params.Port = Params.Port
err := s.startGrpc() err := s.dataService.Register()
if err != nil {
return err
}
err = s.startGrpc()
if err != nil { if err != nil {
return err return err
} }

View File

@ -71,6 +71,11 @@ func (c *Client) Stop() error {
return nil return nil
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -123,6 +123,11 @@ func (s *Server) init() error {
} }
}() }()
err = s.indexnode.Register()
if err != nil {
return err
}
s.loopWg.Add(1) s.loopWg.Add(1)
go s.startGrpcLoop(Params.Port) go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start // wait for grpc server loop start

View File

@ -75,6 +75,11 @@ func (c *Client) Stop() error {
return nil return nil
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -72,6 +72,10 @@ func (s *Server) init() error {
closer := trace.InitTracing("index_service") closer := trace.InitTracing("index_service")
s.closer = closer s.closer = closer
if err := s.indexservice.Register(); err != nil {
return err
}
s.loopWg.Add(1) s.loopWg.Add(1)
go s.startGrpcLoop(Params.ServicePort) go s.startGrpcLoop(Params.ServicePort)
// wait for grpc indexservice loop start // wait for grpc indexservice loop start

View File

@ -129,6 +129,11 @@ func (c *GrpcClient) Stop() error {
return c.conn.Close() return c.conn.Close()
} }
// Register dummy
func (c *GrpcClient) Register() error {
return nil
}
func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, error) { func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, error) {
ret, err := caller() ret, err := caller()
if err == nil { if err == nil {

View File

@ -129,13 +129,16 @@ func TestGrpcService(t *testing.T) {
t.Logf("master service port = %d", Params.Port) t.Logf("master service port = %d", Params.Port)
core, ok := (svr.masterService).(*cms.Core)
assert.True(t, ok)
err = core.Register()
assert.Nil(t, err)
err = svr.startGrpc() err = svr.startGrpc()
assert.Nil(t, err) assert.Nil(t, err)
svr.masterService.UpdateStateCode(internalpb.StateCode_Initializing) svr.masterService.UpdateStateCode(internalpb.StateCode_Initializing)
core, ok := (svr.masterService).(*cms.Core)
assert.True(t, ok)
etcdCli, err := initEtcd(cms.Params.EtcdAddress) etcdCli, err := initEtcd(cms.Params.EtcdAddress)
assert.Nil(t, err) assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix()) _, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix())
@ -806,6 +809,10 @@ func (m *mockCore) SetQueryService(types.QueryService) error {
return nil return nil
} }
func (m *mockCore) Register() error {
return nil
}
func (m *mockCore) Init() error { func (m *mockCore) Init() error {
return nil return nil
} }

View File

@ -161,7 +161,13 @@ func (s *Server) init() error {
log.Debug("init params done") log.Debug("init params done")
if err := s.startGrpc(); err != nil { err := s.masterService.Register()
if err != nil {
return err
}
err = s.startGrpc()
if err != nil {
return err return err
} }

View File

@ -70,6 +70,11 @@ func (c *Client) Stop() error {
return nil return nil
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -164,6 +164,11 @@ func (s *Server) init() error {
} }
}() }()
err = s.proxynode.Register()
if err != nil {
return err
}
s.wg.Add(1) s.wg.Add(1)
go s.startGrpcLoop(Params.Port) go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start // wait for grpc server loop start

View File

@ -70,6 +70,11 @@ func (c *Client) Stop() error {
return nil return nil
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.proxyServiceClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.proxyServiceClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -72,6 +72,11 @@ func (c *Client) Stop() error {
return c.conn.Close() return c.conn.Close()
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -91,6 +91,10 @@ func (s *Server) init() error {
closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort))
s.closer = closer s.closer = closer
if err := s.querynode.Register(); err != nil {
return err
}
log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort)) log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort))
s.wg.Add(1) s.wg.Add(1)
go s.startGrpcLoop(Params.QueryNodePort) go s.startGrpcLoop(Params.QueryNodePort)

View File

@ -79,6 +79,11 @@ func (c *Client) Stop() error {
return c.conn.Close() return c.conn.Close()
} }
// Register dummy
func (c *Client) Register() error {
return nil
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
} }

View File

@ -96,6 +96,10 @@ func (s *Server) init() error {
closer := trace.InitTracing("query_service") closer := trace.InitTracing("query_service")
s.closer = closer s.closer = closer
if err := s.queryservice.Register(); err != nil {
return err
}
s.wg.Add(1) s.wg.Add(1)
go s.startGrpcLoop(Params.Port) go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start // wait for grpc server loop start

View File

@ -77,10 +77,16 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
return b, nil return b, nil
} }
// Register register index node at etcd
func (i *IndexNode) Register() error {
i.session = sessionutil.NewSession(i.loopCtx, []string{Params.EtcdAddress})
i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = i.session.ServerID
return nil
}
func (i *IndexNode) Init() error { func (i *IndexNode) Init() error {
ctx := context.Background() ctx := context.Background()
i.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress})
i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200) err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
if err != nil { if err != nil {

View File

@ -85,13 +85,16 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
return i, nil return i, nil
} }
// Register register index service at etcd
func (i *IndexService) Register() error {
i.session = sessionutil.NewSession(i.loopCtx, []string{Params.EtcdAddress})
i.session.Init(typeutil.IndexServiceRole, Params.Address, true)
return nil
}
func (i *IndexService) Init() error { func (i *IndexService) Init() error {
log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress)) log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress))
ctx := context.Background()
i.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress})
i.session.Init(typeutil.IndexServiceRole, Params.Address, true)
connectEtcdFn := func() error { connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil { if err != nil {

View File

@ -523,7 +523,7 @@ func (c *Core) setMsgStreams() error {
MsgType: commonpb.MsgType_TimeTick, MsgType: commonpb.MsgType_TimeTick,
MsgID: 0, MsgID: 0,
Timestamp: t, Timestamp: t,
SourceID: int64(Params.NodeID), SourceID: c.session.ServerID,
}, },
} }
timeTickMsg := &ms.TimeTickMsg{ timeTickMsg := &ms.TimeTickMsg{
@ -678,7 +678,7 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error
MsgType: 0, //TODO,MsgType MsgType: 0, //TODO,MsgType
MsgID: 0, MsgID: 0,
Timestamp: ts, Timestamp: ts,
SourceID: int64(Params.NodeID), SourceID: c.session.ServerID,
}, },
DbName: dbName, DbName: dbName,
CollectionName: collectionName, CollectionName: collectionName,
@ -712,7 +712,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
MsgType: 0, //TODO, msg type MsgType: 0, //TODO, msg type
MsgID: 0, MsgID: 0,
Timestamp: ts, Timestamp: ts,
SourceID: int64(Params.NodeID), SourceID: c.session.ServerID,
}, },
SegmentID: segID, SegmentID: segID,
}) })
@ -740,7 +740,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
MsgType: 0, //TODO, msg type MsgType: 0, //TODO, msg type
MsgID: 0, MsgID: 0,
Timestamp: ts, Timestamp: ts,
SourceID: int64(Params.NodeID), SourceID: c.session.ServerID,
}, },
SegmentIDs: []typeutil.UniqueID{segID}, SegmentIDs: []typeutil.UniqueID{segID},
}) })
@ -804,7 +804,7 @@ func (c *Core) SetQueryService(s types.QueryService) error {
MsgType: commonpb.MsgType_ReleaseCollection, MsgType: commonpb.MsgType_ReleaseCollection,
MsgID: 0, //TODO, msg ID MsgID: 0, //TODO, msg ID
Timestamp: ts, Timestamp: ts,
SourceID: int64(Params.NodeID), SourceID: c.session.ServerID,
}, },
DbID: dbID, DbID: dbID,
CollectionID: collectionID, CollectionID: collectionID,
@ -849,12 +849,16 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema,
return bldID, nil return bldID, nil
} }
// Register register master service at etcd
func (c *Core) Register() error {
c.session = sessionutil.NewSession(c.ctx, []string{Params.EtcdAddress})
c.session.Init(typeutil.MasterServiceRole, Params.Address, true)
return nil
}
func (c *Core) Init() error { func (c *Core) Init() error {
var initError error = nil var initError error = nil
c.initOnce.Do(func() { c.initOnce.Do(func() {
c.session = sessionutil.NewSession(c.ctx, []string{Params.EtcdAddress})
c.session.Init(typeutil.MasterServiceRole, Params.Address, true)
connectEtcdFn := func() error { connectEtcdFn := func() error {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil { if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil {
return initError return initError
@ -915,7 +919,7 @@ func (c *Core) Init() error {
if initError = c.msFactory.SetParams(m); initError != nil { if initError = c.msFactory.SetParams(m); initError != nil {
return return
} }
c.chanTimeTick, initError = newTimeTickSync(c.ctx, c.msFactory, c.etcdCli) c.chanTimeTick, initError = newTimeTickSync(c)
if initError != nil { if initError != nil {
return return
} }
@ -1000,7 +1004,7 @@ func (c *Core) Start() error {
return err return err
} }
log.Debug("master", zap.Int64("node id", int64(Params.NodeID))) log.Debug("master", zap.Int64("node id", c.session.ServerID))
log.Debug("master", zap.String("dd channel name", Params.DdChannel)) log.Debug("master", zap.String("dd channel name", Params.DdChannel))
log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel)) log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))
@ -1032,7 +1036,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentSta
return &internalpb.ComponentStates{ return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{ State: &internalpb.ComponentInfo{
NodeID: int64(Params.NodeID), NodeID: c.session.ServerID,
Role: typeutil.MasterServiceRole, Role: typeutil.MasterServiceRole,
StateCode: code, StateCode: code,
ExtraInfo: nil, ExtraInfo: nil,
@ -1043,7 +1047,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentSta
}, },
SubcomponentStates: []*internalpb.ComponentInfo{ SubcomponentStates: []*internalpb.ComponentInfo{
{ {
NodeID: int64(Params.NodeID), NodeID: c.session.ServerID,
Role: typeutil.MasterServiceRole, Role: typeutil.MasterServiceRole,
StateCode: code, StateCode: code,
ExtraInfo: nil, ExtraInfo: nil,

View File

@ -210,6 +210,9 @@ func TestMasterService(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
randVal := rand.Int() randVal := rand.Int()
err = core.Register()
assert.Nil(t, err)
Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal)
Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal)
Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal)
@ -1712,6 +1715,9 @@ func TestMasterService2(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
randVal := rand.Int() randVal := rand.Int()
err = core.Register()
assert.Nil(t, err)
Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal)
Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal)
Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal)

View File

@ -12,7 +12,6 @@
package masterservice package masterservice
import ( import (
"fmt"
"path" "path"
"strconv" "strconv"
"sync" "sync"
@ -27,8 +26,6 @@ var once sync.Once
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
NodeID uint64
Address string Address string
Port int Port int
@ -65,8 +62,6 @@ func (p *ParamTable) Init() {
panic(err) panic(err)
} }
p.initNodeID()
p.initPulsarAddress() p.initPulsarAddress()
p.initEtcdAddress() p.initEtcdAddress()
p.initMetaRootPath() p.initMetaRootPath()
@ -90,10 +85,6 @@ func (p *ParamTable) Init() {
}) })
} }
func (p *ParamTable) initNodeID() {
p.NodeID = uint64(p.ParseInt64("master.nodeID"))
}
func (p *ParamTable) initPulsarAddress() { func (p *ParamTable) initPulsarAddress() {
addr, err := p.Load("_PulsarAddress") addr, err := p.Load("_PulsarAddress")
if err != nil { if err != nil {
@ -227,12 +218,12 @@ func (p *ParamTable) initLogCfg() {
panic(err) panic(err)
} }
if len(rootPath) != 0 { if len(rootPath) != 0 {
p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("masterservice-%d.log", p.NodeID)) p.Log.File.Filename = path.Join(rootPath, "masterservice.log")
} else { } else {
p.Log.File.Filename = "" p.Log.File.Filename = ""
} }
} }
func (p *ParamTable) initRoleName() { func (p *ParamTable) initRoleName() {
p.RoleName = fmt.Sprintf("%s-%d", "MasterService", p.NodeID) p.RoleName = "MasterService"
} }

View File

@ -20,9 +20,6 @@ import (
func TestParamTable(t *testing.T) { func TestParamTable(t *testing.T) {
Params.Init() Params.Init()
assert.NotEqual(t, Params.NodeID, 0)
t.Logf("master node ID = %d", Params.NodeID)
assert.NotEqual(t, Params.PulsarAddress, "") assert.NotEqual(t, Params.PulsarAddress, "")
t.Logf("pulsar address = %s", Params.PulsarAddress) t.Logf("pulsar address = %s", Params.PulsarAddress)

View File

@ -30,30 +30,26 @@ import (
) )
type timetickSync struct { type timetickSync struct {
core *Core
lock sync.Mutex lock sync.Mutex
ctx context.Context
etcdCli *clientv3.Client
msFactory msgstream.Factory
proxyTimeTick map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg proxyTimeTick map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg
chanStream map[string]msgstream.MsgStream chanStream map[string]msgstream.MsgStream
sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg
} }
func newTimeTickSync(ctx context.Context, factory msgstream.Factory, cli *clientv3.Client) (*timetickSync, error) { func newTimeTickSync(core *Core) (*timetickSync, error) {
tss := timetickSync{ tss := timetickSync{
lock: sync.Mutex{}, lock: sync.Mutex{},
ctx: ctx, core: core,
etcdCli: cli,
msFactory: factory,
proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg), proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg),
chanStream: make(map[string]msgstream.MsgStream), chanStream: make(map[string]msgstream.MsgStream),
sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16), sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16),
} }
ctx2, cancel := context.WithTimeout(ctx, RequestTimeout) ctx2, cancel := context.WithTimeout(core.ctx, RequestTimeout)
defer cancel() defer cancel()
resp, err := cli.Get(ctx2, ProxyMetaPrefix, clientv3.WithPrefix()) resp, err := core.etcdCli.Get(ctx2, ProxyMetaPrefix, clientv3.WithPrefix())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -108,11 +104,11 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error {
// StartWatch watch proxy node change and process all channels' timetick msg // StartWatch watch proxy node change and process all channels' timetick msg
func (t *timetickSync) StartWatch() { func (t *timetickSync) StartWatch() {
proxyNodePrefix := path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole) proxyNodePrefix := path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole)
rch := t.etcdCli.Watch(t.ctx, proxyNodePrefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify()) rch := t.core.etcdCli.Watch(t.core.ctx, proxyNodePrefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
for { for {
select { select {
case <-t.ctx.Done(): case <-t.core.ctx.Done():
log.Debug("timetickSync context done", zap.Error(t.ctx.Err())) log.Debug("master service context done", zap.Error(t.core.ctx.Err()))
return return
case wresp, ok := <-rch: case wresp, ok := <-rch:
if !ok { if !ok {
@ -151,7 +147,7 @@ func (t *timetickSync) StartWatch() {
} }
case ptt, ok := <-t.sendChan: case ptt, ok := <-t.sendChan:
if !ok { if !ok {
log.Debug("timetickSync sendChan closed", zap.Error(t.ctx.Err())) log.Debug("timetickSync sendChan closed")
return return
} }
// reduce each channel to get min timestamp // reduce each channel to get min timestamp
@ -170,7 +166,7 @@ func (t *timetickSync) StartWatch() {
// send timetick msg to msg stream // send timetick msg to msg stream
for chanName, chanTs := range chanName2TimeTickMap { for chanName, chanTs := range chanName2TimeTickMap {
if err := t.SendChannelTimeTick(chanName, chanTs); err != nil { if err := t.SendChannelTimeTick(chanName, chanTs); err != nil {
log.Debug("SendChannelTimeTick fail", zap.Error(t.ctx.Err())) log.Debug("SendChannelTimeTick fail", zap.Error(err))
} }
} }
} }
@ -190,7 +186,7 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam
MsgType: commonpb.MsgType_TimeTick, MsgType: commonpb.MsgType_TimeTick,
MsgID: 0, MsgID: 0,
Timestamp: ts, Timestamp: ts,
SourceID: int64(Params.NodeID), SourceID: t.core.session.ServerID,
}, },
} }
timeTickMsg := &msgstream.TimeTickMsg{ timeTickMsg := &msgstream.TimeTickMsg{
@ -207,7 +203,7 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam
var stream msgstream.MsgStream var stream msgstream.MsgStream
stream, ok := t.chanStream[chanName] stream, ok := t.chanStream[chanName]
if !ok { if !ok {
stream, err = t.msFactory.NewMsgStream(t.ctx) stream, err = t.core.msFactory.NewMsgStream(t.core.ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -85,13 +85,18 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e
} }
// Register register proxy node at etcd
func (node *ProxyNode) Register() error {
node.session = sessionutil.NewSession(node.ctx, []string{Params.EtcdAddress})
node.session.Init(typeutil.ProxyNodeRole, Params.NetworkAddress, false)
Params.ProxyID = node.session.ServerID
return nil
}
func (node *ProxyNode) Init() error { func (node *ProxyNode) Init() error {
// todo wait for proxyservice state changed to Healthy // todo wait for proxyservice state changed to Healthy
ctx := context.Background() ctx := context.Background()
node.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress})
node.session.Init(typeutil.ProxyNodeRole, Params.NetworkAddress, false)
err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200) err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200)
if err != nil { if err != nil {
return err return err

View File

@ -118,12 +118,17 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
return node return node
} }
// Register register query node at etcd
func (node *QueryNode) Register() error {
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, []string{Params.EtcdAddress})
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
Params.QueryNodeID = node.session.ServerID
return nil
}
func (node *QueryNode) Init() error { func (node *QueryNode) Init() error {
ctx := context.Background() ctx := context.Background()
node.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress})
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
C.SegcoreInit() C.SegcoreInit()
registerReq := &queryPb.RegisterNodeRequest{ registerReq := &queryPb.RegisterNodeRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{

View File

@ -56,11 +56,15 @@ type QueryService struct {
msFactory msgstream.Factory msFactory msgstream.Factory
} }
func (qs *QueryService) Init() error { // Register register query service at etcd
ctx := context.Background() func (qs *QueryService) Register() error {
qs.session = sessionutil.NewSession(qs.loopCtx, []string{Params.EtcdAddress})
qs.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress})
qs.session.Init(typeutil.QueryServiceRole, Params.Address, true) qs.session.Init(typeutil.QueryServiceRole, Params.Address, true)
Params.NodeID = uint64(qs.session.ServerID)
return nil
}
func (qs *QueryService) Init() error {
return nil return nil
} }

View File

@ -34,6 +34,7 @@ type Component interface {
Stop() error Stop() error
GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error)
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
Register() error
} }
type DataNode interface { type DataNode interface {