fix: throw error when adding fields to non-existent table in WAL (#25525)
* fix: throw error when adding fields to non-existent table * test: add test for expected behaviour in catalog op apply This also added in some helpers to the wal crate that were previously added to pro.pull/25526/head
parent
c2b8a3a355
commit
3bb63b2d71
|
@ -81,8 +81,6 @@ pub const TIME_COLUMN_NAME: &str = "time";
|
|||
)]
|
||||
pub struct SequenceNumber(u32);
|
||||
|
||||
type SeriesKey = Option<Vec<ColumnId>>;
|
||||
|
||||
impl SequenceNumber {
|
||||
pub fn new(id: u32) -> Self {
|
||||
Self(id)
|
||||
|
@ -472,27 +470,18 @@ impl DatabaseSchema {
|
|||
}
|
||||
}
|
||||
CatalogOp::AddFields(field_additions) => {
|
||||
let new_or_existing_table = updated_or_new_tables
|
||||
let Some(new_or_existing_table) = updated_or_new_tables
|
||||
.get(&field_additions.table_id)
|
||||
.or_else(|| self.tables.get(&field_additions.table_id));
|
||||
if let Some(existing_table) = new_or_existing_table {
|
||||
if let Some(new_table) =
|
||||
existing_table.new_if_field_additions_add_fields(field_additions)?
|
||||
{
|
||||
updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
|
||||
}
|
||||
} else {
|
||||
let fields = field_additions
|
||||
.field_definitions
|
||||
.iter()
|
||||
.map(|f| (f.id, Arc::clone(&f.name), f.data_type.into()))
|
||||
.collect::<Vec<_>>();
|
||||
let new_table = TableDefinition::new(
|
||||
field_additions.table_id,
|
||||
Arc::clone(&field_additions.table_name),
|
||||
fields,
|
||||
SeriesKey::None,
|
||||
)?;
|
||||
.or_else(|| self.tables.get(&field_additions.table_id))
|
||||
else {
|
||||
return Err(Error::TableNotFound {
|
||||
db_name: Arc::clone(&field_additions.database_name),
|
||||
table_name: Arc::clone(&field_additions.table_name),
|
||||
});
|
||||
};
|
||||
if let Some(new_table) =
|
||||
new_or_existing_table.new_if_field_additions_add_fields(field_additions)?
|
||||
{
|
||||
updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
|
||||
}
|
||||
}
|
||||
|
@ -1013,6 +1002,7 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use influxdb3_wal::{create, FieldDataType};
|
||||
use pretty_assertions::assert_eq;
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
|
@ -1407,4 +1397,32 @@ mod tests {
|
|||
assert_eq!(instance_id, catalog.instance_id());
|
||||
assert_eq!(host_id, catalog.host_id());
|
||||
}
|
||||
|
||||
/// See: https://github.com/influxdata/influxdb/issues/25524
|
||||
#[test]
|
||||
fn apply_catalog_batch_fails_for_add_fields_on_nonexist_table() {
|
||||
let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
|
||||
catalog.insert_database(DatabaseSchema::new(DbId::new(), Arc::from("foo")));
|
||||
let db_id = catalog.db_name_to_id("foo").unwrap();
|
||||
let catalog_batch = create::catalog_batch_op(
|
||||
db_id,
|
||||
"foo",
|
||||
0,
|
||||
[create::add_fields_op(
|
||||
db_id,
|
||||
"foo",
|
||||
TableId::new(),
|
||||
"banana",
|
||||
[create::field_def(
|
||||
ColumnId::new(),
|
||||
"papaya",
|
||||
FieldDataType::String,
|
||||
)],
|
||||
)],
|
||||
);
|
||||
let err = catalog
|
||||
.apply_catalog_batch(catalog_batch.as_catalog().unwrap())
|
||||
.expect_err("should fail to apply AddFields operation for non-existent table");
|
||||
assert_contains!(err.to_string(), "Table banana not in DB schema for foo");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
//! A set of helper methods for creating WAL operations in tests.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use influxdb3_id::{ColumnId, DbId};
|
||||
|
||||
use crate::*;
|
||||
|
||||
pub fn wal_contents(
|
||||
(min_timestamp_ns, max_timestamp_ns, wal_file_number): (i64, i64, u64),
|
||||
ops: impl IntoIterator<Item = WalOp>,
|
||||
) -> WalContents {
|
||||
WalContents {
|
||||
min_timestamp_ns,
|
||||
max_timestamp_ns,
|
||||
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
|
||||
ops: ops.into_iter().collect(),
|
||||
snapshot: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn catalog_batch_op(
|
||||
db_id: DbId,
|
||||
db_name: impl Into<Arc<str>>,
|
||||
time_ns: i64,
|
||||
ops: impl IntoIterator<Item = CatalogOp>,
|
||||
) -> WalOp {
|
||||
WalOp::Catalog(CatalogBatch {
|
||||
database_id: db_id,
|
||||
database_name: db_name.into(),
|
||||
time_ns,
|
||||
ops: ops.into_iter().collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_fields_op(
|
||||
database_id: DbId,
|
||||
db_name: impl Into<Arc<str>>,
|
||||
table_id: TableId,
|
||||
table_name: impl Into<Arc<str>>,
|
||||
fields: impl IntoIterator<Item = FieldDefinition>,
|
||||
) -> CatalogOp {
|
||||
CatalogOp::AddFields(FieldAdditions {
|
||||
database_name: db_name.into(),
|
||||
database_id,
|
||||
table_name: table_name.into(),
|
||||
table_id,
|
||||
field_definitions: fields.into_iter().collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_table_op(
|
||||
db_id: DbId,
|
||||
db_name: impl Into<Arc<str>>,
|
||||
table_id: TableId,
|
||||
table_name: impl Into<Arc<str>>,
|
||||
fields: impl IntoIterator<Item = FieldDefinition>,
|
||||
) -> CatalogOp {
|
||||
CatalogOp::CreateTable(TableDefinition {
|
||||
database_id: db_id,
|
||||
database_name: db_name.into(),
|
||||
table_name: table_name.into(),
|
||||
table_id,
|
||||
field_definitions: fields.into_iter().collect(),
|
||||
key: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn field_def(
|
||||
id: ColumnId,
|
||||
name: impl Into<Arc<str>>,
|
||||
data_type: FieldDataType,
|
||||
) -> FieldDefinition {
|
||||
FieldDefinition {
|
||||
name: name.into(),
|
||||
data_type,
|
||||
id,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct CreateLastCacheOpBuilder {
|
||||
table_id: TableId,
|
||||
table_name: Arc<str>,
|
||||
name: Arc<str>,
|
||||
key_columns: Vec<ColumnId>,
|
||||
value_columns: Option<LastCacheValueColumnsDef>,
|
||||
count: Option<LastCacheSize>,
|
||||
ttl: Option<u64>,
|
||||
}
|
||||
|
||||
impl CreateLastCacheOpBuilder {
|
||||
pub fn build(self) -> CatalogOp {
|
||||
CatalogOp::CreateLastCache(LastCacheDefinition {
|
||||
table_id: self.table_id,
|
||||
table: self.table_name,
|
||||
name: self.name,
|
||||
key_columns: self.key_columns,
|
||||
value_columns: self
|
||||
.value_columns
|
||||
.unwrap_or(LastCacheValueColumnsDef::AllNonKeyColumns),
|
||||
count: self.count.unwrap_or_else(|| LastCacheSize::new(1).unwrap()),
|
||||
ttl: self.ttl.unwrap_or(3600),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_last_cache_op_builder(
|
||||
table_id: TableId,
|
||||
table_name: impl Into<Arc<str>>,
|
||||
cache_name: impl Into<Arc<str>>,
|
||||
key_columns: impl IntoIterator<Item = ColumnId>,
|
||||
) -> CreateLastCacheOpBuilder {
|
||||
CreateLastCacheOpBuilder {
|
||||
table_id,
|
||||
table_name: table_name.into(),
|
||||
name: cache_name.into(),
|
||||
key_columns: key_columns.into_iter().collect(),
|
||||
value_columns: None,
|
||||
count: None,
|
||||
ttl: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_last_cache_op(
|
||||
table_id: TableId,
|
||||
table_name: impl Into<Arc<str>>,
|
||||
cache_name: impl Into<Arc<str>>,
|
||||
) -> CatalogOp {
|
||||
CatalogOp::DeleteLastCache(LastCacheDelete {
|
||||
table_name: table_name.into(),
|
||||
table_id,
|
||||
name: cache_name.into(),
|
||||
})
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
//! writes durable until they can be written in larger batches as Parquet files and other snapshot and
|
||||
//! index files in object storage.
|
||||
|
||||
pub mod create;
|
||||
pub mod object_store;
|
||||
pub mod serialize;
|
||||
mod snapshot_tracker;
|
||||
|
|
Loading…
Reference in New Issue