From 32118f36c7a62151b19d90cc72768af720e35bd2 Mon Sep 17 00:00:00 2001 From: "j. Emrys Landivar (docmerlin)" Date: Tue, 22 May 2018 11:48:48 -0500 Subject: [PATCH] toHTTP function moved from github.com/influxdata/ifql PR 362 --- Gopkg.lock | 8 +- query/functions/to_http.go | 430 +++++++++++++++++++++++++++++ query/functions/to_http_test.go | 460 ++++++++++++++++++++++++++++++++ 3 files changed, 897 insertions(+), 1 deletion(-) create mode 100644 query/functions/to_http.go create mode 100644 query/functions/to_http_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 52804e457a..4d475115dd 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -174,6 +174,12 @@ ] revision = "145e0677ff6418fa00ee7e5dd434305631ab44ea" +[[projects]] + branch = "master" + name = "github.com/influxdata/line-protocol" + packages = ["."] + revision = "32c6aa80de5eb09d190ad284a8214a531c6bce57" + [[projects]] branch = "master" name = "github.com/influxdata/tdigest" @@ -466,6 +472,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "ed7f4f2ab4a6f007ba6377ae04b0405146ea60608de96ef76ab19eef8113e694" + inputs-digest = "25e119d1b8e9177be607cecab05d220d28259b799c53cd8015ec4aab8461cc69" solver-name = "gps-cdcl" solver-version = 1 diff --git a/query/functions/to_http.go b/query/functions/to_http.go new file mode 100644 index 0000000000..affbfa06b3 --- /dev/null +++ b/query/functions/to_http.go @@ -0,0 +1,430 @@ +package functions + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "runtime" + "sort" + "strings" + "time" + + "github.com/influxdata/line-protocol" + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/execute" + "github.com/influxdata/platform/query/plan" + "github.com/influxdata/platform/query/semantic" + "github.com/pkg/errors" +) + +const ( + ToHTTPKind = "toHTTP" + DefaultToHTTPTimeout = 1 * time.Second +) + +// DefaultToHTTPUserAgent is the default user agent used by ToHttp +var DefaultToHTTPUserAgent = "ifqld/dev" + +func newToHTTPClient() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + MaxIdleConnsPerHost: runtime.GOMAXPROCS(0) + 1, + }, + } +} + +var toHTTPKeepAliveClient = newToHTTPClient() + +// this is used so we can get better validation on marshaling, innerToHTTPOpSpec and ToHTTPOpSpec +// need to have identical fields +type innerToHTTPOpSpec ToHTTPOpSpec + +type ToHTTPOpSpec struct { + Addr string `json:"addr"` + Method string `json:"method"` // default behavior should be POST + Name string `json:"name"` + NameColumn string `json:"name_column"` // either name or name_column must be set, if none is set try to use the "_measurement" column. + Headers map[string]string `json:"headers"` // TODO: implement Headers after bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys) + URLParams map[string]string `json:"urlparams"` // TODO: implement URLParams after bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys) + Timeout time.Duration `json:"timeout"` // default to something reasonable if zero + NoKeepAlive bool `json:"nokeepalive"` + TimeColumn string `json:"time_column"` + TagColumns []string `json:"tag_columns"` + ValueColumns []string `json:"value_columns"` +} + +func init() { + query.RegisterFunction(ToHTTPKind, createToHTTPOpSpec, ToHTTPSignature) + query.RegisterOpSpec(ToHTTPKind, + func() query.OperationSpec { return &ToHTTPOpSpec{} }) + plan.RegisterProcedureSpec(ToHTTPKind, newToHTTPProcedure, ToHTTPKind) + execute.RegisterTransformation(ToHTTPKind, createToHTTPTransformation) +} + +// ReadArgs loads a query.Arguments into ToHTTPOpSpec. It sets several default values. +// If the http method isn't set, it defaults to POST, it also uppercases the http method. +// If the time_column isn't set, it defaults to execute.TimeColLabel. +// If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}. +func (o *ToHTTPOpSpec) ReadArgs(args query.Arguments) error { + var err error + o.Addr, err = args.GetRequiredString("addr") + if err != nil { + return err + } + + var ok bool + o.Name, ok, err = args.GetString("name") + if err != nil { + return err + } + if !ok { + o.NameColumn, ok, err = args.GetString("name_column") + if err != nil { + return err + } + if !ok { + o.NameColumn = "_measurement" + } + } + + o.Method, ok, err = args.GetString("method") + if err != nil { + return err + } + if !ok { + o.Method = "POST" + } + o.Method = strings.ToUpper(o.Method) + + timeout, ok, err := args.GetDuration("timeout") + if err != nil { + return err + } + if !ok { + o.Timeout = DefaultToHTTPTimeout + } else { + o.Timeout = time.Duration(timeout) + } + + o.TimeColumn, ok, err = args.GetString("time_column") + if err != nil { + return err + } + if !ok { + o.TimeColumn = execute.DefaultTimeColLabel + } + + tagColumns, ok, err := args.GetArray("tag_columns", semantic.String) + if err != nil { + return err + } + o.TagColumns = o.TagColumns[:0] + if ok { + for i := 0; i < tagColumns.Len(); i++ { + o.TagColumns = append(o.TagColumns, tagColumns.Get(i).Str()) + } + sort.Strings(o.TagColumns) + } + + valueColumns, ok, err := args.GetArray("value_columns", semantic.String) + if err != nil { + return err + } + o.ValueColumns = o.ValueColumns[:0] + + if !ok || valueColumns.Len() == 0 { + o.ValueColumns = append(o.ValueColumns, execute.DefaultValueColLabel) + } else { + for i := 0; i < valueColumns.Len(); i++ { + o.TagColumns = append(o.ValueColumns, valueColumns.Get(i).Str()) + } + sort.Strings(o.TagColumns) + } + + // TODO: get other headers working! + o.Headers = map[string]string{ + "Content-Type": "application/vnd.influx", + "User-Agent": DefaultToHTTPUserAgent, + } + + return err + +} + +func createToHTTPOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) { + if err := a.AddParentFromArgs(args); err != nil { + return nil, err + } + s := new(ToHTTPOpSpec) + if err := s.ReadArgs(args); err != nil { + return nil, err + } + return s, nil +} + +// UnmarshalJSON unmarshals and validates toHTTPOpSpec into JSON. +func (o *ToHTTPOpSpec) UnmarshalJSON(b []byte) (err error) { + + if err = json.Unmarshal(b, (*innerToHTTPOpSpec)(o)); err != nil { + return err + } + u, err := url.ParseRequestURI(o.Addr) + if err != nil { + return err + } + if !(u.Scheme == "https" || u.Scheme == "http" || u.Scheme == "") { + return fmt.Errorf("Scheme must be http or https but was %s", u.Scheme) + } + return nil +} + +var ToHTTPSignature = query.DefaultFunctionSignature() + +func (ToHTTPOpSpec) Kind() query.OperationKind { + return ToHTTPKind +} + +type ToHTTPProcedureSpec struct { + Spec *ToHTTPOpSpec +} + +func (o *ToHTTPProcedureSpec) Kind() plan.ProcedureKind { + return CountKind +} + +func (o *ToHTTPProcedureSpec) Copy() plan.ProcedureSpec { + s := o.Spec + res := &ToHTTPProcedureSpec{ + Spec: &ToHTTPOpSpec{ + Addr: s.Addr, + Method: s.Method, + Name: s.Name, + NameColumn: s.NameColumn, + Headers: make(map[string]string, len(s.Headers)), + URLParams: make(map[string]string, len(s.URLParams)), + Timeout: s.Timeout, + NoKeepAlive: s.NoKeepAlive, + TimeColumn: s.TimeColumn, + TagColumns: append([]string(nil), s.TagColumns...), + ValueColumns: append([]string(nil), s.ValueColumns...), + }, + } + for k, v := range s.Headers { + res.Spec.Headers[k] = v + } + for k, v := range s.URLParams { + res.Spec.URLParams[k] = v + } + return res +} + +func newToHTTPProcedure(qs query.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) { + spec, ok := qs.(*ToHTTPOpSpec) + if !ok && spec != nil { + return nil, fmt.Errorf("invalid spec type %T", qs) + } + return &ToHTTPProcedureSpec{Spec: spec}, nil +} + +func createToHTTPTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) { + s, ok := spec.(*ToHTTPProcedureSpec) + if !ok { + return nil, nil, fmt.Errorf("invalid spec type %T", spec) + } + cache := execute.NewBlockBuilderCache(a.Allocator()) + d := execute.NewDataset(id, mode, cache) + t := NewToHTTPTransformation(d, cache, s) + return t, d, nil +} + +type ToHTTPTransformation struct { + d execute.Dataset + cache execute.BlockBuilderCache + spec *ToHTTPProcedureSpec +} + +func (t *ToHTTPTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error { + return t.d.RetractBlock(key) +} + +func NewToHTTPTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *ToHTTPProcedureSpec) *ToHTTPTransformation { + + return &ToHTTPTransformation{ + d: d, + cache: cache, + spec: spec, + } +} + +type toHttpMetric struct { + tags []*protocol.Tag + fields []*protocol.Field + name string + t time.Time +} + +func (m *toHttpMetric) TagList() []*protocol.Tag { + return m.tags +} +func (m *toHttpMetric) FieldList() []*protocol.Field { + return m.fields +} + +func (m *toHttpMetric) truncateTagsAndFields() { + m.fields = m.fields[:0] + m.tags = m.tags[:0] + +} + +func (m *toHttpMetric) Name() string { + return m.name +} + +func (m *toHttpMetric) Time() time.Time { + return m.t +} + +// setCols must be called after + +type idxType struct { + Idx int + Type query.DataType +} + +func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Block) error { + pr, pw := io.Pipe() // TODO: replce the pipe with something faster + m := &toHttpMetric{} + e := protocol.NewEncoder(pw) + e.FailOnFieldErr(true) + e.SetFieldSortOrder(protocol.SortFields) + cols := b.Cols() + labels := make(map[string]idxType, len(cols)) + for i, col := range cols { + labels[col.Label] = idxType{Idx: i, Type: col.Type} + } + + // do time + timeColLabel := t.spec.Spec.TimeColumn + timeColIdx, ok := labels[timeColLabel] + + if !ok { + return errors.New("Could not get time column") + } + if timeColIdx.Type != query.TTime { + return fmt.Errorf("column %s is not of type %s", timeColLabel, timeColIdx.Type) + } + var measurementNameCol string + if t.spec.Spec.Name == "" { + measurementNameCol = t.spec.Spec.NameColumn + } + + // check if each col is a tag or value and cache this value for the loop + colMetadatas := b.Cols() + isTag := make([]bool, len(colMetadatas)) + isValue := make([]bool, len(colMetadatas)) + + for i, col := range colMetadatas { + isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label + isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label + } + + var err error + go func() { + m.name = t.spec.Spec.Name + b.Do(func(er query.ColReader) error { + l := er.Len() + for i := 0; i < l; i++ { + m.truncateTagsAndFields() + for j, col := range er.Cols() { + switch { + case col.Label == timeColLabel: + m.t = er.Times(j)[i].Time() + case measurementNameCol != "" && measurementNameCol == col.Label: + if col.Type != query.TString { + return errors.New("invalid type for measurement column") + } + m.name = er.Strings(j)[i] + case isTag[j]: + if col.Type != query.TString { + return errors.New("invalid type for measurement column") + } + m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j)[i]}) + + case isValue[j]: + switch col.Type { + case query.TFloat: + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Floats(j)[i]}) + case query.TInt: + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Ints(j)[i]}) + case query.TUInt: + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j)[i]}) + case query.TString: + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j)[i]}) + case query.TTime: + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Times(j)[i]}) + case query.TBool: + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Bools(j)[i]}) + default: + return fmt.Errorf("invalid type for column %s", col.Label) + } + } + } + _, err := e.Encode(m) + if err != nil { + return err + } + } + return nil + }) + pw.Close() + }() + + req, err := http.NewRequest(t.spec.Spec.Method, t.spec.Spec.Addr, pr) + if err != nil { + return err + } + + if t.spec.Spec.Timeout <= 0 { + ctx, cancel := context.WithTimeout(context.Background(), t.spec.Spec.Timeout) + req = req.WithContext(ctx) + defer cancel() + } + var resp *http.Response + if t.spec.Spec.NoKeepAlive { + resp, err = newToHTTPClient().Do(req) + } else { + resp, err = toHTTPKeepAliveClient.Do(req) + + } + if err != nil { + return err + } + + defer resp.Body.Close() + + return req.Body.Close() +} + +func (t *ToHTTPTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error { + return t.d.UpdateWatermark(pt) +} +func (t *ToHTTPTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error { + return t.d.UpdateProcessingTime(pt) +} +func (t *ToHTTPTransformation) Finish(id execute.DatasetID, err error) { + t.d.Finish(err) +} diff --git a/query/functions/to_http_test.go b/query/functions/to_http_test.go new file mode 100644 index 0000000000..1028a5de3f --- /dev/null +++ b/query/functions/to_http_test.go @@ -0,0 +1,460 @@ +package functions_test + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/execute" + "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" + "github.com/influxdata/platform/query/querytest" +) + +func TestToHTTP_NewQuery(t *testing.T) { + tests := []querytest.NewQueryTestCase{ + { + Name: "from with database with range", + Raw: `from(db:"mydb") |> toHTTP(addr: "https://localhost:8081", name:"series1", method:"POST", timeout: 50s)`, + Want: &query.Spec{ + Operations: []*query.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + Database: "mydb", + }, + }, + { + ID: "toHTTP1", + Spec: &functions.ToHTTPOpSpec{ + Addr: "https://localhost:8081", + Name: "series1", + Method: "POST", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{execute.DefaultValueColLabel}, + Headers: map[string]string{ + "Content-Type": "application/vnd.influx", + "User-Agent": "ifqld/dev", + }, + }, + }, + }, + Edges: []query.Edge{ + {Parent: "from0", Child: "toHTTP1"}, + }, + }, + }, + } + for _, tc := range tests { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + querytest.NewQueryTestHelper(t, tc) + }) + } +} + +func TestToHTTPOpSpec_UnmarshalJSON(t *testing.T) { + type fields struct { + Addr string + Method string + Headers map[string]string + URLParams map[string]string + Timeout time.Duration + NoKeepAlive bool + } + tests := []struct { + name string + fields fields + bytes []byte + wantErr bool + }{ + { + name: "happy path", + bytes: []byte(` + { + "id": "toHTTP", + "kind": "toHTTP", + "spec": { + "addr": "https://localhost:8081", + "method" :"POST" + } + }`), + fields: fields{ + Addr: "https://localhost:8081", + Method: "POST", + }, + }, { + name: "bad address", + bytes: []byte(` + { + "id": "toHTTP", + "kind": "toHTTP", + "spec": { + "addr": "https://loc alhost:8081", + "method" :"POST" + } + }`), + fields: fields{ + Addr: "https://localhost:8081", + Method: "POST", + }, + wantErr: true, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + o := &functions.ToHTTPOpSpec{ + Addr: tt.fields.Addr, + Method: tt.fields.Method, + Headers: tt.fields.Headers, + URLParams: tt.fields.URLParams, + Timeout: tt.fields.Timeout, + NoKeepAlive: tt.fields.NoKeepAlive, + } + op := &query.Operation{ + ID: "toHTTP", + Spec: o, + } + if !tt.wantErr { + querytest.OperationMarshalingTestHelper(t, tt.bytes, op) + } else if err := o.UnmarshalJSON(tt.bytes); err == nil { + t.Errorf("ToHTTPOpSpec.UnmarshalJSON() error = %v, wantErr %v for test %s", err, tt.wantErr, tt.name) + } + }) + } +} + +func TestToHTTP_Process(t *testing.T) { + data := []byte{} + wg := sync.WaitGroup{} + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer wg.Done() + serverData, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Log(err) + t.FailNow() + } + data = append(data, serverData...) + })) + type wanted struct { + Block []*executetest.Block + Result []byte + } + testCases := []struct { + name string + spec *functions.ToHTTPProcedureSpec + data []query.Block + want wanted + }{ + { + name: "colblock with name in _measurement", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + NameColumn: "_measurement", + }, + }, + data: []query.Block{execute.CopyBlock(&executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), "a", 2.0, "one"}, + {execute.Time(21), "a", 2.0, "one"}, + {execute.Time(21), "b", 1.0, "seven"}, + {execute.Time(31), "a", 3.0, "nine"}, + {execute.Time(41), "c", 4.0, "elevendyone"}, + }, + }, executetest.UnlimitedAllocator)}, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte("a _value=2 11\na _value=2 21\nb _value=1 21\na _value=3 31\nc _value=4 41\n")}, + }, + { + name: "one block with measurement name in _measurement", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + NameColumn: "_measurement", + }, + }, + data: []query.Block{&executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), "a", 2.0, "one"}, + {execute.Time(21), "a", 2.0, "one"}, + {execute.Time(21), "b", 1.0, "seven"}, + {execute.Time(31), "a", 3.0, "nine"}, + {execute.Time(41), "c", 4.0, "elevendyone"}, + }, + }}, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte("a _value=2 11\na _value=2 21\nb _value=1 21\na _value=3 31\nc _value=4 41\n")}, + }, + { + name: "one block with measurement name in _measurement and tag", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + TagColumns: []string{"fred"}, + NameColumn: "_measurement", + }, + }, + data: []query.Block{&executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), "a", 2.0, "one"}, + {execute.Time(21), "a", 2.0, "one"}, + {execute.Time(21), "b", 1.0, "seven"}, + {execute.Time(31), "a", 3.0, "nine"}, + {execute.Time(41), "c", 4.0, "elevendyone"}, + }, + }}, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte("a,fred=one _value=2 11\na,fred=one _value=2 21\nb,fred=seven _value=1 21\na,fred=nine _value=3 31\nc,fred=elevendyone _value=4 41\n")}, + }, + { + name: "one block", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "POST", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + Name: "one_block", + }, + }, + data: []query.Block{&executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(11), 2.0}, + {execute.Time(21), 1.0}, + {execute.Time(31), 3.0}, + {execute.Time(41), 4.0}, + }, + }}, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte("one_block _value=2 11\none_block _value=1 21\none_block _value=3 31\none_block _value=4 41\n"), + }, + }, + { + name: "one block with unused tag", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + Name: "one_block_w_unused_tag", + }, + }, + data: []query.Block{&executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), 2.0, "one"}, + {execute.Time(21), 1.0, "seven"}, + {execute.Time(31), 3.0, "nine"}, + {execute.Time(41), 4.0, "elevendyone"}, + }, + }}, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte(`one_block_w_unused_tag _value=2 11 +one_block_w_unused_tag _value=1 21 +one_block_w_unused_tag _value=3 31 +one_block_w_unused_tag _value=4 41 +`), + }, + }, + { + name: "one block with tag", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + TagColumns: []string{"fred"}, + Name: "one_block_w_tag", + }, + }, + data: []query.Block{&executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), 2.0, "one"}, + {execute.Time(21), 1.0, "seven"}, + {execute.Time(31), 3.0, "nine"}, + {execute.Time(41), 4.0, "elevendyone"}, + }, + }}, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte(`one_block_w_tag,fred=one _value=2 11 +one_block_w_tag,fred=seven _value=1 21 +one_block_w_tag,fred=nine _value=3 31 +one_block_w_tag,fred=elevendyone _value=4 41 +`), + }, + }, + { + name: "multi block", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + TagColumns: []string{"fred"}, + Name: "multi_block", + }, + }, + data: []query.Block{ + &executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), 2.0, "one"}, + {execute.Time(21), 1.0, "seven"}, + {execute.Time(31), 3.0, "nine"}, + }, + }, + &executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(51), 2.0, "one"}, + {execute.Time(61), 1.0, "seven"}, + {execute.Time(71), 3.0, "nine"}, + }, + }, + }, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte("multi_block,fred=one _value=2 11\nmulti_block,fred=seven _value=1 21\nmulti_block,fred=nine _value=3 31\n" + + "multi_block,fred=one _value=2 51\nmulti_block,fred=seven _value=1 61\nmulti_block,fred=nine _value=3 71\n"), + }, + }, + { + name: "multi collist blocks", + spec: &functions.ToHTTPProcedureSpec{ + Spec: &functions.ToHTTPOpSpec{ + Addr: server.URL, + Method: "GET", + Timeout: 50 * time.Second, + TimeColumn: execute.DefaultTimeColLabel, + ValueColumns: []string{"_value"}, + TagColumns: []string{"fred"}, + Name: "multi_collist_blocks", + }, + }, + data: []query.Block{ + execute.CopyBlock( + &executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), 2.0, "one"}, + {execute.Time(21), 1.0, "seven"}, + {execute.Time(31), 3.0, "nine"}, + }, + }, executetest.UnlimitedAllocator), + &executetest.Block{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "fred", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(51), 2.0, "one"}, + {execute.Time(61), 1.0, "seven"}, + {execute.Time(71), 3.0, "nine"}, + }, + }, + }, + want: wanted{ + Block: []*executetest.Block(nil), + Result: []byte("multi_collist_blocks,fred=one _value=2 11\nmulti_collist_blocks,fred=seven _value=1 21\nmulti_collist_blocks,fred=nine _value=3 31\n" + + "multi_collist_blocks,fred=one _value=2 51\nmulti_collist_blocks,fred=seven _value=1 61\nmulti_collist_blocks,fred=nine _value=3 71\n"), + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + wg.Add(len(tc.data)) + + executetest.ProcessTestHelper( + t, + tc.data, + tc.want.Block, + func(d execute.Dataset, c execute.BlockBuilderCache) execute.Transformation { + return functions.NewToHTTPTransformation(d, c, tc.spec) + }, + ) + wg.Wait() // wait till we are done getting the data back + if string(data) != string(tc.want.Result) { + t.Logf("expected %s, got %s", tc.want.Result, data) + t.Fail() + } + data = data[:0] + }) + } +}