refactor: make end argument common to query and write load generation (#24881)

* refactor: make end common to load generation tool

Made the --end argument common to both the query and write load generation
runners.

A descriptive panic message was also added in the table buffer, where bare
unwraps were causing panics without context.

* refactor: load gen print statements for consistency
Trevor Hilton 2024-04-04 10:13:08 -04:00 committed by GitHub
parent 51ff5ebbaf
commit 557b939b15
6 changed files with 96 additions and 46 deletions

View File

@ -1,7 +1,7 @@
use std::{fs::File, path::PathBuf, sync::Arc};
use std::{fs::File, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{anyhow, bail, Context};
use chrono::Local;
use chrono::{DateTime, Local};
use clap::Parser;
use influxdb3_client::Client;
use secrecy::{ExposeSecret, Secret};
@ -79,6 +79,41 @@ pub(crate) struct InfluxDb3Config {
/// Generate a system stats file in the specified `results_dir`
#[clap(long = "system-stats", default_value_t = false)]
pub(crate) system_stats: bool,
/// Provide an end time to stop the load generation.
///
/// This can be a human readable offset, e.g., `10m` (10 minutes), `1h` (1 hour), etc., or an
/// exact date-time in RFC3339 form, e.g., `2023-10-30T19:10:00-04:00`.
#[clap(long = "end")]
pub(crate) end_time: Option<FutureOffsetTime>,
}
/// A time in the future
///
/// Wraps a [`DateTime`], providing a custom [`FromStr`] implementation that parses human
/// time input and converts it to a date-time in the future.
#[derive(Debug, Clone, Copy)]
pub struct FutureOffsetTime(DateTime<Local>);
impl FromStr for FutureOffsetTime {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(t) = humantime::parse_rfc3339_weak(s) {
Ok(Self(DateTime::<Local>::from(t)))
} else {
humantime::parse_duration(s)
.map(|d| Local::now() + d)
.map(Self)
.with_context(|| format!("could not parse future offset time value: {s}"))
}
}
}
impl From<FutureOffsetTime> for DateTime<Local> {
fn from(t: FutureOffsetTime) -> Self {
t.0
}
}
/// Can run the load generation tool exclusively in either `query` or `write` mode, or
@ -96,6 +131,8 @@ pub(crate) enum LoadType {
pub(crate) struct LoadConfig {
/// The target database name on the `influxdb3` server
pub(crate) database_name: String,
/// The end time of the load generation run
pub(crate) end_time: Option<DateTime<Local>>,
/// The directory that will store generated results files
results_dir: PathBuf,
/// If `true`, the configuration will initialize only to print out
@ -113,11 +150,23 @@ pub(crate) struct LoadConfig {
impl LoadConfig {
/// Create a new [`LoadConfig`]
fn new(database_name: String, results_dir: PathBuf, print_mode: bool) -> Self {
fn new(
database_name: String,
results_dir: PathBuf,
end_time: Option<impl Into<DateTime<Local>>>,
print_mode: bool,
) -> Self {
let end_time: Option<DateTime<Local>> = end_time.map(Into::into);
if let Some(t) = end_time {
println!("running load generation until: {t}");
} else {
println!("running load generation indefinitely");
}
Self {
database_name,
results_dir,
print_mode,
end_time,
..Default::default()
}
}
@ -297,6 +346,7 @@ impl InfluxDb3Config {
results_dir,
configuration_name,
system_stats,
end_time,
} = self;
match (
@ -338,7 +388,7 @@ impl InfluxDb3Config {
};
// initialize the load config:
let mut config = LoadConfig::new(database_name, results_dir, print_spec);
let mut config = LoadConfig::new(database_name, results_dir, end_time, print_spec);
// if builtin spec is set, use that instead of the spec path
if let Some(b) = builtin_spec {
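
Note on the parsing order above: `FutureOffsetTime::from_str` tries an exact timestamp first and only falls back to treating the value as an offset from now. A minimal standalone sketch of the same behavior, assuming the `humantime`, `chrono`, and `anyhow` crates already used in this file (illustrative only, not part of the commit):

use chrono::{DateTime, Local};

// Mirrors FutureOffsetTime::from_str: an exact RFC3339-like timestamp wins,
// otherwise the input is parsed as a duration and added to the current time.
fn parse_end(s: &str) -> anyhow::Result<DateTime<Local>> {
    if let Ok(t) = humantime::parse_rfc3339_weak(s) {
        return Ok(DateTime::<Local>::from(t));
    }
    let offset = humantime::parse_duration(s)?; // e.g. "10m", "1h"
    Ok(Local::now() + offset)
}

fn main() -> anyhow::Result<()> {
    println!("end time: {}", parse_end("10m")?);
    Ok(())
}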

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use anyhow::Context;
use clap::Parser;
use tokio::task::JoinSet;
use crate::commands::{query::run_query_load, write::run_write_load};
@ -35,17 +35,20 @@ pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
let stats = load_config.system_reporter()?;
let database_name = load_config.database_name.clone();
let mut tasks = JoinSet::new();
// setup query load:
let query_spec = load_config.query_spec()?;
let (query_results_file_path, query_reporter) = load_config.query_reporter()?;
let qr = Arc::clone(&query_reporter);
let query_client = client.clone();
let query_handle = tokio::spawn(async move {
tasks.spawn(async move {
run_query_load(
query_spec,
qr,
query_client,
database_name.clone(),
load_config.end_time,
config.query,
)
.await
@ -55,22 +58,21 @@ pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
let write_spec = load_config.write_spec()?;
let (write_results_file_path, write_reporter) = load_config.write_reporter()?;
let wr = Arc::clone(&write_reporter);
let write_handle = tokio::spawn(async move {
tasks.spawn(async move {
run_write_load(
write_spec,
wr,
client,
load_config.database_name,
load_config.end_time,
config.write,
)
.await
});
let (query_task, write_task) = tokio::try_join!(query_handle, write_handle)
.context("failed to join query and write tasks")?;
query_task?;
write_task?;
while let Some(res) = tasks.join_next().await {
res??;
}
write_reporter.shutdown();
println!("write results saved in: {write_results_file_path}");

View File

@ -1,7 +1,7 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use bytes::Bytes;
use chrono::Local;
use chrono::{DateTime, Local};
use clap::Parser;
use influxdb3_client::Client;
use serde_json::Value;
@ -71,6 +71,7 @@ pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
Arc::clone(&reporter),
client,
load_config.database_name,
load_config.end_time,
config.query,
)
.await?;
@ -91,6 +92,7 @@ pub(crate) async fn run_query_load(
reporter: Arc<QueryReporter>,
client: influxdb3_client::Client,
database_name: String,
end_time: Option<DateTime<Local>>,
config: QueryConfig,
) -> Result<(), anyhow::Error> {
let QueryConfig {
@ -109,6 +111,7 @@ pub(crate) async fn run_query_load(
querier,
client.clone(),
database_name.clone(),
end_time,
reporter,
));
tasks.push(task);
@ -127,9 +130,18 @@ async fn run_querier(
mut querier: Querier,
client: Client,
database_name: String,
end_time: Option<DateTime<Local>>,
reporter: Arc<QueryReporter>,
) {
loop {
if end_time.is_some_and(|t| Local::now() > t) {
println!(
"querier {id} completed at {time}",
id = querier.querier_id,
time = Local::now()
);
break;
}
for query in &mut querier.queries {
let start_request = Instant::now();
let mut builder = client
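
The `is_some_and` guard above preserves the old default: with no `--end` the querier loops indefinitely, while a set end time stops it once the wall clock passes it. The same check in isolation, as a tiny sketch assuming `chrono` (illustrative only):

use chrono::{DateTime, Local};

// None => never stop; Some(t) => stop once the current time is past t.
fn should_stop(end_time: Option<DateTime<Local>>) -> bool {
    end_time.is_some_and(|t| Local::now() > t)
}

fn main() {
    assert!(!should_stop(None));
    assert!(should_stop(Some(Local::now() - chrono::Duration::seconds(1))));
}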

View File

@ -66,13 +66,6 @@ pub(crate) struct WriteConfig {
/// specification like `1 hour` in the past. If not specified, defaults to now.
#[clap(long = "start", action)]
start_time: Option<String>,
/// The date and time at which to stop the timestamps of the generated data.
///
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
/// specification like `1 hour` in the future. If not specified, data will continue generating forever.
#[clap(long = "end", action)]
end_time: Option<String>,
}
#[derive(Debug, Clone, Copy)]
@ -127,6 +120,7 @@ pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
Arc::clone(&reporter),
client,
load_config.database_name,
load_config.end_time,
config.write,
)
.await?;
@ -147,6 +141,7 @@ pub(crate) async fn run_write_load(
reporter: Arc<WriteReporter>,
client: influxdb3_client::Client,
database_name: String,
end_time: Option<DateTime<Local>>,
config: WriteConfig,
) -> Result<(), anyhow::Error> {
let WriteConfig {
@ -154,7 +149,6 @@ pub(crate) async fn run_write_load(
writer_count,
dry_run,
start_time,
end_time,
..
} = config;
@ -162,6 +156,8 @@ pub(crate) async fn run_write_load(
"creating generators for {} concurrent writers",
writer_count
);
println!("each writer will send a write request every {sampling_interval}");
let mut generators =
create_generators(&spec, writer_count).context("failed to create generators")?;
@ -188,18 +184,6 @@ pub(crate) async fn run_write_load(
None
};
let end_time = if let Some(end_time) = end_time {
let end_time = parse_time_offset(&end_time, Local::now());
println!("ending at {:?}", end_time);
Some(end_time)
} else {
println!(
"running indefinitely with each writer sending a request every {}",
sampling_interval
);
None
};
// spawn tokio tasks for each writer
let mut tasks = Vec::new();
for generator in generators {
@ -306,10 +290,7 @@ async fn run_generator(
let now = Local::now();
if let Some(end_time) = end_time {
if now > end_time {
println!(
"writer {} finished writing to end time: {:?}",
generator.writer_id, end_time
);
println!("writer {} completed at {}", generator.writer_id, end_time);
return;
}
}

View File

@ -153,6 +153,10 @@ impl WriteReporter {
csv_writer.flush().expect("failed to flush csv reports");
if *self.shutdown.lock() {
return;
}
if console_stats.last_console_output_time.elapsed() > CONSOLE_REPORT_INTERVAL {
let elapsed_millis = console_stats.last_console_output_time.elapsed().as_millis();
@ -167,10 +171,6 @@ impl WriteReporter {
console_stats = ConsoleReportStats::new();
}
if *self.shutdown.lock() {
return;
}
std::thread::sleep(REPORT_FLUSH_INTERVAL);
}
}
@ -280,6 +280,10 @@ impl QueryReporter {
}
csv_writer.flush().expect("failed to flush csv reports");
if *self.shutdown.lock() {
return;
}
if console_stats.last_console_output_time.elapsed() > CONSOLE_REPORT_INTERVAL {
let elapsed_millis = console_stats.last_console_output_time.elapsed().as_millis();
@ -293,10 +297,6 @@ impl QueryReporter {
console_stats = QueryConsoleStats::new();
}
if *self.shutdown.lock() {
return;
}
std::thread::sleep(REPORT_FLUSH_INTERVAL);
}
}

View File

@ -188,7 +188,12 @@ impl TableBuffer {
let mut cols = Vec::with_capacity(self.data.len());
let schema = schema.as_arrow();
for f in &schema.fields {
cols.push(self.data.get(f.name()).unwrap().as_arrow());
cols.push(
self.data
.get(f.name())
.unwrap_or_else(|| panic!("missing field in table buffer: {}", f.name()))
.as_arrow(),
);
}
vec![RecordBatch::try_new(schema, cols).unwrap()]
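
The table buffer change keeps the panic but makes it name the missing field instead of failing on a bare `unwrap`. The same pattern in isolation, using a plain `HashMap` as a hypothetical stand-in for the buffer's column map (not the commit's code):

use std::collections::HashMap;

// Panics with a message naming the missing key, rather than the generic
// "called `Option::unwrap()` on a `None` value".
fn column<'a>(data: &'a HashMap<String, Vec<i64>>, field: &str) -> &'a [i64] {
    data.get(field)
        .unwrap_or_else(|| panic!("missing field in table buffer: {field}"))
        .as_slice()
}

fn main() {
    let mut data = HashMap::new();
    data.insert("temp".to_string(), vec![1, 2, 3]);
    println!("{:?}", column(&data, "temp"));
    // column(&data, "humidity"); // would panic: missing field in table buffer: humidity
}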