From 4eeb81ef38e2ca6bb66a651449e8e550f10c9aca Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 12 Sep 2016 09:26:43 -0600 Subject: [PATCH] Add SeriesList tombstoning. --- tsdb/engine/tsi1/series.go | 39 +++++++++++++++++++++------------ tsdb/engine/tsi1/series_test.go | 8 +++++-- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/tsdb/engine/tsi1/series.go b/tsdb/engine/tsi1/series.go index e2f38e4da7..12048e78d7 100644 --- a/tsdb/engine/tsi1/series.go +++ b/tsdb/engine/tsi1/series.go @@ -22,6 +22,11 @@ const ( SeriesCountSize = 4 ) +// Series flag constants. +const ( + SeriesTombstoneFlag = 0x01 +) + // SeriesList represents the section of the index which holds the term // dictionary and a sorted list of series keys. type SeriesList struct { @@ -31,13 +36,13 @@ type SeriesList struct { // SeriesOffset returns offset of the encoded series key. // Returns 0 if the key does not exist in the series list. -func (l *SeriesList) SeriesOffset(key []byte) uint32 { - offset := uint32(len(l.termList) + SeriesCountSize) +func (l *SeriesList) SeriesOffset(key []byte) (offset uint32, deleted bool) { + offset = uint32(len(l.termList) + SeriesCountSize) data := l.seriesData[SeriesCountSize:] for i, n := uint32(0), l.SeriesCount(); i < n; i++ { // Read series flag. - // flag := data[0] + flag := data[0] data = data[1:] // Read series length. @@ -46,7 +51,8 @@ func (l *SeriesList) SeriesOffset(key []byte) uint32 { // Return offset if the series key matches. if bytes.Equal(key, data[:ln]) { - return offset + deleted = (flag & SeriesTombstoneFlag) != 0 + return offset, deleted } // Update offset & move data forward. @@ -54,7 +60,7 @@ func (l *SeriesList) SeriesOffset(key []byte) uint32 { offset += uint32(ln) + uint32(sz) } - return 0 + return 0, false } // EncodeSeries returns a dictionary-encoded series key. @@ -203,6 +209,15 @@ func NewSeriesListWriter() *SeriesListWriter { // Add adds a series to the writer's set. // Returns an ErrSeriesOverflow if no more series can be held in the writer. func (sw *SeriesListWriter) Add(name string, tags models.Tags) error { + return sw.append(name, tags, false) +} + +// Delete marks a series as tombstoned. +func (sw *SeriesListWriter) Delete(name string, tags models.Tags) error { + return sw.append(name, tags, true) +} + +func (sw *SeriesListWriter) append(name string, tags models.Tags, deleted bool) error { // Ensure writer doesn't add too many series. if len(sw.series) == math.MaxUint32 { return ErrSeriesOverflow @@ -216,16 +231,11 @@ func (sw *SeriesListWriter) Add(name string, tags models.Tags) error { } // Append series to list. - sw.series = append(sw.series, serie{name: name, tags: tags}) + sw.series = append(sw.series, serie{name: name, tags: tags, deleted: deleted}) return nil } -// Delete marks a series as tombstoned. -func (sw *SeriesListWriter) Delete(name string, tags models.Tags) error { - panic("TODO") -} - // WriteTo computes the dictionary encoding of the series and writes to w. func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) { terms := newTermList(sw.terms) @@ -357,9 +367,10 @@ func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, termListOffset, seriesDa } type serie struct { - name string - tags models.Tags - offset uint32 + name string + tags models.Tags + deleted bool + offset uint32 } type series []serie diff --git a/tsdb/engine/tsi1/series_test.go b/tsdb/engine/tsi1/series_test.go index 69035b1825..d5c24ce3ae 100644 --- a/tsdb/engine/tsi1/series_test.go +++ b/tsdb/engine/tsi1/series_test.go @@ -83,14 +83,18 @@ func TestSeriesList_Series(t *testing.T) { // Verify all series exist. for i, s := range series { - if offset := l.SeriesOffset(l.EncodeSeries(s.Name, s.Tags)); offset == 0 { + if offset, deleted := l.SeriesOffset(l.EncodeSeries(s.Name, s.Tags)); offset == 0 { t.Fatalf("series does not exist: i=%d", i) + } else if deleted { + t.Fatalf("series deleted: i=%d", i) } } // Verify non-existent series doesn't exist. - if offset := l.SeriesOffset(l.EncodeSeries("foo", models.NewTags(map[string]string{"region": "north"}))); offset != 0 { + if offset, deleted := l.SeriesOffset(l.EncodeSeries("foo", models.NewTags(map[string]string{"region": "north"}))); offset != 0 { t.Fatalf("series should not exist: offset=%d", offset) + } else if deleted { + t.Fatalf("series should not be deleted") } }