From 6360476930e083503a7e094c35a97c1977640dd5 Mon Sep 17 00:00:00 2001 From: Karolis Rusenas Date: Tue, 12 Sep 2017 16:37:21 +0300 Subject: [PATCH] expiry service --- approvals/approvals.go | 78 ++++++++++++++++++++++++++++++++----- approvals/approvals_test.go | 75 +++++++++++++++++++++++++++++++---- 2 files changed, 136 insertions(+), 17 deletions(-) diff --git a/approvals/approvals.go b/approvals/approvals.go index 8a49bcb3..8fda43ce 100644 --- a/approvals/approvals.go +++ b/approvals/approvals.go @@ -37,6 +37,8 @@ type Manager interface { Get(identifier string) (*types.Approval, error) List() ([]*types.Approval, error) Delete(identifier string) error + + StartExpiryService(ctx context.Context) error } // Approvals related errors @@ -82,6 +84,64 @@ func New(cache cache.Cache, serializer codecs.Serializer) *DefaultManager { return man } +// StartExpiryService - starts approval expiry service which deletes approvals +// that already reached their deadline +func (m *DefaultManager) StartExpiryService(ctx context.Context) error { + ticker := time.NewTicker(60 * time.Minute) + + err := m.expireEntries() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Error("approvals.StartExpiryService: got error while performing initial expired approvals check") + } + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + err := m.expireEntries() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Error("approvals.StartExpiryService: got error while performing routinely expired approvals check") + } + } + } +} + +func (m *DefaultManager) expireEntries() error { + approvals, err := m.cache.List(ApprovalsPrefix + "/") + if err != nil { + return err + } + + for k, v := range approvals { + var approval types.Approval + err = m.serializer.Decode(v, &approval) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "identifier": k, + }).Error("approvals.expireEntries: failed to decode approval into value") + continue + } + + if approval.Expired() { + err = m.Delete(approval.Identifier) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "identifier": k, + }).Error("approvals.expireEntries: failed to delete expired approval") + } + } + } + + return nil +} + // Subscribe - subscribe for approval events func (m *DefaultManager) Subscribe(ctx context.Context) (<-chan *types.Approval, error) { m.subMu.Lock() @@ -167,17 +227,15 @@ func (m *DefaultManager) Create(r *types.Approval) error { return err } - ctx := cache.SetContextExpiration(context.Background(), r.Deadline) - - err = m.cache.Put(ctx, getKey(r.Identifier), bts) + err = m.cache.Put(getKey(r.Identifier), bts) if err != nil { return err } return m.publishRequest(r) - } +// Update - update approval func (m *DefaultManager) Update(r *types.Approval) error { existing, err := m.Get(r.Identifier) if err != nil { @@ -203,8 +261,7 @@ func (m *DefaultManager) Update(r *types.Approval) error { } } - ctx := cache.SetContextExpiration(context.Background(), r.Deadline) - return m.cache.Put(ctx, getKey(r.Identifier), bts) + return m.cache.Put(getKey(r.Identifier), bts) } // Approve - increase VotesReceived by 1 and returns updated version @@ -260,8 +317,9 @@ func (m *DefaultManager) Reject(identifier string) (*types.Approval, error) { return existing, nil } +// Get - get specified approval func (m *DefaultManager) Get(identifier string) (*types.Approval, error) { - bts, err := m.cache.Get(context.Background(), getKey(identifier)) + bts, err := m.cache.Get(getKey(identifier)) if err != nil { return nil, err } @@ -271,6 +329,7 @@ func (m *DefaultManager) Get(identifier string) (*types.Approval, error) { return &approval, err } +// List - list approvals func (m *DefaultManager) List() ([]*types.Approval, error) { bts, err := m.cache.List(ApprovalsPrefix) if err != nil { @@ -290,10 +349,11 @@ func (m *DefaultManager) List() ([]*types.Approval, error) { approvals = append(approvals, &approval) } return approvals, nil - } + +// Delete - delete specified approval func (m *DefaultManager) Delete(identifier string) error { - return m.cache.Delete(context.Background(), getKey(identifier)) + return m.cache.Delete(getKey(identifier)) } func getKey(identifier string) string { diff --git a/approvals/approvals_test.go b/approvals/approvals_test.go index 82606c7f..b8f519c5 100644 --- a/approvals/approvals_test.go +++ b/approvals/approvals_test.go @@ -20,7 +20,7 @@ func TestCreateApproval(t *testing.T) { Identifier: "xxx/app-1", CurrentVersion: "1.2.3", NewVersion: "1.2.5", - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), }) if err != nil { @@ -37,6 +37,35 @@ func TestCreateApproval(t *testing.T) { } } +func TestDeleteApproval(t *testing.T) { + mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond) + + am := New(mem, codecs.DefaultSerializer()) + + err := am.Create(&types.Approval{ + Provider: types.ProviderTypeKubernetes, + Identifier: "xxx/app-1", + CurrentVersion: "1.2.3", + NewVersion: "1.2.5", + Deadline: time.Now().Add(5 * time.Minute), + }) + + if err != nil { + t.Fatalf("failed to create approval: %s", err) + } + + err = am.Delete("xxx/app-1") + if err != nil { + t.Errorf("failed to delete approval: %s", err) + } + + _, err = am.Get("xxx/app-1") + if err == nil { + t.Errorf("expected to get an error when retrieving deleted approval") + } + +} + func TestUpdateApproval(t *testing.T) { mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond) @@ -49,7 +78,7 @@ func TestUpdateApproval(t *testing.T) { NewVersion: "1.2.5", VotesRequired: 1, VotesReceived: 0, - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), Event: &types.Event{ Repository: types.Repository{ Name: "very/repo", @@ -75,7 +104,7 @@ func TestUpdateApproval(t *testing.T) { NewVersion: "1.2.5", VotesRequired: 1, VotesReceived: 1, - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), Event: &types.Event{ Repository: types.Repository{ Name: "very/repo", @@ -106,7 +135,7 @@ func TestUpdateApprovalRejected(t *testing.T) { NewVersion: "1.2.5", VotesRequired: 1, VotesReceived: 0, - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), Event: &types.Event{ Repository: types.Repository{ Name: "very/repo", @@ -135,7 +164,7 @@ func TestUpdateApprovalRejected(t *testing.T) { VotesRequired: 1, VotesReceived: 0, Rejected: true, - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), Event: &types.Event{ Repository: types.Repository{ Name: "very/repo", @@ -156,7 +185,7 @@ func TestUpdateApprovalRejected(t *testing.T) { VotesRequired: 1, VotesReceived: 1, Rejected: true, - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), Event: &types.Event{ Repository: types.Repository{ Name: "very/repo", @@ -188,7 +217,7 @@ func TestApprove(t *testing.T) { Identifier: "xxx/app-1:1.2.5", CurrentVersion: "1.2.3", NewVersion: "1.2.5", - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), VotesRequired: 2, VotesReceived: 0, }) @@ -219,7 +248,7 @@ func TestReject(t *testing.T) { Identifier: "xxx/app-1", CurrentVersion: "1.2.3", NewVersion: "1.2.5", - Deadline: 0, + Deadline: time.Now().Add(5 * time.Minute), VotesRequired: 2, VotesReceived: 0, }) @@ -239,3 +268,33 @@ func TestReject(t *testing.T) { t.Errorf("unexpected approval to be rejected") } } + +func TestExpire(t *testing.T) { + mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond) + + am := New(mem, codecs.DefaultSerializer()) + + err := am.Create(&types.Approval{ + Provider: types.ProviderTypeKubernetes, + Identifier: "xxx/app-1", + CurrentVersion: "1.2.3", + NewVersion: "1.2.5", + Deadline: time.Now().Add(-5 * time.Minute), + VotesRequired: 2, + VotesReceived: 0, + }) + + if err != nil { + t.Fatalf("failed to create approval: %s", err) + } + + err = am.expireEntries() + if err != nil { + t.Errorf("got error while expiring entries: %s", err) + } + + _, err = am.Get("xxx/app-1") + if err == nil { + t.Errorf("expected approval to be deleted but didn't get an error") + } +}