255 lines
7.9 KiB
Go
255 lines
7.9 KiB
Go
package tsdb_test
|
|
|
|
import (
|
|
"bytes"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/influxdata/influxdb/v2/tsdb"
|
|
)
|
|
|
|
func TestSeriesSegment(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
// Create a new initial segment (4mb) and initialize for writing.
|
|
segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.InitForWrite(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer segment.Close()
|
|
|
|
// Write initial entry.
|
|
key1 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
|
|
offset, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, key1))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
} else if offset != tsdb.SeriesSegmentHeaderSize {
|
|
t.Fatalf("unexpected offset: %d", offset)
|
|
}
|
|
|
|
// Write a large entry (3mb).
|
|
key2 := tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("m"), 3*(1<<20)), nil)
|
|
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, key2)); err != nil {
|
|
t.Fatal(err)
|
|
} else if offset != tsdb.SeriesSegmentHeaderSize {
|
|
t.Fatalf("unexpected offset: %d", offset)
|
|
}
|
|
|
|
// Write another entry that is too large for the remaining segment space.
|
|
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 3, tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("n"), 3*(1<<20)), nil))); err != tsdb.ErrSeriesSegmentNotWritable {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
|
|
// Verify two entries exist.
|
|
var n int
|
|
segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
|
|
switch n {
|
|
case 0:
|
|
if flag != tsdb.SeriesEntryInsertFlag || id != 1 || !bytes.Equal(key1, key) {
|
|
t.Fatalf("unexpected entry(0): %d, %d, %q", flag, id, key)
|
|
}
|
|
case 1:
|
|
if flag != tsdb.SeriesEntryInsertFlag || id != 2 || !bytes.Equal(key2, key) {
|
|
t.Fatalf("unexpected entry(1): %d, %d, %q", flag, id, key)
|
|
}
|
|
default:
|
|
t.Fatalf("too many entries")
|
|
}
|
|
n++
|
|
return nil
|
|
})
|
|
if n != 2 {
|
|
t.Fatalf("unexpected entry count: %d", n)
|
|
}
|
|
}
|
|
|
|
func TestSeriesSegment_AppendSeriesIDs(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.InitForWrite(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer segment.Close()
|
|
|
|
// Write entries.
|
|
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 10, tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
|
|
t.Fatal(err)
|
|
} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 11, tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.Flush(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Collect series ids with existing set.
|
|
a := segment.AppendSeriesIDs([]uint64{1, 2})
|
|
if diff := cmp.Diff(a, []uint64{1, 2, 10, 11}); diff != "" {
|
|
t.Fatal(diff)
|
|
}
|
|
}
|
|
|
|
func TestSeriesSegment_MaxSeriesID(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.InitForWrite(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer segment.Close()
|
|
|
|
// Write entries.
|
|
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 10, tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
|
|
t.Fatal(err)
|
|
} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 11, tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.Flush(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Verify maximum.
|
|
if max := segment.MaxSeriesID(); max != 11 {
|
|
t.Fatalf("unexpected max: %d", max)
|
|
}
|
|
}
|
|
|
|
func TestSeriesSegmentHeader(t *testing.T) {
|
|
// Verify header initializes correctly.
|
|
hdr := tsdb.NewSeriesSegmentHeader()
|
|
if hdr.Version != tsdb.SeriesSegmentVersion {
|
|
t.Fatalf("unexpected version: %d", hdr.Version)
|
|
}
|
|
|
|
// Marshal/unmarshal.
|
|
var buf bytes.Buffer
|
|
if _, err := hdr.WriteTo(&buf); err != nil {
|
|
t.Fatal(err)
|
|
} else if other, err := tsdb.ReadSeriesSegmentHeader(buf.Bytes()); err != nil {
|
|
t.Fatal(err)
|
|
} else if diff := cmp.Diff(hdr, other); diff != "" {
|
|
t.Fatal(diff)
|
|
}
|
|
}
|
|
|
|
func TestSeriesSegment_PartialWrite(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
// Create a new initial segment (4mb) and initialize for writing.
|
|
segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.InitForWrite(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer segment.Close()
|
|
|
|
// Write two entries.
|
|
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil {
|
|
t.Fatal(err)
|
|
} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
sz := segment.Size()
|
|
entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil)))
|
|
|
|
// Close segment.
|
|
if err := segment.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Truncate at each point and reopen.
|
|
for i := entrySize; i > 0; i-- {
|
|
if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000"))
|
|
if err := segment.Open(); err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.InitForWrite(); err != nil {
|
|
t.Fatal(err)
|
|
} else if err := segment.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestJoinSeriesOffset(t *testing.T) {
|
|
if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC {
|
|
t.Fatalf("unexpected offset: %x", offset)
|
|
}
|
|
}
|
|
|
|
func TestSplitSeriesOffset(t *testing.T) {
|
|
if segmentID, pos := tsdb.SplitSeriesOffset(0x123456789ABC); segmentID != 0x1234 || pos != 0x56789ABC {
|
|
t.Fatalf("unexpected segmentID/pos: %x/%x", segmentID, pos)
|
|
}
|
|
}
|
|
|
|
func TestIsValidSeriesSegmentFilename(t *testing.T) {
|
|
if tsdb.IsValidSeriesSegmentFilename("") {
|
|
t.Fatal("expected invalid")
|
|
} else if tsdb.IsValidSeriesSegmentFilename("0ab") {
|
|
t.Fatal("expected invalid")
|
|
} else if !tsdb.IsValidSeriesSegmentFilename("192a") {
|
|
t.Fatal("expected valid")
|
|
}
|
|
}
|
|
|
|
func TestParseSeriesSegmentFilename(t *testing.T) {
|
|
if v, err := tsdb.ParseSeriesSegmentFilename("a90b"); err != nil {
|
|
t.Fatal(err)
|
|
} else if v != 0xA90B {
|
|
t.Fatalf("unexpected value: %x", v)
|
|
}
|
|
if v, err := tsdb.ParseSeriesSegmentFilename("0001"); err != nil {
|
|
t.Fatal(err)
|
|
} else if v != 1 {
|
|
t.Fatalf("unexpected value: %x", v)
|
|
}
|
|
if _, err := tsdb.ParseSeriesSegmentFilename("invalid"); err == nil {
|
|
t.Fatal("expected error")
|
|
}
|
|
}
|
|
|
|
func TestSeriesSegmentSize(t *testing.T) {
|
|
const mb = (1 << 20)
|
|
if sz := tsdb.SeriesSegmentSize(0); sz != 4*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(1); sz != 8*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(2); sz != 16*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(3); sz != 32*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(4); sz != 64*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(5); sz != 128*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(6); sz != 256*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
} else if sz := tsdb.SeriesSegmentSize(7); sz != 256*mb {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
}
|
|
}
|
|
|
|
func TestSeriesEntry(t *testing.T) {
|
|
seriesKey := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
|
|
buf := tsdb.AppendSeriesEntry(nil, 1, 2, seriesKey)
|
|
if flag, id, key, sz := tsdb.ReadSeriesEntry(buf); flag != 1 {
|
|
t.Fatalf("unexpected flag: %d", flag)
|
|
} else if id != 2 {
|
|
t.Fatalf("unexpected id: %d", id)
|
|
} else if !bytes.Equal(seriesKey, key) {
|
|
t.Fatalf("unexpected key: %q", key)
|
|
} else if sz != int64(tsdb.SeriesEntryHeaderSize+len(key)) {
|
|
t.Fatalf("unexpected size: %d", sz)
|
|
}
|
|
}
|