keel/trigger/poll/watcher.go

303 lines
7.8 KiB
Go

package poll
import (
"context"
"fmt"
"strings"
"sync"
"github.com/keel-hq/keel/extension/credentialshelper"
"github.com/keel-hq/keel/provider"
"github.com/keel-hq/keel/registry"
"github.com/keel-hq/keel/types"
"github.com/keel-hq/keel/util/image"
"github.com/keel-hq/keel/util/version"
"github.com/rusenask/cron"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
// Prometheus collectors used by the poll trigger. Both are registered in
// init via prometheus.MustRegister.
var (
	// registriesScannedCounter is a counter vector partitioned by the
	// "registry" and "image" labels.
	// NOTE(review): the Help text reads "where checked"; it is reproduced
	// verbatim because it is part of the exposed metric metadata.
	registriesScannedCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "registries_scanned_total",
			Help: "How many registries where checked for new images, partitioned by registry and image.",
		},
		[]string{"registry", "image"},
	)

	// pollTriggerTrackedImages is a gauge that Watch sets to the number of
	// distinct image identifiers it managed to track in its latest call.
	pollTriggerTrackedImages = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "poll_trigger_tracked_images",
			Help: "How many images are tracked by poll trigger",
		},
	)
)
// init registers the poll trigger collectors with Prometheus, in the same
// order as before: the scan counter first, then the tracked images gauge.
func init() {
	prometheus.MustRegister(
		registriesScannedCounter,
		pollTriggerTrackedImages,
	)
}
// Watcher - generic watcher interface.
// RepositoryWatcher in this file provides both methods.
type Watcher interface {
// Watch takes the tracked images that should be watched and reports an
// error when one or more of them could not be set up.
Watch(image ...*types.TrackedImage) error
// Unwatch stops watching the image identified by the given image name.
Unwatch(image string) error
}
// watchDetails is the state kept for one watched image. A pointer to it is
// stored in RepositoryWatcher.watched and is also handed to the job created
// in addJob.
type watchDetails struct {
// tracked image this entry was created from, replaced by watch whenever the
// same key is seen again
trackedImage *types.TrackedImage
digest string // image digest
latest string // latest tag
// cron schedule string recorded for the job
schedule string
// held by watch while it replaces trackedImage and latest
mu sync.RWMutex
}
// RepositoryWatcher - repository watcher cron.
// It keeps one cron job per watched image identifier and the details
// belonging to that job.
// NOTE(review): the watched map is read and written by Watch and Unwatch
// without a lock in this file - confirm that callers serialize access.
type RepositoryWatcher struct {
// providers are passed on to every job created in addJob
providers provider.Providers
// registry client
registryClient registry.Client
// internal map of internal watches, keyed by the identifier returned from
// getImageIdentifier ("registry/name" or "registry/name:tag"):
// map[identifier]=*watchDetails
watched map[string]*watchDetails
// cron instance holding one job per key of the watched map
cron *cron.Cron
}
// NewRepositoryWatcher - create new repository watcher.
// The watcher starts out with an empty watch map and its own cron instance;
// Start has to be called to start that cron instance.
func NewRepositoryWatcher(providers provider.Providers, registryClient registry.Client) *RepositoryWatcher {
	watcher := new(RepositoryWatcher)
	watcher.providers = providers
	watcher.registryClient = registryClient
	watcher.watched = map[string]*watchDetails{}
	watcher.cron = cron.New()
	return watcher
}
// Start - starts repository watcher.
// The cron instance is started and a goroutine is launched that stops it
// again once ctx is done.
func (w *RepositoryWatcher) Start(ctx context.Context) {
	w.cron.Start()

	// the goroutine lives until ctx is done, then stops the cron instance
	stopWhenDone := func() {
		<-ctx.Done()
		w.cron.Stop()
	}
	go stopWhenDone()
}
// getImageIdentifier returns the key under which an image is watched.
//
// When the tag parses as a version and keepTag is false the key is
// "<registry>/<short name>", so every tag of that repository maps to the
// same key. When the tag does not parse as a version, or keepTag is true,
// the tag is appended ("<registry>/<short name>:<tag>") because that
// specific tag has to be watched by digest.
func getImageIdentifier(ref *image.Reference, keepTag bool) string {
	repository := ref.Registry() + "/" + ref.ShortName()

	// if failed to parse version, will need to watch digest
	if _, err := version.GetVersion(ref.Tag()); err != nil || keepTag {
		return repository + ":" + ref.Tag()
	}
	return repository
}
// Unwatch - stop watching for changes.
// imageName is parsed into an image reference; the cron job and the watch
// entry stored under its identifier are removed when they exist. A parse
// failure is logged and returned, an image that is not watched is not an
// error.
//
// NOTE(review): the identifier is computed with keepTag=false, while watch
// uses keepTag=true for images whose policy is named "force" - confirm that
// such images can be removed through this method.
func (w *RepositoryWatcher) Unwatch(imageName string) error {
	ref, parseErr := image.Parse(imageName)
	if parseErr != nil {
		log.WithFields(log.Fields{
			"error":      parseErr,
			"image_name": imageName,
		}).Error("trigger.poll.RepositoryWatcher.Unwatch: failed to parse image")
		return parseErr
	}

	jobName := getImageIdentifier(ref, false)
	if _, watching := w.watched[jobName]; !watching {
		// nothing registered under this identifier
		return nil
	}

	w.cron.DeleteJob(jobName)
	delete(w.watched, jobName)
	return nil
}
// Watch - starts watching repositories for changes. Images that are already
// watched are kept and their details refreshed, images whose trigger is not
// the poll trigger are skipped.
//
// The given images are treated as the complete set: every watcher whose
// identifier was not successfully (re)added by this call is removed
// afterwards. Failures for individual images do not stop the loop; their
// messages are joined into the returned error.
func (w *RepositoryWatcher) Watch(images ...*types.TrackedImage) error {
	var failures []string
	active := make(map[string]bool)

	for _, ti := range images {
		if ti.Trigger != types.TriggerTypePoll {
			continue
		}

		id, err := w.watch(ti)
		if err != nil {
			failures = append(failures, err.Error())
			continue
		}
		active[id] = true
	}

	pollTriggerTrackedImages.Set(float64(len(active)))

	// dropping watchers nobody asked for in this call, e.g. the deployment
	// using an image was deleted, so there is no reason to keep querying the
	// registry for that image
	w.unwatch(active)

	if len(failures) == 0 {
		return nil
	}
	return fmt.Errorf("encountered errors while adding images: %s", strings.Join(failures, ", "))
}
// unwatch removes every watched entry whose key is not marked in tracked:
// the removal is logged, the cron job is deleted and the entry is dropped
// from the internal map.
func (w *RepositoryWatcher) unwatch(tracked map[string]bool) {
	for jobName, entry := range w.watched {
		if tracked[jobName] {
			continue
		}

		log.WithFields(log.Fields{
			"job_name": jobName,
			"image":    entry.trackedImage.String(),
			"schedule": entry.schedule,
		}).Info("trigger.poll.RepositoryWatcher: image no longer tracked, removing watcher")

		w.cron.DeleteJob(jobName)
		// removing the current key while ranging over a map is allowed in Go
		delete(w.watched, jobName)
	}
}
// watch makes sure a single tracked image is being watched and returns the
// identifier (job name) it is watched under.
//
// An empty or unparsable poll schedule is rejected with an error. If no
// entry exists for the identifier yet, a new job is created through addJob.
// Otherwise the existing entry is refreshed: the cron job is rescheduled
// when the poll schedule differs from the recorded one, and the tracked
// image and latest version are replaced.
//
// A failed reschedule is only logged; the image stays watched under its
// previous schedule and the identifier is still returned.
func (w *RepositoryWatcher) watch(image *types.TrackedImage) (string, error) {
	if image.PollSchedule == "" {
		return "", fmt.Errorf("cron schedule cannot be empty")
	}

	_, err := cron.Parse(image.PollSchedule)
	if err != nil {
		log.WithFields(log.Fields{
			"error":    err,
			"image":    image.String(),
			"schedule": image.PollSchedule,
		}).Error("trigger.poll.RepositoryWatcher.addJob: invalid cron schedule")
		return "", fmt.Errorf("invalid cron schedule: %s", err)
	}

	keepTag := image.Policy != nil && image.Policy.Name() == "force"
	key := getImageIdentifier(image.Image, keepTag)

	// checking whether it's already being watched
	details, ok := w.watched[key]
	if !ok {
		err = w.addJob(image, image.PollSchedule)
		if err != nil {
			log.WithFields(log.Fields{
				"error": err,
				"image": image.String(),
			}).Error("trigger.poll.RepositoryWatcher.Watch: failed to add image watch job")
			return "", err
		}
		return key, nil
	}

	// checking schedule
	rescheduled := false
	if details.schedule != image.PollSchedule {
		updateErr := w.cron.UpdateJob(key, image.PollSchedule)
		if updateErr != nil {
			log.WithFields(log.Fields{
				"error": updateErr,
				"image": image.String(),
			}).Error("trigger.poll.RepositoryWatcher.Watch: failed to update image watch job")
		} else {
			rescheduled = true
		}
	}

	details.mu.Lock()
	details.trackedImage = image
	// setting main latest version to the lowest from the tracked
	details.latest = version.Lowest(image.Tags)
	if rescheduled {
		// remember the schedule the cron job now runs on; without this the
		// comparison above keeps failing and the job is rescheduled on every
		// call, and log lines that print the schedule show a stale value
		details.schedule = image.PollSchedule
	}
	details.mu.Unlock()

	return key, nil
}
// addJob fetches the current digest of the tracked image, records a
// watchDetails entry for it and schedules a cron job under the image
// identifier. The job is run once immediately before it is scheduled.
//
// Which job is created depends on the tag:
//   - for versioned (semver) tags a watch-all-tags job is set up (default)
//   - if the policy is "force" (follow a floating tag), or the tag is not
//     semver, a single tag watcher is set up, which checks the digest
//
// An error is returned when the digest cannot be fetched or the job cannot
// be added to cron. In the latter case the entry recorded in the internal
// map is removed again, so the map never references a job that cron does
// not have.
func (w *RepositoryWatcher) addJob(ti *types.TrackedImage, schedule string) error {
	// getting initial digest
	registryOpts := registry.Opts{
		Registry: ti.Image.Scheme() + "://" + ti.Image.Registry(),
		Name:     ti.Image.ShortName(),
		Tag:      ti.Image.Tag(),
	}

	// credentials are best-effort: when the lookup fails the digest request
	// is made without username and password
	creds, err := credentialshelper.GetCredentials(ti)
	if err == nil {
		registryOpts.Username = creds.Username
		registryOpts.Password = creds.Password
	}

	digest, err := w.registryClient.Digest(registryOpts)
	if err != nil {
		log.WithFields(log.Fields{
			"error":    err,
			"image":    ti.Image.String(),
			"username": registryOpts.Username,
			"password": strings.Repeat("*", len(registryOpts.Password)),
		}).Error("trigger.poll.RepositoryWatcher.addJob: failed to get image digest")
		return err
	}

	keepTag := ti.Policy != nil && ti.Policy.Name() == "force"
	key := getImageIdentifier(ti.Image, keepTag)

	details := &watchDetails{
		trackedImage: ti,
		digest:       digest, // current image digest
		latest:       ti.Image.Tag(),
		schedule:     schedule,
	}

	// adding job to internal map
	w.watched[key] = details

	// both job kinds log the same set of fields
	fields := log.Fields{
		"job_name": key,
		"image":    ti.Image.String(),
		"digest":   digest,
		"schedule": schedule,
	}

	var addErr error
	if _, versionErr := version.GetVersion(ti.Image.Tag()); versionErr != nil || keepTag {
		// non-semver tag or forced policy: watch the digest of this tag
		job := NewWatchTagJob(w.providers, w.registryClient, details)
		log.WithFields(fields).Info("trigger.poll.RepositoryWatcher: new watch tag digest job added")
		// running it now
		job.Run()
		addErr = w.cron.AddJob(key, schedule, job)
	} else {
		// semver tag: watch all tags of the repository
		job := NewWatchRepositoryTagsJob(w.providers, w.registryClient, details)
		log.WithFields(fields).Info("trigger.poll.RepositoryWatcher: new watch repository tags job added")
		// running it now
		job.Run()
		addErr = w.cron.AddJob(key, schedule, job)
	}

	if addErr != nil {
		// roll back the map entry so the image is not considered watched
		// while no cron job exists for it
		delete(w.watched, key)
		return addErr
	}
	return nil
}