package bolt import ( "bytes" "context" "encoding/json" "fmt" "github.com/coreos/bbolt" "github.com/influxdata/platform" ) var ( bucketBucket = []byte("bucketsv1") bucketIndex = []byte("bucketindexv1") ) func (c *Client) initializeBuckets(ctx context.Context, tx *bolt.Tx) error { if _, err := tx.CreateBucketIfNotExists([]byte(bucketBucket)); err != nil { return err } if _, err := tx.CreateBucketIfNotExists([]byte(bucketIndex)); err != nil { return err } return nil } func (c *Client) setOrganizationOnBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) error { o, err := c.findOrganizationByID(ctx, tx, b.OrganizationID) if err != nil { return err } b.Organization = o.Name return nil } // FindBucketByID retrieves a bucket by id. func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) { var b *platform.Bucket err := c.db.View(func(tx *bolt.Tx) error { bkt, err := c.findBucketByID(ctx, tx, id) if err != nil { return err } b = bkt return nil }) if err != nil { return nil, err } return b, nil } func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.Bucket, error) { var b platform.Bucket v := tx.Bucket(bucketBucket).Get(id) if len(v) == 0 { // TODO: Make standard error return nil, fmt.Errorf("bucket not found") } if err := json.Unmarshal(v, &b); err != nil { return nil, err } if err := c.setOrganizationOnBucket(ctx, tx, &b); err != nil { return nil, err } return &b, nil } // FindBucketByName returns a bucket by name for a particular organization. // TODO: have method for finding bucket using organization name and bucket name. func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) { var b *platform.Bucket err := c.db.View(func(tx *bolt.Tx) error { bkt, err := c.findBucketByName(ctx, tx, orgID, n) if err != nil { return err } b = bkt return nil }) return b, err } func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platform.ID, n string) (*platform.Bucket, error) { b := &platform.Bucket{ OrganizationID: orgID, Name: n, } id := tx.Bucket(bucketIndex).Get(bucketIndexKey(b)) return c.findBucketByID(ctx, tx, platform.ID(id)) } // FindBucket retrives a bucket using an arbitrary bucket filter. // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across buckets until it finds a match. func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) { if filter.ID != nil { return c.FindBucketByID(ctx, *filter.ID) } if filter.Name != nil && filter.OrganizationID != nil { return c.FindBucketByName(ctx, *filter.OrganizationID, *filter.Name) } var b *platform.Bucket err := c.db.View(func(tx *bolt.Tx) error { if filter.Organization != nil { o, err := c.findOrganizationByName(ctx, tx, *filter.Organization) if err != nil { return err } filter.OrganizationID = &o.ID } filterFn := filterBucketsFn(filter) return c.forEachBucket(ctx, tx, func(bkt *platform.Bucket) bool { if filterFn(bkt) { b = bkt return false } return true }) }) if err != nil { return nil, err } if b == nil { return nil, fmt.Errorf("bucket not found") } return b, nil } func filterBucketsFn(filter platform.BucketFilter) func(b *platform.Bucket) bool { if filter.ID != nil { return func(b *platform.Bucket) bool { return bytes.Equal(b.ID, *filter.ID) } } if filter.Name != nil && filter.OrganizationID != nil { return func(b *platform.Bucket) bool { return bytes.Equal(b.OrganizationID, *filter.OrganizationID) && b.Name == *filter.Name } } if filter.Name != nil { return func(b *platform.Bucket) bool { return b.Name == *filter.Name } } if filter.OrganizationID != nil { return func(b *platform.Bucket) bool { return bytes.Equal(b.OrganizationID, *filter.OrganizationID) } } return func(b *platform.Bucket) bool { return true } } // FindBuckets retrives all buckets that match an arbitrary bucket filter. // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across all buckets searching for a match. func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) { if filter.ID != nil { b, err := c.FindBucketByID(ctx, *filter.ID) if err != nil { return nil, 0, err } return []*platform.Bucket{b}, 1, nil } if filter.Name != nil && filter.OrganizationID != nil { b, err := c.FindBucketByName(ctx, *filter.OrganizationID, *filter.Name) if err != nil { return nil, 0, err } return []*platform.Bucket{b}, 1, nil } bs := []*platform.Bucket{} err := c.db.View(func(tx *bolt.Tx) error { bkts, err := c.findBuckets(ctx, tx, filter) if err != nil { return err } bs = bkts return nil }) if err != nil { return nil, 0, err } return bs, len(bs), nil } func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter) ([]*platform.Bucket, error) { bs := []*platform.Bucket{} if filter.Organization != nil { o, err := c.findOrganizationByName(ctx, tx, *filter.Organization) if err != nil { return nil, err } filter.OrganizationID = &o.ID } filterFn := filterBucketsFn(filter) err := c.forEachBucket(ctx, tx, func(b *platform.Bucket) bool { if filterFn(b) { bs = append(bs, b) } return true }) if err != nil { return nil, err } return bs, nil } // CreateBucket creates a platform bucket and sets b.ID. func (c *Client) CreateBucket(ctx context.Context, b *platform.Bucket) error { return c.db.Update(func(tx *bolt.Tx) error { if len(b.OrganizationID) == 0 { o, err := c.findOrganizationByName(ctx, tx, b.Organization) if err != nil { return err } b.OrganizationID = o.ID } unique := c.uniqueBucketName(ctx, tx, b) if !unique { // TODO: make standard error return fmt.Errorf("bucket with name %s already exists", b.Name) } b.ID = c.IDGenerator.ID() return c.putBucket(ctx, tx, b) }) } // PutBucket will put a bucket without setting an ID. func (c *Client) PutBucket(ctx context.Context, b *platform.Bucket) error { return c.db.Update(func(tx *bolt.Tx) error { return c.putBucket(ctx, tx, b) }) } func (c *Client) putBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) error { b.Organization = "" v, err := json.Marshal(b) if err != nil { return err } if err := tx.Bucket(bucketIndex).Put(bucketIndexKey(b), b.ID); err != nil { return err } if err := tx.Bucket(bucketBucket).Put(b.ID, v); err != nil { return err } return c.setOrganizationOnBucket(ctx, tx, b) } func bucketIndexKey(b *platform.Bucket) []byte { k := make([]byte, len(b.OrganizationID)+len(b.Name)) copy(k, b.OrganizationID) copy(k[len(b.OrganizationID):], []byte(b.Name)) return k } // forEachBucket will iterate through all buckets while fn returns true. func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, fn func(*platform.Bucket) bool) error { cur := tx.Bucket(bucketBucket).Cursor() for k, v := cur.First(); k != nil; k, v = cur.Next() { b := &platform.Bucket{} if err := json.Unmarshal(v, b); err != nil { return err } if err := c.setOrganizationOnBucket(ctx, tx, b); err != nil { return err } if !fn(b) { break } } return nil } func (c *Client) uniqueBucketName(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) bool { v := tx.Bucket(bucketIndex).Get(bucketIndexKey(b)) return len(v) == 0 } // UpdateBucket updates a bucket according the parameters set on upd. func (c *Client) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) { var b *platform.Bucket err := c.db.Update(func(tx *bolt.Tx) error { bkt, err := c.updateBucket(ctx, tx, id, upd) if err != nil { return err } b = bkt return nil }) return b, err } func (c *Client) updateBucket(ctx context.Context, tx *bolt.Tx, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) { b, err := c.findBucketByID(ctx, tx, id) if err != nil { return nil, err } if upd.RetentionPeriod != nil { b.RetentionPeriod = *upd.RetentionPeriod } if upd.Name != nil { // Buckets are indexed by name and so the bucket index must be pruned when name // is modified. if err := tx.Bucket(bucketIndex).Delete(bucketIndexKey(b)); err != nil { return nil, err } b.Name = *upd.Name } if err := c.putBucket(ctx, tx, b); err != nil { return nil, err } if err := c.setOrganizationOnBucket(ctx, tx, b); err != nil { return nil, err } return b, nil } // DeleteBucket deletes a bucket and prunes it from the index. func (c *Client) DeleteBucket(ctx context.Context, id platform.ID) error { return c.db.Update(func(tx *bolt.Tx) error { return c.deleteBucket(ctx, tx, id) }) } func (c *Client) deleteBucket(ctx context.Context, tx *bolt.Tx, id platform.ID) error { b, err := c.findBucketByID(ctx, tx, id) if err != nil { return err } // make lowercase deleteBucket with tx if err := tx.Bucket(bucketIndex).Delete(bucketIndexKey(b)); err != nil { return err } return tx.Bucket(bucketBucket).Delete(id) }