refactor(http): modify query handler to use a language service (#17074)
The language service abstracts away the parse source which breaks the dependency without moving any of the code.pull/17104/head
parent
3250fb1453
commit
a907e05426
|
|
@ -38,6 +38,7 @@ import (
|
|||
infprom "github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/query/control"
|
||||
"github.com/influxdata/influxdb/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/snowflake"
|
||||
"github.com/influxdata/influxdb/source"
|
||||
|
|
@ -795,6 +796,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
OnboardingService: onboardingSvc,
|
||||
InfluxQLService: nil, // No InfluxQL support
|
||||
FluxService: storageQueryService,
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
TaskService: taskSvc,
|
||||
TelegrafService: telegrafSvc,
|
||||
NotificationRuleStore: notificationRuleSvc,
|
||||
|
|
|
|||
|
|
@ -72,6 +72,7 @@ type APIBackend struct {
|
|||
OnboardingService influxdb.OnboardingService
|
||||
InfluxQLService query.ProxyQueryService
|
||||
FluxService query.ProxyQueryService
|
||||
FluxLanguageService influxdb.FluxLanguageService
|
||||
TaskService influxdb.TaskService
|
||||
CheckService influxdb.CheckService
|
||||
TelegrafService influxdb.TelegrafConfigStore
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/parser"
|
||||
"github.com/influxdata/flux/repl"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/jsonweb"
|
||||
|
|
@ -145,10 +144,10 @@ type queryParseError struct {
|
|||
|
||||
// Analyze attempts to parse the query request and returns any errors
|
||||
// encountered in a structured way.
|
||||
func (r QueryRequest) Analyze() (*QueryAnalysis, error) {
|
||||
func (r QueryRequest) Analyze(l influxdb.FluxLanguageService) (*QueryAnalysis, error) {
|
||||
switch r.Type {
|
||||
case "flux":
|
||||
return r.analyzeFluxQuery()
|
||||
return r.analyzeFluxQuery(l)
|
||||
case "influxql":
|
||||
return r.analyzeInfluxQLQuery()
|
||||
}
|
||||
|
|
@ -156,9 +155,12 @@ func (r QueryRequest) Analyze() (*QueryAnalysis, error) {
|
|||
return nil, fmt.Errorf("unknown query request type %s", r.Type)
|
||||
}
|
||||
|
||||
func (r QueryRequest) analyzeFluxQuery() (*QueryAnalysis, error) {
|
||||
func (r QueryRequest) analyzeFluxQuery(l influxdb.FluxLanguageService) (*QueryAnalysis, error) {
|
||||
a := &QueryAnalysis{}
|
||||
pkg := parser.ParseSource(r.Query)
|
||||
pkg, err := l.Parse(r.Query)
|
||||
if pkg == nil {
|
||||
return nil, err
|
||||
}
|
||||
errCount := ast.Check(pkg)
|
||||
if errCount == 0 {
|
||||
a.Errors = []queryParseError{}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ import (
|
|||
"github.com/influxdata/flux/complete"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/iocounter"
|
||||
"github.com/influxdata/flux/parser"
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb"
|
||||
pcontext "github.com/influxdata/influxdb/context"
|
||||
|
|
@ -46,6 +45,7 @@ type FluxBackend struct {
|
|||
|
||||
OrganizationService influxdb.OrganizationService
|
||||
ProxyQueryService query.ProxyQueryService
|
||||
FluxLanguageService influxdb.FluxLanguageService
|
||||
}
|
||||
|
||||
// NewFluxBackend returns a new instance of FluxBackend.
|
||||
|
|
@ -57,6 +57,7 @@ func NewFluxBackend(log *zap.Logger, b *APIBackend) *FluxBackend {
|
|||
|
||||
ProxyQueryService: b.FluxService,
|
||||
OrganizationService: b.OrganizationService,
|
||||
FluxLanguageService: b.FluxLanguageService,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -74,6 +75,7 @@ type FluxHandler struct {
|
|||
Now func() time.Time
|
||||
OrganizationService influxdb.OrganizationService
|
||||
ProxyQueryService query.ProxyQueryService
|
||||
LanguageService influxdb.FluxLanguageService
|
||||
|
||||
EventRecorder metric.EventRecorder
|
||||
}
|
||||
|
|
@ -94,6 +96,7 @@ func NewFluxHandler(log *zap.Logger, b *FluxBackend) *FluxHandler {
|
|||
ProxyQueryService: b.ProxyQueryService,
|
||||
OrganizationService: b.OrganizationService,
|
||||
EventRecorder: b.QueryEventRecorder,
|
||||
LanguageService: b.FluxLanguageService,
|
||||
}
|
||||
|
||||
// query reponses can optionally be gzip encoded
|
||||
|
|
@ -216,9 +219,8 @@ func (h *FluxHandler) postFluxAST(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
pkg := parser.ParseSource(request.Query)
|
||||
if ast.Check(pkg) > 0 {
|
||||
err := ast.GetError(pkg)
|
||||
pkg, err := h.LanguageService.Parse(request.Query)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "invalid AST",
|
||||
|
|
@ -254,7 +256,7 @@ func (h *FluxHandler) postQueryAnalyze(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
a, err := req.Analyze()
|
||||
a, err := req.Analyze(h.LanguageService)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
kithttp "github.com/influxdata/influxdb/kit/transport/http"
|
||||
influxmock "github.com/influxdata/influxdb/mock"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/query/mock"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
|
@ -260,6 +261,7 @@ func TestFluxHandler_postFluxAST(t *testing.T) {
|
|||
t.Run(tt.name, func(t *testing.T) {
|
||||
h := &FluxHandler{
|
||||
HTTPErrorHandler: kithttp.ErrorHandler(0),
|
||||
LanguageService: fluxlang.DefaultService,
|
||||
}
|
||||
h.postFluxAST(tt.w, tt.r)
|
||||
if got := tt.w.Body.String(); got != tt.want {
|
||||
|
|
@ -338,6 +340,7 @@ func TestFluxHandler_PostQuery_Errors(t *testing.T) {
|
|||
}
|
||||
},
|
||||
},
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
}
|
||||
h := NewFluxHandler(zaptest.NewLogger(t), b)
|
||||
|
||||
|
|
@ -497,6 +500,7 @@ func TestFluxService_Query_gzip(t *testing.T) {
|
|||
QueryEventRecorder: noopEventRecorder{},
|
||||
OrganizationService: orgService,
|
||||
ProxyQueryService: queryService,
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
}
|
||||
|
||||
fluxHandler := NewFluxHandler(zaptest.NewLogger(t), fluxBackend)
|
||||
|
|
@ -633,6 +637,7 @@ func benchmarkQuery(b *testing.B, disableCompression bool) {
|
|||
QueryEventRecorder: noopEventRecorder{},
|
||||
OrganizationService: orgService,
|
||||
ProxyQueryService: queryService,
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
}
|
||||
|
||||
fluxHandler := NewFluxHandler(zaptest.NewLogger(b), fluxBackend)
|
||||
|
|
|
|||
11
query.go
11
query.go
|
|
@ -1,5 +1,7 @@
|
|||
package influxdb
|
||||
|
||||
import "github.com/influxdata/flux/ast"
|
||||
|
||||
// TODO(desa): These files are possibly a temporary. This is needed
|
||||
// as a part of the source work that is being done.
|
||||
// See https://github.com/influxdata/platform/issues/594 for more info.
|
||||
|
|
@ -9,3 +11,12 @@ type SourceQuery struct {
|
|||
Query string `json:"query"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// FluxLanguageService is a service for interacting with flux code.
|
||||
type FluxLanguageService interface {
|
||||
// Parse will take flux source code and produce a package.
|
||||
// If there are errors when parsing, the first error is returned.
|
||||
// An ast.Package may be returned when a parsing error occurs,
|
||||
// but it may be null if parsing didn't even occur.
|
||||
Parse(source string) (*ast.Package, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,21 @@
|
|||
// Package language exposes the flux parser as an interface.
|
||||
package fluxlang
|
||||
|
||||
import (
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/parser"
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
// DefaultService is the default language service.
|
||||
var DefaultService influxdb.FluxLanguageService = defaultService{}
|
||||
|
||||
type defaultService struct{}
|
||||
|
||||
func (d defaultService) Parse(source string) (pkg *ast.Package, err error) {
|
||||
pkg = parser.ParseSource(source)
|
||||
if ast.Check(pkg) > 0 {
|
||||
err = ast.GetError(pkg)
|
||||
}
|
||||
return pkg, err
|
||||
}
|
||||
Loading…
Reference in New Issue