commit e7d88d7383
@@ -1350,6 +1350,7 @@ dependencies = [
  "dirs 3.0.1",
  "dotenv",
  "env_logger",
+ "flate2",
  "futures",
  "generated_types",
  "hex",
@@ -67,7 +67,7 @@ tracing-futures="0.2.4"
 
 http = "0.2.0"
 snafu = "0.6.9"
-libflate = "1.0.0"
+flate2 = "1.0"
 
 [dev-dependencies]
 assert_cmd = "1.0.0"
@@ -1,7 +1,6 @@
 use arrow_deps::parquet::file::serialized_reader::{FileSource, SliceableCursor};
 use ingest::parquet::ChunkReader;
 /// Module to handle input files (and maybe urls?)
-use libflate::gzip;
 use packers::Name;
 use snafu::{ResultExt, Snafu};
 use std::{
@@ -34,12 +33,6 @@ pub enum Error {
         source: io::Error,
     },
 
-    #[snafu(display("Error creating decompressor for {} ({})", input_name.display(), source))]
-    UnableToCreateDecompressor {
-        input_name: PathBuf,
-        source: io::Error,
-    },
-
     #[snafu(display("Unknown input type: {} has an unknown input extension before .gz", input_name.display()))]
     UnknownInputTypeGzip { input_name: PathBuf },
 
@@ -234,8 +227,7 @@ impl InputReader {
             Some("gz") => {
                 let buffer = || {
                     let file = File::open(input_name).context(UnableToOpenInput { input_name })?;
-                    let mut decoder = gzip::Decoder::new(file)
-                        .context(UnableToCreateDecompressor { input_name })?;
+                    let mut decoder = flate2::read::GzDecoder::new(file);
                     let mut buffer = Vec::new();
                     decoder
                         .read_to_end(&mut buffer)
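
Both the error-variant removal above and this hunk come down to an API difference between the two crates: libflate's gzip::Decoder::new reads and validates the gzip header up front and therefore returns an io::Result, while flate2's flate2::read::GzDecoder::new cannot fail and reports a bad or truncated header on the first read instead, so the .context(UnableToCreateDecompressor { input_name }) call and its error variant have nothing left to wrap. A minimal sketch of the resulting read path (standalone, simplified to io::Result rather than this crate's snafu errors; read_gz is an illustrative name, not from this repository):

    use std::fs::File;
    use std::io::Read;
    use std::path::Path;

    /// Read a gzip-compressed file fully into memory with flate2.
    fn read_gz(path: &Path) -> std::io::Result<Vec<u8>> {
        let file = File::open(path)?;
        // Construction never fails; header problems surface from the read below.
        let mut decoder = flate2::read::GzDecoder::new(file);
        let mut buffer = Vec::new();
        decoder.read_to_end(&mut buffer)?;
        Ok(buffer)
    }
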
@@ -116,9 +116,6 @@ pub enum ApplicationError {
 
     #[snafu(display("No handler for {:?} {}", method, path))]
     RouteNotFound { method: Method, path: String },
-
-    #[snafu(display("Internal error creating gzip decoder: {:?}", source))]
-    CreatingGzipDecoder { source: std::io::Error },
 }
 
 impl ApplicationError {
@@ -140,7 +137,6 @@ impl ApplicationError {
             Self::ParsingLineProtocol { .. } => StatusCode::BAD_REQUEST,
             Self::ReadingBodyAsGzip { .. } => StatusCode::BAD_REQUEST,
             Self::RouteNotFound { .. } => StatusCode::NOT_FOUND,
-            Self::CreatingGzipDecoder { .. } => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
 }
@@ -190,12 +186,12 @@ async fn parse_body(req: hyper::Request<Body>) -> Result<Bytes, ApplicationError
 
     // apply any content encoding needed
     if ungzip {
-        use libflate::gzip::Decoder;
         use std::io::Read;
-        let mut decoder = Decoder::new(&body[..]).context(CreatingGzipDecoder)?;
-        // TODO cap the size of the decoded data (right
-        // now this could decompress some crazy large
-        // request)
+        let decoder = flate2::read::GzDecoder::new(&body[..]);
+
+        // Read at most MAX_SIZE bytes to prevent a decompression bomb based
+        // DoS.
+        let mut decoder = decoder.take(MAX_SIZE as u64);
         let mut decoded_data = Vec::new();
         decoder
             .read_to_end(&mut decoded_data)
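
The replacement above also resolves the old TODO: instead of decompressing an arbitrarily large body, the decoder is wrapped in Read::take(MAX_SIZE as u64), which yields EOF after MAX_SIZE bytes, so read_to_end can never allocate more than the cap no matter how well the payload compresses. Note that take simply stops at the limit rather than rejecting oversized input; a variation that errors out when the cap is exceeded could look like the sketch below (standalone and illustrative, not this handler's code; gunzip_capped and limit are made-up names):

    use std::io::{Error, ErrorKind, Read};

    /// Decompress `raw`, refusing to produce more than `limit` bytes of output.
    fn gunzip_capped(raw: &[u8], limit: u64) -> std::io::Result<Vec<u8>> {
        let decoder = flate2::read::GzDecoder::new(raw);
        // Allow one byte past the cap so an oversized payload is detectable.
        let mut capped = decoder.take(limit + 1);
        let mut out = Vec::new();
        capped.read_to_end(&mut out)?;
        if out.len() as u64 > limit {
            return Err(Error::new(ErrorKind::InvalidData, "decoded body exceeds limit"));
        }
        Ok(out)
    }
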
@@ -260,7 +256,7 @@ async fn write<T: DatabaseStore>(
 struct ReadInfo {
     org: String,
     bucket: String,
-    // TODL This is currently a "SQL" request -- should be updated to conform
+    // TODO This is currently a "SQL" request -- should be updated to conform
     // to the V2 API for reading (using timestamps, etc).
     sql_query: String,
 }