Merge pull request #5956 from Lyndon-Li/issue-fix-5935

Fix issue 5935
pull/5969/head
Xun Jiang/Bruce Jiang 2023-03-09 15:28:47 +08:00 committed by GitHub
commit 6d8f086283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 229 additions and 111 deletions

View File

@ -154,7 +154,7 @@
* Skip completed jobs and pods when restoring (#463, @nrb)
* Set namespace correctly when syncing backups from object storage (#472, @skriss)
* When building on macOS, bind-mount volumes with delegated config (#478, @skriss)
* Add replica sets and daemonsets to cohabitating resources so they're not backed up twice (#482 #485, @skriss)
* Add replica sets and daemonsets to cohabiting resources so they're not backed up twice (#482 #485, @skriss)
* Shut down the Ark server gracefully on SIGINT/SIGTERM (#483, @skriss)
* Only back up resources that support GET and DELETE in addition to LIST and CREATE (#486, @nrb)
* Show a better error message when trying to get an incomplete restore's logs (#496, @nrb)

View File

@ -103,7 +103,7 @@ Also added DownloadTargetKindBackupItemSnapshots for retrieving the signed URL t
* Fix CVE-2020-29652 and CVE-2020-26160 (#4274, @ywk253100)
* Refine tag-release.sh to align with change in release process (#4185, @reasonerjt)
* Fix plugins incompatible issue in upgrade test (#4141, @danfengliu)
* Verify group before treating resource as cohabitating (#4126, @sseago)
* Verify group before treating resource as cohabiting (#4126, @sseago)
* Added ItemSnapshotter plugin definition and plugin framework - addresses #3533.
Part of the Upload Progress enhancement (#3533) (#4077, @dsmithuchida)
* Add upgrade test in E2E test (#4058, @danfengliu)

View File

@ -0,0 +1 @@
Fix issue #5935: refactor the logic for persisting the backup/restore log so as to remove the contention on the gzip writer
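The contention is visible in the old controller code further down in this diff: the live logger wrote through an io.MultiWriter to a gzip writer that the persistence path also closed and uploaded. The new DualModeLogger added later in the diff owns that writer, and DoneForPersist detaches it from the logger before it is closed. Below is a condensed sketch of the new call sequence as the backup controller uses it; names like baseLogger, backupName, and request are placeholders, error handling is abbreviated, and the snapshot-related arguments to persistBackup are omitted.

// Condensed from the backup controller changes in this PR. logLevel, logFormat,
// logCounter, baseLogger, backupName, request, backupFile and backupStore stand
// in for the controller's own fields and locals.
backupLog, err := logging.NewTempFileLogger(logLevel, logFormat, logCounter,
	logrus.Fields{"backup": backupName})
if err != nil {
	return errors.Wrap(err, "error creating dual mode logger for backup")
}
// Dispose closes the gzip writer and removes the temp file once everything is done.
defer backupLog.Dispose(baseLogger)

backupLog.Info("running backup") // reaches stdout and the gzipped temp file

// Stop writing to the persist file before it is read back; later log calls go
// to stdout only, so nothing contends with the gzip writer while it is closed.
backupLog.DoneForPersist(baseLogger)

if logFile, err := backupLog.GetPersistFile(); err != nil {
	fatalErrs = append(fatalErrs, errors.Wrap(err, "error getting backup log file"))
} else if errs := persistBackup(request, backupFile, logFile, backupStore); len(errs) > 0 {
	fatalErrs = append(fatalErrs, errs...)
}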

View File

@ -272,7 +272,7 @@ ignored.
type OperationProgress struct {
Completed bool // True when the operation has completed, either successfully or with a failure
Err string // Set when the operation has failed
NCompleted, NTotal int64 // Quantity completed so far and the total quanity associated with the operaation in operationUnits
NCompleted, NTotal int64 // Quantity completed so far and the total quantity associated with the operation in operationUnits
// For data mover and volume snapshotter use cases, this would be in bytes
// On successful completion, completed and total should be the same.
OperationUnits string // Units represented by completed and total -- for data mover and item
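As a purely illustrative sketch (not part of this commit), a data mover reporting byte-level progress might fill the fields above like this; the helper name and calling convention are assumptions.

// Hypothetical helper, for illustration only: the field names come from the
// OperationProgress excerpt above, everything else is assumed.
func dataMoverProgress(bytesCopied, bytesTotal int64, failure error) OperationProgress {
	p := OperationProgress{
		NCompleted:     bytesCopied, // bytes moved so far
		NTotal:         bytesTotal,  // total bytes for the whole operation
		OperationUnits: "bytes",     // units that NCompleted/NTotal are expressed in
	}
	if failure != nil {
		p.Completed = true // finished, but with a failure recorded in Err
		p.Err = failure.Error()
	} else if bytesCopied >= bytesTotal {
		p.Completed = true // on success, NCompleted and NTotal should match
	}
	return p
}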

View File

@ -1003,7 +1003,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
},
},
{
name: "when deployments exist that are not in the cohabitating groups those are backed up along with apps/deployments",
name: "when deployments exist that are not in the cohabiting groups those are backed up along with apps/deployments",
backup: defaultBackup().Result(),
apiResources: []*test.APIResource{
test.VeleroDeployments(
@ -1048,7 +1048,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
}
// TestBackupUsesNewCohabitatingResourcesForEachBackup ensures that when two backups are
// run that each include cohabitating resources, one copy of the relevant resources is
// run that each include cohabiting resources, one copy of the relevant resources is
// backed up in each backup. Verification is done by looking at the contents of the backup
// tarball. This covers a specific issue that was fixed by https://github.com/vmware-tanzu/velero/pull/485.
func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {

View File

@ -22,7 +22,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
@ -599,25 +598,14 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B
func (c *backupController) runBackup(backup *pkgbackup.Request) error {
c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Setting up backup log")
logFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for backup log")
}
gzippedLogFile := gzip.NewWriter(logFile)
// Assuming we successfully uploaded the log file, this will have already been closed below. It is safe to call
// close multiple times. If we get an error closing this, there's not really anything we can do about it.
defer gzippedLogFile.Close()
defer closeAndRemoveFile(logFile, c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)))
// Log the backup to both a backup log file and to stdout. This will help see what happened if the upload of the
// backup log failed for whatever reason.
logger := logging.DefaultLogger(c.backupLogLevel, c.formatFlag)
logger.Out = io.MultiWriter(os.Stdout, gzippedLogFile)
logCounter := logging.NewLogHook()
logger.Hooks.Add(logCounter)
backupLog := logger.WithField(Backup, kubeutil.NamespaceAndName(backup))
backupLog, err := logging.NewTempFileLogger(c.backupLogLevel, c.formatFlag, logCounter, logrus.Fields{Backup: kubeutil.NamespaceAndName(backup)})
if err != nil {
return errors.Wrap(err, "error creating dual mode logger for backup")
}
defer backupLog.Dispose(c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)))
backupLog.Info("Setting up backup temp file")
backupFile, err := ioutil.TempFile("", "")
@ -752,9 +740,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
"errors": backupErrors,
}
if err := gzippedLogFile.Close(); err != nil {
c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).WithError(err).Error("error closing gzippedLogFile")
}
backupLog.DoneForPersist(c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)))
// Assign finalize phase as close to end as possible so that any errors
// logged to backupLog are captured. This is done before uploading the
@ -793,8 +779,12 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
return err
}
if errs := persistBackup(backup, backupFile, logFile, backupStore, volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses, results); len(errs) > 0 {
fatalErrs = append(fatalErrs, errs...)
if logFile, err := backupLog.GetPersistFile(); err != nil {
fatalErrs = append(fatalErrs, errors.Wrap(err, "error getting backup log file"))
} else {
if errs := persistBackup(backup, backupFile, logFile, backupStore, volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses, results); len(errs) > 0 {
fatalErrs = append(fatalErrs, errs...)
}
}
c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Backup completed")

View File

@ -100,9 +100,8 @@ type restoreReconciler struct {
}
type backupInfo struct {
backup *api.Backup
location *api.BackupStorageLocation
backupStore persistence.BackupStore
backup *api.Backup
location *api.BackupStorageLocation
}
func NewRestoreReconciler(
@ -160,13 +159,8 @@ func (r *restoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// store a copy of the original restore for creating patch
original := restore.DeepCopy()
// Validate the restore and fetch the backup. Note that the plugin
// manager used here is not the same one used by c.runValidatedRestore,
// since within that function we want the plugin manager to log to
// our per-restore log (which is instantiated within c.runValidatedRestore).
pluginManager := r.newPluginManager(r.logger)
defer pluginManager.CleanupClients()
info := r.validateAndComplete(restore, pluginManager)
// Validate the restore and fetch the backup
info := r.validateAndComplete(restore)
// Register attempts after validation so we don't have to fetch the backup multiple times
backupScheduleName := restore.Spec.ScheduleName
@ -243,7 +237,7 @@ func (r *restoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
func (r *restoreReconciler) validateAndComplete(restore *api.Restore, pluginManager clientmgmt.Manager) backupInfo {
func (r *restoreReconciler) validateAndComplete(restore *api.Restore) backupInfo {
// add non-restorable resources to restore's excluded resources
excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
for _, nonrestorable := range nonRestorableResources {
@ -326,7 +320,7 @@ func (r *restoreReconciler) validateAndComplete(restore *api.Restore, pluginMana
}
}
info, err := r.fetchBackupInfo(restore.Spec.BackupName, pluginManager)
info, err := r.fetchBackupInfo(restore.Spec.BackupName)
if err != nil {
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
return backupInfo{}
@ -381,7 +375,7 @@ func mostRecentCompletedBackup(backups []api.Backup) api.Backup {
// fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
// find it, it returns an error.
func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
func (r *restoreReconciler) fetchBackupInfo(backupName string) (backupInfo, error) {
backup := &api.Backup{}
err := r.kbClient.Get(context.Background(), types.NamespacedName{Namespace: r.namespace, Name: backupName}, backup)
if err != nil {
@ -396,15 +390,9 @@ func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager cli
return backupInfo{}, errors.WithStack(err)
}
backupStore, err := r.backupStoreGetter.Get(location, pluginManager, r.logger)
if err != nil {
return backupInfo{}, err
}
return backupInfo{
backup: backup,
location: location,
backupStore: backupStore,
backup: backup,
location: location,
}, nil
}
@ -415,15 +403,20 @@ func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager cli
func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backupInfo) error {
// instantiate the per-restore logger that will output both to a temp file
// (for upload to object storage) and to stdout.
restoreLog, err := newRestoreLogger(restore, r.restoreLogLevel, r.logFormat)
restoreLog, err := logging.NewTempFileLogger(r.restoreLogLevel, r.logFormat, nil, logrus.Fields{"restore": kubeutil.NamespaceAndName(restore)})
if err != nil {
return err
}
defer restoreLog.closeAndRemove(r.logger)
defer restoreLog.Dispose(r.logger)
pluginManager := r.newPluginManager(restoreLog)
defer pluginManager.CleanupClients()
backupStore, err := r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
if err != nil {
return err
}
actions, err := pluginManager.GetRestoreItemActionsV2()
if err != nil {
return errors.Wrap(err, "error getting restore item actions")
@ -436,7 +429,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
}
snapshotItemResolver := framework.NewItemSnapshotterResolver(itemSnapshotters)
backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, restoreLog)
backupFile, err := downloadToTempFile(restore.Spec.BackupName, backupStore, restoreLog)
if err != nil {
return errors.Wrap(err, "error downloading backup")
}
@ -455,7 +448,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
return errors.WithStack(err)
}
volumeSnapshots, err := info.backupStore.GetBackupVolumeSnapshots(restore.Spec.BackupName)
volumeSnapshots, err := backupStore.GetBackupVolumeSnapshots(restore.Spec.BackupName)
if err != nil {
return errors.Wrap(err, "error fetching volume snapshots metadata")
}
@ -502,17 +495,19 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
}
restoreLog.Info("restore completed")
restoreLog.DoneForPersist(r.logger)
// re-instantiate the backup store because credentials could have changed since the original
// instantiation, if this was a long-running restore
info.backupStore, err = r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
backupStore, err = r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
if err != nil {
return errors.Wrap(err, "error setting up backup store to persist log and results files")
}
if logReader, err := restoreLog.done(r.logger); err != nil {
if logReader, err := restoreLog.GetPersistFile(); err != nil {
restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error getting restore log reader: %v", err))
} else {
if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
if err := backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error uploading log file to backup storage: %v", err))
}
}
@ -535,11 +530,11 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
"errors": restoreErrors,
}
if err := putResults(restore, m, info.backupStore); err != nil {
if err := putResults(restore, m, backupStore); err != nil {
r.logger.WithError(err).Error("Error uploading restore results to backup storage")
}
if err := putRestoredResourceList(restore, restoreReq.RestoredResourceList(), info.backupStore); err != nil {
if err := putRestoredResourceList(restore, restoreReq.RestoredResourceList(), backupStore); err != nil {
r.logger.WithError(err).Error("Error uploading restored resource list to backup storage")
}
@ -643,50 +638,3 @@ func downloadToTempFile(backupName string, backupStore persistence.BackupStore,
return file, nil
}
type restoreLogger struct {
logrus.FieldLogger
file *os.File
w *gzip.Writer
}
func newRestoreLogger(restore *api.Restore, logLevel logrus.Level, logFormat logging.Format) (*restoreLogger, error) {
file, err := ioutil.TempFile("", "")
if err != nil {
return nil, errors.Wrap(err, "error creating temp file")
}
w := gzip.NewWriter(file)
logger := logging.DefaultLogger(logLevel, logFormat)
logger.Out = io.MultiWriter(os.Stdout, w)
return &restoreLogger{
FieldLogger: logger.WithField("restore", kubeutil.NamespaceAndName(restore)),
file: file,
w: w,
}, nil
}
// done stops the restoreLogger from being able to be written to, and returns
// an io.Reader for getting the content of the logger. Any attempts to use
// restoreLogger to log after calling done will panic.
func (l *restoreLogger) done(log logrus.FieldLogger) (io.Reader, error) {
l.FieldLogger = nil
if err := l.w.Close(); err != nil {
log.WithError(errors.WithStack(err)).Error("error closing gzip writer")
}
if _, err := l.file.Seek(0, 0); err != nil {
return nil, errors.Wrap(err, "error resetting log file offset to 0")
}
return l.file, nil
}
// closeAndRemove removes the logger's underlying temporary storage. This
// method should be called when all logging and reading from the logger is
// complete.
func (l *restoreLogger) closeAndRemove(log logrus.FieldLogger) {
closeAndRemoveFile(l.file, log)
}

View File

@ -136,7 +136,7 @@ func TestFetchBackupInfo(t *testing.T) {
backupStore.On("GetBackupMetadata", test.backupName).Return(test.backupStoreBackup, nil).Maybe()
}
info, err := r.fetchBackupInfo(test.backupName, pluginManager)
info, err := r.fetchBackupInfo(test.backupName)
require.Equal(t, test.expectedErr, err != nil)
if test.expectedRes != nil {
@ -604,7 +604,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Phase(velerov1api.BackupPhaseCompleted).
Result()))
r.validateAndComplete(restore, pluginManager)
r.validateAndComplete(restore)
assert.Contains(t, restore.Status.ValidationErrors, "No backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
@ -620,7 +620,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Result(),
))
r.validateAndComplete(restore, pluginManager)
r.validateAndComplete(restore)
assert.Contains(t, restore.Status.ValidationErrors, "No completed backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
@ -651,7 +651,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
ScheduleName: "schedule-1",
},
}
r.validateAndComplete(restore, pluginManager)
r.validateAndComplete(restore)
assert.Nil(t, restore.Status.ValidationErrors)
assert.Equal(t, "foo", restore.Spec.BackupName)
}

View File

@ -0,0 +1,104 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"compress/gzip"
"io"
"io/ioutil"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// DualModeLogger is a thread safe logger interface to write logs to dual targets, one of which
// is a persist file, so that the log could be further transferred.
type DualModeLogger interface {
logrus.FieldLogger
// DoneForPersist stops outputting logs to the persist file
DoneForPersist(log logrus.FieldLogger)
// GetPersistFile moves the persist file pointer to beginning and returns it
GetPersistFile() (*os.File, error)
// Dispose closes the temp file pointer and removes the file
Dispose(log logrus.FieldLogger)
}
type tempFileLogger struct {
logrus.FieldLogger
logger *logrus.Logger
file *os.File
w *gzip.Writer
}
// NewTempFileLogger creates a DualModeLogger instance that writes logs to both Stdout and a file in the temp folder.
func NewTempFileLogger(logLevel logrus.Level, logFormat Format, hook *LogHook, fields logrus.Fields) (DualModeLogger, error) {
file, err := ioutil.TempFile("", "")
if err != nil {
return nil, errors.Wrap(err, "error creating temp file")
}
w := gzip.NewWriter(file)
logger := DefaultLogger(logLevel, logFormat)
logger.Out = io.MultiWriter(os.Stdout, w)
if hook != nil {
logger.Hooks.Add(hook)
}
return &tempFileLogger{
FieldLogger: logger.WithFields(fields),
logger: logger,
file: file,
w: w,
}, nil
}
func (p *tempFileLogger) DoneForPersist(log logrus.FieldLogger) {
p.logger.SetOutput(os.Stdout)
if err := p.w.Close(); err != nil {
log.WithError(err).Warn("error closing gzip writer")
}
}
func (p *tempFileLogger) GetPersistFile() (*os.File, error) {
if _, err := p.file.Seek(0, 0); err != nil {
return nil, errors.Wrap(err, "error resetting log file offset to 0")
}
return p.file, nil
}
func (p *tempFileLogger) Dispose(log logrus.FieldLogger) {
p.w.Close()
closeAndRemoveFile(p.file, log)
}
func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if file == nil {
log.Debug("Skipping removal of temp log file due to nil file pointer")
return
}
if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Warn("error closing temp log file")
}
if err := os.Remove(file.Name()); err != nil {
log.WithError(err).WithField("file", file.Name()).Warn("error removing temp log file")
}
}
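Anything that consumes the *os.File returned by GetPersistFile has to undo the gzip compression applied above; a minimal reader-side helper (an assumption for illustration, not part of this PR) could look like this.

// Assumed helper: read back the gzip-compressed log returned by GetPersistFile,
// e.g. after downloading it from object storage. The unit test below does the
// same thing with a fixed-size buffer.
func readPersistedLog(f *os.File) (string, error) {
	gzr, err := gzip.NewReader(f)
	if err != nil {
		return "", err
	}
	defer gzr.Close()
	data, err := io.ReadAll(gzr)
	if err != nil {
		return "", err
	}
	return string(data), nil
}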

View File

@ -0,0 +1,75 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"compress/gzip"
"io"
"os"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestDualModeLogger(t *testing.T) {
logMsgExpect := "Expected message in log"
logMsgUnexpect := "Unexpected message in log"
logger, err := NewTempFileLogger(logrus.DebugLevel, FormatText, nil, logrus.Fields{})
require.NoError(t, err)
logger.Info(logMsgExpect)
logger.DoneForPersist(velerotest.NewLogger())
logger.Info(logMsgUnexpect)
logFile, err := logger.GetPersistFile()
require.NoError(t, err)
logStr, err := readLogString(logFile)
require.NoError(t, err)
assert.Equal(t, true, strings.Contains(logStr, logMsgExpect))
assert.Equal(t, false, strings.Contains(logStr, logMsgUnexpect))
logger.Dispose(velerotest.NewLogger())
_, err = os.Stat(logFile.Name())
assert.Equal(t, true, os.IsNotExist(err))
}
func readLogString(file *os.File) (string, error) {
gzr, err := gzip.NewReader(file)
if err != nil {
return "", err
}
buffer := make([]byte, 1024)
_, err = gzr.Read(buffer)
if err != io.EOF {
return "", err
}
return string(buffer[:]), nil
}

View File

@ -81,7 +81,7 @@ Server:
Below we've done 6 groups of tests, for each single group of test, we used limited resources (1 core CPU 2 GB memory or 4 cores CPU 4 GB memory) to do Velero file system backup under Restic path and Kopia path, and then compare the results.
Recorded the metrics of time consumption, maximum CPU usage, maximum memory usage, and minio strorage usage for node-agent daemonset, and the metrics of Velero deployment are not included since the differences are not obvious by whether using Restic uploader or Kopia uploader.
Recorded the metrics of time consumption, maximum CPU usage, maximum memory usage, and minio storage usage for node-agent daemonset, and the metrics of Velero deployment are not included since the differences are not obvious by whether using Restic uploader or Kopia uploader.
Compression is either disabled or not available for both uploaders.

View File

@ -81,7 +81,7 @@ Server:
Below we've done 6 groups of tests, for each single group of test, we used limited resources (1 core CPU 2 GB memory or 4 cores CPU 4 GB memory) to do Velero file system backup under Restic path and Kopia path, and then compare the results.
Recorded the metrics of time consumption, maximum CPU usage, maximum memory usage, and minio strorage usage for node-agent daemonset, and the metrics of Velero deployment are not included since the differences are not obvious by whether using Restic uploader or Kopia uploader.
Recorded the metrics of time consumption, maximum CPU usage, maximum memory usage, and minio storage usage for node-agent daemonset, and the metrics of Velero deployment are not included since the differences are not obvious by whether using Restic uploader or Kopia uploader.
Compression is either disabled or not available for both uploaders.

View File

@ -118,7 +118,7 @@ func (p *PVBackupFiltering) CreateResources() error {
})
}
})
By(fmt.Sprintf("Polulate all pods %s with file %s", p.podsList, FILE_NAME), func() {
By(fmt.Sprintf("Populate all pods %s with file %s", p.podsList, FILE_NAME), func() {
for index, ns := range *p.NSIncluded {
By(fmt.Sprintf("Creating file in all pods to start %d in namespace %s", index, ns), func() {
WaitForPods(p.Ctx, p.Client, ns, p.podsList[index])