feat: support parallel, chunked upload via `influxdb_iox write` of line protocol, gzip'd line protocol, and parquet (#5757)
* feat: Upload in small chunks and in parallel * fix: doclink * fix: Apply suggestions from code review Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> * fix: Update influxdb_iox_client/src/client/write.rs * fix: fixup error handling and fmt * fix: Make default chunk sizes the same and add docs * fix: clippy Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>pull/24376/head
parent
fc0634792b
commit
82d5c7f336
|
|
@ -2042,7 +2042,9 @@ dependencies = [
|
|||
"data_types",
|
||||
"datafusion 0.1.0",
|
||||
"dotenvy",
|
||||
"flate2",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"generated_types",
|
||||
"hashbrown",
|
||||
"http",
|
||||
|
|
@ -2107,12 +2109,13 @@ dependencies = [
|
|||
"client_util",
|
||||
"futures-util",
|
||||
"generated_types",
|
||||
"mockito",
|
||||
"influxdb_line_protocol",
|
||||
"prost 0.11.0",
|
||||
"rand",
|
||||
"reqwest",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -47,6 +47,8 @@ clap = { version = "3", features = ["derive", "env"] }
|
|||
console-subscriber = { version = "0.1.8", optional = true, features = ["parking_lot"] }
|
||||
dotenvy = "0.15.5"
|
||||
futures = "0.3"
|
||||
futures-util = { version = "0.3" }
|
||||
flate2 = "1.0"
|
||||
hashbrown = "0.12"
|
||||
http = "0.2.8"
|
||||
humantime = "2.1.0"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,14 @@
|
|||
use futures::StreamExt;
|
||||
use influxdb_iox_client::{connection::Connection, write};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{fs::File, io::Read, path::PathBuf};
|
||||
use observability_deps::tracing::info;
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{BufReader, Read},
|
||||
num::NonZeroUsize,
|
||||
path::PathBuf,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Snafu)]
|
||||
|
|
@ -11,10 +19,30 @@ pub enum Error {
|
|||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error reading files: {:#?}", sources))]
|
||||
ReadingFiles { sources: Vec<Error> },
|
||||
|
||||
#[snafu(display("Client error: {source}"))]
|
||||
ClientError {
|
||||
source: influxdb_iox_client::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error converting parquet: {}", source))]
|
||||
Conversion {
|
||||
source: parquet_to_line_protocol::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Line protocol was not valid utf8: {}", source))]
|
||||
InvalidUtf8 { source: std::string::FromUtf8Error },
|
||||
|
||||
#[snafu(display("Error decoding gzip {:?}: {}", file_name, source))]
|
||||
Gz {
|
||||
file_name: PathBuf,
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Max concurrent uploads must be greater than zero"))]
|
||||
MaxConcurrentUploadsVerfication,
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
|
@ -22,36 +50,176 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// Write data into the specified database
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub struct Config {
|
||||
/// If specified, restricts the maxium amount of line protocol
|
||||
/// sent per request to this many bytes. Defaults to 1MB
|
||||
#[clap(action, long, short = 'b', default_value = "1048576")]
|
||||
max_request_payload_size_bytes: usize,
|
||||
|
||||
/// Uploads up to this many http requests at a time. Defaults to 10
|
||||
#[clap(action, long, short = 'c', default_value = "10")]
|
||||
max_concurrent_uploads: usize,
|
||||
|
||||
/// The namespace into which to write
|
||||
#[clap(action)]
|
||||
namespace: String,
|
||||
|
||||
/// File with data to load. Currently supported formats are .lp
|
||||
/// File(s) with data to load. Currently supported formats are .lp (line protocol),
|
||||
/// .parquet (IOx created parquet files), and .gz (gzipped line protocol)
|
||||
#[clap(action)]
|
||||
file_name: PathBuf,
|
||||
file_names: Vec<PathBuf>,
|
||||
}
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let Config {
|
||||
namespace,
|
||||
file_name,
|
||||
file_names,
|
||||
max_request_payload_size_bytes,
|
||||
max_concurrent_uploads,
|
||||
} = config;
|
||||
let file_name = &file_name;
|
||||
|
||||
let mut file = File::open(file_name).context(ReadingFileSnafu { file_name })?;
|
||||
let max_concurrent_uploads =
|
||||
NonZeroUsize::new(max_concurrent_uploads).context(MaxConcurrentUploadsVerficationSnafu)?;
|
||||
|
||||
let mut lp_data = String::new();
|
||||
file.read_to_string(&mut lp_data)
|
||||
.context(ReadingFileSnafu { file_name })?;
|
||||
info!(
|
||||
num_files = file_names.len(),
|
||||
max_request_payload_size_bytes, max_concurrent_uploads, "Beginning upload"
|
||||
);
|
||||
|
||||
let mut client = write::Client::new(connection);
|
||||
// first pass is to check that all the files exist and can be
|
||||
// opened and if not fail fast.
|
||||
let file_open_errors: Vec<_> = file_names
|
||||
.iter()
|
||||
.filter_map(|file_name| {
|
||||
File::open(file_name)
|
||||
.context(ReadingFileSnafu { file_name })
|
||||
.err()
|
||||
})
|
||||
.collect();
|
||||
|
||||
ensure!(
|
||||
file_open_errors.is_empty(),
|
||||
ReadingFilesSnafu {
|
||||
sources: file_open_errors
|
||||
}
|
||||
);
|
||||
|
||||
// if everything looked good, go through and read the files out
|
||||
// them potentially in parallel.
|
||||
let lp_stream = futures_util::stream::iter(file_names)
|
||||
.map(|file_name| tokio::task::spawn(slurp_file(file_name)))
|
||||
// Since the contents of each file are buffered into a string,
|
||||
// limit the number that are open at once to the maximum
|
||||
// possible uploads
|
||||
.buffered(max_concurrent_uploads.into())
|
||||
// warn and skip any errors
|
||||
.filter_map(|res| async move {
|
||||
match res {
|
||||
Ok(Ok(lp_data)) => Some(lp_data),
|
||||
Ok(Err(e)) => {
|
||||
eprintln!("WARNING: ignoring error : {}", e);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("WARNING: ignoring task fail: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut client = write::Client::new(connection)
|
||||
.with_max_concurrent_uploads(max_concurrent_uploads)
|
||||
.with_max_request_payload_size_bytes(Some(max_request_payload_size_bytes));
|
||||
|
||||
let total_bytes = client
|
||||
.write_lp(namespace, lp_data)
|
||||
.write_lp_stream(namespace, lp_stream)
|
||||
.await
|
||||
.context(ClientSnafu)?;
|
||||
|
||||
println!("{} Bytes OK", total_bytes);
|
||||
let elapsed = Instant::now() - start;
|
||||
let mb = (total_bytes as f64) / (1024.0 * 1024.0);
|
||||
let mb_per_sec = (mb / (elapsed.as_millis() as f64)) * (1000.0);
|
||||
println!("{total_bytes} Bytes OK in {elapsed:?}. {mb_per_sec:.2} MB/sec");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads the contents of `file_name into a string
|
||||
///
|
||||
/// .parquet files --> iox parquet files (convert to parquet)
|
||||
/// .gz --> treated as gzipped line protocol
|
||||
/// .lp (or anything else) --> treated as raw line protocol
|
||||
///
|
||||
async fn slurp_file(file_name: PathBuf) -> Result<String> {
|
||||
let file_name = &file_name;
|
||||
|
||||
let extension = file_name
|
||||
.extension()
|
||||
.map(|extension| extension.to_ascii_lowercase());
|
||||
|
||||
match extension {
|
||||
// Transform parquet to line protocol prior to upload
|
||||
// Not the most efficient process, but it is expedient
|
||||
Some(extension) if extension.to_string_lossy() == "parquet" => {
|
||||
let mut lp_data = vec![];
|
||||
parquet_to_line_protocol::convert_file(file_name, &mut lp_data)
|
||||
.await
|
||||
.context(ConversionSnafu)?;
|
||||
|
||||
let lp_data = String::from_utf8(lp_data).context(InvalidUtf8Snafu)?;
|
||||
info!(
|
||||
?file_name,
|
||||
file_size_bytes = lp_data.len(),
|
||||
"Buffered line protocol from parquet file"
|
||||
);
|
||||
Ok(lp_data)
|
||||
}
|
||||
// decompress as gz
|
||||
Some(extension) if extension.to_string_lossy() == "gz" => {
|
||||
let mut lp_data = String::new();
|
||||
let reader =
|
||||
BufReader::new(File::open(&file_name).context(ReadingFileSnafu { file_name })?);
|
||||
|
||||
flate2::read::GzDecoder::new(reader)
|
||||
.read_to_string(&mut lp_data)
|
||||
.context(GzSnafu { file_name })?;
|
||||
|
||||
info!(
|
||||
?file_name,
|
||||
file_size_bytes = lp_data.len(),
|
||||
"Buffered line protocol from gzipped line protocol file"
|
||||
);
|
||||
Ok(lp_data)
|
||||
}
|
||||
// anything else, treat as line protocol
|
||||
Some(_) | None => {
|
||||
let lp_data =
|
||||
std::fs::read_to_string(file_name).context(ReadingFileSnafu { file_name })?;
|
||||
|
||||
info!(
|
||||
?file_name,
|
||||
file_size_bytes = lp_data.len(),
|
||||
"Buffered line protocol file"
|
||||
);
|
||||
Ok(lp_data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use clap::Parser;
|
||||
use influxdb_iox_client::write::DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn command_default_is_same_as_client_default() {
|
||||
let config = Config::try_parse_from(vec!["my_db", "file1"]).unwrap();
|
||||
assert_eq!(
|
||||
Some(config.max_request_payload_size_bytes),
|
||||
DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ use predicates::prelude::*;
|
|||
use serde_json::Value;
|
||||
use std::time::{Duration, Instant};
|
||||
use tempfile::tempdir;
|
||||
use test_helpers::make_temp_file;
|
||||
use test_helpers_end_to_end::{
|
||||
maybe_skip_integration, AddAddrEnv, BindAddresses, MiniCluster, ServerType, Step, StepTest,
|
||||
StepTestState,
|
||||
|
|
@ -526,9 +525,6 @@ async fn write_and_query() {
|
|||
vec![
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
// write line protocol to a temp file
|
||||
let lp_file = make_temp_file("m,tag=1 v=2 12345");
|
||||
let lp_file_path = lp_file.path().to_string_lossy().to_string();
|
||||
let router_addr = state.cluster().router().router_http_base().to_string();
|
||||
|
||||
let namespace = state.cluster().namespace();
|
||||
|
|
@ -537,53 +533,48 @@ async fn write_and_query() {
|
|||
// Validate the output of the schema CLI command
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-v")
|
||||
.arg("-h")
|
||||
.arg(&router_addr)
|
||||
.arg("write")
|
||||
.arg(&namespace)
|
||||
.arg(&lp_file_path)
|
||||
// raw line protocol ('h2o_temperature' measurement)
|
||||
.arg("../test_fixtures/lineproto/air_and_water.lp")
|
||||
// gzipped line protocol ('m0')
|
||||
.arg("../test_fixtures/lineproto/read_filter.lp.gz")
|
||||
// iox formatted parquet ('cpu' measurement)
|
||||
.arg("../test_fixtures/cpu.parquet")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("17 Bytes OK"));
|
||||
// this number is the total size of
|
||||
// uncompressed line protocol stored in all
|
||||
// three files
|
||||
.stdout(predicate::str::contains("1137058 Bytes OK"));
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
let querier_addr = state.cluster().querier().querier_grpc_base().to_string();
|
||||
let namespace = state.cluster().namespace();
|
||||
// data from 'air_and_water.lp'
|
||||
wait_for_query_result(
|
||||
state,
|
||||
"SELECT * from h2o_temperature order by time desc limit 10",
|
||||
"| 51.3 | coyote_creek | CA | 55.1 | 1970-01-01T00:00:01.568756160Z |"
|
||||
).await;
|
||||
|
||||
let max_wait_time = Duration::from_secs(10);
|
||||
let expected = "| 1 | 1970-01-01T00:00:00.000012345Z | 2 |";
|
||||
println!("Waiting for {expected}");
|
||||
// data from 'read_filter.lp.gz'
|
||||
wait_for_query_result(
|
||||
state,
|
||||
"SELECT * from m0 order by time desc limit 10;",
|
||||
"| value1 | value9 | value9 | value49 | value0 | 2021-04-26T13:47:39.727574Z | 1 |"
|
||||
).await;
|
||||
|
||||
// Validate the output of running the query CLI command appears after at most max_wait_time
|
||||
let end = Instant::now() + max_wait_time;
|
||||
while Instant::now() < end {
|
||||
let maybe_result = Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&querier_addr)
|
||||
.arg("query")
|
||||
.arg(&namespace)
|
||||
.arg("SELECT * from m")
|
||||
.assert()
|
||||
.success()
|
||||
.try_stdout(predicate::str::contains(expected));
|
||||
|
||||
match maybe_result {
|
||||
Err(e) => {
|
||||
println!("Got err: {}, retrying", e);
|
||||
}
|
||||
Ok(r) => {
|
||||
println!("Success: {:?}", r);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// sleep and try again
|
||||
tokio::time::sleep(Duration::from_millis(500)).await
|
||||
}
|
||||
panic!("Did not find expected output in allotted time");
|
||||
// data from 'cpu.parquet'
|
||||
wait_for_query_result(
|
||||
state,
|
||||
"SELECT * from cpu where cpu = 'cpu2' order by time desc limit 10",
|
||||
"cpu2 | MacBook-Pro-8.hsd1.ma.comcast.net | 2022-09-30T12:55:00Z"
|
||||
).await;
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
|
|
@ -593,6 +584,53 @@ async fn write_and_query() {
|
|||
.await
|
||||
}
|
||||
|
||||
/// Runs the specified query in a loop for up to 10 seconds, waiting
|
||||
/// for the specified output to appear
|
||||
async fn wait_for_query_result(state: &mut StepTestState<'_>, query_sql: &str, expected: &str) {
|
||||
let querier_addr = state.cluster().querier().querier_grpc_base().to_string();
|
||||
let namespace = state.cluster().namespace();
|
||||
|
||||
let max_wait_time = Duration::from_secs(10);
|
||||
println!("Waiting for {expected}");
|
||||
|
||||
// Validate the output of running the query CLI command appears after at most max_wait_time
|
||||
let end = Instant::now() + max_wait_time;
|
||||
while Instant::now() < end {
|
||||
let assert = Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&querier_addr)
|
||||
.arg("query")
|
||||
.arg(&namespace)
|
||||
.arg(query_sql)
|
||||
.assert();
|
||||
|
||||
let assert = match assert.try_success() {
|
||||
Err(e) => {
|
||||
println!("Got err running command: {}, retrying", e);
|
||||
continue;
|
||||
}
|
||||
Ok(a) => a,
|
||||
};
|
||||
|
||||
match assert.try_stdout(predicate::str::contains(expected)) {
|
||||
Err(e) => {
|
||||
println!("No match: {}, retrying", e);
|
||||
}
|
||||
Ok(r) => {
|
||||
println!("Success: {:?}", r);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// sleep and try again
|
||||
tokio::time::sleep(Duration::from_secs(1)).await
|
||||
}
|
||||
panic!(
|
||||
"Did not find expected output {} within {:?}",
|
||||
expected, max_wait_time
|
||||
);
|
||||
}
|
||||
|
||||
/// Test the schema cli command
|
||||
#[tokio::test]
|
||||
async fn namespaces_cli() {
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ format = ["arrow", "arrow_util"]
|
|||
# Workspace dependencies, in alphabetical order
|
||||
arrow_util = { path = "../arrow_util", optional = true }
|
||||
client_util = { path = "../client_util" }
|
||||
influxdb_line_protocol = { path = "../influxdb_line_protocol"}
|
||||
generated_types = { path = "../generated_types", default-features = false, features = ["data_types_conversions"] }
|
||||
|
||||
# Crates.io dependencies, in alphabetical order
|
||||
|
|
@ -23,9 +24,7 @@ futures-util = { version = "0.3", optional = true }
|
|||
prost = "0.11"
|
||||
rand = "0.8.3"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
|
||||
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] }
|
||||
tokio-stream = "0.1.10"
|
||||
thiserror = "1.0.37"
|
||||
tonic = { version = "0.8" }
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] }
|
||||
mockito = "0.31"
|
||||
|
|
@ -1,15 +1,16 @@
|
|||
/// Re-export generated_types
|
||||
pub mod generated_types {
|
||||
pub use generated_types::influxdata::pbdata::v1::*;
|
||||
}
|
||||
use std::{fmt::Debug, num::NonZeroUsize, sync::Arc};
|
||||
|
||||
use client_util::{connection::HttpConnection, namespace_translation::split_namespace};
|
||||
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStreamExt};
|
||||
|
||||
use crate::{
|
||||
connection::Connection,
|
||||
error::{translate_response, Error},
|
||||
};
|
||||
use reqwest::Method;
|
||||
use reqwest::{Body, Method};
|
||||
|
||||
/// The default value for the maximum size of each request, in bytes
|
||||
pub const DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES: Option<usize> = Some(1024 * 1024);
|
||||
|
||||
/// An IOx Write API client.
|
||||
///
|
||||
|
|
@ -37,18 +38,67 @@ use reqwest::Method;
|
|||
/// ```
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
inner: HttpConnection,
|
||||
/// The inner client used to actually make requests.
|
||||
///
|
||||
/// Uses a trait for test mocking.
|
||||
///
|
||||
/// Does not expose the trait in the `Client` type to avoid
|
||||
/// exposing an internal implementation detail (the trait) in the
|
||||
/// public interface.
|
||||
inner: Arc<dyn RequestMaker>,
|
||||
|
||||
/// If `Some`, restricts the maximum amount of line protocol
|
||||
/// sent per request to this many bytes. If `None`, does not restrict
|
||||
/// the amount sent per request. Defaults to `Some(1MB)`
|
||||
///
|
||||
/// Splitting the upload size consumes a non trivial amount of CPU
|
||||
/// to find line protocol boundaries. This can be disabled by
|
||||
/// setting `max_request_payload_size_bytes` to `None`.
|
||||
max_request_payload_size_bytes: Option<usize>,
|
||||
|
||||
/// Makes this many concurrent requests at a time. Defaults to 1
|
||||
max_concurrent_uploads: NonZeroUsize,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Creates a new client with the provided connection
|
||||
pub fn new(connection: Connection) -> Self {
|
||||
Self::new_with_maker(Arc::new(connection.into_http_connection()))
|
||||
}
|
||||
|
||||
/// Creates a new client with the provided request maker
|
||||
fn new_with_maker(inner: Arc<dyn RequestMaker>) -> Self {
|
||||
Self {
|
||||
inner: connection.into_http_connection(),
|
||||
inner,
|
||||
max_request_payload_size_bytes: DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES,
|
||||
max_concurrent_uploads: NonZeroUsize::new(1).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Write the [LineProtocol] formatted data in `lp_data` to
|
||||
/// Override the default of sending 1MB of line protocol per request.
|
||||
/// If `Some` is specified, restricts the maximum amount of line protocol
|
||||
/// sent per request to this many bytes. If `None`, does not restrict the amount of
|
||||
/// line protocol sent per request.
|
||||
pub fn with_max_request_payload_size_bytes(
|
||||
self,
|
||||
max_request_payload_size_bytes: Option<usize>,
|
||||
) -> Self {
|
||||
Self {
|
||||
max_request_payload_size_bytes,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// The client makes this many concurrent uploads at a
|
||||
/// time. Defaults to 1.
|
||||
pub fn with_max_concurrent_uploads(self, max_concurrent_uploads: NonZeroUsize) -> Self {
|
||||
Self {
|
||||
max_concurrent_uploads,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Write the [LineProtocol] formatted string in `lp_data` to
|
||||
/// namespace `namespace`.
|
||||
///
|
||||
/// Returns the number of bytes which were written to the database
|
||||
|
|
@ -59,11 +109,24 @@ impl Client {
|
|||
namespace: impl AsRef<str> + Send,
|
||||
lp_data: impl Into<String> + Send,
|
||||
) -> Result<usize, Error> {
|
||||
let lp_data = lp_data.into();
|
||||
let data_len = lp_data.len();
|
||||
let sources = futures_util::stream::iter([lp_data.into()]);
|
||||
|
||||
let write_url = format!("{}api/v2/write", self.inner.uri());
|
||||
self.write_lp_stream(namespace, sources).await
|
||||
}
|
||||
|
||||
/// Write the stream of [LineProtocol] formatted strings in
|
||||
/// `sources` to namespace `namespace`. It is assumed that
|
||||
/// individual lines (points) do not cross these strings
|
||||
///
|
||||
/// Returns the number of bytes, in total, which were written to
|
||||
/// the database
|
||||
///
|
||||
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
|
||||
pub async fn write_lp_stream(
|
||||
&mut self,
|
||||
namespace: impl AsRef<str> + Send,
|
||||
sources: impl Stream<Item = String> + Send,
|
||||
) -> Result<usize, Error> {
|
||||
let (org_id, bucket_id) = split_namespace(namespace.as_ref()).map_err(|e| {
|
||||
Error::invalid_argument(
|
||||
"namespace",
|
||||
|
|
@ -71,47 +134,302 @@ impl Client {
|
|||
)
|
||||
})?;
|
||||
|
||||
let response = self
|
||||
.inner
|
||||
.client()
|
||||
.request(Method::POST, &write_url)
|
||||
.query(&[("bucket", bucket_id), ("org", org_id)])
|
||||
.body(lp_data)
|
||||
.send()
|
||||
let max_concurrent_uploads: usize = self.max_concurrent_uploads.into();
|
||||
let max_request_payload_size_bytes = self.max_request_payload_size_bytes;
|
||||
|
||||
// make a stream and process in parallel
|
||||
let results = sources
|
||||
// split each input source in parallel, if possible
|
||||
.flat_map(|source| {
|
||||
split_lp(
|
||||
source,
|
||||
max_request_payload_size_bytes,
|
||||
max_concurrent_uploads,
|
||||
)
|
||||
})
|
||||
// do the actual write
|
||||
.map(|source| {
|
||||
let org_id = org_id.to_string();
|
||||
let bucket_id = bucket_id.to_string();
|
||||
let inner = Arc::clone(&self.inner);
|
||||
|
||||
tokio::task::spawn(
|
||||
async move { inner.write_source(org_id, bucket_id, source).await },
|
||||
)
|
||||
})
|
||||
// Do the uploads in parallel
|
||||
.buffered(max_concurrent_uploads)
|
||||
.try_collect::<Vec<_>>()
|
||||
// handle panics in tasks
|
||||
.await
|
||||
.map_err(Error::client)?;
|
||||
.map_err(Error::client)?
|
||||
// find / return any errors
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, Error>>()?;
|
||||
|
||||
translate_response(response).await?;
|
||||
Ok(results.into_iter().sum())
|
||||
}
|
||||
}
|
||||
|
||||
Ok(data_len)
|
||||
/// Something that knows how to send http data. Exists so it can be
|
||||
/// mocked out for testing
|
||||
trait RequestMaker: Debug + Send + Sync {
|
||||
/// Write the body data to the specified org, bucket, and
|
||||
/// returning the number of bytes written
|
||||
///
|
||||
/// (this is implemented manually to avoid `async_trait`)
|
||||
fn write_source(
|
||||
&self,
|
||||
org_id: String,
|
||||
bucket_id: String,
|
||||
body: String,
|
||||
) -> BoxFuture<'_, Result<usize, Error>>;
|
||||
}
|
||||
|
||||
impl RequestMaker for HttpConnection {
|
||||
fn write_source(
|
||||
&self,
|
||||
org_id: String,
|
||||
bucket_id: String,
|
||||
body: String,
|
||||
) -> BoxFuture<'_, Result<usize, Error>> {
|
||||
let write_url = format!("{}api/v2/write", self.uri());
|
||||
|
||||
async move {
|
||||
let body: Body = body.into();
|
||||
|
||||
let data_len = body.as_bytes().map(|b| b.len()).unwrap_or(0);
|
||||
|
||||
let response = self
|
||||
.client()
|
||||
.request(Method::POST, &write_url)
|
||||
.query(&[("bucket", bucket_id), ("org", org_id)])
|
||||
.body(body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(Error::client)?;
|
||||
|
||||
translate_response(response).await?;
|
||||
|
||||
Ok(data_len)
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// splits input line protocol into one or more sizes of at most
|
||||
/// `max_chunk` on line breaks in a separte tokio task
|
||||
fn split_lp(
|
||||
input: String,
|
||||
max_chunk_size: Option<usize>,
|
||||
max_concurrent_uploads: usize,
|
||||
) -> impl Stream<Item = String> {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(max_concurrent_uploads);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
match max_chunk_size {
|
||||
None => {
|
||||
// ignore errors (means the receiver hung up but nothing to communicate
|
||||
tx.send(input).await.ok();
|
||||
}
|
||||
Some(max_chunk_size) => {
|
||||
// use the actual line protocol parser to split on valid boundaries
|
||||
let mut acc = LineAccumulator::new(max_chunk_size);
|
||||
for l in influxdb_line_protocol::split_lines(&input) {
|
||||
if let Some(chunk) = acc.push(l) {
|
||||
// abort if receiver has hungup
|
||||
if tx.send(chunk).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(chunk) = acc.flush() {
|
||||
tx.send(chunk).await.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio_stream::wrappers::ReceiverStream::new(rx)
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct LineAccumulator {
|
||||
current_chunk: String,
|
||||
max_chunk_size: usize,
|
||||
}
|
||||
|
||||
impl LineAccumulator {
|
||||
fn new(max_chunk_size: usize) -> Self {
|
||||
Self {
|
||||
current_chunk: String::with_capacity(max_chunk_size),
|
||||
max_chunk_size,
|
||||
}
|
||||
}
|
||||
|
||||
// Add data `l` to the current chunk being created, returning the
|
||||
// current chunk if complete.
|
||||
fn push(&mut self, l: &str) -> Option<String> {
|
||||
let chunk = if self.current_chunk.len() + l.len() + 1 > self.max_chunk_size {
|
||||
self.flush()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if !self.current_chunk.is_empty() {
|
||||
self.current_chunk += "\n";
|
||||
}
|
||||
|
||||
self.current_chunk += l;
|
||||
chunk
|
||||
}
|
||||
|
||||
/// allocate a new chunk with the right size, returning the currently built chunk if it has non zero length
|
||||
/// `self.current_chunk.len()` is zero
|
||||
fn flush(&mut self) -> Option<String> {
|
||||
if !self.current_chunk.is_empty() {
|
||||
let mut new_chunk = String::with_capacity(self.max_chunk_size);
|
||||
std::mem::swap(&mut new_chunk, &mut self.current_chunk);
|
||||
Some(new_chunk)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Mutex;
|
||||
|
||||
use super::*;
|
||||
use crate::connection::Builder;
|
||||
|
||||
#[tokio::test]
|
||||
/// Ensure the basic plumbing is hooked up correctly
|
||||
async fn basic() {
|
||||
let url = mockito::server_url();
|
||||
|
||||
let connection = Builder::new().build(&url).await.unwrap();
|
||||
async fn test() {
|
||||
let mock = Arc::new(MockRequestMaker::new());
|
||||
|
||||
let namespace = "orgname_bucketname";
|
||||
let data = "m,t=foo f=4";
|
||||
|
||||
let m = mockito::mock("POST", "/api/v2/write?bucket=bucketname&org=orgname")
|
||||
.with_status(201)
|
||||
.match_body(data)
|
||||
.create();
|
||||
let expected = vec![MockRequest {
|
||||
org_id: "orgname".into(),
|
||||
bucket_id: "bucketname".into(),
|
||||
body: data.into(),
|
||||
}];
|
||||
|
||||
let res = Client::new(connection).write_lp(namespace, data).await;
|
||||
|
||||
m.assert();
|
||||
|
||||
let num_bytes = res.expect("Error making write request");
|
||||
let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _)
|
||||
.write_lp(namespace, data)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(expected, mock.requests());
|
||||
assert_eq!(num_bytes, 11);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_max_request_payload_size() {
|
||||
let mock = Arc::new(MockRequestMaker::new());
|
||||
|
||||
let namespace = "orgname_bucketname";
|
||||
let data = "m,t=foo f=4\n\
|
||||
m,t=bar f=3\n\
|
||||
m,t=fooddddddd f=4";
|
||||
|
||||
// expect the data to be broken up into two chunks:
|
||||
let expected = vec![
|
||||
MockRequest {
|
||||
org_id: "orgname".into(),
|
||||
bucket_id: "bucketname".into(),
|
||||
body: "m,t=foo f=4\nm,t=bar f=3".into(),
|
||||
},
|
||||
MockRequest {
|
||||
org_id: "orgname".into(),
|
||||
bucket_id: "bucketname".into(),
|
||||
body: "m,t=fooddddddd f=4".into(),
|
||||
},
|
||||
];
|
||||
|
||||
let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _)
|
||||
// enough to get first two lines, but not last
|
||||
.with_max_request_payload_size_bytes(Some(30))
|
||||
.write_lp(namespace, data)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(expected, mock.requests());
|
||||
assert_eq!(num_bytes, 41);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_lp_stream() {
|
||||
let mock = Arc::new(MockRequestMaker::new());
|
||||
|
||||
let namespace = "orgname_bucketname";
|
||||
let data = futures_util::stream::iter(
|
||||
vec!["m,t=foo f=4", "m,t=bar f=3"]
|
||||
.into_iter()
|
||||
.map(|s| s.to_string()),
|
||||
);
|
||||
|
||||
// expect the data to come in two chunks
|
||||
let expected = vec![
|
||||
MockRequest {
|
||||
org_id: "orgname".into(),
|
||||
bucket_id: "bucketname".into(),
|
||||
body: "m,t=foo f=4".into(),
|
||||
},
|
||||
MockRequest {
|
||||
org_id: "orgname".into(),
|
||||
bucket_id: "bucketname".into(),
|
||||
body: "m,t=bar f=3".into(),
|
||||
},
|
||||
];
|
||||
|
||||
let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _)
|
||||
.write_lp_stream(namespace, data)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(expected, mock.requests());
|
||||
assert_eq!(num_bytes, 22);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
struct MockRequest {
|
||||
org_id: String,
|
||||
bucket_id: String,
|
||||
body: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MockRequestMaker {
|
||||
requests: Mutex<Vec<MockRequest>>,
|
||||
}
|
||||
|
||||
impl MockRequestMaker {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
requests: Mutex::new(vec![]),
|
||||
}
|
||||
}
|
||||
|
||||
/// get a copy of the requests that were made using this mock
|
||||
fn requests(&self) -> Vec<MockRequest> {
|
||||
self.requests.lock().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestMaker for MockRequestMaker {
|
||||
fn write_source(
|
||||
&self,
|
||||
org_id: String,
|
||||
bucket_id: String,
|
||||
body: String,
|
||||
) -> BoxFuture<'_, Result<usize, Error>> {
|
||||
let sz = body.len();
|
||||
|
||||
self.requests.lock().unwrap().push(MockRequest {
|
||||
org_id,
|
||||
bucket_id,
|
||||
body,
|
||||
});
|
||||
|
||||
async move { Ok(sz) }.boxed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -529,7 +529,7 @@ pub fn parse_lines(input: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>>
|
|||
/// logic duplication for scanning fields, duplicating it also means
|
||||
/// we can be more sure of the compatibility of the rust parser and
|
||||
/// the canonical Go parser.
|
||||
fn split_lines(input: &str) -> impl Iterator<Item = &str> {
|
||||
pub fn split_lines(input: &str) -> impl Iterator<Item = &str> {
|
||||
// NB: This is ported as closely as possibly from the original Go code:
|
||||
let mut quoted = false;
|
||||
let mut fields = false;
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -1,12 +1,12 @@
|
|||
//! Client helpers for writing end to end ng tests
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use generated_types::influxdata::pbdata::v1::WriteResponse;
|
||||
use http::Response;
|
||||
use hyper::{Body, Client, Request};
|
||||
use influxdb_iox_client::{
|
||||
connection::Connection,
|
||||
flight::generated_types::ReadInfo,
|
||||
write::generated_types::WriteResponse,
|
||||
write_info::generated_types::{merge_responses, GetWriteInfoResponse, ShardStatus},
|
||||
};
|
||||
use observability_deps::tracing::info;
|
||||
|
|
|
|||
Loading…
Reference in New Issue