Merge pull request #6768 from influxdata/jw-disable-open

Allow creating shards in a disabled state
pull/6765/head
Jason Wilder 2016-06-02 08:34:51 -06:00
commit cd336095ca
9 changed files with 48 additions and 33 deletions

View File

@ -59,7 +59,7 @@ type PointsWriter struct {
}
TSDBStore interface {
CreateShard(database, retentionPolicy string, shardID uint64) error
CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
WriteToShard(shardID uint64, points []models.Point) error
}
@ -280,7 +280,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write
if err == tsdb.ErrShardNotFound {
err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID)
err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true)
if err != nil {
w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)
w.statMap.Add(statWriteErr, 1)

View File

@ -381,15 +381,15 @@ func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Poi
type fakeStore struct {
WriteFn func(shardID uint64, points []models.Point) error
CreateShardfn func(database, retentionPolicy string, shardID uint64) error
CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error
}
func (f *fakeStore) WriteToShard(shardID uint64, points []models.Point) error {
return f.WriteFn(shardID, points)
}
func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64) error {
return f.CreateShardfn(database, retentionPolicy, shardID)
func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
return f.CreateShardfn(database, retentionPolicy, shardID, enabled)
}
func NewPointsWriterMetaClient() *PointsWriterMetaClient {

View File

@ -997,7 +997,7 @@ type IntoWriteRequest struct {
// TSDBStore is an interface for accessing the time series data store.
type TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
CreateShard(database, policy string, shardID uint64, enabled bool) error
WriteToShard(shardID uint64, points []models.Point) error
RestoreShard(id uint64, r io.Reader) error

View File

@ -189,7 +189,7 @@ func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-ch
// TSDBStore is a mockable implementation of coordinator.TSDBStore.
type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
CreateShardFn func(database, policy string, shardID uint64, enabled bool) error
WriteToShardFn func(shardID uint64, points []models.Point) error
RestoreShardFn func(id uint64, r io.Reader) error
@ -203,11 +203,11 @@ type TSDBStore struct {
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}
func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error {
func (s *TSDBStore) CreateShard(database, policy string, shardID uint64, enabled bool) error {
if s.CreateShardFn == nil {
return nil
}
return s.CreateShardFn(database, policy, shardID)
return s.CreateShardFn(database, policy, shardID, enabled)
}
func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error {

View File

@ -67,6 +67,9 @@ type Engine struct {
// no writes have been committed to the WAL, the engine will write
// a snapshot of the cache to a TSM file
CacheFlushWriteColdDuration time.Duration
// Controls whether to enabled compactions when the engine is open
enableCompactionsOnOpen bool
}
// NewEngine returns a new instance of Engine.
@ -101,6 +104,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
}
e.SetLogOutput(os.Stderr)
@ -108,6 +112,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
}
func (e *Engine) SetEnabled(enabled bool) {
e.enableCompactionsOnOpen = enabled
e.SetCompactionsEnabled(enabled)
}
@ -217,7 +222,9 @@ func (e *Engine) Open() error {
return err
}
e.SetCompactionsEnabled(true)
if e.enableCompactionsOnOpen {
e.SetCompactionsEnabled(true)
}
return nil
}
@ -642,7 +649,7 @@ func (e *Engine) WriteSnapshot() error {
// temporary hardlinks to the underylyng shard files
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
return "", nil
return "", err
}
e.mu.RLock()

View File

@ -626,7 +626,7 @@ func (f *FileStore) CreateSnapshot() (string, error) {
tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID)
err := os.Mkdir(tmpPath, 0777)
if err != nil {
return "", nil
return "", err
}
for _, tsmf := range files {

View File

@ -105,7 +105,8 @@ type Shard struct {
logger *log.Logger
// The writer used by the logger.
LogOutput io.Writer
LogOutput io.Writer
EnableOnOpen bool
}
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
@ -133,8 +134,9 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
database: db,
retentionPolicy: rp,
statMap: statMap,
LogOutput: os.Stderr,
statMap: statMap,
LogOutput: os.Stderr,
EnableOnOpen: true,
}
s.SetLogOutput(os.Stderr)
return s
@ -156,8 +158,10 @@ func (s *Shard) SetEnabled(enabled bool) {
s.mu.Lock()
// Prevent writes and queries
s.enabled = enabled
// Disable background compactions and snapshotting
s.engine.SetEnabled(enabled)
if s.engine != nil {
// Disable background compactions and snapshotting
s.engine.SetEnabled(enabled)
}
s.mu.Unlock()
}
@ -184,14 +188,14 @@ func (s *Shard) Open() error {
// Set log output on the engine.
e.SetLogOutput(s.LogOutput)
// Disable compactions while loading the index
e.SetEnabled(false)
// Open engine.
if err := e.Open(); err != nil {
return err
}
// Disable compactions while loading the index
e.SetEnabled(false)
// Load metadata index.
start := time.Now()
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
@ -213,8 +217,10 @@ func (s *Shard) Open() error {
return NewShardError(s.id, err)
}
// enable writes, queries and compactions
s.SetEnabled(true)
if s.EnableOnOpen {
// enable writes, queries and compactions
s.SetEnabled(true)
}
return nil
}

View File

@ -264,7 +264,7 @@ func (s *Store) ShardN() int {
}
// CreateShard creates a shard with the given id and retention policy on a database.
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error {
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
s.mu.Lock()
defer s.mu.Unlock()
@ -300,6 +300,8 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
shard.SetLogOutput(s.logOutput)
shard.EnableOnOpen = enabled
if err := shard.Open(); err != nil {
return err
}

View File

@ -24,7 +24,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1); err != nil {
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
@ -32,7 +32,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
// Create a new shard under the same retention policy, and verify
// that it exists.
if err := s.CreateShard("db0", "rp0", 2); err != nil {
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(2); sh == nil {
t.Fatalf("expected shard")
@ -40,7 +40,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
// Create a new shard under a different retention policy, and
// verify that it exists.
if err := s.CreateShard("db0", "rp1", 3); err != nil {
if err := s.CreateShard("db0", "rp1", 3, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(3); sh == nil {
t.Fatalf("expected shard")
@ -92,7 +92,7 @@ func TestStore_CreateShard(t *testing.T) {
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1); err != nil {
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
@ -101,7 +101,7 @@ func TestStore_CreateShard(t *testing.T) {
}
// Create another shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 2); err != nil {
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(2); sh == nil {
t.Fatalf("expected shard")
@ -123,7 +123,7 @@ func TestStore_DeleteShard(t *testing.T) {
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1); err != nil {
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
@ -143,7 +143,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1); err != nil {
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
@ -324,7 +324,7 @@ func TestStore_BackupRestoreShard(t *testing.T) {
}
// Create the shard on the other store and restore from buffer.
if err := s1.CreateShard("db0", "rp0", 100); err != nil {
if err := s1.CreateShard("db0", "rp0", 100, true); err != nil {
t.Fatal(err)
}
if err := s1.RestoreShard(100, &buf); err != nil {
@ -395,7 +395,7 @@ func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int)
// Create requested number of shards in the store & write points.
for shardID := 0; shardID < shardCnt; shardID++ {
if err := store.CreateShard("mydb", "myrp", uint64(shardID)); err != nil {
if err := store.CreateShard("mydb", "myrp", uint64(shardID), true); err != nil {
return fmt.Errorf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points); err != nil {
@ -466,7 +466,7 @@ func (s *Store) Close() error {
// MustCreateShardWithData creates a shard and writes line protocol data to it.
func (s *Store) MustCreateShardWithData(db, rp string, shardID int, data ...string) {
if err := s.CreateShard(db, rp, uint64(shardID)); err != nil {
if err := s.CreateShard(db, rp, uint64(shardID), true); err != nil {
panic(err)
}
s.MustWriteToShardString(shardID, data...)