Merge pull request #7348 from influxdata/jw-write-allocs

Reduce allocations in write path/compactions
pull/7345/head
Jason Wilder 2016-09-26 12:29:31 -06:00 committed by GitHub
commit 6a604c2928
19 changed files with 430 additions and 119 deletions

View File

@ -171,7 +171,7 @@ func ParseKey(buf []byte) (string, Tags, error) {
// ParsePointsWithPrecision is similar to ParsePoints, but allows the
// caller to provide a precision for time.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
points := []Point{}
points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
var (
pos int
block []byte

View File

@ -96,6 +96,19 @@ func BenchmarkNewPoint(b *testing.B) {
}
}
// BenchmarkParsePointNoTags5000 measures parsing a single batch of 5000
// newline-separated, tag-less points in one ParsePoints call.
func BenchmarkParsePointNoTags5000(b *testing.B) {
	var batch [5000]string
	for i := range batch {
		batch[i] = `cpu value=1i 1000000000`
	}
	lines := strings.Join(batch[:], "\n")

	// The per-operation byte count is constant, so record it once during
	// setup instead of on every iteration of the timed loop.
	b.SetBytes(int64(len(lines)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		models.ParsePoints([]byte(lines))
	}
}
func BenchmarkParsePointNoTags(b *testing.B) {
line := `cpu value=1i 1000000000`
for i := 0; i < b.N; i++ {

42
pkg/pool/bytes.go Normal file
View File

@ -0,0 +1,42 @@
package pool
// Bytes is a fixed-capacity pool of reusable byte slices. The pool keeps a
// reference to every slice it holds, so pooled slices are not garbage
// collected while they sit unused.
type Bytes struct {
	pool chan []byte
}

// NewBytes creates a Bytes pool that can hold up to max byte slices.
func NewBytes(max int) *Bytes {
	p := new(Bytes)
	p.pool = make(chan []byte, max)
	return p
}

// Get returns a byte slice of length sz. A pooled slice is re-used when one
// is available and its capacity is at least sz; otherwise a new slice is
// allocated. A pooled slice that is too small is dropped rather than put
// back. Re-used slices keep their previous contents, so callers must reset
// them if they need zeroed memory.
func (p *Bytes) Get(sz int) []byte {
	select {
	case buf := <-p.pool:
		if cap(buf) >= sz {
			return buf[:sz]
		}
	default:
		// Nothing pooled; fall through to a fresh allocation.
	}
	return make([]byte, sz)
}

// Put offers c back to the pool without blocking. When the pool is already
// full, c is discarded.
func (p *Bytes) Put(c []byte) {
	select {
	case p.pool <- c:
		// Kept for a later Get.
	default:
		// Pool is full; let c be collected.
	}
}

40
pkg/pool/generic.go Normal file
View File

@ -0,0 +1,40 @@
package pool
// Generic is a fixed-capacity pool of reusable values of any type. The pool
// keeps a reference to every item it holds, so pooled items are not garbage
// collected while they sit unused.
type Generic struct {
	pool chan interface{}
	fn   func(sz int) interface{}
}

// NewGeneric creates a Generic pool that can hold up to max items. fn is
// called to build a new item whenever Get finds the pool empty.
func NewGeneric(max int, fn func(sz int) interface{}) *Generic {
	g := new(Generic)
	g.pool = make(chan interface{}, max)
	g.fn = fn
	return g
}

// Get returns a pooled item if one is available; otherwise it returns a new
// item built by calling the constructor function with sz. sz is not applied
// to pooled items. Returned items may carry state from earlier use and
// should be reset by the caller.
func (p *Generic) Get(sz int) interface{} {
	select {
	case item := <-p.pool:
		return item
	default:
		return p.fn(sz)
	}
}

// Put offers c back to the pool without blocking. When the pool is already
// full, c is discarded.
func (p *Generic) Put(c interface{}) {
	select {
	case p.pool <- c:
		// Kept for a later Get.
	default:
		// Pool is full; let c be collected.
	}
}

View File

@ -35,8 +35,17 @@ type BooleanEncoder struct {
}
// NewBooleanEncoder returns a new instance of BooleanEncoder.
func NewBooleanEncoder() BooleanEncoder {
return BooleanEncoder{}
func NewBooleanEncoder(sz int) BooleanEncoder {
return BooleanEncoder{
bytes: make([]byte, 0, (sz+7)/8),
}
}
// Reset clears the encoder's counters and truncates its output to zero
// length while keeping the allocated backing array, so the encoder can be
// re-used for another block.
func (e *BooleanEncoder) Reset() {
	e.b, e.i, e.n = 0, 0, 0
	e.bytes = e.bytes[:0]
}
func (e *BooleanEncoder) Write(b bool) {

View File

@ -9,7 +9,7 @@ import (
)
func Test_BooleanEncoder_NoValues(t *testing.T) {
enc := tsm1.NewBooleanEncoder()
enc := tsm1.NewBooleanEncoder(0)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -23,7 +23,7 @@ func Test_BooleanEncoder_NoValues(t *testing.T) {
}
func Test_BooleanEncoder_Single(t *testing.T) {
enc := tsm1.NewBooleanEncoder()
enc := tsm1.NewBooleanEncoder(1)
v1 := true
enc.Write(v1)
b, err := enc.Bytes()
@ -43,7 +43,7 @@ func Test_BooleanEncoder_Single(t *testing.T) {
}
func Test_BooleanEncoder_Multi_Compressed(t *testing.T) {
enc := tsm1.NewBooleanEncoder()
enc := tsm1.NewBooleanEncoder(10)
values := make([]bool, 10)
for i := range values {
@ -84,7 +84,7 @@ func Test_BooleanEncoder_Quick(t *testing.T) {
expected = []bool{}
}
// Write values to encoder.
enc := tsm1.NewBooleanEncoder()
enc := tsm1.NewBooleanEncoder(1024)
for _, v := range values {
enc.Write(v)
}
@ -134,7 +134,7 @@ func Test_BooleanDecoder_Corrupt(t *testing.T) {
func BenchmarkBooleanDecoder_2048(b *testing.B) { benchmarkBooleanDecoder(b, 2048) }
func benchmarkBooleanDecoder(b *testing.B, size int) {
e := tsm1.NewBooleanEncoder()
e := tsm1.NewBooleanEncoder(size)
for i := 0; i < size; i++ {
e.Write(i&1 == 1)
}

View File

@ -29,7 +29,9 @@ type entry struct {
// newEntry returns a new instance of entry.
func newEntry() *entry {
return &entry{}
return &entry{
values: make(Values, 0, 32),
}
}
// add adds the given values to the entry.
@ -261,7 +263,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
// If no snapshot exists, create a new one, otherwise update the existing snapshot
if c.snapshot == nil {
c.snapshot = &Cache{
store: make(map[string]*entry),
store: make(map[string]*entry, len(c.store)),
}
}
@ -282,8 +284,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
snapshotSize := c.size // record the number of bytes written into a snapshot
// Reset the cache
c.store = make(map[string]*entry)
c.store = make(map[string]*entry, len(c.store))
c.size = 0
c.lastSnapshot = time.Now()

View File

@ -21,6 +21,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/pkg/pool"
"github.com/influxdata/influxdb/tsdb"
)
@ -834,6 +835,9 @@ type tsmKeyIterator struct {
buf []blocks
// freeBytes are []byte allocated, and free to be re-used
freeBytes *pool.Bytes
// mergedValues are decoded blocks that have been combined
mergedValues Values
@ -898,12 +902,14 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
iterators: iter,
fast: fast,
buf: make([]blocks, len(iter)),
freeBytes: pool.NewBytes(1024),
}, nil
}
func (k *tsmKeyIterator) Next() bool {
// Any merged blocks pending?
if len(k.merged) > 0 {
k.freeBytes.Put(k.merged[0].b)
k.merged = k.merged[1:]
if len(k.merged) > 0 {
return true
@ -1163,7 +1169,8 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
func (k *tsmKeyIterator) chunk(dst blocks) blocks {
for len(k.mergedValues) > k.size {
values := k.mergedValues[:k.size]
cb, err := Values(values).Encode(nil)
buf := k.freeBytes.Get(16 * 1024)
cb, err := Values(values).Encode(buf)
if err != nil {
k.err = err
return nil
@ -1181,7 +1188,8 @@ func (k *tsmKeyIterator) chunk(dst blocks) blocks {
// Re-encode the remaining values into the last block
if len(k.mergedValues) > 0 {
cb, err := Values(k.mergedValues).Encode(nil)
buf := k.freeBytes.Get(16 * 1024)
cb, err := Values(k.mergedValues).Encode(buf)
if err != nil {
k.err = err
return nil

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/pkg/pool"
"github.com/influxdata/influxdb/tsdb"
)
@ -27,6 +28,24 @@ const (
encodedBlockHeaderSize = 1
)
// Package-level pools of block encoders, one per encoder type. Each pool
// holds at most 1024 idle encoders. The constructor function is only called
// when a pool is empty, and sz is passed through as the new encoder's size
// argument.
var (
	// timeEncoderPool holds TimeEncoder values built by NewTimeEncoder.
	timeEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
		return NewTimeEncoder(sz)
	})
	// integerEncoderPool holds IntegerEncoder values built by NewIntegerEncoder.
	integerEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
		return NewIntegerEncoder(sz)
	})
	// floatEncoderPool holds *FloatEncoder values. NewFloatEncoder takes no
	// size argument, so sz is ignored here.
	floatEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
		return NewFloatEncoder()
	})
	// stringEncoderPool holds StringEncoder values built by NewStringEncoder.
	stringEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
		return NewStringEncoder(sz)
	})
	// booleanEncoderPool holds BooleanEncoder values built by NewBooleanEncoder.
	booleanEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
		return NewBooleanEncoder(sz)
	})
)
type Value interface {
UnixNano() int64
Value() interface{}
@ -220,11 +239,11 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
// for timestamps and values.
// Encode values using Gorilla float compression
venc := NewFloatEncoder()
venc := getFloatEncoder()
// Encode timestamps using an adaptive encoder that uses delta-encoding,
// frame-or-reference and run length encoding.
tsenc := NewTimeEncoder()
tsenc := getTimeEncoder(len(values))
for _, v := range values {
tsenc.Write(v.UnixNano())
@ -235,19 +254,25 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
putTimeEncoder(tsenc)
putFloatEncoder(venc)
return nil, err
}
// Encoded float values
vb, err := venc.Bytes()
if err != nil {
putTimeEncoder(tsenc)
putFloatEncoder(venc)
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
block := packBlockHeader(BlockFloat64)
block = append(block, packBlock(tb, vb)...)
return block, nil
b := packBlock(buf, BlockFloat64, tb, vb)
putTimeEncoder(tsenc)
putFloatEncoder(venc)
return b, nil
}
func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]FloatValue) ([]FloatValue, error) {
@ -324,12 +349,10 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
// A boolean block is encoded using different compression strategies
// for timestamps and values.
// Encode values using Gorilla float compression
venc := NewBooleanEncoder()
venc := getBooleanEncoder(len(values))
// Encode timestamps using an adaptive encoder
tsenc := NewTimeEncoder()
tsenc := getTimeEncoder(len(values))
for _, v := range values {
tsenc.Write(v.UnixNano())
@ -339,19 +362,24 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
putTimeEncoder(tsenc)
putBooleanEncoder(venc)
return nil, err
}
// Encoded boolean values
vb, err := venc.Bytes()
if err != nil {
putTimeEncoder(tsenc)
putBooleanEncoder(venc)
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
block := packBlockHeader(BlockBoolean)
block = append(block, packBlock(tb, vb)...)
return block, nil
b := packBlock(buf, BlockBoolean, tb, vb)
putTimeEncoder(tsenc)
putBooleanEncoder(venc)
return b, nil
}
func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a *[]BooleanValue) ([]BooleanValue, error) {
@ -420,8 +448,9 @@ func (f *IntegerValue) String() string {
}
func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
tsEnc := NewTimeEncoder()
vEnc := NewIntegerEncoder()
tsEnc := getTimeEncoder(len(values))
vEnc := getIntegerEncoder(len(values))
for _, v := range values {
tsEnc.Write(v.UnixNano())
vEnc.Write(v.(*IntegerValue).value)
@ -430,17 +459,23 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
// Encoded timestamp values
tb, err := tsEnc.Bytes()
if err != nil {
putTimeEncoder(tsEnc)
putIntegerEncoder(vEnc)
return nil, err
}
// Encoded int64 values
vb, err := vEnc.Bytes()
if err != nil {
putTimeEncoder(tsEnc)
putIntegerEncoder(vEnc)
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes
block := packBlockHeader(BlockInteger)
return append(block, packBlock(tb, vb)...), nil
b := packBlock(buf, BlockInteger, tb, vb)
putTimeEncoder(tsEnc)
putIntegerEncoder(vEnc)
return b, nil
}
func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a *[]IntegerValue) ([]IntegerValue, error) {
@ -510,8 +545,9 @@ func (f *StringValue) String() string {
}
func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
tsEnc := NewTimeEncoder()
vEnc := NewStringEncoder()
tsEnc := getTimeEncoder(len(values))
vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value))
for _, v := range values {
tsEnc.Write(v.UnixNano())
vEnc.Write(v.(*StringValue).value)
@ -520,17 +556,23 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
// Encoded timestamp values
tb, err := tsEnc.Bytes()
if err != nil {
putTimeEncoder(tsEnc)
putStringEncoder(vEnc)
return nil, err
}
// Encoded string values
vb, err := vEnc.Bytes()
if err != nil {
putTimeEncoder(tsEnc)
putStringEncoder(vEnc)
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes
block := packBlockHeader(BlockString)
return append(block, packBlock(tb, vb)...), nil
b := packBlock(buf, BlockString, tb, vb)
putTimeEncoder(tsEnc)
putStringEncoder(vEnc)
return b, nil
}
func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *[]StringValue) ([]StringValue, error) {
@ -580,22 +622,24 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *
return (*a)[:i], nil
}
func packBlockHeader(blockType byte) []byte {
return []byte{blockType}
}
func packBlock(ts []byte, values []byte) []byte {
func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte {
// We encode the length of the timestamp block using a variable byte encoding.
// This allows small byte slices to take up 1 byte while larger ones use 2 or more.
b := make([]byte, 10)
i := binary.PutUvarint(b, uint64(len(ts)))
sz := 1 + 10 + len(ts) + len(values)
if cap(buf) < sz {
buf = make([]byte, sz)
}
b := buf[:sz]
b[0] = typ
i := binary.PutUvarint(b[1:10], uint64(len(ts)))
i += 1
// block is <len timestamp bytes>, <ts bytes>, <value bytes>
block := append(b[:i], ts...)
copy(b[i:], ts)
// We don't encode the value length because we know it's the rest of the block after
// the timestamp block.
return append(block, values...)
copy(b[i+len(ts):], values)
return b[:i+len(ts)+len(values)]
}
func unpackBlock(buf []byte) (ts, values []byte, err error) {
@ -629,3 +673,37 @@ func ZigZagEncode(x int64) uint64 {
func ZigZagDecode(v uint64) int64 {
return int64((v >> 1) ^ uint64((int64(v&1)<<63)>>63))
}
// getTimeEncoder takes a TimeEncoder from timeEncoderPool and resets it
// before returning it. sz is forwarded to the pool, which uses it only when
// it has to construct a new encoder.
func getTimeEncoder(sz int) TimeEncoder {
	enc := timeEncoderPool.Get(sz).(TimeEncoder)
	enc.Reset()
	return enc
}
// putTimeEncoder hands enc back to timeEncoderPool for later re-use.
func putTimeEncoder(enc TimeEncoder) {
	timeEncoderPool.Put(enc)
}
// getIntegerEncoder takes an IntegerEncoder from integerEncoderPool and
// resets it before returning it. sz is forwarded to the pool, which uses it
// only when it has to construct a new encoder.
func getIntegerEncoder(sz int) IntegerEncoder {
	enc := integerEncoderPool.Get(sz).(IntegerEncoder)
	enc.Reset()
	return enc
}
// putIntegerEncoder hands enc back to integerEncoderPool for later re-use.
func putIntegerEncoder(enc IntegerEncoder) {
	integerEncoderPool.Put(enc)
}
// getFloatEncoder takes a *FloatEncoder from floatEncoderPool and resets it
// before returning it. The 1024 passed to the pool is a placeholder: the
// float pool's constructor does not use the size argument.
func getFloatEncoder() *FloatEncoder {
	enc := floatEncoderPool.Get(1024).(*FloatEncoder)
	enc.Reset()
	return enc
}
// putFloatEncoder hands enc back to floatEncoderPool for later re-use.
func putFloatEncoder(enc *FloatEncoder) {
	floatEncoderPool.Put(enc)
}
// getStringEncoder takes a StringEncoder from stringEncoderPool and resets
// it before returning it. sz is forwarded to the pool, which uses it only
// when it has to construct a new encoder.
func getStringEncoder(sz int) StringEncoder {
	enc := stringEncoderPool.Get(sz).(StringEncoder)
	enc.Reset()
	return enc
}
// putStringEncoder hands enc back to stringEncoderPool for later re-use.
func putStringEncoder(enc StringEncoder) {
	stringEncoderPool.Put(enc)
}
// getBooleanEncoder takes a BooleanEncoder from booleanEncoderPool and
// resets it before returning it. sz is forwarded to the pool, which uses it
// only when it has to construct a new encoder.
func getBooleanEncoder(sz int) BooleanEncoder {
	enc := booleanEncoderPool.Get(sz).(BooleanEncoder)
	enc.Reset()
	return enc
}
// putBooleanEncoder hands enc back to booleanEncoderPool for later re-use.
func putBooleanEncoder(enc BooleanEncoder) {
	booleanEncoderPool.Put(enc)
}

View File

@ -1340,3 +1340,69 @@ func BenchmarkValues_Merge(b *testing.B) {
tsm1.Values(a).Merge(c)
}
}
// BenchmarkValues_EncodeInteger measures encoding a block of 1024 integer
// values into a caller-supplied buffer.
func BenchmarkValues_EncodeInteger(b *testing.B) {
	valueCount := 1024
	times := getTimes(valueCount, 60, time.Second)
	a := make([]tsm1.Value, len(times))
	for i, t := range times {
		a[i] = tsm1.NewValue(t, int64(i))
	}

	buf := make([]byte, 1024*8)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Fail loudly: timing a failed encode would be meaningless.
		if _, err := tsm1.Values(a).Encode(buf); err != nil {
			b.Fatalf("unexpected error: %v", err)
		}
	}
}
// BenchmarkValues_EncodeFloat measures encoding a block of 1024 float
// values into a caller-supplied buffer.
func BenchmarkValues_EncodeFloat(b *testing.B) {
	valueCount := 1024
	times := getTimes(valueCount, 60, time.Second)
	a := make([]tsm1.Value, len(times))
	for i, t := range times {
		a[i] = tsm1.NewValue(t, float64(i))
	}

	buf := make([]byte, 1024*8)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Fail loudly: timing a failed encode would be meaningless.
		if _, err := tsm1.Values(a).Encode(buf); err != nil {
			b.Fatalf("unexpected error: %v", err)
		}
	}
}
// BenchmarkValues_EncodeString measures encoding a block of 1024 short
// string values into a caller-supplied buffer.
func BenchmarkValues_EncodeString(b *testing.B) {
	valueCount := 1024
	times := getTimes(valueCount, 60, time.Second)
	a := make([]tsm1.Value, len(times))
	for i, t := range times {
		a[i] = tsm1.NewValue(t, fmt.Sprintf("%d", i))
	}

	buf := make([]byte, 1024*8)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Fail loudly: timing a failed encode would be meaningless.
		if _, err := tsm1.Values(a).Encode(buf); err != nil {
			b.Fatalf("unexpected error: %v", err)
		}
	}
}
// BenchmarkValues_EncodeBool measures encoding a block of 1024 boolean
// values, alternating true and false, into a caller-supplied buffer.
func BenchmarkValues_EncodeBool(b *testing.B) {
	valueCount := 1024
	times := getTimes(valueCount, 60, time.Second)
	a := make([]tsm1.Value, len(times))
	for i, t := range times {
		// Even indexes are true, odd indexes are false.
		a[i] = tsm1.NewValue(t, i%2 == 0)
	}

	buf := make([]byte, 1024*8)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Fail loudly: timing a failed encode would be meaningless.
		if _, err := tsm1.Values(a).Encode(buf); err != nil {
			b.Fatalf("unexpected error: %v", err)
		}
	}
}

View File

@ -637,6 +637,10 @@ func BenchmarkEngine_WritePoints_1000(b *testing.B) {
benchmarkEngine_WritePoints(b, 1000)
}
// BenchmarkEngine_WritePoints_5000 runs the shared WritePoints benchmark
// with a batch size of 5000 points.
func BenchmarkEngine_WritePoints_5000(b *testing.B) {
	benchmarkEngine_WritePoints(b, 5000)
}
func benchmarkEngine_WritePoints(b *testing.B, batchSize int) {
e := MustOpenEngine()
defer e.Close()
@ -644,9 +648,9 @@ func benchmarkEngine_WritePoints(b *testing.B, batchSize int) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
p := MustParsePointString("cpu value=1.2")
pp := make([]models.Point, 0, batchSize)
for i := 0; i < batchSize; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
pp = append(pp, p)
}

View File

@ -52,13 +52,28 @@ func NewFloatEncoder() *FloatEncoder {
}
s.bw = bitstream.NewWriter(&s.buf)
s.buf.WriteByte(floatCompressedGorilla << 4)
return &s
}
// Reset clears the encoder's state so it can be re-used to encode another
// block without allocating a new FloatEncoder.
func (s *FloatEncoder) Reset() {
	s.val = 0
	s.err = nil
	// All bits set.
	// NOTE(review): looks like a "no previous value" sentinel; confirm against Push.
	s.leading = ^uint64(0)
	s.trailing = 0
	// Empty the output buffer, then write the encoding-type header byte again,
	// the same byte NewFloatEncoder writes when it creates the buffer.
	s.buf.Reset()
	s.buf.WriteByte(floatCompressedGorilla << 4)

	// NOTE(review): presumably restarts the bit writer on a fresh byte
	// boundary after the header; confirm against bitstream.Writer.Resume.
	s.bw.Resume(0x0, 8)

	s.finished = false
	s.first = true
}
func (s *FloatEncoder) Bytes() ([]byte, error) {
return append([]byte{floatCompressedGorilla << 4}, s.buf.Bytes()...), s.err
return s.buf.Bytes(), s.err
}
func (s *FloatEncoder) Finish() {

View File

@ -43,8 +43,17 @@ type IntegerEncoder struct {
values []uint64
}
func NewIntegerEncoder() IntegerEncoder {
return IntegerEncoder{rle: true}
func NewIntegerEncoder(sz int) IntegerEncoder {
return IntegerEncoder{
rle: true,
values: make([]uint64, 0, sz),
}
}
// Reset returns the encoder to its initial state: no buffered values, a zero
// previous value, and rle set to true as in NewIntegerEncoder. The values
// slice keeps its backing array so it can be re-used.
func (e *IntegerEncoder) Reset() {
	e.values = e.values[:0]
	e.rle = true
	e.prev = 0
}
func (e *IntegerEncoder) Write(v int64) {
@ -77,8 +86,9 @@ func (e *IntegerEncoder) Bytes() ([]byte, error) {
}
func (e *IntegerEncoder) encodeRLE() ([]byte, error) {
// Large varints can take up to 10 bytes
b := make([]byte, 1+10*3)
// Large varints can take up to 10 bytes. We're storing 3 + 1
// type byte.
var b [31]byte
// 4 high bits used for the encoding type
b[0] = byte(intCompressedRLE) << 4

View File

@ -9,7 +9,7 @@ import (
)
func Test_IntegerEncoder_NoValues(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(0)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -27,7 +27,7 @@ func Test_IntegerEncoder_NoValues(t *testing.T) {
}
func Test_IntegerEncoder_One(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(1)
v1 := int64(1)
enc.Write(1)
@ -52,7 +52,7 @@ func Test_IntegerEncoder_One(t *testing.T) {
}
func Test_IntegerEncoder_Two(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(2)
var v1, v2 int64 = 1, 2
enc.Write(v1)
@ -87,7 +87,7 @@ func Test_IntegerEncoder_Two(t *testing.T) {
}
func Test_IntegerEncoder_Negative(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(3)
var v1, v2, v3 int64 = -2, 0, 1
enc.Write(v1)
@ -131,7 +131,7 @@ func Test_IntegerEncoder_Negative(t *testing.T) {
}
func Test_IntegerEncoder_Large_Range(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(2)
var v1, v2 int64 = math.MinInt64, math.MaxInt64
enc.Write(v1)
enc.Write(v2)
@ -164,7 +164,7 @@ func Test_IntegerEncoder_Large_Range(t *testing.T) {
}
func Test_IntegerEncoder_Uncompressed(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(3)
var v1, v2, v3 int64 = 0, 1, 1 << 60
enc.Write(v1)
@ -223,7 +223,7 @@ func Test_IntegerEncoder_NegativeUncompressed(t *testing.T) {
2761419461769776844, -1324397441074946198, -680758138988210958,
94468846694902125, -2394093124890745254, -2682139311758778198,
}
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(256)
for _, v := range values {
enc.Write(v)
}
@ -258,7 +258,7 @@ func Test_IntegerEncoder_NegativeUncompressed(t *testing.T) {
}
func Test_IntegerEncoder_AllNegative(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(3)
values := []int64{
-10, -5, -1,
}
@ -296,7 +296,7 @@ func Test_IntegerEncoder_AllNegative(t *testing.T) {
}
func Test_IntegerEncoder_CounterPacked(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(16)
values := []int64{
1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 6,
}
@ -340,7 +340,7 @@ func Test_IntegerEncoder_CounterPacked(t *testing.T) {
}
func Test_IntegerEncoder_CounterRLE(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(16)
values := []int64{
1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 5,
}
@ -384,7 +384,7 @@ func Test_IntegerEncoder_CounterRLE(t *testing.T) {
}
func Test_IntegerEncoder_MinMax(t *testing.T) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(2)
values := []int64{
math.MinInt64, math.MaxInt64,
}
@ -433,7 +433,7 @@ func Test_IntegerEncoder_Quick(t *testing.T) {
}
// Write values to encoder.
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(1024)
for _, v := range values {
enc.Write(v)
}
@ -484,7 +484,7 @@ func Test_IntegerDecoder_Corrupt(t *testing.T) {
}
func BenchmarkIntegerEncoderRLE(b *testing.B) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(1024)
x := make([]int64, 1024)
for i := 0; i < len(x); i++ {
x[i] = int64(i)
@ -498,7 +498,7 @@ func BenchmarkIntegerEncoderRLE(b *testing.B) {
}
func BenchmarkIntegerEncoderPackedSimple(b *testing.B) {
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(1024)
x := make([]int64, 1024)
for i := 0; i < len(x); i++ {
// Small amount of randomness prevents RLE from being used
@ -518,7 +518,7 @@ type byteSetter interface {
func BenchmarkIntegerDecoderPackedSimple(b *testing.B) {
x := make([]int64, 1024)
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(1024)
for i := 0; i < len(x); i++ {
// Small amount of randomness prevents RLE from being used
x[i] = int64(i) + int64(rand.Intn(10))
@ -538,7 +538,7 @@ func BenchmarkIntegerDecoderPackedSimple(b *testing.B) {
func BenchmarkIntegerDecoderRLE(b *testing.B) {
x := make([]int64, 1024)
enc := NewIntegerEncoder()
enc := NewIntegerEncoder(1024)
for i := 0; i < len(x); i++ {
x[i] = int64(i)
enc.Write(x[i])

View File

@ -1,9 +1,13 @@
package tsm1
import "sync"
import (
"sync"
"github.com/influxdata/influxdb/pkg/pool"
)
var (
bufPool sync.Pool
bufPool = pool.NewBytes(1024)
float64ValuePool sync.Pool
integerValuePool sync.Pool
booleanValuePool sync.Pool
@ -12,15 +16,7 @@ var (
// getBuf returns a buffer with length size from the buffer pool.
func getBuf(size int) []byte {
x := bufPool.Get()
if x == nil {
return make([]byte, size)
}
buf := x.([]byte)
if cap(buf) < size {
return make([]byte, size)
}
return buf[:size]
return bufPool.Get(size)
}
// putBuf returns a buffer to the pool.

View File

@ -26,8 +26,14 @@ type StringEncoder struct {
bytes []byte
}
func NewStringEncoder() StringEncoder {
return StringEncoder{}
func NewStringEncoder(sz int) StringEncoder {
return StringEncoder{
bytes: make([]byte, 0, sz),
}
}
// Reset truncates the encoder's output to zero length while keeping the
// allocated backing array, so the encoder can be re-used for another block.
func (e *StringEncoder) Reset() {
	e.bytes = e.bytes[:0]
}
func (e *StringEncoder) Write(s string) {

View File

@ -8,7 +8,7 @@ import (
)
func Test_StringEncoder_NoValues(t *testing.T) {
enc := NewStringEncoder()
enc := NewStringEncoder(1024)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -24,7 +24,7 @@ func Test_StringEncoder_NoValues(t *testing.T) {
}
func Test_StringEncoder_Single(t *testing.T) {
enc := NewStringEncoder()
enc := NewStringEncoder(1024)
v1 := "v1"
enc.Write(v1)
b, err := enc.Bytes()
@ -46,7 +46,7 @@ func Test_StringEncoder_Single(t *testing.T) {
}
func Test_StringEncoder_Multi_Compressed(t *testing.T) {
enc := NewStringEncoder()
enc := NewStringEncoder(1024)
values := make([]string, 10)
for i := range values {
@ -93,7 +93,7 @@ func Test_StringEncoder_Quick(t *testing.T) {
expected = []string{}
}
// Write values to encoder.
enc := NewStringEncoder()
enc := NewStringEncoder(1024)
for _, v := range values {
enc.Write(v)
}

View File

@ -53,15 +53,27 @@ const (
// TimeEncoder encodes a sequence of timestamps into a compressed byte slice.
type TimeEncoder interface {
	// Write adds the timestamp t to the values to be encoded.
	Write(t int64)
	// Bytes returns the encoded bytes of all written timestamps.
	Bytes() ([]byte, error)
	// Reset discards all written timestamps so the encoder can be re-used.
	Reset()
}
type encoder struct {
ts []uint64
ts []uint64
bytes []byte
enc *simple8b.Encoder
}
// NewTimeEncoder returns a TimeEncoder
func NewTimeEncoder() TimeEncoder {
return &encoder{}
func NewTimeEncoder(sz int) TimeEncoder {
return &encoder{
ts: make([]uint64, 0, sz),
enc: simple8b.NewEncoder(),
}
}
// Reset discards all written timestamps and any previously encoded output so
// the encoder can be re-used. The ts and bytes slices keep their backing
// arrays; the simple8b encoder is replaced with a new one.
func (e *encoder) Reset() {
	e.enc = simple8b.NewEncoder()
	e.bytes = e.bytes[:0]
	e.ts = e.ts[:0]
}
// Write adds a time.Time to the compressed stream.
@ -108,7 +120,7 @@ func (e *encoder) reduce() (max, divisor uint64, rle bool, deltas []uint64) {
// Bytes returns the encoded bytes of all written times.
func (e *encoder) Bytes() ([]byte, error) {
if len(e.ts) == 0 {
return []byte{}, nil
return e.bytes[:0], nil
}
// Maximum and largest common divisor. rle is true if dts (the delta timestamps),
@ -129,12 +141,21 @@ func (e *encoder) Bytes() ([]byte, error) {
}
func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) {
enc := simple8b.NewEncoder()
for _, v := range dts[1:] {
enc.Write(uint64(v) / div)
e.enc.Write(uint64(v) / div)
}
b := make([]byte, 8+1)
// The compressed deltas
deltas, err := e.enc.Bytes()
if err != nil {
return nil, err
}
sz := 8 + 1 + len(deltas)
if cap(e.bytes) < sz {
e.bytes = make([]byte, sz)
}
b := e.bytes[:sz]
// 4 high bits used for the encoding type
b[0] = byte(timeCompressedPackedSimple) << 4
@ -144,17 +165,16 @@ func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) {
// The first delta value
binary.BigEndian.PutUint64(b[1:9], uint64(dts[0]))
// The compressed deltas
deltas, err := enc.Bytes()
if err != nil {
return nil, err
}
return append(b, deltas...), nil
copy(b[9:], deltas)
return b[:9+len(deltas)], nil
}
func (e *encoder) encodeRaw() ([]byte, error) {
b := make([]byte, 1+len(e.ts)*8)
sz := 1 + len(e.ts)*8
if cap(e.bytes) < sz {
e.bytes = make([]byte, sz)
}
b := e.bytes[:sz]
b[0] = byte(timeUncompressed) << 4
for i, v := range e.ts {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v))
@ -163,9 +183,12 @@ func (e *encoder) encodeRaw() ([]byte, error) {
}
func (e *encoder) encodeRLE(first, delta, div uint64, n int) ([]byte, error) {
// Large varints can take up to 10 bytes
b := make([]byte, 1+10*3)
// Large varints can take up to 10 bytes, we're encoding 3 + 1 byte type
sz := 31
if cap(e.bytes) < sz {
e.bytes = make([]byte, sz)
}
b := e.bytes[:sz]
// 4 high bits used for the encoding type
b[0] = byte(timeCompressedRLE) << 4
// 4 low bits are the log10 divisor

View File

@ -8,7 +8,7 @@ import (
)
func Test_TimeEncoder(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(1)
x := []int64{}
now := time.Unix(0, 0)
@ -42,7 +42,7 @@ func Test_TimeEncoder(t *testing.T) {
}
func Test_TimeEncoder_NoValues(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(0)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -56,7 +56,7 @@ func Test_TimeEncoder_NoValues(t *testing.T) {
}
func Test_TimeEncoder_One(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(1)
var tm int64
enc.Write(tm)
@ -81,7 +81,7 @@ func Test_TimeEncoder_One(t *testing.T) {
}
func Test_TimeEncoder_Two(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(2)
t1 := int64(0)
t2 := int64(1)
enc.Write(t1)
@ -116,7 +116,7 @@ func Test_TimeEncoder_Two(t *testing.T) {
}
func Test_TimeEncoder_Three(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(3)
t1 := int64(0)
t2 := int64(1)
t3 := int64(3)
@ -162,7 +162,7 @@ func Test_TimeEncoder_Three(t *testing.T) {
}
func Test_TimeEncoder_Large_Range(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(2)
t1 := int64(1442369134000000000)
t2 := int64(1442369135000000000)
enc.Write(t1)
@ -196,7 +196,7 @@ func Test_TimeEncoder_Large_Range(t *testing.T) {
}
func Test_TimeEncoder_Uncompressed(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(3)
t1 := time.Unix(0, 0).UnixNano()
t2 := time.Unix(1, 0).UnixNano()
@ -248,7 +248,7 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) {
}
func Test_TimeEncoder_RLE(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(512)
var ts []int64
for i := 0; i < 500; i++ {
ts = append(ts, int64(i))
@ -289,7 +289,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
}
func Test_TimeEncoder_Reverse(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(3)
ts := []int64{
int64(3),
int64(2),
@ -321,7 +321,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) {
}
func Test_TimeEncoder_220SecondDelta(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(256)
var ts []int64
now := time.Now()
for i := 0; i < 220; i++ {
@ -368,7 +368,7 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) {
func Test_TimeEncoder_Quick(t *testing.T) {
quick.Check(func(values []int64) bool {
// Write values to encoder.
enc := NewTimeEncoder()
enc := NewTimeEncoder(1024)
exp := make([]int64, len(values))
for i, v := range values {
exp[i] = int64(v)
@ -402,7 +402,7 @@ func Test_TimeEncoder_Quick(t *testing.T) {
}
func Test_TimeEncoder_RLESeconds(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(6)
ts := make([]int64, 6)
ts[0] = int64(1444448158000000000)
@ -443,7 +443,7 @@ func Test_TimeEncoder_RLESeconds(t *testing.T) {
}
func TestTimeEncoder_Count_Uncompressed(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(2)
t1 := time.Unix(0, 0).UnixNano()
t2 := time.Unix(1, 0).UnixNano()
@ -469,7 +469,7 @@ func TestTimeEncoder_Count_Uncompressed(t *testing.T) {
}
func TestTimeEncoder_Count_RLE(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(5)
ts := make([]int64, 6)
ts[0] = int64(1444448158000000000)
@ -498,7 +498,7 @@ func TestTimeEncoder_Count_RLE(t *testing.T) {
}
func TestTimeEncoder_Count_Simple8(t *testing.T) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(3)
t1 := int64(0)
t2 := int64(1)
t3 := int64(3)
@ -545,7 +545,7 @@ func TestTimeDecoder_Corrupt(t *testing.T) {
}
func BenchmarkTimeEncoder(b *testing.B) {
enc := NewTimeEncoder()
enc := NewTimeEncoder(1024)
x := make([]int64, 1024)
for i := 0; i < len(x); i++ {
x[i] = time.Now().UnixNano()
@ -560,7 +560,7 @@ func BenchmarkTimeEncoder(b *testing.B) {
func BenchmarkTimeDecoder_Packed(b *testing.B) {
x := make([]int64, 1024)
enc := NewTimeEncoder()
enc := NewTimeEncoder(1024)
for i := 0; i < len(x); i++ {
x[i] = time.Now().UnixNano()
enc.Write(x[i])
@ -582,7 +582,7 @@ func BenchmarkTimeDecoder_Packed(b *testing.B) {
func BenchmarkTimeDecoder_RLE(b *testing.B) {
x := make([]int64, 1024)
enc := NewTimeEncoder()
enc := NewTimeEncoder(1024)
for i := 0; i < len(x); i++ {
x[i] = int64(i * 10)
enc.Write(x[i])