refactor(query): handle flux errors in the query controller instead of http (#14368)
If we handle the flux errors in the query controller, it makes it so we are handling the errors in the location where the happen rather than at a layer further up the stack. This should simplify it so the errors are handled in this single location instead.pull/14378/head
parent
6e77b64da9
commit
47b032464f
|
@ -13,12 +13,11 @@ import (
|
|||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/codes"
|
||||
"github.com/influxdata/flux/complete"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/iocounter"
|
||||
"github.com/influxdata/flux/parser"
|
||||
influxdb "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
pcontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/http/metric"
|
||||
|
@ -165,7 +164,7 @@ func (h *FluxHandler) handleQuery(w http.ResponseWriter, r *http.Request) {
|
|||
if _, err := h.ProxyQueryService.Query(ctx, &cw, req); err != nil {
|
||||
if cw.Count() == 0 {
|
||||
// Only record the error headers IFF nothing has been written to w.
|
||||
h.HandleHTTPError(ctx, handleFluxError(err), w)
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
h.Logger.Info("Error writing response to client",
|
||||
|
@ -175,51 +174,6 @@ func (h *FluxHandler) handleQuery(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleFluxError will take a flux.Error and convert it into an influxdb.Error.
|
||||
// It will match certain codes to the equivalent in influxdb.
|
||||
//
|
||||
// If the error is any other type of error, it will return the error untouched.
|
||||
//
|
||||
// TODO(jsternberg): This likely becomes a public function, but this is just an initial
|
||||
// implementation so playing it safe by making it package local for now.
|
||||
func handleFluxError(err error) error {
|
||||
ferr, ok := err.(*flux.Error)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
code := influxdb.EInternal
|
||||
switch flux.ErrorCode(err) {
|
||||
case codes.NotFound:
|
||||
code = influxdb.ENotFound
|
||||
case codes.Invalid:
|
||||
code = influxdb.EInvalid
|
||||
// These don't really map correctly, but we want
|
||||
// them to show up as 4XX so until influxdb error
|
||||
// codes are updated for more types of failures,
|
||||
// mapping these to invalid.
|
||||
case codes.Canceled,
|
||||
codes.ResourceExhausted,
|
||||
codes.FailedPrecondition,
|
||||
codes.Aborted,
|
||||
codes.OutOfRange,
|
||||
codes.Unimplemented:
|
||||
code = influxdb.EInvalid
|
||||
case codes.PermissionDenied:
|
||||
code = influxdb.EForbidden
|
||||
case codes.Unauthenticated:
|
||||
code = influxdb.EUnauthorized
|
||||
default:
|
||||
// Everything else is treated as an internal error
|
||||
// which is set above.
|
||||
}
|
||||
return &influxdb.Error{
|
||||
Code: code,
|
||||
Msg: ferr.Msg,
|
||||
Err: handleFluxError(ferr.Err),
|
||||
}
|
||||
}
|
||||
|
||||
type langRequest struct {
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/codes"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/influxdb"
|
||||
|
@ -328,8 +327,8 @@ func TestFluxHandler_PostQuery_Errors(t *testing.T) {
|
|||
OrganizationService: i,
|
||||
ProxyQueryService: &mock.ProxyQueryService{
|
||||
QueryF: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||
return flux.Statistics{}, &flux.Error{
|
||||
Code: codes.Invalid,
|
||||
return flux.Statistics{}, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "some query error",
|
||||
}
|
||||
},
|
||||
|
|
|
@ -26,14 +26,15 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/codes"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/memory"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/errors"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
@ -146,12 +147,7 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query,
|
|||
ctx = context.WithValue(ctx, orgLabel, req.OrganizationID.String())
|
||||
q, err := c.query(ctx, req.Compiler)
|
||||
if err != nil {
|
||||
// If the controller reports an error, it's usually because of a syntax error
|
||||
// or other problem that the client must fix.
|
||||
return q, &platform.Error{
|
||||
Code: platform.EInvalid,
|
||||
Msg: err.Error(),
|
||||
}
|
||||
return q, err
|
||||
}
|
||||
|
||||
return q, nil
|
||||
|
@ -162,7 +158,7 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query,
|
|||
func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
|
||||
q, err := c.createQuery(ctx, compiler.CompilerType())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, handleFluxError(err)
|
||||
}
|
||||
|
||||
if err := c.compileQuery(q, compiler); err != nil {
|
||||
|
@ -232,7 +228,10 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu
|
|||
if c.shutdown {
|
||||
// Query controller was shutdown between when we started
|
||||
// creating the query and ending it.
|
||||
err := errors.New("query controller shutdown")
|
||||
err := &flux.Error{
|
||||
Code: codes.Unavailable,
|
||||
Msg: "query controller shutdown",
|
||||
}
|
||||
q.setErr(err)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -270,12 +269,18 @@ func (c *Controller) compileQuery(q *Query, compiler flux.Compiler) (err error)
|
|||
|
||||
ctx, ok := q.tryCompile()
|
||||
if !ok {
|
||||
return errors.New("failed to transition query to compiling state")
|
||||
return &flux.Error{
|
||||
Code: codes.Internal,
|
||||
Msg: "failed to transition query to compiling state",
|
||||
}
|
||||
}
|
||||
|
||||
prog, err := compiler.Compile(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "compilation failed")
|
||||
return &flux.Error{
|
||||
Msg: "compilation failed",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if p, ok := prog.(lang.DependenciesAwareProgram); ok {
|
||||
|
@ -289,13 +294,19 @@ func (c *Controller) compileQuery(q *Query, compiler flux.Compiler) (err error)
|
|||
|
||||
func (c *Controller) enqueueQuery(q *Query) error {
|
||||
if _, ok := q.tryQueue(); !ok {
|
||||
return errors.New("failed to transition query to queueing state")
|
||||
return &flux.Error{
|
||||
Code: codes.Internal,
|
||||
Msg: "failed to transition query to queueing state",
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case c.queryQueue <- q:
|
||||
default:
|
||||
return errors.New("queue length exceeded")
|
||||
return &flux.Error{
|
||||
Code: codes.ResourceExhausted,
|
||||
Msg: "queue length exceeded",
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -334,7 +345,10 @@ func (c *Controller) executeQuery(q *Query) {
|
|||
// client cancelled it, or because the controller is shutting down)
|
||||
// In the case of cancellation, SetErr() should reset the error to an
|
||||
// appropriate message.
|
||||
q.setErr(errors.New("impossible state transition"))
|
||||
q.setErr(&flux.Error{
|
||||
Code: codes.Internal,
|
||||
Msg: "impossible state transition",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -342,7 +356,6 @@ func (c *Controller) executeQuery(q *Query) {
|
|||
q.alloc.Limit = func(v int64) *int64 { return &v }(c.memoryBytesQuotaPerQuery)
|
||||
exec, err := q.program.Start(ctx, q.alloc)
|
||||
if err != nil {
|
||||
q.addRuntimeError(err)
|
||||
q.setErr(err)
|
||||
return
|
||||
}
|
||||
|
@ -656,7 +669,7 @@ func (q *Query) Err() error {
|
|||
q.stateMu.Lock()
|
||||
err := q.err
|
||||
q.stateMu.Unlock()
|
||||
return err
|
||||
return handleFluxError(err)
|
||||
}
|
||||
|
||||
// setErr marks this query with an error. If the query was
|
||||
|
@ -807,8 +820,6 @@ const (
|
|||
|
||||
// Queueing indicates the query is waiting inside of the
|
||||
// scheduler to be executed.
|
||||
// TODO(jsternberg): This stage isn't used currently, but
|
||||
// it makes sense to readd this once we have a work queue again.
|
||||
Queueing
|
||||
|
||||
// Executing indicates that the query is currently executing.
|
||||
|
@ -889,3 +900,48 @@ func (s *span) Finish() {
|
|||
s.hist.Observe(s.Duration.Seconds())
|
||||
s.gauge.Dec()
|
||||
}
|
||||
|
||||
// handleFluxError will take a flux.Error and convert it into an influxdb.Error.
|
||||
// It will match certain codes to the equivalent in influxdb.
|
||||
//
|
||||
// If the error is any other type of error, it will return the error untouched.
|
||||
//
|
||||
// TODO(jsternberg): This likely becomes a public function, but this is just an initial
|
||||
// implementation so playing it safe by making it package local for now.
|
||||
func handleFluxError(err error) error {
|
||||
ferr, ok := err.(*flux.Error)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
code := influxdb.EInternal
|
||||
switch flux.ErrorCode(err) {
|
||||
case codes.NotFound:
|
||||
code = influxdb.ENotFound
|
||||
case codes.Invalid:
|
||||
code = influxdb.EInvalid
|
||||
// These don't really map correctly, but we want
|
||||
// them to show up as 4XX so until influxdb error
|
||||
// codes are updated for more types of failures,
|
||||
// mapping these to invalid.
|
||||
case codes.Canceled,
|
||||
codes.ResourceExhausted,
|
||||
codes.FailedPrecondition,
|
||||
codes.Aborted,
|
||||
codes.OutOfRange,
|
||||
codes.Unimplemented:
|
||||
code = influxdb.EInvalid
|
||||
case codes.PermissionDenied:
|
||||
code = influxdb.EForbidden
|
||||
case codes.Unauthenticated:
|
||||
code = influxdb.EUnauthorized
|
||||
default:
|
||||
// Everything else is treated as an internal error
|
||||
// which is set above.
|
||||
}
|
||||
return &influxdb.Error{
|
||||
Code: code,
|
||||
Msg: ferr.Msg,
|
||||
Err: handleFluxError(ferr.Err),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/influxdata/flux"
|
||||
_ "github.com/influxdata/flux/builtin"
|
||||
"github.com/influxdata/flux/codes"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/execute/executetest"
|
||||
"github.com/influxdata/flux/lang"
|
||||
|
@ -319,7 +320,7 @@ func TestController_AfterShutdown(t *testing.T) {
|
|||
|
||||
if _, err := ctrl.Query(context.Background(), makeRequest(mockCompiler)); err == nil {
|
||||
t.Error("expected error")
|
||||
} else if got, want := err.Error(), "<invalid> query controller shutdown"; got != want {
|
||||
} else if got, want := err.Error(), "query controller shutdown"; got != want {
|
||||
t.Errorf("unexpected error -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
}
|
||||
|
@ -333,12 +334,17 @@ func TestController_CompileError(t *testing.T) {
|
|||
|
||||
compiler := &mock.Compiler{
|
||||
CompileFn: func(ctx context.Context) (flux.Program, error) {
|
||||
return nil, errors.New("expected error")
|
||||
return nil, &flux.Error{
|
||||
Code: codes.Invalid,
|
||||
Msg: "expected error",
|
||||
}
|
||||
},
|
||||
}
|
||||
if _, err := ctrl.Query(context.Background(), makeRequest(compiler)); err == nil {
|
||||
t.Error("expected error")
|
||||
} else if got, want := err.Error(), "<invalid> compilation failed: expected error"; got != want {
|
||||
} else if got, want := err.Error(), "<invalid> expected error"; got != want {
|
||||
// TODO(jsternberg): This should be "<invalid> compilation error: expected error", but the
|
||||
// influxdb error library does not include the message when it is wrapping an error for some reason.
|
||||
t.Errorf("unexpected error -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue