influxdb/http/query_service.go

237 lines
5.5 KiB
Go

package http
import (
"bytes"
"context"
"encoding/json"
"net/http"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/influxdb/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
const (
queryPath = "/api/v2/querysvc"
queryStatisticsTrailer = "Influx-Query-Statistics"
)
type QueryHandler struct {
*httprouter.Router
Logger *zap.Logger
csvDialect csv.Dialect
QueryService query.QueryService
CompilerMappings flux.CompilerMappings
}
// NewQueryHandler returns a new instance of QueryHandler.
func NewQueryHandler() *QueryHandler {
h := &QueryHandler{
Router: NewRouter(),
csvDialect: csv.Dialect{
ResultEncoderConfig: csv.DefaultEncoderConfig(),
},
}
h.HandlerFunc("GET", "/ping", h.handlePing)
h.HandlerFunc("POST", queryPath, h.handlePostQuery)
return h
}
// handlePing returns a simple response to let the client know the server is running.
func (h *QueryHandler) handlePing(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNoContent)
}
// handlePostQuery is the HTTP handler for the POST /api/v2/query route.
func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req query.Request
req.WithCompilerMappings(h.CompilerMappings)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
EncodeError(ctx, err, w)
return
}
results, err := h.QueryService.Query(ctx, &req)
if err != nil {
EncodeError(ctx, err, w)
return
}
// Always cancel the results to free resources.
// If all results were consumed cancelling does nothing.
defer results.Release()
// Setup headers
stats, hasStats := results.(flux.Statisticser)
if hasStats {
w.Header().Set("Trailer", queryStatisticsTrailer)
}
// NOTE: We do not write out the headers here.
// It is possible that if the encoding step fails
// that we can write an error header so long as
// the encoder did not write anything.
// As such we rely on the http.ResponseWriter behavior
// to write an StatusOK header with the first write.
switch r.Header.Get("Accept") {
case "text/csv":
fallthrough
default:
h.csvDialect.SetHeaders(w)
encoder := h.csvDialect.Encoder()
n, err := encoder.Encode(w, results)
if err != nil {
if n == 0 {
// If the encoder did not write anything, we can write an error header.
EncodeError(ctx, err, w)
} else {
h.Logger.Info("Failed to encode client response",
zap.Error(err),
)
}
}
}
if hasStats {
data, err := json.Marshal(stats.Statistics())
if err != nil {
h.Logger.Info("Failed to encode statistics", zap.Error(err))
return
}
// Write statisitcs trailer
w.Header().Set(queryStatisticsTrailer, string(data))
}
}
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
func (h *QueryHandler) PrometheusCollectors() []prometheus.Collector {
// TODO: gather and return relevant metrics.
return nil
}
type QueryService struct {
Addr string
Token string
InsecureSkipVerify bool
}
// Ping checks to see if the server is responding to a ping request.
func (s *QueryService) Ping(ctx context.Context) error {
u, err := newURL(s.Addr, "/ping")
if err != nil {
return err
}
hreq, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
}
SetToken(s.Token, hreq)
hreq = hreq.WithContext(ctx)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(hreq)
if err != nil {
return err
}
return CheckError(resp)
}
// Query calls the query route with the requested query and returns a result iterator.
func (s *QueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
u, err := newURL(s.Addr, queryPath)
if err != nil {
return nil, err
}
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(req); err != nil {
return nil, err
}
hreq, err := http.NewRequest("POST", u.String(), &body)
if err != nil {
return nil, err
}
SetToken(s.Token, hreq)
hreq = hreq.WithContext(ctx)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(hreq)
if err != nil {
return nil, err
}
if err := CheckError(resp); err != nil {
return nil, err
}
var decoder flux.MultiResultDecoder
switch resp.Header.Get("Content-Type") {
case "text/csv":
fallthrough
default:
decoder = csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
}
results, err := decoder.Decode(resp.Body)
if err != nil {
return nil, err
}
statResults := &statsResultIterator{
results: results,
resp: resp,
}
return statResults, nil
}
// statsResultIterator implements flux.ResultIterator and flux.Statisticser by reading the HTTP trailers.
type statsResultIterator struct {
results flux.ResultIterator
resp *http.Response
statisitcs flux.Statistics
err error
}
func (s *statsResultIterator) More() bool {
return s.results.More()
}
func (s *statsResultIterator) Next() flux.Result {
return s.results.Next()
}
func (s *statsResultIterator) Release() {
s.results.Release()
s.readStats()
}
func (s *statsResultIterator) Err() error {
err := s.results.Err()
if err != nil {
return err
}
return s.err
}
func (s *statsResultIterator) Statistics() flux.Statistics {
return s.statisitcs
}
// readStats reads the query statisitcs off the response trailers.
func (s *statsResultIterator) readStats() {
data := s.resp.Trailer.Get(queryStatisticsTrailer)
if data != "" {
s.err = json.Unmarshal([]byte(data), &s.statisitcs)
}
}