Merge pull request #4748 from ywk253100/220309_schedule_kubebuilder
Refactor schedule controller with kubebuilderpull/4822/head
commit
9f83fc57c9
|
@ -0,0 +1 @@
|
|||
Refactor schedule controller with kubebuilder
|
|
@ -16,7 +16,27 @@ spec:
|
|||
singular: schedule
|
||||
scope: Namespaced
|
||||
versions:
|
||||
- name: v1
|
||||
- additionalPrinterColumns:
|
||||
- description: Name of the schedule
|
||||
jsonPath: .metadata.name
|
||||
name: Name
|
||||
type: string
|
||||
- description: Status of the schedule
|
||||
jsonPath: .status.phase
|
||||
name: Status
|
||||
type: string
|
||||
- description: A Cron expression defining when to run the Backup
|
||||
jsonPath: .spec.schedule
|
||||
name: Schedule
|
||||
type: string
|
||||
- description: The last time a Backup was run for this schedule
|
||||
jsonPath: .status.lastBackup
|
||||
name: LastBackup
|
||||
type: date
|
||||
- jsonPath: .metadata.creationTimestamp
|
||||
name: Age
|
||||
type: date
|
||||
name: v1
|
||||
schema:
|
||||
openAPIV3Schema:
|
||||
description: Schedule is a Velero resource that represents a pre-scheduled
|
||||
|
@ -393,6 +413,8 @@ spec:
|
|||
type: object
|
||||
served: true
|
||||
storage: true
|
||||
subresources:
|
||||
status: {}
|
||||
status:
|
||||
acceptedNames:
|
||||
kind: ""
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -4,8 +4,14 @@ apiVersion: rbac.authorization.k8s.io/v1
|
|||
kind: ClusterRole
|
||||
metadata:
|
||||
creationTimestamp: null
|
||||
name: manager-role
|
||||
name: velero-perms
|
||||
rules:
|
||||
- apiGroups:
|
||||
- velero.io
|
||||
resources:
|
||||
- backups
|
||||
verbs:
|
||||
- create
|
||||
- apiGroups:
|
||||
- velero.io
|
||||
resources:
|
||||
|
@ -46,6 +52,26 @@ rules:
|
|||
- get
|
||||
- patch
|
||||
- update
|
||||
- apiGroups:
|
||||
- velero.io
|
||||
resources:
|
||||
- schedules
|
||||
verbs:
|
||||
- create
|
||||
- delete
|
||||
- get
|
||||
- list
|
||||
- patch
|
||||
- update
|
||||
- watch
|
||||
- apiGroups:
|
||||
- velero.io
|
||||
resources:
|
||||
- schedules/status
|
||||
verbs:
|
||||
- get
|
||||
- patch
|
||||
- update
|
||||
- apiGroups:
|
||||
- velero.io
|
||||
resources:
|
||||
|
|
|
@ -49,6 +49,7 @@ ${GOPATH}/src/k8s.io/code-generator/generate-groups.sh \
|
|||
controller-gen \
|
||||
crd:crdVersions=v1\
|
||||
paths=./pkg/apis/velero/v1/... \
|
||||
rbac:roleName=velero-perms \
|
||||
paths=./pkg/controller/... \
|
||||
output:crd:artifacts:config=config/crd/v1/bases
|
||||
|
||||
|
|
|
@ -77,8 +77,18 @@ type ScheduleStatus struct {
|
|||
ValidationErrors []string `json:"validationErrors,omitempty"`
|
||||
}
|
||||
|
||||
// TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
|
||||
// +genclient
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
// +kubebuilder:object:root
|
||||
// +kubebuilder:object:generate=true
|
||||
// +kubebuilder:storageversion
|
||||
// +kubebuilder:subresource:status
|
||||
// +kubebuilder:printcolumn:name="Name",type="string",JSONPath=".metadata.name",description="Name of the schedule"
|
||||
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Status of the schedule"
|
||||
// +kubebuilder:printcolumn:name="Schedule",type="string",JSONPath=".spec.schedule",description="A Cron expression defining when to run the Backup"
|
||||
// +kubebuilder:printcolumn:name="LastBackup",type="date",JSONPath=".status.lastBackup",description="The last time a Backup was run for this schedule"
|
||||
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
|
||||
|
||||
// Schedule is a Velero resource that represents a pre-scheduled or
|
||||
// periodic Backup that should be run.
|
||||
|
@ -96,6 +106,7 @@ type Schedule struct {
|
|||
}
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
// +kubebuilder:object:root
|
||||
|
||||
// ScheduleList is a list of Schedules.
|
||||
type ScheduleList struct {
|
|
@ -654,22 +654,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
|||
}
|
||||
}
|
||||
|
||||
scheduleControllerRunInfo := func() controllerRunInfo {
|
||||
scheduleController := controller.NewScheduleController(
|
||||
s.namespace,
|
||||
s.veleroClient.VeleroV1(),
|
||||
s.veleroClient.VeleroV1(),
|
||||
s.sharedInformerFactory.Velero().V1().Schedules(),
|
||||
s.logger,
|
||||
s.metrics,
|
||||
)
|
||||
|
||||
return controllerRunInfo{
|
||||
controller: scheduleController,
|
||||
numWorkers: defaultControllerWorkers,
|
||||
}
|
||||
}
|
||||
|
||||
gcControllerRunInfo := func() controllerRunInfo {
|
||||
gcController := controller.NewGCController(
|
||||
s.logger,
|
||||
|
@ -771,7 +755,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
|||
enabledControllers := map[string]func() controllerRunInfo{
|
||||
controller.BackupSync: backupSyncControllerRunInfo,
|
||||
controller.Backup: backupControllerRunInfo,
|
||||
controller.Schedule: scheduleControllerRunInfo,
|
||||
controller.GarbageCollection: gcControllerRunInfo,
|
||||
controller.BackupDeletion: deletionControllerRunInfo,
|
||||
controller.Restore: restoreControllerRunInfo,
|
||||
|
@ -843,6 +826,10 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
|||
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupStorageLocation)
|
||||
}
|
||||
|
||||
if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil {
|
||||
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
|
||||
}
|
||||
|
||||
if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok {
|
||||
r := controller.ServerStatusRequestReconciler{
|
||||
Scheme: s.mgr.GetScheme(),
|
||||
|
|
|
@ -18,27 +18,22 @@ package controller
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/robfig/cron"
|
||||
"github.com/sirupsen/logrus"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"sigs.k8s.io/cluster-api/util/patch"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
|
||||
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
|
||||
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
|
@ -46,160 +41,100 @@ const (
|
|||
scheduleSyncPeriod = time.Minute
|
||||
)
|
||||
|
||||
type scheduleController struct {
|
||||
*genericController
|
||||
|
||||
namespace string
|
||||
schedulesClient velerov1client.SchedulesGetter
|
||||
backupsClient velerov1client.BackupsGetter
|
||||
schedulesLister velerov1listers.ScheduleLister
|
||||
clock clock.Clock
|
||||
metrics *metrics.ServerMetrics
|
||||
type scheduleReconciler struct {
|
||||
client.Client
|
||||
namespace string
|
||||
logger logrus.FieldLogger
|
||||
clock clock.Clock
|
||||
metrics *metrics.ServerMetrics
|
||||
}
|
||||
|
||||
func NewScheduleController(
|
||||
func NewScheduleReconciler(
|
||||
namespace string,
|
||||
schedulesClient velerov1client.SchedulesGetter,
|
||||
backupsClient velerov1client.BackupsGetter,
|
||||
schedulesInformer velerov1informers.ScheduleInformer,
|
||||
logger logrus.FieldLogger,
|
||||
client client.Client,
|
||||
metrics *metrics.ServerMetrics,
|
||||
) *scheduleController {
|
||||
c := &scheduleController{
|
||||
genericController: newGenericController(Schedule, logger),
|
||||
namespace: namespace,
|
||||
schedulesClient: schedulesClient,
|
||||
backupsClient: backupsClient,
|
||||
schedulesLister: schedulesInformer.Lister(),
|
||||
clock: clock.RealClock{},
|
||||
metrics: metrics,
|
||||
}
|
||||
|
||||
c.syncHandler = c.processSchedule
|
||||
c.resyncFunc = c.enqueueAllEnabledSchedules
|
||||
c.resyncPeriod = scheduleSyncPeriod
|
||||
|
||||
schedulesInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
schedule := obj.(*api.Schedule)
|
||||
|
||||
switch schedule.Status.Phase {
|
||||
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
|
||||
// add to work queue
|
||||
default:
|
||||
c.logger.WithFields(logrus.Fields{
|
||||
"schedule": kubeutil.NamespaceAndName(schedule),
|
||||
"phase": schedule.Status.Phase,
|
||||
}).Debug("Schedule is not new, skipping")
|
||||
return
|
||||
}
|
||||
|
||||
key, err := cache.MetaNamespaceKeyFunc(schedule)
|
||||
if err != nil {
|
||||
c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue")
|
||||
return
|
||||
}
|
||||
c.queue.Add(key)
|
||||
scheduleName := schedule.GetName()
|
||||
c.logger.Info("Creating schedule ", scheduleName)
|
||||
//Init Prometheus metrics to 0 to have them flowing up
|
||||
metrics.InitSchedule(scheduleName)
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *scheduleController) enqueueAllEnabledSchedules() {
|
||||
schedules, err := c.schedulesLister.Schedules(c.namespace).List(labels.NewSelector())
|
||||
if err != nil {
|
||||
c.logger.WithError(errors.WithStack(err)).Error("Error listing Schedules")
|
||||
return
|
||||
}
|
||||
|
||||
for _, schedule := range schedules {
|
||||
if schedule.Status.Phase != api.SchedulePhaseEnabled {
|
||||
continue
|
||||
}
|
||||
|
||||
key, err := cache.MetaNamespaceKeyFunc(schedule)
|
||||
if err != nil {
|
||||
c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue")
|
||||
continue
|
||||
}
|
||||
c.queue.Add(key)
|
||||
) *scheduleReconciler {
|
||||
return &scheduleReconciler{
|
||||
Client: client,
|
||||
namespace: namespace,
|
||||
logger: logger,
|
||||
clock: clock.RealClock{},
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *scheduleController) processSchedule(key string) error {
|
||||
log := c.logger.WithField("key", key)
|
||||
func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod)
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&velerov1.Schedule{}).
|
||||
Watches(s, nil).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
log.Debug("Running processSchedule")
|
||||
ns, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=schedules,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=schedules/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=create
|
||||
|
||||
func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
log := c.logger.WithField("schedule", req.String())
|
||||
|
||||
log.Debug("Getting schedule")
|
||||
schedule := &velerov1.Schedule{}
|
||||
if err := c.Get(ctx, req.NamespacedName, schedule); err != nil {
|
||||
log.WithError(err).Error("error getting schedule")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
if schedule.Status.Phase != "" &&
|
||||
schedule.Status.Phase != velerov1.SchedulePhaseNew &&
|
||||
schedule.Status.Phase != velerov1.SchedulePhaseEnabled {
|
||||
log.Debugf("the schedule phase is %s, isn't %s or %s, skip", schedule.Status.Phase, velerov1.SchedulePhaseNew, velerov1.SchedulePhaseEnabled)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
c.metrics.InitSchedule(schedule.Name)
|
||||
|
||||
patchHelper, err := patch.NewHelper(schedule, c.Client)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error splitting queue key")
|
||||
log.WithError(err).Error("error new patch helper")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
log.Debug("Getting Schedule")
|
||||
schedule, err := c.schedulesLister.Schedules(ns).Get(name)
|
||||
if err != nil {
|
||||
// schedule no longer exists
|
||||
if apierrors.IsNotFound(err) {
|
||||
log.WithError(err).Debug("Schedule not found")
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(err, "error getting Schedule")
|
||||
}
|
||||
|
||||
switch schedule.Status.Phase {
|
||||
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
|
||||
// valid phase for processing
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debug("Cloning schedule")
|
||||
// store ref to original for creating patch
|
||||
original := schedule
|
||||
// don't modify items in the cache
|
||||
schedule = schedule.DeepCopy()
|
||||
|
||||
// validation - even if the item is Enabled, we can't trust it
|
||||
// so re-validate
|
||||
currentPhase := schedule.Status.Phase
|
||||
|
||||
cronSchedule, errs := parseCronSchedule(schedule, c.logger)
|
||||
if len(errs) > 0 {
|
||||
schedule.Status.Phase = api.SchedulePhaseFailedValidation
|
||||
schedule.Status.Phase = velerov1.SchedulePhaseFailedValidation
|
||||
schedule.Status.ValidationErrors = errs
|
||||
} else {
|
||||
schedule.Status.Phase = api.SchedulePhaseEnabled
|
||||
schedule.Status.Phase = velerov1.SchedulePhaseEnabled
|
||||
}
|
||||
|
||||
// update status if it's changed
|
||||
if currentPhase != schedule.Status.Phase {
|
||||
updatedSchedule, err := patchSchedule(original, schedule, c.schedulesClient)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error updating Schedule phase to %s", schedule.Status.Phase)
|
||||
if err = patchHelper.Patch(ctx, schedule); err != nil {
|
||||
log.WithError(err).Errorf("error updating schedule phase to %s", schedule.Status.Phase)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
schedule = updatedSchedule
|
||||
}
|
||||
|
||||
if schedule.Status.Phase != api.SchedulePhaseEnabled {
|
||||
return nil
|
||||
if schedule.Status.Phase != velerov1.SchedulePhaseEnabled {
|
||||
log.Debugf("the schedule's phase is %s, isn't %s, skip", schedule.Status.Phase, velerov1.SchedulePhaseEnabled)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// check for the schedule being due to run, and submit a Backup if so
|
||||
if err := c.submitBackupIfDue(schedule, cronSchedule); err != nil {
|
||||
return err
|
||||
if err := c.submitBackupIfDue(ctx, schedule, cronSchedule); err != nil {
|
||||
log.WithError(err).Error("error running submitBackupIfDue")
|
||||
}
|
||||
|
||||
return nil
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Schedule, []string) {
|
||||
func parseCronSchedule(itm *velerov1.Schedule, logger logrus.FieldLogger) (cron.Schedule, []string) {
|
||||
var validationErrors []string
|
||||
var schedule cron.Schedule
|
||||
|
||||
|
@ -239,7 +174,7 @@ func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Sched
|
|||
return schedule, nil
|
||||
}
|
||||
|
||||
func (c *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error {
|
||||
func (c *scheduleReconciler) submitBackupIfDue(ctx context.Context, item *velerov1.Schedule, cronSchedule cron.Schedule) error {
|
||||
var (
|
||||
now = c.clock.Now()
|
||||
isDue, nextRunTime = getNextRunTime(item, cronSchedule, now)
|
||||
|
@ -259,23 +194,24 @@ func (c *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule
|
|||
// lead to performance issues).
|
||||
log.WithField("nextRunTime", nextRunTime).Info("Schedule is due, submitting Backup")
|
||||
backup := getBackup(item, now)
|
||||
if _, err := c.backupsClient.Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{}); err != nil {
|
||||
if err := c.Create(ctx, backup); err != nil {
|
||||
return errors.Wrap(err, "error creating Backup")
|
||||
}
|
||||
|
||||
original := item
|
||||
schedule := item.DeepCopy()
|
||||
patchHelper, err := patch.NewHelper(item, c.Client)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error creating patch helper")
|
||||
}
|
||||
item.Status.LastBackup = &metav1.Time{Time: now}
|
||||
|
||||
schedule.Status.LastBackup = &metav1.Time{Time: now}
|
||||
|
||||
if _, err := patchSchedule(original, schedule, c.schedulesClient); err != nil {
|
||||
return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", schedule.Status.LastBackup)
|
||||
if err := patchHelper.Patch(ctx, item); err != nil {
|
||||
return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", item.Status.LastBackup)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getNextRunTime(schedule *api.Schedule, cronSchedule cron.Schedule, asOf time.Time) (bool, time.Time) {
|
||||
func getNextRunTime(schedule *velerov1.Schedule, cronSchedule cron.Schedule, asOf time.Time) (bool, time.Time) {
|
||||
var lastBackupTime time.Time
|
||||
if schedule.Status.LastBackup != nil {
|
||||
lastBackupTime = schedule.Status.LastBackup.Time
|
||||
|
@ -288,7 +224,7 @@ func getNextRunTime(schedule *api.Schedule, cronSchedule cron.Schedule, asOf tim
|
|||
return asOf.After(nextRunTime), nextRunTime
|
||||
}
|
||||
|
||||
func getBackup(item *api.Schedule, timestamp time.Time) *api.Backup {
|
||||
func getBackup(item *velerov1.Schedule, timestamp time.Time) *velerov1.Backup {
|
||||
name := item.TimestampedName(timestamp)
|
||||
backup := builder.
|
||||
ForBackup(item.Namespace, name).
|
||||
|
@ -297,27 +233,3 @@ func getBackup(item *api.Schedule, timestamp time.Time) *api.Backup {
|
|||
|
||||
return backup
|
||||
}
|
||||
|
||||
func patchSchedule(original, updated *api.Schedule, client velerov1client.SchedulesGetter) (*api.Schedule, error) {
|
||||
origBytes, err := json.Marshal(original)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error marshalling original schedule")
|
||||
}
|
||||
|
||||
updatedBytes, err := json.Marshal(updated)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error marshalling updated schedule")
|
||||
}
|
||||
|
||||
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating json merge patch for schedule")
|
||||
}
|
||||
|
||||
res, err := client.Schedules(original.Namespace).Patch(context.TODO(), original.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error patching schedule")
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -25,21 +24,22 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
|
||||
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
|
||||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
)
|
||||
|
||||
func TestProcessSchedule(t *testing.T) {
|
||||
func TestReconcileOfSchedule(t *testing.T) {
|
||||
require.Nil(t, velerov1.AddToScheme(scheme.Scheme))
|
||||
|
||||
newScheduleBuilder := func(phase velerov1api.SchedulePhase) *builder.ScheduleBuilder {
|
||||
return builder.ForSchedule("ns", "name").Phase(phase)
|
||||
}
|
||||
|
@ -49,45 +49,34 @@ func TestProcessSchedule(t *testing.T) {
|
|||
scheduleKey string
|
||||
schedule *velerov1api.Schedule
|
||||
fakeClockTime string
|
||||
expectedErr bool
|
||||
expectedPhase string
|
||||
expectedValidationErrors []string
|
||||
expectedBackupCreate *velerov1api.Backup
|
||||
expectedLastBackup string
|
||||
}{
|
||||
{
|
||||
name: "invalid key returns error",
|
||||
scheduleKey: "invalid/key/value",
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
name: "missing schedule returns early without an error",
|
||||
name: "missing schedule triggers no backup",
|
||||
scheduleKey: "foo/bar",
|
||||
expectedErr: false,
|
||||
},
|
||||
{
|
||||
name: "schedule with phase FailedValidation does not get processed",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseFailedValidation).Result(),
|
||||
expectedErr: false,
|
||||
name: "schedule with phase FailedValidation triggers no backup",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseFailedValidation).Result(),
|
||||
},
|
||||
{
|
||||
name: "schedule with phase New gets validated and failed if invalid",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseNew).Result(),
|
||||
expectedErr: false,
|
||||
expectedPhase: string(velerov1api.SchedulePhaseFailedValidation),
|
||||
expectedValidationErrors: []string{"Schedule must be a non-empty valid Cron expression"},
|
||||
},
|
||||
{
|
||||
name: "schedule with phase <blank> gets validated and failed if invalid",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhase("")).Result(),
|
||||
expectedErr: false,
|
||||
expectedPhase: string(velerov1api.SchedulePhaseFailedValidation),
|
||||
expectedValidationErrors: []string{"Schedule must be a non-empty valid Cron expression"},
|
||||
},
|
||||
{
|
||||
name: "schedule with phase Enabled gets re-validated and failed if invalid",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseEnabled).Result(),
|
||||
expectedErr: false,
|
||||
expectedPhase: string(velerov1api.SchedulePhaseFailedValidation),
|
||||
expectedValidationErrors: []string{"Schedule must be a non-empty valid Cron expression"},
|
||||
},
|
||||
|
@ -95,7 +84,6 @@ func TestProcessSchedule(t *testing.T) {
|
|||
name: "schedule with phase New gets validated and triggers a backup",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseNew).CronSchedule("@every 5m").Result(),
|
||||
fakeClockTime: "2017-01-01 12:00:00",
|
||||
expectedErr: false,
|
||||
expectedPhase: string(velerov1api.SchedulePhaseEnabled),
|
||||
expectedBackupCreate: builder.ForBackup("ns", "name-20170101120000").ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "name")).Result(),
|
||||
expectedLastBackup: "2017-01-01 12:00:00",
|
||||
|
@ -104,7 +92,7 @@ func TestProcessSchedule(t *testing.T) {
|
|||
name: "schedule with phase Enabled gets re-validated and triggers a backup if valid",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseEnabled).CronSchedule("@every 5m").Result(),
|
||||
fakeClockTime: "2017-01-01 12:00:00",
|
||||
expectedErr: false,
|
||||
expectedPhase: string(velerov1api.SchedulePhaseEnabled),
|
||||
expectedBackupCreate: builder.ForBackup("ns", "name-20170101120000").ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "name")).Result(),
|
||||
expectedLastBackup: "2017-01-01 12:00:00",
|
||||
},
|
||||
|
@ -112,7 +100,6 @@ func TestProcessSchedule(t *testing.T) {
|
|||
name: "schedule that's already run gets LastBackup updated",
|
||||
schedule: newScheduleBuilder(velerov1api.SchedulePhaseEnabled).CronSchedule("@every 5m").LastBackupTime("2000-01-01 00:00:00").Result(),
|
||||
fakeClockTime: "2017-01-01 12:00:00",
|
||||
expectedErr: false,
|
||||
expectedBackupCreate: builder.ForBackup("ns", "name-20170101120000").ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "name")).Result(),
|
||||
expectedLastBackup: "2017-01-01 12:00:00",
|
||||
},
|
||||
|
@ -121,134 +108,48 @@ func TestProcessSchedule(t *testing.T) {
|
|||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
var (
|
||||
client = fake.NewSimpleClientset()
|
||||
sharedInformers = informers.NewSharedInformerFactory(client, 0)
|
||||
logger = velerotest.NewLogger()
|
||||
)
|
||||
|
||||
c := NewScheduleController(
|
||||
"namespace",
|
||||
client.VeleroV1(),
|
||||
client.VeleroV1(),
|
||||
sharedInformers.Velero().V1().Schedules(),
|
||||
logger,
|
||||
metrics.NewServerMetrics(),
|
||||
)
|
||||
|
||||
var (
|
||||
client = (&fake.ClientBuilder{}).Build()
|
||||
logger = velerotest.NewLogger()
|
||||
testTime time.Time
|
||||
err error
|
||||
)
|
||||
|
||||
reconciler := NewScheduleReconciler("namespace", logger, client, metrics.NewServerMetrics())
|
||||
|
||||
if test.fakeClockTime != "" {
|
||||
testTime, err = time.Parse("2006-01-02 15:04:05", test.fakeClockTime)
|
||||
require.NoError(t, err, "unable to parse test.fakeClockTime: %v", err)
|
||||
}
|
||||
c.clock = clock.NewFakeClock(testTime)
|
||||
reconciler.clock = clock.NewFakeClock(testTime)
|
||||
|
||||
if test.schedule != nil {
|
||||
sharedInformers.Velero().V1().Schedules().Informer().GetStore().Add(test.schedule)
|
||||
|
||||
// this is necessary so the Patch() call returns the appropriate object
|
||||
client.PrependReactor("patch", "schedules", func(action core.Action) (bool, runtime.Object, error) {
|
||||
var (
|
||||
patch = action.(core.PatchAction).GetPatch()
|
||||
patchMap = make(map[string]interface{})
|
||||
res = test.schedule.DeepCopy()
|
||||
)
|
||||
|
||||
if err := json.Unmarshal(patch, &patchMap); err != nil {
|
||||
t.Logf("error unmarshalling patch: %s\n", err)
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
// these are the fields that may be updated by the controller
|
||||
phase, found, err := unstructured.NestedString(patchMap, "status", "phase")
|
||||
if err == nil && found {
|
||||
res.Status.Phase = velerov1api.SchedulePhase(phase)
|
||||
}
|
||||
|
||||
lastBackupStr, found, err := unstructured.NestedString(patchMap, "status", "lastBackup")
|
||||
if err == nil && found {
|
||||
parsed, err := time.Parse(time.RFC3339, lastBackupStr)
|
||||
if err != nil {
|
||||
t.Logf("error parsing status.lastBackup: %s\n", err)
|
||||
return false, nil, err
|
||||
}
|
||||
res.Status.LastBackup = &metav1.Time{Time: parsed}
|
||||
}
|
||||
|
||||
return true, res, nil
|
||||
})
|
||||
require.Nil(t, client.Create(ctx, test.schedule))
|
||||
}
|
||||
|
||||
key := test.scheduleKey
|
||||
if key == "" && test.schedule != nil {
|
||||
key, err = cache.MetaNamespaceKeyFunc(test.schedule)
|
||||
require.NoError(t, err, "error getting key from test.schedule: %v", err)
|
||||
_, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "ns", Name: "name"}})
|
||||
require.Nil(t, err)
|
||||
|
||||
schedule := &velerov1api.Schedule{}
|
||||
err = client.Get(ctx, types.NamespacedName{"ns", "name"}, schedule)
|
||||
if len(test.expectedPhase) > 0 {
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, test.expectedPhase, string(schedule.Status.Phase))
|
||||
}
|
||||
if len(test.expectedValidationErrors) > 0 {
|
||||
require.Nil(t, err)
|
||||
assert.EqualValues(t, test.expectedValidationErrors, schedule.Status.ValidationErrors)
|
||||
}
|
||||
if len(test.expectedLastBackup) > 0 {
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, parseTime(test.expectedLastBackup).Unix(), schedule.Status.LastBackup.Unix())
|
||||
}
|
||||
|
||||
err = c.processSchedule(key)
|
||||
|
||||
assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
|
||||
|
||||
actions := client.Actions()
|
||||
index := 0
|
||||
|
||||
type PatchStatus struct {
|
||||
ValidationErrors []string `json:"validationErrors"`
|
||||
Phase velerov1api.SchedulePhase `json:"phase"`
|
||||
LastBackup time.Time `json:"lastBackup"`
|
||||
}
|
||||
|
||||
type Patch struct {
|
||||
Status PatchStatus `json:"status"`
|
||||
}
|
||||
|
||||
decode := func(decoder *json.Decoder) (interface{}, error) {
|
||||
actual := new(Patch)
|
||||
err := decoder.Decode(actual)
|
||||
|
||||
return *actual, err
|
||||
}
|
||||
|
||||
if test.expectedPhase != "" {
|
||||
require.True(t, len(actions) > index, "len(actions) is too small")
|
||||
|
||||
expected := Patch{
|
||||
Status: PatchStatus{
|
||||
ValidationErrors: test.expectedValidationErrors,
|
||||
Phase: velerov1api.SchedulePhase(test.expectedPhase),
|
||||
},
|
||||
}
|
||||
|
||||
velerotest.ValidatePatch(t, actions[index], expected, decode)
|
||||
|
||||
index++
|
||||
}
|
||||
|
||||
if created := test.expectedBackupCreate; created != nil {
|
||||
require.True(t, len(actions) > index, "len(actions) is too small")
|
||||
|
||||
action := core.NewCreateAction(
|
||||
velerov1api.SchemeGroupVersion.WithResource("backups"),
|
||||
created.Namespace,
|
||||
created)
|
||||
|
||||
assert.Equal(t, action, actions[index])
|
||||
|
||||
index++
|
||||
}
|
||||
|
||||
if test.expectedLastBackup != "" {
|
||||
require.True(t, len(actions) > index, "len(actions) is too small")
|
||||
|
||||
expected := Patch{
|
||||
Status: PatchStatus{
|
||||
LastBackup: parseTime(test.expectedLastBackup),
|
||||
},
|
||||
}
|
||||
|
||||
velerotest.ValidatePatch(t, actions[index], expected, decode)
|
||||
backups := &velerov1api.BackupList{}
|
||||
require.Nil(t, client.List(ctx, backups))
|
||||
if test.expectedBackupCreate == nil {
|
||||
assert.Equal(t, 0, len(backups.Items))
|
||||
} else {
|
||||
assert.Equal(t, 1, len(backups.Items))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
Copyright the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kube
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
)
|
||||
|
||||
func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration) *PeriodicalEnqueueSource {
|
||||
return &PeriodicalEnqueueSource{
|
||||
logger: logger.WithField("resource", reflect.TypeOf(objList).String()),
|
||||
Client: client,
|
||||
objList: objList,
|
||||
period: period,
|
||||
}
|
||||
}
|
||||
|
||||
// PeriodicalEnqueueSource is an implementation of interface sigs.k8s.io/controller-runtime/pkg/source/Source
|
||||
// It reads the specific resources from Kubernetes/cache and enqueues them into the queue to trigger
|
||||
// the reconcile logic periodically
|
||||
type PeriodicalEnqueueSource struct {
|
||||
client.Client
|
||||
logger logrus.FieldLogger
|
||||
objList client.ObjectList
|
||||
period time.Duration
|
||||
}
|
||||
|
||||
func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, pre ...predicate.Predicate) error {
|
||||
go wait.Until(func() {
|
||||
p.logger.Debug("enqueueing resources ...")
|
||||
if err := p.List(ctx, p.objList); err != nil {
|
||||
p.logger.WithError(err).Error("error listing resources")
|
||||
return
|
||||
}
|
||||
if meta.LenList(p.objList) == 0 {
|
||||
p.logger.Debug("no resources, skip")
|
||||
return
|
||||
}
|
||||
if err := meta.EachListItem(p.objList, func(object runtime.Object) error {
|
||||
obj, ok := object.(metav1.Object)
|
||||
if !ok {
|
||||
p.logger.Error("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String())
|
||||
return nil
|
||||
}
|
||||
q.Add(ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.GetName(),
|
||||
},
|
||||
})
|
||||
p.logger.Debugf("resource %s/%s enqueued", obj.GetNamespace(), obj.GetName())
|
||||
return nil
|
||||
}); err != nil {
|
||||
p.logger.WithError(err).Error("error enqueueing resources")
|
||||
return
|
||||
}
|
||||
}, p.period, ctx.Done())
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
Copyright the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kube
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
func TestStart(t *testing.T) {
|
||||
require.Nil(t, velerov1.AddToScheme(scheme.Scheme))
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.TODO())
|
||||
client := (&fake.ClientBuilder{}).Build()
|
||||
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
|
||||
source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx), client, &velerov1.ScheduleList{}, 1*time.Second)
|
||||
|
||||
require.Nil(t, source.Start(ctx, nil, queue))
|
||||
|
||||
// no resources
|
||||
time.Sleep(1 * time.Second)
|
||||
require.Equal(t, queue.Len(), 0)
|
||||
|
||||
// contain one resource
|
||||
require.Nil(t, client.Create(ctx, &velerov1.Schedule{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "schedule",
|
||||
},
|
||||
}))
|
||||
time.Sleep(2 * time.Second)
|
||||
require.Equal(t, queue.Len(), 1)
|
||||
|
||||
// context canceled, the enqueue source shouldn't run anymore
|
||||
item, _ := queue.Get()
|
||||
queue.Forget(item)
|
||||
require.Equal(t, queue.Len(), 0)
|
||||
cancelFunc()
|
||||
time.Sleep(2 * time.Second)
|
||||
require.Equal(t, queue.Len(), 0)
|
||||
}
|
Loading…
Reference in New Issue