Add interface for heap to support Reverse for `order by desc`
parent
cfbfbc2361
commit
824d7a1d9b
|
@ -57,6 +57,7 @@ There are breaking changes in this release:
|
|||
- [#4685](https://github.com/influxdb/influxdb/pull/4685): Automatically promote node to raft peer if drop server results in removing a raft peer.
|
||||
|
||||
### Bugfixes
|
||||
- [#4235](https://github.com/influxdb/influxdb/issues/4235): "ORDER BY DESC" doesn't properly order
|
||||
- [#4789](https://github.com/influxdb/influxdb/pull/4789): Decode WHERE fields during aggregates. Fix [issue #4701](https://github.com/influxdb/influxdb/issues/4701).
|
||||
- [#4778](https://github.com/influxdb/influxdb/pull/4778): If there are no points to count, count is 0.
|
||||
- [#4715](https://github.com/influxdb/influxdb/pull/4715): Fix panic during Raft-close. Fix [issue #4707](https://github.com/influxdb/influxdb/issues/4707). Thanks @oiooj
|
||||
|
|
|
@ -5118,6 +5118,11 @@ func TestServer_Query_OrderByTime(t *testing.T) {
|
|||
fmt.Sprintf(`cpu,host=server1 value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server1 value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server1 value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
|
||||
|
||||
fmt.Sprintf(`power,presence=true value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
|
||||
fmt.Sprintf(`power,presence=true value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
|
||||
fmt.Sprintf(`power,presence=true value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
|
||||
fmt.Sprintf(`power,presence=false value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:04Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
|
@ -5130,6 +5135,13 @@ func TestServer_Query_OrderByTime(t *testing.T) {
|
|||
command: `select value from "cpu" ORDER BY time DESC`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`,
|
||||
},
|
||||
|
||||
&Query{
|
||||
name: "order desc with tags",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `select value from "power" ORDER BY time DESC`,
|
||||
exp: `{"results":[{"series":[{"name":"power","columns":["time","value"],"values":[["2000-01-01T00:00:04Z",4],["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
|
|
|
@ -153,22 +153,24 @@ type TagSetCursor struct {
|
|||
tags map[string]string // Tag key-value pairs
|
||||
cursors []*TagsCursor // Underlying tags cursors.
|
||||
currentTags map[string]string // the current tags for the underlying series cursor in play
|
||||
ascending bool
|
||||
|
||||
SelectFields []string // fields to be selected
|
||||
|
||||
// Min-heap of cursors ordered by timestamp.
|
||||
heap *pointHeap
|
||||
heap heap.Interface
|
||||
|
||||
// Memoize the cursor's tagset-based key.
|
||||
memokey string
|
||||
}
|
||||
|
||||
// NewTagSetCursor returns a instance of TagSetCursor.
|
||||
func NewTagSetCursor(m string, t map[string]string, c []*TagsCursor) *TagSetCursor {
|
||||
func NewTagSetCursor(m string, t map[string]string, c []*TagsCursor, ascending bool) *TagSetCursor {
|
||||
return &TagSetCursor{
|
||||
measurement: m,
|
||||
tags: t,
|
||||
cursors: c,
|
||||
ascending: ascending,
|
||||
heap: newPointHeap(),
|
||||
}
|
||||
}
|
||||
|
@ -185,7 +187,11 @@ func (tsc *TagSetCursor) key() string {
|
|||
}
|
||||
|
||||
func (tsc *TagSetCursor) Init(seek int64) {
|
||||
tsc.heap = newPointHeap()
|
||||
if tsc.ascending {
|
||||
tsc.heap = newPointHeap()
|
||||
} else {
|
||||
tsc.heap = newPointHeapReverse()
|
||||
}
|
||||
|
||||
// Prime the buffers.
|
||||
for i := 0; i < len(tsc.cursors); i++ {
|
||||
|
@ -297,13 +303,27 @@ type pointHeapItem struct {
|
|||
}
|
||||
|
||||
type pointHeap []*pointHeapItem
|
||||
type pointHeapReverse struct {
|
||||
pointHeap
|
||||
}
|
||||
|
||||
func newPointHeap() *pointHeap {
|
||||
func newPointHeap() heap.Interface {
|
||||
q := make(pointHeap, 0)
|
||||
heap.Init(&q)
|
||||
return &q
|
||||
}
|
||||
|
||||
func newPointHeapReverse() heap.Interface {
|
||||
q := make(pointHeap, 0)
|
||||
heap.Init(&q)
|
||||
|
||||
return &pointHeapReverse{q}
|
||||
}
|
||||
|
||||
func (pq *pointHeapReverse) Less(i, j int) bool {
|
||||
return pq.pointHeap[i].timestamp > pq.pointHeap[j].timestamp
|
||||
}
|
||||
|
||||
func (pq pointHeap) Len() int { return len(pq) }
|
||||
|
||||
func (pq pointHeap) Less(i, j int) bool {
|
||||
|
|
|
@ -873,7 +873,7 @@ func (m *RawMapper) openMeasurement(mm *Measurement) error {
|
|||
cursors = append(cursors, cm)
|
||||
}
|
||||
|
||||
tsc := NewTagSetCursor(mm.Name, t.Tags, cursors)
|
||||
tsc := NewTagSetCursor(mm.Name, t.Tags, cursors, ascending)
|
||||
tsc.SelectFields = m.selectFields
|
||||
if ascending {
|
||||
tsc.Init(m.qmin)
|
||||
|
|
Loading…
Reference in New Issue