diff --git a/Cargo.lock b/Cargo.lock
index 744e645693..6e59ab6881 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -78,9 +78,9 @@ dependencies = [
 
 [[package]]
 name = "anyhow"
-version = "1.0.59"
+version = "1.0.60"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c91f1f46651137be86f3a2b9a8359f9ab421d04d941c62b5982e1ca21113adf9"
+checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142"
 
 [[package]]
 name = "arrayref"
@@ -266,9 +266,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
 [[package]]
 name = "axum"
-version = "0.5.13"
+version = "0.5.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
+checksum = "9de18bc5f2e9df8f52da03856bf40e29b747de5a84e43aefff90e3dc4a21529b"
 dependencies = [
  "async-trait",
  "axum-core",
@@ -813,7 +813,7 @@ dependencies = [
  "tonic 0.7.2",
  "tracing",
  "tracing-core",
- "tracing-subscriber 0.3.11",
+ "tracing-subscriber",
 ]
 
 [[package]]
@@ -1928,6 +1928,7 @@ dependencies = [
 name = "import"
 version = "0.1.0"
 dependencies = [
+ "assert_matches",
  "clap_blocks",
  "futures",
  "object_store",
@@ -1936,8 +1937,6 @@ dependencies = [
  "serde_json",
  "thiserror",
  "tokio",
- "tracing",
- "tracing-subscriber 0.2.25",
  "trogging",
  "workspace-hack",
 ]
@@ -2257,7 +2256,7 @@ dependencies = [
  "tokio",
  "toml",
  "tracing",
- "tracing-subscriber 0.3.11",
+ "tracing-subscriber",
  "uuid 1.1.2",
 ]
 
@@ -2638,9 +2637,9 @@ dependencies = [
 
 [[package]]
 name = "libm"
-version = "0.2.3"
+version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da83a57f3f5ba3680950aa3cbc806fc297bc0b289d42e8942ed528ace71b8145"
+checksum = "c7ce35d4899fa3c0558d4f5082c98927789a01024270711cf113999b66ced65a"
 
 [[package]]
 name = "linux-raw-sys"
@@ -2675,7 +2674,7 @@ dependencies = [
  "once_cell",
  "parking_lot 0.12.1",
  "regex",
- "tracing-subscriber 0.3.11",
+ "tracing-subscriber",
  "workspace-hack",
 ]
 
@@ -2699,15 +2698,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "matchers"
-version = "0.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
-dependencies = [
- "regex-automata",
-]
-
 [[package]]
 name = "matchers"
 version = "0.1.0"
@@ -4220,7 +4210,7 @@ dependencies = [
  "rand",
  "snap",
  "thiserror",
- "time 0.3.12",
+ "time 0.3.13",
  "tokio",
  "tracing",
 ]
@@ -4903,9 +4893,9 @@ checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
 
 [[package]]
 name = "strum_macros"
-version = "0.24.2"
+version = "0.24.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4faebde00e8ff94316c01800f9054fd2ba77d30d9e922541913051d1d978918b"
+checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -5006,7 +4996,7 @@ dependencies = [
  "tempfile",
  "tokio",
  "tracing-log",
- "tracing-subscriber 0.3.11",
+ "tracing-subscriber",
  "workspace-hack",
 ]
@@ -5140,11 +5130,10 @@ dependencies = [
 
 [[package]]
 name = "time"
-version = "0.3.12"
+version = "0.3.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "74b7cc93fc23ba97fde84f7eea56c55d1ba183f495c6715defdfc7b9cb8c870f"
+checksum = "db76ff9fa4b1458b3c7f077f3ff9887394058460d21e634355b273aaf11eea45"
 dependencies = [
- "js-sys",
  "libc",
  "num_threads",
 ]
@@ -5532,28 +5521,6 @@ dependencies = [
  "tracing-core",
 ]
 
-[[package]]
-name = "tracing-subscriber"
-version = "0.2.25"
-source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers 0.0.1", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", -] - [[package]] name = "tracing-subscriber" version = "0.3.11" @@ -5562,7 +5529,7 @@ checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" dependencies = [ "ansi_term", "lazy_static", - "matchers 0.1.0", + "matchers", "parking_lot 0.12.1", "regex", "serde", @@ -5605,7 +5572,7 @@ dependencies = [ "synchronized-writer", "thiserror", "tracing-log", - "tracing-subscriber 0.3.11", + "tracing-subscriber", ] [[package]] @@ -6027,7 +5994,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-log", - "tracing-subscriber 0.3.11", + "tracing-subscriber", "uuid 1.1.2", "winapi", "windows-sys", diff --git a/import/Cargo.toml b/import/Cargo.toml index 8844168cbe..46a87f283e 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -14,11 +14,12 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.82" thiserror = "1.0.31" tokio = { version = "1.17" } -tracing = { version = "0.1", features = ["max_level_trace", "release_max_level_debug"] } -tracing-subscriber = "0.2.0" trogging = { path = "../trogging", features = ["clap"] } workspace-hack = { path = "../workspace-hack"} +[dev-dependencies] +assert_matches = "1.5" + [features] azure = ["object_store/azure"] # Optional Azure Object store support gcp = ["object_store/gcp"] # Optional GCP object store support diff --git a/import/src/lib.rs b/import/src/lib.rs index 9ca646aadf..8ca5694b81 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -1,5 +1,159 @@ -pub mod schema { - pub mod fetch; - pub mod merge; - pub mod schema_parse; +use serde::de::{Deserialize, Deserializer}; +use serde::ser::{Serialize, Serializer}; +use serde::*; +use std::collections::{HashMap, HashSet}; + +pub mod schema; + +/// This struct is used to build up structs from TSM snapshots that we are going to use to bulk +/// ingest. They will be merged, then validated to check for anomalies that will complicate bulk +/// ingest such as tags/fields with the same name, or fields with different types across the whole +/// dataset. It is not the same as an IOx schema, although it is similar and some of the merge code +/// is similar. It's a transient data structure. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AggregateTSMSchema { + pub org_id: String, + pub bucket_id: String, + pub measurements: HashMap, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AggregateTSMMeasurement { + // Map of tag name -> tag; note that the schema we get from the TSM tool has these as arrays. + // Using HashMaps internally to detect duplicates, so we have to do some custom serialisation + // for tags and fields here. 
+ #[serde( + serialize_with = "serialize_map_values", + deserialize_with = "deserialize_tags" + )] + pub tags: HashMap, + #[serde( + serialize_with = "serialize_map_values", + deserialize_with = "deserialize_fields" + )] + pub fields: HashMap, +} + +fn serialize_map_values(value: &HashMap, serializer: S) -> Result +where + S: Serializer, + V: Serialize, +{ + serializer.collect_seq(value.values()) +} + +fn deserialize_tags<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let v: Vec = Deserialize::deserialize(deserializer)?; + Ok(v.into_iter().map(|t| (t.name.clone(), t)).collect()) +} + +fn deserialize_fields<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let v: Vec = Deserialize::deserialize(deserializer)?; + Ok(v.into_iter().map(|f| (f.name.clone(), f)).collect()) +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct AggregateTSMTag { + pub name: String, + pub values: HashSet, +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct AggregateTSMField { + pub name: String, + pub types: HashSet, +} + +impl TryFrom> for AggregateTSMSchema { + type Error = serde_json::Error; + + fn try_from(data: Vec) -> Result { + serde_json::from_slice(&data) + } +} + +impl TryFrom<&str> for AggregateTSMSchema { + type Error = serde_json::Error; + + fn try_from(data: &str) -> Result { + serde_json::from_str(data) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn parses() { + let json = r#" + { + "org_id": "1234", + "bucket_id": "5678", + "measurements": { + "cpu": { + "tags": [ + { "name": "host", "values": ["server", "desktop"] } + ], + "fields": [ + { "name": "usage", "types": ["Float"] } + ] + } + } + } + "#; + let schema: AggregateTSMSchema = json.try_into().unwrap(); + assert_eq!(schema.org_id, "1234"); + assert_eq!(schema.bucket_id, "5678"); + assert_eq!(schema.measurements.len(), 1); + assert!(schema.measurements.contains_key("cpu")); + let measurement = schema.measurements.get("cpu").unwrap(); + assert_eq!(measurement.tags.len(), 1); + let tag = &measurement.tags.values().next().unwrap(); + assert_eq!(tag.name, "host"); + assert_eq!( + tag.values, + HashSet::from(["server".to_string(), "desktop".to_string()]) + ); + let field = &measurement.fields.values().next().unwrap(); + assert_eq!(field.name, "usage"); + assert_eq!(field.types, HashSet::from(["Float".to_string()])); + // exercise the Vec tryfrom impl too + assert_eq!(schema, json.as_bytes().to_vec().try_into().unwrap()); + // now exercise the serialise code too + let schema = AggregateTSMSchema { + org_id: "1234".to_string(), + bucket_id: "5678".to_string(), + measurements: HashMap::from([( + "cpu".to_string(), + AggregateTSMMeasurement { + tags: HashMap::from([( + "host".to_string(), + AggregateTSMTag { + name: "host".to_string(), + values: HashSet::from(["server".to_string(), "desktop".to_string()]), + }, + )]), + fields: HashMap::from([( + "usage".to_string(), + AggregateTSMField { + name: "usage".to_string(), + types: HashSet::from(["Float".to_string()]), + }, + )]), + }, + )]), + }; + let _json = serde_json::to_string(&schema).unwrap(); + // ^ not asserting on the value because vector ordering changes so it would be flakey. 
+        // enough that it serialises without error
+    }
+}
diff --git a/import/src/schema/fetch.rs b/import/src/schema/fetch.rs
index 4535369c48..31a42050c6 100644
--- a/import/src/schema/fetch.rs
+++ b/import/src/schema/fetch.rs
@@ -5,7 +5,7 @@ use object_store::{path::Path, DynObjectStore, GetResult};
 use thiserror::Error;
 use tokio::select;
 
-use super::schema_parse::AggregateTSMSchema;
+use crate::AggregateTSMSchema;
 
 // Possible errors from schema commands
 #[derive(Debug, Error)]
diff --git a/import/src/schema/merge.rs b/import/src/schema/merge.rs
index 16de585299..6133a2a680 100644
--- a/import/src/schema/merge.rs
+++ b/import/src/schema/merge.rs
@@ -1,4 +1,4 @@
-use crate::schema::schema_parse::{AggregateTSMSchema, Field, Measurement, Tag};
+use crate::{AggregateTSMField, AggregateTSMMeasurement, AggregateTSMSchema, AggregateTSMTag};
 
 use thiserror::Error;
 
@@ -83,11 +83,14 @@ fn do_merge_schema(
     }
 }
 
-fn do_merge_measurement(into_measurement: &mut Measurement, from_measurement: &Measurement) {
+fn do_merge_measurement(
+    into_measurement: &mut AggregateTSMMeasurement,
+    from_measurement: &AggregateTSMMeasurement,
+) {
     // merge tags
     from_measurement.tags.values().for_each(|from_tag| {
         if let Some(into_tag) = into_measurement.tags.get(&from_tag.name) {
-            let mut new_tag = Tag {
+            let mut new_tag = AggregateTSMTag {
                 name: from_tag.name.clone(),
                 values: into_tag.values.clone(),
             };
@@ -102,7 +105,7 @@ fn do_merge_measurement(into_measurement: &mut Measurement, from_measurement: &M
     // merge fields
     from_measurement.fields.values().for_each(|from_field| {
         if let Some(into_field) = into_measurement.fields.get(&from_field.name) {
-            let mut new_field = Field {
+            let mut new_field = AggregateTSMField {
                 name: from_field.name.clone(),
                 types: into_field.types.clone(),
             };
@@ -126,33 +129,33 @@ mod tests {
 
     #[tokio::test]
     async fn merge_measurements_adds_if_missing() {
-        let mut m1 = Measurement {
+        let mut m1 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
             )]),
         };
-        let m2 = Measurement {
+        let m2 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "sensor".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "sensor".to_string(),
                     values: HashSet::from(["top".to_string(), "bottom".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "temperature".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "temperature".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
@@ -165,33 +168,33 @@ mod tests {
 
     #[tokio::test]
     async fn merge_measurements_merges_tag_with_new_value() {
-        let mut m1 = Measurement {
+        let mut m1 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
             )]),
         };
-        let m2 = Measurement {
+        let m2 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["gadget".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
@@ -212,33 +215,33 @@ mod tests {
 
     #[tokio::test]
     async fn merge_measurements_merges_tag_with_new_and_old_values() {
-        let mut m1 = Measurement {
+        let mut m1 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
             )]),
         };
-        let m2 = Measurement {
+        let m2 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["gadget".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
@@ -259,33 +262,33 @@ mod tests {
 
     #[tokio::test]
     async fn merge_measurements_merges_field_with_new_type() {
-        let mut m1 = Measurement {
+        let mut m1 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
             )]),
         };
-        let m2 = Measurement {
+        let m2 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Integer".to_string()]),
                 },
@@ -302,33 +305,33 @@ mod tests {
 
     #[tokio::test]
     async fn merge_measurements_merges_field_with_new_and_old_types() {
-        let mut m1 = Measurement {
+        let mut m1 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string()]),
                 },
             )]),
         };
-        let m2 = Measurement {
+        let m2 = AggregateTSMMeasurement {
             tags: HashMap::from([(
                 "host".to_string(),
-                Tag {
+                AggregateTSMTag {
                     name: "host".to_string(),
                     values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                 },
             )]),
             fields: HashMap::from([(
                 "usage".to_string(),
-                Field {
+                AggregateTSMField {
                     name: "usage".to_string(),
                     types: HashSet::from(["Float".to_string(), "Integer".to_string()]),
                 },
@@ -350,17 +353,17 @@ mod tests {
             bucket_id: "mybucket".to_string(),
             measurements: HashMap::from([(
                 "cpu".to_string(),
-                Measurement {
+                AggregateTSMMeasurement {
                     tags: HashMap::from([(
                         "host".to_string(),
-                        Tag {
+                        AggregateTSMTag {
                             name: "host".to_string(),
                             values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                         },
                     )]),
                     fields: HashMap::from([(
                         "usage".to_string(),
-                        Field {
+                        AggregateTSMField {
                             name: "usage".to_string(),
                             types: HashSet::from(["Float".to_string()]),
                         },
@@ -373,17 +376,17 @@ mod tests {
             bucket_id: "mybucket".to_string(),
             measurements: HashMap::from([(
                 "weather".to_string(),
-                Measurement {
+                AggregateTSMMeasurement {
                     tags: HashMap::from([(
                         "location".to_string(),
-                        Tag {
+                        AggregateTSMTag {
"location".to_string(), values: HashSet::from(["london".to_string()]), }, )]), fields: HashMap::from([( "temperature".to_string(), - Field { + AggregateTSMField { name: "temperature".to_string(), types: HashSet::from(["Float".to_string()]), }, @@ -410,17 +413,17 @@ mod tests { bucket_id: "mybucket".to_string(), measurements: HashMap::from([( "cpu".to_string(), - Measurement { + AggregateTSMMeasurement { tags: HashMap::from([( "host".to_string(), - Tag { + AggregateTSMTag { name: "host".to_string(), values: HashSet::from(["server".to_string(), "desktop".to_string()]), }, )]), fields: HashMap::from([( "usage".to_string(), - Field { + AggregateTSMField { name: "usage".to_string(), types: HashSet::from(["Float".to_string()]), }, @@ -433,17 +436,17 @@ mod tests { bucket_id: "mybucket".to_string(), measurements: HashMap::from([( "cpu".to_string(), - Measurement { + AggregateTSMMeasurement { tags: HashMap::from([( "host".to_string(), - Tag { + AggregateTSMTag { name: "host".to_string(), values: HashSet::from(["gadget".to_string()]), }, )]), fields: HashMap::from([( "usage".to_string(), - Field { + AggregateTSMField { name: "usage".to_string(), types: HashSet::from(["Integer".to_string(), "Float".to_string()]), }, @@ -462,7 +465,7 @@ mod tests { ); assert_eq!( measurement.tags.values().cloned().collect::>(), - vec![Tag { + vec![AggregateTSMTag { name: "host".to_string(), values: HashSet::from([ "server".to_string(), @@ -477,7 +480,7 @@ mod tests { ); assert_eq!( measurement.fields.values().cloned().collect::>(), - vec![Field { + vec![AggregateTSMField { name: "usage".to_string(), types: HashSet::from(["Integer".to_string(), "Float".to_string()]) }] @@ -497,10 +500,10 @@ mod tests { bucket_id: bucket.clone(), measurements: HashMap::from([( "cpu".to_string(), - Measurement { + AggregateTSMMeasurement { tags: HashMap::from([( "host".to_string(), - Tag { + AggregateTSMTag { name: "host".to_string(), values: HashSet::from([ "server".to_string(), @@ -510,7 +513,7 @@ mod tests { )]), fields: HashMap::from([( "usage".to_string(), - Field { + AggregateTSMField { name: "usage".to_string(), types: HashSet::from(["Float".to_string()]), }, @@ -523,17 +526,17 @@ mod tests { bucket_id: bucket.clone(), measurements: HashMap::from([( "cpu".to_string(), - Measurement { + AggregateTSMMeasurement { tags: HashMap::from([( "host".to_string(), - Tag { + AggregateTSMTag { name: "host".to_string(), values: HashSet::from(["gadget".to_string()]), }, )]), fields: HashMap::from([( "usage".to_string(), - Field { + AggregateTSMField { name: "usage".to_string(), types: HashSet::from(["Integer".to_string()]), }, @@ -546,17 +549,17 @@ mod tests { bucket_id: bucket.clone(), measurements: HashMap::from([( "weather".to_string(), - Measurement { + AggregateTSMMeasurement { tags: HashMap::from([( "location".to_string(), - Tag { + AggregateTSMTag { name: "location".to_string(), values: HashSet::from(["london".to_string()]), }, )]), fields: HashMap::from([( "temperature".to_string(), - Field { + AggregateTSMField { name: "temperature".to_string(), types: HashSet::from(["Float".to_string()]), }, @@ -569,17 +572,17 @@ mod tests { bucket_id: bucket, measurements: HashMap::from([( "weather".to_string(), - Measurement { + AggregateTSMMeasurement { tags: HashMap::from([( "location".to_string(), - Tag { + AggregateTSMTag { name: "location".to_string(), values: HashSet::from(["berlin".to_string()]), }, )]), fields: HashMap::from([( "temperature".to_string(), - Field { + AggregateTSMField { name: "temperature".to_string(), types: 
HashSet::from(["Integer".to_string()]), }, diff --git a/import/src/schema/mod.rs b/import/src/schema/mod.rs index cbdd23823a..f09a4279ef 100644 --- a/import/src/schema/mod.rs +++ b/import/src/schema/mod.rs @@ -1,2 +1,3 @@ -mod merge; -mod schema_parse; +pub mod fetch; +pub mod merge; +pub mod validate; diff --git a/import/src/schema/schema_parse.rs b/import/src/schema/schema_parse.rs deleted file mode 100644 index 51330a7f61..0000000000 --- a/import/src/schema/schema_parse.rs +++ /dev/null @@ -1,128 +0,0 @@ -use serde::de::{Deserialize, Deserializer}; -use serde::ser::{Serialize, Serializer}; -use serde::*; -use std::collections::{HashMap, HashSet}; - -/// This struct is used to build up structs from TSM snapshots that we are going to use to bulk -/// ingest. They will be merged, then validated to check for anomalies that will complicate bulk -/// ingest such as tags/fields with the same name, or fields with different types across the whole -/// dataset. It is not the same as an IOx schema, although it is similar and some of the merge code -/// is similar. It's a transient data structure. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct AggregateTSMSchema { - pub org_id: String, - pub bucket_id: String, - pub measurements: HashMap, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct Measurement { - // Map of tag name -> tag; note that the schema we get from the TSM tool has these as arrays. - // Using HashMaps internally to detect duplicates, so we have to do some custom serialisation - // for tags and fields here. - #[serde( - serialize_with = "serialize_map_values", - deserialize_with = "deserialize_tags" - )] - pub tags: HashMap, - #[serde( - serialize_with = "serialize_map_values", - deserialize_with = "deserialize_fields" - )] - pub fields: HashMap, -} - -fn serialize_map_values(value: &HashMap, serializer: S) -> Result -where - S: Serializer, - V: Serialize, -{ - serializer.collect_seq(value.values()) -} - -fn deserialize_tags<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let v: Vec = Deserialize::deserialize(deserializer)?; - Ok(v.into_iter().map(|t| (t.name.clone(), t)).collect()) -} - -fn deserialize_fields<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let v: Vec = Deserialize::deserialize(deserializer)?; - Ok(v.into_iter().map(|f| (f.name.clone(), f)).collect()) -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Tag { - pub name: String, - pub values: HashSet, -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Field { - pub name: String, - pub types: HashSet, -} - -impl TryFrom> for AggregateTSMSchema { - type Error = serde_json::Error; - - fn try_from(data: Vec) -> Result { - serde_json::from_slice(&data) - } -} - -impl TryFrom<&str> for AggregateTSMSchema { - type Error = serde_json::Error; - - fn try_from(data: &str) -> Result { - serde_json::from_str(data) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn parses() { - let json = r#" - { - "org_id": "1234", - "bucket_id": "5678", - "measurements": { - "cpu": { - "tags": [ - { "name": "host", "values": ["server", "desktop"] } - ], - "fields": [ - { "name": "usage", "types": ["float"] } - ] - } - } - } - "#; - let schema: AggregateTSMSchema = json.try_into().unwrap(); - assert_eq!(schema.org_id, "1234"); - assert_eq!(schema.bucket_id, "5678"); - assert_eq!(schema.measurements.len(), 1); - 
-        assert!(schema.measurements.contains_key("cpu"));
-        let measurement = schema.measurements.get("cpu").unwrap();
-        assert_eq!(measurement.tags.len(), 1);
-        let tag = &measurement.tags.values().next().unwrap();
-        assert_eq!(tag.name, "host");
-        assert_eq!(
-            tag.values,
-            HashSet::from(["server".to_string(), "desktop".to_string()])
-        );
-        let field = &measurement.fields.values().next().unwrap();
-        assert_eq!(field.name, "usage");
-        assert_eq!(field.types, HashSet::from(["float".to_string()]));
-        // exercise the Vec<u8> tryfrom impl too
-        assert_eq!(schema, json.as_bytes().to_vec().try_into().unwrap());
-    }
-}
diff --git a/import/src/schema/validate.rs b/import/src/schema/validate.rs
new file mode 100644
index 0000000000..d5a9bb3d02
--- /dev/null
+++ b/import/src/schema/validate.rs
@@ -0,0 +1,132 @@
+use crate::AggregateTSMSchema;
+use thiserror::Error;
+
+// Possible validation errors
+#[derive(Debug, Error)]
+pub enum ValidationError {
+    #[error("Measurement '{measurement}' has a tag and field with the same name: {name}")]
+    TagAndFieldSameName { measurement: String, name: String },
+
+    #[error(
+        "Measurement '{measurement}' has field '{name}' with multiple types: {:?}",
+        types
+    )]
+    FieldWithMultipleTypes {
+        measurement: String,
+        name: String,
+        types: Vec<String>,
+    },
+}
+
+pub fn validate_schema(schema: &AggregateTSMSchema) -> Result<(), Vec<ValidationError>> {
+    let mut errors: Vec<ValidationError> = vec![];
+    for (measurement_name, measurement) in &schema.measurements {
+        if let Some(tag_name) = measurement
+            .tags
+            .keys()
+            .find(|&t| measurement.fields.contains_key(t))
+        {
+            errors.push(ValidationError::TagAndFieldSameName {
+                measurement: measurement_name.clone(),
+                name: tag_name.clone(),
+            });
+        }
+        if let Some(field) = measurement.fields.values().find(|f| f.types.len() > 1) {
+            errors.push(ValidationError::FieldWithMultipleTypes {
+                measurement: measurement_name.clone(),
+                name: field.name.clone(),
+                types: field.types.iter().cloned().collect::<Vec<String>>(),
+            });
+        }
+    }
+    if !errors.is_empty() {
+        Err(errors)
+    } else {
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use assert_matches::assert_matches;
+
+    #[tokio::test]
+    async fn good() {
+        let json = r#"
+        {
+          "org_id": "1234",
+          "bucket_id": "5678",
+          "measurements": {
+            "cpu": {
+              "tags": [
+                { "name": "host", "values": ["server", "desktop"] }
+              ],
+              "fields": [
+                { "name": "usage", "types": ["Float"] }
+              ]
+            }
+          }
+        }
+        "#;
+        let schema: AggregateTSMSchema = json.try_into().unwrap();
+        assert_matches!(validate_schema(&schema), Ok(_));
+    }
+
+    #[tokio::test]
+    async fn tag_and_field_same_name() {
+        let json = r#"
+        {
+          "org_id": "1234",
+          "bucket_id": "5678",
+          "measurements": {
+            "weather": {
+              "tags": [
+                { "name": "temperature", "values": ["true"] }
+              ],
+              "fields": [
+                { "name": "temperature", "types": ["Float"] }
+              ]
+            }
+          }
+        }
+        "#;
+        let schema: AggregateTSMSchema = json.try_into().unwrap();
+        if let Err(errors) = validate_schema(&schema) {
+            assert_eq!(errors.len(), 1);
+            assert_matches!(
+                errors.get(0),
+                Some(ValidationError::TagAndFieldSameName { .. })
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn field_with_multiple_types() {
+        let json = r#"
+        {
+          "org_id": "1234",
+          "bucket_id": "5678",
+          "measurements": {
+            "weather": {
+              "tags": [
+                { "name": "location", "values": ["London", "Berlin"] }
+              ],
+              "fields": [
+                { "name": "temperature", "types": ["Float", "Integer"] }
+              ]
+            }
+          }
+        }
+        "#;
+        let schema: AggregateTSMSchema = json.try_into().unwrap();
+        if let Err(errors) = validate_schema(&schema) {
+            assert_eq!(errors.len(), 1);
+            assert_matches!(
+                errors.get(0),
+                Some(ValidationError::FieldWithMultipleTypes { .. })
+            );
+        }
+    }
+}
diff --git a/influxdb_iox/src/commands/import/schema.rs b/influxdb_iox/src/commands/import/schema.rs
index fd7bfd1351..c145860cf1 100644
--- a/influxdb_iox/src/commands/import/schema.rs
+++ b/influxdb_iox/src/commands/import/schema.rs
@@ -8,6 +8,7 @@ use thiserror::Error;
 use import::schema::{
     fetch::{fetch_schema, FetchError},
     merge::{SchemaMergeError, SchemaMerger},
+    validate::{validate_schema, ValidationError},
 };
 
 // Possible errors from schema commands
@@ -21,6 +22,9 @@ pub enum SchemaCommandError {
 
     #[error("Error merging schemas: {0}")]
     Merging(#[from] SchemaMergeError),
+
+    #[error("{0}")]
+    Validating(#[from] ValidationError),
 }
 
 #[derive(Parser, Debug)]
@@ -57,7 +61,6 @@ pub struct MergeConfig {
 pub async fn command(config: Config) -> Result<(), SchemaCommandError> {
     match config {
         Config::Merge(merge_config) => {
-            println!("Merging schemas from object store");
             let object_store = make_object_store(&merge_config.object_store)
                 .map_err(SchemaCommandError::ObjectStoreParsing)?;
 
@@ -70,8 +73,14 @@ pub async fn command(config: Config) -> Result<(), SchemaCommandError> {
             let merger = SchemaMerger::new(merge_config.org_id, merge_config.bucket_id, schemas);
             let merged = merger.merge().map_err(SchemaCommandError::Merging)?;
             // just print the merged schema for now; we'll do more with this in future PRs
-            println!("{:?}", merged);
+            println!("Merged schema:\n{:?}", merged);
 
+            if let Err(errors) = validate_schema(&merged) {
+                eprintln!("Schema conflicts:");
+                for e in errors {
+                    eprintln!("- {}", e);
+                }
+            }
             Ok(())
         }
     }
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index c9bceef0b7..2e4938270d 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -66,7 +66,7 @@ tokio-util = { version = "0.7", features = ["codec", "tracing"] }
 tonic = { version = "0.8", features = ["async-trait", "axum", "channel", "codegen", "h2", "hyper", "hyper-timeout", "prost", "prost-derive", "prost1", "tokio", "tower", "tracing-futures", "transport"] }
 tower = { version = "0.4", features = ["__common", "balance", "buffer", "discover", "futures-core", "futures-util", "indexmap", "limit", "load", "log", "make", "pin-project", "pin-project-lite", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-util", "tracing", "util"] }
 tower-http = { version = "0.3", features = ["catch-panic", "map-response-body", "tower", "tracing", "util"] }
-tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "release_max_level_trace", "std", "tracing-attributes"] }
+tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_trace", "std", "tracing-attributes"] }
 tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
 tracing-log = { version = "0.1", features = ["log-tracer", "std", "trace-logger"] }
 tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "json", "lazy_static", "matchers", "parking_lot", "regex", "registry", "serde", "serde_json", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log", "tracing-serde"] }