Rename storage engine to tsm1, for Time Structured Merge Tree!
parent
0a11a2fdbc
commit
594253cbba
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
// bool encoding uses 1 bit per value. Each compressed byte slice contains a 1 byte header
|
||||
// indicating the compression type, followed by a variable byte encoded length indicating
|
|
@ -1,26 +1,26 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func Test_BoolEncoder_NoValues(t *testing.T) {
|
||||
enc := pd1.NewBoolEncoder()
|
||||
enc := tsm1.NewBoolEncoder()
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewBoolDecoder(b)
|
||||
dec := tsm1.NewBoolDecoder(b)
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_BoolEncoder_Single(t *testing.T) {
|
||||
enc := pd1.NewBoolEncoder()
|
||||
enc := tsm1.NewBoolEncoder()
|
||||
v1 := true
|
||||
enc.Write(v1)
|
||||
b, err := enc.Bytes()
|
||||
|
@ -28,7 +28,7 @@ func Test_BoolEncoder_Single(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewBoolDecoder(b)
|
||||
dec := tsm1.NewBoolDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got false, exp true")
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func Test_BoolEncoder_Single(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_BoolEncoder_Multi_Compressed(t *testing.T) {
|
||||
enc := pd1.NewBoolEncoder()
|
||||
enc := tsm1.NewBoolEncoder()
|
||||
|
||||
values := make([]bool, 10)
|
||||
for i := range values {
|
||||
|
@ -56,7 +56,7 @@ func Test_BoolEncoder_Multi_Compressed(t *testing.T) {
|
|||
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
dec := pd1.NewBoolDecoder(b)
|
||||
dec := tsm1.NewBoolDecoder(b)
|
||||
|
||||
for i, v := range values {
|
||||
if !dec.Next() {
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"math"
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
|
@ -1,4 +1,4 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
// "math/rand"
|
||||
|
@ -8,15 +8,15 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func TestEncoding_FloatBlock(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
values := make(tsm1.Values, len(times))
|
||||
for i, t := range times {
|
||||
values[i] = pd1.NewValue(t, float64(i))
|
||||
values[i] = tsm1.NewValue(t, float64(i))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
@ -29,9 +29,9 @@ func TestEncoding_FloatBlock(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
|
||||
values := make(pd1.Values, 3)
|
||||
values := make(tsm1.Values, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
values[i] = pd1.NewValue(time.Unix(0, 0), float64(i))
|
||||
values[i] = tsm1.NewValue(time.Unix(0, 0), float64(i))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
@ -46,9 +46,9 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
|
|||
func TestEncoding_IntBlock_Basic(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
values := make(tsm1.Values, len(times))
|
||||
for i, t := range times {
|
||||
values[i] = pd1.NewValue(t, int64(i))
|
||||
values[i] = tsm1.NewValue(t, int64(i))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
@ -74,13 +74,13 @@ func TestEncoding_IntBlock_Basic(t *testing.T) {
|
|||
func TestEncoding_IntBlock_Negatives(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
values := make(tsm1.Values, len(times))
|
||||
for i, t := range times {
|
||||
v := int64(i)
|
||||
if i%2 == 0 {
|
||||
v = -v
|
||||
}
|
||||
values[i] = pd1.NewValue(t, int64(v))
|
||||
values[i] = tsm1.NewValue(t, int64(v))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
@ -95,13 +95,13 @@ func TestEncoding_IntBlock_Negatives(t *testing.T) {
|
|||
func TestEncoding_BoolBlock_Basic(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
values := make(tsm1.Values, len(times))
|
||||
for i, t := range times {
|
||||
v := true
|
||||
if i%2 == 0 {
|
||||
v = false
|
||||
}
|
||||
values[i] = pd1.NewValue(t, v)
|
||||
values[i] = tsm1.NewValue(t, v)
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
@ -116,9 +116,9 @@ func TestEncoding_BoolBlock_Basic(t *testing.T) {
|
|||
func TestEncoding_StringBlock_Basic(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
values := make(tsm1.Values, len(times))
|
||||
for i, t := range times {
|
||||
values[i] = pd1.NewValue(t, fmt.Sprintf("value %d", i))
|
||||
values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
/*
|
||||
This code is originally from: https://github.com/dgryski/go-tsz and has been modified to remove
|
|
@ -1,15 +1,15 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func TestFloatEncoder_Simple(t *testing.T) {
|
||||
|
||||
// Example from the paper
|
||||
s := pd1.NewFloatEncoder()
|
||||
s := tsm1.NewFloatEncoder()
|
||||
|
||||
s.Push(12)
|
||||
s.Push(12)
|
||||
|
@ -94,7 +94,7 @@ var TwoHoursData = []struct {
|
|||
|
||||
func TestFloatEncoder_Roundtrip(t *testing.T) {
|
||||
|
||||
s := pd1.NewFloatEncoder()
|
||||
s := tsm1.NewFloatEncoder()
|
||||
for _, p := range TwoHoursData {
|
||||
s.Push(p.v)
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ func TestFloatEncoder_Roundtrip(t *testing.T) {
|
|||
|
||||
func BenchmarkFloatEncoder(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
s := pd1.NewFloatEncoder()
|
||||
s := tsm1.NewFloatEncoder()
|
||||
for _, tt := range TwoHoursData {
|
||||
s.Push(tt.v)
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ func BenchmarkFloatEncoder(b *testing.B) {
|
|||
}
|
||||
|
||||
func BenchmarkFloatDecoder(b *testing.B) {
|
||||
s := pd1.NewFloatEncoder()
|
||||
s := tsm1.NewFloatEncoder()
|
||||
for _, tt := range TwoHoursData {
|
||||
s.Push(tt.v)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
// Int64 encoding uses two different strategies depending on the range of values in
|
||||
// the uncompressed data. Encoded values are first encoding used zig zag encoding.
|
|
@ -1,27 +1,27 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func Test_Int64Encoder_NoValues(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_One(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
v1 := int64(1)
|
||||
|
||||
enc.Write(1)
|
||||
|
@ -30,7 +30,7 @@ func Test_Int64Encoder_One(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ func Test_Int64Encoder_One(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Int64Encoder_Two(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
var v1, v2 int64 = 1, 2
|
||||
|
||||
enc.Write(v1)
|
||||
|
@ -52,7 +52,7 @@ func Test_Int64Encoder_Two(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ func Test_Int64Encoder_Two(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Int64Encoder_Negative(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
var v1, v2, v3 int64 = -2, 0, 1
|
||||
|
||||
enc.Write(v1)
|
||||
|
@ -83,7 +83,7 @@ func Test_Int64Encoder_Negative(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ func Test_Int64Encoder_Negative(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Int64Encoder_Large_Range(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
var v1, v2 int64 = math.MinInt64, math.MaxInt64
|
||||
enc.Write(v1)
|
||||
enc.Write(v2)
|
||||
|
@ -119,7 +119,7 @@ func Test_Int64Encoder_Large_Range(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ func Test_Int64Encoder_Large_Range(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Int64Encoder_Uncompressed(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
var v1, v2, v3 int64 = 0, 1, 1 << 60
|
||||
|
||||
enc.Write(v1)
|
||||
|
@ -155,7 +155,7 @@ func Test_Int64Encoder_Uncompressed(t *testing.T) {
|
|||
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ func Test_Int64Encoder_Uncompressed(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Int64Encoder_AllNegative(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
values := []int64{
|
||||
-10, -5, -1,
|
||||
}
|
||||
|
@ -196,7 +196,7 @@ func Test_Int64Encoder_AllNegative(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
dec := tsm1.NewInt64Decoder(b)
|
||||
i := 0
|
||||
for dec.Next() {
|
||||
if i > len(values) {
|
||||
|
@ -211,7 +211,7 @@ func Test_Int64Encoder_AllNegative(t *testing.T) {
|
|||
}
|
||||
|
||||
func BenchmarkInt64Encoder(b *testing.B) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
x := make([]int64, 1024)
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = int64(i)
|
||||
|
@ -230,7 +230,7 @@ type byteSetter interface {
|
|||
|
||||
func BenchmarkInt64Decoder(b *testing.B) {
|
||||
x := make([]int64, 1024)
|
||||
enc := pd1.NewInt64Encoder()
|
||||
enc := tsm1.NewInt64Encoder()
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = int64(i)
|
||||
enc.Write(x[i])
|
||||
|
@ -239,7 +239,7 @@ func BenchmarkInt64Decoder(b *testing.B) {
|
|||
|
||||
b.ResetTimer()
|
||||
|
||||
dec := pd1.NewInt64Decoder(bytes)
|
||||
dec := tsm1.NewInt64Decoder(bytes)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
dec.(byteSetter).SetBytes(bytes)
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
// String encoding uses snappy compression to compress each string. Each string is
|
||||
// appended to byte slice prefixed with a variable byte length followed by the string
|
|
@ -1,27 +1,27 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func Test_StringEncoder_NoValues(t *testing.T) {
|
||||
enc := pd1.NewStringEncoder()
|
||||
enc := tsm1.NewStringEncoder()
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewStringDecoder(b)
|
||||
dec := tsm1.NewStringDecoder(b)
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_StringEncoder_Single(t *testing.T) {
|
||||
enc := pd1.NewStringEncoder()
|
||||
enc := tsm1.NewStringEncoder()
|
||||
v1 := "v1"
|
||||
enc.Write(v1)
|
||||
b, err := enc.Bytes()
|
||||
|
@ -29,7 +29,7 @@ func Test_StringEncoder_Single(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewStringDecoder(b)
|
||||
dec := tsm1.NewStringDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got false, exp true")
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ func Test_StringEncoder_Single(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_StringEncoder_Multi_Compressed(t *testing.T) {
|
||||
enc := pd1.NewStringEncoder()
|
||||
enc := tsm1.NewStringEncoder()
|
||||
|
||||
values := make([]string, 10)
|
||||
for i := range values {
|
||||
|
@ -53,15 +53,15 @@ func Test_StringEncoder_Multi_Compressed(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if b[0]>>4 != pd1.EncodingSnappy {
|
||||
t.Fatalf("unexpected encoding: got %v, exp %v", b[0], pd1.EncodingSnappy)
|
||||
if b[0]>>4 != tsm1.EncodingSnappy {
|
||||
t.Fatalf("unexpected encoding: got %v, exp %v", b[0], tsm1.EncodingSnappy)
|
||||
}
|
||||
|
||||
if exp := 47; len(b) != exp {
|
||||
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
dec := pd1.NewStringDecoder(b)
|
||||
dec := tsm1.NewStringDecoder(b)
|
||||
|
||||
for i, v := range values {
|
||||
if !dec.Next() {
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
// Timestamp encoding is adapative and based on structure of the timestamps that are encoded. It
|
||||
// uses a combination of delta encoding, scaling and compression using simple8b, run length encoding
|
|
@ -1,14 +1,14 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func Test_TimeEncoder(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
|
||||
x := []time.Time{}
|
||||
now := time.Unix(0, 0)
|
||||
|
@ -24,11 +24,11 @@ func Test_TimeEncoder(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingPackedSimple {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
for i, v := range x {
|
||||
if !dec.Next() {
|
||||
t.Fatalf("Next == false, expected true")
|
||||
|
@ -41,20 +41,20 @@ func Test_TimeEncoder(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_NoValues(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_One(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
tm := time.Unix(0, 0)
|
||||
|
||||
enc.Write(tm)
|
||||
|
@ -63,11 +63,11 @@ func Test_TimeEncoder_One(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingPackedSimple {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ func Test_TimeEncoder_One(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_Two(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(0, 1)
|
||||
enc.Write(t1)
|
||||
|
@ -89,11 +89,11 @@ func Test_TimeEncoder_Two(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingPackedSimple {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func Test_TimeEncoder_Two(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_Three(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(0, 1)
|
||||
t3 := time.Unix(0, 2)
|
||||
|
@ -126,11 +126,11 @@ func Test_TimeEncoder_Three(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingPackedSimple {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ func Test_TimeEncoder_Three(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_Large_Range(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 1442369134000000000)
|
||||
t2 := time.Unix(0, 1442369135000000000)
|
||||
enc.Write(t1)
|
||||
|
@ -167,11 +167,11 @@ func Test_TimeEncoder_Large_Range(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingPackedSimple {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ func Test_TimeEncoder_Large_Range(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_Uncompressed(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(1, 0)
|
||||
|
||||
|
@ -210,11 +210,11 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) {
|
|||
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingUncompressed {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingUncompressed {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
@ -241,7 +241,7 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_RLE(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
var ts []time.Time
|
||||
for i := 0; i < 500; i++ {
|
||||
ts = append(ts, time.Unix(int64(i), 0))
|
||||
|
@ -256,7 +256,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
|
|||
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingRLE {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingRLE {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
|
@ -264,7 +264,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
for i, v := range ts {
|
||||
if !dec.Next() {
|
||||
t.Fatalf("Next == false, expected true")
|
||||
|
@ -281,7 +281,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_Reverse(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
ts := []time.Time{
|
||||
time.Unix(0, 3),
|
||||
time.Unix(0, 2),
|
||||
|
@ -297,11 +297,11 @@ func Test_TimeEncoder_Reverse(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingUncompressed {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingUncompressed {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
i := 0
|
||||
for dec.Next() {
|
||||
if ts[i] != dec.Read() {
|
||||
|
@ -312,7 +312,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TimeEncoder_220SecondDelta(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
var ts []time.Time
|
||||
now := time.Now()
|
||||
for i := 0; i < 220; i++ {
|
||||
|
@ -333,11 +333,11 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) {
|
|||
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != pd1.EncodingRLE {
|
||||
if got := b[0] >> 4; got != tsm1.EncodingRLE {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
dec := tsm1.NewTimeDecoder(b)
|
||||
i := 0
|
||||
for dec.Next() {
|
||||
if ts[i] != dec.Read() {
|
||||
|
@ -356,7 +356,7 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) {
|
|||
}
|
||||
|
||||
func BenchmarkTimeEncoder(b *testing.B) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
x := make([]time.Time, 1024)
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = time.Now()
|
||||
|
@ -371,7 +371,7 @@ func BenchmarkTimeEncoder(b *testing.B) {
|
|||
|
||||
func BenchmarkTimeDecoder(b *testing.B) {
|
||||
x := make([]time.Time, 1024)
|
||||
enc := pd1.NewTimeEncoder()
|
||||
enc := tsm1.NewTimeEncoder()
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = time.Now()
|
||||
enc.Write(x[i])
|
||||
|
@ -382,7 +382,7 @@ func BenchmarkTimeDecoder(b *testing.B) {
|
|||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
dec := pd1.NewTimeDecoder(bytes)
|
||||
dec := tsm1.NewTimeDecoder(bytes)
|
||||
b.StartTimer()
|
||||
for dec.Next() {
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
|
@ -1,4 +1,4 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func TestEngine_WriteAndReadFloats(t *testing.T) {
|
||||
|
@ -1236,21 +1236,21 @@ func TestEngine_Deletes(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
|
||||
// Engine represents a test wrapper for pd1.Engine.
|
||||
// Engine represents a test wrapper for tsm1.Engine.
|
||||
type Engine struct {
|
||||
*pd1.Engine
|
||||
*tsm1.Engine
|
||||
}
|
||||
|
||||
// NewEngine returns a new instance of Engine.
|
||||
func NewEngine(opt tsdb.EngineOptions) *Engine {
|
||||
dir, err := ioutil.TempDir("", "pd1-test")
|
||||
dir, err := ioutil.TempDir("", "tsm1-test")
|
||||
if err != nil {
|
||||
panic("couldn't get temp dir")
|
||||
}
|
||||
|
||||
// Create test wrapper and attach mocks.
|
||||
e := &Engine{
|
||||
Engine: pd1.NewEngine(dir, dir, opt).(*pd1.Engine),
|
||||
Engine: tsm1.NewEngine(dir, dir, opt).(*tsm1.Engine),
|
||||
}
|
||||
|
||||
return e
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"io"
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
@ -131,7 +131,7 @@ func NewLog(path string) *Log {
|
|||
SegmentSize: DefaultSegmentSize,
|
||||
FlushMemorySizeThreshold: tsdb.DefaultFlushMemorySizeThreshold,
|
||||
MaxMemorySizeThreshold: tsdb.DefaultMaxMemorySizeThreshold,
|
||||
logger: log.New(os.Stderr, "[pd1wal] ", log.LstdFlags),
|
||||
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,8 +139,8 @@ func NewLog(path string) *Log {
|
|||
func (l *Log) Open() error {
|
||||
|
||||
if l.LoggingEnabled {
|
||||
l.logger.Printf("PD1 WAL starting with %d flush memory size threshold and %d max memory size threshold\n", l.FlushMemorySizeThreshold, l.MaxMemorySizeThreshold)
|
||||
l.logger.Printf("PD1 WAL writing to %s\n", l.path)
|
||||
l.logger.Printf("tsm1 WAL starting with %d flush memory size threshold and %d max memory size threshold\n", l.FlushMemorySizeThreshold, l.MaxMemorySizeThreshold)
|
||||
l.logger.Printf("tsm1 WAL writing to %s\n", l.path)
|
||||
}
|
||||
if err := os.MkdirAll(l.path, 0777); err != nil {
|
||||
return err
|
|
@ -1,4 +1,4 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
|
@ -8,19 +8,19 @@ import (
|
|||
|
||||
"github.com/influxdb/influxdb/models"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func TestWAL_TestWriteQueryOpen(t *testing.T) {
|
||||
w := NewWAL()
|
||||
defer w.Cleanup()
|
||||
|
||||
var vals map[string]pd1.Values
|
||||
var vals map[string]tsm1.Values
|
||||
var fields map[string]*tsdb.MeasurementFields
|
||||
var series []*tsdb.SeriesCreate
|
||||
|
||||
w.Index = &MockIndexWriter{
|
||||
fn: func(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
fn: func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
vals = valuesByKey
|
||||
fields = measurementFieldsToSave
|
||||
series = seriesToCreate
|
||||
|
@ -141,18 +141,18 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
|
|||
}
|
||||
|
||||
type Log struct {
|
||||
*pd1.Log
|
||||
*tsm1.Log
|
||||
path string
|
||||
}
|
||||
|
||||
func NewWAL() *Log {
|
||||
dir, err := ioutil.TempDir("", "pd1-test")
|
||||
dir, err := ioutil.TempDir("", "tsm1-test")
|
||||
if err != nil {
|
||||
panic("couldn't get temp dir")
|
||||
}
|
||||
|
||||
l := &Log{
|
||||
Log: pd1.NewLog(dir),
|
||||
Log: tsm1.NewLog(dir),
|
||||
path: dir,
|
||||
}
|
||||
l.LoggingEnabled = true
|
||||
|
@ -166,10 +166,10 @@ func (l *Log) Cleanup() error {
|
|||
}
|
||||
|
||||
type MockIndexWriter struct {
|
||||
fn func(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
|
||||
fn func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
|
||||
}
|
||||
|
||||
func (m *MockIndexWriter) Write(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
func (m *MockIndexWriter) Write(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
return m.fn(valuesByKey, measurementFieldsToSave, seriesToCreate)
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package pd1
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"reflect"
|
|
@ -1,15 +1,15 @@
|
|||
package pd1_test
|
||||
package tsm1_test
|
||||
|
||||
import (
|
||||
// "sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
func TestWriteLock_FullCover(t *testing.T) {
|
||||
w := &pd1.WriteLock{}
|
||||
w := &tsm1.WriteLock{}
|
||||
w.LockRange(2, 10)
|
||||
|
||||
lock := make(chan bool)
|
||||
|
@ -27,7 +27,7 @@ func TestWriteLock_FullCover(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWriteLock_RightIntersect(t *testing.T) {
|
||||
w := &pd1.WriteLock{}
|
||||
w := &tsm1.WriteLock{}
|
||||
w.LockRange(2, 10)
|
||||
|
||||
lock := make(chan bool)
|
||||
|
@ -45,7 +45,7 @@ func TestWriteLock_RightIntersect(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWriteLock_LeftIntersect(t *testing.T) {
|
||||
w := &pd1.WriteLock{}
|
||||
w := &tsm1.WriteLock{}
|
||||
w.LockRange(1, 4)
|
||||
|
||||
lock := make(chan bool)
|
||||
|
@ -63,7 +63,7 @@ func TestWriteLock_LeftIntersect(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWriteLock_Inside(t *testing.T) {
|
||||
w := &pd1.WriteLock{}
|
||||
w := &tsm1.WriteLock{}
|
||||
w.LockRange(4, 8)
|
||||
|
||||
lock := make(chan bool)
|
||||
|
@ -81,7 +81,7 @@ func TestWriteLock_Inside(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWriteLock_Same(t *testing.T) {
|
||||
w := &pd1.WriteLock{}
|
||||
w := &tsm1.WriteLock{}
|
||||
w.LockRange(2, 10)
|
||||
|
||||
lock := make(chan bool)
|
||||
|
@ -99,7 +99,7 @@ func TestWriteLock_Same(t *testing.T) {
|
|||
}
|
||||
|
||||
// func TestWriteLock_FreeRangeWithContentionElsewhere(t *testing.T) {
|
||||
// w := &pd1.WriteLock{}
|
||||
// w := &tsm1.WriteLock{}
|
||||
// w.LockRange(2, 10)
|
||||
|
||||
// lock := make(chan bool)
|
Loading…
Reference in New Issue