Merge pull request #8172 from influxdata/jw-dropped-points

Fix series not getting created
pull/8173/head
Jason Wilder 2017-03-21 09:44:31 -06:00 committed by GitHub
commit 58c8736ebc
25 changed files with 521 additions and 325 deletions

View File

@ -9,4 +9,5 @@ _You can erase any checkboxes below this note if they are not applicable to your
- [ ] [InfluxQL Spec](https://github.com/influxdata/influxdb/blob/master/influxql/README.md) updated
- [ ] Provide example syntax
- [ ] Update man page when modifying a command
- [ ] Config changes: update sample config (`etc/config.sample.toml`), server `NewDemoConfig` method, and `Diagnostics` methods reporting config settings, if necessary
- [ ] [InfluxData Documentation](https://github.com/influxdata/docs.influxdata.com): issue filed or pull request submitted \<link to issue or pull request\>

View File

@ -2,6 +2,7 @@
### Features
- [#8143](https://github.com/influxdata/influxdb/pull/8143): Add WAL sync delay
- [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2
- [#7974](https://github.com/influxdata/influxdb/pull/7974): Allow non-admin users to execute SHOW DATABASES.
- [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests
@ -9,6 +10,14 @@
- [#7553](https://github.com/influxdata/influxdb/issues/7553): Add modulo operator to the query language.
- [#7856](https://github.com/influxdata/influxdb/issues/7856): Failed points during an import now result in a non-zero exit code.
- [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS
- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL.
### Bugfixes
- [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run.
- [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions.
- [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values.
- [#7995](https://github.com/influxdata/influxdb/issues/7995): Update liner dependency to handle docker exec.
## v1.2.2 [2017-03-14]

Godeps
View File

@ -13,7 +13,7 @@ github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073
github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6

View File

@ -49,6 +49,12 @@
# The directory where the TSM storage engine stores WAL files.
wal-dir = "/var/lib/influxdb/wal"
# The amount of time that a write will wait before fsyncing. A duration
# greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
# disks or when WAL write contention is seen. A value of 0s fsyncs every write to the WAL.
# Values in the range of 0-100ms are recommended for non-SSD disks.
# wal-fsync-delay = "0s"
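# For example, to allow up to 50ms of fsync batching on a spinning disk,
# uncomment and set a non-zero duration (the value here is illustrative):
# wal-fsync-delay = "50ms"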
# Trace logging provides more verbose output around the tsm engine. Turning
# this on can provide more useful output for debugging tsm engine issues.
# trace-logging-enabled = false

View File

@ -36,6 +36,27 @@ Notation operators in order of increasing precedence:
{} repetition (0 to n times)
```
## Comments
Both single-line and multiline comments are supported. A comment is treated
the same as whitespace by the parser.
```
-- single line comment
/*
multiline comment
*/
```
Single-line comments skip all text until the scanner reaches a
newline. Multiline comments skip all text until the closing comment
marker is reached. Nested multiline comments are not supported, so the
following does not work:
```
/* /* this does not work */ */
```
## Query representation
### Characters

View File

@ -1273,6 +1273,26 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error)
Alias: fmt.Sprintf("%s_%s", f.Name(), ref.Val),
})
}
case *BinaryExpr:
// Search for regexes or wildcards within the binary
// expression. If we find any, return an error indicating that
// it's illegal.
var regex, wildcard bool
WalkFunc(expr, func(n Node) {
switch n.(type) {
case *RegexLiteral:
regex = true
case *Wildcard:
wildcard = true
}
})
if wildcard {
return nil, fmt.Errorf("unsupported expression with wildcard: %s", f.Expr)
} else if regex {
return nil, fmt.Errorf("unsupported expression with regex field: %s", f.Expr)
}
rwFields = append(rwFields, f)
default:
rwFields = append(rwFields, f)
}
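The rejection above is a plain AST walk. A minimal standalone sketch of the same traversal (the import path matches this repo's layout; the expression text is illustrative):

```
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/influxql"
)

func main() {
	// Hypothetical input: one of the expressions the tests below reject.
	expr, err := influxql.ParseExpr(`count(*) / 2`)
	if err != nil {
		log.Fatal(err)
	}

	// Same traversal as the *BinaryExpr case above: visit every node and
	// flag regexes and wildcards.
	var regex, wildcard bool
	influxql.WalkFunc(expr, func(n influxql.Node) {
		switch n.(type) {
		case *influxql.RegexLiteral:
			regex = true
		case *influxql.Wildcard:
			wildcard = true
		}
	})
	fmt.Println(wildcard, regex) // true false, so RewriteFields errors out
}
```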

View File

@ -326,6 +326,7 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
var tests = []struct {
stmt string
rewrite string
err string
}{
// No wildcards
{
@ -462,6 +463,35 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
stmt: `SELECT * FROM (SELECT mean(value1) FROM cpu GROUP BY host) GROUP BY *`,
rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host`,
},
// Invalid queries that can't be rewritten should return an error (to
// avoid a panic in the query engine)
{
stmt: `SELECT count(*) / 2 FROM cpu`,
err: `unsupported expression with wildcard: count(*) / 2`,
},
{
stmt: `SELECT * / 2 FROM (SELECT count(*) FROM cpu)`,
err: `unsupported expression with wildcard: * / 2`,
},
{
stmt: `SELECT count(/value/) / 2 FROM cpu`,
err: `unsupported expression with regex field: count(/value/) / 2`,
},
// This one should be possible though since there's no wildcard in the
// binary expression.
{
stmt: `SELECT value1 + value2, * FROM cpu`,
rewrite: `SELECT value1::float + value2::integer, host::tag, region::tag, value1::float, value2::integer FROM cpu`,
},
{
stmt: `SELECT value1 + value2, /value/ FROM cpu`,
rewrite: `SELECT value1::float + value2::integer, value1::float, value2::integer FROM cpu`,
},
}
for i, tt := range tests {
@ -496,12 +526,20 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
// Rewrite statement.
rw, err := stmt.(*influxql.SelectStatement).RewriteFields(&ic)
if err != nil {
t.Errorf("%d. %q: error: %s", i, tt.stmt, err)
} else if rw == nil {
t.Errorf("%d. %q: unexpected nil statement", i, tt.stmt)
} else if rw := rw.String(); tt.rewrite != rw {
t.Errorf("%d. %q: unexpected rewrite:\n\nexp=%s\n\ngot=%s\n\n", i, tt.stmt, tt.rewrite, rw)
if tt.err != "" {
if err != nil && err.Error() != tt.err {
t.Errorf("%d. %q: unexpected error: %s != %s", i, tt.stmt, err.Error(), tt.err)
} else if err == nil {
t.Errorf("%d. %q: expected error", i, tt.stmt)
}
} else {
if err != nil {
t.Errorf("%d. %q: error: %s", i, tt.stmt, err)
} else if rw == nil && tt.err == "" {
t.Errorf("%d. %q: unexpected nil statement", i, tt.stmt)
} else if rw := rw.String(); tt.rewrite != rw {
t.Errorf("%d. %q: unexpected rewrite:\n\nexp=%s\n\ngot=%s\n\n", i, tt.stmt, tt.rewrite, rw)
}
}
}
}

View File

@ -680,8 +680,7 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) {
next, err := itr.input.peek()
if err != nil {
return nil, err
}
if next != nil {
} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linearFloat(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
@ -3338,8 +3337,7 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) {
next, err := itr.input.peek()
if err != nil {
return nil, err
}
if next != nil {
} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linearInteger(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
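Both hunks tighten the guard around interpolation: the peeked point must belong to the same measurement and tag set before it can anchor the line, which is what fixes fill(linear) when multiple series exist. The interpolation itself is point-slope arithmetic; a sketch of what the linearFloat call computes (signature inferred from the call site, with times already reduced to interval units):

```
package main

import "fmt"

// linearFill evaluates the line through (prevTime, prevValue) and
// (nextTime, nextValue) at windowStart. This mirrors the linearFloat and
// linearInteger calls above; it is an illustration, not the repo's code.
func linearFill(windowStart, prevTime, nextTime int64, prevValue, nextValue float64) float64 {
	m := (nextValue - prevValue) / float64(nextTime-prevTime) // slope
	x := float64(windowStart - prevTime)                      // distance into the gap
	return m*x + prevValue
}

func main() {
	// prev point at t=1 with value 2, next point at t=3 with value 4:
	// the missing window at t=2 fills to 3.
	fmt.Println(linearFill(2, 1, 3, 2, 4))
}
```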

View File

@ -679,8 +679,7 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
next, err := itr.input.peek()
if err != nil {
return nil, err
}
if next != nil {
} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)

View File

@ -2686,13 +2686,15 @@ func (p *Parser) parseResample() (time.Duration, time.Duration, error) {
// scan returns the next token from the underlying scanner.
func (p *Parser) scan() (tok Token, pos Pos, lit string) { return p.s.Scan() }
// scanIgnoreWhitespace scans the next non-whitespace token.
// scanIgnoreWhitespace scans the next non-whitespace and non-comment token.
func (p *Parser) scanIgnoreWhitespace() (tok Token, pos Pos, lit string) {
tok, pos, lit = p.scan()
if tok == WS {
for {
tok, pos, lit = p.scan()
if tok == WS || tok == COMMENT {
continue
}
return
}
return
}
// consumeWhitespace scans the next token if it's whitespace.
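The switch from a single conditional rescan to a loop matters once COMMENT exists: whitespace and comment tokens can appear back to back, and the old `if tok == WS` form discarded only one trivia token. A self-contained sketch of the difference (token names mirror the diff; the stream itself is hypothetical):

```
package main

import "fmt"

// Token kinds mirroring the ones in this diff; numeric values are illustrative.
type Token int

const (
	ILLEGAL Token = iota
	WS
	COMMENT
	SELECT
	IDENT
)

// nextSignificant mirrors the new loop above: it discards the whole run of
// whitespace and comment tokens, not just the first one.
func nextSignificant(toks []Token, i int) (Token, int) {
	for ; i < len(toks); i++ {
		if toks[i] == WS || toks[i] == COMMENT {
			continue
		}
		return toks[i], i + 1
	}
	return ILLEGAL, i
}

func main() {
	// "SELECT -- c1\n/* c2 */ value" scans roughly to this stream, where
	// WS and COMMENT appear back to back:
	toks := []Token{SELECT, WS, COMMENT, COMMENT, WS, IDENT}
	tok, i := nextSignificant(toks, 0)
	next, _ := nextSignificant(toks, i)
	fmt.Println(tok == SELECT, next == IDENT) // true true
}
```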

View File

@ -43,6 +43,29 @@ func TestParser_ParseQuery_Empty(t *testing.T) {
}
}
// Ensure the parser will skip comments.
func TestParser_ParseQuery_SkipComments(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT * FROM cpu; -- read from cpu database
/* create continuous query */
CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN
SELECT mean(*) INTO db1..:MEASUREMENT FROM cpu GROUP BY time(5m)
END;
/* just a multiline comment
what is this doing here?
**/
-- should ignore the trailing multiline comment /*
SELECT mean(value) FROM gpu;
-- trailing comment at the end`)
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if len(q.Statements) != 3 {
t.Fatalf("unexpected statement count: %d", len(q.Statements))
}
}
// Ensure the parser can return an error from a malformed statement.
func TestParser_ParseQuery_ParseError(t *testing.T) {
_, err := influxql.NewParser(strings.NewReader(`SELECT`)).ParseQuery()
@ -1303,6 +1326,38 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// select statements with intertwined comments
{
s: `SELECT "user" /*, system, idle */ FROM cpu`,
stmt: &influxql.SelectStatement{
IsRawQuery: true,
Fields: []*influxql.Field{
{Expr: &influxql.VarRef{Val: "user"}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
},
},
{
s: `SELECT /foo\/*bar/ FROM /foo\/*bar*/ WHERE x = 1`,
stmt: &influxql.SelectStatement{
IsRawQuery: true,
Fields: []*influxql.Field{
{Expr: &influxql.RegexLiteral{Val: regexp.MustCompile(`foo/*bar`)}},
},
Sources: []influxql.Source{
&influxql.Measurement{
Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`foo/*bar*`)},
},
},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "x"},
RHS: &influxql.IntegerLiteral{Val: 1},
},
},
},
// See issues https://github.com/influxdata/influxdb/issues/1647
// and https://github.com/influxdata/influxdb/issues/4404
// DELETE statement

View File

@ -64,6 +64,15 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
case '*':
return MUL, pos, ""
case '/':
ch1, _ := s.r.read()
if ch1 == '*' {
if err := s.skipUntilEndComment(); err != nil {
return ILLEGAL, pos, ""
}
return COMMENT, pos, ""
} else {
s.r.unread()
}
return DIV, pos, ""
case '%':
return MOD, pos, ""
@ -137,6 +146,36 @@ func (s *Scanner) scanWhitespace() (tok Token, pos Pos, lit string) {
return WS, pos, buf.String()
}
// skipUntilNewline skips characters until it reaches a newline.
func (s *Scanner) skipUntilNewline() {
for {
if ch, _ := s.r.read(); ch == '\n' || ch == eof {
return
}
}
}
// skipUntilEndComment skips characters until it reaches a '*/' symbol.
func (s *Scanner) skipUntilEndComment() error {
for {
if ch1, _ := s.r.read(); ch1 == '*' {
// We might be at the end.
star:
ch2, _ := s.r.read()
if ch2 == '/' {
return nil
} else if ch2 == '*' {
// Another star keeps us in this state, since '*/' may still follow.
goto star
} else if ch2 == eof {
return io.EOF
}
} else if ch1 == eof {
return io.EOF
}
}
}
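The star: label exists so that runs of stars before the closing slash (the `**/` case exercised by the parser test above) still terminate the comment. An equivalent loop-based sketch without the goto, written over a bufio.Reader rather than the scanner's internal reader (same termination behavior assumed):

```
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// skipUntilEndComment without the label: remembering whether the previous
// rune was '*' handles "**/" and longer star runs the same way.
func skipUntilEndComment(r *bufio.Reader) error {
	prevStar := false
	for {
		ch, _, err := r.ReadRune()
		if err != nil {
			return io.EOF
		}
		if prevStar && ch == '/' {
			return nil
		}
		prevStar = ch == '*'
	}
}

func main() {
	r := bufio.NewReader(strings.NewReader(" a comment **/ rest"))
	fmt.Println(skipUntilEndComment(r)) // <nil>: "**/" closes the comment
	rest, _ := r.ReadString('\n')
	fmt.Printf("%q\n", rest) // " rest"
}
```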
func (s *Scanner) scanIdent(lookup bool) (tok Token, pos Pos, lit string) {
// Save the starting position of the identifier.
_, pos = s.r.read()
@ -230,6 +269,10 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
} else if ch == '+' {
return ADD, pos, ""
} else if ch == '-' {
if ch1 == '-' {
s.skipUntilNewline()
return COMMENT, pos, ""
}
return SUB, pos, ""
}
} else if ch == '.' {

View File

@ -1291,6 +1291,42 @@ func TestSelect_Fill_Linear_Float_Many(t *testing.T) {
}
}
func TestSelect_Fill_Linear_Float_MultipleSeries(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4},
}}, opt)
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure a SELECT query with a fill(linear) statement can be executed for integers.
func TestSelect_Fill_Linear_Integer_One(t *testing.T) {
var ic IteratorCreator
@ -1354,6 +1390,42 @@ func TestSelect_Fill_Linear_Integer_Many(t *testing.T) {
}
}
func TestSelect_Fill_Linear_Integer_MultipleSeries(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return influxql.NewCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4},
}}, opt)
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure a SELECT stddev() query can be executed.
func TestSelect_Stddev_Float(t *testing.T) {
var ic IteratorCreator

View File

@ -13,6 +13,7 @@ const (
ILLEGAL Token = iota
EOF
WS
COMMENT
literalBeg
// IDENT and the following are InfluxQL literal tokens.

View File

@ -247,25 +247,25 @@ func (s *Service) runContinuousQueries(req *RunRequest) {
if !req.matches(&cq) {
continue
}
if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
if ok, err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err))
atomic.AddInt64(&s.stats.QueryFail, 1)
} else {
} else if ok {
atomic.AddInt64(&s.stats.QueryOK, 1)
}
}
}
}
// ExecuteContinuousQuery executes a single CQ.
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error {
// ExecuteContinuousQuery may execute a single CQ. This will return false if there were no errors and the CQ was not run.
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) (bool, error) {
// TODO: re-enable stats
//s.stats.Inc("continuousQueryExecuted")
// Local wrapper / helper.
cq, err := NewContinuousQuery(dbi.Name, cqi)
if err != nil {
return err
return false, err
}
// Get the last time this CQ was run from the service's cache.
@ -279,26 +279,26 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
cq.setIntoRP(dbi.DefaultRetentionPolicy)
}
// See if this query needs to be run.
run, nextRun, err := cq.shouldRunContinuousQuery(now)
if err != nil {
return err
} else if !run {
return nil
}
// Get the group by interval.
interval, err := cq.q.GroupByInterval()
if err != nil {
return err
return false, err
} else if interval == 0 {
return nil
return false, nil
}
// Get the group by offset.
offset, err := cq.q.GroupByOffset()
if err != nil {
return err
return false, err
}
// See if this query needs to be run.
run, nextRun, err := cq.shouldRunContinuousQuery(now, interval)
if err != nil {
return false, err
} else if !run {
return false, nil
}
resampleEvery := interval
@ -333,12 +333,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset)
if !endTime.After(startTime) {
// Exit early since there is no time interval.
return nil
return false, nil
}
if err := cq.q.SetTimeRange(startTime, endTime); err != nil {
s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err))
return err
return false, err
}
var start time.Time
@ -350,13 +350,13 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
// Do the actual processing of the query & writing of results.
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", err, cq.q.String()))
return err
return false, err
}
if s.loggingEnabled {
s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Since(start)))
}
return nil
return true, nil
}
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
@ -441,18 +441,12 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin
// shouldRunContinuousQuery returns true if the CQ should be scheduled to run. It will use the
// lastRunTime of the CQ and the rules for when to run set through the query to determine
// if this CQ should be run.
func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time) (bool, time.Time, error) {
func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time, interval time.Duration) (bool, time.Time, error) {
// if it's not aggregated we don't run it
if cq.q.IsRawQuery {
return false, cq.LastRun, errors.New("continuous queries must be aggregate queries")
}
// since it's aggregated we need to figure out how often it should be run
interval, err := cq.q.GroupByInterval()
if err != nil {
return false, cq.LastRun, err
}
// allow the interval to be overwritten by the query's resample options
resampleEvery := interval
if cq.Resample.Every != 0 {

View File

@ -340,22 +340,19 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
cqi := dbi.ContinuousQueries[0]
cqi.Query = `this is not a query`
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err == nil {
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
t.Error("expected error but got nil")
}
// Valid query but invalid continuous query.
cqi.Query = `SELECT * FROM cpu`
err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err == nil {
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
t.Error("expected error but got nil")
}
// Group by requires aggregate.
cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err == nil {
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
t.Error("expected error but got nil")
}
}
@ -374,8 +371,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
cqi := dbi.ContinuousQueries[0]
now := time.Now().Truncate(10 * time.Minute)
err := s.ExecuteContinuousQuery(&dbi, &cqi, now)
if err != errExpected {
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); err != errExpected {
t.Errorf("exp = %s, got = %v", errExpected, err)
}
}

View File

@ -57,6 +57,11 @@ type Config struct {
// General WAL configuration options
WALDir string `toml:"wal-dir"`
// WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration
// greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`
// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`
@ -139,6 +144,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
return diagnostics.RowFromMap(map[string]interface{}{
"dir": c.Dir,
"wal-dir": c.WALDir,
"wal-fsync-delay": c.WALFsyncDelay,
"cache-max-memory-size": c.CacheMaxMemorySize,
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,

View File

@ -2,6 +2,7 @@ package tsdb_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/tsdb"
@ -13,6 +14,7 @@ func TestConfig_Parse(t *testing.T) {
if _, err := toml.Decode(`
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "10s"
`, &c); err != nil {
t.Fatal(err)
}
@ -27,6 +29,10 @@ wal-dir = "/var/lib/influxdb/wal"
if got, exp := c.WALDir, "/var/lib/influxdb/wal"; got != exp {
t.Errorf("unexpected wal-dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() {
t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
}
func TestConfig_Validate_Error(t *testing.T) {

View File

@ -140,52 +140,20 @@ func (a Values) Merge(b Values) Values {
return append(b, a...)
}
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than b, so we need to merge
if av > bv {
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where a is after b, we skip the search
k = len(b)
} else {
// See where the value we saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// Values in a and b are the same, use b
a[i] = b[0]
b = b[1:]
out := make(Values, 0, len(a)+len(b))
for len(a) > 0 && len(b) > 0 {
if a[0].UnixNano() < b[0].UnixNano() {
out, a = append(out, a[0]), a[1:]
} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
a = a[1:]
} else {
out, b = append(out, b[0]), b[1:]
}
}
if len(b) > 0 {
return append(a, b...)
if len(a) > 0 {
return append(out, a...)
}
return a
return append(out, b...)
}
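The rewritten Merge (repeated for each value type below and in the template) is a standard two-pointer merge of two sorted slices, with one wrinkle: on equal timestamps the element from b wins and the one from a is dropped. A standalone sketch of those semantics (hypothetical types; the real code operates on Values):

```
package main

import "fmt"

type value struct {
	t int64 // UnixNano timestamp
	v float64
}

// merge mirrors the loop above: both inputs are sorted by t, and on a tie
// the element from b is kept, matching newer-write-wins semantics.
func merge(a, b []value) []value {
	out := make([]value, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		if a[0].t < b[0].t {
			out, a = append(out, a[0]), a[1:]
		} else if a[0].t == b[0].t {
			a = a[1:] // duplicate timestamp: drop a's value, b's follows
		} else {
			out, b = append(out, b[0]), b[1:]
		}
	}
	if len(a) > 0 {
		return append(out, a...)
	}
	return append(out, b...)
}

func main() {
	a := []value{{1, 1.0}, {2, 1.0}, {4, 1.0}}
	b := []value{{2, 9.0}, {3, 9.0}}
	fmt.Println(merge(a, b)) // [{1 1} {2 9} {3 9} {4 1}]
}
```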
// Sort methods
@ -322,52 +290,20 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
return append(b, a...)
}
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than b, so we need to merge
if av > bv {
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where a is after b, we skip the search
k = len(b)
} else {
// See where the value we saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// Values in a and b are the same, use b
a[i] = b[0]
b = b[1:]
out := make(FloatValues, 0, len(a)+len(b))
for len(a) > 0 && len(b) > 0 {
if a[0].UnixNano() < b[0].UnixNano() {
out, a = append(out, a[0]), a[1:]
} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
a = a[1:]
} else {
out, b = append(out, b[0]), b[1:]
}
}
if len(b) > 0 {
return append(a, b...)
if len(a) > 0 {
return append(out, a...)
}
return a
return append(out, b...)
}
func (a FloatValues) Encode(buf []byte) ([]byte, error) {
@ -548,52 +484,20 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
return append(b, a...)
}
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than b, so we need to merge
if av > bv {
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where a is after b, we skip the search
k = len(b)
} else {
// See where the value we saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// Values in a and b are the same, use b
a[i] = b[0]
b = b[1:]
out := make(IntegerValues, 0, len(a)+len(b))
for len(a) > 0 && len(b) > 0 {
if a[0].UnixNano() < b[0].UnixNano() {
out, a = append(out, a[0]), a[1:]
} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
a = a[1:]
} else {
out, b = append(out, b[0]), b[1:]
}
}
if len(b) > 0 {
return append(a, b...)
if len(a) > 0 {
return append(out, a...)
}
return a
return append(out, b...)
}
func (a IntegerValues) Encode(buf []byte) ([]byte, error) {
@ -774,52 +678,20 @@ func (a StringValues) Merge(b StringValues) StringValues {
return append(b, a...)
}
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than b, so we need to merge
if av > bv {
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where a is after b, we skip the search
k = len(b)
} else {
// See where the value we saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// Values in a and b are the same, use b
a[i] = b[0]
b = b[1:]
out := make(StringValues, 0, len(a)+len(b))
for len(a) > 0 && len(b) > 0 {
if a[0].UnixNano() < b[0].UnixNano() {
out, a = append(out, a[0]), a[1:]
} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
a = a[1:]
} else {
out, b = append(out, b[0]), b[1:]
}
}
if len(b) > 0 {
return append(a, b...)
if len(a) > 0 {
return append(out, a...)
}
return a
return append(out, b...)
}
func (a StringValues) Encode(buf []byte) ([]byte, error) {
@ -1000,52 +872,20 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
return append(b, a...)
}
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than b, so we need to merge
if av > bv {
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where a is after b, we skip the search
k = len(b)
} else {
// See where the value we saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// Values in a and b are the same, use b
a[i] = b[0]
b = b[1:]
out := make(BooleanValues, 0, len(a)+len(b))
for len(a) > 0 && len(b) > 0 {
if a[0].UnixNano() < b[0].UnixNano() {
out, a = append(out, a[0]), a[1:]
} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
a = a[1:]
} else {
out, b = append(out, b[0]), b[1:]
}
}
if len(b) > 0 {
return append(a, b...)
if len(a) > 0 {
return append(out, a...)
}
return a
return append(out, b...)
}
func (a BooleanValues) Encode(buf []byte) ([]byte, error) {

View File

@ -137,52 +137,20 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
return append(b, a...)
}
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than b, so we need to merge
if av > bv {
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where a is after b, we skip the search
k = len(b)
} else {
// See where the value we saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// Values in a and b are the same, use b
a[i] = b[0]
b = b[1:]
out := make({{.Name}}Values, 0, len(a)+len(b))
for len(a) > 0 && len(b) > 0 {
if a[0].UnixNano() < b[0].UnixNano() {
out, a = append(out, a[0]), a[1:]
} else if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() {
a = a[1:]
} else {
out, b = append(out, b[0]), b[1:]
}
}
if len(b) > 0 {
return append(a, b...)
if len(a) > 0 {
return append(out, a...)
}
return a
return append(out, b...)
}
{{ if ne .Name "" }}

View File

@ -488,7 +488,6 @@ func TestValues_MergeFloat(t *testing.T) {
for i, test := range tests {
got := tsm1.Values(test.a).Merge(test.b)
if exp, got := len(test.exp), len(got); exp != got {
t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
}
@ -1366,8 +1365,96 @@ func BenchmarkValues_Merge(b *testing.B) {
}
b.ResetTimer()
benchmarkMerge(a, c, b)
}
func BenchmarkValues_MergeDisjoint(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
a := make([]tsm1.Value, len(times))
c := make([]tsm1.Value, len(times))
for i, t := range times {
a[i] = tsm1.NewValue(t, float64(i))
c[i] = tsm1.NewValue(times[len(times)-1]+int64((i+1)*1e9), float64(i))
}
b.ResetTimer()
benchmarkMerge(a, c, b)
}
func BenchmarkValues_MergeSame(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
a := make([]tsm1.Value, len(times))
c := make([]tsm1.Value, len(times))
for i, t := range times {
a[i] = tsm1.NewValue(t, float64(i))
c[i] = tsm1.NewValue(t, float64(i))
}
b.ResetTimer()
benchmarkMerge(a, c, b)
}
func BenchmarkValues_MergeSimilar(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
a := make([]tsm1.Value, len(times))
c := make([]tsm1.Value, len(times))
for i, t := range times {
a[i] = tsm1.NewValue(t, float64(i))
if i == 0 {
t++
}
c[i] = tsm1.NewValue(t, float64(i))
}
b.ResetTimer()
benchmarkMerge(a, c, b)
}
func BenchmarkValues_MergeUnevenA(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
a := make([]tsm1.Value, len(times))
c := make([]tsm1.Value, len(times))
for i, t := range times {
a[i] = tsm1.NewValue(t, float64(i))
c[i] = tsm1.NewValue(t, float64(i))
}
b.ResetTimer()
benchmarkMerge(a[:700], c[:10], b)
}
func BenchmarkValues_MergeUnevenB(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
a := make([]tsm1.Value, len(times))
c := make([]tsm1.Value, len(times))
for i, t := range times {
a[i] = tsm1.NewValue(t, float64(i))
c[i] = tsm1.NewValue(t, float64(i))
}
b.ResetTimer()
benchmarkMerge(a[:10], c[:700], b)
}
func benchmarkMerge(a, c tsm1.Values, b *testing.B) {
for i := 0; i < b.N; i++ {
tsm1.Values(a).Merge(c)
b.StopTimer()
aa := make(tsm1.Values, len(a))
copy(aa, a)
cc := make(tsm1.Values, len(c))
copy(cc, c)
b.StartTimer()
tsm1.Values(aa).Merge(tsm1.Values(cc))
}
}

View File

@ -133,6 +133,8 @@ type Engine struct {
// NewEngine returns a new instance of Engine.
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath)
w.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
fs := NewFileStore(path)
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)

View File

@ -91,6 +91,12 @@ type WAL struct {
closing chan struct{}
// goroutines waiting for the next fsync
syncWaiters chan chan error
syncCount uint64
// syncDelay sets the duration to wait before fsyncing writes. A value of 0 (default)
// will cause every write to be fsync'd. This must be set before the WAL
// is opened if a non-default value is required.
syncDelay time.Duration
// WALOutput is the writer used by the logger.
logger zap.Logger // Logger to be used for important messages
@ -222,30 +228,39 @@ func (l *WAL) Open() error {
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
l.closing = make(chan struct{})
go l.syncPeriodically()
return nil
}
func (l *WAL) syncPeriodically() {
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
for {
// sync will schedule an fsync to the current wal segment and notify any
// waiting goroutines. If an fsync is already scheduled, subsequent calls will
// not schedule a new fsync and will be handled by the existing scheduled fsync.
func (l *WAL) sync() {
// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
return
}
// Fsync the wal and notify all pending waiters
go func() {
t := time.NewTimer(l.syncDelay)
select {
case <-t.C:
if len(l.syncWaiters) > 0 {
l.mu.Lock()
err := l.currentSegmentWriter.sync()
for i := 0; i < len(l.syncWaiters); i++ {
for len(l.syncWaiters) > 0 {
errC := <-l.syncWaiters
errC <- err
}
l.mu.Unlock()
}
case <-l.closing:
return
t.Stop()
}
}
atomic.StoreUint64(&l.syncCount, 0)
}()
}
// WritePoints writes the given points to the WAL. It returns the WAL segment ID to
@ -390,6 +405,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
return segID, err
}
// schedule an fsync and wait for it to complete
l.sync()
return segID, <-syncErr
}
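Taken together, sync and writeToLog implement delayed-fsync batching: each writer queues a result channel, and whichever caller wins the compare-and-swap on syncCount spawns the one goroutine that waits syncDelay, fsyncs once, and fans the result back out to every queued waiter. A self-contained sketch of the pattern (a mutex-guarded slice stands in for the waiter channel, and the counter is reset under the lock to keep the sketch race-free; names are illustrative):

```
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type batcher struct {
	syncCount uint64
	syncDelay time.Duration

	mu      sync.Mutex
	waiters []chan error
}

func (b *batcher) write() error {
	errC := make(chan error, 1)
	b.mu.Lock()
	b.waiters = append(b.waiters, errC)
	b.mu.Unlock()
	b.sync()
	return <-errC // block until a batched fsync covers this write
}

func (b *batcher) sync() {
	// If we're not the first to sync, another goroutine will fsync for us.
	if !atomic.CompareAndSwapUint64(&b.syncCount, 0, 1) {
		return
	}
	go func() {
		time.Sleep(b.syncDelay) // the batching window
		b.mu.Lock()
		err := error(nil) // stand-in for currentSegmentWriter.sync()
		for _, c := range b.waiters {
			c <- err
		}
		b.waiters = b.waiters[:0]
		atomic.StoreUint64(&b.syncCount, 0)
		b.mu.Unlock()
	}()
}

func main() {
	b := &batcher{syncDelay: 10 * time.Millisecond}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = b.write() }()
	}
	wg.Wait()
	fmt.Println("100 writes acknowledged by a handful of batched fsyncs")
}
```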

View File

@ -492,6 +492,7 @@ func (i *Index) DropSeries(key []byte) error {
k := string(key)
series := i.series[k]
if series == nil {
i.mu.Unlock()
return nil
}
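The added i.mu.Unlock() plugs a lock leak on the early-return path. Where the surrounding control flow permits a single lock scope, a defer removes this class of bug entirely; a sketch of that shape (a minimal stand-in type, since the diff does not show the rest of the method):

```
package main

import (
	"fmt"
	"sync"
)

// Index is a minimal stand-in with only the fields the sketch needs.
type Index struct {
	mu     sync.RWMutex
	series map[string]*struct{}
}

// DropSeries with a defer: the early return that previously leaked the lock
// is now safe by construction. Whether the real method can use one lock
// scope depends on the rest of its body.
func (i *Index) DropSeries(key []byte) error {
	i.mu.Lock()
	defer i.mu.Unlock()

	k := string(key)
	if i.series[k] == nil {
		return nil
	}
	delete(i.series, k) // stand-in for the real deletion logic
	return nil
}

func main() {
	idx := &Index{series: map[string]*struct{}{}}
	fmt.Println(idx.DropSeries([]byte("cpu,host=A"))) // <nil>, and no deadlock
	fmt.Println(idx.DropSeries([]byte("cpu,host=A"))) // still <nil>
}
```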
@ -701,10 +702,13 @@ type ShardIndex struct {
// CreateSeriesListIfNotExists creates a list of series, in bulk, if they don't already exist.
func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error {
// Ensure that no tags go over the maximum cardinality.
var reason string
var dropped, n int
var dropped int
// Ensure that no tags go over the maximum cardinality.
if maxValuesPerTag := idx.opt.Config.MaxValuesPerTag; maxValuesPerTag > 0 {
var n int
outer:
for i, name := range names {
tags := tagsSlice[i]
@ -730,10 +734,10 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
keys[n], names[n], tagsSlice[n] = keys[i], names[i], tagsSlice[i]
n++
}
}
// Slice to only include successful points.
keys, names, tagsSlice = keys[:n], names[:n], tagsSlice[:n]
// Slice to only include successful points.
keys, names, tagsSlice = keys[:n], names[:n], tagsSlice[:n]
}
// Write
for i := range keys {
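This hunk is the fix the PR title refers to. The keys[:n] truncation used to run even when max-values-per-tag was disabled, and since n is only advanced inside that check, n stayed 0 and every key was sliced away, so the series were never created. Moving the truncation (and n) inside the guard restores the pass-through path. A tiny reproduction of the slicing mistake (generic names, not the real types):

```
package main

import "fmt"

// filterInPlace compacts the elements that pass check to the front and
// reports how many survived.
func filterInPlace(keys []string, check func(string) bool) int {
	n := 0
	for _, k := range keys {
		if check(k) {
			keys[n] = k
			n++
		}
	}
	return n
}

func main() {
	keys := []string{"cpu,host=A", "cpu,host=B"}
	limitEnabled := false

	// Buggy shape: n only advances when the limit check runs, but the
	// truncation happens unconditionally.
	n := 0
	if limitEnabled {
		n = filterInPlace(keys, func(string) bool { return true })
	}
	fmt.Println(len(keys[:n])) // 0: every point silently dropped

	// Fixed shape: truncate only when the filter actually ran.
	if limitEnabled {
		keys = keys[:filterInPlace(keys, func(string) bool { return true })]
	}
	fmt.Println(len(keys)) // 2: points pass through untouched
}
```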

View File

@ -951,17 +951,28 @@ func (s *Shard) monitor() {
defer t.Stop()
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
var changed time.Time
for {
select {
case <-s.closing:
return
case <-t.C:
// Checking DiskSize can be expensive with a lot of shards and TSM files, so only
// check it when something has changed.
lm := s.LastModified()
if lm.Equal(changed) {
continue
}
size, err := s.DiskSize()
if err != nil {
s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
continue
}
atomic.StoreInt64(&s.stats.DiskBytes, size)
changed = lm
case <-t2.C:
if s.options.Config.MaxValuesPerTag == 0 {
continue