diff --git a/models/points.go b/models/points.go index c9a113f0ec..8cb297dcfa 100644 --- a/models/points.go +++ b/models/points.go @@ -171,7 +171,7 @@ func ParseKey(buf []byte) (string, Tags, error) { // ParsePointsWithPrecision is similar to ParsePoints, but allows the // caller to provide a precision for time. func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { - points := []Point{} + points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1) var ( pos int block []byte diff --git a/models/points_test.go b/models/points_test.go index 0a820bcc86..370001f8d4 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -96,6 +96,19 @@ func BenchmarkNewPoint(b *testing.B) { } } +func BenchmarkParsePointNoTags5000(b *testing.B) { + var batch [5000]string + for i := 0; i < len(batch); i++ { + batch[i] = `cpu value=1i 1000000000` + } + lines := strings.Join(batch[:], "\n") + b.ResetTimer() + for i := 0; i < b.N; i++ { + models.ParsePoints([]byte(lines)) + b.SetBytes(int64(len(lines))) + } +} + func BenchmarkParsePointNoTags(b *testing.B) { line := `cpu value=1i 1000000000` for i := 0; i < b.N; i++ { diff --git a/pkg/pool/bytes.go b/pkg/pool/bytes.go new file mode 100644 index 0000000000..05396077a9 --- /dev/null +++ b/pkg/pool/bytes.go @@ -0,0 +1,42 @@ +package pool + +// Bytes is a pool of byte slices that can be re-used. Slices in +// this pool will not be garbage collected when not in use. +type Bytes struct { + pool chan []byte +} + +// NewBytes returns a Bytes pool with capacity for max byte slices +// to be pooled. +func NewBytes(max int) *Bytes { + return &Bytes{ + pool: make(chan []byte, max), + } +} + +// Get returns a byte slice with at least sz capacity. Items +// returned may not be in the zero state and should be reset by the +// caller.
+func (p *Bytes) Get(sz int) []byte { + var c []byte + select { + case c = <-p.pool: + default: + return make([]byte, sz) + } + + if cap(c) < sz { + return make([]byte, sz) + } + + return c[:sz] +} + +// Put returns a slice back to the pool. If the pool is full, the byte +// slice is discarded. +func (p *Bytes) Put(c []byte) { + select { + case p.pool <- c: + default: + } +} diff --git a/pkg/pool/generic.go b/pkg/pool/generic.go new file mode 100644 index 0000000000..9eb98cce3b --- /dev/null +++ b/pkg/pool/generic.go @@ -0,0 +1,40 @@ +package pool + +// Generic is a pool of types that can be re-used. Items in +// this pool will not be garbage collected when not in use. +type Generic struct { + pool chan interface{} + fn func(sz int) interface{} +} + +// NewGeneric returns a Generic pool with capacity for max items +// to be pooled. +func NewGeneric(max int, fn func(sz int) interface{}) *Generic { + return &Generic{ + pool: make(chan interface{}, max), + fn: fn, + } +} + +// Get returns an item from the pool or a new instance if the pool +// is empty. Items returned may not be in the zero state and should +// be reset by the caller. +func (p *Generic) Get(sz int) interface{} { + var c interface{} + select { + case c = <-p.pool: + default: + c = p.fn(sz) + } + + return c +} + +// Put returns an item back to the pool. If the pool is full, the item +// is discarded. +func (p *Generic) Put(c interface{}) { + select { + case p.pool <- c: + default: + } +} diff --git a/tsdb/engine/tsm1/bool.go b/tsdb/engine/tsm1/bool.go index 1ee39f379a..858fe1a137 100644 --- a/tsdb/engine/tsm1/bool.go +++ b/tsdb/engine/tsm1/bool.go @@ -35,8 +35,17 @@ type BooleanEncoder struct { } // NewBooleanEncoder returns a new instance of BooleanEncoder.
-func NewBooleanEncoder() BooleanEncoder { - return BooleanEncoder{} +func NewBooleanEncoder(sz int) BooleanEncoder { + return BooleanEncoder{ + bytes: make([]byte, 0, (sz+7)/8), + } +} + +func (e *BooleanEncoder) Reset() { + e.bytes = e.bytes[:0] + e.b = 0 + e.i = 0 + e.n = 0 } func (e *BooleanEncoder) Write(b bool) { diff --git a/tsdb/engine/tsm1/bool_test.go b/tsdb/engine/tsm1/bool_test.go index 0721830925..ad3929ae4b 100644 --- a/tsdb/engine/tsm1/bool_test.go +++ b/tsdb/engine/tsm1/bool_test.go @@ -9,7 +9,7 @@ import ( ) func Test_BooleanEncoder_NoValues(t *testing.T) { - enc := tsm1.NewBooleanEncoder() + enc := tsm1.NewBooleanEncoder(0) b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) @@ -23,7 +23,7 @@ func Test_BooleanEncoder_NoValues(t *testing.T) { } func Test_BooleanEncoder_Single(t *testing.T) { - enc := tsm1.NewBooleanEncoder() + enc := tsm1.NewBooleanEncoder(1) v1 := true enc.Write(v1) b, err := enc.Bytes() @@ -43,7 +43,7 @@ func Test_BooleanEncoder_Single(t *testing.T) { } func Test_BooleanEncoder_Multi_Compressed(t *testing.T) { - enc := tsm1.NewBooleanEncoder() + enc := tsm1.NewBooleanEncoder(10) values := make([]bool, 10) for i := range values { @@ -84,7 +84,7 @@ func Test_BooleanEncoder_Quick(t *testing.T) { expected = []bool{} } // Write values to encoder. 
- enc := tsm1.NewBooleanEncoder() + enc := tsm1.NewBooleanEncoder(1024) for _, v := range values { enc.Write(v) } @@ -134,7 +134,7 @@ func Test_BooleanDecoder_Corrupt(t *testing.T) { func BenchmarkBooleanDecoder_2048(b *testing.B) { benchmarkBooleanDecoder(b, 2048) } func benchmarkBooleanDecoder(b *testing.B, size int) { - e := tsm1.NewBooleanEncoder() + e := tsm1.NewBooleanEncoder(size) for i := 0; i < size; i++ { e.Write(i&1 == 1) } diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 4b18841435..42905e6c7f 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -29,7 +29,9 @@ type entry struct { // newEntry returns a new instance of entry. func newEntry() *entry { - return &entry{} + return &entry{ + values: make(Values, 0, 32), + } } // add adds the given values to the entry. @@ -261,7 +263,7 @@ func (c *Cache) Snapshot() (*Cache, error) { // If no snapshot exists, create a new one, otherwise update the existing snapshot if c.snapshot == nil { c.snapshot = &Cache{ - store: make(map[string]*entry), + store: make(map[string]*entry, len(c.store)), } } @@ -282,8 +284,7 @@ func (c *Cache) Snapshot() (*Cache, error) { snapshotSize := c.size // record the number of bytes written into a snapshot - // Reset the cache - c.store = make(map[string]*entry) + c.store = make(map[string]*entry, len(c.store)) c.size = 0 c.lastSnapshot = time.Now() diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 0d2926c6ed..051f71e290 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/pkg/pool" "github.com/influxdata/influxdb/tsdb" ) @@ -834,6 +835,9 @@ type tsmKeyIterator struct { buf []blocks + // freeBytes are []byte allocated, and free to be re-used + freeBytes *pool.Bytes + // mergeValues are decoded blocks that have been combined mergedValues Values @@ -898,12 +902,14 @@ func NewTSMKeyIterator(size int, fast bool, readers 
...*TSMReader) (KeyIterator, iterators: iter, fast: fast, buf: make([]blocks, len(iter)), + freeBytes: pool.NewBytes(1024), }, nil } func (k *tsmKeyIterator) Next() bool { // Any merged blocks pending? if len(k.merged) > 0 { + k.freeBytes.Put(k.merged[0].b) k.merged = k.merged[1:] if len(k.merged) > 0 { return true @@ -1163,7 +1169,8 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks { func (k *tsmKeyIterator) chunk(dst blocks) blocks { for len(k.mergedValues) > k.size { values := k.mergedValues[:k.size] - cb, err := Values(values).Encode(nil) + buf := k.freeBytes.Get(16 * 1024) + cb, err := Values(values).Encode(buf) if err != nil { k.err = err return nil @@ -1181,7 +1188,8 @@ func (k *tsmKeyIterator) chunk(dst blocks) blocks { // Re-encode the remaining values into the last block if len(k.mergedValues) > 0 { - cb, err := Values(k.mergedValues).Encode(nil) + buf := k.freeBytes.Get(16 * 1024) + cb, err := Values(k.mergedValues).Encode(buf) if err != nil { k.err = err return nil diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index f26a225d62..ce685aa2fa 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/pkg/pool" "github.com/influxdata/influxdb/tsdb" ) @@ -27,6 +28,24 @@ const ( encodedBlockHeaderSize = 1 ) +var ( + timeEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + return NewTimeEncoder(sz) + }) + integerEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + return NewIntegerEncoder(sz) + }) + floatEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + return NewFloatEncoder() + }) + stringEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + return NewStringEncoder(sz) + }) + booleanEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + return NewBooleanEncoder(sz) + }) +) + type Value interface { UnixNano() int64 Value() interface{} @@ -220,11 
+239,11 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { // for timestamps and values. // Encode values using Gorilla float compression - venc := NewFloatEncoder() + venc := getFloatEncoder() // Encode timestamps using an adaptive encoder that uses delta-encoding, // frame-or-reference and run length encoding. - tsenc := NewTimeEncoder() + tsenc := getTimeEncoder(len(values)) for _, v := range values { tsenc.Write(v.UnixNano()) @@ -235,19 +254,25 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { // Encoded timestamp values tb, err := tsenc.Bytes() if err != nil { + putTimeEncoder(tsenc) + putFloatEncoder(venc) return nil, err } // Encoded float values vb, err := venc.Bytes() if err != nil { + putTimeEncoder(tsenc) + putFloatEncoder(venc) return nil, err } // Prepend the first timestamp of the block in the first 8 bytes and the block // in the next byte, followed by the block - block := packBlockHeader(BlockFloat64) - block = append(block, packBlock(tb, vb)...) - return block, nil + b := packBlock(buf, BlockFloat64, tb, vb) + putTimeEncoder(tsenc) + putFloatEncoder(venc) + return b, nil + } func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]FloatValue) ([]FloatValue, error) { @@ -324,12 +349,10 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { // A boolean block is encoded using different compression strategies // for timestamps and values. 
- - // Encode values using Gorilla float compression - venc := NewBooleanEncoder() + venc := getBooleanEncoder(len(values)) // Encode timestamps using an adaptive encoder - tsenc := NewTimeEncoder() + tsenc := getTimeEncoder(len(values)) for _, v := range values { tsenc.Write(v.UnixNano()) @@ -339,19 +362,24 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { // Encoded timestamp values tb, err := tsenc.Bytes() if err != nil { + putTimeEncoder(tsenc) + putBooleanEncoder(venc) return nil, err } // Encoded float values vb, err := venc.Bytes() if err != nil { + putTimeEncoder(tsenc) + putBooleanEncoder(venc) return nil, err } // Prepend the first timestamp of the block in the first 8 bytes and the block // in the next byte, followed by the block - block := packBlockHeader(BlockBoolean) - block = append(block, packBlock(tb, vb)...) - return block, nil + b := packBlock(buf, BlockBoolean, tb, vb) + putTimeEncoder(tsenc) + putBooleanEncoder(venc) + return b, nil } func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a *[]BooleanValue) ([]BooleanValue, error) { @@ -420,8 +448,9 @@ func (f *IntegerValue) String() string { } func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { - tsEnc := NewTimeEncoder() - vEnc := NewIntegerEncoder() + tsEnc := getTimeEncoder(len(values)) + vEnc := getIntegerEncoder(len(values)) + for _, v := range values { tsEnc.Write(v.UnixNano()) vEnc.Write(v.(*IntegerValue).value) @@ -430,17 +459,23 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { // Encoded timestamp values tb, err := tsEnc.Bytes() if err != nil { + putTimeEncoder(tsEnc) + putIntegerEncoder(vEnc) return nil, err } // Encoded int64 values vb, err := vEnc.Bytes() if err != nil { + putTimeEncoder(tsEnc) + putIntegerEncoder(vEnc) return nil, err } // Prepend the first timestamp of the block in the first 8 bytes - block := packBlockHeader(BlockInteger) - return append(block, packBlock(tb, vb)...), nil + b 
:= packBlock(buf, BlockInteger, tb, vb) + putTimeEncoder(tsEnc) + putIntegerEncoder(vEnc) + return b, nil } func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a *[]IntegerValue) ([]IntegerValue, error) { @@ -510,8 +545,9 @@ func (f *StringValue) String() string { } func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { - tsEnc := NewTimeEncoder() - vEnc := NewStringEncoder() + tsEnc := getTimeEncoder(len(values)) + vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value)) + for _, v := range values { tsEnc.Write(v.UnixNano()) vEnc.Write(v.(*StringValue).value) @@ -520,17 +556,23 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { // Encoded timestamp values tb, err := tsEnc.Bytes() if err != nil { + putTimeEncoder(tsEnc) + putStringEncoder(vEnc) return nil, err } // Encoded string values vb, err := vEnc.Bytes() if err != nil { + putTimeEncoder(tsEnc) + putStringEncoder(vEnc) return nil, err } // Prepend the first timestamp of the block in the first 8 bytes - block := packBlockHeader(BlockString) - return append(block, packBlock(tb, vb)...), nil + b := packBlock(buf, BlockString, tb, vb) + putTimeEncoder(tsEnc) + putStringEncoder(vEnc) + return b, nil } func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *[]StringValue) ([]StringValue, error) { @@ -580,22 +622,24 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a * return (*a)[:i], nil } -func packBlockHeader(blockType byte) []byte { - return []byte{blockType} -} - -func packBlock(ts []byte, values []byte) []byte { +func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte { // We encode the length of the timestamp block using a variable byte encoding. // This allows small byte slices to take up 1 byte while larger ones use 2 or more. 
- b := make([]byte, 10) - i := binary.PutUvarint(b, uint64(len(ts))) + sz := 1 + 10 + len(ts) + len(values) + if cap(buf) < sz { + buf = make([]byte, sz) + } + b := buf[:sz] + b[0] = typ + i := binary.PutUvarint(b[1:10], uint64(len(ts))) + i += 1 // block is <len timestamp bytes>, <ts bytes>, <value bytes> - block := append(b[:i], ts...) - + copy(b[i:], ts) // We don't encode the value length because we know it's the rest of the block after // the timestamp block. - return append(block, values...) + copy(b[i+len(ts):], values) + return b[:i+len(ts)+len(values)] } func unpackBlock(buf []byte) (ts, values []byte, err error) { @@ -629,3 +673,37 @@ func ZigZagEncode(x int64) uint64 { func ZigZagDecode(v uint64) int64 { return int64((v >> 1) ^ uint64((int64(v&1)<<63)>>63)) } +func getTimeEncoder(sz int) TimeEncoder { + x := timeEncoderPool.Get(sz).(TimeEncoder) + x.Reset() + return x +} +func putTimeEncoder(enc TimeEncoder) { timeEncoderPool.Put(enc) } + +func getIntegerEncoder(sz int) IntegerEncoder { + x := integerEncoderPool.Get(sz).(IntegerEncoder) + x.Reset() + return x +} +func putIntegerEncoder(enc IntegerEncoder) { integerEncoderPool.Put(enc) } + +func getFloatEncoder() *FloatEncoder { + x := floatEncoderPool.Get(1024).(*FloatEncoder) + x.Reset() + return x +} +func putFloatEncoder(enc *FloatEncoder) { floatEncoderPool.Put(enc) } + +func getStringEncoder(sz int) StringEncoder { + x := stringEncoderPool.Get(sz).(StringEncoder) + x.Reset() + return x +} +func putStringEncoder(enc StringEncoder) { stringEncoderPool.Put(enc) } + +func getBooleanEncoder(sz int) BooleanEncoder { + x := booleanEncoderPool.Get(sz).(BooleanEncoder) + x.Reset() + return x +} +func putBooleanEncoder(enc BooleanEncoder) { booleanEncoderPool.Put(enc) } diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index e1809224f9..7d6ea7170f 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -1340,3 +1340,69 @@ func BenchmarkValues_Merge(b *testing.B) { tsm1.Values(a).Merge(c)
} } + +func BenchmarkValues_EncodeInteger(b *testing.B) { + valueCount := 1024 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, int64(i)) + } + + buf := make([]byte, 1024*8) + b.ResetTimer() + for i := 0; i < b.N; i++ { + tsm1.Values(a).Encode(buf) + } +} + +func BenchmarkValues_EncodeFloat(b *testing.B) { + valueCount := 1024 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, float64(i)) + } + + buf := make([]byte, 1024*8) + b.ResetTimer() + for i := 0; i < b.N; i++ { + tsm1.Values(a).Encode(buf) + } +} +func BenchmarkValues_EncodeString(b *testing.B) { + valueCount := 1024 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, fmt.Sprintf("%d", i)) + } + + buf := make([]byte, 1024*8) + b.ResetTimer() + for i := 0; i < b.N; i++ { + tsm1.Values(a).Encode(buf) + } +} +func BenchmarkValues_EncodeBool(b *testing.B) { + valueCount := 1024 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + + for i, t := range times { + if i%2 == 0 { + a[i] = tsm1.NewValue(t, true) + } else { + a[i] = tsm1.NewValue(t, false) + } + } + + buf := make([]byte, 1024*8) + b.ResetTimer() + for i := 0; i < b.N; i++ { + tsm1.Values(a).Encode(buf) + } +} diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 253655c66c..7bd6b8e620 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -637,6 +637,10 @@ func BenchmarkEngine_WritePoints_1000(b *testing.B) { benchmarkEngine_WritePoints(b, 1000) } +func BenchmarkEngine_WritePoints_5000(b *testing.B) { + benchmarkEngine_WritePoints(b, 5000) +} + func benchmarkEngine_WritePoints(b *testing.B, batchSize int) { e := MustOpenEngine() defer e.Close() @@ -644,9 +648,9 @@ func 
benchmarkEngine_WritePoints(b *testing.B, batchSize int) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - p := MustParsePointString("cpu value=1.2") pp := make([]models.Point, 0, batchSize) for i := 0; i < batchSize; i++ { + p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i)) pp = append(pp, p) } diff --git a/tsdb/engine/tsm1/float.go b/tsdb/engine/tsm1/float.go index bedd006516..d92ea1b00b 100644 --- a/tsdb/engine/tsm1/float.go +++ b/tsdb/engine/tsm1/float.go @@ -52,13 +52,28 @@ func NewFloatEncoder() *FloatEncoder { } s.bw = bitstream.NewWriter(&s.buf) + s.buf.WriteByte(floatCompressedGorilla << 4) return &s } +func (s *FloatEncoder) Reset() { + s.val = 0 + s.err = nil + s.leading = ^uint64(0) + s.trailing = 0 + s.buf.Reset() + s.buf.WriteByte(floatCompressedGorilla << 4) + + s.bw.Resume(0x0, 8) + + s.finished = false + s.first = true +} + func (s *FloatEncoder) Bytes() ([]byte, error) { - return append([]byte{floatCompressedGorilla << 4}, s.buf.Bytes()...), s.err + return s.buf.Bytes(), s.err } func (s *FloatEncoder) Finish() { diff --git a/tsdb/engine/tsm1/int.go b/tsdb/engine/tsm1/int.go index 8da99d61a3..21620a195b 100644 --- a/tsdb/engine/tsm1/int.go +++ b/tsdb/engine/tsm1/int.go @@ -43,8 +43,17 @@ type IntegerEncoder struct { values []uint64 } -func NewIntegerEncoder() IntegerEncoder { - return IntegerEncoder{rle: true} +func NewIntegerEncoder(sz int) IntegerEncoder { + return IntegerEncoder{ + rle: true, + values: make([]uint64, 0, sz), + } +} + +func (e *IntegerEncoder) Reset() { + e.prev = 0 + e.rle = true + e.values = e.values[:0] } func (e *IntegerEncoder) Write(v int64) { @@ -77,8 +86,9 @@ func (e *IntegerEncoder) Bytes() ([]byte, error) { } func (e *IntegerEncoder) encodeRLE() ([]byte, error) { - // Large varints can take up to 10 bytes - b := make([]byte, 1+10*3) + // Large varints can take up to 10 bytes. We're storing 3 + 1 + // type byte. 
+ var b [31]byte // 4 high bits used for the encoding type b[0] = byte(intCompressedRLE) << 4 diff --git a/tsdb/engine/tsm1/int_test.go b/tsdb/engine/tsm1/int_test.go index 8d8525d11d..b1916e5425 100644 --- a/tsdb/engine/tsm1/int_test.go +++ b/tsdb/engine/tsm1/int_test.go @@ -9,7 +9,7 @@ import ( ) func Test_IntegerEncoder_NoValues(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(0) b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) @@ -27,7 +27,7 @@ func Test_IntegerEncoder_NoValues(t *testing.T) { } func Test_IntegerEncoder_One(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(1) v1 := int64(1) enc.Write(1) @@ -52,7 +52,7 @@ func Test_IntegerEncoder_One(t *testing.T) { } func Test_IntegerEncoder_Two(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(2) var v1, v2 int64 = 1, 2 enc.Write(v1) @@ -87,7 +87,7 @@ func Test_IntegerEncoder_Two(t *testing.T) { } func Test_IntegerEncoder_Negative(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(3) var v1, v2, v3 int64 = -2, 0, 1 enc.Write(v1) @@ -131,7 +131,7 @@ func Test_IntegerEncoder_Negative(t *testing.T) { } func Test_IntegerEncoder_Large_Range(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(2) var v1, v2 int64 = math.MinInt64, math.MaxInt64 enc.Write(v1) enc.Write(v2) @@ -164,7 +164,7 @@ func Test_IntegerEncoder_Large_Range(t *testing.T) { } func Test_IntegerEncoder_Uncompressed(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(3) var v1, v2, v3 int64 = 0, 1, 1 << 60 enc.Write(v1) @@ -223,7 +223,7 @@ func Test_IntegerEncoder_NegativeUncompressed(t *testing.T) { 2761419461769776844, -1324397441074946198, -680758138988210958, 94468846694902125, -2394093124890745254, -2682139311758778198, } - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(256) for _, v := range values { enc.Write(v) } @@ -258,7 +258,7 @@ func Test_IntegerEncoder_NegativeUncompressed(t 
*testing.T) { } func Test_IntegerEncoder_AllNegative(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(3) values := []int64{ -10, -5, -1, } @@ -296,7 +296,7 @@ func Test_IntegerEncoder_AllNegative(t *testing.T) { } func Test_IntegerEncoder_CounterPacked(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(16) values := []int64{ 1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 6, } @@ -340,7 +340,7 @@ func Test_IntegerEncoder_CounterPacked(t *testing.T) { } func Test_IntegerEncoder_CounterRLE(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(16) values := []int64{ 1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 5, } @@ -384,7 +384,7 @@ func Test_IntegerEncoder_CounterRLE(t *testing.T) { } func Test_IntegerEncoder_MinMax(t *testing.T) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(2) values := []int64{ math.MinInt64, math.MaxInt64, } @@ -433,7 +433,7 @@ func Test_IntegerEncoder_Quick(t *testing.T) { } // Write values to encoder. 
- enc := NewIntegerEncoder() + enc := NewIntegerEncoder(1024) for _, v := range values { enc.Write(v) } @@ -484,7 +484,7 @@ func Test_IntegerDecoder_Corrupt(t *testing.T) { } func BenchmarkIntegerEncoderRLE(b *testing.B) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(1024) x := make([]int64, 1024) for i := 0; i < len(x); i++ { x[i] = int64(i) @@ -498,7 +498,7 @@ func BenchmarkIntegerEncoderRLE(b *testing.B) { } func BenchmarkIntegerEncoderPackedSimple(b *testing.B) { - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(1024) x := make([]int64, 1024) for i := 0; i < len(x); i++ { // Small amount of randomness prevents RLE from being used @@ -518,7 +518,7 @@ type byteSetter interface { func BenchmarkIntegerDecoderPackedSimple(b *testing.B) { x := make([]int64, 1024) - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(1024) for i := 0; i < len(x); i++ { // Small amount of randomness prevents RLE from being used x[i] = int64(i) + int64(rand.Intn(10)) @@ -538,7 +538,7 @@ func BenchmarkIntegerDecoderPackedSimple(b *testing.B) { func BenchmarkIntegerDecoderRLE(b *testing.B) { x := make([]int64, 1024) - enc := NewIntegerEncoder() + enc := NewIntegerEncoder(1024) for i := 0; i < len(x); i++ { x[i] = int64(i) enc.Write(x[i]) diff --git a/tsdb/engine/tsm1/pools.go b/tsdb/engine/tsm1/pools.go index 4f728faacf..8f8b05e26f 100644 --- a/tsdb/engine/tsm1/pools.go +++ b/tsdb/engine/tsm1/pools.go @@ -1,9 +1,13 @@ package tsm1 -import "sync" +import ( + "sync" + + "github.com/influxdata/influxdb/pkg/pool" +) var ( - bufPool sync.Pool + bufPool = pool.NewBytes(1024) float64ValuePool sync.Pool integerValuePool sync.Pool booleanValuePool sync.Pool @@ -12,15 +16,7 @@ var ( // getBuf returns a buffer with length size from the buffer pool. 
func getBuf(size int) []byte { - x := bufPool.Get() - if x == nil { - return make([]byte, size) - } - buf := x.([]byte) - if cap(buf) < size { - return make([]byte, size) - } - return buf[:size] + return bufPool.Get(size) } // putBuf returns a buffer to the pool. diff --git a/tsdb/engine/tsm1/string.go b/tsdb/engine/tsm1/string.go index bf5e4fcceb..0aa8982075 100644 --- a/tsdb/engine/tsm1/string.go +++ b/tsdb/engine/tsm1/string.go @@ -26,8 +26,14 @@ type StringEncoder struct { bytes []byte } -func NewStringEncoder() StringEncoder { - return StringEncoder{} +func NewStringEncoder(sz int) StringEncoder { + return StringEncoder{ + bytes: make([]byte, 0, sz), + } +} + +func (e *StringEncoder) Reset() { + e.bytes = e.bytes[:0] } func (e *StringEncoder) Write(s string) { diff --git a/tsdb/engine/tsm1/string_test.go b/tsdb/engine/tsm1/string_test.go index f943302bcf..25453d6cf0 100644 --- a/tsdb/engine/tsm1/string_test.go +++ b/tsdb/engine/tsm1/string_test.go @@ -8,7 +8,7 @@ import ( ) func Test_StringEncoder_NoValues(t *testing.T) { - enc := NewStringEncoder() + enc := NewStringEncoder(1024) b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) @@ -24,7 +24,7 @@ func Test_StringEncoder_NoValues(t *testing.T) { } func Test_StringEncoder_Single(t *testing.T) { - enc := NewStringEncoder() + enc := NewStringEncoder(1024) v1 := "v1" enc.Write(v1) b, err := enc.Bytes() @@ -46,7 +46,7 @@ func Test_StringEncoder_Single(t *testing.T) { } func Test_StringEncoder_Multi_Compressed(t *testing.T) { - enc := NewStringEncoder() + enc := NewStringEncoder(1024) values := make([]string, 10) for i := range values { @@ -93,7 +93,7 @@ func Test_StringEncoder_Quick(t *testing.T) { expected = []string{} } // Write values to encoder. 
- enc := NewStringEncoder() + enc := NewStringEncoder(1024) for _, v := range values { enc.Write(v) } diff --git a/tsdb/engine/tsm1/timestamp.go b/tsdb/engine/tsm1/timestamp.go index 27dff49ecf..2c8bf90a5b 100644 --- a/tsdb/engine/tsm1/timestamp.go +++ b/tsdb/engine/tsm1/timestamp.go @@ -53,15 +53,27 @@ const ( type TimeEncoder interface { Write(t int64) Bytes() ([]byte, error) + Reset() } type encoder struct { - ts []uint64 + ts []uint64 + bytes []byte + enc *simple8b.Encoder } // NewTimeEncoder returns a TimeEncoder -func NewTimeEncoder() TimeEncoder { - return &encoder{} +func NewTimeEncoder(sz int) TimeEncoder { + return &encoder{ + ts: make([]uint64, 0, sz), + enc: simple8b.NewEncoder(), + } +} + +func (e *encoder) Reset() { + e.ts = e.ts[:0] + e.bytes = e.bytes[:0] + e.enc = simple8b.NewEncoder() } // Write adds a time.Time to the compressed stream. @@ -108,7 +120,7 @@ func (e *encoder) reduce() (max, divisor uint64, rle bool, deltas []uint64) { // Bytes returns the encoded bytes of all written times. func (e *encoder) Bytes() ([]byte, error) { if len(e.ts) == 0 { - return []byte{}, nil + return e.bytes[:0], nil } // Maximum and largest common divisor. 
rle is true if dts (the delta timestamps), @@ -129,12 +141,21 @@ func (e *encoder) Bytes() ([]byte, error) { } func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) { - enc := simple8b.NewEncoder() for _, v := range dts[1:] { - enc.Write(uint64(v) / div) + e.enc.Write(uint64(v) / div) } - b := make([]byte, 8+1) + // The compressed deltas + deltas, err := e.enc.Bytes() + if err != nil { + return nil, err + } + + sz := 8 + 1 + len(deltas) + if cap(e.bytes) < sz { + e.bytes = make([]byte, sz) + } + b := e.bytes[:sz] // 4 high bits used for the encoding type b[0] = byte(timeCompressedPackedSimple) << 4 @@ -144,17 +165,16 @@ func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) { // The first delta value binary.BigEndian.PutUint64(b[1:9], uint64(dts[0])) - // The compressed deltas - deltas, err := enc.Bytes() - if err != nil { - return nil, err - } - - return append(b, deltas...), nil + copy(b[9:], deltas) + return b[:9+len(deltas)], nil } func (e *encoder) encodeRaw() ([]byte, error) { - b := make([]byte, 1+len(e.ts)*8) + sz := 1 + len(e.ts)*8 + if cap(e.bytes) < sz { + e.bytes = make([]byte, sz) + } + b := e.bytes[:sz] b[0] = byte(timeUncompressed) << 4 for i, v := range e.ts { binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v)) @@ -163,9 +183,12 @@ func (e *encoder) encodeRaw() ([]byte, error) { } func (e *encoder) encodeRLE(first, delta, div uint64, n int) ([]byte, error) { - // Large varints can take up to 10 bytes - b := make([]byte, 1+10*3) - + // Large varints can take up to 10 bytes, we're encoding 3 + 1 byte type + sz := 31 + if cap(e.bytes) < sz { + e.bytes = make([]byte, sz) + } + b := e.bytes[:sz] // 4 high bits used for the encoding type b[0] = byte(timeCompressedRLE) << 4 // 4 low bits are the log10 divisor diff --git a/tsdb/engine/tsm1/timestamp_test.go b/tsdb/engine/tsm1/timestamp_test.go index 158773ae26..f303b22eb2 100644 --- a/tsdb/engine/tsm1/timestamp_test.go +++ b/tsdb/engine/tsm1/timestamp_test.go @@ -8,7 
+8,7 @@ import ( ) func Test_TimeEncoder(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(1) x := []int64{} now := time.Unix(0, 0) @@ -42,7 +42,7 @@ func Test_TimeEncoder(t *testing.T) { } func Test_TimeEncoder_NoValues(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(0) b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) @@ -56,7 +56,7 @@ func Test_TimeEncoder_NoValues(t *testing.T) { } func Test_TimeEncoder_One(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(1) var tm int64 enc.Write(tm) @@ -81,7 +81,7 @@ func Test_TimeEncoder_One(t *testing.T) { } func Test_TimeEncoder_Two(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(2) t1 := int64(0) t2 := int64(1) enc.Write(t1) @@ -116,7 +116,7 @@ func Test_TimeEncoder_Two(t *testing.T) { } func Test_TimeEncoder_Three(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(3) t1 := int64(0) t2 := int64(1) t3 := int64(3) @@ -162,7 +162,7 @@ func Test_TimeEncoder_Three(t *testing.T) { } func Test_TimeEncoder_Large_Range(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(2) t1 := int64(1442369134000000000) t2 := int64(1442369135000000000) enc.Write(t1) @@ -196,7 +196,7 @@ func Test_TimeEncoder_Large_Range(t *testing.T) { } func Test_TimeEncoder_Uncompressed(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(3) t1 := time.Unix(0, 0).UnixNano() t2 := time.Unix(1, 0).UnixNano() @@ -248,7 +248,7 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) { } func Test_TimeEncoder_RLE(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(512) var ts []int64 for i := 0; i < 500; i++ { ts = append(ts, int64(i)) @@ -289,7 +289,7 @@ func Test_TimeEncoder_RLE(t *testing.T) { } func Test_TimeEncoder_Reverse(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(3) ts := []int64{ int64(3), int64(2), @@ -321,7 +321,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) { } func 
Test_TimeEncoder_220SecondDelta(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(256) var ts []int64 now := time.Now() for i := 0; i < 220; i++ { @@ -368,7 +368,7 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) { func Test_TimeEncoder_Quick(t *testing.T) { quick.Check(func(values []int64) bool { // Write values to encoder. - enc := NewTimeEncoder() + enc := NewTimeEncoder(1024) exp := make([]int64, len(values)) for i, v := range values { exp[i] = int64(v) @@ -402,7 +402,7 @@ func Test_TimeEncoder_Quick(t *testing.T) { } func Test_TimeEncoder_RLESeconds(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(6) ts := make([]int64, 6) ts[0] = int64(1444448158000000000) @@ -443,7 +443,7 @@ func Test_TimeEncoder_RLESeconds(t *testing.T) { } func TestTimeEncoder_Count_Uncompressed(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(2) t1 := time.Unix(0, 0).UnixNano() t2 := time.Unix(1, 0).UnixNano() @@ -469,7 +469,7 @@ func TestTimeEncoder_Count_Uncompressed(t *testing.T) { } func TestTimeEncoder_Count_RLE(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(5) ts := make([]int64, 6) ts[0] = int64(1444448158000000000) @@ -498,7 +498,7 @@ func TestTimeEncoder_Count_RLE(t *testing.T) { } func TestTimeEncoder_Count_Simple8(t *testing.T) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(3) t1 := int64(0) t2 := int64(1) t3 := int64(3) @@ -545,7 +545,7 @@ func TestTimeDecoder_Corrupt(t *testing.T) { } func BenchmarkTimeEncoder(b *testing.B) { - enc := NewTimeEncoder() + enc := NewTimeEncoder(1024) x := make([]int64, 1024) for i := 0; i < len(x); i++ { x[i] = time.Now().UnixNano() @@ -560,7 +560,7 @@ func BenchmarkTimeEncoder(b *testing.B) { func BenchmarkTimeDecoder_Packed(b *testing.B) { x := make([]int64, 1024) - enc := NewTimeEncoder() + enc := NewTimeEncoder(1024) for i := 0; i < len(x); i++ { x[i] = time.Now().UnixNano() enc.Write(x[i]) @@ -582,7 +582,7 @@ func BenchmarkTimeDecoder_Packed(b *testing.B) { func 
BenchmarkTimeDecoder_RLE(b *testing.B) { x := make([]int64, 1024) - enc := NewTimeEncoder() + enc := NewTimeEncoder(1024) for i := 0; i < len(x); i++ { x[i] = int64(i * 10) enc.Write(x[i])