Add support for limit and offset on the number of series returned
parent
0baf8bd084
commit
dffcac541d
|
@ -1053,6 +1053,34 @@ func TestServer_RawDataReturnsInOrder(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that limit and offset work
|
||||
func TestServer_LimitAndOffset(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
s.CreateDatabase("foo")
|
||||
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
|
||||
s.SetDefaultRetentionPolicy("foo", "raw")
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
host := fmt.Sprintf("server-%d", i)
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": host}, Timestamp: time.Unix(int64(i), 0), Fields: map[string]interface{}{"value": float64(i)}}})
|
||||
}
|
||||
|
||||
results := s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 20`), "foo", nil)
|
||||
if res := results.Results[0]; res.Err != nil {
|
||||
t.Fatalf("unexpected error during COUNT: %s", res.Err)
|
||||
} else if len(res.Series) != 9 {
|
||||
t.Fatalf("unexpected 9 series back but got %d", len(res.Series))
|
||||
}
|
||||
|
||||
results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 2 OFFSET 1`), "foo", nil)
|
||||
if res := results.Results[0]; res.Err != nil {
|
||||
t.Fatalf("unexpected error during COUNT: %s", res.Err)
|
||||
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","tags":{"host":"server-2","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server-1","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}` {
|
||||
t.Fatalf("unexpected row(0) during COUNT: %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the server can execute a wildcard query and return the data correctly.
|
||||
func TestServer_ExecuteWildcardQuery(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
|
|
26
tx.go
26
tx.go
|
@ -1,7 +1,9 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -146,6 +148,30 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
|
|||
fieldNames = tx.fieldNames(stmt.Fields)
|
||||
}
|
||||
|
||||
// limit the number of series in this query if they specified a limit
|
||||
if stmt.Limit > 0 {
|
||||
if stmt.Offset > len(tagSets) {
|
||||
return nil, errors.New("offset higher than the number of series in the result")
|
||||
}
|
||||
|
||||
limitSets := make(map[string]map[uint32]influxql.Expr)
|
||||
orderedSets := make([]string, 0, len(tagSets))
|
||||
for k, _ := range tagSets {
|
||||
orderedSets = append(orderedSets, k)
|
||||
}
|
||||
sort.Strings(orderedSets)
|
||||
|
||||
if stmt.Limit > len(orderedSets) {
|
||||
stmt.Limit = len(orderedSets) - stmt.Offset
|
||||
}
|
||||
|
||||
orderedSets = orderedSets[stmt.Offset:stmt.Limit]
|
||||
for _, s := range orderedSets {
|
||||
limitSets[s] = tagSets[s]
|
||||
}
|
||||
tagSets = limitSets
|
||||
}
|
||||
|
||||
// Create an iterator for every shard.
|
||||
var itrs []influxql.Iterator
|
||||
for tag, set := range tagSets {
|
||||
|
|
Loading…
Reference in New Issue