feat: initial load generator implementation (#24808)

* feat: initial load generator implementation

This adds a load generator as a new crate. Initially it only generates write load, but the scaffolding is there to add a query load generator to complement the write load tool.

This could have been added as a subcommand to the influxdb3 program, but I thought it best to have it separate for now.

It's fairly light on tests and error handling given its an internal tooling CLI. I've added only something very basic to test the line protocol generation and run the actual write command by hand.

I included pretty detailed instructions and some runnable examples.

* refactor: address PR feedback
pull/24820/head
Paul Dix 2024-03-25 08:26:24 -04:00 committed by GitHub
parent 4f3288b4c4
commit 1827866d00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1901 additions and 0 deletions

23
Cargo.lock generated
View File

@ -2544,6 +2544,29 @@ dependencies = [
"url",
]
[[package]]
name = "influxdb3_load_generator"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"csv",
"dotenvy",
"humantime",
"influxdb3_client",
"observability_deps",
"parking_lot",
"rand",
"secrecy",
"serde",
"serde_json",
"thiserror",
"tokio",
"trogging",
"url",
]
[[package]]
name = "influxdb3_server"
version = "0.1.0"

View File

@ -3,6 +3,7 @@
members = [
"influxdb3",
"influxdb3_client",
"influxdb3_load_generator",
"influxdb3_server",
"influxdb3_write",
"iox_query_influxql_rewrite",
@ -49,6 +50,7 @@ crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/erratic-pattern/arrow-datafusion.git", rev = "5965d670c88bdfa1fb74f32fd5021d400838dade" }
datafusion-proto = { git = "https://github.com/erratic-pattern/arrow-datafusion.git", rev = "5965d670c88bdfa1fb74f32fd5021d400838dade" }
csv = "1.3.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"
@ -56,6 +58,7 @@ futures-util = "0.3.30"
hashbrown = "0.14.3"
hex = "0.4.3"
http = "0.2.9"
humantime = "2.1.0"
hyper = "0.14"
libc = { version = "0.2" }
mockito = { version = "1.2.0", default-features = false }

View File

@ -0,0 +1,35 @@
[package]
name = "influxdb3_load_generator"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Core Crates
observability_deps.workspace = true
trogging.workspace = true
# Local Deps
influxdb3_client = { path = "../influxdb3_client" }
# crates.io Dependencies
serde.workspace = true
clap.workspace = true
dotenvy.workspace = true
humantime.workspace = true
secrecy.workspace = true
serde_json.workspace = true
tokio.workspace = true
thiserror.workspace = true
url.workspace = true
rand.workspace = true
anyhow.workspace = true
csv.workspace = true
parking_lot.workspace = true
chrono.workspace = true
[lints]
workspace = true

View File

@ -0,0 +1,28 @@
use clap::Parser;
use secrecy::Secret;
use url::Url;
#[derive(Debug, Parser)]
pub(crate) struct InfluxDb3Config {
/// The host URL of the running InfluxDB 3.0 server
#[clap(
short = 'h',
long = "host",
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
pub(crate) host_url: Url,
/// The database name to generate load against
#[clap(
short = 'd',
long = "dbname",
env = "INFLUXDB3_DATABASE_NAME",
default_value = "load_test"
)]
pub(crate) database_name: String,
/// The token for authentication with the InfluxDB 3.0 server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
pub(crate) auth_token: Option<Secret<String>>,
}

View File

@ -0,0 +1,54 @@
use std::str::Utf8Error;
use clap::Parser;
use influxdb3_client::Format;
use secrecy::ExposeSecret;
use tokio::io;
use super::common::InfluxDb3Config;
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] influxdb3_client::Error),
#[error("invlid UTF8 received from server: {0}")]
Utf8(#[from] Utf8Error),
#[error("io error: {0}")]
Io(#[from] io::Error),
}
pub(crate) type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Parser)]
#[clap(visible_alias = "q", trailing_var_arg = true)]
pub(crate) struct Config {
/// Common InfluxDB 3.0 config
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
}
pub(crate) async fn command(config: Config) -> Result<()> {
let InfluxDb3Config {
host_url,
database_name,
auth_token,
} = config.influxdb3_config;
let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}
println!("hello from query!");
let resp_bytes = client
.api_v3_query_sql(database_name, "select * from foo limit 10;")
.format(Format::Json)
.send()
.await?;
println!("{}", std::str::from_utf8(&resp_bytes)?);
Ok(())
}

View File

@ -0,0 +1,476 @@
use crate::line_protocol_generator::{create_generators, Generator};
use crate::report::WriteReporter;
use crate::specification::DataSpec;
use anyhow::Context;
use chrono::{DateTime, Local};
use clap::Parser;
use influxdb3_client::{Client, Precision};
use secrecy::{ExposeSecret, Secret};
use std::ops::Add;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use url::Url;
use super::common::InfluxDb3Config;
#[derive(Debug, Parser)]
#[clap(visible_alias = "w", trailing_var_arg = true)]
pub struct Config {
/// Common InfluxDB 3.0 config
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
/// The path to the spec file to use for this run. Or specify a name of a builtin spec to use.
/// If not specified, the generator will output a list of builtin specs along with help and
/// an example for writing your own.
#[clap(short = 's', long = "spec", env = "INFLUXDB3_LOAD_DATA_SPEC_PATH")]
spec_path: Option<String>,
/// The name of the builtin spec to run. Use this instead of spec_path if you want to run
/// one of the builtin specs as is.
#[clap(long = "builtin-spec", env = "INFLUXDB3_LOAD_BUILTIN_SPEC")]
builtin_spec: Option<String>,
/// The name of the builtin spec to print to stdout. This is useful for seeing the structure
/// of the builtin as a starting point for creating your own.
#[clap(long = "print-spec")]
print_spec: Option<String>,
/// Sampling interval for the writers. They will generate data at this interval and
/// sleep for the remainder of the interval. Writers stagger writes by this interval divided
/// by the number of writers.
#[clap(
short = 'i',
long = "interval",
env = "INFLUXDB3_LOAD_SAMPLING_INTERVAL",
default_value = "1s"
)]
sampling_interval: humantime::Duration,
/// Number of simultaneous writers. Each writer will generate data at the specified interval.
#[clap(
short = 'w',
long = "writer-count",
env = "INFLUXDB3_LOAD_WRITERS",
default_value = "1"
)]
writer_count: usize,
/// Tells the generator to run a single sample for each writer in `writer-count` and output the data to stdout.
#[clap(long = "dry-run", default_value = "false")]
dry_run: bool,
/// The date and time at which to start the timestamps of the generated data.
///
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
/// specification like `1 hour` in the past. If not specified, defaults to now.
#[clap(long, action)]
start: Option<String>,
/// The date and time at which to stop the timestamps of the generated data.
///
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
/// specification like `1 hour` in the future. If not specified, data will continue generating forever.
#[clap(long, action)]
end: Option<String>,
/// The file that will be used to write the results of the run. If not specified, results
/// will be written to <spec_name>_results.csv in the current directory.
#[clap(
short = 'r',
long = "results",
env = "INFLUXDB3_WRITE_LOAD_RESULTS_FILE"
)]
results_file: Option<String>,
}
pub(crate) async fn command(config: Config) -> Result<(), anyhow::Error> {
let built_in_specs = crate::specs::built_in_specs();
if config.spec_path.is_none() && config.print_spec.is_none() && config.builtin_spec.is_none() {
let example = built_in_specs.first().unwrap();
let mut generators = create_generators(&example.write_spec, 2).unwrap();
let t = 123;
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t);
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t);
let builtin_help = built_in_specs
.iter()
.map(|spec| {
format!(
"name: {}\ndescription: {}\n",
spec.write_spec.name, spec.description
)
})
.collect::<Vec<String>>()
.join("\n");
println!(
r#"You didn't provide a spec path, which is required. For more information about the arguments for this command run:
influxdb_load_generator write --help
There are some built in specs that you can run just by specifying their name. If you want
to see the JSON for their structure as a starting point, specify their name as the --print-spec
argument. Here's a list of the builtin specs:
{}
Or, if you need a more detailed writeup on specs and how they work here are details about
the example. A spec is just a JSON object specifying how to generate measurements and their
tags and fields. All data will have a millisecond timestamp generated (with that precision
specified) and aligned with the sampling. The generator will run against a single database
and can have many concurrent writers. The spec indicates the shape of the data that should
be generated.
As the generator runs, it will output basic information to stdout. The stats of each
individual request will be written to a results CSV file that you can use after the run to
analyze the performance of write requests to the server.
In the data spec there is an array of measurements. Within each is an array of tags
and an array of fields. Measurements have a name while tags and fields have keys (i.e. tag
key and field key). Tags and fields are scoped to the measurement they are under. If a
tag with key 'foo' appears under two different measurements they are considered different
tags. The same goes for fields. All measurements must have at least 1 field and can have 0
or more tags.
The measurement, tag and field structs have an option called 'copies' which is an integer.
When specified, the data generator will create that many copies of the measurement, tag,
or field and append the copy number to the name/keys. This is useful for generating a large
schema in a test.
Tags have two options that work together that need explanation: cardinality, and
lines_per_sample. Cardinality is the number of unique values that the tag will have.
This cardinality will be split across the number of writers in a test run. Thus if you have
1,000 cardinality and a single writer, the unique values will all get written by that writer.
If you have 1,000 cardinality and 10 writers, each writer will write 100 unique values.
The lines_per_sample option on the measurement is used to control how many of the unique
values are used in a single sampling round. If not specified, all unique values will be used.
This number will be rounded down to the cardinality of the tag with the highest cardinality
for the measurement. This is done on a per writer basis. If you have lines_per_sample of 10
and a tag of 100 cardinality with 1 writer, it will generate 10 lines of that measurement with
each unique tag value going to the next 10 values on the next sample, taking 10 samples to get
through the 100 uniques before it cycles back to the beginning.
Separately, cardinality of tags will be split across the number of writers you have. So if
you have cardinality of 100 and 1 writer, by default it will generate 100 lines of that
measurement with each unique tag value. If you have 10 writers, each writer will generate 10
unique tag values. Thus with 10 writers, the lines_per_sample would max at 10 since each
sample can only generate 10 unique tag values.
The tag spec also has a boolean option called "append_writer_id". Writers are the individual
threads that run and generate and write samples at the same time. The number is set through
the parameter --writer-count. If append_writer_id is set to true, the generator will append
the writer id to the tag value. This is useful for generating unique tag values across
writers, simulating a host id or something similar.
Fields have options for generating static data, or randomly generated data within a range. For
strings, you can specify a static string or a random string of a certain length. Another option
worth noting is the null_probability. This is a float between 0 and 1 that indicates the probability
that a field will be null. If this option is used, you must have another field that does not use
this option (i.e. you must always have at least one field that is guaranteed to have a value).
If you're unsure how an option works or what it will produce, the easiest thing to do is to create
a file and run the generator with the --dry-run option. This will output the data to stdout so you
can see what it looks like before you run it against a server. It will use the --writer-count
value and show what each writer would send in a sample.
The example below shows this functionality generating different kinds of tags and
fields of different value types. First, we show the spec, then we show the output that gets
generated on a dry-run so you can see how the spec translates into generated line protocol.
Here's the spec:
{}
And when run with writer count set to 2, here's what will be sent in a request by each writer.
Writer 1:
{}
Writer 2:
{}"#,
builtin_help,
example.write_spec.to_json_string_pretty().unwrap(),
dry_run_output_1,
dry_run_output_2
);
return Ok(());
}
// if print spec is set, print the spec and exit
if let Some(spec_name) = config.print_spec {
let spec = built_in_specs
.iter()
.find(|spec| spec.write_spec.name == spec_name)
.context("Spec not found")?;
println!("{}", spec.write_spec.to_json_string_pretty()?);
return Ok(());
}
// if builtin spec is set, use that instead of the spec path
let spec = if let Some(builtin_spec) = config.builtin_spec {
let builtin = built_in_specs
.into_iter()
.find(|spec| spec.write_spec.name == builtin_spec)
.context("Spec not found")?;
println!("using builtin spec: {}", builtin.write_spec.name);
builtin.write_spec
} else {
println!("reading spec from: {}", config.spec_path.as_ref().unwrap());
DataSpec::from_path(&config.spec_path.unwrap())?
};
println!(
"creating generators for {} concurrent writers",
config.writer_count
);
let mut generators =
create_generators(&spec, config.writer_count).context("failed to create generators")?;
// if dry run is set, output from each generator its id and then a single sample
if config.dry_run {
println!("running dry run for each writer\n");
for g in &mut generators {
let t = Local::now();
let dry_run_output = g.dry_run(t.timestamp_millis());
println!("Writer {}:\n{}", g.writer_id, dry_run_output);
}
return Ok(());
}
let start_time = if let Some(start_time) = config.start {
let start_time = parse_time_offset(&start_time, Local::now());
println!("starting writers from a start time of {:?}. Historical replay will happen as fast as possible until catching up to now or hitting the end time.", start_time);
Some(start_time)
} else {
None
};
let end_time = if let Some(end_time) = config.end {
let end_time = parse_time_offset(&end_time, Local::now());
println!("ending at {:?}", end_time);
Some(end_time)
} else {
println!(
"running indefinitely with each writer sending a request every {}",
config.sampling_interval
);
None
};
let results_file = config
.results_file
.unwrap_or_else(|| format!("{}_results.csv", spec.name));
// exit if the results file already exists
if std::path::Path::new(&results_file).exists() {
eprintln!(
"results file already exists, use a different file name or delete it and re-run: {}",
results_file
);
std::process::exit(1);
}
println!("writing results to: {}", results_file);
let write_reporter =
Arc::new(WriteReporter::new(&results_file).context("failed to create write reporter")?);
// blocking task to periodically flush the report to disk
let reporter = Arc::clone(&write_reporter);
tokio::task::spawn_blocking(move || {
reporter.flush_reports();
});
// spawn tokio tasks for each writer
let client = create_client(
config.influxdb3_config.host_url,
config.influxdb3_config.auth_token,
)?;
let mut tasks = Vec::new();
for generator in generators {
let reporter = Arc::clone(&write_reporter);
let database_name = config.influxdb3_config.database_name.clone();
let sampling_interval = config.sampling_interval.into();
let task = tokio::spawn(run_generator(
generator,
client.clone(),
database_name,
reporter,
sampling_interval,
start_time,
end_time,
));
tasks.push(task);
}
// wait for all tasks to complete
for task in tasks {
task.await?;
}
println!("all writers finished");
write_reporter.shutdown();
println!("reporter closed and results written to {}", results_file);
Ok(())
}
fn create_client(
host_url: Url,
auth_token: Option<Secret<String>>,
) -> Result<Client, influxdb3_client::Error> {
let mut client = Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}
Ok(client)
}
fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
humantime::parse_rfc3339(s)
.map(Into::into)
.unwrap_or_else(|_| {
let std_duration = humantime::parse_duration(s).expect("Could not parse time");
let chrono_duration = chrono::Duration::from_std(std_duration)
.expect("Could not convert std::time::Duration to chrono::Duration");
now - chrono_duration
})
}
async fn run_generator(
mut generator: Generator,
client: Client,
database_name: String,
reporter: Arc<WriteReporter>,
sampling_interval: Duration,
start_time: Option<DateTime<Local>>,
end_time: Option<DateTime<Local>>,
) {
let mut sample_buffer = vec![];
// if the start time is set, load the historical samples as quickly as possible
if let Some(mut start_time) = start_time {
let mut sample_len = write_sample(
&mut generator,
sample_buffer,
&client,
&database_name,
start_time,
&reporter,
true,
)
.await;
loop {
start_time = start_time.add(sampling_interval);
if start_time > Local::now()
|| end_time
.map(|end_time| start_time > end_time)
.unwrap_or(false)
{
println!(
"writer {} finished historical replay at: {:?}",
generator.writer_id, start_time
);
break;
}
sample_buffer = Vec::with_capacity(sample_len);
sample_len = write_sample(
&mut generator,
sample_buffer,
&client,
&database_name,
start_time,
&reporter,
false,
)
.await;
}
}
// write data until end time or forever
let mut interval = tokio::time::interval(sampling_interval);
let mut sample_len = 1024 * 1024 * 1024;
// we only want to print the error the very first time it happens
let mut print_err = false;
loop {
interval.tick().await;
let now = Local::now();
if let Some(end_time) = end_time {
if now > end_time {
println!(
"writer {} finished writing to end time: {:?}",
generator.writer_id, end_time
);
return;
}
}
sample_buffer = Vec::with_capacity(sample_len);
sample_len = write_sample(
&mut generator,
sample_buffer,
&client,
&database_name,
now,
&reporter,
print_err,
)
.await;
print_err = true;
}
}
async fn write_sample(
generator: &mut Generator,
mut buffer: Vec<u8>,
client: &Client,
database_name: &String,
sample_time: DateTime<Local>,
reporter: &Arc<WriteReporter>,
print_err: bool,
) -> usize {
// generate the sample, and keep track of the length to set the buffer size for the next loop
let summary = generator
.write_sample_to(sample_time.timestamp_millis(), &mut buffer)
.expect("failed to write sample");
let sample_len = buffer.len();
let body = String::from_utf8(buffer).expect("failed to convert sample to string");
// time and send the write request
let start_request = Instant::now();
let res = client
.api_v3_write_lp(database_name)
.precision(Precision::Millisecond)
.accept_partial(false)
.body(body)
.send()
.await;
let response_time = start_request.elapsed().as_millis() as u64;
// log the report
match res {
Ok(_) => {
reporter.report_write(generator.writer_id, summary, response_time, Local::now());
}
Err(e) => {
// if it's the first error, print the details
if print_err {
eprintln!(
"Error on writer {} writing to server: {:?}",
generator.writer_id, e
);
}
reporter.report_failure(generator.writer_id, response_time, Local::now());
}
}
sample_len
}

View File

@ -0,0 +1,557 @@
//! This contains the logic for creating generators for a given spec for the number of workers.
use crate::specification::{DataSpec, FieldKind, MeasurementSpec};
use rand::distributions::Alphanumeric;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::collections::HashMap;
use std::io::Write;
use std::ops::Range;
use std::sync::Arc;
use tokio::io;
pub type WriterId = usize;
pub fn create_generators(
spec: &DataSpec,
writer_count: usize,
) -> Result<Vec<Generator>, anyhow::Error> {
let mut generators = vec![];
let mut arc_strings = HashMap::new();
for writer_id in 1..writer_count + 1 {
let mut measurements = vec![];
for m in &spec.measurements {
let copies = m.copies.unwrap_or(1);
for measurement_id in 1..copies + 1 {
measurements.push(create_measurement(
m,
writer_id,
writer_count,
measurement_id,
&mut arc_strings,
));
}
}
generators.push(Generator {
writer_id,
measurements,
});
}
Ok(generators)
}
fn create_measurement<'a>(
spec: &'a MeasurementSpec,
writer_id: WriterId,
writer_count: usize,
measurement_id: usize,
arc_strings: &mut HashMap<&'a str, Arc<str>>,
) -> Measurement {
let name = Arc::clone(arc_strings.entry(spec.name.as_str()).or_insert_with(|| {
let m = spec.name.replace(' ', "\\ ").replace(',', "\\,");
Arc::from(m.as_str())
}));
let max_cardinality = spec
.tags
.iter()
.map(|t| t.cardinality.unwrap_or(1))
.max()
.unwrap_or(1);
let max_cardinality = usize::div_ceil(max_cardinality, writer_count);
let lines_per_sample = spec.lines_per_sample.unwrap_or(max_cardinality);
let mut tags = vec![];
for t in &spec.tags {
let key = Arc::clone(arc_strings.entry(t.key.as_str()).or_insert_with(|| {
let k = t
.key
.replace(' ', "\\ ")
.replace(',', "\\,")
.replace('=', "\\=");
Arc::from(k.as_str())
}));
let value = t.value.as_ref().map(|v| {
Arc::clone(arc_strings.entry(v.as_str()).or_insert_with(|| {
let v = v
.replace(' ', "\\ ")
.replace(',', "\\,")
.replace('=', "\\=");
Arc::from(v.as_str())
}))
});
let (cardinality_id_min, cardinality_id_max) = t
.cardinality_min_max(writer_id, writer_count)
.unwrap_or((0, 0));
let append_writer_id = t.append_writer_id.unwrap_or(false);
let append_copy_id = t.append_copy_id.unwrap_or(false);
let copies = t.copies.unwrap_or(1);
for copy_id in 1..copies + 1 {
tags.push(Tag {
key: Arc::clone(&key),
value: value.clone(),
copy_id,
cardinality_id_min,
cardinality_id_max,
cardinality_id_current: cardinality_id_min,
append_writer_id,
append_copy_id,
});
}
}
let mut fields = vec![];
for f in &spec.fields {
let key = Arc::clone(arc_strings.entry(f.key.as_str()).or_insert_with(|| {
let k = f
.key
.replace(' ', "\\ ")
.replace(',', "\\,")
.replace('=', "\\=");
Arc::from(k.as_str())
}));
let copies = f.copies.unwrap_or(1);
for copy_id in 1..copies + 1 {
let random_null = f.null_probability.map(|p| (p, SmallRng::from_entropy()));
match &f.field {
FieldKind::Bool(_) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::Boolean(BooleanValue::Random(
SmallRng::from_entropy(),
)),
});
}
FieldKind::String(s) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::String(StringValue::Fixed(Arc::clone(
arc_strings
.entry(s.as_str())
.or_insert_with(|| Arc::from(s.as_str())),
))),
});
}
FieldKind::StringRandom(size) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::String(StringValue::Random(
*size,
SmallRng::from_entropy(),
)),
});
}
FieldKind::Integer(i) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::Integer(IntegerValue::Fixed(*i)),
});
}
FieldKind::IntegerRange(min, max) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::Integer(IntegerValue::Random(
Range {
start: *min,
end: *max,
},
SmallRng::from_entropy(),
)),
});
}
FieldKind::Float(f) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::Float(FloatValue::Fixed(*f)),
});
}
FieldKind::FloatRange(min, max) => {
fields.push(Field {
key: Arc::clone(&key),
copy_id,
random_null,
field_value: FieldValue::Float(FloatValue::Random(
Range {
start: *min,
end: *max,
},
SmallRng::from_entropy(),
)),
});
}
}
}
}
Measurement {
name,
copy_id: measurement_id,
tags,
fields,
lines_per_sample,
}
}
/// This struct holds the generator for each writer.
#[derive(Debug)]
pub struct Generator {
pub writer_id: WriterId,
measurements: Vec<Measurement>,
}
impl Generator {
pub fn new(writer_id: WriterId) -> Self {
Self {
writer_id,
measurements: Vec::new(),
}
}
/// Return a single sample run from the generator as a string.
pub fn dry_run(&mut self, timestamp: i64) -> String {
// create a buffer and write a single sample to it
let mut buffer = Vec::new();
self.write_sample_to(timestamp, &mut buffer)
.expect("writing to buffer should succeed");
// convert the buffer to a string and return it
String::from_utf8(buffer).expect("buffer should be valid utf8")
}
pub fn write_sample_to<W: Write>(
&mut self,
timestamp: i64,
mut w: W,
) -> io::Result<WriteSummary> {
let mut write_summary = WriteSummary {
bytes_written: 0,
lines_written: 0,
tags_written: 0,
fields_written: 0,
};
let mut w = ByteCounter::new(&mut w);
for measurement in &mut self.measurements {
for _ in 0..measurement.lines_per_sample {
if measurement.copy_id > 1 {
write!(w, "{}_{}", measurement.name, measurement.copy_id)?;
} else {
write!(w, "{}", measurement.name)?;
}
for tag in &mut measurement.tags {
tag.write_to(self.writer_id, &mut w)?;
}
write_summary.tags_written += measurement.tags.len();
for (i, field) in measurement.fields.iter_mut().enumerate() {
let separator = if i == 0 { " " } else { "," };
write!(w, "{}", separator)?;
field.write_to(&mut w)?;
}
write_summary.fields_written += measurement.fields.len();
writeln!(w, " {}", timestamp)?;
write_summary.lines_written += 1;
}
}
write_summary.bytes_written = w.bytes_written();
Ok(write_summary)
}
}
#[derive(Debug, Clone, Copy)]
pub struct WriteSummary {
pub bytes_written: usize,
pub lines_written: usize,
pub tags_written: usize,
pub fields_written: usize,
}
#[derive(Debug)]
struct Measurement {
name: Arc<str>,
copy_id: usize,
tags: Vec<Tag>,
fields: Vec<Field>,
lines_per_sample: usize,
}
#[derive(Debug)]
struct Tag {
key: Arc<str>,
value: Option<Arc<str>>,
copy_id: usize,
cardinality_id_min: usize,
cardinality_id_max: usize,
cardinality_id_current: usize,
append_writer_id: bool,
append_copy_id: bool,
}
impl Tag {
fn write_to<W: Write>(
&mut self,
writer_id: WriterId,
w: &mut ByteCounter<W>,
) -> io::Result<()> {
if self.copy_id > 1 {
write!(w, ",{}_{}=", self.key, self.copy_id)?;
} else {
write!(w, ",{}=", self.key)?;
}
if let Some(v) = &self.value {
write!(w, "{}", v)?;
}
// append the writer id with a preceding w if we're supposed to
if self.append_writer_id {
write!(w, "{}", writer_id)?;
}
// append the copy id with a preceding c if we're supposed to
if self.append_copy_id {
write!(w, "{}", self.copy_id)?;
}
// keep track of the cardinality id if min and max are different
if self.cardinality_id_min != 0 && self.cardinality_id_max != 0 {
// reset the id back to min if we've cycled through them all
if self.cardinality_id_current > self.cardinality_id_max {
self.cardinality_id_current = self.cardinality_id_min;
}
// write the cardinality counter value to the tag value
write!(w, "{}", self.cardinality_id_current)?;
self.cardinality_id_current += 1;
}
Ok(())
}
}
#[derive(Debug)]
struct Field {
key: Arc<str>,
copy_id: usize,
random_null: Option<(f64, SmallRng)>,
field_value: FieldValue,
}
#[derive(Debug)]
enum FieldValue {
Integer(IntegerValue),
Float(FloatValue),
String(StringValue),
Boolean(BooleanValue),
}
impl Field {
fn write_to<W: Write>(&mut self, w: &mut ByteCounter<W>) -> io::Result<()> {
// if there are random nulls, check and return without writing the field if it hits the
// probability
if let Some((probability, rng)) = &mut self.random_null {
let val: f64 = rng.gen();
if val <= *probability {
return Ok(());
}
}
if self.copy_id > 1 {
write!(w, "{}_{}=", self.key, self.copy_id)?;
} else {
write!(w, "{}=", self.key)?;
}
match &mut self.field_value {
FieldValue::Integer(f) => match f {
IntegerValue::Fixed(v) => write!(w, "{}i", v)?,
IntegerValue::Random(range, rng) => {
let v: i64 = rng.gen_range(range.clone());
write!(w, "{}i", v)?;
}
},
FieldValue::Float(f) => match f {
FloatValue::Fixed(v) => write!(w, "{}", v)?,
FloatValue::Random(range, rng) => {
let v: f64 = rng.gen_range(range.clone());
write!(w, "{:.3}", v)?;
}
},
FieldValue::String(s) => match s {
StringValue::Fixed(v) => write!(w, "\"{}\"", v)?,
StringValue::Random(size, rng) => {
let random: String = rng
.sample_iter(&Alphanumeric)
.take(*size)
.map(char::from)
.collect();
write!(w, "\"{}\"", random)?;
}
},
FieldValue::Boolean(f) => match f {
BooleanValue::Random(rng) => {
let v: bool = rng.gen();
write!(w, "{}", v)?;
}
},
}
Ok(())
}
}
#[derive(Debug)]
enum IntegerValue {
Fixed(i64),
Random(Range<i64>, SmallRng),
}
#[derive(Debug)]
enum FloatValue {
Fixed(f64),
Random(Range<f64>, SmallRng),
}
#[derive(Debug)]
enum StringValue {
Fixed(Arc<str>),
Random(usize, SmallRng),
}
#[derive(Debug)]
enum BooleanValue {
Random(SmallRng),
}
struct ByteCounter<W> {
inner: W,
count: usize,
}
impl<W> ByteCounter<W>
where
W: Write,
{
fn new(inner: W) -> Self {
Self { inner, count: 0 }
}
fn bytes_written(&self) -> usize {
self.count
}
}
impl<W> Write for ByteCounter<W>
where
W: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let res = self.inner.write(buf);
if let Ok(size) = res {
self.count += size
}
res
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::specification::{FieldSpec, TagSpec};
#[test]
fn example_spec_lp() {
let spec = DataSpec {
name: "foo".to_string(),
measurements: vec![MeasurementSpec {
name: "m".to_string(),
tags: vec![TagSpec {
key: "t".to_string(),
copies: Some(2),
append_copy_id: None,
value: Some("w".to_string()),
append_writer_id: None,
cardinality: Some(10),
}],
fields: vec![
FieldSpec {
key: "i".to_string(),
copies: Some(2),
null_probability: None,
field: FieldKind::Integer(42),
},
FieldSpec {
key: "f".to_string(),
copies: None,
null_probability: None,
field: FieldKind::Float(6.8),
},
FieldSpec {
key: "s".to_string(),
copies: None,
null_probability: None,
field: FieldKind::String("hello".to_string()),
},
],
copies: Some(1),
lines_per_sample: Some(2),
}],
};
let mut generators = create_generators(&spec, 2).unwrap();
let lp = generators.get_mut(0).unwrap().dry_run(123);
let actual: Vec<&str> = lp.split('\n').collect();
let expected: Vec<&str> = vec![
"m,t=w1,t_2=w1 i=42i,i_2=42i,f=6.8,s=\"hello\" 123",
"m,t=w2,t_2=w2 i=42i,i_2=42i,f=6.8,s=\"hello\" 123",
"",
];
assert_eq!(actual, expected);
let lp = generators.get_mut(1).unwrap().dry_run(567);
let actual: Vec<&str> = lp.split('\n').collect();
let expected: Vec<&str> = vec![
"m,t=w6,t_2=w6 i=42i,i_2=42i,f=6.8,s=\"hello\" 567",
"m,t=w7,t_2=w7 i=42i,i_2=42i,f=6.8,s=\"hello\" 567",
"",
];
assert_eq!(actual, expected);
}
}

View File

@ -0,0 +1,166 @@
//! Entrypoint of InfluxDB IOx binary
#![recursion_limit = "512"] // required for print_cpu
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr,
clippy::future_not_send
)]
pub mod line_protocol_generator;
pub mod report;
pub mod specification;
mod specs;
pub mod commands {
pub mod common;
pub mod query;
pub mod write;
}
use dotenvy::dotenv;
use observability_deps::tracing::warn;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::runtime::Runtime;
enum ReturnCode {
Failure = 1,
}
#[derive(Debug, clap::Parser)]
#[clap(
name = "influxdb3_load_generator",
disable_help_flag = true,
arg(
clap::Arg::new("help")
.long("help")
.help("Print help information")
.action(clap::ArgAction::Help)
.global(true)
),
about = "InfluxDB 3.0 Load Generator for writes and queries",
long_about = r#"InfluxDB 3.0 Load Generator for writes and queries
Examples:
# Run the write load generator
influxdb3_load_generator write --help
# Generate a sample write spec
influxdb3_load_generator write --generate-spec
# Run the the query load generator
influxdb3_load_generator query --help
# Generate a sample query spec
influxdb3_load_generator query --generate-spec
# Display all commands
influxdb3_load_generator --help
"#
)]
struct Config {
#[clap(subcommand)]
command: Option<Command>,
}
// Ignoring clippy here since this enum is just used for running
// the CLI command
#[allow(clippy::large_enum_variant)]
#[derive(Debug, clap::Parser)]
#[allow(clippy::large_enum_variant)]
enum Command {
/// Perform a query against a running InfluxDB 3.0 server
Query(commands::query::Config),
/// Perform a set of writes to a running InfluxDB 3.0 server
Write(commands::write::Config),
}
fn main() -> Result<(), std::io::Error> {
// load all environment variables from .env before doing anything
load_dotenv();
let config: Config = clap::Parser::parse();
let tokio_runtime = get_runtime(None)?;
tokio_runtime.block_on(async move {
match config.command {
None => println!("command required, --help for help"),
Some(Command::Query(config)) => {
if let Err(e) = commands::query::command(config).await {
eprintln!("Query command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Write(config)) => {
if let Err(e) = commands::write::command(config).await {
eprintln!("Write command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
}
});
Ok(())
}
/// Creates the tokio runtime for executing
///
/// if nthreads is none, uses the default scheduler
/// otherwise, creates a scheduler with the number of threads
fn get_runtime(num_threads: Option<usize>) -> Result<Runtime, std::io::Error> {
// NOTE: no log macros will work here!
//
// That means use eprintln!() instead of error!() and so on. The log emitter
// requires a running tokio runtime and is initialised after this function.
use tokio::runtime::Builder;
let kind = std::io::ErrorKind::Other;
match num_threads {
None => Runtime::new(),
Some(num_threads) => {
println!("Setting number of threads to '{num_threads}' per command line request");
let thread_counter = Arc::new(AtomicUsize::new(1));
match num_threads {
0 => {
let msg =
format!("Invalid num-threads: '{num_threads}' must be greater than zero");
Err(std::io::Error::new(kind, msg))
}
1 => Builder::new_current_thread().enable_all().build(),
_ => Builder::new_multi_thread()
.enable_all()
.thread_name_fn(move || {
format!("IOx main {}", thread_counter.fetch_add(1, Ordering::SeqCst))
})
.worker_threads(num_threads)
.build(),
}
}
}
}
/// Source the .env file before initialising the Config struct - this sets
/// any envs in the file, which the Config struct then uses.
///
/// Precedence is given to existing env variables.
fn load_dotenv() {
match dotenv() {
Ok(_) => {}
Err(dotenvy::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
// Ignore this - a missing env file is not an error, defaults will
// be applied when initialising the Config struct.
}
Err(e) => {
eprintln!("FATAL Error loading config from: {e}");
eprintln!("Aborting");
std::process::exit(1);
}
};
}

View File

@ -0,0 +1,196 @@
//! Trackers and report generators for write and query runs
use crate::line_protocol_generator::{WriteSummary, WriterId};
use anyhow::Context;
use chrono::{DateTime, Local};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::time::{Duration, Instant};
// Logged reports will be flushed to the csv file on this interval
const REPORT_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
const CONSOLE_REPORT_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Debug, Clone, Copy)]
pub struct WriterReport {
summary: Option<WriteSummary>, // failed write if none
write_instant: Instant,
wall_time: DateTime<Local>,
response_time_ms: u64,
writer_id: usize,
}
#[derive(Debug)]
pub struct WriteReporter {
state: Mutex<Vec<WriterReport>>,
csv_writer: Mutex<csv::Writer<std::fs::File>>,
shutdown: Mutex<bool>,
}
impl WriteReporter {
pub fn new(csv_filename: &str) -> Result<Self, anyhow::Error> {
// open csv file for writing
let mut csv_writer = csv::Writer::from_path(csv_filename)?;
// write header
csv_writer
.write_record([
"writer_id",
"response",
"latency_ms",
"test_time_ms",
"sample_number",
"bytes",
"lines",
"tags",
"fields",
"wall_time",
])
.context("failed to write csv report header")?;
Ok(Self {
state: Mutex::new(Vec::new()),
csv_writer: Mutex::new(csv_writer),
shutdown: Mutex::new(false),
})
}
pub fn report_failure(
&self,
writer_id: usize,
response_time_ms: u64,
wall_time: DateTime<Local>,
) {
let mut state = self.state.lock();
state.push(WriterReport {
summary: None,
write_instant: Instant::now(),
wall_time,
response_time_ms,
writer_id,
});
}
pub fn report_write(
&self,
writer_id: usize,
summary: WriteSummary,
response_time_ms: u64,
wall_time: DateTime<Local>,
) {
let mut state = self.state.lock();
state.push(WriterReport {
summary: Some(summary),
write_instant: Instant::now(),
wall_time,
response_time_ms,
writer_id,
});
}
/// Run in a spawn blocking task to flush reports to the csv file
pub fn flush_reports(&self) {
let start_time = Instant::now();
let mut sample_counts: HashMap<WriterId, usize> = HashMap::new();
let mut console_stats = ConsoleReportStats::new();
loop {
let reports = {
let mut state = self.state.lock();
let mut reports = Vec::with_capacity(state.len());
std::mem::swap(&mut reports, &mut *state);
reports
};
let mut csv_writer = self.csv_writer.lock();
for report in reports {
let test_time = report.write_instant.duration_since(start_time).as_millis();
let sample_number = sample_counts.entry(report.writer_id).or_insert(0);
*sample_number += 1;
if let Some(summary) = report.summary {
csv_writer
.write_record(&[
report.writer_id.to_string(),
"200".to_string(),
report.response_time_ms.to_string(),
test_time.to_string(),
sample_number.to_string(),
summary.bytes_written.to_string(),
summary.lines_written.to_string(),
summary.tags_written.to_string(),
summary.fields_written.to_string(),
report.wall_time.to_string(),
])
.expect("failed to write csv report record");
console_stats.success += 1;
console_stats.lines += summary.lines_written;
console_stats.bytes += summary.bytes_written;
} else {
csv_writer
.write_record(&[
report.writer_id.to_string(),
"500".to_string(),
report.response_time_ms.to_string(),
test_time.to_string(),
sample_number.to_string(),
"0".to_string(),
"0".to_string(),
"0".to_string(),
"0".to_string(),
report.wall_time.to_string(),
])
.expect("failed to write csv report record");
console_stats.error += 1;
}
}
csv_writer.flush().expect("failed to flush csv reports");
if console_stats.last_console_output_time.elapsed() > CONSOLE_REPORT_INTERVAL {
let elapsed_millis = console_stats.last_console_output_time.elapsed().as_millis();
println!(
"success: {:.0}/s, error: {:.0}/s, lines: {:.0}/s, bytes: {:.0}/s",
console_stats.success as f64 / elapsed_millis as f64 * 1000.0,
console_stats.error as f64 / elapsed_millis as f64 * 1000.0,
console_stats.lines as f64 / elapsed_millis as f64 * 1000.0,
console_stats.bytes as f64 / elapsed_millis as f64 * 1000.0,
);
console_stats = ConsoleReportStats::new();
}
if *self.shutdown.lock() {
return;
}
std::thread::sleep(REPORT_FLUSH_INTERVAL);
}
}
pub fn shutdown(&self) {
*self.shutdown.lock() = true;
}
}
struct ConsoleReportStats {
last_console_output_time: Instant,
success: usize,
error: usize,
lines: usize,
bytes: usize,
}
impl ConsoleReportStats {
fn new() -> Self {
Self {
last_console_output_time: Instant::now(),
success: 0,
error: 0,
lines: 0,
bytes: 0,
}
}
}

View File

@ -0,0 +1,170 @@
use crate::line_protocol_generator::WriterId;
use anyhow::Context;
use serde::{Deserialize, Serialize};
/// The specification for the data to be generated
#[derive(Debug, Deserialize, Serialize)]
pub struct DataSpec {
/// The name of this spec
pub name: String,
/// The measurements to be generated for each sample
pub measurements: Vec<MeasurementSpec>,
}
impl DataSpec {
pub fn from_path(path: &str) -> Result<Self, anyhow::Error> {
let contents = std::fs::read_to_string(path)?;
let res = serde_json::from_str(&contents)?;
Ok(res)
}
pub fn to_json_string_pretty(&self) -> Result<String, anyhow::Error> {
let res = serde_json::to_string_pretty(&self).context("failed to encode json to string")?;
Ok(res)
}
}
/// Specification for a measurement to be generated
#[derive(Debug, Deserialize, Serialize)]
pub struct MeasurementSpec {
/// The name of the measurement
pub name: String,
/// The tags to be generated for each line
pub tags: Vec<TagSpec>,
/// The fields to be generated for each line
pub fields: Vec<FieldSpec>,
/// Create this many copies of this measurement in each sample. The copy number will be
/// appended to the measurement name to uniquely identify it.
#[serde(skip_serializing_if = "Option::is_none")]
pub copies: Option<usize>,
/// If this measurement has tags with cardinality, this is the number of lines that will
/// be output per sample (up to the highest cardinality tag). If not specified, all unique
/// values will be used. Cardinality is split across the number of workers, so the number
/// of lines per sample could be less than this number.
#[serde(skip_serializing_if = "Option::is_none")]
pub lines_per_sample: Option<usize>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct TagSpec {
/// the key/name of the tag
pub key: String,
/// have this many copies of this tag in the measurement. Random values will be generated
/// independently (i.e. copies won't share the same random value). Will add the copy number to
/// the key of the tag to uniquely identify it.
#[serde(skip_serializing_if = "Option::is_none")]
pub copies: Option<usize>,
/// if set, appends the copy id of the tag to the value of the tag
#[serde(skip_serializing_if = "Option::is_none")]
pub append_copy_id: Option<bool>,
/// output this string value for every line this tag is present
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<String>,
/// if set, appends the writer id to the value of the tag
#[serde(skip_serializing_if = "Option::is_none")]
pub append_writer_id: Option<bool>,
/// will add a number to the value of the tag, with this number of unique values
#[serde(skip_serializing_if = "Option::is_none")]
pub cardinality: Option<usize>,
}
impl TagSpec {
pub fn cardinality_min_max(
&self,
writer_id: WriterId,
writer_count: usize,
) -> Option<(usize, usize)> {
if let Some(cardinality) = self.cardinality {
let cardinality_increment = usize::div_ceil(cardinality, writer_count);
let cardinality_id_min = writer_id * cardinality_increment - cardinality_increment + 1;
let cardinality_id_max = cardinality_id_min + cardinality_increment - 1;
Some((cardinality_id_min, cardinality_id_max))
} else {
None
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct FieldSpec {
// These options apply to any type of field
/// the key/name of the field
pub key: String,
/// have this many copies of this field in the measurement. Random values will be generated
/// independently (i.e. copies won't share the same random value). Will add the copy number to
/// the key of the field to uniquely identify it.
#[serde(skip_serializing_if = "Option::is_none")]
pub copies: Option<usize>,
/// A float between 0.0 and 1.0 that determines the probability that this field will be null.
/// At least one field in a measurement should not have this option set.
#[serde(skip_serializing_if = "Option::is_none")]
pub null_probability: Option<f64>,
#[serde(flatten)]
pub field: FieldKind,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FieldKind {
/// generates a random bool for the value of this field
Bool(bool),
/// output this string value for every line this field is present
String(String),
/// generate a random string of this length for every line this field is present
StringRandom(usize),
/// output this integer value for every line this field is present
Integer(i64),
/// generate a random integer in this range for every line this field is present
IntegerRange(i64, i64),
/// output this float value for every line this field is present
Float(f64),
/// generate a random float in this range for every line this field is present
FloatRange(f64, f64),
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tag_spec_splits_cardinality_for_writers() {
let mut tag_spec = TagSpec {
key: "".to_string(),
copies: None,
append_copy_id: None,
value: None,
append_writer_id: None,
cardinality: Some(100),
};
let (min, max) = tag_spec.cardinality_min_max(1, 10).unwrap();
assert_eq!(min, 1);
assert_eq!(max, 10);
let (min, max) = tag_spec.cardinality_min_max(2, 10).unwrap();
assert_eq!(min, 11);
assert_eq!(max, 20);
let (min, max) = tag_spec.cardinality_min_max(10, 10).unwrap();
assert_eq!(min, 91);
assert_eq!(max, 100);
// if the cardinality is not evenly divisible by the number of writers, the last writer
// will go over the cardinality set
tag_spec.cardinality = Some(30);
let (min, max) = tag_spec.cardinality_min_max(1, 7).unwrap();
assert_eq!(min, 1);
assert_eq!(max, 5);
let (min, max) = tag_spec.cardinality_min_max(4, 7).unwrap();
assert_eq!(min, 16);
assert_eq!(max, 20);
let (min, max) = tag_spec.cardinality_min_max(7, 7).unwrap();
assert_eq!(min, 31);
assert_eq!(max, 35);
}
}

View File

@ -0,0 +1,131 @@
//! Spec that shows the various elements of the data generator. Gets printed to console when
//! the generator is run without a spec specified.
use crate::specification::*;
use crate::specs::BuiltInSpec;
pub(crate) fn spec() -> BuiltInSpec {
let description =
r#"Example that shows the various elements of the data generator."#.to_string();
let write_spec = DataSpec {
name: "sample_spec".to_string(),
measurements: vec![
MeasurementSpec {
name: "some_measurement".to_string(),
tags: vec![
TagSpec {
key: "some_tag".to_string(),
copies: None,
append_copy_id: None,
value: Some("a-value-here".to_string()),
append_writer_id: None,
cardinality: None,
},
TagSpec {
key: "random_data_tag".to_string(),
copies: None,
append_copy_id: None,
value: Some("card-val-".to_string()),
append_writer_id: None,
cardinality: Some(2),
},
TagSpec {
key: "higher_cardinality_data_tag".to_string(),
copies: None,
append_copy_id: None,
value: Some("card-val-".to_string()),
append_writer_id: None,
cardinality: Some(6),
},
TagSpec {
key: "copied_tag".to_string(),
copies: Some(3),
append_copy_id: Some(true),
value: Some("copy-val-".to_string()),
append_writer_id: None,
cardinality: None,
},
TagSpec {
key: "writer_id".to_string(),
copies: None,
append_copy_id: None,
value: Some("writer-id-".to_string()),
append_writer_id: Some(true),
cardinality: None,
},
],
fields: vec![
FieldSpec {
key: "f1".to_string(),
copies: None,
null_probability: None,
field: FieldKind::Float(1.2),
},
FieldSpec {
key: "i1".to_string(),
copies: None,
null_probability: Some(0.6),
field: FieldKind::Integer(5),
},
],
copies: None,
lines_per_sample: None,
},
MeasurementSpec {
name: "copied_measurement".to_string(),
tags: vec![],
fields: vec![
FieldSpec {
key: "random_string".to_string(),
copies: None,
null_probability: None,
field: FieldKind::StringRandom(10),
},
FieldSpec {
key: "constant_string".to_string(),
copies: None,
null_probability: None,
field: FieldKind::String("a constant string".to_string()),
},
FieldSpec {
key: "random_integer".to_string(),
copies: None,
null_probability: None,
field: FieldKind::IntegerRange(1, 100),
},
FieldSpec {
key: "constant_integer".to_string(),
copies: None,
null_probability: None,
field: FieldKind::Integer(42),
},
FieldSpec {
key: "random_float".to_string(),
copies: None,
null_probability: None,
field: FieldKind::FloatRange(1.0, 100.0),
},
FieldSpec {
key: "constant_float".to_string(),
copies: None,
null_probability: None,
field: FieldKind::Float(6.8),
},
FieldSpec {
key: "random_bool".to_string(),
copies: None,
null_probability: None,
field: FieldKind::Bool(true),
},
],
copies: Some(2),
lines_per_sample: None,
},
],
};
BuiltInSpec {
description,
write_spec,
}
}

View File

@ -0,0 +1,17 @@
//! This module contains the built-in specifications for the load generator.
use crate::specification::DataSpec;
mod example;
mod one_mil;
/// Get all built-in specs
pub(crate) fn built_in_specs() -> Vec<BuiltInSpec> {
// add new built-in specs here to the end of this vec
vec![example::spec(), one_mil::spec()]
}
/// A built-in specification for the load generator
pub(crate) struct BuiltInSpec {
pub(crate) description: String,
pub(crate) write_spec: DataSpec,
}

View File

@ -0,0 +1,45 @@
//! Spec for the 1 million series use case
use crate::specification::*;
use crate::specs::BuiltInSpec;
pub(crate) fn spec() -> BuiltInSpec {
let description =
r#"1 million series in a single table use case. If you run this with -writer-count=100
you'll get all 1M series written every sampling interval. Our primary test is interval=10s"#.to_string();
let write_spec = DataSpec {
name: "one_mil".to_string(),
measurements: vec![MeasurementSpec {
name: "measurement_data".to_string(),
tags: vec![TagSpec {
key: "series_id".to_string(),
copies: None,
append_copy_id: None,
value: Some("series-number-".to_string()),
append_writer_id: None,
cardinality: Some(1_000_000),
}],
fields: vec![
FieldSpec {
key: "int_val".to_string(),
copies: Some(10),
null_probability: None,
field: FieldKind::IntegerRange(1, 100_000_000),
},
FieldSpec {
key: "float_val".to_string(),
copies: Some(10),
null_probability: None,
field: FieldKind::FloatRange(1.0, 100.0),
},
],
copies: None,
lines_per_sample: Some(10_000),
}],
};
BuiltInSpec {
description,
write_spec,
}
}