Merge pull request #5031 from influxdb/jw-mintime

Dedupe points at query time if there are overlapping blocks
pull/5052/head
Jason Wilder 2015-12-07 21:28:29 -07:00
commit d32aeb2535
3 changed files with 246 additions and 19 deletions

View File

@ -208,6 +208,15 @@ func (e *IndexEntry) Contains(t time.Time) bool {
(e.MaxTime.Equal(t) || e.MaxTime.After(t))
}
// OverlapsTimeRange reports whether the entry's inclusive [MinTime, MaxTime]
// interval intersects the inclusive range [min, max].
func (e *IndexEntry) OverlapsTimeRange(min, max time.Time) bool {
	// Two closed intervals overlap unless one lies strictly before the other.
	return !e.MinTime.After(max) && !e.MaxTime.Before(min)
}
// String returns a human-readable summary of the index entry for debugging.
func (e *IndexEntry) String() string {
	min, max := e.MinTime.UTC(), e.MaxTime.UTC()
	return fmt.Sprintf("min=%s max=%s ofs=%d siz=%d", min, max, e.Offset, e.Size)
}
func NewDirectIndex() TSMIndex {
return &directIndex{
blocks: map[string]*indexEntries{},

View File

@ -443,14 +443,31 @@ func ParseTSMFileName(name string) (int, int, error) {
}
// KeyCursor iterates, in time order, over the blocks stored for a single key
// across all files in a FileStore.
//
// NOTE(review): the flattened diff left both the pre-change and post-change
// field declarations in this span; the duplicated fields (key, fs, seeks, buf,
// ready) would not compile. This is the reconstructed post-change struct.
type KeyCursor struct {
	key string
	fs  *FileStore

	// seeks is all the file locations that we need to return during iteration.
	seeks []*location

	// current is the set of blocks possibly containing the next set of points.
	// Normally this is just one entry, but there may be multiple if points have
	// been overwritten.
	current []*location
	buf     []Value

	// pos is the index within seeks. Based on ascending, it will increment or
	// decrement through the size of seeks slice.
	pos       int
	ascending bool

	// ready indicates that we know the files and blocks to seek to for the key.
	ready bool

	// duplicates is a hint that there are overlapping blocks for this key in
	// multiple files (e.g. points have been overwritten but not fully compacted).
	// If this is true, we need to scan the duplicate blocks and dedup the points
	// at query time until they are compacted.
	duplicates bool
}
type location struct {
@ -462,8 +479,20 @@ func (c *KeyCursor) init(t time.Time, ascending bool) {
if c.ready {
return
}
c.ascending = ascending
c.seeks = c.fs.locations(c.key, t, ascending)
if len(c.seeks) > 0 {
for i := 1; i < len(c.seeks); i++ {
prev := c.seeks[i-1]
cur := c.seeks[i]
if prev.entry.MaxTime.Equal(cur.entry.MinTime) || prev.entry.MaxTime.After(cur.entry.MinTime) {
c.duplicates = true
break
}
}
}
c.buf = make([]Value, 1000)
c.ready = true
}
@ -478,42 +507,127 @@ func (c *KeyCursor) SeekTo(t time.Time, ascending bool) ([]Value, error) {
if ascending {
for i, e := range c.seeks {
if t.Before(e.entry.MinTime) || e.entry.Contains(t) {
c.current = e
c.pos = i
break
// Record the position of the first block matching our seek time
if len(c.current) == 0 {
c.pos = i
}
c.current = append(c.current, e)
// If we don't have duplicates, break. Otherwise, keep looking for additional blocks containing
// this point.
if !c.duplicates {
break
}
}
}
} else {
for i := len(c.seeks) - 1; i >= 0; i-- {
e := c.seeks[i]
if t.After(e.entry.MaxTime) || e.entry.Contains(t) {
c.current = e
c.pos = i
break
// Record the position of the first block matching our seek time
if len(c.current) == 0 {
c.pos = i
}
c.current = append(c.current, e)
// If we don't have duplicates, break. Otherwise, keep looking for additional blocks containing
// this point.
if !c.duplicates {
break
}
}
}
}
if c.current == nil {
return c.readAt()
}
// readAt decodes the blocks referenced by c.current and returns their values.
// When multiple overlapping blocks are present (points were overwritten across
// files), their values are concatenated and deduplicated so the caller sees at
// most one point per timestamp, with the later write winning.
//
// NOTE(review): the flattened diff left the pre-change single-line body
// (`return c.current.r.ReadAt(...)`) embedded mid-function, which made the
// remainder unreachable and does not compile against the slice-typed current;
// it has been removed here.
func (c *KeyCursor) readAt() ([]Value, error) {
	// No matching blocks to decode.
	if len(c.current) == 0 {
		return nil, nil
	}

	// First block is the oldest block containing the points we're searching for.
	first := c.current[0]
	values, err := first.r.ReadAt(first.entry, c.buf[:0])

	// Only one block with this key and time range, so return it directly.
	if len(c.current) == 1 {
		return values, err
	}

	// Otherwise, read the remaining blocks that overlap the first one and
	// collect their values so we can dedup them.
	for i := 1; i < len(c.current); i++ {
		cur := c.current[i]
		if !cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) {
			continue
		}
		v, err := cur.r.ReadAt(cur.entry, nil)
		if err != nil {
			return nil, err
		}
		if c.ascending {
			// Advance pos past this block so Next does not return it again.
			c.pos++
			values = append(values, v...)
		} else {
			c.pos--
			values = append(v, values...)
		}
	}
	return Values(values).Deduplicate(), err
}
// Next advances the cursor in the direction given by ascending and returns the
// next set of values for the key, or (nil, nil) when the seeks are exhausted.
// When overlapping blocks exist for the key, every block that may contain the
// next point is collected into c.current so readAt can merge and dedup them.
//
// NOTE(review): the flattened diff left the pre-change scalar assignments and
// early returns (`c.current = c.seeks[c.pos]; return c.current.r.ReadAt(...)`)
// inside both branches; they conflict with the slice-typed current and have
// been removed here.
func (c *KeyCursor) Next(ascending bool) ([]Value, error) {
	c.current = c.current[:0]
	if ascending {
		c.pos++
		if c.pos >= len(c.seeks) {
			return nil, nil
		}

		// Append the first matching block.
		c.current = []*location{c.seeks[c.pos]}

		// If we have overlapping blocks, append all of them so we can dedup.
		// NOTE(review): the loop starts at c.pos, so the first block is appended
		// twice; Deduplicate in readAt masks this but it costs an extra decode —
		// consider starting at c.pos+1. Behavior preserved here.
		if c.duplicates {
			first := c.seeks[c.pos]
			for i := c.pos; i < len(c.seeks); i++ {
				if c.seeks[i].entry.MinTime.Before(first.entry.MaxTime) || c.seeks[i].entry.MinTime.Equal(first.entry.MaxTime) {
					c.current = append(c.current, c.seeks[i])
				}
			}
		}
		return c.readAt()
	}

	c.pos--
	if c.pos < 0 {
		return nil, nil
	}

	// Append the first matching block.
	c.current = []*location{c.seeks[c.pos]}

	// If we have overlapping blocks, append all of them so we can dedup.
	if c.duplicates {
		first := c.seeks[c.pos]
		for i := c.pos; i >= 0; i-- {
			if c.seeks[i].entry.MaxTime.After(first.entry.MinTime) || c.seeks[i].entry.MaxTime.Equal(first.entry.MinTime) {
				c.current = append(c.current, c.seeks[i])
			}
		}
	}
	return c.readAt()
}

View File

@ -83,6 +83,59 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
}
}
// TestFileStore_SeekToAsc_Duplicate verifies that an ascending cursor dedupes
// points when the same timestamp exists in multiple files: the value from the
// later file must win.
func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
	fs := tsm1.NewFileStore("")

	// Setup 4 files with duplicate timestamps across file pairs.
	data := []keyValues{
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 2.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 3.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 4.0)}},
	}

	files, err := newFiles(data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Add(files...)

	c := fs.KeyCursor("cpu")
	// Search for an entry that exists in the second file; the duplicate point
	// in the first file must be shadowed by the later write.
	values, err := c.SeekTo(time.Unix(0, 0), true)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp := data[1]
	if got, exp := len(values), len(exp.values); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp.values {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	// Check that calling Next will dedupe points.
	values, err = c.Next(true)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp = data[3]
	if got, exp := len(values), len(exp.values); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp.values {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}
}
func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
fs := tsm1.NewFileStore("")
@ -226,6 +279,57 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
}
}
// TestFileStore_SeekToDesc_Duplicate verifies that a descending cursor dedupes
// points when the same timestamp exists in multiple files: the value from the
// later file must win.
func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
	fs := tsm1.NewFileStore("")

	// Setup 4 files with duplicate timestamps across file pairs.
	data := []keyValues{
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 4.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 2.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 3.0)}},
	}

	files, err := newFiles(data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Add(files...)

	// Search for an entry whose timestamp exists in the last two files; the
	// duplicate in the earlier file must be shadowed by the later write.
	c := fs.KeyCursor("cpu")
	values, err := c.SeekTo(time.Unix(2, 0), false)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp := data[3]
	if got, exp := len(values), len(exp.values); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp.values {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	// Check that calling Next will dedupe points.
	values, err = c.Next(false)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp = data[1]
	if got, exp := len(values), len(exp.values); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp.values {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}
}
func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
fs := tsm1.NewFileStore("")