fix(gitops): avoid cancelling the auto updates for any error EE-5604 (#10294)
parent
f17da30d31
commit
5a0cb4d0e8
|
@ -17,6 +17,18 @@ type Scheduler struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PermanentError struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPermanentError(err error) *PermanentError {
|
||||||
|
return &PermanentError{err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *PermanentError) Error() string {
|
||||||
|
return e.err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
func NewScheduler(ctx context.Context) *Scheduler {
|
func NewScheduler(ctx context.Context) *Scheduler {
|
||||||
crontab := cron.New(cron.WithChain(cron.Recover(cron.DefaultLogger)))
|
crontab := cron.New(cron.WithChain(cron.Recover(cron.DefaultLogger)))
|
||||||
crontab.Start()
|
crontab.Start()
|
||||||
|
@ -84,14 +96,24 @@ func (s *Scheduler) StopJob(jobID string) error {
|
||||||
func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) string {
|
func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) string {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
j := cron.FuncJob(func() {
|
jobFn := cron.FuncJob(func() {
|
||||||
if err := job(); err != nil {
|
err := job()
|
||||||
log.Debug().Msg("job returned an error")
|
if err == nil {
|
||||||
cancel()
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var permErr *PermanentError
|
||||||
|
if errors.As(err, &permErr) {
|
||||||
|
log.Error().Err(permErr).Msg("job returned a permanent error, it will be stopped")
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error().Err(err).Msg("job returned an error, it will be rescheduled")
|
||||||
})
|
})
|
||||||
|
|
||||||
entryID := s.crontab.Schedule(cron.Every(duration), j)
|
entryID := s.crontab.Schedule(cron.Every(duration), jobFn)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.activeJobs[entryID] = cancel
|
s.activeJobs[entryID] = cancel
|
||||||
|
|
|
@ -49,7 +49,7 @@ func Test_JobCanBeStopped(t *testing.T) {
|
||||||
assert.False(t, workDone, "job shouldn't had a chance to run")
|
assert.False(t, workDone, "job shouldn't had a chance to run")
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_JobShouldStop_UponError(t *testing.T) {
|
func Test_JobShouldStop_UponPermError(t *testing.T) {
|
||||||
s := NewScheduler(context.Background())
|
s := NewScheduler(context.Background())
|
||||||
defer s.Shutdown()
|
defer s.Shutdown()
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ func Test_JobShouldStop_UponError(t *testing.T) {
|
||||||
s.StartJobEvery(jobInterval, func() error {
|
s.StartJobEvery(jobInterval, func() error {
|
||||||
acc++
|
acc++
|
||||||
close(ch)
|
close(ch)
|
||||||
return fmt.Errorf("failed")
|
return NewPermanentError(fmt.Errorf("failed"))
|
||||||
})
|
})
|
||||||
|
|
||||||
<-time.After(3 * jobInterval)
|
<-time.After(3 * jobInterval)
|
||||||
|
@ -66,6 +66,28 @@ func Test_JobShouldStop_UponError(t *testing.T) {
|
||||||
assert.Equal(t, 1, acc, "job stop after the first run because it returns an error")
|
assert.Equal(t, 1, acc, "job stop after the first run because it returns an error")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_JobShouldNotStop_UponError(t *testing.T) {
|
||||||
|
s := NewScheduler(context.Background())
|
||||||
|
defer s.Shutdown()
|
||||||
|
|
||||||
|
var acc int
|
||||||
|
ch := make(chan struct{})
|
||||||
|
s.StartJobEvery(jobInterval, func() error {
|
||||||
|
acc++
|
||||||
|
|
||||||
|
if acc == 2 {
|
||||||
|
close(ch)
|
||||||
|
return NewPermanentError(fmt.Errorf("failed"))
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.New("non-permanent error")
|
||||||
|
})
|
||||||
|
|
||||||
|
<-time.After(3 * jobInterval)
|
||||||
|
<-ch
|
||||||
|
assert.Equal(t, 2, acc)
|
||||||
|
}
|
||||||
|
|
||||||
func Test_CanTerminateAllJobs_ByShuttingDownScheduler(t *testing.T) {
|
func Test_CanTerminateAllJobs_ByShuttingDownScheduler(t *testing.T) {
|
||||||
s := NewScheduler(context.Background())
|
s := NewScheduler(context.Background())
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/portainer/portainer/api/dataservices"
|
"github.com/portainer/portainer/api/dataservices"
|
||||||
"github.com/portainer/portainer/api/git/update"
|
"github.com/portainer/portainer/api/git/update"
|
||||||
"github.com/portainer/portainer/api/http/security"
|
"github.com/portainer/portainer/api/http/security"
|
||||||
|
"github.com/portainer/portainer/api/scheduler"
|
||||||
"github.com/portainer/portainer/api/stacks/stackutils"
|
"github.com/portainer/portainer/api/stacks/stackutils"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -29,7 +30,9 @@ func RedeployWhenChanged(stackID portainer.StackID, deployer StackDeployer, data
|
||||||
log.Debug().Int("stack_id", int(stackID)).Msg("redeploying stack")
|
log.Debug().Int("stack_id", int(stackID)).Msg("redeploying stack")
|
||||||
|
|
||||||
stack, err := datastore.Stack().Read(stackID)
|
stack, err := datastore.Stack().Read(stackID)
|
||||||
if err != nil {
|
if dataservices.IsErrObjectNotFound(err) {
|
||||||
|
return scheduler.NewPermanentError(errors.WithMessagef(err, "failed to get the stack %v", stackID))
|
||||||
|
} else if err != nil {
|
||||||
return errors.WithMessagef(err, "failed to get the stack %v", stackID)
|
return errors.WithMessagef(err, "failed to get the stack %v", stackID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,7 +41,15 @@ func RedeployWhenChanged(stackID portainer.StackID, deployer StackDeployer, data
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoint, err := datastore.Endpoint().Endpoint(stack.EndpointID)
|
endpoint, err := datastore.Endpoint().Endpoint(stack.EndpointID)
|
||||||
if err != nil {
|
if dataservices.IsErrObjectNotFound(err) {
|
||||||
|
return scheduler.NewPermanentError(
|
||||||
|
errors.WithMessagef(err,
|
||||||
|
"failed to find the environment %v associated to the stack %v",
|
||||||
|
stack.EndpointID,
|
||||||
|
stack.ID,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
} else if err != nil {
|
||||||
return errors.WithMessagef(err, "failed to find the environment %v associated to the stack %v", stack.EndpointID, stack.ID)
|
return errors.WithMessagef(err, "failed to find the environment %v associated to the stack %v", stack.EndpointID, stack.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +89,9 @@ func RedeployWhenChanged(stackID portainer.StackID, deployer StackDeployer, data
|
||||||
}
|
}
|
||||||
|
|
||||||
registries, err := getUserRegistries(datastore, user, endpoint.ID)
|
registries, err := getUserRegistries(datastore, user, endpoint.ID)
|
||||||
if err != nil {
|
if dataservices.IsErrObjectNotFound(err) {
|
||||||
|
return scheduler.NewPermanentError(err)
|
||||||
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue