diff --git a/tsdb/engine.go b/tsdb/engine.go index 41f7425f50..b62a09e21d 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -43,6 +43,7 @@ type Engine interface { SeriesCount() (n int, err error) MeasurementFields(measurement string) *MeasurementFields CreateSnapshot() (string, error) + SetEnabled(enabled bool) // Format will return the format for the engine Format() EngineFormat diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index c7bea637a4..3e879ef737 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -18,6 +18,7 @@ import ( "os" "path/filepath" "sort" + "sync" "time" "github.com/influxdata/influxdb/tsdb" @@ -30,7 +31,12 @@ const ( TSMFileExtension = "tsm" ) -var errMaxFileExceeded = fmt.Errorf("max file exceeded") +var ( + errMaxFileExceeded = fmt.Errorf("max file exceeded") + errSnapshotsDisabled = fmt.Errorf("snapshots disabled") + errCompactionsDisabled = fmt.Errorf("compactions disabled") + errCompactionAborted = fmt.Errorf("compaction aborted") +) var ( MaxTime = time.Unix(0, math.MaxInt64) @@ -406,17 +412,49 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { // Compactor merges multiple TSM files into new files or // writes a Cache into 1 or more TSM files type Compactor struct { - Dir string - Cancel chan struct{} - Size int + Dir string + Size int FileStore interface { NextGeneration() int } + + mu sync.RWMutex + opened bool + closing chan struct{} +} + +func (c *Compactor) Open() { + c.mu.Lock() + defer c.mu.Unlock() + if c.opened { + return + } + + c.closing = make(chan struct{}) + c.opened = true +} + +func (c *Compactor) Close() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.opened { + return + } + c.opened = false + close(c.closing) } // WriteSnapshot will write a Cache snapshot to a new TSM files. func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { + c.mu.RLock() + opened := c.opened + c.mu.RUnlock() + + if !opened { + return nil, errSnapshotsDisabled + } + iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock) return c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) } @@ -477,21 +515,28 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { // Compact will write multiple smaller TSM files into 1 or more larger files func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { + c.mu.RLock() + opened := c.opened + c.mu.RUnlock() + + if !opened { + return nil, errCompactionsDisabled + } + return c.compact(false, tsmFiles) } // Compact will write multiple smaller TSM files into 1 or more larger files func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) { - return c.compact(true, tsmFiles) -} + c.mu.RLock() + opened := c.opened + c.mu.RUnlock() -// Clone will return a new compactor that can be used even if the engine is closed -func (c *Compactor) Clone() *Compactor { - return &Compactor{ - Dir: c.Dir, - FileStore: c.FileStore, - Cancel: c.Cancel, + if !opened { + return nil, errCompactionsDisabled } + + return c.compact(true, tsmFiles) } // writeNewFiles will write from the iterator into new TSM files, rotating @@ -560,11 +605,14 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) { }() for iter.Next() { + c.mu.RLock() select { - case <-c.Cancel: - return fmt.Errorf("compaction aborted") + case <-c.closing: + c.mu.RUnlock() + return errCompactionAborted default: } + c.mu.RUnlock() // Each call to read returns the next sorted key (or the prior one if there are // more values to write). The size of values will be less than or equal to our diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 356f961240..25ab9cd6c4 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -39,6 +39,17 @@ func TestCompactor_Snapshot(t *testing.T) { } files, err := compactor.WriteSnapshot(c) + if err == nil { + t.Fatalf("expected error writing snapshot: %v", err) + } + if len(files) > 0 { + t.Fatalf("no files should be compacted: got %v", len(files)) + + } + + compactor.Open() + + files, err = compactor.WriteSnapshot(c) if err != nil { t.Fatalf("unexpected error writing snapshot: %v", err) } @@ -111,6 +122,17 @@ func TestCompactor_CompactFull(t *testing.T) { } files, err := compactor.CompactFull([]string{f1, f2, f3}) + if err == nil { + t.Fatalf("expected error writing snapshot: %v", err) + } + if len(files) > 0 { + t.Fatalf("no files should be compacted: got %v", len(files)) + + } + + compactor.Open() + + files, err = compactor.CompactFull([]string{f1, f2, f3}) if err != nil { t.Fatalf("unexpected error writing snapshot: %v", err) } @@ -199,6 +221,7 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { FileStore: &fakeFileStore{}, Size: 2, } + compactor.Open() files, err := compactor.CompactFull([]string{f1, f2, f3}) if err != nil { @@ -297,6 +320,7 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { FileStore: &fakeFileStore{}, Size: 2, } + compactor.Open() files, err := compactor.CompactFull([]string{f1, f2, f3}) if err != nil { @@ -396,6 +420,7 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { FileStore: &fakeFileStore{}, Size: 2, } + compactor.Open() files, err := compactor.CompactFull([]string{f1, f2, f3}) if err != nil { @@ -500,6 +525,7 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { FileStore: &fakeFileStore{}, Size: 2, } + compactor.Open() files, err := compactor.CompactFull([]string{f1, f2, f3}) if err != nil { @@ -611,6 +637,7 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { Dir: dir, FileStore: &fakeFileStore{}, } + compactor.Open() // Compact both files, should get 2 files back files, err := compactor.CompactFull([]string{f1Name, f2Name}) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 53c99a2f49..6d595cc540 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -38,9 +38,10 @@ const ( // Engine represents a storage engine with compressed blocks. type Engine struct { - mu sync.RWMutex - done chan struct{} - wg sync.WaitGroup + mu sync.RWMutex + done chan struct{} + wg sync.WaitGroup + compactionsEnabled bool path string logger *log.Logger @@ -106,6 +107,57 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine return e } +func (e *Engine) SetEnabled(enabled bool) { + e.SetCompactionsEnabled(enabled) +} + +// SetCompactionsEnabled enables compactions on the engine. When disabled +// all running compactions are aborted and new compactions stop running. +func (e *Engine) SetCompactionsEnabled(enabled bool) { + if enabled { + e.mu.Lock() + if e.compactionsEnabled { + e.mu.Unlock() + return + } + e.compactionsEnabled = true + + e.done = make(chan struct{}) + e.Compactor.Open() + + e.mu.Unlock() + + e.wg.Add(5) + go e.compactCache() + go e.compactTSMFull() + go e.compactTSMLevel(true, 1) + go e.compactTSMLevel(true, 2) + go e.compactTSMLevel(false, 3) + + e.logger.Printf("compactions enabled for: %v", e.path) + } else { + e.mu.Lock() + if !e.compactionsEnabled { + e.mu.Unlock() + return + } + // Prevent new compactions from starting + e.compactionsEnabled = false + e.mu.Unlock() + + // Stop all background compaction goroutines + close(e.done) + + // Abort any running goroutines (this could take a while) + e.Compactor.Close() + + // Wait for compaction goroutines to exit + e.wg.Wait() + + e.logger.Printf("compactions disabled for: %v", e.path) + } +} + // Path returns the path the engine was opened with. func (e *Engine) Path() string { return e.path } @@ -144,7 +196,6 @@ func (e *Engine) Format() tsdb.EngineFormat { // Open opens and initializes the engine. func (e *Engine) Open() error { e.done = make(chan struct{}) - e.Compactor.Cancel = e.done if err := os.MkdirAll(e.path, 0777); err != nil { return err @@ -166,28 +217,14 @@ func (e *Engine) Open() error { return err } - e.wg.Add(5) - go e.compactCache() - go e.compactTSMFull() - go e.compactTSMLevel(true, 1) - go e.compactTSMLevel(true, 2) - go e.compactTSMLevel(false, 3) + e.SetCompactionsEnabled(true) return nil } // Close closes the engine. Subsequent calls to Close are a nop. func (e *Engine) Close() error { - e.mu.RLock() - if e.done == nil { - e.mu.RUnlock() - return nil - } - e.mu.RUnlock() - - // Shutdown goroutines and wait. - close(e.done) - e.wg.Wait() + e.SetCompactionsEnabled(false) // Lock now and close everything else down. e.mu.Lock() @@ -565,7 +602,7 @@ func (e *Engine) WriteSnapshot() error { } }() - closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) { + closedFiles, snapshot, err := func() ([]string, *Cache, error) { e.mu.Lock() defer e.mu.Unlock() @@ -573,20 +610,20 @@ func (e *Engine) WriteSnapshot() error { started = &now if err := e.WAL.CloseSegment(); err != nil { - return nil, nil, nil, err + return nil, nil, err } segments, err := e.WAL.ClosedSegments() if err != nil { - return nil, nil, nil, err + return nil, nil, err } snapshot, err := e.Cache.Snapshot() if err != nil { - return nil, nil, nil, err + return nil, nil, err } - return segments, snapshot, e.Compactor.Clone(), nil + return segments, snapshot, nil }() if err != nil { @@ -598,7 +635,7 @@ func (e *Engine) WriteSnapshot() error { // holding the engine write lock. snapshot.Deduplicate() - return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor) + return e.writeSnapshotAndCommit(closedFiles, snapshot) } // CreateSnapshot will create a temp directory that holds @@ -615,7 +652,7 @@ func (e *Engine) CreateSnapshot() (string, error) { } // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments -func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) (err error) { +func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (err error) { defer func() { if err != nil { @@ -623,7 +660,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, c } }() // write the new snapshot files - newFiles, err := compactor.WriteSnapshot(snapshot) + newFiles, err := e.Compactor.WriteSnapshot(snapshot) if err != nil { e.logger.Printf("error writing snapshot from compactor: %v", err) return err diff --git a/tsdb/shard.go b/tsdb/shard.go index 1fce56dc2b..b8c7a239af 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -54,6 +54,10 @@ var ( // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") + + // ErrShardDisabled is returned when a the shard is not available for + // queries or writes. + ErrShardDisabled = errors.New("shard is disabled") ) // A ShardError implements the error interface, and contains extra @@ -93,6 +97,7 @@ type Shard struct { mu sync.RWMutex engine Engine closing chan struct{} + enabled bool // expvar-based stats. statMap *expvar.Map @@ -140,11 +145,22 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti func (s *Shard) SetLogOutput(w io.Writer) { s.LogOutput = w s.logger = log.New(w, "[shard] ", log.LstdFlags) - if !s.closed() { + if err := s.ready(); err == nil { s.engine.SetLogOutput(w) } } +// SetEnabled enables the shard for queries and write. When disabled, all +// writes and queries return an error and compactions are stopped for the shard. +func (s *Shard) SetEnabled(enabled bool) { + s.mu.Lock() + // Prevent writes and queries + s.enabled = enabled + // Disable background compactions and snapshotting + s.engine.SetEnabled(enabled) + s.mu.Unlock() +} + // Path returns the path set on the shard when it was created. func (s *Shard) Path() string { return s.path } @@ -173,6 +189,9 @@ func (s *Shard) Open() error { return err } + // Disable compactions while loading the index + e.SetEnabled(false) + // Load metadata index. start := time.Now() if err := e.LoadMetadataIndex(s.id, s.index); err != nil { @@ -194,6 +213,9 @@ func (s *Shard) Open() error { return NewShardError(s.id, err) } + // enable writes, queries and compactions + s.SetEnabled(true) + return nil } @@ -226,12 +248,19 @@ func (s *Shard) close() error { return err } -// closed determines if the Shard is closed. -func (s *Shard) closed() bool { +// ready determines if the Shard is ready for queries or writes. +// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled +func (s *Shard) ready() error { + var err error + s.mu.RLock() - closed := s.engine == nil + if s.engine == nil { + err = ErrEngineClosed + } else if !s.enabled { + err = ErrShardDisabled + } s.mu.RUnlock() - return closed + return err } // DiskSize returns the size on disk of this shard @@ -290,8 +319,8 @@ type SeriesCreate struct { // WritePoints will write the raw data points and any new metadata to the index in the shard func (s *Shard) WritePoints(points []models.Point) error { - if s.closed() { - return ErrEngineClosed + if err := s.ready(); err != nil { + return err } s.mu.RLock() @@ -321,8 +350,8 @@ func (s *Shard) WritePoints(points []models.Point) error { } func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error) { - if s.closed() { - return nil, ErrEngineClosed + if err := s.ready(); err != nil { + return nil, err } return s.engine.ContainsSeries(seriesKeys) @@ -330,8 +359,8 @@ func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error) { // DeleteSeries deletes a list of series. func (s *Shard) DeleteSeries(seriesKeys []string) error { - if s.closed() { - return ErrEngineClosed + if err := s.ready(); err != nil { + return err } if err := s.engine.DeleteSeries(seriesKeys); err != nil { return err @@ -341,9 +370,10 @@ func (s *Shard) DeleteSeries(seriesKeys []string) error { // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error { - if s.closed() { - return ErrEngineClosed + if err := s.ready(); err != nil { + return err } + if err := s.engine.DeleteSeriesRange(seriesKeys, min, max); err != nil { return err } @@ -353,8 +383,8 @@ func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error { // DeleteMeasurement deletes a measurement and all underlying series. func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error { - if s.closed() { - return ErrEngineClosed + if err := s.ready(); err != nil { + return err } if err := s.engine.DeleteMeasurement(name, seriesKeys); err != nil { @@ -433,16 +463,16 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate, // SeriesCount returns the number of series buckets on the shard. func (s *Shard) SeriesCount() (int, error) { - if s.closed() { - return 0, ErrEngineClosed + if err := s.ready(); err != nil { + return 0, err } return s.engine.SeriesCount() } // WriteTo writes the shard's data to w. func (s *Shard) WriteTo(w io.Writer) (int64, error) { - if s.closed() { - return 0, ErrEngineClosed + if err := s.ready(); err != nil { + return 0, err } n, err := s.engine.WriteTo(w) s.statMap.Add(statWriteBytes, int64(n)) @@ -451,8 +481,8 @@ func (s *Shard) WriteTo(w io.Writer) (int64, error) { // CreateIterator returns an iterator for the data in the shard. func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { - if s.closed() { - return nil, ErrEngineClosed + if err := s.ready(); err != nil { + return nil, err } if influxql.Sources(opt.Sources).HasSystemSource() { diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 3dbb007a93..16db25c164 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -354,6 +354,50 @@ cpu,host=serverB,region=uswest value=25 0 } } +func TestShard_Disabled_WriteQuery(t *testing.T) { + sh := NewShard() + if err := sh.Open(); err != nil { + t.Fatal(err) + } + defer sh.Close() + + sh.SetEnabled(false) + + pt := models.MustNewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + + err := sh.WritePoints([]models.Point{pt}) + if err == nil { + t.Fatalf("expected shard disabled error") + } + if err != tsdb.ErrShardDisabled { + t.Fatalf(err.Error()) + } + + _, got := sh.CreateIterator(influxql.IteratorOptions{}) + if err == nil { + t.Fatalf("expected shard disabled error") + } + if exp := tsdb.ErrShardDisabled; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + sh.SetEnabled(true) + + err = sh.WritePoints([]models.Point{pt}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if _, err = sh.CreateIterator(influxql.IteratorOptions{}); err != nil { + t.Fatalf("unexpected error: %v", got) + } +} + func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) } func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) } func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) } diff --git a/tsdb/store.go b/tsdb/store.go index 4366410470..d77f065b79 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -320,6 +320,16 @@ func (s *Store) CreateShardSnapshot(id uint64) (string, error) { return sh.CreateSnapshot() } +// SetShardEnabled enables or disables a shard for read and writes +func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error { + sh := s.Shard(shardID) + if sh == nil { + return ErrShardNotFound + } + sh.SetEnabled(enabled) + return nil +} + // DeleteShard removes a shard from disk. func (s *Store) DeleteShard(shardID uint64) error { s.mu.Lock()