fix: fix virtual DBRP FindMany, make virtual bucket default if not overridden (#23623)
* fix: fix find dbrps, make bucket default if not overridden * fix: allow filtering of virtual DBRPs, filter in shard mapper * fix: update tests to mock for virtual filter for shards and update server testpull/23627/head
parent
78c969e510
commit
afbbfaca87
|
@ -238,7 +238,7 @@ func (s *Service) FindByID(ctx context.Context, orgID, id platform.ID) (*influxd
|
||||||
if err != nil || b == nil {
|
if err != nil || b == nil {
|
||||||
return nil, ErrDBRPNotFound
|
return nil, ErrDBRPNotFound
|
||||||
}
|
}
|
||||||
return bucketToMapping(b), nil
|
return bucketToMapping(b, true), nil
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -356,37 +356,29 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
|
||||||
return ms, len(ms), err
|
return ms, len(ms), err
|
||||||
}
|
}
|
||||||
|
|
||||||
buckets, count, err := s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
|
// a very general search, because if we search for database name of "hello",
|
||||||
|
// the bucket name could be "hello" (with autogen rp) or "hello/foo" which we wouldn't find
|
||||||
|
buckets, _, err := s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
|
||||||
ID: filter.BucketID,
|
ID: filter.BucketID,
|
||||||
Name: filter.Database,
|
|
||||||
OrganizationID: filter.OrgID,
|
OrganizationID: filter.OrgID,
|
||||||
}, opts...)
|
}, opts...)
|
||||||
if (err != nil || count == 0) && filter.Database != nil && filter.RetentionPolicy != nil {
|
if err != nil {
|
||||||
// if the search couldn't find a corresponding dbrp, it could be that the bucket name has a slash (like db/rp)
|
// we were unable to find any virtual mappings, so return what physical mappings we have
|
||||||
// instead of just bucket name being the database with "autogen" retention policy
|
return ms, len(ms), nil
|
||||||
bucketName := *filter.Database + "/" + *filter.RetentionPolicy
|
|
||||||
buckets, _, err = s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
|
|
||||||
ID: filter.BucketID,
|
|
||||||
Name: &bucketName,
|
|
||||||
OrganizationID: filter.OrgID,
|
|
||||||
}, opts...)
|
|
||||||
if err != nil {
|
|
||||||
// we were unable to find any virtual mappings, so return what physical mappings we have
|
|
||||||
return ms, len(ms), nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
OUTER:
|
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
// check if this virtual mapping has been overriden by a custom, physical mapping
|
|
||||||
for _, m := range ms {
|
|
||||||
if m.BucketID == bucket.ID {
|
|
||||||
continue OUTER
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
mapping := bucketToMapping(bucket)
|
isDefault := true
|
||||||
|
// check if this virtual mapping has been overriden by a custom, physical mapping
|
||||||
|
for _, m := range ms {
|
||||||
|
if m.BucketID == bucket.ID {
|
||||||
|
isDefault = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mapping := bucketToMapping(bucket, isDefault)
|
||||||
if filterFunc(mapping, filter) {
|
if filterFunc(mapping, filter) {
|
||||||
ms = append(ms, mapping)
|
ms = append(ms, mapping)
|
||||||
}
|
}
|
||||||
|
@ -395,7 +387,7 @@ OUTER:
|
||||||
return ms, len(ms), nil
|
return ms, len(ms), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bucketToMapping(bucket *influxdb.Bucket) *influxdb.DBRPMapping {
|
func bucketToMapping(bucket *influxdb.Bucket, isDefault bool) *influxdb.DBRPMapping {
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -404,7 +396,7 @@ func bucketToMapping(bucket *influxdb.Bucket) *influxdb.DBRPMapping {
|
||||||
db, rp := parseDBRP(bucket.Name)
|
db, rp := parseDBRP(bucket.Name)
|
||||||
return &influxdb.DBRPMapping{
|
return &influxdb.DBRPMapping{
|
||||||
ID: dbrpID,
|
ID: dbrpID,
|
||||||
Default: false,
|
Default: isDefault,
|
||||||
Database: db,
|
Database: db,
|
||||||
RetentionPolicy: rp,
|
RetentionPolicy: rp,
|
||||||
OrganizationID: bucket.OrgID,
|
OrganizationID: bucket.OrgID,
|
||||||
|
@ -604,5 +596,7 @@ func filterFunc(dbrp *influxdb.DBRPMapping, filter influxdb.DBRPMappingFilter) b
|
||||||
(filter.BucketID == nil || (*filter.BucketID) == dbrp.BucketID) &&
|
(filter.BucketID == nil || (*filter.BucketID) == dbrp.BucketID) &&
|
||||||
(filter.Database == nil || (*filter.Database) == dbrp.Database) &&
|
(filter.Database == nil || (*filter.Database) == dbrp.Database) &&
|
||||||
(filter.RetentionPolicy == nil || (*filter.RetentionPolicy) == dbrp.RetentionPolicy) &&
|
(filter.RetentionPolicy == nil || (*filter.RetentionPolicy) == dbrp.RetentionPolicy) &&
|
||||||
(filter.Default == nil || (*filter.Default) == dbrp.Default)
|
(filter.Default == nil || (*filter.Default) == dbrp.Default) &&
|
||||||
|
(filter.Virtual == nil || (*filter.Virtual) == dbrp.Virtual)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,6 +101,7 @@ type DBRPMappingFilter struct {
|
||||||
Database *string
|
Database *string
|
||||||
RetentionPolicy *string
|
RetentionPolicy *string
|
||||||
Default *bool
|
Default *bool
|
||||||
|
Virtual *bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f DBRPMappingFilter) String() string {
|
func (f DBRPMappingFilter) String() string {
|
||||||
|
|
|
@ -4899,7 +4899,7 @@ func TestServer_Query_ShowMeasurements(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: `show measurements on all dbs and rps`,
|
name: `show measurements on all dbs and rps`,
|
||||||
command: "SHOW MEASUREMENTS on *.*",
|
command: "SHOW MEASUREMENTS on *.*",
|
||||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name","database","retention policy"],"values":[["cpu","db0","rp0"],["gpu","db0","rp0"],["other","db0","rp0"],["other2","db0","rp1"],["cpu","db1","rp0"],["disk","db1","rp0"]]}]}]}`,
|
exp: `{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name","database","retention policy"],"values":[["other2","b2","autogen"],["cpu","b3","autogen"],["disk","b3","autogen"],["cpu","db","rp"],["gpu","db","rp"],["other","db","rp"],["cpu","db0","rp0"],["gpu","db0","rp0"],["other","db0","rp0"],["other2","db0","rp1"],["cpu","db1","rp0"],["disk","db1","rp0"]]}]}]}`,
|
||||||
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp0"}},
|
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp0"}},
|
||||||
},
|
},
|
||||||
}...)
|
}...)
|
||||||
|
|
|
@ -660,7 +660,7 @@ func FindManyDBRPMappingsV2(
|
||||||
ID: 100,
|
ID: 100,
|
||||||
Database: "testdb",
|
Database: "testdb",
|
||||||
RetentionPolicy: "autogen",
|
RetentionPolicy: "autogen",
|
||||||
Default: false,
|
Default: true,
|
||||||
Virtual: true,
|
Virtual: true,
|
||||||
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
||||||
BucketID: 100,
|
BucketID: 100,
|
||||||
|
@ -669,7 +669,7 @@ func FindManyDBRPMappingsV2(
|
||||||
ID: 200,
|
ID: 200,
|
||||||
Database: "testdb2",
|
Database: "testdb2",
|
||||||
RetentionPolicy: "testrp2",
|
RetentionPolicy: "testrp2",
|
||||||
Default: false,
|
Default: true,
|
||||||
Virtual: true,
|
Virtual: true,
|
||||||
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
||||||
BucketID: 200,
|
BucketID: 200,
|
||||||
|
@ -677,6 +677,94 @@ func FindManyDBRPMappingsV2(
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "find virtual by database",
|
||||||
|
fields: DBRPMappingFields{
|
||||||
|
BucketSvc: &mock.BucketService{
|
||||||
|
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
|
||||||
|
if id == MustIDBase16(dbrpBucket2ID) {
|
||||||
|
return nil, &errors2.Error{
|
||||||
|
Code: errors2.ENotFound,
|
||||||
|
Msg: "bucket not found",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||||
|
return []*influxdb.Bucket{
|
||||||
|
// org 3
|
||||||
|
{ID: 100, Name: "testdb", OrgID: MustIDBase16(dbrpOrg3ID)},
|
||||||
|
{ID: 200, Name: "testdb2/testrp2", OrgID: MustIDBase16(dbrpOrg3ID)},
|
||||||
|
// org 2
|
||||||
|
{ID: 300, Name: "testdb3", OrgID: MustIDBase16(dbrpOrg2ID)},
|
||||||
|
{ID: 400, Name: "testdb4/testrp4", OrgID: MustIDBase16(dbrpOrg2ID)},
|
||||||
|
}, 0, nil
|
||||||
|
}},
|
||||||
|
DBRPMappingsV2: []*influxdb.DBRPMapping{},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
filter: influxdb.DBRPMappingFilter{
|
||||||
|
Database: strPtr("testdb4"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wants: wants{
|
||||||
|
dbrpMappings: []*influxdb.DBRPMapping{
|
||||||
|
{
|
||||||
|
ID: 400,
|
||||||
|
Database: "testdb4",
|
||||||
|
RetentionPolicy: "testrp4",
|
||||||
|
Default: true,
|
||||||
|
Virtual: true,
|
||||||
|
OrganizationID: MustIDBase16(dbrpOrg2ID),
|
||||||
|
BucketID: 400,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "find virtual by database autogen",
|
||||||
|
fields: DBRPMappingFields{
|
||||||
|
BucketSvc: &mock.BucketService{
|
||||||
|
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
|
||||||
|
if id == MustIDBase16(dbrpBucket2ID) {
|
||||||
|
return nil, &errors2.Error{
|
||||||
|
Code: errors2.ENotFound,
|
||||||
|
Msg: "bucket not found",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||||
|
return []*influxdb.Bucket{
|
||||||
|
// org 3
|
||||||
|
{ID: 100, Name: "testdb", OrgID: MustIDBase16(dbrpOrg3ID)},
|
||||||
|
{ID: 200, Name: "testdb2/testrp2", OrgID: MustIDBase16(dbrpOrg3ID)},
|
||||||
|
// org 2
|
||||||
|
{ID: 300, Name: "testdb3", OrgID: MustIDBase16(dbrpOrg2ID)},
|
||||||
|
{ID: 400, Name: "testdb4/testrp4", OrgID: MustIDBase16(dbrpOrg2ID)},
|
||||||
|
}, 0, nil
|
||||||
|
}},
|
||||||
|
DBRPMappingsV2: []*influxdb.DBRPMapping{},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
filter: influxdb.DBRPMappingFilter{
|
||||||
|
Database: strPtr("testdb"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wants: wants{
|
||||||
|
dbrpMappings: []*influxdb.DBRPMapping{
|
||||||
|
{
|
||||||
|
ID: 100,
|
||||||
|
Database: "testdb",
|
||||||
|
RetentionPolicy: "autogen",
|
||||||
|
Default: true,
|
||||||
|
Virtual: true,
|
||||||
|
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
||||||
|
BucketID: 100,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "find virtual by rp",
|
name: "find virtual by rp",
|
||||||
fields: DBRPMappingFields{
|
fields: DBRPMappingFields{
|
||||||
|
@ -713,7 +801,7 @@ func FindManyDBRPMappingsV2(
|
||||||
ID: 100,
|
ID: 100,
|
||||||
Database: "testdb",
|
Database: "testdb",
|
||||||
RetentionPolicy: "autogen",
|
RetentionPolicy: "autogen",
|
||||||
Default: false,
|
Default: true,
|
||||||
Virtual: true,
|
Virtual: true,
|
||||||
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
||||||
BucketID: 100,
|
BucketID: 100,
|
||||||
|
@ -722,7 +810,7 @@ func FindManyDBRPMappingsV2(
|
||||||
ID: 300,
|
ID: 300,
|
||||||
Database: "testdb3",
|
Database: "testdb3",
|
||||||
RetentionPolicy: "autogen",
|
RetentionPolicy: "autogen",
|
||||||
Default: false,
|
Default: true,
|
||||||
Virtual: true,
|
Virtual: true,
|
||||||
OrganizationID: MustIDBase16(dbrpOrg2ID),
|
OrganizationID: MustIDBase16(dbrpOrg2ID),
|
||||||
BucketID: 300,
|
BucketID: 300,
|
||||||
|
@ -1156,7 +1244,7 @@ func FindDBRPMappingByIDV2(
|
||||||
ID: MustIDBase16(dbrpBucketAID),
|
ID: MustIDBase16(dbrpBucketAID),
|
||||||
Database: "testdb",
|
Database: "testdb",
|
||||||
RetentionPolicy: "testrp",
|
RetentionPolicy: "testrp",
|
||||||
Default: false,
|
Default: true,
|
||||||
Virtual: true,
|
Virtual: true,
|
||||||
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
||||||
BucketID: MustIDBase16(dbrpBucketAID),
|
BucketID: MustIDBase16(dbrpBucketAID),
|
||||||
|
@ -1190,7 +1278,7 @@ func FindDBRPMappingByIDV2(
|
||||||
ID: MustIDBase16(dbrpBucketAID),
|
ID: MustIDBase16(dbrpBucketAID),
|
||||||
Database: "testdb",
|
Database: "testdb",
|
||||||
RetentionPolicy: "autogen",
|
RetentionPolicy: "autogen",
|
||||||
Default: false,
|
Default: true,
|
||||||
Virtual: true,
|
Virtual: true,
|
||||||
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
OrganizationID: MustIDBase16(dbrpOrg3ID),
|
||||||
BucketID: MustIDBase16(dbrpBucketAID),
|
BucketID: MustIDBase16(dbrpBucketAID),
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
"github.com/influxdata/influxdb/v2"
|
"github.com/influxdata/influxdb/v2"
|
||||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||||
|
@ -66,6 +67,7 @@ func (e *LocalShardMapper) mapShards(ctx context.Context, a *LocalShardMapping,
|
||||||
OrgID: &orgID,
|
OrgID: &orgID,
|
||||||
Database: &s.Database,
|
Database: &s.Database,
|
||||||
RetentionPolicy: &s.RetentionPolicy,
|
RetentionPolicy: &s.RetentionPolicy,
|
||||||
|
Virtual: api.PtrBool(false),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("finding DBRP mappings: %v", err)
|
return fmt.Errorf("finding DBRP mappings: %v", err)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
"github.com/influxdata/influxdb/v2"
|
"github.com/influxdata/influxdb/v2"
|
||||||
"github.com/influxdata/influxdb/v2/dbrp/mocks"
|
"github.com/influxdata/influxdb/v2/dbrp/mocks"
|
||||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||||
|
@ -27,7 +28,7 @@ func TestLocalShardMapper(t *testing.T) {
|
||||||
bucketID := platform.ID(0xffee)
|
bucketID := platform.ID(0xffee)
|
||||||
db := "db0"
|
db := "db0"
|
||||||
rp := "rp0"
|
rp := "rp0"
|
||||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: &rp}
|
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: &rp, Virtual: api.PtrBool(false)}
|
||||||
res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: rp, OrganizationID: orgID, BucketID: bucketID}}
|
res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: rp, OrganizationID: orgID, BucketID: bucketID}}
|
||||||
dbrp.EXPECT().
|
dbrp.EXPECT().
|
||||||
FindMany(gomock.Any(), filt).
|
FindMany(gomock.Any(), filt).
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
"github.com/influxdata/influxdb/v2"
|
"github.com/influxdata/influxdb/v2"
|
||||||
icontext "github.com/influxdata/influxdb/v2/context"
|
icontext "github.com/influxdata/influxdb/v2/context"
|
||||||
"github.com/influxdata/influxdb/v2/dbrp/mocks"
|
"github.com/influxdata/influxdb/v2/dbrp/mocks"
|
||||||
|
@ -45,7 +46,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
|
||||||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||||
orgID := platform.ID(0xff00)
|
orgID := platform.ID(0xff00)
|
||||||
empty := ""
|
empty := ""
|
||||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty}
|
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)}
|
||||||
res := []*influxdb.DBRPMapping{{}}
|
res := []*influxdb.DBRPMapping{{}}
|
||||||
dbrp.EXPECT().
|
dbrp.EXPECT().
|
||||||
FindMany(gomock.Any(), filt).
|
FindMany(gomock.Any(), filt).
|
||||||
|
@ -111,7 +112,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
|
||||||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||||
orgID := platform.ID(0xff00)
|
orgID := platform.ID(0xff00)
|
||||||
empty := ""
|
empty := ""
|
||||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty}
|
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)}
|
||||||
res := []*influxdb.DBRPMapping{{}}
|
res := []*influxdb.DBRPMapping{{}}
|
||||||
dbrp.EXPECT().
|
dbrp.EXPECT().
|
||||||
FindMany(gomock.Any(), filt).
|
FindMany(gomock.Any(), filt).
|
||||||
|
|
Loading…
Reference in New Issue