diff --git a/Cargo.lock b/Cargo.lock index 899d38c0f1..5f33043592 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2042,7 +2042,9 @@ dependencies = [ "data_types", "datafusion 0.1.0", "dotenvy", + "flate2", "futures", + "futures-util", "generated_types", "hashbrown", "http", @@ -2107,12 +2109,13 @@ dependencies = [ "client_util", "futures-util", "generated_types", - "mockito", + "influxdb_line_protocol", "prost 0.11.0", "rand", "reqwest", "thiserror", "tokio", + "tokio-stream", "tonic", ] diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index f2b400b07c..ec1392882d 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -47,6 +47,8 @@ clap = { version = "3", features = ["derive", "env"] } console-subscriber = { version = "0.1.8", optional = true, features = ["parking_lot"] } dotenvy = "0.15.5" futures = "0.3" +futures-util = { version = "0.3" } +flate2 = "1.0" hashbrown = "0.12" http = "0.2.8" humantime = "2.1.0" diff --git a/influxdb_iox/src/commands/write.rs b/influxdb_iox/src/commands/write.rs index e5aff6bd88..857a81b320 100644 --- a/influxdb_iox/src/commands/write.rs +++ b/influxdb_iox/src/commands/write.rs @@ -1,6 +1,14 @@ +use futures::StreamExt; use influxdb_iox_client::{connection::Connection, write}; -use snafu::{ResultExt, Snafu}; -use std::{fs::File, io::Read, path::PathBuf}; +use observability_deps::tracing::info; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use std::{ + fs::File, + io::{BufReader, Read}, + num::NonZeroUsize, + path::PathBuf, + time::Instant, +}; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -11,10 +19,30 @@ pub enum Error { source: std::io::Error, }, + #[snafu(display("Error reading files: {:#?}", sources))] + ReadingFiles { sources: Vec }, + #[snafu(display("Client error: {source}"))] ClientError { source: influxdb_iox_client::error::Error, }, + + #[snafu(display("Error converting parquet: {}", source))] + Conversion { + source: parquet_to_line_protocol::Error, + }, + + 
#[snafu(display("Line protocol was not valid utf8: {}", source))] + InvalidUtf8 { source: std::string::FromUtf8Error }, + + #[snafu(display("Error decoding gzip {:?}: {}", file_name, source))] + Gz { + file_name: PathBuf, + source: std::io::Error, + }, + + #[snafu(display("Max concurrent uploads must be greater than zero"))] + MaxConcurrentUploadsVerfication, } pub type Result = std::result::Result; @@ -22,36 +50,176 @@ pub type Result = std::result::Result; /// Write data into the specified database #[derive(Debug, clap::Parser)] pub struct Config { + /// If specified, restricts the maximum amount of line protocol + /// sent per request to this many bytes. Defaults to 1MB + #[clap(action, long, short = 'b', default_value = "1048576")] + max_request_payload_size_bytes: usize, + + /// Uploads up to this many http requests at a time. Defaults to 10 + #[clap(action, long, short = 'c', default_value = "10")] + max_concurrent_uploads: usize, + /// The namespace into which to write #[clap(action)] namespace: String, - /// File with data to load. Currently supported formats are .lp + /// File(s) with data to load. 
Currently supported formats are .lp (line protocol), + /// .parquet (IOx created parquet files), and .gz (gzipped line protocol) #[clap(action)] - file_name: PathBuf, + file_names: Vec, } pub async fn command(connection: Connection, config: Config) -> Result<()> { + let start = Instant::now(); + let Config { namespace, - file_name, + file_names, + max_request_payload_size_bytes, + max_concurrent_uploads, } = config; - let file_name = &file_name; - let mut file = File::open(file_name).context(ReadingFileSnafu { file_name })?; + let max_concurrent_uploads = + NonZeroUsize::new(max_concurrent_uploads).context(MaxConcurrentUploadsVerficationSnafu)?; - let mut lp_data = String::new(); - file.read_to_string(&mut lp_data) - .context(ReadingFileSnafu { file_name })?; + info!( + num_files = file_names.len(), + max_request_payload_size_bytes, max_concurrent_uploads, "Beginning upload" + ); - let mut client = write::Client::new(connection); + // first pass is to check that all the files exist and can be + // opened and if not fail fast. + let file_open_errors: Vec<_> = file_names + .iter() + .filter_map(|file_name| { + File::open(file_name) + .context(ReadingFileSnafu { file_name }) + .err() + }) + .collect(); + + ensure!( + file_open_errors.is_empty(), + ReadingFilesSnafu { + sources: file_open_errors + } + ); + + // if everything looked good, go through and read the files, + // potentially in parallel. 
+ let lp_stream = futures_util::stream::iter(file_names) + .map(|file_name| tokio::task::spawn(slurp_file(file_name))) + // Since the contents of each file are buffered into a string, + // limit the number that are open at once to the maximum + // possible uploads + .buffered(max_concurrent_uploads.into()) + // warn and skip any errors + .filter_map(|res| async move { + match res { + Ok(Ok(lp_data)) => Some(lp_data), + Ok(Err(e)) => { + eprintln!("WARNING: ignoring error : {}", e); + None + } + Err(e) => { + eprintln!("WARNING: ignoring task fail: {}", e); + None + } + } + }); + + let mut client = write::Client::new(connection) + .with_max_concurrent_uploads(max_concurrent_uploads) + .with_max_request_payload_size_bytes(Some(max_request_payload_size_bytes)); let total_bytes = client - .write_lp(namespace, lp_data) + .write_lp_stream(namespace, lp_stream) .await .context(ClientSnafu)?; - println!("{} Bytes OK", total_bytes); + let elapsed = Instant::now() - start; + let mb = (total_bytes as f64) / (1024.0 * 1024.0); + let mb_per_sec = (mb / (elapsed.as_millis() as f64)) * (1000.0); + println!("{total_bytes} Bytes OK in {elapsed:?}. 
{mb_per_sec:.2} MB/sec"); Ok(()) } + +/// Reads the contents of `file_name` into a string +/// +/// .parquet files --> iox parquet files (converted to line protocol) +/// .gz --> treated as gzipped line protocol +/// .lp (or anything else) --> treated as raw line protocol +/// +async fn slurp_file(file_name: PathBuf) -> Result { + let file_name = &file_name; + + let extension = file_name + .extension() + .map(|extension| extension.to_ascii_lowercase()); + + match extension { + // Transform parquet to line protocol prior to upload + // Not the most efficient process, but it is expedient + Some(extension) if extension.to_string_lossy() == "parquet" => { + let mut lp_data = vec![]; + parquet_to_line_protocol::convert_file(file_name, &mut lp_data) + .await + .context(ConversionSnafu)?; + + let lp_data = String::from_utf8(lp_data).context(InvalidUtf8Snafu)?; + info!( + ?file_name, + file_size_bytes = lp_data.len(), + "Buffered line protocol from parquet file" + ); + Ok(lp_data) + } + // decompress as gz + Some(extension) if extension.to_string_lossy() == "gz" => { + let mut lp_data = String::new(); + let reader = + BufReader::new(File::open(&file_name).context(ReadingFileSnafu { file_name })?); + + flate2::read::GzDecoder::new(reader) + .read_to_string(&mut lp_data) + .context(GzSnafu { file_name })?; + + info!( + ?file_name, + file_size_bytes = lp_data.len(), + "Buffered line protocol from gzipped line protocol file" + ); + Ok(lp_data) + } + // anything else, treat as line protocol + Some(_) | None => { + let lp_data = + std::fs::read_to_string(file_name).context(ReadingFileSnafu { file_name })?; + + info!( + ?file_name, + file_size_bytes = lp_data.len(), + "Buffered line protocol file" + ); + Ok(lp_data) + } + } +} + +#[cfg(test)] +mod test { + use clap::Parser; + use influxdb_iox_client::write::DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES; + + use super::*; + + #[test] + fn command_default_is_same_as_client_default() { + let config = Config::try_parse_from(vec!["my_db", 
"file1"]).unwrap(); + assert_eq!( + Some(config.max_request_payload_size_bytes), + DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES + ); + } +} diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 89f868cae8..941a7437ee 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -6,7 +6,6 @@ use predicates::prelude::*; use serde_json::Value; use std::time::{Duration, Instant}; use tempfile::tempdir; -use test_helpers::make_temp_file; use test_helpers_end_to_end::{ maybe_skip_integration, AddAddrEnv, BindAddresses, MiniCluster, ServerType, Step, StepTest, StepTestState, @@ -526,9 +525,6 @@ async fn write_and_query() { vec![ Step::Custom(Box::new(|state: &mut StepTestState| { async { - // write line protocol to a temp file - let lp_file = make_temp_file("m,tag=1 v=2 12345"); - let lp_file_path = lp_file.path().to_string_lossy().to_string(); let router_addr = state.cluster().router().router_http_base().to_string(); let namespace = state.cluster().namespace(); @@ -537,53 +533,48 @@ async fn write_and_query() { // Validate the output of the schema CLI command Command::cargo_bin("influxdb_iox") .unwrap() + .arg("-v") .arg("-h") .arg(&router_addr) .arg("write") .arg(&namespace) - .arg(&lp_file_path) + // raw line protocol ('h2o_temperature' measurement) + .arg("../test_fixtures/lineproto/air_and_water.lp") + // gzipped line protocol ('m0') + .arg("../test_fixtures/lineproto/read_filter.lp.gz") + // iox formatted parquet ('cpu' measurement) + .arg("../test_fixtures/cpu.parquet") .assert() .success() - .stdout(predicate::str::contains("17 Bytes OK")); + // this number is the total size of + // uncompressed line protocol stored in all + // three files + .stdout(predicate::str::contains("1137058 Bytes OK")); } .boxed() })), Step::Custom(Box::new(|state: &mut StepTestState| { async { - let querier_addr = state.cluster().querier().querier_grpc_base().to_string(); - let namespace = 
state.cluster().namespace(); + // data from 'air_and_water.lp' + wait_for_query_result( + state, + "SELECT * from h2o_temperature order by time desc limit 10", + "| 51.3 | coyote_creek | CA | 55.1 | 1970-01-01T00:00:01.568756160Z |" + ).await; - let max_wait_time = Duration::from_secs(10); - let expected = "| 1 | 1970-01-01T00:00:00.000012345Z | 2 |"; - println!("Waiting for {expected}"); + // data from 'read_filter.lp.gz' + wait_for_query_result( + state, + "SELECT * from m0 order by time desc limit 10;", + "| value1 | value9 | value9 | value49 | value0 | 2021-04-26T13:47:39.727574Z | 1 |" + ).await; - // Validate the output of running the query CLI command appears after at most max_wait_time - let end = Instant::now() + max_wait_time; - while Instant::now() < end { - let maybe_result = Command::cargo_bin("influxdb_iox") - .unwrap() - .arg("-h") - .arg(&querier_addr) - .arg("query") - .arg(&namespace) - .arg("SELECT * from m") - .assert() - .success() - .try_stdout(predicate::str::contains(expected)); - - match maybe_result { - Err(e) => { - println!("Got err: {}, retrying", e); - } - Ok(r) => { - println!("Success: {:?}", r); - return; - } - } - // sleep and try again - tokio::time::sleep(Duration::from_millis(500)).await - } - panic!("Did not find expected output in allotted time"); + // data from 'cpu.parquet' + wait_for_query_result( + state, + "SELECT * from cpu where cpu = 'cpu2' order by time desc limit 10", + "cpu2 | MacBook-Pro-8.hsd1.ma.comcast.net | 2022-09-30T12:55:00Z" + ).await; } .boxed() })), @@ -593,6 +584,53 @@ async fn write_and_query() { .await } +/// Runs the specified query in a loop for up to 10 seconds, waiting +/// for the specified output to appear +async fn wait_for_query_result(state: &mut StepTestState<'_>, query_sql: &str, expected: &str) { + let querier_addr = state.cluster().querier().querier_grpc_base().to_string(); + let namespace = state.cluster().namespace(); + + let max_wait_time = Duration::from_secs(10); + println!("Waiting 
for {expected}"); + + // Validate the output of running the query CLI command appears after at most max_wait_time + let end = Instant::now() + max_wait_time; + while Instant::now() < end { + let assert = Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&querier_addr) + .arg("query") + .arg(&namespace) + .arg(query_sql) + .assert(); + + let assert = match assert.try_success() { + Err(e) => { + println!("Got err running command: {}, retrying", e); + continue; + } + Ok(a) => a, + }; + + match assert.try_stdout(predicate::str::contains(expected)) { + Err(e) => { + println!("No match: {}, retrying", e); + } + Ok(r) => { + println!("Success: {:?}", r); + return; + } + } + // sleep and try again + tokio::time::sleep(Duration::from_secs(1)).await + } + panic!( + "Did not find expected output {} within {:?}", + expected, max_wait_time + ); +} + /// Test the schema cli command #[tokio::test] async fn namespaces_cli() { diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 9b674c4a33..3cb742bf38 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -13,6 +13,7 @@ format = ["arrow", "arrow_util"] # Workspace dependencies, in alphabetical order arrow_util = { path = "../arrow_util", optional = true } client_util = { path = "../client_util" } +influxdb_line_protocol = { path = "../influxdb_line_protocol"} generated_types = { path = "../generated_types", default-features = false, features = ["data_types_conversions"] } # Crates.io dependencies, in alphabetical order @@ -23,9 +24,7 @@ futures-util = { version = "0.3", optional = true } prost = "0.11" rand = "0.8.3" reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } +tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] } +tokio-stream = "0.1.10" thiserror = "1.0.37" tonic = { version = "0.8" } - -[dev-dependencies] # In alphabetical order -tokio = { version = "1.21", features = 
["macros", "parking_lot", "rt-multi-thread"] } -mockito = "0.31" \ No newline at end of file diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs index 1ee584d8a0..4771970f11 100644 --- a/influxdb_iox_client/src/client/write.rs +++ b/influxdb_iox_client/src/client/write.rs @@ -1,15 +1,16 @@ -/// Re-export generated_types -pub mod generated_types { - pub use generated_types::influxdata::pbdata::v1::*; -} +use std::{fmt::Debug, num::NonZeroUsize, sync::Arc}; use client_util::{connection::HttpConnection, namespace_translation::split_namespace}; +use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStreamExt}; use crate::{ connection::Connection, error::{translate_response, Error}, }; -use reqwest::Method; +use reqwest::{Body, Method}; + +/// The default value for the maximum size of each request, in bytes +pub const DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES: Option = Some(1024 * 1024); /// An IOx Write API client. /// @@ -37,18 +38,67 @@ use reqwest::Method; /// ``` #[derive(Debug, Clone)] pub struct Client { - inner: HttpConnection, + /// The inner client used to actually make requests. + /// + /// Uses a trait for test mocking. + /// + /// Does not expose the trait in the `Client` type to avoid + /// exposing an internal implementation detail (the trait) in the + /// public interface. + inner: Arc, + + /// If `Some`, restricts the maximum amount of line protocol + /// sent per request to this many bytes. If `None`, does not restrict + /// the amount sent per request. Defaults to `Some(1MB)` + /// + /// Splitting the upload size consumes a non trivial amount of CPU + /// to find line protocol boundaries. This can be disabled by + /// setting `max_request_payload_size_bytes` to `None`. + max_request_payload_size_bytes: Option, + + /// Makes this many concurrent requests at a time. 
Defaults to 1 + max_concurrent_uploads: NonZeroUsize, } impl Client { /// Creates a new client with the provided connection pub fn new(connection: Connection) -> Self { + Self::new_with_maker(Arc::new(connection.into_http_connection())) + } + + /// Creates a new client with the provided request maker + fn new_with_maker(inner: Arc) -> Self { Self { - inner: connection.into_http_connection(), + inner, + max_request_payload_size_bytes: DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES, + max_concurrent_uploads: NonZeroUsize::new(1).unwrap(), } } - /// Write the [LineProtocol] formatted data in `lp_data` to + /// Override the default of sending 1MB of line protocol per request. + /// If `Some` is specified, restricts the maximum amount of line protocol + /// sent per request to this many bytes. If `None`, does not restrict the amount of + /// line protocol sent per request. + pub fn with_max_request_payload_size_bytes( + self, + max_request_payload_size_bytes: Option, + ) -> Self { + Self { + max_request_payload_size_bytes, + ..self + } + } + + /// The client makes this many concurrent uploads at a + /// time. Defaults to 1. + pub fn with_max_concurrent_uploads(self, max_concurrent_uploads: NonZeroUsize) -> Self { + Self { + max_concurrent_uploads, + ..self + } + } + + /// Write the [LineProtocol] formatted string in `lp_data` to /// namespace `namespace`. /// /// Returns the number of bytes which were written to the database @@ -59,11 +109,24 @@ impl Client { namespace: impl AsRef + Send, lp_data: impl Into + Send, ) -> Result { - let lp_data = lp_data.into(); - let data_len = lp_data.len(); + let sources = futures_util::stream::iter([lp_data.into()]); - let write_url = format!("{}api/v2/write", self.inner.uri()); + self.write_lp_stream(namespace, sources).await + } + /// Write the stream of [LineProtocol] formatted strings in + /// `sources` to namespace `namespace`. 
It is assumed that + /// individual lines (points) do not cross these strings + /// + /// Returns the number of bytes, in total, which were written to + /// the database + /// + /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format + pub async fn write_lp_stream( + &mut self, + namespace: impl AsRef + Send, + sources: impl Stream + Send, + ) -> Result { let (org_id, bucket_id) = split_namespace(namespace.as_ref()).map_err(|e| { Error::invalid_argument( "namespace", @@ -71,47 +134,302 @@ impl Client { ) })?; - let response = self - .inner - .client() - .request(Method::POST, &write_url) - .query(&[("bucket", bucket_id), ("org", org_id)]) - .body(lp_data) - .send() + let max_concurrent_uploads: usize = self.max_concurrent_uploads.into(); + let max_request_payload_size_bytes = self.max_request_payload_size_bytes; + + // make a stream and process in parallel + let results = sources + // split each input source in parallel, if possible + .flat_map(|source| { + split_lp( + source, + max_request_payload_size_bytes, + max_concurrent_uploads, + ) + }) + // do the actual write + .map(|source| { + let org_id = org_id.to_string(); + let bucket_id = bucket_id.to_string(); + let inner = Arc::clone(&self.inner); + + tokio::task::spawn( + async move { inner.write_source(org_id, bucket_id, source).await }, + ) + }) + // Do the uploads in parallel + .buffered(max_concurrent_uploads) + .try_collect::>() + // handle panics in tasks .await - .map_err(Error::client)?; + .map_err(Error::client)? + // find / return any errors + .into_iter() + .collect::, Error>>()?; - translate_response(response).await?; + Ok(results.into_iter().sum()) + } +} - Ok(data_len) +/// Something that knows how to send http data. 
Exists so it can be +/// mocked out for testing +trait RequestMaker: Debug + Send + Sync { + /// Write the body data to the specified org and bucket, + /// returning the number of bytes written + /// + /// (this is implemented manually to avoid `async_trait`) + fn write_source( + &self, + org_id: String, + bucket_id: String, + body: String, + ) -> BoxFuture<'_, Result>; +} + +impl RequestMaker for HttpConnection { + fn write_source( + &self, + org_id: String, + bucket_id: String, + body: String, + ) -> BoxFuture<'_, Result> { + let write_url = format!("{}api/v2/write", self.uri()); + + async move { + let body: Body = body.into(); + + let data_len = body.as_bytes().map(|b| b.len()).unwrap_or(0); + + let response = self + .client() + .request(Method::POST, &write_url) + .query(&[("bucket", bucket_id), ("org", org_id)]) + .body(body) + .send() + .await + .map_err(Error::client)?; + + translate_response(response).await?; + + Ok(data_len) + } + .boxed() + } +} + +/// splits input line protocol into one or more chunks of at most +/// `max_chunk_size` bytes on line breaks in a separate tokio task +fn split_lp( + input: String, + max_chunk_size: Option, + max_concurrent_uploads: usize, +) -> impl Stream { + let (tx, rx) = tokio::sync::mpsc::channel(max_concurrent_uploads); + + tokio::task::spawn(async move { + match max_chunk_size { + None => { + // ignore errors (means the receiver hung up but nothing to communicate) + tx.send(input).await.ok(); + } + Some(max_chunk_size) => { + // use the actual line protocol parser to split on valid boundaries + let mut acc = LineAccumulator::new(max_chunk_size); + for l in influxdb_line_protocol::split_lines(&input) { + if let Some(chunk) = acc.push(l) { + // abort if receiver has hung up + if tx.send(chunk).await.is_err() { + return; + } + } + } + if let Some(chunk) = acc.flush() { + tx.send(chunk).await.ok(); + } + } + } + }); + + tokio_stream::wrappers::ReceiverStream::new(rx) +} +#[derive(Debug)] +struct LineAccumulator { + current_chunk: 
String, + max_chunk_size: usize, +} + +impl LineAccumulator { + fn new(max_chunk_size: usize) -> Self { + Self { + current_chunk: String::with_capacity(max_chunk_size), + max_chunk_size, + } + } + + // Add data `l` to the current chunk being created, returning the + // current chunk if complete. + fn push(&mut self, l: &str) -> Option { + let chunk = if self.current_chunk.len() + l.len() + 1 > self.max_chunk_size { + self.flush() + } else { + None + }; + + if !self.current_chunk.is_empty() { + self.current_chunk += "\n"; + } + + self.current_chunk += l; + chunk + } + + /// allocate a new chunk with the right size, returning the currently built chunk if it has non zero length + /// `self.current_chunk.len()` is zero + fn flush(&mut self) -> Option { + if !self.current_chunk.is_empty() { + let mut new_chunk = String::with_capacity(self.max_chunk_size); + std::mem::swap(&mut new_chunk, &mut self.current_chunk); + Some(new_chunk) + } else { + None + } } } #[cfg(test)] mod tests { + use std::sync::Mutex; + use super::*; - use crate::connection::Builder; #[tokio::test] - /// Ensure the basic plumbing is hooked up correctly - async fn basic() { - let url = mockito::server_url(); - - let connection = Builder::new().build(&url).await.unwrap(); + async fn test() { + let mock = Arc::new(MockRequestMaker::new()); let namespace = "orgname_bucketname"; let data = "m,t=foo f=4"; - let m = mockito::mock("POST", "/api/v2/write?bucket=bucketname&org=orgname") - .with_status(201) - .match_body(data) - .create(); + let expected = vec![MockRequest { + org_id: "orgname".into(), + bucket_id: "bucketname".into(), + body: data.into(), + }]; - let res = Client::new(connection).write_lp(namespace, data).await; - - m.assert(); - - let num_bytes = res.expect("Error making write request"); + let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _) + .write_lp(namespace, data) + .await + .unwrap(); + assert_eq!(expected, mock.requests()); assert_eq!(num_bytes, 11); } + + #[tokio::test] + 
async fn test_max_request_payload_size() { + let mock = Arc::new(MockRequestMaker::new()); + + let namespace = "orgname_bucketname"; + let data = "m,t=foo f=4\n\ + m,t=bar f=3\n\ + m,t=fooddddddd f=4"; + + // expect the data to be broken up into two chunks: + let expected = vec![ + MockRequest { + org_id: "orgname".into(), + bucket_id: "bucketname".into(), + body: "m,t=foo f=4\nm,t=bar f=3".into(), + }, + MockRequest { + org_id: "orgname".into(), + bucket_id: "bucketname".into(), + body: "m,t=fooddddddd f=4".into(), + }, + ]; + + let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _) + // enough to get first two lines, but not last + .with_max_request_payload_size_bytes(Some(30)) + .write_lp(namespace, data) + .await + .unwrap(); + assert_eq!(expected, mock.requests()); + assert_eq!(num_bytes, 41); + } + + #[tokio::test] + async fn test_write_lp_stream() { + let mock = Arc::new(MockRequestMaker::new()); + + let namespace = "orgname_bucketname"; + let data = futures_util::stream::iter( + vec!["m,t=foo f=4", "m,t=bar f=3"] + .into_iter() + .map(|s| s.to_string()), + ); + + // expect the data to come in two chunks + let expected = vec![ + MockRequest { + org_id: "orgname".into(), + bucket_id: "bucketname".into(), + body: "m,t=foo f=4".into(), + }, + MockRequest { + org_id: "orgname".into(), + bucket_id: "bucketname".into(), + body: "m,t=bar f=3".into(), + }, + ]; + + let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _) + .write_lp_stream(namespace, data) + .await + .unwrap(); + assert_eq!(expected, mock.requests()); + assert_eq!(num_bytes, 22); + } + + #[derive(Debug, Clone, PartialEq)] + struct MockRequest { + org_id: String, + bucket_id: String, + body: String, + } + + #[derive(Debug)] + struct MockRequestMaker { + requests: Mutex>, + } + + impl MockRequestMaker { + fn new() -> Self { + Self { + requests: Mutex::new(vec![]), + } + } + + /// get a copy of the requests that were made using this mock + fn requests(&self) -> Vec { + 
self.requests.lock().unwrap().clone() + } + } + + impl RequestMaker for MockRequestMaker { + fn write_source( + &self, + org_id: String, + bucket_id: String, + body: String, + ) -> BoxFuture<'_, Result> { + let sz = body.len(); + + self.requests.lock().unwrap().push(MockRequest { + org_id, + bucket_id, + body, + }); + + async move { Ok(sz) }.boxed() + } + } } diff --git a/influxdb_line_protocol/src/lib.rs b/influxdb_line_protocol/src/lib.rs index 07d9ca14ea..91c1c2077d 100644 --- a/influxdb_line_protocol/src/lib.rs +++ b/influxdb_line_protocol/src/lib.rs @@ -529,7 +529,7 @@ pub fn parse_lines(input: &str) -> impl Iterator>> /// logic duplication for scanning fields, duplicating it also means /// we can be more sure of the compatibility of the rust parser and /// the canonical Go parser. -fn split_lines(input: &str) -> impl Iterator { +pub fn split_lines(input: &str) -> impl Iterator { // NB: This is ported as closely as possibly from the original Go code: let mut quoted = false; let mut fields = false; diff --git a/test_fixtures/cpu.parquet b/test_fixtures/cpu.parquet new file mode 100644 index 0000000000..86cae861b6 Binary files /dev/null and b/test_fixtures/cpu.parquet differ diff --git a/test_helpers_end_to_end/src/client.rs b/test_helpers_end_to_end/src/client.rs index 0f4567a973..5017b0bbba 100644 --- a/test_helpers_end_to_end/src/client.rs +++ b/test_helpers_end_to_end/src/client.rs @@ -1,12 +1,12 @@ //! Client helpers for writing end to end ng tests use arrow::record_batch::RecordBatch; use futures::{stream::FuturesUnordered, StreamExt}; +use generated_types::influxdata::pbdata::v1::WriteResponse; use http::Response; use hyper::{Body, Client, Request}; use influxdb_iox_client::{ connection::Connection, flight::generated_types::ReadInfo, - write::generated_types::WriteResponse, write_info::generated_types::{merge_responses, GetWriteInfoResponse, ShardStatus}, }; use observability_deps::tracing::info;