Merge pull request #848 from influxdata/js-transpiler-show-tag-values
feat: implement basic show tag values in the transpiler
commit
73e474567b
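
Before the diff itself, a minimal, self-contained Go sketch (illustrative only, not part of this change) of the two WITH KEY forms that the new transpileShowTagValues code accepts and how each maps onto the key columns of the keyValues operation. It uses only the github.com/influxdata/influxql parser that the transpiler already builds on; the keyCols helper and the main function are assumptions made for this example.

package main

import (
	"fmt"

	"github.com/influxdata/influxql"
)

// keyCols mirrors the TagKeyExpr switch in transpileShowTagValues below:
// it returns the tag keys a SHOW TAG VALUES statement asks for.
func keyCols(q string) ([]string, error) {
	stmt, err := influxql.ParseStatement(q)
	if err != nil {
		return nil, err
	}
	show, ok := stmt.(*influxql.ShowTagValuesStatement)
	if !ok {
		return nil, fmt.Errorf("not a SHOW TAG VALUES statement: %T", stmt)
	}
	switch expr := show.TagKeyExpr.(type) {
	case *influxql.ListLiteral:
		// WITH KEY IN ("host", "region")
		return expr.Vals, nil
	case *influxql.StringLiteral:
		// WITH KEY = "host"; the !=, =~ and !~ operators are not implemented yet.
		if show.Op == influxql.EQ {
			return []string{expr.Val}, nil
		}
		return nil, fmt.Errorf("unimplemented: tag key operand: %s", show.Op)
	default:
		return nil, fmt.Errorf("unsupported literal type: %T", expr)
	}
}

func main() {
	cols, _ := keyCols(`SHOW TAG VALUES ON "db0" WITH KEY IN ("host", "region")`)
	fmt.Println(cols) // [host region]
}
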
@@ -0,0 +1,7 @@
package influxql

import "errors"

var (
	errDatabaseNameRequired = errors.New("database name required")
)
@@ -0,0 +1,85 @@
package spectests

import (
	"time"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/functions"
)

func init() {
	RegisterFixture(
		NewFixture(
			`SHOW TAG VALUES ON "db0" WITH KEY = "host"`,
			&flux.Spec{
				Operations: []*flux.Operation{
					{
						ID: "from0",
						Spec: &functions.FromOpSpec{
							BucketID: bucketID,
						},
					},
					{
						ID: "range0",
						Spec: &functions.RangeOpSpec{
							Start: flux.Time{
								Relative:   -time.Hour,
								IsRelative: true,
							},
						},
					},
					{
						ID: "keyValues0",
						Spec: &functions.KeyValuesOpSpec{
							KeyCols: []string{"host"},
						},
					},
					{
						ID: "group0",
						Spec: &functions.GroupOpSpec{
							By: []string{"_measurement", "_key"},
						},
					},
					{
						ID: "distinct0",
						Spec: &functions.DistinctOpSpec{
							Column: execute.DefaultValueColLabel,
						},
					},
					{
						ID: "group1",
						Spec: &functions.GroupOpSpec{
							By: []string{"_measurement"},
						},
					},
					{
						ID: "rename0",
						Spec: &functions.RenameOpSpec{
							Cols: map[string]string{
								"_key":   "key",
								"_value": "value",
							},
						},
					},
					{
						ID: "yield0",
						Spec: &functions.YieldOpSpec{
							Name: "0",
						},
					},
				},
				Edges: []flux.Edge{
					{Parent: "from0", Child: "range0"},
					{Parent: "range0", Child: "keyValues0"},
					{Parent: "keyValues0", Child: "group0"},
					{Parent: "group0", Child: "distinct0"},
					{Parent: "distinct0", Child: "group1"},
					{Parent: "group1", Child: "rename0"},
					{Parent: "rename0", Child: "yield0"},
				},
				Now: Now(),
			},
		),
	)
}
@@ -0,0 +1,85 @@
package spectests

import (
	"time"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/functions"
)

func init() {
	RegisterFixture(
		NewFixture(
			`SHOW TAG VALUES ON "db0" WITH KEY IN ("host", "region")`,
			&flux.Spec{
				Operations: []*flux.Operation{
					{
						ID: "from0",
						Spec: &functions.FromOpSpec{
							BucketID: bucketID,
						},
					},
					{
						ID: "range0",
						Spec: &functions.RangeOpSpec{
							Start: flux.Time{
								Relative:   -time.Hour,
								IsRelative: true,
							},
						},
					},
					{
						ID: "keyValues0",
						Spec: &functions.KeyValuesOpSpec{
							KeyCols: []string{"host", "region"},
						},
					},
					{
						ID: "group0",
						Spec: &functions.GroupOpSpec{
							By: []string{"_measurement", "_key"},
						},
					},
					{
						ID: "distinct0",
						Spec: &functions.DistinctOpSpec{
							Column: execute.DefaultValueColLabel,
						},
					},
					{
						ID: "group1",
						Spec: &functions.GroupOpSpec{
							By: []string{"_measurement"},
						},
					},
					{
						ID: "rename0",
						Spec: &functions.RenameOpSpec{
							Cols: map[string]string{
								"_key":   "key",
								"_value": "value",
							},
						},
					},
					{
						ID: "yield0",
						Spec: &functions.YieldOpSpec{
							Name: "0",
						},
					},
				},
				Edges: []flux.Edge{
					{Parent: "from0", Child: "range0"},
					{Parent: "range0", Child: "keyValues0"},
					{Parent: "keyValues0", Child: "group0"},
					{Parent: "group0", Child: "distinct0"},
					{Parent: "distinct0", Child: "group1"},
					{Parent: "group1", Child: "rename0"},
					{Parent: "rename0", Child: "yield0"},
				},
				Now: Now(),
			},
		),
	)
}
@@ -0,0 +1,128 @@
package spectests

import (
	"time"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/ast"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/functions"
	"github.com/influxdata/flux/semantic"
)

func init() {
	RegisterFixture(
		NewFixture(
			`SHOW TAG VALUES ON "db0" FROM "cpu", "mem", "gpu" WITH KEY = "host"`,
			&flux.Spec{
				Operations: []*flux.Operation{
					{
						ID: "from0",
						Spec: &functions.FromOpSpec{
							BucketID: bucketID,
						},
					},
					{
						ID: "range0",
						Spec: &functions.RangeOpSpec{
							Start: flux.Time{
								Relative:   -time.Hour,
								IsRelative: true,
							},
						},
					},
					{
						ID: "filter0",
						Spec: &functions.FilterOpSpec{
							Fn: &semantic.FunctionExpression{
								Params: []*semantic.FunctionParam{
									{Key: &semantic.Identifier{Name: "r"}},
								},
								Body: &semantic.LogicalExpression{
									Operator: ast.OrOperator,
									Left: &semantic.BinaryExpression{
										Operator: ast.EqualOperator,
										Left: &semantic.MemberExpression{
											Object:   &semantic.IdentifierExpression{Name: "r"},
											Property: "_measurement",
										},
										Right: &semantic.StringLiteral{Value: "cpu"},
									},
									Right: &semantic.LogicalExpression{
										Operator: ast.OrOperator,
										Left: &semantic.BinaryExpression{
											Operator: ast.EqualOperator,
											Left: &semantic.MemberExpression{
												Object:   &semantic.IdentifierExpression{Name: "r"},
												Property: "_measurement",
											},
											Right: &semantic.StringLiteral{Value: "mem"},
										},
										Right: &semantic.BinaryExpression{
											Operator: ast.EqualOperator,
											Left: &semantic.MemberExpression{
												Object:   &semantic.IdentifierExpression{Name: "r"},
												Property: "_measurement",
											},
											Right: &semantic.StringLiteral{Value: "gpu"},
										},
									},
								},
							},
						},
					},
					{
						ID: "keyValues0",
						Spec: &functions.KeyValuesOpSpec{
							KeyCols: []string{"host"},
						},
					},
					{
						ID: "group0",
						Spec: &functions.GroupOpSpec{
							By: []string{"_measurement", "_key"},
						},
					},
					{
						ID: "distinct0",
						Spec: &functions.DistinctOpSpec{
							Column: execute.DefaultValueColLabel,
						},
					},
					{
						ID: "group1",
						Spec: &functions.GroupOpSpec{
							By: []string{"_measurement"},
						},
					},
					{
						ID: "rename0",
						Spec: &functions.RenameOpSpec{
							Cols: map[string]string{
								"_key":   "key",
								"_value": "value",
							},
						},
					},
					{
						ID: "yield0",
						Spec: &functions.YieldOpSpec{
							Name: "0",
						},
					},
				},
				Edges: []flux.Edge{
					{Parent: "from0", Child: "range0"},
					{Parent: "range0", Child: "filter0"},
					{Parent: "filter0", Child: "keyValues0"},
					{Parent: "keyValues0", Child: "group0"},
					{Parent: "group0", Child: "distinct0"},
					{Parent: "distinct0", Child: "group1"},
					{Parent: "group1", Child: "rename0"},
					{Parent: "rename0", Child: "yield0"},
				},
				Now: Now(),
			},
		),
	)
}
@@ -91,7 +91,9 @@ func (f *fixture) Run(t *testing.T) {
	transpiler := influxql.NewTranspilerWithConfig(
		dbrpMappingSvc,
		influxql.Config{
			NowFn: Now,
			DefaultDatabase: "db0",
			Cluster:         "cluster",
			NowFn:           Now,
		},
	)
	spec, err := transpiler.Transpile(context.Background(), f.stmt)
@@ -9,7 +9,10 @@ import (
	"time"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/ast"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/functions"
	"github.com/influxdata/flux/semantic"
	"github.com/influxdata/influxql"
	"github.com/influxdata/platform"
)
@@ -75,44 +78,145 @@ func newTranspilerState(dbrpMappingSvc platform.DBRPMappingService, config *Conf
}

func (t *transpilerState) Transpile(ctx context.Context, id int, s influxql.Statement) error {
	op, err := t.transpile(ctx, s)
	if err != nil {
		return err
	}
	t.op("yield", &functions.YieldOpSpec{Name: strconv.Itoa(id)}, op)
	return nil
}

func (t *transpilerState) transpile(ctx context.Context, s influxql.Statement) (flux.OperationID, error) {
	switch stmt := s.(type) {
	case *influxql.SelectStatement:
		if err := t.transpileSelect(ctx, id, stmt); err != nil {
			return err
		}
		return t.transpileSelect(ctx, stmt)
	case *influxql.ShowTagValuesStatement:
		if err := t.transpileShowTagValues(ctx, id, stmt); err != nil {
			return err
		return t.transpileShowTagValues(ctx, stmt)
	default:
		return "", fmt.Errorf("unknown statement type %T", s)
	}
}

func (t *transpilerState) transpileShowTagValues(ctx context.Context, stmt *influxql.ShowTagValuesStatement) (flux.OperationID, error) {
	// While the ShowTagValuesStatement contains a sources section and those sources are measurements, they do
	// not actually contain the database and we do not factor in retention policies. So we are always going to use
	// the default retention policy when evaluating which bucket we are querying and we do not have to consult
	// the sources in the statement.
	if stmt.Database == "" {
		if t.config.DefaultDatabase == "" {
			return "", errDatabaseNameRequired
		}
		stmt.Database = t.config.DefaultDatabase
	}

	op, err := t.from(&influxql.Measurement{Database: stmt.Database})
	if err != nil {
		return "", err
	}

	// TODO(jsternberg): Read the range from the condition expression. 1.x doesn't actually do this so it isn't
	// urgent to implement this functionality so we can use the default range.
	op = t.op("range", &functions.RangeOpSpec{
		Start: flux.Time{
			Relative:   -time.Hour,
			IsRelative: true,
		},
	}, op)

	// If we have a list of sources, look through it and add each of the measurement names.
	measurementNames := make([]string, 0, len(stmt.Sources))
	for _, source := range stmt.Sources {
		mm := source.(*influxql.Measurement)
		measurementNames = append(measurementNames, mm.Name)
	}

	if len(measurementNames) > 0 {
		var expr semantic.Expression = &semantic.BinaryExpression{
			Operator: ast.EqualOperator,
			Left: &semantic.MemberExpression{
				Object:   &semantic.IdentifierExpression{Name: "r"},
				Property: "_measurement",
			},
			Right: &semantic.StringLiteral{Value: measurementNames[len(measurementNames)-1]},
		}
		for i := len(measurementNames) - 2; i >= 0; i-- {
			expr = &semantic.LogicalExpression{
				Operator: ast.OrOperator,
				Left: &semantic.BinaryExpression{
					Operator: ast.EqualOperator,
					Left: &semantic.MemberExpression{
						Object:   &semantic.IdentifierExpression{Name: "r"},
						Property: "_measurement",
					},
					Right: &semantic.StringLiteral{Value: measurementNames[i]},
				},
				Right: expr,
			}
		}
		op = t.op("filter", &functions.FilterOpSpec{
			Fn: &semantic.FunctionExpression{
				Params: []*semantic.FunctionParam{
					{Key: &semantic.Identifier{Name: "r"}},
				},
				Body: expr,
			},
		}, op)
	}

	// TODO(jsternberg): Add the condition filter for the where clause.

	// Create the key values op spec from the
	var keyValues functions.KeyValuesOpSpec
	switch expr := stmt.TagKeyExpr.(type) {
	case *influxql.ListLiteral:
		keyValues.KeyCols = expr.Vals
	case *influxql.StringLiteral:
		switch stmt.Op {
		case influxql.EQ:
			keyValues.KeyCols = []string{expr.Val}
		case influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			return "", fmt.Errorf("unimplemented: tag key operand: %s", stmt.Op)
		default:
			return "", fmt.Errorf("unsupported operand: %s", stmt.Op)
		}
	default:
		return fmt.Errorf("unknown statement type %T", s)
		return "", fmt.Errorf("unsupported literal type: %T", expr)
	}
	return nil
	op = t.op("keyValues", &keyValues, op)

	// Group by the measurement and key, find distinct values, then group by the measurement
	// to join all of the different keys together. Finish by renaming the columns. This is static.
	return t.op("rename", &functions.RenameOpSpec{
		Cols: map[string]string{
			"_key":   "key",
			"_value": "value",
		},
	}, t.op("group", &functions.GroupOpSpec{
		By: []string{"_measurement"},
	}, t.op("distinct", &functions.DistinctOpSpec{
		Column: execute.DefaultValueColLabel,
	}, t.op("group", &functions.GroupOpSpec{
		By: []string{"_measurement", "_key"},
	}, op)))), nil
}

func (t *transpilerState) transpileShowTagValues(ctx context.Context, id int, stmt *influxql.ShowTagValuesStatement) error {

	return nil
}

func (t *transpilerState) transpileSelect(ctx context.Context, id int, stmt *influxql.SelectStatement) error {
func (t *transpilerState) transpileSelect(ctx context.Context, stmt *influxql.SelectStatement) (flux.OperationID, error) {
	// Clone the select statement and omit the time from the list of column names.
	t.stmt = stmt.Clone()
	t.stmt.OmitTime = true
	t.id = id

	groups, err := identifyGroups(t.stmt)
	if err != nil {
		return err
		return "", err
	} else if len(groups) == 0 {
		return errors.New("at least 1 non-time field must be queried")
		return "", errors.New("at least 1 non-time field must be queried")
	}

	cursors := make([]cursor, 0, len(groups))
	for _, gr := range groups {
		cur, err := gr.createCursor(t)
		if err != nil {
			return err
			return "", err
		}
		cursors = append(cursors, cur)
	}

@@ -124,14 +228,9 @@ func (t *transpilerState) transpileSelect(ctx context.Context, id int, stmt *inf
	// Map each of the fields into another cursor. This evaluates any lingering expressions.
	cur, err = t.mapFields(cur)
	if err != nil {
		return err
		return "", err
	}

	// Yield the cursor from the last cursor to a stream with the name of the statement id.
	// TODO(jsternberg): Include the statement id in the transpiler state when we create
	// the state so we can yield to something other than zero.
	t.op("yield", &functions.YieldOpSpec{Name: strconv.Itoa(t.id)}, cur.ID())
	return nil
	return cur.ID(), nil
}

func (t *transpilerState) mapType(ref *influxql.VarRef) influxql.DataType {
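
The FROM clause handling above folds the measurement list into a right-nested chain of OR'd equality checks on _measurement, which is exactly what the "cpu", "mem", "gpu" fixture asserts. A small, self-contained sketch of that fold, building the predicate as a plain string instead of flux semantic nodes (the function names here are illustrative, not part of the change):

package main

import "fmt"

// measurementPredicate mirrors the loop in transpileShowTagValues: the last
// measurement becomes the innermost expression and the remaining names are
// folded in from right to left with OR.
func measurementPredicate(names []string) string {
	if len(names) == 0 {
		return ""
	}
	expr := fmt.Sprintf("r._measurement == %q", names[len(names)-1])
	for i := len(names) - 2; i >= 0; i-- {
		expr = fmt.Sprintf("r._measurement == %q or (%s)", names[i], expr)
	}
	return expr
}

func main() {
	// Prints:
	// r._measurement == "cpu" or (r._measurement == "mem" or (r._measurement == "gpu"))
	fmt.Println(measurementPredicate([]string{"cpu", "mem", "gpu"}))
}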