From d13d01b516a0d73591c21642c96312db8e15e422 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 26 Apr 2016 14:25:20 -0600 Subject: [PATCH] Allow deleting series by time on a shard --- tsdb/engine.go | 2 +- tsdb/engine/tsm1/engine.go | 9 +++++---- tsdb/engine/tsm1/engine_test.go | 3 ++- tsdb/shard.go | 4 ++-- tsdb/store.go | 3 ++- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index e1f711d3ae..350571c905 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -34,7 +34,7 @@ type Engine interface { CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) WritePoints(points []models.Point) error - DeleteSeries(keys []string) error + DeleteSeries(keys []string, min, max int64) error DeleteMeasurement(name string, seriesKeys []string) error SeriesCount() (n int, err error) MeasurementFields(measurement string) *MeasurementFields diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 591d709159..a95255d014 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "math" "os" "path/filepath" "strings" @@ -368,7 +369,7 @@ func (e *Engine) WritePoints(points []models.Point) error { } // DeleteSeries deletes the series from the engine. -func (e *Engine) DeleteSeries(seriesKeys []string) error { +func (e *Engine) DeleteSeries(seriesKeys []string, min, max int64) error { e.mu.RLock() defer e.mu.RUnlock() @@ -387,7 +388,7 @@ func (e *Engine) DeleteSeries(seriesKeys []string) error { deleteKeys = append(deleteKeys, k) } } - if err := e.FileStore.Delete(deleteKeys); err != nil { + if err := e.FileStore.DeleteRange(deleteKeys, min, max); err != nil { return err } @@ -403,7 +404,7 @@ func (e *Engine) DeleteSeries(seriesKeys []string) error { } e.Cache.RUnlock() - e.Cache.Delete(walKeys) + e.Cache.DeleteRange(walKeys, min, max) // delete from the WAL _, err := e.WAL.Delete(walKeys) @@ -416,7 +417,7 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error { delete(e.measurementFields, name) e.mu.Unlock() - return e.DeleteSeries(seriesKeys) + return e.DeleteSeries(seriesKeys, math.MinInt64, math.MaxInt64) } // SeriesCount returns the number of series buckets on the shard. diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 5cbebae054..b529c2bf5f 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "io/ioutil" + "math" "math/rand" "os" "path/filepath" @@ -112,7 +113,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { } // Remove series. - if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil { + if err := e.DeleteSeries([]string{"cpu,host=A"}, math.MinInt64, math.MaxInt64); err != nil { t.Fatalf("failed to delete series: %s", err.Error()) } diff --git a/tsdb/shard.go b/tsdb/shard.go index 00530e4fb8..809ca35c91 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -265,11 +265,11 @@ func (s *Shard) WritePoints(points []models.Point) error { } // DeleteSeries deletes a list of series. -func (s *Shard) DeleteSeries(seriesKeys []string) error { +func (s *Shard) DeleteSeries(seriesKeys []string, min, max int64) error { if s.closed() { return ErrEngineClosed } - return s.engine.DeleteSeries(seriesKeys) + return s.engine.DeleteSeries(seriesKeys, min, max) } // DeleteMeasurement deletes a measurement and all underlying series. diff --git a/tsdb/store.go b/tsdb/store.go index fe731291e3..d2f688c8fb 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "log" + "math" "os" "path/filepath" "sort" @@ -605,7 +606,7 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error { if sh.database != database { continue } - if err := sh.DeleteSeries(seriesKeys); err != nil { + if err := sh.DeleteSeries(seriesKeys, math.MinInt64, math.MaxInt64); err != nil { return err } }