influxdb/tsdb/series_segment_test.go

package tsdb_test

import (
	"bytes"
	"os"
	"path/filepath"
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/influxdata/influxdb/v2/tsdb"
)
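
// TestSeriesSegment covers the basic write path: the first entry lands
// immediately after the segment header, an entry that does not fit in the
// remaining 4mb segment is rejected with ErrSeriesSegmentNotWritable, and
// ForEachEntry replays the successfully written entries in order.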
func TestSeriesSegment(t *testing.T) {
	dir := t.TempDir()

	// Create a new initial segment (4mb) and initialize for writing.
	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write initial entry.
	key1 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	offset, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, key1))
	if err != nil {
		t.Fatal(err)
	} else if offset != tsdb.SeriesSegmentHeaderSize {
		t.Fatalf("unexpected offset: %d", offset)
	}

	// Write a large entry (3mb).
	key2 := tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("m"), 3*(1<<20)), nil)
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, key2)); err != nil {
		t.Fatal(err)
	} else if offset != tsdb.SeriesSegmentHeaderSize {
		t.Fatalf("unexpected offset: %d", offset)
	}

	// Write another entry that is too large for the remaining segment space.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 3, tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("n"), 3*(1<<20)), nil))); err != tsdb.ErrSeriesSegmentNotWritable {
		t.Fatalf("unexpected error: %s", err)
	}

	// Verify two entries exist.
	var n int
	segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
		switch n {
		case 0:
			if flag != tsdb.SeriesEntryInsertFlag || id != 1 || !bytes.Equal(key1, key) {
				t.Fatalf("unexpected entry(0): %d, %d, %q", flag, id, key)
			}
		case 1:
			if flag != tsdb.SeriesEntryInsertFlag || id != 2 || !bytes.Equal(key2, key) {
				t.Fatalf("unexpected entry(1): %d, %d, %q", flag, id, key)
			}
		default:
			t.Fatalf("too many entries")
		}
		n++
		return nil
	})
	if n != 2 {
		t.Fatalf("unexpected entry count: %d", n)
	}
}
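
// TestSeriesSegment_AppendSeriesIDs verifies that AppendSeriesIDs appends the
// IDs of the segment's entries onto an existing slice in write order.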
func TestSeriesSegment_AppendSeriesIDs(t *testing.T) {
	dir := t.TempDir()

	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write entries.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 10, tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
		t.Fatal(err)
	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 11, tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
		t.Fatal(err)
	} else if err := segment.Flush(); err != nil {
		t.Fatal(err)
	}

	// Collect series ids with existing set.
	a := segment.AppendSeriesIDs([]uint64{1, 2})
	if diff := cmp.Diff(a, []uint64{1, 2, 10, 11}); diff != "" {
		t.Fatal(diff)
	}
}
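
// TestSeriesSegment_MaxSeriesID verifies that MaxSeriesID reports the highest
// series ID written to the segment.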
func TestSeriesSegment_MaxSeriesID(t *testing.T) {
	dir := t.TempDir()

	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write entries.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 10, tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
		t.Fatal(err)
	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 11, tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
		t.Fatal(err)
	} else if err := segment.Flush(); err != nil {
		t.Fatal(err)
	}

	// Verify maximum.
	if max := segment.MaxSeriesID(); max != 11 {
		t.Fatalf("unexpected max: %d", max)
	}
}
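
// TestSeriesSegmentHeader checks that a new header carries the current
// SeriesSegmentVersion and round-trips through WriteTo/ReadSeriesSegmentHeader.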
func TestSeriesSegmentHeader(t *testing.T) {
	// Verify header initializes correctly.
	hdr := tsdb.NewSeriesSegmentHeader()
	if hdr.Version != tsdb.SeriesSegmentVersion {
		t.Fatalf("unexpected version: %d", hdr.Version)
	}

	// Marshal/unmarshal.
	var buf bytes.Buffer
	if _, err := hdr.WriteTo(&buf); err != nil {
		t.Fatal(err)
	} else if other, err := tsdb.ReadSeriesSegmentHeader(buf.Bytes()); err != nil {
		t.Fatal(err)
	} else if diff := cmp.Diff(hdr, other); diff != "" {
		t.Fatal(diff)
	}
}
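
// TestSeriesSegment_PartialWrite simulates a crash mid-write by truncating the
// segment file at every byte boundary of the final entry and ensures the
// segment can still be reopened and reinitialized for writing.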
func TestSeriesSegment_PartialWrite(t *testing.T) {
	dir := t.TempDir()

	// Create a new initial segment (4mb) and initialize for writing.
	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write two entries.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil {
		t.Fatal(err)
	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil {
		t.Fatal(err)
	}
	sz := segment.Size()
	entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil)))

	// Close segment.
	if err := segment.Close(); err != nil {
		t.Fatal(err)
	}

	// Truncate at each point and reopen.
	for i := entrySize; i > 0; i-- {
		if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil {
			t.Fatal(err)
		}
		segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000"))
		if err := segment.Open(); err != nil {
			t.Fatal(err)
		} else if err := segment.InitForWrite(); err != nil {
			t.Fatal(err)
		} else if err := segment.Close(); err != nil {
			t.Fatal(err)
		}
	}
}
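
// TestJoinSeriesOffset verifies that a segment ID and position are packed into
// a single offset, with the segment ID shifted above the 32-bit position.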
func TestJoinSeriesOffset(t *testing.T) {
	if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC {
		t.Fatalf("unexpected offset: %x", offset)
	}
}
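
// TestSplitSeriesOffset verifies the inverse operation: a packed offset splits
// back into its segment ID and 32-bit position.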
func TestSplitSeriesOffset(t *testing.T) {
	if segmentID, pos := tsdb.SplitSeriesOffset(0x123456789ABC); segmentID != 0x1234 || pos != 0x56789ABC {
		t.Fatalf("unexpected segmentID/pos: %x/%x", segmentID, pos)
	}
}
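
// TestIsValidSeriesSegmentFilename checks that a 4-character hexadecimal name
// such as "192a" is accepted and that empty or shorter names are rejected.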
func TestIsValidSeriesSegmentFilename(t *testing.T) {
	if tsdb.IsValidSeriesSegmentFilename("") {
		t.Fatal("expected invalid")
	} else if tsdb.IsValidSeriesSegmentFilename("0ab") {
		t.Fatal("expected invalid")
	} else if !tsdb.IsValidSeriesSegmentFilename("192a") {
		t.Fatal("expected valid")
	}
}
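
// TestParseSeriesSegmentFilename verifies that segment filenames parse as
// hexadecimal segment IDs and that non-hex names return an error.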
func TestParseSeriesSegmentFilename(t *testing.T) {
	if v, err := tsdb.ParseSeriesSegmentFilename("a90b"); err != nil {
		t.Fatal(err)
	} else if v != 0xA90B {
		t.Fatalf("unexpected value: %x", v)
	}
	if v, err := tsdb.ParseSeriesSegmentFilename("0001"); err != nil {
		t.Fatal(err)
	} else if v != 1 {
		t.Fatalf("unexpected value: %x", v)
	}
	if _, err := tsdb.ParseSeriesSegmentFilename("invalid"); err == nil {
		t.Fatal("expected error")
	}
}
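
// TestSeriesSegmentSize verifies that segment sizes start at 4mb and double
// with each segment ID until they cap at 256mb.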
func TestSeriesSegmentSize(t *testing.T) {
	const mb = (1 << 20)

	if sz := tsdb.SeriesSegmentSize(0); sz != 4*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(1); sz != 8*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(2); sz != 16*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(3); sz != 32*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(4); sz != 64*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(5); sz != 128*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(6); sz != 256*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(7); sz != 256*mb {
		t.Fatalf("unexpected size: %d", sz)
	}
}
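
// TestSeriesEntry round-trips a series entry through AppendSeriesEntry and
// ReadSeriesEntry, checking the flag, ID, key, and encoded size.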
func TestSeriesEntry(t *testing.T) {
	seriesKey := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	buf := tsdb.AppendSeriesEntry(nil, 1, 2, seriesKey)
	if flag, id, key, sz := tsdb.ReadSeriesEntry(buf); flag != 1 {
		t.Fatalf("unexpected flag: %d", flag)
	} else if id != 2 {
		t.Fatalf("unexpected id: %d", id)
	} else if !bytes.Equal(seriesKey, key) {
		t.Fatalf("unexpected key: %q", key)
	} else if sz != int64(tsdb.SeriesEntryHeaderSize+len(key)) {
		t.Fatalf("unexpected size: %d", sz)
	}
}