diff --git a/go.mod b/go.mod index 479f84ce4c..258da5c979 100644 --- a/go.mod +++ b/go.mod @@ -71,7 +71,6 @@ require ( github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.6.0 - github.com/prometheus/procfs v0.0.3 // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/spf13/cast v1.3.0 github.com/spf13/cobra v1.0.0 @@ -104,6 +103,7 @@ require ( gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71 honnef.co/go/tools v0.0.1-2019.2.3.0.20190904154718-afd67930eec2 + istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect ) diff --git a/go.sum b/go.sum index 7f8f36dade..cf2c6f79e3 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQ github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= github.com/RoaringBitmap/roaring v0.4.16 h1:NholfewybRLOwACgfqfzn/N5xa6keKNs4fP00t0cwLo= github.com/RoaringBitmap/roaring v0.4.16/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/alecthomas/kingpin v2.2.6+incompatible/go.mod h1:59OFYbFVLKQKq+mqrL6Rw5bR0c3ACQaawgXx0QYndlE= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= @@ -91,6 +93,7 @@ github.com/aokoli/goutils v1.0.1 h1:7fpzNGoJ3VA8qcrm++XEE1QUe0mIwNeLa02Nwq7RDkg= github.com/aokoli/goutils v1.0.1/go.mod h1:SijmP0QR8LtwsmDs8Yii5Z/S4trXFGFC2oO5g9DP+DQ= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db h1:nxAtV4VajJDhKysp2kdcJZsq8Ss1xSA0vZTkVHHJd0E= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= +github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apex/log v1.1.4 h1:3Zk+boorIQAAGBrHn0JUtAau4ihMamT4WdnfdnXM1zQ= github.com/apex/log v1.1.4/go.mod h1:AlpoD9aScyQfJDVHmLMEcx4oU6LqzkWp4Mg9GdAcEvQ= github.com/apex/logs v0.0.4/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= @@ -115,6 +118,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/blakesmith/ar v0.0.0-20190502131153-809d4375e1fb h1:m935MPodAbYS46DG4pJSv7WO+VECIWUQ7OJYSoTrMh4= github.com/blakesmith/ar v0.0.0-20190502131153-809d4375e1fb/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI= @@ -181,6 +186,9 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190819115812-1474bdeaf2a2 h1:BoejGRtu+FygJB/0ZpkhTSmaM7QbsPxFgcspAbTElNI= @@ -260,6 +268,7 @@ github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/gddo v0.0.0-20181116215533-9bd4a3295021 h1:HYV500jCgk+IC68L5sWrLFIWMpaUFfXXpJSAb7XOoBk= @@ -375,6 +384,7 @@ github.com/goreleaser/nfpm v1.2.1 h1:AEnu9XVmupRDTR930Z2rAs31Mj6sLIPxFcR9ESYvgDA github.com/goreleaser/nfpm v1.2.1/go.mod h1:TtWrABZozuLOttX2uDlYyECfQX7x5XYkVxhjYcR6G9w= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3 h1:JVnpOZS+qxli+rgVl98ILOXVNbW+kb5wcxeGx8ShUIw= github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE= @@ -430,6 +440,7 @@ github.com/hashicorp/vault/sdk v0.1.8 h1:pfF3KwA1yPlfpmcumNsFM4uo91WMasX5gTuIkIt github.com/hashicorp/vault/sdk v0.1.8/go.mod h1:tHZfc6St71twLizWNHvnnbiGFo1aq0eD2jGPLtP8kAU= github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= +github.com/howeyc/fsnotify v0.9.0/go.mod h1:41HzSPxBGeFRQKEEwgh49TRw/nKBsYZ2cF1OzPjSJsA= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huandu/xstrings v1.0.0 h1:pO2K/gKgKaat5LdpAhxhluX2GPQMaI3W5FUz/I/UnWk= @@ -579,6 +590,8 @@ github.com/mozilla/tls-observatory v0.0.0-20190404164649-a3c1b6cfecfd/go.mod h1: github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae h1:VeRdUYdCw49yizlSbMEn2SZ+gT+3IUKx8BqxyQdz+BY= github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM= +github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk= github.com/nats-io/gnatsd v1.3.0 h1:+5d80klu3QaJgNbdavVBjWJP7cHd11U2CLnRTFM9ICI= github.com/nats-io/gnatsd v1.3.0/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= github.com/nats-io/go-nats v1.7.0 h1:oQOfHcLr8hb43QG8yeVyY2jtarIaTjOv41CGdF3tTvQ= @@ -598,8 +611,10 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34= github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= @@ -613,6 +628,7 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= @@ -636,27 +652,35 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= +github.com/prometheus/client_golang v0.9.4/go.mod h1:oCXIBxdI62A4cR6aTRJCgetEjecSIYzOEaeAn4iYEpM= github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= -github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= +github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/prometheus/prom2json v1.1.0/go.mod h1:v7OY1795b9fEUZgq4UU2+15YjRv0LfpxKejIQCy3L7o= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -798,6 +822,8 @@ github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZ go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0= +go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= +go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= @@ -880,6 +906,7 @@ golang.org/x/net v0.0.0-20181108082009-03003ca0c849/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -894,6 +921,7 @@ golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -926,7 +954,9 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb h1:pf3XwC90UUdNPYWZdFjhGBE7DUFuK3Ct1zWmZ65QN30= golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -946,6 +976,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190620070143-6f217b454f45/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1030,6 +1061,7 @@ gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= +google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.5.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.6.0/go.mod h1:btoxGiFvQNVUZQ8W08zLtrVS08CNpINPEfxXxgJL1Q4= @@ -1110,6 +1142,8 @@ gopkg.in/ini.v1 v1.46.0 h1:VeDZbLYGaupuvIrsYCEOe/L/2Pcs5n7hdO1ZTjporag= gopkg.in/ini.v1 v1.46.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= @@ -1141,6 +1175,9 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2019.2.3.0.20190904154718-afd67930eec2 h1:TeRFic5UJx5LXC3n8mOt92bqyM5J+so5EuErnxK4PAk= honnef.co/go/tools v0.0.1-2019.2.3.0.20190904154718-afd67930eec2/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= +istio.io/api v0.0.0-20190515205759-982e5c3888c6/go.mod h1:hhLFQmpHia8zgaM37vb2ml9iS5NfNfqZGRt1pS9aVEo= +istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf h1:iNpiPvg8fcQxebYPrd9Dhjvqd+SIF7OFH+1qp89/nHQ= +istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf/go.mod h1:EwvmercDF5DLCg5qUQlkM40xHwCxGoY1H/2LhI1p2YU= labix.org/v2/mgo v0.0.0-20140701140051-000000000287 h1:L0cnkNl4TfAXzvdrqsYEmxOHOCv2p5I3taaReO8BWFs= labix.org/v2/mgo v0.0.0-20140701140051-000000000287/go.mod h1:Lg7AYkt1uXJoR9oeSZ3W/8IXLdvOfIITgZnommstyz4= launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54= diff --git a/http/api_handler.go b/http/api_handler.go index f3d4f98105..b1c6fab79c 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/prom" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" + "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/storage" "github.com/prometheus/client_golang/prometheus" @@ -203,9 +204,11 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b) h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend, WithMaxBatchSizeBytes(b.MaxBatchSizeBytes), - WithParserMaxBytes(b.WriteParserMaxBytes), - WithParserMaxLines(b.WriteParserMaxLines), - WithParserMaxValues(b.WriteParserMaxValues), + WithParserOptions( + models.WithParserMaxBytes(b.WriteParserMaxBytes), + models.WithParserMaxLines(b.WriteParserMaxLines), + models.WithParserMaxValues(b.WriteParserMaxValues), + ), )) for _, o := range opts { diff --git a/http/requests.go b/http/requests.go index 35c36fa56c..994ea619a2 100644 --- a/http/requests.go +++ b/http/requests.go @@ -26,9 +26,7 @@ const ( // the name. It interprets the &org= parameter as either the name // or the ID. func queryOrganization(ctx context.Context, r *http.Request, svc platform.OrganizationService) (o *platform.Organization, err error) { - filter := platform.OrganizationFilter{} - if organization := r.URL.Query().Get(Org); organization != "" { if id, err := platform.IDFromString(organization); err == nil { filter.ID = id diff --git a/http/write_handler.go b/http/write_handler.go index 36da62322d..5742e530f0 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -13,12 +13,15 @@ import ( "github.com/influxdata/influxdb/v2" pcontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/http/metric" + kitio "github.com/influxdata/influxdb/v2/kit/io" "github.com/influxdata/influxdb/v2/kit/tracing" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/storage" "github.com/influxdata/influxdb/v2/tsdb" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" + "istio.io/pkg/log" ) var ( @@ -55,22 +58,16 @@ func NewWriteBackend(log *zap.Logger, b *APIBackend) *WriteBackend { // WriteHandler receives line protocol and sends to a publish function. type WriteHandler struct { - *httprouter.Router influxdb.HTTPErrorHandler - log *zap.Logger - BucketService influxdb.BucketService OrganizationService influxdb.OrganizationService + PointsWriter storage.PointsWriter + EventRecorder metric.EventRecorder - PointsWriter storage.PointsWriter - - EventRecorder metric.EventRecorder - + router *httprouter.Router + log *zap.Logger maxBatchSizeBytes int64 parserOptions []models.ParserOption - parserMaxBytes int - parserMaxLines int - parserMaxValues int } // WriteHandlerOption is a functional option for a *WriteHandler @@ -84,27 +81,9 @@ func WithMaxBatchSizeBytes(n int64) WriteHandlerOption { } } -// WithParserMaxBytes specifies the maximum number of bytes that may be allocated when processing a single -// write request. When n is zero, there is no limit. -func WithParserMaxBytes(n int) WriteHandlerOption { +func WithParserOptions(opts ...models.ParserOption) WriteHandlerOption { return func(w *WriteHandler) { - w.parserMaxBytes = n - } -} - -// WithParserMaxLines specifies the maximum number of lines that may be parsed when processing a single -// write request. When n is zero, there is no limit. -func WithParserMaxLines(n int) WriteHandlerOption { - return func(w *WriteHandler) { - w.parserMaxLines = n - } -} - -// WithParserMaxValues specifies the maximum number of values that may be parsed when processing a single -// write request. When n is zero, there is no limit. -func WithParserMaxValues(n int) WriteHandlerOption { - return func(w *WriteHandler) { - w.parserMaxValues = n + w.parserOptions = opts } } @@ -114,41 +93,59 @@ func (*WriteHandler) Prefix() string { } const ( - prefixWrite = "/api/v2/write" - errInvalidGzipHeader = "gzipped HTTP body contains an invalid header" - errInvalidPrecision = "invalid precision; valid precision units are ns, us, ms, and s" + prefixWrite = "/api/v2/write" + msgInvalidGzipHeader = "gzipped HTTP body contains an invalid header" + msgInvalidPrecision = "invalid precision; valid precision units are ns, us, ms, and s" + msgUnableToReadData = "unable to read data" + msgWritingRequiresPoints = "writing requires points" + msgUnexpectedWriteError = "unexpected error writing points to database" + + opPointsWriter = "http/pointsWriter" + opWriteHandler = "http/writeHandler" ) // NewWriteHandler creates a new handler at /api/v2/write to receive line protocol. func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler { h := &WriteHandler{ - Router: NewRouter(b.HTTPErrorHandler), - HTTPErrorHandler: b.HTTPErrorHandler, - log: log, - + HTTPErrorHandler: b.HTTPErrorHandler, PointsWriter: b.PointsWriter, BucketService: b.BucketService, OrganizationService: b.OrganizationService, EventRecorder: b.WriteEventRecorder, + + router: NewRouter(b.HTTPErrorHandler), + log: log, } for _, opt := range opts { opt(h) } - // cache configured options - if h.parserMaxBytes > 0 { - h.parserOptions = append(h.parserOptions, models.WithParserMaxBytes(h.parserMaxBytes)) - } - if h.parserMaxLines > 0 { - h.parserOptions = append(h.parserOptions, models.WithParserMaxLines(h.parserMaxLines)) - } - if h.parserMaxValues > 0 { - h.parserOptions = append(h.parserOptions, models.WithParserMaxValues(h.parserMaxValues)) + h.router.HandlerFunc(http.MethodPost, prefixWrite, h.handleWrite) + return h +} + +func (h *WriteHandler) findBucket(ctx context.Context, orgID influxdb.ID, bucket string) (*influxdb.Bucket, error) { + if id, err := influxdb.IDFromString(bucket); err == nil { + b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{ + OrganizationID: &orgID, + ID: id, + }) + if err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound { + return nil, err + } else if err == nil { + return b, err + } } - h.HandlerFunc("POST", prefixWrite, h.handleWrite) - return h + return h.BucketService.FindBucket(ctx, influxdb.BucketFilter{ + OrganizationID: &orgID, + Name: &bucket, + }) +} + +func (h *WriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.router.ServeHTTP(w, r) } func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { @@ -156,144 +153,164 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { defer span.Finish() ctx := r.Context() - defer r.Body.Close() + auth, err := pcontext.GetAuthorizer(ctx) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } - // TODO(desa): I really don't like how we're recording the usage metrics here - // Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403 - var ( - orgID influxdb.ID - requestBytes int - sw = kithttp.NewStatusResponseWriter(w) - handleError = func(err error, code, message string) { - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: code, - Op: "http/handleWrite", - Msg: message, - Err: err, - }, w) - } - ) - w = sw + req, err := decodeWriteRequest(ctx, r, h.maxBatchSizeBytes) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + org, err := queryOrganization(ctx, r, h.OrganizationService) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + span.LogKV("org_id", org.ID) + + sw := kithttp.NewStatusResponseWriter(w) + recorder := NewWriteUsageRecorder(sw, h.EventRecorder) + var requestBytes int defer func() { - h.EventRecorder.Record(ctx, metric.Event{ - OrgID: orgID, - Endpoint: r.URL.Path, // This should be sufficient for the time being as it should only be single endpoint. - RequestBytes: requestBytes, - ResponseBytes: sw.ResponseBytes(), - Status: sw.Code(), - }) + // Close around the requestBytes variable to placate the linter. + recorder.Record(ctx, requestBytes, org.ID, r.URL.Path) }() - a, err := pcontext.GetAuthorizer(ctx) + bucket, err := h.findBucket(ctx, org.ID, req.Bucket) if err != nil { - h.HandleHTTPError(ctx, err, w) + h.HandleHTTPError(ctx, err, sw) return } - - req, err := decodeWriteRequest(ctx, r) - if err != nil { - h.HandleHTTPError(ctx, err, w) - return - } - - log := h.log.With(zap.String("org", req.Org), zap.String("bucket", req.Bucket)) - - var org *influxdb.Organization - org, err = queryOrganization(ctx, r, h.OrganizationService) - if err != nil { - log.Info("Failed to find organization", zap.Error(err)) - h.HandleHTTPError(ctx, err, w) - return - } - - orgID = org.ID - span.LogKV("org_id", orgID) - - if req.Bucket == "" { - // if we still dont have a bucket time to error - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.ENotFound, - Op: "http/handleWrite", - Msg: "bucket not found", - }, w) - } - - var bucket *influxdb.Bucket - if id, err := influxdb.IDFromString(req.Bucket); err == nil { - // Decoded ID successfully. Make sure it's a real bucket. - b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{ - OrganizationID: &org.ID, - ID: id, - }) - if err == nil { - bucket = b - } else if influxdb.ErrorCode(err) != influxdb.ENotFound { - h.HandleHTTPError(ctx, err, w) - return - } - } - - if bucket == nil { - b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{ - OrganizationID: &org.ID, - Name: &req.Bucket, - }) - if err != nil { - h.HandleHTTPError(ctx, err, w) - return - } - - bucket = b - } span.LogKV("bucket_id", bucket.ID) - p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID) - if err != nil { - handleError(err, influxdb.EInternal, fmt.Sprintf("unable to create permission for bucket: %v", err)) + if err := checkBucketWritePermissions(auth, org.ID, bucket.ID); err != nil { + h.HandleHTTPError(ctx, err, sw) return } - if pset, err := a.PermissionSet(); err != nil || !pset.Allowed(*p) { - handleError(nil, influxdb.EForbidden, "insufficient permissions for write") + opts := append([]models.ParserOption{}, h.parserOptions...) + opts = append(opts, models.WithParserPrecision(req.Precision)) + parsed, err := NewPointsParser(opts...).ParsePoints(ctx, org.ID, bucket.ID, req.Body) + if err != nil { + h.HandleHTTPError(ctx, err, sw) + return + } + requestBytes = parsed.RawSize + + if err := h.PointsWriter.WritePoints(ctx, parsed.Points); err != nil { + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInternal, + Op: opWriteHandler, + Msg: "unexpected error writing points to database", + Err: err, + }, sw) return } - data, err := readWriteRequest(ctx, r.Body, r.Header.Get("Content-Encoding"), h.maxBatchSizeBytes) - if err != nil { - log.Error("Error reading body", zap.Error(err)) + sw.WriteHeader(http.StatusNoContent) +} +// checkBucketWritePermissions checks an Authorizer for write permissions to a +// specific Bucket. +func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influxdb.ID) error { + p, err := influxdb.NewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID) + if err != nil { + return &influxdb.Error{ + Code: influxdb.EInternal, + Op: opWriteHandler, + Msg: fmt.Sprintf("unable to create permission for bucket: %v", err), + Err: err, + } + } + if pset, err := auth.PermissionSet(); err != nil || !pset.Allowed(*p) { + return &influxdb.Error{ + Code: influxdb.EForbidden, + Op: opWriteHandler, + Msg: "insufficient permissions for write", + Err: err, + } + } + return nil +} + +// PointBatchReadCloser (potentially) wraps an io.ReadCloser in Gzip +// decompression and limits the reading to a specific number of bytes. +func PointBatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) { + switch encoding { + case "gzip", "x-gzip": + var err error + rc, err = gzip.NewReader(rc) + if err != nil { + return nil, err + } + } + if maxBatchSizeBytes > 0 { + rc = kitio.NewLimitedReadCloser(rc, maxBatchSizeBytes) + } + return rc, nil +} + +// NewPointsParser returns a new PointsParser +func NewPointsParser(parserOptions ...models.ParserOption) *PointsParser { + return &PointsParser{ + ParserOptions: parserOptions, + } +} + +// ParsedPoints contains the points parsed as well as the total number of bytes +// after decompression. +type ParsedPoints struct { + Points models.Points + RawSize int +} + +// PointsParser parses batches of Points. +type PointsParser struct { + ParserOptions []models.ParserOption +} + +// ParsePoints parses the points from an io.ReadCloser for a specific Bucket. +func (pw *PointsParser) ParsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "write points") + defer span.Finish() + return pw.parsePoints(ctx, orgID, bucketID, rc) +} + +func (pw *PointsParser) parsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) { + data, err := readAll(ctx, rc) + if err != nil { code := influxdb.EInternal if errors.Is(err, ErrMaxBatchSizeExceeded) { code = influxdb.ETooLarge } else if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) { code = influxdb.EInvalid } - - handleError(err, code, "unable to read data") - return + return nil, &influxdb.Error{ + Code: code, + Op: opPointsWriter, + Msg: msgUnableToReadData, + Err: err, + } } - requestBytes = len(data) + requestBytes := len(data) if requestBytes == 0 { - handleError(err, influxdb.EInvalid, "writing requires points") - return + return nil, &influxdb.Error{ + Op: opPointsWriter, + Code: influxdb.EInvalid, + Msg: msgWritingRequiresPoints, + } } - span, _ = tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing") - encoded := tsdb.EncodeName(org.ID, bucket.ID) + span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing") + encoded := tsdb.EncodeName(orgID, bucketID) mm := models.EscapeMeasurement(encoded[:]) - var options []models.ParserOption - if len(h.parserOptions) > 0 { - options = make([]models.ParserOption, 0, len(h.parserOptions)+1) - options = append(options, h.parserOptions...) - } - - if req.Precision != nil { - options = append(options, req.Precision) - } - - points, err := models.ParsePointsWithOptions(data, mm, options...) + points, err := models.ParsePointsWithOptions(data, mm, pw.ParserOptions...) span.LogKV("values_total", len(points)) span.Finish() if err != nil { @@ -306,83 +323,92 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { code = influxdb.ETooLarge } - handleError(err, code, "") - return - } - - if err := h.PointsWriter.WritePoints(ctx, points); err != nil { - log.Error("Error writing points", zap.Error(err)) - handleError(err, influxdb.EInternal, "unexpected error writing points to database") - return - } - - w.WriteHeader(http.StatusNoContent) -} - -func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest, error) { - qp := r.URL.Query() - p := qp.Get("precision") - if p == "" { - p = "ns" - } - - if !models.ValidPrecision(p) { return nil, &influxdb.Error{ - Code: influxdb.EInvalid, - Op: "http/decodeWriteRequest", - Msg: errInvalidPrecision, + Code: code, + Op: opPointsWriter, + Msg: "", + Err: err, } } - var precision models.ParserOption - if p != "ns" { - precision = models.WithParserPrecision(p) - } - - return &postWriteRequest{ - Bucket: qp.Get("bucket"), - Org: qp.Get("org"), - Precision: precision, + return &ParsedPoints{ + Points: points, + RawSize: requestBytes, }, nil } -func readWriteRequest(ctx context.Context, rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (v []byte, err error) { +func readAll(ctx context.Context, rc io.ReadCloser) (data []byte, err error) { defer func() { - // close the reader now that all bytes have been consumed - // this will return non-nil in the case of a configured limit - // being exceeded - if cerr := rc.Close(); err == nil { + if cerr := rc.Close(); cerr != nil && err == nil { + if errors.Is(cerr, kitio.ErrReadLimitExceeded) { + cerr = ErrMaxBatchSizeExceeded + } err = cerr } }() - switch encoding { - case "gzip", "x-gzip": - rc, err = gzip.NewReader(rc) - if err != nil { - return nil, err - } - } - - // given a limit is configured on the number of bytes in a - // batch then wrap the reader in a limited reader - if maxBatchSizeBytes > 0 { - rc = newLimitedReadCloser(rc, maxBatchSizeBytes) - } - span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "read request body") + defer func() { - span.LogKV("request_bytes", len(v)) + span.LogKV("request_bytes", len(data)) span.Finish() }() - return ioutil.ReadAll(rc) + data, err = ioutil.ReadAll(rc) + if err != nil { + return nil, err + + } + return data, nil } -type postWriteRequest struct { +// writeRequest is a request object holding information about a batch of points +// to be written to a Bucket. +type writeRequest struct { Org string Bucket string - Precision models.ParserOption + Precision string + Body io.ReadCloser +} + +// decodeWriteRequest extracts information from an http.Request object to +// produce a writeRequest. +func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) { + qp := r.URL.Query() + precision := qp.Get("precision") + if precision == "" { + precision = "ns" + } + + if !models.ValidPrecision(precision) { + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Op: "http/newWriteRequest", + Msg: msgInvalidPrecision, + } + } + + bucket := qp.Get("bucket") + if bucket == "" { + return nil, &influxdb.Error{ + Code: influxdb.ENotFound, + Op: "http/newWriteRequest", + Msg: "bucket not found", + } + } + + encoding := r.Header.Get("Content-Encoding") + body, err := PointBatchReadCloser(r.Body, encoding, maxBatchSizeBytes) + if err != nil { + return nil, err + } + + return &writeRequest{ + Bucket: qp.Get("bucket"), + Org: qp.Get("org"), + Precision: precision, + Body: body, + }, nil } // WriteService sends data over HTTP to influxdb via line protocol. @@ -405,7 +431,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r return &influxdb.Error{ Code: influxdb.EInvalid, Op: "http/Write", - Msg: errInvalidPrecision, + Msg: msgInvalidPrecision, } } @@ -419,7 +445,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r return err } - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), r) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r) if err != nil { return err } @@ -468,39 +494,3 @@ func compressWithGzip(data io.Reader) (io.Reader, error) { return pr, err } - -type limitedReader struct { - *io.LimitedReader - err error - close func() error -} - -func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReader { - // read up to max + 1 as limited reader just returns EOF when the limit is reached - // or when there is nothing left to read. If we exceed the max batch size by one - // then we know the limit has been passed. - return &limitedReader{ - LimitedReader: &io.LimitedReader{R: r, N: n + 1}, - close: r.Close, - } -} - -// Close returns an ErrMaxBatchSizeExceeded when the wrapped reader -// exceeds the set limit for number of bytes. -// This is safe to call more than once but not concurrently. -func (l *limitedReader) Close() (err error) { - defer func() { - if cerr := l.close(); cerr != nil && err == nil { - err = cerr - } - - // only call close once - l.close = func() error { return nil } - }() - - if l.N < 1 { - l.err = ErrMaxBatchSizeExceeded - } - - return l.err -} diff --git a/http/write_handler_test.go b/http/write_handler_test.go index 518eac0ee0..2700b99995 100644 --- a/http/write_handler_test.go +++ b/http/write_handler_test.go @@ -16,6 +16,7 @@ import ( httpmock "github.com/influxdata/influxdb/v2/http/mock" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" "github.com/influxdata/influxdb/v2/mock" + "github.com/influxdata/influxdb/v2/models" influxtesting "github.com/influxdata/influxdb/v2/testing" "go.uber.org/zap/zaptest" ) @@ -303,7 +304,7 @@ func TestWriteHandler_handleWrite(t *testing.T) { state: state{ org: testOrg("043e0780ee2b1000"), bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), - opts: []WriteHandlerOption{WithParserMaxBytes(5)}, + opts: []WriteHandlerOption{WithParserOptions(models.WithParserMaxBytes(5))}, }, wants: wants{ code: 413, @@ -321,7 +322,7 @@ func TestWriteHandler_handleWrite(t *testing.T) { state: state{ org: testOrg("043e0780ee2b1000"), bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), - opts: []WriteHandlerOption{WithParserMaxLines(2)}, + opts: []WriteHandlerOption{WithParserOptions(models.WithParserMaxLines(2))}, }, wants: wants{ code: 413, @@ -339,7 +340,7 @@ func TestWriteHandler_handleWrite(t *testing.T) { state: state{ org: testOrg("043e0780ee2b1000"), bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), - opts: []WriteHandlerOption{WithParserMaxValues(4)}, + opts: []WriteHandlerOption{WithParserOptions(models.WithParserMaxValues(4))}, }, wants: wants{ code: 413, diff --git a/http/write_usage_recorder.go b/http/write_usage_recorder.go new file mode 100644 index 0000000000..036f3b96bb --- /dev/null +++ b/http/write_usage_recorder.go @@ -0,0 +1,35 @@ +package http + +import ( + "context" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/http/metric" + kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" +) + +func NewWriteUsageRecorder(w *kithttp.StatusResponseWriter, recorder metric.EventRecorder) *WriteUsageRecorder { + return &WriteUsageRecorder{ + Writer: w, + EventRecorder: recorder, + } +} + +type WriteUsageRecorder struct { + Writer *kithttp.StatusResponseWriter + EventRecorder metric.EventRecorder +} + +func (w *WriteUsageRecorder) Write(b []byte) (int, error) { + return w.Writer.Write(b) +} + +func (w *WriteUsageRecorder) Record(ctx context.Context, requestBytes int, orgID influxdb.ID, endpoint string) { + w.EventRecorder.Record(ctx, metric.Event{ + OrgID: orgID, + Endpoint: endpoint, + RequestBytes: requestBytes, + ResponseBytes: w.Writer.ResponseBytes(), + Status: w.Writer.Code(), + }) +} diff --git a/kit/io/limited_read_closer.go b/kit/io/limited_read_closer.go new file mode 100644 index 0000000000..40ea2470a9 --- /dev/null +++ b/kit/io/limited_read_closer.go @@ -0,0 +1,48 @@ +package io + +import ( + "errors" + "io" +) + +var ErrReadLimitExceeded = errors.New("read limit exceeded") + +// LimitedReadCloser wraps an io.ReadCloser in limiting behavior using +// io.LimitedReader. It allows us to obtain the limit error at the time of close +// instead of just when writing. +type LimitedReadCloser struct { + *io.LimitedReader + err error + close func() error +} + +// NewLimitedReadCloser returns a new LimitedReadCloser. +func NewLimitedReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { + // read up to max + 1 as limited reader just returns EOF when the limit is reached + // or when there is nothing left to read. If we exceed the max batch size by one + // then we know the limit has been passed. + return &LimitedReadCloser{ + LimitedReader: &io.LimitedReader{R: r, N: n + 1}, + close: r.Close, + } +} + +// Close returns an ErrReadLimitExceeded when the wrapped reader exceeds the set +// limit for number of bytes. This is safe to call more than once but not +// concurrently. +func (l *LimitedReadCloser) Close() (err error) { + defer func() { + if cerr := l.close(); cerr != nil && err == nil { + err = cerr + } + + // only call close once + l.close = func() error { return nil } + }() + + if l.N < 1 { + l.err = ErrReadLimitExceeded + } + + return l.err +} diff --git a/kit/io/limited_read_closer_test.go b/kit/io/limited_read_closer_test.go new file mode 100644 index 0000000000..7f2c88bb62 --- /dev/null +++ b/kit/io/limited_read_closer_test.go @@ -0,0 +1,37 @@ +package io + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLimitedReadCloser_Exceeded(t *testing.T) { + b := closer{bytes.NewBufferString("howdy")} + rc := NewLimitedReadCloser(b, 2) + + out, err := ioutil.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, []byte("how"), out) + assert.Equal(t, ErrReadLimitExceeded, rc.Close()) +} + +func TestLimitedReadCloser_Happy(t *testing.T) { + b := closer{bytes.NewBufferString("ho")} + rc := NewLimitedReadCloser(b, 2) + + out, err := ioutil.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, []byte("ho"), out) + assert.Nil(t, err) +} + +type closer struct { + io.Reader +} + +func (c closer) Close() error { return nil }