flux options interface

default now option value
pull/10616/head
jlapacik 2018-07-11 13:03:14 -07:00
parent 244288fa66
commit 524c4ccf35
29 changed files with 513 additions and 113 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/platform/query/complete"
_ "github.com/influxdata/platform/query/functions" // Import the built-in functions
"github.com/influxdata/platform/query/interpreter"
_ "github.com/influxdata/platform/query/options" // Import the built-in options
)
func init() {

View File

@ -19,6 +19,7 @@ const (
TableParameter = "table"
tableKindKey = "kind"
tableParentsKey = "parents"
nowOption = "now"
//tableSpecKey = "spec"
)
@ -35,7 +36,8 @@ type options struct {
}
// Compile evaluates a Flux script producing a query Spec.
func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) {
// now parameter must be non-zero, that is the default now time should be set before compiling.
func Compile(ctx context.Context, q string, now time.Time, opts ...Option) (*Spec, error) {
o := new(options)
for _, opt := range opts {
opt(o)
@ -49,8 +51,10 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) {
s, _ = opentracing.StartSpanFromContext(ctx, "compile")
defer s.Finish()
scope, decls := builtIns()
interpScope := interpreter.NewScopeWithValues(scope)
itrp := NewInterpreter()
itrp.SetOption(nowOption, nowFunc(now))
_, decls := builtIns(itrp)
// Convert AST program to a semantic program
semProg, err := semantic.New(astProg, decls)
@ -58,11 +62,10 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) {
return nil, err
}
operations, err := interpreter.Eval(semProg, interpScope)
if err != nil {
if err := itrp.Eval(semProg); err != nil {
return nil, err
}
spec := toSpec(operations)
spec := toSpec(itrp)
if o.verbose {
log.Println("Query Spec: ", Formatted(spec, FmtJSON))
@ -70,7 +73,38 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) {
return spec, nil
}
func toSpec(stmtVals []values.Value) *Spec {
// NewInterpreter returns an interpreter instance with
// pre-constructed options and global scopes.
func NewInterpreter() *interpreter.Interpreter {
options := make(map[string]values.Value, len(builtinOptions))
globals := make(map[string]values.Value, len(builtinScope))
for k, v := range builtinScope {
globals[k] = v
}
for k, v := range builtinOptions {
options[k] = v
}
return interpreter.NewInterpreter(options, globals)
}
func nowFunc(now time.Time) values.Function {
timeVal := values.NewTimeValue(values.ConvertTime(now))
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
ReturnType: semantic.Time,
})
call := func(args values.Object) (values.Value, error) {
return timeVal, nil
}
sideEffect := false
return values.NewFunction(nowOption, ftype, call, sideEffect)
}
func toSpec(itrp *interpreter.Interpreter) *Spec {
operations := itrp.SideEffects()
ider := &ider{
id: 0,
lookup: make(map[*TableObject]OperationID),
@ -78,9 +112,9 @@ func toSpec(stmtVals []values.Value) *Spec {
spec := new(Spec)
visited := make(map[*TableObject]bool)
nodes := make([]*TableObject, 0, len(stmtVals))
nodes := make([]*TableObject, 0, len(operations))
for _, val := range stmtVals {
for _, val := range operations {
if op, ok := val.(*TableObject); ok {
dup := false
for _, node := range nodes {
@ -95,12 +129,23 @@ func toSpec(stmtVals []values.Value) *Spec {
}
}
}
// now option is Time value
nowValue, _ := itrp.Option(nowOption).Function().Call(nil)
spec.Now = nowValue.Time().Time()
return spec
}
type CreateOperationSpec func(args Arguments, a *Administration) (OperationSpec, error)
var builtinScope = make(map[string]values.Value)
// TODO(Josh): Default option values should be registered similarly to built-in
// functions. Default options should be registered in their own files
// (or in a single file) using the RegisterBuiltInOption function which will
// place the resolved option value in the following map.
var builtinOptions = make(map[string]values.Value)
var builtinDeclarations = make(semantic.DeclarationScope)
// list of builtin scripts
@ -150,6 +195,17 @@ func RegisterBuiltInValue(name string, v values.Value) {
builtinScope[name] = v
}
// RegisterBuiltInOption adds the value to the builtin scope.
func RegisterBuiltInOption(name string, v values.Value) {
if finalized {
panic(errors.New("already finalized, cannot register builtin option"))
}
if _, ok := builtinOptions[name]; ok {
panic(fmt.Errorf("duplicate registration for builtin option %q", name))
}
builtinOptions[name] = v
}
// FinalizeBuiltIns must be called to complete registration.
// Future calls to RegisterFunction, RegisterBuiltIn or RegisterBuiltInValue will panic.
func FinalizeBuiltIns() {
@ -373,16 +429,12 @@ func BuiltIns() (map[string]values.Value, semantic.DeclarationScope) {
if !finalized {
panic("builtins not finalized")
}
return builtIns()
return builtIns(NewInterpreter())
}
func builtIns() (map[string]values.Value, semantic.DeclarationScope) {
func builtIns(itrp *interpreter.Interpreter) (map[string]values.Value, semantic.DeclarationScope) {
decls := builtinDeclarations.Copy()
scope := make(map[string]values.Value, len(builtinScope))
for k, v := range builtinScope {
scope[k] = v
}
interpScope := interpreter.NewScopeWithValues(scope)
for name, script := range builtins {
astProg, err := parser.NewAST(script)
if err != nil {
@ -393,11 +445,11 @@ func builtIns() (map[string]values.Value, semantic.DeclarationScope) {
panic(errors.Wrapf(err, "failed to create semantic graph for builtin %q", name))
}
if _, err := interpreter.Eval(semProg, interpScope); err != nil {
if err := itrp.Eval(semProg); err != nil {
panic(errors.Wrapf(err, "failed to evaluate builtin %q", name))
}
}
return scope, decls
return itrp.GlobalScope().Values(), decls
}
type Administration struct {

View File

@ -114,7 +114,7 @@ func (c *Controller) compileQuery(q *Query, queryStr string) error {
if !q.tryCompile() {
return errors.New("failed to transition query to compiling state")
}
spec, err := query.Compile(q.compilingCtx, queryStr, query.Verbose(c.verbose))
spec, err := query.Compile(q.compilingCtx, queryStr, q.now, query.Verbose(c.verbose))
if err != nil {
return errors.Wrap(err, "failed to compile query")
}
@ -226,7 +226,7 @@ func (c *Controller) processQuery(q *Query) (pop bool, err error) {
log.Println("logical plan", plan.Formatted(lp))
}
p, err := c.pplanner.Plan(lp, nil, q.now)
p, err := c.pplanner.Plan(lp, nil)
if err != nil {
return true, errors.Wrap(err, "failed to create physical plan")
}

View File

@ -4,10 +4,10 @@ import (
"testing"
"time"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/querytest"

View File

@ -127,7 +127,7 @@ func createJoinOpSpec(args query.Arguments, a *query.Administration) (query.Oper
return
}
p := t.(*query.TableObject)
joinParams.add(k /*parameter*/, p /*argument*/)
joinParams.add(k, p)
spec.tableNames[p] = k
})
if err != nil {

View File

@ -4,9 +4,9 @@ import (
"math"
"testing"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

View File

@ -4,10 +4,10 @@ import (
"math"
"testing"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

View File

@ -9,6 +9,7 @@ import (
"path/filepath"
"regexp"
"strings"
"time"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
@ -69,7 +70,7 @@ func main() {
}
qs := querytest.GetQueryServiceBridge()
qspec, err := query.Compile(context.Background(), string(querytext))
qspec, err := query.Compile(context.Background(), string(querytext), time.Now().UTC())
if err != nil {
fmt.Printf("error compiling. \n query: \n %s \n err: %s", string(querytext), err)
return

View File

@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
@ -83,7 +84,7 @@ func queryTester(t *testing.T, qs query.QueryService, prefix, queryExt string) e
t.Fatal(err)
}
spec, err := query.Compile(context.Background(), q)
spec, err := query.Compile(context.Background(), q, time.Now().UTC())
if err != nil {
t.Fatalf("failed to compile: %v", err)
}

View File

@ -4,9 +4,9 @@ import (
"testing"
"time"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/querytest"

View File

@ -4,9 +4,9 @@ import (
"math"
"testing"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

View File

@ -3,9 +3,9 @@ package functions_test
import (
"testing"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

View File

@ -4,9 +4,9 @@ import (
"math"
"testing"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

View File

@ -3,9 +3,9 @@ package functions_test
import (
"testing"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/querytest"

View File

@ -0,0 +1,29 @@
package functions
import (
"time"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
var systemTimeFuncName = "systemTime"
func init() {
nowFunc := SystemTime()
query.RegisterBuiltInValue(systemTimeFuncName, nowFunc)
}
// SystemTime return a function value that when called will give the current system time
func SystemTime() values.Value {
name := systemTimeFuncName
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
ReturnType: semantic.Time,
})
call := func(args values.Object) (values.Value, error) {
return values.NewTimeValue(values.ConvertTime(time.Now().UTC())), nil
}
sideEffect := false
return values.NewFunction(name, ftype, call, sideEffect)
}

View File

@ -10,19 +10,63 @@ import (
"github.com/pkg/errors"
)
func Eval(program *semantic.Program, scope *Scope) ([]values.Value, error) {
itrp := interpreter{}
err := itrp.eval(program, scope)
return itrp.values, err
// Interpreter used to interpret a Flux program
type Interpreter struct {
values []values.Value
options *Scope
globals *Scope
}
type interpreter struct {
values []values.Value
// NewInterpreter instantiates a new Flux Interpreter
func NewInterpreter(options, builtins map[string]values.Value) *Interpreter {
optionScope := NewScopeWithValues(options)
globalScope := optionScope.NestWithValues(builtins)
interpreter := new(Interpreter)
interpreter.options = optionScope
interpreter.globals = globalScope
return interpreter
}
func (itrp *interpreter) eval(program *semantic.Program, scope *Scope) error {
// Return gives the return value from the block
func (itrp *Interpreter) Return() values.Value {
return itrp.globals.Return()
}
// GlobalScope returns a pointer to the global scope of the program.
// That is the scope nested directly below the options scope.
func (itrp *Interpreter) GlobalScope() *Scope {
return itrp.globals
}
// SetVar adds a variable binding to the global scope
func (itrp *Interpreter) SetVar(name string, val values.Value) {
itrp.globals.Set(name, val)
}
// SideEffects returns the evaluated expressions of a Flux program
func (itrp *Interpreter) SideEffects() []values.Value {
return itrp.values
}
// Option returns a Flux option by name
func (itrp *Interpreter) Option(name string) values.Value {
return itrp.options.Get(name)
}
// SetOption sets a new option binding
func (itrp *Interpreter) SetOption(name string, val values.Value) {
itrp.options.Set(name, val)
}
// Eval evaluates the expressions composing a Flux program.
func (itrp *Interpreter) Eval(program *semantic.Program) error {
return itrp.eval(program)
}
func (itrp *Interpreter) eval(program *semantic.Program) error {
topLevelScope := itrp.globals
for _, stmt := range program.Body {
val, err := itrp.doStatement(stmt, scope)
val, err := itrp.doStatement(stmt, topLevelScope)
if err != nil {
return err
}
@ -34,11 +78,11 @@ func (itrp *interpreter) eval(program *semantic.Program, scope *Scope) error {
}
// doStatement returns the resolved value of a top-level statement
func (itrp *interpreter) doStatement(stmt semantic.Statement, scope *Scope) (values.Value, error) {
func (itrp *Interpreter) doStatement(stmt semantic.Statement, scope *Scope) (values.Value, error) {
scope.SetReturn(values.InvalidValue)
switch s := stmt.(type) {
case *semantic.OptionStatement:
return itrp.doStatement(s.Declaration, scope)
return itrp.doOptionStatement(s.Declaration.(*semantic.NativeVariableDeclaration), scope)
case *semantic.NativeVariableDeclaration:
return itrp.doVariableDeclaration(s, scope)
case *semantic.ExpressionStatement:
@ -62,8 +106,8 @@ func (itrp *interpreter) doStatement(stmt semantic.Statement, scope *Scope) (val
}
}
}
// Propgate any return value from the nested scope out.
// Since a return statement is always last we do not have to worry about overriding an existing return value.
// Propgate any return value from the nested scope out. Since a return statement is
// always last we do not have to worry about overriding an existing return value.
scope.SetReturn(nested.Return())
case *semantic.ReturnStatement:
v, err := itrp.doExpression(s.Argument, scope)
@ -77,7 +121,16 @@ func (itrp *interpreter) doStatement(stmt semantic.Statement, scope *Scope) (val
return nil, nil
}
func (itrp *interpreter) doVariableDeclaration(declaration *semantic.NativeVariableDeclaration, scope *Scope) (values.Value, error) {
func (itrp *Interpreter) doOptionStatement(declaration *semantic.NativeVariableDeclaration, scope *Scope) (values.Value, error) {
value, err := itrp.doExpression(declaration.Init, scope)
if err != nil {
return nil, err
}
itrp.options.Set(declaration.Identifier.Name, value)
return value, nil
}
func (itrp *Interpreter) doVariableDeclaration(declaration *semantic.NativeVariableDeclaration, scope *Scope) (values.Value, error) {
value, err := itrp.doExpression(declaration.Init, scope)
if err != nil {
return nil, err
@ -86,7 +139,7 @@ func (itrp *interpreter) doVariableDeclaration(declaration *semantic.NativeVaria
return value, nil
}
func (itrp *interpreter) doExpression(expr semantic.Expression, scope *Scope) (values.Value, error) {
func (itrp *Interpreter) doExpression(expr semantic.Expression, scope *Scope) (values.Value, error) {
switch e := expr.(type) {
case semantic.Literal:
return itrp.doLiteral(e)
@ -208,7 +261,7 @@ func (itrp *interpreter) doExpression(expr semantic.Expression, scope *Scope) (v
}
}
func (itrp *interpreter) doArray(a *semantic.ArrayExpression, scope *Scope) (values.Value, error) {
func (itrp *Interpreter) doArray(a *semantic.ArrayExpression, scope *Scope) (values.Value, error) {
elements := make([]values.Value, len(a.Elements))
elementType := semantic.EmptyArrayType.ElementType()
for i, el := range a.Elements {
@ -227,7 +280,7 @@ func (itrp *interpreter) doArray(a *semantic.ArrayExpression, scope *Scope) (val
return values.NewArrayWithBacking(elementType, elements), nil
}
func (itrp *interpreter) doObject(m *semantic.ObjectExpression, scope *Scope) (values.Value, error) {
func (itrp *Interpreter) doObject(m *semantic.ObjectExpression, scope *Scope) (values.Value, error) {
obj := values.NewObject()
for _, p := range m.Properties {
v, err := itrp.doExpression(p.Value, scope)
@ -242,7 +295,7 @@ func (itrp *interpreter) doObject(m *semantic.ObjectExpression, scope *Scope) (v
return obj, nil
}
func (itrp *interpreter) doLiteral(lit semantic.Literal) (values.Value, error) {
func (itrp *Interpreter) doLiteral(lit semantic.Literal) (values.Value, error) {
switch l := lit.(type) {
case *semantic.DateTimeLiteral:
return values.NewTimeValue(values.Time(l.Value.UnixNano())), nil
@ -288,7 +341,7 @@ func DoFunctionCall(f func(args Arguments) (values.Value, error), argsObj values
return v, nil
}
func (itrp *interpreter) doCall(call *semantic.CallExpression, scope *Scope) (values.Value, error) {
func (itrp *Interpreter) doCall(call *semantic.CallExpression, scope *Scope) (values.Value, error) {
callee, err := itrp.doExpression(call.Callee, scope)
if err != nil {
return nil, err
@ -319,7 +372,7 @@ func (itrp *interpreter) doCall(call *semantic.CallExpression, scope *Scope) (va
return value, nil
}
func (itrp *interpreter) doArguments(args *semantic.ObjectExpression, scope *Scope) (values.Object, error) {
func (itrp *Interpreter) doArguments(args *semantic.ObjectExpression, scope *Scope) (values.Object, error) {
obj := values.NewObject()
if args == nil || len(args.Properties) == 0 {
return obj, nil
@ -337,6 +390,7 @@ func (itrp *interpreter) doArguments(args *semantic.ObjectExpression, scope *Sco
return obj, nil
}
// TODO(Josh): Scope methods should be private
type Scope struct {
parent *Scope
values map[string]values.Value
@ -348,9 +402,35 @@ func NewScope() *Scope {
values: make(map[string]values.Value),
}
}
func NewScopeWithValues(values map[string]values.Value) *Scope {
func NewScopeWithValues(vals map[string]values.Value) *Scope {
cp := make(map[string]values.Value, len(vals))
for k, v := range vals {
cp[k] = v
}
return &Scope{
values: values,
values: cp,
}
}
func (s *Scope) Get(name string) values.Value {
return s.values[name]
}
func (s *Scope) Set(name string, value values.Value) {
s.values[name] = value
}
func (s *Scope) Values() map[string]values.Value {
cp := make(map[string]values.Value, len(s.values))
for k, v := range s.values {
cp[k] = v
}
return cp
}
func (s *Scope) SetValues(vals map[string]values.Value) {
for k, v := range vals {
s.values[k] = v
}
}
@ -365,10 +445,6 @@ func (s *Scope) Lookup(name string) (values.Value, bool) {
return v, ok
}
func (s *Scope) Set(name string, value values.Value) {
s.values[name] = value
}
// SetReturn sets the return value of this scope.
func (s *Scope) SetReturn(value values.Value) {
s.returnValue = value
@ -397,6 +473,12 @@ func (s *Scope) Nest() *Scope {
return c
}
func (s *Scope) NestWithValues(values map[string]values.Value) *Scope {
c := NewScopeWithValues(values)
c.parent = s
return c
}
// Copy returns a copy of the scope and its parents.
func (s *Scope) Copy() *Scope {
c := NewScope()
@ -455,7 +537,7 @@ type function struct {
scope *Scope
call func(Arguments) (values.Value, error)
itrp *interpreter
itrp *Interpreter
}
func (f *function) Type() semantic.Type {

View File

@ -14,15 +14,20 @@ import (
"github.com/influxdata/platform/query/values"
)
var testScope = interpreter.NewScope()
var testScope = make(map[string]values.Value)
var optionScope = make(map[string]values.Value)
var testDeclarations = make(semantic.DeclarationScope)
var optionsObject = values.NewObject()
func addFunc(f *function) {
testScope.Set(f.name, f)
testScope[f.name] = f
testDeclarations[f.name] = semantic.NewExternalVariableDeclaration(f.name, f.t)
}
func addOption(name string, opt values.Value) {
optionScope[name] = opt
}
func init() {
addFunc(&function{
name: "fortyTwo",
@ -90,8 +95,11 @@ func init() {
},
hasSideEffect: true,
})
optionsObject.Set("name", values.NewStringValue("foo"))
optionsObject.Set("repeat", values.NewIntValue(100))
addOption("task", optionsObject)
}
// TestEval tests whether a program can run to completion or not
@ -337,14 +345,17 @@ func TestEval(t *testing.T) {
t.Fatal(err)
}
values, err := interpreter.Eval(graph, testScope.Nest())
// Create new interpreter scope for each test case
itrp := interpreter.NewInterpreter(optionScope, testScope)
err = itrp.Eval(graph)
if !tc.wantErr && err != nil {
t.Fatal(err)
} else if tc.wantErr && err == nil {
t.Fatal("expected error")
}
if tc.want != nil && !cmp.Equal(tc.want, values, semantictest.CmpOptions...) {
t.Fatalf("unexpected side effect values -want/+got: \n%s", cmp.Diff(tc.want, values, semantictest.CmpOptions...))
if tc.want != nil && !cmp.Equal(tc.want, itrp.SideEffects(), semantictest.CmpOptions...) {
t.Fatalf("unexpected side effect values -want/+got: \n%s", cmp.Diff(tc.want, itrp.SideEffects(), semantictest.CmpOptions...))
}
})
}
@ -352,7 +363,6 @@ func TestEval(t *testing.T) {
}
func TestResolver(t *testing.T) {
var got semantic.Expression
scope := interpreter.NewScope()
declarations := make(semantic.DeclarationScope)
f := &function{
name: "resolver",
@ -382,7 +392,7 @@ func TestResolver(t *testing.T) {
},
hasSideEffect: false,
}
scope.Set(f.name, f)
testScope[f.name] = f
declarations[f.name] = semantic.NewExternalVariableDeclaration(f.name, f.t)
program, err := parser.NewAST(`
@ -398,7 +408,9 @@ func TestResolver(t *testing.T) {
t.Fatal(err)
}
if _, err := interpreter.Eval(graph, scope); err != nil {
itrp := interpreter.NewInterpreter(optionScope, testScope)
if err := itrp.Eval(graph); err != nil {
t.Fatal(err)
}
@ -425,42 +437,42 @@ type function struct {
hasSideEffect bool
}
func (f function) Type() semantic.Type {
func (f *function) Type() semantic.Type {
return f.t
}
func (f function) Str() string {
func (f *function) Str() string {
panic(values.UnexpectedKind(semantic.Object, semantic.String))
}
func (f function) Int() int64 {
func (f *function) Int() int64 {
panic(values.UnexpectedKind(semantic.Object, semantic.Int))
}
func (f function) UInt() uint64 {
func (f *function) UInt() uint64 {
panic(values.UnexpectedKind(semantic.Object, semantic.UInt))
}
func (f function) Float() float64 {
func (f *function) Float() float64 {
panic(values.UnexpectedKind(semantic.Object, semantic.Float))
}
func (f function) Bool() bool {
func (f *function) Bool() bool {
panic(values.UnexpectedKind(semantic.Object, semantic.Bool))
}
func (f function) Time() values.Time {
func (f *function) Time() values.Time {
panic(values.UnexpectedKind(semantic.Object, semantic.Time))
}
func (f function) Duration() values.Duration {
func (f *function) Duration() values.Duration {
panic(values.UnexpectedKind(semantic.Object, semantic.Duration))
}
func (f function) Regexp() *regexp.Regexp {
func (f *function) Regexp() *regexp.Regexp {
panic(values.UnexpectedKind(semantic.Object, semantic.Regexp))
}
func (f function) Array() values.Array {
func (f *function) Array() values.Array {
panic(values.UnexpectedKind(semantic.Object, semantic.Function))
}
func (f function) Object() values.Object {
func (f *function) Object() values.Object {
panic(values.UnexpectedKind(semantic.Object, semantic.Object))
}
func (f function) Function() values.Function {
return &f
func (f *function) Function() values.Function {
return f
}
func (f *function) Equal(rhs values.Value) bool {
if f.Type() != rhs.Type() {
@ -469,10 +481,10 @@ func (f *function) Equal(rhs values.Value) bool {
v, ok := rhs.(*function)
return ok && (f == v)
}
func (f function) HasSideEffect() bool {
func (f *function) HasSideEffect() bool {
return f.hasSideEffect
}
func (f function) Call(args values.Object) (values.Value, error) {
func (f *function) Call(args values.Object) (values.Value, error) {
return f.call(args)
}

10
query/options/now.go Normal file
View File

@ -0,0 +1,10 @@
package options
import (
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
)
func init() {
query.RegisterBuiltInOption("now", functions.SystemTime())
}

View File

@ -2,6 +2,7 @@ package plan
import (
"fmt"
"time"
"github.com/influxdata/platform/query"
uuid "github.com/satori/go.uuid"
@ -14,6 +15,7 @@ type LogicalPlanSpec struct {
Procedures map[ProcedureID]*Procedure
Order []ProcedureID
Resources query.ResourceManagement
Now time.Time
}
func (lp *LogicalPlanSpec) Do(f func(pr *Procedure)) {
@ -49,6 +51,7 @@ func (p *logicalPlanner) Plan(q *query.Spec) (*LogicalPlanSpec, error) {
if err != nil {
return nil, err
}
p.plan.Now = q.Now
return p.plan, nil
}

View File

@ -6,8 +6,8 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
)
@ -171,6 +171,10 @@ func TestLogicalPlanner_Plan(t *testing.T) {
for i, tc := range testCases {
tc := tc
t.Run(strconv.Itoa(i), func(t *testing.T) {
// Set Now time on query spec
tc.q.Now = time.Now().UTC()
tc.ap.Now = tc.q.Now
planner := plan.NewLogicalPlanner()
got, err := planner.Plan(tc.q)
if err != nil {

View File

@ -14,15 +14,16 @@ import (
const DefaultYieldName = "_result"
type PlanSpec struct {
// Now represents the relative currentl time of the plan.
// Now represents the relative current time of the plan.
Now time.Time
Bounds BoundsSpec
// Procedures is a set of all operations
Procedures map[ProcedureID]*Procedure
Order []ProcedureID
// Results is a list of datasets that are the result of the plan
Results map[string]YieldSpec
// Results is a list of datasets that are the result of the plan
Results map[string]YieldSpec
Resources query.ResourceManagement
}
@ -42,7 +43,7 @@ func (p *PlanSpec) lookup(id ProcedureID) *Procedure {
type Planner interface {
// Plan create a plan from the logical plan and available storage.
Plan(p *LogicalPlanSpec, s Storage, now time.Time) (*PlanSpec, error)
Plan(p *LogicalPlanSpec, s Storage) (*PlanSpec, error)
}
type PlanRewriter interface {
@ -61,7 +62,9 @@ func NewPlanner() Planner {
return new(planner)
}
func (p *planner) Plan(lp *LogicalPlanSpec, s Storage, now time.Time) (*PlanSpec, error) {
func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) {
now := lp.Now
p.plan = &PlanSpec{
Now: now,
Procedures: make(map[ProcedureID]*Procedure, len(lp.Procedures)),

View File

@ -1050,12 +1050,14 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) {
func PhysicalPlanTestHelper(t *testing.T, lp *plan.LogicalPlanSpec, want *plan.PlanSpec) {
t.Helper()
// Setup expected now time
now := time.Now()
lp.Now = now
want.Now = now
planner := plan.NewPlanner()
got, err := planner.Plan(lp, nil, now)
got, err := planner.Plan(lp, nil)
if err != nil {
t.Fatal(err)
}
@ -1076,10 +1078,11 @@ func BenchmarkPhysicalPlan(b *testing.B) {
if err != nil {
b.Fatal(err)
}
planner := plan.NewPlanner()
now := time.Date(2017, 8, 8, 0, 0, 0, 0, time.UTC)
lp.Now = now
planner := plan.NewPlanner()
for n := 0; n < b.N; n++ {
benchmarkPhysicalPlan, err = planner.Plan(lp, nil, now)
benchmarkPhysicalPlan, err = planner.Plan(lp, nil)
if err != nil {
b.Fatal(err)
}
@ -1097,7 +1100,8 @@ func BenchmarkQueryToPhysicalPlan(b *testing.B) {
if err != nil {
b.Fatal(err)
}
benchmarkQueryToPhysicalPlan, err = pp.Plan(lp, nil, now)
lp.Now = now
benchmarkQueryToPhysicalPlan, err = pp.Plan(lp, nil)
if err != nil {
b.Fatal(err)
}

View File

@ -3,16 +3,26 @@ package query_test
import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
_ "github.com/influxdata/platform/query/options"
"github.com/influxdata/platform/query/parser"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
func init() {
query.FinalizeBuiltIns()
}
var ignoreUnexportedQuerySpec = cmpopts.IgnoreUnexported(query.Spec{})
func TestQuery_JSON(t *testing.T) {
@ -266,3 +276,93 @@ func TestQuery_Walk(t *testing.T) {
})
}
}
// Example_option demonstrates retrieving an option from the Flux interpreter
func Example_option() {
// Instantiate a new Flux interpreter with pre-populated option and global scopes
itrp := query.NewInterpreter()
// Retrieve the default value for an option
nowFunc := itrp.Option("now")
// The now option is a function value whose default behavior is to return
// the current system time when called. The function now() doesn't take
// any arguments so can be called with nil.
nowTime, _ := nowFunc.Function().Call(nil)
fmt.Fprintf(os.Stderr, "The current system time (UTC) is: %v\n", nowTime)
// Output:
}
// Example_setOption demonstrates setting an option from the Flux interpreter
func Example_setOption() {
// Instantiate a new Flux interpreter with pre-populated option and global scopes
itrp := query.NewInterpreter()
// Set a new option from the interpreter
itrp.SetOption("dummy_option", values.NewIntValue(3))
fmt.Printf("dummy_option = %d", itrp.Option("dummy_option").Int())
// Output: dummy_option = 3
}
// Example_overrideDefaultOptionExternally demonstrates how declaring an option
// in a Flux script will change that option's binding in the options scope of the interpreter.
func Example_overrideDefaultOptionExternally() {
queryString := `
now = () => 2018-07-13T00:00:00Z
what_time_is_it = now()`
itrp := query.NewInterpreter()
_, declarations := query.BuiltIns()
ast, _ := parser.NewAST(queryString)
semanticProgram, _ := semantic.New(ast, declarations)
// Evaluate program
itrp.Eval(semanticProgram)
// After evaluating the program, lookup the value of what_time_is_it
now, _ := itrp.GlobalScope().Lookup("what_time_is_it")
// what_time_is_it? Why it's ....
fmt.Printf("The new current time (UTC) is: %v", now)
// Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
}
// Example_overrideDefaultOptionInternally demonstrates how one can override a default
// option that is used in a query before that query is evaluated by the interpreter.
func Example_overrideDefaultOptionInternally() {
queryString := `what_time_is_it = now()`
itrp := query.NewInterpreter()
_, declarations := query.BuiltIns()
ast, _ := parser.NewAST(queryString)
semanticProgram, _ := semantic.New(ast, declarations)
// Define a new now function which returns a static time value of 2018-07-13T00:00:00.000000000Z
timeValue := time.Date(2018, 7, 13, 0, 0, 0, 0, time.UTC)
functionName := "newTime"
functionType := semantic.NewFunctionType(semantic.FunctionSignature{
ReturnType: semantic.Time,
})
functionCall := func(args values.Object) (values.Value, error) {
return values.NewTimeValue(values.ConvertTime(timeValue)), nil
}
sideEffect := false
newNowFunc := values.NewFunction(functionName, functionType, functionCall, sideEffect)
// Override the default now function with the new one
itrp.SetOption("now", newNowFunc)
// Evaluate program
itrp.Eval(semanticProgram)
// After evaluating the program, lookup the value of what_time_is_it
now, _ := itrp.GlobalScope().Lookup("what_time_is_it")
// what_time_is_it? Why it's ....
fmt.Printf("The new current time (UTC) is: %v", now)
// Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
}

View File

@ -1,13 +1,14 @@
package querytest
import (
"github.com/influxdata/platform/query/functions"
"context"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/semantic/semantictest"
)
@ -29,7 +30,8 @@ var opts = append(
func NewQueryTestHelper(t *testing.T, tc NewQueryTestCase) {
t.Helper()
got, err := query.Compile(context.Background(), tc.Raw)
now := time.Now().UTC()
got, err := query.Compile(context.Background(), tc.Raw, now)
if (err != nil) != tc.WantErr {
t.Errorf("query.NewQuery() error = %v, wantErr %v", err, tc.WantErr)
return
@ -37,6 +39,9 @@ func NewQueryTestHelper(t *testing.T, tc NewQueryTestCase) {
if tc.WantErr {
return
}
if tc.Want != nil {
tc.Want.Now = now
}
if !cmp.Equal(tc.Want, got, opts...) {
t.Errorf("query.NewQuery() = -want/+got %s", cmp.Diff(tc.Want, got, opts...))
}

View File

@ -8,6 +8,7 @@ import (
"path/filepath"
"regexp"
"strings"
"time"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform/query"
@ -62,7 +63,7 @@ func main() {
return
}
fluxSpec, err := query.Compile(context.Background(), string(fluxText))
fluxSpec, err := query.Compile(context.Background(), string(fluxText), time.Now().UTC())
if err != nil {
fmt.Printf("error compiling. \n query: \n %s \n err: %s", string(fluxText), err)
return

View File

@ -30,7 +30,7 @@ import (
type REPL struct {
orgID platform.ID
scope *interpreter.Scope
interpreter *interpreter.Interpreter
declarations semantic.DeclarationScope
c *control.Controller
@ -38,7 +38,7 @@ type REPL struct {
cancelFunc context.CancelFunc
}
func addBuiltIn(script string, scope *interpreter.Scope, declarations semantic.DeclarationScope) error {
func addBuiltIn(script string, itrp *interpreter.Interpreter, declarations semantic.DeclarationScope) error {
astProg, err := parser.NewAST(script)
if err != nil {
return errors.Wrap(err, "failed to parse builtin")
@ -48,20 +48,21 @@ func addBuiltIn(script string, scope *interpreter.Scope, declarations semantic.D
return errors.Wrap(err, "failed to create semantic graph for builtin")
}
if _, err := interpreter.Eval(semProg, scope); err != nil {
if err := itrp.Eval(semProg); err != nil {
return errors.Wrap(err, "failed to evaluate builtin")
}
return nil
}
func New(c *control.Controller, orgID platform.ID) *REPL {
scope, declarations := query.BuiltIns()
interpScope := interpreter.NewScopeWithValues(scope)
addBuiltIn("run = () => yield(table:_)", interpScope, declarations)
itrp := query.NewInterpreter()
_, decls := query.BuiltIns()
addBuiltIn("run = () => yield(table:_)", itrp, decls)
return &REPL{
orgID: orgID,
scope: interpScope,
declarations: declarations,
interpreter: itrp,
declarations: decls,
c: c,
}
}
@ -102,7 +103,7 @@ func (r *REPL) clearCancel() {
}
func (r *REPL) completer(d prompt.Document) []prompt.Suggest {
names := r.scope.Names()
names := r.interpreter.GlobalScope().Names()
sort.Strings(names)
s := make([]prompt.Suggest, 0, len(names))
@ -170,11 +171,11 @@ func (r *REPL) executeLine(t string, expectYield bool) (values.Value, error) {
return nil, err
}
if _, err := interpreter.Eval(semProg, r.scope); err != nil {
if err := r.interpreter.Eval(semProg); err != nil {
return nil, err
}
v := r.scope.Return()
v := r.interpreter.Return()
// Check for yield and execute query
if v.Type() == query.TableObjectType {
@ -186,7 +187,7 @@ func (r *REPL) executeLine(t string, expectYield bool) (values.Value, error) {
}
}
r.scope.Set("_", v)
r.interpreter.SetVar("_", v)
// Print value
if v.Type() != semantic.Invalid {

View File

@ -2,6 +2,7 @@ package query
import (
"fmt"
"time"
"github.com/pkg/errors"
)
@ -11,6 +12,7 @@ type Spec struct {
Operations []*Operation `json:"operations"`
Edges []Edge `json:"edges"`
Resources ResourceManagement `json:"resources"`
Now time.Time `json:"now"`
sorted []*Operation
children map[OperationID][]*Operation

96
query/values/function.go Normal file
View File

@ -0,0 +1,96 @@
package values
import (
"regexp"
"github.com/influxdata/platform/query/semantic"
)
// Function represents a callable type
type Function interface {
Value
HasSideEffect() bool
Call(args Object) (Value, error)
}
// NewFunction returns a new function value
func NewFunction(name string, typ semantic.Type, call func(args Object) (Value, error), sideEffect bool) Function {
return &function{
name: name,
t: typ,
call: call,
hasSideEffect: sideEffect,
}
}
// function implements Value interface and more specifically the Function interface
type function struct {
name string
t semantic.Type
call func(args Object) (Value, error)
hasSideEffect bool
}
func (f *function) Type() semantic.Type {
return f.t
}
func (f *function) Str() string {
panic(UnexpectedKind(semantic.Object, semantic.String))
}
func (f *function) Int() int64 {
panic(UnexpectedKind(semantic.Object, semantic.Int))
}
func (f *function) UInt() uint64 {
panic(UnexpectedKind(semantic.Object, semantic.UInt))
}
func (f *function) Float() float64 {
panic(UnexpectedKind(semantic.Object, semantic.Float))
}
func (f *function) Bool() bool {
panic(UnexpectedKind(semantic.Object, semantic.Bool))
}
func (f *function) Time() Time {
panic(UnexpectedKind(semantic.Object, semantic.Time))
}
func (f *function) Duration() Duration {
panic(UnexpectedKind(semantic.Object, semantic.Duration))
}
func (f *function) Regexp() *regexp.Regexp {
panic(UnexpectedKind(semantic.Object, semantic.Regexp))
}
func (f *function) Array() Array {
panic(UnexpectedKind(semantic.Object, semantic.Function))
}
func (f *function) Object() Object {
panic(UnexpectedKind(semantic.Object, semantic.Object))
}
func (f *function) Function() Function {
return f
}
func (f *function) Equal(rhs Value) bool {
if f.Type() != rhs.Type() {
return false
}
v, ok := rhs.(*function)
return ok && (f == v)
}
func (f *function) HasSideEffect() bool {
return f.hasSideEffect
}
func (f *function) Call(args Object) (Value, error) {
return f.call(args)
}

View File

@ -27,13 +27,6 @@ type Value interface {
Equal(Value) bool
}
// Function represents a callable type
type Function interface {
Value
HasSideEffect() bool
Call(args Object) (Value, error)
}
type value struct {
t semantic.Type
v interface{}