diff --git a/CHANGELOG.md b/CHANGELOG.md index 70d5327ad1..f3ead791f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ There are breaking changes in this release: - [#4685](https://github.com/influxdb/influxdb/pull/4685): Automatically promote node to raft peer if drop server results in removing a raft peer. ### Bugfixes +- [#4235](https://github.com/influxdb/influxdb/issues/4235): "ORDER BY DESC" doesn't properly order - [#4789](https://github.com/influxdb/influxdb/pull/4789): Decode WHERE fields during aggregates. Fix [issue #4701](https://github.com/influxdb/influxdb/issues/4701). - [#4778](https://github.com/influxdb/influxdb/pull/4778): If there are no points to count, count is 0. - [#4715](https://github.com/influxdb/influxdb/pull/4715): Fix panic during Raft-close. Fix [issue #4707](https://github.com/influxdb/influxdb/issues/4707). Thanks @oiooj diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index d07a6cf383..d4b1b237ed 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -5118,6 +5118,11 @@ func TestServer_Query_OrderByTime(t *testing.T) { fmt.Sprintf(`cpu,host=server1 value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()), fmt.Sprintf(`cpu,host=server1 value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()), fmt.Sprintf(`cpu,host=server1 value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()), + + fmt.Sprintf(`power,presence=true value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()), + fmt.Sprintf(`power,presence=true value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()), + fmt.Sprintf(`power,presence=true value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()), + fmt.Sprintf(`power,presence=false value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:04Z").UnixNano()), } test := NewTest("db0", "rp0") @@ -5130,6 +5135,13 @@ func TestServer_Query_OrderByTime(t *testing.T) { command: `select value from "cpu" ORDER BY time DESC`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`, }, + + &Query{ + name: "order desc with tags", + params: url.Values{"db": []string{"db0"}}, + command: `select value from "power" ORDER BY time DESC`, + exp: `{"results":[{"series":[{"name":"power","columns":["time","value"],"values":[["2000-01-01T00:00:04Z",4],["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`, + }, }...) for i, query := range test.queries { diff --git a/tsdb/cursor.go b/tsdb/cursor.go index b7b7c6b752..013c95d4aa 100644 --- a/tsdb/cursor.go +++ b/tsdb/cursor.go @@ -153,22 +153,24 @@ type TagSetCursor struct { tags map[string]string // Tag key-value pairs cursors []*TagsCursor // Underlying tags cursors. currentTags map[string]string // the current tags for the underlying series cursor in play + ascending bool SelectFields []string // fields to be selected // Min-heap of cursors ordered by timestamp. - heap *pointHeap + heap heap.Interface // Memoize the cursor's tagset-based key. memokey string } // NewTagSetCursor returns a instance of TagSetCursor. -func NewTagSetCursor(m string, t map[string]string, c []*TagsCursor) *TagSetCursor { +func NewTagSetCursor(m string, t map[string]string, c []*TagsCursor, ascending bool) *TagSetCursor { return &TagSetCursor{ measurement: m, tags: t, cursors: c, + ascending: ascending, heap: newPointHeap(), } } @@ -185,7 +187,11 @@ func (tsc *TagSetCursor) key() string { } func (tsc *TagSetCursor) Init(seek int64) { - tsc.heap = newPointHeap() + if tsc.ascending { + tsc.heap = newPointHeap() + } else { + tsc.heap = newPointHeapReverse() + } // Prime the buffers. for i := 0; i < len(tsc.cursors); i++ { @@ -297,13 +303,27 @@ type pointHeapItem struct { } type pointHeap []*pointHeapItem +type pointHeapReverse struct { + pointHeap +} -func newPointHeap() *pointHeap { +func newPointHeap() heap.Interface { q := make(pointHeap, 0) heap.Init(&q) return &q } +func newPointHeapReverse() heap.Interface { + q := make(pointHeap, 0) + heap.Init(&q) + + return &pointHeapReverse{q} +} + +func (pq *pointHeapReverse) Less(i, j int) bool { + return pq.pointHeap[i].timestamp > pq.pointHeap[j].timestamp +} + func (pq pointHeap) Len() int { return len(pq) } func (pq pointHeap) Less(i, j int) bool { diff --git a/tsdb/raw.go b/tsdb/raw.go index dc13b9737b..0a73d6c06e 100644 --- a/tsdb/raw.go +++ b/tsdb/raw.go @@ -873,7 +873,7 @@ func (m *RawMapper) openMeasurement(mm *Measurement) error { cursors = append(cursors, cm) } - tsc := NewTagSetCursor(mm.Name, t.Tags, cursors) + tsc := NewTagSetCursor(mm.Name, t.Tags, cursors, ascending) tsc.SelectFields = m.selectFields if ascending { tsc.Init(m.qmin)