feat(cmd/influx): add command to manually transpile InfluxQL to Flux (#16119)
parent
ad961669ae
commit
fd63ff17f3
|
@ -76,6 +76,7 @@ func influxCmd() *cobra.Command {
|
|||
pingCmd,
|
||||
cmdPkg(newPkgerSVC),
|
||||
queryCmd,
|
||||
transpileCmd,
|
||||
replCmd,
|
||||
setupCmd,
|
||||
taskCmd,
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/errors"
|
||||
"github.com/influxdata/influxdb/query/influxql"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var transpileCmd = &cobra.Command{
|
||||
Use: "transpile [InfluxQL query]",
|
||||
Short: "Transpile an InfluxQL query to Flux source code",
|
||||
Long: `Transpile an InfluxQL query to Flux source code.
|
||||
|
||||
The transpiled query assumes that the bucket name is the of the form '<database>/<retention policy>'.
|
||||
|
||||
The transpiled query will be written for absolute time ranges using the provided now() time.`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: transpileF,
|
||||
}
|
||||
|
||||
var transpileFlags struct {
|
||||
Now string
|
||||
}
|
||||
|
||||
func init() {
|
||||
transpileCmd.PersistentFlags().StringVar(&transpileFlags.Now, "now", "", "An RFC3339Nano formatted time to use as the now() time. Defaults to the current time.")
|
||||
viper.BindEnv("NOW")
|
||||
if h := viper.GetString("NOW"); h != "" {
|
||||
transpileFlags.Now = h
|
||||
}
|
||||
}
|
||||
|
||||
func transpileF(cmd *cobra.Command, args []string) error {
|
||||
now := time.Now()
|
||||
if transpileFlags.Now != "" {
|
||||
var err error
|
||||
now, err = time.Parse(time.RFC3339Nano, transpileFlags.Now)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "invalid now time")
|
||||
}
|
||||
}
|
||||
t := influxql.NewTranspilerWithConfig(dbrpMapper{}, influxql.Config{
|
||||
Now: now,
|
||||
FallbackToDBRP: true,
|
||||
})
|
||||
pkg, err := t.Transpile(context.Background(), args[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println(ast.Format(pkg))
|
||||
return nil
|
||||
}
|
||||
|
||||
type dbrpMapper struct {
|
||||
}
|
||||
|
||||
func (m dbrpMapper) FindBy(ctx context.Context, cluster string, db string, rp string) (*influxdb.DBRPMapping, error) {
|
||||
return nil, errors.New("mapping not found")
|
||||
}
|
||||
func (m dbrpMapper) Find(ctx context.Context, filter influxdb.DBRPMappingFilter) (*influxdb.DBRPMapping, error) {
|
||||
return nil, errors.New("mapping not found")
|
||||
}
|
||||
func (m dbrpMapper) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.DBRPMapping, int, error) {
|
||||
return nil, 0, errors.New("mapping not found")
|
||||
|
||||
}
|
||||
func (m dbrpMapper) Create(ctx context.Context, dbrpMap *influxdb.DBRPMapping) error {
|
||||
return errors.New("dbrpMapper does not support creating new mappings")
|
||||
}
|
||||
func (m dbrpMapper) Delete(ctx context.Context, cluster string, db string, rp string) error {
|
||||
return errors.New("dbrpMapper does not support deleteing mappings")
|
||||
}
|
|
@ -10,4 +10,7 @@ type Config struct {
|
|||
DefaultRetentionPolicy string
|
||||
Now time.Time
|
||||
Cluster string
|
||||
// FallbackToDBRP if true will use the naming convention of `db/rp`
|
||||
// for a bucket name when an mapping is not found
|
||||
FallbackToDBRP bool
|
||||
}
|
||||
|
|
|
@ -677,15 +677,29 @@ func (t *transpilerState) from(m *influxql.Measurement) (ast.Expression, error)
|
|||
defaultRP := rp == ""
|
||||
filter.Default = &defaultRP
|
||||
mapping, err := t.dbrpMappingSvc.Find(context.TODO(), filter)
|
||||
var args []ast.Expression
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ast.CallExpression{
|
||||
Callee: &ast.Identifier{
|
||||
Name: "from",
|
||||
},
|
||||
Arguments: []ast.Expression{
|
||||
if !t.config.FallbackToDBRP {
|
||||
return nil, err
|
||||
}
|
||||
// use `db/rp` naming convention
|
||||
args = []ast.Expression{
|
||||
&ast.ObjectExpression{
|
||||
Properties: []*ast.Property{
|
||||
{
|
||||
Key: &ast.Identifier{
|
||||
Name: "bucket",
|
||||
},
|
||||
Value: &ast.StringLiteral{
|
||||
Value: fmt.Sprintf("%s/%s", db, rp),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// use mapping bucket id
|
||||
args = []ast.Expression{
|
||||
&ast.ObjectExpression{
|
||||
Properties: []*ast.Property{
|
||||
{
|
||||
|
@ -698,7 +712,14 @@ func (t *transpilerState) from(m *influxql.Measurement) (ast.Expression, error)
|
|||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return &ast.CallExpression{
|
||||
Callee: &ast.Identifier{
|
||||
Name: "from",
|
||||
},
|
||||
Arguments: args,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue