149 lines
3.5 KiB
Go
149 lines
3.5 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
"github.com/keel-hq/keel/provider"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// DefaultManager - subscription manager
|
|
type DefaultManager struct {
|
|
providers provider.Providers
|
|
|
|
client Subscriber
|
|
// existing subscribers
|
|
mu *sync.Mutex
|
|
// a map of GCR URIs and subscribers to those URIs
|
|
// i.e. key could be something like: gcr.io%2Fmy-project
|
|
subscribers map[string]context.Context
|
|
|
|
// projectID is required to correctly set GCR subscriptions
|
|
projectID string
|
|
|
|
// clusterName is used to create unique names for the subscriptions. Each subscription
|
|
// has to have a unique name in order to receive all events (otherwise, if it is the same,
|
|
// only 1 keel instance will receive a GCR event after a push event)
|
|
clusterName string
|
|
|
|
// scanTick - scan interval in seconds, defaults to 60 seconds
|
|
scanTick int
|
|
|
|
// root context
|
|
ctx context.Context
|
|
}
|
|
|
|
// Subscriber is the client side of a subscription: an implementation listens
// for repository events and informs the providers about them.
type Subscriber interface {
	// Subscribe opens the named subscription on the given topic, bound to
	// ctx, and reports any failure as an error.
	Subscribe(ctx context.Context, topic, subscription string) error
}
|
|
|
|
// NewDefaultManager - creates new pubsub manager to create subscription for deployments
|
|
func NewDefaultManager(clusterName, projectID string, providers provider.Providers, subClient Subscriber) *DefaultManager {
|
|
return &DefaultManager{
|
|
providers: providers,
|
|
client: subClient,
|
|
projectID: projectID,
|
|
clusterName: clusterName,
|
|
subscribers: make(map[string]context.Context),
|
|
mu: &sync.Mutex{},
|
|
scanTick: 60,
|
|
}
|
|
}
|
|
|
|
// Start - start scanning deployment for changes
|
|
func (s *DefaultManager) Start(ctx context.Context) error {
|
|
// setting root context
|
|
s.ctx = ctx
|
|
|
|
// initial scan
|
|
err := s.scan(ctx)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
}).Error("trigger.pubsub.manager: scan failed")
|
|
}
|
|
|
|
ticker := time.NewTicker(time.Duration(s.scanTick) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
log.Debug("performing scan")
|
|
err := s.scan(ctx)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
}).Error("trigger.pubsub.manager: scan failed")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *DefaultManager) scan(ctx context.Context) error {
|
|
trackedImages, err := s.providers.TrackedImages()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, trackedImage := range trackedImages {
|
|
if !isGoogleArtifactRegistry(trackedImage.Image.Registry()) {
|
|
log.Debugf("registry %s is not a GCR, skipping", trackedImage.Image.Registry())
|
|
continue
|
|
}
|
|
|
|
// uri
|
|
// https://cloud.google.com/container-registry/docs/configuring-notifications
|
|
s.ensureSubscription("gcr")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *DefaultManager) subscribed(gcrURI string) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
_, ok := s.subscribers[gcrURI]
|
|
return ok
|
|
}
|
|
|
|
func (s *DefaultManager) ensureSubscription(gcrURI string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
_, ok := s.subscribers[gcrURI]
|
|
if !ok {
|
|
ctx, cancel := context.WithCancel(s.ctx)
|
|
s.subscribers[gcrURI] = ctx
|
|
subName := containerRegistrySubName(s.clusterName, s.projectID, gcrURI)
|
|
go func() {
|
|
defer cancel()
|
|
err := s.client.Subscribe(s.ctx, gcrURI, subName)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"gcr_uri": gcrURI,
|
|
"subscription_name": subName,
|
|
}).Error("trigger.pubsub.manager: failed to create subscription")
|
|
}
|
|
|
|
// cleanup
|
|
s.removeSubscription(gcrURI)
|
|
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (s *DefaultManager) removeSubscription(gcrURI string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
delete(s.subscribers, gcrURI)
|
|
}
|