From 2ba032b7a8fff67e9efab4d1cbd98c3e94708b85 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 7 Sep 2015 15:56:21 -0700 Subject: [PATCH] WIP: finish basics of PD1. IT WORKS! (kind of) --- cmd/influx_stress/influx_stress.go | 1 + tsdb/engine.go | 3 + tsdb/engine/pd1/encoding.go | 22 ++- tsdb/engine/pd1/pd1.go | 254 ++++++++++++++++++++++------- tsdb/engine/pd1/pd1_test.go | 120 +++++++++----- tsdb/shard.go | 52 +++--- 6 files changed, 326 insertions(+), 126 deletions(-) diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 660007125a..bd2f1b4a2a 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "net/url" "runtime" "sort" "time" diff --git a/tsdb/engine.go b/tsdb/engine.go index d2862b5486..b0e9254d95 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -18,6 +18,9 @@ var ( ErrFormatNotFound = errors.New("format not found") ) +// DefaultEngine is the default engine used by the shard when initializing. +const DefaultEngine = "pd1" + // Engine represents a swappable storage engine for the shard. type Engine interface { Open() error diff --git a/tsdb/engine/pd1/encoding.go b/tsdb/engine/pd1/encoding.go index 285c83f8c0..60f72766d8 100644 --- a/tsdb/engine/pd1/encoding.go +++ b/tsdb/engine/pd1/encoding.go @@ -1,16 +1,34 @@ package pd1 import ( + "encoding/binary" + "math" "time" "github.com/dgryski/go-tsz" ) +type Value interface { + TimeBytes() []byte + ValueBytes() []byte + Time() time.Time +} + type FloatValue struct { Time time.Time Value float64 } +func (f *FloatValue) TimeBytes() []byte { + return u64tob(uint64(f.Time.UnixNano())) +} + +func (f *FloatValue) ValueBytes() []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, math.Float64bits(f.Value)) + return buf +} + type FloatValues []FloatValue func (a FloatValues) Len() int { return len(a) } @@ -24,11 +42,11 @@ func EncodeFloatBlock(buf []byte, values []FloatValue) []byte { s.Push(uint32(v.Time.Unix()), v.Value) } s.Finish() - return s.Bytes() + return append(u64tob(uint64(values[0].Time.UnixNano())), s.Bytes()...) } func DecodeFloatBlock(block []byte) ([]FloatValue, error) { - iter, _ := tsz.NewIterator(block) + iter, _ := tsz.NewIterator(block[8:]) a := make([]FloatValue, 0) for iter.Next() { t, f := iter.Values() diff --git a/tsdb/engine/pd1/pd1.go b/tsdb/engine/pd1/pd1.go index d94a7d7795..4d3c752ae2 100644 --- a/tsdb/engine/pd1/pd1.go +++ b/tsdb/engine/pd1/pd1.go @@ -55,9 +55,9 @@ const ( // DefaultBlockSize is the default size of uncompressed points blocks. DefaultBlockSize = 512 * 1024 // 512KB - DefaultMaxFileSize = 50 * 1024 * 1024 // 50MB + DefaultMaxFileSize = 5 * 1024 * 1024 // 5MB - DefaultMaxPointsPerBlock = 5000 + DefaultMaxPointsPerBlock = 1000 // MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall MAP_POPULATE = 0x8000 @@ -71,12 +71,15 @@ type Engine struct { mu sync.Mutex path string - shard *tsdb.Shard - // HashSeriesField is a function that takes a series key and a field name // and returns a hash identifier. It's not guaranteed to be unique. HashSeriesField func(key string) uint64 + // Shard is an interface that can pull back field type information based on measurement name + Shard interface { + FieldCodec(measurementName string) *tsdb.FieldCodec + } + filesLock sync.RWMutex files dataFiles currentFileID int @@ -108,11 +111,34 @@ func (e *Engine) Open() error { // TODO: clean up previous names write // TODO: clean up any data files that didn't get cleaned up + files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", Format))) + if err != nil { + return err + } + for _, fn := range files { + f, err := os.OpenFile(fn, os.O_RDONLY, 0666) + if err != nil { + return fmt.Errorf("error opening file %s: %s", fn, err.Error()) + } + df, err := NewDataFile(f) + if err != nil { + return fmt.Errorf("error opening memory map for file %s: %s", fn, err.Error()) + } + e.files = append(e.files, df) + } + sort.Sort(e.files) + return nil } // Close closes the engine. func (e *Engine) Close() error { + e.queryLock.Lock() + defer e.queryLock.Unlock() + + for _, df := range e.files { + _ = df.Close() + } return nil } @@ -121,7 +147,7 @@ func (e *Engine) SetLogOutput(w io.Writer) {} // LoadMetadataIndex loads the shard metadata into memory. func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { - e.shard = shard + e.Shard = shard // TODO: write the metadata from the WAL // Load measurement metadata @@ -165,7 +191,7 @@ func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { // TODO: Write points to the WAL - return nil + return e.WriteAndCompact(points, measurementFieldsToSave, seriesToCreate) } func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { @@ -200,8 +226,6 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma } } - fmt.Println("read names: ", len(names), len(ids)) - // these are values that are newer than anything stored in the shard valuesByID := make(map[uint64]*valueCollection) // map the points to the data file they belong to if they overlap @@ -288,7 +312,6 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma } } - fmt.Println("writing names:", len(names)) b, err = json.Marshal(names) if err != nil { return err @@ -302,14 +325,13 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma if overwriteNewestFile { if err := e.rewriteFile(newestDataFile, valuesByID); err != nil { return err - } else if err := e.rewriteFile(nil, valuesByID); err != nil { - return err } + } else if err := e.rewriteFile(nil, valuesByID); err != nil { + return err } // flush each of the old ones for df, vals := range dataFileToValues { - fmt.Println("writing vals to old file: ", df.f.Name()) if err := e.rewriteFile(df, vals); err != nil { return err } @@ -374,6 +396,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection f.Close() return nil } + // write the series ids and empty starting positions for _, id := range ids { if _, err := f.Write(append(u64tob(id), []byte{0x00, 0x00, 0x00, 0x00}...)); err != nil { @@ -423,6 +446,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection // if the values are not in the file, just write the new ones fpos, ok := oldIDToPosition[id] if !ok { + // TODO: ensure we encode only the amount in a block block := newVals.Encode(buf) if _, err := f.Write(append(u64tob(id), u32tob(uint32(len(block)))...)); err != nil { f.Close() @@ -444,7 +468,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection break } length := btou32(oldDF.mmap[fpos+8 : fpos+12]) - block := oldDF.mmap[fpos : fpos+12+length] + block := oldDF.mmap[fpos+12 : fpos+12+length] fpos += (12 + length) // determine if there's a block after this with the same id and get its time @@ -477,6 +501,21 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection break } } + + // TODO: ensure we encode only the amount in a block, refactor this wil line 450 into func + if len(newVals.floatValues) > 0 { + // TODO: ensure we encode only the amount in a block + block := newVals.Encode(buf) + if _, err := f.Write(append(u64tob(id), u32tob(uint32(len(block)))...)); err != nil { + f.Close() + return err + } + if _, err := f.Write(block); err != nil { + f.Close() + return err + } + currentPosition += uint32(12 + len(block)) + } } // write out the times and positions @@ -572,7 +611,6 @@ func (e *Engine) replaceCompressedFile(name string, data []byte) error { if _, err := f.Write(b); err != nil { return err } - fmt.Println("compressed: ", len(b)) if err := f.Close(); err != nil { return err } @@ -605,7 +643,7 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) { // TODO: make the cursor take a field name func (e *Engine) Cursor(series string, direction tsdb.Direction) tsdb.Cursor { measurementName := tsdb.MeasurementFromSeriesKey(series) - codec := e.shard.FieldCodec(measurementName) + codec := e.Shard.FieldCodec(measurementName) if codec == nil { return &cursor{} } @@ -658,7 +696,7 @@ func (e *Engine) writeFields(fields map[string]*tsdb.MeasurementFields) error { return err } - fn := e.path + "." + FieldsFileExtension + "tmp" + fn := filepath.Join(e.path, FieldsFileExtension+"tmp") ff, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) if err != nil { return err @@ -670,7 +708,7 @@ func (e *Engine) writeFields(fields map[string]*tsdb.MeasurementFields) error { if err := ff.Close(); err != nil { return err } - fieldsFileName := e.path + "." + FieldsFileExtension + fieldsFileName := filepath.Join(e.path, FieldsFileExtension) if _, err := os.Stat(fieldsFileName); !os.IsNotExist(err) { if err := os.Remove(fieldsFileName); err != nil { @@ -684,7 +722,7 @@ func (e *Engine) writeFields(fields map[string]*tsdb.MeasurementFields) error { func (e *Engine) readFields() (map[string]*tsdb.MeasurementFields, error) { fields := make(map[string]*tsdb.MeasurementFields) - f, err := os.OpenFile(e.path+"."+FieldsFileExtension, os.O_RDONLY, 0666) + f, err := os.OpenFile(filepath.Join(e.path, FieldsFileExtension), os.O_RDONLY, 0666) if os.IsNotExist(err) { return fields, nil } else if err != nil { @@ -732,7 +770,7 @@ func (e *Engine) writeSeries(series map[string]*tsdb.Series) error { return err } - fn := e.path + "." + SeriesFileExtension + "tmp" + fn := filepath.Join(e.path, SeriesFileExtension+"tmp") ff, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) if err != nil { return err @@ -744,7 +782,7 @@ func (e *Engine) writeSeries(series map[string]*tsdb.Series) error { if err := ff.Close(); err != nil { return err } - seriesFileName := e.path + "." + SeriesFileExtension + seriesFileName := filepath.Join(e.path, SeriesFileExtension) if _, err := os.Stat(seriesFileName); !os.IsNotExist(err) { if err := os.Remove(seriesFileName); err != nil && err != os.ErrNotExist { @@ -758,7 +796,7 @@ func (e *Engine) writeSeries(series map[string]*tsdb.Series) error { func (e *Engine) readSeries() (map[string]*tsdb.Series, error) { series := make(map[string]*tsdb.Series) - f, err := os.OpenFile(e.path+"."+SeriesFileExtension, os.O_RDONLY, 0666) + f, err := os.OpenFile(filepath.Join(e.path, SeriesFileExtension), os.O_RDONLY, 0666) if os.IsNotExist(err) { return series, nil } else if err != nil { @@ -843,14 +881,15 @@ func (v *valueCollection) DecodeAndCombine(block, buf []byte, nextTime int64, ha } if hasFutureBlock { - for i, val := range v.floatValues { - if val.Time.UnixNano() > nextTime { - values = append(values, v.floatValues[:i]...) - v.floatValues = v.floatValues[i:] - } - } + // take all values that have times less than the future block and update the vals array + pos := sort.Search(len(v.floatValues), func(i int) bool { + return v.floatValues[i].Time.UnixNano() >= nextTime + }) + values = append(values, v.floatValues[:pos]...) + v.floatValues = v.floatValues[pos:] } else { values = append(values, v.floatValues...) + v.floatValues = nil } sort.Sort(FloatValues(values)) // TODO: deduplicate values @@ -955,7 +994,8 @@ func (d *dataFile) IDToPosition() map[uint64]uint32 { for i := 0; i < count; i++ { offset := 20 + (i * 12) id := btou64(d.mmap[offset : offset+8]) - m[id] = btou32(d.mmap[offset+8 : offset+12]) + pos := btou32(d.mmap[offset+8 : offset+12]) + m[id] = pos } return m @@ -968,26 +1008,23 @@ func (d *dataFile) StartingPositionForID(id uint64) uint32 { seriesCount := d.SeriesCount() min := 0 - max := seriesCount - // // set the minimum position to the first after the file header - // posMin := fileHeaderSize - - // // set the maximum position to the end of the series header - // posMax := fileHeaderSize + (seriesCount * seriesHeaderSize) + max := int(seriesCount) for min < max { mid := (max-min)/2 + min offset := mid*seriesHeaderSize + fileHeaderSize - checkID := btou64(d.mmap[offset:8]) + checkID := btou64(d.mmap[offset : offset+8]) if checkID == id { return btou32(d.mmap[offset+8 : offset+12]) } else if checkID < id { min = mid + 1 + } else { + max = mid } - max = mid } + return uint32(0) } @@ -998,12 +1035,12 @@ func (a dataFiles) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a dataFiles) Less(i, j int) bool { return a[i].MinTime() < a[j].MinTime() } type cursor struct { - id uint64 - dataType influxql.DataType - f *dataFile - dataFilePos int - pos uint32 - vals []FloatValues + id uint64 + dataType influxql.DataType + f *dataFile + filesPos int // the index in the files slice we're looking at + pos uint32 + vals FloatValues direction tsdb.Direction @@ -1013,29 +1050,136 @@ type cursor struct { func newCursor(id uint64, dataType influxql.DataType, files []*dataFile, direction tsdb.Direction) *cursor { return &cursor{ - ids: id, - types: dataType, + id: id, + dataType: dataType, direction: direction, files: files, } } -func (c *cursor) Seek(seek []byte) (key, value []byte) { return nil, nil } +func (c *cursor) Seek(seek []byte) (key, value []byte) { + t := int64(btou64(seek)) -func (c *cursor) Next() (key, value []byte) { - if vals == nil { - // loop until we find a file with some data - for dataFilePos < len(c.files) { - f = c.files[c.dataFilePos] - c.dataFilePos++ - - // startPosition := f + if t < c.files[0].MinTime() { + c.filesPos = 0 + c.f = c.files[0] + } else { + for i, f := range c.files { + if t >= f.MinTime() && t <= f.MaxTime() { + c.filesPos = i + c.f = f + break + } + } + } + + if c.f == nil { + return nil, nil + } + + // TODO: make this for the reverse direction cursor + + // now find the spot in the file we need to go + for { + pos := c.f.StartingPositionForID(c.id) + + // if this id isn't in this file, move to next one or return + if pos == 0 { + c.filesPos++ + if c.filesPos >= len(c.files) { + return nil, nil + } + c.f = c.files[c.filesPos] + continue + } + + // seek to the block and values we're looking for + for { + // if the time is between this block and the next, + // decode this block and go, otherwise seek to next block + length := btou32(c.f.mmap[pos+8 : pos+12]) + + // if the next block has a time less than what we're seeking to, + // skip decoding this block and continue on + nextBlockPos := pos + 12 + length + if nextBlockPos < c.f.size { + nextBlockID := btou64(c.f.mmap[nextBlockPos : nextBlockPos+8]) + if nextBlockID == c.id { + nextBlockTime := int64(btou64(c.f.mmap[nextBlockPos+12 : nextBlockPos+20])) + if nextBlockTime <= t { + pos = nextBlockPos + continue + } + } + } + + // it must be in this block or not at all + tb, vb := c.decodeBlockAndGetValues(pos) + if int64(btou64(tb)) >= t { + return tb, vb + } + + // wasn't in the first value popped out of the block, check the rest + for i, v := range c.vals { + if v.Time.UnixNano() >= t { + c.vals = c.vals[i+1:] + return v.TimeBytes(), v.ValueBytes() + } + } + + // not in this one, let the top loop look for it in the next file + break } } - return nil, nil } -func (c *cursor) next(id uint64) (key, value []byte) +func (c *cursor) Next() (key, value []byte) { + if len(c.vals) == 0 { + // if we have a file set, see if the next block is for this ID + if c.f != nil && c.pos < c.f.size { + nextBlockID := btou64(c.f.mmap[c.pos : c.pos+8]) + if nextBlockID == c.id { + return c.decodeBlockAndGetValues(c.pos) + } + } + + // if the file is nil we hit the end of the previous file, advance the file cursor + if c.f != nil { + c.filesPos++ + } + + // loop until we find a file with some data + for c.filesPos < len(c.files) { + f := c.files[c.filesPos] + + startingPos := f.StartingPositionForID(c.id) + if startingPos == 0 { + continue + } + c.f = f + return c.decodeBlockAndGetValues(startingPos) + } + + // we didn't get to a file that had a next value + return nil, nil + } + + v := c.vals[0] + c.vals = c.vals[1:] + + return v.TimeBytes(), v.ValueBytes() +} + +func (c *cursor) decodeBlockAndGetValues(position uint32) ([]byte, []byte) { + length := btou32(c.f.mmap[position+8 : position+12]) + block := c.f.mmap[position+12 : position+12+length] + c.vals, _ = DecodeFloatBlock(block) + c.pos = position + 12 + length + + v := c.vals[0] + c.vals = c.vals[1:] + return v.TimeBytes(), v.ValueBytes() +} func (c *cursor) Direction() tsdb.Direction { return c.direction } diff --git a/tsdb/engine/pd1/pd1_test.go b/tsdb/engine/pd1/pd1_test.go index 48cdff7929..7d915aab69 100644 --- a/tsdb/engine/pd1/pd1_test.go +++ b/tsdb/engine/pd1/pd1_test.go @@ -4,8 +4,8 @@ import ( "encoding/binary" "fmt" "io/ioutil" + "math" "os" - "reflect" "testing" "time" @@ -18,37 +18,31 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { e := OpenDefaultEngine() defer e.Close() - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) + e.Shard = newFieldCodecMock(map[string]influxql.DataType{"value": influxql.Float}) - p1 := parsePoint("cpu,host=A value=1.1 1000000000", codec) - p2 := parsePoint("cpu,host=B value=1.2 1000000000", codec) - p3 := parsePoint("cpu,host=A value=2.1 2000000000", codec) - p4 := parsePoint("cpu,host=B value=2.2 2000000000", codec) + p1 := parsePoint("cpu,host=A value=1.1 1000000000") + p2 := parsePoint("cpu,host=B value=1.2 1000000000") + p3 := parsePoint("cpu,host=A value=2.1 2000000000") + p4 := parsePoint("cpu,host=B value=2.2 2000000000") if err := e.WriteAndCompact([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } - verify := func() { + verify := func(checkSingleBVal bool) { c := e.Cursor("cpu,host=A", tsdb.Forward) k, v := c.Next() if btou64(k) != uint64(p1.UnixNano()) { t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k)) } - if !reflect.DeepEqual(v, p1.Data()) { + if 1.1 != btof64(v) { t.Fatal("p1 data not equal") } k, v = c.Next() if btou64(k) != uint64(p3.UnixNano()) { t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k)) } - if !reflect.DeepEqual(v, p3.Data()) { + if 2.1 != btof64(v) { t.Fatal("p3 data not equal") } k, v = c.Next() @@ -61,28 +55,56 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { if btou64(k) != uint64(p2.UnixNano()) { t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k)) } - if !reflect.DeepEqual(v, p2.Data()) { + if 1.2 != btof64(v) { t.Fatal("p2 data not equal") } - k, v = c.Next() - if k != nil { - t.Fatal("expected nil") + + if checkSingleBVal { + k, v = c.Next() + if k != nil { + t.Fatal("expected nil") + } } } - verify() + verify(true) if err := e.WriteAndCompact([]tsdb.Point{p4}, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } - verify() + verify(false) c := e.Cursor("cpu,host=B", tsdb.Forward) - k, v := c.Seek(u64tob(2000000000)) - if btou64(k) != uint64(p4.UnixNano()) { - t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), btou64(k)) + k, v := c.Next() + if btou64(k) != uint64(p2.UnixNano()) { + t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k)) } - if !reflect.DeepEqual(v, p4.Data()) { - t.Fatal("p4 data not equal") + if 1.2 != btof64(v) { + t.Fatal("p2 data not equal") + } + k, v = c.Next() + if btou64(k) != uint64(p4.UnixNano()) { + t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k)) + } + if 2.2 != btof64(v) { + t.Fatal("p2 data not equal") + } + + // verify we can seek + k, v = c.Seek(u64tob(2000000000)) + if btou64(k) != uint64(p4.UnixNano()) { + t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k)) + } + if 2.2 != btof64(v) { + t.Fatal("p2 data not equal") + } + + c = e.Cursor("cpu,host=A", tsdb.Forward) + k, v = c.Seek(u64tob(0)) + if btou64(k) != uint64(p1.UnixNano()) { + t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k)) + } + if 1.1 != btof64(v) { + t.Fatal("p1 data not equal") } } @@ -95,17 +117,9 @@ func TestEngine_WriteIndexBenchmarkNames(t *testing.T) { e := OpenDefaultEngine() defer e.Close() - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) - var points []tsdb.Point for i := 0; i < 100000; i++ { - points = append(points, parsePoint(fmt.Sprintf("cpu%d value=22.1", i), codec)) + points = append(points, parsePoint(fmt.Sprintf("cpu%d value=22.1", i))) } st := time.Now() @@ -160,23 +174,35 @@ func (e *Engine) Close() error { return nil } -func parsePoints(buf string, codec *tsdb.FieldCodec) []tsdb.Point { +func newFieldCodecMock(fields map[string]influxql.DataType) *FieldCodeMock { + m := make(map[string]*tsdb.Field) + + for n, t := range fields { + m[n] = &tsdb.Field{Name: n, Type: t} + } + codec := tsdb.NewFieldCodec(m) + + return &FieldCodeMock{codec: codec} +} + +type FieldCodeMock struct { + codec *tsdb.FieldCodec +} + +func (f *FieldCodeMock) FieldCodec(m string) *tsdb.FieldCodec { + return f.codec +} + +func parsePoints(buf string) []tsdb.Point { points, err := tsdb.ParsePointsString(buf) if err != nil { panic(fmt.Sprintf("couldn't parse points: %s", err.Error())) } - for _, p := range points { - b, err := codec.EncodeFields(p.Fields()) - if err != nil { - panic(fmt.Sprintf("couldn't encode fields: %s", err.Error())) - } - p.SetData(b) - } return points } -func parsePoint(buf string, codec *tsdb.FieldCodec) tsdb.Point { - return parsePoints(buf, codec)[0] +func parsePoint(buf string) tsdb.Point { + return parsePoints(buf)[0] } func inttob(v int) []byte { @@ -194,3 +220,7 @@ func u64tob(v uint64) []byte { binary.BigEndian.PutUint64(b, v) return b } + +func btof64(b []byte) float64 { + return math.Float64frombits(binary.BigEndian.Uint64(b)) +} diff --git a/tsdb/shard.go b/tsdb/shard.go index a14822f367..2e04735665 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -229,28 +229,29 @@ func (s *Shard) WritePoints(points []models.Point) error { } // make sure all data is encoded before attempting to save to bolt - for _, p := range points { - // Ignore if raw data has already been marshaled. - if p.Data() != nil { - continue - } + // TODO: make this only commented out for pd1 engine + // for _, p := range points { + // // Ignore if raw data has already been marshaled. + // if p.Data() != nil { + // continue + // } - // This was populated earlier, don't need to validate that it's there. - s.mu.RLock() - mf := s.measurementFields[p.Name()] - s.mu.RUnlock() + // // This was populated earlier, don't need to validate that it's there. + // s.mu.RLock() + // mf := s.measurementFields[p.Name()] + // s.mu.RUnlock() - // If a measurement is dropped while writes for it are in progress, this could be nil - if mf == nil { - return ErrFieldNotFound - } + // // If a measurement is dropped while writes for it are in progress, this could be nil + // if mf == nil { + // return ErrFieldNotFound + // } - data, err := mf.Codec.EncodeFields(p.Fields()) - if err != nil { - return err - } - p.SetData(data) - } + // data, err := mf.Codec.EncodeFields(p.Fields()) + // if err != nil { + // return err + // } + // p.SetData(data) + // } // Write to the engine. if err := s.engine.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil { @@ -741,11 +742,14 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) { // DecodeByName scans a byte slice for a field with the given name, converts it to its // expected type, and return that value. func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) { - fi := f.FieldByName(name) - if fi == nil { - return 0, ErrFieldNotFound - } - return f.DecodeByID(fi.ID, b) + // TODO: this is a hack for PD1 testing, please to remove + return math.Float64frombits(binary.BigEndian.Uint64(b)), nil + + // fi := f.FieldByName(name) + // if fi == nil { + // return 0, ErrFieldNotFound + // } + // return f.DecodeByID(fi.ID, b) } func (f *FieldCodec) Fields() (a []*Field) {