refactor: remove old tsm import code (#7804)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent b2c9592581
commit 042a6a66d5
@@ -1 +1 @@
pub mod tsm;
@@ -1,382 +0,0 @@
use chrono::{offset::FixedOffset, DateTime};
use schema::InfluxFieldType;
use serde::de::{Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use serde::*;
use std::collections::{HashMap, HashSet};

mod tsm_schema;

// Public API
pub use tsm_schema::{
    fetch::{fetch_schema, FetchError},
    merge::{SchemaMergeError, SchemaMerger},
    update_catalog::{update_iox_catalog, UpdateCatalogError},
    validate::{validate_schema, ValidationError},
};

/// This struct is used to build up schemas from TSM snapshots that we are going to use to bulk
/// ingest. They will be merged, then validated to check for anomalies that will complicate bulk
/// ingest such as tags/fields with the same name, or fields with different types across the whole
/// dataset. It is not the same as an IOx schema, although it is similar and some of the merge code
/// is similar. It's a transient data structure.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AggregateTSMSchema {
    pub org_id: String,
    pub bucket_id: String,
    pub measurements: HashMap<String, AggregateTSMMeasurement>,
}

impl AggregateTSMSchema {
    pub fn types_are_valid(&self) -> bool {
        self.measurements.values().all(|m| {
            m.fields.values().all(|f| {
                f.types.len() == 1
                    && InfluxFieldType::try_from(f.types.iter().next().unwrap()).is_ok()
            })
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AggregateTSMMeasurement {
    // Map of tag name -> tag; note that the schema we get from the TSM tool has these as arrays.
    // Using HashMaps internally to detect duplicates, so we have to do some custom serialisation
    // for tags and fields here.
    #[serde(
        serialize_with = "serialize_map_values",
        deserialize_with = "deserialize_tags"
    )]
    pub tags: HashMap<String, AggregateTSMTag>,
    #[serde(
        serialize_with = "serialize_map_values",
        deserialize_with = "deserialize_fields"
    )]
    pub fields: HashMap<String, AggregateTSMField>,
    pub earliest_time: DateTime<FixedOffset>,
    pub latest_time: DateTime<FixedOffset>,
}

fn serialize_map_values<S, K, V>(value: &HashMap<K, V>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
    V: Serialize,
{
    serializer.collect_seq(value.values())
}

fn deserialize_tags<'de, D>(deserializer: D) -> Result<HashMap<String, AggregateTSMTag>, D::Error>
where
    D: Deserializer<'de>,
{
    let v: Vec<AggregateTSMTag> = Deserialize::deserialize(deserializer)?;
    Ok(v.into_iter().map(|t| (t.name.clone(), t)).collect())
}

fn deserialize_fields<'de, D>(
    deserializer: D,
) -> Result<HashMap<String, AggregateTSMField>, D::Error>
where
    D: Deserializer<'de>,
{
    let v: Vec<AggregateTSMField> = Deserialize::deserialize(deserializer)?;
    Ok(v.into_iter().map(|f| (f.name.clone(), f)).collect())
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMTag {
    pub name: String,
    pub values: HashSet<String>,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateTSMField {
    pub name: String,
    pub types: HashSet<String>,
}

impl TryFrom<Vec<u8>> for AggregateTSMSchema {
    type Error = serde_json::Error;

    fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
        serde_json::from_slice(&data)
    }
}

impl TryFrom<&str> for AggregateTSMSchema {
    type Error = serde_json::Error;

    fn try_from(data: &str) -> Result<Self, Self::Error> {
        serde_json::from_str(data)
    }
}

/// A variation on AggregateTSMSchema with the following differences:
/// - no org and bucket
/// - no earliest/latest time
/// - no tags (that may change once we decide what to do about tags/fields with the same name- for
///   now they'll fail validation and you can't fix it via the override)
/// - fields have only one type
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AggregateTSMSchemaOverride {
    pub measurements: HashMap<String, AggregateTSMSchemaOverrideMeasurement>,
}

/// Field type override; note there is only one type
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AggregateTSMSchemaOverrideField {
    pub name: String,
    pub r#type: String,
}

/// Override for a measurement; note there are no tags as they can't be overridden
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AggregateTSMSchemaOverrideMeasurement {
    // Map of field name -> field; note that the schema of the config file has these as arrays.
    // Using HashMaps internally to avoid duplicates, so we have to do some custom serialisation
    // for fields here.
    #[serde(
        serialize_with = "serialize_map_values",
        deserialize_with = "deserialize_override_fields",
        default
    )]
    pub fields: HashMap<String, AggregateTSMSchemaOverrideField>,
}

fn deserialize_override_fields<'de, D>(
    deserializer: D,
) -> Result<HashMap<String, AggregateTSMSchemaOverrideField>, D::Error>
where
    D: Deserializer<'de>,
{
    let v: Vec<AggregateTSMSchemaOverrideField> = Deserialize::deserialize(deserializer)?;
    Ok(v.into_iter().map(|f| (f.name.clone(), f)).collect())
}

impl TryFrom<Vec<u8>> for AggregateTSMSchemaOverride {
    type Error = serde_json::Error;

    fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
        serde_json::from_slice(&data)
    }
}

impl TryFrom<&str> for AggregateTSMSchemaOverride {
    type Error = serde_json::Error;

    fn try_from(data: &str) -> Result<Self, Self::Error> {
        serde_json::from_str(data)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;

    #[tokio::test]
    async fn parses() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "cpu": {
              "tags": [
                { "name": "host", "values": ["server", "desktop"] }
              ],
              "fields": [
                { "name": "usage", "types": ["Float"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        assert_eq!(schema.org_id, "1234");
        assert_eq!(schema.bucket_id, "5678");
        assert_eq!(schema.measurements.len(), 1);
        assert!(schema.measurements.contains_key("cpu"));
        let measurement = schema.measurements.get("cpu").unwrap();
        assert_eq!(measurement.tags.len(), 1);
        let tag = &measurement.tags.values().next().unwrap();
        assert_eq!(tag.name, "host");
        assert_eq!(
            tag.values,
            HashSet::from(["server".to_string(), "desktop".to_string()])
        );
        let field = &measurement.fields.values().next().unwrap();
        assert_eq!(field.name, "usage");
        assert_eq!(field.types, HashSet::from(["Float".to_string()]));
        // exercise the Vec<u8> tryfrom impl too
        assert_eq!(schema, json.as_bytes().to_vec().try_into().unwrap());
        // now exercise the serialise code too
        let schema = AggregateTSMSchema {
            org_id: "1234".to_string(),
            bucket_id: "5678".to_string(),
            measurements: HashMap::from([(
                "cpu".to_string(),
                AggregateTSMMeasurement {
                    tags: HashMap::from([(
                        "host".to_string(),
                        AggregateTSMTag {
                            name: "host".to_string(),
                            values: HashSet::from(["server".to_string(), "desktop".to_string()]),
                        },
                    )]),
                    fields: HashMap::from([(
                        "usage".to_string(),
                        AggregateTSMField {
                            name: "usage".to_string(),
                            types: HashSet::from(["Float".to_string()]),
                        },
                    )]),
                    earliest_time: DateTime::parse_from_rfc3339("2022-01-01T00:00:00.00Z").unwrap(),
                    latest_time: DateTime::parse_from_rfc3339("2022-07-07T06:00:00.00Z").unwrap(),
                },
            )]),
        };
        let _json = serde_json::to_string(&schema).unwrap();
        // ^ not asserting on the value because vector ordering changes so it would be flakey. it's
        // enough that it serialises without error
    }

    #[tokio::test]
    async fn type_validation_happy() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "cpu": {
              "tags": [
                { "name": "host", "values": ["server", "desktop"] }
              ],
              "fields": [
                { "name": "usage", "types": ["Float"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        assert!(schema.types_are_valid());
    }

    #[tokio::test]
    async fn type_validation_invalid_type() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "cpu": {
              "tags": [
                { "name": "host", "values": ["server", "desktop"] }
              ],
              "fields": [
                { "name": "usage", "types": ["FloatyMcFloatFace"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        assert!(!schema.types_are_valid());
    }

    #[tokio::test]
    async fn type_validation_multiple_types() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "cpu": {
              "tags": [
                { "name": "host", "values": ["server", "desktop"] }
              ],
              "fields": [
                { "name": "usage", "types": ["Float", "Integer"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        assert!(!schema.types_are_valid());
    }

    #[tokio::test]
    async fn override_parses() {
        let json = r#"
        {
          "measurements": {
            "cpu": {
              "fields": [
                { "name": "usage", "type": "Float" }
              ]
            }
          }
        }
        "#;
        let override_schema: AggregateTSMSchemaOverride = json.try_into().unwrap();
        assert_eq!(override_schema.measurements.len(), 1);
        assert!(override_schema.measurements.contains_key("cpu"));
        let measurement = override_schema.measurements.get("cpu").unwrap();
        let field = &measurement.fields.values().next().unwrap();
        assert_eq!(field.name, "usage");
        assert_eq!(field.r#type, "Float");
        // exercise the Vec<u8> tryfrom impl too
        assert_eq!(
            override_schema,
            json.as_bytes().to_vec().try_into().unwrap()
        );
        // now exercise the serialise code too, although this is only used in tests
        let schema = AggregateTSMSchemaOverride {
            measurements: HashMap::from([(
                "cpu".to_string(),
                AggregateTSMSchemaOverrideMeasurement {
                    fields: HashMap::from([(
                        "usage".to_string(),
                        AggregateTSMSchemaOverrideField {
                            name: "usage".to_string(),
                            r#type: "Float".to_string(),
                        },
                    )]),
                },
            )]),
        };
        let _json = serde_json::to_string(&schema).unwrap();
        // ^ not asserting on the value because vector ordering changes so it would be flakey. it's
        // enough that it serialises without error
    }

    #[tokio::test]
    async fn override_fails_to_parse_multiple_types() {
        // this clearly breaks the schema but someone could conceivably try this by copy-paste
        // accident so let's be sure that it fails
        let json = r#"
        {
          "measurements": {
            "cpu": {
              "fields": [
                { "name": "usage", "types": ["Float", "Integer"] }
              ]
            }
          }
        }
        "#;
        let result: Result<AggregateTSMSchemaOverride, serde_json::Error> = json.try_into();
        assert_matches!(result, Err(serde_json::Error { .. }));
    }
}
@@ -1,59 +0,0 @@
use std::sync::Arc;

use futures::prelude::*;
use object_store::{path::Path, DynObjectStore, GetResult};
use thiserror::Error;
use tokio::select;

use crate::tsm::AggregateTSMSchema;

// Possible errors from schema commands
#[derive(Debug, Error)]
pub enum FetchError {
    #[error("Error fetching schemas from object storage: {0}")]
    Fetching(#[from] object_store::Error),

    #[error("Error parsing schema from object storage: {0}")]
    Parsing(#[from] serde_json::Error),
}

pub async fn fetch_schema(
    object_store: Arc<DynObjectStore>,
    prefix: Option<&Path>,
    suffix: &str,
) -> Result<Vec<AggregateTSMSchema>, FetchError> {
    let mut schemas: Vec<AggregateTSMSchema> = vec![];
    let mut results = object_store
        .list(prefix)
        .await
        .map_err(FetchError::Fetching)?;
    // TODO: refactor to do these concurrently using `buffered`
    loop {
        select! {
            item = results.next() => {
                match item {
                    Some(item) => {
                        let item = item.map_err(FetchError::Fetching)?;
                        if !item.location.as_ref().ends_with(suffix) {
                            continue;
                        }
                        let read_stream = object_store.get(&item.location).await?;
                        if let GetResult::Stream(read_stream) = read_stream {
                            let chunks: Vec<_> = read_stream.try_collect().await?;
                            let mut buf = Vec::with_capacity(chunks.iter().map(|c| c.len()).sum::<usize>());
                            for c in chunks {
                                buf.extend(c);
                            }
                            let schema: AggregateTSMSchema = buf.try_into().map_err(FetchError::Parsing)?;
                            schemas.push(schema);
                        }
                    }
                    None => {
                        break;
                    }
                }
            }
        }
    }
    Ok(schemas)
}
File diff suppressed because it is too large
@@ -1,4 +0,0 @@
pub mod fetch;
pub mod merge;
pub mod update_catalog;
pub mod validate;
File diff suppressed because it is too large
@@ -1,136 +0,0 @@
use crate::tsm::AggregateTSMSchema;
use thiserror::Error;

// Possible validation errors
#[derive(Debug, Error)]
pub enum ValidationError {
    #[error("Measurement '{measurement}' has a tag and field with the same name: {name}")]
    TagAndFieldSameName { measurement: String, name: String },

    #[error(
        "Measurement '{measurement}' has field '{name}' with multiple types: {:?}",
        types
    )]
    FieldWithMultipleTypes {
        measurement: String,
        name: String,
        types: Vec<String>,
    },
}

pub fn validate_schema(schema: &AggregateTSMSchema) -> Result<(), Vec<ValidationError>> {
    let mut errors: Vec<ValidationError> = vec![];
    for (measurement_name, measurement) in &schema.measurements {
        if let Some(tag_name) = measurement
            .tags
            .keys()
            .find(|&t| measurement.fields.contains_key(t))
        {
            errors.push(ValidationError::TagAndFieldSameName {
                measurement: measurement_name.clone(),
                name: tag_name.clone(),
            });
        }
        if let Some(field) = measurement.fields.values().find(|f| f.types.len() > 1) {
            errors.push(ValidationError::FieldWithMultipleTypes {
                measurement: measurement_name.clone(),
                name: field.name.clone(),
                types: field.types.iter().cloned().collect::<Vec<_>>(),
            });
        }
    }
    if !errors.is_empty() {
        Err(errors)
    } else {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;

    #[tokio::test]
    async fn good() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "cpu": {
              "tags": [
                { "name": "host", "values": ["server", "desktop"] }
              ],
              "fields": [
                { "name": "usage", "types": ["Float"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        assert_matches!(validate_schema(&schema), Ok(_));
    }

    #[tokio::test]
    async fn tag_and_field_same_name() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "weather": {
              "tags": [
                { "name": "temperature", "values": ["true"] }
              ],
              "fields": [
                { "name": "temperature", "types": ["Float"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        let errors = validate_schema(&schema).expect_err("should fail to validate schema");
        assert_eq!(errors.len(), 1);
        assert_matches!(
            errors.get(0),
            Some(ValidationError::TagAndFieldSameName { .. })
        );
    }

    #[tokio::test]
    async fn field_with_multiple_types() {
        let json = r#"
        {
          "org_id": "1234",
          "bucket_id": "5678",
          "measurements": {
            "weather": {
              "tags": [
                { "name": "location", "values": ["London", "Berlin"] }
              ],
              "fields": [
                { "name": "temperature", "types": ["Float", "Integer"] }
              ],
              "earliest_time": "2022-01-01T00:00:00.00Z",
              "latest_time": "2022-07-07T06:00:00.00Z"
            }
          }
        }
        "#;
        let schema: AggregateTSMSchema = json.try_into().unwrap();
        let errors = validate_schema(&schema).expect_err("should fail to validate schema");
        assert_eq!(errors.len(), 1);
        assert_matches!(
            errors.get(0),
            Some(ValidationError::FieldWithMultipleTypes { .. })
        );
    }
}
@@ -1,31 +0,0 @@
use thiserror::Error;

mod schema;

#[derive(Debug, Error)]
pub enum ImportError {
    #[error("Error in schema command: {0}")]
    SchemaError(#[from] schema::SchemaCommandError),
}

#[derive(Debug, clap::Parser)]
pub struct Config {
    #[clap(subcommand)]
    command: Command,
}

#[derive(clap::Parser, Debug)]
pub enum Command {
    /// Operations related to schema analysis.
    #[clap(subcommand)]
    Schema(Box<schema::Config>),
}

/// Handle variants of the schema command.
pub async fn command(config: Config) -> Result<(), ImportError> {
    match config.command {
        Command::Schema(schema_config) => schema::command(*schema_config)
            .await
            .map_err(ImportError::SchemaError),
    }
}
@@ -1,175 +0,0 @@
use std::{
    fmt::{Display, Formatter},
    fs,
    path::PathBuf,
    sync::Arc,
};

use clap::Parser;
use clap_blocks::{
    catalog_dsn::CatalogDsnConfig,
    object_store::{make_object_store, ObjectStoreConfig},
};
use iox_time::{SystemProvider, TimeProvider};
use object_store::{path::Path, DynObjectStore};
use object_store_metrics::ObjectStoreMetrics;
use thiserror::Error;

use import_export::tsm::{
    fetch_schema, update_iox_catalog, validate_schema, AggregateTSMSchemaOverride, FetchError,
    SchemaMergeError, SchemaMerger, UpdateCatalogError, ValidationError,
};

use crate::process_info::setup_metric_registry;

// Possible errors from schema commands
#[derive(Debug, Error)]
pub enum SchemaCommandError {
    #[error("Cannot parse object store config: {0}")]
    ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),

    #[error("Catalog DSN error: {0}")]
    CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),

    #[error("Error fetching schemas from object storage: {0}")]
    Fetching(#[from] FetchError),

    #[error("Error merging schemas: {0}")]
    Merging(#[from] SchemaMergeError),

    #[error("Schema conflicts during merge:\n{0}")]
    Validating(#[from] ValidationErrors),

    #[error("Merged schema must have one valid Influx type only")]
    InvalidFieldTypes(),

    #[error("Error updating IOx catalog with merged schema: {0}")]
    UpdateCatalogError(#[from] UpdateCatalogError),

    #[error("Error reading schema override file from disk: {0}")]
    SchemaOverrideFileReadError(#[from] std::io::Error),

    #[error("Error parsing schema override file: {0}")]
    SchemaOverrideParseError(#[from] serde_json::Error),
}

#[derive(Debug, Error)]
pub struct ValidationErrors(Vec<ValidationError>);

impl Display for ValidationErrors {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.0.iter().fold(Ok(()), |result, e| {
            result.and_then(|_| writeln!(f, "- {e}"))
        })
    }
}

#[derive(Parser, Debug)]
pub enum Config {
    /// Merge schemas produced in the IOx prestep
    Merge(MergeConfig),
}

/// Merge schema created in pre-step
#[derive(Parser, Debug)]
pub struct MergeConfig {
    #[clap(flatten)]
    object_store: ObjectStoreConfig,

    #[clap(flatten)]
    catalog_dsn: CatalogDsnConfig,

    #[clap(long)]
    /// Retention setting (used only if we need to create the namespace)
    retention: Option<String>,

    #[clap(long)]
    /// The Org ID of the schemas to merge
    org_id: String,

    #[clap(long)]
    /// The Bucket ID of the schemas to merge
    bucket_id: String,

    #[clap(long)]
    /// The filename prefix to look for in the object store bucket.
    /// Convention is to use `{org_id}/{bucket_id}/{job_name}/`
    prefix: String,

    #[clap(long, default_value = ".schema.json")]
    /// The filename suffix to look for in the object store bucket
    suffix: String,

    #[clap(long)]
    /// Filename of schema override file used to instruct this tool on how to resolve schema
    /// conflicts in the TSM schemas before updating the schema in the IOx catalog.
    schema_override_file: Option<PathBuf>,
}

/// Entry-point for the schema command
pub async fn command(config: Config) -> Result<(), SchemaCommandError> {
    match config {
        Config::Merge(merge_config) => {
            let time_provider = Arc::new(SystemProvider::new()) as Arc<dyn TimeProvider>;
            let metrics = setup_metric_registry();

            let object_store = make_object_store(&merge_config.object_store)
                .map_err(SchemaCommandError::ObjectStoreParsing)?;
            // Decorate the object store with a metric recorder.
            let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreMetrics::new(
                object_store,
                time_provider,
                &metrics,
            ));

            let catalog = merge_config
                .catalog_dsn
                .get_catalog("import", Arc::clone(&metrics))
                .await?;

            // fetch the TSM schemas and merge into one aggregate schema
            let schemas = fetch_schema(
                Arc::clone(&object_store),
                Some(&Path::from(merge_config.prefix)),
                &merge_config.suffix,
            )
            .await?;
            let mut merger = SchemaMerger::new(
                merge_config.org_id.clone(),
                merge_config.bucket_id.clone(),
                schemas,
            );

            // load a schema override file, if provided, to resolve field type conflicts
            if let Some(schema_override_file) = merge_config.schema_override_file {
                let data = fs::read(schema_override_file)
                    .map_err(SchemaCommandError::SchemaOverrideFileReadError)?;
                let schema_override: AggregateTSMSchemaOverride = data
                    .try_into()
                    .map_err(SchemaCommandError::SchemaOverrideParseError)?;
                merger = merger.with_schema_override(schema_override);
            }

            // note that this will also apply the schema override, if the user provided one
            let merged_tsm_schema = merger.merge().map_err(SchemaCommandError::Merging)?;
            // just print the merged schema for now; we'll do more with this in future PRs
            println!("Merged schema:\n{merged_tsm_schema:?}");

            // don't proceed unless we produce a valid merged schema
            if let Err(errors) = validate_schema(&merged_tsm_schema) {
                return Err(SchemaCommandError::Validating(ValidationErrors(errors)));
            }

            // From here we can happily .unwrap() the field types knowing they're valid
            if !merged_tsm_schema.types_are_valid() {
                return Err(SchemaCommandError::InvalidFieldTypes());
            }

            // given we have a valid aggregate TSM schema, fetch the schema for the namespace from
            // the IOx catalog, if it exists, and update it with our aggregate schema
            update_iox_catalog(&merged_tsm_schema, Arc::clone(&catalog)).await?;

            Ok(())
        }
    }
}
@@ -35,7 +35,6 @@ use tokio::runtime::Runtime;
mod commands {
    pub mod catalog;
    pub mod debug;
    pub mod import;
    pub mod namespace;
    pub mod query;
    pub mod query_ingester;
@@ -209,9 +208,6 @@ enum Command
    /// Query the ingester only
    QueryIngester(commands::query_ingester::Config),

    /// Commands related to the bulk ingest of data
    Import(commands::import::Config),

    /// Various commands for namespace manipulation
    Namespace(commands::namespace::Config),
}
@@ -373,13 +369,6 @@ fn main() -> Result<(), std::io::Error> {
                std::process::exit(ReturnCode::Failure as _)
            }
        }
        Some(Command::Import(config)) => {
            let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
            if let Err(e) = commands::import::command(config).await {
                eprintln!("{e}");
                std::process::exit(ReturnCode::Failure as _)
            }
        }
        Some(Command::Namespace(config)) => {
            let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
            let connection = connection(grpc_host).await;