Add LimitedBytePool for wal buffers
This pool was previously a pool.Bytes to avoid repetitive allocations. It was recently switchted to a sync.Pool because pool.Bytes held onto very larger buffers at times which were never released. sync.Pool is showing up in allocation profiles quite frequently. This switches the pool to a new pool that limits how many buffers are in the pool as well as the max size of each buffer in the pool. This provides better bounds on allocations.pull/8384/head
parent
e17be9f4ba
commit
503d41a08f
|
@ -41,3 +41,60 @@ func (p *Bytes) Put(c []byte) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LimitedBytes is a pool of byte slices that can be re-used. Slices in
|
||||||
|
// this pool will not be garbage collected when not in use. The pool will
|
||||||
|
// hold onto a fixed number of byte slices of a maximum size. If the pool
|
||||||
|
// is empty and max pool size has not been allocated yet, it will return a
|
||||||
|
// new byte slice. Byte slices added to the pool that are over the max size
|
||||||
|
// are dropped.
|
||||||
|
type LimitedBytes struct {
|
||||||
|
allocated int64
|
||||||
|
maxSize int
|
||||||
|
pool chan []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBytes returns a Bytes pool with capacity for max byte slices
|
||||||
|
// to be pool.
|
||||||
|
func NewLimitedBytes(capacity int, maxSize int) *LimitedBytes {
|
||||||
|
return &LimitedBytes{
|
||||||
|
pool: make(chan []byte, capacity),
|
||||||
|
maxSize: maxSize,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns a byte slice size with at least sz capacity. Items
|
||||||
|
// returned may not be in the zero state and should be reset by the
|
||||||
|
// caller.
|
||||||
|
func (p *LimitedBytes) Get(sz int) []byte {
|
||||||
|
var c []byte
|
||||||
|
|
||||||
|
// If we have not allocated our capacity, return a new allocation,
|
||||||
|
// otherwise block until one frees up.
|
||||||
|
select {
|
||||||
|
case c = <-p.pool:
|
||||||
|
default:
|
||||||
|
return make([]byte, sz)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cap(c) < sz {
|
||||||
|
return make([]byte, sz)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c[:sz]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put returns a slice back to the pool. If the pool is full, the byte
|
||||||
|
// slice is discarded. If the byte slice is over the configured max size
|
||||||
|
// of any byte slice in the pool, it is discared.
|
||||||
|
func (p *LimitedBytes) Put(c []byte) {
|
||||||
|
// Drop buffers that are larger than the max size
|
||||||
|
if cap(c) >= p.maxSize {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case p.pool <- c:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
package pool_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/pkg/pool"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLimitedBytePool_Put_MaxSize(t *testing.T) {
|
||||||
|
bp := pool.NewLimitedBytes(1, 10)
|
||||||
|
bp.Put(make([]byte, 1024)) // should be dropped
|
||||||
|
|
||||||
|
if got, exp := cap(bp.Get(10)), 10; got != exp {
|
||||||
|
t.Fatalf("max cap size exceeded: got %v, exp %v", got, exp)
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/pkg/limiter"
|
"github.com/influxdata/influxdb/pkg/limiter"
|
||||||
|
"github.com/influxdata/influxdb/pkg/pool"
|
||||||
"github.com/uber-go/zap"
|
"github.com/uber-go/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -66,6 +67,9 @@ var (
|
||||||
ErrWALCorrupt = fmt.Errorf("corrupted WAL entry")
|
ErrWALCorrupt = fmt.Errorf("corrupted WAL entry")
|
||||||
|
|
||||||
defaultWaitingWALWrites = runtime.GOMAXPROCS(0) * 2
|
defaultWaitingWALWrites = runtime.GOMAXPROCS(0) * 2
|
||||||
|
|
||||||
|
// bytePool is a shared bytes pool buffer re-cycle []byte slices to reduce allocations.
|
||||||
|
bytesPool = pool.NewLimitedBytes(256, walEncodeBufSize*2)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Statistics gathered by the WAL.
|
// Statistics gathered by the WAL.
|
||||||
|
@ -383,20 +387,17 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
|
||||||
// limit how many concurrent encodings can be in flight. Since we can only
|
// limit how many concurrent encodings can be in flight. Since we can only
|
||||||
// write one at a time to disk, a slow disk can cause the allocations below
|
// write one at a time to disk, a slow disk can cause the allocations below
|
||||||
// to increase quickly. If we're backed up, wait until others have completed.
|
// to increase quickly. If we're backed up, wait until others have completed.
|
||||||
//l.limiter.Take()
|
bytes := bytesPool.Get(walEncodeBufSize)
|
||||||
//defer l.limiter.Release()
|
defer bytesPool.Put(bytes)
|
||||||
|
|
||||||
// encode and compress the entry while we're not locked
|
|
||||||
bytes := *(getBuf(walEncodeBufSize))
|
|
||||||
defer putBuf(&bytes)
|
|
||||||
|
|
||||||
b, err := entry.Encode(bytes)
|
b, err := entry.Encode(bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
encBuf := *(getBuf(snappy.MaxEncodedLen(len(b))))
|
encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
|
||||||
defer putBuf(&encBuf)
|
defer bytesPool.Put(encBuf)
|
||||||
|
|
||||||
compressed := snappy.Encode(encBuf, b)
|
compressed := snappy.Encode(encBuf, b)
|
||||||
syncErr := make(chan error)
|
syncErr := make(chan error)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue