From 4ed184dd82e5c5d44d037cc5b6f796589b7455a4 Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Thu, 13 Oct 2022 14:58:07 -0400 Subject: [PATCH] fix: fixes an error querying virtual dbrps (#23731) * fix: fixes an error querying virtual dbrps When the virtual pointer was set to false, the mappings were being ignored. * fix: missed part in a rebase * test: add test for shard mapping virtual dbrps * fix: do not create virtual mappings for equivalent physical mappings --- dbrp/service.go | 10 +- testing/dbrp_mapping.go | 14 +- v1/coordinator/shard_mapper.go | 3 +- v1/coordinator/shard_mapper_test.go | 209 ++++++++++++---------- v1/coordinator/statement_executor_test.go | 5 +- 5 files changed, 135 insertions(+), 106 deletions(-) diff --git a/dbrp/service.go b/dbrp/service.go index 0575f796eb..2ad700c69d 100644 --- a/dbrp/service.go +++ b/dbrp/service.go @@ -366,6 +366,7 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte // we were unable to find any virtual mappings, so return what physical mappings we have return ms, len(ms), nil } +OUTER: for _, bucket := range buckets { if bucket == nil { continue @@ -373,9 +374,12 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte newMapping := bucketToMapping(bucket) // if any bucket already exists that is default for this database, // this virtual mapping should not be the default - if newMapping.Default { - for _, m := range ms { - if m.Database == newMapping.Database && m.Default { + for _, m := range ms { + if m.Database == newMapping.Database { + if newMapping.Virtual && m.RetentionPolicy == newMapping.RetentionPolicy { + continue OUTER + } + if m.Default && newMapping.Default { newMapping.Default = false break } diff --git a/testing/dbrp_mapping.go b/testing/dbrp_mapping.go index dbc4771514..3a5d0ed866 100644 --- a/testing/dbrp_mapping.go +++ b/testing/dbrp_mapping.go @@ -701,7 +701,17 @@ func FindManyDBRPMappingsV2( {ID: 500, Name: "testdb4", OrgID: MustIDBase16(dbrpOrg2ID)}, }, 0, nil }}, - DBRPMappingsV2: []*influxdb.DBRPMapping{}, + DBRPMappingsV2: []*influxdb.DBRPMapping{ + { + ID: 500, + Database: "testdb4", + RetentionPolicy: "autogen", + Default: true, + Virtual: false, + OrganizationID: MustIDBase16(dbrpOrg2ID), + BucketID: 500, + }, + }, }, args: args{ filter: influxdb.DBRPMappingFilter{ @@ -715,7 +725,7 @@ func FindManyDBRPMappingsV2( Database: "testdb4", RetentionPolicy: "autogen", Default: true, - Virtual: true, + Virtual: false, OrganizationID: MustIDBase16(dbrpOrg2ID), BucketID: 500, }, diff --git a/v1/coordinator/shard_mapper.go b/v1/coordinator/shard_mapper.go index c62d898b14..97fac2b584 100644 --- a/v1/coordinator/shard_mapper.go +++ b/v1/coordinator/shard_mapper.go @@ -6,7 +6,6 @@ import ( "io" "time" - "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/kit/platform" @@ -67,7 +66,7 @@ func (e *LocalShardMapper) mapShards(ctx context.Context, a *LocalShardMapping, OrgID: &orgID, Database: &s.Database, RetentionPolicy: &s.RetentionPolicy, - Virtual: api.PtrBool(false), + Virtual: nil, }) if err != nil { return fmt.Errorf("finding DBRP mappings: %v", err) diff --git a/v1/coordinator/shard_mapper_test.go b/v1/coordinator/shard_mapper_test.go index 73d56b1356..29c5c0a3c9 100644 --- a/v1/coordinator/shard_mapper_test.go +++ b/v1/coordinator/shard_mapper_test.go @@ -2,12 +2,12 @@ package coordinator_test import ( "context" + "github.com/influxdata/influx-cli/v2/api" "reflect" "testing" "time" "github.com/golang/mock/gomock" - "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/dbrp/mocks" "github.com/influxdata/influxdb/v2/influxql/query" @@ -20,107 +20,124 @@ import ( ) func TestLocalShardMapper(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - dbrp := mocks.NewMockDBRPMappingService(ctrl) orgID := platform.ID(0xff00) bucketID := platform.ID(0xffee) - db := "db0" - rp := "rp0" - filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: &rp, Virtual: api.PtrBool(false)} - res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: rp, OrganizationID: orgID, BucketID: bucketID}} - dbrp.EXPECT(). - FindMany(gomock.Any(), filt). - Times(2). - Return(res, 1, nil) - var metaClient MetaClient - metaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) { - if database != bucketID.String() { - t.Errorf("unexpected database: %s", database) - } - if policy != meta.DefaultRetentionPolicyName { - t.Errorf("unexpected retention policy: %s", policy) - } - return []meta.ShardGroupInfo{ - {ID: 1, Shards: []meta.ShardInfo{ - {ID: 1, Owners: []meta.ShardOwner{{NodeID: 0}}}, - {ID: 2, Owners: []meta.ShardOwner{{NodeID: 0}}}, - }}, - {ID: 2, Shards: []meta.ShardInfo{ - {ID: 3, Owners: []meta.ShardOwner{{NodeID: 0}}}, - {ID: 4, Owners: []meta.ShardOwner{{NodeID: 0}}}, - }}, - }, nil - } - - tsdbStore := &internal.TSDBStoreMock{} - tsdbStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup { - if !reflect.DeepEqual(ids, []uint64{1, 2, 3, 4}) { - t.Errorf("unexpected shard ids: %#v", ids) - } - - var sh MockShard - sh.CreateIteratorFn = func(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { - if measurement.Name != "cpu" { - t.Errorf("unexpected measurement: %s", measurement.Name) - } - return &FloatIterator{}, nil - } - return &sh - } - - // Initialize the shard mapper. - shardMapper := &coordinator.LocalShardMapper{ - MetaClient: &metaClient, - TSDBStore: tsdbStore, - DBRP: dbrp, - } - - // Normal measurement. - measurement := &influxql.Measurement{ - Database: db, - RetentionPolicy: rp, - Name: "cpu", - } - ic, err := shardMapper.MapShards(context.Background(), []influxql.Source{measurement}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID}) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // This should be a LocalShardMapping. - m, ok := ic.(*coordinator.LocalShardMapping) - if !ok { - t.Fatalf("unexpected mapping type: %T", ic) - } else if len(m.ShardMap) != 1 { - t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap)) - } - - if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Subquery. - subquery := &influxql.SubQuery{ - Statement: &influxql.SelectStatement{ - Sources: []influxql.Source{measurement}, + tests := []struct { + name string + db string + rp string + filt influxdb.DBRPMappingFilter + mapping []*influxdb.DBRPMapping + }{ + { + name: "Physical DBRP Mapping", + db: "db0", + rp: "rp0", + filt: influxdb.DBRPMappingFilter{OrgID: &orgID, Database: api.PtrString("db0"), RetentionPolicy: api.PtrString("rp0"), Virtual: nil}, + mapping: []*influxdb.DBRPMapping{{Database: "db0", RetentionPolicy: "rp0", OrganizationID: orgID, BucketID: bucketID}}, }, } - ic, err = shardMapper.MapShards(context.Background(), []influxql.Source{subquery}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID}) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - // This should be a LocalShardMapping. - m, ok = ic.(*coordinator.LocalShardMapping) - if !ok { - t.Fatalf("unexpected mapping type: %T", ic) - } else if len(m.ShardMap) != 1 { - t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap)) - } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() - if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil { - t.Fatalf("unexpected error: %s", err) + dbrp := mocks.NewMockDBRPMappingService(ctrl) + dbrp.EXPECT(). + FindMany(gomock.Any(), tc.filt). + Times(2). + Return(tc.mapping, len(tc.mapping), nil) + + var metaClient MetaClient + metaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) { + if database != bucketID.String() { + t.Errorf("unexpected database: %s", database) + } + if policy != meta.DefaultRetentionPolicyName { + t.Errorf("unexpected retention policy: %s", policy) + } + return []meta.ShardGroupInfo{ + {ID: 1, Shards: []meta.ShardInfo{ + {ID: 1, Owners: []meta.ShardOwner{{NodeID: 0}}}, + {ID: 2, Owners: []meta.ShardOwner{{NodeID: 0}}}, + }}, + {ID: 2, Shards: []meta.ShardInfo{ + {ID: 3, Owners: []meta.ShardOwner{{NodeID: 0}}}, + {ID: 4, Owners: []meta.ShardOwner{{NodeID: 0}}}, + }}, + }, nil + } + + tsdbStore := &internal.TSDBStoreMock{} + tsdbStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup { + if !reflect.DeepEqual(ids, []uint64{1, 2, 3, 4}) { + t.Errorf("unexpected shard ids: %#v", ids) + } + + var sh MockShard + sh.CreateIteratorFn = func(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { + if measurement.Name != "cpu" { + t.Errorf("unexpected measurement: %s", measurement.Name) + } + return &FloatIterator{}, nil + } + return &sh + } + + // Initialize the shard mapper. + shardMapper := &coordinator.LocalShardMapper{ + MetaClient: &metaClient, + TSDBStore: tsdbStore, + DBRP: dbrp, + } + + // Normal measurement. + measurement := &influxql.Measurement{ + Database: tc.db, + RetentionPolicy: tc.rp, + Name: "cpu", + } + ic, err := shardMapper.MapShards(context.Background(), []influxql.Source{measurement}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID}) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // This should be a LocalShardMapping. + m, ok := ic.(*coordinator.LocalShardMapping) + if !ok { + t.Fatalf("unexpected mapping type: %T", ic) + } else if len(m.ShardMap) != 1 { + t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap)) + } + + if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Subquery. + subquery := &influxql.SubQuery{ + Statement: &influxql.SelectStatement{ + Sources: []influxql.Source{measurement}, + }, + } + ic, err = shardMapper.MapShards(context.Background(), []influxql.Source{subquery}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID}) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // This should be a LocalShardMapping. + m, ok = ic.(*coordinator.LocalShardMapping) + if !ok { + t.Fatalf("unexpected mapping type: %T", ic) + } else if len(m.ShardMap) != 1 { + t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap)) + } + + if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil { + t.Fatalf("unexpected error: %s", err) + } + }) } } diff --git a/v1/coordinator/statement_executor_test.go b/v1/coordinator/statement_executor_test.go index 5aa3890c44..a50b69ae36 100644 --- a/v1/coordinator/statement_executor_test.go +++ b/v1/coordinator/statement_executor_test.go @@ -12,7 +12,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/golang/mock/gomock" - "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/dbrp/mocks" @@ -46,7 +45,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) { dbrp := mocks.NewMockDBRPMappingService(ctrl) orgID := platform.ID(0xff00) empty := "" - filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)} + filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: nil} res := []*influxdb.DBRPMapping{{}} dbrp.EXPECT(). FindMany(gomock.Any(), filt). @@ -112,7 +111,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) { dbrp := mocks.NewMockDBRPMappingService(ctrl) orgID := platform.ID(0xff00) empty := "" - filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)} + filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: nil} res := []*influxdb.DBRPMapping{{}} dbrp.EXPECT(). FindMany(gomock.Any(), filt).