moved to_http and to_kafka to flux and update to latest flux (#1175)
parent
b91169e779
commit
98be8bcd32
43
go.mod
43
go.mod
|
|
@ -9,16 +9,16 @@ require (
|
|||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
|
||||
github.com/apex/log v1.0.0 // indirect
|
||||
github.com/apex/log v1.1.0 // indirect
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
|
||||
github.com/aws/aws-sdk-go v1.15.50 // indirect
|
||||
github.com/aws/aws-sdk-go v1.15.59 // indirect
|
||||
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 // indirect
|
||||
github.com/boltdb/bolt v1.3.1 // indirect
|
||||
github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5
|
||||
github.com/caarlos0/ctrlc v1.0.0 // indirect
|
||||
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e // indirect
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/coreos/bbolt v1.3.0
|
||||
github.com/coreos/bbolt v1.3.1-coreos.6
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
|
||||
|
|
@ -33,25 +33,26 @@ require (
|
|||
github.com/google/go-cmp v0.2.0
|
||||
github.com/google/go-github v17.0.0+incompatible
|
||||
github.com/google/go-querystring v1.0.0 // indirect
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181004151105-1babbf986f6f // indirect
|
||||
github.com/goreleaser/goreleaser v0.88.0
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
|
||||
github.com/goreleaser/goreleaser v0.91.1
|
||||
github.com/goreleaser/nfpm v0.9.5 // indirect
|
||||
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
|
||||
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
|
||||
github.com/hashicorp/raft v1.0.0 // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f
|
||||
github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec
|
||||
github.com/influxdata/flux v0.0.0-20181023220648-4691e5b3500f
|
||||
github.com/influxdata/influxdb v0.0.0-20181017211453-9520b8d95606
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
|
||||
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
|
||||
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
|
||||
github.com/jessevdk/go-flags v1.4.0
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
|
||||
github.com/jsternberg/zap-logfmt v1.2.0
|
||||
github.com/jtolds/gls v4.2.1+incompatible // indirect
|
||||
github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d
|
||||
github.com/julienschmidt/httprouter v1.2.0
|
||||
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef
|
||||
github.com/kevinburke/go-bindata v3.11.0+incompatible
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
|
||||
github.com/mattn/go-isatty v0.0.4
|
||||
github.com/mattn/go-zglob v0.0.0-20180803001819-2ea3427bfa53 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1
|
||||
|
|
@ -62,22 +63,21 @@ require (
|
|||
github.com/nats-io/gnatsd v1.3.0 // indirect
|
||||
github.com/nats-io/go-nats v1.6.0 // indirect
|
||||
github.com/nats-io/go-nats-streaming v0.4.0
|
||||
github.com/nats-io/nats-streaming-server v0.11.0
|
||||
github.com/nats-io/nats-streaming-server v0.11.2
|
||||
github.com/nats-io/nuid v1.0.0 // indirect
|
||||
github.com/opentracing/opentracing-go v1.0.2
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c // indirect
|
||||
github.com/philhofer/fwd v1.0.0 // indirect
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
|
||||
github.com/pkg/errors v0.8.0
|
||||
github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d
|
||||
github.com/prometheus/client_golang v0.9.0
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
|
||||
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
|
||||
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39
|
||||
github.com/satori/go.uuid v1.2.0
|
||||
github.com/segmentio/kafka-go v0.1.0
|
||||
github.com/sirupsen/logrus v1.1.0
|
||||
github.com/sirupsen/logrus v1.1.1
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d // indirect
|
||||
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a // indirect
|
||||
github.com/spf13/cobra v0.0.3
|
||||
github.com/spf13/pflag v1.0.3 // indirect
|
||||
github.com/spf13/viper v1.2.1
|
||||
github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8
|
||||
github.com/tinylib/msgp v1.0.2 // indirect
|
||||
|
|
@ -85,15 +85,16 @@ require (
|
|||
github.com/willf/bitset v1.1.9 // indirect
|
||||
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 // indirect
|
||||
go.uber.org/zap v1.9.1
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4
|
||||
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1
|
||||
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced
|
||||
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519
|
||||
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
|
||||
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba
|
||||
golang.org/x/sys v0.0.0-20181023152157-44b849a8bc13
|
||||
golang.org/x/text v0.3.0
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
|
||||
golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 // indirect
|
||||
google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c
|
||||
golang.org/x/tools v0.0.0-20181023010539-40a48ad93fbe // indirect
|
||||
google.golang.org/api v0.0.0-20181021000519-a2651947f503
|
||||
google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e // indirect
|
||||
google.golang.org/grpc v1.15.0
|
||||
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5
|
||||
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect
|
||||
|
|
|
|||
74
go.sum
74
go.sum
|
|
@ -17,12 +17,12 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq
|
|||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
|
||||
github.com/apex/log v1.0.0 h1:5UWeZC54mWVtOGSCjtuvDPgY/o0QxmjQgvYZ27pLVGQ=
|
||||
github.com/apex/log v1.0.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY=
|
||||
github.com/apex/log v1.1.0 h1:J5rld6WVFi6NxA6m8GJ1LJqu3+GiTFIt3mYv27gdQWI=
|
||||
github.com/apex/log v1.1.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
||||
github.com/aws/aws-sdk-go v1.15.50 h1:3QZIQeiRXEMTs+w+BQ2c/3Fi9/Qz9KnipT/M7YI1ub4=
|
||||
github.com/aws/aws-sdk-go v1.15.50/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
|
||||
github.com/aws/aws-sdk-go v1.15.59 h1:K/Jy1OfHttpKHHQEy1V0713bb6XMRiA1HO1aAi/sMNg=
|
||||
github.com/aws/aws-sdk-go v1.15.59/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI=
|
||||
|
|
@ -40,8 +40,8 @@ github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMS
|
|||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/coreos/bbolt v1.3.0 h1:HIgH5xUWXT914HCI671AxuTTqjj64UOFr7pHn48LUTI=
|
||||
github.com/coreos/bbolt v1.3.0/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
|
||||
github.com/coreos/bbolt v1.3.1-coreos.6 h1:uTXKg9gY70s9jMAKdfljFQcuh4e/BXOM+V+d00KFj3A=
|
||||
github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
|
@ -63,8 +63,6 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd h1:r04M
|
|||
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
|
||||
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493 h1:OTanQnFt0bi5iLFSdbEVA/idR6Q2WhCm+deb7ir2CcM=
|
||||
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
|
||||
github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo=
|
||||
github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
|
||||
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
|
||||
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
|
||||
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
|
||||
|
|
@ -101,8 +99,8 @@ github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4r
|
|||
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
|
||||
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
|
||||
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181004151105-1babbf986f6f h1:JJ2EP5vV3LAD2U1CxQtD7PTOO15Y96kXmKDz7TjxGHs=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181004151105-1babbf986f6f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/goreleaser/nfpm v0.9.5 h1:ntRGZSucXRjoCk6FdwJaXcCZxZZu7YoqX7UH5IC13l4=
|
||||
github.com/goreleaser/nfpm v0.9.5/go.mod h1:kn0Dps10Osi7V2icEXFTBRZhmiuGPUizzZVw/WQtQ/k=
|
||||
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
|
||||
|
|
@ -121,11 +119,11 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
|
|||
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f h1:vvHzSLXoF9n5Zq28QZzHUOhv5viN8JodORYq+Xk4TBg=
|
||||
github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f/go.mod h1:BHspYxVDHrBApUfImcwa0ZOpRanbmO/ACD2iXbBD3Ic=
|
||||
github.com/influxdata/flux v0.0.0-20181023220648-4691e5b3500f h1:upxKInE9S2FcVswMZ9dmytr8MhOu6dSx/k615bkqk5k=
|
||||
github.com/influxdata/flux v0.0.0-20181023220648-4691e5b3500f/go.mod h1:wC5PBWVC/p7SQjKZNJ/dtwAuIufYHOVapzlC/A3awbQ=
|
||||
github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg=
|
||||
github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec h1:TCUzgPkjJ/gcXXjxjssQoAYHeUxPHEdTz4XgKrNKw+I=
|
||||
github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
|
||||
github.com/influxdata/influxdb v0.0.0-20181017211453-9520b8d95606 h1:LBPg9NDkmVEGr22IrYPNsESFPjUnSgAbIgClv65dMIg=
|
||||
github.com/influxdata/influxdb v0.0.0-20181017211453-9520b8d95606/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
|
||||
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
|
||||
|
|
@ -138,12 +136,14 @@ github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGAR
|
|||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jsternberg/zap-logfmt v1.2.0 h1:1v+PK4/B48cy8cfQbxL4FmmNZrjnIMr2BsnyEmXqv2o=
|
||||
github.com/jsternberg/zap-logfmt v1.2.0/go.mod h1:kz+1CUmCutPWABnNkOu9hOHKdT2q3TDYCcsFy9hpqb0=
|
||||
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
|
||||
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d h1:of6+TpypLAaiv4JxgH5aplBZnt0b65B4v4c8q5oy+Sk=
|
||||
github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef h1:2jNeR4YUziVtswNP9sEFAI913cVrzH85T+8Q6LpYbT0=
|
||||
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
|
||||
github.com/kevinburke/go-bindata v3.11.0+incompatible h1:GiPs9jxaG2xY1B5Dt/d/yHUOMlTk14uS35VcmHrdo4I=
|
||||
|
|
@ -151,6 +151,8 @@ github.com/kevinburke/go-bindata v3.11.0+incompatible/go.mod h1:/pEEZ72flUW2p0yi
|
|||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
|
||||
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
|
|
@ -188,8 +190,8 @@ github.com/nats-io/go-nats v1.6.0 h1:FznPwMfrVwGnSCh7JTXyJDRW0TIkD4Tr+M1LPJt9T70
|
|||
github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
|
||||
github.com/nats-io/go-nats-streaming v0.4.0 h1:00wOBnTKzZGvQOFRSxj18kUm4X2TvXzv8LS0skZegPc=
|
||||
github.com/nats-io/go-nats-streaming v0.4.0/go.mod h1:gfq4R3c9sKAINOpelo0gn/b9QDMBZnmrttcsNF+lqyo=
|
||||
github.com/nats-io/nats-streaming-server v0.11.0 h1:6d32ASBeZJQOoams2GJuviQyf5GVF5R4StHVyEhSQK8=
|
||||
github.com/nats-io/nats-streaming-server v0.11.0/go.mod h1:RyqtDJZvMZO66YmyjIYdIvS69zu/wDAkyNWa8PIUa5c=
|
||||
github.com/nats-io/nats-streaming-server v0.11.2 h1:UCqZbfXUKs9Ejw7KiNaFZEbbiVbK7uA8jbK2TsdGbqg=
|
||||
github.com/nats-io/nats-streaming-server v0.11.2/go.mod h1:RyqtDJZvMZO66YmyjIYdIvS69zu/wDAkyNWa8PIUa5c=
|
||||
github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs=
|
||||
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
|
||||
|
|
@ -210,10 +212,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d h1:1y9b8XHRz8eCQ8grxQqS1Uz8x8V6AVmdIIrPf7qsFMs=
|
||||
github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
|
||||
github.com/prometheus/client_golang v0.9.0 h1:tXuTFVHC03mW0D+Ua1Q2d1EAVqLTuggX50V0VLICCzY=
|
||||
github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e h1:n/3MEhJQjQxrOUCzh1Y3Re6aJUUWRp2M9+Oc3eVn/54=
|
||||
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
|
||||
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39 h1:Cto4X6SVMWRPBkJ/3YHn1iDGDGc/Z+sW+AEMKHMVvN4=
|
||||
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
|
||||
|
|
@ -222,8 +228,8 @@ github.com/segmentio/kafka-go v0.1.0 h1:IXCHG+sXPNiIR5pC/vTEItZduPKu4cnpr85Ygxpx
|
|||
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
|
||||
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
|
||||
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||
github.com/sirupsen/logrus v1.1.0 h1:65VZabgUiV9ktjGM5nTq0+YurgTyX+YI2lSSfDjI+qU=
|
||||
github.com/sirupsen/logrus v1.1.0/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
|
||||
github.com/sirupsen/logrus v1.1.1 h1:VzGj7lhU7KEB9e9gMpAV/v5XT2NVSvLJhJLCWbnkgXg=
|
||||
github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a h1:JSvGDIbmil4Ui/dDdFBExb7/cmkNjyX5F97oglmvCDo=
|
||||
|
|
@ -240,6 +246,8 @@ github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9
|
|||
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
|
||||
github.com/spf13/pflag v1.0.2 h1:Fy0orTDgHdbnzHcsOgfCN4LtHf0ec3wwtiwJqwvf3Gc=
|
||||
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
|
||||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M=
|
||||
github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
|
|
@ -261,43 +269,45 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
|
|||
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
|
||||
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc=
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e h1:IzypfodbhbnViNUO/MEh0FzCUooG97cIGfdggUrUSyU=
|
||||
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE=
|
||||
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w=
|
||||
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced h1:4oqSq7eft7MdPKBGQK11X9WYUxmj6ZLgGTqYIbY1kyw=
|
||||
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4 h1:99CA0JJbUX4ozCnLon680Jc9e0T1i8HCaLVJMwtI8Hc=
|
||||
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e h1:EfdBzeKbFSvOjoIqSZcfS8wp0FBLokGBEs9lz1OtSg0=
|
||||
golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba h1:nZJIJPGow0Kf9bU9QTc1U6OXbs/7Hu4e+cNv+hxH+Zc=
|
||||
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181023152157-44b849a8bc13 h1:ICvJQ9FL9kAAfwGwpoAmcE1O51M0zE++iVRxQ3xyiGE=
|
||||
golang.org/x/sys v0.0.0-20181023152157-44b849a8bc13/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181004163742-59602fdee893 h1:rsE8bdRd+SQZ1eQSuWpO3bw7AmfVa+vsnxkZ9tcPzAA=
|
||||
golang.org/x/tools v0.0.0-20181004163742-59602fdee893/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 h1:3sQpWsmaX/260ENaYpHTEljOMDVUlW9WHBGg9wGAXJk=
|
||||
golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c h1:qSBE8MLMBtzNDa9QWZiS0qSIAYpU4BbVXbM70aNG55g=
|
||||
google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
|
||||
golang.org/x/tools v0.0.0-20181023010539-40a48ad93fbe h1:i8YNi6USHuTcWHQPvNjvHY7JmkAmn1MnN/ISnPD/ZHc=
|
||||
golang.org/x/tools v0.0.0-20181023010539-40a48ad93fbe/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
google.golang.org/api v0.0.0-20181021000519-a2651947f503 h1:UK7/bFlIoP9xre0fwSiXFaZZSpzmaen5MKp1sppNJ9U=
|
||||
google.golang.org/api v0.0.0-20181021000519-a2651947f503/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.2.0 h1:S0iUepdCWODXRvtE+gcRDd15L+k+k1AiHlMiMjefH24=
|
||||
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e h1:I5s8aUkxqPjgAssfOv+dVr+4/7BC40WV6JhcVoORltI=
|
||||
google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/grpc v1.15.0 h1:Az/KuahOM4NAidTEuJCv/RonAA7rYsTPkqXVjr+8OOw=
|
||||
google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ func (r QueryRequest) Validate() error {
|
|||
}
|
||||
|
||||
func nowFunc(now time.Time) values.Function {
|
||||
timeVal := values.NewTimeValue(values.ConvertTime(now))
|
||||
timeVal := values.NewTime(values.ConvertTime(now))
|
||||
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
|
||||
ReturnType: semantic.Time,
|
||||
})
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
_ "github.com/influxdata/flux/functions" // Import the built-in functions
|
||||
_ "github.com/influxdata/flux/functions/inputs"
|
||||
_ "github.com/influxdata/flux/functions/outputs"
|
||||
_ "github.com/influxdata/flux/functions/transformations"
|
||||
_ "github.com/influxdata/flux/options" // Import the built-in options
|
||||
_ "github.com/influxdata/platform/query/functions" // Import the built-in functions
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
package functions
|
||||
|
|
@ -39,7 +39,7 @@ func (bd *BucketsDecoder) Fetch() (bool, error) {
|
|||
|
||||
func (bd *BucketsDecoder) Decode() (flux.Table, error) {
|
||||
kb := execute.NewGroupKeyBuilder(nil)
|
||||
kb.AddKeyValue("organizationID", values.NewStringValue(bd.buckets[0].OrganizationID.String()))
|
||||
kb.AddKeyValue("organizationID", values.NewString(bd.buckets[0].OrganizationID.String()))
|
||||
gk, err := kb.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -47,30 +47,42 @@ func (bd *BucketsDecoder) Decode() (flux.Table, error) {
|
|||
|
||||
b := execute.NewColListTableBuilder(gk, bd.alloc)
|
||||
|
||||
b.AddCol(flux.ColMeta{
|
||||
if _, err := b.AddCol(flux.ColMeta{
|
||||
Label: "name",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.AddCol(flux.ColMeta{
|
||||
Label: "id",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.AddCol(flux.ColMeta{
|
||||
Label: "organization",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.AddCol(flux.ColMeta{
|
||||
Label: "organizationID",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.AddCol(flux.ColMeta{
|
||||
Label: "retentionPolicy",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.AddCol(flux.ColMeta{
|
||||
Label: "retentionPeriod",
|
||||
Type: flux.TInt,
|
||||
})
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, bucket := range bd.buckets {
|
||||
b.AppendString(0, bucket.Name)
|
||||
|
|
|
|||
|
|
@ -1,441 +0,0 @@
|
|||
package functions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/plan"
|
||||
"github.com/influxdata/flux/semantic"
|
||||
"github.com/influxdata/line-protocol"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
ToHTTPKind = "toHTTP"
|
||||
DefaultToHTTPTimeout = 1 * time.Second
|
||||
)
|
||||
|
||||
// DefaultToHTTPUserAgent is the default user agent used by ToHttp
|
||||
var DefaultToHTTPUserAgent = "fluxd/dev"
|
||||
|
||||
func newToHTTPClient() *http.Client {
|
||||
return &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
MaxIdleConnsPerHost: runtime.GOMAXPROCS(0) + 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var toHTTPKeepAliveClient = newToHTTPClient()
|
||||
|
||||
// this is used so we can get better validation on marshaling, innerToHTTPOpSpec and ToHTTPOpSpec
// need to have identical fields
type innerToHTTPOpSpec ToHTTPOpSpec

// ToHTTPOpSpec is the operation spec for the toHTTP function: where to send
// the encoded data and how to map table columns onto line protocol.
type ToHTTPOpSpec struct {
	URL          string            `json:"url"`
	Method       string            `json:"method"` // default behavior should be POST
	Name         string            `json:"name"`
	NameColumn   string            `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
	Headers      map[string]string `json:"headers"`    // TODO: implement Headers after bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys)
	URLParams    map[string]string `json:"urlParams"`  // TODO: implement URLParams after bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys)
	Timeout      time.Duration     `json:"timeout"`    // default to something reasonable if zero
	NoKeepAlive  bool              `json:"noKeepAlive"`
	TimeColumn   string            `json:"timeColumn"`
	TagColumns   []string          `json:"tagColumns"`
	ValueColumns []string          `json:"valueColumns"`
}
|
||||
|
||||
// ToHTTPSignature declares the Flux function signature for toHTTP.
var ToHTTPSignature = flux.DefaultFunctionSignature()

// init registers the toHTTP function, op spec, procedure, and transformation
// with the flux runtime. Registration is a package-level side effect; the
// function is registered with side effects so it is never pruned by planning.
func init() {
	ToHTTPSignature.Params["url"] = semantic.String
	ToHTTPSignature.Params["method"] = semantic.String
	ToHTTPSignature.Params["name"] = semantic.String
	ToHTTPSignature.Params["timeout"] = semantic.Duration
	ToHTTPSignature.Params["timeColumn"] = semantic.String
	ToHTTPSignature.Params["tagColumns"] = semantic.NewArrayType(semantic.String)
	ToHTTPSignature.Params["valueColumns"] = semantic.NewArrayType(semantic.String)

	flux.RegisterFunctionWithSideEffect(ToHTTPKind, createToHTTPOpSpec, ToHTTPSignature)
	flux.RegisterOpSpec(ToHTTPKind,
		func() flux.OperationSpec { return &ToHTTPOpSpec{} })
	plan.RegisterProcedureSpec(ToHTTPKind, newToHTTPProcedure, ToHTTPKind)
	execute.RegisterTransformation(ToHTTPKind, createToHTTPTransformation)
}
|
||||
|
||||
// ReadArgs loads a flux.Arguments into ToHTTPOpSpec. It sets several default values.
|
||||
// If the http method isn't set, it defaults to POST, it also uppercases the http method.
|
||||
// If the time_column isn't set, it defaults to execute.TimeColLabel.
|
||||
// If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}.
|
||||
func (o *ToHTTPOpSpec) ReadArgs(args flux.Arguments) error {
|
||||
var err error
|
||||
o.URL, err = args.GetRequiredString("url")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
o.Name, ok, err = args.GetString("name")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn, ok, err = args.GetString("nameColumn")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn = "_measurement"
|
||||
}
|
||||
}
|
||||
|
||||
o.Method, ok, err = args.GetString("method")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.Method = "POST"
|
||||
}
|
||||
o.Method = strings.ToUpper(o.Method)
|
||||
|
||||
timeout, ok, err := args.GetDuration("timeout")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.Timeout = DefaultToHTTPTimeout
|
||||
} else {
|
||||
o.Timeout = time.Duration(timeout)
|
||||
}
|
||||
|
||||
o.TimeColumn, ok, err = args.GetString("timeColumn")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.TimeColumn = execute.DefaultTimeColLabel
|
||||
}
|
||||
|
||||
tagColumns, ok, err := args.GetArray("tagColumns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.TagColumns = o.TagColumns[:0]
|
||||
if ok {
|
||||
for i := 0; i < tagColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.TagColumns, tagColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
|
||||
valueColumns, ok, err := args.GetArray("valueColumns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.ValueColumns = o.ValueColumns[:0]
|
||||
|
||||
if !ok || valueColumns.Len() == 0 {
|
||||
o.ValueColumns = append(o.ValueColumns, execute.DefaultValueColLabel)
|
||||
} else {
|
||||
for i := 0; i < valueColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.ValueColumns, valueColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
|
||||
// TODO: get other headers working!
|
||||
o.Headers = map[string]string{
|
||||
"Content-Type": "application/vnd.influx",
|
||||
"User-Agent": DefaultToHTTPUserAgent,
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func createToHTTPOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
|
||||
if err := a.AddParentFromArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := new(ToHTTPOpSpec)
|
||||
if err := s.ReadArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON unmarshals and validates toHTTPOpSpec into JSON.
|
||||
func (o *ToHTTPOpSpec) UnmarshalJSON(b []byte) (err error) {
|
||||
|
||||
if err = json.Unmarshal(b, (*innerToHTTPOpSpec)(o)); err != nil {
|
||||
return err
|
||||
}
|
||||
u, err := url.ParseRequestURI(o.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !(u.Scheme == "https" || u.Scheme == "http" || u.Scheme == "") {
|
||||
return fmt.Errorf("Scheme must be http or https but was %s", u.Scheme)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kind returns the operation kind for the toHTTP op spec.
func (ToHTTPOpSpec) Kind() flux.OperationKind {
	return ToHTTPKind
}

// ToHTTPProcedureSpec is the planner-side procedure spec wrapping the op spec.
type ToHTTPProcedureSpec struct {
	Spec *ToHTTPOpSpec
}

// Kind returns the procedure kind for the toHTTP procedure spec.
func (o *ToHTTPProcedureSpec) Kind() plan.ProcedureKind {
	return ToHTTPKind
}
|
||||
|
||||
func (o *ToHTTPProcedureSpec) Copy() plan.ProcedureSpec {
|
||||
s := o.Spec
|
||||
res := &ToHTTPProcedureSpec{
|
||||
Spec: &ToHTTPOpSpec{
|
||||
URL: s.URL,
|
||||
Method: s.Method,
|
||||
Name: s.Name,
|
||||
NameColumn: s.NameColumn,
|
||||
Headers: make(map[string]string, len(s.Headers)),
|
||||
URLParams: make(map[string]string, len(s.URLParams)),
|
||||
Timeout: s.Timeout,
|
||||
NoKeepAlive: s.NoKeepAlive,
|
||||
TimeColumn: s.TimeColumn,
|
||||
TagColumns: append([]string(nil), s.TagColumns...),
|
||||
ValueColumns: append([]string(nil), s.ValueColumns...),
|
||||
},
|
||||
}
|
||||
for k, v := range s.Headers {
|
||||
res.Spec.Headers[k] = v
|
||||
}
|
||||
for k, v := range s.URLParams {
|
||||
res.Spec.URLParams[k] = v
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func newToHTTPProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
|
||||
spec, ok := qs.(*ToHTTPOpSpec)
|
||||
if !ok && spec != nil {
|
||||
return nil, fmt.Errorf("invalid spec type %T", qs)
|
||||
}
|
||||
return &ToHTTPProcedureSpec{Spec: spec}, nil
|
||||
}
|
||||
|
||||
// createToHTTPTransformation builds the dataset, builder cache, and
// transformation for an executing toHTTP node.
func createToHTTPTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
	s, ok := spec.(*ToHTTPProcedureSpec)
	if !ok {
		return nil, nil, fmt.Errorf("invalid spec type %T", spec)
	}
	cache := execute.NewTableBuilderCache(a.Allocator())
	d := execute.NewDataset(id, mode, cache)
	t := NewToHTTPTransformation(d, cache, s)
	return t, d, nil
}
|
||||
|
||||
// ToHTTPTransformation streams each processed table to an HTTP endpoint as
// line protocol.
type ToHTTPTransformation struct {
	d     execute.Dataset
	cache execute.TableBuilderCache
	spec  *ToHTTPProcedureSpec
}

// RetractTable forwards the retraction to the downstream dataset.
func (t *ToHTTPTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
	return t.d.RetractTable(key)
}

// NewToHTTPTransformation constructs a ToHTTPTransformation from its parts.
func NewToHTTPTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *ToHTTPProcedureSpec) *ToHTTPTransformation {
	return &ToHTTPTransformation{
		d:     d,
		cache: cache,
		spec:  spec,
	}
}
|
||||
|
||||
// toHttpMetric is a reusable line-protocol metric buffer implementing the
// protocol.Metric interface; one instance is reused across rows to avoid
// per-row allocations.
type toHttpMetric struct {
	tags   []*protocol.Tag
	fields []*protocol.Field
	name   string
	t      time.Time
}

// TagList returns the metric's tags.
func (m *toHttpMetric) TagList() []*protocol.Tag {
	return m.tags
}

// FieldList returns the metric's fields.
func (m *toHttpMetric) FieldList() []*protocol.Field {
	return m.fields
}

// truncateTagsAndFields resets tags and fields while keeping their backing
// arrays for reuse by the next row.
func (m *toHttpMetric) truncateTagsAndFields() {
	m.fields = m.fields[:0]
	m.tags = m.tags[:0]
}

// Name returns the measurement name.
func (m *toHttpMetric) Name() string {
	return m.name
}

// Time returns the metric timestamp.
func (m *toHttpMetric) Time() time.Time {
	return m.t
}

// idxType pairs a column index with its data type for quick lookups by label.
type idxType struct {
	Idx  int
	Type flux.DataType
}
|
||||
|
||||
func (t *ToHTTPTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
|
||||
pr, pw := io.Pipe() // TODO: replce the pipe with something faster
|
||||
m := &toHttpMetric{}
|
||||
e := protocol.NewEncoder(pw)
|
||||
e.FailOnFieldErr(true)
|
||||
e.SetFieldSortOrder(protocol.SortFields)
|
||||
cols := tbl.Cols()
|
||||
labels := make(map[string]idxType, len(cols))
|
||||
for i, col := range cols {
|
||||
labels[col.Label] = idxType{Idx: i, Type: col.Type}
|
||||
}
|
||||
|
||||
// do time
|
||||
timeColLabel := t.spec.Spec.TimeColumn
|
||||
timeColIdx, ok := labels[timeColLabel]
|
||||
|
||||
if !ok {
|
||||
return errors.New("Could not get time column")
|
||||
}
|
||||
if timeColIdx.Type != flux.TTime {
|
||||
return fmt.Errorf("column %s is not of type %s", timeColLabel, timeColIdx.Type)
|
||||
}
|
||||
var measurementNameCol string
|
||||
if t.spec.Spec.Name == "" {
|
||||
measurementNameCol = t.spec.Spec.NameColumn
|
||||
}
|
||||
|
||||
// check if each col is a tag or value and cache this value for the loop
|
||||
colMetadatas := tbl.Cols()
|
||||
isTag := make([]bool, len(colMetadatas))
|
||||
isValue := make([]bool, len(colMetadatas))
|
||||
|
||||
for i, col := range colMetadatas {
|
||||
isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label
|
||||
isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
var err error
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
m.name = t.spec.Spec.Name
|
||||
tbl.Do(func(er flux.ColReader) error {
|
||||
l := er.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m.truncateTagsAndFields()
|
||||
for j, col := range er.Cols() {
|
||||
switch {
|
||||
case col.Label == timeColLabel:
|
||||
m.t = er.Times(j)[i].Time()
|
||||
case measurementNameCol != "" && measurementNameCol == col.Label:
|
||||
if col.Type != flux.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.name = er.Strings(j)[i]
|
||||
case isTag[j]:
|
||||
if col.Type != flux.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
|
||||
case isValue[j]:
|
||||
switch col.Type {
|
||||
case flux.TFloat:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Floats(j)[i]})
|
||||
case flux.TInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Ints(j)[i]})
|
||||
case flux.TUInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j)[i]})
|
||||
case flux.TString:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
case flux.TTime:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Times(j)[i]})
|
||||
case flux.TBool:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Bools(j)[i]})
|
||||
default:
|
||||
return fmt.Errorf("invalid type for column %s", col.Label)
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err := e.Encode(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
pw.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
req, err := http.NewRequest(t.spec.Spec.Method, t.spec.Spec.URL, pr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.spec.Spec.Timeout <= 0 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), t.spec.Spec.Timeout)
|
||||
req = req.WithContext(ctx)
|
||||
defer cancel()
|
||||
}
|
||||
var resp *http.Response
|
||||
if t.spec.Spec.NoKeepAlive {
|
||||
resp, err = newToHTTPClient().Do(req)
|
||||
} else {
|
||||
resp, err = toHTTPKeepAliveClient.Do(req)
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wg.Wait()
|
||||
resp.Body.Close()
|
||||
|
||||
return req.Body.Close()
|
||||
}
|
||||
|
||||
// UpdateWatermark forwards the watermark to the downstream dataset.
func (t *ToHTTPTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
	return t.d.UpdateWatermark(pt)
}

// UpdateProcessingTime forwards the processing time to the downstream dataset.
func (t *ToHTTPTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
	return t.d.UpdateProcessingTime(pt)
}

// Finish finalizes the downstream dataset, propagating any error.
func (t *ToHTTPTransformation) Finish(id execute.DatasetID, err error) {
	t.d.Finish(err)
}
|
||||
|
|
@ -1,462 +0,0 @@
|
|||
package functions_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/execute/executetest"
|
||||
"github.com/influxdata/flux/functions/inputs"
|
||||
"github.com/influxdata/flux/querytest"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
)
|
||||
|
||||
// TestToHTTP_NewQuery verifies that a Flux script using toHTTP compiles to
// the expected operation spec, including the defaults ReadArgs fills in
// (timeColumn, valueColumns, headers).
func TestToHTTP_NewQuery(t *testing.T) {
	tests := []querytest.NewQueryTestCase{
		{
			Name: "from with database with range",
			Raw:  `from(bucket:"mybucket") |> toHTTP(url: "https://localhost:8081", name:"series1", method:"POST", timeout: 50s)`,
			Want: &flux.Spec{
				Operations: []*flux.Operation{
					{
						ID: "from0",
						Spec: &inputs.FromOpSpec{
							Bucket: "mybucket",
						},
					},
					{
						ID: "toHTTP1",
						Spec: &functions.ToHTTPOpSpec{
							URL:          "https://localhost:8081",
							Name:         "series1",
							Method:       "POST",
							Timeout:      50 * time.Second,
							TimeColumn:   execute.DefaultTimeColLabel,
							ValueColumns: []string{execute.DefaultValueColLabel},
							// Headers are always set by ReadArgs regardless of arguments.
							Headers: map[string]string{
								"Content-Type": "application/vnd.influx",
								"User-Agent":   "fluxd/dev",
							},
						},
					},
				},
				Edges: []flux.Edge{
					{Parent: "from0", Child: "toHTTP1"},
				},
			},
		},
	}
	for _, tc := range tests {
		tc := tc // capture range variable for the parallel subtest
		t.Run(tc.Name, func(t *testing.T) {
			t.Parallel()
			querytest.NewQueryTestHelper(t, tc)
		})
	}
}
|
||||
|
||||
// TestToHTTPOpSpec_UnmarshalJSON checks round-trip marshaling for a valid
// spec and that UnmarshalJSON rejects an unparseable URL.
func TestToHTTPOpSpec_UnmarshalJSON(t *testing.T) {
	type fields struct {
		URL         string
		Method      string
		Headers     map[string]string
		URLParams   map[string]string
		Timeout     time.Duration
		NoKeepAlive bool
	}
	tests := []struct {
		name    string
		fields  fields
		bytes   []byte
		wantErr bool
	}{
		{
			name: "happy path",
			bytes: []byte(`
			{
				"id": "toHTTP",
				"kind": "toHTTP",
				"spec": {
				  "url": "https://localhost:8081",
				  "method" :"POST"
				}
			}`),
			fields: fields{
				URL:    "https://localhost:8081",
				Method: "POST",
			},
		}, {
			// The space inside the host makes url.ParseRequestURI fail.
			name: "bad address",
			bytes: []byte(`
		{
			"id": "toHTTP",
			"kind": "toHTTP",
			"spec": {
			  "url": "https://loc alhost:8081",
			  "method" :"POST"
			}
		}`),
			fields: fields{
				URL:    "https://localhost:8081",
				Method: "POST",
			},
			wantErr: true,
		}}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			o := &functions.ToHTTPOpSpec{
				URL:         tt.fields.URL,
				Method:      tt.fields.Method,
				Headers:     tt.fields.Headers,
				URLParams:   tt.fields.URLParams,
				Timeout:     tt.fields.Timeout,
				NoKeepAlive: tt.fields.NoKeepAlive,
			}
			op := &flux.Operation{
				ID:   "toHTTP",
				Spec: o,
			}
			if !tt.wantErr {
				querytest.OperationMarshalingTestHelper(t, tt.bytes, op)
			} else if err := o.UnmarshalJSON(tt.bytes); err == nil {
				t.Errorf("ToHTTPOpSpec.UnmarshalJSON() error = %v, wantErr %v for test %s", err, tt.wantErr, tt.name)
			}
		})
	}
}
|
||||
|
||||
// TestToHTTP_Process runs the toHTTP transformation against an httptest
// server and compares the line protocol the server receives with the
// expected bytes. The WaitGroup is incremented once per input table so the
// test only reads `data` after the handler has finished appending to it.
func TestToHTTP_Process(t *testing.T) {
	data := []byte{}
	wg := sync.WaitGroup{}
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer wg.Done()
		serverData, err := ioutil.ReadAll(r.Body)
		if err != nil {
			t.Log(err)
			t.FailNow()
		}
		data = append(data, serverData...)
	}))
	type wanted struct {
		Table  []*executetest.Table
		Result []byte
	}
	testCases := []struct {
		name string
		spec *functions.ToHTTPProcedureSpec
		data []flux.Table
		want wanted
	}{
		{
			name: "coltable with name in _measurement",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					NameColumn:   "_measurement",
				},
			},
			data: []flux.Table{execute.CopyTable(&executetest.Table{
				ColMeta: []flux.ColMeta{
					{Label: "_time", Type: flux.TTime},
					{Label: "_measurement", Type: flux.TString},
					{Label: "_value", Type: flux.TFloat},
					{Label: "fred", Type: flux.TString},
				},
				Data: [][]interface{}{
					{execute.Time(11), "a", 2.0, "one"},
					{execute.Time(21), "a", 2.0, "one"},
					{execute.Time(21), "b", 1.0, "seven"},
					{execute.Time(31), "a", 3.0, "nine"},
					{execute.Time(41), "c", 4.0, "elevendyone"},
				},
			}, executetest.UnlimitedAllocator)},
			want: wanted{
				Table:  []*executetest.Table(nil),
				Result: []byte("a _value=2 11\na _value=2 21\nb _value=1 21\na _value=3 31\nc _value=4 41\n")},
		},
		{
			name: "one table with measurement name in _measurement",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					NameColumn:   "_measurement",
				},
			},
			data: []flux.Table{&executetest.Table{
				ColMeta: []flux.ColMeta{
					{Label: "_time", Type: flux.TTime},
					{Label: "_measurement", Type: flux.TString},
					{Label: "_value", Type: flux.TFloat},
					{Label: "fred", Type: flux.TString},
				},
				Data: [][]interface{}{
					{execute.Time(11), "a", 2.0, "one"},
					{execute.Time(21), "a", 2.0, "one"},
					{execute.Time(21), "b", 1.0, "seven"},
					{execute.Time(31), "a", 3.0, "nine"},
					{execute.Time(41), "c", 4.0, "elevendyone"},
				},
			}},
			want: wanted{
				Table:  []*executetest.Table(nil),
				Result: []byte("a _value=2 11\na _value=2 21\nb _value=1 21\na _value=3 31\nc _value=4 41\n")},
		},
		{
			name: "one table with measurement name in _measurement and tag",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					TagColumns:   []string{"fred"},
					NameColumn:   "_measurement",
				},
			},
			data: []flux.Table{&executetest.Table{
				ColMeta: []flux.ColMeta{
					{Label: "_time", Type: flux.TTime},
					{Label: "_measurement", Type: flux.TString},
					{Label: "_value", Type: flux.TFloat},
					{Label: "fred", Type: flux.TString},
				},
				Data: [][]interface{}{
					{execute.Time(11), "a", 2.0, "one"},
					{execute.Time(21), "a", 2.0, "one"},
					{execute.Time(21), "b", 1.0, "seven"},
					{execute.Time(31), "a", 3.0, "nine"},
					{execute.Time(41), "c", 4.0, "elevendyone"},
				},
			}},
			want: wanted{
				Table:  []*executetest.Table(nil),
				Result: []byte("a,fred=one _value=2 11\na,fred=one _value=2 21\nb,fred=seven _value=1 21\na,fred=nine _value=3 31\nc,fred=elevendyone _value=4 41\n")},
		},
		{
			name: "one table",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "POST",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					Name:         "one_table",
				},
			},
			data: []flux.Table{&executetest.Table{
				ColMeta: []flux.ColMeta{
					{Label: "_time", Type: flux.TTime},
					{Label: "_value", Type: flux.TFloat},
				},
				Data: [][]interface{}{
					{execute.Time(11), 2.0},
					{execute.Time(21), 1.0},
					{execute.Time(31), 3.0},
					{execute.Time(41), 4.0},
				},
			}},
			want: wanted{
				Table:  []*executetest.Table(nil),
				Result: []byte("one_table _value=2 11\none_table _value=1 21\none_table _value=3 31\none_table _value=4 41\n"),
			},
		},
		{
			name: "one table with unused tag",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					Name:         "one_table_w_unused_tag",
				},
			},
			data: []flux.Table{&executetest.Table{
				ColMeta: []flux.ColMeta{
					{Label: "_time", Type: flux.TTime},
					{Label: "_value", Type: flux.TFloat},
					{Label: "fred", Type: flux.TString},
				},
				Data: [][]interface{}{
					{execute.Time(11), 2.0, "one"},
					{execute.Time(21), 1.0, "seven"},
					{execute.Time(31), 3.0, "nine"},
					{execute.Time(41), 4.0, "elevendyone"},
				},
			}},
			want: wanted{
				Table: []*executetest.Table(nil),
				Result: []byte(`one_table_w_unused_tag _value=2 11
one_table_w_unused_tag _value=1 21
one_table_w_unused_tag _value=3 31
one_table_w_unused_tag _value=4 41
`),
			},
		},
		{
			name: "one table with tag",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					TagColumns:   []string{"fred"},
					Name:         "one_table_w_tag",
				},
			},
			data: []flux.Table{&executetest.Table{
				ColMeta: []flux.ColMeta{
					{Label: "_time", Type: flux.TTime},
					{Label: "_value", Type: flux.TFloat},
					{Label: "fred", Type: flux.TString},
				},
				Data: [][]interface{}{
					{execute.Time(11), 2.0, "one"},
					{execute.Time(21), 1.0, "seven"},
					{execute.Time(31), 3.0, "nine"},
					{execute.Time(41), 4.0, "elevendyone"},
				},
			}},
			want: wanted{
				Table: []*executetest.Table(nil),
				Result: []byte(`one_table_w_tag,fred=one _value=2 11
one_table_w_tag,fred=seven _value=1 21
one_table_w_tag,fred=nine _value=3 31
one_table_w_tag,fred=elevendyone _value=4 41
`),
			},
		},
		{
			name: "multi table",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					TagColumns:   []string{"fred"},
					Name:         "multi_table",
				},
			},
			data: []flux.Table{
				&executetest.Table{
					ColMeta: []flux.ColMeta{
						{Label: "_time", Type: flux.TTime},
						{Label: "_value", Type: flux.TFloat},
						{Label: "fred", Type: flux.TString},
					},
					Data: [][]interface{}{
						{execute.Time(11), 2.0, "one"},
						{execute.Time(21), 1.0, "seven"},
						{execute.Time(31), 3.0, "nine"},
					},
				},
				&executetest.Table{
					ColMeta: []flux.ColMeta{
						{Label: "_time", Type: flux.TTime},
						{Label: "_value", Type: flux.TFloat},
						{Label: "fred", Type: flux.TString},
					},
					Data: [][]interface{}{
						{execute.Time(51), 2.0, "one"},
						{execute.Time(61), 1.0, "seven"},
						{execute.Time(71), 3.0, "nine"},
					},
				},
			},
			want: wanted{
				Table: []*executetest.Table(nil),
				Result: []byte("multi_table,fred=one _value=2 11\nmulti_table,fred=seven _value=1 21\nmulti_table,fred=nine _value=3 31\n" +
					"multi_table,fred=one _value=2 51\nmulti_table,fred=seven _value=1 61\nmulti_table,fred=nine _value=3 71\n"),
			},
		},
		{
			name: "multi collist tables",
			spec: &functions.ToHTTPProcedureSpec{
				Spec: &functions.ToHTTPOpSpec{
					URL:          server.URL,
					Method:       "GET",
					Timeout:      50 * time.Second,
					TimeColumn:   execute.DefaultTimeColLabel,
					ValueColumns: []string{"_value"},
					TagColumns:   []string{"fred"},
					Name:         "multi_collist_tables",
				},
			},
			data: []flux.Table{
				execute.CopyTable(
					&executetest.Table{
						ColMeta: []flux.ColMeta{
							{Label: "_time", Type: flux.TTime},
							{Label: "_value", Type: flux.TFloat},
							{Label: "fred", Type: flux.TString},
						},
						Data: [][]interface{}{
							{execute.Time(11), 2.0, "one"},
							{execute.Time(21), 1.0, "seven"},
							{execute.Time(31), 3.0, "nine"},
						},
					}, executetest.UnlimitedAllocator),
				&executetest.Table{
					ColMeta: []flux.ColMeta{
						{Label: "_time", Type: flux.TTime},
						{Label: "_value", Type: flux.TFloat},
						{Label: "fred", Type: flux.TString},
					},
					Data: [][]interface{}{
						{execute.Time(51), 2.0, "one"},
						{execute.Time(61), 1.0, "seven"},
						{execute.Time(71), 3.0, "nine"},
					},
				},
			},
			want: wanted{
				Table: []*executetest.Table(nil),
				Result: []byte("multi_collist_tables,fred=one _value=2 11\nmulti_collist_tables,fred=seven _value=1 21\nmulti_collist_tables,fred=nine _value=3 31\n" +
					"multi_collist_tables,fred=one _value=2 51\nmulti_collist_tables,fred=seven _value=1 61\nmulti_collist_tables,fred=nine _value=3 71\n"),
			},
		},
	}

	for _, tc := range testCases {
		tc := tc // capture range variable
		t.Run(tc.name, func(t *testing.T) {
			// One HTTP request (and therefore one wg.Done) per input table.
			wg.Add(len(tc.data))

			executetest.ProcessTestHelper(
				t,
				tc.data,
				tc.want.Table,
				nil,
				func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
					return functions.NewToHTTPTransformation(d, c, tc.spec)
				},
			)
			wg.Wait() // wait till we are done getting the data back
			if string(data) != string(tc.want.Result) {
				t.Logf("expected %s, got %s", tc.want.Result, data)
				t.Fail()
			}
			// Reset the accumulated payload for the next test case.
			data = data[:0]
		})
	}
}
|
||||
|
|
@ -1,412 +0,0 @@
|
|||
package functions
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/plan"
|
||||
"github.com/influxdata/flux/semantic"
|
||||
"github.com/influxdata/line-protocol"
|
||||
"github.com/pkg/errors"
|
||||
kafka "github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
const (
	// ToKafkaKind is the Kind for the ToKafka Flux function
	ToKafkaKind = "toKafka"
)
|
||||
|
||||
// ToKafkaOpSpec is the operation spec for the toKafka function: target
// brokers/topic, load-balancing strategy, and the column-to-line-protocol
// mapping shared with toHTTP.
type ToKafkaOpSpec struct {
	Brokers      []string `json:"brokers"`
	Topic        string   `json:"topic"`
	Balancer     string   `json:"balancer"`
	Name         string   `json:"name"`
	NameColumn   string   `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
	TimeColumn   string   `json:"timeColumn"`
	TagColumns   []string `json:"tagColumns"`
	ValueColumns []string `json:"valueColumns"`
	MsgBufSize   int      `json:"msgBufferSize"` // the maximum number of messages to buffer before sending to kafka, the library we use defaults to 100
}

// ToKafkaSignature declares the Flux function signature for toKafka.
var ToKafkaSignature = flux.DefaultFunctionSignature()
|
||||
|
||||
// init registers the toKafka function, op spec, procedure, and transformation
// with the flux runtime as a package-level side effect.
func init() {
	ToKafkaSignature.Params["brokers"] = semantic.NewArrayType(semantic.String)
	ToKafkaSignature.Params["topic"] = semantic.String
	ToKafkaSignature.Params["balancer"] = semantic.String
	ToKafkaSignature.Params["name"] = semantic.String
	ToKafkaSignature.Params["nameColumn"] = semantic.String
	ToKafkaSignature.Params["timeColumn"] = semantic.String
	ToKafkaSignature.Params["tagColumns"] = semantic.NewArrayType(semantic.String)
	ToKafkaSignature.Params["valueColumns"] = semantic.NewArrayType(semantic.String)
	flux.RegisterFunctionWithSideEffect(ToKafkaKind, createToKafkaOpSpec, ToKafkaSignature)
	flux.RegisterOpSpec(ToKafkaKind,
		func() flux.OperationSpec { return &ToKafkaOpSpec{} })
	plan.RegisterProcedureSpec(ToKafkaKind, newToKafkaProcedure, ToKafkaKind)
	execute.RegisterTransformation(ToKafkaKind, createToKafkaTransformation)
}
|
||||
|
||||
// DefaultKafkaWriterFactory is a terrible name for a way to make a kafkaWriter that is injectable for testing
var DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter {
	return kafka.NewWriter(conf)
}

// KafkaWriter is an interface for what we need fromDefaultKafkaWriterFactory
type KafkaWriter interface {
	io.Closer
	WriteMessages(context.Context, ...kafka.Message) error
}
|
||||
|
||||
// ReadArgs loads a flux.Arguments into ToKafkaOpSpec. It sets several default values.
|
||||
// If the time_column isn't set, it defaults to execute.TimeColLabel.
|
||||
// If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}.
|
||||
func (o *ToKafkaOpSpec) ReadArgs(args flux.Arguments) error {
|
||||
var err error
|
||||
var ok bool
|
||||
|
||||
brokers, err := args.GetRequiredArray("brokers", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l := brokers.Len()
|
||||
|
||||
o.Brokers = make([]string, l)
|
||||
if brokers.Len() < 1 {
|
||||
return errors.New("at least one broker is required")
|
||||
}
|
||||
for i := 0; i < l; i++ {
|
||||
o.Brokers[i] = brokers.Get(i).Str()
|
||||
}
|
||||
|
||||
o.Topic, err = args.GetRequiredString("topic")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(o.Topic) == 0 {
|
||||
return errors.New("invalid topic name")
|
||||
}
|
||||
|
||||
o.Balancer, _, err = args.GetString("balancer")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
o.Name, ok, err = args.GetString("name")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn, ok, err = args.GetString("nameColumn")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn = "_measurement"
|
||||
}
|
||||
}
|
||||
o.TimeColumn, ok, err = args.GetString("timeColumn")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.TimeColumn = execute.DefaultTimeColLabel
|
||||
}
|
||||
tagColumns, ok, err := args.GetArray("tagColumns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.TagColumns = o.TagColumns[:0]
|
||||
if ok {
|
||||
for i := 0; i < tagColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.TagColumns, tagColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
valueColumns, ok, err := args.GetArray("valueColumns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.ValueColumns = o.ValueColumns[:0]
|
||||
if !ok || valueColumns.Len() == 0 {
|
||||
o.ValueColumns = append(o.ValueColumns, execute.DefaultValueColLabel)
|
||||
} else {
|
||||
for i := 0; i < valueColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.ValueColumns, valueColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
|
||||
msgBufSize, ok, err := args.GetInt("msgBufferSize")
|
||||
o.MsgBufSize = int(msgBufSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if o.MsgBufSize < 0 || !ok {
|
||||
o.MsgBufSize = 0 // so the library will set it to the default
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
func createToKafkaOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
|
||||
if err := a.AddParentFromArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := new(ToKafkaOpSpec)
|
||||
if err := s.ReadArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Kind returns the operation kind that identifies toKafka in the flux
// operation registry.
func (ToKafkaOpSpec) Kind() flux.OperationKind {
	return ToKafkaKind
}
|
||||
|
||||
// ToKafkaProcedureSpec is the planner-level specification for the toKafka
// sink. It wraps the user-facing operation spec and carries the concrete
// kafka partition balancer resolved from Spec.Balancer.
type ToKafkaProcedureSpec struct {
	Spec *ToKafkaOpSpec
	// balancer is resolved from Spec.Balancer in Copy; a nil value leaves
	// the kafka writer's default balancing in effect.
	balancer kafka.Balancer
}
|
||||
|
||||
// Kind returns the procedure kind for the toKafka sink. It matches the
// operation kind so the planner can map the operation to this procedure.
func (o *ToKafkaProcedureSpec) Kind() plan.ProcedureKind {
	return ToKafkaKind
}
|
||||
func (o *ToKafkaProcedureSpec) Copy() plan.ProcedureSpec {
|
||||
s := o.Spec
|
||||
res := &ToKafkaProcedureSpec{
|
||||
Spec: &ToKafkaOpSpec{
|
||||
Brokers: append([]string(nil), s.Brokers...),
|
||||
Topic: s.Topic,
|
||||
Balancer: s.Balancer,
|
||||
Name: s.Name,
|
||||
NameColumn: s.NameColumn,
|
||||
TimeColumn: s.TimeColumn,
|
||||
TagColumns: append([]string(nil), s.TagColumns...),
|
||||
ValueColumns: append([]string(nil), s.ValueColumns...),
|
||||
},
|
||||
}
|
||||
switch s.Balancer {
|
||||
case "hash", "": //hash is default for compatibility with enterprise
|
||||
res.balancer = &kafka.Hash{}
|
||||
|
||||
case "round-robin":
|
||||
res.balancer = &kafka.RoundRobin{}
|
||||
|
||||
case "least-bytes":
|
||||
res.balancer = &kafka.LeastBytes{}
|
||||
}
|
||||
return res
|
||||
}
|
||||
func newToKafkaProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
|
||||
spec, ok := qs.(*ToKafkaOpSpec)
|
||||
if !ok && spec != nil {
|
||||
return nil, fmt.Errorf("invalid spec type %T", qs)
|
||||
}
|
||||
return &ToKafkaProcedureSpec{Spec: spec}, nil
|
||||
}
|
||||
func createToKafkaTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
|
||||
s, ok := spec.(*ToKafkaProcedureSpec)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid spec type %T", spec)
|
||||
}
|
||||
cache := execute.NewTableBuilderCache(a.Allocator())
|
||||
d := execute.NewDataset(id, mode, cache)
|
||||
t := NewToKafkaTransformation(d, cache, s)
|
||||
return t, d, nil
|
||||
}
|
||||
|
||||
// ToKafkaTransformation streams each incoming table to a kafka topic as
// line-protocol encoded messages.
type ToKafkaTransformation struct {
	d execute.Dataset
	cache execute.TableBuilderCache
	spec *ToKafkaProcedureSpec
}
|
||||
|
||||
// RetractTable forwards the retraction of the table identified by key to the
// downstream dataset.
func (t *ToKafkaTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
	return t.d.RetractTable(key)
}
|
||||
func NewToKafkaTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *ToKafkaProcedureSpec) *ToKafkaTransformation {
|
||||
return &ToKafkaTransformation{
|
||||
d: d,
|
||||
cache: cache,
|
||||
spec: spec,
|
||||
}
|
||||
}
|
||||
|
||||
// toKafkaMetric is a single reusable row buffer handed to the line-protocol
// encoder; Process refills it for every row of the table.
type toKafkaMetric struct {
	tags []*protocol.Tag
	fields []*protocol.Field
	name string
	t time.Time
}
|
||||
|
||||
// TagList returns the metric's tags for the line-protocol encoder.
func (m *toKafkaMetric) TagList() []*protocol.Tag {
	return m.tags
}
|
||||
// FieldList returns the metric's fields for the line-protocol encoder.
func (m *toKafkaMetric) FieldList() []*protocol.Field {
	return m.fields
}
|
||||
func (m *toKafkaMetric) truncateTagsAndFields() {
|
||||
m.fields = m.fields[:0]
|
||||
m.tags = m.tags[:0]
|
||||
}
|
||||
// Name returns the measurement name for the line-protocol encoder.
func (m *toKafkaMetric) Name() string {
	return m.name
}
|
||||
// Time returns the metric's timestamp for the line-protocol encoder.
func (m *toKafkaMetric) Time() time.Time {
	return m.t
}
|
||||
|
||||
func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (err error) {
|
||||
w := DefaultKafkaWriterFactory(kafka.WriterConfig{
|
||||
Brokers: t.spec.Spec.Brokers,
|
||||
Topic: t.spec.Spec.Topic,
|
||||
Balancer: t.spec.balancer,
|
||||
BatchSize: t.spec.Spec.MsgBufSize,
|
||||
QueueCapacity: t.spec.Spec.MsgBufSize,
|
||||
})
|
||||
|
||||
defer func() {
|
||||
err2 := w.Close()
|
||||
// don't overwrite current error
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err2 != nil {
|
||||
// allow Process to return the error from the defered Close()
|
||||
err = err2
|
||||
return
|
||||
}
|
||||
}()
|
||||
pr, pw := io.Pipe() // TODO: replce the pipe with something faster
|
||||
// I'd like a linereader in line-protocol
|
||||
m := &toKafkaMetric{}
|
||||
e := protocol.NewEncoder(pw)
|
||||
e.FailOnFieldErr(true)
|
||||
e.SetFieldSortOrder(protocol.SortFields)
|
||||
cols := tbl.Cols()
|
||||
labels := make(map[string]idxType, len(cols))
|
||||
for i, col := range cols {
|
||||
labels[col.Label] = idxType{Idx: i, Type: col.Type}
|
||||
}
|
||||
// do time
|
||||
timeColLabel := t.spec.Spec.TimeColumn
|
||||
timeColIdx, ok := labels[timeColLabel]
|
||||
if !ok {
|
||||
return errors.New("Could not get time column")
|
||||
}
|
||||
if timeColIdx.Type != flux.TTime {
|
||||
return fmt.Errorf("column %s is not of type %s", timeColLabel, timeColIdx.Type)
|
||||
}
|
||||
var measurementNameCol string
|
||||
if t.spec.Spec.Name == "" {
|
||||
measurementNameCol = t.spec.Spec.NameColumn
|
||||
}
|
||||
// check if each col is a tag or value and cache this value for the loop
|
||||
colMetadatas := tbl.Cols()
|
||||
isTag := make([]bool, len(colMetadatas))
|
||||
isValue := make([]bool, len(colMetadatas))
|
||||
for i, col := range colMetadatas {
|
||||
isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label
|
||||
isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label
|
||||
}
|
||||
m.name = t.spec.Spec.Name
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
err = tbl.Do(func(er flux.ColReader) error {
|
||||
l := er.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m.truncateTagsAndFields()
|
||||
for j, col := range er.Cols() {
|
||||
switch {
|
||||
case col.Label == timeColLabel:
|
||||
m.t = er.Times(j)[i].Time()
|
||||
case measurementNameCol != "" && measurementNameCol == col.Label:
|
||||
if col.Type != flux.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.name = er.Strings(j)[i]
|
||||
case isTag[j]:
|
||||
if col.Type != flux.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
case isValue[j]:
|
||||
switch col.Type {
|
||||
case flux.TFloat:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Floats(j)[i]})
|
||||
case flux.TInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Ints(j)[i]})
|
||||
case flux.TUInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j)[i]})
|
||||
case flux.TString:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
case flux.TTime:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Times(j)[i]})
|
||||
case flux.TBool:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Bools(j)[i]})
|
||||
default:
|
||||
return fmt.Errorf("invalid type for column %s", col.Label)
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err := e.Encode(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
pw.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
// write the data to kafka
|
||||
{
|
||||
scan := bufio.NewScanner(pr)
|
||||
msgBuf := make([]kafka.Message, 128)
|
||||
i := 0
|
||||
// todo, make this a little more async
|
||||
for scan.Scan() {
|
||||
v := append([]byte(nil), scan.Bytes()...) // we do this since scan.Bytes()'s result can be overwritten by calls to Scan()
|
||||
if cap(msgBuf[i].Key) != 8 {
|
||||
msgBuf[i].Key = make([]byte, 8)
|
||||
}
|
||||
binary.LittleEndian.PutUint64(msgBuf[i].Key, xxhash.Sum64(v))
|
||||
msgBuf[i].Value = v
|
||||
if i == t.spec.Spec.MsgBufSize-1 {
|
||||
if err = w.WriteMessages(context.Background(), msgBuf...); err != nil {
|
||||
return err
|
||||
}
|
||||
msgBuf = msgBuf[:0]
|
||||
i = 0
|
||||
}
|
||||
i++
|
||||
}
|
||||
// send the remainder of the messages
|
||||
if len(msgBuf) > 0 {
|
||||
err = w.WriteMessages(context.Background(), msgBuf[:i]...)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateWatermark forwards the new watermark time to the downstream dataset.
func (t *ToKafkaTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
	return t.d.UpdateWatermark(pt)
}
|
||||
|
||||
// UpdateProcessingTime forwards the new processing time to the downstream
// dataset.
func (t *ToKafkaTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
	return t.d.UpdateProcessingTime(pt)
}
|
||||
|
||||
// Finish signals the downstream dataset that this transformation is done,
// propagating err when the transformation failed.
func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error) {
	t.d.Finish(err)
}
|
||||
|
|
@ -1,435 +0,0 @@
|
|||
package functions_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/execute/executetest"
|
||||
"github.com/influxdata/flux/functions/inputs"
|
||||
"github.com/influxdata/flux/querytest"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
kafka "github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
// type kafkaClientMock = func
|
||||
|
||||
func TestToKafka_NewQuery(t *testing.T) {
|
||||
tests := []querytest.NewQueryTestCase{
|
||||
{
|
||||
Name: "from with database",
|
||||
Raw: `from(bucket:"mybucket") |> toKafka(brokers:["brokerurl:8989"], name:"series1", topic:"totallynotfaketopic")`,
|
||||
Want: &flux.Spec{
|
||||
Operations: []*flux.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &inputs.FromOpSpec{
|
||||
Bucket: "mybucket",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "toKafka1",
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic", //Balancer: &kafka.Hash{},
|
||||
Name: "series1",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{execute.DefaultValueColLabel},
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []flux.Edge{
|
||||
{Parent: "from0", Child: "toKafka1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
querytest.NewQueryTestHelper(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type kafkaMock struct {
|
||||
sync.Mutex
|
||||
data [][]kafka.Message
|
||||
}
|
||||
|
||||
func (k *kafkaMock) reset() {
|
||||
k.Lock()
|
||||
k.data = [][]kafka.Message{}
|
||||
k.Unlock()
|
||||
}
|
||||
|
||||
func (k *kafkaMock) Close() error { return nil }
|
||||
|
||||
func (k *kafkaMock) WriteMessages(_ context.Context, msgs ...kafka.Message) error {
|
||||
k.Lock()
|
||||
k.data = append(k.data, msgs)
|
||||
k.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestToKafka_Process(t *testing.T) {
|
||||
data := &kafkaMock{}
|
||||
functions.DefaultKafkaWriterFactory = func(_ kafka.WriterConfig) functions.KafkaWriter {
|
||||
return data
|
||||
}
|
||||
|
||||
type wanted struct {
|
||||
Table []*executetest.Table
|
||||
Result [][]kafka.Message
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
spec *functions.ToKafkaProcedureSpec
|
||||
data []flux.Table
|
||||
want wanted
|
||||
}{
|
||||
{
|
||||
name: "coltable with name in _measurement",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{execute.CopyTable(&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_measurement", Type: flux.TString},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}, executetest.UnlimitedAllocator)},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("a _value=2 11"), Key: []byte{0xf1, 0xb0, 0x29, 0xd7, 0x9d, 0x04, 0x31, 0x7c}},
|
||||
{Value: []byte("a _value=2 21"), Key: []byte{0xb5, 0xc2, 0xe4, 0x78, 0x95, 0xe0, 0x62, 0x66}},
|
||||
{Value: []byte("b _value=1 21"), Key: []byte{0x0e, 0x62, 0x4e, 0xe7, 0x36, 0xac, 0x77, 0xf3}},
|
||||
{Value: []byte("a _value=3 31"), Key: []byte{0xf5, 0xd5, 0x22, 0x4d, 0x27, 0x9d, 0x8d, 0xb5}},
|
||||
{Value: []byte("c _value=4 41"), Key: []byte{0x05, 0x5b, 0xc5, 0x41, 0x67, 0x78, 0x04, 0xda}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one table with measurement name in _measurement",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_measurement", Type: flux.TString},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("a _value=2 11"), Key: []byte{0xf1, 0xb0, 0x29, 0xd7, 0x9d, 0x04, 0x31, 0x7c}},
|
||||
{Value: []byte("a _value=2 21"), Key: []byte{0xb5, 0xc2, 0xe4, 0x78, 0x95, 0xe0, 0x62, 0x66}},
|
||||
{Value: []byte("b _value=1 21"), Key: []byte{0x0e, 0x62, 0x4e, 0xe7, 0x36, 0xac, 0x77, 0xf3}},
|
||||
{Value: []byte("a _value=3 31"), Key: []byte{0xf5, 0xd5, 0x22, 0x4d, 0x27, 0x9d, 0x8d, 0xb5}},
|
||||
{Value: []byte("c _value=4 41"), Key: []byte{0x05, 0x5b, 0xc5, 0x41, 0x67, 0x78, 0x04, 0xda}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one table with measurement name in _measurement and tag",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_measurement", Type: flux.TString},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("a,fred=one _value=2 11"), Key: []byte{0xe9, 0xde, 0xc5, 0x1e, 0xfb, 0x26, 0x77, 0xfe}},
|
||||
{Value: []byte("a,fred=one _value=2 21"), Key: []byte{0x52, 0x6d, 0x0a, 0xe8, 0x1d, 0xb3, 0xe5, 0xeb}},
|
||||
{Value: []byte("b,fred=seven _value=1 21"), Key: []byte{0x18, 0x91, 0xed, 0x7e, 0x79, 0x5c, 0xc2, 0xe3}},
|
||||
{Value: []byte("a,fred=nine _value=3 31"), Key: []byte{0x75, 0x15, 0xe5, 0x3e, 0xdd, 0xfd, 0x4f, 0x9a}},
|
||||
{Value: []byte("c,fred=elevendyone _value=4 41"), Key: []byte{0xd4, 0xc9, 0xca, 0xea, 0xa6, 0x8d, 0x14, 0x4b}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one table",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
Name: "one_block",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0},
|
||||
{execute.Time(21), 1.0},
|
||||
{execute.Time(31), 3.0},
|
||||
{execute.Time(41), 4.0},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("one_block _value=2 11"), Key: []byte{0x92, 0x7e, 0x77, 0xb1, 0x2c, 0x35, 0x13, 0x12}},
|
||||
{Value: []byte("one_block _value=1 21"), Key: []byte{0x39, 0x39, 0xb2, 0x11, 0xd1, 0x1b, 0x44, 0x57}},
|
||||
{Value: []byte("one_block _value=3 31"), Key: []byte{0xa2, 0xc1, 0x71, 0x42, 0xa8, 0xbb, 0x91, 0x67}},
|
||||
{Value: []byte("one_block _value=4 41"), Key: []byte{0x82, 0x3b, 0x2e, 0x58, 0xec, 0x53, 0x62, 0x4e}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one table with unused tag",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
Name: "one_block_w_unused_tag",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
{execute.Time(41), 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("one_block_w_unused_tag _value=2 11"), Key: []byte{0x62, 0xda, 0xe8, 0xc3, 0x2b, 0x88, 0x74, 0x54}},
|
||||
{Value: []byte("one_block_w_unused_tag _value=1 21"), Key: []byte{0xff, 0x23, 0xa3, 0x84, 0xe4, 0xcb, 0x77, 0x79}},
|
||||
{Value: []byte("one_block_w_unused_tag _value=3 31"), Key: []byte{0xc5, 0x02, 0x43, 0x34, 0x66, 0xb6, 0x43, 0x87}},
|
||||
{Value: []byte("one_block_w_unused_tag _value=4 41"), Key: []byte{0x65, 0xd6, 0x94, 0x12, 0xfa, 0x92, 0x30, 0xff}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one table with tag",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "one_block_w_tag",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
{execute.Time(41), 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("one_block_w_tag,fred=one _value=2 11"), Key: []byte{0xca, 0xc3, 0xec, 0x04, 0x42, 0xec, 0x85, 0x84}},
|
||||
{Value: []byte("one_block_w_tag,fred=seven _value=1 21"), Key: []byte{0x6c, 0x2b, 0xb7, 0xf8, 0x98, 0xce, 0x12, 0x64}},
|
||||
{Value: []byte("one_block_w_tag,fred=nine _value=3 31"), Key: []byte{0x41, 0x73, 0x13, 0xd6, 0x5c, 0xf1, 0x18, 0xd3}},
|
||||
{Value: []byte("one_block_w_tag,fred=elevendyone _value=4 41"), Key: []byte{0x83, 0x42, 0x25, 0x68, 0x66, 0x44, 0x67, 0x14}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multi table",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "multi_block",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{
|
||||
&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(51), 2.0, "one"},
|
||||
{execute.Time(61), 1.0, "seven"},
|
||||
{execute.Time(71), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("multi_block,fred=one _value=2 11"), Key: []byte{0x41, 0x9d, 0x7f, 0x17, 0xc8, 0x21, 0xfb, 0x69}},
|
||||
{Value: []byte("multi_block,fred=seven _value=1 21"), Key: []byte{0x8f, 0x83, 0x72, 0x66, 0x7b, 0x78, 0x77, 0x18}},
|
||||
{Value: []byte("multi_block,fred=nine _value=3 31"), Key: []byte{0x1c, 0x4a, 0x50, 0x5f, 0xa1, 0xfc, 0xf3, 0x56}},
|
||||
}, {
|
||||
{Value: []byte("multi_block,fred=one _value=2 51"), Key: []byte{0x77, 0x44, 0x9c, 0x9c, 0x68, 0xca, 0xc1, 0x13}},
|
||||
{Value: []byte("multi_block,fred=seven _value=1 61"), Key: []byte{0x48, 0x23, 0xfe, 0x07, 0x61, 0x79, 0x09, 0x74}},
|
||||
{Value: []byte("multi_block,fred=nine _value=3 71"), Key: []byte{0x74, 0xc5, 0xa3, 0x42, 0xcb, 0x91, 0x99, 0x7c}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multi collist tables",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "multi_collist_blocks",
|
||||
},
|
||||
},
|
||||
data: []flux.Table{
|
||||
execute.CopyTable(
|
||||
&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
},
|
||||
}, executetest.UnlimitedAllocator),
|
||||
&executetest.Table{
|
||||
ColMeta: []flux.ColMeta{
|
||||
{Label: "_time", Type: flux.TTime},
|
||||
{Label: "_value", Type: flux.TFloat},
|
||||
{Label: "fred", Type: flux.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(51), 2.0, "one"},
|
||||
{execute.Time(61), 1.0, "seven"},
|
||||
{execute.Time(71), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: wanted{
|
||||
Table: []*executetest.Table(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("multi_collist_blocks,fred=one _value=2 11"), Key: []byte{0xfc, 0xab, 0xa3, 0x68, 0x81, 0x48, 0x7d, 0x8a}},
|
||||
{Value: []byte("multi_collist_blocks,fred=seven _value=1 21"), Key: []byte{0x9f, 0xe1, 0x82, 0x97, 0x49, 0x92, 0x56, 0x1a}},
|
||||
{Value: []byte("multi_collist_blocks,fred=nine _value=3 31"), Key: []byte{0x73, 0x3c, 0x1a, 0x62, 0xfa, 0x01, 0xcd, 0xa7}},
|
||||
}, {
|
||||
{Value: []byte("multi_collist_blocks,fred=one _value=2 51"), Key: []byte{0xb9, 0x23, 0xd6, 0x3a, 0x7e, 0x71, 0xa6, 0xde}},
|
||||
{Value: []byte("multi_collist_blocks,fred=seven _value=1 61"), Key: []byte{0x0a, 0x70, 0x1f, 0xbe, 0xfd, 0x40, 0x2f, 0xd8}},
|
||||
{Value: []byte("multi_collist_blocks,fred=nine _value=3 71"), Key: []byte{0x67, 0x4b, 0xf0, 0xf1, 0xb0, 0xf5, 0x99, 0x5a}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
executetest.ProcessTestHelper(
|
||||
t,
|
||||
tc.data,
|
||||
tc.want.Table,
|
||||
nil,
|
||||
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
|
||||
return functions.NewToKafkaTransformation(d, c, tc.spec)
|
||||
},
|
||||
)
|
||||
if !cmp.Equal(tc.want.Result, data.data, cmpopts.EquateNaNs()) {
|
||||
t.Log(cmp.Diff(tc.want.Result, data.data))
|
||||
t.Fail()
|
||||
}
|
||||
data.reset()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -214,9 +214,9 @@ func (r *queryTable) Key() flux.GroupKey {
|
|||
}
|
||||
cols[j] = colMeta[idx]
|
||||
kvs[j] = "string"
|
||||
v, err := values.NewValue(kvs[j], execute.ConvertToKind(cols[j].Type))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
v := values.New(kvs[j])
|
||||
if v == values.InvalidValue {
|
||||
panic(fmt.Sprintf("unsupported value kind %T", kvs[j]))
|
||||
}
|
||||
vs[j] = v
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,11 +13,11 @@ func init() {
|
|||
func taskObject() values.Object {
|
||||
obj := values.NewObject()
|
||||
|
||||
obj.Set("name", values.NewStringValue(""))
|
||||
obj.Set("cron", values.NewStringValue(""))
|
||||
obj.Set("every", values.NewDurationValue(0))
|
||||
obj.Set("delay", values.NewDurationValue(0))
|
||||
obj.Set("concurrency", values.NewIntValue(0))
|
||||
obj.Set("retry", values.NewIntValue(0))
|
||||
obj.Set("name", values.NewString(""))
|
||||
obj.Set("cron", values.NewString(""))
|
||||
obj.Set("every", values.NewDuration(0))
|
||||
obj.Set("delay", values.NewDuration(0))
|
||||
obj.Set("concurrency", values.NewInt(0))
|
||||
obj.Set("retry", values.NewInt(0))
|
||||
return obj
|
||||
}
|
||||
|
|
|
|||
|
|
@ -334,7 +334,7 @@ const (
|
|||
valueColIdx = 3
|
||||
)
|
||||
|
||||
func determineTableColsForSeries(tags models.Tags, typ flux.DataType) ([]flux.ColMeta, [][]byte) {
|
||||
func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.ColMeta, [][]byte) {
|
||||
cols := make([]flux.ColMeta, 4+len(tags))
|
||||
defs := make([][]byte, 4+len(tags))
|
||||
cols[startColIdx] = flux.ColMeta{
|
||||
|
|
@ -370,12 +370,12 @@ func groupKeyForSeries(tags models.Tags, readSpec *fstorage.ReadSpec, bnds execu
|
|||
Label: execute.DefaultStartColLabel,
|
||||
Type: flux.TTime,
|
||||
}
|
||||
vs[0] = values.NewTimeValue(bnds.Start)
|
||||
vs[0] = values.NewTime(bnds.Start)
|
||||
cols[1] = flux.ColMeta{
|
||||
Label: execute.DefaultStopColLabel,
|
||||
Type: flux.TTime,
|
||||
}
|
||||
vs[1] = values.NewTimeValue(bnds.Stop)
|
||||
vs[1] = values.NewTime(bnds.Stop)
|
||||
switch readSpec.GroupMode {
|
||||
case fstorage.GroupModeBy:
|
||||
// group key in GroupKeys order, including tags in the GroupKeys slice
|
||||
|
|
@ -387,7 +387,7 @@ func groupKeyForSeries(tags models.Tags, readSpec *fstorage.ReadSpec, bnds execu
|
|||
Label: k,
|
||||
Type: flux.TString,
|
||||
})
|
||||
vs = append(vs, values.NewStringValue(string(t.Value)))
|
||||
vs = append(vs, values.NewString(string(t.Value)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -400,13 +400,13 @@ func groupKeyForSeries(tags models.Tags, readSpec *fstorage.ReadSpec, bnds execu
|
|||
Label: string(tags[i].Key),
|
||||
Type: flux.TString,
|
||||
})
|
||||
vs = append(vs, values.NewStringValue(string(tags[i].Value)))
|
||||
vs = append(vs, values.NewString(string(tags[i].Value)))
|
||||
}
|
||||
}
|
||||
return execute.NewGroupKey(cols, vs)
|
||||
}
|
||||
|
||||
func determineTableColsForGroup(tagKeys [][]byte, typ flux.DataType) ([]flux.ColMeta, [][]byte) {
|
||||
func determineTableColsForGroup(tagKeys [][]byte, typ flux.ColType) ([]flux.ColMeta, [][]byte) {
|
||||
cols := make([]flux.ColMeta, 4+len(tagKeys))
|
||||
defs := make([][]byte, 4+len(tagKeys))
|
||||
cols[startColIdx] = flux.ColMeta{
|
||||
|
|
@ -443,18 +443,18 @@ func groupKeyForGroup(kv [][]byte, readSpec *fstorage.ReadSpec, bnds execute.Bou
|
|||
Label: execute.DefaultStartColLabel,
|
||||
Type: flux.TTime,
|
||||
}
|
||||
vs[0] = values.NewTimeValue(bnds.Start)
|
||||
vs[0] = values.NewTime(bnds.Start)
|
||||
cols[1] = flux.ColMeta{
|
||||
Label: execute.DefaultStopColLabel,
|
||||
Type: flux.TTime,
|
||||
}
|
||||
vs[1] = values.NewTimeValue(bnds.Stop)
|
||||
vs[1] = values.NewTime(bnds.Stop)
|
||||
for i := range readSpec.GroupKeys {
|
||||
cols = append(cols, flux.ColMeta{
|
||||
Label: readSpec.GroupKeys[i],
|
||||
Type: flux.TString,
|
||||
})
|
||||
vs = append(vs, values.NewStringValue(string(kv[i])))
|
||||
vs = append(vs, values.NewString(string(kv[i])))
|
||||
}
|
||||
return execute.NewGroupKey(cols, vs)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,11 +168,11 @@ var _ flux.Result = (*fakeResult)(nil)
|
|||
|
||||
func newFakeResult() *fakeResult {
|
||||
meta := []flux.ColMeta{{Label: "x", Type: flux.TInt}}
|
||||
vals := []values.Value{values.NewIntValue(int64(1))}
|
||||
vals := []values.Value{values.NewInt(int64(1))}
|
||||
gk := execute.NewGroupKey(meta, vals)
|
||||
a := &execute.Allocator{Limit: math.MaxInt64}
|
||||
b := execute.NewColListTableBuilder(gk, a)
|
||||
i := b.AddCol(meta[0])
|
||||
i, _ := b.AddCol(meta[0])
|
||||
b.AppendInt(i, int64(1))
|
||||
t, err := b.Table()
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in New Issue