diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go index 04eda19d11..0321f49369 100644 --- a/cmd/influx_inspect/dumptsi/dumptsi.go +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -11,6 +11,7 @@ import ( "regexp" "text/tabwriter" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/tsi1" @@ -114,6 +115,7 @@ func (cmd *Command) Run(args ...string) error { func (cmd *Command) run() error { sfile := tsdb.NewSeriesFile(cmd.seriesFilePath) + sfile.Logger = logger.New(os.Stderr) if err := sfile.Open(); err != nil { return err } diff --git a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go index 299744f480..d3e503a930 100644 --- a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go +++ b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" - "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" @@ -55,6 +54,7 @@ func (cmd *Command) Run(args ...string) error { func (cmd *Command) run(seriesFilePath, dataDir, walDir string) error { sfile := tsdb.NewSeriesFile(seriesFilePath) + sfile.Logger = cmd.Logger if err := sfile.Open(); err != nil { return err } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index d845df1a50..42dc2c1be7 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/deep" "github.com/influxdata/influxdb/query" @@ -1690,6 +1691,7 @@ func NewEngine(index string) (*Engine, error) { } sfile := tsdb.NewSeriesFile(seriesPath) + sfile.Logger = logger.New(os.Stdout) if err = sfile.Open(); err != nil { return nil, err } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 2cdda8115c..bfa994cbbd 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -10,33 +10,44 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/cespare/xxhash" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/rhh" + "go.uber.org/zap" ) // SeriesIDSize is the size in bytes of a series key ID. const SeriesIDSize = 8 -// SeriesMapThreshold is the number of series IDs to hold in the in-memory +// DefaultSeriesFileCompactThreshold is the number of series IDs to hold in the in-memory // series map before compacting and rebuilding the on-disk representation. -const SeriesMapThreshold = 1 << 25 // ~33M ids * 8 bytes per id == 256MB +const DefaultSeriesFileCompactThreshold = 1 << 20 // 1M // SeriesFile represents the section of the index that holds series data. type SeriesFile struct { mu sync.RWMutex + wg sync.WaitGroup path string segments []*SeriesSegment index *SeriesIndex seq uint64 // series id sequence + + compacting bool + + CompactThreshold int + + Logger *zap.Logger } // NewSeriesFile returns a new instance of SeriesFile. func NewSeriesFile(path string) *SeriesFile { return &SeriesFile{ - path: path, + path: path, + CompactThreshold: DefaultSeriesFileCompactThreshold, + Logger: zap.NewNop(), } } @@ -61,10 +72,10 @@ func (f *SeriesFile) Open() error { f.index = NewSeriesIndex(f.IndexPath()) if err := f.index.Open(); err != nil { return err + } else if f.index.Recover(f.segments); err != nil { + return err } - // TODO: Replay new entries since index was built. - return nil }(); err != nil { f.Close() @@ -113,6 +124,8 @@ func (f *SeriesFile) openSegments() error { // Close unmaps the data file. func (f *SeriesFile) Close() (err error) { + f.wg.Wait() + f.mu.Lock() defer f.mu.Unlock() @@ -202,6 +215,30 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod f.index.Insert(f.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset) } + // Check if we've crossed the compaction threshold. + if !f.compacting && f.CompactThreshold != 0 && f.index.InMemCount() >= uint64(f.CompactThreshold) { + f.compacting = true + logger := f.Logger.With(zap.String("path", f.path)) + logger.Info("beginning series file compaction") + + startTime := time.Now() + f.wg.Add(1) + go func() { + defer f.wg.Done() + + if err := NewSeriesFileCompactor().Compact(f); err != nil { + logger.With(zap.Error(err)).Error("series file compaction failed") + } + + logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series file compaction") + + // Clear compaction flag. + f.mu.Lock() + f.compacting = false + f.mu.Unlock() + }() + } + return ids, nil } diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index b4386645f7..81b20933ac 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -6,6 +6,7 @@ import ( "os" "testing" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" ) @@ -47,6 +48,7 @@ func TestSeriesFile_Series(t *testing.T) { // Ensure series file can be compacted. func TestSeriesFileCompactor(t *testing.T) { sfile := MustOpenSeriesFile() + sfile.CompactThreshold = 0 defer sfile.Close() var names [][]byte @@ -102,6 +104,7 @@ func NewSeriesFile() *SeriesFile { // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. func MustOpenSeriesFile() *SeriesFile { f := NewSeriesFile() + f.Logger = logger.New(os.Stdout) if err := f.Open(); err != nil { panic(err) } diff --git a/tsdb/series_index.go b/tsdb/series_index.go index c803cf0db0..078d0f1776 100644 --- a/tsdb/series_index.go +++ b/tsdb/series_index.go @@ -135,9 +135,15 @@ func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error { // Count returns the number of series in the index. func (idx *SeriesIndex) Count() uint64 { - return idx.count + uint64(len(idx.idOffsetMap)) + return idx.OnDiskCount() + idx.InMemCount() } +// OnDiskCount returns the number of series in the on-disk index. +func (idx *SeriesIndex) OnDiskCount() uint64 { return idx.count } + +// InMemCount returns the number of series in the in-memory index. +func (idx *SeriesIndex) InMemCount() uint64 { return uint64(len(idx.idOffsetMap)) } + func (idx *SeriesIndex) Insert(key []byte, id uint64, offset int64) { idx.execEntry(SeriesEntryInsertFlag, id, offset, key) } diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index a9600a26d1..7f80afffc8 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "os" "regexp" "strconv" @@ -59,7 +58,7 @@ func NewSeriesSegment(id uint16, path string) *SeriesSegment { // CreateSeriesSegment generates an empty segment at path. func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { // Generate segment in temp location. - f, err := ioutil.TempFile("", "series-segment-") + f, err := os.Create(path + ".initializing") if err != nil { return nil, err } diff --git a/tsdb/shard_internal_test.go b/tsdb/shard_internal_test.go index 1da93603e4..d0e9c0fefc 100644 --- a/tsdb/shard_internal_test.go +++ b/tsdb/shard_internal_test.go @@ -14,6 +14,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxql" ) @@ -221,6 +222,7 @@ func NewTempShard(index string) *TempShard { // Create series file. sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileName)) + sfile.Logger = logger.New(os.Stdout) if err := sfile.Open(); err != nil { panic(err) } diff --git a/tsdb/store.go b/tsdb/store.go index fad24f5e77..7373255f10 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -354,6 +354,7 @@ func (s *Store) openSeriesFile(database string) (*SeriesFile, error) { } sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileName)) + sfile.Logger = s.baseLogger if err := sfile.Open(); err != nil { return nil, err } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 58202b969f..ed3ee0ce10 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -11,7 +11,6 @@ import ( "path/filepath" "reflect" "regexp" - "runtime" "sort" "strings" "testing" @@ -1514,11 +1513,6 @@ func NewStore() *Store { s.WithLogger(logger.New(os.Stdout)) } - if runtime.GOARCH == "386" { - // Set the mmap size to something addressable in the process. - s.SeriesFileMaxSize = 1 << 27 // 128MB - } - return s } @@ -1540,12 +1534,8 @@ func (s *Store) Reopen() error { return err } - // Keep old max series file size. - seriesMapSize := s.Store.SeriesFileMaxSize - s.Store = tsdb.NewStore(s.Path()) s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal") - s.SeriesFileMaxSize = seriesMapSize return s.Store.Open() }