Merge pull request #6705 from influxdata/js-6701-duplicate-points-with-select

Filter out sources that do not match the shard database/retention policy
pull/6586/head
Jonathan A. Sternberg 2016-05-24 09:48:31 -04:00
commit 32e42b93ae
6 changed files with 103 additions and 16 deletions

View File

@ -42,6 +42,7 @@
- [#6676](https://github.com/influxdata/influxdb/issues/6676): Ensures client sends correct precision when inserting points.
- [#2048](https://github.com/influxdata/influxdb/issues/2048): Check that retention policies exist before creating CQ
- [#6702](https://github.com/influxdata/influxdb/issues/6702): Fix SELECT statement required privileges.
- [#6701](https://github.com/influxdata/influxdb/issues/6701): Filter out sources that do not match the shard database/retention policy.
## v0.13.0 [2016-05-12]

View File

@ -6094,6 +6094,61 @@ func TestServer_Query_IntoTarget(t *testing.T) {
}
}
// This test ensures that data is not duplicated with measurements
// of the same name.
func TestServer_Query_DuplicateMeasurements(t *testing.T) {
t.Parallel()
s := OpenDefaultServer(NewConfig())
defer s.Close()
// Create a second database.
if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaClient.SetDefaultRetentionPolicy("db1", "rp0"); err != nil {
t.Fatal(err)
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
}
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
test = NewTest("db1", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano())},
}
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
test.addQueries([]*Query{
&Query{
name: "select from both databases",
params: url.Values{"db": []string{"db0"}},
command: `SELECT value FROM db0.rp0.cpu, db1.rp0.cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2]]}]}]}`,
},
}...)
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// This test reproduced a data race with closing the
// Subscriber points channel while writes were in-flight in the PointsWriter.
func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {

View File

@ -311,6 +311,20 @@ func (a Sources) Names() []string {
return names
}
// Filter returns a list of source names filtered by the database/retention policy.
func (a Sources) Filter(database, retentionPolicy string) []Source {
sources := make([]Source, 0, len(a))
for _, s := range a {
switch s := s.(type) {
case *Measurement:
if s.Database == database && s.RetentionPolicy == retentionPolicy {
sources = append(sources, s)
}
}
}
return sources
}
// HasSystemSource returns true if any of the sources are internal, system sources.
func (a Sources) HasSystemSource() bool {
for _, s := range a {

View File

@ -457,6 +457,7 @@ func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator,
if influxql.Sources(opt.Sources).HasSystemSource() {
return s.createSystemIterator(opt)
}
opt.Sources = influxql.Sources(opt.Sources).Filter(s.database, s.retentionPolicy)
return s.engine.CreateIterator(opt)
}

View File

@ -221,10 +221,14 @@ cpu,host=serverB,region=uswest value=25 0
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "val2"}},
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
@ -297,10 +301,14 @@ cpu,host=serverB,region=uswest value=25 0
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "val2"}},
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: false,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: false,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
@ -494,8 +502,8 @@ func NewShard() *Shard {
return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
filepath.Join(path, "data", "db0", "rp0", "1"),
filepath.Join(path, "wal", "db0", "rp0", "1"),
opt,
),
path: path,

View File

@ -250,10 +250,14 @@ func TestShards_CreateIterator(t *testing.T) {
itr, err := ics.CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
@ -329,8 +333,12 @@ func TestStore_BackupRestoreShard(t *testing.T) {
// Read data from
itr, err := s1.Shard(100).CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Expr: influxql.MustParseExpr(`value`),
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,