Change session doc (#5355)

Change session doc

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5359/head^2
godchen 2021-05-24 11:53:07 +08:00 committed by GitHub
parent 4e1b12269b
commit dd736ee8ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 73 deletions

View File

@ -64,7 +64,7 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic
* The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key.
* key: metaRootPath + "/services" + "/ServerName(-ServerID)(optional)"
* key: "/session" + "/ServerName(-ServerID)(optional)"
* value: json format
@ -73,7 +73,7 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic
"ServerID": "ServerID",
"ServerName": "ServerName",
"Address": "ip:port",
"LeaseID": "LeaseID",
"Exclusive": "Exclusive",
}
```
@ -85,78 +85,48 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic
* All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key
* Registeration time can be compared with ServerID for ServerID will increase according to time.
* Registration time can be compared with ServerID for ServerID will increase according to time.
###### Interface
```go
const defaultIDKey = "services/id"
const defaultRetryTimes = 30
const DefaultServiceRoot = "/session/"
const DefaultIDKey = "id"
const DefaultRetryTimes = 30
const DefaultTTL = 10
// Session is a struct to store service's session, including ServerID, ServerName,
// Address.
// LeaseID will be assigned after registered in etcd.
type Session struct {
ServerID int64
ServerName string
Address string
LeaseID clientv3.LeaseID
}
ctx context.Context
ServerID int64 `json:"ServerID,omitempty"`
ServerName string `json:"ServerName,omitempty"`
Address string `json:"Address,omitempty"`
Exclusive bool `json:"Exclusive,omitempty"`
var (
globalServerID = int64(-1)
)
etcdCli *clientv3.Client
leaseID clientv3.LeaseID
cancel context.CancelFunc
}
// NewSession is a helper to build Session object.LeaseID will be assigned after
// registeration.
func NewSession(serverID int64, serverName, address string) *Session {}
func NewSession(ctx context.Context, etcdAddress []string) *Session {}
// GlobalServerID returns [singleton] ServerID.
// Before SetGlobalServerID, GlobalServerID() returns -1
func GlobalServerID() int64 {}
// GetSessions will get all sessions registered in etcd.
func (s *Session) GetSessions(prefix string) (map[string]*Session, error) {}
// SetGlobalServerID sets the [singleton] ServerID. ServerID returned by
// GlobalServerID(). Those who use GlobalServerID should call SetGlobalServerID()
// as early as possible in main() before use ServerID.
func SetGlobalServerID(id int64) {}
// Init will initialize base struct in the SessionManager, including getServerID,
// and process keepAliveResponse
func (s *Session) Init(serverName, address string, exclusive bool) <-chan bool {}
// GetServerID gets id from etcd with key: metaRootPath + "/services/id"
// Each server get ServerID and add one to id.
func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {}
// RegisterService registers the service to etcd so that other services
// can find that the service is online and issue subsequent operations
// RegisterService will save a key-value in etcd
// key: metaRootPath + "/services/" + "ServerName(-ServerID)(optional)"
// value: json format
// {
// "ServerID": "ServerID",
// "ServerName": "ServerName",
// "Address": "ip:port",
// "LeaseID": "LeaseID",
// }
// MetaRootPath is configurable in the config file.
// Exclusive means whether this service can exist two at the same time, if so,
// it is false. Otherwise, set it to true and the key will not have ServerID.
// But ServerID still will be stored in value.
func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {}
// ProcessKeepAliveResponse processes the response of etcd keepAlive interface
// If keepAlive fails for unexpected error, it will retry for default_retry_times times
func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) (signal <-chan bool) {}
// GetAllSessions gets all the services registered in etcd.
// This gets all the key with prefix metaRootPath + "/services/" + prefix
// For general, "datanode" to get all datanodes
func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {}
// WatchServices watch all events in etcd.
// If a server register, a session will be sent to addChannel
// If a server offline, a session will be sent to deleteChannel
func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) {}
```
// WatchServices watch the service's up and down in etcd, and saves it into local
// sessions.
// If a server up, it will be add to addChannel.
// If a server is offline, it will be add to delChannel.
func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) {}
#### A.3 Global Parameter Table

View File

@ -36,8 +36,9 @@ type Session struct {
cancel context.CancelFunc
}
// NewSession is a helper to build Session object.LeaseID will be assigned after
// registeration.
// NewSession is a helper to build Session object.
// ServerID and LeaseID will be assigned after registeration.
// etcdCli is initialized when NewSession
func NewSession(ctx context.Context, etcdAddress []string) *Session {
ctx, cancel := context.WithCancel(ctx)
session := &Session{
@ -62,7 +63,7 @@ func NewSession(ctx context.Context, etcdAddress []string) *Session {
// Init will initialize base struct in the SessionManager, including getServerID,
// and process keepAliveResponse
func (s *Session) Init(serverName, address string, exclusive bool) {
func (s *Session) Init(serverName, address string, exclusive bool) <-chan bool {
s.ServerName = serverName
s.Address = address
s.Exclusive = exclusive
@ -76,10 +77,10 @@ func (s *Session) Init(serverName, address string, exclusive bool) {
if err != nil {
panic(err)
}
s.processKeepAliveResponse(ch)
return s.processKeepAliveResponse(ch)
}
// GetServerID gets id from etcd with key: metaRootPath + "/services/id"
// getServerID gets id from etcd with key: "/session"+"id"
// Each server get ServerID and add one to id.
func (s *Session) getServerID() (int64, error) {
return s.getServerIDWithKey(DefaultIDKey, DefaultRetryTimes)
@ -128,22 +129,21 @@ func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error)
return nil
}
err := retry.Retry(retryTimes, time.Millisecond*200, getServerIDWithKeyFn)
err := retry.Retry(retryTimes, time.Millisecond*500, getServerIDWithKeyFn)
return res, err
}
// RegisterService registers the service to etcd so that other services
// registerService registers the service to etcd so that other services
// can find that the service is online and issue subsequent operations
// RegisterService will save a key-value in etcd
// key: metaRootPath + "/services" + "/ServerName-ServerID"
// value: json format
// {
// "ServerID": "ServerID",
// "ServerName": "ServerName",
// "Address": "ip:port",
// "LeaseID": "LeaseID",
// ServerID int64 `json:"ServerID,omitempty"`
// ServerName string `json:"ServerName,omitempty"`
// Address string `json:"Address,omitempty"`
// Exclusive bool `json:"Exclusive,omitempty"`
// }
// MetaRootPath is configurable in the config file.
// Exclusive means whether this service can exist two at the same time, if so,
// it is false. Otherwise, set it to true.
func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) {
@ -188,14 +188,14 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
}
return nil
}
err := retry.Retry(DefaultRetryTimes, time.Millisecond*200, registerFn)
err := retry.Retry(DefaultRetryTimes, time.Millisecond*500, registerFn)
if err != nil {
return ch, nil
}
return ch, nil
}
// ProcessKeepAliveResponse processes the response of etcd keepAlive interface
// processKeepAliveResponse processes the response of etcd keepAlive interface
// If keepAlive fails for unexpected error, it will send a signal to the channel.
func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) (failChannel <-chan bool) {
failCh := make(chan bool)
@ -207,11 +207,12 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
return
case resp, ok := <-ch:
if !ok {
failCh <- true
close(failCh)
}
if resp == nil {
failCh <- true
close(failCh)
}
failCh <- true
}
}
}()