mirror of https://github.com/milvus-io/milvus.git
Make data node start only once (#24031)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/24035/head
parent
67cf23d050
commit
084424d636
|
@ -116,6 +116,7 @@ type DataNode struct {
|
|||
|
||||
//call once
|
||||
initOnce sync.Once
|
||||
startOnce sync.Once
|
||||
sessionMu sync.Mutex // to fix data race
|
||||
session *sessionutil.Session
|
||||
watchKv kv.MetaKv
|
||||
|
@ -485,54 +486,62 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
|
|||
|
||||
// Start will update DataNode state to HEALTHY
|
||||
func (node *DataNode) Start() error {
|
||||
if err := node.allocator.Start(); err != nil {
|
||||
log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole))
|
||||
return err
|
||||
}
|
||||
log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole))
|
||||
var startErr error
|
||||
node.startOnce.Do(func() {
|
||||
if err := node.allocator.Start(); err != nil {
|
||||
log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole))
|
||||
startErr = err
|
||||
return
|
||||
}
|
||||
log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole))
|
||||
|
||||
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
|
||||
commonpbutil.WithMsgID(0),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
),
|
||||
Count: 1,
|
||||
})
|
||||
if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
|
||||
startErr = errors.New("DataNode fail to alloc timestamp")
|
||||
return
|
||||
}
|
||||
|
||||
connectEtcdFn := func() error {
|
||||
etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
node.watchKv = etcdKV
|
||||
return nil
|
||||
}
|
||||
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
|
||||
if err != nil {
|
||||
startErr = errors.New("DataNode fail to connect etcd")
|
||||
return
|
||||
}
|
||||
|
||||
chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
|
||||
|
||||
if err != nil {
|
||||
startErr = err
|
||||
return
|
||||
}
|
||||
|
||||
node.chunkManager = chunkManager
|
||||
|
||||
go node.BackGroundGC(node.clearSignal)
|
||||
|
||||
go node.compactionExecutor.start(node.ctx)
|
||||
|
||||
// Start node watch node
|
||||
go node.StartWatchChannels(node.ctx)
|
||||
|
||||
go node.flowgraphManager.start()
|
||||
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
||||
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
|
||||
commonpbutil.WithMsgID(0),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
),
|
||||
Count: 1,
|
||||
})
|
||||
if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
|
||||
return errors.New("DataNode fail to alloc timestamp")
|
||||
}
|
||||
|
||||
connectEtcdFn := func() error {
|
||||
etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
node.watchKv = etcdKV
|
||||
return nil
|
||||
}
|
||||
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
|
||||
if err != nil {
|
||||
return errors.New("DataNode fail to connect etcd")
|
||||
}
|
||||
|
||||
chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
node.chunkManager = chunkManager
|
||||
|
||||
go node.BackGroundGC(node.clearSignal)
|
||||
|
||||
go node.compactionExecutor.start(node.ctx)
|
||||
|
||||
// Start node watch node
|
||||
go node.StartWatchChannels(node.ctx)
|
||||
|
||||
go node.flowgraphManager.start()
|
||||
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
return nil
|
||||
return startErr
|
||||
}
|
||||
|
||||
// UpdateStateCode updates datanode's state code
|
||||
|
|
Loading…
Reference in New Issue