From ca332b59de8c3630ab7439065607249cf551477b Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Sun, 30 Dec 2018 18:07:11 +0800 Subject: [PATCH] filter buckets by FindOptions --- bolt/bucket.go | 46 +++++++++++++--- http/bucket_service.go | 8 +++ inmem/bucket_service.go | 46 +++++++++++++--- paging.go | 10 +++- testing/bucket_service.go | 107 +++++++++++++++++++++++++++++++++++++- 5 files changed, 200 insertions(+), 17 deletions(-) diff --git a/bolt/bucket.go b/bolt/bucket.go index 636acd3e43..f5d393b528 100644 --- a/bolt/bucket.go +++ b/bolt/bucket.go @@ -179,7 +179,7 @@ func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) ( } filterFn := filterBucketsFn(filter) - return c.forEachBucket(ctx, tx, func(bkt *platform.Bucket) bool { + return c.forEachBucket(ctx, tx, false, func(bkt *platform.Bucket) bool { if filterFn(bkt) { b = bkt return false @@ -236,7 +236,7 @@ func filterBucketsFn(filter platform.BucketFilter) func(b *platform.Bucket) bool // FindBuckets retrives all buckets that match an arbitrary bucket filter. // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across all buckets searching for a match. -func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) { +func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error) { if filter.ID != nil { b, err := c.FindBucketByID(ctx, *filter.ID) if err != nil { @@ -257,7 +257,7 @@ func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, bs := []*platform.Bucket{} err := c.db.View(func(tx *bolt.Tx) error { - bkts, err := c.findBuckets(ctx, tx, filter) + bkts, err := c.findBuckets(ctx, tx, filter, opts...) if err != nil { return err } @@ -272,7 +272,7 @@ func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, return bs, len(bs), nil } -func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter) ([]*platform.Bucket, *platform.Error) { +func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) { bs := []*platform.Bucket{} if filter.Organization != nil { o, err := c.findOrganizationByName(ctx, tx, *filter.Organization) @@ -284,11 +284,27 @@ func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.B filter.OrganizationID = &o.ID } + var offset, limit, count int + var descending bool + if len(opts) > 0 { + offset = opts[0].Offset + limit = opts[0].Limit + descending = opts[0].Descending + } + filterFn := filterBucketsFn(filter) - err := c.forEachBucket(ctx, tx, func(b *platform.Bucket) bool { + err := c.forEachBucket(ctx, tx, descending, func(b *platform.Bucket) bool { if filterFn(b) { - bs = append(bs, b) + if count >= offset { + bs = append(bs, b) + } + count += 1 } + + if limit > 0 && len(bs) >= limit { + return false + } + return true }) @@ -445,9 +461,17 @@ func bucketIndexKey(b *platform.Bucket) ([]byte, *platform.Error) { } // forEachBucket will iterate through all buckets while fn returns true. -func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, fn func(*platform.Bucket) bool) error { +func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, descending bool, fn func(*platform.Bucket) bool) error { cur := tx.Bucket(bucketBucket).Cursor() - for k, v := cur.First(); k != nil; k, v = cur.Next() { + + var k, v []byte + if descending { + k, v = cur.Last() + } else { + k, v = cur.First() + } + + for k != nil { b := &platform.Bucket{} if err := json.Unmarshal(v, b); err != nil { return err @@ -458,6 +482,12 @@ func (c *Client) forEachBucket(ctx context.Context, tx *bolt.Tx, fn func(*platfo if !fn(b) { break } + + if descending { + k, v = cur.Prev() + } else { + k, v = cur.Next() + } } return nil diff --git a/http/bucket_service.go b/http/bucket_service.go index 5947ce284d..d2c9949c63 100644 --- a/http/bucket_service.go +++ b/http/bucket_service.go @@ -572,6 +572,14 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketF query.Add("name", *filter.Name) } + if len(opt) > 0 { + for k, vs := range opt[0].QueryParams() { + for _, v := range vs { + query.Add(k, v) + } + } + } + req, err := http.NewRequest("GET", u.String(), nil) if err != nil { return nil, 0, err diff --git a/inmem/bucket_service.go b/inmem/bucket_service.go index 986cbae502..1051395398 100644 --- a/inmem/bucket_service.go +++ b/inmem/bucket_service.go @@ -3,6 +3,7 @@ package inmem import ( "context" "fmt" + "sort" platform "github.com/influxdata/influxdb" ) @@ -60,26 +61,59 @@ func (s *Service) FindBucketByID(ctx context.Context, id platform.ID) (*platform return b, err } -func (s *Service) forEachBucket(ctx context.Context, fn func(b *platform.Bucket) bool) error { +func (s *Service) forEachBucket(ctx context.Context, descending bool, fn func(b *platform.Bucket) bool) error { var err error + bs := make([]*platform.Bucket, 0) s.bucketKV.Range(func(k, v interface{}) bool { b, ok := v.(platform.Bucket) if !ok { err = fmt.Errorf("type %T is not a bucket", v) return false } - return fn(&b) + + bs = append(bs, &b) + return true }) + // sort by id + sort.Slice(bs, func(i, j int) bool { + if descending { + return bs[i].ID.String() > bs[j].ID.String() + } + return bs[i].ID.String() < bs[j].ID.String() + }) + + for _, b := range bs { + if !fn(b) { + return nil + } + } + return err } -func (s *Service) filterBuckets(ctx context.Context, fn func(b *platform.Bucket) bool) ([]*platform.Bucket, error) { +func (s *Service) filterBuckets(ctx context.Context, fn func(b *platform.Bucket) bool, opts ...platform.FindOptions) ([]*platform.Bucket, error) { + var offset, limit, count int + var descending bool + if len(opts) > 0 { + offset = opts[0].Offset + limit = opts[0].Limit + descending = opts[0].Descending + } + buckets := []*platform.Bucket{} - err := s.forEachBucket(ctx, func(b *platform.Bucket) bool { + err := s.forEachBucket(ctx, descending, func(b *platform.Bucket) bool { if fn(b) { - buckets = append(buckets, b) + if count >= offset { + buckets = append(buckets, b) + } + count += 1 } + + if limit > 0 && len(buckets) >= limit { + return false + } + return true }) @@ -173,7 +207,7 @@ func (s *Service) findBuckets(ctx context.Context, filter platform.BucketFilter, } } - bs, err := s.filterBuckets(ctx, filterFunc) + bs, err := s.filterBuckets(ctx, filterFunc, opt...) if err != nil { return nil, &platform.Error{ Err: err, diff --git a/paging.go b/paging.go index e3635cb23d..6aa9371135 100644 --- a/paging.go +++ b/paging.go @@ -33,11 +33,17 @@ type FindOptions struct { // QueryParams returns a map containing url query params. func (f FindOptions) QueryParams() map[string][]string { qp := map[string][]string{ - "limit": {strconv.Itoa(f.Limit)}, "offset": {strconv.Itoa(f.Offset)}, - "sortBy": {f.SortBy}, "descending": {strconv.FormatBool(f.Descending)}, } + if f.Limit > 0 { + qp["limit"] = []string{strconv.Itoa(f.Limit)} + } + + if f.SortBy != "" { + qp["sortBy"] = []string{f.SortBy} + } + return qp } diff --git a/testing/bucket_service.go b/testing/bucket_service.go index e93f2e2b57..f9a283f412 100644 --- a/testing/bucket_service.go +++ b/testing/bucket_service.go @@ -512,6 +512,10 @@ func FindBuckets( name string organization string organizationID platform.ID + + offset int + limit int + descending bool } type wants struct { @@ -568,6 +572,96 @@ func FindBuckets( }, }, }, + { + name: "find all buckets by offset and limit", + fields: BucketFields{ + Organizations: []*platform.Organization{ + { + Name: "theorg", + ID: MustIDBase16(orgOneID), + }, + }, + Buckets: []*platform.Bucket{ + { + ID: MustIDBase16(bucketOneID), + OrganizationID: MustIDBase16(orgOneID), + Name: "abc", + }, + { + ID: MustIDBase16(bucketTwoID), + OrganizationID: MustIDBase16(orgOneID), + Name: "def", + }, + { + ID: MustIDBase16(bucketThreeID), + OrganizationID: MustIDBase16(orgOneID), + Name: "xyz", + }, + }, + }, + args: args{ + offset: 1, + limit: 1, + }, + wants: wants{ + buckets: []*platform.Bucket{ + { + ID: MustIDBase16(bucketTwoID), + OrganizationID: MustIDBase16(orgOneID), + Organization: "theorg", + Name: "def", + }, + }, + }, + }, + { + name: "find all buckets by descending", + fields: BucketFields{ + Organizations: []*platform.Organization{ + { + Name: "theorg", + ID: MustIDBase16(orgOneID), + }, + }, + Buckets: []*platform.Bucket{ + { + ID: MustIDBase16(bucketOneID), + OrganizationID: MustIDBase16(orgOneID), + Name: "abc", + }, + { + ID: MustIDBase16(bucketTwoID), + OrganizationID: MustIDBase16(orgOneID), + Name: "def", + }, + { + ID: MustIDBase16(bucketThreeID), + OrganizationID: MustIDBase16(orgOneID), + Name: "xyz", + }, + }, + }, + args: args{ + offset: 1, + descending: true, + }, + wants: wants{ + buckets: []*platform.Bucket{ + { + ID: MustIDBase16(bucketTwoID), + OrganizationID: MustIDBase16(orgOneID), + Organization: "theorg", + Name: "def", + }, + { + ID: MustIDBase16(bucketOneID), + OrganizationID: MustIDBase16(orgOneID), + Organization: "theorg", + Name: "abc", + }, + }, + }, + }, { name: "find buckets by organization name", fields: BucketFields{ @@ -744,7 +838,18 @@ func FindBuckets( filter.Name = &tt.args.name } - buckets, _, err := s.FindBuckets(ctx, filter) + opt := platform.FindOptions{} + if tt.args.offset > 0 { + opt.Offset = tt.args.offset + } + if tt.args.limit > 0 { + opt.Limit = tt.args.limit + } + if tt.args.descending { + opt.Descending = tt.args.descending + } + + buckets, _, err := s.FindBuckets(ctx, filter, opt) diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t) if diff := cmp.Diff(buckets, tt.wants.buckets, bucketCmpOptions...); diff != "" {