fix: streaming node health check panic (#36336)

issue: #36335

Signed-off-by: chyezh <chyezh@outlook.com>
pull/36181/head
Zhen Ye 2024-09-19 17:11:12 +08:00 committed by GitHub
parent 139787371e
commit f65261215b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 22 additions and 45 deletions

View File

@ -398,6 +398,7 @@ func (mr *MilvusRoles) Run() {
mr.EnableDataNode,
mr.EnableIndexCoord,
mr.EnableIndexNode,
mr.EnableStreamingNode,
}
enableComponents = lo.Filter(enableComponents, func(v bool, _ int) bool {
return v

View File

@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -48,6 +49,7 @@ import (
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -78,11 +80,12 @@ type Server struct {
factory dependency.Factory
// component client
etcdCli *clientv3.Client
tikvCli *txnkv.Client
rootCoord types.RootCoordClient
dataCoord types.DataCoordClient
chunkManager storage.ChunkManager
etcdCli *clientv3.Client
tikvCli *txnkv.Client
rootCoord types.RootCoordClient
dataCoord types.DataCoordClient
chunkManager storage.ChunkManager
componentState *componentutil.ComponentStateService
}
// NewServer create a new StreamingNode server.
@ -91,6 +94,7 @@ func NewServer(f dependency.Factory) (*Server, error) {
stopOnce: sync.Once{},
factory: f,
grpcServerChan: make(chan struct{}),
componentState: componentutil.NewComponentStateService(typeutil.StreamingNodeRole),
}, nil
}
@ -120,6 +124,8 @@ func (s *Server) Stop() (err error) {
// stop stops the server.
func (s *Server) stop() {
s.componentState.OnStopping()
addr, _ := s.getAddress()
log.Info("streamingnode stop", zap.String("Address", addr))
@ -162,7 +168,8 @@ func (s *Server) stop() {
// Health check the health status of streamingnode.
func (s *Server) Health(ctx context.Context) commonpb.StateCode {
return s.streamingnode.Health(ctx)
resp, _ := s.componentState.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
return resp.GetState().StateCode
}
func (s *Server) init(ctx context.Context) (err error) {
@ -230,9 +237,10 @@ func (s *Server) start(ctx context.Context) (err error) {
if err := s.startGPRCServer(ctx); err != nil {
return errors.Wrap(err, "StreamingNode start gRPC server fail")
}
// Register current server to etcd.
s.registerSessionToETCD()
s.componentState.OnInitialized(s.session.ServerID)
return nil
}
@ -349,6 +357,7 @@ func (s *Server) initGRPCServer() {
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
streamingpb.RegisterStreamingNodeStateServiceServer(s.grpcServer, s.componentState)
}
// allocateAddress allocates a available address for streamingnode grpc server.

View File

@ -32,7 +32,6 @@ type Server struct {
// Init initializes the streamingcoord server.
func (s *Server) Init(ctx context.Context) (err error) {
log.Info("init streamingcoord server...")
s.componentStateService.OnInitializing()
// Init all underlying component of streamingcoord server.
if err := s.initBasicComponent(ctx); err != nil {

View File

@ -9,10 +9,8 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/flusher/flusherimpl"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// ServerBuilder is used to build a server.
@ -87,8 +85,7 @@ func (b *ServerBuilder) Build() *Server {
)
resource.Done()
return &Server{
session: b.session,
grpcServer: b.grpcServer,
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingNodeRole),
session: b.session,
grpcServer: b.grpcServer,
}
}

View File

@ -5,12 +5,9 @@ import (
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/service"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
@ -25,9 +22,8 @@ type Server struct {
grpcServer *grpc.Server
// service level instances.
handlerService service.HandlerService
managerService service.ManagerService
componentStateService *componentutil.ComponentStateService // state.
handlerService service.HandlerService
managerService service.ManagerService
// basic component instances.
walManager walmanager.Manager
@ -36,14 +32,12 @@ type Server struct {
// Init initializes the streamingnode server.
func (s *Server) Init(ctx context.Context) (err error) {
log.Info("init streamingnode server...")
s.componentStateService.OnInitializing()
// init all basic components.
s.initBasicComponent(ctx)
// init all service.
s.initService(ctx)
log.Info("streamingnode server initialized")
s.componentStateService.OnInitialized(s.session.ServerID)
return nil
}
@ -56,7 +50,6 @@ func (s *Server) Start() {
// Stop stops the streamingnode server.
func (s *Server) Stop() {
log.Info("stopping streamingnode server...")
s.componentStateService.OnStopping()
log.Info("close wal manager...")
s.walManager.Close()
log.Info("streamingnode server stopped")
@ -65,12 +58,6 @@ func (s *Server) Stop() {
log.Info("flusher stopped")
}
// Health returns the health status of the streamingnode server.
func (s *Server) Health(ctx context.Context) commonpb.StateCode {
resp, _ := s.componentStateService.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
return resp.State.StateCode
}
// initBasicComponent initialize all underlying dependency for streamingnode.
func (s *Server) initBasicComponent(_ context.Context) {
var err error
@ -91,5 +78,4 @@ func (s *Server) initService(_ context.Context) {
func (s *Server) registerGRPCService(grpcServer *grpc.Server) {
streamingpb.RegisterStreamingNodeHandlerServiceServer(grpcServer, s.handlerService)
streamingpb.RegisterStreamingNodeManagerServiceServer(grpcServer, s.managerService)
streamingpb.RegisterStreamingNodeStateServiceServer(grpcServer, s.componentStateService)
}

View File

@ -15,7 +15,7 @@ func NewComponentStateService(role string) *ComponentStateService {
return &ComponentStateService{
nodeID: common.NotRegisteredID,
role: role,
stateCode: commonpb.StateCode_StandBy,
stateCode: commonpb.StateCode_Initializing,
}
}
@ -29,16 +29,6 @@ type ComponentStateService struct {
stateCode commonpb.StateCode
}
// OnInitializing set the state to initializing
func (s *ComponentStateService) OnInitializing() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stateCode != commonpb.StateCode_StandBy {
panic("standby -> initializing")
}
s.stateCode = commonpb.StateCode_Initializing
}
func (s *ComponentStateService) OnInitialized(nodeID int64) {
s.mu.Lock()
defer s.mu.Unlock()

View File

@ -15,11 +15,6 @@ func TestComponentStateService(t *testing.T) {
s := NewComponentStateService("role")
resp, err := s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_StandBy, resp.State.StateCode)
s.OnInitializing()
resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Initializing, resp.State.StateCode)
s.OnInitialized(1)