Merge pull request #282 from influxdata/alamb/gzip

feat: support gzip content-encoding for api/v2/write endpoint
pull/24376/head
Andrew Lamb 2020-09-10 12:04:13 -04:00 committed by GitHub
commit 37a982d3b0
1 changed file with 75 additions and 15 deletions


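In short: the api/v2/write handler now accepts a `Content-Encoding: gzip` header and decompresses the request body (via the `libflate` crate) before parsing it as line protocol. As a minimal round-trip sketch of the client/server contract — not taken from this commit, and the sample line protocol value is made up:

// Sketch (not from this commit): round-trip of a gzip-encoded write body,
// using the same libflate crate the server-side decoder uses.
use libflate::gzip::{Decoder, Encoder};
use std::io::{Read, Write};

fn main() -> std::io::Result<()> {
    // Hypothetical line protocol payload, for illustration only.
    let line_protocol = b"cpu,host=A usage=0.5 1600000000000000000";

    // Client side: compress the body before sending it with a
    // `Content-Encoding: gzip` header.
    let mut encoder = Encoder::new(Vec::new())?;
    encoder.write_all(line_protocol)?;
    let compressed = encoder.finish().into_result()?;

    // Server side: what the new parse_body does when it sees the header.
    let mut decoder = Decoder::new(&compressed[..])?;
    let mut decoded = Vec::new();
    decoder.read_to_end(&mut decoded)?;
    assert_eq!(decoded, line_protocol);
    Ok(())
}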
@@ -6,12 +6,13 @@
 #![deny(rust_2018_idioms)]

+use http::header::CONTENT_ENCODING;
 use tracing::{debug, error, info};

 use delorean::storage::write_buffer_database::{Error as DatabaseError, WriteBufferDatabases};
 use delorean_line_parser::parse_lines;

-use bytes::BytesMut;
+use bytes::{Bytes, BytesMut};
 use futures::{self, StreamExt};
 use hyper::{Body, Method, StatusCode};
 use serde::Deserialize;
@@ -81,6 +82,15 @@ pub enum ApplicationError {
         source: serde_json::error::Error,
     },

+    #[snafu(display("Invalid content encoding: {}", content_encoding))]
+    InvalidContentEncoding { content_encoding: String },
+
+    #[snafu(display("Error reading request header '{}' as Utf8: {}", header_name, source))]
+    ReadingHeaderAsUtf8 {
+        header_name: String,
+        source: hyper::header::ToStrError,
+    },
+
     #[snafu(display("Error reading request body: {}", source))]
     ReadingBody { source: hyper::error::Error },
@@ -90,8 +100,14 @@ pub enum ApplicationError {
     #[snafu(display("Error parsing line protocol: {}", source))]
     ParsingLineProtocol { source: delorean_line_parser::Error },

+    #[snafu(display("Error decompressing body as gzip: {}", source))]
+    ReadingBodyAsGzip { source: std::io::Error },
+
     #[snafu(display("No handler for {:?} {}", method, path))]
     RouteNotFound { method: Method, path: String },
+
+    #[snafu(display("Internal error creating gzip decoder: {:?}", source))]
+    CreatingGzipDecoder { source: std::io::Error },
 }

 impl ApplicationError {
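For readers unfamiliar with the error handling here: snafu generates a context selector for each variant, which is what call sites like `.context(CreatingGzipDecoder)?` in the new `parse_body` below refer to. A minimal, self-contained sketch of that mechanism (assuming the snafu-0.x-era naming used in this file, where the selector shares the variant's name):

// Sketch (not from this commit): how the two new gzip error variants
// get attached to io::Error results via snafu context selectors.
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum ApplicationError {
    #[snafu(display("Error decompressing body as gzip: {}", source))]
    ReadingBodyAsGzip { source: std::io::Error },

    #[snafu(display("Internal error creating gzip decoder: {:?}", source))]
    CreatingGzipDecoder { source: std::io::Error },
}

// Each variant's generated selector converts an io::Error into that variant.
fn decompress(data: &[u8]) -> Result<Vec<u8>, ApplicationError> {
    use libflate::gzip::Decoder;
    use std::io::Read;
    let mut decoder = Decoder::new(data).context(CreatingGzipDecoder)?;
    let mut out = Vec::new();
    decoder.read_to_end(&mut out).context(ReadingBodyAsGzip)?;
    Ok(out)
}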
@@ -106,10 +122,14 @@ impl ApplicationError {
             Self::ExpectedQueryString { .. } => StatusCode::BAD_REQUEST,
             Self::InvalidQueryString { .. } => StatusCode::BAD_REQUEST,
             Self::InvalidRequestBody { .. } => StatusCode::BAD_REQUEST,
+            Self::InvalidContentEncoding { .. } => StatusCode::BAD_REQUEST,
+            Self::ReadingHeaderAsUtf8 { .. } => StatusCode::BAD_REQUEST,
             Self::ReadingBody { .. } => StatusCode::BAD_REQUEST,
             Self::ReadingBodyAsUtf8 { .. } => StatusCode::BAD_REQUEST,
             Self::ParsingLineProtocol { .. } => StatusCode::BAD_REQUEST,
+            Self::ReadingBodyAsGzip { .. } => StatusCode::BAD_REQUEST,
             Self::RouteNotFound { .. } => StatusCode::NOT_FOUND,
+            Self::CreatingGzipDecoder { .. } => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
 }
@@ -123,6 +143,58 @@ struct WriteInfo {
     bucket: String,
 }

+/// Parse the request's body into raw bytes, applying size limits and
+/// content encoding as needed.
+async fn parse_body(req: hyper::Request<Body>) -> Result<Bytes, ApplicationError> {
+    // clippy says the const needs to be assigned to a local variable:
+    // error: a `const` item with interior mutability should not be borrowed
+    let header_name = CONTENT_ENCODING;
+    let ungzip = match req.headers().get(&header_name) {
+        None => false,
+        Some(content_encoding) => {
+            let content_encoding = content_encoding.to_str().context(ReadingHeaderAsUtf8 {
+                header_name: header_name.as_str(),
+            })?;
+            match content_encoding {
+                "gzip" => true,
+                _ => InvalidContentEncoding { content_encoding }.fail()?,
+            }
+        }
+    };
+
+    let mut payload = req.into_body();
+
+    let mut body = BytesMut::new();
+    while let Some(chunk) = payload.next().await {
+        let chunk = chunk.expect("Should have been able to read the next chunk");
+        // limit max size of in-memory payload
+        if (body.len() + chunk.len()) > MAX_SIZE {
+            return Err(ApplicationError::RequestSizeExceeded {
+                max_body_size: MAX_SIZE,
+            });
+        }
+        body.extend_from_slice(&chunk);
+    }
+    let body = body.freeze();
+
+    // apply any content encoding needed
+    if ungzip {
+        use libflate::gzip::Decoder;
+        use std::io::Read;
+        let mut decoder = Decoder::new(&body[..]).context(CreatingGzipDecoder)?;
+
+        // TODO cap the size of the decoded data (right
+        // now this could decompress some crazy large
+        // request)
+        let mut decoded_data = Vec::new();
+        decoder
+            .read_to_end(&mut decoded_data)
+            .context(ReadingBodyAsGzip)?;
+        Ok(decoded_data.into())
+    } else {
+        Ok(body)
+    }
+}
+
 #[tracing::instrument(level = "debug")]
 async fn write(
     req: hyper::Request<Body>,
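The TODO above is worth flagging: the MAX_SIZE check applies only to the compressed body, so a small gzip bomb could still expand to something enormous. One way to close that gap — not part of this commit — is to bound the reader with `Read::take`; the `MAX_SIZE` constant here is assumed to mirror the existing cap:

// Sketch (not from this commit): cap the decoded size of a gzip body.
use libflate::gzip::Decoder;
use std::io::Read;

const MAX_SIZE: usize = 10 * 1024 * 1024; // assumed; mirrors the compressed-body cap

fn decode_gzip_capped(body: &[u8]) -> std::io::Result<Vec<u8>> {
    let decoder = Decoder::new(body)?;
    // Read at most MAX_SIZE + 1 bytes so an oversized body is detectable.
    let mut limited = decoder.take(MAX_SIZE as u64 + 1);
    let mut decoded = Vec::new();
    limited.read_to_end(&mut decoded)?;
    if decoded.len() > MAX_SIZE {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "decompressed body exceeds maximum allowed size",
        ));
    }
    Ok(decoded)
}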
@@ -142,21 +214,9 @@ async fn write(
             bucket_name: write_info.bucket.clone(),
         })?;

-    let mut payload = req.into_body();
+    let body = parse_body(req).await?;

-    let mut body = BytesMut::new();
-    while let Some(chunk) = payload.next().await {
-        let chunk = chunk.expect("Should have been able to read the next chunk");
-        // limit max size of in-memory payload
-        if (body.len() + chunk.len()) > MAX_SIZE {
-            return Err(ApplicationError::RequestSizeExceeded {
-                max_body_size: MAX_SIZE,
-            });
-        }
-        body.extend_from_slice(&chunk);
-    }
-    let body = body.freeze();
-
-    let body = str::from_utf8(&body).unwrap();
+    let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;

     let lines: Vec<_> = parse_lines(body)
         .collect::<Result<Vec<_>, delorean_line_parser::Error>>()