From 7dc5327a0a80c173f2dc67a3f607d7d836132e1f Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 11 Dec 2017 17:01:32 -0700
Subject: [PATCH 1/4] Adjust snapshot concurrency by latency

This changes the approach for adjusting the amount of concurrency used
when snapshotting: it is now based on snapshot latency rather than
cardinality.

The cardinality approach could use too much concurrency and increase the
number of level 1 TSM files too quickly, which incurs more disk IO.

The latency model seems to adjust better to different workloads.
---
 tsdb/engine/tsm1/compact.go      | 75 +++++++++++++++++---------------
 tsdb/engine/tsm1/compact_test.go | 10 ++++-
 tsdb/engine/tsm1/engine.go       |  2 +-
 3 files changed, 50 insertions(+), 37 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 57d938a993..192fc25b62 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -251,21 +251,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 		}
 	}
 
-	// Determine the minimum number of files required for the level. Higher levels are more
-	// CPU intensive so we only want to include them when we have enough data to make them
-	// worthwhile.
-	// minGenerations 1 -> 2
-	// minGenerations 2 -> 2
-	// minGenerations 3 -> 4
-	// minGenerations 4 -> 4
-	minGenerations := level
-	if minGenerations%2 != 0 {
-		minGenerations = level + 1
-	}
-
+	minGenerations := 4
 	var cGroups []CompactionGroup
 	for _, group := range levelGroups {
-		for _, chunk := range group.chunk(4) {
+		for _, chunk := range group.chunk(minGenerations) {
 			var cGroup CompactionGroup
 			var hasTombstones bool
 			for _, gen := range chunk {
@@ -676,6 +665,12 @@ type Compactor struct {
 	snapshotsEnabled   bool
 	compactionsEnabled bool
 
+	// lastSnapshotDuration is the amount of time the last snapshot took to complete.
+	lastSnapshotDuration time.Duration
+
+	// snapshotConcurrency is the amount of parallelism used to snapshot the cache.
+	snapshotConcurrency int
+
 	// The channel to signal that any in progress snapshots should be aborted.
 	snapshotsInterrupt chan struct{}
 	// The channel to signal that any in progress level compactions should be aborted.
@@ -696,6 +691,7 @@ func (c *Compactor) Open() {
 	c.compactionsEnabled = true
 	c.snapshotsInterrupt = make(chan struct{})
 	c.compactionsInterrupt = make(chan struct{})
+	c.snapshotConcurrency = 1
 
 	c.files = make(map[string]struct{})
 }
@@ -764,32 +760,14 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.snapshotsEnabled
 	intC := c.snapshotsInterrupt
+	concurrency := c.snapshotConcurrency
 	c.mu.RUnlock()
 
 	if !enabled {
 		return nil, errSnapshotsDisabled
 	}
 
-	card := cache.Count()
-
-	concurrency, maxConcurrency := 1, runtime.GOMAXPROCS(0)/2
-	if maxConcurrency < 1 {
-		maxConcurrency = 1
-	}
-	if maxConcurrency > 4 {
-		maxConcurrency = 4
-	}
-
-	concurrency = 1
-	if card >= 3*1024*1024 {
-		concurrency = 4
-	} else if card >= 1024*1024 {
-		concurrency = 2
-	}
-
-	if concurrency > maxConcurrency {
-		concurrency = maxConcurrency
-	}
+	start := time.Now()
 
 	splits := cache.Split(concurrency)
@@ -818,10 +796,37 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 		files = append(files, result.files...)
} + dur := time.Since(start).Truncate(time.Second) + maxConcurrency := runtime.GOMAXPROCS(0) / 2 + if maxConcurrency < 1 { + maxConcurrency = 1 + } + if maxConcurrency > 4 { + maxConcurrency = 4 + } + + c.mu.Lock() + // See if we were disabled while writing a snapshot - c.mu.RLock() enabled = c.snapshotsEnabled - c.mu.RUnlock() + + // See if we need to adjust our snapshot concurrency + if dur > 30*time.Second && dur > c.lastSnapshotDuration { + // Increase snapshot concurrency if they are running slow + c.snapshotConcurrency++ + if c.snapshotConcurrency > maxConcurrency { + c.snapshotConcurrency = maxConcurrency + } + } else if dur < 30*time.Second && dur < c.lastSnapshotDuration { + // Decrease snapshot concurrency if they are running too fast + c.snapshotConcurrency-- + if c.snapshotConcurrency < 1 { + c.snapshotConcurrency = 1 + } + } + + c.lastSnapshotDuration = dur + c.mu.Unlock() if !enabled { return nil, errSnapshotsDisabled diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 69e5b0c392..59221315d9 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1461,6 +1461,14 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { Path: "06-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "07-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "08-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1471,7 +1479,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles := []tsm1.FileStat{data[4], data[5]} + expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index d77dc3b99a..971236fbbe 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1694,7 +1694,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compact(quit <-chan struct{}) { - t := time.NewTicker(5 * time.Second) + t := time.NewTicker(10 * time.Second) defer t.Stop() for { From 6e3602c9376ad04fad8c7215d76751be959898ee Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 12 Dec 2017 09:59:39 -0700 Subject: [PATCH 2/4] Revert "Increase cache-snapshot-memory-size default" This reverts commit 171b427a1b39ecb3ecbbbe65b7819b5e5e202375. --- etc/config.sample.toml | 2 +- tsdb/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 5f9a4e8f78..c56eb1ca63 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -79,7 +79,7 @@ # snapshot the cache and write it to a TSM file, freeing up memory # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k). # Values without a size suffix are in bytes. 
-  # cache-snapshot-memory-size = "256m"
+  # cache-snapshot-memory-size = "25m"
 
   # CacheSnapshotWriteColdDuration is the length of time at
   # which the engine will snapshot the cache and write it to
diff --git a/tsdb/config.go b/tsdb/config.go
index 5041a44cae..6ab91feaad 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -26,7 +26,7 @@ const (
 
 	// DefaultCacheSnapshotMemorySize is the size at which the engine will
 	// snapshot the cache and write it to a TSM file, freeing up memory
-	DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
+	DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
 
 	// DefaultCacheSnapshotWriteColdDuration is the length of time at which
 	// the engine will snapshot the cache and write it to a new TSM file if

From 749c9d2483c5ad42bf6ebad52b2ea45e7745eecf Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Tue, 12 Dec 2017 12:16:16 -0700
Subject: [PATCH 3/4] Rate limit disk IO when writing TSM files

This limits the disk IO for writing TSM files during compactions
and snapshots. This helps reduce the spiky IO patterns that occur on
SSDs and when compactions run very quickly.
---
 Godeps                      |  1 +
 pkg/limiter/write_test.go   | 34 +++++++++++++++
 pkg/limiter/writer.go       | 83 +++++++++++++++++++++++++++++++++++++
 tsdb/engine.go              |  3 +-
 tsdb/engine/tsm1/compact.go | 14 ++++++-
 tsdb/engine/tsm1/engine.go  |  1 +
 tsdb/engine/tsm1/writer.go  | 61 +++++++++++++++++++++++++--
 tsdb/store.go               |  7 ++++
 8 files changed, 198 insertions(+), 6 deletions(-)
 create mode 100644 pkg/limiter/write_test.go
 create mode 100644 pkg/limiter/writer.go

diff --git a/Godeps b/Godeps
index ac7a0fc8dc..33f3e2f83a 100644
--- a/Godeps
+++ b/Godeps
@@ -37,3 +37,4 @@ golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
 golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811
 golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
 golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
+golang.org/x/time 6dc17368e09b0e8634d71cac8168d853e869a0c7
diff --git a/pkg/limiter/write_test.go b/pkg/limiter/write_test.go
new file mode 100644
index 0000000000..094e15d067
--- /dev/null
+++ b/pkg/limiter/write_test.go
@@ -0,0 +1,34 @@
+package limiter_test
+
+import (
+	"bytes"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/influxdata/influxdb/pkg/limiter"
+)
+
+func TestWriter_Limited(t *testing.T) {
+	r := bytes.NewReader(bytes.Repeat([]byte{0}, 1024*1024))
+
+	limit := 512 * 1024
+	w := limiter.NewWriter(discardCloser{}, limit, 10*1024*1024)
+
+	start := time.Now()
+	n, err := io.Copy(w, r)
+	elapsed := time.Since(start)
+	if err != nil {
+		t.Error("copy error: ", err)
+	}
+
+	rate := float64(n) / elapsed.Seconds()
+	if rate > float64(limit) {
+		t.Errorf("rate limit mismatch: exp %f, got %f", float64(limit), rate)
+	}
+}
+
+type discardCloser struct{}
+
+func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil }
+func (d discardCloser) Close() error                { return nil }
diff --git a/pkg/limiter/writer.go b/pkg/limiter/writer.go
new file mode 100644
index 0000000000..f14981a5cc
--- /dev/null
+++ b/pkg/limiter/writer.go
@@ -0,0 +1,83 @@
+package limiter
+
+import (
+	"context"
+	"io"
+	"os"
+	"time"
+
+	"golang.org/x/time/rate"
+)
+
+type Writer struct {
+	w       io.WriteCloser
+	limiter Rate
+	ctx     context.Context
+}
+
+type Rate interface {
+	WaitN(ctx context.Context, n int) error
+}
+
+func NewRate(bytesPerSec, burstLimit int) Rate {
+	limiter := rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
+	limiter.AllowN(time.Now(), burstLimit) // spend initial burst
+	return limiter
+}
+
+// NewWriter returns a writer that implements io.Writer with rate limiting.
+// The limiter uses a token bucket approach and limits the rate to bytesPerSec
+// with a maximum burst of burstLimit.
+func NewWriter(w io.WriteCloser, bytesPerSec, burstLimit int) *Writer {
+	limiter := NewRate(bytesPerSec, burstLimit)
+
+	return &Writer{
+		w:       w,
+		ctx:     context.Background(),
+		limiter: limiter,
+	}
+}
+
+// NewWriterWithRate returns a Writer with the specified rate limiter.
+func NewWriterWithRate(w io.WriteCloser, limiter Rate) *Writer {
+	return &Writer{
+		w:       w,
+		ctx:     context.Background(),
+		limiter: limiter,
+	}
+}
+
+// Write writes bytes from b.
+func (s *Writer) Write(b []byte) (int, error) {
+	if s.limiter == nil {
+		return s.w.Write(b)
+	}
+
+	n, err := s.w.Write(b)
+	if err != nil {
+		return n, err
+	}
+
+	if err := s.limiter.WaitN(s.ctx, n); err != nil {
+		return n, err
+	}
+	return n, err
+}
+
+func (s *Writer) Sync() error {
+	if f, ok := s.w.(*os.File); ok {
+		return f.Sync()
+	}
+	return nil
+}
+
+func (s *Writer) Name() string {
+	if f, ok := s.w.(*os.File); ok {
+		return f.Name()
+	}
+	return ""
+}
+
+func (s *Writer) Close() error {
+	return s.w.Close()
+}
diff --git a/tsdb/engine.go b/tsdb/engine.go
index 73c74fa63b..c935d4febc 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -156,7 +156,8 @@ type EngineOptions struct {
 	ShardID    uint64
 	InmemIndex interface{} // shared in-memory index
 
-	CompactionLimiter limiter.Fixed
+	CompactionLimiter           limiter.Fixed
+	CompactionThroughputLimiter limiter.Rate
 
 	Config Config
 }
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 192fc25b62..bb3ee07c44 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -15,6 +15,7 @@ package tsm1
 import (
 	"bytes"
 	"fmt"
+	"io"
 	"math"
 	"os"
 	"path/filepath"
@@ -24,6 +25,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/influxdata/influxdb/pkg/limiter"
 	"github.com/influxdata/influxdb/tsdb"
 )
@@ -661,6 +663,9 @@ type Compactor struct {
 		TSMReader(path string) *TSMReader
 	}
 
+	// RateLimit is the limit for disk writes for all concurrent compactions.
+	RateLimit limiter.Rate
+
 	mu                 sync.RWMutex
 	snapshotsEnabled   bool
 	compactionsEnabled bool
@@ -1033,15 +1038,20 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
 	// Create the writer for the new TSM file.
 	var w TSMWriter
 
+	var limitWriter io.Writer = fd
+	if c.RateLimit != nil {
+		limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit)
+	}
+
 	// Use a disk based TSM buffer if it looks like we might create a big index
 	// in memory.
 	if iter.EstimatedIndexSize() > 64*1024*1024 {
-		w, err = NewTSMWriterWithDiskBuffer(fd)
+		w, err = NewTSMWriterWithDiskBuffer(limitWriter)
 		if err != nil {
 			return err
 		}
 	} else {
-		w, err = NewTSMWriter(fd)
+		w, err = NewTSMWriter(limitWriter)
 		if err != nil {
 			return err
 		}
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 971236fbbe..fbc0de5fbf 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -190,6 +190,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
 	c := &Compactor{
 		Dir:       path,
 		FileStore: fs,
+		RateLimit: opt.CompactionThroughputLimiter,
 	}
 
 	logger := zap.NewNop()
diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go
index eda8985f2e..a82f22079e 100644
--- a/tsdb/engine/tsm1/writer.go
+++ b/tsdb/engine/tsm1/writer.go
@@ -100,7 +100,7 @@ const (
 	// The threshold amount of data written before we periodically fsync a TSM file. This helps avoid
 	// long pauses due to very large fsyncs at the end of writing a TSM file.
- fsyncEvery = 512 * 1024 * 1024 + fsyncEvery = 25 * 1024 * 1024 ) var ( @@ -252,6 +252,11 @@ type indexBlock struct { entries *indexEntries } +type syncer interface { + Name() string + Sync() error +} + // directIndex is a simple in-memory index implementation for a TSM file. The full index // must fit in memory. type directIndex struct { @@ -263,6 +268,8 @@ type directIndex struct { fd *os.File buf *bytes.Buffer + f syncer + w *bufio.Writer key []byte @@ -367,6 +374,48 @@ func (d *directIndex) KeyCount() int { return d.keyCount } +// copyBuffer is the actual implementation of Copy and CopyBuffer. +// if buf is nil, one is allocated. This is copied from the Go stdlib +// in order to remove the fast path WriteTo calls which circumvent any +// IO throttling as well as to add periodic fsyncs to avoid long stalls. +func copyBuffer(f syncer, dst io.Writer, src io.Reader, buf []byte) (written int64, err error) { + if buf == nil { + buf = make([]byte, 32*1024) + } + var lastSync int64 + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + + if written-lastSync > fsyncEvery { + if err := f.Sync(); err != nil { + return 0, err + } + lastSync = written + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + return written, err +} + func (d *directIndex) WriteTo(w io.Writer) (int64, error) { if _, err := d.flush(d.w); err != nil { return 0, err @@ -377,7 +426,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) { } if d.fd == nil { - return io.Copy(w, d.buf) + return copyBuffer(d.f, w, d.buf, nil) } if _, err := d.fd.Seek(0, io.SeekStart); err != nil { @@ -518,7 +567,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) { func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) { var index IndexWriter // Make sure is a File so we can write the temp index alongside it. - if fw, ok := w.(*os.File); ok { + if fw, ok := w.(syncer); ok { f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return nil, err @@ -664,6 +713,12 @@ func (t *tsmWriter) WriteIndex() error { return ErrNoValues } + // Set the destination file on the index so we can periodically + // fsync while writing the index. + if f, ok := t.wrapped.(syncer); ok { + t.index.(*directIndex).f = f + } + // Write the index if _, err := t.index.WriteTo(t.w); err != nil { return err diff --git a/tsdb/store.go b/tsdb/store.go index a26e4e8fcf..1a61b05d19 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -187,6 +187,13 @@ func (s *Store) loadShards() error { s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim) + // Env var to disable throughput limiter. This will be moved to a config option in 1.5. + if os.Getenv("INFLUXDB_DATA_COMPACTION_THROUGHPUT") == "" { + s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(48*1024*1024, 48*1024*1024) + } else { + s.Logger.Info("Compaction throughput limit disabled") + } + t := limiter.NewFixed(runtime.GOMAXPROCS(0)) resC := make(chan *res) var n int From 2d85ff1d09c13a666ed3bd04b2197ea98655d845 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 13 Dec 2017 08:20:35 -0700 Subject: [PATCH 4/4] Adjust compaction planning Increase level 1 min criteria, fix only fast compactions getting run, and fix very large generations getting included in optimize plans. 
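
As a rough illustration of the snapshot throttling this patch settles on: a
small ring buffer of recent snapshot durations replaces the adaptive
concurrency counter, and WriteSnapshot only throttles disk writes while
cardinality is low and the rolling average latency stays under 15s. The
standalone Go sketch below mirrors the latencies type and the decision added
in compact.go; the sample durations and the cardinality value are invented
for demonstration.

    package main

    import (
        "fmt"
        "time"
    )

    // latencies mirrors the fixed-size ring buffer added in compact.go: it
    // records the most recent snapshot durations and averages the non-zero
    // entries.
    type latencies struct {
        i      int
        values []time.Duration
    }

    func (l *latencies) add(t time.Duration) {
        l.values[l.i%len(l.values)] = t
        l.i++
    }

    func (l *latencies) avg() time.Duration {
        var n int64
        var sum time.Duration
        for _, v := range l.values {
            if v == 0 {
                continue
            }
            sum += v
            n++
        }
        if n > 0 {
            return time.Duration(int64(sum) / n)
        }
        return 0
    }

    func main() {
        lat := &latencies{values: make([]time.Duration, 4)}
        lat.add(5 * time.Second)
        lat.add(12 * time.Second)
        lat.add(20 * time.Second)

        // Throttle only while cardinality is low and recent snapshots have
        // been fast, matching the WriteSnapshot heuristic below.
        card := 2500000 // hypothetical cache cardinality
        throttle := card < 3e6 && lat.avg() < 15*time.Second
        fmt.Printf("avg latency=%v throttle=%v\n", lat.avg(), throttle)
    }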
---
 tsdb/engine/tsm1/compact.go      | 109 +++++++++++++++---------
 tsdb/engine/tsm1/compact_test.go | 142 +++++++++++++++----------------
 tsdb/engine/tsm1/engine.go       |  16 ++--
 3 files changed, 148 insertions(+), 119 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index bb3ee07c44..2e58058c53 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -254,6 +254,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 	}
 
 	minGenerations := 4
+	if level == 1 {
+		minGenerations = 8
+	}
+
 	var cGroups []CompactionGroup
 	for _, group := range levelGroups {
 		for _, chunk := range group.chunk(minGenerations) {
@@ -314,6 +318,11 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
 	for i := 0; i < len(generations); i++ {
 		cur := generations[i]
 
+		// Skip the file if it's over the max size, contains a full block, and does not have any tombstones.
+		if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
+			continue
+		}
+
 		// See if this generation is orphaned, which would prevent it from being further
 		// compacted until a final full compaction runs.
 		if i < len(generations)-1 {
@@ -542,7 +551,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
 	compactable := []tsmGenerations{}
 	for _, group := range groups {
 		//if we don't have enough generations to compact, skip it
-		if len(group) < 2 && !group.hasTombstones() {
+		if len(group) < 4 && !group.hasTombstones() {
 			continue
 		}
 		compactable = append(compactable, group)
@@ -673,8 +682,7 @@ type Compactor struct {
 	// lastSnapshotDuration is the amount of time the last snapshot took to complete.
 	lastSnapshotDuration time.Duration
 
-	// snapshotConcurrency is the amount of parallelism used to snapshot the cache.
-	snapshotConcurrency int
+	snapshotLatencies *latencies
 
 	// The channel to signal that any in progress snapshots should be aborted.
 	snapshotsInterrupt chan struct{}
@@ -696,7 +704,7 @@ func (c *Compactor) Open() {
 	c.compactionsEnabled = true
 	c.snapshotsInterrupt = make(chan struct{})
 	c.compactionsInterrupt = make(chan struct{})
-	c.snapshotConcurrency = 1
+	c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)}
 
 	c.files = make(map[string]struct{})
 }
@@ -765,7 +773,6 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.snapshotsEnabled
 	intC := c.snapshotsInterrupt
-	concurrency := c.snapshotConcurrency
 	c.mu.RUnlock()
 
 	if !enabled {
@@ -773,6 +780,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	}
 
 	start := time.Now()
+	card := cache.Count()
+
+	// Enable throttling if we have lower cardinality or snapshots are going fast.
+	throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second
+
+	// Write snapshots concurrently if cardinality is relatively high.
+	concurrency := card / 2e6
+	if concurrency < 1 {
+		concurrency = 1
+	}
+
+	// Special case very high cardinality: use max concurrency and don't throttle writes.
+	if card >= 3e6 {
+		concurrency = 4
+		throttle = false
+	}
 
 	splits := cache.Split(concurrency)
@@ -785,7 +808,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	for i := 0; i < concurrency; i++ {
 		go func(sp *Cache) {
 			iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
-			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
+			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter, throttle)
 			resC <- res{files: files, err: err}
 		}(splits[i])
@@ -802,35 +825,13 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	}
 
 	dur := time.Since(start).Truncate(time.Second)
-	maxConcurrency := runtime.GOMAXPROCS(0) / 2
-	if maxConcurrency < 1 {
-		maxConcurrency = 1
-	}
-	if maxConcurrency > 4 {
-		maxConcurrency = 4
-	}
 
 	c.mu.Lock()
 
 	// See if we were disabled while writing a snapshot
 	enabled = c.snapshotsEnabled
-
-	// See if we need to adjust our snapshot concurrency
-	if dur > 30*time.Second && dur > c.lastSnapshotDuration {
-		// Increase snapshot concurrency if they are running slow
-		c.snapshotConcurrency++
-		if c.snapshotConcurrency > maxConcurrency {
-			c.snapshotConcurrency = maxConcurrency
-		}
-	} else if dur < 30*time.Second && dur < c.lastSnapshotDuration {
-		// Decrease snapshot concurrency if they are running too fast
-		c.snapshotConcurrency--
-		if c.snapshotConcurrency < 1 {
-			c.snapshotConcurrency = 1
-		}
-	}
-
 	c.lastSnapshotDuration = dur
+	c.snapshotLatencies.add(time.Since(start))
 	c.mu.Unlock()
 
 	if !enabled {
@@ -899,7 +900,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 		return nil, err
 	}
 
-	return c.writeNewFiles(maxGeneration, maxSequence, tsm)
+	return c.writeNewFiles(maxGeneration, maxSequence, tsm, true)
 }
 
 // CompactFull writes multiple smaller TSM files into 1 or more larger files.
@@ -980,7 +981,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
 
 // writeNewFiles writes from the iterator into new TSM files, rotating
 // to a new file once it has reached the max TSM file size.
-func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) {
+func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator, throttle bool) ([]string, error) {
 	// These are the new TSM files written
 	var files []string
 
@@ -990,7 +991,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 		fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))
 
 		// Write as much as possible to this file
-		err := c.write(fileName, iter)
+		err := c.write(fileName, iter, throttle)
 
 		// We've hit the max file limit and there is more to write. Create a new file
 		// and continue.
@@ -1029,17 +1030,19 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 	return files, nil
 }
 
-func (c *Compactor) write(path string, iter KeyIterator) (err error) {
+func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
 	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 	if err != nil {
 		return errCompactionInProgress{err: err}
 	}
 
 	// Create the writer for the new TSM file.
- var w TSMWriter + var ( + w TSMWriter + limitWriter io.Writer = fd + ) - var limitWriter io.Writer = fd - if c.RateLimit != nil { + if c.RateLimit != nil && throttle { limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit) } @@ -1549,8 +1552,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte } func (c *cacheKeyIterator) EstimatedIndexSize() int { - // We return 0 here since we already have all the entries in memory to write an index. - return 0 + var n int + for _, v := range c.order { + n += len(v) + } + return n } func (c *cacheKeyIterator) encode() { @@ -1724,3 +1730,30 @@ func (a tsmGenerations) IsSorted() bool { } return true } + +type latencies struct { + i int + values []time.Duration +} + +func (l *latencies) add(t time.Duration) { + l.values[l.i%len(l.values)] = t + l.i++ +} + +func (l *latencies) avg() time.Duration { + var n int64 + var sum time.Duration + for _, v := range l.values { + if v == 0 { + continue + } + sum += v + n++ + } + + if n > 0 { + return time.Duration(int64(sum) / n) + } + return time.Duration(0) +} diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 59221315d9..617b7567a6 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1469,6 +1469,22 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { Path: "08-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "09-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "10-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "11-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "12-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1479,7 +1495,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) @@ -1545,55 +1561,6 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { } } -func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) { - data := []tsm1.FileStat{ - tsm1.FileStat{ - Path: "01-03.tsm1", - Size: 251 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "02-03.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "03-01.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "04-01.tsm1", - Size: 10 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "05-02.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "06-01.tsm1", - Size: 1 * 1024 * 1024, - }, - } - - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data - }, - }, tsdb.DefaultCompactFullWriteColdDuration, - ) - - expFiles := []tsm1.FileStat{data[2], data[3]} - tsm := cp.PlanLevel(1) - if exp, got := len(expFiles), len(tsm[0]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles { - if got, exp := tsm[0][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } -} - func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ @@ -1810,8 +1777,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - 
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -1823,16 +1789,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) } } - - if exp, got := len(expFiles2), len(tsm[1]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles2 { - if got, exp := tsm[1][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } } func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { @@ -1877,6 +1833,30 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { Path: "10-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "11-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "12-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "13-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "14-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "15-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "16-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1887,8 +1867,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles1 := data[0:8] + expFiles2 := data[8:16] tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -2567,25 +2547,41 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { Size: 2148728539, }, tsm1.FileStat{ - Path: "000000005-000000002.tsm", - Size: 701863692, + Path: "000000005-000000001.tsm", + Size: 2148340232, }, tsm1.FileStat{ - Path: "000000006-000000002.tsm", - Size: 701863692, + Path: "000000006-000000001.tsm", + Size: 2148356556, }, tsm1.FileStat{ - Path: "000000007-000000002.tsm", - Size: 701863692, + Path: "000000007-000000001.tsm", + Size: 167780181, }, tsm1.FileStat{ - Path: "000000008-000000002.tsm", - Size: 701863692, + Path: "000000008-000000001.tsm", + Size: 2148728539, }, tsm1.FileStat{ Path: "000000009-000000002.tsm", Size: 701863692, }, + tsm1.FileStat{ + Path: "000000010-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000011-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000012-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000013-000000002.tsm", + Size: 701863692, + }, } }, }, tsdb.DefaultCompactFullWriteColdDuration, @@ -2623,7 +2619,7 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } - if got, exp := len(tsm[0]), 9; got != exp { + if got, exp := len(tsm[0]), 13; got != exp { t.Fatalf("plan length mismatch: got %v, exp %v", got, exp) } cp.Release(tsm) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index fbc0de5fbf..9bc17273e2 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1695,7 +1695,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compact(quit <-chan struct{}) { - t := time.NewTicker(10 * time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1757,15 +1757,15 @@ func (e *Engine) 
compact(quit <-chan struct{}) {
 
 		switch level {
 		case 1:
-			if e.compactHiPriorityLevel(level1Groups[0], 1) {
+			if e.compactHiPriorityLevel(level1Groups[0], 1, false) {
 				level1Groups = level1Groups[1:]
 			}
 		case 2:
-			if e.compactHiPriorityLevel(level2Groups[0], 2) {
+			if e.compactHiPriorityLevel(level2Groups[0], 2, false) {
 				level2Groups = level2Groups[1:]
 			}
 		case 3:
-			if e.compactLoPriorityLevel(level3Groups[0], 3) {
+			if e.compactLoPriorityLevel(level3Groups[0], 3, true) {
 				level3Groups = level3Groups[1:]
 			}
 		case 4:
@@ -1786,8 +1786,8 @@ func (e *Engine) compact(quit <-chan struct{}) {
 
 // compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
 // true if the compaction was started.
-func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool {
-	s := e.levelCompactionStrategy(grp, true, level)
+func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
+	s := e.levelCompactionStrategy(grp, fast, level)
 	if s == nil {
 		return false
 	}
@@ -1815,8 +1815,8 @@
 
 // compactLoPriorityLevel kicks off compactions using the low priority policy. It returns
 // true if the compaction was started.
-func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int) bool {
-	s := e.levelCompactionStrategy(grp, true, level)
+func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
+	s := e.levelCompactionStrategy(grp, fast, level)
 	if s == nil {
 		return false
 	}
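
For reference, here is a usage sketch of the token-bucket writer introduced
in PATCH 3. It assumes the pkg/limiter package from that patch is on the
import path; the 512 KB/s limit, 10 MB burst, and discard sink are arbitrary
demonstration values. In the series itself, store.go wires the real limiter
up as limiter.NewRate(48*1024*1024, 48*1024*1024) so that all concurrent
compactions share a single 48 MB/s write budget.

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "time"

        "github.com/influxdata/influxdb/pkg/limiter"
    )

    // discardCloser stands in for a TSM file: it satisfies io.WriteCloser but
    // throws the bytes away, so any measured delay comes from the limiter.
    type discardCloser struct{}

    func (discardCloser) Write(b []byte) (int, error) { return len(b), nil }
    func (discardCloser) Close() error                { return nil }

    func main() {
        // 1 MB pushed through a writer limited to 512 KB/s: the copy should
        // take roughly two seconds, since NewRate pre-spends the burst.
        src := bytes.NewReader(bytes.Repeat([]byte{0}, 1024*1024))

        lim := limiter.NewRate(512*1024, 10*1024*1024) // bytes/sec, burst
        w := limiter.NewWriterWithRate(discardCloser{}, lim)
        defer w.Close()

        start := time.Now()
        n, err := io.Copy(w, src)
        if err != nil {
            panic(err)
        }
        elapsed := time.Since(start)
        fmt.Printf("wrote %d bytes in %v (~%.0f KB/s)\n", n, elapsed, float64(n)/1024/elapsed.Seconds())
    }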