diff --git a/CHANGELOG.md b/CHANGELOG.md index f94cc21728..6791d0cfbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ 1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration 1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types 1. [16504](https://github.com/influxdata/influxdb/pull/16504): Add backup and restore +1. [16522](https://github.com/influxdata/influxdb/pull/16522): Introduce resource logger to tasks, buckets and organizations ### UI Improvements diff --git a/bucket.go b/bucket.go index b5fc2dbc23..2eab49672c 100644 --- a/bucket.go +++ b/bucket.go @@ -2,6 +2,7 @@ package influxdb import ( "context" + "fmt" "strings" "time" ) @@ -68,6 +69,7 @@ var ( OpFindBucket = "FindBucket" OpFindBuckets = "FindBuckets" OpCreateBucket = "CreateBucket" + OpPutBucket = "PutBucket" OpUpdateBucket = "UpdateBucket" OpDeleteBucket = "DeleteBucket" ) @@ -153,3 +155,12 @@ func (f BucketFilter) String() string { } return "[" + strings.Join(parts, ", ") + "]" } + +func ErrInternalBucketServiceError(op string, err error) *Error { + return &Error{ + Code: EInternal, + Msg: fmt.Sprintf("unexpected error in buckets; Err: %v", err), + Op: op, + Err: err, + } +} diff --git a/context/token.go b/context/token.go index f880fcc9b2..07cbbd8dfc 100644 --- a/context/token.go +++ b/context/token.go @@ -57,3 +57,12 @@ func GetToken(ctx context.Context) (string, error) { return auth.Token, nil } + +// GetUserID retrieves the user ID from the authorizer on the context. +func GetUserID(ctx context.Context) (influxdb.ID, error) { + a, err := GetAuthorizer(ctx) + if err != nil { + return 0, err + } + return a.GetUserID(), nil +} diff --git a/context/token_test.go b/context/token_test.go index 27caa0667e..20d35c653e 100644 --- a/context/token_test.go +++ b/context/token_test.go @@ -8,6 +8,21 @@ import ( icontext "github.com/influxdata/influxdb/context" ) +func TestGetAuthorizer(t *testing.T) { + ctx := context.Background() + ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{ + ID: 1234, + }) + got, err := icontext.GetAuthorizer(ctx) + if err != nil { + t.Errorf("unexpected error while retrieving token: %v", err) + } + + if want := influxdb.ID(1234); got.Identifier() != want { + t.Errorf("GetToken() want %s, got %s", want, got) + } +} + func TestGetToken(t *testing.T) { ctx := context.Background() ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{ @@ -22,3 +37,18 @@ func TestGetToken(t *testing.T) { t.Errorf("GetToken() want %s, got %s", want, got) } } + +func TestGetUserID(t *testing.T) { + ctx := context.Background() + ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{ + UserID: 5678, + }) + got, err := icontext.GetUserID(ctx) + if err != nil { + t.Errorf("unexpected error while retrieving user ID: %v", err) + } + + if want := influxdb.ID(5678); got != want { + t.Errorf("GetUserID() want %s, got %s", want, got) + } +} diff --git a/kv/bucket.go b/kv/bucket.go index 4cbf3c5aff..a86838d770 100644 --- a/kv/bucket.go +++ b/kv/bucket.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/kit/tracing" + "github.com/influxdata/influxdb/resource" ) var ( @@ -477,14 +478,28 @@ func (s *Service) createBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) ( } } - if err := s.putBucket(ctx, tx, b); err != nil { + v, err := json.Marshal(b) + if err != nil { + return influxdb.ErrInternalBucketServiceError(influxdb.OpCreateBucket, err) + } + if err := s.putBucket(ctx, tx, b, v); err != nil { return err } if err := s.createUserResourceMappingForOrg(ctx, tx, b.OrgID, b.ID, influxdb.BucketsResourceType); err != nil { return err } - return nil + + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Create, + ResourceID: b.ID, + ResourceType: influxdb.BucketsResourceType, + OrganizationID: b.OrgID, + UserID: uid, + ResourceBody: v, + Time: time.Now(), + }) } func (s *Service) generateBucketID(ctx context.Context, tx Tx) (influxdb.ID, error) { @@ -494,21 +509,32 @@ func (s *Service) generateBucketID(ctx context.Context, tx Tx) (influxdb.ID, err // PutBucket will put a bucket without setting an ID. func (s *Service) PutBucket(ctx context.Context, b *influxdb.Bucket) error { return s.kv.Update(ctx, func(tx Tx) error { - return s.putBucket(ctx, tx, b) + v, err := json.Marshal(b) + if err != nil { + return influxdb.ErrInternalBucketServiceError(influxdb.OpPutBucket, err) + } + + if err := s.putBucket(ctx, tx, b, v); err != nil { + return err + } + + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Put, + ResourceID: b.ID, + ResourceType: influxdb.BucketsResourceType, + OrganizationID: b.OrgID, + UserID: uid, + ResourceBody: v, + Time: time.Now(), + }) }) } -func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error { +func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket, v []byte) error { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() - v, err := json.Marshal(b) - if err != nil { - return &influxdb.Error{ - Err: err, - } - } - encodedID, err := b.ID.Encode() if err != nil { return &influxdb.Error{ @@ -682,18 +708,36 @@ func (s *Service) updateBucket(ctx context.Context, tx Tx, id influxdb.ID, upd i return nil, err } - if err := s.putBucket(ctx, tx, b); err != nil { + v, err := json.Marshal(b) + if err != nil { + return nil, influxdb.ErrInternalBucketServiceError(influxdb.OpUpdateBucket, err) + } + + if err := s.putBucket(ctx, tx, b, v); err != nil { return nil, err } + uid, _ := icontext.GetUserID(ctx) + if err := s.audit.Log(resource.Change{ + Type: resource.Update, + ResourceID: b.ID, + ResourceType: influxdb.BucketsResourceType, + OrganizationID: b.OrgID, + UserID: uid, + ResourceBody: v, + Time: time.Now(), + }); err != nil { + return nil, &influxdb.Error{ + Err: err, + } + } + return b, nil } // DeleteBucket deletes a bucket and prunes it from the index. func (s *Service) DeleteBucket(ctx context.Context, id influxdb.ID) error { return s.kv.Update(ctx, func(tx Tx) error { - var err error - bucket, err := s.findBucketByID(ctx, tx, id) if err != nil && !IsNotFound(err) { return err @@ -706,10 +750,19 @@ func (s *Service) DeleteBucket(ctx context.Context, id influxdb.ID) error { } } - if pe := s.deleteBucket(ctx, tx, id); pe != nil { - err = pe + if err := s.deleteBucket(ctx, tx, id); err != nil { + return err } - return err + + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Delete, + ResourceID: id, + ResourceType: influxdb.BucketsResourceType, + OrganizationID: bucket.OrgID, + UserID: uid, + Time: time.Now(), + }) }) } diff --git a/kv/org.go b/kv/org.go index bf8f4915c7..808d0be4fe 100644 --- a/kv/org.go +++ b/kv/org.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/influxdata/influxdb/resource" + "go.uber.org/zap" "github.com/influxdata/influxdb" @@ -293,12 +295,26 @@ func (s *Service) createOrganization(ctx context.Context, tx Tx, o *influxdb.Org } } - if err := s.putOrganization(ctx, tx, o); err != nil { + v, err := json.Marshal(o) + if err != nil { + return influxdb.ErrInternalOrgServiceError(influxdb.OpCreateOrganization, err) + } + if err := s.putOrganization(ctx, tx, o, v); err != nil { return &influxdb.Error{ Err: err, } } - return nil + + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Create, + ResourceID: o.ID, + ResourceType: influxdb.OrgsResourceType, + OrganizationID: o.ID, + UserID: uid, + ResourceBody: v, + Time: time.Now(), + }) } func (s *Service) generateOrgID(ctx context.Context, tx Tx) (influxdb.ID, error) { @@ -307,22 +323,30 @@ func (s *Service) generateOrgID(ctx context.Context, tx Tx) (influxdb.ID, error) // PutOrganization will put a organization without setting an ID. func (s *Service) PutOrganization(ctx context.Context, o *influxdb.Organization) error { - var err error return s.kv.Update(ctx, func(tx Tx) error { - if pe := s.putOrganization(ctx, tx, o); pe != nil { - err = pe + v, err := json.Marshal(o) + if err != nil { + return influxdb.ErrInternalOrgServiceError(influxdb.OpPutOrganization, err) } - return err + + if err := s.putOrganization(ctx, tx, o, v); err != nil { + return err + } + + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Put, + ResourceID: o.ID, + ResourceType: influxdb.OrgsResourceType, + OrganizationID: o.ID, + UserID: uid, + ResourceBody: v, + Time: time.Now(), + }) }) } -func (s *Service) putOrganization(ctx context.Context, tx Tx, o *influxdb.Organization) error { - v, err := json.Marshal(o) - if err != nil { - return &influxdb.Error{ - Err: err, - } - } +func (s *Service) putOrganization(ctx context.Context, tx Tx, o *influxdb.Organization, v []byte) error { encodedID, err := o.ID.Encode() if err != nil { return &influxdb.Error{ @@ -452,10 +476,29 @@ func (s *Service) updateOrganization(ctx context.Context, tx Tx, id influxdb.ID, } } - if pe := s.putOrganization(ctx, tx, o); pe != nil { + v, err := json.Marshal(o) + if err != nil { + return nil, influxdb.ErrInternalOrgServiceError(influxdb.OpUpdateOrganization, err) + } + if pe := s.putOrganization(ctx, tx, o, v); pe != nil { return nil, pe } + uid, _ := icontext.GetUserID(ctx) + if err := s.audit.Log(resource.Change{ + Type: resource.Update, + ResourceID: o.ID, + ResourceType: influxdb.OrgsResourceType, + OrganizationID: o.ID, + UserID: uid, + ResourceBody: v, + Time: time.Now(), + }); err != nil { + return nil, &influxdb.Error{ + Err: err, + } + } + return o, nil } @@ -484,7 +527,16 @@ func (s *Service) DeleteOrganization(ctx context.Context, id influxdb.ID) error if pe := s.deleteOrganization(ctx, tx, id); pe != nil { return pe } - return nil + + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Delete, + ResourceID: id, + ResourceType: influxdb.OrgsResourceType, + OrganizationID: id, + UserID: uid, + Time: time.Now(), + }) }) if err != nil { return &influxdb.Error{ diff --git a/kv/service.go b/kv/service.go index fd3da913c7..252ca373d8 100644 --- a/kv/service.go +++ b/kv/service.go @@ -4,9 +4,12 @@ import ( "context" "time" + "github.com/influxdata/influxdb/resource/noop" + "github.com/benbjohnson/clock" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/rand" + "github.com/influxdata/influxdb/resource" "github.com/influxdata/influxdb/snowflake" "go.uber.org/zap" ) @@ -24,6 +27,7 @@ type Service struct { log *zap.Logger clock clock.Clock Config ServiceConfig + audit resource.Logger IDGenerator influxdb.IDGenerator @@ -52,6 +56,7 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service { TokenGenerator: rand.NewTokenGenerator(64), Hash: &Bcrypt{}, kv: kv, + audit: noop.ResourceLogger{}, TimeGenerator: influxdb.RealTimeGenerator{}, checkStore: newCheckStore(), endpointStore: newEndpointStore(), @@ -170,6 +175,11 @@ func (s *Service) Initialize(ctx context.Context) error { }) } +// WithResourceLogger sets the resource audit logger for the service. +func (s *Service) WithResourceLogger(audit resource.Logger) { + s.audit = audit +} + // WithStore sets kv store for the service. // Should only be used in tests for mocking. func (s *Service) WithStore(store Store) { diff --git a/kv/task.go b/kv/task.go index f3dbb1e52d..6cec71ed76 100644 --- a/kv/task.go +++ b/kv/task.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/influxdata/influxdb/resource" + "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/task/backend" @@ -655,6 +657,20 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) OrgID: task.OrganizationID, Permissions: ps, } + + uid, _ := icontext.GetUserID(ctx) + if err := s.audit.Log(resource.Change{ + Type: resource.Create, + ResourceID: task.ID, + ResourceType: influxdb.TasksResourceType, + OrganizationID: task.OrganizationID, + UserID: uid, + ResourceBody: taskBytes, + Time: time.Now(), + }); err != nil { + return nil, err + } + return task, nil } @@ -790,7 +806,25 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf return nil, influxdb.ErrInternalTaskServiceError(err) } - return task, bucket.Put(key, taskBytes) + err = bucket.Put(key, taskBytes) + if err != nil { + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) + } + + uid, _ := icontext.GetUserID(ctx) + if err := s.audit.Log(resource.Change{ + Type: resource.Update, + ResourceID: task.ID, + ResourceType: influxdb.TasksResourceType, + OrganizationID: task.OrganizationID, + UserID: uid, + ResourceBody: taskBytes, + Time: time.Now(), + }); err != nil { + return nil, err + } + + return task, nil } // DeleteTask removes a task by ID and purges all associated data and scheduled runs. @@ -883,7 +917,15 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { s.log.Info("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } - return nil + uid, _ := icontext.GetUserID(ctx) + return s.audit.Log(resource.Change{ + Type: resource.Delete, + ResourceID: task.ID, + ResourceType: influxdb.TasksResourceType, + OrganizationID: task.OrganizationID, + UserID: uid, + Time: time.Now(), + }) } // FindLogs returns logs for a run. diff --git a/organization.go b/organization.go index 030dd0a4a8..29ff07abee 100644 --- a/organization.go +++ b/organization.go @@ -1,6 +1,9 @@ package influxdb -import "context" +import ( + "context" + "fmt" +) // Organization is an organization. 🎉 type Organization struct { @@ -25,6 +28,7 @@ const ( OpFindOrganization = "FindOrganization" OpFindOrganizations = "FindOrganizations" OpCreateOrganization = "CreateOrganization" + OpPutOrganization = "PutOrganization" OpUpdateOrganization = "UpdateOrganization" OpDeleteOrganization = "DeleteOrganization" ) @@ -70,3 +74,12 @@ type OrganizationFilter struct { Name *string ID *ID } + +func ErrInternalOrgServiceError(op string, err error) *Error { + return &Error{ + Code: EInternal, + Msg: fmt.Sprintf("unexpected error in organizations; Err: %v", err), + Op: op, + Err: err, + } +} diff --git a/resource/noop/resource_logger.go b/resource/noop/resource_logger.go new file mode 100644 index 0000000000..ca080fb6f1 --- /dev/null +++ b/resource/noop/resource_logger.go @@ -0,0 +1,9 @@ +package noop + +import "github.com/influxdata/influxdb/resource" + +type ResourceLogger struct{} + +func (ResourceLogger) Log(resource.Change) error { + return nil +} diff --git a/resource/resource.go b/resource/resource.go new file mode 100644 index 0000000000..fef41b21e0 --- /dev/null +++ b/resource/resource.go @@ -0,0 +1,49 @@ +// Package resource defines an interface for recording changes to InfluxDB resources. +// +// A resource is an entity in our system, e.g. an organization, task or bucket. +// A change includes the creation, update or deletion of a resource. +package resource + +import ( + "time" + + "github.com/influxdata/influxdb" +) + +// Logger records changes to resources. +type Logger interface { + // Log a change to a resource. + Log(Change) error +} + +// Change to a resource. +type Change struct { + // Type of change. + Type ChangeType + // ResourceID of the changed resource. + ResourceID influxdb.ID + // ResourceType that was changed. + ResourceType influxdb.ResourceType + // OrganizationID of the organization owning the changed resource. + OrganizationID influxdb.ID + // UserID of the user changing the resource. + UserID influxdb.ID + // ResourceBody after the change. + ResourceBody []byte + // Time when the resource was changed. + Time time.Time +} + +// Type of change. +type ChangeType string + +const ( + // Create a resource. + Create ChangeType = "create" + // Put a resource. + Put = "put" + // Update a resource. + Update = "update" + // Delete a resource + Delete = "delete" +)