From d517bad6f2a624ab8910d58e34f81f711256c732 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 16 Nov 2015 09:40:52 -0700 Subject: [PATCH 1/6] Add BlockType func Allows the block type to be determined without decoding all the values. --- tsdb/engine/tsm1/encoding.go | 17 ++++++++++++++- tsdb/engine/tsm1/encoding_test.go | 36 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 1b7a742032..d83a2b397e 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -98,6 +98,18 @@ func (a Values) Encode(buf []byte) ([]byte, error) { return nil, fmt.Errorf("unsupported value type %T", a[0]) } +// BlockType returns the type of value encoded in a block or an error +// if the block type is unknown. +func BlockType(block []byte) (byte, error) { + blockType := block[8] + switch blockType { + case BlockFloat64, BlockInt64, BlockBool, BlockString: + return blockType, nil + default: + return 0, fmt.Errorf("unknown block type: %d", blockType) + } +} + // DecodeBlock takes a byte array and will decode into values of the appropriate type // based on the block func DecodeBlock(block []byte, vals *[]Value) error { @@ -105,7 +117,10 @@ func DecodeBlock(block []byte, vals *[]Value) error { panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize)) } - blockType := block[8] + blockType, err := BlockType(block) + if err != nil { + return err + } switch blockType { case BlockFloat64: return decodeFloatBlock(block, vals) diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index 3c72afc53f..fd8b22b48f 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -188,6 +188,42 @@ func TestEncoding_StringBlock_Basic(t *testing.T) { } } +func TestEncoding_BlockType(t *testing.T) { + tests := []struct { + value interface{} + blockType byte + }{ + {value: float64(1.0), blockType: tsm1.BlockFloat64}, + {value: int64(1), blockType: tsm1.BlockInt64}, + {value: true, blockType: tsm1.BlockBool}, + {value: "string", blockType: tsm1.BlockString}, + } + + for _, test := range tests { + var values []tsm1.Value + values = append(values, tsm1.NewValue(time.Unix(0, 0), test.value)) + + b, err := tsm1.Values(values).Encode(nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + bt, err := tsm1.BlockType(b) + if err != nil { + t.Fatalf("unexpected error decoding block type: %v", err) + } + + if got, exp := bt, test.blockType; got != exp { + t.Fatalf("block type mismatch: got %v, exp %v", got, exp) + } + } + + _, err := tsm1.BlockType([]byte{0, 0, 0, 0, 0, 0, 0, 0, 10}) + if err == nil { + t.Fatalf("expected error decoding block type, got nil") + } +} + func getTimes(n, step int, precision time.Duration) []time.Time { t := time.Now().Round(precision) a := make([]time.Time, n) From 5a12c494755cd9c7fa4da82bc9c4e42bda0a226c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 16 Nov 2015 09:51:03 -0700 Subject: [PATCH 2/6] Make type specific decoders exported --- tsdb/engine/tsm1/encoding.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index d83a2b397e..08f529ca29 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -123,13 +123,13 @@ func DecodeBlock(block []byte, vals *[]Value) error { } switch blockType { case BlockFloat64: - return decodeFloatBlock(block, vals) + return DecodeFloatBlock(block, vals) case BlockInt64: - return decodeInt64Block(block, vals) + return DecodeInt64Block(block, vals) case BlockBool: - return decodeBoolBlock(block, vals) + return DecodeBoolBlock(block, vals) case BlockString: - return decodeStringBlock(block, vals) + return DecodeStringBlock(block, vals) default: panic(fmt.Sprintf("unknown block type: %d", blockType)) } @@ -218,7 +218,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { return block, nil } -func decodeFloatBlock(block []byte, a *[]Value) error { +func DecodeFloatBlock(block []byte, a *[]Value) error { // The first 8 bytes is the minimum timestamp of the block block = block[8:] @@ -315,7 +315,7 @@ func encodeBoolBlock(buf []byte, values []Value) ([]byte, error) { return block, nil } -func decodeBoolBlock(block []byte, a *[]Value) error { +func DecodeBoolBlock(block []byte, a *[]Value) error { // The first 8 bytes is the minimum timestamp of the block block = block[8:] @@ -398,7 +398,7 @@ func encodeInt64Block(buf []byte, values []Value) ([]byte, error) { return append(block, packBlock(tb, vb)...), nil } -func decodeInt64Block(block []byte, a *[]Value) error { +func DecodeInt64Block(block []byte, a *[]Value) error { // slice off the first 8 bytes (min timestmap for the block) block = block[8:] @@ -482,7 +482,7 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { return append(block, packBlock(tb, vb)...), nil } -func decodeStringBlock(block []byte, a *[]Value) error { +func DecodeStringBlock(block []byte, a *[]Value) error { // slice off the first 8 bytes (min timestmap for the block) block = block[8:] From e5022a898ded6b50514b613a013feab9fd13c6e0 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 16 Nov 2015 11:05:18 -0700 Subject: [PATCH 3/6] Support decoding into type specific slices There is a lot of allocations performed when decoding blocks. These types can be re-used to reduce allocations in many cases. This change allows a type specific slice to be passed in to decode funcs to be re-used if it is large enough. The existing decode is is left for backwards compatibility but is not very efficient right now. It may be removed. --- tsdb/engine/tsm1/cursor.go | 2 +- tsdb/engine/tsm1/data_file.go | 4 +- tsdb/engine/tsm1/encoding.go | 125 ++++++++---- tsdb/engine/tsm1/encoding_test.go | 309 +++++++++++++++++++++++++++++- tsdb/engine/tsm1/tsm1.go | 4 +- 5 files changed, 399 insertions(+), 45 deletions(-) diff --git a/tsdb/engine/tsm1/cursor.go b/tsdb/engine/tsm1/cursor.go index 6ec413d54a..9a055d2b79 100644 --- a/tsdb/engine/tsm1/cursor.go +++ b/tsdb/engine/tsm1/cursor.go @@ -474,7 +474,7 @@ func (c *cursor) decodeBlock(position uint32) { length := c.blockLength(position) block := c.f.mmap[position+blockHeaderSize : position+blockHeaderSize+length] c.vals = c.vals[:0] - _ = DecodeBlock(block, &c.vals) + c.vals, _ = DecodeBlock(block, c.vals) // only adavance the position if we're asceending. // Descending queries use the blockPositions diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index 2ee4474aea..03d225fa21 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -758,7 +758,7 @@ func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) { //TODO: Validate checksum var values []Value - err = DecodeBlock(b[4:n], &values) + values, err = DecodeBlock(b[4:n], values) if err != nil { return nil, err } @@ -804,7 +804,7 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) { //TODO: Validate checksum temp = temp[:0] - err = DecodeBlock(b[4:n], &temp) + temp, err = DecodeBlock(b[4:n], temp) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 08f529ca29..abc952143e 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -111,25 +111,56 @@ func BlockType(block []byte) (byte, error) { } // DecodeBlock takes a byte array and will decode into values of the appropriate type -// based on the block -func DecodeBlock(block []byte, vals *[]Value) error { +// based on the block. +func DecodeBlock(block []byte, vals []Value) ([]Value, error) { if len(block) <= encodedBlockHeaderSize { panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize)) } blockType, err := BlockType(block) if err != nil { - return err + return nil, err } switch blockType { case BlockFloat64: - return DecodeFloatBlock(block, vals) + decoded, err := DecodeFloatBlock(block, nil) + if len(vals) < len(decoded) { + vals = make([]Value, len(decoded)) + } + for i := range decoded { + vals[i] = decoded[i] + } + return vals, err case BlockInt64: - return DecodeInt64Block(block, vals) + decoded, err := DecodeInt64Block(block, nil) + if len(vals) < len(decoded) { + vals = make([]Value, len(decoded)) + } + for i := range decoded { + vals[i] = decoded[i] + } + return vals, err + case BlockBool: - return DecodeBoolBlock(block, vals) + decoded, err := DecodeBoolBlock(block, nil) + if len(vals) < len(decoded) { + vals = make([]Value, len(decoded)) + } + for i := range decoded { + vals[i] = decoded[i] + } + return vals, err + case BlockString: - return DecodeStringBlock(block, vals) + decoded, err := DecodeStringBlock(block, nil) + if len(vals) < len(decoded) { + vals = make([]Value, len(decoded)) + } + for i := range decoded { + vals[i] = decoded[i] + } + return vals, err + default: panic(fmt.Sprintf("unknown block type: %d", blockType)) } @@ -218,14 +249,14 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { return block, nil } -func DecodeFloatBlock(block []byte, a *[]Value) error { +func DecodeFloatBlock(block []byte, a []*FloatValue) ([]*FloatValue, error) { // The first 8 bytes is the minimum timestamp of the block block = block[8:] // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockFloat64 { - return fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType) + return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType) } block = block[1:] @@ -235,26 +266,33 @@ func DecodeFloatBlock(block []byte, a *[]Value) error { dec := NewTimeDecoder(tb) iter, err := NewFloatDecoder(vb) if err != nil { - return err + return nil, err } // Decode both a timestamp and value + i := 0 for dec.Next() && iter.Next() { ts := dec.Read() v := iter.Values() - *a = append(*a, &FloatValue{ts, v}) + if i < len(a) && a[i] != nil { + a[i].time = ts + a[i].value = v + } else { + a = append(a, &FloatValue{ts, v}) + } + i++ } // Did timestamp decoding have an error? if dec.Error() != nil { - return dec.Error() + return nil, dec.Error() } // Did float decoding have an error? if iter.Error() != nil { - return iter.Error() + return nil, iter.Error() } - return nil + return a[:i], nil } type BoolValue struct { @@ -315,14 +353,14 @@ func encodeBoolBlock(buf []byte, values []Value) ([]byte, error) { return block, nil } -func DecodeBoolBlock(block []byte, a *[]Value) error { +func DecodeBoolBlock(block []byte, a []*BoolValue) ([]*BoolValue, error) { // The first 8 bytes is the minimum timestamp of the block block = block[8:] // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockBool { - return fmt.Errorf("invalid block type: exp %d, got %d", BlockBool, blockType) + return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockBool, blockType) } block = block[1:] @@ -333,22 +371,29 @@ func DecodeBoolBlock(block []byte, a *[]Value) error { vdec := NewBoolDecoder(vb) // Decode both a timestamp and value + i := 0 for dec.Next() && vdec.Next() { ts := dec.Read() v := vdec.Read() - *a = append(*a, &BoolValue{ts, v}) + if i < len(a) && a[i] != nil { + a[i].time = ts + a[i].value = v + } else { + a = append(a, &BoolValue{ts, v}) + } + i++ } // Did timestamp decoding have an error? if dec.Error() != nil { - return dec.Error() + return nil, dec.Error() } // Did bool decoding have an error? if vdec.Error() != nil { - return vdec.Error() + return nil, vdec.Error() } - return nil + return a[:i], nil } type Int64Value struct { @@ -398,13 +443,13 @@ func encodeInt64Block(buf []byte, values []Value) ([]byte, error) { return append(block, packBlock(tb, vb)...), nil } -func DecodeInt64Block(block []byte, a *[]Value) error { +func DecodeInt64Block(block []byte, a []*Int64Value) ([]*Int64Value, error) { // slice off the first 8 bytes (min timestmap for the block) block = block[8:] blockType := block[0] if blockType != BlockInt64 { - return fmt.Errorf("invalid block type: exp %d, got %d", BlockInt64, blockType) + return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockInt64, blockType) } block = block[1:] @@ -417,22 +462,29 @@ func DecodeInt64Block(block []byte, a *[]Value) error { vDec := NewInt64Decoder(vb) // Decode both a timestamp and value + i := 0 for tsDec.Next() && vDec.Next() { ts := tsDec.Read() v := vDec.Read() - *a = append(*a, &Int64Value{ts, v}) + if i < len(a) && a[i] != nil { + a[i].time = ts + a[i].value = v + } else { + a = append(a, &Int64Value{ts, v}) + } + i++ } // Did timestamp decoding have an error? if tsDec.Error() != nil { - return tsDec.Error() + return nil, tsDec.Error() } // Did int64 decoding have an error? if vDec.Error() != nil { - return vDec.Error() + return nil, vDec.Error() } - return nil + return a[:i], nil } type StringValue struct { @@ -482,13 +534,13 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { return append(block, packBlock(tb, vb)...), nil } -func DecodeStringBlock(block []byte, a *[]Value) error { +func DecodeStringBlock(block []byte, a []*StringValue) ([]*StringValue, error) { // slice off the first 8 bytes (min timestmap for the block) block = block[8:] blockType := block[0] if blockType != BlockString { - return fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType) + return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType) } block = block[1:] @@ -500,26 +552,33 @@ func DecodeStringBlock(block []byte, a *[]Value) error { tsDec := NewTimeDecoder(tb) vDec, err := NewStringDecoder(vb) if err != nil { - return err + return nil, err } // Decode both a timestamp and value + i := 0 for tsDec.Next() && vDec.Next() { ts := tsDec.Read() v := vDec.Read() - *a = append(*a, &StringValue{ts, v}) + if i < len(a) && a[i] != nil { + a[i].time = ts + a[i].value = v + } else { + a = append(a, &StringValue{ts, v}) + } + i++ } // Did timestamp decoding have an error? if tsDec.Error() != nil { - return tsDec.Error() + return nil, tsDec.Error() } // Did string decoding have an error? if vDec.Error() != nil { - return vDec.Error() + return nil, vDec.Error() } - return nil + return a[:i], nil } func packBlockHeader(firstTime time.Time, blockType byte) []byte { diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index fd8b22b48f..8daca378e0 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -24,7 +24,8 @@ func TestEncoding_FloatBlock(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -45,7 +46,8 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -68,7 +70,8 @@ func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -91,7 +94,8 @@ func TestEncoding_IntBlock_Basic(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -129,7 +133,8 @@ func TestEncoding_IntBlock_Negatives(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -156,7 +161,8 @@ func TestEncoding_BoolBlock_Basic(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -179,7 +185,8 @@ func TestEncoding_StringBlock_Basic(t *testing.T) { } var decodedValues []tsm1.Value - if err := tsm1.DecodeBlock(b, &decodedValues); err != nil { + decodedValues, err = tsm1.DecodeBlock(b, decodedValues) + if err != nil { t.Fatalf("unexpected error decoding block: %v", err) } @@ -232,3 +239,291 @@ func getTimes(n, step int, precision time.Duration) []time.Time { } return a } + +func BenchmarkDecodeBlock_Float_Empty(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, float64(i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + var decodedValues []tsm1.Value + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Float_EqualSize(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, float64(i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]tsm1.Value, len(values)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, float64(i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]*tsm1.FloatValue, len(values)) + for i := 0; i < len(decodedValues); i++ { + decodedValues[i] = &tsm1.FloatValue{} + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeFloatBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Int64_Empty(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, int64(i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + var decodedValues []tsm1.Value + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Int64_EqualSize(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, int64(i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]tsm1.Value, len(values)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Int64_TypeSpecific(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, int64(i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]*tsm1.Int64Value, len(values)) + for i := 0; i < len(decodedValues); i++ { + decodedValues[i] = &tsm1.Int64Value{} + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeInt64Block(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Bool_Empty(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, true) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + var decodedValues []tsm1.Value + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Bool_EqualSize(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, true) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]tsm1.Value, len(values)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_Bool_TypeSpecific(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, true) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]*tsm1.BoolValue, len(values)) + for i := 0; i < len(decodedValues); i++ { + decodedValues[i] = &tsm1.BoolValue{} + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBoolBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_String_Empty(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + var decodedValues []tsm1.Value + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_String_EqualSize(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]tsm1.Value, len(values)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} + +func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make([]tsm1.Value, len(times)) + for i, t := range times { + values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i)) + } + + bytes, err := tsm1.Values(values).Encode(nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + decodedValues := make([]*tsm1.StringValue, len(values)) + for i := 0; i < len(decodedValues); i++ { + decodedValues[i] = &tsm1.StringValue{} + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = tsm1.DecodeStringBlock(bytes, decodedValues) + if err != nil { + b.Fatalf("unexpected error decoding block: %v", err) + } + } +} diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go index 59ddcd7043..12fbd81e6d 100644 --- a/tsdb/engine/tsm1/tsm1.go +++ b/tsdb/engine/tsm1/tsm1.go @@ -1642,7 +1642,7 @@ func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime } var values []Value - err := DecodeBlock(block, &values) + values, err := DecodeBlock(block, values) if err != nil { panic(fmt.Sprintf("failure decoding block: %v", err)) } @@ -1871,7 +1871,7 @@ func (c *compactionJob) writeIDFromFile(id uint64, previousValues Values, filePo // decode the block and append to previous values // TODO: update this so that blocks already at their limit don't need decoding var values []Value - err := DecodeBlock(block, &values) + values, err := DecodeBlock(block, values) if err != nil { panic(fmt.Sprintf("error decoding block: %s", err.Error())) } From a7d7c280edc5af677ef41c0d0f2f3c6a75b4fb12 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 16 Nov 2015 16:12:24 -0700 Subject: [PATCH 4/6] Add block type to index This will faciliate loading a block into a type specific result without first loading the block. This will also allow us to populate the database index solely from the index. --- tsdb/engine/tsm1/DESIGN.md | 12 +- tsdb/engine/tsm1/data_file.go | 270 +++++++++++++++++++---------- tsdb/engine/tsm1/data_file_test.go | 71 ++++++-- tsdb/engine/tsm1/encoding.go | 8 +- 4 files changed, 249 insertions(+), 112 deletions(-) diff --git a/tsdb/engine/tsm1/DESIGN.md b/tsdb/engine/tsm1/DESIGN.md index f1fc115b29..9f7ffe74a0 100644 --- a/tsdb/engine/tsm1/DESIGN.md +++ b/tsdb/engine/tsm1/DESIGN.md @@ -39,12 +39,12 @@ The index structure can provide efficient access to all blocks as well as the ab _TBD: The block length stored in the block data could probably be dropped since we store it in the index._ ``` -┌──────────────────────────────────────────────────────────────────────────┐ -│ Index │ -├─────────┬─────────┬───────┬─────────┬─────────┬─────────┬─────────┬──────┤ -│ Key Len │ Key │ Count │Min Time │Max Time │ Offset │ Size │ ... │ -│ 2 bytes │ N bytes │2 bytes│ 8 bytes │ 8 bytes │ 8 bytes │ 4 bytes │ │ -└─────────┴─────────┴───────┴─────────┴─────────┴─────────┴─────────┴──────┘ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Index │ +├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤ +│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│ +│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │ +└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘ ``` The last section is the footer that stores the offset of the start of the index. diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index 03d225fa21..df4ff97f2e 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -45,12 +45,12 @@ timestamp as well as where that block resides and how much data to read to retrieve the block. If we know we need to read all or multiple blocks in a file, we can use the size to determine how much to read in a given IO. -┌──────────────────────────────────────────────────────────────────────────┐ -│ Index │ -├─────────┬─────────┬───────┬─────────┬─────────┬─────────┬─────────┬──────┤ -│ Key Len │ Key │ Count │Min Time │Max Time │ Offset │ Size │ ... │ -│ 2 bytes │ N bytes │2 bytes│ 8 bytes │ 8 bytes │ 8 bytes │ 4 bytes │ │ -└─────────┴─────────┴───────┴─────────┴─────────┴─────────┴─────────┴──────┘ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Index │ +├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤ +│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│ +│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │ +└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘ The last section is the footer that stores the offset of the start of the index. @@ -86,6 +86,9 @@ const ( // Size in bytes used to store the count of index entries for a key indexCountSize = 2 + // Size in bytes used to store the type of block encoded + indexTypeSize = 1 + // Max number of blocks for a given key that can exist in a single file maxIndexEntries = (1 << (indexCountSize * 8)) - 1 ) @@ -113,7 +116,7 @@ type TSMWriter interface { type TSMIndex interface { // Add records a new block entry for a key in the index. - Add(key string, minTime, maxTime time.Time, offset int64, size uint32) + Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) // Delete removes the given key from the index. Delete(key string) @@ -136,6 +139,11 @@ type TSMIndex interface { // Keys returns the unique set of keys in the index. Keys() []string + // Type returns the block type of the values stored for the key. Returns one of + // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, + // an error is returned. + Type(key string) (byte, error) + // MarshalBinary returns a byte slice encoded version of the index. MarshalBinary() ([]byte, error) @@ -177,7 +185,7 @@ func (e *IndexEntry) Contains(t time.Time) bool { func NewDirectIndex() TSMIndex { return &directIndex{ - blocks: map[string]indexEntries{}, + blocks: map[string]*indexEntries{}, } } @@ -186,14 +194,21 @@ func NewDirectIndex() TSMIndex { type directIndex struct { mu sync.RWMutex - blocks map[string]indexEntries + blocks map[string]*indexEntries } -func (d *directIndex) Add(key string, minTime, maxTime time.Time, offset int64, size uint32) { +func (d *directIndex) Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) { d.mu.Lock() defer d.mu.Unlock() - d.blocks[key] = append(d.blocks[key], &IndexEntry{ + entries := d.blocks[key] + if entries == nil { + entries = &indexEntries{ + Type: blockType, + } + d.blocks[key] = entries + } + entries.Append(&IndexEntry{ MinTime: minTime, MaxTime: maxTime, Offset: offset, @@ -205,7 +220,11 @@ func (d *directIndex) Entries(key string) []*IndexEntry { d.mu.RLock() defer d.mu.RUnlock() - return d.blocks[key] + entries := d.blocks[key] + if entries == nil { + return nil + } + return d.blocks[key].entries } func (d *directIndex) Entry(key string, t time.Time) *IndexEntry { @@ -221,6 +240,16 @@ func (d *directIndex) Entry(key string, t time.Time) *IndexEntry { return nil } +func (d *directIndex) Type(key string) (byte, error) { + d.mu.RLock() + defer d.mu.RUnlock() + entries := d.blocks[key] + if entries != nil { + return entries.Type, nil + } + return 0, fmt.Errorf("key does not exist: %v", key) +} + func (d *directIndex) Contains(key string) bool { return len(d.Entries(key)) > 0 } @@ -248,8 +277,13 @@ func (d *directIndex) Keys() []string { return keys } -func (d *directIndex) addEntries(key string, entries indexEntries) { - d.blocks[key] = append(d.blocks[key], entries...) +func (d *directIndex) addEntries(key string, entries *indexEntries) { + existing := d.blocks[key] + if existing == nil { + d.blocks[key] = entries + return + } + existing.Append(entries.entries...) } func (d *directIndex) Write(w io.Writer) error { @@ -284,9 +318,9 @@ func (d *directIndex) MarshalBinary() ([]byte, error) { for _, key := range keys { entries := d.blocks[key] - if len(entries) > maxIndexEntries { + if entries.Len() > maxIndexEntries { return nil, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", - key, len(entries), maxIndexEntries) + key, entries.Len(), maxIndexEntries) } sort.Sort(entries) @@ -294,16 +328,18 @@ func (d *directIndex) MarshalBinary() ([]byte, error) { b = append(b, u16tob(uint16(len(key)))...) b = append(b, key...) + // Append the block type + b = append(b, entries.Type) + // Append the index block count - b = append(b, u16tob(uint16(len(entries)))...) + b = append(b, u16tob(uint16(entries.Len()))...) // Append each index entry for all blocks for this key - for _, entry := range entries { - b = append(b, u64tob(uint64(entry.MinTime.UnixNano()))...) - b = append(b, u64tob(uint64(entry.MaxTime.UnixNano()))...) - b = append(b, u64tob(uint64(entry.Offset))...) - b = append(b, u32tob(entry.Size)...) + eb, err := entries.MarshalBinary() + if err != nil { + return nil, err } + b = append(b, eb...) } return b, nil } @@ -314,13 +350,13 @@ func (d *directIndex) UnmarshalBinary(b []byte) error { var pos int for pos < len(b) { - n, key, err := d.readKey(b[pos:]) + n, key, err := readKey(b[pos:]) if err != nil { return fmt.Errorf("readIndex: read key error: %v", err) } - pos += n - n, entries, err := d.readEntries(b[pos:]) + + n, entries, err := readEntries(b[pos:]) if err != nil { return fmt.Errorf("readIndex: read entries error: %v", err) } @@ -331,31 +367,6 @@ func (d *directIndex) UnmarshalBinary(b []byte) error { return nil } -func (d *directIndex) readKey(b []byte) (n int, key string, err error) { - // 2 byte size of key - n, size := 2, int(btou16(b[:2])) - - // N byte key - key = string(b[n : n+size]) - n += len(key) - return -} - -func (d *directIndex) readEntries(b []byte) (n int, entries indexEntries, err error) { - // 2 byte count of index entries - n, count := indexCountSize, int(btou16(b[:2])) - - for i := 0; i < count; i++ { - ie := &IndexEntry{} - if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize : i*indexEntrySize+indexCountSize+indexEntrySize]); err != nil { - return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err) - } - entries = append(entries, ie) - n += indexEntrySize - } - return -} - // indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This // implementation can be used for indexes that may be MMAPed into memory. type indirectIndex struct { @@ -413,12 +424,11 @@ func NewIndirectIndex() TSMIndex { } // Add records a new block entry for a key in the index. -func (d *indirectIndex) Add(key string, minTime, maxTime time.Time, offset int64, size uint32) { +func (d *indirectIndex) Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) { panic("unsupported operation") } -// Entries returns all index entries for a key. -func (d *indirectIndex) Entries(key string) []*IndexEntry { +func (d *indirectIndex) search(key string) int { // We use a binary search across our indirect offsets (pointers to all the keys // in the index slice). i := sort.Search(len(d.offsets), func(i int) bool { @@ -438,7 +448,31 @@ func (d *indirectIndex) Entries(key string) []*IndexEntry { // See if we might have found the right index if i < len(d.offsets) { ofs := d.offsets[i] - n, k, err := d.readKey(d.b[ofs:]) + _, k, err := readKey(d.b[ofs:]) + if err != nil { + panic(fmt.Sprintf("error reading key: %v", err)) + } + + // The search may have returned an i == 0 which could indicated that the value + // searched should be inserted at postion 0. Make sure the key in the index + // matches the search value. + if k != key { + return len(d.offsets) + } + + return int(ofs) + } + + // The key is not in the index. i is the index where it would be inserted so return + // a value outside our offet range. + return len(d.offsets) +} + +// Entries returns all index entries for a key. +func (d *indirectIndex) Entries(key string) []*IndexEntry { + ofs := d.search(key) + if ofs < len(d.offsets) { + n, k, err := readKey(d.b[ofs:]) if err != nil { panic(fmt.Sprintf("error reading key: %v", err)) } @@ -451,14 +485,13 @@ func (d *indirectIndex) Entries(key string) []*IndexEntry { } // Read and return all the entries - ofs += int32(n) - _, entries, err := d.readEntries(d.b[ofs:]) + ofs += n + _, entries, err := readEntries(d.b[ofs:]) if err != nil { panic(fmt.Sprintf("error reading entries: %v", err)) } - return entries - + return entries.entries } // The key is not in the index. i is the index where it would be inserted. @@ -480,7 +513,7 @@ func (d *indirectIndex) Entry(key string, timestamp time.Time) *IndexEntry { func (d *indirectIndex) Keys() []string { var keys []string for offset := range d.offsets { - _, key, _ := d.readKey(d.b[offset:]) + _, key, _ := readKey(d.b[offset:]) keys = append(keys, key) } return keys @@ -489,7 +522,7 @@ func (d *indirectIndex) Keys() []string { func (d *indirectIndex) Delete(key string) { var offsets []int32 for offset := range d.offsets { - _, indexKey, _ := d.readKey(d.b[offset:]) + _, indexKey, _ := readKey(d.b[offset:]) if key == indexKey { continue } @@ -506,6 +539,20 @@ func (d *indirectIndex) ContainsValue(key string, timestamp time.Time) bool { return d.Entry(key, timestamp) != nil } +func (d *indirectIndex) Type(key string) (byte, error) { + ofs := d.search(key) + if ofs < len(d.offsets) { + n, _, err := readKey(d.b[ofs:]) + if err != nil { + panic(fmt.Sprintf("error reading key: %v", err)) + } + + ofs += n + return d.b[ofs], nil + } + return 0, fmt.Errorf("key does not exist: %v", key) +} + // MarshalBinary returns a byte slice encoded version of the index. func (d *indirectIndex) MarshalBinary() ([]byte, error) { return d.b, nil @@ -532,6 +579,9 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error { // Skip over the key i += keyLen + // Skip over the block type + i += indexTypeSize + // Count of all the index blocks for this key count := int32(btou16(b[i : i+2])) @@ -545,31 +595,6 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error { return nil } -func (d *indirectIndex) readKey(b []byte) (n int, key string, err error) { - // 2 byte size of key - n, size := 2, int(btou16(b[:2])) - - // N byte key - key = string(b[n : n+size]) - n += len(key) - return -} - -func (d *indirectIndex) readEntries(b []byte) (n int, entries indexEntries, err error) { - // 2 byte count of index entries - n, count := indexCountSize, int(btou16(b[:2])) - - for i := 0; i < count; i++ { - ie := &IndexEntry{} - if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize : i*indexEntrySize+indexCountSize+indexEntrySize]); err != nil { - return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err) - } - entries = append(entries, ie) - n += indexEntrySize - } - return -} - // tsmWriter writes keys and values in the TSM format type tsmWriter struct { w io.Writer @@ -584,7 +609,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) { } index := &directIndex{ - blocks: map[string]indexEntries{}, + blocks: map[string]*indexEntries{}, } return &tsmWriter{w: w, index: index, n: int64(n)}, nil @@ -603,8 +628,12 @@ func (t *tsmWriter) Write(key string, values Values) error { return err } + blockType, err := BlockType(block) + if err != nil { + return err + } // Record this block in index - t.index.Add(key, values[0].Time(), values[len(values)-1].Time(), t.n, uint32(n)) + t.index.Add(key, blockType, values[0].Time(), values[len(values)-1].Time(), t.n, uint32(n)) // Increment file position pointer t.n += int64(n) @@ -689,7 +718,7 @@ func (t *tsmReader) init() error { b = make([]byte, t.indexEnd-t.indexStart) t.index = &directIndex{ - blocks: map[string]indexEntries{}, + blocks: map[string]*indexEntries{}, } _, err = t.r.Read(b) if err != nil { @@ -814,6 +843,10 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) { return values, nil } +func (t *tsmReader) Type(key string) (byte, error) { + return t.index.Type(key) +} + func (t *tsmReader) Close() error { t.mu.Lock() defer t.mu.Unlock() @@ -844,11 +877,64 @@ func (t *tsmReader) Delete(key string) error { return nil } -type indexEntries []*IndexEntry +type indexEntries struct { + Type byte + entries []*IndexEntry +} -func (a indexEntries) Len() int { return len(a) } -func (a indexEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a indexEntries) Less(i, j int) bool { return a[i].MinTime.UnixNano() < a[j].MinTime.UnixNano() } +func (a *indexEntries) Len() int { return len(a.entries) } +func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] } +func (a *indexEntries) Less(i, j int) bool { + return a.entries[i].MinTime.UnixNano() < a.entries[j].MinTime.UnixNano() +} + +func (a *indexEntries) Append(entry ...*IndexEntry) { + a.entries = append(a.entries, entry...) +} + +func (a *indexEntries) MarshalBinary() (b []byte, err error) { + for _, entry := range a.entries { + b = append(b, u64tob(uint64(entry.MinTime.UnixNano()))...) + b = append(b, u64tob(uint64(entry.MaxTime.UnixNano()))...) + b = append(b, u64tob(uint64(entry.Offset))...) + b = append(b, u32tob(entry.Size)...) + } + return b, nil +} + +func readKey(b []byte) (n int, key string, err error) { + // 2 byte size of key + n, size := 2, int(btou16(b[:2])) + + // N byte key + key = string(b[n : n+size]) + n += len(key) + return +} + +func readEntries(b []byte) (n int, entries *indexEntries, err error) { + // 1 byte block type + blockType := b[n] + entries = &indexEntries{ + Type: blockType, + entries: []*IndexEntry{}, + } + n++ + + // 2 byte count of index entries + count := int(btou16(b[n : n+indexCountSize])) + n += indexCountSize + + for i := 0; i < count; i++ { + ie := &IndexEntry{} + if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize+indexTypeSize : i*indexEntrySize+indexCountSize+indexEntrySize+indexTypeSize]); err != nil { + return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err) + } + entries.Append(ie) + n += indexEntrySize + } + return +} func u16tob(v uint16) []byte { b := make([]byte, 2) diff --git a/tsdb/engine/tsm1/data_file_test.go b/tsdb/engine/tsm1/data_file_test.go index 4987b047ba..01202289e3 100644 --- a/tsdb/engine/tsm1/data_file_test.go +++ b/tsdb/engine/tsm1/data_file_test.go @@ -360,9 +360,9 @@ func TestTSMWriter_Read_Multiple(t *testing.T) { func TestIndirectIndex_Entries(t *testing.T) { index := tsm1.NewDirectIndex() - index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 100) - index.Add("cpu", time.Unix(2, 0), time.Unix(3, 0), 20, 200) - index.Add("mem", time.Unix(0, 0), time.Unix(1, 0), 10, 100) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 100) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(2, 0), time.Unix(3, 0), 20, 200) + index.Add("mem", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 100) b, err := index.MarshalBinary() if err != nil { @@ -402,8 +402,8 @@ func TestIndirectIndex_Entries(t *testing.T) { func TestIndirectIndex_Entries_NonExistent(t *testing.T) { index := tsm1.NewDirectIndex() - index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 100) - index.Add("cpu", time.Unix(2, 0), time.Unix(3, 0), 20, 200) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 100) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(2, 0), time.Unix(3, 0), 20, 200) b, err := index.MarshalBinary() if err != nil { @@ -428,7 +428,7 @@ func TestIndirectIndex_Entries_NonExistent(t *testing.T) { func TestIndirectIndex_MaxBlocks(t *testing.T) { index := tsm1.NewDirectIndex() for i := 0; i < 1<<16; i++ { - index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 20) } if _, err := index.MarshalBinary(); err == nil { @@ -438,11 +438,32 @@ func TestIndirectIndex_MaxBlocks(t *testing.T) { } } +func TestIndirectIndex_Type(t *testing.T) { + index := tsm1.NewDirectIndex() + index.Add("cpu", tsm1.BlockInt64, time.Unix(0, 0), time.Unix(1, 0), 10, 20) + + b, err := index.MarshalBinary() + + ind := tsm1.NewIndirectIndex() + if err := ind.UnmarshalBinary(b); err != nil { + fatal(t, "unmarshal binary", err) + } + + typ, err := ind.Type("cpu") + if err != nil { + fatal(t, "reading type", err) + } + + if got, exp := typ, tsm1.BlockInt64; got != exp { + t.Fatalf("type mismatch: got %v, exp %v", got, exp) + } +} + func TestIndirectIndex_Keys(t *testing.T) { index := tsm1.NewDirectIndex() - index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20) - index.Add("mem", time.Unix(0, 0), time.Unix(1, 0), 10, 20) - index.Add("cpu", time.Unix(1, 0), time.Unix(2, 0), 20, 30) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 20) + index.Add("mem", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 20) + index.Add("cpu", tsm1.BlockFloat64, time.Unix(1, 0), time.Unix(2, 0), 20, 30) keys := index.Keys() @@ -459,5 +480,35 @@ func TestIndirectIndex_Keys(t *testing.T) { if got, exp := keys[1], "mem"; got != exp { t.Fatalf("key mismatch: got %v, exp %v", got, exp) } - +} + +func TestTSMWriter_Type(t *testing.T) { + var b bytes.Buffer + w, err := tsm1.NewTSMWriter(&b) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values := []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), int64(1))} + if err := w.Write("cpu", values); err != nil { + t.Fatalf("unexpeted error writing: %v", err) + + } + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpeted error closing: %v", err) + } + + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + typ, err := r.Type("cpu") + if err != nil { + fatal(t, "reading type", err) + } + + if got, exp := typ, tsm1.BlockInt64; got != exp { + t.Fatalf("type mismatch: got %v, exp %v", got, exp) + } } diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index abc952143e..e2db58f410 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -11,16 +11,16 @@ import ( const ( // BlockFloat64 designates a block encodes float64 values - BlockFloat64 = 0 + BlockFloat64 = byte(0) // BlockInt64 designates a block encodes int64 values - BlockInt64 = 1 + BlockInt64 = byte(1) // BlockBool designates a block encodes bool values - BlockBool = 2 + BlockBool = byte(2) // BlockString designates a block encodes string values - BlockString = 3 + BlockString = byte(3) // encodedBlockHeaderSize is the size of the header for an encoded block. The first 8 bytes // are the minimum timestamp of the block. The next byte is a block encoding type indicator. From c4a6490d322e466bbb66839b73788bd9a5aaee56 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 17 Nov 2015 12:30:09 -0700 Subject: [PATCH 5/6] Update influx_inspect to use new DecodeBlock interface --- cmd/influx_inspect/tsm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go index 6b82b8e6fc..66788b877e 100644 --- a/cmd/influx_inspect/tsm.go +++ b/cmd/influx_inspect/tsm.go @@ -354,7 +354,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { encoded := buf[9:] var v []tsm1.Value - err := tsm1.DecodeBlock(buf, &v) + v, err := tsm1.DecodeBlock(buf, v) if err != nil { fmt.Printf("error: %v\n", err.Error()) os.Exit(1) From 0d1508a7c658b0273f914909825be8d4e0497b2f Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 17 Nov 2015 23:23:16 -0700 Subject: [PATCH 6/6] Add comments for search --- tsdb/engine/tsm1/data_file.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index df4ff97f2e..86a5d33776 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -428,6 +428,8 @@ func (d *indirectIndex) Add(key string, blockType byte, minTime, maxTime time.Ti panic("unsupported operation") } +// search returns the index of i in offsets for where key is located. If key is not +// in the index, len(offsets) is returned. func (d *indirectIndex) search(key string) int { // We use a binary search across our indirect offsets (pointers to all the keys // in the index slice). @@ -464,7 +466,7 @@ func (d *indirectIndex) search(key string) int { } // The key is not in the index. i is the index where it would be inserted so return - // a value outside our offet range. + // a value outside our offset range. return len(d.offsets) }