2018-06-08 18:07:10 +00:00
|
|
|
package influxql
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
"github.com/influxdata/flux"
|
|
|
|
"github.com/influxdata/flux/execute"
|
|
|
|
"github.com/influxdata/flux/functions"
|
|
|
|
"github.com/influxdata/flux/semantic"
|
2018-06-08 18:07:10 +00:00
|
|
|
"github.com/influxdata/influxql"
|
|
|
|
)
|
|
|
|
|
|
|
|
type joinCursor struct {
|
2018-09-06 18:09:52 +00:00
|
|
|
id flux.OperationID
|
2018-06-08 18:07:10 +00:00
|
|
|
m map[influxql.Expr]string
|
|
|
|
exprs []influxql.Expr
|
|
|
|
}
|
|
|
|
|
|
|
|
func Join(t *transpilerState, cursors []cursor, on, except []string) cursor {
|
|
|
|
if len(cursors) == 1 {
|
|
|
|
return cursors[0]
|
|
|
|
}
|
|
|
|
|
|
|
|
// Iterate through each cursor and each expression within each cursor to assign them an id.
|
|
|
|
var (
|
|
|
|
exprs []influxql.Expr
|
|
|
|
properties []*semantic.Property
|
|
|
|
)
|
|
|
|
m := make(map[influxql.Expr]string)
|
2018-09-06 18:09:52 +00:00
|
|
|
tables := make(map[flux.OperationID]string)
|
2018-06-08 18:07:10 +00:00
|
|
|
for i, cur := range cursors {
|
|
|
|
// Record this incoming cursor within the table.
|
|
|
|
tableName := fmt.Sprintf("t%d", i)
|
|
|
|
tables[cur.ID()] = tableName
|
|
|
|
|
|
|
|
for _, k := range cur.Keys() {
|
|
|
|
// Generate a name for accessing this expression and generate the index entries for it.
|
|
|
|
name := fmt.Sprintf("val%d", len(exprs))
|
|
|
|
exprs = append(exprs, k)
|
|
|
|
m[k] = name
|
|
|
|
|
|
|
|
property := &semantic.Property{
|
|
|
|
Key: &semantic.Identifier{Name: name},
|
|
|
|
Value: &semantic.MemberExpression{
|
|
|
|
Object: &semantic.IdentifierExpression{
|
|
|
|
Name: "tables",
|
|
|
|
},
|
|
|
|
Property: tableName,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
if valName, _ := cur.Value(k); valName != execute.DefaultValueColLabel {
|
|
|
|
property.Value = &semantic.MemberExpression{
|
|
|
|
Object: property.Value,
|
|
|
|
Property: valName,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
properties = append(properties, property)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-08 20:56:59 +00:00
|
|
|
// Retrieve the parent ids from the cursors.
|
2018-09-06 18:09:52 +00:00
|
|
|
parents := make([]flux.OperationID, 0, len(tables))
|
2018-06-08 20:56:59 +00:00
|
|
|
for _, cur := range cursors {
|
|
|
|
parents = append(parents, cur.ID())
|
2018-06-08 18:07:10 +00:00
|
|
|
}
|
|
|
|
id := t.op("join", &functions.JoinOpSpec{
|
|
|
|
TableNames: tables,
|
2018-07-25 22:26:50 +00:00
|
|
|
On: on,
|
2018-06-08 18:07:10 +00:00
|
|
|
// TODO(jsternberg): This option needs to be included.
|
|
|
|
//Except: except,
|
|
|
|
}, parents...)
|
|
|
|
return &joinCursor{
|
|
|
|
id: id,
|
|
|
|
m: m,
|
|
|
|
exprs: exprs,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
func (c *joinCursor) ID() flux.OperationID {
|
2018-06-08 18:07:10 +00:00
|
|
|
return c.id
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *joinCursor) Keys() []influxql.Expr {
|
|
|
|
keys := make([]influxql.Expr, 0, len(c.m))
|
|
|
|
for k := range c.m {
|
|
|
|
keys = append(keys, k)
|
|
|
|
}
|
|
|
|
return keys
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *joinCursor) Value(expr influxql.Expr) (string, bool) {
|
|
|
|
value, ok := c.m[expr]
|
|
|
|
return value, ok
|
|
|
|
}
|