switching to tracked images

pull/39/head
Karolis Rusenas 2017-07-20 20:40:51 +01:00
parent 924ddb4aeb
commit 5fcb2ff53d
2 changed files with 51 additions and 104 deletions

View File

@ -6,18 +6,14 @@ import (
"golang.org/x/net/context"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"github.com/rusenask/keel/provider/kubernetes"
"github.com/rusenask/keel/types"
"github.com/rusenask/keel/util/policies"
"github.com/rusenask/keel/provider"
log "github.com/Sirupsen/logrus"
)
// DefaultManager - subscription manager
type DefaultManager struct {
implementer kubernetes.Implementer
providers provider.Providers
client Subscriber
// existing subscribers
@ -43,9 +39,9 @@ type Subscriber interface {
}
// NewDefaultManager - creates new pubsub manager to create subscription for deployments
func NewDefaultManager(projectID string, implementer kubernetes.Implementer, subClient Subscriber) *DefaultManager {
func NewDefaultManager(projectID string, providers provider.Providers, subClient Subscriber) *DefaultManager {
return &DefaultManager{
implementer: implementer,
providers: providers,
client: subClient,
projectID: projectID,
subscribers: make(map[string]context.Context),
@ -86,32 +82,21 @@ func (s *DefaultManager) Start(ctx context.Context) error {
}
func (s *DefaultManager) scan(ctx context.Context) error {
deploymentLists, err := s.deployments()
trackedImages, err := s.providers.TrackedImages()
if err != nil {
return err
}
for _, deploymentList := range deploymentLists {
for _, deployment := range deploymentList.Items {
labels := deployment.GetLabels()
// ignoring unlabelled deployments
policy := policies.GetPolicy(labels)
if policy == types.PolicyTypeNone {
continue
}
err = s.checkDeployment(&deployment)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"deployment": deployment.Name,
"namespace": deployment.Namespace,
}).Error("trigger.pubsub.manager: failed to check deployment subscription status")
}
for _, trackedImage := range trackedImages {
if !isGoogleContainerRegistry(trackedImage.Image.Registry()) {
log.Debug("registry %s is not a GCR, skipping", trackedImage.Image.Registry())
continue
}
}
// uri
gcrURI := containerRegistryURI(s.projectID, trackedImage.Image.Registry())
s.ensureSubscription(gcrURI)
}
return nil
}
@ -154,49 +139,3 @@ func (s *DefaultManager) removeSubscription(gcrURI string) {
defer s.mu.Unlock()
delete(s.subscribers, gcrURI)
}
// checkDeployment - gets deployment image and checks whether we have appropriate topic
// and subscription for this deployment
func (s *DefaultManager) checkDeployment(deployment *v1beta1.Deployment) error {
for _, c := range deployment.Spec.Template.Spec.Containers {
// registry host
registry := extractContainerRegistryURI(c.Image)
if !isGoogleContainerRegistry(registry) {
log.Debug("registry %s is not a GCR, skipping", registry)
continue
}
// uri
gcrURI := containerRegistryURI(s.projectID, registry)
s.ensureSubscription(gcrURI)
}
return nil
}
func (s *DefaultManager) deployments() ([]*v1beta1.DeploymentList, error) {
// namespaces := p.client.Namespaces()
deployments := []*v1beta1.DeploymentList{}
n, err := s.implementer.Namespaces()
if err != nil {
return nil, err
}
for _, n := range n.Items {
l, err := s.implementer.Deployments(n.GetName())
if err != nil {
log.WithFields(log.Fields{
"error": err,
"namespace": n.GetName(),
}).Error("trigger.pubsub.manager: failed to list deployments")
continue
}
deployments = append(deployments, l)
}
return deployments, nil
}

View File

@ -5,11 +5,9 @@ import (
"sync"
"time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"github.com/rusenask/keel/provider"
"github.com/rusenask/keel/types"
"github.com/rusenask/keel/util/image"
"testing"
)
@ -32,42 +30,52 @@ func (s *fakeSubscriber) Subscribe(ctx context.Context, topic, subscription stri
}
}
type fakeProvider struct {
images []*types.TrackedImage
submitted []types.Event
}
func (p *fakeProvider) Submit(event types.Event) error {
p.submitted = append(p.submitted, event)
return nil
}
func (p *fakeProvider) TrackedImages() ([]*types.TrackedImage, error) {
return p.images, nil
}
func (p *fakeProvider) List() []string {
return []string{"fakeprovider"}
}
func (p *fakeProvider) Stop() {
return
}
func (p *fakeProvider) GetName() string {
return "fp"
}
func TestCheckDeployment(t *testing.T) {
img, _ := image.Parse("gcr.io/v2-namespace/hello-world:1.1")
fp := &fakeProvider{
images: []*types.TrackedImage{
&types.TrackedImage{
Image: img,
Provider: "fp",
},
},
}
providers := provider.New([]provider.Provider{fp})
fs := &fakeSubscriber{}
mng := &DefaultManager{
providers: providers,
client: fs,
mu: &sync.Mutex{},
ctx: context.Background(),
subscribers: make(map[string]context.Context),
}
dep := &v1beta1.Deployment{
meta_v1.TypeMeta{},
meta_v1.ObjectMeta{
Name: "dep-1",
Namespace: "xxxx",
Labels: map[string]string{types.KeelPolicyLabel: "all"},
},
v1beta1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Image: "gcr.io/v2-namespace/hello-world:1.1.1",
},
v1.Container{
Image: "gcr.io/v2-namespace/greetings-world:1.1.1",
},
},
},
},
},
v1beta1.DeploymentStatus{},
}
err := mng.checkDeployment(dep)
err := mng.scan(context.Background())
if err != nil {
t.Errorf("deployment check failed: %s", err)
t.Errorf("failed to scan: %s", err)
}
// sleeping a bit since our fake subscriber goes into a separate goroutine