Enable concurrent index building
parent ac0f125db9
commit cd0294ea70
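This change parallelizes shard index rebuilding in buildtsi. Shards belonging to a retention policy are first collected into a slice and then processed by a pool of worker goroutines sized by a new -concurrency flag (defaulting to GOMAXPROCS); each worker claims the next shard via an atomic counter and reports its result on a buffered error channel. Per-shard log output is also switched to a scoped logger so that database, retention policy, and shard fields are attached to every message, and processTSMFile now takes that logger as a parameter.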
@@ -14,6 +14,7 @@ import (
 	"runtime/pprof"
 	"strconv"
 	"strings"
+	"sync/atomic"
 
 	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
@@ -32,6 +33,7 @@ type Command struct {
 	Verbose bool
 	Logger  *zap.Logger
 
+	concurrency     int // Number of goroutines to dedicate to shard index building.
 	databaseFilter  string
 	retentionFilter string
 	shardFilter     string
@@ -42,10 +44,11 @@ type Command struct {
 // NewCommand returns a new instance of Command.
 func NewCommand() *Command {
 	return &Command{
-		Stderr:    os.Stderr,
-		Stdout:    os.Stdout,
-		Logger:    zap.NewNop(),
-		batchSize: defaultBatchSize,
+		Stderr:      os.Stderr,
+		Stdout:      os.Stdout,
+		Logger:      zap.NewNop(),
+		batchSize:   defaultBatchSize,
+		concurrency: runtime.GOMAXPROCS(0),
 	}
 }
 
@@ -54,6 +57,7 @@ func (cmd *Command) Run(args ...string) error {
 	fs := flag.NewFlagSet("buildtsi", flag.ExitOnError)
 	dataDir := fs.String("datadir", "", "data directory")
 	walDir := fs.String("waldir", "", "WAL directory")
+	fs.IntVar(&cmd.concurrency, "concurrency", runtime.GOMAXPROCS(0), "Number of workers to dedicate to shard index building. Defaults to GOMAXPROCS")
 	fs.StringVar(&cmd.databaseFilter, "database", "", "optional: database name")
 	fs.StringVar(&cmd.retentionFilter, "retention", "", "optional: retention policy")
 	fs.StringVar(&cmd.shardFilter, "shard", "", "optional: shard id")
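For reference, a rebuild with the new flag might be invoked as `influx_inspect buildtsi -datadir /var/lib/influxdb/data -waldir /var/lib/influxdb/wal -concurrency 4`; the paths and worker count here are illustrative, and omitting -concurrency falls back to GOMAXPROCS.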
@@ -187,6 +191,11 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
 		return err
 	}
 
+	var shards []struct {
+		ID   uint64
+		Path string
+	}
+
 	for _, fi := range fis {
 		if !fi.IsDir() {
 			continue
@@ -199,7 +208,31 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
 			continue
 		}
 
-		if err := cmd.processShard(sfile, dbName, rpName, shardID, filepath.Join(dataDir, fi.Name()), filepath.Join(walDir, fi.Name())); err != nil {
+		shards = append(shards, struct {
+			ID   uint64
+			Path string
+		}{shardID, fi.Name()})
+	}
+
+	errC := make(chan error, len(shards))
+	var maxi uint32 // index of maximum shard being worked on.
+	for k := 0; k < cmd.concurrency; k++ {
+		go func() {
+			for {
+				i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next partition to work on.
+				if i >= len(shards) {
+					return // No more work.
+				}
+
+				id, name := shards[i].ID, shards[i].Path
+				errC <- cmd.processShard(sfile, dbName, rpName, id, filepath.Join(dataDir, name), filepath.Join(walDir, name))
+			}
+		}()
+	}
+
+	// Check for error
+	for i := 0; i < cap(errC); i++ {
+		if err := <-errC; err != nil {
 			return err
 		}
 	}
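The dispatch added above follows a common Go pattern: shards are collected up front, a fixed pool of workers (one per unit of -concurrency) repeatedly claims the next index from a shared atomic counter, and each job reports exactly one value on an error channel buffered to the job count. A minimal, standalone sketch of the same pattern, with hypothetical job names standing in for shards:

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

func main() {
	jobs := []string{"shard-1", "shard-2", "shard-3", "shard-4", "shard-5"}
	concurrency := runtime.GOMAXPROCS(0)

	errC := make(chan error, len(jobs)) // one slot per job, so workers never block on send.
	var next uint32                     // next job index to claim.

	for k := 0; k < concurrency; k++ {
		go func() {
			for {
				i := int(atomic.AddUint32(&next, 1) - 1) // claim the next index.
				if i >= len(jobs) {
					return // no more work for this worker.
				}
				// Stand-in for cmd.processShard; always succeeds here.
				fmt.Println("processing", jobs[i])
				errC <- nil
			}
		}()
	}

	// Drain exactly one result per job; the first error wins.
	for i := 0; i < cap(errC); i++ {
		if err := <-errC; err != nil {
			fmt.Println("failed:", err)
			return
		}
	}
	fmt.Println("done")
}

Because the channel capacity equals the number of jobs, a worker's send never blocks, so the early return on the first error in the drain loop does not leave the remaining goroutines stuck.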
@@ -207,17 +240,18 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
 }
 
 func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string, shardID uint64, dataDir, walDir string) error {
-	cmd.Logger.Info("Rebuilding shard", logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(shardID))
+	log := cmd.Logger.With(logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(shardID))
+	log.Info("Rebuilding shard")
 
 	// Check if shard already has a TSI index.
 	indexPath := filepath.Join(dataDir, "index")
-	cmd.Logger.Info("Checking index path", zap.String("path", indexPath))
+	log.Info("Checking index path", zap.String("path", indexPath))
 	if _, err := os.Stat(indexPath); !os.IsNotExist(err) {
-		cmd.Logger.Info("tsi1 index already exists, skipping", zap.String("path", indexPath))
+		log.Info("tsi1 index already exists, skipping", zap.String("path", indexPath))
 		return nil
 	}
 
-	cmd.Logger.Info("Opening shard")
+	log.Info("Opening shard")
 
 	// Find shard files.
 	tsmPaths, err := cmd.collectTSMFiles(dataDir)
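The logging change above trades repeated cmd.Logger calls, each restating the database, retention policy, and shard, for a child logger built once with Logger.With, which attaches those fields to every subsequent entry. A small illustrative snippet of that zap pattern (the field names and values are invented, not the ones buildtsi uses):

package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment()
	defer base.Sync()

	// With returns a child logger that stamps these fields on every entry,
	// so later calls can drop the per-call context arguments.
	log := base.With(zap.String("db", "telegraf"), zap.Uint64("shard", 42))

	log.Info("Rebuilding shard") // logged with db and shard attached
	log.Info("Opening shard")    // same fields, no repetition needed
}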
@@ -231,7 +265,7 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
 
 	// Remove temporary index files if this is being re-run.
 	tmpPath := filepath.Join(dataDir, ".index")
-	cmd.Logger.Info("Cleaning up partial index from previous run, if any")
+	log.Info("Cleaning up partial index from previous run, if any")
 	if err := os.RemoveAll(tmpPath); err != nil {
 		return err
 	}
@@ -248,23 +282,23 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
 
 	tsiIndex.WithLogger(cmd.Logger)
 
-	cmd.Logger.Info("Opening tsi index in temporary location", zap.String("path", tmpPath))
+	log.Info("Opening tsi index in temporary location", zap.String("path", tmpPath))
 	if err := tsiIndex.Open(); err != nil {
 		return err
 	}
 	defer tsiIndex.Close()
 
 	// Write out tsm1 files.
-	cmd.Logger.Info("Iterating over tsm files")
+	log.Info("Iterating over tsm files")
 	for _, path := range tsmPaths {
-		cmd.Logger.Info("Processing tsm file", zap.String("path", path))
-		if err := cmd.processTSMFile(tsiIndex, path); err != nil {
+		log.Info("Processing tsm file", zap.String("path", path))
+		if err := cmd.processTSMFile(tsiIndex, path, log); err != nil {
 			return err
 		}
 	}
 
 	// Write out wal files.
-	cmd.Logger.Info("Building cache from wal files")
+	log.Info("Building cache from wal files")
 	cache := tsm1.NewCache(tsdb.DefaultCacheMaxMemorySize)
 	loader := tsm1.NewCacheLoader(walPaths)
 	loader.WithLogger(cmd.Logger)
@@ -272,7 +306,7 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
 		return err
 	}
 
-	cmd.Logger.Info("Iterating over cache")
+	log.Info("Iterating over cache")
 	keysBatch := make([][]byte, 0, cmd.batchSize)
 	namesBatch := make([][]byte, 0, cmd.batchSize)
 	tagsBatch := make([]models.Tags, 0, cmd.batchSize)
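Only the preallocation of keysBatch, namesBatch, and tagsBatch is visible in this hunk; the surrounding cache iteration (outside the diff) fills these slices and flushes them in groups of batchSize. A generic sketch of that accumulate-and-flush pattern, with a hypothetical flush function standing in for the real bulk series insert:

package main

import "fmt"

const batchSize = 3 // illustrative; the tool uses cmd.batchSize

// flush stands in for whatever bulk-insert call consumes a full batch.
func flush(keys [][]byte) error {
	fmt.Println("flushing", len(keys), "keys")
	return nil
}

func main() {
	input := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}

	keysBatch := make([][]byte, 0, batchSize)
	for _, k := range input {
		keysBatch = append(keysBatch, k)

		// Flush once the batch is full, then reuse the backing array.
		if len(keysBatch) == batchSize {
			if err := flush(keysBatch); err != nil {
				panic(err)
			}
			keysBatch = keysBatch[:0]
		}
	}

	// Flush any remainder.
	if len(keysBatch) > 0 {
		if err := flush(keysBatch); err != nil {
			panic(err)
		}
	}
}

Resetting with keysBatch[:0] keeps the preallocated capacity, which is the point of sizing the slice to batchSize up front.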
@@ -282,7 +316,7 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
 		name, tags := models.ParseKeyBytes(seriesKey)
 
 		if cmd.Verbose {
-			cmd.Logger.Info("Series", zap.String("name", string(name)), zap.String("tags", tags.String()))
+			log.Info("Series", zap.String("name", string(name)), zap.String("tags", tags.String()))
 		}
 
 		keysBatch = append(keysBatch, seriesKey)
@@ -311,22 +345,22 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
 	}
 
 	// Attempt to compact the index & wait for all compactions to complete.
-	cmd.Logger.Info("compacting index")
+	log.Info("compacting index")
 	tsiIndex.Compact()
 	tsiIndex.Wait()
 
 	// Close TSI index.
-	cmd.Logger.Info("Closing tsi index")
+	log.Info("Closing tsi index")
 	if err := tsiIndex.Close(); err != nil {
 		return err
 	}
 
 	// Rename TSI to standard path.
-	cmd.Logger.Info("Moving tsi to permanent location")
+	log.Info("Moving tsi to permanent location")
 	return os.Rename(tmpPath, indexPath)
 }
 
-func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
+func (cmd *Command) processTSMFile(index *tsi1.Index, path string, log *zap.Logger) error {
 	f, err := os.Open(path)
 	if err != nil {
 		return err
@@ -335,7 +369,7 @@ func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
 
 	r, err := tsm1.NewTSMReader(f)
 	if err != nil {
-		cmd.Logger.Warn("Unable to read, skipping", zap.String("path", path), zap.Error(err))
+		log.Warn("Unable to read, skipping", zap.String("path", path), zap.Error(err))
 		return nil
 	}
 	defer r.Close()
@@ -351,7 +385,7 @@ func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
 		name, tagsBatch[ti] = models.ParseKeyBytesWithTags(seriesKey, tagsBatch[ti])
 
 		if cmd.Verbose {
-			cmd.Logger.Info("Series", zap.String("name", string(name)), zap.String("tags", tagsBatch[ti].String()))
+			log.Info("Series", zap.String("name", string(name)), zap.String("tags", tagsBatch[ti].String()))
 		}
 
 		keysBatch = append(keysBatch, seriesKey)