diff --git a/cmd/influx_tsm/converter.go b/cmd/influx_tsm/converter.go
index 995d578384..e4aaeff781 100644
--- a/cmd/influx_tsm/converter.go
+++ b/cmd/influx_tsm/converter.go
@@ -2,12 +2,22 @@ package main
 
 import (
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
+	"sync/atomic"
 
 	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
 )
 
+var (
+	NanFiltered     uint64
+	InfFiltered     uint64
+	PointsWritten   uint64
+	PointsRead      uint64
+	TsmFilesCreated uint64
+)
+
 type KeyIterator interface {
 	Next() bool
 	Read() (string, []tsm1.Value, error)
@@ -39,6 +49,7 @@ func (c *Converter) Process(iter KeyIterator) error {
 	if err != nil {
 		return err
 	}
+	atomic.AddUint64(&TsmFilesCreated, 1)
 	defer w.Close()
 
 	// Iterate until no more data remains.
@@ -47,11 +58,17 @@ func (c *Converter) Process(iter KeyIterator) error {
 		if err != nil {
 			return err
 		}
-		w.Write(k, v)
+
+		scrubbed := scrubValues(v)
+		if err := w.Write(k, scrubbed); err != nil {
+			return err
+		}
+		atomic.AddUint64(&PointsRead, uint64(len(v)))
+		atomic.AddUint64(&PointsWritten, uint64(len(scrubbed)))
 
 		// If we have a max file size configured and we're over it, start a new TSM file.
 		if w.Size() > c.maxTSMFileSize {
-			if err := w.WriteIndex(); err != nil {
+			if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
 				return err
 			}
 			if err := w.Close(); err != nil {
@@ -62,11 +79,15 @@ func (c *Converter) Process(iter KeyIterator) error {
 			if err != nil {
 				return err
 			}
+			atomic.AddUint64(&TsmFilesCreated, 1)
 		}
 	}
 
 	// All done!
-	return w.WriteIndex()
+	if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
+		return err
+	}
+	return nil
 }
 
 // nextTSMWriter returns the next TSMWriter for the Converter.
@@ -87,3 +108,46 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
 
 	return w, nil
 }
+
+// scrubValues takes a slice and removes float64 NaN and Inf. If neither is
+// present in the slice, the original slice is returned. This is to avoid
+// copying slices unnecessarily.
+func scrubValues(values []tsm1.Value) []tsm1.Value {
+	var scrubbed []tsm1.Value
+
+	if values == nil {
+		return nil
+	}
+
+	for i, v := range values {
+		if f, ok := v.Value().(float64); ok {
+			var filter bool
+			if math.IsNaN(f) {
+				filter = true
+				atomic.AddUint64(&NanFiltered, 1)
+			}
+			if math.IsInf(f, 0) {
+				filter = true
+				atomic.AddUint64(&InfFiltered, 1)
+			}
+
+			if filter {
+				if scrubbed == nil {
+					// Take every value up to the NaN, indicating that scrubbed
+					// should now be used.
+					scrubbed = values[:i]
+				}
+			} else {
+				if scrubbed != nil {
+					// We've filtered at least 1 value, so add value to filtered slice.
+					scrubbed = append(scrubbed, v)
+				}
+			}
+		}
+	}
+
+	if scrubbed != nil {
+		return scrubbed
+	}
+	return values
+}
diff --git a/cmd/influx_tsm/main.go b/cmd/influx_tsm/main.go
index afeaddb841..6b2bbe2af4 100644
--- a/cmd/influx_tsm/main.go
+++ b/cmd/influx_tsm/main.go
@@ -155,8 +155,10 @@ func main() {
 	fmt.Println("Conversion starting....")
 
 	// Backup each directory.
+	conversionStart := time.Now()
 	if !disBack {
 		databases := tsdb.ShardInfos(shards).Databases()
+		fmt.Printf("Backing up %d databases...\n", len(databases))
 		if parallel {
 			pg = NewParallelGroup(len(databases))
 		}
@@ -197,6 +199,18 @@ func main() {
 		}(si)
 	}
 	pg.Wait()
+
+	// Dump stats.
+	fmt.Printf("\nSummary statistics\n=========================\n")
+	fmt.Printf("Databases converted: %d\n", len(tsdb.ShardInfos(shards).Databases()))
+	fmt.Printf("Shards converted: %d\n", len(shards))
+	fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
+	fmt.Printf("Points read: %d\n", PointsRead)
+	fmt.Printf("Points written: %d\n", PointsWritten)
+	fmt.Printf("NaN filtered: %d\n", NanFiltered)
+	fmt.Printf("Inf filtered: %d\n", InfFiltered)
+	fmt.Printf("Total conversion time: %v\n", time.Now().Sub(conversionStart))
+	fmt.Println()
 }
 
 // backupDatabase backs up the database at src.