Add Prev support to FileStore

Allows read the previous block of values given a timestamp and key.
pull/4969/head
Jason Wilder 2015-12-02 22:03:48 -07:00
parent e9832d7414
commit be59ba3455
5 changed files with 235 additions and 89 deletions

View File

@ -763,9 +763,9 @@ type TSMReader struct {
// TSM file.
type blockAccessor interface {
init() (TSMIndex, error)
next(key string, timestamp time.Time) ([]Value, error)
read(key string, timestamp time.Time) ([]Value, error)
readAll(key string) ([]Value, error)
readBlock(entry *IndexEntry) ([]Value, error)
path() string
close() error
}
@ -860,7 +860,54 @@ func (t *TSMReader) Next(key string, timestamp time.Time) ([]Value, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.accessor.next(key, timestamp)
entries := t.index.Entries(key)
var entry *IndexEntry
if timestamp.Before(entries[0].MinTime) {
entry = entries[0]
} else {
for i, e := range entries {
if e.Contains(timestamp) {
if i+1 < len(entries) {
entry = entries[i+1]
}
}
}
}
if entry == nil {
return nil, nil
}
return t.accessor.readBlock(entry)
}
func (t *TSMReader) Prev(key string, timestamp time.Time) ([]Value, error) {
t.mu.Lock()
defer t.mu.Unlock()
entries := t.index.Entries(key)
var entry *IndexEntry
if timestamp.After(entries[len(entries)-1].MaxTime) {
entry = entries[len(entries)-1]
} else {
for i := len(entries) - 1; i >= 0; i-- {
e := entries[i]
if e.Contains(timestamp) {
if i-1 >= 0 {
entry = entries[i-1]
}
}
}
}
if entry == nil {
return nil, nil
}
return t.accessor.readBlock(entry)
}
func (t *TSMReader) Read(key string, timestamp time.Time) ([]Value, error) {
@ -1002,26 +1049,17 @@ func (f *fileAccessor) init() (TSMIndex, error) {
return f.index, nil
}
func (f *fileAccessor) next(key string, t time.Time) ([]Value, error) {
entries := f.index.Entries(key)
var entry *IndexEntry
if t.Before(entries[0].MinTime) {
entry = entries[0]
} else {
for i, entry := range entries {
if entry.Contains(t) {
if i+1 < len(entries) {
entry = entries[i+1]
}
}
}
}
func (f *fileAccessor) read(key string, timestamp time.Time) ([]Value, error) {
entry := f.index.Entry(key, timestamp)
if entry == nil {
return nil, nil
}
return f.readBlock(entry)
}
func (f *fileAccessor) readBlock(entry *IndexEntry) ([]Value, error) {
// TODO: remove this allocation
b := make([]byte, 16*1024)
_, err := f.r.Seek(entry.Offset, os.SEEK_SET)
@ -1048,39 +1086,6 @@ func (f *fileAccessor) next(key string, t time.Time) ([]Value, error) {
return values, nil
}
func (f *fileAccessor) read(key string, timestamp time.Time) ([]Value, error) {
block := f.index.Entry(key, timestamp)
if block == nil {
return nil, nil
}
// TODO: remove this allocation
b := make([]byte, 16*1024)
_, err := f.r.Seek(block.Offset, os.SEEK_SET)
if err != nil {
return nil, err
}
if int(block.Size) > len(b) {
b = make([]byte, block.Size)
}
n, err := f.r.Read(b)
if err != nil {
return nil, err
}
//TODO: Validate checksum
var values []Value
values, err = DecodeBlock(b[4:n], values)
if err != nil {
return nil, err
}
return values, nil
}
// ReadAll returns all values for a key in all blocks.
func (f *fileAccessor) readAll(key string) ([]Value, error) {
var values []Value
@ -1176,26 +1181,16 @@ func (m *mmapAccessor) init() (TSMIndex, error) {
return m.index, nil
}
func (m *mmapAccessor) next(key string, t time.Time) ([]Value, error) {
entries := m.index.Entries(key)
var entry *IndexEntry
if t.Before(entries[0].MinTime) {
entry = entries[0]
} else {
for i, entry := range entries {
if entry.Contains(t) {
if i+1 < len(entries) {
entry = entries[i+1]
}
}
}
}
func (m *mmapAccessor) read(key string, timestamp time.Time) ([]Value, error) {
entry := m.index.Entry(key, timestamp)
if entry == nil {
return nil, nil
}
return m.readBlock(entry)
}
func (m *mmapAccessor) readBlock(entry *IndexEntry) ([]Value, error) {
//TODO: Validate checksum
var values []Value
var err error
@ -1207,23 +1202,6 @@ func (m *mmapAccessor) next(key string, t time.Time) ([]Value, error) {
return values, nil
}
func (m *mmapAccessor) read(key string, timestamp time.Time) ([]Value, error) {
block := m.index.Entry(key, timestamp)
if block == nil {
return nil, nil
}
//TODO: Validate checksum
var values []Value
var err error
values, err = DecodeBlock(m.b[block.Offset+4:block.Offset+4+int64(block.Size)], values)
if err != nil {
return nil, err
}
return values, nil
}
// ReadAll returns all values for a key in all blocks.
func (m *mmapAccessor) readAll(key string) ([]Value, error) {
blocks := m.index.Entries(key)

View File

@ -368,6 +368,12 @@ func (e *DevEngine) Next(key string, t time.Time) ([]Value, error) {
return e.FileStore.Next(key, t)
}
func (e *DevEngine) Prev(key string, t time.Time) ([]Value, error) {
e.mu.RLock()
defer e.mu.RUnlock()
return e.FileStore.Prev(key, t)
}
type devTx struct {
engine *DevEngine
}
@ -414,6 +420,7 @@ type devCursor struct {
tsm interface {
Read(key string, time time.Time) ([]Value, error)
Next(key string, time time.Time) ([]Value, error)
Prev(key string, time time.Time) ([]Value, error)
}
series string

View File

@ -14,7 +14,7 @@ import (
)
// Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_CacheQuery_Ascending(t *testing.T) {
func TestDevEngine_QueryCache_Ascending(t *testing.T) {
// Generate temporary file.
f, _ := ioutil.TempFile("", "tsm1dev")
f.Close()
@ -73,7 +73,7 @@ func TestDevEngine_CacheQuery_Ascending(t *testing.T) {
}
// Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_TSMQuery_Ascending(t *testing.T) {
func TestDevEngine_QueryTSM_Ascending(t *testing.T) {
fs := NewFileStore("")
// Setup 3 files
@ -130,7 +130,7 @@ func TestDevEngine_TSMQuery_Ascending(t *testing.T) {
}
// Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_CacheQuery_Descending(t *testing.T) {
func TestDevEngine_QueryCache_Descending(t *testing.T) {
t.Skip("fixme")
// Generate temporary file.
f, _ := ioutil.TempFile("", "tsm1dev")

View File

@ -22,6 +22,9 @@ type TSMFile interface {
// Next returns all the values in the block after the block where time t resides
Next(key string, t time.Time) ([]Value, error)
// Prev returns all the values in the block before the block where time t resides
Prev(key string, t time.Time) ([]Value, error)
// Returns true if the TSMFile may contain a value with the specified
// key and time
ContainsValue(key string, t time.Time) bool
@ -268,14 +271,38 @@ func (f *FileStore) Next(key string, t time.Time) ([]Value, error) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, f := range f.files {
for _, fd := range f.files {
// Can this file possibly contain this key and timestamp?
if !f.Contains(key) {
if !fd.Contains(key) {
continue
}
// May have the key and time we are looking for so try to find
v, err := f.Next(key, t)
v, err := fd.Next(key, t)
if err != nil {
return nil, err
}
if len(v) > 0 {
return v, nil
}
}
return nil, nil
}
func (f *FileStore) Prev(key string, t time.Time) ([]Value, error) {
f.mu.RLock()
defer f.mu.RUnlock()
for i := len(f.files) - 1; i >= 0; i-- {
fd := f.files[i]
// Can this file possibly contain this key and timestamp?
if !fd.Contains(key) {
continue
}
// May have the key and time we are looking for so try to find
v, err := fd.Prev(key, t)
if err != nil {
return nil, err
}

View File

@ -180,6 +180,140 @@ func TestFileStore_Next_End(t *testing.T) {
}
}
func TestFileStore_Prev_FromStart(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 2.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 3.0)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
// Search for an entry that exists in the second file
values, err := fs.Prev("cpu", time.Unix(0, 0))
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
if got, exp := len(values), 0; got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
}
func TestFileStore_Prev_AfterEnd(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 1.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 2.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(3, 0), 3.0)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
values, err := fs.Prev("cpu", time.Unix(4, 0))
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := data[2]
if got, exp := len(values), len(exp.values); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
func TestFileStore_Prev_Middle(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 1.0)}},
keyValues{"cpu", []tsm1.Value{
tsm1.NewValue(time.Unix(2, 0), 2.0),
tsm1.NewValue(time.Unix(3, 0), 3.0),
tsm1.NewValue(time.Unix(4, 0), 4.0)},
},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
// Search for an entry that exists in the second file
values, err := fs.Prev("cpu", time.Unix(3, 0))
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := data[0]
if got, exp := len(values), len(exp.values); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, got, exp)
}
}
}
func TestFileStore_Prev_End(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 2.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 3.0)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
values, err := fs.Prev("cpu", time.Unix(2, 0))
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := data[1]
if got, exp := len(values), len(exp.values); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, got, exp)
}
}
}
func TestFileStore_Open(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)