diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 50dbf1058..4cc5ee23d 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -19,13 +19,11 @@ package controller import ( "bytes" "compress/gzip" - "context" "encoding/json" "fmt" "io" "io/ioutil" "os" - "sync" "time" jsonpatch "github.com/evanphx/json-patch" @@ -36,9 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/backup" @@ -57,23 +53,20 @@ import ( const backupVersion = 1 type backupController struct { - backupper backup.Backupper - pvProviderExists bool - lister listers.BackupLister - listerSynced cache.InformerSynced - client arkv1client.BackupsGetter - syncHandler func(backupName string) error - queue workqueue.RateLimitingInterface - clock clock.Clock - logger logrus.FieldLogger - logLevel logrus.Level - newPluginManager func(logrus.FieldLogger) plugin.Manager - backupTracker BackupTracker - backupLocationLister listers.BackupStorageLocationLister - backupLocationListerSynced cache.InformerSynced - defaultBackupLocation string - metrics *metrics.ServerMetrics - newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) + *genericController + + backupper backup.Backupper + pvProviderExists bool + lister listers.BackupLister + client arkv1client.BackupsGetter + clock clock.Clock + backupLogLevel logrus.Level + newPluginManager func(logrus.FieldLogger) plugin.Manager + backupTracker BackupTracker + backupLocationLister listers.BackupStorageLocationLister + defaultBackupLocation string + metrics *metrics.ServerMetrics + newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } func NewBackupController( @@ -82,7 +75,7 @@ func NewBackupController( backupper backup.Backupper, pvProviderExists bool, logger logrus.FieldLogger, - logLevel logrus.Level, + backupLogLevel logrus.Level, newPluginManager func(logrus.FieldLogger) plugin.Manager, backupTracker BackupTracker, backupLocationInformer informers.BackupStorageLocationInformer, @@ -90,26 +83,27 @@ func NewBackupController( metrics *metrics.ServerMetrics, ) Interface { c := &backupController{ - backupper: backupper, - pvProviderExists: pvProviderExists, - lister: backupInformer.Lister(), - listerSynced: backupInformer.Informer().HasSynced, - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), - clock: &clock.RealClock{}, - logger: logger, - logLevel: logLevel, - newPluginManager: newPluginManager, - backupTracker: backupTracker, - backupLocationLister: backupLocationInformer.Lister(), - backupLocationListerSynced: backupLocationInformer.Informer().HasSynced, - defaultBackupLocation: defaultBackupLocation, - metrics: metrics, + genericController: newGenericController("backup", logger), + backupper: backupper, + pvProviderExists: pvProviderExists, + lister: backupInformer.Lister(), + client: client, + clock: &clock.RealClock{}, + backupLogLevel: backupLogLevel, + newPluginManager: newPluginManager, + backupTracker: backupTracker, + backupLocationLister: backupLocationInformer.Lister(), + defaultBackupLocation: defaultBackupLocation, + metrics: metrics, newBackupStore: persistence.NewObjectBackupStore, } c.syncHandler = c.processBackup + c.cacheSyncWaiters = append(c.cacheSyncWaiters, + backupInformer.Informer().HasSynced, + backupLocationInformer.Informer().HasSynced, + ) backupInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ @@ -140,91 +134,17 @@ func NewBackupController( return c } -// Run is a blocking function that runs the specified number of worker goroutines -// to process items in the work queue. It will return when it receives on the -// ctx.Done() channel. -func (controller *backupController) Run(ctx context.Context, numWorkers int) error { - var wg sync.WaitGroup +func (c *backupController) processBackup(key string) error { + log := c.logger.WithField("key", key) - defer func() { - controller.logger.Info("Waiting for workers to finish their work") - - controller.queue.ShutDown() - - // We have to wait here in the deferred function instead of at the bottom of the function body - // because we have to shut down the queue in order for the workers to shut down gracefully, and - // we want to shut down the queue via defer and not at the end of the body. - wg.Wait() - - controller.logger.Info("All workers have finished") - - }() - - controller.logger.Info("Starting BackupController") - defer controller.logger.Info("Shutting down BackupController") - - controller.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced, controller.backupLocationListerSynced) { - return errors.New("timed out waiting for caches to sync") - } - controller.logger.Info("Caches are synced") - - wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - go func() { - wait.Until(controller.runWorker, time.Second, ctx.Done()) - wg.Done() - }() - } - - <-ctx.Done() - - return nil -} - -func (controller *backupController) runWorker() { - // continually take items off the queue (waits if it's - // empty) until we get a shutdown signal from the queue - for controller.processNextWorkItem() { - } -} - -func (controller *backupController) processNextWorkItem() bool { - key, quit := controller.queue.Get() - if quit { - return false - } - // always call done on this item, since if it fails we'll add - // it back with rate-limiting below - defer controller.queue.Done(key) - - err := controller.syncHandler(key.(string)) - if err == nil { - // If you had no error, tell the queue to stop tracking history for your key. This will reset - // things like failure counts for per-item rate limiting. - controller.queue.Forget(key) - return true - } - - controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") - // we had an error processing the item so add it back - // into the queue for re-processing with rate-limiting - controller.queue.AddRateLimited(key) - - return true -} - -func (controller *backupController) processBackup(key string) error { - logContext := controller.logger.WithField("key", key) - - logContext.Debug("Running processBackup") + log.Debug("Running processBackup") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return errors.Wrap(err, "error splitting queue key") } - logContext.Debug("Getting backup") - backup, err := controller.lister.Backups(ns).Get(name) + log.Debug("Getting backup") + backup, err := c.lister.Backups(ns).Get(name) if err != nil { return errors.Wrap(err, "error getting backup") } @@ -244,7 +164,7 @@ func (controller *backupController) processBackup(key string) error { return nil } - logContext.Debug("Cloning backup") + log.Debug("Cloning backup") // store ref to original for creating patch original := backup // don't modify items in the cache @@ -255,19 +175,19 @@ func (controller *backupController) processBackup(key string) error { // calculate expiration if backup.Spec.TTL.Duration > 0 { - backup.Status.Expiration = metav1.NewTime(controller.clock.Now().Add(backup.Spec.TTL.Duration)) + backup.Status.Expiration = metav1.NewTime(c.clock.Now().Add(backup.Spec.TTL.Duration)) } var backupLocation *api.BackupStorageLocation // validation - if backupLocation, backup.Status.ValidationErrors = controller.getLocationAndValidate(backup, controller.defaultBackupLocation); len(backup.Status.ValidationErrors) > 0 { + if backupLocation, backup.Status.ValidationErrors = c.getLocationAndValidate(backup, c.defaultBackupLocation); len(backup.Status.ValidationErrors) > 0 { backup.Status.Phase = api.BackupPhaseFailedValidation } else { backup.Status.Phase = api.BackupPhaseInProgress } // update status - updatedBackup, err := patchBackup(original, backup, controller.client) + updatedBackup, err := patchBackup(original, backup, c.client) if err != nil { return errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase) } @@ -279,25 +199,25 @@ func (controller *backupController) processBackup(key string) error { return nil } - controller.backupTracker.Add(backup.Namespace, backup.Name) - defer controller.backupTracker.Delete(backup.Namespace, backup.Name) + c.backupTracker.Add(backup.Namespace, backup.Name) + defer c.backupTracker.Delete(backup.Namespace, backup.Name) - logContext.Debug("Running backup") + log.Debug("Running backup") // execution & upload of backup backupScheduleName := backup.GetLabels()["ark-schedule"] - controller.metrics.RegisterBackupAttempt(backupScheduleName) + c.metrics.RegisterBackupAttempt(backupScheduleName) - if err := controller.runBackup(backup, backupLocation); err != nil { - logContext.WithError(err).Error("backup failed") + if err := c.runBackup(backup, backupLocation); err != nil { + log.WithError(err).Error("backup failed") backup.Status.Phase = api.BackupPhaseFailed - controller.metrics.RegisterBackupFailed(backupScheduleName) + c.metrics.RegisterBackupFailed(backupScheduleName) } else { - controller.metrics.RegisterBackupSuccess(backupScheduleName) + c.metrics.RegisterBackupSuccess(backupScheduleName) } - logContext.Debug("Updating backup's final status") - if _, err := patchBackup(original, backup, controller.client); err != nil { - logContext.WithError(err).Error("error updating backup's final status") + log.Debug("Updating backup's final status") + if _, err := patchBackup(original, backup, c.client); err != nil { + log.WithError(err).Error("error updating backup's final status") } return nil @@ -327,7 +247,7 @@ func patchBackup(original, updated *api.Backup, client arkv1client.BackupsGetter return res, nil } -func (controller *backupController) getLocationAndValidate(itm *api.Backup, defaultBackupLocation string) (*api.BackupStorageLocation, []string) { +func (c *backupController) getLocationAndValidate(itm *api.Backup, defaultBackupLocation string) (*api.BackupStorageLocation, []string) { var validationErrors []string for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) { @@ -338,7 +258,7 @@ func (controller *backupController) getLocationAndValidate(itm *api.Backup, defa validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err)) } - if !controller.pvProviderExists && itm.Spec.SnapshotVolumes != nil && *itm.Spec.SnapshotVolumes { + if !c.pvProviderExists && itm.Spec.SnapshotVolumes != nil && *itm.Spec.SnapshotVolumes { validationErrors = append(validationErrors, "Server is not configured for PV snapshots") } @@ -353,7 +273,7 @@ func (controller *backupController) getLocationAndValidate(itm *api.Backup, defa itm.Labels[api.StorageLocationLabel] = itm.Spec.StorageLocation var backupLocation *api.BackupStorageLocation - backupLocation, err := controller.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation) + backupLocation, err := c.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation) if err != nil { validationErrors = append(validationErrors, fmt.Sprintf("Error getting backup storage location: %v", err)) } @@ -361,10 +281,10 @@ func (controller *backupController) getLocationAndValidate(itm *api.Backup, defa return backupLocation, validationErrors } -func (controller *backupController) runBackup(backup *api.Backup, backupLocation *api.BackupStorageLocation) error { - log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)) +func (c *backupController) runBackup(backup *api.Backup, backupLocation *api.BackupStorageLocation) error { + log := c.logger.WithField("backup", kubeutil.NamespaceAndName(backup)) log.Info("Starting backup") - backup.Status.StartTimestamp.Time = controller.clock.Now() + backup.Status.StartTimestamp.Time = c.clock.Now() logFile, err := ioutil.TempFile("", "") if err != nil { @@ -374,11 +294,11 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation // Assuming we successfully uploaded the log file, this will have already been closed below. It is safe to call // close multiple times. If we get an error closing this, there's not really anything we can do about it. defer gzippedLogFile.Close() - defer closeAndRemoveFile(logFile, controller.logger) + defer closeAndRemoveFile(logFile, c.logger) // Log the backup to both a backup log file and to stdout. This will help see what happened if the upload of the // backup log failed for whatever reason. - logger := logging.DefaultLogger(controller.logLevel) + logger := logging.DefaultLogger(c.backupLogLevel) logger.Out = io.MultiWriter(os.Stdout, gzippedLogFile) log = logger.WithField("backup", kubeutil.NamespaceAndName(backup)) @@ -390,7 +310,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation } defer closeAndRemoveFile(backupFile, log) - pluginManager := controller.newPluginManager(log) + pluginManager := c.newPluginManager(log) defer pluginManager.CleanupClients() actions, err := pluginManager.GetBackupItemActions() @@ -398,7 +318,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation return err } - backupStore, err := controller.newBackupStore(backupLocation, pluginManager, log) + backupStore, err := c.newBackupStore(backupLocation, pluginManager, log) if err != nil { return err } @@ -408,7 +328,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation var backupJSONToUpload, backupFileToUpload io.Reader // Do the actual backup - if err := controller.backupper.Backup(log, backup, backupFile, actions); err != nil { + if err := c.backupper.Backup(log, backup, backupFile, actions); err != nil { errs = append(errs, err) backup.Status.Phase = api.BackupPhaseFailed @@ -418,7 +338,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation // Mark completion timestamp before serializing and uploading. // Otherwise, the JSON file in object storage has a CompletionTimestamp of 'null'. - backup.Status.CompletionTimestamp.Time = controller.clock.Now() + backup.Status.CompletionTimestamp.Time = c.clock.Now() backupJSON := new(bytes.Buffer) if err := encode.EncodeTo(backup, "json", backupJSON); err != nil { @@ -437,7 +357,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation } if err := gzippedLogFile.Close(); err != nil { - controller.logger.WithError(err).Error("error closing gzippedLogFile") + c.logger.WithError(err).Error("error closing gzippedLogFile") } if err := backupStore.PutBackup(backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil { @@ -445,11 +365,11 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation } backupScheduleName := backup.GetLabels()["ark-schedule"] - controller.metrics.SetBackupTarballSizeBytesGauge(backupScheduleName, backupSizeBytes) + c.metrics.SetBackupTarballSizeBytesGauge(backupScheduleName, backupSizeBytes) backupDuration := backup.Status.CompletionTimestamp.Time.Sub(backup.Status.StartTimestamp.Time) backupDurationSeconds := float64(backupDuration / time.Second) - controller.metrics.RegisterBackupDuration(backupScheduleName, backupDurationSeconds) + c.metrics.RegisterBackupDuration(backupScheduleName, backupDurationSeconds) log.Info("Backup completed") diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go index 0a224f1d9..831d01af2 100644 --- a/pkg/controller/download_request_controller.go +++ b/pkg/controller/download_request_controller.go @@ -193,14 +193,14 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow // deleteIfExpired deletes downloadRequest if it has expired. func (c *downloadRequestController) deleteIfExpired(downloadRequest *v1.DownloadRequest) error { - logContext := c.logger.WithField("key", kube.NamespaceAndName(downloadRequest)) - logContext.Info("checking for expiration of DownloadRequest") + log := c.logger.WithField("key", kube.NamespaceAndName(downloadRequest)) + log.Info("checking for expiration of DownloadRequest") if downloadRequest.Status.Expiration.Time.After(c.clock.Now()) { - logContext.Debug("DownloadRequest has not expired") + log.Debug("DownloadRequest has not expired") return nil } - logContext.Debug("DownloadRequest has expired - deleting") + log.Debug("DownloadRequest has expired - deleting") return errors.WithStack(c.downloadRequestClient.DownloadRequests(downloadRequest.Namespace).Delete(downloadRequest.Name, nil)) } diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index 71ae4f018..95431227d 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -42,7 +42,6 @@ const ( type gcController struct { *genericController - logger logrus.FieldLogger backupLister listers.BackupLister deleteBackupRequestLister listers.DeleteBackupRequestLister deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter @@ -63,7 +62,6 @@ func NewGCController( backupLister: backupInformer.Lister(), deleteBackupRequestLister: deleteBackupRequestInformer.Lister(), deleteBackupRequestClient: deleteBackupRequestClient, - logger: logger, } c.syncHandler = c.processQueueItem diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index aaf01c65f..9162cee06 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -18,15 +18,12 @@ package controller import ( "compress/gzip" - "context" "encoding/json" "fmt" "io" "io/ioutil" "os" "sort" - "sync" - "time" jsonpatch "github.com/evanphx/json-patch" "github.com/pkg/errors" @@ -36,9 +33,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" @@ -71,23 +66,19 @@ var nonRestorableResources = []string{ } type restoreController struct { - namespace string - restoreClient arkv1client.RestoresGetter - backupClient arkv1client.BackupsGetter - restorer restore.Restorer - pvProviderExists bool - backupLister listers.BackupLister - backupListerSynced cache.InformerSynced - restoreLister listers.RestoreLister - restoreListerSynced cache.InformerSynced - backupLocationLister listers.BackupStorageLocationLister - backupLocationListerSynced cache.InformerSynced - syncHandler func(restoreName string) error - queue workqueue.RateLimitingInterface - logger logrus.FieldLogger - logLevel logrus.Level - defaultBackupLocation string - metrics *metrics.ServerMetrics + *genericController + + namespace string + restoreClient arkv1client.RestoresGetter + backupClient arkv1client.BackupsGetter + restorer restore.Restorer + pvProviderExists bool + backupLister listers.BackupLister + restoreLister listers.RestoreLister + backupLocationLister listers.BackupStorageLocationLister + restoreLogLevel logrus.Level + defaultBackupLocation string + metrics *metrics.ServerMetrics newPluginManager func(logger logrus.FieldLogger) plugin.Manager newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) @@ -103,26 +94,22 @@ func NewRestoreController( backupLocationInformer informers.BackupStorageLocationInformer, pvProviderExists bool, logger logrus.FieldLogger, - logLevel logrus.Level, + restoreLogLevel logrus.Level, newPluginManager func(logrus.FieldLogger) plugin.Manager, defaultBackupLocation string, metrics *metrics.ServerMetrics, ) Interface { c := &restoreController{ - namespace: namespace, - restoreClient: restoreClient, - backupClient: backupClient, - restorer: restorer, - pvProviderExists: pvProviderExists, - backupLister: backupInformer.Lister(), - backupListerSynced: backupInformer.Informer().HasSynced, - restoreLister: restoreInformer.Lister(), - restoreListerSynced: restoreInformer.Informer().HasSynced, - backupLocationLister: backupLocationInformer.Lister(), - backupLocationListerSynced: backupLocationInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "restore"), - logger: logger, - logLevel: logLevel, + genericController: newGenericController("restore", logger), + namespace: namespace, + restoreClient: restoreClient, + backupClient: backupClient, + restorer: restorer, + pvProviderExists: pvProviderExists, + backupLister: backupInformer.Lister(), + restoreLister: restoreInformer.Lister(), + backupLocationLister: backupLocationInformer.Lister(), + restoreLogLevel: restoreLogLevel, defaultBackupLocation: defaultBackupLocation, metrics: metrics, @@ -133,6 +120,11 @@ func NewRestoreController( } c.syncHandler = c.processRestore + c.cacheSyncWaiters = append(c.cacheSyncWaiters, + backupInformer.Informer().HasSynced, + restoreInformer.Informer().HasSynced, + backupLocationInformer.Informer().HasSynced, + ) restoreInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ @@ -163,91 +155,18 @@ func NewRestoreController( return c } -// Run is a blocking function that runs the specified number of worker goroutines -// to process items in the work queue. It will return when it receives on the -// ctx.Done() channel. -func (c *restoreController) Run(ctx context.Context, numWorkers int) error { - var wg sync.WaitGroup - - defer func() { - c.logger.Info("Waiting for workers to finish their work") - - c.queue.ShutDown() - - // We have to wait here in the deferred function instead of at the bottom of the function body - // because we have to shut down the queue in order for the workers to shut down gracefully, and - // we want to shut down the queue via defer and not at the end of the body. - wg.Wait() - - c.logger.Info("All workers have finished") - }() - - c.logger.Info("Starting RestoreController") - defer c.logger.Info("Shutting down RestoreController") - - c.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced, c.backupLocationListerSynced) { - return errors.New("timed out waiting for caches to sync") - } - c.logger.Info("Caches are synced") - - wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - go func() { - wait.Until(c.runWorker, time.Second, ctx.Done()) - wg.Done() - }() - } - - <-ctx.Done() - - return nil -} - -func (c *restoreController) runWorker() { - // continually take items off the queue (waits if it's - // empty) until we get a shutdown signal from the queue - for c.processNextWorkItem() { - } -} - -func (c *restoreController) processNextWorkItem() bool { - key, quit := c.queue.Get() - if quit { - return false - } - // always call done on this item, since if it fails we'll add - // it back with rate-limiting below - defer c.queue.Done(key) - - err := c.syncHandler(key.(string)) - if err == nil { - // If you had no error, tell the queue to stop tracking history for your key. This will reset - // things like failure counts for per-item rate limiting. - c.queue.Forget(key) - return true - } - - c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") - // we had an error processing the item so add it back - // into the queue for re-processing with rate-limiting - c.queue.AddRateLimited(key) - - return true -} - func (c *restoreController) processRestore(key string) error { - logContext := c.logger.WithField("key", key) + log := c.logger.WithField("key", key) - logContext.Debug("Running processRestore") + log.Debug("Running processRestore") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - logContext.WithError(err).Error("unable to process restore: error splitting queue key") + log.WithError(err).Error("unable to process restore: error splitting queue key") // Return nil here so we don't try to process the key any more return nil } - logContext.Debug("Getting Restore") + log.Debug("Getting Restore") restore, err := c.restoreLister.Restores(ns).Get(name) if err != nil { return errors.Wrap(err, "error getting Restore") @@ -267,13 +186,13 @@ func (c *restoreController) processRestore(key string) error { return nil } - logContext.Debug("Cloning Restore") + log.Debug("Cloning Restore") // store ref to original for creating patch original := restore // don't modify items in the cache restore = restore.DeepCopy() - pluginManager := c.newPluginManager(logContext) + pluginManager := c.newPluginManager(log) defer pluginManager.CleanupClients() actions, err := pluginManager.GetRestoreItemActions() @@ -307,7 +226,7 @@ func (c *restoreController) processRestore(key string) error { return nil } - logContext.Debug("Running restore") + log.Debug("Running restore") // execution & upload of restore restoreWarnings, restoreErrors, restoreFailure := c.runRestore( @@ -327,20 +246,20 @@ func (c *restoreController) processRestore(key string) error { } if restoreFailure != nil { - logContext.Debug("restore failed") + log.Debug("restore failed") restore.Status.Phase = api.RestorePhaseFailed restore.Status.FailureReason = restoreFailure.Error() c.metrics.RegisterRestoreFailed(backupScheduleName) } else { - logContext.Debug("restore completed") + log.Debug("restore completed") // We got through the restore process without failing validation or restore execution restore.Status.Phase = api.RestorePhaseCompleted c.metrics.RegisterRestoreSuccess(backupScheduleName) } - logContext.Debug("Updating Restore final status") + log.Debug("Updating Restore final status") if _, err = patchRestore(original, restore, c.restoreClient); err != nil { - logContext.WithError(errors.WithStack(err)).Info("Error updating Restore final status") + log.WithError(errors.WithStack(err)).Info("Error updating Restore final status") } return nil @@ -467,8 +386,8 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager plu return backupInfo{}, errors.WithStack(err) } - logContext := c.logger.WithField("backupName", backupName) - logContext.Debug("Backup not found in backupLister, checking each backup location directly, starting with default...") + log := c.logger.WithField("backupName", backupName) + log.Debug("Backup not found in backupLister, checking each backup location directly, starting with default...") return c.fetchFromBackupStorage(backupName, pluginManager) } @@ -498,11 +417,11 @@ func (c *restoreController) fetchFromBackupStorage(backupName string, pluginMana orderedLocations := orderedBackupLocations(locations, c.defaultBackupLocation) - logContext := c.logger.WithField("backupName", backupName) + log := c.logger.WithField("backupName", backupName) for _, location := range orderedLocations { info, err := c.backupInfoForLocation(location, backupName, pluginManager) if err != nil { - logContext.WithField("locationName", location.Name).WithError(err).Error("Unable to fetch backup from object storage location") + log.WithField("locationName", location.Name).WithError(err).Error("Unable to fetch backup from object storage location") continue } return info, nil @@ -586,9 +505,9 @@ func (c *restoreController) runRestore( // Log the backup to both a backup log file and to stdout. This will help see what happened if the upload of the // backup log failed for whatever reason. - logger := logging.DefaultLogger(c.logLevel) + logger := logging.DefaultLogger(c.restoreLogLevel) logger.Out = io.MultiWriter(os.Stdout, gzippedLogFile) - logContext := logger.WithFields( + log := logger.WithFields( logrus.Fields{ "restore": kubeutil.NamespaceAndName(restore), "backup": restore.Spec.BackupName, @@ -596,7 +515,7 @@ func (c *restoreController) runRestore( backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, c.logger) if err != nil { - logContext.WithError(err).Error("Error downloading backup") + log.WithError(err).Error("Error downloading backup") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) restoreFailure = err return @@ -605,7 +524,7 @@ func (c *restoreController) runRestore( resultsFile, err := ioutil.TempFile("", "") if err != nil { - logContext.WithError(errors.WithStack(err)).Error("Error creating results temp file") + log.WithError(errors.WithStack(err)).Error("Error creating results temp file") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) restoreFailure = err return @@ -614,9 +533,9 @@ func (c *restoreController) runRestore( // Any return statement above this line means a total restore failure // Some failures after this line *may* be a total restore failure - logContext.Info("starting restore") - restoreWarnings, restoreErrors = c.restorer.Restore(logContext, restore, info.backup, backupFile, actions) - logContext.Info("restore completed") + log.Info("starting restore") + restoreWarnings, restoreErrors = c.restorer.Restore(log, restore, info.backup, backupFile, actions) + log.Info("restore completed") // Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors. if err := gzippedLogFile.Close(); err != nil { @@ -640,17 +559,17 @@ func (c *restoreController) runRestore( gzippedResultsFile := gzip.NewWriter(resultsFile) if err := json.NewEncoder(gzippedResultsFile).Encode(m); err != nil { - logContext.WithError(errors.WithStack(err)).Error("Error encoding restore results") + log.WithError(errors.WithStack(err)).Error("Error encoding restore results") return } gzippedResultsFile.Close() if _, err = resultsFile.Seek(0, 0); err != nil { - logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0") + log.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0") return } if err := info.backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, resultsFile); err != nil { - logContext.WithError(errors.WithStack(err)).Error("Error uploading results file to backup storage") + log.WithError(errors.WithStack(err)).Error("Error uploading results file to backup storage") } return @@ -677,9 +596,9 @@ func downloadToTempFile( return nil, errors.Wrap(err, "error copying Backup to temp file") } - logContext := logger.WithField("backup", backupName) + log := logger.WithField("backup", backupName) - logContext.WithFields(logrus.Fields{ + log.WithFields(logrus.Fields{ "fileName": file.Name(), "bytes": n, }).Debug("Copied Backup to file") diff --git a/pkg/controller/schedule_controller.go b/pkg/controller/schedule_controller.go index 266b6191b..c68b2e81c 100644 --- a/pkg/controller/schedule_controller.go +++ b/pkg/controller/schedule_controller.go @@ -17,10 +17,8 @@ limitations under the License. package controller import ( - "context" "encoding/json" "fmt" - "sync" "time" jsonpatch "github.com/evanphx/json-patch" @@ -33,9 +31,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" @@ -50,16 +46,14 @@ const ( ) type scheduleController struct { - namespace string - schedulesClient arkv1client.SchedulesGetter - backupsClient arkv1client.BackupsGetter - schedulesLister listers.ScheduleLister - schedulesListerSynced cache.InformerSynced - syncHandler func(scheduleName string) error - queue workqueue.RateLimitingInterface - clock clock.Clock - logger logrus.FieldLogger - metrics *metrics.ServerMetrics + *genericController + + namespace string + schedulesClient arkv1client.SchedulesGetter + backupsClient arkv1client.BackupsGetter + schedulesLister listers.ScheduleLister + clock clock.Clock + metrics *metrics.ServerMetrics } func NewScheduleController( @@ -71,18 +65,19 @@ func NewScheduleController( metrics *metrics.ServerMetrics, ) *scheduleController { c := &scheduleController{ - namespace: namespace, - schedulesClient: schedulesClient, - backupsClient: backupsClient, - schedulesLister: schedulesInformer.Lister(), - schedulesListerSynced: schedulesInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "schedule"), - clock: clock.RealClock{}, - logger: logger, - metrics: metrics, + genericController: newGenericController("schedule", logger), + namespace: namespace, + schedulesClient: schedulesClient, + backupsClient: backupsClient, + schedulesLister: schedulesInformer.Lister(), + clock: clock.RealClock{}, + metrics: metrics, } c.syncHandler = c.processSchedule + c.cacheSyncWaiters = append(c.cacheSyncWaiters, schedulesInformer.Informer().HasSynced) + c.resyncFunc = c.enqueueAllEnabledSchedules + c.resyncPeriod = scheduleSyncPeriod schedulesInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ @@ -117,52 +112,10 @@ func NewScheduleController( return c } -// Run is a blocking function that runs the specified number of worker goroutines -// to process items in the work queue. It will return when it receives on the -// ctx.Done() channel. -func (controller *scheduleController) Run(ctx context.Context, numWorkers int) error { - var wg sync.WaitGroup - - defer func() { - controller.logger.Info("Waiting for workers to finish their work") - - controller.queue.ShutDown() - - // We have to wait here in the deferred function instead of at the bottom of the function body - // because we have to shut down the queue in order for the workers to shut down gracefully, and - // we want to shut down the queue via defer and not at the end of the body. - wg.Wait() - - controller.logger.Info("All workers have finished") - }() - - controller.logger.Info("Starting ScheduleController") - defer controller.logger.Info("Shutting down ScheduleController") - - controller.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), controller.schedulesListerSynced) { - return errors.New("timed out waiting for caches to sync") - } - controller.logger.Info("Caches are synced") - - wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - go func() { - wait.Until(controller.runWorker, time.Second, ctx.Done()) - wg.Done() - }() - } - - go wait.Until(controller.enqueueAllEnabledSchedules, scheduleSyncPeriod, ctx.Done()) - - <-ctx.Done() - return nil -} - -func (controller *scheduleController) enqueueAllEnabledSchedules() { - schedules, err := controller.schedulesLister.Schedules(controller.namespace).List(labels.NewSelector()) +func (c *scheduleController) enqueueAllEnabledSchedules() { + schedules, err := c.schedulesLister.Schedules(c.namespace).List(labels.NewSelector()) if err != nil { - controller.logger.WithError(errors.WithStack(err)).Error("Error listing Schedules") + c.logger.WithError(errors.WithStack(err)).Error("Error listing Schedules") return } @@ -173,60 +126,28 @@ func (controller *scheduleController) enqueueAllEnabledSchedules() { key, err := cache.MetaNamespaceKeyFunc(schedule) if err != nil { - controller.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue") + c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue") continue } - controller.queue.Add(key) + c.queue.Add(key) } } -func (controller *scheduleController) runWorker() { - // continually take items off the queue (waits if it's - // empty) until we get a shutdown signal from the queue - for controller.processNextWorkItem() { - } -} +func (c *scheduleController) processSchedule(key string) error { + log := c.logger.WithField("key", key) -func (controller *scheduleController) processNextWorkItem() bool { - key, quit := controller.queue.Get() - if quit { - return false - } - // always call done on this item, since if it fails we'll add - // it back with rate-limiting below - defer controller.queue.Done(key) - - err := controller.syncHandler(key.(string)) - if err == nil { - // If you had no error, tell the queue to stop tracking history for your key. This will reset - // things like failure counts for per-item rate limiting. - controller.queue.Forget(key) - return true - } - - controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") - // we had an error processing the item so add it back - // into the queue for re-processing with rate-limiting - controller.queue.AddRateLimited(key) - - return true -} - -func (controller *scheduleController) processSchedule(key string) error { - logContext := controller.logger.WithField("key", key) - - logContext.Debug("Running processSchedule") + log.Debug("Running processSchedule") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return errors.Wrap(err, "error splitting queue key") } - logContext.Debug("Getting Schedule") - schedule, err := controller.schedulesLister.Schedules(ns).Get(name) + log.Debug("Getting Schedule") + schedule, err := c.schedulesLister.Schedules(ns).Get(name) if err != nil { // schedule no longer exists if apierrors.IsNotFound(err) { - logContext.WithError(err).Debug("Schedule not found") + log.WithError(err).Debug("Schedule not found") return nil } return errors.Wrap(err, "error getting Schedule") @@ -239,7 +160,7 @@ func (controller *scheduleController) processSchedule(key string) error { return nil } - logContext.Debug("Cloning schedule") + log.Debug("Cloning schedule") // store ref to original for creating patch original := schedule // don't modify items in the cache @@ -249,7 +170,7 @@ func (controller *scheduleController) processSchedule(key string) error { // so re-validate currentPhase := schedule.Status.Phase - cronSchedule, errs := parseCronSchedule(schedule, controller.logger) + cronSchedule, errs := parseCronSchedule(schedule, c.logger) if len(errs) > 0 { schedule.Status.Phase = api.SchedulePhaseFailedValidation schedule.Status.ValidationErrors = errs @@ -259,7 +180,7 @@ func (controller *scheduleController) processSchedule(key string) error { // update status if it's changed if currentPhase != schedule.Status.Phase { - updatedSchedule, err := patchSchedule(original, schedule, controller.schedulesClient) + updatedSchedule, err := patchSchedule(original, schedule, c.schedulesClient) if err != nil { return errors.Wrapf(err, "error updating Schedule phase to %s", schedule.Status.Phase) } @@ -271,7 +192,7 @@ func (controller *scheduleController) processSchedule(key string) error { } // check for the schedule being due to run, and submit a Backup if so - if err := controller.submitBackupIfDue(schedule, cronSchedule); err != nil { + if err := c.submitBackupIfDue(schedule, cronSchedule); err != nil { return err } @@ -288,14 +209,14 @@ func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Sched return nil, validationErrors } - logContext := logger.WithField("schedule", kubeutil.NamespaceAndName(itm)) + log := logger.WithField("schedule", kubeutil.NamespaceAndName(itm)) // adding a recover() around cron.Parse because it panics on empty string and is possible // that it panics under other scenarios as well. func() { defer func() { if r := recover(); r != nil { - logContext.WithFields(logrus.Fields{ + log.WithFields(logrus.Fields{ "schedule": itm.Spec.Schedule, "recover": r, }).Debug("Panic parsing schedule") @@ -304,7 +225,7 @@ func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Sched }() if res, err := cron.ParseStandard(itm.Spec.Schedule); err != nil { - logContext.WithError(errors.WithStack(err)).WithField("schedule", itm.Spec.Schedule).Debug("Error parsing schedule") + log.WithError(errors.WithStack(err)).WithField("schedule", itm.Spec.Schedule).Debug("Error parsing schedule") validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", err)) } else { schedule = res @@ -318,15 +239,15 @@ func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Sched return schedule, nil } -func (controller *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error { +func (c *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error { var ( - now = controller.clock.Now() + now = c.clock.Now() isDue, nextRunTime = getNextRunTime(item, cronSchedule, now) - logContext = controller.logger.WithField("schedule", kubeutil.NamespaceAndName(item)) + log = c.logger.WithField("schedule", kubeutil.NamespaceAndName(item)) ) if !isDue { - logContext.WithField("nextRunTime", nextRunTime).Info("Schedule is not due, skipping") + log.WithField("nextRunTime", nextRunTime).Info("Schedule is not due, skipping") return nil } @@ -336,9 +257,9 @@ func (controller *scheduleController) submitBackupIfDue(item *api.Schedule, cron // It might also make sense in the future to explicitly check for currently-running // backups so that we don't overlap runs (for disk snapshots in particular, this can // lead to performance issues). - logContext.WithField("nextRunTime", nextRunTime).Info("Schedule is due, submitting Backup") + log.WithField("nextRunTime", nextRunTime).Info("Schedule is due, submitting Backup") backup := getBackup(item, now) - if _, err := controller.backupsClient.Backups(backup.Namespace).Create(backup); err != nil { + if _, err := c.backupsClient.Backups(backup.Namespace).Create(backup); err != nil { return errors.Wrap(err, "error creating Backup") } @@ -347,7 +268,7 @@ func (controller *scheduleController) submitBackupIfDue(item *api.Schedule, cron schedule.Status.LastBackup = metav1.NewTime(now) - if _, err := patchSchedule(original, schedule, controller.schedulesClient); err != nil { + if _, err := patchSchedule(original, schedule, c.schedulesClient); err != nil { return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", schedule.Status.LastBackup) }