feat(tsi1): provide API tooling for use in testing
parent
8f99d20deb
commit
bfd38d93d8
|
@ -15,7 +15,7 @@ import (
|
|||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// Command represents the program execution for "influxd reporttsi".
|
||||
// Command represents the program execution for "influxd inspect report-tsi".
|
||||
var tsiFlags = struct {
|
||||
// Standard input/output, overridden for testing.
|
||||
Stderr io.Writer
|
||||
|
@ -88,13 +88,13 @@ func RunReportTsi(cmd *cobra.Command, args []string) error {
|
|||
if bucketID, err := influxdb.IDFromString(tsiFlags.bucket); err != nil {
|
||||
return err
|
||||
} else if report.OrgID == nil {
|
||||
return errors.New("org must be provided if filtering by bucket ")
|
||||
return errors.New("org must be provided if filtering by bucket")
|
||||
} else {
|
||||
report.BucketID = bucketID
|
||||
}
|
||||
}
|
||||
|
||||
err = report.Run()
|
||||
_, err = report.Run()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -59,9 +59,23 @@ func NewReportCommand() *ReportCommand {
|
|||
}
|
||||
}
|
||||
|
||||
// Run initializes the orgBucketCardinality map which can be used to find the cardinality
|
||||
// any org or bucket. Run() must be called before GetOrgCardinality() or GetOrgBucketCardinality()
|
||||
func (report *ReportCommand) Run() error {
|
||||
// ReportTsiSummary is returned by a report-tsi Run() command and is used to access cardinality information
|
||||
type ReportTsiSummary struct {
|
||||
OrgCardinality map[influxdb.ID]int64
|
||||
BucketCardinality map[influxdb.ID]int64
|
||||
}
|
||||
|
||||
func newTsiSummary() *ReportTsiSummary {
|
||||
return &ReportTsiSummary{
|
||||
OrgCardinality: map[influxdb.ID]int64{},
|
||||
BucketCardinality: map[influxdb.ID]int64{},
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the report-tsi tool which can be used to find the cardinality
|
||||
// any org or bucket. Run returns a *ReportTsiSummary, which contains maps for finding
|
||||
// the cardinality of a bucket or org based on it's influxdb.ID
|
||||
func (report *ReportCommand) Run() (*ReportTsiSummary, error) {
|
||||
report.Stdout = os.Stdout
|
||||
|
||||
if report.SeriesDirPath == "" {
|
||||
|
@ -72,7 +86,7 @@ func (report *ReportCommand) Run() error {
|
|||
sFile.WithLogger(report.Logger)
|
||||
if err := sFile.Open(context.Background()); err != nil {
|
||||
report.Logger.Error("failed to open series")
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
defer sFile.Close()
|
||||
report.sfile = sFile
|
||||
|
@ -80,7 +94,7 @@ func (report *ReportCommand) Run() error {
|
|||
path := filepath.Join(report.DataPath, "index")
|
||||
report.indexFile = NewIndex(sFile, NewConfig(), WithPath(path))
|
||||
if err := report.indexFile.Open(context.Background()); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
defer report.indexFile.Close()
|
||||
|
||||
|
@ -90,9 +104,9 @@ func (report *ReportCommand) Run() error {
|
|||
// Blocks until all work done.
|
||||
report.calculateCardinalities(fn)
|
||||
|
||||
// Print summary.
|
||||
report.printOrgBucketCardinality()
|
||||
return nil
|
||||
// Generate and print summary.
|
||||
summary := report.printOrgBucketCardinality()
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
// calculateCardinalities calculates the cardinalities of the set of shard being
|
||||
|
@ -272,10 +286,10 @@ func (a results) Len() int { return len(a) }
|
|||
func (a results) Less(i, j int) bool { return a[i].count < a[j].count }
|
||||
func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
func (report *ReportCommand) printSummaryByMeasurement() error {
|
||||
report.printOrgBucketCardinality()
|
||||
return nil
|
||||
}
|
||||
// func (report *ReportCommand) printSummaryByMeasurement() error {
|
||||
// report.printOrgBucketCardinality()
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// GetOrgCardinality returns the total cardinality of the org provided.
|
||||
// Can only be called after Run().
|
||||
|
@ -293,18 +307,24 @@ func (report *ReportCommand) GetBucketCardinality(orgID, bucketID influxdb.ID) i
|
|||
return report.orgBucketCardinality[orgID][bucketID].cardinality()
|
||||
}
|
||||
|
||||
func (report *ReportCommand) printOrgBucketCardinality() {
|
||||
func (report *ReportCommand) printOrgBucketCardinality() *ReportTsiSummary {
|
||||
tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0)
|
||||
|
||||
// Generate a new summary
|
||||
summary := newTsiSummary()
|
||||
|
||||
totalCard := int64(0)
|
||||
orgTotals := make(map[influxdb.ID]int64)
|
||||
for org, orgToBucket := range report.orgBucketCardinality {
|
||||
orgTotal := int64(0)
|
||||
for _, bucketCard := range orgToBucket {
|
||||
totalCard += bucketCard.cardinality()
|
||||
orgTotal += bucketCard.cardinality()
|
||||
for bucketID, bucketCard := range orgToBucket {
|
||||
c := bucketCard.cardinality()
|
||||
totalCard += c
|
||||
orgTotal += c
|
||||
summary.BucketCardinality[bucketID] = c
|
||||
}
|
||||
orgTotals[org] = orgTotal
|
||||
summary.OrgCardinality[org] = orgTotal
|
||||
}
|
||||
|
||||
fmt.Fprintf(tw, "Summary (total): %v \n\n", totalCard)
|
||||
|
@ -317,4 +337,6 @@ func (report *ReportCommand) printOrgBucketCardinality() {
|
|||
fmt.Fprintf(tw, " Bucket %s %d\n", bucketName.String(), bucketCard.cardinality())
|
||||
}
|
||||
}
|
||||
|
||||
return summary
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue