Integrate SeriesFileCompactor

pull/9265/head
Ben Johnson 2018-01-02 12:20:03 -07:00
parent 56980b0d24
commit 52630e69d7
No known key found for this signature in database
GPG Key ID: 81741CD251883081
10 changed files with 61 additions and 19 deletions

View File

@ -11,6 +11,7 @@ import (
"regexp"
"text/tabwriter"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
@ -114,6 +115,7 @@ func (cmd *Command) Run(args ...string) error {
func (cmd *Command) run() error {
sfile := tsdb.NewSeriesFile(cmd.seriesFilePath)
sfile.Logger = logger.New(os.Stderr)
if err := sfile.Open(); err != nil {
return err
}

View File

@ -10,7 +10,6 @@ import (
"os"
"path/filepath"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
@ -55,6 +54,7 @@ func (cmd *Command) Run(args ...string) error {
func (cmd *Command) run(seriesFilePath, dataDir, walDir string) error {
sfile := tsdb.NewSeriesFile(seriesFilePath)
sfile.Logger = cmd.Logger
if err := sfile.Open(); err != nil {
return err
}

View File

@ -20,6 +20,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/query"
@ -1690,6 +1691,7 @@ func NewEngine(index string) (*Engine, error) {
}
sfile := tsdb.NewSeriesFile(seriesPath)
sfile.Logger = logger.New(os.Stdout)
if err = sfile.Open(); err != nil {
return nil, err
}

View File

@ -10,33 +10,44 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/rhh"
"go.uber.org/zap"
)
// SeriesIDSize is the size in bytes of a series key ID.
const SeriesIDSize = 8

// DefaultSeriesFileCompactThreshold is the number of series IDs to hold in the in-memory
// series map before compacting and rebuilding the on-disk representation.
const DefaultSeriesFileCompactThreshold = 1 << 20 // 1M
// SeriesFile represents the section of the index that holds series data.
type SeriesFile struct {
// mu guards mutable state below; wg tracks in-flight background work
// (e.g. the async compaction goroutine) so Close can wait for it.
mu sync.RWMutex
wg sync.WaitGroup
// path is the on-disk location of the series file.
path string
// segments are the on-disk series data segments; index maps series keys
// to IDs/offsets. NOTE(review): both are project types — see their own files.
segments []*SeriesSegment
index *SeriesIndex
seq uint64 // series id sequence
// compacting is true while a background compaction is running; checked
// under mu to avoid launching concurrent compactions.
compacting bool
// CompactThreshold is the in-memory series-ID count that triggers a
// background compaction. Zero disables compaction (used by tests).
CompactThreshold int
// Logger receives compaction progress/error messages; defaults to a no-op.
Logger *zap.Logger
}
// NewSeriesFile returns a new instance of SeriesFile rooted at path.
// The compaction threshold defaults to DefaultSeriesFileCompactThreshold
// and the logger to a no-op; callers may override both before Open.
func NewSeriesFile(path string) *SeriesFile {
	return &SeriesFile{
		path:             path,
		CompactThreshold: DefaultSeriesFileCompactThreshold,
		Logger:           zap.NewNop(),
	}
}
@ -61,10 +72,10 @@ func (f *SeriesFile) Open() error {
f.index = NewSeriesIndex(f.IndexPath())
if err := f.index.Open(); err != nil {
return err
} else if f.index.Recover(f.segments); err != nil {
return err
}
// TODO: Replay new entries since index was built.
return nil
}(); err != nil {
f.Close()
@ -113,6 +124,8 @@ func (f *SeriesFile) openSegments() error {
// Close unmaps the data file.
func (f *SeriesFile) Close() (err error) {
f.wg.Wait()
f.mu.Lock()
defer f.mu.Unlock()
@ -202,6 +215,30 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod
f.index.Insert(f.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
}
// Check if we've crossed the compaction threshold.
if !f.compacting && f.CompactThreshold != 0 && f.index.InMemCount() >= uint64(f.CompactThreshold) {
f.compacting = true
logger := f.Logger.With(zap.String("path", f.path))
logger.Info("beginning series file compaction")
startTime := time.Now()
f.wg.Add(1)
go func() {
defer f.wg.Done()
if err := NewSeriesFileCompactor().Compact(f); err != nil {
logger.With(zap.Error(err)).Error("series file compaction failed")
}
logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series file compaction")
// Clear compaction flag.
f.mu.Lock()
f.compacting = false
f.mu.Unlock()
}()
}
return ids, nil
}

View File

@ -6,6 +6,7 @@ import (
"os"
"testing"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
@ -47,6 +48,7 @@ func TestSeriesFile_Series(t *testing.T) {
// Ensure series file can be compacted.
func TestSeriesFileCompactor(t *testing.T) {
sfile := MustOpenSeriesFile()
sfile.CompactThreshold = 0
defer sfile.Close()
var names [][]byte
@ -102,6 +104,7 @@ func NewSeriesFile() *SeriesFile {
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.
func MustOpenSeriesFile() *SeriesFile {
f := NewSeriesFile()
f.Logger = logger.New(os.Stdout)
if err := f.Open(); err != nil {
panic(err)
}

View File

@ -135,9 +135,15 @@ func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
// Count returns the total number of series in the index: the on-disk
// count plus the series accumulated in memory since the last compaction.
func (idx *SeriesIndex) Count() uint64 {
	return idx.OnDiskCount() + idx.InMemCount()
}
// OnDiskCount reports how many series are held in the on-disk index.
func (idx *SeriesIndex) OnDiskCount() uint64 {
	return idx.count
}
// InMemCount reports how many series are held only in the in-memory index.
func (idx *SeriesIndex) InMemCount() uint64 {
	n := len(idx.idOffsetMap)
	return uint64(n)
}
// Insert records a series entry for key with the given id and data offset
// by delegating to execEntry with the insert flag.
func (idx *SeriesIndex) Insert(key []byte, id uint64, offset int64) {
idx.execEntry(SeriesEntryInsertFlag, id, offset, key)
}

View File

@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"regexp"
"strconv"
@ -59,7 +58,7 @@ func NewSeriesSegment(id uint16, path string) *SeriesSegment {
// CreateSeriesSegment generates an empty segment at path.
func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) {
// Generate segment in temp location.
f, err := ioutil.TempFile("", "series-segment-")
f, err := os.Create(path + ".initializing")
if err != nil {
return nil, err
}

View File

@ -14,6 +14,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
)
@ -221,6 +222,7 @@ func NewTempShard(index string) *TempShard {
// Create series file.
sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileName))
sfile.Logger = logger.New(os.Stdout)
if err := sfile.Open(); err != nil {
panic(err)
}

View File

@ -354,6 +354,7 @@ func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
}
sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileName))
sfile.Logger = s.baseLogger
if err := sfile.Open(); err != nil {
return nil, err
}

View File

@ -11,7 +11,6 @@ import (
"path/filepath"
"reflect"
"regexp"
"runtime"
"sort"
"strings"
"testing"
@ -1514,11 +1513,6 @@ func NewStore() *Store {
s.WithLogger(logger.New(os.Stdout))
}
if runtime.GOARCH == "386" {
// Set the mmap size to something addressable in the process.
s.SeriesFileMaxSize = 1 << 27 // 128MB
}
return s
}
@ -1540,12 +1534,8 @@ func (s *Store) Reopen() error {
return err
}
// Keep old max series file size.
seriesMapSize := s.Store.SeriesFileMaxSize
s.Store = tsdb.NewStore(s.Path())
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
s.SeriesFileMaxSize = seriesMapSize
return s.Store.Open()
}