Merge pull request #9967 from influxdata/pd-prometheus-to-new-storage

Use storage package for Prometheus remote read
pull/9972/head
Edd Robinson 2018-06-13 18:20:44 +01:00 committed by GitHub
commit 83572fbfbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 865 additions and 298 deletions

View File

@ -264,6 +264,12 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.Version = s.buildInfo.Version
srv.Handler.BuildType = "OSS"
// Wire up storage service for Prometheus endpoints.
storageStore := storage.NewStore()
storageStore.MetaClient = s.MetaClient
storageStore.TSDBStore = s.TSDBStore
srv.Handler.Store = storageStore
s.Services = append(s.Services, srv)
}
@ -420,7 +426,7 @@ func (s *Server) Open() error {
return fmt.Errorf("open tsdb store: %s", err)
}
// Open the subcriber service
// Open the subscriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}

View File

@ -448,8 +448,12 @@ func mapOpToComparison(op influxql.Token) storage.Node_Comparison {
switch op {
case influxql.EQ:
return storage.ComparisonEqual
case influxql.EQREGEX:
return storage.ComparisonRegex
case influxql.NEQ:
return storage.ComparisonNotEqual
case influxql.NEQREGEX:
return storage.ComparisonNotEqual
case influxql.LT:
return storage.ComparisonLess
case influxql.LTE:
@ -555,8 +559,14 @@ func (v *exprToNodeVisitor) Visit(node influxql.Node) influxql.Visitor {
})
return nil
case *influxql.RegexLiteral:
v.nodes = append(v.nodes, &storage.Node{
NodeType: storage.NodeTypeLiteral,
Value: &storage.Node_RegexValue{RegexValue: n.Val.String()},
})
return nil
default:
v.err = errors.New("unsupported expression")
v.err = fmt.Errorf("unsupported expression %T", n)
return nil
}
}

132
internal/cursors.go Normal file
View File

@ -0,0 +1,132 @@
package internal
import "github.com/influxdata/influxdb/tsdb"
var (
_ tsdb.IntegerBatchCursor = NewIntegerBatchCursorMock()
_ tsdb.FloatBatchCursor = NewFloatBatchCursorMock()
_ tsdb.UnsignedBatchCursor = NewUnsignedBatchCursorMock()
_ tsdb.StringBatchCursor = NewStringBatchCursorMock()
_ tsdb.BooleanBatchCursor = NewBooleanBatchCursorMock()
)
// BatchCursorMock provides a mock base implementation for batch cursors.
type BatchCursorMock struct {
CloseFn func()
ErrFn func() error
}
// NewBatchCursorMock returns an initialised BatchCursorMock, which
// returns the zero value for all methods.
func NewBatchCursorMock() *BatchCursorMock {
return &BatchCursorMock{
CloseFn: func() {},
ErrFn: func() error { return nil },
}
}
// Close closes the cursor.
func (c *BatchCursorMock) Close() { c.CloseFn() }
// Err returns the latest error, if any.
func (c *BatchCursorMock) Err() error { return c.ErrFn() }
// IntegerBatchCursorMock provides a mock implementation of an IntegerBatchCursorMock.
type IntegerBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []int64)
}
// NewIntegerBatchCursorMock returns an initialised IntegerBatchCursorMock, which
// returns the zero value for all methods.
func NewIntegerBatchCursorMock() *IntegerBatchCursorMock {
return &IntegerBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []int64) { return nil, nil },
}
}
// Next returns the next set of keys and values.
func (c *IntegerBatchCursorMock) Next() (keys []int64, values []int64) {
return c.NextFn()
}
// FloatBatchCursorMock provides a mock implementation of a FloatBatchCursor.
type FloatBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []float64)
}
// NewFloatBatchCursorMock returns an initialised FloatBatchCursorMock, which
// returns the zero value for all methods.
func NewFloatBatchCursorMock() *FloatBatchCursorMock {
return &FloatBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []float64) { return nil, nil },
}
}
// Next returns the next set of keys and values.
func (c *FloatBatchCursorMock) Next() (keys []int64, values []float64) {
return c.NextFn()
}
// UnsignedBatchCursorMock provides a mock implementation of an UnsignedBatchCursorMock.
type UnsignedBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []uint64)
}
// NewUnsignedBatchCursorMock returns an initialised UnsignedBatchCursorMock, which
// returns the zero value for all methods.
func NewUnsignedBatchCursorMock() *UnsignedBatchCursorMock {
return &UnsignedBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []uint64) { return nil, nil },
}
}
// Next returns the next set of keys and values.
func (c *UnsignedBatchCursorMock) Next() (keys []int64, values []uint64) {
return c.NextFn()
}
// StringBatchCursorMock provides a mock implementation of a StringBatchCursor.
type StringBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []string)
}
// NewStringBatchCursorMock returns an initialised StringBatchCursorMock, which
// returns the zero value for all methods.
func NewStringBatchCursorMock() *StringBatchCursorMock {
return &StringBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []string) { return nil, nil },
}
}
// Next returns the next set of keys and values.
func (c *StringBatchCursorMock) Next() (keys []int64, values []string) {
return c.NextFn()
}
// BooleanBatchCursorMock provides a mock implementation of a BooleanBatchCursor.
type BooleanBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []bool)
}
// NewBooleanBatchCursorMock returns an initialised BooleanBatchCursorMock, which
// returns the zero value for all methods.
func NewBooleanBatchCursorMock() *BooleanBatchCursorMock {
return &BooleanBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []bool) { return nil, nil },
}
}
// Next returns the next set of keys and values.
func (c *BooleanBatchCursorMock) Next() (keys []int64, values []bool) {
return c.NextFn()
}

78
internal/storage_store.go Normal file
View File

@ -0,0 +1,78 @@
package internal
import (
"context"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)
// TSDBStoreMock is a mockable implementation of storage.Store.
//
// It's currently a partial implementation as one of a store's exported methods
// returns an unexported type.
type StorageStoreMock struct {
ReadFn func(ctx context.Context, req *storage.ReadRequest) (storage.Results, error)
WithLoggerFn func(log *zap.Logger)
ResultSet *StorageResultsMock
// TODO(edd): can't mock GroupRead as it returns an unexported type.
}
// NewStorageStoreMock initialises a StorageStoreMock with methods that return
// their zero values. It also initialises a StorageResultsMock, which can be
// configured via the ResultSet field.
func NewStorageStoreMock() *StorageStoreMock {
store := &StorageStoreMock{
WithLoggerFn: func(*zap.Logger) {},
ResultSet: NewStorageResultsMock(),
}
store.ReadFn = func(context.Context, *storage.ReadRequest) (storage.Results, error) {
return store.ResultSet, nil
}
return store
}
// WithLogger sets the logger.
func (s *StorageStoreMock) WithLogger(log *zap.Logger) {
s.WithLoggerFn(log)
}
// Read reads the storage request and returns a cursor to access results.
func (s *StorageStoreMock) Read(ctx context.Context, req *storage.ReadRequest) (storage.Results, error) {
return s.ReadFn(ctx, req)
}
// StorageResultsMock implements the storage.Results interface providing the
// ability to emit mock results from calls to the StorageStoreMock.Read method.
type StorageResultsMock struct {
CloseFn func()
NextFn func() bool
CursorFn func() tsdb.Cursor
TagsFn func() models.Tags
}
// NewStorageResultsMock initialises a StorageResultsMock whose methods all return
// their zero value.
func NewStorageResultsMock() *StorageResultsMock {
return &StorageResultsMock{
CloseFn: func() {},
NextFn: func() bool { return false },
CursorFn: func() tsdb.Cursor { return nil },
TagsFn: func() models.Tags { return nil },
}
}
// Close closes the result set.
func (r *StorageResultsMock) Close() { r.CloseFn() }
// Next returns true if there are more results available.
func (r *StorageResultsMock) Next() bool { return r.NextFn() }
// Cursor returns the cursor for the result set.
func (r *StorageResultsMock) Cursor() tsdb.Cursor { return r.CursorFn() }
// Tags returns the series' tag set.
func (r *StorageResultsMock) Tags() models.Tags { return r.TagsFn() }

View File

@ -4,20 +4,28 @@ import (
"errors"
"fmt"
"math"
"regexp"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxql"
"github.com/influxdata/influxdb/services/storage"
)
const (
// measurementName is where all prometheus time series go to
measurementName = "_"
// measurementName is the default name used if no Prometheus name can be found on write
measurementName = "prom_metric_not_specified"
// fieldName is the field all prometheus values get written to
fieldName = "f64"
fieldName = "value"
// fieldTagKey is the tag key that all field names use in the new storage processor
fieldTagKey = "_field"
// prometheusNameTag is the tag key that Prometheus uses for metric names
prometheusNameTag = "__name__"
// measurementTagKey is the tag key that all measurement names use in the new storage processor
measurementTagKey = "_measurement"
)
var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported")
@ -34,9 +42,14 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
var droppedNaN error
for _, ts := range req.Timeseries {
measurement := measurementName
tags := make(map[string]string, len(ts.Labels))
for _, l := range ts.Labels {
tags[l.Name] = l.Value
if l.Name == prometheusNameTag {
measurement = l.Value
}
}
for _, s := range ts.Samples {
@ -49,7 +62,7 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
// convert and append
t := time.Unix(0, s.TimestampMs*int64(time.Millisecond))
fields := map[string]interface{}{fieldName: s.Value}
p, err := models.NewPoint(measurementName, models.NewTags(tags), fields, t)
p, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
if err != nil {
return nil, err
}
@ -60,109 +73,184 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
return points, droppedNaN
}
// ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL
// query that will return the requested data when executed
func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) {
// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the
// new storage API that IFQL uses.
func ReadRequestToInfluxStorageRequest(req *remote.ReadRequest, db, rp string) (*storage.ReadRequest, error) {
if len(req.Queries) != 1 {
return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
}
promQuery := req.Queries[0]
q := req.Queries[0]
stmt := &influxql.SelectStatement{
IsRawQuery: true,
Fields: []*influxql.Field{
{Expr: &influxql.VarRef{Val: fieldName}},
},
Sources: []influxql.Source{&influxql.Measurement{
Name: measurementName,
Database: db,
RetentionPolicy: rp,
}},
Dimensions: []*influxql.Dimension{{Expr: &influxql.Wildcard{}}},
if rp != "" {
db = db + "/" + rp
}
cond, err := condFromMatchers(promQuery, promQuery.Matchers)
sreq := &storage.ReadRequest{
Database: db,
TimestampRange: storage.TimestampRange{
Start: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond)).UnixNano(),
End: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond)).UnixNano(),
},
PointsLimit: math.MaxInt64,
}
pred, err := predicateFromMatchers(q.Matchers)
if err != nil {
return nil, err
}
stmt.Condition = cond
return &influxql.Query{Statements: []influxql.Statement{stmt}}, nil
sreq.Predicate = pred
return sreq, nil
}
// condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr
func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) {
var op influxql.Token
var rhs influxql.Expr
// RemoveInfluxSystemTags will remove tags that are Influx internal (_measurement and _field)
func RemoveInfluxSystemTags(tags models.Tags) models.Tags {
var t models.Tags
for _, tt := range tags {
if string(tt.Key) == measurementTagKey || string(tt.Key) == fieldTagKey {
continue
}
t = append(t, tt)
}
return t
}
// predicateFromMatchers takes Prometheus label matchers and converts them to a storage
// predicate that works with the schema that is written in, which assumes a single field
// named value
func predicateFromMatchers(matchers []*remote.LabelMatcher) (*storage.Predicate, error) {
left, err := nodeFromMatchers(matchers)
if err != nil {
return nil, err
}
right := fieldNode()
return &storage.Predicate{
Root: &storage.Node{
NodeType: storage.NodeTypeLogicalExpression,
Value: &storage.Node_Logical_{Logical: storage.LogicalAnd},
Children: []*storage.Node{left, right},
},
}, nil
}
// fieldNode returns a storage.Node that will match that the fieldTagKey == fieldName
// which matches how Prometheus data is fed into the system
func fieldNode() *storage.Node {
children := []*storage.Node{
&storage.Node{
NodeType: storage.NodeTypeTagRef,
Value: &storage.Node_TagRefValue{
TagRefValue: fieldTagKey,
},
},
&storage.Node{
NodeType: storage.NodeTypeLiteral,
Value: &storage.Node_StringValue{
StringValue: fieldName,
},
},
}
return &storage.Node{
NodeType: storage.NodeTypeComparisonExpression,
Value: &storage.Node_Comparison_{Comparison: storage.ComparisonEqual},
Children: children,
}
}
func nodeFromMatchers(matchers []*remote.LabelMatcher) (*storage.Node, error) {
if len(matchers) == 0 {
return nil, errors.New("expected matcher")
} else if len(matchers) == 1 {
return nodeFromMatcher(matchers[0])
}
left, err := nodeFromMatcher(matchers[0])
if err != nil {
return nil, err
}
right, err := nodeFromMatchers(matchers[1:])
if err != nil {
return nil, err
}
children := []*storage.Node{left, right}
return &storage.Node{
NodeType: storage.NodeTypeLogicalExpression,
Value: &storage.Node_Logical_{Logical: storage.LogicalAnd},
Children: children,
}, nil
}
func nodeFromMatcher(m *remote.LabelMatcher) (*storage.Node, error) {
var op storage.Node_Comparison
switch m.Type {
case remote.MatchType_EQUAL:
op = influxql.EQ
op = storage.ComparisonEqual
case remote.MatchType_NOT_EQUAL:
op = influxql.NEQ
op = storage.ComparisonNotEqual
case remote.MatchType_REGEX_MATCH:
op = influxql.EQREGEX
op = storage.ComparisonRegex
case remote.MatchType_REGEX_NO_MATCH:
op = influxql.NEQREGEX
op = storage.ComparisonNotRegex
default:
return nil, fmt.Errorf("unknown match type %v", m.Type)
}
if op == influxql.EQREGEX || op == influxql.NEQREGEX {
re, err := regexp.Compile(m.Value)
if err != nil {
return nil, err
}
// Convert regex values to InfluxDB format.
rhs = &influxql.RegexLiteral{Val: re}
} else {
rhs = &influxql.StringLiteral{Val: m.Value}
name := m.Name
if m.Name == prometheusNameTag {
name = measurementTagKey
}
return &influxql.BinaryExpr{
Op: op,
LHS: &influxql.VarRef{Val: m.Name},
RHS: rhs,
left := &storage.Node{
NodeType: storage.NodeTypeTagRef,
Value: &storage.Node_TagRefValue{
TagRefValue: name,
},
}
var right *storage.Node
if op == storage.ComparisonRegex || op == storage.ComparisonNotRegex {
right = &storage.Node{
NodeType: storage.NodeTypeLiteral,
Value: &storage.Node_RegexValue{
RegexValue: m.Value,
},
}
} else {
right = &storage.Node{
NodeType: storage.NodeTypeLiteral,
Value: &storage.Node_StringValue{
StringValue: m.Value,
},
}
}
children := []*storage.Node{left, right}
return &storage.Node{
NodeType: storage.NodeTypeComparisonExpression,
Value: &storage.Node_Comparison_{Comparison: op},
Children: children,
}, nil
}
// condFromMatchers converts a Prometheus remote query and a collection of Prometheus label matchers
// into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus
// remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels
// are kept equivalent.
func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) {
if len(matchers) > 0 {
lhs, err := condFromMatcher(matchers[0])
if err != nil {
return nil, err
// ModelTagsToLabelPairs converts models.Tags to a slice of Prometheus label pairs
func ModelTagsToLabelPairs(tags models.Tags) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
for _, t := range tags {
if string(t.Value) == "" {
continue
}
rhs, err := condFromMatchers(q, matchers[1:])
if err != nil {
return nil, err
}
return &influxql.BinaryExpr{
Op: influxql.AND,
LHS: lhs,
RHS: rhs,
}, nil
pairs = append(pairs, &remote.LabelPair{
Name: string(t.Key),
Value: string(t.Value),
})
}
return &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.BinaryExpr{
Op: influxql.GTE,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond))},
},
RHS: &influxql.BinaryExpr{
Op: influxql.LTE,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond))},
},
}, nil
return pairs
}
// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs

View File

@ -1,97 +0,0 @@
package prometheus_test
import (
"errors"
"reflect"
"testing"
"github.com/influxdata/influxdb/prometheus"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxql"
)
func TestReadRequestToInfluxQLQuery(t *testing.T) {
examples := []struct {
name string
queries []*remote.Query
expQuery string
expError error
}{
{
name: "too many queries",
queries: []*remote.Query{{}, {}}, // Multiple queries
expError: errors.New("Prometheus read endpoint currently only supports one query at a time"),
},
{
name: "single condition",
queries: []*remote.Query{{
StartTimestampMs: 1,
EndTimestampMs: 100,
Matchers: []*remote.LabelMatcher{
{Name: "region", Value: "west", Type: remote.MatchType_EQUAL},
},
}},
expQuery: "SELECT f64 FROM db0.rp0._ WHERE region = 'west' AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.1Z' GROUP BY *",
},
{
name: "multiple conditions",
queries: []*remote.Query{{
StartTimestampMs: 1,
EndTimestampMs: 100,
Matchers: []*remote.LabelMatcher{
{Name: "region", Value: "west", Type: remote.MatchType_EQUAL},
{Name: "host", Value: "serverA", Type: remote.MatchType_NOT_EQUAL},
},
}},
expQuery: "SELECT f64 FROM db0.rp0._ WHERE region = 'west' AND host != 'serverA' AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.1Z' GROUP BY *",
},
{
name: "rewrite regex",
queries: []*remote.Query{{
StartTimestampMs: 1,
EndTimestampMs: 100,
Matchers: []*remote.LabelMatcher{
{Name: "region", Value: "c.*", Type: remote.MatchType_REGEX_MATCH},
{Name: "host", Value: `\d`, Type: remote.MatchType_REGEX_NO_MATCH},
},
}},
expQuery: `SELECT f64 FROM db0.rp0._ WHERE region =~ /c.*/ AND host !~ /\d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.1Z' GROUP BY *`,
},
{
name: "escape regex",
queries: []*remote.Query{{
StartTimestampMs: 1,
EndTimestampMs: 100,
Matchers: []*remote.LabelMatcher{
{Name: "test_type", Value: "a/b", Type: remote.MatchType_REGEX_MATCH},
},
}},
expQuery: `SELECT f64 FROM db0.rp0._ WHERE test_type =~ /a\/b/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.1Z' GROUP BY *`,
},
}
for _, example := range examples {
t.Run(example.name, func(t *testing.T) {
readRequest := &remote.ReadRequest{Queries: example.queries}
query, err := prometheus.ReadRequestToInfluxQLQuery(readRequest, "db0", "rp0")
if !reflect.DeepEqual(err, example.expError) {
t.Errorf("got error %v, expected %v", err, example.expError)
}
var queryString string
if query != nil {
queryString = query.String()
}
if queryString != example.expQuery {
t.Errorf("got query %v, expected %v", queryString, example.expQuery)
}
if queryString != "" {
if _, err := influxql.ParseStatement(queryString); err != nil {
t.Error(err)
}
}
})
}
}

View File

@ -3,6 +3,7 @@ package httpd
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"expvar"
@ -32,6 +33,7 @@ import (
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/uuid"
"github.com/influxdata/influxql"
@ -108,6 +110,8 @@ type Handler struct {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
Store Store
Config *Config
Logger *zap.Logger
CLFLogger *log.Logger
@ -125,6 +129,7 @@ func NewHandler(c Config) *Handler {
Config: &c,
Logger: zap.NewNop(),
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
Store: storage.NewStore(),
stats: &Statistics{},
requestTracker: NewRequestTracker(),
}
@ -967,8 +972,8 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
h.writeHeader(w, http.StatusNoContent)
}
// servePromRead will convert a Prometheus remote read request into an InfluxQL query and
// return data in Prometheus remote read protobuf format.
// servePromRead will convert a Prometheus remote read request into a storage
// query and returns data in Prometheus remote read protobuf format.
func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
@ -990,105 +995,82 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
// Query the DB and create a ReadResponse for Prometheus
db := r.FormValue("db")
q, err := prometheus.ReadRequestToInfluxQLQuery(&req, db, r.FormValue("rp"))
rp := r.FormValue("rp")
readRequest, err := prometheus.ReadRequestToInfluxStorageRequest(&req, db, rp)
if err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}
// Check authorization.
if h.Config.AuthEnabled {
if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil {
if err, ok := err.(meta.ErrAuthorize); ok {
h.Logger.Info("Unauthorized request",
zap.String("user", err.User),
zap.Stringer("query", err.Query),
logger.Database(err.Database))
}
h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden)
return
}
ctx := context.Background()
rs, err := h.Store.Read(ctx, readRequest)
if err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}
opts := query.ExecutionOptions{
Database: db,
ChunkSize: DefaultChunkSize,
ReadOnly: true,
}
if h.Config.AuthEnabled {
// The current user determines the authorized actions.
opts.Authorizer = user
} else {
// Auth is disabled, so allow everything.
opts.Authorizer = query.OpenAuthorizer
}
// Make sure if the client disconnects we signal the query to abort
closing := make(chan struct{})
if notifier, ok := w.(http.CloseNotifier); ok {
// CloseNotify() is not guaranteed to send a notification when the query
// is closed. Use this channel to signal that the query is finished to
// prevent lingering goroutines that may be stuck.
done := make(chan struct{})
defer close(done)
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-done:
case <-notify:
close(closing)
}
}()
opts.AbortCh = done
} else {
defer close(closing)
}
// Execute query.
results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
defer rs.Close()
resp := &remote.ReadResponse{
Results: []*remote.QueryResult{{}},
}
// pull all results from the channel
for r := range results {
// Ignore nil results.
if r == nil {
for rs.Next() {
cur := rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
// read the series data and convert into Prometheus samples
for _, s := range r.Series {
ts := &remote.TimeSeries{
Labels: prometheus.TagsToLabelPairs(s.Tags),
tags := prometheus.RemoveInfluxSystemTags(rs.Tags())
var unsupportedCursor string
switch cur := cur.(type) {
case tsdb.FloatBatchCursor:
var series *remote.TimeSeries
for {
ts, vs := cur.Next()
if len(ts) == 0 {
break
}
// We have some data for this series.
if series == nil {
series = &remote.TimeSeries{
Labels: prometheus.ModelTagsToLabelPairs(tags),
}
}
for i, ts := range ts {
series.Samples = append(series.Samples, &remote.Sample{
TimestampMs: ts / int64(time.Millisecond),
Value: vs[i],
})
}
}
for _, v := range s.Values {
t, ok := v[0].(time.Time)
if !ok {
h.httpError(w, fmt.Sprintf("value %v wasn't a time", v[0]), http.StatusBadRequest)
return
}
val, ok := v[1].(float64)
if !ok {
h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest)
}
timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond)
ts.Samples = append(ts.Samples, &remote.Sample{
TimestampMs: timestamp,
Value: val,
})
// There was data for the series.
if series != nil {
resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, series)
}
case tsdb.IntegerBatchCursor:
unsupportedCursor = "int64"
case tsdb.UnsignedBatchCursor:
unsupportedCursor = "uint"
case tsdb.BooleanBatchCursor:
unsupportedCursor = "bool"
case tsdb.StringBatchCursor:
unsupportedCursor = "string"
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
cur.Close()
resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
if len(unsupportedCursor) > 0 {
h.Logger.Info("Prometheus can't read cursor",
zap.String("cursor_type", unsupportedCursor),
zap.Stringer("series", tags),
)
}
}
data, err := proto.Marshal(resp)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
@ -1607,6 +1589,12 @@ func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
})
}
// Store describes the behaviour of the storage packages Store type.
type Store interface {
Read(ctx context.Context, req *storage.ReadRequest) (storage.Results, error)
WithLogger(log *zap.Logger)
}
// Response represents a list of statement results.
type Response struct {
Results []*query.Result

View File

@ -21,12 +21,15 @@ import (
"github.com/dgrijalva/jwt-go"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/httpd"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
)
@ -576,7 +579,7 @@ func TestHandler_PromWrite(t *testing.T) {
if err != nil {
t.Fatal(err.Error())
}
expFields := models.Fields{"f64": 1.2}
expFields := models.Fields{"value": 1.2}
if !reflect.DeepEqual(fields, expFields) {
t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields)
}
@ -599,10 +602,11 @@ func TestHandler_PromRead(t *testing.T) {
req := &remote.ReadRequest{
Queries: []*remote.Query{{
Matchers: []*remote.LabelMatcher{
{Type: remote.MatchType_EQUAL, Name: "eq", Value: "a"},
{Type: remote.MatchType_NOT_EQUAL, Name: "neq", Value: "b"},
{Type: remote.MatchType_REGEX_MATCH, Name: "regex", Value: "c"},
{Type: remote.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"},
{
Type: remote.MatchType_EQUAL,
Name: "__name__",
Value: "value",
},
},
StartTimestampMs: 1,
EndTimestampMs: 2,
@ -614,31 +618,116 @@ func TestHandler_PromRead(t *testing.T) {
}
compressed := snappy.Encode(nil, data)
b := bytes.NewReader(compressed)
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ /c/ AND neqregex !~ /d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` {
t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` {
t.Fatalf("unexpected db: %s", ctx.Database)
}
row := &models.Row{
Name: "_",
Tags: map[string]string{"foo": "bar"},
Columns: []string{"time", "f64"},
Values: [][]interface{}{{time.Unix(23, 0), 1.2}},
}
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{row})}
return nil
}
w := httptest.NewRecorder()
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
// Number of results in the result set
var i int64
h.Store.ResultSet.NextFn = func() bool {
i++
return i <= 2
}
// data for each cursor.
h.Store.ResultSet.CursorFn = func() tsdb.Cursor {
cursor := internal.NewFloatBatchCursorMock()
var i int64
cursor.NextFn = func() ([]int64, []float64) {
i++
ts := []int64{22000000 * i, 10000000000 * i}
vs := []float64{2.3, 2992.33}
if i > 2 {
ts, vs = nil, nil
}
return ts, vs
}
return cursor
}
// Tags for each cursor.
h.Store.ResultSet.TagsFn = func() models.Tags {
return models.NewTags(map[string]string{
"host": fmt.Sprintf("server-%d", i),
"_measurement": "mem",
})
}
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b))
if w.Code != http.StatusOK {
t.Fatalf("unexpected status: %d", w.Code)
}
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
if err != nil {
t.Fatal(err)
}
var resp remote.ReadResponse
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
t.Fatal(err)
}
expResults := []*remote.QueryResult{
{
Timeseries: []*remote.TimeSeries{
{
Labels: []*remote.LabelPair{
{Name: "host", Value: "server-1"},
},
Samples: []*remote.Sample{
{TimestampMs: 22, Value: 2.3},
{TimestampMs: 10000, Value: 2992.33},
{TimestampMs: 44, Value: 2.3},
{TimestampMs: 20000, Value: 2992.33},
},
},
{
Labels: []*remote.LabelPair{
{Name: "host", Value: "server-2"},
},
Samples: []*remote.Sample{
{TimestampMs: 22, Value: 2.3},
{TimestampMs: 10000, Value: 2992.33},
{TimestampMs: 44, Value: 2.3},
{TimestampMs: 20000, Value: 2992.33},
},
},
},
},
}
if !reflect.DeepEqual(resp.Results, expResults) {
t.Fatalf("Results differ:\n%v", cmp.Diff(resp.Results, expResults))
}
}
func TestHandler_PromRead_NoResults(t *testing.T) {
req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{
Matchers: []*remote.LabelMatcher{
{
Type: remote.MatchType_EQUAL,
Name: "__name__",
Value: "value",
},
},
StartTimestampMs: 0,
EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond),
}}}
data, err := proto.Marshal(req)
if err != nil {
t.Fatal("couldn't marshal prometheus request")
}
compressed := snappy.Encode(nil, data)
h := NewHandler(false)
w := httptest.NewRecorder()
b := bytes.NewReader(compressed)
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
if w.Code != http.StatusOK {
t.Fatalf("unexpected status: %d", w.Code)
}
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
if err != nil {
t.Fatal(err.Error())
@ -648,17 +737,66 @@ func TestHandler_PromRead(t *testing.T) {
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
t.Fatal(err.Error())
}
}
expLabels := []*remote.LabelPair{{Name: "foo", Value: "bar"}}
expSamples := []*remote.Sample{{TimestampMs: 23000, Value: 1.2}}
ts := resp.Results[0].Timeseries[0]
if !reflect.DeepEqual(expLabels, ts.Labels) {
t.Fatalf("unexpected labels\n\texp: %v\n\tgot: %v", expLabels, ts.Labels)
func TestHandler_PromRead_UnsupportedCursors(t *testing.T) {
req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{
Matchers: []*remote.LabelMatcher{
{
Type: remote.MatchType_EQUAL,
Name: "__name__",
Value: "value",
},
},
StartTimestampMs: 0,
EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond),
}}}
data, err := proto.Marshal(req)
if err != nil {
t.Fatal("couldn't marshal prometheus request")
}
if !reflect.DeepEqual(expSamples, ts.Samples) {
t.Fatalf("unexpectd samples\n\texp: %v\n\tgot: %v", expSamples, ts.Samples)
compressed := snappy.Encode(nil, data)
unsupported := []tsdb.Cursor{
internal.NewIntegerBatchCursorMock(),
internal.NewBooleanBatchCursorMock(),
internal.NewUnsignedBatchCursorMock(),
internal.NewStringBatchCursorMock(),
}
for _, cursor := range unsupported {
h := NewHandler(false)
w := httptest.NewRecorder()
var lb bytes.Buffer
h.Logger = logger.New(&lb)
more := true
h.Store.ResultSet.NextFn = func() bool { defer func() { more = false }(); return more }
// Set the cursor type that will be returned while iterating over
// the mock store.
h.Store.ResultSet.CursorFn = func() tsdb.Cursor {
return cursor
}
b := bytes.NewReader(compressed)
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
if w.Code != http.StatusOK {
t.Fatalf("unexpected status: %d", w.Code)
}
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
if err != nil {
t.Fatal(err.Error())
}
var resp remote.ReadResponse
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
t.Fatal(err.Error())
}
if !strings.Contains(lb.String(), "cursor_type=") {
t.Fatalf("got log message %q, expected to contain \"cursor_type\"", lb.String())
}
}
}
@ -1031,6 +1169,7 @@ type Handler struct {
StatementExecutor HandlerStatementExecutor
QueryAuthorizer HandlerQueryAuthorizer
PointsWriter HandlerPointsWriter
Store *internal.StorageStoreMock
}
// NewHandler returns a new instance of Handler.
@ -1047,14 +1186,23 @@ func NewHandlerWithConfig(config httpd.Config) *Handler {
}
h.MetaClient = &internal.MetaClientMock{}
h.Store = internal.NewStorageStoreMock()
h.Handler.MetaClient = h.MetaClient
h.Handler.Store = h.Store
h.Handler.QueryExecutor = query.NewExecutor()
h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
h.Handler.QueryAuthorizer = &h.QueryAuthorizer
h.Handler.PointsWriter = &h.PointsWriter
h.Handler.Version = "0.0.0"
h.Handler.BuildType = "OSS"
if testing.Verbose() {
l := logger.New(os.Stdout)
h.Handler.Logger = l
h.Handler.Store.WithLogger(l)
}
return h
}

View File

@ -201,6 +201,7 @@ func (s *Service) Close() error {
func (s *Service) WithLogger(log *zap.Logger) {
s.Logger = log.With(zap.String("service", "httpd"))
s.Handler.Logger = s.Logger
s.Handler.Store.WithLogger(s.Logger)
}
// Err returns a channel for fatal errors that occur on the listener.

View File

@ -22,12 +22,21 @@ type ResultSet struct {
mb *multiShardBatchCursors
}
// Close closes the result set. Close is idempotent.
func (r *ResultSet) Close() {
if r == nil {
return // Nothing to do.
}
r.row.query = nil
r.cur.Close()
}
// Next returns true if there are more results available.
func (r *ResultSet) Next() bool {
if r == nil {
return false
}
row := r.cur.Next()
if row == nil {
return false

View File

@ -82,7 +82,14 @@ func (s *Store) validateArgs(database string, start, end int64) (string, string,
return database, rp, start, end, nil
}
func (s *Store) Read(ctx context.Context, req *ReadRequest) (*ResultSet, error) {
type Results interface {
Close()
Next() bool
Cursor() tsdb.Cursor
Tags() models.Tags
}
func (s *Store) Read(ctx context.Context, req *ReadRequest) (Results, error) {
if len(req.GroupKeys) > 0 {
panic("Read: len(Grouping) > 0")
}
@ -97,14 +104,14 @@ func (s *Store) Read(ctx context.Context, req *ReadRequest) (*ResultSet, error)
return nil, err
}
if len(shardIDs) == 0 {
return nil, nil
return (*ResultSet)(nil), nil
}
var cur seriesCursor
if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
return nil, err
} else if ic == nil {
return nil, nil
return (*ResultSet)(nil), nil
} else {
cur = ic
}

View File

@ -1,6 +1,7 @@
package tests
import (
"bytes"
"encoding/json"
"flag"
"fmt"
@ -8,14 +9,19 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/tsdb"
)
@ -9502,6 +9508,199 @@ func TestServer_NestedAggregateWithMathPanics(t *testing.T) {
}
}
func TestServer_Prometheus_Read(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
writes := []string{
`mem,host=server-1,region=west value=2.34 119000000000`,
`mem,host=server-1,region=west value=988.0 119500000000`,
`mem,host=server-1,region=south value=121.2 120000000000`,
`mem,host=server-2,region=east value=1.1 120000000000`,
`cpu,region=south value=200 119000000000`,
`mem,host=server-1,region=north value=10.00 121000000000`,
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
req := &remote.ReadRequest{
Queries: []*remote.Query{{
Matchers: []*remote.LabelMatcher{
{
Type: remote.MatchType_EQUAL,
Name: "__name__",
Value: "mem",
},
// TODO(edd): awaiting negation bugfix in tsdb.IndexSet.
// {
// Type: remote.MatchType_NOT_EQUAL,
// Name: "host",
// Value: "server-2",
// },
{
Type: remote.MatchType_REGEX_MATCH,
Name: "host",
Value: "server-1$",
},
// TODO(edd): awaiting negation bugfix in tsdb.IndexSet.
// {
// Type: remote.MatchType_REGEX_NO_MATCH,
// Name: "region",
// Value: "south",
// },
},
StartTimestampMs: 119000,
EndTimestampMs: 120010,
}},
}
data, err := proto.Marshal(req)
if err != nil {
t.Fatal("couldn't marshal prometheus request")
}
compressed := snappy.Encode(nil, data)
b := bytes.NewReader(compressed)
resp, err := http.Post(s.URL()+"/api/v1/prom/read?db=db0&rp=rp0", "", b)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d. Body: %s", resp.StatusCode, MustReadAll(resp.Body))
}
reqBuf, err := snappy.Decode(nil, MustReadAll(resp.Body))
if err != nil {
t.Fatal(err)
}
var promResp remote.ReadResponse
if err := proto.Unmarshal(reqBuf, &promResp); err != nil {
t.Fatal(err.Error())
}
expResults := []*remote.QueryResult{
{
Timeseries: []*remote.TimeSeries{
{
Labels: []*remote.LabelPair{
{Name: "host", Value: "server-1"},
{Name: "region", Value: "south"},
},
Samples: []*remote.Sample{
{TimestampMs: 120000, Value: 121.2},
},
},
{
Labels: []*remote.LabelPair{
{Name: "host", Value: "server-1"},
{Name: "region", Value: "west"},
},
Samples: []*remote.Sample{
{TimestampMs: 119000, Value: 2.34},
{TimestampMs: 119500, Value: 988.00},
},
},
},
},
}
if !reflect.DeepEqual(promResp.Results, expResults) {
t.Fatalf("Results differ:\n%v", cmp.Diff(expResults, promResp.Results))
}
}
func TestServer_Prometheus_Write(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
test := NewTest("db0", "rp0")
now := now().Round(time.Millisecond)
test.addQueries(
&Query{
name: "selecting the data should return it",
command: `SELECT * FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","__name__","host","value"],"values":[["%s","cpu","a",100],["%s","cpu","b",200]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(10*time.Millisecond).Format(time.RFC3339Nano)),
},
)
req := &remote.WriteRequest{
Timeseries: []*remote.TimeSeries{
{
Labels: []*remote.LabelPair{
{Name: "__name__", Value: "cpu"},
{Name: "host", Value: "a"},
},
Samples: []*remote.Sample{
{TimestampMs: now.UnixNano() / int64(time.Millisecond), Value: 100.0},
},
},
{
Labels: []*remote.LabelPair{
{Name: "__name__", Value: "cpu"},
{Name: "host", Value: "b"},
},
Samples: []*remote.Sample{
{TimestampMs: now.UnixNano()/int64(time.Millisecond) + 10, Value: 200.0},
},
},
},
}
data, err := proto.Marshal(req)
if err != nil {
t.Fatal("couldn't marshal prometheus request")
}
compressed := snappy.Encode(nil, data)
b := bytes.NewReader(compressed)
resp, err := http.Post(s.URL()+"/api/v1/prom/write?db=db0&rp=rp0", "", b)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("unexpected status: %d. Body: %s", resp.StatusCode, MustReadAll(resp.Body))
}
for i, query := range test.queries {
t.Run(query.name, func(t *testing.T) {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Skipf("SKIP:: %s", query.name)
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
})
}
}
func init() {
// Force uint support to be enabled for testing.
models.EnableUintSupport()

View File

@ -1361,7 +1361,6 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
}
if !valEqual(ve) {
continue
}
tagMatch = true

View File

@ -818,7 +818,6 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt
if err != nil {
return nil, err
}
switch m.SystemIterator {
case "_fieldKeys":
return NewFieldKeysIterator(s, opt)