feat(inspect): Add report-disk for disk usage by measurement (#20917)

* feat(inspect): Add report-disk for disk usage by measurement

(cherry picked from commit a6152e8ac1)

* fix(inspect): bad pattern matching

(cherry picked from commit 3a31e2370e)

* chore: fix goimports

* chore: update changelog
pull/20989/head v1.8.5rc0
Sam Arnold 2021-03-11 14:25:55 -05:00 committed by GitHub
parent 8a31b0ebdc
commit e95a6a4a4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 429 additions and 90 deletions

View File

@ -3,6 +3,7 @@ v1.8.5 [unreleased]
### Features
- [#20917](https://github.com/influxdata/influxdb/pull/20917): feat(inspect): Add report-disk for disk usage by measurement
- [#20118](https://github.com/influxdata/influxdb/pull/20118): feat: Optimize shard lookups in groups containing only one shard. Thanks @StoneYunZhao!
- [#20910](https://github.com/influxdata/influxdb/pull/20910): feat: Make meta queries respect QueryTimeout values

View File

@ -16,6 +16,7 @@ import (
"github.com/influxdata/influxdb/cmd/influx_inspect/export"
"github.com/influxdata/influxdb/cmd/influx_inspect/help"
"github.com/influxdata/influxdb/cmd/influx_inspect/report"
"github.com/influxdata/influxdb/cmd/influx_inspect/reportdisk"
"github.com/influxdata/influxdb/cmd/influx_inspect/reporttsi"
"github.com/influxdata/influxdb/cmd/influx_inspect/verify/seriesfile"
"github.com/influxdata/influxdb/cmd/influx_inspect/verify/tombstone"
@ -98,6 +99,11 @@ func (m *Main) Run(args ...string) error {
if err := name.Run(args...); err != nil {
return fmt.Errorf("report: %s", err)
}
case "report-disk":
name := reportdisk.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("report: %s", err)
}
case "reporttsi":
name := reporttsi.NewCommand()
if err := name.Run(args...); err != nil {

View File

@ -15,6 +15,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/reporthelper"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/retailnext/hllpp"
)
@ -60,7 +61,7 @@ func (cmd *Command) Run(args ...string) error {
cmd.dir = fs.Arg(0)
err := cmd.isShardDir(cmd.dir)
err := reporthelper.IsShardDir(cmd.dir)
if cmd.detailed && err != nil {
return fmt.Errorf("-detailed only supported for shard dirs")
}
@ -79,8 +80,8 @@ func (cmd *Command) Run(args ...string) error {
minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64)
var fileCount int
if err := cmd.walkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && strings.Contains(path, cmd.pattern) {
if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
return nil
}
@ -218,64 +219,6 @@ func sortKeys(vals map[string]counter) (keys []string) {
return keys
}
// isShardDir checks whether the last path element of dir looks like a shard
// ID: it must parse as a decimal integer and be at least 1. It returns nil
// when that holds and a descriptive error otherwise.
func (cmd *Command) isShardDir(dir string) error {
name := filepath.Base(dir)
// A parse failure and an ID below 1 are reported with the same error.
if id, err := strconv.Atoi(name); err != nil || id < 1 {
return fmt.Errorf("not a valid shard dir: %v", dir)
}
return nil
}
// walkShardDirs walks the tree under root, collects every file whose
// extension is the TSM file extension, sorts the collected files by numeric
// shard ID and then calls fn once per file with the database, retention
// policy and shard ID taken from the file's absolute path, plus the path as
// seen by the walk.
//
// The walk stops with an error if a TSM file sits in a directory that
// isShardDir rejects, or if fn returns an error.
func (cmd *Command) walkShardDirs(root string, fn func(db, rp, id, path string) error) error {
type location struct {
db, rp, id, path string
}
var dirs []location
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Directories themselves are not recorded; only the files inside them are.
if info.IsDir() {
return nil
}
if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
shardDir := filepath.Dir(path)
if err := cmd.isShardDir(shardDir); err != nil {
return err
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
// The last four path elements are read as <db>/<rp>/<shard id>/<file>.
// NOTE(review): this indexing assumes the absolute path has at least four
// elements; a shard directory directly under the filesystem root would
// index out of range. Confirm whether that can occur.
parts := strings.Split(absPath, string(filepath.Separator))
db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
dirs = append(dirs, location{db: db, rp: rp, id: id, path: path})
return nil
}
return nil
}); err != nil {
return err
}
// Order by numeric shard ID. Atoi errors are ignored here because the
// directory name already passed isShardDir above.
sort.Slice(dirs, func(i, j int) bool {
a, _ := strconv.Atoi(dirs[i].id)
b, _ := strconv.Atoi(dirs[j].id)
return a < b
})
for _, shard := range dirs {
if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
return err
}
}
return nil
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := `Displays shard level report.

View File

@ -0,0 +1,311 @@
// Package reportdisk reports the disk usage of TSM files by shard and, optionally, by measurement.
package reportdisk
import (
"encoding/json"
"flag"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/reporthelper"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Command represents the program execution for "influx_inspect report-disk".
type Command struct {
// Stderr receives progress output, per-file error messages and the
// completion time.
Stderr io.Writer
// Stdout receives the JSON report and the usage message.
Stdout io.Writer
// dir is the directory to walk, taken from the first positional argument.
dir string
// pattern, when non-empty, restricts the report to files whose path
// contains it as a substring.
pattern string
// detailed enables the second pass that sums block sizes per measurement.
detailed bool
}
// NewCommand builds a Command whose output streams are wired to the
// process's standard error and standard output.
func NewCommand() *Command {
	cmd := new(Command)
	cmd.Stderr = os.Stderr
	cmd.Stdout = os.Stdout
	return cmd
}
// Run executes the report-disk command.
//
// args holds the arguments that follow the sub-command name: the optional
// -pattern and -detailed flags and then the directory to inspect. Run walks
// that directory once to total TSM file sizes per shard and, when -detailed
// is set, a second time to total index-entry sizes per measurement.
//
// Progress, per-file errors and the elapsed time go to cmd.Stderr; the JSON
// report goes to cmd.Stdout. Files that cannot be stat'ed or opened are
// reported and skipped. An error is returned only when flag parsing or the
// directory walk itself fails.
func (cmd *Command) Run(args ...string) error {
	fs := flag.NewFlagSet("report-disk", flag.ExitOnError)
	fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
	fs.BoolVar(&cmd.detailed, "detailed", false, "Report disk size by measurement")

	fs.SetOutput(cmd.Stdout)
	fs.Usage = cmd.printUsage

	if err := fs.Parse(args); err != nil {
		return err
	}

	cmd.dir = fs.Arg(0)

	start := time.Now()

	// First pass: file sizes per shard. Only os.Stat is needed, so this is cheap.
	shardSizes := ShardSizes{}
	if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
		if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
			return nil
		}

		stat, err := os.Stat(path)
		if err != nil {
			fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
			return nil
		}

		shardSizes.AddTsmFileWithSize(db, rp, id, stat.Size())
		return nil
	}); err != nil {
		return err
	}

	// Second pass (optional): open every TSM file and sum its index entry
	// sizes per measurement.
	measurementSizes := MeasurementSizes{}
	if cmd.detailed {
		processedFiles := 0
		progress := NewProgressReporter(cmd.Stderr)

		if err := reporthelper.WalkShardDirs(cmd.dir, func(db, rp, id, path string) error {
			if cmd.pattern != "" && !strings.Contains(path, cmd.pattern) {
				return nil
			}

			file, err := os.OpenFile(path, os.O_RDONLY, 0600)
			if err != nil {
				fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
				return nil
			}

			progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files))
			processedFiles++

			reader, err := tsm1.NewTSMReader(file)
			if err != nil {
				fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
				// No reader was returned to release the file, so close it here.
				// The close error is ignored: the file is being skipped anyway.
				_ = file.Close()
				return nil
			}
			// This callback runs once per file, so the deferred Close releases
			// the reader before the next file is opened.
			defer reader.Close()

			keyNum := reader.KeyCount()
			for i := 0; i < keyNum; i++ {
				key, _ := reader.KeyAt(i)
				series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
				measurement := models.ParseName(series)

				var size int64
				for _, entry := range reader.Entries(key) {
					size += int64(entry.Size)
				}
				measurementSizes.AddSize(db, rp, string(measurement), size)
			}
			return nil
		}); err != nil {
			return err
		}
		progress.Report(fmt.Sprintf("TSM files inspected: %d\t/%d", processedFiles, shardSizes.files))
	}

	fmt.Fprintf(cmd.Stderr, "\nCompleted in %s\n", time.Since(start))

	// sanitize renders s as a quoted, escaped JSON string.
	sanitize := func(s string) []byte {
		b, _ := json.Marshal(s) // json shouldn't be throwing errors when marshalling a string
		return b
	}

	fmt.Fprintf(cmd.Stdout, `{
"Summary": {"shards": %d, "tsm_files": %d, "total_tsm_size": %d },
"Shard": [`, shardSizes.shards, shardSizes.files, shardSizes.totalSize)

	first := true
	shardSizes.ForEach(func(db, rp, id string, detail ShardDetails) {
		// Emit the shard ID as a bare JSON number only when it is a canonical
		// decimal integer (parses cleanly and formats back to the same text);
		// anything else is emitted as a JSON string.
		var shardString []byte
		if s, err := strconv.ParseInt(id, 10, 64); err == nil && strconv.FormatInt(s, 10) == id {
			shardString = []byte(id)
		} else {
			shardString = sanitize(id)
		}

		if !first {
			fmt.Fprint(cmd.Stdout, ",")
		}
		first = false
		fmt.Fprintf(cmd.Stdout, `
{"db": %s, "rp": %s, "shard": %s, "tsm_files": %d, "size": %d}`,
			sanitize(db), sanitize(rp), shardString, detail.files, detail.size)
	})

	if cmd.detailed {
		fmt.Fprintf(cmd.Stdout, `
],
"Measurement": [`)

		first = true
		measurementSizes.ForEach(func(db, rp, measurement string, detail MeasurementDetails) {
			if !first {
				fmt.Fprint(cmd.Stdout, ",")
			}
			first = false
			fmt.Fprintf(cmd.Stdout, `
{"db": %s, "rp": %s, "measurement": %s, "size": %d}`,
				sanitize(db), sanitize(rp), sanitize(measurement), detail.size)
		})
	}

	fmt.Fprintf(cmd.Stdout, `
]
}
`,
	)

	return nil
}
// printUsage prints the usage message to cmd.Stdout.
func (cmd *Command) printUsage() {
	usage := `Displays report of disk usage.
Usage: influx_inspect report-disk [flags] <directory>
-pattern <pattern>
Include only files matching a pattern.
-detailed
Report disk usage by measurement.
Defaults to "false".
`
	// Fprint rather than Fprintf: usage is plain text, not a format string.
	fmt.Fprint(cmd.Stdout, usage)
}
// ShardDetails holds the totals accumulated for a single shard.
type ShardDetails struct {
	size  int64 // sum of the sizes passed to AddTsmFileWithSize for this shard
	files int64 // number of AddTsmFileWithSize calls for this shard
}

// ShardSizes accumulates TSM file counts and sizes keyed by database,
// retention policy and shard ID, together with overall totals. The zero
// value is ready to use.
type ShardSizes struct {
	m         map[string]map[string]map[string]*ShardDetails
	files     int64 // files recorded across all shards
	shards    int64 // distinct (db, rp, id) combinations seen
	totalSize int64 // sum of all recorded sizes
}

// AddTsmFileWithSize records one TSM file of the given size against shard id
// of retention policy rp in database db, creating any missing map levels.
func (s *ShardSizes) AddTsmFileWithSize(db, rp, id string, size int64) {
	if s.m == nil {
		s.m = make(map[string]map[string]map[string]*ShardDetails)
	}

	byRP, found := s.m[db]
	if !found {
		byRP = make(map[string]map[string]*ShardDetails)
		s.m[db] = byRP
	}

	byID, found := byRP[rp]
	if !found {
		byID = make(map[string]*ShardDetails)
		byRP[rp] = byID
	}

	detail, found := byID[id]
	if !found {
		detail = &ShardDetails{}
		byID[id] = detail
		s.shards++
	}

	detail.size += size
	detail.files++
	s.files++
	s.totalSize += size
}

// ForEach calls f once per recorded shard with a copy of its details. Shards
// are visited in ascending string order of database, then retention policy,
// then shard ID.
func (s *ShardSizes) ForEach(f func(db, rp, id string, detail ShardDetails)) {
	dbNames := make([]string, 0, len(s.m))
	for name := range s.m {
		dbNames = append(dbNames, name)
	}
	sort.Strings(dbNames)

	for _, db := range dbNames {
		byRP := s.m[db]
		rpNames := make([]string, 0, len(byRP))
		for name := range byRP {
			rpNames = append(rpNames, name)
		}
		sort.Strings(rpNames)

		for _, rp := range rpNames {
			byID := byRP[rp]
			ids := make([]string, 0, len(byID))
			for shardID := range byID {
				ids = append(ids, shardID)
			}
			sort.Strings(ids)

			for _, id := range ids {
				f(db, rp, id, *byID[id])
			}
		}
	}
}
// MeasurementDetails holds the total accumulated for a single measurement.
type MeasurementDetails struct {
	size int64 // sum of the sizes passed to AddSize for this measurement
}

// MeasurementSizes accumulates sizes keyed by database, retention policy and
// measurement name. The zero value is ready to use.
type MeasurementSizes struct {
	m map[string]map[string]map[string]*MeasurementDetails
}

// AddSize adds size to the running total of measurement within retention
// policy rp of database db, creating any missing map levels.
func (s *MeasurementSizes) AddSize(db, rp, measurement string, size int64) {
	if s.m == nil {
		s.m = make(map[string]map[string]map[string]*MeasurementDetails)
	}

	perRP, present := s.m[db]
	if !present {
		perRP = make(map[string]map[string]*MeasurementDetails)
		s.m[db] = perRP
	}

	perMeasurement, present := perRP[rp]
	if !present {
		perMeasurement = make(map[string]*MeasurementDetails)
		perRP[rp] = perMeasurement
	}

	entry, present := perMeasurement[measurement]
	if !present {
		entry = &MeasurementDetails{}
		perMeasurement[measurement] = entry
	}
	entry.size += size
}

// ForEach calls f once per recorded measurement with a copy of its details.
// Measurements are visited in ascending string order of database, then
// retention policy, then measurement name.
func (s *MeasurementSizes) ForEach(f func(db, rp, measurement string, detail MeasurementDetails)) {
	databases := make([]string, 0, len(s.m))
	for name := range s.m {
		databases = append(databases, name)
	}
	sort.Strings(databases)

	for _, db := range databases {
		perRP := s.m[db]
		policies := make([]string, 0, len(perRP))
		for name := range perRP {
			policies = append(policies, name)
		}
		sort.Strings(policies)

		for _, rp := range policies {
			perMeasurement := perRP[rp]
			names := make([]string, 0, len(perMeasurement))
			for name := range perMeasurement {
				names = append(names, name)
			}
			sort.Strings(names)

			for _, name := range names {
				f(db, rp, name, *perMeasurement[name])
			}
		}
	}
}
// ProgressReporter writes a status line that is overwritten in place by
// ending each line with a carriage return instead of a newline.
type ProgressReporter struct {
	// maxLength is the length in bytes of the longest line written so far.
	// It is 0 until the first call to Report.
	maxLength int
	w         io.Writer
}

// NewProgressReporter returns a ProgressReporter that writes to w.
func NewProgressReporter(w io.Writer) *ProgressReporter {
	return &ProgressReporter{w: w}
}

// Report writes line followed by "\r". The first call emits a newline first
// so the status line starts on a fresh row. Lines shorter than the longest
// one written so far are padded with spaces so that leftovers of a previous,
// longer line are blanked out.
func (p *ProgressReporter) Report(line string) {
	if p.maxLength == 0 {
		fmt.Fprintf(p.w, "\n")
		// Mark the reporter as started so the newline is written only once,
		// even if the first line is empty.
		p.maxLength = 1
	}
	if pad := p.maxLength - len(line); pad > 0 {
		line += strings.Repeat(" ", pad)
	}
	// Track the widest line only; growing this on every call would make each
	// report one character longer than the last.
	p.maxLength = len(line)
	fmt.Fprint(p.w, line+"\r")
}

View File

@ -0,0 +1,3 @@
package reportdisk_test
// TODO: write some tests

View File

@ -0,0 +1,71 @@
// Package reporthelper provides shard-directory helpers shared by the influx_inspect report commands.
package reporthelper
import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// IsShardDir checks whether the last path element of dir looks like a shard
// ID, i.e. it parses as a decimal integer that is at least 1. It returns nil
// if so and a descriptive error otherwise.
func IsShardDir(dir string) error {
	id, err := strconv.Atoi(filepath.Base(dir))
	if err == nil && id >= 1 {
		return nil
	}
	return fmt.Errorf("not a valid shard dir: %v", dir)
}
// WalkShardDirs walks the tree under root, collects every file whose
// extension is the TSM file extension, orders the files by numeric shard ID
// and then calls fn once per file.
//
// fn receives the database, retention policy and shard ID, taken from the
// last directory elements of the file's absolute path
// (<db>/<rp>/<shard id>/<file>), plus the path as produced by the walk.
// Files of the same shard keep the order in which the walk found them.
//
// An error is returned if the walk fails, if a TSM file sits in a directory
// that IsShardDir rejects, if the absolute path is too short to contain a
// database and retention policy, or if fn returns an error.
func WalkShardDirs(root string, fn func(db, rp, id, path string) error) error {
	type location struct {
		db, rp, id, path string
	}

	var dirs []location
	if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		// Only TSM files are recorded; everything else is passed over.
		if info.IsDir() || filepath.Ext(info.Name()) != "."+tsm1.TSMFileExtension {
			return nil
		}

		if err := IsShardDir(filepath.Dir(path)); err != nil {
			return err
		}

		absPath, err := filepath.Abs(path)
		if err != nil {
			return err
		}

		parts := strings.Split(absPath, string(filepath.Separator))
		n := len(parts)
		// Guard the indexing below: a shard directory directly under the
		// filesystem root yields fewer than four elements.
		if n < 4 {
			return fmt.Errorf("cannot determine database and retention policy for %v", absPath)
		}

		dirs = append(dirs, location{db: parts[n-4], rp: parts[n-3], id: parts[n-2], path: path})
		return nil
	}); err != nil {
		return err
	}

	// Stable sort so files within one shard stay in walk order. Atoi errors
	// are ignored because every id already passed IsShardDir.
	sort.SliceStable(dirs, func(i, j int) bool {
		a, _ := strconv.Atoi(dirs[i].id)
		b, _ := strconv.Atoi(dirs[j].id)
		return a < b
	})

	for _, shard := range dirs {
		if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
			return err
		}
	}
	return nil
}

View File

@ -149,34 +149,36 @@ func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) {
return []byte(t.keys[idx]), BlockFloat64
}
// The mockTSMFile methods below are unimplemented stubs: each one panics
// with "implement me" if it is called.
func (*mockTSMFile) Path() string { panic("implement me") }
func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") }
func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") }
func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") }
func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") }
func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") }
func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") }
func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") }
func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") }
func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") }
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneStats() TombstoneStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
func (*mockTSMFile) Remove() error { panic("implement me") }
func (*mockTSMFile) InUse() bool { panic("implement me") }
func (*mockTSMFile) Ref() { panic("implement me") }
func (*mockTSMFile) Unref() { panic("implement me") }
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
func (*mockTSMFile) Free() error { panic("implement me") }
// The mockTSMFile methods below are unimplemented stubs: each one panics
// with "implement me" if it is called.
func (*mockTSMFile) Path() string { panic("implement me") }
func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
panic("implement me")
}
func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") }
func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") }
func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") }
func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") }
func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") }
func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") }
func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") }
func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") }
func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") }
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneStats() TombstoneStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
func (*mockTSMFile) Remove() error { panic("implement me") }
func (*mockTSMFile) InUse() bool { panic("implement me") }
func (*mockTSMFile) Ref() { panic("implement me") }
func (*mockTSMFile) Unref() { panic("implement me") }
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
func (*mockTSMFile) Free() error { panic("implement me") }
func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
panic("implement me")

View File

@ -79,7 +79,9 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
}
}
// BenchmarkRing_getPartition_100 runs benchmarkRingGetPartition against a
// ring created with MustNewRing(256), using 100 keys.
func BenchmarkRing_getPartition_100(b *testing.B) { benchmarkRingGetPartition(b, MustNewRing(256), 100) }
// BenchmarkRing_getPartition_100 runs benchmarkRingGetPartition against a
// ring created with MustNewRing(256), using 100 keys.
func BenchmarkRing_getPartition_100(b *testing.B) {
benchmarkRingGetPartition(b, MustNewRing(256), 100)
}
// BenchmarkRing_getPartition_1000 runs benchmarkRingGetPartition against a
// ring created with MustNewRing(256), using 1000 keys.
func BenchmarkRing_getPartition_1000(b *testing.B) {
benchmarkRingGetPartition(b, MustNewRing(256), 1000)
}