From 31aa41e240a196448858584b60fae05e558bf04a Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Tue, 7 Dec 2021 16:54:40 -0500 Subject: [PATCH 1/2] feat: add ability for data generator to write to many buckets This adds the ability for the data generator to write to many databases. A new command line argument, `bucket_list`, is added which should be a file name. The file should contain a list of databsaes, one per line, with the structure of _. This is a little odd given the data generator expects org and bucket separately, but I expect the file that we'll be using will be database names, which have this format. The configuration can specify what percentage of the list should get written to by which agents at what sampling interval. This should allow configurations where databases get different levels of ingest and different types (as specified via different agent specs). The structure is a little wonky, but I think it'll get the job done. The next step is to run some perf tests to see how the data generator performs if writing to 10k databases. --- .../benches/point_generation.rs | 26 +- iox_data_generator/schemas/cap-write.toml | 6 +- iox_data_generator/schemas/full_example.toml | 28 ++- .../schemas/storage_cardinality_example.toml | 7 +- iox_data_generator/schemas/tracing-spec.toml | 6 +- iox_data_generator/src/agent.rs | 22 +- .../src/bin/iox_data_generator.rs | 95 +++---- iox_data_generator/src/lib.rs | 83 ++++-- iox_data_generator/src/measurement.rs | 12 +- iox_data_generator/src/specification.rs | 237 +++++++++++++++++- iox_data_generator/src/tag_set.rs | 15 +- iox_data_generator/src/write.rs | 56 +++-- perf/battery-0/datagen.toml | 5 +- 13 files changed, 451 insertions(+), 147 deletions(-) diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index 5b562ccbeb..0b38ef81d9 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -1,10 +1,12 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use iox_data_generator::agent::Agent; +use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket}; use iox_data_generator::{ specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec}, tag_set::GeneratedTagSets, write::PointsWriterBuilder, }; +use std::time::Duration; pub fn single_agent(c: &mut Criterion) { let spec = DataSpec { @@ -13,8 +15,6 @@ pub fn single_agent(c: &mut Criterion) { tag_sets: vec![], agents: vec![AgentSpec { name: "foo".to_string(), - count: None, - sampling_interval: Some("1s".to_string()), measurements: vec![MeasurementSpec { name: "measurement-1".into(), count: None, @@ -29,6 +29,14 @@ pub fn single_agent(c: &mut Criterion) { has_one: vec![], tag_pairs: vec![], }], + bucket_writers: vec![BucketWriterSpec { + percent: 1.0, + agents: vec![AgentAssignmentSpec { + name: "foo".to_string(), + count: None, + sampling_interval: "1s".to_string(), + }], + }], }; let mut points_writer = PointsWriterBuilder::new_no_op(true); @@ -47,6 +55,10 @@ pub fn single_agent(c: &mut Criterion) { b.iter(|| { let r = block_on(iox_data_generator::generate( &spec, + vec![OrgBucket { + org: "foo".to_string(), + bucket: "bar".to_string(), + }], &mut points_writer, start_datetime, end_datetime, @@ -125,8 +137,6 @@ for_each = [ [[agents]] name = "foo" -# create this many agents -count = 3 [[agents.measurements]] name = "storage_usage_bucket_cardinality" @@ -142,6 +152,10 @@ tag_pairs = [ [[agents.measurements.fields]] name = "gauge" i64_range = [1, 8147240] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "1s", count = 3}] "#).unwrap(); let generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); @@ -155,6 +169,8 @@ i64_range = [1, 8147240] let mut agents = Agent::from_spec( &spec.agents[0], + 3, + Duration::from_millis(10), start_datetime, end_datetime, 0, @@ -172,7 +188,7 @@ i64_range = [1, 8147240] group.bench_function("single agent with basic configuration", |b| { b.iter(|| { agent.reset_current_date_time(0); - let points_writer = points_writer.build_for_agent("foo").unwrap(); + let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap(); let r = block_on(agent.generate_all(points_writer, 1)); let n_points = r.expect("Could not generate data"); assert_eq!(n_points, expected_points as usize); diff --git a/iox_data_generator/schemas/cap-write.toml b/iox_data_generator/schemas/cap-write.toml index 8b5cce47d1..d80fa8cc2c 100644 --- a/iox_data_generator/schemas/cap-write.toml +++ b/iox_data_generator/schemas/cap-write.toml @@ -2,10 +2,12 @@ # https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write name = "cap_write" +[[bucket_writers]] +percent = 1.0 +agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}] + [[agents]] name = "telegraf" -count = 3 -sampling_interval = "10s" tag_pairs = [ {key = "host", template = "host-{{agent.id}}"} ] diff --git a/iox_data_generator/schemas/full_example.toml b/iox_data_generator/schemas/full_example.toml index f113691bdd..6be2dcd5a4 100644 --- a/iox_data_generator/schemas/full_example.toml +++ b/iox_data_generator/schemas/full_example.toml @@ -103,14 +103,10 @@ for_each = [ "foo_bar" ] +# Agent specs can be referenced later on by bucket writers, which specify how frequently +# data should be written and by how many different agents. [[agents]] name = "first_agent" -# Create this many agents. Agents are single threaded so the way to get more paralellism is to -# increase the number of agents. -count = 2 -# each agent will generate a sampling of data every 10ms. If running from a historical point in -# time, the agents will push data as quickly as they can until they catch up to now. -sampling_interval = "10ms" # if specifying tag_pairs at the agent level, every line that the agent generates will have these # tag pairs added to it. Note that the template has the same helpers as those in value (except for id). # In addition, it has an accessor for the agent id. @@ -171,4 +167,22 @@ count = 2 [[agents.measurements.fields]] name = "f1" -bool = true \ No newline at end of file +bool = true + +# bucket_writers specify how to split up the list of supplied buckets to write to. If +# only a single one is specified via the CLI flags, then you'd want only a single bucket_writer +# with a percent of 1.0. +# +# These make it possible to split up a large list of buckets to write to and send different +# amounts of write load as well as different schemas through specifying different agents. +[[bucket_writers]] +# 20% of the buckets specified in the --bucket_list file will have these agents writing to them +percent = 0.2 +# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute. +agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}] + +[[bucket_writers]] +# the remaining 80% of the buckeets specified will write using these agents +percent = 0.8 +# we'll only have a single agent of another_example for each database +agents = [{name = "another_example", sampling_interval = "1s"}] diff --git a/iox_data_generator/schemas/storage_cardinality_example.toml b/iox_data_generator/schemas/storage_cardinality_example.toml index 2c6bff9d35..5210c6efac 100644 --- a/iox_data_generator/schemas/storage_cardinality_example.toml +++ b/iox_data_generator/schemas/storage_cardinality_example.toml @@ -58,11 +58,12 @@ for_each = [ "partition_id", ] +[[bucket_writers]] +percent = 1.0 +agents = [{name = "sender", sampling_interval = "10s"}] + [[agents]] name = "sender" -# create this many agents -count = 1 -sampling_interval = "10s" [[agents.measurements]] name = "storage_usage_bucket_cardinality" diff --git a/iox_data_generator/schemas/tracing-spec.toml b/iox_data_generator/schemas/tracing-spec.toml index 2d48c47671..7b31dc59a7 100644 --- a/iox_data_generator/schemas/tracing-spec.toml +++ b/iox_data_generator/schemas/tracing-spec.toml @@ -17,8 +17,6 @@ for_each = ["host", "host.service"] [[agents]] name = "tracing_agent" -count = 1 -sampling_interval = "1s" [[agents.measurements]] name = "traces" @@ -31,3 +29,7 @@ tag_pairs = [ [[agents.measurements.fields]] name = "timing" f64_range = [0.0, 500.0] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "tracing_agent", sampling_interval = "1s"}] diff --git a/iox_data_generator/src/agent.rs b/iox_data_generator/src/agent.rs index d11fcfbbd1..30202e3925 100644 --- a/iox_data_generator/src/agent.rs +++ b/iox_data_generator/src/agent.rs @@ -8,7 +8,6 @@ use crate::{ }; use crate::tag_set::GeneratedTagSets; -use humantime::parse_duration; use serde_json::json; use snafu::{ResultExt, Snafu}; use std::time::{Duration, Instant}; @@ -39,12 +38,6 @@ pub enum Error { source: crate::write::Error, }, - #[snafu(display("Sampling interval must be valid string: {}", source))] - InvalidSamplingInterval { - /// Underlying `parse` error - source: humantime::DurationError, - }, - #[snafu(display("Error creating agent tag pairs: {}", source))] CouldNotCreateAgentTagPairs { source: crate::tag_pair::Error }, } @@ -77,17 +70,18 @@ pub struct Agent { impl Agent { /// Create agents that will generate data points according to these /// specs. + #[allow(clippy::too_many_arguments)] pub fn from_spec( agent_spec: &specification::AgentSpec, + count: usize, + sampling_interval: Duration, start_datetime: Option, // in nanoseconds since the epoch, defaults to now end_datetime: Option, // also in nanoseconds since the epoch, defaults to now execution_start_time: i64, continue_on: bool, // If true, run in "continue" mode after historical data is generated generated_tag_sets: &GeneratedTagSets, ) -> Result> { - let agent_count = agent_spec.count.unwrap_or(1); - - let agents: Vec<_> = (1..agent_count + 1) + let agents: Vec<_> = (1..count + 1) .into_iter() .map(|agent_id| { let data = json!({"agent": {"id": agent_id, "name": agent_spec.name}}); @@ -114,17 +108,11 @@ impl Agent { let current_datetime = start_datetime.unwrap_or_else(now_ns); let end_datetime = end_datetime.unwrap_or_else(now_ns); - // Convert to nanoseconds - let sampling_interval = match &agent_spec.sampling_interval { - None => None, - Some(s) => Some(parse_duration(s).context(InvalidSamplingInterval)?), - }; - Ok(Self { id: agent_id, name: agent_spec.name.to_string(), measurement_generators, - sampling_interval, + sampling_interval: Some(sampling_interval), current_datetime, end_datetime, continue_on, diff --git a/iox_data_generator/src/bin/iox_data_generator.rs b/iox_data_generator/src/bin/iox_data_generator.rs index 702e3e5610..d385ea1b3b 100644 --- a/iox_data_generator/src/bin/iox_data_generator.rs +++ b/iox_data_generator/src/bin/iox_data_generator.rs @@ -13,7 +13,12 @@ use chrono::prelude::*; use chrono_english::{parse_date_string, Dialect}; use clap::{crate_authors, crate_version, App, Arg}; -use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder}; +use iox_data_generator::{ + specification::{DataSpec, OrgBucket}, + write::PointsWriterBuilder, +}; +use std::fs::File; +use std::io::{self, BufRead}; use tracing::info; #[tokio::main] @@ -91,6 +96,12 @@ Logging: .help("The bucket name to write to") .takes_value(true), ) + .arg( + Arg::with_name("BUCKET_LIST") + .long("bucket_list") + .help("File name with a list of databases. 1 per line with _ format") + .takes_value(true), + ) .arg( Arg::with_name("TOKEN") .long("token") @@ -176,21 +187,11 @@ Logging: { PointsWriterBuilder::new_file(line_protocol_filename)? } else if let Some(host) = matches.value_of("HOST") { - let (host, org, bucket, token) = validate_api_arguments( - host, - matches.value_of("ORG"), - matches.value_of("BUCKET"), - matches.value_of("TOKEN"), - ); + let token = matches + .value_of("TOKEN") + .expect("--token must be specified"); - PointsWriterBuilder::new_api( - host, - org, - bucket, - token, - matches.value_of("jaeger_debug_header"), - ) - .await? + PointsWriterBuilder::new_api(host, token, matches.value_of("jaeger_debug_header")).await? } else if matches.is_present("PRINT") { PointsWriterBuilder::new_std_out() } else if matches.is_present("NOOP") { @@ -199,8 +200,42 @@ Logging: panic!("One of --print or --output or --host must be provided."); }; + let buckets = match ( + matches.value_of("ORG"), + matches.value_of("BUCKET"), + matches.value_of("BUCKET_LIST"), + ) { + (Some(org), Some(bucket), None) => { + vec![OrgBucket { + org: org.to_string(), + bucket: bucket.to_string(), + }] + } + (None, None, Some(bucket_list)) => { + let f = File::open(bucket_list).expect("unable to open bucket_list file"); + + io::BufReader::new(f) + .lines() + .map(|l| { + let l = l.expect("unable to read line from bucket list"); + let parts = l.split('_').collect::>(); + if parts.len() != 2 { + panic!("error parsing org and bucket from {}", l); + } + + let org = parts[0].to_string(); + let bucket = parts[1].to_string(); + + OrgBucket { org, bucket } + }) + .collect::>() + } + _ => panic!("must specify either --org AND --bucket OR --bucket_list"), + }; + let result = iox_data_generator::generate( &data_spec, + buckets, &mut points_writer_builder, start_datetime, end_datetime, @@ -230,36 +265,6 @@ fn datetime_nanoseconds(arg: Option<&str>, now: DateTime) -> Option }) } -fn validate_api_arguments<'a>( - host: &'a str, - org: Option<&'a str>, - bucket: Option<&'a str>, - token: Option<&'a str>, -) -> (&'a str, &'a str, &'a str, &'a str) { - let mut errors = vec![]; - - if org.is_none() { - errors.push("`--org` is missing"); - } - if bucket.is_none() { - errors.push("`--bucket` is missing"); - } - if token.is_none() { - errors.push("`--token` is missing"); - } - - if errors.is_empty() { - // These `unwrap`s are safe because otherwise errors wouldn't be empty - (host, org.unwrap(), bucket.unwrap(), token.unwrap()) - } else { - panic!( - "When `--host` is specified, `--org`, `--bucket`, and `--token` are required, \ - but {}", - errors.join(", ") - ); - } -} - #[cfg(test)] mod test { use super::*; diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index f744afea4a..26cfb8582c 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -89,6 +89,16 @@ pub enum Error { /// Underlying `tag_set` module error source: tag_set::Error, }, + + /// Error splitting input buckets to agents that write to them + #[snafu(display( + "Error splitting input buckets into agents that write to them: {}", + source + ))] + CouldNotAssignAgents { + /// Underlying `specification` module error + source: specification::Error, + }, } type Result = std::result::Result; @@ -102,6 +112,7 @@ type Result = std::result::Result; #[allow(clippy::too_many_arguments)] pub async fn generate( spec: &specification::DataSpec, + buckets: Vec, points_writer_builder: &mut write::PointsWriterBuilder, start_datetime: Option, end_datetime: Option, @@ -112,45 +123,60 @@ pub async fn generate( ) -> Result { let mut handles = vec![]; + let bucket_agents = spec + .bucket_split_to_agents(&buckets) + .context(CouldNotAssignAgents)?; + let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?; let lock = Arc::new(tokio::sync::Mutex::new(())); let start = std::time::Instant::now(); - let agents = spec - .agents - .iter() - .map(|spec| { - Agent::from_spec( - spec, + for bucket_agents in &bucket_agents { + for agent_assignment in bucket_agents.agent_assignments.iter() { + let agents = Agent::from_spec( + agent_assignment.spec, + agent_assignment.count, + agent_assignment.sampling_interval, start_datetime, end_datetime, execution_start_time, continue_on, &generated_tag_sets, ) - .context(CouldNotCreateAgent) - }) - .collect::>>>()?; - let agents = agents.into_iter().flatten(); + .context(CouldNotCreateAgent)?; - for mut agent in agents { - let agent_points_writer = points_writer_builder - .build_for_agent(&agent.name) - .context(CouldNotCreateAgentWriter)?; + info!( + "Configuring {} agents of \"{}\" to write data to org {} and bucket {}", + agent_assignment.count, + agent_assignment.spec.name, + bucket_agents.org_bucket.org, + bucket_agents.org_bucket.bucket + ); - let lock_ref = Arc::clone(&lock); + for mut agent in agents.into_iter() { + let agent_points_writer = points_writer_builder + .build_for_agent( + &agent_assignment.spec.name, + &bucket_agents.org_bucket.org, + &bucket_agents.org_bucket.bucket, + ) + .context(CouldNotCreateAgentWriter)?; - handles.push(tokio::task::spawn(async move { - // did this weird hack because otherwise the stdout outputs would be jumbled together garbage - if one_agent_at_a_time { - let _l = lock_ref.lock().await; - agent.generate_all(agent_points_writer, batch_size).await - } else { - agent.generate_all(agent_points_writer, batch_size).await + let lock_ref = Arc::clone(&lock); + + handles.push(tokio::task::spawn(async move { + // did this weird hack because otherwise the stdout outputs would be jumbled together garbage + if one_agent_at_a_time { + let _l = lock_ref.lock().await; + agent.generate_all(agent_points_writer, batch_size).await + } else { + agent.generate_all(agent_points_writer, batch_size).await + } + })); } - })); + } } let mut total_points = 0; @@ -190,6 +216,7 @@ mod test { use crate::specification::*; use influxdb2_client::models::WriteDataPoint; use std::str::FromStr; + use std::time::Duration; type Error = Box; type Result = std::result::Result; @@ -201,14 +228,18 @@ name = "demo_schema" [[agents]] name = "foo" -sampling_interval = "10s" # seconds [[agents.measurements]] name = "cpu" [[agents.measurements.fields]] name = "val" -i64_range = [1, 1]"#; +i64_range = [1, 1] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}] +"#; let data_spec = DataSpec::from_str(toml).unwrap(); let agent_spec = &data_spec.agents[0]; @@ -224,6 +255,8 @@ i64_range = [1, 1]"#; let mut agent = agent::Agent::from_spec( agent_spec, + 1, + Duration::from_secs(10), start_datetime, end_datetime, execution_start_time, diff --git a/iox_data_generator/src/measurement.rs b/iox_data_generator/src/measurement.rs index e83c2eec92..bd1cf951b3 100644 --- a/iox_data_generator/src/measurement.rs +++ b/iox_data_generator/src/measurement.rs @@ -545,7 +545,11 @@ mod test { [[agents.measurements.fields]] name = "val" - i64_range = [3, 3]"#, + i64_range = [3, 3] + + [[bucket_writers]] + percent = 1.0 + agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); @@ -604,7 +608,11 @@ mod test { [[agents.measurements.fields]] name = "val" - i64_range = [3, 3]"#, + i64_range = [3, 3] + + [[bucket_writers]] + percent = 1.0 + agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); diff --git a/iox_data_generator/src/specification.rs b/iox_data_generator/src/specification.rs index 05e9173e1b..46bda096c8 100644 --- a/iox_data_generator/src/specification.rs +++ b/iox_data_generator/src/specification.rs @@ -1,11 +1,14 @@ //! Reading and interpreting data generation specifications. +use humantime::parse_duration; use serde::Deserialize; -use snafu::{ResultExt, Snafu}; -use std::{fs, ops::Range, str::FromStr}; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration}; +use tracing::warn; /// Errors that may happen while reading a TOML specification. #[derive(Snafu, Debug)] +#[allow(missing_docs)] pub enum Error { /// File-related error that may happen while reading a specification #[snafu(display(r#"Error reading data spec from TOML file: {}"#, source))] @@ -20,6 +23,15 @@ pub enum Error { /// Underlying TOML error that caused this problem source: toml::de::Error, }, + + #[snafu(display("Sampling interval must be valid string: {}", source))] + InvalidSamplingInterval { source: humantime::DurationError }, + + #[snafu(display( + "Agent {} referenced in bucket_writers, but not present in spec", + agent + ))] + AgentNotFound { agent: String }, } type Result = std::result::Result; @@ -43,8 +55,10 @@ pub struct DataSpec { /// set of tags that appear. #[serde(default)] pub tag_sets: Vec, - /// The specification for the data-generating agents in this data set. + /// The specification for the agents that can be used to write data to databases. pub agents: Vec, + /// The specification for writing to the provided list of buckets. + pub bucket_writers: Vec, } impl DataSpec { @@ -53,6 +67,111 @@ impl DataSpec { let spec_toml = fs::read_to_string(file_name).context(ReadFile)?; Self::from_str(&spec_toml) } + + /// Given a collection of OrgBuckets, assign each a set of agents based on the spec + pub fn bucket_split_to_agents<'a>( + &'a self, + buckets: &'a [OrgBucket], + ) -> Result>> { + let mut bucket_agents = Vec::with_capacity(buckets.len()); + + let mut start = 0; + for w in &self.bucket_writers { + if start >= buckets.len() { + warn!( + "Bucket_writers percentages > 1.0. Writer {:?} and later skipped.", + w + ); + break; + } + + // validate the agents actually exist in the spec + for a in &w.agents { + let _ = self.agent_by_name(&a.name).unwrap_or_else(|| { + panic!( + "agent {} referenced in bucket writers, but isn't in the spec", + a.name + ) + }); + } + + let agents: Vec<_> = w + .agents + .iter() + .map(|a| { + let count = a.count.unwrap_or(1); + let sampling_interval = + parse_duration(&a.sampling_interval).context(InvalidSamplingInterval)?; + let spec = self + .agent_by_name(&a.name) + .context(AgentNotFound { agent: &a.name })?; + + Ok(AgentAssignment { + spec, + count, + sampling_interval, + }) + }) + .collect::>>()?; + let agents = Arc::new(agents); + + let mut end = (buckets.len() as f64 * w.percent).ceil() as usize + start; + if end > buckets.len() { + end = buckets.len(); + } + + for org_bucket in &buckets[start..end] { + bucket_agents.push(BucketAgents { + org_bucket, + agent_assignments: Arc::clone(&agents), + }) + } + + start = end; + } + + Ok(bucket_agents) + } + + /// Get the agent spec by its name + pub fn agent_by_name(&self, name: &str) -> Option<&AgentSpec> { + for a in &self.agents { + if a.name == name { + return Some(a); + } + } + + None + } +} + +#[derive(Debug, Eq, PartialEq)] +/// Specifies an org and a bucket to write data to +pub struct OrgBucket { + /// The organization name + pub org: String, + /// The bucket name + pub bucket: String, +} + +#[derive(Debug)] +/// Assignment info for an agent to a bucket +pub struct AgentAssignment<'a> { + /// The agent specification for writing to the assigned bucket + pub spec: &'a AgentSpec, + /// The number of these agents that should be writing to the bucket + pub count: usize, + /// The sampling interval agents will generate data on + pub sampling_interval: Duration, +} + +#[derive(Debug)] +/// Agent assignments mapped to a bucket +pub struct BucketAgents<'a> { + /// The organization and bucket data will get written to + pub org_bucket: &'a OrgBucket, + /// The agents specifications that will be writing to the bucket + pub agent_assignments: Arc>>, } impl FromStr for DataSpec { @@ -110,6 +229,33 @@ pub struct TagSetsSpec { pub for_each: Vec, } +/// The specification for what should be written to the list of provided org buckets. +/// Buckets will be written to by one or more agents with the given sampling interval and +/// agent count. +#[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct BucketWriterSpec { + /// The percentage of buckets from the provided list that should use these agents. The + /// percent should add up to 1.0 (100%). + pub percent: f64, + /// The agents that should be used to write to these databases. + pub agents: Vec, +} + +/// The specification for the specific configuration of how an agent should write to a database. +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct AgentAssignmentSpec { + /// The name of the `AgentSpec` to use + pub name: String, + /// The number of these agents that should write to the database + pub count: Option, + /// How frequently each agent will write to the database. This is applicable when using the + /// --continue flag. Otherwise, if doing historical backfill, timestamps of generated data + /// will be this far apart and data will be written in as quickly as possible. + pub sampling_interval: String, +} + /// The specification of the behavior of an agent, the entity responsible for /// generating a number of data points according to its configuration. #[derive(Deserialize, Debug)] @@ -118,12 +264,6 @@ pub struct TagSetsSpec { pub struct AgentSpec { /// The name of the agent, which can be referenced in templates with `agent.name`. pub name: String, - /// Specifies the number of agents that should be created with this spec. - /// Default value is 1. - pub count: Option, - /// How often this agent should generate samples, in a duration string. If - /// not specified, this agent will only generate one sample. - pub sampling_interval: Option, /// The specifications for the measurements for the agent to generate. pub measurements: Vec, /// A collection of strings that reference other `Values` collections. Each agent will have one @@ -493,6 +633,10 @@ name = "cpu" [[agents.measurements.fields]] name = "host" template = "server" + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}] "#; let spec = DataSpec::from_str(toml).unwrap(); @@ -513,4 +657,79 @@ template = "server" field_spec ); } + + #[test] + fn split_buckets_by_writer_spec() { + let toml = r#" +name = "demo_schema" + +[[agents]] +name = "foo" +[[agents.measurements]] +name = "cpu" +[[agents.measurements.fields]] +name = "host" +template = "server" + +[[agents]] +name = "bar" +[[agents.measurements]] +name = "whatevs" +[[agents.measurements.fields]] +name = "val" +i64_range = [0, 10] + +[[bucket_writers]] +percent = 0.6 +agents = [{name = "foo", sampling_interval = "10s"}] + +[[bucket_writers]] +percent = 0.4 +agents = [{name = "bar", sampling_interval = "1m", count = 3}] +"#; + let spec = DataSpec::from_str(toml).unwrap(); + let buckets = vec![ + OrgBucket { + org: "a".to_string(), + bucket: "1".to_string(), + }, + OrgBucket { + org: "a".to_string(), + bucket: "2".to_string(), + }, + OrgBucket { + org: "b".to_string(), + bucket: "1".to_string(), + }, + ]; + + let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap(); + + let b = &bucket_agents[0]; + assert_eq!(b.org_bucket, &buckets[0]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &bucket_agents[1]; + assert_eq!(b.org_bucket, &buckets[1]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &bucket_agents[2]; + assert_eq!(b.org_bucket, &buckets[2]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(60) + ); + assert_eq!(b.agent_assignments[0].count, 3); + assert_eq!(b.agent_assignments[0].spec.name, "bar"); + } } diff --git a/iox_data_generator/src/tag_set.rs b/iox_data_generator/src/tag_set.rs index 7b39fc48ed..b7f3a3cd40 100644 --- a/iox_data_generator/src/tag_set.rs +++ b/iox_data_generator/src/tag_set.rs @@ -492,7 +492,10 @@ name = "cpu" [[agents.measurements.fields]] name = "f1" i64_range = [0, 23] -"#; + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); @@ -537,7 +540,10 @@ name = "cpu" [[agents.measurements.fields]] name = "f1" i64_range = [0, 23] -"#; + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); @@ -603,7 +609,10 @@ name = "cpu" [[agents.measurements.fields]] name = "f1" i64_range = [0, 23] -"#; + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 21e3d54f37..c111666937 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -94,11 +94,7 @@ pub struct PointsWriterBuilder { #[derive(Debug)] enum PointsWriterConfig { - Api { - client: influxdb2_client::Client, - org: String, - bucket: String, - }, + Api(influxdb2_client::Client), Directory(PathBuf), NoOp { perform_write: bool, @@ -113,8 +109,6 @@ impl PointsWriterBuilder { /// specified org and bucket. pub async fn new_api( host: impl Into + Send, - org: impl Into + Send, - bucket: impl Into + Send, token: impl Into + Send, jaeger_debug: Option<&str>, ) -> Result { @@ -133,15 +127,9 @@ impl PointsWriterBuilder { if let Some(header) = jaeger_debug { client = client.with_jaeger_debug(header.to_string()); } - let org = org.into(); - let bucket = bucket.into(); Ok(Self { - config: PointsWriterConfig::Api { - client, - org, - bucket, - }, + config: PointsWriterConfig::Api(client), }) } @@ -172,16 +160,17 @@ impl PointsWriterBuilder { /// Create a writer out of this writer's configuration for a particular /// agent that runs in a separate thread/task. - pub fn build_for_agent(&mut self, name: impl Into) -> Result { + pub fn build_for_agent( + &mut self, + name: impl Into, + org: impl Into, + bucket: impl Into, + ) -> Result { let inner_writer = match &mut self.config { - PointsWriterConfig::Api { - client, - org, - bucket, - } => InnerPointsWriter::Api { + PointsWriterConfig::Api(client) => InnerPointsWriter::Api { client: client.clone(), - org: org.clone(), - bucket: bucket.clone(), + org: org.into(), + bucket: bucket.into(), }, PointsWriterConfig::Directory(dir_path) => { let mut filename = dir_path.clone(); @@ -351,7 +340,12 @@ name = "cpu" [[agents.measurements.fields]] name = "val" -i64_range = [3,3]"#; +i64_range = [3,3] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "1s"}] +"#; let data_spec = DataSpec::from_str(toml).unwrap(); let mut points_writer_builder = PointsWriterBuilder::new_vec(); @@ -360,6 +354,10 @@ i64_range = [3,3]"#; generate( &data_spec, + vec![OrgBucket { + org: "foo".to_string(), + bucket: "bar".to_string(), + }], &mut points_writer_builder, Some(now), Some(now), @@ -389,14 +387,18 @@ name = "demo_schema" [[agents]] name = "foo" -sampling_interval = "1s" # seconds [[agents.measurements]] name = "cpu" [[agents.measurements.fields]] name = "val" -i64_range = [2, 2]"#; +i64_range = [2, 2] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "1s"}] +"#; let data_spec = DataSpec::from_str(toml).unwrap(); let mut points_writer_builder = PointsWriterBuilder::new_vec(); @@ -405,6 +407,10 @@ i64_range = [2, 2]"#; generate( &data_spec, + vec![OrgBucket { + org: "foo".to_string(), + bucket: "bar".to_string(), + }], &mut points_writer_builder, Some(now - 1_000_000_000), Some(now), diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml index 91800a8a2a..8513cecdd5 100644 --- a/perf/battery-0/datagen.toml +++ b/perf/battery-0/datagen.toml @@ -2,8 +2,6 @@ name = "example" [[agents]] name = "foo" -count = 3 -sampling_interval = "10s" [[agents.measurements]] name = "cpu" @@ -15,3 +13,6 @@ tag_pairs = [ name = "usage_user" f64_range = [0.0, 100.0] +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", count = 3, sampling_interval = "10s"}] From 2c8d17bea8cae0507261454d43f381af3de112e5 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Wed, 8 Dec 2021 10:29:35 -0500 Subject: [PATCH 2/2] refactor: change percent to ratio in data generator bucket writers --- iox_data_generator/benches/point_generation.rs | 4 ++-- iox_data_generator/schemas/cap-write.toml | 2 +- iox_data_generator/schemas/full_example.toml | 4 ++-- .../schemas/storage_cardinality_example.toml | 2 +- iox_data_generator/schemas/tracing-spec.toml | 2 +- iox_data_generator/src/lib.rs | 2 +- iox_data_generator/src/measurement.rs | 4 ++-- iox_data_generator/src/specification.rs | 14 +++++++------- iox_data_generator/src/tag_set.rs | 6 +++--- iox_data_generator/src/write.rs | 4 ++-- perf/battery-0/datagen.toml | 2 +- 11 files changed, 23 insertions(+), 23 deletions(-) diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index 0b38ef81d9..c84afd8377 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -30,7 +30,7 @@ pub fn single_agent(c: &mut Criterion) { tag_pairs: vec![], }], bucket_writers: vec![BucketWriterSpec { - percent: 1.0, + ratio: 1.0, agents: vec![AgentAssignmentSpec { name: "foo".to_string(), count: None, @@ -154,7 +154,7 @@ name = "gauge" i64_range = [1, 8147240] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "1s", count = 3}] "#).unwrap(); diff --git a/iox_data_generator/schemas/cap-write.toml b/iox_data_generator/schemas/cap-write.toml index d80fa8cc2c..4cbe75042a 100644 --- a/iox_data_generator/schemas/cap-write.toml +++ b/iox_data_generator/schemas/cap-write.toml @@ -3,7 +3,7 @@ name = "cap_write" [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}] [[agents]] diff --git a/iox_data_generator/schemas/full_example.toml b/iox_data_generator/schemas/full_example.toml index 6be2dcd5a4..170e814601 100644 --- a/iox_data_generator/schemas/full_example.toml +++ b/iox_data_generator/schemas/full_example.toml @@ -177,12 +177,12 @@ bool = true # amounts of write load as well as different schemas through specifying different agents. [[bucket_writers]] # 20% of the buckets specified in the --bucket_list file will have these agents writing to them -percent = 0.2 +ratio = 0.2 # for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute. agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}] [[bucket_writers]] # the remaining 80% of the buckeets specified will write using these agents -percent = 0.8 +ratio = 0.8 # we'll only have a single agent of another_example for each database agents = [{name = "another_example", sampling_interval = "1s"}] diff --git a/iox_data_generator/schemas/storage_cardinality_example.toml b/iox_data_generator/schemas/storage_cardinality_example.toml index 5210c6efac..10041f5720 100644 --- a/iox_data_generator/schemas/storage_cardinality_example.toml +++ b/iox_data_generator/schemas/storage_cardinality_example.toml @@ -59,7 +59,7 @@ for_each = [ ] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "sender", sampling_interval = "10s"}] [[agents]] diff --git a/iox_data_generator/schemas/tracing-spec.toml b/iox_data_generator/schemas/tracing-spec.toml index 7b31dc59a7..37b9a9c506 100644 --- a/iox_data_generator/schemas/tracing-spec.toml +++ b/iox_data_generator/schemas/tracing-spec.toml @@ -31,5 +31,5 @@ name = "timing" f64_range = [0.0, 500.0] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "tracing_agent", sampling_interval = "1s"}] diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index 26cfb8582c..a813de36b6 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -237,7 +237,7 @@ name = "val" i64_range = [1, 1] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}] "#; let data_spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/measurement.rs b/iox_data_generator/src/measurement.rs index bd1cf951b3..111c895106 100644 --- a/iox_data_generator/src/measurement.rs +++ b/iox_data_generator/src/measurement.rs @@ -548,7 +548,7 @@ mod test { i64_range = [3, 3] [[bucket_writers]] - percent = 1.0 + ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); @@ -611,7 +611,7 @@ mod test { i64_range = [3, 3] [[bucket_writers]] - percent = 1.0 + ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); diff --git a/iox_data_generator/src/specification.rs b/iox_data_generator/src/specification.rs index 46bda096c8..97ee2242ec 100644 --- a/iox_data_generator/src/specification.rs +++ b/iox_data_generator/src/specification.rs @@ -115,7 +115,7 @@ impl DataSpec { .collect::>>()?; let agents = Arc::new(agents); - let mut end = (buckets.len() as f64 * w.percent).ceil() as usize + start; + let mut end = (buckets.len() as f64 * w.ratio).ceil() as usize + start; if end > buckets.len() { end = buckets.len(); } @@ -235,9 +235,9 @@ pub struct TagSetsSpec { #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct BucketWriterSpec { - /// The percentage of buckets from the provided list that should use these agents. The - /// percent should add up to 1.0 (100%). - pub percent: f64, + /// The ratio of buckets from the provided list that should use these agents. The + /// ratios of the collection of bucket writer specs should add up to 1.0. + pub ratio: f64, /// The agents that should be used to write to these databases. pub agents: Vec, } @@ -635,7 +635,7 @@ name = "host" template = "server" [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}] "#; let spec = DataSpec::from_str(toml).unwrap(); @@ -680,11 +680,11 @@ name = "val" i64_range = [0, 10] [[bucket_writers]] -percent = 0.6 +ratio = 0.6 agents = [{name = "foo", sampling_interval = "10s"}] [[bucket_writers]] -percent = 0.4 +ratio = 0.4 agents = [{name = "bar", sampling_interval = "1m", count = 3}] "#; let spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/tag_set.rs b/iox_data_generator/src/tag_set.rs index b7f3a3cd40..c8db7ab3eb 100644 --- a/iox_data_generator/src/tag_set.rs +++ b/iox_data_generator/src/tag_set.rs @@ -494,7 +494,7 @@ name = "f1" i64_range = [0, 23] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); @@ -542,7 +542,7 @@ name = "f1" i64_range = [0, 23] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); @@ -611,7 +611,7 @@ name = "f1" i64_range = [0, 23] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index c111666937..8852baad33 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -343,7 +343,7 @@ name = "val" i64_range = [3,3] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "1s"}] "#; @@ -396,7 +396,7 @@ name = "val" i64_range = [2, 2] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", sampling_interval = "1s"}] "#; diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml index 8513cecdd5..7026fcfada 100644 --- a/perf/battery-0/datagen.toml +++ b/perf/battery-0/datagen.toml @@ -14,5 +14,5 @@ name = "usage_user" f64_range = [0.0, 100.0] [[bucket_writers]] -percent = 1.0 +ratio = 1.0 agents = [{name = "foo", count = 3, sampling_interval = "10s"}]