Merge pull request #5280 from influxdata/c_fixes
Gather conversion stats and filter NaN and Infpull/5236/merge
commit
53afa0addc
|
@ -2,12 +2,22 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
NanFiltered uint64
|
||||||
|
InfFiltered uint64
|
||||||
|
PointsWritten uint64
|
||||||
|
PointsRead uint64
|
||||||
|
TsmFilesCreated uint64
|
||||||
|
)
|
||||||
|
|
||||||
type KeyIterator interface {
|
type KeyIterator interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
Read() (string, []tsm1.Value, error)
|
Read() (string, []tsm1.Value, error)
|
||||||
|
@ -39,6 +49,7 @@ func (c *Converter) Process(iter KeyIterator) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
atomic.AddUint64(&TsmFilesCreated, 1)
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
// Iterate until no more data remains.
|
// Iterate until no more data remains.
|
||||||
|
@ -47,11 +58,17 @@ func (c *Converter) Process(iter KeyIterator) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.Write(k, v)
|
|
||||||
|
scrubbed := scrubValues(v)
|
||||||
|
if err := w.Write(k, scrubbed); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
atomic.AddUint64(&PointsRead, uint64(len(v)))
|
||||||
|
atomic.AddUint64(&PointsWritten, uint64(len(scrubbed)))
|
||||||
|
|
||||||
// If we have a max file size configured and we're over it, start a new TSM file.
|
// If we have a max file size configured and we're over it, start a new TSM file.
|
||||||
if w.Size() > c.maxTSMFileSize {
|
if w.Size() > c.maxTSMFileSize {
|
||||||
if err := w.WriteIndex(); err != nil {
|
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.Close(); err != nil {
|
if err := w.Close(); err != nil {
|
||||||
|
@ -62,11 +79,15 @@ func (c *Converter) Process(iter KeyIterator) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
atomic.AddUint64(&TsmFilesCreated, 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// All done!
|
// All done!
|
||||||
return w.WriteIndex()
|
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nextTSMWriter returns the next TSMWriter for the Converter.
|
// nextTSMWriter returns the next TSMWriter for the Converter.
|
||||||
|
@ -87,3 +108,46 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
|
||||||
|
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// scrubValues takes a slice and removes float64 NaN and Inf. If neither is
|
||||||
|
// present in the slice, the original slice is returned. This is to avoid
|
||||||
|
// copying slices unnecessarily.
|
||||||
|
func scrubValues(values []tsm1.Value) []tsm1.Value {
|
||||||
|
var scrubbed []tsm1.Value
|
||||||
|
|
||||||
|
if values == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, v := range values {
|
||||||
|
if f, ok := v.Value().(float64); ok {
|
||||||
|
var filter bool
|
||||||
|
if math.IsNaN(f) {
|
||||||
|
filter = true
|
||||||
|
atomic.AddUint64(&NanFiltered, 1)
|
||||||
|
}
|
||||||
|
if math.IsInf(f, 0) {
|
||||||
|
filter = true
|
||||||
|
atomic.AddUint64(&InfFiltered, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter {
|
||||||
|
if scrubbed == nil {
|
||||||
|
// Take every value up to the NaN, indicating that scrubbed
|
||||||
|
// should now be used.
|
||||||
|
scrubbed = values[:i]
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if scrubbed != nil {
|
||||||
|
// We've filtered at least 1 value, so add value to filtered slice.
|
||||||
|
scrubbed = append(scrubbed, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if scrubbed != nil {
|
||||||
|
return scrubbed
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
}
|
||||||
|
|
|
@ -155,8 +155,10 @@ func main() {
|
||||||
fmt.Println("Conversion starting....")
|
fmt.Println("Conversion starting....")
|
||||||
|
|
||||||
// Backup each directory.
|
// Backup each directory.
|
||||||
|
conversionStart := time.Now()
|
||||||
if !disBack {
|
if !disBack {
|
||||||
databases := tsdb.ShardInfos(shards).Databases()
|
databases := tsdb.ShardInfos(shards).Databases()
|
||||||
|
fmt.Printf("Backing up %d databases...\n", len(databases))
|
||||||
if parallel {
|
if parallel {
|
||||||
pg = NewParallelGroup(len(databases))
|
pg = NewParallelGroup(len(databases))
|
||||||
}
|
}
|
||||||
|
@ -197,6 +199,18 @@ func main() {
|
||||||
}(si)
|
}(si)
|
||||||
}
|
}
|
||||||
pg.Wait()
|
pg.Wait()
|
||||||
|
|
||||||
|
// Dump stats.
|
||||||
|
fmt.Printf("\nSummary statistics\n=========================\n")
|
||||||
|
fmt.Printf("Databases converted: %d\n", len(tsdb.ShardInfos(shards).Databases()))
|
||||||
|
fmt.Printf("Shards converted: %d\n", len(shards))
|
||||||
|
fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
|
||||||
|
fmt.Printf("Points read: %d\n", PointsRead)
|
||||||
|
fmt.Printf("Points written: %d\n", PointsWritten)
|
||||||
|
fmt.Printf("NaN filtered: %d\n", NanFiltered)
|
||||||
|
fmt.Printf("Inf filtered: %d\n", InfFiltered)
|
||||||
|
fmt.Printf("Total conversion time: %v\n", time.Now().Sub(conversionStart))
|
||||||
|
fmt.Println()
|
||||||
}
|
}
|
||||||
|
|
||||||
// backupDatabase backs up the database at src.
|
// backupDatabase backs up the database at src.
|
||||||
|
|
Loading…
Reference in New Issue