From d694454f47a197ecaf5eca6fd7014c55ce379e77 Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Mon, 28 Sep 2015 21:04:02 -0400
Subject: [PATCH] Fix wal flushing, compacting, and write lock

---
 tsdb/config.go              |   6 +-
 tsdb/engine/pd1/pd1.go      | 157 ++++++++++++++++++++++++++++++------
 tsdb/engine/pd1/pd1_test.go |   2 +
 tsdb/engine/pd1/wal.go      |  37 +++++----
 4 files changed, 160 insertions(+), 42 deletions(-)

diff --git a/tsdb/config.go b/tsdb/config.go
index 8716a45537..2039ff813b 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -45,9 +45,9 @@ const (
 	DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB

 	// Default WAL settings for the PD1 WAL
-	DefaultFlushMemorySizeThreshold = 10 * 1024 * 1024  // 10MB
-	DefaultMaxMemorySizeThreshold   = 200 * 1024 * 1024 // 200MB
-	DefaultIndexCompactionAge       = 10 * time.Minute
+	DefaultFlushMemorySizeThreshold = 5 * 1024 * 1024   // 5MB
+	DefaultMaxMemorySizeThreshold   = 100 * 1024 * 1024 // 100MB
+	DefaultIndexCompactionAge       = time.Minute
 	DefaultIndexCompactionFileCount = 5
 )

diff --git a/tsdb/engine/pd1/pd1.go b/tsdb/engine/pd1/pd1.go
index a2d8687db9..54cb6d4750 100644
--- a/tsdb/engine/pd1/pd1.go
+++ b/tsdb/engine/pd1/pd1.go
@@ -55,7 +55,7 @@ const (
 	// DefaultRotateBlockSize is the default size to rotate to a new compressed block
 	DefaultRotateBlockSize = 512 * 1024 // 512KB

-	DefaultRotateFileSize = 10 * 1024 * 1024 // 10MB
+	DefaultRotateFileSize = 5 * 1024 * 1024 // 5MB

 	DefaultMaxPointsPerBlock = 1000

@@ -90,9 +90,10 @@ type Engine struct {
 	CompactionFileCount int

 	// filesLock is only for modifying and accessing the files slice
-	filesLock     sync.RWMutex
-	files         dataFiles
-	currentFileID int
+	filesLock         sync.RWMutex
+	files             dataFiles
+	currentFileID     int
+	compactionRunning bool

 	collisionsLock sync.RWMutex
 	collisions     map[string]uint64
@@ -264,16 +265,21 @@ func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave ma
 		return nil
 	}

-	e.writeLock.LockRange(startTime, endTime)
-	defer e.writeLock.UnlockRange(startTime, endTime)
+	files, lockStart, lockEnd := e.filesAndLock(startTime, endTime)
+	defer e.writeLock.UnlockRange(lockStart, lockEnd)

-	if len(e.files) == 0 {
+	if len(files) == 0 {
 		return e.rewriteFile(nil, valuesByID)
 	}

 	maxTime := int64(math.MaxInt64)
+
+	// do the file rewrites in parallel
+	var mu sync.Mutex
+	var writes sync.WaitGroup
+	var errors []error
+
 	// reverse through the data files and write in the data
-	files := e.copyFilesCollection()
 	for i := len(files) - 1; i >= 0; i-- {
 		f := files[i]
 		// max times are exclusive, so add 1 to it
@@ -281,31 +287,105 @@ func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave ma
 		fileMin := f.MinTime()
 		// if the file is < rotate, write all data between fileMin and maxTime
 		if f.size < e.RotateFileSize {
-			if err := e.rewriteFile(f, e.filterDataBetweenTimes(valuesByID, fileMin, maxTime)); err != nil {
-				return err
-			}
+			writes.Add(1)
+			go func(df *dataFile, vals map[uint64]Values) {
+				if err := e.rewriteFile(df, vals); err != nil {
+					mu.Lock()
+					errors = append(errors, err)
+					mu.Unlock()
+				}
+				writes.Done()
+			}(f, e.filterDataBetweenTimes(valuesByID, fileMin, maxTime))
 			continue
 		}
 		// if the file is > rotate:
 		// write all data between fileMax and maxTime into new file
 		// write all data between fileMin and fileMax into old file
-		if err := e.rewriteFile(nil, e.filterDataBetweenTimes(valuesByID, fileMax, maxTime)); err != nil {
-			return err
-		}
-		if err := e.rewriteFile(f, e.filterDataBetweenTimes(valuesByID, fileMin, fileMax)); err != nil {
-			return err
-		}
+		writes.Add(1)
+		go func(vals map[uint64]Values) {
+			if err := e.rewriteFile(nil, vals); err != nil {
+				mu.Lock()
+				errors = append(errors, err)
+				mu.Unlock()
+			}
+			writes.Done()
+		}(e.filterDataBetweenTimes(valuesByID, fileMax, maxTime))
+
+		writes.Add(1)
+		go func(df *dataFile, vals map[uint64]Values) {
+			if err := e.rewriteFile(df, vals); err != nil {
+				mu.Lock()
+				errors = append(errors, err)
+				mu.Unlock()
+			}
+			writes.Done()
+		}(f, e.filterDataBetweenTimes(valuesByID, fileMin, fileMax))

 		maxTime = fileMin
 	}
 	// for any data leftover, write into a new file since it's all older
 	// than any file we currently have
-	err = e.rewriteFile(nil, valuesByID)
+	writes.Add(1)
+	go func() {
+		if err := e.rewriteFile(nil, valuesByID); err != nil {
+			mu.Lock()
+			errors = append(errors, err)
+			mu.Unlock()
+		}
+		writes.Done()
+	}()
+
+	writes.Wait()
+
+	if len(errors) > 0 {
+		// TODO: log errors
+		return errors[0]
+	}

 	if !e.SkipCompaction && e.shouldCompact() {
 		go e.Compact()
 	}

-	return err
+	return nil
+}
+
+// filesAndLock returns the data files that match the given range and
+// ensures that the write lock will hold for the entire range
+func (e *Engine) filesAndLock(min, max int64) (a dataFiles, lockStart, lockEnd int64) {
+	for {
+		a = make([]*dataFile, 0)
+		files := e.copyFilesCollection()
+
+		for _, f := range files {
+			fmin, fmax := f.MinTime(), f.MaxTime()
+			if min < fmax && min >= fmin {
+				a = append(a, f)
+			} else if max >= fmin && max < fmax {
+				a = append(a, f)
+			}
+		}
+
+		if len(a) > 0 {
+			lockStart = a[0].MinTime()
+			lockEnd = a[len(a)-1].MaxTime()
+			if max > lockEnd {
+				lockEnd = max
+			}
+		} else {
+			lockStart = min
+			lockEnd = max
+		}
+
+		e.writeLock.LockRange(lockStart, lockEnd)
+
+		// it's possible for compaction to change the files collection while we
+		// were waiting for a write lock on the range. Make sure the files are still the
+		// same after we got the lock, otherwise try again. This shouldn't happen often.
+		filesAfterLock := e.copyFilesCollection()
+		if reflect.DeepEqual(files, filesAfterLock) {
+			return
+		}
+
+		e.writeLock.UnlockRange(lockStart, lockEnd)
+	}
 }

 func (e *Engine) Compact() error {
@@ -333,7 +413,24 @@ func (e *Engine) Compact() error {
 		// we've got the write lock and the files are all there
 		break
 	}
-	defer e.writeLock.UnlockRange(minTime, maxTime)
+
+	// mark the compaction as running
+	e.filesLock.Lock()
+	e.compactionRunning = true
+	e.filesLock.Unlock()
+	defer func() {
+		// release the lock
+		e.writeLock.UnlockRange(minTime, maxTime)
+
+		// see if we should run another compaction
+		if e.shouldCompact() {
+			go e.Compact()
+		} else {
+			e.filesLock.Lock()
+			e.compactionRunning = false
+			e.filesLock.Unlock()
+		}
+	}()

 	positions := make([]uint32, len(files))
 	ids := make([]uint64, len(files))
@@ -472,6 +569,7 @@ func (e *Engine) Compact() error {
 		for _, f := range files {
 			if err := f.Delete(); err != nil {
 				// TODO: log this error
+				fmt.Println("ERROR DELETING:", f.f.Name())
 			}
 		}
 		e.deletesPending.Done()
@@ -527,6 +625,12 @@ func (e *Engine) writeIndexAndGetDataFile(f *os.File, minTime, maxTime int64, id
 }

 func (e *Engine) shouldCompact() bool {
+	e.filesLock.RLock()
+	running := e.compactionRunning
+	e.filesLock.RUnlock()
+	if running {
+		return false
+	}
 	return len(e.filesToCompact()) >= e.CompactionFileCount
 }

@@ -538,6 +642,10 @@ func (e *Engine) filesToCompact() dataFiles {
 	for _, df := range e.files {
 		if time.Since(df.modTime) > e.CompactionAge && df.size < MaxDataFileSize {
 			a = append(a, df)
+		} else if len(a) > 0 {
+			// only compact contiguous ranges. If we hit the negative case and
+			// there are files to compact, stop here
+			break
 		}
 	}
 	return a
 }
@@ -744,7 +852,8 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
 			minTime = v.MinTime()
 		}
 		if maxTime < v.MaxTime() {
-			maxTime = v.MaxTime()
+			// add 1 ns to the time since maxTime is exclusive
+			maxTime = v.MaxTime() + 1
 		}
 	}

@@ -896,7 +1005,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
 	e.deletesPending.Add(1)
 	go func() {
 		if err := oldDF.Delete(); err != nil {
-			// TODO: log this error
+			fmt.Println("ERROR DELETING FROM REWRITE:", oldDF.f.Name())
 		}
 		e.deletesPending.Done()
 	}()
@@ -1312,7 +1421,7 @@ func (d *dataFile) indexPosition() uint32 {

 func (d *dataFile) StartingPositionForID(id uint64) uint32 {
 	seriesCount := d.SeriesCount()
-	indexStart := d.size - uint32(seriesCount*12+20)
+	indexStart := d.indexPosition()

 	min := uint32(0)
 	max := uint32(seriesCount)
@@ -1522,7 +1631,7 @@ func btou32(b []byte) uint32 {
 func hashSeriesField(key string) uint64 {
 	h := fnv.New64a()
 	h.Write([]byte(key))
-	return h.Sum64() % 100
+	return h.Sum64()
 }

 // seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
diff --git a/tsdb/engine/pd1/pd1_test.go b/tsdb/engine/pd1/pd1_test.go
index 52cb9717f3..35ec6c720f 100644
--- a/tsdb/engine/pd1/pd1_test.go
+++ b/tsdb/engine/pd1/pd1_test.go
@@ -363,6 +363,8 @@ func TestEngine_Compaction(t *testing.T) {
 		}
 	}

+	verify("cpu,host=A", []models.Point{p1, p3, p5, p7}, 0)
+	verify("cpu,host=B", []models.Point{p2, p4, p6, p8}, 0)
 	if err := e.Close(); err != nil {
 		t.Fatalf("error closing: %s", err.Error())
 	}
diff --git a/tsdb/engine/pd1/wal.go b/tsdb/engine/pd1/wal.go
index df1cbfe1b2..c4c6d23ebe 100644
--- a/tsdb/engine/pd1/wal.go
+++ b/tsdb/engine/pd1/wal.go
@@ -270,12 +270,14 @@ func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.Measurem
 	defer l.cacheLock.Unlock()

 	// if we should check memory and we're over the threshold, mark a flush as running and kick one off in a goroutine
-	if checkMemory && l.memorySize > l.MaxMemorySizeThreshold {
+	if checkMemory && l.memorySize > l.FlushMemorySizeThreshold {
 		if !l.flushRunning {
 			l.flushRunning = true
 			go l.flush(memoryFlush)
 		}
-		return false
+		if l.memorySize > l.MaxMemorySizeThreshold {
+			return false
+		}
 	}

 	for _, p := range points {
@@ -401,7 +403,7 @@ func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
 	l.writeLock.Lock()
 	defer l.writeLock.Unlock()

-	if l.currentSegmentFile == nil {
+	if l.currentSegmentFile == nil || l.currentSegmentSize > DefaultSegmentSize {
 		if err := l.newSegmentFile(); err != nil {
 			// fail hard since we can't write data
 			panic(fmt.Sprintf("error opening new segment file for wal: %s", err.Error()))
@@ -421,6 +423,8 @@ func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
 		panic(fmt.Sprintf("error writing data to wal: %s", err.Error()))
 	}

+	l.currentSegmentSize += 5 + len(data)
+
 	return l.currentSegmentFile.Sync()
 }

@@ -489,6 +493,7 @@ func (l *Log) flush(flush flushType) error {
 	// only flush if there isn't one already running. Memory flushes are only triggered
 	// by writes, which will mark the flush as running, so we can ignore it.
 	l.cacheLock.Lock()
+
 	if l.flushRunning && flush != memoryFlush {
 		l.cacheLock.Unlock()
 		return nil
 	}
@@ -523,19 +528,21 @@ func (l *Log) flush(flush flushType) error {
 	l.writeLock.Unlock()

 	// copy the cache items to new maps so we can empty them out
-	l.flushCache = l.cache
-	l.cache = make(map[string]Values)
-	for k, _ := range l.cacheDirtySort {
-		l.flushCache[k] = l.flushCache[k].Deduplicate()
-	}
-	l.cacheDirtySort = make(map[string]bool)
+	l.flushCache = make(map[string]Values)

-	valuesByKey := make(map[string]Values)
 	valueCount := 0
-	for key, v := range l.flushCache {
-		valuesByKey[key] = v
+	for key, v := range l.cache {
+		l.flushCache[key] = v
 		valueCount += len(v)
 	}
+	l.cache = make(map[string]Values)
+
+	// now that the flush cache holds the data, sort and dedupe any keys
+	// that got out of order writes
+	for k, _ := range l.cacheDirtySort {
+		l.flushCache[k] = l.flushCache[k].Deduplicate()
+	}
+	l.cacheDirtySort = make(map[string]bool)

 	flushSize := l.memorySize

@@ -553,7 +560,7 @@ func (l *Log) flush(flush flushType) error {
 	l.cacheLock.Unlock()

 	// exit if there's nothing to flush to the index
-	if len(valuesByKey) == 0 && len(mfc) == 0 && len(scc) == 0 {
+	if len(l.flushCache) == 0 && len(mfc) == 0 && len(scc) == 0 {
 		return nil
 	}

@@ -564,11 +571,11 @@ func (l *Log) flush(flush flushType) error {
 		} else if flush == startupFlush {
 			ftype = "startup"
 		}
-		l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, flushSize)
+		l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(l.flushCache), valueCount, flushSize)
 	}

 	startTime := time.Now()
-	if err := l.Index.Write(valuesByKey, mfc, scc); err != nil {
+	if err := l.Index.Write(l.flushCache, mfc, scc); err != nil {
 		return err
 	}
 	if l.LoggingEnabled {
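
Note: the heart of the write-lock fix is the optimistic retry loop in filesAndLock: snapshot the file collection, acquire the range write lock, then confirm the collection is unchanged, since a compaction can swap data files while the caller is blocked on the lock. Below is a minimal, self-contained Go sketch of that idiom; engine, dataFile, snapshot, and the plain mutexes are illustrative stand-ins, not the pd1 engine's real types.

package main

import (
	"fmt"
	"reflect"
	"sync"
)

// dataFile stands in for a pd1 data file; only its time bounds matter here.
type dataFile struct{ min, max int64 }

type engine struct {
	filesMu sync.Mutex  // guards files, like the engine's filesLock
	files   []*dataFile // sorted by time, oldest first

	rangeMu sync.Mutex // stands in for the time-range write lock
}

// snapshot copies the file collection, like copyFilesCollection.
func (e *engine) snapshot() []*dataFile {
	e.filesMu.Lock()
	defer e.filesMu.Unlock()
	return append([]*dataFile(nil), e.files...)
}

// filesAndLock returns the files the write range [min, max] touches, with
// the range lock held. The caller must unlock rangeMu when it is done.
func (e *engine) filesAndLock(min, max int64) []*dataFile {
	for {
		before := e.snapshot()

		e.rangeMu.Lock()

		// A compaction may have replaced files while we waited for the
		// lock; if so, drop the lock and retry against the new collection.
		if reflect.DeepEqual(before, e.snapshot()) {
			var a []*dataFile
			for _, f := range before {
				// keep any file whose [min, max) window the write touches
				if (min >= f.min && min < f.max) || (max >= f.min && max < f.max) {
					a = append(a, f)
				}
			}
			return a
		}
		e.rangeMu.Unlock()
	}
}

func main() {
	e := &engine{files: []*dataFile{{min: 0, max: 10}, {min: 10, max: 20}}}
	a := e.filesAndLock(5, 15)
	defer e.rangeMu.Unlock()
	fmt.Println("files touched by [5,15]:", len(a)) // prints 2
}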
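
Note: the addToCache change above separates two memory limits: crossing FlushMemorySizeThreshold starts at most one background flush while still accepting the write, and only crossing MaxMemorySizeThreshold rejects the write until a flush catches up. A rough sketch of that two-threshold backpressure follows, with hypothetical names and a toy flush standing in for the real Log and index write.

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	flushThreshold = 5 * 1024 * 1024   // soft limit: kick off a background flush
	maxThreshold   = 100 * 1024 * 1024 // hard limit: reject the write outright
)

type wal struct {
	mu           sync.Mutex
	memorySize   int
	flushRunning bool
}

// add returns false when the write must be rejected because the in-memory
// cache has outrun the background flush.
func (w *wal) add(size int) bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.memorySize > flushThreshold {
		// start at most one flush; later writes see flushRunning == true
		if !w.flushRunning {
			w.flushRunning = true
			go w.flush()
		}
		// between the soft and hard limits the write is still accepted
		if w.memorySize > maxThreshold {
			return false
		}
	}
	w.memorySize += size
	return true
}

func (w *wal) flush() {
	time.Sleep(10 * time.Millisecond) // stand-in for writing to the index
	w.mu.Lock()
	w.memorySize = 0
	w.flushRunning = false
	w.mu.Unlock()
}

func main() {
	w := &wal{}
	fmt.Println(w.add(1024)) // true: under both thresholds
}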