Update tests to use transactions. Add test for single series 10k points.

pull/4308/head
Paul Dix 2015-09-29 11:27:02 -04:00
parent 0b33a71bb7
commit db4ad33f3c
1 changed files with 75 additions and 51 deletions

View File

@ -10,7 +10,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models" "github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb" "github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/pd1" "github.com/influxdb/influxdb/tsdb/engine/pd1"
@ -30,16 +29,11 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
} }
fields := []string{"value"} fields := []string{"value"}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
verify := func(checkSingleBVal bool) { verify := func(checkSingleBVal bool) {
c := e.Cursor("cpu,host=A", fields, codec, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, v := c.SeekTo(0) k, v := c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k) t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
@ -59,7 +53,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
t.Fatal("expected EOF") t.Fatal("expected EOF")
} }
c = e.Cursor("cpu,host=B", fields, codec, true) c = tx.Cursor("cpu,host=B", fields, nil, true)
k, v = c.SeekTo(0) k, v = c.SeekTo(0)
if k != p2.UnixNano() { if k != p2.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
@ -82,7 +76,9 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
} }
verify(false) verify(false)
c := e.Cursor("cpu,host=B", fields, codec, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=B", fields, nil, true)
k, v := c.SeekTo(0) k, v := c.SeekTo(0)
if k != p2.UnixNano() { if k != p2.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
@ -107,7 +103,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
t.Fatal("p2 data not equal") t.Fatal("p2 data not equal")
} }
c = e.Cursor("cpu,host=A", fields, codec, true) c = tx.Cursor("cpu,host=A", fields, nil, true)
k, v = c.SeekTo(0) k, v = c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k) t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
@ -161,7 +157,9 @@ func TestEngine_WriteIndexQueryAcrossDataFiles(t *testing.T) {
fields := []string{"value"} fields := []string{"value"}
verify := func(series string, points []models.Point, seek int64) { verify := func(series string, points []models.Point, seek int64) {
c := e.Cursor(series, fields, nil, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor(series, fields, nil, true)
k, v := c.SeekTo(seek) k, v := c.SeekTo(seek)
p := points[0] p := points[0]
@ -191,13 +189,6 @@ func TestEngine_WriteOverwritePreviousPoint(t *testing.T) {
defer e.Cleanup() defer e.Cleanup()
fields := []string{"value"} fields := []string{"value"}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
p1 := parsePoint("cpu,host=A value=1.1 1000000000") p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=A value=1.2 1000000000") p2 := parsePoint("cpu,host=A value=1.2 1000000000")
@ -207,7 +198,9 @@ func TestEngine_WriteOverwritePreviousPoint(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c := e.Cursor("cpu,host=A", fields, codec, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, v := c.SeekTo(0) k, v := c.SeekTo(0)
if k != p2.UnixNano() { if k != p2.UnixNano() {
t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
@ -224,7 +217,9 @@ func TestEngine_WriteOverwritePreviousPoint(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c = e.Cursor("cpu,host=A", fields, codec, true) tx2, _ := e.Begin(false)
defer tx2.Rollback()
c = tx2.Cursor("cpu,host=A", fields, nil, true)
k, v = c.SeekTo(0) k, v = c.SeekTo(0)
if k != p3.UnixNano() { if k != p3.UnixNano() {
t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k) t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
@ -243,13 +238,6 @@ func TestEngine_CursorCombinesWALAndIndex(t *testing.T) {
defer e.Cleanup() defer e.Cleanup()
fields := []string{"value"} fields := []string{"value"}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
p1 := parsePoint("cpu,host=A value=1.1 1000000000") p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=A value=1.2 2000000000") p2 := parsePoint("cpu,host=A value=1.2 2000000000")
@ -262,7 +250,9 @@ func TestEngine_CursorCombinesWALAndIndex(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c := e.Cursor("cpu,host=A", fields, codec, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, v := c.SeekTo(0) k, v := c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k) t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
@ -318,13 +308,6 @@ func TestEngine_Compaction(t *testing.T) {
} }
fields := []string{"value"} fields := []string{"value"}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
e.CompactionAge = time.Duration(0) e.CompactionAge = time.Duration(0)
@ -337,7 +320,9 @@ func TestEngine_Compaction(t *testing.T) {
} }
verify := func(series string, points []models.Point, seek int64) { verify := func(series string, points []models.Point, seek int64) {
c := e.Cursor(series, fields, codec, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor(series, fields, nil, true)
k, v := c.SeekTo(seek) k, v := c.SeekTo(seek)
p := points[0] p := points[0]
@ -374,13 +359,6 @@ func TestEngine_KeyCollisionsAreHandled(t *testing.T) {
defer e.Cleanup() defer e.Cleanup()
fields := []string{"value"} fields := []string{"value"}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
// make sure two of these keys collide // make sure two of these keys collide
e.HashSeriesField = func(key string) uint64 { e.HashSeriesField = func(key string) uint64 {
@ -395,7 +373,9 @@ func TestEngine_KeyCollisionsAreHandled(t *testing.T) {
} }
verify := func(series string, points []models.Point, seek int64) { verify := func(series string, points []models.Point, seek int64) {
c := e.Cursor(series, fields, codec, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor(series, fields, nil, true)
k, v := c.SeekTo(seek) k, v := c.SeekTo(seek)
p := points[0] p := points[0]
@ -467,7 +447,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) {
if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil { if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c := e.Cursor("cpu,host=A", fields, nil, true) tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, v := c.SeekTo(0) k, v := c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k)
@ -493,7 +475,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c = e.Cursor("cpu,host=A", fields, nil, true) tx2, _ := e.Begin(false)
defer tx2.Rollback()
c = tx2.Cursor("cpu,host=A", fields, nil, true)
k, v = c.SeekTo(0) k, v = c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k)
@ -522,7 +506,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c = e.Cursor("cpu,host=A", fields, nil, true) tx3, _ := e.Begin(false)
defer tx3.Rollback()
c = tx3.Cursor("cpu,host=A", fields, nil, true)
k, v = c.SeekTo(0) k, v = c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k)
@ -561,7 +547,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error()) t.Fatalf("failed to write points: %s", err.Error())
} }
c = e.Cursor("cpu,host=A", fields, nil, true) tx4, _ := e.Begin(false)
defer tx4.Rollback()
c = tx4.Cursor("cpu,host=A", fields, nil, true)
k, v = c.SeekTo(0) k, v = c.SeekTo(0)
if k != p1.UnixNano() { if k != p1.UnixNano() {
t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k)
@ -596,7 +584,7 @@ func TestEngine_SupportMultipleFields(t *testing.T) {
} }
// and ensure we can grab one of the fields // and ensure we can grab one of the fields
c = e.Cursor("cpu,host=A", []string{"value"}, nil, true) c = tx4.Cursor("cpu,host=A", []string{"value"}, nil, true)
k, v = c.SeekTo(4000000000) k, v = c.SeekTo(4000000000)
if k != p4.UnixNano() { if k != p4.UnixNano() {
t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p4.UnixNano(), k) t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p4.UnixNano(), k)
@ -610,6 +598,42 @@ func TestEngine_SupportMultipleFields(t *testing.T) {
} }
} }
func TestEngine_WriteManyPointsToSingleSeries(t *testing.T) {
e := OpenDefaultEngine()
defer e.Cleanup()
fields := []string{"value"}
var points []models.Point
for i := 1; i <= 10000; i++ {
points = append(points, parsePoint(fmt.Sprintf("cpu,host=A value=%d %d000000000", i, i)))
if i%500 == 0 {
if err := e.WritePoints(points, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
points = nil
}
}
tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, v := c.SeekTo(0)
for i := 2; i <= 10000; i++ {
k, v = c.Next()
if k != int64(i)*1000000000 {
t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", i*1000000000, k)
}
if v != float64(i) {
t.Fatalf("value wrong:\n\texp:%v\n\tgot:%v", float64(i), v)
}
}
k, _ = c.Next()
if k != tsdb.EOF {
t.Fatal("expected EOF")
}
}
func TestEngine_WriteIndexBenchmarkNames(t *testing.T) { func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
t.Skip("whatevs") t.Skip("whatevs")