Add tag filtering.

This commit adds tag filtering via the `WHERE` clause.

Example:

	SELECT sum(value) FROM cpu GROUP BY time(1h), host WHERE region = 'us-west'
pull/1257/head
Ben Johnson 2014-12-21 13:22:04 -07:00
parent a034dab697
commit eced3a347c
2 changed files with 125 additions and 1 deletions

View File

@ -19,6 +19,7 @@ type DB interface {
SeriesTagValues(seriesID uint32, keys []string) []string SeriesTagValues(seriesID uint32, keys []string) []string
// Returns the id and data type for a series field. // Returns the id and data type for a series field.
// Returns id of zero if not a field.
Field(name, field string) (fieldID uint8, typ DataType) Field(name, field string) (fieldID uint8, typ DataType)
// Returns an iterator given a series data id, field id, & field data type. // Returns an iterator given a series data id, field id, & field data type.
@ -158,7 +159,14 @@ func (p *Planner) planCall(e *Executor, c *Call) (processor, error) {
return nil, err return nil, err
} }
name := sub.Source.(*Measurement).Name name := sub.Source.(*Measurement).Name
tags := make(map[string]string) // TODO: Extract tags.
// Extract tags from conditional.
tags := make(map[string]string)
condition, err := p.extractTags(name, sub.Condition, tags)
if err != nil {
return nil, err
}
sub.Condition = condition
// Find field. // Find field.
fname := strings.TrimPrefix(ref.Val, name+".") fname := strings.TrimPrefix(ref.Val, name+".")
@ -222,6 +230,74 @@ func (p *Planner) planBinaryExpr(e *Executor, expr *BinaryExpr) (processor, erro
return newBinaryExprEvaluator(e, expr.Op, lhs, rhs), nil return newBinaryExprEvaluator(e, expr.Op, lhs, rhs), nil
} }
// extractTags extracts a tag key/value map from a statement.
// Extracted tags are removed from the statement.
func (p *Planner) extractTags(name string, expr Expr, tags map[string]string) (Expr, error) {
// TODO: Refactor into a walk-like Replace().
switch expr := expr.(type) {
case *BinaryExpr:
// If the LHS is a variable ref then check for tag equality.
if lhs, ok := expr.LHS.(*VarRef); ok && expr.Op == EQ {
return p.extractBinaryExprTags(name, expr, lhs, expr.RHS, tags)
}
// If the RHS is a variable ref then check for tag equality.
if rhs, ok := expr.RHS.(*VarRef); ok && expr.Op == EQ {
return p.extractBinaryExprTags(name, expr, rhs, expr.LHS, tags)
}
// Recursively process LHS.
lhs, err := p.extractTags(name, expr.LHS, tags)
if err != nil {
return nil, err
}
expr.LHS = lhs
// Recursively process RHS.
rhs, err := p.extractTags(name, expr.RHS, tags)
if err != nil {
return nil, err
}
expr.RHS = rhs
return expr, nil
case *ParenExpr:
e, err := p.extractTags(name, expr.Expr, tags)
if err != nil {
return nil, err
}
expr.Expr = e
return expr, nil
default:
return expr, nil
}
}
// extractBinaryExprTags extracts a tag key/value map from a statement.
func (p *Planner) extractBinaryExprTags(name string, expr Expr, ref *VarRef, value Expr, tags map[string]string) (Expr, error) {
// Ignore if the value is not a string literal.
lit, ok := value.(*StringLiteral)
if !ok {
return expr, nil
}
// Extract the key and remove the measurement prefix.
key := strings.TrimPrefix(ref.Val, name+".")
// If tag is already filtered then return error.
if _, ok := tags[key]; ok {
return nil, fmt.Errorf("duplicate tag filter: %s.%s", name, key)
}
// Add tag to the filter.
tags[key] = lit.Val
// Return nil to remove the expression.
return nil, nil
}
// Executor represents the implementation of Executor. // Executor represents the implementation of Executor.
// It executes all reducers and combines their result into a row. // It executes all reducers and combines their result into a row.
type Executor struct { type Executor struct {

View File

@ -136,6 +136,54 @@ func TestPlanner_Plan_GroupByIntervalAndTag(t *testing.T) {
} }
} }
// Ensure the planner can plan and execute a query filtered by tag.
func TestPlanner_Plan_FilterByTag(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z")
db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(1)})
db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T09:30:00Z", map[string]interface{}{"value": float64(2)})
db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(3)})
db.WriteSeries("cpu", map[string]string{"host": "servera", "region": "us-west"}, "2000-01-01T11:30:00Z", map[string]interface{}{"value": float64(4)})
db.WriteSeries("cpu", map[string]string{"host": "serverb", "region": "us-east"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(10)})
db.WriteSeries("cpu", map[string]string{"host": "serverb", "region": "us-east"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(20)})
db.WriteSeries("cpu", map[string]string{"host": "serverc", "region": "us-west"}, "2000-01-01T09:00:00Z", map[string]interface{}{"value": float64(100)})
db.WriteSeries("cpu", map[string]string{"host": "serverc", "region": "us-west"}, "2000-01-01T11:00:00Z", map[string]interface{}{"value": float64(200)})
// Query for data since 3 hours ago until now, grouped every 30 minutes.
rs := db.MustPlanAndExecute(`
SELECT sum(value)
FROM cpu
WHERE time >= now() - 3h AND region = 'us-west'
GROUP BY time(1h), host`)
// Expected resultset.
exp := minify(`[{
"name":"cpu",
"tags":{"host":"servera"},
"columns":["time","sum"],
"values":[
[946717200000000,3],
[946720800000000,0],
[946724400000000,7]
]
},{
"name":"cpu",
"tags":{"host":"serverc"},
"columns":["time","sum"],
"values":[
[946717200000000,100],
[946720800000000,0],
[946724400000000,200]
]
}]`)
// Compare resultsets.
if act := jsonify(rs); exp != act {
t.Fatalf("unexpected resultset: %s", indent(act))
}
}
// Ensure the planner can plan and execute a joined query. // Ensure the planner can plan and execute a joined query.
func TestPlanner_Plan_Join(t *testing.T) { func TestPlanner_Plan_Join(t *testing.T) {
db := NewDB("2000-01-01T12:00:00Z") db := NewDB("2000-01-01T12:00:00Z")