Merge pull request #20647 from lesam/add-diskusage-inspect

feat(inspect): Add report-disk for disk usage by measurement
pull/20664/head
Sam Arnold 2021-01-29 11:36:31 -05:00 committed by GitHub
commit b2f0d05ecc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 394 additions and 61 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"github.com/influxdata/influxdb/cmd/influx_inspect/reportdisk"
"io"
"log"
"os"
@ -98,6 +99,11 @@ func (m *Main) Run(args ...string) error {
if err := name.Run(args...); err != nil {
return fmt.Errorf("report: %s", err)
}
case "report-disk":
name := reportdisk.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("report: %s", err)
}
case "reporttsi":
name := reporttsi.NewCommand()
if err := name.Run(args...); err != nil {

View File

@ -4,6 +4,7 @@ package report
import (
"flag"
"fmt"
"github.com/influxdata/influxdb/pkg/reporthelper"
"io"
"math"
"os"
@ -60,7 +61,7 @@ func (cmd *Command) Run(args ...string) error {
cmd.dir = fs.Arg(0)
err := cmd.isShardDir(cmd.dir)
err := reporthelper.IsShardDir(cmd.dir)
if cmd.detailed && err != nil {
return fmt.Errorf("-detailed only supported for shard dirs")
}
@ -79,8 +80,8 @@ func (cmd *Command) Run(args ...string) error {
minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64)
var fileCount int
if err := cmd.walkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && strings.Contains(path, cmd.pattern) {
if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
return nil
}
@ -218,64 +219,6 @@ func sortKeys(vals map[string]counter) (keys []string) {
return keys
}
func (cmd *Command) isShardDir(dir string) error {
name := filepath.Base(dir)
if id, err := strconv.Atoi(name); err != nil || id < 1 {
return fmt.Errorf("not a valid shard dir: %v", dir)
}
return nil
}
func (cmd *Command) walkShardDirs(root string, fn func(db, rp, id, path string) error) error {
type location struct {
db, rp, id, path string
}
var dirs []location
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
shardDir := filepath.Dir(path)
if err := cmd.isShardDir(shardDir); err != nil {
return err
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
parts := strings.Split(absPath, string(filepath.Separator))
db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
dirs = append(dirs, location{db: db, rp: rp, id: id, path: path})
return nil
}
return nil
}); err != nil {
return err
}
sort.Slice(dirs, func(i, j int) bool {
a, _ := strconv.Atoi(dirs[i].id)
b, _ := strconv.Atoi(dirs[j].id)
return a < b
})
for _, shard := range dirs {
if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
return err
}
}
return nil
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := `Displays shard level report.

View File

@ -0,0 +1,311 @@
// Package report reports statistics about TSM files.
package reportdisk
import (
"encoding/json"
"flag"
"fmt"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/reporthelper"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Command represents the program execution for "influxd report".
type Command struct {
Stderr io.Writer
Stdout io.Writer
dir string
pattern string
detailed bool
}
// NewCommand returns a new instance of Command.
func NewCommand() *Command {
return &Command{
Stderr: os.Stderr,
Stdout: os.Stdout,
}
}
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("report", flag.ExitOnError)
fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
fs.BoolVar(&cmd.detailed, "detailed", false, "Report disk size by measurement")
fs.SetOutput(cmd.Stdout)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return err
}
cmd.dir = fs.Arg(0)
start := time.Now()
shardSizes := ShardSizes{}
if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
return nil
}
stat, err := os.Stat(path)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
return nil
}
shardSizes.AddTsmFileWithSize(db, rp, id, stat.Size())
return nil
}); err != nil {
return err
}
measurementSizes := MeasurementSizes{}
if cmd.detailed {
processedFiles := 0
progress := NewProgressReporter(cmd.Stderr)
if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
return nil
}
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
return nil
}
progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files))
processedFiles++
reader, err := tsm1.NewTSMReader(file)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
return nil
}
keyNum := reader.KeyCount()
for i := 0; i < keyNum; i++ {
key, _ := reader.KeyAt(i)
series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
measurement := models.ParseName(series)
var size int64
for _, entry := range reader.Entries(key) {
size += int64(entry.Size)
}
measurementSizes.AddSize(db, rp, string(measurement), size)
}
return nil
}); err != nil {
return err
}
progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files))
}
fmt.Fprintf(cmd.Stderr, "\nCompleted in %s\n", time.Since(start))
sanitize := func(s string) []byte {
b, _ := json.Marshal(s) // json shouldn't be throwing errors when marshalling a string
return b
}
fmt.Fprintf(cmd.Stdout, `{
"Summary": {"shards": %d, "tsm_files": %d, "total_tsm_size": %d },
"Shard": [`, shardSizes.shards, shardSizes.files, shardSizes.totalSize)
first := true
shardSizes.ForEach(func(db, rp, id string, detail ShardDetails) {
var shardString []byte
if s, err := strconv.ParseInt(id, 10, 64); err != nil && strconv.FormatInt(s, 10) == id {
shardString = []byte(id)
} else {
shardString = sanitize(id)
}
if !first {
fmt.Fprint(cmd.Stdout, ",")
}
first = false
fmt.Fprintf(cmd.Stdout, `
{"db": %s, "rp": %s, "shard": %s, "tsm_files": %d, "size": %d}`,
sanitize(db), sanitize(rp), shardString, detail.files, detail.size)
})
if cmd.detailed {
fmt.Fprintf(cmd.Stdout, `
],
"Measurement": [`)
first = true
measurementSizes.ForEach(func(db, rp, measurement string, detail MeasurementDetails) {
if !first {
fmt.Fprint(cmd.Stdout, ",")
}
first = false
fmt.Fprintf(cmd.Stdout, `
{"db": %s, "rp": %s, "measurement": %s, "size": %d}`,
sanitize(db), sanitize(rp), sanitize(measurement), detail.size)
})
}
fmt.Fprintf(cmd.Stdout, `
]
}
`,
)
return nil
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := `Displays report of disk usage.
Usage: influx_inspect report [flags] <directory>
-pattern <pattern>
Include only files matching a pattern.
-detailed
Report disk usage by measurement.
Defaults to "false".
`
fmt.Fprintf(cmd.Stdout, usage)
}
type ShardDetails struct {
size int64
files int64
}
type ShardSizes struct {
m map[string]map[string]map[string]*ShardDetails
files int64
shards int64
totalSize int64
}
func (s *ShardSizes) AddTsmFileWithSize(db, rp, id string, size int64) {
if s.m == nil {
s.m = make(map[string]map[string]map[string]*ShardDetails)
}
if _, ok := s.m[db]; !ok {
s.m[db] = make(map[string]map[string]*ShardDetails)
}
if _, ok := s.m[db][rp]; !ok {
s.m[db][rp] = make(map[string]*ShardDetails)
}
if _, ok := s.m[db][rp][id]; !ok {
s.m[db][rp][id] = &ShardDetails{}
s.shards += 1
}
s.m[db][rp][id].size += size
s.m[db][rp][id].files += 1
s.files += 1
s.totalSize += size
}
func (s *ShardSizes) ForEach(f func(db, rp, id string, detail ShardDetails)) {
dbKeys := make([]string, 0, len(s.m))
for db, _ := range s.m {
dbKeys = append(dbKeys, db)
}
sort.Strings(dbKeys)
for _, db := range dbKeys {
rpKeys := make([]string, 0, len(s.m[db]))
for rp, _ := range s.m[db] {
rpKeys = append(rpKeys, rp)
}
sort.Strings(rpKeys)
for _, rp := range rpKeys {
idKeys := make([]string, 0, len(s.m[db][rp]))
for id, _ := range s.m[db][rp] {
idKeys = append(idKeys, id)
}
sort.Strings(idKeys)
for _, id := range idKeys {
f(db, rp, id, *s.m[db][rp][id])
}
}
}
}
type MeasurementDetails struct {
size int64
}
type MeasurementSizes struct {
m map[string]map[string]map[string]*MeasurementDetails
}
func (s *MeasurementSizes) AddSize(db, rp, measurement string, size int64) {
if s.m == nil {
s.m = make(map[string]map[string]map[string]*MeasurementDetails)
}
if _, ok := s.m[db]; !ok {
s.m[db] = make(map[string]map[string]*MeasurementDetails)
}
if _, ok := s.m[db][rp]; !ok {
s.m[db][rp] = make(map[string]*MeasurementDetails)
}
if _, ok := s.m[db][rp][measurement]; !ok {
s.m[db][rp][measurement] = &MeasurementDetails{}
}
s.m[db][rp][measurement].size += size
}
func (s *MeasurementSizes) ForEach(f func(db, rp, measurement string, detail MeasurementDetails)) {
dbKeys := make([]string, 0, len(s.m))
for db, _ := range s.m {
dbKeys = append(dbKeys, db)
}
sort.Strings(dbKeys)
for _, db := range dbKeys {
rpKeys := make([]string, 0, len(s.m[db]))
for rp, _ := range s.m[db] {
rpKeys = append(rpKeys, rp)
}
sort.Strings(rpKeys)
for _, rp := range rpKeys {
mKeys := make([]string, 0, len(s.m[db][rp]))
for m, _ := range s.m[db][rp] {
mKeys = append(mKeys, m)
}
sort.Strings(mKeys)
for _, m := range mKeys {
f(db, rp, m, *s.m[db][rp][m])
}
}
}
}
type ProgressReporter struct {
maxLength int
w io.Writer
}
func NewProgressReporter(w io.Writer) *ProgressReporter {
return &ProgressReporter{w: w}
}
func (p *ProgressReporter) Report(line string) {
if p.maxLength == 0 {
fmt.Fprintf(p.w, "\n")
p.maxLength = 1
}
for len(line) < p.maxLength {
line += " "
}
p.maxLength = len(line)
p.maxLength++
fmt.Fprint(p.w, line+"\r")
}

View File

@ -0,0 +1,3 @@
package reportdisk_test
// TODO: write some tests

View File

@ -0,0 +1,70 @@
// Package report reports statistics about TSM files.
package reporthelper
import (
"fmt"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
func IsShardDir(dir string) error {
name := filepath.Base(dir)
if id, err := strconv.Atoi(name); err != nil || id < 1 {
return fmt.Errorf("not a valid shard dir: %v", dir)
}
return nil
}
func WalkShardDirs(root string, fn func(db, rp, id, path string) error) error {
type location struct {
db, rp, id, path string
}
var dirs []location
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
shardDir := filepath.Dir(path)
if err := IsShardDir(shardDir); err != nil {
return err
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
parts := strings.Split(absPath, string(filepath.Separator))
db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
dirs = append(dirs, location{db: db, rp: rp, id: id, path: path})
return nil
}
return nil
}); err != nil {
return err
}
sort.Slice(dirs, func(i, j int) bool {
a, _ := strconv.Atoi(dirs[i].id)
b, _ := strconv.Atoi(dirs[j].id)
return a < b
})
for _, shard := range dirs {
if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
return err
}
}
return nil
}