Merge branch 'main' into upgrade_heappy

pull/24376/head
kodiakhq[bot] 2021-08-20 15:54:37 +00:00 committed by GitHub
commit 704d2c40cd
5 changed files with 76 additions and 3 deletions

View File

@@ -49,6 +49,7 @@ pub fn single_agent(c: &mut Criterion) {
                 end_datetime,
                 0,
                 false,
+                1,
             )
         });
         let n_points = r.expect("Could not generate data");

View File

@@ -125,12 +125,22 @@ impl<T: DataGenRng> Agent<T> {
     }
     /// Generate and write points in batches until `generate` doesn't return any
-    /// points. Meant to be called in a `tokio::task`.
-    pub async fn generate_all(&mut self, mut points_writer: PointsWriter) -> Result<usize> {
+    /// points. Points will be written to the writer in batches where `generate` is
+    /// called `batch_size` times before writing. Meant to be called in a `tokio::task`.
+    pub async fn generate_all(
+        &mut self,
+        mut points_writer: PointsWriter,
+        batch_size: usize,
+    ) -> Result<usize> {
         let mut total_points = 0;
         let mut points = self.generate().await?;
+        let mut batches = 1;
         while !points.is_empty() {
+            while batches < batch_size {
+                points.append(&mut self.generate().await?);
+                batches += 1;
+            }
             info!("[agent {}] sending {} points", self.name, points.len());
             total_points += points.len();
             points_writer
@@ -138,6 +148,7 @@ impl<T: DataGenRng> Agent<T> {
                 .await
                 .context(CouldNotWritePoints)?;
             points = self.generate().await?;
+            batches = 1;
         }
         Ok(total_points)
     }
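The new inner loop above calls `generate` up to `batch_size` times before each write. Below is a minimal, synchronous sketch of that batching pattern in isolation; `FakeGenerator` and the `Vec`-of-batches "writer" are hypothetical stand-ins, since the real `generate_all` is async and writes through a `PointsWriter`.

// Standalone sketch of the batching pattern introduced in this hunk.
// `FakeGenerator` and the Vec-of-batches writer are hypothetical stand-ins.
struct FakeGenerator {
    remaining: usize,
}

impl FakeGenerator {
    // Mirrors Agent::generate: returns an empty Vec once exhausted.
    fn generate(&mut self) -> Vec<String> {
        if self.remaining == 0 {
            return Vec::new();
        }
        self.remaining -= 1;
        vec![format!("point {}", self.remaining)]
    }
}

fn generate_all(gen: &mut FakeGenerator, writes: &mut Vec<Vec<String>>, batch_size: usize) -> usize {
    let mut total_points = 0;
    let mut points = gen.generate();
    let mut batches = 1;

    while !points.is_empty() {
        // Accumulate up to `batch_size` samplings before doing a single write;
        // a batch can come up short if the generator runs out partway through.
        while batches < batch_size {
            points.append(&mut gen.generate());
            batches += 1;
        }

        total_points += points.len();
        writes.push(points);

        points = gen.generate();
        batches = 1;
    }

    total_points
}

fn main() {
    let mut writes = Vec::new();
    let total = generate_all(&mut FakeGenerator { remaining: 5 }, &mut writes, 2);

    // 5 samplings with batch_size = 2 => 3 writes of 2, 2, and 1 points.
    assert_eq!(total, 5);
    assert_eq!(writes.iter().map(|b| b.len()).collect::<Vec<_>>(), vec![2, 2, 1]);
    println!("{} points across {} writes", total, writes.len());
}

One consequence of this loop shape is that the final batch may be short when the generator is exhausted partway through, which the sketch's assertions illustrate.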

View File

@@ -91,6 +91,7 @@ pub async fn generate<T: DataGenRng>(
     end_datetime: Option<i64>,
     execution_start_time: i64,
     continue_on: bool,
+    batch_size: usize,
 ) -> Result<usize> {
     let seed = spec.base_seed.to_owned().unwrap_or_else(|| {
         let mut rng = rand::thread_rng();
@@ -134,7 +135,7 @@
             let agent_points_writer = points_writer_builder.build_for_agent(&agent_name);
             handles.push(tokio::task::spawn(async move {
-                agent.generate_all(agent_points_writer).await
+                agent.generate_all(agent_points_writer, batch_size).await
             }));
         }
     }

View File

@@ -123,6 +123,12 @@ Logging:
             "Generate live data using the intervals from the spec after generating historical \
              data. This option has no effect if you specify an end time.",
         ))
+        .arg(
+            Arg::with_name("batch_size")
+                .long("batch_size")
+                .help("Generate this many samplings to batch into a single API call. Good for sending a bunch of historical data in quickly if paired with a start time from long ago.")
+                .takes_value(true)
+        )
         .get_matches();
     let spec_filename = matches
@@ -140,6 +146,11 @@ Logging:
     let continue_on = matches.is_present("continue");
+    let batch_size = matches
+        .value_of("batch_size")
+        .map(|v| v.parse::<usize>().unwrap())
+        .unwrap_or(1);
     info!(
         "Starting at {}, ending at {} ({}){}",
         start_display,
@@ -177,6 +188,7 @@ Logging:
         end_datetime,
         execution_start_time.timestamp_nanos(),
         continue_on,
+        batch_size,
     )
     .await;
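The new `--batch_size` flag defaults to 1 and controls how many samplings are combined into a single API call. Below is a self-contained sketch of the same flag-handling pattern, assuming the clap 2.x `App`/`Arg::with_name` API used in this hunk; the app name and everything other than the `batch_size` flag are hypothetical.

// Standalone sketch of the batch_size flag handling above, assuming clap 2.x.
// The app name and help text here are hypothetical.
use clap::{App, Arg};

fn main() {
    let matches = App::new("data_generator_sketch")
        .arg(
            Arg::with_name("batch_size")
                .long("batch_size")
                .help("Number of samplings to batch into a single API call")
                .takes_value(true),
        )
        .get_matches();

    // Same pattern as the diff: an absent flag defaults to 1; a non-numeric
    // value panics on the unwrap() rather than reporting a parse error.
    let batch_size = matches
        .value_of("batch_size")
        .map(|v| v.parse::<usize>().unwrap())
        .unwrap_or(1);

    println!("batch_size = {}", batch_size);
}

Invoked with something like `--batch_size 100` together with a start time far in the past, the generator batches historical samplings into far fewer write calls, as the help text describes.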

View File

@@ -344,6 +344,7 @@ bool = true"#;
             Some(now),
             now,
             false,
+            1,
         )
         .await?;
@@ -358,4 +359,51 @@ bool = true"#;
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_generate_batches() -> Result<()> {
+        let toml = r#"
+name = "demo_schema"
+base_seed = "this is a demo"
+
+[[agents]]
+name = "basic"
+sampling_interval = 1
+
+[[agents.measurements]]
+name = "cpu"
+
+[[agents.measurements.fields]]
+name = "up"
+bool = true"#;
+
+        let data_spec = DataSpec::from_str(toml).unwrap();
+        let mut points_writer_builder = PointsWriterBuilder::new_vec();
+
+        let now = now_ns();
+
+        generate::<ZeroRng>(
+            &data_spec,
+            &mut points_writer_builder,
+            Some(now - 1_000_000_000),
+            Some(now),
+            now,
+            false,
+            2,
+        )
+        .await?;
+
+        let line_protocol = points_writer_builder.written_data("basic");
+
+        let expected_line_protocol = format!(
+            r#"cpu,data_spec=demo_schema up=f {}
+cpu,data_spec=demo_schema up=f {}
+"#,
+            now - 1_000_000_000,
+            now
+        );
+
+        assert_eq!(line_protocol, expected_line_protocol);
+
+        Ok(())
+    }
 }