From c69ad0978c2fa7466cb19ef976b2de0ee3d21388 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 6 Oct 2017 15:00:30 -0600 Subject: [PATCH] Update influx_inspect to report total cardinalities This updates the report command to be able to report cardinalities across all dbs and shards --- cmd/influx_inspect/report/report.go | 123 ++++++++++++++++++++-------- 1 file changed, 87 insertions(+), 36 deletions(-) diff --git a/cmd/influx_inspect/report/report.go b/cmd/influx_inspect/report/report.go index 5e1a0b8075..185b19a177 100644 --- a/cmd/influx_inspect/report/report.go +++ b/cmd/influx_inspect/report/report.go @@ -40,6 +40,7 @@ func NewCommand() *Command { func (cmd *Command) Run(args ...string) error { fs := flag.NewFlagSet("report", flag.ExitOnError) fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern") + fs.BoolVar(&cmd.detailed, "detailed", false, "Report detailed cardinality estimates") fs.SetOutput(cmd.Stdout) @@ -50,54 +51,55 @@ func (cmd *Command) Run(args ...string) error { } cmd.dir = fs.Arg(0) - start := time.Now() - - files, err := filepath.Glob(filepath.Join(cmd.dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) - if err != nil { - return err + err := cmd.isShardDir(cmd.dir) + if cmd.detailed && err != nil { + return fmt.Errorf("-detailed only supported for shard dirs.") } - var filtered []string - if cmd.pattern != "" { - for _, f := range files { - if strings.Contains(f, cmd.pattern) { - filtered = append(filtered, f) - } - } - files = filtered - } - - if len(files) == 0 { - return fmt.Errorf("no tsm files at %v", cmd.dir) - } - - tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) - fmt.Fprintln(tw, strings.Join([]string{"File", "Series", "Load Time"}, "\t")) - totalSeries := hllpp.New() tagCardinalities := map[string]*hllpp.HLLPP{} measCardinalities := map[string]*hllpp.HLLPP{} fieldCardinalities := map[string]*hllpp.HLLPP{} - for _, f := range files { - file, err := os.OpenFile(f, os.O_RDONLY, 0600) + dbCardinalities := map[string]*hllpp.HLLPP{} + + start := time.Now() + + tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0) + fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New (Est)", "Load Time"}, "\t")) + + if err := cmd.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error { + if cmd.pattern != "" && strings.Contains(path, cmd.pattern) { + return nil + } + + file, err := os.OpenFile(path, os.O_RDONLY, 0600) if err != nil { - fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", f, err) - continue + fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err) + return nil } loadStart := time.Now() reader, err := tsm1.NewTSMReader(file) if err != nil { fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err) - continue + return nil } loadTime := time.Since(loadStart) + dbCount := dbCardinalities[db] + if dbCount == nil { + dbCount = hllpp.New() + dbCardinalities[db] = dbCount + } + + oldCount := dbCount.Count() + seriesCount := reader.KeyCount() for i := 0; i < seriesCount; i++ { key, _ := reader.KeyAt(i) totalSeries.Add([]byte(key)) + dbCount.Add([]byte(key)) if cmd.detailed { sep := strings.Index(string(key), "#!~#") @@ -131,33 +133,44 @@ func (cmd *Command) Run(args ...string) error { reader.Close() fmt.Fprintln(tw, strings.Join([]string{ + db, rp, id, filepath.Base(file.Name()), strconv.FormatInt(int64(seriesCount), 10), + strconv.FormatInt(int64(dbCount.Count()-oldCount), 10), + loadTime.String(), }, "\t")) - tw.Flush() + if cmd.detailed { + tw.Flush() + } + return nil + }); err != nil { + return err } tw.Flush() println() fmt.Printf("Statistics\n") - fmt.Printf("\tSeries:\n") - fmt.Printf("\t\tTotal (est): %d\n", totalSeries.Count()) + fmt.Printf(" Series:\n") + for db, counts := range dbCardinalities { + fmt.Printf(" - %s (est): %d (%d%%)\n", db, counts.Count(), int(float64(counts.Count())/float64(totalSeries.Count())*100)) + } + fmt.Printf(" Total (est): %d\n", totalSeries.Count()) if cmd.detailed { - fmt.Printf("\tMeasurements (est):\n") + fmt.Printf("\n Measurements (est):\n") for _, t := range sortKeys(measCardinalities) { - fmt.Printf("\t\t%v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100)) + fmt.Printf(" - %v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100)) } - fmt.Printf("\tFields (est):\n") + fmt.Printf("\n Fields (est):\n") for _, t := range sortKeys(fieldCardinalities) { - fmt.Printf("\t\t%v: %d\n", t, fieldCardinalities[t].Count()) + fmt.Printf(" - %v: %d\n", t, fieldCardinalities[t].Count()) } - fmt.Printf("\tTags (est):\n") + fmt.Printf("\n Tags (est):\n") for _, t := range sortKeys(tagCardinalities) { - fmt.Printf("\t\t%v: %d\n", t, tagCardinalities[t].Count()) + fmt.Printf(" - %v: %d\n", t, tagCardinalities[t].Count()) } } @@ -175,6 +188,44 @@ func sortKeys(vals map[string]*hllpp.HLLPP) (keys []string) { return keys } +func (cmd *Command) isShardDir(dir string) error { + name := filepath.Base(dir) + if id, err := strconv.Atoi(name); err != nil || id < 1 { + return fmt.Errorf("not a valid shard dir: %v", dir) + } + + return nil +} + +func (cmd *Command) WalkShardDirs(root string, fn func(db, rp, id, path string) error) error { + return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + return nil + } + + if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension { + shardDir := filepath.Dir(path) + + if err := cmd.isShardDir(shardDir); err != nil { + return err + } + absPath, err := filepath.Abs(path) + if err != nil { + return err + } + parts := strings.Split(absPath, string(filepath.Separator)) + db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2] + + return fn(db, rp, id, path) + } + return nil + }) +} + // printUsage prints the usage message to STDERR. func (cmd *Command) printUsage() { usage := `Displays shard level report.