From a74ea4cbf4888272b11c12c562a8b837a7f1c225 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 1 Jun 2016 16:17:18 -0600 Subject: [PATCH] Allow creating shards in a disabled state For restoring a shard, we need to be able to have the shard open, but disabled. It was racy to open it and then disable it separately since writes/queries could occur in between that time. --- coordinator/points_writer.go | 4 ++-- coordinator/points_writer_test.go | 6 +++--- coordinator/statement_executor.go | 2 +- coordinator/statement_executor_test.go | 6 +++--- tsdb/engine/tsm1/engine.go | 11 +++++++++-- tsdb/engine/tsm1/file_store.go | 2 +- tsdb/shard.go | 26 ++++++++++++++++---------- tsdb/store.go | 4 +++- tsdb/store_test.go | 20 ++++++++++---------- 9 files changed, 48 insertions(+), 33 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 5cee6892d5..30fe74fd25 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -59,7 +59,7 @@ type PointsWriter struct { } TSDBStore interface { - CreateShard(database, retentionPolicy string, shardID uint64) error + CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error WriteToShard(shardID uint64, points []models.Point) error } @@ -280,7 +280,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo // If we've written to shard that should exist on the current node, but the store has // not actually created this shard, tell it to create it and retry the write if err == tsdb.ErrShardNotFound { - err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID) + err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true) if err != nil { w.Logger.Printf("write failed for shard %d: %v", shard.ID, err) w.statMap.Add(statWriteErr, 1) diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 511b48b2df..a2fedaa294 100644 --- a/coordinator/points_writer_test.go +++ 
b/coordinator/points_writer_test.go @@ -381,15 +381,15 @@ func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Poi type fakeStore struct { WriteFn func(shardID uint64, points []models.Point) error - CreateShardfn func(database, retentionPolicy string, shardID uint64) error + CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error } func (f *fakeStore) WriteToShard(shardID uint64, points []models.Point) error { return f.WriteFn(shardID, points) } -func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64) error { - return f.CreateShardfn(database, retentionPolicy, shardID) +func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error { + return f.CreateShardfn(database, retentionPolicy, shardID, enabled) } func NewPointsWriterMetaClient() *PointsWriterMetaClient { diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index c490628052..820215d826 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -997,7 +997,7 @@ type IntoWriteRequest struct { // TSDBStore is an interface for accessing the time series data store. type TSDBStore interface { - CreateShard(database, policy string, shardID uint64) error + CreateShard(database, policy string, shardID uint64, enabled bool) error WriteToShard(shardID uint64, points []models.Point) error RestoreShard(id uint64, r io.Reader) error diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 5e5b64b5e8..9917ffaa26 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -189,7 +189,7 @@ func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-ch // TSDBStore is a mockable implementation of coordinator.TSDBStore. 
type TSDBStore struct { - CreateShardFn func(database, policy string, shardID uint64) error + CreateShardFn func(database, policy string, shardID uint64, enabled bool) error WriteToShardFn func(shardID uint64, points []models.Point) error RestoreShardFn func(id uint64, r io.Reader) error @@ -203,11 +203,11 @@ type TSDBStore struct { ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator } -func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error { +func (s *TSDBStore) CreateShard(database, policy string, shardID uint64, enabled bool) error { if s.CreateShardFn == nil { return nil } - return s.CreateShardFn(database, policy, shardID) + return s.CreateShardFn(database, policy, shardID, enabled) } func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 6d595cc540..a29e9def40 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -67,6 +67,9 @@ type Engine struct { // no writes have been committed to the WAL, the engine will write // a snapshot of the cache to a TSM file CacheFlushWriteColdDuration time.Duration + + // Controls whether to enable compactions when the engine is opened + enableCompactionsOnOpen bool } // NewEngine returns a new instance of Engine. 
@@ -101,6 +104,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize, CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), + enableCompactionsOnOpen: true, } e.SetLogOutput(os.Stderr) @@ -108,6 +112,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine } func (e *Engine) SetEnabled(enabled bool) { + e.enableCompactionsOnOpen = enabled e.SetCompactionsEnabled(enabled) } @@ -217,7 +222,9 @@ func (e *Engine) Open() error { return err } - e.SetCompactionsEnabled(true) + if e.enableCompactionsOnOpen { + e.SetCompactionsEnabled(true) + } return nil } @@ -642,7 +649,7 @@ func (e *Engine) WriteSnapshot() error { // temporary hardlinks to the underylyng shard files func (e *Engine) CreateSnapshot() (string, error) { if err := e.WriteSnapshot(); err != nil { - return "", nil + return "", err } e.mu.RLock() diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 39d41625a3..47ee9f37a4 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -626,7 +626,7 @@ func (f *FileStore) CreateSnapshot() (string, error) { tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID) err := os.Mkdir(tmpPath, 0777) if err != nil { - return "", nil + return "", err } for _, tsmf := range files { diff --git a/tsdb/shard.go b/tsdb/shard.go index b8c7a239af..c300091dcc 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -105,7 +105,8 @@ type Shard struct { logger *log.Logger // The writer used by the logger. - LogOutput io.Writer + LogOutput io.Writer + EnableOnOpen bool } // NewShard returns a new initialized Shard. 
walPath doesn't apply to the b1 type index @@ -133,8 +134,9 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti database: db, retentionPolicy: rp, - statMap: statMap, - LogOutput: os.Stderr, + statMap: statMap, + LogOutput: os.Stderr, + EnableOnOpen: true, } s.SetLogOutput(os.Stderr) return s @@ -156,8 +158,10 @@ func (s *Shard) SetEnabled(enabled bool) { s.mu.Lock() // Prevent writes and queries s.enabled = enabled - // Disable background compactions and snapshotting - s.engine.SetEnabled(enabled) + if s.engine != nil { + // Disable background compactions and snapshotting + s.engine.SetEnabled(enabled) + } s.mu.Unlock() } @@ -184,14 +188,14 @@ func (s *Shard) Open() error { // Set log output on the engine. e.SetLogOutput(s.LogOutput) + // Disable compactions while loading the index + e.SetEnabled(false) + // Open engine. if err := e.Open(); err != nil { return err } - // Disable compactions while loading the index - e.SetEnabled(false) - // Load metadata index. start := time.Now() if err := e.LoadMetadataIndex(s.id, s.index); err != nil { @@ -213,8 +217,10 @@ func (s *Shard) Open() error { return NewShardError(s.id, err) } - // enable writes, queries and compactions - s.SetEnabled(true) + if s.EnableOnOpen { + // enable writes, queries and compactions + s.SetEnabled(true) + } return nil } diff --git a/tsdb/store.go b/tsdb/store.go index d77f065b79..4a3e76228c 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -264,7 +264,7 @@ func (s *Store) ShardN() int { } // CreateShard creates a shard with the given id and retention policy on a database. 
-func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error { +func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error { s.mu.Lock() defer s.mu.Unlock() @@ -300,6 +300,8 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) shard := NewShard(shardID, db, path, walPath, s.EngineOptions) shard.SetLogOutput(s.logOutput) + shard.EnableOnOpen = enabled + if err := shard.Open(); err != nil { return err } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 087ea6b25f..792fa79c29 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -24,7 +24,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { defer s.Close() // Create a new shard and verify that it exists. - if err := s.CreateShard("db0", "rp0", 1); err != nil { + if err := s.CreateShard("db0", "rp0", 1, true); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard") @@ -32,7 +32,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { // Create a new shard under the same retention policy, and verify // that it exists. - if err := s.CreateShard("db0", "rp0", 2); err != nil { + if err := s.CreateShard("db0", "rp0", 2, true); err != nil { t.Fatal(err) } else if sh := s.Shard(2); sh == nil { t.Fatalf("expected shard") @@ -40,7 +40,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { // Create a new shard under a different retention policy, and // verify that it exists. - if err := s.CreateShard("db0", "rp1", 3); err != nil { + if err := s.CreateShard("db0", "rp1", 3, true); err != nil { t.Fatal(err) } else if sh := s.Shard(3); sh == nil { t.Fatalf("expected shard") @@ -92,7 +92,7 @@ func TestStore_CreateShard(t *testing.T) { defer s.Close() // Create a new shard and verify that it exists. 
- if err := s.CreateShard("db0", "rp0", 1); err != nil { + if err := s.CreateShard("db0", "rp0", 1, true); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard") @@ -101,7 +101,7 @@ func TestStore_CreateShard(t *testing.T) { } // Create another shard and verify that it exists. - if err := s.CreateShard("db0", "rp0", 2); err != nil { + if err := s.CreateShard("db0", "rp0", 2, true); err != nil { t.Fatal(err) } else if sh := s.Shard(2); sh == nil { t.Fatalf("expected shard") @@ -123,7 +123,7 @@ func TestStore_DeleteShard(t *testing.T) { defer s.Close() // Create a new shard and verify that it exists. - if err := s.CreateShard("db0", "rp0", 1); err != nil { + if err := s.CreateShard("db0", "rp0", 1, true); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard") @@ -143,7 +143,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) { defer s.Close() // Create a new shard and verify that it exists. - if err := s.CreateShard("db0", "rp0", 1); err != nil { + if err := s.CreateShard("db0", "rp0", 1, true); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard") @@ -324,7 +324,7 @@ func TestStore_BackupRestoreShard(t *testing.T) { } // Create the shard on the other store and restore from buffer. - if err := s1.CreateShard("db0", "rp0", 100); err != nil { + if err := s1.CreateShard("db0", "rp0", 100, true); err != nil { t.Fatal(err) } if err := s1.RestoreShard(100, &buf); err != nil { @@ -395,7 +395,7 @@ func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) // Create requested number of shards in the store & write points. 
for shardID := 0; shardID < shardCnt; shardID++ { - if err := store.CreateShard("mydb", "myrp", uint64(shardID)); err != nil { + if err := store.CreateShard("mydb", "myrp", uint64(shardID), true); err != nil { return fmt.Errorf("create shard: %s", err) } if err := store.BatchWrite(shardID, points); err != nil { @@ -466,7 +466,7 @@ func (s *Store) Close() error { // MustCreateShardWithData creates a shard and writes line protocol data to it. func (s *Store) MustCreateShardWithData(db, rp string, shardID int, data ...string) { - if err := s.CreateShard(db, rp, uint64(shardID)); err != nil { + if err := s.CreateShard(db, rp, uint64(shardID), true); err != nil { panic(err) } s.MustWriteToShardString(shardID, data...)