chore: import validate merged schema (#5367)

* feat: import schema merge now outputs validation results

chore: refactor import crate

chore: renamed some structs for clarity in import crate

* chore: tests for import schema merge validation

* chore: Run cargo hakari tasks

* chore: clippy

* chore: make hashmap loop easier to read in import schema validation

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Luke Bond 2022-08-10 13:15:37 +01:00 committed by GitHub
parent c0fc91c627
commit 7e9918f067
10 changed files with 391 additions and 252 deletions
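
In short, the commit wires schema validation in after the merge step. Below is a minimal sketch of the new flow using the APIs added in this diff; the object-store fetching and CLI wiring are elided, the merge_and_validate helper is illustrative only, and the String argument types for SchemaMerger::new are assumed from the CLI call site further down.

use import::schema::merge::SchemaMerger;
use import::schema::validate::validate_schema;
use import::AggregateTSMSchema;

// Illustrative helper (not part of the commit): merge already-fetched
// aggregate TSM schemas, then report any conflicts the new validator finds.
fn merge_and_validate(
    org_id: String,
    bucket_id: String,
    schemas: Vec<AggregateTSMSchema>,
) -> Result<AggregateTSMSchema, Box<dyn std::error::Error>> {
    let merged = SchemaMerger::new(org_id, bucket_id, schemas).merge()?;
    if let Err(errors) = validate_schema(&merged) {
        // as in the CLI command below, conflicts are reported
        // but do not abort the merge output
        eprintln!("Schema conflicts:");
        for e in errors {
            eprintln!("- {}", e);
        }
    }
    Ok(merged)
}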

Cargo.lock (generated)

@ -78,9 +78,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.59"
version = "1.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91f1f46651137be86f3a2b9a8359f9ab421d04d941c62b5982e1ca21113adf9"
checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142"
[[package]]
name = "arrayref"
@ -266,9 +266,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.13"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
checksum = "9de18bc5f2e9df8f52da03856bf40e29b747de5a84e43aefff90e3dc4a21529b"
dependencies = [
"async-trait",
"axum-core",
@ -813,7 +813,7 @@ dependencies = [
"tonic 0.7.2",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.11",
"tracing-subscriber",
]
[[package]]
@ -1928,6 +1928,7 @@ dependencies = [
name = "import"
version = "0.1.0"
dependencies = [
"assert_matches",
"clap_blocks",
"futures",
"object_store",
@ -1936,8 +1937,6 @@ dependencies = [
"serde_json",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber 0.2.25",
"trogging",
"workspace-hack",
]
@ -2257,7 +2256,7 @@ dependencies = [
"tokio",
"toml",
"tracing",
"tracing-subscriber 0.3.11",
"tracing-subscriber",
"uuid 1.1.2",
]
@ -2638,9 +2637,9 @@ dependencies = [
[[package]]
name = "libm"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da83a57f3f5ba3680950aa3cbc806fc297bc0b289d42e8942ed528ace71b8145"
checksum = "c7ce35d4899fa3c0558d4f5082c98927789a01024270711cf113999b66ced65a"
[[package]]
name = "linux-raw-sys"
@ -2675,7 +2674,7 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"regex",
"tracing-subscriber 0.3.11",
"tracing-subscriber",
"workspace-hack",
]
@ -2699,15 +2698,6 @@ dependencies = [
"libc",
]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -4220,7 +4210,7 @@ dependencies = [
"rand",
"snap",
"thiserror",
"time 0.3.12",
"time 0.3.13",
"tokio",
"tracing",
]
@ -4903,9 +4893,9 @@ checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
[[package]]
name = "strum_macros"
version = "0.24.2"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4faebde00e8ff94316c01800f9054fd2ba77d30d9e922541913051d1d978918b"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck",
"proc-macro2",
@ -5006,7 +4996,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing-log",
"tracing-subscriber 0.3.11",
"tracing-subscriber",
"workspace-hack",
]
@ -5140,11 +5130,10 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74b7cc93fc23ba97fde84f7eea56c55d1ba183f495c6715defdfc7b9cb8c870f"
checksum = "db76ff9fa4b1458b3c7f077f3ff9887394058460d21e634355b273aaf11eea45"
dependencies = [
"js-sys",
"libc",
"num_threads",
]
@ -5532,28 +5521,6 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers 0.0.1",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.11"
@ -5562,7 +5529,7 @@ checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [
"ansi_term",
"lazy_static",
"matchers 0.1.0",
"matchers",
"parking_lot 0.12.1",
"regex",
"serde",
@ -5605,7 +5572,7 @@ dependencies = [
"synchronized-writer",
"thiserror",
"tracing-log",
"tracing-subscriber 0.3.11",
"tracing-subscriber",
]
[[package]]
@ -6027,7 +5994,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.3.11",
"tracing-subscriber",
"uuid 1.1.2",
"winapi",
"windows-sys",

import/Cargo.toml

@ -14,11 +14,12 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.82"
thiserror = "1.0.31"
tokio = { version = "1.17" }
tracing = { version = "0.1", features = ["max_level_trace", "release_max_level_debug"] }
tracing-subscriber = "0.2.0"
trogging = { path = "../trogging", features = ["clap"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
assert_matches = "1.5"
[features]
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support

import/src/lib.rs

@ -1,5 +1,159 @@
pub mod schema {
pub mod fetch;
pub mod merge;
pub mod schema_parse;
}
use serde::de::{Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use serde::*;
use std::collections::{HashMap, HashSet};
pub mod schema;
/// This struct is used to build up schemas from TSM snapshots that we are going to use for bulk
/// ingest. The schemas will be merged, then validated to check for anomalies that would complicate
/// bulk ingest, such as a tag and a field with the same name, or a field with different types
/// across the whole dataset. It is not the same as an IOx schema, although it is similar and some
/// of the merge code is similar. It's a transient data structure.
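///
/// For example (the shape exercised by the tests below), a serialised schema looks like:
///
/// ```json
/// {
///   "org_id": "1234",
///   "bucket_id": "5678",
///   "measurements": {
///     "cpu": {
///       "tags": [{ "name": "host", "values": ["server", "desktop"] }],
///       "fields": [{ "name": "usage", "types": ["Float"] }]
///     }
///   }
/// }
/// ```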
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMSchema {
pub org_id: String,
pub bucket_id: String,
pub measurements: HashMap<String, AggregateTSMMeasurement>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMMeasurement {
// Map of tag name -> tag; note that the schema we get from the TSM tool has these as arrays.
// We use HashMaps internally to detect duplicates, so we have to do some custom serialisation
// for tags and fields here.
#[serde(
serialize_with = "serialize_map_values",
deserialize_with = "deserialize_tags"
)]
pub tags: HashMap<String, AggregateTSMTag>,
#[serde(
serialize_with = "serialize_map_values",
deserialize_with = "deserialize_fields"
)]
pub fields: HashMap<String, AggregateTSMField>,
}
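// Serialise only the map's values as a JSON array; the keys are redundant because each
// tag/field carries its own `name`.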
fn serialize_map_values<S, K, V>(value: &HashMap<K, V>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
V: Serialize,
{
serializer.collect_seq(value.values())
}
fn deserialize_tags<'de, D>(deserializer: D) -> Result<HashMap<String, AggregateTSMTag>, D::Error>
where
D: Deserializer<'de>,
{
let v: Vec<AggregateTSMTag> = Deserialize::deserialize(deserializer)?;
Ok(v.into_iter().map(|t| (t.name.clone(), t)).collect())
}
fn deserialize_fields<'de, D>(
deserializer: D,
) -> Result<HashMap<String, AggregateTSMField>, D::Error>
where
D: Deserializer<'de>,
{
let v: Vec<AggregateTSMField> = Deserialize::deserialize(deserializer)?;
Ok(v.into_iter().map(|f| (f.name.clone(), f)).collect())
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMTag {
pub name: String,
pub values: HashSet<String>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMField {
pub name: String,
pub types: HashSet<String>,
}
impl TryFrom<Vec<u8>> for AggregateTSMSchema {
type Error = serde_json::Error;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
serde_json::from_slice(&data)
}
}
impl TryFrom<&str> for AggregateTSMSchema {
type Error = serde_json::Error;
fn try_from(data: &str) -> Result<Self, Self::Error> {
serde_json::from_str(data)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn parses() {
let json = r#"
{
"org_id": "1234",
"bucket_id": "5678",
"measurements": {
"cpu": {
"tags": [
{ "name": "host", "values": ["server", "desktop"] }
],
"fields": [
{ "name": "usage", "types": ["Float"] }
]
}
}
}
"#;
let schema: AggregateTSMSchema = json.try_into().unwrap();
assert_eq!(schema.org_id, "1234");
assert_eq!(schema.bucket_id, "5678");
assert_eq!(schema.measurements.len(), 1);
assert!(schema.measurements.contains_key("cpu"));
let measurement = schema.measurements.get("cpu").unwrap();
assert_eq!(measurement.tags.len(), 1);
let tag = &measurement.tags.values().next().unwrap();
assert_eq!(tag.name, "host");
assert_eq!(
tag.values,
HashSet::from(["server".to_string(), "desktop".to_string()])
);
let field = &measurement.fields.values().next().unwrap();
assert_eq!(field.name, "usage");
assert_eq!(field.types, HashSet::from(["Float".to_string()]));
// exercise the Vec<u8> TryFrom impl too
assert_eq!(schema, json.as_bytes().to_vec().try_into().unwrap());
// now exercise the serialise code too
let schema = AggregateTSMSchema {
org_id: "1234".to_string(),
bucket_id: "5678".to_string(),
measurements: HashMap::from([(
"cpu".to_string(),
AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
)]),
},
)]),
};
let _json = serde_json::to_string(&schema).unwrap();
// ^ not asserting on the value because vector ordering changes between runs would make it
// flaky; it's enough that it serialises without error
}
}

import/src/schema/fetch.rs

@ -5,7 +5,7 @@ use object_store::{path::Path, DynObjectStore, GetResult};
use thiserror::Error;
use tokio::select;
use super::schema_parse::AggregateTSMSchema;
use crate::AggregateTSMSchema;
// Possible errors from schema commands
#[derive(Debug, Error)]

import/src/schema/merge.rs

@ -1,4 +1,4 @@
use crate::schema::schema_parse::{AggregateTSMSchema, Field, Measurement, Tag};
use crate::{AggregateTSMField, AggregateTSMMeasurement, AggregateTSMSchema, AggregateTSMTag};
use thiserror::Error;
@ -83,11 +83,14 @@ fn do_merge_schema(
}
}
fn do_merge_measurement(into_measurement: &mut Measurement, from_measurement: &Measurement) {
fn do_merge_measurement(
into_measurement: &mut AggregateTSMMeasurement,
from_measurement: &AggregateTSMMeasurement,
) {
// merge tags
from_measurement.tags.values().for_each(|from_tag| {
if let Some(into_tag) = into_measurement.tags.get(&from_tag.name) {
let mut new_tag = Tag {
let mut new_tag = AggregateTSMTag {
name: from_tag.name.clone(),
values: into_tag.values.clone(),
};
@ -102,7 +105,7 @@ fn do_merge_measurement(into_measurement: &mut Measurement, from_measurement: &M
// merge fields
from_measurement.fields.values().for_each(|from_field| {
if let Some(into_field) = into_measurement.fields.get(&from_field.name) {
let mut new_field = Field {
let mut new_field = AggregateTSMField {
name: from_field.name.clone(),
types: into_field.types.clone(),
};
@ -126,33 +129,33 @@ mod tests {
#[tokio::test]
async fn merge_measurements_adds_if_missing() {
let mut m1 = Measurement {
let mut m1 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
)]),
};
let m2 = Measurement {
let m2 = AggregateTSMMeasurement {
tags: HashMap::from([(
"sensor".to_string(),
Tag {
AggregateTSMTag {
name: "sensor".to_string(),
values: HashSet::from(["top".to_string(), "bottom".to_string()]),
},
)]),
fields: HashMap::from([(
"temperature".to_string(),
Field {
AggregateTSMField {
name: "temperature".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -165,33 +168,33 @@ mod tests {
#[tokio::test]
async fn merge_measurements_merges_tag_with_new_value() {
let mut m1 = Measurement {
let mut m1 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
)]),
};
let m2 = Measurement {
let m2 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["gadget".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -212,33 +215,33 @@ mod tests {
#[tokio::test]
async fn merge_measurements_merges_tag_with_new_and_old_values() {
let mut m1 = Measurement {
let mut m1 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
)]),
};
let m2 = Measurement {
let m2 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["gadget".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -259,33 +262,33 @@ mod tests {
#[tokio::test]
async fn merge_measurements_merges_field_with_new_type() {
let mut m1 = Measurement {
let mut m1 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
)]),
};
let m2 = Measurement {
let m2 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Integer".to_string()]),
},
@ -302,33 +305,33 @@ mod tests {
#[tokio::test]
async fn merge_measurements_merges_field_with_new_and_old_types() {
let mut m1 = Measurement {
let mut m1 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
)]),
};
let m2 = Measurement {
let m2 = AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string(), "Integer".to_string()]),
},
@ -350,17 +353,17 @@ mod tests {
bucket_id: "mybucket".to_string(),
measurements: HashMap::from([(
"cpu".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -373,17 +376,17 @@ mod tests {
bucket_id: "mybucket".to_string(),
measurements: HashMap::from([(
"weather".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"location".to_string(),
Tag {
AggregateTSMTag {
name: "location".to_string(),
values: HashSet::from(["london".to_string()]),
},
)]),
fields: HashMap::from([(
"temperature".to_string(),
Field {
AggregateTSMField {
name: "temperature".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -410,17 +413,17 @@ mod tests {
bucket_id: "mybucket".to_string(),
measurements: HashMap::from([(
"cpu".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["server".to_string(), "desktop".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -433,17 +436,17 @@ mod tests {
bucket_id: "mybucket".to_string(),
measurements: HashMap::from([(
"cpu".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["gadget".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Integer".to_string(), "Float".to_string()]),
},
@ -462,7 +465,7 @@ mod tests {
);
assert_eq!(
measurement.tags.values().cloned().collect::<Vec<_>>(),
vec![Tag {
vec![AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from([
"server".to_string(),
@ -477,7 +480,7 @@ mod tests {
);
assert_eq!(
measurement.fields.values().cloned().collect::<Vec<_>>(),
vec![Field {
vec![AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Integer".to_string(), "Float".to_string()])
}]
@ -497,10 +500,10 @@ mod tests {
bucket_id: bucket.clone(),
measurements: HashMap::from([(
"cpu".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from([
"server".to_string(),
@ -510,7 +513,7 @@ mod tests {
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -523,17 +526,17 @@ mod tests {
bucket_id: bucket.clone(),
measurements: HashMap::from([(
"cpu".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"host".to_string(),
Tag {
AggregateTSMTag {
name: "host".to_string(),
values: HashSet::from(["gadget".to_string()]),
},
)]),
fields: HashMap::from([(
"usage".to_string(),
Field {
AggregateTSMField {
name: "usage".to_string(),
types: HashSet::from(["Integer".to_string()]),
},
@ -546,17 +549,17 @@ mod tests {
bucket_id: bucket.clone(),
measurements: HashMap::from([(
"weather".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"location".to_string(),
Tag {
AggregateTSMTag {
name: "location".to_string(),
values: HashSet::from(["london".to_string()]),
},
)]),
fields: HashMap::from([(
"temperature".to_string(),
Field {
AggregateTSMField {
name: "temperature".to_string(),
types: HashSet::from(["Float".to_string()]),
},
@ -569,17 +572,17 @@ mod tests {
bucket_id: bucket,
measurements: HashMap::from([(
"weather".to_string(),
Measurement {
AggregateTSMMeasurement {
tags: HashMap::from([(
"location".to_string(),
Tag {
AggregateTSMTag {
name: "location".to_string(),
values: HashSet::from(["berlin".to_string()]),
},
)]),
fields: HashMap::from([(
"temperature".to_string(),
Field {
AggregateTSMField {
name: "temperature".to_string(),
types: HashSet::from(["Integer".to_string()]),
},

import/src/schema/mod.rs

@ -1,2 +1,3 @@
mod merge;
mod schema_parse;
pub mod fetch;
pub mod merge;
pub mod validate;

import/src/schema/schema_parse.rs (deleted)

@ -1,128 +0,0 @@
use serde::de::{Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use serde::*;
use std::collections::{HashMap, HashSet};
/// This struct is used to build up structs from TSM snapshots that we are going to use to bulk
/// ingest. They will be merged, then validated to check for anomalies that will complicate bulk
/// ingest such as tags/fields with the same name, or fields with different types across the whole
/// dataset. It is not the same as an IOx schema, although it is similar and some of the merge code
/// is similar. It's a transient data structure.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMSchema {
pub org_id: String,
pub bucket_id: String,
pub measurements: HashMap<String, Measurement>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Measurement {
// Map of tag name -> tag; note that the schema we get from the TSM tool has these as arrays.
// Using HashMaps internally to detect duplicates, so we have to do some custom serialisation
// for tags and fields here.
#[serde(
serialize_with = "serialize_map_values",
deserialize_with = "deserialize_tags"
)]
pub tags: HashMap<String, Tag>,
#[serde(
serialize_with = "serialize_map_values",
deserialize_with = "deserialize_fields"
)]
pub fields: HashMap<String, Field>,
}
fn serialize_map_values<S, K, V>(value: &HashMap<K, V>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
V: Serialize,
{
serializer.collect_seq(value.values())
}
fn deserialize_tags<'de, D>(deserializer: D) -> Result<HashMap<String, Tag>, D::Error>
where
D: Deserializer<'de>,
{
let v: Vec<Tag> = Deserialize::deserialize(deserializer)?;
Ok(v.into_iter().map(|t| (t.name.clone(), t)).collect())
}
fn deserialize_fields<'de, D>(deserializer: D) -> Result<HashMap<String, Field>, D::Error>
where
D: Deserializer<'de>,
{
let v: Vec<Field> = Deserialize::deserialize(deserializer)?;
Ok(v.into_iter().map(|f| (f.name.clone(), f)).collect())
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Tag {
pub name: String,
pub values: HashSet<String>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Field {
pub name: String,
pub types: HashSet<String>,
}
impl TryFrom<Vec<u8>> for AggregateTSMSchema {
type Error = serde_json::Error;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
serde_json::from_slice(&data)
}
}
impl TryFrom<&str> for AggregateTSMSchema {
type Error = serde_json::Error;
fn try_from(data: &str) -> Result<Self, Self::Error> {
serde_json::from_str(data)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn parses() {
let json = r#"
{
"org_id": "1234",
"bucket_id": "5678",
"measurements": {
"cpu": {
"tags": [
{ "name": "host", "values": ["server", "desktop"] }
],
"fields": [
{ "name": "usage", "types": ["float"] }
]
}
}
}
"#;
let schema: AggregateTSMSchema = json.try_into().unwrap();
assert_eq!(schema.org_id, "1234");
assert_eq!(schema.bucket_id, "5678");
assert_eq!(schema.measurements.len(), 1);
assert!(schema.measurements.contains_key("cpu"));
let measurement = schema.measurements.get("cpu").unwrap();
assert_eq!(measurement.tags.len(), 1);
let tag = &measurement.tags.values().next().unwrap();
assert_eq!(tag.name, "host");
assert_eq!(
tag.values,
HashSet::from(["server".to_string(), "desktop".to_string()])
);
let field = &measurement.fields.values().next().unwrap();
assert_eq!(field.name, "usage");
assert_eq!(field.types, HashSet::from(["float".to_string()]));
// exercise the Vec<u8> tryfrom impl too
assert_eq!(schema, json.as_bytes().to_vec().try_into().unwrap());
}
}

import/src/schema/validate.rs (new)

@ -0,0 +1,132 @@
use crate::AggregateTSMSchema;
use thiserror::Error;
// Possible validation errors
#[derive(Debug, Error)]
pub enum ValidationError {
#[error("Measurement '{measurement}' has a tag and field with the same name: {name}")]
TagAndFieldSameName { measurement: String, name: String },
#[error(
"Measurement '{measurement}' has field '{name}' with multiple types: {:?}",
types
)]
FieldWithMultipleTypes {
measurement: String,
name: String,
types: Vec<String>,
},
}
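/// Check a merged [`AggregateTSMSchema`] for anomalies that would complicate bulk ingest: a tag
/// and a field sharing a name within a measurement, or a field with more than one type. All
/// problems found are collected into the `Err` variant rather than failing on the first.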
pub fn validate_schema(schema: &AggregateTSMSchema) -> Result<(), Vec<ValidationError>> {
let mut errors: Vec<ValidationError> = vec![];
for (measurement_name, measurement) in &schema.measurements {
if let Some(tag_name) = measurement
.tags
.keys()
.find(|&t| measurement.fields.contains_key(t))
{
errors.push(ValidationError::TagAndFieldSameName {
measurement: measurement_name.clone(),
name: tag_name.clone(),
});
}
if let Some(field) = measurement.fields.values().find(|f| f.types.len() > 1) {
errors.push(ValidationError::FieldWithMultipleTypes {
measurement: measurement_name.clone(),
name: field.name.clone(),
types: field.types.iter().cloned().collect::<Vec<_>>(),
});
}
}
if !errors.is_empty() {
Err(errors)
} else {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
#[tokio::test]
async fn good() {
let json = r#"
{
"org_id": "1234",
"bucket_id": "5678",
"measurements": {
"cpu": {
"tags": [
{ "name": "host", "values": ["server", "desktop"] }
],
"fields": [
{ "name": "usage", "types": ["Float"] }
]
}
}
}
"#;
let schema: AggregateTSMSchema = json.try_into().unwrap();
assert_matches!(validate_schema(&schema), Ok(_));
}
#[tokio::test]
async fn tag_and_field_same_name() {
let json = r#"
{
"org_id": "1234",
"bucket_id": "5678",
"measurements": {
"weather": {
"tags": [
{ "name": "temperature", "values": ["true"] }
],
"fields": [
{ "name": "temperature", "types": ["Float"] }
]
}
}
}
"#;
let schema: AggregateTSMSchema = json.try_into().unwrap();
// expect_err makes the test fail if validation unexpectedly passes
let errors = validate_schema(&schema).expect_err("validation should have failed");
assert_eq!(errors.len(), 1);
assert_matches!(
errors.get(0),
Some(ValidationError::TagAndFieldSameName { .. })
);
}
#[tokio::test]
async fn field_with_multiple_types() {
let json = r#"
{
"org_id": "1234",
"bucket_id": "5678",
"measurements": {
"weather": {
"tags": [
{ "name": "location", "values": ["London", "Berlin"] }
],
"fields": [
{ "name": "temperature", "types": ["Float", "Integer"] }
]
}
}
}
"#;
let schema: AggregateTSMSchema = json.try_into().unwrap();
// expect_err makes the test fail if validation unexpectedly passes
let errors = validate_schema(&schema).expect_err("validation should have failed");
assert_eq!(errors.len(), 1);
assert_matches!(
errors.get(0),
Some(ValidationError::FieldWithMultipleTypes { .. })
);
}
}

influxdb_iox CLI: import schema command

@ -8,6 +8,7 @@ use thiserror::Error;
use import::schema::{
fetch::{fetch_schema, FetchError},
merge::{SchemaMergeError, SchemaMerger},
validate::{validate_schema, ValidationError},
};
// Possible errors from schema commands
@ -21,6 +22,9 @@ pub enum SchemaCommandError {
#[error("Error merging schemas: {0}")]
Merging(#[from] SchemaMergeError),
#[error("{0}")]
Validating(#[from] ValidationError),
}
#[derive(Parser, Debug)]
@ -57,7 +61,6 @@ pub struct MergeConfig {
pub async fn command(config: Config) -> Result<(), SchemaCommandError> {
match config {
Config::Merge(merge_config) => {
println!("Merging schemas from object store");
let object_store = make_object_store(&merge_config.object_store)
.map_err(SchemaCommandError::ObjectStoreParsing)?;
@ -70,8 +73,14 @@ pub async fn command(config: Config) -> Result<(), SchemaCommandError> {
let merger = SchemaMerger::new(merge_config.org_id, merge_config.bucket_id, schemas);
let merged = merger.merge().map_err(SchemaCommandError::Merging)?;
// just print the merged schema for now; we'll do more with this in future PRs
println!("{:?}", merged);
println!("Merged schema:\n{:?}", merged);
if let Err(errors) = validate_schema(&merged) {
eprintln!("Schema conflicts:");
for e in errors {
eprintln!("- {}", e);
}
}
Ok(())
}
}

workspace-hack/Cargo.toml

@ -66,7 +66,7 @@ tokio-util = { version = "0.7", features = ["codec", "tracing"] }
tonic = { version = "0.8", features = ["async-trait", "axum", "channel", "codegen", "h2", "hyper", "hyper-timeout", "prost", "prost-derive", "prost1", "tokio", "tower", "tracing-futures", "transport"] }
tower = { version = "0.4", features = ["__common", "balance", "buffer", "discover", "futures-core", "futures-util", "indexmap", "limit", "load", "log", "make", "pin-project", "pin-project-lite", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-util", "tracing", "util"] }
tower-http = { version = "0.3", features = ["catch-panic", "map-response-body", "tower", "tracing", "util"] }
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "release_max_level_trace", "std", "tracing-attributes"] }
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_trace", "std", "tracing-attributes"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
tracing-log = { version = "0.1", features = ["log-tracer", "std", "trace-logger"] }
tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "json", "lazy_static", "matchers", "parking_lot", "regex", "registry", "serde", "serde_json", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log", "tracing-serde"] }