526 lines
14 KiB
Go
526 lines
14 KiB
Go
package tenant
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi"
|
|
"github.com/go-chi/chi/middleware"
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
"github.com/influxdata/influxdb/v2/kit/platform/errors"
|
|
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// BucketHandler represents an HTTP API handler for users.
|
|
type BucketHandler struct {
|
|
chi.Router
|
|
api *kithttp.API
|
|
log *zap.Logger
|
|
bucketSvc influxdb.BucketService
|
|
labelSvc influxdb.LabelService // we may need this for now but we dont want it permanently
|
|
}
|
|
|
|
const (
|
|
prefixBuckets = "/api/v2/buckets"
|
|
)
|
|
|
|
// NewHTTPBucketHandler constructs a new http server.
|
|
func NewHTTPBucketHandler(log *zap.Logger, bucketSvc influxdb.BucketService, labelSvc influxdb.LabelService, urmHandler, labelHandler http.Handler) *BucketHandler {
|
|
svr := &BucketHandler{
|
|
api: kithttp.NewAPI(kithttp.WithLog(log)),
|
|
log: log,
|
|
bucketSvc: bucketSvc,
|
|
labelSvc: labelSvc,
|
|
}
|
|
|
|
r := chi.NewRouter()
|
|
r.Use(
|
|
middleware.Recoverer,
|
|
middleware.RequestID,
|
|
middleware.RealIP,
|
|
)
|
|
|
|
// RESTy routes for "articles" resource
|
|
r.Route("/", func(r chi.Router) {
|
|
r.Post("/", svr.handlePostBucket)
|
|
r.Get("/", svr.handleGetBuckets)
|
|
|
|
r.Route("/{id}", func(r chi.Router) {
|
|
r.Get("/", svr.handleGetBucket)
|
|
r.Patch("/", svr.handlePatchBucket)
|
|
r.Delete("/", svr.handleDeleteBucket)
|
|
|
|
// mount embedded resources
|
|
mountableRouter := r.With(kithttp.ValidResource(svr.api, svr.lookupOrgByBucketID))
|
|
mountableRouter.Mount("/members", urmHandler)
|
|
mountableRouter.Mount("/owners", urmHandler)
|
|
mountableRouter.Mount("/labels", labelHandler)
|
|
})
|
|
})
|
|
|
|
svr.Router = r
|
|
return svr
|
|
}
|
|
|
|
func (h *BucketHandler) Prefix() string {
|
|
return prefixBuckets
|
|
}
|
|
|
|
// bucket is used for serialization/deserialization with duration string syntax.
|
|
type bucket struct {
|
|
ID platform.ID `json:"id,omitempty"`
|
|
OrgID platform.ID `json:"orgID,omitempty"`
|
|
Type string `json:"type"`
|
|
Description string `json:"description,omitempty"`
|
|
Name string `json:"name"`
|
|
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
|
|
RetentionRules []retentionRule `json:"retentionRules"`
|
|
influxdb.CRUDLog
|
|
}
|
|
|
|
// retentionRule is the retention rule action for a bucket.
|
|
type retentionRule struct {
|
|
Type string `json:"type"`
|
|
EverySeconds int64 `json:"everySeconds"`
|
|
ShardGroupDurationSeconds int64 `json:"shardGroupDurationSeconds"`
|
|
}
|
|
|
|
func (b *bucket) toInfluxDB() *influxdb.Bucket {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
var rpDuration time.Duration // zero value implies infinite retention policy
|
|
var sgDuration time.Duration // zero value implies the server should pick a value
|
|
|
|
// Only support a single retention period for the moment
|
|
if len(b.RetentionRules) > 0 {
|
|
rpDuration = time.Duration(b.RetentionRules[0].EverySeconds) * time.Second
|
|
sgDuration = time.Duration(b.RetentionRules[0].ShardGroupDurationSeconds) * time.Second
|
|
}
|
|
|
|
return &influxdb.Bucket{
|
|
ID: b.ID,
|
|
OrgID: b.OrgID,
|
|
Type: influxdb.ParseBucketType(b.Type),
|
|
Description: b.Description,
|
|
Name: b.Name,
|
|
RetentionPolicyName: b.RetentionPolicyName,
|
|
RetentionPeriod: rpDuration,
|
|
ShardGroupDuration: sgDuration,
|
|
CRUDLog: b.CRUDLog,
|
|
}
|
|
}
|
|
|
|
func newBucket(pb *influxdb.Bucket) *bucket {
|
|
if pb == nil {
|
|
return nil
|
|
}
|
|
|
|
bkt := bucket{
|
|
ID: pb.ID,
|
|
OrgID: pb.OrgID,
|
|
Type: pb.Type.String(),
|
|
Name: pb.Name,
|
|
Description: pb.Description,
|
|
RetentionPolicyName: pb.RetentionPolicyName,
|
|
RetentionRules: []retentionRule{},
|
|
CRUDLog: pb.CRUDLog,
|
|
}
|
|
|
|
// Only append a retention rule if the user wants to explicitly set
|
|
// a parameter on the rule.
|
|
//
|
|
// This is for backwards-compatibility with older versions of the API,
|
|
// which didn't support setting shard-group durations and used an empty
|
|
// array of rules to represent infinite retention.
|
|
if pb.RetentionPeriod > 0 || pb.ShardGroupDuration > 0 {
|
|
bkt.RetentionRules = append(bkt.RetentionRules, retentionRule{
|
|
Type: "expire",
|
|
EverySeconds: int64(pb.RetentionPeriod.Round(time.Second) / time.Second),
|
|
ShardGroupDurationSeconds: int64(pb.ShardGroupDuration.Round(time.Second) / time.Second),
|
|
})
|
|
}
|
|
|
|
return &bkt
|
|
}
|
|
|
|
type retentionRuleUpdate struct {
|
|
Type string `json:"type"`
|
|
EverySeconds *int64 `json:"everySeconds"`
|
|
ShardGroupDurationSeconds *int64 `json:"shardGroupDurationSeconds"`
|
|
}
|
|
|
|
// bucketUpdate is used for serialization/deserialization with retention rules.
|
|
type bucketUpdate struct {
|
|
Name *string `json:"name,omitempty"`
|
|
Description *string `json:"description,omitempty"`
|
|
RetentionRules []retentionRuleUpdate `json:"retentionRules,omitempty"`
|
|
}
|
|
|
|
func (b *bucketUpdate) OK() error {
|
|
if len(b.RetentionRules) > 1 {
|
|
return &errors.Error{
|
|
Code: errors.EUnprocessableEntity,
|
|
Msg: "buckets cannot have more than one retention rule at this time",
|
|
}
|
|
}
|
|
|
|
if len(b.RetentionRules) > 0 {
|
|
rule := b.RetentionRules[0]
|
|
if rule.EverySeconds != nil && *rule.EverySeconds < 0 {
|
|
return &errors.Error{
|
|
Code: errors.EUnprocessableEntity,
|
|
Msg: "expiration seconds cannot be negative",
|
|
}
|
|
}
|
|
if rule.ShardGroupDurationSeconds != nil && *rule.ShardGroupDurationSeconds < 0 {
|
|
return &errors.Error{
|
|
Code: errors.EUnprocessableEntity,
|
|
Msg: "shard-group duration seconds cannot be negative",
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *bucketUpdate) toInfluxDB() *influxdb.BucketUpdate {
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
upd := influxdb.BucketUpdate{
|
|
Name: b.Name,
|
|
Description: b.Description,
|
|
}
|
|
|
|
// For now, only use a single retention rule.
|
|
if len(b.RetentionRules) > 0 {
|
|
rule := b.RetentionRules[0]
|
|
if rule.EverySeconds != nil {
|
|
rp := time.Duration(*rule.EverySeconds) * time.Second
|
|
upd.RetentionPeriod = &rp
|
|
}
|
|
if rule.ShardGroupDurationSeconds != nil {
|
|
sgd := time.Duration(*rule.ShardGroupDurationSeconds) * time.Second
|
|
upd.ShardGroupDuration = &sgd
|
|
}
|
|
}
|
|
|
|
return &upd
|
|
}
|
|
|
|
func newBucketUpdate(pb *influxdb.BucketUpdate) *bucketUpdate {
|
|
if pb == nil {
|
|
return nil
|
|
}
|
|
|
|
up := &bucketUpdate{
|
|
Name: pb.Name,
|
|
Description: pb.Description,
|
|
RetentionRules: []retentionRuleUpdate{},
|
|
}
|
|
|
|
if pb.RetentionPeriod == nil && pb.ShardGroupDuration == nil {
|
|
return up
|
|
}
|
|
|
|
rule := retentionRuleUpdate{Type: "expire"}
|
|
|
|
if pb.RetentionPeriod != nil {
|
|
rp := int64((*pb.RetentionPeriod).Round(time.Second) / time.Second)
|
|
rule.EverySeconds = &rp
|
|
}
|
|
if pb.ShardGroupDuration != nil {
|
|
sgd := int64((*pb.ShardGroupDuration).Round(time.Second) / time.Second)
|
|
rule.ShardGroupDurationSeconds = &sgd
|
|
}
|
|
|
|
up.RetentionRules = append(up.RetentionRules, rule)
|
|
return up
|
|
}
|
|
|
|
type bucketResponse struct {
|
|
bucket
|
|
Links map[string]string `json:"links"`
|
|
Labels []influxdb.Label `json:"labels"`
|
|
}
|
|
|
|
func NewBucketResponse(b *influxdb.Bucket, labels ...*influxdb.Label) *bucketResponse {
|
|
res := &bucketResponse{
|
|
Links: map[string]string{
|
|
"self": fmt.Sprintf("/api/v2/buckets/%s", b.ID),
|
|
"org": fmt.Sprintf("/api/v2/orgs/%s", b.OrgID),
|
|
"members": fmt.Sprintf("/api/v2/buckets/%s/members", b.ID),
|
|
"owners": fmt.Sprintf("/api/v2/buckets/%s/owners", b.ID),
|
|
"labels": fmt.Sprintf("/api/v2/buckets/%s/labels", b.ID),
|
|
"write": fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", b.OrgID, b.ID),
|
|
},
|
|
bucket: *newBucket(b),
|
|
Labels: []influxdb.Label{},
|
|
}
|
|
for _, l := range labels {
|
|
res.Labels = append(res.Labels, *l)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
type bucketsResponse struct {
|
|
Links *influxdb.PagingLinks `json:"links"`
|
|
Buckets []*bucketResponse `json:"buckets"`
|
|
}
|
|
|
|
func newBucketsResponse(ctx context.Context, opts influxdb.FindOptions, f influxdb.BucketFilter, bs []*influxdb.Bucket, labelSvc influxdb.LabelService) *bucketsResponse {
|
|
rs := make([]*bucketResponse, 0, len(bs))
|
|
for _, b := range bs {
|
|
var labels []*influxdb.Label
|
|
if labelSvc != nil { // allow for no label svc
|
|
labels, _ = labelSvc.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: b.ID, ResourceType: influxdb.BucketsResourceType})
|
|
}
|
|
rs = append(rs, NewBucketResponse(b, labels...))
|
|
}
|
|
return &bucketsResponse{
|
|
Links: influxdb.NewPagingLinks(prefixBuckets, opts, f, len(bs)),
|
|
Buckets: rs,
|
|
}
|
|
}
|
|
|
|
// handlePostBucket is the HTTP handler for the POST /api/v2/buckets route.
|
|
func (h *BucketHandler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
|
var b postBucketRequest
|
|
if err := h.api.DecodeJSON(r.Body, &b); err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
if err := b.OK(); err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
bucket := b.toInfluxDB()
|
|
|
|
if err := h.bucketSvc.CreateBucket(r.Context(), bucket); err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
h.log.Debug("Bucket created", zap.String("bucket", fmt.Sprint(bucket)))
|
|
|
|
h.api.Respond(w, r, http.StatusCreated, NewBucketResponse(bucket))
|
|
}
|
|
|
|
type postBucketRequest struct {
|
|
OrgID platform.ID `json:"orgID,omitempty"`
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
|
|
RetentionRules []retentionRule `json:"retentionRules"`
|
|
}
|
|
|
|
func (b *postBucketRequest) OK() error {
|
|
if !b.OrgID.Valid() {
|
|
return &errors.Error{
|
|
Code: errors.EInvalid,
|
|
Msg: "organization id must be provided",
|
|
}
|
|
}
|
|
|
|
if len(b.RetentionRules) > 1 {
|
|
return &errors.Error{
|
|
Code: errors.EUnprocessableEntity,
|
|
Msg: "buckets cannot have more than one retention rule at this time",
|
|
}
|
|
}
|
|
|
|
if len(b.RetentionRules) > 0 {
|
|
rule := b.RetentionRules[0]
|
|
|
|
if rule.EverySeconds < 0 {
|
|
return &errors.Error{
|
|
Code: errors.EUnprocessableEntity,
|
|
Msg: "expiration seconds cannot be negative",
|
|
}
|
|
}
|
|
if rule.ShardGroupDurationSeconds < 0 {
|
|
return &errors.Error{
|
|
Code: errors.EUnprocessableEntity,
|
|
Msg: "shard-group duration seconds cannot be negative",
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b postBucketRequest) toInfluxDB() *influxdb.Bucket {
|
|
// Only support a single retention period for the moment
|
|
var rpDur time.Duration
|
|
var sgDur time.Duration
|
|
if len(b.RetentionRules) > 0 {
|
|
rule := b.RetentionRules[0]
|
|
rpDur = time.Duration(rule.EverySeconds) * time.Second
|
|
sgDur = time.Duration(rule.ShardGroupDurationSeconds) * time.Second
|
|
}
|
|
|
|
return &influxdb.Bucket{
|
|
OrgID: b.OrgID,
|
|
Description: b.Description,
|
|
Name: b.Name,
|
|
Type: influxdb.BucketTypeUser,
|
|
RetentionPolicyName: b.RetentionPolicyName,
|
|
RetentionPeriod: rpDur,
|
|
ShardGroupDuration: sgDur,
|
|
}
|
|
}
|
|
|
|
// handleGetBucket is the HTTP handler for the GET /api/v2/buckets/:id route.
|
|
func (h *BucketHandler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
id, err := platform.IDFromString(chi.URLParam(r, "id"))
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
b, err := h.bucketSvc.FindBucketByID(ctx, *id)
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
h.log.Debug("Bucket retrieved", zap.String("bucket", fmt.Sprint(b)))
|
|
var labels []*influxdb.Label
|
|
if h.labelSvc != nil { // allow for no label svc
|
|
labels, _ = h.labelSvc.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: b.ID, ResourceType: influxdb.BucketsResourceType})
|
|
}
|
|
|
|
h.api.Respond(w, r, http.StatusOK, NewBucketResponse(b, labels...))
|
|
}
|
|
|
|
// handleDeleteBucket is the HTTP handler for the DELETE /api/v2/buckets/:id route.
|
|
func (h *BucketHandler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
|
|
id, err := platform.IDFromString(chi.URLParam(r, "id"))
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
if err := h.bucketSvc.DeleteBucket(r.Context(), *id); err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
h.log.Debug("Bucket deleted", zap.String("bucketID", id.String()))
|
|
|
|
h.api.Respond(w, r, http.StatusNoContent, nil)
|
|
}
|
|
|
|
// handleGetBuckets is the HTTP handler for the GET /api/v2/buckets route.
|
|
func (h *BucketHandler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
|
|
bucketsRequest, err := decodeGetBucketsRequest(r)
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
bs, _, err := h.bucketSvc.FindBuckets(r.Context(), bucketsRequest.filter, bucketsRequest.opts)
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
h.log.Debug("Buckets retrieved", zap.String("buckets", fmt.Sprint(bs)))
|
|
|
|
h.api.Respond(w, r, http.StatusOK, newBucketsResponse(r.Context(), bucketsRequest.opts, bucketsRequest.filter, bs, h.labelSvc))
|
|
}
|
|
|
|
type getBucketsRequest struct {
|
|
filter influxdb.BucketFilter
|
|
opts influxdb.FindOptions
|
|
}
|
|
|
|
func decodeGetBucketsRequest(r *http.Request) (*getBucketsRequest, error) {
|
|
qp := r.URL.Query()
|
|
req := &getBucketsRequest{}
|
|
|
|
opts, err := influxdb.DecodeFindOptions(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.opts = *opts
|
|
|
|
if orgID := qp.Get("orgID"); orgID != "" {
|
|
id, err := platform.IDFromString(orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.filter.OrganizationID = id
|
|
}
|
|
|
|
if org := qp.Get("org"); org != "" {
|
|
req.filter.Org = &org
|
|
}
|
|
|
|
if name := qp.Get("name"); name != "" {
|
|
req.filter.Name = &name
|
|
}
|
|
|
|
if bucketID := qp.Get("id"); bucketID != "" {
|
|
id, err := platform.IDFromString(bucketID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.filter.ID = id
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
// handlePatchBucket is the HTTP handler for the PATCH /api/v2/buckets route.
|
|
func (h *BucketHandler) handlePatchBucket(w http.ResponseWriter, r *http.Request) {
|
|
id, err := platform.IDFromString(chi.URLParam(r, "id"))
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
var reqBody bucketUpdate
|
|
if err := h.api.DecodeJSON(r.Body, &reqBody); err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
if reqBody.Name != nil {
|
|
b, err := h.bucketSvc.FindBucketByID(r.Context(), *id)
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
b.Name = *reqBody.Name
|
|
}
|
|
|
|
b, err := h.bucketSvc.UpdateBucket(r.Context(), *id, *reqBody.toInfluxDB())
|
|
if err != nil {
|
|
h.api.Err(w, r, err)
|
|
return
|
|
}
|
|
|
|
h.log.Debug("Bucket updated", zap.String("bucket", fmt.Sprint(b)))
|
|
|
|
h.api.Respond(w, r, http.StatusOK, NewBucketResponse(b))
|
|
}
|
|
|
|
func (h *BucketHandler) lookupOrgByBucketID(ctx context.Context, id platform.ID) (platform.ID, error) {
|
|
b, err := h.bucketSvc.FindBucketByID(ctx, id)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return b.OrgID, nil
|
|
}
|