toHTTP function moved from github.com/influxdata/ifql PR 362
parent
435e38da9d
commit
32118f36c7
|
@ -174,6 +174,12 @@
|
|||
]
|
||||
revision = "145e0677ff6418fa00ee7e5dd434305631ab44ea"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/influxdata/line-protocol"
|
||||
packages = ["."]
|
||||
revision = "32c6aa80de5eb09d190ad284a8214a531c6bce57"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/influxdata/tdigest"
|
||||
|
@ -466,6 +472,6 @@
|
|||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "ed7f4f2ab4a6f007ba6377ae04b0405146ea60608de96ef76ab19eef8113e694"
|
||||
inputs-digest = "25e119d1b8e9177be607cecab05d220d28259b799c53cd8015ec4aab8461cc69"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
|
|
@ -0,0 +1,430 @@
|
|||
package functions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/line-protocol"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/plan"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
ToHTTPKind = "toHTTP"
|
||||
DefaultToHTTPTimeout = 1 * time.Second
|
||||
)
|
||||
|
||||
// DefaultToHTTPUserAgent is the default user agent used by ToHttp
|
||||
var DefaultToHTTPUserAgent = "ifqld/dev"
|
||||
|
||||
func newToHTTPClient() *http.Client {
|
||||
return &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
MaxIdleConnsPerHost: runtime.GOMAXPROCS(0) + 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var toHTTPKeepAliveClient = newToHTTPClient()
|
||||
|
||||
// this is used so we can get better validation on marshaling, innerToHTTPOpSpec and ToHTTPOpSpec
|
||||
// need to have identical fields
|
||||
type innerToHTTPOpSpec ToHTTPOpSpec
|
||||
|
||||
type ToHTTPOpSpec struct {
|
||||
Addr string `json:"addr"`
|
||||
Method string `json:"method"` // default behavior should be POST
|
||||
Name string `json:"name"`
|
||||
NameColumn string `json:"name_column"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
|
||||
Headers map[string]string `json:"headers"` // TODO: implement Headers after bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys)
|
||||
URLParams map[string]string `json:"urlparams"` // TODO: implement URLParams after bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys)
|
||||
Timeout time.Duration `json:"timeout"` // default to something reasonable if zero
|
||||
NoKeepAlive bool `json:"nokeepalive"`
|
||||
TimeColumn string `json:"time_column"`
|
||||
TagColumns []string `json:"tag_columns"`
|
||||
ValueColumns []string `json:"value_columns"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
query.RegisterFunction(ToHTTPKind, createToHTTPOpSpec, ToHTTPSignature)
|
||||
query.RegisterOpSpec(ToHTTPKind,
|
||||
func() query.OperationSpec { return &ToHTTPOpSpec{} })
|
||||
plan.RegisterProcedureSpec(ToHTTPKind, newToHTTPProcedure, ToHTTPKind)
|
||||
execute.RegisterTransformation(ToHTTPKind, createToHTTPTransformation)
|
||||
}
|
||||
|
||||
// ReadArgs loads a query.Arguments into ToHTTPOpSpec. It sets several default values.
|
||||
// If the http method isn't set, it defaults to POST, it also uppercases the http method.
|
||||
// If the time_column isn't set, it defaults to execute.TimeColLabel.
|
||||
// If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}.
|
||||
func (o *ToHTTPOpSpec) ReadArgs(args query.Arguments) error {
|
||||
var err error
|
||||
o.Addr, err = args.GetRequiredString("addr")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
o.Name, ok, err = args.GetString("name")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn, ok, err = args.GetString("name_column")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn = "_measurement"
|
||||
}
|
||||
}
|
||||
|
||||
o.Method, ok, err = args.GetString("method")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.Method = "POST"
|
||||
}
|
||||
o.Method = strings.ToUpper(o.Method)
|
||||
|
||||
timeout, ok, err := args.GetDuration("timeout")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.Timeout = DefaultToHTTPTimeout
|
||||
} else {
|
||||
o.Timeout = time.Duration(timeout)
|
||||
}
|
||||
|
||||
o.TimeColumn, ok, err = args.GetString("time_column")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.TimeColumn = execute.DefaultTimeColLabel
|
||||
}
|
||||
|
||||
tagColumns, ok, err := args.GetArray("tag_columns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.TagColumns = o.TagColumns[:0]
|
||||
if ok {
|
||||
for i := 0; i < tagColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.TagColumns, tagColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
|
||||
valueColumns, ok, err := args.GetArray("value_columns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.ValueColumns = o.ValueColumns[:0]
|
||||
|
||||
if !ok || valueColumns.Len() == 0 {
|
||||
o.ValueColumns = append(o.ValueColumns, execute.DefaultValueColLabel)
|
||||
} else {
|
||||
for i := 0; i < valueColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.ValueColumns, valueColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
|
||||
// TODO: get other headers working!
|
||||
o.Headers = map[string]string{
|
||||
"Content-Type": "application/vnd.influx",
|
||||
"User-Agent": DefaultToHTTPUserAgent,
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func createToHTTPOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) {
|
||||
if err := a.AddParentFromArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := new(ToHTTPOpSpec)
|
||||
if err := s.ReadArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON unmarshals and validates toHTTPOpSpec into JSON.
|
||||
func (o *ToHTTPOpSpec) UnmarshalJSON(b []byte) (err error) {
|
||||
|
||||
if err = json.Unmarshal(b, (*innerToHTTPOpSpec)(o)); err != nil {
|
||||
return err
|
||||
}
|
||||
u, err := url.ParseRequestURI(o.Addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !(u.Scheme == "https" || u.Scheme == "http" || u.Scheme == "") {
|
||||
return fmt.Errorf("Scheme must be http or https but was %s", u.Scheme)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var ToHTTPSignature = query.DefaultFunctionSignature()
|
||||
|
||||
func (ToHTTPOpSpec) Kind() query.OperationKind {
|
||||
return ToHTTPKind
|
||||
}
|
||||
|
||||
type ToHTTPProcedureSpec struct {
|
||||
Spec *ToHTTPOpSpec
|
||||
}
|
||||
|
||||
func (o *ToHTTPProcedureSpec) Kind() plan.ProcedureKind {
|
||||
return CountKind
|
||||
}
|
||||
|
||||
func (o *ToHTTPProcedureSpec) Copy() plan.ProcedureSpec {
|
||||
s := o.Spec
|
||||
res := &ToHTTPProcedureSpec{
|
||||
Spec: &ToHTTPOpSpec{
|
||||
Addr: s.Addr,
|
||||
Method: s.Method,
|
||||
Name: s.Name,
|
||||
NameColumn: s.NameColumn,
|
||||
Headers: make(map[string]string, len(s.Headers)),
|
||||
URLParams: make(map[string]string, len(s.URLParams)),
|
||||
Timeout: s.Timeout,
|
||||
NoKeepAlive: s.NoKeepAlive,
|
||||
TimeColumn: s.TimeColumn,
|
||||
TagColumns: append([]string(nil), s.TagColumns...),
|
||||
ValueColumns: append([]string(nil), s.ValueColumns...),
|
||||
},
|
||||
}
|
||||
for k, v := range s.Headers {
|
||||
res.Spec.Headers[k] = v
|
||||
}
|
||||
for k, v := range s.URLParams {
|
||||
res.Spec.URLParams[k] = v
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func newToHTTPProcedure(qs query.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
|
||||
spec, ok := qs.(*ToHTTPOpSpec)
|
||||
if !ok && spec != nil {
|
||||
return nil, fmt.Errorf("invalid spec type %T", qs)
|
||||
}
|
||||
return &ToHTTPProcedureSpec{Spec: spec}, nil
|
||||
}
|
||||
|
||||
func createToHTTPTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
|
||||
s, ok := spec.(*ToHTTPProcedureSpec)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid spec type %T", spec)
|
||||
}
|
||||
cache := execute.NewBlockBuilderCache(a.Allocator())
|
||||
d := execute.NewDataset(id, mode, cache)
|
||||
t := NewToHTTPTransformation(d, cache, s)
|
||||
return t, d, nil
|
||||
}
|
||||
|
||||
type ToHTTPTransformation struct {
|
||||
d execute.Dataset
|
||||
cache execute.BlockBuilderCache
|
||||
spec *ToHTTPProcedureSpec
|
||||
}
|
||||
|
||||
func (t *ToHTTPTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
|
||||
return t.d.RetractBlock(key)
|
||||
}
|
||||
|
||||
func NewToHTTPTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *ToHTTPProcedureSpec) *ToHTTPTransformation {
|
||||
|
||||
return &ToHTTPTransformation{
|
||||
d: d,
|
||||
cache: cache,
|
||||
spec: spec,
|
||||
}
|
||||
}
|
||||
|
||||
type toHttpMetric struct {
|
||||
tags []*protocol.Tag
|
||||
fields []*protocol.Field
|
||||
name string
|
||||
t time.Time
|
||||
}
|
||||
|
||||
func (m *toHttpMetric) TagList() []*protocol.Tag {
|
||||
return m.tags
|
||||
}
|
||||
func (m *toHttpMetric) FieldList() []*protocol.Field {
|
||||
return m.fields
|
||||
}
|
||||
|
||||
func (m *toHttpMetric) truncateTagsAndFields() {
|
||||
m.fields = m.fields[:0]
|
||||
m.tags = m.tags[:0]
|
||||
|
||||
}
|
||||
|
||||
func (m *toHttpMetric) Name() string {
|
||||
return m.name
|
||||
}
|
||||
|
||||
func (m *toHttpMetric) Time() time.Time {
|
||||
return m.t
|
||||
}
|
||||
|
||||
// setCols must be called after
|
||||
|
||||
type idxType struct {
|
||||
Idx int
|
||||
Type query.DataType
|
||||
}
|
||||
|
||||
func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Block) error {
|
||||
pr, pw := io.Pipe() // TODO: replce the pipe with something faster
|
||||
m := &toHttpMetric{}
|
||||
e := protocol.NewEncoder(pw)
|
||||
e.FailOnFieldErr(true)
|
||||
e.SetFieldSortOrder(protocol.SortFields)
|
||||
cols := b.Cols()
|
||||
labels := make(map[string]idxType, len(cols))
|
||||
for i, col := range cols {
|
||||
labels[col.Label] = idxType{Idx: i, Type: col.Type}
|
||||
}
|
||||
|
||||
// do time
|
||||
timeColLabel := t.spec.Spec.TimeColumn
|
||||
timeColIdx, ok := labels[timeColLabel]
|
||||
|
||||
if !ok {
|
||||
return errors.New("Could not get time column")
|
||||
}
|
||||
if timeColIdx.Type != query.TTime {
|
||||
return fmt.Errorf("column %s is not of type %s", timeColLabel, timeColIdx.Type)
|
||||
}
|
||||
var measurementNameCol string
|
||||
if t.spec.Spec.Name == "" {
|
||||
measurementNameCol = t.spec.Spec.NameColumn
|
||||
}
|
||||
|
||||
// check if each col is a tag or value and cache this value for the loop
|
||||
colMetadatas := b.Cols()
|
||||
isTag := make([]bool, len(colMetadatas))
|
||||
isValue := make([]bool, len(colMetadatas))
|
||||
|
||||
for i, col := range colMetadatas {
|
||||
isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label
|
||||
isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label
|
||||
}
|
||||
|
||||
var err error
|
||||
go func() {
|
||||
m.name = t.spec.Spec.Name
|
||||
b.Do(func(er query.ColReader) error {
|
||||
l := er.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m.truncateTagsAndFields()
|
||||
for j, col := range er.Cols() {
|
||||
switch {
|
||||
case col.Label == timeColLabel:
|
||||
m.t = er.Times(j)[i].Time()
|
||||
case measurementNameCol != "" && measurementNameCol == col.Label:
|
||||
if col.Type != query.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.name = er.Strings(j)[i]
|
||||
case isTag[j]:
|
||||
if col.Type != query.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
|
||||
case isValue[j]:
|
||||
switch col.Type {
|
||||
case query.TFloat:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Floats(j)[i]})
|
||||
case query.TInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Ints(j)[i]})
|
||||
case query.TUInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j)[i]})
|
||||
case query.TString:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
case query.TTime:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Times(j)[i]})
|
||||
case query.TBool:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Bools(j)[i]})
|
||||
default:
|
||||
return fmt.Errorf("invalid type for column %s", col.Label)
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err := e.Encode(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
req, err := http.NewRequest(t.spec.Spec.Method, t.spec.Spec.Addr, pr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.spec.Spec.Timeout <= 0 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), t.spec.Spec.Timeout)
|
||||
req = req.WithContext(ctx)
|
||||
defer cancel()
|
||||
}
|
||||
var resp *http.Response
|
||||
if t.spec.Spec.NoKeepAlive {
|
||||
resp, err = newToHTTPClient().Do(req)
|
||||
} else {
|
||||
resp, err = toHTTPKeepAliveClient.Do(req)
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
return req.Body.Close()
|
||||
}
|
||||
|
||||
func (t *ToHTTPTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
|
||||
return t.d.UpdateWatermark(pt)
|
||||
}
|
||||
func (t *ToHTTPTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
|
||||
return t.d.UpdateProcessingTime(pt)
|
||||
}
|
||||
func (t *ToHTTPTransformation) Finish(id execute.DatasetID, err error) {
|
||||
t.d.Finish(err)
|
||||
}
|
|
@ -0,0 +1,460 @@
|
|||
package functions_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/execute/executetest"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/querytest"
|
||||
)
|
||||
|
||||
func TestToHTTP_NewQuery(t *testing.T) {
|
||||
tests := []querytest.NewQueryTestCase{
|
||||
{
|
||||
Name: "from with database with range",
|
||||
Raw: `from(db:"mydb") |> toHTTP(addr: "https://localhost:8081", name:"series1", method:"POST", timeout: 50s)`,
|
||||
Want: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
Database: "mydb",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "toHTTP1",
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: "https://localhost:8081",
|
||||
Name: "series1",
|
||||
Method: "POST",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{execute.DefaultValueColLabel},
|
||||
Headers: map[string]string{
|
||||
"Content-Type": "application/vnd.influx",
|
||||
"User-Agent": "ifqld/dev",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "toHTTP1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
querytest.NewQueryTestHelper(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestToHTTPOpSpec_UnmarshalJSON(t *testing.T) {
|
||||
type fields struct {
|
||||
Addr string
|
||||
Method string
|
||||
Headers map[string]string
|
||||
URLParams map[string]string
|
||||
Timeout time.Duration
|
||||
NoKeepAlive bool
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
bytes []byte
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "happy path",
|
||||
bytes: []byte(`
|
||||
{
|
||||
"id": "toHTTP",
|
||||
"kind": "toHTTP",
|
||||
"spec": {
|
||||
"addr": "https://localhost:8081",
|
||||
"method" :"POST"
|
||||
}
|
||||
}`),
|
||||
fields: fields{
|
||||
Addr: "https://localhost:8081",
|
||||
Method: "POST",
|
||||
},
|
||||
}, {
|
||||
name: "bad address",
|
||||
bytes: []byte(`
|
||||
{
|
||||
"id": "toHTTP",
|
||||
"kind": "toHTTP",
|
||||
"spec": {
|
||||
"addr": "https://loc alhost:8081",
|
||||
"method" :"POST"
|
||||
}
|
||||
}`),
|
||||
fields: fields{
|
||||
Addr: "https://localhost:8081",
|
||||
Method: "POST",
|
||||
},
|
||||
wantErr: true,
|
||||
}}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
o := &functions.ToHTTPOpSpec{
|
||||
Addr: tt.fields.Addr,
|
||||
Method: tt.fields.Method,
|
||||
Headers: tt.fields.Headers,
|
||||
URLParams: tt.fields.URLParams,
|
||||
Timeout: tt.fields.Timeout,
|
||||
NoKeepAlive: tt.fields.NoKeepAlive,
|
||||
}
|
||||
op := &query.Operation{
|
||||
ID: "toHTTP",
|
||||
Spec: o,
|
||||
}
|
||||
if !tt.wantErr {
|
||||
querytest.OperationMarshalingTestHelper(t, tt.bytes, op)
|
||||
} else if err := o.UnmarshalJSON(tt.bytes); err == nil {
|
||||
t.Errorf("ToHTTPOpSpec.UnmarshalJSON() error = %v, wantErr %v for test %s", err, tt.wantErr, tt.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestToHTTP_Process(t *testing.T) {
|
||||
data := []byte{}
|
||||
wg := sync.WaitGroup{}
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
defer wg.Done()
|
||||
serverData, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
t.FailNow()
|
||||
}
|
||||
data = append(data, serverData...)
|
||||
}))
|
||||
type wanted struct {
|
||||
Block []*executetest.Block
|
||||
Result []byte
|
||||
}
|
||||
testCases := []struct {
|
||||
name string
|
||||
spec *functions.ToHTTPProcedureSpec
|
||||
data []query.Block
|
||||
want wanted
|
||||
}{
|
||||
{
|
||||
name: "colblock with name in _measurement",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []query.Block{execute.CopyBlock(&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_measurement", Type: query.TString},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}, executetest.UnlimitedAllocator)},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte("a _value=2 11\na _value=2 21\nb _value=1 21\na _value=3 31\nc _value=4 41\n")},
|
||||
},
|
||||
{
|
||||
name: "one block with measurement name in _measurement",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_measurement", Type: query.TString},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte("a _value=2 11\na _value=2 21\nb _value=1 21\na _value=3 31\nc _value=4 41\n")},
|
||||
},
|
||||
{
|
||||
name: "one block with measurement name in _measurement and tag",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_measurement", Type: query.TString},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte("a,fred=one _value=2 11\na,fred=one _value=2 21\nb,fred=seven _value=1 21\na,fred=nine _value=3 31\nc,fred=elevendyone _value=4 41\n")},
|
||||
},
|
||||
{
|
||||
name: "one block",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "POST",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
Name: "one_block",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0},
|
||||
{execute.Time(21), 1.0},
|
||||
{execute.Time(31), 3.0},
|
||||
{execute.Time(41), 4.0},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte("one_block _value=2 11\none_block _value=1 21\none_block _value=3 31\none_block _value=4 41\n"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block with unused tag",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
Name: "one_block_w_unused_tag",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
{execute.Time(41), 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte(`one_block_w_unused_tag _value=2 11
|
||||
one_block_w_unused_tag _value=1 21
|
||||
one_block_w_unused_tag _value=3 31
|
||||
one_block_w_unused_tag _value=4 41
|
||||
`),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block with tag",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "one_block_w_tag",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
{execute.Time(41), 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte(`one_block_w_tag,fred=one _value=2 11
|
||||
one_block_w_tag,fred=seven _value=1 21
|
||||
one_block_w_tag,fred=nine _value=3 31
|
||||
one_block_w_tag,fred=elevendyone _value=4 41
|
||||
`),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multi block",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "multi_block",
|
||||
},
|
||||
},
|
||||
data: []query.Block{
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(51), 2.0, "one"},
|
||||
{execute.Time(61), 1.0, "seven"},
|
||||
{execute.Time(71), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte("multi_block,fred=one _value=2 11\nmulti_block,fred=seven _value=1 21\nmulti_block,fred=nine _value=3 31\n" +
|
||||
"multi_block,fred=one _value=2 51\nmulti_block,fred=seven _value=1 61\nmulti_block,fred=nine _value=3 71\n"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multi collist blocks",
|
||||
spec: &functions.ToHTTPProcedureSpec{
|
||||
Spec: &functions.ToHTTPOpSpec{
|
||||
Addr: server.URL,
|
||||
Method: "GET",
|
||||
Timeout: 50 * time.Second,
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "multi_collist_blocks",
|
||||
},
|
||||
},
|
||||
data: []query.Block{
|
||||
execute.CopyBlock(
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
},
|
||||
}, executetest.UnlimitedAllocator),
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(51), 2.0, "one"},
|
||||
{execute.Time(61), 1.0, "seven"},
|
||||
{execute.Time(71), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: []byte("multi_collist_blocks,fred=one _value=2 11\nmulti_collist_blocks,fred=seven _value=1 21\nmulti_collist_blocks,fred=nine _value=3 31\n" +
|
||||
"multi_collist_blocks,fred=one _value=2 51\nmulti_collist_blocks,fred=seven _value=1 61\nmulti_collist_blocks,fred=nine _value=3 71\n"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
wg.Add(len(tc.data))
|
||||
|
||||
executetest.ProcessTestHelper(
|
||||
t,
|
||||
tc.data,
|
||||
tc.want.Block,
|
||||
func(d execute.Dataset, c execute.BlockBuilderCache) execute.Transformation {
|
||||
return functions.NewToHTTPTransformation(d, c, tc.spec)
|
||||
},
|
||||
)
|
||||
wg.Wait() // wait till we are done getting the data back
|
||||
if string(data) != string(tc.want.Result) {
|
||||
t.Logf("expected %s, got %s", tc.want.Result, data)
|
||||
t.Fail()
|
||||
}
|
||||
data = data[:0]
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue