/* Copyright The Velero Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package datapath import ( "context" "sync" "github.com/pkg/errors" "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/repository" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" repoProvider "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) // FSBRInitParam define the input param for FSBR init type FSBRInitParam struct { BSLName string SourceNamespace string UploaderType string RepositoryType string RepoIdentifier string RepositoryEnsurer *repository.Ensurer CredentialGetter *credentials.CredentialGetter Filesystem filesystem.Interface } // FSBRStartParam define the input param for FSBR start type FSBRStartParam struct { RealSource string ParentSnapshot string ForceFull bool Tags map[string]string } type fileSystemBR struct { ctx context.Context cancel context.CancelFunc backupRepo *velerov1api.BackupRepository uploaderProv provider.Provider log logrus.FieldLogger client client.Client backupLocation *velerov1api.BackupStorageLocation namespace string initialized bool callbacks Callbacks jobName string requestorType string wgDataPath sync.WaitGroup dataPathLock sync.Mutex } func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { fs := &fileSystemBR{ jobName: jobName, requestorType: requestorType, client: client, namespace: namespace, callbacks: callbacks, wgDataPath: sync.WaitGroup{}, log: log, } return fs } func (fs *fileSystemBR) Init(ctx context.Context, param any) error { initParam := param.(*FSBRInitParam) var err error defer func() { if err != nil { fs.Close(ctx) } }() fs.ctx, fs.cancel = context.WithCancel(ctx) backupLocation := &velerov1api.BackupStorageLocation{} if err = fs.client.Get(ctx, client.ObjectKey{ Namespace: fs.namespace, Name: initParam.BSLName, }, backupLocation); err != nil { return errors.Wrapf(err, "error getting backup storage location %s", initParam.BSLName) } fs.backupLocation = backupLocation fs.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, fs.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType) if err != nil { return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType) } err = fs.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter) if err != nil { return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType) } fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, initParam.UploaderType, fs.requestorType, initParam.RepoIdentifier, fs.backupLocation, fs.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), fs.log) if err != nil { return errors.Wrapf(err, "error creating uploader %s", initParam.UploaderType) } fs.initialized = true fs.log.WithFields( logrus.Fields{ "jobName": fs.jobName, "bsl": initParam.BSLName, "source namespace": initParam.SourceNamespace, "uploader": initParam.UploaderType, "repository": initParam.RepositoryType, }).Info("FileSystemBR is initialized") return nil } func (fs *fileSystemBR) Close(ctx context.Context) { if fs.cancel != nil { fs.cancel() } fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR") fs.wgDataPath.Wait() fs.close(ctx) fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") } func (fs *fileSystemBR) close(ctx context.Context) { fs.dataPathLock.Lock() defer fs.dataPathLock.Unlock() if fs.uploaderProv != nil { if err := fs.uploaderProv.Close(ctx); err != nil { fs.log.Errorf("failed to close uploader provider with error %v", err) } fs.uploaderProv = nil } } func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param any) error { if !fs.initialized { return errors.New("file system data path is not initialized") } fs.wgDataPath.Add(1) backupParam := param.(*FSBRStartParam) go func() { fs.log.Info("Start data path backup") defer func() { fs.close(context.Background()) fs.wgDataPath.Done() }() snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { dataPathErr := DataPathError{ snapshotID: snapshotID, err: err, } fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr) } else { fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}}) } }() return nil } func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error { if !fs.initialized { return errors.New("file system data path is not initialized") } fs.wgDataPath.Add(1) go func() { fs.log.Info("Start data path restore") defer func() { fs.close(context.Background()) fs.wgDataPath.Done() }() totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { dataPathErr := DataPathError{ snapshotID: snapshotID, err: err, } fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr) } else { fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}}) } }() return nil } // UpdateProgress which implement ProgressUpdater interface to update progress status func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) { if fs.callbacks.OnProgress != nil { fs.callbacks.OnProgress(context.Background(), fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}) } } func (fs *fileSystemBR) Cancel() { fs.cancel() fs.log.WithField("user", fs.jobName).Info("FileSystemBR is canceled") } func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error { if repositoryType == velerov1api.BackupRepositoryTypeKopia { if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { return err } } else { if err := repoProvider.NewResticRepositoryProvider(credentialGetter.FromFile, filesystem.NewFileSystem(), fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { return err } } return nil }