diff --git a/Cargo.lock b/Cargo.lock index 31237308e4..6a1262c96c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1725,6 +1725,7 @@ dependencies = [ "influxdb_iox_client", "itertools", "rand", + "regex", "serde", "serde_json", "snafu", diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml index 39329b9b86..9b2effefa5 100644 --- a/iox_data_generator/Cargo.toml +++ b/iox_data_generator/Cargo.toml @@ -17,6 +17,7 @@ influxdb2_client = { path = "../influxdb2_client" } influxdb_iox_client = { path = "../influxdb_iox_client" } itertools = "0.10.0" rand = { version = "0.8.3", features = ["small_rng"] } +regex = "1.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.72" snafu = "0.6.8" diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index c84afd8377..55d67e26a0 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use iox_data_generator::agent::Agent; -use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket}; +use iox_data_generator::specification::{AgentAssignmentSpec, DatabaseWriterSpec}; use iox_data_generator::{ specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec}, tag_set::GeneratedTagSets, @@ -29,8 +29,9 @@ pub fn single_agent(c: &mut Criterion) { has_one: vec![], tag_pairs: vec![], }], - bucket_writers: vec![BucketWriterSpec { - ratio: 1.0, + database_writers: vec![DatabaseWriterSpec { + database_ratio: Some(1.0), + database_regex: None, agents: vec![AgentAssignmentSpec { name: "foo".to_string(), count: None, @@ -55,10 +56,7 @@ pub fn single_agent(c: &mut Criterion) { b.iter(|| { let r = block_on(iox_data_generator::generate( &spec, - vec![OrgBucket { - org: "foo".to_string(), - bucket: "bar".to_string(), - }], + vec!["foo_bar".to_string()], &mut points_writer, start_datetime, end_datetime, @@ -153,8 +151,7 @@ tag_pairs = [ name = "gauge" i64_range = [1, 8147240] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "1s", count = 3}] "#).unwrap(); diff --git a/iox_data_generator/schemas/cap-write.toml b/iox_data_generator/schemas/cap-write.toml index 4cbe75042a..167249877e 100644 --- a/iox_data_generator/schemas/cap-write.toml +++ b/iox_data_generator/schemas/cap-write.toml @@ -2,8 +2,8 @@ # https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write name = "cap_write" -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}] [[agents]] diff --git a/iox_data_generator/schemas/full_example.toml b/iox_data_generator/schemas/full_example.toml index 170e814601..4794bfcd58 100644 --- a/iox_data_generator/schemas/full_example.toml +++ b/iox_data_generator/schemas/full_example.toml @@ -169,20 +169,20 @@ count = 2 name = "f1" bool = true -# bucket_writers specify how to split up the list of supplied buckets to write to. If +# database_writers specify how to split up the list of supplied buckets to write to. If # only a single one is specified via the CLI flags, then you'd want only a single bucket_writer # with a percent of 1.0. # # These make it possible to split up a large list of buckets to write to and send different # amounts of write load as well as different schemas through specifying different agents. 
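# (Editor's illustration, not part of the original schema: assuming ten databases are
# supplied via --database_list, the 0.2 / 0.8 split below assigns the first
# ceil(10 * 0.2) = 2 databases to the first writer's agents and the remaining 8 to the
# second's.)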
-[[bucket_writers]] -# 20% of the buckets specified in the --bucket_list file will have these agents writing to them -ratio = 0.2 -# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute. +[[database_writers]] +# the first 20% of the databases specified in the --database_list file will have these agents writing to them +database_ratio = 0.2 +# for each of those databases, have 3 of the first_agent writing every 10s, and 1 of another_example writing every minute. agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}] -[[bucket_writers]] -# the remaining 80% of the buckeets specified will write using these agents -ratio = 0.8 +[[database_writers]] +# the remaining 80% of the databases specified will write using these agents +database_ratio = 0.8 # we'll only have a single agent of another_example for each database agents = [{name = "another_example", sampling_interval = "1s"}] diff --git a/iox_data_generator/schemas/storage_cardinality_example.toml b/iox_data_generator/schemas/storage_cardinality_example.toml index 10041f5720..8e8682c491 100644 --- a/iox_data_generator/schemas/storage_cardinality_example.toml +++ b/iox_data_generator/schemas/storage_cardinality_example.toml @@ -58,8 +58,8 @@ for_each = [ "partition_id", ] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "sender", sampling_interval = "10s"}] [[agents]] diff --git a/iox_data_generator/schemas/tracing-spec.toml b/iox_data_generator/schemas/tracing-spec.toml index 37b9a9c506..1df620d745 100644 --- a/iox_data_generator/schemas/tracing-spec.toml +++ b/iox_data_generator/schemas/tracing-spec.toml @@ -30,6 +30,6 @@ tag_pairs = [ name = "timing" f64_range = [0.0, 500.0] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "tracing_agent", sampling_interval = "1s"}] diff --git a/iox_data_generator/src/bin/iox_data_generator.rs b/iox_data_generator/src/bin/iox_data_generator.rs index d385ea1b3b..00e2bc4da5 100644 --- a/iox_data_generator/src/bin/iox_data_generator.rs +++ b/iox_data_generator/src/bin/iox_data_generator.rs @@ -13,10 +13,7 @@ use chrono::prelude::*; use chrono_english::{parse_date_string, Dialect}; use clap::{crate_authors, crate_version, App, Arg}; -use iox_data_generator::{ - specification::{DataSpec, OrgBucket}, - write::PointsWriterBuilder, -}; +use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder}; use std::fs::File; use std::io::{self, BufRead}; use tracing::info; @@ -97,8 +94,8 @@ Logging: .takes_value(true), ) .arg( - Arg::with_name("BUCKET_LIST") - .long("bucket_list") + Arg::with_name("DATABASE_LIST") + .long("database_list") .help("File name with a list of databases. 
1 per line with <org>_<bucket> format") .takes_value(true), ) @@ -203,34 +200,20 @@ Logging: let buckets = match ( matches.value_of("ORG"), matches.value_of("BUCKET"), - matches.value_of("BUCKET_LIST"), + matches.value_of("DATABASE_LIST"), ) { (Some(org), Some(bucket), None) => { - vec![OrgBucket { - org: org.to_string(), - bucket: bucket.to_string(), - }] + vec![format!("{}_{}", org, bucket)] } (None, None, Some(bucket_list)) => { - let f = File::open(bucket_list).expect("unable to open bucket_list file"); + let f = File::open(bucket_list).expect("unable to open database_list file"); io::BufReader::new(f) .lines() - .map(|l| { - let l = l.expect("unable to read line from bucket list"); - let parts = l.split('_').collect::<Vec<_>>(); - if parts.len() != 2 { - panic!("error parsing org and bucket from {}", l); - } - - let org = parts[0].to_string(); - let bucket = parts[1].to_string(); - - OrgBucket { org, bucket } - }) + .map(|l| l.expect("unable to read database from database_list file")) .collect::<Vec<_>>() } - _ => panic!("must specify either --org AND --bucket OR --bucket_list"), + _ => panic!("must specify either --org AND --bucket OR --database_list"), }; let result = iox_data_generator::generate( diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index a813de36b6..621ab4ee44 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -112,7 +112,7 @@ type Result<T, E = Error> = std::result::Result<T, E>; #[allow(clippy::too_many_arguments)] pub async fn generate( spec: &specification::DataSpec, - buckets: Vec<OrgBucket>, + databases: Vec<String>, points_writer_builder: &mut write::PointsWriterBuilder, start_datetime: Option<i64>, end_datetime: Option<i64>, @@ -123,8 +123,8 @@ ) -> Result<usize> { let mut handles = vec![]; - let bucket_agents = spec - .bucket_split_to_agents(&buckets) + let database_agents = spec - .database_split_to_agents(&databases) .context(CouldNotAssignAgents)?; let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?; @@ -133,8 +133,10 @@ let start = std::time::Instant::now(); - for bucket_agents in &bucket_agents { - for agent_assignment in bucket_agents.agent_assignments.iter() { + for database_assignments in &database_agents { + let (org, bucket) = org_and_bucket_from_database(database_assignments.database); + + for agent_assignment in database_assignments.agent_assignments.iter() { let agents = Agent::from_spec( agent_assignment.spec, agent_assignment.count, @@ -148,20 +150,16 @@ .context(CouldNotCreateAgent)?; info!( - "Configuring {} agents of \"{}\" to write data to org {} and bucket {}", + "Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})", agent_assignment.count, agent_assignment.spec.name, - bucket_agents.org_bucket.org, - bucket_agents.org_bucket.bucket + org, + bucket, + database_assignments.database, ); for mut agent in agents.into_iter() { let agent_points_writer = points_writer_builder - .build_for_agent( - &agent_assignment.spec.name, - &bucket_agents.org_bucket.org, - &bucket_agents.org_bucket.bucket, - ) + .build_for_agent(&agent_assignment.spec.name, org, bucket) .context(CouldNotCreateAgentWriter)?; let lock_ref = Arc::clone(&lock); @@ -210,6 +208,15 @@ pub fn now_ns() -> i64 { i64::try_from(since_the_epoch.as_nanos()).expect("Time does not fit") } +fn org_and_bucket_from_database(database: &str) -> (&str, &str) { + let parts = database.split('_').collect::<Vec<_>>(); + if parts.len() != 2 { + panic!("error parsing org and bucket from {}", 
database); + } + + (parts[0], parts[1]) +} + #[cfg(test)] mod test { use super::*; @@ -236,8 +243,7 @@ name = "cpu" name = "val" i64_range = [1, 1] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}] "#; let data_spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/measurement.rs b/iox_data_generator/src/measurement.rs index 111c895106..d952fdd493 100644 --- a/iox_data_generator/src/measurement.rs +++ b/iox_data_generator/src/measurement.rs @@ -547,8 +547,7 @@ mod test { name = "val" i64_range = [3, 3] - [[bucket_writers]] - ratio = 1.0 + [[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); @@ -610,8 +609,8 @@ mod test { name = "val" i64_range = [3, 3] - [[bucket_writers]] - ratio = 1.0 + [[database_writers]] + database_ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); diff --git a/iox_data_generator/src/specification.rs b/iox_data_generator/src/specification.rs index 97ee2242ec..64f1829cee 100644 --- a/iox_data_generator/src/specification.rs +++ b/iox_data_generator/src/specification.rs @@ -1,6 +1,7 @@ //! Reading and interpreting data generation specifications. use humantime::parse_duration; +use regex::Regex; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration}; @@ -28,10 +29,21 @@ pub enum Error { InvalidSamplingInterval { source: humantime::DurationError }, #[snafu(display( - "Agent {} referenced in bucket_writers, but not present in spec", + "Agent {} referenced in database_writers, but not present in spec", agent ))] AgentNotFound { agent: String }, + + #[snafu(display("database_writers can only use database_ratio or database_regex, not both"))] + DatabaseWritersConfig, + + #[snafu(display( + "database_writer missing database_regex. If one uses a regex, all others must also use it" + ))] + RegexMissing, + + #[snafu(display("database_writers regex {} failed with error: {}", regex, source))] + RegexCompile { regex: String, source: regex::Error }, } type Result = std::result::Result; @@ -57,8 +69,8 @@ pub struct DataSpec { pub tag_sets: Vec, /// The specification for the agents that can be used to write data to databases. pub agents: Vec, - /// The specification for writing to the provided list of buckets. - pub bucket_writers: Vec, + /// The specification for writing to the provided list of databases. + pub database_writers: Vec, } impl DataSpec { @@ -68,33 +80,25 @@ impl DataSpec { Self::from_str(&spec_toml) } - /// Given a collection of OrgBuckets, assign each a set of agents based on the spec - pub fn bucket_split_to_agents<'a>( + /// Given a collection of database names, assign each a set of agents based on the spec + pub fn database_split_to_agents<'a>( &'a self, - buckets: &'a [OrgBucket], - ) -> Result>> { - let mut bucket_agents = Vec::with_capacity(buckets.len()); + databases: &'a [String], + ) -> Result>> { + let mut database_agents = Vec::with_capacity(databases.len()); let mut start = 0; - for w in &self.bucket_writers { - if start >= buckets.len() { - warn!( - "Bucket_writers percentages > 1.0. Writer {:?} and later skipped.", - w - ); - break; - } - // validate the agents actually exist in the spec - for a in &w.agents { - let _ = self.agent_by_name(&a.name).unwrap_or_else(|| { - panic!( - "agent {} referenced in bucket writers, but isn't in the spec", - a.name - ) - }); + // either all database writers must use regex or none of them can. 
It's either ratio or regex // for assignment let use_ratio = self.database_writers[0].database_regex.is_none(); for b in &self.database_writers { if use_ratio && b.database_regex.is_some() { return DatabaseWritersConfig.fail(); } } for w in &self.database_writers { let agents: Vec<_> = w .agents .iter() @@ -115,22 +119,43 @@ .collect::<Result<Vec<_>>>()?; let agents = Arc::new(agents); - let mut end = (buckets.len() as f64 * w.ratio).ceil() as usize + start; - if end > buckets.len() { - end = buckets.len(); - } + let selected_databases = if use_ratio { + if start >= databases.len() { + warn!( + "database_writers percentages > 1.0. Writer {:?} and later skipped.", + w + ); + break; + } - for org_bucket in &buckets[start..end] { - bucket_agents.push(BucketAgents { - org_bucket, + let mut end = (databases.len() as f64 * w.database_ratio.unwrap_or(1.0)).ceil() + as usize + + start; + if end > databases.len() { + end = databases.len(); + } + + let selected_databases = databases[start..end].iter().collect::<Vec<_>>(); + start = end; + selected_databases + } else { + let p = w.database_regex.as_ref().context(RegexMissing)?; + let re = Regex::new(p).context(RegexCompile { regex: p })?; + databases + .iter() + .filter(|name| re.is_match(name)) + .collect::<Vec<_>>() + }; + + for database in selected_databases { + database_agents.push(DatabaseAgents { + database, agent_assignments: Arc::clone(&agents), }) } - - start = end; } - Ok(bucket_agents) + Ok(database_agents) } /// Get the agent spec by its name @@ -145,32 +170,23 @@ } } -#[derive(Debug, Eq, PartialEq)] -/// Specifies an org and a bucket to write data to -pub struct OrgBucket { - /// The organization name - pub org: String, - /// The bucket name - pub bucket: String, -} #[derive(Debug)] -/// Assignment info for an agent to a bucket +/// Assignment info for an agent to a database pub struct AgentAssignment<'a> { - /// The agent specification for writing to the assigned bucket + /// The agent specification for writing to the assigned database pub spec: &'a AgentSpec, - /// The number of these agents that should be writing to the bucket + /// The number of these agents that should be writing to the database pub count: usize, /// The sampling interval agents will generate data on pub sampling_interval: Duration, } #[derive(Debug)] -/// Agent assignments mapped to a bucket -pub struct BucketAgents<'a> { - /// The organization and bucket data will get written to - pub org_bucket: &'a OrgBucket, - /// The agents specifications that will be writing to the bucket +/// Agent assignments mapped to a database +pub struct DatabaseAgents<'a> { + /// The database data will get written to + pub database: &'a str, + /// The agents specifications that will be writing to the database pub agent_assignments: Arc<Vec<AgentAssignment<'a>>>, } @@ -229,15 +245,28 @@ pub struct TagSetsSpec { pub for_each: Vec<String>, } -/// The specification for what should be written to the list of provided org buckets. -/// Buckets will be written to by one or more agents with the given sampling interval and +/// The specification for what should be written to the list of provided databases. +/// Databases will be written to by one or more agents with the given sampling interval and /// agent count. #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] -pub struct BucketWriterSpec { - /// The ratio of buckets from the provided list that should use these agents. The - /// ratios of the collection of bucket writer specs should add up to 1.0. 
- pub ratio: f64, +pub struct DatabaseWriterSpec { + /// The ratio of databases from the provided list that should use these agents. The + /// ratios of the collection of database_writer specs should add up to 1.0. If ratio + /// is not provided it will default to 1.0 (useful when you specify only a single + /// database_writer). + /// + /// The interval over the provided list of databases is the cumulative sum of the + /// previous ratios to this ratio. So if you have 10 input databases and 3 database_writers + /// with ratios (in order) of `[0.2, 0.4, 0.6]` you would have the input list of + /// 10 databases split into these three based on their index in the list: `[0, 1]`, + /// `[2, 5]`, and `[6, 9]`. The first 2 databases, then the next 4, then the remaining 4 + /// (the ratios sum to more than 1.0, so the last writer is capped at the end of the list). + /// + /// The list isn't shuffled as ratios are applied. + pub database_ratio: Option<f64>, + /// Regex to select databases from the provided list. If regex is used in any one + /// of the database_writers, it must be used for all of them. + pub database_regex: Option<String>, /// The agents that should be used to write to these databases. pub agents: Vec<AgentAssignmentSpec>, } @@ -634,8 +663,8 @@ name = "cpu" name = "host" template = "server" -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}] "#; let spec = DataSpec::from_str(toml).unwrap(); @@ -659,7 +688,7 @@ agents = [{name = "foo", sampling_interval = "10s"}] } #[test] - fn split_buckets_by_writer_spec() { + fn split_databases_by_writer_spec_ratio() { let toml = r#" name = "demo_schema" [[agents]] name = "foo" [[agents.measurements]] name = "cpu" [[agents.measurements.fields]] name = "host" template = "server" [[agents]] name = "bar" [[agents.measurements]] name = "whatevs" [[agents.measurements.fields]] name = "val" i64_range = [0, 10] -[[bucket_writers]] -ratio = 0.6 +[[database_writers]] +database_ratio = 0.6 agents = [{name = "foo", sampling_interval = "10s"}] -[[bucket_writers]] -ratio = 0.4 +[[database_writers]] +database_ratio = 0.4 agents = [{name = "bar", sampling_interval = "1m", count = 3}] "#; let spec = DataSpec::from_str(toml).unwrap(); - let buckets = vec![ - OrgBucket { - org: "a".to_string(), - bucket: "1".to_string(), - }, - OrgBucket { - org: "a".to_string(), - bucket: "2".to_string(), - }, - OrgBucket { - org: "b".to_string(), - bucket: "1".to_string(), - }, - ]; + let databases = vec!["a_1".to_string(), "a_2".to_string(), "b_1".to_string()]; - let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap(); + let database_agents = spec.database_split_to_agents(&databases).unwrap(); - let b = &bucket_agents[0]; - assert_eq!(b.org_bucket, &buckets[0]); + let b = &database_agents[0]; + assert_eq!(b.database, &databases[0]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(10) @@ -714,8 +730,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}] assert_eq!(b.agent_assignments[0].count, 1); assert_eq!(b.agent_assignments[0].spec.name, "foo"); - let b = &bucket_agents[1]; - assert_eq!(b.org_bucket, &buckets[1]); + let b = &database_agents[1]; + assert_eq!(b.database, &databases[1]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(10) @@ -723,8 +739,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}] assert_eq!(b.agent_assignments[0].count, 1); assert_eq!(b.agent_assignments[0].spec.name, "foo"); - let b = &bucket_agents[2]; - assert_eq!(b.org_bucket, &buckets[2]); + let b = &database_agents[2]; + assert_eq!(b.database, &databases[2]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(60) @@ -732,4 +748,152 @@ 
assert_eq!(b.agent_assignments[0].count, 3); assert_eq!(b.agent_assignments[0].spec.name, "bar"); } + + #[test] + fn split_databases_by_writer_spec_regex() { + let toml = r#" +name = "demo_schema" + +[[agents]] +name = "foo" +[[agents.measurements]] +name = "cpu" +[[agents.measurements.fields]] +name = "host" +template = "server" + +[[agents]] +name = "bar" +[[agents.measurements]] +name = "whatevs" +[[agents.measurements.fields]] +name = "val" +i64_range = [0, 10] + +[[database_writers]] +database_regex = "foo.*" +agents = [{name = "foo", sampling_interval = "10s"}] + +[[database_writers]] +database_regex = ".*_bar" +agents = [{name = "bar", sampling_interval = "1m", count = 3}] +"#; + + let spec = DataSpec::from_str(toml).unwrap(); + let databases = vec![ + "foo_1".to_string(), + "foo_2".to_string(), + "asdf_bar".to_string(), + ]; + + let database_agents = spec.database_split_to_agents(&databases).unwrap(); + + let b = &database_agents[0]; + assert_eq!(b.database, &databases[0]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &database_agents[1]; + assert_eq!(b.database, &databases[1]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &database_agents[2]; + assert_eq!(b.database, &databases[2]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(60) + ); + assert_eq!(b.agent_assignments[0].count, 3); + assert_eq!(b.agent_assignments[0].spec.name, "bar"); + } + + #[test] + fn split_databases_by_writer_regex_and_ratio_error() { + let toml = r#" +name = "demo_schema" + +[[agents]] +name = "foo" +[[agents.measurements]] +name = "cpu" +[[agents.measurements.fields]] +name = "host" +template = "server" + +[[agents]] +name = "bar" +[[agents.measurements]] +name = "whatevs" +[[agents.measurements.fields]] +name = "val" +i64_range = [0, 10] + +[[database_writers]] +database_ratio = 0.8 +agents = [{name = "foo", sampling_interval = "10s"}] + +[[database_writers]] +database_regex = "foo.*" +agents = [{name = "bar", sampling_interval = "1m", count = 3}] +"#; + + let spec = DataSpec::from_str(toml).unwrap(); + let databases = vec!["a_1".to_string(), "a_2".to_string(), "b_1".to_string()]; + + let database_agents = spec.database_split_to_agents(&databases); + assert!(matches!( + database_agents.unwrap_err(), + Error::DatabaseWritersConfig + )); + } + + #[test] + fn split_databases_by_writer_ratio_defaults() { + let toml = r#" +name = "demo_schema" + +[[agents]] +name = "foo" +[[agents.measurements]] +name = "cpu" +[[agents.measurements.fields]] +name = "host" +template = "server" + +[[database_writers]] +agents = [{name = "foo", sampling_interval = "10s"}] +"#; + + let spec = DataSpec::from_str(toml).unwrap(); + let databases = vec!["a_1".to_string(), "a_2".to_string()]; + + let database_agents = spec.database_split_to_agents(&databases).unwrap(); + + let b = &database_agents[0]; + assert_eq!(b.database, &databases[0]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &database_agents[1]; + assert_eq!(b.database, &databases[1]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + 
assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + } } diff --git a/iox_data_generator/src/tag_set.rs b/iox_data_generator/src/tag_set.rs index c8db7ab3eb..8e244e9911 100644 --- a/iox_data_generator/src/tag_set.rs +++ b/iox_data_generator/src/tag_set.rs @@ -493,8 +493,7 @@ name = "cpu" name = "f1" i64_range = [0, 23] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); @@ -541,8 +540,7 @@ name = "cpu" name = "f1" i64_range = [0, 23] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); @@ -610,8 +608,8 @@ name = "cpu" name = "f1" i64_range = [0, 23] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 8852baad33..c99e7f3429 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -342,8 +342,7 @@ name = "cpu" name = "val" i64_range = [3,3] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "1s"}] "#; @@ -354,10 +353,7 @@ agents = [{name = "foo", sampling_interval = "1s"}] generate( &data_spec, - vec![OrgBucket { - org: "foo".to_string(), - bucket: "bar".to_string(), - }], + vec!["foo_bar".to_string()], &mut points_writer_builder, Some(now), Some(now), @@ -395,8 +391,7 @@ name = "cpu" name = "val" i64_range = [2, 2] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "1s"}] "#; @@ -407,10 +402,7 @@ agents = [{name = "foo", sampling_interval = "1s"}] generate( &data_spec, - vec![OrgBucket { - org: "foo".to_string(), - bucket: "bar".to_string(), - }], + vec!["foo_bar".to_string()], &mut points_writer_builder, Some(now - 1_000_000_000), Some(now), diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml index 7026fcfada..6bd4f6df64 100644 --- a/perf/battery-0/datagen.toml +++ b/perf/battery-0/datagen.toml @@ -13,6 +13,6 @@ tag_pairs = [ name = "usage_user" f64_range = [0.0, 100.0] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "foo", count = 3, sampling_interval = "10s"}] diff --git a/server/src/database.rs b/server/src/database.rs index c2685c803f..c69ced3fc7 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1061,10 +1061,21 @@ pub enum InitError { /// The Database startup state machine /// /// A Database starts in DatabaseState::Known and advances through the -/// states in sequential order until it reaches Initialized or an error -/// is encountered. +/// non-error states in sequential order until either: +/// +/// 1. It reaches `Initialized` +/// +/// 2. It is reset to `Known` and starts initialization again +/// +/// 3. An error is encountered, in which case it transitions to one of +/// the error states. Most are Terminal (and thus require operator +/// intervention) but some (such as `WriteBufferCreationError`) may +/// resolve after some time and return to the basic initialization sequence +/// (eventually reaching 
`Initialized`) +/// #[derive(Debug, Clone)] enum DatabaseState { + // Basic initialization sequence states: Known(DatabaseStateKnown), DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound), OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded), @@ -1072,12 +1083,23 @@ CatalogLoaded(DatabaseStateCatalogLoaded), Initialized(DatabaseStateInitialized), + // Error states + /// Terminal State DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>), + /// Terminal State NoActiveDatabase(DatabaseStateKnown, Arc<InitError>), + /// Terminal State OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>), + /// Terminal State RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>), + /// Terminal State CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>), + /// Non-Terminal State: There was an error creating a connection to + /// the WriteBuffer, but the connection will be retried. If a + /// connection is successfully created, the database will + /// transition to `Initialized` WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>), + /// Terminal State ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>), }
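For reference, here is a minimal standalone sketch (an editorial addition, not part of the patch) of the interval arithmetic that `database_split_to_agents` applies when `database_ratio` is used. The function name and test values are illustrative assumptions, not the crate's API:

// ratio_intervals is a hypothetical helper mirroring the ratio-split logic above.
fn ratio_intervals(num_databases: usize, ratios: &[f64]) -> Vec<(usize, usize)> {
    let mut intervals = Vec::new();
    let mut start = 0;
    for r in ratios {
        if start >= num_databases {
            break; // ratios have already covered the whole list; later writers get nothing
        }
        // Each writer claims ceil(len * ratio) databases, capped at the end of the list.
        let end = ((num_databases as f64 * r).ceil() as usize + start).min(num_databases);
        intervals.push((start, end)); // half-open [start, end)
        start = end;
    }
    intervals
}

fn main() {
    // Mirrors the DatabaseWriterSpec doc example: 10 databases with ratios
    // [0.2, 0.4, 0.6] split as index ranges [0, 2), [2, 6), [6, 10),
    // i.e. 2, 4, and 4 databases respectively.
    assert_eq!(
        ratio_intervals(10, &[0.2, 0.4, 0.6]),
        vec![(0, 2), (2, 6), (6, 10)]
    );
}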