/*
Copyright 2017 the Heptio Ark contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"sort"
	"sync"
	"time"

	jsonpatch "github.com/evanphx/json-patch"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	api "github.com/heptio/ark/pkg/apis/ark/v1"
	"github.com/heptio/ark/pkg/cloudprovider"
	arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
	informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
	listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
	"github.com/heptio/ark/pkg/metrics"
	"github.com/heptio/ark/pkg/plugin"
	"github.com/heptio/ark/pkg/restore"
	"github.com/heptio/ark/pkg/util/boolptr"
	"github.com/heptio/ark/pkg/util/collections"
	kubeutil "github.com/heptio/ark/pkg/util/kube"
)

// nonRestorableResources is a blacklist for the restoration process. Any
// resources listed here are always excluded from restores.
var nonRestorableResources = []string{
	"nodes",
	"events",
	"events.events.k8s.io",

	// Don't ever restore backups - if appropriate, they'll be synced in from object storage.
	// https://github.com/heptio/ark/issues/622
	"backups.ark.heptio.com",

	// Restores are cluster-specific, and don't have value moving across clusters.
	// https://github.com/heptio/ark/issues/622
	"restores.ark.heptio.com",
}

type restoreController struct {
	namespace           string
	restoreClient       arkv1client.RestoresGetter
	backupClient        arkv1client.BackupsGetter
	restorer            restore.Restorer
	backupService       cloudprovider.BackupService
	bucket              string
	pvProviderExists    bool
	backupLister        listers.BackupLister
	backupListerSynced  cache.InformerSynced
	restoreLister       listers.RestoreLister
	restoreListerSynced cache.InformerSynced
	syncHandler         func(restoreName string) error
	queue               workqueue.RateLimitingInterface
	logger              logrus.FieldLogger
	pluginManager       plugin.Manager
	metrics             *metrics.ServerMetrics
}
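
// NewRestoreController returns a controller that watches for new Restore API
// objects and processes them; only restores in the New (or empty) phase are
// ever enqueued.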
func NewRestoreController(
	namespace string,
	restoreInformer informers.RestoreInformer,
	restoreClient arkv1client.RestoresGetter,
	backupClient arkv1client.BackupsGetter,
	restorer restore.Restorer,
	backupService cloudprovider.BackupService,
	bucket string,
	backupInformer informers.BackupInformer,
	pvProviderExists bool,
	logger logrus.FieldLogger,
	pluginManager plugin.Manager,
	metrics *metrics.ServerMetrics,
) Interface {
	c := &restoreController{
		namespace:           namespace,
		restoreClient:       restoreClient,
		backupClient:        backupClient,
		restorer:            restorer,
		backupService:       backupService,
		bucket:              bucket,
		pvProviderExists:    pvProviderExists,
		backupLister:        backupInformer.Lister(),
		backupListerSynced:  backupInformer.Informer().HasSynced,
		restoreLister:       restoreInformer.Lister(),
		restoreListerSynced: restoreInformer.Informer().HasSynced,
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "restore"),
		logger:              logger,
		pluginManager:       pluginManager,
		metrics:             metrics,
	}

	c.syncHandler = c.processRestore
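
	// enqueue restores only on add; this controller drives all subsequent
	// phase transitions itself, so no update handler is registered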
	restoreInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				restore := obj.(*api.Restore)

				switch restore.Status.Phase {
				case "", api.RestorePhaseNew:
					// only process new restores
				default:
					c.logger.WithFields(logrus.Fields{
						"restore": kubeutil.NamespaceAndName(restore),
						"phase":   restore.Status.Phase,
					}).Debug("Restore is not new, skipping")
					return
				}

				key, err := cache.MetaNamespaceKeyFunc(restore)
				if err != nil {
					c.logger.WithError(errors.WithStack(err)).WithField("restore", restore).Error("Error creating queue key, item not added to queue")
					return
				}
				c.queue.Add(key)
			},
		},
	)

	return c
}

// Run is a blocking function that runs the specified number of worker goroutines
// to process items in the work queue. It will return when it receives on the
// ctx.Done() channel.
func (c *restoreController) Run(ctx context.Context, numWorkers int) error {
	var wg sync.WaitGroup

	defer func() {
		c.logger.Info("Waiting for workers to finish their work")

		c.queue.ShutDown()

		// We have to wait here in the deferred function instead of at the bottom of the function body
		// because we have to shut down the queue in order for the workers to shut down gracefully, and
		// we want to shut down the queue via defer and not at the end of the body.
		wg.Wait()

		c.logger.Info("All workers have finished")
	}()

	c.logger.Info("Starting RestoreController")
	defer c.logger.Info("Shutting down RestoreController")

	c.logger.Info("Waiting for caches to sync")
	if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced) {
		return errors.New("timed out waiting for caches to sync")
	}
	c.logger.Info("Caches are synced")

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			wait.Until(c.runWorker, time.Second, ctx.Done())
			wg.Done()
		}()
	}

	<-ctx.Done()

	return nil
}
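
// runWorker processes items from the work queue until the queue shuts down.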
func (c *restoreController) runWorker() {
	// continually take items off the queue (waits if it's
	// empty) until we get a shutdown signal from the queue
	for c.processNextWorkItem() {
	}
}
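
// processNextWorkItem dequeues a single key and hands it to the sync handler,
// re-adding it to the queue with rate-limiting if the handler returns an error.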
func (c *restoreController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// always call done on this item, since if it fails we'll add
	// it back with rate-limiting below
	defer c.queue.Done(key)

	err := c.syncHandler(key.(string))
	if err == nil {
		// If we had no error, tell the queue to stop tracking history for this key. This will reset
		// things like failure counts for per-item rate limiting.
		c.queue.Forget(key)
		return true
	}

	c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
	// we had an error processing the item so add it back
	// into the queue for re-processing with rate-limiting
	c.queue.AddRateLimited(key)

	return true
}
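
// processRestore is the sync handler for a single queue key. It validates the
// Restore, marks it InProgress (or FailedValidation), executes it, and records
// the final phase, warning/error counts, and metrics.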
func (c *restoreController) processRestore(key string) error {
	logContext := c.logger.WithField("key", key)

	logContext.Debug("Running processRestore")
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return errors.Wrap(err, "error splitting queue key")
	}

	logContext.Debug("Getting Restore")
	restore, err := c.restoreLister.Restores(ns).Get(name)
	if err != nil {
		return errors.Wrap(err, "error getting Restore")
	}

	// TODO I think this is now unnecessary. We only initially place
	// items with Phase = ("" | New) into the queue. Items will only get
	// re-queued if syncHandler returns an error, which will only
	// happen if there's an error updating Phase from its initial
	// state to something else. So any time an item is re-queued, it will
	// still have its initial state, which we've already confirmed
	// is ("" | New).
	switch restore.Status.Phase {
	case "", api.RestorePhaseNew:
		// only process new restores
	default:
		return nil
	}

	logContext.Debug("Cloning Restore")
	// store ref to original for creating patch
	original := restore
	// don't modify items in the cache
	restore = restore.DeepCopy()

	// fetch the backup (if it exists yet) so the originating schedule, if any,
	// can be recorded in the metrics below; fetch errors surface during
	// validation, which re-fetches the backup after resolving any ScheduleName
	backup, _ := c.fetchBackup(c.bucket, restore.Spec.BackupName)
	backupScheduleName := ""
	if backup != nil {
		backupScheduleName = backup.GetLabels()["ark-schedule"]
	}
	// Register attempts before we do validation so we can get better tracking
	c.metrics.RegisterRestoreAttempt(backupScheduleName)

	// validation
	if restore.Status.ValidationErrors = c.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 {
		restore.Status.Phase = api.RestorePhaseFailedValidation
	} else {
		restore.Status.Phase = api.RestorePhaseInProgress
	}

	// update status
	updatedRestore, err := patchRestore(original, restore, c.restoreClient)
	if err != nil {
		return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
	}
	// store ref to just-updated item for creating patch
	original = updatedRestore
	restore = updatedRestore.DeepCopy()

	if restore.Status.Phase == api.RestorePhaseFailedValidation {
		c.metrics.RegisterRestoreValidationFailed(backupScheduleName)
		return nil
	}

	logContext.Debug("Running restore")
	// execution & upload of restore
	restoreWarnings, restoreErrors := c.runRestore(restore, c.bucket, backup)

	restore.Status.Warnings = len(restoreWarnings.Ark) + len(restoreWarnings.Cluster)
	for _, w := range restoreWarnings.Namespaces {
		restore.Status.Warnings += len(w)
	}

	restore.Status.Errors = len(restoreErrors.Ark) + len(restoreErrors.Cluster)
	for _, e := range restoreErrors.Namespaces {
		restore.Status.Errors += len(e)
	}

	if restore.Status.Errors > 0 {
		c.metrics.RegisterRestoreIncomplete(backupScheduleName)
	} else {
		c.metrics.RegisterRestoreSuccess(backupScheduleName)
	}

	logContext.Debug("restore completed")
	restore.Status.Phase = api.RestorePhaseCompleted

	logContext.Debug("Updating Restore final status")
	if _, err = patchRestore(original, restore, c.restoreClient); err != nil {
		logContext.WithError(errors.WithStack(err)).Info("Error updating Restore final status")
	}

	return nil
}
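
// completeAndValidate defaults and validates the Restore's spec: it appends the
// always-excluded resources, checks the include/exclude lists, resolves a
// ScheduleName into the most recent completed backup from that schedule, and
// verifies that the source backup can be fetched.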
func (c *restoreController) completeAndValidate(restore *api.Restore) []string {
	// add non-restorable resources to restore's excluded resources
	excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
	for _, nonrestorable := range nonRestorableResources {
		if !excludedResources.Has(nonrestorable) {
			restore.Spec.ExcludedResources = append(restore.Spec.ExcludedResources, nonrestorable)
		}
	}

	var validationErrors []string

	// validate that included resources don't contain any non-restorable resources
	includedResources := sets.NewString(restore.Spec.IncludedResources...)
	for _, nonRestorableResource := range nonRestorableResources {
		if includedResources.Has(nonRestorableResource) {
			validationErrors = append(validationErrors, fmt.Sprintf("%v are non-restorable resources", nonRestorableResource))
		}
	}

	// validate included/excluded resources
	for _, err := range collections.ValidateIncludesExcludes(restore.Spec.IncludedResources, restore.Spec.ExcludedResources) {
		validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
	}

	// validate included/excluded namespaces
	for _, err := range collections.ValidateIncludesExcludes(restore.Spec.IncludedNamespaces, restore.Spec.ExcludedNamespaces) {
		validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
	}

	// validate that PV provider exists if we're restoring PVs
	if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !c.pvProviderExists {
		validationErrors = append(validationErrors, "Server is not configured for PV snapshot restores")
	}

	// validate that exactly one of BackupName and ScheduleName has been specified
	if !backupXorScheduleProvided(restore) {
		return append(validationErrors, "Either a backup or schedule must be specified as a source for the restore, but not both")
	}

	// if ScheduleName is specified, fill in BackupName with the most recent successful backup from
	// the schedule
	if restore.Spec.ScheduleName != "" {
		selector := labels.SelectorFromSet(labels.Set(map[string]string{
			"ark-schedule": restore.Spec.ScheduleName,
		}))

		backups, err := c.backupLister.Backups(c.namespace).List(selector)
		if err != nil {
			return append(validationErrors, "Unable to list backups for schedule")
		}
		if len(backups) == 0 {
			return append(validationErrors, "No backups found for schedule")
		}

		if backup := mostRecentCompletedBackup(backups); backup != nil {
			restore.Spec.BackupName = backup.Name
		} else {
			return append(validationErrors, "No completed backups found for schedule")
		}
	}

	// validate that we can fetch the source backup
	if _, err := c.fetchBackup(c.bucket, restore.Spec.BackupName); err != nil {
		return append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
	}

	return validationErrors
}

// backupXorScheduleProvided returns true if exactly one of BackupName and
// ScheduleName is non-empty for the restore, or false otherwise.
func backupXorScheduleProvided(restore *api.Restore) bool {
	if restore.Spec.BackupName != "" && restore.Spec.ScheduleName != "" {
		return false
	}

	if restore.Spec.BackupName == "" && restore.Spec.ScheduleName == "" {
		return false
	}

	return true
}

// mostRecentCompletedBackup returns the most recent completed backup from the
// given list, or nil if the list contains no completed backups.
func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
	sort.Slice(backups, func(i, j int) bool {
		// Use .After() because we want descending sort.
		return backups[i].Status.StartTimestamp.After(backups[j].Status.StartTimestamp.Time)
	})

	for _, backup := range backups {
		if backup.Status.Phase == api.BackupPhaseCompleted {
			return backup
		}
	}

	return nil
}
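
// fetchBackup returns the named backup from the lister's cache if possible,
// falling back to object storage otherwise. A backup found only in object
// storage is recreated as an API object so subsequent lookups hit the cache.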
func (c *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
	backup, err := c.backupLister.Backups(c.namespace).Get(name)
	if err == nil {
		return backup, nil
	}

	if !apierrors.IsNotFound(err) {
		return nil, errors.WithStack(err)
	}

	logContext := c.logger.WithField("backupName", name)

	logContext.Debug("Backup not found in backupLister, checking object storage directly")
	backup, err = c.backupService.GetBackup(bucket, name)
	if err != nil {
		return nil, err
	}

	// ResourceVersion needs to be cleared in order to create the object in the API
	backup.ResourceVersion = ""
	// Clear out the namespace too, just in case
	backup.Namespace = ""

	created, createErr := c.backupClient.Backups(c.namespace).Create(backup)
	if createErr != nil {
		logContext.WithError(errors.WithStack(createErr)).Error("Unable to create API object for Backup")
	} else {
		backup = created
	}

	return backup, nil
}
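
// runRestore downloads the backup's contents to a temp file, invokes the
// restorer with any registered restore item action plugins, and uploads the
// restore's log and results to object storage. Failures are collected in the
// returned api.RestoreResult values rather than returned as an error.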
func (c *restoreController) runRestore(restore *api.Restore, bucket string, backup *api.Backup) (restoreWarnings, restoreErrors api.RestoreResult) {
	logContext := c.logger.WithFields(
		logrus.Fields{
			"restore": kubeutil.NamespaceAndName(restore),
			"backup":  restore.Spec.BackupName,
		})

	// the backup passed in from processRestore is nil when the restore was
	// created from a schedule, since it's fetched before validation resolves
	// ScheduleName into a concrete backup name, so fetch it here in that case
	if backup == nil {
		var err error
		if backup, err = c.fetchBackup(bucket, restore.Spec.BackupName); err != nil {
			logContext.WithError(err).Error("Error getting backup")
			restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
			return
		}
	}

	var tempFiles []*os.File

	backupFile, err := downloadToTempFile(restore.Spec.BackupName, c.backupService, bucket, c.logger)
	if err != nil {
		logContext.WithError(err).Error("Error downloading backup")
		restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
		return
	}
	tempFiles = append(tempFiles, backupFile)

	logFile, err := ioutil.TempFile("", "")
	if err != nil {
		logContext.WithError(errors.WithStack(err)).Error("Error creating log temp file")
		restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
		return
	}
	tempFiles = append(tempFiles, logFile)

	resultsFile, err := ioutil.TempFile("", "")
	if err != nil {
		logContext.WithError(errors.WithStack(err)).Error("Error creating results temp file")
		restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
		return
	}
	tempFiles = append(tempFiles, resultsFile)

	defer func() {
		for _, file := range tempFiles {
			if err := file.Close(); err != nil {
				logContext.WithError(errors.WithStack(err)).WithField("file", file.Name()).Error("Error closing file")
			}

			if err := os.Remove(file.Name()); err != nil {
				logContext.WithError(errors.WithStack(err)).WithField("file", file.Name()).Error("Error removing file")
			}
		}
	}()

	actions, err := c.pluginManager.GetRestoreItemActions(restore.Name)
	if err != nil {
		restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
		return
	}
	defer c.pluginManager.CloseRestoreItemActions(restore.Name)

	logContext.Info("starting restore")
	restoreWarnings, restoreErrors = c.restorer.Restore(restore, backup, backupFile, logFile, actions)
	logContext.Info("restore completed")

	// Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors.

	// Reset the offset to 0 for reading
	if _, err = logFile.Seek(0, 0); err != nil {
		restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error resetting log file offset to 0: %v", err))
		return
	}

	if err := c.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil {
		restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err))
	}
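
	// persist the warnings and errors to object storage as a gzipped JSON
	// document, alongside the restore log uploaded above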
	m := map[string]api.RestoreResult{
		"warnings": restoreWarnings,
		"errors":   restoreErrors,
	}

	gzippedResultsFile := gzip.NewWriter(resultsFile)

	if err := json.NewEncoder(gzippedResultsFile).Encode(m); err != nil {
		logContext.WithError(errors.WithStack(err)).Error("Error encoding restore results")
		return
	}
	gzippedResultsFile.Close()

	if _, err = resultsFile.Seek(0, 0); err != nil {
		logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0")
		return
	}
	if err := c.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
		logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage")
	}

	return
}
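
// downloadToTempFile copies the named backup from object storage to a local
// temp file and leaves the file's offset at 0, ready for reading. The caller
// is responsible for closing and removing the file.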
func downloadToTempFile(backupName string, backupService cloudprovider.BackupService, bucket string, logger logrus.FieldLogger) (*os.File, error) {
	readCloser, err := backupService.DownloadBackup(bucket, backupName)
	if err != nil {
		return nil, err
	}
	defer readCloser.Close()

	file, err := ioutil.TempFile("", backupName)
	if err != nil {
		return nil, errors.Wrap(err, "error creating Backup temp file")
	}

	n, err := io.Copy(file, readCloser)
	if err != nil {
		return nil, errors.Wrap(err, "error copying Backup to temp file")
	}

	logContext := logger.WithField("backup", backupName)

	logContext.WithFields(logrus.Fields{
		"fileName": file.Name(),
		"bytes":    n,
	}).Debug("Copied Backup to file")

	if _, err := file.Seek(0, 0); err != nil {
		return nil, errors.Wrap(err, "error resetting Backup file offset")
	}

	return file, nil
}
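
// patchRestore applies the delta between the original and updated Restores to
// the API server as a JSON merge patch and returns the patched object.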
func patchRestore(original, updated *api.Restore, client arkv1client.RestoresGetter) (*api.Restore, error) {
	origBytes, err := json.Marshal(original)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling original restore")
	}

	updatedBytes, err := json.Marshal(updated)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling updated restore")
	}

	patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
	if err != nil {
		return nil, errors.Wrap(err, "error creating json merge patch for restore")
	}

	res, err := client.Restores(original.Namespace).Patch(original.Name, types.MergePatchType, patchBytes)
	if err != nil {
		return nil, errors.Wrap(err, "error patching restore")
	}

	return res, nil
}