2018-03-14 18:17:27 +00:00
/*
Copyright 2018 the Heptio Ark contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"encoding/json"
2018-04-04 15:27:33 +00:00
"time"
2018-03-14 18:17:27 +00:00
jsonpatch "github.com/evanphx/json-patch"
"github.com/heptio/ark/pkg/apis/ark/v1"
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/util/kube"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
2018-04-04 15:27:33 +00:00
"k8s.io/apimachinery/pkg/util/clock"
2018-03-14 18:17:27 +00:00
"k8s.io/client-go/tools/cache"
)
type backupDeletionController struct {
* genericController
deleteBackupRequestClient arkv1client . DeleteBackupRequestsGetter
deleteBackupRequestLister listers . DeleteBackupRequestLister
backupClient arkv1client . BackupsGetter
snapshotService cloudprovider . SnapshotService
backupService cloudprovider . BackupService
bucket string
restoreLister listers . RestoreLister
restoreClient arkv1client . RestoresGetter
processRequestFunc func ( * v1 . DeleteBackupRequest ) error
2018-04-04 15:27:33 +00:00
clock clock . Clock
2018-03-14 18:17:27 +00:00
}
// NewBackupDeletionController creates a new backup deletion controller.
func NewBackupDeletionController (
logger logrus . FieldLogger ,
deleteBackupRequestInformer informers . DeleteBackupRequestInformer ,
deleteBackupRequestClient arkv1client . DeleteBackupRequestsGetter ,
backupClient arkv1client . BackupsGetter ,
snapshotService cloudprovider . SnapshotService ,
backupService cloudprovider . BackupService ,
bucket string ,
restoreInformer informers . RestoreInformer ,
restoreClient arkv1client . RestoresGetter ,
) Interface {
c := & backupDeletionController {
genericController : newGenericController ( "backup-deletion" , logger ) ,
deleteBackupRequestClient : deleteBackupRequestClient ,
deleteBackupRequestLister : deleteBackupRequestInformer . Lister ( ) ,
backupClient : backupClient ,
snapshotService : snapshotService ,
backupService : backupService ,
bucket : bucket ,
restoreLister : restoreInformer . Lister ( ) ,
restoreClient : restoreClient ,
2018-04-04 15:27:33 +00:00
clock : & clock . RealClock { } ,
2018-03-14 18:17:27 +00:00
}
c . syncHandler = c . processQueueItem
c . cacheSyncWaiters = append ( c . cacheSyncWaiters , deleteBackupRequestInformer . Informer ( ) . HasSynced , restoreInformer . Informer ( ) . HasSynced )
c . processRequestFunc = c . processRequest
deleteBackupRequestInformer . Informer ( ) . AddEventHandler (
cache . ResourceEventHandlerFuncs {
AddFunc : c . enqueue ,
UpdateFunc : func ( _ , obj interface { } ) { c . enqueue ( obj ) } ,
} ,
)
2018-04-04 15:27:33 +00:00
c . resyncPeriod = time . Hour
c . resyncFunc = c . deleteExpiredRequests
2018-03-14 18:17:27 +00:00
return c
}
// processQueueItem resolves a queue key to a DeleteBackupRequest from the
// lister and hands a deep copy of it to processRequestFunc. A request that
// can no longer be found, or that is already in the Processed phase, is
// ignored and nil is returned.
func (c *backupDeletionController) processQueueItem(key string) error {
	log := c.logger.WithField("key", key)
	log.Debug("Running processItem")

	namespace, requestName, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return errors.Wrap(err, "error splitting queue key")
	}

	cached, err := c.deleteBackupRequestLister.DeleteBackupRequests(namespace).Get(requestName)
	switch {
	case apierrors.IsNotFound(err):
		log.Debug("Unable to find DeleteBackupRequest")
		return nil
	case err != nil:
		return errors.Wrap(err, "error getting DeleteBackupRequest")
	}

	// Nothing left to do for a request that has already been processed.
	if cached.Status.Phase == v1.DeleteBackupRequestPhaseProcessed {
		return nil
	}

	// Work on a copy so the object held by the shared cache is never mutated.
	return c.processRequestFunc(cached.DeepCopy())
}
func ( c * backupDeletionController ) processRequest ( req * v1 . DeleteBackupRequest ) error {
log := c . logger . WithFields ( logrus . Fields {
"namespace" : req . Namespace ,
"name" : req . Name ,
"backup" : req . Spec . BackupName ,
} )
2018-04-05 18:53:10 +00:00
if req . Spec . BackupName == "" {
_ , err := c . patchDeleteBackupRequest ( req , func ( r * v1 . DeleteBackupRequest ) {
r . Status . Phase = v1 . DeleteBackupRequestPhaseProcessed
r . Status . Errors = [ ] string { "spec.backupName is required" }
} )
return err
}
2018-03-14 18:17:27 +00:00
var err error
// Update status to InProgress
req , err = c . patchDeleteBackupRequest ( req , func ( r * v1 . DeleteBackupRequest ) {
r . Status . Phase = v1 . DeleteBackupRequestPhaseInProgress
} )
if err != nil {
return err
}
// Get the backup we're trying to delete
backup , err := c . backupClient . Backups ( req . Namespace ) . Get ( req . Spec . BackupName , metav1 . GetOptions { } )
if apierrors . IsNotFound ( err ) {
// Couldn't find backup - update status to Processed and record the not-found error
req , err = c . patchDeleteBackupRequest ( req , func ( r * v1 . DeleteBackupRequest ) {
r . Status . Phase = v1 . DeleteBackupRequestPhaseProcessed
r . Status . Errors = [ ] string { "backup not found" }
} )
return err
}
if err != nil {
return errors . Wrap ( err , "error getting Backup" )
}
// If the backup includes snapshots but we don't currently have a PVProvider, we don't
// want to orphan the snapshots so skip deletion.
if c . snapshotService == nil && len ( backup . Status . VolumeBackups ) > 0 {
req , err = c . patchDeleteBackupRequest ( req , func ( r * v1 . DeleteBackupRequest ) {
r . Status . Phase = v1 . DeleteBackupRequestPhaseProcessed
r . Status . Errors = [ ] string { "unable to delete backup because it includes PV snapshots and Ark is not configured with a PersistentVolumeProvider" }
} )
return err
}
// Set backup status to Deleting
backup , err = c . patchBackup ( backup , func ( b * v1 . Backup ) {
b . Status . Phase = v1 . BackupPhaseDeleting
} )
if err != nil {
log . WithError ( errors . WithStack ( err ) ) . Error ( "Error setting backup phase to deleting" )
}
var errs [ ] string
// Try to delete snapshots
log . Info ( "Removing PV snapshots" )
for _ , volumeBackup := range backup . Status . VolumeBackups {
log . WithField ( "snapshotID" , volumeBackup . SnapshotID ) . Info ( "Removing snapshot associated with backup" )
if err := c . snapshotService . DeleteSnapshot ( volumeBackup . SnapshotID ) ; err != nil {
errs = append ( errs , errors . Wrapf ( err , "error deleting snapshot %s" , volumeBackup . SnapshotID ) . Error ( ) )
}
}
// Try to delete backup from object storage
log . Info ( "Removing backup from object storage" )
if err := c . backupService . DeleteBackupDir ( c . bucket , backup . Name ) ; err != nil {
errs = append ( errs , errors . Wrap ( err , "error deleting backup from object storage" ) . Error ( ) )
}
// Try to delete restores
log . Info ( "Removing restores" )
if restores , err := c . restoreLister . Restores ( backup . Namespace ) . List ( labels . Everything ( ) ) ; err != nil {
log . WithError ( errors . WithStack ( err ) ) . Error ( "Error listing restore API objects" )
} else {
for _ , restore := range restores {
if restore . Spec . BackupName != backup . Name {
continue
}
restoreLog := log . WithField ( "restore" , kube . NamespaceAndName ( restore ) )
restoreLog . Info ( "Deleting restore referencing backup" )
if err := c . restoreClient . Restores ( restore . Namespace ) . Delete ( restore . Name , & metav1 . DeleteOptions { } ) ; err != nil {
errs = append ( errs , errors . Wrapf ( err , "error deleting restore %s" , kube . NamespaceAndName ( restore ) ) . Error ( ) )
}
}
}
if len ( errs ) == 0 {
// Only try to delete the backup object from kube if everything preceding went smoothly
err = c . backupClient . Backups ( backup . Namespace ) . Delete ( backup . Name , nil )
if err != nil {
errs = append ( errs , errors . Wrapf ( err , "error deleting backup %s" , kube . NamespaceAndName ( backup ) ) . Error ( ) )
}
}
// Update status to processed and record errors
req , err = c . patchDeleteBackupRequest ( req , func ( r * v1 . DeleteBackupRequest ) {
r . Status . Phase = v1 . DeleteBackupRequestPhaseProcessed
r . Status . Errors = errs
} )
if err != nil {
return err
}
// Everything deleted correctly, so we can delete all DeleteBackupRequests for this backup
if len ( errs ) == 0 {
2018-04-05 18:21:45 +00:00
listOptions := pkgbackup . NewDeleteBackupRequestListOptions ( backup . Name , string ( backup . UID ) )
2018-03-14 18:17:27 +00:00
err = c . deleteBackupRequestClient . DeleteBackupRequests ( req . Namespace ) . DeleteCollection ( nil , listOptions )
if err != nil {
// If this errors, all we can do is log it.
c . logger . WithField ( "backup" , kube . NamespaceAndName ( backup ) ) . Error ( "error deleting all associated DeleteBackupRequests after successfully deleting the backup" )
}
}
return nil
}
2018-04-04 15:27:33 +00:00
// deleteBackupRequestMaxAge is the age, measured from creation, at which a
// processed DeleteBackupRequest is considered expired.
const deleteBackupRequestMaxAge = 24 * time.Hour

// deleteExpiredRequests deletes every DeleteBackupRequest that is in the
// Processed phase and was created at least deleteBackupRequestMaxAge ago.
// Failures are logged and do not stop the remaining requests from being
// examined.
func (c *backupDeletionController) deleteExpiredRequests() {
	c.logger.Info("Checking for expired DeleteBackupRequests")
	defer c.logger.Info("Done checking for expired DeleteBackupRequests")

	// Our shared informer factory filters on a single namespace, so asking for all is ok here.
	allRequests, listErr := c.deleteBackupRequestLister.List(labels.Everything())
	if listErr != nil {
		c.logger.WithError(listErr).Error("unable to check for expired DeleteBackupRequests")
		return
	}

	currentTime := c.clock.Now()

	for _, candidate := range allRequests {
		// Requests that are not yet processed are never expired.
		if candidate.Status.Phase != v1.DeleteBackupRequestPhaseProcessed {
			continue
		}

		// Too young to expire.
		if currentTime.Sub(candidate.CreationTimestamp.Time) < deleteBackupRequestMaxAge {
			continue
		}

		candidateLog := c.logger.WithFields(logrus.Fields{
			"namespace": candidate.Namespace,
			"name":      candidate.Name,
		})
		candidateLog.Info("Deleting expired DeleteBackupRequest")

		deleteErr := c.deleteBackupRequestClient.DeleteBackupRequests(candidate.Namespace).Delete(candidate.Name, nil)
		if deleteErr != nil {
			candidateLog.WithError(deleteErr).Error("Error deleting DeleteBackupRequest")
		}
	}
}
2018-03-14 18:17:27 +00:00
// patchDeleteBackupRequest applies mutate to req in place, computes a JSON
// merge patch between the request's serialized form before and after the
// mutation, and sends that patch to the API server. It returns the object
// returned by the Patch call, or nil and a wrapped error if any step fails.
// Note that req has already been mutated by the time an error from a later
// step is returned.
func (c *backupDeletionController) patchDeleteBackupRequest(req *v1.DeleteBackupRequest, mutate func(*v1.DeleteBackupRequest)) (*v1.DeleteBackupRequest, error) {
	// Snapshot the serialized form before applying any change.
	before, err := json.Marshal(req)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling original DeleteBackupRequest")
	}

	mutate(req)

	// Snapshot the serialized form again now that the change is applied.
	after, err := json.Marshal(req)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling updated DeleteBackupRequest")
	}

	mergePatch, err := jsonpatch.CreateMergePatch(before, after)
	if err != nil {
		return nil, errors.Wrap(err, "error creating json merge patch for DeleteBackupRequest")
	}

	patched, err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Patch(req.Name, types.MergePatchType, mergePatch)
	if err != nil {
		return nil, errors.Wrap(err, "error patching DeleteBackupRequest")
	}

	return patched, nil
}
// patchBackup applies mutate to backup in place, computes a JSON merge patch
// between the backup's serialized form before and after the mutation, and
// sends that patch to the API server. It returns the object returned by the
// Patch call, or nil and a wrapped error if any step fails. Note that backup
// has already been mutated by the time an error from a later step is returned.
func (c *backupDeletionController) patchBackup(backup *v1.Backup, mutate func(*v1.Backup)) (*v1.Backup, error) {
	// Snapshot the serialized form before applying any change.
	before, err := json.Marshal(backup)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling original Backup")
	}

	mutate(backup)

	// Snapshot the serialized form again now that the change is applied.
	after, err := json.Marshal(backup)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling updated Backup")
	}

	mergePatch, err := jsonpatch.CreateMergePatch(before, after)
	if err != nil {
		return nil, errors.Wrap(err, "error creating json merge patch for Backup")
	}

	patched, err := c.backupClient.Backups(backup.Namespace).Patch(backup.Name, types.MergePatchType, mergePatch)
	if err != nil {
		return nil, errors.Wrap(err, "error patching Backup")
	}

	return patched, nil
}