single tag watcher split

feature/poll-multi-tags
Karolis Rusenas 2018-11-14 20:59:32 +00:00
parent b9d982d31d
commit d0760793a2
2 changed files with 97 additions and 75 deletions

View File

@ -0,0 +1,86 @@
package poll
import (
"github.com/keel-hq/keel/extension/credentialshelper"
"github.com/keel-hq/keel/provider"
"github.com/keel-hq/keel/registry"
"github.com/keel-hq/keel/types"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
// WatchTagJob - Watch specific tag job
type WatchTagJob struct {
providers provider.Providers
registryClient registry.Client
details *watchDetails
}
// NewWatchTagJob - new watch tag job monitors specific tag by checking digest based on specified
// cron style schedule
func NewWatchTagJob(providers provider.Providers, registryClient registry.Client, details *watchDetails) *WatchTagJob {
return &WatchTagJob{
providers: providers,
registryClient: registryClient,
details: details,
}
}
// Run - main function to check schedule
func (j *WatchTagJob) Run() {
creds := credentialshelper.GetCredentials(j.details.trackedImage)
reg := j.details.trackedImage.Image.Scheme() + "://" + j.details.trackedImage.Image.Registry()
currentDigest, err := j.registryClient.Digest(registry.Opts{
Registry: reg,
Name: j.details.trackedImage.Image.ShortName(),
Tag: j.details.trackedImage.Image.Tag(),
Username: creds.Username,
Password: creds.Password,
})
registriesScannedCounter.With(prometheus.Labels{"registry": j.details.trackedImage.Image.Registry(), "image": j.details.trackedImage.Image.Repository()}).Inc()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": j.details.trackedImage.Image.String(),
}).Error("trigger.poll.WatchTagJob: failed to check digest")
return
}
log.WithFields(log.Fields{
"current_digest": j.details.digest,
"new_digest": currentDigest,
"image": j.details.trackedImage.Image.String(),
}).Debug("trigger.poll.WatchTagJob: checking digest")
// checking whether image digest has changed
if j.details.digest != currentDigest {
// updating digest
j.details.digest = currentDigest
event := types.Event{
Repository: types.Repository{
Name: j.details.trackedImage.Image.Repository(),
Tag: j.details.trackedImage.Image.Tag(),
Digest: currentDigest,
},
TriggerName: types.TriggerTypePoll.String(),
}
log.WithFields(log.Fields{
"image": j.details.trackedImage.Image.String(),
"new_digest": currentDigest,
}).Info("trigger.poll.WatchTagJob: digest change detected, submiting event to providers")
// j.providers.Submit(event)
err := j.providers.Submit(event)
if err != nil {
log.WithFields(log.Fields{
"repository": j.details.trackedImage.Image.Repository(),
"digest": currentDigest,
"error": err,
}).Error("trigger.poll.WatchRepositoryTagsJob: error while submitting an event")
}
}
}

View File

@ -85,11 +85,8 @@ func (w *RepositoryWatcher) Start(ctx context.Context) {
// starting cron job
w.cron.Start()
go func() {
select {
case <-ctx.Done():
w.cron.Stop()
return
}
<-ctx.Done()
w.cron.Stop()
}()
}
@ -205,15 +202,22 @@ func (w *RepositoryWatcher) watch(image *types.TrackedImage) (string, error) {
// checking schedule
if details.schedule != image.PollSchedule {
w.cron.UpdateJob(key, image.PollSchedule)
err := w.cron.UpdateJob(key, image.PollSchedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": image.String(),
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to update image watch job")
}
}
details.mu.Lock()
details.trackedImage = image
// setting main latest version to the lowest from the tracked
details.latest = version.Lowest(details.trackedImage.Tags)
details.mu.Unlock()
// nothing to do
return key, nil
}
@ -284,71 +288,3 @@ func (w *RepositoryWatcher) addJob(ti *types.TrackedImage, schedule string) erro
return w.cron.AddJob(key, schedule, job)
}
// WatchTagJob - Watch specific tag job
type WatchTagJob struct {
providers provider.Providers
registryClient registry.Client
details *watchDetails
}
// NewWatchTagJob - new watch tag job monitors specific tag by checking digest based on specified
// cron style schedule
func NewWatchTagJob(providers provider.Providers, registryClient registry.Client, details *watchDetails) *WatchTagJob {
return &WatchTagJob{
providers: providers,
registryClient: registryClient,
details: details,
}
}
// Run - main function to check schedule
func (j *WatchTagJob) Run() {
creds := credentialshelper.GetCredentials(j.details.trackedImage)
reg := j.details.trackedImage.Image.Scheme() + "://" + j.details.trackedImage.Image.Registry()
currentDigest, err := j.registryClient.Digest(registry.Opts{
Registry: reg,
Name: j.details.trackedImage.Image.ShortName(),
Tag: j.details.trackedImage.Image.Tag(),
Username: creds.Username,
Password: creds.Password,
})
registriesScannedCounter.With(prometheus.Labels{"registry": j.details.trackedImage.Image.Registry(), "image": j.details.trackedImage.Image.Repository()}).Inc()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": j.details.trackedImage.Image.String(),
}).Error("trigger.poll.WatchTagJob: failed to check digest")
return
}
log.WithFields(log.Fields{
"current_digest": j.details.digest,
"new_digest": currentDigest,
"image": j.details.trackedImage.Image.String(),
}).Debug("trigger.poll.WatchTagJob: checking digest")
// checking whether image digest has changed
if j.details.digest != currentDigest {
// updating digest
j.details.digest = currentDigest
event := types.Event{
Repository: types.Repository{
Name: j.details.trackedImage.Image.Repository(),
Tag: j.details.trackedImage.Image.Tag(),
Digest: currentDigest,
},
TriggerName: types.TriggerTypePoll.String(),
}
log.WithFields(log.Fields{
"image": j.details.trackedImage.Image.String(),
"new_digest": currentDigest,
}).Info("trigger.poll.WatchTagJob: digest change detected, submiting event to providers")
j.providers.Submit(event)
}
}