recall existing repo maintenance to history

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>

branch pull/8580/head
parent eeee79e551
commit 6ff0aa32e3
@@ -22,6 +22,7 @@ import (
	"encoding/json"
	"fmt"
	"reflect"
	"slices"
	"time"

	"github.com/pkg/errors"

@@ -40,6 +41,7 @@ import (
	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/constant"
	"github.com/vmware-tanzu/velero/pkg/label"
	"github.com/vmware-tanzu/velero/pkg/repository"
	repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
	repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
@@ -206,12 +208,95 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
		}
		fallthrough
	case velerov1api.BackupRepositoryPhaseReady:
		if err := r.processUnrecordedMaintenance(ctx, backupRepo, log); err != nil {
			return ctrl.Result{}, errors.Wrap(err, "error handling incomplete repo maintenance jobs")
		}

		return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log)
	}

	return ctrl.Result{}, nil
}

// processUnrecordedMaintenance waits for any incomplete maintenance jobs of the repo,
// collects their results, and patches the consolidated history back into the
// BackupRepository status when it differs from what is already recorded.
func (r *BackupRepoReconciler) processUnrecordedMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
	history, err := repository.WaitIncompleteMaintenance(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log)
	if err != nil {
		return errors.Wrapf(err, "error waiting incomplete repo maintenance job for repo %s", req.Name)
	}

	consolidated := consolidateHistory(history, req.Status.RecentMaintenance)
	if consolidated == nil {
		return nil
	}

	log.Warn("Updating backup repository because of unrecorded histories")

	return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
		rr.Status.RecentMaintenance = consolidated
	})
}

// consolidateHistory merges the statuses recovered from maintenance jobs (coming) with the
// statuses already recorded on the repo (cur). Both slices are expected to be ordered
// oldest-first; the result keeps at most defaultMaintenanceStatusQueueLength of the newest
// entries, also oldest-first. It returns nil when there is nothing new to record.
func consolidateHistory(coming, cur []velerov1api.BackupRepositoryMaintenanceStatus) []velerov1api.BackupRepositoryMaintenanceStatus {
	if len(coming) == 0 {
		return nil
	}

	if isIdenticalHistories(coming, cur) {
		return nil
	}

	truncated := []velerov1api.BackupRepositoryMaintenanceStatus{}
	i := len(cur) - 1
	j := len(coming) - 1
	for i >= 0 || j >= 0 {
		if len(truncated) == defaultMaintenanceStatusQueueLength {
			break
		}

		if i >= 0 && j >= 0 {
			if isEarlierHistory(cur[i], coming[j]) {
				truncated = append(truncated, coming[j])
				j--
			} else {
				truncated = append(truncated, cur[i])
				i--
			}
		} else if i >= 0 {
			truncated = append(truncated, cur[i])
			i--
		} else {
			truncated = append(truncated, coming[j])
			j--
		}
	}

	slices.Reverse(truncated)

	if isIdenticalHistories(truncated, cur) {
		return nil
	}

	return truncated
}

// isIdenticalHistories reports whether two histories contain the same entries,
// compared by completion timestamp.
func isIdenticalHistories(a, b []velerov1api.BackupRepositoryMaintenanceStatus) bool {
	if len(a) != len(b) {
		return false
	}

	for i := 0; i < len(a); i++ {
		if !a[i].CompleteTimestamp.Equal(b[i].CompleteTimestamp) {
			return false
		}
	}

	return true
}

// isEarlierHistory reports whether entry a completed before entry b.
func isEarlierHistory(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool {
	return a.CompleteTimestamp.Before(b.CompleteTimestamp)
}

func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *velerov1api.BackupRepository) (string, error) {
	loc := &velerov1api.BackupStorageLocation{}
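The merge inside consolidateHistory walks both histories from their newest ends and always keeps whichever entry completed later, so the combined history stays ordered and only the newest entries survive the queue-length cap. Below is a standalone sketch of that behavior, simplified to bare timestamps; the consolidate helper, its limit parameter, and the sample values are illustrative only and are not part of the Velero code.

package main

import (
	"fmt"
	"slices"
	"time"
)

// consolidate mirrors the merge done by consolidateHistory, but over plain
// timestamps. Both inputs are assumed ordered oldest-first; the result keeps
// only the newest `limit` entries, also oldest-first.
func consolidate(coming, cur []time.Time, limit int) []time.Time {
	merged := []time.Time{}
	i, j := len(cur)-1, len(coming)-1
	for (i >= 0 || j >= 0) && len(merged) < limit {
		switch {
		case i >= 0 && j >= 0:
			if cur[i].Before(coming[j]) {
				merged = append(merged, coming[j])
				j--
			} else {
				merged = append(merged, cur[i])
				i--
			}
		case i >= 0:
			merged = append(merged, cur[i])
			i--
		default:
			merged = append(merged, coming[j])
			j--
		}
	}
	slices.Reverse(merged)
	return merged
}

func main() {
	t := func(h int) time.Time { return time.Date(2025, 1, 1, h, 0, 0, 0, time.UTC) }
	cur := []time.Time{t(1), t(3)}           // statuses already recorded on the CR
	coming := []time.Time{t(2), t(4), t(5)}  // statuses recovered from maintenance jobs
	fmt.Println(consolidate(coming, cur, 3)) // keeps the 3 newest: 03:00, 04:00, 05:00
}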
@@ -87,7 +87,7 @@ func DeleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error {
}

func WaitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error {
-	return wait.PollUntilContextCancel(ctx, 1, true, func(ctx context.Context) (bool, error) {
+	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
		err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job)
		if err != nil && !apierrors.IsNotFound(err) {
			return false, err
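The changed line swaps the poll interval from the bare literal 1, which a time.Duration parameter interprets as one nanosecond, to time.Second, so the job status is re-checked once per second instead of in a tight loop. A minimal sketch of the same polling pattern; the condition and the placeholder deadline below are illustrative, not Velero code.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()

	// Poll once per second (immediate=true runs the condition right away)
	// until the condition reports done or the context is cancelled.
	err := wait.PollUntilContextCancel(context.Background(), time.Second, true, func(ctx context.Context) (bool, error) {
		done := time.Since(start) > 3*time.Second // placeholder condition
		return done, nil
	})
	fmt.Println("polling finished:", err)
}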
@@ -250,3 +250,62 @@ func GetMaintenanceJobConfig(

	return result, nil
}

// WaitIncompleteMaintenance waits for all incomplete maintenance jobs of the specified repo
// to complete, then returns the newest maintenance statuses, up to the given limit, ordered
// oldest-first.
func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) {
	jobList := &batchv1.JobList{}
	err := cli.List(ctx, jobList, &client.ListOptions{
		Namespace: repo.Namespace,
	},
		client.MatchingLabels(map[string]string{RepositoryNameLabel: repo.Name}),
	)

	if err != nil {
		return nil, errors.Wrapf(err, "error listing maintenance job for repo %s", repo.Name)
	}

	if len(jobList.Items) == 0 {
		return nil, nil
	}

	history := []velerov1api.BackupRepositoryMaintenanceStatus{}

	for _, job := range jobList.Items {
		if job.Status.Succeeded == 0 && job.Status.Failed == 0 {
			log.Infof("Waiting for maintenance job %s to complete", job.Name)

			if err := WaitForJobComplete(ctx, cli, &job); err != nil {
				return nil, errors.Wrapf(err, "error waiting maintenance job[%s] complete", job.Name)
			}
		}

		result := velerov1api.BackupRepositoryMaintenanceSucceeded
		if job.Status.Failed > 0 {
			result = velerov1api.BackupRepositoryMaintenanceFailed
		}

		message, err := GetMaintenanceResultFromJob(cli, &job)
		if err != nil {
			return nil, errors.Wrapf(err, "error getting maintenance job[%s] result", job.Name)
		}

		history = append(history, velerov1api.BackupRepositoryMaintenanceStatus{
			Result:            result,
			StartTimestamp:    &metav1.Time{Time: job.Status.StartTime.Time},
			CompleteTimestamp: &metav1.Time{Time: job.Status.CompletionTime.Time},
			Message:           message,
		})
	}

	// Sort ascending by completion time so the tail slice below keeps the newest entries.
	sort.Slice(history, func(i, j int) bool {
		return history[i].CompleteTimestamp.Time.Before(history[j].CompleteTimestamp.Time)
	})

	startPos := len(history) - limit
	if startPos < 0 {
		startPos = 0
	}

	return history[startPos:], nil
}
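With the history sorted oldest-first, keeping the last `limit` elements is what retains the newest entries in ascending order. A tiny standalone illustration of that tail-window idiom; the keepNewest name and the sample values are illustrative only.

package main

import (
	"fmt"
	"sort"
)

// keepNewest sorts ascending and returns the last `limit` elements,
// i.e. the largest (newest) values, still in ascending order.
func keepNewest(vals []int, limit int) []int {
	sort.Ints(vals)
	start := len(vals) - limit
	if start < 0 {
		start = 0
	}
	return vals[start:]
}

func main() {
	fmt.Println(keepNewest([]int{3, 1, 5, 2, 4}, 3)) // [3 4 5]
}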
@@ -458,3 +458,144 @@ func TestGetMaintenanceJobConfig(t *testing.T) {
		})
	}
}

// func TestWaitIncompleteMaintenance(t *testing.T) {
// 	ctx, cancel := context.WithCancel(context.Background())
//
// 	veleroNamespace := "velero"
// 	repo := &velerov1api.BackupRepository{
// 		ObjectMeta: metav1.ObjectMeta{
// 			Namespace: veleroNamespace,
// 			Name:      "fake-repo",
// 		},
// 		Spec: velerov1api.BackupRepositorySpec{
// 			BackupStorageLocation: "default",
// 			RepositoryType:        "kopia",
// 			VolumeNamespace:       "test",
// 		},
// 	}
//
// 	scheme := runtime.NewScheme()
// 	batchv1.AddToScheme(scheme)
//
// 	testCases := []struct {
// 		name           string
// 		ctx            context.Context
// 		kubeClientObj  []runtime.Object
// 		runtimeScheme  *runtime.Scheme
// 		expectedStatus []velerov1api.BackupRepositoryMaintenanceStatus
// 		expectedError  string
// 	}{
// 		{
// 			name:          "list job error",
// 			expectedError: "error listing maintenance job for repo fake-repo",
// 		},
// 		{
// 			name:          "job not exist",
// 			runtimeScheme: scheme,
// 		},
// 		{
// 			name:          "no matching job",
// 			runtimeScheme: scheme,
// 			kubeClientObj: []runtime.Object{
// 				jobOtherLabel,
// 			},
// 		},
// 		{
// 			name:          "wait complete error",
// 			ctx:           context.WithTimeout(context.TODO(), time.Second),
// 			runtimeScheme: scheme,
// 			kubeClientObj: []runtime.Object{
// 				jobIncomplete,
// 			},
// 			expectedError: nil,
// 		},
// 		{
// 			name: "Find config specific for global",
// 			repoJobConfig: &v1.ConfigMap{
// 				ObjectMeta: metav1.ObjectMeta{
// 					Namespace: veleroNamespace,
// 					Name:      repoMaintenanceJobConfig,
// 				},
// 				Data: map[string]string{
// 					GlobalKeyForRepoMaintenanceJobCM: "{\"podResources\":{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memoryRequest\":\"50Mi\",\"memoryLimit\":\"100Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}",
// 				},
// 			},
// 			expectedConfig: &JobConfigs{
// 				PodResources: &kube.PodResources{
// 					CPURequest:    "50m",
// 					CPULimit:      "100m",
// 					MemoryRequest: "50Mi",
// 					MemoryLimit:   "100Mi",
// 				},
// 				LoadAffinities: []*kube.LoadAffinity{
// 					{
// 						NodeSelector: metav1.LabelSelector{
// 							MatchExpressions: []metav1.LabelSelectorRequirement{
// 								{
// 									Key:      "cloud.google.com/machine-family",
// 									Operator: metav1.LabelSelectorOpIn,
// 									Values:   []string{"n2"},
// 								},
// 							},
// 						},
// 					},
// 				},
// 			},
// 			expectedError: nil,
// 		},
// 		{
// 			name: "Specific config supersede global config",
// 			repoJobConfig: &v1.ConfigMap{
// 				ObjectMeta: metav1.ObjectMeta{
// 					Namespace: veleroNamespace,
// 					Name:      repoMaintenanceJobConfig,
// 				},
// 				Data: map[string]string{
// 					GlobalKeyForRepoMaintenanceJobCM: "{\"podResources\":{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memoryRequest\":\"50Mi\",\"memoryLimit\":\"100Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}",
// 					"test-default-kopia": "{\"podResources\":{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memoryRequest\":\"100Mi\",\"memoryLimit\":\"200Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}",
// 				},
// 			},
// 			expectedConfig: &JobConfigs{
// 				PodResources: &kube.PodResources{
// 					CPURequest:    "100m",
// 					CPULimit:      "200m",
// 					MemoryRequest: "100Mi",
// 					MemoryLimit:   "200Mi",
// 				},
// 				LoadAffinities: []*kube.LoadAffinity{
// 					{
// 						NodeSelector: metav1.LabelSelector{
// 							MatchExpressions: []metav1.LabelSelectorRequirement{
// 								{
// 									Key:      "cloud.google.com/machine-family",
// 									Operator: metav1.LabelSelectorOpIn,
// 									Values:   []string{"e2"},
// 								},
// 							},
// 						},
// 					},
// 				},
// 			},
// 			expectedError: nil,
// 		},
// 	}
//
// 	for _, test := range testCases {
// 		t.Run(test.name, func(t *testing.T) {
// 			fakeClientBuilder := fake.NewClientBuilder()
// 			if test.runtimeScheme != nil {
// 				fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme)
// 			}
//
// 			fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
//
// 			if tc.expectedError != nil {
// 				require.ErrorContains(t, err, tc.expectedError.Error())
// 			} else {
// 				require.NoError(t, err)
// 			}
// 			require.Equal(t, tc.expectedConfig, jobConfig)
// 		})
// 	}
// }