influxdb/http/bucket_service.go

456 lines
10 KiB
Go

package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/errors"
"github.com/julienschmidt/httprouter"
)
// BucketHandler represents an HTTP API handler for buckets.
type BucketHandler struct {
*httprouter.Router
BucketService platform.BucketService
}
// NewBucketHandler returns a new instance of BucketHandler.
func NewBucketHandler() *BucketHandler {
h := &BucketHandler{
Router: httprouter.New(),
}
h.HandlerFunc("POST", "/v1/buckets", h.handlePostBucket)
h.HandlerFunc("GET", "/v1/buckets", h.handleGetBuckets)
h.HandlerFunc("GET", "/v1/buckets/:id", h.handleGetBucket)
h.HandlerFunc("PATCH", "/v1/buckets/:id", h.handlePatchBucket)
return h
}
// handlePostBucket is the HTTP handler for the POST /v1/buckets route.
func (h *BucketHandler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodePostBucketRequest(ctx, r)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
if err := h.BucketService.CreateBucket(ctx, req.Bucket); err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusCreated, req.Bucket); err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
}
type postBucketRequest struct {
Bucket *platform.Bucket
}
func decodePostBucketRequest(ctx context.Context, r *http.Request) (*postBucketRequest, error) {
b := &platform.Bucket{}
if err := json.NewDecoder(r.Body).Decode(b); err != nil {
return nil, err
}
return &postBucketRequest{
Bucket: b,
}, nil
}
// handleGetBucket is the HTTP handler for the GET /v1/buckets/:id route.
func (h *BucketHandler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetBucketRequest(ctx, r)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
b, err := h.BucketService.FindBucketByID(ctx, req.BucketID)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, b); err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
}
type getBucketRequest struct {
BucketID platform.ID
}
func decodeGetBucketRequest(ctx context.Context, r *http.Request) (*getBucketRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
return nil, errors.InvalidDataf("url missing id")
}
var i platform.ID
if err := (&i).Decode([]byte(id)); err != nil {
return nil, err
}
req := &getBucketRequest{
BucketID: i,
}
return req, nil
}
// handleGetBuckets is the HTTP handler for the GET /v1/buckets route.
func (h *BucketHandler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetBucketsRequest(ctx, r)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
bs, _, err := h.BucketService.FindBuckets(ctx, req.filter)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, bs); err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
}
type getBucketsRequest struct {
filter platform.BucketFilter
}
func decodeGetBucketsRequest(ctx context.Context, r *http.Request) (*getBucketsRequest, error) {
qp := r.URL.Query()
req := &getBucketsRequest{}
if id := qp.Get("orgID"); id != "" {
req.filter.OrganizationID = &platform.ID{}
if err := req.filter.OrganizationID.DecodeFromString(id); err != nil {
return nil, err
}
}
if id := qp.Get("bucketID"); id != "" {
req.filter.ID = &platform.ID{}
if err := req.filter.ID.DecodeFromString(id); err != nil {
return nil, err
}
}
if name := qp.Get("bucketName"); name != "" {
req.filter.Name = &name
}
return req, nil
}
// handlePatchBucket is the HTTP handler for the PATH /v1/buckets route.
func (h *BucketHandler) handlePatchBucket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodePatchBucketRequest(ctx, r)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
b, err := h.BucketService.UpdateBucket(ctx, req.BucketID, req.Update)
if err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, b); err != nil {
errors.EncodeHTTP(ctx, err, w)
return
}
}
type patchBucketRequest struct {
Update platform.BucketUpdate
BucketID platform.ID
}
func decodePatchBucketRequest(ctx context.Context, r *http.Request) (*patchBucketRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
return nil, errors.InvalidDataf("url missing id")
}
var i platform.ID
if err := (&i).Decode([]byte(id)); err != nil {
return nil, err
}
var upd platform.BucketUpdate
if err := json.NewDecoder(r.Body).Decode(&upd); err != nil {
return nil, err
}
return &patchBucketRequest{
Update: upd,
BucketID: i,
}, nil
}
const (
bucketPath = "/v1/buckets"
)
// BucketService connects to Influx via HTTP using tokens to manage buckets
type BucketService struct {
Addr string
Token string
InsecureSkipVerify bool
}
// FindBucketByID returns a single bucket by ID.
func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
u, err := newURL(s.Addr, bucketplatformath(id))
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", s.Token)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
if resp.StatusCode != http.StatusNoContent {
var reqErr errors.Error
if err := dec.Decode(&reqErr); err != nil {
return nil, err
}
return nil, reqErr
}
var b platform.Bucket
if err := dec.Decode(&b); err != nil {
return nil, err
}
return &b, nil
}
// FindBucket returns the first bucket that matches filter.
func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
bs, n, err := s.FindBuckets(ctx, filter)
if err != nil {
return nil, err
}
if n == 0 {
return nil, fmt.Errorf("found no matching buckets")
}
return bs[0], nil
}
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
// Additional options provide pagination & sorting.
func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
u, err := newURL(s.Addr, bucketPath)
if err != nil {
return nil, 0, err
}
query := u.Query()
if filter.OrganizationID != nil {
query.Add("orgID", filter.OrganizationID.String())
}
if filter.ID != nil {
query.Add("bucketID", filter.ID.String())
}
if filter.Name != nil {
query.Add("bucketName", *filter.Name)
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
req.URL.RawQuery = query.Encode()
req.Header.Set("Authorization", s.Token)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
if resp.StatusCode != http.StatusOK {
var reqErr errors.Error
if err := dec.Decode(&reqErr); err != nil {
return nil, 0, err
}
return nil, 0, reqErr
}
var bs []*platform.Bucket
if err := dec.Decode(&bs); err != nil {
return nil, 0, err
}
return bs, len(bs), nil
}
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) error {
u, err := newURL(s.Addr, bucketPath)
if err != nil {
return err
}
octets, err := json.Marshal(b)
if err != nil {
return err
}
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(octets))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", s.Token)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return err
}
// TODO: this should really check the error from the headers
if resp.StatusCode != http.StatusCreated {
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
var reqErr errors.Error
if err := dec.Decode(&reqErr); err != nil {
return err
}
return reqErr
}
if err := json.NewDecoder(resp.Body).Decode(b); err != nil {
return err
}
return nil
}
// UpdateBucket updates a single bucket with changeset.
// Returns the new bucket state after update.
func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
u, err := newURL(s.Addr, bucketplatformath(id))
if err != nil {
return nil, err
}
octets, err := json.Marshal(upd)
if err != nil {
return nil, err
}
req, err := http.NewRequest("PATCH", u.String(), bytes.NewReader(octets))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", s.Token)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
if resp.StatusCode != http.StatusCreated {
var reqErr errors.Error
if err := dec.Decode(&reqErr); err != nil {
return nil, err
}
return nil, reqErr
}
var b platform.Bucket
if err := dec.Decode(&b); err != nil {
return nil, err
}
return &b, nil
}
// DeleteBucket removes a bucket by ID.
func (s *BucketService) DeleteBucket(ctx context.Context, id platform.ID) error {
u, err := newURL(s.Addr, bucketplatformath(id))
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}
req.Header.Set("Authorization", s.Token)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusNoContent {
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
var reqErr errors.Error
if err := dec.Decode(&reqErr); err != nil {
return err
}
return reqErr
}
return nil
}
func bucketplatformath(id platform.ID) string {
return bucketPath + "/" + id.String()
}