372 lines
12 KiB
Go
372 lines
12 KiB
Go
/*
|
|
Copyright 2018 the Heptio Ark contributors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package controller
|
|
|
|
import (
|
|
"encoding/json"
|
|
"time"
|
|
|
|
jsonpatch "github.com/evanphx/json-patch"
|
|
"github.com/heptio/ark/pkg/apis/ark/v1"
|
|
pkgbackup "github.com/heptio/ark/pkg/backup"
|
|
"github.com/heptio/ark/pkg/cloudprovider"
|
|
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
|
|
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
|
|
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
|
"github.com/heptio/ark/pkg/util/kube"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
type backupDeletionController struct {
|
|
*genericController
|
|
|
|
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter
|
|
deleteBackupRequestLister listers.DeleteBackupRequestLister
|
|
backupClient arkv1client.BackupsGetter
|
|
snapshotService cloudprovider.SnapshotService
|
|
backupService cloudprovider.BackupService
|
|
bucket string
|
|
restoreLister listers.RestoreLister
|
|
restoreClient arkv1client.RestoresGetter
|
|
backupTracker BackupTracker
|
|
|
|
processRequestFunc func(*v1.DeleteBackupRequest) error
|
|
clock clock.Clock
|
|
}
|
|
|
|
// NewBackupDeletionController creates a new backup deletion controller.
|
|
func NewBackupDeletionController(
|
|
logger logrus.FieldLogger,
|
|
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
|
|
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter,
|
|
backupClient arkv1client.BackupsGetter,
|
|
snapshotService cloudprovider.SnapshotService,
|
|
backupService cloudprovider.BackupService,
|
|
bucket string,
|
|
restoreInformer informers.RestoreInformer,
|
|
restoreClient arkv1client.RestoresGetter,
|
|
backupTracker BackupTracker,
|
|
) Interface {
|
|
c := &backupDeletionController{
|
|
genericController: newGenericController("backup-deletion", logger),
|
|
deleteBackupRequestClient: deleteBackupRequestClient,
|
|
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
|
|
backupClient: backupClient,
|
|
snapshotService: snapshotService,
|
|
backupService: backupService,
|
|
bucket: bucket,
|
|
restoreLister: restoreInformer.Lister(),
|
|
restoreClient: restoreClient,
|
|
backupTracker: backupTracker,
|
|
clock: &clock.RealClock{},
|
|
}
|
|
|
|
c.syncHandler = c.processQueueItem
|
|
c.cacheSyncWaiters = append(c.cacheSyncWaiters, deleteBackupRequestInformer.Informer().HasSynced, restoreInformer.Informer().HasSynced)
|
|
c.processRequestFunc = c.processRequest
|
|
|
|
deleteBackupRequestInformer.Informer().AddEventHandler(
|
|
cache.ResourceEventHandlerFuncs{
|
|
AddFunc: c.enqueue,
|
|
UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
|
|
},
|
|
)
|
|
|
|
c.resyncPeriod = time.Hour
|
|
c.resyncFunc = c.deleteExpiredRequests
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *backupDeletionController) processQueueItem(key string) error {
|
|
log := c.logger.WithField("key", key)
|
|
log.Debug("Running processItem")
|
|
|
|
ns, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error splitting queue key")
|
|
}
|
|
|
|
req, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).Get(name)
|
|
if apierrors.IsNotFound(err) {
|
|
log.Debug("Unable to find DeleteBackupRequest")
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return errors.Wrap(err, "error getting DeleteBackupRequest")
|
|
}
|
|
|
|
switch req.Status.Phase {
|
|
case v1.DeleteBackupRequestPhaseProcessed:
|
|
// Don't do anything because it's already been processed
|
|
default:
|
|
// Don't mutate the shared cache
|
|
reqCopy := req.DeepCopy()
|
|
return c.processRequestFunc(reqCopy)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) error {
|
|
log := c.logger.WithFields(logrus.Fields{
|
|
"namespace": req.Namespace,
|
|
"name": req.Name,
|
|
"backup": req.Spec.BackupName,
|
|
})
|
|
|
|
var err error
|
|
|
|
// Make sure we have the backup name
|
|
if req.Spec.BackupName == "" {
|
|
_, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
|
|
r.Status.Errors = []string{"spec.backupName is required"}
|
|
})
|
|
return err
|
|
}
|
|
|
|
// Don't allow deleting an in-progress backup
|
|
if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) {
|
|
_, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
|
|
r.Status.Errors = []string{"backup is still in progress"}
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// Update status to InProgress and set backup-name label if needed
|
|
req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
r.Status.Phase = v1.DeleteBackupRequestPhaseInProgress
|
|
|
|
if req.Labels[v1.BackupNameLabel] == "" {
|
|
req.Labels[v1.BackupNameLabel] = req.Spec.BackupName
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get the backup we're trying to delete
|
|
backup, err := c.backupClient.Backups(req.Namespace).Get(req.Spec.BackupName, metav1.GetOptions{})
|
|
if apierrors.IsNotFound(err) {
|
|
// Couldn't find backup - update status to Processed and record the not-found error
|
|
req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
|
|
r.Status.Errors = []string{"backup not found"}
|
|
})
|
|
|
|
return err
|
|
}
|
|
if err != nil {
|
|
return errors.Wrap(err, "error getting Backup")
|
|
}
|
|
|
|
// Set backup-uid label if needed
|
|
if req.Labels[v1.BackupUIDLabel] == "" {
|
|
req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
req.Labels[v1.BackupUIDLabel] = string(backup.UID)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// If the backup includes snapshots but we don't currently have a PVProvider, we don't
|
|
// want to orphan the snapshots so skip deletion.
|
|
if c.snapshotService == nil && len(backup.Status.VolumeBackups) > 0 {
|
|
req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
|
|
r.Status.Errors = []string{"unable to delete backup because it includes PV snapshots and Ark is not configured with a PersistentVolumeProvider"}
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// Set backup status to Deleting
|
|
backup, err = c.patchBackup(backup, func(b *v1.Backup) {
|
|
b.Status.Phase = v1.BackupPhaseDeleting
|
|
})
|
|
if err != nil {
|
|
log.WithError(errors.WithStack(err)).Error("Error setting backup phase to deleting")
|
|
}
|
|
|
|
var errs []string
|
|
|
|
// Try to delete snapshots
|
|
log.Info("Removing PV snapshots")
|
|
for _, volumeBackup := range backup.Status.VolumeBackups {
|
|
log.WithField("snapshotID", volumeBackup.SnapshotID).Info("Removing snapshot associated with backup")
|
|
if err := c.snapshotService.DeleteSnapshot(volumeBackup.SnapshotID); err != nil {
|
|
errs = append(errs, errors.Wrapf(err, "error deleting snapshot %s", volumeBackup.SnapshotID).Error())
|
|
}
|
|
}
|
|
|
|
// Try to delete backup from object storage
|
|
log.Info("Removing backup from object storage")
|
|
if err := c.backupService.DeleteBackupDir(c.bucket, backup.Name); err != nil {
|
|
errs = append(errs, errors.Wrap(err, "error deleting backup from object storage").Error())
|
|
}
|
|
|
|
// Try to delete restores
|
|
log.Info("Removing restores")
|
|
if restores, err := c.restoreLister.Restores(backup.Namespace).List(labels.Everything()); err != nil {
|
|
log.WithError(errors.WithStack(err)).Error("Error listing restore API objects")
|
|
} else {
|
|
for _, restore := range restores {
|
|
if restore.Spec.BackupName != backup.Name {
|
|
continue
|
|
}
|
|
|
|
restoreLog := log.WithField("restore", kube.NamespaceAndName(restore))
|
|
|
|
restoreLog.Info("Deleting restore referencing backup")
|
|
if err := c.restoreClient.Restores(restore.Namespace).Delete(restore.Name, &metav1.DeleteOptions{}); err != nil {
|
|
errs = append(errs, errors.Wrapf(err, "error deleting restore %s", kube.NamespaceAndName(restore)).Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(errs) == 0 {
|
|
// Only try to delete the backup object from kube if everything preceding went smoothly
|
|
err = c.backupClient.Backups(backup.Namespace).Delete(backup.Name, nil)
|
|
if err != nil {
|
|
errs = append(errs, errors.Wrapf(err, "error deleting backup %s", kube.NamespaceAndName(backup)).Error())
|
|
}
|
|
}
|
|
|
|
// Update status to processed and record errors
|
|
req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
|
r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
|
|
r.Status.Errors = errs
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Everything deleted correctly, so we can delete all DeleteBackupRequests for this backup
|
|
if len(errs) == 0 {
|
|
listOptions := pkgbackup.NewDeleteBackupRequestListOptions(backup.Name, string(backup.UID))
|
|
err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).DeleteCollection(nil, listOptions)
|
|
if err != nil {
|
|
// If this errors, all we can do is log it.
|
|
c.logger.WithField("backup", kube.NamespaceAndName(backup)).Error("error deleting all associated DeleteBackupRequests after successfully deleting the backup")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
const deleteBackupRequestMaxAge = 24 * time.Hour
|
|
|
|
func (c *backupDeletionController) deleteExpiredRequests() {
|
|
c.logger.Info("Checking for expired DeleteBackupRequests")
|
|
defer c.logger.Info("Done checking for expired DeleteBackupRequests")
|
|
|
|
// Our shared informer factory filters on a single namespace, so asking for all is ok here.
|
|
requests, err := c.deleteBackupRequestLister.List(labels.Everything())
|
|
if err != nil {
|
|
c.logger.WithError(err).Error("unable to check for expired DeleteBackupRequests")
|
|
return
|
|
}
|
|
|
|
now := c.clock.Now()
|
|
|
|
for _, req := range requests {
|
|
if req.Status.Phase != v1.DeleteBackupRequestPhaseProcessed {
|
|
continue
|
|
}
|
|
|
|
age := now.Sub(req.CreationTimestamp.Time)
|
|
if age >= deleteBackupRequestMaxAge {
|
|
reqLog := c.logger.WithFields(logrus.Fields{"namespace": req.Namespace, "name": req.Name})
|
|
reqLog.Info("Deleting expired DeleteBackupRequest")
|
|
|
|
err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(req.Name, nil)
|
|
if err != nil {
|
|
reqLog.WithError(err).Error("Error deleting DeleteBackupRequest")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *backupDeletionController) patchDeleteBackupRequest(req *v1.DeleteBackupRequest, mutate func(*v1.DeleteBackupRequest)) (*v1.DeleteBackupRequest, error) {
|
|
// Record original json
|
|
oldData, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error marshalling original DeleteBackupRequest")
|
|
}
|
|
|
|
// Mutate
|
|
mutate(req)
|
|
|
|
// Record new json
|
|
newData, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error marshalling updated DeleteBackupRequest")
|
|
}
|
|
|
|
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error creating json merge patch for DeleteBackupRequest")
|
|
}
|
|
|
|
req, err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Patch(req.Name, types.MergePatchType, patchBytes)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error patching DeleteBackupRequest")
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func (c *backupDeletionController) patchBackup(backup *v1.Backup, mutate func(*v1.Backup)) (*v1.Backup, error) {
|
|
// Record original json
|
|
oldData, err := json.Marshal(backup)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error marshalling original Backup")
|
|
}
|
|
|
|
// Mutate
|
|
mutate(backup)
|
|
|
|
// Record new json
|
|
newData, err := json.Marshal(backup)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error marshalling updated Backup")
|
|
}
|
|
|
|
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error creating json merge patch for Backup")
|
|
}
|
|
|
|
backup, err = c.backupClient.Backups(backup.Namespace).Patch(backup.Name, types.MergePatchType, patchBytes)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error patching Backup")
|
|
}
|
|
|
|
return backup, nil
|
|
}
|