fix(catalog): consistent ordering of catalog operations (#25690)
parent c764d37636
commit 0db71b69b9
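Catalog batches are now sequenced by the catalog itself: `apply_catalog_batch` takes the batch by value and returns `Some(OrderedCatalogBatch)` only when the batch actually changed the catalog, and only that ordered batch is written to the WAL. A condensed sketch of the caller pattern the write-buffer hunks below converge on (not standalone code; `self` is the surrounding `WriteBufferImpl` from this repo):

```rust
let catalog_batch = CatalogBatch {
    database_id: db_schema.id,
    database_name: Arc::clone(&db_schema.name),
    time_ns: self.time_provider.now().timestamp_nanos(),
    ops: vec![catalog_op],
};
// Apply first; only a batch the catalog accepted (and stamped with its
// database sequence number) is written to the WAL.
if let Some(ordered_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
    self.wal
        .write_ops(vec![WalOp::Catalog(ordered_batch)])
        .await?;
}
```

On replay, `WalOp`s can be sorted so catalog batches are re-applied in sequence-number order ahead of writes (see the `Ord` impl and `test_out_of_order_ops` below).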
@@ -1,16 +1,17 @@
 //! Implementation of the Catalog that sits entirely in memory.
 
 use crate::catalog::Error::{
-    ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound,
+    CatalogUpdatedElsewhere, ProcessingEngineCallExists, ProcessingEngineTriggerExists,
+    TableNotFound,
 };
-use bimap::BiHashMap;
+use bimap::{BiHashMap, Overwritten};
 use hashbrown::HashMap;
 use indexmap::IndexMap;
 use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
 use influxdb3_wal::{
     CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
     FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
-    PluginDefinition, TriggerDefinition,
+    OrderedCatalogBatch, PluginDefinition, TriggerDefinition,
 };
 use influxdb_line_protocol::FieldValue;
 use iox_time::Time;
@@ -184,10 +185,26 @@ impl Catalog {
         }
     }
 
-    pub fn apply_catalog_batch(&self, catalog_batch: &CatalogBatch) -> Result<()> {
+    pub fn apply_catalog_batch(
+        &self,
+        catalog_batch: CatalogBatch,
+    ) -> Result<Option<OrderedCatalogBatch>> {
         self.inner.write().apply_catalog_batch(catalog_batch)
     }
 
+    // Checks the sequence number to see if it needs to be applied.
+    pub fn apply_ordered_catalog_batch(
+        &self,
+        batch: OrderedCatalogBatch,
+    ) -> Result<Option<CatalogBatch>> {
+        if batch.sequence_number() >= self.sequence_number().as_u32() {
+            if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? {
+                return Ok(Some(catalog_batch.batch()));
+            }
+        }
+        Ok(None)
+    }
+
     pub fn db_or_create(&self, db_name: &str) -> Result<Arc<DatabaseSchema>> {
         let db = match self.db_schema(db_name) {
             Some(db) => db,
@@ -259,81 +276,6 @@ impl Catalog {
         self.inner.read().clone()
     }
 
-    pub fn add_meta_cache(&self, db_id: DbId, table_id: TableId, meta_cache: MetaCacheDefinition) {
-        let mut inner = self.inner.write();
-        let mut db = inner
-            .databases
-            .get(&db_id)
-            .expect("db should exist")
-            .as_ref()
-            .clone();
-        let mut table = db
-            .table_definition_by_id(&table_id)
-            .expect("table should exist")
-            .as_ref()
-            .clone();
-        table.add_meta_cache(meta_cache);
-        db.insert_table(table_id, Arc::new(table));
-        inner.upsert_db(db);
-    }
-
-    pub fn remove_meta_cache(&self, db_id: &DbId, table_id: &TableId, name: &str) {
-        let mut inner = self.inner.write();
-        let mut db = inner
-            .databases
-            .get(db_id)
-            .expect("db should exist")
-            .as_ref()
-            .clone();
-        let mut table = db
-            .tables
-            .get(table_id)
-            .expect("table should exist")
-            .as_ref()
-            .clone();
-        table.remove_meta_cache(name);
-        db.insert_table(*table_id, Arc::new(table));
-        inner.upsert_db(db);
-    }
-
-    pub fn add_last_cache(&self, db_id: DbId, table_id: TableId, last_cache: LastCacheDefinition) {
-        let mut inner = self.inner.write();
-        let mut db = inner
-            .databases
-            .get(&db_id)
-            .expect("db should exist")
-            .as_ref()
-            .clone();
-        let mut table = db
-            .tables
-            .get(&table_id)
-            .expect("table should exist")
-            .as_ref()
-            .clone();
-        table.add_last_cache(last_cache);
-        db.insert_table(table_id, Arc::new(table));
-        inner.upsert_db(db);
-    }
-
-    pub fn delete_last_cache(&self, db_id: DbId, table_id: TableId, name: &str) {
-        let mut inner = self.inner.write();
-        let mut db = inner
-            .databases
-            .get(&db_id)
-            .expect("db should exist")
-            .as_ref()
-            .clone();
-        let mut table = db
-            .tables
-            .get(&table_id)
-            .expect("table should exist")
-            .as_ref()
-            .clone();
-        table.remove_last_cache(name);
-        db.insert_table(table_id, Arc::new(table));
-        inner.upsert_db(db);
-    }
-
     pub fn instance_id(&self) -> Arc<str> {
         Arc::clone(&self.inner.read().instance_id)
     }
@@ -478,24 +420,31 @@ impl InnerCatalog {
 
     /// Applies the `CatalogBatch` while validating that all updates are compatible. If updates
     /// have already been applied, the sequence number and updated tracker are not updated.
-    pub fn apply_catalog_batch(&mut self, catalog_batch: &CatalogBatch) -> Result<()> {
+    pub fn apply_catalog_batch(
+        &mut self,
+        catalog_batch: CatalogBatch,
+    ) -> Result<Option<OrderedCatalogBatch>> {
         let table_count = self.table_count();
 
         if let Some(db) = self.databases.get(&catalog_batch.database_id) {
-            if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? {
+            if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? {
                 check_overall_table_count(Some(db), &new_db, table_count)?;
                 self.upsert_db(new_db);
+            } else {
+                return Ok(None);
             }
         } else {
             if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
                 return Err(Error::TooManyDbs);
             }
-            let new_db = DatabaseSchema::new_from_batch(catalog_batch)?;
+            let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?;
             check_overall_table_count(None, &new_db, table_count)?;
             self.upsert_db(new_db);
         }
 
-        Ok(())
+        Ok(Some(OrderedCatalogBatch::new(
+            catalog_batch,
+            self.sequence.0,
+        )))
     }
 
     pub fn db_exists(&self, db_id: DbId) -> bool {
@@ -606,10 +555,21 @@ impl DatabaseSchema {
         &mut self,
         table_id: TableId,
         table_def: Arc<TableDefinition>,
-    ) -> Option<Arc<TableDefinition>> {
-        self.table_map
-            .insert(table_id, Arc::clone(&table_def.table_name));
-        self.tables.insert(table_id, table_def)
+    ) -> Result<Option<Arc<TableDefinition>>> {
+        match self
+            .table_map
+            .insert(table_id, Arc::clone(&table_def.table_name))
+        {
+            Overwritten::Left(_, _) | Overwritten::Right(_, _) | Overwritten::Both(_, _) => {
+                // This will happen if another table was inserted with the same name between checking
+                // for existence and insertion.
+                // We'd like this to be automatically handled by the system,
+                // but for now it is better to error than get into an inconsistent state.
+                return Err(CatalogUpdatedElsewhere);
+            }
+            Overwritten::Neither | Overwritten::Pair(_, _) => {}
+        }
+        Ok(self.tables.insert(table_id, table_def))
     }
 
     pub fn table_schema(&self, table_name: impl Into<Arc<str>>) -> Option<Schema> {
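The `CatalogUpdatedElsewhere` path above hinges on how `bimap` reports collisions: `BiHashMap::insert` returns an `Overwritten` value describing which side of the map, if any, was displaced. A small standalone illustration with `u32`/`&str` stand-ins for the table-id/table-name map (not this repo's types):

```rust
use bimap::{BiHashMap, Overwritten};

fn main() {
    let mut map: BiHashMap<u32, &str> = BiHashMap::new();

    // Fresh pair: nothing was displaced.
    assert!(matches!(map.insert(0, "bar"), Overwritten::Neither));

    // Re-inserting the exact same pair reports Pair; insert_table treats this as benign.
    assert!(matches!(map.insert(0, "bar"), Overwritten::Pair(_, _)));

    // A different id claiming an existing name displaces the old pair: this is the
    // race that insert_table now surfaces as an error instead of silently overwriting.
    assert!(matches!(map.insert(1, "bar"), Overwritten::Right(_, _)));
}
```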
@@ -732,14 +692,14 @@ impl UpdateDatabaseSchema for influxdb3_wal::TableDefinition {
                 if let Cow::Owned(updated_table) = existing_table.check_and_add_new_fields(self)? {
                     database_schema
                         .to_mut()
-                        .insert_table(self.table_id, Arc::new(updated_table));
+                        .insert_table(self.table_id, Arc::new(updated_table))?;
                 }
             }
             None => {
                 let new_table = TableDefinition::new_from_op(self);
                 database_schema
                     .to_mut()
-                    .insert_table(new_table.table_id, Arc::new(new_table));
+                    .insert_table(new_table.table_id, Arc::new(new_table))?;
             }
         }
         Ok(database_schema)
@@ -1156,7 +1116,7 @@ impl<T: TableUpdate> UpdateDatabaseSchema for T {
             if let Cow::Owned(new_table) = self.update_table(Cow::Borrowed(table.as_ref()))? {
                 schema
                     .to_mut()
-                    .insert_table(new_table.table_id, Arc::new(new_table));
+                    .insert_table(new_table.table_id, Arc::new(new_table))?;
             }
             Ok(schema)
         }
@@ -1288,12 +1248,12 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT
 
 #[cfg(test)]
 mod tests {
-    use influxdb3_wal::{create, FieldDataType};
+    use super::*;
+    use influxdb3_wal::CatalogOp::CreateTable;
+    use influxdb3_wal::{create, FieldDataType, WalOp};
     use pretty_assertions::assert_eq;
     use test_helpers::assert_contains;
 
-    use super::*;
-
     #[test]
     fn catalog_serialization() {
         let host_id = Arc::from("sample-host-id");
@@ -1707,7 +1667,7 @@ mod tests {
         let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
         catalog.insert_database(DatabaseSchema::new(DbId::new(), Arc::from("foo")));
         let db_id = catalog.db_name_to_id("foo").unwrap();
-        let catalog_batch = create::catalog_batch_op(
+        let catalog_batch = create::catalog_batch(
            db_id,
            "foo",
            0,
@@ -1724,7 +1684,7 @@ mod tests {
             )],
         );
         let err = catalog
-            .apply_catalog_batch(catalog_batch.as_catalog().unwrap())
+            .apply_catalog_batch(catalog_batch)
             .expect_err("should fail to apply AddFields operation for non-existent table");
         assert_contains!(err.to_string(), "Table banana not in DB schema for foo");
     }
@@ -1775,7 +1735,9 @@ mod tests {
             .unwrap(),
         );
 
-        database.insert_table(deleted_table_id, table_defn);
+        database
+            .insert_table(deleted_table_id, table_defn)
+            .expect("should be able to insert");
         let new_db = DatabaseSchema::new_if_updated_from_batch(
             &database,
             &CatalogBatch {
@@ -1798,4 +1760,81 @@ mod tests {
         assert!(deleted_table.deleted);
         assert!(!deleted_table.series_key.is_empty());
     }
+
+    // tests that sorting catalog ops by the sequence number returned from apply_catalog_batch
+    // fixes potential ordering issues.
+    #[test]
+    fn test_out_of_order_ops() -> Result<()> {
+        let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
+        let db_id = DbId::new();
+        let db_name = Arc::from("foo");
+        let table_id = TableId::new();
+        let table_name = Arc::from("bar");
+        let table_definition = influxdb3_wal::TableDefinition {
+            database_id: db_id,
+            database_name: Arc::clone(&db_name),
+            table_name: Arc::clone(&table_name),
+            table_id,
+            field_definitions: vec![
+                FieldDefinition::new(ColumnId::from(0), "tag_1", FieldDataType::Tag),
+                FieldDefinition::new(ColumnId::from(1), "time", FieldDataType::Timestamp),
+                FieldDefinition::new(ColumnId::from(2), "field", FieldDataType::String),
+            ],
+            key: vec![ColumnId::from(0)],
+        };
+        let create_op = CatalogBatch {
+            database_id: db_id,
+            database_name: Arc::clone(&db_name),
+            time_ns: 0,
+            ops: vec![CreateTable(table_definition.clone())],
+        };
+        let add_column_op = CatalogBatch {
+            database_id: db_id,
+            database_name: Arc::clone(&db_name),
+            time_ns: 0,
+            ops: vec![CatalogOp::AddFields(FieldAdditions {
+                database_name: Arc::clone(&db_name),
+                database_id: db_id,
+                table_name,
+                table_id,
+                field_definitions: vec![FieldDefinition::new(
+                    ColumnId::from(3),
+                    "tag_2",
+                    FieldDataType::Tag,
+                )],
+            })],
+        };
+        let create_ordered_op = catalog
+            .apply_catalog_batch(create_op)?
+            .expect("should be able to create");
+        let add_column_op = catalog
+            .apply_catalog_batch(add_column_op)?
+            .expect("should produce operation");
+        let mut ops = vec![
+            WalOp::Catalog(add_column_op),
+            WalOp::Catalog(create_ordered_op),
+        ];
+        ops.sort();
+
+        let replayed_catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
+        for op in ops {
+            let WalOp::Catalog(catalog_batch) = op else {
+                panic!("should produce operation");
+            };
+            replayed_catalog.apply_catalog_batch(catalog_batch.batch())?;
+        }
+        let original_table = catalog
+            .db_schema_by_id(&db_id)
+            .unwrap()
+            .table_definition_by_id(&table_id)
+            .unwrap();
+        let replayed_table = catalog
+            .db_schema_by_id(&db_id)
+            .unwrap()
+            .table_definition_by_id(&table_id)
+            .unwrap();
+
+        assert_eq!(original_table, replayed_table);
+        Ok(())
+    }
 }
@@ -43,18 +43,18 @@ pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
     WalOp::Write(write_batch)
 }
 
-pub fn catalog_batch_op(
+pub fn catalog_batch(
     db_id: DbId,
     db_name: impl Into<Arc<str>>,
     time_ns: i64,
     ops: impl IntoIterator<Item = CatalogOp>,
-) -> WalOp {
-    WalOp::Catalog(CatalogBatch {
+) -> CatalogBatch {
+    CatalogBatch {
         database_id: db_id,
         database_name: db_name.into(),
         time_ns,
         ops: ops.into_iter().collect(),
-    })
+    }
 }
 
 pub fn add_fields_op(
@@ -21,6 +21,7 @@ use observability_deps::tracing::error;
 use schema::{InfluxColumnType, InfluxFieldType};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
+use std::cmp::Ordering;
 use std::fmt::Debug;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -213,7 +214,31 @@ impl Default for Gen1Duration {
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub enum WalOp {
     Write(WriteBatch),
-    Catalog(CatalogBatch),
+    Catalog(OrderedCatalogBatch),
 }
 
+impl PartialOrd for WalOp {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for WalOp {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            // Catalog ops come before Write ops
+            (WalOp::Catalog(_), WalOp::Write(_)) => Ordering::Less,
+            (WalOp::Write(_), WalOp::Catalog(_)) => Ordering::Greater,
+
+            // For two Catalog ops, compare by database_sequence_number
+            (WalOp::Catalog(a), WalOp::Catalog(b)) => {
+                a.database_sequence_number.cmp(&b.database_sequence_number)
+            }
+
+            // For two Write ops, consider them equal
+            (WalOp::Write(_), WalOp::Write(_)) => Ordering::Equal,
+        }
+    }
+}
+
 impl WalOp {
@@ -227,7 +252,7 @@ impl WalOp {
     pub fn as_catalog(&self) -> Option<&CatalogBatch> {
         match self {
             WalOp::Write(_) => None,
-            WalOp::Catalog(c) => Some(c),
+            WalOp::Catalog(c) => Some(&c.catalog),
         }
     }
 }
@@ -240,6 +265,30 @@ pub struct CatalogBatch {
     pub ops: Vec<CatalogOp>,
 }
 
+/// A catalog batch that has been processed by the catalog and given a sequence number.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub struct OrderedCatalogBatch {
+    catalog: CatalogBatch,
+    database_sequence_number: u32,
+}
+
+impl OrderedCatalogBatch {
+    pub fn new(catalog: CatalogBatch, database_sequence_number: u32) -> Self {
+        Self {
+            catalog,
+            database_sequence_number,
+        }
+    }
+
+    pub fn sequence_number(&self) -> u32 {
+        self.database_sequence_number
+    }
+
+    pub fn batch(self) -> CatalogBatch {
+        self.catalog
+    }
+}
+
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub enum CatalogOp {
     CreateDatabase(DatabaseDefinition),
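`OrderedCatalogBatch` carries the database sequence number assigned at apply time, and the new `Ord` impl sorts catalog ops ahead of writes and among themselves by that number, so a plain `sort` on a buffered `Vec<WalOp>` restores apply order. A self-contained stand-in that mirrors only the ordering logic above (simplified types, not the crate's own):

```rust
use std::cmp::Ordering;

// Simplified stand-ins for WalOp / OrderedCatalogBatch; only the comparison logic is mirrored.
#[derive(Debug, PartialEq, Eq)]
enum Op {
    Catalog { sequence: u32 },
    Write,
}

impl Ord for Op {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            // Catalog ops come before Write ops.
            (Op::Catalog { .. }, Op::Write) => Ordering::Less,
            (Op::Write, Op::Catalog { .. }) => Ordering::Greater,
            // Catalog ops order by their sequence number.
            (Op::Catalog { sequence: a }, Op::Catalog { sequence: b }) => a.cmp(b),
            // Write ops compare equal; Vec::sort is stable, so they keep buffer order.
            (Op::Write, Op::Write) => Ordering::Equal,
        }
    }
}

impl PartialOrd for Op {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // Buffered out of order: a write, then the second catalog batch, then the first.
    let mut ops = vec![
        Op::Write,
        Op::Catalog { sequence: 2 },
        Op::Catalog { sequence: 1 },
    ];
    ops.sort();
    assert_eq!(
        ops,
        vec![
            Op::Catalog { sequence: 1 },
            Op::Catalog { sequence: 2 },
            Op::Write,
        ]
    );
}
```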
@@ -1,8 +1,8 @@
 use crate::serialize::verify_file_type_and_deserialize;
 use crate::snapshot_tracker::{SnapshotInfo, SnapshotTracker, WalPeriod};
 use crate::{
-    background_wal_flush, CatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, WalConfig,
-    WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch,
+    background_wal_flush, OrderedCatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal,
+    WalConfig, WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch,
 };
 use bytes::Bytes;
 use data_types::Timestamp;
@@ -516,7 +516,7 @@ struct WalBuffer {
     op_limit: usize,
     op_count: usize,
     database_to_write_batch: HashMap<Arc<str>, WriteBatch>,
-    catalog_batches: Vec<CatalogBatch>,
+    catalog_batches: Vec<OrderedCatalogBatch>,
     write_op_responses: Vec<oneshot::Sender<WriteResult>>,
 }
 
@@ -594,21 +594,18 @@ impl WalBuffer {
         }
 
         for catalog_batch in &self.catalog_batches {
-            min_timestamp_ns = min_timestamp_ns.min(catalog_batch.time_ns);
-            max_timestamp_ns = max_timestamp_ns.max(catalog_batch.time_ns);
+            min_timestamp_ns = min_timestamp_ns.min(catalog_batch.catalog.time_ns);
+            max_timestamp_ns = max_timestamp_ns.max(catalog_batch.catalog.time_ns);
         }
 
         // have the catalog ops come before any writes in ordering
         let mut ops =
             Vec::with_capacity(self.database_to_write_batch.len() + self.catalog_batches.len());
 
-        for catalog_batch in self.catalog_batches {
-            ops.push(WalOp::Catalog(catalog_batch));
-        }
+        ops.extend(self.catalog_batches.into_iter().map(WalOp::Catalog));
+        ops.extend(self.database_to_write_batch.into_values().map(WalOp::Write));
 
-        for write_batch in self.database_to_write_batch.into_values() {
-            ops.push(WalOp::Write(write_batch));
-        }
+        ops.sort();
 
         (
             WalContents {
@@ -667,7 +664,7 @@ mod tests {
         let time_provider: Arc<dyn TimeProvider> =
             Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
+        let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
         let wal_config = WalConfig {
             max_write_buffer_size: 100,
             flush_interval: Duration::from_secs(1),
@@ -877,7 +874,7 @@ mod tests {
         );
 
         // before we trigger a snapshot, test replay with a new wal and notifier
-        let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
+        let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
         let replay_wal = WalObjectStore::new_without_replay(
             Arc::clone(&time_provider),
             Arc::clone(&object_store),
@@ -897,7 +894,7 @@ mod tests {
         replay_wal.replay().await.unwrap();
         let replay_notifier = replay_notifier
             .as_any()
-            .downcast_ref::<TestNotfiier>()
+            .downcast_ref::<TestNotifier>()
             .unwrap();
 
         {
@@ -1008,7 +1005,7 @@ mod tests {
             },
         );
 
-        let notifier = notifier.as_any().downcast_ref::<TestNotfiier>().unwrap();
+        let notifier = notifier.as_any().downcast_ref::<TestNotifier>().unwrap();
 
         {
             let notified_writes = notifier.notified_writes.lock();
@@ -1022,7 +1019,7 @@ mod tests {
             .await;
 
         // test that replay now only has file 3
-        let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
+        let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
         let replay_wal = WalObjectStore::new_without_replay(
             Arc::clone(&time_provider),
             object_store,
@@ -1039,7 +1036,7 @@ mod tests {
         replay_wal.replay().await.unwrap();
         let replay_notifier = replay_notifier
             .as_any()
-            .downcast_ref::<TestNotfiier>()
+            .downcast_ref::<TestNotifier>()
             .unwrap();
         let notified_writes = replay_notifier.notified_writes.lock();
         assert_eq!(*notified_writes, vec![file_3_contents.clone()]);
@@ -1052,7 +1049,7 @@ mod tests {
         let time_provider: Arc<dyn TimeProvider> =
             Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
+        let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
         let wal_config = WalConfig {
             max_write_buffer_size: 100,
             flush_interval: Duration::from_secs(1),
@@ -1070,7 +1067,7 @@ mod tests {
         );
 
         assert!(wal.flush_buffer().await.is_none());
-        let notifier = notifier.as_any().downcast_ref::<TestNotfiier>().unwrap();
+        let notifier = notifier.as_any().downcast_ref::<TestNotifier>().unwrap();
         assert!(notifier.notified_writes.lock().is_empty());
 
         // make sure no wal file was written
@@ -1096,13 +1093,13 @@ mod tests {
     }
 
     #[derive(Debug, Default)]
-    struct TestNotfiier {
+    struct TestNotifier {
         notified_writes: parking_lot::Mutex<Vec<WalContents>>,
         snapshot_details: parking_lot::Mutex<Option<SnapshotDetails>>,
     }
 
     #[async_trait]
-    impl WalFileNotifier for TestNotfiier {
+    impl WalFileNotifier for TestNotifier {
         fn notify(&self, write: WalContents) {
             self.notified_writes.lock().push(write);
         }
@@ -16,8 +16,8 @@ use datafusion::execution::memory_pool::UnboundedMemoryPool;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures_util::pin_mut;
-use futures_util::stream::StreamExt;
 use futures_util::stream::TryStreamExt;
+use futures_util::stream::{FuturesOrdered, StreamExt};
 use influxdb3_cache::last_cache;
 use influxdb3_catalog::catalog::Catalog;
 use influxdb3_catalog::catalog::InnerCatalog;
@@ -32,7 +32,6 @@ use std::any::Any;
 use std::io::Write;
 use std::sync::Arc;
 use thiserror::Error;
-use tokio::task::JoinSet;
 use uuid::Uuid;
 
 #[derive(Debug, Error)]
@@ -188,8 +187,9 @@ impl Persister {
     ///
     /// This is intended to be used on server start.
     pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSnapshot>> {
-        let mut join_set = JoinSet::new();
+        let mut futures = FuturesOrdered::new();
+        let mut offset: Option<ObjPath> = None;
 
         while most_recent_n > 0 {
             let count = if most_recent_n > 1000 {
                 most_recent_n -= 1000;
@@ -238,7 +238,7 @@ impl Persister {
             }
 
             for item in &list[0..end] {
-                join_set.spawn(get_snapshot(
+                futures.push_back(get_snapshot(
                     item.location.clone(),
                     Arc::clone(&self.object_store),
                 ));
@@ -254,8 +254,11 @@ impl Persister {
             offset = Some(list[end - 1].location.clone());
         }
 
-        // Returns an error if there is one and reuses the Vec's memory as well
-        join_set.join_all().await.into_iter().collect()
+        let mut results = Vec::new();
+        while let Some(result) = futures.next().await {
+            results.push(result?);
+        }
+        Ok(results)
     }
 
     /// Loads a Parquet file from ObjectStore
@@ -486,7 +489,7 @@ mod tests {
         persister.persist_catalog(&catalog).await.unwrap();
 
         let batch = |name: &str, num: u32| {
-            let _ = catalog.apply_catalog_batch(&CatalogBatch {
+            let _ = catalog.apply_catalog_batch(CatalogBatch {
                 database_id: db_schema.id,
                 database_name: Arc::clone(&db_schema.name),
                 time_ns: 5000,
@@ -730,11 +733,11 @@ mod tests {
 
     #[tokio::test]
     /// This test makes sure that the logic for offset lists works
-    async fn persist_and_load_over_9000_snapshot_info_files() {
+    async fn persist_and_load_over_1000_snapshot_info_files() {
         let local_disk =
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let persister = Persister::new(Arc::new(local_disk), "test_host");
-        for id in 0..9001 {
+        for id in 0..1001 {
             let info_file = PersistedSnapshot {
                 host_id: "test_host".to_string(),
                 next_file_id: ParquetFileId::from(id),
@@ -752,13 +755,13 @@ mod tests {
             };
             persister.persist_snapshot(&info_file).await.unwrap();
         }
-        let snapshots = persister.load_snapshots(9500).await.unwrap();
-        // We asked for the most recent 9500 so there should be 9001 of them
-        assert_eq!(snapshots.len(), 9001);
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 9000);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 9000);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 9000);
-        assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 9000);
+        let snapshots = persister.load_snapshots(1500).await.unwrap();
+        // We asked for the most recent 1500 so there should be 1001 of them
+        assert_eq!(snapshots.len(), 1001);
+        assert_eq!(snapshots[0].next_file_id.as_u64(), 1000);
+        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 1000);
+        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 1000);
+        assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 1000);
     }
 
     #[tokio::test]
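The persister hunks swap `tokio::task::JoinSet` for `futures_util::stream::FuturesOrdered`, which yields results in the order the futures were pushed rather than the order they happen to complete, keeping loaded snapshots in the order they were requested. A minimal, self-contained illustration (assumes the `tokio` and `futures-util` crates; the delays and ids are made up):

```rust
use futures_util::stream::{FuturesOrdered, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut futures = FuturesOrdered::new();
    // The first future pushed finishes last, but FuturesOrdered still yields it first.
    for (delay_ms, id) in [(30u64, 1u32), (20, 2), (10, 3)] {
        futures.push_back(async move {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            id
        });
    }

    let mut results = Vec::new();
    while let Some(id) = futures.next().await {
        results.push(id);
    }
    assert_eq!(results, vec![1, 2, 3]);
}
```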
@@ -521,21 +521,23 @@ impl MetaCacheManager for WriteBufferImpl {
         cache_name: Option<String>,
         args: CreateMetaCacheArgs,
     ) -> Result<Option<MetaCacheDefinition>, Error> {
         let table_id = args.table_def.table_id;
         if let Some(new_cache_definition) = self
             .meta_cache
             .create_cache(db_schema.id, cache_name.map(Into::into), args)
             .map_err(Error::MetaCacheError)?
         {
-            self.catalog
-                .add_meta_cache(db_schema.id, table_id, new_cache_definition.clone());
-            let add_meta_cache_batch = influxdb3_wal::create::catalog_batch_op(
-                db_schema.id,
-                Arc::clone(&db_schema.name),
-                self.time_provider.now().timestamp_nanos(),
-                [CatalogOp::CreateMetaCache(new_cache_definition.clone())],
-            );
-            self.wal.write_ops(vec![add_meta_cache_batch]).await?;
+            let catalog_op = CatalogOp::CreateMetaCache(new_cache_definition.clone());
+            let catalog_batch = CatalogBatch {
+                database_id: db_schema.id,
+                database_name: db_schema.name.clone(),
+                time_ns: self.time_provider.now().timestamp_nanos(),
+                ops: vec![catalog_op],
+            };
+            if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+                self.wal
+                    .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                    .await?;
+            }
             Ok(Some(new_cache_definition))
         } else {
             Ok(None)
@@ -551,20 +553,21 @@ impl MetaCacheManager for WriteBufferImpl {
         let catalog = self.catalog();
         let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist");
         self.meta_cache.delete_cache(db_id, tbl_id, cache_name)?;
-        catalog.remove_meta_cache(db_id, tbl_id, cache_name);
-
-        self.wal
-            .write_ops(vec![influxdb3_wal::create::catalog_batch_op(
-                *db_id,
-                Arc::clone(&db_schema.name),
-                self.time_provider.now().timestamp_nanos(),
-                [CatalogOp::DeleteMetaCache(MetaCacheDelete {
-                    table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"),
-                    table_id: *tbl_id,
-                    cache_name: cache_name.into(),
-                })],
-            )])
-            .await?;
+        let catalog_batch = CatalogBatch {
+            database_id: *db_id,
+            database_name: Arc::clone(&db_schema.name),
+            time_ns: self.time_provider.now().timestamp_nanos(),
+            ops: vec![CatalogOp::DeleteMetaCache(MetaCacheDelete {
+                table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"),
+                table_id: *tbl_id,
+                cache_name: cache_name.into(),
+            })],
+        };
+        if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? {
+            self.wal
+                .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                .await?;
+        }
 
         Ok(())
     }
@@ -620,14 +623,17 @@ impl LastCacheManager for WriteBufferImpl {
                 value_columns,
             },
         )? {
-            self.catalog.add_last_cache(db_id, table_id, info.clone());
-            let add_cache_catalog_batch = WalOp::Catalog(CatalogBatch {
+            let catalog_batch = CatalogBatch {
                 time_ns: self.time_provider.now().timestamp_nanos(),
                 database_id: db_schema.id,
                 database_name: Arc::clone(&db_schema.name),
                 ops: vec![CreateLastCache(info.clone())],
-            });
-            self.wal.write_ops(vec![add_cache_catalog_batch]).await?;
+            };
+            if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+                self.wal
+                    .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                    .await?;
+            }
 
             Ok(Some(info))
         } else {
@@ -644,22 +650,25 @@ impl LastCacheManager for WriteBufferImpl {
         let catalog = self.catalog();
         let db_schema = catalog.db_schema_by_id(&db_id).expect("db should exist");
         self.last_cache.delete_cache(db_id, tbl_id, cache_name)?;
-        catalog.delete_last_cache(db_id, tbl_id, cache_name);
 
+        let catalog_batch = CatalogBatch {
+            time_ns: self.time_provider.now().timestamp_nanos(),
+            database_id: db_id,
+            database_name: Arc::clone(&db_schema.name),
+            ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete {
+                table_id: tbl_id,
+                table_name: db_schema.table_id_to_name(&tbl_id).expect("table exists"),
+                name: cache_name.into(),
+            })],
+        };
+
         // NOTE: if this fails then the cache will be gone from the running server, but will be
         // resurrected on server restart.
-        self.wal
-            .write_ops(vec![WalOp::Catalog(CatalogBatch {
-                time_ns: self.time_provider.now().timestamp_nanos(),
-                database_id: db_id,
-                database_name: Arc::clone(&db_schema.name),
-                ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete {
-                    table_id: tbl_id,
-                    table_name: db_schema.table_id_to_name(&tbl_id).expect("table exists"),
-                    name: cache_name.into(),
-                })],
-            })])
-            .await?;
+        if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? {
+            self.wal
+                .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                .await?;
+        }
 
         Ok(())
     }
@@ -684,9 +693,11 @@ impl DatabaseManager for WriteBufferImpl {
                 database_name: Arc::clone(&db_schema.name),
             })],
         };
-        self.catalog.apply_catalog_batch(&catalog_batch)?;
-        let wal_op = WalOp::Catalog(catalog_batch);
-        self.wal.write_ops(vec![wal_op]).await?;
+        if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+            self.wal
+                .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                .await?;
+        }
         debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully created database");
         Ok(())
     }
@@ -710,10 +721,11 @@ impl DatabaseManager for WriteBufferImpl {
                 deletion_time: deletion_time.timestamp_nanos(),
             })],
         };
-        self.catalog.apply_catalog_batch(&catalog_batch)?;
-        let wal_op = WalOp::Catalog(catalog_batch);
-        self.wal.write_ops(vec![wal_op]).await?;
-        debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database");
+        if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+            let wal_op = WalOp::Catalog(catalog_batch);
+            self.wal.write_ops(vec![wal_op]).await?;
+            debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database");
+        }
         Ok(())
     }
     async fn create_table(
@@ -787,9 +799,11 @@ impl DatabaseManager for WriteBufferImpl {
             database_name: Arc::clone(&db_schema.name),
             ops: vec![CatalogOp::CreateTable(catalog_table_def)],
         };
-        self.catalog.apply_catalog_batch(&catalog_batch)?;
-        let wal_op = WalOp::Catalog(catalog_batch);
-        self.wal.write_ops(vec![wal_op]).await?;
+        if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+            self.wal
+                .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                .await?;
+        }
 
         debug!(
             db_name = ?db_schema.name,
@@ -832,9 +846,11 @@ impl DatabaseManager for WriteBufferImpl {
                 table_name: Arc::clone(&table_defn.table_name),
             })],
         };
-        self.catalog.apply_catalog_batch(&catalog_batch)?;
-        let wal_op = WalOp::Catalog(catalog_batch);
-        self.wal.write_ops(vec![wal_op]).await?;
+        if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+            self.wal
+                .write_ops(vec![WalOp::Catalog(catalog_batch)])
+                .await?;
+        }
         debug!(
             db_id = ?db_id,
             db_name = ?&db_schema.name,
@@ -877,9 +893,10 @@ impl ProcessingEngineManager for WriteBufferImpl {
             database_name: Arc::clone(&db_schema.name),
             ops: vec![catalog_op],
         };
-        self.catalog.apply_catalog_batch(&catalog_batch)?;
-        let wal_op = WalOp::Catalog(catalog_batch);
-        self.wal.write_ops(vec![wal_op]).await?;
+        if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+            let wal_op = WalOp::Catalog(catalog_batch);
+            self.wal.write_ops(vec![wal_op]).await?;
+        }
         Ok(())
     }
 
@@ -915,9 +932,10 @@ impl ProcessingEngineManager for WriteBufferImpl {
             database_name: Arc::clone(&db_schema.name),
             ops: vec![catalog_op],
         };
-        self.catalog.apply_catalog_batch(&catalog_batch)?;
-        let wal_op = WalOp::Catalog(catalog_batch);
-        self.wal.write_ops(vec![wal_op]).await?;
+        if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
+            let wal_op = WalOp::Catalog(catalog_batch);
+            self.wal.write_ops(vec![wal_op]).await?;
+        }
         Ok(())
     }
 
@@ -456,11 +456,13 @@ impl BufferState {
             match op {
                 WalOp::Write(write_batch) => self.add_write_batch(write_batch),
                 WalOp::Catalog(catalog_batch) => {
-                    self.catalog
-                        // just catalog level changes
-                        .apply_catalog_batch(&catalog_batch)
-                        .expect("catalog batch should apply");
-
+                    let Some(catalog_batch) = self
+                        .catalog
+                        .apply_ordered_catalog_batch(catalog_batch)
+                        .expect("should be able to reapply")
+                    else {
+                        continue;
+                    };
                     let db_schema = self
                         .catalog
                         .db_schema_by_id(&catalog_batch.database_id)
@@ -9,8 +9,8 @@ use influxdb3_catalog::catalog::{
 
 use influxdb3_id::{ColumnId, TableId};
 use influxdb3_wal::{
-    CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDefinition, Gen1Duration, Row,
-    TableChunks, WriteBatch,
+    CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDefinition, Gen1Duration,
+    OrderedCatalogBatch, Row, TableChunks, WriteBatch,
 };
 use influxdb_line_protocol::{parse_lines, v3, ParsedLine};
 use iox_time::Time;
@@ -32,7 +32,7 @@ pub struct LinesParsed {
     catalog: WithCatalog,
     lines: Vec<QualifiedLine>,
     bytes: u64,
-    catalog_batch: Option<CatalogBatch>,
+    catalog_batch: Option<OrderedCatalogBatch>,
     errors: Vec<WriteLineError>,
 }
 
@@ -140,8 +140,7 @@ impl WriteValidator<WithCatalog> {
                 time_ns: self.state.time_now_ns,
                 ops: catalog_updates,
             };
-            self.state.catalog.apply_catalog_batch(&catalog_batch)?;
-            Some(catalog_batch)
+            self.state.catalog.apply_catalog_batch(catalog_batch)?
         };
 
         Ok(WriteValidator {
@@ -223,8 +222,7 @@ impl WriteValidator<WithCatalog> {
                 database_name: Arc::clone(&self.state.db_schema.name),
                 ops: catalog_updates,
             };
-            self.state.catalog.apply_catalog_batch(&catalog_batch)?;
-            Some(catalog_batch)
+            self.state.catalog.apply_catalog_batch(catalog_batch)?
         };
 
         Ok(WriteValidator {
@@ -405,7 +403,13 @@ fn validate_and_qualify_v3_line(
                 line_number: line_number + 1,
                 error_message: e.to_string(),
             })?;
-        db_schema.insert_table(table_id, Arc::new(new_table_def));
+        db_schema
+            .insert_table(table_id, Arc::new(new_table_def))
+            .map_err(|e| WriteLineError {
+                original_line: raw_line.to_string(),
+                line_number: line_number + 1,
+                error_message: e.to_string(),
+            })?;
     }
     QualifiedLine {
         table_id,
@@ -477,10 +481,23 @@ fn validate_and_qualify_v3_line(
         catalog_op = Some(table_definition_op);
 
         let db_schema = db_schema.to_mut();
-        assert!(
-            db_schema.insert_table(table_id, Arc::new(table)).is_none(),
-            "attempted to overwrite existing table"
-        );
+        db_schema
+            .insert_table(table_id, Arc::new(table))
+            .map_err(|e| WriteLineError {
+                original_line: raw_line.to_string(),
+                line_number: line_number + 1,
+                error_message: e.to_string(),
+            })?
+            .map_or_else(
+                || Ok(()),
+                |_| {
+                    Err(WriteLineError {
+                        original_line: raw_line.to_string(),
+                        line_number: line_number + 1,
+                        error_message: "unexpected overwrite of existing table".to_string(),
+                    })
+                },
+            )?;
     QualifiedLine {
         table_id,
         row: Row {
@@ -612,7 +629,13 @@ fn validate_and_qualify_v1_line(
                 line_number: line_number + 1,
                 error_message: e.to_string(),
             })?;
-        db_schema.insert_table(table_id, Arc::new(new_table_def));
+        db_schema
+            .insert_table(table_id, Arc::new(new_table_def))
+            .map_err(|e| WriteLineError {
+                original_line: line.to_string(),
+                line_number: line_number + 1,
+                error_message: e.to_string(),
+            })?;
 
         catalog_op = Some(CatalogOp::AddFields(FieldAdditions {
             database_name,
@@ -687,10 +710,23 @@ fn validate_and_qualify_v1_line(
         let table = TableDefinition::new(table_id, Arc::clone(&table_name), columns, key).unwrap();
 
         let db_schema = db_schema.to_mut();
-        assert!(
-            db_schema.insert_table(table_id, Arc::new(table)).is_none(),
-            "attempted to overwrite existing table"
-        );
+        db_schema
+            .insert_table(table_id, Arc::new(table))
+            .map_err(|e| WriteLineError {
+                original_line: line.to_string(),
+                line_number: line_number + 1,
+                error_message: e.to_string(),
+            })?
+            .map_or_else(
+                || Ok(()),
+                |_| {
+                    Err(WriteLineError {
+                        original_line: line.to_string(),
+                        line_number: line_number + 1,
+                        error_message: "unexpected overwrite of existing table".to_string(),
+                    })
+                },
+            )?;
     QualifiedLine {
         table_id,
         row: Row {
@@ -722,7 +758,7 @@ pub struct ValidatedLines {
     /// Only valid lines will be converted into a WriteBatch
     pub(crate) valid_data: WriteBatch,
     /// If any catalog updates were made, they will be included here
-    pub(crate) catalog_updates: Option<CatalogBatch>,
+    pub(crate) catalog_updates: Option<OrderedCatalogBatch>,
 }
 
 impl From<ValidatedLines> for WriteBatch {