Support sort by time DESC in wal and bz1 engines
parent 04a20566c1
commit 266bdc1c2b
@@ -3877,3 +3877,51 @@ func TestServer_Query_EvilIdentifiers(t *testing.T) {
 		}
 	}
 }
+
+func TestServer_Query_OrderByTime(t *testing.T) {
+	t.Parallel()
+	s := OpenServer(NewConfig(), "")
+	defer s.Close()
+
+	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
+		t.Fatal(err)
+	}
+
+	writes := []string{
+		fmt.Sprintf(`cpu,host=server1 value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
+		fmt.Sprintf(`cpu,host=server1 value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
+		fmt.Sprintf(`cpu,host=server1 value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
+	}
+
+	test := NewTest("db0", "rp0")
+	test.write = strings.Join(writes, "\n")
+
+	test.addQueries([]*Query{
+		&Query{
+			name:    "order on points",
+			params:  url.Values{"db": []string{"db0"}},
+			command: `select value from "cpu" ORDER BY time DESC`,
+			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`,
+		},
+	}...)
+
+	for i, query := range test.queries {
+		if i == 0 {
+			if err := test.init(s); err != nil {
+				t.Fatalf("test init failed: %s", err)
+			}
+		}
+		if query.skip {
+			t.Logf("SKIP:: %s", query.name)
+			continue
+		}
+		if err := query.Execute(s); err != nil {
+			t.Error(query.Error(err))
+		} else if !query.success() {
+			t.Error(query.failureMessage())
+		}
+	}
+}

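
The expected JSON above is simply the three written points in reverse time order. As a quick way to exercise the same behavior outside the test harness, the query can be issued over the standard /query HTTP endpoint; the sketch below assumes a server listening on localhost:8086 with the db0 database and the three points above already written.

	package main

	import (
		"fmt"
		"io/ioutil"
		"net/http"
		"net/url"
	)

	func main() {
		// Ask for the series in descending time order; the values in the
		// JSON response should come back as 3, 2, 1.
		params := url.Values{}
		params.Set("db", "db0")
		params.Set("q", `SELECT value FROM "cpu" ORDER BY time DESC`)

		resp, err := http.Get("http://localhost:8086/query?" + params.Encode())
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		body, _ := ioutil.ReadAll(resp.Body)
		fmt.Println(string(body))
	}
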
@@ -10,8 +10,8 @@ import (
 // If the same key is returned from multiple cursors then the first cursor
 // specified will take precendence. A key will only be returned once from the
 // returned cursor.
-func MultiCursor(cursors ...Cursor) Cursor {
-	return &multiCursor{cursors: cursors}
+func MultiCursor(forward bool, cursors ...Cursor) Cursor {
+	return &multiCursor{cursors: cursors, forward: forward}
 }
 
 // multiCursor represents a cursor that combines multiple cursors into one.

@@ -19,6 +19,7 @@ type multiCursor struct {
 	cursors []Cursor
 	heap    cursorHeap
 	prev    []byte
+	forward bool
 }
 
 // Seek moves the cursor to a given key.

@@ -48,6 +49,8 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
 	return mc.pop()
 }
 
+func (mc *multiCursor) Direction() bool { return mc.forward }
+
 // Next returns the next key/value from the cursor.
 func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() }
 

@@ -90,7 +93,12 @@ type cursorHeap []*cursorHeapItem
 func (h cursorHeap) Len() int { return len(h) }
 func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
 func (h cursorHeap) Less(i, j int) bool {
-	if cmp := bytes.Compare(h[i].key, h[j].key); cmp == -1 {
+	dir := -1
+	if !h[i].cursor.Direction() {
+		dir = 1
+	}
+
+	if cmp := bytes.Compare(h[i].key, h[j].key); cmp == dir {
 		return true
 	} else if cmp == 0 {
 		return h[i].priority > h[j].priority

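
The heap comparator is the core of the change to the multi-cursor: when the subcursors report a reverse Direction(), the comparison flips so the heap pops the largest key first instead of the smallest. A minimal self-contained sketch of that idea (toy types, not the engine's cursorHeap):

	package main

	import (
		"bytes"
		"container/heap"
		"fmt"
	)

	// keyHeap pops keys in ascending order when forward is true and in
	// descending order when it is false, mirroring how the multi-cursor's
	// comparator flips on Direction().
	type keyHeap struct {
		keys    [][]byte
		forward bool
	}

	func (h keyHeap) Len() int      { return len(h.keys) }
	func (h keyHeap) Swap(i, j int) { h.keys[i], h.keys[j] = h.keys[j], h.keys[i] }
	func (h keyHeap) Less(i, j int) bool {
		dir := -1
		if !h.forward {
			dir = 1
		}
		return bytes.Compare(h.keys[i], h.keys[j]) == dir
	}
	func (h *keyHeap) Push(x interface{}) { h.keys = append(h.keys, x.([]byte)) }
	func (h *keyHeap) Pop() interface{} {
		old := h.keys
		n := len(old)
		k := old[n-1]
		h.keys = old[:n-1]
		return k
	}

	func main() {
		for _, forward := range []bool{true, false} {
			h := &keyHeap{forward: forward}
			heap.Init(h)
			for _, k := range [][]byte{{0x02}, {0x00}, {0x01}} {
				heap.Push(h, k)
			}
			fmt.Printf("forward=%v:", forward)
			for h.Len() > 0 {
				fmt.Printf(" %x", heap.Pop(h).([]byte))
			}
			fmt.Println()
		}
	}
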
@@ -14,7 +14,7 @@ import (
 
 // Ensure the multi-cursor can correctly iterate across a single subcursor.
 func TestMultiCursor_Single(t *testing.T) {
-	mc := tsdb.MultiCursor(
+	mc := tsdb.MultiCursor(true,
 		NewCursor([]CursorItem{
 			{Key: []byte{0x00}, Value: []byte{0x00}},
 			{Key: []byte{0x01}, Value: []byte{0x10}},

@@ -35,7 +35,7 @@ func TestMultiCursor_Single(t *testing.T) {
 
 // Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors.
 func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) {
-	mc := tsdb.MultiCursor(
+	mc := tsdb.MultiCursor(true,
 		NewCursor([]CursorItem{
 			{Key: []byte{0x00}, Value: []byte{0x00}},
 			{Key: []byte{0x03}, Value: []byte{0x30}},

@@ -64,7 +64,7 @@ func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) {
 
 // Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors.
 func TestMultiCursor_Multiple_Overlapping(t *testing.T) {
-	mc := tsdb.MultiCursor(
+	mc := tsdb.MultiCursor(true,
 		NewCursor([]CursorItem{
 			{Key: []byte{0x00}, Value: []byte{0x00}},
 			{Key: []byte{0x03}, Value: []byte{0x03}},

@@ -118,7 +118,7 @@ func TestMultiCursor_Quick(t *testing.T) {
 		sort.Sort(byteSlices(exp))
 
 		// Create multi-cursor and iterate over all items.
-		mc := tsdb.MultiCursor(tsdbCursorSlice(cursors)...)
+		mc := tsdb.MultiCursor(true, tsdbCursorSlice(cursors)...)
 		for k, v := mc.Seek(u64tob(seek)); k != nil; k, v = mc.Next() {
 			got = append(got, append(k, v...))
 		}

@@ -144,6 +144,8 @@ func NewCursor(items []CursorItem) *Cursor {
 	return &Cursor{items: items}
 }
 
+func (c *Cursor) Direction() bool { return true }
+
 // Seek seeks to an item by key.
 func (c *Cursor) Seek(seek []byte) (key, value []byte) {
 	for c.index = 0; c.index < len(c.items); c.index++ {

@@ -123,7 +123,7 @@ func NewEngineOptions() EngineOptions {
 type Tx interface {
 	io.WriterTo
 
-	Cursor(series string) Cursor
+	Cursor(series string, forward bool) Cursor
 	Size() int64
 	Commit() error
 	Rollback() error

@@ -133,6 +133,7 @@ type Tx interface {
 type Cursor interface {
 	Seek(seek []byte) (key, value []byte)
 	Next() (key, value []byte)
+	Direction() bool
 }
 
 // DedupeEntries returns slices with unique keys (the first 8 bytes).

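
With Direction() added to the Cursor interface, every engine cursor now declares which way it iterates, and Seek's meaning follows the direction: the first key at-or-after the target going forward, the last key at-or-before it going backward. A toy in-memory implementation of the widened contract, only to illustrate the expected semantics (it is not one of the engine cursors below):

	package main

	import (
		"bytes"
		"fmt"
	)

	// Cursor mirrors the widened tsdb.Cursor contract: Seek, Next, Direction.
	type Cursor interface {
		Seek(seek []byte) (key, value []byte)
		Next() (key, value []byte)
		Direction() bool
	}

	type kv struct{ k, v []byte }

	// sliceCursor iterates a slice of key/value pairs sorted by key ascending;
	// forward=false walks it from the end toward the start.
	type sliceCursor struct {
		items   []kv
		pos     int
		forward bool
	}

	func (c *sliceCursor) Direction() bool { return c.forward }

	func (c *sliceCursor) Seek(seek []byte) (key, value []byte) {
		if c.forward {
			for c.pos = 0; c.pos < len(c.items); c.pos++ {
				if bytes.Compare(c.items[c.pos].k, seek) >= 0 {
					break
				}
			}
		} else {
			for c.pos = len(c.items) - 1; c.pos >= 0; c.pos-- {
				if bytes.Compare(c.items[c.pos].k, seek) <= 0 {
					break
				}
			}
		}
		return c.read()
	}

	func (c *sliceCursor) Next() (key, value []byte) {
		if c.forward {
			c.pos++
		} else {
			c.pos--
		}
		return c.read()
	}

	func (c *sliceCursor) read() (key, value []byte) {
		if c.pos < 0 || c.pos >= len(c.items) {
			return nil, nil
		}
		return c.items[c.pos].k, c.items[c.pos].v
	}

	func main() {
		items := []kv{{[]byte{1}, []byte("a")}, {[]byte{2}, []byte("b")}, {[]byte{3}, []byte("c")}}
		var c Cursor = &sliceCursor{items: items, forward: false}
		for k, v := c.Seek([]byte{3}); k != nil; k, v = c.Next() {
			fmt.Printf("%v=%s\n", k, v) // prints keys 3, 2, 1
		}
	}
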
@@ -550,7 +550,7 @@ type Tx struct {
 }
 
 // Cursor returns an iterator for a key.
-func (tx *Tx) Cursor(key string) tsdb.Cursor {
+func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor {
 	// Retrieve key bucket.
 	b := tx.Bucket([]byte(key))
 

@@ -591,6 +591,8 @@ type Cursor struct {
 	prev []byte
 }
 
+func (c *Cursor) Direction() bool { return true }
+
 // Seek moves the cursor to a position and returns the closest key/value pair.
 func (c *Cursor) Seek(seek []byte) (key, value []byte) {
 	// Seek bolt cursor.

@@ -64,7 +64,7 @@ func TestEngine_WritePoints(t *testing.T) {
 	tx := e.MustBegin(false)
 	defer tx.Rollback()
 
-	c := tx.Cursor("temperature")
+	c := tx.Cursor("temperature", true)
 	if k, v := c.Seek([]byte{0}); !bytes.Equal(k, u64tob(uint64(time.Unix(1434059627, 0).UnixNano()))) {
 		t.Fatalf("unexpected key: %#v", k)
 	} else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil {

@@ -59,7 +59,7 @@ type WAL interface {
 	WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
 	LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
 	DeleteSeries(keys []string) error
-	Cursor(key string) tsdb.Cursor
+	Cursor(key string, forward bool) tsdb.Cursor
 	Open() error
 	Close() error
 	Flush() error

@@ -606,8 +606,8 @@ type Tx struct {
 }
 
 // Cursor returns an iterator for a key.
-func (tx *Tx) Cursor(key string) tsdb.Cursor {
-	walCursor := tx.wal.Cursor(key)
+func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor {
+	walCursor := tx.wal.Cursor(key, forward)
 
 	// Retrieve points bucket. Ignore if there is no bucket.
 	b := tx.Bucket([]byte("points")).Bucket([]byte(key))

@@ -616,19 +616,25 @@ func (tx *Tx) Cursor(key string) tsdb.Cursor {
 	}
 
 	c := &Cursor{
-		cursor: b.Cursor(),
+		cursor:  b.Cursor(),
+		forward: forward,
 	}
 
-	return tsdb.MultiCursor(walCursor, c)
+	return tsdb.MultiCursor(forward, walCursor, c)
 }
 
 // Cursor provides ordered iteration across a series.
 type Cursor struct {
-	cursor *bolt.Cursor
-	buf    []byte // uncompressed buffer
-	off    int    // buffer offset
+	cursor       *bolt.Cursor
+	buf          []byte // uncompressed buffer
+	off          int    // buffer offset
+	forward      bool
+	fieldIndices []int
+	index        int
 }
 
+func (c *Cursor) Direction() bool { return c.forward }
+
 // Seek moves the cursor to a position and returns the closest key/value pair.
 func (c *Cursor) Seek(seek []byte) (key, value []byte) {
 	// Move cursor to appropriate block and set to buffer.

@@ -659,12 +665,26 @@ func (c *Cursor) seekBuf(seek []byte) (key, value []byte) {
 		buf := c.buf[c.off:]
 
 		// Exit if current entry's timestamp is on or after the seek.
-		if len(buf) == 0 || bytes.Compare(buf[0:8], seek) != -1 {
+		if len(buf) == 0 {
 			return
 		}
 
-		// Otherwise skip ahead to the next entry.
-		c.off += entryHeaderSize + entryDataSize(buf)
+		if c.forward && bytes.Compare(buf[0:8], seek) != -1 {
+			return
+		} else if !c.forward && bytes.Compare(buf[0:8], seek) != 1 {
+			return
+		}
+
+		if c.forward {
+			// Otherwise skip ahead to the next entry.
+			c.off += entryHeaderSize + entryDataSize(buf)
+		} else {
+			c.index -= 1
+			if c.index < 0 {
+				return
+			}
+			c.off = c.fieldIndices[c.index]
+		}
 	}
 }
 

@@ -675,8 +695,22 @@ func (c *Cursor) Next() (key, value []byte) {
 		return nil, nil
 	}
 
-	// Move forward to next entry.
-	c.off += entryHeaderSize + entryDataSize(c.buf[c.off:])
+	if c.forward {
+		// Move forward to next entry.
+		c.off += entryHeaderSize + entryDataSize(c.buf[c.off:])
+	} else {
+		c.index -= 1
+
+		// If we've move past the beginning of buf, grab the previous block
+		if c.index < 0 {
+			_, v := c.cursor.Prev()
+			c.setBuf(v)
+		}
+
+		if len(c.fieldIndices) > 0 {
+			c.off = c.fieldIndices[c.index]
+		}
+	}
 
 	// If no items left then read first item from next block.
 	if c.off >= len(c.buf) {

@@ -691,7 +725,7 @@ func (c *Cursor) Next() (key, value []byte) {
 func (c *Cursor) setBuf(block []byte) {
 	// Clear if the block is empty.
 	if len(block) == 0 {
-		c.buf, c.off = c.buf[0:0], 0
+		c.buf, c.off, c.fieldIndices, c.index = c.buf[0:0], 0, c.fieldIndices[0:0], 0
 		return
 	}
 

@@ -702,7 +736,31 @@ func (c *Cursor) setBuf(block []byte) {
 		c.buf = c.buf[0:0]
 		log.Printf("block decode error: %s", err)
 	}
-	c.buf, c.off = buf, 0
+
+	if c.forward {
+		c.buf, c.off = buf, 0
+	} else {
+		c.buf, c.off = buf, 0
+
+		// Buf contains multiple fields packed into a byte slice with timestamp
+		// and data lengths. We need to build an index into this byte slice that
+		// tells us where each field block is in buf so we can iterate backward without
+		// rescanning the buf each time Next is called. Forward iteration does not
+		// need this because we know the entries lenghth and the header size so we can
+		// skip forward that many bytes.
+		c.fieldIndices = []int{}
+		for {
+			if c.off >= len(buf) {
+				break
+			}
+
+			c.fieldIndices = append(c.fieldIndices, c.off)
+			c.off += entryHeaderSize + entryDataSize(buf[c.off:])
+		}
+
+		c.off = c.fieldIndices[len(c.fieldIndices)-1]
+		c.index = len(c.fieldIndices) - 1
+	}
 }
 
 // read reads the current key and value from the current block.

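
The comment in the last hunk explains the trick for walking a bz1 block backward: entries are packed as header-plus-payload, so the block is scanned once to record each entry's offset and reverse iteration then just walks that index. A self-contained sketch of the same indexing idea, using a made-up 12-byte header (8-byte timestamp, 4-byte data length) rather than bz1's actual entry encoding:

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	const hdrSize = 12 // 8-byte timestamp + 4-byte data length (illustrative layout)

	// appendEntry packs one timestamp/value pair onto the buffer.
	func appendEntry(buf []byte, ts uint64, data []byte) []byte {
		var hdr [hdrSize]byte
		binary.BigEndian.PutUint64(hdr[0:8], ts)
		binary.BigEndian.PutUint32(hdr[8:12], uint32(len(data)))
		return append(append(buf, hdr[:]...), data...)
	}

	// entryOffsets walks the block once and records where each entry starts,
	// which is what makes reverse iteration cheap afterwards.
	func entryOffsets(buf []byte) []int {
		var offsets []int
		for off := 0; off < len(buf); {
			offsets = append(offsets, off)
			n := int(binary.BigEndian.Uint32(buf[off+8 : off+12]))
			off += hdrSize + n
		}
		return offsets
	}

	func main() {
		var buf []byte
		buf = appendEntry(buf, 1, []byte("a"))
		buf = appendEntry(buf, 2, []byte("bb"))
		buf = appendEntry(buf, 3, []byte("ccc"))

		offsets := entryOffsets(buf)

		// Iterate the block newest-to-oldest by walking the offset index backward.
		for i := len(offsets) - 1; i >= 0; i-- {
			off := offsets[i]
			ts := binary.BigEndian.Uint64(buf[off : off+8])
			n := int(binary.BigEndian.Uint32(buf[off+8 : off+12]))
			fmt.Println(ts, string(buf[off+hdrSize:off+hdrSize+n]))
		}
	}
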
@@ -175,7 +175,7 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
 	defer tx.Rollback()
 
 	// Iterate over "cpu" series.
-	c := tx.Cursor("cpu")
+	c := tx.Cursor("cpu", true)
 	if k, v := c.Seek(u64tob(0)); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 1}) || !reflect.DeepEqual(v, []byte{0x10}) {
 		t.Fatalf("unexpected key/value: %x / %x", k, v)
 	} else if k, v = c.Next(); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 2}) || !reflect.DeepEqual(v, []byte{0x20}) {

@@ -185,7 +185,7 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
 	}
 
 	// Iterate over "mem" series.
-	c = tx.Cursor("mem")
+	c = tx.Cursor("mem", true)
 	if k, v := c.Seek(u64tob(0)); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 0}) || !reflect.DeepEqual(v, []byte{0x30}) {
 		t.Fatalf("unexpected key/value: %x / %x", k, v)
 	} else if k, _ = c.Next(); k != nil {

@@ -235,7 +235,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
 	defer tx.Rollback()
 
 	// Iterate over "cpu" series.
-	c := tx.Cursor("cpu")
+	c := tx.Cursor("cpu", true)
 	if k, v := c.Seek(u64tob(0)); btou64(k) != 9 || !bytes.Equal(v, []byte{0x09}) {
 		t.Fatalf("unexpected key/value: %x / %x", k, v)
 	} else if k, v = c.Next(); btou64(k) != 10 || !bytes.Equal(v, []byte{0xFF}) {

@@ -276,7 +276,7 @@ func TestEngine_WriteIndex_SeekAgainstInBlockValue(t *testing.T) {
 	defer tx.Rollback()
 
 	// Ensure that we can seek to a block in the middle
-	c := tx.Cursor("cpu")
+	c := tx.Cursor("cpu", true)
 	if k, _ := c.Seek(u64tob(15)); btou64(k) != 20 {
 		t.Fatalf("expected to seek to time 20, but got %d", btou64(k))
 	}

@@ -334,7 +334,7 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
 
 		// Iterate over results to ensure they are correct.
 		for _, key := range keys {
-			c := tx.Cursor(key)
+			c := tx.Cursor(key, true)
 
 			// Read list of key/values.
 			var got [][]byte

@@ -381,7 +381,7 @@ func TestEngine_WriteIndex_Quick_Append(t *testing.T) {
 
 	// Iterate over results to ensure they are correct.
 	for _, key := range keys {
-		c := tx.Cursor(key)
+		c := tx.Cursor(key, true)
 
 		// Read list of key/values.
 		var got [][]byte

@@ -523,7 +523,7 @@ func (w *EnginePointsWriter) Open() error { return nil }
 
 func (w *EnginePointsWriter) Close() error { return nil }
 
-func (w *EnginePointsWriter) Cursor(key string) tsdb.Cursor { return &Cursor{} }
+func (w *EnginePointsWriter) Cursor(key string, forward bool) tsdb.Cursor { return &Cursor{} }
 
 func (w *EnginePointsWriter) Flush() error { return nil }
 

@@ -531,6 +531,8 @@ func (w *EnginePointsWriter) Flush() error { return nil }
 type Cursor struct {
 }
 
+func (c *Cursor) Direction() bool { return true }
+
 func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil }
 
 func (c *Cursor) Next() ([]byte, []byte) { return nil, nil }

@@ -223,11 +223,11 @@ func (l *Log) Open() error {
 }
 
 // Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given
-func (l *Log) Cursor(key string) tsdb.Cursor {
+func (l *Log) Cursor(key string, forward bool) tsdb.Cursor {
 	l.mu.RLock()
 	defer l.mu.RUnlock()
 
-	return l.partition([]byte(key)).cursor(key)
+	return l.partition([]byte(key)).cursor(key, forward)
 }
 
 func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {

@@ -1380,7 +1380,7 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
 }
 
 // cursor will combine the in memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the key
-func (p *Partition) cursor(key string) *cursor {
+func (p *Partition) cursor(key string, forward bool) *cursor {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 

@@ -1398,7 +1398,7 @@ func (p *Partition) cursor(key string) *cursor {
 			c = append(c, entry.points...)
 
 			dedupe := tsdb.DedupeEntries(c)
-			return &cursor{cache: dedupe}
+			return &cursor{cache: dedupe, forward: forward}
 		}
 	}
 

@@ -1410,7 +1410,7 @@ func (p *Partition) cursor(key string) *cursor {
 	// build a copy so modifications to the partition don't change the result set
 	a := make([][]byte, len(entry.points))
 	copy(a, entry.points)
-	return &cursor{cache: a}
+	return &cursor{cache: a, forward: forward}
 }
 
 // idFromFileName parses the segment file ID from its name

@@ -1593,8 +1593,11 @@ type entry struct {
 type cursor struct {
 	cache    [][]byte
 	position int
+	forward  bool
 }
 
+func (c *cursor) Direction() bool { return c.forward }
+
 // Seek will point the cursor to the given time (or key)
 func (c *cursor) Seek(seek []byte) (key, value []byte) {
 	// Seek cache index.

@@ -1607,12 +1610,26 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
 
 // Next moves the cursor to the next key/value. will return nil if at the end
 func (c *cursor) Next() (key, value []byte) {
-	if c.position >= len(c.cache) {
+
+	if !c.forward && c.position >= len(c.cache) {
+		c.position--
+	}
+
+	if c.forward && c.position >= len(c.cache) {
 		return nil, nil
 	}
 
+	if !c.forward && c.position < 0 {
+		return nil, nil
+	}
+
 	v := c.cache[c.position]
-	c.position++
+
+	if c.forward {
+		c.position++
+	} else {
+		c.position--
+	}
 
 	return v[0:8], v[8:]
 

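
The WAL cursor's Seek body is not visible in these hunks, only Next. Since the cache is a timestamp-sorted [][]byte with an 8-byte key prefix, a direction-aware seek only has to pick which side of the insertion point to start from. The sketch below is hypothetical (not the committed implementation) and shows one way to do that with sort.Search:

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
		"sort"
	)

	// seekIndex returns the starting position for iteration over cache, which
	// is sorted ascending by its 8-byte key prefix. Forward cursors start at
	// the first entry >= seek; reverse cursors start at the last entry <= seek.
	// A result of -1 or len(cache) means nothing qualifies.
	func seekIndex(cache [][]byte, seek []byte, forward bool) int {
		i := sort.Search(len(cache), func(n int) bool {
			return bytes.Compare(cache[n][0:8], seek) >= 0
		})
		if forward {
			return i
		}
		if i < len(cache) && bytes.Equal(cache[i][0:8], seek) {
			return i
		}
		return i - 1
	}

	func entry(ts uint64, v string) []byte {
		b := make([]byte, 8)
		binary.BigEndian.PutUint64(b, ts)
		return append(b, v...)
	}

	func main() {
		cache := [][]byte{entry(1, "a"), entry(2, "b"), entry(3, "c")}
		seek := make([]byte, 8)
		binary.BigEndian.PutUint64(seek, 2)

		for _, forward := range []bool{true, false} {
			fmt.Printf("forward=%v:", forward)
			pos := seekIndex(cache, seek, forward)
			for pos >= 0 && pos < len(cache) {
				fmt.Printf(" %s", cache[pos][8:])
				if forward {
					pos++
				} else {
					pos--
				}
			}
			fmt.Println()
		}
	}
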
@@ -46,7 +46,7 @@ func TestWAL_WritePoints(t *testing.T) {
 	}
 
 	verify := func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		k, v := c.Seek(inttob(1))
 
 		// ensure the series are there and points are in order

@@ -64,7 +64,7 @@ func TestWAL_WritePoints(t *testing.T) {
 			t.Fatalf("expected nil on last seek: %v %v", k, v)
 		}
 
-		c = log.Cursor("cpu,host=B")
+		c = log.Cursor("cpu,host=B", true)
 		k, v = c.Next()
 		if bytes.Compare(v, p3.Data()) != 0 {
 			t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)

@@ -92,7 +92,7 @@ func TestWAL_WritePoints(t *testing.T) {
 	}
 
 	verify2 := func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		k, v := c.Next()
 		if bytes.Compare(v, p1.Data()) != 0 {
 			t.Fatalf("order wrong, expected p1, %v %v %v", v, k, p1.Data())

@@ -110,7 +110,7 @@ func TestWAL_WritePoints(t *testing.T) {
 			t.Fatal("order wrong, expected p6")
 		}
 
-		c = log.Cursor("cpu,host=C")
+		c = log.Cursor("cpu,host=C", true)
 		_, v = c.Next()
 		if bytes.Compare(v, p5.Data()) != 0 {
 			t.Fatal("order wrong, expected p6")

@@ -150,7 +150,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
 	}
 
 	verify := func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		_, v := c.Next()
 		if bytes.Compare(v, p1.Data()) != 0 {
 			t.Fatal("p1 value wrong")

@@ -183,7 +183,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
 	}
 
 	verify = func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		_, v := c.Next()
 		if bytes.Compare(v, p1.Data()) != 0 {
 			t.Fatal("p1 value wrong")

@@ -229,7 +229,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
 	}
 
 	verify := func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		_, v := c.Next()
 		if bytes.Compare(v, p1.Data()) != 0 {
 			t.Fatal("p1 value wrong")

@@ -268,7 +268,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
 	}
 
 	verify = func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		_, v := c.Next()
 		if bytes.Compare(v, p1.Data()) != 0 {
 			t.Fatal("p1 value wrong")

@@ -349,7 +349,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
 	}
 
 	// ensure we have some data
-	c := log.Cursor("cpu,host=A,region=uswest23")
+	c := log.Cursor("cpu,host=A,region=uswest23", true)
 	k, v := c.Next()
 	if btou64(k) != 1 {
 		t.Fatalf("expected timestamp of 1, but got %v %v", k, v)

@@ -365,13 +365,13 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
 	}
 
 	// should be nil
-	c = log.Cursor("cpu,host=A,region=uswest23")
+	c = log.Cursor("cpu,host=A,region=uswest23", true)
 	k, v = c.Next()
 	if k != nil || v != nil {
 		t.Fatal("expected cache to be nil after flush: ", k, v)
 	}
 
-	c = log.Cursor("cpu,host=A,region=useast1")
+	c = log.Cursor("cpu,host=A,region=useast1", true)
 	k, v = c.Next()
 	if btou64(k) != 1 {
 		t.Fatal("expected cache to be there after flush and compact: ", k, v)

@@ -385,13 +385,13 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
 	log.Close()
 	log.Open()
 
-	c = log.Cursor("cpu,host=A,region=uswest23")
+	c = log.Cursor("cpu,host=A,region=uswest23", true)
 	k, v = c.Next()
 	if k != nil || v != nil {
 		t.Fatal("expected cache to be nil after flush and re-open: ", k, v)
 	}
 
-	c = log.Cursor("cpu,host=A,region=useast1")
+	c = log.Cursor("cpu,host=A,region=useast1", true)
 	k, v = c.Next()
 	if btou64(k) != 1 {
 		t.Fatal("expected cache to be there after flush and compact: ", k, v)

@@ -444,7 +444,7 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
 	}
 
 	// ensure we have some data
-	c := log.Cursor("cpu,host=A,region=uswest10")
+	c := log.Cursor("cpu,host=A,region=uswest10", true)
 	k, _ := c.Next()
 	if btou64(k) != 1 {
 		t.Fatalf("expected first data point but got one with key: %v", k)

@@ -625,12 +625,12 @@ func TestWAL_DeleteSeries(t *testing.T) {
 	}
 
 	// ensure data is there
-	c := log.Cursor("cpu,host=A")
+	c := log.Cursor("cpu,host=A", true)
 	if k, _ := c.Next(); btou64(k) != 1 {
 		t.Fatal("expected data point for cpu,host=A")
 	}
 
-	c = log.Cursor("cpu,host=B")
+	c = log.Cursor("cpu,host=B", true)
 	if k, _ := c.Next(); btou64(k) != 2 {
 		t.Fatal("expected data point for cpu,host=B")
 	}

@@ -641,13 +641,13 @@ func TestWAL_DeleteSeries(t *testing.T) {
 	}
 
 	// ensure data is there
-	c = log.Cursor("cpu,host=A")
+	c = log.Cursor("cpu,host=A", true)
 	if k, _ := c.Next(); btou64(k) != 1 {
 		t.Fatal("expected data point for cpu,host=A")
 	}
 
 	// ensure series is deleted
-	c = log.Cursor("cpu,host=B")
+	c = log.Cursor("cpu,host=B", true)
 	if k, _ := c.Next(); k != nil {
 		t.Fatal("expected no data for cpu,host=B")
 	}

@@ -675,13 +675,13 @@ func TestWAL_DeleteSeries(t *testing.T) {
 	}
 
 	// ensure data is there
-	c = log.Cursor("cpu,host=A")
+	c = log.Cursor("cpu,host=A", true)
 	if k, _ := c.Next(); btou64(k) != 1 {
 		t.Fatal("expected data point for cpu,host=A")
 	}
 
 	// ensure series is deleted
-	c = log.Cursor("cpu,host=B")
+	c = log.Cursor("cpu,host=B", true)
 	if k, _ := c.Next(); k != nil {
 		t.Fatal("expected no data for cpu,host=B")
 	}

@@ -805,7 +805,7 @@ func TestWAL_QueryDuringCompaction(t *testing.T) {
 	}
 
 	verify := func() {
-		c := log.Cursor("cpu,host=A")
+		c := log.Cursor("cpu,host=A", true)
 		k, v := c.Seek(inttob(1))
 		// ensure the series are there and points are in order
 		if bytes.Compare(v, p1.Data()) != 0 {

@@ -851,7 +851,7 @@ func TestWAL_PointsSorted(t *testing.T) {
 		t.Fatalf("failed to write points: %s", err.Error())
 	}
 
-	c := log.Cursor("cpu,host=A")
+	c := log.Cursor("cpu,host=A", true)
 	k, _ := c.Next()
 	if btou64(k) != 1 {
 		t.Fatal("points out of order")

@@ -140,6 +140,23 @@ func (e *SelectExecutor) nextMapperLowestTime(tagset string) int64 {
 	return minTime
 }
 
+// nextMapperHighestTime returns the highest time across all Mappers, for the given tagset.
+func (e *SelectExecutor) nextMapperHighestTime(tagset string) int64 {
+	maxTime := int64(math.MinInt64)
+	for _, m := range e.mappers {
+		if !m.drained && m.bufferedChunk != nil {
+			if m.bufferedChunk.key() != tagset {
+				continue
+			}
+			t := m.bufferedChunk.Values[0].Time
+			if t > maxTime {
+				maxTime = t
+			}
+		}
+	}
+	return maxTime
+}
+
 // tagSetIsLimited returns whether data for the given tagset has been LIMITed.
 func (e *SelectExecutor) tagSetIsLimited(tagset string) bool {
 	_, ok := e.limitedTagSets[tagset]

@@ -246,9 +263,21 @@ func (e *SelectExecutor) executeRaw(out chan *influxql.Row) {
 			rowWriter = nil
 		}
 
-		// Process the mapper outputs. We can send out everything up to the min of the last time
-		// of the chunks for the next tagset.
-		minTime := e.nextMapperLowestTime(tagset)
+		ascending := true
+		if len(e.stmt.SortFields) > 0 {
+			ascending = e.stmt.SortFields[0].Ascending
+
+		}
+
+		var timeBoundary int64
+
+		if ascending {
+			// Process the mapper outputs. We can send out everything up to the min of the last time
+			// of the chunks for the next tagset.
+			timeBoundary = e.nextMapperLowestTime(tagset)
+		} else {
+			timeBoundary = e.nextMapperHighestTime(tagset)
+		}
 
 		// Now empty out all the chunks up to the min time. Create new output struct for this data.
 		var chunkedOutput *MapperOutput

@@ -257,19 +286,30 @@ func (e *SelectExecutor) executeRaw(out chan *influxql.Row) {
 				continue
 			}
 
+			chunkBoundary := false
+			if ascending {
+				chunkBoundary = m.bufferedChunk.Values[0].Time > timeBoundary
+			} else {
+				chunkBoundary = m.bufferedChunk.Values[0].Time < timeBoundary
+			}
+
 			// This mapper's next chunk is not for the next tagset, or the very first value of
 			// the chunk is at a higher acceptable timestamp. Skip it.
-			if m.bufferedChunk.key() != tagset || m.bufferedChunk.Values[0].Time > minTime {
+			if m.bufferedChunk.key() != tagset || chunkBoundary {
 				continue
 			}
 
 			// Find the index of the point up to the min.
 			ind := len(m.bufferedChunk.Values)
 			for i, mo := range m.bufferedChunk.Values {
-				if mo.Time > minTime {
+				if ascending && mo.Time > timeBoundary {
 					ind = i
 					break
+				} else if !ascending && mo.Time < timeBoundary {
+					ind = i
+					break
 				}
+
 			}
 
 			// Add up to the index to the values

@@ -293,8 +333,12 @@ func (e *SelectExecutor) executeRaw(out chan *influxql.Row) {
 			}
 		}
 
-		// Sort the values by time first so we can then handle offset and limit
-		sort.Sort(MapperValues(chunkedOutput.Values))
+		if ascending {
+			// Sort the values by time first so we can then handle offset and limit
+			sort.Sort(MapperValues(chunkedOutput.Values))
+		} else {
+			sort.Sort(sort.Reverse(MapperValues(chunkedOutput.Values)))
+		}
 
 		// Now that we have full name and tag details, initialize the rowWriter.
 		// The Name and Tags will be the same for all mappers.

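
executeRaw merges buffered chunks from several shard mappers and may only emit values whose timestamps every mapper has already covered. Ascending output therefore uses the lowest boundary time among the mappers' buffered chunks, while descending output flips that to the highest. A small stand-alone sketch of the boundary selection, where headTimes stands in for the boundary time each mapper's buffered chunk exposes:

	package main

	import (
		"fmt"
		"math"
	)

	// timeBoundary returns the timestamp up to which buffered values can be
	// safely emitted: the lowest boundary time when ascending, the highest
	// when descending.
	func timeBoundary(headTimes []int64, ascending bool) int64 {
		if ascending {
			min := int64(math.MaxInt64)
			for _, t := range headTimes {
				if t < min {
					min = t
				}
			}
			return min
		}
		max := int64(math.MinInt64)
		for _, t := range headTimes {
			if t > max {
				max = t
			}
		}
		return max
	}

	func main() {
		heads := []int64{30, 10, 20} // boundary times of three mappers' buffered chunks
		fmt.Println(timeBoundary(heads, true))  // 10: ascending boundary
		fmt.Println(timeBoundary(heads, false)) // 30: descending boundary
	}
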
@@ -218,12 +218,17 @@ func (lm *SelectMapper) Open() error {
 			}
 		}
 
+		forward := true
+		if len(lm.selectStmt.SortFields) > 0 {
+			forward = lm.selectStmt.SortFields[0].Ascending
+		}
+
 		// Create all cursors for reading the data from this shard.
 		for _, t := range tagSets {
 			cursors := []*seriesCursor{}
 
 			for i, key := range t.SeriesKeys {
-				c := lm.tx.Cursor(key)
+				c := lm.tx.Cursor(key, forward)
 				if c == nil {
 					// No data exists for this key.
 					continue

@@ -238,7 +243,18 @@ func (lm *SelectMapper) Open() error {
 			tsc.pointHeap = newPointHeap()
 			//Prime the buffers.
 			for i := 0; i < len(tsc.cursors); i++ {
-				k, v := tsc.cursors[i].SeekTo(lm.queryTMin)
+				var k int64
+				var v []byte
+				if forward {
+					k, v = tsc.cursors[i].SeekTo(lm.queryTMin)
+					if k == -1 {
+						k, v = tsc.cursors[i].Next()
+					}
+
+				} else {
+					k, v = tsc.cursors[i].SeekTo(lm.queryTMax)
+				}
+
 				if k == -1 {
 					continue
 				}

@@ -325,7 +341,6 @@ func (lm *SelectMapper) nextChunkRaw() (interface{}, error) {
 				continue
 			}
 		}
-
 		if output == nil {
 			output = &MapperOutput{
 				Name: cursor.measurement,