diff --git a/tsdb/engine/pd1/pd1_test.go b/tsdb/engine/pd1/pd1_test.go index 491c7cd552..e1abe87360 100644 --- a/tsdb/engine/pd1/pd1_test.go +++ b/tsdb/engine/pd1/pd1_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/models" "github.com/influxdb/influxdb/tsdb" "github.com/influxdb/influxdb/tsdb/engine/pd1" @@ -30,16 +29,11 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { } fields := []string{"value"} - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) verify := func(checkSingleBVal bool) { - c := e.Cursor("cpu,host=A", fields, codec, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=A", fields, nil, true) k, v := c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k) @@ -59,7 +53,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { t.Fatal("expected EOF") } - c = e.Cursor("cpu,host=B", fields, codec, true) + c = tx.Cursor("cpu,host=B", fields, nil, true) k, v = c.SeekTo(0) if k != p2.UnixNano() { t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) @@ -82,7 +76,9 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { } verify(false) - c := e.Cursor("cpu,host=B", fields, codec, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=B", fields, nil, true) k, v := c.SeekTo(0) if k != p2.UnixNano() { t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) @@ -107,7 +103,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { t.Fatal("p2 data not equal") } - c = e.Cursor("cpu,host=A", fields, codec, true) + c = tx.Cursor("cpu,host=A", fields, nil, true) k, v = c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k) @@ -161,7 +157,9 @@ func TestEngine_WriteIndexQueryAcrossDataFiles(t *testing.T) { fields := []string{"value"} verify := func(series string, points []models.Point, seek int64) { - c := e.Cursor(series, fields, nil, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor(series, fields, nil, true) k, v := c.SeekTo(seek) p := points[0] @@ -191,13 +189,6 @@ func TestEngine_WriteOverwritePreviousPoint(t *testing.T) { defer e.Cleanup() fields := []string{"value"} - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) p1 := parsePoint("cpu,host=A value=1.1 1000000000") p2 := parsePoint("cpu,host=A value=1.2 1000000000") @@ -207,7 +198,9 @@ func TestEngine_WriteOverwritePreviousPoint(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c := e.Cursor("cpu,host=A", fields, codec, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=A", fields, nil, true) k, v := c.SeekTo(0) if k != p2.UnixNano() { t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) @@ -224,7 +217,9 @@ func TestEngine_WriteOverwritePreviousPoint(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c = e.Cursor("cpu,host=A", fields, codec, true) + tx2, _ := e.Begin(false) + defer tx2.Rollback() + c = tx2.Cursor("cpu,host=A", fields, nil, true) k, v = c.SeekTo(0) if k != p3.UnixNano() { t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k) @@ -243,13 +238,6 @@ func TestEngine_CursorCombinesWALAndIndex(t *testing.T) { defer e.Cleanup() fields := []string{"value"} - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) p1 := parsePoint("cpu,host=A value=1.1 1000000000") p2 := parsePoint("cpu,host=A value=1.2 2000000000") @@ -262,7 +250,9 @@ func TestEngine_CursorCombinesWALAndIndex(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c := e.Cursor("cpu,host=A", fields, codec, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=A", fields, nil, true) k, v := c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k) @@ -318,13 +308,6 @@ func TestEngine_Compaction(t *testing.T) { } fields := []string{"value"} - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) e.CompactionAge = time.Duration(0) @@ -337,7 +320,9 @@ func TestEngine_Compaction(t *testing.T) { } verify := func(series string, points []models.Point, seek int64) { - c := e.Cursor(series, fields, codec, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor(series, fields, nil, true) k, v := c.SeekTo(seek) p := points[0] @@ -374,13 +359,6 @@ func TestEngine_KeyCollisionsAreHandled(t *testing.T) { defer e.Cleanup() fields := []string{"value"} - codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ - "value": { - ID: uint8(1), - Name: "value", - Type: influxql.Float, - }, - }) // make sure two of these keys collide e.HashSeriesField = func(key string) uint64 { @@ -395,7 +373,9 @@ func TestEngine_KeyCollisionsAreHandled(t *testing.T) { } verify := func(series string, points []models.Point, seek int64) { - c := e.Cursor(series, fields, codec, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor(series, fields, nil, true) k, v := c.SeekTo(seek) p := points[0] @@ -467,7 +447,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) { if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } - c := e.Cursor("cpu,host=A", fields, nil, true) + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=A", fields, nil, true) k, v := c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) @@ -493,7 +475,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c = e.Cursor("cpu,host=A", fields, nil, true) + tx2, _ := e.Begin(false) + defer tx2.Rollback() + c = tx2.Cursor("cpu,host=A", fields, nil, true) k, v = c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) @@ -522,7 +506,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c = e.Cursor("cpu,host=A", fields, nil, true) + tx3, _ := e.Begin(false) + defer tx3.Rollback() + c = tx3.Cursor("cpu,host=A", fields, nil, true) k, v = c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) @@ -561,7 +547,9 @@ func TestEngine_SupportMultipleFields(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c = e.Cursor("cpu,host=A", fields, nil, true) + tx4, _ := e.Begin(false) + defer tx4.Rollback() + c = tx4.Cursor("cpu,host=A", fields, nil, true) k, v = c.SeekTo(0) if k != p1.UnixNano() { t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p1.UnixNano(), k) @@ -596,7 +584,7 @@ func TestEngine_SupportMultipleFields(t *testing.T) { } // and ensure we can grab one of the fields - c = e.Cursor("cpu,host=A", []string{"value"}, nil, true) + c = tx4.Cursor("cpu,host=A", []string{"value"}, nil, true) k, v = c.SeekTo(4000000000) if k != p4.UnixNano() { t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", p4.UnixNano(), k) @@ -610,6 +598,42 @@ func TestEngine_SupportMultipleFields(t *testing.T) { } } +func TestEngine_WriteManyPointsToSingleSeries(t *testing.T) { + e := OpenDefaultEngine() + defer e.Cleanup() + + fields := []string{"value"} + + var points []models.Point + for i := 1; i <= 10000; i++ { + points = append(points, parsePoint(fmt.Sprintf("cpu,host=A value=%d %d000000000", i, i))) + if i%500 == 0 { + if err := e.WritePoints(points, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + points = nil + } + } + + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=A", fields, nil, true) + k, v := c.SeekTo(0) + for i := 2; i <= 10000; i++ { + k, v = c.Next() + if k != int64(i)*1000000000 { + t.Fatalf("time wrong:\n\texp: %d\n\tgot: %d", i*1000000000, k) + } + if v != float64(i) { + t.Fatalf("value wrong:\n\texp:%v\n\tgot:%v", float64(i), v) + } + } + k, _ = c.Next() + if k != tsdb.EOF { + t.Fatal("expected EOF") + } +} + func TestEngine_WriteIndexBenchmarkNames(t *testing.T) { t.Skip("whatevs")