diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index 0fab853f8f..b91ebe018c 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -118,7 +118,7 @@ func (s *SeriesSegment) InitForWrite() (err error) { // Only calculcate segment data size if writing. for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); { flag, _, _, sz := ReadSeriesEntry(s.data[s.size:]) - if flag == 0 { + if !IsValidSeriesEntryFlag(flag) { break } s.size += uint32(sz) @@ -237,7 +237,7 @@ func (s *SeriesSegment) MaxSeriesID() uint64 { func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error { for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); { flag, id, key, sz := ReadSeriesEntry(s.data[pos:]) - if flag == 0 { + if !IsValidSeriesEntryFlag(flag) { break } @@ -368,7 +368,7 @@ func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) { func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) { // If flag byte is zero then no more entries exist. flag, data = uint8(data[0]), data[1:] - if flag == 0 { + if !IsValidSeriesEntryFlag(flag) { return 0, 0, nil, 1 } @@ -396,3 +396,13 @@ func AppendSeriesEntry(dst []byte, flag uint8, id uint64, key []byte) []byte { } return dst } + +// IsValidSeriesEntryFlag returns true if flag is valid. +func IsValidSeriesEntryFlag(flag byte) bool { + switch flag { + case SeriesEntryInsertFlag, SeriesEntryTombstoneFlag: + return true + default: + return false + } +} diff --git a/tsdb/series_segment_test.go b/tsdb/series_segment_test.go index fe4f87c8fd..a98eab06dd 100644 --- a/tsdb/series_segment_test.go +++ b/tsdb/series_segment_test.go @@ -2,6 +2,7 @@ package tsdb_test import ( "bytes" + "os" "path/filepath" "testing" @@ -140,6 +141,49 @@ func TestSeriesSegmentHeader(t *testing.T) { } } +func TestSeriesSegment_PartialWrite(t *testing.T) { + dir, cleanup := MustTempDir() + defer cleanup() + + // Create a new initial segment (4mb) and initialize for writing. + segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) + if err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } + defer segment.Close() + + // Write two entries. + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { + t.Fatal(err) + } else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { + t.Fatal(err) + } + sz := segment.Size() + entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + + // Close segment. + if err := segment.Close(); err != nil { + t.Fatal(err) + } + + // Truncate at each point and reopen. + for i := entrySize; i > 0; i-- { + if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { + t.Fatal(err) + } + segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) + if err := segment.Open(); err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } else if err := segment.Close(); err != nil { + t.Fatal(err) + } + } +} + func TestJoinSeriesOffset(t *testing.T) { if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC { t.Fatalf("unexpected offset: %x", offset)