Add support for Prometheus remote read and write API. (#8784)
Adds a new package prometheus for converting from remote reads and writes to Influx queries and points. Adds two new endpoints to the httpd handler to support prometheus remote read at /api/v1/prom/read and remote write at /api/v1/prom/write. The only thing used from Prometheus is the storage/remote files that are generated from the remote.proto file. Copied that file into promtheus/remote package to avoid an extra dependency.pull/8802/head
parent
e18425757d
commit
f30eba380e
|
@ -0,0 +1,172 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
)
|
||||
|
||||
const (
|
||||
// measurementName is where all prometheus time series go to
|
||||
measurementName = "_"
|
||||
|
||||
// fieldName is the field all prometheus values get written to
|
||||
fieldName = "f64"
|
||||
)
|
||||
|
||||
var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported")
|
||||
|
||||
// WriteRequestToPoints converts a Prometheus remote write request of time series and their
|
||||
// samples into Points that can be written into Influx
|
||||
func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
|
||||
var maxPoints int
|
||||
for _, ts := range req.Timeseries {
|
||||
maxPoints += len(ts.Samples)
|
||||
}
|
||||
points := make([]models.Point, 0, maxPoints)
|
||||
|
||||
var droppedNaN error
|
||||
|
||||
for _, ts := range req.Timeseries {
|
||||
tags := make(map[string]string, len(ts.Labels))
|
||||
for _, l := range ts.Labels {
|
||||
tags[l.Name] = l.Value
|
||||
}
|
||||
|
||||
for _, s := range ts.Samples {
|
||||
// skip NaN values, which are valid in Prometheus
|
||||
if math.IsNaN(s.Value) {
|
||||
droppedNaN = ErrNaNDropped
|
||||
continue
|
||||
}
|
||||
|
||||
// convert and append
|
||||
t := time.Unix(0, s.TimestampMs*int64(time.Millisecond))
|
||||
fields := map[string]interface{}{fieldName: s.Value}
|
||||
p, err := models.NewPoint(measurementName, models.NewTags(tags), fields, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
points = append(points, p)
|
||||
}
|
||||
}
|
||||
return points, droppedNaN
|
||||
}
|
||||
|
||||
// ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL
|
||||
// query that will return the requested data when executed
|
||||
func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) {
|
||||
if len(req.Queries) != 1 {
|
||||
return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
|
||||
}
|
||||
promQuery := req.Queries[0]
|
||||
|
||||
stmt := &influxql.SelectStatement{
|
||||
IsRawQuery: true,
|
||||
Fields: []*influxql.Field{
|
||||
{Expr: &influxql.VarRef{Val: fieldName}},
|
||||
},
|
||||
Sources: []influxql.Source{&influxql.Measurement{
|
||||
Name: measurementName,
|
||||
Database: db,
|
||||
RetentionPolicy: rp,
|
||||
}},
|
||||
Dimensions: []*influxql.Dimension{{Expr: &influxql.Wildcard{}}},
|
||||
}
|
||||
|
||||
cond, err := condFromMatchers(promQuery, promQuery.Matchers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stmt.Condition = cond
|
||||
|
||||
return &influxql.Query{Statements: []influxql.Statement{stmt}}, nil
|
||||
}
|
||||
|
||||
// condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr
|
||||
func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) {
|
||||
var op influxql.Token
|
||||
switch m.Type {
|
||||
case remote.MatchType_EQUAL:
|
||||
op = influxql.EQ
|
||||
case remote.MatchType_NOT_EQUAL:
|
||||
op = influxql.NEQ
|
||||
case remote.MatchType_REGEX_MATCH:
|
||||
op = influxql.EQREGEX
|
||||
case remote.MatchType_REGEX_NO_MATCH:
|
||||
op = influxql.NEQREGEX
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown match type %v", m.Type)
|
||||
}
|
||||
|
||||
return &influxql.BinaryExpr{
|
||||
Op: op,
|
||||
LHS: &influxql.VarRef{Val: m.Name},
|
||||
RHS: &influxql.StringLiteral{Val: m.Value},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// condFromMatchers converts a Prometheus remote query and a collection of Prometheus label matchers
|
||||
// into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus
|
||||
// remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels
|
||||
// are kept equivalent.
|
||||
func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) {
|
||||
if len(matchers) > 0 {
|
||||
lhs, err := condFromMatcher(matchers[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rhs, err := condFromMatchers(q, matchers[1:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &influxql.BinaryExpr{
|
||||
Op: influxql.AND,
|
||||
LHS: lhs,
|
||||
RHS: rhs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &influxql.BinaryExpr{
|
||||
Op: influxql.AND,
|
||||
LHS: &influxql.BinaryExpr{
|
||||
Op: influxql.GTE,
|
||||
LHS: &influxql.VarRef{Val: "time"},
|
||||
RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond))},
|
||||
},
|
||||
RHS: &influxql.BinaryExpr{
|
||||
Op: influxql.LTE,
|
||||
LHS: &influxql.VarRef{Val: "time"},
|
||||
RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond))},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs
|
||||
func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
|
||||
pairs := make([]*remote.LabelPair, 0, len(tags))
|
||||
for k, v := range tags {
|
||||
if v == "" {
|
||||
// If we select metrics with different sets of labels names,
|
||||
// InfluxDB returns *all* possible tag names on all returned
|
||||
// series, with empty tag values on series where they don't
|
||||
// apply. In Prometheus, an empty label value is equivalent
|
||||
// to a non-existent label, so we just skip empty ones here
|
||||
// to make the result correct.
|
||||
continue
|
||||
}
|
||||
pairs = append(pairs, &remote.LabelPair{
|
||||
Name: k,
|
||||
Value: v,
|
||||
})
|
||||
}
|
||||
return pairs
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
package remote
|
||||
|
||||
//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. remote.proto
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,70 @@
|
|||
// This file is copied (except for package name) from https://github.com/prometheus/prometheus/blob/master/storage/remote/remote.proto
|
||||
|
||||
// Copyright 2016 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package remote;
|
||||
|
||||
message Sample {
|
||||
double value = 1;
|
||||
int64 timestamp_ms = 2;
|
||||
}
|
||||
|
||||
message LabelPair {
|
||||
string name = 1;
|
||||
string value = 2;
|
||||
}
|
||||
|
||||
message TimeSeries {
|
||||
repeated LabelPair labels = 1;
|
||||
// Sorted by time, oldest sample first.
|
||||
repeated Sample samples = 2;
|
||||
}
|
||||
|
||||
message WriteRequest {
|
||||
repeated TimeSeries timeseries = 1;
|
||||
}
|
||||
|
||||
message ReadRequest {
|
||||
repeated Query queries = 1;
|
||||
}
|
||||
|
||||
message ReadResponse {
|
||||
// In same order as the request's queries.
|
||||
repeated QueryResult results = 1;
|
||||
}
|
||||
|
||||
message Query {
|
||||
int64 start_timestamp_ms = 1;
|
||||
int64 end_timestamp_ms = 2;
|
||||
repeated LabelMatcher matchers = 3;
|
||||
}
|
||||
|
||||
enum MatchType {
|
||||
EQUAL = 0;
|
||||
NOT_EQUAL = 1;
|
||||
REGEX_MATCH = 2;
|
||||
REGEX_NO_MATCH = 3;
|
||||
}
|
||||
|
||||
message LabelMatcher {
|
||||
MatchType type = 1;
|
||||
string name = 2;
|
||||
string value = 3;
|
||||
}
|
||||
|
||||
message QueryResult {
|
||||
repeated TimeSeries timeseries = 1;
|
||||
}
|
|
@ -8,6 +8,7 @@ import (
|
|||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
|
@ -20,11 +21,15 @@ import (
|
|||
|
||||
"github.com/bmizerany/pat"
|
||||
"github.com/dgrijalva/jwt-go"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/monitor"
|
||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||
"github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
@ -141,6 +146,14 @@ func NewHandler(c Config) *Handler {
|
|||
"write", // Data-ingest route.
|
||||
"POST", "/write", true, true, h.serveWrite,
|
||||
},
|
||||
Route{
|
||||
"prometheus-write", // Prometheus remote write
|
||||
"POST", "/api/v1/prom/write", false, true, h.servePromWrite,
|
||||
},
|
||||
Route{
|
||||
"prometheus-read", // Prometheus remote read
|
||||
"POST", "/api/v1/prom/read", true, true, h.servePromRead,
|
||||
},
|
||||
Route{ // Ping
|
||||
"ping",
|
||||
"GET", "/ping", false, true, h.servePing,
|
||||
|
@ -184,6 +197,8 @@ type Statistics struct {
|
|||
ClientErrors int64
|
||||
ServerErrors int64
|
||||
RecoveredPanics int64
|
||||
PromWriteRequests int64
|
||||
PromReadRequests int64
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
|
@ -211,6 +226,8 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
|
|||
statClientError: atomic.LoadInt64(&h.stats.ClientErrors),
|
||||
statServerError: atomic.LoadInt64(&h.stats.ServerErrors),
|
||||
statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics),
|
||||
statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests),
|
||||
statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
@ -760,6 +777,276 @@ func convertToEpoch(r *query.Result, epoch string) {
|
|||
}
|
||||
}
|
||||
|
||||
// servePromWrite receives data in the Prometheus remote write protocol and writes it
|
||||
// to the database
|
||||
func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
|
||||
atomic.AddInt64(&h.stats.WriteRequests, 1)
|
||||
atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
|
||||
atomic.AddInt64(&h.stats.PromWriteRequests, 1)
|
||||
defer func(start time.Time) {
|
||||
atomic.AddInt64(&h.stats.ActiveWriteRequests, -1)
|
||||
atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
|
||||
}(time.Now())
|
||||
h.requestTracker.Add(r, user)
|
||||
|
||||
database := r.URL.Query().Get("db")
|
||||
if database == "" {
|
||||
h.httpError(w, "database is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if di := h.MetaClient.Database(database); di == nil {
|
||||
h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
if h.Config.AuthEnabled {
|
||||
if user == nil {
|
||||
h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil {
|
||||
h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
body := r.Body
|
||||
if h.Config.MaxBodySize > 0 {
|
||||
body = truncateReader(body, int64(h.Config.MaxBodySize))
|
||||
}
|
||||
|
||||
var bs []byte
|
||||
if r.ContentLength > 0 {
|
||||
if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) {
|
||||
h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
}
|
||||
|
||||
// This will just be an initial hint for the reader, as the
|
||||
// bytes.Buffer will grow as needed when ReadFrom is called
|
||||
bs = make([]byte, 0, r.ContentLength)
|
||||
}
|
||||
buf := bytes.NewBuffer(bs)
|
||||
|
||||
_, err := buf.ReadFrom(body)
|
||||
if err != nil {
|
||||
if err == errTruncated {
|
||||
h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
}
|
||||
|
||||
if h.Config.WriteTracing {
|
||||
h.Logger.Info("Prom write handler unable to read bytes from request body")
|
||||
}
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
|
||||
|
||||
if h.Config.WriteTracing {
|
||||
h.Logger.Info(fmt.Sprintf("Prom write body received by handler: %s", buf.Bytes()))
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, buf.Bytes())
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Convert the Prometheus remote write request to Influx Points
|
||||
var req remote.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
points, err := prometheus.WriteRequestToPoints(&req)
|
||||
if err != nil {
|
||||
if h.Config.WriteTracing {
|
||||
h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error()))
|
||||
}
|
||||
|
||||
if err != prometheus.ErrNaNDropped {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Determine required consistency level.
|
||||
level := r.URL.Query().Get("consistency")
|
||||
consistency := models.ConsistencyLevelOne
|
||||
if level != "" {
|
||||
consistency, err = models.ParseConsistencyLevel(level)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Write points.
|
||||
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
|
||||
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
} else if influxdb.IsAuthorizationError(err) {
|
||||
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
|
||||
h.httpError(w, err.Error(), http.StatusForbidden)
|
||||
return
|
||||
} else if werr, ok := err.(tsdb.PartialWriteError); ok {
|
||||
atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped))
|
||||
atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped))
|
||||
h.httpError(w, werr.Error(), http.StatusBadRequest)
|
||||
return
|
||||
} else if err != nil {
|
||||
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// servePromRead will convert a Prometheus remote read request into an InfluxQL query and
|
||||
// return data in Prometheus remote read protobuf format.
|
||||
func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) {
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var req remote.ReadRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Query the DB and create a ReadResponse for Prometheus
|
||||
db := r.FormValue("db")
|
||||
q, err := prometheus.ReadRequestToInfluxQLQuery(&req, db, r.FormValue("rp"))
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Check authorization.
|
||||
if h.Config.AuthEnabled {
|
||||
if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil {
|
||||
if err, ok := err.(meta.ErrAuthorize); ok {
|
||||
h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database))
|
||||
}
|
||||
h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
opts := query.ExecutionOptions{
|
||||
Database: db,
|
||||
ChunkSize: DefaultChunkSize,
|
||||
ReadOnly: true,
|
||||
}
|
||||
|
||||
if h.Config.AuthEnabled {
|
||||
// The current user determines the authorized actions.
|
||||
opts.Authorizer = user
|
||||
} else {
|
||||
// Auth is disabled, so allow everything.
|
||||
opts.Authorizer = query.OpenAuthorizer{}
|
||||
}
|
||||
|
||||
// Make sure if the client disconnects we signal the query to abort
|
||||
var closing chan struct{}
|
||||
closing = make(chan struct{})
|
||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
||||
// CloseNotify() is not guaranteed to send a notification when the query
|
||||
// is closed. Use this channel to signal that the query is finished to
|
||||
// prevent lingering goroutines that may be stuck.
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
notify := notifier.CloseNotify()
|
||||
go func() {
|
||||
// Wait for either the request to finish
|
||||
// or for the client to disconnect
|
||||
select {
|
||||
case <-done:
|
||||
case <-notify:
|
||||
close(closing)
|
||||
}
|
||||
}()
|
||||
opts.AbortCh = done
|
||||
} else {
|
||||
defer close(closing)
|
||||
}
|
||||
|
||||
// Execute query.
|
||||
results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
|
||||
|
||||
resp := &remote.ReadResponse{
|
||||
Results: []*remote.QueryResult{{}},
|
||||
}
|
||||
|
||||
// pull all results from the channel
|
||||
for r := range results {
|
||||
// Ignore nil results.
|
||||
if r == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// read the series data and convert into Prometheus samples
|
||||
for _, s := range r.Series {
|
||||
ts := &remote.TimeSeries{
|
||||
Labels: prometheus.TagsToLabelPairs(s.Tags),
|
||||
}
|
||||
|
||||
for _, v := range s.Values {
|
||||
t, ok := v[0].(time.Time)
|
||||
if !ok {
|
||||
h.httpError(w, fmt.Sprintf("value %v wasn't a time", v[0]), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
val, ok := v[1].(float64)
|
||||
if !ok {
|
||||
h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest)
|
||||
}
|
||||
timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond)
|
||||
ts.Samples = append(ts.Samples, &remote.Sample{
|
||||
TimestampMs: timestamp,
|
||||
Value: val,
|
||||
})
|
||||
}
|
||||
|
||||
resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
|
||||
}
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
w.Header().Set("Content-Encoding", "snappy")
|
||||
|
||||
compressed = snappy.Encode(nil, data)
|
||||
if _, err := w.Write(compressed); err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
|
||||
}
|
||||
|
||||
// serveExpvar serves internal metrics in /debug/vars format over HTTP.
|
||||
func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||
// Retrieve statistics from the monitor.
|
||||
|
|
|
@ -6,18 +6,23 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dgrijalva/jwt-go"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/internal"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/services/httpd"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
|
@ -524,6 +529,137 @@ func TestHandler_Query_CloseNotify(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the prometheus remote write works
|
||||
func TestHandler_PromWrite(t *testing.T) {
|
||||
req := &remote.WriteRequest{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
{Name: "host", Value: "a"},
|
||||
{Name: "region", Value: "west"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 1, Value: 1.2},
|
||||
{TimestampMs: 2, Value: math.NaN()},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
t.Fatal("couldn't marshal prometheus request")
|
||||
}
|
||||
compressed := snappy.Encode(nil, data)
|
||||
|
||||
b := bytes.NewReader(compressed)
|
||||
h := NewHandler(false)
|
||||
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
||||
return &meta.DatabaseInfo{}
|
||||
}
|
||||
called := false
|
||||
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
|
||||
called = true
|
||||
point := points[0]
|
||||
if point.UnixNano() != int64(time.Millisecond) {
|
||||
t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano())
|
||||
}
|
||||
tags := point.Tags()
|
||||
expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
|
||||
if !reflect.DeepEqual(tags, expectedTags) {
|
||||
t.Fatalf("tags don't match\n\texp: %v\n\tgot: %v", expectedTags, tags)
|
||||
}
|
||||
|
||||
fields, err := point.Fields()
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
expFields := models.Fields{"f64": 1.2}
|
||||
if !reflect.DeepEqual(fields, expFields) {
|
||||
t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
|
||||
if !called {
|
||||
t.Fatal("WritePoints: expected call")
|
||||
}
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status: %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
|
||||
// data is returned
|
||||
func TestHandler_PromRead(t *testing.T) {
|
||||
req := &remote.ReadRequest{
|
||||
Queries: []*remote.Query{{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
{Type: remote.MatchType_EQUAL, Name: "eq", Value: "a"},
|
||||
{Type: remote.MatchType_NOT_EQUAL, Name: "neq", Value: "b"},
|
||||
{Type: remote.MatchType_REGEX_MATCH, Name: "regex", Value: "c"},
|
||||
{Type: remote.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"},
|
||||
},
|
||||
StartTimestampMs: 1,
|
||||
EndTimestampMs: 2,
|
||||
}},
|
||||
}
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
t.Fatal("couldn't marshal prometheus request")
|
||||
}
|
||||
compressed := snappy.Encode(nil, data)
|
||||
b := bytes.NewReader(compressed)
|
||||
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ 'c' AND neqregex !~ 'd' AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `foo` {
|
||||
t.Fatalf("unexpected db: %s", ctx.Database)
|
||||
}
|
||||
row := &models.Row{
|
||||
Name: "_",
|
||||
Tags: map[string]string{"foo": "bar"},
|
||||
Columns: []string{"time", "f64"},
|
||||
Values: [][]interface{}{{time.Unix(23, 0), 1.2}},
|
||||
}
|
||||
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{row})}
|
||||
return nil
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d", w.Code)
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
var resp remote.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
expLabels := []*remote.LabelPair{{Name: "foo", Value: "bar"}}
|
||||
expSamples := []*remote.Sample{{TimestampMs: 23000, Value: 1.2}}
|
||||
|
||||
ts := resp.Results[0].Timeseries[0]
|
||||
|
||||
if !reflect.DeepEqual(expLabels, ts.Labels) {
|
||||
t.Fatalf("unexpected labels\n\texp: %v\n\tgot: %v", expLabels, ts.Labels)
|
||||
}
|
||||
if !reflect.DeepEqual(expSamples, ts.Samples) {
|
||||
t.Fatalf("unexpectd samples\n\texp: %v\n\tgot: %v", expSamples, ts.Samples)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the handler handles ping requests correctly.
|
||||
// TODO: This should be expanded to verify the MetaClient check in servePing is working correctly
|
||||
func TestHandler_Ping(t *testing.T) {
|
||||
|
|
|
@ -38,6 +38,10 @@ const (
|
|||
statClientError = "clientError" // Number of HTTP responses due to client error.
|
||||
statServerError = "serverError" // Number of HTTP responses due to server error.
|
||||
statRecoveredPanics = "recoveredPanics" // Number of panics recovered by HTTP handler.
|
||||
|
||||
// Prometheus stats
|
||||
statPromWriteRequest = "promWriteReq" // Number of write requests to the promtheus endpoint
|
||||
statPromReadRequest = "promReadReq" // Number of read requests to the prometheus endpoint
|
||||
)
|
||||
|
||||
// Service manages the listener and handler for an HTTP endpoint.
|
||||
|
|
Loading…
Reference in New Issue