Merge pull request #1367 from skriss/restic-repo-fixes
Restic repository management fixespull/1380/head
commit
0750b2c789
|
@ -0,0 +1 @@
|
|||
restic repo ensurer: return error if new repository does not become ready within a minute, and fix channel closing/deletion
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2018 the Velero contributors.
|
||||
Copyright 2018, 2019 the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
@ -74,7 +74,7 @@ func NewResticRepositoryController(
|
|||
},
|
||||
)
|
||||
|
||||
c.resyncPeriod = 30 * time.Minute
|
||||
c.resyncPeriod = 5 * time.Minute
|
||||
c.resyncFunc = c.enqueueAllRepositories
|
||||
|
||||
return c
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2018 the Velero contributors.
|
||||
Copyright 2018, 2019 the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -39,8 +40,8 @@ type repositoryEnsurer struct {
|
|||
repoLister velerov1listers.ResticRepositoryLister
|
||||
repoClient velerov1client.ResticRepositoriesGetter
|
||||
|
||||
readyChansLock sync.Mutex
|
||||
readyChans map[string]chan *velerov1api.ResticRepository
|
||||
repoChansLock sync.Mutex
|
||||
repoChans map[string]chan *velerov1api.ResticRepository
|
||||
|
||||
// repoLocksMu synchronizes reads/writes to the repoLocks map itself
|
||||
// since maps are not threadsafe.
|
||||
|
@ -58,7 +59,7 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
|
|||
log: log,
|
||||
repoLister: repoInformer.Lister(),
|
||||
repoClient: repoClient,
|
||||
readyChans: make(map[string]chan *velerov1api.ResticRepository),
|
||||
repoChans: make(map[string]chan *velerov1api.ResticRepository),
|
||||
repoLocks: make(map[repoKey]*sync.Mutex),
|
||||
}
|
||||
|
||||
|
@ -68,20 +69,27 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
|
|||
oldObj := old.(*velerov1api.ResticRepository)
|
||||
newObj := upd.(*velerov1api.ResticRepository)
|
||||
|
||||
if oldObj.Status.Phase != velerov1api.ResticRepositoryPhaseReady && newObj.Status.Phase == velerov1api.ResticRepositoryPhaseReady {
|
||||
r.readyChansLock.Lock()
|
||||
defer r.readyChansLock.Unlock()
|
||||
|
||||
key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation).String()
|
||||
readyChan, ok := r.readyChans[key]
|
||||
if !ok {
|
||||
log.Errorf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
|
||||
return
|
||||
}
|
||||
|
||||
readyChan <- newObj
|
||||
delete(r.readyChans, key)
|
||||
// we're only interested in phase-changing updates
|
||||
if oldObj.Status.Phase == newObj.Status.Phase {
|
||||
return
|
||||
}
|
||||
|
||||
// we're only interested in updates where the updated object is either Ready or NotReady
|
||||
if newObj.Status.Phase != velerov1api.ResticRepositoryPhaseReady && newObj.Status.Phase != velerov1api.ResticRepositoryPhaseNotReady {
|
||||
return
|
||||
}
|
||||
|
||||
r.repoChansLock.Lock()
|
||||
defer r.repoChansLock.Unlock()
|
||||
|
||||
key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation).String()
|
||||
repoChan, ok := r.repoChans[key]
|
||||
if !ok {
|
||||
log.Debugf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
|
||||
return
|
||||
}
|
||||
|
||||
repoChan <- newObj
|
||||
},
|
||||
},
|
||||
)
|
||||
|
@ -132,7 +140,7 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
|
|||
}
|
||||
if len(repos) == 1 {
|
||||
if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady {
|
||||
return nil, errors.New("restic repository is not ready")
|
||||
return nil, errors.Errorf("restic repository is not ready: %s", repos[0].Status.Message)
|
||||
}
|
||||
|
||||
log.Debug("Ready repository found")
|
||||
|
@ -155,27 +163,38 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
|
|||
},
|
||||
}
|
||||
|
||||
readyChan := r.getReadyChan(selector.String())
|
||||
defer close(readyChan)
|
||||
repoChan := r.getRepoChan(selector.String())
|
||||
defer func() {
|
||||
delete(r.repoChans, selector.String())
|
||||
close(repoChan)
|
||||
}()
|
||||
|
||||
if _, err := r.repoClient.ResticRepositories(namespace).Create(repo); err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to create restic repository resource")
|
||||
}
|
||||
|
||||
select {
|
||||
// repositories should become either ready or not ready quickly if they're
|
||||
// newly created.
|
||||
case <-time.After(time.Minute):
|
||||
return nil, errors.New("timed out waiting for restic repository to become ready")
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("timed out waiting for restic repository to become ready")
|
||||
case res := <-readyChan:
|
||||
case res := <-repoChan:
|
||||
if res.Status.Phase == velerov1api.ResticRepositoryPhaseNotReady {
|
||||
return nil, errors.Errorf("restic repository is not ready: %s", res.Status.Message)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *repositoryEnsurer) getReadyChan(name string) chan *velerov1api.ResticRepository {
|
||||
r.readyChansLock.Lock()
|
||||
defer r.readyChansLock.Unlock()
|
||||
func (r *repositoryEnsurer) getRepoChan(name string) chan *velerov1api.ResticRepository {
|
||||
r.repoChansLock.Lock()
|
||||
defer r.repoChansLock.Unlock()
|
||||
|
||||
r.readyChans[name] = make(chan *velerov1api.ResticRepository)
|
||||
return r.readyChans[name]
|
||||
r.repoChans[name] = make(chan *velerov1api.ResticRepository)
|
||||
return r.repoChans[name]
|
||||
}
|
||||
|
||||
func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
|
||||
|
|
Loading…
Reference in New Issue