From 28f702667e029d07528f146ff982786b3f9bea5d Mon Sep 17 00:00:00 2001 From: Andy Feller Date: Thu, 25 Aug 2016 14:37:32 -0400 Subject: [PATCH 01/10] Fixing typo within example configuration file --- CHANGELOG.md | 6 ++++++ etc/config.sample.toml | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b84fbb7d4..0ff2c4e1e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v1.0.1 [unreleased] + +### Bugfixes + +- [#7271](https://github.com/influxdata/influxdb/issues/7271): Fixing typo within example configuration file. Thanks @andyfeller! + ## v1.0.0 [2016-09-07] ### Release Notes diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 8bda42ae3d..b99d0ae478 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -174,13 +174,13 @@ reporting-disabled = false realm = "InfluxDB" ### -### [subsciber] +### [subscriber] ### ### Controls the subscriptions, which can be used to fork a copy of all data ### received by the InfluxDB host. ### -[subsciber] +[subscriber] enabled = true http-timeout = "30s" From 4326da08209cc5d700e85f434e953a20bb439959 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 9 Sep 2016 13:34:54 -0500 Subject: [PATCH 02/10] Implement time math for lazy time literals When attempting to reduce the WHERE clause, the time literals had not been converted from string literals yet. This adds the functionality to have it handle the same time math when the time literal is still a string literal. --- CHANGELOG.md | 1 + influxql/ast.go | 189 +++++++++++++++++++++++++++++++++++++++---- influxql/ast_test.go | 27 ++++++- 3 files changed, 200 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ff2c4e1e7..1afb92a30c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bugfixes - [#7271](https://github.com/influxdata/influxdb/issues/7271): Fixing typo within example configuration file. Thanks @andyfeller! +- [#7270](https://github.com/influxdata/influxdb/issues/7270): Implement time math for lazy time literals. ## v1.0.0 [2016-09-07] diff --git a/influxql/ast.go b/influxql/ast.go index f993f4d686..c2c6432225 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -3334,6 +3334,33 @@ type StringLiteral struct { // String returns a string representation of the literal. func (l *StringLiteral) String() string { return QuoteString(l.Val) } +// IsTimeLiteral returns if this string can be interpreted as a time literal. +func (l *StringLiteral) IsTimeLiteral() bool { + return isDateTimeString(l.Val) || isDateString(l.Val) +} + +// ToTimeLiteral returns a time literal if this string can be converted to a time literal. +func (l *StringLiteral) ToTimeLiteral() (*TimeLiteral, error) { + if isDateTimeString(l.Val) { + t, err := time.Parse(DateTimeFormat, l.Val) + if err != nil { + // try to parse it as an RFCNano time + t, err = time.Parse(time.RFC3339Nano, l.Val) + if err != nil { + return nil, ErrInvalidTime + } + } + return &TimeLiteral{Val: t}, nil + } else if isDateString(l.Val) { + t, err := time.Parse(DateFormat, l.Val) + if err != nil { + return nil, ErrInvalidTime + } + return &TimeLiteral{Val: t}, nil + } + return nil, ErrInvalidTime +} + // TimeLiteral represents a point-in-time literal. type TimeLiteral struct { Val time.Time @@ -3664,22 +3691,12 @@ func timeExprValue(ref Expr, lit Expr) (t time.Time, err error) { if ref, ok := ref.(*VarRef); ok && strings.ToLower(ref.Val) == "time" { // If literal looks like a date time then parse it as a time literal. if strlit, ok := lit.(*StringLiteral); ok { - if isDateTimeString(strlit.Val) { - t, err := time.Parse(DateTimeFormat, strlit.Val) + if strlit.IsTimeLiteral() { + t, err := strlit.ToTimeLiteral() if err != nil { - // try to parse it as an RFCNano time - t, err = time.Parse(time.RFC3339Nano, strlit.Val) - if err != nil { - return time.Time{}, ErrInvalidTime - } + return time.Time{}, err } - lit = &TimeLiteral{Val: t} - } else if isDateString(strlit.Val) { - t, err := time.Parse(DateFormat, strlit.Val) - if err != nil { - return time.Time{}, ErrInvalidTime - } - lit = &TimeLiteral{Val: t} + lit = t } } @@ -4254,6 +4271,18 @@ func reduceBinaryExprDurationLHS(op Token, lhs *DurationLiteral, rhs Expr) Expr case ADD: return &TimeLiteral{Val: rhs.Val.Add(lhs.Val)} } + case *StringLiteral: + t, err := rhs.ToTimeLiteral() + if err != nil { + break + } + expr := reduceBinaryExprDurationLHS(op, lhs, t) + + // If the returned expression is still a binary expr, that means + // we couldn't reduce it so this wasn't used in a time literal context. + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } case *nilLiteral: return &BooleanLiteral{Val: false} } @@ -4298,6 +4327,22 @@ func reduceBinaryExprIntegerLHS(op Token, lhs *IntegerLiteral, rhs Expr) Expr { case SUB: return &TimeLiteral{Val: time.Unix(0, lhs.Val).Add(-rhs.Val)} } + case *TimeLiteral: + d := &DurationLiteral{Val: time.Duration(lhs.Val)} + expr := reduceBinaryExprDurationLHS(op, d, rhs) + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } + case *StringLiteral: + t, err := rhs.ToTimeLiteral() + if err != nil { + break + } + d := &DurationLiteral{Val: time.Duration(lhs.Val)} + expr := reduceBinaryExprDurationLHS(op, d, t) + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } case *nilLiteral: return &BooleanLiteral{Val: false} } @@ -4377,11 +4422,105 @@ func reduceBinaryExprStringLHS(op Token, lhs *StringLiteral, rhs Expr) Expr { case *StringLiteral: switch op { case EQ: - return &BooleanLiteral{Val: lhs.Val == rhs.Val} + var expr Expr = &BooleanLiteral{Val: lhs.Val == rhs.Val} + // This might be a comparison between time literals. + // If it is, parse the time literals and then compare since it + // could be a different result if they use different formats + // for the same time. + if lhs.IsTimeLiteral() && rhs.IsTimeLiteral() { + tlhs, err := lhs.ToTimeLiteral() + if err != nil { + return expr + } + + trhs, err := rhs.ToTimeLiteral() + if err != nil { + return expr + } + + t := reduceBinaryExprTimeLHS(op, tlhs, trhs) + if _, ok := t.(*BinaryExpr); !ok { + expr = t + } + } + return expr case NEQ: - return &BooleanLiteral{Val: lhs.Val != rhs.Val} + var expr Expr = &BooleanLiteral{Val: lhs.Val != rhs.Val} + // This might be a comparison between time literals. + // If it is, parse the time literals and then compare since it + // could be a different result if they use different formats + // for the same time. + if lhs.IsTimeLiteral() && rhs.IsTimeLiteral() { + tlhs, err := lhs.ToTimeLiteral() + if err != nil { + return expr + } + + trhs, err := rhs.ToTimeLiteral() + if err != nil { + return expr + } + + t := reduceBinaryExprTimeLHS(op, tlhs, trhs) + if _, ok := t.(*BinaryExpr); !ok { + expr = t + } + } + return expr case ADD: return &StringLiteral{Val: lhs.Val + rhs.Val} + default: + // Attempt to convert the string literal to a time literal. + t, err := lhs.ToTimeLiteral() + if err != nil { + break + } + expr := reduceBinaryExprTimeLHS(op, t, rhs) + + // If the returned expression is still a binary expr, that means + // we couldn't reduce it so this wasn't used in a time literal context. + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } + } + case *DurationLiteral: + // Attempt to convert the string literal to a time literal. + t, err := lhs.ToTimeLiteral() + if err != nil { + break + } + expr := reduceBinaryExprTimeLHS(op, t, rhs) + + // If the returned expression is still a binary expr, that means + // we couldn't reduce it so this wasn't used in a time literal context. + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } + case *TimeLiteral: + // Attempt to convert the string literal to a time literal. + t, err := lhs.ToTimeLiteral() + if err != nil { + break + } + expr := reduceBinaryExprTimeLHS(op, t, rhs) + + // If the returned expression is still a binary expr, that means + // we couldn't reduce it so this wasn't used in a time literal context. + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } + case *IntegerLiteral: + // Attempt to convert the string literal to a time literal. + t, err := lhs.ToTimeLiteral() + if err != nil { + break + } + expr := reduceBinaryExprTimeLHS(op, t, rhs) + + // If the returned expression is still a binary expr, that means + // we couldn't reduce it so this wasn't used in a time literal context. + if _, ok := expr.(*BinaryExpr); !ok { + return expr } case *nilLiteral: switch op { @@ -4401,6 +4540,12 @@ func reduceBinaryExprTimeLHS(op Token, lhs *TimeLiteral, rhs Expr) Expr { case SUB: return &TimeLiteral{Val: lhs.Val.Add(-rhs.Val)} } + case *IntegerLiteral: + d := &DurationLiteral{Val: time.Duration(rhs.Val)} + expr := reduceBinaryExprTimeLHS(op, lhs, d) + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } case *TimeLiteral: switch op { case SUB: @@ -4418,6 +4563,18 @@ func reduceBinaryExprTimeLHS(op Token, lhs *TimeLiteral, rhs Expr) Expr { case LTE: return &BooleanLiteral{Val: lhs.Val.Before(rhs.Val) || lhs.Val.Equal(rhs.Val)} } + case *StringLiteral: + t, err := rhs.ToTimeLiteral() + if err != nil { + break + } + expr := reduceBinaryExprTimeLHS(op, lhs, t) + + // If the returned expression is still a binary expr, that means + // we couldn't reduce it so this wasn't used in a time literal context. + if _, ok := expr.(*BinaryExpr); !ok { + return expr + } case *nilLiteral: return &BooleanLiteral{Val: false} } diff --git a/influxql/ast_test.go b/influxql/ast_test.go index bf825fbb91..10d6b10edc 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -1091,10 +1091,13 @@ func TestReduce(t *testing.T) { {in: `true <> false`, out: `true`}, {in: `true + false`, out: `true + false`}, - // Time literals. + // Time literals with now(). {in: `now() + 2h`, out: `'2000-01-01T02:00:00Z'`, data: map[string]interface{}{"now()": now}}, {in: `now() / 2h`, out: `'2000-01-01T00:00:00Z' / 2h`, data: map[string]interface{}{"now()": now}}, {in: `4µ + now()`, out: `'2000-01-01T00:00:00.000004Z'`, data: map[string]interface{}{"now()": now}}, + {in: `now() + 2000000000`, out: `'2000-01-01T00:00:02Z'`, data: map[string]interface{}{"now()": now}}, + {in: `2000000000 + now()`, out: `'2000-01-01T00:00:02Z'`, data: map[string]interface{}{"now()": now}}, + {in: `now() - 2000000000`, out: `'1999-12-31T23:59:58Z'`, data: map[string]interface{}{"now()": now}}, {in: `now() = now()`, out: `true`, data: map[string]interface{}{"now()": now}}, {in: `now() <> now()`, out: `false`, data: map[string]interface{}{"now()": now}}, {in: `now() < now() + 1h`, out: `true`, data: map[string]interface{}{"now()": now}}, @@ -1106,6 +1109,28 @@ func TestReduce(t *testing.T) { {in: `now()`, out: `now()`}, {in: `946684800000000000 + 2h`, out: `'2000-01-01T02:00:00Z'`}, + // Time literals. + {in: `'2000-01-01T00:00:00Z' + 2h`, out: `'2000-01-01T02:00:00Z'`}, + {in: `'2000-01-01T00:00:00Z' / 2h`, out: `'2000-01-01T00:00:00Z' / 2h`}, + {in: `4µ + '2000-01-01T00:00:00Z'`, out: `'2000-01-01T00:00:00.000004Z'`}, + {in: `'2000-01-01T00:00:00Z' + 2000000000`, out: `'2000-01-01T00:00:02Z'`}, + {in: `2000000000 + '2000-01-01T00:00:00Z'`, out: `'2000-01-01T00:00:02Z'`}, + {in: `'2000-01-01T00:00:00Z' - 2000000000`, out: `'1999-12-31T23:59:58Z'`}, + {in: `'2000-01-01T00:00:00Z' = '2000-01-01T00:00:00Z'`, out: `true`}, + {in: `'2000-01-01T00:00:00.000000000Z' = '2000-01-01T00:00:00Z'`, out: `true`}, + {in: `'2000-01-01T00:00:00Z' <> '2000-01-01T00:00:00Z'`, out: `false`}, + {in: `'2000-01-01T00:00:00.000000000Z' <> '2000-01-01T00:00:00Z'`, out: `false`}, + {in: `'2000-01-01T00:00:00Z' < '2000-01-01T00:00:00Z' + 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00.000000000Z' < '2000-01-01T00:00:00Z' + 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00Z' <= '2000-01-01T00:00:00Z' + 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00.000000000Z' <= '2000-01-01T00:00:00Z' + 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00Z' > '2000-01-01T00:00:00Z' - 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00.000000000Z' > '2000-01-01T00:00:00Z' - 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00Z' >= '2000-01-01T00:00:00Z' - 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00.000000000Z' >= '2000-01-01T00:00:00Z' - 1h`, out: `true`}, + {in: `'2000-01-01T00:00:00Z' - ('2000-01-01T00:00:00Z' - 60s)`, out: `1m`}, + {in: `'2000-01-01T00:00:00Z' AND '2000-01-01T00:00:00Z'`, out: `'2000-01-01T00:00:00Z' AND '2000-01-01T00:00:00Z'`}, + // Duration literals. {in: `10m + 1h - 60s`, out: `69m`}, {in: `(10m / 2) * 5`, out: `25m`}, From ab4bca849523cc1d18fd2696ee1e5ee3e81ee3ca Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 9 Sep 2016 14:32:41 -0500 Subject: [PATCH 03/10] Report cmdline and memstats in /debug/vars When we refactored expvar, the cmdline and memstats sections were not readded to the output. This adds it back if they can be found inside of `expvar`. It also stops trying to sort the output of the statistics so they get returned faster. JSON doesn't need them to be sorted and it causes enough latency problems that sorting them hurts performance. --- CHANGELOG.md | 1 + monitor/service.go | 2 -- services/httpd/handler.go | 49 +++++++++++++++++++++------------------ 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1afb92a30c..759cd0863c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [#7271](https://github.com/influxdata/influxdb/issues/7271): Fixing typo within example configuration file. Thanks @andyfeller! - [#7270](https://github.com/influxdata/influxdb/issues/7270): Implement time math for lazy time literals. +- [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars. ## v1.0.0 [2016-09-07] diff --git a/monitor/service.go b/monitor/service.go index 75fab16865..e1de19a560 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -313,8 +313,6 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) { statistics = append(statistics, statistic) statistics = m.gatherStatistics(statistics, tags) - sort.Sort(Statistics(statistics)) - return statistics, nil } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 1b7d7aac16..cbcf0fe4bb 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "encoding/json" "errors" + "expvar" "fmt" "io" "log" @@ -12,7 +13,6 @@ import ( "net/http/pprof" "os" "runtime/debug" - "sort" "strconv" "strings" "sync/atomic" @@ -690,8 +690,31 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { return } - m := make(map[string]*monitor.Statistic) + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + fmt.Fprintln(w, "{") + first := true + if val := expvar.Get("cmdline"); val != nil { + if !first { + fmt.Fprintln(w, ",") + } + first = false + fmt.Fprintf(w, "\"cmdline\": %s", val) + } + if val := expvar.Get("memstats"); val != nil { + if !first { + fmt.Fprintln(w, ",") + } + first = false + fmt.Fprintf(w, "\"memstats\": %s", val) + } + for _, s := range stats { + val, err := json.Marshal(s) + if err != nil { + continue + } + // Very hackily create a unique key. buf := bytes.NewBufferString(s.Name) if path, ok := s.Tags["path"]; ok { @@ -718,32 +741,12 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { } key := buf.String() - m[key] = s - } - - // Sort the keys to simulate /debug/vars output. - keys := make([]string, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - sort.Strings(keys) - - w.Header().Set("Content-Type", "application/json; charset=utf-8") - fmt.Fprintln(w, "{") - first := true - for _, key := range keys { - // Marshal this statistic to JSON. - out, err := json.Marshal(m[key]) - if err != nil { - continue - } - if !first { fmt.Fprintln(w, ",") } first = false fmt.Fprintf(w, "%q: ", key) - w.Write(bytes.TrimSpace(out)) + w.Write(bytes.TrimSpace(val)) } fmt.Fprintln(w, "\n}") } From 2a99ef751d8f42ea0525043b70c6dbed2ee07231 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 13 Sep 2016 15:55:23 +0100 Subject: [PATCH 04/10] Emit fieldsCreated stat in shard measurement --- CHANGELOG.md | 1 + tsdb/shard.go | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 759cd0863c..e46b565d43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#7271](https://github.com/influxdata/influxdb/issues/7271): Fixing typo within example configuration file. Thanks @andyfeller! - [#7270](https://github.com/influxdata/influxdb/issues/7270): Implement time math for lazy time literals. - [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars. +- [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement. ## v1.0.0 [2016-09-07] diff --git a/tsdb/shard.go b/tsdb/shard.go index 7af1ae1aa2..e6a8a9e32f 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -191,6 +191,7 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { statWriteReqOK: atomic.LoadInt64(&s.stats.WriteReqOK), statWriteReqErr: atomic.LoadInt64(&s.stats.WriteReqErr), statSeriesCreate: seriesN, + statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated), statWritePointsErr: atomic.LoadInt64(&s.stats.WritePointsErr), statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK), statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten), From 954445efd27c48711d07dc683c83ef0979d0585a Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 9 Sep 2016 17:55:36 -0500 Subject: [PATCH 05/10] Read an invalid JSON response as an error in the influx client --- CHANGELOG.md | 1 + client/influxdb.go | 34 +++++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e46b565d43..a88f31e052 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#7270](https://github.com/influxdata/influxdb/issues/7270): Implement time math for lazy time literals. - [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars. - [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement. +- [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client. ## v1.0.0 [2016-09-07] diff --git a/client/influxdb.go b/client/influxdb.go index 90695b9ed2..0d42cf0bac 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -498,17 +498,36 @@ func (r *Response) Error() error { return nil } +// duplexReader reads responses and writes it to another writer while +// satisfying the reader interface. +type duplexReader struct { + r io.Reader + w io.Writer +} + +func (r *duplexReader) Read(p []byte) (n int, err error) { + n, err = r.r.Read(p) + if err == nil { + r.w.Write(p[:n]) + } + return n, err +} + // ChunkedResponse represents a response from the server that // uses chunking to stream the output. type ChunkedResponse struct { - dec *json.Decoder + dec *json.Decoder + duplex *duplexReader + buf bytes.Buffer } // NewChunkedResponse reads a stream and produces responses from the stream. func NewChunkedResponse(r io.Reader) *ChunkedResponse { - dec := json.NewDecoder(r) - dec.UseNumber() - return &ChunkedResponse{dec: dec} + resp := &ChunkedResponse{} + resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.dec = json.NewDecoder(resp.duplex) + resp.dec.UseNumber() + return resp } // NextResponse reads the next line of the stream and returns a response. @@ -518,8 +537,13 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) { if err == io.EOF { return nil, nil } - return nil, err + // A decoding error happened. This probably means the server crashed + // and sent a last-ditch error message to us. Ensure we have read the + // entirety of the connection to get any remaining error text. + io.Copy(ioutil.Discard, r.duplex) + return nil, errors.New(strings.TrimSpace(r.buf.String())) } + r.buf.Reset() return &response, nil } From 0b94f5dc1ada986515c304ee76b05d43aad77d77 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 13 Sep 2016 16:57:34 -0500 Subject: [PATCH 06/10] Skip past points at the same time in derivative call within a merged series The derivative() call would panic if it received two points at the same time because it tried to divide by zero. The derivative call now skips past these points. To avoid skipping past these points, use `GROUP BY *` so that each series is kept separated into their own series. The difference() call has also been modified to skip past these points. Even though difference doesn't divide by the time, difference is supposed to perform the same as derivative, but without dividing by the time. --- CHANGELOG.md | 1 + influxql/functions.go | 40 +++++++++++++++++ influxql/select_test.go | 96 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a88f31e052..c78990ac3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars. - [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement. - [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client. +- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series. ## v1.0.0 [2016-09-07] diff --git a/influxql/functions.go b/influxql/functions.go index cf0379fb8f..3a6a268a76 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -91,6 +91,13 @@ func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool) // AggregateFloat aggregates a point into the reducer and updates the current window. func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -107,6 +114,9 @@ func (r *FloatDerivativeReducer) Emit() []FloatPoint { } value := diff / (float64(elapsed) / float64(r.interval.Duration)) + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true + // Drop negative values for non-negative derivatives. if r.isNonNegative && diff < 0 { return nil @@ -138,6 +148,13 @@ func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending boo // AggregateInteger aggregates a point into the reducer and updates the current window. func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -154,6 +171,9 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint { } value := diff / (float64(elapsed) / float64(r.interval.Duration)) + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true + // Drop negative values for non-negative derivatives. if r.isNonNegative && diff < 0 { return nil @@ -179,6 +199,13 @@ func NewFloatDifferenceReducer() *FloatDifferenceReducer { // AggregateFloat aggregates a point into the reducer and updates the current window. func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -188,6 +215,9 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint { if !r.prev.Nil { // Calculate the difference of successive points. value := r.curr.Value - r.prev.Value + + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true return []FloatPoint{{Time: r.curr.Time, Value: value}} } return nil @@ -209,6 +239,13 @@ func NewIntegerDifferenceReducer() *IntegerDifferenceReducer { // AggregateInteger aggregates a point into the reducer and updates the current window. func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -218,6 +255,9 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint { if !r.prev.Nil { // Calculate the difference of successive points. value := r.curr.Value - r.prev.Value + + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true return []IntegerPoint{{Time: r.curr.Time, Value: value}} } return nil diff --git a/influxql/select_test.go b/influxql/select_test.go index 8c7ff82f3b..fd4abd1a0d 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2205,6 +2205,54 @@ func TestSelect_Derivative_Desc_Integer(t *testing.T) { } } +func TestSelect_Derivative_Duplicate_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Derivative_Duplicate_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestSelect_Difference_Float(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { @@ -2257,6 +2305,54 @@ func TestSelect_Difference_Integer(t *testing.T) { } } +func TestSelect_Difference_Duplicate_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -10}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Difference_Duplicate_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestSelect_Elapsed_Float(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { From 39ade119449a9de5e6dd9fabc83bbca55b96fba5 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 15 Sep 2016 23:51:05 -0600 Subject: [PATCH 07/10] Unload index before closing shard When deleting a shard, the shard is locked and then removed from the index. Removal from the index can be slow if there are a lot of series. During this time, the shard is still expected to exist by the meta store and tsdb store so stats collections, queries and writes could all be run on this shard while it's locked. This can cause everything to lock up until the unindexing completes and the shard can be unlocked. Fixes #7226 --- CHANGELOG.md | 10 ++++++++++ tsdb/shard.go | 8 +++++++- tsdb/store.go | 5 +++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c78990ac3d..042f2ad84a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,16 @@ - [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement. - [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client. - [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series. +- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key. +- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement. +- [#7177](https://github.com/influxdata/influxdb/issues/7177): Fix base64 encoding issue with /debug/vars stats. +- [#7196](https://github.com/influxdata/influxdb/issues/7196): Fix mmap dereferencing, fixes #7183, #7180 +- [#7013](https://github.com/influxdata/influxdb/issues/7013): Fix the dollar sign so it properly handles reserved keywords. +- [#7297](https://github.com/influxdata/influxdb/issues/7297): Use consistent column output from the CLI for column formatted responses. +- [#7231](https://github.com/influxdata/influxdb/issues/7231): Duplicate parsing bug in ALTER RETENTION POLICY. +- [#7285](https://github.com/influxdata/influxdb/issues/7285): Correctly use password-type field in Admin UI. Thanks @dandv! +- [#2792](https://github.com/influxdata/influxdb/issues/2792): Exceeding max retention policy duration gives incorrect error message +- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards ## v1.0.0 [2016-09-07] diff --git a/tsdb/shard.go b/tsdb/shard.go index e6a8a9e32f..b91b1a3b0c 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -262,6 +262,12 @@ func (s *Shard) Open() error { return nil } +// UnloadIndex removes all references to this shard from the DatabaseIndex +func (s *Shard) UnloadIndex() { + // Don't leak our shard ID and series keys in the index + s.index.RemoveShard(s.id) +} + // Close shuts down the shard's store. func (s *Shard) Close() error { s.mu.Lock() @@ -282,7 +288,7 @@ func (s *Shard) close() error { } // Don't leak our shard ID and series keys in the index - s.index.RemoveShard(s.id) + s.UnloadIndex() err := s.engine.Close() if err == nil { diff --git a/tsdb/store.go b/tsdb/store.go index ef8cd327bd..ed7d9a78fc 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -357,6 +357,11 @@ func (s *Store) DeleteShard(shardID uint64) error { return nil } + // Remove the shard from the database indexes before closing the shard. + // Closing the shard will do this as well, but it will unload it while + // the shard is locked which can block stats collection and other calls. + sh.UnloadIndex() + if err := sh.Close(); err != nil { return err } From dbc4a9150f9e71509ada6d8f1213f7881d9cb8d1 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 20 Sep 2016 13:58:46 -0500 Subject: [PATCH 08/10] Prevent manual use of system queries Manual use of system queries could result in a user using the query incorrect. Rather than check to make sure the query was used correctly, we're just going to prevent users from using those sources so they can't use them incorrectly. --- CHANGELOG.md | 1 + influxql/ast.go | 1 - influxql/query_executor.go | 31 ++++++++++++++++++++ influxql/query_executor_test.go | 50 +++++++++++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 042f2ad84a..1f55ce0546 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - [#7285](https://github.com/influxdata/influxdb/issues/7285): Correctly use password-type field in Admin UI. Thanks @dandv! - [#2792](https://github.com/influxdata/influxdb/issues/2792): Exceeding max retention policy duration gives incorrect error message - [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards +- [#7315](https://github.com/influxdata/influxdb/issues/7315): Prevent users from manually using system queries since incorrect use would result in a panic. ## v1.0.0 [2016-09-07] diff --git a/influxql/ast.go b/influxql/ast.go index c2c6432225..8992091176 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -409,7 +409,6 @@ func IsSystemName(name string) bool { case "_fieldKeys", "_measurements", "_series", - "_tagKey", "_tagKeys", "_tags": return true diff --git a/influxql/query_executor.go b/influxql/query_executor.go index bc9d3ebe94..f3205f0c07 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -199,6 +199,7 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing } var i int +LOOP: for ; i < len(query.Statements); i++ { ctx.StatementID = i stmt := query.Statements[i] @@ -211,6 +212,36 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing } } + // Do not let queries manually use the system measurements. If we find + // one, return an error. This prevents a person from using the + // measurement incorrectly and causing a panic. + if stmt, ok := stmt.(*SelectStatement); ok { + for _, s := range stmt.Sources { + switch s := s.(type) { + case *Measurement: + if IsSystemName(s.Name) { + command := "the appropriate meta command" + switch s.Name { + case "_fieldKeys": + command = "SHOW FIELD KEYS" + case "_measurements": + command = "SHOW MEASUREMENTS" + case "_series": + command = "SHOW SERIES" + case "_tagKeys": + command = "SHOW TAG KEYS" + case "_tags": + command = "SHOW TAG VALUES" + } + results <- &Result{ + Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command), + } + break LOOP + } + } + } + } + // Rewrite statements, if necessary. // This can occur on meta read statements which convert to SELECT statements. newStmt, err := RewriteStatement(stmt) diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index 6deda3e699..3d11c8232b 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -283,6 +283,56 @@ func TestQueryExecutor_Panic(t *testing.T) { } } +func TestQueryExecutor_InvalidSource(t *testing.T) { + e := NewQueryExecutor() + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + return errors.New("statement executed unexpectedly") + }, + } + + for i, tt := range []struct { + q string + err string + }{ + { + q: `SELECT fieldKey, fieldType FROM _fieldKeys`, + err: `unable to use system source '_fieldKeys': use SHOW FIELD KEYS instead`, + }, + { + q: `SELECT "name" FROM _measurements`, + err: `unable to use system source '_measurements': use SHOW MEASUREMENTS instead`, + }, + { + q: `SELECT "key" FROM _series`, + err: `unable to use system source '_series': use SHOW SERIES instead`, + }, + { + q: `SELECT tagKey FROM _tagKeys`, + err: `unable to use system source '_tagKeys': use SHOW TAG KEYS instead`, + }, + { + q: `SELECT "key", value FROM _tags`, + err: `unable to use system source '_tags': use SHOW TAG VALUES instead`, + }, + } { + q, err := influxql.ParseQuery(tt.q) + if err != nil { + t.Errorf("%d. unable to parse: %s", i, tt.q) + continue + } + + results := e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil) + result := <-results + if len(result.Series) != 0 { + t.Errorf("%d. expected %d rows, got %d", 0, i, len(result.Series)) + } + if result.Err == nil || result.Err.Error() != tt.err { + t.Errorf("%d. unexpected error: %s", i, result.Err) + } + } +} + func discardOutput(results <-chan *influxql.Result) { for range results { // Read all results and discard. From 8b354f72951f50b130894dddee2e05149a1801d8 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 26 Sep 2016 09:15:33 -0600 Subject: [PATCH 09/10] Update 1.0.1 changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f55ce0546..7c6577303b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## v1.0.1 [unreleased] +## v1.0.1 [2016-09-26] ### Bugfixes From 6660bf5cbaff0372e44e5c1145ed65a08777dbe9 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 27 Sep 2016 15:18:57 -0500 Subject: [PATCH 10/10] Removing bad changelog entries added in 39ade11 --- CHANGELOG.md | 9 --------- 1 file changed, 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c6577303b..4b172a7623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,15 +8,6 @@ - [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement. - [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client. - [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series. -- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key. -- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement. -- [#7177](https://github.com/influxdata/influxdb/issues/7177): Fix base64 encoding issue with /debug/vars stats. -- [#7196](https://github.com/influxdata/influxdb/issues/7196): Fix mmap dereferencing, fixes #7183, #7180 -- [#7013](https://github.com/influxdata/influxdb/issues/7013): Fix the dollar sign so it properly handles reserved keywords. -- [#7297](https://github.com/influxdata/influxdb/issues/7297): Use consistent column output from the CLI for column formatted responses. -- [#7231](https://github.com/influxdata/influxdb/issues/7231): Duplicate parsing bug in ALTER RETENTION POLICY. -- [#7285](https://github.com/influxdata/influxdb/issues/7285): Correctly use password-type field in Admin UI. Thanks @dandv! -- [#2792](https://github.com/influxdata/influxdb/issues/2792): Exceeding max retention policy duration gives incorrect error message - [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards - [#7315](https://github.com/influxdata/influxdb/issues/7315): Prevent users from manually using system queries since incorrect use would result in a panic.