From e95a6a4a4b31ce2f7ea1ea9cf17a674b12f93383 Mon Sep 17 00:00:00 2001 From: Sam Arnold <sarnold@influxdata.com> Date: Thu, 11 Mar 2021 14:25:55 -0500 Subject: [PATCH] feat(inspect): Add report-disk for disk usage by measurement (#20917) * feat(inspect): Add report-disk for disk usage by measurement (cherry picked from commit a6152e8ac135ba749b5ed7eb11e5c24b3f3d527a) * fix(inspect): bad pattern matching (cherry picked from commit 3a31e2370e757d12fc8b3e14760daba7aec697e1) * chore: fix goimports * chore: update changelog --- CHANGELOG.md | 1 + cmd/influx_inspect/main.go | 6 + cmd/influx_inspect/report/report.go | 65 +--- cmd/influx_inspect/reportdisk/reportdisk.go | 311 ++++++++++++++++++ .../reportdisk/reportdisk_test.go | 3 + pkg/reporthelper/walkshards.go | 71 ++++ .../tsm1/file_store_key_iterator_test.go | 58 ++-- tsdb/engine/tsm1/ring_test.go | 4 +- 8 files changed, 429 insertions(+), 90 deletions(-) create mode 100644 cmd/influx_inspect/reportdisk/reportdisk.go create mode 100644 cmd/influx_inspect/reportdisk/reportdisk_test.go create mode 100644 pkg/reporthelper/walkshards.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 94db6da0a7..a83ad7d412 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ v1.8.5 [unreleased] ### Features +- [#20917](https://github.com/influxdata/influxdb/pull/20917): feat(inspect): Add report-disk for disk usage by measurement - [#20118](https://github.com/influxdata/influxdb/pull/20118): feat: Optimize shard lookups in groups containing only one shard. Thanks @StoneYunZhao! 
- [#20910](https://github.com/influxdata/influxdb/pull/20910): feat: Make meta queries respect QueryTimeout values diff --git a/cmd/influx_inspect/main.go b/cmd/influx_inspect/main.go index 47d0c209a5..74dd3d8282 100644 --- a/cmd/influx_inspect/main.go +++ b/cmd/influx_inspect/main.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/influxdb/cmd/influx_inspect/export" "github.com/influxdata/influxdb/cmd/influx_inspect/help" "github.com/influxdata/influxdb/cmd/influx_inspect/report" + "github.com/influxdata/influxdb/cmd/influx_inspect/reportdisk" "github.com/influxdata/influxdb/cmd/influx_inspect/reporttsi" "github.com/influxdata/influxdb/cmd/influx_inspect/verify/seriesfile" "github.com/influxdata/influxdb/cmd/influx_inspect/verify/tombstone" @@ -98,6 +99,11 @@ func (m *Main) Run(args ...string) error { if err := name.Run(args...); err != nil { return fmt.Errorf("report: %s", err) } + case "report-disk": + name := reportdisk.NewCommand() + if err := name.Run(args...); err != nil { + return fmt.Errorf("report-disk: %s", err) + } case "reporttsi": name := reporttsi.NewCommand() if err := name.Run(args...); err != nil { diff --git a/cmd/influx_inspect/report/report.go b/cmd/influx_inspect/report/report.go index 5b6c1f137f..62470bb71b 100644 --- a/cmd/influx_inspect/report/report.go +++ b/cmd/influx_inspect/report/report.go @@ -15,6 +15,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/reporthelper" "github.com/influxdata/influxdb/tsdb/engine/tsm1" "github.com/retailnext/hllpp" ) @@ -60,7 +61,7 @@ func (cmd *Command) Run(args ...string) error { cmd.dir = fs.Arg(0) - err := cmd.isShardDir(cmd.dir) + err := reporthelper.IsShardDir(cmd.dir) if cmd.detailed && err != nil { return fmt.Errorf("-detailed only supported for shard dirs") } @@ -79,8 +80,8 @@ func (cmd *Command) Run(args ...string) error { minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64) var fileCount int - if err := cmd.walkShardDirs(cmd.dir, func(db, 
rp, id, path string) error { - if cmd.pattern != "" && strings.Contains(path, cmd.pattern) { + if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error { + if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) { return nil } @@ -218,64 +219,6 @@ func sortKeys(vals map[string]counter) (keys []string) { return keys } -func (cmd *Command) isShardDir(dir string) error { - name := filepath.Base(dir) - if id, err := strconv.Atoi(name); err != nil || id < 1 { - return fmt.Errorf("not a valid shard dir: %v", dir) - } - - return nil -} - -func (cmd *Command) walkShardDirs(root string, fn func(db, rp, id, path string) error) error { - type location struct { - db, rp, id, path string - } - - var dirs []location - if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - if info.IsDir() { - return nil - } - - if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension { - shardDir := filepath.Dir(path) - - if err := cmd.isShardDir(shardDir); err != nil { - return err - } - absPath, err := filepath.Abs(path) - if err != nil { - return err - } - parts := strings.Split(absPath, string(filepath.Separator)) - db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2] - dirs = append(dirs, location{db: db, rp: rp, id: id, path: path}) - return nil - } - return nil - }); err != nil { - return err - } - - sort.Slice(dirs, func(i, j int) bool { - a, _ := strconv.Atoi(dirs[i].id) - b, _ := strconv.Atoi(dirs[j].id) - return a < b - }) - - for _, shard := range dirs { - if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil { - return err - } - } - return nil -} - // printUsage prints the usage message to STDERR. func (cmd *Command) printUsage() { usage := `Displays shard level report. 
diff --git a/cmd/influx_inspect/reportdisk/reportdisk.go b/cmd/influx_inspect/reportdisk/reportdisk.go new file mode 100644 index 0000000000..f3896aa4e8 --- /dev/null +++ b/cmd/influx_inspect/reportdisk/reportdisk.go @@ -0,0 +1,323 @@ +// Package reportdisk reports the disk usage of TSM files by shard and by measurement. +package reportdisk + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "os" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/reporthelper" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +// Command represents the program execution for "influx_inspect report-disk". +type Command struct { + Stderr io.Writer + Stdout io.Writer + + dir string + pattern string + detailed bool +} + +// NewCommand returns a new instance of Command. +func NewCommand() *Command { + return &Command{ + Stderr: os.Stderr, + Stdout: os.Stdout, + } +} + +// Run parses args, sizes every TSM file under the given directory, and prints a JSON report to Stdout. +func (cmd *Command) Run(args ...string) error { + fs := flag.NewFlagSet("report", flag.ExitOnError) + fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern") + fs.BoolVar(&cmd.detailed, "detailed", false, "Report disk size by measurement") + + fs.SetOutput(cmd.Stdout) + fs.Usage = cmd.printUsage + + if err := fs.Parse(args); err != nil { + return err + } + + cmd.dir = fs.Arg(0) + + start := time.Now() + + shardSizes := ShardSizes{} + if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error { + if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) { + return nil + } + + stat, err := os.Stat(path) + if err != nil { + fmt.Fprintf(cmd.Stderr, "error: %s: %v. 
Skipping.\n", path, err) + return nil + } + + shardSizes.AddTsmFileWithSize(db, rp, id, stat.Size()) + return nil + }); err != nil { + return err + } + + measurementSizes := MeasurementSizes{} + if cmd.detailed { + processedFiles := 0 + progress := NewProgressReporter(cmd.Stderr) + + if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error { + if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) { + return nil + } + file, err := os.OpenFile(path, os.O_RDONLY, 0600) + if err != nil { + fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err) + return nil + } + + progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files)) + processedFiles++ + + reader, err := tsm1.NewTSMReader(file) + if err != nil { + fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err) + file.Close() // no reader was returned, so release the handle here + return nil + } + defer reader.Close() // release each TSM file before the walk moves on to the next one + + keyNum := reader.KeyCount() + for i := 0; i < keyNum; i++ { + key, _ := reader.KeyAt(i) + series, _ := tsm1.SeriesAndFieldFromCompositeKey(key) + measurement := models.ParseName(series) + var size int64 + for _, entry := range reader.Entries(key) { + size += int64(entry.Size) + } + measurementSizes.AddSize(db, rp, string(measurement), size) + } + + return nil + }); err != nil { + return err + } + progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files)) + } + fmt.Fprintf(cmd.Stderr, "\nCompleted in %s\n", time.Since(start)) + + sanitize := func(s string) []byte { + b, _ := json.Marshal(s) // json shouldn't be throwing errors when marshalling a string + return b + } + + fmt.Fprintf(cmd.Stdout, `{ + "Summary": {"shards": %d, "tsm_files": %d, "total_tsm_size": %d }, + "Shard": [`, shardSizes.shards, shardSizes.files, shardSizes.totalSize) + + first := true + shardSizes.ForEach(func(db, rp, id string, detail ShardDetails) { + var shardString []byte + if s, err := strconv.ParseInt(id, 10, 64); err == nil && strconv.FormatInt(s, 10) == id { + shardString = []byte(id) + } 
else { + shardString = sanitize(id) + } + if !first { + fmt.Fprint(cmd.Stdout, ",") + } + first = false + fmt.Fprintf(cmd.Stdout, ` + {"db": %s, "rp": %s, "shard": %s, "tsm_files": %d, "size": %d}`, + sanitize(db), sanitize(rp), shardString, detail.files, detail.size) + }) + + if cmd.detailed { + fmt.Fprintf(cmd.Stdout, ` + ], + "Measurement": [`) + + first = true + measurementSizes.ForEach(func(db, rp, measurement string, detail MeasurementDetails) { + if !first { + fmt.Fprint(cmd.Stdout, ",") + } + first = false + fmt.Fprintf(cmd.Stdout, ` + {"db": %s, "rp": %s, "measurement": %s, "size": %d}`, + sanitize(db), sanitize(rp), sanitize(measurement), detail.size) + }) + + } + fmt.Fprintf(cmd.Stdout, ` + ] +} +`, + ) + + return nil +} + +// printUsage prints the usage message to STDOUT. +func (cmd *Command) printUsage() { + usage := `Displays report of disk usage. + +Usage: influx_inspect report-disk [flags] <directory> + + -pattern <pattern> + Include only files matching a pattern. + -detailed + Report disk usage by measurement. + Defaults to "false". 
+` + + fmt.Fprint(cmd.Stdout, usage) +} + +// ShardDetails holds the accumulated size and TSM file count of one shard. +type ShardDetails struct { + size int64 + files int64 +} + +// ShardSizes aggregates TSM file sizes per db, rp and shard id, plus overall totals. +type ShardSizes struct { + m map[string]map[string]map[string]*ShardDetails + files int64 + shards int64 + totalSize int64 +} + +// AddTsmFileWithSize records one TSM file of the given size against shard id in db/rp. +func (s *ShardSizes) AddTsmFileWithSize(db, rp, id string, size int64) { + if s.m == nil { + s.m = make(map[string]map[string]map[string]*ShardDetails) + } + if _, ok := s.m[db]; !ok { + s.m[db] = make(map[string]map[string]*ShardDetails) + } + if _, ok := s.m[db][rp]; !ok { + s.m[db][rp] = make(map[string]*ShardDetails) + } + if _, ok := s.m[db][rp][id]; !ok { + s.m[db][rp][id] = &ShardDetails{} + s.shards += 1 + } + s.m[db][rp][id].size += size + s.m[db][rp][id].files += 1 + s.files += 1 + s.totalSize += size +} + +// ForEach calls f once per shard, ordered by db, rp and then shard id (string order). +func (s *ShardSizes) ForEach(f func(db, rp, id string, detail ShardDetails)) { + dbKeys := make([]string, 0, len(s.m)) + for db := range s.m { + dbKeys = append(dbKeys, db) + } + sort.Strings(dbKeys) + for _, db := range dbKeys { + rpKeys := make([]string, 0, len(s.m[db])) + for rp := range s.m[db] { + rpKeys = append(rpKeys, rp) + } + sort.Strings(rpKeys) + for _, rp := range rpKeys { + idKeys := make([]string, 0, len(s.m[db][rp])) + for id := range s.m[db][rp] { + idKeys = append(idKeys, id) + } + sort.Strings(idKeys) + for _, id := range idKeys { + f(db, rp, id, *s.m[db][rp][id]) + } + } + } +} + +// MeasurementDetails holds the accumulated size recorded for one measurement. +type MeasurementDetails struct { + size int64 +} + +// MeasurementSizes aggregates sizes per db, rp and measurement name. +type MeasurementSizes struct { + m map[string]map[string]map[string]*MeasurementDetails +} + +// AddSize adds size to the running total for measurement in db/rp. +func (s *MeasurementSizes) AddSize(db, rp, measurement string, size int64) { + if s.m == nil { + s.m = make(map[string]map[string]map[string]*MeasurementDetails) + } + if _, ok := s.m[db]; !ok { + s.m[db] = make(map[string]map[string]*MeasurementDetails) + } + if _, ok := s.m[db][rp]; !ok { + s.m[db][rp] = make(map[string]*MeasurementDetails) + } + if _, ok := s.m[db][rp][measurement]; !ok { + s.m[db][rp][measurement] = &MeasurementDetails{} + } + 
s.m[db][rp][measurement].size += size +} + +// ForEach calls f once per measurement, ordered by db, rp and then measurement name. +func (s *MeasurementSizes) ForEach(f func(db, rp, measurement string, detail MeasurementDetails)) { + dbKeys := make([]string, 0, len(s.m)) + for db := range s.m { + dbKeys = append(dbKeys, db) + } + sort.Strings(dbKeys) + for _, db := range dbKeys { + rpKeys := make([]string, 0, len(s.m[db])) + for rp := range s.m[db] { + rpKeys = append(rpKeys, rp) + } + sort.Strings(rpKeys) + for _, rp := range rpKeys { + mKeys := make([]string, 0, len(s.m[db][rp])) + for m := range s.m[db][rp] { + mKeys = append(mKeys, m) + } + sort.Strings(mKeys) + for _, m := range mKeys { + f(db, rp, m, *s.m[db][rp][m]) + } + } + } +} + +// ProgressReporter rewrites a single status line on w, using "\r" to return to the line start. +type ProgressReporter struct { + maxLength int + w io.Writer +} + +// NewProgressReporter returns a ProgressReporter that writes to w. +func NewProgressReporter(w io.Writer) *ProgressReporter { + return &ProgressReporter{w: w} +} + +// Report prints line, space-padded to the longest line so far so leftovers are overwritten. +func (p *ProgressReporter) Report(line string) { + if p.maxLength == 0 { + fmt.Fprintf(p.w, "\n") + p.maxLength = 1 + } + for len(line) < p.maxLength { + line += " " + } + p.maxLength = len(line) + fmt.Fprint(p.w, line+"\r") +} diff --git a/cmd/influx_inspect/reportdisk/reportdisk_test.go b/cmd/influx_inspect/reportdisk/reportdisk_test.go new file mode 100644 index 0000000000..42e3d4ecaa --- /dev/null +++ b/cmd/influx_inspect/reportdisk/reportdisk_test.go @@ -0,0 +1,3 @@ +package reportdisk_test + +// TODO: write some tests diff --git a/pkg/reporthelper/walkshards.go b/pkg/reporthelper/walkshards.go new file mode 100644 index 0000000000..bb3e879a96 --- /dev/null +++ b/pkg/reporthelper/walkshards.go @@ -0,0 +1,76 @@ +// Package reporthelper provides shard-directory helpers shared by the influx_inspect report commands. 
+package reporthelper + +import ( + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +// IsShardDir returns an error unless the base name of dir is a positive integer shard id. +func IsShardDir(dir string) error { + name := filepath.Base(dir) + if id, err := strconv.Atoi(name); err != nil || id < 1 { + return fmt.Errorf("not a valid shard dir: %v", dir) + } + + return nil +} + +// WalkShardDirs finds every TSM file under root and calls fn for each, ordered by numeric shard id; db, rp and id are taken from the last three directories of the file's absolute path. +func WalkShardDirs(root string, fn func(db, rp, id, path string) error) error { + type location struct { + db, rp, id, path string + } + + var dirs []location + if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + return nil + } + + if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension { + shardDir := filepath.Dir(path) + + if err := IsShardDir(shardDir); err != nil { + return err + } + absPath, err := filepath.Abs(path) + if err != nil { + return err + } + parts := strings.Split(absPath, string(filepath.Separator)) + if len(parts) < 4 { + return fmt.Errorf("unexpected TSM file location, want <db>/<rp>/<shard>/<file>: %v", absPath) + } + db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2] + dirs = append(dirs, location{db: db, rp: rp, id: id, path: path}) + return nil + } + return nil + }); err != nil { + return err + } + + sort.Slice(dirs, func(i, j int) bool { + a, _ := strconv.Atoi(dirs[i].id) + b, _ := strconv.Atoi(dirs[j].id) + return a < b + }) + + for _, shard := range dirs { + if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil { + return err + } + } + return nil +} diff --git a/tsdb/engine/tsm1/file_store_key_iterator_test.go b/tsdb/engine/tsm1/file_store_key_iterator_test.go index 8fadf1b10e..abb8a87916 100644 --- a/tsdb/engine/tsm1/file_store_key_iterator_test.go +++ b/tsdb/engine/tsm1/file_store_key_iterator_test.go @@ -149,34 +149,36 @@ func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) { return []byte(t.keys[idx]), BlockFloat64 } -func (*mockTSMFile) Path() string { panic("implement me") } -func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") } 
-func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") } -func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") } -func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") } -func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") } -func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") } -func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") } -func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") } -func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") } -func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") } -func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") } -func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") } -func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") } -func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") } -func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") } -func (*mockTSMFile) HasTombstones() bool { panic("implement me") } -func (*mockTSMFile) TombstoneStats() TombstoneStat { panic("implement me") } -func (*mockTSMFile) Close() error { panic("implement me") } -func (*mockTSMFile) Size() uint32 { panic("implement me") } -func (*mockTSMFile) Rename(path string) error { panic("implement me") } -func (*mockTSMFile) Remove() error { panic("implement me") } -func (*mockTSMFile) InUse() bool { panic("implement me") } -func (*mockTSMFile) Ref() { panic("implement me") } -func (*mockTSMFile) Unref() { panic("implement me") } -func (*mockTSMFile) Stats() FileStat { panic("implement me") } -func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") } -func (*mockTSMFile) Free() error { panic("implement me") } +func (*mockTSMFile) Path() string { 
panic("implement me") } +func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") } +func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") } +func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") } +func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { + panic("implement me") +} +func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") } +func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") } +func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") } +func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") } +func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") } +func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") } +func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") } +func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") } +func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") } +func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") } +func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") } +func (*mockTSMFile) HasTombstones() bool { panic("implement me") } +func (*mockTSMFile) TombstoneStats() TombstoneStat { panic("implement me") } +func (*mockTSMFile) Close() error { panic("implement me") } +func (*mockTSMFile) Size() uint32 { panic("implement me") } +func (*mockTSMFile) Rename(path string) error { panic("implement me") } +func (*mockTSMFile) Remove() error { panic("implement me") } +func (*mockTSMFile) InUse() bool { panic("implement me") } +func (*mockTSMFile) Ref() { panic("implement me") } +func (*mockTSMFile) Unref() { panic("implement me") } +func (*mockTSMFile) Stats() FileStat { panic("implement me") } +func (*mockTSMFile) BlockIterator() *BlockIterator { 
panic("implement me") } +func (*mockTSMFile) Free() error { panic("implement me") } func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) { panic("implement me") diff --git a/tsdb/engine/tsm1/ring_test.go b/tsdb/engine/tsm1/ring_test.go index 394de7246e..c8bea3abf1 100644 --- a/tsdb/engine/tsm1/ring_test.go +++ b/tsdb/engine/tsm1/ring_test.go @@ -79,7 +79,9 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) { } } -func BenchmarkRing_getPartition_100(b *testing.B) { benchmarkRingGetPartition(b, MustNewRing(256), 100) } +func BenchmarkRing_getPartition_100(b *testing.B) { + benchmarkRingGetPartition(b, MustNewRing(256), 100) +} func BenchmarkRing_getPartition_1000(b *testing.B) { benchmarkRingGetPartition(b, MustNewRing(256), 1000) }