From a907e0542619bc0cda535a42b1c4ba04ca3a0f76 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 5 Mar 2020 10:32:17 -0600 Subject: [PATCH] refactor(http): modify query handler to use a language service (#17074) The language service abstracts away the parse source which breaks the dependency without moving any of the code. --- cmd/influxd/launcher/launcher.go | 2 ++ http/api_handler.go | 1 + http/query.go | 12 +++++++----- http/query_handler.go | 12 +++++++----- http/query_handler_test.go | 5 +++++ query.go | 11 +++++++++++ query/fluxlang/service.go | 21 +++++++++++++++++++++ 7 files changed, 54 insertions(+), 10 deletions(-) create mode 100644 query/fluxlang/service.go diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 841a25d1f1..9d00cb8761 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -38,6 +38,7 @@ import ( infprom "github.com/influxdata/influxdb/prometheus" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/query/control" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb" "github.com/influxdata/influxdb/snowflake" "github.com/influxdata/influxdb/source" @@ -795,6 +796,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { OnboardingService: onboardingSvc, InfluxQLService: nil, // No InfluxQL support FluxService: storageQueryService, + FluxLanguageService: fluxlang.DefaultService, TaskService: taskSvc, TelegrafService: telegrafSvc, NotificationRuleStore: notificationRuleSvc, diff --git a/http/api_handler.go b/http/api_handler.go index 1bbdbe236c..8ed1626cde 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -72,6 +72,7 @@ type APIBackend struct { OnboardingService influxdb.OnboardingService InfluxQLService query.ProxyQueryService FluxService query.ProxyQueryService + FluxLanguageService influxdb.FluxLanguageService TaskService influxdb.TaskService CheckService influxdb.CheckService TelegrafService influxdb.TelegrafConfigStore diff --git a/http/query.go b/http/query.go index 425f56353d..de77e424b0 100644 --- a/http/query.go +++ b/http/query.go @@ -18,7 +18,6 @@ import ( "github.com/influxdata/flux/ast" "github.com/influxdata/flux/csv" "github.com/influxdata/flux/lang" - "github.com/influxdata/flux/parser" "github.com/influxdata/flux/repl" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/jsonweb" @@ -145,10 +144,10 @@ type queryParseError struct { // Analyze attempts to parse the query request and returns any errors // encountered in a structured way. -func (r QueryRequest) Analyze() (*QueryAnalysis, error) { +func (r QueryRequest) Analyze(l influxdb.FluxLanguageService) (*QueryAnalysis, error) { switch r.Type { case "flux": - return r.analyzeFluxQuery() + return r.analyzeFluxQuery(l) case "influxql": return r.analyzeInfluxQLQuery() } @@ -156,9 +155,12 @@ func (r QueryRequest) Analyze() (*QueryAnalysis, error) { return nil, fmt.Errorf("unknown query request type %s", r.Type) } -func (r QueryRequest) analyzeFluxQuery() (*QueryAnalysis, error) { +func (r QueryRequest) analyzeFluxQuery(l influxdb.FluxLanguageService) (*QueryAnalysis, error) { a := &QueryAnalysis{} - pkg := parser.ParseSource(r.Query) + pkg, err := l.Parse(r.Query) + if pkg == nil { + return nil, err + } errCount := ast.Check(pkg) if errCount == 0 { a.Errors = []queryParseError{} diff --git a/http/query_handler.go b/http/query_handler.go index 5f6a57a77a..2586786fb5 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -17,7 +17,6 @@ import ( "github.com/influxdata/flux/complete" "github.com/influxdata/flux/csv" "github.com/influxdata/flux/iocounter" - "github.com/influxdata/flux/parser" "github.com/influxdata/httprouter" "github.com/influxdata/influxdb" pcontext "github.com/influxdata/influxdb/context" @@ -46,6 +45,7 @@ type FluxBackend struct { OrganizationService influxdb.OrganizationService ProxyQueryService query.ProxyQueryService + FluxLanguageService influxdb.FluxLanguageService } // NewFluxBackend returns a new instance of FluxBackend. @@ -57,6 +57,7 @@ func NewFluxBackend(log *zap.Logger, b *APIBackend) *FluxBackend { ProxyQueryService: b.FluxService, OrganizationService: b.OrganizationService, + FluxLanguageService: b.FluxLanguageService, } } @@ -74,6 +75,7 @@ type FluxHandler struct { Now func() time.Time OrganizationService influxdb.OrganizationService ProxyQueryService query.ProxyQueryService + LanguageService influxdb.FluxLanguageService EventRecorder metric.EventRecorder } @@ -94,6 +96,7 @@ func NewFluxHandler(log *zap.Logger, b *FluxBackend) *FluxHandler { ProxyQueryService: b.ProxyQueryService, OrganizationService: b.OrganizationService, EventRecorder: b.QueryEventRecorder, + LanguageService: b.FluxLanguageService, } // query reponses can optionally be gzip encoded @@ -216,9 +219,8 @@ func (h *FluxHandler) postFluxAST(w http.ResponseWriter, r *http.Request) { return } - pkg := parser.ParseSource(request.Query) - if ast.Check(pkg) > 0 { - err := ast.GetError(pkg) + pkg, err := h.LanguageService.Parse(request.Query) + if err != nil { h.HandleHTTPError(ctx, &influxdb.Error{ Code: influxdb.EInvalid, Msg: "invalid AST", @@ -254,7 +256,7 @@ func (h *FluxHandler) postQueryAnalyze(w http.ResponseWriter, r *http.Request) { return } - a, err := req.Analyze() + a, err := req.Analyze(h.LanguageService) if err != nil { h.HandleHTTPError(ctx, err, w) return diff --git a/http/query_handler_test.go b/http/query_handler_test.go index f838f3e326..400fa0b312 100644 --- a/http/query_handler_test.go +++ b/http/query_handler_test.go @@ -28,6 +28,7 @@ import ( kithttp "github.com/influxdata/influxdb/kit/transport/http" influxmock "github.com/influxdata/influxdb/mock" "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/query/mock" "go.uber.org/zap/zaptest" ) @@ -260,6 +261,7 @@ func TestFluxHandler_postFluxAST(t *testing.T) { t.Run(tt.name, func(t *testing.T) { h := &FluxHandler{ HTTPErrorHandler: kithttp.ErrorHandler(0), + LanguageService: fluxlang.DefaultService, } h.postFluxAST(tt.w, tt.r) if got := tt.w.Body.String(); got != tt.want { @@ -338,6 +340,7 @@ func TestFluxHandler_PostQuery_Errors(t *testing.T) { } }, }, + FluxLanguageService: fluxlang.DefaultService, } h := NewFluxHandler(zaptest.NewLogger(t), b) @@ -497,6 +500,7 @@ func TestFluxService_Query_gzip(t *testing.T) { QueryEventRecorder: noopEventRecorder{}, OrganizationService: orgService, ProxyQueryService: queryService, + FluxLanguageService: fluxlang.DefaultService, } fluxHandler := NewFluxHandler(zaptest.NewLogger(t), fluxBackend) @@ -633,6 +637,7 @@ func benchmarkQuery(b *testing.B, disableCompression bool) { QueryEventRecorder: noopEventRecorder{}, OrganizationService: orgService, ProxyQueryService: queryService, + FluxLanguageService: fluxlang.DefaultService, } fluxHandler := NewFluxHandler(zaptest.NewLogger(b), fluxBackend) diff --git a/query.go b/query.go index 476b54364b..9fc94dfac8 100644 --- a/query.go +++ b/query.go @@ -1,5 +1,7 @@ package influxdb +import "github.com/influxdata/flux/ast" + // TODO(desa): These files are possibly a temporary. This is needed // as a part of the source work that is being done. // See https://github.com/influxdata/platform/issues/594 for more info. @@ -9,3 +11,12 @@ type SourceQuery struct { Query string `json:"query"` Type string `json:"type"` } + +// FluxLanguageService is a service for interacting with flux code. +type FluxLanguageService interface { + // Parse will take flux source code and produce a package. + // If there are errors when parsing, the first error is returned. + // An ast.Package may be returned when a parsing error occurs, + // but it may be null if parsing didn't even occur. + Parse(source string) (*ast.Package, error) +} diff --git a/query/fluxlang/service.go b/query/fluxlang/service.go new file mode 100644 index 0000000000..2e6747d4e8 --- /dev/null +++ b/query/fluxlang/service.go @@ -0,0 +1,21 @@ +// Package language exposes the flux parser as an interface. +package fluxlang + +import ( + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/parser" + "github.com/influxdata/influxdb" +) + +// DefaultService is the default language service. +var DefaultService influxdb.FluxLanguageService = defaultService{} + +type defaultService struct{} + +func (d defaultService) Parse(source string) (pkg *ast.Package, err error) { + pkg = parser.ParseSource(source) + if ast.Check(pkg) > 0 { + err = ast.GetError(pkg) + } + return pkg, err +}