Return to original DELETE/DROP SERIES semantics

This reverts commit 59afd8cc90.
pull/9315/head
Edd Robinson 2018-01-03 12:11:17 +00:00
parent 2169ad680e
commit 286c8f4c09
9 changed files with 103 additions and 95 deletions

View File

@@ -330,8 +330,8 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSe
 	// Convert "now()" to current time.
 	stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})

-	// Locally delete the series. The series will not be removed from the index.
-	return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, false)
+	// Locally delete the series.
+	return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
 }

 func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
@@ -375,7 +375,7 @@ func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeries
 	}

 	// Locally drop the series.
-	return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, true)
+	return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
 }

 func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
@@ -1375,7 +1375,7 @@ type TSDBStore interface {
 	DeleteDatabase(name string) error
 	DeleteMeasurement(database, name string) error
 	DeleteRetentionPolicy(database, name string) error
-	DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error
+	DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
 	DeleteShard(id uint64) error
 	MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
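
With the flag gone, DELETE and DROP SERIES both funnel through the same TSDBStore.DeleteSeries call and differ only in the condition they hand over; whether a series also leaves the index is decided inside the storage engine. A minimal sketch of that shape, using simplified stand-in types rather than the real influxql/coordinator packages (Store, printStore, executeDelete and executeDrop are all hypothetical):

package main

import "fmt"

// Store mirrors the reverted TSDBStore method: no removeIndex flag; the
// engine itself decides when a series leaves the index.
type Store interface {
	DeleteSeries(database string, sources []string, condition string) error
}

type printStore struct{}

func (printStore) DeleteSeries(db string, sources []string, cond string) error {
	fmt.Printf("delete from %v in %q where %q\n", sources, db, cond)
	return nil
}

// executeDelete models DELETE FROM ... WHERE ...: a time-bounded delete.
func executeDelete(s Store, db, cond string) error {
	return s.DeleteSeries(db, []string{"cpu"}, cond)
}

// executeDrop models DROP SERIES FROM ...: the same call, with no time bound.
func executeDrop(s Store, db string) error {
	return s.DeleteSeries(db, []string{"cpu"}, "")
}

func main() {
	s := printStore{}
	_ = executeDelete(s, "db0", "time < '2000-01-04T00:00:00Z'")
	_ = executeDrop(s, "db0")
}

The point of the revert is visible here: callers can no longer request or suppress index removal, so the original DELETE/DROP semantics live entirely in the engine.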

View File

@@ -23,7 +23,7 @@ type TSDBStoreMock struct {
 	DeleteDatabaseFn        func(name string) error
 	DeleteMeasurementFn     func(database, name string) error
 	DeleteRetentionPolicyFn func(database, name string) error
-	DeleteSeriesFn          func(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error
+	DeleteSeriesFn          func(database string, sources []influxql.Source, condition influxql.Expr) error
 	DeleteShardFn           func(id uint64) error
 	DiskSizeFn              func() (int64, error)
 	ExpandSourcesFn         func(sources influxql.Sources) (influxql.Sources, error)
@@ -77,8 +77,8 @@ func (s *TSDBStoreMock) DeleteMeasurement(database string, name string) error {
 func (s *TSDBStoreMock) DeleteRetentionPolicy(database string, name string) error {
 	return s.DeleteRetentionPolicyFn(database, name)
 }

-func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error {
-	return s.DeleteSeriesFn(database, sources, condition, removeIndex)
+func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
+	return s.DeleteSeriesFn(database, sources, condition)
 }

 func (s *TSDBStoreMock) DeleteShard(shardID uint64) error {
 	return s.DeleteShardFn(shardID)
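
The mock keeps the function-field pattern: every method forwards to a swappable Fn field so unit tests can record exactly what the statement executor passed in. A rough sketch of how such a mock is typically exercised, with cut-down types and a hypothetical test name (not the repo's actual test):

package example

import "testing"

// tsdbStoreMock is a cut-down stand-in for TSDBStoreMock above: each method
// forwards to a caller-supplied function so the test can capture the call.
type tsdbStoreMock struct {
	DeleteSeriesFn func(database string, sources []string, condition string) error
}

func (s *tsdbStoreMock) DeleteSeries(db string, sources []string, cond string) error {
	return s.DeleteSeriesFn(db, sources, cond)
}

func TestDeleteSeriesForwardsArguments(t *testing.T) {
	var gotDB, gotCond string
	store := &tsdbStoreMock{
		DeleteSeriesFn: func(db string, sources []string, cond string) error {
			gotDB, gotCond = db, cond
			return nil
		},
	}

	// In a real test this call is made by the statement executor while
	// handling DELETE/DROP SERIES; it is invoked directly here for brevity.
	if err := store.DeleteSeries("db0", []string{"cpu"}, "time < now()"); err != nil {
		t.Fatal(err)
	}
	if gotDB != "db0" || gotCond != "time < now()" {
		t.Fatalf("unexpected call: db=%q cond=%q", gotDB, gotCond)
	}
}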

View File

@@ -256,6 +256,18 @@ func init() {
 			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db1"}},
 		},
+		&Query{
+			name:    "Delete remaining instances of series",
+			command: `DELETE FROM cpu WHERE time < '2000-01-04T00:00:00Z'`,
+			exp:     `{"results":[{"statement_id":0}]}`,
+			params:  url.Values{"db": []string{"db0"}},
+		},
+		&Query{
+			name:    "Show series should now be empty",
+			command: `SHOW SERIES`,
+			exp:     `{"results":[{"statement_id":0}]}`,
+			params:  url.Values{"db": []string{"db0"}},
+		},
 	},
 }

View File

@@ -54,7 +54,7 @@ type Engine interface {
 	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
 	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
-	DeleteSeriesRange(itr SeriesIterator, min, max int64, removeIndex bool) error
+	DeleteSeriesRange(itr SeriesIterator, min, max int64) error
 	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
 	SeriesN() int64

View File

@@ -1158,7 +1158,7 @@ func (e *Engine) WritePoints(points []models.Point) error {
 }

 // DeleteSeriesRange removes the values between min and max (inclusive) from all series
-func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64, removeIndex bool) error {
+func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error {
 	var disableOnce bool
 	var sz int
@@ -1195,7 +1195,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64, remo
 		if sz >= deleteFlushThreshold {
 			// Delete all matching batch.
-			if err := e.deleteSeriesRange(batch, min, max, removeIndex); err != nil {
+			if err := e.deleteSeriesRange(batch, min, max); err != nil {
 				return err
 			}
 			batch = batch[:0]
@@ -1205,24 +1205,20 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64, remo
 	if len(batch) > 0 {
 		// Delete all matching batch.
-		if err := e.deleteSeriesRange(batch, min, max, removeIndex); err != nil {
+		if err := e.deleteSeriesRange(batch, min, max); err != nil {
 			return err
 		}
 		batch = batch[:0]
 	}

-	if removeIndex {
-		e.index.Rebuild()
-	}
+	e.index.Rebuild()
 	return nil
 }

-// deleteSeriesRange removes the values between min and max (inclusive) from all
-// series in the TSM engine. If removeIndex is true, then series will also be
-// removed from the index.
-//
-// This should mainly be called by DeleteSeriesRange and not directly.
-func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64, removeIndex bool) error {
+// deleteSeriesRange removes the values between min and max (inclusive) from all series. This
+// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange
+// and not directly.
+func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 	ts := time.Now().UTC().UnixNano()
 	if len(seriesKeys) == 0 {
 		return nil
@@ -1383,7 +1379,8 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64, removeIn
 			i++
 		}

-		if hasCacheValues || !removeIndex {
+		// Some cache values still exists, leave the series in the index.
+		if hasCacheValues {
 			continue
 		}
@@ -1465,8 +1462,7 @@ func (e *Engine) deleteMeasurement(name []byte) error {
 		return nil
 	}
 	defer itr.Close()
-	// Delete all associated series and remove them from the index.
-	return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64, true)
+	return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64)
 }

 // ForEachMeasurementName iterates over each measurement name in the engine.
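
The reverted DeleteSeriesRange goes back to batching keys off the iterator, flushing each batch through deleteSeriesRange, and rebuilding the index once at the end instead of consulting a removeIndex flag. A rough sketch of that control flow with the compaction toggling, cache handling and real threshold omitted (flushThreshold, deleteBatch and rebuildIndex are placeholders, not the engine's internals):

package main

import "fmt"

// flushThreshold stands in for deleteFlushThreshold; the real value used by
// the engine is an implementation detail and is assumed here.
const flushThreshold = 1 << 10 // bytes of key data per flush, illustrative only

// deleteBatch is a placeholder for Engine.deleteSeriesRange: it would remove
// TSM values for the given keys between min and max.
func deleteBatch(keys [][]byte, min, max int64) error {
	fmt.Printf("deleting %d keys in [%d, %d]\n", len(keys), min, max)
	return nil
}

// rebuildIndex is a placeholder for e.index.Rebuild(), which the reverted
// DeleteSeriesRange now calls unconditionally once the deletes complete.
func rebuildIndex() { fmt.Println("rebuilding index") }

// deleteSeriesRange mirrors the restored control flow: batch keys, flush when
// the accumulated size crosses the threshold, flush the remainder, rebuild.
func deleteSeriesRange(keys [][]byte, min, max int64) error {
	var batch [][]byte
	var sz int
	for _, key := range keys {
		batch = append(batch, key)
		sz += len(key)
		if sz >= flushThreshold {
			if err := deleteBatch(batch, min, max); err != nil {
				return err
			}
			batch, sz = batch[:0], 0
		}
	}
	if len(batch) > 0 {
		if err := deleteBatch(batch, min, max); err != nil {
			return err
		}
	}
	rebuildIndex()
	return nil
}

func main() {
	keys := [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("mem,host=C")}
	_ = deleteSeriesRange(keys, 0, 3000000000)
}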

View File

@@ -47,7 +47,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
 	// Remove series.
 	itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
-	if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil {
+	if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
 		t.Fatalf("failed to delete series: %s", err.Error())
 	}
@@ -1024,7 +1024,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 }

 // Ensures that deleting series from TSM files with multiple fields removes all the
-// series from the TSM files but leaves the series in the index intact.
+// series
 func TestEngine_DeleteSeries(t *testing.T) {
 	for _, index := range tsdb.RegisteredIndexes() {
 		t.Run(index, func(t *testing.T) {
@@ -1058,7 +1058,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
 			}

 			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
-			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil {
+			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
 				t.Fatalf("failed to delete series: %v", err)
 			}
@@ -1071,56 +1071,22 @@ func TestEngine_DeleteSeries(t *testing.T) {
 			if _, ok := keys[exp]; !ok {
 				t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
 			}
-
-			// Deleting all the TSM values for a single series should still leave
-			// the series in the index intact.
-			indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-			iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
-			if err != nil {
-				t.Fatalf("iterator error: %v", err)
-			} else if iter == nil {
-				t.Fatal("nil iterator")
-			}
-			defer iter.Close()
-
-			var gotKeys []string
-			expKeys := []string{"cpu,host=A", "cpu,host=B"}
-
-			for {
-				elem, err := iter.Next()
-				if err != nil {
-					t.Fatal(err)
-				}
-				if elem.SeriesID == 0 {
-					break
-				}
-
-				// Lookup series.
-				name, tags := e.sfile.Series(elem.SeriesID)
-				gotKeys = append(gotKeys, string(models.MakeKey(name, tags)))
-			}
-
-			if !reflect.DeepEqual(gotKeys, expKeys) {
-				t.Fatalf("got keys %v, expected %v", gotKeys, expKeys)
-			}
 		})
 	}
 }

-// Ensures that deleting series from TSM files over a range of time deleted the
-// series from the TSM files but leaves the series in the index.
 func TestEngine_DeleteSeriesRange(t *testing.T) {
 	for _, index := range tsdb.RegisteredIndexes() {
 		t.Run(index, func(t *testing.T) {
 			// Create a few points.
-			p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000")
+			p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted
 			p2 := MustParsePointString("cpu,host=A value=1.2 2000000000")
 			p3 := MustParsePointString("cpu,host=A value=1.3 3000000000")
-			p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
-			p5 := MustParsePointString("cpu,host=B value=1.3 5000000000")
+			p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted
+			p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted
 			p6 := MustParsePointString("cpu,host=C value=1.3 1000000000")
-			p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
-			p8 := MustParsePointString("disk,host=C value=1.3 1000000000")
+			p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted
+			p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

 			e, err := NewEngine(index)
 			if err != nil {
@@ -1153,7 +1119,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
 			}

 			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}}
-			if err := e.DeleteSeriesRange(itr, 0, 3000000000, false); err != nil {
+			if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil {
 				t.Fatalf("failed to delete series: %v", err)
 			}
@@ -1167,39 +1133,73 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
 				t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
 			}

-			// Deleting all the TSM values for a single series should still leave
-			// the series in the index intact.
+			// Check that the series still exists in the index
 			indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
 			iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
 			if err != nil {
 				t.Fatalf("iterator error: %v", err)
-			} else if iter == nil {
-				t.Fatal("nil iterator")
 			}
 			defer iter.Close()

-			var gotKeys []string
-			expKeys := []string{"cpu,host=0", "cpu,host=A", "cpu,host=B", "cpu,host=C"}
-
-			for {
-				elem, err := iter.Next()
-				if err != nil {
-					t.Fatal(err)
-				}
-				if elem.SeriesID == 0 {
-					break
-				}
-
-				// Lookup series.
-				name, tags := e.sfile.Series(elem.SeriesID)
-				gotKeys = append(gotKeys, string(models.MakeKey(name, tags)))
-			}
-			sort.Strings(gotKeys)
-
-			if !reflect.DeepEqual(gotKeys, expKeys) {
-				t.Fatalf("got keys %v, expected %v", gotKeys, expKeys)
-			}
+			elem, err := iter.Next()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if elem.SeriesID == 0 {
+				t.Fatalf("series index mismatch: EOF, exp 2 series")
+			}
+
+			// Lookup series.
+			name, tags := e.sfile.Series(elem.SeriesID)
+			if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
+				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
+			}
+
+			if got, exp := tags, models.NewTags(map[string]string{"host": "0"}); !got.Equal(exp) {
+				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
+			}
+
+			if elem, err = iter.Next(); err != nil {
+				t.Fatal(err)
+			}
+			if elem.SeriesID == 0 {
+				t.Fatalf("series index mismatch: EOF, exp 2 series")
+			}
+
+			// Lookup series.
+			name, tags = e.sfile.Series(elem.SeriesID)
+			if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
+				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
+			}
+			if got, exp := tags, models.NewTags(map[string]string{"host": "B"}); !got.Equal(exp) {
+				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
+			}
+
+			iter.Close()
+
+			// Deleting remaining series should remove them from the series.
+			itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=B")}}
+			if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
+				t.Fatalf("failed to delete series: %v", err)
+			}
+
+			indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
+			if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
+				t.Fatalf("iterator error: %v", err)
+			}
+
+			if iter == nil {
+				return
+			}
+
+			defer iter.Close()
+			if elem, err = iter.Next(); err != nil {
+				t.Fatal(err)
+			}
+			if elem.SeriesID != 0 {
+				t.Fatalf("got an undeleted series id, but series should be dropped from index")
+			}
 		})
 	}
 }
@@ -1241,7 +1241,7 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
 			}

 			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
-			if err := e.DeleteSeriesRange(itr, 0, 0, false); err != nil {
+			if err := e.DeleteSeriesRange(itr, 0, 0); err != nil {
 				t.Fatalf("failed to delete series: %v", err)
 			}
@@ -1325,7 +1325,7 @@ func TestEngine_LastModified(t *testing.T) {
 			}

 			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
-			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil {
+			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
 				t.Fatalf("failed to delete series: %v", err)
 			}

View File

@@ -719,12 +719,12 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
 }

 // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
-func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64, removeIndex bool) error {
+func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error {
 	engine, err := s.engine()
 	if err != nil {
 		return err
 	}
-	return engine.DeleteSeriesRange(itr, min, max, removeIndex)
+	return engine.DeleteSeriesRange(itr, min, max)
 }

 // DeleteMeasurement deletes a measurement and all underlying series.

View File

@@ -937,7 +937,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
 // DeleteSeries loops through the local shards and deletes the series data for
 // the passed in series keys.
-func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error {
+func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
 	// Expand regex expressions in the FROM clause.
 	a, err := s.ExpandSources(sources)
 	if err != nil {
@@ -1015,7 +1015,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
 				continue
 			}
 			defer itr.Close()
-			if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max, removeIndex); err != nil {
+			if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max); err != nil {
 				return err
 			}
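
A caller reaches this path by handing the store a set of sources and an optional time condition; with the flag removed, the engine alone decides that series with no remaining data are also dropped from the index. A sketch of such a call under the reverted signature, assuming an already-open *tsdb.Store and the split-out influxql package (deleteOldCPUSeries is a hypothetical helper; import paths may differ in a vendored tree):

package main

import (
	// Assumed import paths for the 1.x tree; adjust to the locations in use.
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
)

// deleteOldCPUSeries deletes all cpu series data older than the given time
// bound from database "db0", mirroring DELETE FROM cpu WHERE time < '...'.
// Series whose data is entirely removed are dropped from the index by the
// engine itself.
func deleteOldCPUSeries(s *tsdb.Store) error {
	cond, err := influxql.ParseExpr(`time < '2000-01-04T00:00:00Z'`)
	if err != nil {
		return err
	}
	sources := []influxql.Source{&influxql.Measurement{Name: "cpu"}}
	return s.DeleteSeries("db0", sources, cond)
}

func main() {
	// Opening a *tsdb.Store requires on-disk paths and engine options (see
	// tsdb.NewStore); this example only shows the DeleteSeries call shape.
}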

View File

@@ -579,7 +579,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
 	}

 	for _, name := range mnames {
-		if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil, true); err != nil {
+		if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -1038,7 +1038,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
 			return err
 		}

-		if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
+		if err := s.DeleteSeries("db0", nil, cond); err != nil {
 			return err
 		}
@@ -1130,7 +1130,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
 		if err != nil {
 			return err
 		}
-		if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
+		if err := s.DeleteSeries("db0", nil, cond); err != nil {
 			return err
 		}
@@ -1233,7 +1233,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
 		if err != nil {
 			return err
 		}
-		if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
+		if err := s.DeleteSeries("db0", nil, cond); err != nil {
 			return err
 		}