From 3034d3fb5481ed58fa0306693be46a07420a66bd Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 25 Oct 2017 07:29:44 -0600 Subject: [PATCH] intermediate --- tsdb/index/tsi1/log_file.go | 69 ++++------------------------------ tsdb/index/tsi1/partition.go | 4 +- tsdb/index/tsi1/series_file.go | 21 +++++++++-- 3 files changed, 27 insertions(+), 67 deletions(-) diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 2aa3910cfb..7365baeaba 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -468,42 +468,6 @@ func (f *LogFile) AddSeriesList(names [][]byte, tagsSlice []models.Tags) error { return nil } -/* -// AddSeries adds a series to the log file. -func (f *LogFile) AddSeries(name []byte, tags models.Tags) error { - seriesID, err := f.sfile.CreateSeriesIfNotExists(name, tags, nil) - if err != nil { - return err - } - - f.mu.Lock() - defer f.mu.Unlock() - - e := LogEntry{SeriesID: seriesID} - if err := f.appendEntry(&e); err != nil { - return err - } - f.execEntry(&e) - return nil -} -*/ -// DeleteSeries adds a tombstone for a series to the log file. -func (f *LogFile) DeleteSeriesID(seriesID uint64) error { - if seriesID == 0 { - return nil - } - - f.mu.Lock() - defer f.mu.Unlock() - - e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: seriesID} - if err := f.appendEntry(&e); err != nil { - return err - } - f.execEntry(&e) - return nil -} - // SeriesN returns the total number of series in the file. func (f *LogFile) SeriesN() (n uint64) { f.mu.RLock() @@ -656,9 +620,6 @@ func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) { } func (f *LogFile) execSeriesEntry(e *LogEntry) { - // Check if series is deleted. - deleted := (e.Flag & LogEntrySeriesTombstoneFlag) != 0 - seriesKey := f.sfile.SeriesKey(e.SeriesID) assert(seriesKey != nil, "series key not found") @@ -669,14 +630,6 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { name, remainder := ReadSeriesKeyMeasurement(remainder) mm := f.createMeasurementIfNotExists(name) - // Undelete measurement if it's been tombstoned previously. - if !deleted && mm.deleted { - mm.deleted = false - } - - // Mark series id tombstone. - mm.series[e.SeriesID] = deleted - // Read tag count. tagN, remainder := ReadSeriesKeyTagN(remainder) @@ -688,19 +641,12 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { tv := ts.createTagValueIfNotExists(v) // Add a reference to the series on the tag value. - tv.series[e.SeriesID] = deleted + tv.series[e.SeriesID] = struct{}{} ts.tagValues[string(v)] = tv mm.tagSet[string(k)] = ts } - // Update the sketches. - if deleted { - // TODO(edd) decrement series count... - f.sTSketch.Add(seriesKey) // Deleting series so update tombstone sketch. - return - } - // TODO(edd) increment series count.... f.sSketch.Add(seriesKey) // Add series to sketch. f.mSketch.Add(name) // Add measurement to sketch as this may be the fist series for the measurement. @@ -718,8 +664,8 @@ func (f *LogFile) SeriesIDIterator() SeriesIDIterator { for _, mm := range f.mms { n += len(mm.series) a := make([]SeriesIDElem, 0, len(mm.series)) - for seriesID, deleted := range mm.series { - a = append(a, SeriesIDElem{SeriesID: seriesID, Deleted: deleted}) + for seriesID := range mm.series { + a = append(a, SeriesIDElem{SeriesID: seriesID}) } sort.Sort(SeriesIDElems(a)) mSeries = append(mSeries, a) @@ -1091,9 +1037,8 @@ func appendLogEntry(dst []byte, e *LogEntry) []byte { } type logSerie struct { - name []byte - tags models.Tags - deleted bool + name []byte + tags models.Tags } func (s *logSerie) String() string { @@ -1136,7 +1081,7 @@ type logMeasurement struct { name []byte tagSet map[string]logTagKey deleted bool - series map[uint64]bool + series map[uint64]struct{}{} } func (mm *logMeasurement) seriesIDs() []uint64 { @@ -1225,7 +1170,7 @@ func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[ type logTagValue struct { name []byte deleted bool - series map[uint64]bool + series map[uint64]struct{}{} } func (tv *logTagValue) seriesIDs() []uint64 { diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 917b2afe93..3a173ed2eb 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -470,7 +470,7 @@ func (i *Partition) DropMeasurement(name []byte) error { if err := func() error { i.mu.RLock() defer i.mu.RUnlock() - return i.activeLogFile.DeleteSeriesID(s.SeriesID) + return i.sfile.DeleteSeriesID(s.SeriesID) }(); err != nil { return err } @@ -529,7 +529,7 @@ func (i *Partition) DropSeries(key []byte) error { mname := []byte(name) seriesID := i.sfile.Offset(mname, tags, nil) - if err := i.activeLogFile.DeleteSeriesID(seriesID); err != nil { + if err := i.sfile.DeleteSeriesID(seriesID); err != nil { return err } diff --git a/tsdb/index/tsi1/series_file.go b/tsdb/index/tsi1/series_file.go index 7cbcd5a5c0..e95484dbe5 100644 --- a/tsdb/index/tsi1/series_file.go +++ b/tsdb/index/tsi1/series_file.go @@ -50,6 +50,7 @@ type SeriesFile struct { seriesMap *seriesMap compactingSeriesMap *seriesMap + tombstones map[uint64]struct{} // MaxSize is the maximum size of the file. MaxSize int64 @@ -58,8 +59,9 @@ type SeriesFile struct { // NewSeriesFile returns a new instance of SeriesFile. func NewSeriesFile(path string) *SeriesFile { return &SeriesFile{ - path: path, - MaxSize: DefaultMaxSeriesFileSize, + path: path, + tombstones: make(map[uint64]struct{}), + MaxSize: DefaultMaxSeriesFileSize, } } @@ -227,6 +229,13 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod return offsets, nil } +// DeleteSeries flags a series as permanently deleted. +// If the series is reintroduced later then it must create a new offset. +func (f *SeriesFile) DeleteSeries(offset uint64) error { + f.tombstones[offset] = struct{}{} + return nil +} + // Offset returns the byte offset of the series within the block. func (f *SeriesFile) Offset(name []byte, tags models.Tags, buf []byte) (offset uint64) { f.mu.RLock() @@ -236,7 +245,13 @@ func (f *SeriesFile) Offset(name []byte, tags models.Tags, buf []byte) (offset u } func (f *SeriesFile) offset(name []byte, tags models.Tags, buf []byte) uint64 { - return f.seriesMap.offset(AppendSeriesKey(buf[:0], name, tags)) + offset := f.seriesMap.offset(AppendSeriesKey(buf[:0], name, tags)) + if offset == 0 { + return 0 + } else if _, ok := f.tombstones[offset]; ok { + return 0 + } + return offset } // SeriesKey returns the series key for a given offset.