filter buckets by FindOptions

pull/11062/head
zhulongcheng 2018-12-30 18:07:11 +08:00 committed by Leo Di Donato
parent 44aa7f39c2
commit ca332b59de
5 changed files with 200 additions and 17 deletions

View File

@ -179,7 +179,7 @@ func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) (
}
filterFn := filterBucketsFn(filter)
return c.forEachBucket(ctx, tx, func(bkt *platform.Bucket) bool {
return c.forEachBucket(ctx, tx, false, func(bkt *platform.Bucket) bool {
if filterFn(bkt) {
b = bkt
return false
@ -236,7 +236,7 @@ func filterBucketsFn(filter platform.BucketFilter) func(b *platform.Bucket) bool
// FindBuckets retrives all buckets that match an arbitrary bucket filter.
// Filters using ID, or OrganizationID and bucket Name should be efficient.
// Other filters will do a linear scan across all buckets searching for a match.
func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error) {
if filter.ID != nil {
b, err := c.FindBucketByID(ctx, *filter.ID)
if err != nil {
@ -257,7 +257,7 @@ func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter,
bs := []*platform.Bucket{}
err := c.db.View(func(tx *bolt.Tx) error {
bkts, err := c.findBuckets(ctx, tx, filter)
bkts, err := c.findBuckets(ctx, tx, filter, opts...)
if err != nil {
return err
}
@ -272,7 +272,7 @@ func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter,
return bs, len(bs), nil
}
func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter) ([]*platform.Bucket, *platform.Error) {
func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) {
bs := []*platform.Bucket{}
if filter.Organization != nil {
o, err := c.findOrganizationByName(ctx, tx, *filter.Organization)
@ -284,11 +284,27 @@ func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.B
filter.OrganizationID = &o.ID
}
var offset, limit, count int
var descending bool
if len(opts) > 0 {
offset = opts[0].Offset
limit = opts[0].Limit
descending = opts[0].Descending
}
filterFn := filterBucketsFn(filter)
err := c.forEachBucket(ctx, tx, func(b *platform.Bucket) bool {
err := c.forEachBucket(ctx, tx, descending, func(b *platform.Bucket) bool {
if filterFn(b) {
if count >= offset {
bs = append(bs, b)
}
count += 1
}
if limit > 0 && len(bs) >= limit {
return false
}
return true
})
@ -445,9 +461,17 @@ func bucketIndexKey(b *platform.Bucket) ([]byte, *platform.Error) {
}
// forEachBucket will iterate through all buckets while fn returns true.
func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, fn func(*platform.Bucket) bool) error {
func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, descending bool, fn func(*platform.Bucket) bool) error {
cur := tx.Bucket(bucketBucket).Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
var k, v []byte
if descending {
k, v = cur.Last()
} else {
k, v = cur.First()
}
for k != nil {
b := &platform.Bucket{}
if err := json.Unmarshal(v, b); err != nil {
return err
@ -458,6 +482,12 @@ func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, fn func(*platfo
if !fn(b) {
break
}
if descending {
k, v = cur.Prev()
} else {
k, v = cur.Next()
}
}
return nil

View File

@ -572,6 +572,14 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketF
query.Add("name", *filter.Name)
}
if len(opt) > 0 {
for k, vs := range opt[0].QueryParams() {
for _, v := range vs {
query.Add(k, v)
}
}
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err

View File

@ -3,6 +3,7 @@ package inmem
import (
"context"
"fmt"
"sort"
platform "github.com/influxdata/influxdb"
)
@ -60,26 +61,59 @@ func (s *Service) FindBucketByID(ctx context.Context, id platform.ID) (*platform
return b, err
}
func (s *Service) forEachBucket(ctx context.Context, fn func(b *platform.Bucket) bool) error {
func (s *Service) forEachBucket(ctx context.Context, descending bool, fn func(b *platform.Bucket) bool) error {
var err error
bs := make([]*platform.Bucket, 0)
s.bucketKV.Range(func(k, v interface{}) bool {
b, ok := v.(platform.Bucket)
if !ok {
err = fmt.Errorf("type %T is not a bucket", v)
return false
}
return fn(&b)
bs = append(bs, &b)
return true
})
// sort by id
sort.Slice(bs, func(i, j int) bool {
if descending {
return bs[i].ID.String() > bs[j].ID.String()
}
return bs[i].ID.String() < bs[j].ID.String()
})
for _, b := range bs {
if !fn(b) {
return nil
}
}
return err
}
func (s *Service) filterBuckets(ctx context.Context, fn func(b *platform.Bucket) bool) ([]*platform.Bucket, error) {
func (s *Service) filterBuckets(ctx context.Context, fn func(b *platform.Bucket) bool, opts ...platform.FindOptions) ([]*platform.Bucket, error) {
var offset, limit, count int
var descending bool
if len(opts) > 0 {
offset = opts[0].Offset
limit = opts[0].Limit
descending = opts[0].Descending
}
buckets := []*platform.Bucket{}
err := s.forEachBucket(ctx, func(b *platform.Bucket) bool {
err := s.forEachBucket(ctx, descending, func(b *platform.Bucket) bool {
if fn(b) {
if count >= offset {
buckets = append(buckets, b)
}
count += 1
}
if limit > 0 && len(buckets) >= limit {
return false
}
return true
})
@ -173,7 +207,7 @@ func (s *Service) findBuckets(ctx context.Context, filter platform.BucketFilter,
}
}
bs, err := s.filterBuckets(ctx, filterFunc)
bs, err := s.filterBuckets(ctx, filterFunc, opt...)
if err != nil {
return nil, &platform.Error{
Err: err,

View File

@ -33,11 +33,17 @@ type FindOptions struct {
// QueryParams returns a map containing url query params.
func (f FindOptions) QueryParams() map[string][]string {
qp := map[string][]string{
"limit": {strconv.Itoa(f.Limit)},
"offset": {strconv.Itoa(f.Offset)},
"sortBy": {f.SortBy},
"descending": {strconv.FormatBool(f.Descending)},
}
if f.Limit > 0 {
qp["limit"] = []string{strconv.Itoa(f.Limit)}
}
if f.SortBy != "" {
qp["sortBy"] = []string{f.SortBy}
}
return qp
}

View File

@ -512,6 +512,10 @@ func FindBuckets(
name string
organization string
organizationID platform.ID
offset int
limit int
descending bool
}
type wants struct {
@ -568,6 +572,96 @@ func FindBuckets(
},
},
},
{
name: "find all buckets by offset and limit",
fields: BucketFields{
Organizations: []*platform.Organization{
{
Name: "theorg",
ID: MustIDBase16(orgOneID),
},
},
Buckets: []*platform.Bucket{
{
ID: MustIDBase16(bucketOneID),
OrganizationID: MustIDBase16(orgOneID),
Name: "abc",
},
{
ID: MustIDBase16(bucketTwoID),
OrganizationID: MustIDBase16(orgOneID),
Name: "def",
},
{
ID: MustIDBase16(bucketThreeID),
OrganizationID: MustIDBase16(orgOneID),
Name: "xyz",
},
},
},
args: args{
offset: 1,
limit: 1,
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: MustIDBase16(bucketTwoID),
OrganizationID: MustIDBase16(orgOneID),
Organization: "theorg",
Name: "def",
},
},
},
},
{
name: "find all buckets by descending",
fields: BucketFields{
Organizations: []*platform.Organization{
{
Name: "theorg",
ID: MustIDBase16(orgOneID),
},
},
Buckets: []*platform.Bucket{
{
ID: MustIDBase16(bucketOneID),
OrganizationID: MustIDBase16(orgOneID),
Name: "abc",
},
{
ID: MustIDBase16(bucketTwoID),
OrganizationID: MustIDBase16(orgOneID),
Name: "def",
},
{
ID: MustIDBase16(bucketThreeID),
OrganizationID: MustIDBase16(orgOneID),
Name: "xyz",
},
},
},
args: args{
offset: 1,
descending: true,
},
wants: wants{
buckets: []*platform.Bucket{
{
ID: MustIDBase16(bucketTwoID),
OrganizationID: MustIDBase16(orgOneID),
Organization: "theorg",
Name: "def",
},
{
ID: MustIDBase16(bucketOneID),
OrganizationID: MustIDBase16(orgOneID),
Organization: "theorg",
Name: "abc",
},
},
},
},
{
name: "find buckets by organization name",
fields: BucketFields{
@ -744,7 +838,18 @@ func FindBuckets(
filter.Name = &tt.args.name
}
buckets, _, err := s.FindBuckets(ctx, filter)
opt := platform.FindOptions{}
if tt.args.offset > 0 {
opt.Offset = tt.args.offset
}
if tt.args.limit > 0 {
opt.Limit = tt.args.limit
}
if tt.args.descending {
opt.Descending = tt.args.descending
}
buckets, _, err := s.FindBuckets(ctx, filter, opt)
diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t)
if diff := cmp.Diff(buckets, tt.wants.buckets, bucketCmpOptions...); diff != "" {