mirror of https://github.com/milvus-io/milvus.git
Add datacoord Server comment and improve para (#6182)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/6184/head
parent d53b232adf
commit c49ce4ddc2
@@ -133,10 +133,13 @@ func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
 }
 
-func parraRun(works []func(), maxRunner int) {
+// paraRun parallel run, with max Parallel limit
+func paraRun(works []func(), maxRunner int) {
 	wg := sync.WaitGroup{}
 	ch := make(chan func())
 	wg.Add(len(works))
 	if maxRunner > len(works) {
 		maxRunner = len(works)
 	}
 
 	for i := 0; i < maxRunner; i++ {
 		go func() {
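The hunk above is truncated right after the worker goroutines are launched. The following is a minimal sketch of how a bounded runner like paraRun typically completes; the function name paraRunSketch and everything after the goroutine launch (the drain loop, the channel close, and the final Wait) are assumptions for illustration, not the exact commit content.

package main

import (
	"fmt"
	"sync"
)

// paraRunSketch mirrors paraRun's signature: at most maxRunner goroutines
// drain a channel of work items, and the WaitGroup blocks until every
// submitted work func has run. The body past the goroutine launch is assumed.
func paraRunSketch(works []func(), maxRunner int) {
	wg := sync.WaitGroup{}
	ch := make(chan func())
	wg.Add(len(works))
	if maxRunner > len(works) {
		maxRunner = len(works)
	}

	// maxRunner goroutines pull work items off the channel.
	for i := 0; i < maxRunner; i++ {
		go func() {
			for work := range ch { // loop ends when ch is closed
				work()
				wg.Done()
			}
		}()
	}

	// Feed every work item, close the channel, then wait for completion.
	for _, work := range works {
		ch <- work
	}
	close(ch)
	wg.Wait()
}

func main() {
	works := []func(){
		func() { fmt.Println("work a") },
		func() { fmt.Println("work b") },
	}
	paraRunSketch(works, 20) // maxRunner is clamped to len(works)
}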
@@ -210,6 +213,7 @@ func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, e
 				mut.Lock()
 				errs = append(errs, err)
 				mut.Unlock()
 				return
 			}
 			if resp.ErrorCode != commonpb.ErrorCode_Success {
 				log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
@@ -225,8 +229,11 @@ func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, e
 			}
 		})
 	}
-	parraRun(works, 3)
-	return nodes, retry.ErrorList(errs)
+	paraRun(works, 20)
+	if len(errs) > 0 {
+		return nodes, retry.ErrorList(errs)
+	}
+	return nodes, nil
 }
 
 func (c *cluster) register(n *datapb.DataNodeInfo) {
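The two watch hunks combine three pieces: closures record their failures in a shared slice under a mutex, the closures run through paraRun with bounded parallelism, and an error is returned only when something actually failed. A hedged, generic sketch of that pattern follows; runAll is a hypothetical name, fmt.Errorf stands in for the project's retry.ErrorList helper, and paraRunSketch is reused from the example above (same file, with fmt and sync imported).

// runAll is a hypothetical helper showing the aggregate-errors-under-a-mutex
// pattern used by cluster.watch: run tasks in parallel, collect failures,
// and report an error only if the error slice is non-empty.
func runAll(tasks []func() error, maxRunner int) error {
	var (
		mut  sync.Mutex
		errs []error
	)
	works := make([]func(), 0, len(tasks))
	for _, task := range tasks {
		task := task // capture the loop variable for the closure
		works = append(works, func() {
			if err := task(); err != nil {
				mut.Lock()
				errs = append(errs, err)
				mut.Unlock()
			}
		})
	}
	paraRunSketch(works, maxRunner)
	if len(errs) > 0 {
		// The real code wraps errs with retry.ErrorList; fmt.Errorf is a stand-in.
		return fmt.Errorf("%d task(s) failed: %v", len(errs), errs)
	}
	return nil
}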
@@ -19,7 +19,7 @@ import (
 const serverNotServingErrMsg = "server is not serving"
 
 func (s *Server) isClosed() bool {
-	return atomic.LoadInt64(&s.isServing) != 2
+	return atomic.LoadInt64(&s.isServing) != ServerStateHealthy
 }
 
 func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
@@ -354,9 +354,9 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS
 	}
 	state := atomic.LoadInt64(&s.isServing)
 	switch state {
-	case 1:
+	case ServerStateInitializing:
 		resp.State.StateCode = internalpb.StateCode_Initializing
-	case 2:
+	case ServerStateHealthy:
 		resp.State.StateCode = internalpb.StateCode_Healthy
 	default:
 		resp.State.StateCode = internalpb.StateCode_Abnormal
@@ -48,15 +48,29 @@ type (
 	Timestamp = typeutil.Timestamp
 )
 
+// ServerState type alias
+type ServerState = int64
+
+const (
+	// ServerStateStopped state stands for just created or stopped `Server` instance
+	ServerStateStopped ServerState = 0
+	// ServerStateInitializing state stands initializing `Server` instance
+	ServerStateInitializing ServerState = 1
+	// ServerStateHealthy state stands for healthy `Server` instance
+	ServerStateHealthy ServerState = 2
+)
+
 type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
 type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
 
+// Server implements `types.Datacoord`
+// handles Data Cooridinator related jobs
 type Server struct {
 	ctx              context.Context
 	serverLoopCtx    context.Context
 	serverLoopCancel context.CancelFunc
 	serverLoopWg     sync.WaitGroup
-	isServing        int64
+	isServing        ServerState
 
 	kvClient *etcdkv.EtcdKV
 	meta     *meta
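Note that ServerState is declared as a type alias (`type ServerState = int64`), not a defined type. That choice matters for the atomic calls in the other hunks: an alias is identical to int64, so &s.isServing can be passed straight to the sync/atomic int64 helpers and compared against the named constants without conversions. A minimal illustration (not commit code; the server struct here is a stand-in):

package main

import (
	"fmt"
	"sync/atomic"
)

// Alias: ServerState and int64 are the same type, so *ServerState is *int64.
// With a defined type ("type ServerState int64") the atomic calls below
// would not compile without casts.
type ServerState = int64

const ServerStateHealthy ServerState = 2

type server struct {
	isServing ServerState
}

func main() {
	s := &server{}
	atomic.StoreInt64(&s.isServing, ServerStateHealthy) // no cast needed
	fmt.Println(atomic.LoadInt64(&s.isServing) == ServerStateHealthy)
}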
@@ -79,6 +93,7 @@ type Server struct {
 	rootCoordClientCreator rootCoordCreatorFunc
 }
 
+// CreateServer create `Server` instance
 func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
 	rand.Seed(time.Now().UnixNano())
 	s := &Server{
@@ -107,11 +122,19 @@ func (s *Server) Register() error {
 	return nil
 }
 
+// Init change server state to Initializing
 func (s *Server) Init() error {
-	atomic.StoreInt64(&s.isServing, 1)
+	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
 	return nil
 }
 
+// Start initialize `Server` members and start loops, follow steps are taken:
+// 1. initialize message factory parameters
+// 2. initialize root coord client, meta, datanode cluster, segment info channel,
+//    allocator, segment manager
+// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
+//    datanodes etcd watch, etcd alive check and flush completed status check
+// 4. set server state to Healthy
 func (s *Server) Start() error {
 	var err error
 	m := map[string]interface{}{
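The new doc comments spell out the state transitions the lifecycle methods perform. As a caller-side sketch only (assumed to live in the datacoord package with context and msgstream imported; runDataCoord and its error handling are hypothetical, not part of the commit), the documented flow looks roughly like this:

// runDataCoord is a hypothetical driver showing the documented lifecycle:
// create, register, Init (state -> Initializing), Start (state -> Healthy),
// then Stop on shutdown (CAS Healthy -> Stopped, release sessions, close streams).
func runDataCoord(ctx context.Context, factory msgstream.Factory) error {
	s, err := CreateServer(ctx, factory)
	if err != nil {
		return err
	}
	if err := s.Register(); err != nil {
		return err
	}
	if err := s.Init(); err != nil { // isServing = ServerStateInitializing
		return err
	}
	if err := s.Start(); err != nil { // members, loops, isServing = ServerStateHealthy
		return err
	}
	defer func() { _ = s.Stop() }()
	<-ctx.Done() // run until the caller cancels
	return nil
}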
@@ -151,7 +174,7 @@ func (s *Server) Start() error {
 
 	s.startServerLoop()
 
-	atomic.StoreInt64(&s.isServing, 2)
+	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
 	log.Debug("DataCoordinator startup success")
 	return nil
 }
@@ -482,12 +505,16 @@ func (s *Server) initRootCoordClient() error {
 	return s.rootCoordClient.Start()
 }
 
+// Stop do the Server finalize processes
+// it checks the server status is healthy, if not, just quit
+// if Server is healthy, set server state to stopped, release etcd session,
+// stop message stream client and stop server loops
 func (s *Server) Stop() error {
-	if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) {
+	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
 		return nil
 	}
 	log.Debug("DataCoord server shutdown")
-	atomic.StoreInt64(&s.isServing, 0)
+	atomic.StoreInt64(&s.isServing, ServerStateStopped)
 	s.cluster.releaseSessions()
 	s.segmentInfoStream.Close()
 	s.flushMsgStream.Close()
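The CompareAndSwap guard documented above is what makes Stop idempotent: only the caller that observes the Healthy to Stopped transition runs the finalize steps; any later call sees the swap fail and returns nil immediately. A minimal standalone demonstration of that property (not commit code; the state names and stop closure are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateStopped int64 = 0
	stateHealthy int64 = 2
)

func main() {
	var serving int64 = stateHealthy

	// stop returns true only for the caller that actually performs the shutdown.
	stop := func() bool {
		return atomic.CompareAndSwapInt64(&serving, stateHealthy, stateStopped)
	}

	fmt.Println(stop()) // true  -> this caller runs the finalize steps
	fmt.Println(stop()) // false -> already stopped, nothing to do
}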