Merged pull request #401 from influxdata/nc-dbrp-read

feat: Use DBRPMappings in 1.x read path
pull/10616/head
Nathaniel Cook 2018-07-18 09:55:08 -06:00
commit e302d3a178
22 changed files with 252 additions and 51 deletions

View File

@ -82,6 +82,8 @@ func transpileF(cmd *cobra.Command, logger *zap.Logger, args []string) error {
transpileHandler.QueryService = &http.QueryService{
Addr: hosts[0],
}
//TODO(nathanielc): Add DBRPMappingService
transpileHandler.DBRPMappingService = nil
transpileHandler.Logger = logger
reg.MustRegister(transpileHandler.PrometheusCollectors()...)

View File

@ -15,9 +15,10 @@ import (
type TranspilerQueryHandler struct {
*httprouter.Router
Logger *zap.Logger
QueryService query.QueryService
OrgID platform.ID
Logger *zap.Logger
QueryService query.QueryService
DBRPMappingService platform.DBRPMappingService
OrgID platform.ID
}
// NewQueryHandler returns a new instance of QueryHandler.
@ -52,7 +53,7 @@ func (h *TranspilerQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.
// Create a new transpiler from the http request.
ce := influxqlCE
transpiler := ce.transpiler(r)
transpiler := ce.transpiler(r, h.DBRPMappingService)
// Run the transpiler against the query service.
results, err := query.QueryWithTranspile(ctx, h.OrgID, queryStr, h.QueryService, transpiler)
@ -77,18 +78,18 @@ func (h *TranspilerQueryHandler) PrometheusCollectors() []prometheus.Collector {
// crossExecute contains the components needed to execute a transpiled query and encode results.
type crossExecute struct {
transpiler func(req *http.Request) query.Transpiler
transpiler func(req *http.Request, dbrpMappingSvc platform.DBRPMappingService) query.Transpiler
encoder query.MultiResultEncoder
contentType string
}
var influxqlCE = crossExecute{
transpiler: func(req *http.Request) query.Transpiler {
transpiler: func(req *http.Request, dbrpMappingSvc platform.DBRPMappingService) query.Transpiler {
config := influxql.Config{
DefaultDatabase: req.FormValue("db"),
DefaultRetentionPolicy: req.FormValue("rp"),
}
return influxql.NewTranspilerWithConfig(config)
return influxql.NewTranspilerWithConfig(dbrpMappingSvc, config)
},
encoder: influxql.NewMultiResultEncoder(),
contentType: "application/json",

51
mock/dbrp_mapping.go Normal file
View File

@ -0,0 +1,51 @@
package mock
import (
"context"
"github.com/influxdata/platform"
)
type DBRPMappingService struct {
FindByFn func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error)
FindFn func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error)
FindManyFn func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error)
CreateFn func(ctx context.Context, dbrpMap *platform.DBRPMapping) error
DeleteFn func(ctx context.Context, cluster string, db string, rp string) error
}
func NewDBRPMappingService() *DBRPMappingService {
return &DBRPMappingService{
FindByFn: func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
return nil, nil
},
FindFn: func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
return nil, nil
},
FindManyFn: func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
return nil, 0, nil
},
CreateFn: func(ctx context.Context, dbrpMap *platform.DBRPMapping) error { return nil },
DeleteFn: func(ctx context.Context, cluster string, db string, rp string) error { return nil },
}
}
func (s *DBRPMappingService) FindBy(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
return s.FindByFn(ctx, cluster, db, rp)
}
func (s *DBRPMappingService) Find(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
return s.FindFn(ctx, filter)
}
func (s *DBRPMappingService) FindMany(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
return s.FindManyFn(ctx, filter, opt...)
}
func (s *DBRPMappingService) Create(ctx context.Context, dbrpMap *platform.DBRPMapping) error {
return s.CreateFn(ctx, dbrpMap)
}
func (s *DBRPMappingService) Delete(ctx context.Context, cluster string, db string, rp string) error {
return s.DeleteFn(ctx, cluster, db, rp)
}

View File

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/csv"
@ -18,6 +20,28 @@ import (
"github.com/andreyvit/diff"
)
var dbrpMappingSvc = mock.NewDBRPMappingService()
func init() {
mapping := platform.DBRPMapping{
Cluster: "cluster",
Database: "db0",
RetentionPolicy: "autogen",
Default: true,
OrganizationID: platform.ID("org"),
BucketID: platform.ID("bucket"),
}
dbrpMappingSvc.FindByFn = func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindFn = func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindManyFn = func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
return []*platform.DBRPMapping{&mapping}, 1, nil
}
}
var skipTests = map[string]string{
"derivative": "derivative not supported by influxql (https://github.com/influxdata/platform/issues/93)",
"filter_by_tags": "arbitrary filtering not supported by influxql (https://github.com/influxdata/platform/issues/94)",
@ -31,7 +55,7 @@ var skipTests = map[string]string{
func Test_QueryEndToEnd(t *testing.T) {
qs := querytest.GetQueryServiceBridge()
influxqlTranspiler := influxql.NewTranspiler()
influxqlTranspiler := influxql.NewTranspiler(dbrpMappingSvc)
dir, err := os.Getwd()
if err != nil {

View File

@ -1,10 +1,13 @@
package influxql
import "time"
import (
"time"
)
// Config modifies the behavior of the Transpiler.
type Config struct {
DefaultDatabase string
DefaultRetentionPolicy string
NowFn func() time.Time
Cluster string
}

View File

@ -58,7 +58,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -20,7 +20,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -20,7 +20,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -21,7 +21,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -21,7 +21,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -19,7 +19,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{
@ -69,7 +69,7 @@ func init() {
{
ID: "from1",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -20,7 +20,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{
@ -86,7 +86,7 @@ func init() {
{
ID: "from1",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -20,7 +20,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{
@ -125,7 +125,7 @@ func init() {
{
ID: "from1",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -19,7 +19,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -19,7 +19,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -20,7 +20,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{
@ -155,7 +155,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -19,7 +19,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/alternate",
BucketID: altBucketID,
},
},
{

View File

@ -59,7 +59,7 @@ func init() {
{
ID: "from0",
Spec: &functions.FromOpSpec{
Bucket: "db0/autogen",
BucketID: bucketID,
},
},
{

View File

@ -10,10 +10,55 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
)
var dbrpMappingSvc = mock.NewDBRPMappingService()
var organizationID = platform.ID("aaaa")
var bucketID = platform.ID("bbbb")
var altBucketID = platform.ID("cccc")
func init() {
mapping := platform.DBRPMapping{
Cluster: "cluster",
Database: "db0",
RetentionPolicy: "autogen",
Default: true,
OrganizationID: organizationID,
BucketID: bucketID,
}
altMapping := platform.DBRPMapping{
Cluster: "cluster",
Database: "db0",
RetentionPolicy: "autogen",
Default: true,
OrganizationID: organizationID,
BucketID: altBucketID,
}
dbrpMappingSvc.FindByFn = func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
if rp == "alternate" {
return &altMapping, nil
}
return &mapping, nil
}
dbrpMappingSvc.FindFn = func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
if filter.RetentionPolicy != nil && *filter.RetentionPolicy == "alternate" {
return &altMapping, nil
}
return &mapping, nil
}
dbrpMappingSvc.FindManyFn = func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
m := &mapping
if filter.RetentionPolicy != nil && *filter.RetentionPolicy == "alternate" {
m = &altMapping
}
return []*platform.DBRPMapping{m}, 1, nil
}
}
// Fixture is a structure that will run tests.
type Fixture interface {
Run(t *testing.T)
@ -43,9 +88,12 @@ func (f *fixture) Run(t *testing.T) {
t.Fatalf("%s:%d: expected spec is not valid: %s", f.file, f.line, err)
}
transpiler := influxql.NewTranspilerWithConfig(influxql.Config{
NowFn: Now,
})
transpiler := influxql.NewTranspilerWithConfig(
dbrpMappingSvc,
influxql.Config{
NowFn: Now,
},
)
spec, err := transpiler.Transpile(context.Background(), f.stmt)
if err != nil {
t.Fatalf("%s:%d: unexpected error: %s", f.file, f.line, err)

View File

@ -8,22 +8,25 @@ import (
"time"
"github.com/influxdata/influxql"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
)
// Transpiler converts InfluxQL queries into a query spec.
type Transpiler struct {
Config *Config
Config *Config
dbrpMappingSvc platform.DBRPMappingService
}
func NewTranspiler() *Transpiler {
return &Transpiler{}
func NewTranspiler(dbrpMappingSvc platform.DBRPMappingService) *Transpiler {
return NewTranspilerWithConfig(dbrpMappingSvc, Config{})
}
func NewTranspilerWithConfig(cfg Config) *Transpiler {
func NewTranspilerWithConfig(dbrpMappingSvc platform.DBRPMappingService, cfg Config) *Transpiler {
return &Transpiler{
Config: &cfg,
Config: &cfg,
dbrpMappingSvc: dbrpMappingSvc,
}
}
@ -34,7 +37,7 @@ func (t *Transpiler) Transpile(ctx context.Context, txt string) (*query.Spec, er
return nil, err
}
transpiler := newTranspilerState(t.Config)
transpiler := newTranspilerState(t.dbrpMappingSvc, t.Config)
for i, s := range q.Statements {
stmt, ok := s.(*influxql.SelectStatement)
if !ok {
@ -48,18 +51,20 @@ func (t *Transpiler) Transpile(ctx context.Context, txt string) (*query.Spec, er
}
type transpilerState struct {
id int
stmt *influxql.SelectStatement
config Config
spec *query.Spec
nextID map[string]int
now time.Time
id int
stmt *influxql.SelectStatement
config Config
spec *query.Spec
nextID map[string]int
now time.Time
dbrpMappingSvc platform.DBRPMappingService
}
func newTranspilerState(config *Config) *transpilerState {
func newTranspilerState(dbrpMappingSvc platform.DBRPMappingService, config *Config) *transpilerState {
state := &transpilerState{
spec: &query.Spec{},
nextID: make(map[string]int),
spec: &query.Spec{},
nextID: make(map[string]int),
dbrpMappingSvc: dbrpMappingSvc,
}
if config != nil {
state.config = *config
@ -128,13 +133,26 @@ func (t *transpilerState) from(m *influxql.Measurement) (query.OperationID, erro
if rp == "" {
if t.config.DefaultRetentionPolicy != "" {
rp = t.config.DefaultRetentionPolicy
} else {
rp = "autogen"
}
}
var filter platform.DBRPMappingFilter
filter.Cluster = &t.config.Cluster
if db != "" {
filter.Database = &db
}
if rp != "" {
filter.RetentionPolicy = &rp
}
defaultRP := rp == ""
filter.Default = &defaultRP
mapping, err := t.dbrpMappingSvc.Find(context.TODO(), filter)
if err != nil {
return "", err
}
spec := &functions.FromOpSpec{
Bucket: fmt.Sprintf("%s/%s", db, rp),
BucketID: mapping.BucketID,
}
return t.op("from", spec), nil
}

View File

@ -5,11 +5,35 @@ import (
"strings"
"testing"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query/influxql"
"github.com/influxdata/platform/query/influxql/spectests"
"github.com/pkg/errors"
)
var dbrpMappingSvc = mock.NewDBRPMappingService()
func init() {
mapping := platform.DBRPMapping{
Cluster: "cluster",
Database: "db0",
RetentionPolicy: "autogen",
Default: true,
OrganizationID: platform.ID("aaaa"),
BucketID: platform.ID("bbbb"),
}
dbrpMappingSvc.FindByFn = func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindFn = func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindManyFn = func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
return []*platform.DBRPMapping{&mapping}, 1, nil
}
}
func TestTranspiler(t *testing.T) {
for _, fixture := range spectests.All() {
fixture.Run(t)
@ -361,9 +385,12 @@ func TestTranspiler_Compile(t *testing.T) {
}
}()
transpiler := influxql.NewTranspilerWithConfig(influxql.Config{
DefaultDatabase: "db0",
})
transpiler := influxql.NewTranspilerWithConfig(
dbrpMappingSvc,
influxql.Config{
DefaultDatabase: "db0",
},
)
if _, err := transpiler.Transpile(context.Background(), tt.s); err != nil {
if got, want := err.Error(), tt.err; got != want {
if cause := errors.Cause(err); strings.HasPrefix(cause.Error(), "unimplemented") {

View File

@ -11,6 +11,8 @@ import (
"time"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
"github.com/influxdata/platform/query/semantic/semantictest"
@ -69,7 +71,7 @@ func main() {
return
}
transpiler := influxql.NewTranspiler()
transpiler := influxql.NewTranspiler(dbrpMappingSvc)
influxqlSpec, err := transpiler.Transpile(context.Background(), string(influxqlText))
if err != nil {
fmt.Printf("error transpiling. \n query: \n %s \n err: %s", string(influxqlText), err)
@ -85,3 +87,28 @@ func main() {
fmt.Printf("compiled vs transpiled diff: \n%s", difference)
}
}
// Setup mock DBRPMappingService to always return `db.rp`.
var dbrpMappingSvc = mock.NewDBRPMappingService()
func init() {
organizationID := platform.ID("aaaa")
bucketID := platform.ID("bbbb")
mapping := platform.DBRPMapping{
Cluster: "cluster",
Database: "db",
RetentionPolicy: "rp",
Default: true,
OrganizationID: organizationID,
BucketID: bucketID,
}
dbrpMappingSvc.FindByFn = func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindFn = func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindManyFn = func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
return []*platform.DBRPMapping{&mapping}, 1, nil
}
}