Gather conversion stats

pull/5280/head
Philip O'Toole 2016-01-05 16:28:19 -08:00
parent cac96113c0
commit 075ef45ae1
2 changed files with 80 additions and 3 deletions

View File

@ -2,12 +2,22 @@ package main
import (
"fmt"
"math"
"os"
"path/filepath"
"sync/atomic"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
var (
NanFiltered uint64
InfFiltered uint64
PointsWritten uint64
PointsRead uint64
TsmFilesCreated uint64
)
type KeyIterator interface {
Next() bool
Read() (string, []tsm1.Value, error)
@ -39,6 +49,7 @@ func (c *Converter) Process(iter KeyIterator) error {
if err != nil {
return err
}
atomic.AddUint64(&TsmFilesCreated, 1)
defer w.Close()
// Iterate until no more data remains.
@ -47,11 +58,17 @@ func (c *Converter) Process(iter KeyIterator) error {
if err != nil {
return err
}
w.Write(k, v)
scrubbed := scrubValues(v)
if err := w.Write(k, scrubbed); err != nil {
return err
}
atomic.AddUint64(&PointsRead, uint64(len(v)))
atomic.AddUint64(&PointsWritten, uint64(len(scrubbed)))
// If we have a max file size configured and we're over it, start a new TSM file.
if w.Size() > c.maxTSMFileSize {
if err := w.WriteIndex(); err != nil {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
if err := w.Close(); err != nil {
@ -62,11 +79,15 @@ func (c *Converter) Process(iter KeyIterator) error {
if err != nil {
return err
}
atomic.AddUint64(&TsmFilesCreated, 1)
}
}
// All done!
return w.WriteIndex()
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
return nil
}
// nextTSMWriter returns the next TSMWriter for the Converter.
@ -87,3 +108,46 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
return w, nil
}
// scrubValues takes a slice and removes float64 NaN and Inf. If neither is
// present in the slice, the original slice is returned. This is to avoid
// copying slices unnecessarily.
func scrubValues(values []tsm1.Value) []tsm1.Value {
var scrubbed []tsm1.Value
if values == nil {
return nil
}
for i, v := range values {
if f, ok := v.Value().(float64); ok {
var filter bool
if math.IsNaN(f) {
filter = true
atomic.AddUint64(&NanFiltered, 1)
}
if math.IsInf(f, 0) {
filter = true
atomic.AddUint64(&InfFiltered, 1)
}
if filter {
if scrubbed == nil {
// Take every value up to the NaN, indicating that scrubbed
// should now be used.
scrubbed = values[:i]
}
} else {
if scrubbed != nil {
// We've filtered at least 1 value, so add value to filtered slice.
scrubbed = append(scrubbed, v)
}
}
}
}
if scrubbed != nil {
return scrubbed
}
return values
}

View File

@ -155,6 +155,7 @@ func main() {
fmt.Println("Conversion starting....")
// Backup each directory.
conversionStart := time.Now()
if !disBack {
databases := tsdb.ShardInfos(shards).Databases()
if parallel {
@ -197,6 +198,18 @@ func main() {
}(si)
}
pg.Wait()
// Dump stats.
fmt.Printf("\nSummary statistics\n=========================\n")
fmt.Printf("Databases converted: %d\n", len(tsdb.ShardInfos(shards).Databases()))
fmt.Printf("Shards converted: %d\n", len(shards))
fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
fmt.Printf("Points read: %d\n", PointsRead)
fmt.Printf("Points written: %d\n", PointsWritten)
fmt.Printf("NaN filtered: %d\n", NanFiltered)
fmt.Printf("Inf filtered: %d\n", InfFiltered)
fmt.Printf("Total conversion time: %v\n", time.Now().Sub(conversionStart))
fmt.Println()
}
// backupDatabase backs up the database at src.