Update influx_inspect to report total cardinalities

This updates the report command so that it can report cardinalities across
all databases and shards.
pull/8934/head
Jason Wilder 2017-10-06 15:00:30 -06:00
parent 10c4276eeb
commit c69ad0978c
1 changed files with 87 additions and 36 deletions

View File

@ -40,6 +40,7 @@ func NewCommand() *Command {
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("report", flag.ExitOnError)
fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
fs.BoolVar(&cmd.detailed, "detailed", false, "Report detailed cardinality estimates")
fs.SetOutput(cmd.Stdout)
@ -50,54 +51,55 @@ func (cmd *Command) Run(args ...string) error {
}
cmd.dir = fs.Arg(0)
start := time.Now()
files, err := filepath.Glob(filepath.Join(cmd.dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
if err != nil {
return err
err := cmd.isShardDir(cmd.dir)
if cmd.detailed && err != nil {
return fmt.Errorf("-detailed only supported for shard dirs.")
}
var filtered []string
if cmd.pattern != "" {
for _, f := range files {
if strings.Contains(f, cmd.pattern) {
filtered = append(filtered, f)
}
}
files = filtered
}
if len(files) == 0 {
return fmt.Errorf("no tsm files at %v", cmd.dir)
}
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
fmt.Fprintln(tw, strings.Join([]string{"File", "Series", "Load Time"}, "\t"))
totalSeries := hllpp.New()
tagCardinalities := map[string]*hllpp.HLLPP{}
measCardinalities := map[string]*hllpp.HLLPP{}
fieldCardinalities := map[string]*hllpp.HLLPP{}
for _, f := range files {
file, err := os.OpenFile(f, os.O_RDONLY, 0600)
dbCardinalities := map[string]*hllpp.HLLPP{}
start := time.Now()
tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)
fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New (Est)", "Load Time"}, "\t"))
if err := cmd.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && strings.Contains(path, cmd.pattern) {
return nil
}
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", f, err)
continue
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
return nil
}
loadStart := time.Now()
reader, err := tsm1.NewTSMReader(file)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
continue
return nil
}
loadTime := time.Since(loadStart)
dbCount := dbCardinalities[db]
if dbCount == nil {
dbCount = hllpp.New()
dbCardinalities[db] = dbCount
}
oldCount := dbCount.Count()
seriesCount := reader.KeyCount()
for i := 0; i < seriesCount; i++ {
key, _ := reader.KeyAt(i)
totalSeries.Add([]byte(key))
dbCount.Add([]byte(key))
if cmd.detailed {
sep := strings.Index(string(key), "#!~#")
@ -131,33 +133,44 @@ func (cmd *Command) Run(args ...string) error {
reader.Close()
fmt.Fprintln(tw, strings.Join([]string{
db, rp, id,
filepath.Base(file.Name()),
strconv.FormatInt(int64(seriesCount), 10),
strconv.FormatInt(int64(dbCount.Count()-oldCount), 10),
loadTime.String(),
}, "\t"))
tw.Flush()
if cmd.detailed {
tw.Flush()
}
return nil
}); err != nil {
return err
}
tw.Flush()
println()
fmt.Printf("Statistics\n")
fmt.Printf("\tSeries:\n")
fmt.Printf("\t\tTotal (est): %d\n", totalSeries.Count())
fmt.Printf(" Series:\n")
for db, counts := range dbCardinalities {
fmt.Printf(" - %s (est): %d (%d%%)\n", db, counts.Count(), int(float64(counts.Count())/float64(totalSeries.Count())*100))
}
fmt.Printf(" Total (est): %d\n", totalSeries.Count())
if cmd.detailed {
fmt.Printf("\tMeasurements (est):\n")
fmt.Printf("\n Measurements (est):\n")
for _, t := range sortKeys(measCardinalities) {
fmt.Printf("\t\t%v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100))
fmt.Printf(" - %v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100))
}
fmt.Printf("\tFields (est):\n")
fmt.Printf("\n Fields (est):\n")
for _, t := range sortKeys(fieldCardinalities) {
fmt.Printf("\t\t%v: %d\n", t, fieldCardinalities[t].Count())
fmt.Printf(" - %v: %d\n", t, fieldCardinalities[t].Count())
}
fmt.Printf("\tTags (est):\n")
fmt.Printf("\n Tags (est):\n")
for _, t := range sortKeys(tagCardinalities) {
fmt.Printf("\t\t%v: %d\n", t, tagCardinalities[t].Count())
fmt.Printf(" - %v: %d\n", t, tagCardinalities[t].Count())
}
}
@ -175,6 +188,44 @@ func sortKeys(vals map[string]*hllpp.HLLPP) (keys []string) {
return keys
}
// isShardDir checks whether the final path element of dir parses as a
// positive decimal integer. It returns nil when it does, and a
// "not a valid shard dir" error otherwise.
func (cmd *Command) isShardDir(dir string) error {
	shardID, convErr := strconv.Atoi(filepath.Base(dir))
	if convErr == nil && shardID >= 1 {
		return nil
	}
	return fmt.Errorf("not a valid shard dir: %v", dir)
}
// WalkShardDirs walks the file tree rooted at root and calls fn once for every
// TSM file found, i.e. every non-directory entry whose extension is
// "."+tsm1.TSMFileExtension.
//
// The directory holding each TSM file must pass isShardDir. The db, rp and id
// arguments passed to fn are the three directory names that precede the file
// name in its absolute path (.../<db>/<rp>/<id>/<file>). If the absolute path
// is too shallow to contain all three, the missing leading ones are passed as
// empty strings. path is handed to fn exactly as filepath.Walk reported it.
//
// Any error -- reported by the walk, by isShardDir, by filepath.Abs, or by
// fn -- is returned from the walk callback unchanged.
func (cmd *Command) WalkShardDirs(root string, fn func(db, rp, id, path string) error) error {
	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		// Only TSM files are of interest; directories and other files are ignored.
		if info.IsDir() || filepath.Ext(info.Name()) != "."+tsm1.TSMFileExtension {
			return nil
		}

		if err := cmd.isShardDir(filepath.Dir(path)); err != nil {
			return err
		}

		absPath, err := filepath.Abs(path)
		if err != nil {
			return err
		}

		parts := strings.Split(absPath, string(filepath.Separator))

		// fromEnd returns the n-th path component counted from the end, or ""
		// when the path is too short. Indexing parts directly would panic for
		// a shard directory that sits at (or next to) the filesystem root.
		fromEnd := func(n int) string {
			if i := len(parts) - n; i >= 0 {
				return parts[i]
			}
			return ""
		}

		return fn(fromEnd(4), fromEnd(3), fromEnd(2), path)
	})
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := `Displays shard level report.