265 lines
8.1 KiB
265 lines
8.1 KiB
/*
Copyright the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
// Label key and default settings for repository maintenance jobs.
const (
	// RepositoryNameLabel is the label key put on each maintenance job; the
	// label value is the name of the backup repository the job belongs to.
	RepositoryNameLabel = "velero.io/repo-name"

	// DefaultKeepLatestMaitenanceJobs is the default number of most recent
	// maintenance jobs to retain (presumably the default for
	// MaintenanceConfig.KeepLatestMaitenanceJobs; confirm against callers).
	DefaultKeepLatestMaitenanceJobs = 3

	// The values below are substituted when the matching resource field of
	// MaintenanceConfig is left empty.

	// DefaultMaintenanceJobCPURequest is the fallback CPU request.
	DefaultMaintenanceJobCPURequest = "0"
	// DefaultMaintenanceJobCPULimit is the fallback CPU limit.
	DefaultMaintenanceJobCPULimit = "0"
	// DefaultMaintenanceJobMemRequest is the fallback memory request.
	DefaultMaintenanceJobMemRequest = "0"
	// DefaultMaintenanceJobMemLimit is the fallback memory limit.
	DefaultMaintenanceJobMemLimit = "0"
)
// MaintenanceConfig is the configuration for the repo maintenance job
type MaintenanceConfig struct {
KeepLatestMaitenanceJobs int
CPURequest string
MemRequest string
CPULimit string
MemLimit string
LogLevelFlag *logging.LevelFlag
FormatFlag *logging.FormatFlag
// generateJobName builds a maintenance job name of the form
// "<repo>-maintain-job-<unix-millis>". If that name would be longer than the
// 63-character Kubernetes job name limit, the repo prefix is dropped and
// "repo-maintain-job-<unix-millis>" is returned instead.
func generateJobName(repo string) string {
	const maxJobNameLen = 63 // k8s job name length limit

	stamp := time.Now().UTC().UnixMilli() // millisecond
	if name := fmt.Sprintf("%s-maintain-job-%d", repo, stamp); len(name) <= maxJobNameLen {
		return name
	}
	return fmt.Sprintf("repo-maintain-job-%d", stamp)
}
// buildMaintenanceJob assembles, but does not create, the batch Job that runs
// the "repo-maintenance" command for the backup repository described by param.
//
// The deployment named "velero" in namespace is fetched through cli, and the
// values returned by the veleroutil helpers for it (env vars, volume mounts,
// volumes, service account, image, affinity, tolerations, node selector,
// labels and annotations) are copied onto the job's pod template.
// Empty resource fields in m are replaced by the DefaultMaintenanceJob*
// constants before being parsed.
//
// It returns an error if the deployment cannot be fetched or the resource
// values cannot be parsed.
func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli client.Client, namespace string) (*batchv1.Job, error) {
// Get the Velero server deployment
deployment := &appsv1.Deployment{}
err := cli.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: namespace}, deployment)
if err != nil {
return nil, err
// Get the environment variables from the Velero server deployment
envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment)
// Get the volume mounts from the Velero server deployment
volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment)
// Get the volumes from the Velero server deployment
volumes := veleroutil.GetVolumesFromVeleroServer(deployment)
// Get the service account from the Velero server deployment
serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment)
// Get image
image := veleroutil.GetVeleroServerImage(deployment)
// Set resource limits and requests
// m is passed by value, so these defaults do not leak back to the caller.
if m.CPURequest == "" {
m.CPURequest = DefaultMaintenanceJobCPURequest
if m.MemRequest == "" {
m.MemRequest = DefaultMaintenanceJobMemRequest
if m.CPULimit == "" {
m.CPULimit = DefaultMaintenanceJobCPULimit
if m.MemLimit == "" {
m.MemLimit = DefaultMaintenanceJobMemLimit
resources, err := kube.ParseResourceRequirements(m.CPURequest, m.MemRequest, m.CPULimit, m.MemLimit)
if err != nil {
return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
// Set arguments
args := []string{"repo-maintenance"}
// NOTE(review): --repo-name is filled from Spec.VolumeNamespace, not from
// BackupRepo.Name — confirm this is what the repo-maintenance command expects.
args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace))
args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType))
args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name))
// NOTE(review): m.LogLevelFlag and m.FormatFlag are used here without a nil check.
args = append(args, fmt.Sprintf("--log-level=%s", m.LogLevelFlag.String()))
args = append(args, fmt.Sprintf("--log-format=%s", m.FormatFlag.String()))
// build the maintenance job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: generateJobName(param.BackupRepo.Name),
// The job uses the backup repository's namespace; the namespace parameter
// is only used above to look up the Velero deployment.
Namespace: param.BackupRepo.Namespace,
Labels: map[string]string{
// This label is what deleteOldMaintenanceJobs matches on.
RepositoryNameLabel: param.BackupRepo.Name,
Spec: batchv1.JobSpec{
// new(int32) points at zero, i.e. a backoff limit of 0.
BackoffLimit: new(int32), // Never retry
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "velero-repo-maintenance-pod",
Spec: v1.PodSpec{
Containers: []v1.Container{
Name: "velero-repo-maintenance-container",
Image: image,
// NOTE(review): the entries of this Command slice are not visible in this
// view of the file — confirm against the full source.
Command: []string{
Args: args,
ImagePullPolicy: v1.PullIfNotPresent,
Env: envVars,
VolumeMounts: volumeMounts,
Resources: resources,
RestartPolicy: v1.RestartPolicyNever,
Volumes: volumes,
ServiceAccountName: serviceAccount,
// Optional scheduling settings and metadata are copied only when the
// helper returns something non-nil / non-empty.
if affinity := veleroutil.GetAffinityFromVeleroServer(deployment); affinity != nil {
job.Spec.Template.Spec.Affinity = affinity
if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil {
job.Spec.Template.Spec.Tolerations = tolerations
if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil {
job.Spec.Template.Spec.NodeSelector = nodeSelector
if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 {
job.Spec.Template.Labels = labels
if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 {
job.Spec.Template.Annotations = annotations
return job, nil
// deleteOldMaintenanceJobs deletes old maintenance jobs and keeps the latest N jobs
func deleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error {
// Get the maintenance job list by label
jobList := &batchv1.JobList{}
err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
if err != nil {
return err
// Delete old maintenance jobs
if len(jobList.Items) > keep {
sort.Slice(jobList.Items, func(i, j int) bool {
return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp)
for i := 0; i < len(jobList.Items)-keep; i++ {
err = cli.Delete(context.TODO(), &jobList.Items[i], client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return err
return nil
func waitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error {
return wait.PollUntilContextCancel(ctx, 1, true, func(ctx context.Context) (bool, error) {
err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
if job.Status.Succeeded > 0 {
return true, nil
if job.Status.Failed > 0 {
return true, fmt.Errorf("maintenance job %s/%s failed", job.Namespace, job.Name)
return false, nil
func getMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
// Get the maintenance job related pod by label selector
podList := &v1.PodList{}
err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name}))
if err != nil {
return "", err
if len(podList.Items) == 0 {
return "", fmt.Errorf("no pod found for job %s", job.Name)
// we only have one maintenance pod for the job
return podList.Items[0].Status.ContainerStatuses[0].State.Terminated.Message, nil
func getLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) {
// Get the maintenance job list by label
jobList := &batchv1.JobList{}
err := cli.List(context.TODO(), jobList, &client.ListOptions{
Namespace: ns,
if err != nil {
return nil, err
if len(jobList.Items) == 0 {
return nil, nil
// Get the latest maintenance job
sort.Slice(jobList.Items, func(i, j int) bool {
return jobList.Items[i].CreationTimestamp.Time.After(jobList.Items[j].CreationTimestamp.Time)
return &jobList.Items[0], nil