commit
dea5814e04
|
@ -41,6 +41,7 @@ With this release InfluxDB is moving to Go 1.5.
|
|||
- [#4034](https://github.com/influxdb/influxdb/pull/4034): Rollback bolt tx on mapper open error
|
||||
- [#3848](https://github.com/influxdb/influxdb/issues/3848): restart influxdb causing panic
|
||||
- [#3881](https://github.com/influxdb/influxdb/issues/3881): panic: runtime error: invalid memory address or nil pointer dereference
|
||||
- [#3926](https://github.com/influxdb/influxdb/issues/3926): First or last value of `GROUP BY time(x)` is often null. Fixed by [#4038](https://github.com/influxdb/influxdb/pull/4038)
|
||||
|
||||
## v0.9.3 [2015-08-26]
|
||||
|
||||
|
|
|
@ -2481,6 +2481,88 @@ func TestServer_Query_AggregatesIdenticalTime(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// This will test that when using a group by, that it observes the time you asked for
|
||||
// but will only put the values in the bucket that match the time range
|
||||
func TestServer_Query_GroupByTimeCutoffs(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu value=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu value=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu value=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:05Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu value=4i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:08Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu value=5i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:09Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu value=6i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
|
||||
}
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = strings.Join(writes, "\n")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "sum all time",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT SUM(value) FROM cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",21]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "sum all time grouped by time 5s",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "sum all time grouped by time 5s missing first point",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:01Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",2],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "sum all time grouped by time 5s missing first points (null for bucket)",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:02Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",null],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "sum all time grouped by time 5s missing last point - 2 time intervals",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:09Z' group by time(5s)`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",12]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "sum all time grouped by time 5s missing last 2 points - 2 time intervals",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:08Z' group by time(5s)`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",7]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
}
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Write_Precision(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig(), "")
|
||||
|
|
|
@ -190,7 +190,7 @@ func (lm *SelectMapper) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Validate that ANY GROUP BY is not a field for thie measurement.
|
||||
// Validate that ANY GROUP BY is not a field for the measurement.
|
||||
if err := m.ValidateGroupBy(lm.selectStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -414,12 +414,17 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
|
|||
Value: make([]interface{}, 0)}
|
||||
}
|
||||
|
||||
// Always clamp tmin. This can happen as bucket-times are bucketed to the nearest
|
||||
// interval, and this can be less than the times in the query.
|
||||
// Always clamp tmin and tmax. This can happen as bucket-times are bucketed to the nearest
|
||||
// interval. This is necessary to grab the "partial" buckets at the beginning and end of the time range
|
||||
qmin := tmin
|
||||
if qmin < lm.queryTMin {
|
||||
qmin = lm.queryTMin
|
||||
}
|
||||
qmax := tmax
|
||||
if qmax > lm.queryTMax {
|
||||
// Need to offset by one nanosecond for the logic to work properly in the tagset cursor Next
|
||||
qmax = lm.queryTMax + 1
|
||||
}
|
||||
|
||||
tsc.pointHeap = newPointHeap()
|
||||
for i := range lm.mapFuncs {
|
||||
|
@ -428,7 +433,7 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
|
|||
// changes to the mapper functions, which can come later.
|
||||
// Prime the buffers.
|
||||
for i := 0; i < len(tsc.cursors); i++ {
|
||||
k, v := tsc.cursors[i].SeekTo(tmin)
|
||||
k, v := tsc.cursors[i].SeekTo(qmin)
|
||||
if k == -1 || k > tmax {
|
||||
continue
|
||||
}
|
||||
|
@ -440,8 +445,8 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
|
|||
heap.Push(tsc.pointHeap, p)
|
||||
}
|
||||
// Wrap the tagset cursor so it implements the mapping functions interface.
|
||||
nextf := func() (time int64, value interface{}) {
|
||||
k, v := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields)
|
||||
nextf := func() (_ int64, value interface{}) {
|
||||
k, v := tsc.Next(qmin, qmax, []string{lm.fieldNames[i]}, lm.whereFields)
|
||||
return k, v
|
||||
}
|
||||
|
||||
|
@ -768,7 +773,7 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri
|
|||
p := heap.Pop(tsc.pointHeap).(*pointHeapItem)
|
||||
|
||||
// We're done if the point is outside the query's time range [tmin:tmax).
|
||||
if p.timestamp != tmin && (tmin > p.timestamp || p.timestamp >= tmax) {
|
||||
if p.timestamp != tmin && (p.timestamp < tmin || p.timestamp >= tmax) {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -211,11 +211,14 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
|
|||
func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int) (Executor, error) {
|
||||
shards := map[uint64]meta.ShardInfo{} // Shards requiring mappers.
|
||||
|
||||
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
|
||||
now := time.Now().UTC()
|
||||
|
||||
// Replace instances of "now()" with the current time, and check the resultant times.
|
||||
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})
|
||||
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
|
||||
tmin, tmax := influxql.TimeRange(stmt.Condition)
|
||||
if tmax.IsZero() {
|
||||
tmax = time.Now()
|
||||
tmax = now
|
||||
}
|
||||
if tmin.IsZero() {
|
||||
tmin = time.Unix(0, 0)
|
||||
|
|
Loading…
Reference in New Issue