Fix WAL flushing, compacting, and write lock

pull/4308/head
Paul Dix 2015-09-28 21:04:02 -04:00
parent 6c94e738a0
commit d694454f47
4 changed files with 153 additions and 38 deletions

View File

@@ -45,9 +45,9 @@ const (
DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB
// Default WAL settings for the PD1 WAL
-DefaultFlushMemorySizeThreshold = 10 * 1024 * 1024 // 10MB
-DefaultMaxMemorySizeThreshold = 200 * 1024 * 1024 // 200MB
-DefaultIndexCompactionAge = 10 * time.Minute
+DefaultFlushMemorySizeThreshold = 5 * 1024 * 1024 // 5MB
+DefaultMaxMemorySizeThreshold = 100 * 1024 * 1024 // 100MB
+DefaultIndexCompactionAge = time.Minute
DefaultIndexCompactionFileCount = 5
)

View File

@@ -55,7 +55,7 @@ const (
// DefaultRotateBlockSize is the default size to rotate to a new compressed block
DefaultRotateBlockSize = 512 * 1024 // 512KB
-DefaultRotateFileSize = 10 * 1024 * 1024 // 10MB
+DefaultRotateFileSize = 5 * 1024 * 1024 // 5MB
DefaultMaxPointsPerBlock = 1000
@@ -90,9 +90,10 @@ type Engine struct {
CompactionFileCount int
// filesLock is only for modifying and accessing the files slice
-filesLock sync.RWMutex
-files dataFiles
-currentFileID int
+filesLock sync.RWMutex
+files dataFiles
+currentFileID int
+compactionRunning bool
collisionsLock sync.RWMutex
collisions map[string]uint64
@@ -264,16 +265,21 @@ func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave ma
return nil
}
-e.writeLock.LockRange(startTime, endTime)
-defer e.writeLock.UnlockRange(startTime, endTime)
+files, lockStart, lockEnd := e.filesAndLock(startTime, endTime)
+defer e.writeLock.UnlockRange(lockStart, lockEnd)
-if len(e.files) == 0 {
+if len(files) == 0 {
return e.rewriteFile(nil, valuesByID)
}
maxTime := int64(math.MaxInt64)
+// do the file rewrites in parallel
+var mu sync.Mutex
+var writes sync.WaitGroup
+var errors []error
// reverse through the data files and write in the data
-files := e.copyFilesCollection()
for i := len(files) - 1; i >= 0; i-- {
f := files[i]
// max times are exclusive, so add 1 to it
@@ -281,31 +287,105 @@ func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave ma
fileMin := f.MinTime()
// if the file is < rotate, write all data between fileMin and maxTime
if f.size < e.RotateFileSize {
-if err := e.rewriteFile(f, e.filterDataBetweenTimes(valuesByID, fileMin, maxTime)); err != nil {
-return err
-}
+writes.Add(1)
+go func(df *dataFile, vals map[uint64]Values) {
+if err := e.rewriteFile(df, vals); err != nil {
+mu.Lock()
+errors = append(errors, err)
+mu.Unlock()
+}
+writes.Done()
+}(f, e.filterDataBetweenTimes(valuesByID, fileMin, maxTime))
continue
}
// if the file is > rotate:
// write all data between fileMax and maxTime into new file
// write all data between fileMin and fileMax into old file
-if err := e.rewriteFile(nil, e.filterDataBetweenTimes(valuesByID, fileMax, maxTime)); err != nil {
-return err
-}
-if err := e.rewriteFile(f, e.filterDataBetweenTimes(valuesByID, fileMin, fileMax)); err != nil {
-return err
-}
+writes.Add(1)
+go func(vals map[uint64]Values) {
+if err := e.rewriteFile(nil, vals); err != nil {
+mu.Lock()
+errors = append(errors, err)
+mu.Unlock()
+}
+writes.Done()
+}(e.filterDataBetweenTimes(valuesByID, fileMax, maxTime))
+writes.Add(1)
+go func(df *dataFile, vals map[uint64]Values) {
+if err := e.rewriteFile(df, vals); err != nil {
+mu.Lock()
+errors = append(errors, err)
+mu.Unlock()
+}
+writes.Done()
+}(f, e.filterDataBetweenTimes(valuesByID, fileMin, fileMax))
maxTime = fileMin
}
// for any data leftover, write into a new file since it's all older
// than any file we currently have
-err = e.rewriteFile(nil, valuesByID)
+writes.Add(1)
+go func() {
+if err := e.rewriteFile(nil, valuesByID); err != nil {
+mu.Lock()
+errors = append(errors, err)
+mu.Unlock()
+}
+writes.Done()
+}()
+writes.Wait()
+if len(errors) > 0 {
+// TODO: log errors
+return errors[0]
+}
if !e.SkipCompaction && e.shouldCompact() {
go e.Compact()
}
-return err
+return nil
}
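For readers unfamiliar with this fan-out shape, here is the pattern in isolation: one goroutine per file rewrite, errors collected under a mutex, a WaitGroup to join them, and the first error returned. A minimal runnable sketch with a stand-in rewrite function, not the engine's actual types:

package main

import (
	"fmt"
	"sync"
)

// rewrite stands in for Engine.rewriteFile; it may fail.
func rewrite(id int) error {
	if id == 2 {
		return fmt.Errorf("rewrite of file %d failed", id)
	}
	return nil
}

func main() {
	var (
		mu     sync.Mutex
		writes sync.WaitGroup
		errors []error
	)
	for i := 0; i < 4; i++ {
		writes.Add(1)
		go func(id int) {
			defer writes.Done()
			if err := rewrite(id); err != nil {
				mu.Lock() // the errors slice is shared across goroutines
				errors = append(errors, err)
				mu.Unlock()
			}
		}(i)
	}
	writes.Wait()
	if len(errors) > 0 {
		fmt.Println("first error:", errors[0]) // the engine returns errors[0]
	}
}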
+// filesAndLock returns the data files that match the given range and
+// ensures that the write lock will hold for the entire range
+func (e *Engine) filesAndLock(min, max int64) (a dataFiles, lockStart, lockEnd int64) {
+for {
+a = make([]*dataFile, 0)
+files := e.copyFilesCollection()
+for _, f := range files {
+fmin, fmax := f.MinTime(), f.MaxTime()
+if min < fmax && min >= fmin {
+a = append(a, f)
+} else if max >= fmin && max < fmax {
+a = append(a, f)
+}
+}
+if len(a) > 0 {
+lockStart = a[0].MinTime()
+lockEnd = a[len(a)-1].MaxTime()
+if max > lockEnd {
+lockEnd = max
+}
+} else {
+lockStart = min
+lockEnd = max
+}
+e.writeLock.LockRange(lockStart, lockEnd)
+// it's possible for compaction to change the files collection while we
+// were waiting for a write lock on the range. Make sure the files are still the
+// same after we got the lock, otherwise try again. This shouldn't happen often.
+filesAfterLock := e.copyFilesCollection()
+if reflect.DeepEqual(files, filesAfterLock) {
+return
+}
+e.writeLock.UnlockRange(lockStart, lockEnd)
+}
+}
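filesAndLock is an optimistic lock: snapshot the file collection, acquire the range lock, then verify the snapshot still matches, retrying if a concurrent compaction swapped the files in the meantime. A self-contained sketch of that shape, with a plain mutex standing in for the engine's range-based write lock and a string slice standing in for dataFiles:

package main

import (
	"reflect"
	"sync"
)

type rangeLock struct{ mu sync.Mutex } // stand-in for the engine's range write lock

func (l *rangeLock) LockRange(min, max int64)   { l.mu.Lock() }
func (l *rangeLock) UnlockRange(min, max int64) { l.mu.Unlock() }

type store struct {
	lock  rangeLock
	mu    sync.RWMutex
	files []string // stand-in for dataFiles
}

func (s *store) copyFiles() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]string(nil), s.files...)
}

// filesAndLock returns a snapshot that is guaranteed to still be the live
// collection at the moment the range lock is held.
func (s *store) filesAndLock(min, max int64) []string {
	for {
		before := s.copyFiles()
		s.lock.LockRange(min, max)
		// A compaction may have replaced files while we waited for the
		// lock; if so, release and retry against the new collection.
		after := s.copyFiles()
		if reflect.DeepEqual(before, after) {
			return after
		}
		s.lock.UnlockRange(min, max)
	}
}

func main() {
	s := &store{files: []string{"segment-1", "segment-2"}} // placeholder names
	files := s.filesAndLock(0, 100)
	defer s.lock.UnlockRange(0, 100)
	_ = files
}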
func (e *Engine) Compact() error {
@@ -333,7 +413,24 @@ func (e *Engine) Compact() error {
// we've got the write lock and the files are all there
break
}
-defer e.writeLock.UnlockRange(minTime, maxTime)
+// mark the compaction as running
+e.filesLock.Lock()
+e.compactionRunning = true
+e.filesLock.Unlock()
+defer func() {
+// release the lock
+e.writeLock.UnlockRange(minTime, maxTime)
+// see if we should run another compaction
+if e.shouldCompact() {
+go e.Compact()
+} else {
+e.filesLock.Lock()
+e.compactionRunning = false
+e.filesLock.Unlock()
+}
+}()
positions := make([]uint32, len(files))
ids := make([]uint64, len(files))
@@ -472,6 +569,7 @@ func (e *Engine) Compact() error {
for _, f := range files {
if err := f.Delete(); err != nil {
// TODO: log this error
fmt.Println("ERROR DELETING:", f.f.Name())
}
}
e.deletesPending.Done()
@@ -527,6 +625,12 @@ func (e *Engine) writeIndexAndGetDataFile(f *os.File, minTime, maxTime int64, id
}
func (e *Engine) shouldCompact() bool {
+e.filesLock.RLock()
+running := e.compactionRunning
+e.filesLock.RUnlock()
+if running {
+return false
+}
return len(e.filesToCompact()) >= e.CompactionFileCount
}
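The compactionRunning flag makes compaction single-flight: shouldCompact reports false while a run is active, and the finishing run (in the defer above) either clears the flag or chains straight into the next pass. A runnable sketch of the same guard, with a pending-file counter standing in for filesToCompact:

package main

import (
	"fmt"
	"sync"
	"time"
)

type compactor struct {
	mu      sync.Mutex
	running bool
	pending int // stand-in for "files currently eligible to compact"
}

func (c *compactor) shouldCompact() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return !c.running && c.pending >= 5
}

func (c *compactor) compact() {
	c.mu.Lock()
	c.running = true
	c.mu.Unlock()

	defer func() {
		// If more work arrived during this run, chain into another pass and
		// keep the flag set so shouldCompact stays false in between.
		c.mu.Lock()
		again := c.pending >= 5
		if !again {
			c.running = false
		}
		c.mu.Unlock()
		if again {
			go c.compact()
		}
	}()

	c.mu.Lock()
	c.pending -= 5 // pretend we merged five files
	c.mu.Unlock()
	time.Sleep(10 * time.Millisecond)
}

func main() {
	c := &compactor{pending: 12}
	if c.shouldCompact() {
		c.compact()
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println("pending after compactions:", c.pending)
}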
@@ -538,6 +642,10 @@ func (e *Engine) filesToCompact() dataFiles {
for _, df := range e.files {
if time.Since(df.modTime) > e.CompactionAge && df.size < MaxDataFileSize {
a = append(a, df)
+} else if len(a) > 0 {
+// only compact contiguous ranges. If we hit the negative case and
+// there are files to compact, stop here
+break
}
}
return a
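Stopping at the first ineligible file means compaction only ever merges a contiguous run of the time-ordered file list, so the merged output cannot interleave with files that were skipped. A sketch of the same selection under those assumptions; the size cap and age cutoff here are placeholders, not the engine's real values:

package main

import (
	"fmt"
	"time"
)

type file struct {
	name    string
	size    uint32
	modTime time.Time
}

const maxFileSize = 1024 * 1024 * 1024 // stand-in for MaxDataFileSize

// filesToCompact keeps only a contiguous run: once a run has started,
// the first ineligible file ends the selection.
func filesToCompact(files []file, age time.Duration) []file {
	var a []file
	for _, f := range files {
		if time.Since(f.modTime) > age && f.size < maxFileSize {
			a = append(a, f)
		} else if len(a) > 0 {
			break // only compact contiguous ranges
		}
	}
	return a
}

func main() {
	old := time.Now().Add(-10 * time.Minute)
	files := []file{
		{"a", 10, old},
		{"b", 20, old},
		{"c", 30, time.Now()}, // too fresh: ends the run
		{"d", 40, old},        // eligible, but no longer contiguous
	}
	fmt.Println(len(filesToCompact(files, time.Minute))) // 2
}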
@@ -744,7 +852,8 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
minTime = v.MinTime()
}
if maxTime < v.MaxTime() {
-maxTime = v.MaxTime()
+// add 1 ns to the time since maxTime is exclusive
+maxTime = v.MaxTime() + 1
}
}
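File time ranges are half-open, [minTime, maxTime): the 1-nanosecond bump keeps a value whose timestamp equals the old maxTime inside the range. A tiny illustration:

package main

import "fmt"

// contains treats max as exclusive, matching the engine's file ranges.
func contains(min, max, ts int64) bool {
	return ts >= min && ts < max
}

func main() {
	var min, max int64 = 0, 0
	v := int64(20) // a value's MaxTime()
	if max < v {
		max = v + 1 // add 1 ns so the newest value stays inside [min, max)
	}
	fmt.Println(contains(min, max, v)) // true only because of the +1
}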
@@ -896,7 +1005,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
e.deletesPending.Add(1)
go func() {
if err := oldDF.Delete(); err != nil {
-// TODO: log this error
+fmt.Println("ERROR DELETING FROM REWRITE:", oldDF.f.Name())
}
e.deletesPending.Done()
}()
@@ -1312,7 +1421,7 @@ func (d *dataFile) indexPosition() uint32 {
func (d *dataFile) StartingPositionForID(id uint64) uint32 {
seriesCount := d.SeriesCount()
-indexStart := d.size - uint32(seriesCount*12+20)
+indexStart := d.indexPosition()
min := uint32(0)
max := uint32(seriesCount)
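StartingPositionForID now takes indexStart from indexPosition() instead of recomputing it from the file size. The seriesCount*12 arithmetic implies fixed 12-byte index entries, presumably an 8-byte series ID plus a 4-byte position, over which the function binary searches. A hedged sketch of such a search; the entry layout and byte order here are assumptions for illustration, not the file format spec:

package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed layout: sorted, fixed-width entries of an 8-byte series ID
// followed by a 4-byte starting position.
const indexEntrySize = 12

func startingPositionForID(index []byte, id uint64) (uint32, bool) {
	min, max := 0, len(index)/indexEntrySize
	for min < max {
		mid := (min + max) / 2
		off := mid * indexEntrySize
		entryID := binary.BigEndian.Uint64(index[off : off+8])
		switch {
		case entryID == id:
			return binary.BigEndian.Uint32(index[off+8 : off+12]), true
		case entryID < id:
			min = mid + 1
		default:
			max = mid
		}
	}
	return 0, false
}

func main() {
	index := make([]byte, 3*indexEntrySize)
	for i, id := range []uint64{7, 42, 99} {
		binary.BigEndian.PutUint64(index[i*indexEntrySize:], id)
		binary.BigEndian.PutUint32(index[i*indexEntrySize+8:], uint32(i*100))
	}
	fmt.Println(startingPositionForID(index, 42)) // 100 true
}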
@@ -1522,7 +1631,7 @@ func btou32(b []byte) uint32 {
func hashSeriesField(key string) uint64 {
h := fnv.New64a()
h.Write([]byte(key))
-return h.Sum64() % 100
+return h.Sum64()
}
// seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
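Dropping the "% 100" is the collision fix: the modulo squeezed every series-field key into one of only 100 IDs, so distinct series could hash identically and clobber each other's data. A small demonstration with hash/fnv (the example keys and the "#" separator are illustrative, not the engine's real key format):

package main

import (
	"fmt"
	"hash/fnv"
)

// hashSeriesField mirrors the fixed function: full 64-bit FNV-1a, no modulo.
func hashSeriesField(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

func main() {
	a := hashSeriesField("cpu,host=A#value")
	b := hashSeriesField("cpu,host=B#value")
	fmt.Println(a == b)       // false: IDs now use the full 64-bit space
	fmt.Println(a%100, b%100) // the old "% 100" kept only these buckets
}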

View File

@@ -363,6 +363,8 @@ func TestEngine_Compaction(t *testing.T) {
}
}
verify("cpu,host=A", []models.Point{p1, p3, p5, p7}, 0)
verify("cpu,host=B", []models.Point{p2, p4, p6, p8}, 0)
if err := e.Close(); err != nil {
t.Fatalf("error closing: %s", err.Error())
}

View File

@@ -270,12 +270,14 @@ func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.Measurem
defer l.cacheLock.Unlock()
// if we should check memory and we're over the threshold, mark a flush as running and kick one off in a goroutine
-if checkMemory && l.memorySize > l.MaxMemorySizeThreshold {
+if checkMemory && l.memorySize > l.FlushMemorySizeThreshold {
if !l.flushRunning {
l.flushRunning = true
go l.flush(memoryFlush)
}
-return false
+if l.memorySize > l.MaxMemorySizeThreshold {
+return false
+}
}
for _, p := range points {
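The WAL now reacts to two thresholds: crossing FlushMemorySizeThreshold kicks off one background flush but still accepts the write, and only MaxMemorySizeThreshold rejects writes outright. A compact sketch of that backpressure logic, with simplified locking and placeholder sizes:

package main

import (
	"fmt"
	"sync"
)

const (
	flushThreshold = 5 * 1024 * 1024   // start a background flush
	maxThreshold   = 100 * 1024 * 1024 // reject writes beyond this
)

type wal struct {
	mu           sync.Mutex
	memorySize   int
	flushRunning bool
}

// addToCache reports whether the write was accepted into the cache.
func (w *wal) addToCache(size int) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.memorySize > flushThreshold {
		if !w.flushRunning {
			w.flushRunning = true
			go w.flush() // async flush; the write itself still proceeds
		}
		if w.memorySize > maxThreshold {
			return false // hard limit: caller must back off until the flush lands
		}
	}
	w.memorySize += size
	return true
}

func (w *wal) flush() {
	w.mu.Lock()
	w.memorySize = 0
	w.flushRunning = false
	w.mu.Unlock()
}

func main() {
	w := &wal{memorySize: 6 * 1024 * 1024}
	fmt.Println(w.addToCache(1024)) // true: over flush threshold, under max
}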
@@ -401,7 +403,7 @@ func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
-if l.currentSegmentFile == nil {
+if l.currentSegmentFile == nil || l.currentSegmentSize > DefaultSegmentSize {
if err := l.newSegmentFile(); err != nil {
// fail hard since we can't write data
panic(fmt.Sprintf("error opening new segment file for wal: %s", err.Error()))
@@ -421,6 +423,8 @@ func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
panic(fmt.Sprintf("error writing data to wal: %s", err.Error()))
}
+l.currentSegmentSize += 5 + len(data)
return l.currentSegmentFile.Sync()
}
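The "5 + len(data)" accounting implies each WAL entry is framed by a 5-byte header, presumably a 1-byte entry type plus a 4-byte length, and the running segment size drives rotation. A sketch under those assumptions; the segment file naming and byte order are invented for the example:

package main

import (
	"encoding/binary"
	"fmt"
	"os"
	"path/filepath"
)

const segmentSize = 10 * 1024 * 1024 // stand-in for DefaultSegmentSize

type log struct {
	dir         string
	segmentN    int
	segment     *os.File
	segmentSize int
}

func (l *log) writeToLog(entryType byte, data []byte) error {
	// rotate when there is no segment yet or the current one is full
	if l.segment == nil || l.segmentSize > segmentSize {
		if err := l.newSegment(); err != nil {
			return err
		}
	}
	// frame: 1-byte entry type, 4-byte big-endian length, then the payload
	hdr := make([]byte, 5)
	hdr[0] = entryType
	binary.BigEndian.PutUint32(hdr[1:], uint32(len(data)))
	if _, err := l.segment.Write(append(hdr, data...)); err != nil {
		return err
	}
	l.segmentSize += 5 + len(data) // the rotation check uses this running total
	return l.segment.Sync()        // fsync so acknowledged writes survive a crash
}

func (l *log) newSegment() error {
	if l.segment != nil {
		l.segment.Close()
	}
	l.segmentN++
	f, err := os.Create(filepath.Join(l.dir, fmt.Sprintf("%05d.wal", l.segmentN)))
	if err != nil {
		return err
	}
	l.segment, l.segmentSize = f, 0
	return nil
}

func main() {
	l := &log{dir: os.TempDir()}
	if err := l.writeToLog(1, []byte("point data")); err != nil {
		fmt.Println(err)
	}
}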
@@ -489,6 +493,7 @@ func (l *Log) flush(flush flushType) error {
// only flush if there isn't one already running. Memory flushes are only triggered
// by writes, which will mark the flush as running, so we can ignore it.
+l.cacheLock.Lock()
if l.flushRunning && flush != memoryFlush {
l.cacheLock.Unlock()
return nil
@@ -523,19 +528,18 @@ func (l *Log) flush(flush flushType) error {
l.writeLock.Unlock()
// copy the cache items to new maps so we can empty them out
-l.flushCache = l.cache
-l.cache = make(map[string]Values)
+l.flushCache = make(map[string]Values)
for k, _ := range l.cacheDirtySort {
l.flushCache[k] = l.flushCache[k].Deduplicate()
}
l.cacheDirtySort = make(map[string]bool)
-valuesByKey := make(map[string]Values)
valueCount := 0
-for key, v := range l.flushCache {
-valuesByKey[key] = v
+for key, v := range l.cache {
+l.flushCache[key] = v
valueCount += len(v)
}
+l.cache = make(map[string]Values)
flushSize := l.memorySize
@@ -553,7 +557,7 @@ func (l *Log) flush(flush flushType) error {
l.cacheLock.Unlock()
// exit if there's nothing to flush to the index
-if len(valuesByKey) == 0 && len(mfc) == 0 && len(scc) == 0 {
+if len(l.flushCache) == 0 && len(mfc) == 0 && len(scc) == 0 {
return nil
}
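The flush path now snapshots the live cache into flushCache under the cache lock and installs a fresh map, so writers keep going while the snapshot is handed to the index writer, and the separate valuesByKey copy goes away. A minimal sketch of the swap, with placeholder types:

package main

import (
	"fmt"
	"sync"
)

type values []int64 // stand-in for the WAL's Values type

type wal struct {
	mu         sync.Mutex
	cache      map[string]values
	flushCache map[string]values
}

// snapshotForFlush swaps the live cache out under the lock; writers can
// fill the fresh map while the snapshot is written to the index.
func (w *wal) snapshotForFlush() map[string]values {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.flushCache = make(map[string]values, len(w.cache))
	for k, v := range w.cache {
		w.flushCache[k] = v
	}
	w.cache = make(map[string]values)
	return w.flushCache
}

func main() {
	w := &wal{cache: map[string]values{"cpu,host=A#value": {1, 2, 3}}}
	snap := w.snapshotForFlush()
	fmt.Println(len(snap), len(w.cache)) // 1 0
}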
@@ -564,11 +568,11 @@ func (l *Log) flush(flush flushType) error {
} else if flush == startupFlush {
ftype = "startup"
}
-l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, flushSize)
+l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(l.flushCache), valueCount, flushSize)
}
startTime := time.Now()
-if err := l.Index.Write(valuesByKey, mfc, scc); err != nil {
+if err := l.Index.Write(l.flushCache, mfc, scc); err != nil {
return err
}
if l.LoggingEnabled {