diff --git a/approvals/approvals.go b/approvals/approvals.go index fb62db17..3a9e2a9d 100644 --- a/approvals/approvals.go +++ b/approvals/approvals.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "sync/atomic" "time" "github.com/rusenask/keel/cache" @@ -16,6 +17,10 @@ import ( // Manager is used to manage updates type Manager interface { + // Subscribe for approval request events, subscriber should provide + // its name + Subscribe(ctx context.Context) (<-chan *types.Approval, error) + // request approval for deployment/release/etc.. Create(r *types.Approval) error // Update whole approval object @@ -52,6 +57,10 @@ type DefaultManager struct { // when all approvals are collected providers provider.Providers + // subscriber channels + channels map[uint64]chan *types.Approval + index uint64 + mu *sync.Mutex } @@ -61,10 +70,50 @@ func New(cache cache.Cache, serializer codecs.Serializer, providers provider.Pro cache: cache, serializer: serializer, providers: providers, + channels: make(map[uint64]chan *types.Approval), + index: 0, mu: &sync.Mutex{}, } } +// Subscribe - subscribe for approval events +func (m *DefaultManager) Subscribe(ctx context.Context) (<-chan *types.Approval, error) { + m.mu.Lock() + m.mu.Unlock() + + index := atomic.AddUint64(&m.index, 1) + approvalsCh := make(chan *types.Approval, 10) + m.channels[index] = approvalsCh + m.mu.Unlock() + + go func() { + for { + select { + case <-ctx.Done(): + m.mu.Lock() + + delete(m.channels, index) + + m.mu.Unlock() + return + } + } + }() + + return approvalsCh, nil +} + +func (m *DefaultManager) publish(approval *types.Approval) error { + m.mu.Lock() + defer m.mu.Unlock() + + for _, subscriber := range m.channels { + subscriber <- approval + } + return nil +} + +// Create - creates new approval request and publishes to all subscribers func (m *DefaultManager) Create(r *types.Approval) error { _, err := m.Get(r.Provider, r.Identifier) if err == nil { @@ -78,7 +127,13 @@ func (m *DefaultManager) Create(r *types.Approval) error { ctx := cache.SetContextExpiration(context.Background(), r.Deadline) - return m.cache.Put(ctx, getKey(r.Provider, r.Identifier), bts) + err = m.cache.Put(ctx, getKey(r.Provider, r.Identifier), bts) + if err != nil { + return err + } + + return m.publish(r) + } func (m *DefaultManager) Update(r *types.Approval) error { diff --git a/approvals/approvals_test.go b/approvals/approvals_test.go index 79203afd..1cc03b70 100644 --- a/approvals/approvals_test.go +++ b/approvals/approvals_test.go @@ -197,7 +197,7 @@ func TestApprove(t *testing.T) { err := am.Create(&types.Approval{ Provider: types.ProviderTypeKubernetes, - Identifier: "xxx/app-1", + Identifier: "xxx/app-1:1.2.5", CurrentVersion: "1.2.3", NewVersion: "1.2.5", Deadline: 0, @@ -209,9 +209,9 @@ func TestApprove(t *testing.T) { t.Fatalf("failed to create approval: %s", err) } - am.Approve(types.ProviderTypeKubernetes, "xxx/app-1") + am.Approve(types.ProviderTypeKubernetes, "xxx/app-1:1.2.5") - stored, err := am.Get(types.ProviderTypeKubernetes, "xxx/app-1") + stored, err := am.Get(types.ProviderTypeKubernetes, "xxx/app-1:1.2.5") if err != nil { t.Fatalf("failed to get approval: %s", err) }