Add support for multiple fields

pull/4317/merge
Paul Dix 2015-09-27 19:37:03 -04:00
parent a103432598
commit 17ed6932ae
5 changed files with 343 additions and 22 deletions

148
tsdb/engine/pd1/cursor.go Normal file
View File

@ -0,0 +1,148 @@
package pd1
import (
"math"
"github.com/influxdb/influxdb/tsdb"
)
// combinedEngineCursor iterates over a WAL cursor and an engine (index)
// cursor as a single merged stream. When both cursors produce a point at
// the same timestamp, the WAL value is returned and the engine value is
// dropped, since the WAL write supersedes the stored engine value.
type combinedEngineCursor struct {
	walCursor    tsdb.Cursor
	engineCursor tsdb.Cursor
	// Buffered (key, value) last read from each cursor but not yet emitted;
	// a key of tsdb.EOF marks that cursor as exhausted.
	walKeyBuf      int64
	walValueBuf    interface{}
	engineKeyBuf   int64
	engineValueBuf interface{}
	// ascending is the iteration direction both cursors were opened with.
	ascending bool
}
// NewCombinedEngineCursor returns a cursor that merges a WAL cursor with an
// engine cursor, preferring the WAL value whenever the two report a point at
// the same timestamp.
func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor {
	c := &combinedEngineCursor{ascending: ascending}
	c.walCursor = wc
	c.engineCursor = ec
	return c
}
// SeekTo positions both underlying cursors at seek, buffers the key/value
// each reports, and returns the first merged point from that position.
func (c *combinedEngineCursor) SeekTo(seek int64) (key int64, value interface{}) {
	k, v := c.walCursor.SeekTo(seek)
	c.walKeyBuf, c.walValueBuf = k, v
	k, v = c.engineCursor.SeekTo(seek)
	c.engineKeyBuf, c.engineValueBuf = k, v
	return c.read()
}
// Next returns the next merged key/value pair from the two cursors.
func (c *combinedEngineCursor) Next() (key int64, value interface{}) {
	key, value = c.read()
	return
}
// Ascending reports the iteration direction this cursor was created with.
func (c *combinedEngineCursor) Ascending() bool {
	return c.ascending
}
// read emits the buffered point that comes next in iteration order,
// refilling that source's buffer afterwards. When both buffered timestamps
// are equal, the WAL point wins and the engine point is discarded, since
// the WAL write overwrites the engine value.
func (c *combinedEngineCursor) read() (key int64, value interface{}) {
	walEOF := c.walKeyBuf == tsdb.EOF
	engineEOF := c.engineKeyBuf == tsdb.EOF

	// identical timestamp: the WAL point shadows the engine point
	if !walEOF && c.walKeyBuf == c.engineKeyBuf {
		key, value = c.walKeyBuf, c.walValueBuf
		c.walKeyBuf, c.walValueBuf = c.walCursor.Next()
		// advance past the shadowed engine point and drop it
		_, _ = c.engineCursor.Next()
		return
	}

	// decide which buffered point is due, honoring the iteration direction
	useWAL := false
	if engineEOF {
		useWAL = true
	} else if !walEOF {
		if c.ascending {
			useWAL = c.walKeyBuf < c.engineKeyBuf
		} else {
			useWAL = c.walKeyBuf > c.engineKeyBuf
		}
	}

	if useWAL {
		key, value = c.walKeyBuf, c.walValueBuf
		c.walKeyBuf, c.walValueBuf = c.walCursor.Next()
		return
	}
	key, value = c.engineKeyBuf, c.engineValueBuf
	c.engineKeyBuf, c.engineValueBuf = c.engineCursor.Next()
	return
}
// multiFieldCursor merges several single-field cursors into one cursor whose
// values are maps of field name to value; all fields sharing a timestamp are
// combined into a single emitted point.
type multiFieldCursor struct {
	// fields[i] names the value produced by cursors[i].
	fields    []string
	cursors   []tsdb.Cursor
	ascending bool
	// keyBuffer/valueBuffer hold the most recent (key, value) read from each
	// cursor but not yet emitted; both are parallel to cursors.
	keyBuffer   []int64
	valueBuffer []interface{}
}
// NewMultiFieldCursor returns a cursor that walks cursors in step, emitting
// for each timestamp a map of field name to value. fields[i] names the
// value produced by cursors[i].
func NewMultiFieldCursor(fields []string, cursors []tsdb.Cursor, ascending bool) tsdb.Cursor {
	n := len(cursors)
	return &multiFieldCursor{
		fields:      fields,
		cursors:     cursors,
		ascending:   ascending,
		keyBuffer:   make([]int64, n),
		valueBuffer: make([]interface{}, n),
	}
}
// SeekTo seeks every underlying cursor to seek, buffers what each reports,
// and returns the first merged multi-field point from that position.
func (m *multiFieldCursor) SeekTo(seek int64) (key int64, value interface{}) {
	for i := range m.cursors {
		m.keyBuffer[i], m.valueBuffer[i] = m.cursors[i].SeekTo(seek)
	}
	return m.read()
}
// Next returns the next merged multi-field point across all cursors.
func (m *multiFieldCursor) Next() (key int64, value interface{}) {
	key, value = m.read()
	return
}
// Ascending reports the iteration direction this cursor was created with.
func (m *multiFieldCursor) Ascending() bool {
	return m.ascending
}
// read finds the next timestamp across all buffered cursor heads (minimum
// when ascending, maximum when descending), gathers every field whose buffer
// sits at that timestamp into a map, advances those cursors, and returns the
// merged point. Returns tsdb.EOF when every cursor is exhausted.
func (m *multiFieldCursor) read() (int64, interface{}) {
	// start from the extreme sentinel so any real timestamp replaces it
	t := int64(math.MaxInt64)
	if !m.ascending {
		t = math.MinInt64
	}

	// pick the timestamp at which the next merged point occurs
	for _, k := range m.keyBuffer {
		if k == tsdb.EOF {
			continue
		}
		if (m.ascending && k < t) || (!m.ascending && k > t) {
			t = k
		}
	}

	// sentinel untouched: every cursor reported EOF
	if t == math.MinInt64 || t == math.MaxInt64 {
		return tsdb.EOF, nil
	}

	// collect the value from, and advance, each cursor sitting at time t
	point := map[string]interface{}{}
	for i, k := range m.keyBuffer {
		if k != t {
			continue
		}
		point[m.fields[i]] = m.valueBuffer[i]
		m.keyBuffer[i], m.valueBuffer[i] = m.cursors[i].Next()
	}
	return t, point
}

View File

@ -78,10 +78,9 @@ func (v Values) DecodeSameTypeBlock(block []byte) Values {
// DecodeBlock takes a byte array and will decode into values of the appropriate type
// based on the block
func DecodeBlock(block []byte) Values {
func DecodeBlock(block []byte) (Values, error) {
// TODO: add support for other block types
a, _ := DecodeFloatBlock(block)
return a
return DecodeFloatBlock(block)
}
// Deduplicate returns a new Values slice with any values

View File

@ -17,7 +17,6 @@ import (
"time"
"github.com/golang/snappy"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
)
@ -404,7 +403,8 @@ func (e *Engine) Compact() error {
}
currentPosition += uint32(newPos - pos)
} else {
previousValues = DecodeBlock(block)
// TODO: handle decode error
previousValues, _ = DecodeBlock(block)
}
// write the previous values and clear if we've hit the limit
@ -971,13 +971,35 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) {
// TODO: handle multiple fields and descending
func (e *Engine) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
field := dec.FieldByName("value")
if field == nil || len(fields) > 1 {
panic("pd1 engine only supports one field with name of value")
files := e.copyFilesCollection()
// don't add the overhead of the multifield cursor if we only have one field
if len(fields) == 1 {
id := e.keyAndFieldToID(series, fields[0])
indexCursor := newCursor(id, files, ascending)
wc := e.WAL.Cursor(series, fields, dec, ascending)
return NewCombinedEngineCursor(wc, indexCursor, ascending)
}
// multiple fields. use just the MultiFieldCursor, which also handles time collisions
// so we don't need to use the combined cursor
cursors := make([]tsdb.Cursor, 0)
cursorFields := make([]string, 0)
for _, field := range fields {
id := e.keyAndFieldToID(series, field)
indexCursor := newCursor(id, files, ascending)
wc := e.WAL.Cursor(series, []string{field}, dec, ascending)
// double up the fields since there's one for the wal and one for the index
cursorFields = append(cursorFields, field, field)
cursors = append(cursors, indexCursor, wc)
}
return NewMultiFieldCursor(cursorFields, cursors, ascending)
}
func (e *Engine) keyAndFieldToID(series, field string) uint64 {
// get the ID for the key and be sure to check if it had hash collision before
key := seriesFieldKey(series, field.Name)
key := seriesFieldKey(series, field)
e.collisionsLock.RLock()
id, ok := e.collisions[key]
e.collisionsLock.RUnlock()
@ -985,10 +1007,7 @@ func (e *Engine) Cursor(series string, fields []string, dec *tsdb.FieldCodec, as
if !ok {
id = e.HashSeriesField(key)
}
indexCursor := newCursor(id, field.Type, e.copyFilesCollection(), ascending)
wc := e.WAL.Cursor(series, fields, dec, ascending)
return tsdb.MultiCursor(wc, indexCursor)
return id
}
func (e *Engine) copyFilesCollection() []*dataFile {
@ -1334,7 +1353,6 @@ func (a dataFiles) Less(i, j int) bool { return a[i].MinTime() < a[j].MinTime()
type cursor struct {
id uint64
dataType influxql.DataType
f *dataFile
filesPos int // the index in the files slice we're looking at
pos uint32
@ -1346,10 +1364,9 @@ type cursor struct {
files []*dataFile
}
func newCursor(id uint64, dataType influxql.DataType, files []*dataFile, ascending bool) *cursor {
func newCursor(id uint64, files []*dataFile, ascending bool) *cursor {
return &cursor{
id: id,
dataType: dataType,
ascending: ascending,
files: files,
}
@ -1470,7 +1487,7 @@ func (c *cursor) Next() (int64, interface{}) {
func (c *cursor) decodeBlockAndGetValues(position uint32) (int64, interface{}) {
length := btou32(c.f.mmap[position+8 : position+12])
block := c.f.mmap[position+12 : position+12+length]
c.vals, _ = DecodeFloatBlock(block)
c.vals, _ = DecodeBlock(block)
c.pos = position + 12 + length
v := c.vals[0]

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"math"
"os"
"reflect"
"testing"
"time"
@ -459,6 +460,161 @@ func TestEngine_KeyCollisionsAreHandled(t *testing.T) {
verify("cpu,host=C", []models.Point{p3, p6, p9}, 0)
}
// TestEngine_SupportMultipleFields exercises cursors over a series with more
// than one field: initial multi-field reads, updating a single field at an
// existing timestamp, merging WAL-cached points with indexed points, and
// finally reading a single field (which must yield bare values, not maps).
//
// Bug fix: the p4 checks previously reported p2.UnixNano() as the expected
// time in their failure messages; the shared check closure now always prints
// the time it actually compared against.
func TestEngine_SupportMultipleFields(t *testing.T) {
	e := OpenDefaultEngine()
	defer e.Cleanup()

	fields := []string{"value", "foo"}

	// check fails the test unless the (k, v) pair read from a cursor matches
	// the expected time and value.
	check := func(k int64, v interface{}, expT int64, expV interface{}) {
		if k != expT {
			t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", expT, k)
		}
		if !reflect.DeepEqual(v, expV) {
			t.Fatalf("value wrong: %v", v)
		}
	}
	// checkEOF fails the test unless the cursor is exhausted.
	checkEOF := func(k int64) {
		if k != tsdb.EOF {
			t.Fatal("expected EOF")
		}
	}

	p1 := parsePoint("cpu,host=A value=1.1 1000000000")
	p2 := parsePoint("cpu,host=A value=1.2,foo=2.2 2000000000")

	if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	c := e.Cursor("cpu,host=A", fields, nil, true)
	k, v := c.SeekTo(0)
	check(k, v, p1.UnixNano(), map[string]interface{}{"value": 1.1})
	k, v = c.Next()
	check(k, v, p2.UnixNano(), map[string]interface{}{"value": 1.2, "foo": 2.2})
	k, _ = c.Next()
	checkEOF(k)

	// verify we can update a field and it's still all good
	p11 := parsePoint("cpu,host=A foo=2.1 1000000000")
	if err := e.WritePoints([]models.Point{p11}, nil, nil); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	c = e.Cursor("cpu,host=A", fields, nil, true)
	k, v = c.SeekTo(0)
	check(k, v, p1.UnixNano(), map[string]interface{}{"value": 1.1, "foo": 2.1})
	k, v = c.Next()
	check(k, v, p2.UnixNano(), map[string]interface{}{"value": 1.2, "foo": 2.2})
	k, _ = c.Next()
	checkEOF(k)

	// verify it's all good with the wal in the picture
	e.WAL.SkipCache = false

	p3 := parsePoint("cpu,host=A value=1.3 3000000000")
	p4 := parsePoint("cpu,host=A value=1.4,foo=2.4 4000000000")
	if err := e.WritePoints([]models.Point{p3, p4}, nil, nil); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	c = e.Cursor("cpu,host=A", fields, nil, true)
	k, v = c.SeekTo(0)
	check(k, v, p1.UnixNano(), map[string]interface{}{"value": 1.1, "foo": 2.1})
	k, v = c.Next()
	check(k, v, p2.UnixNano(), map[string]interface{}{"value": 1.2, "foo": 2.2})
	k, v = c.Next()
	check(k, v, p3.UnixNano(), map[string]interface{}{"value": 1.3})
	k, v = c.Next()
	check(k, v, p4.UnixNano(), map[string]interface{}{"value": 1.4, "foo": 2.4})
	k, _ = c.Next()
	checkEOF(k)

	// update a field at a timestamp whose other field is already indexed
	p33 := parsePoint("cpu,host=A foo=2.3 3000000000")
	if err := e.WritePoints([]models.Point{p33}, nil, nil); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	c = e.Cursor("cpu,host=A", fields, nil, true)
	k, v = c.SeekTo(0)
	check(k, v, p1.UnixNano(), map[string]interface{}{"value": 1.1, "foo": 2.1})
	k, v = c.Next()
	check(k, v, p2.UnixNano(), map[string]interface{}{"value": 1.2, "foo": 2.2})
	k, v = c.Next()
	check(k, v, p3.UnixNano(), map[string]interface{}{"value": 1.3, "foo": 2.3})
	k, v = c.Next()
	check(k, v, p4.UnixNano(), map[string]interface{}{"value": 1.4, "foo": 2.4})
	k, _ = c.Next()
	checkEOF(k)

	// and ensure we can grab one of the fields
	c = e.Cursor("cpu,host=A", []string{"value"}, nil, true)
	k, v = c.SeekTo(4000000000)
	check(k, v, p4.UnixNano(), 1.4)
	k, _ = c.Next()
	checkEOF(k)
}
func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
t.Skip("whatevs")

View File

@ -171,16 +171,17 @@ func (l *Log) Open() error {
return nil
}
// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given
// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given.
// This should only ever be called by the engine cursor method, which will always give it
// exactly one field.
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
// TODO: make this work for other fields
if len(fields) != 1 || fields[0] != "value" {
panic("pd1 wal only supports 1 field with name value")
if len(fields) != 1 {
panic("wal cursor should only ever be called with 1 field")
}
ck := seriesFieldKey(series, "value")
ck := seriesFieldKey(series, fields[0])
values := l.cache[ck]
// if we're in the middle of a flush, combine the previous cache