refactor of limit/offset

pull/2004/head
Cory LaNou 2015-03-19 13:31:46 -06:00
parent 56281fbfd5
commit a6171b3382
3 changed files with 68 additions and 64 deletions

View File

@ -218,6 +218,15 @@ func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeou
timer = time.NewTimer(time.Duration(math.MaxInt64)) timer = time.NewTimer(time.Duration(math.MaxInt64))
) )
defer timer.Stop() defer timer.Stop()
// Check to see if they set the env for duration sleep
sleep := 10 * time.Millisecond
if d, e := time.ParseDuration(os.Getenv("TEST_SLEEP")); e == nil {
// this will limit the http log noise in the test output
sleep = d
timeout = d + 1
}
if timeout > 0 { if timeout > 0 {
timer.Reset(time.Duration(timeout)) timer.Reset(time.Duration(timeout))
go func() { go func() {
@ -232,7 +241,7 @@ func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeou
} else if atomic.LoadInt32(&timedOut) == 1 { } else if atomic.LoadInt32(&timedOut) == 1 {
return got, false return got, false
} else { } else {
time.Sleep(10 * time.Millisecond) time.Sleep(sleep)
} }
} }
} }
@ -270,7 +279,7 @@ func runTests_Errors(t *testing.T, nodes Cluster) {
} }
// runTests tests write and query of data. Setting testNumbers allows only a subset of tests to be run. // runTests tests write and query of data. Setting testNumbers allows only a subset of tests to be run.
func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string, testNums ...int) { func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) {
t.Logf("Running tests against %d-node cluster", len(nodes)) t.Logf("Running tests against %d-node cluster", len(nodes))
// Start by ensuring database and retention policy exist. // Start by ensuring database and retention policy exist.
@ -574,12 +583,12 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:03Z",3],["2009-11-10T23:00:04Z",4]]}]}]}`, expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:03Z",3],["2009-11-10T23:00:04Z",4]]}]}]}`,
}, },
{ {
name: "limit + offset higher than number of points", name: "limit + offset equal to total number of points",
query: `select foo from "%DB%"."%RP%".limit LIMIT 3 OFFSET 3`, query: `select foo from "%DB%"."%RP%".limit LIMIT 3 OFFSET 3`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`, expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
}, },
{ {
name: "offset higher than number of points", name: "limit - offset higher than number of points",
query: `select foo from "%DB%"."%RP%".limit LIMIT 2 OFFSET 20`, query: `select foo from "%DB%"."%RP%".limit LIMIT 2 OFFSET 20`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`, expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`,
}, },
@ -599,14 +608,14 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"limit","columns":["time","mean"],"values":[["2009-11-10T23:00:03Z",3],["2009-11-10T23:00:04Z",4]]}]}]}`, expected: `{"results":[{"series":[{"name":"limit","columns":["time","mean"],"values":[["2009-11-10T23:00:03Z",3],["2009-11-10T23:00:04Z",4]]}]}]}`,
}, },
{ {
name: "limit + offset higher than number of points with group by time", name: "limit + offset equal to the number of points with group by time",
query: `select mean(foo) from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 3 OFFSET 3`, query: `select mean(foo) from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 3 OFFSET 3`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`, expected: `{"results":[{"series":[{"name":"limit","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
}, },
{ {
name: "offset higher than number of points with group by time", name: "limit - offset higher than number of points with group by time",
query: `select mean(foo) from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2 OFFSET 20`, query: `select mean(foo) from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2 OFFSET 20`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","mean"]}]}]}`, expected: `{"results":[{}]}`,
}, },
// Fill tests // Fill tests
@ -952,25 +961,23 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
}, },
} }
// See if we should run a subset of this test
testPrefix := os.Getenv("TEST_PREFIX")
if testPrefix != "" {
t.Logf("Skipping all tests that do not match the prefix of %q\n", testPrefix)
}
for i, tt := range tests { for i, tt := range tests {
// If tests were explicitly requested, only run those tests.
if len(testNums) > 0 {
var found bool
for _, t := range testNums {
if i == t {
found = true
break
}
}
if !found {
continue
}
}
name := tt.name name := tt.name
if name == "" { if name == "" {
name = tt.query name = tt.query
} }
if testPrefix != "" && !strings.HasPrefix(name, testPrefix) {
continue
}
fmt.Printf("TEST: %d: %s\n", i, name) fmt.Printf("TEST: %d: %s\n", i, name)
t.Logf("Running test %d: %s", i, name) t.Logf("Running test %d: %s", i, name)

View File

@ -6,6 +6,8 @@ import (
"hash/fnv" "hash/fnv"
"sort" "sort"
"time" "time"
"github.com/davecgh/go-spew/spew"
) )
// DB represents an interface for creating transactions. // DB represents an interface for creating transactions.
@ -103,14 +105,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
// For group by time queries, limit the number of data points returned by the limit and offset // For group by time queries, limit the number of data points returned by the limit and offset
// raw query limits are handled elsewhere // raw query limits are handled elsewhere
warn("> ", m.stmt.IsRawQuery, pointCountInResult)
if !m.stmt.IsRawQuery && (m.stmt.Limit > 0 || m.stmt.Offset > 0) { if !m.stmt.IsRawQuery && (m.stmt.Limit > 0 || m.stmt.Offset > 0) {
// ensure that the offset isn't higher than the number of points we'd get // ensure that the offset isn't higher than the number of points we'd get
if m.stmt.Offset > pointCountInResult { if m.stmt.Offset > pointCountInResult {
out <- &Row{
Name: m.MeasurementName,
Tags: m.TagSet.Tags,
}
return return
} }
@ -120,7 +117,6 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
pointCountInResult = m.stmt.Limit pointCountInResult = m.stmt.Limit
} }
} }
warn("< ", m.stmt.Limit)
// If we are exceeding our MaxGroupByPoints and we aren't a raw query, error out // If we are exceeding our MaxGroupByPoints and we aren't a raw query, error out
if !m.stmt.IsRawQuery && pointCountInResult > MaxGroupByPoints { if !m.stmt.IsRawQuery && pointCountInResult > MaxGroupByPoints {
@ -140,17 +136,28 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
startTimeBucket := m.TMin / m.interval * m.interval startTimeBucket := m.TMin / m.interval * m.interval
for i, _ := range resultTimes { for i, _ := range resultTimes {
t := startTimeBucket + (int64(i+1) * m.interval * int64(m.stmt.Offset+1)) - m.interval var t int64
if m.stmt.Offset > 0 {
t = startTimeBucket + (int64(i+1) * m.interval * int64(m.stmt.Offset))
} else {
t = startTimeBucket + (int64(i+1) * m.interval) - m.interval
}
// If we start getting out of our max time range, then truncate values and return
if t > m.TMax && !isRaw {
resultValues = resultValues[:i]
break
}
resultTimes[i] = t resultTimes[i] = t
// we always include time so we need one more column than we have aggregates // we always include time so we need one more column than we have aggregates
vals := make([]interface{}, 0, len(aggregates)+1) vals := make([]interface{}, 0, len(aggregates)+1)
resultValues[i] = append(vals, time.Unix(0, t).UTC()) resultValues[i] = append(vals, time.Unix(0, t).UTC())
} }
spew.Dump(resultValues)
// This just makes sure that if they specify a start time less than what the start time would be with the offset, // This just makes sure that if they specify a start time less than what the start time would be with the offset,
// we just reset the start time to the later time to avoid going over data that won't show up in the result. // we just reset the start time to the later time to avoid going over data that won't show up in the result.
if m.stmt.Offset > 0 && !m.stmt.IsRawQuery { if m.stmt.Offset > 0 && !m.stmt.IsRawQuery {
warn(". setting tmin: ", resultTimes[0])
m.TMin = resultTimes[0] m.TMin = resultTimes[0]
} }

View File

@ -61,6 +61,7 @@ func TestParser_ParseStatement(t *testing.T) {
{ {
s: `SELECT * FROM myseries`, s: `SELECT * FROM myseries`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
IsRawQuery: true,
Fields: []*influxql.Field{ Fields: []*influxql.Field{
{Expr: &influxql.Wildcard{}}, {Expr: &influxql.Wildcard{}},
}, },
@ -70,7 +71,7 @@ func TestParser_ParseStatement(t *testing.T) {
// SELECT statement // SELECT statement
{ {
s: `SELECT field1, field2 ,field3 AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' GROUP BY 10h ORDER BY ASC LIMIT 20 OFFSET 10;`, s: `SELECT field1, field2 ,field3 AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' GROUP BY time(10h) ORDER BY ASC LIMIT 20 OFFSET 10;`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{ Fields: []*influxql.Field{
{Expr: &influxql.VarRef{Val: "field1"}}, {Expr: &influxql.VarRef{Val: "field1"}},
@ -83,9 +84,7 @@ func TestParser_ParseStatement(t *testing.T) {
LHS: &influxql.VarRef{Val: "host"}, LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
}, },
Dimensions: []*influxql.Dimension{ Dimensions: influxql.Dimensions{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 10 * time.Hour}}}}},
{Expr: &influxql.DurationLiteral{Val: 10 * time.Hour}},
},
SortFields: []*influxql.SortField{ SortFields: []*influxql.SortField{
{Ascending: true}, {Ascending: true},
}, },
@ -98,8 +97,9 @@ func TestParser_ParseStatement(t *testing.T) {
{ {
s: `select my_field from myseries`, s: `select my_field from myseries`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "my_field"}}}, IsRawQuery: true,
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "my_field"}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
}, },
}, },
@ -107,8 +107,9 @@ func TestParser_ParseStatement(t *testing.T) {
{ {
s: `SELECT field1 FROM myseries ORDER BY ASC, field1, field2 DESC LIMIT 10`, s: `SELECT field1 FROM myseries ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "field1"}}}, IsRawQuery: true,
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "field1"}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
SortFields: []*influxql.SortField{ SortFields: []*influxql.SortField{
{Ascending: true}, {Ascending: true},
{Name: "field1"}, {Name: "field1"},
@ -122,10 +123,11 @@ func TestParser_ParseStatement(t *testing.T) {
{ {
s: `SELECT field1 FROM myseries SLIMIT 10 SOFFSET 5`, s: `SELECT field1 FROM myseries SLIMIT 10 SOFFSET 5`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "field1"}}}, IsRawQuery: true,
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "field1"}}},
SLimit: 10, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
SOffset: 5, SLimit: 10,
SOffset: 5,
}, },
}, },
@ -133,8 +135,9 @@ func TestParser_ParseStatement(t *testing.T) {
{ {
s: `SELECT * FROM cpu WHERE host = 'serverC' AND region =~ /.*west.*/`, s: `SELECT * FROM cpu WHERE host = 'serverC' AND region =~ /.*west.*/`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}}, IsRawQuery: true,
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Condition: &influxql.BinaryExpr{ Condition: &influxql.BinaryExpr{
Op: influxql.AND, Op: influxql.AND,
LHS: &influxql.BinaryExpr{ LHS: &influxql.BinaryExpr{
@ -155,7 +158,8 @@ func TestParser_ParseStatement(t *testing.T) {
{ {
s: `SELECT * FROM /cpu.*/`, s: `SELECT * FROM /cpu.*/`,
stmt: &influxql.SelectStatement{ stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}}, IsRawQuery: true,
Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}},
Sources: []influxql.Source{&influxql.Measurement{ Sources: []influxql.Source{&influxql.Measurement{
Regex: &influxql.RegexLiteral{Val: regexp.MustCompile("cpu.*")}}}, Regex: &influxql.RegexLiteral{Val: regexp.MustCompile("cpu.*")}}},
}, },
@ -169,17 +173,10 @@ func TestParser_ParseStatement(t *testing.T) {
Expr: &influxql.Call{ Expr: &influxql.Call{
Name: "mean", Name: "mean",
Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}}, Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Dimensions: []*influxql.Dimension{ Dimensions: []*influxql.Dimension{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 5 * time.Minute}}}}},
{Expr: &influxql.Call{ Fill: influxql.NumberFill,
Name: "time", FillValue: float64(1),
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 5 * time.Minute},
},
}},
},
Fill: influxql.NumberFill,
FillValue: float64(1),
}, },
}, },
@ -191,16 +188,9 @@ func TestParser_ParseStatement(t *testing.T) {
Expr: &influxql.Call{ Expr: &influxql.Call{
Name: "mean", Name: "mean",
Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}}, Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Dimensions: []*influxql.Dimension{ Dimensions: []*influxql.Dimension{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 5 * time.Minute}}}}},
{Expr: &influxql.Call{ Fill: influxql.PreviousFill,
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 5 * time.Minute},
},
}},
},
Fill: influxql.PreviousFill,
}, },
}, },