Backoff only when failed pod shows up

Maciej Szulik 2018-03-09 16:39:09 +01:00
parent 0207a09074
commit 1266252dc2
3 changed files with 94 additions and 15 deletions

pkg/controller/job/BUILD

@@ -64,6 +64,7 @@ go_test(
         "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/testing:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
+        "//vendor/k8s.io/client-go/util/workqueue:go_default_library",
     ],
 )
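The BUILD change only registers a new test dependency: the fake rate-limiting queue added to the test file below implements workqueue.RateLimitingInterface, so the test target now needs client-go's workqueue package.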

pkg/controller/job/job_controller.go

@@ -109,9 +109,13 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 	}
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: jm.enqueueController,
+		AddFunc: func(obj interface{}) {
+			jm.enqueueController(obj, true)
+		},
 		UpdateFunc: jm.updateJob,
-		DeleteFunc: jm.enqueueController,
+		DeleteFunc: func(obj interface{}) {
+			jm.enqueueController(obj, true)
+		},
 	})
 	jm.jobLister = jobInformer.Lister()
 	jm.jobStoreSynced = jobInformer.Informer().HasSynced
@@ -209,7 +213,7 @@ func (jm *JobController) addPod(obj interface{}) {
 			return
 		}
 		jm.expectations.CreationObserved(jobKey)
-		jm.enqueueController(job)
+		jm.enqueueController(job, true)
 		return
 	}
@@ -218,7 +222,7 @@ func (jm *JobController) addPod(obj interface{}) {
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
 	for _, job := range jm.getPodJobs(pod) {
-		jm.enqueueController(job)
+		jm.enqueueController(job, true)
 	}
 }
@@ -242,7 +246,8 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 		return
 	}
-	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
+	// the only time we want the backoff to kick in is when the pod failed
+	immediate := curPod.Status.Phase != v1.PodFailed

 	curControllerRef := metav1.GetControllerOf(curPod)
 	oldControllerRef := metav1.GetControllerOf(oldPod)
@@ -250,7 +255,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 	if controllerRefChanged && oldControllerRef != nil {
 		// The ControllerRef was changed. Sync the old controller, if any.
 		if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
-			jm.enqueueController(job)
+			jm.enqueueController(job, immediate)
 		}
 	}
@@ -260,15 +265,16 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 		if job == nil {
 			return
 		}
-		jm.enqueueController(job)
+		jm.enqueueController(job, immediate)
 		return
 	}

 	// Otherwise, it's an orphan. If anything changed, sync matching controllers
 	// to see if anyone wants to adopt it now.
+	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 	if labelChanged || controllerRefChanged {
 		for _, job := range jm.getPodJobs(curPod) {
-			jm.enqueueController(job)
+			jm.enqueueController(job, immediate)
 		}
 	}
 }
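To summarize the updatePod changes: every sync path now threads the same immediate flag through enqueueController, so only a pod entering the Failed phase engages the rate limiter. A sketch of the resulting behavior, not part of the commit, assuming the queue's rate limiter clears its requeue count on Forget (as the standard workqueue limiters do):

// pod phase seen in updatePod -> immediate -> effect in enqueueController
// v1.PodFailed                -> false     -> AddAfter(key, getBackoff(queue, key)); delay grows with each requeue
// v1.PodRunning               -> true      -> Forget(key), then AddAfter(key, 0); accumulated backoff is cleared
// v1.PodSucceeded             -> true      -> Forget(key), then AddAfter(key, 0); accumulated backoff is cleared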
@@ -309,7 +315,7 @@ func (jm *JobController) deletePod(obj interface{}) {
 		return
 	}
 	jm.expectations.DeletionObserved(jobKey)
-	jm.enqueueController(job)
+	jm.enqueueController(job, true)
 }

 func (jm *JobController) updateJob(old, cur interface{}) {
@@ -321,7 +327,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 	if err != nil {
 		return
 	}
-	jm.enqueueController(curJob)
+	jm.enqueueController(curJob, true)

 	// check if we need to add a new resync for ActiveDeadlineSeconds
 	if curJob.Status.StartTime != nil {
 		curADS := curJob.Spec.ActiveDeadlineSeconds
@@ -341,15 +347,19 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 	}
 }

-// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobController) enqueueController(job interface{}) {
-	key, err := controller.KeyFunc(job)
+// obj could be a *batch.Job, or a DeletionFinalStateUnknown marker item.
+// immediate tells the controller to requeue the Job right away, skipping any
+// accumulated backoff; it is false ONLY when a failed pod shows up.
+func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
+	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}

-	// Retrieves the backoff duration for this Job
+	if immediate {
+		jm.queue.Forget(key)
+	}
+
 	backoff := getBackoff(jm.queue, key)

 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
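getBackoff itself is untouched by this commit, so it does not appear in the diff. For reference, a minimal sketch consistent with the call site above and with the test expectations below, assuming an exponential delay that starts at DefaultJobBackOff and is capped at MaxJobBackOff (both defined elsewhere in job_controller.go, alongside the file's existing math and time imports):

// Sketch only: the requeue count recorded by the workqueue drives an
// exponential backoff of DefaultJobBackOff * 2^(requeues-1).
func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
	exp := queue.NumRequeues(key)
	if exp <= 0 {
		return time.Duration(0)
	}
	// Cap the computed value so the float-to-duration conversion cannot overflow.
	backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
	if backoff > math.MaxInt64 {
		return MaxJobBackOff
	}
	calculated := time.Duration(backoff)
	if calculated > MaxJobBackOff {
		return MaxJobBackOff
	}
	return calculated
}

Because Forget resets the requeue count, the immediate path degenerates to AddAfter(key, 0): the Job is synced right away and its accumulated backoff starts over.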

pkg/controller/job/job_controller_test.go

@@ -36,6 +36,7 @@ import (
 	restclient "k8s.io/client-go/rest"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
@@ -1338,3 +1339,70 @@ func TestJobBackoffReset(t *testing.T) {
 		}
 	}
 }
+
+var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}
+
+type fakeRateLimitingQueue struct {
+	workqueue.Interface
+	requeues int
+	item     interface{}
+	duration time.Duration
+}
+
+func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
+
+// Forget resets the tracked requeues, mirroring how the real rate limiter
+// clears its failure count; the success and running cases below rely on a
+// subsequent getBackoff call returning zero.
+func (f *fakeRateLimitingQueue) Forget(item interface{}) {
+	f.requeues = 0
+}
+
+func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
+	return f.requeues
+}
+
+func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
+	f.item = item
+	f.duration = duration
+}
+
+func TestJobBackoff(t *testing.T) {
+	job := newJob(1, 1, 1)
+	oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
+	oldPod.Status.Phase = v1.PodRunning
+	oldPod.ResourceVersion = "1"
+	newPod := oldPod.DeepCopy()
+	newPod.ResourceVersion = "2"
+
+	testCases := map[string]struct {
+		// inputs
+		requeues int
+		phase    v1.PodPhase
+
+		// expectation
+		backoff int
+	}{
+		"1st failure": {0, v1.PodFailed, 0},
+		"2nd failure": {1, v1.PodFailed, 1},
+		"3rd failure": {2, v1.PodFailed, 2},
+		"1st success": {0, v1.PodSucceeded, 0},
+		"2nd success": {1, v1.PodSucceeded, 0},
+		"1st running": {0, v1.PodRunning, 0},
+		"2nd running": {1, v1.PodRunning, 0},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+			manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+			fakePodControl := controller.FakePodControl{}
+			manager.podControl = &fakePodControl
+			manager.podStoreSynced = alwaysReady
+			manager.jobStoreSynced = alwaysReady
+			queue := &fakeRateLimitingQueue{}
+			manager.queue = queue
+			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
+
+			queue.requeues = tc.requeues
+			newPod.Status.Phase = tc.phase
+			manager.updatePod(oldPod, newPod)
+
+			if queue.duration.Nanoseconds() != int64(tc.backoff)*DefaultJobBackOff.Nanoseconds() {
+				t.Errorf("unexpected backoff %v", queue.duration)
+			}
+		})
+	}
+}
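As a concrete reading of the assertion, here is a hypothetical trace of the "3rd failure" case (requeues=2, phase PodFailed), assuming the getBackoff sketch above:

// manager.updatePod(oldPod, newPod)   // newPod.Status.Phase == v1.PodFailed
//   -> enqueueController(job, false)  // immediate == false, so no Forget
//     -> getBackoff(queue, key)       // NumRequeues() == 2 -> 2 * DefaultJobBackOff
//     -> queue.AddAfter(key, 2*DefaultJobBackOff)
// The fake queue records that duration, and the test compares it against
// int64(tc.backoff) * DefaultJobBackOff.Nanoseconds().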