Merge pull request #443 from skriss/restore-from-schedule

Add --from-schedule flag to `ark restore create`

commit 1e2b141e5d
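A note on the resolution rule this PR introduces: when `--from-schedule` is given, the restore's source backup is chosen by sorting the schedule's backups by start timestamp, newest first, and taking the first one in the Completed phase. Below is a minimal standalone sketch of just that rule; the simplified `backup` struct and the sample names are illustrative stand-ins for the real `api.Backup` type handled in the controller diff further down.

```go
package main

import (
    "fmt"
    "sort"
    "time"
)

// backup is a simplified stand-in for api.Backup: the controller code in this
// PR reads Status.Phase and Status.StartTimestamp from the real type.
type backup struct {
    name    string
    phase   string // "Completed", "InProgress", ...
    started time.Time
}

// mostRecentCompleted mirrors the selection rule added in this PR: sort
// descending by start time, then return the first completed backup.
func mostRecentCompleted(backups []backup) *backup {
    sort.Slice(backups, func(i, j int) bool {
        return backups[i].started.After(backups[j].started)
    })
    for i := range backups {
        if backups[i].phase == "Completed" {
            return &backups[i]
        }
    }
    return nil
}

func main() {
    now := time.Now()
    backups := []backup{
        {"daily-20180101", "Completed", now.Add(-48 * time.Hour)},
        {"daily-20180102", "Completed", now.Add(-24 * time.Hour)},
        {"daily-20180103", "InProgress", now},
    }
    if b := mostRecentCompleted(backups); b != nil {
        fmt.Println("restoring from backup:", b.name) // daily-20180102
    }
}
```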
@@ -8,7 +8,7 @@ Create a restore

 Create a restore

 ```
-ark create restore [RESTORE_NAME] --from-backup BACKUP_NAME [flags]
+ark create restore [RESTORE_NAME] [--from-backup BACKUP_NAME | --from-schedule SCHEDULE_NAME] [flags]
 ```

 ### Examples
@@ -19,6 +19,10 @@ ark create restore [RESTORE_NAME] --from-backup BACKUP_NAME [flags]
 # create a restore with a default name ("backup-1-<timestamp>") from backup "backup-1"
 ark restore create --from-backup backup-1
+
+# create a restore from the latest successful backup triggered by schedule "schedule-1"
+ark restore create --from-schedule schedule-1

 ```

 ### Options
@@ -27,6 +31,7 @@ ark create restore [RESTORE_NAME] --from-backup BACKUP_NAME [flags]
       --exclude-namespaces stringArray                  namespaces to exclude from the restore
       --exclude-resources stringArray                   resources to exclude from the restore, formatted as resource.group, such as storageclasses.storage.k8s.io
       --from-backup string                              backup to restore from
+      --from-schedule string                            schedule to restore from
   -h, --help                                            help for restore
       --include-cluster-resources optionalBool[=true]   include cluster-scoped resources in the restore
       --include-namespaces stringArray                  namespaces to include in the restore (use '*' for all namespaces) (default *)
@@ -8,7 +8,7 @@ Create a restore

 Create a restore

 ```
-ark restore create [RESTORE_NAME] --from-backup BACKUP_NAME [flags]
+ark restore create [RESTORE_NAME] [--from-backup BACKUP_NAME | --from-schedule SCHEDULE_NAME] [flags]
 ```

 ### Examples
@@ -19,6 +19,10 @@ ark restore create [RESTORE_NAME] --from-backup BACKUP_NAME [flags]
 # create a restore with a default name ("backup-1-<timestamp>") from backup "backup-1"
 ark restore create --from-backup backup-1
+
+# create a restore from the latest successful backup triggered by schedule "schedule-1"
+ark restore create --from-schedule schedule-1

 ```

 ### Options
@@ -27,6 +31,7 @@ ark restore create [RESTORE_NAME] --from-backup BACKUP_NAME [flags]
       --exclude-namespaces stringArray                  namespaces to exclude from the restore
       --exclude-resources stringArray                   resources to exclude from the restore, formatted as resource.group, such as storageclasses.storage.k8s.io
       --from-backup string                              backup to restore from
+      --from-schedule string                            schedule to restore from
   -h, --help                                            help for create
       --include-cluster-resources optionalBool[=true]   include cluster-scoped resources in the restore
       --include-namespaces stringArray                  namespaces to include in the restore (use '*' for all namespaces) (default *)
@@ -24,6 +24,11 @@ type RestoreSpec struct {
     // from.
     BackupName string `json:"backupName"`

+    // ScheduleName is the unique name of the Ark schedule to restore
+    // from. If specified, and BackupName is empty, Ark will restore
+    // from the most recent successful backup created from this schedule.
+    ScheduleName string `json:"scheduleName,omitempty"`
+
     // IncludedNamespaces is a slice of namespace names to include objects
     // from. If empty, all namespaces are included.
     IncludedNamespaces []string `json:"includedNamespaces"`
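For clients that build Restore objects directly against the API rather than through the CLI, the new field behaves the same way: set `ScheduleName` and leave `BackupName` empty for the server to fill in. A minimal sketch, assuming `github.com/heptio/ark/pkg/apis/ark/v1` as the import path for the types shown in the hunk above:

```go
package main

import (
    "fmt"

    api "github.com/heptio/ark/pkg/apis/ark/v1" // assumed import path for these types
)

func main() {
    // What the CLI builds when --from-schedule is used: ScheduleName is set,
    // BackupName stays empty and is filled in server-side by the restore
    // controller with the schedule's most recent successful backup.
    spec := api.RestoreSpec{
        ScheduleName:       "schedule-1",
        IncludedNamespaces: []string{"*"},
    }

    fmt.Printf("backup=%q schedule=%q namespaces=%v\n",
        spec.BackupName, spec.ScheduleName, spec.IncludedNamespaces)
}
```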
@@ -38,13 +38,17 @@ func NewCreateCommand(f client.Factory, use string) *cobra.Command {
     o := NewCreateOptions()

     c := &cobra.Command{
-        Use:   use + " [RESTORE_NAME] --from-backup BACKUP_NAME",
+        Use:   use + " [RESTORE_NAME] [--from-backup BACKUP_NAME | --from-schedule SCHEDULE_NAME]",
         Short: "Create a restore",
         Example: `  # create a restore named "restore-1" from backup "backup-1"
   ark restore create restore-1 --from-backup backup-1

   # create a restore with a default name ("backup-1-<timestamp>") from backup "backup-1"
-  ark restore create --from-backup backup-1`,
+  ark restore create --from-backup backup-1
+
+  # create a restore from the latest successful backup triggered by schedule "schedule-1"
+  ark restore create --from-schedule schedule-1
+`,
         Args: cobra.MaximumNArgs(1),
         Run: func(c *cobra.Command, args []string) {
             cmd.CheckError(o.Complete(args, f))
@@ -62,6 +66,7 @@ func NewCreateCommand(f client.Factory, use string) *cobra.Command {

 type CreateOptions struct {
     BackupName         string
+    ScheduleName       string
     RestoreName        string
     RestoreVolumes     flag.OptionalBool
     Labels             flag.Map
@@ -88,6 +93,7 @@ func NewCreateOptions() *CreateOptions {

 func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
     flags.StringVar(&o.BackupName, "from-backup", "", "backup to restore from")
+    flags.StringVar(&o.ScheduleName, "from-schedule", "", "schedule to restore from")
     flags.Var(&o.IncludeNamespaces, "include-namespaces", "namespaces to include in the restore (use '*' for all namespaces)")
     flags.Var(&o.ExcludeNamespaces, "exclude-namespaces", "namespaces to exclude from the restore")
     flags.Var(&o.NamespaceMappings, "namespace-mappings", "namespace mappings from name in the backup to desired restored name in the form src1:dst1,src2:dst2,...")
@@ -104,9 +110,34 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
     f.NoOptDefVal = "true"
 }

+func (o *CreateOptions) Complete(args []string, f client.Factory) error {
+    if len(args) == 1 {
+        o.RestoreName = args[0]
+    } else {
+        sourceName := o.BackupName
+        if o.ScheduleName != "" {
+            sourceName = o.ScheduleName
+        }
+
+        o.RestoreName = fmt.Sprintf("%s-%s", sourceName, time.Now().Format("20060102150405"))
+    }
+
+    client, err := f.Client()
+    if err != nil {
+        return err
+    }
+    o.client = client
+
+    return nil
+}
+
 func (o *CreateOptions) Validate(c *cobra.Command, args []string, f client.Factory) error {
-    if len(o.BackupName) == 0 {
-        return errors.New("--from-backup is required")
+    if o.BackupName != "" && o.ScheduleName != "" {
+        return errors.New("either a backup or schedule must be specified, but not both")
+    }
+
+    if o.BackupName == "" && o.ScheduleName == "" {
+        return errors.New("either a backup or schedule must be specified, but not both")
     }

     if err := output.ValidateFlags(c); err != nil {
@@ -118,29 +149,20 @@ func (o *CreateOptions) Validate(c *cobra.Command, args []string, f client.Factory) error {
         return errors.New("Ark client is not set; unable to proceed")
     }

-    if _, err := o.client.ArkV1().Backups(f.Namespace()).Get(o.BackupName, metav1.GetOptions{}); err != nil {
-        return err
+    switch {
+    case o.BackupName != "":
+        if _, err := o.client.ArkV1().Backups(f.Namespace()).Get(o.BackupName, metav1.GetOptions{}); err != nil {
+            return err
+        }
+    case o.ScheduleName != "":
+        if _, err := o.client.ArkV1().Schedules(f.Namespace()).Get(o.ScheduleName, metav1.GetOptions{}); err != nil {
+            return err
+        }
     }

     return nil
 }

-func (o *CreateOptions) Complete(args []string, f client.Factory) error {
-    if len(args) == 1 {
-        o.RestoreName = args[0]
-    } else {
-        o.RestoreName = fmt.Sprintf("%s-%s", o.BackupName, time.Now().Format("20060102150405"))
-    }
-
-    client, err := f.Client()
-    if err != nil {
-        return err
-    }
-    o.client = client
-
-    return nil
-}
-
 func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
     if o.client == nil {
         // This should never happen
@@ -155,6 +177,7 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
         },
         Spec: api.RestoreSpec{
             BackupName:         o.BackupName,
+            ScheduleName:       o.ScheduleName,
             IncludedNamespaces: o.IncludeNamespaces,
             ExcludedNamespaces: o.ExcludeNamespaces,
             IncludedResources:  o.IncludeResources,
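The CLI-side rule added above is strict mutual exclusion: exactly one of `--from-backup` and `--from-schedule` must be set. A standalone sketch of that check, using the same pflag library the command binds its flags with (the flag-set name and sample arguments are illustrative):

```go
package main

import (
    "errors"
    "fmt"

    "github.com/spf13/pflag" // the flag library the command above uses
)

func main() {
    var fromBackup, fromSchedule string
    fs := pflag.NewFlagSet("restore", pflag.ContinueOnError)
    fs.StringVar(&fromBackup, "from-backup", "", "backup to restore from")
    fs.StringVar(&fromSchedule, "from-schedule", "", "schedule to restore from")
    _ = fs.Parse([]string{"--from-schedule", "schedule-1"})

    // Mirror of the mutual-exclusion rule in Validate: the two emptiness
    // checks are equal exactly when both flags are set or both are empty.
    if (fromBackup == "") == (fromSchedule == "") {
        fmt.Println(errors.New("either a backup or schedule must be specified, but not both"))
        return
    }
    fmt.Println("source:", fromBackup+fromSchedule) // source: schedule-1
}
```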
@@ -24,6 +24,7 @@ import (
     "io"
     "io/ioutil"
     "os"
+    "sort"
     "sync"
     "time"

@@ -32,6 +33,7 @@ import (
     "github.com/sirupsen/logrus"

     apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
@@ -45,6 +47,7 @@ import (
     listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
     "github.com/heptio/ark/pkg/plugin"
     "github.com/heptio/ark/pkg/restore"
+    "github.com/heptio/ark/pkg/util/boolptr"
     "github.com/heptio/ark/pkg/util/collections"
     kubeutil "github.com/heptio/ark/pkg/util/kube"
 )
@@ -147,35 +150,35 @@ func NewRestoreController(
 // Run is a blocking function that runs the specified number of worker goroutines
 // to process items in the work queue. It will return when it receives on the
 // ctx.Done() channel.
-func (controller *restoreController) Run(ctx context.Context, numWorkers int) error {
+func (c *restoreController) Run(ctx context.Context, numWorkers int) error {
     var wg sync.WaitGroup

     defer func() {
-        controller.logger.Info("Waiting for workers to finish their work")
+        c.logger.Info("Waiting for workers to finish their work")

-        controller.queue.ShutDown()
+        c.queue.ShutDown()

         // We have to wait here in the deferred function instead of at the bottom of the function body
         // because we have to shut down the queue in order for the workers to shut down gracefully, and
         // we want to shut down the queue via defer and not at the end of the body.
         wg.Wait()

-        controller.logger.Info("All workers have finished")
+        c.logger.Info("All workers have finished")
     }()

-    controller.logger.Info("Starting RestoreController")
-    defer controller.logger.Info("Shutting down RestoreController")
+    c.logger.Info("Starting RestoreController")
+    defer c.logger.Info("Shutting down RestoreController")

-    controller.logger.Info("Waiting for caches to sync")
-    if !cache.WaitForCacheSync(ctx.Done(), controller.backupListerSynced, controller.restoreListerSynced) {
+    c.logger.Info("Waiting for caches to sync")
+    if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced) {
         return errors.New("timed out waiting for caches to sync")
     }
-    controller.logger.Info("Caches are synced")
+    c.logger.Info("Caches are synced")

     wg.Add(numWorkers)
     for i := 0; i < numWorkers; i++ {
         go func() {
-            wait.Until(controller.runWorker, time.Second, ctx.Done())
+            wait.Until(c.runWorker, time.Second, ctx.Done())
             wg.Done()
         }()
     }
@@ -185,40 +188,40 @@ func (controller *restoreController) Run(ctx context.Context, numWorkers int) error {
     return nil
 }

-func (controller *restoreController) runWorker() {
+func (c *restoreController) runWorker() {
     // continually take items off the queue (waits if it's
     // empty) until we get a shutdown signal from the queue
-    for controller.processNextWorkItem() {
+    for c.processNextWorkItem() {
     }
 }

-func (controller *restoreController) processNextWorkItem() bool {
-    key, quit := controller.queue.Get()
+func (c *restoreController) processNextWorkItem() bool {
+    key, quit := c.queue.Get()
     if quit {
         return false
     }
     // always call done on this item, since if it fails we'll add
     // it back with rate-limiting below
-    defer controller.queue.Done(key)
+    defer c.queue.Done(key)

-    err := controller.syncHandler(key.(string))
+    err := c.syncHandler(key.(string))
     if err == nil {
         // If you had no error, tell the queue to stop tracking history for your key. This will reset
         // things like failure counts for per-item rate limiting.
-        controller.queue.Forget(key)
+        c.queue.Forget(key)
         return true
     }

-    controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
+    c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
     // we had an error processing the item so add it back
     // into the queue for re-processing with rate-limiting
-    controller.queue.AddRateLimited(key)
+    c.queue.AddRateLimited(key)

     return true
 }

-func (controller *restoreController) processRestore(key string) error {
-    logContext := controller.logger.WithField("key", key)
+func (c *restoreController) processRestore(key string) error {
+    logContext := c.logger.WithField("key", key)

     logContext.Debug("Running processRestore")
     ns, name, err := cache.SplitMetaNamespaceKey(key)
@@ -227,7 +230,7 @@ func (controller *restoreController) processRestore(key string) error {
     }

     logContext.Debug("Getting Restore")
-    restore, err := controller.restoreLister.Restores(ns).Get(name)
+    restore, err := c.restoreLister.Restores(ns).Get(name)
     if err != nil {
         return errors.Wrap(err, "error getting Restore")
     }
@@ -252,22 +255,15 @@ func (controller *restoreController) processRestore(key string) error {
     // don't modify items in the cache
     restore = restore.DeepCopy()

-    excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
-    for _, nonrestorable := range nonRestorableResources {
-        if !excludedResources.Has(nonrestorable) {
-            restore.Spec.ExcludedResources = append(restore.Spec.ExcludedResources, nonrestorable)
-        }
-    }
-
-    // validation
-    if restore.Status.ValidationErrors = controller.getValidationErrors(restore); len(restore.Status.ValidationErrors) > 0 {
+    // complete & validate restore
+    if restore.Status.ValidationErrors = c.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 {
         restore.Status.Phase = api.RestorePhaseFailedValidation
     } else {
         restore.Status.Phase = api.RestorePhaseInProgress
     }

     // update status
-    updatedRestore, err := patchRestore(original, restore, controller.restoreClient)
+    updatedRestore, err := patchRestore(original, restore, c.restoreClient)
     if err != nil {
         return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
     }
@@ -281,7 +277,7 @@ func (controller *restoreController) processRestore(key string) error {

     logContext.Debug("Running restore")
     // execution & upload of restore
-    restoreWarnings, restoreErrors := controller.runRestore(restore, controller.bucket)
+    restoreWarnings, restoreErrors := c.runRestore(restore, c.bucket)

     restore.Status.Warnings = len(restoreWarnings.Ark) + len(restoreWarnings.Cluster)
     for _, w := range restoreWarnings.Namespaces {
@@ -297,46 +293,115 @@ func (controller *restoreController) processRestore(key string) error {
     restore.Status.Phase = api.RestorePhaseCompleted

     logContext.Debug("Updating Restore final status")
-    if _, err = patchRestore(original, restore, controller.restoreClient); err != nil {
+    if _, err = patchRestore(original, restore, c.restoreClient); err != nil {
         logContext.WithError(errors.WithStack(err)).Info("Error updating Restore final status")
     }

     return nil
 }

-func (controller *restoreController) getValidationErrors(itm *api.Restore) []string {
-    var validationErrors []string
-
-    if itm.Spec.BackupName == "" {
-        validationErrors = append(validationErrors, "BackupName must be non-empty and correspond to the name of a backup in object storage.")
-    } else if _, err := controller.fetchBackup(controller.bucket, itm.Spec.BackupName); err != nil {
-        validationErrors = append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
+func (c *restoreController) completeAndValidate(restore *api.Restore) []string {
+    // add non-restorable resources to restore's excluded resources
+    excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
+    for _, nonrestorable := range nonRestorableResources {
+        if !excludedResources.Has(nonrestorable) {
+            restore.Spec.ExcludedResources = append(restore.Spec.ExcludedResources, nonrestorable)
+        }
     }

-    includedResources := sets.NewString(itm.Spec.IncludedResources...)
+    var validationErrors []string
+
+    // validate that included resources don't contain any non-restorable resources
+    includedResources := sets.NewString(restore.Spec.IncludedResources...)
     for _, nonRestorableResource := range nonRestorableResources {
         if includedResources.Has(nonRestorableResource) {
             validationErrors = append(validationErrors, fmt.Sprintf("%v are non-restorable resources", nonRestorableResource))
         }
     }

-    for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedNamespaces, itm.Spec.ExcludedNamespaces) {
-        validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
-    }
-
-    for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) {
+    // validate included/excluded resources
+    for _, err := range collections.ValidateIncludesExcludes(restore.Spec.IncludedResources, restore.Spec.ExcludedResources) {
         validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
     }

-    if !controller.pvProviderExists && itm.Spec.RestorePVs != nil && *itm.Spec.RestorePVs {
+    // validate included/excluded namespaces
+    for _, err := range collections.ValidateIncludesExcludes(restore.Spec.IncludedNamespaces, restore.Spec.ExcludedNamespaces) {
+        validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
+    }
+
+    // validate that PV provider exists if we're restoring PVs
+    if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !c.pvProviderExists {
         validationErrors = append(validationErrors, "Server is not configured for PV snapshot restores")
     }

+    // validate that exactly one of BackupName and ScheduleName have been specified
+    if !backupXorScheduleProvided(restore) {
+        return append(validationErrors, "Either a backup or schedule must be specified as a source for the restore, but not both")
+    }
+
+    // if ScheduleName is specified, fill in BackupName with the most recent successful backup from
+    // the schedule
+    if restore.Spec.ScheduleName != "" {
+        selector := labels.SelectorFromSet(labels.Set(map[string]string{
+            "ark-schedule": restore.Spec.ScheduleName,
+        }))
+
+        backups, err := c.backupLister.Backups(c.namespace).List(selector)
+        if err != nil {
+            return append(validationErrors, "Unable to list backups for schedule")
+        }
+        if len(backups) == 0 {
+            return append(validationErrors, "No backups found for schedule")
+        }
+
+        if backup := mostRecentCompletedBackup(backups); backup != nil {
+            restore.Spec.BackupName = backup.Name
+        } else {
+            return append(validationErrors, "No completed backups found for schedule")
+        }
+    }
+
+    // validate that we can fetch the source backup
+    if _, err := c.fetchBackup(c.bucket, restore.Spec.BackupName); err != nil {
+        return append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
+    }
+
     return validationErrors
 }

-func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
-    backup, err := controller.backupLister.Backups(controller.namespace).Get(name)
+// backupXorScheduleProvided returns true if exactly one of BackupName and
+// ScheduleName are non-empty for the restore, or false otherwise.
+func backupXorScheduleProvided(restore *api.Restore) bool {
+    if restore.Spec.BackupName != "" && restore.Spec.ScheduleName != "" {
+        return false
+    }
+
+    if restore.Spec.BackupName == "" && restore.Spec.ScheduleName == "" {
+        return false
+    }
+
+    return true
+}
+
+// mostRecentCompletedBackup returns the most recent backup that's
+// completed from a list of backups.
+func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
+    sort.Slice(backups, func(i, j int) bool {
+        // Use .After() because we want descending sort.
+        return backups[i].Status.StartTimestamp.After(backups[j].Status.StartTimestamp.Time)
+    })
+
+    for _, backup := range backups {
+        if backup.Status.Phase == api.BackupPhaseCompleted {
+            return backup
+        }
+    }
+
+    return nil
+}
+
+func (c *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
+    backup, err := c.backupLister.Backups(c.namespace).Get(name)
     if err == nil {
         return backup, nil
     }
@@ -345,10 +410,10 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
         return nil, errors.WithStack(err)
     }

-    logContext := controller.logger.WithField("backupName", name)
+    logContext := c.logger.WithField("backupName", name)

     logContext.Debug("Backup not found in backupLister, checking object storage directly")
-    backup, err = controller.backupService.GetBackup(bucket, name)
+    backup, err = c.backupService.GetBackup(bucket, name)
     if err != nil {
         return nil, err
     }
@@ -358,7 +423,7 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
     // Clear out the namespace too, just in case
     backup.Namespace = ""

-    created, createErr := controller.backupClient.Backups(controller.namespace).Create(backup)
+    created, createErr := c.backupClient.Backups(c.namespace).Create(backup)
     if createErr != nil {
         logContext.WithError(errors.WithStack(createErr)).Error("Unable to create API object for Backup")
     } else {
@@ -368,14 +433,14 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
     return backup, nil
 }

-func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
-    logContext := controller.logger.WithFields(
+func (c *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
+    logContext := c.logger.WithFields(
         logrus.Fields{
             "restore": kubeutil.NamespaceAndName(restore),
             "backup":  restore.Spec.BackupName,
         })

-    backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName)
+    backup, err := c.fetchBackup(bucket, restore.Spec.BackupName)
     if err != nil {
         logContext.WithError(err).Error("Error getting backup")
         restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
@@ -384,7 +449,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {

     var tempFiles []*os.File

-    backupFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger)
+    backupFile, err := downloadToTempFile(restore.Spec.BackupName, c.backupService, bucket, c.logger)
     if err != nil {
         logContext.WithError(err).Error("Error downloading backup")
         restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
@@ -420,15 +485,15 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
         }
     }()

-    actions, err := controller.pluginManager.GetRestoreItemActions(restore.Name)
+    actions, err := c.pluginManager.GetRestoreItemActions(restore.Name)
     if err != nil {
         restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
         return
     }
-    defer controller.pluginManager.CloseRestoreItemActions(restore.Name)
+    defer c.pluginManager.CloseRestoreItemActions(restore.Name)

     logContext.Info("starting restore")
-    restoreWarnings, restoreErrors = controller.restorer.Restore(restore, backup, backupFile, logFile, actions)
+    restoreWarnings, restoreErrors = c.restorer.Restore(restore, backup, backupFile, logFile, actions)
     logContext.Info("restore completed")

     // Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors.
@@ -439,7 +504,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
         return
     }

-    if err := controller.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil {
+    if err := c.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil {
        restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err))
     }
@@ -460,7 +525,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
         logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0")
         return
     }
-    if err := controller.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
+    if err := c.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
         logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage")
     }
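The controller resolves a schedule to its backups with a label selector on the `ark-schedule` label, which the test fixtures below attach via `WithLabel`. A minimal sketch of just the selector matching, assuming only the upstream apimachinery labels package:

```go
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // completeAndValidate builds this selector from the restore's
    // ScheduleName and lists backups with it; backups created from a
    // schedule carry the matching ark-schedule label.
    selector := labels.SelectorFromSet(labels.Set(map[string]string{
        "ark-schedule": "schedule-1",
    }))

    // A backup's labels as a labels.Set, which satisfies the Labels interface.
    backupLabels := labels.Set(map[string]string{"ark-schedule": "schedule-1"})
    fmt.Println(selector.Matches(backupLabels)) // true

    other := labels.Set(map[string]string{"ark-schedule": "schedule-2"})
    fmt.Println(selector.Matches(other)) // false
}
```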
@@ -23,11 +23,13 @@ import (
     "io"
     "io/ioutil"
     "testing"
+    "time"

     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/require"

     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     core "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
@@ -172,13 +174,32 @@ func TestProcessRestore(t *testing.T) {
             expectedValidationErrors: []string{"Invalid included/excluded resource lists: excludes list cannot contain an item in the includes list: a-resource"},
         },
         {
-            name:                     "new restore with empty backup name fails validation",
+            name:                     "new restore with empty backup and schedule names fails validation",
             restore:                  NewRestore("foo", "bar", "", "ns-1", "", api.RestorePhaseNew).Restore,
             expectedErr:              false,
             expectedPhase:            string(api.RestorePhaseFailedValidation),
-            expectedValidationErrors: []string{"BackupName must be non-empty and correspond to the name of a backup in object storage."},
+            expectedValidationErrors: []string{"Either a backup or schedule must be specified as a source for the restore, but not both"},
+        },
+        {
+            name:                     "new restore with backup and schedule names provided fails validation",
+            restore:                  NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseNew).WithSchedule("sched-1").Restore,
+            expectedErr:              false,
+            expectedPhase:            string(api.RestorePhaseFailedValidation),
+            expectedValidationErrors: []string{"Either a backup or schedule must be specified as a source for the restore, but not both"},
+        },
+        {
+            name:    "valid restore with schedule name gets executed",
+            restore: NewRestore("foo", "bar", "", "ns-1", "", api.RestorePhaseNew).WithSchedule("sched-1").Restore,
+            backup: arktest.
+                NewTestBackup().
+                WithName("backup-1").
+                WithLabel("ark-schedule", "sched-1").
+                WithPhase(api.BackupPhaseCompleted).
+                Backup,
+            expectedErr:          false,
+            expectedPhase:        string(api.RestorePhaseInProgress),
+            expectedRestorerCall: NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).WithSchedule("sched-1").Restore,
         },
+
         {
             name:    "restore with non-existent backup name fails",
             restore: NewRestore("foo", "bar", "backup-1", "ns-1", "*", api.RestorePhaseNew).Restore,
@@ -337,6 +358,10 @@ func TestProcessRestore(t *testing.T) {

             res.Status.Phase = api.RestorePhase(phase)

+            if backupName, err := collections.GetString(patchMap, "spec.backupName"); err == nil {
+                res.Spec.BackupName = backupName
+            }
+
             return true, res, nil
         })
     }
@@ -356,8 +381,8 @@ func TestProcessRestore(t *testing.T) {
                 downloadedBackup := ioutil.NopCloser(bytes.NewReader([]byte("hello world")))
                 backupSvc.On("DownloadBackup", mock.Anything, mock.Anything).Return(downloadedBackup, nil)
                 restorer.On("Restore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors)
-                backupSvc.On("UploadRestoreLog", "bucket", test.restore.Spec.BackupName, test.restore.Name, mock.Anything).Return(test.uploadLogError)
-                backupSvc.On("UploadRestoreResults", "bucket", test.restore.Spec.BackupName, test.restore.Name, mock.Anything).Return(nil)
+                backupSvc.On("UploadRestoreLog", "bucket", test.backup.Name, test.restore.Name, mock.Anything).Return(test.uploadLogError)
+                backupSvc.On("UploadRestoreResults", "bucket", test.backup.Name, test.restore.Name, mock.Anything).Return(nil)
             }

             var (
@@ -372,7 +397,7 @@ func TestProcessRestore(t *testing.T) {
             }

             if test.backupServiceGetBackupError != nil {
-                backupSvc.On("GetBackup", "bucket", test.restore.Spec.BackupName).Return(nil, test.backupServiceGetBackupError)
+                backupSvc.On("GetBackup", "bucket", mock.Anything).Return(nil, test.backupServiceGetBackupError)
             }

             if test.restore != nil {
@@ -394,6 +419,10 @@ func TestProcessRestore(t *testing.T) {
             }

             // structs and func for decoding patch content
+            type SpecPatch struct {
+                BackupName string `json:"backupName"`
+            }
+
             type StatusPatch struct {
                 Phase            api.RestorePhase `json:"phase"`
                 ValidationErrors []string         `json:"validationErrors"`
@@ -401,6 +430,7 @@ func TestProcessRestore(t *testing.T) {
             }

             type Patch struct {
+                Spec   SpecPatch   `json:"spec,omitempty"`
                 Status StatusPatch `json:"status"`
             }
@@ -421,6 +451,12 @@ func TestProcessRestore(t *testing.T) {
                 },
             }

+            if test.restore.Spec.ScheduleName != "" && test.backup != nil {
+                expected.Spec = SpecPatch{
+                    BackupName: test.backup.Name,
+                }
+            }
+
             arktest.ValidatePatch(t, actions[0], expected, decode)

             // if we don't expect a restore, validate it wasn't called and exit the test
@@ -450,6 +486,178 @@ func TestProcessRestore(t *testing.T) {
     }
 }

+func TestCompleteAndValidateWhenScheduleNameSpecified(t *testing.T) {
+    var (
+        client          = fake.NewSimpleClientset()
+        sharedInformers = informers.NewSharedInformerFactory(client, 0)
+        logger          = arktest.NewLogger()
+    )
+
+    c := NewRestoreController(
+        api.DefaultNamespace,
+        sharedInformers.Ark().V1().Restores(),
+        client.ArkV1(),
+        client.ArkV1(),
+        nil,
+        nil,
+        "bucket",
+        sharedInformers.Ark().V1().Backups(),
+        false,
+        logger,
+        nil,
+    ).(*restoreController)
+
+    restore := &api.Restore{
+        ObjectMeta: metav1.ObjectMeta{
+            Namespace: api.DefaultNamespace,
+            Name:      "restore-1",
+        },
+        Spec: api.RestoreSpec{
+            ScheduleName: "schedule-1",
+        },
+    }
+
+    // no backups created from the schedule: fail validation
+    require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(arktest.
+        NewTestBackup().
+        WithName("backup-1").
+        WithLabel("ark-schedule", "non-matching-schedule").
+        WithPhase(api.BackupPhaseCompleted).
+        Backup,
+    ))
+
+    errs := c.completeAndValidate(restore)
+    assert.Equal(t, []string{"No backups found for schedule"}, errs)
+    assert.Empty(t, restore.Spec.BackupName)
+
+    // no completed backups created from the schedule: fail validation
+    require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(arktest.
+        NewTestBackup().
+        WithName("backup-2").
+        WithLabel("ark-schedule", "schedule-1").
+        WithPhase(api.BackupPhaseInProgress).
+        Backup,
+    ))
+
+    errs = c.completeAndValidate(restore)
+    assert.Equal(t, []string{"No completed backups found for schedule"}, errs)
+    assert.Empty(t, restore.Spec.BackupName)
+
+    // multiple completed backups created from the schedule: use most recent
+    now := time.Now()
+
+    require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(arktest.
+        NewTestBackup().
+        WithName("foo").
+        WithLabel("ark-schedule", "schedule-1").
+        WithPhase(api.BackupPhaseCompleted).
+        WithStartTimestamp(now).
+        Backup,
+    ))
+    require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(arktest.
+        NewTestBackup().
+        WithName("bar").
+        WithLabel("ark-schedule", "schedule-1").
+        WithPhase(api.BackupPhaseCompleted).
+        WithStartTimestamp(now.Add(time.Second)).
+        Backup,
+    ))
+
+    errs = c.completeAndValidate(restore)
+    assert.Nil(t, errs)
+    assert.Equal(t, "bar", restore.Spec.BackupName)
+}
+
+func TestBackupXorScheduleProvided(t *testing.T) {
+    r := &api.Restore{}
+    assert.False(t, backupXorScheduleProvided(r))
+
+    r.Spec.BackupName = "backup-1"
+    r.Spec.ScheduleName = "schedule-1"
+    assert.False(t, backupXorScheduleProvided(r))
+
+    r.Spec.BackupName = "backup-1"
+    r.Spec.ScheduleName = ""
+    assert.True(t, backupXorScheduleProvided(r))
+
+    r.Spec.BackupName = ""
+    r.Spec.ScheduleName = "schedule-1"
+    assert.True(t, backupXorScheduleProvided(r))
+}
+
+func TestMostRecentCompletedBackup(t *testing.T) {
+    backups := []*api.Backup{
+        {
+            ObjectMeta: metav1.ObjectMeta{
+                Name: "a",
+            },
+            Status: api.BackupStatus{
+                Phase: "",
+            },
+        },
+        {
+            ObjectMeta: metav1.ObjectMeta{
+                Name: "b",
+            },
+            Status: api.BackupStatus{
+                Phase: api.BackupPhaseNew,
+            },
+        },
+        {
+            ObjectMeta: metav1.ObjectMeta{
+                Name: "c",
+            },
+            Status: api.BackupStatus{
+                Phase: api.BackupPhaseInProgress,
+            },
+        },
+        {
+            ObjectMeta: metav1.ObjectMeta{
+                Name: "d",
+            },
+            Status: api.BackupStatus{
+                Phase: api.BackupPhaseFailedValidation,
+            },
+        },
+        {
+            ObjectMeta: metav1.ObjectMeta{
+                Name: "e",
+            },
+            Status: api.BackupStatus{
+                Phase: api.BackupPhaseFailed,
+            },
+        },
+    }
+
+    assert.Nil(t, mostRecentCompletedBackup(backups))
+
+    now := time.Now()
+
+    backups = append(backups, &api.Backup{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "foo",
+        },
+        Status: api.BackupStatus{
+            Phase:          api.BackupPhaseCompleted,
+            StartTimestamp: metav1.Time{Time: now},
+        },
+    })
+
+    expected := &api.Backup{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "bar",
+        },
+        Status: api.BackupStatus{
+            Phase:          api.BackupPhaseCompleted,
+            StartTimestamp: metav1.Time{Time: now.Add(time.Second)},
+        },
+    }
+    backups = append(backups, expected)
+
+    assert.Equal(t, expected, mostRecentCompletedBackup(backups))
+}
+
 func NewRestore(ns, name, backup, includeNS, includeResource string, phase api.RestorePhase) *arktest.TestRestore {
     restore := arktest.NewTestRestore(ns, name, phase).WithBackup(backup)
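The patch assertions above decode the JSON merge patch the controller submits; because the controller now writes `spec.backupName` once a schedule is resolved, the expected patch gains a `Spec` section. A standalone sketch of that decoding shape (the sample JSON payload is illustrative):

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Mirrors the patch-decoding structs the test adds: the controller patches
// spec.backupName (when resolved from a schedule) alongside status fields.
type specPatch struct {
    BackupName string `json:"backupName"`
}

type statusPatch struct {
    Phase string `json:"phase"`
}

type patch struct {
    Spec   specPatch   `json:"spec,omitempty"`
    Status statusPatch `json:"status"`
}

func main() {
    raw := []byte(`{"spec":{"backupName":"backup-1"},"status":{"phase":"InProgress"}}`)
    var p patch
    if err := json.Unmarshal(raw, &p); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", p) // {Spec:{BackupName:backup-1} Status:{Phase:InProgress}}
}
```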
@@ -130,3 +130,8 @@ func (b *TestBackup) WithFinalizers(finalizers ...string) *TestBackup {

     return b
 }
+
+func (b *TestBackup) WithStartTimestamp(startTime time.Time) *TestBackup {
+    b.Status.StartTimestamp = metav1.Time{Time: startTime}
+    return b
+}
@@ -65,6 +65,11 @@ func (r *TestRestore) WithBackup(name string) *TestRestore {
     return r
 }

+func (r *TestRestore) WithSchedule(name string) *TestRestore {
+    r.Spec.ScheduleName = name
+    return r
+}
+
 func (r *TestRestore) WithErrors(i int) *TestRestore {
     r.Status.Errors = i
     return r