commit
e22e33d5fd
13
CHANGELOG.md
13
CHANGELOG.md
|
@ -28,6 +28,19 @@
|
|||
- [#2792](https://github.com/influxdata/influxdb/issues/2792): Exceeding max retention policy duration gives incorrect error message
|
||||
- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards
|
||||
|
||||
## v1.0.1 [2016-09-26]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#7271](https://github.com/influxdata/influxdb/issues/7271): Fixing typo within example configuration file. Thanks @andyfeller!
|
||||
- [#7270](https://github.com/influxdata/influxdb/issues/7270): Implement time math for lazy time literals.
|
||||
- [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars.
|
||||
- [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement.
|
||||
- [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client.
|
||||
- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series.
|
||||
- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards
|
||||
- [#7315](https://github.com/influxdata/influxdb/issues/7315): Prevent users from manually using system queries since incorrect use would result in a panic.
|
||||
|
||||
## v1.0.0 [2016-09-08]
|
||||
|
||||
### Release Notes
|
||||
|
|
|
@ -498,17 +498,36 @@ func (r *Response) Error() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// duplexReader reads responses and writes it to another writer while
|
||||
// satisfying the reader interface.
|
||||
type duplexReader struct {
|
||||
r io.Reader
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func (r *duplexReader) Read(p []byte) (n int, err error) {
|
||||
n, err = r.r.Read(p)
|
||||
if err == nil {
|
||||
r.w.Write(p[:n])
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// ChunkedResponse represents a response from the server that
|
||||
// uses chunking to stream the output.
|
||||
type ChunkedResponse struct {
|
||||
dec *json.Decoder
|
||||
dec *json.Decoder
|
||||
duplex *duplexReader
|
||||
buf bytes.Buffer
|
||||
}
|
||||
|
||||
// NewChunkedResponse reads a stream and produces responses from the stream.
|
||||
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
|
||||
dec := json.NewDecoder(r)
|
||||
dec.UseNumber()
|
||||
return &ChunkedResponse{dec: dec}
|
||||
resp := &ChunkedResponse{}
|
||||
resp.duplex = &duplexReader{r: r, w: &resp.buf}
|
||||
resp.dec = json.NewDecoder(resp.duplex)
|
||||
resp.dec.UseNumber()
|
||||
return resp
|
||||
}
|
||||
|
||||
// NextResponse reads the next line of the stream and returns a response.
|
||||
|
@ -518,8 +537,13 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) {
|
|||
if err == io.EOF {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
// A decoding error happened. This probably means the server crashed
|
||||
// and sent a last-ditch error message to us. Ensure we have read the
|
||||
// entirety of the connection to get any remaining error text.
|
||||
io.Copy(ioutil.Discard, r.duplex)
|
||||
return nil, errors.New(strings.TrimSpace(r.buf.String()))
|
||||
}
|
||||
r.buf.Reset()
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
|
|
190
influxql/ast.go
190
influxql/ast.go
|
@ -409,7 +409,6 @@ func IsSystemName(name string) bool {
|
|||
case "_fieldKeys",
|
||||
"_measurements",
|
||||
"_series",
|
||||
"_tagKey",
|
||||
"_tagKeys",
|
||||
"_tags":
|
||||
return true
|
||||
|
@ -3315,6 +3314,33 @@ type StringLiteral struct {
|
|||
// String returns a string representation of the literal.
|
||||
func (l *StringLiteral) String() string { return QuoteString(l.Val) }
|
||||
|
||||
// IsTimeLiteral returns if this string can be interpreted as a time literal.
|
||||
func (l *StringLiteral) IsTimeLiteral() bool {
|
||||
return isDateTimeString(l.Val) || isDateString(l.Val)
|
||||
}
|
||||
|
||||
// ToTimeLiteral returns a time literal if this string can be converted to a time literal.
|
||||
func (l *StringLiteral) ToTimeLiteral() (*TimeLiteral, error) {
|
||||
if isDateTimeString(l.Val) {
|
||||
t, err := time.Parse(DateTimeFormat, l.Val)
|
||||
if err != nil {
|
||||
// try to parse it as an RFCNano time
|
||||
t, err = time.Parse(time.RFC3339Nano, l.Val)
|
||||
if err != nil {
|
||||
return nil, ErrInvalidTime
|
||||
}
|
||||
}
|
||||
return &TimeLiteral{Val: t}, nil
|
||||
} else if isDateString(l.Val) {
|
||||
t, err := time.Parse(DateFormat, l.Val)
|
||||
if err != nil {
|
||||
return nil, ErrInvalidTime
|
||||
}
|
||||
return &TimeLiteral{Val: t}, nil
|
||||
}
|
||||
return nil, ErrInvalidTime
|
||||
}
|
||||
|
||||
// TimeLiteral represents a point-in-time literal.
|
||||
type TimeLiteral struct {
|
||||
Val time.Time
|
||||
|
@ -3645,22 +3671,12 @@ func timeExprValue(ref Expr, lit Expr) (t time.Time, err error) {
|
|||
if ref, ok := ref.(*VarRef); ok && strings.ToLower(ref.Val) == "time" {
|
||||
// If literal looks like a date time then parse it as a time literal.
|
||||
if strlit, ok := lit.(*StringLiteral); ok {
|
||||
if isDateTimeString(strlit.Val) {
|
||||
t, err := time.Parse(DateTimeFormat, strlit.Val)
|
||||
if strlit.IsTimeLiteral() {
|
||||
t, err := strlit.ToTimeLiteral()
|
||||
if err != nil {
|
||||
// try to parse it as an RFCNano time
|
||||
t, err = time.Parse(time.RFC3339Nano, strlit.Val)
|
||||
if err != nil {
|
||||
return time.Time{}, ErrInvalidTime
|
||||
}
|
||||
return time.Time{}, err
|
||||
}
|
||||
lit = &TimeLiteral{Val: t}
|
||||
} else if isDateString(strlit.Val) {
|
||||
t, err := time.Parse(DateFormat, strlit.Val)
|
||||
if err != nil {
|
||||
return time.Time{}, ErrInvalidTime
|
||||
}
|
||||
lit = &TimeLiteral{Val: t}
|
||||
lit = t
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4235,6 +4251,18 @@ func reduceBinaryExprDurationLHS(op Token, lhs *DurationLiteral, rhs Expr) Expr
|
|||
case ADD:
|
||||
return &TimeLiteral{Val: rhs.Val.Add(lhs.Val)}
|
||||
}
|
||||
case *StringLiteral:
|
||||
t, err := rhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
expr := reduceBinaryExprDurationLHS(op, lhs, t)
|
||||
|
||||
// If the returned expression is still a binary expr, that means
|
||||
// we couldn't reduce it so this wasn't used in a time literal context.
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *nilLiteral:
|
||||
return &BooleanLiteral{Val: false}
|
||||
}
|
||||
|
@ -4279,6 +4307,22 @@ func reduceBinaryExprIntegerLHS(op Token, lhs *IntegerLiteral, rhs Expr) Expr {
|
|||
case SUB:
|
||||
return &TimeLiteral{Val: time.Unix(0, lhs.Val).Add(-rhs.Val)}
|
||||
}
|
||||
case *TimeLiteral:
|
||||
d := &DurationLiteral{Val: time.Duration(lhs.Val)}
|
||||
expr := reduceBinaryExprDurationLHS(op, d, rhs)
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *StringLiteral:
|
||||
t, err := rhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
d := &DurationLiteral{Val: time.Duration(lhs.Val)}
|
||||
expr := reduceBinaryExprDurationLHS(op, d, t)
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *nilLiteral:
|
||||
return &BooleanLiteral{Val: false}
|
||||
}
|
||||
|
@ -4358,11 +4402,105 @@ func reduceBinaryExprStringLHS(op Token, lhs *StringLiteral, rhs Expr) Expr {
|
|||
case *StringLiteral:
|
||||
switch op {
|
||||
case EQ:
|
||||
return &BooleanLiteral{Val: lhs.Val == rhs.Val}
|
||||
var expr Expr = &BooleanLiteral{Val: lhs.Val == rhs.Val}
|
||||
// This might be a comparison between time literals.
|
||||
// If it is, parse the time literals and then compare since it
|
||||
// could be a different result if they use different formats
|
||||
// for the same time.
|
||||
if lhs.IsTimeLiteral() && rhs.IsTimeLiteral() {
|
||||
tlhs, err := lhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
return expr
|
||||
}
|
||||
|
||||
trhs, err := rhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
return expr
|
||||
}
|
||||
|
||||
t := reduceBinaryExprTimeLHS(op, tlhs, trhs)
|
||||
if _, ok := t.(*BinaryExpr); !ok {
|
||||
expr = t
|
||||
}
|
||||
}
|
||||
return expr
|
||||
case NEQ:
|
||||
return &BooleanLiteral{Val: lhs.Val != rhs.Val}
|
||||
var expr Expr = &BooleanLiteral{Val: lhs.Val != rhs.Val}
|
||||
// This might be a comparison between time literals.
|
||||
// If it is, parse the time literals and then compare since it
|
||||
// could be a different result if they use different formats
|
||||
// for the same time.
|
||||
if lhs.IsTimeLiteral() && rhs.IsTimeLiteral() {
|
||||
tlhs, err := lhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
return expr
|
||||
}
|
||||
|
||||
trhs, err := rhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
return expr
|
||||
}
|
||||
|
||||
t := reduceBinaryExprTimeLHS(op, tlhs, trhs)
|
||||
if _, ok := t.(*BinaryExpr); !ok {
|
||||
expr = t
|
||||
}
|
||||
}
|
||||
return expr
|
||||
case ADD:
|
||||
return &StringLiteral{Val: lhs.Val + rhs.Val}
|
||||
default:
|
||||
// Attempt to convert the string literal to a time literal.
|
||||
t, err := lhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
expr := reduceBinaryExprTimeLHS(op, t, rhs)
|
||||
|
||||
// If the returned expression is still a binary expr, that means
|
||||
// we couldn't reduce it so this wasn't used in a time literal context.
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
}
|
||||
case *DurationLiteral:
|
||||
// Attempt to convert the string literal to a time literal.
|
||||
t, err := lhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
expr := reduceBinaryExprTimeLHS(op, t, rhs)
|
||||
|
||||
// If the returned expression is still a binary expr, that means
|
||||
// we couldn't reduce it so this wasn't used in a time literal context.
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *TimeLiteral:
|
||||
// Attempt to convert the string literal to a time literal.
|
||||
t, err := lhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
expr := reduceBinaryExprTimeLHS(op, t, rhs)
|
||||
|
||||
// If the returned expression is still a binary expr, that means
|
||||
// we couldn't reduce it so this wasn't used in a time literal context.
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *IntegerLiteral:
|
||||
// Attempt to convert the string literal to a time literal.
|
||||
t, err := lhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
expr := reduceBinaryExprTimeLHS(op, t, rhs)
|
||||
|
||||
// If the returned expression is still a binary expr, that means
|
||||
// we couldn't reduce it so this wasn't used in a time literal context.
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *nilLiteral:
|
||||
switch op {
|
||||
|
@ -4382,6 +4520,12 @@ func reduceBinaryExprTimeLHS(op Token, lhs *TimeLiteral, rhs Expr) Expr {
|
|||
case SUB:
|
||||
return &TimeLiteral{Val: lhs.Val.Add(-rhs.Val)}
|
||||
}
|
||||
case *IntegerLiteral:
|
||||
d := &DurationLiteral{Val: time.Duration(rhs.Val)}
|
||||
expr := reduceBinaryExprTimeLHS(op, lhs, d)
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *TimeLiteral:
|
||||
switch op {
|
||||
case SUB:
|
||||
|
@ -4399,6 +4543,18 @@ func reduceBinaryExprTimeLHS(op Token, lhs *TimeLiteral, rhs Expr) Expr {
|
|||
case LTE:
|
||||
return &BooleanLiteral{Val: lhs.Val.Before(rhs.Val) || lhs.Val.Equal(rhs.Val)}
|
||||
}
|
||||
case *StringLiteral:
|
||||
t, err := rhs.ToTimeLiteral()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
expr := reduceBinaryExprTimeLHS(op, lhs, t)
|
||||
|
||||
// If the returned expression is still a binary expr, that means
|
||||
// we couldn't reduce it so this wasn't used in a time literal context.
|
||||
if _, ok := expr.(*BinaryExpr); !ok {
|
||||
return expr
|
||||
}
|
||||
case *nilLiteral:
|
||||
return &BooleanLiteral{Val: false}
|
||||
}
|
||||
|
|
|
@ -1019,10 +1019,13 @@ func TestReduce(t *testing.T) {
|
|||
{in: `true <> false`, out: `true`},
|
||||
{in: `true + false`, out: `true + false`},
|
||||
|
||||
// Time literals.
|
||||
// Time literals with now().
|
||||
{in: `now() + 2h`, out: `'2000-01-01T02:00:00Z'`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `now() / 2h`, out: `'2000-01-01T00:00:00Z' / 2h`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `4µ + now()`, out: `'2000-01-01T00:00:00.000004Z'`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `now() + 2000000000`, out: `'2000-01-01T00:00:02Z'`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `2000000000 + now()`, out: `'2000-01-01T00:00:02Z'`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `now() - 2000000000`, out: `'1999-12-31T23:59:58Z'`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `now() = now()`, out: `true`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `now() <> now()`, out: `false`, data: map[string]interface{}{"now()": now}},
|
||||
{in: `now() < now() + 1h`, out: `true`, data: map[string]interface{}{"now()": now}},
|
||||
|
@ -1034,6 +1037,28 @@ func TestReduce(t *testing.T) {
|
|||
{in: `now()`, out: `now()`},
|
||||
{in: `946684800000000000 + 2h`, out: `'2000-01-01T02:00:00Z'`},
|
||||
|
||||
// Time literals.
|
||||
{in: `'2000-01-01T00:00:00Z' + 2h`, out: `'2000-01-01T02:00:00Z'`},
|
||||
{in: `'2000-01-01T00:00:00Z' / 2h`, out: `'2000-01-01T00:00:00Z' / 2h`},
|
||||
{in: `4µ + '2000-01-01T00:00:00Z'`, out: `'2000-01-01T00:00:00.000004Z'`},
|
||||
{in: `'2000-01-01T00:00:00Z' + 2000000000`, out: `'2000-01-01T00:00:02Z'`},
|
||||
{in: `2000000000 + '2000-01-01T00:00:00Z'`, out: `'2000-01-01T00:00:02Z'`},
|
||||
{in: `'2000-01-01T00:00:00Z' - 2000000000`, out: `'1999-12-31T23:59:58Z'`},
|
||||
{in: `'2000-01-01T00:00:00Z' = '2000-01-01T00:00:00Z'`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00.000000000Z' = '2000-01-01T00:00:00Z'`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00Z' <> '2000-01-01T00:00:00Z'`, out: `false`},
|
||||
{in: `'2000-01-01T00:00:00.000000000Z' <> '2000-01-01T00:00:00Z'`, out: `false`},
|
||||
{in: `'2000-01-01T00:00:00Z' < '2000-01-01T00:00:00Z' + 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00.000000000Z' < '2000-01-01T00:00:00Z' + 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00Z' <= '2000-01-01T00:00:00Z' + 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00.000000000Z' <= '2000-01-01T00:00:00Z' + 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00Z' > '2000-01-01T00:00:00Z' - 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00.000000000Z' > '2000-01-01T00:00:00Z' - 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00Z' >= '2000-01-01T00:00:00Z' - 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00.000000000Z' >= '2000-01-01T00:00:00Z' - 1h`, out: `true`},
|
||||
{in: `'2000-01-01T00:00:00Z' - ('2000-01-01T00:00:00Z' - 60s)`, out: `1m`},
|
||||
{in: `'2000-01-01T00:00:00Z' AND '2000-01-01T00:00:00Z'`, out: `'2000-01-01T00:00:00Z' AND '2000-01-01T00:00:00Z'`},
|
||||
|
||||
// Duration literals.
|
||||
{in: `10m + 1h - 60s`, out: `69m`},
|
||||
{in: `(10m / 2) * 5`, out: `25m`},
|
||||
|
|
|
@ -91,6 +91,13 @@ func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool)
|
|||
|
||||
// AggregateFloat aggregates a point into the reducer and updates the current window.
|
||||
func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) {
|
||||
// Skip past a point when it does not advance the stream. A joined series
|
||||
// may have multiple points at the same time so we will discard anything
|
||||
// except the first point we encounter.
|
||||
if !r.curr.Nil && r.curr.Time == p.Time {
|
||||
return
|
||||
}
|
||||
|
||||
r.prev = r.curr
|
||||
r.curr = *p
|
||||
}
|
||||
|
@ -107,6 +114,9 @@ func (r *FloatDerivativeReducer) Emit() []FloatPoint {
|
|||
}
|
||||
value := diff / (float64(elapsed) / float64(r.interval.Duration))
|
||||
|
||||
// Mark this point as read by changing the previous point to nil.
|
||||
r.prev.Nil = true
|
||||
|
||||
// Drop negative values for non-negative derivatives.
|
||||
if r.isNonNegative && diff < 0 {
|
||||
return nil
|
||||
|
@ -138,6 +148,13 @@ func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending boo
|
|||
|
||||
// AggregateInteger aggregates a point into the reducer and updates the current window.
|
||||
func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) {
|
||||
// Skip past a point when it does not advance the stream. A joined series
|
||||
// may have multiple points at the same time so we will discard anything
|
||||
// except the first point we encounter.
|
||||
if !r.curr.Nil && r.curr.Time == p.Time {
|
||||
return
|
||||
}
|
||||
|
||||
r.prev = r.curr
|
||||
r.curr = *p
|
||||
}
|
||||
|
@ -154,6 +171,9 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
|
|||
}
|
||||
value := diff / (float64(elapsed) / float64(r.interval.Duration))
|
||||
|
||||
// Mark this point as read by changing the previous point to nil.
|
||||
r.prev.Nil = true
|
||||
|
||||
// Drop negative values for non-negative derivatives.
|
||||
if r.isNonNegative && diff < 0 {
|
||||
return nil
|
||||
|
@ -179,6 +199,13 @@ func NewFloatDifferenceReducer() *FloatDifferenceReducer {
|
|||
|
||||
// AggregateFloat aggregates a point into the reducer and updates the current window.
|
||||
func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) {
|
||||
// Skip past a point when it does not advance the stream. A joined series
|
||||
// may have multiple points at the same time so we will discard anything
|
||||
// except the first point we encounter.
|
||||
if !r.curr.Nil && r.curr.Time == p.Time {
|
||||
return
|
||||
}
|
||||
|
||||
r.prev = r.curr
|
||||
r.curr = *p
|
||||
}
|
||||
|
@ -188,6 +215,9 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint {
|
|||
if !r.prev.Nil {
|
||||
// Calculate the difference of successive points.
|
||||
value := r.curr.Value - r.prev.Value
|
||||
|
||||
// Mark this point as read by changing the previous point to nil.
|
||||
r.prev.Nil = true
|
||||
return []FloatPoint{{Time: r.curr.Time, Value: value}}
|
||||
}
|
||||
return nil
|
||||
|
@ -209,6 +239,13 @@ func NewIntegerDifferenceReducer() *IntegerDifferenceReducer {
|
|||
|
||||
// AggregateInteger aggregates a point into the reducer and updates the current window.
|
||||
func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) {
|
||||
// Skip past a point when it does not advance the stream. A joined series
|
||||
// may have multiple points at the same time so we will discard anything
|
||||
// except the first point we encounter.
|
||||
if !r.curr.Nil && r.curr.Time == p.Time {
|
||||
return
|
||||
}
|
||||
|
||||
r.prev = r.curr
|
||||
r.curr = *p
|
||||
}
|
||||
|
@ -218,6 +255,9 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
|
|||
if !r.prev.Nil {
|
||||
// Calculate the difference of successive points.
|
||||
value := r.curr.Value - r.prev.Value
|
||||
|
||||
// Mark this point as read by changing the previous point to nil.
|
||||
r.prev.Nil = true
|
||||
return []IntegerPoint{{Time: r.curr.Time, Value: value}}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -207,6 +207,7 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
|
|||
}
|
||||
|
||||
var i int
|
||||
LOOP:
|
||||
for ; i < len(query.Statements); i++ {
|
||||
ctx.StatementID = i
|
||||
stmt := query.Statements[i]
|
||||
|
@ -219,6 +220,36 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
|
|||
}
|
||||
}
|
||||
|
||||
// Do not let queries manually use the system measurements. If we find
|
||||
// one, return an error. This prevents a person from using the
|
||||
// measurement incorrectly and causing a panic.
|
||||
if stmt, ok := stmt.(*SelectStatement); ok {
|
||||
for _, s := range stmt.Sources {
|
||||
switch s := s.(type) {
|
||||
case *Measurement:
|
||||
if IsSystemName(s.Name) {
|
||||
command := "the appropriate meta command"
|
||||
switch s.Name {
|
||||
case "_fieldKeys":
|
||||
command = "SHOW FIELD KEYS"
|
||||
case "_measurements":
|
||||
command = "SHOW MEASUREMENTS"
|
||||
case "_series":
|
||||
command = "SHOW SERIES"
|
||||
case "_tagKeys":
|
||||
command = "SHOW TAG KEYS"
|
||||
case "_tags":
|
||||
command = "SHOW TAG VALUES"
|
||||
}
|
||||
results <- &Result{
|
||||
Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command),
|
||||
}
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rewrite statements, if necessary.
|
||||
// This can occur on meta read statements which convert to SELECT statements.
|
||||
newStmt, err := RewriteStatement(stmt)
|
||||
|
|
|
@ -283,6 +283,56 @@ func TestQueryExecutor_Panic(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestQueryExecutor_InvalidSource(t *testing.T) {
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
|
||||
return errors.New("statement executed unexpectedly")
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range []struct {
|
||||
q string
|
||||
err string
|
||||
}{
|
||||
{
|
||||
q: `SELECT fieldKey, fieldType FROM _fieldKeys`,
|
||||
err: `unable to use system source '_fieldKeys': use SHOW FIELD KEYS instead`,
|
||||
},
|
||||
{
|
||||
q: `SELECT "name" FROM _measurements`,
|
||||
err: `unable to use system source '_measurements': use SHOW MEASUREMENTS instead`,
|
||||
},
|
||||
{
|
||||
q: `SELECT "key" FROM _series`,
|
||||
err: `unable to use system source '_series': use SHOW SERIES instead`,
|
||||
},
|
||||
{
|
||||
q: `SELECT tagKey FROM _tagKeys`,
|
||||
err: `unable to use system source '_tagKeys': use SHOW TAG KEYS instead`,
|
||||
},
|
||||
{
|
||||
q: `SELECT "key", value FROM _tags`,
|
||||
err: `unable to use system source '_tags': use SHOW TAG VALUES instead`,
|
||||
},
|
||||
} {
|
||||
q, err := influxql.ParseQuery(tt.q)
|
||||
if err != nil {
|
||||
t.Errorf("%d. unable to parse: %s", i, tt.q)
|
||||
continue
|
||||
}
|
||||
|
||||
results := e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil)
|
||||
result := <-results
|
||||
if len(result.Series) != 0 {
|
||||
t.Errorf("%d. expected %d rows, got %d", 0, i, len(result.Series))
|
||||
}
|
||||
if result.Err == nil || result.Err.Error() != tt.err {
|
||||
t.Errorf("%d. unexpected error: %s", i, result.Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func discardOutput(results <-chan *influxql.Result) {
|
||||
for range results {
|
||||
// Read all results and discard.
|
||||
|
|
|
@ -2205,6 +2205,54 @@ func TestSelect_Derivative_Desc_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSelect_Derivative_Duplicate_Float(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &FloatIterator{Points: []influxql.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 20},
|
||||
{Name: "cpu", Time: 0 * Second, Value: 19},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 10},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 3},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelect_Derivative_Duplicate_Integer(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &IntegerIterator{Points: []influxql.IntegerPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 20},
|
||||
{Name: "cpu", Time: 0 * Second, Value: 19},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 10},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 3},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelect_Difference_Float(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
|
@ -2257,6 +2305,54 @@ func TestSelect_Difference_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSelect_Difference_Duplicate_Float(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &FloatIterator{Points: []influxql.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 20},
|
||||
{Name: "cpu", Time: 0 * Second, Value: 19},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 10},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 3},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelect_Difference_Duplicate_Integer(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &IntegerIterator{Points: []influxql.IntegerPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 20},
|
||||
{Name: "cpu", Time: 0 * Second, Value: 19},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 10},
|
||||
{Name: "cpu", Time: 4 * Second, Value: 3},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelect_Elapsed_Float(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
|
|
|
@ -305,8 +305,6 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
statistics = append(statistics, statistic)
|
||||
|
||||
statistics = m.gatherStatistics(statistics, tags)
|
||||
sort.Sort(Statistics(statistics)) // Unstable sort.
|
||||
|
||||
return statistics, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -12,7 +13,6 @@ import (
|
|||
"net/http/pprof"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
@ -738,8 +738,31 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
m := make(map[string]*monitor.Statistic)
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
|
||||
fmt.Fprintln(w, "{")
|
||||
first := true
|
||||
if val := expvar.Get("cmdline"); val != nil {
|
||||
if !first {
|
||||
fmt.Fprintln(w, ",")
|
||||
}
|
||||
first = false
|
||||
fmt.Fprintf(w, "\"cmdline\": %s", val)
|
||||
}
|
||||
if val := expvar.Get("memstats"); val != nil {
|
||||
if !first {
|
||||
fmt.Fprintln(w, ",")
|
||||
}
|
||||
first = false
|
||||
fmt.Fprintf(w, "\"memstats\": %s", val)
|
||||
}
|
||||
|
||||
for _, s := range stats {
|
||||
val, err := json.Marshal(s)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Very hackily create a unique key.
|
||||
buf := bytes.NewBufferString(s.Name)
|
||||
if path, ok := s.Tags["path"]; ok {
|
||||
|
@ -766,32 +789,12 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
key := buf.String()
|
||||
|
||||
m[key] = s
|
||||
}
|
||||
|
||||
// Sort the keys to simulate /debug/vars output.
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintln(w, "{")
|
||||
first := true
|
||||
for _, key := range keys {
|
||||
// Marshal this statistic to JSON.
|
||||
out, err := json.Marshal(m[key])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !first {
|
||||
fmt.Fprintln(w, ",")
|
||||
}
|
||||
first = false
|
||||
fmt.Fprintf(w, "%q: ", key)
|
||||
w.Write(bytes.TrimSpace(out))
|
||||
w.Write(bytes.TrimSpace(val))
|
||||
}
|
||||
fmt.Fprintln(w, "\n}")
|
||||
}
|
||||
|
|
|
@ -198,6 +198,7 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
|
|||
statWriteReqOK: atomic.LoadInt64(&s.stats.WriteReqOK),
|
||||
statWriteReqErr: atomic.LoadInt64(&s.stats.WriteReqErr),
|
||||
statSeriesCreate: seriesN,
|
||||
statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated),
|
||||
statWritePointsErr: atomic.LoadInt64(&s.stats.WritePointsErr),
|
||||
statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK),
|
||||
statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten),
|
||||
|
|
Loading…
Reference in New Issue