feat: add virtual DBRP mappings based on bucket name (#23606)

* feat: add virtual DBRP mapping based on bucket name

* fix: improve error handling logic

* fix: update physical dbrp tests

* fix: update influxql tests with auto-mapped buckets

* fix: add virtual filtering and testing for virtual dbrps
pull/23524/head
Andrew Lee 2022-08-03 15:44:04 -06:00 committed by GitHub
parent f7b1905ed7
commit adeac8beea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 301 additions and 6 deletions

View File

@ -489,7 +489,7 @@ func Test_handleDeleteDBRP(t *testing.T) {
BucketID: influxdbtesting.MustIDBase16("5555f7ed2a035555"), BucketID: influxdbtesting.MustIDBase16("5555f7ed2a035555"),
OrganizationID: influxdbtesting.MustIDBase16("059af7ed2a034000"), OrganizationID: influxdbtesting.MustIDBase16("059af7ed2a034000"),
Database: "mydb", Database: "mydb",
RetentionPolicy: "autogen", RetentionPolicy: "testrp",
Default: true, Default: true,
} }
if err := svc.Create(ctx, d); err != nil { if err := svc.Create(ctx, d); err != nil {

View File

@ -28,6 +28,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"strings"
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform"
@ -231,11 +232,28 @@ func (s *Service) FindByID(ctx context.Context, orgID, id platform.ID) (*influxd
} }
return nil return nil
}); err != nil { }); err != nil {
// if not found, fallback to virtual DBRP search
if err == ErrDBRPNotFound {
b, err := s.bucketSvc.FindBucketByID(ctx, id)
if err != nil || b == nil {
return nil, ErrDBRPNotFound
}
return bucketToMapping(b), nil
}
return nil, err return nil, err
} }
return m, nil return m, nil
} }
// parseDBRP parses DB and RP strings out of a bucket name
func parseDBRP(bucketName string) (string, string) {
db, rp, isCut := strings.Cut(bucketName, "/")
if isCut {
return db, rp
}
return bucketName, "autogen"
}
// FindMany returns a list of mappings that match filter and the total count of matching dbrp mappings. // FindMany returns a list of mappings that match filter and the total count of matching dbrp mappings.
// TODO(affo): find a smart way to apply FindOptions to a list of items. // TODO(affo): find a smart way to apply FindOptions to a list of items.
func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilter, opts ...influxdb.FindOptions) ([]*influxdb.DBRPMapping, int, error) { func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilter, opts ...influxdb.FindOptions) ([]*influxdb.DBRPMapping, int, error) {
@ -334,8 +352,65 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
} }
return nil return nil
}) })
if err != nil {
return ms, len(ms), err
}
return ms, len(ms), err buckets, count, err := s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
ID: filter.BucketID,
Name: filter.Database,
OrganizationID: filter.OrgID,
}, opts...)
if (err != nil || count == 0) && filter.Database != nil && filter.RetentionPolicy != nil {
// if the search couldn't find a corresponding dbrp, it could be that the bucket name has a slash (like db/rp)
// instead of just bucket name being the database with "autogen" retention policy
bucketName := *filter.Database + "/" + *filter.RetentionPolicy
buckets, _, err = s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
ID: filter.BucketID,
Name: &bucketName,
OrganizationID: filter.OrgID,
}, opts...)
if err != nil {
// we were unable to find any virtual mappings, so return what physical mappings we have
return ms, len(ms), nil
}
}
OUTER:
for _, bucket := range buckets {
// check if this virtual mapping has been overriden by a custom, physical mapping
for _, m := range ms {
if m.BucketID == bucket.ID {
continue OUTER
}
}
if bucket == nil {
continue
}
mapping := bucketToMapping(bucket)
if filterFunc(mapping, filter) {
ms = append(ms, mapping)
}
}
return ms, len(ms), nil
}
func bucketToMapping(bucket *influxdb.Bucket) *influxdb.DBRPMapping {
if bucket == nil {
return nil
}
// for now, virtual DBRPs will use the same ID as their bucket to be able to find them by ID
dbrpID := bucket.ID
db, rp := parseDBRP(bucket.Name)
return &influxdb.DBRPMapping{
ID: dbrpID,
Default: false,
Database: db,
RetentionPolicy: rp,
OrganizationID: bucket.OrgID,
BucketID: bucket.ID,
Virtual: true,
}
} }
// Create creates a new mapping. // Create creates a new mapping.
@ -354,7 +429,7 @@ func (s *Service) Create(ctx context.Context, dbrp *influxdb.DBRPMapping) error
} }
// If a dbrp with this particular ID already exists an error is returned. // If a dbrp with this particular ID already exists an error is returned.
if _, err := s.FindByID(ctx, dbrp.OrganizationID, dbrp.ID); err == nil { if d, err := s.FindByID(ctx, dbrp.OrganizationID, dbrp.ID); err == nil && !d.Virtual {
return ErrDBRPAlreadyExists("dbrp already exist for this particular ID. If you are trying an update use the right function .Update") return ErrDBRPAlreadyExists("dbrp already exist for this particular ID. If you are trying an update use the right function .Update")
} }
// If a dbrp with this orgID, db, and rp exists an error is returned. // If a dbrp with this orgID, db, and rp exists an error is returned.

View File

@ -23,6 +23,9 @@ func initDBRPMappingService(f itesting.DBRPMappingFields, t *testing.T) (influxd
Name: fmt.Sprintf("bucket-%v", id), Name: fmt.Sprintf("bucket-%v", id),
}, nil }, nil
}, },
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
},
} }
} }

View File

@ -35,6 +35,8 @@ type DBRPMapping struct {
// Default indicates if this mapping is the default for the cluster and database. // Default indicates if this mapping is the default for the cluster and database.
Default bool `json:"default"` Default bool `json:"default"`
// Virtual indicates if this is a virtual mapping (tied to bucket name) or physical
Virtual bool `json:"virtual"`
OrganizationID platform.ID `json:"orgID"` OrganizationID platform.ID `json:"orgID"`
BucketID platform.ID `json:"bucketID"` BucketID platform.ID `json:"bucketID"`

View File

@ -177,7 +177,7 @@ func TestServer_Query_ShowDatabases(t *testing.T) {
&Query{ &Query{
name: "show databases does not return duplicates", name: "show databases does not return duplicates",
command: "SHOW DATABASES", command: "SHOW DATABASES",
exp: `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["my-bucket"],["telegraf"]]}]}]}`, exp: `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["my-bucket"],["telegraf"],["_monitoring"],["_tasks"],["db"]]}]}]}`,
}, },
) )

View File

@ -331,7 +331,9 @@ func CreateDBRPMappingV2(
} }
return nil, nil return nil, nil
}, },
}, FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{{ DBRPMappingsV2: []*influxdb.DBRPMapping{{
ID: 100, ID: 100,
Database: "database1", Database: "database1",
@ -622,6 +624,112 @@ func FindManyDBRPMappingsV2(
}, },
}, },
}, },
{
name: "find virtual by orgID",
fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucket2ID) {
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
}
return nil, nil
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{
// org 3
{ID: 100, Name: "testdb", OrgID: MustIDBase16(dbrpOrg3ID)},
{ID: 200, Name: "testdb2/testrp2", OrgID: MustIDBase16(dbrpOrg3ID)},
// org 2
{ID: 300, Name: "testdb3", OrgID: MustIDBase16(dbrpOrg2ID)},
{ID: 400, Name: "testdb4/testrp4", OrgID: MustIDBase16(dbrpOrg2ID)},
}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{},
},
args: args{
filter: influxdb.DBRPMappingFilter{
OrgID: MustIDBase16Ptr(dbrpOrg3ID),
},
},
wants: wants{
dbrpMappings: []*influxdb.DBRPMapping{
{
ID: 100,
Database: "testdb",
RetentionPolicy: "autogen",
Default: false,
Virtual: true,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: 100,
},
{
ID: 200,
Database: "testdb2",
RetentionPolicy: "testrp2",
Default: false,
Virtual: true,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: 200,
},
},
},
},
{
name: "find virtual by rp",
fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucket2ID) {
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
}
return nil, nil
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{
// org 3
{ID: 100, Name: "testdb", OrgID: MustIDBase16(dbrpOrg3ID)},
{ID: 200, Name: "testdb2/testrp2", OrgID: MustIDBase16(dbrpOrg3ID)},
// org 2
{ID: 300, Name: "testdb3", OrgID: MustIDBase16(dbrpOrg2ID)},
{ID: 400, Name: "testdb4/testrp4", OrgID: MustIDBase16(dbrpOrg2ID)},
}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{},
},
args: args{
filter: influxdb.DBRPMappingFilter{
RetentionPolicy: stringPtr("autogen"),
},
},
wants: wants{
dbrpMappings: []*influxdb.DBRPMapping{
{
ID: 100,
Database: "testdb",
RetentionPolicy: "autogen",
Default: false,
Virtual: true,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: 100,
},
{
ID: 300,
Database: "testdb3",
RetentionPolicy: "autogen",
Default: false,
Virtual: true,
OrganizationID: MustIDBase16(dbrpOrg2ID),
BucketID: 300,
},
},
},
},
{ {
name: "find by db", name: "find by db",
fields: DBRPMappingFields{ fields: DBRPMappingFields{
@ -989,6 +1097,19 @@ func FindDBRPMappingByIDV2(
{ {
name: "find non existing dbrp", name: "find non existing dbrp",
fields: DBRPMappingFields{ fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucketAID) {
return &influxdb.Bucket{ID: id}, nil
}
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{ DBRPMappingsV2: []*influxdb.DBRPMapping{
{ {
ID: 100, ID: 100,
@ -1008,9 +1129,90 @@ func FindDBRPMappingByIDV2(
err: dbrp.ErrDBRPNotFound, err: dbrp.ErrDBRPNotFound,
}, },
}, },
{
name: "find virtual dbrp with slash",
fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucketAID) {
return &influxdb.Bucket{ID: id, Name: "testdb/testrp", OrgID: MustIDBase16(dbrpOrg3ID)}, nil
}
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{},
},
args: args{
OrgID: MustIDBase16(dbrpOrg3ID),
ID: MustIDBase16(dbrpBucketAID),
},
wants: wants{
dbrpMapping: &influxdb.DBRPMapping{
ID: MustIDBase16(dbrpBucketAID),
Database: "testdb",
RetentionPolicy: "testrp",
Default: false,
Virtual: true,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: MustIDBase16(dbrpBucketAID),
},
},
},
{
name: "find virtual dbrp",
fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucketAID) {
return &influxdb.Bucket{ID: id, Name: "testdb", OrgID: MustIDBase16(dbrpOrg3ID)}, nil
}
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{},
},
args: args{
OrgID: MustIDBase16(dbrpOrg3ID),
ID: MustIDBase16(dbrpBucketAID),
},
wants: wants{
dbrpMapping: &influxdb.DBRPMapping{
ID: MustIDBase16(dbrpBucketAID),
Database: "testdb",
RetentionPolicy: "autogen",
Default: false,
Virtual: true,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: MustIDBase16(dbrpBucketAID),
},
},
},
{ {
name: "find existing dbrp but wrong orgID", name: "find existing dbrp but wrong orgID",
fields: DBRPMappingFields{ fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucketAID) {
return &influxdb.Bucket{ID: id}, nil
}
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{ DBRPMappingsV2: []*influxdb.DBRPMapping{
{ {
ID: 100, ID: 100,
@ -1146,6 +1348,19 @@ func UpdateDBRPMappingV2(
{ {
name: "error dbrp not found", name: "error dbrp not found",
fields: DBRPMappingFields{ fields: DBRPMappingFields{
BucketSvc: &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) {
if id == MustIDBase16(dbrpBucket1ID) {
return &influxdb.Bucket{ID: id}, nil
}
return nil, &errors2.Error{
Code: errors2.ENotFound,
Msg: "bucket not found",
}
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{{ DBRPMappingsV2: []*influxdb.DBRPMapping{{
ID: 100, ID: 100,
Database: "database1", Database: "database1",
@ -1631,7 +1846,7 @@ func DeleteDBRPMappingV2(
}, },
args: args{ args: args{
OrgID: MustIDBase16(dbrpOrg2ID), OrgID: MustIDBase16(dbrpOrg2ID),
ID: 100, ID: 150,
}, },
wants: wants{ wants: wants{
dbrpMappings: []*influxdb.DBRPMapping{ dbrpMappings: []*influxdb.DBRPMapping{