Merge branch 'main' into ntran/test_compact_os_delete_propagation
commit 2e3aa2e1ab

@@ -1725,6 +1725,7 @@ dependencies = [
  "influxdb_iox_client",
  "itertools",
  "rand",
+ "regex",
  "serde",
  "serde_json",
  "snafu",

@@ -17,6 +17,7 @@ influxdb2_client = { path = "../influxdb2_client" }
 influxdb_iox_client = { path = "../influxdb_iox_client" }
 itertools = "0.10.0"
 rand = { version = "0.8.3", features = ["small_rng"] }
+regex = "1.5"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0.72"
 snafu = "0.6.8"

@@ -1,6 +1,6 @@
 use criterion::{criterion_group, criterion_main, Criterion, Throughput};
 use iox_data_generator::agent::Agent;
-use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket};
+use iox_data_generator::specification::{AgentAssignmentSpec, DatabaseWriterSpec};
 use iox_data_generator::{
     specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec},
     tag_set::GeneratedTagSets,

@@ -29,8 +29,9 @@ pub fn single_agent(c: &mut Criterion) {
                 has_one: vec![],
                 tag_pairs: vec![],
             }],
-        database_writers: vec![BucketWriterSpec {
-            ratio: 1.0,
+        database_writers: vec![DatabaseWriterSpec {
+            database_ratio: Some(1.0),
+            database_regex: None,
             agents: vec![AgentAssignmentSpec {
                 name: "foo".to_string(),
                 count: None,

@@ -55,10 +56,7 @@ pub fn single_agent(c: &mut Criterion) {
         b.iter(|| {
             let r = block_on(iox_data_generator::generate(
                 &spec,
-                vec![OrgBucket {
-                    org: "foo".to_string(),
-                    bucket: "bar".to_string(),
-                }],
+                vec!["foo_bar".to_string()],
                 &mut points_writer,
                 start_datetime,
                 end_datetime,

@@ -153,8 +151,7 @@ tag_pairs = [
 name = "gauge"
 i64_range = [1, 8147240]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "1s", count = 3}]
 "#).unwrap();
 

@@ -2,8 +2,8 @@
 # https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write
 name = "cap_write"
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}]
 
 [[agents]]

@@ -169,20 +169,20 @@ count = 2
 name = "f1"
 bool = true
 
-# bucket_writers specify how to split up the list of supplied buckets to write to. If
+# database_writers specify how to split up the list of supplied buckets to write to. If
 # only a single one is specified via the CLI flags, then you'd want only a single bucket_writer
 # with a percent of 1.0.
 #
 # These make it possible to split up a large list of buckets to write to and send different
 # amounts of write load as well as different schemas through specifying different agents.
-[[bucket_writers]]
-# 20% of the buckets specified in the --bucket_list file will have these agents writing to them
-ratio = 0.2
-# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute.
+[[database_writers]]
+# the first 20% of the databases specified in the --bucket_list file will have these agents writing to them
+database_ratio = 0.2
+# for each of those databases, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute.
 agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}]
 
-[[bucket_writers]]
-# the remaining 80% of the buckeets specified will write using these agents
-ratio = 0.8
+[[database_writers]]
+# the remaining 80% of the databases specified will write using these agents
+database_ratio = 0.8
 # we'll only have a single agent of another_example for each database
 agents = [{name = "another_example", sampling_interval = "1s"}]

@@ -58,8 +58,8 @@ for_each = [
     "partition_id",
 ]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "sender", sampling_interval = "10s"}]
 
 [[agents]]

@@ -30,6 +30,6 @@ tag_pairs = [
 name = "timing"
 f64_range = [0.0, 500.0]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "tracing_agent", sampling_interval = "1s"}]

@@ -13,10 +13,7 @@
 use chrono::prelude::*;
 use chrono_english::{parse_date_string, Dialect};
 use clap::{crate_authors, crate_version, App, Arg};
-use iox_data_generator::{
-    specification::{DataSpec, OrgBucket},
-    write::PointsWriterBuilder,
-};
+use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder};
 use std::fs::File;
 use std::io::{self, BufRead};
 use tracing::info;

@@ -97,8 +94,8 @@ Logging:
                 .takes_value(true),
         )
         .arg(
-            Arg::with_name("BUCKET_LIST")
-                .long("bucket_list")
+            Arg::with_name("DATABASE_LIST")
+                .long("database_list")
                 .help("File name with a list of databases. 1 per line with <org>_<bucket> format")
                 .takes_value(true),
         )

@@ -203,34 +200,20 @@ Logging:
     let buckets = match (
         matches.value_of("ORG"),
         matches.value_of("BUCKET"),
-        matches.value_of("BUCKET_LIST"),
+        matches.value_of("DATABASE_LIST"),
     ) {
         (Some(org), Some(bucket), None) => {
-            vec![OrgBucket {
-                org: org.to_string(),
-                bucket: bucket.to_string(),
-            }]
+            vec![format!("{}_{}", org, bucket)]
         }
         (None, None, Some(bucket_list)) => {
-            let f = File::open(bucket_list).expect("unable to open bucket_list file");
+            let f = File::open(bucket_list).expect("unable to open database_list file");
 
             io::BufReader::new(f)
                 .lines()
-                .map(|l| {
-                    let l = l.expect("unable to read line from bucket list");
-                    let parts = l.split('_').collect::<Vec<_>>();
-                    if parts.len() != 2 {
-                        panic!("error parsing org and bucket from {}", l);
-                    }
-
-                    let org = parts[0].to_string();
-                    let bucket = parts[1].to_string();
-
-                    OrgBucket { org, bucket }
-                })
+                .map(|l| l.expect("unable to read database from database_list file"))
                 .collect::<Vec<_>>()
         }
-        _ => panic!("must specify either --org AND --bucket OR --bucket_list"),
+        _ => panic!("must specify either --org AND --bucket OR --database_list"),
     };
 
     let result = iox_data_generator::generate(

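For reference, the new `--database_list` flag reads a plain-text file with one database name per line in `<org>_<bucket>` form; the names below are hypothetical, chosen only to illustrate the format:

```
myorg_mybucket
myorg_otherbucket
otherorg_somebucket
```

When `--org` and `--bucket` are passed instead, the single database name is assembled the same way via `format!("{}_{}", org, bucket)`.
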
@@ -112,7 +112,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
 #[allow(clippy::too_many_arguments)]
 pub async fn generate(
     spec: &specification::DataSpec,
-    buckets: Vec<specification::OrgBucket>,
+    databases: Vec<String>,
     points_writer_builder: &mut write::PointsWriterBuilder,
     start_datetime: Option<i64>,
     end_datetime: Option<i64>,

@@ -123,8 +123,8 @@ pub async fn generate(
 ) -> Result<usize> {
     let mut handles = vec![];
 
-    let bucket_agents = spec
-        .bucket_split_to_agents(&buckets)
+    let database_agents = spec
+        .database_split_to_agents(&databases)
         .context(CouldNotAssignAgents)?;
 
     let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?;

@@ -133,8 +133,10 @@ pub async fn generate(
 
     let start = std::time::Instant::now();
 
-    for bucket_agents in &bucket_agents {
-        for agent_assignment in bucket_agents.agent_assignments.iter() {
+    for database_assignments in &database_agents {
+        let (org, bucket) = org_and_bucket_from_database(database_assignments.database);
+
+        for agent_assignment in database_assignments.agent_assignments.iter() {
             let agents = Agent::from_spec(
                 agent_assignment.spec,
                 agent_assignment.count,

@@ -148,20 +150,16 @@ pub async fn generate(
             .context(CouldNotCreateAgent)?;
 
             info!(
-                "Configuring {} agents of \"{}\" to write data to org {} and bucket {}",
+                "Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})", database_assignments.database,
                 agent_assignment.count,
                 agent_assignment.spec.name,
-                bucket_agents.org_bucket.org,
-                bucket_agents.org_bucket.bucket
+                org,
+                bucket,
             );
 
             for mut agent in agents.into_iter() {
                 let agent_points_writer = points_writer_builder
-                    .build_for_agent(
-                        &agent_assignment.spec.name,
-                        &bucket_agents.org_bucket.org,
-                        &bucket_agents.org_bucket.bucket,
-                    )
+                    .build_for_agent(&agent_assignment.spec.name, org, bucket)
                     .context(CouldNotCreateAgentWriter)?;
 
                 let lock_ref = Arc::clone(&lock);

@@ -210,6 +208,15 @@ pub fn now_ns() -> i64 {
     i64::try_from(since_the_epoch.as_nanos()).expect("Time does not fit")
 }
 
+fn org_and_bucket_from_database(database: &str) -> (&str, &str) {
+    let parts = database.split('_').collect::<Vec<_>>();
+    if parts.len() != 2 {
+        panic!("error parsing org and bucket from {}", database);
+    }
+
+    (parts[0], parts[1])
+}
+
 #[cfg(test)]
 mod test {
     use super::*;

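To make the helper's contract concrete, a short hedged sketch (hypothetical inputs, not part of the commit): the name is split on `'_'` and exactly two parts are required.

```rust
// Hypothetical usage of the helper added above:
assert_eq!(
    org_and_bucket_from_database("myorg_mybucket"),
    ("myorg", "mybucket")
);
// A name with more than one underscore, e.g. "my_org_bucket", splits into
// three parts, so the helper panics with
// "error parsing org and bucket from my_org_bucket".
```
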
@@ -236,8 +243,7 @@ name = "cpu"
 name = "val"
 i64_range = [1, 1]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "10s"}]
 "#;
         let data_spec = DataSpec::from_str(toml).unwrap();

@@ -547,8 +547,7 @@ mod test {
 name = "val"
 i64_range = [3, 3]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "10s"}]"#,
         )
         .unwrap();

@@ -610,8 +609,8 @@ mod test {
 name = "val"
 i64_range = [3, 3]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "foo", sampling_interval = "10s"}]"#,
         )
         .unwrap();

@@ -1,6 +1,7 @@
 //! Reading and interpreting data generation specifications.
 
 use humantime::parse_duration;
+use regex::Regex;
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration};

@@ -28,10 +29,21 @@ pub enum Error {
     InvalidSamplingInterval { source: humantime::DurationError },
 
     #[snafu(display(
-        "Agent {} referenced in bucket_writers, but not present in spec",
+        "Agent {} referenced in database_writers, but not present in spec",
         agent
     ))]
     AgentNotFound { agent: String },
+
+    #[snafu(display("database_writers can only use database_ratio or database_regex, not both"))]
+    DatabaseWritersConfig,
+
+    #[snafu(display(
+        "database_writer missing database_regex. If one uses a regex, all others must also use it"
+    ))]
+    RegexMissing,
+
+    #[snafu(display("database_writers regex {} failed with error: {}", regex, source))]
+    RegexCompile { regex: String, source: regex::Error },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;

@@ -57,8 +69,8 @@ pub struct DataSpec {
     pub tag_sets: Vec<TagSetsSpec>,
     /// The specification for the agents that can be used to write data to databases.
     pub agents: Vec<AgentSpec>,
-    /// The specification for writing to the provided list of buckets.
-    pub bucket_writers: Vec<BucketWriterSpec>,
+    /// The specification for writing to the provided list of databases.
+    pub database_writers: Vec<DatabaseWriterSpec>,
 }
 
 impl DataSpec {

@@ -68,33 +80,25 @@ impl DataSpec {
         Self::from_str(&spec_toml)
     }
 
-    /// Given a collection of OrgBuckets, assign each a set of agents based on the spec
-    pub fn bucket_split_to_agents<'a>(
+    /// Given a collection of database names, assign each a set of agents based on the spec
+    pub fn database_split_to_agents<'a>(
         &'a self,
-        buckets: &'a [OrgBucket],
-    ) -> Result<Vec<BucketAgents<'a>>> {
-        let mut bucket_agents = Vec::with_capacity(buckets.len());
+        databases: &'a [String],
+    ) -> Result<Vec<DatabaseAgents<'a>>> {
+        let mut database_agents = Vec::with_capacity(databases.len());
 
         let mut start = 0;
-        for w in &self.bucket_writers {
-            if start >= buckets.len() {
-                warn!(
-                    "Bucket_writers percentages > 1.0. Writer {:?} and later skipped.",
-                    w
-                );
-                break;
-            }
 
-            // validate the agents actually exist in the spec
-            for a in &w.agents {
-                let _ = self.agent_by_name(&a.name).unwrap_or_else(|| {
-                    panic!(
-                        "agent {} referenced in bucket writers, but isn't in the spec",
-                        a.name
-                    )
-                });
-            }
+        // either all database writers must use regex or none of them can. It's either ratio or regex
+        // for assignment
+        let use_ratio = self.database_writers[0].database_regex.is_none();
+        for b in &self.database_writers {
+            if use_ratio && b.database_regex.is_some() {
+                return DatabaseWritersConfig.fail();
+            }
+        }
 
+        for w in &self.database_writers {
             let agents: Vec<_> = w
                 .agents
                 .iter()

@@ -115,22 +119,43 @@ impl DataSpec {
                 .collect::<Result<Vec<_>>>()?;
             let agents = Arc::new(agents);
 
-            let mut end = (buckets.len() as f64 * w.ratio).ceil() as usize + start;
-            if end > buckets.len() {
-                end = buckets.len();
-            }
+            let selected_databases = if use_ratio {
+                if start >= databases.len() {
+                    warn!(
+                        "database_writers percentages > 1.0. Writer {:?} and later skipped.",
+                        w
+                    );
+                    break;
+                }
 
-            for org_bucket in &buckets[start..end] {
-                bucket_agents.push(BucketAgents {
-                    org_bucket,
+                let mut end = (databases.len() as f64 * w.database_ratio.unwrap_or(1.0)).ceil()
+                    as usize
+                    + start;
+                if end > databases.len() {
+                    end = databases.len();
+                }
+
+                let selected_databases = databases[start..end].iter().collect::<Vec<_>>();
+                start = end;
+                selected_databases
+            } else {
+                let p = w.database_regex.as_ref().context(RegexMissing)?;
+                let re = Regex::new(p).context(RegexCompile { regex: p })?;
+                databases
+                    .iter()
+                    .filter(|name| re.is_match(name))
+                    .collect::<Vec<_>>()
+            };
+
+            for database in selected_databases {
+                database_agents.push(DatabaseAgents {
+                    database,
                     agent_assignments: Arc::clone(&agents),
                 })
             }
-
-            start = end;
         }
 
-        Ok(bucket_agents)
+        Ok(database_agents)
     }
 
     /// Get the agent spec by its name

@@ -145,32 +170,23 @@ impl DataSpec {
         }
     }
 }
 
-#[derive(Debug, Eq, PartialEq)]
-/// Specifies an org and a bucket to write data to
-pub struct OrgBucket {
-    /// The organization name
-    pub org: String,
-    /// The bucket name
-    pub bucket: String,
-}
-
 #[derive(Debug)]
-/// Assignment info for an agent to a bucket
+/// Assignment info for an agent to a database
 pub struct AgentAssignment<'a> {
-    /// The agent specification for writing to the assigned bucket
+    /// The agent specification for writing to the assigned database
     pub spec: &'a AgentSpec,
-    /// The number of these agents that should be writing to the bucket
+    /// The number of these agents that should be writing to the database
     pub count: usize,
     /// The sampling interval agents will generate data on
     pub sampling_interval: Duration,
 }
 
 #[derive(Debug)]
-/// Agent assignments mapped to a bucket
-pub struct BucketAgents<'a> {
-    /// The organization and bucket data will get written to
-    pub org_bucket: &'a OrgBucket,
-    /// The agents specifications that will be writing to the bucket
+/// Agent assignments mapped to a database
+pub struct DatabaseAgents<'a> {
+    /// The database data will get written to
+    pub database: &'a str,
+    /// The agents specifications that will be writing to the database
     pub agent_assignments: Arc<Vec<AgentAssignment<'a>>>,
 }

@@ -229,15 +245,28 @@ pub struct TagSetsSpec {
     pub for_each: Vec<String>,
 }
 
-/// The specification for what should be written to the list of provided org buckets.
-/// Buckets will be written to by one or more agents with the given sampling interval and
+/// The specification for what should be written to the list of provided databases.
+/// Databases will be written to by one or more agents with the given sampling interval and
 /// agent count.
 #[derive(Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
-pub struct BucketWriterSpec {
-    /// The ratio of buckets from the provided list that should use these agents. The
-    /// ratios of the collection of bucket writer specs should add up to 1.0.
-    pub ratio: f64,
+pub struct DatabaseWriterSpec {
+    /// The ratio of databases from the provided list that should use these agents. The
+    /// ratios of the collection of database_writer specs should add up to 1.0. If ratio
+    /// is not provided it will default to 1.0 (useful for when you specify only a single
+    /// database_writer.
+    ///
+    /// The interval over the provided list of databases is the cumulative sum of the
+    /// previous ratios to this ratio. So if you have 10 input databases and 3 database_writers
+    /// with ratios (in order) of `[0.2, 0.4, and 0.6]` you would have the input list of
+    /// 10 databases split into these three based on their index in the list: `[0, 1]`,
+    /// `[2, 5]`, and `[6, 9]`. The first 2 databases, then the next 4, then the remaining 6.
+    ///
+    /// The list isn't shuffled as ratios are applied.
+    pub database_ratio: Option<f64>,
+    /// Regex to select databases from the provided list. If regex is used in any one
+    /// of the database_writers, it must be used for all of them.
+    pub database_regex: Option<String>,
    /// The agents that should be used to write to these databases.
     pub agents: Vec<AgentAssignmentSpec>,
 }

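To make the cumulative split concrete, here is a minimal standalone sketch (not part of the crate; the function name `split_by_ratios` is made up for illustration) that reproduces the ceil-based index arithmetic `database_split_to_agents` applies:

```rust
// A standalone sketch of the cumulative, ceil-based ratio split described
// in the DatabaseWriterSpec doc comment above.
fn split_by_ratios(n_databases: usize, ratios: &[f64]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for r in ratios {
        if start >= n_databases {
            break; // later writers are skipped, mirroring the warn! branch
        }
        let mut end = (n_databases as f64 * r).ceil() as usize + start;
        if end > n_databases {
            end = n_databases; // clamp when the ratios sum past 1.0
        }
        ranges.push(start..end);
        start = end;
    }
    ranges
}

fn main() {
    // 10 databases with ratios [0.2, 0.4, 0.6] -> [0..2, 2..6, 6..10],
    // i.e. the [0, 1], [2, 5], [6, 9] index groups from the doc comment.
    println!("{:?}", split_by_ratios(10, &[0.2, 0.4, 0.6]));
}
```
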
@@ -634,8 +663,8 @@ name = "cpu"
 name = "host"
 template = "server"
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "foo", sampling_interval = "10s"}]
 "#;
         let spec = DataSpec::from_str(toml).unwrap();

@@ -659,7 +688,7 @@ agents = [{name = "foo", sampling_interval = "10s"}]
     }
 
     #[test]
-    fn split_buckets_by_writer_spec() {
+    fn split_databases_by_writer_spec_ratio() {
         let toml = r#"
 name = "demo_schema"
 

@@ -679,34 +708,21 @@ name = "whatevs"
 name = "val"
 i64_range = [0, 10]
 
-[[bucket_writers]]
-ratio = 0.6
+[[database_writers]]
+database_ratio = 0.6
 agents = [{name = "foo", sampling_interval = "10s"}]
 
-[[bucket_writers]]
-ratio = 0.4
+[[database_writers]]
+database_ratio = 0.4
 agents = [{name = "bar", sampling_interval = "1m", count = 3}]
 "#;
         let spec = DataSpec::from_str(toml).unwrap();
-        let buckets = vec![
-            OrgBucket {
-                org: "a".to_string(),
-                bucket: "1".to_string(),
-            },
-            OrgBucket {
-                org: "a".to_string(),
-                bucket: "2".to_string(),
-            },
-            OrgBucket {
-                org: "b".to_string(),
-                bucket: "1".to_string(),
-            },
-        ];
+        let databases = vec!["a_1".to_string(), "a_2".to_string(), "b_1".to_string()];
 
-        let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap();
+        let database_agents = spec.database_split_to_agents(&databases).unwrap();
 
-        let b = &bucket_agents[0];
-        assert_eq!(b.org_bucket, &buckets[0]);
+        let b = &database_agents[0];
+        assert_eq!(b.database, &databases[0]);
         assert_eq!(
             b.agent_assignments[0].sampling_interval,
             Duration::from_secs(10)

@@ -714,8 +730,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
         assert_eq!(b.agent_assignments[0].count, 1);
         assert_eq!(b.agent_assignments[0].spec.name, "foo");
 
-        let b = &bucket_agents[1];
-        assert_eq!(b.org_bucket, &buckets[1]);
+        let b = &database_agents[1];
+        assert_eq!(b.database, &databases[1]);
         assert_eq!(
             b.agent_assignments[0].sampling_interval,
             Duration::from_secs(10)

@@ -723,8 +739,8 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
         assert_eq!(b.agent_assignments[0].count, 1);
         assert_eq!(b.agent_assignments[0].spec.name, "foo");
 
-        let b = &bucket_agents[2];
-        assert_eq!(b.org_bucket, &buckets[2]);
+        let b = &database_agents[2];
+        assert_eq!(b.database, &databases[2]);
         assert_eq!(
             b.agent_assignments[0].sampling_interval,
             Duration::from_secs(60)

@@ -732,4 +748,152 @@ agents = [{name = "bar", sampling_interval = "1m", count = 3}]
         assert_eq!(b.agent_assignments[0].count, 3);
         assert_eq!(b.agent_assignments[0].spec.name, "bar");
     }
+
+    #[test]
+    fn split_databases_by_writer_spec_regex() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[agents]]
+name = "bar"
+[[agents.measurements]]
+name = "whatevs"
+[[agents.measurements.fields]]
+name = "val"
+i64_range = [0, 10]
+
+[[database_writers]]
+database_regex = "foo.*"
+agents = [{name = "foo", sampling_interval = "10s"}]
+
+[[database_writers]]
+database_regex = ".*_bar"
+agents = [{name = "bar", sampling_interval = "1m", count = 3}]
+"#;
+
+        let spec = DataSpec::from_str(toml).unwrap();
+        let databases = vec![
+            "foo_1".to_string(),
+            "foo_2".to_string(),
+            "asdf_bar".to_string(),
+        ];
+
+        let database_agents = spec.database_split_to_agents(&databases).unwrap();
+
+        let b = &database_agents[0];
+        assert_eq!(b.database, &databases[0]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &database_agents[1];
+        assert_eq!(b.database, &databases[1]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &database_agents[2];
+        assert_eq!(b.database, &databases[2]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(60)
+        );
+        assert_eq!(b.agent_assignments[0].count, 3);
+        assert_eq!(b.agent_assignments[0].spec.name, "bar");
+    }
+
+    #[test]
+    fn split_databases_by_writer_regex_and_ratio_error() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[agents]]
+name = "bar"
+[[agents.measurements]]
+name = "whatevs"
+[[agents.measurements.fields]]
+name = "val"
+i64_range = [0, 10]
+
+[[database_writers]]
+database_ratio = 0.8
+agents = [{name = "foo", sampling_interval = "10s"}]
+
+[[database_writers]]
+database_regex = "foo.*"
+agents = [{name = "bar", sampling_interval = "1m", count = 3}]
+"#;
+
+        let spec = DataSpec::from_str(toml).unwrap();
+        let databases = vec!["a_1".to_string(), "a_2".to_string(), "b_1".to_string()];
+
+        let database_agents = spec.database_split_to_agents(&databases);
+        assert!(matches!(
+            database_agents.unwrap_err(),
+            Error::DatabaseWritersConfig
+        ));
+    }
+
+    #[test]
+    fn split_databases_by_writer_ratio_defaults() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[database_writers]]
+agents = [{name = "foo", sampling_interval = "10s"}]
+"#;
+
+        let spec = DataSpec::from_str(toml).unwrap();
+        let databases = vec!["a_1".to_string(), "a_2".to_string()];
+
+        let database_agents = spec.database_split_to_agents(&databases).unwrap();
+
+        let b = &database_agents[0];
+        assert_eq!(b.database, &databases[0]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &database_agents[1];
+        assert_eq!(b.database, &databases[1]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+    }
 }

@@ -493,8 +493,7 @@ name = "cpu"
 name = "f1"
 i64_range = [0, 23]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "10s"}]"#;
 
         let spec = DataSpec::from_str(toml).unwrap();

@@ -541,8 +540,7 @@ name = "cpu"
 name = "f1"
 i64_range = [0, 23]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "10s"}]"#;
 
         let spec = DataSpec::from_str(toml).unwrap();

@@ -610,8 +608,8 @@ name = "cpu"
 name = "f1"
 i64_range = [0, 23]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "foo", sampling_interval = "10s"}]"#;
 
         let spec = DataSpec::from_str(toml).unwrap();

@@ -342,8 +342,7 @@ name = "cpu"
 name = "val"
 i64_range = [3,3]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "1s"}]
 "#;
 

@@ -354,10 +353,7 @@ agents = [{name = "foo", sampling_interval = "1s"}]
 
         generate(
             &data_spec,
-            vec![OrgBucket {
-                org: "foo".to_string(),
-                bucket: "bar".to_string(),
-            }],
+            vec!["foo_bar".to_string()],
             &mut points_writer_builder,
             Some(now),
             Some(now),

@@ -395,8 +391,7 @@ name = "cpu"
 name = "val"
 i64_range = [2, 2]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
 agents = [{name = "foo", sampling_interval = "1s"}]
 "#;
 

@@ -407,10 +402,7 @@ agents = [{name = "foo", sampling_interval = "1s"}]
 
         generate(
             &data_spec,
-            vec![OrgBucket {
-                org: "foo".to_string(),
-                bucket: "bar".to_string(),
-            }],
+            vec!["foo_bar".to_string()],
             &mut points_writer_builder,
             Some(now - 1_000_000_000),
             Some(now),

@@ -13,6 +13,6 @@ tag_pairs = [
 name = "usage_user"
 f64_range = [0.0, 100.0]
 
-[[bucket_writers]]
-ratio = 1.0
+[[database_writers]]
+database_ratio = 1.0
 agents = [{name = "foo", count = 3, sampling_interval = "10s"}]

@@ -1061,10 +1061,21 @@ pub enum InitError {
 /// The Database startup state machine
 ///
 /// A Database starts in DatabaseState::Known and advances through the
-/// states in sequential order until it reaches Initialized or an error
-/// is encountered.
+/// non error states in sequential order until either:
+///
+/// 1. It reaches `Initialized`
+///
+/// 2. It is reset to `Known` and starts initialization again
+///
+/// 3. An error is encountered, in which case it transitions to one of
+///    the error states. Most are Terminal (and thus require operator
+///    intervention) but some (such as `WriteBufferCreationError`) may
+///    resolve after some time to the basic initialization sequence
+///    (e.g. `Initialized`)
+///
 #[derive(Debug, Clone)]
 enum DatabaseState {
+    // Basic initialization sequence states:
     Known(DatabaseStateKnown),
     DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound),
     OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),

@@ -1072,12 +1083,23 @@ enum DatabaseState {
     CatalogLoaded(DatabaseStateCatalogLoaded),
     Initialized(DatabaseStateInitialized),
 
+    // Error states
+    /// Terminal State
     DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
+    /// Terminal State
     NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
+    /// Terminal State
     OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
+    /// Terminal State
     RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
+    /// Terminal State
     CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
+    /// Non Terminal State: There was an error creating a connction to
+    /// the WriteBuffer, but the connection will be retried. If a
+    /// connection is successfully created, the database will
+    /// transition to `Initialized`
     WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
+    /// Terminal State
     ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
 }

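As a hypothetical illustration of how the Terminal / Non Terminal annotations above might be consumed (this helper does not exist in the commit), a supervisor could retry only the write-buffer case and surface everything else to an operator:

```rust
// Hypothetical helper, not in the commit: true for error states documented
// as Terminal, which therefore need operator intervention.
fn needs_operator(state: &DatabaseState) -> bool {
    use DatabaseState::*;
    match state {
        DatabaseObjectStoreLookupError(..)
        | NoActiveDatabase(..)
        | OwnerInfoLoadError(..)
        | RulesLoadError(..)
        | CatalogLoadError(..)
        | ReplayError(..) => true,
        // documented as non-terminal: the write buffer connection is retried
        WriteBufferCreationError(..) => false,
        // non-error states in the basic initialization sequence
        _ => false,
    }
}
```
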