Add further tests
parent
b314c316a9
commit
522e509709
|
@ -27,7 +27,7 @@ const (
|
|||
prometheusNameTag = "__name__"
|
||||
|
||||
// measurementTagKey is the tag key that all measurement names use in the new storage processor
|
||||
measurementTagKey = "_m"
|
||||
measurementTagKey = "_measurement"
|
||||
)
|
||||
|
||||
var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported")
|
||||
|
|
|
@ -652,23 +652,23 @@ func TestHandler_PromRead(t *testing.T) {
|
|||
h.Store.ResultSet.TagsFn = func() models.Tags {
|
||||
return models.NewTags(map[string]string{
|
||||
"host": fmt.Sprintf("server-%d", i),
|
||||
"_m": "mem",
|
||||
"_measurement": "mem",
|
||||
})
|
||||
}
|
||||
|
||||
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b))
|
||||
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d", w.Code)
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
t.Fatal(err))
|
||||
}
|
||||
|
||||
var resp remote.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expResults := []*remote.QueryResult{
|
||||
|
|
|
@ -3,6 +3,7 @@ package storage
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -90,6 +91,10 @@ type Results interface {
|
|||
}
|
||||
|
||||
func (s *Store) Read(ctx context.Context, req *ReadRequest) (Results, error) {
|
||||
fmt.Printf("%#v\n", req)
|
||||
if req != nil {
|
||||
fmt.Printf("%s\n", req.Predicate.String())
|
||||
}
|
||||
if len(req.GroupKeys) > 0 {
|
||||
panic("Read: len(Grouping) > 0")
|
||||
}
|
||||
|
@ -137,6 +142,10 @@ func (s *Store) Read(ctx context.Context, req *ReadRequest) (Results, error) {
|
|||
}
|
||||
|
||||
func (s *Store) GroupRead(ctx context.Context, req *ReadRequest) (*groupResultSet, error) {
|
||||
fmt.Printf("%#v\n", req)
|
||||
if req != nil {
|
||||
fmt.Printf("%s\n", req.Predicate.String())
|
||||
}
|
||||
if req.SeriesLimit > 0 || req.SeriesOffset > 0 {
|
||||
return nil, errors.New("GroupRead: SeriesLimit and SeriesOffset not supported when Grouping")
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tests
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
|
@ -8,14 +9,20 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb/coordinator"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/prometheus/remote"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
|
@ -9502,6 +9509,105 @@ func TestServer_NestedAggregateWithMathPanics(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_Prometheus_Read(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
`mem,host=server-1 value=2.34 120000000000`,
|
||||
`mem,host=server-2 value=1.1 121000000000`,
|
||||
`mem,host=server-20 value=18.2 122000000000`,
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.writes = Writes{
|
||||
&Write{data: strings.Join(writes, "\n")},
|
||||
}
|
||||
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
|
||||
req := &remote.ReadRequest{
|
||||
Queries: []*remote.Query{{
|
||||
Matchers: []*remote.LabelMatcher{
|
||||
{
|
||||
Type: remote.MatchType_EQUAL,
|
||||
Name: "__name__",
|
||||
Value: "mem",
|
||||
},
|
||||
{
|
||||
Type: remote.MatchType_NOT_EQUAL,
|
||||
Name: "host",
|
||||
Value: "server-5",
|
||||
},
|
||||
{
|
||||
Type: remote.MatchType_REGEX_MATCH,
|
||||
Name: "host",
|
||||
Value: "server-.*",
|
||||
},
|
||||
{
|
||||
Type: remote.MatchType_REGEX_NO_MATCH,
|
||||
Name: "host",
|
||||
Value: "server-2",
|
||||
},
|
||||
},
|
||||
StartTimestampMs: 119000,
|
||||
EndTimestampMs: 120010,
|
||||
}},
|
||||
}
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
t.Fatal("couldn't marshal prometheus request")
|
||||
}
|
||||
compressed := snappy.Encode(nil, data)
|
||||
b := bytes.NewReader(compressed)
|
||||
|
||||
resp, err := http.Post(s.URL()+"/api/v1/prom/read?db=db0", "", b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d. Body: %s", resp.StatusCode, MustReadAll(resp.Body))
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, MustReadAll(resp.Body))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var promResp remote.ReadResponse
|
||||
if err := proto.Unmarshal(reqBuf, &promResp); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
expResults := []*remote.QueryResult{
|
||||
{
|
||||
Timeseries: []*remote.TimeSeries{
|
||||
{
|
||||
Labels: []*remote.LabelPair{
|
||||
{Name: "host", Value: "server-1"},
|
||||
},
|
||||
Samples: []*remote.Sample{
|
||||
{TimestampMs: 120000, Value: 2.34},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(promResp.Results, expResults) {
|
||||
t.Fatalf("Results differ:\n%v", cmp.Diff(promResp.Results, expResults))
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Force uint support to be enabled for testing.
|
||||
models.EnableUintSupport()
|
||||
|
|
Loading…
Reference in New Issue