From 081f95147ecf7be3a1f96b07dcb19ff95bab054a Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Tue, 19 Dec 2023 15:02:34 -0800 Subject: [PATCH] fix: avoid SIGBUS when reading non-std series segment files (#24509) (#24520) Some series files which are smaller than the standard sizes cause SIGBUS in influx_inspect and influxd, because entry iteration walks onto mapped memory not backed by the file. Avoid walking off the end of the file while iterating series entries in oddly sized files. closes https://github.com/influxdata/influxdb/issues/24508 Co-authored-by: Geoffrey Wossum (cherry picked from commit 969abf3da274052a843032704a590efb94fce6af) closes https://github.com/influxdata/influxdb/issues/24511 --- .../verify_seriesfile/verify_seriesfile.go | 5 +- tsdb/series_file.go | 4 +- tsdb/series_partition.go | 2 +- tsdb/series_segment.go | 30 +++++-- tsdb/series_segment_test.go | 84 +++++++++++-------- tsdb/shard_test.go | 8 ++ 6 files changed, 86 insertions(+), 47 deletions(-) diff --git a/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go b/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go index e419104ec9..596522711f 100644 --- a/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go +++ b/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go @@ -269,7 +269,7 @@ func (v verify) verifySegment(segmentPath string, ids map[uint64]IDData) (valid v.Logger = v.Logger.With(zap.String("segment", segmentName)) v.Logger.Info("Verifying segment") - // Open up the segment and grab it's data. + // Open up the segment and grab its data. segmentID, err := tsdb.ParseSeriesSegmentFilename(segmentName) if err != nil { return false, err @@ -280,7 +280,8 @@ func (v verify) verifySegment(segmentPath string, ids map[uint64]IDData) (valid return false, nil } defer segment.Close() - buf := newBuffer(segment.Data()) + // Only walk the file as it exists, not the whole mapping which may be bigger than the file. 
+ buf := newBuffer(segment.Data()[:segment.Size()]) defer func() { if rec := recover(); rec != nil { diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 4100dcfe4a..bd754472aa 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -366,9 +366,9 @@ func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte { } // ReadSeriesKey returns the series key from the beginning of the buffer. -func ReadSeriesKey(data []byte) (key, remainder []byte) { +func ReadSeriesKey(data []byte) (key []byte) { sz, n := binary.Uvarint(data) - return data[:int(sz)+n], data[int(sz)+n:] + return data[:int(sz)+n] } func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) { diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index 7dc433ffc5..eef16bba90 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -507,7 +507,7 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { continue } - key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) + key := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) return key } diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index b1820e5228..6b3037eaf4 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -92,6 +92,12 @@ func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { // Open memory maps the data file at the file's path. func (s *SeriesSegment) Open() error { if err := func() (err error) { + st, err := os.Stat(s.path) + if err != nil { + return fmt.Errorf("cannot stat %s: %w", s.path, err) + } + s.size = uint32(st.Size()) + // Memory map file data. if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil { return err @@ -120,14 +126,16 @@ func (s *SeriesSegment) Path() string { return s.path } // InitForWrite initializes a write handle for the segment. // This is only used for the last segment in the series file. 
func (s *SeriesSegment) InitForWrite() (err error) { - // Only calculate segment data size if writing. - for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); { - flag, _, _, sz := ReadSeriesEntry(s.data[s.size:]) + // Only recalculate segment data size if writing. + var size uint32 + for size = uint32(SeriesSegmentHeaderSize); size < s.size; { + flag, _, _, sz := ReadSeriesEntry(s.data[size:s.size]) if !IsValidSeriesEntryFlag(flag) { break } - s.size += uint32(sz) + size += uint32(sz) } + s.size = size // Open file handler for writing & seek to end of data. if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil { @@ -243,8 +251,8 @@ func (s *SeriesSegment) MaxSeriesID() uint64 { // ForEachEntry executes fn for every entry in the segment. func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error { - for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); { - flag, id, key, sz := ReadSeriesEntry(s.data[pos:]) + for pos := uint32(SeriesSegmentHeaderSize); pos < s.size; { + flag, id, key, sz := ReadSeriesEntry(s.data[pos:s.size]) if !IsValidSeriesEntryFlag(flag) { break } @@ -337,7 +345,7 @@ func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte { return nil } buf := segment.Slice(pos) - key, _ := ReadSeriesKey(buf) + key := ReadSeriesKey(buf) return key } @@ -416,16 +424,22 @@ func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) { } func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) { + if len(data) <= 0 { + return 0, 0, nil, 1 + } // If flag byte is zero then no more entries exist. 
flag, data = uint8(data[0]), data[1:] if !IsValidSeriesEntryFlag(flag) { return 0, 0, nil, 1 } + if len(data) < 8 { + return 0, 0, nil, 1 + } id, data = binary.BigEndian.Uint64(data), data[8:] switch flag { case SeriesEntryInsertFlag: - key, _ = ReadSeriesKey(data) + key = ReadSeriesKey(data) } return flag, id, key, int64(SeriesEntryHeaderSize + len(key)) } diff --git a/tsdb/series_segment_test.go b/tsdb/series_segment_test.go index 451091fe07..87cc7412e8 100644 --- a/tsdb/series_segment_test.go +++ b/tsdb/series_segment_test.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "path/filepath" + "strconv" "testing" "github.com/google/go-cmp/cmp" @@ -139,44 +140,59 @@ func TestSeriesSegmentHeader(t *testing.T) { } func TestSeriesSegment_PartialWrite(t *testing.T) { - dir := t.TempDir() + for extraSegs := uint64(2000); extraSegs < 4000; extraSegs++ { + func() { + dir, cleanup := MustTempDir() + defer cleanup() - // Create a new initial segment (4mb) and initialize for writing. - segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) - if err != nil { - t.Fatal(err) - } else if err := segment.InitForWrite(); err != nil { - t.Fatal(err) - } - defer segment.Close() + // Create a new initial segment (4mb) and initialize for writing. + segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) + if err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } + defer segment.Close() - // Write two entries. 
- if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { - t.Fatal(err) - } else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { - t.Fatal(err) - } - sz := segment.Size() - entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + // Write two entries. + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { + t.Fatal(err) + } - // Close segment. - if err := segment.Close(); err != nil { - t.Fatal(err) - } + // Adding intermediary segments in between "A" and "B" is to try and induce a SIGBUS + // when the file truncation backs over a page. + for i := uint64(0); i < extraSegs; i++ { + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1+i, tsdb.AppendSeriesKey(nil, []byte(strconv.Itoa(int(i))), nil))); err != nil { + t.Fatal(err) + } + } - // Truncate at each point and reopen. - for i := entrySize; i > 0; i-- { - if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { - t.Fatal(err) - } - segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) - if err := segment.Open(); err != nil { - t.Fatal(err) - } else if err := segment.InitForWrite(); err != nil { - t.Fatal(err) - } else if err := segment.Close(); err != nil { - t.Fatal(err) - } + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2+extraSegs, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { + t.Fatal(err) + } + sz := segment.Size() + entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2+extraSegs, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + + // Close segment. 
+ if err := segment.Close(); err != nil { + t.Fatal(err) + } + + // Truncate at each point and reopen. + for i := entrySize; i > 0; i-- { + if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { + t.Fatal(err) + } + segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) + if err := segment.Open(); err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } else if err := segment.Close(); err != nil { + t.Fatal(err) + } + } + }() } } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index f799cafee5..9c3bbc90f0 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -2456,6 +2456,14 @@ func (sh *Shard) MustWritePointsString(s string) { } } +func MustTempDir() (string, func()) { + dir, err := os.MkdirTemp("", "shard-test") + if err != nil { + panic(fmt.Sprintf("failed to create temp dir: %v", err)) + } + return dir, func() { os.RemoveAll(dir) } +} + type seriesIterator struct { keys [][]byte }