feat(influxdb): Add system buckets on org creation
* Only allow users to create user buckets * Only accept bucket creation parameters on postpull/15241/head
parent
c04249ec78
commit
2e0749b3ba
|
@ -105,12 +105,6 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF
|
|||
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
buckets := bs[:0]
|
||||
for _, b := range bs {
|
||||
// temporary hack for system buckets
|
||||
if b.IsSystem() {
|
||||
buckets = append(buckets, b)
|
||||
continue
|
||||
}
|
||||
|
||||
err := authorizeReadBucket(ctx, b.OrgID, b.ID)
|
||||
if err != nil && influxdb.ErrorCode(err) != influxdb.EUnauthorized {
|
||||
return nil, 0, err
|
||||
|
|
19
bucket.go
19
bucket.go
|
@ -6,20 +6,19 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// TasksSystemBucketID and MonitoringSystemBucketID are IDs that are reserved for system buckets.
|
||||
// If any system bucket IDs are added, Bucket.IsSystem must be updated to include them.
|
||||
const (
|
||||
// TasksSystemBucketID is the fixed ID for our tasks system bucket
|
||||
TasksSystemBucketID = ID(10)
|
||||
// MonitoringSystemBucketID is the fixed ID for our monitoring system bucket
|
||||
MonitoringSystemBucketID = ID(11)
|
||||
|
||||
// BucketTypeUser is a user created bucket
|
||||
BucketTypeUser = BucketType(0)
|
||||
// BucketTypeSystem is an internally created bucket that cannot be deleted/renamed.
|
||||
BucketTypeSystem = BucketType(1)
|
||||
)
|
||||
|
||||
// Bucket names constants
|
||||
const (
|
||||
TasksSystemBucketName = "_tasks"
|
||||
MonitoringSystemBucketName = "_monitoring"
|
||||
)
|
||||
|
||||
// InfiniteRetention is default infinite retention period.
|
||||
const InfiniteRetention = 0
|
||||
|
||||
|
@ -46,12 +45,6 @@ func (bt BucketType) String() string {
|
|||
return "user"
|
||||
}
|
||||
|
||||
// TODO(jade): move this logic to a type set directly on Bucket.
|
||||
// IsSystem returns true if a bucket is a known system bucket
|
||||
func (b *Bucket) IsSystem() bool {
|
||||
return b.ID == TasksSystemBucketID || b.ID == MonitoringSystemBucketID
|
||||
}
|
||||
|
||||
// ops for buckets error and buckets op logs.
|
||||
var (
|
||||
OpFindBucketByID = "FindBucketByID"
|
||||
|
|
|
@ -148,6 +148,20 @@ type retentionRule struct {
|
|||
EverySeconds int64 `json:"everySeconds"`
|
||||
}
|
||||
|
||||
func (rr *retentionRule) RetentionPeriod() (time.Duration, error) {
|
||||
var t time.Duration
|
||||
|
||||
t = time.Duration(rr.EverySeconds) * time.Second
|
||||
if t < time.Second {
|
||||
return t, &influxdb.Error{
|
||||
Code: influxdb.EUnprocessableEntity,
|
||||
Msg: "expiration seconds must be greater than or equal to one second",
|
||||
}
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (b *bucket) toInfluxDB() (*influxdb.Bucket, error) {
|
||||
if b == nil {
|
||||
return nil, nil
|
||||
|
@ -217,13 +231,12 @@ func (b *bucketUpdate) toInfluxDB() (*influxdb.BucketUpdate, error) {
|
|||
|
||||
// For now, only use a single retention rule.
|
||||
var d time.Duration
|
||||
var err error
|
||||
|
||||
if len(b.RetentionRules) > 0 {
|
||||
d = time.Duration(b.RetentionRules[0].EverySeconds) * time.Second
|
||||
if d < time.Second {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EUnprocessableEntity,
|
||||
Msg: "expiration seconds must be greater than or equal to one second",
|
||||
}
|
||||
d, err = b.RetentionRules[0].RetentionPeriod()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,24 +322,34 @@ func (h *BucketHandler) handlePostBucket(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
if err := h.BucketService.CreateBucket(ctx, req.Bucket); err != nil {
|
||||
bucket, err := req.toInfluxDB()
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
h.Logger.Debug("bucket created", zap.String("bucket", fmt.Sprint(req.Bucket)))
|
||||
|
||||
if err := encodeResponse(ctx, w, http.StatusCreated, newBucketResponse(req.Bucket, []*influxdb.Label{})); err != nil {
|
||||
if err := h.BucketService.CreateBucket(ctx, bucket); err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
h.Logger.Debug("bucket created", zap.String("bucket", fmt.Sprint(bucket)))
|
||||
|
||||
if err := encodeResponse(ctx, w, http.StatusCreated, newBucketResponse(bucket, []*influxdb.Label{})); err != nil {
|
||||
logEncodingError(h.Logger, r, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type postBucketRequest struct {
|
||||
Bucket *influxdb.Bucket
|
||||
OrgID influxdb.ID `json:"orgID,omitempty"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
|
||||
RetentionRules []retentionRule `json:"retentionRules"`
|
||||
}
|
||||
|
||||
func (b postBucketRequest) Validate() error {
|
||||
if !b.Bucket.OrgID.Valid() {
|
||||
if !b.OrgID.Valid() {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "bucket requires an organization",
|
||||
|
@ -336,8 +359,30 @@ func (b postBucketRequest) Validate() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b postBucketRequest) toInfluxDB() (*influxdb.Bucket, error) {
|
||||
var dur time.Duration // zero value implies infinite retention policy
|
||||
var err error
|
||||
|
||||
// Only support a single retention period for the moment
|
||||
if len(b.RetentionRules) > 0 {
|
||||
dur, err = b.RetentionRules[0].RetentionPeriod()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &influxdb.Bucket{
|
||||
OrgID: b.OrgID,
|
||||
Description: b.Description,
|
||||
Name: b.Name,
|
||||
Type: influxdb.BucketTypeUser,
|
||||
RetentionPolicyName: b.RetentionPolicyName,
|
||||
RetentionPeriod: dur,
|
||||
}, err
|
||||
}
|
||||
|
||||
func decodePostBucketRequest(ctx context.Context, r *http.Request) (*postBucketRequest, error) {
|
||||
b := &bucket{}
|
||||
b := &postBucketRequest{}
|
||||
if err := json.NewDecoder(r.Body).Decode(b); err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
|
@ -345,16 +390,7 @@ func decodePostBucketRequest(ctx context.Context, r *http.Request) (*postBucketR
|
|||
}
|
||||
}
|
||||
|
||||
pb, err := b.toInfluxDB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := &postBucketRequest{
|
||||
Bucket: pb,
|
||||
}
|
||||
|
||||
return req, req.Validate()
|
||||
return b, b.Validate()
|
||||
}
|
||||
|
||||
// handleGetBucket is the HTTP handler for the GET /api/v2/buckets/:id route.
|
||||
|
|
|
@ -3274,7 +3274,7 @@ paths:
|
|||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Bucket"
|
||||
$ref: "#/components/schemas/PostBucketRequest"
|
||||
responses:
|
||||
'201':
|
||||
description: Bucket created
|
||||
|
@ -6763,6 +6763,34 @@ components:
|
|||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/Authorization"
|
||||
PostBucketRequest:
|
||||
properties:
|
||||
orgID:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
description:
|
||||
type: string
|
||||
rp:
|
||||
type: string
|
||||
retentionRules:
|
||||
type: array
|
||||
description: Rules to expire or retain data. No rules means data never expires.
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
type:
|
||||
type: string
|
||||
default: expire
|
||||
enum:
|
||||
- expire
|
||||
everySeconds:
|
||||
type: integer
|
||||
description: Duration in seconds for how long data will be kept in the database.
|
||||
example: 86400
|
||||
minimum: 1
|
||||
required: [type, everySeconds]
|
||||
required: [name, retentionRules]
|
||||
Bucket:
|
||||
properties:
|
||||
links:
|
||||
|
|
|
@ -174,16 +174,6 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
bucket = b
|
||||
}
|
||||
|
||||
// TODO(jade): remove this after system buckets issue is resolved
|
||||
if bucket.IsSystem() {
|
||||
h.HandleHTTPError(ctx, &influxdb.Error{
|
||||
Code: influxdb.EForbidden,
|
||||
Op: "http/handleWrite",
|
||||
Msg: fmt.Sprintf("cannot write to internal bucket %s", bucket.Name),
|
||||
}, w)
|
||||
return
|
||||
}
|
||||
|
||||
p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, &influxdb.Error{
|
||||
|
|
|
@ -222,23 +222,6 @@ func TestWriteHandler_handleWrite(t *testing.T) {
|
|||
body: `{"code":"invalid","message":"unable to parse 'invalid': missing fields"}`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "forbidden to write to system buckets",
|
||||
request: request{
|
||||
org: "043e0780ee2b1000",
|
||||
bucket: "000000000000000a",
|
||||
auth: bucketWritePermission("043e0780ee2b1000", "000000000000000a"),
|
||||
body: "invalid",
|
||||
},
|
||||
state: state{
|
||||
org: testOrg("043e0780ee2b1000"),
|
||||
bucket: testBucket("043e0780ee2b1000", "000000000000000a"),
|
||||
},
|
||||
wants: wants{
|
||||
code: 403,
|
||||
body: `{"code":"forbidden","message":"cannot write to internal bucket "}`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "forbidden to write with insufficient permission",
|
||||
request: request{
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
platform "github.com/influxdata/influxdb"
|
||||
)
|
||||
|
@ -210,14 +209,6 @@ func (s *Service) findBuckets(ctx context.Context, filter platform.BucketFilter,
|
|||
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
|
||||
// Additional options provide pagination & sorting.
|
||||
func (s *Service) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
||||
if filter.Name != nil {
|
||||
internal, err := s.findSystemBucket(*filter.Name)
|
||||
// if found in our internals list, return mock
|
||||
if err == nil {
|
||||
return []*platform.Bucket{internal}, 0, nil
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
bs, pe := s.findBuckets(ctx, filter, opt...)
|
||||
if pe != nil {
|
||||
|
@ -228,32 +219,6 @@ func (s *Service) FindBuckets(ctx context.Context, filter platform.BucketFilter,
|
|||
return bs, len(bs), nil
|
||||
}
|
||||
|
||||
func (s *Service) findSystemBucket(n string) (*platform.Bucket, error) {
|
||||
switch n {
|
||||
case "_tasks":
|
||||
return &platform.Bucket{
|
||||
ID: platform.TasksSystemBucketID,
|
||||
Type: platform.BucketTypeSystem,
|
||||
Name: "_tasks",
|
||||
RetentionPeriod: time.Hour * 24 * 3,
|
||||
Description: "System bucket for task logs",
|
||||
}, nil
|
||||
case "_monitoring":
|
||||
return &platform.Bucket{
|
||||
ID: platform.MonitoringSystemBucketID,
|
||||
Type: platform.BucketTypeSystem,
|
||||
Name: "_monitoring",
|
||||
RetentionPeriod: time.Hour * 24 * 7,
|
||||
Description: "System bucket for monitoring logs",
|
||||
}, nil
|
||||
default:
|
||||
return nil, &platform.Error{
|
||||
Code: platform.ENotFound,
|
||||
Msg: fmt.Sprintf("system bucket %q not found", n),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
|
||||
func (s *Service) CreateBucket(ctx context.Context, b *platform.Bucket) error {
|
||||
if b.OrgID.Valid() {
|
||||
|
|
75
kv/bucket.go
75
kv/bucket.go
|
@ -119,21 +119,14 @@ func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n str
|
|||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
internal, err := s.findSystemBucket(n)
|
||||
// if found in our internals list, return mock
|
||||
if err == nil {
|
||||
internal.OrgID = orgID
|
||||
return internal, nil
|
||||
}
|
||||
|
||||
var b *influxdb.Bucket
|
||||
|
||||
err = s.kv.View(ctx, func(tx Tx) error {
|
||||
err := s.kv.View(ctx, func(tx Tx) error {
|
||||
bkt, pe := s.findBucketByName(ctx, tx, orgID, n)
|
||||
if pe != nil {
|
||||
err = pe
|
||||
err := pe
|
||||
return err
|
||||
}
|
||||
|
||||
b = bkt
|
||||
return nil
|
||||
})
|
||||
|
@ -141,30 +134,33 @@ func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n str
|
|||
return b, err
|
||||
}
|
||||
|
||||
func (s *Service) findSystemBucket(n string) (*influxdb.Bucket, error) {
|
||||
switch n {
|
||||
case "_tasks":
|
||||
return &influxdb.Bucket{
|
||||
ID: influxdb.TasksSystemBucketID,
|
||||
Type: influxdb.BucketTypeSystem,
|
||||
Name: "_tasks",
|
||||
RetentionPeriod: time.Hour * 24 * 3,
|
||||
Description: "System bucket for task logs",
|
||||
}, nil
|
||||
case "_monitoring":
|
||||
return &influxdb.Bucket{
|
||||
ID: influxdb.MonitoringSystemBucketID,
|
||||
Type: influxdb.BucketTypeSystem,
|
||||
Name: "_monitoring",
|
||||
RetentionPeriod: time.Hour * 24 * 7,
|
||||
Description: "System bucket for monitoring logs",
|
||||
}, nil
|
||||
default:
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Msg: fmt.Sprintf("system bucket %q not found", n),
|
||||
}
|
||||
// CreateSystemBuckets creates the task and monitoring system buckets for an organization
|
||||
func (s *Service) createSystemBuckets(ctx context.Context, tx Tx, o *influxdb.Organization) error {
|
||||
tb := &influxdb.Bucket{
|
||||
OrgID: o.ID,
|
||||
Type: influxdb.BucketTypeSystem,
|
||||
Name: influxdb.TasksSystemBucketName,
|
||||
RetentionPeriod: time.Hour * 24 * 3,
|
||||
Description: "System bucket for task logs",
|
||||
}
|
||||
|
||||
if err := s.createBucket(ctx, tx, tb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mb := &influxdb.Bucket{
|
||||
OrgID: o.ID,
|
||||
Type: influxdb.BucketTypeSystem,
|
||||
Name: influxdb.MonitoringSystemBucketName,
|
||||
RetentionPeriod: time.Hour * 24 * 7,
|
||||
Description: "System bucket for monitoring logs",
|
||||
}
|
||||
|
||||
if err := s.createBucket(ctx, tx, mb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
|
||||
|
@ -335,17 +331,6 @@ func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter,
|
|||
return nil, 0, err
|
||||
}
|
||||
|
||||
tasks, error := s.findSystemBucket("_tasks")
|
||||
if error != nil {
|
||||
return bs, 0, error
|
||||
}
|
||||
|
||||
monitoring, error := s.findSystemBucket("_monitoring")
|
||||
if error != nil {
|
||||
return bs, 0, error
|
||||
}
|
||||
bs = append(bs, tasks, monitoring)
|
||||
|
||||
return bs, len(bs), nil
|
||||
}
|
||||
|
||||
|
@ -611,7 +596,7 @@ func (s *Service) validBucketName(ctx context.Context, tx Tx, b *influxdb.Bucket
|
|||
}
|
||||
|
||||
// names starting with an underscore are reserved for system buckets
|
||||
if strings.HasPrefix(b.Name, "_") {
|
||||
if strings.HasPrefix(b.Name, "_") && b.Type != influxdb.BucketTypeSystem {
|
||||
return ReservedBucketNameError(b)
|
||||
}
|
||||
|
||||
|
|
|
@ -267,6 +267,11 @@ func (s *Service) CreateOrganization(ctx context.Context, o *influxdb.Organizati
|
|||
s.Logger.Info("failed to make user owner of organization", zap.Error(err))
|
||||
}
|
||||
|
||||
if err := s.createSystemBuckets(ctx, tx, o); err != nil {
|
||||
s.Logger.Info("failed to create system buckets of organization", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -133,7 +133,7 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.
|
|||
zap.String("org_id", b.OrgID.String()),
|
||||
zap.String("bucket_id", b.ID.String()),
|
||||
zap.Duration("retention_period", b.RetentionPeriod),
|
||||
zap.Bool("system", b.IsSystem()),
|
||||
zap.String("system_type", b.Type.String()),
|
||||
}
|
||||
|
||||
if b.RetentionPeriod == 0 {
|
||||
|
@ -153,7 +153,7 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.
|
|||
span.LogKV(
|
||||
"bucket", b.ID,
|
||||
"org_id", b.OrgID,
|
||||
"system", b.IsSystem(),
|
||||
"system", b.Type,
|
||||
"retention_period", b.RetentionPeriod,
|
||||
"retention_policy", b.RetentionPolicyName,
|
||||
"from", time.Unix(0, min).UTC(),
|
||||
|
|
Loading…
Reference in New Issue