feat(prometheus): update prometheus remote protocol (#17814)
Fetched up-to-date protocol from prometheus projectpull/18961/head
parent
3f3b7b5160
commit
6910c53440
20
go.mod
20
go.mod
|
@ -3,7 +3,6 @@ module github.com/influxdata/influxdb
|
|||
go 1.13
|
||||
|
||||
require (
|
||||
cloud.google.com/go/bigtable v1.2.0 // indirect
|
||||
collectd.org v0.3.0
|
||||
github.com/BurntSushi/toml v0.3.1
|
||||
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db
|
||||
|
@ -13,8 +12,6 @@ require (
|
|||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
|
||||
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd // indirect
|
||||
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
github.com/golang/snappy v0.0.1
|
||||
github.com/google/go-cmp v0.5.0
|
||||
|
@ -24,31 +21,24 @@ require (
|
|||
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
|
||||
github.com/jsternberg/zap-logfmt v1.2.0
|
||||
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef
|
||||
github.com/klauspost/compress v1.4.0 // indirect
|
||||
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5 // indirect
|
||||
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
|
||||
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada
|
||||
github.com/mattn/go-isatty v0.0.12
|
||||
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect
|
||||
github.com/opentracing/opentracing-go v1.1.0
|
||||
github.com/paulbellamy/ratecounter v0.2.0
|
||||
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f
|
||||
github.com/philhofer/fwd v1.0.0 // indirect
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.5.1
|
||||
github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c
|
||||
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52
|
||||
github.com/segmentio/kafka-go v0.2.0 // indirect
|
||||
github.com/smartystreets/goconvey v1.6.4 // indirect
|
||||
github.com/spf13/cast v1.3.0
|
||||
github.com/tinylib/msgp v1.1.0
|
||||
github.com/willf/bitset v1.1.9 // indirect
|
||||
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6
|
||||
go.uber.org/zap v1.14.1
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
|
||||
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82
|
||||
golang.org/x/crypto v0.0.0-20200422194213-44a606286825
|
||||
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f
|
||||
golang.org/x/text v0.3.2
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||
gonum.org/v1/gonum v0.6.0 // indirect
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
|
||||
google.golang.org/grpc v1.29.1
|
||||
)
|
||||
|
|
|
@ -8,9 +8,9 @@ import (
|
|||
|
||||
"github.com/gogo/protobuf/types"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/services/storage"
|
||||
"github.com/influxdata/influxdb/storage/reads/datatypes"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -45,7 +45,7 @@ func (e DroppedValuesError) Error() string {
|
|||
|
||||
// WriteRequestToPoints converts a Prometheus remote write request of time series and their
|
||||
// samples into Points that can be written into Influx
|
||||
func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
|
||||
func WriteRequestToPoints(req *prompb.WriteRequest) ([]models.Point, error) {
|
||||
var maxPoints int
|
||||
for _, ts := range req.Timeseries {
|
||||
maxPoints += len(ts.Samples)
|
||||
|
@ -79,7 +79,7 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
|
|||
}
|
||||
|
||||
// convert and append
|
||||
t := time.Unix(0, s.TimestampMs*int64(time.Millisecond))
|
||||
t := time.Unix(0, s.Timestamp*int64(time.Millisecond))
|
||||
fields := map[string]interface{}{fieldName: s.Value}
|
||||
p, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
|
||||
if err != nil {
|
||||
|
@ -97,7 +97,7 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
|
|||
|
||||
// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the
|
||||
// new storage API that IFQL uses.
|
||||
func ReadRequestToInfluxStorageRequest(req *remote.ReadRequest, db, rp string) (*datatypes.ReadFilterRequest, error) {
|
||||
func ReadRequestToInfluxStorageRequest(req *prompb.ReadRequest, db, rp string) (*datatypes.ReadFilterRequest, error) {
|
||||
if len(req.Queries) != 1 {
|
||||
return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ func RemoveInfluxSystemTags(tags models.Tags) models.Tags {
|
|||
// predicateFromMatchers takes Prometheus label matchers and converts them to a storage
|
||||
// predicate that works with the schema that is written in, which assumes a single field
|
||||
// named value
|
||||
func predicateFromMatchers(matchers []*remote.LabelMatcher) (*datatypes.Predicate, error) {
|
||||
func predicateFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Predicate, error) {
|
||||
left, err := nodeFromMatchers(matchers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -182,7 +182,7 @@ func fieldNode() *datatypes.Node {
|
|||
}
|
||||
}
|
||||
|
||||
func nodeFromMatchers(matchers []*remote.LabelMatcher) (*datatypes.Node, error) {
|
||||
func nodeFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Node, error) {
|
||||
if len(matchers) == 0 {
|
||||
return nil, errors.New("expected matcher")
|
||||
} else if len(matchers) == 1 {
|
||||
|
@ -207,16 +207,16 @@ func nodeFromMatchers(matchers []*remote.LabelMatcher) (*datatypes.Node, error)
|
|||
}, nil
|
||||
}
|
||||
|
||||
func nodeFromMatcher(m *remote.LabelMatcher) (*datatypes.Node, error) {
|
||||
func nodeFromMatcher(m *prompb.LabelMatcher) (*datatypes.Node, error) {
|
||||
var op datatypes.Node_Comparison
|
||||
switch m.Type {
|
||||
case remote.MatchType_EQUAL:
|
||||
case prompb.LabelMatcher_EQ:
|
||||
op = datatypes.ComparisonEqual
|
||||
case remote.MatchType_NOT_EQUAL:
|
||||
case prompb.LabelMatcher_NEQ:
|
||||
op = datatypes.ComparisonNotEqual
|
||||
case remote.MatchType_REGEX_MATCH:
|
||||
case prompb.LabelMatcher_RE:
|
||||
op = datatypes.ComparisonRegex
|
||||
case remote.MatchType_REGEX_NO_MATCH:
|
||||
case prompb.LabelMatcher_NRE:
|
||||
op = datatypes.ComparisonNotRegex
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown match type %v", m.Type)
|
||||
|
@ -261,13 +261,13 @@ func nodeFromMatcher(m *remote.LabelMatcher) (*datatypes.Node, error) {
|
|||
}
|
||||
|
||||
// ModelTagsToLabelPairs converts models.Tags to a slice of Prometheus label pairs
|
||||
func ModelTagsToLabelPairs(tags models.Tags) []*remote.LabelPair {
|
||||
pairs := make([]*remote.LabelPair, 0, len(tags))
|
||||
func ModelTagsToLabelPairs(tags models.Tags) []prompb.Label {
|
||||
pairs := make([]prompb.Label, 0, len(tags))
|
||||
for _, t := range tags {
|
||||
if string(t.Value) == "" {
|
||||
continue
|
||||
}
|
||||
pairs = append(pairs, &remote.LabelPair{
|
||||
pairs = append(pairs, prompb.Label{
|
||||
Name: string(t.Key),
|
||||
Value: string(t.Value),
|
||||
})
|
||||
|
@ -276,8 +276,8 @@ func ModelTagsToLabelPairs(tags models.Tags) []*remote.LabelPair {
|
|||
}
|
||||
|
||||
// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs
|
||||
func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
|
||||
pairs := make([]*remote.LabelPair, 0, len(tags))
|
||||
func TagsToLabelPairs(tags map[string]string) []*prompb.Label {
|
||||
pairs := make([]*prompb.Label, 0, len(tags))
|
||||
for k, v := range tags {
|
||||
if v == "" {
|
||||
// If we select metrics with different sets of labels names,
|
||||
|
@ -288,7 +288,7 @@ func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
|
|||
// to make the result correct.
|
||||
continue
|
||||
}
|
||||
pairs = append(pairs, &remote.LabelPair{
|
||||
pairs = append(pairs, &prompb.Label{
|
||||
Name: k,
|
||||
Value: v,
|
||||
})
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
package remote
|
||||
|
||||
//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. remote.proto
|
File diff suppressed because it is too large
Load Diff
|
@ -1,70 +0,0 @@
|
|||
// This file is copied (except for package name) from https://github.com/prometheus/prometheus/blob/master/storage/remote/remote.proto
|
||||
|
||||
// Copyright 2016 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package remote;
|
||||
|
||||
message Sample {
|
||||
double value = 1;
|
||||
int64 timestamp_ms = 2;
|
||||
}
|
||||
|
||||
message LabelPair {
|
||||
string name = 1;
|
||||
string value = 2;
|
||||
}
|
||||
|
||||
message TimeSeries {
|
||||
repeated LabelPair labels = 1;
|
||||
// Sorted by time, oldest sample first.
|
||||
repeated Sample samples = 2;
|
||||
}
|
||||
|
||||
message WriteRequest {
|
||||
repeated TimeSeries timeseries = 1;
|
||||
}
|
||||
|
||||
message ReadRequest {
|
||||
repeated Query queries = 1;
|
||||
}
|
||||
|
||||
message ReadResponse {
|
||||
// In same order as the request's queries.
|
||||
repeated QueryResult results = 1;
|
||||
}
|
||||
|
||||
message Query {
|
||||
int64 start_timestamp_ms = 1;
|
||||
int64 end_timestamp_ms = 2;
|
||||
repeated LabelMatcher matchers = 3;
|
||||
}
|
||||
|
||||
enum MatchType {
|
||||
EQUAL = 0;
|
||||
NOT_EQUAL = 1;
|
||||
REGEX_MATCH = 2;
|
||||
REGEX_NO_MATCH = 3;
|
||||
}
|
||||
|
||||
message LabelMatcher {
|
||||
MatchType type = 1;
|
||||
string name = 2;
|
||||
string value = 3;
|
||||
}
|
||||
|
||||
message QueryResult {
|
||||
repeated TimeSeries timeseries = 1;
|
||||
}
|
|
@ -34,7 +34,6 @@ import (
|
|||
"github.com/influxdata/influxdb/monitor"
|
||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||
"github.com/influxdata/influxdb/prometheus"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/services/storage"
|
||||
|
@ -45,6 +44,7 @@ import (
|
|||
"github.com/influxdata/influxql"
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -1146,7 +1146,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
|
|||
}
|
||||
|
||||
// Convert the Prometheus remote write request to Influx Points
|
||||
var req remote.WriteRequest
|
||||
var req prompb.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
|
@ -1217,7 +1217,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
|
|||
return
|
||||
}
|
||||
|
||||
var req remote.ReadRequest
|
||||
var req prompb.ReadRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
|
@ -1244,7 +1244,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
|
|||
return
|
||||
}
|
||||
|
||||
respond := func(resp *remote.ReadResponse) {
|
||||
respond := func(resp *prompb.ReadResponse) {
|
||||
data, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -1270,8 +1270,8 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
|
|||
return
|
||||
}
|
||||
|
||||
resp := &remote.ReadResponse{
|
||||
Results: []*remote.QueryResult{{}},
|
||||
resp := &prompb.ReadResponse{
|
||||
Results: []*prompb.QueryResult{{}},
|
||||
}
|
||||
|
||||
if rs == nil {
|
||||
|
@ -1291,7 +1291,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
|
|||
var unsupportedCursor string
|
||||
switch cur := cur.(type) {
|
||||
case tsdb.FloatArrayCursor:
|
||||
var series *remote.TimeSeries
|
||||
var series *prompb.TimeSeries
|
||||
for {
|
||||
a := cur.Next()
|
||||
if a.Len() == 0 {
|
||||
|
@ -1300,15 +1300,15 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
|
|||
|
||||
// We have some data for this series.
|
||||
if series == nil {
|
||||
series = &remote.TimeSeries{
|
||||
series = &prompb.TimeSeries{
|
||||
Labels: prometheus.ModelTagsToLabelPairs(tags),
|
||||
}
|
||||
}
|
||||
|
||||
for i, ts := range a.Timestamps {
|
||||
series.Samples = append(series.Samples, &remote.Sample{
|
||||
TimestampMs: ts / int64(time.Millisecond),
|
||||
Value: a.Values[i],
|
||||
series.Samples = append(series.Samples, prompb.Sample{
|
||||
Timestamp: ts / int64(time.Millisecond),
|
||||
Value: a.Values[i],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"github.com/influxdata/influxdb/monitor"
|
||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||
"github.com/influxdata/influxdb/pkg/testing/assert"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/services/httpd"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
|
@ -44,6 +43,7 @@ import (
|
|||
"github.com/influxdata/influxdb/storage/reads/datatypes"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
// Ensure the handler returns results from a query (including nil results).
|
||||
|
@ -752,17 +752,17 @@ func TestHandler_Debug_ErrAuthorize(t *testing.T) {
|
|||
|
||||
// Ensure the prometheus remote write works with valid values.
|
||||
func TestHandler_PromWrite(t *testing.T) {
|
||||
req := &remote.WriteRequest{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
req := &prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "host", Value: "a"},
|
||||
{Name: "region", Value: "west"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 1, Value: 1.2},
|
||||
{TimestampMs: 3, Value: 14.5},
|
||||
{TimestampMs: 6, Value: 222.99},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: 1, Value: 1.2},
|
||||
{Timestamp: 3, Value: 14.5},
|
||||
{Timestamp: 6, Value: 222.99},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -795,9 +795,9 @@ func TestHandler_PromWrite(t *testing.T) {
|
|||
}
|
||||
|
||||
expTS := []int64{
|
||||
req.Timeseries[0].Samples[0].TimestampMs * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[1].TimestampMs * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[2].TimestampMs * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[0].Timestamp * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[1].Timestamp * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[2].Timestamp * int64(time.Millisecond),
|
||||
}
|
||||
|
||||
for i, point := range points {
|
||||
|
@ -835,23 +835,23 @@ func TestHandler_PromWrite(t *testing.T) {
|
|||
|
||||
// Ensure the prometheus remote write works with invalid values.
|
||||
func TestHandler_PromWrite_Dropped(t *testing.T) {
|
||||
req := &remote.WriteRequest{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
req := &prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "host", Value: "a"},
|
||||
{Name: "region", Value: "west"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 1, Value: 1.2},
|
||||
{TimestampMs: 2, Value: math.NaN()},
|
||||
{TimestampMs: 3, Value: 14.5},
|
||||
{TimestampMs: 4, Value: math.Inf(-1)},
|
||||
{TimestampMs: 5, Value: math.Inf(1)},
|
||||
{TimestampMs: 6, Value: 222.99},
|
||||
{TimestampMs: 7, Value: math.Inf(-1)},
|
||||
{TimestampMs: 8, Value: math.Inf(1)},
|
||||
{TimestampMs: 9, Value: math.Inf(1)},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: 1, Value: 1.2},
|
||||
{Timestamp: 2, Value: math.NaN()},
|
||||
{Timestamp: 3, Value: 14.5},
|
||||
{Timestamp: 4, Value: math.Inf(-1)},
|
||||
{Timestamp: 5, Value: math.Inf(1)},
|
||||
{Timestamp: 6, Value: 222.99},
|
||||
{Timestamp: 7, Value: math.Inf(-1)},
|
||||
{Timestamp: 8, Value: math.Inf(1)},
|
||||
{Timestamp: 9, Value: math.Inf(1)},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -884,9 +884,9 @@ func TestHandler_PromWrite_Dropped(t *testing.T) {
|
|||
}
|
||||
|
||||
expTS := []int64{
|
||||
req.Timeseries[0].Samples[0].TimestampMs * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[2].TimestampMs * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[5].TimestampMs * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[0].Timestamp * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[2].Timestamp * int64(time.Millisecond),
|
||||
req.Timeseries[0].Samples[5].Timestamp * int64(time.Millisecond),
|
||||
}
|
||||
|
||||
for i, point := range points {
|
||||
|
@ -931,12 +931,12 @@ func mustMakeBigString(sz int) string {
|
|||
}
|
||||
|
||||
func TestHandler_PromWrite_Error(t *testing.T) {
|
||||
req := &remote.WriteRequest{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
req := &prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{
|
||||
// Invalid tag key
|
||||
Labels: []*remote.LabelPair{{Name: mustMakeBigString(models.MaxKeyLength), Value: "a"}},
|
||||
Samples: []*remote.Sample{{TimestampMs: 1, Value: 1.2}},
|
||||
Labels: []prompb.Label{{Name: mustMakeBigString(models.MaxKeyLength), Value: "a"}},
|
||||
Samples: []prompb.Sample{{Timestamp: 1, Value: 1.2}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -977,11 +977,11 @@ func TestHandler_PromWrite_Error(t *testing.T) {
|
|||
// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
|
||||
// data is returned
|
||||
func TestHandler_PromRead(t *testing.T) {
|
||||
req := &remote.ReadRequest{
|
||||
Queries: []*remote.Query{{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
req := &prompb.ReadRequest{
|
||||
Queries: []*prompb.Query{{
|
||||
Matchers: []*prompb.LabelMatcher{
|
||||
{
|
||||
Type: remote.MatchType_EQUAL,
|
||||
Type: prompb.LabelMatcher_EQ,
|
||||
Name: "__name__",
|
||||
Value: "value",
|
||||
},
|
||||
|
@ -1042,34 +1042,34 @@ func TestHandler_PromRead(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var resp remote.ReadResponse
|
||||
var resp prompb.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expResults := []*remote.QueryResult{
|
||||
expResults := []*prompb.QueryResult{
|
||||
{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "host", Value: "server-1"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 22, Value: 2.3},
|
||||
{TimestampMs: 10000, Value: 2992.33},
|
||||
{TimestampMs: 44, Value: 2.3},
|
||||
{TimestampMs: 20000, Value: 2992.33},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: 22, Value: 2.3},
|
||||
{Timestamp: 10000, Value: 2992.33},
|
||||
{Timestamp: 44, Value: 2.3},
|
||||
{Timestamp: 20000, Value: 2992.33},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "host", Value: "server-2"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 22, Value: 2.3},
|
||||
{TimestampMs: 10000, Value: 2992.33},
|
||||
{TimestampMs: 44, Value: 2.3},
|
||||
{TimestampMs: 20000, Value: 2992.33},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: 22, Value: 2.3},
|
||||
{Timestamp: 10000, Value: 2992.33},
|
||||
{Timestamp: 44, Value: 2.3},
|
||||
{Timestamp: 20000, Value: 2992.33},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1082,10 +1082,10 @@ func TestHandler_PromRead(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHandler_PromRead_NoResults(t *testing.T) {
|
||||
req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{&prompb.Query{
|
||||
Matchers: []*prompb.LabelMatcher{
|
||||
{
|
||||
Type: remote.MatchType_EQUAL,
|
||||
Type: prompb.LabelMatcher_EQ,
|
||||
Name: "__name__",
|
||||
Value: "value",
|
||||
},
|
||||
|
@ -1111,17 +1111,17 @@ func TestHandler_PromRead_NoResults(t *testing.T) {
|
|||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
var resp remote.ReadResponse
|
||||
var resp prompb.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_PromRead_UnsupportedCursors(t *testing.T) {
|
||||
req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{&prompb.Query{
|
||||
Matchers: []*prompb.LabelMatcher{
|
||||
{
|
||||
Type: remote.MatchType_EQUAL,
|
||||
Type: prompb.LabelMatcher_EQ,
|
||||
Name: "__name__",
|
||||
Value: "value",
|
||||
},
|
||||
|
@ -1167,7 +1167,7 @@ func TestHandler_PromRead_UnsupportedCursors(t *testing.T) {
|
|||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
var resp remote.ReadResponse
|
||||
var resp prompb.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
@ -1195,11 +1195,11 @@ func TestHandler_Flux_DisabledByDefault(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHandler_PromRead_NilResultSet(t *testing.T) {
|
||||
req := &remote.ReadRequest{
|
||||
Queries: []*remote.Query{{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
req := &prompb.ReadRequest{
|
||||
Queries: []*prompb.Query{{
|
||||
Matchers: []*prompb.LabelMatcher{
|
||||
{
|
||||
Type: remote.MatchType_EQUAL,
|
||||
Type: prompb.LabelMatcher_EQ,
|
||||
Name: "__name__",
|
||||
Value: "value",
|
||||
},
|
||||
|
@ -1241,14 +1241,14 @@ func TestHandler_PromRead_NilResultSet(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
resp := new(remote.ReadResponse)
|
||||
resp := new(prompb.ReadResponse)
|
||||
err = proto.Unmarshal(decompressed, resp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected := &remote.ReadResponse{
|
||||
Results: []*remote.QueryResult{{}},
|
||||
expected := &prompb.ReadResponse{
|
||||
Results: []*prompb.QueryResult{{}},
|
||||
}
|
||||
if !reflect.DeepEqual(resp, expected) {
|
||||
t.Fatalf("Results differ:\n%v", cmp.Diff(expected, resp))
|
||||
|
|
|
@ -21,8 +21,8 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/coordinator"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
// Global server used by benchmarks
|
||||
|
@ -9564,28 +9564,28 @@ func TestServer_Prometheus_Read(t *testing.T) {
|
|||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
|
||||
req := &remote.ReadRequest{
|
||||
Queries: []*remote.Query{{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
req := &prompb.ReadRequest{
|
||||
Queries: []*prompb.Query{{
|
||||
Matchers: []*prompb.LabelMatcher{
|
||||
{
|
||||
Type: remote.MatchType_EQUAL,
|
||||
Type: prompb.LabelMatcher_EQ,
|
||||
Name: "__name__",
|
||||
Value: "mem",
|
||||
},
|
||||
// TODO(edd): awaiting negation bugfix in tsdb.IndexSet.
|
||||
// {
|
||||
// Type: remote.MatchType_NOT_EQUAL,
|
||||
// Type: prompb.LabelMatcher_NOT_EQUAL,
|
||||
// Name: "host",
|
||||
// Value: "server-2",
|
||||
// },
|
||||
{
|
||||
Type: remote.MatchType_REGEX_MATCH,
|
||||
Type: prompb.LabelMatcher_RE,
|
||||
Name: "host",
|
||||
Value: "server-1$",
|
||||
},
|
||||
// TODO(edd): awaiting negation bugfix in tsdb.IndexSet.
|
||||
// {
|
||||
// Type: remote.MatchType_REGEX_NO_MATCH,
|
||||
// Type: prompb.LabelMatcher_REGEX_NO_MATCH,
|
||||
// Name: "region",
|
||||
// Value: "south",
|
||||
// },
|
||||
|
@ -9616,31 +9616,31 @@ func TestServer_Prometheus_Read(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var promResp remote.ReadResponse
|
||||
var promResp prompb.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &promResp); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
expResults := []*remote.QueryResult{
|
||||
expResults := []*prompb.QueryResult{
|
||||
{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "host", Value: "server-1"},
|
||||
{Name: "region", Value: "south"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 120000, Value: 121.2},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: 120000, Value: 121.2},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "host", Value: "server-1"},
|
||||
{Name: "region", Value: "west"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 119000, Value: 2.34},
|
||||
{TimestampMs: 119500, Value: 988.00},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: 119000, Value: 2.34},
|
||||
{Timestamp: 119500, Value: 988.00},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -9671,24 +9671,24 @@ func TestServer_Prometheus_Write(t *testing.T) {
|
|||
},
|
||||
)
|
||||
|
||||
req := &remote.WriteRequest{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
req := &prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "cpu"},
|
||||
{Name: "host", Value: "a"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: now.UnixNano() / int64(time.Millisecond), Value: 100.0},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: now.UnixNano() / int64(time.Millisecond), Value: 100.0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "cpu"},
|
||||
{Name: "host", Value: "b"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: now.UnixNano()/int64(time.Millisecond) + 10, Value: 200.0},
|
||||
Samples: []prompb.Sample{
|
||||
{Timestamp: now.UnixNano()/int64(time.Millisecond) + 10, Value: 200.0},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue