Fix nodeID mismatch at standalone mode (#20648)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/20393/head
Enwei Jiao 2022-11-17 17:15:08 +08:00 committed by GitHub
parent 1177ed1a33
commit 19524a5344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 63 additions and 57 deletions

View File

@ -82,7 +82,7 @@ func (r *Runner) init() {
r.initEtcdCli()
r.session = sessionutil.NewSession(r.ctx, r.cfg.EtcdCfg.MetaRootPath, r.etcdCli,
sessionutil.WithCustomConfigEnable(), sessionutil.WithSessionTTL(60), sessionutil.WithSessionRetryTimes(30))
sessionutil.WithCustomConfigEnable(), sessionutil.WithTTL(60), sessionutil.WithRetryTimes(30))
// address not important here.
address := time.Now().String()
r.address = address

View File

@ -244,7 +244,6 @@ func (s *Server) initSession() error {
}
s.session.Init(typeutil.DataCoordRole, s.address, true, true)
s.session.SetEnableActiveStandBy(s.enableActiveStandBy)
paramtable.SetNodeID(s.session.ServerID)
return nil
}

View File

@ -216,7 +216,6 @@ func (node *DataNode) initSession() error {
return errors.New("failed to initialize session")
}
node.session.Init(typeutil.DataNodeRole, node.address, false, true)
paramtable.SetNodeID(node.session.ServerID)
return nil
}

View File

@ -79,9 +79,8 @@ type IndexCoord struct {
loopCancel func()
loopWg sync.WaitGroup
sched *TaskScheduler
session *sessionutil.Session
serverID UniqueID
sched *TaskScheduler
session *sessionutil.Session
eventChan <-chan *sessionutil.SessionEvent
@ -164,7 +163,6 @@ func (i *IndexCoord) initSession() error {
}
i.session.Init(typeutil.IndexCoordRole, i.address, true, true)
i.session.SetEnableActiveStandBy(i.enableActiveStandBy)
i.serverID = i.session.ServerID
return nil
}
@ -433,10 +431,10 @@ func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.String
// indexBuilder will find this task and assign it to IndexNode for execution.
func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
}, nil
}
log.Info("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
@ -495,11 +493,11 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta
zap.String("indexName", req.IndexName))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &indexpb.GetIndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
},
}, nil
}
@ -546,11 +544,11 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS
zap.String("indexName", req.IndexName), zap.Int64s("segIDs", req.GetSegmentIDs()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &indexpb.GetSegmentIndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
},
}, nil
}
@ -634,11 +632,11 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
log.RatedInfo(5, "IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
zap.String("indexName", req.IndexName))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
},
}, nil
}
@ -710,10 +708,10 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
log.Info("IndexCoord DropIndex", zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
}, nil
}
@ -777,11 +775,11 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf
log.RatedInfo(5, "IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.GetIndexName()), zap.Int64s("segIDs", req.GetSegmentIDs()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &indexpb.GetIndexInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
},
SegmentInfo: nil,
}, nil
@ -836,11 +834,11 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf
func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log.RatedInfo(5, "IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
},
IndexInfos: nil,
}, nil
@ -937,7 +935,7 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
log.Info("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
if !i.isHealthy() {
log.Warn("IndexCoord.ShowConfigurations failed",
zap.Int64("nodeId", i.serverID),
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Pattern),
zap.Error(errIndexCoordIsUnhealthy(paramtable.GetNodeID())))
@ -955,15 +953,15 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
// GetMetrics gets the metrics info of IndexCoord.
func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.RatedInfo(5, "IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))
log.RatedInfo(5, "IndexCoord.GetMetrics", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("req", req.Request))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID()))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexCoordIsUnhealthy(i.serverID),
Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()),
},
Response: "",
}, nil

View File

@ -530,9 +530,7 @@ func TestIndexCoord_GetComponentStates(t *testing.T) {
func TestIndexCoord_UnHealthy(t *testing.T) {
ctx := context.Background()
ic := &IndexCoord{
serverID: 1,
}
ic := &IndexCoord{}
ic.stateCode.Store(commonpb.StateCode_Abnormal)
// Test IndexCoord function

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type task interface {
@ -160,7 +161,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
commonpbutil.WithMsgType(0),
commonpbutil.WithMsgID(cit.indexID),
commonpbutil.WithTimeStamp(cit.req.Timestamp),
commonpbutil.WithSourceID(cit.indexCoordClient.serverID),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionID: cit.req.CollectionID,
PartitionID: -1,

View File

@ -176,7 +176,6 @@ func (i *IndexNode) initSession() error {
return errors.New("failed to initialize session")
}
i.session.Init(typeutil.IndexNodeRole, i.address, false, true)
paramtable.SetNodeID(i.session.ServerID)
return nil
}

View File

@ -155,7 +155,6 @@ func (node *Proxy) initSession() error {
return errors.New("new session failed, maybe etcd cannot be connected")
}
node.session.Init(typeutil.ProxyRole, node.address, false, true)
paramtable.SetNodeID(node.session.ServerID)
return nil
}

View File

@ -49,7 +49,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -158,7 +157,6 @@ func (s *Server) Init() error {
s.session.Init(typeutil.QueryCoordRole, s.address, true, true)
s.enableActiveStandBy = Params.QueryCoordCfg.EnableActiveStandby
s.session.SetEnableActiveStandBy(s.enableActiveStandBy)
paramtable.SetNodeID(s.session.ServerID)
s.factory.Init(Params)
// Init KV

View File

@ -149,7 +149,6 @@ func (node *QueryNode) initSession() error {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}
node.session.Init(typeutil.QueryNodeRole, node.address, false, true)
paramtable.SetNodeID(node.session.ServerID)
return nil
}

View File

@ -104,6 +104,7 @@ type Session struct {
useCustomConfig bool
sessionTTL int64
sessionRetryTimes int64
reuseNodeID bool
}
type SessionOption func(session *Session)
@ -112,14 +113,18 @@ func WithCustomConfigEnable() SessionOption {
return func(session *Session) { session.useCustomConfig = true }
}
func WithSessionTTL(ttl int64) SessionOption {
func WithTTL(ttl int64) SessionOption {
return func(session *Session) { session.sessionTTL = ttl }
}
func WithSessionRetryTimes(n int64) SessionOption {
func WithRetryTimes(n int64) SessionOption {
return func(session *Session) { session.sessionRetryTimes = n }
}
func WithResueNodeID(b bool) SessionOption {
return func(session *Session) { session.reuseNodeID = b }
}
func (s *Session) apply(opts ...SessionOption) {
for _, opt := range opts {
opt(s)
@ -184,12 +189,15 @@ func (s *Session) MarshalJSON() ([]byte, error) {
// etcdEndpoints is to init etcdCli when NewSession
func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, opts ...SessionOption) *Session {
session := &Session{
ctx: ctx,
metaRoot: metaRoot,
Version: common.Version,
ctx: ctx,
metaRoot: metaRoot,
Version: common.Version,
// options
useCustomConfig: false,
sessionTTL: 60,
sessionRetryTimes: 30,
reuseNodeID: true,
}
session.apply(opts...)
@ -241,6 +249,7 @@ func (s *Session) String() string {
func (s *Session) Register() {
ch, err := s.registerService()
if err != nil {
log.Error("Register failed", zap.Error(err))
panic(err)
}
s.liveCh = s.processKeepAliveResponse(ch)
@ -253,12 +262,21 @@ func (s *Session) getServerID() (int64, error) {
serverIDMu.Lock()
defer serverIDMu.Unlock()
// Notice, For standalone, all process share the same nodeID.
nodeID := paramtable.GetNodeID()
if nodeID != 0 {
return nodeID, nil
log.Debug("getServerID", zap.Bool("reuse", s.reuseNodeID))
if s.reuseNodeID {
// Notice, For standalone, all process share the same nodeID.
if nodeID := paramtable.GetNodeID(); nodeID != 0 {
return nodeID, nil
}
}
return s.getServerIDWithKey(DefaultIDKey)
nodeID, err := s.getServerIDWithKey(DefaultIDKey)
if err != nil {
return nodeID, err
}
if s.reuseNodeID {
paramtable.SetNodeID(nodeID)
}
return nodeID, nil
}
func (s *Session) checkIDExist() {

View File

@ -73,9 +73,7 @@ func TestGetServerIDConcurrently(t *testing.T) {
go getIDFunc()
}
wg.Wait()
for i := 1; i <= 10; i++ {
assert.Contains(t, res, int64(i))
}
assert.ElementsMatch(t, []int64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, res)
}
func TestInit(t *testing.T) {
@ -125,7 +123,7 @@ func TestUpdateSessions(t *testing.T) {
var wg sync.WaitGroup
var muList = sync.Mutex{}
s := NewSession(ctx, metaRoot, etcdCli)
s := NewSession(ctx, metaRoot, etcdCli, WithResueNodeID(false))
sessions, rev, err := s.GetSessions("test")
assert.Nil(t, err)
@ -137,7 +135,7 @@ func TestUpdateSessions(t *testing.T) {
getIDFunc := func() {
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
require.NoError(t, err)
singleS := NewSession(ctx, metaRoot, etcdCli)
singleS := NewSession(ctx, metaRoot, etcdCli, WithResueNodeID(false))
singleS.Init("test", "testAddr", false, false)
singleS.Register()
muList.Lock()
@ -498,21 +496,21 @@ func (suite *SessionWithVersionSuite) SetupTest() {
suite.metaRoot = "sessionWithVersion"
suite.serverName = "sessionComp"
s1 := NewSession(ctx, suite.metaRoot, client)
s1 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false))
s1.Version.Major, s1.Version.Minor, s1.Version.Patch = 0, 0, 0
s1.Init(suite.serverName, "s1", false, false)
s1.Register()
suite.sessions = append(suite.sessions, s1)
s2 := NewSession(ctx, suite.metaRoot, client)
s2 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false))
s2.Version.Major, s2.Version.Minor, s2.Version.Patch = 2, 1, 0
s2.Init(suite.serverName, "s2", false, false)
s2.Register()
suite.sessions = append(suite.sessions, s2)
s3 := NewSession(ctx, suite.metaRoot, client)
s3 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false))
s3.Version.Major, s3.Version.Minor, s3.Version.Patch = 2, 2, 0
s3.Version.Build = []string{"dev"}
s3.Init(suite.serverName, "s3", false, false)
@ -538,7 +536,7 @@ func (suite *SessionWithVersionSuite) TearDownTest() {
}
func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() {
s := NewSession(context.Background(), suite.metaRoot, suite.client)
s := NewSession(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false))
suite.Run(">1.0.0", func() {
r, err := semver.ParseRange(">1.0.0")
@ -581,7 +579,7 @@ func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() {
}
func (suite *SessionWithVersionSuite) TestWatchServicesWithVersionRange() {
s := NewSession(context.Background(), suite.metaRoot, suite.client)
s := NewSession(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false))
suite.Run(">1.0.0 <=2.1.0", func() {
r, err := semver.ParseRange(">1.0.0 <=2.1.0")
@ -641,7 +639,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
// register session 1, will be active
ctx1 := context.Background()
s1 := NewSession(ctx1, metaRoot, etcdCli)
s1 := NewSession(ctx1, metaRoot, etcdCli, WithResueNodeID(false))
s1.Init("inittest", "testAddr", true, true)
s1.SetEnableActiveStandBy(true)
s1.Register()
@ -660,7 +658,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
// register session 2, will be standby
ctx2 := context.Background()
s2 := NewSession(ctx2, metaRoot, etcdCli)
s2 := NewSession(ctx2, metaRoot, etcdCli, WithResueNodeID(false))
s2.Init("inittest", "testAddr", true, true)
s2.SetEnableActiveStandBy(true)
s2.Register()
@ -704,7 +702,7 @@ func TestSessionEventType_String(t *testing.T) {
func TestSession_apply(t *testing.T) {
session := &Session{}
opts := []SessionOption{WithCustomConfigEnable(), WithSessionTTL(100), WithSessionRetryTimes(200)}
opts := []SessionOption{WithCustomConfigEnable(), WithTTL(100), WithRetryTimes(200)}
session.apply(opts...)
assert.True(t, session.useCustomConfig)
assert.Equal(t, int64(100), session.sessionTTL)