influxdb/tenant/http_server_bucket.go

489 lines
12 KiB
Go

package tenant
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"go.uber.org/zap"
)
// BucketHandler represents an HTTP API handler for users.
type BucketHandler struct {
chi.Router
api *kithttp.API
log *zap.Logger
bucketSvc influxdb.BucketService
}
const (
prefixBuckets = "/api/v2/buckets"
)
// NewHTTPBucketHandler constructs a new http server.
func NewHTTPBucketHandler(log *zap.Logger, bucketSvc influxdb.BucketService, urmHandler, labelHandler http.Handler) *BucketHandler {
svr := &BucketHandler{
api: kithttp.NewAPI(kithttp.WithLog(log)),
log: log,
bucketSvc: bucketSvc,
}
r := chi.NewRouter()
r.Use(
middleware.Recoverer,
middleware.RequestID,
middleware.RealIP,
)
// RESTy routes for "articles" resource
r.Route("/", func(r chi.Router) {
r.Post("/", svr.handlePostBucket)
r.Get("/", svr.handleGetBuckets)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", svr.handleGetBucket)
r.Patch("/", svr.handlePatchBucket)
r.Delete("/", svr.handleDeleteBucket)
// mount embedded resources
mountableRouter := r.With(ValidResource(svr.api, svr.lookupOrgByBucketID))
mountableRouter.Mount("/members", urmHandler)
mountableRouter.Mount("/owners", urmHandler)
mountableRouter.Mount("/labels", labelHandler)
})
})
svr.Router = r
return svr
}
func (h *BucketHandler) Prefix() string {
return prefixBuckets
}
// bucket is used for serialization/deserialization with duration string syntax.
type bucket struct {
ID influxdb.ID `json:"id,omitempty"`
OrgID influxdb.ID `json:"orgID,omitempty"`
Type string `json:"type"`
Description string `json:"description,omitempty"`
Name string `json:"name"`
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
RetentionRules []retentionRule `json:"retentionRules"`
influxdb.CRUDLog
}
// retentionRule is the retention rule action for a bucket.
type retentionRule struct {
Type string `json:"type"`
EverySeconds int64 `json:"everySeconds"`
}
func (rr *retentionRule) RetentionPeriod() (time.Duration, error) {
t := time.Duration(rr.EverySeconds) * time.Second
if t < time.Second {
return t, &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: "expiration seconds must be greater than or equal to one second",
}
}
return t, nil
}
func (b *bucket) toInfluxDB() (*influxdb.Bucket, error) {
if b == nil {
return nil, nil
}
var d time.Duration // zero value implies infinite retention policy
// Only support a single retention period for the moment
if len(b.RetentionRules) > 0 {
d = time.Duration(b.RetentionRules[0].EverySeconds) * time.Second
if d < time.Second {
return nil, &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: "expiration seconds must be greater than or equal to one second",
}
}
}
return &influxdb.Bucket{
ID: b.ID,
OrgID: b.OrgID,
Type: influxdb.ParseBucketType(b.Type),
Description: b.Description,
Name: b.Name,
RetentionPolicyName: b.RetentionPolicyName,
RetentionPeriod: d,
CRUDLog: b.CRUDLog,
}, nil
}
func newBucket(pb *influxdb.Bucket) *bucket {
if pb == nil {
return nil
}
rules := []retentionRule{}
rp := int64(pb.RetentionPeriod.Round(time.Second) / time.Second)
if rp > 0 {
rules = append(rules, retentionRule{
Type: "expire",
EverySeconds: rp,
})
}
return &bucket{
ID: pb.ID,
OrgID: pb.OrgID,
Type: pb.Type.String(),
Name: pb.Name,
Description: pb.Description,
RetentionPolicyName: pb.RetentionPolicyName,
RetentionRules: rules,
CRUDLog: pb.CRUDLog,
}
}
// bucketUpdate is used for serialization/deserialization with retention rules.
type bucketUpdate struct {
Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
RetentionRules []retentionRule `json:"retentionRules,omitempty"`
}
func (b *bucketUpdate) OK() error {
if len(b.RetentionRules) > 0 {
_, err := b.RetentionRules[0].RetentionPeriod()
if err != nil {
return err
}
}
return nil
}
func (b *bucketUpdate) toInfluxDB() *influxdb.BucketUpdate {
if b == nil {
return nil
}
// For now, only use a single retention rule.
var d time.Duration
if len(b.RetentionRules) > 0 {
d, _ = b.RetentionRules[0].RetentionPeriod()
}
return &influxdb.BucketUpdate{
Name: b.Name,
Description: b.Description,
RetentionPeriod: &d,
}
}
func newBucketUpdate(pb *influxdb.BucketUpdate) *bucketUpdate {
if pb == nil {
return nil
}
up := &bucketUpdate{
Name: pb.Name,
Description: pb.Description,
RetentionRules: []retentionRule{},
}
if pb.RetentionPeriod != nil {
d := int64((*pb.RetentionPeriod).Round(time.Second) / time.Second)
up.RetentionRules = append(up.RetentionRules, retentionRule{
Type: "expire",
EverySeconds: d,
})
}
return up
}
type bucketResponse struct {
bucket
Links map[string]string `json:"links"`
}
func NewBucketResponse(b *influxdb.Bucket) *bucketResponse {
res := &bucketResponse{
Links: map[string]string{
"self": fmt.Sprintf("/api/v2/buckets/%s", b.ID),
"org": fmt.Sprintf("/api/v2/orgs/%s", b.OrgID),
"members": fmt.Sprintf("/api/v2/buckets/%s/members", b.ID),
"owners": fmt.Sprintf("/api/v2/buckets/%s/owners", b.ID),
"labels": fmt.Sprintf("/api/v2/buckets/%s/labels", b.ID),
"write": fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", b.OrgID, b.ID),
},
bucket: *newBucket(b),
}
return res
}
type bucketsResponse struct {
Links *influxdb.PagingLinks `json:"links"`
Buckets []*bucketResponse `json:"buckets"`
}
func newBucketsResponse(ctx context.Context, opts influxdb.FindOptions, f influxdb.BucketFilter, bs []*influxdb.Bucket) *bucketsResponse {
rs := make([]*bucketResponse, 0, len(bs))
for _, b := range bs {
rs = append(rs, NewBucketResponse(b))
}
return &bucketsResponse{
Links: influxdb.NewPagingLinks(prefixBuckets, opts, f, len(bs)),
Buckets: rs,
}
}
// handlePostBucket is the HTTP handler for the POST /api/v2/buckets route.
func (h *BucketHandler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
var b postBucketRequest
if err := h.api.DecodeJSON(r.Body, &b); err != nil {
h.api.Err(w, err)
return
}
if err := b.OK(); err != nil {
h.api.Err(w, err)
return
}
bucket := b.toInfluxDB()
if err := validBucketName(bucket); err != nil {
h.api.Err(w, err)
return
}
if err := h.bucketSvc.CreateBucket(r.Context(), bucket); err != nil {
h.api.Err(w, err)
return
}
h.log.Debug("Bucket created", zap.String("bucket", fmt.Sprint(bucket)))
h.api.Respond(w, http.StatusCreated, NewBucketResponse(bucket))
}
type postBucketRequest struct {
OrgID influxdb.ID `json:"orgID,omitempty"`
Name string `json:"name"`
Description string `json:"description"`
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
RetentionRules []retentionRule `json:"retentionRules"`
}
func (b *postBucketRequest) OK() error {
if !b.OrgID.Valid() {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "organization id must be provided",
}
}
// Only support a single retention period for the moment
if len(b.RetentionRules) > 0 {
if _, err := b.RetentionRules[0].RetentionPeriod(); err != nil {
return &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: err.Error(),
}
}
}
// names starting with an underscore are reserved for system buckets
if err := validBucketName(b.toInfluxDB()); err != nil {
return &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: err.Error(),
}
}
return nil
}
func (b postBucketRequest) toInfluxDB() *influxdb.Bucket {
// Only support a single retention period for the moment
var dur time.Duration
if len(b.RetentionRules) > 0 {
dur, _ = b.RetentionRules[0].RetentionPeriod()
}
return &influxdb.Bucket{
OrgID: b.OrgID,
Description: b.Description,
Name: b.Name,
Type: influxdb.BucketTypeUser,
RetentionPolicyName: b.RetentionPolicyName,
RetentionPeriod: dur,
}
}
// handleGetBucket is the HTTP handler for the GET /api/v2/buckets/:id route.
func (h *BucketHandler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, err)
return
}
b, err := h.bucketSvc.FindBucketByID(ctx, *id)
if err != nil {
h.api.Err(w, err)
return
}
h.log.Debug("Bucket retrieved", zap.String("bucket", fmt.Sprint(b)))
h.api.Respond(w, http.StatusOK, NewBucketResponse(b))
}
// handleDeleteBucket is the HTTP handler for the DELETE /api/v2/buckets/:id route.
func (h *BucketHandler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, err)
return
}
if err := h.bucketSvc.DeleteBucket(r.Context(), *id); err != nil {
h.api.Err(w, err)
return
}
h.log.Debug("Bucket deleted", zap.String("bucketID", id.String()))
h.api.Respond(w, http.StatusNoContent, nil)
}
// handleGetBuckets is the HTTP handler for the GET /api/v2/buckets route.
func (h *BucketHandler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
bucketsRequest, err := decodeGetBucketsRequest(r)
if err != nil {
h.api.Err(w, err)
return
}
bs, _, err := h.bucketSvc.FindBuckets(r.Context(), bucketsRequest.filter, bucketsRequest.opts)
if err != nil {
h.api.Err(w, err)
return
}
h.log.Debug("Buckets retrieved", zap.String("buckets", fmt.Sprint(bs)))
h.api.Respond(w, http.StatusOK, newBucketsResponse(r.Context(), bucketsRequest.opts, bucketsRequest.filter, bs))
}
type getBucketsRequest struct {
filter influxdb.BucketFilter
opts influxdb.FindOptions
}
func decodeGetBucketsRequest(r *http.Request) (*getBucketsRequest, error) {
qp := r.URL.Query()
req := &getBucketsRequest{}
opts, err := decodeFindOptions(r)
if err != nil {
return nil, err
}
req.opts = *opts
if orgID := qp.Get("orgID"); orgID != "" {
id, err := influxdb.IDFromString(orgID)
if err != nil {
return nil, err
}
req.filter.OrganizationID = id
}
if org := qp.Get("org"); org != "" {
req.filter.Org = &org
}
if name := qp.Get("name"); name != "" {
req.filter.Name = &name
}
if bucketID := qp.Get("id"); bucketID != "" {
id, err := influxdb.IDFromString(bucketID)
if err != nil {
return nil, err
}
req.filter.ID = id
}
return req, nil
}
// handlePatchBucket is the HTTP handler for the PATCH /api/v2/buckets route.
func (h *BucketHandler) handlePatchBucket(w http.ResponseWriter, r *http.Request) {
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, err)
return
}
var reqBody bucketUpdate
if err := h.api.DecodeJSON(r.Body, &reqBody); err != nil {
h.api.Err(w, err)
return
}
if reqBody.Name != nil {
b, err := h.bucketSvc.FindBucketByID(r.Context(), *id)
if err != nil {
h.api.Err(w, err)
return
}
b.Name = *reqBody.Name
if err := validBucketName(b); err != nil {
h.api.Err(w, err)
return
}
}
b, err := h.bucketSvc.UpdateBucket(r.Context(), *id, *reqBody.toInfluxDB())
if err != nil {
h.api.Err(w, err)
return
}
h.log.Debug("Bucket updated", zap.String("bucket", fmt.Sprint(b)))
h.api.Respond(w, http.StatusOK, NewBucketResponse(b))
}
func (h *BucketHandler) lookupOrgByBucketID(ctx context.Context, id influxdb.ID) (influxdb.ID, error) {
b, err := h.bucketSvc.FindBucketByID(ctx, id)
if err != nil {
return 0, err
}
return b.OrgID, nil
}
// validBucketName reports any errors with bucket names
func validBucketName(bucket *influxdb.Bucket) error {
// names starting with an underscore are reserved for system buckets
if strings.HasPrefix(bucket.Name, "_") && bucket.Type != influxdb.BucketTypeSystem {
return &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/bucket",
Msg: fmt.Sprintf("bucket name %s is invalid. Buckets may not start with underscore", bucket.Name),
}
}
return nil
}