Merge pull request #891 from skriss/fix-restore-logs
remove restore log helper for accurate line #'spull/893/head
commit
94b8fae15a
|
@ -233,7 +233,7 @@ func (kr *kubernetesRestorer) Restore(log logrus.FieldLogger, restore *api.Resto
|
|||
restore: restore,
|
||||
prioritizedResources: prioritizedResources,
|
||||
selector: selector,
|
||||
logger: log,
|
||||
log: log,
|
||||
dynamicFactory: kr.dynamicFactory,
|
||||
fileSystem: kr.fileSystem,
|
||||
namespaceClient: kr.namespaceClient,
|
||||
|
@ -314,7 +314,7 @@ type context struct {
|
|||
restore *api.Restore
|
||||
prioritizedResources []schema.GroupResource
|
||||
selector labels.Selector
|
||||
logger logrus.FieldLogger
|
||||
log logrus.FieldLogger
|
||||
dynamicFactory client.DynamicFactory
|
||||
fileSystem filesystem.Interface
|
||||
namespaceClient corev1.NamespaceInterface
|
||||
|
@ -328,16 +328,12 @@ type context struct {
|
|||
pvRestorer PVRestorer
|
||||
}
|
||||
|
||||
func (ctx *context) infof(msg string, args ...interface{}) {
|
||||
ctx.logger.Infof(msg, args...)
|
||||
}
|
||||
|
||||
func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) {
|
||||
ctx.infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))
|
||||
ctx.log.Infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))
|
||||
|
||||
dir, err := ctx.unzipAndExtractBackup(ctx.backupReader)
|
||||
if err != nil {
|
||||
ctx.infof("error unzipping and extracting: %v", err)
|
||||
ctx.log.Infof("error unzipping and extracting: %v", err)
|
||||
return api.RestoreResult{}, api.RestoreResult{Ark: []string{err.Error()}}
|
||||
}
|
||||
defer ctx.fileSystem.RemoveAll(dir)
|
||||
|
@ -442,7 +438,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
|
|||
nsPath := filepath.Join(nsSubDir, nsName)
|
||||
|
||||
if !namespaceFilter.ShouldInclude(nsName) {
|
||||
ctx.infof("Skipping namespace %s", nsName)
|
||||
ctx.log.Infof("Skipping namespace %s", nsName)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -457,7 +453,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
|
|||
// (in order to get any backed-up metadata), but if we don't find it there,
|
||||
// create a blank one.
|
||||
if !existingNamespaces.Has(mappedNsName) {
|
||||
logger := ctx.logger.WithField("namespace", nsName)
|
||||
logger := ctx.log.WithField("namespace", nsName)
|
||||
ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
|
||||
if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil {
|
||||
addArkError(&errs, err)
|
||||
|
@ -475,15 +471,15 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
|
|||
}
|
||||
|
||||
// TODO timeout?
|
||||
ctx.logger.Debugf("Waiting on resource wait group for resource=%s", resource.String())
|
||||
ctx.log.Debugf("Waiting on resource wait group for resource=%s", resource.String())
|
||||
ctx.resourceWaitGroup.Wait()
|
||||
ctx.logger.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
|
||||
ctx.log.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
|
||||
}
|
||||
|
||||
// TODO timeout?
|
||||
ctx.logger.Debug("Waiting on global wait group")
|
||||
ctx.log.Debug("Waiting on global wait group")
|
||||
waitErrs := ctx.globalWaitGroup.Wait()
|
||||
ctx.logger.Debug("Done waiting on global wait group")
|
||||
ctx.log.Debug("Done waiting on global wait group")
|
||||
|
||||
for _, err := range waitErrs {
|
||||
// TODO not ideal to be adding these to Ark-level errors
|
||||
|
@ -569,14 +565,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
warnings, errs := api.RestoreResult{}, api.RestoreResult{}
|
||||
|
||||
if ctx.restore.Spec.IncludeClusterResources != nil && !*ctx.restore.Spec.IncludeClusterResources && namespace == "" {
|
||||
ctx.infof("Skipping resource %s because it's cluster-scoped", resource)
|
||||
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
|
||||
return warnings, errs
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
ctx.infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
|
||||
ctx.log.Infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
|
||||
} else {
|
||||
ctx.infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
|
||||
ctx.log.Infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
|
||||
}
|
||||
|
||||
files, err := ctx.fileSystem.ReadDir(resourcePath)
|
||||
|
@ -625,7 +621,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
// non-pods with controller owners shouldn't be restored; pods with controller
|
||||
// owners should only be restored if they have restic snapshots to restore
|
||||
if groupResource != kuberesource.Pods || !restic.PodHasSnapshotAnnotation(obj) {
|
||||
ctx.infof("%s has a controller owner - skipping", kube.NamespaceAndName(obj))
|
||||
ctx.log.Infof("%s has a controller owner - skipping", kube.NamespaceAndName(obj))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -636,14 +632,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
continue
|
||||
}
|
||||
if complete {
|
||||
ctx.infof("%s is complete - skipping", kube.NamespaceAndName(obj))
|
||||
ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj))
|
||||
continue
|
||||
}
|
||||
|
||||
if resourceClient == nil {
|
||||
// initialize client for this Resource. we need
|
||||
// metadata from an object to do this.
|
||||
ctx.infof("Getting client for %v", obj.GroupVersionKind())
|
||||
ctx.log.Infof("Getting client for %v", obj.GroupVersionKind())
|
||||
|
||||
resource := metav1.APIResource{
|
||||
Namespaced: len(namespace) > 0,
|
||||
|
@ -662,7 +658,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
|
||||
// TODO: move to restore item action if/when we add a ShouldRestore() method to the interface
|
||||
if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" {
|
||||
ctx.infof("Not restoring pod because it's a mirror pod")
|
||||
ctx.log.Infof("Not restoring pod because it's a mirror pod")
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -670,7 +666,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
_, found := ctx.backup.Status.VolumeBackups[name]
|
||||
reclaimPolicy, err := collections.GetString(obj.Object, "spec.persistentVolumeReclaimPolicy")
|
||||
if err == nil && !found && reclaimPolicy == "Delete" {
|
||||
ctx.infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
|
||||
ctx.log.Infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
|
||||
|
||||
ctx.pvsToProvision.Insert(name)
|
||||
|
||||
|
@ -696,8 +692,8 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
go func() {
|
||||
defer ctx.resourceWaitGroup.Done()
|
||||
|
||||
if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.logger); err != nil {
|
||||
ctx.logger.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
|
||||
if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.log); err != nil {
|
||||
ctx.log.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
|
||||
addArkError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
|
||||
}
|
||||
}()
|
||||
|
@ -712,7 +708,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
}
|
||||
|
||||
if volumeName, exists := spec["volumeName"]; exists && ctx.pvsToProvision.Has(volumeName.(string)) {
|
||||
ctx.infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)
|
||||
ctx.log.Infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)
|
||||
|
||||
delete(spec, "volumeName")
|
||||
|
||||
|
@ -728,7 +724,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
continue
|
||||
}
|
||||
|
||||
ctx.infof("Executing item action for %v", &groupResource)
|
||||
ctx.log.Infof("Executing item action for %v", &groupResource)
|
||||
|
||||
updatedObj, warning, err := action.Execute(obj, ctx.restore)
|
||||
if warning != nil {
|
||||
|
@ -765,19 +761,19 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
// and which backup they came from
|
||||
addRestoreLabels(obj, ctx.restore.Name, ctx.restore.Spec.BackupName)
|
||||
|
||||
ctx.infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
|
||||
ctx.log.Infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
|
||||
createdObj, restoreErr := resourceClient.Create(obj)
|
||||
if apierrors.IsAlreadyExists(restoreErr) {
|
||||
fromCluster, err := resourceClient.Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
ctx.infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
|
||||
ctx.log.Infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
|
||||
addToResult(&warnings, namespace, err)
|
||||
continue
|
||||
}
|
||||
// Remove insubstantial metadata
|
||||
fromCluster, err = resetMetadataAndStatus(fromCluster)
|
||||
if err != nil {
|
||||
ctx.infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
|
||||
ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
|
||||
addToResult(&warnings, namespace, err)
|
||||
continue
|
||||
}
|
||||
|
@ -792,14 +788,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
case kuberesource.ServiceAccounts:
|
||||
desired, err := mergeServiceAccounts(fromCluster, obj)
|
||||
if err != nil {
|
||||
ctx.infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
|
||||
ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
|
||||
addToResult(&warnings, namespace, err)
|
||||
continue
|
||||
}
|
||||
|
||||
patchBytes, err := generatePatch(fromCluster, desired)
|
||||
if err != nil {
|
||||
ctx.infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
|
||||
ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
|
||||
addToResult(&warnings, namespace, err)
|
||||
continue
|
||||
}
|
||||
|
@ -813,7 +809,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
if err != nil {
|
||||
addToResult(&warnings, namespace, err)
|
||||
} else {
|
||||
ctx.infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
|
||||
ctx.log.Infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
|
||||
}
|
||||
default:
|
||||
e := errors.Errorf("not restored: %s and is different from backed up version.", restoreErr)
|
||||
|
@ -824,24 +820,24 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
|
|||
}
|
||||
// Error was something other than an AlreadyExists
|
||||
if restoreErr != nil {
|
||||
ctx.infof("error restoring %s: %v", name, err)
|
||||
ctx.log.Infof("error restoring %s: %v", name, err)
|
||||
addToResult(&errs, namespace, fmt.Errorf("error restoring %s: %v", fullPath, restoreErr))
|
||||
continue
|
||||
}
|
||||
|
||||
if groupResource == kuberesource.Pods && len(restic.GetPodSnapshotAnnotations(obj)) > 0 {
|
||||
if ctx.resticRestorer == nil {
|
||||
ctx.logger.Warn("No restic restorer, not restoring pod's volumes")
|
||||
ctx.log.Warn("No restic restorer, not restoring pod's volumes")
|
||||
} else {
|
||||
ctx.globalWaitGroup.GoErrorSlice(func() []error {
|
||||
pod := new(v1.Pod)
|
||||
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
|
||||
ctx.logger.WithError(err).Error("error converting unstructured pod")
|
||||
ctx.log.WithError(err).Error("error converting unstructured pod")
|
||||
return []error{err}
|
||||
}
|
||||
|
||||
if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, ctx.logger); errs != nil {
|
||||
ctx.logger.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
|
||||
if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, ctx.log); errs != nil {
|
||||
ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
|
||||
return errs
|
||||
}
|
||||
|
||||
|
@ -1079,7 +1075,7 @@ func (ctx *context) unmarshal(filePath string) (*unstructured.Unstructured, erro
|
|||
func (ctx *context) unzipAndExtractBackup(src io.Reader) (string, error) {
|
||||
gzr, err := gzip.NewReader(src)
|
||||
if err != nil {
|
||||
ctx.infof("error creating gzip reader: %v", err)
|
||||
ctx.log.Infof("error creating gzip reader: %v", err)
|
||||
return "", err
|
||||
}
|
||||
defer gzr.Close()
|
||||
|
@ -1092,7 +1088,7 @@ func (ctx *context) unzipAndExtractBackup(src io.Reader) (string, error) {
|
|||
func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
|
||||
dir, err := ctx.fileSystem.TempDir("", "")
|
||||
if err != nil {
|
||||
ctx.infof("error creating temp dir: %v", err)
|
||||
ctx.log.Infof("error creating temp dir: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
@ -1103,7 +1099,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
|
|||
break
|
||||
}
|
||||
if err != nil {
|
||||
ctx.infof("error reading tar: %v", err)
|
||||
ctx.log.Infof("error reading tar: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
@ -1113,7 +1109,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
|
|||
case tar.TypeDir:
|
||||
err := ctx.fileSystem.MkdirAll(target, header.FileInfo().Mode())
|
||||
if err != nil {
|
||||
ctx.infof("mkdirall error: %v", err)
|
||||
ctx.log.Infof("mkdirall error: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
@ -1121,7 +1117,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
|
|||
// make sure we have the directory created
|
||||
err := ctx.fileSystem.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
|
||||
if err != nil {
|
||||
ctx.infof("mkdirall error: %v", err)
|
||||
ctx.log.Infof("mkdirall error: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
@ -1133,7 +1129,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
|
|||
defer file.Close()
|
||||
|
||||
if _, err := io.Copy(file, tarRdr); err != nil {
|
||||
ctx.infof("error copying: %v", err)
|
||||
ctx.log.Infof("error copying: %v", err)
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -189,7 +189,7 @@ func TestRestoreNamespaceFiltering(t *testing.T) {
|
|||
restore: test.restore,
|
||||
namespaceClient: &fakeNamespaceClient{},
|
||||
fileSystem: test.fileSystem,
|
||||
logger: log,
|
||||
log: log,
|
||||
prioritizedResources: test.prioritizedResources,
|
||||
}
|
||||
|
||||
|
@ -282,7 +282,7 @@ func TestRestorePriority(t *testing.T) {
|
|||
namespaceClient: &fakeNamespaceClient{},
|
||||
fileSystem: test.fileSystem,
|
||||
prioritizedResources: test.prioritizedResources,
|
||||
logger: log,
|
||||
log: log,
|
||||
}
|
||||
|
||||
warnings, errors := ctx.restoreFromDir(test.baseDir)
|
||||
|
@ -331,7 +331,7 @@ func TestNamespaceRemapping(t *testing.T) {
|
|||
prioritizedResources: prioritizedResources,
|
||||
restore: restore,
|
||||
backup: &api.Backup{},
|
||||
logger: arktest.NewLogger(),
|
||||
log: arktest.NewLogger(),
|
||||
}
|
||||
|
||||
warnings, errors := ctx.restoreFromDir(baseDir)
|
||||
|
@ -630,7 +630,7 @@ func TestRestoreResourceForNamespace(t *testing.T) {
|
|||
},
|
||||
},
|
||||
backup: &api.Backup{},
|
||||
logger: arktest.NewLogger(),
|
||||
log: arktest.NewLogger(),
|
||||
pvRestorer: &pvRestorer{},
|
||||
}
|
||||
|
||||
|
@ -716,7 +716,7 @@ func TestRestoringExistingServiceAccount(t *testing.T) {
|
|||
},
|
||||
},
|
||||
backup: &api.Backup{},
|
||||
logger: arktest.NewLogger(),
|
||||
log: arktest.NewLogger(),
|
||||
}
|
||||
warnings, errors := ctx.restoreResource("serviceaccounts", "ns-1", "foo/resources/serviceaccounts/namespaces/ns-1/")
|
||||
|
||||
|
@ -902,7 +902,7 @@ status:
|
|||
},
|
||||
},
|
||||
backup: backup,
|
||||
logger: arktest.NewLogger(),
|
||||
log: arktest.NewLogger(),
|
||||
pvsToProvision: sets.NewString(),
|
||||
pvRestorer: pvRestorer,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue