Allow snapshot compactions during deletes
If a delete takes a long time to process while writes to the shard are occuring, it was possible for the cache to fill up and writes to be rejected. This occurred because we disabled all compactions while writing tombstone file to prevent deleted data from re-appearing after a compaction completed. Instead, we only disable the level compactions and allow snapshot compactions to continue. Snapshots already handle deleted data with the cache and wal. Fixes #7161pull/7165/head
parent
4776e8ffd5
commit
f254b4f3ae
|
@ -55,6 +55,9 @@
|
||||||
- [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers
|
- [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers
|
||||||
- [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions
|
- [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions
|
||||||
- [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load
|
- [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load
|
||||||
|
- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key.
|
||||||
|
- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement.
|
||||||
|
- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error.
|
||||||
|
|
||||||
## v1.0.1 [2016-09-26]
|
## v1.0.1 [2016-09-26]
|
||||||
|
|
||||||
|
|
|
@ -491,41 +491,66 @@ type Compactor struct {
|
||||||
NextGeneration() int
|
NextGeneration() int
|
||||||
}
|
}
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
opened bool
|
snapshotsEnabled bool
|
||||||
closing chan struct{}
|
compactionsEnabled bool
|
||||||
files map[string]struct{}
|
|
||||||
|
files map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Compactor) Open() {
|
func (c *Compactor) Open() {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
if c.opened {
|
if c.snapshotsEnabled || c.compactionsEnabled {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.closing = make(chan struct{})
|
c.snapshotsEnabled = true
|
||||||
c.opened = true
|
c.compactionsEnabled = true
|
||||||
c.files = make(map[string]struct{})
|
c.files = make(map[string]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Compactor) Close() {
|
func (c *Compactor) Close() {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
if !c.opened {
|
if !(c.snapshotsEnabled || c.compactionsEnabled) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.opened = false
|
c.snapshotsEnabled = false
|
||||||
close(c.closing)
|
c.compactionsEnabled = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Compactor) DisableSnapshots() {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.snapshotsEnabled = false
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Compactor) EnabledSnapshots() {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.snapshotsEnabled = true
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Compactor) DisableCompactions() {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.compactionsEnabled = false
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Compactor) EnabledCompactions() {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.compactionsEnabled = true
|
||||||
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteSnapshot will write a Cache snapshot to a new TSM files.
|
// WriteSnapshot will write a Cache snapshot to a new TSM files.
|
||||||
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
|
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
opened := c.opened
|
enabled := c.snapshotsEnabled
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !opened {
|
if !enabled {
|
||||||
return nil, errSnapshotsDisabled
|
return nil, errSnapshotsDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -534,10 +559,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
|
||||||
|
|
||||||
// See if we were closed while writing a snapshot
|
// See if we were closed while writing a snapshot
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
opened = c.opened
|
enabled = c.snapshotsEnabled
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !opened {
|
if !enabled {
|
||||||
return nil, errSnapshotsDisabled
|
return nil, errSnapshotsDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -601,10 +626,10 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
|
||||||
// Compact will write multiple smaller TSM files into 1 or more larger files
|
// Compact will write multiple smaller TSM files into 1 or more larger files
|
||||||
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
|
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
opened := c.opened
|
enabled := c.compactionsEnabled
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !opened {
|
if !enabled {
|
||||||
return nil, errCompactionsDisabled
|
return nil, errCompactionsDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -617,10 +642,10 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
|
||||||
|
|
||||||
// See if we were closed while writing a snapshot
|
// See if we were closed while writing a snapshot
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
opened = c.opened
|
enabled = c.compactionsEnabled
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !opened {
|
if !enabled {
|
||||||
return nil, errCompactionsDisabled
|
return nil, errCompactionsDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -630,10 +655,10 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
|
||||||
// Compact will write multiple smaller TSM files into 1 or more larger files
|
// Compact will write multiple smaller TSM files into 1 or more larger files
|
||||||
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
|
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
opened := c.opened
|
enabled := c.compactionsEnabled
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !opened {
|
if !enabled {
|
||||||
return nil, errCompactionsDisabled
|
return nil, errCompactionsDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,10 +671,10 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
|
||||||
|
|
||||||
// See if we were closed while writing a snapshot
|
// See if we were closed while writing a snapshot
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
opened = c.opened
|
enabled = c.compactionsEnabled
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !opened {
|
if !enabled {
|
||||||
return nil, errCompactionsDisabled
|
return nil, errCompactionsDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,14 +742,12 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
|
||||||
|
|
||||||
for iter.Next() {
|
for iter.Next() {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
select {
|
enabled := c.snapshotsEnabled || c.compactionsEnabled
|
||||||
case <-c.closing:
|
|
||||||
c.mu.RUnlock()
|
|
||||||
return errCompactionAborted
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
|
if !enabled {
|
||||||
|
return errCompactionAborted
|
||||||
|
}
|
||||||
// Each call to read returns the next sorted key (or the prior one if there are
|
// Each call to read returns the next sorted key (or the prior one if there are
|
||||||
// more values to write). The size of values will be less than or equal to our
|
// more values to write). The size of values will be less than or equal to our
|
||||||
// chunk size (1000)
|
// chunk size (1000)
|
||||||
|
|
|
@ -58,10 +58,13 @@ const (
|
||||||
|
|
||||||
// Engine represents a storage engine with compressed blocks.
|
// Engine represents a storage engine with compressed blocks.
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
wg sync.WaitGroup
|
snapshotterDone chan struct{}
|
||||||
compactionsEnabled bool
|
wg sync.WaitGroup
|
||||||
|
snapshotterWg sync.WaitGroup
|
||||||
|
levelCompactionsEnabled bool
|
||||||
|
snapshotCompactionsEnabled bool
|
||||||
|
|
||||||
path string
|
path string
|
||||||
logger *log.Logger // Logger to be used for important messages
|
logger *log.Logger // Logger to be used for important messages
|
||||||
|
@ -151,49 +154,83 @@ func (e *Engine) SetEnabled(enabled bool) {
|
||||||
// all running compactions are aborted and new compactions stop running.
|
// all running compactions are aborted and new compactions stop running.
|
||||||
func (e *Engine) SetCompactionsEnabled(enabled bool) {
|
func (e *Engine) SetCompactionsEnabled(enabled bool) {
|
||||||
if enabled {
|
if enabled {
|
||||||
e.mu.Lock()
|
e.enableSnapshotCompactions()
|
||||||
if e.compactionsEnabled {
|
e.enableLevelCompactions()
|
||||||
e.mu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
e.compactionsEnabled = true
|
|
||||||
|
|
||||||
e.done = make(chan struct{})
|
|
||||||
e.Compactor.Open()
|
|
||||||
|
|
||||||
e.mu.Unlock()
|
|
||||||
|
|
||||||
e.wg.Add(5)
|
|
||||||
go e.compactCache()
|
|
||||||
go e.compactTSMFull()
|
|
||||||
go e.compactTSMLevel(true, 1)
|
|
||||||
go e.compactTSMLevel(true, 2)
|
|
||||||
go e.compactTSMLevel(false, 3)
|
|
||||||
} else {
|
} else {
|
||||||
e.mu.Lock()
|
e.disableSnapshotCompactions()
|
||||||
if !e.compactionsEnabled {
|
e.disableLevelCompactions()
|
||||||
e.mu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Prevent new compactions from starting
|
|
||||||
e.compactionsEnabled = false
|
|
||||||
e.mu.Unlock()
|
|
||||||
|
|
||||||
// Stop all background compaction goroutines
|
|
||||||
close(e.done)
|
|
||||||
|
|
||||||
// Abort any running goroutines (this could take a while)
|
|
||||||
e.Compactor.Close()
|
|
||||||
|
|
||||||
// Wait for compaction goroutines to exit
|
|
||||||
e.wg.Wait()
|
|
||||||
|
|
||||||
if err := e.cleanup(); err != nil {
|
|
||||||
e.logger.Printf("error cleaning up temp file: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Engine) enableLevelCompactions() {
|
||||||
|
e.mu.Lock()
|
||||||
|
if e.levelCompactionsEnabled {
|
||||||
|
e.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e.levelCompactionsEnabled = true
|
||||||
|
e.Compactor.EnabledCompactions()
|
||||||
|
e.done = make(chan struct{})
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
e.wg.Add(4)
|
||||||
|
go e.compactTSMFull()
|
||||||
|
go e.compactTSMLevel(true, 1)
|
||||||
|
go e.compactTSMLevel(true, 2)
|
||||||
|
go e.compactTSMLevel(false, 3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) disableLevelCompactions() {
|
||||||
|
e.mu.Lock()
|
||||||
|
if !e.levelCompactionsEnabled {
|
||||||
|
e.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Prevent new compactions from starting
|
||||||
|
e.levelCompactionsEnabled = false
|
||||||
|
e.Compactor.DisableCompactions()
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
// Stop all background compaction goroutines
|
||||||
|
close(e.done)
|
||||||
|
|
||||||
|
// Wait for compaction goroutines to exit
|
||||||
|
e.wg.Wait()
|
||||||
|
|
||||||
|
if err := e.cleanup(); err != nil {
|
||||||
|
e.logger.Printf("error cleaning up temp file: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) enableSnapshotCompactions() {
|
||||||
|
e.mu.Lock()
|
||||||
|
if e.snapshotCompactionsEnabled {
|
||||||
|
e.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.snapshotCompactionsEnabled = true
|
||||||
|
e.snapshotterDone = make(chan struct{})
|
||||||
|
e.Compactor.EnabledSnapshots()
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
e.snapshotterWg.Add(1)
|
||||||
|
go e.compactCache()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) disableSnapshotCompactions() {
|
||||||
|
e.mu.Lock()
|
||||||
|
if !e.snapshotCompactionsEnabled {
|
||||||
|
e.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e.snapshotCompactionsEnabled = false
|
||||||
|
e.Compactor.DisableSnapshots()
|
||||||
|
e.mu.Unlock()
|
||||||
|
e.snapshotterWg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// Path returns the path the engine was opened with.
|
// Path returns the path the engine was opened with.
|
||||||
func (e *Engine) Path() string { return e.path }
|
func (e *Engine) Path() string { return e.path }
|
||||||
|
|
||||||
|
@ -294,6 +331,8 @@ func (e *Engine) Open() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.Compactor.Open()
|
||||||
|
|
||||||
if e.enableCompactionsOnOpen {
|
if e.enableCompactionsOnOpen {
|
||||||
e.SetCompactionsEnabled(true)
|
e.SetCompactionsEnabled(true)
|
||||||
}
|
}
|
||||||
|
@ -645,12 +684,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
|
||||||
|
|
||||||
// Disable and abort running compactions so that tombstones added existing tsm
|
// Disable and abort running compactions so that tombstones added existing tsm
|
||||||
// files don't get removed. This would cause deleted measurements/series to
|
// files don't get removed. This would cause deleted measurements/series to
|
||||||
// re-appear once the compaction completed.
|
// re-appear once the compaction completed. We only disable the level compactions
|
||||||
e.SetCompactionsEnabled(false)
|
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
|
||||||
defer e.SetCompactionsEnabled(true)
|
// and writing tombstones takes a long time, writes can get rejected due to the cache
|
||||||
|
// filling up.
|
||||||
e.mu.RLock()
|
e.disableLevelCompactions()
|
||||||
defer e.mu.RUnlock()
|
defer e.enableLevelCompactions()
|
||||||
|
|
||||||
// keyMap is used to see if a given key should be deleted. seriesKey
|
// keyMap is used to see if a given key should be deleted. seriesKey
|
||||||
// are the measurement + tagset (minus separate & field)
|
// are the measurement + tagset (minus separate & field)
|
||||||
|
@ -822,13 +861,21 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
|
||||||
|
|
||||||
// compactCache continually checks if the WAL cache should be written to disk
|
// compactCache continually checks if the WAL cache should be written to disk
|
||||||
func (e *Engine) compactCache() {
|
func (e *Engine) compactCache() {
|
||||||
defer e.wg.Done()
|
defer e.snapshotterWg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-e.done:
|
case <-e.snapshotterDone:
|
||||||
return
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
e.mu.RLock()
|
||||||
|
enabled := e.snapshotCompactionsEnabled
|
||||||
|
e.mu.RUnlock()
|
||||||
|
|
||||||
|
if !enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
e.Cache.UpdateAge()
|
e.Cache.UpdateAge()
|
||||||
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
|
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
|
@ -650,13 +650,16 @@ func (f *FileStore) BlockCount(path string, idx int) int {
|
||||||
|
|
||||||
// walkFiles calls fn for every files in filestore in parallel
|
// walkFiles calls fn for every files in filestore in parallel
|
||||||
func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
|
func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
|
||||||
|
// Copy the current TSM files to prevent a slow walker from
|
||||||
|
// blocking other operations.
|
||||||
f.mu.RLock()
|
f.mu.RLock()
|
||||||
defer f.mu.RUnlock()
|
files := make([]TSMFile, len(f.files))
|
||||||
|
copy(files, f.files)
|
||||||
|
f.mu.RUnlock()
|
||||||
|
|
||||||
// struct to hold the result of opening each reader in a goroutine
|
// struct to hold the result of opening each reader in a goroutine
|
||||||
|
errC := make(chan error, len(files))
|
||||||
errC := make(chan error, len(f.files))
|
for _, f := range files {
|
||||||
for _, f := range f.files {
|
|
||||||
go func(tsm TSMFile) {
|
go func(tsm TSMFile) {
|
||||||
if err := fn(tsm); err != nil {
|
if err := fn(tsm); err != nil {
|
||||||
errC <- fmt.Errorf("file %s: %s", tsm.Path(), err)
|
errC <- fmt.Errorf("file %s: %s", tsm.Path(), err)
|
||||||
|
|
Loading…
Reference in New Issue