diff --git a/cmd/influx_tsm/b1/reader.go b/cmd/influx_tsm/b1/reader.go index 17f1f16857..16a46c82d6 100644 --- a/cmd/influx_tsm/b1/reader.go +++ b/cmd/influx_tsm/b1/reader.go @@ -2,20 +2,17 @@ package b1 // import "github.com/influxdata/influxdb/cmd/influx_tsm/b1" import ( "encoding/binary" + "math" "sort" - "sync/atomic" "time" "github.com/boltdb/bolt" + "github.com/influxdata/influxdb/cmd/influx_tsm/stats" "github.com/influxdata/influxdb/cmd/influx_tsm/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) -// DefaultChunkSize is the size of chunks read from the b1 shard -const DefaultChunkSize = 1000 - -// NoFieldsFiltered is the number of nil fields filtered -var NoFieldsFiltered uint64 +const DefaultChunkSize int = 1000 var excludedBuckets = map[string]bool{ "fields": true, @@ -34,21 +31,37 @@ type Reader struct { currCursor int keyBuf string - valuesBuf []tsm1.Value + tsmValues []tsm1.Value + values []tsdb.Value + valuePos int fields map[string]*tsdb.MeasurementFields codecs map[string]*tsdb.FieldCodec - ChunkSize int + stats *stats.Stats } // NewReader returns a reader for the b1 shard at path. -func NewReader(path string) *Reader { - return &Reader{ +func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader { + r := &Reader{ path: path, fields: make(map[string]*tsdb.MeasurementFields), codecs: make(map[string]*tsdb.FieldCodec), + stats: stats, } + + if chunkSize <= 0 { + chunkSize = DefaultChunkSize + } + + // known-sized slice of a known type, in a contiguous chunk + r.values = make([]tsdb.Value, chunkSize) + r.tsmValues = make([]tsm1.Value, len(r.values)) + for i := range r.values { + r.tsmValues[i] = &r.values[i] + } + + return r } // Open opens the reader. @@ -104,7 +117,7 @@ func (r *Reader) Open() error { measurement := tsdb.MeasurementFromSeriesKey(s) fields := r.fields[measurement] if fields == nil { - atomic.AddUint64(&NoFieldsFiltered, 1) + r.stats.IncrFiltered() continue } for _, f := range fields.Fields { @@ -121,44 +134,61 @@ // Next returns whether any data remains to be read. It must be called before // the next call to Read() func (r *Reader) Next() bool { + r.valuePos = 0 +OUTER: for { - if r.currCursor == len(r.cursors) { + if r.currCursor >= len(r.cursors) { // All cursors drained. No more data remains. return false } cc := r.cursors[r.currCursor] - k, v := cc.Next() - if k == -1 { - // Go to next cursor and try again. - r.currCursor++ - if len(r.valuesBuf) == 0 { - // The previous cursor had no data. Instead of returning - // just go immediately to the next cursor. - continue - } - // There is some data available. Indicate that it should be read. - return true - } r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field) - r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v)) - if len(r.valuesBuf) == r.ChunkSize { - return true + for { + k, v := cc.Next() + if k == -1 { + // Go to next cursor and try again. + r.currCursor++ + if r.valuePos == 0 { + // The previous cursor had no data. Instead of returning + // just go immediately to the next cursor. + continue OUTER + } + // There is some data available. Indicate that it should be read.
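+				// Read() will then surface r.tsmValues[:r.valuePos] under the
+				// series/field key of the cursor that just finished.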
+ return true + } + + if f, ok := v.(float64); ok { + if math.IsInf(f, 0) { + r.stats.AddPointsRead(1) + r.stats.IncrInf() + continue + } + + if math.IsNaN(f) { + r.stats.AddPointsRead(1) + r.stats.IncrNaN() + continue + } + } + + r.values[r.valuePos].T = k + r.values[r.valuePos].Val = v + r.valuePos++ + + if r.valuePos >= len(r.values) { + return true + } } } - } // Read returns the next chunk of data in the shard, converted to tsm1 values. Data is // emitted completely for every field, in every series, before the next field is processed. // Data from Read() adheres to the requirements for writing to tsm1 shards func (r *Reader) Read() (string, []tsm1.Value, error) { - defer func() { - r.valuesBuf = nil - }() - - return r.keyBuf, r.valuesBuf, nil + return r.keyBuf, r.tsmValues[:r.valuePos], nil } // Close closes the reader. @@ -198,7 +228,7 @@ func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) * } // Seek moves the cursor to a position. -func (c cursor) SeekTo(seek int64) { +func (c *cursor) SeekTo(seek int64) { var seekBytes [8]byte binary.BigEndian.PutUint64(seekBytes[:], uint64(seek)) k, v := c.cursor.Seek(seekBytes[:]) @@ -238,5 +268,8 @@ type cursors []*cursor func (a cursors) Len() int { return len(a) } func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a cursors) Less(i, j int) bool { - return tsm1.SeriesFieldKey(a[i].series, a[i].field) < tsm1.SeriesFieldKey(a[j].series, a[j].field) + if a[i].series == a[j].series { + return a[i].field < a[j].field + } + return a[i].series < a[j].series } diff --git a/cmd/influx_tsm/bz1/reader.go b/cmd/influx_tsm/bz1/reader.go index 2946813dd1..d207a1a07f 100644 --- a/cmd/influx_tsm/bz1/reader.go +++ b/cmd/influx_tsm/bz1/reader.go @@ -5,22 +5,20 @@ import ( "encoding/binary" "encoding/json" "fmt" + "math" "sort" - "sync/atomic" "time" "github.com/boltdb/bolt" "github.com/golang/snappy" + "github.com/influxdata/influxdb/cmd/influx_tsm/stats" "github.com/influxdata/influxdb/cmd/influx_tsm/tsdb" - tsm "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) // DefaultChunkSize is the size of chunks read from the bz1 shard const DefaultChunkSize = 1000 -// NoFieldsFiltered is the number of nil fields filtered -var NoFieldsFiltered uint64 - // Reader is used to read all data from a bz1 shard. type Reader struct { path string @@ -31,22 +29,37 @@ type Reader struct { currCursor int keyBuf string - valuesBuf []tsm.Value + tsmValues []tsm1.Value + values []tsdb.Value + valuePos int fields map[string]*tsdb.MeasurementFields codecs map[string]*tsdb.FieldCodec - ChunkSize int + stats *stats.Stats } // NewReader returns a reader for the bz1 shard at path. -func NewReader(path string) *Reader { - return &Reader{ - path: path, - fields: make(map[string]*tsdb.MeasurementFields), - codecs: make(map[string]*tsdb.FieldCodec), - ChunkSize: DefaultChunkSize, +func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader { + r := &Reader{ + path: path, + fields: make(map[string]*tsdb.MeasurementFields), + codecs: make(map[string]*tsdb.FieldCodec), + stats: stats, } + + if chunkSize <= 0 { + chunkSize = DefaultChunkSize + } + + // known-sized slice of a known type, in a contiguous chunk + r.values = make([]tsdb.Value, chunkSize) + r.tsmValues = make([]tsm1.Value, len(r.values)) + for i := range r.values { + r.tsmValues[i] = &r.values[i] + } + + return r } // Open opens the reader. 
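Both readers now share the buffering scheme set up in NewReader above: one contiguous []tsdb.Value backing array plus a parallel []tsm1.Value slice of pointers into it, built once and refilled for every chunk. Because *tsdb.Value satisfies the tsm1.Value interface, Read() can hand the TSM writer an interface-typed slice without the per-point allocations that tsdb.ConvertToValue used to make. A minimal standalone sketch of the pattern follows; the Valuer and point names are illustrative only, not from the codebase:

package main

import "fmt"

// Valuer plays the role of tsm1.Value: the interface the TSM writer consumes.
type Valuer interface {
	UnixNano() int64
	Value() interface{}
}

// point plays the role of tsdb.Value: a concrete struct the reader fills in place.
type point struct {
	T   int64
	Val interface{}
}

func (p *point) UnixNano() int64    { return p.T }
func (p *point) Value() interface{} { return p.Val }

func main() {
	const chunkSize = 4

	// One contiguous, known-sized backing array of concrete values...
	values := make([]point, chunkSize)

	// ...aliased once by a slice of interface values. No further allocation
	// is needed for any subsequent chunk.
	ivalues := make([]Valuer, len(values))
	for i := range values {
		ivalues[i] = &values[i]
	}

	// Refilling the backing array is all a new chunk requires; the interface
	// slice sees the updated contents.
	for i := range values {
		values[i].T = int64(i)
		values[i].Val = float64(i) * 1.5
	}
	fmt.Println(ivalues[2].UnixNano(), ivalues[2].Value()) // prints: 2 3
}
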
@@ -114,7 +127,7 @@ func (r *Reader) Open() error { measurement := tsdb.MeasurementFromSeriesKey(s) fields := r.fields[measurement] if fields == nil { - atomic.AddUint64(&NoFieldsFiltered, 1) + r.stats.IncrFiltered() continue } for _, f := range fields.Fields { @@ -133,30 +146,52 @@ func (r *Reader) Open() error { // Next returns whether there is any more data to be read. func (r *Reader) Next() bool { + r.valuePos = 0 +OUTER: for { - if r.currCursor == len(r.cursors) { + if r.currCursor >= len(r.cursors) { // All cursors drained. No more data remains. return false } cc := r.cursors[r.currCursor] - k, v := cc.Next() - if k == -1 { - // Go to next cursor and try again. - r.currCursor++ - if len(r.valuesBuf) == 0 { - // The previous cursor had no data. Instead of returning - // just go immediately to the next cursor. - continue - } - // There is some data available. Indicate that it should be read. - return true - } + r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field) - r.keyBuf = tsm.SeriesFieldKey(cc.series, cc.field) - r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v)) - if len(r.valuesBuf) == r.ChunkSize { - return true + for { + k, v := cc.Next() + if k == -1 { + // Go to next cursor and try again. + r.currCursor++ + if r.valuePos == 0 { + // The previous cursor had no data. Instead of returning + // just go immediately to the next cursor. + continue OUTER + } + // There is some data available. Indicate that it should be read. + return true + } + + if f, ok := v.(float64); ok { + if math.IsInf(f, 0) { + r.stats.AddPointsRead(1) + r.stats.IncrInf() + continue + } + + if math.IsNaN(f) { + r.stats.AddPointsRead(1) + r.stats.IncrNaN() + continue + } + } + + r.values[r.valuePos].T = k + r.values[r.valuePos].Val = v + r.valuePos++ + + if r.valuePos >= len(r.values) { + return true + } } } } @@ -164,12 +199,8 @@ func (r *Reader) Next() bool { // Read returns the next chunk of data in the shard, converted to tsm1 values. Data is // emitted completely for every field, in every series, before the next field is processed. // Data from Read() adheres to the requirements for writing to tsm1 shards -func (r *Reader) Read() (string, []tsm.Value, error) { - defer func() { - r.valuesBuf = nil - }() - - return r.keyBuf, r.valuesBuf, nil +func (r *Reader) Read() (string, []tsm1.Value, error) { + return r.keyBuf, r.tsmValues[:r.valuePos], nil } // Close closes the reader. @@ -333,7 +364,10 @@ type cursors []*cursor func (a cursors) Len() int { return len(a) } func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a cursors) Less(i, j int) bool { - return tsm.SeriesFieldKey(a[i].series, a[i].field) < tsm.SeriesFieldKey(a[j].series, a[j].field) + if a[i].series == a[j].series { + return a[i].field < a[j].field + } + return a[i].series < a[j].series } // entryHeaderSize is the number of bytes required for the header. diff --git a/cmd/influx_tsm/converter.go b/cmd/influx_tsm/converter.go index 220ad2110f..f289827296 100644 --- a/cmd/influx_tsm/converter.go +++ b/cmd/influx_tsm/converter.go @@ -2,10 +2,10 @@ package main import ( "fmt" - "math" "os" "path/filepath" + "github.com/influxdata/influxdb/cmd/influx_tsm/stats" "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) @@ -20,15 +20,15 @@ type Converter struct { path string maxTSMFileSize uint32 sequence int - tracker *tracker + stats *stats.Stats } // NewConverter returns a new instance of the Converter. 
-func NewConverter(path string, sz uint32, t *tracker) *Converter { +func NewConverter(path string, sz uint32, stats *stats.Stats) *Converter { return &Converter{ path: path, maxTSMFileSize: sz, - tracker: t, + stats: stats, } } @@ -46,7 +46,6 @@ func (c *Converter) Process(iter KeyIterator) error { if err != nil { return err } - scrubbed := c.scrubValues(v) if w == nil { w, err = c.nextTSMWriter() @@ -54,12 +53,12 @@ func (c *Converter) Process(iter KeyIterator) error { return err } } - if err := w.Write(k, scrubbed); err != nil { + if err := w.Write(k, v); err != nil { return err } - c.tracker.AddPointsRead(len(v)) - c.tracker.AddPointsWritten(len(scrubbed)) + c.stats.AddPointsRead(len(v)) + c.stats.AddPointsWritten(len(v)) // If we have a max file size configured and we're over it, start a new TSM file. if w.Size() > c.maxTSMFileSize { @@ -67,7 +66,7 @@ func (c *Converter) Process(iter KeyIterator) error { return err } - c.tracker.AddTSMBytes(w.Size()) + c.stats.AddTSMBytes(w.Size()) if err := w.Close(); err != nil { return err @@ -80,7 +79,7 @@ func (c *Converter) Process(iter KeyIterator) error { if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { return err } - c.tracker.AddTSMBytes(w.Size()) + c.stats.AddTSMBytes(w.Size()) if err := w.Close(); err != nil { return err @@ -106,49 +105,6 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) { return nil, err } - c.tracker.IncrTSMFileCount() + c.stats.IncrTSMFileCount() return w, nil } - -// scrubValues takes a slice and removes float64 NaN and Inf. If neither is -// present in the slice, the original slice is returned. This is to avoid -// copying slices unnecessarily. -func (c *Converter) scrubValues(values []tsm1.Value) []tsm1.Value { - var scrubbed []tsm1.Value - - if values == nil { - return nil - } - - for i, v := range values { - if f, ok := v.Value().(float64); ok { - var filter bool - if math.IsNaN(f) { - filter = true - c.tracker.IncrNaN() - } - if math.IsInf(f, 0) { - filter = true - c.tracker.IncrInf() - } - - if filter { - if scrubbed == nil { - // Take every value up to the NaN, indicating that scrubbed - // should now be used. - scrubbed = values[:i] - } - } else { - if scrubbed != nil { - // We've filtered at least 1 value, so add value to filtered slice. 
- scrubbed = append(scrubbed, v) - } - } - } - } - - if scrubbed != nil { - return scrubbed - } - return values -} diff --git a/cmd/influx_tsm/converter_test.go b/cmd/influx_tsm/converter_test.go deleted file mode 100644 index 05bba5bcf6..0000000000 --- a/cmd/influx_tsm/converter_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package main - -import ( - "fmt" - "math" - "reflect" - "strings" - "testing" - - "github.com/influxdata/influxdb/tsdb/engine/tsm1" -) - -func TestScrubValues(t *testing.T) { - dummy := Converter{ - tracker: new(tracker), - } - - var epoch int64 - simple := []tsm1.Value{tsm1.NewValue(epoch, 1.0)} - - for _, tt := range []struct { - input, expected []tsm1.Value - }{ - { - input: simple, - expected: simple, - }, { - input: []tsm1.Value{simple[0], tsm1.NewValue(epoch, math.NaN())}, - expected: simple, - }, { - input: []tsm1.Value{simple[0], tsm1.NewValue(epoch, math.Inf(-1))}, - expected: simple, - }, { - input: []tsm1.Value{simple[0], tsm1.NewValue(epoch, math.Inf(1)), tsm1.NewValue(epoch, math.NaN())}, - expected: simple, - }, - } { - out := dummy.scrubValues(tt.input) - if !reflect.DeepEqual(out, tt.expected) { - t.Errorf("Failed to scrub '%s': Got '%s', Expected '%s'", pretty(tt.input), pretty(out), pretty(tt.expected)) - } - } -} - -func pretty(vals []tsm1.Value) string { - if len(vals) == 0 { - return "[]" - } - - strs := make([]string, len(vals)) - - for i := range vals { - strs[i] = fmt.Sprintf("{%v: %v}", vals[i].UnixNano(), vals[i].Value()) - } - - return "[" + strings.Join(strs, ", ") + "]" -} diff --git a/cmd/influx_tsm/main.go b/cmd/influx_tsm/main.go index a48966ac77..ebe884acbc 100644 --- a/cmd/influx_tsm/main.go +++ b/cmd/influx_tsm/main.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "runtime" + "runtime/pprof" "sort" "strings" "text/tabwriter" @@ -55,6 +56,8 @@ type options struct { Parallel bool SkipBackup bool UpdateInterval time.Duration + Yes bool + CpuFile string } func (o *options) Parse() error { @@ -69,6 +72,8 @@ func (o *options) Parse() error { fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directory.") fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address") fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.") + fs.BoolVar(&opts.Yes, "y", false, "Don't ask, just convert") + fs.StringVar(&opts.CpuFile, "profile", "", "CPU Profile location") fs.Usage = func() { fmt.Fprintf(os.Stderr, "Usage: %v [options] \n", os.Args[0]) fmt.Fprintf(os.Stderr, "%v\n\nOptions:\n", description) @@ -197,19 +202,32 @@ func main() { } w.Flush() - // Get confirmation from user. - fmt.Printf("\nThese shards will be converted. Proceed? y/N: ") - liner := bufio.NewReader(os.Stdin) - yn, err := liner.ReadString('\n') - if err != nil { - log.Fatalf("failed to read response: %v", err) - } - yn = strings.TrimRight(strings.ToLower(yn), "\n") - if yn != "y" { - log.Fatal("Conversion aborted.") + if !opts.Yes { + // Get confirmation from user. + fmt.Printf("\nThese shards will be converted. Proceed? 
y/N: ") + liner := bufio.NewReader(os.Stdin) + yn, err := liner.ReadString('\n') + if err != nil { + log.Fatalf("failed to read response: %v", err) + } + yn = strings.TrimRight(strings.ToLower(yn), "\n") + if yn != "y" { + log.Fatal("Conversion aborted.") + } } fmt.Println("Conversion starting....") + if opts.CpuFile != "" { + f, err := os.Create(opts.CpuFile) + if err != nil { + log.Fatal(err) + } + if err = pprof.StartCPUProfile(f); err != nil { + log.Fatal(err) + } + defer pprof.StopCPUProfile() + } + tr := newTracker(shards, opts) if err := tr.Run(); err != nil { @@ -317,9 +335,9 @@ func convertShard(si *tsdb.ShardInfo, tr *tracker) error { var reader ShardReader switch si.Format { case tsdb.BZ1: - reader = bz1.NewReader(src) + reader = bz1.NewReader(src, &tr.Stats, 0) case tsdb.B1: - reader = b1.NewReader(src) + reader = b1.NewReader(src, &tr.Stats, 0) default: return fmt.Errorf("Unsupported shard format: %v", si.FormatAsString()) } @@ -329,7 +347,7 @@ func convertShard(si *tsdb.ShardInfo, tr *tracker) error { return fmt.Errorf("Failed to open %v for conversion: %v", src, err) } defer reader.Close() - converter := NewConverter(dst, uint32(opts.TSMSize), tr) + converter := NewConverter(dst, uint32(opts.TSMSize), &tr.Stats) // Perform the conversion. if err := converter.Process(reader); err != nil { diff --git a/cmd/influx_tsm/stats/stats.go b/cmd/influx_tsm/stats/stats.go new file mode 100644 index 0000000000..de1270ca2e --- /dev/null +++ b/cmd/influx_tsm/stats/stats.go @@ -0,0 +1,47 @@ +package stats + +import ( + "sync/atomic" + "time" +) + +// Stats are the statistics captured while converting non-TSM shards to TSM +type Stats struct { + NanFiltered uint64 + InfFiltered uint64 + FieldsFiltered uint64 + PointsWritten uint64 + PointsRead uint64 + TsmFilesCreated uint64 + TsmBytesWritten uint64 + CompletedShards uint64 + TotalTime time.Duration +} + +func (s *Stats) AddPointsRead(n int) { + atomic.AddUint64(&s.PointsRead, uint64(n)) +} + +func (s *Stats) AddPointsWritten(n int) { + atomic.AddUint64(&s.PointsWritten, uint64(n)) +} + +func (s *Stats) AddTSMBytes(n uint32) { + atomic.AddUint64(&s.TsmBytesWritten, uint64(n)) +} + +func (s *Stats) IncrTSMFileCount() { + atomic.AddUint64(&s.TsmFilesCreated, 1) +} + +func (s *Stats) IncrNaN() { + atomic.AddUint64(&s.NanFiltered, 1) +} + +func (s *Stats) IncrInf() { + atomic.AddUint64(&s.InfFiltered, 1) +} + +func (s *Stats) IncrFiltered() { + atomic.AddUint64(&s.FieldsFiltered, 1) +} diff --git a/cmd/influx_tsm/tracker.go b/cmd/influx_tsm/tracker.go index a3a5a37d71..9048aa7647 100644 --- a/cmd/influx_tsm/tracker.go +++ b/cmd/influx_tsm/tracker.go @@ -8,14 +8,13 @@ import ( "sync/atomic" "time" - "github.com/influxdata/influxdb/cmd/influx_tsm/b1" - "github.com/influxdata/influxdb/cmd/influx_tsm/bz1" + "github.com/influxdata/influxdb/cmd/influx_tsm/stats" "github.com/influxdata/influxdb/cmd/influx_tsm/tsdb" ) // tracker will orchestrate and track the conversions of non-TSM shards to TSM type tracker struct { - stats Stats + Stats stats.Stats shards tsdb.ShardInfos opts options @@ -24,18 +23,6 @@ type tracker struct { wg sync.WaitGroup } -// Stats are the statistics captured while converting non-TSM shards to TSM -type Stats struct { - NanFiltered uint64 - InfFiltered uint64 - PointsWritten uint64 - PointsRead uint64 - TsmFilesCreated uint64 - TsmBytesWritten uint64 - CompletedShards uint64 - TotalTime time.Duration -} - // newTracker will setup and return a clean tracker instance func newTracker(shards tsdb.ShardInfos, opts options) *tracker { 
t := &tracker{ @@ -47,10 +34,6 @@ func newTracker(shards tsdb.ShardInfos, opts options) *tracker { return t } -func (t *tracker) Errorf(str string, args ...interface{}) { - -} - func (t *tracker) Run() error { conversionStart := time.Now() @@ -83,7 +66,7 @@ func (t *tracker) Run() error { si := t.shards[i] go t.pg.Do(func() { defer func() { - atomic.AddUint64(&t.stats.CompletedShards, 1) + atomic.AddUint64(&t.Stats.CompletedShards, 1) t.wg.Done() }() @@ -112,60 +95,36 @@ WAIT_LOOP: } } - t.stats.TotalTime = time.Since(conversionStart) + t.Stats.TotalTime = time.Since(conversionStart) return nil } func (t *tracker) StatusUpdate() { - shardCount := atomic.LoadUint64(&t.stats.CompletedShards) - pointCount := atomic.LoadUint64(&t.stats.PointsRead) - pointWritten := atomic.LoadUint64(&t.stats.PointsWritten) + shardCount := atomic.LoadUint64(&t.Stats.CompletedShards) + pointCount := atomic.LoadUint64(&t.Stats.PointsRead) + pointWritten := atomic.LoadUint64(&t.Stats.PointsWritten) log.Printf("Still Working: Completed Shards: %d/%d Points read/written: %d/%d", shardCount, len(t.shards), pointCount, pointWritten) } func (t *tracker) PrintStats() { preSize := t.shards.Size() - postSize := int64(t.stats.TsmBytesWritten) + postSize := int64(t.Stats.TsmBytesWritten) fmt.Printf("\nSummary statistics\n========================================\n") fmt.Printf("Databases converted: %d\n", len(t.shards.Databases())) fmt.Printf("Shards converted: %d\n", len(t.shards)) - fmt.Printf("TSM files created: %d\n", t.stats.TsmFilesCreated) - fmt.Printf("Points read: %d\n", t.stats.PointsRead) - fmt.Printf("Points written: %d\n", t.stats.PointsWritten) - fmt.Printf("NaN filtered: %d\n", t.stats.NanFiltered) - fmt.Printf("Inf filtered: %d\n", t.stats.InfFiltered) - fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered) + fmt.Printf("TSM files created: %d\n", t.Stats.TsmFilesCreated) + fmt.Printf("Points read: %d\n", t.Stats.PointsRead) + fmt.Printf("Points written: %d\n", t.Stats.PointsWritten) + fmt.Printf("NaN filtered: %d\n", t.Stats.NanFiltered) + fmt.Printf("Inf filtered: %d\n", t.Stats.InfFiltered) + fmt.Printf("Points without fields filtered: %d\n", t.Stats.FieldsFiltered) fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize) fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize) fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize) - fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.stats.PointsWritten)) - fmt.Printf("Total conversion time: %v\n", t.stats.TotalTime) + fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.Stats.PointsWritten)) + fmt.Printf("Total conversion time: %v\n", t.Stats.TotalTime) fmt.Println() } - -func (t *tracker) AddPointsRead(n int) { - atomic.AddUint64(&t.stats.PointsRead, uint64(n)) -} - -func (t *tracker) AddPointsWritten(n int) { - atomic.AddUint64(&t.stats.PointsWritten, uint64(n)) -} - -func (t *tracker) AddTSMBytes(n uint32) { - atomic.AddUint64(&t.stats.TsmBytesWritten, uint64(n)) -} - -func (t *tracker) IncrTSMFileCount() { - atomic.AddUint64(&t.stats.TsmFilesCreated, 1) -} - -func (t *tracker) IncrNaN() { - atomic.AddUint64(&t.stats.NanFiltered, 1) -} - -func (t *tracker) IncrInf() { - atomic.AddUint64(&t.stats.InfFiltered, 1) -} diff --git a/cmd/influx_tsm/tsdb/codec.go b/cmd/influx_tsm/tsdb/codec.go index a38b9d0f32..9590bb2975 100644 --- a/cmd/influx_tsm/tsdb/codec.go +++ b/cmd/influx_tsm/tsdb/codec.go @@ -2,7 +2,6 @@ package tsdb import ( "encoding/binary" - 
"encoding/json" "errors" "fmt" "math" @@ -40,79 +39,6 @@ func NewFieldCodec(fields map[string]*Field) *FieldCodec { return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName} } -// EncodeFields converts a map of values with string keys to a byte slice of field -// IDs and values. -// -// If a field exists in the codec, but its type is different, an error is returned. If -// a field is not present in the codec, the system panics. -func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) { - // Allocate byte slice - b := make([]byte, 0, 10) - - for k, v := range values { - field := f.fieldsByName[k] - if field == nil { - panic(fmt.Sprintf("field does not exist for %s", k)) - } else if influxql.InspectDataType(v) != field.Type { - return nil, fmt.Errorf("field \"%s\" is type %T, mapped as type %s", k, v, field.Type) - } - - var buf []byte - - switch field.Type { - case influxql.Float: - value := v.(float64) - buf = make([]byte, 9) - binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value)) - case influxql.Integer: - var value uint64 - switch v.(type) { - case int: - value = uint64(v.(int)) - case int32: - value = uint64(v.(int32)) - case int64: - value = uint64(v.(int64)) - default: - panic(fmt.Sprintf("invalid integer type: %T", v)) - } - buf = make([]byte, 9) - binary.BigEndian.PutUint64(buf[1:9], value) - case influxql.Boolean: - value := v.(bool) - - // Only 1 byte need for a boolean. - buf = make([]byte, 2) - if value { - buf[1] = byte(1) - } - case influxql.String: - value := v.(string) - if len(value) > maxStringLength { - value = value[:maxStringLength] - } - // Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string. - buf = make([]byte, len(value)+3) - - // Set the string length, then copy the string itself. - binary.BigEndian.PutUint16(buf[1:3], uint16(len(value))) - for i, c := range []byte(value) { - buf[i+3] = byte(c) - } - default: - panic(fmt.Sprintf("unsupported value type during encode fields: %T", v)) - } - - // Always set the field ID as the leading byte. - buf[0] = field.ID - - // Append temp buffer to the end. - b = append(b, buf...) - } - - return b, nil -} - // FieldIDByName returns the ID for the given field. func (f *FieldCodec) FieldIDByName(s string) (uint8, error) { fi := f.fieldsByName[s] @@ -122,132 +48,41 @@ func (f *FieldCodec) FieldIDByName(s string) (uint8, error) { return fi.ID, nil } -// DecodeFields decodes a byte slice into a set of field ids and values. -func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) { - if len(b) == 0 { - return nil, nil - } - - // Create a map to hold the decoded data. - values := make(map[uint8]interface{}, 0) - - for { - if len(b) < 1 { - // No more bytes. - break - } - - // First byte is the field identifier. - fieldID := b[0] - field := f.fieldsByID[fieldID] - if field == nil { - // See note in DecodeByID() regarding field-mapping failures. - return nil, ErrFieldUnmappedID - } - - var value interface{} - switch field.Type { - case influxql.Float: - value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9])) - // Move bytes forward. - b = b[9:] - case influxql.Integer: - value = int64(binary.BigEndian.Uint64(b[1:9])) - // Move bytes forward. - b = b[9:] - case influxql.Boolean: - if b[1] == 1 { - value = true - } else { - value = false - } - // Move bytes forward. - b = b[2:] - case influxql.String: - size := binary.BigEndian.Uint16(b[1:3]) - value = string(b[3 : size+3]) - // Move bytes forward. 
-			b = b[size+3:] -		default: -			panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID])) -		} - -		values[fieldID] = value -	} - -	return values, nil -} - -// DecodeFieldsWithNames decodes a byte slice into a set of field names and values -func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error) { -	fields, err := f.DecodeFields(b) -	if err != nil { -		return nil, err -	} -	m := make(map[string]interface{}) -	for id, v := range fields { -		field := f.fieldsByID[id] -		if field != nil { -			m[field.Name] = v -		} -	} -	return m, nil -} - // DecodeByID scans a byte slice for a field with the given ID, converts it to its // expected type, and return that value. func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) { if len(b) == 0 { -		return 0, ErrFieldNotFound +		// No more bytes. +		return nil, ErrFieldNotFound } -	for { -		if len(b) < 1 { -			// No more bytes. -			break -		} -		field, ok := f.fieldsByID[b[0]] -		if !ok { -			// This can happen, though is very unlikely. If this node receives encoded data, to be written -			// to disk, and is queried for that data before its metastore is updated, there will be no field -			// mapping for the data during decode. All this can happen because data is encoded by the node -			// that first received the write request, not the node that actually writes the data to disk. -			// So if this happens, the read must be aborted. -			return 0, ErrFieldUnmappedID -		} - -		var value interface{} -		switch field.Type { -		case influxql.Float: -			// Move bytes forward. -			value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9])) -			b = b[9:] -		case influxql.Integer: -			value = int64(binary.BigEndian.Uint64(b[1:9])) -			b = b[9:] -		case influxql.Boolean: -			if b[1] == 1 { -				value = true -			} else { -				value = false -			} -			// Move bytes forward. -			b = b[2:] -		case influxql.String: -			size := binary.BigEndian.Uint16(b[1:3]) -			value = string(b[3 : 3+size]) -			// Move bytes forward. -			b = b[size+3:] -		default: -			panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type)) -		} - -		if field.ID == targetID { -			return value, nil -		} +	field := f.fieldsByID[b[0]] +	if field == nil { +		// This can happen, though is very unlikely. If this node receives encoded data, to be written +		// to disk, and is queried for that data before its metastore is updated, there will be no field +		// mapping for the data during decode. All this can happen because data is encoded by the node +		// that first received the write request, not the node that actually writes the data to disk. +		// So if this happens, the read must be aborted. +		return nil, ErrFieldUnmappedID } -	return 0, ErrFieldNotFound +	// The encoding is a sequence of self-describing (field ID, value) entries, +	// so skip over entries until we reach the one with the target ID. Unlike +	// the loop above, we only compute each entry's length along the way +	// instead of fully decoding it. +	for field.ID != targetID { +		switch field.Type { +		case influxql.Float, influxql.Integer: +			b = b[9:] +		case influxql.Boolean: +			b = b[2:] +		case influxql.String: +			b = b[3+binary.BigEndian.Uint16(b[1:3]):] +		default: +			panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type)) +		} +		if len(b) == 0 { +			return nil, ErrFieldNotFound +		} +		if field = f.fieldsByID[b[0]]; field == nil { +			// See the field-mapping note above. +			return nil, ErrFieldUnmappedID +		} +	} + +	switch field.Type { +	case influxql.Float: +		// The float64 payload is the eight bytes after the one-byte field ID.
+ return math.Float64frombits(binary.BigEndian.Uint64(b[1:9])), nil + case influxql.Integer: + return int64(binary.BigEndian.Uint64(b[1:9])), nil + case influxql.Boolean: + return b[1] == 1, nil + case influxql.String: + return string(b[3 : 3+binary.BigEndian.Uint16(b[1:3])]), nil + default: + panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type)) + } } // DecodeByName scans a byte slice for a field with the given name, converts it to its @@ -260,35 +95,7 @@ func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) { return f.DecodeByID(fi.ID, b) } -// Fields returns the array of fields in the FieldCodec -func (f *FieldCodec) Fields() (a []*Field) { - for _, f := range f.fieldsByID { - a = append(a, f) - } - return -} - // FieldByName returns the field by its name. It will return a nil if not found func (f *FieldCodec) FieldByName(name string) *Field { return f.fieldsByName[name] } - -// mustMarshal encodes a value to JSON. -// This will panic if an error occurs. This should only be used internally when -// an invalid marshal will cause corruption and a panic is appropriate. -func mustMarshalJSON(v interface{}) []byte { - b, err := json.Marshal(v) - if err != nil { - panic("marshal: " + err.Error()) - } - return b -} - -// mustUnmarshalJSON decodes a value from JSON. -// This will panic if an error occurs. This should only be used internally when -// an invalid unmarshal will cause corruption and a panic is appropriate. -func mustUnmarshalJSON(b []byte, v interface{}) { - if err := json.Unmarshal(b, v); err != nil { - panic("unmarshal: " + err.Error()) - } -} diff --git a/cmd/influx_tsm/tsdb/database.go b/cmd/influx_tsm/tsdb/database.go index d2e1fc3892..c276db08a7 100644 --- a/cmd/influx_tsm/tsdb/database.go +++ b/cmd/influx_tsm/tsdb/database.go @@ -196,12 +196,7 @@ func (d *Database) Shards() ([]*ShardInfo, error) { // shardFormat returns the format and size on disk of the shard at path. func shardFormat(path string) (EngineFormat, int64, error) { // If it's a directory then it's a tsm1 engine - f, err := os.Open(path) - if err != nil { - return 0, 0, err - } - fi, err := f.Stat() - f.Close() + fi, err := os.Stat(path) if err != nil { return 0, 0, err } @@ -228,13 +223,13 @@ func shardFormat(path string) (EngineFormat, int64, error) { } // There is an actual format indicator. - switch string(b.Get([]byte("format"))) { + switch f := string(b.Get([]byte("format"))); f { case "b1", "v1": format = B1 case "bz1": format = BZ1 default: - return fmt.Errorf("unrecognized engine format: %s", string(b.Get([]byte("format")))) + return fmt.Errorf("unrecognized engine format: %s", f) } return nil diff --git a/cmd/influx_tsm/tsdb/types.go b/cmd/influx_tsm/tsdb/types.go index dc1a1a9452..c0d0010f35 100644 --- a/cmd/influx_tsm/tsdb/types.go +++ b/cmd/influx_tsm/tsdb/types.go @@ -10,12 +10,6 @@ import ( "github.com/gogo/protobuf/proto" ) -// Cursor represents an iterator over a series. -type Cursor interface { - SeekTo(seek int64) (key int64, value interface{}) - Next() (key int64, value interface{}) -} - // Field represents an encoded field. type Field struct { ID uint8 `json:"id,omitempty"` @@ -29,18 +23,6 @@ type MeasurementFields struct { Codec *FieldCodec } -// MarshalBinary encodes the object to a binary format. 
-func (m *MeasurementFields) MarshalBinary() ([]byte, error) { - var pb internal.MeasurementFields - for _, f := range m.Fields { - id := int32(f.ID) - name := f.Name - t := int32(f.Type) - pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t}) - } - return proto.Marshal(&pb) -} - // UnmarshalBinary decodes the object from a binary format. func (m *MeasurementFields) UnmarshalBinary(buf []byte) error { var pb internal.MeasurementFields @@ -60,39 +42,9 @@ type Series struct { Tags map[string]string } -// MarshalBinary encodes the object to a binary format. -func (s *Series) MarshalBinary() ([]byte, error) { - var pb internal.Series - pb.Key = &s.Key - for k, v := range s.Tags { - key := k - value := v - pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value}) - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes the object from a binary format. -func (s *Series) UnmarshalBinary(buf []byte) error { - var pb internal.Series - if err := proto.Unmarshal(buf, &pb); err != nil { - return err - } - s.Key = pb.GetKey() - s.Tags = make(map[string]string) - for _, t := range pb.Tags { - s.Tags[t.GetKey()] = t.GetValue() - } - return nil -} - // MeasurementFromSeriesKey returns the Measurement name for a given series. func MeasurementFromSeriesKey(key string) string { - idx := strings.Index(key, ",") - if idx == -1 { - return key - } - return key[:strings.Index(key, ",")] + return strings.SplitN(key, ",", 2)[0] } // DecodeKeyValue decodes the key and value from bytes. diff --git a/cmd/influx_tsm/tsdb/values.go b/cmd/influx_tsm/tsdb/values.go index 9766ee0a24..90ec1d4407 100644 --- a/cmd/influx_tsm/tsdb/values.go +++ b/cmd/influx_tsm/tsdb/values.go @@ -3,138 +3,37 @@ package tsdb import ( "fmt" "time" - - tsm "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) -// FloatValue holds float64 values -type FloatValue struct { - T int64 - V float64 +type Value struct { + T int64 + Val interface{} } -func (f *FloatValue) UnixNano() int64 { - return f.T +func (v *Value) Time() time.Time { + return time.Unix(0, v.T) } -// Value returns the float64 value -func (f *FloatValue) Value() interface{} { - return f.V -} - -// Size returns the size of the FloatValue. It is always 16 -func (f *FloatValue) Size() int { - return 16 -} - -// String returns the formatted string. Implements the Stringer interface -func (f *FloatValue) String() string { - return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value()) -} - -// BoolValue holds bool values -type BoolValue struct { - T int64 - V bool -} - -func (b *BoolValue) Size() int { - return 9 -} - -// UnixNano returns the Unix time in nanoseconds associated with the BoolValue -func (b *BoolValue) UnixNano() int64 { - return b.T -} - -// Value returns the boolean stored -func (b *BoolValue) Value() interface{} { - return b.V -} - -// String returns the formatted string. Implements the Stringer interface -func (f *BoolValue) String() string { - return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value()) -} - -// Int64Value holds int64 values -type Int64Value struct { - T int64 - V int64 -} - -func (v *Int64Value) Value() interface{} { - return v.V -} - -// UnixNano returns the Unix time in nanoseconds associated with the Int64Value -func (v *Int64Value) UnixNano() int64 { +func (v *Value) UnixNano() int64 { return v.T } -// Size returns the size of the Int64Value. It is always 16 -func (v *Int64Value) Size() int { - return 16 +func (v *Value) Value() interface{} { + return v.Val } -// String returns the formatted string. 
Implements the Stringer interface -func (f *Int64Value) String() string { - return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value()) +func (v *Value) String() string { + return fmt.Sprintf("%v %v", v.Time(), v.Val) } -// StringValue holds string values -type StringValue struct { - T int64 - V string -} - -func (v *StringValue) Value() interface{} { - return v.V -} - -// UnixNano returns the Unix time in nanoseconds associated with the StringValue -func (v *StringValue) UnixNano() int64 { - return v.T -} - -// Size returns the size of the StringValue -func (v *StringValue) Size() int { - return 8 + len(v.V) -} - -// String returns the formatted string. Implements the Stringer interface -func (f *StringValue) String() string { - return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value()) -} - -// ConvertToValue converts the data from other engines to TSM -func ConvertToValue(k int64, v interface{}) tsm.Value { - var value tsm.Value - - switch v := v.(type) { - case int64: - value = &Int64Value{ - T: k, - V: v, - } - case float64: - value = &FloatValue{ - T: k, - V: v, - } +func (v *Value) Size() int { + switch vv := v.Val.(type) { + case int64, float64: + return 16 case bool: - value = &BoolValue{ - T: k, - V: v, - } + return 9 case string: - value = &StringValue{ - T: k, - V: v, - } - default: - panic(fmt.Sprintf("value type %T unsupported for conversion", v)) + return 8 + len(vv) } - - return value + return 0 }
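
For reference, here is how the pieces above fit together from a caller's perspective — a minimal sketch, assuming a b1 shard; the path is hypothetical and error handling is abbreviated. Because Read() now returns a view into the reader's reusable buffer rather than a freshly built slice, the returned values are only valid until the next call to Next():

package main

import (
	"log"

	"github.com/influxdata/influxdb/cmd/influx_tsm/b1"
	"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
)

func main() {
	s := &stats.Stats{}

	// A chunk size <= 0 falls back to DefaultChunkSize (1000).
	r := b1.NewReader("/var/lib/influxdb/data/db/default/1", s, 0)
	if err := r.Open(); err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	for r.Next() {
		key, values, err := r.Read()
		if err != nil {
			log.Fatal(err)
		}
		// values aliases the reader's internal buffer: consume it before
		// calling Next() again.
		log.Printf("%s: %d points", key, len(values))
	}

	// NaN/Inf/empty-field filtering now happens in the readers; points that
	// survive are counted by the Converter via AddPointsRead/AddPointsWritten.
	log.Printf("nan=%d inf=%d fieldless=%d", s.NanFiltered, s.InfFiltered, s.FieldsFiltered)
}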