Switch to use tsm1.CacheLoader.

pull/8963/head
Ben Johnson 2017-10-19 08:25:45 -06:00
parent ea43660a55
commit 1f4e070b3a
No known key found for this signature in database
GPG Key ID: 81741CD251883081
1 changed files with 22 additions and 46 deletions

View File

@ -11,6 +11,7 @@ import (
"path/filepath"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/uber-go/zap"
@ -37,15 +38,13 @@ func NewCommand() *Command {
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError)
dataDir := fs.String("datadir", "", "data directory")
walDir := fs.String("waldir", "", "WAL directory")
dataDir := fs.String("datadir", "", "shard data directory")
walDir := fs.String("waldir", "", "shard WAL directory")
fs.BoolVar(&cmd.Verbose, "v", false, "verbose")
fs.SetOutput(cmd.Stdout)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() > 0 || *dataDir == "" || *walDir == "" {
cmd.printUsage()
return flag.ErrHelp
}
@ -107,11 +106,25 @@ func (cmd *Command) run(dataDir, walDir string) error {
}
// Write out wal files.
cmd.Logger.Info("iterating over wal files")
for _, path := range walPaths {
cmd.Logger.Info("processing wal file", zap.String("path", path))
if err := cmd.processWALFile(tsiIndex, path); err != nil {
return err
cmd.Logger.Info("building cache from wal files")
cache := tsm1.NewCache(tsdb.DefaultCacheMaxMemorySize, "")
loader := tsm1.NewCacheLoader(walPaths)
loader.WithLogger(cmd.Logger)
if err := loader.Load(cache); err != nil {
return err
}
cmd.Logger.Info("iterating over cache")
for _, key := range cache.Keys() {
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
name, tags := models.ParseKey(seriesKey)
if cmd.Verbose {
cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String()))
}
if err := tsiIndex.CreateSeriesIfNotExists(nil, []byte(name), tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
}
}
@ -165,43 +178,6 @@ func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
return nil
}
func (cmd *Command) processWALFile(index *tsi1.Index, path string) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
r := tsm1.NewWALSegmentReader(f)
defer r.Close()
for r.Next() {
entry, err := r.Read()
if err != nil {
n := r.Count()
cmd.Logger.Warn("wal file corruption", zap.String("path", path), zap.Int64("n", n))
break
}
switch t := entry.(type) {
case *tsm1.WriteWALEntry:
for key, _ := range t.Values {
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey([]byte(key))
name, tags := models.ParseKey(seriesKey)
if cmd.Verbose {
cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String()))
}
if err := index.CreateSeriesIfNotExists(nil, []byte(name), tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
}
}
}
}
return nil
}
func (cmd *Command) collectTSMFiles(path string) ([]string, error) {
fis, err := ioutil.ReadDir(path)
if err != nil {