2017-08-02 17:27:17 +00:00
|
|
|
/*
|
2018-01-02 18:51:49 +00:00
|
|
|
Copyright 2017 the Heptio Ark contributors.
|
2017-08-02 17:27:17 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package controller
|
|
|
|
|
|
|
|
import (
|
2017-12-11 22:10:52 +00:00
|
|
|
"encoding/json"
|
2017-08-02 17:27:17 +00:00
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
2018-05-14 21:34:24 +00:00
|
|
|
jsonpatch "github.com/evanphx/json-patch"
|
2017-09-14 21:27:31 +00:00
|
|
|
"github.com/pkg/errors"
|
2017-08-02 17:27:17 +00:00
|
|
|
"github.com/robfig/cron"
|
2017-09-14 21:27:31 +00:00
|
|
|
"github.com/sirupsen/logrus"
|
2017-08-02 17:27:17 +00:00
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
2017-12-11 22:10:52 +00:00
|
|
|
"k8s.io/apimachinery/pkg/types"
|
2017-08-02 17:27:17 +00:00
|
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
|
|
|
|
api "github.com/heptio/ark/pkg/apis/ark/v1"
|
2017-10-25 16:42:03 +00:00
|
|
|
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
|
2017-08-02 17:27:17 +00:00
|
|
|
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
|
|
|
|
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
2018-07-20 13:03:44 +00:00
|
|
|
"github.com/heptio/ark/pkg/metrics"
|
2017-09-14 21:27:31 +00:00
|
|
|
kubeutil "github.com/heptio/ark/pkg/util/kube"
|
2017-08-02 17:27:17 +00:00
|
|
|
)
|
|
|
|
|
2018-08-09 16:54:48 +00:00
|
|
|
const (
|
|
|
|
scheduleSyncPeriod = time.Minute
|
|
|
|
)
|
|
|
|
|
2017-08-02 17:27:17 +00:00
|
|
|
// scheduleController validates Schedule resources and submits Backups
// when a schedule's next run time has passed.
type scheduleController struct {
	// Shared controller scaffolding (work queue, logger, sync-handler
	// plumbing) embedded from the generic controller.
	*genericController

	// namespace in which Schedules are listed and watched.
	namespace string
	// schedulesClient is used to patch Schedule status (phase, LastBackup).
	schedulesClient arkv1client.SchedulesGetter
	// backupsClient is used to create Backups when a Schedule comes due.
	backupsClient arkv1client.BackupsGetter
	// schedulesLister reads Schedules from the shared informer cache.
	schedulesLister listers.ScheduleLister
	// clock supplies the current time; a RealClock in production
	// (presumably an interface so tests can inject a fake — see constructor).
	clock clock.Clock
	// metrics records per-schedule Prometheus metrics.
	metrics *metrics.ServerMetrics
}
|
|
|
|
|
|
|
|
func NewScheduleController(
|
2017-12-22 14:43:44 +00:00
|
|
|
namespace string,
|
2017-08-02 17:27:17 +00:00
|
|
|
schedulesClient arkv1client.SchedulesGetter,
|
|
|
|
backupsClient arkv1client.BackupsGetter,
|
|
|
|
schedulesInformer informers.ScheduleInformer,
|
2017-12-11 22:10:52 +00:00
|
|
|
logger logrus.FieldLogger,
|
2018-07-20 13:03:44 +00:00
|
|
|
metrics *metrics.ServerMetrics,
|
2017-08-02 17:27:17 +00:00
|
|
|
) *scheduleController {
|
|
|
|
c := &scheduleController{
|
2018-08-29 19:52:09 +00:00
|
|
|
genericController: newGenericController("schedule", logger),
|
|
|
|
namespace: namespace,
|
|
|
|
schedulesClient: schedulesClient,
|
|
|
|
backupsClient: backupsClient,
|
|
|
|
schedulesLister: schedulesInformer.Lister(),
|
|
|
|
clock: clock.RealClock{},
|
|
|
|
metrics: metrics,
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
c.syncHandler = c.processSchedule
|
2018-08-29 19:52:09 +00:00
|
|
|
c.cacheSyncWaiters = append(c.cacheSyncWaiters, schedulesInformer.Informer().HasSynced)
|
|
|
|
c.resyncFunc = c.enqueueAllEnabledSchedules
|
|
|
|
c.resyncPeriod = scheduleSyncPeriod
|
2017-08-02 17:27:17 +00:00
|
|
|
|
|
|
|
schedulesInformer.Informer().AddEventHandler(
|
|
|
|
cache.ResourceEventHandlerFuncs{
|
|
|
|
AddFunc: func(obj interface{}) {
|
|
|
|
schedule := obj.(*api.Schedule)
|
|
|
|
|
|
|
|
switch schedule.Status.Phase {
|
|
|
|
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
|
|
|
|
// add to work queue
|
|
|
|
default:
|
2017-09-14 21:27:31 +00:00
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
|
|
"schedule": kubeutil.NamespaceAndName(schedule),
|
|
|
|
"phase": schedule.Status.Phase,
|
|
|
|
}).Debug("Schedule is not new, skipping")
|
2017-08-02 17:27:17 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
key, err := cache.MetaNamespaceKeyFunc(schedule)
|
|
|
|
if err != nil {
|
2017-09-14 21:27:31 +00:00
|
|
|
c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue")
|
2017-08-02 17:27:17 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
c.queue.Add(key)
|
2018-07-20 13:03:44 +00:00
|
|
|
scheduleName := schedule.GetName()
|
|
|
|
c.logger.Info("Creating schedule ", scheduleName)
|
|
|
|
//Init Prometheus metrics to 0 to have them flowing up
|
|
|
|
metrics.InitSchedule(scheduleName)
|
2017-08-02 17:27:17 +00:00
|
|
|
},
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
func (c *scheduleController) enqueueAllEnabledSchedules() {
|
|
|
|
schedules, err := c.schedulesLister.Schedules(c.namespace).List(labels.NewSelector())
|
2017-08-02 17:27:17 +00:00
|
|
|
if err != nil {
|
2018-08-29 19:52:09 +00:00
|
|
|
c.logger.WithError(errors.WithStack(err)).Error("Error listing Schedules")
|
2017-08-02 17:27:17 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, schedule := range schedules {
|
|
|
|
if schedule.Status.Phase != api.SchedulePhaseEnabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
key, err := cache.MetaNamespaceKeyFunc(schedule)
|
|
|
|
if err != nil {
|
2018-08-29 19:52:09 +00:00
|
|
|
c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue")
|
2017-08-02 17:27:17 +00:00
|
|
|
continue
|
|
|
|
}
|
2018-08-29 19:52:09 +00:00
|
|
|
c.queue.Add(key)
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
func (c *scheduleController) processSchedule(key string) error {
|
|
|
|
log := c.logger.WithField("key", key)
|
2017-09-14 21:27:31 +00:00
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
log.Debug("Running processSchedule")
|
2017-08-02 17:27:17 +00:00
|
|
|
ns, name, err := cache.SplitMetaNamespaceKey(key)
|
|
|
|
if err != nil {
|
2017-09-14 21:27:31 +00:00
|
|
|
return errors.Wrap(err, "error splitting queue key")
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
log.Debug("Getting Schedule")
|
|
|
|
schedule, err := c.schedulesLister.Schedules(ns).Get(name)
|
2017-08-02 17:27:17 +00:00
|
|
|
if err != nil {
|
|
|
|
// schedule no longer exists
|
|
|
|
if apierrors.IsNotFound(err) {
|
2018-08-29 19:52:09 +00:00
|
|
|
log.WithError(err).Debug("Schedule not found")
|
2017-08-02 17:27:17 +00:00
|
|
|
return nil
|
|
|
|
}
|
2017-09-14 21:27:31 +00:00
|
|
|
return errors.Wrap(err, "error getting Schedule")
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
switch schedule.Status.Phase {
|
|
|
|
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
|
|
|
|
// valid phase for processing
|
|
|
|
default:
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
log.Debug("Cloning schedule")
|
2017-12-11 22:10:52 +00:00
|
|
|
// store ref to original for creating patch
|
|
|
|
original := schedule
|
2017-08-02 17:27:17 +00:00
|
|
|
// don't modify items in the cache
|
2017-10-25 16:57:40 +00:00
|
|
|
schedule = schedule.DeepCopy()
|
2017-08-02 17:27:17 +00:00
|
|
|
|
|
|
|
// validation - even if the item is Enabled, we can't trust it
|
|
|
|
// so re-validate
|
|
|
|
currentPhase := schedule.Status.Phase
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
cronSchedule, errs := parseCronSchedule(schedule, c.logger)
|
2017-08-02 17:27:17 +00:00
|
|
|
if len(errs) > 0 {
|
|
|
|
schedule.Status.Phase = api.SchedulePhaseFailedValidation
|
|
|
|
schedule.Status.ValidationErrors = errs
|
|
|
|
} else {
|
|
|
|
schedule.Status.Phase = api.SchedulePhaseEnabled
|
|
|
|
}
|
|
|
|
|
|
|
|
// update status if it's changed
|
|
|
|
if currentPhase != schedule.Status.Phase {
|
2018-08-29 19:52:09 +00:00
|
|
|
updatedSchedule, err := patchSchedule(original, schedule, c.schedulesClient)
|
2017-08-02 17:27:17 +00:00
|
|
|
if err != nil {
|
2017-09-14 21:27:31 +00:00
|
|
|
return errors.Wrapf(err, "error updating Schedule phase to %s", schedule.Status.Phase)
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
schedule = updatedSchedule
|
|
|
|
}
|
|
|
|
|
|
|
|
if schedule.Status.Phase != api.SchedulePhaseEnabled {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// check for the schedule being due to run, and submit a Backup if so
|
2018-08-29 19:52:09 +00:00
|
|
|
if err := c.submitBackupIfDue(schedule, cronSchedule); err != nil {
|
2017-08-02 17:27:17 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-12-11 22:10:52 +00:00
|
|
|
func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Schedule, []string) {
|
2017-08-02 17:27:17 +00:00
|
|
|
var validationErrors []string
|
|
|
|
var schedule cron.Schedule
|
|
|
|
|
|
|
|
// cron.Parse panics if schedule is empty
|
|
|
|
if len(itm.Spec.Schedule) == 0 {
|
|
|
|
validationErrors = append(validationErrors, "Schedule must be a non-empty valid Cron expression")
|
|
|
|
return nil, validationErrors
|
|
|
|
}
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
log := logger.WithField("schedule", kubeutil.NamespaceAndName(itm))
|
2017-09-14 21:27:31 +00:00
|
|
|
|
2017-08-02 17:27:17 +00:00
|
|
|
// adding a recover() around cron.Parse because it panics on empty string and is possible
|
|
|
|
// that it panics under other scenarios as well.
|
|
|
|
func() {
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
2018-08-29 19:52:09 +00:00
|
|
|
log.WithFields(logrus.Fields{
|
2017-09-14 21:27:31 +00:00
|
|
|
"schedule": itm.Spec.Schedule,
|
|
|
|
"recover": r,
|
|
|
|
}).Debug("Panic parsing schedule")
|
2017-08-02 17:27:17 +00:00
|
|
|
validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", r))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2017-08-10 16:44:00 +00:00
|
|
|
if res, err := cron.ParseStandard(itm.Spec.Schedule); err != nil {
|
2018-08-29 19:52:09 +00:00
|
|
|
log.WithError(errors.WithStack(err)).WithField("schedule", itm.Spec.Schedule).Debug("Error parsing schedule")
|
2017-08-02 17:27:17 +00:00
|
|
|
validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", err))
|
|
|
|
} else {
|
|
|
|
schedule = res
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if len(validationErrors) > 0 {
|
|
|
|
return nil, validationErrors
|
|
|
|
}
|
|
|
|
|
|
|
|
return schedule, nil
|
|
|
|
}
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
func (c *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error {
|
2017-09-14 21:27:31 +00:00
|
|
|
var (
|
2018-08-29 19:52:09 +00:00
|
|
|
now = c.clock.Now()
|
2017-09-14 21:27:31 +00:00
|
|
|
isDue, nextRunTime = getNextRunTime(item, cronSchedule, now)
|
2018-08-29 19:52:09 +00:00
|
|
|
log = c.logger.WithField("schedule", kubeutil.NamespaceAndName(item))
|
2017-09-14 21:27:31 +00:00
|
|
|
)
|
2017-08-02 17:27:17 +00:00
|
|
|
|
|
|
|
if !isDue {
|
2018-08-29 19:52:09 +00:00
|
|
|
log.WithField("nextRunTime", nextRunTime).Info("Schedule is not due, skipping")
|
2017-08-02 17:27:17 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Don't attempt to "catch up" if there are any missed or failed runs - simply
|
|
|
|
// trigger a Backup if it's time.
|
|
|
|
//
|
|
|
|
// It might also make sense in the future to explicitly check for currently-running
|
|
|
|
// backups so that we don't overlap runs (for disk snapshots in particular, this can
|
|
|
|
// lead to performance issues).
|
2018-08-29 19:52:09 +00:00
|
|
|
log.WithField("nextRunTime", nextRunTime).Info("Schedule is due, submitting Backup")
|
2017-08-02 17:27:17 +00:00
|
|
|
backup := getBackup(item, now)
|
2018-08-29 19:52:09 +00:00
|
|
|
if _, err := c.backupsClient.Backups(backup.Namespace).Create(backup); err != nil {
|
2017-09-14 21:27:31 +00:00
|
|
|
return errors.Wrap(err, "error creating Backup")
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
|
2017-12-11 22:10:52 +00:00
|
|
|
original := item
|
2017-10-25 16:57:40 +00:00
|
|
|
schedule := item.DeepCopy()
|
2017-08-02 17:27:17 +00:00
|
|
|
|
|
|
|
schedule.Status.LastBackup = metav1.NewTime(now)
|
|
|
|
|
2018-08-29 19:52:09 +00:00
|
|
|
if _, err := patchSchedule(original, schedule, c.schedulesClient); err != nil {
|
2017-09-14 21:27:31 +00:00
|
|
|
return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", schedule.Status.LastBackup)
|
2017-08-02 17:27:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func getNextRunTime(schedule *api.Schedule, cronSchedule cron.Schedule, asOf time.Time) (bool, time.Time) {
|
|
|
|
// get the latest run time (if the schedule hasn't run yet, this will be the zero value which will trigger
|
|
|
|
// an immediate backup)
|
|
|
|
lastBackupTime := schedule.Status.LastBackup.Time
|
|
|
|
|
|
|
|
nextRunTime := cronSchedule.Next(lastBackupTime)
|
|
|
|
|
|
|
|
return asOf.After(nextRunTime), nextRunTime
|
|
|
|
}
|
|
|
|
|
|
|
|
func getBackup(item *api.Schedule, timestamp time.Time) *api.Backup {
|
|
|
|
backup := &api.Backup{
|
|
|
|
Spec: item.Spec.Template,
|
|
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
|
|
Namespace: item.Namespace,
|
|
|
|
Name: fmt.Sprintf("%s-%s", item.Name, timestamp.Format("20060102150405")),
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2018-09-03 13:01:03 +00:00
|
|
|
// add schedule labels and 'ark-schedule' label to the backup
|
|
|
|
addLabelsToBackup(item, backup)
|
|
|
|
|
2017-08-02 17:27:17 +00:00
|
|
|
return backup
|
|
|
|
}
|
2017-12-11 22:10:52 +00:00
|
|
|
|
2018-09-03 13:01:03 +00:00
|
|
|
func addLabelsToBackup(item *api.Schedule, backup *api.Backup) {
|
|
|
|
labels := item.Labels
|
|
|
|
if labels == nil {
|
|
|
|
labels = make(map[string]string)
|
|
|
|
}
|
|
|
|
labels["ark-schedule"] = item.Name
|
|
|
|
|
|
|
|
backup.Labels = labels
|
|
|
|
}
|
|
|
|
|
2017-12-11 22:10:52 +00:00
|
|
|
func patchSchedule(original, updated *api.Schedule, client arkv1client.SchedulesGetter) (*api.Schedule, error) {
|
|
|
|
origBytes, err := json.Marshal(original)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "error marshalling original schedule")
|
|
|
|
}
|
|
|
|
|
|
|
|
updatedBytes, err := json.Marshal(updated)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "error marshalling updated schedule")
|
|
|
|
}
|
|
|
|
|
2018-05-14 21:34:24 +00:00
|
|
|
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
|
2017-12-11 22:10:52 +00:00
|
|
|
if err != nil {
|
2018-05-14 21:34:24 +00:00
|
|
|
return nil, errors.Wrap(err, "error creating json merge patch for schedule")
|
2017-12-11 22:10:52 +00:00
|
|
|
}
|
|
|
|
|
2017-12-22 14:43:44 +00:00
|
|
|
res, err := client.Schedules(original.Namespace).Patch(original.Name, types.MergePatchType, patchBytes)
|
2017-12-11 22:10:52 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "error patching schedule")
|
|
|
|
}
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
}
|