Merge pull request #18507 from influxdata/bb/extract-write-logic
refactor(http): Extract write handler logic for re-use elsewhere
commit 954939fa3a

go.mod (2 changes)
@@ -71,7 +71,6 @@ require (
    github.com/prometheus/client_golang v1.0.0
    github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
    github.com/prometheus/common v0.6.0
    github.com/prometheus/procfs v0.0.3 // indirect
    github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
    github.com/spf13/cast v1.3.0
    github.com/spf13/cobra v1.0.0
@@ -104,6 +103,7 @@ require (
    gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect
    gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71
    honnef.co/go/tools v0.0.1-2019.2.3.0.20190904154718-afd67930eec2
    istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf
    labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect
    launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
)
go.sum (41 changes)
@@ -77,6 +77,8 @@ github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQ
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
github.com/RoaringBitmap/roaring v0.4.16 h1:NholfewybRLOwACgfqfzn/N5xa6keKNs4fP00t0cwLo=
github.com/RoaringBitmap/roaring v0.4.16/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/kingpin v2.2.6+incompatible/go.mod h1:59OFYbFVLKQKq+mqrL6Rw5bR0c3ACQaawgXx0QYndlE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
@@ -91,6 +93,7 @@ github.com/aokoli/goutils v1.0.1 h1:7fpzNGoJ3VA8qcrm++XEE1QUe0mIwNeLa02Nwq7RDkg=
github.com/aokoli/goutils v1.0.1/go.mod h1:SijmP0QR8LtwsmDs8Yii5Z/S4trXFGFC2oO5g9DP+DQ=
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db h1:nxAtV4VajJDhKysp2kdcJZsq8Ss1xSA0vZTkVHHJd0E=
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apex/log v1.1.4 h1:3Zk+boorIQAAGBrHn0JUtAau4ihMamT4WdnfdnXM1zQ=
github.com/apex/log v1.1.4/go.mod h1:AlpoD9aScyQfJDVHmLMEcx4oU6LqzkWp4Mg9GdAcEvQ=
github.com/apex/logs v0.0.4/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
@@ -115,6 +118,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/blakesmith/ar v0.0.0-20190502131153-809d4375e1fb h1:m935MPodAbYS46DG4pJSv7WO+VECIWUQ7OJYSoTrMh4=
github.com/blakesmith/ar v0.0.0-20190502131153-809d4375e1fb/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI=
@@ -181,6 +186,9 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190819115812-1474bdeaf2a2 h1:BoejGRtu+FygJB/0ZpkhTSmaM7QbsPxFgcspAbTElNI=
@@ -260,6 +268,7 @@ github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/gddo v0.0.0-20181116215533-9bd4a3295021 h1:HYV500jCgk+IC68L5sWrLFIWMpaUFfXXpJSAb7XOoBk=
@@ -375,6 +384,7 @@ github.com/goreleaser/nfpm v1.2.1 h1:AEnu9XVmupRDTR930Z2rAs31Mj6sLIPxFcR9ESYvgDA
github.com/goreleaser/nfpm v1.2.1/go.mod h1:TtWrABZozuLOttX2uDlYyECfQX7x5XYkVxhjYcR6G9w=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3 h1:JVnpOZS+qxli+rgVl98ILOXVNbW+kb5wcxeGx8ShUIw=
github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE=
@@ -430,6 +440,7 @@ github.com/hashicorp/vault/sdk v0.1.8 h1:pfF3KwA1yPlfpmcumNsFM4uo91WMasX5gTuIkIt
github.com/hashicorp/vault/sdk v0.1.8/go.mod h1:tHZfc6St71twLizWNHvnnbiGFo1aq0eD2jGPLtP8kAU=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/howeyc/fsnotify v0.9.0/go.mod h1:41HzSPxBGeFRQKEEwgh49TRw/nKBsYZ2cF1OzPjSJsA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.0.0 h1:pO2K/gKgKaat5LdpAhxhluX2GPQMaI3W5FUz/I/UnWk=
@@ -579,6 +590,8 @@ github.com/mozilla/tls-observatory v0.0.0-20190404164649-a3c1b6cfecfd/go.mod h1:
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae h1:VeRdUYdCw49yizlSbMEn2SZ+gT+3IUKx8BqxyQdz+BY=
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/nats-io/gnatsd v1.3.0 h1:+5d80klu3QaJgNbdavVBjWJP7cHd11U2CLnRTFM9ICI=
github.com/nats-io/gnatsd v1.3.0/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ=
github.com/nats-io/go-nats v1.7.0 h1:oQOfHcLr8hb43QG8yeVyY2jtarIaTjOv41CGdF3tTvQ=
@@ -598,8 +611,10 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34=
github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
@@ -613,6 +628,7 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
@@ -636,27 +652,35 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v0.9.4/go.mod h1:oCXIBxdI62A4cR6aTRJCgetEjecSIYzOEaeAn4iYEpM=
github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo=
github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE=
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/prom2json v1.1.0/go.mod h1:v7OY1795b9fEUZgq4UU2+15YjRv0LfpxKejIQCy3L7o=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -798,6 +822,8 @@ github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZ
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@@ -880,6 +906,7 @@ golang.org/x/net v0.0.0-20181108082009-03003ca0c849/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -894,6 +921,7 @@ golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -926,7 +954,9 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb h1:pf3XwC90UUdNPYWZdFjhGBE7DUFuK3Ct1zWmZ65QN30=
golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -946,6 +976,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190620070143-6f217b454f45/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -1030,6 +1061,7 @@ gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.5.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.6.0/go.mod h1:btoxGiFvQNVUZQ8W08zLtrVS08CNpINPEfxXxgJL1Q4=
@@ -1110,6 +1142,8 @@ gopkg.in/ini.v1 v1.46.0 h1:VeDZbLYGaupuvIrsYCEOe/L/2Pcs5n7hdO1ZTjporag=
gopkg.in/ini.v1 v1.46.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
@@ -1141,6 +1175,9 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2019.2.3.0.20190904154718-afd67930eec2 h1:TeRFic5UJx5LXC3n8mOt92bqyM5J+so5EuErnxK4PAk=
honnef.co/go/tools v0.0.1-2019.2.3.0.20190904154718-afd67930eec2/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
istio.io/api v0.0.0-20190515205759-982e5c3888c6/go.mod h1:hhLFQmpHia8zgaM37vb2ml9iS5NfNfqZGRt1pS9aVEo=
istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf h1:iNpiPvg8fcQxebYPrd9Dhjvqd+SIF7OFH+1qp89/nHQ=
istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf/go.mod h1:EwvmercDF5DLCg5qUQlkM40xHwCxGoY1H/2LhI1p2YU=
labix.org/v2/mgo v0.0.0-20140701140051-000000000287 h1:L0cnkNl4TfAXzvdrqsYEmxOHOCv2p5I3taaReO8BWFs=
labix.org/v2/mgo v0.0.0-20140701140051-000000000287/go.mod h1:Lg7AYkt1uXJoR9oeSZ3W/8IXLdvOfIITgZnommstyz4=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
@@ -12,6 +12,7 @@ import (
    "github.com/influxdata/influxdb/v2/kit/feature"
    "github.com/influxdata/influxdb/v2/kit/prom"
    kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
    "github.com/influxdata/influxdb/v2/models"
    "github.com/influxdata/influxdb/v2/query"
    "github.com/influxdata/influxdb/v2/storage"
    "github.com/prometheus/client_golang/prometheus"
@@ -203,9 +204,11 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
    writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
    h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend,
        WithMaxBatchSizeBytes(b.MaxBatchSizeBytes),
        WithParserMaxBytes(b.WriteParserMaxBytes),
        WithParserMaxLines(b.WriteParserMaxLines),
        WithParserMaxValues(b.WriteParserMaxValues),
        WithParserOptions(
            models.WithParserMaxBytes(b.WriteParserMaxBytes),
            models.WithParserMaxLines(b.WriteParserMaxLines),
            models.WithParserMaxValues(b.WriteParserMaxValues),
        ),
    ))

    for _, o := range opts {
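Note on the hunk above: the three handler-level options WithParserMaxBytes, WithParserMaxLines and WithParserMaxValues are replaced by a single WithParserOptions option that forwards models.ParserOption values. A minimal sketch of the resulting wiring (not copied from the PR; it only uses constructors and options that appear in this diff, and assumes an *APIBackend named b and a handler h as in NewAPIHandler):

    writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
    h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend,
        WithMaxBatchSizeBytes(b.MaxBatchSizeBytes),
        // Parser limits are now passed through as plain models.ParserOption values.
        WithParserOptions(
            models.WithParserMaxBytes(b.WriteParserMaxBytes),
            models.WithParserMaxLines(b.WriteParserMaxLines),
            models.WithParserMaxValues(b.WriteParserMaxValues),
        ),
    ))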
@@ -26,9 +26,7 @@ const (
// the name. It interprets the &org= parameter as either the name
// or the ID.
func queryOrganization(ctx context.Context, r *http.Request, svc platform.OrganizationService) (o *platform.Organization, err error) {

    filter := platform.OrganizationFilter{}

    if organization := r.URL.Query().Get(Org); organization != "" {
        if id, err := platform.IDFromString(organization); err == nil {
            filter.ID = id
@@ -13,12 +13,15 @@ import (
    "github.com/influxdata/influxdb/v2"
    pcontext "github.com/influxdata/influxdb/v2/context"
    "github.com/influxdata/influxdb/v2/http/metric"
    kitio "github.com/influxdata/influxdb/v2/kit/io"
    "github.com/influxdata/influxdb/v2/kit/tracing"
    kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
    "github.com/influxdata/influxdb/v2/models"
    "github.com/influxdata/influxdb/v2/storage"
    "github.com/influxdata/influxdb/v2/tsdb"
    "github.com/opentracing/opentracing-go"
    "go.uber.org/zap"
    "istio.io/pkg/log"
)

var (
@@ -55,22 +58,16 @@ func NewWriteBackend(log *zap.Logger, b *APIBackend) *WriteBackend {

// WriteHandler receives line protocol and sends to a publish function.
type WriteHandler struct {
    *httprouter.Router
    influxdb.HTTPErrorHandler
    log *zap.Logger

    BucketService       influxdb.BucketService
    OrganizationService influxdb.OrganizationService
    PointsWriter        storage.PointsWriter
    EventRecorder       metric.EventRecorder

    PointsWriter storage.PointsWriter

    EventRecorder metric.EventRecorder

    router *httprouter.Router
    log    *zap.Logger
    maxBatchSizeBytes int64
    parserOptions     []models.ParserOption
    parserMaxBytes    int
    parserMaxLines    int
    parserMaxValues   int
}

// WriteHandlerOption is a functional option for a *WriteHandler
@@ -84,27 +81,9 @@ func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
    }
}

// WithParserMaxBytes specifies the maximum number of bytes that may be allocated when processing a single
// write request. When n is zero, there is no limit.
func WithParserMaxBytes(n int) WriteHandlerOption {
func WithParserOptions(opts ...models.ParserOption) WriteHandlerOption {
    return func(w *WriteHandler) {
        w.parserMaxBytes = n
    }
}

// WithParserMaxLines specifies the maximum number of lines that may be parsed when processing a single
// write request. When n is zero, there is no limit.
func WithParserMaxLines(n int) WriteHandlerOption {
    return func(w *WriteHandler) {
        w.parserMaxLines = n
    }
}

// WithParserMaxValues specifies the maximum number of values that may be parsed when processing a single
// write request. When n is zero, there is no limit.
func WithParserMaxValues(n int) WriteHandlerOption {
    return func(w *WriteHandler) {
        w.parserMaxValues = n
        w.parserOptions = opts
    }
}

@@ -114,41 +93,59 @@ func (*WriteHandler) Prefix() string {
}

const (
    prefixWrite          = "/api/v2/write"
    errInvalidGzipHeader = "gzipped HTTP body contains an invalid header"
    errInvalidPrecision  = "invalid precision; valid precision units are ns, us, ms, and s"
    prefixWrite              = "/api/v2/write"
    msgInvalidGzipHeader     = "gzipped HTTP body contains an invalid header"
    msgInvalidPrecision      = "invalid precision; valid precision units are ns, us, ms, and s"
    msgUnableToReadData      = "unable to read data"
    msgWritingRequiresPoints = "writing requires points"
    msgUnexpectedWriteError  = "unexpected error writing points to database"

    opPointsWriter = "http/pointsWriter"
    opWriteHandler = "http/writeHandler"
)

// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler {
    h := &WriteHandler{
        Router:           NewRouter(b.HTTPErrorHandler),
        HTTPErrorHandler: b.HTTPErrorHandler,
        log:              log,

        HTTPErrorHandler:    b.HTTPErrorHandler,
        PointsWriter:        b.PointsWriter,
        BucketService:       b.BucketService,
        OrganizationService: b.OrganizationService,
        EventRecorder:       b.WriteEventRecorder,

        router: NewRouter(b.HTTPErrorHandler),
        log:    log,
    }

    for _, opt := range opts {
        opt(h)
    }

    // cache configured options
    if h.parserMaxBytes > 0 {
        h.parserOptions = append(h.parserOptions, models.WithParserMaxBytes(h.parserMaxBytes))
    }
    if h.parserMaxLines > 0 {
        h.parserOptions = append(h.parserOptions, models.WithParserMaxLines(h.parserMaxLines))
    }
    if h.parserMaxValues > 0 {
        h.parserOptions = append(h.parserOptions, models.WithParserMaxValues(h.parserMaxValues))
    h.router.HandlerFunc(http.MethodPost, prefixWrite, h.handleWrite)
    return h
}

func (h *WriteHandler) findBucket(ctx context.Context, orgID influxdb.ID, bucket string) (*influxdb.Bucket, error) {
    if id, err := influxdb.IDFromString(bucket); err == nil {
        b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
            OrganizationID: &orgID,
            ID:             id,
        })
        if err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
            return nil, err
        } else if err == nil {
            return b, err
        }
    }

    h.HandlerFunc("POST", prefixWrite, h.handleWrite)
    return h
    return h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
        OrganizationID: &orgID,
        Name:           &bucket,
    })
}

func (h *WriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    h.router.ServeHTTP(w, r)
}

func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
@@ -156,144 +153,164 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
    defer span.Finish()

    ctx := r.Context()
    defer r.Body.Close()
    auth, err := pcontext.GetAuthorizer(ctx)
    if err != nil {
        h.HandleHTTPError(ctx, err, w)
        return
    }

    // TODO(desa): I really don't like how we're recording the usage metrics here
    // Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
    var (
        orgID        influxdb.ID
        requestBytes int
        sw           = kithttp.NewStatusResponseWriter(w)
        handleError  = func(err error, code, message string) {
            h.HandleHTTPError(ctx, &influxdb.Error{
                Code: code,
                Op:   "http/handleWrite",
                Msg:  message,
                Err:  err,
            }, w)
        }
    )
    w = sw
    req, err := decodeWriteRequest(ctx, r, h.maxBatchSizeBytes)
    if err != nil {
        h.HandleHTTPError(ctx, err, w)
        return
    }

    org, err := queryOrganization(ctx, r, h.OrganizationService)
    if err != nil {
        h.HandleHTTPError(ctx, err, w)
        return
    }
    span.LogKV("org_id", org.ID)

    sw := kithttp.NewStatusResponseWriter(w)
    recorder := NewWriteUsageRecorder(sw, h.EventRecorder)
    var requestBytes int
    defer func() {
        h.EventRecorder.Record(ctx, metric.Event{
            OrgID:         orgID,
            Endpoint:      r.URL.Path, // This should be sufficient for the time being as it should only be single endpoint.
            RequestBytes:  requestBytes,
            ResponseBytes: sw.ResponseBytes(),
            Status:        sw.Code(),
        })
        // Close around the requestBytes variable to placate the linter.
        recorder.Record(ctx, requestBytes, org.ID, r.URL.Path)
    }()

    a, err := pcontext.GetAuthorizer(ctx)
    bucket, err := h.findBucket(ctx, org.ID, req.Bucket)
    if err != nil {
        h.HandleHTTPError(ctx, err, w)
        h.HandleHTTPError(ctx, err, sw)
        return
    }

    req, err := decodeWriteRequest(ctx, r)
    if err != nil {
        h.HandleHTTPError(ctx, err, w)
        return
    }

    log := h.log.With(zap.String("org", req.Org), zap.String("bucket", req.Bucket))

    var org *influxdb.Organization
    org, err = queryOrganization(ctx, r, h.OrganizationService)
    if err != nil {
        log.Info("Failed to find organization", zap.Error(err))
        h.HandleHTTPError(ctx, err, w)
        return
    }

    orgID = org.ID
    span.LogKV("org_id", orgID)

    if req.Bucket == "" {
        // if we still dont have a bucket time to error
        h.HandleHTTPError(ctx, &influxdb.Error{
            Code: influxdb.ENotFound,
            Op:   "http/handleWrite",
            Msg:  "bucket not found",
        }, w)
    }

    var bucket *influxdb.Bucket
    if id, err := influxdb.IDFromString(req.Bucket); err == nil {
        // Decoded ID successfully. Make sure it's a real bucket.
        b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
            OrganizationID: &org.ID,
            ID:             id,
        })
        if err == nil {
            bucket = b
        } else if influxdb.ErrorCode(err) != influxdb.ENotFound {
            h.HandleHTTPError(ctx, err, w)
            return
        }
    }

    if bucket == nil {
        b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
            OrganizationID: &org.ID,
            Name:           &req.Bucket,
        })
        if err != nil {
            h.HandleHTTPError(ctx, err, w)
            return
        }

        bucket = b
    }
    span.LogKV("bucket_id", bucket.ID)

    p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
    if err != nil {
        handleError(err, influxdb.EInternal, fmt.Sprintf("unable to create permission for bucket: %v", err))
    if err := checkBucketWritePermissions(auth, org.ID, bucket.ID); err != nil {
        h.HandleHTTPError(ctx, err, sw)
        return
    }

    if pset, err := a.PermissionSet(); err != nil || !pset.Allowed(*p) {
        handleError(nil, influxdb.EForbidden, "insufficient permissions for write")
    opts := append([]models.ParserOption{}, h.parserOptions...)
    opts = append(opts, models.WithParserPrecision(req.Precision))
    parsed, err := NewPointsParser(opts...).ParsePoints(ctx, org.ID, bucket.ID, req.Body)
    if err != nil {
        h.HandleHTTPError(ctx, err, sw)
        return
    }
    requestBytes = parsed.RawSize

    if err := h.PointsWriter.WritePoints(ctx, parsed.Points); err != nil {
        h.HandleHTTPError(ctx, &influxdb.Error{
            Code: influxdb.EInternal,
            Op:   opWriteHandler,
            Msg:  "unexpected error writing points to database",
            Err:  err,
        }, sw)
        return
    }

    data, err := readWriteRequest(ctx, r.Body, r.Header.Get("Content-Encoding"), h.maxBatchSizeBytes)
    if err != nil {
        log.Error("Error reading body", zap.Error(err))
    sw.WriteHeader(http.StatusNoContent)
}

// checkBucketWritePermissions checks an Authorizer for write permissions to a
// specific Bucket.
func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influxdb.ID) error {
    p, err := influxdb.NewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID)
    if err != nil {
        return &influxdb.Error{
            Code: influxdb.EInternal,
            Op:   opWriteHandler,
            Msg:  fmt.Sprintf("unable to create permission for bucket: %v", err),
            Err:  err,
        }
    }
    if pset, err := auth.PermissionSet(); err != nil || !pset.Allowed(*p) {
        return &influxdb.Error{
            Code: influxdb.EForbidden,
            Op:   opWriteHandler,
            Msg:  "insufficient permissions for write",
            Err:  err,
        }
    }
    return nil
}

// PointBatchReadCloser (potentially) wraps an io.ReadCloser in Gzip
// decompression and limits the reading to a specific number of bytes.
func PointBatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) {
    switch encoding {
    case "gzip", "x-gzip":
        var err error
        rc, err = gzip.NewReader(rc)
        if err != nil {
            return nil, err
        }
    }
    if maxBatchSizeBytes > 0 {
        rc = kitio.NewLimitedReadCloser(rc, maxBatchSizeBytes)
    }
    return rc, nil
}

// NewPointsParser returns a new PointsParser
func NewPointsParser(parserOptions ...models.ParserOption) *PointsParser {
    return &PointsParser{
        ParserOptions: parserOptions,
    }
}

// ParsedPoints contains the points parsed as well as the total number of bytes
// after decompression.
type ParsedPoints struct {
    Points  models.Points
    RawSize int
}

// PointsParser parses batches of Points.
type PointsParser struct {
    ParserOptions []models.ParserOption
}

// ParsePoints parses the points from an io.ReadCloser for a specific Bucket.
func (pw *PointsParser) ParsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "write points")
    defer span.Finish()
    return pw.parsePoints(ctx, orgID, bucketID, rc)
}

func (pw *PointsParser) parsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) {
    data, err := readAll(ctx, rc)
    if err != nil {
        code := influxdb.EInternal
        if errors.Is(err, ErrMaxBatchSizeExceeded) {
            code = influxdb.ETooLarge
        } else if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) {
            code = influxdb.EInvalid
        }

        handleError(err, code, "unable to read data")
        return
        return nil, &influxdb.Error{
            Code: code,
            Op:   opPointsWriter,
            Msg:  msgUnableToReadData,
            Err:  err,
        }
    }

    requestBytes = len(data)
    requestBytes := len(data)
    if requestBytes == 0 {
        handleError(err, influxdb.EInvalid, "writing requires points")
        return
        return nil, &influxdb.Error{
            Op:   opPointsWriter,
            Code: influxdb.EInvalid,
            Msg:  msgWritingRequiresPoints,
        }
    }

    span, _ = tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing")
    encoded := tsdb.EncodeName(org.ID, bucket.ID)
    span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing")
    encoded := tsdb.EncodeName(orgID, bucketID)
    mm := models.EscapeMeasurement(encoded[:])

    var options []models.ParserOption
    if len(h.parserOptions) > 0 {
        options = make([]models.ParserOption, 0, len(h.parserOptions)+1)
        options = append(options, h.parserOptions...)
    }

    if req.Precision != nil {
        options = append(options, req.Precision)
    }

    points, err := models.ParsePointsWithOptions(data, mm, options...)
    points, err := models.ParsePointsWithOptions(data, mm, pw.ParserOptions...)
    span.LogKV("values_total", len(points))
    span.Finish()
    if err != nil {
@@ -306,83 +323,92 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
            code = influxdb.ETooLarge
        }

        handleError(err, code, "")
        return
    }

    if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
        log.Error("Error writing points", zap.Error(err))
        handleError(err, influxdb.EInternal, "unexpected error writing points to database")
        return
    }

    w.WriteHeader(http.StatusNoContent)
}

func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest, error) {
    qp := r.URL.Query()
    p := qp.Get("precision")
    if p == "" {
        p = "ns"
    }

    if !models.ValidPrecision(p) {
        return nil, &influxdb.Error{
            Code: influxdb.EInvalid,
            Op:   "http/decodeWriteRequest",
            Msg:  errInvalidPrecision,
            Code: code,
            Op:   opPointsWriter,
            Msg:  "",
            Err:  err,
        }
    }

    var precision models.ParserOption
    if p != "ns" {
        precision = models.WithParserPrecision(p)
    }

    return &postWriteRequest{
        Bucket:    qp.Get("bucket"),
        Org:       qp.Get("org"),
        Precision: precision,
    return &ParsedPoints{
        Points:  points,
        RawSize: requestBytes,
    }, nil
}

func readWriteRequest(ctx context.Context, rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (v []byte, err error) {
func readAll(ctx context.Context, rc io.ReadCloser) (data []byte, err error) {
    defer func() {
        // close the reader now that all bytes have been consumed
        // this will return non-nil in the case of a configured limit
        // being exceeded
        if cerr := rc.Close(); err == nil {
        if cerr := rc.Close(); cerr != nil && err == nil {
            if errors.Is(cerr, kitio.ErrReadLimitExceeded) {
                cerr = ErrMaxBatchSizeExceeded
            }
            err = cerr
        }
    }()

    switch encoding {
    case "gzip", "x-gzip":
        rc, err = gzip.NewReader(rc)
        if err != nil {
            return nil, err
        }
    }

    // given a limit is configured on the number of bytes in a
    // batch then wrap the reader in a limited reader
    if maxBatchSizeBytes > 0 {
        rc = newLimitedReadCloser(rc, maxBatchSizeBytes)
    }

    span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "read request body")

    defer func() {
        span.LogKV("request_bytes", len(v))
        span.LogKV("request_bytes", len(data))
        span.Finish()
    }()

    return ioutil.ReadAll(rc)
    data, err = ioutil.ReadAll(rc)
    if err != nil {
        return nil, err

    }
    return data, nil
}

type postWriteRequest struct {
// writeRequest is a request object holding information about a batch of points
// to be written to a Bucket.
type writeRequest struct {
    Org       string
    Bucket    string
    Precision models.ParserOption
    Precision string
    Body      io.ReadCloser
}

// decodeWriteRequest extracts information from an http.Request object to
// produce a writeRequest.
func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) {
    qp := r.URL.Query()
    precision := qp.Get("precision")
    if precision == "" {
        precision = "ns"
    }

    if !models.ValidPrecision(precision) {
        return nil, &influxdb.Error{
            Code: influxdb.EInvalid,
            Op:   "http/newWriteRequest",
            Msg:  msgInvalidPrecision,
        }
    }

    bucket := qp.Get("bucket")
    if bucket == "" {
        return nil, &influxdb.Error{
            Code: influxdb.ENotFound,
            Op:   "http/newWriteRequest",
            Msg:  "bucket not found",
        }
    }

    encoding := r.Header.Get("Content-Encoding")
    body, err := PointBatchReadCloser(r.Body, encoding, maxBatchSizeBytes)
    if err != nil {
        return nil, err
    }

    return &writeRequest{
        Bucket:    qp.Get("bucket"),
        Org:       qp.Get("org"),
        Precision: precision,
        Body:      body,
    }, nil
}

// WriteService sends data over HTTP to influxdb via line protocol.

@@ -405,7 +431,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r
    return &influxdb.Error{
        Code: influxdb.EInvalid,
        Op:   "http/Write",
        Msg:  errInvalidPrecision,
        Msg:  msgInvalidPrecision,
    }
}

@@ -419,7 +445,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r
        return err
    }

    req, err := http.NewRequestWithContext(ctx, "POST", u.String(), r)
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
    if err != nil {
        return err
    }
@@ -468,39 +494,3 @@ func compressWithGzip(data io.Reader) (io.Reader, error) {

    return pr, err
}

type limitedReader struct {
    *io.LimitedReader
    err   error
    close func() error
}

func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReader {
    // read up to max + 1 as limited reader just returns EOF when the limit is reached
    // or when there is nothing left to read. If we exceed the max batch size by one
    // then we know the limit has been passed.
    return &limitedReader{
        LimitedReader: &io.LimitedReader{R: r, N: n + 1},
        close:         r.Close,
    }
}

// Close returns an ErrMaxBatchSizeExceeded when the wrapped reader
// exceeds the set limit for number of bytes.
// This is safe to call more than once but not concurrently.
func (l *limitedReader) Close() (err error) {
    defer func() {
        if cerr := l.close(); cerr != nil && err == nil {
            err = cerr
        }

        // only call close once
        l.close = func() error { return nil }
    }()

    if l.N < 1 {
        l.err = ErrMaxBatchSizeExceeded
    }

    return l.err
}
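Since the stated goal of the change is re-use outside the HTTP handler, here is a minimal sketch of how the extracted pieces compose from another code path. This is not part of the PR: the helper name writeLineProtocol, the 1 MiB limit and the parser limits are illustrative assumptions; only signatures that appear in the diff above (PointBatchReadCloser, NewPointsParser, ParsePoints, storage.PointsWriter.WritePoints) are used, and the code is written as if it lived in the same http package:

    package http // sketch only

    import (
        "context"
        "net/http"

        "github.com/influxdata/influxdb/v2"
        "github.com/influxdata/influxdb/v2/models"
        "github.com/influxdata/influxdb/v2/storage"
    )

    // writeLineProtocol shows the extracted pipeline: wrap the body (gzip plus a
    // byte limit), parse it into points for a bucket, then hand the batch to any
    // storage.PointsWriter supplied by the caller.
    func writeLineProtocol(ctx context.Context, pw storage.PointsWriter, orgID, bucketID influxdb.ID, r *http.Request) error {
        body, err := PointBatchReadCloser(r.Body, r.Header.Get("Content-Encoding"), 1<<20)
        if err != nil {
            return err
        }

        // ParsePoints consumes and closes body (readAll defers rc.Close()).
        parsed, err := NewPointsParser(
            models.WithParserPrecision("ns"),
            models.WithParserMaxLines(10000),
        ).ParsePoints(ctx, orgID, bucketID, body)
        if err != nil {
            return err
        }

        return pw.WritePoints(ctx, parsed.Points)
    }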
@@ -16,6 +16,7 @@ import (
    httpmock "github.com/influxdata/influxdb/v2/http/mock"
    kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
    "github.com/influxdata/influxdb/v2/mock"
    "github.com/influxdata/influxdb/v2/models"
    influxtesting "github.com/influxdata/influxdb/v2/testing"
    "go.uber.org/zap/zaptest"
)
@@ -303,7 +304,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
            state: state{
                org:    testOrg("043e0780ee2b1000"),
                bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
                opts:   []WriteHandlerOption{WithParserMaxBytes(5)},
                opts:   []WriteHandlerOption{WithParserOptions(models.WithParserMaxBytes(5))},
            },
            wants: wants{
                code: 413,
@@ -321,7 +322,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
            state: state{
                org:    testOrg("043e0780ee2b1000"),
                bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
                opts:   []WriteHandlerOption{WithParserMaxLines(2)},
                opts:   []WriteHandlerOption{WithParserOptions(models.WithParserMaxLines(2))},
            },
            wants: wants{
                code: 413,
@@ -339,7 +340,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
            state: state{
                org:    testOrg("043e0780ee2b1000"),
                bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
                opts:   []WriteHandlerOption{WithParserMaxValues(4)},
                opts:   []WriteHandlerOption{WithParserOptions(models.WithParserMaxValues(4))},
            },
            wants: wants{
                code: 413,
@@ -0,0 +1,35 @@
package http

import (
    "context"

    "github.com/influxdata/influxdb/v2"
    "github.com/influxdata/influxdb/v2/http/metric"
    kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
)

func NewWriteUsageRecorder(w *kithttp.StatusResponseWriter, recorder metric.EventRecorder) *WriteUsageRecorder {
    return &WriteUsageRecorder{
        Writer:        w,
        EventRecorder: recorder,
    }
}

type WriteUsageRecorder struct {
    Writer        *kithttp.StatusResponseWriter
    EventRecorder metric.EventRecorder
}

func (w *WriteUsageRecorder) Write(b []byte) (int, error) {
    return w.Writer.Write(b)
}

func (w *WriteUsageRecorder) Record(ctx context.Context, requestBytes int, orgID influxdb.ID, endpoint string) {
    w.EventRecorder.Record(ctx, metric.Event{
        OrgID:         orgID,
        Endpoint:      endpoint,
        RequestBytes:  requestBytes,
        ResponseBytes: w.Writer.ResponseBytes(),
        Status:        w.Writer.Code(),
    })
}
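A short usage sketch for the recorder above (not from the PR; it mirrors the defer in handleWrite, assumes the same package and imports as the write handler, and treats eventRecorder and orgID as values supplied by the caller):

    func handleSketch(w http.ResponseWriter, r *http.Request, eventRecorder metric.EventRecorder, orgID influxdb.ID) {
        sw := kithttp.NewStatusResponseWriter(w)
        recorder := NewWriteUsageRecorder(sw, eventRecorder)

        var requestBytes int
        // Deferred so the final status code and response size are known when the
        // event is recorded; requestBytes is closed over and set by the handler body.
        defer func() {
            recorder.Record(r.Context(), requestBytes, orgID, r.URL.Path)
        }()

        // ... decode, parse and write the batch through sw, setting requestBytes
        // to the decompressed size of the accepted payload ...
        sw.WriteHeader(http.StatusNoContent)
    }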
@@ -0,0 +1,48 @@
package io

import (
    "errors"
    "io"
)

var ErrReadLimitExceeded = errors.New("read limit exceeded")

// LimitedReadCloser wraps an io.ReadCloser in limiting behavior using
// io.LimitedReader. It allows us to obtain the limit error at the time of close
// instead of just when writing.
type LimitedReadCloser struct {
    *io.LimitedReader
    err   error
    close func() error
}

// NewLimitedReadCloser returns a new LimitedReadCloser.
func NewLimitedReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
    // read up to max + 1 as limited reader just returns EOF when the limit is reached
    // or when there is nothing left to read. If we exceed the max batch size by one
    // then we know the limit has been passed.
    return &LimitedReadCloser{
        LimitedReader: &io.LimitedReader{R: r, N: n + 1},
        close:         r.Close,
    }
}

// Close returns an ErrReadLimitExceeded when the wrapped reader exceeds the set
// limit for number of bytes. This is safe to call more than once but not
// concurrently.
func (l *LimitedReadCloser) Close() (err error) {
    defer func() {
        if cerr := l.close(); cerr != nil && err == nil {
            err = cerr
        }

        // only call close once
        l.close = func() error { return nil }
    }()

    if l.N < 1 {
        l.err = ErrReadLimitExceeded
    }

    return l.err
}
@@ -0,0 +1,37 @@
package io

import (
    "bytes"
    "io"
    "io/ioutil"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestLimitedReadCloser_Exceeded(t *testing.T) {
    b := closer{bytes.NewBufferString("howdy")}
    rc := NewLimitedReadCloser(b, 2)

    out, err := ioutil.ReadAll(rc)
    require.NoError(t, err)
    assert.Equal(t, []byte("how"), out)
    assert.Equal(t, ErrReadLimitExceeded, rc.Close())
}

func TestLimitedReadCloser_Happy(t *testing.T) {
    b := closer{bytes.NewBufferString("ho")}
    rc := NewLimitedReadCloser(b, 2)

    out, err := ioutil.ReadAll(rc)
    require.NoError(t, err)
    assert.Equal(t, []byte("ho"), out)
    assert.Nil(t, err)
}

type closer struct {
    io.Reader
}

func (c closer) Close() error { return nil }
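One behavior worth spelling out about how PointBatchReadCloser composes with this type: gzip decoding is applied before the limit is installed, so the byte limit counts decompressed line protocol rather than the compressed request body. A hypothetical test-style sketch (not in the PR) that would sit next to the write handler tests in the http package; the payload and the 100-byte limit are arbitrary, and the bytes, compress/gzip, io/ioutil, testing, testify and kitio imports are assumed:

    func TestPointBatchReadCloser_GzipLimitSketch(t *testing.T) {
        var buf bytes.Buffer
        zw := gzip.NewWriter(&buf)
        _, _ = zw.Write(bytes.Repeat([]byte("m,t=a f=1i 1\n"), 80)) // ~1 KiB once decompressed
        _ = zw.Close()

        // The compressed body is small, but the limit of 100 bytes is enforced
        // against the decompressed stream because gzip decoding happens first.
        rc, err := PointBatchReadCloser(ioutil.NopCloser(&buf), "gzip", 100)
        require.NoError(t, err)

        _, err = ioutil.ReadAll(rc)
        require.NoError(t, err) // reading simply stops at the limit
        assert.Equal(t, kitio.ErrReadLimitExceeded, rc.Close())
    }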