mirror of https://github.com/milvus-io/milvus.git
[skip e2e] Exit migration after session expired (#20282)
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/20434/head
parent
9751bec04c
commit
d9081980e2
|
@ -13,8 +13,9 @@ func Backup(c *configs.Config) {
|
|||
runner := migration.NewRunner(ctx, c)
|
||||
console.ExitIf(runner.CheckSessions())
|
||||
console.ExitIf(runner.RegisterSession())
|
||||
defer runner.Stop()
|
||||
fn := func() { runner.Stop() }
|
||||
defer fn()
|
||||
// double check.
|
||||
console.ExitIf(runner.CheckSessions())
|
||||
console.ExitIf(runner.Backup())
|
||||
console.ExitIf(runner.CheckSessions(), console.AddCallbacks(fn))
|
||||
console.ExitIf(runner.Backup(), console.AddCallbacks(fn))
|
||||
}
|
||||
|
|
|
@ -13,8 +13,9 @@ func Rollback(c *configs.Config) {
|
|||
runner := migration.NewRunner(ctx, c)
|
||||
console.ExitIf(runner.CheckSessions())
|
||||
console.ExitIf(runner.RegisterSession())
|
||||
defer runner.Stop()
|
||||
fn := func() { runner.Stop() }
|
||||
defer fn()
|
||||
// double check.
|
||||
console.ExitIf(runner.CheckSessions())
|
||||
console.ExitIf(runner.Rollback())
|
||||
console.ExitIf(runner.CheckSessions(), console.AddCallbacks(fn))
|
||||
console.ExitIf(runner.Rollback(), console.AddCallbacks(fn))
|
||||
}
|
||||
|
|
|
@ -15,15 +15,16 @@ func Run(c *configs.Config) {
|
|||
runner := migration.NewRunner(ctx, c)
|
||||
console.AbnormalExitIf(runner.CheckSessions(), false)
|
||||
console.AbnormalExitIf(runner.RegisterSession(), false)
|
||||
defer runner.Stop()
|
||||
fn := func() { runner.Stop() }
|
||||
defer fn()
|
||||
// double check.
|
||||
console.AbnormalExitIf(runner.CheckSessions(), false)
|
||||
console.AbnormalExitIf(runner.Validate(), false)
|
||||
console.NormalExitIf(runner.CheckCompatible(), "version compatible, no need to migrate")
|
||||
console.AbnormalExitIf(runner.CheckSessions(), false, console.AddCallbacks(fn))
|
||||
console.AbnormalExitIf(runner.Validate(), false, console.AddCallbacks(fn))
|
||||
console.NormalExitIf(runner.CheckCompatible(), "version compatible, no need to migrate", console.AddCallbacks(fn))
|
||||
if c.RunWithBackup {
|
||||
console.AbnormalExitIf(runner.Backup(), false)
|
||||
console.AbnormalExitIf(runner.Backup(), false, console.AddCallbacks(fn))
|
||||
} else {
|
||||
console.Warning("run migration without backup!")
|
||||
}
|
||||
console.AbnormalExitIf(runner.Migrate(), true)
|
||||
console.AbnormalExitIf(runner.Migrate(), true, console.AddCallbacks(fn))
|
||||
}
|
||||
|
|
|
@ -3,8 +3,8 @@ package console
|
|||
type ErrorCode = int
|
||||
|
||||
const (
|
||||
NormalCode ErrorCode = 0
|
||||
BackupUnfinished ErrorCode = 1
|
||||
FailButBackupFinished ErrorCode = 2
|
||||
Unexpected ErrorCode = 100
|
||||
NormalCode ErrorCode = 0
|
||||
FailWithBackupUnfinished ErrorCode = 1
|
||||
FailButBackupFinished ErrorCode = 2
|
||||
Unexpected ErrorCode = 100
|
||||
)
|
||||
|
|
|
@ -19,13 +19,15 @@ func Warning(msg string) {
|
|||
colorOut(msg, "yellow")
|
||||
}
|
||||
|
||||
func Exit(msg string) {
|
||||
ExitWithOption(WithAbnormalExit(), WithExitCode(Unexpected), WithMsg(msg))
|
||||
func Exit(msg string, options ...ExitOption) {
|
||||
opts := append([]ExitOption{}, options...)
|
||||
opts = append(opts, WithAbnormalExit(), WithExitCode(Unexpected), WithMsg(msg))
|
||||
ExitWithOption(opts...)
|
||||
}
|
||||
|
||||
func ExitIf(err error) {
|
||||
func ExitIf(err error, options ...ExitOption) {
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
Exit(err.Error(), options...)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,31 +12,36 @@ func ExitWithOption(opts ...ExitOption) {
|
|||
} else {
|
||||
Success(c.msg)
|
||||
}
|
||||
c.runBeforeExit()
|
||||
os.Exit(c.code)
|
||||
}
|
||||
|
||||
func AbnormalExit(backupFinished bool, msg string) {
|
||||
opts := []ExitOption{WithAbnormalExit(), WithMsg(msg)}
|
||||
func AbnormalExit(backupFinished bool, msg string, options ...ExitOption) {
|
||||
opts := append([]ExitOption{}, options...)
|
||||
opts = append(opts, WithAbnormalExit())
|
||||
opts = append(opts, WithMsg(msg))
|
||||
if backupFinished {
|
||||
opts = append(opts, WithExitCode(FailButBackupFinished))
|
||||
} else {
|
||||
opts = append(opts, WithExitCode(BackupUnfinished))
|
||||
opts = append(opts, WithExitCode(FailWithBackupUnfinished))
|
||||
}
|
||||
ExitWithOption(opts...)
|
||||
}
|
||||
|
||||
func AbnormalExitIf(err error, backupFinished bool) {
|
||||
func AbnormalExitIf(err error, backupFinished bool, options ...ExitOption) {
|
||||
if err != nil {
|
||||
AbnormalExit(backupFinished, err.Error())
|
||||
AbnormalExit(backupFinished, err.Error(), options...)
|
||||
}
|
||||
}
|
||||
|
||||
func NormalExit(msg string) {
|
||||
ExitWithOption(WithExitCode(NormalCode), WithMsg(msg))
|
||||
func NormalExit(msg string, options ...ExitOption) {
|
||||
opts := append([]ExitOption{}, options...)
|
||||
opts = append(opts, WithExitCode(NormalCode), WithMsg(msg))
|
||||
ExitWithOption(opts...)
|
||||
}
|
||||
|
||||
func NormalExitIf(success bool, msg string) {
|
||||
func NormalExitIf(success bool, msg string, options ...ExitOption) {
|
||||
if success {
|
||||
NormalExit(msg)
|
||||
NormalExit(msg, options...)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,19 @@
|
|||
package console
|
||||
|
||||
type exitConfig struct {
|
||||
abnormal bool
|
||||
code ErrorCode
|
||||
msg string
|
||||
abnormal bool
|
||||
code ErrorCode
|
||||
msg string
|
||||
callbacks []func()
|
||||
}
|
||||
|
||||
func defaultExitConfig() exitConfig {
|
||||
return exitConfig{abnormal: false, code: 0, msg: ""}
|
||||
return exitConfig{
|
||||
abnormal: false,
|
||||
code: NormalCode,
|
||||
msg: "",
|
||||
callbacks: make([]func(), 0),
|
||||
}
|
||||
}
|
||||
|
||||
type ExitOption func(c *exitConfig)
|
||||
|
@ -18,6 +24,12 @@ func (c *exitConfig) apply(opts ...ExitOption) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *exitConfig) runBeforeExit() {
|
||||
for _, cb := range c.callbacks {
|
||||
cb()
|
||||
}
|
||||
}
|
||||
|
||||
func WithExitCode(code ErrorCode) ExitOption {
|
||||
return func(c *exitConfig) {
|
||||
c.code = code
|
||||
|
@ -35,3 +47,9 @@ func WithMsg(msg string) ExitOption {
|
|||
c.msg = msg
|
||||
}
|
||||
}
|
||||
|
||||
func AddCallbacks(fns ...func()) ExitOption {
|
||||
return func(c *exitConfig) {
|
||||
c.callbacks = append(c.callbacks, fns...)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,8 @@ func NewRunner(ctx context.Context, cfg *configs.Config) *Runner {
|
|||
func (r *Runner) watchByPrefix(prefix string) {
|
||||
defer r.wg.Done()
|
||||
_, revision, err := r.session.GetSessions(prefix)
|
||||
console.AbnormalExitIf(err, r.backupFinished.Load())
|
||||
fn := func() { r.Stop() }
|
||||
console.AbnormalExitIf(err, r.backupFinished.Load(), console.AddCallbacks(fn))
|
||||
eventCh := r.session.WatchServices(prefix, revision, nil)
|
||||
for {
|
||||
select {
|
||||
|
@ -59,7 +60,7 @@ func (r *Runner) watchByPrefix(prefix string) {
|
|||
return
|
||||
case event := <-eventCh:
|
||||
msg := fmt.Sprintf("session up/down, exit migration, event type: %s, session: %s", event.EventType.String(), event.Session.String())
|
||||
console.AbnormalExit(r.backupFinished.Load(), msg)
|
||||
console.AbnormalExit(r.backupFinished.Load(), msg, console.AddCallbacks(fn))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue