fix: fixes an error querying virtual dbrps (#23731)
* fix: fixes an error querying virtual dbrps When the virtual pointer was set to false, the mappings were being ignored. * fix: missed part in a rebase * test: add test for shard mapping virtual dbrps * fix: do not create virtual mappings for equivalent physical mappingspull/23812/head
parent
a0f1184bb3
commit
4ed184dd82
|
@ -366,6 +366,7 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
|
|||
// we were unable to find any virtual mappings, so return what physical mappings we have
|
||||
return ms, len(ms), nil
|
||||
}
|
||||
OUTER:
|
||||
for _, bucket := range buckets {
|
||||
if bucket == nil {
|
||||
continue
|
||||
|
@ -373,9 +374,12 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
|
|||
newMapping := bucketToMapping(bucket)
|
||||
// if any bucket already exists that is default for this database,
|
||||
// this virtual mapping should not be the default
|
||||
if newMapping.Default {
|
||||
for _, m := range ms {
|
||||
if m.Database == newMapping.Database && m.Default {
|
||||
for _, m := range ms {
|
||||
if m.Database == newMapping.Database {
|
||||
if newMapping.Virtual && m.RetentionPolicy == newMapping.RetentionPolicy {
|
||||
continue OUTER
|
||||
}
|
||||
if m.Default && newMapping.Default {
|
||||
newMapping.Default = false
|
||||
break
|
||||
}
|
||||
|
|
|
@ -701,7 +701,17 @@ func FindManyDBRPMappingsV2(
|
|||
{ID: 500, Name: "testdb4", OrgID: MustIDBase16(dbrpOrg2ID)},
|
||||
}, 0, nil
|
||||
}},
|
||||
DBRPMappingsV2: []*influxdb.DBRPMapping{},
|
||||
DBRPMappingsV2: []*influxdb.DBRPMapping{
|
||||
{
|
||||
ID: 500,
|
||||
Database: "testdb4",
|
||||
RetentionPolicy: "autogen",
|
||||
Default: true,
|
||||
Virtual: false,
|
||||
OrganizationID: MustIDBase16(dbrpOrg2ID),
|
||||
BucketID: 500,
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
filter: influxdb.DBRPMappingFilter{
|
||||
|
@ -715,7 +725,7 @@ func FindManyDBRPMappingsV2(
|
|||
Database: "testdb4",
|
||||
RetentionPolicy: "autogen",
|
||||
Default: true,
|
||||
Virtual: true,
|
||||
Virtual: false,
|
||||
OrganizationID: MustIDBase16(dbrpOrg2ID),
|
||||
BucketID: 500,
|
||||
},
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
|
@ -67,7 +66,7 @@ func (e *LocalShardMapper) mapShards(ctx context.Context, a *LocalShardMapping,
|
|||
OrgID: &orgID,
|
||||
Database: &s.Database,
|
||||
RetentionPolicy: &s.RetentionPolicy,
|
||||
Virtual: api.PtrBool(false),
|
||||
Virtual: nil,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding DBRP mappings: %v", err)
|
||||
|
|
|
@ -2,12 +2,12 @@ package coordinator_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/dbrp/mocks"
|
||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||
|
@ -20,107 +20,124 @@ import (
|
|||
)
|
||||
|
||||
func TestLocalShardMapper(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||
orgID := platform.ID(0xff00)
|
||||
bucketID := platform.ID(0xffee)
|
||||
db := "db0"
|
||||
rp := "rp0"
|
||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: &rp, Virtual: api.PtrBool(false)}
|
||||
res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: rp, OrganizationID: orgID, BucketID: bucketID}}
|
||||
dbrp.EXPECT().
|
||||
FindMany(gomock.Any(), filt).
|
||||
Times(2).
|
||||
Return(res, 1, nil)
|
||||
|
||||
var metaClient MetaClient
|
||||
metaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) {
|
||||
if database != bucketID.String() {
|
||||
t.Errorf("unexpected database: %s", database)
|
||||
}
|
||||
if policy != meta.DefaultRetentionPolicyName {
|
||||
t.Errorf("unexpected retention policy: %s", policy)
|
||||
}
|
||||
return []meta.ShardGroupInfo{
|
||||
{ID: 1, Shards: []meta.ShardInfo{
|
||||
{ID: 1, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
{ID: 2, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
}},
|
||||
{ID: 2, Shards: []meta.ShardInfo{
|
||||
{ID: 3, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
{ID: 4, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
}},
|
||||
}, nil
|
||||
}
|
||||
|
||||
tsdbStore := &internal.TSDBStoreMock{}
|
||||
tsdbStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
|
||||
if !reflect.DeepEqual(ids, []uint64{1, 2, 3, 4}) {
|
||||
t.Errorf("unexpected shard ids: %#v", ids)
|
||||
}
|
||||
|
||||
var sh MockShard
|
||||
sh.CreateIteratorFn = func(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
if measurement.Name != "cpu" {
|
||||
t.Errorf("unexpected measurement: %s", measurement.Name)
|
||||
}
|
||||
return &FloatIterator{}, nil
|
||||
}
|
||||
return &sh
|
||||
}
|
||||
|
||||
// Initialize the shard mapper.
|
||||
shardMapper := &coordinator.LocalShardMapper{
|
||||
MetaClient: &metaClient,
|
||||
TSDBStore: tsdbStore,
|
||||
DBRP: dbrp,
|
||||
}
|
||||
|
||||
// Normal measurement.
|
||||
measurement := &influxql.Measurement{
|
||||
Database: db,
|
||||
RetentionPolicy: rp,
|
||||
Name: "cpu",
|
||||
}
|
||||
ic, err := shardMapper.MapShards(context.Background(), []influxql.Source{measurement}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// This should be a LocalShardMapping.
|
||||
m, ok := ic.(*coordinator.LocalShardMapping)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected mapping type: %T", ic)
|
||||
} else if len(m.ShardMap) != 1 {
|
||||
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
|
||||
}
|
||||
|
||||
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// Subquery.
|
||||
subquery := &influxql.SubQuery{
|
||||
Statement: &influxql.SelectStatement{
|
||||
Sources: []influxql.Source{measurement},
|
||||
tests := []struct {
|
||||
name string
|
||||
db string
|
||||
rp string
|
||||
filt influxdb.DBRPMappingFilter
|
||||
mapping []*influxdb.DBRPMapping
|
||||
}{
|
||||
{
|
||||
name: "Physical DBRP Mapping",
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
filt: influxdb.DBRPMappingFilter{OrgID: &orgID, Database: api.PtrString("db0"), RetentionPolicy: api.PtrString("rp0"), Virtual: nil},
|
||||
mapping: []*influxdb.DBRPMapping{{Database: "db0", RetentionPolicy: "rp0", OrganizationID: orgID, BucketID: bucketID}},
|
||||
},
|
||||
}
|
||||
ic, err = shardMapper.MapShards(context.Background(), []influxql.Source{subquery}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// This should be a LocalShardMapping.
|
||||
m, ok = ic.(*coordinator.LocalShardMapping)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected mapping type: %T", ic)
|
||||
} else if len(m.ShardMap) != 1 {
|
||||
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||
dbrp.EXPECT().
|
||||
FindMany(gomock.Any(), tc.filt).
|
||||
Times(2).
|
||||
Return(tc.mapping, len(tc.mapping), nil)
|
||||
|
||||
var metaClient MetaClient
|
||||
metaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) {
|
||||
if database != bucketID.String() {
|
||||
t.Errorf("unexpected database: %s", database)
|
||||
}
|
||||
if policy != meta.DefaultRetentionPolicyName {
|
||||
t.Errorf("unexpected retention policy: %s", policy)
|
||||
}
|
||||
return []meta.ShardGroupInfo{
|
||||
{ID: 1, Shards: []meta.ShardInfo{
|
||||
{ID: 1, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
{ID: 2, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
}},
|
||||
{ID: 2, Shards: []meta.ShardInfo{
|
||||
{ID: 3, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
{ID: 4, Owners: []meta.ShardOwner{{NodeID: 0}}},
|
||||
}},
|
||||
}, nil
|
||||
}
|
||||
|
||||
tsdbStore := &internal.TSDBStoreMock{}
|
||||
tsdbStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
|
||||
if !reflect.DeepEqual(ids, []uint64{1, 2, 3, 4}) {
|
||||
t.Errorf("unexpected shard ids: %#v", ids)
|
||||
}
|
||||
|
||||
var sh MockShard
|
||||
sh.CreateIteratorFn = func(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
if measurement.Name != "cpu" {
|
||||
t.Errorf("unexpected measurement: %s", measurement.Name)
|
||||
}
|
||||
return &FloatIterator{}, nil
|
||||
}
|
||||
return &sh
|
||||
}
|
||||
|
||||
// Initialize the shard mapper.
|
||||
shardMapper := &coordinator.LocalShardMapper{
|
||||
MetaClient: &metaClient,
|
||||
TSDBStore: tsdbStore,
|
||||
DBRP: dbrp,
|
||||
}
|
||||
|
||||
// Normal measurement.
|
||||
measurement := &influxql.Measurement{
|
||||
Database: tc.db,
|
||||
RetentionPolicy: tc.rp,
|
||||
Name: "cpu",
|
||||
}
|
||||
ic, err := shardMapper.MapShards(context.Background(), []influxql.Source{measurement}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// This should be a LocalShardMapping.
|
||||
m, ok := ic.(*coordinator.LocalShardMapping)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected mapping type: %T", ic)
|
||||
} else if len(m.ShardMap) != 1 {
|
||||
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
|
||||
}
|
||||
|
||||
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// Subquery.
|
||||
subquery := &influxql.SubQuery{
|
||||
Statement: &influxql.SelectStatement{
|
||||
Sources: []influxql.Source{measurement},
|
||||
},
|
||||
}
|
||||
ic, err = shardMapper.MapShards(context.Background(), []influxql.Source{subquery}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// This should be a LocalShardMapping.
|
||||
m, ok = ic.(*coordinator.LocalShardMapping)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected mapping type: %T", ic)
|
||||
} else if len(m.ShardMap) != 1 {
|
||||
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
|
||||
}
|
||||
|
||||
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
icontext "github.com/influxdata/influxdb/v2/context"
|
||||
"github.com/influxdata/influxdb/v2/dbrp/mocks"
|
||||
|
@ -46,7 +45,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
|
|||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||
orgID := platform.ID(0xff00)
|
||||
empty := ""
|
||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)}
|
||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: nil}
|
||||
res := []*influxdb.DBRPMapping{{}}
|
||||
dbrp.EXPECT().
|
||||
FindMany(gomock.Any(), filt).
|
||||
|
@ -112,7 +111,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
|
|||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||
orgID := platform.ID(0xff00)
|
||||
empty := ""
|
||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)}
|
||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: nil}
|
||||
res := []*influxdb.DBRPMapping{{}}
|
||||
dbrp.EXPECT().
|
||||
FindMany(gomock.Any(), filt).
|
||||
|
|
Loading…
Reference in New Issue