diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index f1e3d6b41a..c704f97360 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -199,7 +199,8 @@ agents = [{name = "foo", sampling_interval = "1s", count = 3}] group.bench_function("single agent with basic configuration", |b| { b.iter(|| { agent.reset_current_date_time(0); - let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap(); + let points_writer = + Arc::new(points_writer.build_for_agent("foo", "foo", "foo").unwrap()); let r = block_on(agent.generate_all(points_writer, 1, Arc::clone(&counter))); let n_points = r.expect("Could not generate data"); assert_eq!(n_points, expected_points as usize); diff --git a/iox_data_generator/src/agent.rs b/iox_data_generator/src/agent.rs index e114ae99a9..caaa65d8bc 100644 --- a/iox_data_generator/src/agent.rs +++ b/iox_data_generator/src/agent.rs @@ -134,7 +134,7 @@ impl Agent { /// called `batch_size` times before writing. Meant to be called in a `tokio::task`. pub async fn generate_all( &mut self, - mut points_writer: PointsWriter, + points_writer: Arc, batch_size: usize, counter: Arc, ) -> Result { diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index 6edfc8e688..a94dda4b44 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -153,7 +153,7 @@ pub async fn generate( ) .context(CouldNotCreateAgentSnafu)?; - info!( + println!( "Configuring {} agents of \"{}\" to write data \ to org {} and bucket {} (database {})", agent_assignment.count, @@ -163,12 +163,15 @@ pub async fn generate( database_assignments.database, ); - for mut agent in agents.into_iter() { - let agent_points_writer = points_writer_builder + let agent_points_writer = Arc::new( + points_writer_builder .build_for_agent(&agent_assignment.spec.name, org, bucket) - .context(CouldNotCreateAgentWriterSnafu)?; + .context(CouldNotCreateAgentWriterSnafu)?, + ); + for mut agent in agents.into_iter() { let lock_ref = Arc::clone(&lock); + let agent_points_writer = Arc::clone(&agent_points_writer); let total_rows = Arc::clone(&total_rows); handles.push(tokio::task::spawn(async move { diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 55870dcc48..b594375757 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -10,14 +10,12 @@ use parquet_file::{metadata::IoxMetadata, serialize}; use schema::selection::Selection; use snafu::{ensure, ResultExt, Snafu}; #[cfg(test)] -use std::{ - collections::BTreeMap, - sync::{Arc, Mutex}, -}; +use std::{collections::BTreeMap, sync::Arc}; use std::{ fs::{self, File, OpenOptions}, io::{BufWriter, Write}, path::{Path, PathBuf}, + sync::Mutex, }; /// Errors that may happen while writing points. @@ -243,7 +241,7 @@ impl PointsWriterBuilder { .open(&filename) .context(CantOpenLineProtocolFileSnafu { filename })?; - let file = BufWriter::new(file); + let file = Mutex::new(BufWriter::new(file)); InnerPointsWriter::File { file } } @@ -279,7 +277,7 @@ pub struct PointsWriter { impl PointsWriter { /// Write these points pub async fn write_points( - &mut self, + &self, points: impl Iterator + Send + Sync + 'static, ) -> Result<()> { self.inner_writer.write_points(points).await @@ -294,7 +292,7 @@ enum InnerPointsWriter { bucket: String, }, File { - file: BufWriter, + file: Mutex>, }, ParquetFile { dir_path: PathBuf, @@ -310,7 +308,7 @@ enum InnerPointsWriter { impl InnerPointsWriter { async fn write_points( - &mut self, + &self, points: impl Iterator + Send + Sync + 'static, ) -> Result<()> { match self { @@ -326,6 +324,7 @@ impl InnerPointsWriter { } Self::File { file } => { for point in points { + let mut file = file.lock().expect("Should be able to get lock"); point .write_data_point_to(&mut *file) .context(CantWriteToLineProtocolFileSnafu)?; @@ -389,7 +388,7 @@ impl InnerPointsWriter { } } #[cfg(test)] - Self::Vec(ref mut vec) => { + Self::Vec(vec) => { let vec_ref = Arc::clone(vec); let mut vec = vec_ref.lock().expect("Should be able to get lock"); for point in points {