346 lines
11 KiB
Go
346 lines
11 KiB
Go
/*
|
|
Copyright The Velero Contributors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package podvolume
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/kubernetes"
|
|
"sigs.k8s.io/controller-runtime/pkg/cache"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
"github.com/vmware-tanzu/velero/internal/credentials"
|
|
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
|
"github.com/vmware-tanzu/velero/pkg/datapath"
|
|
"github.com/vmware-tanzu/velero/pkg/repository"
|
|
"github.com/vmware-tanzu/velero/pkg/uploader"
|
|
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
|
|
|
cachetool "k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
// RestoreMicroService process data mover restores inside the restore pod
|
|
type RestoreMicroService struct {
|
|
ctx context.Context
|
|
client client.Client
|
|
kubeClient kubernetes.Interface
|
|
repoEnsurer *repository.Ensurer
|
|
credentialGetter *credentials.CredentialGetter
|
|
logger logrus.FieldLogger
|
|
dataPathMgr *datapath.Manager
|
|
eventRecorder kube.EventRecorder
|
|
|
|
namespace string
|
|
pvrName string
|
|
pvr *velerov1api.PodVolumeRestore
|
|
sourceTargetPath datapath.AccessPoint
|
|
|
|
resultSignal chan dataPathResult
|
|
|
|
pvrInformer cache.Informer
|
|
pvrHandler cachetool.ResourceEventHandlerRegistration
|
|
nodeName string
|
|
}
|
|
|
|
func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvrName string, namespace string, nodeName string,
|
|
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
|
|
pvrInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
|
|
return &RestoreMicroService{
|
|
ctx: ctx,
|
|
client: client,
|
|
kubeClient: kubeClient,
|
|
credentialGetter: cred,
|
|
logger: log,
|
|
repoEnsurer: repoEnsurer,
|
|
dataPathMgr: dataPathMgr,
|
|
namespace: namespace,
|
|
pvrName: pvrName,
|
|
sourceTargetPath: sourceTargetPath,
|
|
nodeName: nodeName,
|
|
resultSignal: make(chan dataPathResult),
|
|
pvrInformer: pvrInformer,
|
|
}
|
|
}
|
|
|
|
func (r *RestoreMicroService) Init() error {
|
|
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvrName, r.nodeName, r.logger)
|
|
|
|
handler, err := r.pvrInformer.AddEventHandler(
|
|
cachetool.ResourceEventHandlerFuncs{
|
|
UpdateFunc: func(oldObj any, newObj any) {
|
|
oldPvr := oldObj.(*velerov1api.PodVolumeRestore)
|
|
newPvr := newObj.(*velerov1api.PodVolumeRestore)
|
|
|
|
if newPvr.Name != r.pvrName {
|
|
return
|
|
}
|
|
|
|
if newPvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
|
|
return
|
|
}
|
|
|
|
if newPvr.Spec.Cancel && !oldPvr.Spec.Cancel {
|
|
r.cancelPodVolumeRestore(newPvr)
|
|
}
|
|
},
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
return errors.Wrap(err, "error adding PVR handler")
|
|
}
|
|
|
|
r.pvrHandler = handler
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
|
|
log := r.logger.WithFields(logrus.Fields{
|
|
"PVR": r.pvrName,
|
|
})
|
|
|
|
pvr := &velerov1api.PodVolumeRestore{}
|
|
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
|
err := r.client.Get(ctx, types.NamespacedName{
|
|
Namespace: r.namespace,
|
|
Name: r.pvrName,
|
|
}, pvr)
|
|
if apierrors.IsNotFound(err) {
|
|
return false, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return true, errors.Wrapf(err, "error to get PVR %s", r.pvrName)
|
|
}
|
|
|
|
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
|
|
return true, nil
|
|
} else {
|
|
return false, nil
|
|
}
|
|
})
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to wait PVR")
|
|
return "", errors.Wrap(err, "error waiting for PVR")
|
|
}
|
|
|
|
r.pvr = pvr
|
|
|
|
log.Info("Run cancelable PVR")
|
|
|
|
callbacks := datapath.Callbacks{
|
|
OnCompleted: r.OnPvrCompleted,
|
|
OnFailed: r.OnPvrFailed,
|
|
OnCancelled: r.OnPvrCancelled,
|
|
OnProgress: r.OnPvrProgress,
|
|
}
|
|
|
|
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
|
|
if err != nil {
|
|
return "", errors.Wrap(err, "error to create data path")
|
|
}
|
|
|
|
log.Debug("Async fs br created")
|
|
|
|
if err := fsRestore.Init(ctx,
|
|
&datapath.FSBRInitParam{
|
|
BSLName: pvr.Spec.BackupStorageLocation,
|
|
SourceNamespace: pvr.Spec.SourceNamespace,
|
|
UploaderType: pvr.Spec.UploaderType,
|
|
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
|
|
RepoIdentifier: "",
|
|
RepositoryEnsurer: r.repoEnsurer,
|
|
CredentialGetter: r.credentialGetter,
|
|
}); err != nil {
|
|
return "", errors.Wrap(err, "error to initialize data path")
|
|
}
|
|
|
|
log.Info("Async fs br init")
|
|
|
|
if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, r.sourceTargetPath, pvr.Spec.UploaderSettings); err != nil {
|
|
return "", errors.Wrap(err, "error starting data path restore")
|
|
}
|
|
|
|
log.Info("Async fs restore data path started")
|
|
r.eventRecorder.Event(pvr, false, datapath.EventReasonStarted, "Data path for %s started", pvr.Name)
|
|
|
|
result := ""
|
|
select {
|
|
case <-ctx.Done():
|
|
err = errors.New("timed out waiting for fs restore to complete")
|
|
break
|
|
case res := <-r.resultSignal:
|
|
err = res.err
|
|
result = res.result
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
log.WithError(err).Error("Async fs restore was not completed")
|
|
}
|
|
|
|
r.eventRecorder.EndingEvent(pvr, false, datapath.EventReasonStopped, "Data path for %s stopped", pvr.Name)
|
|
|
|
return result, err
|
|
}
|
|
|
|
func (r *RestoreMicroService) Shutdown() {
|
|
r.eventRecorder.Shutdown()
|
|
r.closeDataPath(r.ctx, r.pvrName)
|
|
|
|
if r.pvrHandler != nil {
|
|
if err := r.pvrInformer.RemoveEventHandler(r.pvrHandler); err != nil {
|
|
r.logger.WithError(err).Warn("Failed to remove pod handler")
|
|
}
|
|
}
|
|
}
|
|
|
|
var funcWriteCompletionMark = writeCompletionMark
|
|
|
|
func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
|
|
log := r.logger.WithField("PVR", pvrName)
|
|
|
|
err := funcWriteCompletionMark(r.pvr, result.Restore, log)
|
|
if err != nil {
|
|
log.WithError(err).Warnf("Failed to write completion mark, restored pod may failed to start")
|
|
}
|
|
|
|
restoreBytes, err := funcMarshal(result.Restore)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
|
|
r.recordPvrFailed(fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
|
|
} else {
|
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCompleted, string(restoreBytes))
|
|
r.resultSignal <- dataPathResult{
|
|
result: string(restoreBytes),
|
|
}
|
|
}
|
|
|
|
log.Info("Async fs restore data path completed")
|
|
}
|
|
|
|
func (r *RestoreMicroService) recordPvrFailed(msg string, err error) {
|
|
evtMsg := fmt.Sprintf("%s, error %v", msg, err)
|
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonFailed, evtMsg)
|
|
r.resultSignal <- dataPathResult{
|
|
err: errors.Wrap(err, msg),
|
|
}
|
|
}
|
|
|
|
func (r *RestoreMicroService) OnPvrFailed(ctx context.Context, namespace string, pvrName string, err error) {
|
|
log := r.logger.WithField("PVR", pvrName)
|
|
log.WithError(err).Error("Async fs restore data path failed")
|
|
|
|
r.recordPvrFailed(fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
|
|
}
|
|
|
|
func (r *RestoreMicroService) OnPvrCancelled(ctx context.Context, namespace string, pvrName string) {
|
|
log := r.logger.WithField("PVR", pvrName)
|
|
log.Warn("Async fs restore data path canceled")
|
|
|
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCancelled, "Data path for PVR %s canceled", pvrName)
|
|
r.resultSignal <- dataPathResult{
|
|
err: errors.New(datapath.ErrCancelled),
|
|
}
|
|
}
|
|
|
|
func (r *RestoreMicroService) OnPvrProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
|
|
log := r.logger.WithFields(logrus.Fields{
|
|
"PVR": pvrName,
|
|
})
|
|
|
|
progressBytes, err := funcMarshal(progress)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
|
|
return
|
|
}
|
|
|
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonProgress, string(progressBytes))
|
|
}
|
|
|
|
func (r *RestoreMicroService) closeDataPath(ctx context.Context, pvrName string) {
|
|
fsRestore := r.dataPathMgr.GetAsyncBR(pvrName)
|
|
if fsRestore != nil {
|
|
fsRestore.Close(ctx)
|
|
}
|
|
|
|
r.dataPathMgr.RemoveAsyncBR(pvrName)
|
|
}
|
|
|
|
func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeRestore) {
|
|
r.logger.WithField("PVR", pvr.Name).Info("PVR is being canceled")
|
|
|
|
r.eventRecorder.Event(pvr, false, datapath.EventReasonCancelling, "Canceling for PVR %s", pvr.Name)
|
|
|
|
fsBackup := r.dataPathMgr.GetAsyncBR(pvr.Name)
|
|
if fsBackup == nil {
|
|
r.OnPvrCancelled(r.ctx, pvr.GetNamespace(), pvr.GetName())
|
|
} else {
|
|
fsBackup.Cancel()
|
|
}
|
|
}
|
|
|
|
var funcRemoveAll = os.RemoveAll
|
|
var funcMkdirAll = os.MkdirAll
|
|
var funcWriteFile = os.WriteFile
|
|
|
|
func writeCompletionMark(pvr *velerov1api.PodVolumeRestore, result datapath.RestoreResult, log logrus.FieldLogger) error {
|
|
volumePath := result.Target.ByPath
|
|
if volumePath == "" {
|
|
return errors.New("target volume is empty in restore result")
|
|
}
|
|
|
|
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
|
|
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
|
|
// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
|
|
// for the one specific to the restore being executed).
|
|
if err := funcRemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
|
|
log.WithError(err).Warnf("Failed to remove .velero directory from directory %s", volumePath)
|
|
}
|
|
|
|
if len(pvr.OwnerReferences) == 0 {
|
|
return errors.New("error finding restore UID")
|
|
}
|
|
|
|
restoreUID := pvr.OwnerReferences[0].UID
|
|
|
|
// Create the .velero directory within the volume dir so we can write a done file
|
|
// for this restore.
|
|
if err := funcMkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
|
|
return errors.Wrapf(err, "error creating .velero directory for done file")
|
|
}
|
|
|
|
// Write a done file with name=<restore-uid> into the just-created .velero dir
|
|
// within the volume. The velero init container on the pod is waiting
|
|
// for this file to exist in each restored volume before completing.
|
|
if err := funcWriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil {
|
|
return errors.Wrapf(err, "error writing done file")
|
|
}
|
|
|
|
return nil
|
|
}
|