Merge pull request #5287 from influxdata/c_fixes

Ignore points without fields
pull/5130/merge
Philip O'Toole 2016-01-06 13:54:33 -08:00
commit db7c133f3c
3 changed files with 28 additions and 11 deletions

View File

@ -3,6 +3,7 @@ package b1
import (
"encoding/binary"
"sort"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
@ -12,6 +13,8 @@ import (
const DefaultChunkSize = 1000
var NoFieldsFiltered uint64
// Reader is used to read all data from a b1 shard.
type Reader struct {
path string
@ -96,7 +99,12 @@ func (r *Reader) Open() error {
}
measurement := tsdb.MeasurementFromSeriesKey(s)
for _, f := range r.fields[tsdb.MeasurementFromSeriesKey(s)].Fields {
fields := r.fields[tsdb.MeasurementFromSeriesKey(s)]
if fields == nil {
atomic.AddUint64(&NoFieldsFiltered, 1)
continue
}
for _, f := range fields.Fields {
c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
c.SeekTo(0)
r.cursors = append(r.cursors, c)

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"sort"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
@ -16,6 +17,8 @@ import (
const DefaultChunkSize = 1000
var NoFieldsFiltered uint64
// Reader is used to read all data from a bz1 shard.
type Reader struct {
path string
@ -110,7 +113,12 @@ func (r *Reader) Open() error {
}
measurement := tsdb.MeasurementFromSeriesKey(s)
for _, f := range r.fields[tsdb.MeasurementFromSeriesKey(s)].Fields {
fields := r.fields[tsdb.MeasurementFromSeriesKey(s)]
if fields == nil {
atomic.AddUint64(&NoFieldsFiltered, 1)
continue
}
for _, f := range fields.Fields {
c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
if c == nil {
continue

View File

@ -201,15 +201,16 @@ func main() {
pg.Wait()
// Dump stats.
fmt.Printf("\nSummary statistics\n=========================\n")
fmt.Printf("Databases converted: %d\n", len(tsdb.ShardInfos(shards).Databases()))
fmt.Printf("Shards converted: %d\n", len(shards))
fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
fmt.Printf("Points read: %d\n", PointsRead)
fmt.Printf("Points written: %d\n", PointsWritten)
fmt.Printf("NaN filtered: %d\n", NanFiltered)
fmt.Printf("Inf filtered: %d\n", InfFiltered)
fmt.Printf("Total conversion time: %v\n", time.Now().Sub(conversionStart))
fmt.Printf("\nSummary statistics\n========================================\n")
fmt.Printf("Databases converted: %d\n", len(tsdb.ShardInfos(shards).Databases()))
fmt.Printf("Shards converted: %d\n", len(shards))
fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
fmt.Printf("Points read: %d\n", PointsRead)
fmt.Printf("Points written: %d\n", PointsWritten)
fmt.Printf("NaN filtered: %d\n", NanFiltered)
fmt.Printf("Inf filtered: %d\n", InfFiltered)
fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered)
fmt.Printf("Total conversion time: %v\n", time.Now().Sub(conversionStart))
fmt.Println()
}