Update to work with new cursor definition and Point in models

pull/4317/merge
Paul Dix 2015-09-25 10:49:26 -04:00
parent ea85f8042d
commit ed7055146a
7 changed files with 125 additions and 120 deletions

View File

@ -8,7 +8,7 @@ import (
const (
// DefaultEngine is the default engine for new shards
DefaultEngine = "bz1"
DefaultEngine = "pd1"
// DefaultMaxWALSize is the default size of the WAL before it is flushed.
DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB

View File

@ -18,9 +18,6 @@ var (
ErrFormatNotFound = errors.New("format not found")
)
// DefaultEngine is the default engine used by the shard when initializing.
const DefaultEngine = "pd1"
// Engine represents a swappable storage engine for the shard.
type Engine interface {
Open() error

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/dgryski/go-tsz"
"github.com/influxdb/influxdb/tsdb"
)
type Value interface {
@ -35,7 +36,7 @@ type EmptyValue struct {
func (e *EmptyValue) TimeBytes() []byte { return nil }
func (e *EmptyValue) ValueBytes() []byte { return nil }
func (e *EmptyValue) Time() time.Time { return time.Unix(0, 0) }
func (e *EmptyValue) Time() time.Time { return time.Unix(0, tsdb.EOF) }
func (e *EmptyValue) Value() interface{} { return nil }
func (e *EmptyValue) Size() int { return 0 }

View File

@ -17,6 +17,7 @@ import (
"github.com/golang/snappy"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
)
@ -217,7 +218,7 @@ func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex,
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return e.WAL.WritePoints(points, measurementFieldsToSave, seriesToCreate)
}
@ -614,7 +615,7 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) {
}
// TODO: make the cursor take a field name
func (e *Engine) Cursor(series string, direction tsdb.Direction) tsdb.Cursor {
func (e *Engine) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
measurementName := tsdb.MeasurementFromSeriesKey(series)
codec := e.Shard.FieldCodec(measurementName)
if codec == nil {
@ -627,7 +628,7 @@ func (e *Engine) Cursor(series string, direction tsdb.Direction) tsdb.Cursor {
// TODO: ensure we map the collisions
id := hashSeriesField(seriesFieldKey(series, field.Name))
return newCursor(id, field.Type, e.copyFilesCollection(), direction)
return newCursor(id, field.Type, e.copyFilesCollection(), ascending)
}
func (e *Engine) copyFilesCollection() []*dataFile {
@ -961,30 +962,28 @@ type cursor struct {
pos uint32
vals Values
direction tsdb.Direction
ascending bool
// time ascending list of data files
files []*dataFile
}
func newCursor(id uint64, dataType influxql.DataType, files []*dataFile, direction tsdb.Direction) *cursor {
func newCursor(id uint64, dataType influxql.DataType, files []*dataFile, ascending bool) *cursor {
return &cursor{
id: id,
dataType: dataType,
direction: direction,
ascending: ascending,
files: files,
}
}
func (c *cursor) Seek(seek []byte) (key, value []byte) {
t := int64(btou64(seek))
if t < c.files[0].MinTime() {
func (c *cursor) SeekTo(seek int64) (int64, interface{}) {
if seek < c.files[0].MinTime() {
c.filesPos = 0
c.f = c.files[0]
} else {
for i, f := range c.files {
if t >= f.MinTime() && t <= f.MaxTime() {
if seek >= f.MinTime() && seek <= f.MaxTime() {
c.filesPos = i
c.f = f
break
@ -993,7 +992,7 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
}
if c.f == nil {
return nil, nil
return tsdb.EOF, nil
}
// TODO: make this for the reverse direction cursor
@ -1006,7 +1005,7 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
if pos == 0 {
c.filesPos++
if c.filesPos >= len(c.files) {
return nil, nil
return tsdb.EOF, nil
}
c.f = c.files[c.filesPos]
continue
@ -1025,7 +1024,7 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
nextBlockID := btou64(c.f.mmap[nextBlockPos : nextBlockPos+8])
if nextBlockID == c.id {
nextBlockTime := int64(btou64(c.f.mmap[nextBlockPos+12 : nextBlockPos+20]))
if nextBlockTime <= t {
if nextBlockTime <= seek {
pos = nextBlockPos
continue
}
@ -1033,16 +1032,16 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
}
// it must be in this block or not at all
tb, vb := c.decodeBlockAndGetValues(pos)
if int64(btou64(tb)) >= t {
return tb, vb
t, v := c.decodeBlockAndGetValues(pos)
if t >= seek {
return t, v
}
// wasn't in the first value popped out of the block, check the rest
for i, v := range c.vals {
if v.Time().UnixNano() >= t {
if v.Time().UnixNano() >= seek {
c.vals = c.vals[i+1:]
return v.TimeBytes(), v.ValueBytes()
return v.Time().UnixNano(), v.Value()
}
}
@ -1052,7 +1051,7 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
}
}
func (c *cursor) Next() (key, value []byte) {
func (c *cursor) Next() (int64, interface{}) {
if len(c.vals) == 0 {
// if we have a file set, see if the next block is for this ID
if c.f != nil && c.pos < c.f.size {
@ -1081,16 +1080,16 @@ func (c *cursor) Next() (key, value []byte) {
}
// we didn't get to a file that had a next value
return nil, nil
return tsdb.EOF, nil
}
v := c.vals[0]
c.vals = c.vals[1:]
return v.TimeBytes(), v.ValueBytes()
return v.Time().UnixNano(), v.Value()
}
func (c *cursor) decodeBlockAndGetValues(position uint32) ([]byte, []byte) {
func (c *cursor) decodeBlockAndGetValues(position uint32) (int64, interface{}) {
length := btou32(c.f.mmap[position+8 : position+12])
block := c.f.mmap[position+12 : position+12+length]
c.vals, _ = DecodeFloatBlock(block)
@ -1098,10 +1097,10 @@ func (c *cursor) decodeBlockAndGetValues(position uint32) ([]byte, []byte) {
v := c.vals[0]
c.vals = c.vals[1:]
return v.TimeBytes(), v.ValueBytes()
return v.Time().UnixNano(), v.Value()
}
func (c *cursor) Direction() tsdb.Direction { return c.direction }
func (c *cursor) Ascending() bool { return c.ascending }
// u64tob converts a uint64 into an 8-byte slice.
func u64tob(v uint64) []byte {

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/pd1"
)
@ -25,86 +26,88 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
p3 := parsePoint("cpu,host=A value=2.1 2000000000")
p4 := parsePoint("cpu,host=B value=2.2 2000000000")
if err := e.WritePoints([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil {
if err := e.WritePoints([]models.Point{p1, p2, p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
fields := []string{"value"}
var codec *tsdb.FieldCodec
verify := func(checkSingleBVal bool) {
c := e.Cursor("cpu,host=A", tsdb.Forward)
c := e.Cursor("cpu,host=A", fields, codec, true)
k, v := c.Next()
if btou64(k) != uint64(p1.UnixNano()) {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
}
if 1.1 != btof64(v) {
if 1.1 != v {
t.Fatal("p1 data not equal")
}
k, v = c.Next()
if btou64(k) != uint64(p3.UnixNano()) {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k))
if k != p3.UnixNano() {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
}
if 2.1 != btof64(v) {
if 2.1 != v {
t.Fatal("p3 data not equal")
}
k, v = c.Next()
if k != nil {
fmt.Println(btou64(k), btof64(v))
t.Fatal("expected nil")
if k != tsdb.EOF {
t.Fatal("expected EOF")
}
c = e.Cursor("cpu,host=B", tsdb.Forward)
c = e.Cursor("cpu,host=B", fields, codec, true)
k, v = c.Next()
if btou64(k) != uint64(p2.UnixNano()) {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
if k != p2.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
}
if 1.2 != btof64(v) {
if 1.2 != v {
t.Fatal("p2 data not equal")
}
if checkSingleBVal {
k, v = c.Next()
if k != nil {
t.Fatal("expected nil")
if k != tsdb.EOF {
t.Fatal("expected EOF")
}
}
}
verify(true)
if err := e.WritePoints([]tsdb.Point{p4}, nil, nil); err != nil {
if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
verify(false)
c := e.Cursor("cpu,host=B", tsdb.Forward)
c := e.Cursor("cpu,host=B", fields, codec, true)
k, v := c.Next()
if btou64(k) != uint64(p2.UnixNano()) {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
if k != p2.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
}
if 1.2 != btof64(v) {
if 1.2 != v {
t.Fatal("p2 data not equal")
}
k, v = c.Next()
if btou64(k) != uint64(p4.UnixNano()) {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
if k != p4.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
}
if 2.2 != btof64(v) {
if 2.2 != v {
t.Fatal("p2 data not equal")
}
// verify we can seek
k, v = c.Seek(u64tob(2000000000))
if btou64(k) != uint64(p4.UnixNano()) {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
k, v = c.SeekTo(2000000000)
if k != p4.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
}
if 2.2 != btof64(v) {
if 2.2 != v {
t.Fatal("p2 data not equal")
}
c = e.Cursor("cpu,host=A", tsdb.Forward)
k, v = c.Seek(u64tob(0))
if btou64(k) != uint64(p1.UnixNano()) {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
c = e.Cursor("cpu,host=A", fields, codec, true)
k, v = c.SeekTo(0)
if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
}
if 1.1 != btof64(v) {
if 1.1 != v {
t.Fatal("p1 data not equal")
}
@ -128,7 +131,7 @@ func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
e := OpenDefaultEngine()
defer e.Cleanup()
var points []tsdb.Point
var points []models.Point
for i := 0; i < 100000; i++ {
points = append(points, parsePoint(fmt.Sprintf("cpu%d value=22.1", i)))
}
@ -205,15 +208,15 @@ func (f *FieldCodeMock) FieldCodec(m string) *tsdb.FieldCodec {
return f.codec
}
func parsePoints(buf string) []tsdb.Point {
points, err := tsdb.ParsePointsString(buf)
func parsePoints(buf string) []models.Point {
points, err := models.ParsePointsString(buf)
if err != nil {
panic(fmt.Sprintf("couldn't parse points: %s", err.Error()))
}
return points
}
func parsePoint(buf string) tsdb.Point {
func parsePoint(buf string) models.Point {
return parsePoints(buf)[0]
}

View File

@ -1,7 +1,6 @@
package pd1
import (
"bytes"
"encoding/json"
"fmt"
"io"
@ -14,8 +13,10 @@ import (
"sync"
"time"
"github.com/golang/snappy"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/golang/snappy"
)
const (
@ -171,12 +172,12 @@ func (l *Log) Open() error {
}
// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given
func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
// TODO: make this work for other fields
ck := seriesFieldKey(key, "value")
ck := seriesFieldKey(series, "value")
values := l.cache[ck]
// if we're in the middle of a flush, combine the previous cache
@ -187,7 +188,7 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
copy(c, fc)
c = append(c, values...)
return newWALCursor(c, direction)
return newWALCursor(c, ascending)
}
}
@ -199,10 +200,10 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
// build a copy so writes afterwards don't change the result set
a := make([]Value, len(values))
copy(a, values)
return newWALCursor(a, direction)
return newWALCursor(a, ascending)
}
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
// add everything to the cache, or return an error if we've hit our max memory
if addedToCache := l.addToCache(points, fields, series, true); !addedToCache {
return fmt.Errorf("WAL backed up flushing to index, hit max memory")
@ -260,7 +261,7 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme
// immediately after return and will be flushed at the next flush cycle. Before adding to the cache we check if we're over the
// max memory threshold. If we are we request a flush in a new goroutine and return false, indicating we didn't add the values
// to the cache and that writes should return a failure.
func (l *Log) addToCache(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate, checkMemory bool) bool {
func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate, checkMemory bool) bool {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
@ -371,7 +372,7 @@ func (l *Log) readFileToCache(fileName string) error {
// and marshal it and send it to the cache
switch walEntryType(entryType) {
case pointsEntry:
points, err := tsdb.ParsePoints(data)
points, err := models.ParsePoints(data)
if err != nil {
return err
}
@ -674,53 +675,53 @@ func (l *Log) shouldFlush() flushType {
type walCursor struct {
cache Values
position int
direction tsdb.Direction
ascending bool
}
func newWALCursor(cache Values, direction tsdb.Direction) *walCursor {
func newWALCursor(cache Values, ascending bool) *walCursor {
// position is set such that a call to Next will successfully advance
// to the next position and return the value.
c := &walCursor{cache: cache, direction: direction, position: -1}
if direction.Reverse() {
c := &walCursor{cache: cache, ascending: ascending, position: -1}
if !ascending {
c.position = len(c.cache)
}
return c
}
func (c *walCursor) Direction() tsdb.Direction { return c.direction }
func (c *walCursor) Ascending() bool { return c.ascending }
// Seek will point the cursor to the given time (or key)
func (c *walCursor) Seek(seek []byte) (key, value []byte) {
func (c *walCursor) SeekTo(seek int64) (int64, interface{}) {
// Seek cache index
c.position = sort.Search(len(c.cache), func(i int) bool {
return bytes.Compare(c.cache[i].TimeBytes(), seek) != -1
return c.cache[i].Time().UnixNano() >= seek
})
// If seek is not in the cache, return the last value in the cache
if c.direction.Reverse() && c.position >= len(c.cache) {
if !c.ascending && c.position >= len(c.cache) {
c.position = len(c.cache)
}
// Make sure our position points to something in the cache
if c.position < 0 || c.position >= len(c.cache) {
return nil, nil
return tsdb.EOF, nil
}
v := c.cache[c.position]
return v.TimeBytes(), v.ValueBytes()
return v.Time().UnixNano(), v.Value()
}
// Next moves the cursor to the next key/value. will return nil if at the end
func (c *walCursor) Next() (key, value []byte) {
func (c *walCursor) Next() (int64, interface{}) {
var v Value
if c.direction.Forward() {
if c.ascending {
v = c.nextForward()
} else {
v = c.nextReverse()
}
return v.TimeBytes(), v.ValueBytes()
return v.Time().UnixNano(), v.Value()
}
// nextForward advances the cursor forward returning the next value

View File

@ -6,6 +6,7 @@ import (
"reflect"
"testing"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/pd1"
)
@ -38,64 +39,67 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
fieldsToWrite := map[string]*tsdb.MeasurementFields{"foo": {Fields: map[string]*tsdb.Field{"bar": {Name: "value"}}}}
seriesToWrite := []*tsdb.SeriesCreate{{Measurement: "asdf"}}
if err := w.WritePoints([]tsdb.Point{p1, p2}, fieldsToWrite, seriesToWrite); err != nil {
if err := w.WritePoints([]models.Point{p1, p2}, fieldsToWrite, seriesToWrite); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
c := w.Cursor("cpu,host=A", tsdb.Forward)
fieldNames := []string{"value"}
var codec *tsdb.FieldCodec
c := w.Cursor("cpu,host=A", fieldNames, codec, true)
k, v := c.Next()
if btou64(k) != uint64(p1.UnixNano()) {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
}
if 1.1 != btof64(v) {
if 1.1 != v {
t.Fatal("p1 data not equal")
}
c = w.Cursor("cpu,host=B", tsdb.Forward)
c = w.Cursor("cpu,host=B", fieldNames, codec, true)
k, v = c.Next()
if btou64(k) != uint64(p2.UnixNano()) {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
if k != p2.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
}
if 1.2 != btof64(v) {
if 1.2 != v {
t.Fatal("p2 data not equal")
}
k, v = c.Next()
if k != nil {
t.Fatal("expected nil")
if k != tsdb.EOF {
t.Fatal("expected EOF", k, v)
}
// ensure we can do another write to the wal and get stuff
if err := w.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
if err := w.WritePoints([]models.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error)
}
c = w.Cursor("cpu,host=A", tsdb.Forward)
c = w.Cursor("cpu,host=A", fieldNames, codec, true)
k, v = c.Next()
if btou64(k) != uint64(p1.UnixNano()) {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
}
if 1.1 != btof64(v) {
if 1.1 != v {
t.Fatal("p1 data not equal")
}
k, v = c.Next()
if btou64(k) != uint64(p3.UnixNano()) {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k))
if k != p3.UnixNano() {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
}
if 2.1 != btof64(v) {
if 2.1 != v {
t.Fatal("p3 data not equal")
}
// ensure we can seek
k, v = c.Seek(u64tob(2000000000))
if btou64(k) != uint64(p3.UnixNano()) {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k))
k, v = c.SeekTo(2000000000)
if k != p3.UnixNano() {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
}
if 2.1 != btof64(v) {
if 2.1 != v {
t.Fatal("p3 data not equal")
}
k, v = c.Next()
if k != nil {
t.Fatal("expected nil")
if k != tsdb.EOF {
t.Fatal("expected EOF")
}
// ensure we close and after open it flushes to the index
@ -115,15 +119,15 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
t.Fatal("expected host B values to flush to index on open")
}
if err := w.WritePoints([]tsdb.Point{p4}, nil, nil); err != nil {
if err := w.WritePoints([]models.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error)
}
c = w.Cursor("cpu,host=B", tsdb.Forward)
c = w.Cursor("cpu,host=B", fieldNames, codec, true)
k, v = c.Next()
if btou64(k) != uint64(p4.UnixNano()) {
t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), btou64(k))
if k != p4.UnixNano() {
t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), k)
}
if 2.2 != btof64(v) {
if 2.2 != v {
t.Fatal("p4 data not equal")
}