diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index b2416b059c..5cfe48f3cc 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -9,4 +9,5 @@ _You can erase any checkboxes below this note if they are not applicable to your
 - [ ] [InfluxQL Spec](https://github.com/influxdata/influxdb/blob/master/influxql/README.md) updated
 - [ ] Provide example syntax
 - [ ] Update man page when modifying a command
+- [ ] Config changes: update sample config (`etc/config.sample.toml`), server `NewDemoConfig` method, and `Diagnostics` methods reporting config settings, if necessary
 - [ ] [InfluxData Documentation](https://github.com/influxdata/docs.influxdata.com): issue filed or pull request submitted
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8a0a84296e..25a37ac197 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 
 ### Features
 
+- [#8143](https://github.com/influxdata/influxdb/pull/8143): Add WAL sync delay
 - [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2
 - [#7974](https://github.com/influxdata/influxdb/pull/7974): Allow non-admin users to execute SHOW DATABASES.
 - [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests
@@ -9,6 +10,14 @@
 - [#7553](https://github.com/influxdata/influxdb/issues/7553): Add modulo operator to the query language.
 - [#7856](https://github.com/influxdata/influxdb/issues/7856): Failed points during an import now result in a non-zero exit code.
 - [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS
+- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL.
+
+### Bugfixes
+
+- [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run.
+- [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions.
+- [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values.
+- [#7995](https://github.com/influxdata/influxdb/issues/7995): Update liner dependency to handle docker exec.
 
 ## v1.2.2 [2017-03-14]
 
diff --git a/Godeps b/Godeps
index 34ee614e18..bf1125aa8c 100644
--- a/Godeps
+++ b/Godeps
@@ -13,7 +13,7 @@ github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
 github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
 github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
-github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073
+github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
 github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6
 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
 github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index 4df63186b2..20e2d27ae3 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -49,6 +49,12 @@
   # The directory where the TSM storage engine stores WAL files.
   wal-dir = "/var/lib/influxdb/wal"
 
+  # The amount of time that a write will wait before fsyncing. A duration
+  # greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
+  # disks or when WAL write contention is seen. A value of 0s fsyncs every write to the WAL.
+  # Values in the range of 0-100ms are recommended for non-SSD disks.
+  # wal-fsync-delay = "0s"
+
   # Trace logging provides more verbose output around the tsm engine. Turning
   # this on can provide more useful output for debugging tsm engine issues.
   # trace-logging-enabled = false
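A quick way to sanity-check the new setting end to end is to decode it into `tsdb.Config`, which is exactly what the `tsdb/config_test.go` change further down in this diff does. A minimal sketch, assuming the vendored `github.com/BurntSushi/toml` package and the `tsdb` package from this repository:

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	var c tsdb.Config
	if _, err := toml.Decode(`
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "100ms"
`, &c); err != nil {
		log.Fatal(err)
	}

	// WALFsyncDelay is influxdb's toml.Duration wrapper; convert for display.
	fmt.Println(time.Duration(c.WALFsyncDelay)) // 100ms
}
```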
diff --git a/influxql/README.md b/influxql/README.md
index e577b74af2..9da91cf5de 100644
--- a/influxql/README.md
+++ b/influxql/README.md
@@ -36,6 +36,27 @@ Notation operators in order of increasing precedence:
 {}  repetition (0 to n times)
 ```
 
+## Comments
+
+Both single and multiline comments are supported. A comment is treated
+the same as whitespace by the parser.
+
+```
+-- single line comment
+/*
+    multiline comment
+*/
+```
+
+Single line comments will skip all text until the scanner hits a
+newline. Multiline comments will skip all text until the end comment
+marker is hit. Nested multiline comments are not supported, so the
+following does not work:
+
+```
+/* /* this does not work */ */
+```
+
 ## Query representation
 
 ### Characters
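Since the scanner emits comments as ordinary skippable tokens (see the `scanner.go` and `parser.go` hunks below), a commented query parses exactly like its uncommented form. A small usage sketch along the lines of the parser test added later in this diff:

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/influxql"
)

func main() {
	q, err := influxql.ParseQuery(`
-- single line comments run to the end of the line
SELECT mean(value) /* inline comment */ FROM cpu;
`)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(q.Statements)) // 1
	fmt.Println(q.Statements[0])   // SELECT mean(value) FROM cpu
}
```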
diff --git a/influxql/ast.go b/influxql/ast.go
index 8a86b343d7..d1e3ee1584 100644
--- a/influxql/ast.go
+++ b/influxql/ast.go
@@ -1273,6 +1273,26 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error)
 						Alias: fmt.Sprintf("%s_%s", f.Name(), ref.Val),
 					})
 				}
+			case *BinaryExpr:
+				// Search for regexes or wildcards within the binary
+				// expression. If we find any, throw an error indicating that
+				// it's illegal.
+				var regex, wildcard bool
+				WalkFunc(expr, func(n Node) {
+					switch n.(type) {
+					case *RegexLiteral:
+						regex = true
+					case *Wildcard:
+						wildcard = true
+					}
+				})
+
+				if wildcard {
+					return nil, fmt.Errorf("unsupported expression with wildcard: %s", f.Expr)
+				} else if regex {
+					return nil, fmt.Errorf("unsupported expression with regex field: %s", f.Expr)
+				}
+				rwFields = append(rwFields, f)
 			default:
 				rwFields = append(rwFields, f)
 			}
diff --git a/influxql/ast_test.go b/influxql/ast_test.go
index 74e6779c8a..646f73a924 100644
--- a/influxql/ast_test.go
+++ b/influxql/ast_test.go
@@ -326,6 +326,7 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
 	var tests = []struct {
 		stmt    string
 		rewrite string
+		err     string
 	}{
 		// No wildcards
 		{
@@ -462,6 +463,35 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
 			stmt:    `SELECT * FROM (SELECT mean(value1) FROM cpu GROUP BY host) GROUP BY *`,
 			rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host`,
 		},
+
+		// Invalid queries that can't be rewritten should return an error (to
+		// avoid a panic in the query engine)
+		{
+			stmt: `SELECT count(*) / 2 FROM cpu`,
+			err:  `unsupported expression with wildcard: count(*) / 2`,
+		},
+
+		{
+			stmt: `SELECT * / 2 FROM (SELECT count(*) FROM cpu)`,
+			err:  `unsupported expression with wildcard: * / 2`,
+		},
+
+		{
+			stmt: `SELECT count(/value/) / 2 FROM cpu`,
+			err:  `unsupported expression with regex field: count(/value/) / 2`,
+		},
+
+		// This one should be possible though since there's no wildcard in the
+		// binary expression.
+		{
+			stmt:    `SELECT value1 + value2, * FROM cpu`,
+			rewrite: `SELECT value1::float + value2::integer, host::tag, region::tag, value1::float, value2::integer FROM cpu`,
+		},
+
+		{
+			stmt:    `SELECT value1 + value2, /value/ FROM cpu`,
+			rewrite: `SELECT value1::float + value2::integer, value1::float, value2::integer FROM cpu`,
+		},
 	}
 
 	for i, tt := range tests {
@@ -496,12 +526,20 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
 
 		// Rewrite statement.
 		rw, err := stmt.(*influxql.SelectStatement).RewriteFields(&ic)
-		if err != nil {
-			t.Errorf("%d. %q: error: %s", i, tt.stmt, err)
-		} else if rw == nil {
-			t.Errorf("%d. %q: unexpected nil statement", i, tt.stmt)
-		} else if rw := rw.String(); tt.rewrite != rw {
-			t.Errorf("%d. %q: unexpected rewrite:\n\nexp=%s\n\ngot=%s\n\n", i, tt.stmt, tt.rewrite, rw)
+		if tt.err != "" {
+			if err != nil && err.Error() != tt.err {
+				t.Errorf("%d. %q: unexpected error: %s != %s", i, tt.stmt, err.Error(), tt.err)
+			} else if err == nil {
+				t.Errorf("%d. %q: expected error", i, tt.stmt)
+			}
+		} else {
+			if err != nil {
+				t.Errorf("%d. %q: error: %s", i, tt.stmt, err)
+			} else if rw == nil && tt.err == "" {
+				t.Errorf("%d. %q: unexpected nil statement", i, tt.stmt)
+			} else if rw := rw.String(); tt.rewrite != rw {
+				t.Errorf("%d. %q: unexpected rewrite:\n\nexp=%s\n\ngot=%s\n\n", i, tt.stmt, tt.rewrite, rw)
+			}
 		}
 	}
 }
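The guard added to `RewriteFields` above is a plain tree walk, so the rejection of `count(*) / 2` can be reproduced in isolation. A sketch using the package's exported `ParseStatement` and `WalkFunc` helpers:

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/influxql"
)

func main() {
	stmt, err := influxql.ParseStatement(`SELECT count(*) / 2 FROM cpu`)
	if err != nil {
		log.Fatal(err)
	}
	sel := stmt.(*influxql.SelectStatement)

	// The same walk RewriteFields now performs on *BinaryExpr fields.
	var wildcard bool
	influxql.WalkFunc(sel.Fields[0].Expr, func(n influxql.Node) {
		if _, ok := n.(*influxql.Wildcard); ok {
			wildcard = true
		}
	})
	fmt.Println(wildcard) // true, so the rewrite returns an error
}
```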
diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go
index aa82c9104a..acf2a8a592 100644
--- a/influxql/iterator.gen.go
+++ b/influxql/iterator.gen.go
@@ -680,8 +680,7 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) {
 			next, err := itr.input.peek()
 			if err != nil {
 				return nil, err
-			}
-			if next != nil {
+			} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
 				interval := int64(itr.opt.Interval.Duration)
 				start := itr.window.time / interval
 				p.Value = linearFloat(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
@@ -3338,8 +3337,7 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) {
 			next, err := itr.input.peek()
 			if err != nil {
 				return nil, err
-			}
-			if next != nil {
+			} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
 				interval := int64(itr.opt.Interval.Duration)
 				start := itr.window.time / interval
 				p.Value = linearInteger(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl
index 5269fb9b28..08e39457f9 100644
--- a/influxql/iterator.gen.go.tmpl
+++ b/influxql/iterator.gen.go.tmpl
@@ -679,8 +679,7 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
 			next, err := itr.input.peek()
 			if err != nil {
 				return nil, err
-			}
-			if next != nil {
+			} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
 				interval := int64(itr.opt.Interval.Duration)
 				start := itr.window.time / interval
 				p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
diff --git a/influxql/parser.go b/influxql/parser.go
index fd79d9e852..2a1be3edb6 100644
--- a/influxql/parser.go
+++ b/influxql/parser.go
@@ -2686,13 +2686,15 @@ func (p *Parser) parseResample() (time.Duration, time.Duration, error) {
 // scan returns the next token from the underlying scanner.
 func (p *Parser) scan() (tok Token, pos Pos, lit string) { return p.s.Scan() }
 
-// scanIgnoreWhitespace scans the next non-whitespace token.
+// scanIgnoreWhitespace scans the next non-whitespace and non-comment token.
 func (p *Parser) scanIgnoreWhitespace() (tok Token, pos Pos, lit string) {
-	tok, pos, lit = p.scan()
-	if tok == WS {
+	for {
 		tok, pos, lit = p.scan()
+		if tok == WS || tok == COMMENT {
+			continue
+		}
+		return
 	}
-	return
 }
 
 // consumeWhitespace scans the next token if it's whitespace.
diff --git a/influxql/parser_test.go b/influxql/parser_test.go
index f3af57742e..d8064e2670 100644
--- a/influxql/parser_test.go
+++ b/influxql/parser_test.go
@@ -43,6 +43,29 @@ func TestParser_ParseQuery_Empty(t *testing.T) {
 	}
 }
 
+// Ensure the parser will skip comments.
+func TestParser_ParseQuery_SkipComments(t *testing.T) {
+	q, err := influxql.ParseQuery(`SELECT * FROM cpu; -- read from cpu database
+
+/* create continuous query */
+CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN
+	SELECT mean(*) INTO db1..:MEASUREMENT FROM cpu GROUP BY time(5m)
+END;
+
+/* just a multline comment
+what is this doing here?
+**/
+
+-- should ignore the trailing multiline comment /*
+SELECT mean(value) FROM gpu;
+-- trailing comment at the end`)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	} else if len(q.Statements) != 3 {
+		t.Fatalf("unexpected statement count: %d", len(q.Statements))
+	}
+}
+
 // Ensure the parser can return an error from an malformed statement.
 func TestParser_ParseQuery_ParseError(t *testing.T) {
 	_, err := influxql.NewParser(strings.NewReader(`SELECT`)).ParseQuery()
@@ -1303,6 +1326,38 @@ func TestParser_ParseStatement(t *testing.T) {
 			},
 		},
 
+		// select statements with intertwined comments
+		{
+			s: `SELECT "user" /*, system, idle */ FROM cpu`,
+			stmt: &influxql.SelectStatement{
+				IsRawQuery: true,
+				Fields: []*influxql.Field{
+					{Expr: &influxql.VarRef{Val: "user"}},
+				},
+				Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
+			},
+		},
+
+		{
+			s: `SELECT /foo\/*bar/ FROM /foo\/*bar*/ WHERE x = 1`,
+			stmt: &influxql.SelectStatement{
+				IsRawQuery: true,
+				Fields: []*influxql.Field{
+					{Expr: &influxql.RegexLiteral{Val: regexp.MustCompile(`foo/*bar`)}},
+				},
+				Sources: []influxql.Source{
+					&influxql.Measurement{
+						Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`foo/*bar*`)},
+					},
+				},
+				Condition: &influxql.BinaryExpr{
+					Op:  influxql.EQ,
+					LHS: &influxql.VarRef{Val: "x"},
+					RHS: &influxql.IntegerLiteral{Val: 1},
+				},
+			},
+		},
+
 		// See issues https://github.com/influxdata/influxdb/issues/1647
 		// and https://github.com/influxdata/influxdb/issues/4404
 		// DELETE statement
diff --git a/influxql/scanner.go b/influxql/scanner.go
index 1dc26ce4e8..a4bda50ff7 100644
--- a/influxql/scanner.go
+++ b/influxql/scanner.go
@@ -64,6 +64,15 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
 	case '*':
 		return MUL, pos, ""
 	case '/':
+		ch1, _ := s.r.read()
+		if ch1 == '*' {
+			if err := s.skipUntilEndComment(); err != nil {
+				return ILLEGAL, pos, ""
+			}
+			return COMMENT, pos, ""
+		} else {
+			s.r.unread()
+		}
 		return DIV, pos, ""
 	case '%':
 		return MOD, pos, ""
@@ -137,6 +146,36 @@ func (s *Scanner) scanWhitespace() (tok Token, pos Pos, lit string) {
 	return WS, pos, buf.String()
 }
 
+// skipUntilNewline skips characters until it reaches a newline.
+func (s *Scanner) skipUntilNewline() {
+	for {
+		if ch, _ := s.r.read(); ch == '\n' || ch == eof {
+			return
+		}
+	}
+}
+
+// skipUntilEndComment skips characters until it reaches a '*/' symbol.
+func (s *Scanner) skipUntilEndComment() error {
+	for {
+		if ch1, _ := s.r.read(); ch1 == '*' {
+			// We might be at the end.
+		star:
+			ch2, _ := s.r.read()
+			if ch2 == '/' {
+				return nil
+			} else if ch2 == '*' {
+				// We saw another star, so check the next character again.
+				goto star
+			} else if ch2 == eof {
+				return io.EOF
+			}
+		} else if ch1 == eof {
+			return io.EOF
+		}
+	}
+}
+
 func (s *Scanner) scanIdent(lookup bool) (tok Token, pos Pos, lit string) {
 	// Save the starting position of the identifier.
 	_, pos = s.r.read()
@@ -230,6 +269,10 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
 		} else if ch == '+' {
 			return ADD, pos, ""
 		} else if ch == '-' {
+			if ch1 == '-' {
+				s.skipUntilNewline()
+				return COMMENT, pos, ""
+			}
 			return SUB, pos, ""
 		}
 	} else if ch == '.' {
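The `goto star` above implements a two-character lookahead: after a `*`, another `*` must re-enter the same check so that runs like `**/` (which the parser test above feeds it) still terminate the comment. The same logic restated as a standalone sketch, not the scanner's actual reader API:

```go
package main

import (
	"fmt"
	"strings"
)

// skipUntilEndComment reports whether a `*/` terminator was found,
// consuming the reader up to and including it. Tracking the previous
// rune is equivalent to the goto-based state machine in the scanner.
func skipUntilEndComment(r *strings.Reader) bool {
	prevStar := false
	for {
		ch, _, err := r.ReadRune()
		if err != nil {
			return false // unterminated comment (io.EOF in the real code)
		}
		if prevStar && ch == '/' {
			return true
		}
		prevStar = ch == '*'
	}
}

func main() {
	fmt.Println(skipUntilEndComment(strings.NewReader(" a comment **/ SELECT 1"))) // true
	fmt.Println(skipUntilEndComment(strings.NewReader(" never closed *")))         // false
}
```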
diff --git a/influxql/select_test.go b/influxql/select_test.go
index d738207a15..62b7aafd2d 100644
--- a/influxql/select_test.go
+++ b/influxql/select_test.go
@@ -1291,6 +1291,42 @@ func TestSelect_Fill_Linear_Float_Many(t *testing.T) {
 	}
 }
 
+func TestSelect_Fill_Linear_Float_MultipleSeries(t *testing.T) {
+	var ic IteratorCreator
+	ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
+		if m.Name != "cpu" {
+			t.Fatalf("unexpected source: %s", m.Name)
+		}
+		return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
+			{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
+			{Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4},
+		}}, opt)
+	}
+
+	// Execute selection.
+	itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
+	if err != nil {
+		t.Fatal(err)
+	} else if a, err := Iterators(itrs).ReadAll(); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	} else if !deep.Equal(a, [][]influxql.Point{
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}},
+		{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}},
+	}) {
+		t.Fatalf("unexpected points: %s", spew.Sdump(a))
+	}
+}
+
 // Ensure a SELECT query with a fill(linear) statement can be executed for integers.
 func TestSelect_Fill_Linear_Integer_One(t *testing.T) {
 	var ic IteratorCreator
@@ -1354,6 +1390,42 @@ func TestSelect_Fill_Linear_Integer_Many(t *testing.T) {
 	}
 }
 
+func TestSelect_Fill_Linear_Integer_MultipleSeries(t *testing.T) {
+	var ic IteratorCreator
+	ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
+		if m.Name != "cpu" {
+			t.Fatalf("unexpected source: %s", m.Name)
+		}
+		return influxql.NewCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{
+			{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
+			{Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4},
+		}}, opt)
+	}
+
+	// Execute selection.
+	itrs, err := influxql.Select(MustParseSelectStatement(`SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
+	if err != nil {
+		t.Fatal(err)
+	} else if a, err := Iterators(itrs).ReadAll(); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	} else if !deep.Equal(a, [][]influxql.Point{
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}},
+		{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}},
+	}) {
+		t.Fatalf("unexpected points: %s", spew.Sdump(a))
+	}
+}
+
 // Ensure a SELECT stddev() query can be executed.
 func TestSelect_Stddev_Float(t *testing.T) {
 	var ic IteratorCreator
diff --git a/influxql/token.go b/influxql/token.go
index a2e17b04cb..4c67ef3af6 100644
--- a/influxql/token.go
+++ b/influxql/token.go
@@ -13,6 +13,7 @@ const (
 	ILLEGAL Token = iota
 	EOF
 	WS
+	COMMENT
 
 	literalBeg
 	// IDENT and the following are InfluxQL literal tokens.
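For the fill(linear) fix exercised by the two `MultipleSeries` tests above, the interpolation arithmetic is unchanged; what is new is the guard that the peeked `next` point belongs to the same measurement and tag set. A sketch of that arithmetic, with times already normalized to interval counts the way the fill iterators do it:

```go
package main

import "fmt"

// linear mirrors the shape of linearFloat/linearInteger: interpolate the
// value for a window between the prev and next points of the same series.
func linear(window, prev, next int64, prevV, nextV float64) float64 {
	m := (nextV - prevV) / float64(next-prev) // slope per interval
	return m*float64(window-prev) + prevV
}

func main() {
	// A series has mean=2 in window 1 (10s) and mean=8 in window 4 (40s);
	// fill(linear) fills windows 2 and 3 along the line between them.
	fmt.Println(linear(2, 1, 4, 2, 8)) // 4
	fmt.Println(linear(3, 1, 4, 2, 8)) // 6
	// After the fix above, a peeked point from a different series (another
	// name or tag set) no longer supplies `next`: the window stays null.
}
```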
diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go
index ef237069dc..10f7e90615 100644
--- a/services/continuous_querier/service.go
+++ b/services/continuous_querier/service.go
@@ -247,25 +247,25 @@ func (s *Service) runContinuousQueries(req *RunRequest) {
 			if !req.matches(&cq) {
 				continue
 			}
-			if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
+			if ok, err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
 				s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err))
 				atomic.AddInt64(&s.stats.QueryFail, 1)
-			} else {
+			} else if ok {
 				atomic.AddInt64(&s.stats.QueryOK, 1)
 			}
 		}
 	}
 }
 
-// ExecuteContinuousQuery executes a single CQ.
-func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error {
+// ExecuteContinuousQuery may execute a single CQ. It returns false when the CQ was not run, and a non-nil error if one occurred.
+func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) (bool, error) {
 	// TODO: re-enable stats
 	//s.stats.Inc("continuousQueryExecuted")
 
 	// Local wrapper / helper.
 	cq, err := NewContinuousQuery(dbi.Name, cqi)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	// Get the last time this CQ was run from the service's cache.
@@ -279,26 +279,26 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
 		cq.setIntoRP(dbi.DefaultRetentionPolicy)
 	}
 
-	// See if this query needs to be run.
-	run, nextRun, err := cq.shouldRunContinuousQuery(now)
-	if err != nil {
-		return err
-	} else if !run {
-		return nil
-	}
-
 	// Get the group by interval.
 	interval, err := cq.q.GroupByInterval()
 	if err != nil {
-		return err
+		return false, err
 	} else if interval == 0 {
-		return nil
+		return false, nil
 	}
 
 	// Get the group by offset.
 	offset, err := cq.q.GroupByOffset()
 	if err != nil {
-		return err
+		return false, err
+	}
+
+	// See if this query needs to be run.
+	run, nextRun, err := cq.shouldRunContinuousQuery(now, interval)
+	if err != nil {
+		return false, err
+	} else if !run {
+		return false, nil
 	}
 
 	resampleEvery := interval
@@ -333,12 +333,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
 	endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset)
 	if !endTime.After(startTime) {
 		// Exit early since there is no time interval.
-		return nil
+		return false, nil
 	}
 
 	if err := cq.q.SetTimeRange(startTime, endTime); err != nil {
 		s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err))
-		return err
+		return false, err
 	}
 
 	var start time.Time
@@ -350,13 +350,13 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
 	// Do the actual processing of the query & writing of results.
 	if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
 		s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", err, cq.q.String()))
-		return err
+		return false, err
 	}
 
 	if s.loggingEnabled {
 		s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Since(start)))
 	}
 
-	return nil
+	return true, nil
 }
 
 // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
@@ -441,18 +441,12 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin
 // shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
 // lastRunTime of the CQ and the rules for when to run set through the query to determine
 // if this CQ should be run.
-func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time) (bool, time.Time, error) {
+func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time, interval time.Duration) (bool, time.Time, error) {
 	// if it's not aggregated we don't run it
 	if cq.q.IsRawQuery {
 		return false, cq.LastRun, errors.New("continuous queries must be aggregate queries")
 	}
 
-	// since it's aggregated we need to figure how often it should be run
-	interval, err := cq.q.GroupByInterval()
-	if err != nil {
-		return false, cq.LastRun, err
-	}
-
 	// allow the interval to be overwritten by the query's resample options
 	resampleEvery := interval
 	if cq.Resample.Every != 0 {
diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go
index 6a38737e3d..72158f876e 100644
--- a/services/continuous_querier/service_test.go
+++ b/services/continuous_querier/service_test.go
@@ -340,22 +340,19 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
 	cqi := dbi.ContinuousQueries[0]
 	cqi.Query = `this is not a query`
 
-	err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
-	if err == nil {
+	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
 		t.Error("expected error but got nil")
 	}
 
 	// Valid query but invalid continuous query.
 	cqi.Query = `SELECT * FROM cpu`
-	err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
-	if err == nil {
+	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
 		t.Error("expected error but got nil")
 	}
 
 	// Group by requires aggregate.
 	cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
-	err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
-	if err == nil {
+	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
 		t.Error("expected error but got nil")
 	}
 }
@@ -374,8 +371,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
 	cqi := dbi.ContinuousQueries[0]
 
 	now := time.Now().Truncate(10 * time.Minute)
-	err := s.ExecuteContinuousQuery(&dbi, &cqi, now)
-	if err != errExpected {
+	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); err != errExpected {
 		t.Errorf("exp = %s, got = %v", errExpected, err)
 	}
 }
diff --git a/tsdb/config.go b/tsdb/config.go
index 9a23abf63f..02a1047bcb 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -57,6 +57,11 @@ type Config struct {
 	// General WAL configuration options
 	WALDir string `toml:"wal-dir"`
 
+	// WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration
+	// greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
+	// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
+	WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`
+
 	// Query logging
 	QueryLogEnabled bool `toml:"query-log-enabled"`
 
@@ -139,6 +144,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
 	return diagnostics.RowFromMap(map[string]interface{}{
 		"dir":                                c.Dir,
 		"wal-dir":                            c.WALDir,
+		"wal-fsync-delay":                    c.WALFsyncDelay,
 		"cache-max-memory-size":              c.CacheMaxMemorySize,
 		"cache-snapshot-memory-size":         c.CacheSnapshotMemorySize,
 		"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
diff --git a/tsdb/config_test.go b/tsdb/config_test.go
index 44d644f808..24749513ec 100644
--- a/tsdb/config_test.go
+++ b/tsdb/config_test.go
@@ -2,6 +2,7 @@ package tsdb_test
 
 import (
 	"testing"
+	"time"
 
 	"github.com/BurntSushi/toml"
 	"github.com/influxdata/influxdb/tsdb"
@@ -13,6 +14,7 @@ func TestConfig_Parse(t *testing.T) {
 	if _, err := toml.Decode(`
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
+wal-fsync-delay = "10s"
`, &c); err != nil {
 		t.Fatal(err)
 	}
@@ -27,6 +29,10 @@ wal-dir = "/var/lib/influxdb/wal"
 	if got, exp := c.WALDir, "/var/lib/influxdb/wal"; got != exp {
 		t.Errorf("unexpected wal-dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
 	}
+	if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() {
+		t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
+	}
+
 }
 
 func TestConfig_Validate_Error(t *testing.T) {
diff --git a/tsdb/engine/tsm1/encoding.gen.go b/tsdb/engine/tsm1/encoding.gen.go
index 600e49f912..3a83fc3b44 100644
--- a/tsdb/engine/tsm1/encoding.gen.go
+++ b/tsdb/engine/tsm1/encoding.gen.go
@@ -140,52 +140,20 @@ func (a Values) Merge(b Values) Values {
 		return append(b, a...)
 	}
 
-	for i := 0; i < len(a) && len(b) > 0; i++ {
-		av, bv := a[i].UnixNano(), b[0].UnixNano()
-		// Value in a is greater than B, we need to merge
-		if av > bv {
-			// Save value in a
-			temp := a[i]
-
-			// Overwrite a with b
-			a[i] = b[0]
-
-			// Slide all values of b down 1
-			copy(b, b[1:])
-			b = b[:len(b)-1]
-
-			var k int
-			if len(b) > 0 && av > b[len(b)-1].UnixNano() {
-				// Fast path where a is after b, we skip the search
-				k = len(b)
-			} else {
-				// See where value we save from a should be inserted in b to keep b sorted
-				k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
-			}
-
-			if k == len(b) {
-				// Last position?
-				b = append(b, temp)
-			} else if b[k].UnixNano() != temp.UnixNano() {
-				// Save the last element, since it will get overwritten
-				last := b[len(b)-1]
-				// Somewhere in the middle of b, insert it only if it's not a duplicate
-				copy(b[k+1:], b[k:])
-				// Add the last vale to the end
-				b = append(b, last)
-				b[k] = temp
-			}
-		} else if av == bv {
-			// Value in a an b are the same, use b
-			a[i] = b[0]
-			b = b[1:]
+	out := make(Values, 0, len(a)+len(b))
+	for len(a) > 0 && len(b) > 0 {
+		if a[0].UnixNano() < b[0].UnixNano() {
+			out, a = append(out, a[0]), a[1:]
+		} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
+			a = a[1:]
+		} else {
+			out, b = append(out, b[0]), b[1:]
 		}
 	}
-
-	if len(b) > 0 {
-		return append(a, b...)
+	if len(a) > 0 {
+		return append(out, a...)
 	}
-	return a
+	return append(out, b...)
 }
 
 // Sort methods
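The rewritten `Merge` trades the old in-place insertion (with its `copy` shuffles and per-collision `sort.Search`) for one allocation and a linear two-cursor pass, and its tie-breaking matches the old `av == bv` branch: the point from `b` wins. A worked sketch of the same loop over bare timestamps:

```go
package main

import "fmt"

// merge is the new Values.Merge shape over plain timestamps: one pass,
// one allocation, and on duplicate timestamps b's entry replaces a's.
func merge(a, b []int64) []int64 {
	out := make([]int64, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		if a[0] < b[0] {
			out, a = append(out, a[0]), a[1:]
		} else if a[0] == b[0] {
			a = a[1:] // drop a's copy; b's will be appended later
		} else {
			out, b = append(out, b[0]), b[1:]
		}
	}
	out = append(out, a...) // at most one of a, b is non-empty here
	return append(out, b...)
}

func main() {
	fmt.Println(merge([]int64{1, 3, 5}, []int64{2, 3, 6})) // [1 2 3 5 6]
}
```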
@@ -322,52 +290,20 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
 		return append(b, a...)
 	}
 
-	for i := 0; i < len(a) && len(b) > 0; i++ {
-		av, bv := a[i].UnixNano(), b[0].UnixNano()
-		// Value in a is greater than B, we need to merge
-		if av > bv {
-			// Save value in a
-			temp := a[i]
-
-			// Overwrite a with b
-			a[i] = b[0]
-
-			// Slide all values of b down 1
-			copy(b, b[1:])
-			b = b[:len(b)-1]
-
-			var k int
-			if len(b) > 0 && av > b[len(b)-1].UnixNano() {
-				// Fast path where a is after b, we skip the search
-				k = len(b)
-			} else {
-				// See where value we save from a should be inserted in b to keep b sorted
-				k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
-			}
-
-			if k == len(b) {
-				// Last position?
-				b = append(b, temp)
-			} else if b[k].UnixNano() != temp.UnixNano() {
-				// Save the last element, since it will get overwritten
-				last := b[len(b)-1]
-				// Somewhere in the middle of b, insert it only if it's not a duplicate
-				copy(b[k+1:], b[k:])
-				// Add the last vale to the end
-				b = append(b, last)
-				b[k] = temp
-			}
-		} else if av == bv {
-			// Value in a an b are the same, use b
-			a[i] = b[0]
-			b = b[1:]
+	out := make(FloatValues, 0, len(a)+len(b))
+	for len(a) > 0 && len(b) > 0 {
+		if a[0].UnixNano() < b[0].UnixNano() {
+			out, a = append(out, a[0]), a[1:]
+		} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
+			a = a[1:]
+		} else {
+			out, b = append(out, b[0]), b[1:]
 		}
 	}
-
-	if len(b) > 0 {
-		return append(a, b...)
+	if len(a) > 0 {
+		return append(out, a...)
 	}
-	return a
+	return append(out, b...)
 }
 
 func (a FloatValues) Encode(buf []byte) ([]byte, error) {
@@ -548,52 +484,20 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
 		return append(b, a...)
 	}
 
-	for i := 0; i < len(a) && len(b) > 0; i++ {
-		av, bv := a[i].UnixNano(), b[0].UnixNano()
-		// Value in a is greater than B, we need to merge
-		if av > bv {
-			// Save value in a
-			temp := a[i]
-
-			// Overwrite a with b
-			a[i] = b[0]
-
-			// Slide all values of b down 1
-			copy(b, b[1:])
-			b = b[:len(b)-1]
-
-			var k int
-			if len(b) > 0 && av > b[len(b)-1].UnixNano() {
-				// Fast path where a is after b, we skip the search
-				k = len(b)
-			} else {
-				// See where value we save from a should be inserted in b to keep b sorted
-				k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
-			}
-
-			if k == len(b) {
-				// Last position?
-				b = append(b, temp)
-			} else if b[k].UnixNano() != temp.UnixNano() {
-				// Save the last element, since it will get overwritten
-				last := b[len(b)-1]
-				// Somewhere in the middle of b, insert it only if it's not a duplicate
-				copy(b[k+1:], b[k:])
-				// Add the last vale to the end
-				b = append(b, last)
-				b[k] = temp
-			}
-		} else if av == bv {
-			// Value in a an b are the same, use b
-			a[i] = b[0]
-			b = b[1:]
+	out := make(IntegerValues, 0, len(a)+len(b))
+	for len(a) > 0 && len(b) > 0 {
+		if a[0].UnixNano() < b[0].UnixNano() {
+			out, a = append(out, a[0]), a[1:]
+		} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
+			a = a[1:]
+		} else {
+			out, b = append(out, b[0]), b[1:]
 		}
 	}
-
-	if len(b) > 0 {
-		return append(a, b...)
+	if len(a) > 0 {
+		return append(out, a...)
 	}
-	return a
+	return append(out, b...)
 }
 
 func (a IntegerValues) Encode(buf []byte) ([]byte, error) {
@@ -774,52 +678,20 @@ func (a StringValues) Merge(b StringValues) StringValues {
 		return append(b, a...)
 	}
 
-	for i := 0; i < len(a) && len(b) > 0; i++ {
-		av, bv := a[i].UnixNano(), b[0].UnixNano()
-		// Value in a is greater than B, we need to merge
-		if av > bv {
-			// Save value in a
-			temp := a[i]
-
-			// Overwrite a with b
-			a[i] = b[0]
-
-			// Slide all values of b down 1
-			copy(b, b[1:])
-			b = b[:len(b)-1]
-
-			var k int
-			if len(b) > 0 && av > b[len(b)-1].UnixNano() {
-				// Fast path where a is after b, we skip the search
-				k = len(b)
-			} else {
-				// See where value we save from a should be inserted in b to keep b sorted
-				k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
-			}
-
-			if k == len(b) {
-				// Last position?
-				b = append(b, temp)
-			} else if b[k].UnixNano() != temp.UnixNano() {
-				// Save the last element, since it will get overwritten
-				last := b[len(b)-1]
-				// Somewhere in the middle of b, insert it only if it's not a duplicate
-				copy(b[k+1:], b[k:])
-				// Add the last vale to the end
-				b = append(b, last)
-				b[k] = temp
-			}
-		} else if av == bv {
-			// Value in a an b are the same, use b
-			a[i] = b[0]
-			b = b[1:]
+	out := make(StringValues, 0, len(a)+len(b))
+	for len(a) > 0 && len(b) > 0 {
+		if a[0].UnixNano() < b[0].UnixNano() {
+			out, a = append(out, a[0]), a[1:]
+		} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
+			a = a[1:]
+		} else {
+			out, b = append(out, b[0]), b[1:]
 		}
 	}
-
-	if len(b) > 0 {
-		return append(a, b...)
+	if len(a) > 0 {
+		return append(out, a...)
 	}
-	return a
+	return append(out, b...)
 }
 
 func (a StringValues) Encode(buf []byte) ([]byte, error) {
@@ -1000,52 +872,20 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
 		return append(b, a...)
 	}
 
-	for i := 0; i < len(a) && len(b) > 0; i++ {
-		av, bv := a[i].UnixNano(), b[0].UnixNano()
-		// Value in a is greater than B, we need to merge
-		if av > bv {
-			// Save value in a
-			temp := a[i]
-
-			// Overwrite a with b
-			a[i] = b[0]
-
-			// Slide all values of b down 1
-			copy(b, b[1:])
-			b = b[:len(b)-1]
-
-			var k int
-			if len(b) > 0 && av > b[len(b)-1].UnixNano() {
-				// Fast path where a is after b, we skip the search
-				k = len(b)
-			} else {
-				// See where value we save from a should be inserted in b to keep b sorted
-				k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
-			}
-
-			if k == len(b) {
-				// Last position?
-				b = append(b, temp)
-			} else if b[k].UnixNano() != temp.UnixNano() {
-				// Save the last element, since it will get overwritten
-				last := b[len(b)-1]
-				// Somewhere in the middle of b, insert it only if it's not a duplicate
-				copy(b[k+1:], b[k:])
-				// Add the last vale to the end
-				b = append(b, last)
-				b[k] = temp
-			}
-		} else if av == bv {
-			// Value in a an b are the same, use b
-			a[i] = b[0]
-			b = b[1:]
+	out := make(BooleanValues, 0, len(a)+len(b))
+	for len(a) > 0 && len(b) > 0 {
+		if a[0].UnixNano() < b[0].UnixNano() {
+			out, a = append(out, a[0]), a[1:]
+		} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
+			a = a[1:]
+		} else {
+			out, b = append(out, b[0]), b[1:]
 		}
 	}
-
-	if len(b) > 0 {
-		return append(a, b...)
+	if len(a) > 0 {
+		return append(out, a...)
 	}
-	return a
+	return append(out, b...)
 }
 
 func (a BooleanValues) Encode(buf []byte) ([]byte, error) {
diff --git a/tsdb/engine/tsm1/encoding.gen.go.tmpl b/tsdb/engine/tsm1/encoding.gen.go.tmpl
index c837602bf1..f410679700 100644
--- a/tsdb/engine/tsm1/encoding.gen.go.tmpl
+++ b/tsdb/engine/tsm1/encoding.gen.go.tmpl
@@ -137,52 +137,20 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
 		return append(b, a...)
 	}
 
-	for i := 0; i < len(a) && len(b) > 0; i++ {
-		av, bv := a[i].UnixNano(), b[0].UnixNano()
-		// Value in a is greater than B, we need to merge
-		if av > bv {
-			// Save value in a
-			temp := a[i]
-
-			// Overwrite a with b
-			a[i] = b[0]
-
-			// Slide all values of b down 1
-			copy(b, b[1:])
-			b = b[:len(b)-1]
-
-			var k int
-			if len(b) > 0 && av > b[len(b)-1].UnixNano() {
-				// Fast path where a is after b, we skip the search
-				k = len(b)
-			} else {
-				// See where value we save from a should be inserted in b to keep b sorted
-				k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
-			}
-
-			if k == len(b) {
-				// Last position?
-				b = append(b, temp)
-			} else if b[k].UnixNano() != temp.UnixNano() {
-				// Save the last element, since it will get overwritten
-				last := b[len(b)-1]
-				// Somewhere in the middle of b, insert it only if it's not a duplicate
-				copy(b[k+1:], b[k:])
-				// Add the last vale to the end
-				b = append(b, last)
-				b[k] = temp
-			}
-		} else if av == bv {
-			// Value in a an b are the same, use b
-			a[i] = b[0]
-			b = b[1:]
+	out := make({{.Name}}Values, 0, len(a)+len(b))
+	for len(a) > 0 && len(b) > 0 {
+		if a[0].UnixNano() < b[0].UnixNano() {
+			out, a = append(out, a[0]), a[1:]
+		} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
+			a = a[1:]
+		} else {
+			out, b = append(out, b[0]), b[1:]
 		}
 	}
-
-	if len(b) > 0 {
-		return append(a, b...)
+	if len(a) > 0 {
+		return append(out, a...)
 	}
-	return a
+	return append(out, b...)
 }
 
 {{ if ne .Name "" }}
diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go
index 9391a4ec67..ddf20153e2 100644
--- a/tsdb/engine/tsm1/encoding_test.go
+++ b/tsdb/engine/tsm1/encoding_test.go
@@ -488,7 +488,6 @@ func TestValues_MergeFloat(t *testing.T) {
 
 	for i, test := range tests {
 		got := tsm1.Values(test.a).Merge(test.b)
-
 		if exp, got := len(test.exp), len(got); exp != got {
 			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
 		}
@@ -1366,8 +1365,96 @@ func BenchmarkValues_Merge(b *testing.B) {
 	}
 
 	b.ResetTimer()
+	benchmarkMerge(a, c, b)
+}
+
+func BenchmarkValues_MergeDisjoint(b *testing.B) {
+	valueCount := 1000
+	times := getTimes(valueCount, 60, time.Second)
+	a := make([]tsm1.Value, len(times))
+	c := make([]tsm1.Value, len(times))
+
+	for i, t := range times {
+		a[i] = tsm1.NewValue(t, float64(i))
+		c[i] = tsm1.NewValue(times[len(times)-1]+int64((i+1)*1e9), float64(i))
+	}
+
+	b.ResetTimer()
+	benchmarkMerge(a, c, b)
+}
+
+func BenchmarkValues_MergeSame(b *testing.B) {
+	valueCount := 1000
+	times := getTimes(valueCount, 60, time.Second)
+	a := make([]tsm1.Value, len(times))
+	c := make([]tsm1.Value, len(times))
+
+	for i, t := range times {
+		a[i] = tsm1.NewValue(t, float64(i))
+		c[i] = tsm1.NewValue(t, float64(i))
+	}
+
+	b.ResetTimer()
+	benchmarkMerge(a, c, b)
+}
+
+func BenchmarkValues_MergeSimilar(b *testing.B) {
+	valueCount := 1000
+	times := getTimes(valueCount, 60, time.Second)
+	a := make([]tsm1.Value, len(times))
+	c := make([]tsm1.Value, len(times))
+
+	for i, t := range times {
+		a[i] = tsm1.NewValue(t, float64(i))
+		if i == 0 {
+			t++
+		}
+		c[i] = tsm1.NewValue(t, float64(i))
+	}
+
+	b.ResetTimer()
+	benchmarkMerge(a, c, b)
+}
+
+func BenchmarkValues_MergeUnevenA(b *testing.B) {
+	valueCount := 1000
+	times := getTimes(valueCount, 60, time.Second)
+	a := make([]tsm1.Value, len(times))
+	c := make([]tsm1.Value, len(times))
+
+	for i, t := range times {
+		a[i] = tsm1.NewValue(t, float64(i))
+		c[i] = tsm1.NewValue(t, float64(i))
+	}
+
+	b.ResetTimer()
+	benchmarkMerge(a[:700], c[:10], b)
+}
+
+func BenchmarkValues_MergeUnevenB(b *testing.B) {
+	valueCount := 1000
+	times := getTimes(valueCount, 60, time.Second)
+	a := make([]tsm1.Value, len(times))
+	c := make([]tsm1.Value, len(times))
+
+	for i, t := range times {
+		a[i] = tsm1.NewValue(t, float64(i))
+		c[i] = tsm1.NewValue(t, float64(i))
+	}
+
+	b.ResetTimer()
+	benchmarkMerge(a[:10], c[:700], b)
+}
+
+func benchmarkMerge(a, c tsm1.Values, b *testing.B) {
 	for i := 0; i < b.N; i++ {
-		tsm1.Values(a).Merge(c)
+		b.StopTimer()
+		aa := make(tsm1.Values, len(a))
+		copy(aa, a)
+		cc := make(tsm1.Values, len(c))
+		copy(cc, c)
+		b.StartTimer()
+		tsm1.Values(aa).Merge(tsm1.Values(cc))
 	}
 }
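Note why `benchmarkMerge` clones its inputs inside the loop: `Merge` consumes `a` and `b` by re-slicing them, so measuring repeated calls on the same slices would mostly benchmark the early-exit path. The `StopTimer`/`StartTimer` pair keeps the cloning out of the measurement. The general pattern, sketched with a stand-in function:

```go
package bench

import "testing"

// consume stands in for Values.Merge, which re-slices its arguments.
func consume(a []int64) {
	for len(a) > 0 {
		a = a[1:]
	}
}

// When the function under test consumes or mutates its inputs, rebuild
// them on every iteration and keep that setup out of the measured time.
func BenchmarkConsumingFunc(b *testing.B) {
	src := make([]int64, 1000)
	for i := range src {
		src[i] = int64(i)
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		in := make([]int64, len(src))
		copy(in, src)
		b.StartTimer()
		consume(in)
	}
}
```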
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 615ee9105f..a790faf8af 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -133,6 +133,8 @@ type Engine struct {
 // NewEngine returns a new instance of Engine.
 func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
 	w := NewWAL(walPath)
+	w.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
+
 	fs := NewFileStore(path)
 	cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)
 
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index a0b3e36f61..e25add1272 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -91,6 +91,12 @@ type WAL struct {
 	closing chan struct{}
 
 	// goroutines waiting for the next fsync
 	syncWaiters chan chan error
+	syncCount   uint64
+
+	// syncDelay sets the duration to wait before fsyncing writes. A value of 0 (default)
+	// will cause every write to be fsync'd. This must be set before the WAL
+	// is opened if a non-default value is required.
+	syncDelay time.Duration
 
 	// WALOutput is the writer used by the logger.
 	logger zap.Logger // Logger to be used for important messages
@@ -222,30 +228,39 @@ func (l *WAL) Open() error {
 	atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
 
 	l.closing = make(chan struct{})
-	go l.syncPeriodically()
 
 	return nil
 }
 
-func (l *WAL) syncPeriodically() {
-	t := time.NewTicker(100 * time.Millisecond)
-	defer t.Stop()
-	for {
+// sync will schedule an fsync to the current wal segment and notify any
+// waiting goroutines. If an fsync is already scheduled, subsequent calls will
+// not schedule a new fsync and will be handled by the existing scheduled fsync.
+func (l *WAL) sync() {
+	// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
+	if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
+		return
+	}
+
+	// Fsync the wal and notify all pending waiters
+	go func() {
+		t := time.NewTimer(l.syncDelay)
 		select {
 		case <-t.C:
 			if len(l.syncWaiters) > 0 {
 				l.mu.Lock()
 				err := l.currentSegmentWriter.sync()
-				for i := 0; i < len(l.syncWaiters); i++ {
+				for len(l.syncWaiters) > 0 {
 					errC := <-l.syncWaiters
 					errC <- err
 				}
 				l.mu.Unlock()
 			}
 		case <-l.closing:
-			return
+			t.Stop()
 		}
-	}
+
+		atomic.StoreUint64(&l.syncCount, 0)
+	}()
 }
 
 // WritePoints writes the given points to the WAL. It returns the WAL segment ID to
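The compare-and-swap on `syncCount` is what collapses many concurrent writers onto a single fsync: the first writer to arrive schedules the (optionally delayed) sync, and everyone else only parks on `syncWaiters`. A stripped-down sketch of that shape, with hypothetical names (`batcher`, `request`) rather than the WAL's API, and eliding the shutdown path the real code handles:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type batcher struct {
	scheduled uint32
	waiters   chan chan error
}

// request asks for an fsync and blocks until one covering this write completes.
func (b *batcher) request(delay time.Duration, fsync func() error) error {
	errC := make(chan error, 1)
	b.waiters <- errC

	// First caller in schedules the sync; later callers ride along.
	if atomic.CompareAndSwapUint32(&b.scheduled, 0, 1) {
		go func() {
			time.Sleep(delay) // a delay > 0 batches multiple writers
			err := fsync()
			for len(b.waiters) > 0 {
				<-b.waiters <- err
			}
			atomic.StoreUint32(&b.scheduled, 0)
		}()
	}
	return <-errC
}

func main() {
	b := &batcher{waiters: make(chan chan error, 1024)}
	err := b.request(10*time.Millisecond, func() error {
		fmt.Println("fsync once for the whole batch")
		return nil
	})
	fmt.Println(err) // <nil>
}
```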
@@ -390,6 +405,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 		return segID, err
 	}
 
+	// schedule an fsync and wait for it to complete
+	l.sync()
 	return segID, <-syncErr
 }
 
diff --git a/tsdb/shard.go b/tsdb/shard.go
index d2943124f1..7df8eba580 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -951,17 +951,28 @@ func (s *Shard) monitor() {
 	defer t.Stop()
 	t2 := time.NewTicker(time.Minute)
 	defer t2.Stop()
+	var changed time.Time
+
 	for {
 		select {
 		case <-s.closing:
 			return
 		case <-t.C:
+
+			// Checking DiskSize can be expensive with a lot of shards and TSM files, so only
+			// check it if something has changed.
+			lm := s.LastModified()
+			if lm.Equal(changed) {
+				continue
+			}
+
 			size, err := s.DiskSize()
 			if err != nil {
 				s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
 				continue
 			}
 			atomic.StoreInt64(&s.stats.DiskBytes, size)
+			changed = lm
 		case <-t2.C:
 			if s.options.Config.MaxValuesPerTag == 0 {
 				continue