diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc05849749..cabcb45628 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,7 @@
 - [#6676](https://github.com/influxdata/influxdb/issues/6676): Ensures client sends correct precision when inserting points.
 - [#2048](https://github.com/influxdata/influxdb/issues/2048): Check that retention policies exist before creating CQ
 - [#6702](https://github.com/influxdata/influxdb/issues/6702): Fix SELECT statement required privileges.
+- [#6701](https://github.com/influxdata/influxdb/issues/6701): Filter out sources that do not match the shard database/retention policy.
 
 ## v0.13.0 [2016-05-12]
 
diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go
index 5905f4aaa5..ec248eaddb 100644
--- a/cmd/influxd/run/server_test.go
+++ b/cmd/influxd/run/server_test.go
@@ -6094,6 +6094,61 @@ func TestServer_Query_IntoTarget(t *testing.T) {
 	}
 }
 
+// This test ensures that data is not duplicated with measurements
+// of the same name.
+func TestServer_Query_DuplicateMeasurements(t *testing.T) {
+	t.Parallel()
+	s := OpenDefaultServer(NewConfig())
+	defer s.Close()
+
+	// Create a second database.
+	if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.MetaClient.SetDefaultRetentionPolicy("db1", "rp0"); err != nil {
+		t.Fatal(err)
+	}
+
+	test := NewTest("db0", "rp0")
+	test.writes = Writes{
+		&Write{data: fmt.Sprintf(`cpu value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
+	}
+
+	if err := test.init(s); err != nil {
+		t.Fatalf("test init failed: %s", err)
+	}
+
+	test = NewTest("db1", "rp0")
+	test.writes = Writes{
+		&Write{data: fmt.Sprintf(`cpu value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano())},
+	}
+
+	if err := test.init(s); err != nil {
+		t.Fatalf("test init failed: %s", err)
+	}
+
+	test.addQueries([]*Query{
+		&Query{
+			name:    "select from both databases",
+			params:  url.Values{"db": []string{"db0"}},
+			command: `SELECT value FROM db0.rp0.cpu, db1.rp0.cpu`,
+			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2]]}]}]}`,
+		},
+	}...)
+
+	for _, query := range test.queries {
+		if query.skip {
+			t.Logf("SKIP:: %s", query.name)
+			continue
+		}
+		if err := query.Execute(s); err != nil {
+			t.Error(query.Error(err))
+		} else if !query.success() {
+			t.Error(query.failureMessage())
+		}
+	}
+}
+
 // This test reproduced a data race with closing the
 // Subscriber points channel while writes were in-flight in the PointsWriter.
 func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
diff --git a/influxql/ast.go b/influxql/ast.go
index 7d9211ae3f..fdab052f3e 100644
--- a/influxql/ast.go
+++ b/influxql/ast.go
@@ -311,6 +311,20 @@ func (a Sources) Names() []string {
 	return names
 }
 
+// Filter returns a list of source names filtered by the database/retention policy.
+func (a Sources) Filter(database, retentionPolicy string) []Source {
+	sources := make([]Source, 0, len(a))
+	for _, s := range a {
+		switch s := s.(type) {
+		case *Measurement:
+			if s.Database == database && s.RetentionPolicy == retentionPolicy {
+				sources = append(sources, s)
+			}
+		}
+	}
+	return sources
+}
+
 // HasSystemSource returns true if any of the sources are internal, system sources.
 func (a Sources) HasSystemSource() bool {
 	for _, s := range a {
diff --git a/tsdb/shard.go b/tsdb/shard.go
index c18a298c5c..f7e710d99a 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -457,6 +457,7 @@ func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
 	if influxql.Sources(opt.Sources).HasSystemSource() {
 		return s.createSystemIterator(opt)
 	}
+	opt.Sources = influxql.Sources(opt.Sources).Filter(s.database, s.retentionPolicy)
 	return s.engine.CreateIterator(opt)
 }
 
diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go
index c55a168b04..3dbb007a93 100644
--- a/tsdb/shard_test.go
+++ b/tsdb/shard_test.go
@@ -221,10 +221,14 @@ cpu,host=serverB,region=uswest value=25 0
 		Expr:       influxql.MustParseExpr(`value`),
 		Aux:        []influxql.VarRef{{Val: "val2"}},
 		Dimensions: []string{"host"},
-		Sources:    []influxql.Source{&influxql.Measurement{Name: "cpu"}},
-		Ascending:  true,
-		StartTime:  influxql.MinTime,
-		EndTime:    influxql.MaxTime,
+		Sources: []influxql.Source{&influxql.Measurement{
+			Name:            "cpu",
+			Database:        "db0",
+			RetentionPolicy: "rp0",
+		}},
+		Ascending: true,
+		StartTime: influxql.MinTime,
+		EndTime:   influxql.MaxTime,
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -297,10 +301,14 @@ cpu,host=serverB,region=uswest value=25 0
 		Expr:       influxql.MustParseExpr(`value`),
 		Aux:        []influxql.VarRef{{Val: "val2"}},
 		Dimensions: []string{"host"},
-		Sources:    []influxql.Source{&influxql.Measurement{Name: "cpu"}},
-		Ascending:  false,
-		StartTime:  influxql.MinTime,
-		EndTime:    influxql.MaxTime,
+		Sources: []influxql.Source{&influxql.Measurement{
+			Name:            "cpu",
+			Database:        "db0",
+			RetentionPolicy: "rp0",
+		}},
+		Ascending: false,
+		StartTime: influxql.MinTime,
+		EndTime:   influxql.MaxTime,
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -494,8 +502,8 @@ func NewShard() *Shard {
 	return &Shard{
 		Shard: tsdb.NewShard(0,
 			tsdb.NewDatabaseIndex("db"),
-			filepath.Join(path, "data"),
-			filepath.Join(path, "wal"),
+			filepath.Join(path, "data", "db0", "rp0", "1"),
+			filepath.Join(path, "wal", "db0", "rp0", "1"),
 			opt,
 		),
 		path: path,
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 45ffc0c4ed..087ea6b25f 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -250,10 +250,14 @@ func TestShards_CreateIterator(t *testing.T) {
 	itr, err := ics.CreateIterator(influxql.IteratorOptions{
 		Expr:       influxql.MustParseExpr(`value`),
 		Dimensions: []string{"host"},
-		Sources:    []influxql.Source{&influxql.Measurement{Name: "cpu"}},
-		Ascending:  true,
-		StartTime:  influxql.MinTime,
-		EndTime:    influxql.MaxTime,
+		Sources: []influxql.Source{&influxql.Measurement{
+			Name:            "cpu",
+			Database:        "db0",
+			RetentionPolicy: "rp0",
+		}},
+		Ascending: true,
+		StartTime: influxql.MinTime,
+		EndTime:   influxql.MaxTime,
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -329,8 +333,12 @@ func TestStore_BackupRestoreShard(t *testing.T) {
 
 	// Read data from
 	itr, err := s1.Shard(100).CreateIterator(influxql.IteratorOptions{
-		Expr:    influxql.MustParseExpr(`value`),
-		Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
+		Expr: influxql.MustParseExpr(`value`),
+		Sources: []influxql.Source{&influxql.Measurement{
+			Name:            "cpu",
+			Database:        "db0",
+			RetentionPolicy: "rp0",
+		}},
 		Ascending: true,
 		StartTime: influxql.MinTime,
 		EndTime:   influxql.MaxTime,
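The core of the change is the new `Sources.Filter` method in `influxql/ast.go`, which `Shard.CreateIterator` now applies so that a shard only reads sources belonging to its own database and retention policy; per the CHANGELOG entry and `TestServer_Query_DuplicateMeasurements`, this prevents a same-named measurement in another database from producing duplicated points. Below is a minimal, standalone sketch of that filtering rule; the `Measurement` and `Sources` types are simplified stand-ins defined only for this example, not the real `influxql` types.

```go
// Standalone sketch of the source-filtering rule added in this patch.
// Measurement and Sources are simplified stand-ins for the influxql types.
package main

import "fmt"

// Measurement identifies a source by database, retention policy, and name.
type Measurement struct {
	Database        string
	RetentionPolicy string
	Name            string
}

// Sources is a list of query sources.
type Sources []Measurement

// Filter keeps only the sources that belong to the given database and
// retention policy, mirroring the idea behind influxql.Sources.Filter.
func (a Sources) Filter(database, retentionPolicy string) Sources {
	filtered := make(Sources, 0, len(a))
	for _, s := range a {
		if s.Database == database && s.RetentionPolicy == retentionPolicy {
			filtered = append(filtered, s)
		}
	}
	return filtered
}

func main() {
	// Two measurements named "cpu" in different databases, as in
	// TestServer_Query_DuplicateMeasurements above.
	sources := Sources{
		{Database: "db0", RetentionPolicy: "rp0", Name: "cpu"},
		{Database: "db1", RetentionPolicy: "rp0", Name: "cpu"},
	}

	// A shard for db0/rp0 keeps only the db0 source, so it does not
	// return points for db1's same-named measurement.
	for _, s := range sources.Filter("db0", "rp0") {
		fmt.Printf("%s.%s.%s\n", s.Database, s.RetentionPolicy, s.Name)
	}
}
```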