velero/pkg/restic/repository_ensurer.go

216 lines
7.1 KiB
Go

/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic
import (
"context"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
)
// repositoryEnsurer ensures that Velero restic repositories are created and ready.
// Callers ask for a repository via EnsureRepo; an informer update handler
// installed by newRepositoryEnsurer feeds phase changes back to waiting calls.
type repositoryEnsurer struct {
	// log is the base logger; EnsureRepo derives a per-call logger from it.
	log logrus.FieldLogger
	// repoLister is used by EnsureRepo to look up existing ResticRepository
	// objects by label selector.
	repoLister velerov1listers.ResticRepositoryLister
	// repoClient is used by EnsureRepo to create a ResticRepository when none
	// is found.
	repoClient velerov1client.ResticRepositoriesGetter
	// repoChansLock is taken by the informer update handler and by getRepoChan
	// around their accesses to repoChans.
	repoChansLock sync.Mutex
	// repoChans maps the string form of a repository's label set to the channel
	// on which the informer update handler delivers that repository once its
	// phase changes to Ready or NotReady.
	repoChans map[string]chan *velerov1api.ResticRepository
	// repoLocksMu synchronizes reads/writes to the repoLocks map itself
	// since maps are not threadsafe.
	repoLocksMu sync.Mutex
	// repoLocks holds one mutex per volumeNamespace/backupLocation pair; the
	// mutexes are created lazily by repoLock and held by EnsureRepo for the
	// duration of a call.
	repoLocks map[repoKey]*sync.Mutex
}
// repoKey is the key type of repositoryEnsurer.repoLocks: one lock exists per
// volumeNamespace/backupLocation pair passed to repoLock.
type repoKey struct {
	// volumeNamespace is the volumeNamespace argument given to repoLock.
	volumeNamespace string
	// backupLocation is the backupLocation argument given to repoLock.
	backupLocation string
}
// newRepositoryEnsurer builds a repositoryEnsurer that reads repositories
// through repoInformer's lister and creates them through repoClient. It also
// registers an update handler on the informer: whenever a ResticRepository's
// phase changes to Ready or NotReady, the updated object is sent on the channel
// registered for its volume namespace / backup storage location, if there is one.
func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInformer, repoClient velerov1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer {
	ensurer := &repositoryEnsurer{
		log:        log,
		repoLister: repoInformer.Lister(),
		repoClient: repoClient,
		repoChans:  make(map[string]chan *velerov1api.ResticRepository),
		repoLocks:  make(map[repoKey]*sync.Mutex),
	}

	onUpdate := func(old, upd interface{}) {
		before := old.(*velerov1api.ResticRepository)
		after := upd.(*velerov1api.ResticRepository)

		// An update that leaves the phase untouched carries no news for waiters.
		if before.Status.Phase == after.Status.Phase {
			return
		}

		// Only transitions into Ready or NotReady are reported.
		switch after.Status.Phase {
		case velerov1api.ResticRepositoryPhaseReady, velerov1api.ResticRepositoryPhaseNotReady:
			// fall out of the switch and notify
		default:
			return
		}

		// The lock is held across the send below.
		ensurer.repoChansLock.Lock()
		defer ensurer.repoChansLock.Unlock()

		key := repoLabels(after.Spec.VolumeNamespace, after.Spec.BackupStorageLocation).String()
		waiter, found := ensurer.repoChans[key]
		if !found {
			log.Debugf("No ready channel found for repository %s/%s", after.Namespace, after.Name)
			return
		}

		waiter <- after
	}

	repoInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: onUpdate,
	})

	return ensurer
}
// repoLabels returns the label set identifying the restic repository for the
// given volume namespace and backup storage location. Both values are passed
// through label.GetValidName before being stored.
func repoLabels(volumeNamespace, backupLocation string) labels.Set {
	set := make(labels.Set, 2)
	set[velerov1api.ResticVolumeNamespaceLabel] = label.GetValidName(volumeNamespace)
	set[velerov1api.StorageLocationLabel] = label.GetValidName(backupLocation)
	return set
}
// EnsureRepo returns the ResticRepository in namespace that serves the given
// volumeNamespace and backupLocation, creating it if none exists.
//
// If exactly one matching repository already exists it is returned when its
// phase is Ready; otherwise an error carrying its status message is returned.
// If none exists, a new one is created and the call waits (up to one minute, or
// until ctx is done) for the informer to report it as Ready or NotReady.
//
// An error is returned when listing fails, when more than one matching
// repository exists, when an existing or newly created repository is not
// ready, when creation fails, when the wait times out, or when ctx is done
// (in which case the returned error wraps ctx.Err()).
func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*velerov1api.ResticRepository, error) {
	log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation)

	// It's only safe to have one instance of this method executing concurrently for a
	// given volumeNamespace + backupLocation, so synchronize based on that. It's fine
	// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
	// running this for the same inputs, both might find no ResticRepository exists, then
	// both would create new ones for the same namespace/location.
	//
	// This issue could probably be avoided if we had a deterministic name for
	// each restic repository, and we just tried to create it, checked for an
	// AlreadyExists err, and then waited for it to be ready. However, there are
	// already repositories in the wild with non-deterministic names (i.e. using
	// GenerateName) which poses a backwards compatibility problem.
	log.Debug("Acquiring lock")

	repoMu := r.repoLock(volumeNamespace, backupLocation)
	repoMu.Lock()
	defer func() {
		repoMu.Unlock()
		log.Debug("Released lock")
	}()

	log.Debug("Acquired lock")

	repoSet := repoLabels(volumeNamespace, backupLocation)
	selector := labels.SelectorFromSet(repoSet)

	repos, err := r.repoLister.ResticRepositories(namespace).List(selector)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	if len(repos) > 1 {
		return nil, errors.Errorf("more than one ResticRepository found for workload namespace %q, backup storage location %q", volumeNamespace, backupLocation)
	}
	if len(repos) == 1 {
		if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady {
			return nil, errors.Errorf("restic repository is not ready: %s", repos[0].Status.Message)
		}

		log.Debug("Ready repository found")
		return repos[0], nil
	}

	log.Debug("No repository found, creating one")

	// no repo found: create one and wait for it to be ready
	repo := &velerov1api.ResticRepository{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:    namespace,
			GenerateName: fmt.Sprintf("%s-%s-", volumeNamespace, backupLocation),
			Labels:       repoLabels(volumeNamespace, backupLocation),
		},
		Spec: velerov1api.ResticRepositorySpec{
			VolumeNamespace:       volumeNamespace,
			BackupStorageLocation: backupLocation,
			MaintenanceFrequency:  metav1.Duration{Duration: DefaultMaintenanceFrequency},
		},
	}

	// Key the wait channel with the label set's own string form, which is
	// exactly how the informer update handler computes its lookup key.
	chanKey := repoSet.String()
	repoChan := r.getRepoChan(chanKey)
	defer func() {
		// The informer update handler sends on repoChan while holding
		// repoChansLock. Nobody receives any more once we get here, so drain
		// in the background: otherwise a handler blocked mid-send would keep
		// the lock forever and the Lock() below would never return. The
		// drain goroutine ends when repoChan is closed.
		go func() {
			for range repoChan {
			}
		}()

		// repoChans is shared with the handler and with getRepoChan calls for
		// other repositories, so it must only be modified under the lock.
		r.repoChansLock.Lock()
		delete(r.repoChans, chanKey)
		r.repoChansLock.Unlock()

		// Senders look the channel up and send under the lock, so once the
		// entry is gone and the lock has been released no sender can still
		// hold repoChan; closing it is safe.
		close(repoChan)
	}()

	// Use the caller's context so that cancellation also aborts the API call.
	if _, err := r.repoClient.ResticRepositories(namespace).Create(ctx, repo, metav1.CreateOptions{}); err != nil {
		return nil, errors.Wrapf(err, "unable to create restic repository resource")
	}

	select {
	// repositories should become either ready or not ready quickly if they're
	// newly created.
	case <-time.After(time.Minute):
		return nil, errors.New("timed out waiting for restic repository to become ready")
	case <-ctx.Done():
		// Report the real reason (cancellation or deadline) rather than
		// claiming a timeout.
		return nil, errors.Wrap(ctx.Err(), "context done while waiting for restic repository to become ready")
	case res := <-repoChan:
		if res.Status.Phase == velerov1api.ResticRepositoryPhaseNotReady {
			return nil, errors.Errorf("restic repository is not ready: %s", res.Status.Message)
		}

		return res, nil
	}
}
// getRepoChan creates a fresh unbuffered channel, registers it in repoChans
// under name (replacing any channel previously stored there) and returns it.
// The map write happens under repoChansLock.
func (r *repositoryEnsurer) getRepoChan(name string) chan *velerov1api.ResticRepository {
	ch := make(chan *velerov1api.ResticRepository)

	r.repoChansLock.Lock()
	defer r.repoChansLock.Unlock()

	r.repoChans[name] = ch
	return ch
}
// repoLock returns the mutex associated with the given volumeNamespace and
// backupLocation, creating and storing it on first use. Access to the
// repoLocks map is serialized by repoLocksMu.
func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
	key := repoKey{volumeNamespace: volumeNamespace, backupLocation: backupLocation}

	r.repoLocksMu.Lock()
	defer r.repoLocksMu.Unlock()

	mu := r.repoLocks[key]
	if mu == nil {
		// First request for this pair (or a nil entry): install a new mutex.
		mu = new(sync.Mutex)
		r.repoLocks[key] = mu
	}
	return mu
}