Optimize the codec code of session (#27360)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
pull/27460/head
Jiquan Long 2023-10-01 10:33:30 +08:00 committed by GitHub
parent 7d0dd0047d
commit 0f14d18201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 119 additions and 143 deletions

View File

@ -17,8 +17,10 @@ func Test_IndexEngineVersionManager_GetMergedIndexVersion(t *testing.T) {
// startup
m.Startup(map[string]*sessionutil.Session{
"1": {
ServerID: 1,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
},
},
})
assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion())
@ -26,24 +28,30 @@ func Test_IndexEngineVersionManager_GetMergedIndexVersion(t *testing.T) {
// add node
m.AddNode(&sessionutil.Session{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
},
})
assert.Equal(t, int32(10), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(5), m.GetMinimalIndexEngineVersion())
// update
m.Update(&sessionutil.Session{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
},
})
assert.Equal(t, int32(5), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(2), m.GetMinimalIndexEngineVersion())
// remove
m.RemoveNode(&sessionutil.Session{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
},
})
assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalIndexEngineVersion())

View File

@ -41,7 +41,7 @@ import (
)
func TestServerId(t *testing.T) {
s := &Server{session: &sessionutil.Session{ServerID: 0}}
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}}}
assert.Equal(t, int64(0), s.serverID())
}

View File

@ -935,7 +935,7 @@ func TestService_WatchServices(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
svr := CreateServer(context.TODO(), factory)
svr.session = &sessionutil.Session{
TriggerKill: true,
SessionRaw: sessionutil.SessionRaw{TriggerKill: true},
}
svr.serverLoopWg.Add(1)
@ -3277,10 +3277,12 @@ func TestHandleSessionEvent(t *testing.T) {
evt := &sessionutil.SessionEvent{
EventType: sessionutil.SessionNoneEvent,
Session: &sessionutil.Session{
ServerID: 0,
ServerName: "",
Address: "",
Exclusive: false,
SessionRaw: sessionutil.SessionRaw{
ServerID: 0,
ServerName: "",
Address: "",
Exclusive: false,
},
},
}
err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt)
@ -3289,10 +3291,12 @@ func TestHandleSessionEvent(t *testing.T) {
evt = &sessionutil.SessionEvent{
EventType: sessionutil.SessionAddEvent,
Session: &sessionutil.Session{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
SessionRaw: sessionutil.SessionRaw{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
},
},
}
err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt)
@ -3304,10 +3308,12 @@ func TestHandleSessionEvent(t *testing.T) {
evt = &sessionutil.SessionEvent{
EventType: sessionutil.SessionDelEvent,
Session: &sessionutil.Session{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
SessionRaw: sessionutil.SessionRaw{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
},
},
}
err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt)
@ -4320,7 +4326,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
func Test_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{ServerID: 1}}
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
s.stateCode.Store(commonpb.StateCode_Abnormal)
resp, err := s.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -4329,7 +4335,7 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("data node health check is ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{ServerID: 1}}
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
healthClient := &mockDataNodeClient{
id: 1,
@ -4355,7 +4361,7 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("data node health check is fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{ServerID: 1}}
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
unhealthClient := &mockDataNodeClient{
id: 1,

View File

@ -156,7 +156,7 @@ func TestDataNode(t *testing.T) {
t.Run("Test getSystemInfoMetrics", func(t *testing.T) {
emptyNode := &DataNode{}
emptyNode.SetSession(&sessionutil.Session{ServerID: 1})
emptyNode.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
emptyNode.flowgraphManager = newFlowgraphManager()
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
@ -171,7 +171,7 @@ func TestDataNode(t *testing.T) {
t.Run("Test getSystemInfoMetrics with quotaMetric error", func(t *testing.T) {
emptyNode := &DataNode{}
emptyNode.SetSession(&sessionutil.Session{ServerID: 1})
emptyNode.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
emptyNode.flowgraphManager = newFlowgraphManager()
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)

View File

@ -79,7 +79,7 @@ var emptyFlushAndDropFunc flushAndDropFunc = func(_ []*segmentFlushPack) {}
func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNode {
factory := dependency.NewDefaultFactory(true)
node := NewDataNode(ctx, factory)
node.SetSession(&sessionutil.Session{ServerID: 1})
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())
rc := &RootCoordFactory{

View File

@ -321,7 +321,7 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() {
// test closed server
node := &DataNode{}
node.SetSession(&sessionutil.Session{ServerID: 1})
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.stateCode.Store(commonpb.StateCode_Abnormal)
resp, err := node.ShowConfigurations(s.ctx, req)
@ -338,7 +338,7 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() {
func (s *DataNodeServicesSuite) TestGetMetrics() {
node := &DataNode{}
node.SetSession(&sessionutil.Session{ServerID: 1})
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.flowgraphManager = newFlowgraphManager()
// server is closed
node.stateCode.Store(commonpb.StateCode_Abnormal)

View File

@ -222,8 +222,10 @@ func TestConnectionManager_processEvent(t *testing.T) {
cm := &ConnectionManager{
closeCh: make(chan struct{}),
session: &sessionutil.Session{
ServerID: 1,
TriggerKill: true,
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
TriggerKill: true,
},
},
}

View File

@ -63,7 +63,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {
func TestProxy_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
@ -80,7 +80,7 @@ func TestProxy_CheckHealth(t *testing.T) {
rootCoord: NewRootCoordMock(),
queryCoord: qc,
dataCoord: NewDataCoordMock(),
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
@ -108,7 +108,7 @@ func TestProxy_CheckHealth(t *testing.T) {
qc := &mocks.MockQueryCoordClient{}
qc.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(nil, errors.New("test"))
node := &Proxy{
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: NewRootCoordMock(func(mock *RootCoordMock) {
mock.checkHealthFunc = checkHealthFunc1
}),
@ -159,7 +159,7 @@ func TestProxy_CheckHealth(t *testing.T) {
func TestProxyRenameCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{})
@ -168,7 +168,7 @@ func TestProxyRenameCollection(t *testing.T) {
})
t.Run("rename with illegal new collection name", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"})
@ -181,7 +181,7 @@ func TestProxyRenameCollection(t *testing.T) {
rc.On("RenameCollection", mock.Anything, mock.Anything).
Return(nil, errors.New("fail"))
node := &Proxy{
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: rc,
}
node.stateCode.Store(commonpb.StateCode_Healthy)
@ -197,7 +197,7 @@ func TestProxyRenameCollection(t *testing.T) {
rc.On("RenameCollection", mock.Anything, mock.Anything).
Return(merr.Status(nil), nil)
node := &Proxy{
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: rc,
}
node.stateCode.Store(commonpb.StateCode_Healthy)
@ -884,7 +884,7 @@ func TestProxyCreateDatabase(t *testing.T) {
paramtable.Init()
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{})
@ -938,7 +938,7 @@ func TestProxyDropDatabase(t *testing.T) {
paramtable.Init()
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{})
@ -992,7 +992,7 @@ func TestProxyListDatabase(t *testing.T) {
paramtable.Init()
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})

View File

@ -48,7 +48,7 @@ func TestProxy_metrics(t *testing.T) {
rootCoord: rc,
queryCoord: qc,
dataCoord: dc,
session: &sessionutil.Session{Address: funcutil.GenRandomStr()},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{Address: funcutil.GenRandomStr()}},
}
rc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {

View File

@ -390,7 +390,7 @@ func newMockProxy() *mockProxy {
func newTestCore(opts ...Opt) *Core {
c := &Core{
session: &sessionutil.Session{ServerID: TestRootCoordID},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}},
}
executor := newMockStepExecutor()
executor.AddStepsFunc = func(s *stepStack) {

View File

@ -118,8 +118,10 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) {
pcm := newProxyClientManager(core.proxyCreator)
session := &sessionutil.Session{
ServerID: 100,
Address: "localhost",
SessionRaw: sessionutil.SessionRaw{
ServerID: 100,
Address: "localhost",
},
}
sessions := []*sessionutil.Session{session}
@ -150,8 +152,10 @@ func TestProxyClientManager_AddProxyClient(t *testing.T) {
pcm := newProxyClientManager(core.proxyCreator)
session := &sessionutil.Session{
ServerID: 100,
Address: "localhost",
SessionRaw: sessionutil.SessionRaw{
ServerID: 100,
Address: "localhost",
},
}
pcm.AddProxyClient(session)

View File

@ -53,7 +53,7 @@ func TestProxyManager(t *testing.T) {
etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
defer etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
s1 := sessionutil.Session{
ServerID: 100,
SessionRaw: sessionutil.SessionRaw{ServerID: 100},
}
b1, err := json.Marshal(&s1)
assert.NoError(t, err)
@ -62,7 +62,7 @@ func TestProxyManager(t *testing.T) {
assert.NoError(t, err)
s0 := sessionutil.Session{
ServerID: 99,
SessionRaw: sessionutil.SessionRaw{ServerID: 99},
}
b0, err := json.Marshal(&s0)
assert.NoError(t, err)
@ -94,7 +94,7 @@ func TestProxyManager(t *testing.T) {
t.Log("======== start watch proxy ==========")
s2 := sessionutil.Session{
ServerID: 101,
SessionRaw: sessionutil.SessionRaw{ServerID: 101},
}
b2, err := json.Marshal(&s2)
assert.NoError(t, err)

View File

@ -801,7 +801,7 @@ func TestRootCoord_UpdateChannelTimeTick(t *testing.T) {
defaultTs := Timestamp(101)
ticker := newRocksMqTtSynchronizer()
ticker.addSession(&sessionutil.Session{ServerID: source})
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: source}})
ctx := context.Background()
c := newTestCore(withHealthyCode(),
@ -1650,7 +1650,7 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) {
c.stateCode.Store(commonpb.StateCode_Healthy)
c.session.ServerID = TestRootCoordID
c.sendMinDdlTsAsTt() // no session.
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}})
c.sendMinDdlTsAsTt()
sched.GetMinDdlTsFunc = func() Timestamp {
return typeutil.ZeroTimestamp
@ -1667,7 +1667,7 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) {
func TestCore_startTimeTickLoop(t *testing.T) {
ticker := newRocksMqTtSynchronizer()
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}})
ddlManager := newMockDdlTsLockManager()
ddlManager.GetMinDdlTsFunc = func() Timestamp {
return 100

View File

@ -129,10 +129,10 @@ func TestMultiTimetickSync(t *testing.T) {
defer wg.Done()
// suppose this is rooit
ttSync.addSession(&sessionutil.Session{ServerID: 1})
ttSync.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
// suppose this is proxy1
ttSync.addSession(&sessionutil.Session{ServerID: 2})
ttSync.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 2}})
msg := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{

View File

@ -83,6 +83,19 @@ type IndexEngineVersion struct {
CurrentIndexVersion int32 `json:"CurrentIndexVersion,omitempty"`
}
// SessionRaw the persistent part of Session.
type SessionRaw struct {
ServerID int64 `json:"ServerID,omitempty"`
ServerName string `json:"ServerName,omitempty"`
Address string `json:"Address,omitempty"`
Exclusive bool `json:"Exclusive,omitempty"`
Stopping bool `json:"Stopping,omitempty"`
TriggerKill bool
Version string `json:"Version"`
IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"`
LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`
}
// Session is a struct to store service's session, including ServerID, ServerName,
// Address.
// Exclusive indicates that this server can only start one.
@ -94,20 +107,14 @@ type Session struct {
keepAliveCancel context.CancelFunc
keepAliveCtx context.Context
ServerID int64 `json:"ServerID,omitempty"`
ServerName string `json:"ServerName,omitempty"`
Address string `json:"Address,omitempty"`
Exclusive bool `json:"Exclusive,omitempty"`
Stopping bool `json:"Stopping,omitempty"`
TriggerKill bool
Version semver.Version `json:"Version,omitempty"`
IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"`
SessionRaw
Version semver.Version `json:"Version,omitempty"`
liveChOnce sync.Once
liveCh chan struct{}
etcdCli *clientv3.Client
leaseID *clientv3.LeaseID
watchSessionKeyCh clientv3.WatchChan
watchCancel atomic.Pointer[context.CancelFunc]
wg sync.WaitGroup
@ -156,78 +163,25 @@ func (s *Session) apply(opts ...SessionOption) {
// UnmarshalJSON unmarshal bytes to Session.
func (s *Session) UnmarshalJSON(data []byte) error {
var raw struct {
ServerID int64 `json:"ServerID,omitempty"`
ServerName string `json:"ServerName,omitempty"`
Address string `json:"Address,omitempty"`
Exclusive bool `json:"Exclusive,omitempty"`
Stopping bool `json:"Stopping,omitempty"`
TriggerKill bool
Version string `json:"Version"`
IndexEngineVersion string `json:"IndexEngineVersion,omitempty"`
LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`
}
err := json.Unmarshal(data, &raw)
err := json.Unmarshal(data, &s.SessionRaw)
if err != nil {
return err
}
if raw.Version != "" {
s.Version, err = semver.Parse(raw.Version)
if s.SessionRaw.Version != "" {
s.Version, err = semver.Parse(s.SessionRaw.Version)
if err != nil {
return err
}
}
if raw.IndexEngineVersion != "" {
json.Unmarshal([]byte(raw.IndexEngineVersion), &s.IndexEngineVersion)
if err != nil {
return err
}
} else {
// set zero when queryNode not register knowhere version
s.IndexEngineVersion.MinimalIndexVersion = 0
s.IndexEngineVersion.CurrentIndexVersion = 0
}
s.ServerID = raw.ServerID
s.ServerName = raw.ServerName
s.Address = raw.Address
s.Exclusive = raw.Exclusive
s.Stopping = raw.Stopping
s.TriggerKill = raw.TriggerKill
s.leaseID = raw.LeaseID
return nil
}
// MarshalJSON marshals session to bytes.
func (s *Session) MarshalJSON() ([]byte, error) {
verStr := s.Version.String()
indexVerStr, err := json.Marshal(s.IndexEngineVersion)
if err != nil {
return nil, err
}
return json.Marshal(&struct {
ServerID int64 `json:"ServerID,omitempty"`
ServerName string `json:"ServerName,omitempty"`
Address string `json:"Address,omitempty"`
Exclusive bool `json:"Exclusive,omitempty"`
Stopping bool `json:"Stopping,omitempty"`
TriggerKill bool
Version string `json:"Version"`
IndexEngineVersion string `json:"IndexEngineVersion,omitempty"`
LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`
}{
ServerID: s.ServerID,
ServerName: s.ServerName,
Address: s.Address,
Exclusive: s.Exclusive,
Stopping: s.Stopping,
TriggerKill: s.TriggerKill,
Version: verStr,
IndexEngineVersion: string(indexVerStr),
LeaseID: s.leaseID,
})
s.SessionRaw.Version = s.Version.String()
return json.Marshal(s.SessionRaw)
}
// NewSession is a helper to build Session object.
@ -443,7 +397,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
log.Error("register service", zap.Error(err))
return err
}
s.leaseID = &resp.ID
s.LeaseID = &resp.ID
sessionJSON, err := json.Marshal(s)
if err != nil {
@ -516,21 +470,21 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
err := retry.Do(s.ctx, func() error {
ctx, cancel := context.WithTimeout(s.keepAliveCtx, time.Second*10)
defer cancel()
resp, err := s.etcdCli.KeepAliveOnce(ctx, *s.leaseID)
resp, err := s.etcdCli.KeepAliveOnce(ctx, *s.LeaseID)
keepAliveOnceResp = resp
return err
}, retry.Attempts(3))
if err != nil {
log.Warn("fail to retry keepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Error(err))
log.Warn("fail to retry keepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("LeaseID", int64(*s.LeaseID)), zap.Error(err))
s.safeCloseLiveCh()
return
}
log.Info("succeed to KeepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Any("resp", keepAliveOnceResp))
log.Info("succeed to KeepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("LeaseID", int64(*s.LeaseID)), zap.Any("resp", keepAliveOnceResp))
var chNew <-chan *clientv3.LeaseKeepAliveResponse
keepAliveFunc := func() error {
var err1 error
chNew, err1 = s.etcdCli.KeepAlive(s.keepAliveCtx, *s.leaseID)
chNew, err1 = s.etcdCli.KeepAlive(s.keepAliveCtx, *s.LeaseID)
return err1
}
err = fnWithTimeout(keepAliveFunc, time.Second*10)
@ -627,7 +581,7 @@ func (s *Session) GetSessionsWithVersionRange(prefix string, r semver.Range) (ma
}
func (s *Session) GoingStop() error {
if s == nil || s.etcdCli == nil || s.leaseID == nil {
if s == nil || s.etcdCli == nil || s.LeaseID == nil {
return errors.New("the session hasn't been init")
}
@ -650,7 +604,7 @@ func (s *Session) GoingStop() error {
log.Error("fail to marshal the session", zap.String("key", completeKey))
return err
}
_, err = s.etcdCli.Put(s.ctx, completeKey, string(sessionJSON), clientv3.WithLease(*s.leaseID))
_, err = s.etcdCli.Put(s.ctx, completeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))
if err != nil {
log.Error("fail to update the session to stopping state", zap.String("key", completeKey))
return err
@ -906,12 +860,12 @@ func (s *Session) Stop() {
s.wg.Wait()
}
// Revoke revokes the internal leaseID for the session key
// Revoke revokes the internal LeaseID for the session key
func (s *Session) Revoke(timeout time.Duration) {
if s == nil {
return
}
if s.etcdCli == nil || s.leaseID == nil {
if s.etcdCli == nil || s.LeaseID == nil {
return
}
if s.Disconnected() {
@ -921,7 +875,7 @@ func (s *Session) Revoke(timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// ignores resp & error, just do best effort to revoke
_, _ = s.etcdCli.Revoke(ctx, *s.leaseID)
_, _ = s.etcdCli.Revoke(ctx, *s.LeaseID)
}
// UpdateRegistered update the state of registered.
@ -998,7 +952,7 @@ func (s *Session) ProcessActiveStandBy(activateFunc func() error) error {
clientv3.Version(s.activeKey),
"=",
0)).
Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.leaseID))).Commit()
Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit()
if err != nil {
log.Error("register active key to etcd failed", zap.Error(err))
return false, -1, err
@ -1085,7 +1039,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error {
if len(sessions) != 0 {
activeSess := sessions[s.ServerName]
if activeSess == nil || activeSess.leaseID == nil {
if activeSess == nil || activeSess.LeaseID == nil {
// force delete all old sessions
s.etcdCli.Delete(s.ctx, s.activeKey)
for _, sess := range sessions {
@ -1097,7 +1051,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error {
}
} else {
// force release old active session
_, _ = s.etcdCli.Revoke(s.ctx, *activeSess.leaseID)
_, _ = s.etcdCli.Revoke(s.ctx, *activeSess.LeaseID)
}
}
@ -1107,7 +1061,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error {
clientv3.Version(s.activeKey),
"=",
0)).
Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.leaseID))).Commit()
Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit()
if !resp.Succeeded {
msg := fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName)

View File

@ -98,7 +98,7 @@ func TestInit(t *testing.T) {
s := NewSession(ctx, metaRoot, etcdCli)
s.Init("inittest", "testAddr", false, false)
assert.NotEqual(t, int64(0), s.leaseID)
assert.NotEqual(t, int64(0), s.LeaseID)
assert.NotEqual(t, int64(0), s.ServerID)
s.Register()
sessions, _, err := s.GetSessions("inittest")
@ -400,10 +400,12 @@ func TestSession_String(t *testing.T) {
func TestSesssionMarshal(t *testing.T) {
s := &Session{
ServerID: 1,
ServerName: "test",
Address: "localhost",
Version: common.Version,
SessionRaw: SessionRaw{
ServerID: 1,
ServerName: "test",
Address: "localhost",
},
Version: common.Version,
}
bs, err := json.Marshal(s)
@ -663,7 +665,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
{
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, _ = s1.etcdCli.Revoke(ctx, *s1.leaseID)
_, _ = s1.etcdCli.Revoke(ctx, *s1.LeaseID)
}
select {
case <-signal: