velero/pkg/datapath/file_system.go

261 lines
8.1 KiB
Go

/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datapath
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
repoProvider "github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
// FSBRInitParam define the input param for FSBR init
type FSBRInitParam struct {
BSLName string
SourceNamespace string
UploaderType string
RepositoryType string
RepoIdentifier string
RepositoryEnsurer *repository.Ensurer
CredentialGetter *credentials.CredentialGetter
Filesystem filesystem.Interface
}
// FSBRStartParam define the input param for FSBR start
type FSBRStartParam struct {
RealSource string
ParentSnapshot string
ForceFull bool
Tags map[string]string
}
type fileSystemBR struct {
ctx context.Context
cancel context.CancelFunc
backupRepo *velerov1api.BackupRepository
uploaderProv provider.Provider
log logrus.FieldLogger
client client.Client
backupLocation *velerov1api.BackupStorageLocation
namespace string
initialized bool
callbacks Callbacks
jobName string
requestorType string
wgDataPath sync.WaitGroup
dataPathLock sync.Mutex
}
func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR {
fs := &fileSystemBR{
jobName: jobName,
requestorType: requestorType,
client: client,
namespace: namespace,
callbacks: callbacks,
wgDataPath: sync.WaitGroup{},
log: log,
}
return fs
}
func (fs *fileSystemBR) Init(ctx context.Context, param any) error {
initParam := param.(*FSBRInitParam)
var err error
defer func() {
if err != nil {
fs.Close(ctx)
}
}()
fs.ctx, fs.cancel = context.WithCancel(ctx)
backupLocation := &velerov1api.BackupStorageLocation{}
if err = fs.client.Get(ctx, client.ObjectKey{
Namespace: fs.namespace,
Name: initParam.BSLName,
}, backupLocation); err != nil {
return errors.Wrapf(err, "error getting backup storage location %s", initParam.BSLName)
}
fs.backupLocation = backupLocation
fs.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, fs.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType)
if err != nil {
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)
}
err = fs.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter)
if err != nil {
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)
}
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, initParam.UploaderType, fs.requestorType, initParam.RepoIdentifier,
fs.backupLocation, fs.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), fs.log)
if err != nil {
return errors.Wrapf(err, "error creating uploader %s", initParam.UploaderType)
}
fs.initialized = true
fs.log.WithFields(
logrus.Fields{
"jobName": fs.jobName,
"bsl": initParam.BSLName,
"source namespace": initParam.SourceNamespace,
"uploader": initParam.UploaderType,
"repository": initParam.RepositoryType,
}).Info("FileSystemBR is initialized")
return nil
}
func (fs *fileSystemBR) Close(ctx context.Context) {
if fs.cancel != nil {
fs.cancel()
}
fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR")
fs.wgDataPath.Wait()
fs.close(ctx)
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}
func (fs *fileSystemBR) close(ctx context.Context) {
fs.dataPathLock.Lock()
defer fs.dataPathLock.Unlock()
if fs.uploaderProv != nil {
if err := fs.uploaderProv.Close(ctx); err != nil {
fs.log.Errorf("failed to close uploader provider with error %v", err)
}
fs.uploaderProv = nil
}
}
func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param any) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
fs.wgDataPath.Add(1)
backupParam := param.(*FSBRStartParam)
go func() {
fs.log.Info("Start data path backup")
defer func() {
fs.close(context.Background())
fs.wgDataPath.Done()
}()
snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
dataPathErr := DataPathError{
snapshotID: snapshotID,
err: err,
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}})
}
}()
return nil
}
func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
fs.wgDataPath.Add(1)
go func() {
fs.log.Info("Start data path restore")
defer func() {
fs.close(context.Background())
fs.wgDataPath.Done()
}()
totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
dataPathErr := DataPathError{
snapshotID: snapshotID,
err: err,
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
}
}()
return nil
}
// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {
fs.callbacks.OnProgress(context.Background(), fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone})
}
}
func (fs *fileSystemBR) Cancel() {
fs.cancel()
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is canceled")
}
func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error {
if repositoryType == velerov1api.BackupRepositoryTypeKopia {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
return err
}
} else {
if err := repoProvider.NewResticRepositoryProvider(credentialGetter.FromFile, filesystem.NewFileSystem(), fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
return err
}
}
return nil
}