Skip reclaim policy Delete PVs without snapshots

If a PV has a reclaim policy of Delete and we didn't create a snapshot
of it, don't restore the PV: with that policy, the underlying volume may
already have been deleted, so restoring the PV object would produce a PV
whose underlying volume is incorrect.

Also "reset" any PVCs bound to the PV so they'll be dynamically
provisioned when restored.
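
The rule, as a minimal runnable sketch (skipPVRestore is an illustrative
name, not Ark's actual API):

    package main

    import "fmt"

    // skipPVRestore mirrors the rule above: a PV is left for the dynamic
    // provisioner only when it has no snapshot AND its reclaim policy is
    // Delete.
    func skipPVRestore(hasSnapshot bool, reclaimPolicy string) bool {
        return !hasSnapshot && reclaimPolicy == "Delete"
    }

    func main() {
        fmt.Println(skipPVRestore(false, "Delete")) // true: skip the PV, reset its PVC
        fmt.Println(skipPVRestore(true, "Delete"))  // false: restore from snapshot
        fmt.Println(skipPVRestore(false, "Retain")) // false: restore the PV object as-is
    }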

Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Andy Goldstein 2018-06-22 15:32:03 -04:00
parent 636b09a548
commit 7c283e5de8
2 changed files with 375 additions and 25 deletions

pkg/restore/restore.go

@@ -229,6 +229,14 @@ func (kr *kubernetesRestorer) Restore(restore *api.Restore, backup *api.Backup,
}
}
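// pvRestorer bundles just the settings executePVAction needs, so PV
// restoration can be exercised (and mocked in tests) independently of the
// full restore context.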
pvRestorer := &pvRestorer{
logger: log,
snapshotVolumes: backup.Spec.SnapshotVolumes,
restorePVs: restore.Spec.RestorePVs,
volumeBackups: backup.Status.VolumeBackups,
snapshotService: kr.snapshotService,
}
restoreCtx := &context{
backup: backup,
backupReader: backupReader,
@@ -242,6 +250,8 @@ func (kr *kubernetesRestorer) Restore(restore *api.Restore, backup *api.Backup,
actions: resolvedActions,
snapshotService: kr.snapshotService,
resticRestorer: resticRestorer,
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
}
return restoreCtx.execute()
@@ -324,6 +334,8 @@ type context struct {
globalWaitGroup arksync.ErrorGroup
resourceWaitGroup sync.WaitGroup
resourceWatches []watch.Interface
pvsToProvision sets.String
pvRestorer PVRestorer
}
func (ctx *context) infof(msg string, args ...interface{}) {
@@ -656,9 +668,21 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
}
}
name := obj.GetName()
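// PVs that have no snapshot and whose reclaim policy is Delete are
// recorded in pvsToProvision and skipped here; their PVCs are reset
// further below so the volumes are dynamically re-provisioned.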
if groupResource == kuberesource.PersistentVolumes {
_, found := ctx.backup.Status.VolumeBackups[name]
reclaimPolicy, err := collections.GetString(obj.Object, "spec.persistentVolumeReclaimPolicy")
if err == nil && !found && reclaimPolicy == "Delete" {
ctx.infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.pvsToProvision.Insert(name)
continue
}
// restore the PV from snapshot (if applicable)
- updatedObj, err := ctx.executePVAction(obj)
+ updatedObj, err := ctx.pvRestorer.executePVAction(obj)
if err != nil {
addToResult(&errs, namespace, fmt.Errorf("error executing PVAction for %s: %v", fullPath, err))
continue
@@ -672,19 +696,37 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
return warnings, errs
}
ctx.resourceWatches = append(ctx.resourceWatches, resourceWatch)
ctx.resourceWaitGroup.Add(1)
go func() {
defer ctx.resourceWaitGroup.Done()
- if _, err := waitForReady(resourceWatch.ResultChan(), obj.GetName(), isPVReady, time.Minute, ctx.logger); err != nil {
- ctx.logger.Warnf("Timeout reached waiting for persistent volume %s to become ready", obj.GetName())
- addArkError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", obj.GetName()))
+ if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.logger); err != nil {
+ ctx.logger.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
+ addArkError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
}
}()
}
}
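// A PVC bound to a PV that was intentionally not restored is "reset":
// dropping volumeName and the bind annotations makes the PVC controller
// treat the claim as unbound and provision a new volume for it.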
if groupResource == kuberesource.PersistentVolumeClaims {
spec, err := collections.GetMap(obj.UnstructuredContent(), "spec")
if err != nil {
addToResult(&errs, namespace, err)
continue
}
if volumeName, exists := spec["volumeName"]; exists && ctx.pvsToProvision.Has(volumeName.(string)) {
ctx.infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)
delete(spec, "volumeName")
annotations := obj.GetAnnotations()
delete(annotations, "pv.kubernetes.io/bind-completed")
delete(annotations, "pv.kubernetes.io/bound-by-controller")
obj.SetAnnotations(annotations)
}
}
for _, action := range applicableActions {
if !action.selector.Matches(labels.Set(obj.GetLabels())) {
continue
@@ -729,10 +771,10 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
// add an ark-restore label to each resource for easy ID
addLabel(obj, api.RestoreLabelKey, ctx.restore.Name)
ctx.infof("Restoring %s: %v", obj.GroupVersionKind().Kind, obj.GetName())
ctx.infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
createdObj, restoreErr := resourceClient.Create(obj)
if apierrors.IsAlreadyExists(restoreErr) {
- fromCluster, err := resourceClient.Get(obj.GetName(), metav1.GetOptions{})
+ fromCluster, err := resourceClient.Get(name, metav1.GetOptions{})
if err != nil {
ctx.infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
@@ -773,7 +815,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
continue
}
- _, err = resourceClient.Patch(obj.GetName(), patchBytes)
+ _, err = resourceClient.Patch(name, patchBytes)
if err != nil {
addToResult(&warnings, namespace, err)
} else {
@@ -788,7 +830,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
}
// Error was something other than an AlreadyExists
if restoreErr != nil {
ctx.infof("error restoring %s: %v", obj.GetName(), err)
ctx.infof("error restoring %s: %v", name, err)
addToResult(&errs, namespace, fmt.Errorf("error restoring %s: %v", fullPath, restoreErr))
continue
}
@@ -858,7 +900,19 @@ func waitForReady(
}
}
- func (ctx *context) executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+ type PVRestorer interface {
+ executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
+ }
+ type pvRestorer struct {
+ logger logrus.FieldLogger
+ snapshotVolumes *bool
+ restorePVs *bool
+ volumeBackups map[string]*api.VolumeBackupInfo
+ snapshotService cloudprovider.SnapshotService
+ }
+ func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
pvName := obj.GetName()
if pvName == "" {
return nil, errors.New("PersistentVolume is missing its name")
@@ -872,37 +926,44 @@ func (ctx *context) executePVAction(obj *unstructured.Unstructured) (*unstructur
delete(spec, "claimRef")
delete(spec, "storageClassName")
- if boolptr.IsSetToFalse(ctx.backup.Spec.SnapshotVolumes) {
+ if boolptr.IsSetToFalse(r.snapshotVolumes) {
// The backup had snapshots disabled, so we can return early
return obj, nil
}
- if boolptr.IsSetToFalse(ctx.restore.Spec.RestorePVs) {
+ if boolptr.IsSetToFalse(r.restorePVs) {
// The restore has pv restores disabled, so we can return early
return obj, nil
}
// If we can't find a snapshot record for this particular PV, it most likely wasn't a PV that Ark
// could snapshot, so return early instead of trying to restore from a snapshot.
- backupInfo, found := ctx.backup.Status.VolumeBackups[pvName]
+ backupInfo, found := r.volumeBackups[pvName]
if !found {
return obj, nil
}
// Past this point, we expect to be doing a restore
- if ctx.snapshotService == nil {
+ if r.snapshotService == nil {
return nil, errors.New("you must configure a persistentVolumeProvider to restore PersistentVolumes from snapshots")
}
ctx.infof("restoring PersistentVolume %s from SnapshotID %s", pvName, backupInfo.SnapshotID)
volumeID, err := ctx.snapshotService.CreateVolumeFromSnapshot(backupInfo.SnapshotID, backupInfo.Type, backupInfo.AvailabilityZone, backupInfo.Iops)
log := r.logger.WithFields(
logrus.Fields{
"persistentVolume": pvName,
"snapshot": backupInfo.SnapshotID,
},
)
log.Info("restoring persistent volume from snapshot")
volumeID, err := r.snapshotService.CreateVolumeFromSnapshot(backupInfo.SnapshotID, backupInfo.Type, backupInfo.AvailabilityZone, backupInfo.Iops)
if err != nil {
return nil, err
}
ctx.infof("successfully restored PersistentVolume %s from snapshot", pvName)
log.Info("successfully restored persistent volume from snapshot")
updated1, err := ctx.snapshotService.SetVolumeID(obj, volumeID)
updated1, err := r.snapshotService.SetVolumeID(obj, volumeID)
if err != nil {
return nil, err
}

pkg/restore/restore_test.go

@@ -21,8 +21,11 @@ import (
"testing"
"time"
"k8s.io/client-go/kubernetes/scheme"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
@@ -32,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -578,6 +582,7 @@ func TestRestoreResourceForNamespace(t *testing.T) {
},
backup: &api.Backup{},
logger: arktest.NewLogger(),
pvRestorer: &pvRestorer{},
}
warnings, errors := ctx.restoreResource(test.resourcePath, test.namespace, test.resourcePath)
@@ -674,6 +679,287 @@ func TestRestoringExistingServiceAccount(t *testing.T) {
}
}
func TestRestoringPVsWithoutSnapshots(t *testing.T) {
pv := `apiVersion: v1
kind: PersistentVolume
metadata:
  annotations:
    EXPORT_block: "\nEXPORT\n{\n\tExport_Id = 1;\n\tPath = /export/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce;\n\tPseudo
      = /export/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce;\n\tAccess_Type = RW;\n\tSquash
      = no_root_squash;\n\tSecType = sys;\n\tFilesystem_id = 1.1;\n\tFSAL {\n\t\tName
      = VFS;\n\t}\n}\n"
    Export_Id: "1"
    Project_Id: "0"
    Project_block: ""
    Provisioner_Id: 5fdf4025-78a5-11e8-9ece-0242ac110004
    kubernetes.io/createdby: nfs-dynamic-provisioner
    pv.kubernetes.io/provisioned-by: example.com/nfs
    volume.beta.kubernetes.io/mount-options: vers=4.1
  creationTimestamp: 2018-06-25T18:27:35Z
  finalizers:
  - kubernetes.io/pv-protection
  name: pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce
  resourceVersion: "2576"
  selfLink: /api/v1/persistentvolumes/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce
  uid: 6ecd24e4-78a5-11e8-a0d8-e2ad1e9734ce
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 1Mi
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    name: nfs
    namespace: default
    resourceVersion: "2565"
    uid: 6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce
  nfs:
    path: /export/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce
    server: 10.103.235.254
  storageClassName: example-nfs
status:
  phase: Bound`
pvc := `apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"5fdf5572-78a5-11e8-9ece-0242ac110004","leaseDurationSeconds":15,"acquireTime":"2018-06-25T18:27:35Z","renewTime":"2018-06-25T18:27:37Z","leaderTransitions":0}'
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"PersistentVolumeClaim","metadata":{"annotations":{},"name":"nfs","namespace":"default"},"spec":{"accessModes":["ReadWriteMany"],"resources":{"requests":{"storage":"1Mi"}},"storageClassName":"example-nfs"}}
    pv.kubernetes.io/bind-completed: "yes"
    pv.kubernetes.io/bound-by-controller: "yes"
    volume.beta.kubernetes.io/storage-provisioner: example.com/nfs
  creationTimestamp: 2018-06-25T18:27:28Z
  finalizers:
  - kubernetes.io/pvc-protection
  name: nfs
  namespace: default
  resourceVersion: "2578"
  selfLink: /api/v1/namespaces/default/persistentvolumeclaims/nfs
  uid: 6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 1Mi
  storageClassName: example-nfs
  volumeName: pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce
status:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 1Mi
  phase: Bound`
tests := []struct {
name string
haveSnapshot bool
reclaimPolicy string
expectPVCVolumeName bool
expectedPVCAnnotationsMissing sets.String
expectPVCreation bool
}{
{
name: "have snapshot, reclaim policy delete",
haveSnapshot: true,
reclaimPolicy: "Delete",
expectPVCVolumeName: true,
expectPVCreation: true,
},
{
name: "have snapshot, reclaim policy retain",
haveSnapshot: true,
reclaimPolicy: "Retain",
expectPVCVolumeName: true,
expectPVCreation: true,
},
{
name: "no snapshot, reclaim policy delete",
haveSnapshot: false,
reclaimPolicy: "Delete",
expectPVCVolumeName: false,
expectedPVCAnnotationsMissing: sets.NewString("pv.kubernetes.io/bind-completed", "pv.kubernetes.io/bound-by-controller"),
},
{
name: "no snapshot, reclaim policy retain",
haveSnapshot: false,
reclaimPolicy: "Retain",
expectPVCVolumeName: true,
expectPVCreation: true,
},
}
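// Only the no-snapshot/Delete case skips PV creation and resets the PVC;
// the other three combinations restore the PV and keep the PVC binding.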
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dynamicFactory := &arktest.FakeDynamicFactory{}
gv := schema.GroupVersion{Group: "", Version: "v1"}
pvClient := &arktest.FakeDynamicClient{}
defer pvClient.AssertExpectations(t)
pvResource := metav1.APIResource{Name: "persistentvolumes", Namespaced: false}
dynamicFactory.On("ClientForGroupVersionResource", gv, pvResource, "").Return(pvClient, nil)
pvcClient := &arktest.FakeDynamicClient{}
defer pvcClient.AssertExpectations(t)
pvcResource := metav1.APIResource{Name: "persistentvolumeclaims", Namespaced: true}
dynamicFactory.On("ClientForGroupVersionResource", gv, pvcResource, "default").Return(pvcClient, nil)
obj, _, err := scheme.Codecs.UniversalDecoder(v1.SchemeGroupVersion).Decode([]byte(pv), nil, nil)
require.NoError(t, err)
pvObj, ok := obj.(*v1.PersistentVolume)
require.True(t, ok)
pvObj.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimPolicy(test.reclaimPolicy)
pvBytes, err := json.Marshal(pvObj)
require.NoError(t, err)
obj, _, err = scheme.Codecs.UniversalDecoder(v1.SchemeGroupVersion).Decode([]byte(pvc), nil, nil)
require.NoError(t, err)
pvcObj, ok := obj.(*v1.PersistentVolumeClaim)
require.True(t, ok)
pvcBytes, err := json.Marshal(pvcObj)
require.NoError(t, err)
backup := &api.Backup{}
if test.haveSnapshot {
backup.Status.VolumeBackups = map[string]*api.VolumeBackupInfo{
"pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce": {
SnapshotID: "snap",
},
}
}
pvRestorer := new(mockPVRestorer)
defer pvRestorer.AssertExpectations(t)
ctx := &context{
dynamicFactory: dynamicFactory,
actions: []resolvedAction{},
fileSystem: arktest.NewFakeFileSystem().
WithFile("foo/resources/persistentvolumes/cluster/pv.json", pvBytes).
WithFile("foo/resources/persistentvolumeclaims/default/pvc.json", pvcBytes),
selector: labels.NewSelector(),
prioritizedResources: []schema.GroupResource{
kuberesource.PersistentVolumes,
kuberesource.PersistentVolumeClaims,
},
restore: &api.Restore{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "my-restore",
},
},
backup: backup,
logger: arktest.NewLogger(),
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
}
pvWatch := new(mockWatch)
defer pvWatch.AssertExpectations(t)
unstructuredPVMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvObj)
require.NoError(t, err)
unstructuredPV := &unstructured.Unstructured{Object: unstructuredPVMap}
pvToRestore := unstructuredPV.DeepCopy()
restoredPV := unstructuredPV.DeepCopy()
if test.expectPVCreation {
// just to ensure we have the data flowing correctly
restoredPV.Object["foo"] = "bar"
pvRestorer.On("executePVAction", pvToRestore).Return(restoredPV, nil)
}
resetMetadataAndStatus(unstructuredPV)
addLabel(unstructuredPV, api.RestoreLabelKey, ctx.restore.Name)
unstructuredPV.Object["foo"] = "bar"
if test.expectPVCreation {
createdPV := unstructuredPV.DeepCopy()
pvClient.On("Create", unstructuredPV).Return(createdPV, nil)
pvClient.On("Watch", metav1.ListOptions{}).Return(pvWatch, nil)
pvWatchChan := make(chan watch.Event, 1)
readyPV := restoredPV.DeepCopy()
readyStatus, err := collections.GetMap(readyPV.Object, "status")
require.NoError(t, err)
readyStatus["phase"] = string(v1.VolumeAvailable)
pvWatchChan <- watch.Event{
Type: watch.Modified,
Object: readyPV,
}
pvWatch.On("ResultChan").Return(pvWatchChan)
}
// Restore PV
warnings, errors := ctx.restoreResource("persistentvolumes", "", "foo/resources/persistentvolumes/cluster/")
assert.Empty(t, warnings.Ark)
assert.Empty(t, warnings.Cluster)
assert.Empty(t, warnings.Namespaces)
assert.Equal(t, api.RestoreResult{}, errors)
// Prep PVC restore
// Handle expectations
if !test.expectPVCVolumeName {
pvcObj.Spec.VolumeName = ""
}
for _, key := range test.expectedPVCAnnotationsMissing.List() {
delete(pvcObj.Annotations, key)
}
unstructuredPVCMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvcObj)
require.NoError(t, err)
unstructuredPVC := &unstructured.Unstructured{Object: unstructuredPVCMap}
resetMetadataAndStatus(unstructuredPVC)
addLabel(unstructuredPVC, api.RestoreLabelKey, ctx.restore.Name)
createdPVC := unstructuredPVC.DeepCopy()
// just to ensure we have the data flowing correctly
createdPVC.Object["foo"] = "bar"
pvcClient.On("Create", unstructuredPVC).Return(createdPVC, nil)
// Restore PVC
warnings, errors = ctx.restoreResource("persistentvolumeclaims", "default", "foo/resources/persistentvolumeclaims/default/")
assert.Empty(t, warnings.Ark)
assert.Empty(t, warnings.Cluster)
assert.Empty(t, warnings.Namespaces)
assert.Equal(t, api.RestoreResult{}, errors)
ctx.resourceWaitGroup.Wait()
})
}
}
type mockPVRestorer struct {
mock.Mock
}
func (r *mockPVRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
args := r.Called(obj)
return args.Get(0).(*unstructured.Unstructured), args.Error(1)
}
type mockWatch struct {
mock.Mock
}
func (w *mockWatch) Stop() {
w.Called()
}
func (w *mockWatch) ResultChan() <-chan watch.Event {
args := w.Called()
return args.Get(0).(chan watch.Event)
}
type fakeWatch struct{}
func (w *fakeWatch) Stop() {}
@@ -954,14 +1240,17 @@ func TestExecutePVAction(t *testing.T) {
snapshotService = fakeSnapshotService
}
- ctx := &context{
- restore: test.restore,
- backup: test.backup,
- snapshotService: snapshotService,
+ r := &pvRestorer{
+ logger: arktest.NewLogger(),
+ restorePVs: test.restore.Spec.RestorePVs,
+ snapshotService: snapshotService,
}
+ if test.backup != nil {
+ r.snapshotVolumes = test.backup.Spec.SnapshotVolumes
+ r.volumeBackups = test.backup.Status.VolumeBackups
+ }
- res, err := ctx.executePVAction(test.obj)
+ res, err := r.executePVAction(test.obj)
if test.expectedErr {
require.Error(t, err)