Merge pull request #2 from influxdata/feat/create-idpd

Add boltdb implementation of bucket service
pull/10616/head
Michael Desa 2018-05-16 12:06:12 -04:00 committed by GitHub
commit 24b2fbfd95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1577 additions and 49 deletions

3
.gitignore vendored
View File

@ -1,6 +1,9 @@
vendor
.netrc
# binary databases
idpdb.bolt
# Project binaries.
/idp
/transpilerd

146
Gopkg.lock generated
View File

@ -8,10 +8,16 @@
revision = "3a771d992973f24aa725d07868b467d1ddfceafb"
[[projects]]
name = "github.com/gogo/protobuf"
packages = ["proto"]
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
version = "v1.0.0"
name = "github.com/coreos/bbolt"
packages = ["."]
revision = "583e8937c61f1af6513608ccc75c97b6abdf4ff9"
version = "v1.3.0"
[[projects]]
name = "github.com/fsnotify/fsnotify"
packages = ["."]
revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9"
version = "v1.4.7"
[[projects]]
name = "github.com/golang/protobuf"
@ -32,36 +38,59 @@
[[projects]]
branch = "master"
name = "github.com/influxdata/ifql"
name = "github.com/hashicorp/hcl"
packages = [
"ast",
"compiler",
"id",
"interpreter",
"parser",
"query",
"query/execute",
"query/plan",
"semantic",
"values"
".",
"hcl/ast",
"hcl/parser",
"hcl/printer",
"hcl/scanner",
"hcl/strconv",
"hcl/token",
"json/parser",
"json/scanner",
"json/token"
]
revision = "66973752cd65cd99dc43332a94272e250672d448"
revision = "ef8a98b0bbce4a65b5aa4c368430a80ddc533168"
[[projects]]
name = "github.com/inconshreveable/mousetrap"
packages = ["."]
revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75"
version = "v1.0"
[[projects]]
branch = "master"
name = "github.com/influxdata/influxdb"
packages = [
"models",
"pkg/escape",
"logger",
"pkg/snowflake"
]
revision = "200fda999f915dfc13c2b4030a2db6e0b08c689f"
[[projects]]
name = "github.com/jsternberg/zap-logfmt"
packages = ["."]
revision = "ac4bd917e18a4548ce6e0e765b29a4e7f397b0b6"
version = "v1.0.0"
[[projects]]
name = "github.com/julienschmidt/httprouter"
packages = ["."]
revision = "d1898390779332322e6b5ca5011da4bf249bb056"
[[projects]]
name = "github.com/magiconair/properties"
packages = ["."]
revision = "c3beff4c2358b44d0493c7dda585e7db7ff28ae6"
version = "v1.7.6"
[[projects]]
branch = "master"
name = "github.com/mattn/go-isatty"
packages = ["."]
revision = "6ca4dbf54d38eea1a992b3c722a76a5d1c4cb25c"
[[projects]]
name = "github.com/matttproud/golang_protobuf_extensions"
packages = ["pbutil"]
@ -69,19 +98,16 @@
version = "v1.0.0"
[[projects]]
name = "github.com/opentracing/opentracing-go"
packages = [
".",
"log"
]
revision = "1949ddbfd147afd4d964a9f00b24eb291e0e7c38"
version = "v1.0.2"
branch = "master"
name = "github.com/mitchellh/mapstructure"
packages = ["."]
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
[[projects]]
name = "github.com/pkg/errors"
name = "github.com/pelletier/go-toml"
packages = ["."]
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
version = "v0.8.0"
revision = "acdc4509485b587f5e675510c4f2c63e90ff68a8"
version = "v1.1.0"
[[projects]]
name = "github.com/prometheus/client_golang"
@ -119,11 +145,44 @@
revision = "8b1c2da0d56deffdbb9e48d4414b4e674bd8083e"
[[projects]]
name = "github.com/satori/go.uuid"
name = "github.com/spf13/afero"
packages = [
".",
"mem"
]
revision = "63644898a8da0bc22138abf860edaf5277b6102e"
version = "v1.1.0"
[[projects]]
name = "github.com/spf13/cast"
packages = ["."]
revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3"
revision = "8965335b8c7107321228e3e3702cab9832751bac"
version = "v1.2.0"
[[projects]]
name = "github.com/spf13/cobra"
packages = ["."]
revision = "ef82de70bb3f60c65fb8eebacbb2d122ef517385"
version = "v0.0.3"
[[projects]]
branch = "master"
name = "github.com/spf13/jwalterweatherman"
packages = ["."]
revision = "7c0cea34c8ece3fbeb2b27ab9b59511d360fb394"
[[projects]]
name = "github.com/spf13/pflag"
packages = ["."]
revision = "583c0c0531f06d5278b7d917446061adc344b5cd"
version = "v1.0.1"
[[projects]]
name = "github.com/spf13/viper"
packages = ["."]
revision = "b5e8006cbee93ec955a89ab31e0e3ce3204f3736"
version = "v1.0.2"
[[projects]]
name = "go.uber.org/atomic"
packages = ["."]
@ -151,13 +210,32 @@
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = ["context"]
revision = "2491c5de3490fced2f6cff376127c667efeed857"
name = "golang.org/x/sys"
packages = ["unix"]
revision = "7c87d13f8e835d2fb3a70a2912c811ed0c1d241b"
[[projects]]
name = "golang.org/x/text"
packages = [
"internal/gen",
"internal/triegen",
"internal/ucd",
"transform",
"unicode/cldr",
"unicode/norm"
]
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
name = "gopkg.in/yaml.v2"
packages = ["."]
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
version = "v2.2.1"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "9a6351899a346f8d38fe7215b78df522799b8a8aefb6619c9025a1ace2549126"
inputs-digest = "6cd13a6e1af35a40f1e1ed0f2a3c5929ec54f41f0b9da941bfaaaecf2da7f840"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -25,18 +25,10 @@
# unused-packages = true
[[constraint]]
name = "github.com/gogo/protobuf"
version = "1.0.0"
[[constraint]]
name = "github.com/google/go-cmp"
version = "0.2.0"
[[constraint]]
name = "github.com/influxdata/ifql"
branch = "master"
[[constraint]]
name = "github.com/influxdata/influxdb"
branch = "master"

77
bolt/bbolt.go Normal file
View File

@ -0,0 +1,77 @@
package bolt
import (
"context"
"fmt"
"os"
"time"
"github.com/coreos/bbolt"
"github.com/influxdata/platform"
"github.com/influxdata/platform/snowflake"
)
const (
// ErrUnableToOpen means we had an issue establishing a connection (or creating the database)
ErrUnableToOpen = "Unable to open boltdb; is there a chronograf already running? %v"
// ErrUnableToBackup means we couldn't copy the db file into ./backup
ErrUnableToBackup = "Unable to backup your database prior to migrations: %v"
// ErrUnableToInitialize means we couldn't create missing Buckets (maybe a timeout)
ErrUnableToInitialize = "Unable to boot boltdb: %v"
// ErrUnableToMigrate means we had an issue changing the db schema
ErrUnableToMigrate = "Unable to migrate boltdb: %v"
)
// Client is a client for the boltDB data store.
type Client struct {
Path string
db *bolt.DB
IDGenerator platform.IDGenerator
}
// NewClient returns an instance of a Client.
func NewClient() *Client {
return &Client{
IDGenerator: snowflake.NewIDGenerator(),
}
}
// Open / create boltDB file.
func (c *Client) Open(ctx context.Context) error {
if _, err := os.Stat(c.Path); err != nil && !os.IsNotExist(err) {
return err
}
// Open database file.
db, err := bolt.Open(c.Path, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return fmt.Errorf(ErrUnableToOpen, err)
}
c.db = db
return c.initialize(context.TODO())
}
// initialize creates Buckets that are missing
func (c *Client) initialize(ctx context.Context) error {
if err := c.db.Update(func(tx *bolt.Tx) error {
// Always create Buckets bucket.
if err := c.initializeBuckets(ctx, tx); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}
// Close the connection to the bolt database
func (c *Client) Close() error {
if c.db != nil {
return c.db.Close()
}
return nil
}

33
bolt/bbolt_test.go Normal file
View File

@ -0,0 +1,33 @@
package bolt_test
import (
"context"
"errors"
"io/ioutil"
"os"
"github.com/influxdata/platform/bolt"
)
func NewTestClient() (*bolt.Client, func(), error) {
c := bolt.NewClient()
f, err := ioutil.TempFile("", "influxdata-platform-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
c.Path = f.Name()
if err := c.Open(context.TODO()); err != nil {
return nil, nil, err
}
close := func() {
c.Close()
os.Remove(c.Path)
}
return c, close, nil
}

326
bolt/bucket.go Normal file
View File

@ -0,0 +1,326 @@
package bolt
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/coreos/bbolt"
"github.com/influxdata/platform"
)
var (
bucketBucket = []byte("bucketsv1")
bucketIndex = []byte("bucketindexv1")
)
func (c *Client) initializeBuckets(ctx context.Context, tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists([]byte(bucketBucket)); err != nil {
return err
}
if _, err := tx.CreateBucketIfNotExists([]byte(bucketIndex)); err != nil {
return err
}
return nil
}
// FindBucketByID retrieves a bucket by id.
func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
var b *platform.Bucket
err := c.db.View(func(tx *bolt.Tx) error {
bkt, err := c.findBucketByID(ctx, tx, id)
if err != nil {
return err
}
b = bkt
return nil
})
if err != nil {
return nil, err
}
return b, nil
}
func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.Bucket, error) {
var b platform.Bucket
v := tx.Bucket(bucketBucket).Get(id)
if len(v) == 0 {
// TODO: Make standard error
return nil, fmt.Errorf("bucket not found")
}
if err := json.Unmarshal(v, &b); err != nil {
return nil, err
}
return &b, nil
}
// FindBucketByName returns a bucket by name for a particular organization.
// TODO: have method for finding bucket using organization name and bucket name.
func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) {
var b *platform.Bucket
err := c.db.View(func(tx *bolt.Tx) error {
bkt, err := c.findBucketByName(ctx, tx, orgID, n)
if err != nil {
return err
}
b = bkt
return nil
})
return b, err
}
func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platform.ID, n string) (*platform.Bucket, error) {
b := &platform.Bucket{
OrganizationID: orgID,
Name: n,
}
id := tx.Bucket(bucketIndex).Get(bucketIndexKey(b))
v := tx.Bucket(bucketBucket).Get(id)
if len(v) == 0 {
return nil, fmt.Errorf("bucket not found")
}
if err := json.Unmarshal(v, b); err != nil {
return nil, err
}
return b, nil
}
// FindBucket retrives a bucket using an arbitrary bucket filter.
// Filters using ID, or OrganizationID and bucket Name should be efficient.
// Other filters will do a linear scan across buckets until it finds a match.
func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
if filter.ID != nil {
return c.FindBucketByID(ctx, *filter.ID)
}
if filter.Name != nil && filter.OrganizationID != nil {
return c.FindBucketByName(ctx, *filter.OrganizationID, *filter.Name)
}
filterFn := filterBucketsFn(filter)
var b *platform.Bucket
err := c.db.View(func(tx *bolt.Tx) error {
return forEachBucket(ctx, tx, func(bkt *platform.Bucket) bool {
if filterFn(bkt) {
b = bkt
return false
}
return true
})
})
if err != nil {
return nil, err
}
if b == nil {
return nil, fmt.Errorf("bucket not found")
}
return b, nil
}
func filterBucketsFn(filter platform.BucketFilter) func(b *platform.Bucket) bool {
if filter.ID != nil {
return func(b *platform.Bucket) bool {
return bytes.Equal(b.ID, *filter.ID)
}
}
if filter.Name != nil && filter.OrganizationID != nil {
return func(b *platform.Bucket) bool {
return bytes.Equal(b.OrganizationID, *filter.OrganizationID) && b.Name == *filter.Name
}
}
if filter.Name != nil {
return func(b *platform.Bucket) bool {
return b.Name == *filter.Name
}
}
if filter.OrganizationID != nil {
return func(b *platform.Bucket) bool {
return b.Name == *filter.Name
}
}
return func(b *platform.Bucket) bool { return true }
}
// FindBuckets retrives all buckets that match an arbitrary bucket filter.
// Filters using ID, or OrganizationID and bucket Name should be efficient.
// Other filters will do a linear scan across all buckets searching for a match.
func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
if filter.ID != nil {
b, err := c.FindBucketByID(ctx, *filter.ID)
if err != nil {
return nil, 0, err
}
return []*platform.Bucket{b}, 1, nil
}
if filter.Name != nil && filter.OrganizationID != nil {
b, err := c.FindBucketByName(ctx, *filter.OrganizationID, *filter.Name)
if err != nil {
return nil, 0, err
}
return []*platform.Bucket{b}, 1, nil
}
bs := []*platform.Bucket{}
filterFn := filterBucketsFn(filter)
err := c.db.View(func(tx *bolt.Tx) error {
return forEachBucket(ctx, tx, func(b *platform.Bucket) bool {
if filterFn(b) {
bs = append(bs, b)
}
return true
})
})
if err != nil {
return nil, 0, err
}
return bs, len(bs), nil
}
// CreateBucket creates a platform bucket and sets b.ID.
func (c *Client) CreateBucket(ctx context.Context, b *platform.Bucket) error {
return c.db.Update(func(tx *bolt.Tx) error {
unique := c.uniqueBucketName(ctx, tx, b)
if !unique {
// TODO: make standard error
return fmt.Errorf("bucket with name %s already exists", b.Name)
}
b.ID = c.IDGenerator.ID()
return c.putBucket(ctx, tx, b)
})
}
// PutBucket will put a bucket without setting an ID.
func (c *Client) PutBucket(ctx context.Context, b *platform.Bucket) error {
return c.db.Update(func(tx *bolt.Tx) error {
return c.putBucket(ctx, tx, b)
})
}
func (c *Client) putBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) error {
v, err := json.Marshal(b)
if err != nil {
return err
}
if err := tx.Bucket(bucketIndex).Put(bucketIndexKey(b), b.ID); err != nil {
return err
}
return tx.Bucket(bucketBucket).Put(b.ID, v)
}
func bucketIndexKey(b *platform.Bucket) []byte {
k := make([]byte, len(b.OrganizationID)+len(b.Name))
copy(k, b.OrganizationID)
copy(k[len(b.OrganizationID):], []byte(b.Name))
return k
}
// forEachBucket will iterate through all buckets while fn returns true.
func forEachBucket(ctx context.Context, tx *bolt.Tx, fn func(*platform.Bucket) bool) error {
cur := tx.Bucket(bucketBucket).Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
b := &platform.Bucket{}
if err := json.Unmarshal(v, b); err != nil {
return err
}
if !fn(b) {
break
}
}
return nil
}
func (c *Client) uniqueBucketName(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) bool {
v := tx.Bucket(bucketIndex).Get(bucketIndexKey(b))
return len(v) == 0
}
// UpdateBucket updates a bucket according the parameters set on upd.
func (c *Client) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
var b *platform.Bucket
err := c.db.Update(func(tx *bolt.Tx) error {
bkt, err := c.updateBucket(ctx, tx, id, upd)
if err != nil {
return err
}
b = bkt
return nil
})
return b, err
}
func (c *Client) updateBucket(ctx context.Context, tx *bolt.Tx, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
b, err := c.findBucketByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.RetentionPeriod != nil {
b.RetentionPeriod = *upd.RetentionPeriod
}
if upd.Name != nil {
// Buckets are indexed by name and so the bucket index must be pruned when name
// is modified.
if err := tx.Bucket(bucketIndex).Delete(bucketIndexKey(b)); err != nil {
return nil, err
}
b.Name = *upd.Name
}
if err := c.putBucket(ctx, tx, b); err != nil {
return nil, err
}
return b, nil
}
// DeleteBucket deletes a bucket and prunes it from the index.
func (c *Client) DeleteBucket(ctx context.Context, id platform.ID) error {
return c.db.Update(func(tx *bolt.Tx) error {
return c.deleteBucket(ctx, tx, id)
})
}
func (c *Client) deleteBucket(ctx context.Context, tx *bolt.Tx, id platform.ID) error {
b, err := c.findBucketByID(ctx, tx, id)
if err != nil {
return err
}
// make lowercase deleteBucket with tx
if err := tx.Bucket(bucketIndex).Delete(bucketIndexKey(b)); err != nil {
return err
}
return tx.Bucket(bucketBucket).Delete(id)
}

55
bolt/bucket_test.go Normal file
View File

@ -0,0 +1,55 @@
package bolt_test
import (
"context"
"testing"
"github.com/influxdata/platform"
platformtesting "github.com/influxdata/platform/testing"
)
func initBucketService(f platformtesting.BucketFields, t *testing.T) (platform.BucketService, func()) {
c, closeFn, err := NewTestClient()
if err != nil {
t.Fatalf("failed to create new bolt client: %v", err)
}
c.IDGenerator = f.IDGenerator
ctx := context.TODO()
for _, u := range f.Buckets {
if err := c.PutBucket(ctx, u); err != nil {
t.Fatalf("failed to populate buckets")
}
}
return c, func() {
defer closeFn()
for _, u := range f.Buckets {
if err := c.DeleteBucket(ctx, u.ID); err != nil {
t.Logf("failed to remove buckets: %v", err)
}
}
}
}
func TestBucketService_CreateBucket(t *testing.T) {
platformtesting.CreateBucket(initBucketService, t)
}
func TestBucketService_FindBucketByID(t *testing.T) {
platformtesting.FindBucketByID(initBucketService, t)
}
func TestBucketService_FindBuckets(t *testing.T) {
platformtesting.FindBuckets(initBucketService, t)
}
func TestBucketService_DeleteBucket(t *testing.T) {
platformtesting.DeleteBucket(initBucketService, t)
}
func TestBucketService_FindBucket(t *testing.T) {
platformtesting.FindBucket(initBucketService, t)
}
func TestBucketService_UpdateBucket(t *testing.T) {
platformtesting.UpdateBucket(initBucketService, t)
}

146
cmd/idpd/main.go Normal file
View File

@ -0,0 +1,146 @@
package main
import (
"context"
"fmt"
nethttp "net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
influxlogger "github.com/influxdata/influxdb/logger"
"github.com/influxdata/platform"
"github.com/influxdata/platform/bolt"
"github.com/influxdata/platform/http"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
func main() {
Execute()
}
var (
httpBindAddress string
authorizationPath string
boltPath string
)
func init() {
viper.SetEnvPrefix("INFLUX")
platformCmd.Flags().StringVar(&httpBindAddress, "http-bind-address", ":9999", "bind address for the rest http api")
viper.BindEnv("HTTP_BIND_ADDRESS")
if h := viper.GetString("HTTP_BIND_ADDRESS"); h != "" {
httpBindAddress = h
}
platformCmd.Flags().StringVar(&authorizationPath, "authorizationPath", "", "path to a bootstrap token")
viper.BindEnv("TOKEN_PATH")
if h := viper.GetString("TOKEN_PATH"); h != "" {
authorizationPath = h
}
platformCmd.Flags().StringVar(&boltPath, "bolt-path", "idpdb.bolt", "path to boltdb database")
viper.BindEnv("BOLT_PATH")
if h := viper.GetString("BOLT_PATH"); h != "" {
boltPath = h
}
}
var platformCmd = &cobra.Command{
Use: "idpd",
Short: "influxdata platform",
Run: platformF,
}
func platformF(cmd *cobra.Command, args []string) {
// Create top level logger
logger := influxlogger.New(os.Stdout)
c := bolt.NewClient()
c.Path = boltPath
if err := c.Open(context.TODO()); err != nil {
logger.Error("failed opening bolt", zap.Error(err))
os.Exit(1)
}
defer c.Close()
var authSvc platform.AuthorizationService
{
}
var bucketSvc platform.BucketService
{
bucketSvc = c
}
var orgSvc platform.OrganizationService
{
}
var userSvc platform.UserService
{
}
errc := make(chan error)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
httpServer := &nethttp.Server{
Addr: httpBindAddress,
}
// HTTP server
go func() {
bucketHandler := http.NewBucketHandler()
bucketHandler.BucketService = bucketSvc
orgHandler := http.NewOrgHandler()
orgHandler.OrganizationService = orgSvc
userHandler := http.NewUserHandler()
userHandler.UserService = userSvc
authHandler := http.NewAuthorizationHandler()
authHandler.AuthorizationService = authSvc
authHandler.Logger = logger.With(zap.String("handler", "auth"))
platformHandler := &http.PlatformHandler{
BucketHandler: bucketHandler,
OrgHandler: orgHandler,
UserHandler: userHandler,
AuthorizationHandler: authHandler,
}
h := http.NewHandler("platform")
h.Handler = platformHandler
httpServer.Handler = h
logger.Info("listening", zap.String("transport", "http"), zap.String("addr", httpBindAddress))
errc <- httpServer.ListenAndServe()
}()
select {
case <-sigs:
case err := <-errc:
logger.Fatal("unable to start platform", zap.Error(err))
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
httpServer.Shutdown(ctx)
}
// Execute executes the idped command
func Execute() {
if err := platformCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

59
http/platform_handler.go Normal file
View File

@ -0,0 +1,59 @@
package http
import (
nethttp "net/http"
"strings"
idpctx "github.com/influxdata/platform/context"
)
// PlatformHandler is a collection of all the service handlers.
type PlatformHandler struct {
BucketHandler *BucketHandler
UserHandler *UserHandler
OrgHandler *OrgHandler
AuthorizationHandler *AuthorizationHandler
}
func setCORSResponseHeaders(w nethttp.ResponseWriter, r *nethttp.Request) {
if origin := r.Header.Get("Origin"); origin != "" {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization")
}
}
// ServeHTTP delegates a request to the appropriate subhandler.
func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) {
setCORSResponseHeaders(w, r)
if r.Method == "OPTIONS" {
return
}
ctx := r.Context()
ctx = idpctx.TokenFromAuthorizationHeader(ctx, r)
r = r.WithContext(ctx)
if strings.HasPrefix(r.URL.Path, "/v1/buckets") {
h.BucketHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/v1/users") {
h.UserHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/v1/orgs") {
h.OrgHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/v1/authorizations") {
h.AuthorizationHandler.ServeHTTP(w, r)
return
}
nethttp.NotFound(w, r)
}

View File

@ -8,6 +8,12 @@ import (
// ID is a unique identifier.
type ID []byte
// IDGenerator represents a generator for IDs.
type IDGenerator interface {
// ID creates unique byte slice ID.
ID() ID
}
// Decode parses b as a hex-encoded byte-slice-string.
func (i *ID) Decode(b []byte) error {
dst := make([]byte, hex.DecodedLen(len(b)))
@ -36,12 +42,6 @@ func (i ID) String() string {
return string(i.Encode())
}
// IDGenerator represents a generator for IDs.
type IDGenerator interface {
// ID creates unique byte slice ID.
ID() ID
}
// UnmarshalJSON implements JSON unmarshaller for IDs.
func (i *ID) UnmarshalJSON(b []byte) error {
b = b[1 : len(b)-1]

View File

@ -5,6 +5,8 @@ import (
"net/http"
)
// TODO: move to base directory
const (
// InternalError indicates an unexpected error condition.
InternalError = 1

View File

@ -6,6 +6,8 @@ import (
"net/http"
)
// TODO: move to http directory
// EncodeHTTP encodes err with the appropriate status code and format,
// sets the X-Influx-Error and X-Influx-Reference headers on the response,
// and sets the response status to the corresponding status code.

View File

@ -7,6 +7,8 @@ import (
"github.com/influxdata/platform"
)
// TODO: rename to token.go
// TokenGenerator implements platform.TokenGenerator.
type TokenGenerator struct {
size int

View File

@ -5,10 +5,12 @@ import (
"math/rand"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/influxdb/pkg/snowflake"
"github.com/influxdata/platform"
)
// TODO: rename to id.go
func init() {
rand.Seed(time.Now().UnixNano())
}

751
testing/bucket_service.go Normal file
View File

@ -0,0 +1,751 @@
package testing
import (
"bytes"
"context"
"fmt"
"sort"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
)
var bucketCmpOptions = cmp.Options{
cmp.Comparer(func(x, y []byte) bool {
return bytes.Equal(x, y)
}),
cmp.Transformer("Sort", func(in []*platform.Bucket) []*platform.Bucket {
out := append([]*platform.Bucket(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() > out[j].ID.String()
})
return out
}),
}
// BucketFields will include the IDGenerator, and buckets
type BucketFields struct {
IDGenerator platform.IDGenerator
Buckets []*platform.Bucket
}
// CreateBucket testing
func CreateBucket(
init func(BucketFields, *testing.T) (platform.BucketService, func()),
t *testing.T,
) {
type args struct {
bucket *platform.Bucket
}
type wants struct {
err error
buckets []*platform.Bucket
}
tests := []struct {
name string
fields BucketFields
args args
wants wants
}{
{
name: "create buckets with empty set",
fields: BucketFields{
IDGenerator: mock.NewIDGenerator("id1"),
Buckets: []*platform.Bucket{},
},
args: args{
bucket: &platform.Bucket{
Name: "name1",
OrganizationID: platform.ID("org1"),
},
},
wants: wants{
buckets: []*platform.Bucket{
{
Name: "name1",
ID: platform.ID("id1"),
OrganizationID: platform.ID("org1"),
},
},
},
},
{
name: "basic create bucket",
fields: BucketFields{
IDGenerator: &mock.IDGenerator{
IDFn: func() platform.ID {
return platform.ID("2")
},
},
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
},
},
args: args{
bucket: &platform.Bucket{
Name: "bucket2",
OrganizationID: platform.ID("org1"),
},
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
{
ID: platform.ID("2"),
Name: "bucket2",
OrganizationID: platform.ID("org1"),
},
},
},
},
{
name: "names should be unique within an organization",
fields: BucketFields{
IDGenerator: &mock.IDGenerator{
IDFn: func() platform.ID {
return platform.ID("2")
},
},
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
},
},
args: args{
bucket: &platform.Bucket{
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
},
err: fmt.Errorf("bucket with name bucket1 already exists"),
},
},
{
name: "names should not be unique across organizations",
fields: BucketFields{
IDGenerator: &mock.IDGenerator{
IDFn: func() platform.ID {
return platform.ID("2")
},
},
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
},
},
args: args{
bucket: &platform.Bucket{
Name: "bucket1",
OrganizationID: platform.ID("org2"),
},
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
Name: "bucket1",
OrganizationID: platform.ID("org1"),
},
{
ID: platform.ID("2"),
Name: "bucket1",
OrganizationID: platform.ID("org2"),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.TODO()
err := s.CreateBucket(ctx, tt.args.bucket)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if err.Error() != tt.wants.err.Error() {
t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
}
}
defer s.DeleteBucket(ctx, tt.args.bucket.ID)
buckets, _, err := s.FindBuckets(ctx, platform.BucketFilter{})
if err != nil {
t.Fatalf("failed to retrieve buckets: %v", err)
}
if diff := cmp.Diff(buckets, tt.wants.buckets, bucketCmpOptions...); diff != "" {
t.Errorf("buckets are different -got/+want\ndiff %s", diff)
}
})
}
}
// FindBucketByID testing
func FindBucketByID(
init func(BucketFields, *testing.T) (platform.BucketService, func()),
t *testing.T,
) {
type args struct {
id platform.ID
}
type wants struct {
err error
bucket *platform.Bucket
}
tests := []struct {
name string
fields BucketFields
args args
wants wants
}{
{
name: "basic find bucket by id",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
OrganizationID: platform.ID("org1"),
Name: "bucket1",
},
{
ID: platform.ID("2"),
OrganizationID: platform.ID("org1"),
Name: "bucket2",
},
},
},
args: args{
id: platform.ID("2"),
},
wants: wants{
bucket: &platform.Bucket{
ID: platform.ID("2"),
OrganizationID: platform.ID("org1"),
Name: "bucket2",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.TODO()
bucket, err := s.FindBucketByID(ctx, tt.args.id)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected errors to be equal '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if err.Error() != tt.wants.err.Error() {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
}
if diff := cmp.Diff(bucket, tt.wants.bucket, bucketCmpOptions...); diff != "" {
t.Errorf("bucket is different -got/+want\ndiff %s", diff)
}
})
}
}
// FindBuckets testing
func FindBuckets(
init func(BucketFields, *testing.T) (platform.BucketService, func()),
t *testing.T,
) {
type args struct {
ID string
name string
}
type wants struct {
buckets []*platform.Bucket
err error
}
tests := []struct {
name string
fields BucketFields
args args
wants wants
}{
{
name: "find all buckets",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("test1"),
OrganizationID: platform.ID("org1"),
Name: "abc",
},
{
ID: platform.ID("test2"),
OrganizationID: platform.ID("org2"),
Name: "xyz",
},
},
},
args: args{},
wants: wants{
buckets: []*platform.Bucket{
{
ID: platform.ID("test1"),
OrganizationID: platform.ID("org1"),
Name: "abc",
},
{
ID: platform.ID("test2"),
OrganizationID: platform.ID("org2"),
Name: "xyz",
},
},
},
},
{
name: "find bucket by id",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("test1"),
OrganizationID: platform.ID("org1"),
Name: "abc",
},
{
ID: platform.ID("test2"),
OrganizationID: platform.ID("org1"),
Name: "xyz",
},
},
},
args: args{
ID: "test2",
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: platform.ID("test2"),
OrganizationID: platform.ID("org1"),
Name: "xyz",
},
},
},
},
{
name: "find bucket by name",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("test1"),
OrganizationID: platform.ID("org1"),
Name: "abc",
},
{
ID: platform.ID("test2"),
OrganizationID: platform.ID("org1"),
Name: "xyz",
},
},
},
args: args{
name: "xyz",
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: platform.ID("test2"),
OrganizationID: platform.ID("org1"),
Name: "xyz",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.TODO()
filter := platform.BucketFilter{}
if tt.args.ID != "" {
id := platform.ID(tt.args.ID)
filter.ID = &id
}
if tt.args.name != "" {
filter.Name = &tt.args.name
}
buckets, _, err := s.FindBuckets(ctx, filter)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected errors to be equal '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if err.Error() != tt.wants.err.Error() {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
}
if diff := cmp.Diff(buckets, tt.wants.buckets, bucketCmpOptions...); diff != "" {
t.Errorf("buckets are different -got/+want\ndiff %s", diff)
}
})
}
}
// DeleteBucket testing
func DeleteBucket(
init func(BucketFields, *testing.T) (platform.BucketService, func()),
t *testing.T,
) {
type args struct {
ID string
}
type wants struct {
err error
buckets []*platform.Bucket
}
tests := []struct {
name string
fields BucketFields
args args
wants wants
}{
{
name: "delete buckets using exist id",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
Name: "orgA",
ID: platform.ID("abc"),
},
{
Name: "orgB",
ID: platform.ID("xyz"),
},
},
},
args: args{
ID: "abc",
},
wants: wants{
buckets: []*platform.Bucket{
{
Name: "orgB",
ID: platform.ID("xyz"),
},
},
},
},
{
name: "delete buckets using id that does not exist",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
Name: "orgA",
ID: platform.ID("abc"),
},
{
Name: "orgB",
ID: platform.ID("xyz"),
},
},
},
args: args{
ID: "123",
},
wants: wants{
err: fmt.Errorf("bucket not found"),
buckets: []*platform.Bucket{
{
Name: "orgA",
ID: platform.ID("abc"),
},
{
Name: "orgB",
ID: platform.ID("xyz"),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.TODO()
err := s.DeleteBucket(ctx, platform.ID(tt.args.ID))
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if err.Error() != tt.wants.err.Error() {
t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
}
}
filter := platform.BucketFilter{}
buckets, _, err := s.FindBuckets(ctx, filter)
if err != nil {
t.Fatalf("failed to retrieve buckets: %v", err)
}
if diff := cmp.Diff(buckets, tt.wants.buckets, bucketCmpOptions...); diff != "" {
t.Errorf("buckets are different -got/+want\ndiff %s", diff)
}
})
}
}
// FindBucket testing
func FindBucket(
init func(BucketFields, *testing.T) (platform.BucketService, func()),
t *testing.T,
) {
type args struct {
name string
}
type wants struct {
bucket *platform.Bucket
err error
}
tests := []struct {
name string
fields BucketFields
args args
wants wants
}{
{
name: "find bucket by name",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("a"),
Name: "abc",
},
{
ID: platform.ID("b"),
Name: "xyz",
},
},
},
args: args{
name: "abc",
},
wants: wants{
bucket: &platform.Bucket{
ID: platform.ID("a"),
Name: "abc",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.TODO()
filter := platform.BucketFilter{}
if tt.args.name != "" {
filter.Name = &tt.args.name
}
bucket, err := s.FindBucket(ctx, filter)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if err.Error() != tt.wants.err.Error() {
t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
}
}
if diff := cmp.Diff(bucket, tt.wants.bucket, bucketCmpOptions...); diff != "" {
t.Errorf("buckets are different -got/+want\ndiff %s", diff)
}
})
}
}
// UpdateBucket testing
func UpdateBucket(
init func(BucketFields, *testing.T) (platform.BucketService, func()),
t *testing.T,
) {
type args struct {
name string
id platform.ID
retention int
}
type wants struct {
err error
bucket *platform.Bucket
}
tests := []struct {
name string
fields BucketFields
args args
wants wants
}{
{
name: "update name",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
OrganizationID: platform.ID("org1"),
Name: "bucket1",
},
{
ID: platform.ID("2"),
OrganizationID: platform.ID("org1"),
Name: "bucket2",
},
},
},
args: args{
id: platform.ID("1"),
name: "changed",
},
wants: wants{
bucket: &platform.Bucket{
ID: platform.ID("1"),
OrganizationID: platform.ID("org1"),
Name: "changed",
},
},
},
{
name: "update retention",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
OrganizationID: platform.ID("org1"),
Name: "bucket1",
},
{
ID: platform.ID("2"),
OrganizationID: platform.ID("org1"),
Name: "bucket2",
},
},
},
args: args{
id: platform.ID("1"),
retention: 100,
},
wants: wants{
bucket: &platform.Bucket{
ID: platform.ID("1"),
OrganizationID: platform.ID("org1"),
Name: "bucket1",
RetentionPeriod: 100 * time.Minute,
},
},
},
{
name: "update retention and name",
fields: BucketFields{
Buckets: []*platform.Bucket{
{
ID: platform.ID("1"),
OrganizationID: platform.ID("org1"),
Name: "bucket1",
},
{
ID: platform.ID("2"),
OrganizationID: platform.ID("org1"),
Name: "bucket2",
},
},
},
args: args{
id: platform.ID("2"),
retention: 101,
name: "changed",
},
wants: wants{
bucket: &platform.Bucket{
ID: platform.ID("2"),
OrganizationID: platform.ID("org1"),
Name: "changed",
RetentionPeriod: 101 * time.Minute,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.TODO()
upd := platform.BucketUpdate{}
if tt.args.name != "" {
upd.Name = &tt.args.name
}
if tt.args.retention != 0 {
d := time.Duration(tt.args.retention) * time.Minute
upd.RetentionPeriod = &d
}
bucket, err := s.UpdateBucket(ctx, tt.args.id, upd)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if err.Error() != tt.wants.err.Error() {
t.Fatalf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
}
}
if diff := cmp.Diff(bucket, tt.wants.bucket, bucketCmpOptions...); diff != "" {
t.Errorf("bucket is different -got/+want\ndiff %s", diff)
}
})
}
}