diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go
index 32186175ff..fe5a4b95b1 100644
--- a/tsdb/engine/wal/wal.go
+++ b/tsdb/engine/wal/wal.go
@@ -53,7 +53,7 @@ const (
 	// DefaultFlushColdInterval specifies how long after a partition has been cold
 	// for writes that a full flush and compaction are forced
-	DefaultFlushColdInterval = 30 * time.Minute
+	DefaultFlushColdInterval = 5 * time.Minute
 
 	// DefaultParititionSizeThreshold specifies when a partition gets to this size in
 	// memory, we should slow down writes until it gets a chance to compact.
@@ -74,6 +74,24 @@ const (
 	defaultFlushCheckInterval = time.Second
 )
 
+// flushType indicates why a flush and compaction are being run so the partition can
+// do the appropriate type of compaction
+type flushType int
+
+const (
+	// noFlush indicates that no flush or compaction are necessary at this time
+	noFlush flushType = iota
+	// memoryFlush indicates that we should look for the series using the most
+	// memory to flush out and compact all others
+	memoryFlush
+	// idleFlush indicates that we should flush all series in the partition,
+	// delete all segment files and hold off on opening a new one
+	idleFlush
+	// thresholdFlush indicates that we should flush all series over the ReadySize
+	// and compact all other series
+	thresholdFlush
+)
+
 var (
 	// ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one
 	ErrCompactionRunning = errors.New("compaction running")
@@ -108,9 +126,9 @@ type Log struct {
 	mu         sync.RWMutex
 	partitions map[uint8]*Partition
 
-	// FlushInterval is the period of time after which a partition will do a
+	// FlushColdInterval is the period of time after which a partition will do a
 	// full flush and compaction if it has been cold for writes.
-	FlushInterval time.Duration
+	FlushColdInterval time.Duration
 
 	// SegmentSize is the file size at which a segment file will be rotated in a partition.
 	SegmentSize int64
@@ -151,7 +169,7 @@ func NewLog(path string) *Log {
 		// these options should be overriden by any options in the config
 		LogOutput: os.Stderr,
 
-		FlushInterval:       DefaultFlushColdInterval,
+		FlushColdInterval:   DefaultFlushColdInterval,
 		SegmentSize:         DefaultSegmentSize,
 		MaxSeriesSize:       DefaultMaxSeriesSize,
 		CompactionThreshold: DefaultCompactionThreshold,
@@ -164,10 +182,14 @@ func NewLog(path string) *Log {
 
 // Open opens and initializes the Log. Will recover from previous unclosed shutdowns
 func (l *Log) Open() error {
+	if err := os.MkdirAll(l.path, 0777); err != nil {
+		return err
+	}
+
 	// open the partitions
 	l.partitions = make(map[uint8]*Partition)
 	for i := uint64(1); i <= l.partitionCount; i++ {
-		p, err := NewPartition(uint8(i), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.Index)
+		p, err := NewPartition(uint8(i), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
 		if err != nil {
 			return err
 		}
@@ -302,8 +324,8 @@ func (l *Log) triggerAutoFlush() {
 	l.mu.RLock()
 	defer l.mu.RUnlock()
 	for _, p := range l.partitions {
-		if p.shouldFlush(l.MaxSeriesSize, l.CompactionThreshold) {
-			if err := p.flushAndCompact(false); err != nil {
+		if f := p.shouldFlush(l.MaxSeriesSize, l.CompactionThreshold); f != noFlush {
+			if err := p.flushAndCompact(f); err != nil {
 				l.logger.Printf("error flushing partition %d: %s\n", p.id, err)
 			}
 		}
@@ -338,7 +360,7 @@ func (l *Log) partition(key []byte) *Partition {
 	id := uint8(h.Sum64()%l.partitionCount + 1)
 	p := l.partitions[id]
 	if p == nil {
-		if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.Index); err != nil {
+		if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index); err != nil {
 			panic(err)
 		} else {
@@ -358,7 +380,6 @@ type Partition struct {
 	currentSegmentSize int64
 	currentSegmentID   uint32
 	lastFileID         uint32
-	lastWrite          time.Time
 	maxSegmentSize     int64
 	cache              map[string][][]byte
@@ -379,20 +400,26 @@ type Partition struct {
 	cacheDirtySort    map[string]bool // will be true if the key needs to be sorted
 	cacheSizes        map[string]int
 	compactionRunning bool
+
+	// flushColdInterval and lastWriteTime are used to determine if a partition should
+	// be flushed because it has been idle for writes.
+	flushColdInterval time.Duration
+	lastWriteTime     time.Time
 }
 
-func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, index IndexWriter) (*Partition, error) {
+func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
 	return &Partition{
-		id:              id,
-		path:            path,
-		maxSegmentSize:  segmentSize,
-		sizeThreshold:   sizeThreshold,
-		lastWrite:       time.Now(),
-		cache:           make(map[string][][]byte),
-		cacheDirtySort:  make(map[string]bool),
-		cacheSizes:      make(map[string]int),
-		readySeriesSize: readySeriesSize,
-		index:           index,
+		id:                id,
+		path:              path,
+		maxSegmentSize:    segmentSize,
+		sizeThreshold:     sizeThreshold,
+		lastWriteTime:     time.Now(),
+		cache:             make(map[string][][]byte),
+		cacheDirtySort:    make(map[string]bool),
+		cacheSizes:        make(map[string]int),
+		readySeriesSize:   readySeriesSize,
+		index:             index,
+		flushColdInterval: flushColdInterval,
 	}, nil
 }
 
@@ -432,7 +459,7 @@ func (p *Partition) Write(points []tsdb.Point) error {
 		return 0, false
 	}(); ok {
-		go p.flushAndCompact(true)
+		go p.flushAndCompact(memoryFlush)
 		time.Sleep(backoff)
 	}
 	p.mu.Lock()
@@ -463,7 +490,7 @@ func (p *Partition) Write(points []tsdb.Point) error {
 	}
 	p.currentSegmentSize += int64(8 + len(b))
-	p.lastWrite = time.Now()
+	p.lastWriteTime = time.Now()
 
 	for _, pp := range points {
 		p.addToCache(pp.Key(), pp.Data(), pp.UnixNano())
@@ -515,37 +542,45 @@ func (p *Partition) fileIDFromName(name string) (uint32, error) {
 	return uint32(id), nil
 }
 
-// shouldFlush returns true if a partition should be flushed. The criteria are:
+// shouldFlush returns a flushType that indicates if a partition should be flushed and why. The criteria are:
 // maxSeriesSize - flush if any series in the partition has exceeded this size threshold
 // readySeriesSize - a series is ready to flush once it has this much data in it
 // compactionThreshold - a partition is ready to flush if this percentage of series has hit the readySeriesSize or greater
-func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) bool {
+func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) flushType {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 
+	if len(p.cache) == 0 {
+		return noFlush
+	}
+
 	if p.memorySize > p.sizeThreshold {
-		return true
+		return memoryFlush
+	}
+
+	if time.Since(p.lastWriteTime) > p.flushColdInterval {
+		return idleFlush
 	}
 
 	countReady := 0
 	for _, s := range p.cacheSizes {
 		if s > maxSeriesSize {
-			return true
+			return thresholdFlush
 		} else if s > p.readySeriesSize {
 			countReady += 1
 		}
 	}
 
 	if float64(countReady)/float64(len(p.cacheSizes)) > compactionThreshold {
-		return true
+		return thresholdFlush
 	}
 
-	return false
+	return noFlush
 }
 
 // prepareSeriesToFlush will empty the cache of series that are ready based on their size
 // and return information for the compaction process to use.
-func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bool) (*compactionInfo, error) {
+func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (*compactionInfo, error) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 
@@ -553,7 +588,7 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bo
 	// memory pressure, just return from here
 	if p.compactionRunning {
 		return nil, ErrCompactionRunning
-	} else if triggerByMemory && p.memorySize < p.sizeThreshold {
+	} else if flush == memoryFlush && p.memorySize < p.sizeThreshold {
 		return nil, ErrMemoryCompactionDone
 	}
 	p.compactionRunning = true
@@ -563,24 +598,49 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bo
 	// if we didn't come up with any
 	var seriesToFlush map[string][][]byte
 	var size int
-	for {
-		s, n := p.seriesToFlush(readySeriesSize)
-		if len(s) > 0 {
-			seriesToFlush = s
-			size += n
-			break
+
+	// if this flush is being triggered because the partition is idle, all series hit the threshold
+	if flush == idleFlush {
+		for _, s := range p.cacheSizes {
+			size += s
+		}
+		seriesToFlush = p.cache
+		p.cache = make(map[string][][]byte)
+		p.cacheDirtySort = make(map[string]bool)
+		p.cacheSizes = make(map[string]int)
+	} else { // only grab the series that hit the threshold
+		for {
+			s, n := p.seriesToFlush(readySeriesSize)
+			if len(s) > 0 {
+				seriesToFlush = s
+				size += n
+				break
+			}
+			// we didn't get any series to flush so cut the ready size in half
+			// and see if there are series that are ready at that level
+			readySeriesSize = readySeriesSize / 2
 		}
-		// we didn't get any series to flush so cut the ready size in half
-		// and see if there are series that are ready at that level
-		readySeriesSize = readySeriesSize / 2
 	}
 
 	c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size}
 
-	// roll over a new segment file so we can compact all the old ones
-	if err := p.newSegmentFile(); err != nil {
-		return nil, err
+	if flush == idleFlush {
+		// don't create a new segment file because this partition is idle
+		if p.currentSegmentFile != nil {
+			if err := p.currentSegmentFile.Close(); err != nil {
+				return nil, err
+			}
+		}
+		p.currentSegmentFile = nil
+		p.currentSegmentID += 1
+		p.currentSegmentSize = 0
+	} else {
+		// roll over a new segment file so we can compact all the old ones
+		if err := p.newSegmentFile(); err != nil {
+			return nil, err
+		}
 	}
+
 	p.flushCache = c.seriesToFlush
 	c.compactFilesLessThan = p.currentSegmentID
@@ -613,13 +673,15 @@ func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int
 
 // flushAndCompact will flush any series that are over their threshold and then read in all old segment files and
 // write the data that was not flushed to a new file
-func (p *Partition) flushAndCompact(triggerByMemory bool) error {
-	c, err := p.prepareSeriesToFlush(p.readySeriesSize, triggerByMemory)
+func (p *Partition) flushAndCompact(flush flushType) error {
+	c, err := p.prepareSeriesToFlush(p.readySeriesSize, flush)
 
 	if err == ErrCompactionRunning || err == ErrMemoryCompactionDone {
 		return nil
 	} else if err != nil {
 		return err
+	} else if len(c.seriesToFlush) == 0 { // nothing to flush!
+		return nil
 	}
 
 	// write the data to the index first
@@ -834,12 +896,14 @@ func (p *Partition) cursor(key string) *cursor {
 
 	// if we're in the middle of a flush, combine the previous cache
 	// with this one for the cursor
-	if fc, ok := p.flushCache[key]; ok {
-		c := make([][]byte, len(fc), len(fc)+len(cache))
-		copy(c, fc)
-		c = append(c, cache...)
-		sort.Sort(byteSlices(c))
-		return &cursor{cache: c, position: -1}
+	if p.flushCache != nil {
+		if fc, ok := p.flushCache[key]; ok {
+			c := make([][]byte, len(fc), len(fc)+len(cache))
+			copy(c, fc)
+			c = append(c, cache...)
+			sort.Sort(byteSlices(c))
+			return &cursor{cache: c, position: -1}
+		}
 	}
 
 	if p.cacheDirtySort[key] {
diff --git a/tsdb/engine/wal/wal_test.go b/tsdb/engine/wal/wal_test.go
index 1fe4901a1f..258450a775 100644
--- a/tsdb/engine/wal/wal_test.go
+++ b/tsdb/engine/wal/wal_test.go
@@ -333,7 +333,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
 			buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i))
 
 			// ensure that as a whole its not ready for flushing yet
-			if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) {
+			if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != noFlush {
 				t.Fatal("expected partition 1 to return false from shouldFlush")
 			}
 		}
@@ -349,15 +349,15 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
 	c := log.Cursor("cpu,host=A,region=uswest23")
 	k, v := c.Next()
 	if btou64(k) != 1 {
-		fmt.Println("expected data ", k, v)
+		t.Fatalf("expected timestamp of 1, but got %v %v", k, v)
 	}
 
 	// ensure it is marked as should flush because of the threshold
-	if !log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) {
+	if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != thresholdFlush {
 		t.Fatal("expected partition 1 to return true from shouldFlush")
 	}
 
-	if err := log.partitions[1].flushAndCompact(false); err != nil {
+	if err := log.partitions[1].flushAndCompact(thresholdFlush); err != nil {
 		t.Fatalf("error flushing and compacting: %s", err.Error())
 	}
 
@@ -395,6 +395,75 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
 	}
 }
 
+// Ensure the WAL forces a full flush after a partition has gone a given interval of time without a write
+func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
+	log := openTestWAL()
+	log.partitionCount = 1
+
+	// set these low so the idle flush triggers quickly
+	log.flushCheckInterval = 10 * time.Millisecond
+	log.FlushColdInterval = 500 * time.Millisecond
+
+	defer log.Close()
+	defer os.RemoveAll(log.path)
+
+	points := make([]map[string][][]byte, 0)
+	log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error {
+		points = append(points, pointsByKey)
+		return nil
+	}}
+
+	if err := log.Open(); err != nil {
+		t.Fatalf("couldn't open wal: %s", err.Error())
+	}
+
+	codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
+		"value": {
+			ID:   uint8(1),
+			Name: "value",
+			Type: influxql.Float,
+		},
+	})
+
+	numSeries := 100
+	b := make([]byte, 70*5000)
+	for i := 1; i <= 10; i++ {
+		buf := bytes.NewBuffer(b)
+		for j := 1; j <= numSeries; j++ {
+			buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
+		}
+
+		// write the batch out
+		if err := log.WritePoints(parsePoints(buf.String(), codec)); err != nil {
+			t.Fatalf("failed to write points: %s", err.Error())
+		}
+		buf = bytes.NewBuffer(b)
+	}
+
+	// ensure we have some data
+	c := log.Cursor("cpu,host=A,region=uswest10")
+	k, _ := c.Next()
+	if btou64(k) != 1 {
+		t.Fatalf("expected first data point but got one with key: %v", k)
+	}
+
+	time.Sleep(700 * time.Millisecond)
+
+	// ensure the idle flush has already happened, so the partition isn't ready to flush again
+	if f := log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold); f != noFlush {
+		t.Fatalf("expected partition 1 to return noFlush from shouldFlush, got %v", f)
+	}
+
+	// ensure that the partition is empty
+	if log.partitions[1].memorySize != 0 || len(log.partitions[1].cache) != 0 {
+		t.Fatal("expected partition to be empty")
+	}
+	// ensure that we didn't bother to open a new segment file
+	if log.partitions[1].currentSegmentFile != nil {
+		t.Fatal("expected partition to not have an open segment file")
+	}
+}
+
 // test that partitions get compacted and flushed when number of series hits compaction threshold
 // test that partitions get compacted and flushed when a single series hits the compaction threshold
 // test that writes slow down when the partition size threshold is hit
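
For readers following the new control flow outside the diff, the sketch below models the flush decision this patch introduces: memory pressure is checked first, then idleness, then the per-series and percentage thresholds. It is a minimal, self-contained illustration, not the engine's code; the `partition` struct here is a hypothetical stand-in carrying only the fields the decision reads, and the values in `main` are made up for the example.

package main

import (
	"fmt"
	"time"
)

type flushType int

const (
	noFlush flushType = iota
	memoryFlush
	idleFlush
	thresholdFlush
)

// partition carries only the state the flush decision reads (a trimmed,
// hypothetical stand-in for the engine's Partition type).
type partition struct {
	cacheSizes        map[string]int // bytes buffered per series key
	memorySize        uint64         // total bytes buffered in the partition
	sizeThreshold     uint64         // memory pressure trips memoryFlush above this
	readySeriesSize   int            // a series this large counts as "ready"
	lastWriteTime     time.Time
	flushColdInterval time.Duration
}

// shouldFlush mirrors the decision order in the patch: memory pressure first,
// then idleness, then the per-series and percentage thresholds.
func (p *partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) flushType {
	if len(p.cacheSizes) == 0 {
		return noFlush
	}
	if p.memorySize > p.sizeThreshold {
		return memoryFlush
	}
	if time.Since(p.lastWriteTime) > p.flushColdInterval {
		return idleFlush
	}
	countReady := 0
	for _, s := range p.cacheSizes {
		if s > maxSeriesSize {
			return thresholdFlush
		} else if s > p.readySeriesSize {
			countReady++
		}
	}
	if float64(countReady)/float64(len(p.cacheSizes)) > compactionThreshold {
		return thresholdFlush
	}
	return noFlush
}

func main() {
	p := &partition{
		cacheSizes:        map[string]int{"cpu,host=A": 40 * 1024, "mem,host=A": 100},
		memorySize:        40*1024 + 100,
		sizeThreshold:     20 * 1024 * 1024,
		readySeriesSize:   30 * 1024,
		lastWriteTime:     time.Now().Add(-10 * time.Minute),
		flushColdInterval: 5 * time.Minute,
	}
	// Cold for ten minutes against a five-minute cold interval: the idle check
	// wins even though one series is past the ready size.
	fmt.Println(p.shouldFlush(1024*1024, 0.5) == idleFlush) // true
}

The ordering matters: because idleness is checked before the per-series thresholds, an idle partition always takes the idleFlush path, which in the patch empties the whole cache and closes the current segment file rather than rolling over a new one, so a cold partition holds no open segment file until the next write arrives.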