Add SeriesList tombstoning.

pull/7913/head
Ben Johnson 2016-09-12 09:26:43 -06:00
parent 2c34b24f5c
commit 4eeb81ef38
No known key found for this signature in database
GPG Key ID: 81741CD251883081
2 changed files with 31 additions and 16 deletions

View File

@ -22,6 +22,11 @@ const (
SeriesCountSize = 4
)
// Series flag constants.
const (
SeriesTombstoneFlag = 0x01
)
// SeriesList represents the section of the index which holds the term
// dictionary and a sorted list of series keys.
type SeriesList struct {
@ -31,13 +36,13 @@ type SeriesList struct {
// SeriesOffset returns offset of the encoded series key.
// Returns 0 if the key does not exist in the series list.
func (l *SeriesList) SeriesOffset(key []byte) uint32 {
offset := uint32(len(l.termList) + SeriesCountSize)
func (l *SeriesList) SeriesOffset(key []byte) (offset uint32, deleted bool) {
offset = uint32(len(l.termList) + SeriesCountSize)
data := l.seriesData[SeriesCountSize:]
for i, n := uint32(0), l.SeriesCount(); i < n; i++ {
// Read series flag.
// flag := data[0]
flag := data[0]
data = data[1:]
// Read series length.
@ -46,7 +51,8 @@ func (l *SeriesList) SeriesOffset(key []byte) uint32 {
// Return offset if the series key matches.
if bytes.Equal(key, data[:ln]) {
return offset
deleted = (flag & SeriesTombstoneFlag) != 0
return offset, deleted
}
// Update offset & move data forward.
@ -54,7 +60,7 @@ func (l *SeriesList) SeriesOffset(key []byte) uint32 {
offset += uint32(ln) + uint32(sz)
}
return 0
return 0, false
}
// EncodeSeries returns a dictionary-encoded series key.
@ -203,6 +209,15 @@ func NewSeriesListWriter() *SeriesListWriter {
// Add adds a series to the writer's set.
// Returns an ErrSeriesOverflow if no more series can be held in the writer.
func (sw *SeriesListWriter) Add(name string, tags models.Tags) error {
return sw.append(name, tags, false)
}
// Delete marks a series as tombstoned.
func (sw *SeriesListWriter) Delete(name string, tags models.Tags) error {
return sw.append(name, tags, true)
}
func (sw *SeriesListWriter) append(name string, tags models.Tags, deleted bool) error {
// Ensure writer doesn't add too many series.
if len(sw.series) == math.MaxUint32 {
return ErrSeriesOverflow
@ -216,16 +231,11 @@ func (sw *SeriesListWriter) Add(name string, tags models.Tags) error {
}
// Append series to list.
sw.series = append(sw.series, serie{name: name, tags: tags})
sw.series = append(sw.series, serie{name: name, tags: tags, deleted: deleted})
return nil
}
// Delete marks a series as tombstoned.
func (sw *SeriesListWriter) Delete(name string, tags models.Tags) error {
panic("TODO")
}
// WriteTo computes the dictionary encoding of the series and writes to w.
func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) {
terms := newTermList(sw.terms)
@ -357,9 +367,10 @@ func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, termListOffset, seriesDa
}
type serie struct {
name string
tags models.Tags
offset uint32
name string
tags models.Tags
deleted bool
offset uint32
}
type series []serie

View File

@ -83,14 +83,18 @@ func TestSeriesList_Series(t *testing.T) {
// Verify all series exist.
for i, s := range series {
if offset := l.SeriesOffset(l.EncodeSeries(s.Name, s.Tags)); offset == 0 {
if offset, deleted := l.SeriesOffset(l.EncodeSeries(s.Name, s.Tags)); offset == 0 {
t.Fatalf("series does not exist: i=%d", i)
} else if deleted {
t.Fatalf("series deleted: i=%d", i)
}
}
// Verify non-existent series doesn't exist.
if offset := l.SeriesOffset(l.EncodeSeries("foo", models.NewTags(map[string]string{"region": "north"}))); offset != 0 {
if offset, deleted := l.SeriesOffset(l.EncodeSeries("foo", models.NewTags(map[string]string{"region": "north"}))); offset != 0 {
t.Fatalf("series should not exist: offset=%d", offset)
} else if deleted {
t.Fatalf("series should not be deleted")
}
}