Merge pull request #5319 from Lyndon-Li/issue-fix-4874

Issue fix 4874 and 4752
lyndon 2022-09-14 11:04:56 +08:00 committed by GitHub
commit 100d6b4430
6 changed files with 142 additions and 14 deletions

@@ -0,0 +1 @@
Fix issues 4874 and 4752: check that the daemonset pod is running on the node where the workload pod resides before running the PVB for the pod
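
As a minimal sketch of the check this change introduces, the snippet below combines the two helpers added in this PR (kube.IsPodRunning and nodeagent.IsRunningInNode); the wrapper function, package name, and variable names are illustrative assumptions, not code from the PR.

package example

import (
	"context"
	"fmt"

	corev1api "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// canRunPodVolumeBackup is a hypothetical helper mirroring the order of the
// new checks: the workload pod must be scheduled and running, and a node-agent
// daemonset pod must be running on the same node, before a PodVolumeBackup
// (PVB) is created for the pod.
func canRunPodVolumeBackup(ctx context.Context, client kubernetes.Interface, veleroNamespace string, workloadPod *corev1api.Pod) error {
	if err := kube.IsPodRunning(workloadPod); err != nil {
		return fmt.Errorf("workload pod is not ready for pod volume backup: %w", err)
	}
	if err := nodeagent.IsRunningInNode(ctx, veleroNamespace, workloadPod.Spec.NodeName, client.CoreV1()); err != nil {
		return fmt.Errorf("node agent is not ready on node %s: %w", workloadPod.Spec.NodeName, err)
	}
	return nil
}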

@@ -85,6 +85,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
)
const (
@@ -112,9 +114,6 @@ const (
// defaultCredentialsDirectory is the path on disk where credential
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
// daemonSet is the name of the Velero restic daemonset.
daemonSet = "restic"
)
type serverConfig struct {
@@ -534,11 +533,11 @@ var defaultRestorePriorities = []string{
}
func (s *server) initRestic() error {
// warn if restic daemonset does not exist
if _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get(s.ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
s.logger.Warn("Velero restic daemonset not found; restic backups/restores will not work until it's created")
// warn if node agent does not exist
if err := nodeagent.IsRunning(s.ctx, s.kubeClient, s.namespace); err == nodeagent.DaemonsetNotFound {
s.logger.Warn("Velero node agent not found; pod volume backups/restores will not work until it's created")
} else if err != nil {
s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of velero restic daemonset")
s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of velero node agent")
}
// ensure the repo key secret is set up
@@ -619,7 +618,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.kubeClient.CoreV1(), s.kubeClient.CoreV1(),
s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToRestic,
s.config.clientPageSize,

@@ -0,0 +1,75 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeagent
import (
"context"
"fmt"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"github.com/vmware-tanzu/velero/pkg/util/kube"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
const (
// daemonSet is the name of the Velero node agent daemonset.
daemonSet = "restic"
)
var (
DaemonsetNotFound = errors.New("daemonset not found")
)
// IsRunning checks if the node agent daemonset is running properly. If not, return the error found
func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
if _, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
return DaemonsetNotFound
} else if err != nil {
return err
} else {
return nil
}
}
// IsRunningInNode checks if the node agent pod is running properly in a specified node. If not, return the error found
func IsRunningInNode(ctx context.Context, namespace string, nodeName string, podClient corev1client.PodsGetter) error {
if nodeName == "" {
return errors.New("node name is empty")
}
pods, err := podClient.Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s", daemonSet)})
if err != nil {
return errors.Wrap(err, "failed to list daemonset pods")
}
for _, pod := range pods.Items {
if kube.IsPodRunning(&pod) != nil {
continue
}
if pod.Spec.NodeName == nodeName {
return nil
}
}
return errors.Errorf("daemonset pod not found in running state in node %s", nodeName)
}
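
A test-style sketch of how the new IsRunningInNode helper behaves, using the fake clientset from client-go; the namespace, pod, and node names are made up, and the name=restic label mirrors the daemonSet constant above.

package nodeagent_test

import (
	"context"
	"testing"

	corev1api "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
)

func TestIsRunningInNode(t *testing.T) {
	// A running node-agent daemonset pod on node-1, labeled the way the
	// helper's label selector (name=restic) expects.
	agentPod := &corev1api.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "velero",
			Name:      "restic-abc12",
			Labels:    map[string]string{"name": "restic"},
		},
		Spec:   corev1api.PodSpec{NodeName: "node-1"},
		Status: corev1api.PodStatus{Phase: corev1api.PodRunning},
	}
	client := fake.NewSimpleClientset(agentPod)

	// The daemonset pod is running on node-1, so no error is expected.
	if err := nodeagent.IsRunningInNode(context.Background(), "velero", "node-1", client.CoreV1()); err != nil {
		t.Errorf("expected nil error for node-1, got %v", err)
	}
	// No daemonset pod runs on node-2, so an error is expected.
	if err := nodeagent.IsRunningInNode(context.Background(), "velero", "node-2", client.CoreV1()); err == nil {
		t.Error("expected an error for node-2, which has no running node-agent pod")
	}
}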

@@ -32,8 +32,10 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// Backupper can execute restic backups of volumes in a pod.
@@ -49,6 +51,7 @@ type backupper struct {
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
podClient corev1client.PodsGetter
uploaderType string
results map[string]chan *velerov1api.PodVolumeBackup
@@ -63,6 +66,7 @@ func newBackupper(
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
podClient corev1client.PodsGetter,
uploaderType string,
log logrus.FieldLogger,
) *backupper {
@@ -73,6 +77,7 @@ func newBackupper(
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
podClient: podClient,
uploaderType: uploaderType,
results: make(map[string]chan *velerov1api.PodVolumeBackup),
@@ -121,6 +126,16 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
return nil, []error{err}
}
err = kube.IsPodRunning(pod)
if err != nil {
return nil, []error{err}
}
err = nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.podClient)
if err != nil {
return nil, []error{err}
}
// get a single non-exclusive lock since we'll wait for all individual
// backups to be complete before releasing it.
b.repoLocker.Lock(repo.Name)
@@ -167,11 +182,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
}
}
// ignore non-running pods
if pod.Status.Phase != corev1api.PodRunning {
log.Warnf("Skipping volume %s in pod %s/%s - pod not running", volumeName, pod.Namespace, pod.Name)
continue
}
// hostPath volumes are not supported because they're not mounted into /var/lib/kubelet/pods, so our
// daemonset pod has no way to access their data.
isHostPath, err := isHostPathVolume(&volume, pvc, b.pvClient.PersistentVolumes())
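
Note the behavior change in the hunk above: the per-volume "pod not running" skip is removed because BackupPodVolumes now fails up front via kube.IsPodRunning and nodeagent.IsRunningInNode. One case the old phase-only check missed is a pod that is still in the Running phase but already being deleted; the sketch below, with made-up pod and node names, shows the new check rejecting it.

package example

import (
	"fmt"
	"time"

	corev1api "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// terminatingPodIsRejected shows a pod that would have passed the old
// phase-only check (Phase == Running) but is rejected by the new one
// because its deletion timestamp is set.
func terminatingPodIsRejected() {
	now := metav1.NewTime(time.Now())
	pod := &corev1api.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:         "app",  // hypothetical workload namespace
			Name:              "db-0", // hypothetical workload pod
			DeletionTimestamp: &now,
		},
		Spec:   corev1api.PodSpec{NodeName: "node-1"},
		Status: corev1api.PodStatus{Phase: corev1api.PodRunning},
	}
	// Prints: pod is being terminated, name=db-0, namespace=app, phase=Running
	fmt.Println(kube.IsPodRunning(pod))
}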

@@ -43,6 +43,7 @@ func NewBackupperFactory(repoLocker *repository.RepoLocker,
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
podClient corev1client.PodsGetter,
repoInformerSynced cache.InformerSynced,
log logrus.FieldLogger) BackupperFactory {
return &backupperFactory{
@@ -51,6 +52,7 @@ func NewBackupperFactory(repoLocker *repository.RepoLocker,
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
podClient: podClient,
repoInformerSynced: repoInformerSynced,
log: log,
}
@@ -62,6 +64,7 @@ type backupperFactory struct {
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
podClient corev1client.PodsGetter
repoInformerSynced cache.InformerSynced
log logrus.FieldLogger
}
@@ -77,7 +80,7 @@ func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1ap
},
)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, uploaderType, bf.log)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.podClient, uploaderType, bf.log)
go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, bf.repoInformerSynced) {

pkg/util/kube/pod.go Normal file
@@ -0,0 +1,39 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"github.com/pkg/errors"
corev1api "k8s.io/api/core/v1"
)
// IsPodRunning does a well-rounded check to make sure the specified pod is running stably.
// If not, return the error found
func IsPodRunning(pod *corev1api.Pod) error {
if pod.Spec.NodeName == "" {
return errors.Errorf("pod is not scheduled, name=%s, namespace=%s, phase=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
if pod.Status.Phase != corev1api.PodRunning {
return errors.Errorf("pod is not running, name=%s, namespace=%s, phase=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
if pod.DeletionTimestamp != nil {
return errors.Errorf("pod is being terminated, name=%s, namespace=%s, phase=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
return nil
}
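
A table-driven sketch covering the three failure modes checked above plus the happy path; the test name and file placement are assumptions, not part of this PR.

package kube_test

import (
	"testing"
	"time"

	corev1api "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

func TestIsPodRunning(t *testing.T) {
	now := metav1.NewTime(time.Now())
	tests := []struct {
		name    string
		pod     *corev1api.Pod
		wantErr bool
	}{
		{
			name:    "unscheduled pod is rejected",
			pod:     &corev1api.Pod{Status: corev1api.PodStatus{Phase: corev1api.PodPending}},
			wantErr: true,
		},
		{
			name: "pending pod is rejected",
			pod: &corev1api.Pod{
				Spec:   corev1api.PodSpec{NodeName: "node-1"},
				Status: corev1api.PodStatus{Phase: corev1api.PodPending},
			},
			wantErr: true,
		},
		{
			name: "terminating pod is rejected",
			pod: &corev1api.Pod{
				ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now},
				Spec:       corev1api.PodSpec{NodeName: "node-1"},
				Status:     corev1api.PodStatus{Phase: corev1api.PodRunning},
			},
			wantErr: true,
		},
		{
			name: "scheduled running pod passes",
			pod: &corev1api.Pod{
				Spec:   corev1api.PodSpec{NodeName: "node-1"},
				Status: corev1api.PodStatus{Phase: corev1api.PodRunning},
			},
			wantErr: false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			err := kube.IsPodRunning(tc.pod)
			if (err != nil) != tc.wantErr {
				t.Errorf("got err=%v, wantErr=%v", err, tc.wantErr)
			}
		})
	}
}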