diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index 5b562ccbeb..0b38ef81d9 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -1,10 +1,12 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use iox_data_generator::agent::Agent; +use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket}; use iox_data_generator::{ specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec}, tag_set::GeneratedTagSets, write::PointsWriterBuilder, }; +use std::time::Duration; pub fn single_agent(c: &mut Criterion) { let spec = DataSpec { @@ -13,8 +15,6 @@ pub fn single_agent(c: &mut Criterion) { tag_sets: vec![], agents: vec![AgentSpec { name: "foo".to_string(), - count: None, - sampling_interval: Some("1s".to_string()), measurements: vec![MeasurementSpec { name: "measurement-1".into(), count: None, @@ -29,6 +29,14 @@ pub fn single_agent(c: &mut Criterion) { has_one: vec![], tag_pairs: vec![], }], + bucket_writers: vec![BucketWriterSpec { + percent: 1.0, + agents: vec![AgentAssignmentSpec { + name: "foo".to_string(), + count: None, + sampling_interval: "1s".to_string(), + }], + }], }; let mut points_writer = PointsWriterBuilder::new_no_op(true); @@ -47,6 +55,10 @@ pub fn single_agent(c: &mut Criterion) { b.iter(|| { let r = block_on(iox_data_generator::generate( &spec, + vec![OrgBucket { + org: "foo".to_string(), + bucket: "bar".to_string(), + }], &mut points_writer, start_datetime, end_datetime, @@ -125,8 +137,6 @@ for_each = [ [[agents]] name = "foo" -# create this many agents -count = 3 [[agents.measurements]] name = "storage_usage_bucket_cardinality" @@ -142,6 +152,10 @@ tag_pairs = [ [[agents.measurements.fields]] name = "gauge" i64_range = [1, 8147240] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "1s", count = 3}] "#).unwrap(); let 
generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); @@ -155,6 +169,8 @@ i64_range = [1, 8147240] let mut agents = Agent::from_spec( &spec.agents[0], + 3, + Duration::from_millis(10), start_datetime, end_datetime, 0, @@ -172,7 +188,7 @@ i64_range = [1, 8147240] group.bench_function("single agent with basic configuration", |b| { b.iter(|| { agent.reset_current_date_time(0); - let points_writer = points_writer.build_for_agent("foo").unwrap(); + let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap(); let r = block_on(agent.generate_all(points_writer, 1)); let n_points = r.expect("Could not generate data"); assert_eq!(n_points, expected_points as usize); diff --git a/iox_data_generator/schemas/cap-write.toml b/iox_data_generator/schemas/cap-write.toml index 8b5cce47d1..d80fa8cc2c 100644 --- a/iox_data_generator/schemas/cap-write.toml +++ b/iox_data_generator/schemas/cap-write.toml @@ -2,10 +2,12 @@ # https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write name = "cap_write" +[[bucket_writers]] +percent = 1.0 +agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}] + [[agents]] name = "telegraf" -count = 3 -sampling_interval = "10s" tag_pairs = [ {key = "host", template = "host-{{agent.id}}"} ] diff --git a/iox_data_generator/schemas/full_example.toml b/iox_data_generator/schemas/full_example.toml index f113691bdd..6be2dcd5a4 100644 --- a/iox_data_generator/schemas/full_example.toml +++ b/iox_data_generator/schemas/full_example.toml @@ -103,14 +103,10 @@ for_each = [ "foo_bar" ] +# Agent specs can be referenced later on by bucket writers, which specify how frequently +# data should be written and by how many different agents. [[agents]] name = "first_agent" -# Create this many agents. Agents are single threaded so the way to get more paralellism is to -# increase the number of agents. -count = 2 -# each agent will generate a sampling of data every 10ms. 
If running from a historical point in -# time, the agents will push data as quickly as they can until they catch up to now. -sampling_interval = "10ms" # if specifying tag_pairs at the agent level, every line that the agent generates will have these # tag pairs added to it. Note that the template has the same helpers as those in value (except for id). # In addition, it has an accessor for the agent id. @@ -171,4 +167,22 @@ count = 2 [[agents.measurements.fields]] name = "f1" -bool = true \ No newline at end of file +bool = true + +# bucket_writers specify how to split up the list of supplied buckets to write to. If +# only a single one is specified via the CLI flags, then you'd want only a single bucket_writer +# with a percent of 1.0. +# +# These make it possible to split up a large list of buckets to write to and send different +# amounts of write load as well as different schemas through specifying different agents. +[[bucket_writers]] +# 20% of the buckets specified in the --bucket_list file will have these agents writing to them +percent = 0.2 +# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute. 
+agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}] + +[[bucket_writers]] +# the remaining 80% of the buckets specified will write using these agents +percent = 0.8 +# we'll only have a single agent of another_example for each database +agents = [{name = "another_example", sampling_interval = "1s"}] diff --git a/iox_data_generator/schemas/storage_cardinality_example.toml b/iox_data_generator/schemas/storage_cardinality_example.toml index 2c6bff9d35..5210c6efac 100644 --- a/iox_data_generator/schemas/storage_cardinality_example.toml +++ b/iox_data_generator/schemas/storage_cardinality_example.toml @@ -58,11 +58,12 @@ for_each = [ "partition_id", ] +[[bucket_writers]] +percent = 1.0 +agents = [{name = "sender", sampling_interval = "10s"}] + [[agents]] name = "sender" -# create this many agents -count = 1 -sampling_interval = "10s" [[agents.measurements]] name = "storage_usage_bucket_cardinality" diff --git a/iox_data_generator/schemas/tracing-spec.toml b/iox_data_generator/schemas/tracing-spec.toml index 2d48c47671..7b31dc59a7 100644 --- a/iox_data_generator/schemas/tracing-spec.toml +++ b/iox_data_generator/schemas/tracing-spec.toml @@ -17,8 +17,6 @@ for_each = ["host", "host.service"] [[agents]] name = "tracing_agent" -count = 1 -sampling_interval = "1s" [[agents.measurements]] name = "traces" @@ -31,3 +29,7 @@ tag_pairs = [ [[agents.measurements.fields]] name = "timing" f64_range = [0.0, 500.0] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "tracing_agent", sampling_interval = "1s"}] diff --git a/iox_data_generator/src/agent.rs b/iox_data_generator/src/agent.rs index d11fcfbbd1..30202e3925 100644 --- a/iox_data_generator/src/agent.rs +++ b/iox_data_generator/src/agent.rs @@ -8,7 +8,6 @@ use crate::{ }; use crate::tag_set::GeneratedTagSets; -use humantime::parse_duration; use serde_json::json; use snafu::{ResultExt, Snafu}; use std::time::{Duration, Instant}; @@ -39,12 +38,6 @@ pub 
enum Error { source: crate::write::Error, }, - #[snafu(display("Sampling interval must be valid string: {}", source))] - InvalidSamplingInterval { - /// Underlying `parse` error - source: humantime::DurationError, - }, - #[snafu(display("Error creating agent tag pairs: {}", source))] CouldNotCreateAgentTagPairs { source: crate::tag_pair::Error }, } @@ -77,17 +70,18 @@ pub struct Agent { impl Agent { /// Create agents that will generate data points according to these /// specs. + #[allow(clippy::too_many_arguments)] pub fn from_spec( agent_spec: &specification::AgentSpec, + count: usize, + sampling_interval: Duration, start_datetime: Option, // in nanoseconds since the epoch, defaults to now end_datetime: Option, // also in nanoseconds since the epoch, defaults to now execution_start_time: i64, continue_on: bool, // If true, run in "continue" mode after historical data is generated generated_tag_sets: &GeneratedTagSets, ) -> Result> { - let agent_count = agent_spec.count.unwrap_or(1); - - let agents: Vec<_> = (1..agent_count + 1) + let agents: Vec<_> = (1..count + 1) .into_iter() .map(|agent_id| { let data = json!({"agent": {"id": agent_id, "name": agent_spec.name}}); @@ -114,17 +108,11 @@ impl Agent { let current_datetime = start_datetime.unwrap_or_else(now_ns); let end_datetime = end_datetime.unwrap_or_else(now_ns); - // Convert to nanoseconds - let sampling_interval = match &agent_spec.sampling_interval { - None => None, - Some(s) => Some(parse_duration(s).context(InvalidSamplingInterval)?), - }; - Ok(Self { id: agent_id, name: agent_spec.name.to_string(), measurement_generators, - sampling_interval, + sampling_interval: Some(sampling_interval), current_datetime, end_datetime, continue_on, diff --git a/iox_data_generator/src/bin/iox_data_generator.rs b/iox_data_generator/src/bin/iox_data_generator.rs index 702e3e5610..d385ea1b3b 100644 --- a/iox_data_generator/src/bin/iox_data_generator.rs +++ b/iox_data_generator/src/bin/iox_data_generator.rs @@ -13,7 +13,12 @@ 
use chrono::prelude::*; use chrono_english::{parse_date_string, Dialect}; use clap::{crate_authors, crate_version, App, Arg}; -use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder}; +use iox_data_generator::{ + specification::{DataSpec, OrgBucket}, + write::PointsWriterBuilder, +}; +use std::fs::File; +use std::io::{self, BufRead}; use tracing::info; #[tokio::main] @@ -91,6 +96,12 @@ Logging: .help("The bucket name to write to") .takes_value(true), ) + .arg( + Arg::with_name("BUCKET_LIST") + .long("bucket_list") + .help("File name with a list of databases. 1 per line with _ format") + .takes_value(true), + ) .arg( Arg::with_name("TOKEN") .long("token") @@ -176,21 +187,11 @@ Logging: { PointsWriterBuilder::new_file(line_protocol_filename)? } else if let Some(host) = matches.value_of("HOST") { - let (host, org, bucket, token) = validate_api_arguments( - host, - matches.value_of("ORG"), - matches.value_of("BUCKET"), - matches.value_of("TOKEN"), - ); + let token = matches + .value_of("TOKEN") + .expect("--token must be specified"); - PointsWriterBuilder::new_api( - host, - org, - bucket, - token, - matches.value_of("jaeger_debug_header"), - ) - .await? + PointsWriterBuilder::new_api(host, token, matches.value_of("jaeger_debug_header")).await? 
} else if matches.is_present("PRINT") { PointsWriterBuilder::new_std_out() } else if matches.is_present("NOOP") { @@ -199,8 +200,42 @@ Logging: panic!("One of --print or --output or --host must be provided."); }; + let buckets = match ( + matches.value_of("ORG"), + matches.value_of("BUCKET"), + matches.value_of("BUCKET_LIST"), + ) { + (Some(org), Some(bucket), None) => { + vec![OrgBucket { + org: org.to_string(), + bucket: bucket.to_string(), + }] + } + (None, None, Some(bucket_list)) => { + let f = File::open(bucket_list).expect("unable to open bucket_list file"); + + io::BufReader::new(f) + .lines() + .map(|l| { + let l = l.expect("unable to read line from bucket list"); + let parts = l.split('_').collect::>(); + if parts.len() != 2 { + panic!("error parsing org and bucket from {}", l); + } + + let org = parts[0].to_string(); + let bucket = parts[1].to_string(); + + OrgBucket { org, bucket } + }) + .collect::>() + } + _ => panic!("must specify either --org AND --bucket OR --bucket_list"), + }; + let result = iox_data_generator::generate( &data_spec, + buckets, &mut points_writer_builder, start_datetime, end_datetime, @@ -230,36 +265,6 @@ fn datetime_nanoseconds(arg: Option<&str>, now: DateTime) -> Option }) } -fn validate_api_arguments<'a>( - host: &'a str, - org: Option<&'a str>, - bucket: Option<&'a str>, - token: Option<&'a str>, -) -> (&'a str, &'a str, &'a str, &'a str) { - let mut errors = vec![]; - - if org.is_none() { - errors.push("`--org` is missing"); - } - if bucket.is_none() { - errors.push("`--bucket` is missing"); - } - if token.is_none() { - errors.push("`--token` is missing"); - } - - if errors.is_empty() { - // These `unwrap`s are safe because otherwise errors wouldn't be empty - (host, org.unwrap(), bucket.unwrap(), token.unwrap()) - } else { - panic!( - "When `--host` is specified, `--org`, `--bucket`, and `--token` are required, \ - but {}", - errors.join(", ") - ); - } -} - #[cfg(test)] mod test { use super::*; diff --git 
a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index f744afea4a..26cfb8582c 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -89,6 +89,16 @@ pub enum Error { /// Underlying `tag_set` module error source: tag_set::Error, }, + + /// Error splitting input buckets to agents that write to them + #[snafu(display( + "Error splitting input buckets into agents that write to them: {}", + source + ))] + CouldNotAssignAgents { + /// Underlying `specification` module error + source: specification::Error, + }, } type Result = std::result::Result; @@ -102,6 +112,7 @@ type Result = std::result::Result; #[allow(clippy::too_many_arguments)] pub async fn generate( spec: &specification::DataSpec, + buckets: Vec, points_writer_builder: &mut write::PointsWriterBuilder, start_datetime: Option, end_datetime: Option, @@ -112,45 +123,60 @@ pub async fn generate( ) -> Result { let mut handles = vec![]; + let bucket_agents = spec + .bucket_split_to_agents(&buckets) + .context(CouldNotAssignAgents)?; + let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?; let lock = Arc::new(tokio::sync::Mutex::new(())); let start = std::time::Instant::now(); - let agents = spec - .agents - .iter() - .map(|spec| { - Agent::from_spec( - spec, + for bucket_agents in &bucket_agents { + for agent_assignment in bucket_agents.agent_assignments.iter() { + let agents = Agent::from_spec( + agent_assignment.spec, + agent_assignment.count, + agent_assignment.sampling_interval, start_datetime, end_datetime, execution_start_time, continue_on, &generated_tag_sets, ) - .context(CouldNotCreateAgent) - }) - .collect::>>>()?; - let agents = agents.into_iter().flatten(); + .context(CouldNotCreateAgent)?; - for mut agent in agents { - let agent_points_writer = points_writer_builder - .build_for_agent(&agent.name) - .context(CouldNotCreateAgentWriter)?; + info!( + "Configuring {} agents of \"{}\" to write data to org {} and bucket {}", + 
agent_assignment.count, + agent_assignment.spec.name, + bucket_agents.org_bucket.org, + bucket_agents.org_bucket.bucket + ); - let lock_ref = Arc::clone(&lock); + for mut agent in agents.into_iter() { + let agent_points_writer = points_writer_builder + .build_for_agent( + &agent_assignment.spec.name, + &bucket_agents.org_bucket.org, + &bucket_agents.org_bucket.bucket, + ) + .context(CouldNotCreateAgentWriter)?; - handles.push(tokio::task::spawn(async move { - // did this weird hack because otherwise the stdout outputs would be jumbled together garbage - if one_agent_at_a_time { - let _l = lock_ref.lock().await; - agent.generate_all(agent_points_writer, batch_size).await - } else { - agent.generate_all(agent_points_writer, batch_size).await + let lock_ref = Arc::clone(&lock); + + handles.push(tokio::task::spawn(async move { + // did this weird hack because otherwise the stdout outputs would be jumbled together garbage + if one_agent_at_a_time { + let _l = lock_ref.lock().await; + agent.generate_all(agent_points_writer, batch_size).await + } else { + agent.generate_all(agent_points_writer, batch_size).await + } + })); } - })); + } } let mut total_points = 0; @@ -190,6 +216,7 @@ mod test { use crate::specification::*; use influxdb2_client::models::WriteDataPoint; use std::str::FromStr; + use std::time::Duration; type Error = Box; type Result = std::result::Result; @@ -201,14 +228,18 @@ name = "demo_schema" [[agents]] name = "foo" -sampling_interval = "10s" # seconds [[agents.measurements]] name = "cpu" [[agents.measurements.fields]] name = "val" -i64_range = [1, 1]"#; +i64_range = [1, 1] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}] +"#; let data_spec = DataSpec::from_str(toml).unwrap(); let agent_spec = &data_spec.agents[0]; @@ -224,6 +255,8 @@ i64_range = [1, 1]"#; let mut agent = agent::Agent::from_spec( agent_spec, + 1, + Duration::from_secs(10), start_datetime, end_datetime, execution_start_time, diff --git 
a/iox_data_generator/src/measurement.rs b/iox_data_generator/src/measurement.rs index e83c2eec92..bd1cf951b3 100644 --- a/iox_data_generator/src/measurement.rs +++ b/iox_data_generator/src/measurement.rs @@ -545,7 +545,11 @@ mod test { [[agents.measurements.fields]] name = "val" - i64_range = [3, 3]"#, + i64_range = [3, 3] + + [[bucket_writers]] + percent = 1.0 + agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); @@ -604,7 +608,11 @@ mod test { [[agents.measurements.fields]] name = "val" - i64_range = [3, 3]"#, + i64_range = [3, 3] + + [[bucket_writers]] + percent = 1.0 + agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); diff --git a/iox_data_generator/src/specification.rs b/iox_data_generator/src/specification.rs index 05e9173e1b..46bda096c8 100644 --- a/iox_data_generator/src/specification.rs +++ b/iox_data_generator/src/specification.rs @@ -1,11 +1,14 @@ //! Reading and interpreting data generation specifications. +use humantime::parse_duration; use serde::Deserialize; -use snafu::{ResultExt, Snafu}; -use std::{fs, ops::Range, str::FromStr}; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration}; +use tracing::warn; /// Errors that may happen while reading a TOML specification. 
#[derive(Snafu, Debug)] +#[allow(missing_docs)] pub enum Error { /// File-related error that may happen while reading a specification #[snafu(display(r#"Error reading data spec from TOML file: {}"#, source))] @@ -20,6 +23,15 @@ pub enum Error { /// Underlying TOML error that caused this problem source: toml::de::Error, }, + + #[snafu(display("Sampling interval must be valid string: {}", source))] + InvalidSamplingInterval { source: humantime::DurationError }, + + #[snafu(display( + "Agent {} referenced in bucket_writers, but not present in spec", + agent + ))] + AgentNotFound { agent: String }, } type Result = std::result::Result; @@ -43,8 +55,10 @@ pub struct DataSpec { /// set of tags that appear. #[serde(default)] pub tag_sets: Vec, - /// The specification for the data-generating agents in this data set. + /// The specification for the agents that can be used to write data to databases. pub agents: Vec, + /// The specification for writing to the provided list of buckets. + pub bucket_writers: Vec, } impl DataSpec { @@ -53,6 +67,111 @@ impl DataSpec { let spec_toml = fs::read_to_string(file_name).context(ReadFile)?; Self::from_str(&spec_toml) } + + /// Given a collection of OrgBuckets, assign each a set of agents based on the spec + pub fn bucket_split_to_agents<'a>( + &'a self, + buckets: &'a [OrgBucket], + ) -> Result>> { + let mut bucket_agents = Vec::with_capacity(buckets.len()); + + let mut start = 0; + for w in &self.bucket_writers { + if start >= buckets.len() { + warn!( + "Bucket_writers percentages > 1.0. 
Writer {:?} and later skipped.", + w + ); + break; + } + + // validate the agents actually exist in the spec + for a in &w.agents { + let _ = self.agent_by_name(&a.name).unwrap_or_else(|| { + panic!( + "agent {} referenced in bucket writers, but isn't in the spec", + a.name + ) + }); + } + + let agents: Vec<_> = w + .agents + .iter() + .map(|a| { + let count = a.count.unwrap_or(1); + let sampling_interval = + parse_duration(&a.sampling_interval).context(InvalidSamplingInterval)?; + let spec = self + .agent_by_name(&a.name) + .context(AgentNotFound { agent: &a.name })?; + + Ok(AgentAssignment { + spec, + count, + sampling_interval, + }) + }) + .collect::>>()?; + let agents = Arc::new(agents); + + let mut end = (buckets.len() as f64 * w.percent).ceil() as usize + start; + if end > buckets.len() { + end = buckets.len(); + } + + for org_bucket in &buckets[start..end] { + bucket_agents.push(BucketAgents { + org_bucket, + agent_assignments: Arc::clone(&agents), + }) + } + + start = end; + } + + Ok(bucket_agents) + } + + /// Get the agent spec by its name + pub fn agent_by_name(&self, name: &str) -> Option<&AgentSpec> { + for a in &self.agents { + if a.name == name { + return Some(a); + } + } + + None + } +} + +#[derive(Debug, Eq, PartialEq)] +/// Specifies an org and a bucket to write data to +pub struct OrgBucket { + /// The organization name + pub org: String, + /// The bucket name + pub bucket: String, +} + +#[derive(Debug)] +/// Assignment info for an agent to a bucket +pub struct AgentAssignment<'a> { + /// The agent specification for writing to the assigned bucket + pub spec: &'a AgentSpec, + /// The number of these agents that should be writing to the bucket + pub count: usize, + /// The sampling interval agents will generate data on + pub sampling_interval: Duration, +} + +#[derive(Debug)] +/// Agent assignments mapped to a bucket +pub struct BucketAgents<'a> { + /// The organization and bucket data will get written to + pub org_bucket: &'a OrgBucket, + /// The 
agents specifications that will be writing to the bucket + pub agent_assignments: Arc>>, } impl FromStr for DataSpec { @@ -110,6 +229,33 @@ pub struct TagSetsSpec { pub for_each: Vec, } +/// The specification for what should be written to the list of provided org buckets. +/// Buckets will be written to by one or more agents with the given sampling interval and +/// agent count. +#[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct BucketWriterSpec { + /// The percentage of buckets from the provided list that should use these agents. The + /// percent should add up to 1.0 (100%). + pub percent: f64, + /// The agents that should be used to write to these databases. + pub agents: Vec, +} + +/// The specification for the specific configuration of how an agent should write to a database. +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct AgentAssignmentSpec { + /// The name of the `AgentSpec` to use + pub name: String, + /// The number of these agents that should write to the database + pub count: Option, + /// How frequently each agent will write to the database. This is applicable when using the + /// --continue flag. Otherwise, if doing historical backfill, timestamps of generated data + /// will be this far apart and data will be written in as quickly as possible. + pub sampling_interval: String, +} + /// The specification of the behavior of an agent, the entity responsible for /// generating a number of data points according to its configuration. #[derive(Deserialize, Debug)] @@ -118,12 +264,6 @@ pub struct TagSetsSpec { pub struct AgentSpec { /// The name of the agent, which can be referenced in templates with `agent.name`. pub name: String, - /// Specifies the number of agents that should be created with this spec. - /// Default value is 1. - pub count: Option, - /// How often this agent should generate samples, in a duration string. If - /// not specified, this agent will only generate one sample. 
- pub sampling_interval: Option, /// The specifications for the measurements for the agent to generate. pub measurements: Vec, /// A collection of strings that reference other `Values` collections. Each agent will have one @@ -493,6 +633,10 @@ name = "cpu" [[agents.measurements.fields]] name = "host" template = "server" + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}] "#; let spec = DataSpec::from_str(toml).unwrap(); @@ -513,4 +657,79 @@ template = "server" field_spec ); } + + #[test] + fn split_buckets_by_writer_spec() { + let toml = r#" +name = "demo_schema" + +[[agents]] +name = "foo" +[[agents.measurements]] +name = "cpu" +[[agents.measurements.fields]] +name = "host" +template = "server" + +[[agents]] +name = "bar" +[[agents.measurements]] +name = "whatevs" +[[agents.measurements.fields]] +name = "val" +i64_range = [0, 10] + +[[bucket_writers]] +percent = 0.6 +agents = [{name = "foo", sampling_interval = "10s"}] + +[[bucket_writers]] +percent = 0.4 +agents = [{name = "bar", sampling_interval = "1m", count = 3}] +"#; + let spec = DataSpec::from_str(toml).unwrap(); + let buckets = vec![ + OrgBucket { + org: "a".to_string(), + bucket: "1".to_string(), + }, + OrgBucket { + org: "a".to_string(), + bucket: "2".to_string(), + }, + OrgBucket { + org: "b".to_string(), + bucket: "1".to_string(), + }, + ]; + + let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap(); + + let b = &bucket_agents[0]; + assert_eq!(b.org_bucket, &buckets[0]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &bucket_agents[1]; + assert_eq!(b.org_bucket, &buckets[1]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &bucket_agents[2]; + 
assert_eq!(b.org_bucket, &buckets[2]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(60) + ); + assert_eq!(b.agent_assignments[0].count, 3); + assert_eq!(b.agent_assignments[0].spec.name, "bar"); + } } diff --git a/iox_data_generator/src/tag_set.rs b/iox_data_generator/src/tag_set.rs index 7b39fc48ed..b7f3a3cd40 100644 --- a/iox_data_generator/src/tag_set.rs +++ b/iox_data_generator/src/tag_set.rs @@ -492,7 +492,10 @@ name = "cpu" [[agents.measurements.fields]] name = "f1" i64_range = [0, 23] -"#; + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); @@ -537,7 +540,10 @@ name = "cpu" [[agents.measurements.fields]] name = "f1" i64_range = [0, 23] -"#; + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); @@ -603,7 +609,10 @@ name = "cpu" [[agents.measurements.fields]] name = "f1" i64_range = [0, 23] -"#; + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap(); diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 21e3d54f37..c111666937 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -94,11 +94,7 @@ pub struct PointsWriterBuilder { #[derive(Debug)] enum PointsWriterConfig { - Api { - client: influxdb2_client::Client, - org: String, - bucket: String, - }, + Api(influxdb2_client::Client), Directory(PathBuf), NoOp { perform_write: bool, @@ -113,8 +109,6 @@ impl PointsWriterBuilder { /// specified org and bucket. 
pub async fn new_api( host: impl Into + Send, - org: impl Into + Send, - bucket: impl Into + Send, token: impl Into + Send, jaeger_debug: Option<&str>, ) -> Result { @@ -133,15 +127,9 @@ impl PointsWriterBuilder { if let Some(header) = jaeger_debug { client = client.with_jaeger_debug(header.to_string()); } - let org = org.into(); - let bucket = bucket.into(); Ok(Self { - config: PointsWriterConfig::Api { - client, - org, - bucket, - }, + config: PointsWriterConfig::Api(client), }) } @@ -172,16 +160,17 @@ impl PointsWriterBuilder { /// Create a writer out of this writer's configuration for a particular /// agent that runs in a separate thread/task. - pub fn build_for_agent(&mut self, name: impl Into) -> Result { + pub fn build_for_agent( + &mut self, + name: impl Into, + org: impl Into, + bucket: impl Into, + ) -> Result { let inner_writer = match &mut self.config { - PointsWriterConfig::Api { - client, - org, - bucket, - } => InnerPointsWriter::Api { + PointsWriterConfig::Api(client) => InnerPointsWriter::Api { client: client.clone(), - org: org.clone(), - bucket: bucket.clone(), + org: org.into(), + bucket: bucket.into(), }, PointsWriterConfig::Directory(dir_path) => { let mut filename = dir_path.clone(); @@ -351,7 +340,12 @@ name = "cpu" [[agents.measurements.fields]] name = "val" -i64_range = [3,3]"#; +i64_range = [3,3] + +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", sampling_interval = "1s"}] +"#; let data_spec = DataSpec::from_str(toml).unwrap(); let mut points_writer_builder = PointsWriterBuilder::new_vec(); @@ -360,6 +354,10 @@ i64_range = [3,3]"#; generate( &data_spec, + vec![OrgBucket { + org: "foo".to_string(), + bucket: "bar".to_string(), + }], &mut points_writer_builder, Some(now), Some(now), @@ -389,14 +387,18 @@ name = "demo_schema" [[agents]] name = "foo" -sampling_interval = "1s" # seconds [[agents.measurements]] name = "cpu" [[agents.measurements.fields]] name = "val" -i64_range = [2, 2]"#; +i64_range = [2, 2] + +[[bucket_writers]] 
+percent = 1.0 +agents = [{name = "foo", sampling_interval = "1s"}] +"#; let data_spec = DataSpec::from_str(toml).unwrap(); let mut points_writer_builder = PointsWriterBuilder::new_vec(); @@ -405,6 +407,10 @@ i64_range = [2, 2]"#; generate( &data_spec, + vec![OrgBucket { + org: "foo".to_string(), + bucket: "bar".to_string(), + }], &mut points_writer_builder, Some(now - 1_000_000_000), Some(now), diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml index 91800a8a2a..8513cecdd5 100644 --- a/perf/battery-0/datagen.toml +++ b/perf/battery-0/datagen.toml @@ -2,8 +2,6 @@ name = "example" [[agents]] name = "foo" -count = 3 -sampling_interval = "10s" [[agents.measurements]] name = "cpu" @@ -15,3 +13,6 @@ tag_pairs = [ name = "usage_user" f64_range = [0.0, 100.0] +[[bucket_writers]] +percent = 1.0 +agents = [{name = "foo", count = 3, sampling_interval = "10s"}]