feat: estimate Cloud2 cardinality on 1.X databases (#23351)
feat: estimate Cloud2 cardinality on 1.X databases. To ease migrations to Cloud 2 installations from 1.X databases, estimate Cloud 2 cardinality for a data node (or OSS system). Closes https://github.com/influxdata/influxdb/issues/23356 (pull/23392/head)
parent
522c32754c
commit
ef90bc830f
|
@ -0,0 +1,242 @@
|
|||
package aggregators
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/tabwriter"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/report"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
// rollupNodeMap holds the named children of a rollup tree node.
type rollupNodeMap map[string]RollupNode

// RollupNode is one node in the cardinality rollup tree. Tree levels
// correspond to database, retention policy, and measurement; each node
// counts the series (and, for detailed nodes, fields and tag values)
// recorded beneath it.
type RollupNode interface {
	sync.Locker
	report.Counter
	// Children returns the node's child map (nil for leaf nodes).
	Children() rollupNodeMap
	// RecordSeries counts one series under this node.
	RecordSeries(db, rp, ms string, key, field []byte, tags models.Tags)
	// Print writes this node's report row to tw.
	Print(tw *tabwriter.Writer, printTags bool, db, rp, ms string) error
	// isLeaf reports whether the node has no child map.
	isLeaf() bool
	// child returns (creating if needed) the named child node.
	child(key string, isLeaf bool) NodeWrapper
}
|
||||
|
||||
// NodeWrapper embeds a RollupNode so that recursive tree-building helpers
// (see Record) can be defined without widening the RollupNode interface.
type NodeWrapper struct {
	RollupNode
}

// Report column headers for detailed and simple output modes.
var detailedHeader = []string{"DB", "RP", "measurement", "series", "fields", "tag total", "tags"}
var simpleHeader = []string{"DB", "RP", "measurement", "series"}
|
||||
|
||||
// RollupNodeFactory creates rollup tree nodes with a consistent
// configuration (simple vs. detailed, exact vs. estimated counting).
type RollupNodeFactory struct {
	// header is the set of report column titles for this configuration.
	header []string
	// EstTitle is appended to the totals row title (" (est.)" or "").
	EstTitle string
	// NewNode creates one tree node; leaves get no child map.
	NewNode func(isLeaf bool) NodeWrapper
	// counter creates the underlying cardinality counter.
	counter func() report.Counter
}

// nodeFactory is the package-level factory installed by CreateNodeFactory;
// tree nodes use it to lazily create children (see simpleNode.child).
var nodeFactory *RollupNodeFactory
|
||||
|
||||
func CreateNodeFactory(detailed, exact bool) *RollupNodeFactory {
|
||||
estTitle := " (est.)"
|
||||
newCounterFn := report.NewHLLCounter
|
||||
if exact {
|
||||
newCounterFn = report.NewExactCounter
|
||||
estTitle = ""
|
||||
}
|
||||
|
||||
if detailed {
|
||||
nodeFactory = newDetailedNodeFactory(newCounterFn, estTitle)
|
||||
} else {
|
||||
nodeFactory = newSimpleNodeFactory(newCounterFn, estTitle)
|
||||
}
|
||||
return nodeFactory
|
||||
}
|
||||
|
||||
func (f *RollupNodeFactory) PrintHeader(tw *tabwriter.Writer) error {
|
||||
_, err := fmt.Fprintln(tw, strings.Join(f.header, "\t"))
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *RollupNodeFactory) PrintDivider(tw *tabwriter.Writer) error {
|
||||
divLine := f.makeTabDivider()
|
||||
_, err := fmt.Fprintln(tw, divLine)
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *RollupNodeFactory) makeTabDivider() string {
|
||||
div := make([]string, 0, len(f.header))
|
||||
for _, s := range f.header {
|
||||
div = append(div, strings.Repeat("-", len(s)))
|
||||
}
|
||||
return strings.Join(div, "\t")
|
||||
}
|
||||
|
||||
func newSimpleNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
|
||||
return &RollupNodeFactory{
|
||||
header: simpleHeader,
|
||||
EstTitle: est,
|
||||
NewNode: func(isLeaf bool) NodeWrapper { return NodeWrapper{newSimpleNode(isLeaf, newCounterFn)} },
|
||||
counter: newCounterFn,
|
||||
}
|
||||
}
|
||||
|
||||
func newDetailedNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
|
||||
return &RollupNodeFactory{
|
||||
header: detailedHeader,
|
||||
EstTitle: est,
|
||||
NewNode: func(isLeaf bool) NodeWrapper { return NodeWrapper{newDetailedNode(isLeaf, newCounterFn)} },
|
||||
counter: newCounterFn,
|
||||
}
|
||||
}
|
||||
|
||||
// simpleNode counts series cardinality only. The embedded mutex guards
// both the counter and the child map; a nil child map marks a leaf node.
type simpleNode struct {
	sync.Mutex
	report.Counter
	rollupNodeMap
}

// Children returns the node's child map (nil for leaves).
func (s *simpleNode) Children() rollupNodeMap {
	return s.rollupNodeMap
}
|
||||
|
||||
// child returns the named child of s, creating it via the package-level
// nodeFactory if it does not exist yet. isLeaf controls whether a newly
// created child gets a child map of its own. Panics when called on a
// leaf node, which by construction has no children.
func (s *simpleNode) child(key string, isLeaf bool) NodeWrapper {
	if s.isLeaf() {
		panic("Trying to get the child to a leaf node")
	}
	// Lock guards the child map against concurrent Record calls.
	s.Lock()
	defer s.Unlock()
	c, ok := s.Children()[key]
	if !ok {
		c = nodeFactory.NewNode(isLeaf)
		s.Children()[key] = c
	}
	return NodeWrapper{c}
}
|
||||
|
||||
func (s *simpleNode) isLeaf() bool {
|
||||
return s.Children() == nil
|
||||
}
|
||||
|
||||
func newSimpleNode(isLeaf bool, fn func() report.Counter) *simpleNode {
|
||||
s := &simpleNode{Counter: fn()}
|
||||
if !isLeaf {
|
||||
s.rollupNodeMap = make(rollupNodeMap)
|
||||
} else {
|
||||
s.rollupNodeMap = nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// RecordSeries counts the series key under this node. The measurement,
// field, and tags are ignored at this level; only the db/rp/key
// combination contributes to the series cardinality counter.
func (s *simpleNode) RecordSeries(db, rp, _ string, key, _ []byte, _ models.Tags) {
	s.Lock()
	defer s.Unlock()
	s.recordSeriesNoLock(db, rp, key)
}

// recordSeriesNoLock adds the canonicalized series key — prefixed with
// database and retention policy so that identical keys in different
// databases or RPs count separately. The caller must hold s's lock.
func (s *simpleNode) recordSeriesNoLock(db, rp string, key []byte) {
	s.Add([]byte(fmt.Sprintf("%s.%s.%s", db, rp, key)))
}
|
||||
|
||||
func (s *simpleNode) Print(tw *tabwriter.Writer, _ bool, db, rp, ms string) error {
|
||||
_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\n",
|
||||
db,
|
||||
rp,
|
||||
ms,
|
||||
s.Count())
|
||||
return err
|
||||
}
|
||||
|
||||
// detailedNode extends simpleNode with field and tag cardinality counters.
// tags maps a canonicalized "db.rp.ms.tagKey" string to a counter of the
// distinct values seen for that tag key.
type detailedNode struct {
	simpleNode
	fields report.Counter
	tags   map[string]report.Counter
}
|
||||
|
||||
func newDetailedNode(isLeaf bool, fn func() report.Counter) *detailedNode {
|
||||
d := &detailedNode{
|
||||
simpleNode: simpleNode{
|
||||
Counter: fn(),
|
||||
},
|
||||
fields: fn(),
|
||||
tags: make(map[string]report.Counter),
|
||||
}
|
||||
if !isLeaf {
|
||||
d.simpleNode.rollupNodeMap = make(rollupNodeMap)
|
||||
} else {
|
||||
d.simpleNode.rollupNodeMap = nil
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// RecordSeries counts the series, its field, and each of its tag values
// under this node. The embedded simpleNode mutex serializes all updates,
// including lazy creation of per-tag-key counters.
func (d *detailedNode) RecordSeries(db, rp, ms string, key, field []byte, tags models.Tags) {
	d.Lock()
	defer d.Unlock()
	d.simpleNode.recordSeriesNoLock(db, rp, key)
	// Fields are canonicalized with db/rp/measurement so identical field
	// names under different measurements count separately.
	d.fields.Add([]byte(fmt.Sprintf("%s.%s.%s.%s", db, rp, ms, field)))
	for _, t := range tags {
		// Add database, retention policy, and measurement
		// to correctly aggregate in inner (non-leaf) nodes
		canonTag := fmt.Sprintf("%s.%s.%s.%s", db, rp, ms, t.Key)
		tc, ok := d.tags[canonTag]
		if !ok {
			tc = nodeFactory.counter()
			d.tags[canonTag] = tc
		}
		tc.Add(t.Value)
	}
}
|
||||
|
||||
// Print writes one detailed report row to tw: series count, field count,
// total distinct tag values, and (when printTags is set) a per-tag-key
// breakdown of value counts.
func (d *detailedNode) Print(tw *tabwriter.Writer, printTags bool, db, rp, ms string) error {
	seriesN := d.Count()
	fieldsN := d.fields.Count()
	var tagKeys []string
	tagN := uint64(0)

	if printTags {
		tagKeys = make([]string, 0, len(d.tags))
	}
	for k, v := range d.tags {
		c := v.Count()
		tagN += c
		if printTags {
			// Strip the canonical "db.rp.ms." prefix; show only the bare tag key.
			tagKeys = append(tagKeys, fmt.Sprintf("%q: %d", k[strings.LastIndex(k, ".")+1:], c))
		}
	}
	// NOTE(review): tagKeys comes from map iteration, so the per-tag list
	// prints in nondeterministic order.
	_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%s\n",
		db,
		rp,
		ms,
		seriesN,
		fieldsN,
		tagN,
		strings.Join(tagKeys, ", "))
	return err
}
|
||||
|
||||
// Record adds the series to this node and recursively to the appropriate
// children down to totalDepth levels. depth is the level of this node:
// 0 = total (root), 1 = database, 2 = retention policy; measurement-level
// children are created as leaves.
func (r *NodeWrapper) Record(depth, totalDepth int, db, rp, measurement string, key []byte, field []byte, tags models.Tags) {
	r.RecordSeries(db, rp, measurement, key, field, tags)

	switch depth {
	case 2:
		if depth < totalDepth {
			// Create measurement level in tree
			c := r.child(measurement, true)
			c.RecordSeries(db, rp, measurement, key, field, tags)
		}
	case 1:
		if depth < totalDepth {
			// Create retention policy level in tree
			c := r.child(rp, (depth+1) == totalDepth)
			c.Record(depth+1, totalDepth, db, rp, measurement, key, field, tags)
		}
	case 0:
		if depth < totalDepth {
			// Create database level in tree
			c := r.child(db, (depth+1) == totalDepth)
			c.Record(depth+1, totalDepth, db, rp, measurement, key, field, tags)
		}
	default:
		// Depths past the measurement level record nothing further.
	}
}
|
|
@ -0,0 +1,329 @@
|
|||
package aggregators
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// result holds the expected cardinalities for one node of the rollup tree.
type result struct {
	fields uint64
	tags   uint64
	series uint64
}

// test is one input series: its database, retention policy, and raw
// series key (including the "#!~#" field separator).
type test struct {
	db  string
	rp  string
	key []byte
}
|
||||
|
||||
// Ensure that tags and fields and series which differ only in database, retention policy, or measurement
|
||||
// are correctly counted.
|
||||
// Ensure that tags and fields and series which differ only in database, retention policy, or measurement
// are correctly counted.
func Test_canonicalize(t *testing.T) {
	totalDepth := 3

	// Key layout: measurement,tag1=tag1_value1,tag2=tag2_value1#!~#field1
	tests := []test{
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		// db1/rp2/m1 deliberately uses f3 (and omits the v2v2/f1 combo) so
		// its field count differs from the other groups.
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f3")},
		{db: "db1", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db1", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp1", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1")},
		{db: "db2", rp: "rp2", key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2")},
	}

	// Expected {fields, tags, series} for each tree node, keyed by
	// db -> rp -> measurement; the "" keys hold the rollup totals.
	results := map[string]map[string]map[string]*result{
		"db1": {
			"rp1": {
				"m1": {2, 4, 5},
				"m2": {2, 4, 5},
				"":   {4, 8, 10},
			},
			"rp2": {
				"m1": {3, 4, 5},
				"m2": {2, 4, 5},
				"":   {5, 8, 10},
			},
			"": {
				"": {9, 16, 20},
			},
		},
		"db2": {
			"rp1": {
				"m1": {2, 4, 5},
				"m2": {2, 4, 5},
				"":   {4, 8, 10},
			},
			"rp2": {
				"m1": {2, 4, 5},
				"m2": {2, 4, 5},
				"":   {4, 8, 10},
			},
			"": {
				"": {8, 16, 20},
			},
		},
		"": {
			"": {
				"": {17, 32, 40},
			},
		},
	}

	// Exercise all four configurations: {simple, detailed} x {exact, estimated}.
	testLoop(t, false, true, totalDepth, tests, results)
	testLoop(t, true, true, totalDepth, tests, results)
	testLoop(t, false, false, totalDepth, tests, results)
	testLoop(t, true, false, totalDepth, tests, results)
}
|
||||
|
||||
func testLoop(t *testing.T, detailed bool, exact bool, totalDepth int, tests []test, results map[string]map[string]map[string]*result) {
|
||||
factory := CreateNodeFactory(detailed, exact)
|
||||
tree := factory.NewNode(totalDepth == 0)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
tf := func() {
|
||||
for i, _ := range tests {
|
||||
seriesKey, field, _ := bytes.Cut(tests[i].key, []byte("#!~#"))
|
||||
measurement, tags := models.ParseKey(seriesKey)
|
||||
tree.Record(0, totalDepth, tests[i].db, tests[i].rp, measurement, tests[i].key, field, tags)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
const concurrency = 5
|
||||
wg.Add(concurrency)
|
||||
for j := 0; j < concurrency; j++ {
|
||||
go tf()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for d, db := range tree.Children() {
|
||||
for r, rp := range db.Children() {
|
||||
for m, measure := range rp.Children() {
|
||||
checkNode(t, measure, results[d][r][m], d, r, m)
|
||||
}
|
||||
checkNode(t, rp, results[d][r][""], d, r, "")
|
||||
}
|
||||
checkNode(t, db, results[d][""][""], d, "", "")
|
||||
}
|
||||
checkNode(t, tree, results[""][""][""], "", "", "")
|
||||
}
|
||||
|
||||
func checkNode(t *testing.T, measure RollupNode, results *result, d string, r string, m string) {
|
||||
mr, ok := measure.(NodeWrapper)
|
||||
if !ok {
|
||||
t.Fatalf("internal error: expected a NodeWrapper type")
|
||||
}
|
||||
|
||||
switch node := mr.RollupNode.(type) {
|
||||
case *detailedNode:
|
||||
require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m)
|
||||
require.Equalf(t, results.fields, node.fields.Count(), "field count wrong. db: %q, rp: %q, ms: %q", d, r, m)
|
||||
tagSum := uint64(0)
|
||||
for _, t := range node.tags {
|
||||
tagSum += t.Count()
|
||||
}
|
||||
require.Equalf(t, results.tags, tagSum, "tag value count wrong. db: %q, rp: %q, ms: %q", d, r, m)
|
||||
case *simpleNode:
|
||||
require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m)
|
||||
default:
|
||||
t.Fatalf("internal error: unknown node type")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
package cardinality
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"text/tabwriter"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/cardinality/aggregators"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/reporthelper"
|
||||
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// Command represents the program execution for "influxd cardinality".
type Command struct {
	// Standard input/output, overridden for testing.
	Stderr io.Writer
	Stdout io.Writer

	// dbPath is the root directory of the database to report on (-db-path).
	dbPath string
	// shardPaths is initialized empty and not populated in this file —
	// presumably shard ID -> directory; verify against callers.
	shardPaths map[uint64]string
	// exact selects precise counting instead of HyperLogLog estimation.
	exact bool
	// detailed adds field and tag counts to the report.
	detailed bool
	// How many goroutines to dedicate to calculating cardinality.
	concurrency int
	// t, d, r, m for Total, Database, Retention Policy, Measurement
	rollup string
}
|
||||
|
||||
// NewCommand returns a new instance of Command with default setting applied.
|
||||
func NewCommand() *Command {
|
||||
return &Command{
|
||||
Stderr: os.Stderr,
|
||||
Stdout: os.Stdout,
|
||||
shardPaths: map[uint64]string{},
|
||||
concurrency: 1,
|
||||
detailed: false,
|
||||
rollup: "m",
|
||||
}
|
||||
}
|
||||
|
||||
// Run executes the command.
|
||||
func (cmd *Command) Run(args ...string) (err error) {
|
||||
var legalRollups = map[string]int{"m": 3, "r": 2, "d": 1, "t": 0}
|
||||
fs := flag.NewFlagSet("report-db", flag.ExitOnError)
|
||||
fs.StringVar(&cmd.dbPath, "db-path", "", "Path to database. Required.")
|
||||
fs.IntVar(&cmd.concurrency, "c", 1, "Set worker concurrency. Defaults to one.")
|
||||
fs.BoolVar(&cmd.detailed, "detailed", false, "Include counts for fields, tags, ")
|
||||
fs.BoolVar(&cmd.exact, "exact", false, "Report exact counts")
|
||||
fs.StringVar(&cmd.rollup, "rollup", "m", "Rollup level - t: total, d: database, r: retention policy, m: measurement")
|
||||
fs.SetOutput(cmd.Stdout)
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cmd.dbPath == "" {
|
||||
return errors.New("path to database must be provided")
|
||||
}
|
||||
|
||||
totalDepth, ok := legalRollups[cmd.rollup]
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid rollup specified: %q", cmd.rollup)
|
||||
}
|
||||
|
||||
factory := aggregators.CreateNodeFactory(cmd.detailed, cmd.exact)
|
||||
totalsTree := factory.NewNode(totalDepth == 0)
|
||||
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
g.SetLimit(cmd.concurrency)
|
||||
processTSM := func(db, rp, id, path string) error {
|
||||
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
reader, err := tsm1.NewTSMReader(file)
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
|
||||
// NewTSMReader won't close the file handle on failure, so do it here.
|
||||
_ = file.Close()
|
||||
return nil
|
||||
}
|
||||
defer func() {
|
||||
// The TSMReader will close the underlying file handle here.
|
||||
if err := reader.Close(); err != nil {
|
||||
_, _ = fmt.Fprintf(cmd.Stderr, "error closing: %s: %v.\n", file.Name(), err)
|
||||
}
|
||||
}()
|
||||
|
||||
seriesCount := reader.KeyCount()
|
||||
for i := 0; i < seriesCount; i++ {
|
||||
func() {
|
||||
key, _ := reader.KeyAt(i)
|
||||
seriesKey, field, _ := bytes.Cut(key, []byte("#!~#"))
|
||||
measurement, tags := models.ParseKey(seriesKey)
|
||||
totalsTree.Record(0, totalDepth, db, rp, measurement, key, field, tags)
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
done := ctx.Done()
|
||||
err = reporthelper.WalkShardDirs(cmd.dbPath, func(db, rp, id, path string) error {
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
default:
|
||||
g.Go(func() error {
|
||||
return processTSM(db, rp, id, path)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(cmd.Stderr, "%s: %v\n", cmd.dbPath, err)
|
||||
return err
|
||||
}
|
||||
err = g.Wait()
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(cmd.Stderr, "%s: %v\n", cmd.dbPath, err)
|
||||
return err
|
||||
}
|
||||
|
||||
tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)
|
||||
|
||||
if err = factory.PrintHeader(tw); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = factory.PrintDivider(tw); err != nil {
|
||||
return err
|
||||
}
|
||||
for d, db := range totalsTree.Children() {
|
||||
for r, rp := range db.Children() {
|
||||
for m, measure := range rp.Children() {
|
||||
err = measure.Print(tw, true, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), fmt.Sprintf("%q", m))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = rp.Print(tw, false, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), ""); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = db.Print(tw, false, fmt.Sprintf("%q", d), "", ""); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = totalsTree.Print(tw, false, "Total"+factory.EstTitle, "", ""); err != nil {
|
||||
return err
|
||||
}
|
||||
return tw.Flush()
|
||||
}
|
|
@ -38,6 +38,7 @@ The commands are:
|
|||
buildtsi generates tsi1 indexes from tsm1 data
|
||||
help display this help message
|
||||
report displays a shard level cardinality report
|
||||
report-db estimates cloud 2 cardinality for a database
|
||||
report-disk displays a shard level disk usage report
|
||||
verify verifies integrity of TSM files
|
||||
verify-seriesfile verifies integrity of the Series file
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/cmd"
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/buildtsi"
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/cardinality"
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/deletetsm"
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsi"
|
||||
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm"
|
||||
|
@ -61,6 +62,11 @@ func (m *Main) Run(args ...string) error {
|
|||
if err := help.NewCommand().Run(args...); err != nil {
|
||||
return fmt.Errorf("help: %s", err)
|
||||
}
|
||||
case "report-db":
|
||||
name := cardinality.NewCommand()
|
||||
if err := name.Run(args...); err != nil {
|
||||
return fmt.Errorf("report-db: %w", err)
|
||||
}
|
||||
case "deletetsm":
|
||||
name := deletetsm.NewCommand()
|
||||
if err := name.Run(args...); err != nil {
|
||||
|
|
|
@ -53,11 +53,11 @@ func (cmd *Command) Run(args ...string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
newCounterFn := newHLLCounter
|
||||
newCounterFn := NewHLLCounter
|
||||
estTitle := " (est)"
|
||||
if cmd.exact {
|
||||
estTitle = ""
|
||||
newCounterFn = newExactCounter
|
||||
newCounterFn = NewExactCounter
|
||||
}
|
||||
|
||||
cmd.dir = fs.Arg(0)
|
||||
|
@ -68,11 +68,11 @@ func (cmd *Command) Run(args ...string) error {
|
|||
}
|
||||
|
||||
totalSeries := newCounterFn()
|
||||
tagCardinalities := map[string]counter{}
|
||||
measCardinalities := map[string]counter{}
|
||||
fieldCardinalities := map[string]counter{}
|
||||
tagCardinalities := map[string]Counter{}
|
||||
measCardinalities := map[string]Counter{}
|
||||
fieldCardinalities := map[string]Counter{}
|
||||
|
||||
dbCardinalities := map[string]counter{}
|
||||
dbCardinalities := map[string]Counter{}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
|
@ -210,7 +210,7 @@ func (cmd *Command) Run(args ...string) error {
|
|||
}
|
||||
|
||||
// sortKeys is a quick helper to return the sorted set of a map's keys
|
||||
func sortKeys(vals map[string]counter) (keys []string) {
|
||||
func sortKeys(vals map[string]Counter) (keys []string) {
|
||||
for k := range vals {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
@ -238,14 +238,14 @@ Usage: influx_inspect report [flags]
|
|||
fmt.Fprintf(cmd.Stdout, usage)
|
||||
}
|
||||
|
||||
// counter abstracts a a method of counting keys.
|
||||
type counter interface {
|
||||
// Counter abstracts a a method of counting keys.
|
||||
type Counter interface {
|
||||
Add(key []byte)
|
||||
Count() uint64
|
||||
}
|
||||
|
||||
// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation.
|
||||
func newHLLCounter() counter {
|
||||
// NewHLLCounter returns an approximate Counter using HyperLogLogs for cardinality estimation.
|
||||
func NewHLLCounter() Counter {
|
||||
return hllpp.New()
|
||||
}
|
||||
|
||||
|
@ -262,7 +262,7 @@ func (c *exactCounter) Count() uint64 {
|
|||
return uint64(len(c.m))
|
||||
}
|
||||
|
||||
func newExactCounter() counter {
|
||||
func NewExactCounter() Counter {
|
||||
return &exactCounter{
|
||||
m: make(map[string]struct{}),
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -44,7 +44,7 @@ require (
|
|||
go.uber.org/multierr v1.6.0
|
||||
go.uber.org/zap v1.16.0
|
||||
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
|
||||
golang.org/x/text v0.3.7
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
|
|
3
go.sum
3
go.sum
|
@ -1142,8 +1142,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
|
||||
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
Loading…
Reference in New Issue