Fix shards not getting assigned to series on restart
Also, simplifies the LoadMetaDataIndex func to not require a *Shardpull/6524/head
parent
2d09937fd2
commit
e0304ae3d5
|
@ -29,7 +29,7 @@ type Engine interface {
|
|||
Close() error
|
||||
|
||||
SetLogOutput(io.Writer)
|
||||
LoadMetadataIndex(shard *Shard, index *DatabaseIndex) error
|
||||
LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
|
||||
|
||||
CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
|
||||
SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
|
||||
|
|
|
@ -206,19 +206,17 @@ func (e *Engine) SetLogOutput(w io.Writer) {
|
|||
}
|
||||
|
||||
// LoadMetadataIndex loads the shard metadata into memory.
|
||||
func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) error {
|
||||
func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) error {
|
||||
// Save reference to index for iterator creation.
|
||||
e.index = index
|
||||
|
||||
start := time.Now()
|
||||
|
||||
if err := e.FileStore.WalkKeys(func(key string, typ byte) error {
|
||||
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := e.addToIndexFromKey(key, fieldType, index); err != nil {
|
||||
if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -238,15 +236,11 @@ func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) er
|
|||
continue
|
||||
}
|
||||
|
||||
if err := e.addToIndexFromKey(key, fieldType, index); err != nil {
|
||||
if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// sh may be nil in tests
|
||||
if sh != nil {
|
||||
e.logger.Printf("%s database index loaded in %s", sh.Path(), time.Now().Sub(start))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -315,7 +309,7 @@ func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar
|
|||
|
||||
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
|
||||
// database index and measurement fields
|
||||
func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
|
||||
func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
|
||||
seriesKey, field := seriesAndFieldFromCompositeKey(key)
|
||||
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
|
||||
|
||||
|
@ -339,6 +333,7 @@ func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, inde
|
|||
s := tsdb.NewSeries(seriesKey, tags)
|
||||
s.InitializeShards()
|
||||
index.CreateSeriesIndexIfNotExists(measurement, s)
|
||||
s.AssignShard(shardID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
|
|||
|
||||
// Load metadata index.
|
||||
index := tsdb.NewDatabaseIndex("db")
|
||||
if err := e.LoadMetadataIndex(nil, index); err != nil {
|
||||
if err := e.LoadMetadataIndex(1, index); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
|
|||
|
||||
// Load metadata index.
|
||||
index = tsdb.NewDatabaseIndex("db")
|
||||
if err := e.LoadMetadataIndex(nil, index); err != nil {
|
||||
if err := e.LoadMetadataIndex(1, index); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -85,7 +85,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
|
|||
|
||||
// Load metadata index.
|
||||
index = tsdb.NewDatabaseIndex("db")
|
||||
if err := e.LoadMetadataIndex(nil, index); err != nil {
|
||||
if err := e.LoadMetadataIndex(1, index); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -693,7 +693,7 @@ func MustOpenEngine() *Engine {
|
|||
if err := e.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db")); err != nil {
|
||||
if err := e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return e
|
||||
|
|
|
@ -7,11 +7,13 @@ import (
|
|||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/influxdata/influxdb"
|
||||
|
@ -88,6 +90,8 @@ type Shard struct {
|
|||
// expvar-based stats.
|
||||
statMap *expvar.Map
|
||||
|
||||
logger *log.Logger
|
||||
|
||||
// The writer used by the logger.
|
||||
LogOutput io.Writer
|
||||
}
|
||||
|
@ -106,7 +110,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
|
|||
}
|
||||
statMap := influxdb.NewStatistics(key, "shard", tags)
|
||||
|
||||
return &Shard{
|
||||
s := &Shard{
|
||||
index: index,
|
||||
id: id,
|
||||
path: path,
|
||||
|
@ -119,12 +123,15 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
|
|||
statMap: statMap,
|
||||
LogOutput: os.Stderr,
|
||||
}
|
||||
s.SetLogOutput(os.Stderr)
|
||||
return s
|
||||
}
|
||||
|
||||
// SetLogOutput sets the writer to which log output will be written. It must
|
||||
// not be called after the Open method has been called.
|
||||
func (s *Shard) SetLogOutput(w io.Writer) {
|
||||
s.LogOutput = w
|
||||
s.logger = log.New(w, "[shard] ", log.LstdFlags)
|
||||
if !s.closed() {
|
||||
s.engine.SetLogOutput(w)
|
||||
}
|
||||
|
@ -160,9 +167,11 @@ func (s *Shard) Open() error {
|
|||
}
|
||||
|
||||
// Load metadata index.
|
||||
if err := s.engine.LoadMetadataIndex(s, s.index); err != nil {
|
||||
start := time.Now()
|
||||
if err := s.engine.LoadMetadataIndex(s.id, s.index); err != nil {
|
||||
return err
|
||||
}
|
||||
s.logger.Printf("%s database index loaded in %s", s.path, time.Now().Sub(start))
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue