From 266bdc1c2bf277867b518f1d396b6604723fe28c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 27 Aug 2015 16:55:55 -0600 Subject: [PATCH] Support sort by time DESC in wal and bz1 engines --- cmd/influxd/run/server_test.go | 48 +++++++++++++++++++ tsdb/cursor.go | 14 ++++-- tsdb/cursor_test.go | 10 ++-- tsdb/engine.go | 3 +- tsdb/engine/b1/b1.go | 4 +- tsdb/engine/b1/b1_test.go | 2 +- tsdb/engine/bz1/bz1.go | 88 ++++++++++++++++++++++++++++------ tsdb/engine/bz1/bz1_test.go | 16 ++++--- tsdb/engine/wal/wal.go | 31 +++++++++--- tsdb/engine/wal/wal_test.go | 44 ++++++++--------- tsdb/executor.go | 58 +++++++++++++++++++--- tsdb/mapper.go | 21 ++++++-- 12 files changed, 268 insertions(+), 71 deletions(-) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index e18a7d15fb..cd7c644afd 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -3877,3 +3877,51 @@ func TestServer_Query_EvilIdentifiers(t *testing.T) { } } } + +func TestServer_Query_OrderByTime(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig(), "") + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + writes := []string{ + fmt.Sprintf(`cpu,host=server1 value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()), + fmt.Sprintf(`cpu,host=server1 value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()), + fmt.Sprintf(`cpu,host=server1 value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()), + } + + test := NewTest("db0", "rp0") + test.write = strings.Join(writes, "\n") + + test.addQueries([]*Query{ + &Query{ + name: "order on points", + params: url.Values{"db": []string{"db0"}}, + command: `select value from "cpu" ORDER BY time DESC`, + exp: 
`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} diff --git a/tsdb/cursor.go b/tsdb/cursor.go index e5c42ff1dc..e5ac15e823 100644 --- a/tsdb/cursor.go +++ b/tsdb/cursor.go @@ -10,8 +10,8 @@ import ( // If the same key is returned from multiple cursors then the first cursor // specified will take precendence. A key will only be returned once from the // returned cursor. -func MultiCursor(cursors ...Cursor) Cursor { - return &multiCursor{cursors: cursors} +func MultiCursor(forward bool, cursors ...Cursor) Cursor { + return &multiCursor{cursors: cursors, forward: forward} } // multiCursor represents a cursor that combines multiple cursors into one. @@ -19,6 +19,7 @@ type multiCursor struct { cursors []Cursor heap cursorHeap prev []byte + forward bool } // Seek moves the cursor to a given key. @@ -48,6 +49,8 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) { return mc.pop() } +func (mc *multiCursor) Direction() bool { return mc.forward } + // Next returns the next key/value from the cursor. 
func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() } @@ -90,7 +93,12 @@ type cursorHeap []*cursorHeapItem func (h cursorHeap) Len() int { return len(h) } func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h cursorHeap) Less(i, j int) bool { - if cmp := bytes.Compare(h[i].key, h[j].key); cmp == -1 { + dir := -1 + if !h[i].cursor.Direction() { + dir = 1 + } + + if cmp := bytes.Compare(h[i].key, h[j].key); cmp == dir { return true } else if cmp == 0 { return h[i].priority > h[j].priority diff --git a/tsdb/cursor_test.go b/tsdb/cursor_test.go index 1857a34a48..750cc78148 100644 --- a/tsdb/cursor_test.go +++ b/tsdb/cursor_test.go @@ -14,7 +14,7 @@ import ( // Ensure the multi-cursor can correctly iterate across a single subcursor. func TestMultiCursor_Single(t *testing.T) { - mc := tsdb.MultiCursor( + mc := tsdb.MultiCursor(true, NewCursor([]CursorItem{ {Key: []byte{0x00}, Value: []byte{0x00}}, {Key: []byte{0x01}, Value: []byte{0x10}}, @@ -35,7 +35,7 @@ func TestMultiCursor_Single(t *testing.T) { // Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors. func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) { - mc := tsdb.MultiCursor( + mc := tsdb.MultiCursor(true, NewCursor([]CursorItem{ {Key: []byte{0x00}, Value: []byte{0x00}}, {Key: []byte{0x03}, Value: []byte{0x30}}, @@ -64,7 +64,7 @@ func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) { // Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors. func TestMultiCursor_Multiple_Overlapping(t *testing.T) { - mc := tsdb.MultiCursor( + mc := tsdb.MultiCursor(true, NewCursor([]CursorItem{ {Key: []byte{0x00}, Value: []byte{0x00}}, {Key: []byte{0x03}, Value: []byte{0x03}}, @@ -118,7 +118,7 @@ func TestMultiCursor_Quick(t *testing.T) { sort.Sort(byteSlices(exp)) // Create multi-cursor and iterate over all items. - mc := tsdb.MultiCursor(tsdbCursorSlice(cursors)...) 
+ mc := tsdb.MultiCursor(true, tsdbCursorSlice(cursors)...) for k, v := mc.Seek(u64tob(seek)); k != nil; k, v = mc.Next() { got = append(got, append(k, v...)) } @@ -144,6 +144,8 @@ func NewCursor(items []CursorItem) *Cursor { return &Cursor{items: items} } +func (c *Cursor) Direction() bool { return true } + // Seek seeks to an item by key. func (c *Cursor) Seek(seek []byte) (key, value []byte) { for c.index = 0; c.index < len(c.items); c.index++ { diff --git a/tsdb/engine.go b/tsdb/engine.go index f2d1332f3f..d902a6d21a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -123,7 +123,7 @@ func NewEngineOptions() EngineOptions { type Tx interface { io.WriterTo - Cursor(series string) Cursor + Cursor(series string, forward bool) Cursor Size() int64 Commit() error Rollback() error @@ -133,6 +133,7 @@ type Tx interface { type Cursor interface { Seek(seek []byte) (key, value []byte) Next() (key, value []byte) + Direction() bool } // DedupeEntries returns slices with unique keys (the first 8 bytes). diff --git a/tsdb/engine/b1/b1.go b/tsdb/engine/b1/b1.go index ce9cde2922..46727b183a 100644 --- a/tsdb/engine/b1/b1.go +++ b/tsdb/engine/b1/b1.go @@ -550,7 +550,7 @@ type Tx struct { } // Cursor returns an iterator for a key. -func (tx *Tx) Cursor(key string) tsdb.Cursor { +func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { // Retrieve key bucket. b := tx.Bucket([]byte(key)) @@ -591,6 +591,8 @@ type Cursor struct { prev []byte } +func (c *Cursor) Direction() bool { return true } + // Seek moves the cursor to a position and returns the closest key/value pair. func (c *Cursor) Seek(seek []byte) (key, value []byte) { // Seek bolt cursor. 
diff --git a/tsdb/engine/b1/b1_test.go b/tsdb/engine/b1/b1_test.go index 24ebdb306d..f8860cfb8f 100644 --- a/tsdb/engine/b1/b1_test.go +++ b/tsdb/engine/b1/b1_test.go @@ -64,7 +64,7 @@ func TestEngine_WritePoints(t *testing.T) { tx := e.MustBegin(false) defer tx.Rollback() - c := tx.Cursor("temperature") + c := tx.Cursor("temperature", true) if k, v := c.Seek([]byte{0}); !bytes.Equal(k, u64tob(uint64(time.Unix(1434059627, 0).UnixNano()))) { t.Fatalf("unexpected key: %#v", k) } else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil { diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index 82dcafaaf4..ab1db8a414 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -59,7 +59,7 @@ type WAL interface { WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error DeleteSeries(keys []string) error - Cursor(key string) tsdb.Cursor + Cursor(key string, forward bool) tsdb.Cursor Open() error Close() error Flush() error @@ -606,8 +606,8 @@ type Tx struct { } // Cursor returns an iterator for a key. -func (tx *Tx) Cursor(key string) tsdb.Cursor { - walCursor := tx.wal.Cursor(key) +func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { + walCursor := tx.wal.Cursor(key, forward) // Retrieve points bucket. Ignore if there is no bucket. b := tx.Bucket([]byte("points")).Bucket([]byte(key)) @@ -616,19 +616,25 @@ func (tx *Tx) Cursor(key string) tsdb.Cursor { } c := &Cursor{ - cursor: b.Cursor(), + cursor: b.Cursor(), + forward: forward, } - return tsdb.MultiCursor(walCursor, c) + return tsdb.MultiCursor(forward, walCursor, c) } // Cursor provides ordered iteration across a series. 
type Cursor struct { - cursor *bolt.Cursor - buf []byte // uncompressed buffer - off int // buffer offset + cursor       *bolt.Cursor + buf          []byte // uncompressed buffer + off          int    // buffer offset + forward      bool + fieldIndices []int + index        int } +func (c *Cursor) Direction() bool { return c.forward } + // Seek moves the cursor to a position and returns the closest key/value pair. func (c *Cursor) Seek(seek []byte) (key, value []byte) { // Move cursor to appropriate block and set to buffer. @@ -659,12 +665,26 @@ func (c *Cursor) seekBuf(seek []byte) (key, value []byte) { buf := c.buf[c.off:] // Exit if current entry's timestamp is on or after the seek. - if len(buf) == 0 || bytes.Compare(buf[0:8], seek) != -1 { + if len(buf) == 0 { return } - // Otherwise skip ahead to the next entry. - c.off += entryHeaderSize + entryDataSize(buf) + if c.forward && bytes.Compare(buf[0:8], seek) != -1 { + return + } else if !c.forward && bytes.Compare(buf[0:8], seek) != 1 { + return + } + + if c.forward { + // Otherwise skip ahead to the next entry. + c.off += entryHeaderSize + entryDataSize(buf) + } else { + c.index -= 1 + if c.index < 0 { + return + } + c.off = c.fieldIndices[c.index] + } } } @@ -675,8 +695,22 @@ func (c *Cursor) Next() (key, value []byte) { return nil, nil } - // Move forward to next entry. - c.off += entryHeaderSize + entryDataSize(c.buf[c.off:]) + if c.forward { + // Move forward to next entry. + c.off += entryHeaderSize + entryDataSize(c.buf[c.off:]) + } else { + c.index -= 1 + + // If we've moved past the beginning of buf, grab the previous block + if c.index < 0 { + _, v := c.cursor.Prev() + c.setBuf(v) + } + + if len(c.fieldIndices) > 0 { + c.off = c.fieldIndices[c.index] + } + } // If no items left then read first item from next block. if c.off >= len(c.buf) { @@ -691,7 +725,7 @@ func (c *Cursor) setBuf(block []byte) { // Clear if the block is empty.
if len(block) == 0 { - c.buf, c.off = c.buf[0:0], 0 + c.buf, c.off, c.fieldIndices, c.index = c.buf[0:0], 0, c.fieldIndices[0:0], 0 return } @@ -702,7 +736,31 @@ func (c *Cursor) setBuf(block []byte) { c.buf = c.buf[0:0] log.Printf("block decode error: %s", err) } - c.buf, c.off = buf, 0 + + if c.forward { + c.buf, c.off = buf, 0 + } else { + c.buf, c.off = buf, 0 + + // Buf contains multiple fields packed into a byte slice with timestamp + // and data lengths. We need to build an index into this byte slice that + // tells us where each field block is in buf so we can iterate backward without + // rescanning the buf each time Next is called. Forward iteration does not + // need this because we know the entries length and the header size so we can + // skip forward that many bytes. + c.fieldIndices = []int{} + for { + if c.off >= len(buf) { + break + } + + c.fieldIndices = append(c.fieldIndices, c.off) + c.off += entryHeaderSize + entryDataSize(buf[c.off:]) + } + + c.off = c.fieldIndices[len(c.fieldIndices)-1] + c.index = len(c.fieldIndices) - 1 + } } // read reads the current key and value from the current block. diff --git a/tsdb/engine/bz1/bz1_test.go b/tsdb/engine/bz1/bz1_test.go index 69144cc2a5..ba285913b6 100644 --- a/tsdb/engine/bz1/bz1_test.go +++ b/tsdb/engine/bz1/bz1_test.go @@ -175,7 +175,7 @@ func TestEngine_WriteIndex_Append(t *testing.T) { defer tx.Rollback() // Iterate over "cpu" series. - c := tx.Cursor("cpu") + c := tx.Cursor("cpu", true) if k, v := c.Seek(u64tob(0)); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 1}) || !reflect.DeepEqual(v, []byte{0x10}) { t.Fatalf("unexpected key/value: %x / %x", k, v) } else if k, v = c.Next(); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 2}) || !reflect.DeepEqual(v, []byte{0x20}) { t.Fatalf("unexpected key/value: %x / %x", k, v) } @@ -185,7 +185,7 @@ func TestEngine_WriteIndex_Append(t *testing.T) { } // Iterate over "mem" series.
- c = tx.Cursor("mem") + c = tx.Cursor("mem", true) if k, v := c.Seek(u64tob(0)); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 0}) || !reflect.DeepEqual(v, []byte{0x30}) { t.Fatalf("unexpected key/value: %x / %x", k, v) } else if k, _ = c.Next(); k != nil { @@ -235,7 +235,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { defer tx.Rollback() // Iterate over "cpu" series. - c := tx.Cursor("cpu") + c := tx.Cursor("cpu", true) if k, v := c.Seek(u64tob(0)); btou64(k) != 9 || !bytes.Equal(v, []byte{0x09}) { t.Fatalf("unexpected key/value: %x / %x", k, v) } else if k, v = c.Next(); btou64(k) != 10 || !bytes.Equal(v, []byte{0xFF}) { @@ -276,7 +276,7 @@ func TestEngine_WriteIndex_SeekAgainstInBlockValue(t *testing.T) { defer tx.Rollback() // Ensure that we can seek to a block in the middle - c := tx.Cursor("cpu") + c := tx.Cursor("cpu", true) if k, _ := c.Seek(u64tob(15)); btou64(k) != 20 { t.Fatalf("expected to seek to time 20, but got %d", btou64(k)) } @@ -334,7 +334,7 @@ func TestEngine_WriteIndex_Quick(t *testing.T) { // Iterate over results to ensure they are correct. for _, key := range keys { - c := tx.Cursor(key) + c := tx.Cursor(key, true) // Read list of key/values. var got [][]byte @@ -381,7 +381,7 @@ func TestEngine_WriteIndex_Quick_Append(t *testing.T) { // Iterate over results to ensure they are correct. for _, key := range keys { - c := tx.Cursor(key) + c := tx.Cursor(key, true) // Read list of key/values. 
var got [][]byte @@ -523,7 +523,7 @@ func (w *EnginePointsWriter) Open() error { return nil } func (w *EnginePointsWriter) Close() error { return nil } -func (w *EnginePointsWriter) Cursor(key string) tsdb.Cursor { return &Cursor{} } +func (w *EnginePointsWriter) Cursor(key string, forward bool) tsdb.Cursor { return &Cursor{} } func (w *EnginePointsWriter) Flush() error { return nil } @@ -531,6 +531,8 @@ func (w *EnginePointsWriter) Flush() error { return nil } type Cursor struct { } +func (c *Cursor) Direction() bool { return true } + func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil } func (c *Cursor) Next() ([]byte, []byte) { return nil, nil } diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index 4c139456f3..786a924002 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -223,11 +223,11 @@ func (l *Log) Open() error { } // Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given -func (l *Log) Cursor(key string) tsdb.Cursor { +func (l *Log) Cursor(key string, forward bool) tsdb.Cursor { l.mu.RLock() defer l.mu.RUnlock() - return l.partition([]byte(key)).cursor(key) + return l.partition([]byte(key)).cursor(key, forward) } func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { @@ -1380,7 +1380,7 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) { } // cursor will combine the in memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the key -func (p *Partition) cursor(key string) *cursor { +func (p *Partition) cursor(key string, forward bool) *cursor { p.mu.Lock() defer p.mu.Unlock() @@ -1398,7 +1398,7 @@ func (p *Partition) cursor(key string) *cursor { c = append(c, entry.points...) 
dedupe := tsdb.DedupeEntries(c) - return &cursor{cache: dedupe} + return &cursor{cache: dedupe, forward: forward} } } @@ -1410,7 +1410,7 @@ func (p *Partition) cursor(key string) *cursor { // build a copy so modifications to the partition don't change the result set a := make([][]byte, len(entry.points)) copy(a, entry.points) - return &cursor{cache: a} + return &cursor{cache: a, forward: forward} } // idFromFileName parses the segment file ID from its name @@ -1593,8 +1593,11 @@ type entry struct { type cursor struct { cache [][]byte position int + forward bool } +func (c *cursor) Direction() bool { return c.forward } + // Seek will point the cursor to the given time (or key) func (c *cursor) Seek(seek []byte) (key, value []byte) { // Seek cache index. @@ -1607,12 +1610,26 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) { // Next moves the cursor to the next key/value. will return nil if at the end func (c *cursor) Next() (key, value []byte) { - if c.position >= len(c.cache) { + + if !c.forward && c.position >= len(c.cache) { + c.position-- + } + + if c.forward && c.position >= len(c.cache) { + return nil, nil + } + + if !c.forward && c.position < 0 { return nil, nil } v := c.cache[c.position] - c.position++ + + if c.forward { + c.position++ + } else { + c.position-- + } return v[0:8], v[8:] diff --git a/tsdb/engine/wal/wal_test.go b/tsdb/engine/wal/wal_test.go index e9aaab887f..c00fa30ea4 100644 --- a/tsdb/engine/wal/wal_test.go +++ b/tsdb/engine/wal/wal_test.go @@ -46,7 +46,7 @@ func TestWAL_WritePoints(t *testing.T) { } verify := func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) k, v := c.Seek(inttob(1)) // ensure the series are there and points are in order @@ -64,7 +64,7 @@ func TestWAL_WritePoints(t *testing.T) { t.Fatalf("expected nil on last seek: %v %v", k, v) } - c = log.Cursor("cpu,host=B") + c = log.Cursor("cpu,host=B", true) k, v = c.Next() if bytes.Compare(v, p3.Data()) != 0 { t.Fatalf("expected to seek to first 
point but got key and value: %v %v", k, v) @@ -92,7 +92,7 @@ func TestWAL_WritePoints(t *testing.T) { } verify2 := func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) k, v := c.Next() if bytes.Compare(v, p1.Data()) != 0 { t.Fatalf("order wrong, expected p1, %v %v %v", v, k, p1.Data()) @@ -110,7 +110,7 @@ func TestWAL_WritePoints(t *testing.T) { t.Fatal("order wrong, expected p6") } - c = log.Cursor("cpu,host=C") + c = log.Cursor("cpu,host=C", true) _, v = c.Next() if bytes.Compare(v, p5.Data()) != 0 { t.Fatal("order wrong, expected p6") @@ -150,7 +150,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) { } verify := func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) _, v := c.Next() if bytes.Compare(v, p1.Data()) != 0 { t.Fatal("p1 value wrong") @@ -183,7 +183,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) { } verify = func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) _, v := c.Next() if bytes.Compare(v, p1.Data()) != 0 { t.Fatal("p1 value wrong") @@ -229,7 +229,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) { } verify := func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) _, v := c.Next() if bytes.Compare(v, p1.Data()) != 0 { t.Fatal("p1 value wrong") @@ -268,7 +268,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) { } verify = func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) _, v := c.Next() if bytes.Compare(v, p1.Data()) != 0 { t.Fatal("p1 value wrong") @@ -349,7 +349,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) { } // ensure we have some data - c := log.Cursor("cpu,host=A,region=uswest23") + c := log.Cursor("cpu,host=A,region=uswest23", true) k, v := c.Next() if btou64(k) != 1 { t.Fatalf("expected timestamp of 1, but got %v %v", k, v) @@ -365,13 +365,13 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) { } // should be nil - c = log.Cursor("cpu,host=A,region=uswest23") + c = 
log.Cursor("cpu,host=A,region=uswest23", true) k, v = c.Next() if k != nil || v != nil { t.Fatal("expected cache to be nil after flush: ", k, v) } - c = log.Cursor("cpu,host=A,region=useast1") + c = log.Cursor("cpu,host=A,region=useast1", true) k, v = c.Next() if btou64(k) != 1 { t.Fatal("expected cache to be there after flush and compact: ", k, v) @@ -385,13 +385,13 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) { log.Close() log.Open() - c = log.Cursor("cpu,host=A,region=uswest23") + c = log.Cursor("cpu,host=A,region=uswest23", true) k, v = c.Next() if k != nil || v != nil { t.Fatal("expected cache to be nil after flush and re-open: ", k, v) } - c = log.Cursor("cpu,host=A,region=useast1") + c = log.Cursor("cpu,host=A,region=useast1", true) k, v = c.Next() if btou64(k) != 1 { t.Fatal("expected cache to be there after flush and compact: ", k, v) @@ -444,7 +444,7 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) { } // ensure we have some data - c := log.Cursor("cpu,host=A,region=uswest10") + c := log.Cursor("cpu,host=A,region=uswest10", true) k, _ := c.Next() if btou64(k) != 1 { t.Fatalf("expected first data point but got one with key: %v", k) @@ -625,12 +625,12 @@ func TestWAL_DeleteSeries(t *testing.T) { } // ensure data is there - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) if k, _ := c.Next(); btou64(k) != 1 { t.Fatal("expected data point for cpu,host=A") } - c = log.Cursor("cpu,host=B") + c = log.Cursor("cpu,host=B", true) if k, _ := c.Next(); btou64(k) != 2 { t.Fatal("expected data point for cpu,host=B") } @@ -641,13 +641,13 @@ func TestWAL_DeleteSeries(t *testing.T) { } // ensure data is there - c = log.Cursor("cpu,host=A") + c = log.Cursor("cpu,host=A", true) if k, _ := c.Next(); btou64(k) != 1 { t.Fatal("expected data point for cpu,host=A") } // ensure series is deleted - c = log.Cursor("cpu,host=B") + c = log.Cursor("cpu,host=B", true) if k, _ := c.Next(); k != nil { t.Fatal("expected no data for 
cpu,host=B") } @@ -675,13 +675,13 @@ func TestWAL_DeleteSeries(t *testing.T) { } // ensure data is there - c = log.Cursor("cpu,host=A") + c = log.Cursor("cpu,host=A", true) if k, _ := c.Next(); btou64(k) != 1 { t.Fatal("expected data point for cpu,host=A") } // ensure series is deleted - c = log.Cursor("cpu,host=B") + c = log.Cursor("cpu,host=B", true) if k, _ := c.Next(); k != nil { t.Fatal("expected no data for cpu,host=B") } @@ -805,7 +805,7 @@ func TestWAL_QueryDuringCompaction(t *testing.T) { } verify := func() { - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) k, v := c.Seek(inttob(1)) // ensure the series are there and points are in order if bytes.Compare(v, p1.Data()) != 0 { @@ -851,7 +851,7 @@ func TestWAL_PointsSorted(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - c := log.Cursor("cpu,host=A") + c := log.Cursor("cpu,host=A", true) k, _ := c.Next() if btou64(k) != 1 { t.Fatal("points out of order") diff --git a/tsdb/executor.go b/tsdb/executor.go index 72dbeb676e..1c6cd63ec4 100644 --- a/tsdb/executor.go +++ b/tsdb/executor.go @@ -140,6 +140,23 @@ func (e *SelectExecutor) nextMapperLowestTime(tagset string) int64 { return minTime } +// nextMapperHighestTime returns the highest time across all Mappers, for the given tagset. +func (e *SelectExecutor) nextMapperHighestTime(tagset string) int64 { + maxTime := int64(math.MinInt64) + for _, m := range e.mappers { + if !m.drained && m.bufferedChunk != nil { + if m.bufferedChunk.key() != tagset { + continue + } + t := m.bufferedChunk.Values[0].Time + if t > maxTime { + maxTime = t + } + } + } + return maxTime +} + // tagSetIsLimited returns whether data for the given tagset has been LIMITed. func (e *SelectExecutor) tagSetIsLimited(tagset string) bool { _, ok := e.limitedTagSets[tagset] @@ -246,9 +263,21 @@ func (e *SelectExecutor) executeRaw(out chan *influxql.Row) { rowWriter = nil } - // Process the mapper outputs. 
We can send out everything up to the min of the last time - // of the chunks for the next tagset. - minTime := e.nextMapperLowestTime(tagset) + ascending := true + if len(e.stmt.SortFields) > 0 { + ascending = e.stmt.SortFields[0].Ascending + + } + + var timeBoundary int64 + + if ascending { + // Process the mapper outputs. We can send out everything up to the min of the last time + // of the chunks for the next tagset. + timeBoundary = e.nextMapperLowestTime(tagset) + } else { + timeBoundary = e.nextMapperHighestTime(tagset) + } // Now empty out all the chunks up to the min time. Create new output struct for this data. var chunkedOutput *MapperOutput @@ -257,19 +286,30 @@ func (e *SelectExecutor) executeRaw(out chan *influxql.Row) { continue } + chunkBoundary := false + if ascending { + chunkBoundary = m.bufferedChunk.Values[0].Time > timeBoundary + } else { + chunkBoundary = m.bufferedChunk.Values[0].Time < timeBoundary + } + // This mapper's next chunk is not for the next tagset, or the very first value of // the chunk is at a higher acceptable timestamp. Skip it. - if m.bufferedChunk.key() != tagset || m.bufferedChunk.Values[0].Time > minTime { + if m.bufferedChunk.key() != tagset || chunkBoundary { continue } // Find the index of the point up to the min. 
ind := len(m.bufferedChunk.Values) for i, mo := range m.bufferedChunk.Values { - if mo.Time > minTime { + if ascending && mo.Time > timeBoundary { + ind = i + break + } else if !ascending && mo.Time < timeBoundary { ind = i break } + } // Add up to the index to the values @@ -293,8 +333,12 @@ func (e *SelectExecutor) executeRaw(out chan *influxql.Row) { } } - // Sort the values by time first so we can then handle offset and limit - sort.Sort(MapperValues(chunkedOutput.Values)) + if ascending { + // Sort the values by time first so we can then handle offset and limit + sort.Sort(MapperValues(chunkedOutput.Values)) + } else { + sort.Sort(sort.Reverse(MapperValues(chunkedOutput.Values))) + } // Now that we have full name and tag details, initialize the rowWriter. // The Name and Tags will be the same for all mappers. diff --git a/tsdb/mapper.go b/tsdb/mapper.go index e46b535e1d..68ac80cb25 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -218,12 +218,17 @@ func (lm *SelectMapper) Open() error { } } + forward := true + if len(lm.selectStmt.SortFields) > 0 { + forward = lm.selectStmt.SortFields[0].Ascending + } + // Create all cursors for reading the data from this shard. for _, t := range tagSets { cursors := []*seriesCursor{} for i, key := range t.SeriesKeys { - c := lm.tx.Cursor(key) + c := lm.tx.Cursor(key, forward) if c == nil { // No data exists for this key. continue @@ -238,7 +243,18 @@ func (lm *SelectMapper) Open() error { tsc.pointHeap = newPointHeap() //Prime the buffers. for i := 0; i < len(tsc.cursors); i++ { - k, v := tsc.cursors[i].SeekTo(lm.queryTMin) + var k int64 + var v []byte + if forward { + k, v = tsc.cursors[i].SeekTo(lm.queryTMin) + if k == -1 { + k, v = tsc.cursors[i].Next() + } + + } else { + k, v = tsc.cursors[i].SeekTo(lm.queryTMax) + } + if k == -1 { continue } @@ -325,7 +341,6 @@ func (lm *SelectMapper) nextChunkRaw() (interface{}, error) { continue } } - if output == nil { output = &MapperOutput{ Name: cursor.measurement,