Merge branch '1.2' into jw-merge-12

pull/7938/head
Jason Wilder 2017-02-02 16:40:36 -07:00
commit 2e95b4043c
6 changed files with 940 additions and 395 deletions

View File

@ -4,7 +4,6 @@
- [#7776](https://github.com/influxdata/influxdb/issues/7776): Add system information to /debug/vars.
## v1.2.1 [unreleased]
### Bugfixes
@ -12,7 +11,9 @@
- [#7877](https://github.com/influxdata/influxdb/issues/7877): Fix mapping of types when the measurement uses a regex
- [#7888](https://github.com/influxdata/influxdb/pull/7888): Expand query dimensions from the subquery.
- [#7910](https://github.com/influxdata/influxdb/issues/7910): Fix EvalType when a parenthesis expression is used.
- [#7929](https://github.com/influxdata/influxdb/issues/7929): Fix series tag iteration segfault. (#7922)
- [#7906](https://github.com/influxdata/influxdb/issues/7906): Anchors not working as expected with case-insensitive regex
- [#7895](https://github.com/influxdata/influxdb/issues/7895): Fix incorrect math when aggregates that emit different times are used.
## v1.2.0 [2017-01-24]

View File

@ -1684,6 +1684,15 @@ func TestServer_Query_SelectGroupByTimeDerivative(t *testing.T) {
cpu value=15 1278010021000000000
cpu value=20 1278010022000000000
cpu value=25 1278010023000000000
cpu0,host=server01 ticks=10,total=100 1278010020000000000
cpu0,host=server01 ticks=30,total=100 1278010021000000000
cpu0,host=server01 ticks=32,total=100 1278010022000000000
cpu0,host=server01 ticks=47,total=100 1278010023000000000
cpu0,host=server02 ticks=40,total=100 1278010020000000000
cpu0,host=server02 ticks=45,total=100 1278010021000000000
cpu0,host=server02 ticks=84,total=100 1278010022000000000
cpu0,host=server02 ticks=101,total=100 1278010023000000000
`)},
}
@ -1789,6 +1798,11 @@ cpu value=25 1278010023000000000
command: `SELECT derivative(percentile(value, 50), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate derivative of ticks divided by aggregate",
command: `SELECT non_negative_derivative(mean(ticks), 1s) / last(total) * 100 AS usage FROM db0.rp0.cpu0 WHERE time >= '2010-07-01 18:47:00' AND time <= '2010-07-01 18:47:03' GROUP BY host, time(1s)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu0","tags":{"host":"server01"},"columns":["time","usage"],"values":[["2010-07-01T18:47:00Z",null],["2010-07-01T18:47:01Z",20],["2010-07-01T18:47:02Z",2],["2010-07-01T18:47:03Z",15]]},{"name":"cpu0","tags":{"host":"server02"},"columns":["time","usage"],"values":[["2010-07-01T18:47:00Z",null],["2010-07-01T18:47:01Z",5],["2010-07-01T18:47:02Z",39],["2010-07-01T18:47:03Z",17]]}]}]}`,
},
}...)
for i, query := range test.queries {
@ -7362,3 +7376,48 @@ func TestServer_Query_Sample_Wildcard(t *testing.T) {
}
}
}
// Validate that nested aggregates don't panic
func TestServer_NestedAggregateWithMathPanics(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
writes := []string{
`cpu value=2 0`,
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: "dividing by elapsed count should not panic",
params: url.Values{"db": []string{"db0"}},
command: `SELECT sum(value) / elapsed(sum(value), 1m) FROM cpu WHERE time >= 0 AND time < 10m GROUP BY time(1m)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sum_elapsed"],"values":[["1970-01-01T00:00:00Z",null],["1970-01-01T00:01:00Z",null],["1970-01-01T00:02:00Z",null],["1970-01-01T00:03:00Z",null],["1970-01-01T00:04:00Z",null],["1970-01-01T00:05:00Z",null],["1970-01-01T00:06:00Z",null],["1970-01-01T00:07:00Z",null],["1970-01-01T00:08:00Z",null],["1970-01-01T00:09:00Z",null]]}]}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1259,36 +1259,22 @@ func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) C
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Next() (*{{$v.Name}}Point, error) {
for {
a, err := itr.left.Next()
if err != nil {
a, b, err := itr.next()
if err != nil || (a == nil && b == nil) {
return nil, err
}
b, err := itr.right.Next()
if err != nil {
return nil, err
}
if a == nil && b == nil {
return nil, nil
} else if itr.points == nil && (a == nil || b == nil ) {
return nil, nil
}
if a != nil && b != nil {
if a.Time > b.Time {
itr.left.unread(a)
a = nil
} else if a.Time < b.Time {
itr.right.unread(b)
b = nil
}
}
if a == nil || a.Nil {
if itr.points == nil {
continue
}
p := *b
var p {{$k.Name}}Point
if b != nil {
p = *b
} else {
p = *a
}
p.Value = itr.points[0].Value
p.Nil = itr.points[0].Nil
a = &p
@ -1330,6 +1316,48 @@ func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) N
}
}
// next returns the next points within each iterator. If the iterators are
// uneven, it organizes them so only matching points are returned.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) next() (a, b *{{$k.Name}}Point, err error) {
// Retrieve the next value for both the left and right.
a, err = itr.left.Next()
if err != nil {
return nil, nil, err
}
b, err = itr.right.Next()
if err != nil {
return nil, nil, err
}
// If we have a point from both, make sure that they match each other.
if a != nil && b != nil {
if a.Name > b.Name {
itr.left.unread(a)
return nil, b, nil
} else if a.Name < b.Name {
itr.right.unread(b)
return a, nil, nil
}
if ltags, rtags := a.Tags.ID(), b.Tags.ID(); ltags > rtags {
itr.left.unread(a)
return nil, b, nil
} else if ltags < rtags {
itr.right.unread(b)
return a, nil, nil
}
if a.Time > b.Time {
itr.left.unread(a)
return nil, b, nil
} else if a.Time < b.Time {
itr.right.unread(b)
return a, nil, nil
}
}
return a, b, nil
}
// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc creates or modifies a point by combining two
// points. The point passed in may be modified and returned rather than
// allocating a new point if possible. One of the points may be nil, but at

View File

@ -1488,6 +1488,15 @@ func (s *Series) ShardN() int {
return n
}
// ForEachTag executes fn for every tag. Iteration occurs under lock.
func (s *Series) ForEachTag(fn func(models.Tag)) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, t := range s.Tags {
fn(t)
}
}
// Dereference removes references to a byte slice.
func (s *Series) Dereference(b []byte) {
s.mu.Lock()

View File

@ -926,14 +926,14 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
// Loop over all keys for each series.
m := make(map[KeyValue]struct{}, len(ss))
for _, series := range ss {
for _, t := range series.Tags {
series.ForEachTag(func(t models.Tag) {
if !ok {
// nop
} else if _, exists := keySet[string(t.Key)]; !exists {
continue
return
}
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
}
})
}
// Return an empty slice if there are no key/value matches.