Interrupt TSI & Series File Compactions

pull/9344/head
Ben Johnson 2018-01-19 13:06:52 -07:00
parent 9f757290f8
commit 0652effb78
No known key found for this signature in database
GPG Key ID: 048846D1E3EB6818
5 changed files with 123 additions and 14 deletions

View File

@@ -24,6 +24,9 @@ import (
// IndexName is the name of the index.
const IndexName = "tsi1"
// ErrCompactionCancelled is returned if an index is closed while a compaction is occurring.
var ErrCompactionCancelled = errors.New("tsi1: compaction cancelled")
func init() {
// FIXME(edd): Remove this.
if os.Getenv("TSI_PARTITIONS") != "" {

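The mechanism this commit threads through every compaction path is a close-only cancel channel polled with a non-blocking select: the default branch falls through immediately while the channel is open, and the <-cancel case fires once it is closed. A minimal, runnable sketch of that pattern (identifier names here are illustrative, not taken from this commit):

package main

import (
	"errors"
	"fmt"
)

var errCancelled = errors.New("compaction cancelled")

// checkCancel polls a close-only channel without blocking.
func checkCancel(cancel <-chan struct{}) error {
	select {
	case <-cancel:
		// Channel closed: the index is shutting down.
		return errCancelled
	default:
		// Channel still open: keep working.
		return nil
	}
}

func main() {
	cancel := make(chan struct{})
	fmt.Println(checkCancel(cancel)) // <nil>
	close(cancel)
	fmt.Println(checkCancel(cancel)) // compaction cancelled
}
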
View File

@@ -151,14 +151,22 @@ func (p IndexFiles) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie
}
// CompactTo merges all index files and writes them to w.
func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64) (n int64, err error) {
func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
var t IndexFileTrailer
// Check for cancellation.
select {
case <-cancel:
return n, ErrCompactionCancelled
default:
}
// Wrap writer in buffered I/O.
bw := bufio.NewWriter(w)
// Setup context object to track shared data for this compaction.
var info indexCompactInfo
info.cancel = cancel
info.tagSets = make(map[string]indexTagSetPos)
// Write magic number.
@@ -238,11 +246,19 @@ func (p IndexFiles) writeTagsetsTo(w io.Writer, info *indexCompactInfo, n *int64
func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactInfo, n *int64) error {
var seriesIDs []uint64
// Check for cancellation.
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
kitr, err := p.TagKeyIterator(name)
if err != nil {
return err
}
var seriesN int
enc := NewTagBlockEncoder(w)
for ke := kitr.Next(); ke != nil; ke = kitr.Next() {
// Encode key.
@@ -268,6 +284,15 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
break
}
seriesIDs = append(seriesIDs, se.SeriesID)
// Check for cancellation periodically.
if seriesN++; seriesN%1000 == 0 {
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
}
}
}
@@ -301,9 +326,17 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error {
mw := NewMeasurementBlockWriter()
// Check for cancellation.
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
// Add measurement data & compute sketches.
mitr := p.MeasurementIterator()
if mitr != nil {
var seriesN int
for m := mitr.Next(); m != nil; m = mitr.Next() {
name := m.Name()
@@ -321,6 +354,15 @@ func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo,
break
}
seriesIDs = append(seriesIDs, e.SeriesID)
// Check for cancellation periodically.
if seriesN++; seriesN%1000 == 0 {
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
}
}
sort.Sort(uint64Slice(seriesIDs))
@@ -373,6 +415,9 @@ type IndexFilesInfo struct {
// indexCompactInfo is a context object used for tracking position information
// during the compaction of index files.
type indexCompactInfo struct {
cancel <-chan struct{}
sfile *tsdb.SeriesFile
// Tracks offset/size for each measurement's tagset.
tagSets map[string]indexTagSetPos
}

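In the hot series-ID loops above, the poll is amortized: the select runs only when seriesN%1000 == 0, so a tight loop pays almost nothing per iteration while cancellation latency stays bounded at a thousand entries. A hedged sketch of that loop shape, reusing errCancelled from the earlier sketch (process and ids are hypothetical stand-ins, not names from this commit):

func process(id uint64) { /* stand-in for the real per-series work */ }

func compactSeries(ids []uint64, cancel <-chan struct{}) error {
	var n int
	for _, id := range ids {
		process(id)

		// Poll for cancellation only once every 1000 iterations.
		if n++; n%1000 == 0 {
			select {
			case <-cancel:
				return errCancelled
			default:
			}
		}
	}
	return nil
}
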
View File

@@ -751,16 +751,24 @@ func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
}
// CompactTo compacts the log file and writes it to w.
func (f *LogFile) CompactTo(w io.Writer, m, k uint64) (n int64, err error) {
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
f.mu.RLock()
defer f.mu.RUnlock()
// Check for cancellation.
select {
case <-cancel:
return n, ErrCompactionCancelled
default:
}
// Wrap in buffered writer.
bw := bufio.NewWriter(w)
// Setup compaction offset tracking data.
var t IndexFileTrailer
info := newLogFileCompactInfo()
info.cancel = cancel
// Write magic number.
if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
@@ -831,7 +839,15 @@ func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompa
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
mm := f.mms[name]
// Check for cancellation.
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
enc := NewTagBlockEncoder(w)
var valueN int
for _, k := range mm.keys() {
tag := mm.tagSet[k]
@@ -855,6 +871,15 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn
if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDs()); err != nil {
return err
}
// Check for cancellation periodically.
if valueN++; valueN%1000 == 0 {
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
}
}
}
@@ -879,6 +904,13 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn
func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
mw := NewMeasurementBlockWriter()
// Check for cancellation.
select {
case <-info.cancel:
return ErrCompactionCancelled
default:
}
// Add measurement data.
for _, name := range names {
mm := f.mms[name]
@@ -895,7 +927,8 @@ func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *log
// logFileCompactInfo is a context object to track compaction position info.
type logFileCompactInfo struct {
mms map[string]*logFileMeasurementCompactInfo
cancel <-chan struct{}
mms map[string]*logFileMeasurementCompactInfo
}
// newLogFileCompactInfo returns a new instance of logFileCompactInfo.

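Rather than adding a cancel parameter to every helper, the log-file and index-file compactions stash the channel on their existing context structs (logFileCompactInfo, indexCompactInfo), so deep helpers like writeTagsetTo read it from the info object they already receive. A sketch under that assumption, with invented names:

type compactInfo struct {
	cancel <-chan struct{}
	// per-compaction offsets and other bookkeeping live here too
}

func writeBlock(info *compactInfo) error {
	// Non-blocking poll before starting an expensive block.
	select {
	case <-info.cancel:
		return errCancelled
	default:
	}
	// ... encode and write the block ...
	return nil
}
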
View File

@@ -844,6 +844,14 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int) {
// Build a logger for this compaction.
logger := i.logger.With(zap.String("token", generateCompactionToken()))
// Check for cancellation.
select {
case <-i.closing:
logger.Error("cannot begin compaction", zap.Error(ErrCompactionCancelled))
return
default:
}
// Files have already been retained by caller.
// Ensure files are released only once.
var once sync.Once
@@ -856,7 +864,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int) {
path := filepath.Join(i.path, FormatIndexFileName(i.NextSequence(), level))
f, err := os.Create(path)
if err != nil {
logger.Error("cannot create compation files", zap.Error(err))
logger.Error("cannot create compaction files", zap.Error(err))
return
}
defer f.Close()
@@ -868,7 +876,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int) {
// Compact all index files to new index file.
lvl := i.levels[level]
n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K)
n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K, i.closing)
if err != nil {
logger.Error("cannot compact index files", zap.Error(err))
return
@@ -1007,7 +1015,7 @@ func (i *Partition) compactLogFile(logFile *LogFile) {
// Compact log file to new index file.
lvl := i.levels[1]
n, err := logFile.CompactTo(f, lvl.M, lvl.K)
n, err := logFile.CompactTo(f, lvl.M, lvl.K, i.closing)
if err != nil {
logger.Error("cannot compact log file", zap.Error(err), zap.String("path", logFile.Path()))
return

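At the call sites the partition passes its own closing channel as the cancel argument, so a single close interrupts both compactions already running and ones about to start: compactToLevel checks the channel before doing any work. A hypothetical sketch of that wiring (runCompaction stands in for the real IndexFiles.CompactTo / LogFile.CompactTo calls):

func runCompaction(cancel <-chan struct{}) error {
	// Stand-in for the real CompactTo calls, which poll cancel themselves.
	return nil
}

type partition struct {
	closing chan struct{}
}

func (p *partition) compactToLevel() error {
	// Refuse to begin a compaction on a closing partition.
	select {
	case <-p.closing:
		return errCancelled
	default:
	}
	// The same channel rides along, so closing also stops in-flight work.
	return runCompaction(p.closing)
}
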
View File

@@ -16,7 +16,8 @@ import (
)
var (
ErrSeriesPartitionClosed = errors.New("tsdb: series partition closed")
ErrSeriesPartitionClosed = errors.New("tsdb: series partition closed")
ErrSeriesPartitionCompactionCancelled = errors.New("tsdb: series partition compaction cancelled")
)
// DefaultSeriesPartitionCompactThreshold is the number of series IDs to hold in the in-memory
@@ -25,11 +26,14 @@ const DefaultSeriesPartitionCompactThreshold = 1 << 17 // 128K
// SeriesPartition represents a subset of series file data.
type SeriesPartition struct {
mu sync.RWMutex
wg sync.WaitGroup
id int
path string
closed bool
mu sync.RWMutex
wg sync.WaitGroup
id int
path string
closed bool
closing chan struct{}
once sync.Once
segments []*SeriesSegment
index *SeriesIndex
@@ -48,6 +52,7 @@ func NewSeriesPartition(id int, path string) *SeriesPartition {
return &SeriesPartition{
id: id,
path: path,
closing: make(chan struct{}),
CompactThreshold: DefaultSeriesPartitionCompactThreshold,
Logger: zap.NewNop(),
seq: uint64(id) + 1,
@@ -134,6 +139,7 @@ func (p *SeriesPartition) openSegments() error {
// Close unmaps the data files.
func (p *SeriesPartition) Close() (err error) {
p.once.Do(func() { close(p.closing) })
p.wg.Wait()
p.mu.Lock()
@@ -260,7 +266,9 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
go func() {
defer p.wg.Done()
if err := NewSeriesPartitionCompactor().Compact(p); err != nil {
compactor := NewSeriesPartitionCompactor()
compactor.cancel = p.closing
if err := compactor.Compact(p); err != nil {
logger.With(zap.Error(err)).Error("series partition compaction failed")
}
@@ -472,7 +480,9 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
}
// SeriesPartitionCompactor represents an object that reindexes a series partition and optionally compacts segments.
type SeriesPartitionCompactor struct{}
type SeriesPartitionCompactor struct {
cancel <-chan struct{}
}
// NewSeriesPartitionCompactor returns a new instance of SeriesPartitionCompactor.
func NewSeriesPartitionCompactor() *SeriesPartitionCompactor {
@@ -530,6 +540,7 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
// Reindex all partitions.
var entryN int
for _, segment := range segments {
errDone := errors.New("done")
@@ -539,6 +550,15 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
return errDone
}
// Check for cancellation periodically.
if entryN++; entryN%1000 == 0 {
select {
case <-c.cancel:
return ErrSeriesPartitionCompactionCancelled
default:
}
}
// Only process insert entries.
switch flag {
case SeriesEntryInsertFlag: // fallthrough
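
The series-file side completes the shutdown handshake: Close closes the channel exactly once via sync.Once, then waits on the WaitGroup until any background compaction goroutine observes the closed channel and returns ErrSeriesPartitionCompactionCancelled. A sketch of that handshake, assuming the field names shown above:

import "sync"

type seriesPartition struct {
	wg      sync.WaitGroup
	once    sync.Once
	closing chan struct{}
}

func (p *seriesPartition) startCompaction() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// The compactor's cancel field is wired to p.closing,
		// so this goroutine exits soon after Close fires.
	}()
}

func (p *seriesPartition) Close() {
	// Signal cancellation; sync.Once makes repeated Close calls safe.
	p.once.Do(func() { close(p.closing) })
	// Block until any running compaction has exited.
	p.wg.Wait()
}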