diff --git a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go index 6126a5076c..32cc702ff9 100644 --- a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go +++ b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go @@ -11,6 +11,7 @@ import ( "path/filepath" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" "github.com/influxdata/influxdb/tsdb/index/tsi1" "github.com/uber-go/zap" @@ -37,15 +38,13 @@ func NewCommand() *Command { // Run executes the command. func (cmd *Command) Run(args ...string) error { fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError) - dataDir := fs.String("datadir", "", "data directory") - walDir := fs.String("waldir", "", "WAL directory") + dataDir := fs.String("datadir", "", "shard data directory") + walDir := fs.String("waldir", "", "shard WAL directory") fs.BoolVar(&cmd.Verbose, "v", false, "verbose") fs.SetOutput(cmd.Stdout) - fs.Usage = cmd.printUsage if err := fs.Parse(args); err != nil { return err } else if fs.NArg() > 0 || *dataDir == "" || *walDir == "" { - cmd.printUsage() return flag.ErrHelp } @@ -107,11 +106,25 @@ func (cmd *Command) run(dataDir, walDir string) error { } // Write out wal files. - cmd.Logger.Info("iterating over wal files") - for _, path := range walPaths { - cmd.Logger.Info("processing wal file", zap.String("path", path)) - if err := cmd.processWALFile(tsiIndex, path); err != nil { - return err + cmd.Logger.Info("building cache from wal files") + cache := tsm1.NewCache(tsdb.DefaultCacheMaxMemorySize, "") + loader := tsm1.NewCacheLoader(walPaths) + loader.WithLogger(cmd.Logger) + if err := loader.Load(cache); err != nil { + return err + } + + cmd.Logger.Info("iterating over cache") + for _, key := range cache.Keys() { + seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key) + name, tags := models.ParseKey(seriesKey) + + if cmd.Verbose { + cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String())) + } + + if err := tsiIndex.CreateSeriesIfNotExists(nil, []byte(name), tags); err != nil { + return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err) } } @@ -165,43 +178,6 @@ func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error { return nil } -func (cmd *Command) processWALFile(index *tsi1.Index, path string) error { - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - r := tsm1.NewWALSegmentReader(f) - defer r.Close() - - for r.Next() { - entry, err := r.Read() - if err != nil { - n := r.Count() - cmd.Logger.Warn("wal file corruption", zap.String("path", path), zap.Int64("n", n)) - break - } - - switch t := entry.(type) { - case *tsm1.WriteWALEntry: - for key, _ := range t.Values { - seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey([]byte(key)) - name, tags := models.ParseKey(seriesKey) - - if cmd.Verbose { - cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String())) - } - - if err := index.CreateSeriesIfNotExists(nil, []byte(name), tags); err != nil { - return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err) - } - } - } - } - return nil -} - func (cmd *Command) collectTSMFiles(path string) ([]string, error) { fis, err := ioutil.ReadDir(path) if err != nil {