303 lines
7.8 KiB
Go
303 lines
7.8 KiB
Go
package poll
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/keel-hq/keel/extension/credentialshelper"
|
|
"github.com/keel-hq/keel/provider"
|
|
"github.com/keel-hq/keel/registry"
|
|
"github.com/keel-hq/keel/types"
|
|
"github.com/keel-hq/keel/util/image"
|
|
"github.com/keel-hq/keel/util/version"
|
|
"github.com/rusenask/cron"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var registriesScannedCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "registries_scanned_total",
|
|
Help: "How many registries where checked for new images, partitioned by registry and image.",
|
|
},
|
|
[]string{"registry", "image"},
|
|
)
|
|
|
|
var pollTriggerTrackedImages = prometheus.NewGauge(
|
|
prometheus.GaugeOpts{
|
|
Name: "poll_trigger_tracked_images",
|
|
Help: "How many images are tracked by poll trigger",
|
|
},
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(registriesScannedCounter)
|
|
prometheus.MustRegister(pollTriggerTrackedImages)
|
|
}
|
|
|
|
// Watcher - generic watcher interface
|
|
type Watcher interface {
|
|
Watch(image ...*types.TrackedImage) error
|
|
Unwatch(image string) error
|
|
}
|
|
|
|
type watchDetails struct {
|
|
trackedImage *types.TrackedImage
|
|
digest string // image digest
|
|
latest string // latest tag
|
|
schedule string
|
|
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// RepositoryWatcher - repository watcher cron
|
|
type RepositoryWatcher struct {
|
|
providers provider.Providers
|
|
|
|
// registry client
|
|
registryClient registry.Client
|
|
|
|
// internal map of internal watches
|
|
// map[registry/name]=image.Reference
|
|
watched map[string]*watchDetails
|
|
|
|
cron *cron.Cron
|
|
}
|
|
|
|
// NewRepositoryWatcher - create new repository watcher
|
|
func NewRepositoryWatcher(providers provider.Providers, registryClient registry.Client) *RepositoryWatcher {
|
|
c := cron.New()
|
|
|
|
return &RepositoryWatcher{
|
|
providers: providers,
|
|
registryClient: registryClient,
|
|
watched: make(map[string]*watchDetails),
|
|
cron: c,
|
|
}
|
|
}
|
|
|
|
// Start - starts repository watcher
|
|
func (w *RepositoryWatcher) Start(ctx context.Context) {
|
|
// starting cron job
|
|
w.cron.Start()
|
|
go func() {
|
|
<-ctx.Done()
|
|
w.cron.Stop()
|
|
}()
|
|
}
|
|
|
|
func getImageIdentifier(ref *image.Reference, keepTag bool) string {
|
|
_, err := version.GetVersion(ref.Tag())
|
|
// if failed to parse version, will need to watch digest
|
|
if err != nil || keepTag == true {
|
|
return ref.Registry() + "/" + ref.ShortName() + ":" + ref.Tag()
|
|
}
|
|
|
|
return ref.Registry() + "/" + ref.ShortName()
|
|
}
|
|
|
|
// Unwatch - stop watching for changes
|
|
func (w *RepositoryWatcher) Unwatch(imageName string) error {
|
|
imageRef, err := image.Parse(imageName)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"image_name": imageName,
|
|
}).Error("trigger.poll.RepositoryWatcher.Unwatch: failed to parse image")
|
|
return err
|
|
}
|
|
key := getImageIdentifier(imageRef, false)
|
|
_, ok := w.watched[key]
|
|
if ok {
|
|
w.cron.DeleteJob(key)
|
|
delete(w.watched, key)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Watch - starts watching repository for changes, if it's already watching - ignores,
|
|
// if details changed - updates details
|
|
func (w *RepositoryWatcher) Watch(images ...*types.TrackedImage) error {
|
|
|
|
var errs []string
|
|
tracked := map[string]bool{}
|
|
|
|
for _, image := range images {
|
|
if image.Trigger != types.TriggerTypePoll {
|
|
continue
|
|
}
|
|
identifier, err := w.watch(image)
|
|
if err != nil {
|
|
errs = append(errs, err.Error())
|
|
continue
|
|
}
|
|
tracked[identifier] = true
|
|
}
|
|
|
|
pollTriggerTrackedImages.Set(float64(len(tracked)))
|
|
|
|
// removing registries that should not be tracked anymore
|
|
// for example: deployment using image X was deleted so we should not query
|
|
// registry that points to image X as nothing is using it anymore
|
|
w.unwatch(tracked)
|
|
|
|
if len(errs) > 0 {
|
|
return fmt.Errorf("encountered errors while adding images: %s", strings.Join(errs, ", "))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *RepositoryWatcher) unwatch(tracked map[string]bool) {
|
|
for key, details := range w.watched {
|
|
if !tracked[key] {
|
|
log.WithFields(log.Fields{
|
|
"job_name": key,
|
|
"image": details.trackedImage.String(),
|
|
"schedule": details.schedule,
|
|
}).Info("trigger.poll.RepositoryWatcher: image no longer tracked, removing watcher")
|
|
w.cron.DeleteJob(key)
|
|
delete(w.watched, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *RepositoryWatcher) watch(image *types.TrackedImage) (string, error) {
|
|
|
|
if image.PollSchedule == "" {
|
|
return "", fmt.Errorf("cron schedule cannot be empty")
|
|
}
|
|
|
|
_, err := cron.Parse(image.PollSchedule)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"image": image.String(),
|
|
"schedule": image.PollSchedule,
|
|
}).Error("trigger.poll.RepositoryWatcher.addJob: invalid cron schedule")
|
|
return "", fmt.Errorf("invalid cron schedule: %s", err)
|
|
}
|
|
|
|
keepTag := image.Policy != nil && image.Policy.Name() == "force"
|
|
key := getImageIdentifier(image.Image, keepTag)
|
|
|
|
// checking whether it's already being watched
|
|
details, ok := w.watched[key]
|
|
if !ok {
|
|
// err = w.addJob(imageRef, registryUsername, registryPassword, schedule)
|
|
err = w.addJob(image, image.PollSchedule)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"image": image.String(),
|
|
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to add image watch job")
|
|
return "", err
|
|
}
|
|
return key, nil
|
|
}
|
|
|
|
// checking schedule
|
|
if details.schedule != image.PollSchedule {
|
|
err := w.cron.UpdateJob(key, image.PollSchedule)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"image": image.String(),
|
|
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to update image watch job")
|
|
}
|
|
}
|
|
|
|
details.mu.Lock()
|
|
details.trackedImage = image
|
|
// setting main latest version to the lowest from the tracked
|
|
details.latest = version.Lowest(details.trackedImage.Tags)
|
|
details.mu.Unlock()
|
|
|
|
// nothing to do
|
|
return key, nil
|
|
}
|
|
|
|
func (w *RepositoryWatcher) addJob(ti *types.TrackedImage, schedule string) error {
|
|
// getting initial digest
|
|
reg := ti.Image.Scheme() + "://" + ti.Image.Registry()
|
|
|
|
registryOpts := registry.Opts{
|
|
Registry: reg,
|
|
Name: ti.Image.ShortName(),
|
|
Tag: ti.Image.Tag(),
|
|
}
|
|
|
|
creds, err := credentialshelper.GetCredentials(ti)
|
|
if err == nil {
|
|
registryOpts.Username = creds.Username
|
|
registryOpts.Password = creds.Password
|
|
}
|
|
|
|
digest, err := w.registryClient.Digest(registryOpts)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"image": ti.Image.String(),
|
|
"username": registryOpts.Username,
|
|
"password": strings.Repeat("*", len(registryOpts.Password)),
|
|
}).Error("trigger.poll.RepositoryWatcher.addJob: failed to get image digest")
|
|
return err
|
|
}
|
|
|
|
keepTag := ti.Policy != nil && ti.Policy.Name() == "force"
|
|
key := getImageIdentifier(ti.Image, keepTag)
|
|
details := &watchDetails{
|
|
trackedImage: ti,
|
|
digest: digest, // current image digest
|
|
latest: ti.Image.Tag(),
|
|
schedule: schedule,
|
|
}
|
|
|
|
// adding job to internal map
|
|
w.watched[key] = details
|
|
|
|
// checking tag type:
|
|
// - for versioned (semver) tags:
|
|
// - we setup a watch all tags job (default)
|
|
// - if "force" to follow a floating tag, a single tag watcher is
|
|
// setup, which checks digest
|
|
// - for non-semver types we create a single tag watcher which
|
|
// checks digest
|
|
_, err = version.GetVersion(ti.Image.Tag())
|
|
if err != nil || keepTag == true {
|
|
// adding new job
|
|
job := NewWatchTagJob(w.providers, w.registryClient, details)
|
|
log.WithFields(log.Fields{
|
|
"job_name": key,
|
|
"image": ti.Image.String(),
|
|
"digest": digest,
|
|
"schedule": schedule,
|
|
}).Info("trigger.poll.RepositoryWatcher: new watch tag digest job added")
|
|
|
|
// running it now
|
|
job.Run()
|
|
|
|
return w.cron.AddJob(key, schedule, job)
|
|
}
|
|
|
|
// adding new job
|
|
job := NewWatchRepositoryTagsJob(w.providers, w.registryClient, details)
|
|
log.WithFields(log.Fields{
|
|
"job_name": key,
|
|
"image": ti.Image.String(),
|
|
"digest": digest,
|
|
"schedule": schedule,
|
|
}).Info("trigger.poll.RepositoryWatcher: new watch repository tags job added")
|
|
|
|
// running it now
|
|
job.Run()
|
|
|
|
return w.cron.AddJob(key, schedule, job)
|
|
|
|
}
|