Switch WAL byte pools to sync/pool
The current bytes.Pool will hold onto byte slices indefinitely. Large writes can cause the pool to hold onto very large buffers over time. Testing w/ sync/pool seems to perform similarly now so using a sync/pool will allow these buffers to be GC'd when necessary.pull/8302/head
parent
d155d37ca8
commit
ef65ee77f4
|
@ -1,15 +1,26 @@
|
|||
package tsm1
|
||||
|
||||
import "github.com/influxdata/influxdb/pkg/pool"
|
||||
import "sync"
|
||||
|
||||
var bufPool = pool.NewBytes(10)
|
||||
var bufPool sync.Pool
|
||||
|
||||
// getBuf returns a buffer with length size from the buffer pool.
|
||||
func getBuf(size int) []byte {
|
||||
return bufPool.Get(size)
|
||||
func getBuf(size int) *[]byte {
|
||||
x := bufPool.Get()
|
||||
if x == nil {
|
||||
b := make([]byte, size)
|
||||
return &b
|
||||
}
|
||||
buf := x.(*[]byte)
|
||||
if cap(*buf) < size {
|
||||
b := make([]byte, size)
|
||||
return &b
|
||||
}
|
||||
*buf = (*buf)[:size]
|
||||
return buf
|
||||
}
|
||||
|
||||
// putBuf returns a buffer to the pool.
|
||||
func putBuf(buf []byte) {
|
||||
func putBuf(buf *[]byte) {
|
||||
bufPool.Put(buf)
|
||||
}
|
||||
|
|
|
@ -382,16 +382,16 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
|
|||
//defer l.limiter.Release()
|
||||
|
||||
// encode and compress the entry while we're not locked
|
||||
bytes := getBuf(walEncodeBufSize)
|
||||
defer putBuf(bytes)
|
||||
bytes := *(getBuf(walEncodeBufSize))
|
||||
defer putBuf(&bytes)
|
||||
|
||||
b, err := entry.Encode(bytes)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
encBuf := getBuf(snappy.MaxEncodedLen(len(b)))
|
||||
defer putBuf(encBuf)
|
||||
encBuf := *(getBuf(snappy.MaxEncodedLen(len(b))))
|
||||
defer putBuf(&encBuf)
|
||||
compressed := snappy.Encode(encBuf, b)
|
||||
syncErr := make(chan error)
|
||||
|
||||
|
@ -1031,8 +1031,8 @@ func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
|
|||
|
||||
// Next indicates if there is a value to read.
|
||||
func (r *WALSegmentReader) Next() bool {
|
||||
b := getBuf(defaultBufLen)
|
||||
defer putBuf(b)
|
||||
b := *(getBuf(defaultBufLen))
|
||||
defer putBuf(&b)
|
||||
var nReadOK int
|
||||
|
||||
// read the type and the length of the entry
|
||||
|
@ -1069,8 +1069,8 @@ func (r *WALSegmentReader) Next() bool {
|
|||
r.err = err
|
||||
return true
|
||||
}
|
||||
decBuf := getBuf(decLen)
|
||||
defer putBuf(decBuf)
|
||||
decBuf := *(getBuf(decLen))
|
||||
defer putBuf(&decBuf)
|
||||
|
||||
data, err := snappy.Decode(decBuf, b[:length])
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue