diff --git a/query/influxql/errors.go b/query/influxql/errors.go new file mode 100644 index 0000000000..d7be3c933f --- /dev/null +++ b/query/influxql/errors.go @@ -0,0 +1,7 @@ +package influxql + +import "errors" + +var ( + errDatabaseNameRequired = errors.New("database name required") +) diff --git a/query/influxql/spectests/show_tag_values.go b/query/influxql/spectests/show_tag_values.go new file mode 100644 index 0000000000..50dd6e30a9 --- /dev/null +++ b/query/influxql/spectests/show_tag_values.go @@ -0,0 +1,85 @@ +package spectests + +import ( + "time" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/functions" +) + +func init() { + RegisterFixture( + NewFixture( + `SHOW TAG VALUES ON "db0" WITH KEY = "host"`, + &flux.Spec{ + Operations: []*flux.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + BucketID: bucketID, + }, + }, + { + ID: "range0", + Spec: &functions.RangeOpSpec{ + Start: flux.Time{ + Relative: -time.Hour, + IsRelative: true, + }, + }, + }, + { + ID: "keyValues0", + Spec: &functions.KeyValuesOpSpec{ + KeyCols: []string{"host"}, + }, + }, + { + ID: "group0", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement", "_key"}, + }, + }, + { + ID: "distinct0", + Spec: &functions.DistinctOpSpec{ + Column: execute.DefaultValueColLabel, + }, + }, + { + ID: "group1", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement"}, + }, + }, + { + ID: "rename0", + Spec: &functions.RenameOpSpec{ + Cols: map[string]string{ + "_key": "key", + "_value": "value", + }, + }, + }, + { + ID: "yield0", + Spec: &functions.YieldOpSpec{ + Name: "0", + }, + }, + }, + Edges: []flux.Edge{ + {Parent: "from0", Child: "range0"}, + {Parent: "range0", Child: "keyValues0"}, + {Parent: "keyValues0", Child: "group0"}, + {Parent: "group0", Child: "distinct0"}, + {Parent: "distinct0", Child: "group1"}, + {Parent: "group1", Child: "rename0"}, + {Parent: "rename0", Child: "yield0"}, + }, + Now: Now(), + }, + ), + ) +} diff --git a/query/influxql/spectests/show_tag_values_in_list.go b/query/influxql/spectests/show_tag_values_in_list.go new file mode 100644 index 0000000000..736686ba59 --- /dev/null +++ b/query/influxql/spectests/show_tag_values_in_list.go @@ -0,0 +1,85 @@ +package spectests + +import ( + "time" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/functions" +) + +func init() { + RegisterFixture( + NewFixture( + `SHOW TAG VALUES ON "db0" WITH KEY IN ("host", "region")`, + &flux.Spec{ + Operations: []*flux.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + BucketID: bucketID, + }, + }, + { + ID: "range0", + Spec: &functions.RangeOpSpec{ + Start: flux.Time{ + Relative: -time.Hour, + IsRelative: true, + }, + }, + }, + { + ID: "keyValues0", + Spec: &functions.KeyValuesOpSpec{ + KeyCols: []string{"host", "region"}, + }, + }, + { + ID: "group0", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement", "_key"}, + }, + }, + { + ID: "distinct0", + Spec: &functions.DistinctOpSpec{ + Column: execute.DefaultValueColLabel, + }, + }, + { + ID: "group1", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement"}, + }, + }, + { + ID: "rename0", + Spec: &functions.RenameOpSpec{ + Cols: map[string]string{ + "_key": "key", + "_value": "value", + }, + }, + }, + { + ID: "yield0", + Spec: &functions.YieldOpSpec{ + Name: "0", + }, + }, + }, + Edges: []flux.Edge{ + {Parent: "from0", Child: "range0"}, + {Parent: "range0", Child: "keyValues0"}, + {Parent: "keyValues0", Child: "group0"}, + {Parent: "group0", Child: "distinct0"}, + {Parent: "distinct0", Child: "group1"}, + {Parent: "group1", Child: "rename0"}, + {Parent: "rename0", Child: "yield0"}, + }, + Now: Now(), + }, + ), + ) +} diff --git a/query/influxql/spectests/show_tag_values_multiple_measurements.go b/query/influxql/spectests/show_tag_values_multiple_measurements.go new file mode 100644 index 0000000000..e7ec972a33 --- /dev/null +++ b/query/influxql/spectests/show_tag_values_multiple_measurements.go @@ -0,0 +1,128 @@ +package spectests + +import ( + "time" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/functions" + "github.com/influxdata/flux/semantic" +) + +func init() { + RegisterFixture( + NewFixture( + `SHOW TAG VALUES ON "db0" FROM "cpu", "mem", "gpu" WITH KEY = "host"`, + &flux.Spec{ + Operations: []*flux.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + BucketID: bucketID, + }, + }, + { + ID: "range0", + Spec: &functions.RangeOpSpec{ + Start: flux.Time{ + Relative: -time.Hour, + IsRelative: true, + }, + }, + }, + { + ID: "filter0", + Spec: &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{ + {Key: &semantic.Identifier{Name: "r"}}, + }, + Body: &semantic.LogicalExpression{ + Operator: ast.OrOperator, + Left: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{Name: "r"}, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{Value: "cpu"}, + }, + Right: &semantic.LogicalExpression{ + Operator: ast.OrOperator, + Left: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{Name: "r"}, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{Value: "mem"}, + }, + Right: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{Name: "r"}, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{Value: "gpu"}, + }, + }, + }, + }, + }, + }, + { + ID: "keyValues0", + Spec: &functions.KeyValuesOpSpec{ + KeyCols: []string{"host"}, + }, + }, + { + ID: "group0", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement", "_key"}, + }, + }, + { + ID: "distinct0", + Spec: &functions.DistinctOpSpec{ + Column: execute.DefaultValueColLabel, + }, + }, + { + ID: "group1", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement"}, + }, + }, + { + ID: "rename0", + Spec: &functions.RenameOpSpec{ + Cols: map[string]string{ + "_key": "key", + "_value": "value", + }, + }, + }, + { + ID: "yield0", + Spec: &functions.YieldOpSpec{ + Name: "0", + }, + }, + }, + Edges: []flux.Edge{ + {Parent: "from0", Child: "range0"}, + {Parent: "range0", Child: "filter0"}, + {Parent: "filter0", Child: "keyValues0"}, + {Parent: "keyValues0", Child: "group0"}, + {Parent: "group0", Child: "distinct0"}, + {Parent: "distinct0", Child: "group1"}, + {Parent: "group1", Child: "rename0"}, + {Parent: "rename0", Child: "yield0"}, + }, + Now: Now(), + }, + ), + ) +} diff --git a/query/influxql/spectests/testing.go b/query/influxql/spectests/testing.go index f4f7521f96..dcd475e888 100644 --- a/query/influxql/spectests/testing.go +++ b/query/influxql/spectests/testing.go @@ -91,7 +91,9 @@ func (f *fixture) Run(t *testing.T) { transpiler := influxql.NewTranspilerWithConfig( dbrpMappingSvc, influxql.Config{ - NowFn: Now, + DefaultDatabase: "db0", + Cluster: "cluster", + NowFn: Now, }, ) spec, err := transpiler.Transpile(context.Background(), f.stmt) diff --git a/query/influxql/transpiler.go b/query/influxql/transpiler.go index 4315d4df34..1db3beb505 100644 --- a/query/influxql/transpiler.go +++ b/query/influxql/transpiler.go @@ -9,7 +9,10 @@ import ( "time" "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/execute" "github.com/influxdata/flux/functions" + "github.com/influxdata/flux/semantic" "github.com/influxdata/influxql" "github.com/influxdata/platform" ) @@ -75,44 +78,145 @@ func newTranspilerState(dbrpMappingSvc platform.DBRPMappingService, config *Conf } func (t *transpilerState) Transpile(ctx context.Context, id int, s influxql.Statement) error { + op, err := t.transpile(ctx, s) + if err != nil { + return err + } + t.op("yield", &functions.YieldOpSpec{Name: strconv.Itoa(id)}, op) + return nil +} + +func (t *transpilerState) transpile(ctx context.Context, s influxql.Statement) (flux.OperationID, error) { switch stmt := s.(type) { case *influxql.SelectStatement: - if err := t.transpileSelect(ctx, id, stmt); err != nil { - return err - } + return t.transpileSelect(ctx, stmt) case *influxql.ShowTagValuesStatement: - if err := t.transpileShowTagValues(ctx, id, stmt); err != nil { - return err + return t.transpileShowTagValues(ctx, stmt) + default: + return "", fmt.Errorf("unknown statement type %T", s) + } +} + +func (t *transpilerState) transpileShowTagValues(ctx context.Context, stmt *influxql.ShowTagValuesStatement) (flux.OperationID, error) { + // While the ShowTagValuesStatement contains a sources section and those sources are measurements, they do + // not actually contain the database and we do not factor in retention policies. So we are always going to use + // the default retention policy when evaluating which bucket we are querying and we do not have to consult + // the sources in the statement. + if stmt.Database == "" { + if t.config.DefaultDatabase == "" { + return "", errDatabaseNameRequired + } + stmt.Database = t.config.DefaultDatabase + } + + op, err := t.from(&influxql.Measurement{Database: stmt.Database}) + if err != nil { + return "", err + } + + // TODO(jsternberg): Read the range from the condition expression. 1.x doesn't actually do this so it isn't + // urgent to implement this functionality so we can use the default range. + op = t.op("range", &functions.RangeOpSpec{ + Start: flux.Time{ + Relative: -time.Hour, + IsRelative: true, + }, + }, op) + + // If we have a list of sources, look through it and add each of the measurement names. + measurementNames := make([]string, 0, len(stmt.Sources)) + for _, source := range stmt.Sources { + mm := source.(*influxql.Measurement) + measurementNames = append(measurementNames, mm.Name) + } + + if len(measurementNames) > 0 { + var expr semantic.Expression = &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{Name: "r"}, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{Value: measurementNames[len(measurementNames)-1]}, + } + for i := len(measurementNames) - 2; i >= 0; i-- { + expr = &semantic.LogicalExpression{ + Operator: ast.OrOperator, + Left: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{Name: "r"}, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{Value: measurementNames[i]}, + }, + Right: expr, + } + } + op = t.op("filter", &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{ + {Key: &semantic.Identifier{Name: "r"}}, + }, + Body: expr, + }, + }, op) + } + + // TODO(jsternberg): Add the condition filter for the where clause. + + // Create the key values op spec from the + var keyValues functions.KeyValuesOpSpec + switch expr := stmt.TagKeyExpr.(type) { + case *influxql.ListLiteral: + keyValues.KeyCols = expr.Vals + case *influxql.StringLiteral: + switch stmt.Op { + case influxql.EQ: + keyValues.KeyCols = []string{expr.Val} + case influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + return "", fmt.Errorf("unimplemented: tag key operand: %s", stmt.Op) + default: + return "", fmt.Errorf("unsupported operand: %s", stmt.Op) } default: - return fmt.Errorf("unknown statement type %T", s) + return "", fmt.Errorf("unsupported literal type: %T", expr) } - return nil + op = t.op("keyValues", &keyValues, op) + + // Group by the measurement and key, find distinct values, then group by the measurement + // to join all of the different keys together. Finish by renaming the columns. This is static. + return t.op("rename", &functions.RenameOpSpec{ + Cols: map[string]string{ + "_key": "key", + "_value": "value", + }, + }, t.op("group", &functions.GroupOpSpec{ + By: []string{"_measurement"}, + }, t.op("distinct", &functions.DistinctOpSpec{ + Column: execute.DefaultValueColLabel, + }, t.op("group", &functions.GroupOpSpec{ + By: []string{"_measurement", "_key"}, + }, op)))), nil } -func (t *transpilerState) transpileShowTagValues(ctx context.Context, id int, stmt *influxql.ShowTagValuesStatement) error { - - return nil -} - -func (t *transpilerState) transpileSelect(ctx context.Context, id int, stmt *influxql.SelectStatement) error { +func (t *transpilerState) transpileSelect(ctx context.Context, stmt *influxql.SelectStatement) (flux.OperationID, error) { // Clone the select statement and omit the time from the list of column names. t.stmt = stmt.Clone() t.stmt.OmitTime = true - t.id = id groups, err := identifyGroups(t.stmt) if err != nil { - return err + return "", err } else if len(groups) == 0 { - return errors.New("at least 1 non-time field must be queried") + return "", errors.New("at least 1 non-time field must be queried") } cursors := make([]cursor, 0, len(groups)) for _, gr := range groups { cur, err := gr.createCursor(t) if err != nil { - return err + return "", err } cursors = append(cursors, cur) } @@ -124,14 +228,9 @@ func (t *transpilerState) transpileSelect(ctx context.Context, id int, stmt *inf // Map each of the fields into another cursor. This evaluates any lingering expressions. cur, err = t.mapFields(cur) if err != nil { - return err + return "", err } - - // Yield the cursor from the last cursor to a stream with the name of the statement id. - // TODO(jsternberg): Include the statement id in the transpiler state when we create - // the state so we can yield to something other than zero. - t.op("yield", &functions.YieldOpSpec{Name: strconv.Itoa(t.id)}, cur.ID()) - return nil + return cur.ID(), nil } func (t *transpilerState) mapType(ref *influxql.VarRef) influxql.DataType {