From e0304ae3d56334df9c08f9f5f22997c92921d577 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 29 Apr 2016 22:56:57 -0600 Subject: [PATCH] Fix shards not getting assigned to series on restart Also, simplifies the LoadMetaDataIndex func to not require a *Shard --- tsdb/engine.go | 2 +- tsdb/engine/tsm1/engine.go | 15 +++++---------- tsdb/engine/tsm1/engine_test.go | 8 ++++---- tsdb/shard.go | 13 +++++++++++-- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index d1fd7efd7d..3c2df38a74 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -29,7 +29,7 @@ type Engine interface { Close() error SetLogOutput(io.Writer) - LoadMetadataIndex(shard *Shard, index *DatabaseIndex) error + LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cfc0ccb85f..7663da7dc6 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -206,19 +206,17 @@ func (e *Engine) SetLogOutput(w io.Writer) { } // LoadMetadataIndex loads the shard metadata into memory. -func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) error { +func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) error { // Save reference to index for iterator creation. e.index = index - start := time.Now() - if err := e.FileStore.WalkKeys(func(key string, typ byte) error { fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) if err != nil { return err } - if err := e.addToIndexFromKey(key, fieldType, index); err != nil { + if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil { return err } return nil @@ -238,15 +236,11 @@ func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) er continue } - if err := e.addToIndexFromKey(key, fieldType, index); err != nil { + if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil { return err } } - // sh may be nil in tests - if sh != nil { - e.logger.Printf("%s database index loaded in %s", sh.Path(), time.Now().Sub(start)) - } return nil } @@ -315,7 +309,7 @@ func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar // addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the // database index and measurement fields -func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { +func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { seriesKey, field := seriesAndFieldFromCompositeKey(key) measurement := tsdb.MeasurementFromSeriesKey(seriesKey) @@ -339,6 +333,7 @@ func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, inde s := tsdb.NewSeries(seriesKey, tags) s.InitializeShards() index.CreateSeriesIndexIfNotExists(measurement, s) + s.AssignShard(shardID) return nil } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 5cbebae054..0f6abc4e92 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -37,7 +37,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Load metadata index. index := tsdb.NewDatabaseIndex("db") - if err := e.LoadMetadataIndex(nil, index); err != nil { + if err := e.LoadMetadataIndex(1, index); err != nil { t.Fatal(err) } @@ -60,7 +60,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Load metadata index. index = tsdb.NewDatabaseIndex("db") - if err := e.LoadMetadataIndex(nil, index); err != nil { + if err := e.LoadMetadataIndex(1, index); err != nil { t.Fatal(err) } @@ -85,7 +85,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Load metadata index. index = tsdb.NewDatabaseIndex("db") - if err := e.LoadMetadataIndex(nil, index); err != nil { + if err := e.LoadMetadataIndex(1, index); err != nil { t.Fatal(err) } @@ -693,7 +693,7 @@ func MustOpenEngine() *Engine { if err := e.Open(); err != nil { panic(err) } - if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db")); err != nil { + if err := e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db")); err != nil { panic(err) } return e diff --git a/tsdb/shard.go b/tsdb/shard.go index 592cbcc085..3915c2cee3 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -7,11 +7,13 @@ import ( "expvar" "fmt" "io" + "log" "math" "os" "sort" "strings" "sync" + "time" "github.com/gogo/protobuf/proto" "github.com/influxdata/influxdb" @@ -88,6 +90,8 @@ type Shard struct { // expvar-based stats. statMap *expvar.Map + logger *log.Logger + // The writer used by the logger. LogOutput io.Writer } @@ -106,7 +110,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti } statMap := influxdb.NewStatistics(key, "shard", tags) - return &Shard{ + s := &Shard{ index: index, id: id, path: path, @@ -119,12 +123,15 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti statMap: statMap, LogOutput: os.Stderr, } + s.SetLogOutput(os.Stderr) + return s } // SetLogOutput sets the writer to which log output will be written. It must // not be called after the Open method has been called. func (s *Shard) SetLogOutput(w io.Writer) { s.LogOutput = w + s.logger = log.New(w, "[shard] ", log.LstdFlags) if !s.closed() { s.engine.SetLogOutput(w) } @@ -160,9 +167,11 @@ func (s *Shard) Open() error { } // Load metadata index. - if err := s.engine.LoadMetadataIndex(s, s.index); err != nil { + start := time.Now() + if err := s.engine.LoadMetadataIndex(s.id, s.index); err != nil { return err } + s.logger.Printf("%s database index loaded in %s", s.path, time.Now().Sub(start)) return nil }(); err != nil {