243 lines
5.7 KiB
Go
243 lines
5.7 KiB
Go
package aggregators
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"text/tabwriter"
|
|
|
|
"github.com/influxdata/influxdb/cmd/influx_inspect/report"
|
|
"github.com/influxdata/influxdb/models"
|
|
)
|
|
|
|
type rollupNodeMap map[string]RollupNode
|
|
|
|
type RollupNode interface {
|
|
sync.Locker
|
|
report.Counter
|
|
Children() rollupNodeMap
|
|
RecordSeries(db, rp, ms string, key, field []byte, tags models.Tags)
|
|
Print(tw *tabwriter.Writer, printTags bool, db, rp, ms string) error
|
|
isLeaf() bool
|
|
child(key string, isLeaf bool) NodeWrapper
|
|
}
|
|
|
|
type NodeWrapper struct {
|
|
RollupNode
|
|
}
|
|
|
|
var detailedHeader = []string{"DB", "RP", "measurement", "series", "fields", "tag total", "tags"}
|
|
var simpleHeader = []string{"DB", "RP", "measurement", "series"}
|
|
|
|
type RollupNodeFactory struct {
|
|
header []string
|
|
EstTitle string
|
|
NewNode func(isLeaf bool) NodeWrapper
|
|
counter func() report.Counter
|
|
}
|
|
|
|
var nodeFactory *RollupNodeFactory
|
|
|
|
func CreateNodeFactory(detailed, exact bool) *RollupNodeFactory {
|
|
estTitle := " (est.)"
|
|
newCounterFn := report.NewHLLCounter
|
|
if exact {
|
|
newCounterFn = report.NewExactCounter
|
|
estTitle = ""
|
|
}
|
|
|
|
if detailed {
|
|
nodeFactory = newDetailedNodeFactory(newCounterFn, estTitle)
|
|
} else {
|
|
nodeFactory = newSimpleNodeFactory(newCounterFn, estTitle)
|
|
}
|
|
return nodeFactory
|
|
}
|
|
|
|
func (f *RollupNodeFactory) PrintHeader(tw *tabwriter.Writer) error {
|
|
_, err := fmt.Fprintln(tw, strings.Join(f.header, "\t"))
|
|
return err
|
|
}
|
|
|
|
func (f *RollupNodeFactory) PrintDivider(tw *tabwriter.Writer) error {
|
|
divLine := f.makeTabDivider()
|
|
_, err := fmt.Fprintln(tw, divLine)
|
|
return err
|
|
}
|
|
|
|
func (f *RollupNodeFactory) makeTabDivider() string {
|
|
div := make([]string, 0, len(f.header))
|
|
for _, s := range f.header {
|
|
div = append(div, strings.Repeat("-", len(s)))
|
|
}
|
|
return strings.Join(div, "\t")
|
|
}
|
|
|
|
func newSimpleNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
|
|
return &RollupNodeFactory{
|
|
header: simpleHeader,
|
|
EstTitle: est,
|
|
NewNode: func(isLeaf bool) NodeWrapper { return NodeWrapper{newSimpleNode(isLeaf, newCounterFn)} },
|
|
counter: newCounterFn,
|
|
}
|
|
}
|
|
|
|
func newDetailedNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
|
|
return &RollupNodeFactory{
|
|
header: detailedHeader,
|
|
EstTitle: est,
|
|
NewNode: func(isLeaf bool) NodeWrapper { return NodeWrapper{newDetailedNode(isLeaf, newCounterFn)} },
|
|
counter: newCounterFn,
|
|
}
|
|
}
|
|
|
|
type simpleNode struct {
|
|
sync.Mutex
|
|
report.Counter
|
|
rollupNodeMap
|
|
}
|
|
|
|
func (s *simpleNode) Children() rollupNodeMap {
|
|
return s.rollupNodeMap
|
|
}
|
|
|
|
func (s *simpleNode) child(key string, isLeaf bool) NodeWrapper {
|
|
if s.isLeaf() {
|
|
panic("Trying to get the child to a leaf node")
|
|
}
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
c, ok := s.Children()[key]
|
|
if !ok {
|
|
c = nodeFactory.NewNode(isLeaf)
|
|
s.Children()[key] = c
|
|
}
|
|
return NodeWrapper{c}
|
|
}
|
|
|
|
func (s *simpleNode) isLeaf() bool {
|
|
return s.Children() == nil
|
|
}
|
|
|
|
func newSimpleNode(isLeaf bool, fn func() report.Counter) *simpleNode {
|
|
s := &simpleNode{Counter: fn()}
|
|
if !isLeaf {
|
|
s.rollupNodeMap = make(rollupNodeMap)
|
|
} else {
|
|
s.rollupNodeMap = nil
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (s *simpleNode) RecordSeries(db, rp, _ string, key, _ []byte, _ models.Tags) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
s.recordSeriesNoLock(db, rp, key)
|
|
}
|
|
|
|
func (s *simpleNode) recordSeriesNoLock(db, rp string, key []byte) {
|
|
s.Add([]byte(fmt.Sprintf("%s.%s.%s", db, rp, key)))
|
|
}
|
|
|
|
func (s *simpleNode) Print(tw *tabwriter.Writer, _ bool, db, rp, ms string) error {
|
|
_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\n",
|
|
db,
|
|
rp,
|
|
ms,
|
|
s.Count())
|
|
return err
|
|
}
|
|
|
|
type detailedNode struct {
|
|
simpleNode
|
|
fields report.Counter
|
|
tags map[string]report.Counter
|
|
}
|
|
|
|
func newDetailedNode(isLeaf bool, fn func() report.Counter) *detailedNode {
|
|
d := &detailedNode{
|
|
simpleNode: simpleNode{
|
|
Counter: fn(),
|
|
},
|
|
fields: fn(),
|
|
tags: make(map[string]report.Counter),
|
|
}
|
|
if !isLeaf {
|
|
d.simpleNode.rollupNodeMap = make(rollupNodeMap)
|
|
} else {
|
|
d.simpleNode.rollupNodeMap = nil
|
|
}
|
|
return d
|
|
}
|
|
|
|
func (d *detailedNode) RecordSeries(db, rp, ms string, key, field []byte, tags models.Tags) {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
d.simpleNode.recordSeriesNoLock(db, rp, key)
|
|
d.fields.Add([]byte(fmt.Sprintf("%s.%s.%s.%s", db, rp, ms, field)))
|
|
for _, t := range tags {
|
|
// Add database, retention policy, and measurement
|
|
// to correctly aggregate in inner (non-leaf) nodes
|
|
canonTag := fmt.Sprintf("%s.%s.%s.%s", db, rp, ms, t.Key)
|
|
tc, ok := d.tags[canonTag]
|
|
if !ok {
|
|
tc = nodeFactory.counter()
|
|
d.tags[canonTag] = tc
|
|
}
|
|
tc.Add(t.Value)
|
|
}
|
|
}
|
|
|
|
func (d *detailedNode) Print(tw *tabwriter.Writer, printTags bool, db, rp, ms string) error {
|
|
seriesN := d.Count()
|
|
fieldsN := d.fields.Count()
|
|
var tagKeys []string
|
|
tagN := uint64(0)
|
|
|
|
if printTags {
|
|
tagKeys = make([]string, 0, len(d.tags))
|
|
}
|
|
for k, v := range d.tags {
|
|
c := v.Count()
|
|
tagN += c
|
|
if printTags {
|
|
tagKeys = append(tagKeys, fmt.Sprintf("%q: %d", k[strings.LastIndex(k, ".")+1:], c))
|
|
}
|
|
}
|
|
_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%s\n",
|
|
db,
|
|
rp,
|
|
ms,
|
|
seriesN,
|
|
fieldsN,
|
|
tagN,
|
|
strings.Join(tagKeys, ", "))
|
|
return err
|
|
}
|
|
|
|
func (r *NodeWrapper) Record(depth, totalDepth int, db, rp, measurement string, key []byte, field []byte, tags models.Tags) {
|
|
r.RecordSeries(db, rp, measurement, key, field, tags)
|
|
|
|
switch depth {
|
|
case 2:
|
|
if depth < totalDepth {
|
|
// Create measurement level in tree
|
|
c := r.child(measurement, true)
|
|
c.RecordSeries(db, rp, measurement, key, field, tags)
|
|
}
|
|
case 1:
|
|
if depth < totalDepth {
|
|
// Create retention policy level in tree
|
|
c := r.child(rp, (depth+1) == totalDepth)
|
|
c.Record(depth+1, totalDepth, db, rp, measurement, key, field, tags)
|
|
}
|
|
case 0:
|
|
if depth < totalDepth {
|
|
// Create database level in tree
|
|
c := r.child(db, (depth+1) == totalDepth)
|
|
c.Record(depth+1, totalDepth, db, rp, measurement, key, field, tags)
|
|
}
|
|
default:
|
|
}
|
|
}
|