Add tsdb.MultiCursor
This commit adds a cursor that wraps multiple `tsdb.Cursor` objects and streams them out as one cursor. The multi-cursor automatically dedupes keys by using the first cursor specified in the argument list.pull/3593/head
parent
bd512762db
commit
1ebcb10b03
|
@ -0,0 +1,119 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
)
|
||||
|
||||
// MultiCursor returns a single cursor that combines the results of all cursors in order.
|
||||
//
|
||||
// If the same key is returned from multiple cursors then the first cursor
|
||||
// specified will take precendence. A key will only be returned once from the
|
||||
// returned cursor.
|
||||
func MultiCursor(cursors ...Cursor) Cursor {
|
||||
return &multiCursor{cursors: cursors}
|
||||
}
|
||||
|
||||
// multiCursor represents a cursor that combines multiple cursors into one.
|
||||
type multiCursor struct {
|
||||
cursors []Cursor
|
||||
heap cursorHeap
|
||||
prev []byte
|
||||
}
|
||||
|
||||
// Seek moves the cursor to a given key.
|
||||
func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
|
||||
// Initialize heap.
|
||||
h := make(cursorHeap, 0, len(mc.cursors))
|
||||
for i, c := range mc.cursors {
|
||||
// Move cursor to position. Skip if it's empty.
|
||||
k, v := c.Seek(seek)
|
||||
if k == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Append cursor to heap.
|
||||
h = append(h, &cursorHeapItem{
|
||||
key: k,
|
||||
value: v,
|
||||
cursor: c,
|
||||
priority: len(mc.cursors) - i,
|
||||
})
|
||||
}
|
||||
|
||||
heap.Init(&h)
|
||||
mc.heap = h
|
||||
mc.prev = nil
|
||||
|
||||
return mc.pop()
|
||||
}
|
||||
|
||||
// Next returns the next key/value from the cursor.
|
||||
func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() }
|
||||
|
||||
// pop returns the next item from the heap.
|
||||
// Reads the next key/value from item's cursor and puts it back on the heap.
|
||||
func (mc *multiCursor) pop() (key, value []byte) {
|
||||
// Read items until we have a key that doesn't match the previously read one.
|
||||
// This is to perform deduplication when there's multiple items with the same key.
|
||||
// The highest priority cursor will be read first and then remaining keys will be dropped.
|
||||
for {
|
||||
// Return nil if there are no more items left.
|
||||
if len(mc.heap) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Read the next item from the heap.
|
||||
item := heap.Pop(&mc.heap).(*cursorHeapItem)
|
||||
|
||||
// Save the key/value for return.
|
||||
key, value = item.key, item.value
|
||||
|
||||
// Read the next item from the cursor. Push back to heap if one exists.
|
||||
if item.key, item.value = item.cursor.Next(); item.key != nil {
|
||||
heap.Push(&mc.heap, item)
|
||||
}
|
||||
|
||||
// Skip if this key matches the previously returned one.
|
||||
if bytes.Equal(mc.prev, key) {
|
||||
continue
|
||||
}
|
||||
|
||||
mc.prev = key
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// cursorHeap represents a heap of cursorHeapItems.
|
||||
type cursorHeap []*cursorHeapItem
|
||||
|
||||
func (h cursorHeap) Len() int { return len(h) }
|
||||
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||||
func (h cursorHeap) Less(i, j int) bool {
|
||||
if cmp := bytes.Compare(h[i].key, h[j].key); cmp == -1 {
|
||||
return true
|
||||
} else if cmp == 0 {
|
||||
return h[i].priority > h[j].priority
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (h *cursorHeap) Push(x interface{}) {
|
||||
*h = append(*h, x.(*cursorHeapItem))
|
||||
}
|
||||
|
||||
func (h *cursorHeap) Pop() interface{} {
|
||||
old := *h
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
*h = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// cursorHeapItem is something we manage in a priority queue.
|
||||
type cursorHeapItem struct {
|
||||
key []byte
|
||||
value []byte
|
||||
cursor Cursor
|
||||
priority int
|
||||
}
|
|
@ -0,0 +1,221 @@
|
|||
package tsdb_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across a single subcursor.
|
||||
func TestMultiCursor_Single(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: []byte{0x00}, Value: []byte{0x00}},
|
||||
{Key: []byte{0x01}, Value: []byte{0x10}},
|
||||
{Key: []byte{0x02}, Value: []byte{0x20}},
|
||||
}),
|
||||
)
|
||||
|
||||
if k, v := mc.Seek([]byte{0x00}); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x01}) || !bytes.Equal(v, []byte{0x10}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0x20}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != nil {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors.
|
||||
func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: []byte{0x00}, Value: []byte{0x00}},
|
||||
{Key: []byte{0x03}, Value: []byte{0x30}},
|
||||
{Key: []byte{0x04}, Value: []byte{0x40}},
|
||||
}),
|
||||
NewCursor([]CursorItem{
|
||||
{Key: []byte{0x01}, Value: []byte{0x10}},
|
||||
{Key: []byte{0x02}, Value: []byte{0x20}},
|
||||
}),
|
||||
)
|
||||
|
||||
if k, v := mc.Seek([]byte{0x00}); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x01}) || !bytes.Equal(v, []byte{0x10}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0x20}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x03}) || !bytes.Equal(v, []byte{0x30}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x04}) || !bytes.Equal(v, []byte{0x40}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != nil {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors.
|
||||
func TestMultiCursor_Multiple_Overlapping(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: []byte{0x00}, Value: []byte{0x00}},
|
||||
{Key: []byte{0x03}, Value: []byte{0x03}},
|
||||
{Key: []byte{0x04}, Value: []byte{0x04}},
|
||||
}),
|
||||
NewCursor([]CursorItem{
|
||||
{Key: []byte{0x00}, Value: []byte{0xF0}},
|
||||
{Key: []byte{0x02}, Value: []byte{0xF2}},
|
||||
{Key: []byte{0x04}, Value: []byte{0xF4}},
|
||||
}),
|
||||
)
|
||||
|
||||
if k, v := mc.Seek([]byte{0x00}); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0xF2}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x03}) || !bytes.Equal(v, []byte{0x03}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x04}) || !bytes.Equal(v, []byte{0x04}) {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != nil {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can handle randomly generated data.
|
||||
func TestMultiCursor_Quick(t *testing.T) {
|
||||
quick.Check(func(seek uint64, cursors []Cursor) bool {
|
||||
var got, exp [][]byte
|
||||
seek %= 100
|
||||
|
||||
// Merge all cursor data to determine expected output.
|
||||
// First seen key overrides all other items with the same key.
|
||||
m := make(map[string][]byte)
|
||||
for _, c := range cursors {
|
||||
for _, item := range c.items {
|
||||
if bytes.Compare(item.Key, u64tob(seek)) == -1 {
|
||||
continue
|
||||
}
|
||||
if _, ok := m[string(item.Key)]; ok {
|
||||
continue
|
||||
}
|
||||
m[string(item.Key)] = item.Value
|
||||
}
|
||||
}
|
||||
|
||||
// Convert map back to single item list.
|
||||
for k, v := range m {
|
||||
exp = append(exp, append([]byte(k), v...))
|
||||
}
|
||||
sort.Sort(byteSlices(exp))
|
||||
|
||||
// Create multi-cursor and iterate over all items.
|
||||
mc := tsdb.MultiCursor(tsdbCursorSlice(cursors)...)
|
||||
for k, v := mc.Seek(u64tob(seek)); k != nil; k, v = mc.Next() {
|
||||
got = append(got, append(k, v...))
|
||||
}
|
||||
|
||||
// Verify results.
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("mismatch: seek=%d\n\ngot=%+v\n\nexp=%+v", seek, got, exp)
|
||||
}
|
||||
|
||||
return true
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// Cursor represents an in-memory test cursor.
|
||||
type Cursor struct {
|
||||
items []CursorItem
|
||||
index int
|
||||
}
|
||||
|
||||
// NewCursor returns a new instance of Cursor.
|
||||
func NewCursor(items []CursorItem) *Cursor {
|
||||
sort.Sort(CursorItems(items))
|
||||
return &Cursor{items: items}
|
||||
}
|
||||
|
||||
// Seek seeks to an item by key.
|
||||
func (c *Cursor) Seek(seek []byte) (key, value []byte) {
|
||||
for c.index = 0; c.index < len(c.items); c.index++ {
|
||||
if bytes.Compare(c.items[c.index].Key, seek) == -1 { // skip keys less than seek
|
||||
continue
|
||||
}
|
||||
return c.items[c.index].Key, c.items[c.index].Value
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Next returns the next key/value pair.
|
||||
func (c *Cursor) Next() (key, value []byte) {
|
||||
if c.index >= len(c.items)-1 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
c.index++
|
||||
return c.items[c.index].Key, c.items[c.index].Value
|
||||
}
|
||||
|
||||
// Generate returns a randomly generated cursor. Implements quick.Generator.
|
||||
func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value {
|
||||
c.index = 0
|
||||
|
||||
c.items = make([]CursorItem, rand.Intn(size))
|
||||
for i := range c.items {
|
||||
value, _ := quick.Value(reflect.TypeOf([]byte(nil)), rand)
|
||||
|
||||
c.items[i] = CursorItem{
|
||||
Key: u64tob(uint64(rand.Intn(size))),
|
||||
Value: value.Interface().([]byte),
|
||||
}
|
||||
}
|
||||
|
||||
// Sort items by key.
|
||||
sort.Sort(CursorItems(c.items))
|
||||
|
||||
return reflect.ValueOf(c)
|
||||
}
|
||||
|
||||
// tsdbCursorSlice converts a Cursor slice to a tsdb.Cursor slice.
|
||||
func tsdbCursorSlice(a []Cursor) []tsdb.Cursor {
|
||||
var other []tsdb.Cursor
|
||||
for i := range a {
|
||||
other = append(other, &a[i])
|
||||
}
|
||||
return other
|
||||
}
|
||||
|
||||
// CursorItem represents a key/value pair in a cursor.
|
||||
type CursorItem struct {
|
||||
Key []byte
|
||||
Value []byte
|
||||
}
|
||||
|
||||
type CursorItems []CursorItem
|
||||
|
||||
func (a CursorItems) Len() int { return len(a) }
|
||||
func (a CursorItems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a CursorItems) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
|
||||
|
||||
// byteSlices represents a sortable slice of byte slices.
|
||||
type byteSlices [][]byte
|
||||
|
||||
func (a byteSlices) Len() int { return len(a) }
|
||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// u64tob converts a uint64 into an 8-byte slice.
|
||||
func u64tob(v uint64) []byte {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(b, v)
|
||||
return b
|
||||
}
|
Loading…
Reference in New Issue