161 lines
4.3 KiB
Go
161 lines
4.3 KiB
Go
package cardinality
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"text/tabwriter"
|
|
|
|
"github.com/influxdata/influxdb/cmd/influx_inspect/cardinality/aggregators"
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/pkg/reporthelper"
|
|
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Command represents the program execution for "influxd cardinality".
|
|
type Command struct {
|
|
// Standard input/output, overridden for testing.
|
|
Stderr io.Writer
|
|
Stdout io.Writer
|
|
|
|
dbPath string
|
|
shardPaths map[uint64]string
|
|
exact bool
|
|
detailed bool
|
|
// How many goroutines to dedicate to calculating cardinality.
|
|
concurrency int
|
|
// t, d, r, m for Total, Database, Retention Policy, Measurement
|
|
rollup string
|
|
}
|
|
|
|
// NewCommand returns a new instance of Command with default setting applied.
|
|
func NewCommand() *Command {
|
|
return &Command{
|
|
Stderr: os.Stderr,
|
|
Stdout: os.Stdout,
|
|
shardPaths: map[uint64]string{},
|
|
concurrency: 1,
|
|
detailed: false,
|
|
rollup: "m",
|
|
}
|
|
}
|
|
|
|
// Run executes the command.
|
|
func (cmd *Command) Run(args ...string) (err error) {
|
|
var legalRollups = map[string]int{"m": 3, "r": 2, "d": 1, "t": 0}
|
|
fs := flag.NewFlagSet("report-db", flag.ExitOnError)
|
|
fs.StringVar(&cmd.dbPath, "db-path", "", "Path to database. Required.")
|
|
fs.IntVar(&cmd.concurrency, "c", 1, "Set worker concurrency. Defaults to one.")
|
|
fs.BoolVar(&cmd.detailed, "detailed", false, "Include counts for fields, tags, ")
|
|
fs.BoolVar(&cmd.exact, "exact", false, "Report exact counts")
|
|
fs.StringVar(&cmd.rollup, "rollup", "m", "Rollup level - t: total, d: database, r: retention policy, m: measurement")
|
|
fs.SetOutput(cmd.Stdout)
|
|
if err := fs.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
if cmd.dbPath == "" {
|
|
return errors.New("path to database must be provided")
|
|
}
|
|
|
|
totalDepth, ok := legalRollups[cmd.rollup]
|
|
|
|
if !ok {
|
|
return fmt.Errorf("invalid rollup specified: %q", cmd.rollup)
|
|
}
|
|
|
|
factory := aggregators.CreateNodeFactory(cmd.detailed, cmd.exact)
|
|
totalsTree := factory.NewNode(totalDepth == 0)
|
|
|
|
g, ctx := errgroup.WithContext(context.Background())
|
|
g.SetLimit(cmd.concurrency)
|
|
processTSM := func(db, rp, id, path string) error {
|
|
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
|
|
if err != nil {
|
|
_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
|
|
return nil
|
|
}
|
|
|
|
reader, err := tsm1.NewTSMReader(file)
|
|
if err != nil {
|
|
_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
|
|
// NewTSMReader won't close the file handle on failure, so do it here.
|
|
_ = file.Close()
|
|
return nil
|
|
}
|
|
defer func() {
|
|
// The TSMReader will close the underlying file handle here.
|
|
if err := reader.Close(); err != nil {
|
|
_, _ = fmt.Fprintf(cmd.Stderr, "error closing: %s: %v.\n", file.Name(), err)
|
|
}
|
|
}()
|
|
|
|
seriesCount := reader.KeyCount()
|
|
for i := 0; i < seriesCount; i++ {
|
|
func() {
|
|
key, _ := reader.KeyAt(i)
|
|
seriesKey, field, _ := bytes.Cut(key, []byte("#!~#"))
|
|
measurement, tags := models.ParseKey(seriesKey)
|
|
totalsTree.Record(0, totalDepth, db, rp, measurement, key, field, tags)
|
|
}()
|
|
}
|
|
return nil
|
|
}
|
|
done := ctx.Done()
|
|
err = reporthelper.WalkShardDirs(cmd.dbPath, func(db, rp, id, path string) error {
|
|
select {
|
|
case <-done:
|
|
return nil
|
|
default:
|
|
g.Go(func() error {
|
|
return processTSM(db, rp, id, path)
|
|
})
|
|
return nil
|
|
}
|
|
})
|
|
|
|
if err != nil {
|
|
_, _ = fmt.Fprintf(cmd.Stderr, "%s: %v\n", cmd.dbPath, err)
|
|
return err
|
|
}
|
|
err = g.Wait()
|
|
if err != nil {
|
|
_, _ = fmt.Fprintf(cmd.Stderr, "%s: %v\n", cmd.dbPath, err)
|
|
return err
|
|
}
|
|
|
|
tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)
|
|
|
|
if err = factory.PrintHeader(tw); err != nil {
|
|
return err
|
|
}
|
|
if err = factory.PrintDivider(tw); err != nil {
|
|
return err
|
|
}
|
|
for d, db := range totalsTree.Children() {
|
|
for r, rp := range db.Children() {
|
|
for m, measure := range rp.Children() {
|
|
err = measure.Print(tw, true, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), fmt.Sprintf("%q", m))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err = rp.Print(tw, false, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err = db.Print(tw, false, fmt.Sprintf("%q", d), "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err = totalsTree.Print(tw, false, "Total"+factory.EstTitle, "", ""); err != nil {
|
|
return err
|
|
}
|
|
return tw.Flush()
|
|
}
|