feat: port report-db command from 1.x (#23922)

* feat: port report-db command from 1.x

* chore: fix linting

* chore: rename db to bucket

* chore: fix linting
Jeffrey Smith II 2022-11-21 11:23:13 -05:00 committed by GitHub
parent 77081081b5
commit c2eac86131
6 changed files with 857 additions and 18 deletions

@@ -8,6 +8,7 @@ import (
 	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_wal"
 	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index"
 	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp"
+	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_db"
 	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi"
 	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm"
 	typecheck "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/type_conflicts"
@@ -35,6 +36,11 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
 		return nil, err
 	}
+	reportDB, err := report_db.NewReportDBCommand(v)
+	if err != nil {
+		return nil, err
+	}
 	checkSchema, err := typecheck.NewCheckSchemaCommand(v)
 	if err != nil {
 		return nil, err
@@ -58,6 +64,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
 	base.AddCommand(verify_wal.NewVerifyWALCommand())
 	base.AddCommand(report_tsm.NewReportTSMCommand())
 	base.AddCommand(build_tsi.NewBuildTSICommand())
+	base.AddCommand(reportDB)
 	base.AddCommand(checkSchema)
 	base.AddCommand(mergeSchema)

@@ -0,0 +1,242 @@
package aggregators

import (
	"fmt"
	"strings"
	"sync"
	"text/tabwriter"

	report "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm"
	"github.com/influxdata/influxdb/v2/models"
)

type rollupNodeMap map[string]RollupNode

type RollupNode interface {
	sync.Locker
	report.Counter
	Children() rollupNodeMap
	RecordSeries(bucket, rp, ms string, key, field []byte, tags models.Tags)
	Print(tw *tabwriter.Writer, printTags bool, bucket, rp, ms string) error
	isLeaf() bool
	child(key string, isLeaf bool) NodeWrapper
}

type NodeWrapper struct {
	RollupNode
}

var detailedHeader = []string{"bucket", "retention policy", "measurement", "series", "fields", "tag total", "tags"}
var simpleHeader = []string{"bucket", "retention policy", "measurement", "series"}

type RollupNodeFactory struct {
	header   []string
	EstTitle string
	NewNode  func(isLeaf bool) NodeWrapper
	counter  func() report.Counter
}

var nodeFactory *RollupNodeFactory

func CreateNodeFactory(detailed, exact bool) *RollupNodeFactory {
	estTitle := " (est.)"
	newCounterFn := report.NewHLLCounter
	if exact {
		newCounterFn = report.NewExactCounter
		estTitle = ""
	}

	if detailed {
		nodeFactory = newDetailedNodeFactory(newCounterFn, estTitle)
	} else {
		nodeFactory = newSimpleNodeFactory(newCounterFn, estTitle)
	}
	return nodeFactory
}

func (f *RollupNodeFactory) PrintHeader(tw *tabwriter.Writer) error {
	_, err := fmt.Fprintln(tw, strings.Join(f.header, "\t"))
	return err
}

func (f *RollupNodeFactory) PrintDivider(tw *tabwriter.Writer) error {
	divLine := f.makeTabDivider()
	_, err := fmt.Fprintln(tw, divLine)
	return err
}

func (f *RollupNodeFactory) makeTabDivider() string {
	div := make([]string, 0, len(f.header))
	for _, s := range f.header {
		div = append(div, strings.Repeat("-", len(s)))
	}
	return strings.Join(div, "\t")
}

func newSimpleNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
	return &RollupNodeFactory{
		header:   simpleHeader,
		EstTitle: est,
		NewNode:  func(isLeaf bool) NodeWrapper { return NodeWrapper{newSimpleNode(isLeaf, newCounterFn)} },
		counter:  newCounterFn,
	}
}

func newDetailedNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
	return &RollupNodeFactory{
		header:   detailedHeader,
		EstTitle: est,
		NewNode:  func(isLeaf bool) NodeWrapper { return NodeWrapper{newDetailedNode(isLeaf, newCounterFn)} },
		counter:  newCounterFn,
	}
}

type simpleNode struct {
	sync.Mutex
	report.Counter
	rollupNodeMap
}

func (s *simpleNode) Children() rollupNodeMap {
	return s.rollupNodeMap
}

func (s *simpleNode) child(key string, isLeaf bool) NodeWrapper {
	if s.isLeaf() {
		panic("Trying to get the child of a leaf node")
	}

	s.Lock()
	defer s.Unlock()

	c, ok := s.Children()[key]
	if !ok {
		c = nodeFactory.NewNode(isLeaf)
		s.Children()[key] = c
	}
	return NodeWrapper{c}
}

func (s *simpleNode) isLeaf() bool {
	return s.Children() == nil
}

func newSimpleNode(isLeaf bool, fn func() report.Counter) *simpleNode {
	s := &simpleNode{Counter: fn()}
	if !isLeaf {
		s.rollupNodeMap = make(rollupNodeMap)
	} else {
		s.rollupNodeMap = nil
	}
	return s
}

func (s *simpleNode) RecordSeries(bucket, rp, _ string, key, _ []byte, _ models.Tags) {
	s.Lock()
	defer s.Unlock()
	s.recordSeriesNoLock(bucket, rp, key)
}

func (s *simpleNode) recordSeriesNoLock(bucket, rp string, key []byte) {
	s.Add([]byte(fmt.Sprintf("%s.%s.%s", bucket, rp, key)))
}

func (s *simpleNode) Print(tw *tabwriter.Writer, _ bool, bucket, rp, ms string) error {
	_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\n",
		bucket,
		rp,
		ms,
		s.Count())
	return err
}

type detailedNode struct {
	simpleNode
	fields report.Counter
	tags   map[string]report.Counter
}

func newDetailedNode(isLeaf bool, fn func() report.Counter) *detailedNode {
	d := &detailedNode{
		simpleNode: simpleNode{
			Counter: fn(),
		},
		fields: fn(),
		tags:   make(map[string]report.Counter),
	}
	if !isLeaf {
		d.simpleNode.rollupNodeMap = make(rollupNodeMap)
	} else {
		d.simpleNode.rollupNodeMap = nil
	}
	return d
}

func (d *detailedNode) RecordSeries(bucket, rp, ms string, key, field []byte, tags models.Tags) {
	d.Lock()
	defer d.Unlock()
	d.simpleNode.recordSeriesNoLock(bucket, rp, key)
	d.fields.Add([]byte(fmt.Sprintf("%s.%s.%s.%s", bucket, rp, ms, field)))
	for _, t := range tags {
		// Add bucket, retention policy, and measurement
		// to correctly aggregate in inner (non-leaf) nodes
		canonTag := fmt.Sprintf("%s.%s.%s.%s", bucket, rp, ms, t.Key)
		tc, ok := d.tags[canonTag]
		if !ok {
			tc = nodeFactory.counter()
			d.tags[canonTag] = tc
		}
		tc.Add(t.Value)
	}
}

func (d *detailedNode) Print(tw *tabwriter.Writer, printTags bool, bucket, rp, ms string) error {
	seriesN := d.Count()
	fieldsN := d.fields.Count()
	var tagKeys []string
	tagN := uint64(0)

	if printTags {
		tagKeys = make([]string, 0, len(d.tags))
	}
	for k, v := range d.tags {
		c := v.Count()
		tagN += c
		if printTags {
			tagKeys = append(tagKeys, fmt.Sprintf("%q: %d", k[strings.LastIndex(k, ".")+1:], c))
		}
	}

	_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%s\n",
		bucket,
		rp,
		ms,
		seriesN,
		fieldsN,
		tagN,
		strings.Join(tagKeys, ", "))
	return err
}

func (r *NodeWrapper) Record(depth, totalDepth int, bucket, rp, measurement string, key []byte, field []byte, tags models.Tags) {
	r.RecordSeries(bucket, rp, measurement, key, field, tags)

	switch depth {
	case 2:
		if depth < totalDepth {
			// Create measurement level in tree
			c := r.child(measurement, true)
			c.RecordSeries(bucket, rp, measurement, key, field, tags)
		}
	case 1:
		if depth < totalDepth {
			// Create retention policy level in tree
			c := r.child(rp, (depth+1) == totalDepth)
			c.Record(depth+1, totalDepth, bucket, rp, measurement, key, field, tags)
		}
	case 0:
		if depth < totalDepth {
			// Create bucket level in tree
			c := r.child(bucket, (depth+1) == totalDepth)
			c.Record(depth+1, totalDepth, bucket, rp, measurement, key, field, tags)
		}
	default:
	}
}
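For orientation, a minimal sketch (not part of this commit) of how the aggregators API above is driven, mirroring what report_db.go below does; the bucket name, retention policy, and series key here are made-up examples:

package main

import (
	"bytes"
	"os"
	"text/tabwriter"

	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_db/aggregators"
	"github.com/influxdata/influxdb/v2/models"
)

func main() {
	// Build a rollup tree three levels deep (bucket -> rp -> measurement),
	// counting exactly rather than estimating with HyperLogLog.
	factory := aggregators.CreateNodeFactory(false /*detailed*/, true /*exact*/)
	tree := factory.NewNode(false) // totalDepth > 0, so the root is not a leaf

	// TSM keys embed the field name after the "#!~#" separator.
	key := []byte("m1,host=a#!~#usage") // hypothetical series key
	seriesKey, field, _ := bytes.Cut(key, []byte("#!~#"))
	measurement, tags := models.ParseKey(seriesKey)
	tree.Record(0, 3, "bucket1", "autogen", measurement, key, field, tags)

	// Print the grand total the same way report_db.go does.
	tw := tabwriter.NewWriter(os.Stdout, 8, 2, 1, ' ', 0)
	_ = factory.PrintHeader(tw)
	_ = tree.Print(tw, false, "Total"+factory.EstTitle, "", "")
	_ = tw.Flush()
}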

@@ -0,0 +1,330 @@
package aggregators

import (
	"bytes"
	"sync"
	"testing"

	"github.com/influxdata/influxdb/v2/models"
	"github.com/stretchr/testify/require"
)

type result struct {
	fields uint64
	tags   uint64
	series uint64
}

type test struct {
	db  string
	rp  string
	key []byte
}

// Ensure that tags, fields, and series which differ only in database,
// retention policy, or measurement are correctly counted.
func Test_canonicalize(t *testing.T) {
	totalDepth := 3
	// Key layout: measurement,tag1=tag1_value1,tag2=tag2_value1#!~#field1
	tests := []test{
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f3")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
	}

	// Expected {fields, tags, series} per rollup level; "" keys hold the
	// rolled-up totals for the level above.
	results := map[string]map[string]map[string]*result{
		"db1": {
			"rp1": {
				"m1": {2, 4, 5},
				"m2": {2, 4, 5},
				"":   {4, 8, 10},
			},
			"rp2": {
				"m1": {3, 4, 5},
				"m2": {2, 4, 5},
				"":   {5, 8, 10},
			},
			"": {
				"": {9, 16, 20},
			},
		},
		"db2": {
			"rp1": {
				"m1": {2, 4, 5},
				"m2": {2, 4, 5},
				"":   {4, 8, 10},
			},
			"rp2": {
				"m1": {2, 4, 5},
				"m2": {2, 4, 5},
				"":   {4, 8, 10},
			},
			"": {
				"": {8, 16, 20},
			},
		},
		"": {
			"": {
				"": {17, 32, 40},
			},
		},
	}

	testLoop(t, false, true, totalDepth, tests, results)
	testLoop(t, true, true, totalDepth, tests, results)
	testLoop(t, false, false, totalDepth, tests, results)
	testLoop(t, true, false, totalDepth, tests, results)
}

func testLoop(t *testing.T, detailed bool, exact bool, totalDepth int, tests []test, results map[string]map[string]map[string]*result) {
	factory := CreateNodeFactory(detailed, exact)
	tree := factory.NewNode(totalDepth == 0)

	wg := sync.WaitGroup{}
	tf := func() {
		for i := range tests {
			seriesKey, field, _ := bytes.Cut(tests[i].key, []byte("#!~#"))
			measurement, tags := models.ParseKey(seriesKey)
			tree.Record(0, totalDepth, tests[i].db, tests[i].rp, measurement, tests[i].key, field, tags)
		}
		wg.Done()
	}

	const concurrency = 5
	wg.Add(concurrency)
	for j := 0; j < concurrency; j++ {
		go tf()
	}
	wg.Wait()

	for d, db := range tree.Children() {
		for r, rp := range db.Children() {
			for m, measure := range rp.Children() {
				checkNode(t, measure, results[d][r][m], d, r, m)
			}
			checkNode(t, rp, results[d][r][""], d, r, "")
		}
		checkNode(t, db, results[d][""][""], d, "", "")
	}
	checkNode(t, tree, results[""][""][""], "", "", "")
}

func checkNode(t *testing.T, measure RollupNode, results *result, d string, r string, m string) {
	mr, ok := measure.(NodeWrapper)
	if !ok {
		t.Fatalf("internal error: expected a NodeWrapper type")
	}

	switch node := mr.RollupNode.(type) {
	case *detailedNode:
		require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m)
		require.Equalf(t, results.fields, node.fields.Count(), "field count wrong. db: %q, rp: %q, ms: %q", d, r, m)
		tagSum := uint64(0)
		for _, t := range node.tags {
			tagSum += t.Count()
		}
		require.Equalf(t, results.tags, tagSum, "tag value count wrong. db: %q, rp: %q, ms: %q", d, r, m)
	case *simpleNode:
		require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m)
	default:
		t.Fatalf("internal error: unknown node type")
	}
}

@@ -0,0 +1,189 @@
package report_db

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"text/tabwriter"

	"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_db/aggregators"
	"github.com/influxdata/influxdb/v2/kit/cli"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/pkg/reporthelper"
	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
	"golang.org/x/sync/errgroup"
)

// ReportDB represents the program execution for "influxd inspect report-db".
type ReportDB struct {
	// Standard input/output, overridden for testing.
	Stderr io.Writer
	Stdout io.Writer

	dbPath   string
	exact    bool
	detailed bool
	// How many goroutines to dedicate to calculating cardinality.
	concurrency int
	// t, b, r, m for Total, Bucket, Retention Policy, Measurement
	rollup string
}

func NewReportDBCommand(v *viper.Viper) (*cobra.Command, error) {
	flags := &ReportDB{
		Stderr: os.Stderr,
		Stdout: os.Stdout,
	}

	cmd := &cobra.Command{
		Use:   "report-db",
		Short: "Estimates Cloud 2 cardinality for a database",
		Args:  cobra.NoArgs,
		RunE: func(cmd *cobra.Command, _ []string) error {
			return reportDBRunE(cmd, flags)
		},
	}

	opts := []cli.Opt{
		{
			DestP:    &flags.dbPath,
			Flag:     "db-path",
			Desc:     "path to database",
			Required: true,
		},
		{
			DestP:   &flags.concurrency,
			Flag:    "c",
			Desc:    "set worker concurrency, defaults to one",
			Default: 1,
		},
		{
			DestP:   &flags.detailed,
			Flag:    "detailed",
			Desc:    "include counts for fields, tags",
			Default: false,
		},
		{
			DestP:   &flags.exact,
			Flag:    "exact",
			Desc:    "report exact counts",
			Default: false,
		},
		{
			DestP:   &flags.rollup,
			Flag:    "rollup",
			Desc:    "rollup level - t: total, b: bucket, r: retention policy, m: measurement",
			Default: "m",
		},
	}
	if err := cli.BindOptions(v, cmd, opts); err != nil {
		return nil, err
	}
	return cmd, nil
}

func reportDBRunE(_ *cobra.Command, reportdb *ReportDB) error {
	var legalRollups = map[string]int{"m": 3, "r": 2, "b": 1, "t": 0}
	if reportdb.dbPath == "" {
		return errors.New("path to database must be provided")
	}

	totalDepth, ok := legalRollups[reportdb.rollup]
	if !ok {
		return fmt.Errorf("invalid rollup specified: %q", reportdb.rollup)
	}

	factory := aggregators.CreateNodeFactory(reportdb.detailed, reportdb.exact)
	totalsTree := factory.NewNode(totalDepth == 0)

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(reportdb.concurrency)

	processTSM := func(bucket, rp, id, path string) error {
		file, err := os.OpenFile(path, os.O_RDONLY, 0600)
		if err != nil {
			_, _ = fmt.Fprintf(reportdb.Stderr, "error: %s: %v. Skipping.\n", path, err)
			return nil
		}

		reader, err := tsm1.NewTSMReader(file)
		if err != nil {
			_, _ = fmt.Fprintf(reportdb.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
			// NewTSMReader won't close the file handle on failure, so do it here.
			_ = file.Close()
			return nil
		}
		defer func() {
			// The TSMReader will close the underlying file handle here.
			if err := reader.Close(); err != nil {
				_, _ = fmt.Fprintf(reportdb.Stderr, "error closing: %s: %v.\n", file.Name(), err)
			}
		}()

		seriesCount := reader.KeyCount()
		for i := 0; i < seriesCount; i++ {
			func() {
				key, _ := reader.KeyAt(i)
				seriesKey, field, _ := bytes.Cut(key, []byte("#!~#"))
				measurement, tags := models.ParseKey(seriesKey)
				totalsTree.Record(0, totalDepth, bucket, rp, measurement, key, field, tags)
			}()
		}
		return nil
	}

	done := ctx.Done()
	err := reporthelper.WalkShardDirs(reportdb.dbPath, func(bucket, rp, id, path string) error {
		select {
		case <-done:
			return nil
		default:
			g.Go(func() error {
				return processTSM(bucket, rp, id, path)
			})
			return nil
		}
	})
	if err != nil {
		_, _ = fmt.Fprintf(reportdb.Stderr, "%s: %v\n", reportdb.dbPath, err)
		return err
	}

	err = g.Wait()
	if err != nil {
		_, _ = fmt.Fprintf(reportdb.Stderr, "%s: %v\n", reportdb.dbPath, err)
		return err
	}

	tw := tabwriter.NewWriter(reportdb.Stdout, 8, 2, 1, ' ', 0)
	if err = factory.PrintHeader(tw); err != nil {
		return err
	}
	if err = factory.PrintDivider(tw); err != nil {
		return err
	}
	for d, bucket := range totalsTree.Children() {
		for r, rp := range bucket.Children() {
			for m, measure := range rp.Children() {
				err = measure.Print(tw, true, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), fmt.Sprintf("%q", m))
				if err != nil {
					return err
				}
			}
			if err = rp.Print(tw, false, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), ""); err != nil {
				return err
			}
		}
		if err = bucket.Print(tw, false, fmt.Sprintf("%q", d), "", ""); err != nil {
			return err
		}
	}
	if err = totalsTree.Print(tw, false, "Total"+factory.EstTitle, "", ""); err != nil {
		return err
	}
	return tw.Flush()
}
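For context, a hypothetical invocation of the ported command (not part of this commit); the data path is made up, and the flag spellings follow the cli.Opt names bound above via kit/cli:

	influxd inspect report-db --db-path /var/lib/influxdb/data --rollup b --detailed

With --rollup b the report rolls up to the bucket level; the default m reports per measurement, and the final row is always the grand total, labeled "Total (est.)" unless --exact is given.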

@@ -91,20 +91,20 @@ func (a *args) isShardDir(dir string) error {
 }
 
 func (a *args) Run(cmd *cobra.Command) error {
-	// Create the cardinality counter
-	newCounterFn := newHLLCounter
+	// Create the cardinality Counter
+	newCounterFn := NewHLLCounter
 	estTitle := " (est)"
 	if a.exact {
 		estTitle = ""
-		newCounterFn = newExactCounter
+		newCounterFn = NewExactCounter
 	}
 	totalSeries := newCounterFn()
-	tagCardinalities := map[string]counter{}
-	measCardinalities := map[string]counter{}
-	fieldCardinalities := map[string]counter{}
-	dbCardinalities := map[string]counter{}
+	tagCardinalities := map[string]Counter{}
+	measCardinalities := map[string]Counter{}
+	fieldCardinalities := map[string]Counter{}
+	dbCardinalities := map[string]Counter{}
 
 	start := time.Now()
@@ -233,13 +233,13 @@ type printArgs struct {
 	fileCount        int
 	minTime, maxTime int64
 	estTitle         string
-	totalSeries      counter
+	totalSeries      Counter
 	detailed         bool
-	tagCardinalities   map[string]counter
-	measCardinalities  map[string]counter
-	fieldCardinalities map[string]counter
-	dbCardinalities    map[string]counter
+	tagCardinalities   map[string]Counter
+	measCardinalities  map[string]Counter
+	fieldCardinalities map[string]Counter
+	dbCardinalities    map[string]Counter
 }
 
 func printSummary(cmd *cobra.Command, p printArgs) {
@@ -277,7 +277,7 @@ func printSummary(cmd *cobra.Command, p printArgs) {
 }
 
 // sortKeys is a quick helper to return the sorted set of a map's keys
-func sortKeys(vals map[string]counter) (keys []string) {
+func sortKeys(vals map[string]Counter) (keys []string) {
 	for k := range vals {
 		keys = append(keys, k)
 	}
@@ -335,14 +335,14 @@ func (a *args) walkShardDirs(root string, fn func(db, rp, id, path string) error
 	return nil
 }
 
-// counter abstracts a method of counting keys.
-type counter interface {
+// Counter abstracts a method of counting keys.
+type Counter interface {
 	Add(key []byte)
 	Count() uint64
 }
 
-// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation.
-func newHLLCounter() counter {
+// NewHLLCounter returns an approximate Counter using HyperLogLogs for cardinality estimation.
+func NewHLLCounter() Counter {
 	return hllpp.New()
 }
@@ -359,7 +359,7 @@ func (c *exactCounter) Count() uint64 {
 	return uint64(len(c.m))
 }
 
-func newExactCounter() counter {
+func NewExactCounter() Counter {
 	return &exactCounter{
 		m: make(map[string]struct{}),
 	}
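For orientation, a minimal sketch (not part of this commit) of the newly exported Counter surface that the aggregators package builds on; both constructors satisfy the same two-method interface:

package main

import (
	"fmt"

	report "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm"
)

func main() {
	// NewExactCounter counts distinct keys exactly (map-backed);
	// NewHLLCounter would estimate them with HyperLogLog instead.
	var c report.Counter = report.NewExactCounter()
	c.Add([]byte("db1.rp1.m1,t1=v1"))
	c.Add([]byte("db1.rp1.m1,t1=v1")) // duplicate, counted once
	fmt.Println(c.Count())            // prints 1
}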

@@ -0,0 +1,71 @@
// Package reporthelper reports statistics about TSM files.
package reporthelper

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"

	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
)

func IsShardDir(dir string) error {
	name := filepath.Base(dir)
	if id, err := strconv.Atoi(name); err != nil || id < 1 {
		return fmt.Errorf("not a valid shard dir: %v", dir)
	}
	return nil
}

func WalkShardDirs(root string, fn func(db, rp, id, path string) error) error {
	type location struct {
		db, rp, id, path string
	}

	var dirs []location
	if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
			shardDir := filepath.Dir(path)
			if err := IsShardDir(shardDir); err != nil {
				return err
			}
			absPath, err := filepath.Abs(path)
			if err != nil {
				return err
			}
			parts := strings.Split(absPath, string(filepath.Separator))
			db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
			dirs = append(dirs, location{db: db, rp: rp, id: id, path: path})
			return nil
		}
		return nil
	}); err != nil {
		return err
	}

	sort.Slice(dirs, func(i, j int) bool {
		a, _ := strconv.Atoi(dirs[i].id)
		b, _ := strconv.Atoi(dirs[j].id)
		return a < b
	})

	for _, shard := range dirs {
		if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
			return err
		}
	}
	return nil
}
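For orientation, a minimal sketch (not part of this commit) of driving WalkShardDirs directly; it assumes the classic <root>/<database>/<retention policy>/<shard id>/*.tsm layout, and the data path here is made up:

package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/v2/pkg/reporthelper"
)

func main() {
	// Visit every TSM file under a 1.x-style data directory,
	// in ascending shard-id order.
	err := reporthelper.WalkShardDirs("/var/lib/influxdb/data", func(db, rp, id, path string) error {
		fmt.Printf("db=%s rp=%s shard=%s file=%s\n", db, rp, id, path)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}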