Merge pull request #10002 from influxdata/bj-series-segment-recovery

Improve series segment recovery
pull/10005/head
Ben Johnson 2018-06-26 11:27:03 -06:00 committed by GitHub
commit c093922b75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 3 deletions

View File

@ -118,7 +118,7 @@ func (s *SeriesSegment) InitForWrite() (err error) {
// Only calculcate segment data size if writing.
for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); {
flag, _, _, sz := ReadSeriesEntry(s.data[s.size:])
if flag == 0 {
if !IsValidSeriesEntryFlag(flag) {
break
}
s.size += uint32(sz)
@ -237,7 +237,7 @@ func (s *SeriesSegment) MaxSeriesID() uint64 {
func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error {
for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); {
flag, id, key, sz := ReadSeriesEntry(s.data[pos:])
if flag == 0 {
if !IsValidSeriesEntryFlag(flag) {
break
}
@ -368,7 +368,7 @@ func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) {
func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
// If flag byte is zero then no more entries exist.
flag, data = uint8(data[0]), data[1:]
if flag == 0 {
if !IsValidSeriesEntryFlag(flag) {
return 0, 0, nil, 1
}
@ -396,3 +396,13 @@ func AppendSeriesEntry(dst []byte, flag uint8, id uint64, key []byte) []byte {
}
return dst
}
// IsValidSeriesEntryFlag returns true if flag is valid.
func IsValidSeriesEntryFlag(flag byte) bool {
switch flag {
case SeriesEntryInsertFlag, SeriesEntryTombstoneFlag:
return true
default:
return false
}
}

View File

@ -2,6 +2,7 @@ package tsdb_test
import (
"bytes"
"os"
"path/filepath"
"testing"
@ -140,6 +141,49 @@ func TestSeriesSegmentHeader(t *testing.T) {
}
}
func TestSeriesSegment_PartialWrite(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()
// Create a new initial segment (4mb) and initialize for writing.
segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
if err != nil {
t.Fatal(err)
} else if err := segment.InitForWrite(); err != nil {
t.Fatal(err)
}
defer segment.Close()
// Write two entries.
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil {
t.Fatal(err)
} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil {
t.Fatal(err)
}
sz := segment.Size()
entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil)))
// Close segment.
if err := segment.Close(); err != nil {
t.Fatal(err)
}
// Truncate at each point and reopen.
for i := entrySize; i > 0; i-- {
if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil {
t.Fatal(err)
}
segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000"))
if err := segment.Open(); err != nil {
t.Fatal(err)
} else if err := segment.InitForWrite(); err != nil {
t.Fatal(err)
} else if err := segment.Close(); err != nil {
t.Fatal(err)
}
}
}
func TestJoinSeriesOffset(t *testing.T) {
if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC {
t.Fatalf("unexpected offset: %x", offset)