pkg/restic: fix concurrency bugs causing dupe repos, panics
Signed-off-by: Steve Kriss <krisss@vmware.com>pull/1235/head
parent
e4771f582b
commit
e3e76c2067
|
@ -0,0 +1 @@
|
|||
Fix concurrency bug in code ensuring restic repository exists
|
|
@ -35,18 +35,31 @@ import (
|
|||
|
||||
// repositoryEnsurer ensures that Velero restic repositories are created and ready.
|
||||
type repositoryEnsurer struct {
|
||||
log logrus.FieldLogger
|
||||
repoLister velerov1listers.ResticRepositoryLister
|
||||
repoClient velerov1client.ResticRepositoriesGetter
|
||||
|
||||
readyChansLock sync.Mutex
|
||||
readyChans map[string]chan *velerov1api.ResticRepository
|
||||
|
||||
// repoLocksMu synchronizes reads/writes to the repoLocks map itself
|
||||
// since maps are not threadsafe.
|
||||
repoLocksMu sync.Mutex
|
||||
repoLocks map[repoKey]*sync.Mutex
|
||||
}
|
||||
|
||||
type repoKey struct {
|
||||
volumeNamespace string
|
||||
backupLocation string
|
||||
}
|
||||
|
||||
func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInformer, repoClient velerov1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer {
|
||||
r := &repositoryEnsurer{
|
||||
log: log,
|
||||
repoLister: repoInformer.Lister(),
|
||||
repoClient: repoClient,
|
||||
readyChans: make(map[string]chan *velerov1api.ResticRepository),
|
||||
repoLocks: make(map[repoKey]*sync.Mutex),
|
||||
}
|
||||
|
||||
repoInformer.Informer().AddEventHandler(
|
||||
|
@ -67,7 +80,7 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
|
|||
}
|
||||
|
||||
readyChan <- newObj
|
||||
delete(r.readyChans, newObj.Name)
|
||||
delete(r.readyChans, key)
|
||||
}
|
||||
},
|
||||
},
|
||||
|
@ -84,6 +97,30 @@ func repoLabels(volumeNamespace, backupLocation string) labels.Set {
|
|||
}
|
||||
|
||||
func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*velerov1api.ResticRepository, error) {
|
||||
log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation)
|
||||
|
||||
// It's only safe to have one instance of this method executing concurrently for a
|
||||
// given volumeNamespace + backupLocation, so synchronize based on that. It's fine
|
||||
// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
|
||||
// running this for the same inputs, both might find no ResticRepository exists, then
|
||||
// both would create new ones for the same namespace/location.
|
||||
//
|
||||
// This issue could probably be avoided if we had a deterministic name for
|
||||
// each restic repository, and we just tried to create it, checked for an
|
||||
// AlreadyExists err, and then waited for it to be ready. However, there are
|
||||
// already repositories in the wild with non-deterministic names (i.e. using
|
||||
// GenerateName) which poses a backwards compatibility problem.
|
||||
log.Debug("Acquiring lock")
|
||||
|
||||
repoMu := r.repoLock(volumeNamespace, backupLocation)
|
||||
repoMu.Lock()
|
||||
defer func() {
|
||||
repoMu.Unlock()
|
||||
log.Debug("Released lock")
|
||||
}()
|
||||
|
||||
log.Debug("Acquired lock")
|
||||
|
||||
selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation))
|
||||
|
||||
repos, err := r.repoLister.ResticRepositories(namespace).List(selector)
|
||||
|
@ -97,11 +134,14 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
|
|||
if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady {
|
||||
return nil, errors.New("restic repository is not ready")
|
||||
}
|
||||
|
||||
log.Debug("Ready repository found")
|
||||
return repos[0], nil
|
||||
}
|
||||
|
||||
// no repo found: create one and wait for it to be ready
|
||||
log.Debug("No repository found, creating one")
|
||||
|
||||
// no repo found: create one and wait for it to be ready
|
||||
repo := &velerov1api.ResticRepository{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
|
@ -137,3 +177,19 @@ func (r *repositoryEnsurer) getReadyChan(name string) chan *velerov1api.ResticRe
|
|||
r.readyChans[name] = make(chan *velerov1api.ResticRepository)
|
||||
return r.readyChans[name]
|
||||
}
|
||||
|
||||
func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
|
||||
r.repoLocksMu.Lock()
|
||||
defer r.repoLocksMu.Unlock()
|
||||
|
||||
key := repoKey{
|
||||
volumeNamespace: volumeNamespace,
|
||||
backupLocation: backupLocation,
|
||||
}
|
||||
|
||||
if r.repoLocks[key] == nil {
|
||||
r.repoLocks[key] = new(sync.Mutex)
|
||||
}
|
||||
|
||||
return r.repoLocks[key]
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue