Fix the nil point about the session (#22696)

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/22723/head
SimFG 2023-03-10 21:43:54 +08:00 committed by GitHub
parent bfe6b24565
commit b2ece6a569
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 33 additions and 36 deletions

4
go.mod
View File

@ -11,7 +11,7 @@ require (
github.com/antonmedv/expr v1.8.9
github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20
github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7
github.com/apache/thrift v0.15.0
github.com/apache/thrift v0.15.0 // indirect
github.com/bits-and-blooms/bloom/v3 v3.0.1
github.com/casbin/casbin/v2 v2.44.2
github.com/casbin/json-adapter/v2 v2.0.0
@ -183,8 +183,6 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
require github.com/golang/mock v1.5.0
require (
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1 // indirect

2
go.sum
View File

@ -491,8 +491,6 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378 h1:ttJp/ZUB/3GGbd2mIbASSfdOiBUrkP50gn5gDgCsD0g=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd h1:9ilgTEqZSdEPbJKSrRGB1TIHTaF7DqVDIwn8/azcaBk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=

View File

@ -1472,7 +1472,7 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if s.isClosed() {
reason := errorutil.UnHealthReason("datacoord", s.session.ServerID, "datacoord is closed")
reason := errorutil.UnHealthReason("datacoord", Params.DataCoordCfg.GetNodeID(), "datacoord is closed")
return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil
}

View File

@ -765,7 +765,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
if !node.isHealthy() {
log.Warn("DataNode.GetMetrics failed",
zap.Error(errDataNodeIsUnhealthy(node.session.ServerID)))
zap.Error(errDataNodeIsUnhealthy(Params.DataNodeCfg.GetNodeID())))
resp := &milvuspb.GetMetricsResponse{Status: &commonpb.Status{}}
setNotServingStatus(resp.Status, node.GetStateCode())

View File

@ -56,6 +56,6 @@ func errSegmentNotFound(segID UniqueID) error {
func setNotServingStatus(status *commonpb.Status, stateCode commonpb.StateCode) {
reason := fmt.Sprintf("sate code: %s", stateCode.String())
status.Reason = errorutil.NotServingReason(typeutil.IndexCoordRole, Params.DataCoordCfg.GetNodeID(), reason)
status.Reason = errorutil.NotServingReason(typeutil.IndexCoordRole, Params.IndexCoordCfg.GetNodeID(), reason)
status.ErrorCode = commonpb.ErrorCode_NotReadyServe
}

View File

@ -165,6 +165,7 @@ func (i *IndexCoord) initSession() error {
i.session.SetEnableActiveStandBy(i.enableActiveStandBy)
Params.SetLogger(i.session.ServerID)
i.serverID = i.session.ServerID
Params.IndexCoordCfg.SetNodeID(i.serverID)
return nil
}
@ -951,7 +952,7 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
log.Warn("IndexCoord.ShowConfigurations failed",
zap.Int64("nodeId", i.serverID),
zap.String("req", req.Pattern),
zap.Error(errIndexCoordIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
zap.Error(errIndexCoordIsUnhealthy(i.serverID)))
ret := &internalpb.ShowConfigurationsResponse{Status: &commonpb.Status{}}
setNotServingStatus(ret.GetStatus(), i.GetStateCode())
@ -1024,7 +1025,7 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
func (i *IndexCoord) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if !i.isHealthy() {
reason := errorutil.UnHealthReason("indexcoord", i.session.ServerID, "indexcoord is unhealthy")
reason := errorutil.UnHealthReason("indexcoord", i.serverID, "indexcoord is unhealthy")
return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil
}

View File

@ -68,7 +68,7 @@ func (s *Server) GetStateCode() commonpb.StateCode {
func (s *Server) NotReadyServeResp(status *commonpb.Status) {
status.ErrorCode = commonpb.ErrorCode_NotReadyServe
status.Reason = errorutil.NotServingReason(typeutil.QueryCoordRole, Params.QueryNodeCfg.GetNodeID(), s.GetStateCode().String())
status.Reason = errorutil.NotServingReason(typeutil.QueryCoordRole, Params.QueryCoordCfg.GetNodeID(), s.GetStateCode().String())
}
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
@ -964,7 +964,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if s.status.Load() != commonpb.StateCode_Healthy {
reason := errorutil.UnHealthReason("querycoord", s.session.ServerID, "querycoord is unhealthy")
reason := errorutil.UnHealthReason("querycoord", Params.QueryCoordCfg.GetNodeID(), "querycoord is unhealthy")
return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil
}

View File

@ -77,7 +77,7 @@ func (node *QueryNode) GetComponentStates(ctx context.Context) (*milvuspb.Compon
}
nodeID := common.NotRegisteredID
if node.session != nil && node.session.Registered() {
nodeID = node.session.ServerID
nodeID = Params.QueryNodeCfg.GetNodeID()
}
info := &milvuspb.ComponentInfo{
NodeID: nodeID,
@ -308,17 +308,17 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC
defer node.wg.Done()
// check target matches
if in.GetBase().GetTargetID() != node.session.ServerID {
if in.GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), node.session.ServerID),
Reason: common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID()),
}
return status, nil
}
log := log.With(
zap.Int64("collectionID", in.GetCollectionID()),
zap.Int64("nodeID", node.session.ServerID),
zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()),
zap.Strings("channels", lo.Map(in.GetInfos(), func(info *datapb.VchannelInfo, _ int) string {
return info.GetChannelName()
})),
@ -399,10 +399,10 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
defer node.wg.Done()
// check target matches
if req.GetBase().GetTargetID() != node.session.ServerID {
if req.GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID()),
}
return status, nil
}
@ -457,10 +457,10 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegment
defer node.wg.Done()
// check target matches
if in.GetBase().GetTargetID() != node.session.ServerID {
if in.GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), node.session.ServerID),
Reason: common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID()),
}
return status, nil
}
@ -630,10 +630,10 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS
defer node.wg.Done()
// check target matches
if in.GetBase().GetTargetID() != node.session.ServerID {
if in.GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), node.session.ServerID),
Reason: common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID()),
}
return status, nil
}
@ -713,13 +713,13 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64
// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != node.session.ServerID {
if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
return &internalpb.SearchResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: fmt.Sprintf("QueryNode %d can't serve, recovering: %s",
node.session.ServerID,
common.WrapNodeIDNotMatchMsg(req.GetReq().GetBase().GetTargetID(), node.session.ServerID)),
Params.QueryNodeCfg.GetNodeID(),
common.WrapNodeIDNotMatchMsg(req.GetReq().GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID())),
},
}, nil
}
@ -1051,13 +1051,13 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
zap.Int64s("partitionIDs", req.GetReq().GetPartitionIDs()),
)
if req.GetReq().GetBase().GetTargetID() != node.session.ServerID {
if req.GetReq().GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
return &internalpb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: fmt.Sprintf("QueryNode %d can't serve, recovering: %s",
node.session.ServerID,
common.WrapNodeIDNotMatchMsg(req.GetReq().GetBase().GetTargetID(), node.session.ServerID)),
Params.QueryNodeCfg.GetNodeID(),
common.WrapNodeIDNotMatchMsg(req.GetReq().GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID())),
},
}, nil
}
@ -1152,7 +1152,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
if !commonpbutil.IsHealthyOrStopping(node.stateCode) {
log.Warn("QueryNode.ShowConfigurations failed",
zap.Int64("nodeID", node.session.ServerID),
zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Pattern),
zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
@ -1245,12 +1245,12 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
defer node.wg.Done()
// check target matches
if req.GetBase().GetTargetID() != node.session.ServerID {
if req.GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: fmt.Sprintf("QueryNode %d can't serve, recovering: %s",
node.session.ServerID,
common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID)),
Params.QueryNodeCfg.GetNodeID(),
common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID())),
}
return &querypb.GetDataDistributionResponse{Status: status}, nil
}
@ -1312,7 +1312,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
return &querypb.GetDataDistributionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
NodeID: node.session.ServerID,
NodeID: Params.QueryNodeCfg.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
@ -1331,11 +1331,11 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
defer node.wg.Done()
// check target matches
if req.GetBase().GetTargetID() != node.session.ServerID {
log.Warn("failed to do match target id when sync ", zap.Int64("expect", req.GetBase().GetTargetID()), zap.Int64("actual", node.session.ServerID))
if req.GetBase().GetTargetID() != Params.QueryNodeCfg.GetNodeID() {
log.Warn("failed to do match target id when sync ", zap.Int64("expect", req.GetBase().GetTargetID()), zap.Int64("actual", Params.QueryNodeCfg.GetNodeID()))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), Params.QueryNodeCfg.GetNodeID()),
}
return status, nil
}