Fix the nil pointer about the session (#22748)

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/22757/head
SimFG 2023-03-14 20:07:54 +08:00 committed by GitHub
parent 50265292d8
commit b57e476089
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 13 additions and 10 deletions

View File

@ -123,7 +123,7 @@ func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos {
CreatedTime: paramtable.GetCreateTime().String(), CreatedTime: paramtable.GetCreateTime().String(),
UpdatedTime: paramtable.GetUpdateTime().String(), UpdatedTime: paramtable.GetUpdateTime().String(),
Type: typeutil.DataCoordRole, Type: typeutil.DataCoordRole,
ID: s.session.ServerID, ID: paramtable.GetNodeID(),
}, },
SystemConfigurations: metricsinfo.DataCoordConfiguration{ SystemConfigurations: metricsinfo.DataCoordConfiguration{
SegmentMaxSize: Params.DataCoordCfg.SegmentMaxSize.GetAsFloat(), SegmentMaxSize: Params.DataCoordCfg.SegmentMaxSize.GetAsFloat(),

View File

@ -250,7 +250,7 @@ func (s *Server) Register() error {
} }
} }
go s.session.LivenessCheck(s.serverLoopCtx, func() { go s.session.LivenessCheck(s.serverLoopCtx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID)) logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", paramtable.GetNodeID()))
if err := s.Stop(); err != nil { if err := s.Stop(); err != nil {
logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
} }
@ -735,7 +735,7 @@ func (s *Server) startWatchService(ctx context.Context) {
func (s *Server) stopServiceWatch() { func (s *Server) stopServiceWatch() {
// ErrCompacted is handled inside SessionWatcher, which means there is some other error occurred, closing server. // ErrCompacted is handled inside SessionWatcher, which means there is some other error occurred, closing server.
logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", s.session.ServerID)) logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
go s.Stop() go s.Stop()
if s.session.TriggerKill { if s.session.TriggerKill {
if p, err := os.FindProcess(os.Getpid()); err == nil { if p, err := os.FindProcess(os.Getpid()); err == nil {

View File

@ -23,6 +23,8 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/milvus-io/milvus/internal/common"
"github.com/samber/lo" "github.com/samber/lo"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.uber.org/zap" "go.uber.org/zap"
@ -31,7 +33,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
@ -588,11 +589,11 @@ func (s *Server) GetStateCode() commonpb.StateCode {
// GetComponentStates returns DataCoord's current state // GetComponentStates returns DataCoord's current state
func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
code := s.GetStateCode()
nodeID := common.NotRegisteredID nodeID := common.NotRegisteredID
if s.session != nil && s.session.Registered() { if s.session != nil && s.session.Registered() {
nodeID = s.session.ServerID // or Params.NodeID nodeID = s.session.ServerID // or Params.NodeID
} }
code := s.GetStateCode()
resp := &milvuspb.ComponentStates{ resp := &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{ State: &milvuspb.ComponentInfo{
// NodeID: Params.NodeID, // will race with Server.Register() // NodeID: Params.NodeID, // will race with Server.Register()
@ -1432,7 +1433,7 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if s.isClosed() { if s.isClosed() {
reason := errorutil.UnHealthReason("datacoord", s.session.ServerID, "datacoord is closed") reason := errorutil.UnHealthReason("datacoord", paramtable.GetNodeID(), "datacoord is closed")
return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil
} }

View File

@ -191,7 +191,7 @@ func (s *Server) getSystemInfoMetrics(
CreatedTime: paramtable.GetCreateTime().String(), CreatedTime: paramtable.GetCreateTime().String(),
UpdatedTime: paramtable.GetUpdateTime().String(), UpdatedTime: paramtable.GetUpdateTime().String(),
Type: typeutil.QueryCoordRole, Type: typeutil.QueryCoordRole,
ID: s.session.ServerID, ID: paramtable.GetNodeID(),
}, },
SystemConfigurations: metricsinfo.QueryCoordConfiguration{ SystemConfigurations: metricsinfo.QueryCoordConfiguration{
SearchChannelPrefix: Params.CommonCfg.QueryCoordSearch.GetValue(), SearchChannelPrefix: Params.CommonCfg.QueryCoordSearch.GetValue(),

View File

@ -25,6 +25,8 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/metrics"
@ -134,7 +136,7 @@ func (s *Server) Register() error {
} }
} }
go s.session.LivenessCheck(s.ctx, func() { go s.session.LivenessCheck(s.ctx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID)) log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", paramtable.GetNodeID()))
if err := s.Stop(); err != nil { if err := s.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
@ -588,7 +590,7 @@ func (s *Server) watchNodes(revision int64) {
case event, ok := <-eventChan: case event, ok := <-eventChan:
if !ok { if !ok {
// ErrCompacted is handled inside SessionWatcher // ErrCompacted is handled inside SessionWatcher
log.Error("Session Watcher channel closed", zap.Int64("serverID", s.session.ServerID)) log.Error("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
go s.Stop() go s.Stop()
if s.session.TriggerKill { if s.session.TriggerKill {
if p, err := os.FindProcess(os.Getpid()); err == nil { if p, err := os.FindProcess(os.Getpid()); err == nil {

View File

@ -947,7 +947,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if s.status.Load() != commonpb.StateCode_Healthy { if s.status.Load() != commonpb.StateCode_Healthy {
reason := errorutil.UnHealthReason("querycoord", s.session.ServerID, "querycoord is unhealthy") reason := errorutil.UnHealthReason("querycoord", paramtable.GetNodeID(), "querycoord is unhealthy")
return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil
} }