diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e8109fe0d..d0a0d83fb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,9 @@ - [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers - [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions - [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load +- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key. +- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement. +- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error. ## v1.0.1 [2016-09-26] diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 0795e8b87f..6a4fc75298 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -491,41 +491,66 @@ type Compactor struct { NextGeneration() int } - mu sync.RWMutex - opened bool - closing chan struct{} - files map[string]struct{} + mu sync.RWMutex + snapshotsEnabled bool + compactionsEnabled bool + + files map[string]struct{} } func (c *Compactor) Open() { c.mu.Lock() defer c.mu.Unlock() - if c.opened { + if c.snapshotsEnabled || c.compactionsEnabled { return } - c.closing = make(chan struct{}) - c.opened = true + c.snapshotsEnabled = true + c.compactionsEnabled = true c.files = make(map[string]struct{}) } func (c *Compactor) Close() { c.mu.Lock() defer c.mu.Unlock() - if !c.opened { + if !(c.snapshotsEnabled || c.compactionsEnabled) { return } - c.opened = false - close(c.closing) + c.snapshotsEnabled = false + c.compactionsEnabled = false +} + +func (c *Compactor) DisableSnapshots() { + c.mu.Lock() + c.snapshotsEnabled = false + c.mu.Unlock() +} + +func (c *Compactor) EnabledSnapshots() { + c.mu.Lock() + c.snapshotsEnabled = true + c.mu.Unlock() +} + +func (c *Compactor) DisableCompactions() { + c.mu.Lock() + c.compactionsEnabled = false + c.mu.Unlock() +} + +func (c *Compactor) EnabledCompactions() { + c.mu.Lock() + c.compactionsEnabled = true + c.mu.Unlock() } // WriteSnapshot will write a Cache snapshot to a new TSM files. func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { c.mu.RLock() - opened := c.opened + enabled := c.snapshotsEnabled c.mu.RUnlock() - if !opened { + if !enabled { return nil, errSnapshotsDisabled } @@ -534,10 +559,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { // See if we were closed while writing a snapshot c.mu.RLock() - opened = c.opened + enabled = c.snapshotsEnabled c.mu.RUnlock() - if !opened { + if !enabled { return nil, errSnapshotsDisabled } @@ -601,10 +626,10 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { // Compact will write multiple smaller TSM files into 1 or more larger files func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { c.mu.RLock() - opened := c.opened + enabled := c.compactionsEnabled c.mu.RUnlock() - if !opened { + if !enabled { return nil, errCompactionsDisabled } @@ -617,10 +642,10 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { // See if we were closed while writing a snapshot c.mu.RLock() - opened = c.opened + enabled = c.compactionsEnabled c.mu.RUnlock() - if !opened { + if !enabled { return nil, errCompactionsDisabled } @@ -630,10 +655,10 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { // Compact will write multiple smaller TSM files into 1 or more larger files func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) { c.mu.RLock() - opened := c.opened + enabled := c.compactionsEnabled c.mu.RUnlock() - if !opened { + if !enabled { return nil, errCompactionsDisabled } @@ -646,10 +671,10 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) { // See if we were closed while writing a snapshot c.mu.RLock() - opened = c.opened + enabled = c.compactionsEnabled c.mu.RUnlock() - if !opened { + if !enabled { return nil, errCompactionsDisabled } @@ -717,14 +742,12 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) { for iter.Next() { c.mu.RLock() - select { - case <-c.closing: - c.mu.RUnlock() - return errCompactionAborted - default: - } + enabled := c.snapshotsEnabled || c.compactionsEnabled c.mu.RUnlock() + if !enabled { + return errCompactionAborted + } // Each call to read returns the next sorted key (or the prior one if there are // more values to write). The size of values will be less than or equal to our // chunk size (1000) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 80a1543d84..b4c3954c09 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -58,10 +58,13 @@ const ( // Engine represents a storage engine with compressed blocks. type Engine struct { - mu sync.RWMutex - done chan struct{} - wg sync.WaitGroup - compactionsEnabled bool + mu sync.RWMutex + done chan struct{} + snapshotterDone chan struct{} + wg sync.WaitGroup + snapshotterWg sync.WaitGroup + levelCompactionsEnabled bool + snapshotCompactionsEnabled bool path string logger *log.Logger // Logger to be used for important messages @@ -151,49 +154,83 @@ func (e *Engine) SetEnabled(enabled bool) { // all running compactions are aborted and new compactions stop running. func (e *Engine) SetCompactionsEnabled(enabled bool) { if enabled { - e.mu.Lock() - if e.compactionsEnabled { - e.mu.Unlock() - return - } - e.compactionsEnabled = true + e.enableSnapshotCompactions() + e.enableLevelCompactions() - e.done = make(chan struct{}) - e.Compactor.Open() - - e.mu.Unlock() - - e.wg.Add(5) - go e.compactCache() - go e.compactTSMFull() - go e.compactTSMLevel(true, 1) - go e.compactTSMLevel(true, 2) - go e.compactTSMLevel(false, 3) } else { - e.mu.Lock() - if !e.compactionsEnabled { - e.mu.Unlock() - return - } - // Prevent new compactions from starting - e.compactionsEnabled = false - e.mu.Unlock() - - // Stop all background compaction goroutines - close(e.done) - - // Abort any running goroutines (this could take a while) - e.Compactor.Close() - - // Wait for compaction goroutines to exit - e.wg.Wait() - - if err := e.cleanup(); err != nil { - e.logger.Printf("error cleaning up temp file: %v", err) - } + e.disableSnapshotCompactions() + e.disableLevelCompactions() } } +func (e *Engine) enableLevelCompactions() { + e.mu.Lock() + if e.levelCompactionsEnabled { + e.mu.Unlock() + return + } + e.levelCompactionsEnabled = true + e.Compactor.EnabledCompactions() + e.done = make(chan struct{}) + e.mu.Unlock() + + e.wg.Add(4) + go e.compactTSMFull() + go e.compactTSMLevel(true, 1) + go e.compactTSMLevel(true, 2) + go e.compactTSMLevel(false, 3) +} + +func (e *Engine) disableLevelCompactions() { + e.mu.Lock() + if !e.levelCompactionsEnabled { + e.mu.Unlock() + return + } + // Prevent new compactions from starting + e.levelCompactionsEnabled = false + e.Compactor.DisableCompactions() + e.mu.Unlock() + + // Stop all background compaction goroutines + close(e.done) + + // Wait for compaction goroutines to exit + e.wg.Wait() + + if err := e.cleanup(); err != nil { + e.logger.Printf("error cleaning up temp file: %v", err) + } +} + +func (e *Engine) enableSnapshotCompactions() { + e.mu.Lock() + if e.snapshotCompactionsEnabled { + e.mu.Unlock() + return + } + + e.snapshotCompactionsEnabled = true + e.snapshotterDone = make(chan struct{}) + e.Compactor.EnabledSnapshots() + e.mu.Unlock() + + e.snapshotterWg.Add(1) + go e.compactCache() +} + +func (e *Engine) disableSnapshotCompactions() { + e.mu.Lock() + if !e.snapshotCompactionsEnabled { + e.mu.Unlock() + return + } + e.snapshotCompactionsEnabled = false + e.Compactor.DisableSnapshots() + e.mu.Unlock() + e.snapshotterWg.Wait() +} + // Path returns the path the engine was opened with. func (e *Engine) Path() string { return e.path } @@ -294,6 +331,8 @@ func (e *Engine) Open() error { return err } + e.Compactor.Open() + if e.enableCompactionsOnOpen { e.SetCompactionsEnabled(true) } @@ -645,12 +684,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error { // Disable and abort running compactions so that tombstones added existing tsm // files don't get removed. This would cause deleted measurements/series to - // re-appear once the compaction completed. - e.SetCompactionsEnabled(false) - defer e.SetCompactionsEnabled(true) - - e.mu.RLock() - defer e.mu.RUnlock() + // re-appear once the compaction completed. We only disable the level compactions + // so that snapshotting does not stop while writing out tombstones. If it is stopped, + // and writing tombstones takes a long time, writes can get rejected due to the cache + // filling up. + e.disableLevelCompactions() + defer e.enableLevelCompactions() // keyMap is used to see if a given key should be deleted. seriesKey // are the measurement + tagset (minus separate & field) @@ -822,13 +861,21 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // compactCache continually checks if the WAL cache should be written to disk func (e *Engine) compactCache() { - defer e.wg.Done() + defer e.snapshotterWg.Done() for { select { - case <-e.done: + case <-e.snapshotterDone: return default: + e.mu.RLock() + enabled := e.snapshotCompactionsEnabled + e.mu.RUnlock() + + if !enabled { + return + } + e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { start := time.Now() diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 99a0c22d5a..6b721d17a5 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -650,13 +650,16 @@ func (f *FileStore) BlockCount(path string, idx int) int { // walkFiles calls fn for every files in filestore in parallel func (f *FileStore) walkFiles(fn func(f TSMFile) error) error { + // Copy the current TSM files to prevent a slow walker from + // blocking other operations. f.mu.RLock() - defer f.mu.RUnlock() + files := make([]TSMFile, len(f.files)) + copy(files, f.files) + f.mu.RUnlock() // struct to hold the result of opening each reader in a goroutine - - errC := make(chan error, len(f.files)) - for _, f := range f.files { + errC := make(chan error, len(files)) + for _, f := range files { go func(tsm TSMFile) { if err := fn(tsm); err != nil { errC <- fmt.Errorf("file %s: %s", tsm.Path(), err)