From 01e86a031ec75acae5dcbc2899600e168c18621c Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Wed, 8 Dec 2021 16:22:26 -0500
Subject: [PATCH 1/4] feat: add regex to bucket writers assignment in data
 generator

This adds the ability to specify a regex to match against database names when
specifying what agents should write to which buckets in the data generator. A
default has also been added for ratio so that it doesn't need to be specified
if only a single database writer is defined.
---
 Cargo.lock                              |   1 +
 iox_data_generator/Cargo.toml           |   1 +
 .../benches/point_generation.rs         |   3 +-
 iox_data_generator/src/specification.rs | 266 ++++++++++++++++--
 4 files changed, 242 insertions(+), 29 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 31237308e4..6a1262c96c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1725,6 +1725,7 @@ dependencies = [
  "influxdb_iox_client",
  "itertools",
  "rand",
+ "regex",
  "serde",
  "serde_json",
  "snafu",
diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml
index 39329b9b86..9b2effefa5 100644
--- a/iox_data_generator/Cargo.toml
+++ b/iox_data_generator/Cargo.toml
@@ -17,6 +17,7 @@ influxdb2_client = { path = "../influxdb2_client" }
 influxdb_iox_client = { path = "../influxdb_iox_client" }
 itertools = "0.10.0"
 rand = { version = "0.8.3", features = ["small_rng"] }
+regex = "1.5"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0.72"
 snafu = "0.6.8"
diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs
index c84afd8377..47422438cc 100644
--- a/iox_data_generator/benches/point_generation.rs
+++ b/iox_data_generator/benches/point_generation.rs
@@ -30,7 +30,8 @@ pub fn single_agent(c: &mut Criterion) {
             tag_pairs: vec![],
         }],
         bucket_writers: vec![BucketWriterSpec {
-            ratio: 1.0,
+            ratio: Some(1.0),
+            regex: None,
             agents: vec![AgentAssignmentSpec {
                 name: "foo".to_string(),
                 count: None,
diff --git a/iox_data_generator/src/specification.rs b/iox_data_generator/src/specification.rs
index 97ee2242ec..99d688369e 100644
--- a/iox_data_generator/src/specification.rs
+++ b/iox_data_generator/src/specification.rs
@@ -1,6 +1,7 @@
 //! Reading and interpreting data generation specifications.
 
 use humantime::parse_duration;
+use regex::Regex;
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration};
@@ -32,6 +33,17 @@ pub enum Error {
         agent
     ))]
     AgentNotFound { agent: String },
+
+    #[snafu(display("bucket_writers can only use ratio or regex, not both"))]
+    BucketWritersConfig,
+
+    #[snafu(display(
+        "bucket_writer missing regex. If one uses a regex, all others must also use it"
+    ))]
+    RegexMissing,
+
+    #[snafu(display("bucket_writers regex {} failed with error: {}", regex, source))]
+    RegexCompile { regex: String, source: regex::Error },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -76,25 +88,10 @@ impl DataSpec {
         let mut bucket_agents = Vec::with_capacity(buckets.len());
         let mut start = 0;
 
+        let use_ratio =
+            self.bucket_writers[0].ratio.is_some() || self.bucket_writers[0].regex.is_none();
+
         for w in &self.bucket_writers {
-            if start >= buckets.len() {
-                warn!(
-                    "Bucket_writers percentages > 1.0. 
Writer {:?} and later skipped.",
-                    w
-                );
-                break;
-            }
-
-            // validate the agents actually exist in the spec
-            for a in &w.agents {
-                let _ = self.agent_by_name(&a.name).unwrap_or_else(|| {
-                    panic!(
-                        "agent {} referenced in bucket writers, but isn't in the spec",
-                        a.name
-                    )
-                });
-            }
-
             let agents: Vec<_> = w
                 .agents
                 .iter()
@@ -115,19 +112,43 @@ impl DataSpec {
                 .collect::<Result<Vec<_>>>()?;
             let agents = Arc::new(agents);
 
-            let mut end = (buckets.len() as f64 * w.ratio).ceil() as usize + start;
-            if end > buckets.len() {
-                end = buckets.len();
-            }
+            let selected_buckets = if use_ratio {
+                if w.regex.is_some() {
+                    return BucketWritersConfig.fail();
+                }
 
-            for org_bucket in &buckets[start..end] {
+                if start >= buckets.len() {
+                    warn!(
+                        "Bucket_writers percentages > 1.0. Writer {:?} and later skipped.",
+                        w
+                    );
+                    break;
+                }
+
+                let mut end =
+                    (buckets.len() as f64 * w.ratio.unwrap_or(1.0)).ceil() as usize + start;
+                if end > buckets.len() {
+                    end = buckets.len();
+                }
+
+                let selected_buckets = &buckets[start..end];
+                start = end;
+                selected_buckets.iter().collect::<Vec<_>>()
+            } else {
+                let p = w.regex.as_ref().context(RegexMissing)?;
+                let re = Regex::new(p).context(RegexCompile { regex: p })?;
+                buckets
+                    .iter()
+                    .filter(|b| re.is_match(&b.database_name()))
+                    .collect::<Vec<_>>()
+            };
+
+            for org_bucket in selected_buckets {
                 bucket_agents.push(BucketAgents {
                     org_bucket,
                     agent_assignments: Arc::clone(&agents),
                 })
             }
-
-            start = end;
         }
 
         Ok(bucket_agents)
@@ -154,6 +175,12 @@ pub struct OrgBucket {
     pub bucket: String,
 }
 
+impl OrgBucket {
+    fn database_name(&self) -> String {
+        format!("{}_{}", self.org, self.bucket)
+    }
+}
+
 #[derive(Debug)]
 /// Assignment info for an agent to a bucket
 pub struct AgentAssignment<'a> {
@@ -236,8 +263,12 @@ pub struct TagSetsSpec {
 #[serde(deny_unknown_fields)]
 pub struct BucketWriterSpec {
     /// The ratio of buckets from the provided list that should use these agents. The
-    /// ratios of the collection of bucket writer specs should add up to 1.0.
-    pub ratio: f64,
+    /// ratios of the collection of bucket writer specs should add up to 1.0. If it
+    /// is not provided, ratio will be 1.0.
+    pub ratio: Option<f64>,
+    /// Regex to select buckets from the provided list. If regex is used in any one
+    /// of the bucket_writers, ratio will be ignored for all. So use either ratio or regex.
+    pub regex: Option<String>,
     /// The agents that should be used to write to these databases. 
pub agents: Vec<AgentAssignmentSpec>,
 }
 
@@ -659,7 +690,7 @@ agents = [{name = "foo", sampling_interval = "10s"}]
     }
 
     #[test]
-    fn split_buckets_by_writer_spec() {
+    fn split_buckets_by_writer_spec_ratio() {
         let toml = r#"
 name = "demo_schema"
 
@@ -732,4 +763,183 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
         assert_eq!(b.agent_assignments[0].count, 3);
         assert_eq!(b.agent_assignments[0].spec.name, "bar");
     }
+
+    #[test]
+    fn split_buckets_by_writer_spec_regex() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[agents]]
+name = "bar"
+[[agents.measurements]]
+name = "whatevs"
+[[agents.measurements.fields]]
+name = "val"
+i64_range = [0, 10]
+
+[[bucket_writers]]
+regex = "foo.*"
+agents = [{name = "foo", sampling_interval = "10s"}]
+
+[[bucket_writers]]
+regex = ".*_bar"
+agents = [{name = "bar", sampling_interval = "1m", count = 3}]
+"#;
+
+        let spec = DataSpec::from_str(toml).unwrap();
+        let buckets = vec![
+            OrgBucket {
+                org: "foo".to_string(),
+                bucket: "1".to_string(),
+            },
+            OrgBucket {
+                org: "foo".to_string(),
+                bucket: "2".to_string(),
+            },
+            OrgBucket {
+                org: "asdf".to_string(),
+                bucket: "bar".to_string(),
+            },
+        ];
+
+        let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap();
+
+        let b = &bucket_agents[0];
+        assert_eq!(b.org_bucket, &buckets[0]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &bucket_agents[1];
+        assert_eq!(b.org_bucket, &buckets[1]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &bucket_agents[2];
+        assert_eq!(b.org_bucket, &buckets[2]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(60)
+        );
+        assert_eq!(b.agent_assignments[0].count, 3);
+        assert_eq!(b.agent_assignments[0].spec.name, "bar");
+    }
+
+    #[test]
+    fn split_buckets_by_writer_regex_and_ratio_error() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[agents]]
+name = "bar"
+[[agents.measurements]]
+name = "whatevs"
+[[agents.measurements.fields]]
+name = "val"
+i64_range = [0, 10]
+
+[[bucket_writers]]
+ratio = 0.8
+agents = [{name = "foo", sampling_interval = "10s"}]
+
+[[bucket_writers]]
+regex = "foo.*"
+agents = [{name = "bar", sampling_interval = "1m", count = 3}]
+"#;
+
+        let spec = DataSpec::from_str(toml).unwrap();
+        let buckets = vec![
+            OrgBucket {
+                org: "a".to_string(),
+                bucket: "1".to_string(),
+            },
+            OrgBucket {
+                org: "a".to_string(),
+                bucket: "2".to_string(),
+            },
+            OrgBucket {
+                org: "b".to_string(),
+                bucket: "1".to_string(),
+            },
+        ];
+
+        let bucket_agents = spec.bucket_split_to_agents(&buckets);
+        assert!(matches!(
+            bucket_agents.unwrap_err(),
+            Error::BucketWritersConfig
+        ));
+    }
+
+    #[test]
+    fn split_buckets_by_writer_ratio_defaults() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[bucket_writers]]
+agents = [{name = "foo", sampling_interval = "10s"}]
+"#;
+
+        let spec = DataSpec::from_str(toml).unwrap();
+        let buckets = vec![
+            OrgBucket {
+                
org: "a".to_string(), + bucket: "1".to_string(), + }, + OrgBucket { + org: "a".to_string(), + bucket: "2".to_string(), + }, + ]; + + let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap(); + + let b = &bucket_agents[0]; + assert_eq!(b.org_bucket, &buckets[0]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + + let b = &bucket_agents[1]; + assert_eq!(b.org_bucket, &buckets[1]); + assert_eq!( + b.agent_assignments[0].sampling_interval, + Duration::from_secs(10) + ); + assert_eq!(b.agent_assignments[0].count, 1); + assert_eq!(b.agent_assignments[0].spec.name, "foo"); + } } From 8c88e1e52c7e1991dc856576d635648dc55f079d Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 9 Dec 2021 13:39:18 -0500 Subject: [PATCH 2/4] refactor: change orgbucket to database in data generator --- .../benches/point_generation.rs | 16 +- iox_data_generator/schemas/cap-write.toml | 4 +- iox_data_generator/schemas/full_example.toml | 16 +- .../schemas/storage_cardinality_example.toml | 4 +- iox_data_generator/schemas/tracing-spec.toml | 4 +- .../src/bin/iox_data_generator.rs | 33 +-- iox_data_generator/src/lib.rs | 36 ++- iox_data_generator/src/measurement.rs | 7 +- iox_data_generator/src/specification.rs | 274 ++++++++---------- iox_data_generator/src/tag_set.rs | 10 +- iox_data_generator/src/write.rs | 16 +- perf/battery-0/datagen.toml | 4 +- 12 files changed, 176 insertions(+), 248 deletions(-) diff --git a/iox_data_generator/benches/point_generation.rs b/iox_data_generator/benches/point_generation.rs index 47422438cc..55d67e26a0 100644 --- a/iox_data_generator/benches/point_generation.rs +++ b/iox_data_generator/benches/point_generation.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use iox_data_generator::agent::Agent; -use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket}; +use iox_data_generator::specification::{AgentAssignmentSpec, DatabaseWriterSpec}; use iox_data_generator::{ specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec}, tag_set::GeneratedTagSets, @@ -29,9 +29,9 @@ pub fn single_agent(c: &mut Criterion) { has_one: vec![], tag_pairs: vec![], }], - bucket_writers: vec![BucketWriterSpec { - ratio: Some(1.0), - regex: None, + database_writers: vec![DatabaseWriterSpec { + database_ratio: Some(1.0), + database_regex: None, agents: vec![AgentAssignmentSpec { name: "foo".to_string(), count: None, @@ -56,10 +56,7 @@ pub fn single_agent(c: &mut Criterion) { b.iter(|| { let r = block_on(iox_data_generator::generate( &spec, - vec![OrgBucket { - org: "foo".to_string(), - bucket: "bar".to_string(), - }], + vec!["foo_bar".to_string()], &mut points_writer, start_datetime, end_datetime, @@ -154,8 +151,7 @@ tag_pairs = [ name = "gauge" i64_range = [1, 8147240] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "1s", count = 3}] "#).unwrap(); diff --git a/iox_data_generator/schemas/cap-write.toml b/iox_data_generator/schemas/cap-write.toml index 4cbe75042a..167249877e 100644 --- a/iox_data_generator/schemas/cap-write.toml +++ b/iox_data_generator/schemas/cap-write.toml @@ -2,8 +2,8 @@ # https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write name = "cap_write" -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = 
"telegraf", count = 3, sampling_interval = "10s"}] [[agents]] diff --git a/iox_data_generator/schemas/full_example.toml b/iox_data_generator/schemas/full_example.toml index 170e814601..4794bfcd58 100644 --- a/iox_data_generator/schemas/full_example.toml +++ b/iox_data_generator/schemas/full_example.toml @@ -169,20 +169,20 @@ count = 2 name = "f1" bool = true -# bucket_writers specify how to split up the list of supplied buckets to write to. If +# database_writers specify how to split up the list of supplied buckets to write to. If # only a single one is specified via the CLI flags, then you'd want only a single bucket_writer # with a percent of 1.0. # # These make it possible to split up a large list of buckets to write to and send different # amounts of write load as well as different schemas through specifying different agents. -[[bucket_writers]] -# 20% of the buckets specified in the --bucket_list file will have these agents writing to them -ratio = 0.2 -# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute. +[[database_writers]] +# the first 20% of the databases specified in the --bucket_list file will have these agents writing to them +database_ratio = 0.2 +# for each of those databases, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute. agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}] -[[bucket_writers]] -# the remaining 80% of the buckeets specified will write using these agents -ratio = 0.8 +[[database_writers]] +# the remaining 80% of the databases specified will write using these agents +database_ratio = 0.8 # we'll only have a single agent of another_example for each database agents = [{name = "another_example", sampling_interval = "1s"}] diff --git a/iox_data_generator/schemas/storage_cardinality_example.toml b/iox_data_generator/schemas/storage_cardinality_example.toml index 10041f5720..8e8682c491 100644 --- a/iox_data_generator/schemas/storage_cardinality_example.toml +++ b/iox_data_generator/schemas/storage_cardinality_example.toml @@ -58,8 +58,8 @@ for_each = [ "partition_id", ] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "sender", sampling_interval = "10s"}] [[agents]] diff --git a/iox_data_generator/schemas/tracing-spec.toml b/iox_data_generator/schemas/tracing-spec.toml index 37b9a9c506..1df620d745 100644 --- a/iox_data_generator/schemas/tracing-spec.toml +++ b/iox_data_generator/schemas/tracing-spec.toml @@ -30,6 +30,6 @@ tag_pairs = [ name = "timing" f64_range = [0.0, 500.0] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "tracing_agent", sampling_interval = "1s"}] diff --git a/iox_data_generator/src/bin/iox_data_generator.rs b/iox_data_generator/src/bin/iox_data_generator.rs index d385ea1b3b..00e2bc4da5 100644 --- a/iox_data_generator/src/bin/iox_data_generator.rs +++ b/iox_data_generator/src/bin/iox_data_generator.rs @@ -13,10 +13,7 @@ use chrono::prelude::*; use chrono_english::{parse_date_string, Dialect}; use clap::{crate_authors, crate_version, App, Arg}; -use iox_data_generator::{ - specification::{DataSpec, OrgBucket}, - write::PointsWriterBuilder, -}; +use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder}; use std::fs::File; use std::io::{self, BufRead}; use tracing::info; @@ -97,8 +94,8 @@ Logging: .takes_value(true), ) .arg( - 
Arg::with_name("BUCKET_LIST") - .long("bucket_list") + Arg::with_name("DATABASE_LIST") + .long("database_list") .help("File name with a list of databases. 1 per line with _ format") .takes_value(true), ) @@ -203,34 +200,20 @@ Logging: let buckets = match ( matches.value_of("ORG"), matches.value_of("BUCKET"), - matches.value_of("BUCKET_LIST"), + matches.value_of("DATABASE_LIST"), ) { (Some(org), Some(bucket), None) => { - vec![OrgBucket { - org: org.to_string(), - bucket: bucket.to_string(), - }] + vec![format!("{}_{}", org, bucket)] } (None, None, Some(bucket_list)) => { - let f = File::open(bucket_list).expect("unable to open bucket_list file"); + let f = File::open(bucket_list).expect("unable to open database_list file"); io::BufReader::new(f) .lines() - .map(|l| { - let l = l.expect("unable to read line from bucket list"); - let parts = l.split('_').collect::>(); - if parts.len() != 2 { - panic!("error parsing org and bucket from {}", l); - } - - let org = parts[0].to_string(); - let bucket = parts[1].to_string(); - - OrgBucket { org, bucket } - }) + .map(|l| l.expect("unable to read database from database_list file")) .collect::>() } - _ => panic!("must specify either --org AND --bucket OR --bucket_list"), + _ => panic!("must specify either --org AND --bucket OR --database_list"), }; let result = iox_data_generator::generate( diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index a813de36b6..621ab4ee44 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -112,7 +112,7 @@ type Result = std::result::Result; #[allow(clippy::too_many_arguments)] pub async fn generate( spec: &specification::DataSpec, - buckets: Vec, + databases: Vec, points_writer_builder: &mut write::PointsWriterBuilder, start_datetime: Option, end_datetime: Option, @@ -123,8 +123,8 @@ pub async fn generate( ) -> Result { let mut handles = vec![]; - let bucket_agents = spec - .bucket_split_to_agents(&buckets) + let database_agents = spec + .database_split_to_agents(&databases) .context(CouldNotAssignAgents)?; let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?; @@ -133,8 +133,10 @@ pub async fn generate( let start = std::time::Instant::now(); - for bucket_agents in &bucket_agents { - for agent_assignment in bucket_agents.agent_assignments.iter() { + for database_assignments in &database_agents { + let (org, bucket) = org_and_bucket_from_database(database_assignments.database); + + for agent_assignment in database_assignments.agent_assignments.iter() { let agents = Agent::from_spec( agent_assignment.spec, agent_assignment.count, @@ -148,20 +150,16 @@ pub async fn generate( .context(CouldNotCreateAgent)?; info!( - "Configuring {} agents of \"{}\" to write data to org {} and bucket {}", + "Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})", database_assignments.database, agent_assignment.count, agent_assignment.spec.name, - bucket_agents.org_bucket.org, - bucket_agents.org_bucket.bucket + org, + bucket, ); for mut agent in agents.into_iter() { let agent_points_writer = points_writer_builder - .build_for_agent( - &agent_assignment.spec.name, - &bucket_agents.org_bucket.org, - &bucket_agents.org_bucket.bucket, - ) + .build_for_agent(&agent_assignment.spec.name, org, bucket) .context(CouldNotCreateAgentWriter)?; let lock_ref = Arc::clone(&lock); @@ -210,6 +208,15 @@ pub fn now_ns() -> i64 { i64::try_from(since_the_epoch.as_nanos()).expect("Time does not fit") } +fn 
org_and_bucket_from_database(database: &str) -> (&str, &str) { + let parts = database.split('_').collect::>(); + if parts.len() != 2 { + panic!("error parsing org and bucket from {}", database); + } + + (parts[0], parts[1]) +} + #[cfg(test)] mod test { use super::*; @@ -236,8 +243,7 @@ name = "cpu" name = "val" i64_range = [1, 1] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}] "#; let data_spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/measurement.rs b/iox_data_generator/src/measurement.rs index 111c895106..d952fdd493 100644 --- a/iox_data_generator/src/measurement.rs +++ b/iox_data_generator/src/measurement.rs @@ -547,8 +547,7 @@ mod test { name = "val" i64_range = [3, 3] - [[bucket_writers]] - ratio = 1.0 + [[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); @@ -610,8 +609,8 @@ mod test { name = "val" i64_range = [3, 3] - [[bucket_writers]] - ratio = 1.0 + [[database_writers]] + database_ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#, ) .unwrap(); diff --git a/iox_data_generator/src/specification.rs b/iox_data_generator/src/specification.rs index 99d688369e..64f1829cee 100644 --- a/iox_data_generator/src/specification.rs +++ b/iox_data_generator/src/specification.rs @@ -29,20 +29,20 @@ pub enum Error { InvalidSamplingInterval { source: humantime::DurationError }, #[snafu(display( - "Agent {} referenced in bucket_writers, but not present in spec", + "Agent {} referenced in database_writers, but not present in spec", agent ))] AgentNotFound { agent: String }, - #[snafu(display("bucket_writers can only use ratio or regex, not both"))] - BucketWritersConfig, + #[snafu(display("database_writers can only use database_ratio or database_regex, not both"))] + DatabaseWritersConfig, #[snafu(display( - "bucket_writer missing regex. If one uses a regex, all others must also use it" + "database_writer missing database_regex. If one uses a regex, all others must also use it" ))] RegexMissing, - #[snafu(display("bucket_writers regex {} failed with error: {}", regex, source))] + #[snafu(display("database_writers regex {} failed with error: {}", regex, source))] RegexCompile { regex: String, source: regex::Error }, } @@ -69,8 +69,8 @@ pub struct DataSpec { pub tag_sets: Vec, /// The specification for the agents that can be used to write data to databases. pub agents: Vec, - /// The specification for writing to the provided list of buckets. - pub bucket_writers: Vec, + /// The specification for writing to the provided list of databases. + pub database_writers: Vec, } impl DataSpec { @@ -80,18 +80,25 @@ impl DataSpec { Self::from_str(&spec_toml) } - /// Given a collection of OrgBuckets, assign each a set of agents based on the spec - pub fn bucket_split_to_agents<'a>( + /// Given a collection of database names, assign each a set of agents based on the spec + pub fn database_split_to_agents<'a>( &'a self, - buckets: &'a [OrgBucket], - ) -> Result>> { - let mut bucket_agents = Vec::with_capacity(buckets.len()); + databases: &'a [String], + ) -> Result>> { + let mut database_agents = Vec::with_capacity(databases.len()); let mut start = 0; - let use_ratio = - self.bucket_writers[0].ratio.is_some() || self.bucket_writers[0].regex.is_none(); - for w in &self.bucket_writers { + // either all database writers must use regex or none of them can. 
It's either ratio or regex
+        // for assignment
+        let use_ratio = self.database_writers[0].database_regex.is_none();
+        for b in &self.database_writers {
+            if use_ratio && b.database_regex.is_some() {
+                return DatabaseWritersConfig.fail();
+            }
+        }
 
-        for w in &self.bucket_writers {
+        for w in &self.database_writers {
             let agents: Vec<_> = w
                 .agents
                 .iter()
@@ -112,46 +119,43 @@ impl DataSpec {
                 .collect::<Result<Vec<_>>>()?;
             let agents = Arc::new(agents);
 
-            let selected_buckets = if use_ratio {
-                if w.regex.is_some() {
-                    return BucketWritersConfig.fail();
-                }
-
-                if start >= buckets.len() {
+            let selected_databases = if use_ratio {
+                if start >= databases.len() {
                     warn!(
-                        "Bucket_writers percentages > 1.0. Writer {:?} and later skipped.",
+                        "database_writers percentages > 1.0. Writer {:?} and later skipped.",
                         w
                     );
                     break;
                 }
 
-                let mut end =
-                    (buckets.len() as f64 * w.ratio.unwrap_or(1.0)).ceil() as usize + start;
-                if end > buckets.len() {
-                    end = buckets.len();
+                let mut end = (databases.len() as f64 * w.database_ratio.unwrap_or(1.0)).ceil()
+                    as usize
+                    + start;
+                if end > databases.len() {
+                    end = databases.len();
                 }
 
-                let selected_buckets = &buckets[start..end];
+                let selected_databases = databases[start..end].iter().collect::<Vec<_>>();
                 start = end;
-                selected_buckets.iter().collect::<Vec<_>>()
+                selected_databases
             } else {
-                let p = w.regex.as_ref().context(RegexMissing)?;
+                let p = w.database_regex.as_ref().context(RegexMissing)?;
                 let re = Regex::new(p).context(RegexCompile { regex: p })?;
-                buckets
+                databases
                     .iter()
-                    .filter(|b| re.is_match(&b.database_name()))
+                    .filter(|name| re.is_match(name))
                     .collect::<Vec<_>>()
             };
 
-            for org_bucket in selected_buckets {
-                bucket_agents.push(BucketAgents {
-                    org_bucket,
+            for database in selected_databases {
+                database_agents.push(DatabaseAgents {
+                    database,
                     agent_assignments: Arc::clone(&agents),
                 })
             }
         }
 
-        Ok(bucket_agents)
+        Ok(database_agents)
     }
 
     /// Get the agent spec by its name
@@ -166,38 +170,23 @@ impl DataSpec {
     }
 }
 
-#[derive(Debug, Eq, PartialEq)]
-/// Specifies an org and a bucket to write data to
-pub struct OrgBucket {
-    /// The organization name
-    pub org: String,
-    /// The bucket name
-    pub bucket: String,
-}
-
-impl OrgBucket {
-    fn database_name(&self) -> String {
-        format!("{}_{}", self.org, self.bucket)
-    }
-}
-
 #[derive(Debug)]
-/// Assignment info for an agent to a bucket
+/// Assignment info for an agent to a database
 pub struct AgentAssignment<'a> {
-    /// The agent specification for writing to the assigned bucket
+    /// The agent specification for writing to the assigned database
     pub spec: &'a AgentSpec,
-    /// The number of these agents that should be writing to the bucket
+    /// The number of these agents that should be writing to the database
     pub count: usize,
     /// The sampling interval agents will generate data on
     pub sampling_interval: Duration,
 }
 
 #[derive(Debug)]
-/// Agent assignments mapped to a bucket
-pub struct BucketAgents<'a> {
-    /// The organization and bucket data will get written to
-    pub org_bucket: &'a OrgBucket,
-    /// The agents specifications that will be writing to the bucket
+/// Agent assignments mapped to a database
+pub struct DatabaseAgents<'a> {
+    /// The database data will get written to
    pub database: &'a str,
+    /// The agents specifications that will be writing to the database
     pub agent_assignments: Arc<Vec<AgentAssignment<'a>>>,
 }
 
-/// The specification for what should be written to the list of provided org buckets.
-/// Buckets will be written to by one or more agents with the given sampling interval and
+/// The specification for what should be written to the list of provided databases.
+/// Databases will be written to by one or more agents with the given sampling interval and
 /// agent count.
 #[derive(Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
-pub struct BucketWriterSpec {
-    /// The ratio of buckets from the provided list that should use these agents. The
-    /// ratios of the collection of bucket writer specs should add up to 1.0. If it
-    /// is not provided, ratio will be 1.0.
-    pub ratio: Option<f64>,
-    /// Regex to select buckets from the provided list. If regex is used in any one
-    /// of the bucket_writers, ratio will be ignored for all. So use either ratio or regex.
-    pub regex: Option<String>,
+pub struct DatabaseWriterSpec {
+    /// The ratio of databases from the provided list that should use these agents. The
+    /// ratios of the collection of database_writer specs should add up to 1.0. If ratio
+    /// is not provided it will default to 1.0 (useful for when you specify only a single
+    /// database_writer).
+    ///
+    /// The interval over the provided list of databases is the cumulative sum of the
+    /// previous ratios to this ratio. So if you have 10 input databases and 3 database_writers
+    /// with ratios (in order) of `[0.2, 0.4, and 0.6]` you would have the input list of
+    /// 10 databases split into these three based on their index in the list: `[0, 1]`,
+    /// `[2, 5]`, and `[6, 9]`. The first 2 databases, then the next 4, then the remaining 4.
+    ///
+    /// The list isn't shuffled as ratios are applied.
+    pub database_ratio: Option<f64>,
+    /// Regex to select databases from the provided list. If regex is used in any one
+    /// of the database_writers, it must be used for all of them.
+    pub database_regex: Option<String>,
     /// The agents that should be used to write to these databases. 
pub agents: Vec<AgentAssignmentSpec>,
 }
 
@@ -665,8 +663,8 @@ name = "cpu"
 name = "host"
 template = "server"
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "foo", sampling_interval = "10s"}]
 "#;
         let spec = DataSpec::from_str(toml).unwrap();
@@ -690,7 +688,7 @@ agents = [{name = "foo", sampling_interval = "10s"}]
     }
 
     #[test]
-    fn split_buckets_by_writer_spec_ratio() {
+    fn split_databases_by_writer_spec_ratio() {
         let toml = r#"
 name = "demo_schema"
 
@@ -710,34 +708,21 @@ name = "whatevs"
 name = "val"
 i64_range = [0, 10]
 
-[[bucket_writers]]
-ratio = 0.6
+[[database_writers]]
+database_ratio = 0.6
 agents = [{name = "foo", sampling_interval = "10s"}]
 
-[[bucket_writers]]
-ratio = 0.4
+[[database_writers]]
+database_ratio = 0.4
 agents = [{name = "bar", sampling_interval = "1m", count = 3}]
 "#;
 
         let spec = DataSpec::from_str(toml).unwrap();
-        let buckets = vec![
-            OrgBucket {
-                org: "a".to_string(),
-                bucket: "1".to_string(),
-            },
-            OrgBucket {
-                org: "a".to_string(),
-                bucket: "2".to_string(),
-            },
-            OrgBucket {
-                org: "b".to_string(),
-                bucket: "1".to_string(),
-            },
-        ];
+        let databases = vec!["a_1".to_string(), "a_2".to_string(), "b_1".to_string()];
 
-        let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap();
+        let database_agents = spec.database_split_to_agents(&databases).unwrap();
 
-        let b = &bucket_agents[0];
-        assert_eq!(b.org_bucket, &buckets[0]);
+        let b = &database_agents[0];
+        assert_eq!(b.database, &databases[0]);
         assert_eq!(
             b.agent_assignments[0].sampling_interval,
             Duration::from_secs(10)
@@ -745,8 +730,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
         assert_eq!(b.agent_assignments[0].count, 1);
         assert_eq!(b.agent_assignments[0].spec.name, "foo");
 
-        let b = &bucket_agents[1];
-        assert_eq!(b.org_bucket, &buckets[1]);
+        let b = &database_agents[1];
+        assert_eq!(b.database, &databases[1]);
         assert_eq!(
             b.agent_assignments[0].sampling_interval,
             Duration::from_secs(10)
@@ -754,8 +739,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
         assert_eq!(b.agent_assignments[0].count, 1);
         assert_eq!(b.agent_assignments[0].spec.name, "foo");
 
-        let b = &bucket_agents[2];
-        assert_eq!(b.org_bucket, &buckets[2]);
+        let b = &database_agents[2];
+        assert_eq!(b.database, &databases[2]);
         assert_eq!(
             b.agent_assignments[0].sampling_interval,
             Duration::from_secs(60)
@@ -765,7 +750,7 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
     }
 
     #[test]
-    fn split_buckets_by_writer_spec_regex() {
+    fn split_databases_by_writer_spec_regex() {
         let toml = r#"
 name = "demo_schema"
 
@@ -785,35 +770,26 @@ name = "whatevs"
 name = "val"
 i64_range = [0, 10]
 
-[[bucket_writers]]
-regex = "foo.*"
+[[database_writers]]
+database_regex = "foo.*"
 agents = [{name = "foo", sampling_interval = "10s"}]
 
-[[bucket_writers]]
-regex = ".*_bar"
+[[database_writers]]
+database_regex = ".*_bar"
 agents = [{name = "bar", sampling_interval = "1m", count = 3}]
 "#;
 
         let spec = DataSpec::from_str(toml).unwrap();
-        let buckets = vec![
-            OrgBucket {
-                org: "foo".to_string(),
-                bucket: "1".to_string(),
-            },
-            OrgBucket {
-                org: "foo".to_string(),
-                bucket: "2".to_string(),
-            },
-            OrgBucket {
-                org: "asdf".to_string(),
-                bucket: "bar".to_string(),
-            },
+        let databases = vec![
+            "foo_1".to_string(),
+            "foo_2".to_string(),
+            "asdf_bar".to_string(),
         ];
 
-        let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap();
+        let database_agents = spec.database_split_to_agents(&databases).unwrap();
 
-        let b = &bucket_agents[0];
+        let b = 
&database_agents[0]; + assert_eq!(b.database, &databases[0]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(10) @@ -821,8 +797,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}] assert_eq!(b.agent_assignments[0].count, 1); assert_eq!(b.agent_assignments[0].spec.name, "foo"); - let b = &bucket_agents[1]; - assert_eq!(b.org_bucket, &buckets[1]); + let b = &database_agents[1]; + assert_eq!(b.database, &databases[1]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(10) @@ -830,8 +806,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}] assert_eq!(b.agent_assignments[0].count, 1); assert_eq!(b.agent_assignments[0].spec.name, "foo"); - let b = &bucket_agents[2]; - assert_eq!(b.org_bucket, &buckets[2]); + let b = &database_agents[2]; + assert_eq!(b.database, &databases[2]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(60) @@ -841,7 +817,7 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}] } #[test] - fn split_buckets_by_writer_regex_and_ratio_error() { + fn split_databases_by_writer_regex_and_ratio_error() { let toml = r#" name = "demo_schema" @@ -861,40 +837,27 @@ name = "whatevs" name = "val" i64_range = [0, 10] -[[bucket_writers]] -ratio = 0.8 +[[database_writers]] +database_ratio = 0.8 agents = [{name = "foo", sampling_interval = "10s"}] -[[bucket_writers]] -regex = "foo.*" +[[database_writers]] +database_regex = "foo.*" agents = [{name = "bar", sampling_interval = "1m", count = 3}] "#; let spec = DataSpec::from_str(toml).unwrap(); - let buckets = vec![ - OrgBucket { - org: "a".to_string(), - bucket: "1".to_string(), - }, - OrgBucket { - org: "a".to_string(), - bucket: "2".to_string(), - }, - OrgBucket { - org: "b".to_string(), - bucket: "1".to_string(), - }, - ]; + let databases = vec!["a_1".to_string(), "a_2".to_string(), "b_1".to_string()]; - let bucket_agents = spec.bucket_split_to_agents(&buckets); + let database_agents = spec.database_split_to_agents(&databases); assert!(matches!( - bucket_agents.unwrap_err(), - Error::BucketWritersConfig + database_agents.unwrap_err(), + Error::DatabaseWritersConfig )); } #[test] - fn split_buckets_by_writer_ratio_defaults() { + fn split_databases_by_writer_ratio_defaults() { let toml = r#" name = "demo_schema" @@ -906,26 +869,17 @@ name = "cpu" name = "host" template = "server" -[[bucket_writers]] +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}] "#; let spec = DataSpec::from_str(toml).unwrap(); - let buckets = vec![ - OrgBucket { - org: "a".to_string(), - bucket: "1".to_string(), - }, - OrgBucket { - org: "a".to_string(), - bucket: "2".to_string(), - }, - ]; + let databases = vec!["a_1".to_string(), "a_2".to_string()]; - let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap(); + let database_agents = spec.database_split_to_agents(&databases).unwrap(); - let b = &bucket_agents[0]; - assert_eq!(b.org_bucket, &buckets[0]); + let b = &database_agents[0]; + assert_eq!(b.database, &databases[0]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(10) @@ -933,8 +887,8 @@ agents = [{name = "foo", sampling_interval = "10s"}] assert_eq!(b.agent_assignments[0].count, 1); assert_eq!(b.agent_assignments[0].spec.name, "foo"); - let b = &bucket_agents[1]; - assert_eq!(b.org_bucket, &buckets[1]); + let b = &database_agents[1]; + assert_eq!(b.database, &databases[1]); assert_eq!( b.agent_assignments[0].sampling_interval, Duration::from_secs(10) diff --git 
a/iox_data_generator/src/tag_set.rs b/iox_data_generator/src/tag_set.rs index c8db7ab3eb..8e244e9911 100644 --- a/iox_data_generator/src/tag_set.rs +++ b/iox_data_generator/src/tag_set.rs @@ -493,8 +493,7 @@ name = "cpu" name = "f1" i64_range = [0, 23] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); @@ -541,8 +540,7 @@ name = "cpu" name = "f1" i64_range = [0, 23] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); @@ -610,8 +608,8 @@ name = "cpu" name = "f1" i64_range = [0, 23] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "foo", sampling_interval = "10s"}]"#; let spec = DataSpec::from_str(toml).unwrap(); diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 8852baad33..c99e7f3429 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -342,8 +342,7 @@ name = "cpu" name = "val" i64_range = [3,3] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "1s"}] "#; @@ -354,10 +353,7 @@ agents = [{name = "foo", sampling_interval = "1s"}] generate( &data_spec, - vec![OrgBucket { - org: "foo".to_string(), - bucket: "bar".to_string(), - }], + vec!["foo_bar".to_string()], &mut points_writer_builder, Some(now), Some(now), @@ -395,8 +391,7 @@ name = "cpu" name = "val" i64_range = [2, 2] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] agents = [{name = "foo", sampling_interval = "1s"}] "#; @@ -407,10 +402,7 @@ agents = [{name = "foo", sampling_interval = "1s"}] generate( &data_spec, - vec![OrgBucket { - org: "foo".to_string(), - bucket: "bar".to_string(), - }], + vec!["foo_bar".to_string()], &mut points_writer_builder, Some(now - 1_000_000_000), Some(now), diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml index 7026fcfada..6bd4f6df64 100644 --- a/perf/battery-0/datagen.toml +++ b/perf/battery-0/datagen.toml @@ -13,6 +13,6 @@ tag_pairs = [ name = "usage_user" f64_range = [0.0, 100.0] -[[bucket_writers]] -ratio = 1.0 +[[database_writers]] +database_ratio = 1.0 agents = [{name = "foo", count = 3, sampling_interval = "10s"}] From b2f7306d5a525df0e1f5ae959194a73bcb661b9a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Dec 2021 14:22:29 -0500 Subject: [PATCH 3/4] docs: Update database startup machine diagram --- server/src/database.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index c2685c803f..a244a38dcf 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1061,10 +1061,21 @@ pub enum InitError { /// The Database startup state machine /// /// A Database starts in DatabaseState::Known and advances through the -/// states in sequential order until it reaches Initialized or an error -/// is encountered. +/// non error states in sequential order until either: +/// +/// 1. It reaches `Initialized` +/// +/// 2. It is reset to `Known` and starts initialization again +/// +/// 3. An error is encountered, in which case it transitions to one of +/// the error states. Most are Terminal (and thus require operator +/// intervention) but some (such as `WriteBufferCreationError` may +/// resolve after some time to the basic initialization sequence +/// (e.g. 
`Initialized`)
+///
 #[derive(Debug, Clone)]
 enum DatabaseState {
+    // Basic initialization sequence states:
     Known(DatabaseStateKnown),
     DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound),
     OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
@@ -1072,12 +1083,23 @@ enum DatabaseState {
     CatalogLoaded(DatabaseStateCatalogLoaded),
     Initialized(DatabaseStateInitialized),
 
+    // Error states
+    /// Terminal State
     DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
+    /// Terminal State
     NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
+    /// Terminal State
     OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
+    /// Terminal State
     RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
+    /// Terminal State
     CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
+    /// Non Terminal State: There was an error creating a connection to
+    /// the WriteBuffer, but the connection will be retried. If a
+    /// connection is successfully created, the database will
+    /// transition to `Initialized`
     WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
+    /// Terminal State
     ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
 }
 

From 191e743ce0d7c21f7f37f898a49b05e049d1f2f7 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 9 Dec 2021 15:04:39 -0500
Subject: [PATCH 4/4] fix: Update server/src/database.rs

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
---
 server/src/database.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/database.rs b/server/src/database.rs
index a244a38dcf..c69ced3fc7 100644
--- a/server/src/database.rs
+++ b/server/src/database.rs
@@ -1069,7 +1069,7 @@ pub enum InitError {
 ///
 /// 3. An error is encountered, in which case it transitions to one of
 /// the error states. Most are Terminal (and thus require operator
-/// intervention) but some (such as `WriteBufferCreationError` may
+/// intervention) but some (such as `WriteBufferCreationError`) may
 /// resolve after some time to the basic initialization sequence
 /// (e.g. `Initialized`)
 ///
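
Illustrative example (not part of the patch series above): a minimal, hypothetical
spec sketching how the regex-based writer assignment added in PATCH 1/4 and renamed
in PATCH 2/4 could be used. The spec name, agent, measurement, and field names below
are invented; the keys (database_regex, database_ratio, agents, name,
sampling_interval, count) and the --database_list flag come from the diffs above,
which expect one <org>_<bucket> database name per line.

  # hypothetical-spec.toml -- assumes the post-PATCH-2/4 TOML schema
  name = "regex_assignment_example"

  [[agents]]
  name = "cpu_agent"

  [[agents.measurements]]
  name = "cpu"

  [[agents.measurements.fields]]
  name = "usage_user"
  f64_range = [0.0, 100.0]

  # If any database_writer uses database_regex, all of them must. Each writer is
  # applied to every database whose <org>_<bucket> name matches its regex;
  # databases matching no regex are simply not written to.
  [[database_writers]]
  database_regex = "prod_.*"
  agents = [{name = "cpu_agent", count = 3, sampling_interval = "1s"}]

  [[database_writers]]
  database_regex = "staging_.*"
  agents = [{name = "cpu_agent", sampling_interval = "1m"}]

This would be driven with --database_list pointing at a file containing lines such
as prod_telemetry and staging_telemetry (the spec-file flag itself is not shown in
these diffs). Omitting database_regex from every database_writer falls back to
ratio-based assignment instead, where a missing database_ratio defaults to 1.0.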