/*
Copyright 2017 the Heptio Ark contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package restore

import (
	"archive/tar"
	"compress/gzip"
	go_context "context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	kubeerrs "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

	api "github.com/heptio/velero/pkg/apis/velero/v1"
	"github.com/heptio/velero/pkg/client"
	"github.com/heptio/velero/pkg/cloudprovider"
	"github.com/heptio/velero/pkg/discovery"
	listers "github.com/heptio/velero/pkg/generated/listers/velero/v1"
	"github.com/heptio/velero/pkg/kuberesource"
	"github.com/heptio/velero/pkg/restic"
	"github.com/heptio/velero/pkg/util/boolptr"
	"github.com/heptio/velero/pkg/util/collections"
	"github.com/heptio/velero/pkg/util/filesystem"
	"github.com/heptio/velero/pkg/util/kube"
	velerosync "github.com/heptio/velero/pkg/util/sync"
	"github.com/heptio/velero/pkg/volume"
)

type BlockStoreGetter interface {
	GetBlockStore(name string) (cloudprovider.BlockStore, error)
}

// Restorer knows how to restore a backup.
type Restorer interface {
	// Restore restores the backup data from backupReader, returning warnings and errors.
	Restore(log logrus.FieldLogger,
		restore *api.Restore,
		backup *api.Backup,
		volumeSnapshots []*volume.Snapshot,
		backupReader io.Reader,
		actions []ItemAction,
		snapshotLocationLister listers.VolumeSnapshotLocationLister,
		blockStoreGetter BlockStoreGetter,
	) (api.RestoreResult, api.RestoreResult)
}

type gvString string
type kindString string

// kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster.
type kubernetesRestorer struct {
	discoveryHelper            discovery.Helper
	dynamicFactory             client.DynamicFactory
	namespaceClient            corev1.NamespaceInterface
	resticRestorerFactory      restic.RestorerFactory
	resticTimeout              time.Duration
	resourceTerminatingTimeout time.Duration
	resourcePriorities         []string
	fileSystem                 filesystem.Interface
	logger                     logrus.FieldLogger
}

// prioritizeResources returns an ordered, fully-resolved list of resources to restore based on
// the provided discovery helper, resource priorities, and included/excluded resources.
func prioritizeResources(helper discovery.Helper, priorities []string, includedResources *collections.IncludesExcludes, logger logrus.FieldLogger) ([]schema.GroupResource, error) {
	var ret []schema.GroupResource

	// set keeps track of resolved GroupResource names
	set := sets.NewString()

	// start by resolving priorities into GroupResources and adding them to ret
	for _, r := range priorities {
		gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(r).WithVersion(""))
		if err != nil {
			return nil, err
		}
		gr := gvr.GroupResource()

		if !includedResources.ShouldInclude(gr.String()) {
			logger.WithField("groupResource", gr).Info("Not including resource")
			continue
		}

		ret = append(ret, gr)
		set.Insert(gr.String())
	}

	// go through everything we got from discovery and add anything not in "set" to byName
	var byName []schema.GroupResource
	for _, resourceGroup := range helper.Resources() {
		// will be something like storage.k8s.io/v1
		groupVersion, err := schema.ParseGroupVersion(resourceGroup.GroupVersion)
		if err != nil {
			return nil, err
		}

		for _, resource := range resourceGroup.APIResources {
			gr := groupVersion.WithResource(resource.Name).GroupResource()

			if !includedResources.ShouldInclude(gr.String()) {
				logger.WithField("groupResource", gr.String()).Info("Not including resource")
				continue
			}

			if !set.Has(gr.String()) {
				byName = append(byName, gr)
			}
		}
	}

	// sort byName by name
	sort.Slice(byName, func(i, j int) bool {
		return byName[i].String() < byName[j].String()
	})

	// combine prioritized with by-name
	ret = append(ret, byName...)

	return ret, nil
}
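
// Example (hypothetical values): with priorities of
// ["namespaces", "persistentvolumes", "pods"] and a discovery helper that also
// reports "deployments.apps" and "secrets", the returned order is the
// priorities first, then the remaining discovered resources sorted by name:
//
//	namespaces, persistentvolumes, pods, deployments.apps, secrets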

// NewKubernetesRestorer creates a new kubernetesRestorer.
func NewKubernetesRestorer(
	discoveryHelper discovery.Helper,
	dynamicFactory client.DynamicFactory,
	resourcePriorities []string,
	namespaceClient corev1.NamespaceInterface,
	resticRestorerFactory restic.RestorerFactory,
	resticTimeout time.Duration,
	resourceTerminatingTimeout time.Duration,
	logger logrus.FieldLogger,
) (Restorer, error) {
	return &kubernetesRestorer{
		discoveryHelper:            discoveryHelper,
		dynamicFactory:             dynamicFactory,
		namespaceClient:            namespaceClient,
		resticRestorerFactory:      resticRestorerFactory,
		resticTimeout:              resticTimeout,
		resourceTerminatingTimeout: resourceTerminatingTimeout,
		resourcePriorities:         resourcePriorities,
		logger:                     logger,
		fileSystem:                 filesystem.NewFileSystem(),
	}, nil
}

// Restore executes a restore into the target Kubernetes cluster according to the restore spec
// and using data from the provided backup/backup reader. Returns a warnings and errors RestoreResult,
// respectively, summarizing info about the restore.
func (kr *kubernetesRestorer) Restore(
	log logrus.FieldLogger,
	restore *api.Restore,
	backup *api.Backup,
	volumeSnapshots []*volume.Snapshot,
	backupReader io.Reader,
	actions []ItemAction,
	snapshotLocationLister listers.VolumeSnapshotLocationLister,
	blockStoreGetter BlockStoreGetter,
) (api.RestoreResult, api.RestoreResult) {
	// metav1.LabelSelectorAsSelector converts a nil LabelSelector to a
	// Nothing Selector, i.e. a selector that matches nothing. We want
	// a selector that matches everything. This can be accomplished by
	// passing a non-nil empty LabelSelector.
	ls := restore.Spec.LabelSelector
	if ls == nil {
		ls = &metav1.LabelSelector{}
	}

	selector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		return api.RestoreResult{}, api.RestoreResult{Velero: []string{err.Error()}}
	}

	// get resource includes-excludes
	resourceIncludesExcludes := getResourceIncludesExcludes(kr.discoveryHelper, restore.Spec.IncludedResources, restore.Spec.ExcludedResources)

	prioritizedResources, err := prioritizeResources(kr.discoveryHelper, kr.resourcePriorities, resourceIncludesExcludes, log)
	if err != nil {
		return api.RestoreResult{}, api.RestoreResult{Velero: []string{err.Error()}}
	}

	resolvedActions, err := resolveActions(actions, kr.discoveryHelper)
	if err != nil {
		return api.RestoreResult{}, api.RestoreResult{Velero: []string{err.Error()}}
	}

	podVolumeTimeout := kr.resticTimeout
	if val := restore.Annotations[api.PodVolumeOperationTimeoutAnnotation]; val != "" {
		parsed, err := time.ParseDuration(val)
		if err != nil {
			log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val)
		} else {
			podVolumeTimeout = parsed
		}
	}

	ctx, cancelFunc := go_context.WithTimeout(go_context.Background(), podVolumeTimeout)
	defer cancelFunc()

	var resticRestorer restic.Restorer
	if kr.resticRestorerFactory != nil {
		resticRestorer, err = kr.resticRestorerFactory.NewRestorer(ctx, restore)
		if err != nil {
			return api.RestoreResult{}, api.RestoreResult{Velero: []string{err.Error()}}
		}
	}

	pvRestorer := &pvRestorer{
		logger:                 log,
		backup:                 backup,
		snapshotVolumes:        backup.Spec.SnapshotVolumes,
		restorePVs:             restore.Spec.RestorePVs,
		volumeSnapshots:        volumeSnapshots,
		blockStoreGetter:       blockStoreGetter,
		snapshotLocationLister: snapshotLocationLister,
	}

	restoreCtx := &context{
		backup:                     backup,
		backupReader:               backupReader,
		restore:                    restore,
		prioritizedResources:       prioritizedResources,
		selector:                   selector,
		log:                        log,
		dynamicFactory:             kr.dynamicFactory,
		fileSystem:                 kr.fileSystem,
		namespaceClient:            kr.namespaceClient,
		actions:                    resolvedActions,
		blockStoreGetter:           blockStoreGetter,
		resticRestorer:             resticRestorer,
		pvsToProvision:             sets.NewString(),
		pvRestorer:                 pvRestorer,
		volumeSnapshots:            volumeSnapshots,
		resourceTerminatingTimeout: kr.resourceTerminatingTimeout,
	}

	return restoreCtx.execute()
}

// getResourceIncludesExcludes takes the lists of resources to include and exclude, uses the
// discovery helper to resolve them to fully-qualified group-resource names, and returns an
// IncludesExcludes list.
func getResourceIncludesExcludes(helper discovery.Helper, includes, excludes []string) *collections.IncludesExcludes {
	resources := collections.GenerateIncludesExcludes(
		includes,
		excludes,
		func(item string) string {
			gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(item).WithVersion(""))
			if err != nil {
				return ""
			}

			gr := gvr.GroupResource()
			return gr.String()
		},
	)

	return resources
}
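
// Example (hypothetical values): an include list of ["deployments", "persistentvolumeclaims"]
// is mapped through discovery, so entries typically resolve to fully-qualified
// group-resource names such as "deployments.apps" and "persistentvolumeclaims"
// before items are matched against them during the restore.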

type resolvedAction struct {
	ItemAction

	resourceIncludesExcludes  *collections.IncludesExcludes
	namespaceIncludesExcludes *collections.IncludesExcludes
	selector                  labels.Selector
}

func resolveActions(actions []ItemAction, helper discovery.Helper) ([]resolvedAction, error) {
	var resolved []resolvedAction

	for _, action := range actions {
		resourceSelector, err := action.AppliesTo()
		if err != nil {
			return nil, err
		}

		resources := getResourceIncludesExcludes(helper, resourceSelector.IncludedResources, resourceSelector.ExcludedResources)
		namespaces := collections.NewIncludesExcludes().Includes(resourceSelector.IncludedNamespaces...).Excludes(resourceSelector.ExcludedNamespaces...)

		selector := labels.Everything()
		if resourceSelector.LabelSelector != "" {
			if selector, err = labels.Parse(resourceSelector.LabelSelector); err != nil {
				return nil, err
			}
		}

		res := resolvedAction{
			ItemAction:                action,
			resourceIncludesExcludes:  resources,
			namespaceIncludesExcludes: namespaces,
			selector:                  selector,
		}

		resolved = append(resolved, res)
	}

	return resolved, nil
}

type context struct {
	backup                     *api.Backup
	backupReader               io.Reader
	restore                    *api.Restore
	prioritizedResources       []schema.GroupResource
	selector                   labels.Selector
	log                        logrus.FieldLogger
	dynamicFactory             client.DynamicFactory
	fileSystem                 filesystem.Interface
	namespaceClient            corev1.NamespaceInterface
	actions                    []resolvedAction
	blockStoreGetter           BlockStoreGetter
	resticRestorer             restic.Restorer
	globalWaitGroup            velerosync.ErrorGroup
	resourceWaitGroup          sync.WaitGroup
	resourceWatches            []watch.Interface
	pvsToProvision             sets.String
	pvRestorer                 PVRestorer
	volumeSnapshots            []*volume.Snapshot
	resourceTerminatingTimeout time.Duration
}

func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) {
	ctx.log.Infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))

	dir, err := ctx.unzipAndExtractBackup(ctx.backupReader)
	if err != nil {
		ctx.log.Infof("error unzipping and extracting: %v", err)
		return api.RestoreResult{}, api.RestoreResult{Velero: []string{err.Error()}}
	}
	defer ctx.fileSystem.RemoveAll(dir)

	return ctx.restoreFromDir(dir)
}

// restoreFromDir executes a restore based on backup data contained within a local
// directory.
func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreResult) {
	warnings, errs := api.RestoreResult{}, api.RestoreResult{}

	namespaceFilter := collections.NewIncludesExcludes().
		Includes(ctx.restore.Spec.IncludedNamespaces...).
		Excludes(ctx.restore.Spec.ExcludedNamespaces...)

	// Make sure the top level "resources" dir exists:
	resourcesDir := filepath.Join(dir, api.ResourcesDir)
	rde, err := ctx.fileSystem.DirExists(resourcesDir)
	if err != nil {
		addVeleroError(&errs, err)
		return warnings, errs
	}
	if !rde {
		addVeleroError(&errs, errors.New("backup does not contain top level resources directory"))
		return warnings, errs
	}

	resourceDirs, err := ctx.fileSystem.ReadDir(resourcesDir)
	if err != nil {
		addVeleroError(&errs, err)
		return warnings, errs
	}

	resourceDirsMap := make(map[string]os.FileInfo)

	for _, rscDir := range resourceDirs {
		rscName := rscDir.Name()
		resourceDirsMap[rscName] = rscDir
	}

	existingNamespaces := sets.NewString()

	// TODO this is not optimal since it'll keep watches open for all resources/namespaces
	// until the very end of the restore. This should be done per resource type. Deferring
	// refactoring for now since this may be able to be removed entirely if we eliminate
	// waiting for PV snapshot restores.
	defer func() {
		for _, watch := range ctx.resourceWatches {
			watch.Stop()
		}
	}()

	for _, resource := range ctx.prioritizedResources {
		// we don't want to explicitly restore namespace API objs because we'll handle
		// them as a special case prior to restoring anything into them
		if resource == kuberesource.Namespaces {
			continue
		}

		rscDir := resourceDirsMap[resource.String()]
		if rscDir == nil {
			continue
		}

		resourcePath := filepath.Join(resourcesDir, rscDir.Name())

		clusterSubDir := filepath.Join(resourcePath, api.ClusterScopedDir)
		clusterSubDirExists, err := ctx.fileSystem.DirExists(clusterSubDir)
		if err != nil {
			addVeleroError(&errs, err)
			return warnings, errs
		}
		if clusterSubDirExists {
			w, e := ctx.restoreResource(resource.String(), "", clusterSubDir)
			merge(&warnings, &w)
			merge(&errs, &e)
			continue
		}

		nsSubDir := filepath.Join(resourcePath, api.NamespaceScopedDir)
		nsSubDirExists, err := ctx.fileSystem.DirExists(nsSubDir)
		if err != nil {
			addVeleroError(&errs, err)
			return warnings, errs
		}
		if !nsSubDirExists {
			continue
		}

		nsDirs, err := ctx.fileSystem.ReadDir(nsSubDir)
		if err != nil {
			addVeleroError(&errs, err)
			return warnings, errs
		}

		for _, nsDir := range nsDirs {
			if !nsDir.IsDir() {
				continue
			}
			nsName := nsDir.Name()
			nsPath := filepath.Join(nsSubDir, nsName)

			if !namespaceFilter.ShouldInclude(nsName) {
				ctx.log.Infof("Skipping namespace %s", nsName)
				continue
			}

			// fetch mapped NS name
			mappedNsName := nsName
			if target, ok := ctx.restore.Spec.NamespaceMapping[nsName]; ok {
				mappedNsName = target
			}

			// if we don't know whether this namespace exists yet, attempt to create
			// it in order to ensure it exists. Try to get it from the backup tarball
			// (in order to get any backed-up metadata), but if we don't find it there,
			// create a blank one.
			if !existingNamespaces.Has(mappedNsName) {
				logger := ctx.log.WithField("namespace", nsName)
				ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
				if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
					addVeleroError(&errs, err)
					continue
				}

				// keep track of namespaces that we know exist so we don't
				// have to try to create them multiple times
				existingNamespaces.Insert(mappedNsName)
			}

			w, e := ctx.restoreResource(resource.String(), mappedNsName, nsPath)
			merge(&warnings, &w)
			merge(&errs, &e)
		}

		// TODO timeout?
		ctx.log.Debugf("Waiting on resource wait group for resource=%s", resource.String())
		ctx.resourceWaitGroup.Wait()
		ctx.log.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
	}

	// TODO timeout?
	ctx.log.Debug("Waiting on global wait group")
	waitErrs := ctx.globalWaitGroup.Wait()
	ctx.log.Debug("Done waiting on global wait group")

	for _, err := range waitErrs {
		// TODO not ideal to be adding these to Velero-level errors
		// rather than a specific namespace, but don't have a way
		// to track the namespace right now.
		errs.Velero = append(errs.Velero, err.Error())
	}

	return warnings, errs
}

// getNamespace returns a namespace API object that we should attempt to
// create before restoring anything into it. It will come from the backup
// tarball if it exists, else will be a new one. If from the tarball, it
// will retain its labels, annotations, and spec.
func getNamespace(logger logrus.FieldLogger, path, remappedName string) *v1.Namespace {
	var nsBytes []byte
	var err error

	if nsBytes, err = ioutil.ReadFile(path); err != nil {
		return &v1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				Name: remappedName,
			},
		}
	}

	var backupNS v1.Namespace
	if err := json.Unmarshal(nsBytes, &backupNS); err != nil {
		logger.Warnf("Error unmarshalling namespace from backup, creating new one.")
		return &v1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				Name: remappedName,
			},
		}
	}

	return &v1.Namespace{
		ObjectMeta: metav1.ObjectMeta{
			Name:        remappedName,
			Labels:      backupNS.Labels,
			Annotations: backupNS.Annotations,
		},
		Spec: backupNS.Spec,
	}
}
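
// Example (hypothetical values): with a namespace mapping of {"dev": "dev-restored"},
// a backed-up namespaces/cluster/dev.json carrying the label team=a yields a
// Namespace named "dev-restored" that keeps that label (plus any annotations
// and spec) from the backup; if the file is missing or unreadable, a bare
// Namespace with only the remapped name is returned.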

// merge combines two RestoreResult objects into one
// by appending the corresponding lists to one another.
func merge(a, b *api.RestoreResult) {
	a.Cluster = append(a.Cluster, b.Cluster...)
	a.Velero = append(a.Velero, b.Velero...)
	for k, v := range b.Namespaces {
		if a.Namespaces == nil {
			a.Namespaces = make(map[string][]string)
		}
		a.Namespaces[k] = append(a.Namespaces[k], v...)
	}
}
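
// Example (hypothetical values): merging {Cluster: ["w1"]} with
// {Namespaces: {"ns-1": ["w2"]}} leaves the first result holding both the
// cluster-scoped entry and the ns-1 entry, which is how per-resource results
// are rolled up into the overall restore result.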

// addVeleroError appends an error to the provided RestoreResult's Velero list.
func addVeleroError(r *api.RestoreResult, err error) {
	r.Velero = append(r.Velero, err.Error())
}

// addToResult appends an error to the provided RestoreResult, either within
// the cluster-scoped list (if ns == "") or within the provided namespace's
// entry.
func addToResult(r *api.RestoreResult, ns string, e error) {
	if ns == "" {
		r.Cluster = append(r.Cluster, e.Error())
	} else {
		if r.Namespaces == nil {
			r.Namespaces = make(map[string][]string)
		}
		r.Namespaces[ns] = append(r.Namespaces[ns], e.Error())
	}
}
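
// Example (hypothetical values):
//
//	addToResult(&errs, "", errors.New("cluster-scoped failure"))  // appended to errs.Cluster
//	addToResult(&errs, "ns-1", errors.New("namespaced failure"))  // appended to errs.Namespaces["ns-1"]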

func (ctx *context) shouldRestore(name string, pvClient client.Dynamic) (bool, error) {
	pvLogger := ctx.log.WithField("pvName", name)

	var shouldRestore bool
	err := wait.PollImmediate(time.Second, ctx.resourceTerminatingTimeout, func() (bool, error) {
		clusterPV, err := pvClient.Get(name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			pvLogger.Debug("PV not found, safe to restore")
			// PV not found, can safely exit loop and proceed with restore.
			shouldRestore = true
			return true, nil
		}
		if err != nil {
			return false, errors.Wrapf(err, "could not retrieve in-cluster copy of PV %s", name)
		}
		phase, err := collections.GetString(clusterPV.UnstructuredContent(), "status.phase")
		if err != nil {
			// Break the loop since we couldn't read the phase
			return false, errors.Wrapf(err, "error getting phase for in-cluster PV %s", name)
		}

		if phase == string(v1.VolumeReleased) || clusterPV.GetDeletionTimestamp() != nil {
			// PV was found and marked for deletion, or it was released; wait for it to go away.
			pvLogger.Debugf("PV found, but marked for deletion, waiting")
			return false, nil
		}

		// Check for the namespace and PVC to see if anything that's referencing the PV is deleting.
		// If either the namespace or PVC is in a deleting/terminating state, wait for them to finish before
		// trying to restore the PV
		// Not doing so may result in the underlying PV disappearing but not restoring due to timing issues,
		// then the PVC getting restored and showing as lost.
		namespace, err := collections.GetString(clusterPV.UnstructuredContent(), "spec.claimRef.namespace")
		if err != nil {
			return false, errors.Wrapf(err, "error looking up namespace name for in-cluster PV %s", name)
		}
		pvcName, err := collections.GetString(clusterPV.UnstructuredContent(), "spec.claimRef.name")
		if err != nil {
			return false, errors.Wrapf(err, "error looking up persistentvolumeclaim for in-cluster PV %s", name)
		}

		// Have to create the PVC client here because we don't know what namespace we're using til we get to this point.
		// Using a dynamic client since it's easier to mock for testing
		pvcResource := metav1.APIResource{Name: "persistentvolumeclaims", Namespaced: true}
		pvcClient, err := ctx.dynamicFactory.ClientForGroupVersionResource(schema.GroupVersion{Group: "", Version: "v1"}, pvcResource, namespace)
		if err != nil {
			return false, errors.Wrapf(err, "error getting pvc client")
		}

		pvc, err := pvcClient.Get(pvcName, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			pvLogger.Debugf("PVC %s for PV not found, waiting", pvcName)
			// PVC wasn't found, but the PV still exists, so continue to wait.
			return false, nil
		}
		if err != nil {
			return false, errors.Wrapf(err, "error getting claim %s for persistent volume", pvcName)
		}

		if pvc != nil && pvc.GetDeletionTimestamp() != nil {
			pvLogger.Debugf("PVC for PV marked for deletion, waiting")
			// PVC is still deleting, continue to wait.
			return false, nil
		}

		// Check the namespace associated with the claimRef to see if it's deleting/terminating before proceeding
		ns, err := ctx.namespaceClient.Get(namespace, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			pvLogger.Debugf("namespace %s for PV not found, waiting", namespace)
			// namespace not found but the PV still exists, so continue to wait
			return false, nil
		}
		if err != nil {
			return false, errors.Wrapf(err, "error getting namespace %s associated with PV %s", namespace, name)
		}

		if ns != nil && (ns.GetDeletionTimestamp() != nil || ns.Status.Phase == v1.NamespaceTerminating) {
			pvLogger.Debugf("namespace %s associated with PV is deleting, waiting", namespace)
			// namespace is in the process of deleting, keep looping
			return false, nil
		}

		// None of the PV, PVC, or NS are marked for deletion, break the loop.
		pvLogger.Debug("PV, associated PVC and namespace are not marked for deletion")
		return true, nil
	})

	if err == wait.ErrWaitTimeout {
		pvLogger.Debug("timeout reached waiting for persistent volume to delete")
	}

	return shouldRestore, err
}
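
// shouldRestore polls once per second, up to resourceTerminatingTimeout: it
// returns (true, nil) as soon as the in-cluster PV is gone, keeps waiting while
// the PV, its bound PVC, or their namespace is still terminating, and returns
// wait.ErrWaitTimeout if they never go away, in which case the PV is not
// restored from snapshot.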

// restoreResource restores the specified cluster or namespace scoped resource. If namespace is
// empty we are restoring a cluster level resource, otherwise into the specified namespace.
func (ctx *context) restoreResource(resource, namespace, resourcePath string) (api.RestoreResult, api.RestoreResult) {
	warnings, errs := api.RestoreResult{}, api.RestoreResult{}

	if ctx.restore.Spec.IncludeClusterResources != nil && !*ctx.restore.Spec.IncludeClusterResources && namespace == "" {
		ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
		return warnings, errs
	}

	if namespace != "" {
		ctx.log.Infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
	} else {
		ctx.log.Infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
	}

	files, err := ctx.fileSystem.ReadDir(resourcePath)
	if err != nil {
		addToResult(&errs, namespace, fmt.Errorf("error reading %q resource directory: %v", resource, err))
		return warnings, errs
	}
	if len(files) == 0 {
		return warnings, errs
	}

	var (
		resourceClient    client.Dynamic
		groupResource     = schema.ParseGroupResource(resource)
		applicableActions []resolvedAction
		resourceWatch     watch.Interface
	)

	// pre-filter the actions based on namespace & resource includes/excludes since
	// these will be the same for all items being restored below
	for _, action := range ctx.actions {
		if !action.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
			continue
		}

		if namespace != "" && !action.namespaceIncludesExcludes.ShouldInclude(namespace) {
			continue
		}

		applicableActions = append(applicableActions, action)
	}

	for _, file := range files {
		fullPath := filepath.Join(resourcePath, file.Name())
		obj, err := ctx.unmarshal(fullPath)
		if err != nil {
			addToResult(&errs, namespace, fmt.Errorf("error decoding %q: %v", fullPath, err))
			continue
		}

		if !ctx.selector.Matches(labels.Set(obj.GetLabels())) {
			continue
		}

		complete, err := isCompleted(obj, groupResource)
		if err != nil {
			addToResult(&errs, namespace, fmt.Errorf("error checking completion %q: %v", fullPath, err))
			continue
		}
		if complete {
			ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj))
			continue
		}

		if resourceClient == nil {
			// initialize client for this Resource. we need
			// metadata from an object to do this.
			ctx.log.Infof("Getting client for %v", obj.GroupVersionKind())

			resource := metav1.APIResource{
				Namespaced: len(namespace) > 0,
				Name:       groupResource.Resource,
			}

			var err error
			resourceClient, err = ctx.dynamicFactory.ClientForGroupVersionResource(obj.GroupVersionKind().GroupVersion(), resource, namespace)
			if err != nil {
				addVeleroError(&errs, fmt.Errorf("error getting resource client for namespace %q, resource %q: %v", namespace, &groupResource, err))
				return warnings, errs
			}
		}

		name := obj.GetName()

		// TODO: move to restore item action if/when we add a ShouldRestore() method to the interface
		if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" {
			ctx.log.Infof("Not restoring pod because it's a mirror pod")
			continue
		}

		if groupResource == kuberesource.PersistentVolumes {
			var hasSnapshot bool

			if len(ctx.backup.Status.VolumeBackups) > 0 {
				// pre-v0.10 backup
				_, hasSnapshot = ctx.backup.Status.VolumeBackups[name]
			} else {
				// v0.10+ backup
				for _, snapshot := range ctx.volumeSnapshots {
					if snapshot.Spec.PersistentVolumeName == name {
						hasSnapshot = true
						break
					}
				}
			}

			if !hasSnapshot && hasDeleteReclaimPolicy(obj.Object) {
				ctx.log.Infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
				ctx.pvsToProvision.Insert(name)
				continue
			}

			// Check if the PV exists in the cluster before attempting to create
			// a volume from the snapshot, in order to avoid orphaned volumes (GH #609)
			shouldRestoreSnapshot, err := ctx.shouldRestore(name, resourceClient)
			if err != nil {
				addToResult(&errs, namespace, errors.Wrapf(err, "error waiting on in-cluster persistentvolume %s", name))
				continue
			}

			// PV's existence will be recorded later. Just skip the volume restore logic.
			if shouldRestoreSnapshot {
				// restore the PV from snapshot (if applicable)
				updatedObj, err := ctx.pvRestorer.executePVAction(obj)
				if err != nil {
					addToResult(&errs, namespace, fmt.Errorf("error executing PVAction for %s: %v", fullPath, err))
					continue
				}
				obj = updatedObj

				if resourceWatch == nil {
					resourceWatch, err = resourceClient.Watch(metav1.ListOptions{})
					if err != nil {
						addToResult(&errs, namespace, fmt.Errorf("error watching for namespace %q, resource %q: %v", namespace, &groupResource, err))
						return warnings, errs
					}
					ctx.resourceWatches = append(ctx.resourceWatches, resourceWatch)

					ctx.resourceWaitGroup.Add(1)
					go func() {
						defer ctx.resourceWaitGroup.Done()

						if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.log); err != nil {
							ctx.log.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
							addVeleroError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
						}
					}()
				}
			} else if err != nil {
				addToResult(&errs, namespace, fmt.Errorf("error checking existence for PV %s: %v", name, err))
				continue
			}
		}

		if groupResource == kuberesource.PersistentVolumeClaims {
			pvc := new(v1.PersistentVolumeClaim)
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pvc); err != nil {
				addToResult(&errs, namespace, err)
				continue
			}

			if pvc.Spec.VolumeName != "" && ctx.pvsToProvision.Has(pvc.Spec.VolumeName) {
				ctx.log.Infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, pvc.Spec.VolumeName)

				pvc.Spec.VolumeName = ""
				delete(pvc.Annotations, "pv.kubernetes.io/bind-completed")
				delete(pvc.Annotations, "pv.kubernetes.io/bound-by-controller")

				res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvc)
				if err != nil {
					addToResult(&errs, namespace, err)
					continue
				}
				obj.Object = res
			}
		}

		// clear out non-core metadata fields & status
		if obj, err = resetMetadataAndStatus(obj); err != nil {
			addToResult(&errs, namespace, err)
			continue
		}

		for _, action := range applicableActions {
			if !action.selector.Matches(labels.Set(obj.GetLabels())) {
				continue
			}

			ctx.log.Infof("Executing item action for %v", &groupResource)

			updatedObj, warning, err := action.Execute(obj, ctx.restore)
			if warning != nil {
				addToResult(&warnings, namespace, fmt.Errorf("warning preparing %s: %v", fullPath, warning))
			}
			if err != nil {
				addToResult(&errs, namespace, fmt.Errorf("error preparing %s: %v", fullPath, err))
				continue
			}

			unstructuredObj, ok := updatedObj.(*unstructured.Unstructured)
			if !ok {
				addToResult(&errs, namespace, fmt.Errorf("%s: unexpected type %T", fullPath, updatedObj))
				continue
			}

			obj = unstructuredObj
		}

		// necessary because we may have remapped the namespace
		// if the namespace is blank, don't create the key
		originalNamespace := obj.GetNamespace()
		if namespace != "" {
			obj.SetNamespace(namespace)
		}

		// label the resource with the restore's name and the restored backup's name
		// for easy identification of all cluster resources created by this restore
		// and which backup they came from
		addRestoreLabels(obj, ctx.restore.Name, ctx.restore.Spec.BackupName)

		ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name)
		createdObj, restoreErr := resourceClient.Create(obj)
		if apierrors.IsAlreadyExists(restoreErr) {
			fromCluster, err := resourceClient.Get(name, metav1.GetOptions{})
			if err != nil {
				ctx.log.Infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
				addToResult(&warnings, namespace, err)
				continue
			}
			// Remove insubstantial metadata
			fromCluster, err = resetMetadataAndStatus(fromCluster)
			if err != nil {
				ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
				addToResult(&warnings, namespace, err)
				continue
			}

			// We know the object from the cluster won't have the backup/restore name labels, so
			// copy them from the object we attempted to restore.
			labels := obj.GetLabels()
			addRestoreLabels(fromCluster, labels[api.RestoreNameLabel], labels[api.BackupNameLabel])

			if equality.Semantic.DeepEqual(fromCluster, obj) {
				ctx.log.Infof("Skipping restore of %s: %v because it already exists in the cluster and is unchanged from the backed up version", obj.GroupVersionKind().Kind, name)
			} else {
				switch groupResource {
				case kuberesource.ServiceAccounts:
					desired, err := mergeServiceAccounts(fromCluster, obj)
					if err != nil {
						ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
						addToResult(&warnings, namespace, err)
						break
					}

					patchBytes, err := generatePatch(fromCluster, desired)
					if err != nil {
						ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
						addToResult(&warnings, namespace, err)
						break
					}

					if patchBytes == nil {
						// In-cluster and desired state are the same, so move on to the next item
						break
					}

					_, err = resourceClient.Patch(name, patchBytes)
					if err != nil {
						addToResult(&warnings, namespace, err)
						break
					}

					ctx.log.Infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
				default:
					e := errors.Errorf("not restored: %s and is different from backed up version.", restoreErr)
					addToResult(&warnings, namespace, e)
				}
			}

			continue
		}

		// Error was something other than an AlreadyExists
		if restoreErr != nil {
ctx . log . Infof ( "error restoring %s: %v" , name , err )
			addToResult(&errs, namespace, fmt.Errorf("error restoring %s: %v", fullPath, restoreErr))
			continue
		}

		if groupResource == kuberesource.Pods && len(restic.GetPodSnapshotAnnotations(obj)) > 0 {
			if ctx.resticRestorer == nil {
				ctx.log.Warn("No restic restorer, not restoring pod's volumes")
			} else {
				ctx.globalWaitGroup.GoErrorSlice(func() []error {
					pod := new(v1.Pod)
					if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
						ctx.log.WithError(err).Error("error converting unstructured pod")
						return []error{err}
					}

					if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, originalNamespace, ctx.backup.Spec.StorageLocation, ctx.log); errs != nil {
						ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
						return errs
					}

					return nil
				})
			}
		}
	}

	return warnings, errs
}

func hasDeleteReclaimPolicy(obj map[string]interface{}) bool {
	policy, _, _ := unstructured.NestedString(obj, "spec", "persistentVolumeReclaimPolicy")
	return policy == string(v1.PersistentVolumeReclaimDelete)
}

func waitForReady(
	watchChan <-chan watch.Event,
	name string,
	ready func(runtime.Unstructured) bool,
	timeout time.Duration,
	log logrus.FieldLogger,
) (*unstructured.Unstructured, error) {
	var timeoutChan <-chan time.Time
	if timeout != 0 {
		timeoutChan = time.After(timeout)
	} else {
		timeoutChan = make(chan time.Time)
	}

	for {
		select {
		case event := <-watchChan:
			if event.Type != watch.Added && event.Type != watch.Modified {
				continue
			}

			obj, ok := event.Object.(*unstructured.Unstructured)
			switch {
			case !ok:
				log.Errorf("Unexpected type %T", event.Object)
				continue
			case obj.GetName() != name:
				continue
			case !ready(obj):
				log.Debugf("Item %s is not ready yet", name)
				continue
			default:
				return obj, nil
			}
		case <-timeoutChan:
			return nil, errors.New("failed to observe item becoming ready within the timeout")
		}
	}
}
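
// In the PV restore path above, waitForReady is given the resource watch
// channel, the PV name, isPVReady, and a one-minute timeout; a timeout there is
// recorded as a warning rather than an error, since the PV object itself has
// already been created.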

type PVRestorer interface {
	executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
}

type pvRestorer struct {
	logger                 logrus.FieldLogger
	backup                 *api.Backup
	snapshotVolumes        *bool
	restorePVs             *bool
	volumeSnapshots        []*volume.Snapshot
	blockStoreGetter       BlockStoreGetter
	snapshotLocationLister listers.VolumeSnapshotLocationLister
}

type snapshotInfo struct {
	providerSnapshotID string
	volumeType         string
	volumeAZ           string
	volumeIOPS         *int64
	location           *api.VolumeSnapshotLocation
}

func getSnapshotInfo(pvName string, backup *api.Backup, volumeSnapshots []*volume.Snapshot, snapshotLocationLister listers.VolumeSnapshotLocationLister) (*snapshotInfo, error) {
	// pre-v0.10 backup
	if backup.Status.VolumeBackups != nil {
		volumeBackup := backup.Status.VolumeBackups[pvName]
		if volumeBackup == nil {
			return nil, nil
		}

		locations, err := snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).List(labels.Everything())
		if err != nil {
			return nil, errors.WithStack(err)
		}
		if len(locations) != 1 {
			return nil, errors.Errorf("unable to restore pre-v0.10 volume snapshot because exactly one volume snapshot location must exist, got %d", len(locations))
		}

		return &snapshotInfo{
			providerSnapshotID: volumeBackup.SnapshotID,
			volumeType:         volumeBackup.Type,
			volumeAZ:           volumeBackup.AvailabilityZone,
			volumeIOPS:         volumeBackup.Iops,
			location:           locations[0],
		}, nil
	}

	// v0.10+ backup
	var pvSnapshot *volume.Snapshot
	for _, snapshot := range volumeSnapshots {
		if snapshot.Spec.PersistentVolumeName == pvName {
			pvSnapshot = snapshot
			break
		}
	}

	if pvSnapshot == nil {
		return nil, nil
	}

	loc, err := snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).Get(pvSnapshot.Spec.Location)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return &snapshotInfo{
		providerSnapshotID: pvSnapshot.Status.ProviderSnapshotID,
		volumeType:         pvSnapshot.Spec.VolumeType,
		volumeAZ:           pvSnapshot.Spec.VolumeAZ,
		volumeIOPS:         pvSnapshot.Spec.VolumeIOPS,
		location:           loc,
	}, nil
}

func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	pvName := obj.GetName()
	if pvName == "" {
		return nil, errors.New("PersistentVolume is missing its name")
	}

	// It's simpler to just access the spec through the unstructured object than to convert
	// to structured and back here, especially since the SetVolumeID(...) call below needs
	// the unstructured representation (and does a conversion internally).
	res, ok := obj.Object["spec"]
	if !ok {
		return nil, errors.New("spec not found")
	}
	spec, ok := res.(map[string]interface{})
	if !ok {
		return nil, errors.Errorf("spec was of type %T, expected map[string]interface{}", res)
	}

	delete(spec, "claimRef")
	delete(spec, "storageClassName")

	if boolptr.IsSetToFalse(r.snapshotVolumes) {
		// The backup had snapshots disabled, so we can return early
		return obj, nil
	}

	if boolptr.IsSetToFalse(r.restorePVs) {
		// The restore has pv restores disabled, so we can return early
		return obj, nil
	}

	log := r.logger.WithFields(logrus.Fields{"persistentVolume": pvName})

	snapshotInfo, err := getSnapshotInfo(pvName, r.backup, r.volumeSnapshots, r.snapshotLocationLister)
	if err != nil {
		return nil, err
	}
	if snapshotInfo == nil {
		log.Infof("No snapshot found for persistent volume")
		return obj, nil
	}

	blockStore, err := r.blockStoreGetter.GetBlockStore(snapshotInfo.location.Spec.Provider)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	if err := blockStore.Init(snapshotInfo.location.Spec.Config); err != nil {
		return nil, errors.WithStack(err)
	}

	volumeID, err := blockStore.CreateVolumeFromSnapshot(snapshotInfo.providerSnapshotID, snapshotInfo.volumeType, snapshotInfo.volumeAZ, snapshotInfo.volumeIOPS)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	log.WithField("providerSnapshotID", snapshotInfo.providerSnapshotID).Info("successfully restored persistent volume from snapshot")

	updated1, err := blockStore.SetVolumeID(obj, volumeID)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	updated2, ok := updated1.(*unstructured.Unstructured)
	if !ok {
		return nil, errors.Errorf("unexpected type %T", updated1)
	}

	return updated2, nil
}

func isPVReady(obj runtime.Unstructured) bool {
	phase, _, _ := unstructured.NestedString(obj.UnstructuredContent(), "status", "phase")
	return phase == string(v1.VolumeAvailable)
}

func resetMetadataAndStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	res, ok := obj.Object["metadata"]
	if !ok {
		return nil, errors.New("metadata not found")
	}
	metadata, ok := res.(map[string]interface{})
	if !ok {
		return nil, errors.Errorf("metadata was of type %T, expected map[string]interface{}", res)
	}

	for k := range metadata {
		switch k {
		case "name", "namespace", "labels", "annotations":
		default:
			delete(metadata, k)
		}
	}

	// Never restore status
	delete(obj.UnstructuredContent(), "status")

	return obj, nil
}
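
// Example (hypothetical values): an object backed up with metadata containing
// name, namespace, labels, annotations, uid, resourceVersion, and
// creationTimestamp comes out of resetMetadataAndStatus with only name,
// namespace, labels, and annotations, and with status removed, so the API
// server assigns fresh values on create.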

// addRestoreLabels labels the provided object with the restore name and
// the restored backup's name.
func addRestoreLabels(obj metav1.Object, restoreName, backupName string) {
	labels := obj.GetLabels()
	if labels == nil {
		labels = make(map[string]string)
	}

	labels[api.BackupNameLabel] = backupName
	labels[api.RestoreNameLabel] = restoreName

	// TODO(1.0): remove the below line, and remove the `RestoreLabelKey`
	// constant from the API pkg, since it's been replaced with the
	// namespaced label above.
	labels[api.RestoreLabelKey] = restoreName

	obj.SetLabels(labels)
}
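
// Example (hypothetical values): after addRestoreLabels(obj, "restore-1", "backup-1"),
// the object carries the api.BackupNameLabel and api.RestoreNameLabel labels
// (plus the legacy api.RestoreLabelKey), making it easy to identify which
// restore created a resource and which backup it came from.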

// hasControllerOwner returns whether or not an object has a controller
// owner ref. Used to identify whether or not an object should be explicitly
// recreated during a restore.
func hasControllerOwner(refs []metav1.OwnerReference) bool {
	for _, ref := range refs {
		if ref.Controller != nil && *ref.Controller {
			return true
		}
	}
	return false
}

// isCompleted returns whether or not an object is considered completed.
// Used to identify whether or not an object should be restored. Only Jobs or Pods are considered.
func isCompleted(obj *unstructured.Unstructured, groupResource schema.GroupResource) (bool, error) {
	switch groupResource {
	case kuberesource.Pods:
		phase, _, err := unstructured.NestedString(obj.UnstructuredContent(), "status", "phase")
		if err != nil {
			return false, errors.WithStack(err)
		}
		if phase == string(v1.PodFailed) || phase == string(v1.PodSucceeded) {
			return true, nil
		}

	case kuberesource.Jobs:
		ct, found, err := unstructured.NestedString(obj.UnstructuredContent(), "status", "completionTime")
		if err != nil {
			return false, errors.WithStack(err)
		}
		if found && ct != "" {
			return true, nil
		}
	}
	// Assume any other resource isn't complete and can be restored
	return false, nil
}
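
// Example (hypothetical values): a Pod whose status.phase is "Succeeded" or a
// Job with a non-empty status.completionTime is treated as completed and
// skipped, while any other resource (e.g. a Deployment) always reports false
// here and is restored normally.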

// unmarshal reads the specified file, unmarshals the JSON contained within it
// and returns an Unstructured object.
func (ctx *context) unmarshal(filePath string) (*unstructured.Unstructured, error) {
	var obj unstructured.Unstructured

	bytes, err := ctx.fileSystem.ReadFile(filePath)
	if err != nil {
		return nil, err
	}

	err = json.Unmarshal(bytes, &obj)
	if err != nil {
		return nil, err
	}

	return &obj, nil
}

// unzipAndExtractBackup extracts a reader on a gzipped tarball to a local temp directory
func (ctx *context) unzipAndExtractBackup(src io.Reader) (string, error) {
	gzr, err := gzip.NewReader(src)
	if err != nil {
		ctx.log.Infof("error creating gzip reader: %v", err)
		return "", err
	}
	defer gzr.Close()

	return ctx.readBackup(tar.NewReader(gzr))
}

// readBackup extracts a tar reader to a local directory/file tree within a
// temp directory.
func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
	dir, err := ctx.fileSystem.TempDir("", "")
	if err != nil {
		ctx.log.Infof("error creating temp dir: %v", err)
		return "", err
	}

	for {
		header, err := tarRdr.Next()

		if err == io.EOF {
			break
		}
		if err != nil {
			ctx.log.Infof("error reading tar: %v", err)
			return "", err
		}

		target := filepath.Join(dir, header.Name)

		switch header.Typeflag {
		case tar.TypeDir:
			err := ctx.fileSystem.MkdirAll(target, header.FileInfo().Mode())
			if err != nil {
				ctx.log.Infof("mkdirall error: %v", err)
				return "", err
			}

		case tar.TypeReg:
			// make sure we have the directory created
			err := ctx.fileSystem.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
			if err != nil {
				ctx.log.Infof("mkdirall error: %v", err)
				return "", err
			}

			// create the file
			file, err := ctx.fileSystem.Create(target)
			if err != nil {
				return "", err
			}
			defer file.Close()

			if _, err := io.Copy(file, tarRdr); err != nil {
				ctx.log.Infof("error copying: %v", err)
				return "", err
			}
		}
	}

	return dir, nil
}