fix(influxql): add explicit routing to influxql service (#16832)
Prior to this change influxql requests were sent to the same back end as Flux queries. This MAY not always be the case. Now InfluxQL queries are specifically routed to the InfluxQLService. In the case of this OSS build the FluxService and InfluxQLService are the same.pull/16891/head
parent
f5de43b973
commit
03f65cf045
|
@ -793,7 +793,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
VariableService: variableSvc,
|
||||
PasswordsService: passwdsSvc,
|
||||
OnboardingService: onboardingSvc,
|
||||
InfluxQLService: nil, // No InfluxQL support
|
||||
InfluxQLService: storageQueryService,
|
||||
FluxService: storageQueryService,
|
||||
TaskService: taskSvc,
|
||||
TelegrafService: telegrafSvc,
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/NYTimes/gziphandler"
|
||||
|
@ -27,6 +28,7 @@ import (
|
|||
kithttp "github.com/influxdata/influxdb/kit/transport/http"
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/query/influxql"
|
||||
"github.com/pkg/errors"
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
|
@ -55,7 +57,10 @@ func NewFluxBackend(log *zap.Logger, b *APIBackend) *FluxBackend {
|
|||
log: log,
|
||||
QueryEventRecorder: b.QueryEventRecorder,
|
||||
|
||||
ProxyQueryService: b.FluxService,
|
||||
ProxyQueryService: routingQueryService{
|
||||
InfluxQLService: b.InfluxQLService,
|
||||
DefaultService: b.FluxService,
|
||||
},
|
||||
OrganizationService: b.OrganizationService,
|
||||
}
|
||||
}
|
||||
|
@ -608,3 +613,38 @@ func QueryHealthCheck(url string, insecureSkipVerify bool) check.Response {
|
|||
|
||||
return healthResponse
|
||||
}
|
||||
|
||||
// routingQueryService routes queries to specific query services based on their compiler type.
|
||||
type routingQueryService struct {
|
||||
// InfluxQLService handles queries with compiler type of "influxql"
|
||||
InfluxQLService query.ProxyQueryService
|
||||
// DefaultService handles all other queries
|
||||
DefaultService query.ProxyQueryService
|
||||
}
|
||||
|
||||
func (s routingQueryService) Check(ctx context.Context) check.Response {
|
||||
// Produce combined check response
|
||||
response := check.Response{
|
||||
Name: "internal-routingQueryService",
|
||||
Status: check.StatusPass,
|
||||
}
|
||||
def := s.DefaultService.Check(ctx)
|
||||
influxql := s.InfluxQLService.Check(ctx)
|
||||
if def.Status == check.StatusFail {
|
||||
response.Status = def.Status
|
||||
response.Message = def.Message
|
||||
} else if influxql.Status == check.StatusFail {
|
||||
response.Status = influxql.Status
|
||||
response.Message = influxql.Message
|
||||
}
|
||||
response.Checks = []check.Response{def, influxql}
|
||||
sort.Sort(response.Checks)
|
||||
return response
|
||||
}
|
||||
|
||||
func (s routingQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||
if req.Request.Compiler.CompilerType() == influxql.CompilerType {
|
||||
return s.InfluxQLService.Query(ctx, w, req)
|
||||
}
|
||||
return s.DefaultService.Query(ctx, w, req)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue