From 4d37c9dc9e976bffb0274ad5a0db3e0ea834f265 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 13 Mar 2017 16:59:03 -0600 Subject: [PATCH 01/11] Fix broken Values.Merge benchmark Merge had the side effect of modifying the original values so the results are wrong because they always hit the fast path after the first run. --- tsdb/engine/tsm1/encoding_test.go | 91 ++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index 9391a4ec67..ddf20153e2 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -488,7 +488,6 @@ func TestValues_MergeFloat(t *testing.T) { for i, test := range tests { got := tsm1.Values(test.a).Merge(test.b) - if exp, got := len(test.exp), len(got); exp != got { t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got) } @@ -1366,8 +1365,96 @@ func BenchmarkValues_Merge(b *testing.B) { } b.ResetTimer() + benchmarkMerge(a, c, b) +} + +func BenchmarkValues_MergeDisjoint(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + c := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, float64(i)) + c[i] = tsm1.NewValue(times[len(times)-1]+int64((i+1)*1e9), float64(i)) + } + + b.ResetTimer() + benchmarkMerge(a, c, b) +} + +func BenchmarkValues_MergeSame(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + c := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, float64(i)) + c[i] = tsm1.NewValue(t, float64(i)) + } + + b.ResetTimer() + benchmarkMerge(a, c, b) +} + +func BenchmarkValues_MergeSimilar(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + c := make([]tsm1.Value, len(times)) + + for i, t := range times 
{ + a[i] = tsm1.NewValue(t, float64(i)) + if i == 0 { + t++ + } + c[i] = tsm1.NewValue(t, float64(i)) + } + + b.ResetTimer() + benchmarkMerge(a, c, b) +} + +func BenchmarkValues_MergeUnevenA(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + c := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, float64(i)) + c[i] = tsm1.NewValue(t, float64(i)) + } + + b.ResetTimer() + benchmarkMerge(a[:700], c[:10], b) +} + +func BenchmarkValues_MergeUnevenB(b *testing.B) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + a := make([]tsm1.Value, len(times)) + c := make([]tsm1.Value, len(times)) + + for i, t := range times { + a[i] = tsm1.NewValue(t, float64(i)) + c[i] = tsm1.NewValue(t, float64(i)) + } + + b.ResetTimer() + benchmarkMerge(a[:10], c[:700], b) +} + +func benchmarkMerge(a, c tsm1.Values, b *testing.B) { for i := 0; i < b.N; i++ { - tsm1.Values(a).Merge(c) + b.StopTimer() + aa := make(tsm1.Values, len(a)) + copy(aa, a) + cc := make(tsm1.Values, len(c)) + copy(cc, c) + b.StartTimer() + tsm1.Values(aa).Merge(tsm1.Values(cc)) } } From a4cfeacedba3739520ce930532d9e9929ca218c3 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 13 Mar 2017 17:11:32 -0600 Subject: [PATCH 02/11] Use standard merge algorithm for merging values The previous version was very inefficient due to the benchmarks used to optimize it having a bug. This version always allocates a new slice, but is O(n). --- tsdb/engine/tsm1/encoding.gen.go | 270 ++++++-------------------- tsdb/engine/tsm1/encoding.gen.go.tmpl | 54 ++---- 2 files changed, 66 insertions(+), 258 deletions(-) diff --git a/tsdb/engine/tsm1/encoding.gen.go b/tsdb/engine/tsm1/encoding.gen.go index dbb42c1bd1..5d42118a46 100644 --- a/tsdb/engine/tsm1/encoding.gen.go +++ b/tsdb/engine/tsm1/encoding.gen.go @@ -140,52 +140,20 @@ func (a Values) Merge(b Values) Values { return append(b, a...)
} - for i := 0; i < len(a) && len(b) > 0; i++ { - av, bv := a[i].UnixNano(), b[0].UnixNano() - // Value in a is greater than B, we need to merge - if av > bv { - // Save value in a - temp := a[i] - - // Overwrite a with b - a[i] = b[0] - - // Slide all values of b down 1 - copy(b, b[1:]) - b = b[:len(b)-1] - - var k int - if len(b) > 0 && av > b[len(b)-1].UnixNano() { - // Fast path where a is after b, we skip the search - k = len(b) - } else { - // See where value we save from a should be inserted in b to keep b sorted - k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) - } - - if k == len(b) { - // Last position? - b = append(b, temp) - } else if b[k].UnixNano() != temp.UnixNano() { - // Save the last element, since it will get overwritten - last := b[len(b)-1] - // Somewhere in the middle of b, insert it only if it's not a duplicate - copy(b[k+1:], b[k:]) - // Add the last vale to the end - b = append(b, last) - b[k] = temp - } - } else if av == bv { - // Value in a an b are the same, use b - a[i] = b[0] - b = b[1:] + out := make(Values, 0, len(a)+len(b)) + for len(a) > 0 && len(b) > 0 { + if a[0].UnixNano() < b[0].UnixNano() { + out, a = append(out, a[0]), a[1:] + } else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { + a = a[1:] + } else { + out, b = append(out, b[0]), b[1:] } } - - if len(b) > 0 { - return append(a, b...) + if len(a) > 0 { + return append(out, a...) } - return a + return append(out, b...) } // Sort methods @@ -322,52 +290,20 @@ func (a FloatValues) Merge(b FloatValues) FloatValues { return append(b, a...) 
} - for i := 0; i < len(a) && len(b) > 0; i++ { - av, bv := a[i].UnixNano(), b[0].UnixNano() - // Value in a is greater than B, we need to merge - if av > bv { - // Save value in a - temp := a[i] - - // Overwrite a with b - a[i] = b[0] - - // Slide all values of b down 1 - copy(b, b[1:]) - b = b[:len(b)-1] - - var k int - if len(b) > 0 && av > b[len(b)-1].UnixNano() { - // Fast path where a is after b, we skip the search - k = len(b) - } else { - // See where value we save from a should be inserted in b to keep b sorted - k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) - } - - if k == len(b) { - // Last position? - b = append(b, temp) - } else if b[k].UnixNano() != temp.UnixNano() { - // Save the last element, since it will get overwritten - last := b[len(b)-1] - // Somewhere in the middle of b, insert it only if it's not a duplicate - copy(b[k+1:], b[k:]) - // Add the last vale to the end - b = append(b, last) - b[k] = temp - } - } else if av == bv { - // Value in a an b are the same, use b - a[i] = b[0] - b = b[1:] + out := make(FloatValues, 0, len(a)+len(b)) + for len(a) > 0 && len(b) > 0 { + if a[0].UnixNano() < b[0].UnixNano() { + out, a = append(out, a[0]), a[1:] + } else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { + a = a[1:] + } else { + out, b = append(out, b[0]), b[1:] } } - - if len(b) > 0 { - return append(a, b...) + if len(a) > 0 { + return append(out, a...) } - return a + return append(out, b...) } // Sort methods @@ -504,52 +440,20 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues { return append(b, a...) 
} - for i := 0; i < len(a) && len(b) > 0; i++ { - av, bv := a[i].UnixNano(), b[0].UnixNano() - // Value in a is greater than B, we need to merge - if av > bv { - // Save value in a - temp := a[i] - - // Overwrite a with b - a[i] = b[0] - - // Slide all values of b down 1 - copy(b, b[1:]) - b = b[:len(b)-1] - - var k int - if len(b) > 0 && av > b[len(b)-1].UnixNano() { - // Fast path where a is after b, we skip the search - k = len(b) - } else { - // See where value we save from a should be inserted in b to keep b sorted - k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) - } - - if k == len(b) { - // Last position? - b = append(b, temp) - } else if b[k].UnixNano() != temp.UnixNano() { - // Save the last element, since it will get overwritten - last := b[len(b)-1] - // Somewhere in the middle of b, insert it only if it's not a duplicate - copy(b[k+1:], b[k:]) - // Add the last vale to the end - b = append(b, last) - b[k] = temp - } - } else if av == bv { - // Value in a an b are the same, use b - a[i] = b[0] - b = b[1:] + out := make(IntegerValues, 0, len(a)+len(b)) + for len(a) > 0 && len(b) > 0 { + if a[0].UnixNano() < b[0].UnixNano() { + out, a = append(out, a[0]), a[1:] + } else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { + a = a[1:] + } else { + out, b = append(out, b[0]), b[1:] } } - - if len(b) > 0 { - return append(a, b...) + if len(a) > 0 { + return append(out, a...) } - return a + return append(out, b...) } // Sort methods @@ -686,52 +590,20 @@ func (a StringValues) Merge(b StringValues) StringValues { return append(b, a...) 
} - for i := 0; i < len(a) && len(b) > 0; i++ { - av, bv := a[i].UnixNano(), b[0].UnixNano() - // Value in a is greater than B, we need to merge - if av > bv { - // Save value in a - temp := a[i] - - // Overwrite a with b - a[i] = b[0] - - // Slide all values of b down 1 - copy(b, b[1:]) - b = b[:len(b)-1] - - var k int - if len(b) > 0 && av > b[len(b)-1].UnixNano() { - // Fast path where a is after b, we skip the search - k = len(b) - } else { - // See where value we save from a should be inserted in b to keep b sorted - k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) - } - - if k == len(b) { - // Last position? - b = append(b, temp) - } else if b[k].UnixNano() != temp.UnixNano() { - // Save the last element, since it will get overwritten - last := b[len(b)-1] - // Somewhere in the middle of b, insert it only if it's not a duplicate - copy(b[k+1:], b[k:]) - // Add the last vale to the end - b = append(b, last) - b[k] = temp - } - } else if av == bv { - // Value in a an b are the same, use b - a[i] = b[0] - b = b[1:] + out := make(StringValues, 0, len(a)+len(b)) + for len(a) > 0 && len(b) > 0 { + if a[0].UnixNano() < b[0].UnixNano() { + out, a = append(out, a[0]), a[1:] + } else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { + a = a[1:] + } else { + out, b = append(out, b[0]), b[1:] } } - - if len(b) > 0 { - return append(a, b...) + if len(a) > 0 { + return append(out, a...) } - return a + return append(out, b...) } // Sort methods @@ -868,52 +740,20 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues { return append(b, a...) 
} - for i := 0; i < len(a) && len(b) > 0; i++ { - av, bv := a[i].UnixNano(), b[0].UnixNano() - // Value in a is greater than B, we need to merge - if av > bv { - // Save value in a - temp := a[i] - - // Overwrite a with b - a[i] = b[0] - - // Slide all values of b down 1 - copy(b, b[1:]) - b = b[:len(b)-1] - - var k int - if len(b) > 0 && av > b[len(b)-1].UnixNano() { - // Fast path where a is after b, we skip the search - k = len(b) - } else { - // See where value we save from a should be inserted in b to keep b sorted - k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) - } - - if k == len(b) { - // Last position? - b = append(b, temp) - } else if b[k].UnixNano() != temp.UnixNano() { - // Save the last element, since it will get overwritten - last := b[len(b)-1] - // Somewhere in the middle of b, insert it only if it's not a duplicate - copy(b[k+1:], b[k:]) - // Add the last vale to the end - b = append(b, last) - b[k] = temp - } - } else if av == bv { - // Value in a an b are the same, use b - a[i] = b[0] - b = b[1:] + out := make(BooleanValues, 0, len(a)+len(b)) + for len(a) > 0 && len(b) > 0 { + if a[0].UnixNano() < b[0].UnixNano() { + out, a = append(out, a[0]), a[1:] + } else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { + a = a[1:] + } else { + out, b = append(out, b[0]), b[1:] } } - - if len(b) > 0 { - return append(a, b...) + if len(a) > 0 { + return append(out, a...) } - return a + return append(out, b...) } // Sort methods diff --git a/tsdb/engine/tsm1/encoding.gen.go.tmpl b/tsdb/engine/tsm1/encoding.gen.go.tmpl index 4d3fb50142..5475cbf28d 100644 --- a/tsdb/engine/tsm1/encoding.gen.go.tmpl +++ b/tsdb/engine/tsm1/encoding.gen.go.tmpl @@ -137,52 +137,20 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values { return append(b, a...) 
} - for i := 0; i < len(a) && len(b) > 0; i++ { - av, bv := a[i].UnixNano(), b[0].UnixNano() - // Value in a is greater than B, we need to merge - if av > bv { - // Save value in a - temp := a[i] - - // Overwrite a with b - a[i] = b[0] - - // Slide all values of b down 1 - copy(b, b[1:]) - b = b[:len(b)-1] - - var k int - if len(b) > 0 && av > b[len(b)-1].UnixNano() { - // Fast path where a is after b, we skip the search - k = len(b) - } else { - // See where value we save from a should be inserted in b to keep b sorted - k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) - } - - if k == len(b) { - // Last position? - b = append(b, temp) - } else if b[k].UnixNano() != temp.UnixNano() { - // Save the last element, since it will get overwritten - last := b[len(b)-1] - // Somewhere in the middle of b, insert it only if it's not a duplicate - copy(b[k+1:], b[k:]) - // Add the last vale to the end - b = append(b, last) - b[k] = temp - } - } else if av == bv { - // Value in a an b are the same, use b - a[i] = b[0] - b = b[1:] + out := make({{.Name}}Values, 0, len(a)+len(b)) + for len(a) > 0 && len(b) > 0 { + if a[0].UnixNano() < b[0].UnixNano() { + out, a = append(out, a[0]), a[1:] + } else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { + a = a[1:] + } else { + out, b = append(out, b[0]), b[1:] } } - - if len(b) > 0 { - return append(a, b...) + if len(a) > 0 { + return append(out, a...) } - return a + return append(out, b...) 
} // Sort methods From 3ba65a365053b58434409d4987a60540a77f2162 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Wed, 15 Mar 2017 08:44:48 -0700 Subject: [PATCH 03/11] Mention config settings in PR template --- .github/PULL_REQUEST_TEMPLATE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index b2416b059c..5cfe48f3cc 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -9,4 +9,5 @@ _You can erase any checkboxes below this note if they are not applicable to your - [ ] [InfluxQL Spec](https://github.com/influxdata/influxdb/blob/master/influxql/README.md) updated - [ ] Provide example syntax - [ ] Update man page when modifying a command +- [ ] Config changes: update sample config (`etc/config.sample.toml`), server `NewDemoConfig` method, and `Diagnostics` methods reporting config settings, if necessary - [ ] [InfluxData Documentation](https://github.com/influxdata/docs.influxdata.com): issue filed or pull request submitted \ From 208d8507f1a231c1fc6e1dd22a28b5cddef21b12 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 27 Feb 2017 10:57:55 -0600 Subject: [PATCH 04/11] Implement both single and multiline comments in influxql A single line comment will read until the end of a line and is started with `--` (just like SQL). A multiline comment is with `/* */`. You cannot nest multiline comments. --- CHANGELOG.md | 1 + influxql/README.md | 21 ++++++++++++++++ influxql/parser.go | 10 +++++--- influxql/parser_test.go | 55 +++++++++++++++++++++++++++++++++++++++++ influxql/scanner.go | 43 ++++++++++++++++++++++++++++++++ influxql/token.go | 1 + 6 files changed, 127 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a0a84296e..1c1747c969 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - [#7553](https://github.com/influxdata/influxdb/issues/7553): Add modulo operator to the query language. 
- [#7856](https://github.com/influxdata/influxdb/issues/7856): Failed points during an import now result in a non-zero exit code. - [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS +- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL. ## v1.2.2 [2017-03-14] diff --git a/influxql/README.md b/influxql/README.md index e577b74af2..9da91cf5de 100644 --- a/influxql/README.md +++ b/influxql/README.md @@ -36,6 +36,27 @@ Notation operators in order of increasing precedence: {} repetition (0 to n times) ``` +## Comments + +Both single and multiline comments are supported. A comment is treated +the same as whitespace by the parser. + +``` +-- single line comment +/* + multiline comment +*/ +``` + +Single line comments will skip all text until the scanner hits a +newline. Multiline comments will skip all text until the end comment +marker is hit. Nested multiline comments are not supported so the +following does not work: + +``` +/* /* this does not work */ */ +``` + ## Query representation ### Characters diff --git a/influxql/parser.go b/influxql/parser.go index fd79d9e852..2a1be3edb6 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -2686,13 +2686,15 @@ func (p *Parser) parseResample() (time.Duration, time.Duration, error) { // scan returns the next token from the underlying scanner. func (p *Parser) scan() (tok Token, pos Pos, lit string) { return p.s.Scan() } -// scanIgnoreWhitespace scans the next non-whitespace token. +// scanIgnoreWhitespace scans the next non-whitespace and non-comment token. func (p *Parser) scanIgnoreWhitespace() (tok Token, pos Pos, lit string) { - tok, pos, lit = p.scan() - if tok == WS { + for { tok, pos, lit = p.scan() + if tok == WS || tok == COMMENT { + continue + } + return } - return } // consumeWhitespace scans the next token if it's whitespace. 
diff --git a/influxql/parser_test.go b/influxql/parser_test.go index f3af57742e..d8064e2670 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -43,6 +43,29 @@ func TestParser_ParseQuery_Empty(t *testing.T) { } } +// Ensure the parser will skip comments. +func TestParser_ParseQuery_SkipComments(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT * FROM cpu; -- read from cpu database + +/* create continuous query */ +CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN + SELECT mean(*) INTO db1..:MEASUREMENT FROM cpu GROUP BY time(5m) +END; + +/* just a multline comment +what is this doing here? +**/ + +-- should ignore the trailing multiline comment /* +SELECT mean(value) FROM gpu; +-- trailing comment at the end`) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } else if len(q.Statements) != 3 { + t.Fatalf("unexpected statement count: %d", len(q.Statements)) + } +} + // Ensure the parser can return an error from an malformed statement. func TestParser_ParseQuery_ParseError(t *testing.T) { _, err := influxql.NewParser(strings.NewReader(`SELECT`)).ParseQuery() @@ -1303,6 +1326,38 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // select statements with intertwined comments + { + s: `SELECT "user" /*, system, idle */ FROM cpu`, + stmt: &influxql.SelectStatement{ + IsRawQuery: true, + Fields: []*influxql.Field{ + {Expr: &influxql.VarRef{Val: "user"}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + }, + }, + + { + s: `SELECT /foo\/*bar/ FROM /foo\/*bar*/ WHERE x = 1`, + stmt: &influxql.SelectStatement{ + IsRawQuery: true, + Fields: []*influxql.Field{ + {Expr: &influxql.RegexLiteral{Val: regexp.MustCompile(`foo/*bar`)}}, + }, + Sources: []influxql.Source{ + &influxql.Measurement{ + Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`foo/*bar*`)}, + }, + }, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "x"}, + RHS: &influxql.IntegerLiteral{Val: 1}, + }, + }, + }, + // See 
issues https://github.com/influxdata/influxdb/issues/1647 // and https://github.com/influxdata/influxdb/issues/4404 // DELETE statement diff --git a/influxql/scanner.go b/influxql/scanner.go index 1dc26ce4e8..a4bda50ff7 100644 --- a/influxql/scanner.go +++ b/influxql/scanner.go @@ -64,6 +64,15 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) { case '*': return MUL, pos, "" case '/': + ch1, _ := s.r.read() + if ch1 == '*' { + if err := s.skipUntilEndComment(); err != nil { + return ILLEGAL, pos, "" + } + return COMMENT, pos, "" + } else { + s.r.unread() + } return DIV, pos, "" case '%': return MOD, pos, "" @@ -137,6 +146,36 @@ func (s *Scanner) scanWhitespace() (tok Token, pos Pos, lit string) { return WS, pos, buf.String() } +// skipUntilNewline skips characters until it reaches a newline. +func (s *Scanner) skipUntilNewline() { + for { + if ch, _ := s.r.read(); ch == '\n' || ch == eof { + return + } + } +} + +// skipUntilEndComment skips characters until it reaches a '*/' symbol. +func (s *Scanner) skipUntilEndComment() error { + for { + if ch1, _ := s.r.read(); ch1 == '*' { + // We might be at the end. + star: + ch2, _ := s.r.read() + if ch2 == '/' { + return nil + } else if ch2 == '*' { + // We are back in the state machine since we see a star. + goto star + } else if ch2 == eof { + return io.EOF + } + } else if ch1 == eof { + return io.EOF + } + } +} + func (s *Scanner) scanIdent(lookup bool) (tok Token, pos Pos, lit string) { // Save the starting position of the identifier. _, pos = s.r.read() @@ -230,6 +269,10 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) { } else if ch == '+' { return ADD, pos, "" } else if ch == '-' { + if ch1 == '-' { + s.skipUntilNewline() + return COMMENT, pos, "" + } return SUB, pos, "" } } else if ch == '.' 
{ diff --git a/influxql/token.go b/influxql/token.go index a2e17b04cb..4c67ef3af6 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -13,6 +13,7 @@ const ( ILLEGAL Token = iota EOF WS + COMMENT literalBeg // IDENT and the following are InfluxQL literal tokens. From 7bd1bd8ab394fbb6e3bd03e323cdb9c91ae6cf90 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 10 Mar 2017 15:48:15 -0700 Subject: [PATCH 05/11] Only calculate disk size if shard has changed Calling DiskSize can be expensive with many shards. Since the stats collection runs this every 10s by default, it can be expensive and wasteful to calculate the stats when nothing has changed. This avoids re-calculating the shard size unless something has changed. --- tsdb/shard.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tsdb/shard.go b/tsdb/shard.go index 00e89f746c..f9e7b3c7d0 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -914,17 +914,28 @@ func (s *Shard) monitor() { defer t.Stop() t2 := time.NewTicker(time.Minute) defer t2.Stop() + var changed time.Time + for { select { case <-s.closing: return case <-t.C: + + // Checking DiskSize can be expensive with a lot of shards and TSM files, only + // check if something has changed. + lm := s.LastModified() + if lm.Equal(changed) { + continue + } + size, err := s.DiskSize() if err != nil { s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err)) continue } atomic.StoreInt64(&s.stats.DiskBytes, size) + changed = lm case <-t2.C: if s.options.Config.MaxValuesPerTag == 0 { continue From e9eb925170ba25082939d03882bf0c6848e13eeb Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 10 Mar 2017 10:10:19 -0700 Subject: [PATCH 06/11] Coalesce multiple WAL fsyncs Fsyncs to the WAL can cause higher IO with lots of small writes or slower disks. This reworks the previous wal fsyncing to remove the extra goroutine and remove the hard-coded 100ms delay.
Writes to the wal still maintain the invariant that they do not return to the caller until the write is fsync'd. This also adds a new config options wal-fsync-delay (default 0s) which can be increased if a delay is desired. This is somewhat useful for system with slower disks, but the current default works well as is. --- CHANGELOG.md | 1 + etc/config.sample.toml | 6 ++++++ tsdb/config.go | 5 +++++ tsdb/config_test.go | 6 ++++++ tsdb/engine/tsm1/engine.go | 2 ++ tsdb/engine/tsm1/wal.go | 33 +++++++++++++++++++++++++-------- 6 files changed, 45 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c1747c969..54043460b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features +- [#8143](https://github.com/influxdata/influxdb/pull/8143): Add WAL sync delay - [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2 - [#7974](https://github.com/influxdata/influxdb/pull/7974): Allow non-admin users to execute SHOW DATABASES. - [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 4df63186b2..20e2d27ae3 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -49,6 +49,12 @@ # The directory where the TSM storage engine stores WAL files. wal-dir = "/var/lib/influxdb/wal" + # The amount of time that a write will wait before fsyncing. A duration + # greater than 0 can be used to batch up multiple fsync calls. This is useful for slower + # disks or when WAL write contention is seen. A value of 0s fsyncs every write to the WAL. + # Values in the range of 0-100ms are recommended for non-SSD disks. + # wal-fsync-delay = "0s" + # Trace logging provides more verbose output around the tsm engine. Turning # this on can provide more useful output for debugging tsm engine issues. 
# trace-logging-enabled = false diff --git a/tsdb/config.go b/tsdb/config.go index 213c8cfb15..46b1c12171 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -53,6 +53,11 @@ type Config struct { // General WAL configuration options WALDir string `toml:"wal-dir"` + // WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration + // greater than 0 can be used to batch up multiple fsync calls. This is useful for slower + // disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL. + WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"` + // Query logging QueryLogEnabled bool `toml:"query-log-enabled"` diff --git a/tsdb/config_test.go b/tsdb/config_test.go index 7af94f90f9..13a58c358c 100644 --- a/tsdb/config_test.go +++ b/tsdb/config_test.go @@ -2,6 +2,7 @@ package tsdb_test import ( "testing" + "time" "github.com/BurntSushi/toml" "github.com/influxdata/influxdb/tsdb" @@ -13,6 +14,7 @@ func TestConfig_Parse(t *testing.T) { if _, err := toml.Decode(` dir = "/var/lib/influxdb/data" wal-dir = "/var/lib/influxdb/wal" +wal-fsync-delay = "10s" `, &c); err != nil { t.Fatal(err) } @@ -27,6 +29,10 @@ wal-dir = "/var/lib/influxdb/wal" if got, exp := c.WALDir, "/var/lib/influxdb/wal"; got != exp { t.Errorf("unexpected wal-dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got) } + if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() { + t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got) + } + } func TestConfig_Validate_Error(t *testing.T) { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 46e1994859..67eb5448ac 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -131,6 +131,8 @@ type Engine struct { // NewEngine returns a new instance of Engine. 
func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine { w := NewWAL(walPath) + w.syncDelay = time.Duration(opt.Config.WALFsyncDelay) + fs := NewFileStore(path) cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index b6255e831c..cb4541b4e2 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -90,6 +90,12 @@ type WAL struct { closing chan struct{} // goroutines waiting for the next fsync syncWaiters chan chan error + syncCount uint64 + + // syncDelay sets the duration to wait before fsyncing writes. A value of 0 (default) + // will cause every write to be fsync'd. This must be set before the WAL + // is opened if a non-default value is required. + syncDelay time.Duration // WALOutput is the writer used by the logger. logger zap.Logger // Logger to be used for important messages @@ -221,30 +227,39 @@ func (l *WAL) Open() error { atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize) l.closing = make(chan struct{}) - go l.syncPeriodically() return nil } -func (l *WAL) syncPeriodically() { - t := time.NewTicker(100 * time.Millisecond) - defer t.Stop() - for { +// sync will schedule an fsync to the current wal segment and notify any +// waiting goroutines. If an fsync is already scheduled, subsequent calls will +// not schedule a new fsync and will be handled by the existing scheduled fsync. +func (l *WAL) sync() { + // If we're not the first to sync, then another goroutine is fsyncing the wal for us. 
+ if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) { + return + } + + // Fsync the wal and notify all pending waiters + go func() { + t := time.NewTimer(l.syncDelay) select { case <-t.C: if len(l.syncWaiters) > 0 { l.mu.Lock() err := l.currentSegmentWriter.sync() - for i := 0; i < len(l.syncWaiters); i++ { + for len(l.syncWaiters) > 0 { errC := <-l.syncWaiters errC <- err } l.mu.Unlock() } case <-l.closing: - return + t.Stop() } - } + + atomic.StoreUint64(&l.syncCount, 0) + }() } // WritePoints writes the given points to the WAL. It returns the WAL segment ID to @@ -389,6 +404,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) { return segID, err } + // schedule an fsync and wait for it to complete + l.sync() return segID, <-syncErr } From 27ae2929fcf369c9d10503d5e650548db1c6a709 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 15 Mar 2017 12:31:36 -0600 Subject: [PATCH 07/11] Add wal-fsync-delay to Diagnostics --- tsdb/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tsdb/config.go b/tsdb/config.go index 46b1c12171..c3c9c2045c 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -128,6 +128,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) { return diagnostics.RowFromMap(map[string]interface{}{ "dir": c.Dir, "wal-dir": c.WALDir, + "wal-fsync-delay": c.WALFsyncDelay, "cache-max-memory-size": c.CacheMaxMemorySize, "cache-snapshot-memory-size": c.CacheSnapshotMemorySize, "cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration, From 9cdfdd04e947e03c78d30dc070b08b874411eb4a Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 6 Mar 2017 16:06:51 -0600 Subject: [PATCH 08/11] Do not increment the continuous query statistic if no query is run Instead of incrementing the `queryOk` statistic with or without the continuous query running, it will only increment when the query is actually executed. 
--- CHANGELOG.md | 4 ++ services/continuous_querier/service.go | 48 +++++++++------------ services/continuous_querier/service_test.go | 12 ++---- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 54043460b0..2a28a8dc41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ - [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS - [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL. +### Bugfixes + +- [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run. + ## v1.2.2 [2017-03-14] ### Release Notes diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index ef237069dc..10f7e90615 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -247,25 +247,25 @@ func (s *Service) runContinuousQueries(req *RunRequest) { if !req.matches(&cq) { continue } - if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil { + if ok, err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil { s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err)) atomic.AddInt64(&s.stats.QueryFail, 1) - } else { + } else if ok { atomic.AddInt64(&s.stats.QueryOK, 1) } } } } -// ExecuteContinuousQuery executes a single CQ. -func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error { +// ExecuteContinuousQuery may execute a single CQ. This will return false if there were no errors and the CQ was not run. +func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) (bool, error) { // TODO: re-enable stats //s.stats.Inc("continuousQueryExecuted") // Local wrapper / helper. 
cq, err := NewContinuousQuery(dbi.Name, cqi) if err != nil { - return err + return false, err } // Get the last time this CQ was run from the service's cache. @@ -279,26 +279,26 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti cq.setIntoRP(dbi.DefaultRetentionPolicy) } - // See if this query needs to be run. - run, nextRun, err := cq.shouldRunContinuousQuery(now) - if err != nil { - return err - } else if !run { - return nil - } - // Get the group by interval. interval, err := cq.q.GroupByInterval() if err != nil { - return err + return false, err } else if interval == 0 { - return nil + return false, nil } // Get the group by offset. offset, err := cq.q.GroupByOffset() if err != nil { - return err + return false, err + } + + // See if this query needs to be run. + run, nextRun, err := cq.shouldRunContinuousQuery(now, interval) + if err != nil { + return false, err + } else if !run { + return false, nil } resampleEvery := interval @@ -333,12 +333,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset) if !endTime.After(startTime) { // Exit early since there is no time interval. - return nil + return false, nil } if err := cq.q.SetTimeRange(startTime, endTime); err != nil { s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err)) - return err + return false, err } var start time.Time @@ -350,13 +350,13 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti // Do the actual processing of the query & writing of results. if err := s.runContinuousQueryAndWriteResult(cq); err != nil { s.Logger.Info(fmt.Sprintf("error: %s. 
running: %s\n", err, cq.q.String())) - return err + return false, err } if s.loggingEnabled { s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Since(start))) } - return nil + return true, nil } // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in @@ -441,18 +441,12 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin // shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the // lastRunTime of the CQ and the rules for when to run set through the query to determine // if this CQ should be run. -func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time) (bool, time.Time, error) { +func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time, interval time.Duration) (bool, time.Time, error) { // if it's not aggregated we don't run it if cq.q.IsRawQuery { return false, cq.LastRun, errors.New("continuous queries must be aggregate queries") } - // since it's aggregated we need to figure how often it should be run - interval, err := cq.q.GroupByInterval() - if err != nil { - return false, cq.LastRun, err - } - // allow the interval to be overwritten by the query's resample options resampleEvery := interval if cq.Resample.Every != 0 { diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 6a38737e3d..72158f876e 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -340,22 +340,19 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { cqi := dbi.ContinuousQueries[0] cqi.Query = `this is not a query` - err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) - if err == nil { + if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil { t.Error("expected error but got nil") } // Valid query but invalid continuous query. 
cqi.Query = `SELECT * FROM cpu` - err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) - if err == nil { + if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil { t.Error("expected error but got nil") } // Group by requires aggregate. cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)` - err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) - if err == nil { + if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil { t.Error("expected error but got nil") } } @@ -374,8 +371,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { cqi := dbi.ContinuousQueries[0] now := time.Now().Truncate(10 * time.Minute) - err := s.ExecuteContinuousQuery(&dbi, &cqi, now) - if err != errExpected { + if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); err != errExpected { t.Errorf("exp = %s, got = %v", errExpected, err) } } From 5072db40c25e979839920e6833a967fee40b1bec Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Wed, 8 Mar 2017 13:54:18 -0600 Subject: [PATCH 09/11] Forbid wildcards in binary expressions When rewriting fields, wildcards within binary expressions were skipped. This now throws an error whenever it finds a wildcard within a binary expression in order to prevent the panic that occurs. --- CHANGELOG.md | 1 + influxql/ast.go | 20 ++++++++++++++++++ influxql/ast_test.go | 50 ++++++++++++++++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a28a8dc41..8a30a45999 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ ### Bugfixes - [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run. +- [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions. 
## v1.2.2 [2017-03-14] diff --git a/influxql/ast.go b/influxql/ast.go index 8a86b343d7..d1e3ee1584 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1273,6 +1273,26 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error) Alias: fmt.Sprintf("%s_%s", f.Name(), ref.Val), }) } + case *BinaryExpr: + // Search for regexes or wildcards within the binary + // expression. If we find any, throw an error indicating that + // it's illegal. + var regex, wildcard bool + WalkFunc(expr, func(n Node) { + switch n.(type) { + case *RegexLiteral: + regex = true + case *Wildcard: + wildcard = true + } + }) + + if wildcard { + return nil, fmt.Errorf("unsupported expression with wildcard: %s", f.Expr) + } else if regex { + return nil, fmt.Errorf("unsupported expression with regex field: %s", f.Expr) + } + rwFields = append(rwFields, f) default: rwFields = append(rwFields, f) } diff --git a/influxql/ast_test.go b/influxql/ast_test.go index 74e6779c8a..646f73a924 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -326,6 +326,7 @@ func TestSelectStatement_RewriteFields(t *testing.T) { var tests = []struct { stmt string rewrite string + err string }{ // No wildcards { @@ -462,6 +463,35 @@ func TestSelectStatement_RewriteFields(t *testing.T) { stmt: `SELECT * FROM (SELECT mean(value1) FROM cpu GROUP BY host) GROUP BY *`, rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host`, }, + + // Invalid queries that can't be rewritten should return an error (to + // avoid a panic in the query engine) + { + stmt: `SELECT count(*) / 2 FROM cpu`, + err: `unsupported expression with wildcard: count(*) / 2`, + }, + + { + stmt: `SELECT * / 2 FROM (SELECT count(*) FROM cpu)`, + err: `unsupported expression with wildcard: * / 2`, + }, + + { + stmt: `SELECT count(/value/) / 2 FROM cpu`, + err: `unsupported expression with regex field: count(/value/) / 2`, + }, + + // This one should be possible though since there's no 
wildcard in the + // binary expression. + { + stmt: `SELECT value1 + value2, * FROM cpu`, + rewrite: `SELECT value1::float + value2::integer, host::tag, region::tag, value1::float, value2::integer FROM cpu`, + }, + + { + stmt: `SELECT value1 + value2, /value/ FROM cpu`, + rewrite: `SELECT value1::float + value2::integer, value1::float, value2::integer FROM cpu`, + }, } for i, tt := range tests { @@ -496,12 +526,20 @@ func TestSelectStatement_RewriteFields(t *testing.T) { // Rewrite statement. rw, err := stmt.(*influxql.SelectStatement).RewriteFields(&ic) - if err != nil { - t.Errorf("%d. %q: error: %s", i, tt.stmt, err) - } else if rw == nil { - t.Errorf("%d. %q: unexpected nil statement", i, tt.stmt) - } else if rw := rw.String(); tt.rewrite != rw { - t.Errorf("%d. %q: unexpected rewrite:\n\nexp=%s\n\ngot=%s\n\n", i, tt.stmt, tt.rewrite, rw) + if tt.err != "" { + if err != nil && err.Error() != tt.err { + t.Errorf("%d. %q: unexpected error: %s != %s", i, tt.stmt, err.Error(), tt.err) + } else if err == nil { + t.Errorf("%d. %q: expected error", i, tt.stmt) + } + } else { + if err != nil { + t.Errorf("%d. %q: error: %s", i, tt.stmt, err) + } else if rw == nil && tt.err == "" { + t.Errorf("%d. %q: unexpected nil statement", i, tt.stmt) + } else if rw := rw.String(); tt.rewrite != rw { + t.Errorf("%d. %q: unexpected rewrite:\n\nexp=%s\n\ngot=%s\n\n", i, tt.stmt, tt.rewrite, rw) + } } } } From 41c8370bbc550ecfbf2652872bbc7d91fa573c9e Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 16 Mar 2017 15:51:23 -0500 Subject: [PATCH 10/11] Fix fill(linear) when multiple series exist and there are null values When there were multiple series and anything other than the last series had any null values, the series would start using the first point from the next series to interpolate points. Interpolation should not cross between series. Now, the linear fill checks to make sure the next point is within the same series before using it to perform interpolation. 
--- CHANGELOG.md | 1 + influxql/iterator.gen.go | 6 +-- influxql/iterator.gen.go.tmpl | 3 +- influxql/select_test.go | 72 +++++++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a30a45999..10e742465d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run. - [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions. +- [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values. ## v1.2.2 [2017-03-14] diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index aa82c9104a..acf2a8a592 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -680,8 +680,7 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { next, err := itr.input.peek() if err != nil { return nil, err - } - if next != nil { + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { interval := int64(itr.opt.Interval.Duration) start := itr.window.time / interval p.Value = linearFloat(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) @@ -3338,8 +3337,7 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { next, err := itr.input.peek() if err != nil { return nil, err - } - if next != nil { + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { interval := int64(itr.opt.Interval.Duration) start := itr.window.time / interval p.Value = linearInteger(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 5269fb9b28..08e39457f9 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -679,8 +679,7 
@@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { next, err := itr.input.peek() if err != nil { return nil, err - } - if next != nil { + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { interval := int64(itr.opt.Interval.Duration) start := itr.window.time / interval p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) diff --git a/influxql/select_test.go b/influxql/select_test.go index d738207a15..62b7aafd2d 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -1291,6 +1291,42 @@ func TestSelect_Fill_Linear_Float_Many(t *testing.T) { } } +func TestSelect_Fill_Linear_Float_MultipleSeries(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4}, + }}, opt) + } + + // Execute selection. 
+ itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Ensure a SELECT query with a fill(linear) statement can be executed for integers. 
func TestSelect_Fill_Linear_Integer_One(t *testing.T) { var ic IteratorCreator @@ -1354,6 +1390,42 @@ func TestSelect_Fill_Linear_Integer_Many(t *testing.T) { } } +func TestSelect_Fill_Linear_Integer_MultipleSeries(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return influxql.NewCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4}, + }}, opt) + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}}, + 
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Ensure a SELECT stddev() query can be executed. func TestSelect_Stddev_Float(t *testing.T) { var ic IteratorCreator From 5fba1bdcd3ce697868d17eb9bebd31dbe2a4890e Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 17 Mar 2017 08:39:48 -0500 Subject: [PATCH 11/11] Update liner dependency to handle docker exec The liner dependency now handles the scenario where the terminal width is reported as zero. Previously, liner would panic when it tried to divide by the width (which was zero). Now it falls back onto a dumb prompt rather than attempting to use a smart prompt and panicking. --- CHANGELOG.md | 1 + Godeps | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10e742465d..25a37ac197 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run. - [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions. - [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values. +- [#7995](https://github.com/influxdata/influxdb/issues/7995): Update liner dependency to handle docker exec. 
## v1.2.2 [2017-03-14] diff --git a/Godeps b/Godeps index 0229a298ea..f247cdf8e3 100644 --- a/Godeps +++ b/Godeps @@ -12,7 +12,7 @@ github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380 github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 -github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073 +github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6