From 42fbb90d8cf29b900497955f9088102a44582598 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 19 Aug 2021 18:31:21 -0400 Subject: [PATCH 1/2] feat: Add batching to the data generator Adds batch_size to the data genrator to optionally gather multiple calls to generate for each agent. For example, if you have a sampling interval of 10 seconds and start at some point back in time with a batch size of 3, it gather 3 samplings before writing to the points writer. For runs against a server API, this will batch them together in a single API call. --- .../benches/point_generation.rs | 1 + iox_data_generator/src/agent.rs | 12 ++++- iox_data_generator/src/lib.rs | 3 +- iox_data_generator/src/main.rs | 12 +++++ iox_data_generator/src/write.rs | 48 +++++++++++++++++++ 5 files changed, 74 insertions(+), 2 deletions(-) diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index 44115bac82..5cdadc653e 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -49,6 +49,7 @@ pub fn single_agent(c: &mut Criterion) { end_datetime, 0, false, + 1, ) }); let n_points = r.expect("Could not generate data"); diff --git a/iox_data_generator/src/agent.rs b/iox_data_generator/src/agent.rs index 796752d7f9..b9273969b0 100644 --- a/iox_data_generator/src/agent.rs +++ b/iox_data_generator/src/agent.rs @@ -126,11 +126,20 @@ impl Agent { /// Generate and write points in batches until `generate` doesn't return any /// points. Meant to be called in a `tokio::task`. - pub async fn generate_all(&mut self, mut points_writer: PointsWriter) -> Result { + pub async fn generate_all( + &mut self, + mut points_writer: PointsWriter, + batch_size: u16, + ) -> Result { let mut total_points = 0; let mut points = self.generate().await?; + let mut batches = 1; while !points.is_empty() { + while batches < batch_size { + points.append(&mut self.generate().await?); + batches += 1; + } info!("[agent {}] sending {} points", self.name, points.len()); total_points += points.len(); points_writer @@ -138,6 +147,7 @@ impl Agent { .await .context(CouldNotWritePoints)?; points = self.generate().await?; + batches = 1; } Ok(total_points) } diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index 80470b34f0..d3c12166c7 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -91,6 +91,7 @@ pub async fn generate( end_datetime: Option, execution_start_time: i64, continue_on: bool, + batch_size: u16, ) -> Result { let seed = spec.base_seed.to_owned().unwrap_or_else(|| { let mut rng = rand::thread_rng(); @@ -134,7 +135,7 @@ pub async fn generate( let agent_points_writer = points_writer_builder.build_for_agent(&agent_name); handles.push(tokio::task::spawn(async move { - agent.generate_all(agent_points_writer).await + agent.generate_all(agent_points_writer, batch_size).await })); } } diff --git a/iox_data_generator/src/main.rs b/iox_data_generator/src/main.rs index ab8d549938..ffdee1ed92 100644 --- a/iox_data_generator/src/main.rs +++ b/iox_data_generator/src/main.rs @@ -123,6 +123,12 @@ Logging: "Generate live data using the intervals from the spec after generating historical \ data. This option has no effect if you specify an end time.", )) + .arg( + Arg::with_name("batch_size") + .long("batch_size") + .help("Generate this many samplings to batch into a single API call. Good for sending a bunch of historical data in quickly if paired with a start time from long ago.") + .takes_value(true) + ) .get_matches(); let spec_filename = matches @@ -140,6 +146,11 @@ Logging: let continue_on = matches.is_present("continue"); + let batch_size = matches + .value_of("batch_size") + .map(|v| v.parse::().unwrap()) + .unwrap_or(1); + info!( "Starting at {}, ending at {} ({}){}", start_display, @@ -177,6 +188,7 @@ Logging: end_datetime, execution_start_time.timestamp_nanos(), continue_on, + batch_size, ) .await; diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 9e03effc2c..3618f976e6 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -344,6 +344,7 @@ bool = true"#; Some(now), now, false, + 1, ) .await?; @@ -358,4 +359,51 @@ bool = true"#; Ok(()) } + + #[tokio::test] + async fn test_generate_batches() -> Result<()> { + let toml = r#" +name = "demo_schema" +base_seed = "this is a demo" + +[[agents]] +name = "basic" +sampling_interval = 1 + +[[agents.measurements]] +name = "cpu" + +[[agents.measurements.fields]] +name = "up" +bool = true"#; + + let data_spec = DataSpec::from_str(toml).unwrap(); + let mut points_writer_builder = PointsWriterBuilder::new_vec(); + + let now = now_ns(); + + generate::( + &data_spec, + &mut points_writer_builder, + Some(now - 1_000_000_000), + Some(now), + now, + false, + 2, + ) + .await?; + + let line_protocol = points_writer_builder.written_data("basic"); + + let expected_line_protocol = format!( + r#"cpu,data_spec=demo_schema up=f {} +cpu,data_spec=demo_schema up=f {} +"#, + now - 1_000_000_000, + now + ); + assert_eq!(line_protocol, expected_line_protocol); + + Ok(()) + } } From e8545271828dc369e84ccfdfe1baf3fabae781a8 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 20 Aug 2021 11:08:33 -0400 Subject: [PATCH 2/2] chore: fixup data generator based on feedback --- iox_data_generator/src/agent.rs | 5 +++-- iox_data_generator/src/lib.rs | 2 +- iox_data_generator/src/main.rs | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/iox_data_generator/src/agent.rs b/iox_data_generator/src/agent.rs index b9273969b0..d4c92e99a0 100644 --- a/iox_data_generator/src/agent.rs +++ b/iox_data_generator/src/agent.rs @@ -125,11 +125,12 @@ impl Agent { } /// Generate and write points in batches until `generate` doesn't return any - /// points. Meant to be called in a `tokio::task`. + /// points. Points will be written to the writer in batches where `generate` is + /// called `batch_size` times before writing. Meant to be called in a `tokio::task`. pub async fn generate_all( &mut self, mut points_writer: PointsWriter, - batch_size: u16, + batch_size: usize, ) -> Result { let mut total_points = 0; diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index d3c12166c7..ebd4535130 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -91,7 +91,7 @@ pub async fn generate( end_datetime: Option, execution_start_time: i64, continue_on: bool, - batch_size: u16, + batch_size: usize, ) -> Result { let seed = spec.base_seed.to_owned().unwrap_or_else(|| { let mut rng = rand::thread_rng(); diff --git a/iox_data_generator/src/main.rs b/iox_data_generator/src/main.rs index ffdee1ed92..6bd14c2c20 100644 --- a/iox_data_generator/src/main.rs +++ b/iox_data_generator/src/main.rs @@ -148,7 +148,7 @@ Logging: let batch_size = matches .value_of("batch_size") - .map(|v| v.parse::().unwrap()) + .map(|v| v.parse::().unwrap()) .unwrap_or(1); info!(