WALSegmentReader counts bytes read without error

pull/5006/head
Philip O'Toole 2015-12-05 17:44:40 -08:00
parent c67831bc79
commit 4b5fb8db72
2 changed files with 88 additions and 2 deletions

View File

@ -602,6 +602,7 @@ func (w *WALSegmentWriter) Close() error {
type WALSegmentReader struct {
r io.ReadCloser
entry WALEntry
n int64
err error
}
@ -615,9 +616,10 @@ func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
func (r *WALSegmentReader) Next() bool {
b := getBuf(defaultBufLen)
defer putBuf(b)
var nReadOK int
// read the type and the length of the entry
_, err := io.ReadFull(r.r, b[:5])
n, err := io.ReadFull(r.r, b[:5])
if err == io.EOF {
return false
}
@ -628,6 +630,7 @@ func (r *WALSegmentReader) Next() bool {
// will return the this error to be handled.
return true
}
nReadOK += n
entryType := b[0]
length := btou32(b[1:5])
@ -637,11 +640,12 @@ func (r *WALSegmentReader) Next() bool {
b = make([]byte, length)
}
_, err = io.ReadFull(r.r, b[:length])
n, err = io.ReadFull(r.r, b[:length])
if err != nil {
r.err = err
return true
}
nReadOK += n
data, err := snappy.Decode(nil, b[:length])
if err != nil {
@ -662,6 +666,10 @@ func (r *WALSegmentReader) Next() bool {
return true
}
r.err = r.entry.UnmarshalBinary(data)
if r.err == nil {
// Read and decode of this entry was successful.
r.n += int64(nReadOK)
}
return true
}
@ -673,6 +681,13 @@ func (r *WALSegmentReader) Read() (WALEntry, error) {
return r.entry, nil
}
// Count returns the total number of bytes read successfully from the segment, as
// of the last call to Read(). The segment is guaranteed to be valid up to and
// including this number of bytes.
func (r *WALSegmentReader) Count() int64 {
return r.n
}
func (r *WALSegmentReader) Error() error {
return r.err
}

View File

@ -1,6 +1,7 @@
package tsm1_test
import (
"fmt"
"os"
"testing"
"time"
@ -61,6 +62,10 @@ func TestWALWriter_WritePoints_Single(t *testing.T) {
}
}
}
if n := r.Count(); n != MustReadFileSize(f) {
t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
}
}
func TestWALWriter_WritePoints_Multiple(t *testing.T) {
@ -128,6 +133,10 @@ func TestWALWriter_WritePoints_Multiple(t *testing.T) {
}
}
}
if n := r.Count(); n != MustReadFileSize(f) {
t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
}
}
func TestWALWriter_WriteDelete_Single(t *testing.T) {
@ -347,6 +356,59 @@ func TestWAL_Delete(t *testing.T) {
}
}
func TestWALWriter_Corrupt(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w := tsm1.NewWALSegmentWriter(f)
corruption := []byte{1, 4, 0, 0, 0}
p1 := tsm1.NewValue(time.Unix(1, 0), 1.1)
values := map[string][]tsm1.Value{
"cpu,host=A#!~#float": []tsm1.Value{p1},
}
entry := &tsm1.WriteWALEntry{
Values: values,
}
if err := w.Write(entry); err != nil {
fatal(t, "write points", err)
}
// Write some random bytes to the file to simulate corruption.
if _, err := f.Write(corruption); err != nil {
fatal(t, "corrupt WAL segment", err)
}
// Create the WAL segment reader.
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
fatal(t, "seek", err)
}
r := tsm1.NewWALSegmentReader(f)
// Try to decode two entries.
if !r.Next() {
t.Fatalf("expected next, got false")
}
if _, err := r.Read(); err != nil {
fatal(t, "read entry", err)
}
if !r.Next() {
t.Fatalf("expected next, got false")
}
if _, err := r.Read(); err == nil {
fatal(t, "read entry did not return err", nil)
}
// Count should only return size of valid data.
expCount := MustReadFileSize(f) - int64(len(corruption))
if n := r.Count(); n != expCount {
t.Fatalf("wrong count of bytes read, got %d, exp %d", n, expCount)
}
}
func BenchmarkWALSegmentWriter(b *testing.B) {
points := map[string][]tsm1.Value{}
for i := 0; i < 5000; i++ {
@ -411,3 +473,12 @@ func BenchmarkWALSegmentReader(b *testing.B) {
}
}
}
// MustReadFileSize returns the size of the file, or panics.
func MustReadFileSize(f *os.File) int64 {
stat, err := os.Stat(f.Name())
if err != nil {
panic(fmt.Sprintf("failed to get size of file at %s: %s", f.Name(), err.Error()))
}
return stat.Size()
}