diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index fef499c5ee..85560bdc25 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -484,7 +484,7 @@ func (d *indirectIndex) search(key string) int { // Entries returns all index entries for a key. func (d *indirectIndex) Entries(key string) []*IndexEntry { ofs := d.search(key) - if ofs < len(d.offsets) { + if ofs < len(d.b) { n, k, err := readKey(d.b[ofs:]) if err != nil { panic(fmt.Sprintf("error reading key: %v", err)) @@ -689,70 +689,66 @@ func (t *tsmWriter) Size() int { type tsmReader struct { mu sync.Mutex - r io.ReadSeeker - indexStart, indexEnd int64 - index TSMIndex + // accessor provides access and decoding of blocks for the reader + accessor blockAccessor + // index is the index of all blocks. + index TSMIndex + + // tombstoner ensures tombstoned keys are not available by the index. tombstoner *Tombstoner } +// blockAccessor abstracts a method of accessing blocks from a +// TSM file. +type blockAccessor interface { + init() (TSMIndex, error) + read(key string, timestamp time.Time) ([]Value, error) + readAll(key string) ([]Value, error) + path() string + close() error +} + +type TSMReaderOptions struct { + // Reader is used to create file IO based reader. + Reader io.ReadSeeker + + // MMAPFile is used to create an MMAP based reader. + MMAPFile *os.File +} + func NewTSMReader(r io.ReadSeeker) (*tsmReader, error) { - t := &tsmReader{r: r} + return NewTSMReaderWithOptions( + TSMReaderOptions{ + Reader: r, + }) +} + +func NewTSMReaderWithOptions(opt TSMReaderOptions) (*tsmReader, error) { + t := &tsmReader{} + if opt.Reader != nil { + t.accessor = &fileAccessor{r: opt.Reader} + } else if opt.MMAPFile != nil { + t.accessor = &mmapAccessor{f: opt.MMAPFile} + } else { + panic("invalid options: need Reader or MMAPFile") + } + + index, err := t.accessor.init() + if err != nil { + return nil, err + } + + t.index = index t.tombstoner = &Tombstoner{Path: t.Path()} - if err := t.init(); err != nil { + if err := t.applyTombstones(); err != nil { return nil, err } return t, nil } -func (t *tsmReader) init() error { - // Current the readers size - size, err := t.r.Seek(0, os.SEEK_END) - if err != nil { - return fmt.Errorf("init: failed to seek: %v", err) - } - - t.indexEnd = size - 8 - - // Seek to index location pointer - _, err = t.r.Seek(-8, os.SEEK_END) - if err != nil { - return fmt.Errorf("init: failed to seek to index ptr: %v", err) - } - - // Read the absolute position of the start of the index - b := make([]byte, 8) - _, err = t.r.Read(b) - if err != nil { - return fmt.Errorf("init: failed to read index ptr: %v", err) - - } - - t.indexStart = int64(btou64(b)) - - _, err = t.r.Seek(t.indexStart, os.SEEK_SET) - if err != nil { - return fmt.Errorf("init: failed to seek to index: %v", err) - } - - b = make([]byte, t.indexEnd-t.indexStart) - t.index = &directIndex{ - blocks: map[string]*indexEntries{}, - } - _, err = t.r.Read(b) - if err != nil { - return fmt.Errorf("init: read index: %v", err) - } - - if err := t.index.UnmarshalBinary(b); err != nil { - return fmt.Errorf("init: unmarshal error: %v", err) - } - - return t.applyTombstones() -} - func (t *tsmReader) applyTombstones() error { // Read any tombstone entries if the exist tombstones, err := t.tombstoner.ReadAll() @@ -760,7 +756,7 @@ func (t *tsmReader) applyTombstones() error { return fmt.Errorf("init: read tombstones: %v", err) } - // Update our our index + // Update our index for _, tombstone := range tombstones { t.index.Delete(tombstone) } @@ -771,10 +767,7 @@ func (t *tsmReader) Path() string { t.mu.Lock() defer t.mu.Unlock() - if f, ok := t.r.(*os.File); ok { - return f.Name() - } - return "" + return t.accessor.path() } func (t *tsmReader) Keys() []string { @@ -785,35 +778,7 @@ func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) { t.mu.Lock() defer t.mu.Unlock() - block := t.index.Entry(key, timestamp) - if block == nil { - return nil, nil - } - - // TODO: remove this allocation - b := make([]byte, 16*1024) - _, err := t.r.Seek(block.Offset, os.SEEK_SET) - if err != nil { - return nil, err - } - - if int(block.Size) > len(b) { - b = make([]byte, block.Size) - } - - n, err := t.r.Read(b) - if err != nil { - return nil, err - } - - //TODO: Validate checksum - var values []Value - values, err = DecodeBlock(b[4:n], values) - if err != nil { - return nil, err - } - - return values, nil + return t.accessor.read(key, timestamp) } // ReadAll returns all values for a key in all blocks. @@ -821,47 +786,7 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) { t.mu.Lock() defer t.mu.Unlock() - var values []Value - blocks := t.index.Entries(key) - if len(blocks) == 0 { - return values, nil - } - - var temp []Value - // TODO: we can determine the max block size when loading the file create/re-use - // a reader level buf then. - b := make([]byte, 16*1024) - var pos int64 - for _, block := range blocks { - // Skip the seek call if we are already at the position we're seeking to - if pos != block.Offset { - _, err := t.r.Seek(block.Offset, os.SEEK_SET) - if err != nil { - return nil, err - } - pos = block.Offset - } - - if int(block.Size) > len(b) { - b = make([]byte, block.Size) - } - - n, err := t.r.Read(b[:block.Size]) - if err != nil { - return nil, err - } - pos += int64(block.Size) - - //TODO: Validate checksum - temp = temp[:0] - temp, err = DecodeBlock(b[4:n], temp) - if err != nil { - return nil, err - } - values = append(values, temp...) - } - - return values, nil + return t.accessor.readAll(key) } func (t *tsmReader) Type(key string) (byte, error) { @@ -872,10 +797,7 @@ func (t *tsmReader) Close() error { t.mu.Lock() defer t.mu.Unlock() - if c, ok := t.r.(io.Closer); ok { - return c.Close() - } - return nil + return t.accessor.close() } func (t *tsmReader) Contains(key string) bool { @@ -922,6 +844,245 @@ func (t *tsmReader) IndexSize() int { return t.index.Size() } +// fileAccessor is file IO based block accessor. It provides access to blocks +// using a file IO based approach (seek, read, etc.) +type fileAccessor struct { + r io.ReadSeeker + index TSMIndex +} + +func (f *fileAccessor) init() (TSMIndex, error) { + // Current the readers size + size, err := f.r.Seek(0, os.SEEK_END) + if err != nil { + return nil, fmt.Errorf("init: failed to seek: %v", err) + } + + indexEnd := size - 8 + + // Seek to index location pointer + _, err = f.r.Seek(-8, os.SEEK_END) + if err != nil { + return nil, fmt.Errorf("init: failed to seek to index ptr: %v", err) + } + + // Read the absolute position of the start of the index + b := make([]byte, 8) + _, err = f.r.Read(b) + if err != nil { + return nil, fmt.Errorf("init: failed to read index ptr: %v", err) + + } + + indexStart := int64(btou64(b)) + + _, err = f.r.Seek(indexStart, os.SEEK_SET) + if err != nil { + return nil, fmt.Errorf("init: failed to seek to index: %v", err) + } + + b = make([]byte, indexEnd-indexStart) + f.index = &directIndex{ + blocks: map[string]*indexEntries{}, + } + _, err = f.r.Read(b) + if err != nil { + return nil, fmt.Errorf("init: read index: %v", err) + } + + if err := f.index.UnmarshalBinary(b); err != nil { + return nil, fmt.Errorf("init: unmarshal error: %v", err) + } + + return f.index, nil +} + +func (f *fileAccessor) read(key string, timestamp time.Time) ([]Value, error) { + block := f.index.Entry(key, timestamp) + if block == nil { + return nil, nil + } + + // TODO: remove this allocation + b := make([]byte, 16*1024) + _, err := f.r.Seek(block.Offset, os.SEEK_SET) + if err != nil { + return nil, err + } + + if int(block.Size) > len(b) { + b = make([]byte, block.Size) + } + + n, err := f.r.Read(b) + if err != nil { + return nil, err + } + + //TODO: Validate checksum + var values []Value + values, err = DecodeBlock(b[4:n], values) + if err != nil { + return nil, err + } + + return values, nil +} + +// ReadAll returns all values for a key in all blocks. +func (f *fileAccessor) readAll(key string) ([]Value, error) { + var values []Value + blocks := f.index.Entries(key) + if len(blocks) == 0 { + return values, nil + } + + var temp []Value + // TODO: we can determine the max block size when loading the file create/re-use + // a reader level buf then. + b := make([]byte, 16*1024) + var pos int64 + for _, block := range blocks { + // Skip the seek call if we are already at the position we're seeking to + if pos != block.Offset { + _, err := f.r.Seek(block.Offset, os.SEEK_SET) + if err != nil { + return nil, err + } + pos = block.Offset + } + + if int(block.Size) > len(b) { + b = make([]byte, block.Size) + } + + n, err := f.r.Read(b[:block.Size]) + if err != nil { + return nil, err + } + pos += int64(block.Size) + + //TODO: Validate checksum + temp = temp[:0] + temp, err = DecodeBlock(b[4:n], temp) + if err != nil { + return nil, err + } + values = append(values, temp...) + } + + return values, nil +} + +func (f *fileAccessor) path() string { + if fd, ok := f.r.(*os.File); ok { + return fd.Name() + } + return "" +} + +func (f *fileAccessor) close() error { + if c, ok := f.r.(io.Closer); ok { + return c.Close() + } + return nil +} + +// mmapAccess is mmap based block accessor. It access blocks through an +// MMAP file interface. +type mmapAccessor struct { + f *os.File + b []byte + index TSMIndex +} + +func (m *mmapAccessor) init() (TSMIndex, error) { + var err error + + if _, err := m.f.Seek(0, 0); err != nil { + return nil, err + } + + stat, err := m.f.Stat() + if err != nil { + return nil, err + } + + m.b, err = mmap(m.f, 0, int(stat.Size())) + if err != nil { + return nil, err + } + + indexOfsPos := len(m.b) - 8 + indexStart := btou64(m.b[indexOfsPos : indexOfsPos+8]) + + m.index = NewIndirectIndex() + if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil { + return nil, err + } + + return m.index, nil +} + +func (m *mmapAccessor) read(key string, timestamp time.Time) ([]Value, error) { + block := m.index.Entry(key, timestamp) + if block == nil { + return nil, nil + } + + //TODO: Validate checksum + var values []Value + var err error + values, err = DecodeBlock(m.b[block.Offset+4:block.Offset+4+int64(block.Size)], values) + if err != nil { + return nil, err + } + + return values, nil +} + +// ReadAll returns all values for a key in all blocks. +func (m *mmapAccessor) readAll(key string) ([]Value, error) { + blocks := m.index.Entries(key) + if len(blocks) == 0 { + return nil, nil + } + + var temp []Value + var err error + var values []Value + for _, block := range blocks { + //TODO: Validate checksum + temp = temp[:0] + // The +4 is the 4 byte checksum length + temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+4+int64(block.Size)], temp) + if err != nil { + return nil, err + } + values = append(values, temp...) + } + + return values, nil +} + +func (m *mmapAccessor) path() string { + return m.f.Name() +} + +func (m *mmapAccessor) close() error { + if m.b == nil { + return nil + } + + err := munmap(m.b) + if err != nil { + return err + } + + m.b = nil + return m.f.Close() +} + type indexEntries struct { Type byte entries []*IndexEntry diff --git a/tsdb/engine/tsm1/data_file_test.go b/tsdb/engine/tsm1/data_file_test.go index 01202289e3..f5c0580584 100644 --- a/tsdb/engine/tsm1/data_file_test.go +++ b/tsdb/engine/tsm1/data_file_test.go @@ -3,6 +3,7 @@ package tsm1_test import ( "bytes" "encoding/binary" + "os" "testing" "time" @@ -512,3 +513,111 @@ func TestTSMWriter_Type(t *testing.T) { t.Fatalf("type mismatch: got %v, exp %v", got, exp) } } + +func TestTSMReader_MMAP_ReadAll(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + f := MustTempFile(dir) + defer f.Close() + + w, err := tsm1.NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values := []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)} + if err := w.Write("cpu", values); err != nil { + t.Fatalf("unexpeted error writing: %v", err) + + } + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpeted error writing index: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpeted error closing: %v", err) + } + + f, err = os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpeted error open file: %v", err) + } + + r, err := tsm1.NewTSMReaderWithOptions( + tsm1.TSMReaderOptions{ + MMAPFile: f, + }) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + defer r.Close() + + readValues, err := r.ReadAll("cpu") + if err != nil { + t.Fatalf("unexpeted error readin: %v", err) + } + + if len(readValues) != len(values) { + t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), len(values)) + } + + for i, v := range values { + if v.Value() != readValues[i].Value() { + t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value()) + } + } +} + +func TestTSMReader_MMAP_Read(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + f := MustTempFile(dir) + defer f.Close() + + w, err := tsm1.NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values := []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)} + if err := w.Write("cpu", values); err != nil { + t.Fatalf("unexpeted error writing: %v", err) + + } + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpeted error writing index: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpeted error closing: %v", err) + } + + f, err = os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpeted error open file: %v", err) + } + + r, err := tsm1.NewTSMReaderWithOptions( + tsm1.TSMReaderOptions{ + MMAPFile: f, + }) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + defer r.Close() + + readValues, err := r.Read("cpu", time.Unix(0, 0)) + if err != nil { + t.Fatalf("unexpeted error readin: %v", err) + } + + if len(readValues) != len(values) { + t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), len(values)) + } + + for i, v := range values { + if v.Value() != readValues[i].Value() { + t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value()) + } + } +} diff --git a/tsdb/engine/tsm1/mmap_unix.go b/tsdb/engine/tsm1/mmap_unix.go new file mode 100644 index 0000000000..e178232aa1 --- /dev/null +++ b/tsdb/engine/tsm1/mmap_unix.go @@ -0,0 +1,35 @@ +// +build !windows,!plan9,!solaris + +package tsm1 + +import ( + "os" + "syscall" + "unsafe" +) + +func mmap(f *os.File, offset int64, length int) ([]byte, error) { + mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + return nil, err + } + + if err := madvise(mmap, syscall.MADV_RANDOM); err != nil { + return nil, err + } + + return mmap, nil +} + +func munmap(b []byte) (err error) { + return syscall.Munmap(b) +} + +// From: github.com/boltdb/bolt/bolt_unix.go +func madvise(b []byte, advice int) (err error) { + _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice)) + if e1 != 0 { + err = e1 + } + return +} diff --git a/tsdb/engine/tsm1/mmap_windows.go b/tsdb/engine/tsm1/mmap_windows.go new file mode 100644 index 0000000000..64dafe792f --- /dev/null +++ b/tsdb/engine/tsm1/mmap_windows.go @@ -0,0 +1,14 @@ +package tsm1 + +import ( + "fmt" + "os" +) + +func mmap(f *os.File, offset int64, length int) ([]byte, error) { + return nil, fmt.Errorf("mmap file not supported windows") +} + +func munmap(b []byte) (err error) { + return nil, fmt.Errorf("munmap file not supported on windows") +}