expiry service
parent
43249b65b0
commit
6360476930
|
@ -37,6 +37,8 @@ type Manager interface {
|
|||
Get(identifier string) (*types.Approval, error)
|
||||
List() ([]*types.Approval, error)
|
||||
Delete(identifier string) error
|
||||
|
||||
StartExpiryService(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Approvals related errors
|
||||
|
@ -82,6 +84,64 @@ func New(cache cache.Cache, serializer codecs.Serializer) *DefaultManager {
|
|||
return man
|
||||
}
|
||||
|
||||
// StartExpiryService - starts approval expiry service which deletes approvals
|
||||
// that already reached their deadline
|
||||
func (m *DefaultManager) StartExpiryService(ctx context.Context) error {
|
||||
ticker := time.NewTicker(60 * time.Minute)
|
||||
|
||||
err := m.expireEntries()
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
}).Error("approvals.StartExpiryService: got error while performing initial expired approvals check")
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
err := m.expireEntries()
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
}).Error("approvals.StartExpiryService: got error while performing routinely expired approvals check")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *DefaultManager) expireEntries() error {
|
||||
approvals, err := m.cache.List(ApprovalsPrefix + "/")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k, v := range approvals {
|
||||
var approval types.Approval
|
||||
err = m.serializer.Decode(v, &approval)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"identifier": k,
|
||||
}).Error("approvals.expireEntries: failed to decode approval into value")
|
||||
continue
|
||||
}
|
||||
|
||||
if approval.Expired() {
|
||||
err = m.Delete(approval.Identifier)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"identifier": k,
|
||||
}).Error("approvals.expireEntries: failed to delete expired approval")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe - subscribe for approval events
|
||||
func (m *DefaultManager) Subscribe(ctx context.Context) (<-chan *types.Approval, error) {
|
||||
m.subMu.Lock()
|
||||
|
@ -167,17 +227,15 @@ func (m *DefaultManager) Create(r *types.Approval) error {
|
|||
return err
|
||||
}
|
||||
|
||||
ctx := cache.SetContextExpiration(context.Background(), r.Deadline)
|
||||
|
||||
err = m.cache.Put(ctx, getKey(r.Identifier), bts)
|
||||
err = m.cache.Put(getKey(r.Identifier), bts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return m.publishRequest(r)
|
||||
|
||||
}
|
||||
|
||||
// Update - update approval
|
||||
func (m *DefaultManager) Update(r *types.Approval) error {
|
||||
existing, err := m.Get(r.Identifier)
|
||||
if err != nil {
|
||||
|
@ -203,8 +261,7 @@ func (m *DefaultManager) Update(r *types.Approval) error {
|
|||
}
|
||||
}
|
||||
|
||||
ctx := cache.SetContextExpiration(context.Background(), r.Deadline)
|
||||
return m.cache.Put(ctx, getKey(r.Identifier), bts)
|
||||
return m.cache.Put(getKey(r.Identifier), bts)
|
||||
}
|
||||
|
||||
// Approve - increase VotesReceived by 1 and returns updated version
|
||||
|
@ -260,8 +317,9 @@ func (m *DefaultManager) Reject(identifier string) (*types.Approval, error) {
|
|||
return existing, nil
|
||||
}
|
||||
|
||||
// Get - get specified approval
|
||||
func (m *DefaultManager) Get(identifier string) (*types.Approval, error) {
|
||||
bts, err := m.cache.Get(context.Background(), getKey(identifier))
|
||||
bts, err := m.cache.Get(getKey(identifier))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -271,6 +329,7 @@ func (m *DefaultManager) Get(identifier string) (*types.Approval, error) {
|
|||
return &approval, err
|
||||
}
|
||||
|
||||
// List - list approvals
|
||||
func (m *DefaultManager) List() ([]*types.Approval, error) {
|
||||
bts, err := m.cache.List(ApprovalsPrefix)
|
||||
if err != nil {
|
||||
|
@ -290,10 +349,11 @@ func (m *DefaultManager) List() ([]*types.Approval, error) {
|
|||
approvals = append(approvals, &approval)
|
||||
}
|
||||
return approvals, nil
|
||||
|
||||
}
|
||||
|
||||
// Delete - delete specified approval
|
||||
func (m *DefaultManager) Delete(identifier string) error {
|
||||
return m.cache.Delete(context.Background(), getKey(identifier))
|
||||
return m.cache.Delete(getKey(identifier))
|
||||
}
|
||||
|
||||
func getKey(identifier string) string {
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestCreateApproval(t *testing.T) {
|
|||
Identifier: "xxx/app-1",
|
||||
CurrentVersion: "1.2.3",
|
||||
NewVersion: "1.2.5",
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
@ -37,6 +37,35 @@ func TestCreateApproval(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDeleteApproval(t *testing.T) {
|
||||
mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond)
|
||||
|
||||
am := New(mem, codecs.DefaultSerializer())
|
||||
|
||||
err := am.Create(&types.Approval{
|
||||
Provider: types.ProviderTypeKubernetes,
|
||||
Identifier: "xxx/app-1",
|
||||
CurrentVersion: "1.2.3",
|
||||
NewVersion: "1.2.5",
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create approval: %s", err)
|
||||
}
|
||||
|
||||
err = am.Delete("xxx/app-1")
|
||||
if err != nil {
|
||||
t.Errorf("failed to delete approval: %s", err)
|
||||
}
|
||||
|
||||
_, err = am.Get("xxx/app-1")
|
||||
if err == nil {
|
||||
t.Errorf("expected to get an error when retrieving deleted approval")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestUpdateApproval(t *testing.T) {
|
||||
mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond)
|
||||
|
||||
|
@ -49,7 +78,7 @@ func TestUpdateApproval(t *testing.T) {
|
|||
NewVersion: "1.2.5",
|
||||
VotesRequired: 1,
|
||||
VotesReceived: 0,
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
Event: &types.Event{
|
||||
Repository: types.Repository{
|
||||
Name: "very/repo",
|
||||
|
@ -75,7 +104,7 @@ func TestUpdateApproval(t *testing.T) {
|
|||
NewVersion: "1.2.5",
|
||||
VotesRequired: 1,
|
||||
VotesReceived: 1,
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
Event: &types.Event{
|
||||
Repository: types.Repository{
|
||||
Name: "very/repo",
|
||||
|
@ -106,7 +135,7 @@ func TestUpdateApprovalRejected(t *testing.T) {
|
|||
NewVersion: "1.2.5",
|
||||
VotesRequired: 1,
|
||||
VotesReceived: 0,
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
Event: &types.Event{
|
||||
Repository: types.Repository{
|
||||
Name: "very/repo",
|
||||
|
@ -135,7 +164,7 @@ func TestUpdateApprovalRejected(t *testing.T) {
|
|||
VotesRequired: 1,
|
||||
VotesReceived: 0,
|
||||
Rejected: true,
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
Event: &types.Event{
|
||||
Repository: types.Repository{
|
||||
Name: "very/repo",
|
||||
|
@ -156,7 +185,7 @@ func TestUpdateApprovalRejected(t *testing.T) {
|
|||
VotesRequired: 1,
|
||||
VotesReceived: 1,
|
||||
Rejected: true,
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
Event: &types.Event{
|
||||
Repository: types.Repository{
|
||||
Name: "very/repo",
|
||||
|
@ -188,7 +217,7 @@ func TestApprove(t *testing.T) {
|
|||
Identifier: "xxx/app-1:1.2.5",
|
||||
CurrentVersion: "1.2.3",
|
||||
NewVersion: "1.2.5",
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
VotesRequired: 2,
|
||||
VotesReceived: 0,
|
||||
})
|
||||
|
@ -219,7 +248,7 @@ func TestReject(t *testing.T) {
|
|||
Identifier: "xxx/app-1",
|
||||
CurrentVersion: "1.2.3",
|
||||
NewVersion: "1.2.5",
|
||||
Deadline: 0,
|
||||
Deadline: time.Now().Add(5 * time.Minute),
|
||||
VotesRequired: 2,
|
||||
VotesReceived: 0,
|
||||
})
|
||||
|
@ -239,3 +268,33 @@ func TestReject(t *testing.T) {
|
|||
t.Errorf("unexpected approval to be rejected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpire(t *testing.T) {
|
||||
mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond)
|
||||
|
||||
am := New(mem, codecs.DefaultSerializer())
|
||||
|
||||
err := am.Create(&types.Approval{
|
||||
Provider: types.ProviderTypeKubernetes,
|
||||
Identifier: "xxx/app-1",
|
||||
CurrentVersion: "1.2.3",
|
||||
NewVersion: "1.2.5",
|
||||
Deadline: time.Now().Add(-5 * time.Minute),
|
||||
VotesRequired: 2,
|
||||
VotesReceived: 0,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create approval: %s", err)
|
||||
}
|
||||
|
||||
err = am.expireEntries()
|
||||
if err != nil {
|
||||
t.Errorf("got error while expiring entries: %s", err)
|
||||
}
|
||||
|
||||
_, err = am.Get("xxx/app-1")
|
||||
if err == nil {
|
||||
t.Errorf("expected approval to be deleted but didn't get an error")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue