Fix tsi1 tools.

pull/9150/head
Ben Johnson 2017-12-08 16:12:33 -07:00
parent 7d13bf3262
commit 288c5217e8
No known key found for this signature in database
GPG Key ID: 81741CD251883081
4 changed files with 77 additions and 37 deletions

View File

@ -2,6 +2,7 @@
package dumptsi
import (
"errors"
"flag"
"fmt"
"io"
@ -11,6 +12,7 @@ import (
"text/tabwriter"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
)
@ -20,7 +22,8 @@ type Command struct {
Stderr io.Writer
Stdout io.Writer
paths []string
seriesFilePath string
paths []string
showSeries bool
showMeasurements bool
@ -45,6 +48,7 @@ func NewCommand() *Command {
func (cmd *Command) Run(args ...string) error {
var measurementFilter, tagKeyFilter, tagValueFilter string
fs := flag.NewFlagSet("dumptsi", flag.ExitOnError)
fs.StringVar(&cmd.seriesFilePath, "series-file", "", "Path to series file")
fs.BoolVar(&cmd.showSeries, "series", false, "Show raw series data")
fs.BoolVar(&cmd.showMeasurements, "measurements", false, "Show raw measurement data")
fs.BoolVar(&cmd.showTagKeys, "tag-keys", false, "Show raw tag key data")
@ -82,6 +86,11 @@ func (cmd *Command) Run(args ...string) error {
cmd.tagValueFilter = re
}
// Validate series file path.
if cmd.seriesFilePath == "" {
return errors.New("series file path required")
}
cmd.paths = fs.Args()
if len(cmd.paths) == 0 {
fmt.Printf("at least one path required\n\n")
@ -104,8 +113,14 @@ func (cmd *Command) Run(args ...string) error {
}
func (cmd *Command) run() error {
sfile := tsdb.NewSeriesFile(cmd.seriesFilePath)
if err := sfile.Open(); err != nil {
return err
}
defer sfile.Close()
// Build a file set from the paths on the command line.
idx, fs, err := cmd.readFileSet()
idx, fs, err := cmd.readFileSet(sfile)
if err != nil {
return err
}
@ -114,7 +129,7 @@ func (cmd *Command) run() error {
if fs != nil {
defer fs.Release()
defer fs.Close()
return cmd.printFileSet(fs)
return cmd.printFileSet(sfile, fs)
}
// Otherwise iterate over each partition in the index.
@ -123,7 +138,7 @@ func (cmd *Command) run() error {
if err := func() error {
fs := idx.PartitionAt(i).RetainFileSet()
defer fs.Release()
return cmd.printFileSet(fs)
return cmd.printFileSet(sfile, fs)
}(); err != nil {
return err
}
@ -131,10 +146,10 @@ func (cmd *Command) run() error {
return nil
}
func (cmd *Command) printFileSet(fs *tsi1.FileSet) error {
func (cmd *Command) printFileSet(sfile *tsdb.SeriesFile, fs *tsi1.FileSet) error {
// Show either raw data or summary stats.
if cmd.showSeries || cmd.showMeasurements {
if err := cmd.printMerged(fs); err != nil {
if err := cmd.printMerged(sfile, fs); err != nil {
return err
}
} else {
@ -146,14 +161,14 @@ func (cmd *Command) printFileSet(fs *tsi1.FileSet) error {
return nil
}
func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) {
func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.FileSet, error) {
// If only one path exists and it's a directory then open as an index.
if len(cmd.paths) == 1 {
fi, err := os.Stat(cmd.paths[0])
if err != nil {
return nil, nil, err
} else if fi.IsDir() {
idx := tsi1.NewIndex(
idx := tsi1.NewIndex(sfile,
tsi1.WithPath(cmd.paths[0]),
tsi1.DisableCompactions(),
)
@ -164,12 +179,6 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) {
}
}
// Open series file in path.
sfile := tsi1.NewSeriesFile(filepath.Join(filepath.Dir(cmd.paths[0]), tsi1.SeriesFileName))
if err := sfile.Open(); err != nil {
return nil, nil, err
}
// Open each file and group into a fileset.
var files []tsi1.File
for _, path := range cmd.paths {
@ -203,16 +212,16 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) {
return nil, fs, nil
}
func (cmd *Command) printMerged(fs *tsi1.FileSet) error {
if err := cmd.printSeries(fs); err != nil {
func (cmd *Command) printMerged(sfile *tsdb.SeriesFile, fs *tsi1.FileSet) error {
if err := cmd.printSeries(sfile); err != nil {
return err
} else if err := cmd.printMeasurements(fs); err != nil {
} else if err := cmd.printMeasurements(sfile, fs); err != nil {
return err
}
return nil
}
func (cmd *Command) printSeries(fs *tsi1.FileSet) error {
func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error {
if !cmd.showSeries {
return nil
}
@ -222,15 +231,23 @@ func (cmd *Command) printSeries(fs *tsi1.FileSet) error {
fmt.Fprintln(tw, "Series\t")
// Iterate over each series.
itr := fs.SeriesFile().SeriesIDIterator()
for e := itr.Next(); e.SeriesID != 0; e = itr.Next() {
name, tags := tsi1.ParseSeriesKey(fs.SeriesFile().SeriesKey(e.SeriesID))
itr := sfile.SeriesIDIterator()
for {
e, err := itr.Next()
if err != nil {
return err
} else if e.SeriesID == 0 {
break
}
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if !cmd.matchSeries(name, tags) {
continue
}
fmt.Fprintf(tw, "%s%s\t%v\n", name, tags.HashKey(), deletedString(e.Deleted))
deleted := sfile.IsDeleted(e.SeriesID)
fmt.Fprintf(tw, "%s%s\t%v\n", name, tags.HashKey(), deletedString(deleted))
}
// Flush & write footer spacing.
@ -242,7 +259,7 @@ func (cmd *Command) printSeries(fs *tsi1.FileSet) error {
return nil
}
func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error {
func (cmd *Command) printMeasurements(sfile *tsdb.SeriesFile, fs *tsi1.FileSet) error {
if !cmd.showMeasurements {
return nil
}
@ -262,7 +279,7 @@ func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error {
return err
}
if err := cmd.printTagKeys(fs, e.Name()); err != nil {
if err := cmd.printTagKeys(sfile, fs, e.Name()); err != nil {
return err
}
}
@ -273,7 +290,7 @@ func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error {
return nil
}
func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error {
func (cmd *Command) printTagKeys(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name []byte) error {
if !cmd.showTagKeys {
return nil
}
@ -291,7 +308,7 @@ func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error {
return err
}
if err := cmd.printTagValues(fs, name, e.Key()); err != nil {
if err := cmd.printTagValues(sfile, fs, name, e.Key()); err != nil {
return err
}
}
@ -300,7 +317,7 @@ func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error {
return nil
}
func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error {
func (cmd *Command) printTagValues(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name, key []byte) error {
if !cmd.showTagValues {
return nil
}
@ -318,7 +335,7 @@ func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error {
return err
}
if err := cmd.printTagValueSeries(fs, name, key, e.Value()); err != nil {
if err := cmd.printTagValueSeries(sfile, fs, name, key, e.Value()); err != nil {
return err
}
}
@ -327,7 +344,7 @@ func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error {
return nil
}
func (cmd *Command) printTagValueSeries(fs *tsi1.FileSet, name, key, value []byte) error {
func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name, key, value []byte) error {
if !cmd.showTagValueSeries {
return nil
}
@ -335,8 +352,15 @@ func (cmd *Command) printTagValueSeries(fs *tsi1.FileSet, name, key, value []byt
// Iterate over each series.
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
itr := fs.TagValueSeriesIDIterator(name, key, value)
for e := itr.Next(); e.SeriesID != 0; e = itr.Next() {
name, tags := tsi1.ParseSeriesKey(fs.SeriesFile().SeriesKey(e.SeriesID))
for {
e, err := itr.Next()
if err != nil {
return err
} else if e.SeriesID == 0 {
break
}
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if !cmd.matchSeries(name, tags) {
continue

View File

@ -38,21 +38,28 @@ func NewCommand() *Command {
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError)
seriesFilePath := fs.String("series-file", "", "series file path")
dataDir := fs.String("datadir", "", "shard data directory")
walDir := fs.String("waldir", "", "shard WAL directory")
fs.BoolVar(&cmd.Verbose, "v", false, "verbose")
fs.SetOutput(cmd.Stdout)
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() > 0 || *dataDir == "" || *walDir == "" {
} else if fs.NArg() > 0 || *seriesFilePath == "" || *dataDir == "" || *walDir == "" {
return flag.ErrHelp
}
cmd.Logger = logger.New(cmd.Stderr)
return cmd.run(*dataDir, *walDir)
return cmd.run(*seriesFilePath, *dataDir, *walDir)
}
func (cmd *Command) run(dataDir, walDir string) error {
func (cmd *Command) run(seriesFilePath, dataDir, walDir string) error {
sfile := tsdb.NewSeriesFile(seriesFilePath)
if err := sfile.Open(); err != nil {
return err
}
defer sfile.Close()
// Check if shard already has a TSI index.
indexPath := filepath.Join(dataDir, "index")
cmd.Logger.Info("checking index path", zap.String("path", indexPath))
@ -83,10 +90,10 @@ func (cmd *Command) run(dataDir, walDir string) error {
}
// Open TSI index in temporary path.
tsiIndex := tsi1.NewIndex(
tsiIndex := tsi1.NewIndex(sfile,
tsi1.WithPath(tmpPath),
tsi1.WithLogger(cmd.Logger),
)
tsiIndex.WithLogger(cmd.Logger)
cmd.Logger.Info("opening tsi index in temporary location", zap.String("path", tmpPath))
if err := tsiIndex.Open(); err != nil {
return err

View File

@ -195,6 +195,15 @@ func (i *Index) Open() error {
return nil
}
// Compact requests a compaction of partitions.
func (i *Index) Compact() {
i.mu.Lock()
defer i.mu.Unlock()
for _, p := range i.partitions {
p.Compact()
}
}
// Wait blocks until all outstanding compactions have completed.
func (i *Index) Wait() {
for _, p := range i.partitions {

View File

@ -313,7 +313,7 @@ func (i *Partition) writeManifestFile() error {
}
// WithLogger sets the logger for the index.
func (i *Partition) WithLogger(logger zap.Logger) {
func (i *Partition) WithLogger(logger *zap.Logger) {
i.logger = logger.With(zap.String("index", "tsi"))
}