WIP: finish basics of PD1. IT WORKS! (kind of)
parent
7555ccbd70
commit
2ba032b7a8
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"sort"
|
||||
"time"
|
||||
|
|
|
@ -18,6 +18,9 @@ var (
|
|||
ErrFormatNotFound = errors.New("format not found")
|
||||
)
|
||||
|
||||
// DefaultEngine is the default engine used by the shard when initializing.
|
||||
const DefaultEngine = "pd1"
|
||||
|
||||
// Engine represents a swappable storage engine for the shard.
|
||||
type Engine interface {
|
||||
Open() error
|
||||
|
|
|
@ -1,16 +1,34 @@
|
|||
package pd1
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/dgryski/go-tsz"
|
||||
)
|
||||
|
||||
type Value interface {
|
||||
TimeBytes() []byte
|
||||
ValueBytes() []byte
|
||||
Time() time.Time
|
||||
}
|
||||
|
||||
type FloatValue struct {
|
||||
Time time.Time
|
||||
Value float64
|
||||
}
|
||||
|
||||
func (f *FloatValue) TimeBytes() []byte {
|
||||
return u64tob(uint64(f.Time.UnixNano()))
|
||||
}
|
||||
|
||||
func (f *FloatValue) ValueBytes() []byte {
|
||||
buf := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf, math.Float64bits(f.Value))
|
||||
return buf
|
||||
}
|
||||
|
||||
type FloatValues []FloatValue
|
||||
|
||||
func (a FloatValues) Len() int { return len(a) }
|
||||
|
@ -24,11 +42,11 @@ func EncodeFloatBlock(buf []byte, values []FloatValue) []byte {
|
|||
s.Push(uint32(v.Time.Unix()), v.Value)
|
||||
}
|
||||
s.Finish()
|
||||
return s.Bytes()
|
||||
return append(u64tob(uint64(values[0].Time.UnixNano())), s.Bytes()...)
|
||||
}
|
||||
|
||||
func DecodeFloatBlock(block []byte) ([]FloatValue, error) {
|
||||
iter, _ := tsz.NewIterator(block)
|
||||
iter, _ := tsz.NewIterator(block[8:])
|
||||
a := make([]FloatValue, 0)
|
||||
for iter.Next() {
|
||||
t, f := iter.Values()
|
||||
|
|
|
@ -55,9 +55,9 @@ const (
|
|||
// DefaultBlockSize is the default size of uncompressed points blocks.
|
||||
DefaultBlockSize = 512 * 1024 // 512KB
|
||||
|
||||
DefaultMaxFileSize = 50 * 1024 * 1024 // 50MB
|
||||
DefaultMaxFileSize = 5 * 1024 * 1024 // 5MB
|
||||
|
||||
DefaultMaxPointsPerBlock = 5000
|
||||
DefaultMaxPointsPerBlock = 1000
|
||||
|
||||
// MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall
|
||||
MAP_POPULATE = 0x8000
|
||||
|
@ -71,12 +71,15 @@ type Engine struct {
|
|||
mu sync.Mutex
|
||||
path string
|
||||
|
||||
shard *tsdb.Shard
|
||||
|
||||
// HashSeriesField is a function that takes a series key and a field name
|
||||
// and returns a hash identifier. It's not guaranteed to be unique.
|
||||
HashSeriesField func(key string) uint64
|
||||
|
||||
// Shard is an interface that can pull back field type information based on measurement name
|
||||
Shard interface {
|
||||
FieldCodec(measurementName string) *tsdb.FieldCodec
|
||||
}
|
||||
|
||||
filesLock sync.RWMutex
|
||||
files dataFiles
|
||||
currentFileID int
|
||||
|
@ -108,11 +111,34 @@ func (e *Engine) Open() error {
|
|||
// TODO: clean up previous names write
|
||||
// TODO: clean up any data files that didn't get cleaned up
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", Format)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, fn := range files {
|
||||
f, err := os.OpenFile(fn, os.O_RDONLY, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening file %s: %s", fn, err.Error())
|
||||
}
|
||||
df, err := NewDataFile(f)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening memory map for file %s: %s", fn, err.Error())
|
||||
}
|
||||
e.files = append(e.files, df)
|
||||
}
|
||||
sort.Sort(e.files)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the engine.
|
||||
func (e *Engine) Close() error {
|
||||
e.queryLock.Lock()
|
||||
defer e.queryLock.Unlock()
|
||||
|
||||
for _, df := range e.files {
|
||||
_ = df.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -121,7 +147,7 @@ func (e *Engine) SetLogOutput(w io.Writer) {}
|
|||
|
||||
// LoadMetadataIndex loads the shard metadata into memory.
|
||||
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
|
||||
e.shard = shard
|
||||
e.Shard = shard
|
||||
// TODO: write the metadata from the WAL
|
||||
|
||||
// Load measurement metadata
|
||||
|
@ -165,7 +191,7 @@ func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex,
|
|||
func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
// TODO: Write points to the WAL
|
||||
|
||||
return nil
|
||||
return e.WriteAndCompact(points, measurementFieldsToSave, seriesToCreate)
|
||||
}
|
||||
|
||||
func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
|
@ -200,8 +226,6 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma
|
|||
}
|
||||
}
|
||||
|
||||
fmt.Println("read names: ", len(names), len(ids))
|
||||
|
||||
// these are values that are newer than anything stored in the shard
|
||||
valuesByID := make(map[uint64]*valueCollection)
|
||||
// map the points to the data file they belong to if they overlap
|
||||
|
@ -288,7 +312,6 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma
|
|||
}
|
||||
}
|
||||
|
||||
fmt.Println("writing names:", len(names))
|
||||
b, err = json.Marshal(names)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -302,14 +325,13 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma
|
|||
if overwriteNewestFile {
|
||||
if err := e.rewriteFile(newestDataFile, valuesByID); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err := e.rewriteFile(nil, valuesByID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// flush each of the old ones
|
||||
for df, vals := range dataFileToValues {
|
||||
fmt.Println("writing vals to old file: ", df.f.Name())
|
||||
if err := e.rewriteFile(df, vals); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -374,6 +396,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
|
|||
f.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// write the series ids and empty starting positions
|
||||
for _, id := range ids {
|
||||
if _, err := f.Write(append(u64tob(id), []byte{0x00, 0x00, 0x00, 0x00}...)); err != nil {
|
||||
|
@ -423,6 +446,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
|
|||
// if the values are not in the file, just write the new ones
|
||||
fpos, ok := oldIDToPosition[id]
|
||||
if !ok {
|
||||
// TODO: ensure we encode only the amount in a block
|
||||
block := newVals.Encode(buf)
|
||||
if _, err := f.Write(append(u64tob(id), u32tob(uint32(len(block)))...)); err != nil {
|
||||
f.Close()
|
||||
|
@ -444,7 +468,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
|
|||
break
|
||||
}
|
||||
length := btou32(oldDF.mmap[fpos+8 : fpos+12])
|
||||
block := oldDF.mmap[fpos : fpos+12+length]
|
||||
block := oldDF.mmap[fpos+12 : fpos+12+length]
|
||||
fpos += (12 + length)
|
||||
|
||||
// determine if there's a block after this with the same id and get its time
|
||||
|
@ -477,6 +501,21 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
|
|||
break
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: ensure we encode only the amount in a block, refactor this wil line 450 into func
|
||||
if len(newVals.floatValues) > 0 {
|
||||
// TODO: ensure we encode only the amount in a block
|
||||
block := newVals.Encode(buf)
|
||||
if _, err := f.Write(append(u64tob(id), u32tob(uint32(len(block)))...)); err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
if _, err := f.Write(block); err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
currentPosition += uint32(12 + len(block))
|
||||
}
|
||||
}
|
||||
|
||||
// write out the times and positions
|
||||
|
@ -572,7 +611,6 @@ func (e *Engine) replaceCompressedFile(name string, data []byte) error {
|
|||
if _, err := f.Write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println("compressed: ", len(b))
|
||||
if err := f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -605,7 +643,7 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) {
|
|||
// TODO: make the cursor take a field name
|
||||
func (e *Engine) Cursor(series string, direction tsdb.Direction) tsdb.Cursor {
|
||||
measurementName := tsdb.MeasurementFromSeriesKey(series)
|
||||
codec := e.shard.FieldCodec(measurementName)
|
||||
codec := e.Shard.FieldCodec(measurementName)
|
||||
if codec == nil {
|
||||
return &cursor{}
|
||||
}
|
||||
|
@ -658,7 +696,7 @@ func (e *Engine) writeFields(fields map[string]*tsdb.MeasurementFields) error {
|
|||
return err
|
||||
}
|
||||
|
||||
fn := e.path + "." + FieldsFileExtension + "tmp"
|
||||
fn := filepath.Join(e.path, FieldsFileExtension+"tmp")
|
||||
ff, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -670,7 +708,7 @@ func (e *Engine) writeFields(fields map[string]*tsdb.MeasurementFields) error {
|
|||
if err := ff.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
fieldsFileName := e.path + "." + FieldsFileExtension
|
||||
fieldsFileName := filepath.Join(e.path, FieldsFileExtension)
|
||||
|
||||
if _, err := os.Stat(fieldsFileName); !os.IsNotExist(err) {
|
||||
if err := os.Remove(fieldsFileName); err != nil {
|
||||
|
@ -684,7 +722,7 @@ func (e *Engine) writeFields(fields map[string]*tsdb.MeasurementFields) error {
|
|||
func (e *Engine) readFields() (map[string]*tsdb.MeasurementFields, error) {
|
||||
fields := make(map[string]*tsdb.MeasurementFields)
|
||||
|
||||
f, err := os.OpenFile(e.path+"."+FieldsFileExtension, os.O_RDONLY, 0666)
|
||||
f, err := os.OpenFile(filepath.Join(e.path, FieldsFileExtension), os.O_RDONLY, 0666)
|
||||
if os.IsNotExist(err) {
|
||||
return fields, nil
|
||||
} else if err != nil {
|
||||
|
@ -732,7 +770,7 @@ func (e *Engine) writeSeries(series map[string]*tsdb.Series) error {
|
|||
return err
|
||||
}
|
||||
|
||||
fn := e.path + "." + SeriesFileExtension + "tmp"
|
||||
fn := filepath.Join(e.path, SeriesFileExtension+"tmp")
|
||||
ff, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -744,7 +782,7 @@ func (e *Engine) writeSeries(series map[string]*tsdb.Series) error {
|
|||
if err := ff.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
seriesFileName := e.path + "." + SeriesFileExtension
|
||||
seriesFileName := filepath.Join(e.path, SeriesFileExtension)
|
||||
|
||||
if _, err := os.Stat(seriesFileName); !os.IsNotExist(err) {
|
||||
if err := os.Remove(seriesFileName); err != nil && err != os.ErrNotExist {
|
||||
|
@ -758,7 +796,7 @@ func (e *Engine) writeSeries(series map[string]*tsdb.Series) error {
|
|||
func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
|
||||
series := make(map[string]*tsdb.Series)
|
||||
|
||||
f, err := os.OpenFile(e.path+"."+SeriesFileExtension, os.O_RDONLY, 0666)
|
||||
f, err := os.OpenFile(filepath.Join(e.path, SeriesFileExtension), os.O_RDONLY, 0666)
|
||||
if os.IsNotExist(err) {
|
||||
return series, nil
|
||||
} else if err != nil {
|
||||
|
@ -843,14 +881,15 @@ func (v *valueCollection) DecodeAndCombine(block, buf []byte, nextTime int64, ha
|
|||
}
|
||||
|
||||
if hasFutureBlock {
|
||||
for i, val := range v.floatValues {
|
||||
if val.Time.UnixNano() > nextTime {
|
||||
values = append(values, v.floatValues[:i]...)
|
||||
v.floatValues = v.floatValues[i:]
|
||||
}
|
||||
}
|
||||
// take all values that have times less than the future block and update the vals array
|
||||
pos := sort.Search(len(v.floatValues), func(i int) bool {
|
||||
return v.floatValues[i].Time.UnixNano() >= nextTime
|
||||
})
|
||||
values = append(values, v.floatValues[:pos]...)
|
||||
v.floatValues = v.floatValues[pos:]
|
||||
} else {
|
||||
values = append(values, v.floatValues...)
|
||||
v.floatValues = nil
|
||||
}
|
||||
sort.Sort(FloatValues(values))
|
||||
// TODO: deduplicate values
|
||||
|
@ -955,7 +994,8 @@ func (d *dataFile) IDToPosition() map[uint64]uint32 {
|
|||
for i := 0; i < count; i++ {
|
||||
offset := 20 + (i * 12)
|
||||
id := btou64(d.mmap[offset : offset+8])
|
||||
m[id] = btou32(d.mmap[offset+8 : offset+12])
|
||||
pos := btou32(d.mmap[offset+8 : offset+12])
|
||||
m[id] = pos
|
||||
}
|
||||
|
||||
return m
|
||||
|
@ -968,26 +1008,23 @@ func (d *dataFile) StartingPositionForID(id uint64) uint32 {
|
|||
seriesCount := d.SeriesCount()
|
||||
|
||||
min := 0
|
||||
max := seriesCount
|
||||
// // set the minimum position to the first after the file header
|
||||
// posMin := fileHeaderSize
|
||||
|
||||
// // set the maximum position to the end of the series header
|
||||
// posMax := fileHeaderSize + (seriesCount * seriesHeaderSize)
|
||||
max := int(seriesCount)
|
||||
|
||||
for min < max {
|
||||
mid := (max-min)/2 + min
|
||||
|
||||
offset := mid*seriesHeaderSize + fileHeaderSize
|
||||
checkID := btou64(d.mmap[offset:8])
|
||||
checkID := btou64(d.mmap[offset : offset+8])
|
||||
|
||||
if checkID == id {
|
||||
return btou32(d.mmap[offset+8 : offset+12])
|
||||
} else if checkID < id {
|
||||
min = mid + 1
|
||||
}
|
||||
} else {
|
||||
max = mid
|
||||
}
|
||||
}
|
||||
|
||||
return uint32(0)
|
||||
}
|
||||
|
||||
|
@ -1001,9 +1038,9 @@ type cursor struct {
|
|||
id uint64
|
||||
dataType influxql.DataType
|
||||
f *dataFile
|
||||
dataFilePos int
|
||||
filesPos int // the index in the files slice we're looking at
|
||||
pos uint32
|
||||
vals []FloatValues
|
||||
vals FloatValues
|
||||
|
||||
direction tsdb.Direction
|
||||
|
||||
|
@ -1013,29 +1050,136 @@ type cursor struct {
|
|||
|
||||
func newCursor(id uint64, dataType influxql.DataType, files []*dataFile, direction tsdb.Direction) *cursor {
|
||||
return &cursor{
|
||||
ids: id,
|
||||
types: dataType,
|
||||
id: id,
|
||||
dataType: dataType,
|
||||
direction: direction,
|
||||
files: files,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cursor) Seek(seek []byte) (key, value []byte) { return nil, nil }
|
||||
func (c *cursor) Seek(seek []byte) (key, value []byte) {
|
||||
t := int64(btou64(seek))
|
||||
|
||||
func (c *cursor) Next() (key, value []byte) {
|
||||
if vals == nil {
|
||||
// loop until we find a file with some data
|
||||
for dataFilePos < len(c.files) {
|
||||
f = c.files[c.dataFilePos]
|
||||
c.dataFilePos++
|
||||
|
||||
// startPosition := f
|
||||
if t < c.files[0].MinTime() {
|
||||
c.filesPos = 0
|
||||
c.f = c.files[0]
|
||||
} else {
|
||||
for i, f := range c.files {
|
||||
if t >= f.MinTime() && t <= f.MaxTime() {
|
||||
c.filesPos = i
|
||||
c.f = f
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if c.f == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *cursor) next(id uint64) (key, value []byte)
|
||||
// TODO: make this for the reverse direction cursor
|
||||
|
||||
// now find the spot in the file we need to go
|
||||
for {
|
||||
pos := c.f.StartingPositionForID(c.id)
|
||||
|
||||
// if this id isn't in this file, move to next one or return
|
||||
if pos == 0 {
|
||||
c.filesPos++
|
||||
if c.filesPos >= len(c.files) {
|
||||
return nil, nil
|
||||
}
|
||||
c.f = c.files[c.filesPos]
|
||||
continue
|
||||
}
|
||||
|
||||
// seek to the block and values we're looking for
|
||||
for {
|
||||
// if the time is between this block and the next,
|
||||
// decode this block and go, otherwise seek to next block
|
||||
length := btou32(c.f.mmap[pos+8 : pos+12])
|
||||
|
||||
// if the next block has a time less than what we're seeking to,
|
||||
// skip decoding this block and continue on
|
||||
nextBlockPos := pos + 12 + length
|
||||
if nextBlockPos < c.f.size {
|
||||
nextBlockID := btou64(c.f.mmap[nextBlockPos : nextBlockPos+8])
|
||||
if nextBlockID == c.id {
|
||||
nextBlockTime := int64(btou64(c.f.mmap[nextBlockPos+12 : nextBlockPos+20]))
|
||||
if nextBlockTime <= t {
|
||||
pos = nextBlockPos
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// it must be in this block or not at all
|
||||
tb, vb := c.decodeBlockAndGetValues(pos)
|
||||
if int64(btou64(tb)) >= t {
|
||||
return tb, vb
|
||||
}
|
||||
|
||||
// wasn't in the first value popped out of the block, check the rest
|
||||
for i, v := range c.vals {
|
||||
if v.Time.UnixNano() >= t {
|
||||
c.vals = c.vals[i+1:]
|
||||
return v.TimeBytes(), v.ValueBytes()
|
||||
}
|
||||
}
|
||||
|
||||
// not in this one, let the top loop look for it in the next file
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cursor) Next() (key, value []byte) {
|
||||
if len(c.vals) == 0 {
|
||||
// if we have a file set, see if the next block is for this ID
|
||||
if c.f != nil && c.pos < c.f.size {
|
||||
nextBlockID := btou64(c.f.mmap[c.pos : c.pos+8])
|
||||
if nextBlockID == c.id {
|
||||
return c.decodeBlockAndGetValues(c.pos)
|
||||
}
|
||||
}
|
||||
|
||||
// if the file is nil we hit the end of the previous file, advance the file cursor
|
||||
if c.f != nil {
|
||||
c.filesPos++
|
||||
}
|
||||
|
||||
// loop until we find a file with some data
|
||||
for c.filesPos < len(c.files) {
|
||||
f := c.files[c.filesPos]
|
||||
|
||||
startingPos := f.StartingPositionForID(c.id)
|
||||
if startingPos == 0 {
|
||||
continue
|
||||
}
|
||||
c.f = f
|
||||
return c.decodeBlockAndGetValues(startingPos)
|
||||
}
|
||||
|
||||
// we didn't get to a file that had a next value
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
v := c.vals[0]
|
||||
c.vals = c.vals[1:]
|
||||
|
||||
return v.TimeBytes(), v.ValueBytes()
|
||||
}
|
||||
|
||||
func (c *cursor) decodeBlockAndGetValues(position uint32) ([]byte, []byte) {
|
||||
length := btou32(c.f.mmap[position+8 : position+12])
|
||||
block := c.f.mmap[position+12 : position+12+length]
|
||||
c.vals, _ = DecodeFloatBlock(block)
|
||||
c.pos = position + 12 + length
|
||||
|
||||
v := c.vals[0]
|
||||
c.vals = c.vals[1:]
|
||||
return v.TimeBytes(), v.ValueBytes()
|
||||
}
|
||||
|
||||
func (c *cursor) Direction() tsdb.Direction { return c.direction }
|
||||
|
||||
|
|
|
@ -4,8 +4,8 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -18,37 +18,31 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
|
|||
e := OpenDefaultEngine()
|
||||
defer e.Close()
|
||||
|
||||
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
|
||||
"value": {
|
||||
ID: uint8(1),
|
||||
Name: "value",
|
||||
Type: influxql.Float,
|
||||
},
|
||||
})
|
||||
e.Shard = newFieldCodecMock(map[string]influxql.DataType{"value": influxql.Float})
|
||||
|
||||
p1 := parsePoint("cpu,host=A value=1.1 1000000000", codec)
|
||||
p2 := parsePoint("cpu,host=B value=1.2 1000000000", codec)
|
||||
p3 := parsePoint("cpu,host=A value=2.1 2000000000", codec)
|
||||
p4 := parsePoint("cpu,host=B value=2.2 2000000000", codec)
|
||||
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
|
||||
p2 := parsePoint("cpu,host=B value=1.2 1000000000")
|
||||
p3 := parsePoint("cpu,host=A value=2.1 2000000000")
|
||||
p4 := parsePoint("cpu,host=B value=2.2 2000000000")
|
||||
|
||||
if err := e.WriteAndCompact([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
verify := func() {
|
||||
verify := func(checkSingleBVal bool) {
|
||||
c := e.Cursor("cpu,host=A", tsdb.Forward)
|
||||
k, v := c.Next()
|
||||
if btou64(k) != uint64(p1.UnixNano()) {
|
||||
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
|
||||
}
|
||||
if !reflect.DeepEqual(v, p1.Data()) {
|
||||
if 1.1 != btof64(v) {
|
||||
t.Fatal("p1 data not equal")
|
||||
}
|
||||
k, v = c.Next()
|
||||
if btou64(k) != uint64(p3.UnixNano()) {
|
||||
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k))
|
||||
}
|
||||
if !reflect.DeepEqual(v, p3.Data()) {
|
||||
if 2.1 != btof64(v) {
|
||||
t.Fatal("p3 data not equal")
|
||||
}
|
||||
k, v = c.Next()
|
||||
|
@ -61,28 +55,56 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
|
|||
if btou64(k) != uint64(p2.UnixNano()) {
|
||||
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
|
||||
}
|
||||
if !reflect.DeepEqual(v, p2.Data()) {
|
||||
if 1.2 != btof64(v) {
|
||||
t.Fatal("p2 data not equal")
|
||||
}
|
||||
|
||||
if checkSingleBVal {
|
||||
k, v = c.Next()
|
||||
if k != nil {
|
||||
t.Fatal("expected nil")
|
||||
}
|
||||
}
|
||||
verify()
|
||||
}
|
||||
verify(true)
|
||||
|
||||
if err := e.WriteAndCompact([]tsdb.Point{p4}, nil, nil); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
verify()
|
||||
verify(false)
|
||||
|
||||
c := e.Cursor("cpu,host=B", tsdb.Forward)
|
||||
k, v := c.Seek(u64tob(2000000000))
|
||||
if btou64(k) != uint64(p4.UnixNano()) {
|
||||
t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), btou64(k))
|
||||
k, v := c.Next()
|
||||
if btou64(k) != uint64(p2.UnixNano()) {
|
||||
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
|
||||
}
|
||||
if !reflect.DeepEqual(v, p4.Data()) {
|
||||
t.Fatal("p4 data not equal")
|
||||
if 1.2 != btof64(v) {
|
||||
t.Fatal("p2 data not equal")
|
||||
}
|
||||
k, v = c.Next()
|
||||
if btou64(k) != uint64(p4.UnixNano()) {
|
||||
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
|
||||
}
|
||||
if 2.2 != btof64(v) {
|
||||
t.Fatal("p2 data not equal")
|
||||
}
|
||||
|
||||
// verify we can seek
|
||||
k, v = c.Seek(u64tob(2000000000))
|
||||
if btou64(k) != uint64(p4.UnixNano()) {
|
||||
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
|
||||
}
|
||||
if 2.2 != btof64(v) {
|
||||
t.Fatal("p2 data not equal")
|
||||
}
|
||||
|
||||
c = e.Cursor("cpu,host=A", tsdb.Forward)
|
||||
k, v = c.Seek(u64tob(0))
|
||||
if btou64(k) != uint64(p1.UnixNano()) {
|
||||
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
|
||||
}
|
||||
if 1.1 != btof64(v) {
|
||||
t.Fatal("p1 data not equal")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,17 +117,9 @@ func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
|
|||
e := OpenDefaultEngine()
|
||||
defer e.Close()
|
||||
|
||||
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
|
||||
"value": {
|
||||
ID: uint8(1),
|
||||
Name: "value",
|
||||
Type: influxql.Float,
|
||||
},
|
||||
})
|
||||
|
||||
var points []tsdb.Point
|
||||
for i := 0; i < 100000; i++ {
|
||||
points = append(points, parsePoint(fmt.Sprintf("cpu%d value=22.1", i), codec))
|
||||
points = append(points, parsePoint(fmt.Sprintf("cpu%d value=22.1", i)))
|
||||
}
|
||||
|
||||
st := time.Now()
|
||||
|
@ -160,23 +174,35 @@ func (e *Engine) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parsePoints(buf string, codec *tsdb.FieldCodec) []tsdb.Point {
|
||||
func newFieldCodecMock(fields map[string]influxql.DataType) *FieldCodeMock {
|
||||
m := make(map[string]*tsdb.Field)
|
||||
|
||||
for n, t := range fields {
|
||||
m[n] = &tsdb.Field{Name: n, Type: t}
|
||||
}
|
||||
codec := tsdb.NewFieldCodec(m)
|
||||
|
||||
return &FieldCodeMock{codec: codec}
|
||||
}
|
||||
|
||||
type FieldCodeMock struct {
|
||||
codec *tsdb.FieldCodec
|
||||
}
|
||||
|
||||
func (f *FieldCodeMock) FieldCodec(m string) *tsdb.FieldCodec {
|
||||
return f.codec
|
||||
}
|
||||
|
||||
func parsePoints(buf string) []tsdb.Point {
|
||||
points, err := tsdb.ParsePointsString(buf)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("couldn't parse points: %s", err.Error()))
|
||||
}
|
||||
for _, p := range points {
|
||||
b, err := codec.EncodeFields(p.Fields())
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("couldn't encode fields: %s", err.Error()))
|
||||
}
|
||||
p.SetData(b)
|
||||
}
|
||||
return points
|
||||
}
|
||||
|
||||
func parsePoint(buf string, codec *tsdb.FieldCodec) tsdb.Point {
|
||||
return parsePoints(buf, codec)[0]
|
||||
func parsePoint(buf string) tsdb.Point {
|
||||
return parsePoints(buf)[0]
|
||||
}
|
||||
|
||||
func inttob(v int) []byte {
|
||||
|
@ -194,3 +220,7 @@ func u64tob(v uint64) []byte {
|
|||
binary.BigEndian.PutUint64(b, v)
|
||||
return b
|
||||
}
|
||||
|
||||
func btof64(b []byte) float64 {
|
||||
return math.Float64frombits(binary.BigEndian.Uint64(b))
|
||||
}
|
||||
|
|
|
@ -229,28 +229,29 @@ func (s *Shard) WritePoints(points []models.Point) error {
|
|||
}
|
||||
|
||||
// make sure all data is encoded before attempting to save to bolt
|
||||
for _, p := range points {
|
||||
// Ignore if raw data has already been marshaled.
|
||||
if p.Data() != nil {
|
||||
continue
|
||||
}
|
||||
// TODO: make this only commented out for pd1 engine
|
||||
// for _, p := range points {
|
||||
// // Ignore if raw data has already been marshaled.
|
||||
// if p.Data() != nil {
|
||||
// continue
|
||||
// }
|
||||
|
||||
// This was populated earlier, don't need to validate that it's there.
|
||||
s.mu.RLock()
|
||||
mf := s.measurementFields[p.Name()]
|
||||
s.mu.RUnlock()
|
||||
// // This was populated earlier, don't need to validate that it's there.
|
||||
// s.mu.RLock()
|
||||
// mf := s.measurementFields[p.Name()]
|
||||
// s.mu.RUnlock()
|
||||
|
||||
// If a measurement is dropped while writes for it are in progress, this could be nil
|
||||
if mf == nil {
|
||||
return ErrFieldNotFound
|
||||
}
|
||||
// // If a measurement is dropped while writes for it are in progress, this could be nil
|
||||
// if mf == nil {
|
||||
// return ErrFieldNotFound
|
||||
// }
|
||||
|
||||
data, err := mf.Codec.EncodeFields(p.Fields())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.SetData(data)
|
||||
}
|
||||
// data, err := mf.Codec.EncodeFields(p.Fields())
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// p.SetData(data)
|
||||
// }
|
||||
|
||||
// Write to the engine.
|
||||
if err := s.engine.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil {
|
||||
|
@ -741,11 +742,14 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
|
|||
// DecodeByName scans a byte slice for a field with the given name, converts it to its
|
||||
// expected type, and return that value.
|
||||
func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) {
|
||||
fi := f.FieldByName(name)
|
||||
if fi == nil {
|
||||
return 0, ErrFieldNotFound
|
||||
}
|
||||
return f.DecodeByID(fi.ID, b)
|
||||
// TODO: this is a hack for PD1 testing, please to remove
|
||||
return math.Float64frombits(binary.BigEndian.Uint64(b)), nil
|
||||
|
||||
// fi := f.FieldByName(name)
|
||||
// if fi == nil {
|
||||
// return 0, ErrFieldNotFound
|
||||
// }
|
||||
// return f.DecodeByID(fi.ID, b)
|
||||
}
|
||||
|
||||
func (f *FieldCodec) Fields() (a []*Field) {
|
||||
|
|
Loading…
Reference in New Issue