Merge pull request #8934 from influxdata/jw-inspect
Update influx_inspect to report total cardinalitiespull/8938/head
commit
0ea4615b4a
|
|
@ -40,6 +40,7 @@ func NewCommand() *Command {
|
|||
func (cmd *Command) Run(args ...string) error {
|
||||
fs := flag.NewFlagSet("report", flag.ExitOnError)
|
||||
fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
|
||||
|
||||
fs.BoolVar(&cmd.detailed, "detailed", false, "Report detailed cardinality estimates")
|
||||
|
||||
fs.SetOutput(cmd.Stdout)
|
||||
|
|
@ -50,54 +51,55 @@ func (cmd *Command) Run(args ...string) error {
|
|||
}
|
||||
cmd.dir = fs.Arg(0)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(cmd.dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
|
||||
if err != nil {
|
||||
return err
|
||||
err := cmd.isShardDir(cmd.dir)
|
||||
if cmd.detailed && err != nil {
|
||||
return fmt.Errorf("-detailed only supported for shard dirs.")
|
||||
}
|
||||
|
||||
var filtered []string
|
||||
if cmd.pattern != "" {
|
||||
for _, f := range files {
|
||||
if strings.Contains(f, cmd.pattern) {
|
||||
filtered = append(filtered, f)
|
||||
}
|
||||
}
|
||||
files = filtered
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
return fmt.Errorf("no tsm files at %v", cmd.dir)
|
||||
}
|
||||
|
||||
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
|
||||
fmt.Fprintln(tw, strings.Join([]string{"File", "Series", "Load Time"}, "\t"))
|
||||
|
||||
totalSeries := hllpp.New()
|
||||
tagCardinalities := map[string]*hllpp.HLLPP{}
|
||||
measCardinalities := map[string]*hllpp.HLLPP{}
|
||||
fieldCardinalities := map[string]*hllpp.HLLPP{}
|
||||
|
||||
for _, f := range files {
|
||||
file, err := os.OpenFile(f, os.O_RDONLY, 0600)
|
||||
dbCardinalities := map[string]*hllpp.HLLPP{}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)
|
||||
fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New (Est)", "Load Time"}, "\t"))
|
||||
|
||||
if err := cmd.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
|
||||
if cmd.pattern != "" && strings.Contains(path, cmd.pattern) {
|
||||
return nil
|
||||
}
|
||||
|
||||
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", f, err)
|
||||
continue
|
||||
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
loadStart := time.Now()
|
||||
reader, err := tsm1.NewTSMReader(file)
|
||||
if err != nil {
|
||||
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
loadTime := time.Since(loadStart)
|
||||
|
||||
dbCount := dbCardinalities[db]
|
||||
if dbCount == nil {
|
||||
dbCount = hllpp.New()
|
||||
dbCardinalities[db] = dbCount
|
||||
}
|
||||
|
||||
oldCount := dbCount.Count()
|
||||
|
||||
seriesCount := reader.KeyCount()
|
||||
for i := 0; i < seriesCount; i++ {
|
||||
key, _ := reader.KeyAt(i)
|
||||
totalSeries.Add([]byte(key))
|
||||
dbCount.Add([]byte(key))
|
||||
|
||||
if cmd.detailed {
|
||||
sep := strings.Index(string(key), "#!~#")
|
||||
|
|
@ -131,33 +133,44 @@ func (cmd *Command) Run(args ...string) error {
|
|||
reader.Close()
|
||||
|
||||
fmt.Fprintln(tw, strings.Join([]string{
|
||||
db, rp, id,
|
||||
filepath.Base(file.Name()),
|
||||
strconv.FormatInt(int64(seriesCount), 10),
|
||||
strconv.FormatInt(int64(dbCount.Count()-oldCount), 10),
|
||||
|
||||
loadTime.String(),
|
||||
}, "\t"))
|
||||
tw.Flush()
|
||||
if cmd.detailed {
|
||||
tw.Flush()
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tw.Flush()
|
||||
println()
|
||||
fmt.Printf("Statistics\n")
|
||||
fmt.Printf("\tSeries:\n")
|
||||
fmt.Printf("\t\tTotal (est): %d\n", totalSeries.Count())
|
||||
fmt.Printf(" Series:\n")
|
||||
for db, counts := range dbCardinalities {
|
||||
fmt.Printf(" - %s (est): %d (%d%%)\n", db, counts.Count(), int(float64(counts.Count())/float64(totalSeries.Count())*100))
|
||||
}
|
||||
fmt.Printf(" Total (est): %d\n", totalSeries.Count())
|
||||
|
||||
if cmd.detailed {
|
||||
fmt.Printf("\tMeasurements (est):\n")
|
||||
fmt.Printf("\n Measurements (est):\n")
|
||||
for _, t := range sortKeys(measCardinalities) {
|
||||
fmt.Printf("\t\t%v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100))
|
||||
fmt.Printf(" - %v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100))
|
||||
}
|
||||
|
||||
fmt.Printf("\tFields (est):\n")
|
||||
fmt.Printf("\n Fields (est):\n")
|
||||
for _, t := range sortKeys(fieldCardinalities) {
|
||||
fmt.Printf("\t\t%v: %d\n", t, fieldCardinalities[t].Count())
|
||||
fmt.Printf(" - %v: %d\n", t, fieldCardinalities[t].Count())
|
||||
}
|
||||
|
||||
fmt.Printf("\tTags (est):\n")
|
||||
fmt.Printf("\n Tags (est):\n")
|
||||
for _, t := range sortKeys(tagCardinalities) {
|
||||
fmt.Printf("\t\t%v: %d\n", t, tagCardinalities[t].Count())
|
||||
fmt.Printf(" - %v: %d\n", t, tagCardinalities[t].Count())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -175,6 +188,44 @@ func sortKeys(vals map[string]*hllpp.HLLPP) (keys []string) {
|
|||
return keys
|
||||
}
|
||||
|
||||
func (cmd *Command) isShardDir(dir string) error {
|
||||
name := filepath.Base(dir)
|
||||
if id, err := strconv.Atoi(name); err != nil || id < 1 {
|
||||
return fmt.Errorf("not a valid shard dir: %v", dir)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) WalkShardDirs(root string, fn func(db, rp, id, path string) error) error {
|
||||
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
|
||||
shardDir := filepath.Dir(path)
|
||||
|
||||
if err := cmd.isShardDir(shardDir); err != nil {
|
||||
return err
|
||||
}
|
||||
absPath, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
parts := strings.Split(absPath, string(filepath.Separator))
|
||||
db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
|
||||
|
||||
return fn(db, rp, id, path)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// printUsage prints the usage message to STDERR.
|
||||
func (cmd *Command) printUsage() {
|
||||
usage := `Displays shard level report.
|
||||
|
|
|
|||
Loading…
Reference in New Issue