diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index c6270a60a2..6cf1d23072 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -81,8 +81,6 @@ pub const TIME_COLUMN_NAME: &str = "time";
 )]
 pub struct SequenceNumber(u32);
 
-type SeriesKey = Option<Vec<ColumnId>>;
-
 impl SequenceNumber {
     pub fn new(id: u32) -> Self {
         Self(id)
@@ -472,27 +470,18 @@ impl DatabaseSchema {
                     }
                 }
                 CatalogOp::AddFields(field_additions) => {
-                    let new_or_existing_table = updated_or_new_tables
+                    let Some(new_or_existing_table) = updated_or_new_tables
                         .get(&field_additions.table_id)
-                        .or_else(|| self.tables.get(&field_additions.table_id));
-                    if let Some(existing_table) = new_or_existing_table {
-                        if let Some(new_table) =
-                            existing_table.new_if_field_additions_add_fields(field_additions)?
-                        {
-                            updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                        }
-                    } else {
-                        let fields = field_additions
-                            .field_definitions
-                            .iter()
-                            .map(|f| (f.id, Arc::clone(&f.name), f.data_type.into()))
-                            .collect::<Vec<_>>();
-                        let new_table = TableDefinition::new(
-                            field_additions.table_id,
-                            Arc::clone(&field_additions.table_name),
-                            fields,
-                            SeriesKey::None,
-                        )?;
+                        .or_else(|| self.tables.get(&field_additions.table_id))
+                    else {
+                        return Err(Error::TableNotFound {
+                            db_name: Arc::clone(&field_additions.database_name),
+                            table_name: Arc::clone(&field_additions.table_name),
+                        });
+                    };
+                    if let Some(new_table) =
+                        new_or_existing_table.new_if_field_additions_add_fields(field_additions)?
+                    {
                         updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
                     }
                 }
@@ -1013,6 +1002,7 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT
 
 #[cfg(test)]
 mod tests {
+    use influxdb3_wal::{create, FieldDataType};
     use pretty_assertions::assert_eq;
     use test_helpers::assert_contains;
 
@@ -1407,4 +1397,32 @@ mod tests {
         assert_eq!(instance_id, catalog.instance_id());
         assert_eq!(host_id, catalog.host_id());
     }
+
+    /// See: https://github.com/influxdata/influxdb/issues/25524
+    #[test]
+    fn apply_catalog_batch_fails_for_add_fields_on_nonexist_table() {
+        let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
+        catalog.insert_database(DatabaseSchema::new(DbId::new(), Arc::from("foo")));
+        let db_id = catalog.db_name_to_id("foo").unwrap();
+        let catalog_batch = create::catalog_batch_op(
+            db_id,
+            "foo",
+            0,
+            [create::add_fields_op(
+                db_id,
+                "foo",
+                TableId::new(),
+                "banana",
+                [create::field_def(
+                    ColumnId::new(),
+                    "papaya",
+                    FieldDataType::String,
+                )],
+            )],
+        );
+        let err = catalog
+            .apply_catalog_batch(catalog_batch.as_catalog().unwrap())
+            .expect_err("should fail to apply AddFields operation for non-existent table");
+        assert_contains!(err.to_string(), "Table banana not in DB schema for foo");
+    }
 }
diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs
new file mode 100644
index 0000000000..c91cf2ddfe
--- /dev/null
+++ b/influxdb3_wal/src/create.rs
@@ -0,0 +1,135 @@
+//! A set of helper methods for creating WAL operations in tests.
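+//!
+//! A sketch of how these helpers compose, with hypothetical ids and names (see
+//! the catalog tests for a real call site):
+//!
+//! ```ignore
+//! let batch = create::catalog_batch_op(
+//!     db_id,
+//!     "my_db",
+//!     0,
+//!     [create::add_fields_op(
+//!         db_id,
+//!         "my_db",
+//!         table_id,
+//!         "my_table",
+//!         [create::field_def(column_id, "f1", FieldDataType::String)],
+//!     )],
+//! );
+//! // Wrap the op into the contents of a single WAL file (timestamps 0..=1, file 1):
+//! let contents = create::wal_contents((0, 1, 1), [batch]);
+//! ```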
+
+use std::sync::Arc;
+
+use influxdb3_id::{ColumnId, DbId};
+
+use crate::*;
+
+pub fn wal_contents(
+    (min_timestamp_ns, max_timestamp_ns, wal_file_number): (i64, i64, u64),
+    ops: impl IntoIterator<Item = WalOp>,
+) -> WalContents {
+    WalContents {
+        min_timestamp_ns,
+        max_timestamp_ns,
+        wal_file_number: WalFileSequenceNumber::new(wal_file_number),
+        ops: ops.into_iter().collect(),
+        snapshot: None,
+    }
+}
+
+pub fn catalog_batch_op(
+    db_id: DbId,
+    db_name: impl Into<Arc<str>>,
+    time_ns: i64,
+    ops: impl IntoIterator<Item = CatalogOp>,
+) -> WalOp {
+    WalOp::Catalog(CatalogBatch {
+        database_id: db_id,
+        database_name: db_name.into(),
+        time_ns,
+        ops: ops.into_iter().collect(),
+    })
+}
+
+pub fn add_fields_op(
+    database_id: DbId,
+    db_name: impl Into<Arc<str>>,
+    table_id: TableId,
+    table_name: impl Into<Arc<str>>,
+    fields: impl IntoIterator<Item = FieldDefinition>,
+) -> CatalogOp {
+    CatalogOp::AddFields(FieldAdditions {
+        database_name: db_name.into(),
+        database_id,
+        table_name: table_name.into(),
+        table_id,
+        field_definitions: fields.into_iter().collect(),
+    })
+}
+
+pub fn create_table_op(
+    db_id: DbId,
+    db_name: impl Into<Arc<str>>,
+    table_id: TableId,
+    table_name: impl Into<Arc<str>>,
+    fields: impl IntoIterator<Item = FieldDefinition>,
+) -> CatalogOp {
+    CatalogOp::CreateTable(TableDefinition {
+        database_id: db_id,
+        database_name: db_name.into(),
+        table_name: table_name.into(),
+        table_id,
+        field_definitions: fields.into_iter().collect(),
+        key: None,
+    })
+}
+
+pub fn field_def(
+    id: ColumnId,
+    name: impl Into<Arc<str>>,
+    data_type: FieldDataType,
+) -> FieldDefinition {
+    FieldDefinition {
+        name: name.into(),
+        data_type,
+        id,
+    }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct CreateLastCacheOpBuilder {
+    table_id: TableId,
+    table_name: Arc<str>,
+    name: Arc<str>,
+    key_columns: Vec<ColumnId>,
+    value_columns: Option<LastCacheValueColumnsDef>,
+    count: Option<LastCacheSize>,
+    ttl: Option<u64>,
+}
+
+impl CreateLastCacheOpBuilder {
+    pub fn build(self) -> CatalogOp {
+        CatalogOp::CreateLastCache(LastCacheDefinition {
+            table_id: self.table_id,
+            table: self.table_name,
+            name: self.name,
+            key_columns: self.key_columns,
+            value_columns: self
+                .value_columns
+                .unwrap_or(LastCacheValueColumnsDef::AllNonKeyColumns),
+            count: self.count.unwrap_or_else(|| LastCacheSize::new(1).unwrap()),
+            ttl: self.ttl.unwrap_or(3600),
+        })
+    }
+}
+
+pub fn create_last_cache_op_builder(
+    table_id: TableId,
+    table_name: impl Into<Arc<str>>,
+    cache_name: impl Into<Arc<str>>,
+    key_columns: impl IntoIterator<Item = ColumnId>,
+) -> CreateLastCacheOpBuilder {
+    CreateLastCacheOpBuilder {
+        table_id,
+        table_name: table_name.into(),
+        name: cache_name.into(),
+        key_columns: key_columns.into_iter().collect(),
+        value_columns: None,
+        count: None,
+        ttl: None,
+    }
+}
+
+pub fn delete_last_cache_op(
+    table_id: TableId,
+    table_name: impl Into<Arc<str>>,
+    cache_name: impl Into<Arc<str>>,
+) -> CatalogOp {
+    CatalogOp::DeleteLastCache(LastCacheDelete {
+        table_name: table_name.into(),
+        table_id,
+        name: cache_name.into(),
+    })
+}
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index 294fbaf319..eb49426e0c 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -3,6 +3,7 @@
 //! writes durable until they can be written in larger batches as Parquet files and other snapshot and
 //! index files in object storage.
 
+pub mod create;
 pub mod object_store;
 pub mod serialize;
 mod snapshot_tracker;
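
// A usage sketch for the last cache helpers, assuming `TableId`/`ColumnId`
// (from `influxdb3_id`) are `Copy` as elsewhere in this diff; unset builder
// fields fall back to the defaults in `build()`: all non-key value columns,
// count 1, ttl 3600 seconds.
use influxdb3_id::{ColumnId, TableId};
use influxdb3_wal::{create, CatalogOp};

fn example_last_cache_ops() -> Vec<CatalogOp> {
    let table_id = TableId::new();
    vec![
        // Create a last cache keyed on a single column, taking all defaults:
        create::create_last_cache_op_builder(table_id, "my_table", "my_cache", [ColumnId::new()])
            .build(),
        // ...and the matching delete operation:
        create::delete_last_cache_op(table_id, "my_table", "my_cache"),
    ]
}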