Merge pull request #6744 from influxdata/jw-shard-enabled

Add ability to disable shards
pull/6732/head
Jason Wilder 2016-05-31 11:05:05 -06:00
commit 05ec9599b7
7 changed files with 260 additions and 63 deletions

View File

@ -43,6 +43,7 @@ type Engine interface {
SeriesCount() (n int, err error)
MeasurementFields(measurement string) *MeasurementFields
CreateSnapshot() (string, error)
SetEnabled(enabled bool)
// Format will return the format for the engine
Format() EngineFormat

View File

@ -18,6 +18,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/influxdata/influxdb/tsdb"
@ -30,7 +31,12 @@ const (
TSMFileExtension = "tsm"
)
var errMaxFileExceeded = fmt.Errorf("max file exceeded")
var (
errMaxFileExceeded = fmt.Errorf("max file exceeded")
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
errCompactionsDisabled = fmt.Errorf("compactions disabled")
errCompactionAborted = fmt.Errorf("compaction aborted")
)
var (
MaxTime = time.Unix(0, math.MaxInt64)
@ -406,17 +412,49 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files
type Compactor struct {
Dir string
Cancel chan struct{}
Size int
Dir string
Size int
FileStore interface {
NextGeneration() int
}
mu sync.RWMutex
opened bool
closing chan struct{}
}
func (c *Compactor) Open() {
c.mu.Lock()
defer c.mu.Unlock()
if c.opened {
return
}
c.closing = make(chan struct{})
c.opened = true
}
func (c *Compactor) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.opened {
return
}
c.opened = false
close(c.closing)
}
// WriteSnapshot will write a Cache snapshot to a new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
opened := c.opened
c.mu.RUnlock()
if !opened {
return nil, errSnapshotsDisabled
}
iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
return c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
}
@ -477,21 +515,28 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
// Compact will write multiple smaller TSM files into 1 or more larger files
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
c.mu.RLock()
opened := c.opened
c.mu.RUnlock()
if !opened {
return nil, errCompactionsDisabled
}
return c.compact(false, tsmFiles)
}
// Compact will write multiple smaller TSM files into 1 or more larger files
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
return c.compact(true, tsmFiles)
}
c.mu.RLock()
opened := c.opened
c.mu.RUnlock()
// Clone will return a new compactor that can be used even if the engine is closed
func (c *Compactor) Clone() *Compactor {
return &Compactor{
Dir: c.Dir,
FileStore: c.FileStore,
Cancel: c.Cancel,
if !opened {
return nil, errCompactionsDisabled
}
return c.compact(true, tsmFiles)
}
// writeNewFiles will write from the iterator into new TSM files, rotating
@ -560,11 +605,14 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
}()
for iter.Next() {
c.mu.RLock()
select {
case <-c.Cancel:
return fmt.Errorf("compaction aborted")
case <-c.closing:
c.mu.RUnlock()
return errCompactionAborted
default:
}
c.mu.RUnlock()
// Each call to read returns the next sorted key (or the prior one if there are
// more values to write). The size of values will be less than or equal to our

View File

@ -39,6 +39,17 @@ func TestCompactor_Snapshot(t *testing.T) {
}
files, err := compactor.WriteSnapshot(c)
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))
}
compactor.Open()
files, err = compactor.WriteSnapshot(c)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -111,6 +122,17 @@ func TestCompactor_CompactFull(t *testing.T) {
}
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))
}
compactor.Open()
files, err = compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -199,6 +221,7 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
FileStore: &fakeFileStore{},
Size: 2,
}
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
@ -297,6 +320,7 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
FileStore: &fakeFileStore{},
Size: 2,
}
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
@ -396,6 +420,7 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
FileStore: &fakeFileStore{},
Size: 2,
}
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
@ -500,6 +525,7 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
FileStore: &fakeFileStore{},
Size: 2,
}
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
@ -611,6 +637,7 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
Dir: dir,
FileStore: &fakeFileStore{},
}
compactor.Open()
// Compact both files, should get 2 files back
files, err := compactor.CompactFull([]string{f1Name, f2Name})

View File

@ -38,9 +38,10 @@ const (
// Engine represents a storage engine with compressed blocks.
type Engine struct {
mu sync.RWMutex
done chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
done chan struct{}
wg sync.WaitGroup
compactionsEnabled bool
path string
logger *log.Logger
@ -106,6 +107,57 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
return e
}
func (e *Engine) SetEnabled(enabled bool) {
e.SetCompactionsEnabled(enabled)
}
// SetCompactionsEnabled enables compactions on the engine. When disabled
// all running compactions are aborted and new compactions stop running.
func (e *Engine) SetCompactionsEnabled(enabled bool) {
if enabled {
e.mu.Lock()
if e.compactionsEnabled {
e.mu.Unlock()
return
}
e.compactionsEnabled = true
e.done = make(chan struct{})
e.Compactor.Open()
e.mu.Unlock()
e.wg.Add(5)
go e.compactCache()
go e.compactTSMFull()
go e.compactTSMLevel(true, 1)
go e.compactTSMLevel(true, 2)
go e.compactTSMLevel(false, 3)
e.logger.Printf("compactions enabled for: %v", e.path)
} else {
e.mu.Lock()
if !e.compactionsEnabled {
e.mu.Unlock()
return
}
// Prevent new compactions from starting
e.compactionsEnabled = false
e.mu.Unlock()
// Stop all background compaction goroutines
close(e.done)
// Abort any running goroutines (this could take a while)
e.Compactor.Close()
// Wait for compaction goroutines to exit
e.wg.Wait()
e.logger.Printf("compactions disabled for: %v", e.path)
}
}
// Path returns the path the engine was opened with.
func (e *Engine) Path() string { return e.path }
@ -144,7 +196,6 @@ func (e *Engine) Format() tsdb.EngineFormat {
// Open opens and initializes the engine.
func (e *Engine) Open() error {
e.done = make(chan struct{})
e.Compactor.Cancel = e.done
if err := os.MkdirAll(e.path, 0777); err != nil {
return err
@ -166,28 +217,14 @@ func (e *Engine) Open() error {
return err
}
e.wg.Add(5)
go e.compactCache()
go e.compactTSMFull()
go e.compactTSMLevel(true, 1)
go e.compactTSMLevel(true, 2)
go e.compactTSMLevel(false, 3)
e.SetCompactionsEnabled(true)
return nil
}
// Close closes the engine. Subsequent calls to Close are a nop.
func (e *Engine) Close() error {
e.mu.RLock()
if e.done == nil {
e.mu.RUnlock()
return nil
}
e.mu.RUnlock()
// Shutdown goroutines and wait.
close(e.done)
e.wg.Wait()
e.SetCompactionsEnabled(false)
// Lock now and close everything else down.
e.mu.Lock()
@ -565,7 +602,7 @@ func (e *Engine) WriteSnapshot() error {
}
}()
closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
closedFiles, snapshot, err := func() ([]string, *Cache, error) {
e.mu.Lock()
defer e.mu.Unlock()
@ -573,20 +610,20 @@ func (e *Engine) WriteSnapshot() error {
started = &now
if err := e.WAL.CloseSegment(); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
segments, err := e.WAL.ClosedSegments()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
snapshot, err := e.Cache.Snapshot()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return segments, snapshot, e.Compactor.Clone(), nil
return segments, snapshot, nil
}()
if err != nil {
@ -598,7 +635,7 @@ func (e *Engine) WriteSnapshot() error {
// holding the engine write lock.
snapshot.Deduplicate()
return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
return e.writeSnapshotAndCommit(closedFiles, snapshot)
}
// CreateSnapshot will create a temp directory that holds
@ -615,7 +652,7 @@ func (e *Engine) CreateSnapshot() (string, error) {
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) (err error) {
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (err error) {
defer func() {
if err != nil {
@ -623,7 +660,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, c
}
}()
// write the new snapshot files
newFiles, err := compactor.WriteSnapshot(snapshot)
newFiles, err := e.Compactor.WriteSnapshot(snapshot)
if err != nil {
e.logger.Printf("error writing snapshot from compactor: %v", err)
return err

View File

@ -54,6 +54,10 @@ var (
// ErrEngineClosed is returned when a caller attempts indirectly to
// access the shard's underlying engine.
ErrEngineClosed = errors.New("engine is closed")
// ErrShardDisabled is returned when a the shard is not available for
// queries or writes.
ErrShardDisabled = errors.New("shard is disabled")
)
// A ShardError implements the error interface, and contains extra
@ -93,6 +97,7 @@ type Shard struct {
mu sync.RWMutex
engine Engine
closing chan struct{}
enabled bool
// expvar-based stats.
statMap *expvar.Map
@ -140,11 +145,22 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
func (s *Shard) SetLogOutput(w io.Writer) {
s.LogOutput = w
s.logger = log.New(w, "[shard] ", log.LstdFlags)
if !s.closed() {
if err := s.ready(); err == nil {
s.engine.SetLogOutput(w)
}
}
// SetEnabled enables the shard for queries and write. When disabled, all
// writes and queries return an error and compactions are stopped for the shard.
func (s *Shard) SetEnabled(enabled bool) {
s.mu.Lock()
// Prevent writes and queries
s.enabled = enabled
// Disable background compactions and snapshotting
s.engine.SetEnabled(enabled)
s.mu.Unlock()
}
// Path returns the path set on the shard when it was created.
func (s *Shard) Path() string { return s.path }
@ -173,6 +189,9 @@ func (s *Shard) Open() error {
return err
}
// Disable compactions while loading the index
e.SetEnabled(false)
// Load metadata index.
start := time.Now()
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
@ -194,6 +213,9 @@ func (s *Shard) Open() error {
return NewShardError(s.id, err)
}
// enable writes, queries and compactions
s.SetEnabled(true)
return nil
}
@ -226,12 +248,19 @@ func (s *Shard) close() error {
return err
}
// closed determines if the Shard is closed.
func (s *Shard) closed() bool {
// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
func (s *Shard) ready() error {
var err error
s.mu.RLock()
closed := s.engine == nil
if s.engine == nil {
err = ErrEngineClosed
} else if !s.enabled {
err = ErrShardDisabled
}
s.mu.RUnlock()
return closed
return err
}
// DiskSize returns the size on disk of this shard
@ -290,8 +319,8 @@ type SeriesCreate struct {
// WritePoints will write the raw data points and any new metadata to the index in the shard
func (s *Shard) WritePoints(points []models.Point) error {
if s.closed() {
return ErrEngineClosed
if err := s.ready(); err != nil {
return err
}
s.mu.RLock()
@ -321,8 +350,8 @@ func (s *Shard) WritePoints(points []models.Point) error {
}
func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error) {
if s.closed() {
return nil, ErrEngineClosed
if err := s.ready(); err != nil {
return nil, err
}
return s.engine.ContainsSeries(seriesKeys)
@ -330,8 +359,8 @@ func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error) {
// DeleteSeries deletes a list of series.
func (s *Shard) DeleteSeries(seriesKeys []string) error {
if s.closed() {
return ErrEngineClosed
if err := s.ready(); err != nil {
return err
}
if err := s.engine.DeleteSeries(seriesKeys); err != nil {
return err
@ -341,9 +370,10 @@ func (s *Shard) DeleteSeries(seriesKeys []string) error {
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
if s.closed() {
return ErrEngineClosed
if err := s.ready(); err != nil {
return err
}
if err := s.engine.DeleteSeriesRange(seriesKeys, min, max); err != nil {
return err
}
@ -353,8 +383,8 @@ func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
// DeleteMeasurement deletes a measurement and all underlying series.
func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error {
if s.closed() {
return ErrEngineClosed
if err := s.ready(); err != nil {
return err
}
if err := s.engine.DeleteMeasurement(name, seriesKeys); err != nil {
@ -433,16 +463,16 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) {
if s.closed() {
return 0, ErrEngineClosed
if err := s.ready(); err != nil {
return 0, err
}
return s.engine.SeriesCount()
}
// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
if s.closed() {
return 0, ErrEngineClosed
if err := s.ready(); err != nil {
return 0, err
}
n, err := s.engine.WriteTo(w)
s.statMap.Add(statWriteBytes, int64(n))
@ -451,8 +481,8 @@ func (s *Shard) WriteTo(w io.Writer) (int64, error) {
// CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
if s.closed() {
return nil, ErrEngineClosed
if err := s.ready(); err != nil {
return nil, err
}
if influxql.Sources(opt.Sources).HasSystemSource() {

View File

@ -354,6 +354,50 @@ cpu,host=serverB,region=uswest value=25 0
}
}
func TestShard_Disabled_WriteQuery(t *testing.T) {
sh := NewShard()
if err := sh.Open(); err != nil {
t.Fatal(err)
}
defer sh.Close()
sh.SetEnabled(false)
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
err := sh.WritePoints([]models.Point{pt})
if err == nil {
t.Fatalf("expected shard disabled error")
}
if err != tsdb.ErrShardDisabled {
t.Fatalf(err.Error())
}
_, got := sh.CreateIterator(influxql.IteratorOptions{})
if err == nil {
t.Fatalf("expected shard disabled error")
}
if exp := tsdb.ErrShardDisabled; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
sh.SetEnabled(true)
err = sh.WritePoints([]models.Point{pt})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if _, err = sh.CreateIterator(influxql.IteratorOptions{}); err != nil {
t.Fatalf("unexpected error: %v", got)
}
}
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) }
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) }
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) }

View File

@ -320,6 +320,16 @@ func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
return sh.CreateSnapshot()
}
// SetShardEnabled enables or disables a shard for read and writes
func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error {
sh := s.Shard(shardID)
if sh == nil {
return ErrShardNotFound
}
sh.SetEnabled(enabled)
return nil
}
// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
s.mu.Lock()