fix issue 5043

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/5760/head
Lyndon-Li 2022-12-15 19:58:24 +08:00
parent 55873c1c37
commit 88a1317f48
4 changed files with 84 additions and 5 deletions


@@ -680,7 +680,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.config.restoreResourcePriorities,
s.kubeClient.CoreV1().Namespaces(),
podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.kubeClient.CoreV1(), s.kubeClient, s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.resourceTerminatingTimeout,
s.logger,


@@ -19,19 +19,24 @@ package podvolume
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
type RestoreData struct {
@@ -53,10 +58,13 @@ type restorer struct {
repoEnsurer *repository.RepositoryEnsurer
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
podClient corev1client.PodsGetter
kubeClient kubernetes.Interface
resultsLock sync.Mutex
results map[string]chan *velerov1api.PodVolumeRestore
log logrus.FieldLogger
resultsLock sync.Mutex
results map[string]chan *velerov1api.PodVolumeRestore
nodeAgentCheck chan struct{}
log logrus.FieldLogger
}
func newRestorer(
@@ -66,6 +74,8 @@ func newRestorer(
podVolumeRestoreInformer cache.SharedIndexInformer,
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
podClient corev1client.PodsGetter,
kubeClient kubernetes.Interface,
log logrus.FieldLogger,
) *restorer {
r := &restorer{
@@ -74,6 +84,8 @@ func newRestorer(
repoEnsurer: repoEnsurer,
veleroClient: veleroClient,
pvcClient: pvcClient,
podClient: podClient,
kubeClient: kubeClient,
results: make(map[string]chan *velerov1api.PodVolumeRestore),
log: log,
@@ -108,6 +120,10 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
return nil
}
if err := nodeagent.IsRunning(r.ctx, r.kubeClient, data.Restore.Namespace); err != nil {
return []error{errors.Wrapf(err, "error checking node agent status")}
}
repositoryType, err := getVolumesRepositoryType(volumesToRestore)
if err != nil {
return []error{err}
@@ -129,6 +145,8 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
r.results[resultsKey(data.Pod.Namespace, data.Pod.Name)] = resultsChan
r.resultsLock.Unlock()
r.nodeAgentCheck = make(chan struct{})
var (
errs []error
numRestores int
@@ -161,6 +179,39 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
numRestores++
}
go func() {
nodeName := ""
checkFunc := func(ctx context.Context) (bool, error) {
newObj, err := r.kubeClient.CoreV1().Pods(data.Pod.Namespace).Get(ctx, data.Pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
nodeName = newObj.Spec.NodeName
err = kube.IsPodScheduled(newObj)
return err == nil, nil
}
err := wait.PollWithContext(r.ctx, time.Millisecond*500, time.Minute*10, checkFunc)
if err == wait.ErrWaitTimeout {
r.log.WithError(err).Error("Restoring pod is not scheduled until timeout, disengage")
} else if err != nil {
r.log.WithError(err).Error("Failed to check node-agent pod status, disengage")
} else {
err = nodeagent.IsRunningInNode(r.ctx, data.Restore.Namespace, nodeName, r.podClient)
if err != nil {
r.log.WithField("node", nodeName).WithError(err).Error("node-agent pod is not running on node, abort the restore")
r.nodeAgentCheck <- struct{}{}
}
}
}()
ForEachVolume:
for i := 0; i < numRestores; i++ {
select {
@@ -171,6 +222,9 @@ ForEachVolume:
if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed {
errs = append(errs, errors.Errorf("pod volume restore failed: %s", res.Status.Message))
}
case <-r.nodeAgentCheck:
errs = append(errs, errors.New("node-agent pod is not running on the node"))
break ForEachVolume
}
}
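
Not part of the commit: a minimal, self-contained sketch of the channel/labeled-break pattern used above, where a watcher goroutine signals on a channel (nodeAgentCheck in the diff) and the result-collection loop stops waiting early. All names below are illustrative.

package main

import (
    "fmt"
    "time"
)

func main() {
    results := make(chan string)
    abort := make(chan struct{})

    // Producer: delivers one result, then stalls (simulating restores that never finish).
    go func() {
        results <- "volume-1 restored"
    }()

    // Watcher: signals abort when a precondition fails (here, simply after a delay).
    go func() {
        time.Sleep(100 * time.Millisecond)
        abort <- struct{}{}
    }()

ForEachVolume:
    for i := 0; i < 3; i++ { // expecting 3 results, but only 1 will arrive
        select {
        case res := <-results:
            fmt.Println("got result:", res)
        case <-abort:
            fmt.Println("abort signalled, stop waiting for the remaining results")
            break ForEachVolume
        }
    }
}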


@@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
@@ -42,6 +43,8 @@ func NewRestorerFactory(repoLocker *repository.RepoLocker,
repoEnsurer *repository.RepositoryEnsurer,
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
podClient corev1client.PodsGetter,
kubeClient kubernetes.Interface,
repoInformerSynced cache.InformerSynced,
log logrus.FieldLogger) RestorerFactory {
return &restorerFactory{
@@ -49,6 +52,8 @@ func NewRestorerFactory(repoLocker *repository.RepoLocker,
repoEnsurer: repoEnsurer,
veleroClient: veleroClient,
pvcClient: pvcClient,
podClient: podClient,
kubeClient: kubeClient,
repoInformerSynced: repoInformerSynced,
log: log,
}
@@ -59,6 +64,8 @@ type restorerFactory struct {
repoEnsurer *repository.RepositoryEnsurer
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
podClient corev1client.PodsGetter
kubeClient kubernetes.Interface
repoInformerSynced cache.InformerSynced
log logrus.FieldLogger
}
@@ -74,7 +81,7 @@ func (rf *restorerFactory) NewRestorer(ctx context.Context, restore *velerov1api
},
)
r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, informer, rf.veleroClient, rf.pvcClient, rf.log)
r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, informer, rf.veleroClient, rf.pvcClient, rf.podClient, rf.kubeClient, rf.log)
go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rf.repoInformerSynced) {
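
Not from this commit: a rough sketch of how the factory might be wired up with the two new clients in a test. The fake-client packages, the repository.NewRepoLocker constructor, and the nil repoEnsurer are assumptions, not taken from the diff above.

import (
    "github.com/sirupsen/logrus"
    kubefake "k8s.io/client-go/kubernetes/fake"

    velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
    "github.com/vmware-tanzu/velero/pkg/podvolume"
    "github.com/vmware-tanzu/velero/pkg/repository"
)

// newTestRestorerFactory is a hypothetical helper showing the new argument order.
func newTestRestorerFactory() podvolume.RestorerFactory {
    kubeClient := kubefake.NewSimpleClientset()
    veleroClient := velerofake.NewSimpleClientset()

    return podvolume.NewRestorerFactory(
        repository.NewRepoLocker(),  // repoLocker (constructor name is an assumption)
        nil,                         // repoEnsurer, omitted in this sketch
        veleroClient,
        kubeClient.CoreV1(),         // pvcClient
        kubeClient.CoreV1(),         // podClient (new in this change)
        kubeClient,                  // kubeClient (new in this change)
        func() bool { return true }, // repoInformerSynced
        logrus.New(),
    )
}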


@@ -37,3 +37,21 @@ func IsPodRunning(pod *corev1api.Pod) error {
return nil
}
// IsPodScheduled checks that the specified pod has been scheduled to a node and is in a stable status.
// If not, it returns the error found.
func IsPodScheduled(pod *corev1api.Pod) error {
if pod.Spec.NodeName == "" {
return errors.Errorf("pod is not scheduled, name=%s, namespace=%s, phase=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
if pod.Status.Phase != corev1api.PodRunning && pod.Status.Phase != corev1api.PodPending {
return errors.Errorf("pod is not in a stable status, name=%s, namespace=%s, phase=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
if pod.DeletionTimestamp != nil {
return errors.Errorf("pod is being terminated, name=%s, namespace=%s, phase=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
return nil
}
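
A possible usage sketch (not part of the commit): combining the new kube.IsPodScheduled with wait.PollWithContext to wait for a pod to land on a node, mirroring the polling loop added to the restorer. The helper name and timeouts are illustrative.

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"

    "github.com/vmware-tanzu/velero/pkg/util/kube"
)

// waitForPodScheduled is a hypothetical helper: it polls until the named pod
// passes kube.IsPodScheduled and returns the node it was scheduled to.
func waitForPodScheduled(ctx context.Context, client kubernetes.Interface, namespace, name string) (string, error) {
    nodeName := ""
    err := wait.PollWithContext(ctx, 500*time.Millisecond, 10*time.Minute, func(ctx context.Context) (bool, error) {
        pod, err := client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
        if err != nil {
            return false, err
        }
        nodeName = pod.Spec.NodeName
        return kube.IsPodScheduled(pod) == nil, nil
    })
    return nodeName, err
}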