influxdb/cmd/influx_inspect/reportdisk/reportdisk.go

312 lines
7.3 KiB
Go

// Package reportdisk reports disk usage statistics about TSM files.
package reportdisk
import (
"encoding/json"
"flag"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/reporthelper"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Command holds the configuration and output streams for one execution of
// the disk usage report.
type Command struct {
	// Stderr receives progress and per-file error messages; Stdout receives
	// the JSON report and the usage text.
	Stderr, Stdout io.Writer

	// dir is the directory to scan (first positional argument), pattern is
	// an optional substring that scanned paths must contain, and detailed
	// enables the per-measurement breakdown.
	dir, pattern string
	detailed     bool
}
// NewCommand returns a Command whose output streams are wired to the
// process's standard error and standard output.
func NewCommand() *Command {
	cmd := new(Command)
	cmd.Stderr = os.Stderr
	cmd.Stdout = os.Stdout
	return cmd
}
// Run executes the command.
//
// args holds the command-line flags followed by the directory to scan. Run
// makes one pass over the paths reported by reporthelper.WalkShardDirs to
// total file counts and sizes per shard and, when -detailed is set, a second
// pass that opens each path as a TSM file and attributes the size of the
// entries of every key to the key's measurement.
//
// Progress, per-file errors and the elapsed time are written to cmd.Stderr;
// the report itself is written to cmd.Stdout as JSON. Files that cannot be
// stat'ed, opened or read are reported and skipped; only errors from flag
// parsing or from the directory walk itself are returned.
func (cmd *Command) Run(args ...string) error {
	fs := flag.NewFlagSet("report", flag.ExitOnError)
	fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
	fs.BoolVar(&cmd.detailed, "detailed", false, "Report disk size by measurement")
	fs.SetOutput(cmd.Stdout)
	fs.Usage = cmd.printUsage

	if err := fs.Parse(args); err != nil {
		return err
	}
	cmd.dir = fs.Arg(0)

	start := time.Now()

	// First pass: file sizes only, no file is opened.
	shardSizes := ShardSizes{}
	if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
		if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
			return nil
		}
		stat, err := os.Stat(path)
		if err != nil {
			fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
			return nil
		}
		shardSizes.AddTsmFileWithSize(db, rp, id, stat.Size())
		return nil
	}); err != nil {
		return err
	}

	// Second pass (optional): open every file and break sizes down by
	// measurement.
	measurementSizes := MeasurementSizes{}
	if cmd.detailed {
		processedFiles := 0
		progress := NewProgressReporter(cmd.Stderr)

		if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
			if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
				return nil
			}
			file, err := os.OpenFile(path, os.O_RDONLY, 0600)
			if err != nil {
				fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
				return nil
			}
			// The callback runs once per file, so this defer releases the
			// descriptor before the walk moves on. If the reader below has
			// already closed the file, the second Close only returns an
			// error, which is deliberately ignored.
			defer file.Close()

			progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files))
			processedFiles++

			reader, err := tsm1.NewTSMReader(file)
			if err != nil {
				fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
				return nil
			}
			// Release whatever the reader holds for this file; deferred calls
			// run last-in-first-out, so this happens before file.Close above.
			defer reader.Close()

			keyNum := reader.KeyCount()
			for i := 0; i < keyNum; i++ {
				key, _ := reader.KeyAt(i)
				series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
				measurement := models.ParseName(series)

				var size int64
				for _, entry := range reader.Entries(key) {
					size += int64(entry.Size)
				}
				measurementSizes.AddSize(db, rp, string(measurement), size)
			}
			return nil
		}); err != nil {
			return err
		}
		// Final update so the counter shows the total after the last file.
		progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files))
	}

	fmt.Fprintf(cmd.Stderr, "\nCompleted in %s\n", time.Since(start))

	// sanitize renders s as a quoted, escaped JSON string.
	sanitize := func(s string) []byte {
		b, _ := json.Marshal(s) // json shouldn't be throwing errors when marshalling a string
		return b
	}

	fmt.Fprintf(cmd.Stdout, `{
"Summary": {"shards": %d, "tsm_files": %d, "total_tsm_size": %d },
"Shard": [`, shardSizes.shards, shardSizes.files, shardSizes.totalSize)

	first := true
	shardSizes.ForEach(func(db, rp, id string, detail ShardDetails) {
		// Emit the shard id as a bare JSON number only when it is an integer
		// in canonical form; the round trip through FormatInt rejects
		// spellings such as "+1" or "007" that are not valid JSON numbers.
		// Anything else is emitted as a JSON string.
		var shardString []byte
		if n, err := strconv.ParseInt(id, 10, 64); err == nil && strconv.FormatInt(n, 10) == id {
			shardString = []byte(id)
		} else {
			shardString = sanitize(id)
		}

		if !first {
			fmt.Fprint(cmd.Stdout, ",")
		}
		first = false
		fmt.Fprintf(cmd.Stdout, `
{"db": %s, "rp": %s, "shard": %s, "tsm_files": %d, "size": %d}`,
			sanitize(db), sanitize(rp), shardString, detail.files, detail.size)
	})

	if cmd.detailed {
		fmt.Fprint(cmd.Stdout, `
],
"Measurement": [`)

		first = true
		measurementSizes.ForEach(func(db, rp, measurement string, detail MeasurementDetails) {
			if !first {
				fmt.Fprint(cmd.Stdout, ",")
			}
			first = false
			fmt.Fprintf(cmd.Stdout, `
{"db": %s, "rp": %s, "measurement": %s, "size": %d}`,
				sanitize(db), sanitize(rp), sanitize(measurement), detail.size)
		})
	}

	fmt.Fprint(cmd.Stdout, `
]
}
`)
	return nil
}
// printUsage writes the usage message to cmd.Stdout.
func (cmd *Command) printUsage() {
	usage := `Displays report of disk usage.
Usage: influx_inspect report [flags] <directory>
-pattern <pattern>
Include only files matching a pattern.
-detailed
Report disk usage by measurement.
Defaults to "false".
`
	// Fprint, not Fprintf: the usage text is plain output, not a format
	// string, so a future '%' in it must not be interpreted as a verb.
	fmt.Fprint(cmd.Stdout, usage)
}
// ShardDetails holds the totals accumulated for a single shard.
type ShardDetails struct {
	size  int64 // sum of the sizes passed to AddTsmFileWithSize for the shard
	files int64 // number of files recorded for the shard
}

// ShardSizes aggregates file counts and sizes per shard, keyed by database,
// then retention policy, then shard id, and keeps running grand totals.
// The zero value is ready to use.
type ShardSizes struct {
	m         map[string]map[string]map[string]*ShardDetails
	files     int64 // total number of files recorded
	shards    int64 // number of distinct (db, rp, id) combinations seen
	totalSize int64 // sum of all recorded sizes
}

// AddTsmFileWithSize records one file of the given size against the shard
// identified by db, rp and id, creating the intermediate maps on first use
// and updating both the shard's totals and the grand totals.
func (s *ShardSizes) AddTsmFileWithSize(db, rp, id string, size int64) {
	if s.m == nil {
		s.m = make(map[string]map[string]map[string]*ShardDetails)
	}

	byRP, known := s.m[db]
	if !known {
		byRP = make(map[string]map[string]*ShardDetails)
		s.m[db] = byRP
	}

	byID, known := byRP[rp]
	if !known {
		byID = make(map[string]*ShardDetails)
		byRP[rp] = byID
	}

	shard, known := byID[id]
	if !known {
		shard = &ShardDetails{}
		byID[id] = shard
		s.shards++
	}

	shard.size += size
	shard.files++
	s.files++
	s.totalSize += size
}

// ForEach calls f once per recorded shard with a copy of its details.
// Shards are visited in ascending string order of database, then retention
// policy, then shard id; ids are compared as strings, so "10" precedes "2".
func (s *ShardSizes) ForEach(f func(db, rp, id string, detail ShardDetails)) {
	databases := make([]string, 0, len(s.m))
	for name := range s.m {
		databases = append(databases, name)
	}
	sort.Strings(databases)

	for _, db := range databases {
		byRP := s.m[db]
		policies := make([]string, 0, len(byRP))
		for name := range byRP {
			policies = append(policies, name)
		}
		sort.Strings(policies)

		for _, rp := range policies {
			byID := byRP[rp]
			ids := make([]string, 0, len(byID))
			for name := range byID {
				ids = append(ids, name)
			}
			sort.Strings(ids)

			for _, id := range ids {
				f(db, rp, id, *byID[id])
			}
		}
	}
}
// MeasurementDetails holds the total accumulated for a single measurement.
type MeasurementDetails struct {
	size int64 // sum of the sizes passed to AddSize for the measurement
}

// MeasurementSizes aggregates sizes per measurement, keyed by database, then
// retention policy, then measurement name. The zero value is ready to use.
type MeasurementSizes struct {
	m map[string]map[string]map[string]*MeasurementDetails
}

// AddSize adds size to the running total of the measurement identified by
// db, rp and measurement, creating the intermediate maps on first use.
func (s *MeasurementSizes) AddSize(db, rp, measurement string, size int64) {
	if s.m == nil {
		s.m = make(map[string]map[string]map[string]*MeasurementDetails)
	}

	byRP, known := s.m[db]
	if !known {
		byRP = make(map[string]map[string]*MeasurementDetails)
		s.m[db] = byRP
	}

	byName, known := byRP[rp]
	if !known {
		byName = make(map[string]*MeasurementDetails)
		byRP[rp] = byName
	}

	total, known := byName[measurement]
	if !known {
		total = &MeasurementDetails{}
		byName[measurement] = total
	}
	total.size += size
}

// ForEach calls f once per recorded measurement with a copy of its details,
// visiting them in ascending string order of database, then retention
// policy, then measurement name.
func (s *MeasurementSizes) ForEach(f func(db, rp, measurement string, detail MeasurementDetails)) {
	databases := make([]string, 0, len(s.m))
	for name := range s.m {
		databases = append(databases, name)
	}
	sort.Strings(databases)

	for _, db := range databases {
		byRP := s.m[db]
		policies := make([]string, 0, len(byRP))
		for name := range byRP {
			policies = append(policies, name)
		}
		sort.Strings(policies)

		for _, rp := range policies {
			byName := byRP[rp]
			names := make([]string, 0, len(byName))
			for name := range byName {
				names = append(names, name)
			}
			sort.Strings(names)

			for _, name := range names {
				f(db, rp, name, *byName[name])
			}
		}
	}
}
// ProgressReporter writes a single, repeatedly overwritten status line to a
// writer by ending every line with a carriage return.
type ProgressReporter struct {
	// maxLength is zero until the first report; afterwards it is one more
	// than the width of the widest line written so far, so that zero can
	// double as the "nothing written yet" marker.
	maxLength int
	w         io.Writer
}

// NewProgressReporter returns a ProgressReporter that writes to w.
func NewProgressReporter(w io.Writer) *ProgressReporter {
	return &ProgressReporter{w: w}
}

// Report writes line followed by a carriage return, so the next report
// overwrites it. The first call writes a newline first to start on a fresh
// row. A line shorter than the widest one written so far is padded with
// spaces to that width so no stale characters remain visible; the padding
// never exceeds that width, so repeated reports do not grow the line.
func (p *ProgressReporter) Report(line string) {
	if p.maxLength == 0 {
		fmt.Fprintf(p.w, "\n")
		p.maxLength = 1
	}
	if pad := p.maxLength - 1 - len(line); pad > 0 {
		line += strings.Repeat(" ", pad)
	}
	p.maxLength = len(line) + 1
	fmt.Fprint(p.w, line+"\r")
}