Merge pull request #14470 from influxdata/mu-reportTSI-2.x
feat(tsi1): add report-tsi tool to 2.xpull/14592/head
commit
a2fc43ff0c
|
|
@ -18,6 +18,7 @@ func NewCommand() *cobra.Command {
|
|||
NewReportTSMCommand(),
|
||||
NewVerifyTSMCommand(),
|
||||
NewVerifyWALCommand(),
|
||||
NewReportTSICommand(),
|
||||
}
|
||||
|
||||
for _, command := range subCommands {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,100 @@
|
|||
package inspect
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
|
||||
"github.com/influxdata/influxdb/tsdb/tsi1"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// Command represents the program execution for "influxd inspect report-tsi".
|
||||
var reportTSIFlags = struct {
|
||||
// Standard input/output, overridden for testing.
|
||||
Stderr io.Writer
|
||||
Stdout io.Writer
|
||||
|
||||
// Data path options
|
||||
Path string // optional. Defaults to dbPath/engine/index
|
||||
SeriesFilePath string // optional. Defaults to dbPath/_series
|
||||
|
||||
// Tenant filtering options
|
||||
Org string
|
||||
Bucket string
|
||||
|
||||
// Reporting options
|
||||
TopN int
|
||||
ByMeasurement bool
|
||||
byTagKey bool // currently unused
|
||||
}{}
|
||||
|
||||
// NewReportTsiCommand returns a new instance of Command with default setting applied.
|
||||
func NewReportTSICommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "report-tsi",
|
||||
Short: "Reports the cardinality of TSI files",
|
||||
Long: `This command will analyze TSI files within a storage engine directory, reporting
|
||||
the cardinality of data within the files, divided into org and bucket cardinalities.
|
||||
|
||||
For each report, the following is output:
|
||||
|
||||
* All orgs and buckets in the index;
|
||||
* The series cardinality within each org and each bucket;
|
||||
* The time taken to read the index.
|
||||
|
||||
Depending on the --measurements flag, series cardinality is segmented
|
||||
in the following ways:
|
||||
|
||||
* Series cardinality for each organization;
|
||||
* Series cardinality for each bucket;
|
||||
* Series cardinality for each measurement;`,
|
||||
RunE: RunReportTSI,
|
||||
}
|
||||
|
||||
cmd.Flags().StringVar(&reportTSIFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine/index", "Path to index. Defaults $HOME/.influxdbv2/engine/index")
|
||||
cmd.Flags().StringVar(&reportTSIFlags.SeriesFilePath, "series-file", os.Getenv("HOME")+"/.influxdbv2/engine/_series", "Optional path to series file. Defaults $HOME/.influxdbv2/engine/_series")
|
||||
cmd.Flags().BoolVarP(&reportTSIFlags.ByMeasurement, "measurements", "m", false, "Segment cardinality by measurements")
|
||||
cmd.Flags().IntVarP(&reportTSIFlags.TopN, "top", "t", 0, "Limit results to top n")
|
||||
cmd.Flags().StringVarP(&reportTSIFlags.Bucket, "bucket_id", "b", "", "If bucket is specified, org must be specified. A bucket id must be a base-16 string")
|
||||
cmd.Flags().StringVarP(&reportTSIFlags.Org, "org_id", "o", "", "Only specified org data will be reported. An org id must be a base-16 string")
|
||||
|
||||
cmd.SetOutput(reportTSIFlags.Stdout)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// RunReportTSI executes the run command for ReportTSI.
|
||||
func RunReportTSI(cmd *cobra.Command, args []string) error {
|
||||
report := tsi1.NewReportCommand()
|
||||
report.DataPath = reportTSIFlags.Path
|
||||
report.ByMeasurement = reportTSIFlags.ByMeasurement
|
||||
report.TopN = reportTSIFlags.TopN
|
||||
report.SeriesDirPath = reportTSIFlags.SeriesFilePath
|
||||
|
||||
report.Stdout = os.Stdout
|
||||
report.Stderr = os.Stderr
|
||||
|
||||
var err error
|
||||
if reportTSIFlags.Org != "" {
|
||||
if report.OrgID, err = influxdb.IDFromString(reportTSIFlags.Org); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if reportTSIFlags.Bucket != "" {
|
||||
if report.BucketID, err = influxdb.IDFromString(reportTSIFlags.Bucket); err != nil {
|
||||
return err
|
||||
} else if report.OrgID == nil {
|
||||
return errors.New("org must be provided if filtering by bucket")
|
||||
}
|
||||
}
|
||||
|
||||
// Run command with printing enabled
|
||||
if _, err = report.Run(true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,367 @@
|
|||
package tsi1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"sort"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
// Number of series IDs to stored in slice before we convert to a roaring
|
||||
// bitmap. Roaring bitmaps have a non-trivial initial cost to construct.
|
||||
useBitmapN = 25
|
||||
)
|
||||
|
||||
// ReportCommand represents the program execution for "influxd inspect report-tsi".
|
||||
type ReportCommand struct {
|
||||
// Standard input/output, overridden for testing.
|
||||
Stderr io.Writer
|
||||
Stdout io.Writer
|
||||
|
||||
// Filters
|
||||
DataPath string
|
||||
OrgID, BucketID *influxdb.ID
|
||||
|
||||
byOrgBucket map[influxdb.ID]map[influxdb.ID]*cardinality
|
||||
byBucketMeasurement map[influxdb.ID]map[string]*cardinality
|
||||
orgToBucket map[influxdb.ID][]influxdb.ID
|
||||
|
||||
SeriesDirPath string // optional. Defaults to dbPath/_series
|
||||
sfile *tsdb.SeriesFile
|
||||
indexFile *Index
|
||||
|
||||
TopN int
|
||||
ByMeasurement bool
|
||||
byTagKey bool
|
||||
|
||||
start time.Time
|
||||
}
|
||||
|
||||
// NewReportCommand returns a new instance of ReportCommand with default setting applied.
|
||||
func NewReportCommand() *ReportCommand {
|
||||
return &ReportCommand{
|
||||
byOrgBucket: make(map[influxdb.ID]map[influxdb.ID]*cardinality),
|
||||
byBucketMeasurement: make(map[influxdb.ID]map[string]*cardinality),
|
||||
orgToBucket: make(map[influxdb.ID][]influxdb.ID),
|
||||
TopN: 0,
|
||||
byTagKey: false,
|
||||
}
|
||||
}
|
||||
|
||||
// ReportTSISummary is returned by a report-tsi Run() command and is used to access cardinality information
|
||||
type Summary struct {
|
||||
TotalCardinality int64
|
||||
OrgCardinality map[influxdb.ID]int64
|
||||
BucketByOrgCardinality map[influxdb.ID]map[influxdb.ID]int64
|
||||
BucketMeasurementCardinality map[influxdb.ID]map[string]int64
|
||||
}
|
||||
|
||||
func newSummary() *Summary {
|
||||
return &Summary{
|
||||
OrgCardinality: make(map[influxdb.ID]int64),
|
||||
BucketByOrgCardinality: make(map[influxdb.ID]map[influxdb.ID]int64),
|
||||
BucketMeasurementCardinality: make(map[influxdb.ID]map[string]int64),
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the report-tsi tool which can be used to find the cardinality
|
||||
// any org or bucket. Run returns a *ReportTSISummary, which contains maps for finding
|
||||
// the cardinality of a bucket or org based on its influxdb.ID
|
||||
func (report *ReportCommand) Run(print bool) (*Summary, error) {
|
||||
report.start = time.Now()
|
||||
|
||||
sfile := tsdb.NewSeriesFile(report.SeriesDirPath)
|
||||
|
||||
if err := sfile.Open(context.Background()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer sfile.Close()
|
||||
report.sfile = sfile
|
||||
|
||||
report.indexFile = NewIndex(sfile, NewConfig(), WithPath(report.DataPath))
|
||||
if err := report.indexFile.Open(context.Background()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer report.indexFile.Close()
|
||||
|
||||
summary, err := report.calculateOrgBucketCardinality()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if print {
|
||||
report.printCardinalitySummary(summary)
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
type cardinality struct {
|
||||
name []byte
|
||||
short []uint32
|
||||
set *tsdb.SeriesIDSet
|
||||
}
|
||||
|
||||
func (c *cardinality) add(x uint64) {
|
||||
if c.set != nil {
|
||||
c.set.AddNoLock(tsdb.NewSeriesID(x))
|
||||
return
|
||||
}
|
||||
|
||||
c.short = append(c.short, uint32(x)) // Series IDs never get beyond 2^32
|
||||
|
||||
// Cheaper to store in bitmap.
|
||||
if len(c.short) > useBitmapN {
|
||||
c.set = tsdb.NewSeriesIDSet()
|
||||
for i := 0; i < len(c.short); i++ {
|
||||
c.set.AddNoLock(tsdb.NewSeriesID(uint64(c.short[i])))
|
||||
}
|
||||
c.short = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cardinality) cardinality() int64 {
|
||||
if c == nil || (c.short == nil && c.set == nil) {
|
||||
return 0
|
||||
}
|
||||
|
||||
if c.short != nil {
|
||||
return int64(len(c.short))
|
||||
}
|
||||
return int64(c.set.Cardinality())
|
||||
}
|
||||
|
||||
func (report *ReportCommand) calculateCardinalities() error {
|
||||
itr, err := report.indexFile.MeasurementIterator()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if itr == nil {
|
||||
return nil
|
||||
}
|
||||
defer itr.Close()
|
||||
|
||||
for {
|
||||
name, err := itr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if name == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = report.calculateMeasurementCardinalities(name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (report *ReportCommand) calculateMeasurementCardinalities(name []byte) error {
|
||||
// decode org and bucket from measurement name
|
||||
var a [16]byte
|
||||
copy(a[:], name[:16])
|
||||
org, bucket := tsdb.DecodeName(a)
|
||||
if report.OrgID != nil && *report.OrgID != org ||
|
||||
report.BucketID != nil && *report.BucketID != bucket {
|
||||
return nil
|
||||
}
|
||||
|
||||
idx := report.indexFile
|
||||
sitr, err := idx.MeasurementSeriesIDIterator(name)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sitr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer sitr.Close()
|
||||
|
||||
var bucketCard *cardinality
|
||||
|
||||
// initialize map of bucket to measurements
|
||||
if _, ok := report.byBucketMeasurement[bucket]; !ok {
|
||||
report.byBucketMeasurement[bucket] = make(map[string]*cardinality)
|
||||
}
|
||||
|
||||
if _, ok := report.byOrgBucket[org]; !ok {
|
||||
report.byOrgBucket[org] = make(map[influxdb.ID]*cardinality)
|
||||
}
|
||||
|
||||
// initialize total cardinality tracking struct for this bucket
|
||||
if c, ok := report.byOrgBucket[org][bucket]; !ok {
|
||||
bucketCard = &cardinality{name: []byte(bucket.String())}
|
||||
report.byOrgBucket[org][bucket] = bucketCard
|
||||
} else {
|
||||
bucketCard = c
|
||||
}
|
||||
|
||||
for {
|
||||
e, err := sitr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if e.SeriesID.ID == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
id := e.SeriesID.ID
|
||||
if id > math.MaxUint32 {
|
||||
return fmt.Errorf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32))
|
||||
}
|
||||
|
||||
// add cardinality to bucket
|
||||
bucketCard.add(id)
|
||||
|
||||
// retrieve tags associated with series id so we can get
|
||||
// associated measurement
|
||||
_, tags := report.sfile.Series(e.SeriesID)
|
||||
if len(tags) == 0 {
|
||||
return fmt.Errorf("series ID has empty key: %d", e.SeriesID)
|
||||
}
|
||||
|
||||
// measurement name should be first tag
|
||||
if !bytes.Equal(tags[0].Key, models.MeasurementTagKeyBytes) {
|
||||
return fmt.Errorf("corrupted data: first tag should be measurement name, got: %v", string(tags[0].Value))
|
||||
}
|
||||
mName := string(tags[0].Value)
|
||||
|
||||
// update measurement-level cardinality if tracking by measurement
|
||||
if report.ByMeasurement {
|
||||
var mCard *cardinality
|
||||
if cardForM, ok := report.byBucketMeasurement[bucket][mName]; !ok {
|
||||
mCard = &cardinality{name: []byte(mName)}
|
||||
report.byBucketMeasurement[bucket][mName] = mCard
|
||||
} else {
|
||||
mCard = cardForM
|
||||
}
|
||||
mCard.add(id)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (report *ReportCommand) calculateOrgBucketCardinality() (*Summary, error) {
|
||||
if err := report.calculateCardinalities(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var totalCard int64
|
||||
// Generate a new summary
|
||||
summary := newSummary()
|
||||
for orgID, bucketMap := range report.byOrgBucket {
|
||||
summary.BucketByOrgCardinality[orgID] = make(map[influxdb.ID]int64)
|
||||
orgTotal := int64(0)
|
||||
for bucketID, bucketCard := range bucketMap {
|
||||
count := bucketCard.cardinality()
|
||||
summary.BucketByOrgCardinality[orgID][bucketID] = count
|
||||
summary.BucketMeasurementCardinality[bucketID] = make(map[string]int64)
|
||||
orgTotal += count
|
||||
totalCard += count
|
||||
}
|
||||
summary.OrgCardinality[orgID] = orgTotal
|
||||
}
|
||||
|
||||
summary.TotalCardinality = totalCard
|
||||
|
||||
for bucketID, bucketMeasurement := range report.byBucketMeasurement {
|
||||
for mName, mCard := range bucketMeasurement {
|
||||
summary.BucketMeasurementCardinality[bucketID][mName] = mCard.cardinality()
|
||||
}
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func (report *ReportCommand) printCardinalitySummary(summary *Summary) {
|
||||
tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0)
|
||||
fmt.Fprint(tw, "\n")
|
||||
|
||||
fmt.Fprintf(tw, "Total: %d\n", summary.TotalCardinality)
|
||||
// sort total org and bucket and limit to top n values
|
||||
sortedOrgs := sortKeys(summary.OrgCardinality, report.TopN)
|
||||
|
||||
for i, orgResult := range sortedOrgs {
|
||||
orgID, _ := influxdb.IDFromString(orgResult.id)
|
||||
sortedBuckets := sortKeys(summary.BucketByOrgCardinality[*orgID], report.TopN)
|
||||
// if we specify a bucket, we do not print the org cardinality
|
||||
fmt.Fprintln(tw, "===============")
|
||||
if report.BucketID == nil {
|
||||
fmt.Fprintf(tw, "Org %s total: %d\n", orgResult.id, orgResult.card)
|
||||
}
|
||||
|
||||
for _, bucketResult := range sortedBuckets {
|
||||
fmt.Fprintf(tw, "\tBucket %s total: %d\n", bucketResult.id, bucketResult.card)
|
||||
|
||||
if report.ByMeasurement {
|
||||
bucketID, _ := influxdb.IDFromString(bucketResult.id)
|
||||
sortedMeasurements := sortMeasurements(summary.BucketMeasurementCardinality[*bucketID], report.TopN)
|
||||
|
||||
for _, measResult := range sortedMeasurements {
|
||||
fmt.Fprintf(tw, "\t\t_m=%s\t%d\n", measResult.id, measResult.card)
|
||||
}
|
||||
}
|
||||
}
|
||||
if i == len(sortedOrgs)-1 {
|
||||
fmt.Fprintln(tw, "===============")
|
||||
}
|
||||
}
|
||||
fmt.Fprint(tw, "\n\n")
|
||||
|
||||
elapsed := time.Since(report.start)
|
||||
fmt.Fprintf(tw, "Finished in %v\n", elapsed)
|
||||
|
||||
tw.Flush()
|
||||
}
|
||||
|
||||
// sortKeys is a quick helper to return the sorted set of a map's keys
|
||||
// sortKeys will only return report.topN keys if the flag is set
|
||||
type result struct {
|
||||
id string
|
||||
card int64
|
||||
}
|
||||
|
||||
type resultList []result
|
||||
|
||||
func (a resultList) Len() int { return len(a) }
|
||||
func (a resultList) Less(i, j int) bool { return a[i].card < a[j].card }
|
||||
func (a resultList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
func sortKeys(vals map[influxdb.ID]int64, topN int) resultList {
|
||||
sorted := make(resultList, 0)
|
||||
for k, v := range vals {
|
||||
sorted = append(sorted, result{k.String(), v})
|
||||
}
|
||||
sort.Sort(sort.Reverse(sorted))
|
||||
|
||||
if topN == 0 {
|
||||
return sorted
|
||||
}
|
||||
if topN > len(sorted) {
|
||||
topN = len(sorted)
|
||||
}
|
||||
return sorted[:topN]
|
||||
}
|
||||
|
||||
func sortMeasurements(vals map[string]int64, topN int) resultList {
|
||||
sorted := make(resultList, 0)
|
||||
for k, v := range vals {
|
||||
sorted = append(sorted, result{k, v})
|
||||
}
|
||||
sort.Sort(sort.Reverse(sorted))
|
||||
|
||||
if topN == 0 {
|
||||
return sorted
|
||||
}
|
||||
if topN > len(sorted) {
|
||||
topN = len(sorted)
|
||||
}
|
||||
return sorted[:topN]
|
||||
}
|
||||
Loading…
Reference in New Issue