
342 lines
10 KiB
Raw Normal View History

2019-02-18 12:47:39 +00:00
package tsm1
import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/estimator/hll"
	"github.com/influxdata/influxdb/tsdb"
)
// Report runs a report over tsm data
type Report struct {
Stderr io.Writer
Stdout io.Writer
Dir string
OrgID, BucketID *influxdb.ID // Calculate only results for the provided org or bucket id.
Pattern string // Providing "01.tsm" for example would filter for level 1 files.
Detailed bool // Detailed will segment cardinality by tag keys.
Exact bool // Exact determines if estimation or exact methods are used to determine cardinality.
// ReportSummary provides a summary of the cardinalities in the processed fileset.
type ReportSummary struct {
	// Min and Max are the earliest and latest timestamps, in Unix nanoseconds,
	// seen across the processed files.
	Min, Max int64
	// Total is the exact or estimated unique set of series keys across all files.
	Total uint64
	// Organizations is the exact or estimated unique set of series keys segmented by org.
	Organizations map[string]uint64
	// Buckets is the exact or estimated unique set of series keys segmented by bucket.
	Buckets map[string]uint64

	// The following are calculated only when the detailed flag is in use.

	// Measurements is the exact or estimated unique set of series keys segmented by the measurement tag.
	Measurements map[string]uint64
	// FieldKeys is the exact or estimated unique set of field keys segmented by measurement.
	FieldKeys map[string]uint64
	// TagKeys is the exact or estimated unique set of tag values segmented by tag key.
	TagKeys map[string]uint64
}
func newReportSummary() *ReportSummary {
return &ReportSummary{
Organizations: map[string]uint64{},
Buckets: map[string]uint64{},
Measurements: map[string]uint64{},
FieldKeys: map[string]uint64{},
TagKeys: map[string]uint64{},
2019-02-18 12:47:39 +00:00
// Run executes the Report.
// Calling Run with print set to true emits data about each file to the report's
// Stdout fd as it is generated.
func (r *Report) Run(print bool) (*ReportSummary, error) {
2019-02-18 12:47:39 +00:00
if r.Stderr == nil {
r.Stderr = os.Stderr
if r.Stdout == nil {
r.Stdout = os.Stdout
if !print {
r.Stderr, r.Stdout = ioutil.Discard, ioutil.Discard
2019-02-18 12:47:39 +00:00
newCounterFn := newHLLCounter
estTitle := " (est)"
if r.Exact {
estTitle = ""
newCounterFn = newExactCounter
fi, err := os.Stat(r.Dir)
if err != nil {
return nil, err
2019-02-18 12:47:39 +00:00
} else if !fi.IsDir() {
return nil, errors.New("data directory not valid")
2019-02-18 12:47:39 +00:00
totalSeries := newCounterFn() // The exact or estimated unique set of series keys across all files.
orgCardinalities := map[string]counter{} // The exact or estimated unique set of series keys segmented by org.
bucketCardinalities := map[string]counter{} // The exact or estimated unique set of series keys segmented by bucket.
// These are calculated when the detailed flag is in use.
mCardinalities := map[string]counter{} // The exact or estimated unique set of series keys segmented by the measurement tag.
fCardinalities := map[string]counter{} // The exact or estimated unique set of series keys segmented by the field tag.
tCardinalities := map[string]counter{} // The exact or estimated unique set of series keys segmented by tag keys.
start := time.Now()
tw := tabwriter.NewWriter(r.Stdout, 8, 2, 1, ' ', 0)
fmt.Fprintln(tw, strings.Join([]string{"File", "Series", "New" + estTitle, "Min Time", "Max Time", "Load Time"}, "\t"))
minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64)
files, err := filepath.Glob(filepath.Join(r.Dir, "*.tsm"))
if err != nil {
panic(err) // Only error would be a bad pattern; not runtime related.
var processedFiles int
2019-02-18 13:42:14 +00:00
var tagBuf models.Tags // Buffer that can be re-used when parsing keys.
2019-02-18 12:47:39 +00:00
for _, path := range files {
if r.Pattern != "" && strings.Contains(path, r.Pattern) {
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
fmt.Fprintf(r.Stderr, "error: %s: %v. Exiting.\n", path, err)
return nil, err
2019-02-18 12:47:39 +00:00
loadStart := time.Now()
reader, err := NewTSMReader(file)
if err != nil {
fmt.Fprintf(r.Stderr, "error: %s: %v. Skipping file.\n", file.Name(), err)
loadTime := time.Since(loadStart)
// Tracks the current total, so it's possible to know how many new series this file adds.
currentTotalCount := totalSeries.Count()
seriesCount := reader.KeyCount()
itr := reader.Iterator(nil)
if itr == nil {
return nil, errors.New("invalid TSM file, no index iterator")
2019-02-18 12:47:39 +00:00
for itr.Next() {
key := itr.Key()
var a [16]byte // TODO(edd) if this shows up we can use a different API to DecodeName.
copy(a[:], key[:16])
org, bucket := tsdb.DecodeName(a)
if r.OrgID != nil && *r.OrgID != org { // If filtering on single org or bucket then skip if no match
// org does not match.
} else if r.BucketID != nil && *r.BucketID != bucket {
// bucket does not match.
totalSeries.Add(key) // Update total cardinality.
// Update org cardinality
orgCount := orgCardinalities[org.String()]
if orgCount == nil {
orgCount = newCounterFn()
orgCardinalities[org.String()] = orgCount
// Update bucket cardinality.
bucketCount := bucketCardinalities[bucket.String()]
if bucketCount == nil {
bucketCount = newCounterFn()
bucketCardinalities[bucket.String()] = bucketCount
// Update tag cardinalities.
if r.Detailed {
sep := bytes.Index(key, KeyFieldSeparatorBytes)
seriesKey := key[:sep] // Snip the tsm1 field key off.
2019-02-18 13:42:14 +00:00
_, tagBuf = models.ParseKeyBytesWithTags(seriesKey, tagBuf)
2019-02-18 12:47:39 +00:00
2019-02-18 13:42:14 +00:00
for _, t := range tagBuf {
2019-02-18 12:47:39 +00:00
tk := string(t.Key)
switch tk {
2019-03-19 15:12:35 +00:00
case models.MeasurementTagKey:
2019-02-18 13:42:14 +00:00
mname := string(t.Value)
2019-02-18 12:47:39 +00:00
// Total series cardinality segmented by measurement name.
2019-02-18 13:42:14 +00:00
mCount := mCardinalities[mname] // measurement name.
2019-02-18 12:47:39 +00:00
if mCount == nil {
mCount = newCounterFn()
2019-02-18 13:42:14 +00:00
mCardinalities[mname] = mCount
2019-02-18 12:47:39 +00:00
mCount.Add(key) // full series keys associated with measurement name.
2019-03-19 15:12:35 +00:00
case models.FieldKeyTagKey:
mname := tagBuf.GetString(models.MeasurementTagKey)
2019-02-18 12:47:39 +00:00
fCount := fCardinalities[mname]
if fCount == nil {
fCount = newCounterFn()
fCardinalities[mname] = fCount
fCount.Add(t.Value) // field keys associated with measurement name.
tagCount := tCardinalities[tk]
if tagCount == nil {
tagCount = newCounterFn()
tCardinalities[tk] = tagCount
minT, maxT := reader.TimeRange()
if minT < minTime {
minTime = minT
if maxT > maxTime {
maxTime = maxT
if err := reader.Close(); err != nil {
return nil, fmt.Errorf("error: %s: %v. Exiting", path, err)
2019-02-18 12:47:39 +00:00
fmt.Fprintln(tw, strings.Join([]string{
strconv.FormatInt(int64(seriesCount), 10),
strconv.FormatInt(int64(totalSeries.Count()-currentTotalCount), 10),
time.Unix(0, minT).UTC().Format(time.RFC3339Nano),
time.Unix(0, maxT).UTC().Format(time.RFC3339Nano),
}, "\t"))
if r.Detailed {
if err := tw.Flush(); err != nil {
return nil, err
2019-02-18 12:47:39 +00:00
if err := tw.Flush(); err != nil {
return nil, err
2019-02-18 12:47:39 +00:00
summary := newReportSummary()
summary.Min = minTime
summary.Max = maxTime
summary.Total = totalSeries.Count()
2019-02-18 12:47:39 +00:00
fmt.Printf(" Files: %d (%d skipped)\n", processedFiles, len(files)-processedFiles)
fmt.Printf(" Series Cardinality%s: %d\n", estTitle, totalSeries.Count())
fmt.Printf(" Time Range: %s - %s\n",
time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
fmt.Printf(" Duration: %s \n", time.Unix(0, maxTime).Sub(time.Unix(0, minTime)))
fmt.Printf(" Organizations (%d):\n", len(orgCardinalities))
for _, org := range sortKeys(orgCardinalities) {
cardinality := orgCardinalities[org].Count()
summary.Organizations[org] = cardinality
fmt.Printf(" - %s: %d%s (%d%%)\n", org, cardinality, estTitle, int(float64(cardinality)/float64(totalSeries.Count())*100))
2019-02-18 12:47:39 +00:00
fmt.Printf(" Total%s: %d\n", estTitle, totalSeries.Count())
fmt.Printf(" \n Buckets (%d):\n", len(bucketCardinalities))
for _, bucket := range sortKeys(bucketCardinalities) {
cardinality := bucketCardinalities[bucket].Count()
summary.Buckets[bucket] = cardinality
fmt.Printf(" - %s: %d%s (%d%%)\n", bucket, cardinality, estTitle, int(float64(cardinality)/float64(totalSeries.Count())*100))
2019-02-18 12:47:39 +00:00
fmt.Printf(" Total%s: %d\n", estTitle, totalSeries.Count())
if r.Detailed {
fmt.Printf("\n Series By Measurements (%d):\n", len(mCardinalities))
for _, mname := range sortKeys(mCardinalities) {
cardinality := mCardinalities[mname].Count()
summary.Measurements[mname] = cardinality
fmt.Printf(" - %v: %d%s (%d%%)\n", mname, cardinality, estTitle, int((float64(cardinality)/float64(totalSeries.Count()))*100))
2019-02-18 12:47:39 +00:00
fmt.Printf("\n Fields By Measurements (%d):\n", len(fCardinalities))
for _, mname := range sortKeys(fCardinalities) {
cardinality := fCardinalities[mname].Count()
summary.FieldKeys[mname] = cardinality
fmt.Printf(" - %v: %d%s\n", mname, cardinality, estTitle)
2019-02-18 12:47:39 +00:00
fmt.Printf("\n Tag Values By Tag Keys (%d):\n", len(tCardinalities))
for _, tkey := range sortKeys(tCardinalities) {
cardinality := tCardinalities[tkey].Count()
summary.TagKeys[tkey] = cardinality
fmt.Printf(" - %v: %d%s\n", tkey, cardinality, estTitle)
2019-02-18 12:47:39 +00:00
fmt.Printf("\nCompleted in %s\n", time.Since(start))
return summary, nil
2019-02-18 12:47:39 +00:00
// sortKeys is a quick helper to return the sorted set of a map's keys
func sortKeys(vals map[string]counter) (keys []string) {
for k := range vals {
keys = append(keys, k)
return keys
// counter abstracts a method of counting distinct keys, either exactly or by
// estimation.
type counter interface {
	// Add records key in the counter.
	Add(key []byte)
	// Count returns the (exact or estimated) number of distinct keys added.
	Count() uint64
}
// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation.
func newHLLCounter() counter {
return hll.NewDefaultPlus()
// exactCounter returns an exact count for keys using counting all distinct items in a set.
type exactCounter struct {
m map[string]struct{}
func (c *exactCounter) Add(key []byte) {
c.m[string(key)] = struct{}{}
func (c *exactCounter) Count() uint64 {
return uint64(len(c.m))
func newExactCounter() counter {
return &exactCounter{
m: make(map[string]struct{}),