feat: resource log
parent
fa7861d904
commit
b91d778579
|
@ -6,6 +6,7 @@
|
|||
1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration
|
||||
1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types
|
||||
1. [16504](https://github.com/influxdata/influxdb/pull/16504): Add backup and restore
|
||||
1. [16522](https://github.com/influxdata/influxdb/pull/16522): Introduce resource logger to tasks, buckets and organizations
|
||||
|
||||
### UI Improvements
|
||||
|
||||
|
|
11
bucket.go
11
bucket.go
|
@ -2,6 +2,7 @@ package influxdb
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
@ -68,6 +69,7 @@ var (
|
|||
OpFindBucket = "FindBucket"
|
||||
OpFindBuckets = "FindBuckets"
|
||||
OpCreateBucket = "CreateBucket"
|
||||
OpPutBucket = "PutBucket"
|
||||
OpUpdateBucket = "UpdateBucket"
|
||||
OpDeleteBucket = "DeleteBucket"
|
||||
)
|
||||
|
@ -153,3 +155,12 @@ func (f BucketFilter) String() string {
|
|||
}
|
||||
return "[" + strings.Join(parts, ", ") + "]"
|
||||
}
|
||||
|
||||
func ErrInternalBucketServiceError(op string, err error) *Error {
|
||||
return &Error{
|
||||
Code: EInternal,
|
||||
Msg: fmt.Sprintf("unexpected error in buckets; Err: %v", err),
|
||||
Op: op,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,3 +57,12 @@ func GetToken(ctx context.Context) (string, error) {
|
|||
|
||||
return auth.Token, nil
|
||||
}
|
||||
|
||||
// GetUserID retrieves the user ID from the authorizer on the context.
|
||||
func GetUserID(ctx context.Context) (influxdb.ID, error) {
|
||||
a, err := GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return a.GetUserID(), nil
|
||||
}
|
||||
|
|
|
@ -8,6 +8,21 @@ import (
|
|||
icontext "github.com/influxdata/influxdb/context"
|
||||
)
|
||||
|
||||
func TestGetAuthorizer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{
|
||||
ID: 1234,
|
||||
})
|
||||
got, err := icontext.GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error while retrieving token: %v", err)
|
||||
}
|
||||
|
||||
if want := influxdb.ID(1234); got.Identifier() != want {
|
||||
t.Errorf("GetToken() want %s, got %s", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetToken(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{
|
||||
|
@ -22,3 +37,18 @@ func TestGetToken(t *testing.T) {
|
|||
t.Errorf("GetToken() want %s, got %s", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetUserID(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{
|
||||
UserID: 5678,
|
||||
})
|
||||
got, err := icontext.GetUserID(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error while retrieving user ID: %v", err)
|
||||
}
|
||||
|
||||
if want := influxdb.ID(5678); got != want {
|
||||
t.Errorf("GetUserID() want %s, got %s", want, got)
|
||||
}
|
||||
}
|
||||
|
|
87
kv/bucket.go
87
kv/bucket.go
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/influxdata/influxdb"
|
||||
icontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/resource"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -477,14 +478,28 @@ func (s *Service) createBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) (
|
|||
}
|
||||
}
|
||||
|
||||
if err := s.putBucket(ctx, tx, b); err != nil {
|
||||
v, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return influxdb.ErrInternalBucketServiceError(influxdb.OpCreateBucket, err)
|
||||
}
|
||||
if err := s.putBucket(ctx, tx, b, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.createUserResourceMappingForOrg(ctx, tx, b.OrgID, b.ID, influxdb.BucketsResourceType); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Create,
|
||||
ResourceID: b.ID,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
OrganizationID: b.OrgID,
|
||||
UserID: uid,
|
||||
ResourceBody: v,
|
||||
Time: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) generateBucketID(ctx context.Context, tx Tx) (influxdb.ID, error) {
|
||||
|
@ -494,21 +509,32 @@ func (s *Service) generateBucketID(ctx context.Context, tx Tx) (influxdb.ID, err
|
|||
// PutBucket will put a bucket without setting an ID.
|
||||
func (s *Service) PutBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||
return s.kv.Update(ctx, func(tx Tx) error {
|
||||
return s.putBucket(ctx, tx, b)
|
||||
v, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return influxdb.ErrInternalBucketServiceError(influxdb.OpPutBucket, err)
|
||||
}
|
||||
|
||||
if err := s.putBucket(ctx, tx, b, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Put,
|
||||
ResourceID: b.ID,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
OrganizationID: b.OrgID,
|
||||
UserID: uid,
|
||||
ResourceBody: v,
|
||||
Time: time.Now(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
||||
func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket, v []byte) error {
|
||||
span, _ := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
v, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
encodedID, err := b.ID.Encode()
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
|
@ -682,18 +708,36 @@ func (s *Service) updateBucket(ctx context.Context, tx Tx, id influxdb.ID, upd i
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.putBucket(ctx, tx, b); err != nil {
|
||||
v, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return nil, influxdb.ErrInternalBucketServiceError(influxdb.OpUpdateBucket, err)
|
||||
}
|
||||
|
||||
if err := s.putBucket(ctx, tx, b, v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
if err := s.audit.Log(resource.Change{
|
||||
Type: resource.Update,
|
||||
ResourceID: b.ID,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
OrganizationID: b.OrgID,
|
||||
UserID: uid,
|
||||
ResourceBody: v,
|
||||
Time: time.Now(),
|
||||
}); err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a bucket and prunes it from the index.
|
||||
func (s *Service) DeleteBucket(ctx context.Context, id influxdb.ID) error {
|
||||
return s.kv.Update(ctx, func(tx Tx) error {
|
||||
var err error
|
||||
|
||||
bucket, err := s.findBucketByID(ctx, tx, id)
|
||||
if err != nil && !IsNotFound(err) {
|
||||
return err
|
||||
|
@ -706,10 +750,19 @@ func (s *Service) DeleteBucket(ctx context.Context, id influxdb.ID) error {
|
|||
}
|
||||
}
|
||||
|
||||
if pe := s.deleteBucket(ctx, tx, id); pe != nil {
|
||||
err = pe
|
||||
if err := s.deleteBucket(ctx, tx, id); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Delete,
|
||||
ResourceID: id,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
OrganizationID: bucket.OrgID,
|
||||
UserID: uid,
|
||||
Time: time.Now(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
|
82
kv/org.go
82
kv/org.go
|
@ -7,6 +7,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/resource"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
|
@ -293,12 +295,26 @@ func (s *Service) createOrganization(ctx context.Context, tx Tx, o *influxdb.Org
|
|||
}
|
||||
}
|
||||
|
||||
if err := s.putOrganization(ctx, tx, o); err != nil {
|
||||
v, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
return influxdb.ErrInternalOrgServiceError(influxdb.OpCreateOrganization, err)
|
||||
}
|
||||
if err := s.putOrganization(ctx, tx, o, v); err != nil {
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Create,
|
||||
ResourceID: o.ID,
|
||||
ResourceType: influxdb.OrgsResourceType,
|
||||
OrganizationID: o.ID,
|
||||
UserID: uid,
|
||||
ResourceBody: v,
|
||||
Time: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) generateOrgID(ctx context.Context, tx Tx) (influxdb.ID, error) {
|
||||
|
@ -307,22 +323,30 @@ func (s *Service) generateOrgID(ctx context.Context, tx Tx) (influxdb.ID, error)
|
|||
|
||||
// PutOrganization will put a organization without setting an ID.
|
||||
func (s *Service) PutOrganization(ctx context.Context, o *influxdb.Organization) error {
|
||||
var err error
|
||||
return s.kv.Update(ctx, func(tx Tx) error {
|
||||
if pe := s.putOrganization(ctx, tx, o); pe != nil {
|
||||
err = pe
|
||||
v, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
return influxdb.ErrInternalOrgServiceError(influxdb.OpPutOrganization, err)
|
||||
}
|
||||
return err
|
||||
|
||||
if err := s.putOrganization(ctx, tx, o, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Put,
|
||||
ResourceID: o.ID,
|
||||
ResourceType: influxdb.OrgsResourceType,
|
||||
OrganizationID: o.ID,
|
||||
UserID: uid,
|
||||
ResourceBody: v,
|
||||
Time: time.Now(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) putOrganization(ctx context.Context, tx Tx, o *influxdb.Organization) error {
|
||||
v, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
func (s *Service) putOrganization(ctx context.Context, tx Tx, o *influxdb.Organization, v []byte) error {
|
||||
encodedID, err := o.ID.Encode()
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
|
@ -452,10 +476,29 @@ func (s *Service) updateOrganization(ctx context.Context, tx Tx, id influxdb.ID,
|
|||
}
|
||||
}
|
||||
|
||||
if pe := s.putOrganization(ctx, tx, o); pe != nil {
|
||||
v, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
return nil, influxdb.ErrInternalOrgServiceError(influxdb.OpUpdateOrganization, err)
|
||||
}
|
||||
if pe := s.putOrganization(ctx, tx, o, v); pe != nil {
|
||||
return nil, pe
|
||||
}
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
if err := s.audit.Log(resource.Change{
|
||||
Type: resource.Update,
|
||||
ResourceID: o.ID,
|
||||
ResourceType: influxdb.OrgsResourceType,
|
||||
OrganizationID: o.ID,
|
||||
UserID: uid,
|
||||
ResourceBody: v,
|
||||
Time: time.Now(),
|
||||
}); err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return o, nil
|
||||
}
|
||||
|
||||
|
@ -484,7 +527,16 @@ func (s *Service) DeleteOrganization(ctx context.Context, id influxdb.ID) error
|
|||
if pe := s.deleteOrganization(ctx, tx, id); pe != nil {
|
||||
return pe
|
||||
}
|
||||
return nil
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Delete,
|
||||
ResourceID: id,
|
||||
ResourceType: influxdb.OrgsResourceType,
|
||||
OrganizationID: id,
|
||||
UserID: uid,
|
||||
Time: time.Now(),
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
|
|
|
@ -4,9 +4,12 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/resource/noop"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/rand"
|
||||
"github.com/influxdata/influxdb/resource"
|
||||
"github.com/influxdata/influxdb/snowflake"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -24,6 +27,7 @@ type Service struct {
|
|||
log *zap.Logger
|
||||
clock clock.Clock
|
||||
Config ServiceConfig
|
||||
audit resource.Logger
|
||||
|
||||
IDGenerator influxdb.IDGenerator
|
||||
|
||||
|
@ -52,6 +56,7 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
|
|||
TokenGenerator: rand.NewTokenGenerator(64),
|
||||
Hash: &Bcrypt{},
|
||||
kv: kv,
|
||||
audit: noop.ResourceLogger{},
|
||||
TimeGenerator: influxdb.RealTimeGenerator{},
|
||||
checkStore: newCheckStore(),
|
||||
endpointStore: newEndpointStore(),
|
||||
|
@ -170,6 +175,11 @@ func (s *Service) Initialize(ctx context.Context) error {
|
|||
})
|
||||
}
|
||||
|
||||
// WithResourceLogger sets the resource audit logger for the service.
|
||||
func (s *Service) WithResourceLogger(audit resource.Logger) {
|
||||
s.audit = audit
|
||||
}
|
||||
|
||||
// WithStore sets kv store for the service.
|
||||
// Should only be used in tests for mocking.
|
||||
func (s *Service) WithStore(store Store) {
|
||||
|
|
46
kv/task.go
46
kv/task.go
|
@ -6,6 +6,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/resource"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
icontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
|
@ -655,6 +657,20 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
|||
OrgID: task.OrganizationID,
|
||||
Permissions: ps,
|
||||
}
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
if err := s.audit.Log(resource.Change{
|
||||
Type: resource.Create,
|
||||
ResourceID: task.ID,
|
||||
ResourceType: influxdb.TasksResourceType,
|
||||
OrganizationID: task.OrganizationID,
|
||||
UserID: uid,
|
||||
ResourceBody: taskBytes,
|
||||
Time: time.Now(),
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
|
@ -790,7 +806,25 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
|||
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||
}
|
||||
|
||||
return task, bucket.Put(key, taskBytes)
|
||||
err = bucket.Put(key, taskBytes)
|
||||
if err != nil {
|
||||
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||
}
|
||||
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
if err := s.audit.Log(resource.Change{
|
||||
Type: resource.Update,
|
||||
ResourceID: task.ID,
|
||||
ResourceType: influxdb.TasksResourceType,
|
||||
OrganizationID: task.OrganizationID,
|
||||
UserID: uid,
|
||||
ResourceBody: taskBytes,
|
||||
Time: time.Now(),
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
|
||||
|
@ -883,7 +917,15 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
|||
s.log.Info("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
uid, _ := icontext.GetUserID(ctx)
|
||||
return s.audit.Log(resource.Change{
|
||||
Type: resource.Delete,
|
||||
ResourceID: task.ID,
|
||||
ResourceType: influxdb.TasksResourceType,
|
||||
OrganizationID: task.OrganizationID,
|
||||
UserID: uid,
|
||||
Time: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
// FindLogs returns logs for a run.
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package influxdb
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Organization is an organization. 🎉
|
||||
type Organization struct {
|
||||
|
@ -25,6 +28,7 @@ const (
|
|||
OpFindOrganization = "FindOrganization"
|
||||
OpFindOrganizations = "FindOrganizations"
|
||||
OpCreateOrganization = "CreateOrganization"
|
||||
OpPutOrganization = "PutOrganization"
|
||||
OpUpdateOrganization = "UpdateOrganization"
|
||||
OpDeleteOrganization = "DeleteOrganization"
|
||||
)
|
||||
|
@ -70,3 +74,12 @@ type OrganizationFilter struct {
|
|||
Name *string
|
||||
ID *ID
|
||||
}
|
||||
|
||||
func ErrInternalOrgServiceError(op string, err error) *Error {
|
||||
return &Error{
|
||||
Code: EInternal,
|
||||
Msg: fmt.Sprintf("unexpected error in organizations; Err: %v", err),
|
||||
Op: op,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
package noop
|
||||
|
||||
import "github.com/influxdata/influxdb/resource"
|
||||
|
||||
type ResourceLogger struct{}
|
||||
|
||||
func (ResourceLogger) Log(resource.Change) error {
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
// Package resource defines an interface for recording changes to InfluxDB resources.
|
||||
//
|
||||
// A resource is an entity in our system, e.g. an organization, task or bucket.
|
||||
// A change includes the creation, update or deletion of a resource.
|
||||
package resource
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
// Logger records changes to resources.
|
||||
type Logger interface {
|
||||
// Log a change to a resource.
|
||||
Log(Change) error
|
||||
}
|
||||
|
||||
// Change to a resource.
|
||||
type Change struct {
|
||||
// Type of change.
|
||||
Type ChangeType
|
||||
// ResourceID of the changed resource.
|
||||
ResourceID influxdb.ID
|
||||
// ResourceType that was changed.
|
||||
ResourceType influxdb.ResourceType
|
||||
// OrganizationID of the organization owning the changed resource.
|
||||
OrganizationID influxdb.ID
|
||||
// UserID of the user changing the resource.
|
||||
UserID influxdb.ID
|
||||
// ResourceBody after the change.
|
||||
ResourceBody []byte
|
||||
// Time when the resource was changed.
|
||||
Time time.Time
|
||||
}
|
||||
|
||||
// Type of change.
|
||||
type ChangeType string
|
||||
|
||||
const (
|
||||
// Create a resource.
|
||||
Create ChangeType = "create"
|
||||
// Put a resource.
|
||||
Put = "put"
|
||||
// Update a resource.
|
||||
Update = "update"
|
||||
// Delete a resource
|
||||
Delete = "delete"
|
||||
)
|
Loading…
Reference in New Issue