From 512d6ac0507cd507aef4e129befc9aa2e2fb0fd4 Mon Sep 17 00:00:00 2001 From: David Norton Date: Wed, 7 Oct 2015 20:07:43 -0400 Subject: [PATCH] fix #4280: only drop points matching WHERE clause --- cmd/influxd/run/server_test.go | 48 +++++++++++++++++++ influxql/ast.go | 88 ++++++++++++++++------------------ influxql/ast_test.go | 4 +- tsdb/meta.go | 28 +++++++++-- tsdb/query_executor.go | 75 +++++++++++++++++------------ 5 files changed, 161 insertions(+), 82 deletions(-) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 57f7ed8493..895f8f491c 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -497,6 +497,24 @@ func TestServer_Query_DropSeriesFromRegex(t *testing.T) { exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: "Drop series with WHERE field should error", + command: `DROP SERIES FROM c WHERE val > 50.0`, + exp: `{"results":[{"error":"DROP SERIES doesn't support fields in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "make sure DROP SERIES with field in WHERE didn't delete data", + command: `SHOW SERIES`, + exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop series with WHERE time should error", + command: `DROP SERIES FROM c WHERE time > now() - 1d`, + exp: `{"results":[{"error":"DROP SERIES doesn't support time in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, }...) 
for i, query := range test.queries { @@ -4255,6 +4273,18 @@ func TestServer_Query_ShowSeries(t *testing.T) { exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: `show series with WHERE time should fail`, + command: "SHOW SERIES WHERE time > now() - 1h", + exp: `{"results":[{"error":"SHOW SERIES doesn't support time in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: `show series with WHERE field should fail`, + command: "SHOW SERIES WHERE value > 10.0", + exp: `{"results":[{"error":"SHOW SERIES doesn't support fields in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, }...) for i, query := range test.queries { @@ -4319,6 +4349,12 @@ func TestServer_Query_ShowMeasurements(t *testing.T) { exp: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: `show measurements with time in WHERE clauses errors`, + command: `SHOW MEASUREMENTS WHERE time > now() - 1h`, + exp: `{"results":[{"error":"SHOW MEASUREMENTS doesn't support time in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, }...) 
for i, query := range test.queries { @@ -4389,6 +4425,12 @@ func TestServer_Query_ShowTagKeys(t *testing.T) { exp: `{"results":[{}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: "show tag keys with time in WHERE clause errors", + command: "SHOW TAG KEYS FROM cpu WHERE time > now() - 1h", + exp: `{"results":[{"error":"SHOW TAG KEYS doesn't support time in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, &Query{ name: "show tag values with key", command: "SHOW TAG VALUES WITH KEY = host", @@ -4425,6 +4467,12 @@ func TestServer_Query_ShowTagKeys(t *testing.T) { exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"],["server02"],["server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: `show tag values with key and time in WHERE clause should error`, + command: `SHOW TAG VALUES WITH KEY = host WHERE time > now() - 1h`, + exp: `{"results":[{"error":"SHOW SERIES doesn't support time in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, }...) 
for i, query := range test.queries { diff --git a/influxql/ast.go b/influxql/ast.go index 9c3d09f6c6..2874f5a4b7 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1007,31 +1007,6 @@ func (s *SelectStatement) RequiredPrivileges() ExecutionPrivileges { return ep } -// OnlyTimeDimensions returns true if the statement has a where clause with only time constraints -func (s *SelectStatement) OnlyTimeDimensions() bool { - return s.walkForTime(s.Condition) -} - -// walkForTime is called by the OnlyTimeDimensions method to walk the where clause to determine if -// the only things specified are based on time -func (s *SelectStatement) walkForTime(node Node) bool { - switch n := node.(type) { - case *BinaryExpr: - if n.Op == AND || n.Op == OR { - return s.walkForTime(n.LHS) && s.walkForTime(n.RHS) - } - if ref, ok := n.LHS.(*VarRef); ok && strings.ToLower(ref.Val) == "time" { - return true - } - return false - case *ParenExpr: - // walk down the tree - return s.walkForTime(n.Expr) - default: - return false - } -} - // HasWildcard returns whether or not the select statement has at least 1 wildcard func (s *SelectStatement) HasWildcard() bool { return s.HasFieldWildcard() || s.HasDimensionWildcard() @@ -1062,26 +1037,6 @@ func (s *SelectStatement) HasDimensionWildcard() bool { return false } -// hasTimeDimensions returns whether or not the select statement has at least 1 -// where condition with time as the condition -func (s *SelectStatement) hasTimeDimensions(node Node) bool { - switch n := node.(type) { - case *BinaryExpr: - if n.Op == AND || n.Op == OR { - return s.hasTimeDimensions(n.LHS) || s.hasTimeDimensions(n.RHS) - } - if ref, ok := n.LHS.(*VarRef); ok && strings.ToLower(ref.Val) == "time" { - return true - } - return false - case *ParenExpr: - // walk down the tree - return s.hasTimeDimensions(n.Expr) - default: - return false - } -} - func (s *SelectStatement) validate(tr targetRequirement) error { if err := s.validateFields(); err != nil { return err @@ 
-1280,7 +1235,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { // If we have an aggregate function with a group by time without a where clause, it's an invalid statement if tr == targetNotRequired { // ignore create continuous query statements - if !s.IsRawQuery && groupByDuration > 0 && !s.hasTimeDimensions(s.Condition) { + if !s.IsRawQuery && groupByDuration > 0 && !HasTimeExpr(s.Condition) { return fmt.Errorf("aggregate functions with GROUP BY time require a WHERE time clause") } } @@ -2724,6 +2679,47 @@ func CloneExpr(expr Expr) Expr { panic("unreachable") } +// HasTimeExpr returns true if the expression has a time term, i.e. a binary +// expression (other than AND/OR) whose left-hand side is a variable reference +// named "time" in any letter case. AND/OR operands and parentheses are walked. +func HasTimeExpr(expr Expr) bool { + switch n := expr.(type) { + case *BinaryExpr: + if n.Op == AND || n.Op == OR { + return HasTimeExpr(n.LHS) || HasTimeExpr(n.RHS) + } + if ref, ok := n.LHS.(*VarRef); ok && strings.ToLower(ref.Val) == "time" { + return true + } + return false + case *ParenExpr: + // walk down the tree + return HasTimeExpr(n.Expr) + default: + return false + } +} + +// OnlyTimeExpr returns true if the expression only has time constraints. +// A nil expression matches no case below and therefore yields false. +func OnlyTimeExpr(expr Expr) bool { + switch n := expr.(type) { + case *BinaryExpr: + if n.Op == AND || n.Op == OR { + return OnlyTimeExpr(n.LHS) && OnlyTimeExpr(n.RHS) + } + if ref, ok := n.LHS.(*VarRef); ok && strings.ToLower(ref.Val) == "time" { + return true + } + return false + case *ParenExpr: + // walk down the tree + return OnlyTimeExpr(n.Expr) + default: + return false + } +} + // TimeRange returns the minimum and maximum times specified by an expression. // Returns zero times if there is no bound. 
func TimeRange(expr Expr) (min, max time.Time) { diff --git a/influxql/ast_test.go b/influxql/ast_test.go index ca28361118..430c9a8473 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -521,7 +521,7 @@ func TestTimeRange(t *testing.T) { } // Ensure that we see if a where clause has only time limitations -func TestSelectStatement_OnlyTimeDimensions(t *testing.T) { +func TestOnlyTimeExpr(t *testing.T) { var tests = []struct { stmt string exp bool @@ -554,7 +554,7 @@ func TestSelectStatement_OnlyTimeDimensions(t *testing.T) { if err != nil { t.Fatalf("invalid statement: %q: %s", tt.stmt, err) } - if stmt.(*influxql.SelectStatement).OnlyTimeDimensions() != tt.exp { + if influxql.OnlyTimeExpr(stmt.(*influxql.SelectStatement).Condition) != tt.exp { t.Fatalf("%d. expected statement to return only time dimension to be %t: %s", i, tt.exp, tt.stmt) } } diff --git a/tsdb/meta.go b/tsdb/meta.go index 1c3f2d8127..059b3c01a0 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -603,7 +603,7 @@ func (m *Measurement) DropSeries(seriesID uint64) { // filters walks the where clause of a select statement and returns a map with all series ids // matching the where clause and any filter expression that should be applied to each func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influxql.Expr, error) { - if stmt.Condition == nil || stmt.OnlyTimeDimensions() { + if stmt.Condition == nil || influxql.OnlyTimeExpr(stmt.Condition) { seriesIdsToExpr := make(map[uint64]influxql.Expr) for _, id := range m.seriesIDs { seriesIdsToExpr[id] = nil @@ -699,7 +699,7 @@ func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []strin } // mergeSeriesFilters merges two sets of filter expressions and culls series IDs. 
-func mergeSeriesFilters(op influxql.Token, ids SeriesIDs, lfilters, rfilters map[uint64]influxql.Expr) (SeriesIDs, map[uint64]influxql.Expr) { +func mergeSeriesFilters(op influxql.Token, ids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { // Create a map to hold the final set of series filter expressions. filters := make(map[uint64]influxql.Expr, 0) // Resulting list of series IDs @@ -833,10 +833,30 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex return nil, nil, nil } +// FilterExprs represents a map of series IDs to filter expressions. +// A nil FilterExprs behaves as an empty one for the methods below. +type FilterExprs map[uint64]influxql.Expr + +// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true. +// Deleting entries while ranging over the map is permitted by the Go spec. +func (fe FilterExprs) DeleteBoolLiteralTrues() { + for id, expr := range fe { + if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val { + delete(fe, id) + } + } +} + +// Len returns the number of elements; the built-in len already yields 0 for a +// nil map, so no explicit nil check is needed. +func (fe FilterExprs) Len() int { + return len(fe) +} + // walkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and // a map from those series IDs to filter expressions that should be used to limit points returned in // the final query result. 
-func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, map[uint64]influxql.Expr, error) { +func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { switch n := expr.(type) { case *influxql.BinaryExpr: switch n.Op { @@ -847,7 +867,7 @@ func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, map[ return nil, nil, err } - filters := map[uint64]influxql.Expr{} + filters := FilterExprs{} for _, id := range ids { filters[id] = expr } diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 71505e9c45..e600cd44f7 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -416,6 +416,11 @@ func (q *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasu // executeDropSeriesStatement removes all series from the local store that match the drop query func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) *influxql.Result { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return &influxql.Result{Err: errors.New("DROP SERIES doesn't support time in WHERE clause")} + } + // Find the database. db := q.Store.DatabaseIndex(database) if db == nil { @@ -438,12 +443,21 @@ func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat var seriesKeys []string for _, m := range measurements { var ids SeriesIDs + var filters FilterExprs if stmt.Condition != nil { // Get series IDs that match the WHERE clause. - ids, _, err = m.walkWhereForSeriesIds(stmt.Condition) + ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition) if err != nil { return &influxql.Result{Err: err} } + + // Delete boolean literal true filter expressions. + filters.DeleteBoolLiteralTrues() + + // Check for unsupported field filters. 
+ if filters.Len() > 0 { + return &influxql.Result{Err: errors.New("DROP SERIES doesn't support fields in WHERE clause")} + } } else { // No WHERE clause so get all series IDs for this measurement. ids = m.seriesIDs @@ -465,6 +479,11 @@ func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat } func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) *influxql.Result { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support time in WHERE clause")} + } + // Find the database. db := q.Store.DatabaseIndex(database) if db == nil { @@ -491,20 +510,27 @@ func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStat // Loop through measurements to build result. One result row / measurement. for _, m := range measurements { var ids SeriesIDs + var filters FilterExprs if stmt.Condition != nil { // Get series IDs that match the WHERE clause. - ids, _, err = m.walkWhereForSeriesIds(stmt.Condition) + ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition) if err != nil { return &influxql.Result{Err: err} } + // Delete boolean literal true filter expressions. + filters.DeleteBoolLiteralTrues() + + // Check for unsupported field filters. + if filters.Len() > 0 { + return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support fields in WHERE clause")} + } + // If no series matched, then go to the next measurement. if len(ids) == 0 { continue } - - // TODO: check return of walkWhereForSeriesIds for fields } else { // No WHERE clause so get all series IDs for this measurement. ids = m.seriesIDs @@ -590,6 +616,11 @@ func (q *QueryExecutor) planStatement(stmt influxql.Statement, database string, // PlanShowMeasurements creates an execution plan for a SHOW TAG KEYS statement and returns an Executor. 
func (q *QueryExecutor) PlanShowMeasurements(stmt *influxql.ShowMeasurementsStatement, database string, chunkSize int) (Executor, error) { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return nil, errors.New("SHOW MEASUREMENTS doesn't support time in WHERE clause") + } + // Get the database info. di, err := q.MetaStore.Database(database) if err != nil { @@ -621,6 +652,11 @@ func (q *QueryExecutor) PlanShowMeasurements(stmt *influxql.ShowMeasurementsStat // PlanShowTagKeys creates an execution plan for a SHOW MEASUREMENTS statement and returns an Executor. func (q *QueryExecutor) PlanShowTagKeys(stmt *influxql.ShowTagKeysStatement, database string, chunkSize int) (Executor, error) { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause") + } + // Get the database info. di, err := q.MetaStore.Database(database) if err != nil { @@ -677,33 +713,12 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen return nil } -func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt *influxql.ShowMeasurementsStatement, database string, results chan *influxql.Result, chunkSize int) error { // Plan statement execution. - e, err := q.PlanShowMeasurements(stmt, database, chunkSize) - if err != nil { - return err - } - - // Execute plan. - ch := e.Execute() - - // Stream results from the channel. We should send an empty result if nothing comes through. 
- resultSent := false - for row := range ch { - if row.Err != nil { - return row.Err - } - resultSent = true - results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}} - } - - if !resultSent { - results <- &influxql.Result{StatementID: statementID, Series: make([]*models.Row, 0)} - } - - return nil -} - func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support time in WHERE clause")} + } + // Find the database. db := q.Store.DatabaseIndex(database) if db == nil {