fix #2531: make WHERE with multiple OR terms work
parent
de77ee1896
commit
858648becf
|
@ -123,6 +123,8 @@ func (cmd *BackupCommand) nextPath(path string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// download downloads a snapshot from a host to a given path.
|
||||
|
|
|
@ -1082,10 +1082,11 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
{"name": "where_events", "time": "2009-11-10T23:00:02Z","fields": {"foo": "bar"}, "tags": {"tennant": "paul"}},
|
||||
{"name": "where_events", "time": "2009-11-10T23:00:03Z","fields": {"foo": "baz"}, "tags": {"tennant": "paul"}},
|
||||
{"name": "where_events", "time": "2009-11-10T23:00:04Z","fields": {"foo": "bat"}, "tags": {"tennant": "paul"}},
|
||||
{"name": "where_events", "time": "2009-11-10T23:00:05Z","fields": {"foo": "bar"}, "tags": {"tennant": "todd"}}
|
||||
{"name": "where_events", "time": "2009-11-10T23:00:05Z","fields": {"foo": "bar"}, "tags": {"tennant": "todd"}},
|
||||
{"name": "where_events", "time": "2009-11-10T23:00:06Z","fields": {"foo": "bap"}, "tags": {"tennant": "david"}}
|
||||
]}`,
|
||||
query: `select foo from "%DB%"."%RP%".where_events where tennant = 'paul' AND time > 1s AND (foo = 'bar' OR foo = 'baz')`,
|
||||
expected: `{"results":[{"series":[{"name":"where_events","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`,
|
||||
query: `select foo from "%DB%"."%RP%".where_events where (tennant = 'paul' OR tennant = 'david') AND time > 1s AND (foo = 'bar' OR foo = 'baz' OR foo = 'bap')`,
|
||||
expected: `{"results":[{"series":[{"name":"where_events","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:06Z","bap"]]}]}]}`,
|
||||
},
|
||||
{
|
||||
name: "where on tag that should be double quoted but isn't",
|
||||
|
|
215
database.go
215
database.go
|
@ -282,16 +282,16 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series {
|
|||
// filters walks the where clause of a select statement and returns a map with all series ids
|
||||
// matching the where clause and any filter expression that should be applied to each
|
||||
func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influxql.Expr, error) {
|
||||
seriesIdsToExpr := make(map[uint64]influxql.Expr)
|
||||
|
||||
if stmt.Condition == nil || stmt.OnlyTimeDimensions() {
|
||||
seriesIdsToExpr := make(map[uint64]influxql.Expr)
|
||||
for _, id := range m.seriesIDs {
|
||||
seriesIdsToExpr[id] = nil
|
||||
}
|
||||
return seriesIdsToExpr, nil
|
||||
}
|
||||
|
||||
ids, _, _, err := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr)
|
||||
ids, seriesIdsToExpr, err := m.walkWhereForSeriesIds(stmt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -359,35 +359,36 @@ func (m *Measurement) tagSets(stmt *influxql.SelectStatement, dimensions []strin
|
|||
return a, nil
|
||||
}
|
||||
|
||||
// idsForExpr will return a collection of series ids, a bool indicating if the result should be
|
||||
// used (it'll be false if it's a time expr) and a field expression if the passed in expression is against a field.
|
||||
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influxql.Expr, error) {
|
||||
// idsForExpr will return a collection of series ids and a filter expression that should
|
||||
// be used to filter points from those series.
|
||||
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Expr, error) {
|
||||
name, ok := n.LHS.(*influxql.VarRef)
|
||||
value := n.RHS
|
||||
if !ok {
|
||||
name, ok = n.RHS.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return nil, false, nil, fmt.Errorf("invalid expression: %s", n.String())
|
||||
return nil, nil, fmt.Errorf("invalid expression: %s", n.String())
|
||||
}
|
||||
value = n.LHS
|
||||
}
|
||||
|
||||
// ignore time literals
|
||||
// For time literals, return all series IDs and "true" as the filter.
|
||||
if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" {
|
||||
return nil, false, nil, nil
|
||||
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
// if it's a field we can't collapse it so we have to look at all series ids for this
|
||||
// For fields, return all series IDs from this measurement and return
|
||||
// the expression passed in, as the filter.
|
||||
if m.FieldByName(name.Val) != nil {
|
||||
return m.seriesIDs, true, n, nil
|
||||
return m.seriesIDs, n, nil
|
||||
}
|
||||
|
||||
tagVals, ok := m.seriesByTagKeyValue[name.Val]
|
||||
if !ok {
|
||||
return nil, true, nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// if we're looking for series with specific tag values
|
||||
// if we're looking for series with a specific tag value
|
||||
if str, ok := value.(*influxql.StringLiteral); ok {
|
||||
var ids seriesIDs
|
||||
|
||||
|
@ -397,10 +398,10 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ
|
|||
} else if n.Op == influxql.NEQ {
|
||||
ids = m.seriesIDs.reject(tagVals[str.Val])
|
||||
}
|
||||
return ids, true, nil, nil
|
||||
return ids, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
// if we're looking for series with tag values that match a regex
|
||||
// if we're looking for series with a tag value that matches a regex
|
||||
if re, ok := value.(*influxql.RegexLiteral); ok {
|
||||
var ids seriesIDs
|
||||
|
||||
|
@ -419,111 +420,134 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ
|
|||
ids = ids.reject(tagVals[k])
|
||||
}
|
||||
}
|
||||
return ids, true, nil, nil
|
||||
return ids, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
return nil, true, nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// walkWhereForSeriesIds will recursively walk the where clause and return a collection of series ids, a boolean indicating if this return
|
||||
// value should be included in the resulting set, and an expression if the return is a field expression.
|
||||
// The map that it takes maps each series id to the field expression that should be used to evaluate it when iterating over its cursor.
|
||||
// Series that have no field expressions won't be in the map
|
||||
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint64]influxql.Expr) (seriesIDs, bool, influxql.Expr, error) {
|
||||
// mergeSeriesFilters merges two sets of filter expressions and culls series IDs.
|
||||
func mergeSeriesFilters(op influxql.Token, ids seriesIDs, lfilters, rfilters map[uint64]influxql.Expr) (seriesIDs, map[uint64]influxql.Expr) {
|
||||
// Create a map to hold the final set of series filter expressions.
|
||||
filters := make(map[uint64]influxql.Expr, 0)
|
||||
// Series IDs that weren't culled
|
||||
var outIDs seriesIDs
|
||||
|
||||
// Combining logic:
|
||||
// +==========+==========+==========+=======================+=======================+
|
||||
// | operator | LHS | RHS | intermediate expr | result filter |
|
||||
// +==========+==========+==========+=======================+=======================+
|
||||
// | | <nil> | <r-expr> | true OR <r-expr> | true |
|
||||
// | |----------+----------+-----------------------+-----------------------+
|
||||
// | OR | <l-expr> | <nil> | <l-expr> OR true | true |
|
||||
// | |----------+----------+-----------------------+-----------------------+
|
||||
// | | <l-expr> | <r-expr> | <l-expr> OR <r-expr> | <l-expr> OR <r-expr> |
|
||||
// +----------+----------+----------+-----------------------+-----------------------+
|
||||
// | | <nil> | <r-expr> | false AND <r-expr> | false |
|
||||
// | |----------+----------+-----------------------+-----------------------+
|
||||
// | AND | <l-expr> | <nil> | <l-expr> AND false | false |
|
||||
// | |----------+----------+-----------------------+-----------------------+
|
||||
// | | <l-expr> | <r-expr> | <l-expr> AND <r-expr> | <l-expr> AND <r-expr> |
|
||||
// +----------+----------+----------+-----------------------+-----------------------+
|
||||
|
||||
def := false
|
||||
if op == influxql.OR {
|
||||
def = true
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
// Get LHS and RHS filter expressions for this series ID.
|
||||
lfilter, rfilter := lfilters[id], rfilters[id]
|
||||
|
||||
// Set default filters if either LHS or RHS expressions were nil.
|
||||
if lfilter == nil {
|
||||
lfilter = &influxql.BooleanLiteral{Val: def}
|
||||
}
|
||||
if rfilter == nil {
|
||||
rfilter = &influxql.BooleanLiteral{Val: def}
|
||||
}
|
||||
|
||||
// Create the intermediate filter expression for this series ID.
|
||||
be := &influxql.BinaryExpr{
|
||||
Op: op,
|
||||
LHS: lfilter,
|
||||
RHS: rfilter,
|
||||
}
|
||||
|
||||
// Reduce / simplify the intermediate expression.
|
||||
expr := influxql.Reduce(be, nil)
|
||||
|
||||
// Check if the reduced filter expression is a literal false,
|
||||
// exclude this series ID and filter expression from the results.
|
||||
if op == influxql.AND {
|
||||
if b, ok := expr.(*influxql.BooleanLiteral); ok && !b.Val {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Store the series ID and merged expression in the final results.
|
||||
filters[id] = expr
|
||||
outIDs = append(outIDs, id)
|
||||
}
|
||||
|
||||
return outIDs, filters
|
||||
}
|
||||
|
||||
// walkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and
|
||||
// a map from those series IDs to filter expressions that should be used to limit points returned in
|
||||
// the final query result.
|
||||
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (seriesIDs, map[uint64]influxql.Expr, error) {
|
||||
switch n := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch n.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
// if it's a compare, then it's either a field expression or against a tag. we can return this
|
||||
ids, shouldInclude, expr, err := m.idsForExpr(n)
|
||||
// Get the series IDs and filter expression for the tag or field comparison.
|
||||
ids, expr, err := m.idsForExpr(n)
|
||||
if err != nil {
|
||||
return nil, false, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
filters := map[uint64]influxql.Expr{}
|
||||
for _, id := range ids {
|
||||
filters[id] = expr
|
||||
}
|
||||
|
||||
return ids, shouldInclude, expr, nil
|
||||
return ids, filters, nil
|
||||
case influxql.AND, influxql.OR:
|
||||
// if it's an AND or OR we need to union or intersect the results
|
||||
// Get the series IDs and filter expressions for the LHS.
|
||||
lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Get the series IDs and filter expressions for the RHS.
|
||||
rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Combine the series IDs from the LHS and RHS.
|
||||
var ids seriesIDs
|
||||
l, il, lexpr, err := m.walkWhereForSeriesIds(n.LHS, filters)
|
||||
if err != nil {
|
||||
return nil, false, nil, err
|
||||
switch n.Op {
|
||||
case influxql.AND:
|
||||
ids = lids.intersect(rids)
|
||||
case influxql.OR:
|
||||
ids = lids.union(rids)
|
||||
}
|
||||
|
||||
r, ir, rexpr, err := m.walkWhereForSeriesIds(n.RHS, filters)
|
||||
if err != nil {
|
||||
return nil, false, nil, err
|
||||
}
|
||||
// Merge the filter expressions for the LHS and RHS.
|
||||
ids, filters := mergeSeriesFilters(n.Op, ids, lfilters, rfilters)
|
||||
|
||||
if il && ir { // we should include both the LHS and RHS of the BinaryExpr in the return
|
||||
if n.Op == influxql.AND {
|
||||
ids = l.intersect(r)
|
||||
} else if n.Op == influxql.OR {
|
||||
ids = l.union(r)
|
||||
}
|
||||
} else if !il && !ir { // we don't need to include either so return nothing
|
||||
return nil, false, nil, nil
|
||||
} else if il { // just include the left side
|
||||
ids = l
|
||||
} else { // just include the right side
|
||||
ids = r
|
||||
}
|
||||
|
||||
if n.Op == influxql.OR && il && ir && (lexpr == nil || rexpr == nil) {
|
||||
// if it's an OR and we're going to include both sides and one of those expression is nil,
|
||||
// we need to clear out restrictive filters on series that don't need them anymore
|
||||
idsToClear := l.intersect(r)
|
||||
for _, id := range idsToClear {
|
||||
delete(filters, id)
|
||||
}
|
||||
} else {
|
||||
// put the LHS field expression into the filters
|
||||
if lexpr != nil {
|
||||
for _, id := range ids {
|
||||
f := filters[id]
|
||||
if f == nil {
|
||||
filters[id] = lexpr
|
||||
} else {
|
||||
filters[id] = &influxql.BinaryExpr{LHS: f, RHS: lexpr, Op: n.Op}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// put the RHS field expression into the filters
|
||||
if rexpr != nil {
|
||||
for _, id := range ids {
|
||||
f := filters[id]
|
||||
if f == nil {
|
||||
filters[id] = rexpr
|
||||
} else {
|
||||
filters[id] = &influxql.BinaryExpr{LHS: f, RHS: rexpr, Op: n.Op}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if the op is AND and we include both, clear out any of the non-intersecting ids.
|
||||
// that is, filters that are no longer part of the end result set
|
||||
if n.Op == influxql.AND && il && ir {
|
||||
filtersToClear := l.union(r).reject(ids)
|
||||
for _, id := range filtersToClear {
|
||||
delete(filters, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// finally return the ids and say that we should include them
|
||||
return ids, true, nil, nil
|
||||
return ids, filters, nil
|
||||
}
|
||||
|
||||
return m.idsForExpr(n)
|
||||
ids, _, err := m.idsForExpr(n)
|
||||
return ids, nil, err
|
||||
case *influxql.ParenExpr:
|
||||
// walk down the tree
|
||||
return m.walkWhereForSeriesIds(n.Expr, filters)
|
||||
return m.walkWhereForSeriesIds(n.Expr)
|
||||
default:
|
||||
return nil, false, nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -596,8 +620,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error
|
|||
}
|
||||
|
||||
// Get series IDs that match the WHERE clause.
|
||||
filters := map[uint64]influxql.Expr{}
|
||||
ids, _, _, err := m.walkWhereForSeriesIds(expr, filters)
|
||||
ids, _, err := m.walkWhereForSeriesIds(expr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -2578,8 +2578,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement,
|
|||
var ids seriesIDs
|
||||
if stmt.Condition != nil {
|
||||
// Get series IDs that match the WHERE clause.
|
||||
filters := map[uint64]influxql.Expr{}
|
||||
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
||||
ids, _, err = m.walkWhereForSeriesIds(stmt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -2630,8 +2629,7 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
|||
|
||||
if stmt.Condition != nil {
|
||||
// Get series IDs that match the WHERE clause.
|
||||
filters := map[uint64]influxql.Expr{}
|
||||
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
||||
ids, _, err = m.walkWhereForSeriesIds(stmt.Condition)
|
||||
if err != nil {
|
||||
return &Result{Err: err}
|
||||
}
|
||||
|
@ -2848,8 +2846,7 @@ func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesState
|
|||
|
||||
if stmt.Condition != nil {
|
||||
// Get series IDs that match the WHERE clause.
|
||||
filters := map[uint64]influxql.Expr{}
|
||||
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
||||
ids, _, err = m.walkWhereForSeriesIds(stmt.Condition)
|
||||
if err != nil {
|
||||
return &Result{Err: err}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue