Make WAL flush after inactive for writes for a given interval.

pull/3717/head
Paul Dix 2015-08-07 18:48:09 -04:00
parent d4b04510ab
commit 301b014f3f
2 changed files with 187 additions and 54 deletions

View File

@ -53,7 +53,7 @@ const (
// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 30 * time.Minute
DefaultFlushColdInterval = 5 * time.Minute
// DefaultParititionSizeThreshold specifies when a partition gets to this size in
// memory, we should slow down writes until it gets a chance to compact.
@ -74,6 +74,24 @@ const (
defaultFlushCheckInterval = time.Second
)
// flushType indiciates why a flush and compaction are being run so the partition can
// do the appropriate type of compaction
type flushType int
const (
// noFlush indicates that no flush or compaction are necesssary at this time
noFlush flushType = iota
// memoryFlush indicates that we should look for the series using the most
// memory to flush out and compact all others
memoryFlush
// idleFlush indicates that we should flush all series in the parition,
// delete all segment files and hold off on opening a new one
idleFlush
// thresholdFlush indicates that we should flush all series over the ReadySize
// and compact all other series
thresholdFlush
)
var (
// ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one
ErrCompactionRunning = errors.New("compaction running")
@ -108,9 +126,9 @@ type Log struct {
mu sync.RWMutex
partitions map[uint8]*Partition
// FlushInterval is the period of time after which a partition will do a
// FlushColdInterval is the period of time after which a partition will do a
// full flush and compaction if it has been cold for writes.
FlushInterval time.Duration
FlushColdInterval time.Duration
// SegmentSize is the file size at which a segment file will be rotated in a partition.
SegmentSize int64
@ -151,7 +169,7 @@ func NewLog(path string) *Log {
// these options should be overriden by any options in the config
LogOutput: os.Stderr,
FlushInterval: DefaultFlushColdInterval,
FlushColdInterval: DefaultFlushColdInterval,
SegmentSize: DefaultSegmentSize,
MaxSeriesSize: DefaultMaxSeriesSize,
CompactionThreshold: DefaultCompactionThreshold,
@ -164,10 +182,14 @@ func NewLog(path string) *Log {
// Open opens and initializes the Log. Will recover from previous unclosed shutdowns
func (l *Log) Open() error {
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
}
// open the partitions
l.partitions = make(map[uint8]*Partition)
for i := uint64(1); i <= l.partitionCount; i++ {
p, err := NewPartition(uint8(i), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.Index)
p, err := NewPartition(uint8(i), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
if err != nil {
return err
}
@ -302,8 +324,8 @@ func (l *Log) triggerAutoFlush() {
l.mu.RLock()
defer l.mu.RUnlock()
for _, p := range l.partitions {
if p.shouldFlush(l.MaxSeriesSize, l.CompactionThreshold) {
if err := p.flushAndCompact(false); err != nil {
if f := p.shouldFlush(l.MaxSeriesSize, l.CompactionThreshold); f != noFlush {
if err := p.flushAndCompact(f); err != nil {
l.logger.Printf("error flushing partition %d: %s\n", p.id, err)
}
}
@ -338,7 +360,7 @@ func (l *Log) partition(key []byte) *Partition {
id := uint8(h.Sum64()%l.partitionCount + 1)
p := l.partitions[id]
if p == nil {
if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.Index); err != nil {
if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index); err != nil {
panic(err)
} else {
@ -358,7 +380,6 @@ type Partition struct {
currentSegmentSize int64
currentSegmentID uint32
lastFileID uint32
lastWrite time.Time
maxSegmentSize int64
cache map[string][][]byte
@ -379,20 +400,26 @@ type Partition struct {
cacheDirtySort map[string]bool // will be true if the key needs to be sorted
cacheSizes map[string]int
compactionRunning bool
// flushColdInterval and lastWriteTime are used to determin if a partition should
// be flushed because it has been idle for writes.
flushColdInterval time.Duration
lastWriteTime time.Time
}
func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, index IndexWriter) (*Partition, error) {
func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
return &Partition{
id: id,
path: path,
maxSegmentSize: segmentSize,
sizeThreshold: sizeThreshold,
lastWrite: time.Now(),
lastWriteTime: time.Now(),
cache: make(map[string][][]byte),
cacheDirtySort: make(map[string]bool),
cacheSizes: make(map[string]int),
readySeriesSize: readySeriesSize,
index: index,
flushColdInterval: flushColdInterval,
}, nil
}
@ -432,7 +459,7 @@ func (p *Partition) Write(points []tsdb.Point) error {
return 0, false
}(); ok {
go p.flushAndCompact(true)
go p.flushAndCompact(memoryFlush)
time.Sleep(backoff)
}
p.mu.Lock()
@ -463,7 +490,7 @@ func (p *Partition) Write(points []tsdb.Point) error {
}
p.currentSegmentSize += int64(8 + len(b))
p.lastWrite = time.Now()
p.lastWriteTime = time.Now()
for _, pp := range points {
p.addToCache(pp.Key(), pp.Data(), pp.UnixNano())
@ -515,37 +542,45 @@ func (p *Partition) fileIDFromName(name string) (uint32, error) {
return uint32(id), nil
}
// shouldFlush returns true if a partition should be flushed. The criteria are:
// shouldFlush returns a flushType that indicates if a partition should be flushed and why. The criteria are:
// maxSeriesSize - flush if any series in the partition has exceeded this size threshold
// readySeriesSize - a series is ready to flush once it has this much data in it
// compactionThreshold - a partition is ready to flush if this percentage of series has hit the readySeriesSize or greater
func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) bool {
func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) flushType {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.cache) == 0 {
return noFlush
}
if p.memorySize > p.sizeThreshold {
return true
return memoryFlush
}
if time.Since(p.lastWriteTime) > p.flushColdInterval {
return idleFlush
}
countReady := 0
for _, s := range p.cacheSizes {
if s > maxSeriesSize {
return true
return thresholdFlush
} else if s > p.readySeriesSize {
countReady += 1
}
}
if float64(countReady)/float64(len(p.cacheSizes)) > compactionThreshold {
return true
return thresholdFlush
}
return false
return noFlush
}
// prepareSeriesToFlush will empty the cache of series that are ready based on their size
// and return information for the compaction process to use.
func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bool) (*compactionInfo, error) {
func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (*compactionInfo, error) {
p.mu.Lock()
defer p.mu.Unlock()
@ -553,7 +588,7 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bo
// memory pressure, just return from here
if p.compactionRunning {
return nil, ErrCompactionRunning
} else if triggerByMemory && p.memorySize < p.sizeThreshold {
} else if flush == memoryFlush && p.memorySize < p.sizeThreshold {
return nil, ErrMemoryCompactionDone
}
p.compactionRunning = true
@ -563,6 +598,17 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bo
// if we didn't come up with any
var seriesToFlush map[string][][]byte
var size int
// if this flush is being triggered because the partition is idle, all series hit the threshold
if flush == idleFlush {
for _, s := range p.cacheSizes {
size += s
}
seriesToFlush = p.cache
p.cache = make(map[string][][]byte)
p.cacheDirtySort = make(map[string]bool)
p.cacheSizes = make(map[string]int)
} else { // only grab the series that hit the thresold
for {
s, n := p.seriesToFlush(readySeriesSize)
if len(s) > 0 {
@ -574,13 +620,27 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, triggerByMemory bo
// and see if there are series that are ready at that level
readySeriesSize = readySeriesSize / 2
}
}
c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size}
if flush == idleFlush {
// don't create a new segment file because this partition is idle
if p.currentSegmentFile != nil {
if err := p.currentSegmentFile.Close(); err != nil {
return nil, err
}
}
p.currentSegmentFile = nil
p.currentSegmentID += 1
p.currentSegmentSize = 0
} else {
// roll over a new segment file so we can compact all the old ones
if err := p.newSegmentFile(); err != nil {
return nil, err
}
}
p.flushCache = c.seriesToFlush
c.compactFilesLessThan = p.currentSegmentID
@ -613,13 +673,15 @@ func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int
// flushAndCompact will flush any series that are over their threshold and then read in all old segment files and
// write the data that was not flushed to a new file
func (p *Partition) flushAndCompact(triggerByMemory bool) error {
c, err := p.prepareSeriesToFlush(p.readySeriesSize, triggerByMemory)
func (p *Partition) flushAndCompact(flush flushType) error {
c, err := p.prepareSeriesToFlush(p.readySeriesSize, flush)
if err == ErrCompactionRunning || err == ErrMemoryCompactionDone {
return nil
} else if err != nil {
return err
} else if len(c.seriesToFlush) == 0 { // nothing to flush!
return nil
}
// write the data to the index first
@ -834,6 +896,7 @@ func (p *Partition) cursor(key string) *cursor {
// if we're in the middle of a flush, combine the previous cache
// with this one for the cursor
if p.flushCache != nil {
if fc, ok := p.flushCache[key]; ok {
c := make([][]byte, len(fc), len(fc)+len(cache))
copy(c, fc)
@ -841,6 +904,7 @@ func (p *Partition) cursor(key string) *cursor {
sort.Sort(byteSlices(c))
return &cursor{cache: c, position: -1}
}
}
if p.cacheDirtySort[key] {
sort.Sort(byteSlices(cache))

View File

@ -333,7 +333,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i))
// ensure that as a whole its not ready for flushing yet
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) {
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != noFlush {
t.Fatal("expected partition 1 to return false from shouldFlush")
}
}
@ -349,15 +349,15 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
c := log.Cursor("cpu,host=A,region=uswest23")
k, v := c.Next()
if btou64(k) != 1 {
fmt.Println("expected data ", k, v)
t.Fatalf("expected timestamp of 1, but got %v %v", k, v)
}
// ensure it is marked as should flush because of the threshold
if !log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) {
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != thresholdFlush {
t.Fatal("expected partition 1 to return true from shouldFlush")
}
if err := log.partitions[1].flushAndCompact(false); err != nil {
if err := log.partitions[1].flushAndCompact(thresholdFlush); err != nil {
t.Fatalf("error flushing and compacting: %s", err.Error())
}
@ -395,6 +395,75 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
}
}
// Ensure the wal forces a full flush after not having a write in a given interval of time
func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
// set this low
log.flushCheckInterval = 10 * time.Millisecond
log.FlushColdInterval = 500 * time.Millisecond
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
numSeries := 100
b := make([]byte, 70*5000)
for i := 1; i <= 10; i++ {
buf := bytes.NewBuffer(b)
for j := 1; j <= numSeries; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
// write the batch out
if err := log.WritePoints(parsePoints(buf.String(), codec)); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
buf = bytes.NewBuffer(b)
}
// ensure we have some data
c := log.Cursor("cpu,host=A,region=uswest10")
k, _ := c.Next()
if btou64(k) != 1 {
t.Fatalf("expected first data point but got one with key: %v", k)
}
time.Sleep(700 * time.Millisecond)
// ensure that as a whole its not ready for flushing yet
if f := log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold); f != noFlush {
t.Fatalf("expected partition 1 to return noFlush from shouldFlush %v", f)
}
// ensure that the partition is empty
if log.partitions[1].memorySize != 0 || len(log.partitions[1].cache) != 0 {
t.Fatal("expected partition to be empty")
}
// ensure that we didn't bother to open a new segment file
if log.partitions[1].currentSegmentFile != nil {
t.Fatal("expected partition to not have an open segment file")
}
}
// test that partitions get compacted and flushed when number of series hits compaction threshold
// test that partitions get compacted and flushed when a single series hits the compaction threshold
// test that writes slow down when the partition size threshold is hit