velero/pkg/podvolume/backup_micro_service.go

314 lines
9.0 KiB
Go

/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"encoding/json"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
cachetool "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
podVolumeRequestor = "snapshot-pod-volume"
)
// BackupMicroService process data mover backups inside the backup pod
type BackupMicroService struct {
ctx context.Context
client client.Client
kubeClient kubernetes.Interface
repoEnsurer *repository.Ensurer
credentialGetter *credentials.CredentialGetter
logger logrus.FieldLogger
dataPathMgr *datapath.Manager
eventRecorder kube.EventRecorder
namespace string
pvbName string
pvb *velerov1api.PodVolumeBackup
sourceTargetPath datapath.AccessPoint
resultSignal chan dataPathResult
pvbInformer cache.Informer
pvbHandler cachetool.ResourceEventHandlerRegistration
nodeName string
}
type dataPathResult struct {
err error
result string
}
func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvbName string, namespace string, nodeName string,
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
pvbInformer cache.Informer, log logrus.FieldLogger) *BackupMicroService {
return &BackupMicroService{
ctx: ctx,
client: client,
kubeClient: kubeClient,
credentialGetter: cred,
logger: log,
repoEnsurer: repoEnsurer,
dataPathMgr: dataPathMgr,
namespace: namespace,
pvbName: pvbName,
sourceTargetPath: sourceTargetPath,
nodeName: nodeName,
resultSignal: make(chan dataPathResult),
pvbInformer: pvbInformer,
}
}
func (r *BackupMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvbName, r.nodeName, r.logger)
handler, err := r.pvbInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj any, newObj any) {
oldPvb := oldObj.(*velerov1api.PodVolumeBackup)
newPvb := newObj.(*velerov1api.PodVolumeBackup)
if newPvb.Name != r.pvbName {
return
}
if newPvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
return
}
if newPvb.Spec.Cancel && !oldPvb.Spec.Cancel {
r.cancelPodVolumeBackup(newPvb)
}
},
},
)
if err != nil {
return errors.Wrap(err, "error adding PVB handler")
}
r.pvbHandler = handler
return err
}
func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
log := r.logger.WithFields(logrus.Fields{
"PVB": r.pvbName,
})
pvb := &velerov1api.PodVolumeBackup{}
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.pvbName,
}, pvb)
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return true, errors.Wrapf(err, "error to get PVB %s", r.pvbName)
}
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
return true, nil
} else {
return false, nil
}
})
if err != nil {
log.WithError(err).Error("Failed to wait PVB")
return "", errors.Wrap(err, "error waiting for PVB")
}
r.pvb = pvb
log.Info("Run cancelable PVB")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataPathCompleted,
OnFailed: r.OnDataPathFailed,
OnCancelled: r.OnDataPathCancelled,
OnProgress: r.OnDataPathProgress,
}
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, podVolumeRequestor, ctx, r.client, pvb.Namespace, callbacks, log)
if err != nil {
return "", errors.Wrap(err, "error to create data path")
}
log.Debug("Async fs br created")
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvb.Spec.BackupStorageLocation,
SourceNamespace: pvb.Spec.Pod.Namespace,
UploaderType: pvb.Spec.UploaderType,
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return "", errors.Wrap(err, "error to initialize data path")
}
log.Info("Async fs br init")
tags := map[string]string{}
if err := fsBackup.StartBackup(r.sourceTargetPath, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
RealSource: GetRealSource(pvb),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return "", errors.Wrap(err, "error starting data path backup")
}
log.Info("Async fs backup data path started")
r.eventRecorder.Event(pvb, false, datapath.EventReasonStarted, "Data path for %s started", pvb.Name)
result := ""
select {
case <-ctx.Done():
err = errors.New("timed out waiting for fs backup to complete")
break
case res := <-r.resultSignal:
err = res.err
result = res.result
break
}
if err != nil {
log.WithError(err).Error("Async fs backup was not completed")
}
r.eventRecorder.EndingEvent(pvb, false, datapath.EventReasonStopped, "Data path for %s stopped", pvb.Name)
return result, err
}
func (r *BackupMicroService) Shutdown() {
r.eventRecorder.Shutdown()
r.closeDataPath(r.ctx, r.pvbName)
if r.pvbHandler != nil {
if err := r.pvbInformer.RemoveEventHandler(r.pvbHandler); err != nil {
r.logger.WithError(err).Warn("Failed to remove pod handler")
}
}
}
var funcMarshal = json.Marshal
func (r *BackupMicroService) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) {
log := r.logger.WithField("PVB", pvbName)
backupBytes, err := funcMarshal(result.Backup)
if err != nil {
log.WithError(err).Errorf("Failed to marshal backup result %v", result.Backup)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, "Failed to marshal backup result %v", result.Backup),
}
} else {
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonCompleted, string(backupBytes))
r.resultSignal <- dataPathResult{
result: string(backupBytes),
}
}
log.Info("Async fs backup completed")
}
func (r *BackupMicroService) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
log := r.logger.WithField("PVB", pvbName)
log.WithError(err).Error("Async fs backup data path failed")
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonFailed, "Data path for PVB %s failed, error %v", r.pvbName, err)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, "Data path for PVB %s failed", r.pvbName),
}
}
func (r *BackupMicroService) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) {
log := r.logger.WithField("PVB", pvbName)
log.Warn("Async fs backup data path canceled")
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonCancelled, "Data path for PVB %s canceled", pvbName)
r.resultSignal <- dataPathResult{
err: errors.New(datapath.ErrCancelled),
}
}
func (r *BackupMicroService) OnDataPathProgress(ctx context.Context, namespace string, pvbName string, progress *uploader.Progress) {
log := r.logger.WithFields(logrus.Fields{
"PVB": pvbName,
})
progressBytes, err := funcMarshal(progress)
if err != nil {
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
return
}
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonProgress, string(progressBytes))
}
func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(duName)
if fsBackup != nil {
fsBackup.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(duName)
}
func (r *BackupMicroService) cancelPodVolumeBackup(pvb *velerov1api.PodVolumeBackup) {
r.logger.WithField("pvb", pvb.Name).Info("PVB is being canceled")
r.eventRecorder.Event(pvb, false, datapath.EventReasonCancelling, "Canceling for PVB %s", pvb.Name)
fsBackup := r.dataPathMgr.GetAsyncBR(pvb.Name)
if fsBackup == nil {
r.OnDataPathCancelled(r.ctx, pvb.GetNamespace(), pvb.GetName())
} else {
fsBackup.Cancel()
}
}