diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 12d9e1ee4e..51989b5300 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -1,6 +1,7 @@ package tsm1 import ( + "bufio" "encoding/binary" "errors" "fmt" @@ -740,89 +741,68 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { nvals := int(binary.BigEndian.Uint32(b[i : i+4])) i += 4 - values := make([]Value, nvals) switch typ { case float64EntryType: - for i := 0; i < nvals; i++ { - values[i] = FloatValue{} - } - case integerEntryType: - for i := 0; i < nvals; i++ { - values[i] = IntegerValue{} - } - case booleanEntryType: - for i := 0; i < nvals; i++ { - values[i] = BooleanValue{} - } - case stringEntryType: - for i := 0; i < nvals; i++ { - values[i] = StringValue{} - } - - default: - return fmt.Errorf("unsupported value type: %#v", typ) - } - - for j := 0; j < nvals; j++ { - if i+8 > len(b) { + if i+16*nvals > len(b) { return ErrWALCorrupt } - un := int64(binary.BigEndian.Uint64(b[i : i+8])) - i += 8 - - switch typ { - case float64EntryType: - if i+8 > len(b) { - return ErrWALCorrupt - } - + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8]))) i += 8 - if fv, ok := values[j].(FloatValue); ok { - x := (&fv) - x.unixnano = un - x.value = v - values[j] = *x - } - case integerEntryType: - if i+8 > len(b) { - return ErrWALCorrupt - } + values = append(values, NewFloatValue(un, v)) + } + w.Values[k] = values + case integerEntryType: + if i+16*nvals > len(b) { + return ErrWALCorrupt + } + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 v := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 - if fv, ok := values[j].(IntegerValue); ok { - x := (&fv) - x.unixnano = un - x.value = v - values[j] = *x - } - case booleanEntryType: - if i >= len(b) { - return ErrWALCorrupt - } + values = append(values, NewIntegerValue(un, v)) + } + w.Values[k] = values + + case booleanEntryType: + if i+9*nvals > len(b) { + return ErrWALCorrupt + } + + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 v := b[i] i += 1 - if fv, ok := values[j].(BooleanValue); ok { - x := (&fv) - x.unixnano = un - fv.unixnano = un - if v == 1 { - x.value = true - } else { - x.value = false - } - values[j] = *x + if v == 1 { + values = append(values, NewBooleanValue(un, true)) + } else { + values = append(values, NewBooleanValue(un, false)) } - case stringEntryType: - if i+4 > len(b) { + } + w.Values[k] = values + + case stringEntryType: + values := make([]Value, 0, nvals) + for j := 0; j < nvals; j++ { + if i+12 > len(b) { return ErrWALCorrupt } + un := int64(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + length := int(binary.BigEndian.Uint32(b[i : i+4])) - if i+length > int(uint32(len(b))) { + if i+length > len(b) { return ErrWALCorrupt } @@ -834,17 +814,13 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := string(b[i : i+length]) i += length - if fv, ok := values[j].(StringValue); ok { - x := (&fv) - x.unixnano = un - x.value = v - values[j] = *x - } - default: - return fmt.Errorf("unsupported value type: %#v", typ) + values = append(values, NewStringValue(un, v)) } + w.Values[k] = values + + default: + return fmt.Errorf("unsupported value type: %#v", typ) } - w.Values[k] = values } return nil } @@ -1016,7 +992,8 @@ func (w *WALSegmentWriter) close() error { // WALSegmentReader reads WAL segments. type WALSegmentReader struct { - r io.ReadCloser + rc io.ReadCloser + r io.Reader entry WALEntry n int64 err error @@ -1025,18 +1002,18 @@ type WALSegmentReader struct { // NewWALSegmentReader returns a new WALSegmentReader reading from r. func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader { return &WALSegmentReader{ - r: r, + rc: r, + r: bufio.NewReaderSize(r, 1024*1024), } } // Next indicates if there is a value to read. func (r *WALSegmentReader) Next() bool { - b := *(getBuf(defaultBufLen)) - defer putBuf(&b) var nReadOK int // read the type and the length of the entry - n, err := io.ReadFull(r.r, b[:5]) + var lv [5]byte + n, err := io.ReadFull(r.r, lv[:]) if err == io.EOF { return false } @@ -1049,14 +1026,13 @@ func (r *WALSegmentReader) Next() bool { } nReadOK += n - entryType := b[0] - length := binary.BigEndian.Uint32(b[1:5]) + entryType := lv[0] + length := binary.BigEndian.Uint32(lv[1:5]) + + b := *(getBuf(int(length))) + defer putBuf(&b) // read the compressed block and decompress it - if int(length) > len(b) { - b = make([]byte, length) - } - n, err = io.ReadFull(r.r, b[:length]) if err != nil { r.err = err @@ -1082,7 +1058,7 @@ func (r *WALSegmentReader) Next() bool { switch WalEntryType(entryType) { case WriteWALEntryType: r.entry = &WriteWALEntry{ - Values: map[string][]Value{}, + Values: make(map[string][]Value), } case DeleteWALEntryType: r.entry = &DeleteWALEntry{} @@ -1123,7 +1099,7 @@ func (r *WALSegmentReader) Error() error { // Close closes the underlying io.Reader. func (r *WALSegmentReader) Close() error { - return r.r.Close() + return r.rc.Close() } // idFromFileName parses the segment file ID from its name.