Merge pull request #8696 from influxdata/er-8678-tsi1-where
Allow tag filtering when using DELETE with tsi1pull/8863/head
commit
e0caa2e0fd
|
@ -56,6 +56,7 @@
|
|||
- [#8787](https://github.com/influxdata/influxdb/issues/8787): panic: runtime error: invalid memory address or nil pointer dereference.
|
||||
- [#8697](https://github.com/influxdata/influxdb/issues/8697): Drop Series Cause Write Fail/Write Timeouts/High Memory Usage
|
||||
- [#8741](https://github.com/influxdata/influxdb/issues/8741): Fix increased memory usage in cache and wal readers
|
||||
- [#8678](https://github.com/influxdata/influxdb/issues/8678): Ensure time and tag-based condition can be used with tsi1 index when deleting.
|
||||
|
||||
## v1.3.4 [unreleased]
|
||||
|
||||
|
@ -65,6 +66,7 @@
|
|||
- [#8713](https://github.com/influxdata/influxdb/issues/8713): Deadlock when dropping measurement and writing
|
||||
- [#8726](https://github.com/influxdata/influxdb/pull/8726): Fix leaking tmp file when large compaction aborted
|
||||
|
||||
|
||||
## v1.3.3 [unreleased]
|
||||
|
||||
### Bugfixes
|
||||
|
|
|
@ -215,7 +215,7 @@ func init() {
|
|||
},
|
||||
}
|
||||
|
||||
tests["delete_series"] = Test{
|
||||
tests["delete_series_time"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
writes: Writes{
|
||||
|
@ -259,6 +259,51 @@ func init() {
|
|||
},
|
||||
}
|
||||
|
||||
tests["delete_series_time_tag_filter"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
writes: Writes{
|
||||
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
|
||||
&Write{data: fmt.Sprintf(`cpu,host=serverB,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
|
||||
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=100 %d`, mustParseTime(time.RFC3339Nano, "2000-01-02T00:00:00Z").UnixNano())},
|
||||
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-03T00:00:00Z").UnixNano())},
|
||||
&Write{db: "db1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
|
||||
},
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["cpu,host=serverB,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Delete series",
|
||||
command: `DELETE FROM cpu WHERE host = 'serverA' AND time < '2000-01-03T00:00:00Z'`,
|
||||
exp: `{"results":[{"statement_id":0}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Show series still exists",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["cpu,host=serverB,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Make sure last point still exists",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverB","uswest",23.2],["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Make sure data wasn't deleted from other database.",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db1"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["drop_and_recreate_series"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
|
|
|
@ -161,7 +161,36 @@ func TestServer_Query_DeleteSeries(t *testing.T) {
|
|||
s := OpenServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
test := tests.load(t, "delete_series")
|
||||
test := tests.load(t, "delete_series_time")
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicySpec(test.retentionPolicy(), 1, 0), true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
}
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Query_DeleteSeries_TagFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
test := tests.load(t, "delete_series_time_tag_filter")
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicySpec(test.retentionPolicy(), 1, 0), true); err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -783,8 +783,10 @@ func (fs *FileSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr,
|
|||
// Check for unsupported field filters.
|
||||
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
|
||||
if e.Expr() != nil {
|
||||
if v, ok := e.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
|
||||
return nil, errors.New("fields not supported in WHERE clause during deletion")
|
||||
}
|
||||
}
|
||||
|
||||
keys = append(keys, models.MakeKey(e.Name(), e.Tags()))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue