fix: Let's not all write to the same file at the same time

Fixes #6001.

The generator can create multiple agents that all write in parallel to
the same file, which results in garbage.

Share a single File instance wrapped in a Mutex, and hold the lock
until one whole line has been written.
pull/24376/head
Carol (Nichols || Goulding) 2022-10-28 13:44:31 -04:00
parent 71dd3b5fa5
commit 729ffffa3e
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
4 changed files with 18 additions and 15 deletions

View File

@@ -199,7 +199,8 @@ agents = [{name = "foo", sampling_interval = "1s", count = 3}]
group.bench_function("single agent with basic configuration", |b| { group.bench_function("single agent with basic configuration", |b| {
b.iter(|| { b.iter(|| {
agent.reset_current_date_time(0); agent.reset_current_date_time(0);
let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap(); let points_writer =
Arc::new(points_writer.build_for_agent("foo", "foo", "foo").unwrap());
let r = block_on(agent.generate_all(points_writer, 1, Arc::clone(&counter))); let r = block_on(agent.generate_all(points_writer, 1, Arc::clone(&counter)));
let n_points = r.expect("Could not generate data"); let n_points = r.expect("Could not generate data");
assert_eq!(n_points, expected_points as usize); assert_eq!(n_points, expected_points as usize);

View File

@@ -134,7 +134,7 @@ impl Agent {
/// called `batch_size` times before writing. Meant to be called in a `tokio::task`. /// called `batch_size` times before writing. Meant to be called in a `tokio::task`.
pub async fn generate_all( pub async fn generate_all(
&mut self, &mut self,
mut points_writer: PointsWriter, points_writer: Arc<PointsWriter>,
batch_size: usize, batch_size: usize,
counter: Arc<AtomicU64>, counter: Arc<AtomicU64>,
) -> Result<usize> { ) -> Result<usize> {

View File

@@ -153,7 +153,7 @@ pub async fn generate(
) )
.context(CouldNotCreateAgentSnafu)?; .context(CouldNotCreateAgentSnafu)?;
info!( println!(
"Configuring {} agents of \"{}\" to write data \ "Configuring {} agents of \"{}\" to write data \
to org {} and bucket {} (database {})", to org {} and bucket {} (database {})",
agent_assignment.count, agent_assignment.count,
@@ -163,12 +163,15 @@ pub async fn generate(
database_assignments.database, database_assignments.database,
); );
for mut agent in agents.into_iter() { let agent_points_writer = Arc::new(
let agent_points_writer = points_writer_builder points_writer_builder
.build_for_agent(&agent_assignment.spec.name, org, bucket) .build_for_agent(&agent_assignment.spec.name, org, bucket)
.context(CouldNotCreateAgentWriterSnafu)?; .context(CouldNotCreateAgentWriterSnafu)?,
);
for mut agent in agents.into_iter() {
let lock_ref = Arc::clone(&lock); let lock_ref = Arc::clone(&lock);
let agent_points_writer = Arc::clone(&agent_points_writer);
let total_rows = Arc::clone(&total_rows); let total_rows = Arc::clone(&total_rows);
handles.push(tokio::task::spawn(async move { handles.push(tokio::task::spawn(async move {

View File

@@ -10,14 +10,12 @@ use parquet_file::{metadata::IoxMetadata, serialize};
use schema::selection::Selection; use schema::selection::Selection;
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ensure, ResultExt, Snafu};
#[cfg(test)] #[cfg(test)]
use std::{ use std::{collections::BTreeMap, sync::Arc};
collections::BTreeMap,
sync::{Arc, Mutex},
};
use std::{ use std::{
fs::{self, File, OpenOptions}, fs::{self, File, OpenOptions},
io::{BufWriter, Write}, io::{BufWriter, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Mutex,
}; };
/// Errors that may happen while writing points. /// Errors that may happen while writing points.
@@ -243,7 +241,7 @@ impl PointsWriterBuilder {
.open(&filename) .open(&filename)
.context(CantOpenLineProtocolFileSnafu { filename })?; .context(CantOpenLineProtocolFileSnafu { filename })?;
let file = BufWriter::new(file); let file = Mutex::new(BufWriter::new(file));
InnerPointsWriter::File { file } InnerPointsWriter::File { file }
} }
@@ -279,7 +277,7 @@ pub struct PointsWriter {
impl PointsWriter { impl PointsWriter {
/// Write these points /// Write these points
pub async fn write_points( pub async fn write_points(
&mut self, &self,
points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static, points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static,
) -> Result<()> { ) -> Result<()> {
self.inner_writer.write_points(points).await self.inner_writer.write_points(points).await
@@ -294,7 +292,7 @@ enum InnerPointsWriter {
bucket: String, bucket: String,
}, },
File { File {
file: BufWriter<File>, file: Mutex<BufWriter<File>>,
}, },
ParquetFile { ParquetFile {
dir_path: PathBuf, dir_path: PathBuf,
@@ -310,7 +308,7 @@ enum InnerPointsWriter {
impl InnerPointsWriter { impl InnerPointsWriter {
async fn write_points( async fn write_points(
&mut self, &self,
points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static, points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static,
) -> Result<()> { ) -> Result<()> {
match self { match self {
@@ -326,6 +324,7 @@ impl InnerPointsWriter {
} }
Self::File { file } => { Self::File { file } => {
for point in points { for point in points {
let mut file = file.lock().expect("Should be able to get lock");
point point
.write_data_point_to(&mut *file) .write_data_point_to(&mut *file)
.context(CantWriteToLineProtocolFileSnafu)?; .context(CantWriteToLineProtocolFileSnafu)?;
@@ -389,7 +388,7 @@ impl InnerPointsWriter {
} }
} }
#[cfg(test)] #[cfg(test)]
Self::Vec(ref mut vec) => { Self::Vec(vec) => {
let vec_ref = Arc::clone(vec); let vec_ref = Arc::clone(vec);
let mut vec = vec_ref.lock().expect("Should be able to get lock"); let mut vec = vec_ref.lock().expect("Should be able to get lock");
for point in points { for point in points {