feat(query/influxql): transpile using buckets instead of the database

The transpiler should use a bucket for the `from()` call instead of the
database parameter, which will likely be deprecated. The bucket that it
will read data from is `db/rp`; if the retention policy isn't specified,
`autogen` is used as the default.
pull/10616/head
Jonathan A. Sternberg 2018-06-11 10:13:11 -05:00
parent af7c391ec3
commit 86defc1e37
4 changed files with 145 additions and 14 deletions
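For reference, the `db`/`rp`-to-bucket mapping described above can be sketched in isolation. This is a minimal illustration, not code from the commit; the `bucketName` helper name is hypothetical, but its logic mirrors the `transpilerState.from` method added below.

```go
package main

import (
	"errors"
	"fmt"
)

// bucketName builds the "db/rp" bucket string used by from(), defaulting
// the retention policy to "autogen" when it is not specified.
func bucketName(db, rp string) (string, error) {
	if db == "" {
		return "", errors.New("database is required")
	}
	if rp == "" {
		rp = "autogen"
	}
	return fmt.Sprintf("%s/%s", db, rp), nil
}

func main() {
	b, _ := bucketName("db0", "")         // e.g. SELECT ... FROM db0..cpu
	fmt.Println(b)                        // db0/autogen
	b, _ = bucketName("db0", "alternate") // e.g. SELECT ... FROM db0.alternate.cpu
	fmt.Println(b)                        // db0/alternate
}
```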

View File

@@ -49,7 +49,7 @@ If a wildcard is identified, then the schema must be consulted for all of the fi
The base cursor for each variable is generated using the following template:
create_cursor = (db, rp="autogen", start, stop=now(), m, f) => from(db: db+"/"+rp)
create_cursor = (db, rp="autogen", start, stop=now(), m, f) => from(bucket: db+"/"+rp)
|> range(start: start, stop: stop)
|> filter(fn: (r) => r._measurement == m and r._field == f)

View File

@@ -48,11 +48,10 @@ func createVarRefCursor(t *transpilerState, ref *influxql.VarRef) (cursor, error
}
// Create the from spec and add it to the list of operations.
// TODO(jsternberg): Autogenerate these IDs and track the resulting operation
// so we can reference them from other locations.
from := t.op("from", &functions.FromOpSpec{
Database: mm.Database,
})
from, err := t.from(mm)
if err != nil {
return nil, err
}
valuer := influxql.NowValuer{Now: t.now}
_, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)

View File

@@ -97,6 +97,21 @@ func (t *transpilerState) mapType(ref *influxql.VarRef) influxql.DataType {
return influxql.Tag
}
func (t *transpilerState) from(m *influxql.Measurement) (query.OperationID, error) {
db, rp := m.Database, m.RetentionPolicy
if db == "" {
return "", errors.New("database is required")
}
if rp == "" {
rp = "autogen"
}
spec := &functions.FromOpSpec{
Bucket: fmt.Sprintf("%s/%s", db, rp),
}
return t.op("from", spec), nil
}
func (t *transpilerState) op(name string, spec query.OperationSpec, parents ...query.OperationID) query.OperationID {
op := query.Operation{
ID: query.OperationID(fmt.Sprintf("%s%d", name, t.nextID[name])),

View File

@@ -29,7 +29,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -157,7 +157,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -274,7 +274,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -340,7 +340,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from1",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -518,7 +518,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -568,7 +568,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from1",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -735,7 +735,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -886,7 +886,7 @@ func TestTranspiler(t *testing.T) {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Database: "db0",
Bucket: "db0/autogen",
},
},
{
@@ -1019,6 +1019,123 @@ func TestTranspiler(t *testing.T) {
},
},
},
{
s: `SELECT value FROM db0.alternate.cpu`,
spec: &query.Spec{
Operations: []*query.Operation{
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/alternate",
},
},
{
ID: "range0",
Spec: &functions.RangeOpSpec{
Start: query.Time{Absolute: time.Unix(0, influxqllib.MinTime)},
Stop: query.Time{Absolute: time.Unix(0, influxqllib.MaxTime)},
},
},
{
ID: "filter0",
Spec: &functions.FilterOpSpec{
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{
{Key: &semantic.Identifier{Name: "r"}},
},
Body: &semantic.LogicalExpression{
Operator: ast.AndOperator,
Left: &semantic.BinaryExpression{
Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_measurement",
},
Right: &semantic.StringLiteral{
Value: "cpu",
},
},
Right: &semantic.BinaryExpression{
Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_field",
},
Right: &semantic.StringLiteral{
Value: "value",
},
},
},
},
},
},
{
ID: "group0",
Spec: &functions.GroupOpSpec{
By: []string{"_measurement"},
},
},
{
ID: "map0",
Spec: &functions.MapOpSpec{
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{
Key: &semantic.Identifier{Name: "r"},
}},
Body: &semantic.ObjectExpression{
Properties: []*semantic.Property{
{
Key: &semantic.Identifier{Name: "time"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_time",
},
},
{
Key: &semantic.Identifier{Name: "_measurement"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_measurement",
},
},
{
Key: &semantic.Identifier{Name: "value"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_value",
},
},
},
},
},
},
},
{
ID: "yield0",
Spec: &functions.YieldOpSpec{
Name: "0",
},
},
},
Edges: []query.Edge{
{Parent: "from0", Child: "range0"},
{Parent: "range0", Child: "filter0"},
{Parent: "filter0", Child: "group0"},
{Parent: "group0", Child: "map0"},
{Parent: "map0", Child: "yield0"},
},
},
},
} {
t.Run(tt.s, func(t *testing.T) {
if err := tt.spec.Validate(); err != nil {