From 8e9cbd7ffcc30fa9bd049405c4aeac782ddd140d Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 18 Apr 2017 23:45:45 -0600 Subject: [PATCH 1/5] Simplify WALSegmentReader.UnmarshalBinary There were two loops over nvals which created some extra allocation which coudl be replaced with a simplet slice capacity and append. --- tsdb/engine/tsm1/wal.go | 61 ++++++----------------------------------- 1 file changed, 9 insertions(+), 52 deletions(-) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 12d9e1ee4e..367f8e1eab 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -740,29 +740,7 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { nvals := int(binary.BigEndian.Uint32(b[i : i+4])) i += 4 - values := make([]Value, nvals) - switch typ { - case float64EntryType: - for i := 0; i < nvals; i++ { - values[i] = FloatValue{} - } - case integerEntryType: - for i := 0; i < nvals; i++ { - values[i] = IntegerValue{} - } - case booleanEntryType: - for i := 0; i < nvals; i++ { - values[i] = BooleanValue{} - } - case stringEntryType: - for i := 0; i < nvals; i++ { - values[i] = StringValue{} - } - - default: - return fmt.Errorf("unsupported value type: %#v", typ) - } - + values := make([]Value, 0, nvals) for j := 0; j < nvals; j++ { if i+8 > len(b) { return ErrWALCorrupt @@ -779,12 +757,7 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8]))) i += 8 - if fv, ok := values[j].(FloatValue); ok { - x := (&fv) - x.unixnano = un - x.value = v - values[j] = *x - } + values = append(values, NewFloatValue(un, v)) case integerEntryType: if i+8 > len(b) { return ErrWALCorrupt @@ -792,12 +765,7 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 - if fv, ok := values[j].(IntegerValue); ok { - x := (&fv) - x.unixnano = un - x.value = v - values[j] = *x - } + values = append(values, NewIntegerValue(un, v)) case booleanEntryType: if i >= len(b) { return ErrWALCorrupt @@ -805,16 +773,10 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := b[i] i += 1 - if fv, ok := values[j].(BooleanValue); ok { - x := (&fv) - x.unixnano = un - fv.unixnano = un - if v == 1 { - x.value = true - } else { - x.value = false - } - values[j] = *x + if v == 1 { + values = append(values, NewBooleanValue(un, true)) + } else { + values = append(values, NewBooleanValue(un, false)) } case stringEntryType: if i+4 > len(b) { @@ -834,12 +796,7 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := string(b[i : i+length]) i += length - if fv, ok := values[j].(StringValue); ok { - x := (&fv) - x.unixnano = un - x.value = v - values[j] = *x - } + values = append(values, NewStringValue(un, v)) default: return fmt.Errorf("unsupported value type: %#v", typ) } @@ -1082,7 +1039,7 @@ func (r *WALSegmentReader) Next() bool { switch WalEntryType(entryType) { case WriteWALEntryType: r.entry = &WriteWALEntry{ - Values: map[string][]Value{}, + Values: make(map[string][]Value), } case DeleteWALEntryType: r.entry = &DeleteWALEntry{} From da6bdfdda807272e4b0b81191b381333bf6eb7e4 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 19 Apr 2017 10:28:16 -0600 Subject: [PATCH 2/5] Use bufio.Reader when reading wal segments Reduces disk IO due to small reads. --- tsdb/engine/tsm1/wal.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 367f8e1eab..b26a289838 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -1,6 +1,7 @@ package tsm1 import ( + "bufio" "encoding/binary" "errors" "fmt" @@ -973,7 +974,8 @@ func (w *WALSegmentWriter) close() error { // WALSegmentReader reads WAL segments. type WALSegmentReader struct { - r io.ReadCloser + rc io.ReadCloser + r io.Reader entry WALEntry n int64 err error @@ -982,7 +984,8 @@ type WALSegmentReader struct { // NewWALSegmentReader returns a new WALSegmentReader reading from r. func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader { return &WALSegmentReader{ - r: r, + rc: r, + r: bufio.NewReaderSize(r, 1024*1024), } } @@ -1080,7 +1083,7 @@ func (r *WALSegmentReader) Error() error { // Close closes the underlying io.Reader. func (r *WALSegmentReader) Close() error { - return r.r.Close() + return r.rc.Close() } // idFromFileName parses the segment file ID from its name. From b0988511bff32d13ade131518ebf750d65cfee01 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 19 Apr 2017 10:39:34 -0600 Subject: [PATCH 3/5] Use fixed size array instead of slice --- tsdb/engine/tsm1/wal.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index b26a289838..8eb0c725ca 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -991,12 +991,11 @@ func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader { // Next indicates if there is a value to read. func (r *WALSegmentReader) Next() bool { - b := *(getBuf(defaultBufLen)) - defer putBuf(&b) var nReadOK int // read the type and the length of the entry - n, err := io.ReadFull(r.r, b[:5]) + var lv [5]byte + n, err := io.ReadFull(r.r, lv[:]) if err == io.EOF { return false } @@ -1009,14 +1008,13 @@ func (r *WALSegmentReader) Next() bool { } nReadOK += n - entryType := b[0] - length := binary.BigEndian.Uint32(b[1:5]) + entryType := lv[0] + length := binary.BigEndian.Uint32(lv[1:5]) + + b := *(getBuf(int(length))) + defer putBuf(&b) // read the compressed block and decompress it - if int(length) > len(b) { - b = make([]byte, length) - } - n, err = io.ReadFull(r.r, b[:length]) if err != nil { r.err = err From 888689f5d3dc35204927b9532ff983c6e88dc96f Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 19 Apr 2017 10:45:01 -0600 Subject: [PATCH 4/5] Move values loop under type switch All the values read must be of the same type so repeatedly using the type switch is confusing and less efficiient. --- tsdb/engine/tsm1/wal.go | 63 +++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 8eb0c725ca..345e3dbbff 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -741,37 +741,49 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { nvals := int(binary.BigEndian.Uint32(b[i : i+4])) i += 4 - values := make([]Value, 0, nvals) - for j := 0; j < nvals; j++ { - if i+8 > len(b) { - return ErrWALCorrupt - } - - un := int64(binary.BigEndian.Uint64(b[i : i+8])) - i += 8 - - switch typ { - case float64EntryType: - if i+8 > len(b) { + switch typ { + case float64EntryType: + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + if i+16 > len(b) { return ErrWALCorrupt } + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8]))) i += 8 + values = append(values, NewFloatValue(un, v)) - case integerEntryType: - if i+8 > len(b) { + } + w.Values[k] = values + case integerEntryType: + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + if i+16 > len(b) { return ErrWALCorrupt } + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + v := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 values = append(values, NewIntegerValue(un, v)) - case booleanEntryType: - if i >= len(b) { + } + w.Values[k] = values + + case booleanEntryType: + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + if i+9 > len(b) { return ErrWALCorrupt } + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + v := b[i] i += 1 if v == 1 { @@ -779,11 +791,19 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { } else { values = append(values, NewBooleanValue(un, false)) } - case stringEntryType: - if i+4 > len(b) { + } + w.Values[k] = values + + case stringEntryType: + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + if i+12 > len(b) { return ErrWALCorrupt } + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + length := int(binary.BigEndian.Uint32(b[i : i+4])) if i+length > int(uint32(len(b))) { return ErrWALCorrupt @@ -798,11 +818,12 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := string(b[i : i+length]) i += length values = append(values, NewStringValue(un, v)) - default: - return fmt.Errorf("unsupported value type: %#v", typ) } + w.Values[k] = values + + default: + return fmt.Errorf("unsupported value type: %#v", typ) } - w.Values[k] = values } return nil } From d88604f6f20ec785b78102d8e22dd7dfb67022ec Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 20 Apr 2017 13:45:04 -0600 Subject: [PATCH 5/5] Move repetive loop checks outside of values loop --- tsdb/engine/tsm1/wal.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 345e3dbbff..51989b5300 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -743,31 +743,28 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { switch typ { case float64EntryType: + if i+16*nvals > len(b) { + return ErrWALCorrupt + } + values := make([]Value, 0, nvals) for j := 0; j < nvals; j++ { - if i+16 > len(b) { - return ErrWALCorrupt - } - un := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 - v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8]))) i += 8 - values = append(values, NewFloatValue(un, v)) } w.Values[k] = values case integerEntryType: + if i+16*nvals > len(b) { + return ErrWALCorrupt + } + values := make([]Value, 0, nvals) for j := 0; j < nvals; j++ { - if i+16 > len(b) { - return ErrWALCorrupt - } - un := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 - v := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 values = append(values, NewIntegerValue(un, v)) @@ -775,12 +772,12 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { w.Values[k] = values case booleanEntryType: + if i+9*nvals > len(b) { + return ErrWALCorrupt + } + values := make([]Value, 0, nvals) for j := 0; j < nvals; j++ { - if i+9 > len(b) { - return ErrWALCorrupt - } - un := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 @@ -805,7 +802,7 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { i += 8 length := int(binary.BigEndian.Uint32(b[i : i+4])) - if i+length > int(uint32(len(b))) { + if i+length > len(b) { return ErrWALCorrupt }