using database for approvals
parent
1605ff52ee
commit
f133d30d02
|
@ -2,13 +2,15 @@ package approvals
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/keel-hq/keel/cache"
|
||||
"github.com/google/uuid"
|
||||
"github.com/keel-hq/keel/pkg/store"
|
||||
"github.com/keel-hq/keel/types"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -36,7 +38,8 @@ type Manager interface {
|
|||
|
||||
Get(identifier string) (*types.Approval, error)
|
||||
List() ([]*types.Approval, error)
|
||||
Delete(identifier string) error
|
||||
Delete(*types.Approval) error
|
||||
Archive(identifier string) error
|
||||
|
||||
StartExpiryService(ctx context.Context) error
|
||||
}
|
||||
|
@ -55,7 +58,9 @@ const (
|
|||
type DefaultManager struct {
|
||||
// cache is used to store approvals, key example:
|
||||
// approvals/<provider name>/<identifier>
|
||||
cache cache.Cache
|
||||
// cache cache.Cache
|
||||
|
||||
store store.Store
|
||||
|
||||
// subscriber channels
|
||||
channels map[uint32]chan *types.Approval
|
||||
|
@ -68,10 +73,16 @@ type DefaultManager struct {
|
|||
subMu *sync.RWMutex
|
||||
}
|
||||
|
||||
type Opts struct {
|
||||
Store store.Store
|
||||
// Cache cache.Cache
|
||||
}
|
||||
|
||||
// New create new instance of default manager
|
||||
func New(cache cache.Cache) *DefaultManager {
|
||||
func New(opts *Opts) *DefaultManager {
|
||||
man := &DefaultManager{
|
||||
cache: cache,
|
||||
// cache: opts.Cache,
|
||||
store: opts.Store,
|
||||
channels: make(map[uint32]chan *types.Approval),
|
||||
approvedCh: make(map[uint32]chan *types.Approval),
|
||||
index: 0,
|
||||
|
@ -110,30 +121,25 @@ func (m *DefaultManager) StartExpiryService(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (m *DefaultManager) expireEntries() error {
|
||||
approvals, err := m.cache.List(ApprovalsPrefix + "/")
|
||||
approvals, err := m.store.ListApprovals(&types.GetApprovalQuery{
|
||||
Archived: false,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k, v := range approvals {
|
||||
var approval types.Approval
|
||||
err = json.Unmarshal(v, &approval)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"identifier": k,
|
||||
}).Error("approvals.expireEntries: failed to decode approval into value")
|
||||
continue
|
||||
}
|
||||
|
||||
for _, approval := range approvals {
|
||||
if approval.Expired() {
|
||||
err = m.Delete(approval.Identifier)
|
||||
err = m.Delete(approval)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"identifier": k,
|
||||
"error": err,
|
||||
// "identifier": k,
|
||||
}).Error("approvals.expireEntries: failed to delete expired approval")
|
||||
continue
|
||||
}
|
||||
|
||||
m.addAuditEntry(approval, types.AuditActionApprovalExpired, "")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,40 +216,9 @@ func (m *DefaultManager) publishApproved(approval *types.Approval) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Create - creates new approval request and publishes to all subscribers
|
||||
func (m *DefaultManager) Create(r *types.Approval) error {
|
||||
_, err := m.Get(r.Identifier)
|
||||
if err == nil {
|
||||
return ErrApprovalAlreadyExists
|
||||
}
|
||||
|
||||
r.CreatedAt = time.Now()
|
||||
r.UpdatedAt = time.Now()
|
||||
|
||||
bts, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.cache.Put(getKey(r.Identifier), bts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return m.publishRequest(r)
|
||||
}
|
||||
|
||||
// Update - update approval
|
||||
func (m *DefaultManager) Update(r *types.Approval) error {
|
||||
existing, err := m.Get(r.Identifier)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.CreatedAt = existing.CreatedAt
|
||||
r.UpdatedAt = time.Now()
|
||||
|
||||
bts, err := json.Marshal(r)
|
||||
_, err := m.Get(r.Identifier)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -259,7 +234,7 @@ func (m *DefaultManager) Update(r *types.Approval) error {
|
|||
}
|
||||
}
|
||||
|
||||
return m.cache.Put(getKey(r.Identifier), bts)
|
||||
return m.store.UpdateApproval(r)
|
||||
}
|
||||
|
||||
// Approve - increase VotesReceived by 1 and returns updated version
|
||||
|
@ -283,7 +258,7 @@ func (m *DefaultManager) Approve(identifier, voter string) (*types.Approval, err
|
|||
}
|
||||
}
|
||||
|
||||
existing.Voters = append(existing.Voters, voter)
|
||||
existing.AddVoter(voter)
|
||||
existing.VotesReceived++
|
||||
|
||||
err = m.Update(existing)
|
||||
|
@ -295,6 +270,8 @@ func (m *DefaultManager) Approve(identifier, voter string) (*types.Approval, err
|
|||
return nil, err
|
||||
}
|
||||
|
||||
m.addAuditEntry(existing, types.AuditActionApprovalApproved, voter)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"identifier": identifier,
|
||||
}).Info("approvals.manager: approved")
|
||||
|
@ -302,6 +279,35 @@ func (m *DefaultManager) Approve(identifier, voter string) (*types.Approval, err
|
|||
return existing, nil
|
||||
}
|
||||
|
||||
func (m *DefaultManager) addAuditEntry(approval *types.Approval, action string, voter string) {
|
||||
|
||||
entry := &types.AuditLog{
|
||||
ID: uuid.New().String(),
|
||||
AccountID: voter,
|
||||
Username: voter,
|
||||
Action: action,
|
||||
ResourceKind: types.AuditResourceKindApproval,
|
||||
Identifier: approval.Identifier,
|
||||
}
|
||||
|
||||
entry.SetMetadata(map[string]string{
|
||||
"provider": approval.Provider.String(),
|
||||
"approval_id": approval.ID,
|
||||
"new_version": approval.NewVersion,
|
||||
"current_version": approval.CurrentVersion,
|
||||
"votes_required": strconv.Itoa(approval.VotesReceived),
|
||||
"votes_received": strconv.Itoa(approval.VotesReceived),
|
||||
})
|
||||
|
||||
_, err := m.store.CreateAuditLog(entry)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"module": "approvals",
|
||||
}).Error("failed to create audit log")
|
||||
}
|
||||
}
|
||||
|
||||
// Reject - rejects approval (marks rejected=true), approval will not be valid even if it
|
||||
// collects required votes
|
||||
func (m *DefaultManager) Reject(identifier string) (*types.Approval, error) {
|
||||
|
@ -320,47 +326,80 @@ func (m *DefaultManager) Reject(identifier string) (*types.Approval, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
m.addAuditEntry(existing, types.AuditActionApprovalRejected, "")
|
||||
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
// Get - get specified approval
|
||||
// Get - get specified, not archived approval
|
||||
func (m *DefaultManager) Get(identifier string) (*types.Approval, error) {
|
||||
bts, err := m.cache.Get(getKey(identifier))
|
||||
|
||||
a, err := m.store.GetApproval(&types.GetApprovalQuery{
|
||||
Identifier: identifier,
|
||||
Archived: false,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var approval types.Approval
|
||||
err = json.Unmarshal(bts, &approval)
|
||||
return &approval, err
|
||||
// if it's archived, don't display it
|
||||
if a.Archived {
|
||||
return nil, store.ErrRecordNotFound
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// List - list approvals
|
||||
// List - list not archived approvals (for expiration service)
|
||||
func (m *DefaultManager) List() ([]*types.Approval, error) {
|
||||
bts, err := m.cache.List(ApprovalsPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var approvals []*types.Approval
|
||||
for _, v := range bts {
|
||||
var approval types.Approval
|
||||
err = json.Unmarshal(v, &approval)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"payload": string(v),
|
||||
}).Error("approvals.manager: failed to decode payload")
|
||||
continue
|
||||
}
|
||||
approvals = append(approvals, &approval)
|
||||
}
|
||||
return approvals, nil
|
||||
approvals, err := m.store.ListApprovals(&types.GetApprovalQuery{
|
||||
Archived: false,
|
||||
})
|
||||
return approvals, err
|
||||
}
|
||||
|
||||
// Delete - delete specified approval
|
||||
func (m *DefaultManager) Delete(identifier string) error {
|
||||
return m.cache.Delete(getKey(identifier))
|
||||
func (m *DefaultManager) Delete(approval *types.Approval) error {
|
||||
existing, err := m.store.GetApproval(&types.GetApprovalQuery{
|
||||
ID: approval.ID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.addAuditEntry(existing, types.AuditActionDeleted, "")
|
||||
|
||||
return m.store.DeleteApproval(existing)
|
||||
}
|
||||
|
||||
func (m *DefaultManager) Archive(identifier string) error {
|
||||
existing, err := m.Get(identifier)
|
||||
if err != nil {
|
||||
return fmt.Errorf("approval not found: %s", err)
|
||||
}
|
||||
existing.Archived = true
|
||||
|
||||
m.addAuditEntry(existing, types.AuditActionApprovalArchived, "")
|
||||
|
||||
return m.store.UpdateApproval(existing)
|
||||
}
|
||||
|
||||
// Create - creates new approval request and publishes to all subscribers
|
||||
func (m *DefaultManager) Create(r *types.Approval) error {
|
||||
_, err := m.Get(r.Identifier)
|
||||
if err == nil {
|
||||
return ErrApprovalAlreadyExists
|
||||
}
|
||||
|
||||
r.CreatedAt = time.Now()
|
||||
r.UpdatedAt = time.Now()
|
||||
|
||||
created, err := m.store.CreateApproval(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create approval: %s", err)
|
||||
}
|
||||
|
||||
return m.publishRequest(created)
|
||||
}
|
||||
|
||||
func getKey(identifier string) string {
|
||||
|
|
|
@ -170,7 +170,11 @@ func IsApproval(eventUser string, eventText string) (resp *ApprovalResponse, ok
|
|||
}
|
||||
|
||||
func RemoveApprovalHandler(identifier string, approvalsManager approvals.Manager) string {
|
||||
err := approvalsManager.Delete(identifier)
|
||||
approval, err := approvalsManager.Get(identifier)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("approval with identifier '%s' was not found", identifier)
|
||||
}
|
||||
err = approvalsManager.Delete(approval)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("failed to remove '%s' approval: %s.", identifier, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue