From 288c5217e81d2df7edaf050f554c300eba71bd6b Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 8 Dec 2017 16:12:33 -0700 Subject: [PATCH] Fix tsi1 tools. --- cmd/influx_inspect/dumptsi/dumptsi.go | 86 +++++++++++++++-------- cmd/influx_inspect/inmem2tsi/inmem2tsi.go | 17 +++-- tsdb/index/tsi1/index.go | 9 +++ tsdb/index/tsi1/partition.go | 2 +- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go index fc7ad67ae1..7a10018493 100644 --- a/cmd/influx_inspect/dumptsi/dumptsi.go +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -2,6 +2,7 @@ package dumptsi import ( + "errors" "flag" "fmt" "io" @@ -11,6 +12,7 @@ import ( "text/tabwriter" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/tsi1" ) @@ -20,7 +22,8 @@ type Command struct { Stderr io.Writer Stdout io.Writer - paths []string + seriesFilePath string + paths []string showSeries bool showMeasurements bool @@ -45,6 +48,7 @@ func NewCommand() *Command { func (cmd *Command) Run(args ...string) error { var measurementFilter, tagKeyFilter, tagValueFilter string fs := flag.NewFlagSet("dumptsi", flag.ExitOnError) + fs.StringVar(&cmd.seriesFilePath, "series-file", "", "Path to series file") fs.BoolVar(&cmd.showSeries, "series", false, "Show raw series data") fs.BoolVar(&cmd.showMeasurements, "measurements", false, "Show raw measurement data") fs.BoolVar(&cmd.showTagKeys, "tag-keys", false, "Show raw tag key data") @@ -82,6 +86,11 @@ func (cmd *Command) Run(args ...string) error { cmd.tagValueFilter = re } + // Validate series file path. + if cmd.seriesFilePath == "" { + return errors.New("series file path required") + } + cmd.paths = fs.Args() if len(cmd.paths) == 0 { fmt.Printf("at least one path required\n\n") @@ -104,8 +113,14 @@ func (cmd *Command) Run(args ...string) error { } func (cmd *Command) run() error { + sfile := tsdb.NewSeriesFile(cmd.seriesFilePath) + if err := sfile.Open(); err != nil { + return err + } + defer sfile.Close() + // Build a file set from the paths on the command line. - idx, fs, err := cmd.readFileSet() + idx, fs, err := cmd.readFileSet(sfile) if err != nil { return err } @@ -114,7 +129,7 @@ func (cmd *Command) run() error { if fs != nil { defer fs.Release() defer fs.Close() - return cmd.printFileSet(fs) + return cmd.printFileSet(sfile, fs) } // Otherwise iterate over each partition in the index. @@ -123,7 +138,7 @@ func (cmd *Command) run() error { if err := func() error { fs := idx.PartitionAt(i).RetainFileSet() defer fs.Release() - return cmd.printFileSet(fs) + return cmd.printFileSet(sfile, fs) }(); err != nil { return err } @@ -131,10 +146,10 @@ func (cmd *Command) run() error { return nil } -func (cmd *Command) printFileSet(fs *tsi1.FileSet) error { +func (cmd *Command) printFileSet(sfile *tsdb.SeriesFile, fs *tsi1.FileSet) error { // Show either raw data or summary stats. if cmd.showSeries || cmd.showMeasurements { - if err := cmd.printMerged(fs); err != nil { + if err := cmd.printMerged(sfile, fs); err != nil { return err } } else { @@ -146,14 +161,14 @@ func (cmd *Command) printFileSet(fs *tsi1.FileSet) error { return nil } -func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) { +func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.FileSet, error) { // If only one path exists and it's a directory then open as an index. if len(cmd.paths) == 1 { fi, err := os.Stat(cmd.paths[0]) if err != nil { return nil, nil, err } else if fi.IsDir() { - idx := tsi1.NewIndex( + idx := tsi1.NewIndex(sfile, tsi1.WithPath(cmd.paths[0]), tsi1.DisableCompactions(), ) @@ -164,12 +179,6 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) { } } - // Open series file in path. - sfile := tsi1.NewSeriesFile(filepath.Join(filepath.Dir(cmd.paths[0]), tsi1.SeriesFileName)) - if err := sfile.Open(); err != nil { - return nil, nil, err - } - // Open each file and group into a fileset. var files []tsi1.File for _, path := range cmd.paths { @@ -203,16 +212,16 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) { return nil, fs, nil } -func (cmd *Command) printMerged(fs *tsi1.FileSet) error { - if err := cmd.printSeries(fs); err != nil { +func (cmd *Command) printMerged(sfile *tsdb.SeriesFile, fs *tsi1.FileSet) error { + if err := cmd.printSeries(sfile); err != nil { return err - } else if err := cmd.printMeasurements(fs); err != nil { + } else if err := cmd.printMeasurements(sfile, fs); err != nil { return err } return nil } -func (cmd *Command) printSeries(fs *tsi1.FileSet) error { +func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error { if !cmd.showSeries { return nil } @@ -222,15 +231,23 @@ func (cmd *Command) printSeries(fs *tsi1.FileSet) error { fmt.Fprintln(tw, "Series\t") // Iterate over each series. - itr := fs.SeriesFile().SeriesIDIterator() - for e := itr.Next(); e.SeriesID != 0; e = itr.Next() { - name, tags := tsi1.ParseSeriesKey(fs.SeriesFile().SeriesKey(e.SeriesID)) + itr := sfile.SeriesIDIterator() + for { + e, err := itr.Next() + if err != nil { + return err + } else if e.SeriesID == 0 { + break + } + name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID)) if !cmd.matchSeries(name, tags) { continue } - fmt.Fprintf(tw, "%s%s\t%v\n", name, tags.HashKey(), deletedString(e.Deleted)) + deleted := sfile.IsDeleted(e.SeriesID) + + fmt.Fprintf(tw, "%s%s\t%v\n", name, tags.HashKey(), deletedString(deleted)) } // Flush & write footer spacing. @@ -242,7 +259,7 @@ func (cmd *Command) printSeries(fs *tsi1.FileSet) error { return nil } -func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error { +func (cmd *Command) printMeasurements(sfile *tsdb.SeriesFile, fs *tsi1.FileSet) error { if !cmd.showMeasurements { return nil } @@ -262,7 +279,7 @@ func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error { return err } - if err := cmd.printTagKeys(fs, e.Name()); err != nil { + if err := cmd.printTagKeys(sfile, fs, e.Name()); err != nil { return err } } @@ -273,7 +290,7 @@ func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error { return nil } -func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error { +func (cmd *Command) printTagKeys(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name []byte) error { if !cmd.showTagKeys { return nil } @@ -291,7 +308,7 @@ func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error { return err } - if err := cmd.printTagValues(fs, name, e.Key()); err != nil { + if err := cmd.printTagValues(sfile, fs, name, e.Key()); err != nil { return err } } @@ -300,7 +317,7 @@ func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error { return nil } -func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error { +func (cmd *Command) printTagValues(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name, key []byte) error { if !cmd.showTagValues { return nil } @@ -318,7 +335,7 @@ func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error { return err } - if err := cmd.printTagValueSeries(fs, name, key, e.Value()); err != nil { + if err := cmd.printTagValueSeries(sfile, fs, name, key, e.Value()); err != nil { return err } } @@ -327,7 +344,7 @@ func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error { return nil } -func (cmd *Command) printTagValueSeries(fs *tsi1.FileSet, name, key, value []byte) error { +func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name, key, value []byte) error { if !cmd.showTagValueSeries { return nil } @@ -335,8 +352,15 @@ func (cmd *Command) printTagValueSeries(fs *tsi1.FileSet, name, key, value []byt // Iterate over each series. tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) itr := fs.TagValueSeriesIDIterator(name, key, value) - for e := itr.Next(); e.SeriesID != 0; e = itr.Next() { - name, tags := tsi1.ParseSeriesKey(fs.SeriesFile().SeriesKey(e.SeriesID)) + for { + e, err := itr.Next() + if err != nil { + return err + } else if e.SeriesID == 0 { + break + } + + name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID)) if !cmd.matchSeries(name, tags) { continue diff --git a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go index a4c74f24ea..299744f480 100644 --- a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go +++ b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go @@ -38,21 +38,28 @@ func NewCommand() *Command { // Run executes the command. func (cmd *Command) Run(args ...string) error { fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError) + seriesFilePath := fs.String("series-file", "", "series file path") dataDir := fs.String("datadir", "", "shard data directory") walDir := fs.String("waldir", "", "shard WAL directory") fs.BoolVar(&cmd.Verbose, "v", false, "verbose") fs.SetOutput(cmd.Stdout) if err := fs.Parse(args); err != nil { return err - } else if fs.NArg() > 0 || *dataDir == "" || *walDir == "" { + } else if fs.NArg() > 0 || *seriesFilePath == "" || *dataDir == "" || *walDir == "" { return flag.ErrHelp } cmd.Logger = logger.New(cmd.Stderr) - return cmd.run(*dataDir, *walDir) + return cmd.run(*seriesFilePath, *dataDir, *walDir) } -func (cmd *Command) run(dataDir, walDir string) error { +func (cmd *Command) run(seriesFilePath, dataDir, walDir string) error { + sfile := tsdb.NewSeriesFile(seriesFilePath) + if err := sfile.Open(); err != nil { + return err + } + defer sfile.Close() + // Check if shard already has a TSI index. indexPath := filepath.Join(dataDir, "index") cmd.Logger.Info("checking index path", zap.String("path", indexPath)) @@ -83,10 +90,10 @@ func (cmd *Command) run(dataDir, walDir string) error { } // Open TSI index in temporary path. - tsiIndex := tsi1.NewIndex( + tsiIndex := tsi1.NewIndex(sfile, tsi1.WithPath(tmpPath), - tsi1.WithLogger(cmd.Logger), ) + tsiIndex.WithLogger(cmd.Logger) cmd.Logger.Info("opening tsi index in temporary location", zap.String("path", tmpPath)) if err := tsiIndex.Open(); err != nil { return err diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index d62905fcfc..2cf3645fd9 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -195,6 +195,15 @@ func (i *Index) Open() error { return nil } +// Compact requests a compaction of partitions. +func (i *Index) Compact() { + i.mu.Lock() + defer i.mu.Unlock() + for _, p := range i.partitions { + p.Compact() + } +} + // Wait blocks until all outstanding compactions have completed. func (i *Index) Wait() { for _, p := range i.partitions { diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index c534280a26..7f10683b82 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -313,7 +313,7 @@ func (i *Partition) writeManifestFile() error { } // WithLogger sets the logger for the index. -func (i *Partition) WithLogger(logger zap.Logger) { +func (i *Partition) WithLogger(logger *zap.Logger) { i.logger = logger.With(zap.String("index", "tsi")) }