372 lines
12 KiB
Go
372 lines
12 KiB
Go
package tsm1
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang/snappy"
|
|
)
|
|
|
|
func TestCache_NewCache(t *testing.T) {
|
|
c := NewCache(100)
|
|
if c == nil {
|
|
t.Fatalf("failed to create new cache")
|
|
}
|
|
|
|
if c.MaxSize() != 100 {
|
|
t.Fatalf("new cache max size not correct")
|
|
}
|
|
if c.Size() != 0 {
|
|
t.Fatalf("new cache size not correct")
|
|
}
|
|
if len(c.Keys()) != 0 {
|
|
t.Fatalf("new cache keys not correct: %v", c.Keys())
|
|
}
|
|
}
|
|
|
|
func TestCache_CacheWrite(t *testing.T) {
|
|
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
|
|
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
|
|
v2 := NewValue(time.Unix(3, 0).UTC(), 3.0)
|
|
values := Values{v0, v1, v2}
|
|
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
|
|
|
|
c := NewCache(3 * valuesSize)
|
|
|
|
if err := c.Write("foo", values); err != nil {
|
|
t.Fatalf("failed to write key foo to cache: %s", err.Error())
|
|
}
|
|
if err := c.Write("bar", values); err != nil {
|
|
t.Fatalf("failed to write key foo to cache: %s", err.Error())
|
|
}
|
|
if n := c.Size(); n != 2*valuesSize {
|
|
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
|
|
}
|
|
|
|
if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
|
|
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
|
}
|
|
}
|
|
|
|
func TestCache_CacheWriteMulti(t *testing.T) {
|
|
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
|
|
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
|
|
v2 := NewValue(time.Unix(3, 0).UTC(), 3.0)
|
|
values := Values{v0, v1, v2}
|
|
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
|
|
|
|
c := NewCache(3 * valuesSize)
|
|
|
|
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
|
|
t.Fatalf("failed to write key foo to cache: %s", err.Error())
|
|
}
|
|
if n := c.Size(); n != 2*valuesSize {
|
|
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
|
|
}
|
|
|
|
if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
|
|
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
|
}
|
|
}
|
|
|
|
func TestCache_CacheValues(t *testing.T) {
|
|
v0 := NewValue(time.Unix(1, 0).UTC(), 0.0)
|
|
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
|
|
v2 := NewValue(time.Unix(3, 0).UTC(), 3.0)
|
|
v3 := NewValue(time.Unix(1, 0).UTC(), 1.0)
|
|
v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)
|
|
|
|
c := NewCache(512)
|
|
if deduped := c.Values("no such key"); deduped != nil {
|
|
t.Fatalf("Values returned for no such key")
|
|
}
|
|
|
|
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
|
|
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
|
|
}
|
|
if err := c.Write("foo", Values{v4}); err != nil {
|
|
t.Fatalf("failed to write 1 value, key foo to cache: %s", err.Error())
|
|
}
|
|
|
|
expAscValues := Values{v3, v1, v2, v4}
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expAscValues, deduped) {
|
|
t.Fatalf("deduped ascending values for foo incorrect, exp: %v, got %v", expAscValues, deduped)
|
|
}
|
|
}
|
|
|
|
func TestCache_CacheSnapshot(t *testing.T) {
|
|
v0 := NewValue(time.Unix(2, 0).UTC(), 0.0)
|
|
v1 := NewValue(time.Unix(3, 0).UTC(), 2.0)
|
|
v2 := NewValue(time.Unix(4, 0).UTC(), 3.0)
|
|
v3 := NewValue(time.Unix(5, 0).UTC(), 4.0)
|
|
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
|
|
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
|
|
|
|
c := NewCache(512)
|
|
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
|
|
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
|
|
}
|
|
|
|
// Grab snapshot, and ensure it's as expected.
|
|
snapshot := c.Snapshot()
|
|
expValues := Values{v0, v1, v2, v3}
|
|
if deduped := snapshot.values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
|
t.Fatalf("snapshotted values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
|
}
|
|
|
|
// Ensure cache is still as expected.
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
|
t.Fatalf("post-snapshot values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
|
}
|
|
|
|
// Write a new value to the cache.
|
|
if err := c.Write("foo", Values{v4}); err != nil {
|
|
t.Fatalf("failed to write post-snap value, key foo to cache: %s", err.Error())
|
|
}
|
|
expValues = Values{v0, v1, v2, v3, v4}
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
|
t.Fatalf("post-snapshot write values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
|
}
|
|
|
|
// Write a new, out-of-order, value to the cache.
|
|
if err := c.Write("foo", Values{v5}); err != nil {
|
|
t.Fatalf("failed to write post-snap value, key foo to cache: %s", err.Error())
|
|
}
|
|
expValues = Values{v5, v0, v1, v2, v3, v4}
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
|
t.Fatalf("post-snapshot out-of-order write values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
|
}
|
|
|
|
// Clear snapshot, ensuring non-snapshot data untouched.
|
|
c.ClearSnapshot(snapshot)
|
|
expValues = Values{v5, v4}
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
|
t.Fatalf("post-clear values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
|
}
|
|
}
|
|
|
|
func TestCache_CacheEmptySnapshot(t *testing.T) {
|
|
c := NewCache(512)
|
|
|
|
// Grab snapshot, and ensure it's as expected.
|
|
snapshot := c.Snapshot()
|
|
if deduped := snapshot.values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
|
|
t.Fatalf("snapshotted values for foo incorrect, exp: %v, got %v", nil, deduped)
|
|
}
|
|
|
|
// Ensure cache is still as expected.
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
|
|
t.Fatalf("post-snapshotted values for foo incorrect, exp: %v, got %v", Values(nil), deduped)
|
|
}
|
|
|
|
// Clear snapshot.
|
|
c.ClearSnapshot(snapshot)
|
|
if deduped := c.Values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
|
|
t.Fatalf("post-snapshot-clear values for foo incorrect, exp: %v, got %v", Values(nil), deduped)
|
|
}
|
|
}
|
|
|
|
func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
|
|
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
|
|
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
|
|
|
|
c := NewCache(uint64(v1.Size()))
|
|
|
|
if err := c.Write("foo", Values{v0}); err != nil {
|
|
t.Fatalf("failed to write key foo to cache: %s", err.Error())
|
|
}
|
|
if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
|
|
t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys)
|
|
}
|
|
if err := c.Write("bar", Values{v1}); err != ErrCacheMemoryExceeded {
|
|
t.Fatalf("wrong error writing key bar to cache")
|
|
}
|
|
|
|
// Grab snapshot, write should still fail since we're still using the memory.
|
|
snapshot := c.Snapshot()
|
|
if err := c.Write("bar", Values{v1}); err != ErrCacheMemoryExceeded {
|
|
t.Fatalf("wrong error writing key bar to cache")
|
|
}
|
|
|
|
// Clear the snapshot and the write should now succeed.
|
|
c.ClearSnapshot(snapshot)
|
|
if err := c.Write("bar", Values{v1}); err != nil {
|
|
t.Fatalf("failed to write key foo to cache: %s", err.Error())
|
|
}
|
|
expAscValues := Values{v1}
|
|
if deduped := c.Values("bar"); !reflect.DeepEqual(expAscValues, deduped) {
|
|
t.Fatalf("deduped ascending values for bar incorrect, exp: %v, got %v", expAscValues, deduped)
|
|
}
|
|
}
|
|
|
|
// Ensure the CacheLoader can correctly load from a single segment, even if it's corrupted.
|
|
func TestCacheLoader_LoadSingle(t *testing.T) {
|
|
// Create a WAL segment.
|
|
dir := mustTempDir()
|
|
defer os.RemoveAll(dir)
|
|
f := mustTempFile(dir)
|
|
w := NewWALSegmentWriter(f)
|
|
|
|
p1 := NewValue(time.Unix(1, 0), 1.1)
|
|
p2 := NewValue(time.Unix(1, 0), int64(1))
|
|
p3 := NewValue(time.Unix(1, 0), true)
|
|
|
|
values := map[string][]Value{
|
|
"foo": []Value{p1},
|
|
"bar": []Value{p2},
|
|
"baz": []Value{p3},
|
|
}
|
|
|
|
entry := &WriteWALEntry{
|
|
Values: values,
|
|
}
|
|
|
|
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
|
t.Fatal("write points", err)
|
|
}
|
|
|
|
// Load the cache using the segment.
|
|
cache := NewCache(1024)
|
|
loader := NewCacheLoader([]string{f.Name()})
|
|
if err := loader.Load(cache); err != nil {
|
|
t.Fatalf("failed to load cache: %s", err.Error())
|
|
}
|
|
|
|
// Check the cache.
|
|
if values := cache.Values("foo"); !reflect.DeepEqual(values, Values{p1}) {
|
|
t.Fatalf("cache key foo not as expected, got %v, exp %v", values, Values{p1})
|
|
}
|
|
if values := cache.Values("bar"); !reflect.DeepEqual(values, Values{p2}) {
|
|
t.Fatalf("cache key foo not as expected, got %v, exp %v", values, Values{p2})
|
|
}
|
|
if values := cache.Values("baz"); !reflect.DeepEqual(values, Values{p3}) {
|
|
t.Fatalf("cache key foo not as expected, got %v, exp %v", values, Values{p3})
|
|
}
|
|
|
|
// Corrupt the WAL segment.
|
|
if _, err := f.Write([]byte{1, 4, 0, 0, 0}); err != nil {
|
|
t.Fatalf("corrupt WAL segment: %s", err.Error())
|
|
}
|
|
|
|
// Reload the cache using the segment.
|
|
cache = NewCache(1024)
|
|
loader = NewCacheLoader([]string{f.Name()})
|
|
if err := loader.Load(cache); err != nil {
|
|
t.Fatalf("failed to load cache: %s", err.Error())
|
|
}
|
|
|
|
// Check the cache.
|
|
if values := cache.Values("foo"); !reflect.DeepEqual(values, Values{p1}) {
|
|
t.Fatalf("cache key foo not as expected, got %v, exp %v", values, Values{p1})
|
|
}
|
|
if values := cache.Values("bar"); !reflect.DeepEqual(values, Values{p2}) {
|
|
t.Fatalf("cache key bar not as expected, got %v, exp %v", values, Values{p2})
|
|
}
|
|
if values := cache.Values("baz"); !reflect.DeepEqual(values, Values{p3}) {
|
|
t.Fatalf("cache key baz not as expected, got %v, exp %v", values, Values{p3})
|
|
}
|
|
}
|
|
|
|
// Ensure the CacheLoader can correctly load from two segments, even if one is corrupted.
|
|
func TestCacheLoader_LoadDouble(t *testing.T) {
|
|
// Create a WAL segment.
|
|
dir := mustTempDir()
|
|
defer os.RemoveAll(dir)
|
|
f1, f2 := mustTempFile(dir), mustTempFile(dir)
|
|
w1, w2 := NewWALSegmentWriter(f1), NewWALSegmentWriter(f2)
|
|
|
|
p1 := NewValue(time.Unix(1, 0), 1.1)
|
|
p2 := NewValue(time.Unix(1, 0), int64(1))
|
|
p3 := NewValue(time.Unix(1, 0), true)
|
|
p4 := NewValue(time.Unix(1, 0), "string")
|
|
|
|
// Write first and second segment.
|
|
|
|
segmentWrite := func(w *WALSegmentWriter, values map[string][]Value) {
|
|
entry := &WriteWALEntry{
|
|
Values: values,
|
|
}
|
|
if err := w1.Write(mustMarshalEntry(entry)); err != nil {
|
|
t.Fatal("write points", err)
|
|
}
|
|
}
|
|
|
|
values := map[string][]Value{
|
|
"foo": []Value{p1},
|
|
"bar": []Value{p2},
|
|
}
|
|
segmentWrite(w1, values)
|
|
values = map[string][]Value{
|
|
"baz": []Value{p3},
|
|
"qux": []Value{p4},
|
|
}
|
|
segmentWrite(w2, values)
|
|
|
|
// Corrupt the first WAL segment.
|
|
if _, err := f1.Write([]byte{1, 4, 0, 0, 0}); err != nil {
|
|
t.Fatalf("corrupt WAL segment: %s", err.Error())
|
|
}
|
|
|
|
// Load the cache using the segments.
|
|
cache := NewCache(1024)
|
|
loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
|
|
if err := loader.Load(cache); err != nil {
|
|
t.Fatalf("failed to load cache: %s", err.Error())
|
|
}
|
|
|
|
// Check the cache.
|
|
if values := cache.Values("foo"); !reflect.DeepEqual(values, Values{p1}) {
|
|
t.Fatalf("cache key foo not as expected, got %v, exp %v", values, Values{p1})
|
|
}
|
|
if values := cache.Values("bar"); !reflect.DeepEqual(values, Values{p2}) {
|
|
t.Fatalf("cache key bar not as expected, got %v, exp %v", values, Values{p2})
|
|
}
|
|
if values := cache.Values("baz"); !reflect.DeepEqual(values, Values{p3}) {
|
|
t.Fatalf("cache key baz not as expected, got %v, exp %v", values, Values{p3})
|
|
}
|
|
if values := cache.Values("qux"); !reflect.DeepEqual(values, Values{p4}) {
|
|
t.Fatalf("cache key qux not as expected, got %v, exp %v", values, Values{p4})
|
|
}
|
|
}
|
|
|
|
func mustTempDir() string {
|
|
dir, err := ioutil.TempDir("", "tsm1-test")
|
|
if err != nil {
|
|
panic(fmt.Sprintf("failed to create temp dir: %v", err))
|
|
}
|
|
return dir
|
|
}
|
|
|
|
func mustTempFile(dir string) *os.File {
|
|
f, err := ioutil.TempFile(dir, "tsm1test")
|
|
if err != nil {
|
|
panic(fmt.Sprintf("failed to create temp file: %v", err))
|
|
}
|
|
return f
|
|
}
|
|
|
|
func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
|
|
bytes := make([]byte, 1024<<2)
|
|
|
|
b, err := entry.Encode(bytes)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("error encoding: %v", err))
|
|
}
|
|
|
|
return entry.Type(), snappy.Encode(b, b)
|
|
}
|
|
|
|
func BenchmarkCacheFloatEntries(b *testing.B) {
|
|
for i := 0; i < b.N; i++ {
|
|
cache := NewCache(10000)
|
|
for j := 0; j < 10000; j++ {
|
|
v := NewValue(time.Unix(1, 0), float64(j))
|
|
cache.Write("test", []Value{v})
|
|
}
|
|
}
|
|
}
|