Remove NewTSMReaderWithOptions

There are two TSMIndex implementations, the directIndex and the
indirectIndex.  Originally, we only had the directIndex and later
added the indirectIndex and NewTSMReaderWithOptions in order to
allow both indexes to be used in tests and code.  This has created
a problem since we really only use the directIndex for writing and
always use the indirectIndex for reading.

This changes removes the NewTSMReaderWithOptions func so that it is
no longer possible to create a TSMReader with a directIndex.  This
will allow a lot of the block reading code used by the directIndex
to be removed and simplify maintainence.  It also gives better test
coverage of the code that is actually used by the TSM engine now.
pull/6483/head
Jason Wilder 2016-04-25 14:23:00 -06:00
parent bc6328d196
commit a789e819a3
9 changed files with 309 additions and 194 deletions

View File

@ -470,11 +470,10 @@ func cmdDumpTsm1dev(opts *tsdmDumpOpts) {
}
b := make([]byte, 8)
r, err := tsm1.NewTSMReaderWithOptions(tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
println("Error opening TSM files: ", err.Error())
os.Exit(1)
}
defer r.Close()

View File

@ -46,10 +46,7 @@ func cmdVerify(path string) {
os.Exit(1)
}
reader, err := tsm1.NewTSMReaderWithOptions(tsm1.TSMReaderOptions{
MMAPFile: file,
})
reader, err := tsm1.NewTSMReader(file)
if err != nil {
fmt.Printf("%v", err)
os.Exit(1)
@ -73,6 +70,7 @@ func cmdVerify(path string) {
if brokenFileBlocks == 0 {
fmt.Fprintf(tw, "%s: healthy\n", f)
}
reader.Close()
}
fmt.Fprintf(tw, "Broken Blocks: %d / %d, in %vs\n", brokenBlocks, totalBlocks, time.Since(start).Seconds())

View File

@ -453,10 +453,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, err
}
tr, err := NewTSMReaderWithOptions(
TSMReaderOptions{
MMAPFile: f,
})
tr, err := NewTSMReader(f)
if err != nil {
return nil, err
}

View File

@ -1153,10 +1153,7 @@ func MustOpenTSMReader(name string) *tsm1.TSMReader {
panic(fmt.Sprintf("open file: %v", err))
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
panic(fmt.Sprintf("new reader: %v", err))
}

View File

@ -320,9 +320,7 @@ func (f *FileStore) Open() error {
go func(idx int, file *os.File) {
start := time.Now()
df, err := NewTSMReaderWithOptions(TSMReaderOptions{
MMAPFile: file,
})
df, err := NewTSMReader(file)
if f.traceLogging {
f.Logger.Printf("%s (#%d) opened in %v", file.Name(), idx, time.Now().Sub(start))
}
@ -432,9 +430,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
return err
}
tsm, err := NewTSMReaderWithOptions(TSMReaderOptions{
MMAPFile: fd,
})
tsm, err := NewTSMReader(fd)
if err != nil {
return err
}

View File

@ -1,7 +1,6 @@
package tsm1_test
import (
"bytes"
"fmt"
"io/ioutil"
"os"
@ -12,7 +11,9 @@ import (
)
func TestFileStore_Read(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -21,7 +22,7 @@ func TestFileStore_Read(t *testing.T) {
keyValues{"mem", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -47,7 +48,9 @@ func TestFileStore_Read(t *testing.T) {
}
func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -56,7 +59,7 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -84,7 +87,9 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
}
func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -94,7 +99,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 4.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -137,7 +142,9 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
}
func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -146,7 +153,7 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -176,7 +183,9 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -186,7 +195,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 4.0), tsm1.NewValue(7, 7.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -222,7 +231,9 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -232,7 +243,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(4)), tsm1.NewValue(7, int64(7))}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -268,7 +279,9 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -278,7 +291,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, false), tsm1.NewValue(7, true)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -314,7 +327,9 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -324,7 +339,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "four"), tsm1.NewValue(7, "seven")}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -358,7 +373,9 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
}
func TestFileStore_SeekToAsc_Middle(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -368,7 +385,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(4, 4.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -396,7 +413,9 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) {
}
func TestFileStore_SeekToAsc_End(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -405,7 +424,7 @@ func TestFileStore_SeekToAsc_End(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -432,7 +451,9 @@ func TestFileStore_SeekToAsc_End(t *testing.T) {
}
func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -441,7 +462,7 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -468,7 +489,9 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
}
func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -478,7 +501,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -508,7 +531,9 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
}
func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -517,7 +542,7 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -544,7 +569,9 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -554,7 +581,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 4.0), tsm1.NewValue(7, 7.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -588,7 +615,9 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -598,7 +627,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(4)), tsm1.NewValue(7, int64(7))}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -632,7 +661,9 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -642,7 +673,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true), tsm1.NewValue(7, false)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -676,7 +707,9 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -686,7 +719,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "four"), tsm1.NewValue(7, "seven")}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -720,7 +753,9 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
}
func TestFileStore_SeekToDesc_Middle(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -732,7 +767,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) {
},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -760,7 +795,9 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) {
}
func TestFileStore_SeekToDesc_End(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -769,7 +806,7 @@ func TestFileStore_SeekToDesc_End(t *testing.T) {
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -909,7 +946,9 @@ func TestFileStore_Open_Deleted(t *testing.T) {
}
func TestFileStore_Delete(t *testing.T) {
fs := tsm1.NewFileStore("")
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -918,7 +957,7 @@ func TestFileStore_Delete(t *testing.T) {
keyValues{"mem,host=server1!~#!value", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
}
files, err := newFiles(data...)
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
@ -1003,12 +1042,13 @@ func newFileDir(dir string, values ...keyValues) ([]string, error) {
}
func newFiles(values ...keyValues) ([]tsm1.TSMFile, error) {
func newFiles(dir string, values ...keyValues) ([]tsm1.TSMFile, error) {
var files []tsm1.TSMFile
id := 1
for _, v := range values {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
return nil, err
}
@ -1025,7 +1065,17 @@ func newFiles(values ...keyValues) ([]tsm1.TSMFile, error) {
return nil, err
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
newName := filepath.Join(filepath.Dir(f.Name()), tsmFileName(id))
if err := os.Rename(f.Name(), newName); err != nil {
return nil, err
}
id++
fd, err := os.Open(newName)
if err != nil {
return nil, err
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
return nil, err
}

View File

@ -112,46 +112,17 @@ type TSMReaderOptions struct {
MMAPFile *os.File
}
func NewTSMReader(r io.ReadSeeker) (*TSMReader, error) {
return NewTSMReaderWithOptions(
TSMReaderOptions{
Reader: r,
})
}
func NewTSMReaderWithOptions(opt TSMReaderOptions) (*TSMReader, error) {
func NewTSMReader(f *os.File) (*TSMReader, error) {
t := &TSMReader{}
if opt.Reader != nil {
// Seek to the end of the file to determine the size
size, err := opt.Reader.Seek(0, os.SEEK_END)
if err != nil {
return nil, err
}
t.size = size
if f, ok := opt.Reader.(*os.File); ok {
stat, err := f.Stat()
if err != nil {
return nil, err
}
t.lastModified = stat.ModTime().UnixNano()
}
t.accessor = &fileAccessor{
r: opt.Reader,
}
} else if opt.MMAPFile != nil {
stat, err := opt.MMAPFile.Stat()
if err != nil {
return nil, err
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: opt.MMAPFile,
}
} else {
panic("invalid options: need Reader or MMAPFile")
stat, err := f.Stat()
if err != nil {
return nil, err
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: f,
}
index, err := t.accessor.init()

View File

@ -1,7 +1,6 @@
package tsm1_test
import (
"bytes"
"fmt"
"os"
"testing"
@ -10,8 +9,11 @@ import (
)
func TestTSMReader_Type(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -29,7 +31,11 @@ func TestTSMReader_Type(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
f, err = os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -92,10 +98,7 @@ func TestTSMReader_MMAP_ReadAll(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -172,10 +175,7 @@ func TestTSMReader_MMAP_Read(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -253,10 +253,7 @@ func TestTSMReader_MMAP_Keys(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -319,10 +316,7 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -331,10 +325,7 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) {
t.Fatalf("unexpected error deleting: %v", err)
}
r, err = tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err = tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -379,10 +370,7 @@ func TestTSMReader_MMAP_Stats(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -420,10 +408,7 @@ func TestTSMReader_VerifiesFileType(t *testing.T) {
// write some garbage
f.Write([]byte{0x23, 0xac, 0x99, 0x22, 0x77, 0x23, 0xac, 0x99, 0x22, 0x77, 0x23, 0xac, 0x99, 0x22, 0x77, 0x23, 0xac, 0x99, 0x22, 0x77})
_, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
_, err := tsm1.NewTSMReader(f)
if err == nil {
t.Fatal("expected error trying to open non-tsm file")
}
@ -554,8 +539,11 @@ func TestIndirectIndex_Keys(t *testing.T) {
}
func TestBlockIterator_Single(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -573,7 +561,12 @@ func TestBlockIterator_Single(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -612,8 +605,11 @@ func TestBlockIterator_Single(t *testing.T) {
}
func TestBlockIterator_MultipleBlocks(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -636,7 +632,12 @@ func TestBlockIterator_MultipleBlocks(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -677,8 +678,11 @@ func TestBlockIterator_MultipleBlocks(t *testing.T) {
}
}
func TestBlockIterator_Sorted(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -705,7 +709,12 @@ func TestBlockIterator_Sorted(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -766,10 +775,7 @@ func TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -777,8 +783,11 @@ func TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) {
}
func TestCompacted_NotFull(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -796,7 +805,12 @@ func TestCompacted_NotFull(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -864,10 +878,7 @@ func TestTSMReader_File_ReadAll(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
Reader: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
@ -944,10 +955,7 @@ func TestTSMReader_File_Read(t *testing.T) {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
Reader: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}

View File

@ -3,6 +3,7 @@ package tsm1_test
import (
"bytes"
"encoding/binary"
"io/ioutil"
"os"
"testing"
@ -46,8 +47,11 @@ func TestTSMWriter_Write_NoValues(t *testing.T) {
}
func TestTSMWriter_Write_Single(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -65,17 +69,32 @@ func TestTSMWriter_Write_Single(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if got, exp := len(b.Bytes()), 5; got < exp {
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
b, err := ioutil.ReadAll(fd)
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(b), 5; got < exp {
t.Fatalf("file size mismatch: got %v, exp %v", got, exp)
}
if got := binary.BigEndian.Uint32(b.Bytes()[0:4]); got != tsm1.MagicNumber {
if got := binary.BigEndian.Uint32(b[0:4]); got != tsm1.MagicNumber {
t.Fatalf("magic number mismatch: got %v, exp %v", got, tsm1.MagicNumber)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if _, err := fd.Seek(0, os.SEEK_SET); err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
readValues, err := r.ReadAll("cpu")
if err != nil {
@ -94,8 +113,11 @@ func TestTSMWriter_Write_Single(t *testing.T) {
}
func TestTSMWriter_Write_Multiple(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -122,10 +144,16 @@ func TestTSMWriter_Write_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
for _, d := range data {
readValues, err := r.ReadAll(d.key)
@ -146,8 +174,11 @@ func TestTSMWriter_Write_Multiple(t *testing.T) {
}
func TestTSMWriter_Write_MultipleKeyValues(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -180,10 +211,16 @@ func TestTSMWriter_Write_MultipleKeyValues(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
for _, d := range data {
readValues, err := r.ReadAll(d.key)
@ -205,8 +242,11 @@ func TestTSMWriter_Write_MultipleKeyValues(t *testing.T) {
// Tests that writing keys in reverse is able to read them back.
func TestTSMWriter_Write_ReverseKeys(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -239,10 +279,16 @@ func TestTSMWriter_Write_ReverseKeys(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
for _, d := range data {
readValues, err := r.ReadAll(d.key)
@ -264,8 +310,11 @@ func TestTSMWriter_Write_ReverseKeys(t *testing.T) {
// Tests that writing keys in reverse is able to read them back.
func TestTSMWriter_Write_SameKey(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -298,10 +347,16 @@ func TestTSMWriter_Write_SameKey(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
values := append(data[0].values, data[1].values...)
@ -324,8 +379,11 @@ func TestTSMWriter_Write_SameKey(t *testing.T) {
// Tests that calling Read returns all the values for block matching the key
// and timestamp
func TestTSMWriter_Read_Multiple(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -358,10 +416,16 @@ func TestTSMWriter_Read_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
for _, values := range data {
// Try the first timestamp
@ -399,9 +463,11 @@ func TestTSMWriter_Read_Multiple(t *testing.T) {
}
func TestTSMWriter_WriteBlock_Empty(t *testing.T) {
// Write a new TSM file
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -414,15 +480,28 @@ func TestTSMWriter_WriteBlock_Empty(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if got, exp := len(b.Bytes()), 0; got < exp {
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
defer fd.Close()
b, err := ioutil.ReadAll(fd)
if err != nil {
t.Fatalf("unexpected error read all: %v", err)
}
if got, exp := len(b), 0; got < exp {
t.Fatalf("file size mismatch: got %v, exp %v", got, exp)
}
}
func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
// Write a new TSM file
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -449,22 +528,36 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if got, exp := len(b.Bytes()), 5; got < exp {
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
defer fd.Close()
b, err := ioutil.ReadAll(fd)
if err != nil {
t.Fatalf("unexpected error read all: %v", err)
}
if got, exp := len(b), 5; got < exp {
t.Fatalf("file size mismatch: got %v, exp %v", got, exp)
}
if got := binary.BigEndian.Uint32(b.Bytes()[0:4]); got != tsm1.MagicNumber {
if got := binary.BigEndian.Uint32(b[0:4]); got != tsm1.MagicNumber {
t.Fatalf("magic number mismatch: got %v, exp %v", got, tsm1.MagicNumber)
}
if _, err := fd.Seek(0, os.SEEK_SET); err != nil {
t.Fatalf("error seeking: %v", err)
}
// Create reader for that file
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
// Using the reader, write a new file using WriteBlocks
b.Reset()
w, err = tsm1.NewTSMWriter(&b)
f = MustTempFile(dir)
w, err = tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
@ -487,12 +580,18 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err = os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
// Now create a reader to verify the written blocks matches the originally
// written file using Write
r, err = tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
r, err = tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
for _, d := range data {
readValues, err := r.ReadAll(d.key)