Merge pull request #8942 from influxdata/jw-inspect
Add support for exact cardinality count to influx_inspectpull/8955/head
commit
6d93507389
|
@ -5,6 +5,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
@ -23,9 +24,9 @@ type Command struct {
|
|||
Stderr io.Writer
|
||||
Stdout io.Writer
|
||||
|
||||
dir string
|
||||
pattern string
|
||||
detailed bool
|
||||
dir string
|
||||
pattern string
|
||||
detailed, exact bool
|
||||
}
|
||||
|
||||
// NewCommand returns a new instance of Command.
|
||||
|
@ -40,8 +41,8 @@ func NewCommand() *Command {
|
|||
func (cmd *Command) Run(args ...string) error {
|
||||
fs := flag.NewFlagSet("report", flag.ExitOnError)
|
||||
fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
|
||||
|
||||
fs.BoolVar(&cmd.detailed, "detailed", false, "Report detailed cardinality estimates")
|
||||
fs.BoolVar(&cmd.exact, "exact", false, "Report exact counts")
|
||||
|
||||
fs.SetOutput(cmd.Stdout)
|
||||
fs.Usage = cmd.printUsage
|
||||
|
@ -49,6 +50,14 @@ func (cmd *Command) Run(args ...string) error {
|
|||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newCounterFn := newHLLCounter
|
||||
estTitle := " (est)"
|
||||
if cmd.exact {
|
||||
estTitle = ""
|
||||
newCounterFn = newExactCounter
|
||||
}
|
||||
|
||||
cmd.dir = fs.Arg(0)
|
||||
|
||||
err := cmd.isShardDir(cmd.dir)
|
||||
|
@ -56,18 +65,20 @@ func (cmd *Command) Run(args ...string) error {
|
|||
return fmt.Errorf("-detailed only supported for shard dirs.")
|
||||
}
|
||||
|
||||
totalSeries := hllpp.New()
|
||||
tagCardinalities := map[string]*hllpp.HLLPP{}
|
||||
measCardinalities := map[string]*hllpp.HLLPP{}
|
||||
fieldCardinalities := map[string]*hllpp.HLLPP{}
|
||||
totalSeries := newCounterFn()
|
||||
tagCardinalities := map[string]counter{}
|
||||
measCardinalities := map[string]counter{}
|
||||
fieldCardinalities := map[string]counter{}
|
||||
|
||||
dbCardinalities := map[string]*hllpp.HLLPP{}
|
||||
dbCardinalities := map[string]counter{}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)
|
||||
fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New (Est)", "Load Time"}, "\t"))
|
||||
fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New" + estTitle, "Min Time", "Max Time", "Load Time"}, "\t"))
|
||||
|
||||
minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
var fileCount int
|
||||
if err := cmd.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
|
||||
if cmd.pattern != "" && strings.Contains(path, cmd.pattern) {
|
||||
return nil
|
||||
|
@ -86,10 +97,11 @@ func (cmd *Command) Run(args ...string) error {
|
|||
return nil
|
||||
}
|
||||
loadTime := time.Since(loadStart)
|
||||
fileCount++
|
||||
|
||||
dbCount := dbCardinalities[db]
|
||||
if dbCount == nil {
|
||||
dbCount = hllpp.New()
|
||||
dbCount = newCounterFn()
|
||||
dbCardinalities[db] = dbCount
|
||||
}
|
||||
|
||||
|
@ -108,14 +120,14 @@ func (cmd *Command) Run(args ...string) error {
|
|||
|
||||
measCount := measCardinalities[measurement]
|
||||
if measCount == nil {
|
||||
measCount = hllpp.New()
|
||||
measCount = newCounterFn()
|
||||
measCardinalities[measurement] = measCount
|
||||
}
|
||||
measCount.Add([]byte(key))
|
||||
|
||||
fieldCount := fieldCardinalities[measurement]
|
||||
if fieldCount == nil {
|
||||
fieldCount = hllpp.New()
|
||||
fieldCount = newCounterFn()
|
||||
fieldCardinalities[measurement] = fieldCount
|
||||
}
|
||||
fieldCount.Add([]byte(field))
|
||||
|
@ -123,13 +135,20 @@ func (cmd *Command) Run(args ...string) error {
|
|||
for _, t := range tags {
|
||||
tagCount := tagCardinalities[string(t.Key)]
|
||||
if tagCount == nil {
|
||||
tagCount = hllpp.New()
|
||||
tagCount = newCounterFn()
|
||||
tagCardinalities[string(t.Key)] = tagCount
|
||||
}
|
||||
tagCount.Add(t.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
minT, maxT := reader.TimeRange()
|
||||
if minT < minTime {
|
||||
minTime = minT
|
||||
}
|
||||
if maxT > maxTime {
|
||||
maxTime = maxT
|
||||
}
|
||||
reader.Close()
|
||||
|
||||
fmt.Fprintln(tw, strings.Join([]string{
|
||||
|
@ -137,7 +156,8 @@ func (cmd *Command) Run(args ...string) error {
|
|||
filepath.Base(file.Name()),
|
||||
strconv.FormatInt(int64(seriesCount), 10),
|
||||
strconv.FormatInt(int64(dbCount.Count()-oldCount), 10),
|
||||
|
||||
time.Unix(0, minT).UTC().Format(time.RFC3339Nano),
|
||||
time.Unix(0, maxT).UTC().Format(time.RFC3339Nano),
|
||||
loadTime.String(),
|
||||
}, "\t"))
|
||||
if cmd.detailed {
|
||||
|
@ -150,12 +170,22 @@ func (cmd *Command) Run(args ...string) error {
|
|||
|
||||
tw.Flush()
|
||||
println()
|
||||
|
||||
println("Summary:")
|
||||
fmt.Printf(" Files: %d\n", fileCount)
|
||||
fmt.Printf(" Time Range: %s - %s\n",
|
||||
time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
|
||||
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
|
||||
)
|
||||
fmt.Printf(" Duration: %s \n", time.Unix(0, maxTime).Sub(time.Unix(0, minTime)))
|
||||
println()
|
||||
|
||||
fmt.Printf("Statistics\n")
|
||||
fmt.Printf(" Series:\n")
|
||||
for db, counts := range dbCardinalities {
|
||||
fmt.Printf(" - %s (est): %d (%d%%)\n", db, counts.Count(), int(float64(counts.Count())/float64(totalSeries.Count())*100))
|
||||
fmt.Printf(" - %s%s: %d (%d%%)\n", db, estTitle, counts.Count(), int(float64(counts.Count())/float64(totalSeries.Count())*100))
|
||||
}
|
||||
fmt.Printf(" Total (est): %d\n", totalSeries.Count())
|
||||
fmt.Printf(" Total%s: %d\n", estTitle, totalSeries.Count())
|
||||
|
||||
if cmd.detailed {
|
||||
fmt.Printf("\n Measurements (est):\n")
|
||||
|
@ -179,7 +209,7 @@ func (cmd *Command) Run(args ...string) error {
|
|||
}
|
||||
|
||||
// sortKeys is a quick helper to return the sorted set of a map's keys
|
||||
func sortKeys(vals map[string]*hllpp.HLLPP) (keys []string) {
|
||||
func sortKeys(vals map[string]counter) (keys []string) {
|
||||
for k := range vals {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
@ -198,7 +228,12 @@ func (cmd *Command) isShardDir(dir string) error {
|
|||
}
|
||||
|
||||
func (cmd *Command) WalkShardDirs(root string, fn func(db, rp, id, path string) error) error {
|
||||
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||
type location struct {
|
||||
db, rp, id, path string
|
||||
}
|
||||
|
||||
var dirs []location
|
||||
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -219,11 +254,26 @@ func (cmd *Command) WalkShardDirs(root string, fn func(db, rp, id, path string)
|
|||
}
|
||||
parts := strings.Split(absPath, string(filepath.Separator))
|
||||
db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
|
||||
|
||||
return fn(db, rp, id, path)
|
||||
dirs = append(dirs, location{db: db, rp: rp, id: id, path: path})
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sort.Slice(dirs, func(i, j int) bool {
|
||||
a, _ := strconv.Atoi(dirs[i].id)
|
||||
b, _ := strconv.Atoi(dirs[j].id)
|
||||
return a < b
|
||||
})
|
||||
|
||||
for _, shard := range dirs {
|
||||
if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// printUsage prints the usage message to STDERR.
|
||||
|
@ -234,6 +284,9 @@ Usage: influx_inspect report [flags]
|
|||
|
||||
-pattern <pattern>
|
||||
Include only files matching a pattern.
|
||||
-exact
|
||||
Report exact cardinality counts instead of estimates. Note: this can use a lot of memory.
|
||||
Defaults to "false".
|
||||
-detailed
|
||||
Report detailed cardinality estimates.
|
||||
Defaults to "false".
|
||||
|
@ -241,3 +294,33 @@ Usage: influx_inspect report [flags]
|
|||
|
||||
fmt.Fprintf(cmd.Stdout, usage)
|
||||
}
|
||||
|
||||
// counter abstracts a a method of counting keys.
|
||||
type counter interface {
|
||||
Add(key []byte)
|
||||
Count() uint64
|
||||
}
|
||||
|
||||
// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation.
|
||||
func newHLLCounter() counter {
|
||||
return hllpp.New()
|
||||
}
|
||||
|
||||
// exactCounter returns an exact count for keys using counting all distinct items in a set.
|
||||
type exactCounter struct {
|
||||
m map[string]struct{}
|
||||
}
|
||||
|
||||
func (c *exactCounter) Add(key []byte) {
|
||||
c.m[string(key)] = struct{}{}
|
||||
}
|
||||
|
||||
func (c *exactCounter) Count() uint64 {
|
||||
return uint64(len(c.m))
|
||||
}
|
||||
|
||||
func newExactCounter() counter {
|
||||
return &exactCounter{
|
||||
m: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue