diff --git a/Cargo.lock b/Cargo.lock index 96a4cc7bcb..5d3a0b056e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,6 +400,16 @@ dependencies = [ "tonic-build 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "delorean_utilities" +version = "0.1.0" +dependencies = [ + "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "reqwest 0.10.4 (registry+https://github.com/rust-lang/crates.io-index)", + "structopt 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "difference" version = "2.0.0" @@ -1162,6 +1172,30 @@ dependencies = [ "treeline 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "proc-macro-error" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-error-attr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", + "version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", + "syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "proc-macro-hack" version = "0.5.15" @@ -1577,6 +1611,28 @@ name = "strsim" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "structopt" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "structopt-derive 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "structopt-derive" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-error 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "syn" version = "1.0.17" @@ -1587,6 +1643,16 @@ dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "syn-mid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tempfile" version = "3.1.0" @@ -2316,6 +2382,8 @@ dependencies = [ "checksum predicates 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "347a1b6f0b21e636bc9872fb60b83b8e185f6f5516298b8238699f7f9a531030" "checksum predicates-core 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "06075c3a3e92559ff8929e7a280684489ea27fe44805174c3ebd9328dcb37178" "checksum predicates-tree 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e63c4859013b38a76eca2414c64911fba30def9e3202ac461a2d22831220124" +"checksum proc-macro-error 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "98e9e4b82e0ef281812565ea4751049f1bdcdfccda7d3f459f2e138a40c08678" +"checksum proc-macro-error-attr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f5444ead4e9935abd7f27dc51f7e852a0569ac888096d5ec2499470794e2e53" "checksum proc-macro-hack 0.5.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" "checksum proc-macro-nested 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" "checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" @@ -2363,7 +2431,10 @@ dependencies = [ "checksum socket2 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918" "checksum static_assertions 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7f3eb36b47e512f8f1c9e3d10c2c1965bc992bd9cdb024fa581e2194501c83d3" "checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +"checksum structopt 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ff6da2e8d107dfd7b74df5ef4d205c6aebee0706c647f6bc6a2d5789905c00fb" +"checksum structopt-derive 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a489c87c08fbaf12e386665109dd13470dcc9c4583ea3e10dd2b4523e5ebd9ac" "checksum syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)" = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03" +"checksum syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" "checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" diff --git a/Cargo.toml b/Cargo.toml index f813ac7e04..e71d2b24e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,10 @@ name = "delorean" version = "0.1.0" authors = ["Paul Dix "] edition = "2018" +default-run = "delorean" + +[workspace] +members = ["delorean_utilities"] [profile.release] debug = true @@ -49,10 +53,10 @@ smallvec = "1.2.0" [dev-dependencies] criterion = "0.3" -reqwest = { version = "0.10.1", features = ["blocking"] } assert_cmd = "1.0.0" -rand = "0.7.2" tempfile = "3.1.0" +rand = "0.7.2" +reqwest = { version = "0.10.1", features = ["blocking"] } [[bench]] name = "encoders" diff --git a/delorean_utilities/Cargo.toml b/delorean_utilities/Cargo.toml new file mode 100644 index 0000000000..53dcd036f0 --- /dev/null +++ b/delorean_utilities/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "delorean_utilities" +version = "0.1.0" +authors = ["Paul Dix "] +edition = "2018" + +[dependencies] +rand = "0.7.2" +structopt = "0.3.13" +reqwest = { version = "0.10.1", features = ["blocking"] } +tokio = { version = "0.2", features = ["full"] } diff --git a/delorean_utilities/src/bin/generate.rs b/delorean_utilities/src/bin/generate.rs new file mode 100644 index 0000000000..f642dec00a --- /dev/null +++ b/delorean_utilities/src/bin/generate.rs @@ -0,0 +1,280 @@ +//! Utility to generate data to ingest for development and testing purposes. +//! +//! Similar to `storectl generate`. +//! +//! # Usage +//! +//! ``` +//! cargo run --bin generate > line-protocol.txt +//! ``` + +use rand::prelude::*; +use std::{ + convert::TryFrom, + fmt, + time::{SystemTime, UNIX_EPOCH}, +}; + +fn main() { + // TODO: turn these into command line arguments + let num_points = 100; + let max_tags_per_point = 5; + let max_fields_per_point = 5; + + // TODO: Optionally use a seed provided by a command line argument and output the seed used to enable + // reproducibility. + let mut rng = rand::thread_rng(); + + // Generate fields such that each field always has the same type throughout the batch + let field_definitions: Vec<_> = (0..max_fields_per_point) + .map(|num| Field::generate(&mut rng, num)) + .collect(); + + for _ in 0..num_points { + println!( + "{}", + Point::generate(&mut rng, max_tags_per_point, &field_definitions) + ); + } +} + +#[derive(Debug, Clone, PartialEq)] +struct Point { + measurement_name: String, + tags: Vec, + fields: Vec, + timestamp: Option, +} + +impl Point { + fn generate( + rng: &mut impl Rng, + max_tags_per_point: usize, + field_definitions: &[Field], + ) -> Point { + let num_tags = rng.gen_range(0, max_tags_per_point); + let tags = (0..num_tags).map(|num| Tag::generate(rng, num)).collect(); + + // Must have at least one field, so start the range at 1 + let num_fields = rng.gen_range(1, field_definitions.len()); + let fields = field_definitions[..num_fields] + .iter() + .map(|field| field.generate_similar(rng)) + .collect(); + + let since_the_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + let now_ns = i64::try_from(since_the_epoch.as_nanos()).expect("Time does not fit"); + + Point { + measurement_name: "m0".into(), + tags, + fields, + timestamp: Some(now_ns), + } + } +} + +impl fmt::Display for Point { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.measurement_name)?; + + let mut tags = self.tags.iter(); + + if let Some(tag) = tags.next() { + write!(f, ",{}", tag)?; + + for tag in tags { + write!(f, ",{}", tag)?; + } + } + + write!(f, " ")?; + + // TODO: Error if there are no fields? + let mut fields = self.fields.iter(); + + if let Some(field) = fields.next() { + write!(f, "{}", field)?; + + for field in fields { + write!(f, ",{}", field)?; + } + } + + if let Some(time) = self.timestamp { + write!(f, " {}", time)?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq)] +struct Tag { + key: String, + value: String, +} + +impl Tag { + fn generate(rng: &mut impl Rng, num: usize) -> Tag { + Tag { + key: format!("tag{}", num), + value: format!("value{}", rng.gen_range(0, 10)), + } + } +} + +impl fmt::Display for Tag { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}={}", self.key, self.value) + } +} + +#[derive(Debug, Clone, PartialEq)] +struct Field { + key: String, + value: FieldValue, +} + +impl Field { + fn generate(rng: &mut impl Rng, num: usize) -> Field { + Field { + key: format!("field{}", num), + value: FieldValue::generate(rng), + } + } + + fn generate_similar(&self, rng: &mut impl Rng) -> Field { + Field { + key: self.key.clone(), + value: self.value.generate_similar(rng), + } + } +} + +impl fmt::Display for Field { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}={}", self.key, self.value) + } +} + +#[derive(Debug, Clone, PartialEq)] +enum FieldValue { + Float(f64), + Integer(i64), + // Change `number_of_variants` and the `match` below in `generate` when more variants get added! + // String(String), + // Boolean(bool), +} + +impl FieldValue { + fn generate(rng: &mut impl Rng) -> FieldValue { + // Randomly select a variant + let number_of_variants = 2; + let which_variant = rng.gen_range(0, number_of_variants); + + match which_variant { + 0 => FieldValue::Float(rng.gen()), + 1 => FieldValue::Integer(rng.gen()), + other => unreachable!("Not sure which FieldValue variant to build from {}", other), + } + } + + fn generate_similar(&self, rng: &mut impl Rng) -> FieldValue { + match self { + FieldValue::Float(_) => FieldValue::Float(rng.gen()), + FieldValue::Integer(_) => FieldValue::Integer(rng.gen()), + } + } +} + +impl fmt::Display for FieldValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FieldValue::Float(value) => write!(f, "{}", value), + FieldValue::Integer(value) => write!(f, "{}i", value), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn print_points_without_tags_or_timestamp() { + let point = Point { + measurement_name: "m0".into(), + tags: vec![], + fields: vec![ + Field { + key: "f0".into(), + value: FieldValue::Float(1.0), + }, + Field { + key: "f1".into(), + value: FieldValue::Integer(2), + }, + ], + timestamp: None, + }; + assert_eq!(point.to_string(), "m0 f0=1,f1=2i"); + } + + #[test] + fn print_points_without_timestamp() { + let point = Point { + measurement_name: "m0".into(), + tags: vec![ + Tag { + key: "t0".into(), + value: "v0".into(), + }, + Tag { + key: "t1".into(), + value: "v1".into(), + }, + ], + fields: vec![Field { + key: "f1".into(), + value: FieldValue::Integer(2), + }], + timestamp: None, + }; + assert_eq!(point.to_string(), "m0,t0=v0,t1=v1 f1=2i"); + } + + #[test] + fn print_points_with_everything() { + let point = Point { + measurement_name: "m0".into(), + tags: vec![ + Tag { + key: "t0".into(), + value: "v0".into(), + }, + Tag { + key: "t1".into(), + value: "v1".into(), + }, + ], + fields: vec![ + Field { + key: "f0".into(), + value: FieldValue::Float(1.0), + }, + Field { + key: "f1".into(), + value: FieldValue::Integer(2), + }, + ], + timestamp: Some(1_583_443_428_970_606_000), + }; + assert_eq!( + point.to_string(), + "m0,t0=v0,t1=v1 f0=1,f1=2i 1583443428970606000" + ); + } +} diff --git a/delorean_utilities/src/bin/seed.rs b/delorean_utilities/src/bin/seed.rs new file mode 100644 index 0000000000..adf3529365 --- /dev/null +++ b/delorean_utilities/src/bin/seed.rs @@ -0,0 +1,77 @@ +//! Utility to seed a running delorean instance with data for development and testing purposes. +//! +//! Similar to `inch`. +//! +//! # Usage +//! +//! ``` +//! cargo run --bin seed -- -o [ORG] -b [BUCKET] < line-protocol.txt +//! ``` +//! +//! Or in conjunction with `generate`: +//! +//! ``` +//! cargo run --bin generate | cargo run --bin seed -- -o [ORG] -b [BUCKET] +//! ``` + +use std::io::{self, Read}; +use structopt::StructOpt; + +const URL_BASE: &str = "http://localhost:8080/api/v2"; + +#[derive(StructOpt, Debug)] +#[structopt(name = "seed")] +struct CliOptions { + #[structopt(short = "o", long)] + org: String, + + #[structopt(short = "b", long)] + bucket: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli_options = CliOptions::from_args(); + let org = cli_options.org; + let bucket = cli_options.bucket; + + let client = reqwest::Client::new(); + + // Create a bucket + let url = format!("{}{}", URL_BASE, "/create_bucket"); + client + .post(&url) + .form(&[("bucket", &bucket), ("org", &org)]) + .send() + .await? + .error_for_status()?; + + // Read data from stdin + let mut data = String::new(); + let stdin = io::stdin(); + let mut stdin_handle = stdin.lock(); + stdin_handle.read_to_string(&mut data).unwrap(); + + // Write data to delorean + write_data(&client, "/write", &org, &bucket, data).await?; + + Ok(()) +} + +async fn write_data( + client: &reqwest::Client, + path: &str, + org_id: &str, + bucket_id: &str, + body: String, +) -> Result<(), Box> { + let url = format!("{}{}", URL_BASE, path); + client + .post(&url) + .query(&[("bucket", bucket_id), ("org", org_id)]) + .body(body) + .send() + .await? + .error_for_status()?; + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index f2ec593bfd..1cd697039f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -85,6 +85,7 @@ async fn write(req: hyper::Request, app: Arc) -> Result, let body = str::from_utf8(&body).unwrap(); let mut points = line_parser::parse(body).expect("TODO: Unable to parse lines"); + debug!("Parsed {} points", points.len()); app.db .write_points(write_info.org, bucket_id, &mut points)