keel/trigger/poll/watcher.go

365 lines
9.9 KiB
Go
Raw Normal View History

2017-07-01 12:55:26 +00:00
package poll
import (
2017-07-02 12:31:11 +00:00
"context"
2017-07-20 19:39:02 +00:00
"fmt"
2017-07-02 12:31:11 +00:00
2017-07-01 12:55:26 +00:00
"github.com/rusenask/cron"
"github.com/rusenask/keel/provider"
2017-07-02 12:31:11 +00:00
"github.com/rusenask/keel/registry"
2017-07-01 12:55:26 +00:00
"github.com/rusenask/keel/types"
2017-07-02 19:10:07 +00:00
"github.com/rusenask/keel/util/image"
2017-07-16 19:45:38 +00:00
"github.com/rusenask/keel/util/version"
2017-07-02 12:31:11 +00:00
log "github.com/Sirupsen/logrus"
2017-07-01 12:55:26 +00:00
)
2017-08-08 20:47:29 +00:00
// Watcher - generic watcher interface
2017-07-01 12:55:26 +00:00
type Watcher interface {
2017-07-02 12:31:11 +00:00
Watch(imageName, registryUsername, registryPassword, schedule string) error
2017-07-02 07:29:15 +00:00
Unwatch(image string) error
2017-07-01 12:55:26 +00:00
}
2017-07-02 12:31:11 +00:00
type watchDetails struct {
imageRef *image.Reference
registryUsername string // "" for anonymous
registryPassword string // "" for anonymous
digest string // image digest
2017-07-16 19:45:38 +00:00
latest string // latest tag
2017-07-02 12:31:11 +00:00
schedule string
}
// RepositoryWatcher - repository watcher cron
2017-07-01 12:55:26 +00:00
type RepositoryWatcher struct {
providers provider.Providers
2017-07-02 12:31:11 +00:00
// registry client
registryClient registry.Client
// internal map of internal watches
// map[registry/name]=image.Reference
2017-07-02 19:10:07 +00:00
watched map[string]*watchDetails
2017-07-02 12:31:11 +00:00
2017-07-01 12:55:26 +00:00
cron *cron.Cron
}
2017-07-02 12:31:01 +00:00
2017-07-02 12:31:11 +00:00
// NewRepositoryWatcher - create new repository watcher
func NewRepositoryWatcher(providers provider.Providers, registryClient registry.Client) *RepositoryWatcher {
c := cron.New()
return &RepositoryWatcher{
providers: providers,
registryClient: registryClient,
2017-07-02 19:10:07 +00:00
watched: make(map[string]*watchDetails),
2017-07-02 12:31:11 +00:00
cron: c,
}
}
2017-07-16 19:45:38 +00:00
// Start - starts repository watcher
2017-07-02 12:31:11 +00:00
func (w *RepositoryWatcher) Start(ctx context.Context) {
// starting cron job
w.cron.Start()
go func() {
for {
select {
case <-ctx.Done():
w.cron.Stop()
}
}
}()
}
func getImageIdentifier(ref *image.Reference) string {
return ref.Registry() + "/" + ref.ShortName()
}
// Unwatch - stop watching for changes
func (w *RepositoryWatcher) Unwatch(imageName string) error {
imageRef, err := image.Parse(imageName)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image_name": imageName,
}).Error("trigger.poll.RepositoryWatcher.Unwatch: failed to parse image")
return err
}
key := getImageIdentifier(imageRef)
_, ok := w.watched[key]
if ok {
w.cron.DeleteJob(key)
2017-07-02 19:10:07 +00:00
delete(w.watched, key)
2017-07-02 12:31:11 +00:00
}
return nil
}
2017-07-02 12:31:01 +00:00
// Watch - starts watching repository for changes, if it's already watching - ignores,
// if details changed - updates details
func (w *RepositoryWatcher) Watch(imageName, schedule, registryUsername, registryPassword string) error {
2017-07-20 19:39:02 +00:00
if schedule == "" {
return fmt.Errorf("cron schedule cannot be empty")
}
_, err := cron.Parse(schedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": imageName,
"schedule": schedule,
}).Error("trigger.poll.RepositoryWatcher.addJob: invalid cron schedule")
return fmt.Errorf("invalid cron schedule: %s", err)
}
2017-07-02 12:31:01 +00:00
imageRef, err := image.Parse(imageName)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image_name": imageName,
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to parse image")
return err
}
key := getImageIdentifier(imageRef)
// checking whether it's already being watched
details, ok := w.watched[key]
if !ok {
err = w.addJob(imageRef, registryUsername, registryPassword, schedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image_name": imageName,
"registry_username": registryUsername,
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to add image watch job")
}
return err
}
// checking schedule
if details.schedule != schedule {
w.cron.UpdateJob(key, schedule)
}
// checking auth details, if changed - need to update
if details.registryPassword != registryPassword || details.registryUsername != registryUsername {
// recreating job
w.cron.DeleteJob(key)
err = w.addJob(imageRef, registryUsername, registryPassword, schedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image_name": imageName,
"registry_username": registryUsername,
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to add image watch job")
}
return err
}
// nothing to do
return nil
}
func (w *RepositoryWatcher) addJob(ref *image.Reference, registryUsername, registryPassword, schedule string) error {
// getting initial digest
2017-07-02 20:22:05 +00:00
reg := ref.Scheme() + "://" + ref.Registry()
2017-07-02 12:31:01 +00:00
digest, err := w.registryClient.Digest(registry.Opts{
2017-07-02 20:22:05 +00:00
Registry: reg,
2017-07-02 12:31:01 +00:00
Name: ref.ShortName(),
Tag: ref.Tag(),
Username: registryUsername,
Password: registryPassword,
2017-07-02 12:31:01 +00:00
})
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": ref.Remote(),
}).Error("trigger.poll.RepositoryWatcher.addJob: failed to get image digest")
return err
}
key := getImageIdentifier(ref)
details := &watchDetails{
imageRef: ref,
digest: digest, // current image digest
2017-07-16 19:45:38 +00:00
latest: ref.Tag(),
2017-07-02 12:31:01 +00:00
registryUsername: registryUsername,
registryPassword: registryPassword,
schedule: schedule,
}
2017-07-02 19:10:07 +00:00
// adding job to internal map
w.watched[key] = details
2017-07-16 19:45:38 +00:00
// checking tag type, for versioned (semver) tags we setup a watch all tags job
// and for non-semver types we create a single tag watcher which
// checks digest
_, err = version.GetVersion(ref.Tag())
if err != nil {
// adding new job
job := NewWatchTagJob(w.providers, w.registryClient, details)
log.WithFields(log.Fields{
"job_name": key,
"image": ref.Remote(),
"digest": digest,
"schedule": schedule,
}).Info("trigger.poll.RepositoryWatcher: new watch tag digest job added")
return w.cron.AddJob(key, schedule, job)
}
2017-07-02 12:31:01 +00:00
// adding new job
2017-07-16 19:45:38 +00:00
job := NewWatchRepositoryTagsJob(w.providers, w.registryClient, details)
2017-07-02 12:31:01 +00:00
log.WithFields(log.Fields{
"job_name": key,
"image": ref.Remote(),
2017-07-02 20:22:05 +00:00
"digest": digest,
2017-07-02 12:31:01 +00:00
"schedule": schedule,
2017-07-16 19:45:38 +00:00
}).Info("trigger.poll.RepositoryWatcher: new watch repository tags job added")
2017-07-02 12:31:01 +00:00
return w.cron.AddJob(key, schedule, job)
}
2017-07-02 19:10:07 +00:00
// WatchTagJob - Watch specific tag job
2017-07-02 12:30:36 +00:00
type WatchTagJob struct {
providers provider.Providers
registryClient registry.Client
details *watchDetails
}
2017-07-02 19:10:07 +00:00
// NewWatchTagJob - new watch tag job monitors specific tag by checking digest based on specified
// cron style schedule
2017-07-02 12:30:36 +00:00
func NewWatchTagJob(providers provider.Providers, registryClient registry.Client, details *watchDetails) *WatchTagJob {
return &WatchTagJob{
providers: providers,
registryClient: registryClient,
details: details,
}
}
2017-07-02 19:10:07 +00:00
// Run - main function to check schedule
2017-07-02 12:30:36 +00:00
func (j *WatchTagJob) Run() {
2017-07-02 20:22:05 +00:00
reg := j.details.imageRef.Scheme() + "://" + j.details.imageRef.Registry()
2017-07-02 12:30:36 +00:00
currentDigest, err := j.registryClient.Digest(registry.Opts{
2017-07-02 20:22:05 +00:00
Registry: reg,
2017-07-02 12:30:36 +00:00
Name: j.details.imageRef.ShortName(),
Tag: j.details.imageRef.Tag(),
Username: j.details.registryUsername,
Password: j.details.registryPassword,
2017-07-02 12:30:36 +00:00
})
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": j.details.imageRef.Remote(),
}).Error("trigger.poll.WatchTagJob: failed to check digest")
return
}
2017-07-02 20:22:05 +00:00
log.WithFields(log.Fields{
"current_digest": j.details.digest,
"new_digest": currentDigest,
"image_name": j.details.imageRef.Remote(),
2017-07-16 19:45:38 +00:00
}).Debug("trigger.poll.WatchTagJob: checking digest")
2017-07-02 20:22:05 +00:00
2017-07-02 12:30:36 +00:00
// checking whether image digest has changed
if j.details.digest != currentDigest {
// updating digest
j.details.digest = currentDigest
event := types.Event{
Repository: types.Repository{
2017-07-16 19:45:38 +00:00
Name: j.details.imageRef.Repository(),
2017-07-02 12:30:36 +00:00
Tag: j.details.imageRef.Tag(),
Digest: currentDigest,
},
TriggerName: types.TriggerTypePoll.String(),
}
2017-07-16 19:45:38 +00:00
log.WithFields(log.Fields{
"repository": j.details.imageRef.Repository(),
"new_digest": currentDigest,
}).Info("trigger.poll.WatchTagJob: digest change detected, submiting event to providers")
2017-07-02 12:30:36 +00:00
j.providers.Submit(event)
}
}
2017-07-16 19:45:38 +00:00
// WatchRepositoryTagsJob - watch all tags
type WatchRepositoryTagsJob struct {
providers provider.Providers
registryClient registry.Client
details *watchDetails
}
// NewWatchRepositoryTagsJob - new tags watcher job
func NewWatchRepositoryTagsJob(providers provider.Providers, registryClient registry.Client, details *watchDetails) *WatchRepositoryTagsJob {
return &WatchRepositoryTagsJob{
providers: providers,
registryClient: registryClient,
details: details,
}
}
// Run - main function to check schedule
func (j *WatchRepositoryTagsJob) Run() {
reg := j.details.imageRef.Scheme() + "://" + j.details.imageRef.Registry()
if j.details.latest == "" {
j.details.latest = j.details.imageRef.Tag()
}
repository, err := j.registryClient.Get(registry.Opts{
Registry: reg,
Name: j.details.imageRef.ShortName(),
Tag: j.details.latest,
Username: j.details.registryUsername,
Password: j.details.registryPassword,
2017-07-16 19:45:38 +00:00
})
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": j.details.imageRef.Remote(),
}).Error("trigger.poll.WatchRepositoryTagsJob: failed to get repository")
return
}
log.WithFields(log.Fields{
"current_tag": j.details.imageRef.Tag(),
"repository_tags": repository.Tags,
"image_name": j.details.imageRef.Remote(),
}).Debug("trigger.poll.WatchRepositoryTagsJob: checking tags")
latestVersion, newAvailable, err := version.NewAvailable(j.details.latest, repository.Tags)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"repository_tags": repository.Tags,
"image": j.details.imageRef.Remote(),
}).Error("trigger.poll.WatchRepositoryTagsJob: failed to get latest version from tags")
return
}
log.Debugf("new tag '%s' available", latestVersion)
if newAvailable {
// updating current latest
j.details.latest = latestVersion
event := types.Event{
Repository: types.Repository{
Name: j.details.imageRef.Repository(),
Tag: latestVersion,
},
TriggerName: types.TriggerTypePoll.String(),
}
log.WithFields(log.Fields{
"repository": j.details.imageRef.Repository(),
"new_tag": latestVersion,
}).Info("trigger.poll.WatchRepositoryTagsJob: submiting event to providers")
j.providers.Submit(event)
}
}