influxdb/tsdb/engine/tsm1/compact_test.go

4810 lines
114 KiB
Go

package tsm1_test
import (
"errors"
"fmt"
"io/fs"
"math"
"os"
"path/filepath"
"sort"
"sync/atomic"
"testing"
"time"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
// Tests compacting a Cache snapshot into a single TSM file
func TestCompactor_Snapshot(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(1, float64(1))
v2 := tsm1.NewValue(1, float64(1))
v3 := tsm1.NewValue(2, float64(2))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v1},
"cpu,host=B#!~#value": {v2, v3},
}
c := tsm1.NewCache(0)
for k, v := range points1 {
if err := c.Write([]byte(k), v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
}
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}
files, err := compactor.WriteSnapshot(c, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))
}
compactor.Open()
files, err = compactor.WriteSnapshot(c, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 2; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{v1}},
{"cpu,host=B#!~#value", []tsm1.Value{v2, v3}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
var vals tsm1.Values
ts := int64(1e9)
for i := 0; i < 120; i++ {
vals = append(vals, tsm1.NewIntegerValue(ts, 1))
ts += 1e9
}
// 121st timestamp skips a second
ts += 1e9
vals = append(vals, tsm1.NewIntegerValue(ts, 1))
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": vals[:100],
}
f1 := MustWriteTSM(dir, 1, writes)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": vals[100:],
}
f2 := MustWriteTSM(dir, 2, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %#v", err)
}
r := MustOpenTSMReader(files[0])
entries := r.Entries([]byte("cpu,host=A#!~#value"))
_, b, err := r.ReadBytes(&entries[0], nil)
if err != nil {
t.Fatalf("ReadBytes: unexpected error %v", err)
}
var a tsdb.IntegerArray
err = tsm1.DecodeIntegerArrayBlock(b, &a)
if err != nil {
t.Fatalf("DecodeIntegerArrayBlock: unexpected error %v", err)
}
if a.MaxTime() != entries[0].MaxTime {
t.Fatalf("expected MaxTime == a.MaxTime()")
}
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_CompactFull(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1},
}
f1 := MustWriteTSM(dir, 1, writes)
a2 := tsm1.NewValue(2, 1.2)
b1 := tsm1.NewValue(1, 2.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a2},
"cpu,host=B#!~#value": {b1},
}
f2 := MustWriteTSM(dir, 2, writes)
a3 := tsm1.NewValue(1, 1.3)
c1 := tsm1.NewValue(1, 3.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a3},
"cpu,host=C#!~#value": {c1},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))
}
compactor.Open()
files, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 3; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{a3, a2}},
{"cpu,host=B#!~#value", []tsm1.Value{b1}},
{"cpu,host=C#!~#value", []tsm1.Value{c1}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_DecodeError(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1},
}
f1 := MustWriteTSM(dir, 1, writes)
a2 := tsm1.NewValue(2, 1.2)
b1 := tsm1.NewValue(1, 2.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a2},
"cpu,host=B#!~#value": {b1},
}
f2 := MustWriteTSM(dir, 2, writes)
a3 := tsm1.NewValue(1, 1.3)
c1 := tsm1.NewValue(1, 3.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a3},
"cpu,host=C#!~#value": {c1},
}
f3 := MustWriteTSM(dir, 3, writes)
f, err := os.OpenFile(f3, os.O_RDWR, os.ModePerm)
if err != nil {
panic(err)
}
f.WriteAt([]byte("ffff"), 10) // skip over header
f.Close()
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
require.Error(t, err, "expected error writing snapshot")
require.Zero(t, len(files), "no files should be compacted")
compactor.Open()
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string) error {
require.Equal(t, 1, len(strings))
require.Equal(t, strings[0], f3)
return nil
})
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(4, 1.1)
a2 := tsm1.NewValue(5, 1.1)
a3 := tsm1.NewValue(7, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2, a3},
}
f1 := MustWriteTSM(dir, 1, writes)
c1 := tsm1.NewValue(3, 1.2)
c2 := tsm1.NewValue(8, 1.2)
c3 := tsm1.NewValue(9, 1.2)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {c1, c2, c3},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{c1, a1, a2, a3, c2, c3}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(4, 1.1)
a2 := tsm1.NewValue(5, 1.1)
a3 := tsm1.NewValue(7, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2, a3},
}
f1 := MustWriteTSM(dir, 1, writes)
b1 := tsm1.NewValue(1, 1.2)
b2 := tsm1.NewValue(2, 1.2)
b3 := tsm1.NewValue(6, 1.2)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {b1, b2, b3},
}
f2 := MustWriteTSM(dir, 2, writes)
c1 := tsm1.NewValue(3, 1.2)
c2 := tsm1.NewValue(8, 1.2)
c3 := tsm1.NewValue(9, 1.2)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {c1, c2, c3},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{b1, b2, c1, a1, a2, b3, a3, c2, c3}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
func TestCompactor_Compact_UnsortedBlocks(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 2 TSM files with different data and one new point
a1 := tsm1.NewValue(4, 1.1)
a2 := tsm1.NewValue(5, 1.1)
a3 := tsm1.NewValue(6, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2, a3},
}
f1 := MustWriteTSM(dir, 1, writes)
b1 := tsm1.NewValue(1, 1.2)
b2 := tsm1.NewValue(2, 1.2)
b3 := tsm1.NewValue(3, 1.2)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {b1, b2, b3},
}
f2 := MustWriteTSM(dir, 2, writes)
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{b1, b2, b3, a1, a2, a3}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
func TestCompactor_Compact_UnsortedBlocksOverlapping(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files where two blocks are overlapping and with unsorted order
a1 := tsm1.NewValue(1, 1.1)
a2 := tsm1.NewValue(2, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2},
}
f1 := MustWriteTSM(dir, 1, writes)
b1 := tsm1.NewValue(3, 1.2)
b2 := tsm1.NewValue(4, 1.2)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {b1, b2},
}
f2 := MustWriteTSM(dir, 2, writes)
c1 := tsm1.NewValue(1, 1.1)
c2 := tsm1.NewValue(2, 1.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {c1, c2},
}
f3 := MustWriteTSM(dir, 3, writes)
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{a1, a2, b1, b2}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(1, 1.1)
a2 := tsm1.NewValue(2, 1.2)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2},
}
f1 := MustWriteTSM(dir, 1, writes)
a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a3},
}
f2 := MustWriteTSM(dir, 2, writes)
a4 := tsm1.NewValue(4, 1.4)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a4},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{a1, a2, a3, a4}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
// Ensures that a full compaction will skip over blocks that have the full
// range of time contained in the block tombstoned
func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(1, 1.1)
a2 := tsm1.NewValue(2, 1.2)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2},
}
f1 := MustWriteTSM(dir, 1, writes)
ts := tsm1.NewTombstoner(f1, nil)
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, math.MinInt64, math.MaxInt64)
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a3},
}
f2 := MustWriteTSM(dir, 2, writes)
a4 := tsm1.NewValue(4, 1.4)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a4},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{a3, a4}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 1; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
// Ensures that a full compaction will decode and combine blocks with
// partial tombstoned values
func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(1, 1.1)
a2 := tsm1.NewValue(2, 1.2)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2},
}
f1 := MustWriteTSM(dir, 1, writes)
ts := tsm1.NewTombstoner(f1, nil)
// a1 should remain after compaction
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, math.MaxInt64)
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a3},
}
f2 := MustWriteTSM(dir, 2, writes)
a4 := tsm1.NewValue(4, 1.4)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a4},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{a1, a3, a4}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
// Ensures that a full compaction will decode and combine blocks with
// multiple tombstoned ranges within the block e.g. (t1, t2, t3, t4)
// having t2 and t3 removed
func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// write 3 TSM files with different data and one new point
a1 := tsm1.NewValue(1, 1.1)
a2 := tsm1.NewValue(2, 1.2)
a3 := tsm1.NewValue(3, 1.3)
a4 := tsm1.NewValue(4, 1.4)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a1, a2, a3, a4},
}
f1 := MustWriteTSM(dir, 1, writes)
ts := tsm1.NewTombstoner(f1, nil)
// a1, a3 should remain after compaction
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, 2)
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 4, 4)
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
a5 := tsm1.NewValue(5, 1.5)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a5},
}
f2 := MustWriteTSM(dir, 2, writes)
a6 := tsm1.NewValue(6, 1.6)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": {a6},
}
f3 := MustWriteTSM(dir, 3, writes)
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
r := MustOpenTSMReader(files[0])
if got, exp := r.KeyCount(), 1; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{a1, a3, a5, a6}},
}
for _, p := range data {
values, err := r.ReadAll([]byte(p.key))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
// Ensures that a compaction will properly rollover to a new file when the
// max keys per blocks is exceeded
func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
// This test creates a lot of data and causes timeout failures for these envs
if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" {
t.Skip("Skipping max keys compaction test")
}
dir := MustTempDir()
defer os.RemoveAll(dir)
// write two files where the first contains a single key with the maximum
// number of full blocks that can fit in a TSM file
f1, f1Name := MustTSMWriter(dir, 1)
values := make([]tsm1.Value, 1000)
for i := 0; i < 65534; i++ {
values = values[:0]
for j := 0; j < 1000; j++ {
values = append(values, tsm1.NewValue(int64(i*1000+j), int64(1)))
}
if err := f1.Write([]byte("cpu,host=A#!~#value"), values); err != nil {
t.Fatalf("write tsm f1: %v", err)
}
}
if err := f1.WriteIndex(); err != nil {
t.Fatalf("write index f1: %v", err)
}
f1.Close()
// Write a new file with 2 blocks that when compacted would exceed the max
// blocks
f2, f2Name := MustTSMWriter(dir, 2)
for i := 0; i < 2; i++ {
lastTimeStamp := values[len(values)-1].UnixNano() + 1
values = values[:0]
for j := lastTimeStamp; j < lastTimeStamp+1000; j++ {
values = append(values, tsm1.NewValue(int64(j), int64(1)))
}
if err := f2.Write([]byte("cpu,host=A#!~#value"), values); err != nil {
t.Fatalf("write tsm f1: %v", err)
}
}
if err := f2.WriteIndex(); err != nil {
t.Fatalf("write index f2: %v", err)
}
f2.Close()
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
// Compact both files, should get 2 files back
files, err := compactor.CompactFull([]string{f1Name, f2Name}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
if got, exp := len(files), 2; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
}
func TestCompactor_CompactFull_InProgress(t *testing.T) {
// This test creates a lot of data and causes timeout failures for these envs
if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" {
t.Skip("Skipping in progress compaction test")
}
dir := MustTempDir()
defer os.RemoveAll(dir)
f2Name := func() string {
values := make([]tsm1.Value, 1000)
// Write a new file with 2 blocks
f2, f2Name := MustTSMWriter(dir, 2)
defer func() {
assert.NoError(t, f2.Close(), "closing TSM file %s", f2Name)
}()
for i := 0; i < 2; i++ {
values = values[:0]
for j := 0; j < 1000; j++ {
values = append(values, tsm1.NewValue(int64(i*1000+j), int64(1)))
}
assert.NoError(t, f2.Write([]byte("cpu,host=A#!~#value"), values), "writing TSM file: %s", f2Name)
}
assert.NoError(t, f2.WriteIndex(), "writing TSM file index for %s", f2Name)
return f2Name
}()
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Open()
expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name)
assert.NoError(t, err, "unexpected error parsing file name %s", f2Name)
expSeq = expSeq + 1
fileName := filepath.Join(compactor.Dir, tsm1.DefaultFormatFileName(expGen, expSeq)+"."+tsm1.TSMFileExtension+"."+tsm1.TmpTSMFileExtension)
// Create a temp file to simulate an in progress compaction
f, err := os.Create(fileName)
assert.NoError(t, err, "creating in-progress compaction file %s", fileName)
defer func() {
assert.NoError(t, f.Close(), "closing in-progress compaction file %s", fileName)
}()
_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name)
assert.ErrorContainsf(t, err, "file exists", "unexpected error writing snapshot for %s", f2Name)
assert.Truef(t, errors.Is(err, fs.ErrExist), "error did not indicate file existence: %v", err)
pathErr := &os.PathError{}
assert.Truef(t, errors.As(err, &pathErr), "expected path error, got %v", err)
assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr)
}
func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) {
files := []string{}
for _, r := range readers {
files = append(files, r.Path())
}
return tsm1.NewTSMBatchKeyIterator(size, fast, 0, interrupt, files, readers...)
}
// Tests that a single TSM file can be read and iterated over
func TestTSMKeyIterator_Single(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v1},
}
r := MustTSMReader(dir, 1, writes)
iter, err := newTSMKeyIterator(1, false, nil, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
for iter.Next() {
key, _, _, block, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
values, err := tsm1.DecodeBlock(block, nil)
if err != nil {
t.Fatalf("unexpected error decode: %v", err)
}
if got, exp := string(key), "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), len(writes); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for _, v := range values {
readValues = true
assertValueEqual(t, v, v1)
}
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that duplicate point values are merged. There is only one case
// where this could happen and that is when a compaction completed and we replace
// the old TSM file with a new one and we crash just before deleting the old file.
// No data is lost but the same point time/value would exist in two files until
// compaction corrects it.
func TestTSMKeyIterator_Duplicate(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(1, int64(1))
v2 := tsm1.NewValue(1, int64(2))
writes1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v1},
}
r1 := MustTSMReader(dir, 1, writes1)
writes2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v2},
}
r2 := MustTSMReader(dir, 2, writes2)
iter, err := newTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
for iter.Next() {
key, _, _, block, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
values, err := tsm1.DecodeBlock(block, nil)
if err != nil {
t.Fatalf("unexpected error decode: %v", err)
}
if got, exp := string(key), "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], v2)
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that deleted keys are not seen during iteration with
// TSM files.
func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(2, int64(1))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v1},
}
r1 := MustTSMReader(dir, 1, points1)
if e := r1.Delete([][]byte{[]byte("cpu,host=A#!~#value")}); nil != e {
t.Fatal(e)
}
v2 := tsm1.NewValue(1, float64(1))
v3 := tsm1.NewValue(1, float64(1))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#count": {v2},
"cpu,host=B#!~#value": {v3},
}
r2 := MustTSMReader(dir, 2, points2)
r2.Delete([][]byte{[]byte("cpu,host=A#!~#count")})
iter, err := newTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
var data = []struct {
key string
value tsm1.Value
}{
{"cpu,host=B#!~#value", v3},
}
for iter.Next() {
key, _, _, block, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
values, err := tsm1.DecodeBlock(block, nil)
if err != nil {
t.Fatalf("unexpected error decode: %v", err)
}
if got, exp := string(key), data[0].key; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], data[0].value)
data = data[1:]
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that deleted keys are not seen during iteration with
// TSM files.
func TestTSMKeyIterator_SingleDeletes(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(10, int64(1))
v2 := tsm1.NewValue(20, int64(1))
v3 := tsm1.NewValue(30, int64(1))
v4 := tsm1.NewValue(40, int64(1))
v5 := tsm1.NewValue(50, int64(1))
v6 := tsm1.NewValue(60, int64(1))
points1 := map[string][]tsm1.Value{
"cpu,host=0#!~#value": {v1, v2},
"cpu,host=A#!~#value": {v5, v6},
"cpu,host=B#!~#value": {v3, v4},
"cpu,host=C#!~#value": {v1, v2},
"cpu,host=D#!~#value": {v1, v2},
}
r1 := MustTSMReader(dir, 1, points1)
if e := r1.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 50, 50); nil != e {
t.Fatal(e)
}
if e := r1.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 60, 60); nil != e {
t.Fatal(e)
}
if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 10, 10); nil != e {
t.Fatal(e)
}
if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 60, 60); nil != e {
t.Fatal(e)
}
if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 20, 20); nil != e {
t.Fatal(e)
}
iter, err := newTSMKeyIterator(1, false, nil, r1)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues int
var data = []struct {
key string
value tsm1.Value
}{
{"cpu,host=0#!~#value", v1},
{"cpu,host=B#!~#value", v3},
{"cpu,host=D#!~#value", v1},
}
for iter.Next() {
key, _, _, block, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
values, err := tsm1.DecodeBlock(block, nil)
if err != nil {
t.Fatalf("unexpected error decode: %v", err)
}
if exp, got := string(key), data[0].key; exp != got {
t.Fatalf("key mismatch: got %v, exp %v", exp, got)
}
if exp, got := len(values), 2; exp != got {
t.Fatalf("values length mismatch: exp %v, got %v", exp, got)
}
readValues++
assertValueEqual(t, values[0], data[0].value)
data = data[1:]
}
if exp, got := 3, readValues; exp != got {
t.Fatalf("failed to read expected values: exp %v, got %v", exp, got)
}
}
// Tests that the TSMKeyIterator will abort if the interrupt channel is closed
func TestTSMKeyIterator_Abort(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v1},
}
r := MustTSMReader(dir, 1, writes)
intC := make(chan struct{})
iter, err := newTSMKeyIterator(1, false, intC, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var aborted bool
for iter.Next() {
// Abort
close(intC)
_, _, _, _, err := iter.Read()
if err == nil {
t.Fatalf("unexpected error read: %v", err)
}
aborted = err != nil
}
if !aborted {
t.Fatalf("iteration not aborted")
}
}
func TestCacheKeyIterator_Single(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v0},
}
c := tsm1.NewCache(0)
for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
}
iter := tsm1.NewCacheKeyIterator(c, 1, nil)
var readValues bool
for iter.Next() {
key, _, _, block, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
values, err := tsm1.DecodeBlock(block, nil)
if err != nil {
t.Fatalf("unexpected error decode: %v", err)
}
if got, exp := string(key), "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), len(writes); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for _, v := range values {
readValues = true
assertValueEqual(t, v, v0)
}
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
func TestCacheKeyIterator_Chunked(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)
v1 := tsm1.NewValue(2, 2.0)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v0, v1},
}
c := tsm1.NewCache(0)
for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
}
iter := tsm1.NewCacheKeyIterator(c, 1, nil)
var readValues bool
var chunk int
for iter.Next() {
key, _, _, block, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
values, err := tsm1.DecodeBlock(block, nil)
if err != nil {
t.Fatalf("unexpected error decode: %v", err)
}
if got, exp := string(key), "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for _, v := range values {
readValues = true
assertValueEqual(t, v, writes["cpu,host=A#!~#value"][chunk])
}
chunk++
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that the CacheKeyIterator will abort if the interrupt channel is closed
func TestCacheKeyIterator_Abort(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": {v0},
}
c := tsm1.NewCache(0)
for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
}
intC := make(chan struct{})
iter := tsm1.NewCacheKeyIterator(c, 1, intC)
var aborted bool
for iter.Next() {
//Abort
close(intC)
_, _, _, _, err := iter.Read()
if err == nil {
t.Fatalf("unexpected error read: %v", err)
}
aborted = err != nil
}
if !aborted {
t.Fatalf("iteration not aborted")
}
}
func normalizeExtFileStat(efs []tsm1.ExtFileStat, defaultBlockCount int) []tsm1.ExtFileStat {
efsNorm := make([]tsm1.ExtFileStat, 0, len(efs))
for _, f := range efs {
if f.FirstBlockCount == 0 {
f.FirstBlockCount = defaultBlockCount
}
efsNorm = append(efsNorm, f)
}
return efsNorm
}
type ffsOpt func(ffs *fakeFileStore)
func withExtFileStats(efs []tsm1.ExtFileStat) ffsOpt {
return func(ffs *fakeFileStore) {
ffs.PathsFn = func() []tsm1.ExtFileStat { return normalizeExtFileStat(efs, ffs.defaultBlockCount) }
}
}
func withFileStats(fs []tsm1.FileStat) ffsOpt {
return withExtFileStats(tsm1.FileStats(fs).ToExtFileStats())
}
func withDefaultBlockCount(blockCount int) ffsOpt {
return func(ffs *fakeFileStore) {
ffs.defaultBlockCount = blockCount
}
}
func newFakeFileStore(opts ...ffsOpt) *fakeFileStore {
ffs := &fakeFileStore{}
for _, o := range opts {
o(ffs)
}
return ffs
}
func TestDefaultPlanner_Plan_Min(t *testing.T) {
fileStats := []tsm1.FileStat{
{
Path: "01-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "02-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-1.tsm1",
Size: 251 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(fileStats)), tsdb.DefaultCompactFullWriteColdDuration,
)
tsm, pLen := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
// Ensure that if there are older files that can be compacted together but a newer
// file that is in a larger step, the older ones will get compacted.
func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-04.tsm1",
Size: 128 * 1024 * 1024,
},
{
Path: "02-04.tsm1",
Size: 128 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 128 * 1024 * 1024,
},
{
Path: "04-04.tsm1",
Size: 128 * 1024 * 1024,
},
{
Path: "06-02.tsm1",
Size: 67 * 1024 * 1024,
},
{
Path: "07-02.tsm1",
Size: 128 * 1024 * 1024,
},
{
Path: "08-01.tsm1",
Size: 251 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm, pLen := cp.Plan(time.Now())
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
// Ensure that the planner grabs the smallest compaction step
func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "02-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "04-04.tsm1",
Size: 129 * 1024 * 1024,
},
{
Path: "05-04.tsm1",
Size: 129 * 1024 * 1024,
},
{
Path: "06-04.tsm1",
Size: 129 * 1024 * 1024,
},
{
Path: "07-04.tsm1",
Size: 129 * 1024 * 1024,
},
{
Path: "08-04.tsm1",
Size: 129 * 1024 * 1024,
},
{
Path: "09-04.tsm1", // should be skipped
Size: 129 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3],
data[4], data[5], data[6], data[7]}
tsm, pLen := cp.Plan(time.Now())
if got, exp := len(tsm), 2; got != exp {
t.Fatalf("compaction group length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
if exp, got := len(expFiles[:4]), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
if exp, got := len(expFiles[4:]), len(tsm[1]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles[:4] {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
for i, p := range expFiles[4:] {
if got, exp := tsm[1][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
// Ensure that the planner grabs the smallest compaction step
func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "04-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "05-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "07-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "08-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "09-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "11-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "12-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]}
tsm, pLen := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "04-03.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "05-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]}
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-02.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-02.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "04-02.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-02.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-01.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "04-01.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "02-04.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "03-02.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm, pLen := cp.PlanLevel(2)
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
HasTombstone: true,
},
{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-01.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "04-01.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1]}
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-01.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-01.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "04-01.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "05-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "07-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "08-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
tsm, pLen := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles1 {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-01.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-01.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
{
Path: "04-01.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "05-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "07-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "08-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "09-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "11-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "12-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "13-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "14-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "15-01.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "16-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := data[0:8]
expFiles2 := data[8:16]
tsm, pLen := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles1 {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
if exp, got := len(expFiles2), len(tsm[1]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles2 {
if got, exp := tsm[1][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
cp.Release(tsm[1:])
tsm, pLen = cp.PlanLevel(1)
if exp, got := len(expFiles2), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles2 {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm, pLen, gLen := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
} else if gLen != int64(3) {
t.Fatalf("generation len plan mismatch: got %v, exp %v", gLen, 3)
}
}
func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-04.tsm1",
Size: 251 * 1024 * 1024,
},
{
Path: "01-05.tsm1",
Size: 1 * 1024 * 1024,
HasTombstone: true,
},
{
Path: "02-06.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2]}
tsm, pLen, _ := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
// Ensure that the planner will compact all files if no writes
// have happened in some interval
func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-01.tsm1",
Size: 513 * 1024 * 1024,
},
{
Path: "02-02.tsm1",
Size: 129 * 1024 * 1024,
},
{
Path: "03-02.tsm1",
Size: 33 * 1024 * 1024,
},
{
Path: "04-02.tsm1",
Size: 1 * 1024 * 1024,
},
{
Path: "05-02.tsm1",
Size: 10 * 1024 * 1024,
},
{
Path: "06-01.tsm1",
Size: 2 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
time.Nanosecond,
)
tsm, pLen := cp.Plan(time.Now().Add(-time.Second))
if exp, got := len(data), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range data {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
// Ensure that the planner will not return files that are over the max
// allowable size
func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-01.tsm1",
Size: 2049 * 1024 * 1024,
},
{
Path: "02-02.tsm1",
Size: 2049 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
tsm, pLen := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
// Ensure that the planner will not return files that are over the max
// allowable size
func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
testSet := []tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 256 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 256 * 1024 * 1024,
},
{
Path: "03-05.tsm1",
Size: 256 * 1024 * 1024,
},
{
Path: "04-04.tsm1",
Size: 256 * 1024 * 1024,
},
}
ffs := newFakeFileStore(withFileStats(testSet), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock))
cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond)
plan, pLen := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(plan[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
// skip planning if all files are over the limit
over := []tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 2049 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 2049 * 1024 * 1024,
},
{
Path: "03-05.tsm1",
Size: 2049 * 1024 * 1024,
},
{
Path: "04-05.tsm1",
Size: 2049 * 1024 * 1024,
},
{
Path: "05-05.tsm1",
Size: 2049 * 1024 * 1024,
},
}
overFs := newFakeFileStore(withFileStats(over), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock))
cp.FileStore = overFs
plan, pLen = cp.Plan(time.Now().Add(-time.Second))
if exp, got := 0, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
plan, pLen, _ = cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
// ensure the optimize planner would pick this up
if exp, got := 1, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
cp.FileStore = ffs
// ensure that it will plan if last modified has changed
ffs.lastModified = time.Now()
cGroups, pLen := cp.Plan(time.Now())
if exp, got := 4, len(cGroups[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(cGroups)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
// Tests that 2 generations, each over 2 GB and the second in level 2 does
// not return just the first generation. This was a case where full planning
// would get repeatedly plan the same files and never stop.
func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "000002245-000001666.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001667.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001668.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001669.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001670.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001671.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001672.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002245-000001673.tsm",
Size: 192631258,
},
{
Path: "000002246-000000002.tsm",
Size: 2049 * 1024 * 1024,
},
{
Path: "000002246-000000003.tsm",
Size: 192631258,
},
}
fs := newFakeFileStore(withFileStats(data), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock))
cp := tsm1.NewDefaultPlanner(fs, time.Hour)
tsm, pLen := cp.Plan(time.Now().Add(-24 * time.Hour))
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
// Ensure that the planner will return files over the max file
// size, but do not contain full blocks
func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
testSet := []tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 256 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 256 * 1024 * 1024,
},
{
Path: "03-05.tsm1",
Size: 256 * 1024 * 1024,
},
{
Path: "04-04.tsm1",
Size: 256 * 1024 * 1024,
},
}
ffs := newFakeFileStore(withFileStats(testSet), withDefaultBlockCount(100))
cp := tsm1.NewDefaultPlanner(
ffs,
time.Nanosecond,
)
plan, pLen := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(plan[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
// skip planning if all files are over the limit
over := []tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 2049 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 2049 * 1024 * 1024,
},
}
overFs := newFakeFileStore(withFileStats(over), withDefaultBlockCount(100))
cp.FileStore = overFs
cGroups, pLen := cp.Plan(time.Now().Add(-time.Second))
if exp, got := 1, len(cGroups); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(cGroups)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
// Ensure that the planner will compact files that are past the smallest step
// size even if there is a single file in the smaller step size
func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
data := []tsm1.FileStat{
{
Path: "01-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "02-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "04-04.tsm1",
Size: 64 * 1024 * 1024,
},
{
Path: "05-02.tsm1",
Size: 2 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(data)),
tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm, pLen := cp.Plan(time.Now())
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) {
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(
[]tsm1.FileStat{
{
Path: "000000278-000000006.tsm",
Size: 2148340232,
},
{
Path: "000000278-000000007.tsm",
Size: 2148356556,
},
{
Path: "000000278-000000008.tsm",
Size: 167780181,
},
{
Path: "000000278-000047040.tsm",
Size: 2148728539,
},
{
Path: "000000278-000047041.tsm",
Size: 701863692,
},
})),
tsdb.DefaultCompactFullWriteColdDuration,
)
tsm, pLen := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
cp := tsm1.NewDefaultPlanner(
newFakeFileStore(withFileStats(
[]tsm1.FileStat{
{
Path: "000000001-000000001.tsm",
Size: 2148340232,
},
{
Path: "000000002-000000001.tsm",
Size: 2148356556,
},
{
Path: "000000003-000000001.tsm",
Size: 167780181,
},
{
Path: "000000004-000000001.tsm",
Size: 2148728539,
},
{
Path: "000000005-000000001.tsm",
Size: 2148340232,
},
{
Path: "000000006-000000001.tsm",
Size: 2148356556,
},
{
Path: "000000007-000000001.tsm",
Size: 167780181,
},
{
Path: "000000008-000000001.tsm",
Size: 2148728539,
},
{
Path: "000000009-000000002.tsm",
Size: 701863692,
},
{
Path: "000000010-000000002.tsm",
Size: 701863692,
},
{
Path: "000000011-000000002.tsm",
Size: 701863692,
},
{
Path: "000000012-000000002.tsm",
Size: 701863692,
},
{
Path: "000000013-000000002.tsm",
Size: 701863692,
},
})),
tsdb.DefaultCompactFullWriteColdDuration,
)
tsm, pLen := cp.PlanLevel(1)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm, pLen = cp.PlanLevel(2)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
cp.ForceFull()
// Level plans should not return any plans
tsm, pLen = cp.PlanLevel(1)
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm, pLen = cp.PlanLevel(2)
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm, pLen = cp.Plan(time.Now())
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
if got, exp := len(tsm[0]), 13; got != exp {
t.Fatalf("plan length mismatch: got %v, exp %v", got, exp)
}
cp.Release(tsm)
// Level plans should return plans now that Plan has been called
tsm, pLen = cp.PlanLevel(1)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm, pLen = cp.PlanLevel(2)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
}
func TestIsGroupOptimized(t *testing.T) {
testSetNoExt := []tsm1.FileStat{
{
Path: "01-05.tsm",
Size: 256 * 1024 * 1024,
},
{
Path: "02-05.tsm",
Size: 256 * 1024 * 1024,
},
{
Path: "03-05.tsm",
Size: 256 * 1024 * 1024,
},
{
Path: "04-04.tsm",
Size: 256 * 1024 * 1024,
},
}
testSet := tsm1.FileStats(testSetNoExt).ToExtFileStats()
blockCounts := []struct {
blockCounts []int
optimizedName string
}{
{
blockCounts: []int{
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
optimizedName: "",
},
{
blockCounts: []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
optimizedName: "01-05.tsm",
},
{
blockCounts: []int{
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
optimizedName: "02-05.tsm",
},
{
blockCounts: []int{
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
optimizedName: "04-04.tsm",
},
}
ffs := newFakeFileStore(withExtFileStats(testSet))
cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration)
e := MustOpenEngine(tsdb.InmemIndexName)
e.CompactionPlan = cp
e.Compactor = tsm1.NewCompactor()
e.Compactor.FileStore = ffs
fileGroup := make([]string, 0, len(testSet))
for j := 0; j < len(testSet); j++ {
fileGroup = append(fileGroup, testSet[j].Path)
}
for testIdx := 0; testIdx < len(blockCounts); testIdx++ {
require.Lenf(t, blockCounts[testIdx].blockCounts, len(testSet), "incorrect block count for case %d", testIdx)
for statIdx := range testSet {
testSet[statIdx].FirstBlockCount = blockCounts[testIdx].blockCounts[statIdx]
}
ok, fName, _ := e.IsGroupOptimized(fileGroup)
require.Equal(t, blockCounts[testIdx].optimizedName != "", ok, "unexpected result for optimization check")
require.Equal(t, blockCounts[testIdx].optimizedName, fName, "unexpected file name in optimization check")
}
}
func TestEnginePlanCompactions(t *testing.T) {
type testLevelResults struct {
level1Groups []tsm1.PlannedCompactionGroup
level2Groups []tsm1.PlannedCompactionGroup
level3Groups []tsm1.PlannedCompactionGroup
level4Groups []tsm1.PlannedCompactionGroup
level5Groups []tsm1.PlannedCompactionGroup
}
type testEnginePlanCompactionsRunner struct {
name string
files []tsm1.ExtFileStat
defaultBlockCount int // Default block count if member of files has FirstBlockCount of 0.
// This is specifically used to adjust the modification time
// so we can simulate the passage of time in tests
testShardTime time.Duration
// Each result is for the different plantypes
getResultByPlanType func(planType tsm1.PlanType) testLevelResults
}
tests := []testEnginePlanCompactionsRunner{
{
name: "many generations under 2GB",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "04-04.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm", "02-05.tsm", "03-05.tsm", "04-04.tsm"},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Many generations with files over 2GB",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-08.tsm1",
Size: 1048 * 1024 * 1024,
},
FirstBlockCount: 100,
},
{
FileStat: tsm1.FileStat{
Path: "02-05.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-06.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-07.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-08.tsm1",
Size: 1048 * 1024 * 1024,
},
FirstBlockCount: 100,
},
{
FileStat: tsm1.FileStat{
Path: "03-04.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: 10,
},
{
FileStat: tsm1.FileStat{
Path: "03-05.tsm1",
Size: 512 * 1024 * 1024,
},
FirstBlockCount: 5,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm1", "01-06.tsm1", "01-07.tsm1", "01-08.tsm1", "02-05.tsm1", "02-06.tsm1", "02-07.tsm1", "02-08.tsm1", "03-04.tsm1", "03-05.tsm1"},
tsdb.DefaultMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Small group size with single generation",
/* These files are supposed to have 0 block counts */
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm1",
Size: 300 * 1024 * 1024,
},
},
{
FileStat: tsm1.FileStat{
Path: "01-06.tsm1",
Size: 200 * 1024 * 1024,
},
},
{
FileStat: tsm1.FileStat{
Path: "01-07.tsm1",
Size: 100 * 1024 * 1024,
},
},
{
FileStat: tsm1.FileStat{
Path: "01-08.tsm1",
Size: 50 * 1024 * 1024,
},
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm1",
"01-06.tsm1",
"01-07.tsm1",
"01-08.tsm1",
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Small group size with single generation and levels under 4",
/* These files are supposed to have block counts of 0 */
files: tsm1.FileStats{
{
Path: "01-02.tsm1",
Size: 300 * 1024 * 1024,
},
{
Path: "01-03.tsm1",
Size: 200 * 1024 * 1024,
},
{
Path: "01-04.tsm1",
Size: 100 * 1024 * 1024,
},
}.ToExtFileStats(),
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-02.tsm1",
"01-03.tsm1",
"01-04.tsm1",
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Small group size with single generation all at DefaultMaxPointsPerBlock",
files: tsm1.FileStats{
{
Path: "01-05.tsm1",
Size: 300 * 1024 * 1024,
},
{
Path: "01-06.tsm1",
Size: 200 * 1024 * 1024,
},
{
Path: "01-07.tsm1",
Size: 100 * 1024 * 1024,
},
{
Path: "01-08.tsm1",
Size: 50 * 1024 * 1024,
},
}.ToExtFileStats(),
defaultBlockCount: tsdb.DefaultMaxPointsPerBlock,
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm1",
"01-06.tsm1",
"01-07.tsm1",
"01-08.tsm1",
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Small group size with single generation 50% at DefaultMaxPointsPerBlock and 50% at DefaultAggressiveMaxPointsPerBlock",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm1",
Size: 700 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-06.tsm1",
Size: 500 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-07.tsm1",
Size: 400 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-08.tsm1",
Size: 300 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-09.tsm1",
Size: 200 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-10.tsm1",
Size: 100 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-11.tsm1",
Size: 50 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-12.tsm1",
Size: 25 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm1",
"01-06.tsm1",
"01-07.tsm1",
"01-08.tsm1",
"01-09.tsm1",
"01-10.tsm1",
"01-11.tsm1",
"01-12.tsm1",
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Group size over 2GB with single generation",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-13.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-14.tsm1",
Size: 650 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-15.tsm1",
Size: 450 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-13.tsm1",
"01-14.tsm1",
"01-15.tsm1",
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
// This test is added to account for halting state after
// TestDefaultPlanner_FullyCompacted_SmallSingleGeneration
// will need to ensure that once we have single TSM file under 2 GB we stop
name: "Single TSM file",
/* These files are supposed to have 0 blocks */
files: tsm1.FileStats{
{
Path: "01-09.tsm1",
Size: 650 * 1024 * 1024,
},
}.ToExtFileStats(),
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
return testLevelResults{}
},
},
{
// This test is added to account for a single generation that has a group size
// over 2 GB with 1 file under 2 GB all at max points per block with aggressive compaction.
// It should not compact any further.
name: "TSM files at DefaultAggressiveMaxPointsPerBlock",
files: tsm1.FileStats{
{
Path: "01-13.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-14.tsm1",
Size: 691 * 1024 * 1024,
},
}.ToExtFileStats(),
defaultBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
return testLevelResults{}
},
},
{
// This test is added to account for a single generation that has a group size
// over 2 GB at max points per block with aggressive compaction, and, 1 file
// under 2 GB at default max points per block.
// It should not compact any further.
name: "TSM files cannot compact further, single file under 2G and at DefaultMaxPointsPerBlock",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-13.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-14.tsm1",
Size: 691 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
return testLevelResults{}
},
},
{
// This test is added to account for a single generation that has a group size
// over 2 GB and multiple files under 2 GB all at max points per block for aggressive compaction.
name: "Group size over 2 with multiple files under 2GB and at DefaultAggressiveMaxPointsPerBlock",
files: tsm1.FileStats{
{
Path: "01-13.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-14.tsm1",
Size: 650 * 1024 * 1024,
},
{
Path: "01-15.tsm1",
Size: 450 * 1024 * 1024,
},
}.ToExtFileStats(),
defaultBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
return testLevelResults{}
},
},
{
name: "Generations with files under level 4",
// These files are supposed to have block counts of 0
files: tsm1.FileStats{
{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-08.tsm1",
Size: 1048 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-08.tsm1",
Size: 1048 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "03-05.tsm1",
Size: 600 * 1024 * 1024,
},
{
Path: "03-06.tsm1",
Size: 500 * 1024 * 1024,
},
}.ToExtFileStats(),
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm1",
"01-06.tsm1",
"01-07.tsm1",
"01-08.tsm1",
"02-05.tsm1",
"02-06.tsm1",
"02-07.tsm1",
"02-08.tsm1",
"03-03.tsm1",
"03-04.tsm1",
"03-05.tsm1",
"03-06.tsm1"},
tsdb.DefaultMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
// This test will mock a 'backfill' condition where we have a single
// shard with many generations. The initial generation should be fully
// compacted, but we have some new generations that are not. We need to ensure
// the optimize planner will pick these up and compact everything together.
name: "Backfill mock condition",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-04.tsm1",
Size: 700 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-05.tsm1",
Size: 500 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-06.tsm1",
Size: 400 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-02.tsm1",
Size: 700 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-03.tsm1",
Size: 500 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-04.tsm1",
Size: 400 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "04-01.tsm1",
Size: 700 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "04-02.tsm1",
Size: 500 * 1024 * 1024,
},
FirstBlockCount: 100,
},
{
FileStat: tsm1.FileStat{
Path: "03-03.tsm1",
Size: 400 * 1024 * 1024,
},
FirstBlockCount: 10,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{
"01-05.tsm1",
"01-06.tsm1",
"01-07.tsm1",
"02-04.tsm1",
"02-05.tsm1",
"02-06.tsm1",
"03-02.tsm1",
"03-03.tsm1",
"03-04.tsm1",
"03-03.tsm1",
"04-01.tsm1",
"04-02.tsm1",
}, tsdb.DefaultAggressiveMaxPointsPerBlock},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "1.12.0 RC0 Planner issue mock data from cluster",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "000029202-000000004.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000005.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000006.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000007.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000008.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000009.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000010.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000011.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000012.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000013.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000014.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000015.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000016.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000017.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000018.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000019.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000020.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000021.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000022.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000023.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000024.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000025.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000026.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000027.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000028.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000029.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000030.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000031.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000032.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000033.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000034.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000035.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000036.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000037.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000038.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000039.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000040.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000041.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000042.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000043.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000044.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000045.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029202-000000046.tsm",
Size: 161480704,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029235-000000003.tsm",
Size: 96468992,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029267-000000003.tsm",
Size: 109051904,
},
FirstBlockCount: 224,
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000001.tsm",
Size: 3040870,
},
FirstBlockCount: 413,
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000002.tsm",
Size: 2254857830,
},
FirstBlockCount: 561,
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000003.tsm",
Size: 2254857830,
},
FirstBlockCount: 402,
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000004.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000005.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000006.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000007.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000008.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000009.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000010.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000011.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000012.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000013.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000014.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000015.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000016.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000017.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000018.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000019.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000020.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000021.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000022.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000023.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000024.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000025.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000026.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000027.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000028.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000029.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000030.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000031.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000032.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000033.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000034.tsm",
Size: 2254857830,
},
},
{
FileStat: tsm1.FileStat{
Path: "000029268-000000035.tsm",
Size: 1717986918,
},
FirstBlockCount: 368,
},
},
defaultBlockCount: tsdb.DefaultMaxPointsPerBlock,
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
common := testLevelResults{
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{
"000029202-000000004.tsm",
"000029202-000000005.tsm",
"000029202-000000006.tsm",
"000029202-000000007.tsm",
"000029202-000000008.tsm",
"000029202-000000009.tsm",
"000029202-000000010.tsm",
"000029202-000000011.tsm",
"000029202-000000012.tsm",
"000029202-000000013.tsm",
"000029202-000000014.tsm",
"000029202-000000015.tsm",
"000029202-000000016.tsm",
"000029202-000000017.tsm",
"000029202-000000018.tsm",
"000029202-000000019.tsm",
"000029202-000000020.tsm",
"000029202-000000021.tsm",
"000029202-000000022.tsm",
"000029202-000000023.tsm",
"000029202-000000024.tsm",
"000029202-000000025.tsm",
"000029202-000000026.tsm",
"000029202-000000027.tsm",
"000029202-000000028.tsm",
"000029202-000000029.tsm",
"000029202-000000030.tsm",
"000029202-000000031.tsm",
"000029202-000000032.tsm",
"000029202-000000033.tsm",
"000029202-000000034.tsm",
"000029202-000000035.tsm",
"000029202-000000036.tsm",
"000029202-000000037.tsm",
"000029202-000000038.tsm",
"000029202-000000039.tsm",
"000029202-000000040.tsm",
"000029202-000000041.tsm",
"000029202-000000042.tsm",
"000029202-000000043.tsm",
"000029202-000000044.tsm",
"000029202-000000045.tsm",
"000029202-000000046.tsm",
"000029235-000000003.tsm",
"000029267-000000003.tsm",
"000029268-000000001.tsm",
"000029268-000000002.tsm",
"000029268-000000003.tsm",
"000029268-000000004.tsm",
"000029268-000000005.tsm",
"000029268-000000006.tsm",
"000029268-000000007.tsm",
"000029268-000000008.tsm",
"000029268-000000009.tsm",
"000029268-000000010.tsm",
"000029268-000000011.tsm",
"000029268-000000012.tsm",
"000029268-000000013.tsm",
"000029268-000000014.tsm",
"000029268-000000015.tsm",
"000029268-000000016.tsm",
"000029268-000000017.tsm",
"000029268-000000018.tsm",
"000029268-000000019.tsm",
"000029268-000000020.tsm",
"000029268-000000021.tsm",
"000029268-000000022.tsm",
"000029268-000000023.tsm",
"000029268-000000024.tsm",
"000029268-000000025.tsm",
"000029268-000000026.tsm",
"000029268-000000027.tsm",
"000029268-000000028.tsm",
"000029268-000000029.tsm",
"000029268-000000030.tsm",
"000029268-000000031.tsm",
"000029268-000000032.tsm",
"000029268-000000033.tsm",
"000029268-000000034.tsm",
"000029268-000000035.tsm",
},
tsdb.DefaultMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard, tsm1.PT_SmartOptimize:
return common
case tsm1.PT_NoOptimize:
return testLevelResults{}
}
return testLevelResults{}
},
},
{
name: "Mock another planned level inside scheduler",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "04-04.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "05-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 100,
},
{
FileStat: tsm1.FileStat{
Path: "06-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "07-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "08-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "09-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "10-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "11-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "12-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "13-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "14-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
standard := testLevelResults{
level1Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"05-01.tsm", "06-01.tsm", "07-01.tsm", "08-01.tsm", "09-01.tsm", "10-01.tsm", "11-01.tsm", "12-01.tsm"},
tsdb.DefaultMaxPointsPerBlock,
},
},
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm", "02-05.tsm", "03-05.tsm", "04-04.tsm"},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
common := testLevelResults{
level1Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"05-01.tsm", "06-01.tsm", "07-01.tsm", "08-01.tsm", "09-01.tsm", "10-01.tsm", "11-01.tsm", "12-01.tsm"},
tsdb.DefaultMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard:
return standard
case tsm1.PT_SmartOptimize, tsm1.PT_NoOptimize:
return common
}
return testLevelResults{}
},
},
{
name: "Mock another planned level inside scheduler aggress blocks middle",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "05-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 100,
},
{
FileStat: tsm1.FileStat{
Path: "06-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "07-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "08-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "09-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "10-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "11-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "02-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "04-04.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "12-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "13-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "14-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
standard := testLevelResults{
level1Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"05-01.tsm", "06-01.tsm", "07-01.tsm", "08-01.tsm", "09-01.tsm", "10-01.tsm", "11-01.tsm", "12-01.tsm"},
tsdb.DefaultMaxPointsPerBlock,
},
},
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm", "02-05.tsm", "03-05.tsm", "04-04.tsm"},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
common := testLevelResults{
level1Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"05-01.tsm", "06-01.tsm", "07-01.tsm", "08-01.tsm", "09-01.tsm", "10-01.tsm", "11-01.tsm", "12-01.tsm"},
tsdb.DefaultMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard:
return standard
case tsm1.PT_SmartOptimize, tsm1.PT_NoOptimize:
return common
}
return testLevelResults{}
},
},
{
name: "Mock another planned level inside scheduler aggress blocks end",
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Path: "05-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 100,
},
{
FileStat: tsm1.FileStat{
Path: "06-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "07-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "08-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "09-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "10-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "11-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "12-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "13-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "14-01.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 50,
},
{
FileStat: tsm1.FileStat{
Path: "01-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: 200,
},
{
FileStat: tsm1.FileStat{
Path: "02-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultAggressiveMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "03-05.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
{
FileStat: tsm1.FileStat{
Path: "04-04.tsm",
Size: 256 * 1024 * 1024,
},
FirstBlockCount: tsdb.DefaultMaxPointsPerBlock,
},
},
testShardTime: -1,
getResultByPlanType: func(planType tsm1.PlanType) testLevelResults {
standard := testLevelResults{
level1Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"05-01.tsm", "06-01.tsm", "07-01.tsm", "08-01.tsm", "09-01.tsm", "10-01.tsm", "11-01.tsm", "12-01.tsm"},
tsdb.DefaultMaxPointsPerBlock,
},
},
level5Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"01-05.tsm", "02-05.tsm", "03-05.tsm", "04-04.tsm"},
tsdb.DefaultAggressiveMaxPointsPerBlock,
},
},
}
common := testLevelResults{
level1Groups: []tsm1.PlannedCompactionGroup{
{
tsm1.CompactionGroup{"05-01.tsm", "06-01.tsm", "07-01.tsm", "08-01.tsm", "09-01.tsm", "10-01.tsm", "11-01.tsm", "12-01.tsm"},
tsdb.DefaultMaxPointsPerBlock,
},
},
}
switch planType {
case tsm1.PT_Standard:
return standard
case tsm1.PT_SmartOptimize, tsm1.PT_NoOptimize:
return common
}
return testLevelResults{}
},
},
}
e, err := NewEngine(tsdb.InmemIndexName)
require.NoError(t, err, "create engine")
e.SetEnabled(false)
require.NoError(t, e.Open(), "open engine")
defer func() { require.NoError(t, e.Close(), "close engine") }()
e.Compactor = tsm1.NewCompactor()
defer e.Compactor.Close()
for i := 0; i < 3; i++ {
for _, test := range tests {
var testName string
switch i {
case 0:
testName = test.name + "_PT_Standard"
case 1:
testName = test.name + "_PT_SmartOptimize"
case 2:
testName = test.name + "_PT_NoOptimize"
}
t.Run(testName, func(t *testing.T) {
ffs := newFakeFileStore(withExtFileStats(test.files), withDefaultBlockCount(test.defaultBlockCount))
cp := tsm1.NewDefaultPlanner(ffs, test.testShardTime)
e.MaxPointsPerBlock = tsdb.DefaultMaxPointsPerBlock
e.CompactionPlan = cp
e.Compactor.FileStore = ffs
// Arbitrary group length to use in Scheduler.SetDepth
mockGroupLen := 5
// Set the scheduler depth for our lower level groups.
// During PT_Standard this should still plan a level5 compaction group
// but during PT_SmartOptimize this should not.
e.Scheduler.SetDepth(1, mockGroupLen)
e.Scheduler.SetDepth(2, mockGroupLen)
// Normally this is called within PlanCompactions but because we want to simulate already running
// some compactions we will set them manually here.
atomic.StoreInt64(&e.Stats.TSMCompactionsActive[0], int64(mockGroupLen))
atomic.StoreInt64(&e.Stats.TSMCompactionsActive[1], int64(mockGroupLen))
// Should use PlanType 0 (PT_Standard), 1(PT_SmartOptimize), 2(PT_NoOptimize)
planType := tsm1.PlanType(i)
level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups := e.PlanCompactions(planType)
compareLevelGroups(t, test.getResultByPlanType(planType).level1Groups, level1Groups, "unexpected level 1 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level2Groups, level2Groups, "unexpected level 2 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level3Groups, Level3Groups, "unexpected level 3 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level4Groups, Level4Groups, "unexpected level 4 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level5Groups, Level5Groups, "unexpected level 5 Group")
})
}
}
}
func compareLevelGroups(t *testing.T, exp, got []tsm1.PlannedCompactionGroup, message string) {
require.Lenf(t, got, len(exp), "%s %s", message, " collection length mismatch")
for i, group := range exp {
require.Equal(t, group.Group, got[i].Group, message)
require.Equal(t, group.PointsPerBlock, got[i].PointsPerBlock, message)
}
}
func assertValueEqual(t *testing.T, a, b tsm1.Value) {
if got, exp := a.UnixNano(), b.UnixNano(); got != exp {
t.Fatalf("time mismatch: got %v, exp %v", got, exp)
}
if got, exp := a.Value(), b.Value(); got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
}
func MustTSMWriter(dir string, gen int) (tsm1.TSMWriter, string) {
f := MustTempFile(dir)
oldName := f.Name()
// Windows can't rename a file while it's open. Close first, rename and
// then re-open
if err := f.Close(); err != nil {
panic(fmt.Sprintf("close temp file: %v", err))
}
newName := filepath.Join(filepath.Dir(oldName), tsm1.DefaultFormatFileName(gen, 1)+".tsm")
if err := os.Rename(oldName, newName); err != nil {
panic(fmt.Sprintf("create tsm file: %v", err))
}
var err error
f, err = os.OpenFile(newName, os.O_RDWR, 0666)
if err != nil {
panic(fmt.Sprintf("open tsm files: %v", err))
}
w, err := tsm1.NewTSMWriter(f)
if err != nil {
panic(fmt.Sprintf("create TSM writer: %v", err))
}
return w, newName
}
func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
w, name := MustTSMWriter(dir, gen)
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), values[k]); err != nil {
panic(fmt.Sprintf("write TSM value: %v", err))
}
}
if err := w.WriteIndex(); err != nil {
panic(fmt.Sprintf("write TSM index: %v", err))
}
if err := w.Close(); err != nil {
panic(fmt.Sprintf("write TSM close: %v", err))
}
return name
}
func MustTSMReader(dir string, gen int, values map[string][]tsm1.Value) *tsm1.TSMReader {
return MustOpenTSMReader(MustWriteTSM(dir, gen, values))
}
func MustOpenTSMReader(name string) *tsm1.TSMReader {
f, err := os.Open(name)
if err != nil {
panic(fmt.Sprintf("open file: %v", err))
}
r, err := tsm1.NewTSMReader(f)
if err != nil {
panic(fmt.Sprintf("new reader: %v", err))
}
return r
}
type fakeFileStore struct {
PathsFn func() []tsm1.ExtFileStat
defaultBlockCount int
lastModified time.Time
// fakeFileStore blockCount holds a map of file paths from
// PathsFn.FileStat to a mock block count as an integer.
blockCount map[string]int
readers []*tsm1.TSMReader
}
func (w *fakeFileStore) Stats() []tsm1.ExtFileStat {
return w.PathsFn()
}
func (w *fakeFileStore) NextGeneration() int {
return 1
}
func (w *fakeFileStore) LastModified() time.Time {
return w.lastModified
}
func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) {
r := MustOpenTSMReader(path)
w.readers = append(w.readers, r)
r.Ref()
return r, nil
}
func (w *fakeFileStore) Close() {
for _, r := range w.readers {
r.Close()
}
w.readers = nil
}
func (w *fakeFileStore) ParseFileName(path string) (int, int, error) {
return tsm1.DefaultParseFileName(path)
}