Allow tag filtering when using DELETE with tsi1

pull/8696/head
Edd Robinson 2017-08-11 12:36:12 +01:00
parent 9e0e62323a
commit 45969ef3c6
4 changed files with 80 additions and 3 deletions

View File

@ -25,6 +25,7 @@
- [#8097](https://github.com/influxdata/influxdb/pull/8097): Return query parsing errors in CSV formats.
- [#8607](https://github.com/influxdata/influxdb/issues/8607): Fix time zone shifts when the shift happens on a time zone boundary.
- [#8639](https://github.com/influxdata/influxdb/issues/8639): Parse time literals using the time zone in the select statement.
- [#8678](https://github.com/influxdata/influxdb/issues/8678): Ensure time and tag-based condition can be used with tsi1 index when deleting.
## v1.3.3 [unreleased]

View File

@ -215,7 +215,7 @@ func init() {
},
}
tests["delete_series"] = Test{
tests["delete_series_time"] = Test{
db: "db0",
rp: "rp0",
writes: Writes{
@ -259,6 +259,51 @@ func init() {
},
}
tests["delete_series_time_tag_filter"] = Test{
db: "db0",
rp: "rp0",
writes: Writes{
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverB,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=100 %d`, mustParseTime(time.RFC3339Nano, "2000-01-02T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-03T00:00:00Z").UnixNano())},
&Write{db: "db1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
},
queries: []*Query{
&Query{
name: "Show series is present",
command: `SHOW SERIES`,
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["cpu,host=serverB,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Delete series",
command: `DELETE FROM cpu WHERE host = 'serverA' AND time < '2000-01-03T00:00:00Z'`,
exp: `{"results":[{"statement_id":0}]}`,
params: url.Values{"db": []string{"db0"}},
once: true,
},
&Query{
name: "Show series still exists",
command: `SHOW SERIES`,
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["cpu,host=serverB,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Make sure last point still exists",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverB","uswest",23.2],["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Make sure data wasn't deleted from other database.",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
params: url.Values{"db": []string{"db1"}},
},
},
}
tests["drop_and_recreate_series"] = Test{
db: "db0",
rp: "rp0",

View File

@ -144,7 +144,36 @@ func TestServer_Query_DeleteSeries(t *testing.T) {
s := OpenServer(NewConfig())
defer s.Close()
test := tests.load(t, "delete_series")
test := tests.load(t, "delete_series_time")
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicySpec(test.retentionPolicy(), 1, 0), true); err != nil {
t.Fatal(err)
}
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
func TestServer_Query_DeleteSeries_TagFilter(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
test := tests.load(t, "delete_series_time_tag_filter")
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicySpec(test.retentionPolicy(), 1, 0), true); err != nil {
t.Fatal(err)

View File

@ -758,8 +758,10 @@ func (fs *FileSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr,
// Check for unsupported field filters.
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
if e.Expr() != nil {
if v, ok := e.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
return nil, errors.New("fields not supported in WHERE clause during deletion")
}
}
keys = append(keys, models.MakeKey(e.Name(), e.Tags()))
}