subscribe for approvals

pull/99/head
Karolis Rusenas 2017-08-19 19:30:10 +01:00
parent f026adfef8
commit 0fc48a82e9
2 changed files with 59 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/rusenask/keel/cache"
@ -16,6 +17,10 @@ import (
// Manager is used to manage updates
type Manager interface {
// Subscribe for approval request events, subscriber should provide
// its name
Subscribe(ctx context.Context) (<-chan *types.Approval, error)
// request approval for deployment/release/etc..
Create(r *types.Approval) error
// Update whole approval object
@ -52,6 +57,10 @@ type DefaultManager struct {
// when all approvals are collected
providers provider.Providers
// subscriber channels
channels map[uint64]chan *types.Approval
index uint64
mu *sync.Mutex
}
@ -61,10 +70,50 @@ func New(cache cache.Cache, serializer codecs.Serializer, providers provider.Pro
cache: cache,
serializer: serializer,
providers: providers,
channels: make(map[uint64]chan *types.Approval),
index: 0,
mu: &sync.Mutex{},
}
}
// Subscribe - subscribe for approval events
func (m *DefaultManager) Subscribe(ctx context.Context) (<-chan *types.Approval, error) {
m.mu.Lock()
m.mu.Unlock()
index := atomic.AddUint64(&m.index, 1)
approvalsCh := make(chan *types.Approval, 10)
m.channels[index] = approvalsCh
m.mu.Unlock()
go func() {
for {
select {
case <-ctx.Done():
m.mu.Lock()
delete(m.channels, index)
m.mu.Unlock()
return
}
}
}()
return approvalsCh, nil
}
func (m *DefaultManager) publish(approval *types.Approval) error {
m.mu.Lock()
defer m.mu.Unlock()
for _, subscriber := range m.channels {
subscriber <- approval
}
return nil
}
// Create - creates new approval request and publishes to all subscribers
func (m *DefaultManager) Create(r *types.Approval) error {
_, err := m.Get(r.Provider, r.Identifier)
if err == nil {
@ -78,7 +127,13 @@ func (m *DefaultManager) Create(r *types.Approval) error {
ctx := cache.SetContextExpiration(context.Background(), r.Deadline)
return m.cache.Put(ctx, getKey(r.Provider, r.Identifier), bts)
err = m.cache.Put(ctx, getKey(r.Provider, r.Identifier), bts)
if err != nil {
return err
}
return m.publish(r)
}
func (m *DefaultManager) Update(r *types.Approval) error {

View File

@ -197,7 +197,7 @@ func TestApprove(t *testing.T) {
err := am.Create(&types.Approval{
Provider: types.ProviderTypeKubernetes,
Identifier: "xxx/app-1",
Identifier: "xxx/app-1:1.2.5",
CurrentVersion: "1.2.3",
NewVersion: "1.2.5",
Deadline: 0,
@ -209,9 +209,9 @@ func TestApprove(t *testing.T) {
t.Fatalf("failed to create approval: %s", err)
}
am.Approve(types.ProviderTypeKubernetes, "xxx/app-1")
am.Approve(types.ProviderTypeKubernetes, "xxx/app-1:1.2.5")
stored, err := am.Get(types.ProviderTypeKubernetes, "xxx/app-1")
stored, err := am.Get(types.ProviderTypeKubernetes, "xxx/app-1:1.2.5")
if err != nil {
t.Fatalf("failed to get approval: %s", err)
}