From b0a12116c8ae211754f2dfa0cd7a2ed323ef50cc Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Mon, 17 Oct 2022 15:07:25 +0800 Subject: [PATCH] Fix meta migration tool (#19814) Signed-off-by: longjiquan Signed-off-by: longjiquan --- Makefile | 4 +- cmd/tools/migration/backend/backend.go | 4 +- cmd/tools/migration/backend/backup_header.go | 55 ++++++++++++++- cmd/tools/migration/backend/etcd210.go | 70 ++++++++++++++++++- cmd/tools/migration/command/config.go | 2 +- cmd/tools/migration/command/main.go | 4 +- cmd/tools/migration/command/run.go | 15 ++-- cmd/tools/migration/configs/config.go | 17 ++++- cmd/tools/migration/console/code.go | 10 +++ cmd/tools/migration/console/console.go | 23 +----- cmd/tools/migration/console/exit.go | 42 +++++++++++ cmd/tools/migration/console/exit_config.go | 37 ++++++++++ cmd/tools/migration/example.yaml | 1 + cmd/tools/migration/migration/runner.go | 44 +++++++----- go.mod | 10 +-- go.sum | 4 ++ internal/util/sessionutil/session_util.go | 53 ++++++++++++-- .../util/sessionutil/session_util_test.go | 9 +++ 18 files changed, 337 insertions(+), 67 deletions(-) create mode 100644 cmd/tools/migration/console/code.go create mode 100644 cmd/tools/migration/console/exit.go create mode 100644 cmd/tools/migration/console/exit_config.go diff --git a/Makefile b/Makefile index a6780e5d69..f7d48f98a3 100644 --- a/Makefile +++ b/Makefile @@ -88,10 +88,10 @@ binlog: GO111MODULE=on $(GO) build -ldflags="-r $${RPATH}" -o $(INSTALL_PATH)/binlog $(PWD)/cmd/tools/binlog/main.go 1>/dev/null MIGRATION_PATH = $(PWD)/cmd/tools/migration -migration: +meta-migration: @echo "Building migration tool ..." @source $(PWD)/scripts/setenv.sh && \ - mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/migration $(MIGRATION_PATH)/main.go 1>/dev/null + mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/meta-migration $(MIGRATION_PATH)/main.go 1>/dev/null BUILD_TAGS = $(shell git describe --tags --always --dirty="-dev") BUILD_TIME = $(shell date -u) diff --git a/cmd/tools/migration/backend/backend.go b/cmd/tools/migration/backend/backend.go index 316166173c..c0d27c4b14 100644 --- a/cmd/tools/migration/backend/backend.go +++ b/cmd/tools/migration/backend/backend.go @@ -18,12 +18,12 @@ type Backend interface { Save(meta *meta.Meta) error Clean() error Backup(meta *meta.Meta, backupFile string) error + BackupV2(file string) error Restore(backupFile string) error } func NewBackend(cfg *configs.MilvusConfig, version string) (Backend, error) { - switch cfg.MetaStoreCfg.MetaStoreType { - case util.MetaStoreTypeMysql: + if cfg.MetaStoreCfg.MetaStoreType != util.MetaStoreTypeEtcd { return nil, fmt.Errorf("%s is not supported now", cfg.MetaStoreCfg.MetaStoreType) } v, err := semver.Parse(version) diff --git a/cmd/tools/migration/backend/backup_header.go b/cmd/tools/migration/backend/backup_header.go index 263613b549..40e30ed8e9 100644 --- a/cmd/tools/migration/backend/backup_header.go +++ b/cmd/tools/migration/backend/backup_header.go @@ -1,6 +1,12 @@ package backend -import "github.com/golang/protobuf/proto" +import ( + "encoding/json" + + "github.com/milvus-io/milvus/cmd/tools/migration/console" + + "github.com/golang/protobuf/proto" +) type BackupHeaderVersion int32 @@ -33,3 +39,50 @@ func (v *BackupHeader) String() string { } func (v *BackupHeader) ProtoMessage() {} + +type BackupHeaderExtra struct { + EntryIncludeRootPath bool `json:"entry_include_root_path"` +} + +type extraOption func(extra *BackupHeaderExtra) + +func setEntryIncludeRootPath(include bool) extraOption { + return func(extra *BackupHeaderExtra) { + extra.EntryIncludeRootPath = include + } +} + +func newDefaultBackupHeaderExtra() *BackupHeaderExtra { + return &BackupHeaderExtra{EntryIncludeRootPath: false} +} + +func newBackupHeaderExtra(opts ...extraOption) *BackupHeaderExtra { + v := newDefaultBackupHeaderExtra() + v.apply(opts...) + return v +} + +func (v *BackupHeaderExtra) apply(opts ...extraOption) { + for _, opt := range opts { + opt(v) + } +} + +func (v *BackupHeaderExtra) ToJSONBytes() []byte { + bs, err := json.Marshal(v) + if err != nil { + console.Error(err.Error()) + return nil + } + return bs +} + +func GetExtra(extra []byte) *BackupHeaderExtra { + var v = newDefaultBackupHeaderExtra() + err := json.Unmarshal(extra, v) + if err != nil { + console.Error(err.Error()) + return v + } + return v +} diff --git a/cmd/tools/migration/backend/etcd210.go b/cmd/tools/migration/backend/etcd210.go index 648108be8d..54722f8c1a 100644 --- a/cmd/tools/migration/backend/etcd210.go +++ b/cmd/tools/migration/backend/etcd210.go @@ -1,12 +1,15 @@ package backend import ( + "context" "fmt" "io/ioutil" "path" "strconv" "strings" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/milvus-io/milvus/cmd/tools/migration/configs" "github.com/milvus-io/milvus/cmd/tools/migration/legacy" @@ -398,18 +401,81 @@ func (b etcd210) Backup(meta *meta.Meta, backupFile string) error { return ioutil.WriteFile(backupFile, backup, 0600) } +func (b etcd210) BackupV2(file string) error { + var instance, metaPath string + metaRootPath := b.cfg.EtcdCfg.MetaRootPath + parts := strings.Split(metaRootPath, "/") + if len(parts) > 1 { + metaPath = parts[len(parts)-1] + instance = path.Join(parts[:len(parts)-1]...) + } else { + instance = metaRootPath + } + + ctx := context.Background() + // TODO: optimize this if memory consumption is too large. + saves := make(map[string]string) + cntResp, err := b.etcdCli.Get(ctx, metaRootPath, clientv3.WithPrefix(), clientv3.WithCountOnly()) + if err != nil { + return err + } + + opts := []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithRev(cntResp.Header.Revision), clientv3.WithLimit(1)} + currentKey := metaRootPath + for i := 0; int64(i) < cntResp.Count; i++ { + resp, err := b.etcdCli.Get(ctx, currentKey, opts...) + if err != nil { + return err + } + for _, kv := range resp.Kvs { + if kv.Lease != 0 { + console.Warning(fmt.Sprintf("lease key won't be backuped: %s, lease id: %d", kv.Key, kv.Lease)) + continue + } + saves[string(kv.Key)] = string(kv.Value) + currentKey = string(append(kv.Key, 0)) + } + } + + header := &BackupHeader{ + Version: BackupHeaderVersionV1, + Instance: instance, + MetaPath: metaPath, + Entries: int64(len(saves)), + Component: "", + Extra: newBackupHeaderExtra(setEntryIncludeRootPath(true)).ToJSONBytes(), + } + + codec := NewBackupCodec() + backup, err := codec.Serialize(header, saves) + if err != nil { + return err + } + + console.Warning(fmt.Sprintf("backup to: %s", file)) + return ioutil.WriteFile(file, backup, 0600) +} + func (b etcd210) Restore(backupFile string) error { backup, err := ioutil.ReadFile(backupFile) if err != nil { return err } codec := NewBackupCodec() - _, saves, err := codec.DeSerialize(backup) + header, saves, err := codec.DeSerialize(backup) if err != nil { return err } + entryIncludeRootPath := GetExtra(header.Extra).EntryIncludeRootPath + getRealKey := func(key string) string { + if entryIncludeRootPath { + return key + } + return path.Join(header.Instance, header.MetaPath, key) + } + ctx := context.Background() for k, v := range saves { - if err := b.txn.Save(k, v); err != nil { + if _, err := b.etcdCli.Put(ctx, getRealKey(k), v); err != nil { return err } } diff --git a/cmd/tools/migration/command/config.go b/cmd/tools/migration/command/config.go index db36b30895..80e5375c37 100644 --- a/cmd/tools/migration/command/config.go +++ b/cmd/tools/migration/command/config.go @@ -15,7 +15,7 @@ func (c *commandParser) formatYaml(args []string, flags *flag.FlagSet) { } func (c *commandParser) parse(args []string, flags *flag.FlagSet) { - console.ExitIf(flags.Parse(args[1:])) + console.AbnormalExitIf(flags.Parse(args[1:]), false) } func (c *commandParser) format(args []string, flags *flag.FlagSet) { diff --git a/cmd/tools/migration/command/main.go b/cmd/tools/migration/command/main.go index 128455f9ca..5c393f4c5a 100644 --- a/cmd/tools/migration/command/main.go +++ b/cmd/tools/migration/command/main.go @@ -19,7 +19,7 @@ func Execute(args []string) { c := &commandParser{} c.format(args, flags) - console.ErrorExitIf(c.configYaml == "", "config not set") + console.ErrorExitIf(c.configYaml == "", false, "config not set") cfg := configs.NewConfig(c.configYaml) switch cfg.Cmd { @@ -30,6 +30,6 @@ func Execute(args []string) { case configs.RollbackCmd: Rollback(cfg) default: - console.Exit(fmt.Sprintf("cmd not set or not supported: %s", cfg.Cmd)) + console.AbnormalExit(false, fmt.Sprintf("cmd not set or not supported: %s", cfg.Cmd)) } } diff --git a/cmd/tools/migration/command/run.go b/cmd/tools/migration/command/run.go index d1fd8713c7..800abfc39e 100644 --- a/cmd/tools/migration/command/run.go +++ b/cmd/tools/migration/command/run.go @@ -13,12 +13,17 @@ import ( func Run(c *configs.Config) { ctx := context.Background() runner := migration.NewRunner(ctx, c) - console.ExitIf(runner.CheckSessions()) - console.ExitIf(runner.RegisterSession()) + console.AbnormalExitIf(runner.CheckSessions(), false) + console.AbnormalExitIf(runner.RegisterSession(), false) defer runner.Stop() // double check. - console.ExitIf(runner.CheckSessions()) - console.ExitIf(runner.Validate()) + console.AbnormalExitIf(runner.CheckSessions(), false) + console.AbnormalExitIf(runner.Validate(), false) console.NormalExitIf(runner.CheckCompatible(), "version compatible, no need to migrate") - console.ExitIf(runner.Migrate()) + if c.RunWithBackup { + console.AbnormalExitIf(runner.Backup(), false) + } else { + console.Warning("run migration without backup!") + } + console.AbnormalExitIf(runner.Migrate(), true) } diff --git a/cmd/tools/migration/configs/config.go b/cmd/tools/migration/configs/config.go index 0eb511f7a1..7d0e2caf04 100644 --- a/cmd/tools/migration/configs/config.go +++ b/cmd/tools/migration/configs/config.go @@ -17,6 +17,7 @@ const ( type RunConfig struct { base *paramtable.BaseTable Cmd string + RunWithBackup bool SourceVersion string TargetVersion string BackupFilePath string @@ -29,8 +30,19 @@ func newRunConfig(base *paramtable.BaseTable) *RunConfig { } func (c *RunConfig) String() string { - return fmt.Sprintf("Cmd: %s, SourceVersion: %s, TargetVersion: %s, BackupFilePath: %s", - c.Cmd, c.SourceVersion, c.TargetVersion, c.BackupFilePath) + switch c.Cmd { + case RunCmd: + return fmt.Sprintf("Cmd: %s, SourceVersion: %s, TargetVersion: %s, BackupFilePath: %s, RunWithBackup: %v", + c.Cmd, c.SourceVersion, c.TargetVersion, c.BackupFilePath, c.RunWithBackup) + case BackupCmd: + return fmt.Sprintf("Cmd: %s, SourceVersion: %s, BackupFilePath: %s", + c.Cmd, c.SourceVersion, c.BackupFilePath) + case RollbackCmd: + return fmt.Sprintf("Cmd: %s, SourceVersion: %s, TargetVersion: %s, BackupFilePath: %s", + c.Cmd, c.SourceVersion, c.TargetVersion, c.BackupFilePath) + default: + return fmt.Sprintf("invalid cmd: %s", c.Cmd) + } } func (c *RunConfig) show() { @@ -41,6 +53,7 @@ func (c *RunConfig) init(base *paramtable.BaseTable) { c.base = base c.Cmd = c.base.LoadWithDefault("cmd.type", "") + c.RunWithBackup = c.base.ParseBool("cmd.runWithBackup", false) c.SourceVersion = c.base.LoadWithDefault("config.sourceVersion", "") c.TargetVersion = c.base.LoadWithDefault("config.targetVersion", "") c.BackupFilePath = c.base.LoadWithDefault("config.backupFilePath", "") diff --git a/cmd/tools/migration/console/code.go b/cmd/tools/migration/console/code.go new file mode 100644 index 0000000000..9f25932bb3 --- /dev/null +++ b/cmd/tools/migration/console/code.go @@ -0,0 +1,10 @@ +package console + +type ErrorCode = int + +const ( + NormalCode ErrorCode = 0 + BackupUnfinished ErrorCode = 1 + FailButBackupFinished ErrorCode = 2 + Unexpected ErrorCode = 100 +) diff --git a/cmd/tools/migration/console/console.go b/cmd/tools/migration/console/console.go index 801bdb3aae..295648ee0a 100644 --- a/cmd/tools/migration/console/console.go +++ b/cmd/tools/migration/console/console.go @@ -20,8 +20,7 @@ func Warning(msg string) { } func Exit(msg string) { - Error(msg) - os.Exit(1) + ExitWithOption(WithAbnormalExit(), WithMsg(msg)) } func ExitIf(err error) { @@ -30,25 +29,9 @@ func ExitIf(err error) { } } -func NormalExit(msg string) { - Success(msg) - os.Exit(0) -} - -func NormalExitIf(success bool, msg string) { - if success { - NormalExit(msg) - } -} - -func ErrorExit(msg string) { - Warning(msg) - os.Exit(1) -} - -func ErrorExitIf(fail bool, msg string) { +func ErrorExitIf(fail bool, backupFinished bool, msg string) { if fail { - ErrorExit(msg) + AbnormalExit(backupFinished, msg) } } diff --git a/cmd/tools/migration/console/exit.go b/cmd/tools/migration/console/exit.go new file mode 100644 index 0000000000..793b284df1 --- /dev/null +++ b/cmd/tools/migration/console/exit.go @@ -0,0 +1,42 @@ +package console + +import ( + "os" +) + +func ExitWithOption(opts ...ExitOption) { + c := defaultExitConfig() + c.apply(opts...) + if c.abnormal { + Error(c.msg) + } else { + Success(c.msg) + } + os.Exit(c.code) +} + +func AbnormalExit(backupFinished bool, msg string) { + opts := []ExitOption{WithAbnormalExit(), WithMsg(msg)} + if backupFinished { + opts = append(opts, WithExitCode(FailButBackupFinished)) + } else { + opts = append(opts, WithExitCode(BackupUnfinished)) + } + ExitWithOption(opts...) +} + +func AbnormalExitIf(err error, backupFinished bool) { + if err != nil { + AbnormalExit(backupFinished, err.Error()) + } +} + +func NormalExit(msg string) { + ExitWithOption(WithExitCode(NormalCode), WithMsg(msg)) +} + +func NormalExitIf(success bool, msg string) { + if success { + NormalExit(msg) + } +} diff --git a/cmd/tools/migration/console/exit_config.go b/cmd/tools/migration/console/exit_config.go new file mode 100644 index 0000000000..629de6c199 --- /dev/null +++ b/cmd/tools/migration/console/exit_config.go @@ -0,0 +1,37 @@ +package console + +type exitConfig struct { + abnormal bool + code ErrorCode + msg string +} + +func defaultExitConfig() exitConfig { + return exitConfig{abnormal: false, code: 0, msg: ""} +} + +type ExitOption func(c *exitConfig) + +func (c *exitConfig) apply(opts ...ExitOption) { + for _, opt := range opts { + opt(c) + } +} + +func WithExitCode(code ErrorCode) ExitOption { + return func(c *exitConfig) { + c.code = code + } +} + +func WithAbnormalExit() ExitOption { + return func(c *exitConfig) { + c.abnormal = true + } +} + +func WithMsg(msg string) ExitOption { + return func(c *exitConfig) { + c.msg = msg + } +} diff --git a/cmd/tools/migration/example.yaml b/cmd/tools/migration/example.yaml index ca9c7b4f0f..cc28c80806 100644 --- a/cmd/tools/migration/example.yaml +++ b/cmd/tools/migration/example.yaml @@ -1,6 +1,7 @@ cmd: # Option: run/backup/rollback type: run + runWithBackup: false config: sourceVersion: 2.1.0 diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go index 5513845dad..92951cba77 100644 --- a/cmd/tools/migration/migration/runner.go +++ b/cmd/tools/migration/migration/runner.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "go.uber.org/atomic" + "github.com/milvus-io/milvus/cmd/tools/migration/versions" "github.com/blang/semver/v4" @@ -22,19 +24,25 @@ import ( ) type Runner struct { - ctx context.Context - cancel context.CancelFunc - cfg *configs.Config - initOnce sync.Once - session *sessionutil.Session - address string - etcdCli *clientv3.Client - wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + cfg *configs.Config + initOnce sync.Once + session *sessionutil.Session + address string + etcdCli *clientv3.Client + wg sync.WaitGroup + backupFinished atomic.Bool } func NewRunner(ctx context.Context, cfg *configs.Config) *Runner { ctx1, cancel := context.WithCancel(ctx) - runner := &Runner{ctx: ctx1, cancel: cancel, cfg: cfg} + runner := &Runner{ + ctx: ctx1, + cancel: cancel, + cfg: cfg, + backupFinished: *atomic.NewBool(false), + } runner.initOnce.Do(runner.init) return runner } @@ -42,7 +50,7 @@ func NewRunner(ctx context.Context, cfg *configs.Config) *Runner { func (r *Runner) watchByPrefix(prefix string) { defer r.wg.Done() _, revision, err := r.session.GetSessions(prefix) - console.ExitIf(err) + console.AbnormalExitIf(err, r.backupFinished.Load()) eventCh := r.session.WatchServices(prefix, revision, nil) for { select { @@ -50,7 +58,7 @@ func (r *Runner) watchByPrefix(prefix string) { return case event := <-eventCh: msg := fmt.Sprintf("session up/down, exit migration, event type: %s, session: %s", event.EventType.String(), event.Session.String()) - console.Exit(msg) + console.AbnormalExit(r.backupFinished.Load(), msg) } } } @@ -64,14 +72,15 @@ func (r *Runner) WatchSessions() { func (r *Runner) initEtcdCli() { cli, err := etcd.GetEtcdClient(r.cfg.EtcdCfg) - console.ExitIf(err) + console.AbnormalExitIf(err, r.backupFinished.Load()) r.etcdCli = cli } func (r *Runner) init() { r.initEtcdCli() - r.session = sessionutil.NewSession(r.ctx, r.cfg.EtcdCfg.MetaRootPath, r.etcdCli) + r.session = sessionutil.NewSession(r.ctx, r.cfg.EtcdCfg.MetaRootPath, r.etcdCli, + sessionutil.WithCustomConfigEnable(), sessionutil.WithSessionTTL(60), sessionutil.WithSessionRetryTimes(30)) // address not important here. address := time.Now().String() r.address = address @@ -152,11 +161,11 @@ func (r *Runner) Backup() error { if err != nil { return err } - metas, err := source.Load() - if err != nil { + if err := source.BackupV2(r.cfg.BackupFilePath); err != nil { return err } - return source.Backup(metas, r.cfg.BackupFilePath) + r.backupFinished.Store(true) + return nil } func (r *Runner) Rollback() error { @@ -190,9 +199,6 @@ func (r *Runner) Migrate() error { if err != nil { return err } - if err := source.Backup(metas, r.cfg.BackupFilePath); err != nil { - return err - } if err := source.Clean(); err != nil { return err } diff --git a/go.mod b/go.mod index 24ccf68039..e9ae3e964a 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/klauspost/compress v1.14.2 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api master + github.com/milvus-io/milvus-proto/go-api v0.0.0-20221014075920-6c03ca8c3749 github.com/minio/minio-go/v7 v7.0.17 github.com/opentracing/opentracing-go v1.2.0 github.com/panjf2000/ants/v2 v2.4.8 @@ -193,6 +193,8 @@ require ( sigs.k8s.io/yaml v1.2.0 // indirect ) +require github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect + replace ( github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.8 github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd @@ -200,7 +202,7 @@ replace ( github.com/go-kit/kit => github.com/go-kit/kit v0.1.0 github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1 github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect - // If you want to use the hook interceptor, the following code should be commented out - // and you should modify the api version to be the same as the `so` project. - //replace github.com/milvus-io/milvus-proto/go-api => github.com/SimFG/milvus-proto/go-api v0.0.0-20221012123137-df1cf6457a79 +// If you want to use the hook interceptor, the following code should be commented out +// and you should modify the api version to be the same as the `so` project. +//replace github.com/milvus-io/milvus-proto/go-api => github.com/SimFG/milvus-proto/go-api v0.0.0-20221012123137-df1cf6457a79 ) diff --git a/go.sum b/go.sum index 90c3ec9077..024d8dec7d 100644 --- a/go.sum +++ b/go.sum @@ -495,6 +495,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api v0.0.0-20221013061520-f0f555f4f091 h1:hCuXvhJweswxha1A9+VlnrcZPRwcRwukb00Y6k+RaJo= github.com/milvus-io/milvus-proto/go-api v0.0.0-20221013061520-f0f555f4f091/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221014075920-6c03ca8c3749 h1:IB+Jyx3dVtrMSGYYVlobkSC9Kat+lxd/GM1i0cKIJXA= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221014075920-6c03ca8c3749/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100= github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= @@ -596,6 +598,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/quasilyte/go-ruleguard/dsl v0.3.21 h1:vNkC6fC6qMLzCOGbnIHOd5ixUGgTbp3Z4fGnUgULlDA= +github.com/quasilyte/go-ruleguard/dsl v0.3.21/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU= github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 0182e8ff2c..dc01750afd 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -85,6 +85,30 @@ type Session struct { isStandby atomic.Value enableActiveStandBy bool activeKey string + + useCustomConfig bool + sessionTTL int64 + sessionRetryTimes int64 +} + +type SessionOption func(session *Session) + +func WithCustomConfigEnable() SessionOption { + return func(session *Session) { session.useCustomConfig = true } +} + +func WithSessionTTL(ttl int64) SessionOption { + return func(session *Session) { session.sessionTTL = ttl } +} + +func WithSessionRetryTimes(n int64) SessionOption { + return func(session *Session) { session.sessionRetryTimes = n } +} + +func (s *Session) apply(opts ...SessionOption) { + for _, opt := range opts { + opt(s) + } } // UnmarshalJSON unmarshal bytes to Session. @@ -143,13 +167,18 @@ func (s *Session) MarshalJSON() ([]byte, error) { // ServerID, ServerName, Address, Exclusive will be assigned after Init(). // metaRoot is a path in etcd to save session information. // etcdEndpoints is to init etcdCli when NewSession -func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client) *Session { +func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, opts ...SessionOption) *Session { session := &Session{ - ctx: ctx, - metaRoot: metaRoot, - Version: common.Version, + ctx: ctx, + metaRoot: metaRoot, + Version: common.Version, + useCustomConfig: false, + sessionTTL: 60, + sessionRetryTimes: 30, } + session.apply(opts...) + session.UpdateRegistered(false) connectEtcdFn := func() error { @@ -185,7 +214,9 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b panic(err) } s.ServerID = serverID - GlobalParams.InitOnce() + if !s.useCustomConfig { + GlobalParams.InitOnce() + } } // String makes Session struct able to be logged by zap @@ -277,8 +308,16 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er completeKey := path.Join(s.metaRoot, DefaultServiceRoot, key) var ch <-chan *clientv3.LeaseKeepAliveResponse log.Debug("service begin to register to etcd", zap.String("serverName", s.ServerName), zap.Int64("ServerID", s.ServerID)) + + ttl := s.sessionTTL + retryTimes := s.sessionRetryTimes + if !s.useCustomConfig { + ttl = GlobalParams.CommonCfg.SessionTTL + retryTimes = GlobalParams.CommonCfg.SessionRetryTimes + } + registerFn := func() error { - resp, err := s.etcdCli.Grant(s.ctx, GlobalParams.CommonCfg.SessionTTL) + resp, err := s.etcdCli.Grant(s.ctx, ttl) if err != nil { log.Error("register service", zap.Error(err)) return err @@ -317,7 +356,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er log.Info("Service registered successfully", zap.String("ServerName", s.ServerName), zap.Int64("serverID", s.ServerID)) return nil } - err := retry.Do(s.ctx, registerFn, retry.Attempts(uint(GlobalParams.CommonCfg.SessionRetryTimes))) + err := retry.Do(s.ctx, registerFn, retry.Attempts(uint(retryTimes))) if err != nil { return nil, err } diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 7ef0ec4400..6e3c5c1575 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -702,3 +702,12 @@ func TestSessionEventType_String(t *testing.T) { }) } } + +func TestSession_apply(t *testing.T) { + session := &Session{} + opts := []SessionOption{WithCustomConfigEnable(), WithSessionTTL(100), WithSessionRetryTimes(200)} + session.apply(opts...) + assert.True(t, session.useCustomConfig) + assert.Equal(t, int64(100), session.sessionTTL) + assert.Equal(t, int64(200), session.sessionRetryTimes) +}