Merge pull request #8305 from influxdata/jw-wal

Optimize WAL segment reading
pull/7544/merge
Jason Wilder 2017-04-20 14:36:47 -06:00 committed by GitHub
commit fce183f21a
1 changed files with 63 additions and 87 deletions

View File

@ -1,6 +1,7 @@
package tsm1
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
@ -740,89 +741,68 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
nvals := int(binary.BigEndian.Uint32(b[i : i+4]))
i += 4
values := make([]Value, nvals)
switch typ {
case float64EntryType:
for i := 0; i < nvals; i++ {
values[i] = FloatValue{}
}
case integerEntryType:
for i := 0; i < nvals; i++ {
values[i] = IntegerValue{}
}
case booleanEntryType:
for i := 0; i < nvals; i++ {
values[i] = BooleanValue{}
}
case stringEntryType:
for i := 0; i < nvals; i++ {
values[i] = StringValue{}
}
default:
return fmt.Errorf("unsupported value type: %#v", typ)
if i+16*nvals > len(b) {
return ErrWALCorrupt
}
values := make([]Value, 0, nvals)
for j := 0; j < nvals; j++ {
if i+8 > len(b) {
un := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8])))
i += 8
values = append(values, NewFloatValue(un, v))
}
w.Values[k] = values
case integerEntryType:
if i+16*nvals > len(b) {
return ErrWALCorrupt
}
values := make([]Value, 0, nvals)
for j := 0; j < nvals; j++ {
un := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
v := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
values = append(values, NewIntegerValue(un, v))
}
w.Values[k] = values
case booleanEntryType:
if i+9*nvals > len(b) {
return ErrWALCorrupt
}
values := make([]Value, 0, nvals)
for j := 0; j < nvals; j++ {
un := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
v := b[i]
i += 1
if v == 1 {
values = append(values, NewBooleanValue(un, true))
} else {
values = append(values, NewBooleanValue(un, false))
}
}
w.Values[k] = values
case stringEntryType:
values := make([]Value, 0, nvals)
for j := 0; j < nvals; j++ {
if i+12 > len(b) {
return ErrWALCorrupt
}
un := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
switch typ {
case float64EntryType:
if i+8 > len(b) {
return ErrWALCorrupt
}
v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8])))
i += 8
if fv, ok := values[j].(FloatValue); ok {
x := (&fv)
x.unixnano = un
x.value = v
values[j] = *x
}
case integerEntryType:
if i+8 > len(b) {
return ErrWALCorrupt
}
v := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
if fv, ok := values[j].(IntegerValue); ok {
x := (&fv)
x.unixnano = un
x.value = v
values[j] = *x
}
case booleanEntryType:
if i >= len(b) {
return ErrWALCorrupt
}
v := b[i]
i += 1
if fv, ok := values[j].(BooleanValue); ok {
x := (&fv)
x.unixnano = un
fv.unixnano = un
if v == 1 {
x.value = true
} else {
x.value = false
}
values[j] = *x
}
case stringEntryType:
if i+4 > len(b) {
return ErrWALCorrupt
}
length := int(binary.BigEndian.Uint32(b[i : i+4]))
if i+length > int(uint32(len(b))) {
if i+length > len(b) {
return ErrWALCorrupt
}
@ -834,18 +814,14 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
v := string(b[i : i+length])
i += length
if fv, ok := values[j].(StringValue); ok {
x := (&fv)
x.unixnano = un
x.value = v
values[j] = *x
values = append(values, NewStringValue(un, v))
}
w.Values[k] = values
default:
return fmt.Errorf("unsupported value type: %#v", typ)
}
}
w.Values[k] = values
}
return nil
}
@ -1016,7 +992,8 @@ func (w *WALSegmentWriter) close() error {
// WALSegmentReader reads WAL segments.
type WALSegmentReader struct {
r io.ReadCloser
rc io.ReadCloser
r io.Reader
entry WALEntry
n int64
err error
@ -1025,18 +1002,18 @@ type WALSegmentReader struct {
// NewWALSegmentReader returns a new WALSegmentReader reading from r.
func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
return &WALSegmentReader{
r: r,
rc: r,
r: bufio.NewReaderSize(r, 1024*1024),
}
}
// Next indicates if there is a value to read.
func (r *WALSegmentReader) Next() bool {
b := *(getBuf(defaultBufLen))
defer putBuf(&b)
var nReadOK int
// read the type and the length of the entry
n, err := io.ReadFull(r.r, b[:5])
var lv [5]byte
n, err := io.ReadFull(r.r, lv[:])
if err == io.EOF {
return false
}
@ -1049,14 +1026,13 @@ func (r *WALSegmentReader) Next() bool {
}
nReadOK += n
entryType := b[0]
length := binary.BigEndian.Uint32(b[1:5])
entryType := lv[0]
length := binary.BigEndian.Uint32(lv[1:5])
b := *(getBuf(int(length)))
defer putBuf(&b)
// read the compressed block and decompress it
if int(length) > len(b) {
b = make([]byte, length)
}
n, err = io.ReadFull(r.r, b[:length])
if err != nil {
r.err = err
@ -1082,7 +1058,7 @@ func (r *WALSegmentReader) Next() bool {
switch WalEntryType(entryType) {
case WriteWALEntryType:
r.entry = &WriteWALEntry{
Values: map[string][]Value{},
Values: make(map[string][]Value),
}
case DeleteWALEntryType:
r.entry = &DeleteWALEntry{}
@ -1123,7 +1099,7 @@ func (r *WALSegmentReader) Error() error {
// Close closes the underlying io.Reader.
func (r *WALSegmentReader) Close() error {
return r.r.Close()
return r.rc.Close()
}
// idFromFileName parses the segment file ID from its name.