From 0652effb78cd0a34f7263343e35b1e170c3c07ac Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 19 Jan 2018 13:06:52 -0700 Subject: [PATCH] Interrupt TSI & Series File Compactions --- tsdb/index/tsi1/index.go | 3 +++ tsdb/index/tsi1/index_files.go | 47 +++++++++++++++++++++++++++++++++- tsdb/index/tsi1/log_file.go | 37 ++++++++++++++++++++++++-- tsdb/index/tsi1/partition.go | 14 +++++++--- tsdb/series_partition.go | 36 ++++++++++++++++++++------ 5 files changed, 123 insertions(+), 14 deletions(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 9e48c0429f..5a3ddbd176 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -24,6 +24,9 @@ import ( // IndexName is the name of the index. const IndexName = "tsi1" +// ErrCompactionCancelled is returned if an index is closed while a compaction is occurring. +var ErrCompactionCancelled = errors.New("tsi1: compaction cancelled") + func init() { // FIXME(edd): Remove this. if os.Getenv("TSI_PARTITIONS") != "" { diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index 9be9f2b9dc..334a02ade3 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -151,14 +151,22 @@ func (p IndexFiles) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie } // CompactTo merges all index files and writes them to w. -func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64) (n int64, err error) { +func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64, cancel <-chan struct{}) (n int64, err error) { var t IndexFileTrailer + // Check for cancellation. + select { + case <-cancel: + return n, ErrCompactionCancelled + default: + } + // Wrap writer in buffered I/O. bw := bufio.NewWriter(w) // Setup context object to track shared data for this compaction. var info indexCompactInfo + info.cancel = cancel info.tagSets = make(map[string]indexTagSetPos) // Write magic number. 
@@ -238,11 +246,19 @@ func (p IndexFiles) writeTagsetsTo(w io.Writer, info *indexCompactInfo, n *int64 func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactInfo, n *int64) error { var seriesIDs []uint64 + // Check for cancellation. + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + kitr, err := p.TagKeyIterator(name) if err != nil { return err } + var seriesN int enc := NewTagBlockEncoder(w) for ke := kitr.Next(); ke != nil; ke = kitr.Next() { // Encode key. @@ -268,6 +284,15 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn break } seriesIDs = append(seriesIDs, se.SeriesID) + + // Check for cancellation periodically. + if seriesN++; seriesN%1000 == 0 { + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + } } } @@ -301,9 +326,17 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error { mw := NewMeasurementBlockWriter() + // Check for cancellation. + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + // Add measurement data & compute sketches. mitr := p.MeasurementIterator() if mitr != nil { + var seriesN int for m := mitr.Next(); m != nil; m = mitr.Next() { name := m.Name() @@ -321,6 +354,15 @@ func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, break } seriesIDs = append(seriesIDs, e.SeriesID) + + // Check for cancellation periodically. + if seriesN++; seriesN%1000 == 0 { + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + } } sort.Sort(uint64Slice(seriesIDs)) @@ -373,6 +415,9 @@ type IndexFilesInfo struct { // indexCompactInfo is a context object used for tracking position information // during the compaction of index files. 
type indexCompactInfo struct { + cancel <-chan struct{} + sfile *tsdb.SeriesFile + // Tracks offset/size for each measurement's tagset. tagSets map[string]indexTagSetPos } diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 896db6eecc..ffca941349 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -751,16 +751,24 @@ func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator } // CompactTo compacts the log file and writes it to w. -func (f *LogFile) CompactTo(w io.Writer, m, k uint64) (n int64, err error) { +func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) { f.mu.RLock() defer f.mu.RUnlock() + // Check for cancellation. + select { + case <-cancel: + return n, ErrCompactionCancelled + default: + } + // Wrap in buffered writer. bw := bufio.NewWriter(w) // Setup compaction offset tracking data. var t IndexFileTrailer info := newLogFileCompactInfo() + info.cancel = cancel // Write magic number. if err := writeTo(bw, []byte(FileSignature), &n); err != nil { @@ -831,7 +839,15 @@ func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompa func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error { mm := f.mms[name] + // Check for cancellation. + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + enc := NewTagBlockEncoder(w) + var valueN int for _, k := range mm.keys() { tag := mm.tagSet[k] @@ -855,6 +871,15 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDs()); err != nil { return err } + + // Check for cancellation periodically. 
+ if valueN++; valueN%1000 == 0 { + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + } } } @@ -879,6 +904,13 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error { mw := NewMeasurementBlockWriter() + // Check for cancellation. + select { + case <-info.cancel: + return ErrCompactionCancelled + default: + } + // Add measurement data. for _, name := range names { mm := f.mms[name] @@ -895,7 +927,8 @@ func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *log // logFileCompactInfo is a context object to track compaction position info. type logFileCompactInfo struct { - mms map[string]*logFileMeasurementCompactInfo + cancel <-chan struct{} + mms map[string]*logFileMeasurementCompactInfo } // newLogFileCompactInfo returns a new instance of logFileCompactInfo. diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 963e4e149f..d08854f78c 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -844,6 +844,14 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int) { // Build a logger for this compaction. logger := i.logger.With(zap.String("token", generateCompactionToken())) + // Check for cancellation. + select { + case <-i.closing: + logger.Error("cannot begin compaction", zap.Error(ErrCompactionCancelled)) + return + default: + } + // Files have already been retained by caller. // Ensure files are released only once. 
var once sync.Once @@ -856,7 +864,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int) { path := filepath.Join(i.path, FormatIndexFileName(i.NextSequence(), level)) f, err := os.Create(path) if err != nil { - logger.Error("cannot create compation files", zap.Error(err)) + logger.Error("cannot create compaction files", zap.Error(err)) return } defer f.Close() @@ -868,7 +876,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int) { // Compact all index files to new index file. lvl := i.levels[level] - n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K) + n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K, i.closing) if err != nil { logger.Error("cannot compact index files", zap.Error(err)) return @@ -1007,7 +1015,7 @@ func (i *Partition) compactLogFile(logFile *LogFile) { // Compact log file to new index file. lvl := i.levels[1] - n, err := logFile.CompactTo(f, lvl.M, lvl.K) + n, err := logFile.CompactTo(f, lvl.M, lvl.K, i.closing) if err != nil { logger.Error("cannot compact log file", zap.Error(err), zap.String("path", logFile.Path())) return diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index a145ea3f1f..bebfd88e39 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -16,7 +16,8 @@ import ( ) var ( - ErrSeriesPartitionClosed = errors.New("tsdb: series partition closed") + ErrSeriesPartitionClosed = errors.New("tsdb: series partition closed") + ErrSeriesPartitionCompactionCancelled = errors.New("tsdb: series partition compaction cancelled") ) // DefaultSeriesPartitionCompactThreshold is the number of series IDs to hold in the in-memory @@ -25,11 +26,14 @@ const DefaultSeriesPartitionCompactThreshold = 1 << 17 // 128K // SeriesPartition represents a subset of series file data. 
type SeriesPartition struct { - mu sync.RWMutex - wg sync.WaitGroup - id int - path string - closed bool + mu sync.RWMutex + wg sync.WaitGroup + id int + path string + + closed bool + closing chan struct{} + once sync.Once segments []*SeriesSegment index *SeriesIndex @@ -48,6 +52,7 @@ func NewSeriesPartition(id int, path string) *SeriesPartition { return &SeriesPartition{ id: id, path: path, + closing: make(chan struct{}), CompactThreshold: DefaultSeriesPartitionCompactThreshold, Logger: zap.NewNop(), seq: uint64(id) + 1, @@ -134,6 +139,7 @@ func (p *SeriesPartition) openSegments() error { // Close unmaps the data files. func (p *SeriesPartition) Close() (err error) { + p.once.Do(func() { close(p.closing) }) p.wg.Wait() p.mu.Lock() @@ -260,7 +266,9 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio go func() { defer p.wg.Done() - if err := NewSeriesPartitionCompactor().Compact(p); err != nil { + compactor := NewSeriesPartitionCompactor() + compactor.cancel = p.closing + if err := compactor.Compact(p); err != nil { logger.With(zap.Error(err)).Error("series partition compaction failed") } @@ -472,7 +480,9 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { } // SeriesPartitionCompactor represents an object reindexes a series partition and optionally compacts segments. -type SeriesPartitionCompactor struct{} +type SeriesPartitionCompactor struct { + cancel <-chan struct{} +} // NewSeriesPartitionCompactor returns a new instance of SeriesPartitionCompactor. func NewSeriesPartitionCompactor() *SeriesPartitionCompactor { @@ -530,6 +540,7 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) // Reindex all partitions. 
+ var entryN int for _, segment := range segments { errDone := errors.New("done") @@ -539,6 +550,15 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui return errDone } + // Check for cancellation periodically. + if entryN++; entryN%1000 == 0 { + select { + case <-c.cancel: + return ErrSeriesPartitionCompactionCancelled + default: + } + } + // Only process insert entries. switch flag { case SeriesEntryInsertFlag: // fallthrough