restore controller: switch to 'c' for receiver name
Signed-off-by: Steve Kriss <steve@heptio.com>pull/443/head
parent
706ae07d0d
commit
c6050845a0
|
@ -150,35 +150,35 @@ func NewRestoreController(
|
|||
// Run is a blocking function that runs the specified number of worker goroutines
|
||||
// to process items in the work queue. It will return when it receives on the
|
||||
// ctx.Done() channel.
|
||||
func (controller *restoreController) Run(ctx context.Context, numWorkers int) error {
|
||||
func (c *restoreController) Run(ctx context.Context, numWorkers int) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
defer func() {
|
||||
controller.logger.Info("Waiting for workers to finish their work")
|
||||
c.logger.Info("Waiting for workers to finish their work")
|
||||
|
||||
controller.queue.ShutDown()
|
||||
c.queue.ShutDown()
|
||||
|
||||
// We have to wait here in the deferred function instead of at the bottom of the function body
|
||||
// because we have to shut down the queue in order for the workers to shut down gracefully, and
|
||||
// we want to shut down the queue via defer and not at the end of the body.
|
||||
wg.Wait()
|
||||
|
||||
controller.logger.Info("All workers have finished")
|
||||
c.logger.Info("All workers have finished")
|
||||
}()
|
||||
|
||||
controller.logger.Info("Starting RestoreController")
|
||||
defer controller.logger.Info("Shutting down RestoreController")
|
||||
c.logger.Info("Starting RestoreController")
|
||||
defer c.logger.Info("Shutting down RestoreController")
|
||||
|
||||
controller.logger.Info("Waiting for caches to sync")
|
||||
if !cache.WaitForCacheSync(ctx.Done(), controller.backupListerSynced, controller.restoreListerSynced) {
|
||||
c.logger.Info("Waiting for caches to sync")
|
||||
if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced) {
|
||||
return errors.New("timed out waiting for caches to sync")
|
||||
}
|
||||
controller.logger.Info("Caches are synced")
|
||||
c.logger.Info("Caches are synced")
|
||||
|
||||
wg.Add(numWorkers)
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
go func() {
|
||||
wait.Until(controller.runWorker, time.Second, ctx.Done())
|
||||
wait.Until(c.runWorker, time.Second, ctx.Done())
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
@ -188,40 +188,40 @@ func (controller *restoreController) Run(ctx context.Context, numWorkers int) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (controller *restoreController) runWorker() {
|
||||
func (c *restoreController) runWorker() {
|
||||
// continually take items off the queue (waits if it's
|
||||
// empty) until we get a shutdown signal from the queue
|
||||
for controller.processNextWorkItem() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (controller *restoreController) processNextWorkItem() bool {
|
||||
key, quit := controller.queue.Get()
|
||||
func (c *restoreController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
// always call done on this item, since if it fails we'll add
|
||||
// it back with rate-limiting below
|
||||
defer controller.queue.Done(key)
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := controller.syncHandler(key.(string))
|
||||
err := c.syncHandler(key.(string))
|
||||
if err == nil {
|
||||
// If you had no error, tell the queue to stop tracking history for your key. This will reset
|
||||
// things like failure counts for per-item rate limiting.
|
||||
controller.queue.Forget(key)
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
|
||||
c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
|
||||
// we had an error processing the item so add it back
|
||||
// into the queue for re-processing with rate-limiting
|
||||
controller.queue.AddRateLimited(key)
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (controller *restoreController) processRestore(key string) error {
|
||||
logContext := controller.logger.WithField("key", key)
|
||||
func (c *restoreController) processRestore(key string) error {
|
||||
logContext := c.logger.WithField("key", key)
|
||||
|
||||
logContext.Debug("Running processRestore")
|
||||
ns, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
|
@ -230,7 +230,7 @@ func (controller *restoreController) processRestore(key string) error {
|
|||
}
|
||||
|
||||
logContext.Debug("Getting Restore")
|
||||
restore, err := controller.restoreLister.Restores(ns).Get(name)
|
||||
restore, err := c.restoreLister.Restores(ns).Get(name)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting Restore")
|
||||
}
|
||||
|
@ -256,14 +256,14 @@ func (controller *restoreController) processRestore(key string) error {
|
|||
restore = restore.DeepCopy()
|
||||
|
||||
// complete & validate restore
|
||||
if restore.Status.ValidationErrors = controller.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 {
|
||||
if restore.Status.ValidationErrors = c.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 {
|
||||
restore.Status.Phase = api.RestorePhaseFailedValidation
|
||||
} else {
|
||||
restore.Status.Phase = api.RestorePhaseInProgress
|
||||
}
|
||||
|
||||
// update status
|
||||
updatedRestore, err := patchRestore(original, restore, controller.restoreClient)
|
||||
updatedRestore, err := patchRestore(original, restore, c.restoreClient)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
|
||||
}
|
||||
|
@ -277,7 +277,7 @@ func (controller *restoreController) processRestore(key string) error {
|
|||
|
||||
logContext.Debug("Running restore")
|
||||
// execution & upload of restore
|
||||
restoreWarnings, restoreErrors := controller.runRestore(restore, controller.bucket)
|
||||
restoreWarnings, restoreErrors := c.runRestore(restore, c.bucket)
|
||||
|
||||
restore.Status.Warnings = len(restoreWarnings.Ark) + len(restoreWarnings.Cluster)
|
||||
for _, w := range restoreWarnings.Namespaces {
|
||||
|
@ -293,14 +293,14 @@ func (controller *restoreController) processRestore(key string) error {
|
|||
restore.Status.Phase = api.RestorePhaseCompleted
|
||||
|
||||
logContext.Debug("Updating Restore final status")
|
||||
if _, err = patchRestore(original, restore, controller.restoreClient); err != nil {
|
||||
if _, err = patchRestore(original, restore, c.restoreClient); err != nil {
|
||||
logContext.WithError(errors.WithStack(err)).Info("Error updating Restore final status")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (controller *restoreController) completeAndValidate(restore *api.Restore) []string {
|
||||
func (c *restoreController) completeAndValidate(restore *api.Restore) []string {
|
||||
// add non-restorable resources to restore's excluded resources
|
||||
excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
|
||||
for _, nonrestorable := range nonRestorableResources {
|
||||
|
@ -330,7 +330,7 @@ func (controller *restoreController) completeAndValidate(restore *api.Restore) [
|
|||
}
|
||||
|
||||
// validate that PV provider exists if we're restoring PVs
|
||||
if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !controller.pvProviderExists {
|
||||
if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !c.pvProviderExists {
|
||||
validationErrors = append(validationErrors, "Server is not configured for PV snapshot restores")
|
||||
}
|
||||
|
||||
|
@ -346,7 +346,7 @@ func (controller *restoreController) completeAndValidate(restore *api.Restore) [
|
|||
"ark-schedule": restore.Spec.ScheduleName,
|
||||
}))
|
||||
|
||||
backups, err := controller.backupLister.Backups(controller.namespace).List(selector)
|
||||
backups, err := c.backupLister.Backups(c.namespace).List(selector)
|
||||
if err != nil {
|
||||
return append(validationErrors, "Unable to list backups for schedule")
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ func (controller *restoreController) completeAndValidate(restore *api.Restore) [
|
|||
}
|
||||
|
||||
// validate that we can fetch the source backup
|
||||
if _, err := controller.fetchBackup(controller.bucket, restore.Spec.BackupName); err != nil {
|
||||
if _, err := c.fetchBackup(c.bucket, restore.Spec.BackupName); err != nil {
|
||||
return append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
|
||||
}
|
||||
|
||||
|
@ -408,8 +408,8 @@ func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
|
||||
backup, err := controller.backupLister.Backups(controller.namespace).Get(name)
|
||||
func (c *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
|
||||
backup, err := c.backupLister.Backups(c.namespace).Get(name)
|
||||
if err == nil {
|
||||
return backup, nil
|
||||
}
|
||||
|
@ -418,10 +418,10 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back
|
|||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logContext := controller.logger.WithField("backupName", name)
|
||||
logContext := c.logger.WithField("backupName", name)
|
||||
|
||||
logContext.Debug("Backup not found in backupLister, checking object storage directly")
|
||||
backup, err = controller.backupService.GetBackup(bucket, name)
|
||||
backup, err = c.backupService.GetBackup(bucket, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -431,7 +431,7 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back
|
|||
// Clear out the namespace too, just in case
|
||||
backup.Namespace = ""
|
||||
|
||||
created, createErr := controller.backupClient.Backups(controller.namespace).Create(backup)
|
||||
created, createErr := c.backupClient.Backups(c.namespace).Create(backup)
|
||||
if createErr != nil {
|
||||
logContext.WithError(errors.WithStack(createErr)).Error("Unable to create API object for Backup")
|
||||
} else {
|
||||
|
@ -441,14 +441,14 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back
|
|||
return backup, nil
|
||||
}
|
||||
|
||||
func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
|
||||
logContext := controller.logger.WithFields(
|
||||
func (c *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
|
||||
logContext := c.logger.WithFields(
|
||||
logrus.Fields{
|
||||
"restore": kubeutil.NamespaceAndName(restore),
|
||||
"backup": restore.Spec.BackupName,
|
||||
})
|
||||
|
||||
backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName)
|
||||
backup, err := c.fetchBackup(bucket, restore.Spec.BackupName)
|
||||
if err != nil {
|
||||
logContext.WithError(err).Error("Error getting backup")
|
||||
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
|
||||
|
@ -457,7 +457,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str
|
|||
|
||||
var tempFiles []*os.File
|
||||
|
||||
backupFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger)
|
||||
backupFile, err := downloadToTempFile(restore.Spec.BackupName, c.backupService, bucket, c.logger)
|
||||
if err != nil {
|
||||
logContext.WithError(err).Error("Error downloading backup")
|
||||
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
|
||||
|
@ -493,15 +493,15 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str
|
|||
}
|
||||
}()
|
||||
|
||||
actions, err := controller.pluginManager.GetRestoreItemActions(restore.Name)
|
||||
actions, err := c.pluginManager.GetRestoreItemActions(restore.Name)
|
||||
if err != nil {
|
||||
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
|
||||
return
|
||||
}
|
||||
defer controller.pluginManager.CloseRestoreItemActions(restore.Name)
|
||||
defer c.pluginManager.CloseRestoreItemActions(restore.Name)
|
||||
|
||||
logContext.Info("starting restore")
|
||||
restoreWarnings, restoreErrors = controller.restorer.Restore(restore, backup, backupFile, logFile, actions)
|
||||
restoreWarnings, restoreErrors = c.restorer.Restore(restore, backup, backupFile, logFile, actions)
|
||||
logContext.Info("restore completed")
|
||||
|
||||
// Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors.
|
||||
|
@ -512,7 +512,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str
|
|||
return
|
||||
}
|
||||
|
||||
if err := controller.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil {
|
||||
if err := c.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil {
|
||||
restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err))
|
||||
}
|
||||
|
||||
|
@ -533,7 +533,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str
|
|||
logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0")
|
||||
return
|
||||
}
|
||||
if err := controller.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
|
||||
if err := c.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
|
||||
logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage")
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue