diff --git a/messaging/broker.go b/messaging/broker.go index 73f13f602b..4d0408ca95 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -1260,7 +1260,7 @@ func (r *TopicReader) Read(p []byte) (int, error) { if err == ErrReaderClosed { return 0, io.EOF } else if err != nil { - return 0, fmt.Errorf("file: %s", err) + return 0, err } else if f == nil { if r.streaming { time.Sleep(r.PollInterval) @@ -1306,6 +1306,8 @@ func (r *TopicReader) File() (*os.File, error) { segment, err := ReadSegmentByIndex(r.path, r.index) if os.IsNotExist(err) { return nil, nil + } else if err == ErrTopicTruncated { + return nil, err } else if err != nil { return nil, fmt.Errorf("segment by index: %s", err) } else if segment == nil { @@ -1551,7 +1553,7 @@ func (dec *MessageDecoder) Decode(m *Message) error { warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b[:]), n, err) return err } else if err != nil { - return fmt.Errorf("read header: %s", err) + return err } // Read checksum. diff --git a/messaging/broker_test.go b/messaging/broker_test.go index 55c37750ec..b55316bd7a 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -797,6 +797,70 @@ func TestTopicReader_streaming(t *testing.T) { r.Close() } +// Ensure a topic reader correctly deals with truncated topics. +func TestTopicReader_Truncated(t *testing.T) { + path, _ := ioutil.TempDir("", "") + defer os.RemoveAll(path) + + // Generate segments in directory. + MustWriteFile(filepath.Join(path, "6"), + MustMarshalMessages([]*messaging.Message{ + {Index: 6}, + {Index: 7}, + {Index: 10}, + }), + ) + MustWriteFile(filepath.Join(path, "12"), + MustMarshalMessages([]*messaging.Message{ + {Index: 12}, + }), + ) + + // Simulate a prior truncation. + MustWriteFile(filepath.Join(path, "tombstone"), []byte{}) + + // Execute table tests. + for i, tt := range []struct { + index uint64 // starting index + results []uint64 // returned indices + err error // expected error. Ignored if not set. + }{ + {index: 0, results: []uint64{}, err: messaging.ErrTopicTruncated}, + {index: 6, results: []uint64{6, 7, 10, 12}}, + {index: 7, results: []uint64{7, 10, 12}}, + {index: 9, results: []uint64{10, 12}}, + {index: 10, results: []uint64{10, 12}}, + {index: 11, results: []uint64{12}}, + {index: 12, results: []uint64{12}}, + {index: 13, results: []uint64{}}, + } { + // Start topic reader from an index. + r := messaging.NewTopicReader(path, tt.index, false) + + // Slurp all message ids from the reader. + results := make([]uint64, 0) + dec := messaging.NewMessageDecoder(r) + for { + m := &messaging.Message{} + if err := dec.Decode(m); err == io.EOF { + break + } else if err != nil { + if tt.err != nil && tt.err == err { + break + } + t.Fatalf("%d. decode error: %s", i, err) + } else { + results = append(results, m.Index) + } + } + + // Verify the retrieved indices match what's expected. + if !reflect.DeepEqual(results, tt.results) { + t.Fatalf("%d. %v: result mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.index, tt.results, results) + } + } +} + // Ensure multiple topic readers can read from the same topic directory. func BenchmarkTopicReaderStreaming(b *testing.B) { path, _ := ioutil.TempDir("", "")