fix(query): paginate bucket results in the bucket lookup (#19111)
The `buckets()` command would use a bucket lookup that wrapped the `FindBuckets` API. It did not use the pagination aspect of this API correctly. When the underlying implementation was changed to a version that correctly implemented pagination, this broke the query `buckets()` command. Since it was query that used the API incorrectly rather than a regression in the `FindBuckets` implementation, this fixes the usage to correctly use pagination.pull/19138/head
parent
e7310b414c
commit
0cdbd496f4
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/execute/executetest"
|
||||
"github.com/influxdata/flux/execute/table"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/runtime"
|
||||
"github.com/influxdata/flux/values"
|
||||
|
@ -2276,3 +2277,59 @@ from(bucket: v.bucket)
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLauncher_Query_Buckets_MultiplePages(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
// Create a large number of buckets. This is above the default
|
||||
// page size of 20.
|
||||
for i := 0; i < 50; i++ {
|
||||
b := &influxdb.Bucket{
|
||||
OrgID: l.Org.ID,
|
||||
Name: fmt.Sprintf("b%02d", i),
|
||||
}
|
||||
if err := l.BucketService(t).CreateBucket(ctx, b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(`
|
||||
#datatype,string,long,string
|
||||
#group,false,false,false
|
||||
#default,_result,,
|
||||
,result,table,name
|
||||
,,0,BUCKET
|
||||
,,0,_monitoring
|
||||
,,0,_tasks
|
||||
`)
|
||||
for i := 0; i < 50; i++ {
|
||||
_, _ = fmt.Fprintf(&sb, ",,0,b%02d\n", i)
|
||||
}
|
||||
data := sb.String()
|
||||
|
||||
bucketsQuery := `
|
||||
buckets()
|
||||
|> keep(columns: ["name"])
|
||||
|> sort(columns: ["name"])
|
||||
`
|
||||
res := l.MustExecuteQuery(bucketsQuery)
|
||||
defer res.Done()
|
||||
|
||||
firstResult := func(ri flux.ResultIterator) flux.Result {
|
||||
ri.More()
|
||||
return ri.Next()
|
||||
}
|
||||
got := firstResult(flux.NewSliceResultIterator(res.Results))
|
||||
|
||||
want, err := csv.NewResultDecoder(csv.ResultDecoderConfig{}).Decode(strings.NewReader(data))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if diff := table.Diff(want.Tables(), got.Tables()); diff != "" {
|
||||
t.Fatalf("unexpected output -want/+got:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,41 +5,38 @@ import (
|
|||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/codes"
|
||||
platform "github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
)
|
||||
|
||||
// FromBucketService wraps an platform.BucketService in the BucketLookup interface.
|
||||
func FromBucketService(srv platform.BucketService) *BucketLookup {
|
||||
// FromBucketService wraps an influxdb.BucketService in the BucketLookup interface.
|
||||
func FromBucketService(srv influxdb.BucketService) *BucketLookup {
|
||||
return &BucketLookup{
|
||||
BucketService: srv,
|
||||
}
|
||||
}
|
||||
|
||||
// BucketLookup converts Flux bucket lookups into platform.BucketService calls.
|
||||
// BucketLookup converts Flux bucket lookups into influxdb.BucketService calls.
|
||||
type BucketLookup struct {
|
||||
BucketService platform.BucketService
|
||||
BucketService influxdb.BucketService
|
||||
}
|
||||
|
||||
// Lookup returns the bucket id and its existence given an org id and bucket name.
|
||||
func (b *BucketLookup) Lookup(ctx context.Context, orgID platform.ID, name string) (platform.ID, bool) {
|
||||
oid := platform.ID(orgID)
|
||||
filter := platform.BucketFilter{
|
||||
OrganizationID: &oid,
|
||||
func (b *BucketLookup) Lookup(ctx context.Context, orgID influxdb.ID, name string) (influxdb.ID, bool) {
|
||||
filter := influxdb.BucketFilter{
|
||||
OrganizationID: &orgID,
|
||||
Name: &name,
|
||||
}
|
||||
bucket, err := b.BucketService.FindBucket(ctx, filter)
|
||||
if err != nil {
|
||||
return platform.InvalidID(), false
|
||||
return influxdb.InvalidID(), false
|
||||
}
|
||||
return bucket.ID, true
|
||||
}
|
||||
|
||||
// LookupName returns an bucket name given its organization ID and its bucket ID.
|
||||
func (b *BucketLookup) LookupName(ctx context.Context, orgID platform.ID, id platform.ID) string {
|
||||
oid := platform.ID(orgID)
|
||||
id = platform.ID(id)
|
||||
filter := platform.BucketFilter{
|
||||
OrganizationID: &oid,
|
||||
func (b *BucketLookup) LookupName(ctx context.Context, orgID influxdb.ID, id influxdb.ID) string {
|
||||
filter := influxdb.BucketFilter{
|
||||
OrganizationID: &orgID,
|
||||
ID: &id,
|
||||
}
|
||||
bucket, err := b.BucketService.FindBucket(ctx, filter)
|
||||
|
@ -49,48 +46,55 @@ func (b *BucketLookup) LookupName(ctx context.Context, orgID platform.ID, id pla
|
|||
return bucket.Name
|
||||
}
|
||||
|
||||
func (b *BucketLookup) FindAllBuckets(ctx context.Context, orgID platform.ID) ([]*platform.Bucket, int) {
|
||||
oid := platform.ID(orgID)
|
||||
filter := platform.BucketFilter{
|
||||
OrganizationID: &oid,
|
||||
func (b *BucketLookup) FindAllBuckets(ctx context.Context, orgID influxdb.ID) ([]*influxdb.Bucket, int) {
|
||||
filter := influxdb.BucketFilter{
|
||||
OrganizationID: &orgID,
|
||||
}
|
||||
buckets, count, err := b.BucketService.FindBuckets(ctx, filter)
|
||||
if err != nil {
|
||||
return nil, count
|
||||
}
|
||||
return buckets, count
|
||||
|
||||
var allBuckets []*influxdb.Bucket
|
||||
opt := influxdb.FindOptions{Limit: 20}
|
||||
for ; ; opt.Offset += opt.Limit {
|
||||
buckets, _, err := b.BucketService.FindBuckets(ctx, filter, opt)
|
||||
if err != nil {
|
||||
return nil, len(buckets)
|
||||
}
|
||||
allBuckets = append(allBuckets, buckets...)
|
||||
if len(buckets) < opt.Limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
return allBuckets, len(allBuckets)
|
||||
}
|
||||
|
||||
// FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface.
|
||||
func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup {
|
||||
// FromOrganizationService wraps a influxdb.OrganizationService in the OrganizationLookup interface.
|
||||
func FromOrganizationService(srv influxdb.OrganizationService) *OrganizationLookup {
|
||||
return &OrganizationLookup{OrganizationService: srv}
|
||||
}
|
||||
|
||||
// OrganizationLookup converts organization name lookups into platform.OrganizationService calls.
|
||||
// OrganizationLookup converts organization name lookups into influxdb.OrganizationService calls.
|
||||
type OrganizationLookup struct {
|
||||
OrganizationService platform.OrganizationService
|
||||
OrganizationService influxdb.OrganizationService
|
||||
}
|
||||
|
||||
// Lookup returns the organization ID and its existence given an organization name.
|
||||
func (o *OrganizationLookup) Lookup(ctx context.Context, name string) (platform.ID, bool) {
|
||||
func (o *OrganizationLookup) Lookup(ctx context.Context, name string) (influxdb.ID, bool) {
|
||||
org, err := o.OrganizationService.FindOrganization(
|
||||
ctx,
|
||||
platform.OrganizationFilter{Name: &name},
|
||||
influxdb.OrganizationFilter{Name: &name},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return platform.InvalidID(), false
|
||||
return influxdb.InvalidID(), false
|
||||
}
|
||||
return org.ID, true
|
||||
}
|
||||
|
||||
// LookupName returns an organization name given its ID.
|
||||
func (o *OrganizationLookup) LookupName(ctx context.Context, id platform.ID) string {
|
||||
id = platform.ID(id)
|
||||
func (o *OrganizationLookup) LookupName(ctx context.Context, id influxdb.ID) string {
|
||||
id = influxdb.ID(id)
|
||||
org, err := o.OrganizationService.FindOrganization(
|
||||
ctx,
|
||||
platform.OrganizationFilter{
|
||||
influxdb.OrganizationFilter{
|
||||
ID: &id,
|
||||
},
|
||||
)
|
||||
|
@ -101,14 +105,14 @@ func (o *OrganizationLookup) LookupName(ctx context.Context, id platform.ID) str
|
|||
return org.Name
|
||||
}
|
||||
|
||||
// SecretLookup wraps the platform.SecretService to perform lookups based on the organization
|
||||
// SecretLookup wraps the influxdb.SecretService to perform lookups based on the organization
|
||||
// in the context.
|
||||
type SecretLookup struct {
|
||||
SecretService platform.SecretService
|
||||
SecretService influxdb.SecretService
|
||||
}
|
||||
|
||||
// FromSecretService wraps a platform.OrganizationService in the OrganizationLookup interface.
|
||||
func FromSecretService(srv platform.SecretService) *SecretLookup {
|
||||
// FromSecretService wraps a influxdb.OrganizationService in the OrganizationLookup interface.
|
||||
func FromSecretService(srv influxdb.SecretService) *SecretLookup {
|
||||
return &SecretLookup{SecretService: srv}
|
||||
}
|
||||
|
||||
|
|
|
@ -140,6 +140,7 @@ func CreateBucket(
|
|||
fields BucketFields
|
||||
args args
|
||||
wants wants
|
||||
skip string
|
||||
}{
|
||||
{
|
||||
name: "create buckets with empty set",
|
||||
|
@ -319,6 +320,7 @@ func CreateBucket(
|
|||
},
|
||||
},
|
||||
},
|
||||
skip: "flaky test: https://github.com/influxdata/influxdb/issues/19109",
|
||||
},
|
||||
{
|
||||
name: "create bucket with orgID not exist",
|
||||
|
@ -348,6 +350,10 @@ func CreateBucket(
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.skip != "" {
|
||||
t.Skip(tt.skip)
|
||||
}
|
||||
|
||||
s, opPrefix, done := init(tt.fields, t)
|
||||
defer done()
|
||||
ctx := context.Background()
|
||||
|
|
Loading…
Reference in New Issue