Merge pull request #5863 from influxdata/jl-influx-tsm

influx_tsm: multiple improvements
pull/5882/head
joelegasse 2016-03-01 08:30:22 -05:00
commit 7adbd7da6e
11 changed files with 293 additions and 650 deletions

View File

@ -2,20 +2,17 @@ package b1 // import "github.com/influxdata/influxdb/cmd/influx_tsm/b1"
import (
"encoding/binary"
"math"
"sort"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// DefaultChunkSize is the size of chunks read from the b1 shard
const DefaultChunkSize = 1000
// NoFieldsFiltered is the number of nil fields filtered
var NoFieldsFiltered uint64
const DefaultChunkSize int = 1000
var excludedBuckets = map[string]bool{
"fields": true,
@ -34,21 +31,37 @@ type Reader struct {
currCursor int
keyBuf string
valuesBuf []tsm1.Value
tsmValues []tsm1.Value
values []tsdb.Value
valuePos int
fields map[string]*tsdb.MeasurementFields
codecs map[string]*tsdb.FieldCodec
ChunkSize int
stats *stats.Stats
}
// NewReader returns a reader for the b1 shard at path.
func NewReader(path string) *Reader {
return &Reader{
// NewReader builds a Reader for the b1 shard stored at path. Conversion
// counters are reported through stats. chunkSize is the number of values
// buffered per chunk; a value of zero or less selects DefaultChunkSize.
func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader {
	size := chunkSize
	if size < 1 {
		size = DefaultChunkSize
	}

	// Allocate the concrete values as one contiguous slice, then build a
	// parallel slice of interface values in which entry i points at
	// element i of the concrete slice. Both slices are allocated once here.
	values := make([]tsdb.Value, size)
	views := make([]tsm1.Value, size)
	for i := 0; i < size; i++ {
		views[i] = &values[i]
	}

	return &Reader{
		path:      path,
		fields:    make(map[string]*tsdb.MeasurementFields),
		codecs:    make(map[string]*tsdb.FieldCodec),
		stats:     stats,
		values:    values,
		tsmValues: views,
	}
}
// Open opens the reader.
@ -104,7 +117,7 @@ func (r *Reader) Open() error {
measurement := tsdb.MeasurementFromSeriesKey(s)
fields := r.fields[measurement]
if fields == nil {
atomic.AddUint64(&NoFieldsFiltered, 1)
r.stats.IncrFiltered()
continue
}
for _, f := range fields.Fields {
@ -121,44 +134,61 @@ func (r *Reader) Open() error {
// Next returns whether any data remains to be read. It must be called before
// the next call to Read().
func (r *Reader) Next() bool {
r.valuePos = 0
OUTER:
for {
if r.currCursor == len(r.cursors) {
if r.currCursor >= len(r.cursors) {
// All cursors drained. No more data remains.
return false
}
cc := r.cursors[r.currCursor]
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if len(r.valuesBuf) == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue
}
// There is some data available. Indicate that it should be read.
return true
}
r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v))
if len(r.valuesBuf) == r.ChunkSize {
return true
for {
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if r.valuePos == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue OUTER
}
// There is some data available. Indicate that it should be read.
return true
}
if f, ok := v.(float64); ok {
if math.IsInf(f, 0) {
r.stats.AddPointsRead(1)
r.stats.IncrInf()
continue
}
if math.IsNaN(f) {
r.stats.AddPointsRead(1)
r.stats.IncrNaN()
continue
}
}
r.values[r.valuePos].T = k
r.values[r.valuePos].Val = v
r.valuePos++
if r.valuePos >= len(r.values) {
return true
}
}
}
}
// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
// emitted completely for every field, in every series, before the next field is processed.
// Data from Read() adheres to the requirements for writing to tsm1 shards
func (r *Reader) Read() (string, []tsm1.Value, error) {
defer func() {
r.valuesBuf = nil
}()
return r.keyBuf, r.valuesBuf, nil
return r.keyBuf, r.tsmValues[:r.valuePos], nil
}
// Close closes the reader.
@ -198,7 +228,7 @@ func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *
}
// SeekTo moves the cursor to a position.
func (c cursor) SeekTo(seek int64) {
func (c *cursor) SeekTo(seek int64) {
var seekBytes [8]byte
binary.BigEndian.PutUint64(seekBytes[:], uint64(seek))
k, v := c.cursor.Seek(seekBytes[:])
@ -238,5 +268,8 @@ type cursors []*cursor
// Len returns the number of cursors in a. Together with Swap and Less it
// gives cursors the method set required by sort.Interface.
func (a cursors) Len() int { return len(a) }
// Swap exchanges the cursors at positions i and j.
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a cursors) Less(i, j int) bool {
return tsm1.SeriesFieldKey(a[i].series, a[i].field) < tsm1.SeriesFieldKey(a[j].series, a[j].field)
if a[i].series == a[j].series {
return a[i].field < a[j].field
}
return a[i].series < a[j].series
}

View File

@ -5,22 +5,20 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"math"
"sort"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
tsm "github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// DefaultChunkSize is the size of chunks read from the bz1 shard
const DefaultChunkSize = 1000
// NoFieldsFiltered is the number of nil fields filtered
var NoFieldsFiltered uint64
// Reader is used to read all data from a bz1 shard.
type Reader struct {
path string
@ -31,22 +29,37 @@ type Reader struct {
currCursor int
keyBuf string
valuesBuf []tsm.Value
tsmValues []tsm1.Value
values []tsdb.Value
valuePos int
fields map[string]*tsdb.MeasurementFields
codecs map[string]*tsdb.FieldCodec
ChunkSize int
stats *stats.Stats
}
// NewReader returns a reader for the bz1 shard at path.
func NewReader(path string) *Reader {
return &Reader{
path: path,
fields: make(map[string]*tsdb.MeasurementFields),
codecs: make(map[string]*tsdb.FieldCodec),
ChunkSize: DefaultChunkSize,
func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader {
r := &Reader{
path: path,
fields: make(map[string]*tsdb.MeasurementFields),
codecs: make(map[string]*tsdb.FieldCodec),
stats: stats,
}
if chunkSize <= 0 {
chunkSize = DefaultChunkSize
}
// known-sized slice of a known type, in a contiguous chunk
r.values = make([]tsdb.Value, chunkSize)
r.tsmValues = make([]tsm1.Value, len(r.values))
for i := range r.values {
r.tsmValues[i] = &r.values[i]
}
return r
}
// Open opens the reader.
@ -114,7 +127,7 @@ func (r *Reader) Open() error {
measurement := tsdb.MeasurementFromSeriesKey(s)
fields := r.fields[measurement]
if fields == nil {
atomic.AddUint64(&NoFieldsFiltered, 1)
r.stats.IncrFiltered()
continue
}
for _, f := range fields.Fields {
@ -133,30 +146,52 @@ func (r *Reader) Open() error {
// Next returns whether there is any more data to be read.
func (r *Reader) Next() bool {
r.valuePos = 0
OUTER:
for {
if r.currCursor == len(r.cursors) {
if r.currCursor >= len(r.cursors) {
// All cursors drained. No more data remains.
return false
}
cc := r.cursors[r.currCursor]
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if len(r.valuesBuf) == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue
}
// There is some data available. Indicate that it should be read.
return true
}
r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
r.keyBuf = tsm.SeriesFieldKey(cc.series, cc.field)
r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v))
if len(r.valuesBuf) == r.ChunkSize {
return true
for {
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if r.valuePos == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue OUTER
}
// There is some data available. Indicate that it should be read.
return true
}
if f, ok := v.(float64); ok {
if math.IsInf(f, 0) {
r.stats.AddPointsRead(1)
r.stats.IncrInf()
continue
}
if math.IsNaN(f) {
r.stats.AddPointsRead(1)
r.stats.IncrNaN()
continue
}
}
r.values[r.valuePos].T = k
r.values[r.valuePos].Val = v
r.valuePos++
if r.valuePos >= len(r.values) {
return true
}
}
}
}
@ -164,12 +199,8 @@ func (r *Reader) Next() bool {
// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
// emitted completely for every field, in every series, before the next field is processed.
// Data from Read() adheres to the requirements for writing to tsm1 shards
func (r *Reader) Read() (string, []tsm.Value, error) {
defer func() {
r.valuesBuf = nil
}()
return r.keyBuf, r.valuesBuf, nil
func (r *Reader) Read() (string, []tsm1.Value, error) {
return r.keyBuf, r.tsmValues[:r.valuePos], nil
}
// Close closes the reader.
@ -333,7 +364,10 @@ type cursors []*cursor
// Len returns the number of cursors in a. Together with Swap and Less it
// gives cursors the method set required by sort.Interface.
func (a cursors) Len() int { return len(a) }
// Swap exchanges the cursors at positions i and j.
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a cursors) Less(i, j int) bool {
return tsm.SeriesFieldKey(a[i].series, a[i].field) < tsm.SeriesFieldKey(a[j].series, a[j].field)
if a[i].series == a[j].series {
return a[i].field < a[j].field
}
return a[i].series < a[j].series
}
// entryHeaderSize is the number of bytes required for the header.

View File

@ -2,10 +2,10 @@ package main
import (
"fmt"
"math"
"os"
"path/filepath"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
@ -20,15 +20,15 @@ type Converter struct {
path string
maxTSMFileSize uint32
sequence int
tracker *tracker
stats *stats.Stats
}
// NewConverter returns a new instance of the Converter.
func NewConverter(path string, sz uint32, t *tracker) *Converter {
func NewConverter(path string, sz uint32, stats *stats.Stats) *Converter {
return &Converter{
path: path,
maxTSMFileSize: sz,
tracker: t,
stats: stats,
}
}
@ -46,7 +46,6 @@ func (c *Converter) Process(iter KeyIterator) error {
if err != nil {
return err
}
scrubbed := c.scrubValues(v)
if w == nil {
w, err = c.nextTSMWriter()
@ -54,12 +53,12 @@ func (c *Converter) Process(iter KeyIterator) error {
return err
}
}
if err := w.Write(k, scrubbed); err != nil {
if err := w.Write(k, v); err != nil {
return err
}
c.tracker.AddPointsRead(len(v))
c.tracker.AddPointsWritten(len(scrubbed))
c.stats.AddPointsRead(len(v))
c.stats.AddPointsWritten(len(v))
// If we have a max file size configured and we're over it, start a new TSM file.
if w.Size() > c.maxTSMFileSize {
@ -67,7 +66,7 @@ func (c *Converter) Process(iter KeyIterator) error {
return err
}
c.tracker.AddTSMBytes(w.Size())
c.stats.AddTSMBytes(w.Size())
if err := w.Close(); err != nil {
return err
@ -80,7 +79,7 @@ func (c *Converter) Process(iter KeyIterator) error {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
c.tracker.AddTSMBytes(w.Size())
c.stats.AddTSMBytes(w.Size())
if err := w.Close(); err != nil {
return err
@ -106,49 +105,6 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
return nil, err
}
c.tracker.IncrTSMFileCount()
c.stats.IncrTSMFileCount()
return w, nil
}
// scrubValues takes a slice and removes float64 NaN and Inf. If neither is
// present in the slice, the original slice is returned. This is to avoid
// copying slices unnecessarily.
func (c *Converter) scrubValues(values []tsm1.Value) []tsm1.Value {
var scrubbed []tsm1.Value
if values == nil {
return nil
}
for i, v := range values {
if f, ok := v.Value().(float64); ok {
var filter bool
if math.IsNaN(f) {
filter = true
c.tracker.IncrNaN()
}
if math.IsInf(f, 0) {
filter = true
c.tracker.IncrInf()
}
if filter {
if scrubbed == nil {
// Take every value up to the NaN, indicating that scrubbed
// should now be used.
scrubbed = values[:i]
}
} else {
if scrubbed != nil {
// We've filtered at least 1 value, so add value to filtered slice.
scrubbed = append(scrubbed, v)
}
}
}
}
if scrubbed != nil {
return scrubbed
}
return values
}

View File

@ -1,57 +0,0 @@
package main
import (
"fmt"
"math"
"reflect"
"strings"
"testing"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestScrubValues(t *testing.T) {
dummy := Converter{
tracker: new(tracker),
}
var epoch int64
simple := []tsm1.Value{tsm1.NewValue(epoch, 1.0)}
for _, tt := range []struct {
input, expected []tsm1.Value
}{
{
input: simple,
expected: simple,
}, {
input: []tsm1.Value{simple[0], tsm1.NewValue(epoch, math.NaN())},
expected: simple,
}, {
input: []tsm1.Value{simple[0], tsm1.NewValue(epoch, math.Inf(-1))},
expected: simple,
}, {
input: []tsm1.Value{simple[0], tsm1.NewValue(epoch, math.Inf(1)), tsm1.NewValue(epoch, math.NaN())},
expected: simple,
},
} {
out := dummy.scrubValues(tt.input)
if !reflect.DeepEqual(out, tt.expected) {
t.Errorf("Failed to scrub '%s': Got '%s', Expected '%s'", pretty(tt.input), pretty(out), pretty(tt.expected))
}
}
}
func pretty(vals []tsm1.Value) string {
if len(vals) == 0 {
return "[]"
}
strs := make([]string, len(vals))
for i := range vals {
strs[i] = fmt.Sprintf("{%v: %v}", vals[i].UnixNano(), vals[i].Value())
}
return "[" + strings.Join(strs, ", ") + "]"
}

View File

@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sort"
"strings"
"text/tabwriter"
@ -55,6 +56,8 @@ type options struct {
Parallel bool
SkipBackup bool
UpdateInterval time.Duration
Yes bool
CpuFile string
}
func (o *options) Parse() error {
@ -69,6 +72,8 @@ func (o *options) Parse() error {
fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directory.")
fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address")
fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")
fs.BoolVar(&opts.Yes, "y", false, "Don't ask, just convert")
fs.StringVar(&opts.CpuFile, "profile", "", "CPU Profile location")
fs.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %v [options] <data-path> \n", os.Args[0])
fmt.Fprintf(os.Stderr, "%v\n\nOptions:\n", description)
@ -197,19 +202,32 @@ func main() {
}
w.Flush()
// Get confirmation from user.
fmt.Printf("\nThese shards will be converted. Proceed? y/N: ")
liner := bufio.NewReader(os.Stdin)
yn, err := liner.ReadString('\n')
if err != nil {
log.Fatalf("failed to read response: %v", err)
}
yn = strings.TrimRight(strings.ToLower(yn), "\n")
if yn != "y" {
log.Fatal("Conversion aborted.")
if !opts.Yes {
// Get confirmation from user.
fmt.Printf("\nThese shards will be converted. Proceed? y/N: ")
liner := bufio.NewReader(os.Stdin)
yn, err := liner.ReadString('\n')
if err != nil {
log.Fatalf("failed to read response: %v", err)
}
yn = strings.TrimRight(strings.ToLower(yn), "\n")
if yn != "y" {
log.Fatal("Conversion aborted.")
}
}
fmt.Println("Conversion starting....")
if opts.CpuFile != "" {
f, err := os.Create(opts.CpuFile)
if err != nil {
log.Fatal(err)
}
if err = pprof.StartCPUProfile(f); err != nil {
log.Fatal(err)
}
defer pprof.StopCPUProfile()
}
tr := newTracker(shards, opts)
if err := tr.Run(); err != nil {
@ -317,9 +335,9 @@ func convertShard(si *tsdb.ShardInfo, tr *tracker) error {
var reader ShardReader
switch si.Format {
case tsdb.BZ1:
reader = bz1.NewReader(src)
reader = bz1.NewReader(src, &tr.Stats, 0)
case tsdb.B1:
reader = b1.NewReader(src)
reader = b1.NewReader(src, &tr.Stats, 0)
default:
return fmt.Errorf("Unsupported shard format: %v", si.FormatAsString())
}
@ -329,7 +347,7 @@ func convertShard(si *tsdb.ShardInfo, tr *tracker) error {
return fmt.Errorf("Failed to open %v for conversion: %v", src, err)
}
defer reader.Close()
converter := NewConverter(dst, uint32(opts.TSMSize), tr)
converter := NewConverter(dst, uint32(opts.TSMSize), &tr.Stats)
// Perform the conversion.
if err := converter.Process(reader); err != nil {

View File

@ -0,0 +1,47 @@
package stats
import (
"sync/atomic"
"time"
)
// Stats are the statistics captured while converting non-TSM shards to TSM
// Stats are the statistics captured while converting non-TSM shards to TSM.
// Every method on Stats updates its counter with an atomic add.
type Stats struct {
	NanFiltered     uint64
	InfFiltered     uint64
	FieldsFiltered  uint64
	PointsWritten   uint64
	PointsRead      uint64
	TsmFilesCreated uint64
	TsmBytesWritten uint64
	CompletedShards uint64
	TotalTime       time.Duration
}

// addCounter atomically adds delta to the counter stored at addr.
func addCounter(addr *uint64, delta uint64) {
	atomic.AddUint64(addr, delta)
}

// AddPointsRead adds n to the PointsRead counter.
func (s *Stats) AddPointsRead(n int) {
	addCounter(&s.PointsRead, uint64(n))
}

// AddPointsWritten adds n to the PointsWritten counter.
func (s *Stats) AddPointsWritten(n int) {
	addCounter(&s.PointsWritten, uint64(n))
}

// AddTSMBytes adds n to the TsmBytesWritten counter. The 32-bit argument is
// widened first, so the running total is kept in 64 bits.
func (s *Stats) AddTSMBytes(n uint32) {
	addCounter(&s.TsmBytesWritten, uint64(n))
}

// IncrTSMFileCount adds one to the TsmFilesCreated counter.
func (s *Stats) IncrTSMFileCount() {
	addCounter(&s.TsmFilesCreated, 1)
}

// IncrNaN adds one to the NanFiltered counter.
func (s *Stats) IncrNaN() {
	addCounter(&s.NanFiltered, 1)
}

// IncrInf adds one to the InfFiltered counter.
func (s *Stats) IncrInf() {
	addCounter(&s.InfFiltered, 1)
}

// IncrFiltered adds one to the FieldsFiltered counter.
func (s *Stats) IncrFiltered() {
	addCounter(&s.FieldsFiltered, 1)
}

View File

@ -8,14 +8,13 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/cmd/influx_tsm/b1"
"github.com/influxdata/influxdb/cmd/influx_tsm/bz1"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
)
// tracker will orchestrate and track the conversions of non-TSM shards to TSM
type tracker struct {
stats Stats
Stats stats.Stats
shards tsdb.ShardInfos
opts options
@ -24,18 +23,6 @@ type tracker struct {
wg sync.WaitGroup
}
// Stats are the statistics captured while converting non-TSM shards to TSM
type Stats struct {
NanFiltered uint64
InfFiltered uint64
PointsWritten uint64
PointsRead uint64
TsmFilesCreated uint64
TsmBytesWritten uint64
CompletedShards uint64
TotalTime time.Duration
}
// newTracker will setup and return a clean tracker instance
func newTracker(shards tsdb.ShardInfos, opts options) *tracker {
t := &tracker{
@ -47,10 +34,6 @@ func newTracker(shards tsdb.ShardInfos, opts options) *tracker {
return t
}
func (t *tracker) Errorf(str string, args ...interface{}) {
}
func (t *tracker) Run() error {
conversionStart := time.Now()
@ -83,7 +66,7 @@ func (t *tracker) Run() error {
si := t.shards[i]
go t.pg.Do(func() {
defer func() {
atomic.AddUint64(&t.stats.CompletedShards, 1)
atomic.AddUint64(&t.Stats.CompletedShards, 1)
t.wg.Done()
}()
@ -112,60 +95,36 @@ WAIT_LOOP:
}
}
t.stats.TotalTime = time.Since(conversionStart)
t.Stats.TotalTime = time.Since(conversionStart)
return nil
}
func (t *tracker) StatusUpdate() {
shardCount := atomic.LoadUint64(&t.stats.CompletedShards)
pointCount := atomic.LoadUint64(&t.stats.PointsRead)
pointWritten := atomic.LoadUint64(&t.stats.PointsWritten)
shardCount := atomic.LoadUint64(&t.Stats.CompletedShards)
pointCount := atomic.LoadUint64(&t.Stats.PointsRead)
pointWritten := atomic.LoadUint64(&t.Stats.PointsWritten)
log.Printf("Still Working: Completed Shards: %d/%d Points read/written: %d/%d", shardCount, len(t.shards), pointCount, pointWritten)
}
func (t *tracker) PrintStats() {
preSize := t.shards.Size()
postSize := int64(t.stats.TsmBytesWritten)
postSize := int64(t.Stats.TsmBytesWritten)
fmt.Printf("\nSummary statistics\n========================================\n")
fmt.Printf("Databases converted: %d\n", len(t.shards.Databases()))
fmt.Printf("Shards converted: %d\n", len(t.shards))
fmt.Printf("TSM files created: %d\n", t.stats.TsmFilesCreated)
fmt.Printf("Points read: %d\n", t.stats.PointsRead)
fmt.Printf("Points written: %d\n", t.stats.PointsWritten)
fmt.Printf("NaN filtered: %d\n", t.stats.NanFiltered)
fmt.Printf("Inf filtered: %d\n", t.stats.InfFiltered)
fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered)
fmt.Printf("TSM files created: %d\n", t.Stats.TsmFilesCreated)
fmt.Printf("Points read: %d\n", t.Stats.PointsRead)
fmt.Printf("Points written: %d\n", t.Stats.PointsWritten)
fmt.Printf("NaN filtered: %d\n", t.Stats.NanFiltered)
fmt.Printf("Inf filtered: %d\n", t.Stats.InfFiltered)
fmt.Printf("Points without fields filtered: %d\n", t.Stats.FieldsFiltered)
fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize)
fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize)
fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize)
fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.stats.PointsWritten))
fmt.Printf("Total conversion time: %v\n", t.stats.TotalTime)
fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.Stats.PointsWritten))
fmt.Printf("Total conversion time: %v\n", t.Stats.TotalTime)
fmt.Println()
}
func (t *tracker) AddPointsRead(n int) {
atomic.AddUint64(&t.stats.PointsRead, uint64(n))
}
func (t *tracker) AddPointsWritten(n int) {
atomic.AddUint64(&t.stats.PointsWritten, uint64(n))
}
func (t *tracker) AddTSMBytes(n uint32) {
atomic.AddUint64(&t.stats.TsmBytesWritten, uint64(n))
}
func (t *tracker) IncrTSMFileCount() {
atomic.AddUint64(&t.stats.TsmFilesCreated, 1)
}
func (t *tracker) IncrNaN() {
atomic.AddUint64(&t.stats.NanFiltered, 1)
}
func (t *tracker) IncrInf() {
atomic.AddUint64(&t.stats.InfFiltered, 1)
}

View File

@ -2,7 +2,6 @@ package tsdb
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
@ -40,79 +39,6 @@ func NewFieldCodec(fields map[string]*Field) *FieldCodec {
return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName}
}
// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) {
// Allocate byte slice
b := make([]byte, 0, 10)
for k, v := range values {
field := f.fieldsByName[k]
if field == nil {
panic(fmt.Sprintf("field does not exist for %s", k))
} else if influxql.InspectDataType(v) != field.Type {
return nil, fmt.Errorf("field \"%s\" is type %T, mapped as type %s", k, v, field.Type)
}
var buf []byte
switch field.Type {
case influxql.Float:
value := v.(float64)
buf = make([]byte, 9)
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value))
case influxql.Integer:
var value uint64
switch v.(type) {
case int:
value = uint64(v.(int))
case int32:
value = uint64(v.(int32))
case int64:
value = uint64(v.(int64))
default:
panic(fmt.Sprintf("invalid integer type: %T", v))
}
buf = make([]byte, 9)
binary.BigEndian.PutUint64(buf[1:9], value)
case influxql.Boolean:
value := v.(bool)
// Only 1 byte need for a boolean.
buf = make([]byte, 2)
if value {
buf[1] = byte(1)
}
case influxql.String:
value := v.(string)
if len(value) > maxStringLength {
value = value[:maxStringLength]
}
// Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string.
buf = make([]byte, len(value)+3)
// Set the string length, then copy the string itself.
binary.BigEndian.PutUint16(buf[1:3], uint16(len(value)))
for i, c := range []byte(value) {
buf[i+3] = byte(c)
}
default:
panic(fmt.Sprintf("unsupported value type during encode fields: %T", v))
}
// Always set the field ID as the leading byte.
buf[0] = field.ID
// Append temp buffer to the end.
b = append(b, buf...)
}
return b, nil
}
// FieldIDByName returns the ID for the given field.
func (f *FieldCodec) FieldIDByName(s string) (uint8, error) {
fi := f.fieldsByName[s]
@ -122,132 +48,41 @@ func (f *FieldCodec) FieldIDByName(s string) (uint8, error) {
return fi.ID, nil
}
// DecodeFields decodes a byte slice into a set of field ids and values.
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) {
if len(b) == 0 {
return nil, nil
}
// Create a map to hold the decoded data.
values := make(map[uint8]interface{}, 0)
for {
if len(b) < 1 {
// No more bytes.
break
}
// First byte is the field identifier.
fieldID := b[0]
field := f.fieldsByID[fieldID]
if field == nil {
// See note in DecodeByID() regarding field-mapping failures.
return nil, ErrFieldUnmappedID
}
var value interface{}
switch field.Type {
case influxql.Float:
value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
// Move bytes forward.
b = b[9:]
case influxql.Integer:
value = int64(binary.BigEndian.Uint64(b[1:9]))
// Move bytes forward.
b = b[9:]
case influxql.Boolean:
if b[1] == 1 {
value = true
} else {
value = false
}
// Move bytes forward.
b = b[2:]
case influxql.String:
size := binary.BigEndian.Uint16(b[1:3])
value = string(b[3 : size+3])
// Move bytes forward.
b = b[size+3:]
default:
panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID]))
}
values[fieldID] = value
}
return values, nil
}
// DecodeFieldsWithNames decodes a byte slice into a set of field names and values
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error) {
fields, err := f.DecodeFields(b)
if err != nil {
return nil, err
}
m := make(map[string]interface{})
for id, v := range fields {
field := f.fieldsByID[id]
if field != nil {
m[field.Name] = v
}
}
return m, nil
}
// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and return that value.
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
if len(b) == 0 {
return 0, ErrFieldNotFound
// No more bytes.
return nil, ErrFieldNotFound
}
for {
if len(b) < 1 {
// No more bytes.
break
}
field, ok := f.fieldsByID[b[0]]
if !ok {
// This can happen, though is very unlikely. If this node receives encoded data, to be written
// to disk, and is queried for that data before its metastore is updated, there will be no field
// mapping for the data during decode. All this can happen because data is encoded by the node
// that first received the write request, not the node that actually writes the data to disk.
// So if this happens, the read must be aborted.
return 0, ErrFieldUnmappedID
}
var value interface{}
switch field.Type {
case influxql.Float:
// Move bytes forward.
value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
b = b[9:]
case influxql.Integer:
value = int64(binary.BigEndian.Uint64(b[1:9]))
b = b[9:]
case influxql.Boolean:
if b[1] == 1 {
value = true
} else {
value = false
}
// Move bytes forward.
b = b[2:]
case influxql.String:
size := binary.BigEndian.Uint16(b[1:3])
value = string(b[3 : 3+size])
// Move bytes forward.
b = b[size+3:]
default:
panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))
}
if field.ID == targetID {
return value, nil
}
field := f.fieldsByID[b[0]]
if field == nil {
// This can happen, though is very unlikely. If this node receives encoded data, to be written
// to disk, and is queried for that data before its metastore is updated, there will be no field
// mapping for the data during decode. All this can happen because data is encoded by the node
// that first received the write request, not the node that actually writes the data to disk.
// So if this happens, the read must be aborted.
return nil, ErrFieldUnmappedID
}
return 0, ErrFieldNotFound
if field.ID != targetID {
return nil, ErrFieldNotFound
}
switch field.Type {
case influxql.Float:
// Move bytes forward.
return math.Float64frombits(binary.BigEndian.Uint64(b[1:9])), nil
case influxql.Integer:
return int64(binary.BigEndian.Uint64(b[1:9])), nil
case influxql.Boolean:
return b[1] == 1, nil
case influxql.String:
return string(b[3 : 3+binary.BigEndian.Uint16(b[1:3])]), nil
default:
panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))
}
}
// DecodeByName scans a byte slice for a field with the given name, converts it to its
@ -260,35 +95,7 @@ func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) {
return f.DecodeByID(fi.ID, b)
}
// Fields returns the array of fields in the FieldCodec
func (f *FieldCodec) Fields() (a []*Field) {
for _, f := range f.fieldsByID {
a = append(a, f)
}
return
}
// FieldByName returns the field registered under name, or nil when the
// codec has no field with that name.
func (f *FieldCodec) FieldByName(name string) *Field {
return f.fieldsByName[name]
}
// mustMarshal encodes a value to JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid marshal will cause corruption and a panic is appropriate.
func mustMarshalJSON(v interface{}) []byte {
b, err := json.Marshal(v)
if err != nil {
panic("marshal: " + err.Error())
}
return b
}
// mustUnmarshalJSON decodes a value from JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid unmarshal will cause corruption and a panic is appropriate.
func mustUnmarshalJSON(b []byte, v interface{}) {
if err := json.Unmarshal(b, v); err != nil {
panic("unmarshal: " + err.Error())
}
}

View File

@ -196,12 +196,7 @@ func (d *Database) Shards() ([]*ShardInfo, error) {
// shardFormat returns the format and size on disk of the shard at path.
func shardFormat(path string) (EngineFormat, int64, error) {
// If it's a directory then it's a tsm1 engine
f, err := os.Open(path)
if err != nil {
return 0, 0, err
}
fi, err := f.Stat()
f.Close()
fi, err := os.Stat(path)
if err != nil {
return 0, 0, err
}
@ -228,13 +223,13 @@ func shardFormat(path string) (EngineFormat, int64, error) {
}
// There is an actual format indicator.
switch string(b.Get([]byte("format"))) {
switch f := string(b.Get([]byte("format"))); f {
case "b1", "v1":
format = B1
case "bz1":
format = BZ1
default:
return fmt.Errorf("unrecognized engine format: %s", string(b.Get([]byte("format"))))
return fmt.Errorf("unrecognized engine format: %s", f)
}
return nil

View File

@ -10,12 +10,6 @@ import (
"github.com/gogo/protobuf/proto"
)
// Cursor represents an iterator over a series.
type Cursor interface {
SeekTo(seek int64) (key int64, value interface{})
Next() (key int64, value interface{})
}
// Field represents an encoded field.
type Field struct {
ID uint8 `json:"id,omitempty"`
@ -29,18 +23,6 @@ type MeasurementFields struct {
Codec *FieldCodec
}
// MarshalBinary encodes the object to a binary format.
func (m *MeasurementFields) MarshalBinary() ([]byte, error) {
var pb internal.MeasurementFields
for _, f := range m.Fields {
id := int32(f.ID)
name := f.Name
t := int32(f.Type)
pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t})
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes the object from a binary format.
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error {
var pb internal.MeasurementFields
@ -60,39 +42,9 @@ type Series struct {
Tags map[string]string
}
// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
var pb internal.Series
pb.Key = &s.Key
for k, v := range s.Tags {
key := k
value := v
pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value})
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes the object from a binary format.
func (s *Series) UnmarshalBinary(buf []byte) error {
var pb internal.Series
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
s.Key = pb.GetKey()
s.Tags = make(map[string]string)
for _, t := range pb.Tags {
s.Tags[t.GetKey()] = t.GetValue()
}
return nil
}
// MeasurementFromSeriesKey returns the Measurement name for a given series.
// The measurement is everything in key before the first comma; a key that
// contains no comma is returned unchanged.
func MeasurementFromSeriesKey(key string) string {
	// One scan for the separator; slicing the key avoids allocating the
	// intermediate slice that splitting would create.
	if idx := strings.IndexByte(key, ','); idx != -1 {
		return key[:idx]
	}
	return key
}
// DecodeKeyValue decodes the key and value from bytes.

View File

@ -3,138 +3,37 @@ package tsdb
import (
"fmt"
"time"
tsm "github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// FloatValue holds float64 values
type FloatValue struct {
T int64
V float64
// Value pairs a timestamp with a value of arbitrary type.
type Value struct {
// T is the timestamp; Time interprets it as nanoseconds since the Unix epoch.
T int64
// Val is the value itself, stored untyped.
Val interface{}
}
func (f *FloatValue) UnixNano() int64 {
return f.T
// Time returns the timestamp T as a time.Time, treating T as nanoseconds
// since the Unix epoch.
func (v *Value) Time() time.Time {
return time.Unix(0, v.T)
}
// Value returns the float64 value
func (f *FloatValue) Value() interface{} {
return f.V
}
// Size returns the size of the FloatValue. It is always 16
func (f *FloatValue) Size() int {
return 16
}
// String returns the formatted string. Implements the Stringer interface
func (f *FloatValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value())
}
// BoolValue holds bool values
type BoolValue struct {
T int64
V bool
}
func (b *BoolValue) Size() int {
return 9
}
// UnixNano returns the Unix time in nanoseconds associated with the BoolValue
func (b *BoolValue) UnixNano() int64 {
return b.T
}
// Value returns the boolean stored
func (b *BoolValue) Value() interface{} {
return b.V
}
// String returns the formatted string. Implements the Stringer interface
func (f *BoolValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value())
}
// Int64Value holds int64 values
type Int64Value struct {
T int64
V int64
}
func (v *Int64Value) Value() interface{} {
return v.V
}
// UnixNano returns the Unix time in nanoseconds associated with the Int64Value
func (v *Int64Value) UnixNano() int64 {
// UnixNano returns the timestamp T unchanged.
func (v *Value) UnixNano() int64 {
return v.T
}
// Size returns the size of the Int64Value. It is always 16
func (v *Int64Value) Size() int {
return 16
// Value returns the stored value Val as an untyped interface value.
func (v *Value) Value() interface{} {
return v.Val
}
// String returns the formatted string. Implements the Stringer interface
func (f *Int64Value) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value())
// String formats the value as its time followed by a space and the stored
// value, both rendered with the %v verb.
func (v *Value) String() string {
return fmt.Sprintf("%v %v", v.Time(), v.Val)
}
// StringValue holds string values
type StringValue struct {
T int64
V string
}
func (v *StringValue) Value() interface{} {
return v.V
}
// UnixNano returns the Unix time in nanoseconds associated with the StringValue
func (v *StringValue) UnixNano() int64 {
return v.T
}
// Size returns the size of the StringValue
func (v *StringValue) Size() int {
return 8 + len(v.V)
}
// String returns the formatted string. Implements the Stringer interface
func (f *StringValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.T), f.Value())
}
// ConvertToValue converts the data from other engines to TSM
func ConvertToValue(k int64, v interface{}) tsm.Value {
var value tsm.Value
switch v := v.(type) {
case int64:
value = &Int64Value{
T: k,
V: v,
}
case float64:
value = &FloatValue{
T: k,
V: v,
}
func (v *Value) Size() int {
switch vv := v.Val.(type) {
case int64, float64:
return 16
case bool:
value = &BoolValue{
T: k,
V: v,
}
return 9
case string:
value = &StringValue{
T: k,
V: v,
}
default:
panic(fmt.Sprintf("value type %T unsupported for conversion", v))
return 8 + len(vv)
}
return value
return 0
}