Unit test TopicReader and truncation

pull/2433/head
Philip O'Toole 2015-04-25 10:48:56 -07:00 committed by Philip O'Toole
parent 8ab44301b9
commit 9635811efa
2 changed files with 68 additions and 2 deletions

View File

@ -1260,7 +1260,7 @@ func (r *TopicReader) Read(p []byte) (int, error) {
if err == ErrReaderClosed {
return 0, io.EOF
} else if err != nil {
return 0, fmt.Errorf("file: %s", err)
return 0, err
} else if f == nil {
if r.streaming {
time.Sleep(r.PollInterval)
@ -1306,6 +1306,8 @@ func (r *TopicReader) File() (*os.File, error) {
segment, err := ReadSegmentByIndex(r.path, r.index)
if os.IsNotExist(err) {
return nil, nil
} else if err == ErrTopicTruncated {
return nil, err
} else if err != nil {
return nil, fmt.Errorf("segment by index: %s", err)
} else if segment == nil {
@ -1551,7 +1553,7 @@ func (dec *MessageDecoder) Decode(m *Message) error {
warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b[:]), n, err)
return err
} else if err != nil {
return fmt.Errorf("read header: %s", err)
return err
}
// Read checksum.

View File

@ -797,6 +797,70 @@ func TestTopicReader_streaming(t *testing.T) {
r.Close()
}
// Ensure a topic reader correctly deals with truncated topics.
func TestTopicReader_Truncated(t *testing.T) {
path, _ := ioutil.TempDir("", "")
defer os.RemoveAll(path)
// Generate segments in directory.
MustWriteFile(filepath.Join(path, "6"),
MustMarshalMessages([]*messaging.Message{
{Index: 6},
{Index: 7},
{Index: 10},
}),
)
MustWriteFile(filepath.Join(path, "12"),
MustMarshalMessages([]*messaging.Message{
{Index: 12},
}),
)
// Simulate a prior truncation.
MustWriteFile(filepath.Join(path, "tombstone"), []byte{})
// Execute table tests.
for i, tt := range []struct {
index uint64 // starting index
results []uint64 // returned indices
err error // expected error. Ignored if not set.
}{
{index: 0, results: []uint64{}, err: messaging.ErrTopicTruncated},
{index: 6, results: []uint64{6, 7, 10, 12}},
{index: 7, results: []uint64{7, 10, 12}},
{index: 9, results: []uint64{10, 12}},
{index: 10, results: []uint64{10, 12}},
{index: 11, results: []uint64{12}},
{index: 12, results: []uint64{12}},
{index: 13, results: []uint64{}},
} {
// Start topic reader from an index.
r := messaging.NewTopicReader(path, tt.index, false)
// Slurp all message ids from the reader.
results := make([]uint64, 0)
dec := messaging.NewMessageDecoder(r)
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
break
} else if err != nil {
if tt.err != nil && tt.err == err {
break
}
t.Fatalf("%d. decode error: %s", i, err)
} else {
results = append(results, m.Index)
}
}
// Verify the retrieved indices match what's expected.
if !reflect.DeepEqual(results, tt.results) {
t.Fatalf("%d. %v: result mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.index, tt.results, results)
}
}
}
// Ensure multiple topic readers can read from the same topic directory.
func BenchmarkTopicReaderStreaming(b *testing.B) {
path, _ := ioutil.TempDir("", "")